From 83637d7938d2072c9622d371360ddcf04b903e09 Mon Sep 17 00:00:00 2001 From: He Sun Date: Sun, 28 Jun 2020 16:29:20 +0800 Subject: [PATCH] Add a new kind of readers-writer lock --- src/libos/src/lib.rs | 4 + src/libos/src/process/mod.rs | 1 + src/libos/src/util/mod.rs | 1 + src/libos/src/util/sync/mod.rs | 5 + src/libos/src/util/sync/rw_lock/inner.rs | 257 +++++++++++++++++++++++ src/libos/src/util/sync/rw_lock/mod.rs | 162 ++++++++++++++ 6 files changed, 430 insertions(+) create mode 100644 src/libos/src/util/sync/mod.rs create mode 100644 src/libos/src/util/sync/rw_lock/inner.rs create mode 100644 src/libos/src/util/sync/rw_lock/mod.rs diff --git a/src/libos/src/lib.rs b/src/libos/src/lib.rs index a6268e53..797c8354 100644 --- a/src/libos/src/lib.rs +++ b/src/libos/src/lib.rs @@ -11,6 +11,10 @@ #![feature(alloc_layout_extra)] #![feature(concat_idents)] #![feature(trace_macros)] +// for !Send in rw_lock +#![feature(negative_impls)] +// for may_dangle in rw_lock +#![feature(dropck_eyepatch)] #[macro_use] extern crate alloc; diff --git a/src/libos/src/process/mod.rs b/src/libos/src/process/mod.rs index c9f45ff6..070cbc1a 100644 --- a/src/libos/src/process/mod.rs +++ b/src/libos/src/process/mod.rs @@ -21,6 +21,7 @@ use self::thread::{ThreadBuilder, ThreadId, ThreadInner}; use self::wait::{WaitQueue, Waiter}; pub use self::do_exit::handle_force_exit; +pub use self::do_futex::{futex_wait, futex_wake}; pub use self::do_spawn::do_spawn_without_exec; pub use self::process::{Process, ProcessFilter, ProcessStatus, IDLE}; pub use self::syscalls::*; diff --git a/src/libos/src/util/mod.rs b/src/libos/src/util/mod.rs index dfbf6eab..0dbf1cbc 100644 --- a/src/libos/src/util/mod.rs +++ b/src/libos/src/util/mod.rs @@ -6,3 +6,4 @@ pub mod mem_util; pub mod mpx_util; pub mod ring_buf; pub mod sgx; +pub mod sync; diff --git a/src/libos/src/util/sync/mod.rs b/src/libos/src/util/sync/mod.rs new file mode 100644 index 00000000..18d6949c --- /dev/null +++ b/src/libos/src/util/sync/mod.rs @@ -0,0 +1,5 @@ +use super::*; + +pub use rw_lock::RwLock; + +pub mod rw_lock; diff --git a/src/libos/src/util/sync/rw_lock/inner.rs b/src/libos/src/util/sync/rw_lock/inner.rs new file mode 100644 index 00000000..4006c3c5 --- /dev/null +++ b/src/libos/src/util/sync/rw_lock/inner.rs @@ -0,0 +1,257 @@ +// Copyright (C) 2020 Ant Financial Services Group. All rights reserved. + +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: + +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in +// the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Ant Financial Services Group nor the names +// of its contributors may be used to endorse or promote products +// derived from this software without specific prior written +// permission. + +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +// The design of the implementation is from musl libc: +// Copyright 2005-2019 Rich Felker, et al. + +// Permission is hereby granted, free of charge, to any person obtaining +// a copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to +// permit persons to whom the Software is furnished to do so, subject to +// the following conditions: + +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +// TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +// SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +use super::*; + +use crate::process::{futex_wait, futex_wake}; +use std::sync::atomic::{spin_loop_hint, AtomicI32, Ordering}; + +// The implementaion of RwLock +// +// rw_lock: the highest bit holds the "last minute" waiter flag. +// The other bits hold the write lock state and the reader count: +// 0 means no one is holding the lock, +// 0x7FFF_FFFF means a writer is holding the lock, +// 1..0x7FFF_FFFE means the number of readers holding the lock. +// +// rw_waiters: the number of the lock waiters which can be readers or writers +// +#[derive(Debug)] +pub struct RwLockInner { + rw_lock: AtomicI32, + rw_waiters: AtomicI32, +} + +impl RwLockInner { + pub fn new() -> Self { + Self { + rw_lock: AtomicI32::new(0), + rw_waiters: AtomicI32::new(0), + } + } + + pub fn read(&self) -> Result<()> { + let ret = self.try_read(); + if let Err(error) = &ret { + // Return error if the reader number reaches the limit of i32 + if error.errno() == Errno::EAGAIN { + return ret; + } + } else { + // Return ok if try_lock succeeds + return ret; + } + + // Spin shortly for a probably approaching lock release + // if no one is waiting but the lock is held + let mut spins: i32 = 100; + while spins != 0 + && self.rw_lock.load(Ordering::SeqCst) != 0 + && self.rw_waiters.load(Ordering::SeqCst) == 0 + { + spin_loop_hint(); + spins -= 1; + } + + loop { + let mut ret = self.try_read(); + if let Err(error) = &ret { + if error.errno() == Errno::EAGAIN { + return ret; + } + } else { + return ret; + } + + let val: i32 = self.rw_lock.load(Ordering::SeqCst); + if (val & 0x7FFF_FFFF) != 0x7FFF_FFFF { + continue; + } + + // Add rw_waiters before setting rw_lock to not to miss any waiters + // after the waiter flag is set + self.rw_waiters.fetch_add(1, Ordering::SeqCst); + + let tmp = (val as u32 | 0x8000_0000) as i32; + self.rw_lock.compare_and_swap(val, tmp, Ordering::SeqCst); + ret = futex_wait(&self.rw_lock as *const _ as *const i32, tmp, &None); + + self.rw_waiters.fetch_sub(1, Ordering::SeqCst); + + if let Err(error) = &ret { + match error.errno() { + Errno::ECANCELED => return ret, + _ => (), + } + } + } + } + + pub fn try_read(&self) -> Result<()> { + loop { + let val: i32 = self.rw_lock.load(Ordering::SeqCst); + let cnt: i32 = val & 0x7FFF_FFFF; + if cnt == 0x7FFF_FFFF { + return_errno!(EBUSY, "a writer is holding the lock"); + } + if cnt == 0x7FFF_FFFE { + return_errno!(EAGAIN, "the maximum number of read locks has been exceeded"); + } + + if self + .rw_lock + .compare_and_swap(val, val + 1, Ordering::SeqCst) + == val + { + break; + } + } + Ok(()) + } + + pub fn write(&self) -> Result<()> { + let ret = self.try_write(); + if ret.is_ok() { + return Ok(()); + } + + let mut spins: i32 = 100; + while spins != 0 + && self.rw_lock.load(Ordering::SeqCst) != 0 + && self.rw_waiters.load(Ordering::SeqCst) == 0 + { + spin_loop_hint(); + spins -= 1; + } + + loop { + let mut ret = self.try_write(); + if ret.is_ok() { + return Ok(()); + } + + let val = self.rw_lock.load(Ordering::SeqCst); + if val == 0 { + continue; + } + + self.rw_waiters.fetch_add(1, Ordering::SeqCst); + + let tmp = (val as u32 | 0x8000_0000) as i32; + self.rw_lock.compare_and_swap(val, tmp, Ordering::SeqCst); + ret = futex_wait(&self.rw_lock as *const _ as *const i32, tmp, &None); + + self.rw_waiters.fetch_sub(1, Ordering::SeqCst); + + if let Err(error) = &ret { + match error.errno() { + Errno::ECANCELED => return ret, + _ => (), + } + } + } + } + + pub fn try_write(&self) -> Result<()> { + let val = self + .rw_lock + .compare_and_swap(0, 0x7FFF_FFFF, Ordering::SeqCst); + match val { + 0 => Ok(()), + _ => return_errno!(EBUSY, "the lock is held for reading or writing"), + } + } + + pub fn rw_unlock(&self) -> Result<()> { + let mut val: i32 = 0; + let mut cnt: i32 = 0; + let mut waiters: i32 = 0; + let mut new: i32 = 0; + loop { + // Reverse access order to rw_lock and rw_waiters of that in lock + val = self.rw_lock.load(Ordering::SeqCst); + cnt = val & 0x7FFF_FFFF; + waiters = self.rw_waiters.load(Ordering::SeqCst); + new = match cnt { + 1 | 0x7FFF_FFFF => 0, + // val - 1 applies to both positive and negative value as: + // (i32 & 0x7FFF_FFFF) -1 = (i32 - 1) & 0x7FFF_FFFF + _ => val - 1, + }; + + if self.rw_lock.compare_and_swap(val, new, Ordering::SeqCst) == val { + break; + } + } + + // Use both waiters and val in the condition to trigger the wake as much as possible + // and also to guard the situation where the number of waiters overflows to zero + if new == 0 && (waiters != 0 || val < 0) { + // The reasons to use cnt other than waiters here: + // For read_unlock, only one waiter which must be a writer needs to be waken; + // For write_unlock, at most 0x7FFF_FFFF waiters can be waken. + futex_wake(&self.rw_lock as *const _ as *const i32, cnt as usize); + } + Ok(()) + } + + pub fn read_unlock(&self) -> Result<()> { + self.rw_unlock() + } + + pub fn write_unlock(&self) -> Result<()> { + self.rw_unlock() + } + + pub fn destroy(&self) -> Result<()> { + Ok(()) + } +} diff --git a/src/libos/src/util/sync/rw_lock/mod.rs b/src/libos/src/util/sync/rw_lock/mod.rs new file mode 100644 index 00000000..f1452410 --- /dev/null +++ b/src/libos/src/util/sync/rw_lock/mod.rs @@ -0,0 +1,162 @@ +mod inner; + +use super::*; + +use core::cell::UnsafeCell; +use core::ops::{Deref, DerefMut}; +use core::{fmt, mem, ptr}; +use inner::RwLockInner; +use std::boxed::Box; + +// A readers-writer lock with the same methods as std::sync::RwLock except is_poisoned. +// It allows many readers or at most one writer at the same time. +// TODO: Add poison support +pub struct RwLock { + inner: Box, + data: UnsafeCell, +} + +unsafe impl Send for RwLock {} +unsafe impl Sync for RwLock {} + +// The RAII guard for read that can be held by many readers +pub struct RwLockReadGuard<'a, T: ?Sized + 'a> { + lock: &'a RwLock, +} + +// The read guard can be obtained by different threads +// but not sent from one thread to another thread +impl !Send for RwLockReadGuard<'_, T> {} +unsafe impl Sync for RwLockReadGuard<'_, T> {} + +// The RAII gurad for write that can be held by only one writer +pub struct RwLockWriteGuard<'a, T: ?Sized + 'a> { + lock: &'a RwLock, +} + +// The write guard can be obtained by different threads +// but not sent from one thread to another thread +impl !Send for RwLockWriteGuard<'_, T> {} +unsafe impl Sync for RwLockWriteGuard<'_, T> {} + +impl RwLock { + pub fn new(t: T) -> RwLock { + RwLock { + inner: Box::new(RwLockInner::new()), + data: UnsafeCell::new(t), + } + } +} + +impl RwLock { + pub fn read(&self) -> Result> { + self.inner.read()?; + RwLockReadGuard::new(self) + } + + pub fn try_read(&self) -> Result> { + self.inner.try_read()?; + RwLockReadGuard::new(self) + } + + pub fn write(&self) -> Result> { + unsafe { + self.inner.write()?; + RwLockWriteGuard::new(self) + } + } + + pub fn try_write(&self) -> Result> { + unsafe { + self.inner.try_write()?; + RwLockWriteGuard::new(self) + } + } + + pub fn into_inner(self) -> Result + where + T: Sized, + { + unsafe { + let (inner, data) = { + let RwLock { + ref inner, + ref data, + } = self; + (ptr::read(inner), ptr::read(data)) + }; + mem::forget(self); + inner.destroy(); + drop(inner); + + Ok(data.into_inner()) + } + } + + pub fn get_mut(&mut self) -> Result<&mut T> { + let data = unsafe { &mut *self.data.get() }; + Ok(data) + } +} + +// Use may_dangle to assert not to access T +unsafe impl<#[may_dangle] T: ?Sized> Drop for RwLock { + fn drop(&mut self) { + self.inner.destroy().unwrap(); + } +} + +impl fmt::Debug for RwLock { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RwLock") + .field("inner", unsafe { &self.inner }) + .field("data", unsafe { &(*self.data.get()) }) + .finish() + } +} + +impl<'a, T: ?Sized> RwLockReadGuard<'a, T> { + pub fn new(lock: &'a RwLock) -> Result> { + Ok(RwLockReadGuard { lock }) + } +} + +impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { + pub fn new(lock: &'a RwLock) -> Result> { + Ok(RwLockWriteGuard { lock }) + } +} + +impl Deref for RwLockReadGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.data.get() } + } +} + +impl Deref for RwLockWriteGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.data.get() } + } +} + +impl DerefMut for RwLockWriteGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.lock.data.get() } + } +} + +impl Drop for RwLockReadGuard<'_, T> { + fn drop(&mut self) { + self.lock.inner.read_unlock().unwrap(); + } +} + +impl Drop for RwLockWriteGuard<'_, T> { + fn drop(&mut self) { + self.lock.inner.write_unlock().unwrap(); + } +}