Fix panic when there is no enough memory for a new ringbuffer

This commit is contained in:
He Sun 2020-04-22 11:21:25 +08:00 committed by Tate, Hongliang Tian
parent 490e45a52e
commit 9815523a95
3 changed files with 30 additions and 23 deletions

@ -2,8 +2,9 @@ use super::*;
use util::ring_buf::*; use util::ring_buf::*;
// TODO: Use Waiter and WaitQueue infrastructure to sleep when blocking // TODO: Use Waiter and WaitQueue infrastructure to sleep when blocking
// TODO: Add F_SETPIPE_SZ in fcntl to dynamically change the size of pipe
pub const PIPE_BUF_SIZE: usize = 2 * 1024 * 1024; // to improve memory efficiency. This value is got from /proc/sys/fs/pipe-max-size on linux.
pub const PIPE_BUF_SIZE: usize = 1024 * 1024;
#[derive(Debug)] #[derive(Debug)]
pub struct Pipe { pub struct Pipe {
@ -13,7 +14,8 @@ pub struct Pipe {
impl Pipe { impl Pipe {
pub fn new(flags: StatusFlags) -> Result<Pipe> { pub fn new(flags: StatusFlags) -> Result<Pipe> {
let mut ring_buf = RingBuf::new(PIPE_BUF_SIZE); let mut ring_buf =
RingBuf::new(PIPE_BUF_SIZE).map_err(|e| errno!(ENFILE, "No memory for new pipes"))?;
// Only O_NONBLOCK and O_DIRECT can be applied during pipe creation // Only O_NONBLOCK and O_DIRECT can be applied during pipe creation
let valid_flags = flags & (StatusFlags::O_NONBLOCK | StatusFlags::O_DIRECT); let valid_flags = flags & (StatusFlags::O_NONBLOCK | StatusFlags::O_DIRECT);
Ok(Pipe { Ok(Pipe {

@ -233,7 +233,8 @@ impl UnixSocket {
} }
let obj = UnixSocketObject::get(path) let obj = UnixSocketObject::get(path)
.ok_or_else(|| errno!(EINVAL, "unix socket path not found"))?; .ok_or_else(|| errno!(EINVAL, "unix socket path not found"))?;
let (channel1, channel2) = Channel::new_pair(); // TODO: Mov the buffer allocation to function new to comply with the bahavior of unix
let (channel1, channel2) = Channel::new_pair()?;
self.status = Status::Connected(channel1); self.status = Status::Connected(channel1);
obj.push(UnixSocket { obj.push(UnixSocket {
obj: Some(obj.clone()), obj: Some(obj.clone()),
@ -336,9 +337,9 @@ unsafe impl Send for Channel {}
unsafe impl Sync for Channel {} unsafe impl Sync for Channel {}
impl Channel { impl Channel {
fn new_pair() -> (Channel, Channel) { fn new_pair() -> Result<(Channel, Channel)> {
let buf1 = RingBuf::new(DEFAULT_BUF_SIZE); let buf1 = RingBuf::new(DEFAULT_BUF_SIZE)?;
let buf2 = RingBuf::new(DEFAULT_BUF_SIZE); let buf2 = RingBuf::new(DEFAULT_BUF_SIZE)?;
let channel1 = Channel { let channel1 = Channel {
reader: buf1.reader, reader: buf1.reader,
writer: buf2.writer, writer: buf2.writer,
@ -347,11 +348,13 @@ impl Channel {
reader: buf2.reader, reader: buf2.reader,
writer: buf1.writer, writer: buf1.writer,
}; };
(channel1, channel2) Ok((channel1, channel2))
} }
} }
pub const DEFAULT_BUF_SIZE: usize = 1 * 1024 * 1024; // TODO: Add SO_SNDBUF and SO_RCVBUF to set/getsockopt to dynamcally change the size.
// This value is got from /proc/sys/net/core/rmem_max and wmem_max that are same on linux.
pub const DEFAULT_BUF_SIZE: usize = 208 * 1024;
lazy_static! { lazy_static! {
static ref UNIX_SOCKET_OBJS: Mutex<BTreeMap<String, Arc<UnixSocketObject>>> = static ref UNIX_SOCKET_OBJS: Mutex<BTreeMap<String, Arc<UnixSocketObject>>> =

@ -14,16 +14,16 @@ pub struct RingBuf {
} }
impl RingBuf { impl RingBuf {
pub fn new(capacity: usize) -> RingBuf { pub fn new(capacity: usize) -> Result<RingBuf> {
let inner = Arc::new(RingBufInner::new(capacity)); let inner = Arc::new(RingBufInner::new(capacity)?);
let reader = RingBufReader { let reader = RingBufReader {
inner: inner.clone(), inner: inner.clone(),
}; };
let writer = RingBufWriter { inner: inner }; let writer = RingBufWriter { inner: inner };
RingBuf { Ok(RingBuf {
reader: reader, reader: reader,
writer: writer, writer: writer,
} })
} }
} }
@ -49,20 +49,22 @@ struct RingBufInner {
const RING_BUF_ALIGN: usize = 16; const RING_BUF_ALIGN: usize = 16;
impl RingBufInner { impl RingBufInner {
fn new(capacity: usize) -> RingBufInner { fn new(capacity: usize) -> Result<RingBufInner> {
// Capacity should be power of two as capacity - 1 is used as mask
let capacity = max(capacity, RING_BUF_ALIGN).next_power_of_two(); let capacity = max(capacity, RING_BUF_ALIGN).next_power_of_two();
RingBufInner { let buf_layout = Layout::from_size_align(capacity, RING_BUF_ALIGN)?;
buf: unsafe { let buf_ptr = unsafe { alloc(buf_layout) };
let buf_layout = Layout::from_size_align_unchecked(capacity, RING_BUF_ALIGN); if buf_ptr.is_null() {
let buf_ptr = alloc(buf_layout); return_errno!(ENOMEM, "no memory for new ring buffers");
assert!(buf_ptr != ptr::null_mut()); }
buf_ptr
}, Ok(RingBufInner {
buf: buf_ptr,
capacity: capacity, capacity: capacity,
head: AtomicUsize::new(0), head: AtomicUsize::new(0),
tail: AtomicUsize::new(0), tail: AtomicUsize::new(0),
closed: AtomicBool::new(false), closed: AtomicBool::new(false),
} })
} }
fn get_mask(&self) -> usize { fn get_mask(&self) -> usize {
@ -114,8 +116,8 @@ impl RingBufInner {
impl Drop for RingBufInner { impl Drop for RingBufInner {
fn drop(&mut self) { fn drop(&mut self) {
let buf_layout = Layout::from_size_align(self.capacity, RING_BUF_ALIGN).unwrap();
unsafe { unsafe {
let buf_layout = Layout::from_size_align_unchecked(self.capacity, RING_BUF_ALIGN);
dealloc(self.buf, buf_layout); dealloc(self.buf, buf_layout);
} }
} }