diff --git a/brain.proto b/brain.proto index 05d8e4e..62224e3 100644 --- a/brain.proto +++ b/brain.proto @@ -40,6 +40,23 @@ message NewVMRequest { string dtrfs_sha = 14; } +message UpdateVMRequest { + string uuid = 1; + uint32 disk_size_gb = 2; + uint32 vcpus = 3; + uint32 memory_mb = 4; + string kernel_url = 5; + string kernel_sha = 6; + string dtrfs_url = 7; + string dtrfs_sha = 8; +} + +message UpdateVMResp { + string uuid = 1; + string timestamp = 2; + string error = 3; +} + message VMContract { string uuid = 1; string hostname = 2; @@ -79,6 +96,7 @@ service BrainDaemonService { rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); + rpc UpdateVMs (NodePubkey) returns (stream UpdateVMRequest); } message NodeFilters { @@ -105,5 +123,5 @@ service BrainCliService { rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListNodes (NodeFilters) returns (stream NodeListResp); rpc DeleteVM (DeletedVMUpdate) returns (Empty); -} - + rpc UpdateVM (UpdateVMRequest) returns (UpdateVMResp); +} \ No newline at end of file diff --git a/src/data.rs b/src/data.rs index 639fdd1..624e707 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,10 +1,10 @@ #![allow(dead_code)] -use crate::grpc::brain as grpc; +use crate::grpc::brain::{self as grpc, UpdateVmRequest, UpdateVmResp}; use dashmap::DashMap; use log::{debug, info, warn}; use std::sync::RwLock; use tokio::sync::mpsc::Sender; -use tokio::sync::oneshot::Sender as OneshotSender; +use tokio::sync::oneshot::{self, Sender as OneshotSender}; #[derive(Eq, Hash, PartialEq, Clone)] pub struct Node { @@ -98,6 +98,7 @@ pub struct BrainData { tmp_vmrequests: DashMap)>, daemon_deletevm_tx: DashMap>, daemon_newvm_tx: DashMap>, + daemon_updatevm_tx: DashMap>, } #[derive(Debug)] @@ -115,6 +116,7 @@ impl BrainData { tmp_vmrequests: DashMap::new(), daemon_deletevm_tx: DashMap::new(), daemon_newvm_tx: DashMap::new(), + daemon_updatevm_tx: DashMap::new(), } } diff --git a/src/grpc.rs b/src/grpc.rs index 72d2f69..fe51c9e 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -159,6 +159,29 @@ impl BrainDaemonService for BrainDaemonMock { Box::pin(output_stream) as Self::ListVMContractsStream )) } + + type UpdateVMsStream = Pin> + Send>>; + + async fn update_v_ms( + &self, + req: Request, + ) -> Result, Status> { + let node_pubkey = req.into_inner().node_pubkey; + let (grpc_tx, grpc_rx) = mpsc::channel(6); + let (_, mut data_rx) = mpsc::channel(6); + + tokio::spawn(async move { + while let Some(update_req) = data_rx.recv().await { + if let Err(e) = grpc_tx.send(Ok(update_req)).await { + warn!("Could not send UpdateVmRequest to {node_pubkey}: {e:?}"); + break; + } + } + }); + + let output_stream = ReceiverStream::new(grpc_rx); + Ok(Response::new(Box::pin(output_stream) as Self::UpdateVMsStream)) + } } #[tonic::async_trait] @@ -232,4 +255,24 @@ impl BrainCliService for BrainCliMock { self.data.delete_vm(req).await; Ok(Response::new(Empty {})) } + + async fn update_vm( + &self, + req: Request, + ) -> Result, Status> { + let req = req.into_inner(); + info!("CLI requested UpdateVM: {req:?}"); + let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); + self.data.submit_updatevmrequest(req, oneshot_tx).await; + match oneshot_rx.await { + Ok(resp) => { + info!("Returning UpdateVmResp: {resp:?}"); + Ok(Response::new(resp)) + }, + Err(e) => { + log::error!("Error waiting for UpdateVmResp: {e:?}"); + Err(Status::unknown("Daemon not responding.")) + } + } + } }