minimum locking balance on deployment; locking balance on app deployment; refunding locked nano on daemon error; returning appropriate error on app deployment; fixed some typos in logging; new timeout constants for daemon response; minor change in schema and proto; extensive tests on app deployments; fixed some VM tests
307 lines
11 KiB
Rust
307 lines
11 KiB
Rust
use crate::constants::{ACCOUNT, APP_NODE};
|
|
use crate::db::app::ActiveApp;
|
|
use crate::db::prelude as db;
|
|
use crate::grpc::{check_sig_from_parts, check_sig_from_req};
|
|
use detee_shared::app_proto::brain_app_cli_server::BrainAppCli;
|
|
use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemon;
|
|
use detee_shared::app_proto::*;
|
|
use detee_shared::common_proto::Empty;
|
|
use log::info;
|
|
use std::pin::Pin;
|
|
use std::sync::Arc;
|
|
use surrealdb::engine::remote::ws::Client;
|
|
use surrealdb::RecordId;
|
|
use surrealdb::Surreal;
|
|
use tokio::sync::mpsc;
|
|
use tokio_stream::wrappers::ReceiverStream;
|
|
use tokio_stream::{Stream, StreamExt};
|
|
use tonic::{Request, Response, Status, Streaming};
|
|
|
|
/// gRPC service backing the daemon-facing app endpoints (`BrainAppDaemon`).
pub struct AppDaemonServer {
    // Shared handle to the SurrealDB backend; cloned into spawned tasks.
    pub db: Arc<Surreal<Client>>,
}
|
|
|
|
impl AppDaemonServer {
|
|
pub fn new(db: Arc<Surreal<Client>>) -> Self {
|
|
Self { db }
|
|
}
|
|
}
|
|
|
|
#[tonic::async_trait]
impl BrainAppDaemon for AppDaemonServer {
    /// Stream of already-active contracts replayed to a node after registration.
    type RegisterAppNodeStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
    /// Stream of live brain-to-daemon messages (new app requests, deletions).
    type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainMessageApp, Status>> + Send>>;

