updater #1
| @ -8,7 +8,7 @@ message NodePubkey { | ||||
|   string node_pubkey = 1; | ||||
| } | ||||
| 
 | ||||
| message RegisterNodeRequest {  | ||||
| message RegisterNodeReq {  | ||||
|   string node_pubkey = 1; | ||||
|   string owner_pubkey = 2; | ||||
|   string ip = 3; | ||||
| @ -23,7 +23,7 @@ message RegisterNodeRequest { | ||||
|   uint32 max_ports_per_vm = 12; | ||||
| } | ||||
| 
 | ||||
| message NewVMRequest { | ||||
| message NewVMReq { | ||||
|   string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID | ||||
|   string hostname = 2; | ||||
|   string admin_pubkey = 3; | ||||
| @ -40,6 +40,22 @@ message NewVMRequest { | ||||
|   string dtrfs_sha = 14; | ||||
| } | ||||
| 
 | ||||
| message UpdateVMReq { | ||||
|   string uuid = 1; | ||||
|   uint32 disk_size_gb = 3; | ||||
|   uint32 vcpus = 4; | ||||
|   uint32 memory_mb = 5; | ||||
|   string kernel_url = 6; | ||||
|   string kernel_sha = 7; | ||||
|   string dtrfs_url = 8; | ||||
|   string dtrfs_sha = 9; | ||||
| } | ||||
| 
 | ||||
| message UpdateVMResp { | ||||
|   string uuid = 1; | ||||
|   string error = 3; | ||||
| } | ||||
| 
 | ||||
| message VMContract { | ||||
|   string uuid = 1; | ||||
|   string hostname = 2; | ||||
| @ -54,6 +70,7 @@ message VMContract { | ||||
|   string kernel_sha = 11; | ||||
|   string dtrfs_sha = 12; | ||||
|   string created_at = 13; | ||||
|   string updated_at = 14; | ||||
| } | ||||
| 
 | ||||
| message ListVMContractsReq { | ||||
| @ -61,7 +78,7 @@ message ListVMContractsReq { | ||||
|   string node_pubkey = 2; | ||||
| } | ||||
| 
 | ||||
| message NewVMConfirmation { | ||||
| message NewVMResp { | ||||
|   string uuid = 1; | ||||
|   repeated uint32 exposed_ports = 2; | ||||
|   string public_ipv4 = 3; | ||||
| @ -69,18 +86,21 @@ message NewVMConfirmation { | ||||
|   string error = 5; | ||||
| } | ||||
| 
 | ||||
| message DeletedVMUpdate { | ||||
| message DeleteVMReq { | ||||
|   string uuid = 1; | ||||
| } | ||||
| 
 | ||||
| service BrainDaemonService { | ||||
|   rpc RegisterNode (RegisterNodeRequest) returns (Empty); | ||||
|   rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest); | ||||
|   rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); | ||||
|   rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate); | ||||
|   rpc RegisterNode (RegisterNodeReq) returns (Empty); | ||||
|   rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq); | ||||
|   rpc SendNewVMResp (stream NewVMResp) returns (Empty); | ||||
|   rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq); | ||||
|   rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); | ||||
|   rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); | ||||
|   rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| message NodeFilters { | ||||
|   uint32 free_ports = 1; | ||||
|   bool offers_ipv4 = 2; | ||||
| @ -101,9 +121,9 @@ message NodeListResp { | ||||
| } | ||||
| 
 | ||||
| service BrainCliService { | ||||
|   rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation); | ||||
|   rpc CreateVMContract (NewVMReq) returns (NewVMResp); | ||||
|   rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); | ||||
|   rpc ListNodes (NodeFilters) returns (stream NodeListResp); | ||||
|   rpc DeleteVM (DeletedVMUpdate) returns (Empty); | ||||
|   rpc DeleteVM (DeleteVMReq) returns (Empty); | ||||
|   rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp); | ||||
| } | ||||
| 
 | ||||
|  | ||||
| @ -5,8 +5,8 @@ pub mod brain { | ||||
| 
 | ||||
| use anyhow::Result; | ||||
| use brain::{ | ||||
|     brain_cli_service_client::BrainCliServiceClient, DeletedVmUpdate, ListVmContractsReq, | ||||
|     NewVmRequest, NodeFilters, NodeListResp, VmContract, | ||||
|     brain_cli_service_client::BrainCliServiceClient, DeleteVmReq, ListVmContractsReq, NewVmReq, | ||||
|     NodeFilters, NodeListResp, UpdateVmReq, VmContract, | ||||
| }; | ||||
| use lazy_static::lazy_static; | ||||
| use log::{debug, info, warn}; | ||||
| @ -65,7 +65,7 @@ async fn submit_vm_request( | ||||
|     mut client: BrainCliServiceClient<Channel>, | ||||
|     node_pubkey: &str, | ||||
| ) -> Result<()> { | ||||
|     let req = NewVmRequest { | ||||
|     let req = NewVmReq { | ||||
|         uuid: String::new(), | ||||
|         admin_pubkey: SECURE_PUBLIC_KEY.clone(), | ||||
|         node_pubkey: node_pubkey.to_string(), | ||||
| @ -84,12 +84,12 @@ async fn submit_vm_request( | ||||
|     info!("Creating VM {req:?}"); | ||||
|     let result = client.create_vm_contract(req).await; | ||||
|     match result { | ||||
|         Ok(confirmation) => { | ||||
|             let confirmation = confirmation.into_inner(); | ||||
|             if confirmation.error.is_empty() { | ||||
|                 info!("Got VM confirmation: {confirmation:?}"); | ||||
|         Ok(resp) => { | ||||
|             let resp = resp.into_inner(); | ||||
|             if resp.error.is_empty() { | ||||
|                 info!("Got NewVMResp: {resp:?}"); | ||||
|             } else { | ||||
|                 warn!("Got VM confirmation error: {}", confirmation.error); | ||||
|                 warn!("Got new VM error: {}", resp.error); | ||||
|             }; | ||||
|         } | ||||
|         Err(e) => log::error!("Could not create vm: {e:?}"), | ||||
| @ -123,7 +123,7 @@ async fn list_contracts(mut client: BrainCliServiceClient<Channel>) -> Result<Ve | ||||
| } | ||||
| 
 | ||||
| async fn delete_vm(mut client: BrainCliServiceClient<Channel>, uuid: &str) -> Result<()> { | ||||
|     let req = DeletedVmUpdate { | ||||
|     let req = DeleteVmReq { | ||||
|         uuid: uuid.to_string(), | ||||
|     }; | ||||
|     info!("Creating VM {req:?}"); | ||||
| @ -135,6 +135,36 @@ async fn delete_vm(mut client: BrainCliServiceClient<Channel>, uuid: &str) -> Re | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| async fn update_vm_request( | ||||
|     mut client: BrainCliServiceClient<Channel>, | ||||
|     uuid: &str, | ||||
| ) -> Result<()> { | ||||
|     let req = UpdateVmReq { | ||||
|         uuid: uuid.to_string(), | ||||
|         vcpus: 4, | ||||
|         memory_mb: 4096, | ||||
|         disk_size_gb: 40, | ||||
|         kernel_url: "thisIsMyNewURL".to_string(), | ||||
|         kernel_sha: "thisIsMyNewSha".to_string(), | ||||
|         dtrfs_url: "thisIsMyNewURL".to_string(), | ||||
|         dtrfs_sha: "thisIsMyNewSha".to_string(), | ||||
|     }; | ||||
|     info!("Updating VM {req:?}"); | ||||
|     let result = client.update_vm(req).await; | ||||
|     match result { | ||||
|         Ok(resp) => { | ||||
|             let resp = resp.into_inner(); | ||||
|             if resp.error.is_empty() { | ||||
|                 info!("Got VM update response: {resp:?}"); | ||||
|             } else { | ||||
|                 warn!("Got VM update error: {}", resp.error); | ||||
|             }; | ||||
|         } | ||||
|         Err(e) => log::error!("Could not update vm: {e:?}"), | ||||
|     }; | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| #[tokio::main] | ||||
| async fn main() -> Result<()> { | ||||
|     env_logger::builder() | ||||
| @ -154,6 +184,16 @@ async fn main() -> Result<()> { | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     let contracts = list_contracts(client.clone()).await?; | ||||
|     for contract in contracts { | ||||
|         if let Err(e) = update_vm_request(client.clone(), &contract.uuid).await { | ||||
|             log::error!( | ||||
|                 "Received error when updating VM on node {}: {e:?}", | ||||
|                 &contract.node_pubkey | ||||
|             ); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     if std::env::var("DELETE_VMS").is_err() { | ||||
|         return Ok(()); | ||||
|     } | ||||
|  | ||||
| @ -8,7 +8,7 @@ message NodePubkey { | ||||
|   string node_pubkey = 1; | ||||
| } | ||||
| 
 | ||||
| message RegisterNodeRequest {  | ||||
| message RegisterNodeReq {  | ||||
|   string node_pubkey = 1; | ||||
|   string owner_pubkey = 2; | ||||
|   string ip = 3; | ||||
| @ -23,7 +23,7 @@ message RegisterNodeRequest { | ||||
|   uint32 max_ports_per_vm = 12; | ||||
| } | ||||
| 
 | ||||
| message NewVMRequest { | ||||
| message NewVMReq { | ||||
|   string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID | ||||
|   string hostname = 2; | ||||
|   string admin_pubkey = 3; | ||||
| @ -40,6 +40,22 @@ message NewVMRequest { | ||||
|   string dtrfs_sha = 14; | ||||
| } | ||||
| 
 | ||||
| message UpdateVMReq { | ||||
|   string uuid = 1; | ||||
|   uint32 disk_size_gb = 3; | ||||
|   uint32 vcpus = 4; | ||||
|   uint32 memory_mb = 5; | ||||
|   string kernel_url = 6; | ||||
|   string kernel_sha = 7; | ||||
|   string dtrfs_url = 8; | ||||
|   string dtrfs_sha = 9; | ||||
| } | ||||
| 
 | ||||
| message UpdateVMResp { | ||||
|   string uuid = 1; | ||||
|   string error = 3; | ||||
| } | ||||
| 
 | ||||
| message VMContract { | ||||
|   string uuid = 1; | ||||
|   string hostname = 2; | ||||
| @ -54,6 +70,7 @@ message VMContract { | ||||
|   string kernel_sha = 11; | ||||
|   string dtrfs_sha = 12; | ||||
|   string created_at = 13; | ||||
|   string updated_at = 14; | ||||
| } | ||||
| 
 | ||||
| message ListVMContractsReq { | ||||
| @ -61,7 +78,7 @@ message ListVMContractsReq { | ||||
|   string node_pubkey = 2; | ||||
| } | ||||
| 
 | ||||
| message NewVMConfirmation { | ||||
| message NewVMResp { | ||||
|   string uuid = 1; | ||||
|   repeated uint32 exposed_ports = 2; | ||||
|   string public_ipv4 = 3; | ||||
| @ -69,18 +86,21 @@ message NewVMConfirmation { | ||||
|   string error = 5; | ||||
| } | ||||
| 
 | ||||
| message DeletedVMUpdate { | ||||
| message DeleteVMReq { | ||||
|   string uuid = 1; | ||||
| } | ||||
| 
 | ||||
| service BrainDaemonService { | ||||
|   rpc RegisterNode (RegisterNodeRequest) returns (Empty); | ||||
|   rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest); | ||||
|   rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); | ||||
|   rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate); | ||||
|   rpc RegisterNode (RegisterNodeReq) returns (Empty); | ||||
|   rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq); | ||||
|   rpc SendNewVMResp (stream NewVMResp) returns (Empty); | ||||
|   rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq); | ||||
|   rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); | ||||
|   rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); | ||||
|   rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| message NodeFilters { | ||||
|   uint32 free_ports = 1; | ||||
|   bool offers_ipv4 = 2; | ||||
| @ -101,9 +121,9 @@ message NodeListResp { | ||||
| } | ||||
| 
 | ||||
| service BrainCliService { | ||||
|   rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation); | ||||
|   rpc CreateVMContract (NewVMReq) returns (NewVMResp); | ||||
|   rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); | ||||
|   rpc ListNodes (NodeFilters) returns (stream NodeListResp); | ||||
|   rpc DeleteVM (DeletedVMUpdate) returns (Empty); | ||||
|   rpc DeleteVM (DeleteVMReq) returns (Empty); | ||||
|   rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp); | ||||
| } | ||||
| 
 | ||||
|  | ||||
| @ -5,8 +5,8 @@ pub mod brain { | ||||
| 
 | ||||
| use anyhow::Result; | ||||
| use brain::{ | ||||
|     brain_daemon_service_client::BrainDaemonServiceClient, DeletedVmUpdate, NewVmConfirmation, | ||||
|     NewVmRequest, NodePubkey, RegisterNodeRequest, | ||||
|     brain_daemon_service_client::BrainDaemonServiceClient, DeleteVmReq, NewVmResp, | ||||
|     NewVmReq, NodePubkey, RegisterNodeReq, UpdateVmReq, UpdateVmResp, | ||||
| }; | ||||
| use lazy_static::lazy_static; | ||||
| use log::{debug, error, info, warn}; | ||||
| @ -31,7 +31,7 @@ fn generate_random_string() -> String { | ||||
| 
 | ||||
| async fn listen_for_new_vm_reqs( | ||||
|     mut client: BrainDaemonServiceClient<Channel>, | ||||
|     tx: Sender<NewVmRequest>, | ||||
|     tx: Sender<NewVmReq>, | ||||
| ) -> Result<()> { | ||||
|     debug!("starting listen_for_new_vm_reqs"); | ||||
|     let node_pubkey = SECURE_PUBLIC_KEY.clone(); | ||||
| @ -54,20 +54,20 @@ async fn listen_for_new_vm_reqs( | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| async fn send_confirmations( | ||||
| async fn send_newvm_resp( | ||||
|     mut client: BrainDaemonServiceClient<Channel>, | ||||
|     rx: Receiver<NewVmConfirmation>, | ||||
|     rx: Receiver<NewVmResp>, | ||||
| ) -> Result<()> { | ||||
|     debug!("starting send_confirmations stream"); | ||||
|     debug!("starting send_newvm_resp stream"); | ||||
|     let rx_stream = ReceiverStream::new(rx); | ||||
|     client.send_vm_confirmations(rx_stream).await?; | ||||
|     debug!("send_confirmations is about to exit"); | ||||
|     client.send_new_vm_resp(rx_stream).await?; | ||||
|     debug!("send_newvm_resp is about to exit"); | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| async fn register_node(mut client: BrainDaemonServiceClient<Channel>) { | ||||
|     debug!("Starting node registration..."); | ||||
|     let req = RegisterNodeRequest { | ||||
|     let req = RegisterNodeReq { | ||||
|         node_pubkey: SECURE_PUBLIC_KEY.clone(), | ||||
|         owner_pubkey: "IamTheOwnerOf".to_string() + &SECURE_PUBLIC_KEY, | ||||
|         ip: "10.0.10.1".to_string(), | ||||
| @ -92,12 +92,12 @@ async fn register_node(mut client: BrainDaemonServiceClient<Channel>) { | ||||
| 
 | ||||
| async fn listen_for_deleted_vms( | ||||
|     mut client: BrainDaemonServiceClient<Channel>, | ||||
|     tx: Sender<DeletedVmUpdate>, | ||||
|     tx: Sender<DeleteVmReq>, | ||||
| ) -> Result<()> { | ||||
|     debug!("starting listen_for_new_vm_reqs"); | ||||
|     let node_pubkey = SECURE_PUBLIC_KEY.clone(); | ||||
|     let mut grpc_stream = client | ||||
|         .deleted_vm_updates(NodePubkey { node_pubkey }) | ||||
|         .get_delete_vm_req(NodePubkey { node_pubkey }) | ||||
|         .await? | ||||
|         .into_inner(); | ||||
|     while let Some(stream_update) = grpc_stream.next().await { | ||||
| @ -115,7 +115,59 @@ async fn listen_for_deleted_vms( | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| async fn handle_vm_requests(mut req: Receiver<NewVmRequest>, resp: Sender<NewVmConfirmation>) { | ||||
| async fn listen_for_update_vm_reqs( | ||||
|     mut client: BrainDaemonServiceClient<Channel>, | ||||
|     tx: Sender<UpdateVmReq>, | ||||
| ) -> Result<()> { | ||||
|     debug!("starting listen_for_update_vm_reqs"); | ||||
|     let node_pubkey = SECURE_PUBLIC_KEY.clone(); | ||||
|     let mut grpc_stream = client | ||||
|         .get_update_vm_req(NodePubkey { node_pubkey }) | ||||
|         .await? | ||||
|         .into_inner(); | ||||
|     while let Some(stream_update) = grpc_stream.next().await { | ||||
|         match stream_update { | ||||
|             Ok(req) => { | ||||
|                 info!("Received update vm request: {req:?}"); | ||||
|                 let _ = tx.send(req).await; | ||||
|             } | ||||
|             Err(e) => { | ||||
|                 warn!("Brain disconnected from listen_for_update_vm_reqs: {e}"); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|     debug!("listen_for_update_vm_reqs is about to exit"); | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| async fn handle_update_vm_requests( | ||||
|     mut req: Receiver<UpdateVmReq>, | ||||
|     resp_chan: Sender<UpdateVmResp> | ||||
|     ) { | ||||
|     info!("Started to handle update vm requests."); | ||||
|     while let Some(update_vm) = req.recv().await { | ||||
|         let update_vm_resp = UpdateVmResp { | ||||
|             uuid: update_vm.uuid, | ||||
|             error: "".to_string(), | ||||
|         }; | ||||
|         info!("Sending UpdateVmResp: {update_vm_resp:?}"); | ||||
|         let _ = resp_chan.send(update_vm_resp).await; | ||||
|     }; | ||||
|     warn!("update vm request handler is ending"); | ||||
| } | ||||
| 
 | ||||
| async fn send_updatevm_resp( | ||||
|     mut client: BrainDaemonServiceClient<Channel>, | ||||
|     rx: Receiver<UpdateVmResp>, | ||||
| ) -> Result<()> { | ||||
|     debug!("starting send_updatevm_resp stream"); | ||||
|     let rx_stream = ReceiverStream::new(rx); | ||||
|     client.send_update_vm_resp(rx_stream).await?; | ||||
|     debug!("send_updatevm_resp is about to exit"); | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| async fn handle_vm_requests(mut req: Receiver<NewVmReq>, resp: Sender<NewVmResp>) { | ||||
|     info!("Started to handle vm requests. 1 out of 5 requests will return error."); | ||||
|     let mut i = 0; | ||||
|     while let Some(new_vm) = req.recv().await { | ||||
| @ -128,11 +180,11 @@ async fn handle_vm_requests(mut req: Receiver<NewVmRequest>, resp: Sender<NewVmC | ||||
|             false => String::new(), | ||||
|         }; | ||||
|         let public_ipv6 = match new_vm.public_ipv6 { | ||||
|             true => " 2a02:2f2d:d301:3100:afe8:a85e:54a0:dd28".to_string(), | ||||
|             true => "2a02:2f2d:d301:3100:afe8:a85e:54a0:dd28".to_string(), | ||||
|             false => String::new(), | ||||
|         }; | ||||
|         if i != 3 { | ||||
|             let confirmation = NewVmConfirmation { | ||||
|             let confirmation = NewVmResp{ | ||||
|                 uuid: new_vm.uuid, | ||||
|                 exposed_ports, | ||||
|                 public_ipv4, | ||||
| @ -142,7 +194,7 @@ async fn handle_vm_requests(mut req: Receiver<NewVmRequest>, resp: Sender<NewVmC | ||||
|             info!("Sending NewVmConfirmation: {confirmation:?}"); | ||||
|             let _ = resp.send(confirmation).await; | ||||
|         } else { | ||||
|             let confirmation = NewVmConfirmation { | ||||
|             let confirmation = NewVmResp { | ||||
|                 uuid: new_vm.uuid, | ||||
|                 exposed_ports: Vec::new(), | ||||
|                 public_ipv4: String::new(), | ||||
| @ -172,12 +224,25 @@ async fn connect_and_run() -> Result<()> { | ||||
| 
 | ||||
|     let confirm_client = client.clone(); | ||||
|     let (confirm_tx, rx) = tokio::sync::mpsc::channel(6); | ||||
|     streaming_tasks.spawn(send_confirmations(confirm_client, rx)); | ||||
|     streaming_tasks.spawn(send_newvm_resp(confirm_client, rx)); | ||||
| 
 | ||||
|     tokio::spawn(async move { | ||||
|         handle_vm_requests(newvm_rx, confirm_tx).await; | ||||
|     }); | ||||
| 
 | ||||
|     let updatevm_client = client.clone(); | ||||
|     let (tx, updatevm_rx) = tokio::sync::mpsc::channel(6); | ||||
|     streaming_tasks.spawn(listen_for_update_vm_reqs(updatevm_client, tx)); | ||||
| 
 | ||||
|     let resp_client = client.clone(); | ||||
|     let (resp_tx, rx) = tokio::sync::mpsc::channel(6); | ||||
|     streaming_tasks.spawn(send_updatevm_resp(resp_client, rx)); | ||||
| 
 | ||||
|     tokio::spawn(async move { | ||||
|         handle_update_vm_requests(updatevm_rx, resp_tx).await; | ||||
|     }); | ||||
| 
 | ||||
| 
 | ||||
|     let deletevms_client = client.clone(); | ||||
|     let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6); | ||||
|     streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx)); | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user