updater #1
| @ -8,7 +8,7 @@ message NodePubkey { | |||||||
|   string node_pubkey = 1; |   string node_pubkey = 1; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| message RegisterNodeRequest {  | message RegisterNodeReq {  | ||||||
|   string node_pubkey = 1; |   string node_pubkey = 1; | ||||||
|   string owner_pubkey = 2; |   string owner_pubkey = 2; | ||||||
|   string ip = 3; |   string ip = 3; | ||||||
| @ -23,7 +23,7 @@ message RegisterNodeRequest { | |||||||
|   uint32 max_ports_per_vm = 12; |   uint32 max_ports_per_vm = 12; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| message NewVMRequest { | message NewVMReq { | ||||||
|   string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID |   string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID | ||||||
|   string hostname = 2; |   string hostname = 2; | ||||||
|   string admin_pubkey = 3; |   string admin_pubkey = 3; | ||||||
| @ -40,6 +40,22 @@ message NewVMRequest { | |||||||
|   string dtrfs_sha = 14; |   string dtrfs_sha = 14; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | message UpdateVMReq { | ||||||
|  |   string uuid = 1; | ||||||
|  |   uint32 disk_size_gb = 3; | ||||||
|  |   uint32 vcpus = 4; | ||||||
|  |   uint32 memory_mb = 5; | ||||||
|  |   string kernel_url = 6; | ||||||
|  |   string kernel_sha = 7; | ||||||
|  |   string dtrfs_url = 8; | ||||||
|  |   string dtrfs_sha = 9; | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | message UpdateVMResp { | ||||||
|  |   string uuid = 1; | ||||||
|  |   string error = 3; | ||||||
|  | } | ||||||
|  | 
 | ||||||
| message VMContract { | message VMContract { | ||||||
|   string uuid = 1; |   string uuid = 1; | ||||||
|   string hostname = 2; |   string hostname = 2; | ||||||
| @ -54,6 +70,7 @@ message VMContract { | |||||||
|   string kernel_sha = 11; |   string kernel_sha = 11; | ||||||
|   string dtrfs_sha = 12; |   string dtrfs_sha = 12; | ||||||
|   string created_at = 13; |   string created_at = 13; | ||||||
|  |   string updated_at = 14; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| message ListVMContractsReq { | message ListVMContractsReq { | ||||||
| @ -61,7 +78,7 @@ message ListVMContractsReq { | |||||||
|   string node_pubkey = 2; |   string node_pubkey = 2; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| message NewVMConfirmation { | message NewVMResp { | ||||||
|   string uuid = 1; |   string uuid = 1; | ||||||
|   repeated uint32 exposed_ports = 2; |   repeated uint32 exposed_ports = 2; | ||||||
|   string public_ipv4 = 3; |   string public_ipv4 = 3; | ||||||
| @ -69,18 +86,21 @@ message NewVMConfirmation { | |||||||
|   string error = 5; |   string error = 5; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| message DeletedVMUpdate { | message DeleteVMReq { | ||||||
|   string uuid = 1; |   string uuid = 1; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| service BrainDaemonService { | service BrainDaemonService { | ||||||
|   rpc RegisterNode (RegisterNodeRequest) returns (Empty); |   rpc RegisterNode (RegisterNodeReq) returns (Empty); | ||||||
|   rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest); |   rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq); | ||||||
|   rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); |   rpc SendNewVMResp (stream NewVMResp) returns (Empty); | ||||||
|   rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate); |   rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq); | ||||||
|   rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); |   rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); | ||||||
|  |   rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); | ||||||
|  |   rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| message NodeFilters { | message NodeFilters { | ||||||
|   uint32 free_ports = 1; |   uint32 free_ports = 1; | ||||||
|   bool offers_ipv4 = 2; |   bool offers_ipv4 = 2; | ||||||
| @ -101,9 +121,9 @@ message NodeListResp { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| service BrainCliService { | service BrainCliService { | ||||||
|   rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation); |   rpc CreateVMContract (NewVMReq) returns (NewVMResp); | ||||||
|   rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); |   rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); | ||||||
|   rpc ListNodes (NodeFilters) returns (stream NodeListResp); |   rpc ListNodes (NodeFilters) returns (stream NodeListResp); | ||||||
|   rpc DeleteVM (DeletedVMUpdate) returns (Empty); |   rpc DeleteVM (DeleteVMReq) returns (Empty); | ||||||
|  |   rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp); | ||||||
| } | } | ||||||
| 
 |  | ||||||
|  | |||||||
| @ -5,8 +5,8 @@ pub mod brain { | |||||||
| 
 | 
 | ||||||
| use anyhow::Result; | use anyhow::Result; | ||||||
| use brain::{ | use brain::{ | ||||||
|     brain_cli_service_client::BrainCliServiceClient, DeletedVmUpdate, ListVmContractsReq, |     brain_cli_service_client::BrainCliServiceClient, DeleteVmReq, ListVmContractsReq, NewVmReq, | ||||||
|     NewVmRequest, NodeFilters, NodeListResp, VmContract, |     NodeFilters, NodeListResp, UpdateVmReq, VmContract, | ||||||
| }; | }; | ||||||
| use lazy_static::lazy_static; | use lazy_static::lazy_static; | ||||||
| use log::{debug, info, warn}; | use log::{debug, info, warn}; | ||||||
| @ -65,7 +65,7 @@ async fn submit_vm_request( | |||||||
|     mut client: BrainCliServiceClient<Channel>, |     mut client: BrainCliServiceClient<Channel>, | ||||||
|     node_pubkey: &str, |     node_pubkey: &str, | ||||||
| ) -> Result<()> { | ) -> Result<()> { | ||||||
|     let req = NewVmRequest { |     let req = NewVmReq { | ||||||
|         uuid: String::new(), |         uuid: String::new(), | ||||||
|         admin_pubkey: SECURE_PUBLIC_KEY.clone(), |         admin_pubkey: SECURE_PUBLIC_KEY.clone(), | ||||||
|         node_pubkey: node_pubkey.to_string(), |         node_pubkey: node_pubkey.to_string(), | ||||||
| @ -84,12 +84,12 @@ async fn submit_vm_request( | |||||||
|     info!("Creating VM {req:?}"); |     info!("Creating VM {req:?}"); | ||||||
|     let result = client.create_vm_contract(req).await; |     let result = client.create_vm_contract(req).await; | ||||||
|     match result { |     match result { | ||||||
|         Ok(confirmation) => { |         Ok(resp) => { | ||||||
|             let confirmation = confirmation.into_inner(); |             let resp = resp.into_inner(); | ||||||
|             if confirmation.error.is_empty() { |             if resp.error.is_empty() { | ||||||
|                 info!("Got VM confirmation: {confirmation:?}"); |                 info!("Got NewVMResp: {resp:?}"); | ||||||
|             } else { |             } else { | ||||||
|                 warn!("Got VM confirmation error: {}", confirmation.error); |                 warn!("Got new VM error: {}", resp.error); | ||||||
|             }; |             }; | ||||||
|         } |         } | ||||||
|         Err(e) => log::error!("Could not create vm: {e:?}"), |         Err(e) => log::error!("Could not create vm: {e:?}"), | ||||||
| @ -123,7 +123,7 @@ async fn list_contracts(mut client: BrainCliServiceClient<Channel>) -> Result<Ve | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| async fn delete_vm(mut client: BrainCliServiceClient<Channel>, uuid: &str) -> Result<()> { | async fn delete_vm(mut client: BrainCliServiceClient<Channel>, uuid: &str) -> Result<()> { | ||||||
|     let req = DeletedVmUpdate { |     let req = DeleteVmReq { | ||||||
|         uuid: uuid.to_string(), |         uuid: uuid.to_string(), | ||||||
|     }; |     }; | ||||||
|     info!("Creating VM {req:?}"); |     info!("Creating VM {req:?}"); | ||||||
| @ -135,6 +135,36 @@ async fn delete_vm(mut client: BrainCliServiceClient<Channel>, uuid: &str) -> Re | |||||||
|     Ok(()) |     Ok(()) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | async fn update_vm_request( | ||||||
|  |     mut client: BrainCliServiceClient<Channel>, | ||||||
|  |     uuid: &str, | ||||||
|  | ) -> Result<()> { | ||||||
|  |     let req = UpdateVmReq { | ||||||
|  |         uuid: uuid.to_string(), | ||||||
|  |         vcpus: 4, | ||||||
|  |         memory_mb: 4096, | ||||||
|  |         disk_size_gb: 40, | ||||||
|  |         kernel_url: "thisIsMyNewURL".to_string(), | ||||||
|  |         kernel_sha: "thisIsMyNewSha".to_string(), | ||||||
|  |         dtrfs_url: "thisIsMyNewURL".to_string(), | ||||||
|  |         dtrfs_sha: "thisIsMyNewSha".to_string(), | ||||||
|  |     }; | ||||||
|  |     info!("Updating VM {req:?}"); | ||||||
|  |     let result = client.update_vm(req).await; | ||||||
|  |     match result { | ||||||
|  |         Ok(resp) => { | ||||||
|  |             let resp = resp.into_inner(); | ||||||
|  |             if resp.error.is_empty() { | ||||||
|  |                 info!("Got VM update response: {resp:?}"); | ||||||
|  |             } else { | ||||||
|  |                 warn!("Got VM update error: {}", resp.error); | ||||||
|  |             }; | ||||||
|  |         } | ||||||
|  |         Err(e) => log::error!("Could not update vm: {e:?}"), | ||||||
|  |     }; | ||||||
|  |     Ok(()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| #[tokio::main] | #[tokio::main] | ||||||
| async fn main() -> Result<()> { | async fn main() -> Result<()> { | ||||||
|     env_logger::builder() |     env_logger::builder() | ||||||
| @ -154,6 +184,16 @@ async fn main() -> Result<()> { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     let contracts = list_contracts(client.clone()).await?; | ||||||
|  |     for contract in contracts { | ||||||
|  |         if let Err(e) = update_vm_request(client.clone(), &contract.uuid).await { | ||||||
|  |             log::error!( | ||||||
|  |                 "Received error when updating VM on node {}: {e:?}", | ||||||
|  |                 &contract.node_pubkey | ||||||
|  |             ); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     if std::env::var("DELETE_VMS").is_err() { |     if std::env::var("DELETE_VMS").is_err() { | ||||||
|         return Ok(()); |         return Ok(()); | ||||||
|     } |     } | ||||||
|  | |||||||
| @ -8,7 +8,7 @@ message NodePubkey { | |||||||
|   string node_pubkey = 1; |   string node_pubkey = 1; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| message RegisterNodeRequest {  | message RegisterNodeReq {  | ||||||
|   string node_pubkey = 1; |   string node_pubkey = 1; | ||||||
|   string owner_pubkey = 2; |   string owner_pubkey = 2; | ||||||
|   string ip = 3; |   string ip = 3; | ||||||
| @ -23,7 +23,7 @@ message RegisterNodeRequest { | |||||||
|   uint32 max_ports_per_vm = 12; |   uint32 max_ports_per_vm = 12; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| message NewVMRequest { | message NewVMReq { | ||||||
|   string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID |   string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID | ||||||
|   string hostname = 2; |   string hostname = 2; | ||||||
|   string admin_pubkey = 3; |   string admin_pubkey = 3; | ||||||
| @ -40,6 +40,22 @@ message NewVMRequest { | |||||||
|   string dtrfs_sha = 14; |   string dtrfs_sha = 14; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | message UpdateVMReq { | ||||||
|  |   string uuid = 1; | ||||||
|  |   uint32 disk_size_gb = 3; | ||||||
|  |   uint32 vcpus = 4; | ||||||
|  |   uint32 memory_mb = 5; | ||||||
|  |   string kernel_url = 6; | ||||||
|  |   string kernel_sha = 7; | ||||||
|  |   string dtrfs_url = 8; | ||||||
|  |   string dtrfs_sha = 9; | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | message UpdateVMResp { | ||||||
|  |   string uuid = 1; | ||||||
|  |   string error = 3; | ||||||
|  | } | ||||||
|  | 
 | ||||||
| message VMContract { | message VMContract { | ||||||
|   string uuid = 1; |   string uuid = 1; | ||||||
|   string hostname = 2; |   string hostname = 2; | ||||||
| @ -54,6 +70,7 @@ message VMContract { | |||||||
|   string kernel_sha = 11; |   string kernel_sha = 11; | ||||||
|   string dtrfs_sha = 12; |   string dtrfs_sha = 12; | ||||||
|   string created_at = 13; |   string created_at = 13; | ||||||
|  |   string updated_at = 14; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| message ListVMContractsReq { | message ListVMContractsReq { | ||||||
| @ -61,7 +78,7 @@ message ListVMContractsReq { | |||||||
|   string node_pubkey = 2; |   string node_pubkey = 2; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| message NewVMConfirmation { | message NewVMResp { | ||||||
|   string uuid = 1; |   string uuid = 1; | ||||||
|   repeated uint32 exposed_ports = 2; |   repeated uint32 exposed_ports = 2; | ||||||
|   string public_ipv4 = 3; |   string public_ipv4 = 3; | ||||||
| @ -69,18 +86,21 @@ message NewVMConfirmation { | |||||||
|   string error = 5; |   string error = 5; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| message DeletedVMUpdate { | message DeleteVMReq { | ||||||
|   string uuid = 1; |   string uuid = 1; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| service BrainDaemonService { | service BrainDaemonService { | ||||||
|   rpc RegisterNode (RegisterNodeRequest) returns (Empty); |   rpc RegisterNode (RegisterNodeReq) returns (Empty); | ||||||
|   rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest); |   rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq); | ||||||
|   rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); |   rpc SendNewVMResp (stream NewVMResp) returns (Empty); | ||||||
|   rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate); |   rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq); | ||||||
|   rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); |   rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); | ||||||
|  |   rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); | ||||||
|  |   rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| message NodeFilters { | message NodeFilters { | ||||||
|   uint32 free_ports = 1; |   uint32 free_ports = 1; | ||||||
|   bool offers_ipv4 = 2; |   bool offers_ipv4 = 2; | ||||||
| @ -101,9 +121,9 @@ message NodeListResp { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| service BrainCliService { | service BrainCliService { | ||||||
|   rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation); |   rpc CreateVMContract (NewVMReq) returns (NewVMResp); | ||||||
|   rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); |   rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); | ||||||
|   rpc ListNodes (NodeFilters) returns (stream NodeListResp); |   rpc ListNodes (NodeFilters) returns (stream NodeListResp); | ||||||
|   rpc DeleteVM (DeletedVMUpdate) returns (Empty); |   rpc DeleteVM (DeleteVMReq) returns (Empty); | ||||||
|  |   rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp); | ||||||
| } | } | ||||||
| 
 |  | ||||||
|  | |||||||
| @ -5,8 +5,8 @@ pub mod brain { | |||||||
| 
 | 
 | ||||||
| use anyhow::Result; | use anyhow::Result; | ||||||
| use brain::{ | use brain::{ | ||||||
|     brain_daemon_service_client::BrainDaemonServiceClient, DeletedVmUpdate, NewVmConfirmation, |     brain_daemon_service_client::BrainDaemonServiceClient, DeleteVmReq, NewVmResp, | ||||||
|     NewVmRequest, NodePubkey, RegisterNodeRequest, |     NewVmReq, NodePubkey, RegisterNodeReq, UpdateVmReq, UpdateVmResp, | ||||||
| }; | }; | ||||||
| use lazy_static::lazy_static; | use lazy_static::lazy_static; | ||||||
| use log::{debug, error, info, warn}; | use log::{debug, error, info, warn}; | ||||||
| @ -31,7 +31,7 @@ fn generate_random_string() -> String { | |||||||
| 
 | 
 | ||||||
| async fn listen_for_new_vm_reqs( | async fn listen_for_new_vm_reqs( | ||||||
|     mut client: BrainDaemonServiceClient<Channel>, |     mut client: BrainDaemonServiceClient<Channel>, | ||||||
|     tx: Sender<NewVmRequest>, |     tx: Sender<NewVmReq>, | ||||||
| ) -> Result<()> { | ) -> Result<()> { | ||||||
|     debug!("starting listen_for_new_vm_reqs"); |     debug!("starting listen_for_new_vm_reqs"); | ||||||
|     let node_pubkey = SECURE_PUBLIC_KEY.clone(); |     let node_pubkey = SECURE_PUBLIC_KEY.clone(); | ||||||
| @ -54,20 +54,20 @@ async fn listen_for_new_vm_reqs( | |||||||
|     Ok(()) |     Ok(()) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| async fn send_confirmations( | async fn send_newvm_resp( | ||||||
|     mut client: BrainDaemonServiceClient<Channel>, |     mut client: BrainDaemonServiceClient<Channel>, | ||||||
|     rx: Receiver<NewVmConfirmation>, |     rx: Receiver<NewVmResp>, | ||||||
| ) -> Result<()> { | ) -> Result<()> { | ||||||
|     debug!("starting send_confirmations stream"); |     debug!("starting send_newvm_resp stream"); | ||||||
|     let rx_stream = ReceiverStream::new(rx); |     let rx_stream = ReceiverStream::new(rx); | ||||||
|     client.send_vm_confirmations(rx_stream).await?; |     client.send_new_vm_resp(rx_stream).await?; | ||||||
|     debug!("send_confirmations is about to exit"); |     debug!("send_newvm_resp is about to exit"); | ||||||
|     Ok(()) |     Ok(()) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| async fn register_node(mut client: BrainDaemonServiceClient<Channel>) { | async fn register_node(mut client: BrainDaemonServiceClient<Channel>) { | ||||||
|     debug!("Starting node registration..."); |     debug!("Starting node registration..."); | ||||||
|     let req = RegisterNodeRequest { |     let req = RegisterNodeReq { | ||||||
|         node_pubkey: SECURE_PUBLIC_KEY.clone(), |         node_pubkey: SECURE_PUBLIC_KEY.clone(), | ||||||
|         owner_pubkey: "IamTheOwnerOf".to_string() + &SECURE_PUBLIC_KEY, |         owner_pubkey: "IamTheOwnerOf".to_string() + &SECURE_PUBLIC_KEY, | ||||||
|         ip: "10.0.10.1".to_string(), |         ip: "10.0.10.1".to_string(), | ||||||
| @ -92,12 +92,12 @@ async fn register_node(mut client: BrainDaemonServiceClient<Channel>) { | |||||||
| 
 | 
 | ||||||
| async fn listen_for_deleted_vms( | async fn listen_for_deleted_vms( | ||||||
|     mut client: BrainDaemonServiceClient<Channel>, |     mut client: BrainDaemonServiceClient<Channel>, | ||||||
|     tx: Sender<DeletedVmUpdate>, |     tx: Sender<DeleteVmReq>, | ||||||
| ) -> Result<()> { | ) -> Result<()> { | ||||||
|     debug!("starting listen_for_new_vm_reqs"); |     debug!("starting listen_for_new_vm_reqs"); | ||||||
|     let node_pubkey = SECURE_PUBLIC_KEY.clone(); |     let node_pubkey = SECURE_PUBLIC_KEY.clone(); | ||||||
|     let mut grpc_stream = client |     let mut grpc_stream = client | ||||||
|         .deleted_vm_updates(NodePubkey { node_pubkey }) |         .get_delete_vm_req(NodePubkey { node_pubkey }) | ||||||
|         .await? |         .await? | ||||||
|         .into_inner(); |         .into_inner(); | ||||||
|     while let Some(stream_update) = grpc_stream.next().await { |     while let Some(stream_update) = grpc_stream.next().await { | ||||||
| @ -115,7 +115,59 @@ async fn listen_for_deleted_vms( | |||||||
|     Ok(()) |     Ok(()) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| async fn handle_vm_requests(mut req: Receiver<NewVmRequest>, resp: Sender<NewVmConfirmation>) { | async fn listen_for_update_vm_reqs( | ||||||
|  |     mut client: BrainDaemonServiceClient<Channel>, | ||||||
|  |     tx: Sender<UpdateVmReq>, | ||||||
|  | ) -> Result<()> { | ||||||
|  |     debug!("starting listen_for_update_vm_reqs"); | ||||||
|  |     let node_pubkey = SECURE_PUBLIC_KEY.clone(); | ||||||
|  |     let mut grpc_stream = client | ||||||
|  |         .get_update_vm_req(NodePubkey { node_pubkey }) | ||||||
|  |         .await? | ||||||
|  |         .into_inner(); | ||||||
|  |     while let Some(stream_update) = grpc_stream.next().await { | ||||||
|  |         match stream_update { | ||||||
|  |             Ok(req) => { | ||||||
|  |                 info!("Received update vm request: {req:?}"); | ||||||
|  |                 let _ = tx.send(req).await; | ||||||
|  |             } | ||||||
|  |             Err(e) => { | ||||||
|  |                 warn!("Brain disconnected from listen_for_update_vm_reqs: {e}"); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |     debug!("listen_for_update_vm_reqs is about to exit"); | ||||||
|  |     Ok(()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | async fn handle_update_vm_requests( | ||||||
|  |     mut req: Receiver<UpdateVmReq>, | ||||||
|  |     resp_chan: Sender<UpdateVmResp> | ||||||
|  |     ) { | ||||||
|  |     info!("Started to handle update vm requests."); | ||||||
|  |     while let Some(update_vm) = req.recv().await { | ||||||
|  |         let update_vm_resp = UpdateVmResp { | ||||||
|  |             uuid: update_vm.uuid, | ||||||
|  |             error: "".to_string(), | ||||||
|  |         }; | ||||||
| 
					
					ghe0 marked this conversation as resolved
					
						
						
							Outdated
						
					
				 | |||||||
|  |         info!("Sending UpdateVmResp: {update_vm_resp:?}"); | ||||||
|  |         let _ = resp_chan.send(update_vm_resp).await; | ||||||
|  |     }; | ||||||
|  |     warn!("update vm request handler is ending"); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | async fn send_updatevm_resp( | ||||||
|  |     mut client: BrainDaemonServiceClient<Channel>, | ||||||
|  |     rx: Receiver<UpdateVmResp>, | ||||||
|  | ) -> Result<()> { | ||||||
|  |     debug!("starting send_updatevm_resp stream"); | ||||||
|  |     let rx_stream = ReceiverStream::new(rx); | ||||||
|  |     client.send_update_vm_resp(rx_stream).await?; | ||||||
|  |     debug!("send_updatevm_resp is about to exit"); | ||||||
|  |     Ok(()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | async fn handle_vm_requests(mut req: Receiver<NewVmReq>, resp: Sender<NewVmResp>) { | ||||||
|     info!("Started to handle vm requests. 1 out of 5 requests will return error."); |     info!("Started to handle vm requests. 1 out of 5 requests will return error."); | ||||||
|     let mut i = 0; |     let mut i = 0; | ||||||
|     while let Some(new_vm) = req.recv().await { |     while let Some(new_vm) = req.recv().await { | ||||||
| @ -128,11 +180,11 @@ async fn handle_vm_requests(mut req: Receiver<NewVmRequest>, resp: Sender<NewVmC | |||||||
|             false => String::new(), |             false => String::new(), | ||||||
|         }; |         }; | ||||||
|         let public_ipv6 = match new_vm.public_ipv6 { |         let public_ipv6 = match new_vm.public_ipv6 { | ||||||
|             true => " 2a02:2f2d:d301:3100:afe8:a85e:54a0:dd28".to_string(), |             true => "2a02:2f2d:d301:3100:afe8:a85e:54a0:dd28".to_string(), | ||||||
|             false => String::new(), |             false => String::new(), | ||||||
|         }; |         }; | ||||||
|         if i != 3 { |         if i != 3 { | ||||||
|             let confirmation = NewVmConfirmation { |             let confirmation = NewVmResp{ | ||||||
|                 uuid: new_vm.uuid, |                 uuid: new_vm.uuid, | ||||||
|                 exposed_ports, |                 exposed_ports, | ||||||
|                 public_ipv4, |                 public_ipv4, | ||||||
| @ -142,7 +194,7 @@ async fn handle_vm_requests(mut req: Receiver<NewVmRequest>, resp: Sender<NewVmC | |||||||
|             info!("Sending NewVmConfirmation: {confirmation:?}"); |             info!("Sending NewVmConfirmation: {confirmation:?}"); | ||||||
|             let _ = resp.send(confirmation).await; |             let _ = resp.send(confirmation).await; | ||||||
|         } else { |         } else { | ||||||
|             let confirmation = NewVmConfirmation { |             let confirmation = NewVmResp { | ||||||
|                 uuid: new_vm.uuid, |                 uuid: new_vm.uuid, | ||||||
|                 exposed_ports: Vec::new(), |                 exposed_ports: Vec::new(), | ||||||
|                 public_ipv4: String::new(), |                 public_ipv4: String::new(), | ||||||
| @ -172,12 +224,25 @@ async fn connect_and_run() -> Result<()> { | |||||||
| 
 | 
 | ||||||
|     let confirm_client = client.clone(); |     let confirm_client = client.clone(); | ||||||
|     let (confirm_tx, rx) = tokio::sync::mpsc::channel(6); |     let (confirm_tx, rx) = tokio::sync::mpsc::channel(6); | ||||||
|     streaming_tasks.spawn(send_confirmations(confirm_client, rx)); |     streaming_tasks.spawn(send_newvm_resp(confirm_client, rx)); | ||||||
| 
 | 
 | ||||||
|     tokio::spawn(async move { |     tokio::spawn(async move { | ||||||
|         handle_vm_requests(newvm_rx, confirm_tx).await; |         handle_vm_requests(newvm_rx, confirm_tx).await; | ||||||
|     }); |     }); | ||||||
| 
 | 
 | ||||||
|  |     let updatevm_client = client.clone(); | ||||||
|  |     let (tx, updatevm_rx) = tokio::sync::mpsc::channel(6); | ||||||
|  |     streaming_tasks.spawn(listen_for_update_vm_reqs(updatevm_client, tx)); | ||||||
|  | 
 | ||||||
|  |     let resp_client = client.clone(); | ||||||
|  |     let (resp_tx, rx) = tokio::sync::mpsc::channel(6); | ||||||
|  |     streaming_tasks.spawn(send_updatevm_resp(resp_client, rx)); | ||||||
|  | 
 | ||||||
|  |     tokio::spawn(async move { | ||||||
|  |         handle_update_vm_requests(updatevm_rx, resp_tx).await; | ||||||
|  |     }); | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|     let deletevms_client = client.clone(); |     let deletevms_client = client.clone(); | ||||||
|     let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6); |     let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6); | ||||||
|     streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx)); |     streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx)); | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user
	
The timestamp must be set from the brain, for security reasons.
Also, this does not require chrono.
Check this: