From 44eb5ca3fe124def4ef9dc7668db1869c5cd184e Mon Sep 17 00:00:00 2001 From: ClawSeven Date: Mon, 29 Apr 2024 18:07:02 +0800 Subject: [PATCH] [libos] Implement edge/level triggering waiter and poller --- src/libos/src/events/mod.rs | 4 +- src/libos/src/events/poller.rs | 210 +++++++++++++++++++ src/libos/src/events/waiter.rs | 212 -------------------- src/libos/src/events/waiter/edge.rs | 85 ++++++++ src/libos/src/events/waiter/level.rs | 68 +++++++ src/libos/src/events/waiter/mod.rs | 113 +++++++++++ src/libos/src/events/waiter/synchronizer.rs | 31 +++ src/libos/src/events/waiter_queue.rs | 15 +- 8 files changed, 517 insertions(+), 221 deletions(-) create mode 100644 src/libos/src/events/poller.rs delete mode 100644 src/libos/src/events/waiter.rs create mode 100644 src/libos/src/events/waiter/edge.rs create mode 100644 src/libos/src/events/waiter/level.rs create mode 100644 src/libos/src/events/waiter/mod.rs create mode 100644 src/libos/src/events/waiter/synchronizer.rs diff --git a/src/libos/src/events/mod.rs b/src/libos/src/events/mod.rs index 099ca50c..9144d4df 100644 --- a/src/libos/src/events/mod.rs +++ b/src/libos/src/events/mod.rs @@ -19,6 +19,7 @@ mod event; mod host_event_fd; mod notifier; mod observer; +mod poller; mod waiter; mod waiter_queue; mod waiter_queue_observer; @@ -27,6 +28,7 @@ pub use self::event::{Event, EventFilter}; pub use self::host_event_fd::HostEventFd; pub use self::notifier::Notifier; pub use self::observer::Observer; -pub use self::waiter::{Waiter, Waker}; +pub use self::poller::{Pollee, Poller}; +pub use self::waiter::{EdgeSync, LevelSync, Synchronizer, Waiter, Waker}; pub use self::waiter_queue::WaiterQueue; pub use self::waiter_queue_observer::WaiterQueueObserver; diff --git a/src/libos/src/events/poller.rs b/src/libos/src/events/poller.rs new file mode 100644 index 00000000..8e3622ad --- /dev/null +++ b/src/libos/src/events/poller.rs @@ -0,0 +1,210 @@ +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Weak; +use std::time::Duration; + +use super::{EdgeSync, Notifier, Observer, Waiter}; +use crate::fs::{IoEvents, IoNotifier}; +use crate::prelude::*; + +/// A pollee maintains a set of active events, which can be polled with +/// pollers or be monitored with observers. +pub struct Pollee { + inner: Arc, +} + +struct PolleeInner { + // A table that maintains all interesting pollers + pollers: IoNotifier, + // For efficient manipulation, we use AtomicU32 instead of Atomic + events: AtomicU32, +} + +impl Pollee { + /// Creates a new instance of pollee. + pub fn new(init_events: IoEvents) -> Self { + let inner = PolleeInner { + pollers: Notifier::new(), + events: AtomicU32::new(init_events.bits()), + }; + Self { + inner: Arc::new(inner), + } + } + + pub fn notifier(&self) -> &IoNotifier { + &self.inner.pollers + } + + /// Returns the current events of the pollee given an event mask. + /// + /// If no interesting events are polled and a poller is provided, then + /// the poller will start monitoring the pollee and receive event + /// notification once the pollee gets any interesting events. + /// + /// This operation is _atomic_ in the sense that either some interesting + /// events are returned or the poller is registered (if a poller is provided). + pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { + let mask = mask | IoEvents::ALWAYS_POLL; + + // Fast path: return events immediately + if poller.is_none() { + let revents = self.events() & mask; + return revents; + } + + // Slow path: connect the pollee with the poller + self.connect_poller(mask, poller.unwrap()); + + // It is important to check events again to handle race conditions + self.events() & mask + } + + pub fn connect_poller(&self, mask: IoEvents, poller: &Poller) { + self.register_observer(poller.observer(), mask); + + let mut pollees = poller.inner.pollees.lock(); + pollees.push(Arc::downgrade(&self.inner).into()); + } + + /// Add some events to the pollee's state. + /// + /// This method wakes up all registered pollers that are interested in + /// the added events. + pub fn add_events(&self, events: IoEvents) { + self.inner.events.fetch_or(events.bits(), Ordering::Release); + self.inner.pollers.broadcast(&events); + } + + /// Remove some events from the pollee's state. + /// + /// This method will not wake up registered pollers even when + /// the pollee still has some interesting events to the pollers. + pub fn del_events(&self, events: IoEvents) { + self.inner + .events + .fetch_and(!events.bits(), Ordering::Release); + } + + /// Reset the pollee's state. + /// + /// Reset means removing all events on the pollee. + pub fn reset_events(&self) { + self.inner + .events + .fetch_and(!IoEvents::all().bits(), Ordering::Release); + } + + /// Register an event observer. + /// + /// A registered observer will get notified (through its `on_events` method) + /// every time new events specified by the `masks` argument happen on the + /// pollee (through the `add_events` method). + /// + /// If the given observer has already been registered, then its registered + /// event mask will be updated. + /// + /// Note that the observer will always get notified of the events in + /// `Events::ALWAYS_POLL` regardless of the value of `masks`. + /// + /// # Memory leakage + /// + /// Since an `Arc` for each observer is kept internally by a pollee, + /// it is important for the user to call the `unregister_observer` method + /// when the observer is no longer interested in the pollee. Otherwise, + /// the observer will not be dropped. + pub fn register_observer(&self, observer: Weak>, mask: IoEvents) { + let mask = mask | IoEvents::ALWAYS_POLL; + self.inner.pollers.register(observer, Some(mask), None) + } + + /// Unregister an event observer. + /// + /// If such an observer is found, then the registered observer will be + /// removed from the pollee and returned as the return value. Otherwise, + /// a `None` will be returned. + pub fn unregister_observer(&self, observer: &Weak>) { + self.inner.pollers.unregister(observer) + } + + fn events(&self) -> IoEvents { + let event_bits = self.inner.events.load(Ordering::Relaxed); + unsafe { IoEvents::from_bits_unchecked(event_bits) } + } +} + +impl std::fmt::Debug for Pollee { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Pollee") + .field("events", &self.events()) + .field("pollers", &"..") + .finish() + } +} + +/// A poller gets notified when its associated pollees have interesting events. +pub struct Poller { + inner: Arc, +} + +struct PollerInner { + // Use event counter to wait or wake up a poller + waiter: Waiter, + // All pollees that are interesting to this poller + pollees: Mutex>>, +} + +unsafe impl Send for PollerInner {} +unsafe impl Sync for PollerInner {} + +impl Poller { + /// Constructs a new `Poller`. + pub fn new() -> Self { + let inner = PollerInner { + waiter: Waiter::::new(), + pollees: Mutex::new(Vec::new()), + }; + Self { + inner: Arc::new(inner), + } + } + + /// Wait until there are any interesting events happen since last `wait`. + pub fn wait(&self) -> Result<()> { + self.inner.waiter.wait(None) + } + + /// Wait until there are any interesting events happen since last `wait`, or reach timeout. + pub fn wait_timeout(&self, timeout: Option<&mut Duration>) -> Result<()> { + self.inner.waiter.wait_mut(timeout) + } + + pub fn observer(&self) -> Weak> { + Arc::downgrade(&self.inner) as _ + } +} + +impl Observer for PollerInner { + fn on_event( + &self, + _event: &IoEvents, + _metadata: &Option>, + ) -> () { + self.waiter.waker().wake(); + } +} + +impl Drop for Poller { + fn drop(&mut self) { + let mut pollees = self.inner.pollees.lock(); + if pollees.len() == 0 { + return; + } + + let self_observer = self.observer(); + for weak_pollee in pollees.drain(..) { + if let Some(pollee) = weak_pollee.upgrade() { + pollee.pollers.unregister(&self_observer); + } + } + } +} diff --git a/src/libos/src/events/waiter.rs b/src/libos/src/events/waiter.rs deleted file mode 100644 index 8b9ff738..00000000 --- a/src/libos/src/events/waiter.rs +++ /dev/null @@ -1,212 +0,0 @@ -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Weak; -use std::time::Duration; - -use super::host_event_fd::HostEventFd; -use crate::prelude::*; - -/// A waiter enables a thread to sleep. -pub struct Waiter { - inner: Arc, -} - -impl Waiter { - /// Create a waiter for the current thread. - /// - /// A `Waiter` is bound to the curent thread that creates it: it cannot be - /// sent to or used by any other threads as the type implements `!Send` and - /// `!Sync` traits. Thus, a `Waiter` can only put the current thread to sleep. - pub fn new() -> Self { - Self { - inner: Arc::new(Inner::new()), - } - } - - /// Return whether a waiter has been waken up. - /// - /// Once a waiter is waken up, the `wait` or `wait_mut` method becomes - /// non-blocking. - pub fn is_woken(&self) -> bool { - self.inner.is_woken() - } - - /// Reset a waiter. - /// - /// After a `Waiter` being waken up, the `reset` method must be called so - /// that the `Waiter` can use the `wait` or `wait_mut` methods to sleep the - /// current thread again. - pub fn reset(&self) { - self.inner.reset(); - } - - /// Put the current thread to sleep until being waken up by a waker. - /// - /// The method has three possible return values: - /// 1. `Ok(())`: The `Waiter` has been waken up by one of its `Waker`; - /// 2. `Err(e) if e.errno() == Errno::ETIMEDOUT`: Timeout. - /// 3. `Err(e) if e.errno() == Errno::EINTR`: Interrupted by a signal. - /// - /// If the `timeout` argument is `None`, then the second case won't happen, - /// i.e., the method will block indefinitely. - pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> { - self.inner.wait(timeout) - } - - /// Put the current thread to sleep until being waken up by a waker. - /// - /// This method is similar to the `wait` method except that the `timeout` - /// argument will be updated to reflect the remaining timeout. - pub fn wait_mut(&self, timeout: Option<&mut Duration>) -> Result<()> { - self.inner.wait_mut(timeout) - } - - /// Create a waker that can wake up this waiter. - /// - /// `WaiterQueue` maintains a list of `Waker` internally to wake up the - /// enqueued `Waiter`s. So, for users that uses `WaiterQueue`, this method - /// does not need to be called manually. - pub fn waker(&self) -> Waker { - Waker { - inner: Arc::downgrade(&self.inner), - } - } - - /// Expose the internal host eventfd. - /// - /// This host eventfd should be used by an external user carefully. - pub fn host_eventfd(&self) -> &HostEventFd { - self.inner.host_eventfd() - } -} - -impl !Send for Waiter {} -impl !Sync for Waiter {} - -/// A waker can wake up the thread that its waiter has put to sleep. -pub struct Waker { - inner: Weak, -} - -impl Waker { - /// Wake up the waiter that creates this waker. - pub fn wake(&self) { - if let Some(inner) = self.inner.upgrade() { - inner.wake() - } - } - - /// Wake up waiters in batch, more efficient than waking up one-by-one. - pub fn batch_wake<'a, I: Iterator>(iter: I) { - Inner::batch_wake(iter); - } -} - -/// Instruction rearrangement about control dependency -/// -/// Such as the following code: -/// fn function(flag: bool, a: i32, b: i32) { -/// if flag { // 1 -/// let i = a * b; // 2 -/// } -/// } -/// -/// Guidelines for compilation optimization:without changing the single-threaded semantics -/// of the program, the execution order of statements can be rearranged. There is a control -/// dependency between flag and i. When the instruction is reordered, step 2 will write the -/// result value to the hardware cache, and when judged to be true, the result value will be -/// written to the variable i. Therefore, controlling dependency does not prevent compiler -/// optimizations -/// -/// Note about memory ordering: -/// Here is_woken needs to be synchronized with host_eventfd. The read operation of -/// is_woken needs to see the change of the host_eventfd field. Just `Acquire` or -/// `Release` needs to be used to make all the change of the host_eventfd visible to us. -/// -/// The ordering in CAS operations can be `Relaxed`, `Acquire`, `AcqRel` or `SeqCst`, -/// The key is to consider the specific usage scenario. Here fail does not synchronize other -/// variables in the CAS operation, which can use `Relaxed`, and the host_enent needs -/// to be synchronized in success, so `Acquire` needs to be used so that we can see all the -/// changes in the host_eventfd after that. -/// -/// Although it is correct to use AcqRel, here I think it is okay to use Acquire, because -/// you don't need to synchronize host_event before is_woken, only later. -struct Inner { - is_woken: AtomicBool, - host_eventfd: Arc, -} - -impl Inner { - pub fn new() -> Self { - let is_woken = AtomicBool::new(false); - let host_eventfd = current!().host_eventfd().clone(); - Self { - is_woken, - host_eventfd, - } - } - - pub fn is_woken(&self) -> bool { - self.is_woken.load(Ordering::Acquire) - } - - pub fn reset(&self) { - self.is_woken.store(false, Ordering::Release); - } - - pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> { - while !self.is_woken() { - self.host_eventfd.poll(timeout)?; - } - Ok(()) - } - - pub fn wait_mut(&self, timeout: Option<&mut Duration>) -> Result<()> { - let mut remain = timeout.as_ref().map(|d| **d); - - // Need to change timeout from `Option<&mut Duration>` to `&mut Option` - // so that the Rust compiler is happy about using the variable in a loop. - let ret = self.do_wait_mut(&mut remain); - - if let Some(timeout) = timeout { - *timeout = remain.unwrap(); - } - ret - } - - fn do_wait_mut(&self, remain: &mut Option) -> Result<()> { - while !self.is_woken() { - self.host_eventfd.poll_mut(remain.as_mut())?; - } - Ok(()) - } - - pub fn wake(&self) { - if self - .is_woken - .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) - .is_ok() - { - self.host_eventfd.write_u64(1); - } - } - - pub fn batch_wake<'a, I: Iterator>(iter: I) { - let host_eventfds = iter - .filter_map(|waker| waker.inner.upgrade()) - .filter(|inner| { - inner - .is_woken - .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) - .is_ok() - }) - .map(|inner| inner.host_eventfd.host_fd()) - .collect::>(); - unsafe { - HostEventFd::write_u64_raw_and_batch(&host_eventfds, 1); - } - } - - pub fn host_eventfd(&self) -> &HostEventFd { - &self.host_eventfd - } -} diff --git a/src/libos/src/events/waiter/edge.rs b/src/libos/src/events/waiter/edge.rs new file mode 100644 index 00000000..41e94bbd --- /dev/null +++ b/src/libos/src/events/waiter/edge.rs @@ -0,0 +1,85 @@ +use atomic::Ordering; +use std::{sync::atomic::AtomicU32, time::Duration}; + +use super::{HostEventFd, Synchronizer}; +use crate::prelude::*; + +const WAIT: u32 = u32::MAX; +const INIT: u32 = 0; +const NOTIFIED: u32 = 1; + +pub struct EdgeSync { + state: AtomicU32, + host_eventfd: Arc, +} + +impl Synchronizer for EdgeSync { + fn new() -> Self { + Self { + state: AtomicU32::new(INIT), + host_eventfd: current!().host_eventfd().clone(), + } + } + + fn wait(&self, timeout: Option<&Duration>) -> Result<()> { + if self.state.fetch_sub(1, Ordering::Acquire) == NOTIFIED { + return Ok(()); + } + loop { + self.host_eventfd.poll(timeout)?; + if self + .state + .compare_exchange(NOTIFIED, INIT, Ordering::Acquire, Ordering::Acquire) + .is_ok() + { + return Ok(()); + } else { + // Spurious wake up. We loop to try again. + } + } + } + + fn wait_mut(&self, timeout: Option<&mut Duration>) -> Result<()> { + if self.state.fetch_sub(1, Ordering::Acquire) == NOTIFIED { + return Ok(()); + } + let mut remain = timeout.as_ref().map(|d| **d); + // Need to change timeout from `Option<&mut Duration>` to `&mut Option` + // so that the Rust compiler is happy about using the variable in a loop. + let ret = self.host_eventfd.poll_mut(remain.as_mut()); + // Wait for something to happen, assuming it's still set to PARKED. + // futex_wait(&self.state, PARKED, Some(timeout)); + // This is not just a store, because we need to establish a + // release-acquire ordering with unpark(). + if self.state.swap(INIT, Ordering::Acquire) == NOTIFIED { + // Woke up because of unpark(). + } else { + // Timeout or spurious wake up. + // We return either way, because we can't easily tell if it was the + // timeout or not. + } + if let Some(timeout) = timeout { + *timeout = remain.unwrap(); + } + ret + } + + fn reset(&self) { + // do nothing for edge trigger + () + } + + fn wake(&self) { + if self.wake_cond() { + self.host_eventfd.write_u64(1); + } + } + + fn host_eventfd(&self) -> &HostEventFd { + &self.host_eventfd + } + + fn wake_cond(&self) -> bool { + self.state.swap(NOTIFIED, Ordering::Release) == WAIT + } +} diff --git a/src/libos/src/events/waiter/level.rs b/src/libos/src/events/waiter/level.rs new file mode 100644 index 00000000..8600f05d --- /dev/null +++ b/src/libos/src/events/waiter/level.rs @@ -0,0 +1,68 @@ +use atomic::Ordering; +use std::{sync::atomic::AtomicBool, time::Duration}; + +use super::{HostEventFd, Synchronizer}; +use crate::prelude::*; + +pub struct LevelSync { + is_woken: AtomicBool, + host_eventfd: Arc, +} + +impl Synchronizer for LevelSync { + fn new() -> Self { + Self { + is_woken: AtomicBool::new(false), + host_eventfd: current!().host_eventfd().clone(), + } + } + + fn reset(&self) { + self.is_woken.store(false, Ordering::Release); + } + + fn wait(&self, timeout: Option<&Duration>) -> Result<()> { + while !self.is_woken() { + self.host_eventfd.poll(timeout)?; + } + Ok(()) + } + + fn wait_mut(&self, timeout: Option<&mut Duration>) -> Result<()> { + let mut remain = timeout.as_ref().map(|d| **d); + // Need to change timeout from `Option<&mut Duration>` to `&mut Option` + // so that the Rust compiler is happy about using the variable in a loop. + + while !self.is_woken() { + self.host_eventfd.poll_mut(remain.as_mut())?; + } + + if let Some(timeout) = timeout { + *timeout = remain.unwrap(); + } + Ok(()) + } + + fn wake(&self) { + if self.wake_cond() { + self.host_eventfd.write_u64(1); + } + } + + fn wake_cond(&self) -> bool { + self.is_woken + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + } + + fn host_eventfd(&self) -> &HostEventFd { + &self.host_eventfd + } +} + +impl LevelSync { + #[inline(always)] + fn is_woken(&self) -> bool { + self.is_woken.load(Ordering::Acquire) + } +} diff --git a/src/libos/src/events/waiter/mod.rs b/src/libos/src/events/waiter/mod.rs new file mode 100644 index 00000000..2ad083a2 --- /dev/null +++ b/src/libos/src/events/waiter/mod.rs @@ -0,0 +1,113 @@ +mod edge; +mod level; +mod synchronizer; + +pub use self::edge::EdgeSync; +pub use self::level::LevelSync; +pub use self::synchronizer::Synchronizer; + +use super::HostEventFd; +use crate::prelude::*; +use std::{sync::Weak, time::Duration}; + +/// A waiter enables a thread to sleep. +pub struct Waiter +where + Sync: Synchronizer, +{ + inner: Arc, +} + +impl Waiter { + /// Create a waiter for the current thread. + /// + /// A `Waiter` is bound to the curent thread that creates it: it cannot be + /// sent to or used by any other threads as the type implements `!Send` and + /// `!Sync` traits. Thus, a `Waiter` can only put the current thread to sleep. + pub fn new() -> Self { + Self { + inner: Arc::new(Sync::new()), + } + } + + /// Reset a waiter. + /// + /// After a `Waiter` being waken up, the `reset` method must be called so + /// that the `Waiter` can use the `wait` or `wait_mut` methods to sleep the + /// current thread again. + pub fn reset(&self) { + self.inner.reset(); + } + + /// Put the current thread to sleep until being waken up by a waker. + /// + /// The method has three possible return values: + /// 1. `Ok(())`: The `Waiter` has been waken up by one of its `Waker`; + /// 2. `Err(e) if e.errno() == Errno::ETIMEDOUT`: Timeout. + /// 3. `Err(e) if e.errno() == Errno::EINTR`: Interrupted by a signal. + /// + /// If the `timeout` argument is `None`, then the second case won't happen, + /// i.e., the method will block indefinitely. + pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> { + self.inner.wait(timeout) + } + + /// Put the current thread to sleep until being waken up by a waker. + /// + /// This method is similar to the `wait` method except that the `timeout` + /// argument will be updated to reflect the remaining timeout. + pub fn wait_mut(&self, timeout: Option<&mut Duration>) -> Result<()> { + self.inner.wait_mut(timeout) + } + + /// Create a waker that can wake up this waiter. + /// + /// `WaiterQueue` maintains a list of `Waker` internally to wake up the + /// enqueued `Waiter`s. So, for users that uses `WaiterQueue`, this method + /// does not need to be called manually. + pub fn waker(&self) -> Waker { + Waker { + inner: Arc::downgrade(&self.inner), + } + } + + /// Expose the internal host eventfd. + /// + /// This host eventfd should be used by an external user carefully. + pub fn host_eventfd(&self) -> &HostEventFd { + self.inner.host_eventfd() + } +} + +impl !Send for Waiter {} +impl !Sync for Waiter {} + +/// A waker can wake up the thread that its waiter has put to sleep. +pub struct Waker +where + S: Synchronizer, +{ + inner: Weak, +} + +impl Waker { + /// Wake up the waiter that creates this waker. + pub fn wake(&self) { + if let Some(inner) = self.inner.upgrade() { + inner.wake() + } + } + + /// Wake up waiters in batch, more efficient than waking up one-by-one. + pub fn batch_wake<'a, W: 'a + Synchronizer, I: Iterator>>(iter: I) { + let host_eventfds = iter + .filter_map(|waker| waker.inner.upgrade()) + .filter(|inner| inner.wake_cond()) + .map(|inner| inner.host_eventfd().host_fd()) + .collect::>(); + + unsafe { + HostEventFd::write_u64_raw_and_batch(&host_eventfds, 1); + } + } +} diff --git a/src/libos/src/events/waiter/synchronizer.rs b/src/libos/src/events/waiter/synchronizer.rs new file mode 100644 index 00000000..3b74c9f5 --- /dev/null +++ b/src/libos/src/events/waiter/synchronizer.rs @@ -0,0 +1,31 @@ +use std::time::Duration; + +use super::HostEventFd; +use crate::prelude::*; + +/// This trait abstracts over the synchronization mechanism to allow for different implementations that can +/// interact with the host's file descriptor based event notification mechanisms or other kinds of notification facilities. + +pub trait Synchronizer { + /// Creates and returns a new instance of a synchronization primitive. + fn new() -> Self; + + /// Resets the synchronization primitive state. + fn reset(&self); + + /// Waits for the synchronization event to occur until an optional `timeout` duration has elapsed. + fn wait(&self, timeout: Option<&Duration>) -> Result<()>; + + /// Similar to `wait` but allows a mutable `timeout` parameter that can be adjusted to reflect the remaining + /// time for the wait operation. + fn wait_mut(&self, timeout: Option<&mut Duration>) -> Result<()>; + + /// Wakes one or more threads waiting on this synchronization primitive + fn wake(&self); + + /// Returns a reference to the `host_eventfd`, an object tied to a file descriptor used for event notifications. + fn host_eventfd(&self) -> &HostEventFd; + + /// Determines the condition under which a wake event should be triggered. + fn wake_cond(&self) -> bool; +} diff --git a/src/libos/src/events/waiter_queue.rs b/src/libos/src/events/waiter_queue.rs index 48adcfe3..8cb068db 100644 --- a/src/libos/src/events/waiter_queue.rs +++ b/src/libos/src/events/waiter_queue.rs @@ -1,8 +1,7 @@ use std::collections::VecDeque; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::Duration; -use super::{Waiter, Waker}; +use super::{LevelSync, Synchronizer, Waiter, Waker}; use crate::prelude::*; /// A queue for waiters. @@ -26,12 +25,12 @@ use crate::prelude::*; /// In this code snippet, the count variable is synchronized with the wakers field. /// In this case, we only need to ensure that waker.lock() occurs before count. /// Although it is safer to use AcqRel,here using `Release` would be enough. -pub struct WaiterQueue { +pub struct WaiterQueue { count: AtomicUsize, - wakers: SgxMutex>, + wakers: SgxMutex>>, } -impl WaiterQueue { +impl WaiterQueue { /// Creates an empty queue for `Waiter`s. pub fn new() -> Self { Self { @@ -52,7 +51,7 @@ impl WaiterQueue { /// It is allowed to enqueue a waiter more than once before it is dequeued. /// But this is usually not a good idea. It is the callers' responsibility /// to use the API properly. - pub fn reset_and_enqueue(&self, waiter: &Waiter) { + pub fn reset_and_enqueue(&self, waiter: &Waiter) { waiter.reset(); let mut wakers = self.wakers.lock().unwrap(); @@ -81,13 +80,13 @@ impl WaiterQueue { let to_wake = { let mut wakers = self.wakers.lock().unwrap(); let max_count = max_count.min(wakers.len()); - let to_wake: Vec = wakers.drain(..max_count).collect(); + let to_wake: Vec> = wakers.drain(..max_count).collect(); self.count.fetch_sub(to_wake.len(), Ordering::Release); to_wake }; // Wake in batch - Waker::batch_wake(to_wake.iter()); + Waker::::batch_wake(to_wake.iter()); to_wake.len() } }