Add pipe syscall

This commit is contained in:
Tate, Hongliang Tian 2019-01-06 02:02:23 +08:00
parent 37f724c656
commit c00fddd2bf
9 changed files with 367 additions and 8 deletions

@ -17,6 +17,7 @@ extern ssize_t occlum_write(int fd, const void* buf, size_t size);
extern ssize_t occlum_readv(int fd, struct iovec* iov, int count); extern ssize_t occlum_readv(int fd, struct iovec* iov, int count);
extern ssize_t occlum_writev(int fd, const struct iovec* iov, int count); extern ssize_t occlum_writev(int fd, const struct iovec* iov, int count);
extern off_t occlum_lseek(int fd, off_t offset, int whence); extern off_t occlum_lseek(int fd, off_t offset, int whence);
extern int occlum_pipe(int fds[2]);
extern int occlum_spawn(int* child_pid, const char* path, extern int occlum_spawn(int* child_pid, const char* path,
const char** argv, const char** argv,

@ -269,7 +269,7 @@ impl File for StdoutFile {
} }
fn read(&self, buf: &mut [u8]) -> Result<usize, Error> { fn read(&self, buf: &mut [u8]) -> Result<usize, Error> {
Err(Error::new(Errno::EBADF, "Stdout does not support reading")) Err(Error::new(Errno::EBADF, "Stdout does not support read"))
} }
fn writev<'a, 'b>(&self, bufs: &'a [&'b [u8]]) -> Result<usize, Error> { fn writev<'a, 'b>(&self, bufs: &'a [&'b [u8]]) -> Result<usize, Error> {
@ -364,7 +364,7 @@ impl File for StdinFile {
} }
fn writev<'a, 'b>(&self, bufs: &'a [&'b [u8]]) -> Result<usize, Error> { fn writev<'a, 'b>(&self, bufs: &'a [&'b [u8]]) -> Result<usize, Error> {
Err(Error::new(Errno::EBADF, "Stdin does not support reading")) Err(Error::new(Errno::EBADF, "Stdin does not support write"))
} }
} }

@ -5,9 +5,11 @@ use std::sgxfs as fs_impl;
mod file; mod file;
mod file_table; mod file_table;
mod pipe;
pub use self::file::{File, FileRef, SgxFile, StdinFile, StdoutFile}; pub use self::file::{File, FileRef, SgxFile, StdinFile, StdoutFile};
pub use self::file_table::{FileDesc, FileTable}; pub use self::file_table::{FileDesc, FileTable};
pub use self::pipe::{Pipe};
pub const O_RDONLY : u32 = 0x00000000; pub const O_RDONLY : u32 = 0x00000000;
pub const O_WRONLY : u32 = 0x00000001; pub const O_WRONLY : u32 = 0x00000001;
@ -18,7 +20,7 @@ pub const O_APPEND : u32 = 0x00000400;
// TODO: use the type defined in Rust libc. // TODO: use the type defined in Rust libc.
// //
// However, off_t is defined as i64 in the current Rust SGX SDK, which is // However, off_t is defined as u64 in the current Rust SGX SDK, which is
// wrong (see issue https://github.com/baidu/rust-sgx-sdk/issues/46) // wrong (see issue https://github.com/baidu/rust-sgx-sdk/issues/46)
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
pub type off_t = i64; pub type off_t = i64;
@ -108,3 +110,14 @@ pub fn do_close(fd: FileDesc) -> Result<(), Error> {
None => Err(Error::new(Errno::EBADF, "Invalid file descriptor [do_close]")), None => Err(Error::new(Errno::EBADF, "Invalid file descriptor [do_close]")),
} }
} }
pub fn do_pipe() -> Result<[FileDesc; 2], Error> {
let current_ref = process::get_current();
let mut current = current_ref.lock().unwrap();
let pipe = Pipe::new()?;
let mut file_table = current.get_files_mut();
let reader_fd = file_table.put(Arc::new(Box::new(pipe.reader)));
let writer_fd = file_table.put(Arc::new(Box::new(pipe.writer)));
Ok([reader_fd, writer_fd])
}

126
src/libos/src/fs/pipe.rs Normal file

