Refactor register node: returns deleted contracts #5
							
								
								
									
										81
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										81
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @ -216,6 +216,26 @@ version = "1.6.0" | |||||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
| checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" | checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" | ||||||
| 
 | 
 | ||||||
|  | [[package]] | ||||||
|  | name = "bincode" | ||||||
|  | version = "2.0.1" | ||||||
|  | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
|  | checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740" | ||||||
|  | dependencies = [ | ||||||
|  |  "bincode_derive", | ||||||
|  |  "serde", | ||||||
|  |  "unty", | ||||||
|  | ] | ||||||
|  | 
 | ||||||
|  | [[package]] | ||||||
|  | name = "bincode_derive" | ||||||
|  | version = "2.0.1" | ||||||
|  | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
|  | checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09" | ||||||
|  | dependencies = [ | ||||||
|  |  "virtue", | ||||||
|  | ] | ||||||
|  | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "bitflags" | name = "bitflags" | ||||||
| version = "2.8.0" | version = "2.8.0" | ||||||
| @ -264,6 +284,8 @@ version = "1.2.10" | |||||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
| checksum = "13208fcbb66eaeffe09b99fffbe1af420f00a7b35aa99ad683dfc1aa76145229" | checksum = "13208fcbb66eaeffe09b99fffbe1af420f00a7b35aa99ad683dfc1aa76145229" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  |  "jobserver", | ||||||
|  |  "libc", | ||||||
|  "shlex", |  "shlex", | ||||||
| ] | ] | ||||||
| 
 | 
 | ||||||
| @ -411,15 +433,17 @@ dependencies = [ | |||||||
| [[package]] | [[package]] | ||||||
| name = "detee-shared" | name = "detee-shared" | ||||||
| version = "0.1.0" | version = "0.1.0" | ||||||
| source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=main#3024c00b8e1c93e70902793385b92bc0a8d1f26a" | source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#0b195b4589e4ec689af7ddca27dc051716ecee78" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "base64", |  "bincode", | ||||||
|  "prost", |  "prost", | ||||||
|  "serde", |  "serde", | ||||||
|  "serde_yaml", |  "serde_yaml", | ||||||
|  |  "tar", | ||||||
|  "thiserror", |  "thiserror", | ||||||
|  "tonic", |  "tonic", | ||||||
|  "tonic-build", |  "tonic-build", | ||||||
|  |  "zstd", | ||||||
| ] | ] | ||||||
| 
 | 
 | ||||||
| [[package]] | [[package]] | ||||||
| @ -1062,6 +1086,15 @@ version = "1.0.14" | |||||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
| checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" | checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" | ||||||
| 
 | 
 | ||||||
|  | [[package]] | ||||||
|  | name = "jobserver" | ||||||
|  | version = "0.1.32" | ||||||
|  | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
|  | checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" | ||||||
|  | dependencies = [ | ||||||
|  |  "libc", | ||||||
|  | ] | ||||||
|  | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "js-sys" | name = "js-sys" | ||||||
| version = "0.3.77" | version = "0.3.77" | ||||||
| @ -1851,9 +1884,9 @@ dependencies = [ | |||||||
| 
 | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "tar" | name = "tar" | ||||||
| version = "0.4.43" | version = "0.4.44" | ||||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
| checksum = "c65998313f8e17d0d553d28f91a0df93e4dbbbf770279c7bc21ca0f09ea1a1f6" | checksum = "1d863878d212c87a19c1a610eb53bb01fe12951c0501cf5a0d65f724914a667a" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "filetime", |  "filetime", | ||||||
|  "libc", |  "libc", | ||||||
| @ -2144,6 +2177,12 @@ version = "0.9.0" | |||||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
| checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" | checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" | ||||||
| 
 | 
 | ||||||
|  | [[package]] | ||||||
|  | name = "unty" | ||||||
|  | version = "0.0.4" | ||||||
|  | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
|  | checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" | ||||||
|  | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "url" | name = "url" | ||||||
| version = "2.5.4" | version = "2.5.4" | ||||||
| @ -2185,6 +2224,12 @@ version = "0.9.5" | |||||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
| checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" | checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" | ||||||
| 
 | 
 | ||||||
|  | [[package]] | ||||||
|  | name = "virtue" | ||||||
|  | version = "0.0.18" | ||||||
|  | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
|  | checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" | ||||||
|  | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "want" | name = "want" | ||||||
| version = "0.3.1" | version = "0.3.1" | ||||||
| @ -2518,3 +2563,31 @@ dependencies = [ | |||||||
|  "quote", |  "quote", | ||||||
|  "syn", |  "syn", | ||||||
| ] | ] | ||||||
|  | 
 | ||||||
