Add Waiter and WaitQueue

This commit is contained in:
Tate, Hongliang Tian 2019-01-04 16:24:28 +08:00
parent 372649f3d6
commit d960792ef3
5 changed files with 307 additions and 42 deletions

@ -0,0 +1,129 @@
use super::{*};
// TODO: make sure Processes are released eventually
#[derive(Clone, Copy, Debug)]
pub enum ChildProcessFilter {
WithAnyPID,
WithPID(pid_t),
WithPGID(pid_t),
}
unsafe impl Send for ChildProcessFilter {}
pub fn do_exit(exit_status: i32) {
let current_ref = get_current();
let mut current = current_ref.lock().unwrap();
// Update current
current.exit_status = exit_status;
current.status = Status::ZOMBIE;
// Update children
for child_ref in &current.children {
let mut child = child_ref.lock().unwrap();
child.parent = Some(IDLE_PROCESS.clone());
}
current.children.clear();
// Notify parent if necessary
let parent_ref = current.get_parent().clone();
let (mut parent, current) = {
// Always lock parent before its child
drop(current);
lock_two_in_order(&parent_ref, &current_ref)
};
// Wake up the parent if it is waiting on this child
if parent.waiting_children.is_none() { return; }
let mut wait_queue = parent.waiting_children.as_mut().unwrap();
wait_queue.del_and_wake_one_waiter(|waiter_data| -> Option<pid_t> {
match waiter_data {
ChildProcessFilter::WithAnyPID => {
},
ChildProcessFilter::WithPID(required_pid) => {
if current.get_pid() != *required_pid {
return None;
}
},
ChildProcessFilter::WithPGID(required_pgid) => {
if current.get_pgid() != *required_pgid {
return None;
}
},
}
Some(current.get_pid())
});
}
pub fn do_wait4(child_filter: &ChildProcessFilter, exit_status: &mut i32)
-> Result<pid_t, Error>
{
let waiter = {
let current_ref = get_current();
let mut current = current_ref.lock().unwrap();
let mut any_child_to_wait_for = false;
for child_ref in current.get_children() {
let child = child_ref.lock().unwrap();
let may_wait_for = match child_filter {
ChildProcessFilter::WithAnyPID => {
true
},
ChildProcessFilter::WithPID(required_pid) => {
child.get_pid() == *required_pid
},
ChildProcessFilter::WithPGID(required_pgid) => {
child.get_pgid() == *required_pgid
}
};
if !may_wait_for { continue; }
// Return immediately as a child that we wait for has alreay exited
if child.status == Status::ZOMBIE {
return Ok(child.pid);
}
any_child_to_wait_for = true;
}
if !any_child_to_wait_for { return errno!(ECHILD, "No such child"); }
let waiter = Waiter::new(child_filter);
let mut wait_queue = WaitQueue::new();
wait_queue.add_waiter(&waiter);
current.waiting_children = Some(wait_queue);
waiter
};
let child_pid = waiter.wait_on();
if child_pid == 0 { panic!("THIS SHOULD NEVER HAPPEN!"); }
{
let current_ref = get_current();
let mut current = current_ref.lock().unwrap();
current.waiting_children = None;
}
let child_ref = process_table::get(child_pid).unwrap();
let child = {
let child = child_ref.lock().unwrap();
if child.get_status() != Status::ZOMBIE {
panic!("THIS SHOULD NEVER HAPPEN!");
}
child
};
*exit_status = child.get_exit_status();
process_table::remove(child_pid);
Ok(child_pid)
}
fn lock_two_in_order<'a>(first_ref: &'a ProcessRef, second_ref: &'a ProcessRef) ->
(SgxMutexGuard<'a, Process>, SgxMutexGuard<'a, Process>)
{
(first_ref.lock().unwrap(), second_ref.lock().unwrap())
}

