Resolve the nested block of Unix socket

Read, write, connect and accept have both blocking and nonblocking mode.
It may block after the status lock is acquired resulting in potential
deadlock. This commit resolve the deadlock issue.
This commit is contained in:
He Sun 2020-12-29 19:28:01 +08:00 committed by Tate, Hongliang Tian
parent 1101bdf9e7
commit 3101d8fa88
4 changed files with 125 additions and 54 deletions

@ -77,6 +77,18 @@ impl<I> Channel<I> {
(producer, consumer) (producer, consumer)
} }
pub fn consumer(&self) -> &Consumer<I> {
&self.consumer
}
pub fn producer(&self) -> &Producer<I> {
&self.producer
}
pub fn capacity(&self) -> usize {
self.consumer.capacity()
}
pub fn items_to_consume(&self) -> usize { pub fn items_to_consume(&self) -> usize {
self.consumer.items_to_consume() self.consumer.items_to_consume()
} }
@ -407,6 +419,11 @@ impl<I> Consumer<I> {
pub fn items_to_consume(&self) -> usize { pub fn items_to_consume(&self) -> usize {
self.inner.lock().unwrap().len() self.inner.lock().unwrap().len()
} }
pub fn capacity(&self) -> usize {
let rb_consumer = self.inner.lock().unwrap();
rb_consumer.capacity()
}
} }
impl<I: Copy> Consumer<I> { impl<I: Copy> Consumer<I> {

@ -31,19 +31,31 @@ impl AddressSpace {
} }
} }
pub fn add_listener(&self, addr: &Addr, capacity: usize) -> Result<()> { pub fn add_listener(&self, addr: &Addr, capacity: usize, nonblocking: bool) -> Result<()> {
let key = Self::get_key(addr);
let mut space = self.get_space(addr);
if let Some(option) = space.get(&key) {
if option.is_none() {
space.insert(key, Some(Arc::new(Listener::new(capacity, nonblocking)?)));
Ok(())
} else {
return_errno!(EINVAL, "the socket is already listened");
}
} else {
return_errno!(EINVAL, "the socket is not bound");
}
}
pub fn resize_listener(&self, addr: &Addr, capacity: usize) -> Result<()> {
let key = Self::get_key(addr); let key = Self::get_key(addr);
let mut space = self.get_space(addr); let mut space = self.get_space(addr);
if let Some(option) = space.get(&key) { if let Some(option) = space.get(&key) {
if let Some(listener) = option { if let Some(listener) = option {
let new_listener = Listener::new(capacity)?; listener.resize(capacity);
for i in 0..std::cmp::min(listener.remaining(), capacity) {
new_listener.push_incoming(listener.pop_incoming().unwrap());
}
space.insert(key, Some(Arc::new(new_listener)));
} else { } else {
space.insert(key, Some(Arc::new(Listener::new(capacity)?))); return_errno!(EINVAL, "the socket is not listening");
} }
Ok(()) Ok(())
} else { } else {
@ -54,8 +66,7 @@ impl AddressSpace {
pub fn push_incoming(&self, addr: &Addr, sock: Endpoint) -> Result<()> { pub fn push_incoming(&self, addr: &Addr, sock: Endpoint) -> Result<()> {
self.get_listener_ref(addr) self.get_listener_ref(addr)
.ok_or_else(|| errno!(ECONNREFUSED, "no one's listening on the remote address"))? .ok_or_else(|| errno!(ECONNREFUSED, "no one's listening on the remote address"))?
.push_incoming(sock); .push_incoming(sock)
Ok(())
} }
pub fn pop_incoming(&self, addr: &Addr) -> Result<Endpoint> { pub fn pop_incoming(&self, addr: &Addr) -> Result<Endpoint> {

@ -5,14 +5,19 @@ use std::any::Any;
impl File for Stream { impl File for Stream {
fn read(&self, buf: &mut [u8]) -> Result<usize> { fn read(&self, buf: &mut [u8]) -> Result<usize> {
match &*self.inner() { // The connected status will not be changed any more
// in the current implementation. Use clone to release
// the mutex lock early.
let status = (*self.inner()).clone();
match status {
Status::Connected(endpoint) => endpoint.read(buf), Status::Connected(endpoint) => endpoint.read(buf),
_ => return_errno!(ENOTCONN, "unconnected socket"), _ => return_errno!(ENOTCONN, "unconnected socket"),
} }
} }
fn write(&self, buf: &[u8]) -> Result<usize> { fn write(&self, buf: &[u8]) -> Result<usize> {
match &*self.inner() { let status = (*self.inner()).clone();
match status {
Status::Connected(endpoint) => endpoint.write(buf), Status::Connected(endpoint) => endpoint.write(buf),
_ => return_errno!(ENOTCONN, "unconnected socket"), _ => return_errno!(ENOTCONN, "unconnected socket"),
} }
@ -33,14 +38,16 @@ impl File for Stream {
} }
fn readv(&self, bufs: &mut [&mut [u8]]) -> Result<usize> { fn readv(&self, bufs: &mut [&mut [u8]]) -> Result<usize> {
match &*self.inner() { let status = (*self.inner()).clone();
match status {
Status::Connected(endpoint) => endpoint.readv(bufs), Status::Connected(endpoint) => endpoint.readv(bufs),
_ => return_errno!(ENOTCONN, "unconnected socket"), _ => return_errno!(ENOTCONN, "unconnected socket"),
} }
} }
fn writev(&self, bufs: &[&[u8]]) -> Result<usize> { fn writev(&self, bufs: &[&[u8]]) -> Result<usize> {
match &*self.inner() { let status = (*self.inner()).clone();
match status {
Status::Connected(endpoint) => endpoint.writev(bufs), Status::Connected(endpoint) => endpoint.writev(bufs),
_ => return_errno!(ENOTCONN, "unconnected socket"), _ => return_errno!(ENOTCONN, "unconnected socket"),
} }

@ -18,7 +18,7 @@ pub struct Stream {
impl Stream { impl Stream {
pub fn new(flags: FileFlags) -> Self { pub fn new(flags: FileFlags) -> Self {
Self { Self {
inner: SgxMutex::new(Status::Unconnected(Info::new( inner: SgxMutex::new(Status::Idle(Info::new(
flags.contains(FileFlags::SOCK_NONBLOCK), flags.contains(FileFlags::SOCK_NONBLOCK),
))), ))),
} }
@ -41,7 +41,7 @@ impl Stream {
pub fn addr(&self) -> Option<Addr> { pub fn addr(&self) -> Option<Addr> {
match &*self.inner() { match &*self.inner() {
Status::Unconnected(info) => info.addr().clone(), Status::Idle(info) => info.addr().clone(),
Status::Connected(endpoint) => endpoint.addr(), Status::Connected(endpoint) => endpoint.addr(),
Status::Listening(addr) => Some(addr).cloned(), Status::Listening(addr) => Some(addr).cloned(),
} }
@ -59,7 +59,7 @@ impl Stream {
// TODO: create the corresponding file in the fs // TODO: create the corresponding file in the fs
pub fn bind(&self, addr: &Addr) -> Result<()> { pub fn bind(&self, addr: &Addr) -> Result<()> {
match &mut *self.inner() { match &mut *self.inner() {
Status::Unconnected(ref mut info) => { Status::Idle(ref mut info) => {
if info.addr().is_some() { if info.addr().is_some() {
return_errno!(EINVAL, "the socket is already bound"); return_errno!(EINVAL, "the socket is already bound");
} }
@ -91,28 +91,30 @@ impl Stream {
let mut inner = self.inner(); let mut inner = self.inner();
match &*inner { match &*inner {
Status::Unconnected(info) => { Status::Idle(info) => {
if let Some(addr) = info.addr() { if let Some(addr) = info.addr() {
ADDRESS_SPACE.add_listener(addr, capacity)?; ADDRESS_SPACE.add_listener(addr, capacity, info.nonblocking())?;
*inner = Status::Listening(addr.clone()); *inner = Status::Listening(addr.clone());
} else { } else {
return_errno!(EINVAL, "the socket is not bound"); return_errno!(EINVAL, "the socket is not bound");
} }
} }
Status::Connected(_) => return_errno!(EINVAL, "the socket is already connected"), Status::Connected(_) => return_errno!(EINVAL, "the socket is already connected"),
/// Modify the capacity of the channel holding incoming sockets // Modify the capacity of the channel holding incoming sockets
Status::Listening(addr) => ADDRESS_SPACE.add_listener(&addr, capacity)?, Status::Listening(addr) => ADDRESS_SPACE.resize_listener(&addr, capacity)?,
} }
Ok(()) Ok(())
} }
/// The establishment of the connection is very fast and can be done immediately.
/// Therefore, the connect function in our implementation will never block.
pub fn connect(&self, addr: &Addr) -> Result<()> { pub fn connect(&self, addr: &Addr) -> Result<()> {
debug!("connect to {:?}", addr); debug!("connect to {:?}", addr);
let mut inner = self.inner(); let mut inner = self.inner();
match &*inner { match &*inner {
Status::Unconnected(info) => { Status::Idle(info) => {
let self_addr_opt = info.addr(); let self_addr_opt = info.addr();
if let Some(self_addr) = self_addr_opt { if let Some(self_addr) = self_addr_opt {
if self_addr == addr { if self_addr == addr {
@ -126,7 +128,12 @@ impl Stream {
end_self.set_addr(self_addr); end_self.set_addr(self_addr);
} }
ADDRESS_SPACE.push_incoming(addr, end_incoming)?; ADDRESS_SPACE
.push_incoming(addr, end_incoming)
.map_err(|e| match e.errno() {
Errno::EAGAIN => errno!(ECONNREFUSED, "the backlog is full"),
_ => e,
})?;
*inner = Status::Connected(end_self); *inner = Status::Connected(end_self);
Ok(()) Ok(())
@ -137,7 +144,8 @@ impl Stream {
} }
pub fn accept(&self, flags: FileFlags) -> Result<(Self, Option<Addr>)> { pub fn accept(&self, flags: FileFlags) -> Result<(Self, Option<Addr>)> {
match &*self.inner() { let status = (*self.inner()).clone();
match status {
Status::Listening(addr) => { Status::Listening(addr) => {
let endpoint = ADDRESS_SPACE.pop_incoming(&addr)?; let endpoint = ADDRESS_SPACE.pop_incoming(&addr)?;
endpoint.set_nonblocking(flags.contains(FileFlags::SOCK_NONBLOCK)); endpoint.set_nonblocking(flags.contains(FileFlags::SOCK_NONBLOCK));
@ -183,7 +191,7 @@ impl Stream {
pub(super) fn nonblocking(&self) -> bool { pub(super) fn nonblocking(&self) -> bool {
match &*self.inner() { match &*self.inner() {
Status::Unconnected(info) => info.nonblocking(), Status::Idle(info) => info.nonblocking(),
Status::Connected(endpoint) => endpoint.nonblocking(), Status::Connected(endpoint) => endpoint.nonblocking(),
Status::Listening(addr) => ADDRESS_SPACE.get_listener_ref(&addr).unwrap().nonblocking(), Status::Listening(addr) => ADDRESS_SPACE.get_listener_ref(&addr).unwrap().nonblocking(),
} }
@ -191,7 +199,7 @@ impl Stream {
pub(super) fn set_nonblocking(&self, nonblocking: bool) { pub(super) fn set_nonblocking(&self, nonblocking: bool) {
match &mut *self.inner() { match &mut *self.inner() {
Status::Unconnected(ref mut info) => info.set_nonblocking(nonblocking), Status::Idle(ref mut info) => info.set_nonblocking(nonblocking),
Status::Connected(ref mut endpoint) => endpoint.set_nonblocking(nonblocking), Status::Connected(ref mut endpoint) => endpoint.set_nonblocking(nonblocking),
Status::Listening(addr) => ADDRESS_SPACE Status::Listening(addr) => ADDRESS_SPACE
.get_listener_ref(&addr) .get_listener_ref(&addr)
@ -217,7 +225,7 @@ impl Debug for Stream {
impl Drop for Stream { impl Drop for Stream {
fn drop(&mut self) { fn drop(&mut self) {
match &*self.inner() { match &*self.inner() {
Status::Unconnected(info) => { Status::Idle(info) => {
if let Some(addr) = info.addr() { if let Some(addr) = info.addr() {
ADDRESS_SPACE.remove_addr(&addr); ADDRESS_SPACE.remove_addr(&addr);
} }
@ -225,8 +233,8 @@ impl Drop for Stream {
Status::Listening(addr) => { Status::Listening(addr) => {
let listener = ADDRESS_SPACE.get_listener_ref(&addr).unwrap(); let listener = ADDRESS_SPACE.get_listener_ref(&addr).unwrap();
ADDRESS_SPACE.remove_addr(&addr); ADDRESS_SPACE.remove_addr(&addr);
/// handle the blocking of other sockets holding the reference to the listener, // handle the blocking of other sockets holding the reference to the listener,
/// e.g., pushing to a listener full of incoming sockets // e.g., pushing to a listener full of incoming sockets
listener.shutdown(); listener.shutdown();
} }
_ => {} _ => {}
@ -234,10 +242,11 @@ impl Drop for Stream {
} }
} }
#[derive(Clone)]
pub enum Status { pub enum Status {
Unconnected(Info), Idle(Info),
/// The listeners are stored in a global data structure indexed by the address. // The listeners are stored in a global data structure indexed by the address.
/// The consitency of Status with that data structure should be carefully maintained. // The consitency of Status with that data structure should be carefully maintained.
Listening(Addr), Listening(Addr),
Connected(Endpoint), Connected(Endpoint),
} }
@ -273,53 +282,80 @@ impl Info {
} }
} }
/// The listener status of a stream unix socket.
/// It contains a channel holding incoming connections.
/// The nonblocking status of the reader end keeps the same with the socket.
/// The writer end is always non-blocking. The connect function returns
/// ECONNREFUSED rather than block when the channel is full.
pub struct Listener { pub struct Listener {
channel: Channel<Endpoint>, channel: RwLock<Channel<Endpoint>>,
nonblocking: AtomicBool,
} }
impl Listener { impl Listener {
pub fn new(capacity: usize) -> Result<Self> { pub fn new(capacity: usize, nonblocking: bool) -> Result<Self> {
let channel = Channel::new(capacity)?; let channel = Channel::new(capacity)?;
// It may incur blocking inside a blocking if the channel is blocking. Set the channel to channel.producer().set_nonblocking(true);
// nonblocking permanently to avoid the nested blocking. This also results in nonblocking channel.consumer().set_nonblocking(nonblocking);
// accept and connect. Future work is needed to resolve this blocking issue to support
// blocking accept and connect.
channel.set_nonblocking(true);
/// The listener is blocking by default
let nonblocking = AtomicBool::new(true);
Ok(Self { Ok(Self {
channel, channel: RwLock::new(channel),
nonblocking,
}) })
} }
pub fn push_incoming(&self, stream_socket: Endpoint) { pub fn capacity(&self) -> usize {
self.channel.push(stream_socket); let channel = self.channel.read().unwrap();
channel.capacity()
}
// TODO: when pop_incoming is blocked somewhere, the resize operation will blockingly wait for
// the block to end. This is a rare scenario, so we will fix it in the future.
pub fn resize(&self, capacity: usize) {
if self.capacity() == capacity {
return;
}
let mut channel = self.channel.write().unwrap();
let new_channel = Channel::new(capacity).unwrap();
new_channel.producer().set_nonblocking(true);
new_channel
.consumer()
.set_nonblocking(channel.consumer().is_nonblocking());
let remaining = channel.items_to_consume();
for i in 0..std::cmp::min(remaining, capacity) {
new_channel.push(channel.pop().unwrap().unwrap()).unwrap();
}
*channel = new_channel;
}
pub fn push_incoming(&self, stream_socket: Endpoint) -> Result<()> {
let channel = self.channel.read().unwrap();
channel.push(stream_socket)
} }
pub fn pop_incoming(&self) -> Option<Endpoint> { pub fn pop_incoming(&self) -> Option<Endpoint> {
self.channel.pop().ok().flatten() let channel = self.channel.read().unwrap();
channel.pop().ok().flatten()
} }
pub fn remaining(&self) -> usize { pub fn remaining(&self) -> usize {
self.channel.items_to_consume() let channel = self.channel.read().unwrap();
channel.items_to_consume()
} }
pub fn nonblocking(&self) -> bool { pub fn nonblocking(&self) -> bool {
warn!("the channel works in a nonblocking way regardless of the nonblocking status"); let channel = self.channel.read().unwrap();
channel.consumer().is_nonblocking()
self.nonblocking.load(Ordering::Acquire)
} }
pub fn set_nonblocking(&self, nonblocking: bool) { pub fn set_nonblocking(&self, nonblocking: bool) {
warn!("the channel works in a nonblocking way regardless of the nonblocking status"); let channel = self.channel.read().unwrap();
channel.consumer().set_nonblocking(nonblocking);
self.nonblocking.store(nonblocking, Ordering::Release);
} }
pub fn shutdown(&self) { pub fn shutdown(&self) {
self.channel.shutdown(); let channel = self.channel.read().unwrap();
channel.shutdown();
} }
} }