|  | [[package]] | ||||||
|  | name = "zstd" | ||||||
|  | version = "0.13.3" | ||||||
|  | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
|  | checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" | ||||||
|  | dependencies = [ | ||||||
|  |  "zstd-safe", | ||||||
|  | ] | ||||||
|  | 
 | ||||||
|  | [[package]] | ||||||
|  | name = "zstd-safe" | ||||||
|  | version = "7.2.4" | ||||||
|  | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
|  | checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" | ||||||
|  | dependencies = [ | ||||||
|  |  "zstd-sys", | ||||||
|  | ] | ||||||
|  | 
 | ||||||
|  | [[package]] | ||||||
|  | name = "zstd-sys" | ||||||
|  | version = "2.0.15+zstd.1.5.7" | ||||||
|  | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
|  | checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237" | ||||||
|  | dependencies = [ | ||||||
|  |  "cc", | ||||||
|  |  "pkg-config", | ||||||
|  | ] | ||||||
|  | |||||||
| @ -25,7 +25,7 @@ bs58 = "0.5.1" | |||||||
| chrono = "0.4.39" | chrono = "0.4.39" | ||||||
| sha2 = "0.10.8" | sha2 = "0.10.8" | ||||||
| 
 | 
 | ||||||
| detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "main" } | detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain_app" } | ||||||
| # detee-shared = { path = "../detee-shared" } | # detee-shared = { path = "../detee-shared" } | ||||||
| 
 | 
 | ||||||
| [build-dependencies] | [build-dependencies] | ||||||
|  | |||||||
| @ -14,7 +14,7 @@ pub struct HostConfig { | |||||||
|     pub max_memory_mb_per_app: u32, |     pub max_memory_mb_per_app: u32, | ||||||
|     pub max_vcpu_reservation: u32, |     pub max_vcpu_reservation: u32, | ||||||
|     pub max_mem_reservation_mb: u32, |     pub max_mem_reservation_mb: u32, | ||||||
|     pub max_disk_reservation_mb: u32, |     pub max_disk_reservation_gb: u32, | ||||||
|     pub max_ports_per_app: u32, |     pub max_ports_per_app: u32, | ||||||
|     // price per unit per minute
 |     // price per unit per minute
 | ||||||
|     pub price: u64, |     pub price: u64, | ||||||
|  | |||||||
| @ -22,13 +22,13 @@ pub fn deploy_enclave( | |||||||
|     ); |     ); | ||||||
| 
 | 
 | ||||||
|     let memory_mb = app_resource.memory_mb; |     let memory_mb = app_resource.memory_mb; | ||||||
|     let vcpu = app_resource.vcpu; |     let vcpus = app_resource.vcpus; | ||||||
|     // TODO: docker limit disk space
 |     // TODO: docker limit disk space
 | ||||||
|     // let disk_mb = app_resource.disk_mb;
 |     // let disk_mb = app_resource.disk_mb;
 | ||||||
|     // --storage-opt size={disk_mb}m
 |     // --storage-opt size={disk_mb}m
 | ||||||
| 
 | 
 | ||||||