@ -4,7 +4,8 @@ pub mod table {
pub use super::process_table::{get}; pub use super::process_table::{get};
} }
pub use self::spawn::{do_spawn}; pub use self::spawn::{do_spawn};
pub use self::exit::{do_exit, do_wait4, ChildProcessFilter};
pub use self::wait::{Waiter, WaitQueue};
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
pub type pid_t = u32; pub type pid_t = u32;
@ -14,11 +15,13 @@ pub struct Process {
task: Task, task: Task,
status: Status, status: Status,
pid: pid_t, pid: pid_t,
pgid: pid_t,
tgid: pid_t, tgid: pid_t,
exit_status: i32, exit_status: i32,
exec_path: String, exec_path: String,
parent: Option<ProcessRef>, parent: Option<ProcessRef>,
children: Vec<ProcessRef>, children: Vec<ProcessRef>,
waiting_children: Option<WaitQueue<ChildProcessFilter, pid_t>>,
vm: ProcessVM, vm: ProcessVM,
file_table: FileTable, file_table: FileTable,
} }
@ -32,52 +35,28 @@ pub fn do_getpid() -> pid_t {
current.get_pid() current.get_pid()
} }
pub fn do_getppid() -> pid_t { pub fn do_getgpid() -> pid_t {
let current_ref = get_current(); let current_ref = get_current();
let current = current_ref.lock().unwrap(); let current = current_ref.lock().unwrap();
let parent_ref = current.get_parent(); current.get_pgid()
}
pub fn do_getppid() -> pid_t {
let parent_ref = {
let current_ref = get_current();
let current = current_ref.lock().unwrap();
current.get_parent().clone()
};
let parent = parent_ref.lock().unwrap(); let parent = parent_ref.lock().unwrap();
parent.get_pid() parent.get_pid()
} }
pub fn do_exit(exit_status: i32) {
let current_ref = get_current();
let mut current = current_ref.lock().unwrap();
current.exit_status = exit_status;
current.status = Status::ZOMBIE;
for child_ref in &current.children {
let mut child = child_ref.lock().unwrap();
child.parent = Some(IDLE_PROCESS.clone());
}
current.children.clear();
}
pub fn do_wait4(child_pid: u32) -> Result<i32, Error> {
let child_process = process_table::get(child_pid)
.ok_or_else(|| (Errno::ECHILD, "Cannot find child process with the given PID"))?;
let mut exit_status = 0;
loop {
let guard = child_process.lock().unwrap();
if guard.get_status() == Status::ZOMBIE {
exit_status = guard.get_exit_status();
break;
}
drop(guard);
}
let child_pid = child_process.lock().unwrap().get_pid();
process_table::remove(child_pid);
Ok(exit_status)
}
mod task; mod task;
mod process; mod process;
mod process_table; mod process_table;
mod spawn; mod spawn;
mod wait;
mod exit;
use prelude::*; use prelude::*;
use vm::{ProcessVM, VMRangeTrait}; use vm::{ProcessVM, VMRangeTrait};

