diff --git a/brain.proto b/brain.proto index 62224e3..4d93e73 100644 --- a/brain.proto +++ b/brain.proto @@ -42,13 +42,14 @@ message NewVMRequest { message UpdateVMRequest { string uuid = 1; - uint32 disk_size_gb = 2; - uint32 vcpus = 3; - uint32 memory_mb = 4; - string kernel_url = 5; - string kernel_sha = 6; - string dtrfs_url = 7; - string dtrfs_sha = 8; + string node_pubkey = 2; + uint32 disk_size_gb = 3; + uint32 vcpus = 4; + uint32 memory_mb = 5; + string kernel_url = 6; + string kernel_sha = 7; + string dtrfs_url = 8; + string dtrfs_sha = 9; } message UpdateVMResp { diff --git a/src/data.rs b/src/data.rs index 624e707..23b59ec 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,10 +1,10 @@ #![allow(dead_code)] -use crate::grpc::brain::{self as grpc, UpdateVmRequest, UpdateVmResp}; +use crate::grpc::brain as grpc; use dashmap::DashMap; use log::{debug, info, warn}; use std::sync::RwLock; use tokio::sync::mpsc::Sender; -use tokio::sync::oneshot::{self, Sender as OneshotSender}; +use tokio::sync::oneshot::Sender as OneshotSender; #[derive(Eq, Hash, PartialEq, Clone)] pub struct Node { @@ -96,6 +96,7 @@ pub struct BrainData { nodes: RwLock>, contracts: RwLock>, tmp_vmrequests: DashMap)>, + tmp_updatevmrequests: DashMap)>, daemon_deletevm_tx: DashMap>, daemon_newvm_tx: DashMap>, daemon_updatevm_tx: DashMap>, @@ -114,6 +115,7 @@ impl BrainData { nodes: RwLock::new(Vec::new()), contracts: RwLock::new(Vec::new()), tmp_vmrequests: DashMap::new(), + tmp_updatevmrequests: DashMap::new(), daemon_deletevm_tx: DashMap::new(), daemon_newvm_tx: DashMap::new(), daemon_updatevm_tx: DashMap::new(), @@ -239,6 +241,38 @@ impl BrainData { } } + pub async fn submit_updatevmrequest( + &self, + req: grpc::UpdateVmRequest, + tx: OneshotSender, + ) { + let uuid = req.uuid.clone(); + info!("Inserting update vm request in memory: {:?}", req); + self.tmp_updatevmrequests.insert(uuid.clone(), (req.clone(), tx)); + + if let Some(server_tx) = self.daemon_updatevm_tx.get(&req.node_pubkey) { + if server_tx.send(req.clone()).await.is_err() { + warn!("Daemon {} RX dropped before sending update VM. Cleaning memory...", &req.node_pubkey); + if let Some((_, oneshot_tx)) = self.tmp_updatevmrequests.remove(&uuid) { + let _ = oneshot_tx.1.send(grpc::UpdateVmResp { + uuid, + timestamp: "".to_string(), + error: "Daemon is offline.".to_string(), + }); + } + } + } else { + warn!("No daemon TX found for {}", req.node_pubkey); + if let Some((_, oneshot_tx)) = self.tmp_updatevmrequests.remove(&uuid) { + let _ = oneshot_tx.1.send(grpc::UpdateVmResp { + uuid, + timestamp: "".to_string(), + error: "Daemon is offline.".to_string(), + }); + } + } + } + pub fn insert_contract(&self, contract: Contract) { let mut contracts = self.contracts.write().unwrap(); contracts.push(contract);