diff --git a/src/libos/src/fs/channel.rs b/src/libos/src/fs/channel.rs index f84bee85..bcb12df1 100644 --- a/src/libos/src/fs/channel.rs +++ b/src/libos/src/fs/channel.rs @@ -77,6 +77,18 @@ impl Channel { (producer, consumer) } + pub fn consumer(&self) -> &Consumer { + &self.consumer + } + + pub fn producer(&self) -> &Producer { + &self.producer + } + + pub fn capacity(&self) -> usize { + self.consumer.capacity() + } + pub fn items_to_consume(&self) -> usize { self.consumer.items_to_consume() } @@ -407,6 +419,11 @@ impl Consumer { pub fn items_to_consume(&self) -> usize { self.inner.lock().unwrap().len() } + + pub fn capacity(&self) -> usize { + let rb_consumer = self.inner.lock().unwrap(); + rb_consumer.capacity() + } } impl Consumer { diff --git a/src/libos/src/net/socket/unix/stream/address_space.rs b/src/libos/src/net/socket/unix/stream/address_space.rs index c9840e07..b9055145 100644 --- a/src/libos/src/net/socket/unix/stream/address_space.rs +++ b/src/libos/src/net/socket/unix/stream/address_space.rs @@ -31,19 +31,31 @@ impl AddressSpace { } } - pub fn add_listener(&self, addr: &Addr, capacity: usize) -> Result<()> { + pub fn add_listener(&self, addr: &Addr, capacity: usize, nonblocking: bool) -> Result<()> { + let key = Self::get_key(addr); + let mut space = self.get_space(addr); + + if let Some(option) = space.get(&key) { + if option.is_none() { + space.insert(key, Some(Arc::new(Listener::new(capacity, nonblocking)?))); + Ok(()) + } else { + return_errno!(EINVAL, "the socket is already listened"); + } + } else { + return_errno!(EINVAL, "the socket is not bound"); + } + } + + pub fn resize_listener(&self, addr: &Addr, capacity: usize) -> Result<()> { let key = Self::get_key(addr); let mut space = self.get_space(addr); if let Some(option) = space.get(&key) { if let Some(listener) = option { - let new_listener = Listener::new(capacity)?; - for i in 0..std::cmp::min(listener.remaining(), capacity) { - new_listener.push_incoming(listener.pop_incoming().unwrap()); - } - space.insert(key, Some(Arc::new(new_listener))); + listener.resize(capacity); } else { - space.insert(key, Some(Arc::new(Listener::new(capacity)?))); + return_errno!(EINVAL, "the socket is not listening"); } Ok(()) } else { @@ -54,8 +66,7 @@ impl AddressSpace { pub fn push_incoming(&self, addr: &Addr, sock: Endpoint) -> Result<()> { self.get_listener_ref(addr) .ok_or_else(|| errno!(ECONNREFUSED, "no one's listening on the remote address"))? - .push_incoming(sock); - Ok(()) + .push_incoming(sock) } pub fn pop_incoming(&self, addr: &Addr) -> Result { diff --git a/src/libos/src/net/socket/unix/stream/file.rs b/src/libos/src/net/socket/unix/stream/file.rs index 171ad8ef..a39ed671 100644 --- a/src/libos/src/net/socket/unix/stream/file.rs +++ b/src/libos/src/net/socket/unix/stream/file.rs @@ -5,14 +5,19 @@ use std::any::Any; impl File for Stream { fn read(&self, buf: &mut [u8]) -> Result { - match &*self.inner() { + // The connected status will not be changed any more + // in the current implementation. Use clone to release + // the mutex lock early. + let status = (*self.inner()).clone(); + match status { Status::Connected(endpoint) => endpoint.read(buf), _ => return_errno!(ENOTCONN, "unconnected socket"), } } fn write(&self, buf: &[u8]) -> Result { - match &*self.inner() { + let status = (*self.inner()).clone(); + match status { Status::Connected(endpoint) => endpoint.write(buf), _ => return_errno!(ENOTCONN, "unconnected socket"), } @@ -33,14 +38,16 @@ impl File for Stream { } fn readv(&self, bufs: &mut [&mut [u8]]) -> Result { - match &*self.inner() { + let status = (*self.inner()).clone(); + match status { Status::Connected(endpoint) => endpoint.readv(bufs), _ => return_errno!(ENOTCONN, "unconnected socket"), } } fn writev(&self, bufs: &[&[u8]]) -> Result { - match &*self.inner() { + let status = (*self.inner()).clone(); + match status { Status::Connected(endpoint) => endpoint.writev(bufs), _ => return_errno!(ENOTCONN, "unconnected socket"), } diff --git a/src/libos/src/net/socket/unix/stream/stream.rs b/src/libos/src/net/socket/unix/stream/stream.rs index 7de0ff42..66423389 100644 --- a/src/libos/src/net/socket/unix/stream/stream.rs +++ b/src/libos/src/net/socket/unix/stream/stream.rs @@ -18,7 +18,7 @@ pub struct Stream { impl Stream { pub fn new(flags: FileFlags) -> Self { Self { - inner: SgxMutex::new(Status::Unconnected(Info::new( + inner: SgxMutex::new(Status::Idle(Info::new( flags.contains(FileFlags::SOCK_NONBLOCK), ))), } @@ -41,7 +41,7 @@ impl Stream { pub fn addr(&self) -> Option { match &*self.inner() { - Status::Unconnected(info) => info.addr().clone(), + Status::Idle(info) => info.addr().clone(), Status::Connected(endpoint) => endpoint.addr(), Status::Listening(addr) => Some(addr).cloned(), } @@ -59,7 +59,7 @@ impl Stream { // TODO: create the corresponding file in the fs pub fn bind(&self, addr: &Addr) -> Result<()> { match &mut *self.inner() { - Status::Unconnected(ref mut info) => { + Status::Idle(ref mut info) => { if info.addr().is_some() { return_errno!(EINVAL, "the socket is already bound"); } @@ -91,28 +91,30 @@ impl Stream { let mut inner = self.inner(); match &*inner { - Status::Unconnected(info) => { + Status::Idle(info) => { if let Some(addr) = info.addr() { - ADDRESS_SPACE.add_listener(addr, capacity)?; + ADDRESS_SPACE.add_listener(addr, capacity, info.nonblocking())?; *inner = Status::Listening(addr.clone()); } else { return_errno!(EINVAL, "the socket is not bound"); } } Status::Connected(_) => return_errno!(EINVAL, "the socket is already connected"), - /// Modify the capacity of the channel holding incoming sockets - Status::Listening(addr) => ADDRESS_SPACE.add_listener(&addr, capacity)?, + // Modify the capacity of the channel holding incoming sockets + Status::Listening(addr) => ADDRESS_SPACE.resize_listener(&addr, capacity)?, } Ok(()) } + /// The establishment of the connection is very fast and can be done immediately. + /// Therefore, the connect function in our implementation will never block. pub fn connect(&self, addr: &Addr) -> Result<()> { debug!("connect to {:?}", addr); let mut inner = self.inner(); match &*inner { - Status::Unconnected(info) => { + Status::Idle(info) => { let self_addr_opt = info.addr(); if let Some(self_addr) = self_addr_opt { if self_addr == addr { @@ -126,7 +128,12 @@ impl Stream { end_self.set_addr(self_addr); } - ADDRESS_SPACE.push_incoming(addr, end_incoming)?; + ADDRESS_SPACE + .push_incoming(addr, end_incoming) + .map_err(|e| match e.errno() { + Errno::EAGAIN => errno!(ECONNREFUSED, "the backlog is full"), + _ => e, + })?; *inner = Status::Connected(end_self); Ok(()) @@ -137,7 +144,8 @@ impl Stream { } pub fn accept(&self, flags: FileFlags) -> Result<(Self, Option)> { - match &*self.inner() { + let status = (*self.inner()).clone(); + match status { Status::Listening(addr) => { let endpoint = ADDRESS_SPACE.pop_incoming(&addr)?; endpoint.set_nonblocking(flags.contains(FileFlags::SOCK_NONBLOCK)); @@ -183,7 +191,7 @@ impl Stream { pub(super) fn nonblocking(&self) -> bool { match &*self.inner() { - Status::Unconnected(info) => info.nonblocking(), + Status::Idle(info) => info.nonblocking(), Status::Connected(endpoint) => endpoint.nonblocking(), Status::Listening(addr) => ADDRESS_SPACE.get_listener_ref(&addr).unwrap().nonblocking(), } @@ -191,7 +199,7 @@ impl Stream { pub(super) fn set_nonblocking(&self, nonblocking: bool) { match &mut *self.inner() { - Status::Unconnected(ref mut info) => info.set_nonblocking(nonblocking), + Status::Idle(ref mut info) => info.set_nonblocking(nonblocking), Status::Connected(ref mut endpoint) => endpoint.set_nonblocking(nonblocking), Status::Listening(addr) => ADDRESS_SPACE .get_listener_ref(&addr) @@ -217,7 +225,7 @@ impl Debug for Stream { impl Drop for Stream { fn drop(&mut self) { match &*self.inner() { - Status::Unconnected(info) => { + Status::Idle(info) => { if let Some(addr) = info.addr() { ADDRESS_SPACE.remove_addr(&addr); } @@ -225,8 +233,8 @@ impl Drop for Stream { Status::Listening(addr) => { let listener = ADDRESS_SPACE.get_listener_ref(&addr).unwrap(); ADDRESS_SPACE.remove_addr(&addr); - /// handle the blocking of other sockets holding the reference to the listener, - /// e.g., pushing to a listener full of incoming sockets + // handle the blocking of other sockets holding the reference to the listener, + // e.g., pushing to a listener full of incoming sockets listener.shutdown(); } _ => {} @@ -234,10 +242,11 @@ impl Drop for Stream { } } +#[derive(Clone)] pub enum Status { - Unconnected(Info), - /// The listeners are stored in a global data structure indexed by the address. - /// The consitency of Status with that data structure should be carefully maintained. + Idle(Info), + // The listeners are stored in a global data structure indexed by the address. + // The consitency of Status with that data structure should be carefully maintained. Listening(Addr), Connected(Endpoint), } @@ -273,53 +282,80 @@ impl Info { } } +/// The listener status of a stream unix socket. +/// It contains a channel holding incoming connections. +/// The nonblocking status of the reader end keeps the same with the socket. +/// The writer end is always non-blocking. The connect function returns +/// ECONNREFUSED rather than block when the channel is full. pub struct Listener { - channel: Channel, - nonblocking: AtomicBool, + channel: RwLock>, } impl Listener { - pub fn new(capacity: usize) -> Result { + pub fn new(capacity: usize, nonblocking: bool) -> Result { let channel = Channel::new(capacity)?; - // It may incur blocking inside a blocking if the channel is blocking. Set the channel to - // nonblocking permanently to avoid the nested blocking. This also results in nonblocking - // accept and connect. Future work is needed to resolve this blocking issue to support - // blocking accept and connect. - channel.set_nonblocking(true); - /// The listener is blocking by default - let nonblocking = AtomicBool::new(true); + channel.producer().set_nonblocking(true); + channel.consumer().set_nonblocking(nonblocking); Ok(Self { - channel, - nonblocking, + channel: RwLock::new(channel), }) } - pub fn push_incoming(&self, stream_socket: Endpoint) { - self.channel.push(stream_socket); + pub fn capacity(&self) -> usize { + let channel = self.channel.read().unwrap(); + channel.capacity() + } + + // TODO: when pop_incoming is blocked somewhere, the resize operation will blockingly wait for + // the block to end. This is a rare scenario, so we will fix it in the future. + pub fn resize(&self, capacity: usize) { + if self.capacity() == capacity { + return; + } + + let mut channel = self.channel.write().unwrap(); + let new_channel = Channel::new(capacity).unwrap(); + new_channel.producer().set_nonblocking(true); + new_channel + .consumer() + .set_nonblocking(channel.consumer().is_nonblocking()); + + let remaining = channel.items_to_consume(); + for i in 0..std::cmp::min(remaining, capacity) { + new_channel.push(channel.pop().unwrap().unwrap()).unwrap(); + } + + *channel = new_channel; + } + + pub fn push_incoming(&self, stream_socket: Endpoint) -> Result<()> { + let channel = self.channel.read().unwrap(); + channel.push(stream_socket) } pub fn pop_incoming(&self) -> Option { - self.channel.pop().ok().flatten() + let channel = self.channel.read().unwrap(); + channel.pop().ok().flatten() } pub fn remaining(&self) -> usize { - self.channel.items_to_consume() + let channel = self.channel.read().unwrap(); + channel.items_to_consume() } pub fn nonblocking(&self) -> bool { - warn!("the channel works in a nonblocking way regardless of the nonblocking status"); - - self.nonblocking.load(Ordering::Acquire) + let channel = self.channel.read().unwrap(); + channel.consumer().is_nonblocking() } pub fn set_nonblocking(&self, nonblocking: bool) { - warn!("the channel works in a nonblocking way regardless of the nonblocking status"); - - self.nonblocking.store(nonblocking, Ordering::Release); + let channel = self.channel.read().unwrap(); + channel.consumer().set_nonblocking(nonblocking); } pub fn shutdown(&self) { - self.channel.shutdown(); + let channel = self.channel.read().unwrap(); + channel.shutdown(); } }