diff --git a/src/libos/src/fs/channel.rs b/src/libos/src/fs/channel.rs index 6574450f..2ab9b737 100644 --- a/src/libos/src/fs/channel.rs +++ b/src/libos/src/fs/channel.rs @@ -90,94 +90,71 @@ impl Channel { } } -/// An endpoint is either the producer or consumer of a channel. -pub struct EndPoint { - inner: SgxMutex, - state: Arc, - observer: Arc>, - notifier: Arc, - peer_notifier: Weak, - is_nonblocking: AtomicBool, -} - -impl EndPoint { - fn new(inner: T, state: Arc) -> Self { - let inner = SgxMutex::new(inner); - let observer = WaiterQueueObserver::new(); - let notifier = Arc::new(IoNotifier::new()); - let peer_notifier = Default::default(); - let is_nonblocking = AtomicBool::new(false); - Self { - inner, - state, - observer, - notifier, - peer_notifier, - is_nonblocking, +// A macro to implemennt the common part of the two end point types, Producer +// and Consumer. +macro_rules! impl_end_point_type { + ($(#[$attr:meta])* $vis:vis struct $end_point:ident<$i:ident> { + inner: $inner:ident<$_:ident>, + }) => ( + /// An endpoint is either the producer or consumer of a channel. + $(#[$attr])* $vis struct $end_point<$i> { + inner: SgxMutex<$inner<$i>>, + state: Arc, + observer: Arc>, + notifier: Arc, + peer_notifier: Weak, + is_nonblocking: AtomicBool, } - } - /// Returns the I/O notifier. - /// - /// An interesting observer can receive I/O events of the endpoint by - /// registering itself to this notifier. - pub fn notifier(&self) -> &IoNotifier { - &self.notifier - } + impl<$i> $end_point<$i> { + fn new(inner: $inner<$i>, state: Arc) -> Self { + let inner = SgxMutex::new(inner); + let observer = WaiterQueueObserver::new(); + let notifier = Arc::new(IoNotifier::new()); + let peer_notifier = Default::default(); + let is_nonblocking = AtomicBool::new(false); + Self { + inner, + state, + observer, + notifier, + peer_notifier, + is_nonblocking, + } + } - /// Returns whether the endpoint is non-blocking. - /// - /// By default, a channel is blocking. - pub fn is_nonblocking(&self) -> bool { - self.is_nonblocking.load(Ordering::Acquire) - } + /// Returns the I/O notifier. + /// + /// An interesting observer can receive I/O events of the endpoint by + /// registering itself to this notifier. + pub fn notifier(&self) -> &IoNotifier { + &self.notifier + } - /// Set whether the endpoint is non-blocking. - pub fn set_nonblocking(&self, nonblocking: bool) { - self.is_nonblocking.store(nonblocking, Ordering::Release); + /// Returns whether the endpoint is non-blocking. + /// + /// By default, a channel is blocking. + pub fn is_nonblocking(&self) -> bool { + self.is_nonblocking.load(Ordering::Acquire) + } - if nonblocking { - // Wake all threads that are blocked on pushing/popping this endpoint - self.observer.waiter_queue().dequeue_and_wake_all(); + /// Set whether the endpoint is non-blocking. + pub fn set_nonblocking(&self, nonblocking: bool) { + self.is_nonblocking.store(nonblocking, Ordering::Release); + + if nonblocking { + // Wake all threads that are blocked on pushing/popping this endpoint + self.observer.waiter_queue().dequeue_and_wake_all(); + } + } + + fn trigger_peer_events(&self, events: &IoEvents) { + if let Some(peer_notifier) = self.peer_notifier.upgrade() { + peer_notifier.broadcast(events); + } + } } - } - - fn trigger_peer_events(&self, events: &IoEvents) { - if let Some(peer_notifier) = self.peer_notifier.upgrade() { - peer_notifier.broadcast(events); - } - } -} - -/// The state of a channel shared by the two endpoints of a channel. -struct State { - is_producer_shutdown: AtomicBool, - is_consumer_shutdown: AtomicBool, -} - -impl State { - pub fn new() -> Self { - Self { - is_producer_shutdown: AtomicBool::new(false), - is_consumer_shutdown: AtomicBool::new(false), - } - } - - pub fn is_producer_shutdown(&self) -> bool { - self.is_producer_shutdown.load(Ordering::Acquire) - } - - pub fn is_consumer_shutdown(&self) -> bool { - self.is_consumer_shutdown.load(Ordering::Acquire) - } - - pub fn set_producer_shutdown(&self) { - self.is_producer_shutdown.store(true, Ordering::Release) - } - - pub fn set_consumer_shutdown(&self) { - self.is_consumer_shutdown.store(true, Ordering::Release) - } + ) } // Just like a normal loop, except that a waiter queue (as well as a waiter) @@ -206,8 +183,12 @@ macro_rules! waiter_loop { }; } -/// Producer is the writable endpoint of a channel. -pub type Producer = EndPoint>; +impl_end_point_type! { + /// Producer is the writable endpoint of a channel. + pub struct Producer { + inner: RbProducer, + } +} impl Producer { pub fn push(&self, mut item: I) -> Result<()> { @@ -260,6 +241,9 @@ impl Producer { { // It is important to hold this lock while updating the state let inner = self.inner.lock().unwrap(); + if self.state.is_producer_shutdown() { + return; + } self.state.set_producer_shutdown(); } @@ -307,8 +291,18 @@ impl Producer { } } -/// Consumer is the readable endpoint of a channel. -pub type Consumer = EndPoint>; +impl Drop for Producer { + fn drop(&mut self) { + self.shutdown(); + } +} + +impl_end_point_type! { + /// Consumer is the readable endpoint of a channel. + pub struct Consumer { + inner: RbConsumer, + } +} impl Consumer { pub fn pop(&self) -> Result> { @@ -361,6 +355,9 @@ impl Consumer { { // It is important to hold this lock while updating the state let inner = self.inner.lock().unwrap(); + if self.state.is_consumer_shutdown() { + return; + } self.state.set_consumer_shutdown(); } @@ -410,3 +407,40 @@ impl Consumer { ); } } + +impl Drop for Consumer { + fn drop(&mut self) { + self.shutdown(); + } +} + +/// The state of a channel shared by the two endpoints of a channel. +struct State { + is_producer_shutdown: AtomicBool, + is_consumer_shutdown: AtomicBool, +} + +impl State { + pub fn new() -> Self { + Self { + is_producer_shutdown: AtomicBool::new(false), + is_consumer_shutdown: AtomicBool::new(false), + } + } + + pub fn is_producer_shutdown(&self) -> bool { + self.is_producer_shutdown.load(Ordering::Acquire) + } + + pub fn is_consumer_shutdown(&self) -> bool { + self.is_consumer_shutdown.load(Ordering::Acquire) + } + + pub fn set_producer_shutdown(&self) { + self.is_producer_shutdown.store(true, Ordering::Release) + } + + pub fn set_consumer_shutdown(&self) { + self.is_consumer_shutdown.store(true, Ordering::Release) + } +}