forked from ghe0/brain-mock
manageed container contract and node registraion
This commit is contained in:
parent
8f40adcbf8
commit
c6c44da0e8
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -329,7 +329,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "detee-shared"
|
name = "detee-shared"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#a2899ba5a25794aec60a093695675ff24f967484"
|
source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#6e1b1853838905c44d535d984d1221dd5d0dc2bc"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"base64",
|
"base64",
|
||||||
"prost",
|
"prost",
|
||||||
|
117
src/data.rs
117
src/data.rs
@ -10,7 +10,6 @@ use tokio::sync::oneshot::Sender as OneshotSender;
|
|||||||
|
|
||||||
use detee_shared::pb::daemon::{BrainMessage as BrainMessageSgx, NewContainerRes};
|
use detee_shared::pb::daemon::{BrainMessage as BrainMessageSgx, NewContainerRes};
|
||||||
use detee_shared::pb::shared as sharedPb;
|
use detee_shared::pb::shared as sharedPb;
|
||||||
use detee_shared::pb::shared::Container;
|
|
||||||
|
|
||||||
#[derive(thiserror::Error, Debug)]
|
#[derive(thiserror::Error, Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
@ -138,6 +137,60 @@ impl Into<grpc::Contract> for Contract {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Default)]
|
||||||
|
pub struct AppContract {
|
||||||
|
pub uuid: String,
|
||||||
|
pub package_url: String,
|
||||||
|
pub admin_pubkey: String,
|
||||||
|
pub node_pubkey: String,
|
||||||
|
pub mapped_ports: Vec<(u16, u16)>,
|
||||||
|
pub host_ipv4: String,
|
||||||
|
pub disk_size_gb: u32,
|
||||||
|
pub vcpus: u32,
|
||||||
|
pub memory_mb: u32,
|
||||||
|
pub created_at: chrono::DateTime<Utc>,
|
||||||
|
pub updated_at: chrono::DateTime<Utc>,
|
||||||
|
// price per unit per minute
|
||||||
|
// recommended value is 20000
|
||||||
|
pub price_per_unit: u64,
|
||||||
|
pub locked_nano: u64,
|
||||||
|
pub collected_at: chrono::DateTime<Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<AppContract> for sharedPb::ContainerContracts {
|
||||||
|
fn from(value: AppContract) -> Self {
|
||||||
|
Self {
|
||||||
|
uuid: value.uuid,
|
||||||
|
admin_pubkey: value.admin_pubkey,
|
||||||
|
node_pubkey: value.node_pubkey,
|
||||||
|
package_url: value.package_url,
|
||||||
|
exposed_ports: value
|
||||||
|
.mapped_ports
|
||||||
|
.into_iter()
|
||||||
|
.map(sharedPb::MappedPort::from)
|
||||||
|
.collect(),
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Eq, Hash, PartialEq, Clone, Debug, Default)]
|
||||||
|
pub struct AppNode {
|
||||||
|
pub public_key: String,
|
||||||
|
pub owner_key: String,
|
||||||
|
pub country: String,
|
||||||
|
pub region: String,
|
||||||
|
pub city: String,
|
||||||
|
pub ip: String,
|
||||||
|
pub avail_mem_mb: u32,
|
||||||
|
pub avail_vcpus: u32,
|
||||||
|
pub avail_storage_gbs: u32,
|
||||||
|
pub avail_ports: u32,
|
||||||
|
pub max_ports_per_app: u32,
|
||||||
|
// nanotokens per unit per minute
|
||||||
|
pub price: u64,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct BrainData {
|
pub struct BrainData {
|
||||||
// amount of nanotokens in each account
|
// amount of nanotokens in each account
|
||||||
@ -148,8 +201,10 @@ pub struct BrainData {
|
|||||||
tmp_updatevm_reqs: DashMap<String, (grpc::UpdateVmReq, OneshotSender<grpc::UpdateVmResp>)>,
|
tmp_updatevm_reqs: DashMap<String, (grpc::UpdateVmReq, OneshotSender<grpc::UpdateVmResp>)>,
|
||||||
daemon_tx: DashMap<String, Sender<grpc::BrainMessage>>,
|
daemon_tx: DashMap<String, Sender<grpc::BrainMessage>>,
|
||||||
|
|
||||||
sgx_daemon_tx: DashMap<String, Sender<BrainMessageSgx>>,
|
app_nodes: RwLock<Vec<AppNode>>,
|
||||||
|
app_daemon_tx: DashMap<String, Sender<BrainMessageSgx>>,
|
||||||
tmp_new_container_reqs: DashMap<String, (sharedPb::Container, OneshotSender<NewContainerRes>)>,
|
tmp_new_container_reqs: DashMap<String, (sharedPb::Container, OneshotSender<NewContainerRes>)>,
|
||||||
|
app_contracts: RwLock<Vec<AppContract>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -169,8 +224,10 @@ impl BrainData {
|
|||||||
tmp_updatevm_reqs: DashMap::new(),
|
tmp_updatevm_reqs: DashMap::new(),
|
||||||
daemon_tx: DashMap::new(),
|
daemon_tx: DashMap::new(),
|
||||||
|
|
||||||
sgx_daemon_tx: DashMap::new(),
|
app_daemon_tx: DashMap::new(),
|
||||||
tmp_new_container_reqs: DashMap::new(),
|
tmp_new_container_reqs: DashMap::new(),
|
||||||
|
app_contracts: RwLock::new(Vec::new()),
|
||||||
|
app_nodes: RwLock::new(Vec::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -681,17 +738,40 @@ impl BrainData {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl BrainData {
|
impl BrainData {
|
||||||
pub fn add_sgx_daemon_tx(&self, node_pubkey: &str, tx: Sender<BrainMessageSgx>) {
|
pub fn add_app_daemon_tx(&self, node_pubkey: &str, tx: Sender<BrainMessageSgx>) {
|
||||||
self.sgx_daemon_tx.insert(node_pubkey.to_string(), tx);
|
self.app_daemon_tx.insert(node_pubkey.to_string(), tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn del_sgx_daemon_tx(&self, node_pubkey: &str) {
|
pub fn del_app_daemon_tx(&self, node_pubkey: &str) {
|
||||||
self.sgx_daemon_tx.remove(node_pubkey);
|
self.app_daemon_tx.remove(node_pubkey);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn insert_app_node(&self, node: AppNode) {
|
||||||
|
info!("Registering app node {node:?}");
|
||||||
|
let mut nodes = self.app_nodes.write().unwrap();
|
||||||
|
for n in nodes.iter_mut() {
|
||||||
|
if n.public_key == node.public_key {
|
||||||
|
// TODO: figure what to do in this case.
|
||||||
|
warn!("Node {} already exists. Updating data.", n.public_key);
|
||||||
|
*n = node;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nodes.push(node);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn find_app_contracts_by_node_pubkey(&self, node_pubkey: &str) -> Vec<AppContract> {
|
||||||
|
let app_contracts = self.app_contracts.read().unwrap();
|
||||||
|
app_contracts
|
||||||
|
.iter()
|
||||||
|
.filter(|c| c.node_pubkey == node_pubkey)
|
||||||
|
.cloned()
|
||||||
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send_new_container_req(
|
pub async fn send_new_container_req(
|
||||||
&self,
|
&self,
|
||||||
mut req: Container,
|
mut req: sharedPb::Container,
|
||||||
tx: OneshotSender<NewContainerRes>,
|
tx: OneshotSender<NewContainerRes>,
|
||||||
) {
|
) {
|
||||||
req.uuid = uuid::Uuid::new_v4().to_string();
|
req.uuid = uuid::Uuid::new_v4().to_string();
|
||||||
@ -700,7 +780,7 @@ impl BrainData {
|
|||||||
self.tmp_new_container_reqs
|
self.tmp_new_container_reqs
|
||||||
.insert(req.uuid.clone(), (req.clone(), tx));
|
.insert(req.uuid.clone(), (req.clone(), tx));
|
||||||
|
|
||||||
if let Some(sgx_daemon_tx) = self.sgx_daemon_tx.get(&req.node_pubkey) {
|
if let Some(app_daemon_tx) = self.app_daemon_tx.get(&req.node_pubkey) {
|
||||||
debug!(
|
debug!(
|
||||||
"Found daemon TX for {}. Sending newVMReq {}",
|
"Found daemon TX for {}. Sending newVMReq {}",
|
||||||
req.node_pubkey, req.uuid
|
req.node_pubkey, req.uuid
|
||||||
@ -710,13 +790,13 @@ impl BrainData {
|
|||||||
detee_shared::pb::daemon::brain_message::Msg::NewContainerReq(req.clone()),
|
detee_shared::pb::daemon::brain_message::Msg::NewContainerReq(req.clone()),
|
||||||
),
|
),
|
||||||
};
|
};
|
||||||
if let Err(e) = sgx_daemon_tx.send(msg).await {
|
if let Err(e) = app_daemon_tx.send(msg).await {
|
||||||
warn!(
|
warn!(
|
||||||
"Failed to send new container request to {} due to error: {e:?}",
|
"Failed to send new container request to {} due to error: {e:?}",
|
||||||
req.node_pubkey
|
req.node_pubkey
|
||||||
);
|
);
|
||||||
info!("Deleting daemon TX for {}", req.node_pubkey);
|
info!("Deleting daemon TX for {}", req.node_pubkey);
|
||||||
self.del_sgx_daemon_tx(&req.node_pubkey);
|
self.del_app_daemon_tx(&req.node_pubkey);
|
||||||
self.send_new_container_resp(NewContainerRes {
|
self.send_new_container_resp(NewContainerRes {
|
||||||
uuid: req.uuid,
|
uuid: req.uuid,
|
||||||
status: "Failed".to_string(),
|
status: "Failed".to_string(),
|
||||||
@ -741,12 +821,23 @@ impl BrainData {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if let Err(_) = new_container_req.1.send(new_container_resp.clone()) {
|
if let Err(err) = new_container_req.1.send(new_container_resp.clone()) {
|
||||||
log::error!(
|
log::error!(
|
||||||
"CLI RX for {} dropped before receiving confirmation {:?}.",
|
"CLI RX for {} dropped before receiving confirmation {:?}.\n{:?}",
|
||||||
&new_container_req.0.admin_pubkey,
|
&new_container_req.0.admin_pubkey,
|
||||||
new_container_resp,
|
new_container_resp,
|
||||||
|
err
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let app_contracts = AppContract {
|
||||||
|
uuid: new_container_req.0.uuid,
|
||||||
|
node_pubkey: new_container_req.0.node_pubkey.clone(),
|
||||||
|
package_url: new_container_req.0.package_url,
|
||||||
|
admin_pubkey: new_container_req.0.admin_pubkey,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
log::info!("Created new app contract: {app_contracts:?}");
|
||||||
|
self.app_contracts.write().unwrap().push(app_contracts);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
32
src/grpc.rs
32
src/grpc.rs
@ -342,15 +342,31 @@ impl BrainSgxDaemon for BrainSgxDaemonMock {
|
|||||||
&self,
|
&self,
|
||||||
req: tonic::Request<detee_shared::pb::shared::RegisterNodeReq>,
|
req: tonic::Request<detee_shared::pb::shared::RegisterNodeReq>,
|
||||||
) -> Result<tonic::Response<Self::RegisterNodeStream>, Status> {
|
) -> Result<tonic::Response<Self::RegisterNodeStream>, Status> {
|
||||||
dbg!(req);
|
log::info!("registering app node : {:?}", &req);
|
||||||
let (tx, rx) = mpsc::channel(6);
|
let req_data = req.into_inner();
|
||||||
|
|
||||||
|
let app_node = crate::data::AppNode {
|
||||||
|
public_key: req_data.node_pubkey.clone(),
|
||||||
|
owner_key: req_data.owner_pubkey,
|
||||||
|
ip: req_data.main_ip,
|
||||||
|
city: req_data.city,
|
||||||
|
region: req_data.region,
|
||||||
|
country: req_data.country,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
self.data.insert_app_node(app_node);
|
||||||
|
log::info!("Sending existing contracts to {}", &req_data.node_pubkey);
|
||||||
|
|
||||||
|
let app_contracts = self
|
||||||
|
.data
|
||||||
|
.find_app_contracts_by_node_pubkey(&req_data.node_pubkey);
|
||||||
|
|
||||||
|
let (tx, rx) = mpsc::channel(6);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let _ = tx
|
for contract in app_contracts {
|
||||||
.send(detee_shared::pb::shared::ContainerContracts {
|
let _ = tx.send(contract.into()).await;
|
||||||
..Default::default()
|
}
|
||||||
})
|
|
||||||
.await;
|
|
||||||
});
|
});
|
||||||
let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg));
|
let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg));
|
||||||
Ok(Response::new(Box::pin(output_stream)))
|
Ok(Response::new(Box::pin(output_stream)))
|
||||||
@ -363,7 +379,7 @@ impl BrainSgxDaemon for BrainSgxDaemonMock {
|
|||||||
let req = req.into_inner();
|
let req = req.into_inner();
|
||||||
info!("Daemon {} connected to receive brain messages", req.pubkey);
|
info!("Daemon {} connected to receive brain messages", req.pubkey);
|
||||||
let (tx, rx) = mpsc::channel(6);
|
let (tx, rx) = mpsc::channel(6);
|
||||||
self.data.add_sgx_daemon_tx(&req.pubkey, tx);
|
self.data.add_app_daemon_tx(&req.pubkey, tx);
|
||||||
let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg));
|
let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg));
|
||||||
Ok(Response::new(
|
Ok(Response::new(
|
||||||
Box::pin(output_stream) as Self::BrainMessagesStream
|
Box::pin(output_stream) as Self::BrainMessagesStream
|
||||||
|
Loading…
Reference in New Issue
Block a user