[libos] Implement edge/level triggering waiter and poller

ClawSeven 2024-04-29 18:07:02 +08:00 committed by volcano
parent bfc97f5ba2
commit 44eb5ca3fe
8 changed files with 517 additions and 221 deletions

@@ -19,6 +19,7 @@ mod event;
mod host_event_fd;
mod notifier;
mod observer;
+mod poller;
mod waiter;
mod waiter_queue;
mod waiter_queue_observer;
@@ -27,6 +28,7 @@ pub use self::event::{Event, EventFilter};
pub use self::host_event_fd::HostEventFd;
pub use self::notifier::Notifier;
pub use self::observer::Observer;
-pub use self::waiter::{Waiter, Waker};
+pub use self::poller::{Pollee, Poller};
+pub use self::waiter::{EdgeSync, LevelSync, Synchronizer, Waiter, Waker};
pub use self::waiter_queue::WaiterQueue;
pub use self::waiter_queue_observer::WaiterQueueObserver;

@@ -0,0 +1,210 @@
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Weak;
use std::time::Duration;
use super::{EdgeSync, Notifier, Observer, Waiter};
use crate::fs::{IoEvents, IoNotifier};
use crate::prelude::*;
/// A pollee maintains a set of active events, which can be polled with
/// pollers or be monitored with observers.
pub struct Pollee {
inner: Arc<PolleeInner>,
}
struct PolleeInner {
// A table that maintains all interesting pollers
pollers: IoNotifier,
// For efficient manipulation, we use AtomicU32 instead of Atomic<Events>
events: AtomicU32,
}
impl Pollee {
/// Creates a new instance of pollee.
pub fn new(init_events: IoEvents) -> Self {
let inner = PolleeInner {
pollers: Notifier::new(),
events: AtomicU32::new(init_events.bits()),
};
Self {
inner: Arc::new(inner),
}
}
pub fn notifier(&self) -> &IoNotifier {
&self.inner.pollers
}
/// Returns the current events of the pollee given an event mask.
///
/// If no interesting events are polled and a poller is provided, then
/// the poller will start monitoring the pollee and receive event
/// notification once the pollee gets any interesting events.
///
/// This operation is _atomic_ in the sense that either some interesting
/// events are returned or the poller is registered (if a poller is provided).
pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
let mask = mask | IoEvents::ALWAYS_POLL;
// Fast path: return events immediately
if poller.is_none() {
let revents = self.events() & mask;
return revents;
}
// Slow path: connect the pollee with the poller
self.connect_poller(mask, poller.unwrap());
// It is important to check events again to handle race conditions
self.events() & mask
}
pub fn connect_poller(&self, mask: IoEvents, poller: &Poller) {
self.register_observer(poller.observer(), mask);
let mut pollees = poller.inner.pollees.lock();
pollees.push(Arc::downgrade(&self.inner).into());
}
/// Add some events to the pollee's state.
///
/// This method wakes up all registered pollers that are interested in
/// the added events.
pub fn add_events(&self, events: IoEvents) {
self.inner.events.fetch_or(events.bits(), Ordering::Release);
self.inner.pollers.broadcast(&events);
}
/// Remove some events from the pollee's state.
///
/// This method does not wake up registered pollers, even if the pollee
/// still has events that are interesting to them.
pub fn del_events(&self, events: IoEvents) {
self.inner
.events
.fetch_and(!events.bits(), Ordering::Release);
}
/// Reset the pollee's state.
///
/// Reset means removing all events on the pollee.
pub fn reset_events(&self) {
self.inner
.events
.fetch_and(!IoEvents::all().bits(), Ordering::Release);
}
/// Register an event observer.
///
/// A registered observer will get notified (through its `on_event` method)
/// every time new events specified by the `mask` argument happen on the
/// pollee (through the `add_events` method).
///
/// If the given observer has already been registered, then its registered
/// event mask will be updated.
///
/// Note that the observer will always get notified of the events in
/// `IoEvents::ALWAYS_POLL` regardless of the value of `mask`.
///
/// # Memory leakage
///
/// Since an `Arc` for each observer is kept internally by a pollee,
/// it is important for the user to call the `unregister_observer` method
/// when the observer is no longer interested in the pollee. Otherwise,
/// the observer will not be dropped.
pub fn register_observer(&self, observer: Weak<dyn Observer<IoEvents>>, mask: IoEvents) {
let mask = mask | IoEvents::ALWAYS_POLL;
self.inner.pollers.register(observer, Some(mask), None)
}
/// Unregister an event observer.
///
/// If the given observer is registered, it will be removed from the pollee;
/// otherwise, this method has no effect.
pub fn unregister_observer(&self, observer: &Weak<dyn Observer<IoEvents>>) {
self.inner.pollers.unregister(observer)
}
fn events(&self) -> IoEvents {
let event_bits = self.inner.events.load(Ordering::Relaxed);
unsafe { IoEvents::from_bits_unchecked(event_bits) }
}
}
impl std::fmt::Debug for Pollee {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Pollee")
.field("events", &self.events())
.field("pollers", &"..")
.finish()
}
}
/// A poller gets notified when its associated pollees have interesting events.
pub struct Poller {
inner: Arc<PollerInner>,
}
struct PollerInner {
// An edge-triggered waiter (backed by an event counter) used to block and wake up the poller
waiter: Waiter<EdgeSync>,
// All pollees that are interesting to this poller
pollees: Mutex<Vec<Weak<PolleeInner>>>,
}
unsafe impl Send for PollerInner {}
unsafe impl Sync for PollerInner {}
impl Poller {
/// Constructs a new `Poller`.
pub fn new() -> Self {
let inner = PollerInner {
waiter: Waiter::<EdgeSync>::new(),
pollees: Mutex::new(Vec::new()),
};
Self {
inner: Arc::new(inner),
}
}
/// Wait until any interesting events have happened since the last `wait`.
pub fn wait(&self) -> Result<()> {
self.inner.waiter.wait(None)
}
/// Wait until any interesting events have happened since the last `wait`, or until the timeout expires.
pub fn wait_timeout(&self, timeout: Option<&mut Duration>) -> Result<()> {
self.inner.waiter.wait_mut(timeout)
}
pub fn observer(&self) -> Weak<dyn Observer<IoEvents>> {
Arc::downgrade(&self.inner) as _
}
}
impl Observer<IoEvents> for PollerInner {
fn on_event(
&self,
_event: &IoEvents,
_metadata: &Option<Weak<dyn core::any::Any + Send + Sync>>,
) -> () {
self.waiter.waker().wake();
}
}
impl Drop for Poller {
fn drop(&mut self) {
let mut pollees = self.inner.pollees.lock();
if pollees.len() == 0 {
return;
}
let self_observer = self.observer();
for weak_pollee in pollees.drain(..) {
if let Some(pollee) = weak_pollee.upgrade() {
pollee.pollers.unregister(&self_observer);
}
}
}
}
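
A minimal sketch of how a file-like object might use the two types together (the `wait_until_readable` helper and the `IoEvents::IN` flag are illustrative assumptions, not part of this commit): poll without a poller first as a fast path, then register a poller and poll again before sleeping, relying on the atomicity guarantee of `poll` described above.

fn wait_until_readable(pollee: &Pollee) -> Result<()> {
    let mask = IoEvents::IN;
    // Fast path: no poller registered, just read the current events.
    if pollee.poll(mask, None).contains(IoEvents::IN) {
        return Ok(());
    }
    // Slow path: connect a poller, then poll again before sleeping so that
    // events added between registration and waiting are not missed.
    let poller = Poller::new();
    loop {
        if pollee.poll(mask, Some(&poller)).contains(IoEvents::IN) {
            return Ok(());
        }
        poller.wait()?;
    }
}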

@@ -1,212 +0,0 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Weak;
use std::time::Duration;
use super::host_event_fd::HostEventFd;
use crate::prelude::*;
/// A waiter enables a thread to sleep.
pub struct Waiter {
inner: Arc<Inner>,
}
impl Waiter {
/// Create a waiter for the current thread.
///
/// A `Waiter` is bound to the current thread that creates it: it cannot be
/// sent to or used by any other threads as the type implements `!Send` and
/// `!Sync` traits. Thus, a `Waiter` can only put the current thread to sleep.
pub fn new() -> Self {
Self {
inner: Arc::new(Inner::new()),
}
}
/// Return whether a waiter has been woken up.
///
/// Once a waiter is woken up, the `wait` or `wait_mut` method becomes
/// non-blocking.
pub fn is_woken(&self) -> bool {
self.inner.is_woken()
}
/// Reset a waiter.
///
/// After a `Waiter` has been woken up, the `reset` method must be called so
/// that the `Waiter` can use the `wait` or `wait_mut` methods to put the
/// current thread to sleep again.
pub fn reset(&self) {
self.inner.reset();
}
/// Put the current thread to sleep until it is woken up by a waker.
///
/// The method has three possible return values:
/// 1. `Ok(())`: The `Waiter` has been woken up by one of its `Waker`s;
/// 2. `Err(e) if e.errno() == Errno::ETIMEDOUT`: Timeout.
/// 3. `Err(e) if e.errno() == Errno::EINTR`: Interrupted by a signal.
///
/// If the `timeout` argument is `None`, then the second case won't happen,
/// i.e., the method will block indefinitely.
pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> {
self.inner.wait(timeout)
}
/// Put the current thread to sleep until it is woken up by a waker.
///
/// This method is similar to the `wait` method except that the `timeout`
/// argument will be updated to reflect the remaining timeout.
pub fn wait_mut(&self, timeout: Option<&mut Duration>) -> Result<()> {
self.inner.wait_mut(timeout)
}
/// Create a waker that can wake up this waiter.
///
/// `WaiterQueue` maintains a list of `Waker` internally to wake up the
/// enqueued `Waiter`s. So, for users of `WaiterQueue`, this method
/// does not need to be called manually.
pub fn waker(&self) -> Waker {
Waker {
inner: Arc::downgrade(&self.inner),
}
}
/// Expose the internal host eventfd.
///
/// This host eventfd should be used carefully by external users.
pub fn host_eventfd(&self) -> &HostEventFd {
self.inner.host_eventfd()
}
}
impl !Send for Waiter {}
impl !Sync for Waiter {}
/// A waker can wake up the thread that its waiter has put to sleep.
pub struct Waker {
inner: Weak<Inner>,
}
impl Waker {
/// Wake up the waiter that created this waker.
pub fn wake(&self) {
if let Some(inner) = self.inner.upgrade() {
inner.wake()
}
}
/// Wake up waiters in batch, which is more efficient than waking them up one by one.
pub fn batch_wake<'a, I: Iterator<Item = &'a Waker>>(iter: I) {
Inner::batch_wake(iter);
}
}
/// Instruction reordering under control dependencies
///
/// Consider the following code:
/// fn function(flag: bool, a: i32, b: i32) {
///     if flag {          // 1
///         let i = a * b; // 2
///     }
/// }
///
/// As long as the single-threaded semantics of the program are preserved, the
/// compiler and the CPU may rearrange the execution order of statements. There
/// is only a control dependency between flag and i: after reordering, step 2
/// may compute the result speculatively (e.g., into a register or store buffer)
/// and commit it to i once the condition turns out to be true. A control
/// dependency therefore does not prevent such optimizations.
///
/// Note about memory ordering:
/// is_woken needs to be synchronized with host_eventfd: a read of is_woken
/// must observe the changes made to host_eventfd. Pairing `Acquire` loads with
/// `Release` stores is enough to make the changes to host_eventfd visible to us.
///
/// The orderings of a CAS operation can be `Relaxed`, `Acquire`, `AcqRel` or
/// `SeqCst`; the right choice depends on the usage scenario. Here the failure
/// case does not synchronize any other variable, so it can use `Relaxed`; the
/// success case must synchronize host_eventfd, so it needs `Acquire` to make
/// all preceding changes to host_eventfd visible.
///
/// Although `AcqRel` would also be correct, `Acquire` is sufficient here:
/// host_eventfd does not need to be synchronized before is_woken is set,
/// only afterwards.
struct Inner {
is_woken: AtomicBool,
host_eventfd: Arc<HostEventFd>,
}
impl Inner {
pub fn new() -> Self {
let is_woken = AtomicBool::new(false);
let host_eventfd = current!().host_eventfd().clone();
Self {
is_woken,
host_eventfd,
}
}
pub fn is_woken(&self) -> bool {
self.is_woken.load(Ordering::Acquire)
}
pub fn reset(&self) {
self.is_woken.store(false, Ordering::Release);
}
pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> {
while !self.is_woken() {
self.host_eventfd.poll(timeout)?;
}
Ok(())
}
pub fn wait_mut(&self, timeout: Option<&mut Duration>) -> Result<()> {
let mut remain = timeout.as_ref().map(|d| **d);
// Need to change timeout from `Option<&mut Duration>` to `&mut Option<Duration>`
// so that the Rust compiler is happy about using the variable in a loop.
let ret = self.do_wait_mut(&mut remain);
if let Some(timeout) = timeout {
*timeout = remain.unwrap();
}
ret
}
fn do_wait_mut(&self, remain: &mut Option<Duration>) -> Result<()> {
while !self.is_woken() {
self.host_eventfd.poll_mut(remain.as_mut())?;
}
Ok(())
}
pub fn wake(&self) {
if self
.is_woken
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
self.host_eventfd.write_u64(1);
}
}
pub fn batch_wake<'a, I: Iterator<Item = &'a Waker>>(iter: I) {
let host_eventfds = iter
.filter_map(|waker| waker.inner.upgrade())
.filter(|inner| {
inner
.is_woken
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
})
.map(|inner| inner.host_eventfd.host_fd())
.collect::<Vec<FileDesc>>();
unsafe {
HostEventFd::write_u64_raw_and_batch(&host_eventfds, 1);
}
}
pub fn host_eventfd(&self) -> &HostEventFd {
&self.host_eventfd
}
}

@@ -0,0 +1,85 @@
use atomic::Ordering;
use std::{sync::atomic::AtomicU32, time::Duration};
use super::{HostEventFd, Synchronizer};
use crate::prelude::*;
// States of the edge-triggered synchronizer: a waiter parks by decrementing
// INIT to WAIT; a waker publishes a notification by swapping the state to
// NOTIFIED, which the next wait consumes.
const WAIT: u32 = u32::MAX;
const INIT: u32 = 0;
const NOTIFIED: u32 = 1;
pub struct EdgeSync {
state: AtomicU32,
host_eventfd: Arc<HostEventFd>,
}
impl Synchronizer for EdgeSync {
fn new() -> Self {
Self {
state: AtomicU32::new(INIT),
host_eventfd: current!().host_eventfd().clone(),
}
}
fn wait(&self, timeout: Option<&Duration>) -> Result<()> {
// INIT -> WAIT marks this waiter as parked; if the state was already
// NOTIFIED, consume the pending notification and return at once.
if self.state.fetch_sub(1, Ordering::Acquire) == NOTIFIED {
return Ok(());
}
loop {
self.host_eventfd.poll(timeout)?;
if self
.state
.compare_exchange(NOTIFIED, INIT, Ordering::Acquire, Ordering::Acquire)
.is_ok()
{
return Ok(());
} else {
// Spurious wake up. We loop to try again.
}
}
}
fn wait_mut(&self, timeout: Option<&mut Duration>) -> Result<()> {
if self.state.fetch_sub(1, Ordering::Acquire) == NOTIFIED {
return Ok(());
}
let mut remain = timeout.as_ref().map(|d| **d);
// Convert `Option<&mut Duration>` into an owned `Option<Duration>` so that the
// remaining time can be passed to `poll_mut` and later written back to `timeout`.
let ret = self.host_eventfd.poll_mut(remain.as_mut());
// Restore the state to INIT. This is a swap rather than a plain store because
// we need to establish a release-acquire ordering with `wake()`.
if self.state.swap(INIT, Ordering::Acquire) == NOTIFIED {
// Woken up by `wake()`.
} else {
// Timeout or spurious wake-up. We return either way, because we cannot
// easily tell which one it was.
}
if let Some(timeout) = timeout {
*timeout = remain.unwrap();
}
ret
}
fn reset(&self) {
// Nothing to do for edge triggering: `wait` itself consumes the notification.
}
fn wake(&self) {
if self.wake_cond() {
self.host_eventfd.write_u64(1);
}
}
fn host_eventfd(&self) -> &HostEventFd {
&self.host_eventfd
}
fn wake_cond(&self) -> bool {
// Publish the notification; only a parked waiter (state == WAIT) requires
// an eventfd write to be woken up.
self.state.swap(NOTIFIED, Ordering::Release) == WAIT
}
}
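
To illustrate the edge-triggered semantics, a minimal sketch using the generic `Waiter`/`Waker` defined in waiter/mod.rs later in this diff: a wake issued before the wait is not lost, repeated wakes coalesce into a single notification, and each wait consumes the notification so the next wait blocks again.

fn edge_semantics_sketch() -> Result<()> {
    let waiter: Waiter<EdgeSync> = Waiter::new();
    let waker = waiter.waker();
    waker.wake();        // INIT -> NOTIFIED; no waiter is parked yet
    waker.wake();        // already NOTIFIED: coalesced, no extra eventfd write
    waiter.wait(None)?;  // consumes the pending notification, returns at once
    // A further wait(None) here would park the thread until the next wake().
    Ok(())
}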

@@ -0,0 +1,68 @@
use atomic::Ordering;
use std::{sync::atomic::AtomicBool, time::Duration};
use super::{HostEventFd, Synchronizer};
use crate::prelude::*;
pub struct LevelSync {
is_woken: AtomicBool,
host_eventfd: Arc<HostEventFd>,
}
impl Synchronizer for LevelSync {
fn new() -> Self {
Self {
is_woken: AtomicBool::new(false),
host_eventfd: current!().host_eventfd().clone(),
}
}
fn reset(&self) {
self.is_woken.store(false, Ordering::Release);
}
fn wait(&self, timeout: Option<&Duration>) -> Result<()> {
while !self.is_woken() {
self.host_eventfd.poll(timeout)?;
}
Ok(())
}
fn wait_mut(&self, timeout: Option<&mut Duration>) -> Result<()> {
let mut remain = timeout.as_ref().map(|d| **d);
// Need to change timeout from `Option<&mut Duration>` to `&mut Option<Duration>`
// so that the Rust compiler is happy about using the variable in a loop.
while !self.is_woken() {
self.host_eventfd.poll_mut(remain.as_mut())?;
}
if let Some(timeout) = timeout {
*timeout = remain.unwrap();
}
Ok(())
}
fn wake(&self) {
if self.wake_cond() {
self.host_eventfd.write_u64(1);
}
}
fn wake_cond(&self) -> bool {
// Only the transition from false to true requires an eventfd write;
// further wakes before a reset are no-ops.
self.is_woken
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
}
fn host_eventfd(&self) -> &HostEventFd {
&self.host_eventfd
}
}
impl LevelSync {
#[inline(always)]
fn is_woken(&self) -> bool {
self.is_woken.load(Ordering::Acquire)
}
}
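
By contrast, the level-triggered flavor keeps `wait` non-blocking for as long as the woken level is set; only an explicit `reset` rearms it. A minimal sketch, under the same assumptions as the edge-triggered example above:

fn level_semantics_sketch() -> Result<()> {
    let waiter: Waiter<LevelSync> = Waiter::new();
    let waker = waiter.waker();
    waker.wake();
    waiter.wait(None)?;  // returns immediately: is_woken is still true
    waiter.wait(None)?;  // still non-blocking; the level has not been cleared
    waiter.reset();      // rearm: the next wait() blocks until the next wake()
    Ok(())
}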

@@ -0,0 +1,113 @@
mod edge;
mod level;
mod synchronizer;
pub use self::edge::EdgeSync;
pub use self::level::LevelSync;
pub use self::synchronizer::Synchronizer;
use super::HostEventFd;
use crate::prelude::*;
use std::{sync::Weak, time::Duration};
/// A waiter enables a thread to sleep.
pub struct Waiter<Sync = LevelSync>
where
Sync: Synchronizer,
{
inner: Arc<Sync>,
}
impl<Sync: Synchronizer> Waiter<Sync> {
/// Create a waiter for the current thread.
///
/// A `Waiter` is bound to the current thread that creates it: it cannot be
/// sent to or used by any other threads as the type implements `!Send` and
/// `!Sync` traits. Thus, a `Waiter` can only put the current thread to sleep.
pub fn new() -> Self {
Self {
inner: Arc::new(Sync::new()),
}
}
/// Reset a waiter.
///
/// After a `Waiter` has been woken up, the `reset` method must be called so
/// that the `Waiter` can use the `wait` or `wait_mut` methods to put the
/// current thread to sleep again.
pub fn reset(&self) {
self.inner.reset();
}
/// Put the current thread to sleep until it is woken up by a waker.
///
/// The method has three possible return values:
/// 1. `Ok(())`: The `Waiter` has been woken up by one of its `Waker`s;
/// 2. `Err(e) if e.errno() == Errno::ETIMEDOUT`: Timeout.
/// 3. `Err(e) if e.errno() == Errno::EINTR`: Interrupted by a signal.
///
/// If the `timeout` argument is `None`, then the second case won't happen,
/// i.e., the method will block indefinitely.
pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> {
self.inner.wait(timeout)
}
/// Put the current thread to sleep until it is woken up by a waker.
///
/// This method is similar to the `wait` method except that the `timeout`
/// argument will be updated to reflect the remaining timeout.
pub fn wait_mut(&self, timeout: Option<&mut Duration>) -> Result<()> {
self.inner.wait_mut(timeout)
}
/// Create a waker that can wake up this waiter.
///
/// `WaiterQueue` maintains a list of `Waker` internally to wake up the
/// enqueued `Waiter`s. So, for users of `WaiterQueue`, this method
/// does not need to be called manually.
pub fn waker(&self) -> Waker<Sync> {
Waker {
inner: Arc::downgrade(&self.inner),
}
}
/// Expose the internal host eventfd.
///
/// This host eventfd should be used carefully by external users.
pub fn host_eventfd(&self) -> &HostEventFd {
self.inner.host_eventfd()
}
}
impl<S: Synchronizer> !Send for Waiter<S> {}
impl<S: Synchronizer> !Sync for Waiter<S> {}
/// A waker can wake up the thread that its waiter has put to sleep.
pub struct Waker<S = LevelSync>
where
S: Synchronizer,
{
inner: Weak<S>,
}
impl<S: Synchronizer> Waker<S> {
/// Wake up the waiter that created this waker.
pub fn wake(&self) {
if let Some(inner) = self.inner.upgrade() {
inner.wake()
}
}
/// Wake up waiters in batch, which is more efficient than waking them up one by one.
pub fn batch_wake<'a, W: 'a + Synchronizer, I: Iterator<Item = &'a Waker<W>>>(iter: I) {
let host_eventfds = iter
.filter_map(|waker| waker.inner.upgrade())
.filter(|inner| inner.wake_cond())
.map(|inner| inner.host_eventfd().host_fd())
.collect::<Vec<FileDesc>>();
unsafe {
HostEventFd::write_u64_raw_and_batch(&host_eventfds, 1);
}
}
}
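
Since `LevelSync` is the default type parameter, existing callers keep the previous level-triggered behavior, while new code can opt into edge triggering explicitly. A small sketch:

fn pick_trigger_mode() {
    let _level: Waiter = Waiter::new();          // Waiter<LevelSync> by default
    let edge: Waiter<EdgeSync> = Waiter::new();  // opt in to edge triggering
    let _waker: Waker<EdgeSync> = edge.waker();  // the waker carries the same mode
}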

@@ -0,0 +1,31 @@
use std::time::Duration;
use super::HostEventFd;
use crate::prelude::*;
/// This trait abstracts over the synchronization mechanism so that different
/// implementations (e.g., edge- or level-triggered) can interact with the host's
/// file-descriptor-based event notification mechanism or other notification facilities.
pub trait Synchronizer {
/// Creates and returns a new instance of a synchronization primitive.
fn new() -> Self;
/// Resets the synchronization primitive state.
fn reset(&self);
/// Waits for the synchronization event to occur until an optional `timeout` duration has elapsed.
fn wait(&self, timeout: Option<&Duration>) -> Result<()>;
/// Similar to `wait` but allows a mutable `timeout` parameter that can be adjusted to reflect the remaining
/// time for the wait operation.
fn wait_mut(&self, timeout: Option<&mut Duration>) -> Result<()>;
/// Wakes one or more threads waiting on this synchronization primitive.
fn wake(&self);
/// Returns a reference to the `host_eventfd`, an object tied to a file descriptor used for event notifications.
fn host_eventfd(&self) -> &HostEventFd;
/// Atomically marks the primitive as notified and returns whether a wake event
/// (a write to the host eventfd) needs to be issued.
fn wake_cond(&self) -> bool;
}
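
Code that relies only on this trait stays agnostic of the trigger policy. For example, a generic helper (hypothetical, shown only for illustration and assuming the `Waiter` type and the imports above) can accept a waiter of either flavor and track the remaining time budget:

fn wait_with_budget<S: Synchronizer>(waiter: &Waiter<S>, budget: &mut Duration) -> Result<()> {
    // wait_mut() shrinks `budget` to the remaining time if the wait ends early.
    waiter.wait_mut(Some(budget))
}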

@@ -1,8 +1,7 @@
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::time::Duration;
-use super::{Waiter, Waker};
+use super::{LevelSync, Synchronizer, Waiter, Waker};
use crate::prelude::*;
/// A queue for waiters.
@@ -26,12 +25,12 @@ use crate::prelude::*;
/// In this code snippet, the count variable is synchronized with the wakers field.
/// In this case, we only need to ensure that waker.lock() occurs before count.
/// Although it is safer to use AcqRel, here using `Release` would be enough.
-pub struct WaiterQueue {
+pub struct WaiterQueue<Sync: Synchronizer = LevelSync> {
count: AtomicUsize,
-wakers: SgxMutex<VecDeque<Waker>>,
+wakers: SgxMutex<VecDeque<Waker<Sync>>>,
}
-impl WaiterQueue {
+impl<Sync: Synchronizer> WaiterQueue<Sync> {
/// Creates an empty queue for `Waiter`s.
pub fn new() -> Self {
Self {
@@ -52,7 +51,7 @@ impl WaiterQueue {
/// It is allowed to enqueue a waiter more than once before it is dequeued.
/// But this is usually not a good idea. It is the callers' responsibility
/// to use the API properly.
-pub fn reset_and_enqueue(&self, waiter: &Waiter) {
+pub fn reset_and_enqueue(&self, waiter: &Waiter<Sync>) {
waiter.reset();
let mut wakers = self.wakers.lock().unwrap();
@@ -81,13 +80,13 @@ impl WaiterQueue {
let to_wake = {
let mut wakers = self.wakers.lock().unwrap();
let max_count = max_count.min(wakers.len());
-let to_wake: Vec<Waker> = wakers.drain(..max_count).collect();
+let to_wake: Vec<Waker<Sync>> = wakers.drain(..max_count).collect();
self.count.fetch_sub(to_wake.len(), Ordering::Release);
to_wake
};
// Wake in batch
-Waker::batch_wake(to_wake.iter());
+Waker::<Sync>::batch_wake(to_wake.iter());
to_wake.len()
}
}
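
The enqueue-then-check-then-wait pattern that callers of `WaiterQueue` typically follow is unchanged by the generic parameter, since it defaults to the level-triggered queue. A minimal sketch (the `cond` closure and the helper are hypothetical; dequeue and cleanup on early return are omitted):

fn wait_for_condition(queue: &WaiterQueue, cond: impl Fn() -> bool) -> Result<()> {
    let waiter = Waiter::new();
    loop {
        // Rearm the waiter and enqueue it before checking the condition, so a
        // concurrent wake between the check and the wait is not lost.
        queue.reset_and_enqueue(&waiter);
        if cond() {
            return Ok(());
        }
        waiter.wait(None)?;
    }
}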