Rewrite epoll implementation and the test

This commit is contained in:
He Sun 2020-03-18 14:04:24 +08:00
parent 60b1e2c28d
commit 221f5b78e8
10 changed files with 609 additions and 562 deletions

@ -1,367 +0,0 @@
use super::*;
use fs::{AsDevRandom, AsEvent, File, FileDesc, FileRef};
use std::any::Any;
use std::collections::btree_map::BTreeMap;
use std::fmt;
use std::sync::atomic::spin_loop_hint;
use std::vec::Vec;
/// Forward to host `poll`
/// (sgx_libc doesn't have `select`)
pub fn do_select(
nfds: usize,
readfds: &mut libc::fd_set,
writefds: &mut libc::fd_set,
exceptfds: &mut libc::fd_set,
timeout: Option<libc::timeval>,
) -> Result<usize> {
debug!("select: nfds: {}", nfds);
// convert libos fd to Linux fd
let mut host_to_libos_fd = [0; libc::FD_SETSIZE];
let mut polls = Vec::<libc::pollfd>::new();
let current_ref = process::get_current();
let mut proc = current_ref.lock().unwrap();
let file_table = proc.get_files().lock().unwrap();
for fd in 0..nfds {
let fd_ref = file_table.get(fd as FileDesc)?;
let (r, w, e) = (
readfds.is_set(fd),
writefds.is_set(fd),
exceptfds.is_set(fd),
);
if !(r || w || e) {
continue;
}
if let Ok(socket) = fd_ref.as_unix_socket() {
warn!("select unix socket is unimplemented, spin for read");
readfds.clear();
writefds.clear();
exceptfds.clear();
// FIXME: spin poll until can read (hack for php)
while r && socket.poll()?.0 == false {
spin_loop_hint();
}
let (rr, ww, ee) = socket.poll()?;
if r && rr {
readfds.set(fd);
}
if w && ww {
writefds.set(fd);
}
if e && ee {
writefds.set(fd);
}
return Ok(1);
}
let host_fd = if let Ok(socket) = fd_ref.as_socket() {
socket.fd()
} else if let Ok(eventfd) = fd_ref.as_event() {
eventfd.get_host_fd()
} else {
return_errno!(EBADF, "unsupported file type");
};
host_to_libos_fd[host_fd as usize] = fd;
let mut events = 0;
if r {
events |= libc::POLLIN;
}
if w {
events |= libc::POLLOUT;
}
if e {
events |= libc::POLLERR;
}
polls.push(libc::pollfd {
fd: host_fd as c_int,
events,
revents: 0,
});
}
// Unlock the current process and its file table as early as possible
drop(file_table);
drop(proc);
let timeout = match timeout {
None => -1,
Some(tv) => (tv.tv_sec * 1000 + tv.tv_usec / 1000) as i32,
};
let ret = try_libc!(libc::ocall::poll(
polls.as_mut_ptr(),
polls.len() as u64,
timeout
));
// convert fd back and write fdset
readfds.clear();
writefds.clear();
exceptfds.clear();
for poll in polls.iter() {
let fd = host_to_libos_fd[poll.fd as usize];
if poll.revents & libc::POLLIN != 0 {
readfds.set(fd);
}
if poll.revents & libc::POLLOUT != 0 {
writefds.set(fd);
}
if poll.revents & libc::POLLERR != 0 {
exceptfds.set(fd);
}
}
Ok(ret as usize)
}
pub fn do_poll(pollfds: &mut [libc::pollfd], timeout: c_int) -> Result<usize> {
debug!(
"poll: {:?}, timeout: {}",
pollfds.iter().map(|p| p.fd).collect::<Vec<_>>(),
timeout
);
// Untrusted pollfd's that will be modified by OCall
let mut u_pollfds: Vec<libc::pollfd> = pollfds.to_vec();
let current_ref = process::get_current();
let mut proc = current_ref.lock().unwrap();
for (i, pollfd) in pollfds.iter_mut().enumerate() {
// Poll should just ignore negative fds
if pollfd.fd < 0 {
u_pollfds[i].fd = -1;
u_pollfds[i].revents = 0;
continue;
}
let file_ref = proc
.get_files()
.lock()
.unwrap()
.get(pollfd.fd as FileDesc)?;
if let Ok(socket) = file_ref.as_socket() {
// convert libos fd to host fd in the copy to keep pollfds unchanged
u_pollfds[i].fd = socket.fd();
u_pollfds[i].revents = 0;
} else if let Ok(eventfd) = file_ref.as_event() {
u_pollfds[i].fd = eventfd.get_host_fd();
u_pollfds[i].revents = 0;
} else if let Ok(socket) = file_ref.as_unix_socket() {
// FIXME: spin poll until can read (hack for php)
while (pollfd.events & libc::POLLIN) != 0 && socket.poll()?.0 == false {
spin_loop_hint();
}
let (r, w, e) = socket.poll()?;
if r {
pollfd.revents |= libc::POLLIN;
}
if w {
pollfd.revents |= libc::POLLOUT;
}
pollfd.revents &= pollfd.events;
if e {
pollfd.revents |= libc::POLLERR;
}
warn!("poll unix socket is unimplemented, spin for read");
return Ok(1);
} else if let Ok(dev_random) = file_ref.as_dev_random() {
return Ok(dev_random.poll(pollfd)?);
} else {
return_errno!(EBADF, "not a supported file type");
}
}
// Unlock the current process as early as possible
drop(proc);
let num_events = try_libc!(libc::ocall::poll(
u_pollfds.as_mut_ptr(),
u_pollfds.len() as u64,
timeout
)) as usize;
assert!(num_events <= pollfds.len());
// Copy back revents from the untrusted pollfds
let mut num_nonzero_revents = 0;
for (i, pollfd) in pollfds.iter_mut().enumerate() {
if u_pollfds[i].revents == 0 {
continue;
}
pollfd.revents = u_pollfds[i].revents;
num_nonzero_revents += 1;
}
assert!(num_nonzero_revents == num_events);
Ok(num_events as usize)
}
pub fn do_epoll_create1(flags: c_int) -> Result<FileDesc> {
debug!("epoll_create1: flags: {}", flags);
let epoll = EpollFile::new()?;
let file_ref: Arc<Box<dyn File>> = Arc::new(Box::new(epoll));
let close_on_spawn = flags & libc::EPOLL_CLOEXEC != 0;
let fd = process::put_file(file_ref, close_on_spawn)?;
Ok(fd)
}
pub fn do_epoll_ctl(
epfd: FileDesc,
op: c_int,
fd: FileDesc,
event: *const libc::epoll_event,
) -> Result<()> {
debug!("epoll_ctl: epfd: {}, op: {:?}, fd: {}", epfd, op, fd);
let host_fd = {
let fd_ref = process::get_file(fd)?;
if let Ok(socket) = fd_ref.as_socket() {
socket.fd() as FileDesc
} else if let Ok(eventfd) = fd_ref.as_event() {
eventfd.get_host_fd() as FileDesc
} else {
return_errno!(EPERM, "unsupported file type");
}
// Notes on deadlock.
//
// All locks on fd (if any) will be released at this point. This means
// we don't have to worry about the potential deadlock caused by
// locking two files (say, fd and epfd) in an inconsistent order.
};
let epoll_file_ref = process::get_file(epfd)?;
let epoll = &epoll_file_ref.as_epoll()?.inner;
epoll.ctl(op, host_fd, event)?;
Ok(())
}
pub fn do_epoll_wait(
epfd: FileDesc,
events: &mut [libc::epoll_event],
timeout: c_int,
) -> Result<usize> {
debug!(
"epoll_wait: epfd: {}, len: {:?}, timeout: {}",
epfd,
events.len(),
timeout
);
let epoll_file_ref = process::get_file(epfd)?;
let epoll = &epoll_file_ref.as_epoll()?.inner;
let count = epoll.wait(events, timeout)?;
Ok(count)
}
/// Safe methods for `libc::fd_set`
trait FdSetExt {
fn set(&mut self, fd: usize);
fn clear(&mut self);
fn is_set(&mut self, fd: usize) -> bool;
}
impl FdSetExt for libc::fd_set {
fn set(&mut self, fd: usize) {
assert!(fd < libc::FD_SETSIZE);
unsafe {
libc::FD_SET(fd as c_int, self);
}
}
fn clear(&mut self) {
unsafe {
libc::FD_ZERO(self);
}
}
fn is_set(&mut self, fd: usize) -> bool {
assert!(fd < libc::FD_SETSIZE);
unsafe { libc::FD_ISSET(fd as c_int, self) }
}
}
pub struct EpollFile {
inner: EpollFileInner,
}
impl EpollFile {
pub fn new() -> Result<Self> {
Ok(Self {
inner: EpollFileInner::new()?,
})
}
}
struct EpollFileInner {
epoll_fd: c_int,
}
// FIXME: What if a Linux fd is closed but still in an epoll?
impl EpollFileInner {
/// Create a new Linux epoll file descriptor
pub fn new() -> Result<Self> {
let ret = try_libc!(libc::ocall::epoll_create1(0));
Ok(EpollFileInner { epoll_fd: ret })
}
pub fn ctl(&self, op: c_int, host_fd: FileDesc, event: *const libc::epoll_event) -> Result<()> {
try_libc!(libc::ocall::epoll_ctl(
self.epoll_fd,
op,
host_fd as c_int,
event as *mut _
));
Ok(())
}
/// Wait for an I/O event on the epoll.
/// Returns the number of file descriptors ready for the requested I/O.
pub fn wait(&self, events: &mut [libc::epoll_event], timeout: c_int) -> Result<usize> {
let ret = try_libc!(libc::ocall::epoll_wait(
self.epoll_fd,
events.as_mut_ptr(),
events.len() as c_int,
timeout,
));
Ok(ret as usize)
}
}
impl Drop for EpollFileInner {
fn drop(&mut self) {
unsafe {
libc::ocall::close(self.epoll_fd);
}
}
}
impl File for EpollFile {
fn as_any(&self) -> &dyn Any {
self
}
}
impl Debug for EpollFile {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let inner = &self.inner;
f.debug_struct("EpollFile")
.field("epoll_fd", &inner.epoll_fd)
.finish()
}
}
pub trait AsEpoll {
fn as_epoll(&self) -> Result<&EpollFile>;
}
impl AsEpoll for FileRef {
fn as_epoll(&self) -> Result<&EpollFile> {
self.as_any()
.downcast_ref::<EpollFile>()
.ok_or_else(|| errno!(EBADF, "not a epoll"))
}
}

