Compare commits

..

2 Commits

Author SHA1 Message Date
0367d70ef3
fix: refund on new_vm error
refund locked_nano on daemon returns error on new_vm
fix live select error on new_vm_req
refactor mock daemon for return error
extensive tests for new_vm
2025-05-29 17:37:53 +05:30
05759a5149
improve query handling in new_vm
enhanced transaction error handling
bind all string input into NewVmReq submission query
2025-05-28 17:17:26 +05:30
6 changed files with 178 additions and 58 deletions

@ -191,63 +191,120 @@ impl NewVmReq {
} }
pub async fn submit_error(db: &Surreal<Client>, id: &str, error: String) -> Result<(), Error> { pub async fn submit_error(db: &Surreal<Client>, id: &str, error: String) -> Result<(), Error> {
#[derive(Serialize)] let tx_query = String::from(
struct NewVmError { "
error: String, BEGIN TRANSACTION;
LET $new_vm_req = $new_vm_req_input;
LET $error = $error_input;
LET $record = (select * from $new_vm_req)[0];
LET $admin = $record.in;
if $record == None {{
THROW 'vm req not exist ' + <string>$new_vm_req
}};
UPDATE $new_vm_req SET error = $error;
UPDATE $admin SET tmp_locked -= $record.locked_nano;
UPDATE $admin SET balance += $record.locked_nano;
COMMIT TRANSACTION;",
);
log::trace!("submit_new_vm_err query: {tx_query}");
let mut query_resp = db
.query(tx_query)
.bind(("new_vm_req_input", RecordId::from((NEW_VM_REQ, id))))
.bind(("error_input", error))
.await?;
let query_err = query_resp.take_errors();
if !query_err.is_empty() {
log::trace!("errors in submit_new_vm_err: {query_err:?}");
let tx_fail_err_str =
String::from("The query was not executed due to a failed transaction");
if query_err.contains_key(&4) && query_err[&4].to_string() != tx_fail_err_str {
log::error!("vm req not exist: {}", query_err[&4]);
return Err(Error::ContractNotFound);
} else {
log::error!("Unknown error in submit_new_vm_err: {query_err:?}");
return Err(Error::Unknown("submit_new_vm_req".to_string()));
}
} }
let _: Option<Self> = db.update((NEW_VM_REQ, id)).merge(NewVmError { error }).await?;
// TODO: IMP refund tmp_locked
Ok(()) Ok(())
} }
pub async fn submit(self, db: &Surreal<Client>) -> Result<(), Error> { pub async fn submit(self, db: &Surreal<Client>) -> Result<(), Error> {
let locked_nano = self.locked_nano; let locked_nano = self.locked_nano;
let account = self.admin.key().to_string(); let tx_query = format!("
let vm_id = self.id.key().to_string();
let vm_node = self.vm_node.key().to_string();
// TODO: check for possible injection and maybe use .bind()
let query = format!(
"
BEGIN TRANSACTION; BEGIN TRANSACTION;
UPDATE account:{account} SET balance -= {locked_nano};
IF account:{account}.balance < 0 {{ LET $account = $account_input;
THROW 'Insufficient funds.' LET $new_vm_req = $new_vm_req_input;
}}; LET $vm_node = $vm_node_input;
UPDATE account:{account} SET tmp_locked += {locked_nano}; LET $hostname = $hostname_input;
RELATE LET $dtrfs_url = $dtrfs_url_input;
account:{account} LET $dtrfs_sha = $dtrfs_sha_input;
->new_vm_req:{vm_id} LET $kernel_url = $kernel_url_input;
->vm_node:{vm_node} LET $kernel_sha = $kernel_sha_input;
CONTENT {{
created_at: time::now(), hostname: '{}', vcpus: {}, memory_mb: {}, disk_size_gb: {}, UPDATE $account SET balance -= {locked_nano};
extra_ports: {:?}, public_ipv4: {:?}, public_ipv6: {:?}, IF $account.balance < 0 {{
dtrfs_url: '{}', dtrfs_sha: '{}', kernel_url: '{}', kernel_sha: '{}', THROW 'Insufficient funds.'
price_per_unit: {}, locked_nano: {locked_nano}, error: '' }};
}}; UPDATE $account SET tmp_locked += {locked_nano};
COMMIT TRANSACTION; RELATE
", $account
self.hostname, ->$new_vm_req
->$vm_node
CONTENT {{
created_at: time::now(), hostname: $hostname, vcpus: {}, memory_mb: {}, disk_size_gb: {},
extra_ports: {:?}, public_ipv4: {}, public_ipv6: {},
dtrfs_url: $dtrfs_url, dtrfs_sha: $dtrfs_sha, kernel_url: $kernel_url, kernel_sha: $kernel_sha,
price_per_unit: {}, locked_nano: {locked_nano}, error: ''
}};
COMMIT TRANSACTION;",
self.vcpus, self.vcpus,
self.memory_mb, self.memory_mb,
self.disk_size_gb, self.disk_size_gb,
self.extra_ports, self.extra_ports,
self.public_ipv4, self.public_ipv4,
self.public_ipv6, self.public_ipv6,
self.dtrfs_url,
self.dtrfs_sha,
self.kernel_url,
self.kernel_sha,
self.price_per_unit self.price_per_unit
); );
//let _: Vec<Self> = db.insert(NEW_VM_REQ).relation(self).await?;
let mut query_resp = db.query(query).await?;
let resp_err = query_resp.take_errors();
if let Some(surrealdb::Error::Api(surrealdb::error::Api::Query(tx_query_error))) = log::trace!("submit_new_vm_req query: {tx_query}");
resp_err.get(&1)
{ let mut query_resp = db
log::error!("Transaction error: {tx_query_error}"); .query(tx_query)
return Err(Error::InsufficientFunds); .bind(("account_input", self.admin))
.bind(("new_vm_req_input", self.id))
.bind(("vm_node_input", self.vm_node))
.bind(("hostname_input", self.hostname))
.bind(("dtrfs_url_input", self.dtrfs_url))
.bind(("dtrfs_sha_input", self.dtrfs_sha))
.bind(("kernel_url_input", self.kernel_url))
.bind(("kernel_sha_input", self.kernel_sha))
.await?;
let query_err = query_resp.take_errors();
if !query_err.is_empty() {
log::trace!("errors in submit_new_vm_req: {query_err:?}");
let tx_fail_err_str =
String::from("The query was not executed due to a failed transaction");
if query_err.contains_key(&9) && query_err[&9].to_string() != tx_fail_err_str {
log::error!("Transaction error: {}", query_err[&9]);
return Err(Error::InsufficientFunds);
} else {
log::error!("Unknown error in submit_new_vm_req: {query_err:?}");
return Err(Error::Unknown("submit_new_vm_req".to_string()));
}
} }
Ok(()) Ok(())
@ -272,14 +329,18 @@ impl WrappedMeasurement {
UPDATE_VM_REQ => UPDATE_VM_REQ, UPDATE_VM_REQ => UPDATE_VM_REQ,
_ => NEW_VM_REQ, _ => NEW_VM_REQ,
}; };
let mut resp = db
.query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};")) let mut args_stream = db
.query(format!( .query(format!(
"live select * from measurement_args where id = measurement_args:{vm_id};" "live select * from measurement_args where id = measurement_args:{vm_id};"
)) ))
.await?; .await?
let mut error_stream = resp.stream::<Notification<ErrorFromTable>>(0)?; .stream::<Notification<vm_proto::MeasurementArgs>>(0)?;
let mut args_stream = resp.stream::<Notification<vm_proto::MeasurementArgs>>(1)?;
let mut error_stream = db
.query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};"))
.await?
.stream::<Notification<ErrorFromTable>>(0)?;
let args: Option<vm_proto::MeasurementArgs> = let args: Option<vm_proto::MeasurementArgs> =
db.delete(("measurement_args", vm_id)).await?; db.delete(("measurement_args", vm_id)).await?;

