Refactor rwlock implementation

1. Improve readability
2. Ease the restriction on memory ordering for better performance
This commit is contained in:
Hui, Chunyang 2022-04-26 11:48:07 +00:00 committed by volcano
parent fd950132ce
commit cd5d9e6d57
3 changed files with 286 additions and 93 deletions

@ -23,6 +23,7 @@
#![feature(atomic_from_mut)]
#![feature(btree_drain_filter)]
#![feature(bench_black_box)]
#![feature(arbitrary_enum_discriminant)]
#[macro_use]
extern crate alloc;

@ -52,9 +52,15 @@
use super::*;
use crate::process::{futex_wait, futex_wake};
use std::convert::{TryFrom, TryInto};
use std::hint;
use std::sync::atomic::{AtomicI32, Ordering};
/// The number of spinning time before sleeping
/// In musl's implmenetation, this is `100`. Considering more overhead in SGX environment,
/// here we make it bigger.
const SPIN_COUNT: usize = 1000;
// The implementaion of RwLock
//
// rw_lock: the highest bit holds the "last minute" waiter flag.
@ -64,22 +70,34 @@ use std::sync::atomic::{AtomicI32, Ordering};
// 1..0x7FFF_FFFE means the number of readers holding the lock.
//
// rw_waiters: the number of the lock waiters which can be readers or writers
//
#[derive(Debug)]
pub struct RwLockInner {
rw_lock: AtomicI32,
pub(super) struct RwLockInner {
status: AtomicRwLockStatus,
rw_waiters: AtomicI32,
}
#[derive(Debug)]
// This struct is the atomic wrapper for RwLockStatus.
struct AtomicRwLockStatus(AtomicI32);
#[derive(Debug, Copy, Clone)]
#[repr(i32)]
enum RwLockStatus {
Free = 0, // Set by unlocking thread.
ReaderLocked(i32), // Set by locking thread. The number indicates the number of readers who hold the lock
Waiting(i32), // Set by waiting thread. The number is nagative and indicates whether it is reader locked or writer locked.
WriterLocked = i32::MAX, // Set by locking thread. A writer is holding the lock.
}
impl RwLockInner {
pub fn new() -> Self {
pub(super) fn new() -> Self {
Self {
rw_lock: AtomicI32::new(0),
status: AtomicRwLockStatus::init(),
rw_waiters: AtomicI32::new(0),
}
}
pub fn read(&self) -> Result<()> {
pub(super) fn read(&self) -> Result<()> {
let ret = self.try_read();
if let Err(error) = &ret {
// Return error if the reader number reaches the limit of i32
@ -91,19 +109,19 @@ impl RwLockInner {
return ret;
}
// Spin shortly for a probably approaching lock release
// if no one is waiting but the lock is held
let mut spins: i32 = 100;
// Spin shortly for a probably approaching lock release if no one is waiting but the lock is held
let mut spins = SPIN_COUNT;
while spins != 0
&& self.rw_lock.load(Ordering::SeqCst) != 0
&& self.rw_waiters.load(Ordering::SeqCst) == 0
&& self.status.is_locked()
// Can't reorder here. `Relaxed` is enough.
&& self.rw_waiters.load(Ordering::Relaxed) == 0
{
hint::spin_loop();
spins -= 1;
}
loop {
let mut ret = self.try_read();
let ret = self.try_read();
if let Err(error) = &ret {
if error.errno() == Errno::EAGAIN {
return ret;
@ -112,21 +130,38 @@ impl RwLockInner {
return ret;
}
let val: i32 = self.rw_lock.load(Ordering::SeqCst);
if (val & 0x7FFF_FFFF) != 0x7FFF_FFFF {
// Check status again
let current_status = self.status();
match current_status.get_locker() {
// If it is free or locked by readers, try_read should success.
RwLockStatus::Free | RwLockStatus::ReaderLocked(_) => {
continue;
}
_ => {}
}
// Someone is holding the write lock. Need to wait.
debug_assert!(current_status.get_locker() == RwLockStatus::WriterLocked);
// Add rw_waiters before setting rw_lock to not to miss any waiters
// after the waiter flag is set
self.rw_waiters.fetch_add(1, Ordering::SeqCst);
// Add rw_waiters before setting status to not to miss any waiters after the waiting flag is set
// This can be reordered and in try_set_new_status, `AcqRel` will make sure this happens before.
self.rw_waiters.fetch_add(1, Ordering::Relaxed);
let tmp = (val as u32 | 0x8000_0000) as i32;
self.rw_lock
.compare_exchange(val, tmp, Ordering::SeqCst, Ordering::SeqCst);
ret = futex_wait(&self.rw_lock as *const _ as *const i32, tmp, &None);
// new_status indicates whether it is wait for reader lock or writer lock
let new_status = current_status.set_waiting();
self.rw_waiters.fetch_sub(1, Ordering::SeqCst);
// Ignore the result here because if setting the new_status fails, the wait will not block.
self.status
.try_set_new_status(current_status, new_status)
.map_err(|e| errno!(e.errno(), "failed to set RwLock status"))
.ok();
let ret = futex_wait(
&self.status as *const _ as *const i32,
new_status.as_i32(),
&None,
);
self.rw_waiters.fetch_sub(1, Ordering::Relaxed);
if let Err(error) = &ret {
match error.errno() {
@ -137,62 +172,71 @@ impl RwLockInner {
}
}
pub fn try_read(&self) -> Result<()> {
pub(super) fn try_read(&self) -> Result<()> {
loop {
let val: i32 = self.rw_lock.load(Ordering::SeqCst);
let cnt: i32 = val & 0x7FFF_FFFF;
if cnt == 0x7FFF_FFFF {
let current_status = self.status();
let locker = current_status.get_locker();
match locker {
RwLockStatus::Free => {}
RwLockStatus::WriterLocked => {
return_errno!(EBUSY, "a writer is holding the lock");
}
if cnt == 0x7FFF_FFFE {
return_errno!(EAGAIN, "the maximum number of read locks has been exceeded");
RwLockStatus::ReaderLocked(cnt) => {
if cnt == RwLockStatus::max_read_lock_holder_num() {
return_errno!(EAGAIN, "the maximum number of read locks has reached");
}
}
_ => unreachable!(),
}
if self
.rw_lock
.compare_exchange(val, val + 1, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
if self.status.try_add_one_reader(current_status).is_ok() {
break;
}
}
Ok(())
}
pub fn write(&self) -> Result<()> {
let ret = self.try_write();
if ret.is_ok() {
pub(super) fn write(&self) -> Result<()> {
if let Ok(_) = self.try_write() {
return Ok(());
}
let mut spins: i32 = 100;
let mut spins = SPIN_COUNT;
while spins != 0
&& self.rw_lock.load(Ordering::SeqCst) != 0
&& self.rw_waiters.load(Ordering::SeqCst) == 0
&& self.status.is_locked()
// Can't reorder here. `Relaxed` is enough.
&& self.rw_waiters.load(Ordering::Relaxed) == 0
{
hint::spin_loop();
spins -= 1;
}
loop {
let mut ret = self.try_write();
if ret.is_ok() {
if let Ok(_) = self.try_write() {
return Ok(());
}
let val = self.rw_lock.load(Ordering::SeqCst);
if val == 0 {
let status = self.status();
if status == RwLockStatus::Free {
continue;
}
self.rw_waiters.fetch_add(1, Ordering::SeqCst);
// Add rw_waiters before setting status to not to miss any waiters after the waiting flag is set.
// This can be reordered and in try_set_new_status, `AcqRel` will make sure this happens before.
self.rw_waiters.fetch_add(1, Ordering::Relaxed);
let tmp = (val as u32 | 0x8000_0000) as i32;
self.rw_lock
.compare_exchange(val, tmp, Ordering::SeqCst, Ordering::SeqCst);
ret = futex_wait(&self.rw_lock as *const _ as *const i32, tmp, &None);
let new_status = status.set_waiting();
self.status
.try_set_new_status(status, new_status)
.map_err(|e| errno!(e.errno(), "failed to set RwLock status"))
.ok();
let ret = futex_wait(
&self.status as *const _ as *const i32,
new_status.as_i32(),
&None,
);
self.rw_waiters.fetch_sub(1, Ordering::SeqCst);
self.rw_waiters.fetch_sub(1, Ordering::Relaxed);
if let Err(error) = &ret {
match error.errno() {
@ -203,64 +247,219 @@ impl RwLockInner {
}
}
pub fn try_write(&self) -> Result<()> {
if self
.rw_lock
.compare_exchange(0, 0x7FFF_FFFF, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
pub(super) fn try_write(&self) -> Result<()> {
if self.status.try_set_writer_locked().is_ok() {
Ok(())
} else {
Err(errno!(EBUSY, "the lock is held for reading or writing"))
}
}
pub fn rw_unlock(&self) -> Result<()> {
let mut val: i32 = 0;
let mut cnt: i32 = 0;
let mut waiters: i32 = 0;
let mut new: i32 = 0;
pub(super) fn rw_unlock(&self) -> Result<()> {
let mut status;
let mut waiters;
let mut new_status;
// Set status to Free or subtract one reader lock holder
loop {
// Reverse access order to rw_lock and rw_waiters of that in lock
val = self.rw_lock.load(Ordering::SeqCst);
cnt = val & 0x7FFF_FFFF;
waiters = self.rw_waiters.load(Ordering::SeqCst);
new = match cnt {
1 | 0x7FFF_FFFF => 0,
// val - 1 applies to both positive and negative value as:
status = self.status();
new_status = match status.get_lock_holder_num() {
1 => RwLockStatus::Free,
// status - 1 applies to both positive and negative value as:
// (i32 & 0x7FFF_FFFF) -1 = (i32 - 1) & 0x7FFF_FFFF
_ => val - 1,
_ => (status.as_i32() - 1).try_into().unwrap(),
};
if self
.rw_lock
.compare_exchange(val, new, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
if self.status.try_set_new_status(status, new_status).is_ok() {
// This can't be reordered. `Relaxed` is enough.
waiters = self.rw_waiters.load(Ordering::Relaxed);
break;
}
}
// Use both waiters and val in the condition to trigger the wake as much as possible
// and also to guard the situation where the number of waiters overflows to zero
if new == 0 && (waiters != 0 || val < 0) {
// The reasons to use cnt other than waiters here:
// For read_unlock, only one waiter which must be a writer needs to be waken;
// For write_unlock, at most 0x7FFF_FFFF waiters can be waken.
futex_wake(&self.rw_lock as *const _ as *const i32, cnt as usize);
if new_status == RwLockStatus::Free && (waiters != 0 || status.is_waiting()) {
let wake_num = status.get_waking_num();
futex_wake(&self.status as *const _ as *const i32, wake_num as usize);
}
Ok(())
}
pub fn read_unlock(&self) -> Result<()> {
pub(super) fn read_unlock(&self) -> Result<()> {
self.rw_unlock()
}
pub fn write_unlock(&self) -> Result<()> {
pub(super) fn write_unlock(&self) -> Result<()> {
self.rw_unlock()
}
pub fn destroy(&self) -> Result<()> {
fn status(&self) -> RwLockStatus {
// Use `Acquire` here to make sure all memory access before are completed.
self.status.0.load(Ordering::Acquire).try_into().unwrap()
}
}
// For AtomicRwLockStatus, global ordering is not needed. `Acquire` and `Release` are enough for the atomic operations.
impl AtomicRwLockStatus {
fn init() -> Self {
Self(AtomicI32::new(RwLockStatus::init().as_i32()))
}
fn is_free(&self) -> bool {
self.0.load(Ordering::Acquire) == RwLockStatus::Free.as_i32()
}
fn is_locked(&self) -> bool {
self.0.load(Ordering::Acquire) != RwLockStatus::Free.as_i32()
}
fn is_reader_locked(&self) -> bool {
self.0.load(Ordering::Acquire) & 0x7FFF_FFFF != 0x7FFF_FFFF
}
fn try_add_one_reader(&self, current_status: RwLockStatus) -> Result<()> {
let status_raw = current_status.as_i32();
if let Err(_) = self.0.compare_exchange(
status_raw,
status_raw + 1,
Ordering::AcqRel,
Ordering::Relaxed,
) {
return_errno!(EAGAIN, "current status changed, try again");
} else {
Ok(())
}
}
fn try_set_writer_locked(&self) -> Result<()> {
if let Err(_) = self.0.compare_exchange(
RwLockStatus::Free.as_i32(),
RwLockStatus::WriterLocked.as_i32(),
Ordering::AcqRel,
Ordering::Relaxed,
) {
return_errno!(EBUSY, "try set writer locked failed");
} else {
Ok(())
}
}
// This function is called both in lock and unlock, which need different type of ordering.
fn try_set_new_status(
&self,
current_status: RwLockStatus,
new_status: RwLockStatus,
) -> Result<()> {
if let Err(_) = self.0.compare_exchange(
current_status.as_i32(),
new_status.as_i32(),
Ordering::AcqRel,
Ordering::Relaxed, // We don't care failure thus make it `Relaxed`.
) {
return_errno!(EAGAIN, "try set waiting failed");
}
Ok(())
}
}
impl RwLockStatus {
fn init() -> Self {
RwLockStatus::Free
}
fn reader_num(&self) -> i32 {
match self {
RwLockStatus::ReaderLocked(num) => {
debug_assert!(*num > 0);
*num
}
_ => 0,
}
}
fn max_read_lock_holder_num() -> i32 {
i32::MAX - 1 // 0x7FFF_FFFE
}
fn get_locker(&self) -> RwLockStatus {
let num = self.as_i32() & 0x7FFF_FFFF;
debug_assert!(num >= 0);
match num {
0 => RwLockStatus::Free,
i32::MAX => RwLockStatus::WriterLocked,
_ => RwLockStatus::ReaderLocked(num),
}
}
fn get_lock_holder_num(&self) -> i32 {
let locker = self.get_locker();
match locker {
RwLockStatus::Free => return 0,
// One reader holder or one writer holder
RwLockStatus::ReaderLocked(1) | RwLockStatus::WriterLocked => return 1,
RwLockStatus::ReaderLocked(num) => return num,
_ => unreachable!(), // can't be Waiting
}
}
fn get_waking_num(&self) -> i32 {
let locker = self.get_locker();
match locker {
// For write_unlock, wake as much as possible (i32::MAX)
RwLockStatus::WriterLocked => RwLockStatus::WriterLocked.as_i32(),
// For reader_unlock (last read lock holder), only one waiter which must be a writer needs to be waken;
RwLockStatus::ReaderLocked(num) => {
debug_assert!(num == 1);
return num;
}
// This function are supposed to be called only in wake(). For other situations, wake should not be called.
_ => unreachable!(),
}
}
fn is_waiting(&self) -> bool {
match self {
RwLockStatus::Waiting(_) => true,
_ => false,
}
}
#[allow(overflowing_literals)]
fn set_waiting(&self) -> RwLockStatus {
RwLockStatus::Waiting(self.as_i32() | 0x8000_0000)
}
fn as_i32(&self) -> i32 {
match self {
RwLockStatus::Free => 0,
RwLockStatus::ReaderLocked(num) => *num,
RwLockStatus::WriterLocked => i32::MAX,
RwLockStatus::Waiting(num) => *num,
}
}
}
impl PartialEq for RwLockStatus {
fn eq(&self, other: &Self) -> bool {
self.as_i32() == other.as_i32()
}
}
impl Eq for RwLockStatus {}
impl TryFrom<i32> for RwLockStatus {
type Error = Error;
fn try_from(v: i32) -> Result<Self> {
match v {
x if x == RwLockStatus::Free.as_i32() => Ok(RwLockStatus::Free),
x if x == RwLockStatus::WriterLocked.as_i32() => Ok(RwLockStatus::WriterLocked),
x if x > RwLockStatus::Free.as_i32() && x < RwLockStatus::WriterLocked.as_i32() => {
Ok(RwLockStatus::ReaderLocked(x))
}
// negative means someone is waiting, and we also need to keep track of the number of lock holders
x if x < RwLockStatus::Free.as_i32() => Ok(RwLockStatus::Waiting(x)),
_ => return_errno!(EINVAL, "Invalid RwLock status"),
}
}
}

@ -17,7 +17,8 @@ pub struct RwLock<T: ?Sized> {
}
unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}
// RwLock doesn't need T to be Sync here because RwLock doesn't access T directly.
unsafe impl<T: ?Sized + Send> Sync for RwLock<T> {}
// The RAII guard for read that can be held by many readers
pub struct RwLockReadGuard<'a, T: ?Sized + 'a> {
@ -86,7 +87,6 @@ impl<T: ?Sized> RwLock<T> {
(ptr::read(inner), ptr::read(data))
};
mem::forget(self);
inner.destroy();
drop(inner);
Ok(data.into_inner())
@ -99,13 +99,6 @@ impl<T: ?Sized> RwLock<T> {
}
}
// Use may_dangle to assert not to access T
unsafe impl<#[may_dangle] T: ?Sized> Drop for RwLock<T> {
fn drop(&mut self) {
self.inner.destroy().unwrap();
}
}
impl<T: ?Sized + Default> Default for RwLock<T> {
fn default() -> RwLock<T> {
RwLock::new(Default::default())