Fix a bug of channels

This bugfix ensures that when an object of Producer/Consumer for
channels is dropped, its shutdown method is called automatically. This ensures
that the peer of a Producer/Consumer gets notified and won't wait indefinitely.
This commit is contained in:
Tate, Hongliang Tian 2020-12-01 14:59:34 +00:00 committed by Zongmin.Gu
parent ea64939cac
commit 9809d81c4e

@ -90,94 +90,71 @@ impl<I: Copy> Channel<I> {
} }
} }
/// An endpoint is either the producer or consumer of a channel. // A macro to implemennt the common part of the two end point types, Producer<I>
pub struct EndPoint<T> { // and Consumer<I>.
inner: SgxMutex<T>, macro_rules! impl_end_point_type {
state: Arc<State>, ($(#[$attr:meta])* $vis:vis struct $end_point:ident<$i:ident> {
observer: Arc<WaiterQueueObserver<IoEvents>>, inner: $inner:ident<$_:ident>,
notifier: Arc<IoNotifier>, }) => (
peer_notifier: Weak<IoNotifier>, /// An endpoint is either the producer or consumer of a channel.
is_nonblocking: AtomicBool, $(#[$attr])* $vis struct $end_point<$i> {
} inner: SgxMutex<$inner<$i>>,
state: Arc<State>,
impl<T> EndPoint<T> { observer: Arc<WaiterQueueObserver<IoEvents>>,
fn new(inner: T, state: Arc<State>) -> Self { notifier: Arc<IoNotifier>,
let inner = SgxMutex::new(inner); peer_notifier: Weak<IoNotifier>,
let observer = WaiterQueueObserver::new(); is_nonblocking: AtomicBool,
let notifier = Arc::new(IoNotifier::new());
let peer_notifier = Default::default();
let is_nonblocking = AtomicBool::new(false);
Self {
inner,
state,
observer,
notifier,
peer_notifier,
is_nonblocking,
} }
}
/// Returns the I/O notifier. impl<$i> $end_point<$i> {
/// fn new(inner: $inner<$i>, state: Arc<State>) -> Self {
/// An interesting observer can receive I/O events of the endpoint by let inner = SgxMutex::new(inner);
/// registering itself to this notifier. let observer = WaiterQueueObserver::new();
pub fn notifier(&self) -> &IoNotifier { let notifier = Arc::new(IoNotifier::new());
&self.notifier let peer_notifier = Default::default();
} let is_nonblocking = AtomicBool::new(false);
Self {
inner,
state,
observer,
notifier,
peer_notifier,
is_nonblocking,
}
}
/// Returns whether the endpoint is non-blocking. /// Returns the I/O notifier.
/// ///
/// By default, a channel is blocking. /// An interesting observer can receive I/O events of the endpoint by
pub fn is_nonblocking(&self) -> bool { /// registering itself to this notifier.
self.is_nonblocking.load(Ordering::Acquire) pub fn notifier(&self) -> &IoNotifier {
} &self.notifier
}
/// Set whether the endpoint is non-blocking. /// Returns whether the endpoint is non-blocking.
pub fn set_nonblocking(&self, nonblocking: bool) { ///
self.is_nonblocking.store(nonblocking, Ordering::Release); /// By default, a channel is blocking.
pub fn is_nonblocking(&self) -> bool {
self.is_nonblocking.load(Ordering::Acquire)
}
if nonblocking { /// Set whether the endpoint is non-blocking.
// Wake all threads that are blocked on pushing/popping this endpoint pub fn set_nonblocking(&self, nonblocking: bool) {
self.observer.waiter_queue().dequeue_and_wake_all(); self.is_nonblocking.store(nonblocking, Ordering::Release);
if nonblocking {
// Wake all threads that are blocked on pushing/popping this endpoint
self.observer.waiter_queue().dequeue_and_wake_all();
}
}
fn trigger_peer_events(&self, events: &IoEvents) {
if let Some(peer_notifier) = self.peer_notifier.upgrade() {
peer_notifier.broadcast(events);
}
}
} }
} )
fn trigger_peer_events(&self, events: &IoEvents) {
if let Some(peer_notifier) = self.peer_notifier.upgrade() {
peer_notifier.broadcast(events);
}
}
}
/// The state of a channel shared by the two endpoints of a channel.
struct State {
is_producer_shutdown: AtomicBool,
is_consumer_shutdown: AtomicBool,
}
impl State {
pub fn new() -> Self {
Self {
is_producer_shutdown: AtomicBool::new(false),
is_consumer_shutdown: AtomicBool::new(false),
}
}
pub fn is_producer_shutdown(&self) -> bool {
self.is_producer_shutdown.load(Ordering::Acquire)
}
pub fn is_consumer_shutdown(&self) -> bool {
self.is_consumer_shutdown.load(Ordering::Acquire)
}
pub fn set_producer_shutdown(&self) {
self.is_producer_shutdown.store(true, Ordering::Release)
}
pub fn set_consumer_shutdown(&self) {
self.is_consumer_shutdown.store(true, Ordering::Release)
}
} }
// Just like a normal loop, except that a waiter queue (as well as a waiter) // Just like a normal loop, except that a waiter queue (as well as a waiter)
@ -206,8 +183,12 @@ macro_rules! waiter_loop {
}; };
} }
/// Producer is the writable endpoint of a channel. impl_end_point_type! {
pub type Producer<I> = EndPoint<RbProducer<I>>; /// Producer is the writable endpoint of a channel.
pub struct Producer<I> {
inner: RbProducer<I>,
}
}
impl<I> Producer<I> { impl<I> Producer<I> {
pub fn push(&self, mut item: I) -> Result<()> { pub fn push(&self, mut item: I) -> Result<()> {
@ -260,6 +241,9 @@ impl<I> Producer<I> {
{ {
// It is important to hold this lock while updating the state // It is important to hold this lock while updating the state
let inner = self.inner.lock().unwrap(); let inner = self.inner.lock().unwrap();
if self.state.is_producer_shutdown() {
return;
}
self.state.set_producer_shutdown(); self.state.set_producer_shutdown();
} }
@ -307,8 +291,18 @@ impl<I: Copy> Producer<I> {
} }
} }
/// Consumer is the readable endpoint of a channel. impl<I> Drop for Producer<I> {
pub type Consumer<I> = EndPoint<RbConsumer<I>>; fn drop(&mut self) {
self.shutdown();
}
}
impl_end_point_type! {
/// Consumer is the readable endpoint of a channel.
pub struct Consumer<I> {
inner: RbConsumer<I>,
}
}
impl<I> Consumer<I> { impl<I> Consumer<I> {
pub fn pop(&self) -> Result<Option<I>> { pub fn pop(&self) -> Result<Option<I>> {
@ -361,6 +355,9 @@ impl<I> Consumer<I> {
{ {
// It is important to hold this lock while updating the state // It is important to hold this lock while updating the state
let inner = self.inner.lock().unwrap(); let inner = self.inner.lock().unwrap();
if self.state.is_consumer_shutdown() {
return;
}
self.state.set_consumer_shutdown(); self.state.set_consumer_shutdown();
} }
@ -410,3 +407,40 @@ impl<I: Copy> Consumer<I> {
); );
} }
} }
impl<I> Drop for Consumer<I> {
fn drop(&mut self) {
self.shutdown();
}
}
/// The state of a channel shared by the two endpoints of a channel.
struct State {
is_producer_shutdown: AtomicBool,
is_consumer_shutdown: AtomicBool,
}
impl State {
pub fn new() -> Self {
Self {
is_producer_shutdown: AtomicBool::new(false),
is_consumer_shutdown: AtomicBool::new(false),
}
}
pub fn is_producer_shutdown(&self) -> bool {
self.is_producer_shutdown.load(Ordering::Acquire)
}
pub fn is_consumer_shutdown(&self) -> bool {
self.is_consumer_shutdown.load(Ordering::Acquire)
}
pub fn set_producer_shutdown(&self) {
self.is_producer_shutdown.store(true, Ordering::Release)
}
pub fn set_consumer_shutdown(&self) {
self.is_consumer_shutdown.store(true, Ordering::Release)
}
}