@ -0,0 +1,175 @@
use super::*;
#[derive(Debug, Copy, Clone)]
pub enum EpollCtlCmd {
/// Add a file decriptor to the interface
Add = 1,
/// Remove a file decriptor from the interface
Del = 2,
/// Change file decriptor epoll_event structre
Mod = 3,
}
impl TryFrom<i32> for EpollCtlCmd {
type Error = error::Error;
fn try_from(op_num: i32) -> Result<Self> {
match op_num {
1 => Ok(EpollCtlCmd::Add),
2 => Ok(EpollCtlCmd::Del),
3 => Ok(EpollCtlCmd::Mod),
_ => return_errno!(EINVAL, "invalid operation number"),
}
}
}
bitflags! {
#[derive(Default)]
pub struct EpollEventFlags: u32 {
// The available events are got from linux source.
// This struct contains more flags than linux man page described.
const EPOLLIN = 0x0001;
const EPOLLPRI = 0x0002;
const EPOLLOUT = 0x0004;
const EPOLLERR = 0x0008;
const EPOLLHUP = 0x0010;
const EPOLLNVAL = 0x0020;
const EPOLLRDNORM = 0x0040;
const EPOLLRDBAND = 0x0080;
const EPOLLWRNORM = 0x0100;
const EPOLLWRBAND = 0x0200;
const EPOLLMSG = 0x0400;
const EPOLLRDHUP = 0x2000;
const EPOLLEXCLUSIVE = (1 << 28);
const EPOLLWAKEUP = (1 << 29);
const EPOLLONESHOT = (1 << 30);
const EPOLLET = (1 << 31);
}
}
//TODO: Add more mitigations to protect from iago attacks
#[derive(Copy, Clone, Debug, Default)]
pub struct EpollEvent {
/// Epoll Events
events: EpollEventFlags,
/// Libos-agnostic user data variable
data: uint64_t,
}
impl EpollEvent {
pub fn new(events: EpollEventFlags, data: uint64_t) -> Self {
Self { events, data }
}
pub fn from_raw(epoll_event: &libc::epoll_event) -> Result<Self> {
Ok(Self::new(
EpollEventFlags::from_bits(epoll_event.events)
.ok_or_else(|| errno!(EINVAL, "invalid flags"))?,
epoll_event.u64,
))
}
pub fn to_raw(&self) -> libc::epoll_event {
libc::epoll_event {
events: self.events.bits(),
u64: self.data,
}
}
}
#[derive(Debug)]
pub struct EpollFile {
host_fd: c_int,
}
impl EpollFile {
/// Creates a new Linux epoll file descriptor
pub fn new(flags: CreationFlags) -> Result<Self> {
debug!("create epollfile: flags: {:?}", flags);
let host_fd = try_libc!(libc::ocall::epoll_create1(flags.bits() as i32));
Ok(Self { host_fd })
}
pub fn control(&self, op: EpollCtlCmd, fd: FileDesc, event: Option<&EpollEvent>) -> Result<()> {
let host_fd = {
let fd_ref = process::get_file(fd)?;
if let Ok(socket) = fd_ref.as_socket() {
socket.fd()
} else if let Ok(eventfd) = fd_ref.as_event() {
eventfd.get_host_fd()
} else {
return_errno!(EPERM, "unsupported file type");
}
};
// Notes on deadlock.
//
// All locks on fd (if any) will be released at this point. This means
// we don't have to worry about the potential deadlock caused by
// locking two files (say, fd and epfd) in an inconsistent order.
let raw_epevent_ptr: *mut libc::epoll_event = match event {
Some(epevent) => {
//TODO: Shoud be const.
// Cast const to mut to be compatiable with the ocall from rust sdk.
&mut epevent.to_raw()
}
_ => std::ptr::null_mut(),
};
try_libc!(libc::ocall::epoll_ctl(
self.host_fd,
op as i32,
host_fd,
raw_epevent_ptr,
));
Ok(())
}
/// Waits for an I/O event on the epoll file.
///
/// Returns the number of file descriptors ready for the requested I/O.
pub fn wait(&self, events: &mut [EpollEvent], timeout: c_int) -> Result<usize> {
let mut raw_events: Vec<libc::epoll_event> =
vec![libc::epoll_event { events: 0, u64: 0 }; events.len()];
let ret = try_libc!(libc::ocall::epoll_wait(
self.host_fd,
raw_events.as_mut_ptr(),
raw_events.len() as c_int,
timeout,
)) as usize;
assert!(ret <= events.len());
for i in 0..ret {
events[i] = EpollEvent::from_raw(&raw_events[i])?;
}
Ok(ret)
}
}
impl Drop for EpollFile {
fn drop(&mut self) {
unsafe {
libc::ocall::close(self.host_fd);
}
}
}
impl File for EpollFile {
fn as_any(&self) -> &dyn Any {
self
}
}
pub trait AsEpollFile {
fn as_epfile(&self) -> Result<&EpollFile>;
}
impl AsEpollFile for FileRef {
fn as_epfile(&self) -> Result<&EpollFile> {
self.as_any()
.downcast_ref::<EpollFile>()
.ok_or_else(|| errno!(EBADF, "not an epoll file"))
}
}

