diff --git a/cli-mock/brain.proto b/cli-mock/brain.proto index 05d8e4e..9a9e84f 100644 --- a/cli-mock/brain.proto +++ b/cli-mock/brain.proto @@ -8,7 +8,7 @@ message NodePubkey { string node_pubkey = 1; } -message RegisterNodeRequest { +message RegisterNodeReq { string node_pubkey = 1; string owner_pubkey = 2; string ip = 3; @@ -23,7 +23,7 @@ message RegisterNodeRequest { uint32 max_ports_per_vm = 12; } -message NewVMRequest { +message NewVMReq { string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID string hostname = 2; string admin_pubkey = 3; @@ -40,6 +40,22 @@ message NewVMRequest { string dtrfs_sha = 14; } +message UpdateVMReq { + string uuid = 1; + uint32 disk_size_gb = 3; + uint32 vcpus = 4; + uint32 memory_mb = 5; + string kernel_url = 6; + string kernel_sha = 7; + string dtrfs_url = 8; + string dtrfs_sha = 9; +} + +message UpdateVMResp { + string uuid = 1; + string error = 3; +} + message VMContract { string uuid = 1; string hostname = 2; @@ -54,6 +70,7 @@ message VMContract { string kernel_sha = 11; string dtrfs_sha = 12; string created_at = 13; + string updated_at = 14; } message ListVMContractsReq { @@ -61,7 +78,7 @@ message ListVMContractsReq { string node_pubkey = 2; } -message NewVMConfirmation { +message NewVMResp { string uuid = 1; repeated uint32 exposed_ports = 2; string public_ipv4 = 3; @@ -69,18 +86,21 @@ message NewVMConfirmation { string error = 5; } -message DeletedVMUpdate { +message DeleteVMReq { string uuid = 1; } service BrainDaemonService { - rpc RegisterNode (RegisterNodeRequest) returns (Empty); - rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest); - rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); - rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate); + rpc RegisterNode (RegisterNodeReq) returns (Empty); + rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq); + rpc SendNewVMResp (stream NewVMResp) returns (Empty); + rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); + rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); + rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); } + message NodeFilters { uint32 free_ports = 1; bool offers_ipv4 = 2; @@ -101,9 +121,9 @@ message NodeListResp { } service BrainCliService { - rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation); + rpc CreateVMContract (NewVMReq) returns (NewVMResp); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListNodes (NodeFilters) returns (stream NodeListResp); - rpc DeleteVM (DeletedVMUpdate) returns (Empty); + rpc DeleteVM (DeleteVMReq) returns (Empty); + rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp); } - diff --git a/cli-mock/src/main.rs b/cli-mock/src/main.rs index 0e51c7f..2b4655b 100644 --- a/cli-mock/src/main.rs +++ b/cli-mock/src/main.rs @@ -5,8 +5,8 @@ pub mod brain { use anyhow::Result; use brain::{ - brain_cli_service_client::BrainCliServiceClient, DeletedVmUpdate, ListVmContractsReq, - NewVmRequest, NodeFilters, NodeListResp, VmContract, + brain_cli_service_client::BrainCliServiceClient, DeleteVmReq, ListVmContractsReq, NewVmReq, + NodeFilters, NodeListResp, UpdateVmReq, VmContract, }; use lazy_static::lazy_static; use log::{debug, info, warn}; @@ -65,7 +65,7 @@ async fn submit_vm_request( mut client: BrainCliServiceClient, node_pubkey: &str, ) -> Result<()> { - let req = NewVmRequest { + let req = NewVmReq { uuid: String::new(), admin_pubkey: SECURE_PUBLIC_KEY.clone(), node_pubkey: node_pubkey.to_string(), @@ -84,12 +84,12 @@ async fn submit_vm_request( info!("Creating VM {req:?}"); let result = client.create_vm_contract(req).await; match result { - Ok(confirmation) => { - let confirmation = confirmation.into_inner(); - if confirmation.error.is_empty() { - info!("Got VM confirmation: {confirmation:?}"); + Ok(resp) => { + let resp = resp.into_inner(); + if resp.error.is_empty() { + info!("Got NewVMResp: {resp:?}"); } else { - warn!("Got VM confirmation error: {}", confirmation.error); + warn!("Got new VM error: {}", resp.error); }; } Err(e) => log::error!("Could not create vm: {e:?}"), @@ -123,7 +123,7 @@ async fn list_contracts(mut client: BrainCliServiceClient) -> Result, uuid: &str) -> Result<()> { - let req = DeletedVmUpdate { + let req = DeleteVmReq { uuid: uuid.to_string(), }; info!("Creating VM {req:?}"); @@ -135,6 +135,36 @@ async fn delete_vm(mut client: BrainCliServiceClient, uuid: &str) -> Re Ok(()) } +async fn update_vm_request( + mut client: BrainCliServiceClient, + uuid: &str, +) -> Result<()> { + let req = UpdateVmReq { + uuid: uuid.to_string(), + vcpus: 4, + memory_mb: 4096, + disk_size_gb: 40, + kernel_url: "thisIsMyNewURL".to_string(), + kernel_sha: "thisIsMyNewSha".to_string(), + dtrfs_url: "thisIsMyNewURL".to_string(), + dtrfs_sha: "thisIsMyNewSha".to_string(), + }; + info!("Updating VM {req:?}"); + let result = client.update_vm(req).await; + match result { + Ok(resp) => { + let resp = resp.into_inner(); + if resp.error.is_empty() { + info!("Got VM update response: {resp:?}"); + } else { + warn!("Got VM update error: {}", resp.error); + }; + } + Err(e) => log::error!("Could not update vm: {e:?}"), + }; + Ok(()) +} + #[tokio::main] async fn main() -> Result<()> { env_logger::builder() @@ -154,6 +184,16 @@ async fn main() -> Result<()> { } } + let contracts = list_contracts(client.clone()).await?; + for contract in contracts { + if let Err(e) = update_vm_request(client.clone(), &contract.uuid).await { + log::error!( + "Received error when updating VM on node {}: {e:?}", + &contract.node_pubkey + ); + } + } + if std::env::var("DELETE_VMS").is_err() { return Ok(()); } diff --git a/daemon-mock/brain.proto b/daemon-mock/brain.proto index 05d8e4e..9a9e84f 100644 --- a/daemon-mock/brain.proto +++ b/daemon-mock/brain.proto @@ -8,7 +8,7 @@ message NodePubkey { string node_pubkey = 1; } -message RegisterNodeRequest { +message RegisterNodeReq { string node_pubkey = 1; string owner_pubkey = 2; string ip = 3; @@ -23,7 +23,7 @@ message RegisterNodeRequest { uint32 max_ports_per_vm = 12; } -message NewVMRequest { +message NewVMReq { string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID string hostname = 2; string admin_pubkey = 3; @@ -40,6 +40,22 @@ message NewVMRequest { string dtrfs_sha = 14; } +message UpdateVMReq { + string uuid = 1; + uint32 disk_size_gb = 3; + uint32 vcpus = 4; + uint32 memory_mb = 5; + string kernel_url = 6; + string kernel_sha = 7; + string dtrfs_url = 8; + string dtrfs_sha = 9; +} + +message UpdateVMResp { + string uuid = 1; + string error = 3; +} + message VMContract { string uuid = 1; string hostname = 2; @@ -54,6 +70,7 @@ message VMContract { string kernel_sha = 11; string dtrfs_sha = 12; string created_at = 13; + string updated_at = 14; } message ListVMContractsReq { @@ -61,7 +78,7 @@ message ListVMContractsReq { string node_pubkey = 2; } -message NewVMConfirmation { +message NewVMResp { string uuid = 1; repeated uint32 exposed_ports = 2; string public_ipv4 = 3; @@ -69,18 +86,21 @@ message NewVMConfirmation { string error = 5; } -message DeletedVMUpdate { +message DeleteVMReq { string uuid = 1; } service BrainDaemonService { - rpc RegisterNode (RegisterNodeRequest) returns (Empty); - rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest); - rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); - rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate); + rpc RegisterNode (RegisterNodeReq) returns (Empty); + rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq); + rpc SendNewVMResp (stream NewVMResp) returns (Empty); + rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); + rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); + rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); } + message NodeFilters { uint32 free_ports = 1; bool offers_ipv4 = 2; @@ -101,9 +121,9 @@ message NodeListResp { } service BrainCliService { - rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation); + rpc CreateVMContract (NewVMReq) returns (NewVMResp); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListNodes (NodeFilters) returns (stream NodeListResp); - rpc DeleteVM (DeletedVMUpdate) returns (Empty); + rpc DeleteVM (DeleteVMReq) returns (Empty); + rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp); } - diff --git a/daemon-mock/src/main.rs b/daemon-mock/src/main.rs index a07da74..1233265 100644 --- a/daemon-mock/src/main.rs +++ b/daemon-mock/src/main.rs @@ -5,8 +5,8 @@ pub mod brain { use anyhow::Result; use brain::{ - brain_daemon_service_client::BrainDaemonServiceClient, DeletedVmUpdate, NewVmConfirmation, - NewVmRequest, NodePubkey, RegisterNodeRequest, + brain_daemon_service_client::BrainDaemonServiceClient, DeleteVmReq, NewVmResp, + NewVmReq, NodePubkey, RegisterNodeReq, UpdateVmReq, UpdateVmResp, }; use lazy_static::lazy_static; use log::{debug, error, info, warn}; @@ -31,7 +31,7 @@ fn generate_random_string() -> String { async fn listen_for_new_vm_reqs( mut client: BrainDaemonServiceClient, - tx: Sender, + tx: Sender, ) -> Result<()> { debug!("starting listen_for_new_vm_reqs"); let node_pubkey = SECURE_PUBLIC_KEY.clone(); @@ -54,20 +54,20 @@ async fn listen_for_new_vm_reqs( Ok(()) } -async fn send_confirmations( +async fn send_newvm_resp( mut client: BrainDaemonServiceClient, - rx: Receiver, + rx: Receiver, ) -> Result<()> { - debug!("starting send_confirmations stream"); + debug!("starting send_newvm_resp stream"); let rx_stream = ReceiverStream::new(rx); - client.send_vm_confirmations(rx_stream).await?; - debug!("send_confirmations is about to exit"); + client.send_new_vm_resp(rx_stream).await?; + debug!("send_newvm_resp is about to exit"); Ok(()) } async fn register_node(mut client: BrainDaemonServiceClient) { debug!("Starting node registration..."); - let req = RegisterNodeRequest { + let req = RegisterNodeReq { node_pubkey: SECURE_PUBLIC_KEY.clone(), owner_pubkey: "IamTheOwnerOf".to_string() + &SECURE_PUBLIC_KEY, ip: "10.0.10.1".to_string(), @@ -92,12 +92,12 @@ async fn register_node(mut client: BrainDaemonServiceClient) { async fn listen_for_deleted_vms( mut client: BrainDaemonServiceClient, - tx: Sender, + tx: Sender, ) -> Result<()> { debug!("starting listen_for_new_vm_reqs"); let node_pubkey = SECURE_PUBLIC_KEY.clone(); let mut grpc_stream = client - .deleted_vm_updates(NodePubkey { node_pubkey }) + .get_delete_vm_req(NodePubkey { node_pubkey }) .await? .into_inner(); while let Some(stream_update) = grpc_stream.next().await { @@ -115,7 +115,59 @@ async fn listen_for_deleted_vms( Ok(()) } -async fn handle_vm_requests(mut req: Receiver, resp: Sender) { +async fn listen_for_update_vm_reqs( + mut client: BrainDaemonServiceClient, + tx: Sender, +) -> Result<()> { + debug!("starting listen_for_update_vm_reqs"); + let node_pubkey = SECURE_PUBLIC_KEY.clone(); + let mut grpc_stream = client + .get_update_vm_req(NodePubkey { node_pubkey }) + .await? + .into_inner(); + while let Some(stream_update) = grpc_stream.next().await { + match stream_update { + Ok(req) => { + info!("Received update vm request: {req:?}"); + let _ = tx.send(req).await; + } + Err(e) => { + warn!("Brain disconnected from listen_for_update_vm_reqs: {e}"); + } + } + } + debug!("listen_for_update_vm_reqs is about to exit"); + Ok(()) +} + +async fn handle_update_vm_requests( + mut req: Receiver, + resp_chan: Sender + ) { + info!("Started to handle update vm requests."); + while let Some(update_vm) = req.recv().await { + let update_vm_resp = UpdateVmResp { + uuid: update_vm.uuid, + error: "".to_string(), + }; + info!("Sending UpdateVmResp: {update_vm_resp:?}"); + let _ = resp_chan.send(update_vm_resp).await; + }; + warn!("update vm request handler is ending"); +} + +async fn send_updatevm_resp( + mut client: BrainDaemonServiceClient, + rx: Receiver, +) -> Result<()> { + debug!("starting send_updatevm_resp stream"); + let rx_stream = ReceiverStream::new(rx); + client.send_update_vm_resp(rx_stream).await?; + debug!("send_updatevm_resp is about to exit"); + Ok(()) +} + +async fn handle_vm_requests(mut req: Receiver, resp: Sender) { info!("Started to handle vm requests. 1 out of 5 requests will return error."); let mut i = 0; while let Some(new_vm) = req.recv().await { @@ -128,11 +180,11 @@ async fn handle_vm_requests(mut req: Receiver, resp: Sender String::new(), }; let public_ipv6 = match new_vm.public_ipv6 { - true => " 2a02:2f2d:d301:3100:afe8:a85e:54a0:dd28".to_string(), + true => "2a02:2f2d:d301:3100:afe8:a85e:54a0:dd28".to_string(), false => String::new(), }; if i != 3 { - let confirmation = NewVmConfirmation { + let confirmation = NewVmResp{ uuid: new_vm.uuid, exposed_ports, public_ipv4, @@ -142,7 +194,7 @@ async fn handle_vm_requests(mut req: Receiver, resp: Sender Result<()> { let confirm_client = client.clone(); let (confirm_tx, rx) = tokio::sync::mpsc::channel(6); - streaming_tasks.spawn(send_confirmations(confirm_client, rx)); + streaming_tasks.spawn(send_newvm_resp(confirm_client, rx)); tokio::spawn(async move { handle_vm_requests(newvm_rx, confirm_tx).await; }); + let updatevm_client = client.clone(); + let (tx, updatevm_rx) = tokio::sync::mpsc::channel(6); + streaming_tasks.spawn(listen_for_update_vm_reqs(updatevm_client, tx)); + + let resp_client = client.clone(); + let (resp_tx, rx) = tokio::sync::mpsc::channel(6); + streaming_tasks.spawn(send_updatevm_resp(resp_client, rx)); + + tokio::spawn(async move { + handle_update_vm_requests(updatevm_rx, resp_tx).await; + }); + + let deletevms_client = client.clone(); let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6); streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx));