Improve futex performance

1. Increase the number of futex buckets, scaling it with the number of CPU cores;
2. Wake up all the waiting threads in a single ocall.
This commit is contained in:
He Sun 2020-07-23 14:44:35 +08:00
parent 2400cc4baa
commit 9b17ac1847
3 changed files with 71 additions and 22 deletions

@ -187,12 +187,11 @@ pub fn futex_requeue(
Ok(nwakes) Ok(nwakes)
} }
// Make sure futex bucket count is the power of 2
const BUCKET_COUNT: usize = 1 << 8;
const BUCKET_MASK: usize = BUCKET_COUNT - 1;
lazy_static! { lazy_static! {
static ref FUTEX_BUCKETS: FutexBucketVec = { FutexBucketVec::new(BUCKET_COUNT) }; // Use the same count as linux kernel to keep the same performance
static ref BUCKET_COUNT: usize = ((1 << 8) * (*crate::sched::NCORES)).next_power_of_two();
static ref BUCKET_MASK: usize = *BUCKET_COUNT - 1;
static ref FUTEX_BUCKETS: FutexBucketVec = { FutexBucketVec::new(*BUCKET_COUNT) };
} }
#[derive(PartialEq, Copy, Clone)] #[derive(PartialEq, Copy, Clone)]
@ -227,7 +226,7 @@ impl FutexItem {
} }
pub fn wake(&self) { pub fn wake(&self) {
self.waiter.wake() self.waiter().wake()
} }
pub fn wait(&self, timeout: &Option<timespec_t>) -> Result<()> { pub fn wait(&self, timeout: &Option<timespec_t>) -> Result<()> {
@ -239,6 +238,15 @@ impl FutexItem {
} }
Ok(()) Ok(())
} }
/// Returns a reference to the waiter held by this futex item.
pub fn waiter(&self) -> &WaiterRef {
&self.waiter
}
/// Wakes the waiters of all the given futex items together.
///
/// Collects a reference to each item's waiter and hands the whole list to
/// `Waiter::batch_wake` in one call instead of waking the items one by one.
pub fn batch_wake(items: &[FutexItem]) {
let waiters: Vec<&WaiterRef> = items.iter().map(|item| item.waiter()).collect();
Waiter::batch_wake(&waiters);
}
} }
struct FutexBucket { struct FutexBucket {
@ -266,19 +274,23 @@ impl FutexBucket {
self.queue.swap_remove_back(item_i.unwrap()) self.queue.swap_remove_back(item_i.unwrap())
} }
// TODO: consider using std::future to improve the readability
pub fn dequeue_and_wake_items(&mut self, key: FutexKey, max_count: usize) -> usize { pub fn dequeue_and_wake_items(&mut self, key: FutexKey, max_count: usize) -> usize {
let mut count = 0; let mut count = 0;
let mut idx = 0; let mut idx = 0;
let mut items_to_wake = Vec::new();
while count < max_count && idx < self.queue.len() { while count < max_count && idx < self.queue.len() {
if key == self.queue[idx].key { if key == self.queue[idx].key {
if let Some(item) = self.queue.swap_remove_back(idx) { if let Some(item) = self.queue.swap_remove_back(idx) {
item.wake(); items_to_wake.push(item);
count += 1; count += 1;
} }
} else { } else {
idx += 1; idx += 1;
} }
} }
FutexItem::batch_wake(&items_to_wake);
count count
} }
@ -335,7 +347,7 @@ impl FutexBucketVec {
} }
pub fn get_bucket(&self, key: FutexKey) -> (usize, FutexBucketRef) { pub fn get_bucket(&self, key: FutexKey) -> (usize, FutexBucketRef) {
let idx = BUCKET_MASK & { let idx = *BUCKET_MASK & {
// The addr is a multiple of 4, so we ignore the last 2 bits // The addr is a multiple of 4, so we ignore the last 2 bits
let addr = key.addr() >> 2; let addr = key.addr() >> 2;
let mut s = DefaultHasher::new(); let mut s = DefaultHasher::new();
@ -377,10 +389,35 @@ impl Waiter {
} }
pub fn wake(&self) { pub fn wake(&self) {
if self.is_woken.fetch_or(true, Ordering::SeqCst) == false { if self.is_woken().fetch_or(true, Ordering::SeqCst) == false {
set_event(self.thread); set_events(&[self.thread])
} }
} }
/// Returns the raw thread pointer stored in this waiter.
pub fn thread(&self) -> *const c_void {
self.thread
}
/// Returns a reference to the atomic flag recording whether this waiter
/// has already been woken.
pub fn is_woken(&self) -> &AtomicBool {
&self.is_woken
}
/// Wakes a group of waiters with a single call to `set_events`.
///
/// Every waiter's `is_woken` flag is atomically set to `true`. Only the
/// waiters whose flag was previously `false` contribute their thread
/// pointer to the list passed to `set_events`, so a waiter that has
/// already been woken is not signalled again.
pub fn batch_wake(waiters: &[&WaiterRef]) {
let threads: Vec<*const c_void> = waiters
.iter()
.filter_map(|waiter| {
// `fetch_or(true, ..)` marks the waiter as woken and returns the
// previous flag value in one atomic step, so only the caller that
// observes `false` gets to signal this waiter's thread.
if waiter.is_woken().fetch_or(true, Ordering::SeqCst) == false {
Some(waiter.thread())
} else {
None
}
})
.collect();
// Signal all the collected threads in one call.
set_events(&threads);
}
} }
impl PartialEq for Waiter { impl PartialEq for Waiter {
@ -425,15 +462,24 @@ fn wait_event_timeout(thread: *const c_void, timeout: &Option<timespec_t>) -> Re
Ok(()) Ok(())
} }
fn set_event(thread: *const c_void) { fn set_events(threads: &[*const c_void]) {
if threads.is_empty() {
return;
}
let mut ret: c_int = 0; let mut ret: c_int = 0;
let mut sgx_ret: c_int = 0; let sgx_ret = unsafe {
unsafe { sgx_thread_set_multiple_untrusted_events_ocall(
sgx_ret = sgx_thread_set_untrusted_event_ocall(&mut ret as *mut c_int, thread); &mut ret as *mut c_int,
} threads.as_ptr(),
if ret != 0 || sgx_ret != 0 { threads.len(),
panic!("ERROR: sgx_thread_set_untrusted_event_ocall failed"); )
} };
assert!(
ret == 0 && sgx_ret == 0,
"ERROR: sgx_thread_set_multiple_untrusted_events_ocall failed"
);
} }
extern "C" { extern "C" {
@ -446,7 +492,9 @@ extern "C" {
errno: *mut c_int, errno: *mut c_int,
) -> c_int; ) -> c_int;
/* Wake a thread waiting on its untrusted event */ fn sgx_thread_set_multiple_untrusted_events_ocall(
fn sgx_thread_set_untrusted_event_ocall(ret: *mut c_int, waiter_thread: *const c_void) ret: *mut c_int,
-> c_int; waiters: *const *const c_void,
total: usize,
) -> c_int;
} }

@ -117,7 +117,7 @@ impl Index<usize> for CpuSet {
lazy_static! { lazy_static! {
/// The number of all CPU cores on the platform /// The number of all CPU cores on the platform
static ref NCORES: usize = { pub static ref NCORES: usize = {
extern "C" { extern "C" {
fn occlum_ocall_ncores(ret: *mut i32) -> sgx_status_t; fn occlum_ocall_ncores(ret: *mut i32) -> sgx_status_t;
} }

@ -5,5 +5,6 @@ mod do_sched_yield;
mod sched_agent; mod sched_agent;
mod syscalls; mod syscalls;
pub use cpu_set::NCORES;
pub use sched_agent::SchedAgent; pub use sched_agent::SchedAgent;
pub use syscalls::*; pub use syscalls::*;