@ -0,0 +1,15 @@
use super::*;
mod epoll;
mod poll;
mod select;
pub use self::epoll::{AsEpollFile, EpollCtlCmd, EpollEvent, EpollEventFlags, EpollFile};
pub use self::poll::do_poll;
pub use self::select::do_select;
use fs::{AsDevRandom, AsEvent, CreationFlags, File, FileDesc, FileRef};
use std::any::Any;
use std::convert::TryFrom;
use std::fmt;
use std::sync::atomic::spin_loop_hint;

@ -0,0 +1,82 @@
use super::*;
pub fn do_poll(pollfds: &mut [libc::pollfd], timeout: c_int) -> Result<usize> {
debug!(
"poll: {:?}, timeout: {}",
pollfds.iter().map(|p| p.fd).collect::<Vec<_>>(),
timeout
);
// Untrusted pollfd's that will be modified by OCall
let mut u_pollfds: Vec<libc::pollfd> = pollfds.to_vec();
let current_ref = process::get_current();
let mut proc = current_ref.lock().unwrap();
for (i, pollfd) in pollfds.iter_mut().enumerate() {
// Poll should just ignore negative fds
if pollfd.fd < 0 {
u_pollfds[i].fd = -1;
u_pollfds[i].revents = 0;
continue;
}
let file_ref = proc
.get_files()
.lock()
.unwrap()
.get(pollfd.fd as FileDesc)?;
if let Ok(socket) = file_ref.as_socket() {
// convert libos fd to host fd in the copy to keep pollfds unchanged
u_pollfds[i].fd = socket.fd();
u_pollfds[i].revents = 0;
} else if let Ok(eventfd) = file_ref.as_event() {
u_pollfds[i].fd = eventfd.get_host_fd();
u_pollfds[i].revents = 0;
} else if let Ok(socket) = file_ref.as_unix_socket() {
// FIXME: spin poll until can read (hack for php)
while (pollfd.events & libc::POLLIN) != 0 && socket.poll()?.0 == false {
spin_loop_hint();
}
let (r, w, e) = socket.poll()?;
if r {
pollfd.revents |= libc::POLLIN;
}
if w {
pollfd.revents |= libc::POLLOUT;
}
pollfd.revents &= pollfd.events;
if e {
pollfd.revents |= libc::POLLERR;
}
warn!("poll unix socket is unimplemented, spin for read");
return Ok(1);
} else if let Ok(dev_random) = file_ref.as_dev_random() {
return Ok(dev_random.poll(pollfd)?);
} else {
return_errno!(EBADF, "not a supported file type");
}
}
// Unlock the current process as early as possible
drop(proc);
let num_events = try_libc!(libc::ocall::poll(
u_pollfds.as_mut_ptr(),
u_pollfds.len() as u64,
timeout
)) as usize;
assert!(num_events <= pollfds.len());
// Copy back revents from the untrusted pollfds
let mut num_nonzero_revents = 0;
for (i, pollfd) in pollfds.iter_mut().enumerate() {
if u_pollfds[i].revents == 0 {
continue;
}
pollfd.revents = u_pollfds[i].revents;
num_nonzero_revents += 1;
}
assert!(num_nonzero_revents == num_events);
Ok(num_events as usize)
}

