Add timeout support for futex wait

This commit is contained in:
LI Qing 2020-03-14 22:14:26 +08:00 committed by tate.thl
parent 96876b2935
commit eff91daac9
4 changed files with 140 additions and 10 deletions

@ -3,6 +3,7 @@ use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::intrinsics::atomic_load;
use std::sync::atomic::{AtomicBool, Ordering};
use time::timespec_t;
/// `FutexOp`, `FutexFlags`, and `futex_op_and_flags_from_u32` are helper types and
/// functions for handling the versatile commands and arguments of futex system
@ -68,7 +69,15 @@ pub fn futex_op_and_flags_from_u32(bits: u32) -> Result<(FutexOp, FutexFlags)> {
}
/// Do futex wait
pub fn futex_wait(futex_addr: *const i32, futex_val: i32) -> Result<()> {
pub fn futex_wait(
futex_addr: *const i32,
futex_val: i32,
timeout: &Option<timespec_t>,
) -> Result<()> {
info!(
"futex_wait addr: {:#x}, val: {}, timeout: {:?}",
futex_addr as usize, futex_val, timeout
);
// Get and lock the futex bucket
let futex_key = FutexKey::new(futex_addr);
let (_, futex_bucket_ref) = FUTEX_BUCKETS.get_bucket(futex_key);
@ -76,7 +85,7 @@ pub fn futex_wait(futex_addr: *const i32, futex_val: i32) -> Result<()> {
// Check the futex value
if futex_key.load_val() != futex_val {
return_errno!(EAGAIN, "futex value does not match")
return_errno!(EAGAIN, "futex value does not match");
}
// Why we first lock the bucket then check the futex value?
//
@ -115,7 +124,7 @@ pub fn futex_wait(futex_addr: *const i32, futex_val: i32) -> Result<()> {
// Must make sure that no locks are holded by this thread before wait
drop(futex_bucket);
futex_item.wait()
futex_item.wait_timeout(timeout)
}
/// Do futex wake
@ -202,7 +211,7 @@ impl FutexKey {
}
}
#[derive(Clone)]
#[derive(Clone, PartialEq)]
struct FutexItem {
key: FutexKey,
waiter: WaiterRef,
@ -220,8 +229,19 @@ impl FutexItem {
self.waiter.wake()
}
pub fn wait(&self) -> Result<()> {
self.waiter.wait()
pub fn wait_timeout(&self, timeout: &Option<timespec_t>) -> Result<()> {
match timeout {
None => self.waiter.wait(),
Some(ts) => {
if let Err(e) = self.waiter.wait_timeout(&ts) {
let (_, futex_bucket_ref) = FUTEX_BUCKETS.get_bucket(self.key);
let mut futex_bucket = futex_bucket_ref.lock().unwrap();
futex_bucket.dequeue_item(self);
return_errno!(e.errno(), "futex wait with timeout error");
}
Ok(())
}
}
}
}
@ -242,6 +262,14 @@ impl FutexBucket {
self.queue.push_back(item);
}
pub fn dequeue_item(&mut self, futex_item: &FutexItem) -> Option<FutexItem> {
let item_i = self.queue.iter().position(|item| *item == *futex_item);
if item_i.is_none() {
return None;
}
self.queue.swap_remove_back(item_i.unwrap())
}
pub fn dequeue_and_wake_items(&mut self, key: FutexKey, max_count: usize) -> usize {
let mut count = 0;
let mut idx = 0;
@ -349,6 +377,22 @@ impl Waiter {
Ok(())
}
pub fn wait_timeout(&self, timeout: &timespec_t) -> Result<()> {
let current = unsafe { sgx_thread_get_self() };
if current != self.thread {
return Ok(());
}
while self.is_woken.load(Ordering::SeqCst) == false {
if let Err(e) = wait_event_timeout(self.thread, timeout) {
self.is_woken.store(true, Ordering::SeqCst);
// Do sanity check here, only possible errnos here are ETIMEDOUT, EAGAIN and EINTR
debug_assert!(e.errno() == ETIMEDOUT || e.errno() == EAGAIN || e.errno() == EINTR);
return_errno!(e.errno(), "wait_timeout error");
}
}
Ok(())
}
pub fn wake(&self) {
if self.is_woken.fetch_or(true, Ordering::SeqCst) == false {
set_event(self.thread);
@ -356,6 +400,12 @@ impl Waiter {
}
}
impl PartialEq for Waiter {
fn eq(&self, other: &Self) -> bool {
self.thread == other.thread
}
}
unsafe impl Send for Waiter {}
unsafe impl Sync for Waiter {}
@ -370,6 +420,31 @@ fn wait_event(thread: *const c_void) {
}
}
fn wait_event_timeout(thread: *const c_void, timeout: &timespec_t) -> Result<()> {
let mut ret: c_int = 0;
let mut sgx_ret: c_int = 0;
let mut errno: c_int = 0;
unsafe {
sgx_ret = sgx_thread_wait_untrusted_event_timeout_ocall(
&mut ret as *mut c_int,
thread,
timeout.sec(),
timeout.nsec(),
&mut errno as *mut c_int,
);
}
if ret != 0 || sgx_ret != 0 {
panic!("ERROR: sgx_thread_wait_untrusted_event_timeout_ocall failed");
}
if errno != 0 {
return_errno!(
Errno::from(errno as u32),
"sgx_thread_wait_untrusted_event_timeout_ocall error"
);
}
Ok(())
}
fn set_event(thread: *const c_void) {
let mut ret: c_int = 0;
let mut sgx_ret: c_int = 0;
@ -387,6 +462,14 @@ extern "C" {
/* Go outside and wait on my untrusted event */
fn sgx_thread_wait_untrusted_event_ocall(ret: *mut c_int, self_thread: *const c_void) -> c_int;
fn sgx_thread_wait_untrusted_event_timeout_ocall(
ret: *mut c_int,
self_thread: *const c_void,
sec: c_long,
nsec: c_long,
errno: *mut c_int,
) -> c_int;
/* Wake a thread waiting on its untrusted event */
fn sgx_thread_set_untrusted_event_ocall(ret: *mut c_int, waiter_thread: *const c_void)
-> c_int;

@ -183,7 +183,7 @@ pub extern "C" fn dispatch_syscall(
arg0 as *const i32,
arg1 as u32,
arg2 as i32,
arg3 as i32,
arg3 as u64,
arg4 as *const i32,
// Todo: accept other optional arguments
),
@ -460,7 +460,7 @@ pub fn do_futex(
futex_addr: *const i32,
futex_op: u32,
futex_val: i32,
timeout: i32,
timeout: u64,
futex_new_addr: *const i32,
) -> Result<isize> {
check_ptr(futex_addr)?;
@ -474,7 +474,22 @@ pub fn do_futex(
};
match futex_op {
FutexOp::FUTEX_WAIT => process::futex_wait(futex_addr, futex_val).map(|_| 0),
FutexOp::FUTEX_WAIT => {
let timeout = {
let timeout = timeout as *const timespec_t;
if timeout.is_null() {
None
} else {
let ts = timespec_t::from_raw_ptr(timeout)?;
ts.validate()?;
if futex_flags.contains(FutexFlags::FUTEX_CLOCK_REALTIME) {
warn!("CLOCK_REALTIME is not supported yet, use monotonic clock");
}
Some(ts)
}
};
process::futex_wait(futex_addr, futex_val, &timeout).map(|_| 0)
}
FutexOp::FUTEX_WAKE => {
let max_count = get_futex_val(futex_val)?;
process::futex_wake(futex_addr, max_count).map(|count| count as isize)
@ -482,7 +497,7 @@ pub fn do_futex(
FutexOp::FUTEX_REQUEUE => {
check_ptr(futex_new_addr)?;
let max_nwakes = get_futex_val(futex_val)?;
let max_nrequeues = get_futex_val(timeout)?;
let max_nrequeues = get_futex_val(timeout as i32)?;
process::futex_requeue(futex_addr, max_nwakes, max_nrequeues, futex_new_addr)
.map(|nwakes| nwakes as isize)
}

@ -75,6 +75,14 @@ impl timespec_t {
}
}
pub fn sec(&self) -> time_t {
self.sec
}
pub fn nsec(&self) -> i64 {
self.nsec
}
pub fn as_duration(&self) -> Duration {
Duration::new(self.sec as u64, self.nsec as u32)
}

@ -1,6 +1,7 @@
#include <sys/types.h>
#include <pthread.h>
#include <stdio.h>
#include <errno.h>
#include "test.h"
// ============================================================================
@ -162,6 +163,28 @@ static int test_mutex_with_cond_wait(void) {
return 0;
}
// ============================================================================
// The test case of timed lock
// ============================================================================
static int test_mutex_timedlock() {
int err;
struct timespec ts;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&lock);
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += 1;
/*
* This will cause a deadlock, a timeout error will return
*/
err = pthread_mutex_timedlock(&lock, &ts);
if (err != ETIMEDOUT) {
THROW_ERROR("mutex timed lock failed");
}
return 0;
}
// ============================================================================
// Test suite main
// ============================================================================
@ -169,6 +192,7 @@ static int test_mutex_with_cond_wait(void) {
static test_case_t test_cases[] = {
TEST_CASE(test_mutex_with_concurrent_counter),
TEST_CASE(test_mutex_with_cond_wait),
TEST_CASE(test_mutex_timedlock),
};
int main() {