Add the event subsystem

An event can be anything ranging from the exit of a process (interesting
to `wait4`) to the arrival of a blocked signal (interesting to
`sigwaitinfo`), from the completion of a file operation (interesting to
`epoll`) to the change of a file status (interesting to `inotify`).

To meet the event-related demands from various subsystems, this event
subsystem is designed to provide a set of general-purpose primitives:

* `Waiter`, `Waker`, and `WaiterQueue` are primitives to put threads
to sleep and later wake them up.
* `Event`, `Observer`, and `Notifier` are primitives to handle and
broadcast events.
* `WaiterQueueObserver` implements the common pattern of waking up
threads once some interesting events happen.
This commit is contained in:
Tate, Hongliang Tian 2020-09-21 11:19:29 +00:00 committed by Zongmin.Gu
parent 1f6fc3d27a
commit 9bb1baef4e
16 changed files with 601 additions and 5 deletions

@ -190,6 +190,15 @@ enclave {
unsigned int initval, unsigned int initval,
int flags int flags
) propagate_errno; ) propagate_errno;
// Poll the host eventfd `eventfd` with an optional timeout.
// `timeout` is marshalled both into and out of the enclave ([in, out]), and
// the host's errno is propagated back to the enclave (propagate_errno).
int occlum_ocall_eventfd_poll(
int eventfd,
[in, out] struct timespec *timeout
) propagate_errno;
// Write `val` to a batch of host eventfds within a single OCall.
// `eventfds` is copied into untrusted memory as an array of `num_fds` ints.
void occlum_ocall_eventfd_write_batch(
[in, count=num_fds] int* eventfds,
size_t num_fds,
uint64_t val
);
int occlum_ocall_poll( int occlum_ocall_poll(
[in, out, count=nfds] struct pollfd *fds, [in, out, count=nfds] struct pollfd *fds,

@ -0,0 +1,7 @@
/// A trait to represent any event.
///
/// This is a marker trait without methods: an event type only has to be
/// `Copy`, `Send`, `Sync`, and `'static`.
pub trait Event: Copy + Clone + Send + Sync + 'static {}
/// A trait to filter events.
///
/// A filter decides, per event, whether the event is interesting.
pub trait EventFilter<E: Event>: Send + Sync + 'static {
/// Returns `true` if `event` passes the filter (i.e., is interesting) and
/// `false` if it should be skipped.
fn filter(&self, event: &E) -> bool;
}

@ -0,0 +1,111 @@
use std::time::Duration;
use crate::prelude::*;
use crate::time::{timespec_t, TIMERSLACK};
/// An eventfd that lives on the host side and is accessed via OCalls.
///
/// The file descriptor is created in `new` and closed when the value is
/// dropped.
pub struct HostEventFd {
// The host file descriptor of the eventfd.
host_fd: FileDesc,
}
impl HostEventFd {
    /// Create a new, non-blocking eventfd on the host with an initial value of 0.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `try_libc!` if the host fails to create
    /// the eventfd.
    pub fn new() -> Result<Self> {
        // Must match the host's value of the `EFD_NONBLOCK` flag.
        const EFD_NONBLOCK: i32 = 1 << 11;
        let host_fd = try_libc!({
            let mut ret: i32 = 0;
            let status = occlum_ocall_eventfd(&mut ret, 0, EFD_NONBLOCK);
            assert!(status == sgx_status_t::SGX_SUCCESS);
            ret
        }) as FileDesc;
        Ok(Self { host_fd })
    }

    /// Read the 8-byte counter value of the eventfd.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `try_libc!` if the host `read` fails.
    pub fn read_u64(&self) -> Result<u64> {
        let mut val: u64 = 0;
        let ret = try_libc!(libc::ocall::read(
            self.host_fd as c_int,
            &mut val as *mut _ as *mut c_void,
            std::mem::size_of::<u64>(),
        )) as usize;
        // A successful read from an eventfd always transfers exactly 8 bytes.
        debug_assert!(ret == std::mem::size_of::<u64>());
        Ok(val)
    }

    /// Add `val` to the counter of the eventfd.
    ///
    /// This is best-effort: the result of the host `write` is ignored.
    pub fn write_u64(&self, val: u64) {
        unsafe {
            libc::ocall::write(
                self.host_fd as c_int,
                &val as *const _ as *const c_void,
                std::mem::size_of::<u64>(),
            );
        }
    }

    /// Poll the eventfd on the host, optionally with a timeout.
    ///
    /// The caller's `timeout` is left untouched; use `poll_mut` to learn how
    /// much time remains.
    pub fn poll(&self, timeout: Option<&Duration>) -> Result<()> {
        let mut timeout = timeout.cloned();
        self.poll_mut(timeout.as_mut())
    }

    /// Poll the eventfd on the host, optionally with a timeout.
    ///
    /// If `timeout` is given, it is overwritten with the remaining time
    /// reported back by the OCall.
    ///
    /// # Panics
    ///
    /// Panics if the reported remaining time exceeds the requested timeout by
    /// more than the timer slack.
    pub fn poll_mut(&self, timeout: Option<&mut Duration>) -> Result<()> {
        match timeout {
            None => ocall_eventfd_poll(self.host_fd, std::ptr::null_mut()),
            Some(timeout) => {
                let mut remain_c = timespec_t::from(*timeout);
                let ret = ocall_eventfd_poll(self.host_fd, &mut remain_c);

                // Sanity-check the value written back by the untrusted host
                // before handing it to the caller.
                let remain = remain_c.as_duration();
                assert!(remain <= *timeout + TIMERSLACK.to_duration());
                *timeout = remain;

                ret
            }
        }
    }

    /// Write to all host eventfds in one OCall.
    ///
    /// # Safety
    ///
    /// Precondition. The caller must ensure that the host fds are valid.
    pub unsafe fn write_u64_raw_and_batch(host_fds: &[FileDesc], val: u64) {
        ocall_eventfd_write_batch(host_fds, val);
    }

    /// Return the raw host file descriptor of the eventfd.
    pub fn host_fd(&self) -> FileDesc {
        self.host_fd
    }
}
impl Drop for HostEventFd {
    /// Close the host eventfd when the wrapper goes out of scope.
    fn drop(&mut self) {
        let host_fd = self.host_fd as c_int;
        let close_ret = unsafe { libc::ocall::close(host_fd) };
        // Closing is only checked in debug builds.
        debug_assert!(close_ret == 0);
    }
}
/// Issue the eventfd-poll OCall for `host_fd`.
///
/// `timeout` may be null; otherwise it is passed to the OCall, which may
/// update it. A failure reported through `try_libc!` is returned as an error.
///
/// # Panics
///
/// Panics if the OCall itself does not complete with `SGX_SUCCESS`.
fn ocall_eventfd_poll(host_fd: FileDesc, timeout: *mut timespec_t) -> Result<()> {
    try_libc!({
        let mut retval = 0;
        let sgx_status = unsafe { occlum_ocall_eventfd_poll(&mut retval, host_fd, timeout) };
        assert!(sgx_status == sgx_status_t::SGX_SUCCESS);
        retval
    });
    Ok(())
}
/// Issue the OCall that writes `val` to every eventfd in `host_fds`.
///
/// # Panics
///
/// Panics if the OCall itself does not complete with `SGX_SUCCESS`.
fn ocall_eventfd_write_batch(host_fds: &[FileDesc], val: u64) {
    let fds_ptr = host_fds.as_ptr();
    let num_fds = host_fds.len();
    let sgx_status = unsafe { occlum_ocall_eventfd_write_batch(fds_ptr, num_fds, val) };
    assert!(sgx_status == sgx_status_t::SGX_SUCCESS);
}
// Declarations of the eventfd-related OCalls.
//
// For the OCalls that take a `ret` pointer, the OCall's own return value is
// stored through `ret`, while the returned `sgx_status_t` tells whether the
// OCall itself was carried out.
extern "C" {
// Create an eventfd on the host with the given initial value and flags.
fn occlum_ocall_eventfd(ret: *mut i32, init_val: u32, flags: i32) -> sgx_status_t;
// Poll a host eventfd; `timeout` may be null.
fn occlum_ocall_eventfd_poll(
ret: *mut i32,
fd: FileDesc,
timeout: *mut timespec_t,
) -> sgx_status_t;
// Write `val` to each of the `num_fds` host eventfds pointed to by `fds`.
fn occlum_ocall_eventfd_write_batch(
fds: *const FileDesc,
num_fds: usize,
val: u64,
) -> sgx_status_t;
}

@ -0,0 +1,32 @@
//! The event subsystem.
//!
//! An event can be anything ranging from the exit of a process (interesting
//! to `wait4`) to the arrival of a blocked signal (interesting to `sigwaitinfo`),
//! from the completion of a file operation (interesting to `epoll`) to the change
//! of a file status (interesting to `inotify`).
//!
//! To meet the event-related demands from various subsystems, this event
//! subsystem is designed to provide a set of general-purpose primitives:
//!
//! * `Waiter`, `Waker`, and `WaiterQueue` are primitives to put threads to sleep
//! and later wake them up.
//! * `Event`, `Observer`, and `Notifier` are primitives to handle and broadcast
//! events.
//! * `WaiterQueueObserver` implements the common pattern of waking up threads
//! once some interesting events happen.
mod event;
mod host_event_fd;
mod notifier;
mod observer;
mod waiter;
mod waiter_queue;
mod waiter_queue_observer;
pub use self::event::{Event, EventFilter};
pub use self::host_event_fd::HostEventFd;
pub use self::notifier::Notifier;
pub use self::observer::Observer;
pub use self::waiter::{Waiter, Waker};
pub use self::waiter_queue::WaiterQueue;
pub use self::waiter_queue_observer::WaiterQueueObserver;

@ -0,0 +1,75 @@
use std::any::Any;
use std::marker::PhantomData;
use std::sync::Weak;
use super::{Event, EventFilter, Observer};
use crate::prelude::*;
/// An event notifier broadcasts interesting events to registered observers.
///
/// `E` is the type of events being broadcast. `F` is the type of the optional
/// per-subscriber event filter and defaults to `DummyEventFilter<E>`.
pub struct Notifier<E: Event, F: EventFilter<E> = DummyEventFilter<E>> {
// The registered subscribers, kept in registration order.
subscribers: SgxMutex<VecDeque<Subscriber<E, F>>>,
}
/// A registered observer together with its optional filter and metadata.
struct Subscriber<E: Event, F: EventFilter<E>> {
// A weak reference, so that the notifier does not keep the observer alive.
observer: Weak<dyn Observer<E>>,
// If present, only events that pass the filter are delivered.
filter: Option<F>,
// Opaque data handed back to the observer on every delivery.
metadata: Option<Box<dyn Any + Send + Sync>>,
}
impl<E: Event, F: EventFilter<E>> Notifier<E, F> {
    /// Create an event notifier.
    pub fn new() -> Self {
        let subscribers = SgxMutex::new(VecDeque::new());
        Self { subscribers }
    }

    /// Register an observer with its interesting events and metadata.
    ///
    /// If `filter` is `None`, the observer receives every broadcast event.
    /// The `metadata` is passed back to the observer on every delivery.
    pub fn register(
        &self,
        observer: Weak<dyn Observer<E>>,
        filter: Option<F>,
        metadata: Option<Box<dyn Any + Send + Sync>>,
    ) {
        let mut subscribers = self.subscribers.lock().unwrap();
        subscribers.push_back(Subscriber {
            observer,
            filter,
            metadata,
        });
    }

    /// Unregister an observer.
    ///
    /// All subscriptions whose observer points to the same allocation as
    /// `observer` are removed.
    pub fn unregister(&self, observer: &Weak<dyn Observer<E>>) {
        let mut subscribers = self.subscribers.lock().unwrap();
        subscribers.retain(|subscriber| !Weak::ptr_eq(&subscriber.observer, observer));
    }

    /// Broadcast an event to all registered observers.
    ///
    /// Observers are notified in registration order while the subscriber list
    /// is locked. Subscribers whose filter rejects the event or whose observer
    /// has already been freed are skipped.
    pub fn broadcast(&self, event: &E) {
        let subscribers = self.subscribers.lock().unwrap();
        for subscriber in subscribers.iter() {
            if let Some(filter) = subscriber.filter.as_ref() {
                if !filter.filter(event) {
                    continue;
                }
            }

            let observer = match subscriber.observer.upgrade() {
                // A freed observer must not prevent the remaining subscribers
                // from receiving the event, so skip it rather than return.
                // TODO: should remove subscribers whose observers have been freed
                None => continue,
                Some(observer) => observer,
            };

            observer.on_event(event, &subscriber.metadata);
        }
    }
}
/// An event filter that lets every event through.
///
/// It serves as the default filter type of `Notifier`.
pub struct DummyEventFilter<E> {
// Ties the filter to the event type `E` without storing an `E`.
phantom: PhantomData<E>,
}
impl<E: Event> EventFilter<E> for DummyEventFilter<E> {
    /// Accept every event unconditionally.
    fn filter(&self, _event: &E) -> bool {
        true
    }
}

@ -0,0 +1,19 @@
use std::any::Any;
use super::Event;
use crate::prelude::*;
/// An observer receives events from the notifiers to which it has registered.
pub trait Observer<E: Event>: Send + Sync {
/// The callback that will be executed when some interesting events are
/// delivered by a notifier to this observer.
///
/// The `metadata` argument is the metadata that was supplied when the
/// observer was registered with the notifier.
///
/// Note that it is important to keep this method simple, short, and
/// non-blocking. This is because the caller of this function, which is most
/// likely to be `Notifier::broadcast`, may have acquired the locks of some
/// resources. In general, these locks may coincide with the ones required
/// by a specific implementation of `Observer::on_event`. Thus, to avoid
/// the odds of deadlocks, the `on_event` method should be written short
/// and sweet.
fn on_event(&self, event: &E, metadata: &Option<Box<dyn Any + Send + Sync>>) -> ();
}

@ -0,0 +1,172 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Weak;
use std::time::Duration;
use super::host_event_fd::HostEventFd;
use crate::prelude::*;
/// A waiter enables a thread to sleep.
pub struct Waiter {
// State shared with the `Waker`s created from this waiter.
inner: Arc<Inner>,
}
impl Waiter {
/// Create a waiter for the current thread.
///
/// A `Waiter` is bound to the current thread that creates it: it cannot be
/// sent to or used by any other threads as the type implements `!Send` and
/// `!Sync` traits. Thus, a `Waiter` can only put the current thread to sleep.
pub fn new() -> Self {
Self {
inner: Arc::new(Inner::new()),
}
}
/// Return whether a waiter has been woken up.
///
/// Once a waiter is woken up, the `wait` or `wait_mut` method becomes
/// non-blocking.
pub fn is_woken(&self) -> bool {
self.inner.is_woken()
}
/// Reset a waiter.
///
/// After a `Waiter` has been woken up, the `reset` method must be called so
/// that the `Waiter` can use the `wait` or `wait_mut` methods to sleep the
/// current thread again.
pub fn reset(&self) {
self.inner.reset();
}
/// Put the current thread to sleep until being woken up by a waker.
///
/// The method has three possible return values:
/// 1. `Ok(())`: The `Waiter` has been woken up by one of its `Waker`s;
/// 2. `Err(e) if e.errno() == Errno::ETIMEDOUT`: Timeout.
/// 3. `Err(e) if e.errno() == Errno::EINTR`: Interrupted by a signal.
///
/// If the `timeout` argument is `None`, then the second case won't happen,
/// i.e., the method will block indefinitely.
pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> {
self.inner.wait(timeout)
}
/// Put the current thread to sleep until being woken up by a waker.
///
/// This method is similar to the `wait` method except that the `timeout`
/// argument will be updated to reflect the remaining timeout.
pub fn wait_mut(&self, timeout: Option<&mut Duration>) -> Result<()> {
self.inner.wait_mut(timeout)
}
/// Create a waker that can wake up this waiter.
///
/// `WaiterQueue` maintains a list of `Waker`s internally to wake up the
/// enqueued `Waiter`s. So, for users that use `WaiterQueue`, this method
/// does not need to be called manually.
pub fn waker(&self) -> Waker {
Waker {
inner: Arc::downgrade(&self.inner),
}
}
}
// Opt out of `Send` and `Sync` so that a `Waiter` can neither be moved to nor
// shared with a thread other than the one that created it.
impl !Send for Waiter {}
impl !Sync for Waiter {}
/// A waker can wake up the thread that its waiter has put to sleep.
pub struct Waker {
// A weak reference to the waiter's state, so that a waker does not keep
// the state of its waiter alive.
inner: Weak<Inner>,
}
impl Waker {
    /// Wake up the waiter from which this waker was created.
    ///
    /// This is a no-op if the waiter no longer exists.
    pub fn wake(&self) {
        match self.inner.upgrade() {
            Some(waiter_inner) => waiter_inner.wake(),
            None => {}
        }
    }

    /// Wake up a batch of waiters at once, which is more efficient than
    /// waking them up one by one.
    pub fn batch_wake<'a, I: Iterator<Item = &'a Waker>>(iter: I) {
        Inner::batch_wake(iter)
    }
}
/// The state shared between a `Waiter` and its `Waker`s.
struct Inner {
// Whether the waiter has been woken up since the last reset.
is_woken: AtomicBool,
// The host eventfd that the waiter polls while sleeping; it is obtained
// from the thread that creates the waiter.
host_eventfd: Arc<HostEventFd>,
}
impl Inner {
    /// Create the shared state for a waiter of the current thread.
    ///
    /// The state starts as not woken and uses the host eventfd of the current
    /// thread.
    pub fn new() -> Self {
        let is_woken = AtomicBool::new(false);
        let host_eventfd = current!().host_eventfd().clone();
        Self {
            is_woken,
            host_eventfd,
        }
    }

    /// Return whether the waiter has been woken up since the last reset.
    pub fn is_woken(&self) -> bool {
        self.is_woken.load(Ordering::SeqCst)
    }

    /// Clear the woken flag so that the waiter can sleep again.
    pub fn reset(&self) {
        self.is_woken.store(false, Ordering::SeqCst);
    }

    /// Block until the woken flag is set or polling the eventfd fails.
    ///
    /// NOTE(review): the same `timeout` is handed to every `poll` call, so a
    /// `poll` that returns without the flag being set restarts the full
    /// timeout. Confirm whether this is intended.
    pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> {
        while !self.is_woken() {
            self.host_eventfd.poll(timeout)?;
        }
        Ok(())
    }

    /// Same as `wait`, but writes the remaining time back into `timeout`.
    pub fn wait_mut(&self, timeout: Option<&mut Duration>) -> Result<()> {
        let mut remain = timeout.as_ref().map(|d| **d);
        // Need to change timeout from `Option<&mut Duration>` to `&mut Option<Duration>`
        // so that the Rust compiler is happy about using the variable in a loop.
        let ret = self.do_wait_mut(&mut remain);
        if let (Some(timeout), Some(remain)) = (timeout, remain) {
            *timeout = remain;
        }
        ret
    }

    /// Poll the eventfd repeatedly until the woken flag is set, updating
    /// `remain` after every poll.
    fn do_wait_mut(&self, remain: &mut Option<Duration>) -> Result<()> {
        while !self.is_woken() {
            self.host_eventfd.poll_mut(remain.as_mut())?;
        }
        Ok(())
    }

    /// Set the woken flag and, if it was not set before, notify the eventfd.
    pub fn wake(&self) {
        if self.try_mark_woken() {
            self.host_eventfd.write_u64(1);
        }
    }

    /// Wake up all waiters reachable through the given wakers with at most
    /// one OCall.
    ///
    /// Wakers whose waiter is gone or has already been woken are skipped.
    pub fn batch_wake<'a, I: Iterator<Item = &'a Waker>>(iter: I) {
        // Hold on to the upgraded references until the OCall is done so that
        // the eventfds they own cannot be closed in the meantime.
        let to_wake: Vec<Arc<Inner>> = iter
            .filter_map(|waker| waker.inner.upgrade())
            .filter(|inner| inner.try_mark_woken())
            .collect();
        // Nothing to notify; save the cost of an OCall.
        if to_wake.is_empty() {
            return;
        }

        let host_eventfds: Vec<FileDesc> = to_wake
            .iter()
            .map(|inner| inner.host_eventfd.host_fd())
            .collect();
        // SAFETY: every fd belongs to a `HostEventFd` that is kept alive by
        // `to_wake` until after the call returns.
        unsafe {
            HostEventFd::write_u64_raw_and_batch(&host_eventfds, 1);
        }
    }

    /// Atomically set the woken flag; return `true` if this call changed it
    /// from `false` to `true`.
    fn try_mark_woken(&self) -> bool {
        self.is_woken
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
    }
}

@ -0,0 +1,76 @@
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use super::{Waiter, Waker};
use crate::prelude::*;
/// A queue for waiters.
///
/// By using this queue, we can wake up threads in their waiters' enqueue order.
///
/// While the queue is conceptually for `Waiter`s, it internally maintains a list
/// of `Waker`s.
pub struct WaiterQueue {
// The number of enqueued wakers; it lets `is_empty` avoid taking the lock.
count: AtomicUsize,
// The wakers of the enqueued waiters, in enqueue order.
wakers: SgxMutex<VecDeque<Waker>>,
}
impl WaiterQueue {
    /// Creates an empty queue for `Waiter`s.
    pub fn new() -> Self {
        let count = AtomicUsize::new(0);
        let wakers = SgxMutex::new(VecDeque::new());
        Self { count, wakers }
    }

    /// Returns whether the queue is empty.
    ///
    /// Only the atomic counter is consulted; the lock is not taken.
    pub fn is_empty(&self) -> bool {
        let num_enqueued = self.count.load(Ordering::SeqCst);
        num_enqueued == 0
    }

    /// Reset a waiter and enqueue it.
    ///
    /// It is allowed to enqueue a waiter more than once before it is dequeued.
    /// But this is usually not a good idea. It is the callers' responsibility
    /// to use the API properly.
    pub fn reset_and_enqueue(&self, waiter: &Waiter) {
        waiter.reset();

        let mut queue = self.wakers.lock().unwrap();
        self.count.fetch_add(1, Ordering::SeqCst);
        queue.push_back(waiter.waker());
    }

    /// Dequeue a waiter and wake up its thread.
    ///
    /// Returns the number of dequeued waiters.
    pub fn dequeue_and_wake_one(&self) -> usize {
        self.dequeue_and_wake_nr(1)
    }

    /// Dequeue all waiters and wake up their threads.
    ///
    /// Returns the number of dequeued waiters.
    pub fn dequeue_and_wake_all(&self) -> usize {
        self.dequeue_and_wake_nr(usize::MAX)
    }

    /// Dequeue a maximum number of waiters and wake up their threads.
    ///
    /// Returns the number of dequeued waiters, which is at most `max_count`.
    pub fn dequeue_and_wake_nr(&self, max_count: usize) -> usize {
        // Fast path: nothing is enqueued, so there is no need to lock.
        if self.is_empty() {
            return 0;
        }

        // Take the wakers out of the queue while holding the lock...
        let mut queue = self.wakers.lock().unwrap();
        let num_to_wake = std::cmp::min(max_count, queue.len());
        let dequeued = queue.drain(..num_to_wake).collect::<Vec<Waker>>();
        self.count.fetch_sub(dequeued.len(), Ordering::SeqCst);
        drop(queue);

        // ...and wake them in one batch after the lock has been released.
        Waker::batch_wake(dequeued.iter());
        dequeued.len()
    }
}

@ -0,0 +1,35 @@
use std::any::Any;
use std::marker::PhantomData;
use super::{Event, Observer, WaiterQueue};
use crate::prelude::*;
/// An `Observer` associated with a `WaiterQueue`.
///
/// Once the observer receives any interesting events, it will dequeue and
/// wake up all `Waiter`s in the associated `WaiterQueue`.
pub struct WaiterQueueObserver<E: Event> {
// The queue whose waiters are woken up on every received event.
waiter_queue: WaiterQueue,
// Ties the observer to the event type `E` without storing an `E`.
phantom: PhantomData<E>,
}
impl<E: Event> WaiterQueueObserver<E> {
    /// Create a reference-counted observer with an empty waiter queue.
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            waiter_queue: WaiterQueue::new(),
            phantom: PhantomData,
        })
    }

    /// Borrow the waiter queue associated with this observer.
    pub fn waiter_queue(&self) -> &WaiterQueue {
        let queue = &self.waiter_queue;
        queue
    }
}
impl<E: Event> Observer<E> for WaiterQueueObserver<E> {
    /// Wake up every waiter in the associated queue, regardless of which
    /// event was received or which metadata was registered.
    fn on_event(&self, _event: &E, _metadata: &Option<Box<dyn Any + Send + Sync>>) {
        let _num_dequeued = self.waiter_queue.dequeue_and_wake_all();
    }
}

