Compare commits
1 Commits
ac436e3ae6
...
bced16437e
Author | SHA1 | Date | |
---|---|---|---|
bced16437e |
12
snp.proto
12
snp.proto
@ -52,6 +52,7 @@ message MeasurementIP {
|
|||||||
string gateway = 4;
|
string gateway = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This should also include a block hash or similar, for auth
|
||||||
message RegisterNodeReq {
|
message RegisterNodeReq {
|
||||||
string node_pubkey = 1;
|
string node_pubkey = 1;
|
||||||
string owner_pubkey = 2;
|
string owner_pubkey = 2;
|
||||||
@ -130,9 +131,16 @@ message BrainMessage {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message DaemonStreamAuth {
|
||||||
|
string timestamp = 1;
|
||||||
|
string pubkey = 2;
|
||||||
|
repeated string contracts = 3;
|
||||||
|
string signature = 4;
|
||||||
|
}
|
||||||
|
|
||||||
message DaemonMessage {
|
message DaemonMessage {
|
||||||
oneof Msg {
|
oneof Msg {
|
||||||
Pubkey pubkey = 1;
|
DaemonStreamAuth auth = 1;
|
||||||
NewVmResp new_vm_resp = 2;
|
NewVmResp new_vm_resp = 2;
|
||||||
UpdateVmResp update_vm_resp = 3;
|
UpdateVmResp update_vm_resp = 3;
|
||||||
NodeResources node_resources = 4;
|
NodeResources node_resources = 4;
|
||||||
@ -141,7 +149,7 @@ message DaemonMessage {
|
|||||||
|
|
||||||
service BrainDaemon {
|
service BrainDaemon {
|
||||||
rpc RegisterNode (RegisterNodeReq) returns (stream Contract);
|
rpc RegisterNode (RegisterNodeReq) returns (stream Contract);
|
||||||
rpc BrainMessages (Pubkey) returns (stream BrainMessage);
|
rpc BrainMessages (DaemonStreamAuth) returns (stream BrainMessage);
|
||||||
rpc DaemonMessages (stream DaemonMessage) returns (Empty);
|
rpc DaemonMessages (stream DaemonMessage) returns (Empty);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
108
src/grpc.rs
108
src/grpc.rs
@ -42,7 +42,7 @@ impl BrainDaemon for BrainDaemonMock {
|
|||||||
&self,
|
&self,
|
||||||
req: Request<RegisterNodeReq>,
|
req: Request<RegisterNodeReq>,
|
||||||
) -> Result<Response<Self::RegisterNodeStream>, Status> {
|
) -> Result<Response<Self::RegisterNodeStream>, Status> {
|
||||||
let req = req.into_inner();
|
let req = check_sig_from_req(req)?;
|
||||||
info!("Starting registration process for {:?}", req);
|
info!("Starting registration process for {:?}", req);
|
||||||
let node = crate::data::Node {
|
let node = crate::data::Node {
|
||||||
public_key: req.node_pubkey.clone(),
|
public_key: req.node_pubkey.clone(),
|
||||||
@ -73,12 +73,19 @@ impl BrainDaemon for BrainDaemonMock {
|
|||||||
type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainMessage, Status>> + Send>>;
|
type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainMessage, Status>> + Send>>;
|
||||||
async fn brain_messages(
|
async fn brain_messages(
|
||||||
&self,
|
&self,
|
||||||
req: Request<Pubkey>,
|
req: Request<DaemonStreamAuth>,
|
||||||
) -> Result<Response<Self::BrainMessagesStream>, Status> {
|
) -> Result<Response<Self::BrainMessagesStream>, Status> {
|
||||||
let req = req.into_inner();
|
let auth = req.into_inner();
|
||||||
info!("Daemon {} connected to receive brain messages", req.pubkey);
|
let pubkey = auth.pubkey.clone();
|
||||||
|
check_sig_from_parts(
|
||||||
|
&pubkey,
|
||||||
|
&auth.timestamp,
|
||||||
|
&format!("{:?}", auth.contracts),
|
||||||
|
&auth.signature,
|
||||||
|
)?;
|
||||||
|
info!("Daemon {} connected to receive brain messages", pubkey);
|
||||||
let (tx, rx) = mpsc::channel(6);
|
let (tx, rx) = mpsc::channel(6);
|
||||||
self.data.add_daemon_tx(&req.pubkey, tx);
|
self.data.add_daemon_tx(&pubkey, tx);
|
||||||
let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg));
|
let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg));
|
||||||
Ok(Response::new(
|
Ok(Response::new(
|
||||||
Box::pin(output_stream) as Self::BrainMessagesStream
|
Box::pin(output_stream) as Self::BrainMessagesStream
|
||||||
@ -90,14 +97,27 @@ impl BrainDaemon for BrainDaemonMock {
|
|||||||
req: Request<Streaming<DaemonMessage>>,
|
req: Request<Streaming<DaemonMessage>>,
|
||||||
) -> Result<Response<Empty>, Status> {
|
) -> Result<Response<Empty>, Status> {
|
||||||
let mut req_stream = req.into_inner();
|
let mut req_stream = req.into_inner();
|
||||||
let mut pubkey = String::new();
|
let pubkey: String;
|
||||||
|
if let Some(Ok(msg)) = req_stream.next().await {
|
||||||
|
if let Some(daemon_message::Msg::Auth(auth)) = msg.msg {
|
||||||
|
pubkey = auth.pubkey.clone();
|
||||||
|
check_sig_from_parts(
|
||||||
|
&pubkey,
|
||||||
|
&auth.timestamp,
|
||||||
|
&format!("{:?}", auth.contracts),
|
||||||
|
&auth.signature,
|
||||||
|
)?;
|
||||||
|
} else {
|
||||||
|
return Err(Status::unauthenticated("Could not authenticate the daemon"));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Err(Status::unauthenticated("Could not authenticate the daemon"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// info!("Received a message from daemon {pubkey}: {daemon_message:?}");
|
||||||
while let Some(daemon_message) = req_stream.next().await {
|
while let Some(daemon_message) = req_stream.next().await {
|
||||||
info!("Received a message from daemon {pubkey}: {daemon_message:?}");
|
|
||||||
match daemon_message {
|
match daemon_message {
|
||||||
Ok(msg) => match msg.msg {
|
Ok(msg) => match msg.msg {
|
||||||
Some(daemon_message::Msg::Pubkey(p)) => {
|
|
||||||
pubkey = p.pubkey;
|
|
||||||
}
|
|
||||||
Some(daemon_message::Msg::NewVmResp(new_vm_resp)) => {
|
Some(daemon_message::Msg::NewVmResp(new_vm_resp)) => {
|
||||||
self.data.submit_newvm_resp(new_vm_resp).await;
|
self.data.submit_newvm_resp(new_vm_resp).await;
|
||||||
}
|
}
|
||||||
@ -107,7 +127,7 @@ impl BrainDaemon for BrainDaemonMock {
|
|||||||
Some(daemon_message::Msg::NodeResources(node_resources)) => {
|
Some(daemon_message::Msg::NodeResources(node_resources)) => {
|
||||||
self.data.submit_node_resources(node_resources);
|
self.data.submit_node_resources(node_resources);
|
||||||
}
|
}
|
||||||
None => {}
|
_ => {}
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::warn!("Daemon disconnected: {e:?}");
|
log::warn!("Daemon disconnected: {e:?}");
|
||||||
@ -122,18 +142,18 @@ impl BrainDaemon for BrainDaemonMock {
|
|||||||
#[tonic::async_trait]
|
#[tonic::async_trait]
|
||||||
impl BrainCli for BrainCliMock {
|
impl BrainCli for BrainCliMock {
|
||||||
async fn get_balance(&self, req: Request<Pubkey>) -> Result<Response<AccountBalance>, Status> {
|
async fn get_balance(&self, req: Request<Pubkey>) -> Result<Response<AccountBalance>, Status> {
|
||||||
let req = check_signature(req)?;
|
let req = check_sig_from_req(req)?;
|
||||||
Ok(Response::new(self.data.get_balance(&req.pubkey).into()))
|
Ok(Response::new(self.data.get_balance(&req.pubkey).into()))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_airdrop(&self, req: Request<Pubkey>) -> Result<Response<Empty>, Status> {
|
async fn get_airdrop(&self, req: Request<Pubkey>) -> Result<Response<Empty>, Status> {
|
||||||
let req = check_signature(req)?;
|
let req = check_sig_from_req(req)?;
|
||||||
self.data.get_airdrop(&req.pubkey);
|
self.data.get_airdrop(&req.pubkey);
|
||||||
Ok(Response::new(Empty {}))
|
Ok(Response::new(Empty {}))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> {
|
async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> {
|
||||||
let req = check_signature(req)?;
|
let req = check_sig_from_req(req)?;
|
||||||
info!("New VM requested via CLI: {req:?}");
|
info!("New VM requested via CLI: {req:?}");
|
||||||
let admin_pubkey = req.admin_pubkey.clone();
|
let admin_pubkey = req.admin_pubkey.clone();
|
||||||
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
|
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
|
||||||
@ -153,7 +173,7 @@ impl BrainCli for BrainCliMock {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn update_vm(&self, req: Request<UpdateVmReq>) -> Result<Response<UpdateVmResp>, Status> {
|
async fn update_vm(&self, req: Request<UpdateVmReq>) -> Result<Response<UpdateVmResp>, Status> {
|
||||||
let req = check_signature(req)?;
|
let req = check_sig_from_req(req)?;
|
||||||
info!("Update VM requested via CLI: {req:?}");
|
info!("Update VM requested via CLI: {req:?}");
|
||||||
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
|
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
|
||||||
self.data.submit_updatevm_req(req, oneshot_tx).await;
|
self.data.submit_updatevm_req(req, oneshot_tx).await;
|
||||||
@ -169,7 +189,7 @@ impl BrainCli for BrainCliMock {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn extend_vm(&self, req: Request<ExtendVmReq>) -> Result<Response<Empty>, Status> {
|
async fn extend_vm(&self, req: Request<ExtendVmReq>) -> Result<Response<Empty>, Status> {
|
||||||
let req = check_signature(req)?;
|
let req = check_sig_from_req(req)?;
|
||||||
match self
|
match self
|
||||||
.data
|
.data
|
||||||
.extend_contract_time(&req.uuid, &req.admin_pubkey, req.locked_nano)
|
.extend_contract_time(&req.uuid, &req.admin_pubkey, req.locked_nano)
|
||||||
@ -184,7 +204,7 @@ impl BrainCli for BrainCliMock {
|
|||||||
&self,
|
&self,
|
||||||
req: Request<ListContractsReq>,
|
req: Request<ListContractsReq>,
|
||||||
) -> Result<Response<Self::ListContractsStream>, Status> {
|
) -> Result<Response<Self::ListContractsStream>, Status> {
|
||||||
let req = check_signature(req)?;
|
let req = check_sig_from_req(req)?;
|
||||||
info!("CLI {} requested ListVMContractsStream", req.admin_pubkey);
|
info!("CLI {} requested ListVMContractsStream", req.admin_pubkey);
|
||||||
let contracts = match req.uuid.is_empty() {
|
let contracts = match req.uuid.is_empty() {
|
||||||
false => match self.data.find_contract_by_uuid(&req.uuid) {
|
false => match self.data.find_contract_by_uuid(&req.uuid) {
|
||||||
@ -210,7 +230,7 @@ impl BrainCli for BrainCliMock {
|
|||||||
&self,
|
&self,
|
||||||
req: Request<NodeFilters>,
|
req: Request<NodeFilters>,
|
||||||
) -> Result<Response<Self::ListNodesStream>, tonic::Status> {
|
) -> Result<Response<Self::ListNodesStream>, tonic::Status> {
|
||||||
let req = check_signature(req)?;
|
let req = check_sig_from_req(req)?;
|
||||||
info!("Unknown CLI requested ListNodesStream: {req:?}");
|
info!("Unknown CLI requested ListNodesStream: {req:?}");
|
||||||
let nodes = self.data.find_nodes_by_filters(&req);
|
let nodes = self.data.find_nodes_by_filters(&req);
|
||||||
let (tx, rx) = mpsc::channel(6);
|
let (tx, rx) = mpsc::channel(6);
|
||||||
@ -229,7 +249,7 @@ impl BrainCli for BrainCliMock {
|
|||||||
&self,
|
&self,
|
||||||
req: Request<NodeFilters>,
|
req: Request<NodeFilters>,
|
||||||
) -> Result<Response<NodeListResp>, Status> {
|
) -> Result<Response<NodeListResp>, Status> {
|
||||||
let req = check_signature(req)?;
|
let req = check_sig_from_req(req)?;
|
||||||
info!("Unknown CLI requested ListNodesStream: {req:?}");
|
info!("Unknown CLI requested ListNodesStream: {req:?}");
|
||||||
match self.data.get_one_node_by_filters(&req) {
|
match self.data.get_one_node_by_filters(&req) {
|
||||||
Some(node) => Ok(Response::new(node.into())),
|
Some(node) => Ok(Response::new(node.into())),
|
||||||
@ -240,7 +260,7 @@ impl BrainCli for BrainCliMock {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn delete_vm(&self, req: Request<DeleteVmReq>) -> Result<Response<Empty>, Status> {
|
async fn delete_vm(&self, req: Request<DeleteVmReq>) -> Result<Response<Empty>, Status> {
|
||||||
let req = check_signature(req)?;
|
let req = check_sig_from_req(req)?;
|
||||||
info!("Unknown CLI requested to delete vm {}", req.uuid);
|
info!("Unknown CLI requested to delete vm {}", req.uuid);
|
||||||
match self.data.delete_vm(req).await {
|
match self.data.delete_vm(req).await {
|
||||||
Ok(()) => Ok(Response::new(Empty {})),
|
Ok(()) => Ok(Response::new(Empty {})),
|
||||||
@ -295,7 +315,13 @@ impl PubkeyGetter for NodeFilters {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn check_signature<T: std::fmt::Debug + PubkeyGetter>(req: Request<T>) -> Result<T, Status> {
|
impl PubkeyGetter for RegisterNodeReq {
|
||||||
|
fn get_pubkey(&self) -> Option<String> {
|
||||||
|
Some(self.node_pubkey.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_sig_from_req<T: std::fmt::Debug + PubkeyGetter>(req: Request<T>) -> Result<T, Status> {
|
||||||
let time = match req.metadata().get("timestamp") {
|
let time = match req.metadata().get("timestamp") {
|
||||||
Some(t) => t.clone(),
|
Some(t) => t.clone(),
|
||||||
None => return Err(Status::unauthenticated("Timestamp not found in metadata.")),
|
None => return Err(Status::unauthenticated("Timestamp not found in metadata.")),
|
||||||
@ -357,3 +383,43 @@ fn check_signature<T: std::fmt::Debug + PubkeyGetter>(req: Request<T>) -> Result
|
|||||||
}
|
}
|
||||||
Ok(req)
|
Ok(req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Result<(), Status> {
|
||||||
|
let now = chrono::Utc::now();
|
||||||
|
let parsed_time = chrono::DateTime::parse_from_rfc3339(time)
|
||||||
|
.map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?;
|
||||||
|
let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds();
|
||||||
|
if seconds_elapsed > 1 || seconds_elapsed < -1 {
|
||||||
|
return Err(Status::unauthenticated(format!(
|
||||||
|
"Date is not within 1 sec of the time of the server: CLI {} vs Server {}",
|
||||||
|
parsed_time, now
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
let signature = bs58::decode(sig)
|
||||||
|
.into_vec()
|
||||||
|
.map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?;
|
||||||
|
let signature = ed25519_dalek::Signature::from_bytes(
|
||||||
|
signature
|
||||||
|
.as_slice()
|
||||||
|
.try_into()
|
||||||
|
.map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?,
|
||||||
|
);
|
||||||
|
|
||||||
|
let pubkey = ed25519_dalek::VerifyingKey::from_bytes(
|
||||||
|
&bs58::decode(&pubkey)
|
||||||
|
.into_vec()
|
||||||
|
.map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))?
|
||||||
|
.try_into()
|
||||||
|
.map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?,
|
||||||
|
)
|
||||||
|
.map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?;
|
||||||
|
|
||||||
|
let msg = time.to_string() + msg;
|
||||||
|
use ed25519_dalek::Verifier;
|
||||||
|
pubkey
|
||||||
|
.verify(msg.as_bytes(), &signature)
|
||||||
|
.map_err(|_| Status::unauthenticated("the signature is not valid"))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user