@ -0,0 +1,142 @@
use super::*;
/// Forward to host `poll`
/// (sgx_libc doesn't have `select`)
pub fn do_select(
nfds: usize,
readfds: &mut libc::fd_set,
writefds: &mut libc::fd_set,
exceptfds: &mut libc::fd_set,
timeout: Option<libc::timeval>,
) -> Result<usize> {
debug!("select: nfds: {}", nfds);
// convert libos fd to Linux fd
let mut host_to_libos_fd = [0; libc::FD_SETSIZE];
let mut polls = Vec::<libc::pollfd>::new();
let current_ref = process::get_current();
let mut proc = current_ref.lock().unwrap();
let file_table = proc.get_files().lock().unwrap();
for fd in 0..nfds {
let fd_ref = file_table.get(fd as FileDesc)?;
let (r, w, e) = (
readfds.is_set(fd),
writefds.is_set(fd),
exceptfds.is_set(fd),
);
if !(r || w || e) {
continue;
}
if let Ok(socket) = fd_ref.as_unix_socket() {
warn!("select unix socket is unimplemented, spin for read");
readfds.clear();
writefds.clear();
exceptfds.clear();
// FIXME: spin poll until can read (hack for php)
while r && socket.poll()?.0 == false {
spin_loop_hint();
}
let (rr, ww, ee) = socket.poll()?;
if r && rr {
readfds.set(fd);
}
if w && ww {
writefds.set(fd);
}
if e && ee {
writefds.set(fd);
}
return Ok(1);
}
let host_fd = if let Ok(socket) = fd_ref.as_socket() {
socket.fd()
} else if let Ok(eventfd) = fd_ref.as_event() {
eventfd.get_host_fd()
} else {
return_errno!(EBADF, "unsupported file type");
};
host_to_libos_fd[host_fd as usize] = fd;
let mut events = 0;
if r {
events |= libc::POLLIN;
}
if w {
events |= libc::POLLOUT;
}
if e {
events |= libc::POLLERR;
}
polls.push(libc::pollfd {
fd: host_fd as c_int,
events,
revents: 0,
});
}
// Unlock the current process and its file table as early as possible
drop(file_table);
drop(proc);
let timeout = match timeout {
None => -1,
Some(tv) => (tv.tv_sec * 1000 + tv.tv_usec / 1000) as i32,
};
let ret = try_libc!(libc::ocall::poll(
polls.as_mut_ptr(),
polls.len() as u64,
timeout
));
// convert fd back and write fdset
readfds.clear();
writefds.clear();
exceptfds.clear();
for poll in polls.iter() {
let fd = host_to_libos_fd[poll.fd as usize];
if poll.revents & libc::POLLIN != 0 {
readfds.set(fd);
}
if poll.revents & libc::POLLOUT != 0 {
writefds.set(fd);
}
if poll.revents & libc::POLLERR != 0 {
exceptfds.set(fd);
}
}
Ok(ret as usize)
}
/// Safe methods for `libc::fd_set`
trait FdSetExt {
fn set(&mut self, fd: usize);
fn clear(&mut self);
fn is_set(&mut self, fd: usize) -> bool;
}
impl FdSetExt for libc::fd_set {
fn set(&mut self, fd: usize) {
assert!(fd < libc::FD_SETSIZE);
unsafe {
libc::FD_SET(fd as c_int, self);
}
}
fn clear(&mut self) {
unsafe {
libc::FD_ZERO(self);
}
}
fn is_set(&mut self, fd: usize) -> bool {
assert!(fd < libc::FD_SETSIZE);
unsafe { libc::FD_ISSET(fd as c_int, self) }
}
}