@ -46,7 +46,7 @@ pub async fn create_new_vm(
assert!(new_vm_resp.uuid.len() == 40); assert!(new_vm_resp.uuid.len() == 40);
// wait for update db // wait for update db
tokio::time::sleep(tokio::time::Duration::from_millis(700)).await; tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let vm_req_db: Option<db::NewVmReq> = db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await?; let vm_req_db: Option<db::NewVmReq> = db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await?;

@ -8,7 +8,10 @@ use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Channel; use tonic::transport::Channel;
pub async fn mock_vm_daemon(brain_channel: &Channel) -> Result<String> { pub async fn mock_vm_daemon(
brain_channel: &Channel,
daemon_error: Option<String>,
) -> Result<String> {
let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone()); let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone());
let daemon_key = Key::new(); let daemon_key = Key::new();
@ -26,7 +29,7 @@ pub async fn mock_vm_daemon(brain_channel: &Channel) -> Result<String> {
rx, rx,
)); ));
tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx)); tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx, daemon_error));
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
@ -102,6 +105,7 @@ pub async fn daemon_msg_sender(
pub async fn daemon_engine( pub async fn daemon_engine(
tx: mpsc::Sender<vm_proto::VmDaemonMessage>, tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>, mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>,
new_vm_err: Option<String>,
) -> Result<()> { ) -> Result<()> {
log::info!("daemon engine vm_daemon"); log::info!("daemon engine vm_daemon");
while let Some(brain_msg) = rx.recv().await { while let Some(brain_msg) = rx.recv().await {
@ -120,7 +124,7 @@ pub async fn daemon_engine(
let new_vm_resp = vm_proto::NewVmResp { let new_vm_resp = vm_proto::NewVmResp {
uuid: new_vm_req.uuid.clone(), uuid: new_vm_req.uuid.clone(),
args, args,
error: String::new(), error: new_vm_err.clone().unwrap_or_default(),
}; };
let res_data = vm_proto::VmDaemonMessage { let res_data = vm_proto::VmDaemonMessage {

@ -116,7 +116,7 @@ async fn test_report_node() {
let db = prepare_test_db().await.unwrap(); let db = prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await.unwrap(); let brain_channel = run_service_for_stream().await.unwrap();
let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap(); let daemon_key = mock_vm_daemon(&brain_channel, None).await.unwrap();
let key = Key::new(); let key = Key::new();

@ -2,24 +2,30 @@ use common::prepare_test_env::{prepare_test_db, run_service_for_stream};
use common::test_utils::Key; use common::test_utils::Key;
use common::vm_cli_utils::{airdrop, create_new_vm, user_list_vm_contracts}; use common::vm_cli_utils::{airdrop, create_new_vm, user_list_vm_contracts};
use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node};
use detee_shared::vm_proto;
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
use detee_shared::vm_proto::{ExtendVmReq, ListVmContractsReq, NewVmReq}; use detee_shared::vm_proto::{ExtendVmReq, ListVmContractsReq, NewVmReq};
use futures::StreamExt; use futures::StreamExt;
use std::vec; use std::vec;
use surreal_brain::constants::{ACCOUNT, ACTIVE_VM}; use surreal_brain::constants::{ACCOUNT, ACTIVE_VM, NEW_VM_REQ, TOKEN_DECIMAL};
use surreal_brain::db::prelude as db; use surreal_brain::db::prelude as db;
use surreal_brain::db::vm::ActiveVm;
mod common; mod common;
#[tokio::test] #[tokio::test]
async fn test_vm_creation() { async fn test_vm_creation() {
let db = prepare_test_db().await.unwrap(); let db = prepare_test_db().await.unwrap();
// env_logger::builder().filter_level(log::LevelFilter::Error).init(); /*
env_logger::builder()
.filter_level(log::LevelFilter::Debug)
.filter_module("tungstenite", log::LevelFilter::Debug)
.filter_module("tokio_tungstenite", log::LevelFilter::Debug)
.init();
*/
let brain_channel = run_service_for_stream().await.unwrap(); let brain_channel = run_service_for_stream().await.unwrap();
let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap(); let daemon_key = mock_vm_daemon(&brain_channel, None).await.unwrap();
let key = Key::new(); let key = Key::new();
@ -29,11 +35,60 @@ async fn test_vm_creation() {
let grpc_error_message = new_vm_resp.err().unwrap().to_string(); let grpc_error_message = new_vm_resp.err().unwrap().to_string();
assert!(grpc_error_message.contains("Insufficient funds")); assert!(grpc_error_message.contains("Insufficient funds"));
airdrop(&brain_channel, &key.pubkey, 10).await.unwrap(); let airdrop_amount = 10;
airdrop(&brain_channel, &key.pubkey, airdrop_amount).await.unwrap();
let new_vm_id = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap(); let new_vm_id = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap();
let active_vm: Option<ActiveVm> = db.select((ACTIVE_VM, new_vm_id)).await.unwrap(); let active_vm: Option<db::ActiveVm> = db.select((ACTIVE_VM, new_vm_id)).await.unwrap();
assert!(active_vm.is_some()); assert!(active_vm.is_some());
let daemon_key_02 =
mock_vm_daemon(&brain_channel, Some("something went wrong 04".to_string())).await.unwrap();
let locking_nano = 100;
let mut new_vm_req = vm_proto::NewVmReq {
admin_pubkey: key.pubkey.clone(),
node_pubkey: daemon_key_02.clone(),
price_per_unit: 1200,
extra_ports: vec![8080, 8081],
locked_nano: locking_nano,
..Default::default()
};
let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone());
let new_vm_resp = client_vm_cli
.new_vm(key.sign_request(new_vm_req.clone()).unwrap())
.await
.unwrap()
.into_inner();
assert!(!new_vm_resp.error.is_empty());
let vm_req_db: Option<db::NewVmReq> =
db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await.unwrap();
if let Some(new_vm_req) = vm_req_db {
assert!(!new_vm_req.error.is_empty());
assert_eq!(new_vm_resp.error, new_vm_req.error);
}
let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL);
assert_eq!(acc_db.tmp_locked, 0);
new_vm_req.node_pubkey = daemon_key;
let new_vm_resp =
client_vm_cli.new_vm(key.sign_request(new_vm_req).unwrap()).await.unwrap().into_inner();
assert!(new_vm_resp.error.is_empty());
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let active_vm: Option<db::ActiveVm> = db.select((ACTIVE_VM, new_vm_resp.uuid)).await.unwrap();
assert!(active_vm.is_some());
let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - locking_nano);
assert_eq!(acc_db.tmp_locked, 0);
} }
#[tokio::test] #[tokio::test]

@ -29,7 +29,7 @@ async fn test_brain_message() {
prepare_test_db().await.unwrap(); prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await.unwrap(); let brain_channel = run_service_for_stream().await.unwrap();
let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap(); let daemon_key = mock_vm_daemon(&brain_channel, None).await.unwrap();
let mut cli_client = BrainVmCliClient::new(brain_channel.clone()); let mut cli_client = BrainVmCliClient::new(brain_channel.clone());
let cli_key = Key::new(); let cli_key = Key::new();