From b2e626760b8c5665787a8272d656ccaa59dc6f90 Mon Sep 17 00:00:00 2001 From: "Tate, Hongliang Tian" Date: Tue, 2 Apr 2019 21:16:29 +0800 Subject: [PATCH] Add futex --- src/libos/src/process/futex.rs | 258 +++++++++++++++++++++++++++++ src/libos/src/process/mod.rs | 2 + src/libos/src/process/spawn/mod.rs | 4 +- src/libos/src/syscall/mod.rs | 33 +++- test/clone/main.c | 56 +++++-- 5 files changed, 338 insertions(+), 15 deletions(-) create mode 100644 src/libos/src/process/futex.rs diff --git a/src/libos/src/process/futex.rs b/src/libos/src/process/futex.rs new file mode 100644 index 00000000..d712e27d --- /dev/null +++ b/src/libos/src/process/futex.rs @@ -0,0 +1,258 @@ +use super::*; +use std::sync::atomic::{AtomicBool, Ordering}; + +/// `FutexOp`, `FutexFlags`, and `futex_op_and_flags_from_u32` are helper types and +/// functions for handling the versatile commands and arguments of futex system +/// call in a memory-safe way. + +#[allow(non_camel_case_types)] +pub enum FutexOp { + FUTEX_WAIT = 0, + FUTEX_WAKE = 1, + FUTEX_FD = 2, + FUTEX_REQUEUE = 3, + FUTEX_CMP_REQUEUE = 4, + FUTEX_WAKE_OP = 5, + FUTEX_LOCK_PI = 6, + FUTEX_UNLOCK_PI = 7, + FUTEX_TRYLOCK_PI = 8, + FUTEX_WAIT_BITSET = 9, +} +const FUTEX_OP_MASK : u32 = 0x0000_000F; + +impl FutexOp { + pub fn from_u32(bits: u32) -> Result { + match bits { + 0 => Ok(FutexOp::FUTEX_WAIT), + 1 => Ok(FutexOp::FUTEX_WAKE), + 2 => Ok(FutexOp::FUTEX_FD), + 3 => Ok(FutexOp::FUTEX_REQUEUE), + 4 => Ok(FutexOp::FUTEX_CMP_REQUEUE), + 5 => Ok(FutexOp::FUTEX_WAKE_OP), + 6 => Ok(FutexOp::FUTEX_LOCK_PI), + 7 => Ok(FutexOp::FUTEX_UNLOCK_PI), + 8 => Ok(FutexOp::FUTEX_TRYLOCK_PI), + 9 => Ok(FutexOp::FUTEX_WAIT_BITSET), + _ => errno!(EINVAL, "Unknown futex op"), + } + } +} + +bitflags! { + pub struct FutexFlags : u32 { + const FUTEX_PRIVATE = 128; + const FUTEX_CLOCK_REALTIME = 256; + } +} +const FUTEX_FLAGS_MASK : u32 = 0xFFFF_FFF0; + +impl FutexFlags { + pub fn from_u32(bits: u32) -> Result { + FutexFlags::from_bits(bits).ok_or_else(|| + Error::new(Errno::EINVAL, "Unknown futex flags")) + } +} + +pub fn futex_op_and_flags_from_u32(bits: u32) -> Result<(FutexOp, FutexFlags), Error> { + let op = { + let op_bits = bits & FUTEX_OP_MASK; + FutexOp::from_u32(op_bits)? + }; + let flags = { + let flags_bits = bits & FUTEX_FLAGS_MASK; + FutexFlags::from_u32(flags_bits)? + }; + Ok((op, flags)) +} + + +/// Do futex wait +pub fn futex_wait(futex_addr: *const i32, futex_val: i32) -> Result<(), Error> { + let futex_key = FutexKey::new(futex_addr); + let futex_item = FUTEX_TABLE.lock().unwrap() + .get_or_new_item(futex_key); + + futex_item.wait(futex_val); + + FUTEX_TABLE.lock().unwrap().put_item(futex_item); + Ok(()) +} + +/// Do futex wake +pub fn futex_wake(futex_addr: *const i32, max_count: usize) -> Result { + let futex_key = FutexKey::new(futex_addr); + let futex_item = FUTEX_TABLE.lock().unwrap().get_item(futex_key)?; + let count = futex_item.wake(max_count); + FUTEX_TABLE.lock().unwrap().put_item(futex_item); + Ok(count) +} + + +lazy_static! { + static ref FUTEX_TABLE : SgxMutex = { + SgxMutex::new(FutexTable::new()) + }; +} + +#[derive(PartialEq, Eq, Hash, Copy, Clone)] +struct FutexKey(usize); + +impl FutexKey { + pub fn new(addr: *const i32) -> FutexKey { + FutexKey(addr as usize) + } + + pub fn load_val(&self) -> i32 { + unsafe { *(self.0 as *const i32) } + } +} + +struct FutexItem { + key: FutexKey, + queue: SgxMutex>, +} + +impl FutexItem { + pub fn new(key: FutexKey) -> FutexItem { + FutexItem { + key: key, + queue: SgxMutex::new(VecDeque::new()), + } + } + + pub fn wake(&self, max_count: usize) -> usize { + let mut queue = self.queue.lock().unwrap(); + let mut count = 0; + while count < max_count { + let waiter = { + let waiter_option = queue.pop_front(); + if waiter_option.is_none() { break; } + waiter_option.unwrap() + }; + waiter.wake(); + count += 1; + } + count + } + + pub fn wait(&self, futex_val: i32) -> () { + let mut queue = self.queue.lock().unwrap(); + if self.key.load_val() != futex_val { + return; + } + + let waiter = Arc::new(Waiter::new()); + queue.push_back(waiter.clone()); + drop(queue); + + // Must make sure that no locks are holded by this thread before sleep + waiter.wait(); + } +} + +type FutexItemRef = Arc; + +struct FutexTable { + table: HashMap, +} + +impl FutexTable { + pub fn new() -> FutexTable { + FutexTable { + table: HashMap::new(), + } + } + + pub fn get_or_new_item(&mut self, key: FutexKey) -> FutexItemRef { + let table = &mut self.table; + let item = table.entry(key).or_insert_with(|| { + Arc::new(FutexItem::new(key)) + }); + item.clone() + } + + pub fn get_item(&mut self, key: FutexKey) -> Result { + let table = &mut self.table; + table.get_mut(&key).map(|item| item.clone()) + .ok_or_else(|| Error::new(Errno::ENOENT, "futex key cannot be found")) + } + + pub fn put_item(&mut self, item: FutexItemRef) { + let table = &mut self.table; + // If there are only two references, one is the given argument, the + // other in the table, then it is time to release the futex item. + // This is because we are holding the lock of futex table and the + // reference count cannot be possibly increased by other threads. + if Arc::strong_count(&item) == 2 { + // Release the last but one reference + let key = item.key; + drop(item); + // Release the last reference + table.remove(&key); + } + } +} + + +#[derive(Debug)] +struct Waiter { + thread: *const c_void, + is_woken: AtomicBool, +} +type WaiterRef = Arc; + +impl Waiter { + pub fn new() -> Waiter { + Waiter { + thread: unsafe { sgx_thread_get_self() }, + is_woken: AtomicBool::new(false), + } + } + + pub fn wait(&self) { + while self.is_woken.load(Ordering::SeqCst) != true { + wait_event(self.thread); + } + } + + pub fn wake(&self) { + self.is_woken.store(true, Ordering::SeqCst); + set_event(self.thread); + } +} + +unsafe impl Send for Waiter {} +unsafe impl Sync for Waiter {} + +fn wait_event(thread: *const c_void) { + let mut ret: c_int = 0; + let mut sgx_ret: c_int = 0; + unsafe { + sgx_ret = sgx_thread_wait_untrusted_event_ocall(&mut ret as *mut c_int, thread); + } + if ret != 0 || sgx_ret != 0 { + panic!("ERROR: sgx_thread_wait_untrusted_event_ocall failed"); + } +} + +fn set_event(thread: *const c_void) { + let mut ret: c_int = 0; + let mut sgx_ret: c_int = 0; + unsafe { + sgx_ret = sgx_thread_set_untrusted_event_ocall(&mut ret as *mut c_int, thread); + } + if ret != 0 || sgx_ret != 0 { + panic!("ERROR: sgx_thread_set_untrusted_event_ocall failed"); + } +} + +extern "C" { + fn sgx_thread_get_self() -> *const c_void; + + /* Go outside and wait on my untrusted event */ + fn sgx_thread_wait_untrusted_event_ocall(ret: *mut c_int, self_thread: *const c_void) -> c_int; + + /* Wake a thread waiting on its untrusted event */ + fn sgx_thread_set_untrusted_event_ocall(ret: *mut c_int, waiter_thread: *const c_void) + -> c_int; +} diff --git a/src/libos/src/process/mod.rs b/src/libos/src/process/mod.rs index 099ffc27..b048d97a 100644 --- a/src/libos/src/process/mod.rs +++ b/src/libos/src/process/mod.rs @@ -7,6 +7,7 @@ pub use self::exit::{do_exit, do_wait4, ChildProcessFilter}; pub use self::spawn::{do_spawn, FileAction}; pub use self::wait::{WaitQueue, Waiter}; pub use self::thread::{do_clone, CloneFlags, ThreadGroup}; +pub use self::futex::{FutexOp, FutexFlags, futex_op_and_flags_from_u32, futex_wake, futex_wait}; #[allow(non_camel_case_types)] pub type pid_t = u32; @@ -63,6 +64,7 @@ mod spawn; mod task; mod wait; mod thread; +mod futex; use self::task::Task; use super::*; diff --git a/src/libos/src/process/spawn/mod.rs b/src/libos/src/process/spawn/mod.rs index b4c85ba0..0a6084af 100644 --- a/src/libos/src/process/spawn/mod.rs +++ b/src/libos/src/process/spawn/mod.rs @@ -52,7 +52,7 @@ pub fn do_spawn>( }; let (new_pid, new_process_ref) = { - let cwd = elf_path.as_ref().parent().unwrap().to_str().unwrap(); + let cwd = parent_ref.lock().unwrap().get_cwd().to_owned(); let vm = init_vm::do_init(&elf_file, &elf_buf[..])?; let task = { let program_entry = { @@ -70,7 +70,7 @@ pub fn do_spawn>( let files = init_files(parent_ref, file_actions)?; Arc::new(SgxMutex::new(files)) }; - Process::new(cwd, task, vm_ref, files_ref)? + Process::new(&cwd, task, vm_ref, files_ref)? }; parent_adopts_new_child(&parent_ref, &new_process_ref); process_table::put(new_pid, new_process_ref.clone()); diff --git a/src/libos/src/syscall/mod.rs b/src/libos/src/syscall/mod.rs index 78615466..e5625292 100644 --- a/src/libos/src/syscall/mod.rs +++ b/src/libos/src/syscall/mod.rs @@ -2,7 +2,7 @@ use {fs, process, std, vm}; use fs::{FileDesc, off_t}; use fs::File; use prelude::*; -use process::{ChildProcessFilter, FileAction, pid_t, CloneFlags}; +use process::{ChildProcessFilter, FileAction, pid_t, CloneFlags, FutexFlags, FutexOp}; use std::ffi::{CStr, CString}; use std::ptr; use time::timeval_t; @@ -87,6 +87,12 @@ pub extern "C" fn dispatch_syscall( arg4 as usize, ), SYS_WAIT4 => do_wait4(arg0 as i32, arg1 as *mut i32), + SYS_FUTEX => do_futex( + arg0 as *const i32, + arg1 as u32, + arg2 as i32, + // TODO: accept other optional arguments + ), SYS_GETPID => do_getpid(), SYS_GETPPID => do_getppid(), @@ -238,6 +244,31 @@ pub fn do_clone( Ok(child_pid as isize) } +pub fn do_futex( + futex_addr: *const i32, + futex_op: u32, + futex_val: i32, +) -> Result { + check_ptr(futex_addr)?; + let (futex_op, futex_flags) = process::futex_op_and_flags_from_u32(futex_op)?; + match futex_op { + FutexOp::FUTEX_WAIT => { + process::futex_wait(futex_addr, futex_val).map(|_| 0) + } + FutexOp::FUTEX_WAKE => { + let max_count = { + if futex_val < 0 { + return errno!(EINVAL, "the count must not be negative"); + } + futex_val as usize + }; + process::futex_wake(futex_addr, max_count) + .map(|count| count as isize) + }, + _ => errno!(ENOSYS, "the futex operation is not supported"), + } +} + fn do_open(path: *const i8, flags: u32, mode: u32) -> Result { let path = clone_cstring_safely(path)?.to_string_lossy().into_owned(); let fd = fs::do_open(&path, flags, mode)?; diff --git a/test/clone/main.c b/test/clone/main.c index 908241af..55452497 100644 --- a/test/clone/main.c +++ b/test/clone/main.c @@ -1,28 +1,57 @@ +#define _GNU_SOURCE #include #include -#define _GNU_SOURCE #include +#include +#include -#define NTHREADS 4 -#define STACK_SIZE (8 * 1024) +/* + * Helper functions + */ -// From file arch/x86_64/atomic_arch.h in musl libc. MIT License. -static inline void a_inc(volatile int *p) -{ - __asm__ __volatile__( - "lock ; incl %0" - : "=m"(*p) : "m"(*p) : "memory" ); +static inline int a_load(volatile int* x) { + return __atomic_load_n((int*)x, __ATOMIC_SEQ_CST); } +static inline int a_add_fetch(volatile int* x, int a) { + return __atomic_add_fetch((int*)x, a, __ATOMIC_SEQ_CST); +} + +/* + * Futex wrapper + */ + +#define FUTEX_NUM 202 + +#define FUTEX_WAIT 0 +#define FUTEX_WAKE 1 + +// Libc does not provide a wrapper for futex, so we do it our own +static int futex(volatile int *futex_addr, int futex_op, int val) { + return (int) syscall(FUTEX_NUM, futex_addr, futex_op, val); +} + + +/* + * Child threads + */ + +#define NTHREADS 4 +#define STACK_SIZE (8 * 1024) + volatile int num_exit_threads = 0; -int thread_func(void* arg) { +static int thread_func(void* arg) { int* tid = arg; //printf("tid = %d\n", *tid); - a_inc(&num_exit_threads); + // Wake up the main thread if all child threads exit + if (a_add_fetch(&num_exit_threads, 1) == NTHREADS) { + futex(&num_exit_threads, FUTEX_WAKE, 1); + } return 0; } + int main(int argc, const char* argv[]) { unsigned int clone_flags = CLONE_VM | CLONE_FS | CLONE_FILES | CLONE_SIGHAND | CLONE_THREAD | CLONE_SYSVSEM | CLONE_DETACHED; @@ -47,7 +76,10 @@ int main(int argc, const char* argv[]) { printf("Waiting for %d threads to exit...", NTHREADS); // Wait for all threads to exit - while (num_exit_threads != NTHREADS); + int curr_num_exit_threads; + while ((curr_num_exit_threads = a_load(&num_exit_threads)) != NTHREADS) { + futex(&num_exit_threads, FUTEX_WAIT, curr_num_exit_threads); + } printf("done.\n"); return 0;