fix: refund on new_vm error
refund locked_nano on daemon returns error on new_vm fix live select error on new_vm_req refactor mock daemon for return error extensive tests for new_vm
This commit is contained in:
parent
05759a5149
commit
0367d70ef3
66
src/db/vm.rs
66
src/db/vm.rs
@ -191,12 +191,50 @@ impl NewVmReq {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn submit_error(db: &Surreal<Client>, id: &str, error: String) -> Result<(), Error> {
|
pub async fn submit_error(db: &Surreal<Client>, id: &str, error: String) -> Result<(), Error> {
|
||||||
#[derive(Serialize)]
|
let tx_query = String::from(
|
||||||
struct NewVmError {
|
"
|
||||||
error: String,
|
BEGIN TRANSACTION;
|
||||||
|
|
||||||
|
LET $new_vm_req = $new_vm_req_input;
|
||||||
|
LET $error = $error_input;
|
||||||
|
LET $record = (select * from $new_vm_req)[0];
|
||||||
|
LET $admin = $record.in;
|
||||||
|
|
||||||
|
if $record == None {{
|
||||||
|
THROW 'vm req not exist ' + <string>$new_vm_req
|
||||||
|
}};
|
||||||
|
|
||||||
|
UPDATE $new_vm_req SET error = $error;
|
||||||
|
|
||||||
|
UPDATE $admin SET tmp_locked -= $record.locked_nano;
|
||||||
|
UPDATE $admin SET balance += $record.locked_nano;
|
||||||
|
|
||||||
|
COMMIT TRANSACTION;",
|
||||||
|
);
|
||||||
|
|
||||||
|
log::trace!("submit_new_vm_err query: {tx_query}");
|
||||||
|
|
||||||
|
let mut query_resp = db
|
||||||
|
.query(tx_query)
|
||||||
|
.bind(("new_vm_req_input", RecordId::from((NEW_VM_REQ, id))))
|
||||||
|
.bind(("error_input", error))
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let query_err = query_resp.take_errors();
|
||||||
|
if !query_err.is_empty() {
|
||||||
|
log::trace!("errors in submit_new_vm_err: {query_err:?}");
|
||||||
|
let tx_fail_err_str =
|
||||||
|
String::from("The query was not executed due to a failed transaction");
|
||||||
|
|
||||||
|
if query_err.contains_key(&4) && query_err[&4].to_string() != tx_fail_err_str {
|
||||||
|
log::error!("vm req not exist: {}", query_err[&4]);
|
||||||
|
return Err(Error::ContractNotFound);
|
||||||
|
} else {
|
||||||
|
log::error!("Unknown error in submit_new_vm_err: {query_err:?}");
|
||||||
|
return Err(Error::Unknown("submit_new_vm_req".to_string()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
let _: Option<Self> = db.update((NEW_VM_REQ, id)).merge(NewVmError { error }).await?;
|
|
||||||
// TODO: IMP refund tmp_locked
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -239,6 +277,9 @@ impl NewVmReq {
|
|||||||
self.public_ipv6,
|
self.public_ipv6,
|
||||||
self.price_per_unit
|
self.price_per_unit
|
||||||
);
|
);
|
||||||
|
|
||||||
|
log::trace!("submit_new_vm_req query: {tx_query}");
|
||||||
|
|
||||||
let mut query_resp = db
|
let mut query_resp = db
|
||||||
.query(tx_query)
|
.query(tx_query)
|
||||||
.bind(("account_input", self.admin))
|
.bind(("account_input", self.admin))
|
||||||
@ -253,6 +294,7 @@ impl NewVmReq {
|
|||||||
|
|
||||||
let query_err = query_resp.take_errors();
|
let query_err = query_resp.take_errors();
|
||||||
if !query_err.is_empty() {
|
if !query_err.is_empty() {
|
||||||
|
log::trace!("errors in submit_new_vm_req: {query_err:?}");
|
||||||
let tx_fail_err_str =
|
let tx_fail_err_str =
|
||||||
String::from("The query was not executed due to a failed transaction");
|
String::from("The query was not executed due to a failed transaction");
|
||||||
|
|
||||||
@ -287,14 +329,18 @@ impl WrappedMeasurement {
|
|||||||
UPDATE_VM_REQ => UPDATE_VM_REQ,
|
UPDATE_VM_REQ => UPDATE_VM_REQ,
|
||||||
_ => NEW_VM_REQ,
|
_ => NEW_VM_REQ,
|
||||||
};
|
};
|
||||||
let mut resp = db
|
|
||||||
.query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};"))
|
let mut args_stream = db
|
||||||
.query(format!(
|
.query(format!(
|
||||||
"live select * from measurement_args where id = measurement_args:{vm_id};"
|
"live select * from measurement_args where id = measurement_args:{vm_id};"
|
||||||
))
|
))
|
||||||
.await?;
|
.await?
|
||||||
let mut error_stream = resp.stream::<Notification<ErrorFromTable>>(0)?;
|
.stream::<Notification<vm_proto::MeasurementArgs>>(0)?;
|
||||||
let mut args_stream = resp.stream::<Notification<vm_proto::MeasurementArgs>>(1)?;
|
|
||||||
|
let mut error_stream = db
|
||||||
|
.query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};"))
|
||||||
|
.await?
|
||||||
|
.stream::<Notification<ErrorFromTable>>(0)?;
|
||||||
|
|
||||||
let args: Option<vm_proto::MeasurementArgs> =
|
let args: Option<vm_proto::MeasurementArgs> =
|
||||||
db.delete(("measurement_args", vm_id)).await?;
|
db.delete(("measurement_args", vm_id)).await?;
|
||||||
|
@ -46,7 +46,7 @@ pub async fn create_new_vm(
|
|||||||
assert!(new_vm_resp.uuid.len() == 40);
|
assert!(new_vm_resp.uuid.len() == 40);
|
||||||
|
|
||||||
// wait for update db
|
// wait for update db
|
||||||
tokio::time::sleep(tokio::time::Duration::from_millis(700)).await;
|
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||||
|
|
||||||
let vm_req_db: Option<db::NewVmReq> = db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await?;
|
let vm_req_db: Option<db::NewVmReq> = db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await?;
|
||||||
|
|
||||||
|
@ -8,7 +8,10 @@ use tokio::sync::mpsc;
|
|||||||
use tokio_stream::wrappers::ReceiverStream;
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
use tonic::transport::Channel;
|
use tonic::transport::Channel;
|
||||||
|
|
||||||
pub async fn mock_vm_daemon(brain_channel: &Channel) -> Result<String> {
|
pub async fn mock_vm_daemon(
|
||||||
|
brain_channel: &Channel,
|
||||||
|
daemon_error: Option<String>,
|
||||||
|
) -> Result<String> {
|
||||||
let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone());
|
let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone());
|
||||||
let daemon_key = Key::new();
|
let daemon_key = Key::new();
|
||||||
|
|
||||||
@ -26,7 +29,7 @@ pub async fn mock_vm_daemon(brain_channel: &Channel) -> Result<String> {
|
|||||||
rx,
|
rx,
|
||||||
));
|
));
|
||||||
|
|
||||||
tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx));
|
tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx, daemon_error));
|
||||||
|
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||||
|
|
||||||
@ -102,6 +105,7 @@ pub async fn daemon_msg_sender(
|
|||||||
pub async fn daemon_engine(
|
pub async fn daemon_engine(
|
||||||
tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
|
tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
|
||||||
mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>,
|
mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>,
|
||||||
|
new_vm_err: Option<String>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
log::info!("daemon engine vm_daemon");
|
log::info!("daemon engine vm_daemon");
|
||||||
while let Some(brain_msg) = rx.recv().await {
|
while let Some(brain_msg) = rx.recv().await {
|
||||||
@ -120,7 +124,7 @@ pub async fn daemon_engine(
|
|||||||
let new_vm_resp = vm_proto::NewVmResp {
|
let new_vm_resp = vm_proto::NewVmResp {
|
||||||
uuid: new_vm_req.uuid.clone(),
|
uuid: new_vm_req.uuid.clone(),
|
||||||
args,
|
args,
|
||||||
error: String::new(),
|
error: new_vm_err.clone().unwrap_or_default(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let res_data = vm_proto::VmDaemonMessage {
|
let res_data = vm_proto::VmDaemonMessage {
|
||||||
|
@ -116,7 +116,7 @@ async fn test_report_node() {
|
|||||||
let db = prepare_test_db().await.unwrap();
|
let db = prepare_test_db().await.unwrap();
|
||||||
|
|
||||||
let brain_channel = run_service_for_stream().await.unwrap();
|
let brain_channel = run_service_for_stream().await.unwrap();
|
||||||
let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap();
|
let daemon_key = mock_vm_daemon(&brain_channel, None).await.unwrap();
|
||||||
|
|
||||||
let key = Key::new();
|
let key = Key::new();
|
||||||
|
|
||||||
|
@ -2,24 +2,30 @@ use common::prepare_test_env::{prepare_test_db, run_service_for_stream};
|
|||||||
use common::test_utils::Key;
|
use common::test_utils::Key;
|
||||||
use common::vm_cli_utils::{airdrop, create_new_vm, user_list_vm_contracts};
|
use common::vm_cli_utils::{airdrop, create_new_vm, user_list_vm_contracts};
|
||||||
use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node};
|
use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node};
|
||||||
|
use detee_shared::vm_proto;
|
||||||
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
|
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
|
||||||
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
|
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
|
||||||
use detee_shared::vm_proto::{ExtendVmReq, ListVmContractsReq, NewVmReq};
|
use detee_shared::vm_proto::{ExtendVmReq, ListVmContractsReq, NewVmReq};
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use std::vec;
|
use std::vec;
|
||||||
use surreal_brain::constants::{ACCOUNT, ACTIVE_VM};
|
use surreal_brain::constants::{ACCOUNT, ACTIVE_VM, NEW_VM_REQ, TOKEN_DECIMAL};
|
||||||
use surreal_brain::db::prelude as db;
|
use surreal_brain::db::prelude as db;
|
||||||
use surreal_brain::db::vm::ActiveVm;
|
|
||||||
|
|
||||||
mod common;
|
mod common;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_vm_creation() {
|
async fn test_vm_creation() {
|
||||||
let db = prepare_test_db().await.unwrap();
|
let db = prepare_test_db().await.unwrap();
|
||||||
// env_logger::builder().filter_level(log::LevelFilter::Error).init();
|
/*
|
||||||
|
env_logger::builder()
|
||||||
|
.filter_level(log::LevelFilter::Debug)
|
||||||
|
.filter_module("tungstenite", log::LevelFilter::Debug)
|
||||||
|
.filter_module("tokio_tungstenite", log::LevelFilter::Debug)
|
||||||
|
.init();
|
||||||
|
*/
|
||||||
|
|
||||||
let brain_channel = run_service_for_stream().await.unwrap();
|
let brain_channel = run_service_for_stream().await.unwrap();
|
||||||
let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap();
|
let daemon_key = mock_vm_daemon(&brain_channel, None).await.unwrap();
|
||||||
|
|
||||||
let key = Key::new();
|
let key = Key::new();
|
||||||
|
|
||||||
@ -29,11 +35,60 @@ async fn test_vm_creation() {
|
|||||||
let grpc_error_message = new_vm_resp.err().unwrap().to_string();
|
let grpc_error_message = new_vm_resp.err().unwrap().to_string();
|
||||||
assert!(grpc_error_message.contains("Insufficient funds"));
|
assert!(grpc_error_message.contains("Insufficient funds"));
|
||||||
|
|
||||||
airdrop(&brain_channel, &key.pubkey, 10).await.unwrap();
|
let airdrop_amount = 10;
|
||||||
|
airdrop(&brain_channel, &key.pubkey, airdrop_amount).await.unwrap();
|
||||||
|
|
||||||
let new_vm_id = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap();
|
let new_vm_id = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap();
|
||||||
let active_vm: Option<ActiveVm> = db.select((ACTIVE_VM, new_vm_id)).await.unwrap();
|
let active_vm: Option<db::ActiveVm> = db.select((ACTIVE_VM, new_vm_id)).await.unwrap();
|
||||||
assert!(active_vm.is_some());
|
assert!(active_vm.is_some());
|
||||||
|
|
||||||
|
let daemon_key_02 =
|
||||||
|
mock_vm_daemon(&brain_channel, Some("something went wrong 04".to_string())).await.unwrap();
|
||||||
|
|
||||||
|
let locking_nano = 100;
|
||||||
|
let mut new_vm_req = vm_proto::NewVmReq {
|
||||||
|
admin_pubkey: key.pubkey.clone(),
|
||||||
|
node_pubkey: daemon_key_02.clone(),
|
||||||
|
price_per_unit: 1200,
|
||||||
|
extra_ports: vec![8080, 8081],
|
||||||
|
locked_nano: locking_nano,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone());
|
||||||
|
let new_vm_resp = client_vm_cli
|
||||||
|
.new_vm(key.sign_request(new_vm_req.clone()).unwrap())
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.into_inner();
|
||||||
|
|
||||||
|
assert!(!new_vm_resp.error.is_empty());
|
||||||
|
|
||||||
|
let vm_req_db: Option<db::NewVmReq> =
|
||||||
|
db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await.unwrap();
|
||||||
|
|
||||||
|
if let Some(new_vm_req) = vm_req_db {
|
||||||
|
assert!(!new_vm_req.error.is_empty());
|
||||||
|
assert_eq!(new_vm_resp.error, new_vm_req.error);
|
||||||
|
}
|
||||||
|
|
||||||
|
let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
|
||||||
|
assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL);
|
||||||
|
assert_eq!(acc_db.tmp_locked, 0);
|
||||||
|
|
||||||
|
new_vm_req.node_pubkey = daemon_key;
|
||||||
|
|
||||||
|
let new_vm_resp =
|
||||||
|
client_vm_cli.new_vm(key.sign_request(new_vm_req).unwrap()).await.unwrap().into_inner();
|
||||||
|
assert!(new_vm_resp.error.is_empty());
|
||||||
|
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||||
|
let active_vm: Option<db::ActiveVm> = db.select((ACTIVE_VM, new_vm_resp.uuid)).await.unwrap();
|
||||||
|
assert!(active_vm.is_some());
|
||||||
|
|
||||||
|
let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
|
||||||
|
assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - locking_nano);
|
||||||
|
assert_eq!(acc_db.tmp_locked, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
@ -29,7 +29,7 @@ async fn test_brain_message() {
|
|||||||
prepare_test_db().await.unwrap();
|
prepare_test_db().await.unwrap();
|
||||||
|
|
||||||
let brain_channel = run_service_for_stream().await.unwrap();
|
let brain_channel = run_service_for_stream().await.unwrap();
|
||||||
let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap();
|
let daemon_key = mock_vm_daemon(&brain_channel, None).await.unwrap();
|
||||||
let mut cli_client = BrainVmCliClient::new(brain_channel.clone());
|
let mut cli_client = BrainVmCliClient::new(brain_channel.clone());
|
||||||
|
|
||||||
let cli_key = Key::new();
|
let cli_key = Key::new();
|
||||||
|
Loading…
Reference in New Issue
Block a user