From 9b17ac1847322c26f2749ee5d1c4f4e901940a18 Mon Sep 17 00:00:00 2001 From: He Sun Date: Thu, 23 Jul 2020 14:44:35 +0800 Subject: [PATCH] Improve futex performance 1. Enlarge the size of the futex buckets; 2. Wake up the waiting threads in one ocall. --- src/libos/src/process/do_futex.rs | 90 +++++++++++++++++++++++-------- src/libos/src/sched/cpu_set.rs | 2 +- src/libos/src/sched/mod.rs | 1 + 3 files changed, 71 insertions(+), 22 deletions(-) diff --git a/src/libos/src/process/do_futex.rs b/src/libos/src/process/do_futex.rs index 93c19083..7fc6c475 100644 --- a/src/libos/src/process/do_futex.rs +++ b/src/libos/src/process/do_futex.rs @@ -187,12 +187,11 @@ pub fn futex_requeue( Ok(nwakes) } -// Make sure futex bucket count is the power of 2 -const BUCKET_COUNT: usize = 1 << 8; -const BUCKET_MASK: usize = BUCKET_COUNT - 1; - lazy_static! { - static ref FUTEX_BUCKETS: FutexBucketVec = { FutexBucketVec::new(BUCKET_COUNT) }; + // Use the same count as linux kernel to keep the same performance + static ref BUCKET_COUNT: usize = ((1 << 8) * (*crate::sched::NCORES)).next_power_of_two(); + static ref BUCKET_MASK: usize = *BUCKET_COUNT - 1; + static ref FUTEX_BUCKETS: FutexBucketVec = { FutexBucketVec::new(*BUCKET_COUNT) }; } #[derive(PartialEq, Copy, Clone)] @@ -227,7 +226,7 @@ impl FutexItem { } pub fn wake(&self) { - self.waiter.wake() + self.waiter().wake() } pub fn wait(&self, timeout: &Option) -> Result<()> { @@ -239,6 +238,15 @@ impl FutexItem { } Ok(()) } + + pub fn waiter(&self) -> &WaiterRef { + &self.waiter + } + + pub fn batch_wake(items: &[FutexItem]) { + let waiters: Vec<&WaiterRef> = items.iter().map(|item| item.waiter()).collect(); + Waiter::batch_wake(&waiters); + } } struct FutexBucket { @@ -266,19 +274,23 @@ impl FutexBucket { self.queue.swap_remove_back(item_i.unwrap()) } + // TODO: consider using std::future to improve the readability pub fn dequeue_and_wake_items(&mut self, key: FutexKey, max_count: usize) -> usize { let mut count = 0; let mut idx = 0; + let mut items_to_wake = Vec::new(); while count < max_count && idx < self.queue.len() { if key == self.queue[idx].key { if let Some(item) = self.queue.swap_remove_back(idx) { - item.wake(); + items_to_wake.push(item); count += 1; } } else { idx += 1; } } + + FutexItem::batch_wake(&items_to_wake); count } @@ -335,7 +347,7 @@ impl FutexBucketVec { } pub fn get_bucket(&self, key: FutexKey) -> (usize, FutexBucketRef) { - let idx = BUCKET_MASK & { + let idx = *BUCKET_MASK & { // The addr is the multiples of 4, so we ignore the last 2 bits let addr = key.addr() >> 2; let mut s = DefaultHasher::new(); @@ -377,10 +389,35 @@ impl Waiter { } pub fn wake(&self) { - if self.is_woken.fetch_or(true, Ordering::SeqCst) == false { - set_event(self.thread); + if self.is_woken().fetch_or(true, Ordering::SeqCst) == false { + set_events(&[self.thread]) } } + + pub fn thread(&self) -> *const c_void { + self.thread + } + + pub fn is_woken(&self) -> &AtomicBool { + &self.is_woken + } + + pub fn batch_wake(waiters: &[&WaiterRef]) { + let threads: Vec<*const c_void> = waiters + .iter() + .filter_map(|waiter| { + // Only wake up items that are not woken. + // Set the item to be woken if it is not woken. + if waiter.is_woken().fetch_or(true, Ordering::SeqCst) == false { + Some(waiter.thread()) + } else { + None + } + }) + .collect(); + + set_events(&threads); + } } impl PartialEq for Waiter { @@ -425,15 +462,24 @@ fn wait_event_timeout(thread: *const c_void, timeout: &Option) -> Re Ok(()) } -fn set_event(thread: *const c_void) { +fn set_events(threads: &[*const c_void]) { + if threads.is_empty() { + return; + } + let mut ret: c_int = 0; - let mut sgx_ret: c_int = 0; - unsafe { - sgx_ret = sgx_thread_set_untrusted_event_ocall(&mut ret as *mut c_int, thread); - } - if ret != 0 || sgx_ret != 0 { - panic!("ERROR: sgx_thread_set_untrusted_event_ocall failed"); - } + let sgx_ret = unsafe { + sgx_thread_set_multiple_untrusted_events_ocall( + &mut ret as *mut c_int, + threads.as_ptr(), + threads.len(), + ) + }; + + assert!( + ret == 0 && sgx_ret == 0, + "ERROR: sgx_thread_set_multiple_untrusted_events_ocall failed" + ); } extern "C" { @@ -446,7 +492,9 @@ extern "C" { errno: *mut c_int, ) -> c_int; - /* Wake a thread waiting on its untrusted event */ - fn sgx_thread_set_untrusted_event_ocall(ret: *mut c_int, waiter_thread: *const c_void) - -> c_int; + fn sgx_thread_set_multiple_untrusted_events_ocall( + ret: *mut c_int, + waiters: *const *const c_void, + total: usize, + ) -> c_int; } diff --git a/src/libos/src/sched/cpu_set.rs b/src/libos/src/sched/cpu_set.rs index 9a9ffcc4..6314928a 100644 --- a/src/libos/src/sched/cpu_set.rs +++ b/src/libos/src/sched/cpu_set.rs @@ -117,7 +117,7 @@ impl Index for CpuSet { lazy_static! { /// The number of all CPU cores on the platform - static ref NCORES: usize = { + pub static ref NCORES: usize = { extern "C" { fn occlum_ocall_ncores(ret: *mut i32) -> sgx_status_t; } diff --git a/src/libos/src/sched/mod.rs b/src/libos/src/sched/mod.rs index 7abd2970..466a7cec 100644 --- a/src/libos/src/sched/mod.rs +++ b/src/libos/src/sched/mod.rs @@ -5,5 +5,6 @@ mod do_sched_yield; mod sched_agent; mod syscalls; +pub use cpu_set::NCORES; pub use sched_agent::SchedAgent; pub use syscalls::*;