From c6c44da0e82dd3c65994af7e1548e4565b3ebeb8 Mon Sep 17 00:00:00 2001 From: Noor Date: Fri, 31 Jan 2025 12:59:20 +0000 Subject: [PATCH] manageed container contract and node registraion --- Cargo.lock | 2 +- src/data.rs | 117 ++++++++++++++++++++++++++++++++++++++++++++++------ src/grpc.rs | 32 ++++++++++---- 3 files changed, 129 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 41262fc..5db03a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -329,7 +329,7 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#a2899ba5a25794aec60a093695675ff24f967484" +source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#6e1b1853838905c44d535d984d1221dd5d0dc2bc" dependencies = [ "base64", "prost", diff --git a/src/data.rs b/src/data.rs index bdff0b9..c3e3a7f 100644 --- a/src/data.rs +++ b/src/data.rs @@ -10,7 +10,6 @@ use tokio::sync::oneshot::Sender as OneshotSender; use detee_shared::pb::daemon::{BrainMessage as BrainMessageSgx, NewContainerRes}; use detee_shared::pb::shared as sharedPb; -use detee_shared::pb::shared::Container; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -138,6 +137,60 @@ impl Into for Contract { } } +#[derive(Clone, Debug, Default)] +pub struct AppContract { + pub uuid: String, + pub package_url: String, + pub admin_pubkey: String, + pub node_pubkey: String, + pub mapped_ports: Vec<(u16, u16)>, + pub host_ipv4: String, + pub disk_size_gb: u32, + pub vcpus: u32, + pub memory_mb: u32, + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, + // price per unit per minute + // recommended value is 20000 + pub price_per_unit: u64, + pub locked_nano: u64, + pub collected_at: chrono::DateTime, +} + +impl From for sharedPb::ContainerContracts { + fn from(value: AppContract) -> Self { + Self { + uuid: value.uuid, + admin_pubkey: value.admin_pubkey, + node_pubkey: value.node_pubkey, + package_url: value.package_url, + exposed_ports: value + .mapped_ports + .into_iter() + .map(sharedPb::MappedPort::from) + .collect(), + ..Default::default() + } + } +} + +#[derive(Eq, Hash, PartialEq, Clone, Debug, Default)] +pub struct AppNode { + pub public_key: String, + pub owner_key: String, + pub country: String, + pub region: String, + pub city: String, + pub ip: String, + pub avail_mem_mb: u32, + pub avail_vcpus: u32, + pub avail_storage_gbs: u32, + pub avail_ports: u32, + pub max_ports_per_app: u32, + // nanotokens per unit per minute + pub price: u64, +} + #[derive(Default)] pub struct BrainData { // amount of nanotokens in each account @@ -148,8 +201,10 @@ pub struct BrainData { tmp_updatevm_reqs: DashMap)>, daemon_tx: DashMap>, - sgx_daemon_tx: DashMap>, + app_nodes: RwLock>, + app_daemon_tx: DashMap>, tmp_new_container_reqs: DashMap)>, + app_contracts: RwLock>, } #[derive(Debug)] @@ -169,8 +224,10 @@ impl BrainData { tmp_updatevm_reqs: DashMap::new(), daemon_tx: DashMap::new(), - sgx_daemon_tx: DashMap::new(), + app_daemon_tx: DashMap::new(), tmp_new_container_reqs: DashMap::new(), + app_contracts: RwLock::new(Vec::new()), + app_nodes: RwLock::new(Vec::new()), } } @@ -681,17 +738,40 @@ impl BrainData { } impl BrainData { - pub fn add_sgx_daemon_tx(&self, node_pubkey: &str, tx: Sender) { - self.sgx_daemon_tx.insert(node_pubkey.to_string(), tx); + pub fn add_app_daemon_tx(&self, node_pubkey: &str, tx: Sender) { + self.app_daemon_tx.insert(node_pubkey.to_string(), tx); } - pub fn del_sgx_daemon_tx(&self, node_pubkey: &str) { - self.sgx_daemon_tx.remove(node_pubkey); + pub fn del_app_daemon_tx(&self, node_pubkey: &str) { + self.app_daemon_tx.remove(node_pubkey); + } + + pub fn insert_app_node(&self, node: AppNode) { + info!("Registering app node {node:?}"); + let mut nodes = self.app_nodes.write().unwrap(); + for n in nodes.iter_mut() { + if n.public_key == node.public_key { + // TODO: figure what to do in this case. + warn!("Node {} already exists. Updating data.", n.public_key); + *n = node; + return; + } + } + nodes.push(node); + } + + pub fn find_app_contracts_by_node_pubkey(&self, node_pubkey: &str) -> Vec { + let app_contracts = self.app_contracts.read().unwrap(); + app_contracts + .iter() + .filter(|c| c.node_pubkey == node_pubkey) + .cloned() + .collect() } pub async fn send_new_container_req( &self, - mut req: Container, + mut req: sharedPb::Container, tx: OneshotSender, ) { req.uuid = uuid::Uuid::new_v4().to_string(); @@ -700,7 +780,7 @@ impl BrainData { self.tmp_new_container_reqs .insert(req.uuid.clone(), (req.clone(), tx)); - if let Some(sgx_daemon_tx) = self.sgx_daemon_tx.get(&req.node_pubkey) { + if let Some(app_daemon_tx) = self.app_daemon_tx.get(&req.node_pubkey) { debug!( "Found daemon TX for {}. Sending newVMReq {}", req.node_pubkey, req.uuid @@ -710,13 +790,13 @@ impl BrainData { detee_shared::pb::daemon::brain_message::Msg::NewContainerReq(req.clone()), ), }; - if let Err(e) = sgx_daemon_tx.send(msg).await { + if let Err(e) = app_daemon_tx.send(msg).await { warn!( "Failed to send new container request to {} due to error: {e:?}", req.node_pubkey ); info!("Deleting daemon TX for {}", req.node_pubkey); - self.del_sgx_daemon_tx(&req.node_pubkey); + self.del_app_daemon_tx(&req.node_pubkey); self.send_new_container_resp(NewContainerRes { uuid: req.uuid, status: "Failed".to_string(), @@ -741,12 +821,23 @@ impl BrainData { return; } }; - if let Err(_) = new_container_req.1.send(new_container_resp.clone()) { + if let Err(err) = new_container_req.1.send(new_container_resp.clone()) { log::error!( - "CLI RX for {} dropped before receiving confirmation {:?}.", + "CLI RX for {} dropped before receiving confirmation {:?}.\n{:?}", &new_container_req.0.admin_pubkey, new_container_resp, + err ); } + + let app_contracts = AppContract { + uuid: new_container_req.0.uuid, + node_pubkey: new_container_req.0.node_pubkey.clone(), + package_url: new_container_req.0.package_url, + admin_pubkey: new_container_req.0.admin_pubkey, + ..Default::default() + }; + log::info!("Created new app contract: {app_contracts:?}"); + self.app_contracts.write().unwrap().push(app_contracts); } } diff --git a/src/grpc.rs b/src/grpc.rs index dd98efe..5353f35 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -342,15 +342,31 @@ impl BrainSgxDaemon for BrainSgxDaemonMock { &self, req: tonic::Request, ) -> Result, Status> { - dbg!(req); - let (tx, rx) = mpsc::channel(6); + log::info!("registering app node : {:?}", &req); + let req_data = req.into_inner(); + let app_node = crate::data::AppNode { + public_key: req_data.node_pubkey.clone(), + owner_key: req_data.owner_pubkey, + ip: req_data.main_ip, + city: req_data.city, + region: req_data.region, + country: req_data.country, + ..Default::default() + }; + + self.data.insert_app_node(app_node); + log::info!("Sending existing contracts to {}", &req_data.node_pubkey); + + let app_contracts = self + .data + .find_app_contracts_by_node_pubkey(&req_data.node_pubkey); + + let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { - let _ = tx - .send(detee_shared::pb::shared::ContainerContracts { - ..Default::default() - }) - .await; + for contract in app_contracts { + let _ = tx.send(contract.into()).await; + } }); let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg)); Ok(Response::new(Box::pin(output_stream))) @@ -363,7 +379,7 @@ impl BrainSgxDaemon for BrainSgxDaemonMock { let req = req.into_inner(); info!("Daemon {} connected to receive brain messages", req.pubkey); let (tx, rx) = mpsc::channel(6); - self.data.add_sgx_daemon_tx(&req.pubkey, tx); + self.data.add_app_daemon_tx(&req.pubkey, tx); let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg)); Ok(Response::new( Box::pin(output_stream) as Self::BrainMessagesStream