@ -0,0 +1,126 @@
use super::*;
use util::{new_ring_buf, RingBufReader, RingBufWriter};
// TODO: Use Waiter and WaitQueue infrastructure to sleep when blocking
pub const PIPE_BUF_SIZE : usize = 2 * 1024 * 1024;
#[derive(Debug)]
pub struct Pipe {
pub reader: PipeReader,
pub writer: PipeWriter,
}
impl Pipe {
pub fn new() -> Result<Pipe, Error> {
let (reader, writer) = new_ring_buf(PIPE_BUF_SIZE);
Ok(Pipe {
reader: PipeReader {
inner: SgxMutex::new(reader),
},
writer: PipeWriter {
inner: SgxMutex::new(writer),
}
})
}
}
#[derive(Debug)]
pub struct PipeReader {
inner: SgxMutex<RingBufReader>,
}
impl File for PipeReader {
fn read(&self, buf: &mut [u8]) -> Result<usize, Error> {
let ringbuf = self.inner.lock().unwrap();
ringbuf.read(buf)
}
fn readv<'a, 'b>(&self, bufs: &'a mut [&'b mut [u8]]) -> Result<usize, Error> {
let mut ringbuf = self.inner.lock().unwrap();
let mut total_bytes = 0;
for buf in bufs {
match ringbuf.read(buf) {
Ok(this_len) => {
total_bytes += this_len;
if this_len < buf.len() { break; }
}
Err(e) => {
match total_bytes {
// a complete failure
0 => return Err(e),
// a partially failure
_ => break,
}
}
}
}
Ok(total_bytes)
}
fn write(&self, buf: &[u8]) -> Result<usize, Error> {
Err(Error::new(Errno::EBADF, "PipeReader does not support write"))
}
fn writev<'a, 'b>(&self, bufs: &'a [&'b [u8]]) -> Result<usize, Error> {
Err(Error::new(Errno::EBADF, "PipeReader does not support write"))
}
fn seek(&self, pos: SeekFrom) -> Result<off_t, Error> {
Err(Error::new(Errno::ESPIPE, "Pipe does not support seek"))
}
}
unsafe impl Send for PipeReader {}
unsafe impl Sync for PipeReader {}
#[derive(Debug)]
pub struct PipeWriter {
inner: SgxMutex<RingBufWriter>,
}
impl File for PipeWriter {
fn write(&self, buf: &[u8]) -> Result<usize, Error> {
let ringbuf = self.inner.lock().unwrap();
ringbuf.write(buf)
}
fn writev<'a, 'b>(&self, bufs: &'a [&'b [u8]]) -> Result<usize, Error> {
let ringbuf = self.inner.lock().unwrap();
let mut total_bytes = 0;
for buf in bufs {
match ringbuf.write(buf) {
Ok(this_len) => {
total_bytes += this_len;
if this_len < buf.len() { break; }
}
Err(e) => {
match total_bytes {
// a complete failure
0 => return Err(e),
// a partially failure
_ => break,
}
}
}
}
Ok(total_bytes)
}
fn read(&self, buf: &mut [u8]) -> Result<usize, Error> {
Err(Error::new(Errno::EBADF, "PipeWriter does not support read"))
}
fn readv<'a, 'b>(&self, bufs: &'a mut [&'b mut [u8]]) -> Result<usize, Error> {
Err(Error::new(Errno::EBADF, "PipeWriter does not support read"))
}
fn seek(&self, seek_pos: SeekFrom) -> Result<off_t, Error> {
Err(Error::new(Errno::ESPIPE, "Pipe does not support seek"))
}
}
unsafe impl Send for PipeWriter {}
unsafe impl Sync for PipeWriter {}

