Fix Channel's notifier being inconsistent with poll/epoll

This commit is contained in:
Tate, Hongliang Tian 2020-11-11 16:44:06 +00:00 committed by Zongmin.Gu
parent 5b6d06b808
commit 10f3ffa9e6

@ -21,17 +21,24 @@ impl<I> Channel<I> {
let rb = RingBuffer::new(capacity); let rb = RingBuffer::new(capacity);
let (rb_producer, rb_consumer) = rb.split(); let (rb_producer, rb_consumer) = rb.split();
let producer = Producer::new(rb_producer, state.clone()); let mut producer = Producer::new(rb_producer, state.clone());
let consumer = Consumer::new(rb_consumer, state.clone()); let mut consumer = Consumer::new(rb_consumer, state.clone());
// Make event connection between the producer and consumer // The events on an endpoint is not triggered by itself, but its peer.
producer.notifier().register( // For example, a producer becomes writable (IoEvents::OUT) only if
Arc::downgrade(&consumer.observer) as Weak<dyn Observer<_>>, // its peer consumer gets read. So an endpoint needs to hold a
// reference to the notifier of its peer.
producer.peer_notifier = Arc::downgrade(&consumer.notifier);
consumer.peer_notifier = Arc::downgrade(&producer.notifier);
// An endpoint registers itself as an observer to its own notifier so
// that it can be waken up by its peer.
producer.notifier.register(
Arc::downgrade(&producer.observer) as Weak<dyn Observer<_>>,
None, None,
None, None,
); );
consumer.notifier().register( consumer.notifier.register(
Arc::downgrade(&producer.observer) as Weak<dyn Observer<_>>, Arc::downgrade(&consumer.observer) as Weak<dyn Observer<_>>,
None, None,
None, None,
); );
@ -88,7 +95,8 @@ pub struct EndPoint<T> {
inner: SgxMutex<T>, inner: SgxMutex<T>,
state: Arc<State>, state: Arc<State>,
observer: Arc<WaiterQueueObserver<IoEvents>>, observer: Arc<WaiterQueueObserver<IoEvents>>,
notifier: IoNotifier, notifier: Arc<IoNotifier>,
peer_notifier: Weak<IoNotifier>,
is_nonblocking: AtomicBool, is_nonblocking: AtomicBool,
} }
@ -96,13 +104,15 @@ impl<T> EndPoint<T> {
fn new(inner: T, state: Arc<State>) -> Self { fn new(inner: T, state: Arc<State>) -> Self {
let inner = SgxMutex::new(inner); let inner = SgxMutex::new(inner);
let observer = WaiterQueueObserver::new(); let observer = WaiterQueueObserver::new();
let notifier = IoNotifier::new(); let notifier = Arc::new(IoNotifier::new());
let peer_notifier = Default::default();
let is_nonblocking = AtomicBool::new(false); let is_nonblocking = AtomicBool::new(false);
Self { Self {
inner, inner,
state, state,
observer, observer,
notifier, notifier,
peer_notifier,
is_nonblocking, is_nonblocking,
} }
} }
@ -131,6 +141,12 @@ impl<T> EndPoint<T> {
self.observer.waiter_queue().dequeue_and_wake_all(); self.observer.waiter_queue().dequeue_and_wake_all();
} }
} }
fn trigger_peer_events(&self, events: &IoEvents) {
if let Some(peer_notifier) = self.peer_notifier.upgrade() {
peer_notifier.broadcast(events);
}
}
} }
/// The state of a channel shared by the two endpoints of a channel. /// The state of a channel shared by the two endpoints of a channel.
@ -205,7 +221,7 @@ impl<I> Producer<I> {
item = match rb_producer.push(item) { item = match rb_producer.push(item) {
Ok(()) => { Ok(()) => {
drop(rb_producer); drop(rb_producer);
self.notifier.broadcast(&IoEvents::IN); self.trigger_peer_events(&IoEvents::IN);
return Ok(()); return Ok(());
} }
Err(item) => item, Err(item) => item,
@ -247,8 +263,8 @@ impl<I> Producer<I> {
self.state.set_producer_shutdown(); self.state.set_producer_shutdown();
} }
// Notify all consumers and other observers // The shutdown of the producer triggers hangup events on the consumer
self.notifier.broadcast(&IoEvents::HUP); self.trigger_peer_events(&IoEvents::HUP);
// Wake all threads that are blocked on pushing to this producer // Wake all threads that are blocked on pushing to this producer
self.observer.waiter_queue().dequeue_and_wake_all(); self.observer.waiter_queue().dequeue_and_wake_all();
} }
@ -274,7 +290,7 @@ impl<I: Copy> Producer<I> {
let count = rb_producer.push_slice(items); let count = rb_producer.push_slice(items);
if count > 0 { if count > 0 {
drop(rb_producer); drop(rb_producer);
self.notifier.broadcast(&IoEvents::IN); self.trigger_peer_events(&IoEvents::IN);
return Ok(count); return Ok(count);
} }
@ -301,7 +317,7 @@ impl<I> Consumer<I> {
if let Some(item) = rb_consumer.pop() { if let Some(item) = rb_consumer.pop() {
drop(rb_consumer); drop(rb_consumer);
self.notifier.broadcast(&IoEvents::OUT); self.trigger_peer_events(&IoEvents::OUT);
return Ok(Some(item)); return Ok(Some(item));
} }
@ -344,8 +360,8 @@ impl<I> Consumer<I> {
self.state.set_consumer_shutdown(); self.state.set_consumer_shutdown();
} }
// Notify all producers and other observers // The consumer being shutdown triggers error on the producer
self.notifier.broadcast(&IoEvents::RDHUP); self.trigger_peer_events(&IoEvents::ERR);
// Wake all threads that are blocked on popping from this consumer // Wake all threads that are blocked on popping from this consumer
self.observer.waiter_queue().dequeue_and_wake_all(); self.observer.waiter_queue().dequeue_and_wake_all();
} }
@ -371,7 +387,7 @@ impl<I: Copy> Consumer<I> {
let count = rb_consumer.pop_slice(items); let count = rb_consumer.pop_slice(items);
if count > 0 { if count > 0 {
drop(rb_consumer); drop(rb_consumer);
self.notifier.broadcast(&IoEvents::OUT); self.trigger_peer_events(&IoEvents::OUT);
return Ok(count); return Ok(count);
}; };