diff --git a/src/Enclave.edl b/src/Enclave.edl
index 4b4c7a7e..1fba2881 100644
--- a/src/Enclave.edl
+++ b/src/Enclave.edl
@@ -190,6 +190,15 @@ enclave {
             unsigned int initval,
             int flags
         ) propagate_errno;
+        int occlum_ocall_eventfd_poll(
+            int eventfd,
+            [in, out] struct timespec *timeout
+        ) propagate_errno;
+        void occlum_ocall_eventfd_write_batch(
+            [in, count=num_fds] int* eventfds,
+            size_t num_fds,
+            uint64_t val
+        );
 
         int occlum_ocall_poll(
             [in, out, count=nfds] struct pollfd *fds,
diff --git a/src/libos/src/events/event.rs b/src/libos/src/events/event.rs
new file mode 100644
index 00000000..f012e020
--- /dev/null
+++ b/src/libos/src/events/event.rs
@@ -0,0 +1,7 @@
+/// A trait to represent any event.
+pub trait Event: Copy + Clone + Send + Sync + 'static {}
+
+/// A trait to filter events.
+pub trait EventFilter<E: Event>: Send + Sync + 'static {
+    fn filter(&self, event: &E) -> bool;
+}
diff --git a/src/libos/src/events/host_event_fd.rs b/src/libos/src/events/host_event_fd.rs
new file mode 100644
index 00000000..aff80a19
--- /dev/null
+++ b/src/libos/src/events/host_event_fd.rs
@@ -0,0 +1,111 @@
+use std::time::Duration;
+
+use crate::prelude::*;
+use crate::time::{timespec_t, TIMERSLACK};
+
+pub struct HostEventFd {
+    host_fd: FileDesc,
+}
+
+impl HostEventFd {
+    pub fn new() -> Result<Self> {
+        const EFD_NONBLOCK: i32 = 1 << 11;
+        let host_fd = try_libc!({
+            let mut ret: i32 = 0;
+            let status = unsafe { occlum_ocall_eventfd(&mut ret, 0, EFD_NONBLOCK) };
+            assert!(status == sgx_status_t::SGX_SUCCESS);
+            ret
+        }) as FileDesc;
+        Ok(Self { host_fd })
+    }
+
+    pub fn read_u64(&self) -> Result<u64> {
+        let mut val: u64 = 0;
+        let ret = try_libc!(libc::ocall::read(
+            self.host_fd as c_int,
+            &mut val as *mut _ as *mut c_void,
+            std::mem::size_of::<u64>(),
+        )) as usize;
+        debug_assert!(ret == std::mem::size_of::<u64>());
+        Ok(val)
+    }
+
+    pub fn write_u64(&self, val: u64) {
+        unsafe {
+            libc::ocall::write(
+                self.host_fd as c_int,
+                &val as *const _ as *const c_void,
+                std::mem::size_of::<u64>(),
+            );
+        }
+    }
+
+    pub fn poll(&self, timeout: Option<&Duration>) -> Result<()> {
+        let mut timeout = timeout.cloned();
+        self.poll_mut(timeout.as_mut())
+    }
+
+    pub fn poll_mut(&self, timeout: Option<&mut Duration>) -> Result<()> {
+        match timeout {
+            None => ocall_eventfd_poll(self.host_fd, std::ptr::null_mut()),
+            Some(timeout) => {
+                let mut remain_c = timespec_t::from(*timeout);
+                let ret = ocall_eventfd_poll(self.host_fd, &mut remain_c);
+
+                let remain = remain_c.as_duration();
+                assert!(remain <= *timeout + TIMERSLACK.to_duration());
+                *timeout = remain;
+
+                ret
+            }
+        }
+    }
+
+    /// Write to all host eventfds in one OCall.
+    ///
+    /// Precondition. The caller must ensure that the host fds are valid.
+    pub unsafe fn write_u64_raw_and_batch(host_fds: &[FileDesc], val: u64) {
+        ocall_eventfd_write_batch(host_fds, val);
+    }
+
+    pub fn host_fd(&self) -> FileDesc {
+        self.host_fd
+    }
+}
+
+impl Drop for HostEventFd {
+    fn drop(&mut self) {
+        let ret = unsafe { libc::ocall::close(self.host_fd as c_int) };
+        debug_assert!(ret == 0);
+    }
+}
+
+fn ocall_eventfd_poll(host_fd: FileDesc, timeout: *mut timespec_t) -> Result<()> {
+    try_libc!({
+        let mut ret = 0;
+        let status = unsafe { occlum_ocall_eventfd_poll(&mut ret, host_fd, timeout) };
+        assert!(status == sgx_status_t::SGX_SUCCESS);
+        ret
+    });
+    Ok(())
+}
+
+fn ocall_eventfd_write_batch(host_fds: &[FileDesc], val: u64) {
+    let status =
+        unsafe { occlum_ocall_eventfd_write_batch(host_fds.as_ptr(), host_fds.len(), val) };
+    assert!(status == sgx_status_t::SGX_SUCCESS);
+}
+
+extern "C" {
+    fn occlum_ocall_eventfd(ret: *mut i32, init_val: u32, flags: i32) -> sgx_status_t;
+    fn occlum_ocall_eventfd_poll(
+        ret: *mut i32,
+        fd: FileDesc,
+        timeout: *mut timespec_t,
+    ) -> sgx_status_t;
+    fn occlum_ocall_eventfd_write_batch(
+        fds: *const FileDesc,
+        num_fds: usize,
+        val: u64,
+    ) -> sgx_status_t;
+}
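The per-thread host eventfd above is the only sleep/wake primitive that crosses the enclave boundary. A minimal usage sketch follows; it is illustrative only: `do_timed_wait` and `do_wake` are made-up helper names, and the code assumes the LibOS crate context where `Result`, the prelude, and the OCall bridge exist.

    use std::time::Duration;

    // Illustrative sketch: a timed wait plus a wake-up on a HostEventFd.
    fn do_timed_wait(event: &HostEventFd) -> Result<Duration> {
        let mut remain = Duration::from_millis(100); // time budget for this wait
        event.poll_mut(Some(&mut remain))?;          // OCall: ppoll on the host eventfd
        Ok(remain)                                   // the unused part of the budget
    }

    fn do_wake(event: &HostEventFd) {
        event.write_u64(1);                          // makes the host eventfd readable
    }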
diff --git a/src/libos/src/events/mod.rs b/src/libos/src/events/mod.rs
new file mode 100644
index 00000000..099ca50c
--- /dev/null
+++ b/src/libos/src/events/mod.rs
@@ -0,0 +1,32 @@
+//! The event subsystem.
+//!
+//! An event can be anything ranging from the exit of a process (interesting
+//! to `wait4`) to the arrival of a blocked signal (interesting to `sigwaitinfo`),
+//! from the completion of a file operation (interesting to `epoll`) to the change
+//! of a file status (interesting to `inotify`).
+//!
+//! To meet the event-related demands from various subsystems, this event
+//! subsystem is designed to provide a set of general-purpose primitives:
+//!
+//! * `Waiter`, `Waker`, and `WaiterQueue` are primitives to put threads to sleep
+//! and later wake them up.
+//! * `Event`, `Observer`, and `Notifier` are primitives to handle and broadcast
+//! events.
+//! * `WaiterQueueObserver` implements the common pattern of waking up threads
+//! once some interesting events happen.
+
+mod event;
+mod host_event_fd;
+mod notifier;
+mod observer;
+mod waiter;
+mod waiter_queue;
+mod waiter_queue_observer;
+
+pub use self::event::{Event, EventFilter};
+pub use self::host_event_fd::HostEventFd;
+pub use self::notifier::Notifier;
+pub use self::observer::Observer;
+pub use self::waiter::{Waiter, Waker};
+pub use self::waiter_queue::WaiterQueue;
+pub use self::waiter_queue_observer::WaiterQueueObserver;
diff --git a/src/libos/src/events/notifier.rs b/src/libos/src/events/notifier.rs
new file mode 100644
index 00000000..9e51249d
--- /dev/null
+++ b/src/libos/src/events/notifier.rs
@@ -0,0 +1,75 @@
+use std::any::Any;
+use std::marker::PhantomData;
+use std::sync::Weak;
+
+use super::{Event, EventFilter, Observer};
+use crate::prelude::*;
+
+/// An event notifier broadcasts interesting events to registered observers.
+pub struct Notifier<E: Event, F: EventFilter<E> = DummyEventFilter<E>> {
+    subscribers: SgxMutex<VecDeque<Subscriber<E, F>>>,
+}
+
+struct Subscriber<E: Event, F: EventFilter<E>> {
+    observer: Weak<dyn Observer<E>>,
+    filter: Option<F>,
+    metadata: Option<Box<dyn Any + Send + Sync>>,
+}
+
+impl<E: Event, F: EventFilter<E>> Notifier<E, F> {
+    /// Create an event notifier.
+    pub fn new() -> Self {
+        let subscribers = SgxMutex::new(VecDeque::new());
+        Self { subscribers }
+    }
+
+    /// Register an observer with its interesting events and metadata.
+    pub fn register(
+        &self,
+        observer: Weak<dyn Observer<E>>,
+        filter: Option<F>,
+        metadata: Option<Box<dyn Any + Send + Sync>>,
+    ) {
+        let mut subscribers = self.subscribers.lock().unwrap();
+        subscribers.push_back(Subscriber {
+            observer,
+            filter,
+            metadata,
+        });
+    }
+
+    /// Unregister an observer.
+    pub fn unregister(&self, observer: &Weak<dyn Observer<E>>) {
+        let mut subscribers = self.subscribers.lock().unwrap();
+        subscribers.retain(|subscriber| !Weak::ptr_eq(&subscriber.observer, observer));
+    }
+
+    /// Broadcast an event to all registered observers.
+    pub fn broadcast(&self, event: &E) {
+        let subscribers = self.subscribers.lock().unwrap();
+        for subscriber in subscribers.iter() {
+            if let Some(filter) = subscriber.filter.as_ref() {
+                if !filter.filter(event) {
+                    continue;
+                }
+            }
+            let observer = match subscriber.observer.upgrade() {
+                // TODO: should remove subscribers whose observers have been freed
+                None => continue,
+                Some(observer) => observer,
+            };
+
+            observer.on_event(event, &subscriber.metadata);
+        }
+    }
+}
+
+pub struct DummyEventFilter<E> {
+    phantom: PhantomData<E>,
+}
+
+impl<E: Event> EventFilter<E> for DummyEventFilter<E> {
+    fn filter(&self, event: &E) -> bool {
+        true
+    }
+}
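To make the trait bounds concrete, here is a sketch of a custom event and filter. The `FdEvent` and `SingleFdFilter` types are hypothetical and not part of this patch; only the `Event`, `EventFilter`, and `Notifier` APIs are.

    // Hypothetical event type: just enough state to satisfy Event's bounds.
    #[derive(Copy, Clone)]
    struct FdEvent {
        fd: u32,
    }
    impl Event for FdEvent {}

    // Hypothetical filter: only lets events for one particular fd through.
    struct SingleFdFilter {
        interesting_fd: u32,
    }
    impl EventFilter<FdEvent> for SingleFdFilter {
        fn filter(&self, event: &FdEvent) -> bool {
            event.fd == self.interesting_fd
        }
    }

    // Intended usage (illustrative):
    //     let notifier: Notifier<FdEvent, SingleFdFilter> = Notifier::new();
    //     notifier.register(observer_weak, Some(SingleFdFilter { interesting_fd: 3 }), None);
    //     notifier.broadcast(&FdEvent { fd: 3 }); // delivered to the observer
    //     notifier.broadcast(&FdEvent { fd: 4 }); // filtered out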
diff --git a/src/libos/src/events/observer.rs b/src/libos/src/events/observer.rs
new file mode 100644
index 00000000..8a219835
--- /dev/null
+++ b/src/libos/src/events/observer.rs
@@ -0,0 +1,19 @@
+use std::any::Any;
+
+use super::Event;
+use crate::prelude::*;
+
+/// An observer receives events from the notifiers to which it has registered.
+pub trait Observer<E: Event>: Send + Sync {
+    /// The callback that will be executed when some interesting events are
+    /// delivered by a notifier to this observer.
+    ///
+    /// Note that it is important to keep this method simple, short, and
+    /// non-blocking. This is because the caller of this method, which is most
+    /// likely `Notifier::broadcast`, may have acquired the locks of some
+    /// resources. In general, these locks may coincide with the ones required
+    /// by a specific implementation of `Observer::on_event`. Thus, to reduce
+    /// the risk of deadlocks, the `on_event` method should be kept short
+    /// and sweet.
+    fn on_event(&self, event: &E, metadata: &Option<Box<dyn Any + Send + Sync>>) -> ();
+}
diff --git a/src/libos/src/events/waiter.rs b/src/libos/src/events/waiter.rs
new file mode 100644
index 00000000..a7eceaf3
--- /dev/null
+++ b/src/libos/src/events/waiter.rs
@@ -0,0 +1,172 @@
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Weak;
+use std::time::Duration;
+
+use super::host_event_fd::HostEventFd;
+use crate::prelude::*;
+
+/// A waiter enables a thread to sleep.
+pub struct Waiter {
+    inner: Arc<Inner>,
+}
+
+impl Waiter {
+    /// Create a waiter for the current thread.
+    ///
+    /// A `Waiter` is bound to the current thread that creates it: it cannot be
+    /// sent to or used by any other threads as the type implements `!Send` and
+    /// `!Sync`. Thus, a `Waiter` can only put the current thread to sleep.
+    pub fn new() -> Self {
+        Self {
+            inner: Arc::new(Inner::new()),
+        }
+    }
+
+    /// Return whether a waiter has been woken up.
+    ///
+    /// Once a waiter is woken up, the `wait` or `wait_mut` method becomes
+    /// non-blocking.
+    pub fn is_woken(&self) -> bool {
+        self.inner.is_woken()
+    }
+
+    /// Reset a waiter.
+    ///
+    /// After a `Waiter` has been woken up, the `reset` method must be called so
+    /// that the `Waiter` can use the `wait` or `wait_mut` methods to put the
+    /// current thread to sleep again.
+    pub fn reset(&self) {
+        self.inner.reset();
+    }
+
+    /// Put the current thread to sleep until it is woken up by a waker.
+    ///
+    /// The method has three possible return values:
+    /// 1. `Ok(())`: The `Waiter` has been woken up by one of its `Waker`s;
+    /// 2. `Err(e) if e.errno() == Errno::ETIMEDOUT`: Timeout.
+    /// 3. `Err(e) if e.errno() == Errno::EINTR`: Interrupted by a signal.
+    ///
+    /// If the `timeout` argument is `None`, then the second case won't happen,
+    /// i.e., the method will block indefinitely.
+    pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> {
+        self.inner.wait(timeout)
+    }
+
+    /// Put the current thread to sleep until it is woken up by a waker.
+    ///
+    /// This method is similar to the `wait` method except that the `timeout`
+    /// argument will be updated to reflect the remaining timeout.
+    pub fn wait_mut(&self, timeout: Option<&mut Duration>) -> Result<()> {
+        self.inner.wait_mut(timeout)
+    }
+
+    /// Create a waker that can wake up this waiter.
+    ///
+    /// `WaiterQueue` maintains a list of `Waker`s internally to wake up the
+    /// enqueued `Waiter`s. So, for users that use `WaiterQueue`, this method
+    /// does not need to be called manually.
+    pub fn waker(&self) -> Waker {
+        Waker {
+            inner: Arc::downgrade(&self.inner),
+        }
+    }
+}
+
+impl !Send for Waiter {}
+impl !Sync for Waiter {}
+
+/// A waker can wake up the thread that its waiter has put to sleep.
+pub struct Waker {
+    inner: Weak<Inner>,
+}
+
+impl Waker {
+    /// Wake up the waiter that created this waker.
+    pub fn wake(&self) {
+        if let Some(inner) = self.inner.upgrade() {
+            inner.wake()
+        }
+    }
+
+    /// Wake up waiters in batch, which is more efficient than waking them up one by one.
+    pub fn batch_wake<'a, I: Iterator<Item = &'a Waker>>(iter: I) {
+        Inner::batch_wake(iter);
+    }
+}
+
+struct Inner {
+    is_woken: AtomicBool,
+    host_eventfd: Arc<HostEventFd>,
+}
+
+impl Inner {
+    pub fn new() -> Self {
+        let is_woken = AtomicBool::new(false);
+        let host_eventfd = current!().host_eventfd().clone();
+        Self {
+            is_woken,
+            host_eventfd,
+        }
+    }
+
+    pub fn is_woken(&self) -> bool {
+        self.is_woken.load(Ordering::SeqCst)
+    }
+
+    pub fn reset(&self) {
+        self.is_woken.store(false, Ordering::SeqCst);
+    }
+
+    pub fn wait(&self, timeout: Option<&Duration>) -> Result<()> {
+        while !self.is_woken() {
+            self.host_eventfd.poll(timeout)?;
+        }
+        Ok(())
+    }
+
+    pub fn wait_mut(&self, timeout: Option<&mut Duration>) -> Result<()> {
+        let mut remain = timeout.as_ref().map(|d| **d);
+
+        // Need to change `timeout` from `Option<&mut Duration>` to `&mut Option<Duration>`
+        // so that the Rust compiler is happy about using the variable in a loop.
+        let ret = self.do_wait_mut(&mut remain);
+
+        if let Some(timeout) = timeout {
+            *timeout = remain.unwrap();
+        }
+        ret
+    }
+
+    fn do_wait_mut(&self, remain: &mut Option<Duration>) -> Result<()> {
+        while !self.is_woken() {
+            self.host_eventfd.poll_mut(remain.as_mut())?;
+        }
+        Ok(())
+    }
+
+    pub fn wake(&self) {
+        if self
+            .is_woken
+            .compare_and_swap(false, true, Ordering::SeqCst)
+            == false
+        {
+            self.host_eventfd.write_u64(1);
+        }
+    }
+
+    pub fn batch_wake<'a, I: Iterator<Item = &'a Waker>>(iter: I) {
+        let host_eventfds = iter
+            .filter_map(|waker| waker.inner.upgrade())
+            .filter(|inner| {
+                inner
+                    .is_woken
+                    .compare_and_swap(false, true, Ordering::SeqCst)
+                    == false
+            })
+            .map(|inner| inner.host_eventfd.host_fd())
+            .collect::<Vec<FileDesc>>();
+        unsafe {
+            HostEventFd::write_u64_raw_and_batch(&host_eventfds, 1);
+        }
+    }
+}
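A sketch of the intended wait/wake handshake between a sleeping thread and a producer. The helper names `park_with_timeout` and `notify` are made up; only `Waiter`, `Waker`, and their methods come from this patch.

    use std::time::Duration;

    // The sleeping side: a Waiter can only park the thread that created it.
    fn park_with_timeout(waiter: &Waiter) -> Result<()> {
        // Sleep for at most one second. Ok(()) means a Waker fired;
        // Err(ETIMEDOUT) means the budget ran out; Err(EINTR) means a signal arrived.
        waiter.wait(Some(&Duration::from_secs(1)))
    }

    // The producing side, given a Waker it received earlier (Wakers may cross threads).
    fn notify(waker: &Waker) {
        waker.wake(); // Sets is_woken and writes the waiter's host eventfd.
    }

    // Typical flow (illustrative):
    //     let waiter = Waiter::new();   // on the sleeping thread
    //     let waker = waiter.waker();   // hand this to the producer
    //     park_with_timeout(&waiter)?;  // blocks until notify(&waker) or timeout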
diff --git a/src/libos/src/events/waiter_queue.rs b/src/libos/src/events/waiter_queue.rs
new file mode 100644
index 00000000..7ec178e8
--- /dev/null
+++ b/src/libos/src/events/waiter_queue.rs
@@ -0,0 +1,76 @@
+use std::collections::VecDeque;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::time::Duration;
+
+use super::{Waiter, Waker};
+use crate::prelude::*;
+
+/// A queue for waiters.
+///
+/// By using this queue, we can wake up threads in their waiters' enqueue order.
+///
+/// While the queue is conceptually for `Waiter`s, it internally maintains a list
+/// of `Waker`s.
+pub struct WaiterQueue {
+    count: AtomicUsize,
+    wakers: SgxMutex<VecDeque<Waker>>,
+}
+
+impl WaiterQueue {
+    /// Creates an empty queue for `Waiter`s.
+    pub fn new() -> Self {
+        Self {
+            count: AtomicUsize::new(0),
+            wakers: SgxMutex::new(VecDeque::new()),
+        }
+    }
+
+    /// Returns whether the queue is empty.
+    pub fn is_empty(&self) -> bool {
+        self.count.load(Ordering::SeqCst) == 0
+    }
+
+    /// Reset a waiter and enqueue it.
+    ///
+    /// It is allowed to enqueue a waiter more than once before it is dequeued.
+    /// But this is usually not a good idea. It is the callers' responsibility
+    /// to use the API properly.
+    pub fn reset_and_enqueue(&self, waiter: &Waiter) {
+        waiter.reset();
+
+        let mut wakers = self.wakers.lock().unwrap();
+        self.count.fetch_add(1, Ordering::SeqCst);
+        wakers.push_back(waiter.waker());
+    }
+
+    /// Dequeue a waiter and wake up its thread.
+    pub fn dequeue_and_wake_one(&self) -> usize {
+        self.dequeue_and_wake_nr(1)
+    }
+
+    /// Dequeue all waiters and wake up their threads.
+    pub fn dequeue_and_wake_all(&self) -> usize {
+        self.dequeue_and_wake_nr(usize::MAX)
+    }
+
+    /// Dequeue a maximum number of waiters and wake up their threads.
+    pub fn dequeue_and_wake_nr(&self, max_count: usize) -> usize {
+        // The quick path for a common case
+        if self.is_empty() {
+            return 0;
+        }
+
+        // Dequeue wakers
+        let to_wake = {
+            let mut wakers = self.wakers.lock().unwrap();
+            let max_count = max_count.min(wakers.len());
+            let to_wake: Vec<Waker> = wakers.drain(..max_count).collect();
+            self.count.fetch_sub(to_wake.len(), Ordering::SeqCst);
+            to_wake
+        };
+
+        // Wake in batch
+        Waker::batch_wake(to_wake.iter());
+        to_wake.len()
+    }
+}
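The queue supports the usual enqueue-then-check-then-sleep pattern that avoids lost wake-ups. A sketch follows, with the condition supplied by the caller; `wait_until` is a hypothetical helper, not part of this patch.

    use std::time::Duration;

    fn wait_until(queue: &WaiterQueue, condition: impl Fn() -> bool) -> Result<()> {
        let waiter = Waiter::new();
        loop {
            // Enqueue *before* checking the condition so that a concurrent
            // dequeue_and_wake_all between the check and the sleep cannot be lost.
            queue.reset_and_enqueue(&waiter);
            if condition() {
                return Ok(());
            }
            waiter.wait(Some(&Duration::from_millis(10)))?;
        }
    }

    // The side that makes the condition true then calls:
    //     queue.dequeue_and_wake_all();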
diff --git a/src/libos/src/events/waiter_queue_observer.rs b/src/libos/src/events/waiter_queue_observer.rs
new file mode 100644
index 00000000..fe7ed930
--- /dev/null
+++ b/src/libos/src/events/waiter_queue_observer.rs
@@ -0,0 +1,35 @@
+use std::any::Any;
+use std::marker::PhantomData;
+
+use super::{Event, Observer, WaiterQueue};
+use crate::prelude::*;
+
+/// An `Observer` associated with a `WaiterQueue`.
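Putting the pieces together, the expected wiring is roughly the following sketch, which reuses the hypothetical `FdEvent` type from above; `wire_up` is a made-up name and only the `Notifier`, `Observer`, and `WaiterQueueObserver` APIs come from this patch.

    use std::sync::Weak;

    fn wire_up() -> (Notifier<FdEvent>, Arc<WaiterQueueObserver<FdEvent>>) {
        let notifier: Notifier<FdEvent> = Notifier::new();    // default DummyEventFilter
        let observer = WaiterQueueObserver::<FdEvent>::new();
        let weak: Weak<dyn Observer<FdEvent>> = Arc::downgrade(&observer);
        notifier.register(weak, None, None);                  // no filter, no metadata
        (notifier, observer)
    }

    // Consumer side: enqueue a Waiter on observer.waiter_queue() and wait().
    // Producer side: notifier.broadcast(&FdEvent { fd: 3 });
    //   -> WaiterQueueObserver::on_event -> waiter_queue.dequeue_and_wake_all()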
+///
+/// Once the observer receives any interesting events, it will dequeue and
+/// wake up all `Waiter`s in the associated `WaiterQueue`.
+pub struct WaiterQueueObserver<E: Event> {
+    waiter_queue: WaiterQueue,
+    phantom: PhantomData<E>,
+}
+
+impl<E: Event> WaiterQueueObserver<E> {
+    pub fn new() -> Arc<Self> {
+        let waiter_queue = WaiterQueue::new();
+        let phantom = PhantomData;
+        Arc::new(Self {
+            waiter_queue,
+            phantom,
+        })
+    }
+
+    pub fn waiter_queue(&self) -> &WaiterQueue {
+        &self.waiter_queue
+    }
+}
+
+impl<E: Event> Observer<E> for WaiterQueueObserver<E> {
+    fn on_event(&self, event: &E, _metadata: &Option<Box<dyn Any + Send + Sync>>) {
+        self.waiter_queue.dequeue_and_wake_all();
+    }
+}
diff --git a/src/libos/src/lib.rs b/src/libos/src/lib.rs
index 8e98315c..40f9e137 100644
--- a/src/libos/src/lib.rs
+++ b/src/libos/src/lib.rs
@@ -62,6 +62,7 @@ mod error;
 
 mod config;
 mod entry;
+mod events;
 mod exception;
 mod fs;
 mod interrupt;
diff --git a/src/libos/src/process/thread/builder.rs b/src/libos/src/process/thread/builder.rs
index 90544687..ee66feb8 100644
--- a/src/libos/src/process/thread/builder.rs
+++ b/src/libos/src/process/thread/builder.rs
@@ -4,6 +4,7 @@ use super::{
     FileTableRef, FsViewRef, ProcessRef, ProcessVM, ProcessVMRef, ResourceLimitsRef, SchedAgentRef,
     SigQueues, SigSet, Task, Thread, ThreadId, ThreadInner, ThreadName, ThreadRef,
 };
+use crate::events::HostEventFd;
 use crate::prelude::*;
 use crate::time::ThreadProfiler;
 
@@ -116,6 +117,7 @@ impl ThreadBuilder {
         } else {
             SgxMutex::new(None)
         };
+        let host_eventfd = Arc::new(HostEventFd::new()?);
 
         let new_thread = Arc::new(Thread {
             task,
@@ -134,6 +136,7 @@ impl ThreadBuilder {
             sig_tmp_mask,
             sig_stack,
             profiler,
+            host_eventfd,
         });
 
         let mut inner = new_thread.process().inner();
diff --git a/src/libos/src/process/thread/mod.rs b/src/libos/src/process/thread/mod.rs
index f51a2ebf..ed41f337 100644
--- a/src/libos/src/process/thread/mod.rs
+++ b/src/libos/src/process/thread/mod.rs
@@ -6,6 +6,7 @@ use super::{
     FileTableRef, ForcedExitStatus, FsViewRef, ProcessRef, ProcessVM, ProcessVMRef,
     ResourceLimitsRef, SchedAgentRef, TermStatus, ThreadRef,
 };
+use crate::events::HostEventFd;
 use crate::fs::{EventCreationFlags, EventFile};
 use crate::net::THREAD_NOTIFIERS;
 use crate::prelude::*;
@@ -44,6 +45,8 @@ pub struct Thread {
     sig_stack: SgxMutex<Option<SigStack>>,
     // System call timing
     profiler: SgxMutex<Option<ThreadProfiler>>,
+    // Misc
+    host_eventfd: Arc<HostEventFd>,
 }
 
 #[derive(Debug, PartialEq, Clone, Copy)]
@@ -144,6 +147,10 @@ impl Thread {
         *self.name.write().unwrap() = new_name;
     }
 
+    pub fn host_eventfd(&self) -> &Arc<HostEventFd> {
+        &self.host_eventfd
+    }
+
     pub(super) fn start(&self, host_tid: pid_t) {
         self.sched().lock().unwrap().attach(host_tid);
         self.inner().start();
diff --git a/src/libos/src/time/mod.rs b/src/libos/src/time/mod.rs
index 454c42c6..631ddaeb 100644
--- a/src/libos/src/time/mod.rs
+++ b/src/libos/src/time/mod.rs
@@ -13,6 +13,7 @@ pub mod timer_slack;
 pub mod up_time;
 
 pub use profiler::ThreadProfiler;
+pub use timer_slack::TIMERSLACK;
 
 #[allow(non_camel_case_types)]
 pub type time_t = i64;
@@ -73,6 +74,15 @@ pub struct timespec_t {
     nsec: i64,
 }
 
+impl From<Duration> for timespec_t {
+    fn from(duration: Duration) -> timespec_t {
+        let sec = duration.as_secs() as time_t;
+        let nsec = duration.subsec_nanos() as i64;
+        debug_assert!(sec >= 0); // nsec >= 0 always holds
+        timespec_t { sec, nsec }
+    }
+}
+
 impl timespec_t {
     pub fn from_raw_ptr(ptr: *const timespec_t) -> Result<timespec_t> {
         let ts = unsafe { *ptr };
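For orientation: every LibOS thread now owns a host eventfd created in `ThreadBuilder::build`, and the new `From<Duration>` impl is how a timeout budget is handed down to the host. A short sketch; the two helper functions are illustrative names, not part of this patch.

    use std::time::Duration;

    // Any code running on a LibOS thread can reach that thread's host eventfd
    // through the current!() thread handle (this mirrors Inner::new in waiter.rs).
    fn current_thread_eventfd() -> Arc<HostEventFd> {
        current!().host_eventfd().clone()
    }

    // The conversion used by HostEventFd::poll_mut before the OCall:
    fn to_host_timeout(budget: Duration) -> timespec_t {
        timespec_t::from(budget) // e.g. 1.5s -> timespec_t { sec: 1, nsec: 500_000_000 }
    }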
diff --git a/src/pal/src/ocalls/event.c b/src/pal/src/ocalls/event.c
new file mode 100644
index 00000000..edeb725c
--- /dev/null
+++ b/src/pal/src/ocalls/event.c
@@ -0,0 +1,43 @@
+#define _GNU_SOURCE
+#include "ocalls.h"
+#include <stddef.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <sys/eventfd.h>
+#include <poll.h>
+
+int occlum_ocall_eventfd(unsigned int initval, int flags) {
+    return eventfd(initval, flags);
+}
+
+int occlum_ocall_eventfd_poll(int eventfd, struct timespec *timeout) {
+    int ret;
+
+    struct pollfd pollfds[1];
+    pollfds[0].fd = eventfd;
+    pollfds[0].events = POLLIN;
+    pollfds[0].revents = 0;
+
+    // We use the ppoll syscall directly instead of the libc wrapper. This
+    // is because the syscall version updates the timeout argument to indicate
+    // how much time was left (which is what we want), while the libc wrapper
+    // keeps the timeout argument unchanged.
+    ret = raw_ppoll(pollfds, 1, timeout);
+    if (ret < 0) {
+        return -1;
+    }
+
+    char buf[8];
+    read(eventfd, buf, 8);
+    return 0;
+}
+
+void occlum_ocall_eventfd_write_batch(
+    int *eventfds,
+    size_t num_fds,
+    uint64_t val
+) {
+    for (int fd_i = 0; fd_i < num_fds; fd_i++) {
+        write(eventfds[fd_i], &val, sizeof(val));
+    }
+}
diff --git a/src/pal/src/ocalls/fs.c b/src/pal/src/ocalls/fs.c
index ce6613d8..7b7ccf6d 100644
--- a/src/pal/src/ocalls/fs.c
+++ b/src/pal/src/ocalls/fs.c
@@ -2,17 +2,12 @@
 #include
 #include
 #include
-#include <sys/eventfd.h>
 #include
 
 void occlum_ocall_sync(void) {
     sync();
 }
 
-int occlum_ocall_eventfd(unsigned int initval, int flags) {
-    return eventfd(initval, flags);
-}
-
 int occlum_ocall_ioctl_repack(int fd, int request, char *buf, int len, int *recv_len) {
     int ret = 0;
diff --git a/src/pal/src/pal_syscall.h b/src/pal/src/pal_syscall.h
index 13aa145b..5fe586e8 100644
--- a/src/pal/src/pal_syscall.h
+++ b/src/pal/src/pal_syscall.h
@@ -12,5 +12,6 @@
 #define tgkill(tgid, tid, signum) ((int)syscall(__NR_tgkill, (tgid), (tid), (signum)));
 #define futex_wait(addr, val, timeout) ((int)syscall(__NR_futex, (addr), FUTEX_WAIT, (val), (timeout)))
 #define futex_wake(addr) ((int)syscall(__NR_futex, (addr), FUTEX_WAKE, 1))
+#define raw_ppoll(fds, nfds, timeout) ((int)syscall(__NR_ppoll, (fds), (nfds), (timeout), NULL, 0))
 
 #endif /* __PAL_SYSCALL_H__ */