Fix a design flaw in handling events of host files
This commit is contained in:
parent
798cbfd843
commit
a857cf9bfb
@ -116,11 +116,11 @@ impl File for EventFile {
|
||||
Some(&self.host_fd)
|
||||
}
|
||||
|
||||
fn recv_host_events(&self, events: &IoEvents, trigger_notifier: bool) {
|
||||
self.host_events.store(*events, Ordering::Release);
|
||||
fn update_host_events(&self, ready: &IoEvents, mask: &IoEvents, trigger_notifier: bool) {
|
||||
self.host_events.update(ready, mask, Ordering::Release);
|
||||
|
||||
if trigger_notifier {
|
||||
self.notifier.broadcast(events);
|
||||
self.notifier.broadcast(ready);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,3 +1,5 @@
|
||||
use atomic::{Atomic, Ordering};
|
||||
|
||||
use crate::events::{Event, EventFilter, Notifier, Observer};
|
||||
use crate::prelude::*;
|
||||
|
||||
@ -38,6 +40,32 @@ impl IoEvents {
|
||||
|
||||
impl Event for IoEvents {}
|
||||
|
||||
pub trait AtomicIoEvents {
|
||||
/// Update the IoEvents atomically.
|
||||
///
|
||||
/// The update is equivalent to the following assignment
|
||||
/// ```
|
||||
/// self.store(self.load(Ordering::Relaxed) & !*mask | *ready, ordering)
|
||||
/// ```
|
||||
fn update(&self, ready: &IoEvents, mask: &IoEvents, ordering: Ordering);
|
||||
}
|
||||
|
||||
impl AtomicIoEvents for Atomic<IoEvents> {
|
||||
fn update(&self, ready: &IoEvents, mask: &IoEvents, ordering: Ordering) {
|
||||
loop {
|
||||
let old_val = self.load(Ordering::Relaxed);
|
||||
let new_val = old_val & !*mask | *ready;
|
||||
let success_ordering = ordering;
|
||||
let failure_ordering = Ordering::Relaxed;
|
||||
if let Ok(_) =
|
||||
self.compare_exchange(old_val, new_val, success_ordering, failure_ordering)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl EventFilter<IoEvents> for IoEvents {
|
||||
fn filter(&self, events: &IoEvents) -> bool {
|
||||
self.intersects(*events)
|
||||
|
@ -123,13 +123,13 @@ pub trait File: Debug + Sync + Send + Any {
|
||||
return None;
|
||||
}
|
||||
|
||||
/// Receive events from the host.
|
||||
/// Update the ready events of a host file.
|
||||
///
|
||||
/// After calling this method, the `poll` method of the `File` trait will
|
||||
/// return the latest events on the `HostFile`.
|
||||
/// return the latest event state on the `HostFile`.
|
||||
///
|
||||
/// This method has no effect if the `host_fd` method returns `None`.
|
||||
fn recv_host_events(&self, events: &IoEvents, trigger_notifier: bool) {}
|
||||
fn update_host_events(&self, ready: &IoEvents, mask: &IoEvents, trigger_notifier: bool) {}
|
||||
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
}
|
||||
|
@ -12,7 +12,7 @@ use untrusted::{SliceAsMutPtrAndLen, SliceAsPtrAndLen};
|
||||
|
||||
pub use self::dev_fs::AsDevRandom;
|
||||
pub use self::event_file::{AsEvent, EventCreationFlags, EventFile};
|
||||
pub use self::events::{IoEvents, IoNotifier};
|
||||
pub use self::events::{AtomicIoEvents, IoEvents, IoNotifier};
|
||||
pub use self::file::{File, FileRef};
|
||||
pub use self::file_ops::{
|
||||
occlum_ocall_ioctl, AccessMode, BuiltinIoctlNum, CreationFlags, FileMode, Flock, FlockType,
|
||||
|
@ -6,11 +6,13 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Weak;
|
||||
use std::time::Duration;
|
||||
|
||||
use atomic::Atomic;
|
||||
|
||||
use super::epoll_waiter::EpollWaiter;
|
||||
use super::host_file_epoller::HostFileEpoller;
|
||||
use super::{EpollCtl, EpollEvent, EpollFlags};
|
||||
use crate::events::{Observer, Waiter, WaiterQueue};
|
||||
use crate::fs::{File, HostFd, IoEvents, IoNotifier};
|
||||
use crate::fs::{AtomicIoEvents, File, HostFd, IoEvents, IoNotifier};
|
||||
use crate::prelude::*;
|
||||
|
||||
// TODO: Prevent two epoll files from monitoring each other, which may cause
|
||||
@ -54,6 +56,8 @@ pub struct EpollFile {
|
||||
host_file_epoller: HostFileEpoller,
|
||||
// Any EpollFile is wrapped with Arc when created.
|
||||
weak_self: Weak<Self>,
|
||||
// Host events
|
||||
host_events: Atomic<IoEvents>,
|
||||
}
|
||||
|
||||
impl EpollFile {
|
||||
@ -64,6 +68,7 @@ impl EpollFile {
|
||||
let notifier = IoNotifier::new();
|
||||
let host_file_epoller = HostFileEpoller::new();
|
||||
let weak_self = Default::default();
|
||||
let host_events = Atomic::new(IoEvents::empty());
|
||||
|
||||
Self {
|
||||
interest,
|
||||
@ -72,6 +77,7 @@ impl EpollFile {
|
||||
notifier,
|
||||
host_file_epoller,
|
||||
weak_self,
|
||||
host_events,
|
||||
}
|
||||
.wrap_self()
|
||||
}
|
||||
@ -119,7 +125,12 @@ impl EpollFile {
|
||||
let waiter = EpollWaiter::new(&self.host_file_epoller);
|
||||
|
||||
loop {
|
||||
// Poll the latest states of the interested host files
|
||||
// Poll the latest states of the interested host files. If a host
|
||||
// file is ready, then it will be pushed into the ready list. Note
|
||||
// that this is the only way through which a host file can appear in
|
||||
// the ready list. This ensures that only the host files whose
|
||||
// events are update-to-date will be returned, reducing the chances
|
||||
// of false positive results to the minimum.
|
||||
self.host_file_epoller.poll_events(max_count);
|
||||
|
||||
// Prepare for the waiter.wait_mut() at the end of the loop
|
||||
@ -167,7 +178,11 @@ impl EpollFile {
|
||||
.intersects(EpollFlags::EDGE_TRIGGER | EpollFlags::ONE_SHOT)
|
||||
{
|
||||
drop(inner);
|
||||
reinsert.push_back(ep_entry);
|
||||
|
||||
// Host files should not be reinserted into the ready list
|
||||
if ep_entry.file.host_fd().is_none() {
|
||||
reinsert.push_back(ep_entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -230,6 +245,7 @@ impl EpollFile {
|
||||
if ep_entry.file.host_fd().is_some() {
|
||||
self.host_file_epoller
|
||||
.add_file(ep_entry.file.clone(), event, flags);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
@ -281,6 +297,7 @@ impl EpollFile {
|
||||
if ep_entry.file.host_fd().is_some() {
|
||||
self.host_file_epoller
|
||||
.mod_file(&ep_entry.file, event, flags);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
ep_entry
|
||||
@ -365,7 +382,11 @@ impl EpollFile {
|
||||
|
||||
impl File for EpollFile {
|
||||
fn poll_new(&self) -> IoEvents {
|
||||
if !self.host_file_epoller.poll().is_empty() {
|
||||
if self
|
||||
.host_events
|
||||
.load(Ordering::Acquire)
|
||||
.contains(IoEvents::IN)
|
||||
{
|
||||
return IoEvents::IN;
|
||||
}
|
||||
|
||||
@ -385,11 +406,11 @@ impl File for EpollFile {
|
||||
Some(self.host_file_epoller.host_fd())
|
||||
}
|
||||
|
||||
fn recv_host_events(&self, events: &IoEvents, trigger_notifier: bool) {
|
||||
self.host_file_epoller.recv_host_events(events);
|
||||
fn update_host_events(&self, ready: &IoEvents, mask: &IoEvents, trigger_notifier: bool) {
|
||||
self.host_events.update(ready, mask, Ordering::Release);
|
||||
|
||||
if trigger_notifier {
|
||||
self.notifier.broadcast(events);
|
||||
self.notifier.broadcast(ready);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -10,14 +10,12 @@ use crate::prelude::*;
|
||||
#[derive(Debug)]
|
||||
pub struct HostFileEpoller {
|
||||
/// A map from host fd to HostFile, which maintains the set of the interesting
|
||||
/// host files.
|
||||
host_files: SgxMutex<HashMap<FileDesc, FileRef>>,
|
||||
/// host files and their interesting events.
|
||||
host_files_and_events: SgxMutex<HashMap<FileDesc, (FileRef, IoEvents)>>,
|
||||
/// The number of the interesting host files.
|
||||
count: AtomicUsize,
|
||||
/// The host fd of the underlying host epoll file.
|
||||
host_epoll_fd: HostFd,
|
||||
/// Whether the host epoll file has any events.
|
||||
is_ready: AtomicBool,
|
||||
}
|
||||
|
||||
// TODO: the `add/mod/del_file` operation can be postponed until a `poll_files` operation,
|
||||
@ -25,7 +23,7 @@ pub struct HostFileEpoller {
|
||||
|
||||
impl HostFileEpoller {
|
||||
pub fn new() -> Self {
|
||||
let host_files = Default::default();
|
||||
let host_files_and_events = Default::default();
|
||||
let count = Default::default();
|
||||
let host_epoll_fd = {
|
||||
let raw_host_fd = (|| -> Result<u32> {
|
||||
@ -36,19 +34,19 @@ impl HostFileEpoller {
|
||||
|
||||
HostFd::new(raw_host_fd)
|
||||
};
|
||||
let is_ready = Default::default();
|
||||
Self {
|
||||
host_files,
|
||||
host_files_and_events,
|
||||
count,
|
||||
host_epoll_fd,
|
||||
is_ready,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_file(&self, host_file: FileRef, event: EpollEvent, flags: EpollFlags) -> Result<()> {
|
||||
let mut host_files = self.host_files.lock().unwrap();
|
||||
let mut host_files_and_events = self.host_files_and_events.lock().unwrap();
|
||||
let host_fd = host_file.host_fd().unwrap().to_raw();
|
||||
let already_added = host_files.insert(host_fd, host_file.clone()).is_some();
|
||||
let already_added = host_files_and_events
|
||||
.insert(host_fd, (host_file.clone(), event.mask))
|
||||
.is_some();
|
||||
if already_added {
|
||||
// TODO: handle the case where one host file is somehow to be added more than once.
|
||||
warn!(
|
||||
@ -62,7 +60,7 @@ impl HostFileEpoller {
|
||||
self.do_epoll_ctl(libc::EPOLL_CTL_ADD, &host_file, Some((event, flags)))
|
||||
|
||||
// Concurrency note:
|
||||
// The lock on self.host_files must be hold while invoking
|
||||
// The lock on self.host_files_and_events must be hold while invoking
|
||||
// do_epoll_ctl to prevent race conditions that cause the OCall to fail.
|
||||
// This same argument applies to mod_file and del_file methods.
|
||||
}
|
||||
@ -70,23 +68,28 @@ impl HostFileEpoller {
|
||||
pub fn mod_file(
|
||||
&self,
|
||||
host_file: &FileRef,
|
||||
event: EpollEvent,
|
||||
flags: EpollFlags,
|
||||
new_event: EpollEvent,
|
||||
new_flags: EpollFlags,
|
||||
) -> Result<()> {
|
||||
let host_files = self.host_files.lock().unwrap();
|
||||
let mut host_files_and_events = self.host_files_and_events.lock().unwrap();
|
||||
let host_fd = host_file.host_fd().unwrap().to_raw();
|
||||
let not_added = !host_files.contains_key(&host_fd);
|
||||
if not_added {
|
||||
return_errno!(ENOENT, "the host file must be added before modifying");
|
||||
}
|
||||
let event = match host_files_and_events.get_mut(&host_fd) {
|
||||
None => return_errno!(ENOENT, "the host file must be added before modifying"),
|
||||
Some((_, event)) => event,
|
||||
};
|
||||
*event = new_event.mask;
|
||||
|
||||
self.do_epoll_ctl(libc::EPOLL_CTL_MOD, &host_file, Some((event, flags)))
|
||||
self.do_epoll_ctl(
|
||||
libc::EPOLL_CTL_MOD,
|
||||
&host_file,
|
||||
Some((new_event, new_flags)),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn del_file(&self, host_file: &FileRef) -> Result<()> {
|
||||
let mut host_files = self.host_files.lock().unwrap();
|
||||
let mut host_files_and_events = self.host_files_and_events.lock().unwrap();
|
||||
let host_fd = host_file.host_fd().unwrap().to_raw();
|
||||
let not_added = !host_files.remove(&host_fd).is_some();
|
||||
let not_added = !host_files_and_events.remove(&host_fd).is_some();
|
||||
if not_added {
|
||||
return_errno!(ENOENT, "the host file must be added before deleting");
|
||||
}
|
||||
@ -153,13 +156,13 @@ impl HostFileEpoller {
|
||||
|
||||
// Use the polled events from the host to update the states of the
|
||||
// corresponding host files
|
||||
let mut host_files = self.host_files.lock().unwrap();
|
||||
let mut host_files_and_events = self.host_files_and_events.lock().unwrap();
|
||||
for raw_event in &raw_events[..count] {
|
||||
let raw_event = unsafe { raw_event.assume_init() };
|
||||
let io_events = IoEvents::from_raw(raw_event.events as u32);
|
||||
let host_fd = raw_event.u64 as u32;
|
||||
|
||||
let host_file = match host_files.get(&host_fd) {
|
||||
let (host_file, mask) = match host_files_and_events.get(&host_fd) {
|
||||
None => {
|
||||
count -= 1;
|
||||
// The corresponding host file may be deleted
|
||||
@ -168,7 +171,7 @@ impl HostFileEpoller {
|
||||
Some(host_file) => host_file,
|
||||
};
|
||||
|
||||
host_file.recv_host_events(&io_events, true);
|
||||
host_file.update_host_events(&io_events, mask, true);
|
||||
}
|
||||
count
|
||||
}
|
||||
@ -176,19 +179,6 @@ impl HostFileEpoller {
|
||||
pub fn host_fd(&self) -> &HostFd {
|
||||
&self.host_epoll_fd
|
||||
}
|
||||
|
||||
pub fn poll(&self) -> IoEvents {
|
||||
if self.is_ready.load(Ordering::Acquire) {
|
||||
IoEvents::IN
|
||||
} else {
|
||||
IoEvents::empty()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn recv_host_events(&self, events: &IoEvents) {
|
||||
let is_ready = events.contains(IoEvents::IN);
|
||||
self.is_ready.store(is_ready, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for HostFileEpoller {
|
||||
|
@ -4,7 +4,7 @@ use std::sync::Weak;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::events::{Observer, Waiter, WaiterQueueObserver};
|
||||
use crate::fs::IoEvents;
|
||||
use crate::fs::{AtomicIoEvents, IoEvents};
|
||||
use crate::prelude::*;
|
||||
use crate::time::{timespec_t, TIMERSLACK};
|
||||
|
||||
@ -41,10 +41,10 @@ impl EventMonitor {
|
||||
}
|
||||
|
||||
/// Returns an iterator for the host files in the set of the interesting files.
|
||||
pub fn host_files(&self) -> impl Iterator<Item = &FileRef> {
|
||||
pub fn host_files_and_events(&self) -> impl Iterator<Item = &(FileRef, IoEvents)> {
|
||||
self.host_file_idxes
|
||||
.iter()
|
||||
.map(move |idx| &self.files_and_events[*idx].0)
|
||||
.map(move |idx| &self.files_and_events[*idx])
|
||||
}
|
||||
|
||||
/// Reset the monitor so that it can wait for new events.
|
||||
@ -80,7 +80,7 @@ impl EventMonitor {
|
||||
// 4. the time is up.
|
||||
let num_events = self.do_poll_ocall(&mut timeout)?;
|
||||
|
||||
self.recv_host_file_events(num_events);
|
||||
self.update_host_file_events(num_events);
|
||||
|
||||
// Poll syscall does not treat timeout as error. So we need
|
||||
// to distinguish the case by ourselves.
|
||||
@ -104,7 +104,7 @@ impl EventMonitor {
|
||||
Err(_) => return,
|
||||
};
|
||||
|
||||
self.recv_host_file_events(num_events);
|
||||
self.update_host_file_events(num_events);
|
||||
}
|
||||
|
||||
fn do_poll_ocall(&mut self, timeout: &mut Option<&mut Duration>) -> Result<usize> {
|
||||
@ -148,22 +148,19 @@ impl EventMonitor {
|
||||
Ok(num_events)
|
||||
}
|
||||
|
||||
fn recv_host_file_events(&self, num_events: usize) {
|
||||
fn update_host_file_events(&self, num_events: usize) {
|
||||
if num_events == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
// According to the output pollfds, update the states of the corresponding host files
|
||||
let output_pollfds = self.ocall_pollfds[..self.ocall_pollfds.len() - 1].iter();
|
||||
for (pollfd, host_file) in output_pollfds.zip(self.host_files()) {
|
||||
for (pollfd, (host_file, mask)) in output_pollfds.zip(self.host_files_and_events()) {
|
||||
let revents = {
|
||||
if pollfd.revents == 0 {
|
||||
continue;
|
||||
}
|
||||
assert!((pollfd.revents & libc::POLLNVAL) == 0);
|
||||
IoEvents::from_raw(pollfd.revents as u32)
|
||||
};
|
||||
host_file.recv_host_events(&revents, false);
|
||||
host_file.update_host_events(&revents, mask, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5,8 +5,8 @@ use atomic::{Atomic, Ordering};
|
||||
|
||||
use super::*;
|
||||
use crate::fs::{
|
||||
occlum_ocall_ioctl, AccessMode, CreationFlags, File, FileRef, HostFd, IoEvents, IoctlCmd,
|
||||
StatusFlags,
|
||||
occlum_ocall_ioctl, AccessMode, AtomicIoEvents, CreationFlags, File, FileRef, HostFd, IoEvents,
|
||||
IoctlCmd, StatusFlags,
|
||||
};
|
||||
|
||||
//TODO: refactor write syscall to allow zero length with non-zero buffer
|
||||
@ -89,11 +89,11 @@ impl File for HostSocket {
|
||||
Some(&self.notifier)
|
||||
}
|
||||
|
||||
fn recv_host_events(&self, events: &IoEvents, trigger_notifier: bool) {
|
||||
self.host_events.store(*events, Ordering::Release);
|
||||
fn update_host_events(&self, ready: &IoEvents, mask: &IoEvents, trigger_notifier: bool) {
|
||||
self.host_events.update(ready, mask, Ordering::Release);
|
||||
|
||||
if trigger_notifier {
|
||||
self.notifier.broadcast(events);
|
||||
self.notifier.broadcast(ready);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user