Add a new kind of readers-writer lock

He Sun 2020-06-28 16:29:20 +08:00
parent 0db804d131
commit 83637d7938
6 changed files with 430 additions and 0 deletions

@@ -11,6 +11,10 @@
#![feature(alloc_layout_extra)]
#![feature(concat_idents)]
#![feature(trace_macros)]
// for !Send in rw_lock
#![feature(negative_impls)]
// for may_dangle in rw_lock
#![feature(dropck_eyepatch)]
#[macro_use]
extern crate alloc;

@@ -21,6 +21,7 @@ use self::thread::{ThreadBuilder, ThreadId, ThreadInner};
use self::wait::{WaitQueue, Waiter};
pub use self::do_exit::handle_force_exit;
pub use self::do_futex::{futex_wait, futex_wake};
pub use self::do_spawn::do_spawn_without_exec;
pub use self::process::{Process, ProcessFilter, ProcessStatus, IDLE};
pub use self::syscalls::*;

@@ -6,3 +6,4 @@ pub mod mem_util;
pub mod mpx_util;
pub mod ring_buf;
pub mod sgx;
pub mod sync;

@@ -0,0 +1,5 @@
use super::*;
pub use rw_lock::RwLock;
pub mod rw_lock;

@@ -0,0 +1,257 @@
// Copyright (C) 2020 Ant Financial Services Group. All rights reserved.
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in
// the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Ant Financial Services Group nor the names
// of its contributors may be used to endorse or promote products
// derived from this software without specific prior written
// permission.
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// The design of the implementation is from musl libc:
// Copyright 2005-2019 Rich Felker, et al.
// Permission is hereby granted, free of charge, to any person obtaining
// a copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to
// permit persons to whom the Software is furnished to do so, subject to
// the following conditions:
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
// TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
// SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
use super::*;
use crate::process::{futex_wait, futex_wake};
use std::sync::atomic::{spin_loop_hint, AtomicI32, Ordering};
// The implementation of RwLock
//
// rw_lock: the highest bit holds the "last minute" waiter flag.
// The other bits hold the write lock state and the reader count:
// 0 means no one is holding the lock,
// 0x7FFF_FFFF means a writer is holding the lock,
// 1..0x7FFF_FFFE means the number of readers holding the lock.
//
// rw_waiters: the number of lock waiters, which can be readers or writers
//
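// For illustration (example values, not exhaustive):
// rw_lock == 0x0000_0003 means three readers hold the lock and no waiter flag is set,
// rw_lock == 0x7FFF_FFFF means a writer holds the lock,
// rw_lock == 0x8000_0001u32 as i32 means one reader holds the lock and the waiter flag is set.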
#[derive(Debug)]
pub struct RwLockInner {
rw_lock: AtomicI32,
rw_waiters: AtomicI32,
}
impl RwLockInner {
pub fn new() -> Self {
Self {
rw_lock: AtomicI32::new(0),
rw_waiters: AtomicI32::new(0),
}
}
pub fn read(&self) -> Result<()> {
let ret = self.try_read();
if let Err(error) = &ret {
// Return the error if the number of readers has reached its limit
if error.errno() == Errno::EAGAIN {
return ret;
}
} else {
// Return Ok if try_read succeeds
return ret;
}
// Spin briefly in the hope that the lock is about to be released,
// i.e. when the lock is held but no one is waiting yet
let mut spins: i32 = 100;
while spins != 0
&& self.rw_lock.load(Ordering::SeqCst) != 0
&& self.rw_waiters.load(Ordering::SeqCst) == 0
{
spin_loop_hint();
spins -= 1;
}
loop {
let mut ret = self.try_read();
if let Err(error) = &ret {
if error.errno() == Errno::EAGAIN {
return ret;
}
} else {
return ret;
}
let val: i32 = self.rw_lock.load(Ordering::SeqCst);
if (val & 0x7FFF_FFFF) != 0x7FFF_FFFF {
continue;
}
// Increment rw_waiters before setting the waiter flag in rw_lock
// so that no waiter is missed once the flag is set
self.rw_waiters.fetch_add(1, Ordering::SeqCst);
let tmp = (val as u32 | 0x8000_0000) as i32;
self.rw_lock.compare_and_swap(val, tmp, Ordering::SeqCst);
ret = futex_wait(&self.rw_lock as *const _ as *const i32, tmp, &None);
self.rw_waiters.fetch_sub(1, Ordering::SeqCst);
if let Err(error) = &ret {
match error.errno() {
Errno::ECANCELED => return ret,
_ => (),
}
}
}
}
pub fn try_read(&self) -> Result<()> {
loop {
let val: i32 = self.rw_lock.load(Ordering::SeqCst);
let cnt: i32 = val & 0x7FFF_FFFF;
if cnt == 0x7FFF_FFFF {
return_errno!(EBUSY, "a writer is holding the lock");
}
if cnt == 0x7FFF_FFFE {
return_errno!(EAGAIN, "the maximum number of read locks has been exceeded");
}
if self
.rw_lock
.compare_and_swap(val, val + 1, Ordering::SeqCst)
== val
{
break;
}
}
Ok(())
}
pub fn write(&self) -> Result<()> {
let ret = self.try_write();
if ret.is_ok() {
return Ok(());
}
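// Spin briefly, as in read(), hoping that the lock is about to be released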
let mut spins: i32 = 100;
while spins != 0
&& self.rw_lock.load(Ordering::SeqCst) != 0
&& self.rw_waiters.load(Ordering::SeqCst) == 0
{
spin_loop_hint();
spins -= 1;
}
loop {
let mut ret = self.try_write();
if ret.is_ok() {
return Ok(());
}
let val = self.rw_lock.load(Ordering::SeqCst);
if val == 0 {
continue;
}
self.rw_waiters.fetch_add(1, Ordering::SeqCst);
let tmp = (val as u32 | 0x8000_0000) as i32;
self.rw_lock.compare_and_swap(val, tmp, Ordering::SeqCst);
ret = futex_wait(&self.rw_lock as *const _ as *const i32, tmp, &None);
self.rw_waiters.fetch_sub(1, Ordering::SeqCst);
if let Err(error) = &ret {
match error.errno() {
Errno::ECANCELED => return ret,
_ => (),
}
}
}
}
pub fn try_write(&self) -> Result<()> {
let val = self
.rw_lock
.compare_and_swap(0, 0x7FFF_FFFF, Ordering::SeqCst);
match val {
0 => Ok(()),
_ => return_errno!(EBUSY, "the lock is held for reading or writing"),
}
}
pub fn rw_unlock(&self) -> Result<()> {
let mut val: i32 = 0;
let mut cnt: i32 = 0;
let mut waiters: i32 = 0;
let mut new: i32 = 0;
loop {
// Access rw_lock and rw_waiters in the reverse order of that in the lock paths
val = self.rw_lock.load(Ordering::SeqCst);
cnt = val & 0x7FFF_FFFF;
waiters = self.rw_waiters.load(Ordering::SeqCst);
new = match cnt {
1 | 0x7FFF_FFFF => 0,
// val - 1 is correct whether or not the waiter flag (the sign bit) is set,
// since (val & 0x7FFF_FFFF) - 1 == (val - 1) & 0x7FFF_FFFF for a nonzero reader count
_ => val - 1,
};
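// For example (illustrative values): if val == 0x8000_0003u32 as i32, i.e. three
// readers with the waiter flag set, then new == 0x8000_0002u32 as i32: the reader
// count drops to two and the waiter flag is preserved.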
if self.rw_lock.compare_and_swap(val, new, Ordering::SeqCst) == val {
break;
}
}
// Check both waiters and val so that no wake is missed; the val < 0 check (the waiter
// flag) also guards against the case where the number of waiters overflows to zero
if new == 0 && (waiters != 0 || val < 0) {
// The reasons to use cnt rather than waiters here:
// for read_unlock, only one waiter, which must be a writer, needs to be woken;
// for write_unlock, at most 0x7FFF_FFFF waiters can be woken.
futex_wake(&self.rw_lock as *const _ as *const i32, cnt as usize);
}
Ok(())
}
pub fn read_unlock(&self) -> Result<()> {
self.rw_unlock()
}
pub fn write_unlock(&self) -> Result<()> {
self.rw_unlock()
}
pub fn destroy(&self) -> Result<()> {
Ok(())
}
}

@@ -0,0 +1,162 @@
mod inner;
use super::*;
use core::cell::UnsafeCell;
use core::ops::{Deref, DerefMut};
use core::{fmt, mem, ptr};
use inner::RwLockInner;
use std::boxed::Box;
// A readers-writer lock with the same methods as std::sync::RwLock, except is_poisoned.
// It allows many concurrent readers or at most one writer at a time.
// TODO: Add poison support
pub struct RwLock<T: ?Sized> {
inner: Box<RwLockInner>,
data: UnsafeCell<T>,
}
unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}
// The RAII guard for read that can be held by many readers
pub struct RwLockReadGuard<'a, T: ?Sized + 'a> {
lock: &'a RwLock<T>,
}
// The read guard can be obtained by different threads,
// but cannot be sent from one thread to another
impl<T: ?Sized> !Send for RwLockReadGuard<'_, T> {}
unsafe impl<T: ?Sized + Sync> Sync for RwLockReadGuard<'_, T> {}
// The RAII guard for write that can be held by only one writer
pub struct RwLockWriteGuard<'a, T: ?Sized + 'a> {
lock: &'a RwLock<T>,
}
// The write guard can be obtained by different threads,
// but cannot be sent from one thread to another
impl<T: ?Sized> !Send for RwLockWriteGuard<'_, T> {}
unsafe impl<T: ?Sized + Sync> Sync for RwLockWriteGuard<'_, T> {}
impl<T> RwLock<T> {
pub fn new(t: T) -> RwLock<T> {
RwLock {
inner: Box::new(RwLockInner::new()),
data: UnsafeCell::new(t),
}
}
}
impl<T: ?Sized> RwLock<T> {
pub fn read(&self) -> Result<RwLockReadGuard<'_, T>> {
self.inner.read()?;
RwLockReadGuard::new(self)
}
pub fn try_read(&self) -> Result<RwLockReadGuard<'_, T>> {
self.inner.try_read()?;
RwLockReadGuard::new(self)
}
pub fn write(&self) -> Result<RwLockWriteGuard<'_, T>> {
self.inner.write()?;
RwLockWriteGuard::new(self)
}
pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, T>> {
self.inner.try_write()?;
RwLockWriteGuard::new(self)
}
pub fn into_inner(self) -> Result<T>
where
T: Sized,
{
unsafe {
let (inner, data) = {
let RwLock {
ref inner,
ref data,
} = self;
(ptr::read(inner), ptr::read(data))
};
mem::forget(self);
inner.destroy();
drop(inner);
Ok(data.into_inner())
}
}
pub fn get_mut(&mut self) -> Result<&mut T> {
let data = unsafe { &mut *self.data.get() };
Ok(data)
}
}
// Use may_dangle to promise that this Drop impl never accesses data of type T
unsafe impl<#[may_dangle] T: ?Sized> Drop for RwLock<T> {
fn drop(&mut self) {
self.inner.destroy().unwrap();
}
}
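// For illustration (an assumed example, not part of this commit): because of the
// eyepatch above, the borrow checker accepts code where the borrowed data is
// dropped before the lock, since this Drop impl never reads the possibly-dangling T:
//
// let lock;
// let s = String::from("data");
// lock = RwLock::new(s.as_str());
// // At the end of the scope, `s` is dropped before `lock`; without
// // #[may_dangle] this would be rejected with "`s` does not live long enough".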
impl<T: core::fmt::Debug> fmt::Debug for RwLock<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RwLock")
.field("inner", unsafe { &self.inner })
.field("data", unsafe { &(*self.data.get()) })
.finish()
}
}
impl<'a, T: ?Sized> RwLockReadGuard<'a, T> {
pub fn new(lock: &'a RwLock<T>) -> Result<RwLockReadGuard<'a, T>> {
Ok(RwLockReadGuard { lock })
}
}
impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
pub fn new(lock: &'a RwLock<T>) -> Result<RwLockWriteGuard<'a, T>> {
Ok(RwLockWriteGuard { lock })
}
}
impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.lock.data.get() }
}
}
impl<T: ?Sized> Deref for RwLockWriteGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.lock.data.get() }
}
}
impl<T: ?Sized> DerefMut for RwLockWriteGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.lock.data.get() }
}
}
impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
fn drop(&mut self) {
self.lock.inner.read_unlock().unwrap();
}
}
impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
fn drop(&mut self) {
self.lock.inner.write_unlock().unwrap();
}
}
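// A minimal usage sketch (illustrative only; the test module, names, and values
// below are assumptions, not part of this commit). It exercises read(), write(),
// try_read(), and try_write() through the RAII guards defined above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn read_write_basic() {
        let lock = RwLock::new(0u32);

        // Many readers may hold the lock at the same time.
        {
            let r1 = lock.read().unwrap();
            let r2 = lock.read().unwrap();
            assert_eq!((*r1, *r2), (0, 0));
            // A writer cannot acquire the lock while readers hold it.
            assert!(lock.try_write().is_err());
        } // Both read guards are dropped here, releasing the lock.

        // Only one writer may hold the lock; readers are excluded meanwhile.
        {
            let mut w = lock.write().unwrap();
            *w = 42;
            assert!(lock.try_read().is_err());
        } // The write guard is dropped here, releasing the lock.

        assert_eq!(*lock.read().unwrap(), 42);
    }
}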