// Rust source: 135 lines, 4.3 KiB
use super::test_utils::Key;
|
|
use detee_shared::vm_proto;
|
|
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
|
|
use detee_shared::vm_proto::RegisterVmNodeReq;
|
|
use futures::StreamExt;
|
|
use tokio::sync::mpsc;
|
|
use tokio_stream::wrappers::ReceiverStream;
|
|
use tonic::transport::Channel;
|
|
|
|
pub async fn mock_vm_daemon(brain_channel: Channel) -> String {
|
|
let daemon_client = BrainVmDaemonClient::new(brain_channel);
|
|
let daemon_key = Key::new();
|
|
|
|
register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await;
|
|
|
|
let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1);
|
|
|
|
tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx));
|
|
|
|
let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1);
|
|
tokio::spawn(daemon_msg_sender(
|
|
daemon_client.clone(),
|
|
daemon_key.clone(),
|
|
daemon_msg_tx.clone(),
|
|
rx,
|
|
));
|
|
|
|
tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx));
|
|
|
|
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
|
|
|
daemon_key.pubkey
|
|
}
|
|
|
|
pub async fn register_vm_node(
|
|
mut client: BrainVmDaemonClient<Channel>,
|
|
key: Key,
|
|
operator_wallet: String,
|
|
) -> Vec<vm_proto::VmContract> {
|
|
let node_pubkey = key.pubkey.clone();
|
|
|
|
let req = RegisterVmNodeReq {
|
|
node_pubkey,
|
|
operator_wallet,
|
|
main_ip: String::from("185.243.218.213"),
|
|
city: String::from("Oslo"),
|
|
country: String::from("Norway"),
|
|
region: String::from("EU"),
|
|
price: 1200,
|
|
};
|
|
|
|
let mut grpc_stream =
|
|
client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner();
|
|
|
|
let mut vm_contracts = Vec::new();
|
|
while let Some(stream_update) = grpc_stream.next().await {
|
|
match stream_update {
|
|
Ok(vm_c) => {
|
|
vm_contracts.push(vm_c);
|
|
}
|
|
Err(e) => {
|
|
panic!("Received error instead of vm_contracts: {e:?}");
|
|
}
|
|
}
|
|
}
|
|
vm_contracts
|
|
}
|
|
|
|
pub async fn daemon_listener(
|
|
mut client: BrainVmDaemonClient<Channel>,
|
|
key: Key,
|
|
tx: mpsc::Sender<vm_proto::BrainVmMessage>,
|
|
) {
|
|
let mut grpc_stream =
|
|
client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner();
|
|
|
|
while let Some(Ok(stream_update)) = grpc_stream.next().await {
|
|
log::info!("vm deamon got notified: {:?}", &stream_update);
|
|
let _ = tx.send(stream_update).await;
|
|
}
|
|
}
|
|
|
|
pub async fn daemon_msg_sender(
|
|
mut client: BrainVmDaemonClient<Channel>,
|
|
key: Key,
|
|
tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
|
|
rx: mpsc::Receiver<vm_proto::VmDaemonMessage>,
|
|
) {
|
|
let rx_stream = ReceiverStream::new(rx);
|
|
tx.send(vm_proto::VmDaemonMessage {
|
|
msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
client.daemon_messages(rx_stream).await.unwrap();
|
|
}
|
|
|
|
pub async fn daemon_engine(
|
|
tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
|
|
mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>,
|
|
) {
|
|
while let Some(brain_msg) = rx.recv().await {
|
|
match brain_msg.msg {
|
|
Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => {
|
|
let args = Some(vm_proto::MeasurementArgs {
|
|
dtrfs_api_endpoint: String::from("184.107.169.199:48865"),
|
|
exposed_ports: new_vm_req.extra_ports,
|
|
ovmf_hash: String::from(
|
|
"0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76",
|
|
),
|
|
ips: vec![],
|
|
});
|
|
|
|
let new_vm_resp = vm_proto::NewVmResp {
|
|
uuid: new_vm_req.uuid.clone(),
|
|
args,
|
|
error: String::new(),
|
|
};
|
|
|
|
let res_data = vm_proto::VmDaemonMessage {
|
|
msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)),
|
|
};
|
|
tx.send(res_data).await.unwrap();
|
|
}
|
|
Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => {
|
|
todo!()
|
|
}
|
|
Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => {
|
|
todo!()
|
|
}
|
|
None => todo!(),
|
|
}
|
|
}
|
|
}
|