@ -62,6 +62,7 @@ mod error;
mod config; mod config;
mod entry; mod entry;
mod events;
mod exception; mod exception;
mod fs; mod fs;
mod interrupt; mod interrupt;

@ -4,6 +4,7 @@ use super::{
FileTableRef, FsViewRef, ProcessRef, ProcessVM, ProcessVMRef, ResourceLimitsRef, SchedAgentRef, FileTableRef, FsViewRef, ProcessRef, ProcessVM, ProcessVMRef, ResourceLimitsRef, SchedAgentRef,
SigQueues, SigSet, Task, Thread, ThreadId, ThreadInner, ThreadName, ThreadRef, SigQueues, SigSet, Task, Thread, ThreadId, ThreadInner, ThreadName, ThreadRef,
}; };
use crate::events::HostEventFd;
use crate::prelude::*; use crate::prelude::*;
use crate::time::ThreadProfiler; use crate::time::ThreadProfiler;
@ -116,6 +117,7 @@ impl ThreadBuilder {
} else { } else {
SgxMutex::new(None) SgxMutex::new(None)
}; };
let host_eventfd = Arc::new(HostEventFd::new()?);
let new_thread = Arc::new(Thread { let new_thread = Arc::new(Thread {
task, task,
@ -134,6 +136,7 @@ impl ThreadBuilder {
sig_tmp_mask, sig_tmp_mask,
sig_stack, sig_stack,
profiler, profiler,
host_eventfd,
}); });
let mut inner = new_thread.process().inner(); let mut inner = new_thread.process().inner();

@ -6,6 +6,7 @@ use super::{
FileTableRef, ForcedExitStatus, FsViewRef, ProcessRef, ProcessVM, ProcessVMRef, FileTableRef, ForcedExitStatus, FsViewRef, ProcessRef, ProcessVM, ProcessVMRef,
ResourceLimitsRef, SchedAgentRef, TermStatus, ThreadRef, ResourceLimitsRef, SchedAgentRef, TermStatus, ThreadRef,
}; };
use crate::events::HostEventFd;
use crate::fs::{EventCreationFlags, EventFile}; use crate::fs::{EventCreationFlags, EventFile};
use crate::net::THREAD_NOTIFIERS; use crate::net::THREAD_NOTIFIERS;
use crate::prelude::*; use crate::prelude::*;
@ -44,6 +45,8 @@ pub struct Thread {
sig_stack: SgxMutex<Option<SigStack>>, sig_stack: SgxMutex<Option<SigStack>>,
// System call timing // System call timing
profiler: SgxMutex<Option<ThreadProfiler>>, profiler: SgxMutex<Option<ThreadProfiler>>,
// Misc
host_eventfd: Arc<HostEventFd>,
} }
#[derive(Debug, PartialEq, Clone, Copy)] #[derive(Debug, PartialEq, Clone, Copy)]
@ -144,6 +147,10 @@ impl Thread {
*self.name.write().unwrap() = new_name; *self.name.write().unwrap() = new_name;
} }
/// Get the host eventfd that belongs to this thread.
pub fn host_eventfd(&self) -> &Arc<HostEventFd> {
&self.host_eventfd
}
pub(super) fn start(&self, host_tid: pid_t) { pub(super) fn start(&self, host_tid: pid_t) {
self.sched().lock().unwrap().attach(host_tid); self.sched().lock().unwrap().attach(host_tid);
self.inner().start(); self.inner().start();

@ -13,6 +13,7 @@ pub mod timer_slack;
pub mod up_time; pub mod up_time;
pub use profiler::ThreadProfiler; pub use profiler::ThreadProfiler;
pub use timer_slack::TIMERSLACK;
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
pub type time_t = i64; pub type time_t = i64;
@ -73,6 +74,15 @@ pub struct timespec_t {
nsec: i64, nsec: i64,
} }
impl From<Duration> for timespec_t {
    /// Convert a `Duration` into a `timespec_t` made of whole seconds and the
    /// sub-second nanoseconds.
    fn from(duration: Duration) -> timespec_t {
        let whole_secs = duration.as_secs() as time_t;
        // The sub-second part is a `u32`, so it is never negative; only the
        // seconds need to be checked.
        debug_assert!(whole_secs >= 0);
        timespec_t {
            sec: whole_secs,
            nsec: i64::from(duration.subsec_nanos()),
        }
    }
}
impl timespec_t { impl timespec_t {
pub fn from_raw_ptr(ptr: *const timespec_t) -> Result<timespec_t> { pub fn from_raw_ptr(ptr: *const timespec_t) -> Result<timespec_t> {
let ts = unsafe { *ptr }; let ts = unsafe { *ptr };

@ -0,0 +1,43 @@
#define _GNU_SOURCE
#include "ocalls.h"
#include <errno.h>
#include <signal.h>
#include <poll.h>
#include <unistd.h>
#include <sys/eventfd.h>
// Create an eventfd on the host. The arguments and the return value are those
// of eventfd().
int occlum_ocall_eventfd(unsigned int initval, int flags) {
return eventfd(initval, flags);
}
// Wait until the eventfd becomes readable or the timeout expires.
//
// `timeout` may be NULL to wait indefinitely. Otherwise, it is updated to the
// time that was left when the wait ended.
//
// Returns 0 if the eventfd became readable, in which case its counter is
// consumed. Returns -1 with errno set if the wait failed; a timeout is
// reported as -1 with errno set to ETIMEDOUT.
int occlum_ocall_eventfd_poll(int eventfd, struct timespec *timeout) {
    int ret;

    struct pollfd pollfds[1];
    pollfds[0].fd = eventfd;
    pollfds[0].events = POLLIN;
    pollfds[0].revents = 0;

    // We use the ppoll syscall directly instead of the libc wrapper. This
    // is because the syscall version updates the timeout argument to indicate
    // how much time was left (which what we want), while the libc wrapper
    // keeps the timeout argument unchanged.
    ret = raw_ppoll(pollfds, 1, timeout);
    if (ret < 0) {
        return -1;
    }
    if (ret == 0) {
        // No fd became ready, i.e., the timeout expired. Tell the caller so
        // that it does not mistake the timeout for a wakeup.
        errno = ETIMEDOUT;
        return -1;
    }

    // Consume the counter so that the next poll blocks again. The eventfd is
    // used purely as a wakeup signal, so the value itself is not needed.
    char buf[8];
    read(eventfd, buf, 8);
    return 0;
}
// Write `val` to each of the `num_fds` eventfds in `eventfds`.
//
// This is best-effort: the result of each write is ignored.
void occlum_ocall_eventfd_write_batch(
    int *eventfds,
    size_t num_fds,
    uint64_t val
) {
    // Use an index of the same type as `num_fds` to avoid a signed/unsigned
    // comparison and an overflow of the index for large counts.
    for (size_t fd_i = 0; fd_i < num_fds; fd_i++) {
        write(eventfds[fd_i], &val, sizeof(val));
    }
}

@ -2,17 +2,12 @@
#include <errno.h> #include <errno.h>
#include <net/if.h> #include <net/if.h>
#include <unistd.h> #include <unistd.h>
#include <sys/eventfd.h>
#include <sys/ioctl.h> #include <sys/ioctl.h>
void occlum_ocall_sync(void) { void occlum_ocall_sync(void) {
sync(); sync();
} }
int occlum_ocall_eventfd(unsigned int initval, int flags) {
return eventfd(initval, flags);
}
int occlum_ocall_ioctl_repack(int fd, int request, char *buf, int len, int *recv_len) { int occlum_ocall_ioctl_repack(int fd, int request, char *buf, int len, int *recv_len) {
int ret = 0; int ret = 0;

@ -12,5 +12,6 @@
#define tgkill(tgid, tid, signum) ((int)syscall(__NR_tgkill, (tgid), (tid), (signum))); #define tgkill(tgid, tid, signum) ((int)syscall(__NR_tgkill, (tgid), (tid), (signum)));
#define futex_wait(addr, val, timeout) ((int)syscall(__NR_futex, (addr), FUTEX_WAIT, (val), (timeout))) #define futex_wait(addr, val, timeout) ((int)syscall(__NR_futex, (addr), FUTEX_WAIT, (val), (timeout)))
#define futex_wake(addr) ((int)syscall(__NR_futex, (addr), FUTEX_WAKE, 1)) #define futex_wake(addr) ((int)syscall(__NR_futex, (addr), FUTEX_WAKE, 1))
/* Invoke the ppoll syscall directly, bypassing the libc wrapper, with a NULL signal mask. */
#define raw_ppoll(fds, nfds, timeout) ((int)syscall(__NR_ppoll, (fds), (nfds), (timeout), NULL, 0))
#endif /* __PAL_SYSCALL_H__ */ #endif /* __PAL_SYSCALL_H__ */