updater branch merge #1

Merged
ghe0 merged 10 commits from updater into main 2024-12-25 22:59:18 +00:00
5 changed files with 402 additions and 45 deletions

162
Cargo.lock generated

@ -26,6 +26,21 @@ dependencies = [
"memchr",
]
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anstream"
version = "0.6.18"
@ -204,6 +219,7 @@ checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de"
name = "brain-mock"
version = "0.1.0"
dependencies = [
"chrono",
"dashmap",
"env_logger",
"log",
@ -216,6 +232,12 @@ dependencies = [
"uuid",
]
[[package]]
name = "bumpalo"
version = "3.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
[[package]]
name = "byteorder"
version = "1.5.0"
@ -228,18 +250,47 @@ version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b"
[[package]]
name = "cc"
version = "1.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c31a0499c1dc64f458ad13872de75c0eb7e3fdb0e67964610c914b034fc5956e"
dependencies = [
"shlex",
]
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-targets",
]
[[package]]
name = "colorchoice"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
[[package]]
name = "core-foundation-sys"
version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
@ -527,6 +578,29 @@ dependencies = [
"tracing",
]
[[package]]
name = "iana-time-zone"
version = "0.1.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "indexmap"
version = "1.9.3"
@ -568,6 +642,16 @@ version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674"
[[package]]
name = "js-sys"
version = "0.3.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6717b6b5b077764fb5966237269cb3c64edddde4b14ce42647430a78ced9e7b7"
dependencies = [
"once_cell",
"wasm-bindgen",
]
[[package]]
name = "libc"
version = "0.2.168"
@ -640,6 +724,15 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
[[package]]
name = "num-traits"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
]
[[package]]
name = "object"
version = "0.36.5"
@ -924,6 +1017,12 @@ dependencies = [
"syn",
]
[[package]]
name = "shlex"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "slab"
version = "0.4.9"
@ -1193,6 +1292,69 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a474f6281d1d70c17ae7aa6a613c87fce69a127e2624002df63dcb39d6cf6396"
dependencies = [
"cfg-if",
"once_cell",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f89bb38646b4f81674e8f5c3fb81b562be1fd936d84320f3264486418519c79"
dependencies = [
"bumpalo",
"log",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cc6181fd9a7492eef6fef1f33961e3695e4579b9872a6f7c83aee556666d4fe"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6"
[[package]]
name = "windows-core"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-sys"
version = "0.52.0"

@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
chrono = "0.4.39"
dashmap = "6.1.0"
env_logger = "0.11.6"
log = "0.4.22"

@ -8,7 +8,7 @@ message NodePubkey {
string node_pubkey = 1;
}
message RegisterNodeRequest {
message RegisterNodeReq {
string node_pubkey = 1;
string owner_pubkey = 2;
string ip = 3;
@ -23,7 +23,7 @@ message RegisterNodeRequest {
uint32 max_ports_per_vm = 12;
}
message NewVMRequest {
message NewVMReq {
string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID
string hostname = 2;
string admin_pubkey = 3;
@ -40,6 +40,22 @@ message NewVMRequest {
string dtrfs_sha = 14;
}
message UpdateVMReq {
string uuid = 1;
uint32 disk_size_gb = 3;
uint32 vcpus = 4;
uint32 memory_mb = 5;
string kernel_url = 6;
string kernel_sha = 7;
string dtrfs_url = 8;
string dtrfs_sha = 9;
}
message UpdateVMResp {
string uuid = 1;
string error = 3;
}
ghe0 marked this conversation as resolved
Review

I believe timestamp is not needed in the response here. It's enough if the timestamp gets recorded in the Contract.
This also means that the contract will have two fields: created_at and updated_at.

I believe timestamp is not needed in the response here. It's enough if the timestamp gets recorded in the Contract. This also means that the contract will have two fields: `created_at` and `updated_at`.
message VMContract {
string uuid = 1;
string hostname = 2;
@ -54,6 +70,7 @@ message VMContract {
string kernel_sha = 11;
string dtrfs_sha = 12;
string created_at = 13;
string updated_at = 14;
}
message ListVMContractsReq {
@ -61,7 +78,7 @@ message ListVMContractsReq {
string node_pubkey = 2;
}
message NewVMConfirmation {
message NewVMResp {
string uuid = 1;
repeated uint32 exposed_ports = 2;
string public_ipv4 = 3;
@ -69,18 +86,21 @@ message NewVMConfirmation {
string error = 5;
}
message DeletedVMUpdate {
message DeleteVMReq {
string uuid = 1;
}
service BrainDaemonService {
rpc RegisterNode (RegisterNodeRequest) returns (Empty);
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest);
rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty);
rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate);
rpc RegisterNode (RegisterNodeReq) returns (Empty);
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
rpc SendNewVMResp (stream NewVMResp) returns (Empty);
rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq);
rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
ghe0 marked this conversation as resolved Outdated
Outdated
Review

These streams are becoming confusing and hard to read, both in the proto and also in the code. We should improve readability a bit by renaming some of the RPCs and some of the messages.

Let's do this:

service BrainDaemonService {
  rpc RegisterNode (RegisterNodeReq) returns (Empty);
  rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
  rpc SendNewVMResp (stream NewVMResp) returns (Empty);
  rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
  rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
  rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq);
  rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
}
These streams are becoming confusing and hard to read, both in the proto and also in the code. We should improve readability a bit by renaming some of the RPCs and some of the messages. Let's do this: ``` service BrainDaemonService { rpc RegisterNode (RegisterNodeReq) returns (Empty); rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq); rpc SendNewVMResp (stream NewVMResp) returns (Empty); rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); } ```
}
message NodeFilters {
uint32 free_ports = 1;
bool offers_ipv4 = 2;
@ -101,9 +121,9 @@ message NodeListResp {
}
service BrainCliService {
rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation);
rpc CreateVMContract (NewVMReq) returns (NewVMResp);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc ListNodes (NodeFilters) returns (stream NodeListResp);
rpc DeleteVM (DeletedVMUpdate) returns (Empty);
rpc DeleteVM (DeleteVMReq) returns (Empty);
rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp);
}

@ -1,5 +1,6 @@
#![allow(dead_code)]
use crate::grpc::brain as grpc;
use chrono::Utc;
use dashmap::DashMap;
use log::{debug, info, warn};
use std::sync::RwLock;
@ -22,8 +23,8 @@ pub struct Node {
pub max_ports_per_vm: u32,
}
impl From<grpc::RegisterNodeRequest> for Node {
fn from(node: grpc::RegisterNodeRequest) -> Self {
impl From<grpc::RegisterNodeReq> for Node {
fn from(node: grpc::RegisterNodeReq) -> Self {
Node {
public_key: node.node_pubkey,
owner_key: node.owner_pubkey,
@ -69,6 +70,7 @@ pub struct Contract {
pub kernel_sha: String,
pub dtrfs_sha: String,
pub created_at: String,
pub updated_at: String,
}
impl Into<grpc::VmContract> for Contract {
@ -87,6 +89,7 @@ impl Into<grpc::VmContract> for Contract {
kernel_sha: self.kernel_sha,
dtrfs_sha: self.dtrfs_sha,
created_at: self.created_at,
updated_at: self.updated_at,
}
}
}
@ -95,9 +98,11 @@ impl Into<grpc::VmContract> for Contract {
pub struct BrainData {
nodes: RwLock<Vec<Node>>,
contracts: RwLock<Vec<Contract>>,
tmp_vmrequests: DashMap<String, (grpc::NewVmRequest, OneshotSender<grpc::NewVmConfirmation>)>,
daemon_deletevm_tx: DashMap<String, Sender<grpc::DeletedVmUpdate>>,
daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmRequest>>,
tmp_newvm_reqs: DashMap<String, (grpc::NewVmReq, OneshotSender<grpc::NewVmResp>)>,
tmp_updatevm_reqs: DashMap<String, (grpc::UpdateVmReq, OneshotSender<grpc::UpdateVmResp>)>,
daemon_deletevm_tx: DashMap<String, Sender<grpc::DeleteVmReq>>,
daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmReq>>,
daemon_updatevm_tx: DashMap<String, Sender<grpc::UpdateVmReq>>,
}
#[derive(Debug)]
@ -112,13 +117,15 @@ impl BrainData {
Self {
nodes: RwLock::new(Vec::new()),
contracts: RwLock::new(Vec::new()),
tmp_vmrequests: DashMap::new(),
tmp_newvm_reqs: DashMap::new(),
tmp_updatevm_reqs: DashMap::new(),
daemon_deletevm_tx: DashMap::new(),
daemon_newvm_tx: DashMap::new(),
daemon_updatevm_tx: DashMap::new(),
}
}
pub fn insert_node(&self, node: grpc::RegisterNodeRequest) {
pub fn insert_node(&self, node: grpc::RegisterNodeReq) {
info!("Registering node {node:?}");
let mut nodes = self.nodes.write().unwrap();
for n in nodes.iter_mut() {
@ -131,11 +138,17 @@ impl BrainData {
nodes.push(node.into());
}
pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::DeletedVmUpdate>) {
pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::DeleteVmReq>) {
self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx);
}
pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) {
pub fn add_daemon_updatevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::UpdateVmReq>) {
log::debug!("Adding daemon updatevm tx for node_pubkey: {}", node_pubkey);
self.daemon_updatevm_tx.insert(node_pubkey.to_string(), tx);
info!("Added daemon TX for {}", node_pubkey);
}
pub async fn delete_vm(&self, delete_vm: grpc::DeleteVmReq) {
if let Some(contract) = self.find_contract_by_uuid(&delete_vm.uuid) {
info!("Found vm {}. Deleting...", delete_vm.uuid);
if let Some(daemon_tx) = self.daemon_deletevm_tx.get(&contract.node_pubkey) {
@ -155,14 +168,14 @@ impl BrainData {
}
}
pub async fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender<grpc::NewVmRequest>) {
self.tmp_vmrequests
pub async fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender<grpc::NewVmReq>) {
self.tmp_newvm_reqs
.retain(|_, req| req.0.node_pubkey != node_pubkey);
self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx);
}
pub async fn submit_vmconfirmation(&self, confirmation: grpc::NewVmConfirmation) {
let newvmreq = match self.tmp_vmrequests.remove(&confirmation.uuid) {
pub async fn submit_vmconfirmation(&self, confirmation: grpc::NewVmResp) {
let newvmreq = match self.tmp_newvm_reqs.remove(&confirmation.uuid) {
Some((_, r)) => r,
None => {
log::error!(
@ -188,7 +201,8 @@ impl BrainData {
exposed_ports: confirmation.exposed_ports,
public_ipv4: confirmation.public_ipv4,
public_ipv6: confirmation.public_ipv6,
created_at: format!("{:?}", std::time::SystemTime::now()),
created_at: Utc::now().to_rfc3339(),
updated_at: String::new(),
hostname: newvmreq.0.hostname,
admin_pubkey: newvmreq.0.admin_pubkey,
node_pubkey: newvmreq.0.node_pubkey,
@ -202,16 +216,44 @@ impl BrainData {
self.contracts.write().unwrap().push(contract);
}
ghe0 marked this conversation as resolved Outdated
Outdated
Review

This method really needs a new name. 😄

Maybe process_updatevmresp. This means we should check all methods in data.rs to see if they can be renamed to something that makes more sense.

This method really needs a new name. 😄 Maybe `process_updatevmresp`. This means we should check all methods in `data.rs` to see if they can be renamed to something that makes more sense.

idk, the current name is self-describtive, unlike process_updatevmresp... like process could mean to do anything

idk, the current name is self-describtive, unlike process_updatevmresp... like process could mean to do anything
Outdated
Review

Let me think about this one...

Let me think about this one...
pub async fn submit_updatevm_resp(&self, resp: grpc::UpdateVmResp) {
let updatevmreq = match self.tmp_updatevm_reqs.remove(&resp.uuid) {
Some((_, r)) => r,
None => {
log::error!(
"Received confirmation for ghost UpdateVMRequest {}",
resp.uuid
);
return;
}
};
if let Err(e) = updatevmreq.1.send(resp.clone()) {
log::error!("CLI RX dropped before receiving UpdateVMResp {resp:?}. Error is: {e:?}");
}
if resp.error != "" {
return;
}
let mut contracts = self.contracts.write().unwrap();
if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == resp.uuid) {
contract.disk_size_gb = updatevmreq.0.disk_size_gb;
contract.vcpus = updatevmreq.0.vcpus;
contract.memory_mb = updatevmreq.0.memory_mb;
ghe0 marked this conversation as resolved
Review

I believe this is where we should modify the updated_at field.

I believe this is where we should modify the `updated_at` field.
contract.kernel_sha = updatevmreq.0.kernel_sha;
contract.dtrfs_sha = updatevmreq.0.dtrfs_sha;
contract.updated_at = Utc::now().to_rfc3339();
}
}
pub async fn submit_newvmrequest(
&self,
mut req: grpc::NewVmRequest,
tx: OneshotSender<grpc::NewVmConfirmation>,
mut req: grpc::NewVmReq,
tx: OneshotSender<grpc::NewVmResp>,
) {
let uuid = uuid::Uuid::new_v4().to_string();
req.uuid = uuid.clone();
info!("Inserting new vm request in memory: {req:?}");
self.tmp_vmrequests
self.tmp_newvm_reqs
.insert(req.uuid.clone(), (req.clone(), tx));
if let Some(server_tx) = self.daemon_newvm_tx.get(&req.node_pubkey) {
debug!(
@ -225,7 +267,7 @@ impl BrainData {
"Daemon {} RX dropped before sending update. Cleaning memory...",
req.node_pubkey
);
self.submit_vmconfirmation(grpc::NewVmConfirmation {
self.submit_vmconfirmation(grpc::NewVmResp {
error: "Daemon is offline.".to_string(),
uuid,
exposed_ports: Vec::new(),
@ -237,6 +279,61 @@ impl BrainData {
}
}
pub async fn submit_updatevm_req(
&self,
req: grpc::UpdateVmReq,
tx: OneshotSender<grpc::UpdateVmResp>,
) {
let uuid = req.uuid.clone();
info!("Inserting new vm update request in memory: {req:?}");
let node_pubkey = match self.find_contract_by_uuid(&req.uuid) {
Some(contract) => contract.node_pubkey,
None => {
log::warn!(
"Received UpdateVMReq for a contract that does not exist: {}",
req.uuid
);
let _ = tx.send(grpc::UpdateVmResp {
uuid,
error: "Contract does not exist.".to_string(),
});
return;
}
};
self.tmp_updatevm_reqs
.insert(req.uuid.clone(), (req.clone(), tx));
if let Some(server_tx) = self.daemon_updatevm_tx.get(&node_pubkey) {
debug!(
"Found daemon TX for {}. Sending updateVMReq {}",
node_pubkey, req.uuid
);
match server_tx.send(req.clone()).await {
Ok(_) => {
debug!("Successfully sent updateVMReq to {}", node_pubkey);
return;
}
Err(e) => {
warn!(
"Failed to send updateVMReq to {}: {}. Cleaning memory...",
node_pubkey, e
);
self.submit_updatevm_resp(grpc::UpdateVmResp {
uuid,
error: "Daemon is offline.".to_string(),
})
.await;
}
}
} else {
warn!("No daemon TX found for {}", node_pubkey);
self.submit_updatevm_resp(grpc::UpdateVmResp {
uuid,
error: "Daemon is offline.".to_string(),
})
.await;
}
}
pub fn insert_contract(&self, contract: Contract) {
let mut contracts = self.contracts.write().unwrap();
contracts.push(contract);

@ -41,13 +41,13 @@ impl BrainCliMock {
impl BrainDaemonService for BrainDaemonMock {
async fn register_node(
&self,
req: Request<RegisterNodeRequest>,
req: Request<RegisterNodeReq>,
) -> Result<Response<Empty>, Status> {
self.data.insert_node(req.into_inner());
Ok(Response::new(Empty {}))
}
type GetNewVMReqsStream = Pin<Box<dyn Stream<Item = Result<NewVmRequest, Status>> + Send>>;
type GetNewVMReqsStream = Pin<Box<dyn Stream<Item = Result<NewVmReq, Status>> + Send>>;
async fn get_new_vm_reqs(
&self,
@ -67,7 +67,7 @@ impl BrainDaemonService for BrainDaemonMock {
debug!("Sending NewVMRequest to {}: {newvmreq:?}", req.node_pubkey);
if let Err(e) = grpc_tx.send(Ok(newvmreq)).await {
warn!("Could not send NewVMRequest to {}: {e:?}", req.node_pubkey);
data.submit_vmconfirmation(NewVmConfirmation {
data.submit_vmconfirmation(NewVmResp {
error: "Daemon not connected.".to_string(),
uuid,
..Default::default()
@ -83,11 +83,11 @@ impl BrainDaemonService for BrainDaemonMock {
))
}
async fn send_vm_confirmations(
async fn send_new_vm_resp(
&self,
req: Request<Streaming<NewVmConfirmation>>,
req: Request<Streaming<NewVmResp>>,
) -> Result<Response<Empty>, Status> {
debug!("Some node connected to stream NewVmConfirmation");
debug!("Some node connected to stream NewVMResp");
let mut confirmations = req.into_inner();
while let Some(confirmation) = confirmations.next().await {
match confirmation {
@ -96,22 +96,21 @@ impl BrainDaemonService for BrainDaemonMock {
self.data.submit_vmconfirmation(c).await;
}
Err(e) => {
log::warn!("Daemon disconnected from Streaming<NewVmConfirmation>: {e:?}")
log::warn!("Daemon disconnected from Streaming<NewVMResp>: {e:?}")
}
}
}
Ok(Response::new(Empty {}))
}
type DeletedVMUpdatesStream =
Pin<Box<dyn Stream<Item = Result<DeletedVmUpdate, Status>> + Send>>;
type GetDeleteVMReqStream = Pin<Box<dyn Stream<Item = Result<DeleteVmReq, Status>> + Send>>;
async fn deleted_vm_updates(
async fn get_delete_vm_req(
&self,
req: Request<NodePubkey>,
) -> Result<Response<Self::DeletedVMUpdatesStream>, Status> {
) -> Result<Response<Self::GetDeleteVMReqStream>, Status> {
let node_pubkey = req.into_inner().node_pubkey;
info!("Daemon {node_pubkey} requested DeletedVMUpdatesStream");
info!("Daemon {node_pubkey} requested GetDeleteVMReqStream");
let (grpc_tx, grpc_rx) = mpsc::channel(6);
let (data_tx, mut data_rx) = mpsc::channel(6);
self.data
@ -135,7 +134,7 @@ impl BrainDaemonService for BrainDaemonMock {
});
let output_stream = ReceiverStream::new(grpc_rx);
Ok(Response::new(
Box::pin(output_stream) as Self::DeletedVMUpdatesStream
Box::pin(output_stream) as Self::GetDeleteVMReqStream
))
}
@ -159,14 +158,73 @@ impl BrainDaemonService for BrainDaemonMock {
Box::pin(output_stream) as Self::ListVMContractsStream
))
}
type GetUpdateVMReqStream = Pin<Box<dyn Stream<Item = Result<UpdateVmReq, Status>> + Send>>;
async fn get_update_vm_req(
&self,
req: Request<NodePubkey>,
) -> Result<Response<Self::GetUpdateVMReqStream>, Status> {
let req = req.into_inner();
info!("Daemon {} requested GetUpdateVMStream", req.node_pubkey);
let (grpc_tx, grpc_rx) = mpsc::channel(6);
let (data_tx, mut data_rx) = mpsc::channel(6);
self.data
.add_daemon_updatevm_tx(&req.node_pubkey, data_tx);
let data = self.data.clone();
tokio::spawn(async move {
while let Some(updatevmreq) = data_rx.recv().await {
debug!(
"Sending UpdateVMRequest to {}: {updatevmreq:?}",
req.node_pubkey
);
if let Err(e) = grpc_tx.send(Ok(updatevmreq.clone())).await {
warn!(
"Could not send UpdateVMRequest to {}: {e:?}",
req.node_pubkey
);
data.submit_updatevm_resp(UpdateVmResp {
error: "Daemon not connected.".to_string(),
uuid: updatevmreq.uuid,
})
.await;
break;
}
}
});
let output_stream = ReceiverStream::new(grpc_rx);
Ok(Response::new(
Box::pin(output_stream) as Self::GetUpdateVMReqStream
))
}
async fn send_update_vm_resp(
&self,
req: Request<Streaming<UpdateVmResp>>,
) -> Result<Response<Empty>, Status> {
debug!("Some node connected to stream NewVMResp");
let mut resp_stream = req.into_inner();
while let Some(update_vm_resp) = resp_stream.next().await {
match update_vm_resp {
Ok(c) => {
info!("Received confirmation from daemon: {c:?}");
self.data.submit_updatevm_resp(c).await;
}
Err(e) => {
log::warn!("Daemon disconnected from Streaming<NewVMResp>: {e:?}")
}
}
}
Ok(Response::new(Empty {}))
}
}
#[tonic::async_trait]
impl BrainCliService for BrainCliMock {
async fn create_vm_contract(
&self,
req: Request<NewVmRequest>,
) -> Result<Response<NewVmConfirmation>, Status> {
req: Request<NewVmReq>,
) -> Result<Response<NewVmResp>, Status> {
let req = req.into_inner();
info!("New VM requested via CLI: {req:?}");
let admin_pubkey = req.admin_pubkey.clone();
@ -186,6 +244,25 @@ impl BrainCliService for BrainCliMock {
}
}
async fn update_vm(&self, req: Request<UpdateVmReq>) -> Result<Response<UpdateVmResp>, Status> {
let req = req.into_inner();
info!("Update VM requested via CLI: {req:?}");
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
self.data.submit_updatevm_req(req, oneshot_tx).await;
match oneshot_rx.await {
Ok(response) => {
info!("Sending UpdateVMResp: {response:?}");
Ok(Response::new(response))
}
Err(e) => {
log::error!("Something weird happened. Reached error {e:?}");
Err(Status::unknown(
"Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
}
type ListVMContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
async fn list_vm_contracts(
&self,
@ -226,7 +303,7 @@ impl BrainCliService for BrainCliMock {
))
}
async fn delete_vm(&self, req: Request<DeletedVmUpdate>) -> Result<Response<Empty>, Status> {
async fn delete_vm(&self, req: Request<DeleteVmReq>) -> Result<Response<Empty>, Status> {
let req = req.into_inner();
info!("Unknown CLI requested to delete vm {}", req.uuid);
self.data.delete_vm(req).await;