diff --git a/src/exec/occlum_exec.proto b/src/exec/occlum_exec.proto index 79d15e67..f84e4175 100644 --- a/src/exec/occlum_exec.proto +++ b/src/exec/occlum_exec.proto @@ -8,22 +8,27 @@ service OcclumExec { rpc StatusCheck(HealthCheckRequest) returns (HealthCheckResponse) {} // Client asks the server to execute the command - rpc ExecCommand(ExecComm) returns (ExecCommResponse) {} + rpc ExecCommand(ExecCommRequest) returns (ExecCommResponse) {} // Client gets the return value rpc GetResult(GetResultRequest) returns (GetResultResponse) {} - // Client sends heart beats to server - rpc HeartBeat(stream HealthCheckRequest) returns (stream HealthCheckResponse) {} + // Client stops the server + rpc StopServer(StopRequest) returns (StopResponse) {} - //Client stops the server - rpc StopServer(StopRequest) returns (StopResponse){} + // Client send signal to server + rpc KillProcess(KillProcessRequest) returns (KillProcessResponse) {} } -message GetResultRequest { - uint32 process_id = 1; +message KillProcessRequest { + int32 process_id = 1; + int32 signal = 2; } +message KillProcessResponse {} + +message GetResultRequest { int32 process_id = 1; } + message GetResultResponse { enum ExecutionStatus { UNKNOWN = 0; @@ -34,7 +39,7 @@ message GetResultResponse { int32 result = 2; } -message ExecComm { +message ExecCommRequest { uint32 process_id = 1; string sockpath = 2; string command = 3; @@ -43,25 +48,24 @@ message ExecComm { } message ExecCommResponse { - uint32 process_id = 1; + enum ExecutionStatus { + RUNNING = 0; + LAUNCH_FAILED = 1; + } + ExecutionStatus status = 1; + int32 process_id = 2; } -message HealthCheckRequest { - uint32 process_id = 1; -} +message HealthCheckRequest {} message HealthCheckResponse { enum ServingStatus { - UNKNOWN = 0; - SERVING = 1; - NOT_SERVING = 2; + SERVING = 0; + NOT_SERVING = 1; } ServingStatus status = 1; } -message StopRequest { - uint32 time = 1; -} +message StopRequest { uint32 time = 1; } -message StopResponse { -} \ No newline at end of file +message StopResponse {} \ No newline at end of file diff --git a/src/exec/src/bin/occlum_exec_client.rs b/src/exec/src/bin/occlum_exec_client.rs index 19da3b5b..f402e832 100644 --- a/src/exec/src/bin/occlum_exec_client.rs +++ b/src/exec/src/bin/occlum_exec_client.rs @@ -10,12 +10,12 @@ extern crate log; use clap::{App, Arg}; use futures::executor; -use futures::stream::StreamExt; use grpc::prelude::*; use grpc::ClientConf; use occlum_exec::occlum_exec::{ - ExecComm, GetResultRequest, GetResultResponse_ExecutionStatus, HealthCheckRequest, - HealthCheckResponse_ServingStatus, StopRequest, + ExecCommRequest, ExecCommResponse_ExecutionStatus, GetResultRequest, + GetResultResponse_ExecutionStatus, HealthCheckRequest, HealthCheckResponse_ServingStatus, + KillProcessRequest, StopRequest, }; use occlum_exec::occlum_exec_grpc::OcclumExecClient; use occlum_exec::{ @@ -23,6 +23,8 @@ use occlum_exec::{ }; use protobuf::RepeatedField; use sendfd::SendWithFd; +use signal_hook::iterator::Signals; +use signal_hook::{SIGINT, SIGKILL, SIGQUIT, SIGTERM, SIGUSR1}; use std::cmp; use std::env; use std::os::unix::net::UnixListener; @@ -32,9 +34,6 @@ use std::sync::{Arc, Mutex}; use std::{thread, time}; use tempdir::TempDir; -use signal_hook::iterator::Signals; -use signal_hook::SIGUSR1; - /// Execute the command on server /// /// # Examples @@ -50,7 +49,7 @@ fn exec_command( command: &str, parameters: &[&str], envs: &[&str], -) -> Result { +) -> Result { debug!("exec_command {:?} {:?} {:?}", command, parameters, envs); let mut parameter_list = RepeatedField::default(); @@ -89,7 +88,7 @@ fn exec_command( client .exec_command( grpc::RequestOptions::new(), - ExecComm { + ExecCommRequest { process_id: process::id(), command: command.to_string(), parameters: parameter_list, @@ -102,10 +101,15 @@ fn exec_command( ); // Drop response metadata match resp { - Ok(resp) => { - sendfd_thread.join().unwrap(); - Ok(resp.process_id) - } + Ok(resp) => match resp.status { + ExecCommResponse_ExecutionStatus::LAUNCH_FAILED => { + Err(String::from("failed to launch the process.")) + } + ExecCommResponse_ExecutionStatus::RUNNING => { + sendfd_thread.join().unwrap(); + Ok(resp.process_id) + } + }, Err(_) => Err(String::from("failed to send request.")), } } @@ -113,7 +117,6 @@ fn exec_command( /// Starts the server if the server is not running fn start_server(client: &OcclumExecClient, server_name: &str) -> Result { let mut server_launched = false; - let mut server_connection_retry_time = 0; loop { let resp = executor::block_on( @@ -121,23 +124,17 @@ fn start_server(client: &OcclumExecClient, server_name: &str) -> Result { - match resp.status { - HealthCheckResponse_ServingStatus::NOT_SERVING => { - return Err("no process".to_string()) - } - _ => {} - }; + if resp.status == HealthCheckResponse_ServingStatus::NOT_SERVING { + return Err("server is not running. It is not able to start.".to_string()); + } debug!("server is running."); return Ok(0); } @@ -157,11 +154,6 @@ fn start_server(client: &OcclumExecClient, server_name: &str) -> Result { - //the application stopped - break; - } - false => { - executor::block_on(req.wait()).unwrap(); - req.send_data(HealthCheckRequest { - process_id: process_id, - ..Default::default() - }) - .expect("send failed"); - } - }; - } - req.finish().expect("req finish failed"); - }); - - let mut responses = resp.drop_metadata(); - 'a: loop { - while let Some(message) = executor::block_on(responses.next()) { - let status = match message { - Ok(m) => m.status, - Err(_e) => { - //stop the client for any issue - //Todo: How to report the crash issue? - HealthCheckResponse_ServingStatus::NOT_SERVING - } - }; - - if status != HealthCheckResponse_ServingStatus::SERVING { - //the application has stopped - *process_stopped.lock().unwrap() = true; - break 'a; - } - - thread::sleep(time::Duration::from_millis(100)); - } - } -} - //Gets the application return value -fn get_return_value(client: &OcclumExecClient, process_id: &u32) -> Result { +fn get_return_value(client: &OcclumExecClient, process_id: &i32) -> Result { let resp = executor::block_on( client .get_result( @@ -267,6 +207,26 @@ fn get_return_value(client: &OcclumExecClient, process_id: &u32) -> Result Result<(), i32> { env_logger::init(); @@ -303,7 +263,10 @@ fn main() -> Result<(), i32> { .get_matches(); let args: Vec = env::args().collect(); - let env: Vec = env::vars().into_iter().map(|(key, val)| format!("{}={}", key, val)).collect(); + let env: Vec = env::vars() + .into_iter() + .map(|(key, val)| format!("{}={}", key, val)) + .collect(); let mut sock_file = String::from(args[0].as_str()); let sock_file = str::replace( @@ -325,12 +288,14 @@ fn main() -> Result<(), i32> { ); if let Err(s) = start_server(&client, &server_name) { - debug!("start_server failed {}", s); + println!("start_server failed {}", s); return Err(-1); } + println!("server is running."); } else if let Some(ref matches) = matches.subcommand_matches("stop") { let stop_time = matches.value_of("time").unwrap().parse::().unwrap(); stop_server(&client, stop_time); + println!("server stopped."); } else if let Some(ref matches) = matches.subcommand_matches("exec") { let cmd_args: Vec<&str> = match matches .values_of("args") @@ -344,24 +309,48 @@ fn main() -> Result<(), i32> { let (cmd, args) = cmd_args.split_first().unwrap(); let env: Vec<&str> = env.iter().map(|string| string.as_str()).collect(); - match exec_command(&client, cmd, args, &env) { - Ok(process_id) => { - let signals = Signals::new(&[SIGUSR1]).unwrap(); - let signal_thread = thread::spawn(move || { - for sig in signals.forever() { - debug!("Received signal {:?}", sig); + // Create the signal handler + let process_killed = Arc::new(Mutex::new(false)); + let process_killed_clone = Arc::clone(&process_killed); + let signals = Signals::new(&[SIGUSR1, SIGINT, SIGQUIT, SIGTERM]).unwrap(); + let signal_thread = thread::spawn(move || { + for signal in signals.forever() { + debug!("Received signal {:?}", signal); + match signal { + SIGUSR1 => { break; } - }); - - //Notifies the server, if client killed by KILL - start_heart_beat(&client, process_id.clone()); + SIGINT | SIGQUIT | SIGTERM => { + let mut process_killed = process_killed_clone.lock().unwrap(); + *process_killed = true; + break; + } + _ => unreachable!(), + } + } + }); + match exec_command(&client, cmd, args, &env) { + Ok(process_id) => { + // the signal thread exit if server finished execution or user kill the client signal_thread.join().unwrap(); - let result = get_return_value(&client, &process_id).unwrap(); - if result != 0 { - return Err(result); + // check the signal type: + // if client killed by user, send SIGTERM and SIGKILL to server + if *process_killed.lock().unwrap() { + // stop the process in server + kill_process(&client, &process_id, &SIGTERM); + kill_process(&client, &process_id, &SIGKILL); + return Err(-1); + } else { + if let Ok(result) = get_return_value(&client, &process_id) { + if result != 0 { + return Err(result); + } + } else { + debug!("get the return value failed"); + return Err(-1); + } } } Err(s) => { diff --git a/src/exec/src/bin/occlum_exec_server.rs b/src/exec/src/bin/occlum_exec_server.rs index 1a2a51d3..47dc7425 100644 --- a/src/exec/src/bin/occlum_exec_server.rs +++ b/src/exec/src/bin/occlum_exec_server.rs @@ -36,7 +36,6 @@ fn check_server_status(sock_file: &str) -> bool { .status_check( grpc::RequestOptions::new(), HealthCheckRequest { - process_id: 0, ..Default::default() }, ) diff --git a/src/exec/src/server.rs b/src/exec/src/server.rs index 03bccff9..3036105d 100644 --- a/src/exec/src/server.rs +++ b/src/exec/src/server.rs @@ -2,19 +2,15 @@ extern crate chrono; extern crate nix; extern crate timer; use crate::occlum_exec::{ - ExecComm, ExecCommResponse, GetResultRequest, GetResultResponse, - GetResultResponse_ExecutionStatus, HealthCheckRequest, HealthCheckResponse, - HealthCheckResponse_ServingStatus, StopRequest, StopResponse, + ExecCommRequest, ExecCommResponse, ExecCommResponse_ExecutionStatus, GetResultRequest, + GetResultResponse, GetResultResponse_ExecutionStatus, HealthCheckRequest, HealthCheckResponse, + HealthCheckResponse_ServingStatus, KillProcessRequest, KillProcessResponse, StopRequest, + StopResponse, }; use crate::occlum_exec_grpc::OcclumExec; - -use futures::stream::StreamExt; -use grpc::Metadata; -use grpc::ServerHandlerContext; -use grpc::ServerRequest; -use grpc::ServerRequestSingle; -use grpc::ServerResponseSink; -use grpc::ServerResponseUnarySink; +use grpc::{ServerHandlerContext, ServerRequestSingle, ServerResponseUnarySink}; +use nix::sys::signal::{self, Signal}; +use nix::unistd::Pid; use sendfd::RecvWithFd; use std::cmp; use std::collections::HashMap; @@ -22,20 +18,15 @@ use std::ffi::CString; use std::os::unix::io::RawFd; use std::os::unix::net::UnixStream; use std::sync::{Arc, Condvar, Mutex}; -use std::task::Poll; use std::thread; use timer::{Guard, Timer}; -use nix::sys::signal::{self, Signal}; -use nix::unistd::Pid; - #[derive(Default)] pub struct OcclumExecImpl { //process_id, return value, execution status - commands: Arc, bool)>>>, + commands: Arc, bool)>>>, execution_lock: Arc<(Mutex, Condvar)>, stop_timer: Arc>>, - process_id: Arc>, } impl OcclumExecImpl { @@ -47,7 +38,6 @@ impl OcclumExecImpl { commands: Default::default(), execution_lock: lock, stop_timer: Arc::new(Mutex::new(Some(timer))), - process_id: Arc::new(Mutex::new(1)), } } } @@ -60,7 +50,7 @@ fn reset_stop_timer( //New a timer to stop the server let timer = timer::Timer::new(); let guard = timer.schedule_with_delay(chrono::Duration::seconds(time as i64), move || { - if rust_occlum_pal_kill(-1, SIGKILL).is_err(){ + if rust_occlum_pal_kill(-1, SIGKILL).is_err() { warn!("SIGKILL failed.") } let (execution_lock, cvar) = &*lock; @@ -79,6 +69,22 @@ fn clear_stop_timer(old_timer: &Arc>>) { } impl OcclumExec for OcclumExecImpl { + fn kill_process( + &self, + _o: ::grpc::ServerHandlerContext, + mut req: ::grpc::ServerRequestSingle, + resp: ::grpc::ServerResponseUnarySink, + ) -> ::grpc::Result<()> { + let req = req.take_message(); + if rust_occlum_pal_kill(req.process_id, req.signal).is_err() { + warn!("failed to send signal to process."); + } + + resp.finish(KillProcessResponse { + ..Default::default() + }) + } + fn get_result( &self, _o: ServerHandlerContext, @@ -123,7 +129,7 @@ impl OcclumExec for OcclumExecImpl { mut req: ServerRequestSingle, resp: ServerResponseUnarySink, ) -> grpc::Result<()> { - if rust_occlum_pal_kill(-1, SIGTERM).is_err(){ + if rust_occlum_pal_kill(-1, SIGTERM).is_err() { warn!("SIGTERM failed."); } let time = cmp::min(req.take_message().time, crate::DEFAULT_SERVER_TIMER); @@ -134,7 +140,7 @@ impl OcclumExec for OcclumExecImpl { fn status_check( &self, _o: ServerHandlerContext, - mut req: ServerRequestSingle, + _req: ServerRequestSingle, resp: ServerResponseUnarySink, ) -> grpc::Result<()> { //Reset the timer @@ -155,33 +161,16 @@ impl OcclumExec for OcclumExecImpl { break; } - //Get the process id from the request - let process_id = req.take_message().process_id; - - match process_id { - 0 => resp.finish(HealthCheckResponse::default()), - process_id => { - let commands = self.commands.clone(); - let mut commands = commands.lock().unwrap(); - - match commands.get_mut(&process_id) { - Some(_) => resp.finish(HealthCheckResponse { - status: HealthCheckResponse_ServingStatus::SERVING, - ..Default::default() - }), - _ => resp.finish(HealthCheckResponse { - status: HealthCheckResponse_ServingStatus::NOT_SERVING, - ..Default::default() - }), - } - } - } + resp.finish(HealthCheckResponse { + status: HealthCheckResponse_ServingStatus::SERVING, + ..Default::default() + }) } fn exec_command( &self, _o: ServerHandlerContext, - mut req: ServerRequestSingle, + mut req: ServerRequestSingle, resp: ServerResponseUnarySink, ) -> grpc::Result<()> { clear_stop_timer(&self.stop_timer.clone()); @@ -215,96 +204,53 @@ impl OcclumExec for OcclumExecImpl { } }; - let gpid = self.process_id.clone(); - let mut gpid = gpid.lock().unwrap(); - let process_id: u32 = *gpid; - *gpid += 1; - drop(gpid); - let _commands = self.commands.clone(); let _execution_lock = self.execution_lock.clone(); let _stop_timer = self.stop_timer.clone(); - let mut commands = _commands.lock().unwrap(); - commands.entry(process_id).or_insert((None, true)); - drop(commands); - let cmd = req.command.clone(); let args = req.parameters.into_vec().clone(); let envs = req.enviroments.into_vec().clone(); let client_process_id = req.process_id; - //Run the command in a thread - thread::spawn(move || { - let mut exit_status = Box::new(0); - rust_occlum_pal_exec(&cmd, &args, &envs, &stdio_fds, &mut exit_status) - .expect("failed to execute the command"); - - reset_stop_timer(_execution_lock, _stop_timer, crate::DEFAULT_SERVER_TIMER); + if let Ok(process_id) = rust_occlum_pal_create_process(&cmd, &args, &envs, &stdio_fds) { let mut commands = _commands.lock().unwrap(); - *commands.get_mut(&process_id).expect("get process") = (Some(*exit_status), false); + commands.entry(process_id).or_insert((None, true)); + drop(commands); - //Notifies the client to application stopped - debug!( - "process:{} finished, send signal to {}", - process_id, client_process_id - ); + //Run the command in a thread + thread::spawn(move || { + let mut exit_status = Box::new(0); - //TODO: fix me if the client has been killed - signal::kill(Pid::from_raw(client_process_id as i32), Signal::SIGUSR1).unwrap(); - }); + rust_occlum_pal_exec(process_id, &mut exit_status) + .expect("failed to execute the command"); - resp.finish(ExecCommResponse { - process_id: process_id, - ..Default::default() - }) - } + reset_stop_timer(_execution_lock, _stop_timer, crate::DEFAULT_SERVER_TIMER); + let mut commands = _commands.lock().unwrap(); + *commands.get_mut(&process_id).expect("get process") = (Some(*exit_status), false); - fn heart_beat( - &self, - o: ServerHandlerContext, - req: ServerRequest, - mut resp: ServerResponseSink, - ) -> grpc::Result<()> { - let mut req = req.into_stream(); - let commands = self.commands.clone(); + //Notifies the client to application stopped + debug!( + "process:{} finished, send signal to {}", + process_id, client_process_id + ); - o.spawn_poll_fn(move |cx| { - loop { - // Wait until resp is writable - if let Poll::Pending = resp.poll(cx)? { - return Poll::Pending; - } + //TODO: fix me if the client has been killed + signal::kill(Pid::from_raw(client_process_id as i32), Signal::SIGUSR1).unwrap(); + }); - match req.poll_next_unpin(cx)? { - Poll::Pending => { - return Poll::Pending; - } - Poll::Ready(Some(note)) => { - let process_id = note.process_id; - let commands = commands.lock().unwrap(); - let process_status = match &commands.get(&process_id) { - None => HealthCheckResponse_ServingStatus::UNKNOWN, - Some(&(exit_status, _)) => match exit_status { - None => HealthCheckResponse_ServingStatus::SERVING, - Some(_) => HealthCheckResponse_ServingStatus::NOT_SERVING, - }, - }; - - resp.send_data(HealthCheckResponse { - status: process_status, - ..Default::default() - }) - .unwrap(); - } - Poll::Ready(None) => { - resp.send_trailers(Metadata::new()).expect("send"); - return Poll::Ready(Ok(())); - } - } - } - }); - Ok(()) + resp.finish(ExecCommResponse { + status: ExecCommResponse_ExecutionStatus::RUNNING, + process_id: process_id, + ..Default::default() + }) + } else { + resp.finish(ExecCommResponse { + status: ExecCommResponse_ExecutionStatus::LAUNCH_FAILED, + process_id: 0, + ..Default::default() + }) + } } } @@ -371,7 +317,9 @@ extern "C" { fn occlum_pal_kill(pid: i32, sig: i32) -> i32; } -fn vec_strings_to_cchars(strings: &Vec) -> Result<(Vec<*const libc::c_char>,Vec), i32> { +fn vec_strings_to_cchars( + strings: &Vec, +) -> Result<(Vec<*const libc::c_char>, Vec), i32> { let mut strings_content = Vec::::new(); let mut cchar_strings = Vec::<*const libc::c_char>::new(); for string in strings { @@ -385,13 +333,12 @@ fn vec_strings_to_cchars(strings: &Vec) -> Result<(Vec<*const libc::c_ch } /// Executes the command inside Occlum enclave -fn rust_occlum_pal_exec( +fn rust_occlum_pal_create_process( cmd: &str, args: &Vec, envs: &Vec, stdio: &occlum_stdio_fds, - exit_status: &mut i32, -) -> Result<(), i32> { +) -> Result { let cmd_path = CString::new(cmd).expect("cmd_path: new failed"); let (cmd_args_array, _cmd_args) = vec_strings_to_cchars(args)?; let (cmd_envs_array, _cmd_envs) = vec_strings_to_cchars(envs)?; @@ -406,17 +353,20 @@ fn rust_occlum_pal_exec( pid: &mut libos_tid as *mut i32, }); - let ret = unsafe{occlum_pal_create_process(Box::into_raw(create_process_args))}; - if ret != 0 { - return Err(ret); + let ret = unsafe { occlum_pal_create_process(Box::into_raw(create_process_args)) }; + match ret { + 0 => Ok(libos_tid), + _ => Err(ret), } +} +fn rust_occlum_pal_exec(occlum_process_id: i32, exit_status: &mut i32) -> Result<(), i32> { let exec_args = Box::new(occlum_pal_exec_args { - pid: libos_tid, + pid: occlum_process_id, exit_value: exit_status as *mut i32, }); - let ret = unsafe {occlum_pal_exec(Box::into_raw(exec_args))}; + let ret = unsafe { occlum_pal_exec(Box::into_raw(exec_args)) }; match ret { 0 => Ok(()), @@ -424,16 +374,16 @@ fn rust_occlum_pal_exec( } } -/// Send a signal to one or multiple LibOS processes -// only support SIGKILL and SIGTERM +/// Send a signal to one or multiple LibOS processes +// only support SIGKILL and SIGTERM const SIGKILL: i32 = 9; const SIGTERM: i32 = 15; -fn rust_occlum_pal_kill(pid: i32, sig: i32) -> Result { +fn rust_occlum_pal_kill(pid: i32, sig: i32) -> Result<(), i32> { let ret = unsafe { occlum_pal_kill(pid, sig) }; if ret == 0 { - return Ok(0); + return Ok(()); } else { return Err(ret); }