revision to updater
This commit is contained in:
parent
b27ea64722
commit
1368b9cbc4
80
src/data.rs
80
src/data.rs
@ -96,6 +96,7 @@ pub struct BrainData {
|
|||||||
nodes: RwLock<Vec<Node>>,
|
nodes: RwLock<Vec<Node>>,
|
||||||
contracts: RwLock<Vec<Contract>>,
|
contracts: RwLock<Vec<Contract>>,
|
||||||
tmp_vmrequests: DashMap<String, (grpc::NewVmRequest, OneshotSender<grpc::NewVmConfirmation>)>,
|
tmp_vmrequests: DashMap<String, (grpc::NewVmRequest, OneshotSender<grpc::NewVmConfirmation>)>,
|
||||||
|
tmp_updatevmrequests: DashMap<String, (grpc::UpdateVmRequest, OneshotSender<grpc::UpdateVmResp>)>,
|
||||||
daemon_deletevm_tx: DashMap<String, Sender<grpc::DeletedVmUpdate>>,
|
daemon_deletevm_tx: DashMap<String, Sender<grpc::DeletedVmUpdate>>,
|
||||||
daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmRequest>>,
|
daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmRequest>>,
|
||||||
daemon_updatevm_tx: DashMap<String, Sender<grpc::UpdateVmRequest>>,
|
daemon_updatevm_tx: DashMap<String, Sender<grpc::UpdateVmRequest>>,
|
||||||
@ -114,6 +115,7 @@ impl BrainData {
|
|||||||
nodes: RwLock::new(Vec::new()),
|
nodes: RwLock::new(Vec::new()),
|
||||||
contracts: RwLock::new(Vec::new()),
|
contracts: RwLock::new(Vec::new()),
|
||||||
tmp_vmrequests: DashMap::new(),
|
tmp_vmrequests: DashMap::new(),
|
||||||
|
tmp_updatevmrequests: DashMap::new(),
|
||||||
daemon_deletevm_tx: DashMap::new(),
|
daemon_deletevm_tx: DashMap::new(),
|
||||||
daemon_newvm_tx: DashMap::new(),
|
daemon_newvm_tx: DashMap::new(),
|
||||||
daemon_updatevm_tx: DashMap::new(),
|
daemon_updatevm_tx: DashMap::new(),
|
||||||
@ -137,8 +139,11 @@ impl BrainData {
|
|||||||
self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx);
|
self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn add_daemon_updatevm_tx(&self, node_pubkey: &str, _tx: Sender<grpc::UpdateVmRequest>) {
|
pub async fn add_daemon_updatevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::UpdateVmRequest>) {
|
||||||
self.daemon_updatevm_tx.insert(node_pubkey.to_string(), _tx);
|
log::debug!("Adding daemon updatevm tx for node_pubkey: {}", node_pubkey);
|
||||||
|
self.tmp_updatevmrequests
|
||||||
|
.retain(|_, req| req.0.node_pubkey != node_pubkey);
|
||||||
|
self.daemon_updatevm_tx.insert(node_pubkey.to_string(), tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) {
|
pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) {
|
||||||
@ -208,6 +213,38 @@ impl BrainData {
|
|||||||
self.contracts.write().unwrap().push(contract);
|
self.contracts.write().unwrap().push(contract);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn submit_update_vmconfirmation(&self, confirmation: grpc::UpdateVmResp) {
|
||||||
|
let updatevmreq = match self.tmp_updatevmrequests.remove(&confirmation.uuid) {
|
||||||
|
Some((_, r)) => r,
|
||||||
|
None => {
|
||||||
|
log::error!(
|
||||||
|
"Received confirmation for ghost UpdateVMRequest {}",
|
||||||
|
confirmation.uuid
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Err(e) = updatevmreq.1.send(confirmation.clone()) {
|
||||||
|
log::error!(
|
||||||
|
"CLI RX for {} dropped before receiving confirmation {:?}. Error is: {:?}",
|
||||||
|
&updatevmreq.0.node_pubkey,
|
||||||
|
confirmation,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if confirmation.error != "" {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let mut contracts = self.contracts.write().unwrap();
|
||||||
|
if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == confirmation.uuid) {
|
||||||
|
contract.disk_size_gb = updatevmreq.0.disk_size_gb;
|
||||||
|
contract.vcpus = updatevmreq.0.vcpus;
|
||||||
|
contract.memory_mb = updatevmreq.0.memory_mb;
|
||||||
|
contract.kernel_sha = updatevmreq.0.kernel_sha;
|
||||||
|
contract.dtrfs_sha = updatevmreq.0.dtrfs_sha;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn submit_newvmrequest(
|
pub async fn submit_newvmrequest(
|
||||||
&self,
|
&self,
|
||||||
mut req: grpc::NewVmRequest,
|
mut req: grpc::NewVmRequest,
|
||||||
@ -243,17 +280,34 @@ impl BrainData {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn update_vm(&self, req: grpc::UpdateVmRequest) -> Result<(), String> {
|
pub async fn submit_updatevmrequest(
|
||||||
let mut contracts = self.contracts.write().unwrap();
|
&self,
|
||||||
if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == req.uuid) {
|
req: grpc::UpdateVmRequest,
|
||||||
contract.disk_size_gb = req.disk_size_gb;
|
tx: OneshotSender<grpc::UpdateVmResp>,
|
||||||
contract.vcpus = req.vcpus;
|
) {
|
||||||
contract.memory_mb = req.memory_mb;
|
let uuid = req.uuid.clone();
|
||||||
contract.kernel_sha = req.kernel_sha;
|
info!("Inserting new vm update request in memory: {req:?}");
|
||||||
contract.dtrfs_sha = req.dtrfs_sha;
|
self.tmp_updatevmrequests
|
||||||
Ok(())
|
.insert(req.uuid.clone(), (req.clone(), tx));
|
||||||
} else {
|
if let Some(server_tx) = self.daemon_updatevm_tx.get(&req.node_pubkey) {
|
||||||
Err(format!("Contract {} not found", req.uuid))
|
debug!(
|
||||||
|
"Found daemon TX for {}. Sending updateVMReq {}",
|
||||||
|
req.node_pubkey, req.uuid
|
||||||
|
);
|
||||||
|
if server_tx.send(req.clone()).await.is_ok() {
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
warn!(
|
||||||
|
"Daemon {} RX dropped before sending update. Cleaning memory...",
|
||||||
|
req.node_pubkey
|
||||||
|
);
|
||||||
|
self.submit_update_vmconfirmation(grpc::UpdateVmResp {
|
||||||
|
uuid,
|
||||||
|
timestamp: chrono::Utc::now().to_rfc3339(),
|
||||||
|
error: "Daemon is offline.".to_string(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
48
src/grpc.rs
48
src/grpc.rs
@ -16,7 +16,6 @@ use std::sync::Arc;
|
|||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
|
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
|
||||||
use tonic::{Request, Response, Status, Streaming};
|
use tonic::{Request, Response, Status, Streaming};
|
||||||
use chrono;
|
|
||||||
|
|
||||||
pub struct BrainDaemonMock {
|
pub struct BrainDaemonMock {
|
||||||
data: Arc<BrainData>,
|
data: Arc<BrainData>,
|
||||||
@ -165,19 +164,27 @@ impl BrainDaemonService for BrainDaemonMock {
|
|||||||
async fn get_update_vm(
|
async fn get_update_vm(
|
||||||
&self,
|
&self,
|
||||||
req: Request<NodePubkey>,
|
req: Request<NodePubkey>,
|
||||||
) -> Result<Response<Self::GetUpdateVMStream>, Status> {
|
) -> Result<Response<Self::GetUpdateVMStream>, Status> {
|
||||||
let req = req.into_inner();
|
let req = req.into_inner();
|
||||||
info!("Daemon {} requested UpdateVMReqsStream", req.node_pubkey);
|
info!("Daemon {} requested GetUpdateVMStream", req.node_pubkey);
|
||||||
let (grpc_tx, grpc_rx) = mpsc::channel(6);
|
let (grpc_tx, grpc_rx) = mpsc::channel(6);
|
||||||
let (data_tx, mut data_rx) = mpsc::channel(6);
|
let (data_tx, mut data_rx) = mpsc::channel(6);
|
||||||
self.data
|
self.data
|
||||||
.add_daemon_updatevm_tx(&req.node_pubkey, data_tx)
|
.add_daemon_updatevm_tx(&req.node_pubkey, data_tx)
|
||||||
.await;
|
.await;
|
||||||
|
let data = self.data.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
while let Some(updatevmreq) = data_rx.recv().await {
|
while let Some(updatevmreq) = data_rx.recv().await {
|
||||||
debug!("Sending UpdateVMRequest to {}: {updatevmreq:?}", req.node_pubkey);
|
debug!("Sending UpdateVMRequest to {}: {updatevmreq:?}", req.node_pubkey);
|
||||||
if let Err(e) = grpc_tx.send(Ok(updatevmreq)).await {
|
if let Err(e) = grpc_tx.send(Ok(updatevmreq.clone())).await {
|
||||||
warn!("Could not send UpdateVMRequest to {}: {e:?}", req.node_pubkey);
|
warn!("Could not send UpdateVMRequest to {}: {e:?}", req.node_pubkey);
|
||||||
|
data.submit_update_vmconfirmation(UpdateVmResp {
|
||||||
|
error: "Daemon not connected.".to_string(),
|
||||||
|
uuid: updatevmreq.uuid,
|
||||||
|
timestamp: chrono::Utc::now().to_rfc3339(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -191,15 +198,16 @@ impl BrainDaemonService for BrainDaemonMock {
|
|||||||
&self,
|
&self,
|
||||||
req: Request<Streaming<UpdateVmResp>>,
|
req: Request<Streaming<UpdateVmResp>>,
|
||||||
) -> Result<Response<Empty>, Status> {
|
) -> Result<Response<Empty>, Status> {
|
||||||
debug!("Some node connected to stream UpdateVmResp");
|
debug!("Some node connected to stream NewVmConfirmation");
|
||||||
let mut confirmations = req.into_inner();
|
let mut confirmations = req.into_inner();
|
||||||
while let Some(confirmation) = confirmations.next().await {
|
while let Some(confirmation) = confirmations.next().await {
|
||||||
match confirmation {
|
match confirmation {
|
||||||
Ok(c) => {
|
Ok(c) => {
|
||||||
info!("Received update confirmation from daemon: {c:?}")
|
info!("Received confirmation from daemon: {c:?}");
|
||||||
|
self.data.submit_update_vmconfirmation(c).await;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::warn!("Daemon disconnected from Streaming<UpdateVmResp>: {e:?}")
|
log::warn!("Daemon disconnected from Streaming<NewVmConfirmation>: {e:?}")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -237,17 +245,21 @@ impl BrainCliService for BrainCliMock {
|
|||||||
req: Request<UpdateVmRequest>,
|
req: Request<UpdateVmRequest>,
|
||||||
) -> Result<Response<UpdateVmResp>, Status> {
|
) -> Result<Response<UpdateVmResp>, Status> {
|
||||||
let req = req.into_inner();
|
let req = req.into_inner();
|
||||||
match self.data.update_vm(req.clone()).await {
|
info!("Update VM requested via CLI: {req:?}");
|
||||||
Ok(_) => Ok(Response::new(UpdateVmResp {
|
let node_pubkey = req.node_pubkey.clone();
|
||||||
uuid: req.uuid,
|
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
|
||||||
timestamp: chrono::Utc::now().to_rfc3339(),
|
self.data.submit_updatevmrequest(req, oneshot_tx).await;
|
||||||
error: "".to_string(),
|
match oneshot_rx.await {
|
||||||
})),
|
Ok(response) => {
|
||||||
Err(e) => Ok(Response::new(UpdateVmResp {
|
info!("Sending Update VM confirmation to {node_pubkey}: {response:?}");
|
||||||
uuid: req.uuid,
|
Ok(Response::new(response))
|
||||||
timestamp: "".to_string(),
|
}
|
||||||
error: e,
|
Err(e) => {
|
||||||
})),
|
log::error!("Something weird happened. Reached error {e:?}");
|
||||||
|
Err(Status::unknown(
|
||||||
|
"Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
|
||||||
|
))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user