Add futex

This commit is contained in:
Tate, Hongliang Tian 2019-04-02 21:16:29 +08:00 committed by Tate Tian
parent 4cf8777592
commit b2e626760b
5 changed files with 338 additions and 15 deletions

@ -0,0 +1,258 @@
use super::*;
use std::sync::atomic::{AtomicBool, Ordering};
/// `FutexOp`, `FutexFlags`, and `futex_op_and_flags_from_u32` are helper types and
/// functions for handling the versatile commands and arguments of futex system
/// call in a memory-safe way.
#[allow(non_camel_case_types)]
pub enum FutexOp {
FUTEX_WAIT = 0,
FUTEX_WAKE = 1,
FUTEX_FD = 2,
FUTEX_REQUEUE = 3,
FUTEX_CMP_REQUEUE = 4,
FUTEX_WAKE_OP = 5,
FUTEX_LOCK_PI = 6,
FUTEX_UNLOCK_PI = 7,
FUTEX_TRYLOCK_PI = 8,
FUTEX_WAIT_BITSET = 9,
}
const FUTEX_OP_MASK : u32 = 0x0000_000F;
impl FutexOp {
pub fn from_u32(bits: u32) -> Result<FutexOp, Error> {
match bits {
0 => Ok(FutexOp::FUTEX_WAIT),
1 => Ok(FutexOp::FUTEX_WAKE),
2 => Ok(FutexOp::FUTEX_FD),
3 => Ok(FutexOp::FUTEX_REQUEUE),
4 => Ok(FutexOp::FUTEX_CMP_REQUEUE),
5 => Ok(FutexOp::FUTEX_WAKE_OP),
6 => Ok(FutexOp::FUTEX_LOCK_PI),
7 => Ok(FutexOp::FUTEX_UNLOCK_PI),
8 => Ok(FutexOp::FUTEX_TRYLOCK_PI),
9 => Ok(FutexOp::FUTEX_WAIT_BITSET),
_ => errno!(EINVAL, "Unknown futex op"),
}
}
}
bitflags! {
pub struct FutexFlags : u32 {
const FUTEX_PRIVATE = 128;
const FUTEX_CLOCK_REALTIME = 256;
}
}
const FUTEX_FLAGS_MASK : u32 = 0xFFFF_FFF0;
impl FutexFlags {
pub fn from_u32(bits: u32) -> Result<FutexFlags, Error> {
FutexFlags::from_bits(bits).ok_or_else(||
Error::new(Errno::EINVAL, "Unknown futex flags"))
}
}
pub fn futex_op_and_flags_from_u32(bits: u32) -> Result<(FutexOp, FutexFlags), Error> {
let op = {
let op_bits = bits & FUTEX_OP_MASK;
FutexOp::from_u32(op_bits)?
};
let flags = {
let flags_bits = bits & FUTEX_FLAGS_MASK;
FutexFlags::from_u32(flags_bits)?
};
Ok((op, flags))
}
/// Do futex wait
pub fn futex_wait(futex_addr: *const i32, futex_val: i32) -> Result<(), Error> {
let futex_key = FutexKey::new(futex_addr);
let futex_item = FUTEX_TABLE.lock().unwrap()
.get_or_new_item(futex_key);
futex_item.wait(futex_val);
FUTEX_TABLE.lock().unwrap().put_item(futex_item);
Ok(())
}
/// Do futex wake
pub fn futex_wake(futex_addr: *const i32, max_count: usize) -> Result<usize, Error> {
let futex_key = FutexKey::new(futex_addr);
let futex_item = FUTEX_TABLE.lock().unwrap().get_item(futex_key)?;
let count = futex_item.wake(max_count);
FUTEX_TABLE.lock().unwrap().put_item(futex_item);
Ok(count)
}
lazy_static! {
static ref FUTEX_TABLE : SgxMutex<FutexTable> = {
SgxMutex::new(FutexTable::new())
};
}
#[derive(PartialEq, Eq, Hash, Copy, Clone)]
struct FutexKey(usize);
impl FutexKey {
pub fn new(addr: *const i32) -> FutexKey {
FutexKey(addr as usize)
}
pub fn load_val(&self) -> i32 {
unsafe { *(self.0 as *const i32) }
}
}
struct FutexItem {
key: FutexKey,
queue: SgxMutex<VecDeque<WaiterRef>>,
}
impl FutexItem {
pub fn new(key: FutexKey) -> FutexItem {
FutexItem {
key: key,
queue: SgxMutex::new(VecDeque::new()),
}
}
pub fn wake(&self, max_count: usize) -> usize {
let mut queue = self.queue.lock().unwrap();
let mut count = 0;
while count < max_count {
let waiter = {
let waiter_option = queue.pop_front();
if waiter_option.is_none() { break; }
waiter_option.unwrap()
};
waiter.wake();
count += 1;
}
count
}
pub fn wait(&self, futex_val: i32) -> () {
let mut queue = self.queue.lock().unwrap();
if self.key.load_val() != futex_val {
return;
}
let waiter = Arc::new(Waiter::new());
queue.push_back(waiter.clone());
drop(queue);
// Must make sure that no locks are holded by this thread before sleep
waiter.wait();
}
}
type FutexItemRef = Arc<FutexItem>;
struct FutexTable {
table: HashMap<FutexKey, FutexItemRef>,
}
impl FutexTable {
pub fn new() -> FutexTable {
FutexTable {
table: HashMap::new(),
}
}
pub fn get_or_new_item(&mut self, key: FutexKey) -> FutexItemRef {
let table = &mut self.table;
let item = table.entry(key).or_insert_with(|| {
Arc::new(FutexItem::new(key))
});
item.clone()
}
pub fn get_item(&mut self, key: FutexKey) -> Result<FutexItemRef, Error> {
let table = &mut self.table;
table.get_mut(&key).map(|item| item.clone())
.ok_or_else(|| Error::new(Errno::ENOENT, "futex key cannot be found"))
}
pub fn put_item(&mut self, item: FutexItemRef) {
let table = &mut self.table;
// If there are only two references, one is the given argument, the
// other in the table, then it is time to release the futex item.
// This is because we are holding the lock of futex table and the
// reference count cannot be possibly increased by other threads.
if Arc::strong_count(&item) == 2 {
// Release the last but one reference
let key = item.key;
drop(item);
// Release the last reference
table.remove(&key);
}
}
}
#[derive(Debug)]
struct Waiter {
thread: *const c_void,
is_woken: AtomicBool,
}
type WaiterRef = Arc<Waiter>;
impl Waiter {
pub fn new() -> Waiter {
Waiter {
thread: unsafe { sgx_thread_get_self() },
is_woken: AtomicBool::new(false),
}
}
pub fn wait(&self) {
while self.is_woken.load(Ordering::SeqCst) != true {
wait_event(self.thread);
}
}
pub fn wake(&self) {
self.is_woken.store(true, Ordering::SeqCst);
set_event(self.thread);
}
}
unsafe impl Send for Waiter {}
unsafe impl Sync for Waiter {}
fn wait_event(thread: *const c_void) {
let mut ret: c_int = 0;
let mut sgx_ret: c_int = 0;
unsafe {
sgx_ret = sgx_thread_wait_untrusted_event_ocall(&mut ret as *mut c_int, thread);
}
if ret != 0 || sgx_ret != 0 {
panic!("ERROR: sgx_thread_wait_untrusted_event_ocall failed");
}
}
fn set_event(thread: *const c_void) {
let mut ret: c_int = 0;
let mut sgx_ret: c_int = 0;
unsafe {
sgx_ret = sgx_thread_set_untrusted_event_ocall(&mut ret as *mut c_int, thread);
}
if ret != 0 || sgx_ret != 0 {
panic!("ERROR: sgx_thread_set_untrusted_event_ocall failed");
}
}
extern "C" {
fn sgx_thread_get_self() -> *const c_void;
/* Go outside and wait on my untrusted event */
fn sgx_thread_wait_untrusted_event_ocall(ret: *mut c_int, self_thread: *const c_void) -> c_int;
/* Wake a thread waiting on its untrusted event */
fn sgx_thread_set_untrusted_event_ocall(ret: *mut c_int, waiter_thread: *const c_void)
-> c_int;
}