@ -55,7 +55,7 @@ pub fn do_spawn<P: AsRef<Path>>(elf_path: &P, argv: &[CString], envp: &[CString]
let stack_top = vm.get_stack_top(); let stack_top = vm.get_stack_top();
init_task(program_entry, stack_top, argv, envp)? init_task(program_entry, stack_top, argv, envp)?
}; };
let files = init_files()?; let files = init_files(parent_ref)?;
let exec_path = elf_path.as_ref().to_str().unwrap(); let exec_path = elf_path.as_ref().to_str().unwrap();
Process::new(exec_path, task, vm, files)? Process::new(exec_path, task, vm, files)?
}; };
@ -65,9 +65,17 @@ pub fn do_spawn<P: AsRef<Path>>(elf_path: &P, argv: &[CString], envp: &[CString]
Ok(new_pid) Ok(new_pid)
} }
fn init_files() -> Result<FileTable, Error> { fn init_files(parent_ref: &ProcessRef) -> Result<FileTable, Error> {
let mut file_table = FileTable::new(); // Usually, we just inherit the file table from the parent
let parent = parent_ref.lock().unwrap();
let should_inherit_file_table = parent.get_pid() > 0;
if should_inherit_file_table {
return Ok(parent.get_files().clone());
}
drop(parent);
// But, for init process, we initialize file table for it
let mut file_table = FileTable::new();
let stdin : Arc<Box<File>> = Arc::new(Box::new(StdinFile::new())); let stdin : Arc<Box<File>> = Arc::new(Box::new(StdinFile::new()));
let stdout : Arc<Box<File>> = Arc::new(Box::new(StdoutFile::new())); let stdout : Arc<Box<File>> = Arc::new(Box::new(StdoutFile::new()));
// TODO: implement and use a real stderr // TODO: implement and use a real stderr
@ -75,7 +83,6 @@ fn init_files() -> Result<FileTable, Error> {
file_table.put(stdin); file_table.put(stdin);
file_table.put(stdout); file_table.put(stdout);
file_table.put(stderr); file_table.put(stderr);
Ok(file_table) Ok(file_table)
} }

@ -237,6 +237,28 @@ pub extern "C" fn occlum_brk(addr: *const c_void) -> *const c_void {
} }
} }
fn do_pipe(fds_u: *mut c_int) -> Result<(), Error> {
check_mut_array_from_user(fds_u, 2)?;
let fds = fs::do_pipe()?;
unsafe {
*fds_u.offset(0) = fds[0] as c_int;
*fds_u.offset(1) = fds[1] as c_int;
}
Ok(())
}
#[no_mangle]
pub extern "C" fn occlum_pipe(fds: *mut c_int) -> c_int {
match do_pipe(fds) {
Ok(()) => {
0
},
Err(e) => {
e.errno.as_retval()
}
}
}
#[no_mangle] #[no_mangle]
pub extern "C" fn occlum_open(path_buf: * const c_char, flags: c_int, mode: c_int) -> c_int { pub extern "C" fn occlum_open(path_buf: * const c_char, flags: c_int, mode: c_int) -> c_int {
let path = unsafe { let path = unsafe {

@ -114,6 +114,11 @@ long dispatch_syscall(int num, long arg0, long arg1, long arg2, long arg3, long
ret = (long) occlum_brk(addr); ret = (long) occlum_brk(addr);
break; break;
} }
case SYS_pipe: {
DECL_SYSCALL_ARG(int*, fds, arg0);
ret = (long) occlum_pipe(fds);
break;
}
default: default:
ret = occlum_unknown(num); ret = occlum_unknown(num);
break; break;

@ -1,5 +1,8 @@
use prelude::*; use super::*;
pub use self::mpx_util::{*}; pub use self::mpx_util::{*};
pub use self::ring_buf::{RingBufReader, RingBufWriter};
pub use self::ring_buf::with_fixed_capacity as new_ring_buf;
mod mpx_util; mod mpx_util;
mod ring_buf;

@ -0,0 +1,182 @@
use super::{*};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
use std::cmp::{min, max};
use std::{ptr};
pub fn with_fixed_capacity(capacity: usize) -> (RingBufReader, RingBufWriter) {
let inner = Arc::new(RingBufInner::new(capacity));
let reader = RingBufReader { inner: inner.clone() };
let writer = RingBufWriter { inner: inner };
(reader, writer)
}
#[derive(Debug)]
pub struct RingBufReader {
inner: Arc<RingBufInner>,
}
#[derive(Debug)]
pub struct RingBufWriter {
inner: Arc<RingBufInner>,
}
#[derive(Debug)]
struct RingBufInner {
buf: *mut u8,
capacity: usize,
head: AtomicUsize, // write to head
tail: AtomicUsize, // read from tail
closed: AtomicBool, // if reader has been dropped
}
impl RingBufInner {
fn new(capacity: usize) -> RingBufInner {
let capacity = max(capacity, 16).next_power_of_two();
RingBufInner {
buf: unsafe {
let mut buf_ptr = ptr::null_mut();
libc::posix_memalign(&mut buf_ptr, 16, capacity);
assert!(buf_ptr != ptr::null_mut());
buf_ptr as *mut u8
},
capacity: capacity,
head: AtomicUsize::new(0),
tail: AtomicUsize::new(0),
closed: AtomicBool::new(false),
}
}
fn get_mask(&self) -> usize {
self.capacity - 1 // Note that capacity is a power of two
}
fn get_head(&self) -> usize {
self.head.load(Ordering::SeqCst)
}
fn get_tail(&self) -> usize {
self.tail.load(Ordering::SeqCst)
}
fn set_head(&self, new_head: usize) {
self.head.store(new_head, Ordering::SeqCst)
}
fn set_tail(&self, new_tail: usize) {
self.head.store(new_tail, Ordering::SeqCst)
}
fn is_closed(&self) -> bool {
self.closed.load(Ordering::SeqCst)
}
fn close(&self) {
self.closed.store(true, Ordering::SeqCst);
}
unsafe fn read_at(&self, pos: usize, dst_buf: &mut [u8]) {
let dst_ptr = dst_buf.as_mut_ptr();
let dst_len = dst_buf.len();
let src_ptr = self.buf.offset(pos as isize);
unsafe {
src_ptr.copy_to_nonoverlapping(dst_ptr, dst_len);
}
}
unsafe fn write_at(&self, pos: usize, src_buf: &[u8]) {
let src_ptr = src_buf.as_ptr();
let src_len = src_buf.len();
let dst_ptr = self.buf.offset(pos as isize);
unsafe {
dst_ptr.copy_from_nonoverlapping(src_ptr, src_len);
}
}
}
impl Drop for RingBufInner {
fn drop(&mut self) {
unsafe { libc::free(self.buf as *mut c_void) }
}
}
impl RingBufReader {
pub fn read(&self, buf: &mut [u8]) -> Result<usize, Error> {
let mut tail = self.inner.get_tail();
let mut buf_remain = buf.len();
let mut buf_pos = 0;
while buf_remain > 0 {
let head = self.inner.get_head();
let read_nbytes = {
let may_read_nbytes = if tail <= head {
head - tail
} else {
self.inner.capacity - tail
};
if may_read_nbytes == 0 { break; }
min(may_read_nbytes, buf_remain)
};
let dst_buf = &mut buf[buf_pos..(buf_pos + read_nbytes)];
unsafe {
self.inner.read_at(tail, dst_buf);
}
tail = (tail + read_nbytes) & self.inner.get_mask();
self.inner.set_tail(tail);
buf_pos += read_nbytes;
buf_remain -= read_nbytes;
}
Ok(buf_pos)
}
}
impl Drop for RingBufReader {
fn drop(&mut self) {
// So the writer knows when a reader is finished
self.inner.close();
}
}
impl RingBufWriter {
pub fn write(&self, buf: &[u8]) -> Result<usize, Error> {
if self.inner.is_closed() {
return errno!(EPIPE, "Reader has been closed");
}
let mut head = self.inner.get_head();
let mut buf_remain = buf.len();
let mut buf_pos = 0;
while buf_remain > 0 {
let tail = self.inner.get_tail();
let write_nbytes = {
let may_write_nbytes = if tail <= head {
self.inner.capacity - head
} else {
tail - head - 1
};
if may_write_nbytes == 0 { break; }
min(may_write_nbytes, buf_remain)
};
let src_buf = &buf[buf_pos..(buf_pos + write_nbytes)];
unsafe {
self.inner.write_at(head, src_buf);
}
head = (head + write_nbytes) & self.inner.get_mask();
self.inner.set_head(head);
buf_pos += write_nbytes;
buf_remain -= write_nbytes;
}
Ok(buf_pos)
}
}