|     let docker_deploy_str = format!( |     let docker_deploy_str = format!( | ||||||
|         "docker run -d --restart unless-stopped --name {container_name_uuid} --memory={memory_mb}m --cpus={vcpu} \ |         "docker run -d --restart unless-stopped --name {container_name_uuid} --memory={memory_mb}m --cpus={vcpus} \ | ||||||
|         -v {enclave_path}:/enclave_package --device /dev/sgx/enclave --device /dev/sgx/provision \ |         -v {enclave_path}:/enclave_package --device /dev/sgx/enclave --device /dev/sgx/provision \ | ||||||
|         {port_maping_string} noormohammedb/occlum-enclave:v1 {hratls_pubkey}" |         {port_maping_string} noormohammedb/occlum-enclave:v1 {hratls_pubkey}" | ||||||
|     ); |     ); | ||||||
|  | |||||||
							
								
								
									
										16
									
								
								src/data.rs
									
									
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										16
									
								
								src/data.rs
									
									
									
									
									
								
							| @ -22,7 +22,7 @@ pub struct HostResources { | |||||||
|     pub existing_apps: HashSet<String>, |     pub existing_apps: HashSet<String>, | ||||||
|     pub reserved_vcpus: u32, |     pub reserved_vcpus: u32, | ||||||
|     pub reserved_memory_mb: u32, |     pub reserved_memory_mb: u32, | ||||||
|     pub reserved_disk_mb: u32, |     pub reserved_disk_gb: u32, | ||||||
|     pub reserved_host_ports: HashSet<u16>, |     pub reserved_host_ports: HashSet<u16>, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -51,8 +51,8 @@ impl HostResources { | |||||||
| 
 | 
 | ||||||
|     pub fn reserve_resources(&mut self, app: &App) -> Result<()> { |     pub fn reserve_resources(&mut self, app: &App) -> Result<()> { | ||||||
|         self.reserved_memory_mb += app.app_resource.memory_mb; |         self.reserved_memory_mb += app.app_resource.memory_mb; | ||||||
|         self.reserved_vcpus += app.app_resource.vcpu; |         self.reserved_vcpus += app.app_resource.vcpus; | ||||||
|         self.reserved_disk_mb += app.app_resource.disk_mb; |         self.reserved_disk_gb += app.app_resource.disk_size_gb; | ||||||
| 
 | 
 | ||||||
|         for (port, _) in app.mapped_ports.iter() { |         for (port, _) in app.mapped_ports.iter() { | ||||||
|             self.reserved_host_ports.insert(*port); |             self.reserved_host_ports.insert(*port); | ||||||
| @ -64,8 +64,8 @@ impl HostResources { | |||||||
| 
 | 
 | ||||||
|     pub fn un_reserve_resources(&mut self, app: &App) -> Result<()> { |     pub fn un_reserve_resources(&mut self, app: &App) -> Result<()> { | ||||||
|         self.reserved_memory_mb -= app.app_resource.memory_mb; |         self.reserved_memory_mb -= app.app_resource.memory_mb; | ||||||
|         self.reserved_vcpus -= app.app_resource.vcpu; |         self.reserved_vcpus -= app.app_resource.vcpus; | ||||||
|         self.reserved_disk_mb -= app.app_resource.disk_mb; |         self.reserved_disk_gb -= app.app_resource.disk_size_gb; | ||||||
| 
 | 
 | ||||||
|         for (port, _) in app.mapped_ports.iter() { |         for (port, _) in app.mapped_ports.iter() { | ||||||
|             self.reserved_host_ports.remove(port); |             self.reserved_host_ports.remove(port); | ||||||
| @ -141,7 +141,7 @@ impl App { | |||||||
| 
 | 
 | ||||||
|         let app_uuid = new_app_req.uuid.clone(); |         let app_uuid = new_app_req.uuid.clone(); | ||||||
| 
 | 
 | ||||||
|         if host_config.max_cores_per_app < new_app_req.resource.vcpu { |         if host_config.max_cores_per_app < new_app_req.resource.vcpus { | ||||||
|             return Err(anyhow!("too many vcpus for app")); |             return Err(anyhow!("too many vcpus for app")); | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
| @ -153,7 +153,7 @@ impl App { | |||||||
|         if host_config.max_vcpu_reservation |         if host_config.max_vcpu_reservation | ||||||
|             < host_resource |             < host_resource | ||||||
|                 .reserved_vcpus |                 .reserved_vcpus | ||||||
|                 .saturating_add(new_app_req.resource.vcpu) |                 .saturating_add(new_app_req.resource.vcpus) | ||||||
|         { |         { | ||||||
|             return Err(anyhow!("vcpus not available")); |             return Err(anyhow!("vcpus not available")); | ||||||
|         } |         } | ||||||
| @ -164,7 +164,7 @@ impl App { | |||||||
|         { |         { | ||||||
|             return Err(anyhow!("not enough memory available")); |             return Err(anyhow!("not enough memory available")); | ||||||
|         } |         } | ||||||
|         if new_app_req.resource.disk_mb < 128 { |         if new_app_req.resource.disk_size_gb < 1 { | ||||||
|             return Err(anyhow!("disk too small")); |             return Err(anyhow!("disk too small")); | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|  | |||||||
							
								
								
									
										40
									
								
								src/grpc.rs
									
									
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										40
									
								
								src/grpc.rs
									
									
									
									
									
								
							| @ -1,7 +1,7 @@ | |||||||
| use anyhow::Result; | use anyhow::Result; | ||||||
| use detee_shared::app_proto::brain_app_daemon_client::BrainAppDaemonClient; | use detee_shared::app_proto::brain_app_daemon_client::BrainAppDaemonClient; | ||||||
| use detee_shared::app_proto::{ | use detee_shared::app_proto::{ | ||||||
|     AppContract, BrainMessageApp, DaemonAuth, DaemonMessageApp, RegisterAppNodeReq, |     BrainMessageApp, DaemonAuth, DaemonMessageApp, DelAppReq, RegisterAppNodeReq, | ||||||
| }; | }; | ||||||
| use tokio::sync::mpsc::Receiver; | use tokio::sync::mpsc::Receiver; | ||||||
| use tokio::sync::mpsc::Sender; | use tokio::sync::mpsc::Sender; | ||||||
| @ -12,14 +12,14 @@ use tonic::metadata::AsciiMetadataValue; | |||||||
| use tonic::transport::{Certificate, Channel, ClientTlsConfig}; | use tonic::transport::{Certificate, Channel, ClientTlsConfig}; | ||||||
| use tonic::Request; | use tonic::Request; | ||||||
| 
 | 
 | ||||||
| use crate::global::{IP_INFO, PUBLIC_KEY, BRAIN_STAGING, BRAIN_TESTING, DETEE_ROOT_CA}; | use crate::global::{BRAIN_STAGING, BRAIN_TESTING, DETEE_ROOT_CA, IP_INFO, PUBLIC_KEY}; | ||||||
| 
 | 
 | ||||||
| pub struct ConnectionData { | pub struct ConnectionData { | ||||||
|     pub network: String, |     pub network: String, | ||||||
|     pub brain_msg_tx: Sender<BrainMessageApp>, |     pub brain_msg_tx: Sender<BrainMessageApp>, | ||||||
|     pub daemon_msg_rx: Receiver<DaemonMessageApp>, |     pub daemon_msg_rx: Receiver<DaemonMessageApp>, | ||||||
|     pub daemon_msg_tx: Sender<DaemonMessageApp>, |     pub daemon_msg_tx: Sender<DaemonMessageApp>, | ||||||
|     pub app_contracts_uuid: Vec<String>, |     pub del_apps_uuid: Vec<String>, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| async fn client(network: &str) -> Result<BrainAppDaemonClient<Channel>> { | async fn client(network: &str) -> Result<BrainAppDaemonClient<Channel>> { | ||||||
| @ -32,17 +32,23 @@ async fn client(network: &str) -> Result<BrainAppDaemonClient<Channel>> { | |||||||
|             )) |             )) | ||||||
|         } |         } | ||||||
|     }; |     }; | ||||||
|  |     log::info!("brain_url: {brain_url}, brain_san: {brain_san}"); | ||||||
|     let pem = std::fs::read_to_string(DETEE_ROOT_CA)?; |     let pem = std::fs::read_to_string(DETEE_ROOT_CA)?; | ||||||
|     let ca = Certificate::from_pem(pem); |     let ca = Certificate::from_pem(pem); | ||||||
| 
 | 
 | ||||||
|     let tls = ClientTlsConfig::new().ca_certificate(ca).domain_name(brain_san); |     let tls = ClientTlsConfig::new() | ||||||
|  |         .ca_certificate(ca) | ||||||
|  |         .domain_name(brain_san); | ||||||
| 
 | 
 | ||||||
|     let channel = Channel::from_shared(brain_url.to_string())?.tls_config(tls)?.connect().await?; |     let channel = Channel::from_shared(brain_url.to_string())? | ||||||
|  |         .tls_config(tls)? | ||||||
|  |         .connect() | ||||||
|  |         .await?; | ||||||
| 
 | 
 | ||||||
|     Ok(BrainAppDaemonClient::new(channel)) |     Ok(BrainAppDaemonClient::new(channel)) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub async fn register_node(config: &crate::HostConfig) -> Result<Vec<AppContract>> { | pub async fn register_node(config: &crate::HostConfig) -> Result<Vec<DelAppReq>> { | ||||||
|     let mut client = client(&config.network).await?; |     let mut client = client(&config.network).await?; | ||||||
| 
 | 
 | ||||||
|     log::debug!("registering node with brain"); |     log::debug!("registering node with brain"); | ||||||
| @ -68,13 +74,13 @@ pub async fn register_node(config: &crate::HostConfig) -> Result<Vec<AppContract | |||||||
|     req.metadata_mut().insert("pubkey", pubkey); |     req.metadata_mut().insert("pubkey", pubkey); | ||||||
|     req.metadata_mut().insert("request-signature", signature); |     req.metadata_mut().insert("request-signature", signature); | ||||||
| 
 | 
 | ||||||
|     let mut container_contracts = vec![]; |     let mut del_app_reqs = vec![]; | ||||||
| 
 | 
 | ||||||
|     let mut grpc_stream = client.register_app_node(req).await?.into_inner(); |     let mut grpc_stream = client.register_app_node(req).await?.into_inner(); | ||||||
|     while let Some(stream_update) = grpc_stream.next().await { |     while let Some(stream_update) = grpc_stream.next().await { | ||||||
|         match stream_update { |         match stream_update { | ||||||
|             Ok(contract) => { |             Ok(del_app_req) => { | ||||||
|                 container_contracts.push(contract); |                 del_app_reqs.push(del_app_req); | ||||||
|             } |             } | ||||||
|             Err(e) => { |             Err(e) => { | ||||||
|                 println!("Brain disconnected from register_node: {e}"); |                 println!("Brain disconnected from register_node: {e}"); | ||||||
| @ -83,10 +89,10 @@ pub async fn register_node(config: &crate::HostConfig) -> Result<Vec<AppContract | |||||||
|     } |     } | ||||||
|     log::info!( |     log::info!( | ||||||
|         "Brain registration succcessful, with contract count: {}", |         "Brain registration succcessful, with contract count: {}", | ||||||
|         container_contracts.len() |         del_app_reqs.len() | ||||||
|     ); |     ); | ||||||
| 
 | 
 | ||||||
|     Ok(container_contracts) |     Ok(del_app_reqs) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub async fn connect_and_run(conn_data: ConnectionData) -> Result<()> { | pub async fn connect_and_run(conn_data: ConnectionData) -> Result<()> { | ||||||
| @ -96,12 +102,12 @@ pub async fn connect_and_run(conn_data: ConnectionData) -> Result<()> { | |||||||
| 
 | 
 | ||||||
|     streaming_tasks.spawn(receive_messages( |     streaming_tasks.spawn(receive_messages( | ||||||
|         client.clone(), |         client.clone(), | ||||||
|         conn_data.app_contracts_uuid.clone(), |         conn_data.del_apps_uuid.clone(), | ||||||
|         conn_data.brain_msg_tx, |         conn_data.brain_msg_tx, | ||||||
|     )); |     )); | ||||||
|     streaming_tasks.spawn(send_messages( |     streaming_tasks.spawn(send_messages( | ||||||
|         client.clone(), |         client.clone(), | ||||||
|         conn_data.app_contracts_uuid.clone(), |         conn_data.del_apps_uuid.clone(), | ||||||
|         conn_data.daemon_msg_rx, |         conn_data.daemon_msg_rx, | ||||||
|         conn_data.daemon_msg_tx, |         conn_data.daemon_msg_tx, | ||||||
|     )); |     )); | ||||||
| @ -126,12 +132,12 @@ fn sign_stream_auth(contracts: Vec<String>) -> Result<DaemonAuth> { | |||||||
| 
 | 
 | ||||||
| pub async fn receive_messages( | pub async fn receive_messages( | ||||||
|     mut client: BrainAppDaemonClient<Channel>, |     mut client: BrainAppDaemonClient<Channel>, | ||||||
|     contracts: Vec<String>, |     del_apps_uuid: Vec<String>, | ||||||
|     tx: Sender<BrainMessageApp>, |     tx: Sender<BrainMessageApp>, | ||||||
| ) -> Result<()> { | ) -> Result<()> { | ||||||
|     log::debug!("starting to listen for messages from brain"); |     log::debug!("starting to listen for messages from brain"); | ||||||
|     let mut grpc_stream = client |     let mut grpc_stream = client | ||||||
|         .brain_messages(sign_stream_auth(contracts)?) |         .brain_messages(sign_stream_auth(del_apps_uuid)?) | ||||||
|         .await? |         .await? | ||||||
|         .into_inner(); |         .into_inner(); | ||||||
| 
 | 
 | ||||||
| @ -152,14 +158,14 @@ pub async fn receive_messages( | |||||||
| 
 | 
 | ||||||
| pub async fn send_messages( | pub async fn send_messages( | ||||||
|     mut client: BrainAppDaemonClient<Channel>, |     mut client: BrainAppDaemonClient<Channel>, | ||||||
|     contracts: Vec<String>, |     del_apps_uuid: Vec<String>, | ||||||
|     rx: Receiver<DaemonMessageApp>, |     rx: Receiver<DaemonMessageApp>, | ||||||
|     tx: Sender<DaemonMessageApp>, |     tx: Sender<DaemonMessageApp>, | ||||||
| ) -> Result<()> { | ) -> Result<()> { | ||||||
|     let rx_stream = ReceiverStream::new(rx); |     let rx_stream = ReceiverStream::new(rx); | ||||||
|     tx.send(DaemonMessageApp { |     tx.send(DaemonMessageApp { | ||||||
|         msg: Some(detee_shared::app_proto::daemon_message_app::Msg::Auth( |         msg: Some(detee_shared::app_proto::daemon_message_app::Msg::Auth( | ||||||
|             sign_stream_auth(contracts)?, |             sign_stream_auth(del_apps_uuid)?, | ||||||
|         )), |         )), | ||||||
|     }) |     }) | ||||||
|     .await?; |     .await?; | ||||||
|  | |||||||
							
								
								
									
										37
									
								
								src/main.rs
									
									
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										37
									
								
								src/main.rs
									
									
									
									
									
								
							| @ -9,14 +9,13 @@ use anyhow::anyhow; | |||||||
| use anyhow::Result; | use anyhow::Result; | ||||||
| use data::App; | use data::App; | ||||||
| use detee_shared::app_proto::{ | use detee_shared::app_proto::{ | ||||||
|     brain_message_app, AppContract, AppNodeResources, BrainMessageApp, DaemonMessageApp, |     brain_message_app, AppNodeResources, BrainMessageApp, DaemonMessageApp, NewAppRes, | ||||||
|     MappedPort, NewAppRes, |  | ||||||
| }; | }; | ||||||
|  | use detee_shared::common_proto::MappedPort; | ||||||
| use detee_shared::sgx::types::brain::AppDeployConfig; | use detee_shared::sgx::types::brain::AppDeployConfig; | ||||||
| use global::PUBLIC_KEY; | use global::PUBLIC_KEY; | ||||||
| use log::info; | use log::info; | ||||||
| use log::warn; | use log::warn; | ||||||
| use std::collections::HashSet; |  | ||||||
| use std::fs::File; | use std::fs::File; | ||||||
| use std::path::Path; | use std::path::Path; | ||||||
| use std::time::Duration; | use std::time::Duration; | ||||||
| @ -66,22 +65,16 @@ impl AppHandler { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     async fn handle_contracts(&mut self, contracts: Vec<AppContract>) { |     async fn handle_contracts(&mut self, del_apps_uuid: Vec<String>) { | ||||||
|         let apps_in_host = self.host_resource.existing_apps.clone(); |         let apps_in_host = self.host_resource.existing_apps.clone(); | ||||||
|         let apps_in_brain: HashSet<String> = contracts |         for del_app_uuid in del_apps_uuid { | ||||||
|             .iter() |             if apps_in_host.contains(&del_app_uuid) { | ||||||
|             .map(|contact| contact.uuid.clone()) |                 if let Err(e) = self.handle_del_app_req(del_app_uuid.clone()).await { | ||||||
|             .collect(); |  | ||||||
| 
 |  | ||||||
|         let deleted_apps: HashSet<String> = |  | ||||||
|             apps_in_host.difference(&apps_in_brain).cloned().collect(); |  | ||||||
| 
 |  | ||||||
|         for uuid in deleted_apps { |  | ||||||
|             if let Err(e) = self.handle_del_app_req(uuid.clone()).await { |  | ||||||
|                     log::error!("Failed to delete app: {e}"); |                     log::error!("Failed to delete app: {e}"); | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|  |     } | ||||||
| 
 | 
 | ||||||
|     async fn run(mut self) { |     async fn run(mut self) { | ||||||
|         sleep(Duration::from_millis(500)).await; |         sleep(Duration::from_millis(500)).await; | ||||||
| @ -117,7 +110,6 @@ impl AppHandler { | |||||||
|                 info!("Succesfully started App {uuid}"); |                 info!("Succesfully started App {uuid}"); | ||||||
|                 let res = NewAppRes { |                 let res = NewAppRes { | ||||||
|                     uuid, |                     uuid, | ||||||
|                     status: "success".to_string(), |  | ||||||
|                     error: "".to_string(), |                     error: "".to_string(), | ||||||
|                     mapped_ports: app.mapped_ports.into_iter().map(MappedPort::from).collect(), |                     mapped_ports: app.mapped_ports.into_iter().map(MappedPort::from).collect(), | ||||||
|                     ip_address: self.host_config.host_ip_address.clone(), |                     ip_address: self.host_config.host_ip_address.clone(), | ||||||
| @ -127,7 +119,6 @@ impl AppHandler { | |||||||
|             Err(e) => { |             Err(e) => { | ||||||
|                 let res = NewAppRes { |                 let res = NewAppRes { | ||||||
|                     uuid, |                     uuid, | ||||||
|                     status: "failed".to_string(), |  | ||||||
|                     error: e.to_string(), |                     error: e.to_string(), | ||||||
|                     ..Default::default() |                     ..Default::default() | ||||||
|                 }; |                 }; | ||||||
| @ -163,7 +154,7 @@ impl AppHandler { | |||||||
| 
 | 
 | ||||||
|         let avail_vcpus = host_config.max_vcpu_reservation - host_resource.reserved_vcpus; |         let avail_vcpus = host_config.max_vcpu_reservation - host_resource.reserved_vcpus; | ||||||
|         let avail_memory_mb = host_config.max_mem_reservation_mb - host_resource.reserved_memory_mb; |         let avail_memory_mb = host_config.max_mem_reservation_mb - host_resource.reserved_memory_mb; | ||||||
|         let avail_storage_mb = host_config.max_disk_reservation_mb - host_resource.reserved_disk_mb; |         let avail_storage_gb = host_config.max_disk_reservation_gb - host_resource.reserved_disk_gb; | ||||||
|         let max_ports_per_app = host_config.max_ports_per_app; |         let max_ports_per_app = host_config.max_ports_per_app; | ||||||
| 
 | 
 | ||||||
|         let resource_update = AppNodeResources { |         let resource_update = AppNodeResources { | ||||||
| @ -171,7 +162,7 @@ impl AppHandler { | |||||||
|             avail_no_of_port, |             avail_no_of_port, | ||||||
|             avail_vcpus, |             avail_vcpus, | ||||||
|             avail_memory_mb, |             avail_memory_mb, | ||||||
|             avail_storage_mb, |             avail_storage_gb, | ||||||
|             max_ports_per_app, |             max_ports_per_app, | ||||||
|         }; |         }; | ||||||
| 
 | 
 | ||||||
| @ -199,11 +190,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { | |||||||
|             } |             } | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         let mut contracts = vec![]; |         let mut del_apps_uuid = vec![]; | ||||||
|         match grpc::register_node(&app_handler.host_config).await { |         match grpc::register_node(&app_handler.host_config).await { | ||||||
|             Ok(app_contracts) => { |             Ok(del_app_reqs) => { | ||||||
|                 contracts.append(&mut app_contracts.iter().map(|c| c.uuid.clone()).collect()); |                 del_apps_uuid.append(&mut del_app_reqs.iter().map(|c| c.uuid.clone()).collect()); | ||||||
|                 app_handler.handle_contracts(app_contracts).await |                 app_handler.handle_contracts(del_apps_uuid.clone()).await | ||||||
|             } |             } | ||||||
| 
 | 
 | ||||||
|             Err(e) => log::error!("Failed to connect to brain: {e}"), |             Err(e) => log::error!("Failed to connect to brain: {e}"), | ||||||
| @ -219,7 +210,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { | |||||||
|             brain_msg_tx, |             brain_msg_tx, | ||||||
|             daemon_msg_rx, |             daemon_msg_rx, | ||||||
|             daemon_msg_tx, |             daemon_msg_tx, | ||||||
|             app_contracts_uuid: contracts, |             del_apps_uuid, | ||||||
|         }) |         }) | ||||||
|         .await |         .await | ||||||
|         { |         { | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user