@ -9,6 +9,7 @@ mod socket_file;
mod syscalls;
mod unix_socket;
pub use self::io_multiplexing::EpollEvent;
pub use self::iovs::{Iovs, IovsMut, SliceAsLibcIovec};
pub use self::msg::{msghdr, msghdr_mut, MsgHdr, MsgHdrMut};
pub use self::msg_flags::{MsgHdrFlags, RecvFlags, SendFlags};

@ -1,8 +1,9 @@
use super::*;
use super::io_multiplexing;
use fs::{File, FileDesc, FileRef};
use super::io_multiplexing::{AsEpollFile, EpollCtlCmd, EpollEventFlags, EpollFile};
use fs::{CreationFlags, File, FileDesc, FileRef};
use process::Process;
use std::convert::TryFrom;
use util::mem_util::from_user;
pub fn do_sendmsg(fd: c_int, msg_ptr: *const msghdr, flags_c: c_int) -> Result<isize> {
@ -183,8 +184,16 @@ pub fn do_epoll_create(size: c_int) -> Result<isize> {
do_epoll_create1(0)
}
pub fn do_epoll_create1(flags: c_int) -> Result<isize> {
let fd = io_multiplexing::do_epoll_create1(flags)?;
pub fn do_epoll_create1(raw_flags: c_int) -> Result<isize> {
// Only O_CLOEXEC is valid
let flags = CreationFlags::from_bits(raw_flags as u32)
.ok_or_else(|| errno!(EINVAL, "invalid flags"))?
& CreationFlags::O_CLOEXEC;
let epoll_file = io_multiplexing::EpollFile::new(flags)?;
let file_ref: Arc<Box<dyn File>> = Arc::new(Box::new(epoll_file));
let close_on_spawn = flags.contains(CreationFlags::O_CLOEXEC);
let fd = process::put_file(file_ref, close_on_spawn)?;
Ok(fd as isize)
}
@ -194,30 +203,62 @@ pub fn do_epoll_ctl(
fd: c_int,
event: *const libc::epoll_event,
) -> Result<isize> {
if !event.is_null() {
debug!("epoll_ctl: epfd: {}, op: {:?}, fd: {}", epfd, op, fd);
let inner_event = if !event.is_null() {
from_user::check_ptr(event)?;
}
io_multiplexing::do_epoll_ctl(epfd as FileDesc, op, fd as FileDesc, event)?;
Some(EpollEvent::from_raw(unsafe { &*event })?)
} else {
None
};
let epfile_ref = process::get_file(epfd as FileDesc)?;
let epoll_file = epfile_ref.as_epfile()?;
epoll_file.control(
EpollCtlCmd::try_from(op)?,
fd as FileDesc,
inner_event.as_ref(),
)?;
Ok(0)
}
pub fn do_epoll_wait(
epfd: c_int,
events: *mut libc::epoll_event,
maxevents: c_int,
max_events: c_int,
timeout: c_int,
) -> Result<isize> {
let maxevents = {
if maxevents <= 0 {
let max_events = {
if max_events <= 0 {
return_errno!(EINVAL, "maxevents <= 0");
}
maxevents as usize
max_events as usize
};
let events = {
from_user::check_mut_array(events, maxevents)?;
unsafe { std::slice::from_raw_parts_mut(events, maxevents) }
let raw_events = {
from_user::check_mut_array(events, max_events)?;
unsafe { std::slice::from_raw_parts_mut(events, max_events) }
};
let count = io_multiplexing::do_epoll_wait(epfd as FileDesc, events, timeout)?;
// A new vector to store EpollEvent, which may degrade the performance due to extra copy.
let mut inner_events: Vec<EpollEvent> =
vec![EpollEvent::new(EpollEventFlags::empty(), 0); max_events];
debug!(
"epoll_wait: epfd: {}, len: {:?}, timeout: {}",
epfd,
raw_events.len(),
timeout
);
let epfile_ref = process::get_file(epfd as FileDesc)?;
let epoll_file = epfile_ref.as_epfile()?;
let count = epoll_file.wait(&mut inner_events, timeout)?;
for i in 0..count {
raw_events[i] = inner_events[i].to_raw();
}
Ok(count as isize)
}
@ -230,6 +271,8 @@ pub fn do_epoll_pwait(
) -> Result<isize> {
if !sigmask.is_null() {
warn!("epoll_pwait cannot handle signal mask, yet");
} else {
info!("epoll_wait");
}
do_epoll_wait(epfd, events, maxevents, timeout)
}

@ -17,8 +17,8 @@ use fs::{
use misc::{resource_t, rlimit_t, utsname_t};
use net::{
do_epoll_create, do_epoll_create1, do_epoll_ctl, do_epoll_pwait, do_epoll_wait, do_poll,
do_recvmsg, do_select, do_sendmsg, msghdr, msghdr_mut, AsSocket, AsUnixSocket, SocketFile,
UnixSocketFile,
do_recvmsg, do_select, do_sendmsg, msghdr, msghdr_mut, AsSocket, AsUnixSocket, EpollEvent,
SocketFile, UnixSocketFile,
};
use process::{pid_t, ChildProcessFilter, CloneFlags, CpuSet, FileAction, FutexFlags, FutexOp};
use std::any::Any;

@ -3,6 +3,7 @@
#include <stdio.h>
#include <stdarg.h>
#include <unistd.h>
#define _STR(x) #x
#define STR(x) _STR(x)
@ -10,7 +11,6 @@
#define MAX(a, b) ((a) >= (b) ? (a) : (b))
#define ARRAY_SIZE(array) (sizeof(array)/sizeof(array[0]))
typedef int(*test_case_func_t)(void);
typedef struct {

@ -1,36 +1,36 @@
// Modified from https://banu.com/blog/2/how-to-use-epoll-a-complete-example-in-c/epoll-example.c
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>
#include <spawn.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include "test.h"
#define MAXEVENTS 64
#define DEFAULT_PROC_NUM 3
#define DEFAULT_MSG "Hello World!\n"
static int
create_and_bind() {
static int create_and_bind() {
int listenfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (listenfd < 0) {
printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
return -1;
}
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
struct sockaddr_in servaddr = {0};
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(6667);
int reuse = 1;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0)
perror("setsockopt port to reuse failed");
THROW_ERROR("setsockopt port to reuse failed");
int ret = bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr));
if (ret < 0) {
@ -40,160 +40,116 @@ create_and_bind() {
return listenfd;
}
int
main(int argc, char *argv[]) {
int sfd = create_and_bind();
int test_ip_socket() {
int ret = 0;
int server_fd = create_and_bind();
int s = listen(sfd, SOMAXCONN);
if (s == -1) {
perror("listen");
return -1;
ret = listen(server_fd, DEFAULT_PROC_NUM);
if (ret == -1) {
THROW_ERROR("failed to listen");
}
int efd = epoll_create1(0);
if (efd == -1) {
perror("epoll_create");
return -1;
int epfd = epoll_create1(0);
if (epfd == -1) {
close(server_fd);
THROW_ERROR("epoll_create failed");
}
struct epoll_event event;
event.data.fd = sfd;
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event);
if (s == -1) {
perror("epoll_ctl");
return -1;
struct epoll_event listened_event;
listened_event.data.fd = server_fd;
listened_event.events = EPOLLIN | EPOLLET;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, server_fd, &listened_event);
if (ret == -1) {
close_files(2, server_fd, epfd);
THROW_ERROR("epoll_ctl failed");
}
/* Buffer where events are returned */
struct epoll_event *events = calloc(MAXEVENTS, sizeof event);
// spawn clients
int client_pid;
int proc_num = DEFAULT_PROC_NUM;
char* client_argv[] = {"client", "127.0.0.1", "6667", NULL};
for(int i=0; i<DEFAULT_PROC_NUM; ++i) {
int ret = posix_spawn(&client_pid, "/bin/client", NULL, NULL, client_argv, NULL);
if (ret < 0) {
printf("spawn client process error: %s(errno: %d), %d process(es) spawned\n", strerror(errno), errno, i);
if (i == 0) {
perror("no client is successfully spawned");
return -1;
close_files(2, server_fd, epfd);
THROW_ERROR("no client is successfully spawned");
} else {
printf("%d client(s) spawned\n", i);
proc_num = i;
break;
}
}
}
/* The event loop */
int done_count = 0;
while (done_count < proc_num) {
int n = epoll_pwait(efd, events, MAXEVENTS, -1, NULL);
for (int i = 0; i < n; i++) {
if ((events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN))) {
/* An error has occured on this fd, or the socket is not
ready for reading (why were we notified then?) */
fprintf(stderr, "epoll error\n");
close(events[i].data.fd);
continue;
} else if (sfd == events[i].data.fd) {
/* We have a notification on the listening socket, which
means one or more incoming connections. */
int count = 0;
while (count < proc_num) {
struct epoll_event events[MAXEVENTS] = {0};
int nfds = epoll_pwait(epfd, events, MAXEVENTS, -1, NULL);
if (nfds == -1) {
close_files(2, server_fd, epfd);
THROW_ERROR("epoll_wait failed");
}
for (int i = 0; i < nfds; i++) {
if (server_fd == events[i].data.fd) {
// There is incoming connection to server_fd.
// Loop to accept all the connections.
while (1) {
struct sockaddr in_addr;
struct sockaddr in_addr = {0};
socklen_t in_len;
int infd;
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
in_len = sizeof in_addr;
infd = accept4(sfd, &in_addr, &in_len, SOCK_NONBLOCK);
if (infd == -1) {
if ((errno == EAGAIN) ||
(errno == EWOULDBLOCK)) {
/* We have processed all incoming
connections. */
int in_fd;
in_len = sizeof(in_addr);
in_fd = accept4(server_fd, &in_addr, &in_len, SOCK_NONBLOCK);
if (in_fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// No pending connections are present.
break;
} else {
perror("accept");
break;
close_files(2, server_fd, epfd);
THROW_ERROR("unexpected accept error");
}
}
s = getnameinfo(&in_addr, in_len,
hbuf, sizeof hbuf,
sbuf, sizeof sbuf,
NI_NUMERICHOST | NI_NUMERICSERV);
if (s == 0) {
printf("Accepted connection on descriptor %d "
"(host=%s, port=%s)\n", infd, hbuf, sbuf);
}
// add it to the list of fds to monitor
event.data.fd = infd;
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl(efd, EPOLL_CTL_ADD, infd, &event);
if (s == -1) {
perror("epoll_ctl");
return -1;
struct epoll_event client_event;
client_event.data.fd = in_fd;
client_event.events = EPOLLIN | EPOLLET;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, in_fd, &client_event);
if (ret == -1) {
close_files(2, server_fd, epfd);
THROW_ERROR("epoll_ctl failed");
}
}
continue;
} else if (events[i].events & EPOLLIN) {
// Channel is ready to read.
char buf[36];
if ((read(events[i].data.fd, buf, sizeof buf)) != 0) {
if(strcmp(buf, DEFAULT_MSG) != 0) {
close_files(2, server_fd, epfd);
THROW_ERROR("msg mismatched");
}
} else {
/* We have data on the fd waiting to be read. Read and
display it. We must read whatever data is available
completely, as we are running in edge-triggered mode
and won't get a notification again for the same
data. */
int done = 0;
while (1) {
ssize_t count;
char buf[512];
count = read(events[i].data.fd, buf, sizeof buf);
if (count == -1) {
/* If errno == EAGAIN, that means we have read all
data. So go back to the main loop. */
if (errno != EAGAIN) {
perror("read");
done = 1;
}
break;
} else if (count == 0) {
/* End of file. The remote has closed the
connection. */
done = 1;
break;
close_files(2, server_fd, epfd);
THROW_ERROR("read error");
}
/* Write the buffer to standard output */
s = write(1, buf, count);
if (s == -1) {
perror("write");
return -1;
}
}
if (done) {
printf("Closed connection on descriptor %d\n",
events[i].data.fd);
/* Closing the descriptor will make epoll remove it
from the set of descriptors which are monitored. */
close(events[i].data.fd);
done_count ++;
}
// Finish communication with one process.
count++;
} else {
close_files(2, server_fd, epfd);
THROW_ERROR("should never reach here");
}
}
}
free(events);
close(sfd);
return EXIT_SUCCESS;
close_files(2, server_fd, epfd);
return 0;
}
static test_case_t test_cases[] = {
TEST_CASE(test_ip_socket),
};
int main(int argc, const char* argv[]) {
return test_suite_run(test_cases, ARRAY_SIZE(test_cases));
}