    /// Registers (or re-registers) an app node, then streams back every
    /// contract currently assigned to it so the daemon can resync state.
    ///
    /// The request signature is verified before anything is written.
    /// Resource/capacity fields start at zero; the daemon is expected to
    /// report real numbers later via `daemon_messages` (AppNodeResources).
    async fn register_app_node(
        &self,
        req: Request<RegisterAppNodeReq>,
    ) -> Result<Response<<Self as BrainAppDaemon>::RegisterAppNodeStream>, Status> {
        let req = check_sig_from_req(req)?;
        info!("Starting app_node registration process for {:?}", req);

        let app_node = db::AppNode {
            // Record keyed by the node's public key.
            id: RecordId::from((APP_NODE, req.node_pubkey.clone())),
            operator: RecordId::from((ACCOUNT, req.operator_wallet)),
            country: req.country,
            region: req.region,
            city: req.city,
            ip: req.main_ip,
            price: req.price,

            // Capacity is unknown at registration time; the daemon reports
            // real values later through the AppNodeResources message.
            avail_mem_mb: 0,
            avail_vcpus: 0,
            avail_storage_gbs: 0,
            avail_ports: 0,
            max_ports_per_app: 0,
            offline_minutes: 0,
        };

        app_node.register(&self.db).await?;
        info!("Sending existing contracts to {}", req.node_pubkey);

        let contracts = db::ActiveAppWithNode::list_by_node(&self.db, &req.node_pubkey).await?;
        let (tx, rx) = mpsc::channel(6);
        // Feed the contracts into the channel from a task so this handler can
        // return the receiving stream immediately; send errors are ignored
        // because they only mean the daemon hung up.
        tokio::spawn(async move {
            for contract in contracts {
                let _ = tx.send(Ok(contract.into())).await;
            }
        });

        let resp_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(resp_stream)))
    }

    /// Opens the long-lived brain -> daemon message stream.
    ///
    /// Authenticates the daemon from the signed `DaemonAuth` parts, then
    /// spawns one live-query task per message type (new app requests and
    /// deletions), both funneling into a single mpsc channel that backs the
    /// returned stream.
    async fn brain_messages(
        &self,
        req: Request<DaemonAuth>,
    ) -> Result<Response<<Self as BrainAppDaemon>::BrainMessagesStream>, Status> {
        let auth = req.into_inner();
        let pubkey = auth.pubkey.clone();
        // The signature covers the Debug rendering of the contract list;
        // the daemon must sign the exact same formatting.
        check_sig_from_parts(
            &pubkey,
            &auth.timestamp,
            &format!("{:?}", auth.contracts),
            &auth.signature,
        )?;

        info!("App Daemon {} connected to receive brain messages", pubkey);

        let (tx, rx) = mpsc::channel(6);
        // Live-forward NewAppReq records targeted at this node.
        {
            let db = self.db.clone();
            let pubkey = pubkey.clone();
            let tx = tx.clone();
            tokio::spawn(async move {
                let _ = db::live_appnode_msgs::<db::NewAppReq>(&db, &pubkey, tx).await;
            });
        }
        // Live-forward DeletedApp records targeted at this node.
        {
            let db = self.db.clone();
            let pubkey = pubkey.clone();
            let tx = tx.clone();
            tokio::spawn(async move {
                let _ = db::live_appnode_msgs::<db::DeletedApp>(&db, &pubkey, tx).await;
            });
        }

        let resp_stream = ReceiverStream::new(rx).map(|msg| Ok(msg.into()));
        Ok(Response::new(Box::pin(resp_stream)))
    }

    /// Consumes the daemon -> brain message stream.
    ///
    /// Protocol: the FIRST stream item must be an `Auth` message whose
    /// signature verifies, otherwise the call fails with `unauthenticated`.
    /// After that, each message is dispatched: `NewAppRes` either records a
    /// deployment error or activates the app; `AppNodeResources` merges the
    /// node's reported capacity. Unknown messages are ignored.
    async fn daemon_messages(
        &self,
        req: Request<Streaming<DaemonMessageApp>>,
    ) -> Result<Response<Empty>, Status> {
        let mut req_stream = req.into_inner();
        let pubkey: String;
        if let Some(Ok(msg)) = req_stream.next().await {
            log::debug!("App daemon_messages received auth message: {msg:?}");
            if let Some(daemon_message_app::Msg::Auth(auth)) = msg.msg {
                pubkey = auth.pubkey.clone();
                check_sig_from_parts(
                    &pubkey,
                    &auth.timestamp,
                    &format!("{:?}", &auth.contracts),
                    &auth.signature,
                )?;
            } else {
                // First message existed but was not an Auth payload.
                return Err(Status::unauthenticated(
                    "Could not authenticate the app daemon: could not extract auth signature",
                ));
            }
        } else {
            // Stream ended or errored before any auth message arrived.
            return Err(Status::unauthenticated("Could not authenticate the app daemon"));
        }

        while let Some(daemon_message) = req_stream.next().await {
            match daemon_message {
                Ok(msg) => match msg.msg {
                    Some(daemon_message_app::Msg::NewAppRes(new_app_resp)) => {
                        // A non-empty error string means the deployment failed
                        // on the node; record it so the waiting CLI call (and
                        // any refund logic) can react. Otherwise activate.
                        if !new_app_resp.error.is_empty() {
                            db::NewAppReq::submit_error(
                                &self.db,
                                &new_app_resp.uuid,
                                new_app_resp.error,
                            )
                            .await?;
                        } else {
                            db::ActiveApp::activate(&self.db, new_app_resp).await?;
                        }
                    }
                    Some(daemon_message_app::Msg::AppNodeResources(app_node_resources)) => {
                        let node_resource: db::AppNodeResources = app_node_resources.into();
                        node_resource.merge(&self.db, &pubkey).await?;
                    }
                    // Other message kinds (e.g. a repeated Auth) are ignored.
                    _ => {}
                },

                Err(e) => {
                    // Stream-level error: log and keep draining; the loop ends
                    // when the stream does.
                    log::warn!("App Daemon Disconnected: {e:?}")
                }
            }
        }

        Ok(Response::new(Empty {}))
    }
}
|
|
|
|
/// gRPC service backing the CLI-facing app endpoints (`BrainAppCli`).
pub struct AppCliServer {
    // Shared handle to the SurrealDB backend; cloned into spawned tasks.
    pub db: Arc<Surreal<Client>>,
}
|
|
|
|
impl AppCliServer {
|
|
pub fn new(db: Arc<Surreal<Client>>) -> Self {
|
|
Self { db }
|
|
}
|
|
}
|
|
|
|
#[tonic::async_trait]
|
|
impl BrainAppCli for AppCliServer {
|
|
type ListAppContractsStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
|
|
type ListAppNodesStream = Pin<Box<dyn Stream<Item = Result<AppNodeListResp, Status>> + Send>>;
|
|
|
|
async fn new_app(&self, req: Request<NewAppReq>) -> Result<Response<NewAppRes>, Status> {
|
|
let req = check_sig_from_req(req)?;
|
|
// TODO: make it atleast 1 hour
|
|
if req.locked_nano < 100 {
|
|
log::error!("locking lessthan 100 nano lps: {}", req.locked_nano);
|
|
return Err(Status::unknown("lock atleaset 100 nano lps"));
|
|
}
|
|
info!("new_app process starting for {:?}", req);
|
|
if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
|
|
return Err(Status::permission_denied("This operator banned you. What did you do?"));
|
|
}
|
|
|
|
let db_req: db::NewAppReq = req.into();
|
|
let id = db_req.id.key().to_string();
|
|
|
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
|
let db = self.db.clone();
|
|
tokio::spawn(async move {
|
|
let _ = tx.send(db::WrappedAppResp::listen(&db, &id).await);
|
|
});
|
|
db_req.submit(&self.db).await?;
|
|
|
|
match rx.await {
|
|
Ok(Ok(db::WrappedAppResp::NewAppRes(new_app_resp))) => Ok(Response::new(new_app_resp)),
|
|
Ok(Ok(db::WrappedAppResp::Error(err))) => Ok(Response::new(err)),
|
|
Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
|
|
"Network timeout. Please try again later or contact the DeTEE devs team.",
|
|
)),
|
|
Ok(Err(e)) => {
|
|
log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}");
|
|
Err(Status::unknown(
|
|
"Unknown error. Please try again or contact the DeTEE devs team.",
|
|
))
|
|
}
|
|
Err(e) => {
|
|
log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}");
|
|
Err(Status::unknown(
|
|
"Unknown error. Please try again or contact the DeTEE devs team.",
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn delete_app(&self, req: Request<DelAppReq>) -> Result<Response<Empty>, Status> {
|
|
let req = check_sig_from_req(req)?;
|
|
info!("delete_app process starting for {:?}", req);
|
|
match ActiveApp::delete(&self.db, &req.uuid).await? {
|
|
true => Ok(Response::new(Empty {})),
|
|
false => Err(Status::not_found(format!("Could not find App contract {}", &req.uuid))),
|
|
}
|
|
}
|
|
|
|
async fn list_app_contracts(
|
|
&self,
|
|
req: Request<ListAppContractsReq>,
|
|
) -> Result<Response<<Self as BrainAppCli>::ListAppContractsStream>, Status> {
|
|
let req = check_sig_from_req(req)?;
|
|
info!("list_app_contracts process starting for {:?}", req);
|
|
|
|
let mut app_contracts = Vec::new();
|
|
|
|
if !req.uuid.is_empty() {
|
|
if let Some(app_contract) =
|
|
db::ActiveAppWithNode::get_by_uuid(&self.db, &req.uuid).await?
|
|
{
|
|
if app_contract.admin.key().to_string() == req.admin_pubkey
|
|
|| app_contract.app_node.operator.key().to_string() == req.admin_pubkey
|
|
{
|
|
app_contracts.push(app_contract);
|
|
}
|
|
}
|
|
} else if req.as_operator {
|
|
app_contracts.append(
|
|
&mut db::ActiveAppWithNode::list_by_operator(&self.db, &req.admin_pubkey).await?,
|
|
);
|
|
} else {
|
|
app_contracts.append(
|
|
&mut db::ActiveAppWithNode::list_by_admin(&self.db, &req.admin_pubkey).await?,
|
|
);
|
|
}
|
|
|
|
let (tx, rx) = mpsc::channel(6);
|
|
tokio::spawn(async move {
|
|
for app_contract in app_contracts {
|
|
let _ = tx.send(Ok(app_contract.into())).await;
|
|
}
|
|
});
|
|
|
|
let resp_stream = ReceiverStream::new(rx);
|
|
Ok(Response::new(Box::pin(resp_stream)))
|
|
}
|
|
|
|
async fn list_app_nodes(
|
|
&self,
|
|
req: Request<AppNodeFilters>,
|
|
) -> Result<Response<<Self as BrainAppCli>::ListAppNodesStream>, Status> {
|
|
let req = check_sig_from_req(req)?;
|
|
info!("list_app_nodes process starting for {:?}", req);
|
|
let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req, false).await?;
|
|
let (tx, rx) = mpsc::channel(6);
|
|
tokio::spawn(async move {
|
|
for app_node in app_nodes {
|
|
let _ = tx.send(Ok(app_node.into())).await;
|
|
}
|
|
});
|
|
let resp_stream = ReceiverStream::new(rx);
|
|
Ok(Response::new(Box::pin(resp_stream)))
|
|
}
|
|
|
|
async fn get_one_app_node(
|
|
&self,
|
|
req: Request<AppNodeFilters>,
|
|
) -> Result<Response<AppNodeListResp>, Status> {
|
|
let req = check_sig_from_req(req)?;
|
|
info!("get_one_app_node process starting for {:?}", req);
|
|
let app_node = db::AppNodeWithReports::find_by_filters(&self.db, req, true)
|
|
.await?
|
|
.first()
|
|
.ok_or(Status::not_found("No app node found"))?
|
|
.clone();
|
|
|
|
Ok(Response::new(app_node.into()))
|
|
}
|
|
}
|