diff --git a/src/libos/include/syscall.h b/src/libos/include/syscall.h index 59d74050..a7cd296d 100644 --- a/src/libos/include/syscall.h +++ b/src/libos/include/syscall.h @@ -17,6 +17,7 @@ extern ssize_t occlum_write(int fd, const void* buf, size_t size); extern ssize_t occlum_readv(int fd, struct iovec* iov, int count); extern ssize_t occlum_writev(int fd, const struct iovec* iov, int count); extern off_t occlum_lseek(int fd, off_t offset, int whence); +extern int occlum_pipe(int fds[2]); extern int occlum_spawn(int* child_pid, const char* path, const char** argv, diff --git a/src/libos/src/fs/file.rs b/src/libos/src/fs/file.rs index f066701b..4dd4d07c 100644 --- a/src/libos/src/fs/file.rs +++ b/src/libos/src/fs/file.rs @@ -269,7 +269,7 @@ impl File for StdoutFile { } fn read(&self, buf: &mut [u8]) -> Result { - Err(Error::new(Errno::EBADF, "Stdout does not support reading")) + Err(Error::new(Errno::EBADF, "Stdout does not support read")) } fn writev<'a, 'b>(&self, bufs: &'a [&'b [u8]]) -> Result { @@ -364,7 +364,7 @@ impl File for StdinFile { } fn writev<'a, 'b>(&self, bufs: &'a [&'b [u8]]) -> Result { - Err(Error::new(Errno::EBADF, "Stdin does not support reading")) + Err(Error::new(Errno::EBADF, "Stdin does not support write")) } } diff --git a/src/libos/src/fs/mod.rs b/src/libos/src/fs/mod.rs index b2cc13b3..5f3cb123 100644 --- a/src/libos/src/fs/mod.rs +++ b/src/libos/src/fs/mod.rs @@ -5,9 +5,11 @@ use std::sgxfs as fs_impl; mod file; mod file_table; +mod pipe; pub use self::file::{File, FileRef, SgxFile, StdinFile, StdoutFile}; pub use self::file_table::{FileDesc, FileTable}; +pub use self::pipe::{Pipe}; pub const O_RDONLY : u32 = 0x00000000; pub const O_WRONLY : u32 = 0x00000001; @@ -18,7 +20,7 @@ pub const O_APPEND : u32 = 0x00000400; // TODO: use the type defined in Rust libc. 
// -// However, off_t is defined as i64 in the current Rust SGX SDK, which is +// However, off_t is defined as u64 in the current Rust SGX SDK, which is // wrong (see issue https://github.com/baidu/rust-sgx-sdk/issues/46) #[allow(non_camel_case_types)] pub type off_t = i64; @@ -108,3 +110,14 @@ pub fn do_close(fd: FileDesc) -> Result<(), Error> { None => Err(Error::new(Errno::EBADF, "Invalid file descriptor [do_close]")), } } + +pub fn do_pipe() -> Result<[FileDesc; 2], Error> { + let current_ref = process::get_current(); + let mut current = current_ref.lock().unwrap(); + let pipe = Pipe::new()?; + + let mut file_table = current.get_files_mut(); + let reader_fd = file_table.put(Arc::new(Box::new(pipe.reader))); + let writer_fd = file_table.put(Arc::new(Box::new(pipe.writer))); + Ok([reader_fd, writer_fd]) +} diff --git a/src/libos/src/fs/pipe.rs b/src/libos/src/fs/pipe.rs new file mode 100644 index 00000000..57cd4099 --- /dev/null +++ b/src/libos/src/fs/pipe.rs @@ -0,0 +1,126 @@ +use super::*; +use util::{new_ring_buf, RingBufReader, RingBufWriter}; + +// TODO: Use Waiter and WaitQueue infrastructure to sleep when blocking + +pub const PIPE_BUF_SIZE : usize = 2 * 1024 * 1024; + +#[derive(Debug)] +pub struct Pipe { + pub reader: PipeReader, + pub writer: PipeWriter, +} + +impl Pipe { + pub fn new() -> Result { + let (reader, writer) = new_ring_buf(PIPE_BUF_SIZE); + Ok(Pipe { + reader: PipeReader { + inner: SgxMutex::new(reader), + }, + writer: PipeWriter { + inner: SgxMutex::new(writer), + } + }) + } +} + + +#[derive(Debug)] +pub struct PipeReader { + inner: SgxMutex, +} + +impl File for PipeReader { + fn read(&self, buf: &mut [u8]) -> Result { + let ringbuf = self.inner.lock().unwrap(); + ringbuf.read(buf) + } + + fn readv<'a, 'b>(&self, bufs: &'a mut [&'b mut [u8]]) -> Result { + let mut ringbuf = self.inner.lock().unwrap(); + let mut total_bytes = 0; + for buf in bufs { + match ringbuf.read(buf) { + Ok(this_len) => { + total_bytes += this_len; + if this_len < 
buf.len() { break; } + } + Err(e) => { + match total_bytes { + // nothing was transferred: a complete failure + 0 => return Err(e), + // some bytes were transferred: a partial failure + _ => break, + } + } + } + } + Ok(total_bytes) + } + + fn write(&self, buf: &[u8]) -> Result { + Err(Error::new(Errno::EBADF, "PipeReader does not support write")) + } + + fn writev<'a, 'b>(&self, bufs: &'a [&'b [u8]]) -> Result { + Err(Error::new(Errno::EBADF, "PipeReader does not support write")) + } + + fn seek(&self, pos: SeekFrom) -> Result { + Err(Error::new(Errno::ESPIPE, "Pipe does not support seek")) + } +} + +unsafe impl Send for PipeReader {} +unsafe impl Sync for PipeReader {} + + +#[derive(Debug)] +pub struct PipeWriter { + inner: SgxMutex, +} + +impl File for PipeWriter { + fn write(&self, buf: &[u8]) -> Result { + let ringbuf = self.inner.lock().unwrap(); + ringbuf.write(buf) + } + + fn writev<'a, 'b>(&self, bufs: &'a [&'b [u8]]) -> Result { + let ringbuf = self.inner.lock().unwrap(); + let mut total_bytes = 0; + for buf in bufs { + match ringbuf.write(buf) { + Ok(this_len) => { + total_bytes += this_len; + if this_len < buf.len() { break; } + } + Err(e) => { + match total_bytes { + // nothing was transferred: a complete failure + 0 => return Err(e), + // some bytes were transferred: a partial failure + _ => break, + } + } + } + } + Ok(total_bytes) + } + + fn read(&self, buf: &mut [u8]) -> Result { + Err(Error::new(Errno::EBADF, "PipeWriter does not support read")) + } + + fn readv<'a, 'b>(&self, bufs: &'a mut [&'b mut [u8]]) -> Result { + Err(Error::new(Errno::EBADF, "PipeWriter does not support read")) + } + + fn seek(&self, seek_pos: SeekFrom) -> Result { + Err(Error::new(Errno::ESPIPE, "Pipe does not support seek")) + } +} + +unsafe impl Send for PipeWriter {} +unsafe impl Sync for PipeWriter {} diff --git a/src/libos/src/process/spawn/mod.rs b/src/libos/src/process/spawn/mod.rs index cd52ab79..650fb52f 100644 --- a/src/libos/src/process/spawn/mod.rs +++ b/src/libos/src/process/spawn/mod.rs @@ -55,7 +55,7 @@ pub fn do_spawn>(elf_path: &P, argv: &[CString], envp: 
&[CString] let stack_top = vm.get_stack_top(); init_task(program_entry, stack_top, argv, envp)? }; - let files = init_files()?; + let files = init_files(parent_ref)?; let exec_path = elf_path.as_ref().to_str().unwrap(); Process::new(exec_path, task, vm, files)? }; @@ -65,9 +65,17 @@ pub fn do_spawn>(elf_path: &P, argv: &[CString], envp: &[CString] Ok(new_pid) } -fn init_files() -> Result { - let mut file_table = FileTable::new(); +fn init_files(parent_ref: &ProcessRef) -> Result { + // Usually (parent pid > 0), the child gets a clone of the parent's file table + let parent = parent_ref.lock().unwrap(); + let should_inherit_file_table = parent.get_pid() > 0; + if should_inherit_file_table { + return Ok(parent.get_files().clone()); + } + drop(parent); + // Otherwise (the init process), build a fresh file table holding stdin, stdout and stderr + let mut file_table = FileTable::new(); let stdin : Arc> = Arc::new(Box::new(StdinFile::new())); let stdout : Arc> = Arc::new(Box::new(StdoutFile::new())); // TODO: implement and use a real stderr @@ -75,7 +83,6 @@ fn init_files() -> Result { file_table.put(stdin); file_table.put(stdout); file_table.put(stderr); - Ok(file_table) } diff --git a/src/libos/src/syscall/mod.rs b/src/libos/src/syscall/mod.rs index 4aa3c23e..9c7abf0e 100644 --- a/src/libos/src/syscall/mod.rs +++ b/src/libos/src/syscall/mod.rs @@ -237,6 +237,28 @@ pub extern "C" fn occlum_brk(addr: *const c_void) -> *const c_void { } } +fn do_pipe(fds_u: *mut c_int) -> Result<(), Error> { + check_mut_array_from_user(fds_u, 2)?; + let fds = fs::do_pipe()?; + unsafe { + *fds_u.offset(0) = fds[0] as c_int; + *fds_u.offset(1) = fds[1] as c_int; + } + Ok(()) +} + +#[no_mangle] +pub extern "C" fn occlum_pipe(fds: *mut c_int) -> c_int { + match do_pipe(fds) { + Ok(()) => { + 0 + }, + Err(e) => { + e.errno.as_retval() + } + } +} + #[no_mangle] pub extern "C" fn occlum_open(path_buf: * const c_char, flags: c_int, mode: c_int) -> c_int { let path = unsafe { diff --git a/src/libos/src/syscall/syscall_entry.c 
b/src/libos/src/syscall/syscall_entry.c index b172e2e4..6dbe83c1 100644 --- a/src/libos/src/syscall/syscall_entry.c +++ b/src/libos/src/syscall/syscall_entry.c @@ -114,6 +114,11 @@ long dispatch_syscall(int num, long arg0, long arg1, long arg2, long arg3, long ret = (long) occlum_brk(addr); break; } + case SYS_pipe: { + DECL_SYSCALL_ARG(int*, fds, arg0); + ret = (long) occlum_pipe(fds); + break; + } default: ret = occlum_unknown(num); break; diff --git a/src/libos/src/util/mod.rs b/src/libos/src/util/mod.rs index 35942ab0..018640b4 100644 --- a/src/libos/src/util/mod.rs +++ b/src/libos/src/util/mod.rs @@ -1,5 +1,8 @@ -use prelude::*; +use super::*; pub use self::mpx_util::{*}; +pub use self::ring_buf::{RingBufReader, RingBufWriter}; +pub use self::ring_buf::with_fixed_capacity as new_ring_buf; mod mpx_util; +mod ring_buf; diff --git a/src/libos/src/util/ring_buf.rs b/src/libos/src/util/ring_buf.rs new file mode 100644 index 00000000..65be50d2 --- /dev/null +++ b/src/libos/src/util/ring_buf.rs @@ -0,0 +1,182 @@ +use super::{*}; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; +use std::cmp::{min, max}; +use std::{ptr}; + +pub fn with_fixed_capacity(capacity: usize) -> (RingBufReader, RingBufWriter) { + let inner = Arc::new(RingBufInner::new(capacity)); + let reader = RingBufReader { inner: inner.clone() }; + let writer = RingBufWriter { inner: inner }; + (reader, writer) +} + +#[derive(Debug)] +pub struct RingBufReader { + inner: Arc, +} + +#[derive(Debug)] +pub struct RingBufWriter { + inner: Arc, +} + +#[derive(Debug)] +struct RingBufInner { + buf: *mut u8, + capacity: usize, + head: AtomicUsize, // write to head + tail: AtomicUsize, // read from tail + closed: AtomicBool, // if reader has been dropped +} + +impl RingBufInner { + fn new(capacity: usize) -> RingBufInner { + let capacity = max(capacity, 16).next_power_of_two(); + RingBufInner { + buf: unsafe { + let mut buf_ptr = ptr::null_mut(); + libc::posix_memalign(&mut buf_ptr, 
16, capacity); + assert!(buf_ptr != ptr::null_mut()); + buf_ptr as *mut u8 + }, + capacity: capacity, + head: AtomicUsize::new(0), + tail: AtomicUsize::new(0), + closed: AtomicBool::new(false), + } + } + + fn get_mask(&self) -> usize { + self.capacity - 1 // Note that capacity is a power of two + } + + fn get_head(&self) -> usize { + self.head.load(Ordering::SeqCst) + } + + fn get_tail(&self) -> usize { + self.tail.load(Ordering::SeqCst) + } + + fn set_head(&self, new_head: usize) { + self.head.store(new_head, Ordering::SeqCst) + } + + fn set_tail(&self, new_tail: usize) { + self.tail.store(new_tail, Ordering::SeqCst) // publish the read position; head is only ever moved by set_head + } + + fn is_closed(&self) -> bool { + self.closed.load(Ordering::SeqCst) + } + + fn close(&self) { + self.closed.store(true, Ordering::SeqCst); + } + + unsafe fn read_at(&self, pos: usize, dst_buf: &mut [u8]) { + let dst_ptr = dst_buf.as_mut_ptr(); + let dst_len = dst_buf.len(); + let src_ptr = self.buf.offset(pos as isize); + unsafe { + src_ptr.copy_to_nonoverlapping(dst_ptr, dst_len); + } + } + + unsafe fn write_at(&self, pos: usize, src_buf: &[u8]) { + let src_ptr = src_buf.as_ptr(); + let src_len = src_buf.len(); + let dst_ptr = self.buf.offset(pos as isize); + unsafe { + dst_ptr.copy_from_nonoverlapping(src_ptr, src_len); + } + } +} + +impl Drop for RingBufInner { + fn drop(&mut self) { + unsafe { libc::free(self.buf as *mut c_void) } + } +} + + +impl RingBufReader { + pub fn read(&self, buf: &mut [u8]) -> Result { + let mut tail = self.inner.get_tail(); + let mut buf_remain = buf.len(); + let mut buf_pos = 0; + while buf_remain > 0 { + let head = self.inner.get_head(); + + let read_nbytes = { + let may_read_nbytes = if tail <= head { + head - tail + } else { + self.inner.capacity - tail + }; + if may_read_nbytes == 0 { break; } + + min(may_read_nbytes, buf_remain) + }; + + let dst_buf = &mut buf[buf_pos..(buf_pos + read_nbytes)]; + unsafe { + self.inner.read_at(tail, dst_buf); + } + + tail = (tail + read_nbytes) & self.inner.get_mask(); + 
self.inner.set_tail(tail); + + buf_pos += read_nbytes; + buf_remain -= read_nbytes; + } + Ok(buf_pos) + } +} + +impl Drop for RingBufReader { + fn drop(&mut self) { + // So the writer knows when a reader is finished + self.inner.close(); + } +} + + +impl RingBufWriter { + pub fn write(&self, buf: &[u8]) -> Result { + if self.inner.is_closed() { + return errno!(EPIPE, "Reader has been closed"); + } + + let mut head = self.inner.get_head(); + let mut buf_remain = buf.len(); + let mut buf_pos = 0; + while buf_remain > 0 { + let tail = self.inner.get_tail(); + + let write_nbytes = { + let may_write_nbytes = if tail <= head { + self.inner.capacity - head - (tail == 0) as usize // keep one slot free when tail is 0: otherwise head wraps onto tail and a full buffer looks empty + } else { + tail - head - 1 + }; + if may_write_nbytes == 0 { break; } + + min(may_write_nbytes, buf_remain) + }; + + let src_buf = &buf[buf_pos..(buf_pos + write_nbytes)]; + unsafe { + self.inner.write_at(head, src_buf); + } + + head = (head + write_nbytes) & self.inner.get_mask(); + self.inner.set_head(head); + + buf_pos += write_nbytes; + buf_remain -= write_nbytes; + } + Ok(buf_pos) + } +}