Update occlum exec to support kill

This commit is contained in:
zongmin.gu 2020-05-31 18:51:19 +08:00 committed by zongmin.gzm
parent f4d9a7f4e1
commit 4e02db367e
4 changed files with 188 additions and 246 deletions

@ -8,22 +8,27 @@ service OcclumExec {
rpc StatusCheck(HealthCheckRequest) returns (HealthCheckResponse) {}
// Client asks the server to execute the command
rpc ExecCommand(ExecComm) returns (ExecCommResponse) {}
rpc ExecCommand(ExecCommRequest) returns (ExecCommResponse) {}
// Client gets the return value
rpc GetResult(GetResultRequest) returns (GetResultResponse) {}
// Client sends heart beats to server
rpc HeartBeat(stream HealthCheckRequest) returns (stream HealthCheckResponse) {}
// Client stops the server
rpc StopServer(StopRequest) returns (StopResponse) {}
// Client send signal to server
rpc KillProcess(KillProcessRequest) returns (KillProcessResponse) {}
}
message GetResultRequest {
uint32 process_id = 1;
message KillProcessRequest {
int32 process_id = 1;
int32 signal = 2;
}
message KillProcessResponse {}
message GetResultRequest { int32 process_id = 1; }
message GetResultResponse {
enum ExecutionStatus {
UNKNOWN = 0;
@ -34,7 +39,7 @@ message GetResultResponse {
int32 result = 2;
}
message ExecComm {
message ExecCommRequest {
uint32 process_id = 1;
string sockpath = 2;
string command = 3;
@ -43,25 +48,24 @@ message ExecComm {
}
message ExecCommResponse {
uint32 process_id = 1;
enum ExecutionStatus {
RUNNING = 0;
LAUNCH_FAILED = 1;
}
ExecutionStatus status = 1;
int32 process_id = 2;
}
message HealthCheckRequest {
uint32 process_id = 1;
}
message HealthCheckRequest {}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVING = 0;
NOT_SERVING = 1;
}
ServingStatus status = 1;
}
message StopRequest {
uint32 time = 1;
}
message StopRequest { uint32 time = 1; }
message StopResponse {
}
message StopResponse {}

@ -10,12 +10,12 @@ extern crate log;
use clap::{App, Arg};
use futures::executor;
use futures::stream::StreamExt;
use grpc::prelude::*;
use grpc::ClientConf;
use occlum_exec::occlum_exec::{
ExecComm, GetResultRequest, GetResultResponse_ExecutionStatus, HealthCheckRequest,
HealthCheckResponse_ServingStatus, StopRequest,
ExecCommRequest, ExecCommResponse_ExecutionStatus, GetResultRequest,
GetResultResponse_ExecutionStatus, HealthCheckRequest, HealthCheckResponse_ServingStatus,
KillProcessRequest, StopRequest,
};
use occlum_exec::occlum_exec_grpc::OcclumExecClient;
use occlum_exec::{
@ -23,6 +23,8 @@ use occlum_exec::{
};
use protobuf::RepeatedField;
use sendfd::SendWithFd;
use signal_hook::iterator::Signals;
use signal_hook::{SIGINT, SIGKILL, SIGQUIT, SIGTERM, SIGUSR1};
use std::cmp;
use std::env;
use std::os::unix::net::UnixListener;
@ -32,9 +34,6 @@ use std::sync::{Arc, Mutex};
use std::{thread, time};
use tempdir::TempDir;
use signal_hook::iterator::Signals;
use signal_hook::SIGUSR1;
/// Execute the command on server
///
/// # Examples
@ -50,7 +49,7 @@ fn exec_command(
command: &str,
parameters: &[&str],
envs: &[&str],
) -> Result<u32, String> {
) -> Result<i32, String> {
debug!("exec_command {:?} {:?} {:?}", command, parameters, envs);
let mut parameter_list = RepeatedField::default();
@ -89,7 +88,7 @@ fn exec_command(
client
.exec_command(
grpc::RequestOptions::new(),
ExecComm {
ExecCommRequest {
process_id: process::id(),
command: command.to_string(),
parameters: parameter_list,
@ -102,10 +101,15 @@ fn exec_command(
); // Drop response metadata
match resp {
Ok(resp) => {
Ok(resp) => match resp.status {
ExecCommResponse_ExecutionStatus::LAUNCH_FAILED => {
Err(String::from("failed to launch the process."))
}
ExecCommResponse_ExecutionStatus::RUNNING => {
sendfd_thread.join().unwrap();
Ok(resp.process_id)
}
},
Err(_) => Err(String::from("failed to send request.")),
}
}
@ -113,7 +117,6 @@ fn exec_command(
/// Starts the server if the server is not running
fn start_server(client: &OcclumExecClient, server_name: &str) -> Result<u32, String> {
let mut server_launched = false;
let mut server_connection_retry_time = 0;
loop {
let resp = executor::block_on(
@ -121,23 +124,17 @@ fn start_server(client: &OcclumExecClient, server_name: &str) -> Result<u32, Str
.status_check(
grpc::RequestOptions::new(),
HealthCheckRequest {
process_id: 0,
..Default::default()
},
)
.join_metadata_result(),
);
server_connection_retry_time += 1;
match resp {
Ok((_, resp, _)) => {
match resp.status {
HealthCheckResponse_ServingStatus::NOT_SERVING => {
return Err("no process".to_string())
if resp.status == HealthCheckResponse_ServingStatus::NOT_SERVING {
return Err("server is not running. It is not able to start.".to_string());
}
_ => {}
};
debug!("server is running.");
return Ok(0);
}
@ -157,11 +154,6 @@ fn start_server(client: &OcclumExecClient, server_name: &str) -> Result<u32, Str
}
};
} else {
if server_connection_retry_time < 100 {
//wait server 100 millis
thread::sleep(time::Duration::from_millis(100));
continue;
}
return Err("Failed to launch server".to_string());
}
}
@ -190,60 +182,8 @@ fn stop_server(client: &OcclumExecClient, time: u32) {
}
}
/// Sends heart beats to server. When server responses NOT_SERVING, the app exit with return value.
fn start_heart_beat(client: &OcclumExecClient, process_id: u32) {
let process_stopped = Arc::new(Mutex::new(false));
let c_process_stopped = process_stopped.clone();
let (mut req, resp) =
executor::block_on(client.heart_beat(grpc::RequestOptions::new())).unwrap();
thread::spawn(move || {
loop {
thread::sleep(time::Duration::from_millis(500));
match *c_process_stopped.lock().unwrap() {
true => {
//the application stopped
break;
}
false => {
executor::block_on(req.wait()).unwrap();
req.send_data(HealthCheckRequest {
process_id: process_id,
..Default::default()
})
.expect("send failed");
}
};
}
req.finish().expect("req finish failed");
});
let mut responses = resp.drop_metadata();
'a: loop {
while let Some(message) = executor::block_on(responses.next()) {
let status = match message {
Ok(m) => m.status,
Err(_e) => {
//stop the client for any issue
//Todo: How to report the crash issue?
HealthCheckResponse_ServingStatus::NOT_SERVING
}
};
if status != HealthCheckResponse_ServingStatus::SERVING {
//the application has stopped
*process_stopped.lock().unwrap() = true;
break 'a;
}
thread::sleep(time::Duration::from_millis(100));
}
}
}
//Gets the application return value
fn get_return_value(client: &OcclumExecClient, process_id: &u32) -> Result<i32, ()> {
fn get_return_value(client: &OcclumExecClient, process_id: &i32) -> Result<i32, ()> {
let resp = executor::block_on(
client
.get_result(
@ -267,6 +207,26 @@ fn get_return_value(client: &OcclumExecClient, process_id: &u32) -> Result<i32,
}
}
// Kill the process running in server
fn kill_process(client: &OcclumExecClient, process_id: &i32, signal: &i32) {
if executor::block_on(
client
.kill_process(
grpc::RequestOptions::new(),
KillProcessRequest {
process_id: *process_id,
signal: *signal,
..Default::default()
},
)
.join_metadata_result(),
)
.is_err()
{
debug!("send signal failed");
}
}
fn main() -> Result<(), i32> {
env_logger::init();
@ -303,7 +263,10 @@ fn main() -> Result<(), i32> {
.get_matches();
let args: Vec<String> = env::args().collect();
let env: Vec<String> = env::vars().into_iter().map(|(key, val)| format!("{}={}", key, val)).collect();
let env: Vec<String> = env::vars()
.into_iter()
.map(|(key, val)| format!("{}={}", key, val))
.collect();
let mut sock_file = String::from(args[0].as_str());
let sock_file = str::replace(
@ -325,12 +288,14 @@ fn main() -> Result<(), i32> {
);
if let Err(s) = start_server(&client, &server_name) {
debug!("start_server failed {}", s);
println!("start_server failed {}", s);
return Err(-1);
}
println!("server is running.");
} else if let Some(ref matches) = matches.subcommand_matches("stop") {
let stop_time = matches.value_of("time").unwrap().parse::<u32>().unwrap();
stop_server(&client, stop_time);
println!("server stopped.");
} else if let Some(ref matches) = matches.subcommand_matches("exec") {
let cmd_args: Vec<&str> = match matches
.values_of("args")
@ -344,25 +309,49 @@ fn main() -> Result<(), i32> {
let (cmd, args) = cmd_args.split_first().unwrap();
let env: Vec<&str> = env.iter().map(|string| string.as_str()).collect();
match exec_command(&client, cmd, args, &env) {
Ok(process_id) => {
let signals = Signals::new(&[SIGUSR1]).unwrap();
// Create the signal handler
let process_killed = Arc::new(Mutex::new(false));
let process_killed_clone = Arc::clone(&process_killed);
let signals = Signals::new(&[SIGUSR1, SIGINT, SIGQUIT, SIGTERM]).unwrap();
let signal_thread = thread::spawn(move || {
for sig in signals.forever() {
debug!("Received signal {:?}", sig);
for signal in signals.forever() {
debug!("Received signal {:?}", signal);
match signal {
SIGUSR1 => {
break;
}
SIGINT | SIGQUIT | SIGTERM => {
let mut process_killed = process_killed_clone.lock().unwrap();
*process_killed = true;
break;
}
_ => unreachable!(),
}
}
});
//Notifies the server, if client killed by KILL
start_heart_beat(&client, process_id.clone());
match exec_command(&client, cmd, args, &env) {
Ok(process_id) => {
// the signal thread exit if server finished execution or user kill the client
signal_thread.join().unwrap();
let result = get_return_value(&client, &process_id).unwrap();
// check the signal type:
// if client killed by user, send SIGTERM and SIGKILL to server
if *process_killed.lock().unwrap() {
// stop the process in server
kill_process(&client, &process_id, &SIGTERM);
kill_process(&client, &process_id, &SIGKILL);
return Err(-1);
} else {
if let Ok(result) = get_return_value(&client, &process_id) {
if result != 0 {
return Err(result);
}
} else {
debug!("get the return value failed");
return Err(-1);
}
}
}
Err(s) => {
debug!("execute command failed {}", s);

@ -36,7 +36,6 @@ fn check_server_status(sock_file: &str) -> bool {
.status_check(
grpc::RequestOptions::new(),
HealthCheckRequest {
process_id: 0,
..Default::default()
},
)

@ -2,19 +2,15 @@ extern crate chrono;
extern crate nix;
extern crate timer;
use crate::occlum_exec::{
ExecComm, ExecCommResponse, GetResultRequest, GetResultResponse,
GetResultResponse_ExecutionStatus, HealthCheckRequest, HealthCheckResponse,
HealthCheckResponse_ServingStatus, StopRequest, StopResponse,
ExecCommRequest, ExecCommResponse, ExecCommResponse_ExecutionStatus, GetResultRequest,
GetResultResponse, GetResultResponse_ExecutionStatus, HealthCheckRequest, HealthCheckResponse,
HealthCheckResponse_ServingStatus, KillProcessRequest, KillProcessResponse, StopRequest,
StopResponse,
};
use crate::occlum_exec_grpc::OcclumExec;
use futures::stream::StreamExt;
use grpc::Metadata;
use grpc::ServerHandlerContext;
use grpc::ServerRequest;
use grpc::ServerRequestSingle;
use grpc::ServerResponseSink;
use grpc::ServerResponseUnarySink;
use grpc::{ServerHandlerContext, ServerRequestSingle, ServerResponseUnarySink};
use nix::sys::signal::{self, Signal};
use nix::unistd::Pid;
use sendfd::RecvWithFd;
use std::cmp;
use std::collections::HashMap;
@ -22,20 +18,15 @@ use std::ffi::CString;
use std::os::unix::io::RawFd;
use std::os::unix::net::UnixStream;
use std::sync::{Arc, Condvar, Mutex};
use std::task::Poll;
use std::thread;
use timer::{Guard, Timer};
use nix::sys::signal::{self, Signal};
use nix::unistd::Pid;
#[derive(Default)]
pub struct OcclumExecImpl {
//process_id, return value, execution status
commands: Arc<Mutex<HashMap<u32, (Option<i32>, bool)>>>,
commands: Arc<Mutex<HashMap<i32, (Option<i32>, bool)>>>,
execution_lock: Arc<(Mutex<bool>, Condvar)>,
stop_timer: Arc<Mutex<Option<(Timer, Guard)>>>,
process_id: Arc<Mutex<u32>>,
}
impl OcclumExecImpl {
@ -47,7 +38,6 @@ impl OcclumExecImpl {
commands: Default::default(),
execution_lock: lock,
stop_timer: Arc::new(Mutex::new(Some(timer))),
process_id: Arc::new(Mutex::new(1)),
}
}
}
@ -79,6 +69,22 @@ fn clear_stop_timer(old_timer: &Arc<Mutex<Option<(Timer, Guard)>>>) {
}
impl OcclumExec for OcclumExecImpl {
fn kill_process(
&self,
_o: ::grpc::ServerHandlerContext,
mut req: ::grpc::ServerRequestSingle<KillProcessRequest>,
resp: ::grpc::ServerResponseUnarySink<KillProcessResponse>,
) -> ::grpc::Result<()> {
let req = req.take_message();
if rust_occlum_pal_kill(req.process_id, req.signal).is_err() {
warn!("failed to send signal to process.");
}
resp.finish(KillProcessResponse {
..Default::default()
})
}
fn get_result(
&self,
_o: ServerHandlerContext,
@ -134,7 +140,7 @@ impl OcclumExec for OcclumExecImpl {
fn status_check(
&self,
_o: ServerHandlerContext,
mut req: ServerRequestSingle<HealthCheckRequest>,
_req: ServerRequestSingle<HealthCheckRequest>,
resp: ServerResponseUnarySink<HealthCheckResponse>,
) -> grpc::Result<()> {
//Reset the timer
@ -155,33 +161,16 @@ impl OcclumExec for OcclumExecImpl {
break;
}
//Get the process id from the request
let process_id = req.take_message().process_id;
match process_id {
0 => resp.finish(HealthCheckResponse::default()),
process_id => {
let commands = self.commands.clone();
let mut commands = commands.lock().unwrap();
match commands.get_mut(&process_id) {
Some(_) => resp.finish(HealthCheckResponse {
resp.finish(HealthCheckResponse {
status: HealthCheckResponse_ServingStatus::SERVING,
..Default::default()
}),
_ => resp.finish(HealthCheckResponse {
status: HealthCheckResponse_ServingStatus::NOT_SERVING,
..Default::default()
}),
}
}
}
})
}
fn exec_command(
&self,
_o: ServerHandlerContext,
mut req: ServerRequestSingle<ExecComm>,
mut req: ServerRequestSingle<ExecCommRequest>,
resp: ServerResponseUnarySink<ExecCommResponse>,
) -> grpc::Result<()> {
clear_stop_timer(&self.stop_timer.clone());
@ -215,29 +204,25 @@ impl OcclumExec for OcclumExecImpl {
}
};
let gpid = self.process_id.clone();
let mut gpid = gpid.lock().unwrap();
let process_id: u32 = *gpid;
*gpid += 1;
drop(gpid);
let _commands = self.commands.clone();
let _execution_lock = self.execution_lock.clone();
let _stop_timer = self.stop_timer.clone();
let mut commands = _commands.lock().unwrap();
commands.entry(process_id).or_insert((None, true));
drop(commands);
let cmd = req.command.clone();
let args = req.parameters.into_vec().clone();
let envs = req.enviroments.into_vec().clone();
let client_process_id = req.process_id;
if let Ok(process_id) = rust_occlum_pal_create_process(&cmd, &args, &envs, &stdio_fds) {
let mut commands = _commands.lock().unwrap();
commands.entry(process_id).or_insert((None, true));
drop(commands);
//Run the command in a thread
thread::spawn(move || {
let mut exit_status = Box::new(0);
rust_occlum_pal_exec(&cmd, &args, &envs, &stdio_fds, &mut exit_status)
rust_occlum_pal_exec(process_id, &mut exit_status)
.expect("failed to execute the command");
reset_stop_timer(_execution_lock, _stop_timer, crate::DEFAULT_SERVER_TIMER);
@ -255,56 +240,17 @@ impl OcclumExec for OcclumExecImpl {
});
resp.finish(ExecCommResponse {
status: ExecCommResponse_ExecutionStatus::RUNNING,
process_id: process_id,
..Default::default()
})
}
fn heart_beat(
&self,
o: ServerHandlerContext,
req: ServerRequest<HealthCheckRequest>,
mut resp: ServerResponseSink<HealthCheckResponse>,
) -> grpc::Result<()> {
let mut req = req.into_stream();
let commands = self.commands.clone();
o.spawn_poll_fn(move |cx| {
loop {
// Wait until resp is writable
if let Poll::Pending = resp.poll(cx)? {
return Poll::Pending;
}
match req.poll_next_unpin(cx)? {
Poll::Pending => {
return Poll::Pending;
}
Poll::Ready(Some(note)) => {
let process_id = note.process_id;
let commands = commands.lock().unwrap();
let process_status = match &commands.get(&process_id) {
None => HealthCheckResponse_ServingStatus::UNKNOWN,
Some(&(exit_status, _)) => match exit_status {
None => HealthCheckResponse_ServingStatus::SERVING,
Some(_) => HealthCheckResponse_ServingStatus::NOT_SERVING,
},
};
resp.send_data(HealthCheckResponse {
status: process_status,
} else {
resp.finish(ExecCommResponse {
status: ExecCommResponse_ExecutionStatus::LAUNCH_FAILED,
process_id: 0,
..Default::default()
})
.unwrap();
}
Poll::Ready(None) => {
resp.send_trailers(Metadata::new()).expect("send");
return Poll::Ready(Ok(()));
}
}
}
});
Ok(())
}
}
@ -371,7 +317,9 @@ extern "C" {
fn occlum_pal_kill(pid: i32, sig: i32) -> i32;
}
fn vec_strings_to_cchars(strings: &Vec<String>) -> Result<(Vec<*const libc::c_char>,Vec<CString>), i32> {
fn vec_strings_to_cchars(
strings: &Vec<String>,
) -> Result<(Vec<*const libc::c_char>, Vec<CString>), i32> {
let mut strings_content = Vec::<CString>::new();
let mut cchar_strings = Vec::<*const libc::c_char>::new();
for string in strings {
@ -385,13 +333,12 @@ fn vec_strings_to_cchars(strings: &Vec<String>) -> Result<(Vec<*const libc::c_ch
}
/// Executes the command inside Occlum enclave
fn rust_occlum_pal_exec(
fn rust_occlum_pal_create_process(
cmd: &str,
args: &Vec<String>,
envs: &Vec<String>,
stdio: &occlum_stdio_fds,
exit_status: &mut i32,
) -> Result<(), i32> {
) -> Result<i32, i32> {
let cmd_path = CString::new(cmd).expect("cmd_path: new failed");
let (cmd_args_array, _cmd_args) = vec_strings_to_cchars(args)?;
let (cmd_envs_array, _cmd_envs) = vec_strings_to_cchars(envs)?;
@ -407,12 +354,15 @@ fn rust_occlum_pal_exec(
});
let ret = unsafe { occlum_pal_create_process(Box::into_raw(create_process_args)) };
if ret != 0 {
return Err(ret);
match ret {
0 => Ok(libos_tid),
_ => Err(ret),
}
}
fn rust_occlum_pal_exec(occlum_process_id: i32, exit_status: &mut i32) -> Result<(), i32> {
let exec_args = Box::new(occlum_pal_exec_args {
pid: libos_tid,
pid: occlum_process_id,
exit_value: exit_status as *mut i32,
});
@ -429,11 +379,11 @@ fn rust_occlum_pal_exec(
const SIGKILL: i32 = 9;
const SIGTERM: i32 = 15;
fn rust_occlum_pal_kill(pid: i32, sig: i32) -> Result<i32, i32> {
fn rust_occlum_pal_kill(pid: i32, sig: i32) -> Result<(), i32> {
let ret = unsafe { occlum_pal_kill(pid, sig) };
if ret == 0 {
return Ok(0);
return Ok(());
} else {
return Err(ret);
}