WIP: adding vm updates
finalized code for update but didn't test it
This commit is contained in:
		
							parent
							
								
									4dfaa3f465
								
							
						
					
					
						commit
						7ff71cbf0f
					
				| @ -29,6 +29,7 @@ pub mod prelude { | |||||||
|     pub use super::general::*; |     pub use super::general::*; | ||||||
|     pub use super::vm::*; |     pub use super::vm::*; | ||||||
|     pub use super::*; |     pub use super::*; | ||||||
|  |     pub use detee_shared::snp::pb::vm_proto::{MeasurementArgs, VmNodeFilters}; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub async fn db_connection( | pub async fn db_connection( | ||||||
|  | |||||||
							
								
								
									
										230
									
								
								src/db/vm.rs
									
									
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										230
									
								
								src/db/vm.rs
									
									
									
									
									
								
							| @ -2,8 +2,8 @@ use std::str::FromStr; | |||||||
| use std::time::Duration; | use std::time::Duration; | ||||||
| 
 | 
 | ||||||
| use super::Error; | use super::Error; | ||||||
| use crate::constants::{ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, VM_NODE}; | use crate::constants::{ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE}; | ||||||
| use crate::db::general::Report; | use crate::db::{MeasurementArgs, Report, VmNodeFilters}; | ||||||
| use crate::old_brain; | use crate::old_brain; | ||||||
| use serde::{Deserialize, Serialize}; | use serde::{Deserialize, Serialize}; | ||||||
| use surrealdb::engine::remote::ws::Client; | use surrealdb::engine::remote::ws::Client; | ||||||
| @ -80,7 +80,7 @@ impl VmNodeWithReports { | |||||||
|     // https://en.wikipedia.org/wiki/Dependency_inversion_principle
 |     // https://en.wikipedia.org/wiki/Dependency_inversion_principle
 | ||||||
|     pub async fn find_by_filters( |     pub async fn find_by_filters( | ||||||
|         db: &Surreal<Client>, |         db: &Surreal<Client>, | ||||||
|         filters: detee_shared::snp::pb::vm_proto::VmNodeFilters, |         filters: VmNodeFilters, | ||||||
|     ) -> Result<Vec<Self>, Error> { |     ) -> Result<Vec<Self>, Error> { | ||||||
|         let mut query = format!( |         let mut query = format!( | ||||||
|             "select *, <-report.* as reports from {VM_NODE} where
 |             "select *, <-report.* as reports from {VM_NODE} where
 | ||||||
| @ -193,35 +193,49 @@ impl NewVmReq { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| /// first string is the vm_id
 | /// first string is the vm_id
 | ||||||
| pub enum NewVmResp { | pub enum WrappedMeasurement { | ||||||
|     // TODO: find a more elegant way to do this than importing gRPC in the DB module
 |     Args(String, MeasurementArgs), | ||||||
|     // https://en.wikipedia.org/wiki/Dependency_inversion_principle
 |  | ||||||
|     Args(String, detee_shared::snp::pb::vm_proto::MeasurementArgs), |  | ||||||
|     Error(String, String), |     Error(String, String), | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl NewVmResp { | impl WrappedMeasurement { | ||||||
|     pub async fn listen(db: &Surreal<Client>, vm_id: &str) -> Result<NewVmResp, Error> { |     /// table must be NEW_VM_REQ or UPDATE_VM_REQ
 | ||||||
|  |     /// it will however be enforced if you send anything else
 | ||||||
|  |     pub async fn listen( | ||||||
|  |         db: &Surreal<Client>, | ||||||
|  |         vm_id: &str, | ||||||
|  |         table: &str, | ||||||
|  |     ) -> Result<WrappedMeasurement, Error> { | ||||||
|  |         let table = match table { | ||||||
|  |             UPDATE_VM_REQ => UPDATE_VM_REQ, | ||||||
|  |             _ => NEW_VM_REQ, | ||||||
|  |         }; | ||||||
|  |         #[derive(Deserialize)] | ||||||
|  |         struct ErrorMessage { | ||||||
|  |             error: String, | ||||||
|  |         } | ||||||
|         let mut resp = db |         let mut resp = db | ||||||
|             .query(format!("live select * from {NEW_VM_REQ} where id = {NEW_VM_REQ}:{vm_id};")) |             .query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};")) | ||||||
|             .query(format!( |             .query(format!( | ||||||
|                 "live select * from measurement_args where id = measurement_args:{vm_id};" |                 "live select * from measurement_args where id = measurement_args:{vm_id};" | ||||||
|             )) |             )) | ||||||
|             .await?; |             .await?; | ||||||
|         let mut new_vm_stream = resp.stream::<Notification<NewVmReq>>(0)?; |         let mut new_vm_stream = resp.stream::<Notification<ErrorMessage>>(0)?; | ||||||
|         let mut args_stream = |         let mut args_stream = resp.stream::<Notification<MeasurementArgs>>(1)?; | ||||||
|             resp.stream::<Notification<detee_shared::snp::pb::vm_proto::MeasurementArgs>>(1)?; |  | ||||||
| 
 | 
 | ||||||
|         tokio::time::timeout(Duration::from_secs(10), async { |         tokio::time::timeout(Duration::from_secs(10), async { | ||||||
|             loop { |             loop { | ||||||
|                 tokio::select! { |                 tokio::select! { | ||||||
|                     new_vm_req_notif = new_vm_stream.next() => { |                     new_vm_req_notif = new_vm_stream.next() => { | ||||||
|                         log::debug!("Got stream 1..."); |  | ||||||
|                         if let Some(new_vm_req_notif) = new_vm_req_notif { |                         if let Some(new_vm_req_notif) = new_vm_req_notif { | ||||||
|                             match new_vm_req_notif { |                             match new_vm_req_notif { | ||||||
|                                 Ok(new_vm_req_notif) => { |                                 Ok(new_vm_req_notif) => { | ||||||
|                                     if new_vm_req_notif.action == surrealdb::Action::Update && !new_vm_req_notif.data.error.is_empty() { |                                     if new_vm_req_notif.action == surrealdb::Action::Update | ||||||
|                                         return Ok::<NewVmResp, Error>(Self::Error(vm_id.to_string(), new_vm_req_notif.data.error)); |                                     && !new_vm_req_notif.data.error.is_empty() { | ||||||
|  |                                         return Ok::<WrappedMeasurement, Error>( | ||||||
|  |                                             Self::Error(vm_id.to_string(), | ||||||
|  |                                             new_vm_req_notif.data.error) | ||||||
|  |                                         ); | ||||||
|                                     }; |                                     }; | ||||||
|                                 }, |                                 }, | ||||||
|                                 Err(e) => return Err(e.into()), |                                 Err(e) => return Err(e.into()), | ||||||
| @ -232,7 +246,9 @@ impl NewVmResp { | |||||||
|                         if let Some(args_notif) = args_notif { |                         if let Some(args_notif) = args_notif { | ||||||
|                             match args_notif { |                             match args_notif { | ||||||
|                                 Ok(args_notif) => { |                                 Ok(args_notif) => { | ||||||
|                                     if args_notif.action == surrealdb::Action::Create { |                                     if args_notif.action == surrealdb::Action::Create || | ||||||
|  |                                         args_notif.action == surrealdb::Action::Update | ||||||
|  |                                     { | ||||||
|                                         return Ok(Self::Args(vm_id.to_string(), args_notif.data)); |                                         return Ok(Self::Args(vm_id.to_string(), args_notif.data)); | ||||||
|                                     }; |                                     }; | ||||||
|                                 }, |                                 }, | ||||||
| @ -242,11 +258,12 @@ impl NewVmResp { | |||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         }).await? |         }) | ||||||
|  |         .await? | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[derive(Debug, Serialize, Deserialize)] | #[derive(Clone, Debug, Serialize, Deserialize)] | ||||||
| pub struct ActiveVm { | pub struct ActiveVm { | ||||||
|     pub id: RecordId, |     pub id: RecordId, | ||||||
|     #[serde(rename = "in")] |     #[serde(rename = "in")] | ||||||
| @ -269,10 +286,32 @@ pub struct ActiveVm { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl ActiveVm { | impl ActiveVm { | ||||||
|  |     /// total hardware units of this VM
 | ||||||
|  |     fn total_units(&self) -> u64 { | ||||||
|  |         // TODO: Optimize this based on price of hardware.
 | ||||||
|  |         // I tried, but this can be done better.
 | ||||||
|  |         // Storage cost should also be based on tier
 | ||||||
|  |         (self.vcpus as u64 * 10) | ||||||
|  |             + ((self.memory_mb + 256) as u64 / 200) | ||||||
|  |             + (self.disk_size_gb as u64 / 10) | ||||||
|  |             + (!self.public_ipv4.is_empty() as u64 * 10) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns price per minute in nanoLP
 | ||||||
|  |     pub fn price_per_minute(&self) -> u64 { | ||||||
|  |         self.total_units() * self.price_per_unit | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> { | ||||||
|  |         let contract: Option<Self> = | ||||||
|  |             db.query(format!("select * from {ACTIVE_VM}:{uuid};")).await?.take(0)?; | ||||||
|  |         Ok(contract) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     pub async fn activate( |     pub async fn activate( | ||||||
|         db: &Surreal<Client>, |         db: &Surreal<Client>, | ||||||
|         id: &str, |         id: &str, | ||||||
|         args: detee_shared::vm_proto::MeasurementArgs, |         args: MeasurementArgs, | ||||||
|     ) -> Result<(), Error> { |     ) -> Result<(), Error> { | ||||||
|         let new_vm_req = match NewVmReq::get(db, id).await? { |         let new_vm_req = match NewVmReq::get(db, id).await? { | ||||||
|             Some(r) => r, |             Some(r) => r, | ||||||
| @ -327,6 +366,60 @@ impl ActiveVm { | |||||||
|         NewVmReq::delete(db, id).await?; |         NewVmReq::delete(db, id).await?; | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|  |     pub async fn update(db: &Surreal<Client>, id: &str) -> Result<(), Error> { | ||||||
|  |         let update_vm_req = match UpdateVmReq::get(db, id).await? { | ||||||
|  |             Some(r) => r, | ||||||
|  |             None => return Ok(()), | ||||||
|  |         }; | ||||||
|  | 
 | ||||||
|  |         let mut active_vm = match Self::get_by_uuid(db, id).await? { | ||||||
|  |             Some(vm) => vm, | ||||||
|  |             None => return Ok(()), | ||||||
|  |         }; | ||||||
|  |         let deleted_vm: DeletedVm = active_vm.clone().into(); | ||||||
|  |         let _: Vec<Self> = db.insert(DELETED_VM).relation(deleted_vm).await?; | ||||||
|  | 
 | ||||||
|  |         if update_vm_req.vcpus > 0 { | ||||||
|  |             active_vm.vcpus = update_vm_req.vcpus; | ||||||
|  |         } | ||||||
|  |         if update_vm_req.memory_mb > 0 { | ||||||
|  |             active_vm.memory_mb = update_vm_req.memory_mb; | ||||||
|  |         } | ||||||
|  |         if update_vm_req.disk_size_gb > 0 { | ||||||
|  |             active_vm.disk_size_gb = update_vm_req.disk_size_gb; | ||||||
|  |         } | ||||||
|  |         if !update_vm_req.dtrfs_sha.is_empty() && !update_vm_req.kernel_sha.is_empty() { | ||||||
|  |             active_vm.dtrfs_sha = update_vm_req.dtrfs_sha; | ||||||
|  |             active_vm.kernel_sha = update_vm_req.kernel_sha; | ||||||
|  |         } | ||||||
|  |         active_vm.created_at = update_vm_req.created_at; | ||||||
|  | 
 | ||||||
|  |         let _: Vec<ActiveVm> = db.update(()).content(active_vm).await?; | ||||||
|  | 
 | ||||||
|  |         UpdateVmReq::delete(db, id).await?; | ||||||
|  | 
 | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     pub async fn change_hostname( | ||||||
|  |         db: &Surreal<Client>, | ||||||
|  |         id: &str, | ||||||
|  |         new_hostname: &str, | ||||||
|  |     ) -> Result<bool, Error> { | ||||||
|  |         let contract: Option<Self> = db | ||||||
|  |             .query(format!( | ||||||
|  |                 "UPDATE {ACTIVE_VM}:{id} SET hostname = '{new_hostname}' RETURN BEFORE;" | ||||||
|  |             )) | ||||||
|  |             .await? | ||||||
|  |             .take(0)?; | ||||||
|  |         if let Some(contract) = contract { | ||||||
|  |             if contract.hostname != new_hostname { | ||||||
|  |                 return Ok(true); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |         Ok(false) | ||||||
|  |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[derive(Debug, Serialize, Deserialize)] | #[derive(Debug, Serialize, Deserialize)] | ||||||
| @ -344,8 +437,54 @@ pub struct UpdateVmReq { | |||||||
|     pub kernel_sha: String, |     pub kernel_sha: String, | ||||||
|     pub kernel_url: String, |     pub kernel_url: String, | ||||||
|     pub created_at: Datetime, |     pub created_at: Datetime, | ||||||
|     pub price_per_unit: u64, |     pub error: String, | ||||||
|     pub locked_nano: u64, | } | ||||||
|  | 
 | ||||||
|  | impl UpdateVmReq { | ||||||
|  |     pub async fn get(db: &Surreal<Client>, id: &str) -> Result<Option<Self>, Error> { | ||||||
|  |         let update_vm_req: Option<Self> = db.select((UPDATE_VM_REQ, id)).await?; | ||||||
|  |         Ok(update_vm_req) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     pub async fn delete(db: &Surreal<Client>, id: &str) -> Result<(), Error> { | ||||||
|  |         let _: Option<Self> = db.delete((UPDATE_VM_REQ, id)).await?; | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// returns None if VM does not exist
 | ||||||
|  |     /// returns Some(false) if hw update is not needed
 | ||||||
|  |     /// returns Some(true) if hw update is needed and got submitted
 | ||||||
|  |     /// returns error if something happened with the DB
 | ||||||
|  |     pub async fn request_hw_update(mut self, db: &Surreal<Client>) -> Result<Option<bool>, Error> { | ||||||
|  |         let contract = ActiveVm::get_by_uuid(db, &self.id.key().to_string()).await?; | ||||||
|  | 
 | ||||||
|  |         if contract.is_none() { | ||||||
|  |             return Ok(None); | ||||||
|  |         } | ||||||
|  |         let contract = contract.unwrap(); | ||||||
|  |         // this is needed cause TryFrom does not support await
 | ||||||
|  |         self.vm_node = contract.vm_node; | ||||||
|  | 
 | ||||||
|  |         if !((self.vcpus != 0 && contract.vcpus != self.vcpus) | ||||||
|  |             || (self.memory_mb != 0 && contract.memory_mb != self.memory_mb) | ||||||
|  |             || (!self.dtrfs_sha.is_empty() && contract.dtrfs_sha != self.dtrfs_sha) | ||||||
|  |             || (self.disk_size_gb != 0 && contract.disk_size_gb != self.disk_size_gb)) | ||||||
|  |         { | ||||||
|  |             return Ok(Some(false)); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         let _: Vec<Self> = db.insert(UPDATE_VM_REQ).relation(self).await?; | ||||||
|  |         Ok(Some(true)) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     pub async fn submit_error(db: &Surreal<Client>, id: &str, error: String) -> Result<(), Error> { | ||||||
|  |         #[derive(Serialize)] | ||||||
|  |         struct UpdateVmError { | ||||||
|  |             error: String, | ||||||
|  |         } | ||||||
|  |         let _: Option<Self> = db.update((UPDATE_VM_REQ, id)).merge(UpdateVmError { error }).await?; | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[derive(Debug, Serialize, Deserialize)] | #[derive(Debug, Serialize, Deserialize)] | ||||||
| @ -369,6 +508,35 @@ pub struct DeletedVm { | |||||||
|     pub price_per_unit: u64, |     pub price_per_unit: u64, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | impl From<ActiveVm> for DeletedVm { | ||||||
|  |     fn from(active_vm: ActiveVm) -> Self { | ||||||
|  |         use std::time::{SystemTime, UNIX_EPOCH}; | ||||||
|  |         let now = SystemTime::now(); | ||||||
|  |         let unix_timestamp = now.duration_since(UNIX_EPOCH).unwrap().as_secs(); | ||||||
|  | 
 | ||||||
|  |         Self { | ||||||
|  |             id: RecordId::from(( | ||||||
|  |                 DELETED_VM, | ||||||
|  |                 active_vm.id.key().to_string() + &unix_timestamp.to_string(), | ||||||
|  |             )), | ||||||
|  |             admin: active_vm.admin, | ||||||
|  |             vm_node: active_vm.vm_node, | ||||||
|  |             hostname: active_vm.hostname, | ||||||
|  |             mapped_ports: active_vm.mapped_ports, | ||||||
|  |             public_ipv4: active_vm.public_ipv4, | ||||||
|  |             public_ipv6: active_vm.public_ipv6, | ||||||
|  |             disk_size_gb: active_vm.disk_size_gb, | ||||||
|  |             vcpus: active_vm.vcpus, | ||||||
|  |             memory_mb: active_vm.memory_mb, | ||||||
|  |             dtrfs_sha: active_vm.dtrfs_sha, | ||||||
|  |             kernel_sha: active_vm.kernel_sha, | ||||||
|  |             created_at: active_vm.created_at, | ||||||
|  |             deleted_at: Datetime::default(), | ||||||
|  |             price_per_unit: active_vm.price_per_unit, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
| impl DeletedVm { | impl DeletedVm { | ||||||
|     pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> { |     pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> { | ||||||
|         let contract: Option<Self> = |         let contract: Option<Self> = | ||||||
| @ -431,24 +599,6 @@ impl DeletedVm { | |||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl ActiveVm { |  | ||||||
|     /// total hardware units of this VM
 |  | ||||||
|     fn total_units(&self) -> u64 { |  | ||||||
|         // TODO: Optimize this based on price of hardware.
 |  | ||||||
|         // I tried, but this can be done better.
 |  | ||||||
|         // Storage cost should also be based on tier
 |  | ||||||
|         (self.vcpus as u64 * 10) |  | ||||||
|             + ((self.memory_mb + 256) as u64 / 200) |  | ||||||
|             + (self.disk_size_gb as u64 / 10) |  | ||||||
|             + (!self.public_ipv4.is_empty() as u64 * 10) |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     /// Returns price per minute in nanoLP
 |  | ||||||
|     pub fn price_per_minute(&self) -> u64 { |  | ||||||
|         self.total_units() * self.price_per_unit |  | ||||||
|     } |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| #[derive(Debug, Serialize, Deserialize)] | #[derive(Debug, Serialize, Deserialize)] | ||||||
| pub struct ActiveVmWithNode { | pub struct ActiveVmWithNode { | ||||||
|     pub id: RecordId, |     pub id: RecordId, | ||||||
|  | |||||||
| @ -61,15 +61,45 @@ impl From<db::NewVmReq> for NewVmReq { | |||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl From<db::NewVmResp> for NewVmResp { | impl From<db::WrappedMeasurement> for NewVmResp { | ||||||
|     fn from(resp: db::NewVmResp) -> Self { |     fn from(resp: db::WrappedMeasurement) -> Self { | ||||||
|         match resp { |         match resp { | ||||||
|             // TODO: This will require a small architecture change to pass MeasurementArgs from
 |             db::WrappedMeasurement::Args(uuid, args) => { | ||||||
|             // Daemon to CLI
 |                 Self { uuid, error: String::new(), args: Some(args) } | ||||||
|             db::NewVmResp::Args(uuid, args) => { |  | ||||||
|                 NewVmResp { uuid, error: String::new(), args: Some(args) } |  | ||||||
|             } |             } | ||||||
|             db::NewVmResp::Error(uuid, error) => NewVmResp { uuid, error, args: None }, |             db::WrappedMeasurement::Error(uuid, error) => NewVmResp { uuid, error, args: None }, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // TODO: NewVmResp is identical to UpdateVmResp so we can actually remove it from proto
 | ||||||
|  | impl From<db::WrappedMeasurement> for UpdateVmResp { | ||||||
|  |     fn from(resp: db::WrappedMeasurement) -> Self { | ||||||
|  |         match resp { | ||||||
|  |             db::WrappedMeasurement::Args(uuid, args) => { | ||||||
|  |                 Self { uuid, error: String::new(), args: Some(args) } | ||||||
|  |             } | ||||||
|  |             db::WrappedMeasurement::Error(uuid, error) => Self { uuid, error, args: None }, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl From<UpdateVmReq> for db::UpdateVmReq { | ||||||
|  |     fn from(new_vm_req: UpdateVmReq) -> Self { | ||||||
|  |         Self { | ||||||
|  |             id: RecordId::from((NEW_VM_REQ, nanoid!(40, &ID_ALPHABET))), | ||||||
|  |             admin: RecordId::from((ACCOUNT, new_vm_req.admin_pubkey)), | ||||||
|  |             // vm_node gets modified later, and only if the db::UpdateVmReq is required
 | ||||||
|  |             vm_node: RecordId::from((VM_NODE, String::new())), | ||||||
|  |             disk_size_gb: new_vm_req.disk_size_gb, | ||||||
|  |             vcpus: new_vm_req.vcpus, | ||||||
|  |             memory_mb: new_vm_req.memory_mb, | ||||||
|  |             kernel_url: new_vm_req.kernel_url, | ||||||
|  |             kernel_sha: new_vm_req.kernel_sha, | ||||||
|  |             dtrfs_url: new_vm_req.dtrfs_url, | ||||||
|  |             dtrfs_sha: new_vm_req.dtrfs_sha, | ||||||
|  |             created_at: surrealdb::sql::Datetime::default(), | ||||||
|  |             error: String::new(), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  | |||||||
							
								
								
									
										105
									
								
								src/grpc/vm.rs
									
									
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										105
									
								
								src/grpc/vm.rs
									
									
									
									
									
								
							| @ -1,5 +1,5 @@ | |||||||
| #![allow(dead_code)] | #![allow(dead_code)] | ||||||
| use crate::constants::{ACCOUNT, VM_NODE}; | use crate::constants::{ACCOUNT, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE}; | ||||||
| use crate::db::prelude as db; | use crate::db::prelude as db; | ||||||
| use crate::grpc::{check_sig_from_parts, check_sig_from_req}; | use crate::grpc::{check_sig_from_parts, check_sig_from_req}; | ||||||
| use detee_shared::common_proto::Empty; | use detee_shared::common_proto::Empty; | ||||||
| @ -148,8 +148,6 @@ impl BrainVmDaemon for VmDaemonServer { | |||||||
|             match daemon_message { |             match daemon_message { | ||||||
|                 Ok(msg) => match msg.msg { |                 Ok(msg) => match msg.msg { | ||||||
|                     Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => { |                     Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => { | ||||||
|                         // TODO: move new_vm_req to active_vm
 |  | ||||||
|                         // also handle failure properly
 |  | ||||||
|                         if !new_vm_resp.error.is_empty() { |                         if !new_vm_resp.error.is_empty() { | ||||||
|                             db::NewVmReq::submit_error( |                             db::NewVmReq::submit_error( | ||||||
|                                 &self.db, |                                 &self.db, | ||||||
| @ -170,9 +168,24 @@ impl BrainVmDaemon for VmDaemonServer { | |||||||
|                             } |                             } | ||||||
|                         } |                         } | ||||||
|                     } |                     } | ||||||
|                     Some(vm_daemon_message::Msg::UpdateVmResp(_update_vm_resp)) => { |                     Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => { | ||||||
|                         todo!(); |                         if !update_vm_resp.error.is_empty() { | ||||||
|                         // self.data.submit_updatevm_resp(update_vm_resp).await;
 |                             db::UpdateVmReq::submit_error( | ||||||
|  |                                 &self.db, | ||||||
|  |                                 &update_vm_resp.uuid, | ||||||
|  |                                 update_vm_resp.error, | ||||||
|  |                             ) | ||||||
|  |                             .await?; | ||||||
|  |                         } else { | ||||||
|  |                             db::upsert_record( | ||||||
|  |                                 &self.db, | ||||||
|  |                                 "measurement_args", | ||||||
|  |                                 &update_vm_resp.uuid, | ||||||
|  |                                 update_vm_resp.args.clone(), | ||||||
|  |                             ) | ||||||
|  |                             .await?; | ||||||
|  |                             db::ActiveVm::update(&self.db, &update_vm_resp.uuid).await?; | ||||||
|  |                         } | ||||||
|                     } |                     } | ||||||
|                     Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => { |                     Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => { | ||||||
|                         let node_resources: db::VmNodeResources = node_resources.into(); |                         let node_resources: db::VmNodeResources = node_resources.into(); | ||||||
| @ -207,28 +220,31 @@ impl BrainVmCli for VmCliServer { | |||||||
|     async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> { |     async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> { | ||||||
|         let req = check_sig_from_req(req)?; |         let req = check_sig_from_req(req)?; | ||||||
|         info!("New VM requested via CLI: {req:?}"); |         info!("New VM requested via CLI: {req:?}"); | ||||||
|         if db::general::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { |         if db::general::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey) | ||||||
|  |             .await? | ||||||
|  |         { | ||||||
|             return Err(Status::permission_denied("This operator banned you. What did you do?")); |             return Err(Status::permission_denied("This operator banned you. What did you do?")); | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         let new_vm_req: db::NewVmReq = req.into(); |         let db_req: db::NewVmReq = req.into(); | ||||||
|         let id = new_vm_req.id.key().to_string(); |         let id = db_req.id.key().to_string(); | ||||||
| 
 |  | ||||||
|         let db = self.db.clone(); |  | ||||||
| 
 | 
 | ||||||
|         let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); |         let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); | ||||||
|  |         let db = self.db.clone(); | ||||||
|         tokio::spawn(async move { |         tokio::spawn(async move { | ||||||
|             let _ = oneshot_tx.send(db::NewVmResp::listen(&db, &id).await); |             let _ = oneshot_tx.send(db::WrappedMeasurement::listen(&db, &id, NEW_VM_REQ).await); | ||||||
|         }); |         }); | ||||||
|         new_vm_req.submit(&self.db).await?; |         db_req.submit(&self.db).await?; | ||||||
| 
 | 
 | ||||||
|         match oneshot_rx.await { |         match oneshot_rx.await { | ||||||
|             Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded("Request failed due to timeout. Please try again later or contact the DeTEE devs team.")), |             Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded( | ||||||
|  |                 "Network timeout. Please try again later or contact the DeTEE devs team.", | ||||||
|  |             )), | ||||||
|             Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())), |             Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())), | ||||||
|             Err(e) => { |             Err(e) => { | ||||||
|                 log::error!("Something weird happened. Reached error {e:?}"); |                 log::error!("Something weird happened during CLI NewVmReq. Reached error {e:?}"); | ||||||
|                 Err(Status::unknown( |                 Err(Status::unknown( | ||||||
|                     "Request failed due to unknown error. Please try again or contact the DeTEE devs team.", |                     "Unknown error. Please try again or contact the DeTEE devs team.", | ||||||
|                 )) |                 )) | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
| @ -237,18 +253,51 @@ impl BrainVmCli for VmCliServer { | |||||||
|     async fn update_vm(&self, req: Request<UpdateVmReq>) -> Result<Response<UpdateVmResp>, Status> { |     async fn update_vm(&self, req: Request<UpdateVmReq>) -> Result<Response<UpdateVmResp>, Status> { | ||||||
|         let req = check_sig_from_req(req)?; |         let req = check_sig_from_req(req)?; | ||||||
|         info!("Update VM requested via CLI: {req:?}"); |         info!("Update VM requested via CLI: {req:?}"); | ||||||
|         todo!(); | 
 | ||||||
|         // let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
 |         let db_req: db::UpdateVmReq = req.clone().into(); | ||||||
|         // self.data.submit_updatevm_req(req, oneshot_tx).await;
 |         let id = db_req.id.key().to_string(); | ||||||
|         // match oneshot_rx.await {
 |         let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); | ||||||
|         //     Ok(response) => {
 |         let db = self.db.clone(); | ||||||
|         //         info!("Sending UpdateVMResp: {response:?}");
 |         tokio::spawn(async move { | ||||||
|         //         Ok(Response::new(response))
 |             let _ = oneshot_tx.send(db::WrappedMeasurement::listen(&db, &id, UPDATE_VM_REQ).await); | ||||||
|         //     }
 |         }); | ||||||
|         //     Err(e) => Err(Status::unknown(format!(
 |         let hw_change_submitted = db_req.request_hw_update(&self.db).await?; | ||||||
|         //         "Update VM request failed due to error: {e}"
 | 
 | ||||||
|         //     ))),
 |         if hw_change_submitted.is_none() { | ||||||
|         // }
 |             return Ok(Response::new(UpdateVmResp { | ||||||
|  |                 uuid: req.uuid.clone(), | ||||||
|  |                 error: "VM Contract does not exist.".to_string(), | ||||||
|  |                 args: None, | ||||||
|  |             })); | ||||||
|  |         } | ||||||
|  |         let hw_change_submitted = hw_change_submitted.unwrap(); | ||||||
|  | 
 | ||||||
|  |         let mut hostname_changed = false; | ||||||
|  |         if !req.hostname.is_empty() { | ||||||
|  |             hostname_changed = | ||||||
|  |                 db::ActiveVm::change_hostname(&self.db, &req.uuid, &req.hostname).await?; | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         if !hw_change_submitted && !hostname_changed { | ||||||
|  |             return Ok(Response::new(UpdateVmResp { | ||||||
|  |                 uuid: req.uuid.clone(), | ||||||
|  |                 error: "No modification required".to_string(), | ||||||
|  |                 args: None, | ||||||
|  |             })); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         match oneshot_rx.await { | ||||||
|  |             Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded( | ||||||
|  |                 "Network timeout. Please try again later or contact the DeTEE devs team.", | ||||||
|  |             )), | ||||||
|  |             Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())), | ||||||
|  |             Err(e) => { | ||||||
|  |                 log::error!("Something weird happened during CLI VM Update. Reached error {e:?}"); | ||||||
|  |                 Err(Status::unknown( | ||||||
|  |                     "Unknown error. Please try again or contact the DeTEE devs team.", | ||||||
|  |                 )) | ||||||
|  |             } | ||||||
|  |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     async fn extend_vm(&self, req: Request<ExtendVmReq>) -> Result<Response<Empty>, Status> { |     async fn extend_vm(&self, req: Request<ExtendVmReq>) -> Result<Response<Empty>, Status> { | ||||||
|  | |||||||
| @ -5,10 +5,8 @@ use dotenv::dotenv; | |||||||
| use hyper_util::rt::TokioIo; | use hyper_util::rt::TokioIo; | ||||||
| use std::net::SocketAddr; | use std::net::SocketAddr; | ||||||
| use std::sync::Arc; | use std::sync::Arc; | ||||||
| use surreal_brain::grpc::{ | use surreal_brain::grpc::general::GeneralCliServer; | ||||||
|     general::GeneralCliServer, | use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer}; | ||||||
|     vm::{VmCliServer, VmDaemonServer}, |  | ||||||
| }; |  | ||||||
| use surrealdb::engine::remote::ws::Client; | use surrealdb::engine::remote::ws::Client; | ||||||
| use surrealdb::Surreal; | use surrealdb::Surreal; | ||||||
| use tokio::io::DuplexStream; | use tokio::io::DuplexStream; | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user