added code to process new vm request

This commit is contained in:
ghe0 2024-12-20 21:56:34 +02:00
parent f1d9bcb07a
commit b14c590d01
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
5 changed files with 247 additions and 70 deletions

75
Cargo.lock generated

@ -155,12 +155,14 @@ checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de"
name = "brain-mock"
version = "0.1.0"
dependencies = [
"dashmap",
"prost",
"prost-types",
"tokio",
"tokio-stream",
"tonic",
"tonic-build",
"uuid",
]
[[package]]
@ -181,6 +183,26 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]]
name = "dashmap"
version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "either"
version = "1.13.0"
@ -302,6 +324,12 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
[[package]]
name = "hashbrown"
version = "0.15.2"
@ -460,6 +488,16 @@ version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
[[package]]
name = "lock_api"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.22"
@ -525,6 +563,19 @@ version = "1.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775"
[[package]]
name = "parking_lot_core"
version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-targets",
]
[[package]]
name = "percent-encoding"
version = "2.3.1"
@ -692,6 +743,15 @@ dependencies = [
"getrandom",
]
[[package]]
name = "redox_syscall"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834"
dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.11.1"
@ -746,6 +806,12 @@ version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248"
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "serde"
version = "1.0.216"
@ -1005,6 +1071,15 @@ version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83"
[[package]]
name = "uuid"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a"
dependencies = [
"getrandom",
]
[[package]]
name = "want"
version = "0.3.1"

@ -4,11 +4,13 @@ version = "0.1.0"
edition = "2021"
[dependencies]
dashmap = "6.1.0"
prost = "0.13.4"
prost-types = "0.13.4"
tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1.17"
tonic = "0.12"
uuid = { version = "1.11.0", features = ["v4"] }
[build-dependencies]
tonic-build = "0.12"

