created update functionality??

This commit is contained in:
Ramil_Algayev 2024-12-24 03:21:33 +04:00
parent c7cc1565e2
commit 179923f408
2 changed files with 44 additions and 9 deletions

@ -42,13 +42,14 @@ message NewVMRequest {
message UpdateVMRequest {
string uuid = 1;
uint32 disk_size_gb = 2;
uint32 vcpus = 3;
uint32 memory_mb = 4;
string kernel_url = 5;
string kernel_sha = 6;
string dtrfs_url = 7;
string dtrfs_sha = 8;
string node_pubkey = 2;
uint32 disk_size_gb = 3;
uint32 vcpus = 4;
uint32 memory_mb = 5;
string kernel_url = 6;
string kernel_sha = 7;
string dtrfs_url = 8;
string dtrfs_sha = 9;
}
message UpdateVMResp {

@ -1,10 +1,10 @@
#![allow(dead_code)]
use crate::grpc::brain::{self as grpc, UpdateVmRequest, UpdateVmResp};
use crate::grpc::brain as grpc;
use dashmap::DashMap;
use log::{debug, info, warn};
use std::sync::RwLock;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot::{self, Sender as OneshotSender};
use tokio::sync::oneshot::Sender as OneshotSender;
#[derive(Eq, Hash, PartialEq, Clone)]
pub struct Node {
@ -96,6 +96,7 @@ pub struct BrainData {
nodes: RwLock<Vec<Node>>,
contracts: RwLock<Vec<Contract>>,
tmp_vmrequests: DashMap<String, (grpc::NewVmRequest, OneshotSender<grpc::NewVmConfirmation>)>,
tmp_updatevmrequests: DashMap<String, (grpc::UpdateVmRequest, OneshotSender<grpc::UpdateVmResp>)>,
daemon_deletevm_tx: DashMap<String, Sender<grpc::DeletedVmUpdate>>,
daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmRequest>>,
daemon_updatevm_tx: DashMap<String, Sender<grpc::UpdateVmRequest>>,
@ -114,6 +115,7 @@ impl BrainData {
nodes: RwLock::new(Vec::new()),
contracts: RwLock::new(Vec::new()),
tmp_vmrequests: DashMap::new(),
tmp_updatevmrequests: DashMap::new(),
daemon_deletevm_tx: DashMap::new(),
daemon_newvm_tx: DashMap::new(),
daemon_updatevm_tx: DashMap::new(),
@ -239,6 +241,38 @@ impl BrainData {
}
}
pub async fn submit_updatevmrequest(
&self,
req: grpc::UpdateVmRequest,
tx: OneshotSender<grpc::UpdateVmResp>,
) {
let uuid = req.uuid.clone();
info!("Inserting update vm request in memory: {:?}", req);
self.tmp_updatevmrequests.insert(uuid.clone(), (req.clone(), tx));
if let Some(server_tx) = self.daemon_updatevm_tx.get(&req.node_pubkey) {
if server_tx.send(req.clone()).await.is_err() {
warn!("Daemon {} RX dropped before sending update VM. Cleaning memory...", &req.node_pubkey);
if let Some((_, oneshot_tx)) = self.tmp_updatevmrequests.remove(&uuid) {
let _ = oneshot_tx.1.send(grpc::UpdateVmResp {
uuid,
timestamp: "".to_string(),
error: "Daemon is offline.".to_string(),
});
}
}
} else {
warn!("No daemon TX found for {}", req.node_pubkey);
if let Some((_, oneshot_tx)) = self.tmp_updatevmrequests.remove(&uuid) {
let _ = oneshot_tx.1.send(grpc::UpdateVmResp {
uuid,
timestamp: "".to_string(),
error: "Daemon is offline.".to_string(),
});
}
}
}
pub fn insert_contract(&self, contract: Contract) {
let mut contracts = self.contracts.write().unwrap();
contracts.push(contract);