Add the new epoll implementation
Before this commit, the epoll implementation works by simply delegating to the host OS through OCall. One major problem with this implementation is that it can only handle files that are backed by a file of the host OS (e.g., sockets), but not those are are mainly implemented by the LibOS (e.g., pipes). Therefore, a new epoll implementation that can handle all kinds of files is needed. This commit completely rewrites the epoll implementation by leveraging the new event subsystem. Now the new epoll can handle all file types: 1. Host files, e.g., sockets, eventfd; 2. LibOS files, e.g., pipes; 3. Hybrid files, e.g., epoll files. For a new file type to support epoll, it only neends to implement no more than four methods of the File trait: * poll (required for all file types); * notifier (required for all file files); * host_fd (only required for host files); * recv_host_events (only required for host files).
This commit is contained in:
parent
2ff4b1c776
commit
6fdfa57a14
@ -190,6 +190,8 @@ enclave {
|
||||
unsigned int initval,
|
||||
int flags
|
||||
) propagate_errno;
|
||||
// TODO: the usage of this OCall should be replaced with
|
||||
// occlum_ocall_poll_with_eventfd, which is a more general form.
|
||||
int occlum_ocall_eventfd_poll(
|
||||
int eventfd,
|
||||
[in, out] struct timespec *timeout
|
||||
@ -200,12 +202,20 @@ enclave {
|
||||
uint64_t val
|
||||
);
|
||||
|
||||
// TODO: the usage of this OCall should be replaced with
|
||||
// occlum_ocall_poll_with_eventfd, which is a more general form.
|
||||
int occlum_ocall_poll(
|
||||
[in, out, count=nfds] struct pollfd *fds,
|
||||
nfds_t nfds,
|
||||
[in, out] struct timeval *timeout,
|
||||
int efd
|
||||
)propagate_errno;
|
||||
) propagate_errno;
|
||||
int occlum_ocall_poll_with_eventfd(
|
||||
[in, out, count=nfds] struct pollfd *fds,
|
||||
nfds_t nfds,
|
||||
[in, out] struct timespec *timeout,
|
||||
int eventfd_idx
|
||||
) propagate_errno;
|
||||
|
||||
void occlum_ocall_print_log(uint32_t level, [in, string] const char* msg);
|
||||
void occlum_ocall_flush_log(void);
|
||||
|
@ -49,6 +49,11 @@ impl HostEventFd {
|
||||
match timeout {
|
||||
None => ocall_eventfd_poll(self.host_fd, std::ptr::null_mut()),
|
||||
Some(timeout) => {
|
||||
const ZERO: Duration = Duration::from_secs(0);
|
||||
if *timeout == ZERO {
|
||||
return_errno!(ETIMEDOUT, "should return immediately");
|
||||
}
|
||||
|
||||
let mut remain_c = timespec_t::from(*timeout);
|
||||
let ret = ocall_eventfd_poll(self.host_fd, &mut remain_c);
|
||||
|
||||
@ -56,6 +61,12 @@ impl HostEventFd {
|
||||
assert!(remain <= *timeout + TIMERSLACK.to_duration());
|
||||
*timeout = remain;
|
||||
|
||||
// Poll syscall does not treat timeout as error. So we need
|
||||
// to distinguish the case by ourselves.
|
||||
if *timeout == ZERO {
|
||||
return_errno!(ETIMEDOUT, "time is up");
|
||||
}
|
||||
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,5 @@
|
||||
use std::any::Any;
|
||||
use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Weak;
|
||||
|
||||
@ -13,7 +14,7 @@ pub struct Notifier<E: Event, F: EventFilter<E> = DummyEventFilter<E>> {
|
||||
struct Subscriber<E: Event, F: EventFilter<E>> {
|
||||
observer: Weak<dyn Observer<E>>,
|
||||
filter: Option<F>,
|
||||
metadata: Option<Box<dyn Any + Send + Sync>>,
|
||||
metadata: Option<Weak<dyn Any + Send + Sync>>,
|
||||
}
|
||||
|
||||
impl<E: Event, F: EventFilter<E>> Notifier<E, F> {
|
||||
@ -28,7 +29,7 @@ impl<E: Event, F: EventFilter<E>> Notifier<E, F> {
|
||||
&self,
|
||||
observer: Weak<dyn Observer<E>>,
|
||||
filter: Option<F>,
|
||||
metadata: Option<Box<dyn Any + Send + Sync>>,
|
||||
metadata: Option<Weak<dyn Any + Send + Sync>>,
|
||||
) {
|
||||
let mut subscribers = self.subscribers.lock().unwrap();
|
||||
subscribers.push_back(Subscriber {
|
||||
@ -64,6 +65,12 @@ impl<E: Event, F: EventFilter<E>> Notifier<E, F> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: Event, F: EventFilter<E>> fmt::Debug for Notifier<E, F> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "Notifier {{ .. }}")
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DummyEventFilter<E> {
|
||||
phantom: PhantomData<E>,
|
||||
}
|
||||
|
@ -1,4 +1,5 @@
|
||||
use std::any::Any;
|
||||
use std::sync::Weak;
|
||||
|
||||
use super::Event;
|
||||
use crate::prelude::*;
|
||||
@ -15,5 +16,5 @@ pub trait Observer<E: Event>: Send + Sync {
|
||||
/// by a specific implementation of `Observer::on_event`. Thus, to avoid
|
||||
/// the odds of deadlocks, the `on_event` method should be written short
|
||||
/// and sweet.
|
||||
fn on_event(&self, event: &E, metadata: &Option<Box<dyn Any + Send + Sync>>) -> ();
|
||||
fn on_event(&self, event: &E, metadata: &Option<Weak<dyn Any + Send + Sync>>) -> ();
|
||||
}
|
||||
|
@ -70,6 +70,13 @@ impl Waiter {
|
||||
inner: Arc::downgrade(&self.inner),
|
||||
}
|
||||
}
|
||||
|
||||
/// Expose the internal host eventfd.
|
||||
///
|
||||
/// This host eventfd should be used by an external user carefully.
|
||||
pub fn host_eventfd(&self) -> &HostEventFd {
|
||||
self.inner.host_eventfd()
|
||||
}
|
||||
}
|
||||
|
||||
impl !Send for Waiter {}
|
||||
@ -169,4 +176,8 @@ impl Inner {
|
||||
HostEventFd::write_u64_raw_and_batch(&host_eventfds, 1);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn host_eventfd(&self) -> &HostEventFd {
|
||||
&self.host_eventfd
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
use std::any::Any;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Weak;
|
||||
|
||||
use super::{Event, Observer, WaiterQueue};
|
||||
use crate::prelude::*;
|
||||
@ -29,7 +30,7 @@ impl<E: Event> WaiterQueueObserver<E> {
|
||||
}
|
||||
|
||||
impl<E: Event> Observer<E> for WaiterQueueObserver<E> {
|
||||
fn on_event(&self, event: &E, _metadata: &Option<Box<dyn Any + Send + Sync>>) {
|
||||
fn on_event(&self, event: &E, _metadata: &Option<Weak<dyn Any + Send + Sync>>) {
|
||||
self.waiter_queue.dequeue_and_wake_all();
|
||||
}
|
||||
}
|
||||
|
@ -1,25 +1,36 @@
|
||||
use super::*;
|
||||
|
||||
use atomic::{Atomic, Ordering};
|
||||
|
||||
/// Native Linux eventfd
|
||||
// TODO: move the implementaion of eventfd into libos to defend against Iago attacks from OCalls
|
||||
#[derive(Debug)]
|
||||
pub struct EventFile {
|
||||
host_fd: c_int,
|
||||
host_fd: HostFd,
|
||||
host_events: Atomic<IoEvents>,
|
||||
notifier: IoNotifier,
|
||||
}
|
||||
|
||||
impl EventFile {
|
||||
pub fn new(init_val: u32, flags: EventCreationFlags) -> Result<Self> {
|
||||
let host_fd = try_libc!({
|
||||
let raw_host_fd = try_libc!({
|
||||
let mut ret: i32 = 0;
|
||||
let status = occlum_ocall_eventfd(&mut ret, init_val, flags.bits());
|
||||
assert!(status == sgx_status_t::SGX_SUCCESS);
|
||||
ret
|
||||
});
|
||||
Ok(Self { host_fd })
|
||||
}) as FileDesc;
|
||||
let host_fd = HostFd::new(raw_host_fd);
|
||||
let host_events = Atomic::new(IoEvents::empty());
|
||||
let notifier = IoNotifier::new();
|
||||
Ok(Self {
|
||||
host_fd,
|
||||
host_events,
|
||||
notifier,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_host_fd(&self) -> c_int {
|
||||
self.host_fd
|
||||
self.host_fd.to_raw() as c_int
|
||||
}
|
||||
}
|
||||
|
||||
@ -40,7 +51,7 @@ extern "C" {
|
||||
|
||||
impl Drop for EventFile {
|
||||
fn drop(&mut self) {
|
||||
let ret = unsafe { libc::ocall::close(self.host_fd) };
|
||||
let ret = unsafe { libc::ocall::close(self.host_fd.to_raw() as i32) };
|
||||
assert!(ret == 0);
|
||||
}
|
||||
}
|
||||
@ -50,7 +61,7 @@ impl File for EventFile {
|
||||
let (buf_ptr, buf_len) = buf.as_mut().as_mut_ptr_and_len();
|
||||
|
||||
let ret = try_libc!(libc::ocall::read(
|
||||
self.host_fd,
|
||||
self.host_fd.to_raw() as i32,
|
||||
buf_ptr as *mut c_void,
|
||||
buf_len
|
||||
)) as usize;
|
||||
@ -61,7 +72,7 @@ impl File for EventFile {
|
||||
fn write(&self, buf: &[u8]) -> Result<usize> {
|
||||
let (buf_ptr, buf_len) = buf.as_ptr_and_len();
|
||||
let ret = try_libc!(libc::ocall::write(
|
||||
self.host_fd,
|
||||
self.host_fd.to_raw() as i32,
|
||||
buf_ptr as *const c_void,
|
||||
buf_len
|
||||
)) as usize;
|
||||
@ -93,6 +104,26 @@ impl File for EventFile {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_new(&self) -> IoEvents {
|
||||
self.host_events.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
fn notifier(&self) -> Option<&IoNotifier> {
|
||||
Some(&self.notifier)
|
||||
}
|
||||
|
||||
fn host_fd(&self) -> Option<&HostFd> {
|
||||
Some(&self.host_fd)
|
||||
}
|
||||
|
||||
fn recv_host_events(&self, events: &IoEvents, trigger_notifier: bool) {
|
||||
self.host_events.store(*events, Ordering::Release);
|
||||
|
||||
if trigger_notifier {
|
||||
self.notifier.broadcast(events);
|
||||
}
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
@ -1,8 +1,8 @@
|
||||
use crate::events::{Event, EventFilter, Notifier};
|
||||
use crate::events::{Event, EventFilter, Notifier, Observer};
|
||||
use crate::prelude::*;
|
||||
|
||||
bitflags! {
|
||||
pub struct IoEvents: u16 {
|
||||
pub struct IoEvents: u32 {
|
||||
const IN = 0x001; // = POLLIN
|
||||
const OUT = 0x004; // = POLLOUT
|
||||
const PRI = 0x002; // = POLLPRI
|
||||
|
@ -91,18 +91,46 @@ pub trait File: Debug + Sync + Send + Any {
|
||||
return_op_unsupported_error!("set_advisory_lock")
|
||||
}
|
||||
|
||||
// TODO: remove this function after all users of this code are removed
|
||||
fn poll(&self) -> Result<(crate::net::PollEventFlags)> {
|
||||
return_op_unsupported_error!("poll")
|
||||
}
|
||||
|
||||
// TODO: remove this function after all users of this code are removed
|
||||
fn enqueue_event(&self, _: crate::net::IoEvent) -> Result<()> {
|
||||
return_op_unsupported_error!("enqueue_event");
|
||||
}
|
||||
|
||||
// TODO: remove this function after all users of this code are removed
|
||||
fn dequeue_event(&self) -> Result<()> {
|
||||
return_op_unsupported_error!("dequeue_event");
|
||||
}
|
||||
|
||||
// TODO: rename poll_new to poll
|
||||
fn poll_new(&self) -> IoEvents {
|
||||
IoEvents::empty()
|
||||
}
|
||||
|
||||
/// Returns a notifier that broadcast events on this file.
|
||||
///
|
||||
/// Not every file has an associated event notifier.
|
||||
fn notifier(&self) -> Option<&IoNotifier> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Return the host fd, if the file is backed by an underlying host file.
|
||||
fn host_fd(&self) -> Option<&HostFd> {
|
||||
return None;
|
||||
}
|
||||
|
||||
/// Receive events from the host.
|
||||
///
|
||||
/// After calling this method, the `poll` method of the `File` trait will
|
||||
/// return the latest events on the `HostFile`.
|
||||
///
|
||||
/// This method has no effect if the `host_fd` method returns `None`.
|
||||
fn recv_host_events(&self, events: &IoEvents, trigger_notifier: bool) {}
|
||||
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
}
|
||||
|
||||
|
65
src/libos/src/fs/host_fd.rs
Normal file
65
src/libos/src/fs/host_fd.rs
Normal file
@ -0,0 +1,65 @@
|
||||
use std::collections::HashSet;
|
||||
use std::hash::{Hash, Hasher};
|
||||
|
||||
use super::*;
|
||||
|
||||
/// A unique fd from the host OS.
|
||||
///
|
||||
/// The uniqueness property is important both
|
||||
#[derive(Debug, PartialEq, Eq, Hash)]
|
||||
pub struct HostFd(FileDesc);
|
||||
|
||||
impl HostFd {
|
||||
pub fn new(host_fd: FileDesc) -> Self {
|
||||
HOST_FD_REGISTRY.lock().unwrap().register(host_fd).unwrap();
|
||||
Self(host_fd)
|
||||
}
|
||||
|
||||
pub fn to_raw(&self) -> FileDesc {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for HostFd {
|
||||
fn drop(&mut self) {
|
||||
HOST_FD_REGISTRY
|
||||
.lock()
|
||||
.unwrap()
|
||||
.unregister(self.to_raw())
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref HOST_FD_REGISTRY: SgxMutex<HostFdRegistry> =
|
||||
{ SgxMutex::new(HostFdRegistry::new()) };
|
||||
}
|
||||
|
||||
/// A registery for host fds to ensure that they are unique.
|
||||
struct HostFdRegistry {
|
||||
set: HashSet<FileDesc>,
|
||||
}
|
||||
|
||||
impl HostFdRegistry {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
set: HashSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register(&mut self, host_fd: FileDesc) -> Result<()> {
|
||||
let new_val = self.set.insert(host_fd);
|
||||
if !new_val {
|
||||
return_errno!(EEXIST, "host fd has been registered");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn unregister(&mut self, host_fd: FileDesc) -> Result<()> {
|
||||
let existing = self.set.remove(&host_fd);
|
||||
if !existing {
|
||||
return_errno!(ENOENT, "host fd has NOT been registered");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -20,6 +20,7 @@ pub use self::file_ops::{
|
||||
};
|
||||
pub use self::file_table::{FileDesc, FileTable};
|
||||
pub use self::fs_view::FsView;
|
||||
pub use self::host_fd::HostFd;
|
||||
pub use self::inode_file::{AsINodeFile, INodeExt, INodeFile};
|
||||
pub use self::pipe::PipeType;
|
||||
pub use self::rootfs::ROOT_INODE;
|
||||
@ -35,6 +36,7 @@ mod file_ops;
|
||||
mod file_table;
|
||||
mod fs_ops;
|
||||
mod fs_view;
|
||||
mod host_fd;
|
||||
mod hostfs;
|
||||
mod inode_file;
|
||||
mod pipe;
|
||||
|
@ -96,6 +96,14 @@ impl File for PipeReader {
|
||||
Ok(events)
|
||||
}
|
||||
|
||||
fn poll_new(&self) -> IoEvents {
|
||||
self.consumer.poll()
|
||||
}
|
||||
|
||||
fn notifier(&self) -> Option<&IoNotifier> {
|
||||
Some(self.consumer.notifier())
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
@ -171,6 +179,14 @@ impl File for PipeWriter {
|
||||
Ok(events)
|
||||
}
|
||||
|
||||
fn poll_new(&self) -> IoEvents {
|
||||
self.producer.poll()
|
||||
}
|
||||
|
||||
fn notifier(&self) -> Option<&IoNotifier> {
|
||||
Some(self.producer.notifier())
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
#![feature(option_expect_none)]
|
||||
// for UntrustedSliceAlloc in slice_alloc
|
||||
#![feature(slice_ptr_get)]
|
||||
#![feature(maybe_uninit_extra)]
|
||||
|
||||
#[macro_use]
|
||||
extern crate alloc;
|
||||
|
@ -1,184 +0,0 @@
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub enum EpollCtlCmd {
|
||||
/// Add a file decriptor to the interface
|
||||
Add = 1,
|
||||
/// Remove a file decriptor from the interface
|
||||
Del = 2,
|
||||
/// Change file decriptor epoll_event structre
|
||||
Mod = 3,
|
||||
}
|
||||
|
||||
impl TryFrom<i32> for EpollCtlCmd {
|
||||
type Error = error::Error;
|
||||
|
||||
fn try_from(op_num: i32) -> Result<Self> {
|
||||
match op_num {
|
||||
1 => Ok(EpollCtlCmd::Add),
|
||||
2 => Ok(EpollCtlCmd::Del),
|
||||
3 => Ok(EpollCtlCmd::Mod),
|
||||
_ => return_errno!(EINVAL, "invalid operation number"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bitflags! {
|
||||
#[derive(Default)]
|
||||
pub struct EpollEventFlags: u32 {
|
||||
// The available events are got from linux source.
|
||||
// This struct contains more flags than linux man page described.
|
||||
const EPOLLIN = 0x0001;
|
||||
const EPOLLPRI = 0x0002;
|
||||
const EPOLLOUT = 0x0004;
|
||||
const EPOLLERR = 0x0008;
|
||||
const EPOLLHUP = 0x0010;
|
||||
const EPOLLNVAL = 0x0020;
|
||||
const EPOLLRDNORM = 0x0040;
|
||||
const EPOLLRDBAND = 0x0080;
|
||||
const EPOLLWRNORM = 0x0100;
|
||||
const EPOLLWRBAND = 0x0200;
|
||||
const EPOLLMSG = 0x0400;
|
||||
const EPOLLRDHUP = 0x2000;
|
||||
const EPOLLEXCLUSIVE = (1 << 28);
|
||||
const EPOLLWAKEUP = (1 << 29);
|
||||
const EPOLLONESHOT = (1 << 30);
|
||||
const EPOLLET = (1 << 31);
|
||||
}
|
||||
}
|
||||
|
||||
//TODO: Add more mitigations to protect from iago attacks
|
||||
#[derive(Copy, Clone, Debug, Default)]
|
||||
pub struct EpollEvent {
|
||||
/// Epoll Events
|
||||
events: EpollEventFlags,
|
||||
/// Libos-agnostic user data variable
|
||||
data: uint64_t,
|
||||
}
|
||||
|
||||
impl EpollEvent {
|
||||
pub fn new(events: EpollEventFlags, data: uint64_t) -> Self {
|
||||
Self { events, data }
|
||||
}
|
||||
|
||||
pub fn from_raw(epoll_event: &libc::epoll_event) -> Result<Self> {
|
||||
Ok(Self::new(
|
||||
EpollEventFlags::from_bits(epoll_event.events)
|
||||
.ok_or_else(|| errno!(EINVAL, "invalid flags"))?,
|
||||
epoll_event.u64,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn to_raw(&self) -> libc::epoll_event {
|
||||
libc::epoll_event {
|
||||
events: self.events.bits(),
|
||||
u64: self.data,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct EpollFile {
|
||||
host_fd: c_int,
|
||||
}
|
||||
|
||||
impl EpollFile {
|
||||
/// Creates a new Linux epoll file descriptor
|
||||
pub fn new(flags: CreationFlags) -> Result<Self> {
|
||||
debug!("create epollfile: flags: {:?}", flags);
|
||||
let host_fd = try_libc!(libc::ocall::epoll_create1(flags.bits() as i32));
|
||||
Ok(Self { host_fd })
|
||||
}
|
||||
|
||||
pub fn control(&self, op: EpollCtlCmd, fd: FileDesc, event: Option<&EpollEvent>) -> Result<()> {
|
||||
let host_fd = {
|
||||
let fd_ref = current!().file(fd)?;
|
||||
if let Ok(socket) = fd_ref.as_host_socket() {
|
||||
socket.host_fd()
|
||||
} else if let Ok(eventfd) = fd_ref.as_event() {
|
||||
eventfd.get_host_fd()
|
||||
} else if let Ok(epoll_file) = fd_ref.as_epfile() {
|
||||
let target_host_fd = epoll_file.get_host_fd();
|
||||
if self.host_fd == target_host_fd {
|
||||
return_errno!(EINVAL, "epfd should not be same as the target fd");
|
||||
}
|
||||
target_host_fd
|
||||
} else {
|
||||
return_errno!(EPERM, "unsupported file type");
|
||||
}
|
||||
};
|
||||
|
||||
// Notes on deadlock.
|
||||
//
|
||||
// All locks on fd (if any) will be released at this point. This means
|
||||
// we don't have to worry about the potential deadlock caused by
|
||||
// locking two files (say, fd and epfd) in an inconsistent order.
|
||||
|
||||
//TODO: Shoud be const.
|
||||
// Cast const to mut to be compatiable with the ocall from rust sdk.
|
||||
let mut epevent = event.map(|e| e.to_raw());
|
||||
let raw_epevent_ptr: *mut libc::epoll_event = match epevent {
|
||||
Some(ref mut e) => e as *mut _,
|
||||
_ => std::ptr::null_mut(),
|
||||
};
|
||||
|
||||
try_libc!(libc::ocall::epoll_ctl(
|
||||
self.host_fd,
|
||||
op as i32,
|
||||
host_fd,
|
||||
raw_epevent_ptr,
|
||||
));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Waits for an I/O event on the epoll file.
|
||||
///
|
||||
/// Returns the number of file descriptors ready for the requested I/O.
|
||||
pub fn wait(&self, events: &mut [EpollEvent], timeout: c_int) -> Result<usize> {
|
||||
let mut raw_events: Vec<libc::epoll_event> =
|
||||
vec![libc::epoll_event { events: 0, u64: 0 }; events.len()];
|
||||
let ret = try_libc!(libc::ocall::epoll_wait(
|
||||
self.host_fd,
|
||||
raw_events.as_mut_ptr(),
|
||||
raw_events.len() as c_int,
|
||||
timeout,
|
||||
)) as usize;
|
||||
|
||||
assert!(ret <= events.len());
|
||||
for i in 0..ret {
|
||||
events[i] = EpollEvent::from_raw(&raw_events[i])?;
|
||||
}
|
||||
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
fn get_host_fd(&self) -> c_int {
|
||||
self.host_fd
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for EpollFile {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
libc::ocall::close(self.host_fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl File for EpollFile {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
pub trait AsEpollFile {
|
||||
fn as_epfile(&self) -> Result<&EpollFile>;
|
||||
}
|
||||
|
||||
impl AsEpollFile for FileRef {
|
||||
fn as_epfile(&self) -> Result<&EpollFile> {
|
||||
self.as_any()
|
||||
.downcast_ref::<EpollFile>()
|
||||
.ok_or_else(|| errno!(EBADF, "not an epoll file"))
|
||||
}
|
||||
}
|
483
src/libos/src/net/io_multiplexing/epoll/epoll_file.rs
Normal file
483
src/libos/src/net/io_multiplexing/epoll/epoll_file.rs
Normal file
@ -0,0 +1,483 @@
|
||||
use std::any::Any;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fmt;
|
||||
use std::mem::{self, MaybeUninit};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Weak;
|
||||
use std::time::Duration;
|
||||
|
||||
use super::epoll_waiter::EpollWaiter;
|
||||
use super::host_file_epoller::HostFileEpoller;
|
||||
use super::{EpollCtl, EpollEvent, EpollFlags};
|
||||
use crate::events::{Observer, Waiter, WaiterQueue};
|
||||
use crate::fs::{File, HostFd, IoEvents, IoNotifier};
|
||||
use crate::prelude::*;
|
||||
|
||||
// TODO: Prevent two epoll files from monitoring each other, which may cause
|
||||
// deadlock in the current implementation.
|
||||
|
||||
/// A file that provides epoll API.
|
||||
///
|
||||
/// Conceptually, we maintain two lists: one consists of all interesting files,
|
||||
/// which can be managed by the epoll ctl commands; the other are for ready files,
|
||||
/// which are files that have some events. A epoll wait only needs to iterate the
|
||||
/// ready list and poll each file to see if the file is ready for the interesting
|
||||
/// I/O.
|
||||
///
|
||||
/// To maintain the ready list, we need to monitor interesting events that happen
|
||||
/// on the files. To do so, the `EpollFile` registers itself as an `Observer` to
|
||||
/// the `IoNotifier`s of the monotored files. Thus, we can add a file to the ready
|
||||
/// list when an event happens on the file.
|
||||
///
|
||||
/// LibOS files are easy to monitor. LibOS files are implemented by us. We know
|
||||
/// exactly when an event happens and thus can broadcast it using `IoNotifier`.
|
||||
///
|
||||
/// Unlike LibOS files, host files are implemented by the host OS. We have no way
|
||||
/// to let the host OS _push_ events to us. Luckily, we can do the reverse: _poll_
|
||||
/// host files to check events. And there is a good timing for it; that is, at
|
||||
/// every epoll wait call. We have made a helper called `HostFileEpoller`, which can
|
||||
/// poll events on a set of host files and trigger their associated `Notifier`s to
|
||||
/// broadcast their events, e.g., to `EpollFile`.
|
||||
///
|
||||
/// This way, both LibOS files and host files can notify the `EpollFile` about
|
||||
/// their events.
|
||||
pub struct EpollFile {
|
||||
// All interesting entries.
|
||||
interest: SgxMutex<HashMap<FileDesc, Arc<EpollEntry>>>,
|
||||
// Entries that are probably ready (having events happened).
|
||||
ready: SgxMutex<VecDeque<Arc<EpollEntry>>>,
|
||||
// All threads that are waiting on this epoll file.
|
||||
waiters: WaiterQueue,
|
||||
// A notifier to broadcast events on this epoll file.
|
||||
notifier: IoNotifier,
|
||||
// A helper to poll the events on the interesting host files.
|
||||
host_file_epoller: HostFileEpoller,
|
||||
// Any EpollFile is wrapped with Arc when created.
|
||||
weak_self: Weak<Self>,
|
||||
}
|
||||
|
||||
impl EpollFile {
|
||||
pub fn new() -> Arc<Self> {
|
||||
let interest = Default::default();
|
||||
let ready = Default::default();
|
||||
let waiters = WaiterQueue::new();
|
||||
let notifier = IoNotifier::new();
|
||||
let host_file_epoller = HostFileEpoller::new();
|
||||
let weak_self = Default::default();
|
||||
|
||||
Self {
|
||||
interest,
|
||||
ready,
|
||||
waiters,
|
||||
notifier,
|
||||
host_file_epoller,
|
||||
weak_self,
|
||||
}
|
||||
.wrap_self()
|
||||
}
|
||||
|
||||
fn wrap_self(self) -> Arc<Self> {
|
||||
let mut strong_self = Arc::new(self);
|
||||
let weak_self = Arc::downgrade(&strong_self);
|
||||
|
||||
unsafe {
|
||||
let ptr_self = Arc::into_raw(strong_self) as *mut Self;
|
||||
(*ptr_self).weak_self = weak_self;
|
||||
strong_self = Arc::from_raw(ptr_self);
|
||||
}
|
||||
|
||||
strong_self
|
||||
}
|
||||
|
||||
pub fn control(&self, cmd: &EpollCtl) -> Result<()> {
|
||||
debug!("epoll control: cmd = {:?}", cmd);
|
||||
|
||||
match cmd {
|
||||
EpollCtl::Add(fd, event, flags) => {
|
||||
self.add_interest(*fd, *event, *flags)?;
|
||||
}
|
||||
EpollCtl::Del(fd) => {
|
||||
self.del_interest(*fd)?;
|
||||
}
|
||||
EpollCtl::Mod(fd, event, flags) => {
|
||||
self.mod_interest(*fd, *event, *flags)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn wait(
|
||||
&self,
|
||||
revents: &mut [MaybeUninit<EpollEvent>],
|
||||
timeout: Option<&Duration>,
|
||||
) -> Result<usize> {
|
||||
debug!("epoll wait: timeout = {:?}", timeout);
|
||||
|
||||
let mut timeout = timeout.cloned();
|
||||
let max_count = revents.len();
|
||||
let mut reinsert = VecDeque::with_capacity(max_count);
|
||||
let waiter = EpollWaiter::new(&self.host_file_epoller);
|
||||
|
||||
loop {
|
||||
// Poll the latest states of the interested host files
|
||||
self.host_file_epoller.poll_events(max_count);
|
||||
|
||||
// Prepare for the waiter.wait_mut() at the end of the loop
|
||||
self.waiters.reset_and_enqueue(waiter.as_ref());
|
||||
|
||||
// Pop from the ready list to find as many results as possible
|
||||
let mut count = 0;
|
||||
while count < max_count {
|
||||
// Pop some entries from the ready list
|
||||
let mut ready_entries = self.pop_ready(max_count - count);
|
||||
if ready_entries.len() == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
// Note that while iterating the ready entries, we do not hold the lock
|
||||
// of the ready list. This reduces the chances of lock contention.
|
||||
for ep_entry in ready_entries.into_iter() {
|
||||
if ep_entry.is_deleted.load(Ordering::Acquire) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Poll the file that corresponds to the entry
|
||||
let mut inner = ep_entry.inner.lock().unwrap();
|
||||
let mask = inner.event.mask();
|
||||
let file = &ep_entry.file;
|
||||
let events = file.poll_new() & mask;
|
||||
if events.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// We find a ready file!
|
||||
let mut revent = inner.event;
|
||||
revent.mask = events;
|
||||
revents[count].write(revent);
|
||||
count += 1;
|
||||
|
||||
// Behave differently according the epoll flags
|
||||
|
||||
if inner.flags.contains(EpollFlags::ONE_SHOT) {
|
||||
inner.event.mask = IoEvents::empty();
|
||||
}
|
||||
|
||||
if !inner
|
||||
.flags
|
||||
.intersects(EpollFlags::EDGE_TRIGGER | EpollFlags::ONE_SHOT)
|
||||
{
|
||||
drop(inner);
|
||||
reinsert.push_back(ep_entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If any results, we can return
|
||||
if count > 0 {
|
||||
// Push the entries that are still ready after polling back to the ready list
|
||||
if reinsert.len() > 0 {
|
||||
self.push_ready_iter(reinsert.into_iter());
|
||||
}
|
||||
|
||||
return Ok(count);
|
||||
}
|
||||
|
||||
// Wait for a while to try again later.
|
||||
let ret = waiter.wait_mut(timeout.as_mut());
|
||||
if let Err(e) = ret {
|
||||
if e.errno() == ETIMEDOUT {
|
||||
return Ok(0);
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
// This means we have been waken up successfully. Let's try again.
|
||||
}
|
||||
}
|
||||
|
||||
fn add_interest(&self, fd: FileDesc, mut event: EpollEvent, flags: EpollFlags) -> Result<()> {
|
||||
let file = current!().file(fd)?;
|
||||
|
||||
let arc_self = self.weak_self.upgrade().unwrap();
|
||||
if Arc::ptr_eq(&(arc_self as Arc<dyn File>), &file) {
|
||||
return_errno!(EINVAL, "a epoll file cannot epoll itself");
|
||||
}
|
||||
|
||||
self.check_flags(&flags);
|
||||
self.prepare_event(&mut event);
|
||||
|
||||
let ep_entry = Arc::new(EpollEntry::new(fd, file, event, flags));
|
||||
|
||||
// A critical section protected by the lock of self.interest
|
||||
{
|
||||
let notifier = ep_entry
|
||||
.file
|
||||
.notifier()
|
||||
.ok_or_else(|| errno!(EINVAL, "a file must has an associated notifier"))?;
|
||||
|
||||
let mut interest_entries = self.interest.lock().unwrap();
|
||||
if interest_entries.get(&fd).is_some() {
|
||||
return_errno!(EEXIST, "fd is already registered");
|
||||
}
|
||||
interest_entries.insert(fd, ep_entry.clone());
|
||||
|
||||
// Start observing events on the target file.
|
||||
let weak_observer = self.weak_self.clone() as Weak<dyn Observer<_>>;
|
||||
let weak_ep_entry = Arc::downgrade(&ep_entry);
|
||||
notifier.register(weak_observer, Some(IoEvents::all()), Some(weak_ep_entry));
|
||||
|
||||
// Handle host file
|
||||
if ep_entry.file.host_fd().is_some() {
|
||||
self.host_file_epoller
|
||||
.add_file(ep_entry.file.clone(), event, flags);
|
||||
}
|
||||
}
|
||||
|
||||
self.push_ready(ep_entry);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn del_interest(&self, fd: FileDesc) -> Result<()> {
|
||||
// A critical section protected by the lock of self.interest
|
||||
{
|
||||
let mut interest_entries = self.interest.lock().unwrap();
|
||||
let ep_entry = interest_entries
|
||||
.remove(&fd)
|
||||
.ok_or_else(|| errno!(ENOENT, "fd is not added"))?;
|
||||
ep_entry.is_deleted.store(true, Ordering::Release);
|
||||
|
||||
let notifier = ep_entry.file.notifier().unwrap();
|
||||
let weak_observer = self.weak_self.clone() as Weak<dyn Observer<_>>;
|
||||
notifier.unregister(&weak_observer);
|
||||
|
||||
if ep_entry.file.host_fd().is_some() {
|
||||
self.host_file_epoller.del_file(&ep_entry.file);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn mod_interest(&self, fd: FileDesc, mut event: EpollEvent, flags: EpollFlags) -> Result<()> {
|
||||
self.check_flags(&flags);
|
||||
self.prepare_event(&mut event);
|
||||
|
||||
// A critical section protected by the lock of self.interest
|
||||
let ep_entry = {
|
||||
let mut interest_entries = self.interest.lock().unwrap();
|
||||
let ep_entry = interest_entries
|
||||
.get(&fd)
|
||||
.ok_or_else(|| errno!(ENOENT, "fd is not added"))?
|
||||
.clone();
|
||||
|
||||
let new_ep_inner = EpollEntryInner { event, flags };
|
||||
let mut old_ep_inner = ep_entry.inner.lock().unwrap();
|
||||
if *old_ep_inner == new_ep_inner {
|
||||
return Ok(());
|
||||
}
|
||||
*old_ep_inner = new_ep_inner;
|
||||
drop(old_ep_inner);
|
||||
|
||||
if ep_entry.file.host_fd().is_some() {
|
||||
self.host_file_epoller
|
||||
.mod_file(&ep_entry.file, event, flags);
|
||||
}
|
||||
|
||||
ep_entry
|
||||
};
|
||||
|
||||
self.push_ready(ep_entry);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn push_ready(&self, ep_entry: Arc<EpollEntry>) {
|
||||
// Fast path to avoid locking
|
||||
if ep_entry.is_ready.load(Ordering::Relaxed) {
|
||||
// Concurrency note:
|
||||
// What if right after returning a true value of `is_ready`, then the `EpollEntry` is
|
||||
// popped from the ready list? Does it mean than we miss an interesting event?
|
||||
//
|
||||
// The answer is NO. If the `is_ready` field of an `EpollEntry` turns from `true` to
|
||||
// `false`, then the `EpollEntry` must be popped out of the ready list and its
|
||||
// corresponding file must be polled in the `wait` method. This means that we have
|
||||
// taken into account any interesting events happened on the file so far.
|
||||
return;
|
||||
}
|
||||
|
||||
self.push_ready_iter(std::iter::once(ep_entry));
|
||||
}
|
||||
|
||||
fn push_ready_iter<I: Iterator<Item = Arc<EpollEntry>>>(&self, ep_entries: I) {
|
||||
let mut has_pushed_any = false;
|
||||
|
||||
// A critical section protected by self.ready.lock()
|
||||
{
|
||||
let mut ready_entries = self.ready.lock().unwrap();
|
||||
for ep_entry in ep_entries {
|
||||
if ep_entry.is_ready.load(Ordering::Relaxed) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ep_entry.is_ready.store(true, Ordering::Relaxed);
|
||||
ready_entries.push_back(ep_entry);
|
||||
|
||||
has_pushed_any = true;
|
||||
}
|
||||
}
|
||||
|
||||
if has_pushed_any {
|
||||
self.mark_ready();
|
||||
}
|
||||
}
|
||||
|
||||
fn pop_ready(&self, max_count: usize) -> VecDeque<Arc<EpollEntry>> {
|
||||
// A critical section protected by self.ready.lock()
|
||||
{
|
||||
let mut ready_entries = self.ready.lock().unwrap();
|
||||
let max_count = max_count.min(ready_entries.len());
|
||||
ready_entries
|
||||
.drain(..max_count)
|
||||
.map(|ep_entry| {
|
||||
ep_entry.is_ready.store(false, Ordering::Relaxed);
|
||||
ep_entry
|
||||
})
|
||||
.collect::<VecDeque<Arc<EpollEntry>>>()
|
||||
}
|
||||
}
|
||||
|
||||
fn mark_ready(&self) {
|
||||
self.notifier.broadcast(&IoEvents::IN);
|
||||
self.waiters.dequeue_and_wake_all();
|
||||
}
|
||||
|
||||
fn check_flags(&self, flags: &EpollFlags) {
|
||||
if flags.intersects(EpollFlags::EXCLUSIVE | EpollFlags::WAKE_UP) {
|
||||
warn!("{:?} contains unsupported flags", flags);
|
||||
}
|
||||
}
|
||||
|
||||
fn prepare_event(&self, event: &mut EpollEvent) {
|
||||
// Add two events that are reported by default
|
||||
event.mask |= (IoEvents::ERR | IoEvents::HUP);
|
||||
}
|
||||
}
|
||||
|
||||
impl File for EpollFile {
|
||||
fn poll_new(&self) -> IoEvents {
|
||||
if !self.host_file_epoller.poll().is_empty() {
|
||||
return IoEvents::IN;
|
||||
}
|
||||
|
||||
let ready_entries = self.ready.lock().unwrap();
|
||||
if !ready_entries.is_empty() {
|
||||
return IoEvents::IN;
|
||||
}
|
||||
|
||||
IoEvents::empty()
|
||||
}
|
||||
|
||||
fn notifier(&self) -> Option<&IoNotifier> {
|
||||
Some(&self.notifier)
|
||||
}
|
||||
|
||||
fn host_fd(&self) -> Option<&HostFd> {
|
||||
Some(self.host_file_epoller.host_fd())
|
||||
}
|
||||
|
||||
fn recv_host_events(&self, events: &IoEvents, trigger_notifier: bool) {
|
||||
self.host_file_epoller.recv_host_events(events);
|
||||
|
||||
if trigger_notifier {
|
||||
self.notifier.broadcast(events);
|
||||
}
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for EpollFile {
|
||||
fn drop(&mut self) {
|
||||
// Do not try to `self.weak_self.upgrade()`! The Arc object must have been
|
||||
// dropped at this point.
|
||||
let self_observer = self.weak_self.clone() as Weak<dyn Observer<IoEvents>>;
|
||||
|
||||
// Unregister ourself from all interesting files' notifiers
|
||||
let mut interest_entries = self.interest.lock().unwrap();
|
||||
interest_entries.drain().for_each(|(_, ep_entry)| {
|
||||
if let Some(notifier) = ep_entry.file.notifier() {
|
||||
notifier.unregister(&self_observer);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl Observer<IoEvents> for EpollFile {
|
||||
fn on_event(&self, _events: &IoEvents, metadata: &Option<Weak<dyn Any + Send + Sync>>) {
|
||||
let ep_entry_opt = metadata
|
||||
.as_ref()
|
||||
.and_then(|weak_any| weak_any.upgrade())
|
||||
.and_then(|strong_any| strong_any.downcast().ok());
|
||||
let ep_entry: Arc<EpollEntry> = match ep_entry_opt {
|
||||
None => return,
|
||||
Some(ep_entry) => ep_entry,
|
||||
};
|
||||
|
||||
self.push_ready(ep_entry);
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for EpollFile {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("EpollFile")
|
||||
.field("interest", &self.interest.lock().unwrap())
|
||||
.field("ready", &self.ready.lock().unwrap())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
pub trait AsEpollFile {
|
||||
fn as_epoll_file(&self) -> Result<&EpollFile>;
|
||||
}
|
||||
|
||||
impl AsEpollFile for FileRef {
|
||||
fn as_epoll_file(&self) -> Result<&EpollFile> {
|
||||
self.as_any()
|
||||
.downcast_ref::<EpollFile>()
|
||||
.ok_or_else(|| errno!(EBADF, "not an epoll file"))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct EpollEntry {
|
||||
fd: FileDesc,
|
||||
file: FileRef,
|
||||
inner: SgxMutex<EpollEntryInner>,
|
||||
// Whether the entry is in the ready list
|
||||
is_ready: AtomicBool,
|
||||
// Whether the entry has been deleted from the interest list
|
||||
is_deleted: AtomicBool,
|
||||
}
|
||||
|
||||
impl EpollEntry {
|
||||
pub fn new(fd: FileDesc, file: FileRef, event: EpollEvent, flags: EpollFlags) -> Self {
|
||||
let is_ready = Default::default();
|
||||
let is_deleted = Default::default();
|
||||
let inner = SgxMutex::new(EpollEntryInner { event, flags });
|
||||
Self {
|
||||
fd,
|
||||
file,
|
||||
inner,
|
||||
is_ready,
|
||||
is_deleted,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
struct EpollEntryInner {
|
||||
event: EpollEvent,
|
||||
flags: EpollFlags,
|
||||
}
|
98
src/libos/src/net/io_multiplexing/epoll/epoll_waiter.rs
Normal file
98
src/libos/src/net/io_multiplexing/epoll/epoll_waiter.rs
Normal file
@ -0,0 +1,98 @@
|
||||
use std::ptr;
|
||||
use std::time::Duration;
|
||||
|
||||
use super::host_file_epoller::HostFileEpoller;
|
||||
use crate::events::Waiter;
|
||||
use crate::prelude::*;
|
||||
use crate::time::{timespec_t, TIMERSLACK};
|
||||
|
||||
/// A waiter that is suitable for epoll.
|
||||
pub struct EpollWaiter {
|
||||
waiter: Waiter,
|
||||
host_epoll_fd: FileDesc,
|
||||
}
|
||||
|
||||
impl EpollWaiter {
|
||||
pub fn new(host_file_epoller: &HostFileEpoller) -> Self {
|
||||
Self {
|
||||
waiter: Waiter::new(),
|
||||
host_epoll_fd: host_file_epoller.host_fd().to_raw(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait until the waiter is waken or the host epoll file has any
|
||||
/// events or the method call is timeout or interrupted.
|
||||
pub fn wait_mut(&self, mut timeout: Option<&mut Duration>) -> Result<()> {
|
||||
const ZERO: Duration = Duration::from_secs(0);
|
||||
if let Some(timeout) = timeout.as_ref() {
|
||||
if **timeout == ZERO {
|
||||
return_errno!(ETIMEDOUT, "should return immediately");
|
||||
}
|
||||
}
|
||||
|
||||
let host_eventfd = libc::pollfd {
|
||||
fd: self.waiter.host_eventfd().host_fd() as i32,
|
||||
events: libc::POLLIN,
|
||||
revents: 0,
|
||||
};
|
||||
let host_epf = libc::pollfd {
|
||||
fd: self.host_epoll_fd as i32,
|
||||
events: libc::POLLIN,
|
||||
revents: 0,
|
||||
};
|
||||
let mut pollfds = [host_eventfd, host_epf];
|
||||
let host_eventfd_idx = 0;
|
||||
|
||||
let num_events = try_libc!({
|
||||
let mut remain_c = timeout.as_ref().map(|timeout| timespec_t::from(**timeout));
|
||||
let remain_c_ptr = remain_c.as_mut().map_or(ptr::null_mut(), |mut_ref| mut_ref);
|
||||
|
||||
let mut ret = 0;
|
||||
let status = unsafe {
|
||||
occlum_ocall_poll_with_eventfd(
|
||||
&mut ret,
|
||||
(&mut pollfds[..]).as_mut_ptr(),
|
||||
pollfds.len() as u32,
|
||||
remain_c_ptr,
|
||||
host_eventfd_idx,
|
||||
)
|
||||
};
|
||||
assert!(status == sgx_status_t::SGX_SUCCESS);
|
||||
|
||||
if let Some(timeout) = timeout.as_mut() {
|
||||
let remain = remain_c.unwrap().as_duration();
|
||||
assert!(remain <= **timeout + TIMERSLACK.to_duration());
|
||||
**timeout = remain;
|
||||
}
|
||||
|
||||
ret
|
||||
});
|
||||
|
||||
// Poll syscall does not treat timeout as error. So we need
|
||||
// to distinguish the case by ourselves.
|
||||
if let Some(timeout) = timeout.as_mut() {
|
||||
if num_events == 0 {
|
||||
**timeout = ZERO;
|
||||
return_errno!(ETIMEDOUT, "no results and the time is up");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<Waiter> for EpollWaiter {
|
||||
fn as_ref(&self) -> &Waiter {
|
||||
&self.waiter
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" {
|
||||
fn occlum_ocall_poll_with_eventfd(
|
||||
ret: *mut i32,
|
||||
fds: *mut libc::pollfd,
|
||||
nfds: u32,
|
||||
timeout: *mut timespec_t,
|
||||
eventfd_idx: i32,
|
||||
) -> sgx_status_t;
|
||||
}
|
200
src/libos/src/net/io_multiplexing/epoll/host_file_epoller.rs
Normal file
200
src/libos/src/net/io_multiplexing/epoll/host_file_epoller.rs
Normal file
@ -0,0 +1,200 @@
|
||||
use std::mem::MaybeUninit;
|
||||
use std::ptr;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
|
||||
use super::{EpollCtl, EpollEvent, EpollFlags};
|
||||
use crate::fs::{HostFd, IoEvents};
|
||||
use crate::prelude::*;
|
||||
|
||||
/// An epoll-based helper type to poll the states of a set of host files.
|
||||
#[derive(Debug)]
|
||||
pub struct HostFileEpoller {
|
||||
/// A map from host fd to HostFile, which maintains the set of the interesting
|
||||
/// host files.
|
||||
host_files: SgxMutex<HashMap<FileDesc, FileRef>>,
|
||||
/// The number of the interesting host files.
|
||||
count: AtomicUsize,
|
||||
/// The host fd of the underlying host epoll file.
|
||||
host_epoll_fd: HostFd,
|
||||
/// Whether the host epoll file has any events.
|
||||
is_ready: AtomicBool,
|
||||
}
|
||||
|
||||
// TODO: the `add/mod/del_file` operation can be postponed until a `poll_files` operation,
|
||||
// thus reducing the number of OCalls.
|
||||
|
||||
impl HostFileEpoller {
|
||||
pub fn new() -> Self {
|
||||
let host_files = Default::default();
|
||||
let count = Default::default();
|
||||
let host_epoll_fd = {
|
||||
let raw_host_fd = (|| -> Result<u32> {
|
||||
let raw_host_fd = try_libc!(libc::ocall::epoll_create1(0)) as u32;
|
||||
Ok(raw_host_fd)
|
||||
})()
|
||||
.expect("epoll_create should never fail");
|
||||
|
||||
HostFd::new(raw_host_fd)
|
||||
};
|
||||
let is_ready = Default::default();
|
||||
Self {
|
||||
host_files,
|
||||
count,
|
||||
host_epoll_fd,
|
||||
is_ready,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_file(&self, host_file: FileRef, event: EpollEvent, flags: EpollFlags) -> Result<()> {
|
||||
let mut host_files = self.host_files.lock().unwrap();
|
||||
let host_fd = host_file.host_fd().unwrap().to_raw();
|
||||
let already_added = host_files.insert(host_fd, host_file.clone()).is_some();
|
||||
if already_added {
|
||||
// TODO: handle the case where one host file is somehow to be added more than once.
|
||||
warn!(
|
||||
"Cannot handle the case of adding the same host file twice in a robust way.
|
||||
This can happen if the same `HostFile` is accessible via two different LibOS fds."
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.count.fetch_add(1, Ordering::Relaxed);
|
||||
self.do_epoll_ctl(libc::EPOLL_CTL_ADD, &host_file, Some((event, flags)))
|
||||
|
||||
// Concurrency note:
|
||||
// The lock on self.host_files must be hold while invoking
|
||||
// do_epoll_ctl to prevent race conditions that cause the OCall to fail.
|
||||
// This same argument applies to mod_file and del_file methods.
|
||||
}
|
||||
|
||||
pub fn mod_file(
|
||||
&self,
|
||||
host_file: &FileRef,
|
||||
event: EpollEvent,
|
||||
flags: EpollFlags,
|
||||
) -> Result<()> {
|
||||
let host_files = self.host_files.lock().unwrap();
|
||||
let host_fd = host_file.host_fd().unwrap().to_raw();
|
||||
let not_added = !host_files.contains_key(&host_fd);
|
||||
if not_added {
|
||||
return_errno!(ENOENT, "the host file must be added before modifying");
|
||||
}
|
||||
|
||||
self.do_epoll_ctl(libc::EPOLL_CTL_MOD, &host_file, Some((event, flags)))
|
||||
}
|
||||
|
||||
pub fn del_file(&self, host_file: &FileRef) -> Result<()> {
|
||||
let mut host_files = self.host_files.lock().unwrap();
|
||||
let host_fd = host_file.host_fd().unwrap().to_raw();
|
||||
let not_added = !host_files.remove(&host_fd).is_some();
|
||||
if not_added {
|
||||
return_errno!(ENOENT, "the host file must be added before deleting");
|
||||
}
|
||||
|
||||
self.count.fetch_sub(1, Ordering::Relaxed);
|
||||
self.do_epoll_ctl(libc::EPOLL_CTL_DEL, &host_file, None)
|
||||
}
|
||||
|
||||
fn do_epoll_ctl(
|
||||
&self,
|
||||
raw_cmd: i32,
|
||||
host_file: &FileRef,
|
||||
event_and_flags: Option<(EpollEvent, EpollFlags)>,
|
||||
) -> Result<()> {
|
||||
let host_epoll_fd = self.host_epoll_fd.to_raw();
|
||||
let host_fd = host_file.host_fd().unwrap().to_raw();
|
||||
|
||||
let c_event = event_and_flags.map(|(event, flags)| {
|
||||
let mut c_event = event.to_c();
|
||||
c_event.events |= flags.bits() as u32;
|
||||
c_event.u64 = host_fd as u64;
|
||||
c_event
|
||||
});
|
||||
|
||||
try_libc!(libc::ocall::epoll_ctl(
|
||||
host_epoll_fd as i32,
|
||||
raw_cmd,
|
||||
host_file.host_fd().unwrap().to_raw() as i32,
|
||||
c_event.as_ref().map_or(ptr::null(), |c_event| c_event) as *mut _,
|
||||
));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn poll_events(&self, max_count: usize) -> usize {
|
||||
// Quick check to avoid unnecessary OCall
|
||||
if self.count.load(Ordering::Relaxed) == 0 {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Do OCall to poll the host files monitored by the host epoll file
|
||||
let mut raw_events = vec![MaybeUninit::<libc::epoll_event>::uninit(); max_count];
|
||||
let timeout = 0;
|
||||
let ocall_res = || -> Result<usize> {
|
||||
let count = try_libc!(libc::ocall::epoll_wait(
|
||||
self.host_epoll_fd.to_raw() as i32,
|
||||
raw_events.as_mut_ptr() as *mut _,
|
||||
raw_events.len() as c_int,
|
||||
timeout,
|
||||
)) as usize;
|
||||
assert!(count <= max_count);
|
||||
Ok(count)
|
||||
}();
|
||||
|
||||
let mut count = match ocall_res {
|
||||
Ok(count) => count,
|
||||
Err(e) => {
|
||||
warn!("Unexpected error from ocall::epoll_wait(): {:?}", e);
|
||||
0
|
||||
}
|
||||
};
|
||||
if count == 0 {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Use the polled events from the host to update the states of the
|
||||
// corresponding host files
|
||||
let mut host_files = self.host_files.lock().unwrap();
|
||||
for raw_event in &raw_events[..count] {
|
||||
let raw_event = unsafe { raw_event.assume_init() };
|
||||
let io_events = IoEvents::from_bits_truncate(raw_event.events as u32);
|
||||
let host_fd = raw_event.u64 as u32;
|
||||
|
||||
let host_file = match host_files.get(&host_fd) {
|
||||
None => {
|
||||
count -= 1;
|
||||
// The corresponding host file may be deleted
|
||||
continue;
|
||||
}
|
||||
Some(host_file) => host_file,
|
||||
};
|
||||
|
||||
host_file.recv_host_events(&io_events, true);
|
||||
}
|
||||
count
|
||||
}
|
||||
|
||||
pub fn host_fd(&self) -> &HostFd {
|
||||
&self.host_epoll_fd
|
||||
}
|
||||
|
||||
pub fn poll(&self) -> IoEvents {
|
||||
if self.is_ready.load(Ordering::Acquire) {
|
||||
IoEvents::IN
|
||||
} else {
|
||||
IoEvents::empty()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn recv_host_events(&self, events: &IoEvents) {
|
||||
let is_ready = events.contains(IoEvents::IN);
|
||||
self.is_ready.store(is_ready, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for HostFileEpoller {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
libc::ocall::close(self.host_epoll_fd.to_raw() as i32);
|
||||
}
|
||||
}
|
||||
}
|
69
src/libos/src/net/io_multiplexing/epoll/mod.rs
Normal file
69
src/libos/src/net/io_multiplexing/epoll/mod.rs
Normal file
@ -0,0 +1,69 @@
|
||||
use crate::fs::IoEvents;
|
||||
use crate::prelude::*;
|
||||
|
||||
mod epoll_file;
|
||||
mod epoll_waiter;
|
||||
mod host_file_epoller;
|
||||
|
||||
pub use self::epoll_file::{AsEpollFile, EpollFile};
|
||||
|
||||
/// An epoll control command.
|
||||
#[derive(Copy, Clone, Debug, PartialEq)]
|
||||
pub enum EpollCtl {
|
||||
Add(FileDesc, EpollEvent, EpollFlags),
|
||||
Del(FileDesc),
|
||||
Mod(FileDesc, EpollEvent, EpollFlags),
|
||||
}
|
||||
|
||||
/// An epoll control flags.
|
||||
bitflags! {
|
||||
pub struct EpollFlags: u32 {
|
||||
const EXCLUSIVE = (1 << 28);
|
||||
const WAKE_UP = (1 << 29);
|
||||
const ONE_SHOT = (1 << 30);
|
||||
const EDGE_TRIGGER = (1 << 31);
|
||||
}
|
||||
}
|
||||
|
||||
impl EpollFlags {
|
||||
pub fn from_c(c_event: &libc::epoll_event) -> Self {
|
||||
EpollFlags::from_bits_truncate(c_event.events)
|
||||
}
|
||||
}
|
||||
|
||||
/// An epoll event.
|
||||
///
|
||||
/// This could be used as either an input of epoll ctl or an output of epoll wait.
|
||||
// Note: the memory layout is compatible with that of C's struct epoll_event.
|
||||
#[derive(Copy, Clone, Debug, PartialEq)]
|
||||
pub struct EpollEvent {
|
||||
mask: IoEvents,
|
||||
user_data: u64,
|
||||
}
|
||||
|
||||
impl EpollEvent {
|
||||
pub fn new(mask: IoEvents, user_data: u64) -> Self {
|
||||
Self { mask, user_data }
|
||||
}
|
||||
|
||||
pub fn mask(&self) -> IoEvents {
|
||||
self.mask
|
||||
}
|
||||
|
||||
pub fn user_data(&self) -> u64 {
|
||||
self.user_data
|
||||
}
|
||||
|
||||
pub fn from_c(c_event: &libc::epoll_event) -> Self {
|
||||
let mask = IoEvents::from_bits_truncate(c_event.events as u32);
|
||||
let user_data = c_event.u64;
|
||||
Self { mask, user_data }
|
||||
}
|
||||
|
||||
pub fn to_c(&self) -> libc::epoll_event {
|
||||
libc::epoll_event {
|
||||
events: self.mask.bits() as u32,
|
||||
u64: self.user_data,
|
||||
}
|
||||
}
|
||||
}
|
@ -1,18 +1,19 @@
|
||||
use super::*;
|
||||
|
||||
mod epoll;
|
||||
// TODO: the following three modules will soon be removed
|
||||
mod io_event;
|
||||
mod poll;
|
||||
mod select;
|
||||
|
||||
pub use self::epoll::{AsEpollFile, EpollCtlCmd, EpollEvent, EpollEventFlags, EpollFile};
|
||||
pub use self::epoll::{AsEpollFile, EpollCtl, EpollEvent, EpollFile, EpollFlags};
|
||||
pub use self::io_event::{
|
||||
clear_notifier_status, notify_thread, wait_for_notification, IoEvent, THREAD_NOTIFIERS,
|
||||
};
|
||||
pub use self::poll::{do_poll, PollEvent, PollEventFlags};
|
||||
pub use self::select::{select, FdSetExt};
|
||||
|
||||
use fs::{AsDevRandom, AsEvent, CreationFlags, File, FileDesc, FileRef, PipeType};
|
||||
use fs::{AsDevRandom, AsEvent, CreationFlags, File, FileDesc, FileRef, HostFd, PipeType};
|
||||
use std::any::Any;
|
||||
use std::convert::TryFrom;
|
||||
use std::fmt;
|
||||
|
@ -113,7 +113,7 @@ pub fn do_poll(pollfds: &mut [PollEvent], timeout: *mut timeval_t) -> Result<usi
|
||||
}
|
||||
|
||||
if let Ok(socket) = file_ref.as_host_socket() {
|
||||
let fd = socket.host_fd() as FileDesc;
|
||||
let fd = socket.host_fd().unwrap().to_raw();
|
||||
index_host_pollfds.push(i);
|
||||
host_pollfds.push(PollEvent::new(fd, pollfd.events()));
|
||||
} else if let Ok(eventfd) = file_ref.as_event() {
|
||||
|
@ -1,132 +0,0 @@
|
||||
use super::*;
|
||||
|
||||
use crate::fs::{
|
||||
occlum_ocall_ioctl, AccessMode, CreationFlags, File, FileRef, IoctlCmd, StatusFlags,
|
||||
};
|
||||
use std::any::Any;
|
||||
use std::io::{Read, Seek, SeekFrom, Write};
|
||||
use std::mem;
|
||||
|
||||
/// Native linux socket
|
||||
#[derive(Debug)]
|
||||
pub struct HostSocket {
|
||||
host_fd: c_int,
|
||||
}
|
||||
|
||||
impl HostSocket {
|
||||
pub fn new(
|
||||
domain: AddressFamily,
|
||||
socket_type: SocketType,
|
||||
file_flags: FileFlags,
|
||||
protocol: i32,
|
||||
) -> Result<Self> {
|
||||
let host_fd = try_libc!(libc::ocall::socket(
|
||||
domain as i32,
|
||||
socket_type as i32 | file_flags.bits(),
|
||||
protocol
|
||||
));
|
||||
Ok(Self { host_fd })
|
||||
}
|
||||
|
||||
pub fn host_fd(&self) -> c_int {
|
||||
self.host_fd
|
||||
}
|
||||
|
||||
pub fn bind(&self, addr: &SockAddr) -> Result<()> {
|
||||
let (addr_ptr, addr_len) = addr.as_ptr_and_len();
|
||||
|
||||
let ret = try_libc!(libc::ocall::bind(
|
||||
self.host_fd(),
|
||||
addr_ptr as *const libc::sockaddr,
|
||||
addr_len as u32
|
||||
));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn listen(&self, backlog: i32) -> Result<()> {
|
||||
let ret = try_libc!(libc::ocall::listen(self.host_fd(), backlog));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn accept(&self, flags: FileFlags) -> Result<(Self, Option<SockAddr>)> {
|
||||
let mut sockaddr = SockAddr::default();
|
||||
let mut addr_len = sockaddr.len();
|
||||
|
||||
let ret = try_libc!(libc::ocall::accept4(
|
||||
self.host_fd(),
|
||||
sockaddr.as_mut_ptr() as *mut _,
|
||||
&mut addr_len as *mut _ as *mut _,
|
||||
flags.bits()
|
||||
));
|
||||
|
||||
let addr_option = if addr_len != 0 {
|
||||
sockaddr.set_len(addr_len)?;
|
||||
Some(sockaddr)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok((Self { host_fd: ret }, addr_option))
|
||||
}
|
||||
|
||||
pub fn connect(&self, addr: &Option<SockAddr>) -> Result<()> {
|
||||
debug!("host_fd: {} addr {:?}", self.host_fd(), addr);
|
||||
|
||||
let (addr_ptr, addr_len) = if let Some(sock_addr) = addr {
|
||||
sock_addr.as_ptr_and_len()
|
||||
} else {
|
||||
(std::ptr::null(), 0)
|
||||
};
|
||||
|
||||
let ret = try_libc!(libc::ocall::connect(
|
||||
self.host_fd(),
|
||||
addr_ptr,
|
||||
addr_len as u32
|
||||
));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn sendto(
|
||||
&self,
|
||||
buf: &[u8],
|
||||
flags: SendFlags,
|
||||
addr_option: &Option<SockAddr>,
|
||||
) -> Result<usize> {
|
||||
let bufs = vec![buf];
|
||||
let name_option = addr_option.as_ref().map(|addr| addr.as_slice());
|
||||
self.do_sendmsg(&bufs, flags, name_option, None)
|
||||
}
|
||||
|
||||
pub fn recvfrom(&self, buf: &mut [u8], flags: RecvFlags) -> Result<(usize, Option<SockAddr>)> {
|
||||
let mut sockaddr = SockAddr::default();
|
||||
let mut bufs = vec![buf];
|
||||
let (bytes_recv, addr_len, _, _) =
|
||||
self.do_recvmsg(&mut bufs, flags, Some(sockaddr.as_mut_slice()), None)?;
|
||||
|
||||
let addr_option = if addr_len != 0 {
|
||||
sockaddr.set_len(addr_len)?;
|
||||
Some(sockaddr)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok((bytes_recv, addr_option))
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for HostSocket {
|
||||
fn drop(&mut self) {
|
||||
let ret = unsafe { libc::ocall::close(self.host_fd) };
|
||||
assert!(ret == 0);
|
||||
}
|
||||
}
|
||||
|
||||
pub trait HostSocketType {
|
||||
fn as_host_socket(&self) -> Result<&HostSocket>;
|
||||
}
|
||||
|
||||
impl HostSocketType for FileRef {
|
||||
fn as_host_socket(&self) -> Result<&HostSocket> {
|
||||
self.as_any()
|
||||
.downcast_ref::<HostSocket>()
|
||||
.ok_or_else(|| errno!(EBADF, "not a host socket"))
|
||||
}
|
||||
}
|
@ -13,7 +13,7 @@ impl HostSocket {
|
||||
let mut retval: i32 = 0;
|
||||
let status = occlum_ocall_ioctl(
|
||||
&mut retval as *mut i32,
|
||||
self.host_fd(),
|
||||
self.raw_host_fd() as i32,
|
||||
cmd_num,
|
||||
cmd_arg_ptr,
|
||||
cmd.arg_len(),
|
||||
@ -36,7 +36,7 @@ impl HostSocket {
|
||||
let mut retval: i32 = 0;
|
||||
let status = occlum_ocall_ioctl_repack(
|
||||
&mut retval as *mut i32,
|
||||
self.host_fd(),
|
||||
self.raw_host_fd() as i32,
|
||||
BuiltinIoctlNum::SIOCGIFCONF as i32,
|
||||
arg_ref.ifc_buf,
|
||||
arg_ref.ifc_len,
|
||||
|
@ -1,9 +1,154 @@
|
||||
use super::*;
|
||||
use std::any::Any;
|
||||
use std::io::{Read, Seek, SeekFrom, Write};
|
||||
use std::mem;
|
||||
|
||||
use atomic::Atomic;
|
||||
|
||||
use super::*;
|
||||
use crate::fs::{
|
||||
occlum_ocall_ioctl, AccessMode, CreationFlags, File, FileRef, HostFd, IoEvents, IoNotifier,
|
||||
IoctlCmd, StatusFlags,
|
||||
};
|
||||
|
||||
mod host_socket;
|
||||
mod ioctl_impl;
|
||||
mod recv;
|
||||
mod send;
|
||||
mod socket_file;
|
||||
|
||||
pub use self::host_socket::{HostSocket, HostSocketType};
|
||||
/// Native linux socket
|
||||
#[derive(Debug)]
|
||||
pub struct HostSocket {
|
||||
host_fd: HostFd,
|
||||
host_events: Atomic<IoEvents>,
|
||||
notifier: IoNotifier,
|
||||
}
|
||||
|
||||
impl HostSocket {
|
||||
pub fn new(
|
||||
domain: AddressFamily,
|
||||
socket_type: SocketType,
|
||||
file_flags: FileFlags,
|
||||
protocol: i32,
|
||||
) -> Result<Self> {
|
||||
let raw_host_fd = try_libc!(libc::ocall::socket(
|
||||
domain as i32,
|
||||
socket_type as i32 | file_flags.bits(),
|
||||
protocol
|
||||
)) as FileDesc;
|
||||
let host_fd = HostFd::new(raw_host_fd);
|
||||
Ok(HostSocket::from_host_fd(host_fd))
|
||||
}
|
||||
|
||||
fn from_host_fd(host_fd: HostFd) -> HostSocket {
|
||||
let host_events = Atomic::new(IoEvents::empty());
|
||||
let notifier = IoNotifier::new();
|
||||
Self {
|
||||
host_fd,
|
||||
host_events,
|
||||
notifier,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn bind(&self, addr: &SockAddr) -> Result<()> {
|
||||
let (addr_ptr, addr_len) = addr.as_ptr_and_len();
|
||||
|
||||
let ret = try_libc!(libc::ocall::bind(
|
||||
self.raw_host_fd() as i32,
|
||||
addr_ptr as *const libc::sockaddr,
|
||||
addr_len as u32
|
||||
));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn listen(&self, backlog: i32) -> Result<()> {
|
||||
let ret = try_libc!(libc::ocall::listen(self.raw_host_fd() as i32, backlog));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn accept(&self, flags: FileFlags) -> Result<(Self, Option<SockAddr>)> {
|
||||
let mut sockaddr = SockAddr::default();
|
||||
let mut addr_len = sockaddr.len();
|
||||
|
||||
let raw_host_fd = try_libc!(libc::ocall::accept4(
|
||||
self.raw_host_fd() as i32,
|
||||
sockaddr.as_mut_ptr() as *mut _,
|
||||
&mut addr_len as *mut _ as *mut _,
|
||||
flags.bits()
|
||||
)) as FileDesc;
|
||||
let host_fd = HostFd::new(raw_host_fd);
|
||||
|
||||
let addr_option = if addr_len != 0 {
|
||||
sockaddr.set_len(addr_len)?;
|
||||
Some(sockaddr)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok((HostSocket::from_host_fd(host_fd), addr_option))
|
||||
}
|
||||
|
||||
pub fn connect(&self, addr: &Option<SockAddr>) -> Result<()> {
|
||||
debug!("connect: host_fd: {}, addr {:?}", self.raw_host_fd(), addr);
|
||||
|
||||
let (addr_ptr, addr_len) = if let Some(sock_addr) = addr {
|
||||
sock_addr.as_ptr_and_len()
|
||||
} else {
|
||||
(std::ptr::null(), 0)
|
||||
};
|
||||
|
||||
let ret = try_libc!(libc::ocall::connect(
|
||||
self.raw_host_fd() as i32,
|
||||
addr_ptr,
|
||||
addr_len as u32
|
||||
));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn sendto(
|
||||
&self,
|
||||
buf: &[u8],
|
||||
flags: SendFlags,
|
||||
addr_option: &Option<SockAddr>,
|
||||
) -> Result<usize> {
|
||||
let bufs = vec![buf];
|
||||
let name_option = addr_option.as_ref().map(|addr| addr.as_slice());
|
||||
self.do_sendmsg(&bufs, flags, name_option, None)
|
||||
}
|
||||
|
||||
pub fn recvfrom(&self, buf: &mut [u8], flags: RecvFlags) -> Result<(usize, Option<SockAddr>)> {
|
||||
let mut sockaddr = SockAddr::default();
|
||||
let mut bufs = vec![buf];
|
||||
let (bytes_recv, addr_len, _, _) =
|
||||
self.do_recvmsg(&mut bufs, flags, Some(sockaddr.as_mut_slice()), None)?;
|
||||
|
||||
let addr_option = if addr_len != 0 {
|
||||
sockaddr.set_len(addr_len)?;
|
||||
Some(sockaddr)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok((bytes_recv, addr_option))
|
||||
}
|
||||
|
||||
pub fn raw_host_fd(&self) -> FileDesc {
|
||||
self.host_fd.to_raw()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for HostSocket {
|
||||
fn drop(&mut self) {
|
||||
let ret = unsafe { libc::ocall::close(self.host_fd.to_raw() as i32) };
|
||||
assert!(ret == 0);
|
||||
}
|
||||
}
|
||||
|
||||
pub trait HostSocketType {
|
||||
fn as_host_socket(&self) -> Result<&HostSocket>;
|
||||
}
|
||||
|
||||
impl HostSocketType for FileRef {
|
||||
fn as_host_socket(&self) -> Result<&HostSocket> {
|
||||
self.as_any()
|
||||
.downcast_ref::<HostSocket>()
|
||||
.ok_or_else(|| errno!(EBADF, "not a host socket"))
|
||||
}
|
||||
}
|
||||
|
@ -64,7 +64,7 @@ impl HostSocket {
|
||||
) -> Result<(usize, usize, usize, MsgHdrFlags)> {
|
||||
// Prepare the arguments for OCall
|
||||
// Host socket fd
|
||||
let host_fd = self.host_fd();
|
||||
let host_fd = self.raw_host_fd() as i32;
|
||||
// Name
|
||||
let (msg_name, msg_namelen) = name.as_mut_ptr_and_len();
|
||||
let msg_name = msg_name as *mut c_void;
|
||||
|
@ -46,7 +46,7 @@ impl HostSocket {
|
||||
// Prepare the arguments for OCall
|
||||
let mut retval: isize = 0;
|
||||
// Host socket fd
|
||||
let host_fd = self.host_fd();
|
||||
let host_fd = self.raw_host_fd() as i32;
|
||||
// Name
|
||||
let (msg_name, msg_namelen) = name.as_ptr_and_len();
|
||||
let msg_name = msg_name as *const c_void;
|
||||
|
@ -1,11 +1,14 @@
|
||||
use super::*;
|
||||
|
||||
use crate::fs::{
|
||||
occlum_ocall_ioctl, AccessMode, CreationFlags, File, FileRef, IoctlCmd, StatusFlags,
|
||||
};
|
||||
use std::any::Any;
|
||||
use std::io::{Read, Seek, SeekFrom, Write};
|
||||
|
||||
use atomic::{Atomic, Ordering};
|
||||
|
||||
use super::*;
|
||||
use crate::fs::{
|
||||
occlum_ocall_ioctl, AccessMode, CreationFlags, File, FileRef, HostFd, IoEvents, IoctlCmd,
|
||||
StatusFlags,
|
||||
};
|
||||
|
||||
//TODO: refactor write syscall to allow zero length with non-zero buffer
|
||||
impl File for HostSocket {
|
||||
fn read(&self, buf: &mut [u8]) -> Result<usize> {
|
||||
@ -52,7 +55,10 @@ impl File for HostSocket {
|
||||
}
|
||||
|
||||
fn get_status_flags(&self) -> Result<StatusFlags> {
|
||||
let ret = try_libc!(libc::ocall::fcntl_arg0(self.host_fd(), libc::F_GETFL));
|
||||
let ret = try_libc!(libc::ocall::fcntl_arg0(
|
||||
self.raw_host_fd() as i32,
|
||||
libc::F_GETFL
|
||||
));
|
||||
Ok(StatusFlags::from_bits_truncate(ret as u32))
|
||||
}
|
||||
|
||||
@ -64,13 +70,33 @@ impl File for HostSocket {
|
||||
| StatusFlags::O_NONBLOCK;
|
||||
let raw_status_flags = (new_status_flags & valid_flags_mask).bits();
|
||||
try_libc!(libc::ocall::fcntl_arg1(
|
||||
self.host_fd(),
|
||||
self.raw_host_fd() as i32,
|
||||
libc::F_SETFL,
|
||||
raw_status_flags as c_int
|
||||
));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_new(&self) -> IoEvents {
|
||||
self.host_events.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
fn host_fd(&self) -> Option<&HostFd> {
|
||||
Some(&self.host_fd)
|
||||
}
|
||||
|
||||
fn notifier(&self) -> Option<&IoNotifier> {
|
||||
Some(&self.notifier)
|
||||
}
|
||||
|
||||
fn recv_host_events(&self, events: &IoEvents, trigger_notifier: bool) {
|
||||
self.host_events.store(*events, Ordering::Release);
|
||||
|
||||
if trigger_notifier {
|
||||
self.notifier.broadcast(events);
|
||||
}
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
@ -1,6 +1,9 @@
|
||||
use super::*;
|
||||
|
||||
use super::io_multiplexing::{AsEpollFile, EpollCtlCmd, EpollEventFlags, EpollFile, FdSetExt};
|
||||
use std::mem::MaybeUninit;
|
||||
use std::time::Duration;
|
||||
|
||||
use super::io_multiplexing::{AsEpollFile, EpollCtl, EpollFile, EpollFlags, FdSetExt};
|
||||
use fs::{CreationFlags, File, FileDesc, FileRef};
|
||||
use misc::resource_t;
|
||||
use process::Process;
|
||||
@ -163,7 +166,7 @@ pub fn do_shutdown(fd: c_int, how: c_int) -> Result<isize> {
|
||||
debug!("shutdown: fd: {}, how: {}", fd, how);
|
||||
let file_ref = current!().file(fd as FileDesc)?;
|
||||
if let Ok(socket) = file_ref.as_host_socket() {
|
||||
let ret = try_libc!(libc::ocall::shutdown(socket.host_fd(), how));
|
||||
let ret = try_libc!(libc::ocall::shutdown(socket.raw_host_fd() as i32, how));
|
||||
Ok(ret as isize)
|
||||
} else {
|
||||
// TODO: support unix socket
|
||||
@ -185,7 +188,7 @@ pub fn do_setsockopt(
|
||||
let file_ref = current!().file(fd as FileDesc)?;
|
||||
if let Ok(socket) = file_ref.as_host_socket() {
|
||||
let ret = try_libc!(libc::ocall::setsockopt(
|
||||
socket.host_fd(),
|
||||
socket.raw_host_fd() as i32,
|
||||
level,
|
||||
optname,
|
||||
optval,
|
||||
@ -215,7 +218,7 @@ pub fn do_getsockopt(
|
||||
let socket = file_ref.as_host_socket()?;
|
||||
|
||||
let ret = try_libc!(libc::ocall::getsockopt(
|
||||
socket.host_fd(),
|
||||
socket.raw_host_fd() as i32,
|
||||
level,
|
||||
optname,
|
||||
optval,
|
||||
@ -235,7 +238,11 @@ pub fn do_getpeername(
|
||||
);
|
||||
let file_ref = current!().file(fd as FileDesc)?;
|
||||
if let Ok(socket) = file_ref.as_host_socket() {
|
||||
let ret = try_libc!(libc::ocall::getpeername(socket.host_fd(), addr, addr_len));
|
||||
let ret = try_libc!(libc::ocall::getpeername(
|
||||
socket.raw_host_fd() as i32,
|
||||
addr,
|
||||
addr_len
|
||||
));
|
||||
Ok(ret as isize)
|
||||
} else if let Ok(unix_socket) = file_ref.as_unix_socket() {
|
||||
warn!("getpeername for unix socket is unimplemented");
|
||||
@ -259,7 +266,11 @@ pub fn do_getsockname(
|
||||
);
|
||||
let file_ref = current!().file(fd as FileDesc)?;
|
||||
if let Ok(socket) = file_ref.as_host_socket() {
|
||||
let ret = try_libc!(libc::ocall::getsockname(socket.host_fd(), addr, addr_len));
|
||||
let ret = try_libc!(libc::ocall::getsockname(
|
||||
socket.raw_host_fd() as i32,
|
||||
addr,
|
||||
addr_len
|
||||
));
|
||||
Ok(ret as isize)
|
||||
} else if let Ok(unix_socket) = file_ref.as_unix_socket() {
|
||||
warn!("getsockname for unix socket is unimplemented");
|
||||
@ -611,40 +622,53 @@ pub fn do_epoll_create(size: c_int) -> Result<isize> {
|
||||
}
|
||||
|
||||
pub fn do_epoll_create1(raw_flags: c_int) -> Result<isize> {
|
||||
debug!("epoll_create: raw_flags: {:?}", raw_flags);
|
||||
|
||||
// Only O_CLOEXEC is valid
|
||||
let flags = CreationFlags::from_bits(raw_flags as u32)
|
||||
.ok_or_else(|| errno!(EINVAL, "invalid flags"))?
|
||||
& CreationFlags::O_CLOEXEC;
|
||||
let epoll_file = io_multiplexing::EpollFile::new(flags)?;
|
||||
let file_ref: Arc<dyn File> = Arc::new(epoll_file);
|
||||
let epoll_file: Arc<EpollFile> = EpollFile::new();
|
||||
let close_on_spawn = flags.contains(CreationFlags::O_CLOEXEC);
|
||||
let fd = current!().add_file(file_ref, close_on_spawn);
|
||||
|
||||
Ok(fd as isize)
|
||||
let epfd = current!().add_file(epoll_file, close_on_spawn);
|
||||
Ok(epfd as isize)
|
||||
}
|
||||
|
||||
pub fn do_epoll_ctl(
|
||||
epfd: c_int,
|
||||
op: c_int,
|
||||
fd: c_int,
|
||||
event: *const libc::epoll_event,
|
||||
event_ptr: *const libc::epoll_event,
|
||||
) -> Result<isize> {
|
||||
debug!("epoll_ctl: epfd: {}, op: {:?}, fd: {}", epfd, op, fd);
|
||||
let inner_event = if !event.is_null() {
|
||||
from_user::check_ptr(event)?;
|
||||
Some(EpollEvent::from_raw(unsafe { &*event })?)
|
||||
} else {
|
||||
None
|
||||
|
||||
let get_c_event = |event_ptr| -> Result<&libc::epoll_event> {
|
||||
from_user::check_ptr(event_ptr)?;
|
||||
Ok(unsafe { &*event_ptr })
|
||||
};
|
||||
|
||||
let fd = fd as FileDesc;
|
||||
let ctl_cmd = match op {
|
||||
libc::EPOLL_CTL_ADD => {
|
||||
let c_event = get_c_event(event_ptr)?;
|
||||
let event = EpollEvent::from_c(c_event);
|
||||
let flags = EpollFlags::from_c(c_event);
|
||||
EpollCtl::Add(fd, event, flags)
|
||||
}
|
||||
libc::EPOLL_CTL_DEL => EpollCtl::Del(fd),
|
||||
libc::EPOLL_CTL_MOD => {
|
||||
let c_event = get_c_event(event_ptr)?;
|
||||
let event = EpollEvent::from_c(c_event);
|
||||
let flags = EpollFlags::from_c(c_event);
|
||||
EpollCtl::Mod(fd, event, flags)
|
||||
}
|
||||
_ => return_errno!(EINVAL, "invalid op"),
|
||||
};
|
||||
|
||||
let epfile_ref = current!().file(epfd as FileDesc)?;
|
||||
let epoll_file = epfile_ref.as_epfile()?;
|
||||
let epoll_file = epfile_ref.as_epoll_file()?;
|
||||
|
||||
epoll_file.control(
|
||||
EpollCtlCmd::try_from(op)?,
|
||||
fd as FileDesc,
|
||||
inner_event.as_ref(),
|
||||
)?;
|
||||
epoll_file.control(&ctl_cmd)?;
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
@ -652,8 +676,13 @@ pub fn do_epoll_wait(
|
||||
epfd: c_int,
|
||||
events: *mut libc::epoll_event,
|
||||
max_events: c_int,
|
||||
timeout: c_int,
|
||||
timeout_ms: c_int,
|
||||
) -> Result<isize> {
|
||||
debug!(
|
||||
"epoll_wait: epfd: {}, max_events: {:?}, timeout_ms: {}",
|
||||
epfd, max_events, timeout_ms
|
||||
);
|
||||
|
||||
let max_events = {
|
||||
if max_events <= 0 {
|
||||
return_errno!(EINVAL, "maxevents <= 0");
|
||||
@ -666,23 +695,26 @@ pub fn do_epoll_wait(
|
||||
};
|
||||
|
||||
// A new vector to store EpollEvent, which may degrade the performance due to extra copy.
|
||||
let mut inner_events: Vec<EpollEvent> =
|
||||
vec![EpollEvent::new(EpollEventFlags::empty(), 0); max_events];
|
||||
let mut inner_events: Vec<MaybeUninit<EpollEvent>> = vec![MaybeUninit::uninit(); max_events];
|
||||
|
||||
debug!(
|
||||
"epoll_wait: epfd: {}, len: {:?}, timeout: {}",
|
||||
epfd,
|
||||
raw_events.len(),
|
||||
timeout
|
||||
timeout_ms,
|
||||
);
|
||||
|
||||
let epfile_ref = current!().file(epfd as FileDesc)?;
|
||||
let epoll_file = epfile_ref.as_epfile()?;
|
||||
|
||||
let count = epoll_file.wait(&mut inner_events, timeout)?;
|
||||
let epoll_file = epfile_ref.as_epoll_file()?;
|
||||
let timeout = if timeout_ms >= 0 {
|
||||
Some(Duration::from_millis(timeout_ms as u64))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let count = epoll_file.wait(&mut inner_events, timeout.as_ref())?;
|
||||
|
||||
for i in 0..count {
|
||||
raw_events[i] = inner_events[i].to_raw();
|
||||
raw_events[i] = unsafe { inner_events[i].assume_init() }.to_c();
|
||||
}
|
||||
|
||||
Ok(count as isize)
|
||||
@ -698,7 +730,7 @@ pub fn do_epoll_pwait(
|
||||
if !sigmask.is_null() {
|
||||
warn!("epoll_pwait cannot handle signal mask, yet");
|
||||
} else {
|
||||
info!("epoll_wait");
|
||||
debug!("epoll_wait");
|
||||
}
|
||||
do_epoll_wait(epfd, events, maxevents, timeout)
|
||||
}
|
||||
|
@ -41,3 +41,31 @@ void occlum_ocall_eventfd_write_batch(
|
||||
write(eventfds[fd_i], &val, sizeof(val));
|
||||
}
|
||||
}
|
||||
|
||||
int occlum_ocall_poll_with_eventfd(
|
||||
struct pollfd *pollfds,
|
||||
nfds_t nfds,
|
||||
struct timespec *timeout,
|
||||
int eventfd_idx
|
||||
) {
|
||||
if (eventfd_idx >= 0) {
|
||||
pollfds[eventfd_idx].events |= POLLIN;
|
||||
}
|
||||
|
||||
// We use the ppoll syscall directly instead of the libc wrapper. This
|
||||
// is because the syscall version updates the timeout argument to indicate
|
||||
// how much time was left (which what we want), while the libc wrapper
|
||||
// keeps the timeout argument unchanged.
|
||||
int ret = RAW_PPOLL(pollfds, nfds, timeout);
|
||||
if (ret < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (eventfd_idx >= 0 && (pollfds[eventfd_idx].revents & POLLIN) != 0) {
|
||||
int eventfd = pollfds[eventfd_idx].fd;
|
||||
char buf[8];
|
||||
read(eventfd, buf, 8);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
@ -1,3 +1,5 @@
|
||||
#include <errno.h>
|
||||
#include <sys/epoll.h>
|
||||
#include <sys/select.h>
|
||||
#include <sys/syscall.h>
|
||||
#include <sys/wait.h>
|
||||
@ -98,6 +100,51 @@ int test_select_timeout() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int test_epoll_timeout() {
|
||||
int pipe_fds[2];
|
||||
if (pipe(pipe_fds) < 0) {
|
||||
THROW_ERROR("failed to create a pipe");
|
||||
}
|
||||
int pipe_read_fd = pipe_fds[0];
|
||||
int pipe_write_fd = pipe_fds[1];
|
||||
|
||||
int ep_fd = epoll_create1(0);
|
||||
if (ep_fd < 0) {
|
||||
THROW_ERROR("failed to create an epoll");
|
||||
}
|
||||
|
||||
int ret;
|
||||
struct epoll_event event;
|
||||
|
||||
event.events = EPOLLIN; // we want the write end to be readable
|
||||
event.data.u32 = pipe_write_fd;
|
||||
ret = epoll_ctl(ep_fd, EPOLL_CTL_ADD, pipe_write_fd, &event);
|
||||
if (ret < 0) {
|
||||
THROW_ERROR("failed to do epoll ctl");
|
||||
}
|
||||
|
||||
event.events = EPOLLOUT; // we want the read end to be writable
|
||||
event.data.u32 = pipe_read_fd;
|
||||
ret = epoll_ctl(ep_fd, EPOLL_CTL_ADD, pipe_read_fd, &event);
|
||||
if (ret < 0) {
|
||||
THROW_ERROR("failed to do epoll ctl");
|
||||
}
|
||||
|
||||
// We are waiting for the write end to be readable or the read end to be
|
||||
// writable, which can never happen. So the epoll_wait must end with
|
||||
// timeout.
|
||||
errno = 0;
|
||||
struct epoll_event events[2];
|
||||
ret = epoll_wait(ep_fd, events, ARRAY_SIZE(events), 10 /* ms */);
|
||||
if (ret != 0 || errno != 0) {
|
||||
THROW_ERROR("failed to do epoll ctl");
|
||||
}
|
||||
|
||||
free_pipe(pipe_fds);
|
||||
close(ep_fd);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int test_poll_timeout() {
|
||||
// Start the timer
|
||||
struct timeval tv_start, tv_end;
|
||||
@ -167,6 +214,48 @@ int test_poll_no_timeout() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int test_epoll_no_timeout() {
|
||||
int pipe_fds[2];
|
||||
if (pipe(pipe_fds) < 0) {
|
||||
THROW_ERROR("failed to create a pipe");
|
||||
}
|
||||
int pipe_read_fd = pipe_fds[0];
|
||||
int pipe_write_fd = pipe_fds[1];
|
||||
|
||||
int ep_fd = epoll_create1(0);
|
||||
if (ep_fd < 0) {
|
||||
THROW_ERROR("failed to create an epoll");
|
||||
}
|
||||
|
||||
int ret;
|
||||
struct epoll_event event;
|
||||
|
||||
event.events = EPOLLOUT; // writable
|
||||
event.data.u32 = pipe_write_fd;
|
||||
ret = epoll_ctl(ep_fd, EPOLL_CTL_ADD, pipe_write_fd, &event);
|
||||
if (ret < 0) {
|
||||
THROW_ERROR("failed to do epoll ctl");
|
||||
}
|
||||
|
||||
event.events = EPOLLIN; // readable
|
||||
event.data.u32 = pipe_read_fd;
|
||||
ret = epoll_ctl(ep_fd, EPOLL_CTL_ADD, pipe_read_fd, &event);
|
||||
if (ret < 0) {
|
||||
THROW_ERROR("failed to do epoll ctl");
|
||||
}
|
||||
|
||||
struct epoll_event events[2];
|
||||
ret = epoll_wait(ep_fd, events, ARRAY_SIZE(events), -1);
|
||||
// pipe_write_fd is ready, while pipe_read_fd is not
|
||||
if (ret != 1) {
|
||||
THROW_ERROR("failed to do epoll ctl");
|
||||
}
|
||||
|
||||
free_pipe(pipe_fds);
|
||||
close(ep_fd);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int test_select_read_write() {
|
||||
int pipe_fds[2];
|
||||
if (pipe(pipe_fds) < 0) {
|
||||
@ -229,8 +318,10 @@ static test_case_t test_cases[] = {
|
||||
TEST_CASE(test_create_with_flags),
|
||||
//TEST_CASE(test_select_timeout),
|
||||
//TEST_CASE(test_poll_timeout),
|
||||
TEST_CASE(test_epoll_timeout),
|
||||
//TEST_CASE(test_select_no_timeout),
|
||||
//TEST_CASE(test_poll_no_timeout),
|
||||
TEST_CASE(test_epoll_no_timeout),
|
||||
//TEST_CASE(test_select_read_write),
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user