diff --git a/src/Enclave.edl b/src/Enclave.edl index 1fba2881..7a9d8a99 100644 --- a/src/Enclave.edl +++ b/src/Enclave.edl @@ -190,6 +190,8 @@ enclave { unsigned int initval, int flags ) propagate_errno; + // TODO: the usage of this OCall should be replaced with + // occlum_ocall_poll_with_eventfd, which is a more general form. int occlum_ocall_eventfd_poll( int eventfd, [in, out] struct timespec *timeout @@ -200,12 +202,20 @@ enclave { uint64_t val ); + // TODO: the usage of this OCall should be replaced with + // occlum_ocall_poll_with_eventfd, which is a more general form. int occlum_ocall_poll( [in, out, count=nfds] struct pollfd *fds, nfds_t nfds, [in, out] struct timeval *timeout, int efd - )propagate_errno; + ) propagate_errno; + int occlum_ocall_poll_with_eventfd( + [in, out, count=nfds] struct pollfd *fds, + nfds_t nfds, + [in, out] struct timespec *timeout, + int eventfd_idx + ) propagate_errno; void occlum_ocall_print_log(uint32_t level, [in, string] const char* msg); void occlum_ocall_flush_log(void); diff --git a/src/libos/src/events/host_event_fd.rs b/src/libos/src/events/host_event_fd.rs index 8e44ee17..50b95ba6 100644 --- a/src/libos/src/events/host_event_fd.rs +++ b/src/libos/src/events/host_event_fd.rs @@ -49,6 +49,11 @@ impl HostEventFd { match timeout { None => ocall_eventfd_poll(self.host_fd, std::ptr::null_mut()), Some(timeout) => { + const ZERO: Duration = Duration::from_secs(0); + if *timeout == ZERO { + return_errno!(ETIMEDOUT, "should return immediately"); + } + let mut remain_c = timespec_t::from(*timeout); let ret = ocall_eventfd_poll(self.host_fd, &mut remain_c); @@ -56,6 +61,12 @@ impl HostEventFd { assert!(remain <= *timeout + TIMERSLACK.to_duration()); *timeout = remain; + // Poll syscall does not treat timeout as error. So we need + // to distinguish the case by ourselves. 
+ if *timeout == ZERO { + return_errno!(ETIMEDOUT, "time is up"); + } + ret } } diff --git a/src/libos/src/events/notifier.rs b/src/libos/src/events/notifier.rs index 9e51249d..0d4923af 100644 --- a/src/libos/src/events/notifier.rs +++ b/src/libos/src/events/notifier.rs @@ -1,4 +1,5 @@ use std::any::Any; +use std::fmt; use std::marker::PhantomData; use std::sync::Weak; @@ -13,7 +14,7 @@ pub struct Notifier = DummyEventFilter> { struct Subscriber> { observer: Weak>, filter: Option, - metadata: Option>, + metadata: Option>, } impl> Notifier { @@ -28,7 +29,7 @@ impl> Notifier { &self, observer: Weak>, filter: Option, - metadata: Option>, + metadata: Option>, ) { let mut subscribers = self.subscribers.lock().unwrap(); subscribers.push_back(Subscriber { @@ -64,6 +65,12 @@ impl> Notifier { } } +impl> fmt::Debug for Notifier { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Notifier {{ .. }}") + } +} + pub struct DummyEventFilter { phantom: PhantomData, } diff --git a/src/libos/src/events/observer.rs b/src/libos/src/events/observer.rs index 8a219835..0abe12b8 100644 --- a/src/libos/src/events/observer.rs +++ b/src/libos/src/events/observer.rs @@ -1,4 +1,5 @@ use std::any::Any; +use std::sync::Weak; use super::Event; use crate::prelude::*; @@ -15,5 +16,5 @@ pub trait Observer: Send + Sync { /// by a specific implementation of `Observer::on_event`. Thus, to avoid /// the odds of deadlocks, the `on_event` method should be written short /// and sweet. - fn on_event(&self, event: &E, metadata: &Option>) -> (); + fn on_event(&self, event: &E, metadata: &Option>) -> (); } diff --git a/src/libos/src/events/waiter.rs b/src/libos/src/events/waiter.rs index a7eceaf3..cad8daeb 100644 --- a/src/libos/src/events/waiter.rs +++ b/src/libos/src/events/waiter.rs @@ -70,6 +70,13 @@ impl Waiter { inner: Arc::downgrade(&self.inner), } } + + /// Expose the internal host eventfd. + /// + /// This host eventfd should be used by an external user carefully. 
+ pub fn host_eventfd(&self) -> &HostEventFd { + self.inner.host_eventfd() + } } impl !Send for Waiter {} @@ -169,4 +176,8 @@ impl Inner { HostEventFd::write_u64_raw_and_batch(&host_eventfds, 1); } } + + pub fn host_eventfd(&self) -> &HostEventFd { + &self.host_eventfd + } } diff --git a/src/libos/src/events/waiter_queue_observer.rs b/src/libos/src/events/waiter_queue_observer.rs index fe7ed930..4104e348 100644 --- a/src/libos/src/events/waiter_queue_observer.rs +++ b/src/libos/src/events/waiter_queue_observer.rs @@ -1,5 +1,6 @@ use std::any::Any; use std::marker::PhantomData; +use std::sync::Weak; use super::{Event, Observer, WaiterQueue}; use crate::prelude::*; @@ -29,7 +30,7 @@ impl WaiterQueueObserver { } impl Observer for WaiterQueueObserver { - fn on_event(&self, event: &E, _metadata: &Option>) { + fn on_event(&self, event: &E, _metadata: &Option>) { self.waiter_queue.dequeue_and_wake_all(); } } diff --git a/src/libos/src/fs/event_file.rs b/src/libos/src/fs/event_file.rs index cdf74cea..d7170cf9 100644 --- a/src/libos/src/fs/event_file.rs +++ b/src/libos/src/fs/event_file.rs @@ -1,25 +1,36 @@ use super::*; +use atomic::{Atomic, Ordering}; + /// Native Linux eventfd // TODO: move the implementaion of eventfd into libos to defend against Iago attacks from OCalls #[derive(Debug)] pub struct EventFile { - host_fd: c_int, + host_fd: HostFd, + host_events: Atomic, + notifier: IoNotifier, } impl EventFile { pub fn new(init_val: u32, flags: EventCreationFlags) -> Result { - let host_fd = try_libc!({ + let raw_host_fd = try_libc!({ let mut ret: i32 = 0; let status = occlum_ocall_eventfd(&mut ret, init_val, flags.bits()); assert!(status == sgx_status_t::SGX_SUCCESS); ret - }); - Ok(Self { host_fd }) + }) as FileDesc; + let host_fd = HostFd::new(raw_host_fd); + let host_events = Atomic::new(IoEvents::empty()); + let notifier = IoNotifier::new(); + Ok(Self { + host_fd, + host_events, + notifier, + }) } pub fn get_host_fd(&self) -> c_int { - self.host_fd + 
self.host_fd.to_raw() as c_int } } @@ -40,7 +51,7 @@ extern "C" { impl Drop for EventFile { fn drop(&mut self) { - let ret = unsafe { libc::ocall::close(self.host_fd) }; + let ret = unsafe { libc::ocall::close(self.host_fd.to_raw() as i32) }; assert!(ret == 0); } } @@ -50,7 +61,7 @@ impl File for EventFile { let (buf_ptr, buf_len) = buf.as_mut().as_mut_ptr_and_len(); let ret = try_libc!(libc::ocall::read( - self.host_fd, + self.host_fd.to_raw() as i32, buf_ptr as *mut c_void, buf_len )) as usize; @@ -61,7 +72,7 @@ impl File for EventFile { fn write(&self, buf: &[u8]) -> Result { let (buf_ptr, buf_len) = buf.as_ptr_and_len(); let ret = try_libc!(libc::ocall::write( - self.host_fd, + self.host_fd.to_raw() as i32, buf_ptr as *const c_void, buf_len )) as usize; @@ -93,6 +104,26 @@ impl File for EventFile { Ok(()) } + fn poll_new(&self) -> IoEvents { + self.host_events.load(Ordering::Acquire) + } + + fn notifier(&self) -> Option<&IoNotifier> { + Some(&self.notifier) + } + + fn host_fd(&self) -> Option<&HostFd> { + Some(&self.host_fd) + } + + fn recv_host_events(&self, events: &IoEvents, trigger_notifier: bool) { + self.host_events.store(*events, Ordering::Release); + + if trigger_notifier { + self.notifier.broadcast(events); + } + } + fn as_any(&self) -> &dyn Any { self } diff --git a/src/libos/src/fs/events.rs b/src/libos/src/fs/events.rs index 726a973e..55ce6e3a 100644 --- a/src/libos/src/fs/events.rs +++ b/src/libos/src/fs/events.rs @@ -1,8 +1,8 @@ -use crate::events::{Event, EventFilter, Notifier}; +use crate::events::{Event, EventFilter, Notifier, Observer}; use crate::prelude::*; bitflags! 
{ - pub struct IoEvents: u16 { + pub struct IoEvents: u32 { const IN = 0x001; // = POLLIN const OUT = 0x004; // = POLLOUT const PRI = 0x002; // = POLLPRI diff --git a/src/libos/src/fs/file.rs b/src/libos/src/fs/file.rs index d1218f3d..30d39bd9 100644 --- a/src/libos/src/fs/file.rs +++ b/src/libos/src/fs/file.rs @@ -91,18 +91,46 @@ pub trait File: Debug + Sync + Send + Any { return_op_unsupported_error!("set_advisory_lock") } + // TODO: remove this function after all users of this code are removed fn poll(&self) -> Result<(crate::net::PollEventFlags)> { return_op_unsupported_error!("poll") } + // TODO: remove this function after all users of this code are removed fn enqueue_event(&self, _: crate::net::IoEvent) -> Result<()> { return_op_unsupported_error!("enqueue_event"); } + // TODO: remove this function after all users of this code are removed fn dequeue_event(&self) -> Result<()> { return_op_unsupported_error!("dequeue_event"); } + // TODO: rename poll_new to poll + fn poll_new(&self) -> IoEvents { + IoEvents::empty() + } + + /// Returns a notifier that broadcast events on this file. + /// + /// Not every file has an associated event notifier. + fn notifier(&self) -> Option<&IoNotifier> { + None + } + + /// Return the host fd, if the file is backed by an underlying host file. + fn host_fd(&self) -> Option<&HostFd> { + return None; + } + + /// Receive events from the host. + /// + /// After calling this method, the `poll` method of the `File` trait will + /// return the latest events on the `HostFile`. + /// + /// This method has no effect if the `host_fd` method returns `None`. 
+ fn recv_host_events(&self, events: &IoEvents, trigger_notifier: bool) {} + fn as_any(&self) -> &dyn Any; } diff --git a/src/libos/src/fs/host_fd.rs b/src/libos/src/fs/host_fd.rs new file mode 100644 index 00000000..78db73ea --- /dev/null +++ b/src/libos/src/fs/host_fd.rs @@ -0,0 +1,65 @@ +use std::collections::HashSet; +use std::hash::{Hash, Hasher}; + +use super::*; + +/// A unique fd from the host OS. +/// +/// Uniqueness is enforced by a global registry: `HostFd::new` panics if the same host fd is registered twice. +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct HostFd(FileDesc); + +impl HostFd { + pub fn new(host_fd: FileDesc) -> Self { + HOST_FD_REGISTRY.lock().unwrap().register(host_fd).unwrap(); + Self(host_fd) + } + + pub fn to_raw(&self) -> FileDesc { + self.0 + } +} + +impl Drop for HostFd { + fn drop(&mut self) { + HOST_FD_REGISTRY + .lock() + .unwrap() + .unregister(self.to_raw()) + .unwrap(); + } +} + +lazy_static! { + static ref HOST_FD_REGISTRY: SgxMutex = + { SgxMutex::new(HostFdRegistry::new()) }; +} + +/// A registry for host fds to ensure that they are unique. 
+struct HostFdRegistry { + set: HashSet, +} + +impl HostFdRegistry { + pub fn new() -> Self { + Self { + set: HashSet::new(), + } + } + + pub fn register(&mut self, host_fd: FileDesc) -> Result<()> { + let new_val = self.set.insert(host_fd); + if !new_val { + return_errno!(EEXIST, "host fd has been registered"); + } + Ok(()) + } + + pub fn unregister(&mut self, host_fd: FileDesc) -> Result<()> { + let existing = self.set.remove(&host_fd); + if !existing { + return_errno!(ENOENT, "host fd has NOT been registered"); + } + Ok(()) + } +} diff --git a/src/libos/src/fs/mod.rs b/src/libos/src/fs/mod.rs index 1a80cf4f..deb6d519 100644 --- a/src/libos/src/fs/mod.rs +++ b/src/libos/src/fs/mod.rs @@ -20,6 +20,7 @@ pub use self::file_ops::{ }; pub use self::file_table::{FileDesc, FileTable}; pub use self::fs_view::FsView; +pub use self::host_fd::HostFd; pub use self::inode_file::{AsINodeFile, INodeExt, INodeFile}; pub use self::pipe::PipeType; pub use self::rootfs::ROOT_INODE; @@ -35,6 +36,7 @@ mod file_ops; mod file_table; mod fs_ops; mod fs_view; +mod host_fd; mod hostfs; mod inode_file; mod pipe; diff --git a/src/libos/src/fs/pipe.rs b/src/libos/src/fs/pipe.rs index d4eea820..769b81dc 100644 --- a/src/libos/src/fs/pipe.rs +++ b/src/libos/src/fs/pipe.rs @@ -96,6 +96,14 @@ impl File for PipeReader { Ok(events) } + fn poll_new(&self) -> IoEvents { + self.consumer.poll() + } + + fn notifier(&self) -> Option<&IoNotifier> { + Some(self.consumer.notifier()) + } + fn as_any(&self) -> &dyn Any { self } @@ -171,6 +179,14 @@ impl File for PipeWriter { Ok(events) } + fn poll_new(&self) -> IoEvents { + self.producer.poll() + } + + fn notifier(&self) -> Option<&IoNotifier> { + Some(self.producer.notifier()) + } + fn as_any(&self) -> &dyn Any { self } diff --git a/src/libos/src/lib.rs b/src/libos/src/lib.rs index 40f9e137..d6fa9813 100644 --- a/src/libos/src/lib.rs +++ b/src/libos/src/lib.rs @@ -16,6 +16,7 @@ #![feature(option_expect_none)] // for UntrustedSliceAlloc in slice_alloc 
#![feature(slice_ptr_get)] +#![feature(maybe_uninit_extra)] #[macro_use] extern crate alloc; diff --git a/src/libos/src/net/io_multiplexing/epoll.rs b/src/libos/src/net/io_multiplexing/epoll.rs deleted file mode 100644 index d39f3ddb..00000000 --- a/src/libos/src/net/io_multiplexing/epoll.rs +++ /dev/null @@ -1,184 +0,0 @@ -use super::*; - -#[derive(Debug, Copy, Clone)] -pub enum EpollCtlCmd { - /// Add a file decriptor to the interface - Add = 1, - /// Remove a file decriptor from the interface - Del = 2, - /// Change file decriptor epoll_event structre - Mod = 3, -} - -impl TryFrom for EpollCtlCmd { - type Error = error::Error; - - fn try_from(op_num: i32) -> Result { - match op_num { - 1 => Ok(EpollCtlCmd::Add), - 2 => Ok(EpollCtlCmd::Del), - 3 => Ok(EpollCtlCmd::Mod), - _ => return_errno!(EINVAL, "invalid operation number"), - } - } -} - -bitflags! { - #[derive(Default)] - pub struct EpollEventFlags: u32 { - // The available events are got from linux source. - // This struct contains more flags than linux man page described. 
- const EPOLLIN = 0x0001; - const EPOLLPRI = 0x0002; - const EPOLLOUT = 0x0004; - const EPOLLERR = 0x0008; - const EPOLLHUP = 0x0010; - const EPOLLNVAL = 0x0020; - const EPOLLRDNORM = 0x0040; - const EPOLLRDBAND = 0x0080; - const EPOLLWRNORM = 0x0100; - const EPOLLWRBAND = 0x0200; - const EPOLLMSG = 0x0400; - const EPOLLRDHUP = 0x2000; - const EPOLLEXCLUSIVE = (1 << 28); - const EPOLLWAKEUP = (1 << 29); - const EPOLLONESHOT = (1 << 30); - const EPOLLET = (1 << 31); - } -} - -//TODO: Add more mitigations to protect from iago attacks -#[derive(Copy, Clone, Debug, Default)] -pub struct EpollEvent { - /// Epoll Events - events: EpollEventFlags, - /// Libos-agnostic user data variable - data: uint64_t, -} - -impl EpollEvent { - pub fn new(events: EpollEventFlags, data: uint64_t) -> Self { - Self { events, data } - } - - pub fn from_raw(epoll_event: &libc::epoll_event) -> Result { - Ok(Self::new( - EpollEventFlags::from_bits(epoll_event.events) - .ok_or_else(|| errno!(EINVAL, "invalid flags"))?, - epoll_event.u64, - )) - } - - pub fn to_raw(&self) -> libc::epoll_event { - libc::epoll_event { - events: self.events.bits(), - u64: self.data, - } - } -} - -#[derive(Debug)] -pub struct EpollFile { - host_fd: c_int, -} - -impl EpollFile { - /// Creates a new Linux epoll file descriptor - pub fn new(flags: CreationFlags) -> Result { - debug!("create epollfile: flags: {:?}", flags); - let host_fd = try_libc!(libc::ocall::epoll_create1(flags.bits() as i32)); - Ok(Self { host_fd }) - } - - pub fn control(&self, op: EpollCtlCmd, fd: FileDesc, event: Option<&EpollEvent>) -> Result<()> { - let host_fd = { - let fd_ref = current!().file(fd)?; - if let Ok(socket) = fd_ref.as_host_socket() { - socket.host_fd() - } else if let Ok(eventfd) = fd_ref.as_event() { - eventfd.get_host_fd() - } else if let Ok(epoll_file) = fd_ref.as_epfile() { - let target_host_fd = epoll_file.get_host_fd(); - if self.host_fd == target_host_fd { - return_errno!(EINVAL, "epfd should not be same as the target 
fd"); - } - target_host_fd - } else { - return_errno!(EPERM, "unsupported file type"); - } - }; - - // Notes on deadlock. - // - // All locks on fd (if any) will be released at this point. This means - // we don't have to worry about the potential deadlock caused by - // locking two files (say, fd and epfd) in an inconsistent order. - - //TODO: Shoud be const. - // Cast const to mut to be compatiable with the ocall from rust sdk. - let mut epevent = event.map(|e| e.to_raw()); - let raw_epevent_ptr: *mut libc::epoll_event = match epevent { - Some(ref mut e) => e as *mut _, - _ => std::ptr::null_mut(), - }; - - try_libc!(libc::ocall::epoll_ctl( - self.host_fd, - op as i32, - host_fd, - raw_epevent_ptr, - )); - Ok(()) - } - - /// Waits for an I/O event on the epoll file. - /// - /// Returns the number of file descriptors ready for the requested I/O. - pub fn wait(&self, events: &mut [EpollEvent], timeout: c_int) -> Result { - let mut raw_events: Vec = - vec![libc::epoll_event { events: 0, u64: 0 }; events.len()]; - let ret = try_libc!(libc::ocall::epoll_wait( - self.host_fd, - raw_events.as_mut_ptr(), - raw_events.len() as c_int, - timeout, - )) as usize; - - assert!(ret <= events.len()); - for i in 0..ret { - events[i] = EpollEvent::from_raw(&raw_events[i])?; - } - - Ok(ret) - } - - fn get_host_fd(&self) -> c_int { - self.host_fd - } -} - -impl Drop for EpollFile { - fn drop(&mut self) { - unsafe { - libc::ocall::close(self.host_fd); - } - } -} - -impl File for EpollFile { - fn as_any(&self) -> &dyn Any { - self - } -} - -pub trait AsEpollFile { - fn as_epfile(&self) -> Result<&EpollFile>; -} - -impl AsEpollFile for FileRef { - fn as_epfile(&self) -> Result<&EpollFile> { - self.as_any() - .downcast_ref::() - .ok_or_else(|| errno!(EBADF, "not an epoll file")) - } -} diff --git a/src/libos/src/net/io_multiplexing/epoll/epoll_file.rs b/src/libos/src/net/io_multiplexing/epoll/epoll_file.rs new file mode 100644 index 00000000..5deb0437 --- /dev/null +++ 
b/src/libos/src/net/io_multiplexing/epoll/epoll_file.rs @@ -0,0 +1,483 @@ +use std::any::Any; +use std::collections::{HashMap, VecDeque}; +use std::fmt; +use std::mem::{self, MaybeUninit}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Weak; +use std::time::Duration; + +use super::epoll_waiter::EpollWaiter; +use super::host_file_epoller::HostFileEpoller; +use super::{EpollCtl, EpollEvent, EpollFlags}; +use crate::events::{Observer, Waiter, WaiterQueue}; +use crate::fs::{File, HostFd, IoEvents, IoNotifier}; +use crate::prelude::*; + +// TODO: Prevent two epoll files from monitoring each other, which may cause +// deadlock in the current implementation. + +/// A file that provides the epoll API. +/// +/// Conceptually, we maintain two lists: one consists of all interesting files, +/// which can be managed by the epoll ctl commands; the other is for ready files, +/// which are files that have some events. An epoll wait only needs to iterate the +/// ready list and poll each file to see if the file is ready for the interesting +/// I/O. +/// +/// To maintain the ready list, we need to monitor interesting events that happen +/// on the files. To do so, the `EpollFile` registers itself as an `Observer` to +/// the `IoNotifier`s of the monitored files. Thus, we can add a file to the ready +/// list when an event happens on the file. +/// +/// LibOS files are easy to monitor. LibOS files are implemented by us. We know +/// exactly when an event happens and thus can broadcast it using `IoNotifier`. +/// +/// Unlike LibOS files, host files are implemented by the host OS. We have no way +/// to let the host OS _push_ events to us. Luckily, we can do the reverse: _poll_ +/// host files to check events. And there is a good time for it; that is, at +/// every epoll wait call. 
We have made a helper called `HostFileEpoller`, which can +/// poll events on a set of host files and trigger their associated `Notifier`s to +/// broadcast their events, e.g., to `EpollFile`. +/// +/// This way, both LibOS files and host files can notify the `EpollFile` about +/// their events. +pub struct EpollFile { + // All interesting entries. + interest: SgxMutex>>, + // Entries that are probably ready (having events happened). + ready: SgxMutex>>, + // All threads that are waiting on this epoll file. + waiters: WaiterQueue, + // A notifier to broadcast events on this epoll file. + notifier: IoNotifier, + // A helper to poll the events on the interesting host files. + host_file_epoller: HostFileEpoller, + // Any EpollFile is wrapped with Arc when created. + weak_self: Weak, +} + +impl EpollFile { + pub fn new() -> Arc { + let interest = Default::default(); + let ready = Default::default(); + let waiters = WaiterQueue::new(); + let notifier = IoNotifier::new(); + let host_file_epoller = HostFileEpoller::new(); + let weak_self = Default::default(); + + Self { + interest, + ready, + waiters, + notifier, + host_file_epoller, + weak_self, + } + .wrap_self() + } + + fn wrap_self(self) -> Arc { + let mut strong_self = Arc::new(self); + let weak_self = Arc::downgrade(&strong_self); + + unsafe { + let ptr_self = Arc::into_raw(strong_self) as *mut Self; + (*ptr_self).weak_self = weak_self; + strong_self = Arc::from_raw(ptr_self); + } + + strong_self + } + + pub fn control(&self, cmd: &EpollCtl) -> Result<()> { + debug!("epoll control: cmd = {:?}", cmd); + + match cmd { + EpollCtl::Add(fd, event, flags) => { + self.add_interest(*fd, *event, *flags)?; + } + EpollCtl::Del(fd) => { + self.del_interest(*fd)?; + } + EpollCtl::Mod(fd, event, flags) => { + self.mod_interest(*fd, *event, *flags)?; + } + } + Ok(()) + } + + pub fn wait( + &self, + revents: &mut [MaybeUninit], + timeout: Option<&Duration>, + ) -> Result { + debug!("epoll wait: timeout = {:?}", timeout); + + let 
mut timeout = timeout.cloned(); + let max_count = revents.len(); + let mut reinsert = VecDeque::with_capacity(max_count); + let waiter = EpollWaiter::new(&self.host_file_epoller); + + loop { + // Poll the latest states of the interested host files + self.host_file_epoller.poll_events(max_count); + + // Prepare for the waiter.wait_mut() at the end of the loop + self.waiters.reset_and_enqueue(waiter.as_ref()); + + // Pop from the ready list to find as many results as possible + let mut count = 0; + while count < max_count { + // Pop some entries from the ready list + let mut ready_entries = self.pop_ready(max_count - count); + if ready_entries.len() == 0 { + break; + } + + // Note that while iterating the ready entries, we do not hold the lock + // of the ready list. This reduces the chances of lock contention. + for ep_entry in ready_entries.into_iter() { + if ep_entry.is_deleted.load(Ordering::Acquire) { + continue; + } + + // Poll the file that corresponds to the entry + let mut inner = ep_entry.inner.lock().unwrap(); + let mask = inner.event.mask(); + let file = &ep_entry.file; + let events = file.poll_new() & mask; + if events.is_empty() { + continue; + } + + // We find a ready file! + let mut revent = inner.event; + revent.mask = events; + revents[count].write(revent); + count += 1; + + // Behave differently according the epoll flags + + if inner.flags.contains(EpollFlags::ONE_SHOT) { + inner.event.mask = IoEvents::empty(); + } + + if !inner + .flags + .intersects(EpollFlags::EDGE_TRIGGER | EpollFlags::ONE_SHOT) + { + drop(inner); + reinsert.push_back(ep_entry); + } + } + } + + // If any results, we can return + if count > 0 { + // Push the entries that are still ready after polling back to the ready list + if reinsert.len() > 0 { + self.push_ready_iter(reinsert.into_iter()); + } + + return Ok(count); + } + + // Wait for a while to try again later. 
+ let ret = waiter.wait_mut(timeout.as_mut()); + if let Err(e) = ret { + if e.errno() == ETIMEDOUT { + return Ok(0); + } else { + return Err(e); + } + } + // This means we have been waken up successfully. Let's try again. + } + } + + fn add_interest(&self, fd: FileDesc, mut event: EpollEvent, flags: EpollFlags) -> Result<()> { + let file = current!().file(fd)?; + + let arc_self = self.weak_self.upgrade().unwrap(); + if Arc::ptr_eq(&(arc_self as Arc), &file) { + return_errno!(EINVAL, "a epoll file cannot epoll itself"); + } + + self.check_flags(&flags); + self.prepare_event(&mut event); + + let ep_entry = Arc::new(EpollEntry::new(fd, file, event, flags)); + + // A critical section protected by the lock of self.interest + { + let notifier = ep_entry + .file + .notifier() + .ok_or_else(|| errno!(EINVAL, "a file must has an associated notifier"))?; + + let mut interest_entries = self.interest.lock().unwrap(); + if interest_entries.get(&fd).is_some() { + return_errno!(EEXIST, "fd is already registered"); + } + interest_entries.insert(fd, ep_entry.clone()); + + // Start observing events on the target file. 
+ let weak_observer = self.weak_self.clone() as Weak>; + let weak_ep_entry = Arc::downgrade(&ep_entry); + notifier.register(weak_observer, Some(IoEvents::all()), Some(weak_ep_entry)); + + // Handle host file + if ep_entry.file.host_fd().is_some() { + self.host_file_epoller + .add_file(ep_entry.file.clone(), event, flags); + } + } + + self.push_ready(ep_entry); + + Ok(()) + } + + fn del_interest(&self, fd: FileDesc) -> Result<()> { + // A critical section protected by the lock of self.interest + { + let mut interest_entries = self.interest.lock().unwrap(); + let ep_entry = interest_entries + .remove(&fd) + .ok_or_else(|| errno!(ENOENT, "fd is not added"))?; + ep_entry.is_deleted.store(true, Ordering::Release); + + let notifier = ep_entry.file.notifier().unwrap(); + let weak_observer = self.weak_self.clone() as Weak>; + notifier.unregister(&weak_observer); + + if ep_entry.file.host_fd().is_some() { + self.host_file_epoller.del_file(&ep_entry.file); + } + } + Ok(()) + } + + fn mod_interest(&self, fd: FileDesc, mut event: EpollEvent, flags: EpollFlags) -> Result<()> { + self.check_flags(&flags); + self.prepare_event(&mut event); + + // A critical section protected by the lock of self.interest + let ep_entry = { + let mut interest_entries = self.interest.lock().unwrap(); + let ep_entry = interest_entries + .get(&fd) + .ok_or_else(|| errno!(ENOENT, "fd is not added"))? 
+ .clone(); + + let new_ep_inner = EpollEntryInner { event, flags }; + let mut old_ep_inner = ep_entry.inner.lock().unwrap(); + if *old_ep_inner == new_ep_inner { + return Ok(()); + } + *old_ep_inner = new_ep_inner; + drop(old_ep_inner); + + if ep_entry.file.host_fd().is_some() { + self.host_file_epoller + .mod_file(&ep_entry.file, event, flags); + } + + ep_entry + }; + + self.push_ready(ep_entry); + + Ok(()) + } + + fn push_ready(&self, ep_entry: Arc) { + // Fast path to avoid locking + if ep_entry.is_ready.load(Ordering::Relaxed) { + // Concurrency note: + // What if right after returning a true value of `is_ready`, then the `EpollEntry` is + // popped from the ready list? Does it mean that we miss an interesting event? + // + // The answer is NO. If the `is_ready` field of an `EpollEntry` turns from `true` to + // `false`, then the `EpollEntry` must be popped out of the ready list and its + // corresponding file must be polled in the `wait` method. This means that we have + // taken into account any interesting events that have happened on the file so far. 
+ return; + } + + self.push_ready_iter(std::iter::once(ep_entry)); + } + + fn push_ready_iter>>(&self, ep_entries: I) { + let mut has_pushed_any = false; + + // A critical section protected by self.ready.lock() + { + let mut ready_entries = self.ready.lock().unwrap(); + for ep_entry in ep_entries { + if ep_entry.is_ready.load(Ordering::Relaxed) { + continue; + } + + ep_entry.is_ready.store(true, Ordering::Relaxed); + ready_entries.push_back(ep_entry); + + has_pushed_any = true; + } + } + + if has_pushed_any { + self.mark_ready(); + } + } + + fn pop_ready(&self, max_count: usize) -> VecDeque> { + // A critical section protected by self.ready.lock() + { + let mut ready_entries = self.ready.lock().unwrap(); + let max_count = max_count.min(ready_entries.len()); + ready_entries + .drain(..max_count) + .map(|ep_entry| { + ep_entry.is_ready.store(false, Ordering::Relaxed); + ep_entry + }) + .collect::>>() + } + } + + fn mark_ready(&self) { + self.notifier.broadcast(&IoEvents::IN); + self.waiters.dequeue_and_wake_all(); + } + + fn check_flags(&self, flags: &EpollFlags) { + if flags.intersects(EpollFlags::EXCLUSIVE | EpollFlags::WAKE_UP) { + warn!("{:?} contains unsupported flags", flags); + } + } + + fn prepare_event(&self, event: &mut EpollEvent) { + // Add two events that are reported by default + event.mask |= (IoEvents::ERR | IoEvents::HUP); + } +} + +impl File for EpollFile { + fn poll_new(&self) -> IoEvents { + if !self.host_file_epoller.poll().is_empty() { + return IoEvents::IN; + } + + let ready_entries = self.ready.lock().unwrap(); + if !ready_entries.is_empty() { + return IoEvents::IN; + } + + IoEvents::empty() + } + + fn notifier(&self) -> Option<&IoNotifier> { + Some(&self.notifier) + } + + fn host_fd(&self) -> Option<&HostFd> { + Some(self.host_file_epoller.host_fd()) + } + + fn recv_host_events(&self, events: &IoEvents, trigger_notifier: bool) { + self.host_file_epoller.recv_host_events(events); + + if trigger_notifier { + self.notifier.broadcast(events); + } 
+ } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl Drop for EpollFile { + fn drop(&mut self) { + // Do not try to `self.weak_self.upgrade()`! The Arc object must have been + // dropped at this point. + let self_observer = self.weak_self.clone() as Weak>; + + // Unregister ourself from all interesting files' notifiers + let mut interest_entries = self.interest.lock().unwrap(); + interest_entries.drain().for_each(|(_, ep_entry)| { + if let Some(notifier) = ep_entry.file.notifier() { + notifier.unregister(&self_observer); + } + }); + } +} + +impl Observer for EpollFile { + fn on_event(&self, _events: &IoEvents, metadata: &Option>) { + let ep_entry_opt = metadata + .as_ref() + .and_then(|weak_any| weak_any.upgrade()) + .and_then(|strong_any| strong_any.downcast().ok()); + let ep_entry: Arc = match ep_entry_opt { + None => return, + Some(ep_entry) => ep_entry, + }; + + self.push_ready(ep_entry); + } +} + +impl fmt::Debug for EpollFile { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("EpollFile") + .field("interest", &self.interest.lock().unwrap()) + .field("ready", &self.ready.lock().unwrap()) + .finish() + } +} + +pub trait AsEpollFile { + fn as_epoll_file(&self) -> Result<&EpollFile>; +} + +impl AsEpollFile for FileRef { + fn as_epoll_file(&self) -> Result<&EpollFile> { + self.as_any() + .downcast_ref::() + .ok_or_else(|| errno!(EBADF, "not an epoll file")) + } +} + +#[derive(Debug)] +struct EpollEntry { + fd: FileDesc, + file: FileRef, + inner: SgxMutex, + // Whether the entry is in the ready list + is_ready: AtomicBool, + // Whether the entry has been deleted from the interest list + is_deleted: AtomicBool, +} + +impl EpollEntry { + pub fn new(fd: FileDesc, file: FileRef, event: EpollEvent, flags: EpollFlags) -> Self { + let is_ready = Default::default(); + let is_deleted = Default::default(); + let inner = SgxMutex::new(EpollEntryInner { event, flags }); + Self { + fd, + file, + inner, + is_ready, + is_deleted, + } + } +} + 
+#[derive(Debug, PartialEq)] +struct EpollEntryInner { + event: EpollEvent, + flags: EpollFlags, +} diff --git a/src/libos/src/net/io_multiplexing/epoll/epoll_waiter.rs b/src/libos/src/net/io_multiplexing/epoll/epoll_waiter.rs new file mode 100644 index 00000000..3aa7b737 --- /dev/null +++ b/src/libos/src/net/io_multiplexing/epoll/epoll_waiter.rs @@ -0,0 +1,98 @@ +use std::ptr; +use std::time::Duration; + +use super::host_file_epoller::HostFileEpoller; +use crate::events::Waiter; +use crate::prelude::*; +use crate::time::{timespec_t, TIMERSLACK}; + +/// A waiter that is suitable for epoll. +pub struct EpollWaiter { + waiter: Waiter, + host_epoll_fd: FileDesc, +} + +impl EpollWaiter { + pub fn new(host_file_epoller: &HostFileEpoller) -> Self { + Self { + waiter: Waiter::new(), + host_epoll_fd: host_file_epoller.host_fd().to_raw(), + } + } + + /// Wait until the waiter is waken or the host epoll file has any + /// events or the method call is timeout or interrupted. + pub fn wait_mut(&self, mut timeout: Option<&mut Duration>) -> Result<()> { + const ZERO: Duration = Duration::from_secs(0); + if let Some(timeout) = timeout.as_ref() { + if **timeout == ZERO { + return_errno!(ETIMEDOUT, "should return immediately"); + } + } + + let host_eventfd = libc::pollfd { + fd: self.waiter.host_eventfd().host_fd() as i32, + events: libc::POLLIN, + revents: 0, + }; + let host_epf = libc::pollfd { + fd: self.host_epoll_fd as i32, + events: libc::POLLIN, + revents: 0, + }; + let mut pollfds = [host_eventfd, host_epf]; + let host_eventfd_idx = 0; + + let num_events = try_libc!({ + let mut remain_c = timeout.as_ref().map(|timeout| timespec_t::from(**timeout)); + let remain_c_ptr = remain_c.as_mut().map_or(ptr::null_mut(), |mut_ref| mut_ref); + + let mut ret = 0; + let status = unsafe { + occlum_ocall_poll_with_eventfd( + &mut ret, + (&mut pollfds[..]).as_mut_ptr(), + pollfds.len() as u32, + remain_c_ptr, + host_eventfd_idx, + ) + }; + assert!(status == sgx_status_t::SGX_SUCCESS); + + 
if let Some(timeout) = timeout.as_mut() { + let remain = remain_c.unwrap().as_duration(); + assert!(remain <= **timeout + TIMERSLACK.to_duration()); + **timeout = remain; + } + + ret + }); + + // Poll syscall does not treat timeout as error. So we need + // to distinguish the case by ourselves. + if let Some(timeout) = timeout.as_mut() { + if num_events == 0 { + **timeout = ZERO; + return_errno!(ETIMEDOUT, "no results and the time is up"); + } + } + + Ok(()) + } +} + +impl AsRef for EpollWaiter { + fn as_ref(&self) -> &Waiter { + &self.waiter + } +} + +extern "C" { + fn occlum_ocall_poll_with_eventfd( + ret: *mut i32, + fds: *mut libc::pollfd, + nfds: u32, + timeout: *mut timespec_t, + eventfd_idx: i32, + ) -> sgx_status_t; +} diff --git a/src/libos/src/net/io_multiplexing/epoll/host_file_epoller.rs b/src/libos/src/net/io_multiplexing/epoll/host_file_epoller.rs new file mode 100644 index 00000000..ce65d0fc --- /dev/null +++ b/src/libos/src/net/io_multiplexing/epoll/host_file_epoller.rs @@ -0,0 +1,200 @@ +use std::mem::MaybeUninit; +use std::ptr; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + +use super::{EpollCtl, EpollEvent, EpollFlags}; +use crate::fs::{HostFd, IoEvents}; +use crate::prelude::*; + +/// An epoll-based helper type to poll the states of a set of host files. +#[derive(Debug)] +pub struct HostFileEpoller { + /// A map from host fd to HostFile, which maintains the set of the interesting + /// host files. + host_files: SgxMutex>, + /// The number of the interesting host files. + count: AtomicUsize, + /// The host fd of the underlying host epoll file. + host_epoll_fd: HostFd, + /// Whether the host epoll file has any events. + is_ready: AtomicBool, +} + +// TODO: the `add/mod/del_file` operation can be postponed until a `poll_files` operation, +// thus reducing the number of OCalls. 
+ +impl HostFileEpoller { + pub fn new() -> Self { + let host_files = Default::default(); + let count = Default::default(); + let host_epoll_fd = { + let raw_host_fd = (|| -> Result { + let raw_host_fd = try_libc!(libc::ocall::epoll_create1(0)) as u32; + Ok(raw_host_fd) + })() + .expect("epoll_create should never fail"); + + HostFd::new(raw_host_fd) + }; + let is_ready = Default::default(); + Self { + host_files, + count, + host_epoll_fd, + is_ready, + } + } + + pub fn add_file(&self, host_file: FileRef, event: EpollEvent, flags: EpollFlags) -> Result<()> { + let mut host_files = self.host_files.lock().unwrap(); + let host_fd = host_file.host_fd().unwrap().to_raw(); + let already_added = host_files.insert(host_fd, host_file.clone()).is_some(); + if already_added { + // TODO: handle the case where one host file is somehow to be added more than once. + warn!( + "Cannot handle the case of adding the same host file twice in a robust way. + This can happen if the same `HostFile` is accessible via two different LibOS fds." + ); + return Ok(()); + } + + self.count.fetch_add(1, Ordering::Relaxed); + self.do_epoll_ctl(libc::EPOLL_CTL_ADD, &host_file, Some((event, flags))) + + // Concurrency note: + // The lock on self.host_files must be held while invoking + // do_epoll_ctl to prevent race conditions that cause the OCall to fail. + // This same argument applies to mod_file and del_file methods. 
+ } + + pub fn mod_file( + &self, + host_file: &FileRef, + event: EpollEvent, + flags: EpollFlags, + ) -> Result<()> { + let host_files = self.host_files.lock().unwrap(); + let host_fd = host_file.host_fd().unwrap().to_raw(); + let not_added = !host_files.contains_key(&host_fd); + if not_added { + return_errno!(ENOENT, "the host file must be added before modifying"); + } + + self.do_epoll_ctl(libc::EPOLL_CTL_MOD, &host_file, Some((event, flags))) + } + + pub fn del_file(&self, host_file: &FileRef) -> Result<()> { + let mut host_files = self.host_files.lock().unwrap(); + let host_fd = host_file.host_fd().unwrap().to_raw(); + let not_added = !host_files.remove(&host_fd).is_some(); + if not_added { + return_errno!(ENOENT, "the host file must be added before deleting"); + } + + self.count.fetch_sub(1, Ordering::Relaxed); + self.do_epoll_ctl(libc::EPOLL_CTL_DEL, &host_file, None) + } + + fn do_epoll_ctl( + &self, + raw_cmd: i32, + host_file: &FileRef, + event_and_flags: Option<(EpollEvent, EpollFlags)>, + ) -> Result<()> { + let host_epoll_fd = self.host_epoll_fd.to_raw(); + let host_fd = host_file.host_fd().unwrap().to_raw(); + + let c_event = event_and_flags.map(|(event, flags)| { + let mut c_event = event.to_c(); + c_event.events |= flags.bits() as u32; + c_event.u64 = host_fd as u64; + c_event + }); + + try_libc!(libc::ocall::epoll_ctl( + host_epoll_fd as i32, + raw_cmd, + host_file.host_fd().unwrap().to_raw() as i32, + c_event.as_ref().map_or(ptr::null(), |c_event| c_event) as *mut _, + )); + Ok(()) + } + + pub fn poll_events(&self, max_count: usize) -> usize { + // Quick check to avoid unnecessary OCall + if self.count.load(Ordering::Relaxed) == 0 { + return 0; + } + + // Do OCall to poll the host files monitored by the host epoll file + let mut raw_events = vec![MaybeUninit::::uninit(); max_count]; + let timeout = 0; + let ocall_res = || -> Result { + let count = try_libc!(libc::ocall::epoll_wait( + self.host_epoll_fd.to_raw() as i32, + raw_events.as_mut_ptr() as 
*mut _, + raw_events.len() as c_int, + timeout, + )) as usize; + assert!(count <= max_count); + Ok(count) + }(); + + let mut count = match ocall_res { + Ok(count) => count, + Err(e) => { + warn!("Unexpected error from ocall::epoll_wait(): {:?}", e); + 0 + } + }; + if count == 0 { + return 0; + } + + // Use the polled events from the host to update the states of the + // corresponding host files + let mut host_files = self.host_files.lock().unwrap(); + for raw_event in &raw_events[..count] { + let raw_event = unsafe { raw_event.assume_init() }; + let io_events = IoEvents::from_bits_truncate(raw_event.events as u32); + let host_fd = raw_event.u64 as u32; + + let host_file = match host_files.get(&host_fd) { + None => { + count -= 1; + // The corresponding host file may be deleted + continue; + } + Some(host_file) => host_file, + }; + + host_file.recv_host_events(&io_events, true); + } + count + } + + pub fn host_fd(&self) -> &HostFd { + &self.host_epoll_fd + } + + pub fn poll(&self) -> IoEvents { + if self.is_ready.load(Ordering::Acquire) { + IoEvents::IN + } else { + IoEvents::empty() + } + } + + pub fn recv_host_events(&self, events: &IoEvents) { + let is_ready = events.contains(IoEvents::IN); + self.is_ready.store(is_ready, Ordering::Release); + } +} + +impl Drop for HostFileEpoller { + fn drop(&mut self) { + unsafe { + libc::ocall::close(self.host_epoll_fd.to_raw() as i32); + } + } +} diff --git a/src/libos/src/net/io_multiplexing/epoll/mod.rs b/src/libos/src/net/io_multiplexing/epoll/mod.rs new file mode 100644 index 00000000..f1f8a6d8 --- /dev/null +++ b/src/libos/src/net/io_multiplexing/epoll/mod.rs @@ -0,0 +1,69 @@ +use crate::fs::IoEvents; +use crate::prelude::*; + +mod epoll_file; +mod epoll_waiter; +mod host_file_epoller; + +pub use self::epoll_file::{AsEpollFile, EpollFile}; + +/// An epoll control command. 
+#[derive(Copy, Clone, Debug, PartialEq)] +pub enum EpollCtl { + Add(FileDesc, EpollEvent, EpollFlags), + Del(FileDesc), + Mod(FileDesc, EpollEvent, EpollFlags), +} + +/// An epoll control flags. +bitflags! { + pub struct EpollFlags: u32 { + const EXCLUSIVE = (1 << 28); + const WAKE_UP = (1 << 29); + const ONE_SHOT = (1 << 30); + const EDGE_TRIGGER = (1 << 31); + } +} + +impl EpollFlags { + pub fn from_c(c_event: &libc::epoll_event) -> Self { + EpollFlags::from_bits_truncate(c_event.events) + } +} + +/// An epoll event. +/// +/// This could be used as either an input of epoll ctl or an output of epoll wait. +// Note: the memory layout is compatible with that of C's struct epoll_event. +#[derive(Copy, Clone, Debug, PartialEq)] +pub struct EpollEvent { + mask: IoEvents, + user_data: u64, +} + +impl EpollEvent { + pub fn new(mask: IoEvents, user_data: u64) -> Self { + Self { mask, user_data } + } + + pub fn mask(&self) -> IoEvents { + self.mask + } + + pub fn user_data(&self) -> u64 { + self.user_data + } + + pub fn from_c(c_event: &libc::epoll_event) -> Self { + let mask = IoEvents::from_bits_truncate(c_event.events as u32); + let user_data = c_event.u64; + Self { mask, user_data } + } + + pub fn to_c(&self) -> libc::epoll_event { + libc::epoll_event { + events: self.mask.bits() as u32, + u64: self.user_data, + } + } +} diff --git a/src/libos/src/net/io_multiplexing/mod.rs b/src/libos/src/net/io_multiplexing/mod.rs index 3cc87ea0..2a705466 100644 --- a/src/libos/src/net/io_multiplexing/mod.rs +++ b/src/libos/src/net/io_multiplexing/mod.rs @@ -1,18 +1,19 @@ use super::*; mod epoll; +// TODO: the following three modules will soon be removed mod io_event; mod poll; mod select; -pub use self::epoll::{AsEpollFile, EpollCtlCmd, EpollEvent, EpollEventFlags, EpollFile}; +pub use self::epoll::{AsEpollFile, EpollCtl, EpollEvent, EpollFile, EpollFlags}; pub use self::io_event::{ clear_notifier_status, notify_thread, wait_for_notification, IoEvent, THREAD_NOTIFIERS, }; pub use 
self::poll::{do_poll, PollEvent, PollEventFlags}; pub use self::select::{select, FdSetExt}; -use fs::{AsDevRandom, AsEvent, CreationFlags, File, FileDesc, FileRef, PipeType}; +use fs::{AsDevRandom, AsEvent, CreationFlags, File, FileDesc, FileRef, HostFd, PipeType}; use std::any::Any; use std::convert::TryFrom; use std::fmt; diff --git a/src/libos/src/net/io_multiplexing/poll.rs b/src/libos/src/net/io_multiplexing/poll.rs index 8ed8a54a..61777bd4 100644 --- a/src/libos/src/net/io_multiplexing/poll.rs +++ b/src/libos/src/net/io_multiplexing/poll.rs @@ -113,7 +113,7 @@ pub fn do_poll(pollfds: &mut [PollEvent], timeout: *mut timeval_t) -> Result Result { - let host_fd = try_libc!(libc::ocall::socket( - domain as i32, - socket_type as i32 | file_flags.bits(), - protocol - )); - Ok(Self { host_fd }) - } - - pub fn host_fd(&self) -> c_int { - self.host_fd - } - - pub fn bind(&self, addr: &SockAddr) -> Result<()> { - let (addr_ptr, addr_len) = addr.as_ptr_and_len(); - - let ret = try_libc!(libc::ocall::bind( - self.host_fd(), - addr_ptr as *const libc::sockaddr, - addr_len as u32 - )); - Ok(()) - } - - pub fn listen(&self, backlog: i32) -> Result<()> { - let ret = try_libc!(libc::ocall::listen(self.host_fd(), backlog)); - Ok(()) - } - - pub fn accept(&self, flags: FileFlags) -> Result<(Self, Option)> { - let mut sockaddr = SockAddr::default(); - let mut addr_len = sockaddr.len(); - - let ret = try_libc!(libc::ocall::accept4( - self.host_fd(), - sockaddr.as_mut_ptr() as *mut _, - &mut addr_len as *mut _ as *mut _, - flags.bits() - )); - - let addr_option = if addr_len != 0 { - sockaddr.set_len(addr_len)?; - Some(sockaddr) - } else { - None - }; - Ok((Self { host_fd: ret }, addr_option)) - } - - pub fn connect(&self, addr: &Option) -> Result<()> { - debug!("host_fd: {} addr {:?}", self.host_fd(), addr); - - let (addr_ptr, addr_len) = if let Some(sock_addr) = addr { - sock_addr.as_ptr_and_len() - } else { - (std::ptr::null(), 0) - }; - - let ret = 
try_libc!(libc::ocall::connect( - self.host_fd(), - addr_ptr, - addr_len as u32 - )); - Ok(()) - } - - pub fn sendto( - &self, - buf: &[u8], - flags: SendFlags, - addr_option: &Option, - ) -> Result { - let bufs = vec![buf]; - let name_option = addr_option.as_ref().map(|addr| addr.as_slice()); - self.do_sendmsg(&bufs, flags, name_option, None) - } - - pub fn recvfrom(&self, buf: &mut [u8], flags: RecvFlags) -> Result<(usize, Option)> { - let mut sockaddr = SockAddr::default(); - let mut bufs = vec![buf]; - let (bytes_recv, addr_len, _, _) = - self.do_recvmsg(&mut bufs, flags, Some(sockaddr.as_mut_slice()), None)?; - - let addr_option = if addr_len != 0 { - sockaddr.set_len(addr_len)?; - Some(sockaddr) - } else { - None - }; - Ok((bytes_recv, addr_option)) - } -} - -impl Drop for HostSocket { - fn drop(&mut self) { - let ret = unsafe { libc::ocall::close(self.host_fd) }; - assert!(ret == 0); - } -} - -pub trait HostSocketType { - fn as_host_socket(&self) -> Result<&HostSocket>; -} - -impl HostSocketType for FileRef { - fn as_host_socket(&self) -> Result<&HostSocket> { - self.as_any() - .downcast_ref::() - .ok_or_else(|| errno!(EBADF, "not a host socket")) - } -} diff --git a/src/libos/src/net/socket/host_socket/ioctl_impl.rs b/src/libos/src/net/socket/host_socket/ioctl_impl.rs index 93577c5a..b551353d 100644 --- a/src/libos/src/net/socket/host_socket/ioctl_impl.rs +++ b/src/libos/src/net/socket/host_socket/ioctl_impl.rs @@ -13,7 +13,7 @@ impl HostSocket { let mut retval: i32 = 0; let status = occlum_ocall_ioctl( &mut retval as *mut i32, - self.host_fd(), + self.raw_host_fd() as i32, cmd_num, cmd_arg_ptr, cmd.arg_len(), @@ -36,7 +36,7 @@ impl HostSocket { let mut retval: i32 = 0; let status = occlum_ocall_ioctl_repack( &mut retval as *mut i32, - self.host_fd(), + self.raw_host_fd() as i32, BuiltinIoctlNum::SIOCGIFCONF as i32, arg_ref.ifc_buf, arg_ref.ifc_len, diff --git a/src/libos/src/net/socket/host_socket/mod.rs b/src/libos/src/net/socket/host_socket/mod.rs index 
8734b35e..972cf566 100644 --- a/src/libos/src/net/socket/host_socket/mod.rs +++ b/src/libos/src/net/socket/host_socket/mod.rs @@ -1,9 +1,154 @@ -use super::*; +use std::any::Any; +use std::io::{Read, Seek, SeekFrom, Write}; +use std::mem; + +use atomic::Atomic; + +use super::*; +use crate::fs::{ + occlum_ocall_ioctl, AccessMode, CreationFlags, File, FileRef, HostFd, IoEvents, IoNotifier, + IoctlCmd, StatusFlags, +}; -mod host_socket; mod ioctl_impl; mod recv; mod send; mod socket_file; -pub use self::host_socket::{HostSocket, HostSocketType}; +/// Native linux socket +#[derive(Debug)] +pub struct HostSocket { + host_fd: HostFd, + host_events: Atomic, + notifier: IoNotifier, +} + +impl HostSocket { + pub fn new( + domain: AddressFamily, + socket_type: SocketType, + file_flags: FileFlags, + protocol: i32, + ) -> Result { + let raw_host_fd = try_libc!(libc::ocall::socket( + domain as i32, + socket_type as i32 | file_flags.bits(), + protocol + )) as FileDesc; + let host_fd = HostFd::new(raw_host_fd); + Ok(HostSocket::from_host_fd(host_fd)) + } + + fn from_host_fd(host_fd: HostFd) -> HostSocket { + let host_events = Atomic::new(IoEvents::empty()); + let notifier = IoNotifier::new(); + Self { + host_fd, + host_events, + notifier, + } + } + + pub fn bind(&self, addr: &SockAddr) -> Result<()> { + let (addr_ptr, addr_len) = addr.as_ptr_and_len(); + + let ret = try_libc!(libc::ocall::bind( + self.raw_host_fd() as i32, + addr_ptr as *const libc::sockaddr, + addr_len as u32 + )); + Ok(()) + } + + pub fn listen(&self, backlog: i32) -> Result<()> { + let ret = try_libc!(libc::ocall::listen(self.raw_host_fd() as i32, backlog)); + Ok(()) + } + + pub fn accept(&self, flags: FileFlags) -> Result<(Self, Option)> { + let mut sockaddr = SockAddr::default(); + let mut addr_len = sockaddr.len(); + + let raw_host_fd = try_libc!(libc::ocall::accept4( + self.raw_host_fd() as i32, + sockaddr.as_mut_ptr() as *mut _, + &mut addr_len as *mut _ as *mut _, + flags.bits() + )) as FileDesc; + let 
host_fd = HostFd::new(raw_host_fd); + + let addr_option = if addr_len != 0 { + sockaddr.set_len(addr_len)?; + Some(sockaddr) + } else { + None + }; + Ok((HostSocket::from_host_fd(host_fd), addr_option)) + } + + pub fn connect(&self, addr: &Option) -> Result<()> { + debug!("connect: host_fd: {}, addr {:?}", self.raw_host_fd(), addr); + + let (addr_ptr, addr_len) = if let Some(sock_addr) = addr { + sock_addr.as_ptr_and_len() + } else { + (std::ptr::null(), 0) + }; + + let ret = try_libc!(libc::ocall::connect( + self.raw_host_fd() as i32, + addr_ptr, + addr_len as u32 + )); + Ok(()) + } + + pub fn sendto( + &self, + buf: &[u8], + flags: SendFlags, + addr_option: &Option, + ) -> Result { + let bufs = vec![buf]; + let name_option = addr_option.as_ref().map(|addr| addr.as_slice()); + self.do_sendmsg(&bufs, flags, name_option, None) + } + + pub fn recvfrom(&self, buf: &mut [u8], flags: RecvFlags) -> Result<(usize, Option)> { + let mut sockaddr = SockAddr::default(); + let mut bufs = vec![buf]; + let (bytes_recv, addr_len, _, _) = + self.do_recvmsg(&mut bufs, flags, Some(sockaddr.as_mut_slice()), None)?; + + let addr_option = if addr_len != 0 { + sockaddr.set_len(addr_len)?; + Some(sockaddr) + } else { + None + }; + Ok((bytes_recv, addr_option)) + } + + pub fn raw_host_fd(&self) -> FileDesc { + self.host_fd.to_raw() + } +} + +impl Drop for HostSocket { + fn drop(&mut self) { + let ret = unsafe { libc::ocall::close(self.host_fd.to_raw() as i32) }; + assert!(ret == 0); + } +} + +pub trait HostSocketType { + fn as_host_socket(&self) -> Result<&HostSocket>; +} + +impl HostSocketType for FileRef { + fn as_host_socket(&self) -> Result<&HostSocket> { + self.as_any() + .downcast_ref::() + .ok_or_else(|| errno!(EBADF, "not a host socket")) + } +} diff --git a/src/libos/src/net/socket/host_socket/recv.rs b/src/libos/src/net/socket/host_socket/recv.rs index e7465ff2..e9ef036a 100644 --- a/src/libos/src/net/socket/host_socket/recv.rs +++ b/src/libos/src/net/socket/host_socket/recv.rs 
@@ -64,7 +64,7 @@ impl HostSocket { ) -> Result<(usize, usize, usize, MsgHdrFlags)> { // Prepare the arguments for OCall // Host socket fd - let host_fd = self.host_fd(); + let host_fd = self.raw_host_fd() as i32; // Name let (msg_name, msg_namelen) = name.as_mut_ptr_and_len(); let msg_name = msg_name as *mut c_void; diff --git a/src/libos/src/net/socket/host_socket/send.rs b/src/libos/src/net/socket/host_socket/send.rs index 68570d40..37ca607d 100644 --- a/src/libos/src/net/socket/host_socket/send.rs +++ b/src/libos/src/net/socket/host_socket/send.rs @@ -46,7 +46,7 @@ impl HostSocket { // Prepare the arguments for OCall let mut retval: isize = 0; // Host socket fd - let host_fd = self.host_fd(); + let host_fd = self.raw_host_fd() as i32; // Name let (msg_name, msg_namelen) = name.as_ptr_and_len(); let msg_name = msg_name as *const c_void; diff --git a/src/libos/src/net/socket/host_socket/socket_file.rs b/src/libos/src/net/socket/host_socket/socket_file.rs index 2955907c..67abc8f7 100644 --- a/src/libos/src/net/socket/host_socket/socket_file.rs +++ b/src/libos/src/net/socket/host_socket/socket_file.rs @@ -1,11 +1,14 @@ -use super::*; - -use crate::fs::{ - occlum_ocall_ioctl, AccessMode, CreationFlags, File, FileRef, IoctlCmd, StatusFlags, -}; use std::any::Any; use std::io::{Read, Seek, SeekFrom, Write}; +use atomic::{Atomic, Ordering}; + +use super::*; +use crate::fs::{ + occlum_ocall_ioctl, AccessMode, CreationFlags, File, FileRef, HostFd, IoEvents, IoctlCmd, + StatusFlags, +}; + //TODO: refactor write syscall to allow zero length with non-zero buffer impl File for HostSocket { fn read(&self, buf: &mut [u8]) -> Result { @@ -52,7 +55,10 @@ impl File for HostSocket { } fn get_status_flags(&self) -> Result { - let ret = try_libc!(libc::ocall::fcntl_arg0(self.host_fd(), libc::F_GETFL)); + let ret = try_libc!(libc::ocall::fcntl_arg0( + self.raw_host_fd() as i32, + libc::F_GETFL + )); Ok(StatusFlags::from_bits_truncate(ret as u32)) } @@ -64,13 +70,33 @@ impl File for 
HostSocket { | StatusFlags::O_NONBLOCK; let raw_status_flags = (new_status_flags & valid_flags_mask).bits(); try_libc!(libc::ocall::fcntl_arg1( - self.host_fd(), + self.raw_host_fd() as i32, libc::F_SETFL, raw_status_flags as c_int )); Ok(()) } + fn poll_new(&self) -> IoEvents { + self.host_events.load(Ordering::Acquire) + } + + fn host_fd(&self) -> Option<&HostFd> { + Some(&self.host_fd) + } + + fn notifier(&self) -> Option<&IoNotifier> { + Some(&self.notifier) + } + + fn recv_host_events(&self, events: &IoEvents, trigger_notifier: bool) { + self.host_events.store(*events, Ordering::Release); + + if trigger_notifier { + self.notifier.broadcast(events); + } + } + fn as_any(&self) -> &dyn Any { self } diff --git a/src/libos/src/net/syscalls.rs b/src/libos/src/net/syscalls.rs index 8049716f..0eccd5f4 100644 --- a/src/libos/src/net/syscalls.rs +++ b/src/libos/src/net/syscalls.rs @@ -1,6 +1,9 @@ use super::*; -use super::io_multiplexing::{AsEpollFile, EpollCtlCmd, EpollEventFlags, EpollFile, FdSetExt}; +use std::mem::MaybeUninit; +use std::time::Duration; + +use super::io_multiplexing::{AsEpollFile, EpollCtl, EpollFile, EpollFlags, FdSetExt}; use fs::{CreationFlags, File, FileDesc, FileRef}; use misc::resource_t; use process::Process; @@ -163,7 +166,7 @@ pub fn do_shutdown(fd: c_int, how: c_int) -> Result { debug!("shutdown: fd: {}, how: {}", fd, how); let file_ref = current!().file(fd as FileDesc)?; if let Ok(socket) = file_ref.as_host_socket() { - let ret = try_libc!(libc::ocall::shutdown(socket.host_fd(), how)); + let ret = try_libc!(libc::ocall::shutdown(socket.raw_host_fd() as i32, how)); Ok(ret as isize) } else { // TODO: support unix socket @@ -185,7 +188,7 @@ pub fn do_setsockopt( let file_ref = current!().file(fd as FileDesc)?; if let Ok(socket) = file_ref.as_host_socket() { let ret = try_libc!(libc::ocall::setsockopt( - socket.host_fd(), + socket.raw_host_fd() as i32, level, optname, optval, @@ -215,7 +218,7 @@ pub fn do_getsockopt( let socket = 
file_ref.as_host_socket()?; let ret = try_libc!(libc::ocall::getsockopt( - socket.host_fd(), + socket.raw_host_fd() as i32, level, optname, optval, @@ -235,7 +238,11 @@ pub fn do_getpeername( ); let file_ref = current!().file(fd as FileDesc)?; if let Ok(socket) = file_ref.as_host_socket() { - let ret = try_libc!(libc::ocall::getpeername(socket.host_fd(), addr, addr_len)); + let ret = try_libc!(libc::ocall::getpeername( + socket.raw_host_fd() as i32, + addr, + addr_len + )); Ok(ret as isize) } else if let Ok(unix_socket) = file_ref.as_unix_socket() { warn!("getpeername for unix socket is unimplemented"); @@ -259,7 +266,11 @@ pub fn do_getsockname( ); let file_ref = current!().file(fd as FileDesc)?; if let Ok(socket) = file_ref.as_host_socket() { - let ret = try_libc!(libc::ocall::getsockname(socket.host_fd(), addr, addr_len)); + let ret = try_libc!(libc::ocall::getsockname( + socket.raw_host_fd() as i32, + addr, + addr_len + )); Ok(ret as isize) } else if let Ok(unix_socket) = file_ref.as_unix_socket() { warn!("getsockname for unix socket is unimplemented"); @@ -611,40 +622,53 @@ pub fn do_epoll_create(size: c_int) -> Result { } pub fn do_epoll_create1(raw_flags: c_int) -> Result { + debug!("epoll_create: raw_flags: {:?}", raw_flags); + // Only O_CLOEXEC is valid let flags = CreationFlags::from_bits(raw_flags as u32) .ok_or_else(|| errno!(EINVAL, "invalid flags"))? 
& CreationFlags::O_CLOEXEC; - let epoll_file = io_multiplexing::EpollFile::new(flags)?; - let file_ref: Arc = Arc::new(epoll_file); + let epoll_file: Arc = EpollFile::new(); let close_on_spawn = flags.contains(CreationFlags::O_CLOEXEC); - let fd = current!().add_file(file_ref, close_on_spawn); - - Ok(fd as isize) + let epfd = current!().add_file(epoll_file, close_on_spawn); + Ok(epfd as isize) } pub fn do_epoll_ctl( epfd: c_int, op: c_int, fd: c_int, - event: *const libc::epoll_event, + event_ptr: *const libc::epoll_event, ) -> Result { debug!("epoll_ctl: epfd: {}, op: {:?}, fd: {}", epfd, op, fd); - let inner_event = if !event.is_null() { - from_user::check_ptr(event)?; - Some(EpollEvent::from_raw(unsafe { &*event })?) - } else { - None + + let get_c_event = |event_ptr| -> Result<&libc::epoll_event> { + from_user::check_ptr(event_ptr)?; + Ok(unsafe { &*event_ptr }) + }; + + let fd = fd as FileDesc; + let ctl_cmd = match op { + libc::EPOLL_CTL_ADD => { + let c_event = get_c_event(event_ptr)?; + let event = EpollEvent::from_c(c_event); + let flags = EpollFlags::from_c(c_event); + EpollCtl::Add(fd, event, flags) + } + libc::EPOLL_CTL_DEL => EpollCtl::Del(fd), + libc::EPOLL_CTL_MOD => { + let c_event = get_c_event(event_ptr)?; + let event = EpollEvent::from_c(c_event); + let flags = EpollFlags::from_c(c_event); + EpollCtl::Mod(fd, event, flags) + } + _ => return_errno!(EINVAL, "invalid op"), }; let epfile_ref = current!().file(epfd as FileDesc)?; - let epoll_file = epfile_ref.as_epfile()?; + let epoll_file = epfile_ref.as_epoll_file()?; - epoll_file.control( - EpollCtlCmd::try_from(op)?, - fd as FileDesc, - inner_event.as_ref(), - )?; + epoll_file.control(&ctl_cmd)?; Ok(0) } @@ -652,8 +676,13 @@ pub fn do_epoll_wait( epfd: c_int, events: *mut libc::epoll_event, max_events: c_int, - timeout: c_int, + timeout_ms: c_int, ) -> Result { + debug!( + "epoll_wait: epfd: {}, max_events: {:?}, timeout_ms: {}", + epfd, max_events, timeout_ms + ); + let max_events = { if 
max_events <= 0 { return_errno!(EINVAL, "maxevents <= 0"); @@ -666,23 +695,26 @@ pub fn do_epoll_wait( }; // A new vector to store EpollEvent, which may degrade the performance due to extra copy. - let mut inner_events: Vec = - vec![EpollEvent::new(EpollEventFlags::empty(), 0); max_events]; + let mut inner_events: Vec> = vec![MaybeUninit::uninit(); max_events]; debug!( "epoll_wait: epfd: {}, len: {:?}, timeout: {}", epfd, raw_events.len(), - timeout + timeout_ms, ); let epfile_ref = current!().file(epfd as FileDesc)?; - let epoll_file = epfile_ref.as_epfile()?; - - let count = epoll_file.wait(&mut inner_events, timeout)?; + let epoll_file = epfile_ref.as_epoll_file()?; + let timeout = if timeout_ms >= 0 { + Some(Duration::from_millis(timeout_ms as u64)) + } else { + None + }; + let count = epoll_file.wait(&mut inner_events, timeout.as_ref())?; for i in 0..count { - raw_events[i] = inner_events[i].to_raw(); + raw_events[i] = unsafe { inner_events[i].assume_init() }.to_c(); } Ok(count as isize) @@ -698,7 +730,7 @@ pub fn do_epoll_pwait( if !sigmask.is_null() { warn!("epoll_pwait cannot handle signal mask, yet"); } else { - info!("epoll_wait"); + debug!("epoll_wait"); } do_epoll_wait(epfd, events, maxevents, timeout) } diff --git a/src/pal/src/ocalls/event.c b/src/pal/src/ocalls/event.c index aeae205c..3b42861b 100644 --- a/src/pal/src/ocalls/event.c +++ b/src/pal/src/ocalls/event.c @@ -41,3 +41,31 @@ void occlum_ocall_eventfd_write_batch( write(eventfds[fd_i], &val, sizeof(val)); } } + +int occlum_ocall_poll_with_eventfd( + struct pollfd *pollfds, + nfds_t nfds, + struct timespec *timeout, + int eventfd_idx +) { + if (eventfd_idx >= 0) { + pollfds[eventfd_idx].events |= POLLIN; + } + + // We use the ppoll syscall directly instead of the libc wrapper. This + // is because the syscall version updates the timeout argument to indicate + // how much time was left (which what we want), while the libc wrapper + // keeps the timeout argument unchanged. 
+ int ret = RAW_PPOLL(pollfds, nfds, timeout); + if (ret < 0) { + return -1; + } + + if (eventfd_idx >= 0 && (pollfds[eventfd_idx].revents & POLLIN) != 0) { + int eventfd = pollfds[eventfd_idx].fd; + char buf[8]; + read(eventfd, buf, 8); + } + + return ret; +} diff --git a/test/pipe/main.c b/test/pipe/main.c index 17e69451..d1ccd252 100644 --- a/test/pipe/main.c +++ b/test/pipe/main.c @@ -1,3 +1,5 @@ +#include +#include #include #include #include @@ -98,6 +100,51 @@ int test_select_timeout() { return 0; } +int test_epoll_timeout() { + int pipe_fds[2]; + if (pipe(pipe_fds) < 0) { + THROW_ERROR("failed to create a pipe"); + } + int pipe_read_fd = pipe_fds[0]; + int pipe_write_fd = pipe_fds[1]; + + int ep_fd = epoll_create1(0); + if (ep_fd < 0) { + THROW_ERROR("failed to create an epoll"); + } + + int ret; + struct epoll_event event; + + event.events = EPOLLIN; // we want the write end to be readable + event.data.u32 = pipe_write_fd; + ret = epoll_ctl(ep_fd, EPOLL_CTL_ADD, pipe_write_fd, &event); + if (ret < 0) { + THROW_ERROR("failed to do epoll ctl"); + } + + event.events = EPOLLOUT; // we want the read end to be writable + event.data.u32 = pipe_read_fd; + ret = epoll_ctl(ep_fd, EPOLL_CTL_ADD, pipe_read_fd, &event); + if (ret < 0) { + THROW_ERROR("failed to do epoll ctl"); + } + + // We are waiting for the write end to be readable or the read end to be + // writable, which can never happen. So the epoll_wait must end with + // timeout. 
+ errno = 0; + struct epoll_event events[2]; + ret = epoll_wait(ep_fd, events, ARRAY_SIZE(events), 10 /* ms */); + if (ret != 0 || errno != 0) { + THROW_ERROR("failed to do epoll ctl"); + } + + free_pipe(pipe_fds); + close(ep_fd); + return 0; +} + int test_poll_timeout() { // Start the timer struct timeval tv_start, tv_end; @@ -167,6 +214,48 @@ int test_poll_no_timeout() { return 0; } +int test_epoll_no_timeout() { + int pipe_fds[2]; + if (pipe(pipe_fds) < 0) { + THROW_ERROR("failed to create a pipe"); + } + int pipe_read_fd = pipe_fds[0]; + int pipe_write_fd = pipe_fds[1]; + + int ep_fd = epoll_create1(0); + if (ep_fd < 0) { + THROW_ERROR("failed to create an epoll"); + } + + int ret; + struct epoll_event event; + + event.events = EPOLLOUT; // writable + event.data.u32 = pipe_write_fd; + ret = epoll_ctl(ep_fd, EPOLL_CTL_ADD, pipe_write_fd, &event); + if (ret < 0) { + THROW_ERROR("failed to do epoll ctl"); + } + + event.events = EPOLLIN; // readable + event.data.u32 = pipe_read_fd; + ret = epoll_ctl(ep_fd, EPOLL_CTL_ADD, pipe_read_fd, &event); + if (ret < 0) { + THROW_ERROR("failed to do epoll ctl"); + } + + struct epoll_event events[2]; + ret = epoll_wait(ep_fd, events, ARRAY_SIZE(events), -1); + // pipe_write_fd is ready, while pipe_read_fd is not + if (ret != 1) { + THROW_ERROR("failed to do epoll ctl"); + } + + free_pipe(pipe_fds); + close(ep_fd); + return 0; +} + int test_select_read_write() { int pipe_fds[2]; if (pipe(pipe_fds) < 0) { @@ -229,8 +318,10 @@ static test_case_t test_cases[] = { TEST_CASE(test_create_with_flags), //TEST_CASE(test_select_timeout), //TEST_CASE(test_poll_timeout), + TEST_CASE(test_epoll_timeout), //TEST_CASE(test_select_no_timeout), //TEST_CASE(test_poll_no_timeout), + TEST_CASE(test_epoll_no_timeout), //TEST_CASE(test_select_read_write), };