@ -4,6 +4,10 @@ package brain;
message Empty {
}
message NodePubkey {
string node_pubkey = 1;
}
message RegisterNodeRequest {
string node_pubkey = 1;
string country = 2;
@ -55,9 +59,10 @@ message ListVMContractsReq {
}
message NewVMConfirmation {
repeated uint32 exposed_ports = 1;
string public_ipv4 = 2;
string public_ipv6 = 3;
string uuid = 1;
repeated uint32 exposed_ports = 2;
string public_ipv4 = 3;
string public_ipv6 = 4;
}
message DeletedVMUpdate {
@ -66,7 +71,8 @@ message DeletedVMUpdate {
service BrainDaemonService {
rpc RegisterNode (RegisterNodeRequest) returns (Empty);
rpc NewVMUpdates (stream NewVMConfirmation) returns (stream NewVMRequest);
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest);
rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty);
rpc DeletedVMUpdates (Empty) returns (stream DeletedVMUpdate);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
}

@ -1,11 +1,10 @@
#![allow(dead_code)]
use crate::grpc::brain as grpc;
use dashmap::DashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct BrainData {
pub nodes: RwLock<Vec<Node>>,
pub contracts: RwLock<Vec<Contract>>,
}
use std::sync::RwLock;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot::Sender as OneshotSender;
#[derive(Eq, Hash, PartialEq, Clone)]
pub struct Node {
@ -24,9 +23,9 @@ pub struct Node {
pub max_ports_per_vm: u32,
}
impl Into<crate::grpc::brain::NodeListResp> for Node {
fn into(self) -> crate::grpc::brain::NodeListResp {
crate::grpc::brain::NodeListResp {
impl Into<grpc::NodeListResp> for Node {
fn into(self) -> grpc::NodeListResp {
grpc::NodeListResp {
node_pubkey: self.public_key,
country: self.country,
city: self.city,
@ -54,9 +53,9 @@ pub struct Contract {
pub created_at: String,
}
impl Into<crate::grpc::brain::VmContract> for Contract {
fn into(self) -> crate::grpc::brain::VmContract {
crate::grpc::brain::VmContract {
impl Into<grpc::VmContract> for Contract {
fn into(self) -> grpc::VmContract {
grpc::VmContract {
uuid: self.uuid,
hostname: self.hostname,
admin_pubkey: self.admin_pubkey,
@ -74,39 +73,108 @@ impl Into<crate::grpc::brain::VmContract> for Contract {
}
}
impl BrainData {
pub fn new() -> Arc<Self> {
Arc::new(Self {
nodes: RwLock::new(Vec::new()),
contracts: RwLock::new(Vec::new()),
})
}
#[derive(Default)]
pub struct BrainData {
nodes: RwLock<Vec<Node>>,
contracts: RwLock<Vec<Contract>>,
tmp_vmrequests: DashMap<String, grpc::NewVmRequest>,
cli_vmcontract_tx: DashMap<String, OneshotSender<grpc::NewVmConfirmation>>,
daemon_deletevm_tx: DashMap<String, Sender<grpc::DeletedVmUpdate>>,
daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmRequest>>,
}
pub struct GuardCliTx {
brain_data: Arc<BrainData>,
admin_pubkey: String,
}
impl Drop for GuardCliTx {
fn drop(&mut self) {
self.brain_data.del_cli_vmcontract_tx(&self.admin_pubkey);
}
}
impl BrainData {
pub async fn insert_node(&self, node: Node) {
let mut nodes = self.nodes.write().await;
let mut nodes = self.nodes.write().unwrap();
nodes.push(node);
}
pub async fn insert_contract(&self, contract: Contract) {
let mut contracts = self.contracts.write().await;
pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::DeletedVmUpdate>) {
self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx);
}
pub fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender<grpc::NewVmRequest>) {
self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx);
}
pub async fn submit_vmconfirmation(&self, confirmation: grpc::NewVmConfirmation) {
let newvmreq = match self.tmp_vmrequests.remove(&confirmation.uuid) {
Some((_, r)) => r,
None => return,
};
if let Some((_, client_tx)) = self.cli_vmcontract_tx.remove(&newvmreq.admin_pubkey) {
let _ = client_tx.send(confirmation.clone());
}
let contract = Contract {
uuid: confirmation.uuid,
exposed_ports: confirmation.exposed_ports,
public_ipv4: confirmation.public_ipv4,
public_ipv6: confirmation.public_ipv6,
created_at: format!("{:?}", std::time::SystemTime::now()),
hostname: newvmreq.hostname,
admin_pubkey: newvmreq.admin_pubkey,
node_pubkey: newvmreq.node_pubkey,
disk_size_gb: newvmreq.disk_size_gb,
vcpus: newvmreq.vcpus,
memory_mb: newvmreq.memory_mb,
kernel_sha: newvmreq.kernel_sha,
dtrfs_sha: newvmreq.dtrfs_sha,
};
self.contracts.write().unwrap().push(contract);
}
pub async fn submit_newvmrequest(
self: Arc<Self>,
mut req: grpc::NewVmRequest,
tx: OneshotSender<grpc::NewVmConfirmation>,
) -> Option<GuardCliTx> {
req.uuid = uuid::Uuid::new_v4().to_string();
self.tmp_vmrequests.insert(req.uuid.clone(), req.clone());
self.cli_vmcontract_tx
.insert(req.admin_pubkey.to_string(), tx);
if let Some(server_tx) = self.clone().daemon_newvm_tx.get(&req.node_pubkey) {
if server_tx.send(req.clone()).await.is_err() {
return None;
}
}
Some(GuardCliTx {
brain_data: self,
admin_pubkey: req.admin_pubkey.to_string(),
})
}
pub fn del_cli_vmcontract_tx(&self, admin_pubkey: &str) {
self.cli_vmcontract_tx.remove(admin_pubkey);
}
pub fn insert_contract(&self, contract: Contract) {
let mut contracts = self.contracts.write().unwrap();
contracts.push(contract);
}
pub async fn find_nodes_by_pubkey(&self, public_key: &str) -> Option<Node> {
let nodes = self.nodes.read().await;
pub fn find_nodes_by_pubkey(&self, public_key: &str) -> Option<Node> {
let nodes = self.nodes.read().unwrap();
nodes.iter().cloned().find(|n| n.public_key == public_key)
}
pub async fn find_nodes_by_owner_key(&self, owner_key: &str) -> Option<Node> {
let nodes = self.nodes.read().await;
pub fn find_ns_by_owner_key(&self, owner_key: &str) -> Option<Node> {
let nodes = self.nodes.read().unwrap();
nodes.iter().cloned().find(|n| n.owner_key == owner_key)
}
pub async fn find_nodes_by_filters(
&self,
filters: &crate::grpc::brain::NodeFilters,
) -> Vec<Node> {
let nodes = self.nodes.read().await;
pub fn find_nodes_by_filters(&self, filters: &crate::grpc::brain::NodeFilters) -> Vec<Node> {
let nodes = self.nodes.read().unwrap();
nodes
.iter()
.cloned()
@ -121,13 +189,13 @@ impl BrainData {
.collect()
}
pub async fn find_contract_by_uuid(&self, uuid: &str) -> Option<Contract> {
let contracts = self.contracts.read().await;
pub fn find_contract_by_uuid(&self, uuid: &str) -> Option<Contract> {
let contracts = self.contracts.read().unwrap();
contracts.iter().cloned().find(|c| c.uuid == uuid)
}
pub async fn find_contracts_by_admin_pubkey(&self, admin_pubkey: &str) -> Vec<Contract> {
let contracts = self.contracts.read().await;
pub fn find_contracts_by_admin_pubkey(&self, admin_pubkey: &str) -> Vec<Contract> {
let contracts = self.contracts.read().unwrap();
contracts
.iter()
.cloned()
@ -135,8 +203,8 @@ impl BrainData {
.collect()
}
pub async fn find_contracts_by_node_pubkey(&self, node_pubkey: &str) -> Vec<Contract> {
let contracts = self.contracts.read().await;
pub fn find_contracts_by_node_pubkey(&self, node_pubkey: &str) -> Vec<Contract> {
let contracts = self.contracts.read().unwrap();
contracts
.iter()
.cloned()

@ -11,7 +11,7 @@ use brain::*;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};
struct BrainDaemonMock {
@ -30,13 +30,38 @@ impl BrainDaemonService for BrainDaemonMock {
todo!()
}
type NewVMUpdatesStream = Pin<Box<dyn Stream<Item = Result<NewVmRequest, Status>> + Send>>;
type GetNewVMReqsStream = Pin<Box<dyn Stream<Item = Result<NewVmRequest, Status>> + Send>>;
async fn new_vm_updates(
async fn get_new_vm_reqs(
&self,
_req: Request<Streaming<NewVmConfirmation>>,
) -> Result<Response<Self::NewVMUpdatesStream>, Status> {
todo!()
req: Request<NodePubkey>,
) -> Result<Response<Self::GetNewVMReqsStream>, Status> {
let (grpc_tx, grpc_rx) = mpsc::channel(6);
let (data_tx, mut data_rx) = mpsc::channel(6);
self.data
.add_daemon_newvm_tx(&req.into_inner().node_pubkey, data_tx);
tokio::spawn(async move {
while let Some(newvmreq) = data_rx.recv().await {
let _ = grpc_tx.send(Ok(newvmreq)).await;
}
});
let output_stream = ReceiverStream::new(grpc_rx);
Ok(Response::new(
Box::pin(output_stream) as Self::GetNewVMReqsStream
))
}
async fn send_vm_confirmations(
&self,
req: Request<Streaming<NewVmConfirmation>>,
) -> Result<Response<Empty>, Status> {
let mut confirmations = req.into_inner();
while let Some(confirmation) = confirmations.next().await {
if let Ok(confirmation) = confirmation {
self.data.submit_vmconfirmation(confirmation).await;
}
}
Ok(Response::new(Empty {}))
}
type DeletedVMUpdatesStream =
@ -56,16 +81,11 @@ impl BrainDaemonService for BrainDaemonMock {
req: Request<ListVmContractsReq>,
) -> Result<Response<Self::ListVMContractsStream>, Status> {
let req = req.into_inner();
let contracts = self
.data
.find_contracts_by_admin_pubkey(&req.node_pubkey)
.await;
let (tx, rx) = mpsc::channel(128);
let contracts = self.data.find_contracts_by_admin_pubkey(&req.node_pubkey);
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for contract in contracts {
let _ = tx
.send(Ok(contract.into()))
.await;
let _ = tx.send(Ok(contract.into())).await;
}
});
@ -80,9 +100,22 @@ impl BrainDaemonService for BrainDaemonMock {
impl BrainCliService for BrainCliMock {
async fn create_vm_contract(
&self,
_req: Request<NewVmRequest>,
req: Request<NewVmRequest>,
) -> Result<Response<NewVmConfirmation>, Status> {
todo!();
let req = req.into_inner();
let (engine_tx, engine_rx) = tokio::sync::oneshot::channel();
let dropper = self.data.clone().submit_newvmrequest(req, engine_tx).await;
if dropper.is_none() {
return Err(Status::unavailable(
"The node you picked is currently offline.",
));
}
if let Ok(response) = engine_rx.await {
return Ok(Response::new(response));
}
Err(Status::unknown(
"Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
))
}
type ListVMContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
@ -91,16 +124,11 @@ impl BrainCliService for BrainCliMock {
req: Request<ListVmContractsReq>,
) -> Result<Response<Self::ListVMContractsStream>, Status> {
let req = req.into_inner();
let contracts = self
.data
.find_contracts_by_admin_pubkey(&req.admin_pubkey)
.await;
let (tx, rx) = mpsc::channel(128);
let contracts = self.data.find_contracts_by_admin_pubkey(&req.admin_pubkey);
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for contract in contracts {
let _ = tx
.send(Ok(contract.into()))
.await;
let _ = tx.send(Ok(contract.into())).await;
}
});
@ -116,13 +144,11 @@ impl BrainCliService for BrainCliMock {
req: Request<NodeFilters>,
) -> Result<Response<Self::ListNodesStream>, tonic::Status> {
let req = req.into_inner();
let nodes = self.data.find_nodes_by_filters(&req).await;
let (tx, rx) = mpsc::channel(128);
let nodes = self.data.find_nodes_by_filters(&req);
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for node in nodes {
let _ = tx
.send(Ok(node.into()))
.await;
let _ = tx.send(Ok(node.into())).await;
}
});