From 215e8ffbdf5058bc7a73550a74b84c8fe882284e Mon Sep 17 00:00:00 2001
From: LI Qing
Date: Tue, 6 Jul 2021 16:04:08 +0800
Subject: [PATCH] Add support for robust futex syscalls

---
 src/libos/src/lib.rs                    |   1 +
 src/libos/src/process/do_exit.rs        |   3 +
 src/libos/src/process/do_robust_list.rs | 195 ++++++++++++++++++++++++
 src/libos/src/process/mod.rs            |   2 +
 src/libos/src/process/syscalls.rs       |  24 +++
 src/libos/src/process/thread/builder.rs |  14 +-
 src/libos/src/process/thread/mod.rs     |  34 ++++-
 src/libos/src/syscall/mod.rs            |  13 +-
 test/pthread/main.c                     |  78 ++++++++++
 9 files changed, 355 insertions(+), 9 deletions(-)
 create mode 100644 src/libos/src/process/do_robust_list.rs

diff --git a/src/libos/src/lib.rs b/src/libos/src/lib.rs
index 15a65d50..1b820b2a 100644
--- a/src/libos/src/lib.rs
+++ b/src/libos/src/lib.rs
@@ -20,6 +20,7 @@
 #![feature(get_mut_unchecked)]
 // for std::hint::black_box
 #![feature(test)]
+#![feature(atomic_from_mut)]
 
 #[macro_use]
 extern crate alloc;
diff --git a/src/libos/src/process/do_exit.rs b/src/libos/src/process/do_exit.rs
index 3d976cea..e99431e9 100644
--- a/src/libos/src/process/do_exit.rs
+++ b/src/libos/src/process/do_exit.rs
@@ -44,6 +44,9 @@ fn exit_thread(term_status: TermStatus) {
         futex_wake(ctid_ptr.as_ptr() as *const i32, 1);
     }
 
+    // Notify waiters that the owner of the robust futexes has died.
+    thread.wake_robust_list();
+
     // Keep the main thread's tid available as long as the process is not destroyed.
     // This is important as the user space may still attempt to access the main
     // thread's ThreadRef through the process's pid after the process has become
diff --git a/src/libos/src/process/do_robust_list.rs b/src/libos/src/process/do_robust_list.rs
new file mode 100644
index 00000000..babb087d
--- /dev/null
+++ b/src/libos/src/process/do_robust_list.rs
@@ -0,0 +1,195 @@
+/// Robust futexes provide a mechanism, used in addition to normal futexes,
+/// for kernel-assisted cleanup of locks that are still held when a thread
+/// exits.
+///
+/// Actual locking and unlocking is handled entirely by user-level code with
+/// the existing futex mechanism to wait on or wake up locks.
+/// The kernel's only essential involvement in robust futexes is to remember
+/// where the list head is, and to walk the list on thread exit, handling any
+/// locks still held by the departing thread.
+/// Ref: https://www.kernel.org/doc/html/latest/locking/robust-futex-ABI.html
+///
+use std::ptr::NonNull;
+use std::sync::atomic::{AtomicU32, Ordering};
+
+use crate::prelude::*;
+use crate::util::mem_util::from_user::*;
+
+pub fn do_set_robust_list(list_head_ptr: *mut RobustListHead, len: usize) -> Result<()> {
+    debug!(
+        "set_robust_list: list_head_ptr: {:?}, len: {}",
+        list_head_ptr, len
+    );
+    if std::mem::size_of::<RobustListHead>() != len {
+        return_errno!(EINVAL, "invalid size for RobustListHead");
+    }
+    // We do not check here whether the pointer is a valid user-space pointer;
+    // the check is deferred until the robust list is walked on thread exit.
+    // If the pointer turns out to be invalid, we simply stop walking the list.
+    let robust_list = NonNull::new(list_head_ptr);
+    let current = current!();
+    current.set_robust_list(robust_list);
+    Ok(())
+}
+
+pub fn do_get_robust_list(tid: pid_t) -> Result<*mut RobustListHead> {
+    debug!("get_robust_list: tid: {}", tid);
+    let thread = if tid == 0 {
+        current!()
+    } else {
+        super::table::get_thread(tid)?
+    };
+    let robust_list_ptr = thread
+        .robust_list()
+        .map(|robust_list| robust_list.as_ptr())
+        .unwrap_or(std::ptr::null_mut());
+    Ok(robust_list_ptr)
+}
+
+/// This struct is the same as Linux's robust_list
+#[repr(C)]
+struct RobustList {
+    next: *const RobustList,
+}
+
+/// This struct is the same as Linux's robust_list_head
+#[repr(C)]
+pub struct RobustListHead {
+    /// Linked list of lock entries
+    ///
+    /// If it points to the head of the list, then it is the end of the list.
+    /// If it is an invalid user-space pointer or a null pointer, stop
+    /// iterating the list.
+    list: RobustList,
+    /// Specifies the offset from the address of the lock entry to the address
+    /// of the futex.
+    futex_offset: isize,
+    /// Contains a transient copy of the address of the lock entry, during
+    /// list insertion and removal.
+    list_op_pending: *const RobustList,
+}
+
+impl RobustListHead {
+    /// Return an iterator over all futexes in the robust list.
+    ///
+    /// The futex referred to by `list_op_pending`, if any, will be returned
+    /// as the last item.
+    pub fn futexes<'a>(&'a self) -> FutexIter<'a> {
+        FutexIter::new(self)
+    }
+
+    /// Return the pending futex address if one exists
+    fn pending_futex_addr(&self) -> Option<*const i32> {
+        if self.list_op_pending.is_null() {
+            None
+        } else {
+            Some(unsafe { self.futex_addr(self.list_op_pending) })
+        }
+    }
+
+    /// Get the futex address of a lock entry
+    unsafe fn futex_addr(&self, entry_ptr: *const RobustList) -> *const i32 {
+        (entry_ptr as *const u8).offset(self.futex_offset) as *const i32
+    }
+}
+
+/// The upper bound on the number of entries to walk, guarding against
+/// excessively long or circular lists.
+const ROBUST_LIST_LIMIT: isize = 2048;
+
+pub struct FutexIter<'a> {
+    robust_list: &'a RobustListHead,
+    entry_ptr: *const RobustList,
+    count: isize,
+}
+
+impl<'a> FutexIter<'a> {
+    fn new(robust_list: &'a RobustListHead) -> Self {
+        Self {
+            robust_list,
+            entry_ptr: robust_list.list.next,
+            count: 0,
+        }
+    }
+
+    // `self.count` is normally non-negative: it counts the entries visited
+    // so far to bound the walk of excessively long or circular lists. The
+    // special value -1 marks the end of the iterator.
+    fn set_end(&mut self) {
+        self.count = -1;
+    }
+
+    fn is_end(&self) -> bool {
+        self.count < 0
+    }
+}
+
+impl<'a> Iterator for FutexIter<'a> {
+    type Item = *const i32;
+
+    /// Returns the next futex address.
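+    ///
+    /// The walk stops when the entry pointer reaches the list head again,
+    /// aborts if an entry fails the user-space pointer check, and is capped
+    /// at ROBUST_LIST_LIMIT entries. The futex referred to by
+    /// `list_op_pending`, if any, is yielded as the final item.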
+    fn next(&mut self) -> Option<*const i32> {
+        if self.is_end() {
+            return None;
+        }
+
+        // Iterate the linked list
+        while self.entry_ptr != &self.robust_list.list {
+            // Avoid excessively long or circular lists
+            if self.count == ROBUST_LIST_LIMIT {
+                break;
+            }
+            // Invalid pointer; stop iterating the robust list
+            if check_ptr(self.entry_ptr).is_err() {
+                return None;
+            }
+            // A pending lock might already be on the list
+            let futex_addr = if self.entry_ptr != self.robust_list.list_op_pending {
+                Some(unsafe { self.robust_list.futex_addr(self.entry_ptr) })
+            } else {
+                None
+            };
+            self.entry_ptr = unsafe { (*self.entry_ptr).next };
+            self.count += 1;
+            if futex_addr.is_some() {
+                return futex_addr;
+            }
+        }
+
+        // End of iterating the linked list.
+        // If the pending futex exists, return it as the last one.
+        self.set_end();
+        self.robust_list.pending_futex_addr()
+    }
+}
+
+const FUTEX_WAITERS: u32 = 0x8000_0000;
+const FUTEX_OWNER_DIED: u32 = 0x4000_0000;
+const FUTEX_TID_MASK: u32 = 0x3FFF_FFFF;
+
+/// Wake up one waiter on a robust futex owned by the given thread
+pub fn wake_robust_futex(futex_addr: *const i32, tid: pid_t) -> Result<()> {
+    let futex_val = {
+        check_ptr(futex_addr)?;
+        unsafe { AtomicU32::from_mut(&mut *(futex_addr as *mut u32)) }
+    };
+    let mut old_val = futex_val.load(Ordering::SeqCst);
+    loop {
+        // This futex may be held by another thread; if so, do nothing
+        if old_val & FUTEX_TID_MASK != tid {
+            break;
+        }
+        // Preserve the waiters bit, set the owner-died bit, and clear the TID
+        let new_val = (old_val & FUTEX_WAITERS) | FUTEX_OWNER_DIED;
+        if let Err(cur_val) =
+            futex_val.compare_exchange(old_val, new_val, Ordering::SeqCst, Ordering::SeqCst)
+        {
+            // The futex value has changed; retry with the current value
+            old_val = cur_val;
+            continue;
+        }
+        // Wake up one waiter
+        if futex_val.load(Ordering::SeqCst) & FUTEX_WAITERS != 0 {
+            debug!("wake robust futex addr: {:?}", futex_addr);
+            super::do_futex::futex_wake(futex_addr, 1)?;
+        }
+        break;
+    }
+    Ok(())
+}
diff --git a/src/libos/src/process/mod.rs b/src/libos/src/process/mod.rs
index 46b09278..5b24577c 100644
--- a/src/libos/src/process/mod.rs
+++ b/src/libos/src/process/mod.rs
@@ -22,6 +22,7 @@ use self::wait::{WaitQueue, Waiter};
 
 pub use self::do_exit::handle_force_exit;
 pub use self::do_futex::{futex_wait, futex_wake};
+pub use self::do_robust_list::RobustListHead;
 pub use self::do_spawn::do_spawn_without_exec;
 pub use self::process::{Process, ProcessFilter, ProcessStatus, IDLE};
 pub use self::spawn_attribute::posix_spawnattr_t;
@@ -36,6 +37,7 @@ mod do_exec;
 mod do_exit;
 mod do_futex;
 mod do_getpid;
+mod do_robust_list;
 mod do_set_tid_address;
 mod do_spawn;
 mod do_wait4;
diff --git a/src/libos/src/process/syscalls.rs b/src/libos/src/process/syscalls.rs
index 4d005dd6..13740f50 100644
--- a/src/libos/src/process/syscalls.rs
+++ b/src/libos/src/process/syscalls.rs
@@ -2,6 +2,7 @@ use super::do_arch_prctl::ArchPrctlCode;
 use super::do_clone::CloneFlags;
 use super::do_exec::do_exec;
 use super::do_futex::{FutexFlags, FutexOp, FutexTimeout};
+use super::do_robust_list::RobustListHead;
 use super::do_spawn::FileAction;
 use super::do_wait4::WaitOptions;
 use super::prctl::PrctlCmd;
@@ -446,3 +447,26 @@ pub fn do_execve(path: *const i8, argv: *const *const i8, envp: *const *const i8
 
     do_exec(&path, &argv, &envp, &current)
 }
+
+pub fn do_set_robust_list(list_head_ptr: *mut RobustListHead, len: usize) -> Result<isize> {
+    if !list_head_ptr.is_null() {
+        check_mut_ptr(list_head_ptr)?;
+    }
+    super::do_robust_list::do_set_robust_list(list_head_ptr, len)?;
+    Ok(0)
+}
+
+pub fn do_get_robust_list(
+    tid: pid_t,
+    list_head_ptr_ptr: *mut *mut RobustListHead,
+    len_ptr: *mut usize,
+) -> Result<isize> {
+    check_mut_ptr(list_head_ptr_ptr)?;
+    check_mut_ptr(len_ptr)?;
+    let list_head_ptr = super::do_robust_list::do_get_robust_list(tid)?;
+    unsafe {
+        list_head_ptr_ptr.write(list_head_ptr);
+        len_ptr.write(std::mem::size_of::<RobustListHead>());
+    }
+    Ok(0)
+}
diff --git a/src/libos/src/process/thread/builder.rs b/src/libos/src/process/thread/builder.rs
index fddacde5..aff98400 100644
--- a/src/libos/src/process/thread/builder.rs
+++ b/src/libos/src/process/thread/builder.rs
@@ -1,8 +1,9 @@
 use std::ptr::NonNull;
 
 use super::{
-    FileTableRef, FsViewRef, ProcessRef, ProcessVM, ProcessVMRef, ResourceLimitsRef, SchedAgentRef,
-    SigQueues, SigSet, Task, Thread, ThreadId, ThreadInner, ThreadName, ThreadRef,
+    FileTableRef, FsViewRef, ProcessRef, ProcessVM, ProcessVMRef, ResourceLimitsRef,
+    RobustListHead, SchedAgentRef, SigQueues, SigSet, Task, Thread, ThreadId, ThreadInner,
+    ThreadName, ThreadRef,
 };
 use crate::events::HostEventFd;
 use crate::prelude::*;
@@ -22,6 +23,7 @@ pub struct ThreadBuilder {
     rlimits: Option<ResourceLimitsRef>,
     sig_mask: Option<SigSet>,
     clear_ctid: Option<NonNull<pid_t>>,
+    robust_list: Option<NonNull<RobustListHead>>,
     name: Option<ThreadName>,
 }
 
@@ -38,6 +40,7 @@ impl ThreadBuilder {
             rlimits: None,
             sig_mask: None,
             clear_ctid: None,
+            robust_list: None,
             name: None,
         }
     }
@@ -92,6 +95,11 @@
         self
     }
 
+    pub fn robust_list(mut self, robust_list_addr: NonNull<RobustListHead>) -> Self {
+        self.robust_list = Some(robust_list_addr);
+        self
+    }
+
     pub fn name(mut self, name: ThreadName) -> Self {
         self.name = Some(name);
         self
@@ -103,6 +111,7 @@
             .ok_or_else(|| errno!(EINVAL, "task is mandatory"))?;
         let tid = self.tid.unwrap_or_else(|| ThreadId::new());
         let clear_ctid = RwLock::new(self.clear_ctid);
+        let robust_list = RwLock::new(self.robust_list);
         let inner = SgxMutex::new(ThreadInner::new());
         let process = self
             .process
@@ -130,6 +139,7 @@
             task,
             tid,
             clear_ctid,
+            robust_list,
             inner,
             process,
             vm,
diff --git a/src/libos/src/process/thread/mod.rs b/src/libos/src/process/thread/mod.rs
index ed41f337..26ca2133 100644
--- a/src/libos/src/process/thread/mod.rs
+++ b/src/libos/src/process/thread/mod.rs
@@ -4,7 +4,7 @@ use std::ptr::NonNull;
 use super::task::Task;
 use super::{
     FileTableRef, ForcedExitStatus, FsViewRef, ProcessRef, ProcessVM, ProcessVMRef,
-    ResourceLimitsRef, SchedAgentRef, TermStatus, ThreadRef,
+    ResourceLimitsRef, RobustListHead, SchedAgentRef, TermStatus, ThreadRef,
 };
 use crate::events::HostEventFd;
 use crate::fs::{EventCreationFlags, EventFile};
@@ -28,6 +28,7 @@ pub struct Thread {
     tid: ThreadId,
     // Mutable info
     clear_ctid: RwLock<Option<NonNull<pid_t>>>,
+    robust_list: RwLock<Option<NonNull<RobustListHead>>>,
     inner: SgxMutex<ThreadInner>,
     name: RwLock<ThreadName>,
     // Process
@@ -139,6 +140,37 @@ impl Thread {
         *self.clear_ctid.write().unwrap() = new_clear_ctid;
     }
 
+    pub fn robust_list(&self) -> Option<NonNull<RobustListHead>> {
+        *self.robust_list.read().unwrap()
+    }
+
+    pub fn set_robust_list(&self, new_robust_list: Option<NonNull<RobustListHead>>) {
+        *self.robust_list.write().unwrap() = new_robust_list;
+    }
+
+    /// Walk the robust futex list, marking each futex dead and waking its
+    /// waiters. It corresponds to Linux's exit_robust_list(); errors are
+    /// silently ignored.
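+    ///
+    /// For each entry whose futex value carries this thread's TID, the TID
+    /// bits are replaced with FUTEX_OWNER_DIED (so that the next locker can
+    /// observe EOWNERDEAD) and one waiter, if any, is woken.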
+    pub fn wake_robust_list(&self) {
+        let list_head_ptr = match self.robust_list() {
+            None => {
+                return;
+            }
+            Some(robust_list) => robust_list.as_ptr(),
+        };
+        debug!("wake the robust list: {:?}", list_head_ptr);
+        let robust_list = {
+            // Invalid pointer; stop scanning the list
+            if crate::util::mem_util::from_user::check_ptr(list_head_ptr).is_err() {
+                return;
+            }
+            unsafe { &*list_head_ptr }
+        };
+        for futex_addr in robust_list.futexes() {
+            let _ = super::do_robust_list::wake_robust_futex(futex_addr, self.tid());
+        }
+        self.set_robust_list(None);
+    }
+
     pub fn name(&self) -> ThreadName {
         self.name.read().unwrap().clone()
     }
diff --git a/src/libos/src/syscall/mod.rs b/src/libos/src/syscall/mod.rs
index 05c420aa..7b7a7d85 100644
--- a/src/libos/src/syscall/mod.rs
+++ b/src/libos/src/syscall/mod.rs
@@ -41,10 +41,11 @@ use crate::net::{
     do_shutdown, do_socket, do_socketpair, mmsghdr, msghdr, msghdr_mut,
 };
 use crate::process::{
-    do_arch_prctl, do_clone, do_execve, do_exit, do_exit_group, do_futex, do_getegid, do_geteuid,
-    do_getgid, do_getgroups, do_getpgid, do_getpid, do_getppid, do_gettid, do_getuid, do_prctl,
-    do_set_tid_address, do_spawn_for_glibc, do_spawn_for_musl, do_wait4, pid_t, posix_spawnattr_t,
-    FdOp, SpawnFileActions, ThreadStatus,
+    do_arch_prctl, do_clone, do_execve, do_exit, do_exit_group, do_futex, do_get_robust_list,
+    do_getegid, do_geteuid, do_getgid, do_getgroups, do_getpgid, do_getpid, do_getppid, do_gettid,
+    do_getuid, do_prctl, do_set_robust_list, do_set_tid_address, do_spawn_for_glibc,
+    do_spawn_for_musl, do_wait4, pid_t, posix_spawnattr_t, FdOp, RobustListHead, SpawnFileActions,
+    ThreadStatus,
 };
 use crate::sched::{do_getcpu, do_sched_getaffinity, do_sched_setaffinity, do_sched_yield};
 use crate::signal::{
@@ -359,8 +360,8 @@ macro_rules! process_syscall_table_with_callback {
         (Pselect6 = 270) => handle_unsupported(),
         (Ppoll = 271) => handle_unsupported(),
         (Unshare = 272) => handle_unsupported(),
-        (SetRobustList = 273) => handle_unsupported(),
-        (GetRobustList = 274) => handle_unsupported(),
+        (SetRobustList = 273) => do_set_robust_list(list_head_ptr: *mut RobustListHead, len: usize),
+        (GetRobustList = 274) => do_get_robust_list(tid: pid_t, list_head_ptr_ptr: *mut *mut RobustListHead, len_ptr: *mut usize),
         (Splice = 275) => handle_unsupported(),
         (Tee = 276) => handle_unsupported(),
         (SyncFileRange = 277) => handle_unsupported(),
diff --git a/test/pthread/main.c b/test/pthread/main.c
index 1b3937f3..8eb63617 100644
--- a/test/pthread/main.c
+++ b/test/pthread/main.c
@@ -84,6 +84,83 @@ static int test_mutex_with_concurrent_counter(void) {
     return 0;
 }
 
+// ============================================================================
+// The test case of robust mutex
+// ============================================================================
+
+struct thread_robust_arg {
+    int ti;
+    volatile int *global_count;
+    pthread_mutex_t *mutex;
+};
+
+int ret_err = -1;
+
+static void *thread_worker(void *_arg) {
+    struct thread_robust_arg *arg = _arg;
+    int err = pthread_mutex_lock(arg->mutex);
+    if (err == EOWNERDEAD) {
+        // The thread now holds the lock, but the mutex state is marked as
+        // inconsistent because its previous owner died while holding it; the
+        // thread should call pthread_mutex_consistent() to make the mutex
+        // consistent again.
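+        // Otherwise, if the mutex were unlocked without being made
+        // consistent first, it would become permanently unusable and all
+        // further lock attempts would fail with ENOTRECOVERABLE.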
+        if (pthread_mutex_consistent(arg->mutex) != 0) {
+            printf("ERROR: failed to recover the mutex\n");
+            return &ret_err;
+        }
+    } else if (err != 0) {
+        printf("ERROR: failed to lock the mutex with error: %d\n", err);
+        return &ret_err;
+    }
+    // Mutex is locked
+    (*arg->global_count)++;
+    // Give other threads time to block on the lock
+    sleep(1);
+    // Exit without unlocking the mutex; this leaves the mutex in an
+    // inconsistent state.
+    return NULL;
+}
+
+static int test_robust_mutex_with_concurrent_counter(void) {
+    volatile int global_count = 0;
+    pthread_t threads[NTHREADS];
+    struct thread_robust_arg thread_args[NTHREADS];
+    // Init robust mutex
+    pthread_mutex_t mutex;
+    pthread_mutexattr_t attr;
+    pthread_mutexattr_init(&attr);
+    pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
+    pthread_mutex_init(&mutex, &attr);
+    // Start the threads
+    for (int ti = 0; ti < NTHREADS; ti++) {
+        struct thread_robust_arg *thread_arg = &thread_args[ti];
+        thread_arg->ti = ti;
+        thread_arg->global_count = &global_count;
+        thread_arg->mutex = &mutex;
+
+        if (pthread_create(&threads[ti], NULL, thread_worker, thread_arg) != 0) {
+            THROW_ERROR("pthread_create failed (ti = %d)", ti);
+        }
+    }
+    // Wait for the threads to finish
+    for (int ti = 0; ti < NTHREADS; ti++) {
+        int *ret_val;
+        if (pthread_join(threads[ti], (void **)&ret_val) != 0) {
+            THROW_ERROR("pthread_join failed (ti = %d)", ti);
+        }
+        if (ret_val && *ret_val != 0) {
+            THROW_ERROR("run thread failed (ti = %d) with return val: %d", ti, *ret_val);
+        }
+    }
+    // Check the result
+    if (global_count != NTHREADS) {
+        THROW_ERROR("incorrect global_count (actual = %d, expected = %d)", global_count,
+                    NTHREADS);
+    }
+
+    pthread_mutex_destroy(&mutex);
+    return 0;
+}
+
 // ============================================================================
 // The test case of waiting condition variable
 // ============================================================================
@@ -189,6 +266,7 @@ static int test_mutex_timedlock() {
 
 static test_case_t test_cases[] = {
     TEST_CASE(test_mutex_with_concurrent_counter),
+    TEST_CASE(test_robust_mutex_with_concurrent_counter),
     TEST_CASE(test_mutex_with_cond_wait),
     TEST_CASE(test_mutex_timedlock),
 };
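Note: the robust-list ABI that this patch implements inside the LibOS can be
exercised directly from user space. The sketch below is illustrative only and
not part of the patch; it assumes Linux headers that define struct
robust_list_head and the SYS_set_robust_list/SYS_get_robust_list numbers, and
libc normally performs this registration itself for every thread. It registers
an empty robust list and reads the registration back for the current thread:

    /* robust_list_demo.c -- illustrative sketch, not part of the patch */
    #include <linux/futex.h>   /* struct robust_list_head */
    #include <stddef.h>
    #include <stdio.h>
    #include <sys/syscall.h>
    #include <unistd.h>

    int main(void) {
        /* An empty robust list: the head entry points back to itself. */
        static struct robust_list_head head;
        head.list.next = &head.list;
        head.futex_offset = 0;
        head.list_op_pending = NULL;

        /* Register the list head for the calling thread. */
        if (syscall(SYS_set_robust_list, &head, sizeof(head)) != 0) {
            perror("set_robust_list");
            return 1;
        }

        /* Read the registration back; a tid of 0 means the current thread. */
        struct robust_list_head *got = NULL;
        size_t len = 0;
        if (syscall(SYS_get_robust_list, 0, &got, &len) != 0) {
            perror("get_robust_list");
            return 1;
        }
        printf("robust list head = %p, len = %zu\n", (void *)got, len);
        return 0;
    }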