diff --git a/src/libos/src/fs/channel.rs b/src/libos/src/fs/channel.rs index 82181f5a..3b266c9d 100644 --- a/src/libos/src/fs/channel.rs +++ b/src/libos/src/fs/channel.rs @@ -21,17 +21,24 @@ impl Channel { let rb = RingBuffer::new(capacity); let (rb_producer, rb_consumer) = rb.split(); - let producer = Producer::new(rb_producer, state.clone()); - let consumer = Consumer::new(rb_consumer, state.clone()); + let mut producer = Producer::new(rb_producer, state.clone()); + let mut consumer = Consumer::new(rb_consumer, state.clone()); - // Make event connection between the producer and consumer - producer.notifier().register( - Arc::downgrade(&consumer.observer) as Weak>, + // The events on an endpoint is not triggered by itself, but its peer. + // For example, a producer becomes writable (IoEvents::OUT) only if + // its peer consumer gets read. So an endpoint needs to hold a + // reference to the notifier of its peer. + producer.peer_notifier = Arc::downgrade(&consumer.notifier); + consumer.peer_notifier = Arc::downgrade(&producer.notifier); + // An endpoint registers itself as an observer to its own notifier so + // that it can be waken up by its peer. + producer.notifier.register( + Arc::downgrade(&producer.observer) as Weak>, None, None, ); - consumer.notifier().register( - Arc::downgrade(&producer.observer) as Weak>, + consumer.notifier.register( + Arc::downgrade(&consumer.observer) as Weak>, None, None, ); @@ -88,7 +95,8 @@ pub struct EndPoint { inner: SgxMutex, state: Arc, observer: Arc>, - notifier: IoNotifier, + notifier: Arc, + peer_notifier: Weak, is_nonblocking: AtomicBool, } @@ -96,13 +104,15 @@ impl EndPoint { fn new(inner: T, state: Arc) -> Self { let inner = SgxMutex::new(inner); let observer = WaiterQueueObserver::new(); - let notifier = IoNotifier::new(); + let notifier = Arc::new(IoNotifier::new()); + let peer_notifier = Default::default(); let is_nonblocking = AtomicBool::new(false); Self { inner, state, observer, notifier, + peer_notifier, is_nonblocking, } } @@ -131,6 +141,12 @@ impl EndPoint { self.observer.waiter_queue().dequeue_and_wake_all(); } } + + fn trigger_peer_events(&self, events: &IoEvents) { + if let Some(peer_notifier) = self.peer_notifier.upgrade() { + peer_notifier.broadcast(events); + } + } } /// The state of a channel shared by the two endpoints of a channel. @@ -205,7 +221,7 @@ impl Producer { item = match rb_producer.push(item) { Ok(()) => { drop(rb_producer); - self.notifier.broadcast(&IoEvents::IN); + self.trigger_peer_events(&IoEvents::IN); return Ok(()); } Err(item) => item, @@ -247,8 +263,8 @@ impl Producer { self.state.set_producer_shutdown(); } - // Notify all consumers and other observers - self.notifier.broadcast(&IoEvents::HUP); + // The shutdown of the producer triggers hangup events on the consumer + self.trigger_peer_events(&IoEvents::HUP); // Wake all threads that are blocked on pushing to this producer self.observer.waiter_queue().dequeue_and_wake_all(); } @@ -274,7 +290,7 @@ impl Producer { let count = rb_producer.push_slice(items); if count > 0 { drop(rb_producer); - self.notifier.broadcast(&IoEvents::IN); + self.trigger_peer_events(&IoEvents::IN); return Ok(count); } @@ -301,7 +317,7 @@ impl Consumer { if let Some(item) = rb_consumer.pop() { drop(rb_consumer); - self.notifier.broadcast(&IoEvents::OUT); + self.trigger_peer_events(&IoEvents::OUT); return Ok(Some(item)); } @@ -344,8 +360,8 @@ impl Consumer { self.state.set_consumer_shutdown(); } - // Notify all producers and other observers - self.notifier.broadcast(&IoEvents::RDHUP); + // The consumer being shutdown triggers error on the producer + self.trigger_peer_events(&IoEvents::ERR); // Wake all threads that are blocked on popping from this consumer self.observer.waiter_queue().dequeue_and_wake_all(); } @@ -371,7 +387,7 @@ impl Consumer { let count = rb_consumer.pop_slice(items); if count > 0 { drop(rb_consumer); - self.notifier.broadcast(&IoEvents::OUT); + self.trigger_peer_events(&IoEvents::OUT); return Ok(count); };