Refactor exec server status to handle init failure

This commit is contained in:
Hui, Chunyang 2022-06-22 10:46:03 +00:00 committed by volcano
parent 5d75584e32
commit 04e00ddbc5
3 changed files with 83 additions and 25 deletions

@ -116,6 +116,7 @@ fn exec_command(
/// Starts the server if the server is not running /// Starts the server if the server is not running
fn start_server(client: &OcclumExecClient, server_name: &str) -> Result<u32, String> { fn start_server(client: &OcclumExecClient, server_name: &str) -> Result<u32, String> {
let mut server_launched = false; let mut server_launched = false;
let mut child = None;
loop { loop {
let resp = executor::block_on( let resp = executor::block_on(
@ -149,15 +150,18 @@ fn start_server(client: &OcclumExecClient, server_name: &str) -> Result<u32, Str
Err(_r) => { Err(_r) => {
return Err("Failed to launch server".to_string()); return Err("Failed to launch server".to_string());
} }
Ok(_r) => { Ok(ret_child) => {
server_launched = true; server_launched = true;
child = Some(ret_child);
//wait server 10 millis //wait server 10 millis
thread::sleep(time::Duration::from_millis(100)); thread::sleep(time::Duration::from_millis(100));
continue; continue;
} }
}; };
} else { } else {
if let Some(mut child) = child {
let _ = child.wait();
}
return Err("Failed to launch server".to_string()); return Err("Failed to launch server".to_string());
} }
} }

@ -11,7 +11,7 @@ use grpc::prelude::*;
use grpc::ClientConf; use grpc::ClientConf;
use occlum_exec::occlum_exec::HealthCheckRequest; use occlum_exec::occlum_exec::HealthCheckRequest;
use occlum_exec::occlum_exec_grpc::{OcclumExecClient, OcclumExecServer}; use occlum_exec::occlum_exec_grpc::{OcclumExecClient, OcclumExecServer};
use occlum_exec::server::OcclumExecImpl; use occlum_exec::server::{OcclumExecImpl, ServerStatus};
use occlum_exec::DEFAULT_SOCK_FILE; use occlum_exec::DEFAULT_SOCK_FILE;
use std::env; use std::env;
use std::ffi::{CStr, OsString}; use std::ffi::{CStr, OsString};
@ -20,7 +20,7 @@ use std::path::Path;
use std::sync::{Arc, Condvar, Mutex}; use std::sync::{Arc, Condvar, Mutex};
//Checks the server status, if the server is running return true, else recover the socket file and return false. //Checks the server status, if the server is running return true, else recover the socket file and return false.
fn check_server_status(sock_file: &str) -> bool { fn is_server_running(sock_file: &str) -> bool {
if let Err(e) = std::fs::File::open(sock_file) { if let Err(e) = std::fs::File::open(sock_file) {
debug!("failed to open the sock_file {:?}", e); debug!("failed to open the sock_file {:?}", e);
@ -53,7 +53,7 @@ fn check_server_status(sock_file: &str) -> bool {
} }
} }
fn main() { fn main() -> Result<(), i32> {
let matches = App::new("Occlum_server") let matches = App::new("Occlum_server")
.version("0.1.0") .version("0.1.0")
.arg( .arg(
@ -71,15 +71,15 @@ fn main() {
assert!(env::set_current_dir(&instance_dir).is_ok()); assert!(env::set_current_dir(&instance_dir).is_ok());
//If the server already startted, then return //If the server already startted, then return
if check_server_status(DEFAULT_SOCK_FILE) { if is_server_running(DEFAULT_SOCK_FILE) {
println!("server stared"); println!("server stared");
return; return Ok(());
} }
let server_stopped = Arc::new((Mutex::new(true), Condvar::new())); let server_status = Arc::new((Mutex::new(ServerStatus::default()), Condvar::new()));
let service_def = OcclumExecServer::new_service_def( let service_def = OcclumExecServer::new_service_def(
OcclumExecImpl::new_and_save_execution_lock(server_stopped.clone()), OcclumExecImpl::new_and_save_execution_lock(server_status.clone()),
); );
let mut server_builder = grpc::ServerBuilder::new_plain(); let mut server_builder = grpc::ServerBuilder::new_plain();
server_builder.add_service(service_def); server_builder.add_service(service_def);
@ -87,23 +87,33 @@ fn main() {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
debug!("{:?}", e); debug!("{:?}", e);
return; return Err(-1);
} }
}; };
if let Ok(server) = server_builder.build() { if let Ok(server) = server_builder.build() {
rust_occlum_pal_init().expect("Occlum image initialization failed"); if let Err(_) = rust_occlum_pal_init() {
let (status, _) = &*server_status;
status.lock().unwrap().set_error();
return Err(-1);
}
//server is running //server is running
println!("server stared on addr {}", server.local_addr()); println!("server stared on addr {}", server.local_addr());
let (lock, cvar) = &*server_stopped; let (lock, cvar) = &*server_status;
let mut server_stopped = lock.lock().unwrap(); let mut status = lock.lock().unwrap();
*server_stopped = false; // *server_stopped = false;
while !*server_stopped { status.set_running();
server_stopped = cvar.wait(server_stopped).unwrap(); while status.is_running() {
status = cvar.wait(status).unwrap();
} }
rust_occlum_pal_destroy().expect("Destory occlum image failed"); rust_occlum_pal_destroy()?;
println!("server stopped"); println!("server stopped");
} else {
println!("server build failed");
return Err(-1);
} }
Ok(())
} }
extern "C" { extern "C" {

@ -22,16 +22,56 @@ use std::sync::{Arc, Condvar, Mutex};
use std::thread; use std::thread;
use timer::{Guard, Timer}; use timer::{Guard, Timer};
/// Lifecycle state of the exec server.
///
/// The value is kept behind a `Mutex` paired with a `Condvar` and shared
/// between the server entry point and the gRPC service implementation.
/// A freshly created status is [`ServerStatus::Stopped`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerStatus {
    /// The server is not serving requests (also the initial state).
    Stopped,
    /// The server is up and serving requests.
    Running,
    /// The server hit an error and cannot serve requests.
    Error,
}

// `Default` is implemented by hand (rather than with `#[default]`) so the
// type keeps compiling on older toolchains.
impl Default for ServerStatus {
    /// Returns [`ServerStatus::Stopped`].
    fn default() -> Self {
        Self::Stopped
    }
}

impl ServerStatus {
    /// Marks the server as failed.
    pub fn set_error(&mut self) {
        *self = Self::Error
    }

    /// Marks the server as up and running.
    pub fn set_running(&mut self) {
        *self = Self::Running
    }

    /// Marks the server as stopped.
    fn set_stopped(&mut self) {
        *self = Self::Stopped
    }

    /// Returns `true` only while the status is [`ServerStatus::Running`].
    pub fn is_running(&self) -> bool {
        matches!(self, Self::Running)
    }

    /// Returns `true` only when the status is [`ServerStatus::Error`].
    fn is_error(&self) -> bool {
        matches!(self, Self::Error)
    }

    /// Returns `true` only when the status is [`ServerStatus::Stopped`].
    fn is_stopped(&self) -> bool {
        matches!(self, Self::Stopped)
    }
}
#[derive(Default)] #[derive(Default)]
pub struct OcclumExecImpl { pub struct OcclumExecImpl {
//process_id, return value, execution status //process_id, return value, execution status
commands: Arc<Mutex<HashMap<i32, (Option<i32>, bool)>>>, commands: Arc<Mutex<HashMap<i32, (Option<i32>, bool)>>>,
execution_lock: Arc<(Mutex<bool>, Condvar)>, execution_lock: Arc<(Mutex<ServerStatus>, Condvar)>,
stop_timer: Arc<Mutex<Option<(Timer, Guard)>>>, stop_timer: Arc<Mutex<Option<(Timer, Guard)>>>,
} }
impl OcclumExecImpl { impl OcclumExecImpl {
pub fn new_and_save_execution_lock(lock: Arc<(Mutex<bool>, Condvar)>) -> OcclumExecImpl { pub fn new_and_save_execution_lock(
lock: Arc<(Mutex<ServerStatus>, Condvar)>,
) -> OcclumExecImpl {
OcclumExecImpl { OcclumExecImpl {
commands: Default::default(), commands: Default::default(),
execution_lock: lock, execution_lock: lock,
@ -111,8 +151,7 @@ impl OcclumExec for OcclumExecImpl {
warn!("SIGKILL failed.") warn!("SIGKILL failed.")
} }
let (execution_lock, cvar) = &*lock; let (execution_lock, cvar) = &*lock;
let mut server_stopped = execution_lock.lock().unwrap(); execution_lock.lock().unwrap().set_stopped();
*server_stopped = true;
cvar.notify_one(); cvar.notify_one();
}); });
@ -132,13 +171,18 @@ impl OcclumExec for OcclumExecImpl {
*self.stop_timer.lock().unwrap() = None; *self.stop_timer.lock().unwrap() = None;
//Waits for the Occlum loaded //Waits for the Occlum loaded
let (lock, _) = &*self.execution_lock.clone(); let (status, _) = &*self.execution_lock.clone();
loop { loop {
let server_stopped = lock.lock().unwrap(); let server_status = status.lock().unwrap();
if *server_stopped { if server_status.is_stopped() {
drop(server_stopped); drop(server_status);
continue; continue;
} }
if server_status.is_error() {
return Err(grpc::Error::Other("server error"));
}
break; break;
} }