@ -7,6 +7,7 @@ pub use self::exit::{do_exit, do_wait4, ChildProcessFilter};
pub use self::spawn::{do_spawn, FileAction}; pub use self::spawn::{do_spawn, FileAction};
pub use self::wait::{WaitQueue, Waiter}; pub use self::wait::{WaitQueue, Waiter};
pub use self::thread::{do_clone, CloneFlags, ThreadGroup}; pub use self::thread::{do_clone, CloneFlags, ThreadGroup};
pub use self::futex::{FutexOp, FutexFlags, futex_op_and_flags_from_u32, futex_wake, futex_wait};
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
pub type pid_t = u32; pub type pid_t = u32;
@ -63,6 +64,7 @@ mod spawn;
mod task; mod task;
mod wait; mod wait;
mod thread; mod thread;
mod futex;
use self::task::Task; use self::task::Task;
use super::*; use super::*;

@ -52,7 +52,7 @@ pub fn do_spawn<P: AsRef<Path>>(
}; };
let (new_pid, new_process_ref) = { let (new_pid, new_process_ref) = {
let cwd = elf_path.as_ref().parent().unwrap().to_str().unwrap(); let cwd = parent_ref.lock().unwrap().get_cwd().to_owned();
let vm = init_vm::do_init(&elf_file, &elf_buf[..])?; let vm = init_vm::do_init(&elf_file, &elf_buf[..])?;
let task = { let task = {
let program_entry = { let program_entry = {
@ -70,7 +70,7 @@ pub fn do_spawn<P: AsRef<Path>>(
let files = init_files(parent_ref, file_actions)?; let files = init_files(parent_ref, file_actions)?;
Arc::new(SgxMutex::new(files)) Arc::new(SgxMutex::new(files))
}; };
Process::new(cwd, task, vm_ref, files_ref)? Process::new(&cwd, task, vm_ref, files_ref)?
}; };
parent_adopts_new_child(&parent_ref, &new_process_ref); parent_adopts_new_child(&parent_ref, &new_process_ref);
process_table::put(new_pid, new_process_ref.clone()); process_table::put(new_pid, new_process_ref.clone());

@ -2,7 +2,7 @@ use {fs, process, std, vm};
use fs::{FileDesc, off_t}; use fs::{FileDesc, off_t};
use fs::File; use fs::File;
use prelude::*; use prelude::*;
use process::{ChildProcessFilter, FileAction, pid_t, CloneFlags}; use process::{ChildProcessFilter, FileAction, pid_t, CloneFlags, FutexFlags, FutexOp};
use std::ffi::{CStr, CString}; use std::ffi::{CStr, CString};
use std::ptr; use std::ptr;
use time::timeval_t; use time::timeval_t;
@ -87,6 +87,12 @@ pub extern "C" fn dispatch_syscall(
arg4 as usize, arg4 as usize,
), ),
SYS_WAIT4 => do_wait4(arg0 as i32, arg1 as *mut i32), SYS_WAIT4 => do_wait4(arg0 as i32, arg1 as *mut i32),
SYS_FUTEX => do_futex(
arg0 as *const i32,
arg1 as u32,
arg2 as i32,
// TODO: accept other optional arguments
),
SYS_GETPID => do_getpid(), SYS_GETPID => do_getpid(),
SYS_GETPPID => do_getppid(), SYS_GETPPID => do_getppid(),
@ -238,6 +244,31 @@ pub fn do_clone(
Ok(child_pid as isize) Ok(child_pid as isize)
} }
pub fn do_futex(
futex_addr: *const i32,
futex_op: u32,
futex_val: i32,
) -> Result<isize, Error> {
check_ptr(futex_addr)?;
let (futex_op, futex_flags) = process::futex_op_and_flags_from_u32(futex_op)?;
match futex_op {
FutexOp::FUTEX_WAIT => {
process::futex_wait(futex_addr, futex_val).map(|_| 0)
}
FutexOp::FUTEX_WAKE => {
let max_count = {
if futex_val < 0 {
return errno!(EINVAL, "the count must not be negative");
}
futex_val as usize
};
process::futex_wake(futex_addr, max_count)
.map(|count| count as isize)
},
_ => errno!(ENOSYS, "the futex operation is not supported"),
}
}
fn do_open(path: *const i8, flags: u32, mode: u32) -> Result<isize, Error> { fn do_open(path: *const i8, flags: u32, mode: u32) -> Result<isize, Error> {
let path = clone_cstring_safely(path)?.to_string_lossy().into_owned(); let path = clone_cstring_safely(path)?.to_string_lossy().into_owned();
let fd = fs::do_open(&path, flags, mode)?; let fd = fs::do_open(&path, flags, mode)?;

@ -1,28 +1,57 @@
#define _GNU_SOURCE
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#define _GNU_SOURCE
#include <sched.h> #include <sched.h>
#include <unistd.h>
#include <sys/syscall.h>
/*
* Helper functions
*/
static inline int a_load(volatile int* x) {
return __atomic_load_n((int*)x, __ATOMIC_SEQ_CST);
}
static inline int a_add_fetch(volatile int* x, int a) {
return __atomic_add_fetch((int*)x, a, __ATOMIC_SEQ_CST);
}
/*
* Futex wrapper
*/
#define FUTEX_NUM 202
#define FUTEX_WAIT 0
#define FUTEX_WAKE 1
// Libc does not provide a wrapper for futex, so we do it our own
static int futex(volatile int *futex_addr, int futex_op, int val) {
return (int) syscall(FUTEX_NUM, futex_addr, futex_op, val);
}
/*
* Child threads
*/
#define NTHREADS 4 #define NTHREADS 4
#define STACK_SIZE (8 * 1024) #define STACK_SIZE (8 * 1024)
// From file arch/x86_64/atomic_arch.h in musl libc. MIT License.
static inline void a_inc(volatile int *p)
{
__asm__ __volatile__(
"lock ; incl %0"
: "=m"(*p) : "m"(*p) : "memory" );
}
volatile int num_exit_threads = 0; volatile int num_exit_threads = 0;
int thread_func(void* arg) { static int thread_func(void* arg) {
int* tid = arg; int* tid = arg;
//printf("tid = %d\n", *tid); //printf("tid = %d\n", *tid);
a_inc(&num_exit_threads); // Wake up the main thread if all child threads exit
if (a_add_fetch(&num_exit_threads, 1) == NTHREADS) {
futex(&num_exit_threads, FUTEX_WAKE, 1);
}
return 0; return 0;
} }
int main(int argc, const char* argv[]) { int main(int argc, const char* argv[]) {
unsigned int clone_flags = CLONE_VM | CLONE_FS | CLONE_FILES | unsigned int clone_flags = CLONE_VM | CLONE_FS | CLONE_FILES |
CLONE_SIGHAND | CLONE_THREAD | CLONE_SYSVSEM | CLONE_DETACHED; CLONE_SIGHAND | CLONE_THREAD | CLONE_SYSVSEM | CLONE_DETACHED;
@ -47,7 +76,10 @@ int main(int argc, const char* argv[]) {
printf("Waiting for %d threads to exit...", NTHREADS); printf("Waiting for %d threads to exit...", NTHREADS);
// Wait for all threads to exit // Wait for all threads to exit
while (num_exit_threads != NTHREADS); int curr_num_exit_threads;
while ((curr_num_exit_threads = a_load(&num_exit_threads)) != NTHREADS) {
futex(&num_exit_threads, FUTEX_WAIT, curr_num_exit_threads);
}
printf("done.\n"); printf("done.\n");
return 0; return 0;