@ -10,11 +10,13 @@ lazy_static! {
task: Default::default(), task: Default::default(),
status: Default::default(), status: Default::default(),
pid: 0, pid: 0,
pgid: 0,
tgid: 0, tgid: 0,
exit_status: 0, exit_status: 0,
exec_path: "".to_owned(), exec_path: "".to_owned(),
parent: None, parent: None,
children: Vec::new(), children: Vec::new(),
waiting_children: Default::default(),
vm: Default::default(), vm: Default::default(),
file_table: Default::default(), file_table: Default::default(),
})) }))
@ -30,11 +32,13 @@ impl Process {
task: task, task: task,
status: Default::default(), status: Default::default(),
pid: new_pid, pid: new_pid,
pgid: new_pid,
tgid: new_pid, tgid: new_pid,
exec_path: exec_path.to_owned(), exec_path: exec_path.to_owned(),
exit_status: 0, exit_status: 0,
parent: None, parent: None,
children: Vec::new(), children: Vec::new(),
waiting_children: None,
vm: vm, vm: vm,
file_table: file_table, file_table: file_table,
})); }));
@ -44,6 +48,7 @@ impl Process {
pub fn get_task(&self) -> &Task { &self.task } pub fn get_task(&self) -> &Task { &self.task }
pub fn get_task_mut(&mut self) -> &mut Task { &mut self.task } pub fn get_task_mut(&mut self) -> &mut Task { &mut self.task }
pub fn get_pid(&self) -> pid_t { self.pid } pub fn get_pid(&self) -> pid_t { self.pid }
pub fn get_pgid(&self) -> pid_t { self.pgid }
pub fn get_tgid(&self) -> pid_t { self.tgid } pub fn get_tgid(&self) -> pid_t { self.tgid }
pub fn get_status(&self) -> Status { self.status } pub fn get_status(&self) -> Status { self.status }
pub fn get_exit_status(&self) -> i32 { self.exit_status } pub fn get_exit_status(&self) -> i32 { self.exit_status }

@ -0,0 +1,119 @@
use super::{*};
#[derive(Debug)]
pub struct Waiter<D, R>
where D: Sized + Copy, R: Sized + Copy
{
inner: Arc<SgxMutex<WaiterInner<D, R>>>,
thread: *const c_void,
}
unsafe impl<D, R> Send for Waiter<D, R> where D: Sized + Copy, R: Sized + Copy {}
#[derive(Debug)]
struct WaiterInner<D, R>
where D: Sized + Copy, R: Sized + Copy
{
has_waken: bool,
data: D,
result: Option<R>,
}
impl<D, R> Waiter<D, R>
where D: Sized + Copy, R: Sized + Copy
{
pub fn new(data: &D) -> Waiter<D, R> {
Waiter {
thread: unsafe { sgx_thread_get_self() },
inner: Arc::new(SgxMutex::new(WaiterInner {
has_waken: false,
data: *data,
result: None,
})),
}
}
pub fn get_data(&self) -> D {
self.inner.lock().unwrap().data
}
pub fn wait_on(&self) -> R {
if !self.inner.lock().unwrap().has_waken {
unsafe {
sgx_thread_wait_untrusted_event_ocall(self.thread);
}
}
self.inner.lock().unwrap().result.unwrap()
}
}
#[derive(Debug)]
pub struct WaitQueue<D, R>
where D: Sized + Copy, R: Sized + Copy
{
waiters: Vec<Waiter<D, R>>,
}
impl<D, R> WaitQueue<D, R>
where D: Sized + Copy, R: Sized + Copy
{
pub fn new() -> WaitQueue<D, R>
{
WaitQueue {
waiters: Vec::new(),
}
}
pub fn add_waiter(&mut self, waiter: &Waiter<D, R>) -> () {
self.waiters.push(Waiter {
thread: waiter.thread,
inner: waiter.inner.clone(),
});
}
pub fn del_and_wake_one_waiter<F>(&mut self, cond: F) -> usize
where F: Fn(&D) -> Option<R>
{
let mut waiters = &mut self.waiters;
let del_waiter_i = {
let waiter_i = waiters.iter().position(|waiter| {
let mut waiter_inner = waiter.inner.lock().unwrap();
if let Some(waiter_result) = cond(&waiter_inner.data) {
waiter_inner.has_waken = true;
waiter_inner.result = Some(waiter_result);
true
}
else {
false
}
});
if waiter_i.is_none() { return 0; }
waiter_i.unwrap()
};
let del_waiter = waiters.swap_remove(del_waiter_i);
unsafe {
sgx_thread_set_untrusted_event_ocall(del_waiter.thread);
}
1
}
}
extern {
fn sgx_thread_get_self() -> *const c_void;
/* Go outside and wait on my untrusted event */
fn sgx_thread_wait_untrusted_event_ocall(self_thread: *const c_void) -> c_int;
/* Wake a thread waiting on its untrusted event */
fn sgx_thread_set_untrusted_event_ocall(waiter_thread: *const c_void) -> c_int;
/* Wake a thread waiting on its untrusted event, and wait on my untrusted event */
fn sgx_thread_setwait_untrusted_events_ocall(
waiter_thread: *const c_void, self_thread: *const c_void) -> c_int;
/* Wake multiple threads waiting on their untrusted events */
fn sgx_thread_set_multiple_untrusted_events_ocall(
waiter_threads: *const *const c_void, total: size_t ) -> c_int;
}

@ -4,6 +4,7 @@ use {std, fs, process, vm};
use std::ffi::{CStr, CString}; use std::ffi::{CStr, CString};
use fs::{off_t, FileDesc}; use fs::{off_t, FileDesc};
use vm::{VMAreaFlags, VMResizeOptions}; use vm::{VMAreaFlags, VMResizeOptions};
use process::{pid_t, ChildProcessFilter};
// Use the internal syscall wrappers from sgx_tstd // Use the internal syscall wrappers from sgx_tstd
//use std::libc_fs as fs; //use std::libc_fs as fs;
//use std::libc_io as io; //use std::libc_io as io;
@ -376,14 +377,46 @@ pub extern "C" fn occlum_spawn(
} }
} }
fn do_wait4(pid: c_int, _exit_status: *mut c_int) -> Result<pid_t, Error> {
check_mut_ptr_from_user(_exit_status)?;
let child_process_filter = match pid {
pid if pid < -1 => {
process::ChildProcessFilter::WithPGID((-pid) as pid_t)
},
-1 => {
process::ChildProcessFilter::WithAnyPID
},
0 => {
let gpid = process::do_getgpid();
process::ChildProcessFilter::WithPGID(gpid)
},
pid if pid > 0 => {
process::ChildProcessFilter::WithPID(pid as pid_t)
},
_ => {
panic!("THIS SHOULD NEVER HAPPEN!");
}
};
let mut exit_status = 0;
match process::do_wait4(&child_process_filter, &mut exit_status) {
Ok(pid) => {
unsafe { *_exit_status = exit_status; }
Ok(pid)
}
Err(e) => {
Err(e)
}
}
}
#[no_mangle] #[no_mangle]
pub extern "C" fn occlum_wait4(child_pid: c_int, _exit_status: *mut c_int, pub extern "C" fn occlum_wait4(child_pid: c_int, exit_status: *mut c_int,
options: c_int/*, rusage: *mut Rusage*/) -> c_int options: c_int/*, rusage: *mut Rusage*/) -> c_int
{ {
match process::do_wait4(child_pid as u32) { match do_wait4(child_pid, exit_status) {
Ok(exit_status) => unsafe { Ok(pid) => {
*_exit_status = exit_status; pid as c_int
0
} }
Err(e) => { Err(e) => {
e.errno.as_retval() e.errno.as_retval()