app features: LP cron, kicks and reports

feat: add cron job for app nodes

- allow app contract admin to report app node
- allow operator kick app contract
- cron job that gives LP to app node operators
This commit is contained in:
Noor 2025-04-03 12:42:38 +05:30 committed by ghe0
parent 9be7abc9cf
commit 8658f47d71
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
3 changed files with 163 additions and 31 deletions

@ -40,13 +40,16 @@ pub enum Error {
#[error("Could not find contract {0}")]
AppContractNotFound(String),
#[error("Could not find contract {0}")]
ContractNotFound(String),
}
#[derive(Clone, Default, Serialize, Deserialize, Debug)]
pub struct AccountData {
pub balance: u64,
pub tmp_locked: u64,
// holds reasons why VMs of this account got kicked
// holds reasons why Contracts of this account got kicked
pub kicked_for: Vec<String>,
pub last_kick: chrono::DateTime<Utc>,
// holds accounts that banned this account
@ -247,7 +250,7 @@ impl From<AppContract> for AppContractPB {
}
}
#[derive(Eq, Hash, PartialEq, Clone, Debug, Default, Serialize, Deserialize)]
#[derive(Eq, PartialEq, Clone, Debug, Default, Serialize, Deserialize)]
pub struct AppNode {
pub node_pubkey: String,
pub operator_wallet: String,
@ -262,7 +265,8 @@ pub struct AppNode {
pub max_ports_per_app: u32,
// nanotokens per unit per minute
pub price: u64,
// 1st String is user wallet and 2nd String is report message
pub reports: HashMap<String, String>,
pub offline_minutes: u64,
}
@ -276,7 +280,7 @@ impl From<AppNode> for AppNodeListResp {
city: value.city,
ip: value.ip,
price: value.price,
reports: Vec::new(),
reports: value.reports.into_values().collect(),
}
}
}
@ -442,7 +446,8 @@ impl BrainData {
let node = match nodes
.iter()
.find(|n| n.public_key == c.node_pubkey)
.cloned() {
.cloned()
{
Some(n) => n,
None => return c.locked_nano > 0,
};
@ -505,7 +510,6 @@ impl BrainData {
nodes.push(node);
}
// todo: this should also support Apps
/// Receives: operator, contract uuid, reason of kick
pub async fn kick_contract(
&self,
@ -514,17 +518,48 @@ impl BrainData {
reason: &str,
) -> Result<u64, Error> {
log::debug!("Operator {operator} requested a kick of {uuid} for reason: {reason}");
let contract = self.find_contract_by_uuid(uuid)?;
let (admin_pubkey, node_pubkey, updated_at, price_per_mint, is_vm) =
match self.find_any_contract_by_uuid(uuid) {
Ok((Some(vm), _)) => {
let price_per_mint = vm.price_per_minute();
(
vm.admin_pubkey,
vm.node_pubkey,
vm.updated_at,
price_per_mint,
true,
)
}
Ok((_, Some(app))) => {
let price_per_mint = app.price_per_minute();
(
app.admin_pubkey,
app.node_pubkey,
app.updated_at,
price_per_mint,
false,
)
}
_ => {
log::error!("Could not find contract {uuid}");
return Err(Error::ContractNotFound(uuid.to_string()));
}
};
let mut operator_data = self
.operators
.get_mut(operator)
.ok_or(Error::AccessDenied)?;
if !operator_data.vm_nodes.contains(&contract.node_pubkey) {
if !(operator_data.vm_nodes.contains(&node_pubkey)
|| operator_data.app_nodes.contains(&node_pubkey))
{
return Err(Error::AccessDenied);
}
let mut minutes_to_refund = chrono::Utc::now()
.signed_duration_since(contract.updated_at)
.signed_duration_since(updated_at)
.num_minutes()
.abs() as u64;
// cap refund at 1 week
@ -532,10 +567,10 @@ impl BrainData {
minutes_to_refund = 10080;
}
let mut refund_amount = minutes_to_refund * contract.price_per_minute();
let mut refund_amount = minutes_to_refund * price_per_mint;
let mut admin_account = self
.accounts
.get_mut(&contract.admin_pubkey)
.get_mut(&admin_pubkey)
.ok_or(Error::ImpossibleError)?;
// check if he got kicked within the last day
@ -559,15 +594,22 @@ impl BrainData {
admin_account.kicked_for.push(reason.to_string());
operator_data.escrow -= refund_amount;
let admin_pubkey = contract.admin_pubkey.clone();
let admin_pubkey = admin_pubkey.clone();
drop(admin_account);
drop(contract);
if is_vm {
self.delete_vm(grpc::DeleteVmReq {
uuid: uuid.to_string(),
admin_pubkey,
})
.await?;
} else {
self.send_del_container_req(DelAppReq {
uuid: uuid.to_string(),
admin_pubkey,
})
.await?;
}
Ok(refund_amount)
}
@ -593,10 +635,15 @@ impl BrainData {
});
}
pub fn report_node(&self, admin_pubkey: String, node: &str, report: String) {
let mut nodes = self.vm_nodes.write().unwrap();
if let Some(node) = nodes.iter_mut().find(|n| n.public_key == node) {
node.reports.insert(admin_pubkey, report);
pub fn report_any_node(&self, admin_pubkey: String, node: &str, report: String) {
let mut vm_nodes = self.vm_nodes.write().unwrap();
if let Some(vm_node) = vm_nodes.iter_mut().find(|n| n.public_key == node) {
vm_node.reports.insert(admin_pubkey.clone(), report.clone());
}
let mut app_nodes = self.app_nodes.write().unwrap();
if let Some(app_node) = app_nodes.iter_mut().find(|n| n.node_pubkey == node) {
app_node.reports.insert(admin_pubkey, report);
}
}
@ -1238,6 +1285,25 @@ impl BrainData {
}
}
impl BrainData {
pub fn find_any_contract_by_uuid(
&self,
uuid: &str,
) -> Result<(Option<VmContract>, Option<AppContract>), Error> {
let contracts = self.vm_contracts.read().unwrap();
if let Some(vm_contract) = contracts.iter().cloned().find(|c| c.uuid == uuid) {
return Ok((Some(vm_contract), None));
}
let app_contracts = self.app_contracts.read().unwrap();
if let Some(app_contract) = app_contracts.iter().cloned().find(|c| c.uuid == uuid) {
return Ok((None, Some(app_contract)));
}
Err(Error::ContractNotFound(uuid.to_string()))
}
}
impl BrainData {
pub fn add_app_daemon_tx(&self, node_pubkey: &str, tx: Sender<BrainMessageApp>) {
self.app_daemon_tx.insert(node_pubkey.to_string(), tx);
@ -1380,13 +1446,72 @@ impl BrainData {
.collect()
}
/// This is written to run every minute
pub async fn app_nodes_cron(&self) {
log::debug!("Running app nodes cron...");
let mut nodes = self.app_nodes.write().unwrap();
let mut app_contracts = self.app_contracts.write().unwrap();
for node in nodes.iter_mut() {
if self.app_daemon_tx.contains_key(&node.node_pubkey) {
node.offline_minutes = 0;
continue;
}
let mut operator = match self
.operators
.iter_mut()
.find(|o| o.app_nodes.contains(&node.node_pubkey))
{
Some(op) => op,
None => continue,
};
node.offline_minutes += 1;
// compensate contract admin if the node is offline more then 5 minutes
if node.offline_minutes > 5 {
for c in app_contracts
.iter()
.filter(|c| c.node_pubkey == node.node_pubkey)
{
let compensation = c.price_per_minute() * 10;
if compensation < operator.escrow {
operator.escrow -= compensation;
self.add_nano_to_wallet(&c.admin_pubkey, compensation);
}
}
}
}
// delete nodes that are offline more than 3 hours, and clean contracts
nodes.retain(|n| {
if n.offline_minutes > 1600 {
app_contracts.retain_mut(|c| {
if c.node_pubkey == n.node_pubkey {
self.add_nano_to_wallet(&c.admin_pubkey, c.locked_nano);
}
c.node_pubkey != n.node_pubkey
});
for mut op in self.operators.iter_mut() {
op.vm_nodes.remove(&n.node_pubkey);
}
}
n.offline_minutes <= 180
});
}
pub async fn app_contracts_cron(&self) {
let mut deleted_app_contracts: Vec<(String, String)> = Vec::new();
log::debug!("Running app contracts cron...");
{
let mut app_contracts = self.app_contracts.write().unwrap();
let app_nodes = self.app_nodes.read().unwrap();
app_contracts.retain_mut(|c| {
let node = self.find_app_node_by_pubkey(&c.node_pubkey).unwrap();
let node = match app_nodes
.iter()
.find(|n| n.node_pubkey == c.node_pubkey)
.cloned()
{
Some(n) => n,
None => return c.locked_nano > 0,
};
if node.offline_minutes == 0 {
let operator_wallet = node.operator_wallet.clone();
let minutes_to_collect = (Utc::now() - c.collected_at).num_minutes() as u64;

@ -99,17 +99,23 @@ impl BrainGeneralCli for BrainGeneraClilMock {
async fn report_node(&self, req: Request<ReportNodeReq>) -> Result<Response<Empty>, Status> {
let req = check_sig_from_req(req)?;
match self.data.find_contract_by_uuid(&req.contract) {
Ok(contract)
if contract.admin_pubkey == req.admin_pubkey
&& contract.node_pubkey == req.node_pubkey =>
match self.data.find_any_contract_by_uuid(&req.contract) {
Ok((Some(vm_contract), _))
if vm_contract.admin_pubkey == req.admin_pubkey
&& vm_contract.node_pubkey == req.node_pubkey =>
{
()
}
Ok((_, Some(app_contract)))
if app_contract.admin_pubkey == req.admin_pubkey
&& app_contract.node_pubkey == req.node_pubkey =>
{
()
}
_ => return Err(Status::unauthenticated("No contract found by this ID.")),
};
self.data
.report_node(req.admin_pubkey, &req.node_pubkey, req.reason);
.report_any_node(req.admin_pubkey, &req.node_pubkey, req.reason);
Ok(Response::new(Empty {}))
}

@ -29,6 +29,7 @@ async fn main() {
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
data_clone.vm_nodes_cron().await;
data_clone.vm_contracts_cron().await;
data_clone.app_nodes_cron().await;
data_clone.app_contracts_cron().await;
if let Err(e) = data_clone.save_to_disk() {
log::error!("Could not save data to disk due to error: {e}")