From cd5d9e6d57738a8b5ea5d557ad05670e79c462bc Mon Sep 17 00:00:00 2001 From: "Hui, Chunyang" Date: Tue, 26 Apr 2022 11:48:07 +0000 Subject: [PATCH] Refactor rwlock implementation 1. Improve readability 2. Ease the restriction on memory ordering for better performance --- src/libos/src/lib.rs | 1 + src/libos/src/util/sync/rw_lock/inner.rs | 367 +++++++++++++++++------ src/libos/src/util/sync/rw_lock/mod.rs | 11 +- 3 files changed, 286 insertions(+), 93 deletions(-) diff --git a/src/libos/src/lib.rs b/src/libos/src/lib.rs index 9172efc9..de4b0c16 100644 --- a/src/libos/src/lib.rs +++ b/src/libos/src/lib.rs @@ -23,6 +23,7 @@ #![feature(atomic_from_mut)] #![feature(btree_drain_filter)] #![feature(bench_black_box)] +#![feature(arbitrary_enum_discriminant)] #[macro_use] extern crate alloc; diff --git a/src/libos/src/util/sync/rw_lock/inner.rs b/src/libos/src/util/sync/rw_lock/inner.rs index f4b88c44..7f805b34 100644 --- a/src/libos/src/util/sync/rw_lock/inner.rs +++ b/src/libos/src/util/sync/rw_lock/inner.rs @@ -52,9 +52,15 @@ use super::*; use crate::process::{futex_wait, futex_wake}; +use std::convert::{TryFrom, TryInto}; use std::hint; use std::sync::atomic::{AtomicI32, Ordering}; +/// The number of times to spin before sleeping +/// In musl's implementation, this is `100`. Considering more overhead in SGX environment, +/// here we make it bigger. +const SPIN_COUNT: usize = 1000; + // The implementaion of RwLock // // rw_lock: the highest bit holds the "last minute" waiter flag. // @@ -64,22 +70,34 @@ use std::sync::atomic::{AtomicI32, Ordering}; // 1..0x7FFF_FFFE means the number of readers holding the lock. // // rw_waiters: the number of the lock waiters which can be readers or writers -// #[derive(Debug)] -pub struct RwLockInner { - rw_lock: AtomicI32, +pub(super) struct RwLockInner { + status: AtomicRwLockStatus, rw_waiters: AtomicI32, } +#[derive(Debug)] +// This struct is the atomic wrapper for RwLockStatus. 
+struct AtomicRwLockStatus(AtomicI32); + +#[derive(Debug, Copy, Clone)] +#[repr(i32)] +enum RwLockStatus { + Free = 0, // Set by unlocking thread. + ReaderLocked(i32), // Set by locking thread. The number indicates the number of readers who hold the lock + Waiting(i32), // Set by waiting thread. The number is negative and indicates whether it is reader locked or writer locked. + WriterLocked = i32::MAX, // Set by locking thread. A writer is holding the lock. +} + impl RwLockInner { - pub fn new() -> Self { + pub(super) fn new() -> Self { Self { - rw_lock: AtomicI32::new(0), + status: AtomicRwLockStatus::init(), rw_waiters: AtomicI32::new(0), } } - pub fn read(&self) -> Result<()> { + pub(super) fn read(&self) -> Result<()> { let ret = self.try_read(); if let Err(error) = &ret { // Return error if the reader number reaches the limit of i32 @@ -91,19 +109,19 @@ impl RwLockInner { return ret; } - // Spin shortly for a probably approaching lock release - // if no one is waiting but the lock is held - let mut spins: i32 = 100; + // Spin shortly for a probably approaching lock release if no one is waiting but the lock is held + let mut spins = SPIN_COUNT; while spins != 0 - && self.rw_lock.load(Ordering::SeqCst) != 0 - && self.rw_waiters.load(Ordering::SeqCst) == 0 + && self.status.is_locked() + // Can't reorder here. `Relaxed` is enough. + && self.rw_waiters.load(Ordering::Relaxed) == 0 { hint::spin_loop(); spins -= 1; } loop { - let mut ret = self.try_read(); + let ret = self.try_read(); if let Err(error) = &ret { if error.errno() == Errno::EAGAIN { return ret; @@ -112,21 +130,38 @@ impl RwLockInner { return ret; } - let val: i32 = self.rw_lock.load(Ordering::SeqCst); - if (val & 0x7FFF_FFFF) != 0x7FFF_FFFF { - continue; + // Check status again + let current_status = self.status(); + match current_status.get_locker() { + // If it is free or locked by readers, try_read should succeed. 
+ RwLockStatus::Free | RwLockStatus::ReaderLocked(_) => { + continue; + } + _ => {} } + // Someone is holding the write lock. Need to wait. + debug_assert!(current_status.get_locker() == RwLockStatus::WriterLocked); - // Add rw_waiters before setting rw_lock to not to miss any waiters - // after the waiter flag is set - self.rw_waiters.fetch_add(1, Ordering::SeqCst); + // Add rw_waiters before setting status so as not to miss any waiters after the waiting flag is set + // This can be reordered and in try_set_new_status, `AcqRel` will make sure this happens before. + self.rw_waiters.fetch_add(1, Ordering::Relaxed); - let tmp = (val as u32 | 0x8000_0000) as i32; - self.rw_lock .compare_exchange(val, tmp, Ordering::SeqCst, Ordering::SeqCst); - ret = futex_wait(&self.rw_lock as *const _ as *const i32, tmp, &None); + // new_status indicates whether it waits for the reader lock or the writer lock + let new_status = current_status.set_waiting(); - self.rw_waiters.fetch_sub(1, Ordering::SeqCst); + // Ignore the result here because if setting the new_status fails, the wait will not block. 
+ self.status + .try_set_new_status(current_status, new_status) + .map_err(|e| errno!(e.errno(), "failed to set RwLock status")) + .ok(); + + let ret = futex_wait( + &self.status as *const _ as *const i32, + new_status.as_i32(), + &None, + ); + + self.rw_waiters.fetch_sub(1, Ordering::Relaxed); if let Err(error) = &ret { match error.errno() { @@ -137,62 +172,71 @@ impl RwLockInner { } } - pub fn try_read(&self) -> Result<()> { + pub(super) fn try_read(&self) -> Result<()> { loop { - let val: i32 = self.rw_lock.load(Ordering::SeqCst); - let cnt: i32 = val & 0x7FFF_FFFF; - if cnt == 0x7FFF_FFFF { - return_errno!(EBUSY, "a writer is holding the lock"); - } - if cnt == 0x7FFF_FFFE { - return_errno!(EAGAIN, "the maximum number of read locks has been exceeded"); + let current_status = self.status(); + let locker = current_status.get_locker(); + match locker { + RwLockStatus::Free => {} + RwLockStatus::WriterLocked => { + return_errno!(EBUSY, "a writer is holding the lock"); + } + RwLockStatus::ReaderLocked(cnt) => { + if cnt == RwLockStatus::max_read_lock_holder_num() { + return_errno!(EAGAIN, "the maximum number of read locks has reached"); + } + } + _ => unreachable!(), } - if self - .rw_lock - .compare_exchange(val, val + 1, Ordering::SeqCst, Ordering::SeqCst) - .is_ok() - { + if self.status.try_add_one_reader(current_status).is_ok() { break; } } Ok(()) } - pub fn write(&self) -> Result<()> { - let ret = self.try_write(); - if ret.is_ok() { + pub(super) fn write(&self) -> Result<()> { + if let Ok(_) = self.try_write() { return Ok(()); } - let mut spins: i32 = 100; + let mut spins = SPIN_COUNT; while spins != 0 - && self.rw_lock.load(Ordering::SeqCst) != 0 - && self.rw_waiters.load(Ordering::SeqCst) == 0 + && self.status.is_locked() + // Can't reorder here. `Relaxed` is enough. 
+ && self.rw_waiters.load(Ordering::Relaxed) == 0 { hint::spin_loop(); spins -= 1; } loop { - let mut ret = self.try_write(); - if ret.is_ok() { + if let Ok(_) = self.try_write() { return Ok(()); } - let val = self.rw_lock.load(Ordering::SeqCst); - if val == 0 { + let status = self.status(); + if status == RwLockStatus::Free { continue; } - self.rw_waiters.fetch_add(1, Ordering::SeqCst); + // Add rw_waiters before setting status so as not to miss any waiters after the waiting flag is set. + // This can be reordered and in try_set_new_status, `AcqRel` will make sure this happens before. + self.rw_waiters.fetch_add(1, Ordering::Relaxed); - let tmp = (val as u32 | 0x8000_0000) as i32; - self.rw_lock .compare_exchange(val, tmp, Ordering::SeqCst, Ordering::SeqCst); - ret = futex_wait(&self.rw_lock as *const _ as *const i32, tmp, &None); + let new_status = status.set_waiting(); + self.status .try_set_new_status(status, new_status) .map_err(|e| errno!(e.errno(), "failed to set RwLock status")) .ok(); + let ret = futex_wait( + &self.status as *const _ as *const i32, + new_status.as_i32(), + &None, + ); - self.rw_waiters.fetch_sub(1, Ordering::SeqCst); + self.rw_waiters.fetch_sub(1, Ordering::Relaxed); if let Err(error) = &ret { match error.errno() { @@ -203,64 +247,219 @@ impl RwLockInner { } } - pub fn try_write(&self) -> Result<()> { - if self - .rw_lock - .compare_exchange(0, 0x7FFF_FFFF, Ordering::SeqCst, Ordering::SeqCst) - .is_ok() - { + pub(super) fn try_write(&self) -> Result<()> { + if self.status.try_set_writer_locked().is_ok() { Ok(()) } else { Err(errno!(EBUSY, "the lock is held for reading or writing")) } } - pub fn rw_unlock(&self) -> Result<()> { - let mut val: i32 = 0; - let mut cnt: i32 = 0; - let mut waiters: i32 = 0; - let mut new: i32 = 0; + pub(super) fn rw_unlock(&self) -> Result<()> { + let mut status; + let mut waiters; + let mut new_status; + // Set status to Free or subtract one reader lock holder loop { - // Reverse access order to rw_lock 
and rw_waiters of that in lock - val = self.rw_lock.load(Ordering::SeqCst); - cnt = val & 0x7FFF_FFFF; - waiters = self.rw_waiters.load(Ordering::SeqCst); - new = match cnt { - 1 | 0x7FFF_FFFF => 0, - // val - 1 applies to both positive and negative value as: + status = self.status(); + new_status = match status.get_lock_holder_num() { + 1 => RwLockStatus::Free, + // status - 1 applies to both positive and negative value as: // (i32 & 0x7FFF_FFFF) -1 = (i32 - 1) & 0x7FFF_FFFF - _ => val - 1, + _ => (status.as_i32() - 1).try_into().unwrap(), }; - if self - .rw_lock - .compare_exchange(val, new, Ordering::SeqCst, Ordering::SeqCst) - .is_ok() - { + if self.status.try_set_new_status(status, new_status).is_ok() { + // This can't be reordered. `Relaxed` is enough. + waiters = self.rw_waiters.load(Ordering::Relaxed); break; } } // Use both waiters and val in the condition to trigger the wake as much as possible // and also to guard the situation where the number of waiters overflows to zero - if new == 0 && (waiters != 0 || val < 0) { - // The reasons to use cnt other than waiters here: - // For read_unlock, only one waiter which must be a writer needs to be waken; - // For write_unlock, at most 0x7FFF_FFFF waiters can be waken. - futex_wake(&self.rw_lock as *const _ as *const i32, cnt as usize); + if new_status == RwLockStatus::Free && (waiters != 0 || status.is_waiting()) { + let wake_num = status.get_waking_num(); + futex_wake(&self.status as *const _ as *const i32, wake_num as usize); } Ok(()) } - pub fn read_unlock(&self) -> Result<()> { + pub(super) fn read_unlock(&self) -> Result<()> { self.rw_unlock() } - pub fn write_unlock(&self) -> Result<()> { + pub(super) fn write_unlock(&self) -> Result<()> { self.rw_unlock() } - pub fn destroy(&self) -> Result<()> { + fn status(&self) -> RwLockStatus { + // Use `Acquire` here to make sure all memory access before are completed. 
+ self.status.0.load(Ordering::Acquire).try_into().unwrap() + } +} + +// For AtomicRwLockStatus, global ordering is not needed. `Acquire` and `Release` are enough for the atomic operations. +impl AtomicRwLockStatus { + fn init() -> Self { + Self(AtomicI32::new(RwLockStatus::init().as_i32())) + } + + fn is_free(&self) -> bool { + self.0.load(Ordering::Acquire) == RwLockStatus::Free.as_i32() + } + + fn is_locked(&self) -> bool { + self.0.load(Ordering::Acquire) != RwLockStatus::Free.as_i32() + } + + fn is_reader_locked(&self) -> bool { + self.0.load(Ordering::Acquire) & 0x7FFF_FFFF != 0x7FFF_FFFF + } + + fn try_add_one_reader(&self, current_status: RwLockStatus) -> Result<()> { + let status_raw = current_status.as_i32(); + if let Err(_) = self.0.compare_exchange( + status_raw, + status_raw + 1, + Ordering::AcqRel, + Ordering::Relaxed, + ) { + return_errno!(EAGAIN, "current status changed, try again"); + } else { + Ok(()) + } + } + + fn try_set_writer_locked(&self) -> Result<()> { + if let Err(_) = self.0.compare_exchange( + RwLockStatus::Free.as_i32(), + RwLockStatus::WriterLocked.as_i32(), + Ordering::AcqRel, + Ordering::Relaxed, + ) { + return_errno!(EBUSY, "try set writer locked failed"); + } else { + Ok(()) + } + } + + // This function is called both in lock and unlock, which need different type of ordering. + fn try_set_new_status( + &self, + current_status: RwLockStatus, + new_status: RwLockStatus, + ) -> Result<()> { + if let Err(_) = self.0.compare_exchange( + current_status.as_i32(), + new_status.as_i32(), + Ordering::AcqRel, + Ordering::Relaxed, // We don't care failure thus make it `Relaxed`. 
) { + return_errno!(EAGAIN, "try set waiting failed"); + } Ok(()) } } + +impl RwLockStatus { + fn init() -> Self { + RwLockStatus::Free + } + + fn reader_num(&self) -> i32 { + match self { + RwLockStatus::ReaderLocked(num) => { + debug_assert!(*num > 0); + *num + } + _ => 0, + } + } + + fn max_read_lock_holder_num() -> i32 { + i32::MAX - 1 // 0x7FFF_FFFE + } + + fn get_locker(&self) -> RwLockStatus { + let num = self.as_i32() & 0x7FFF_FFFF; + debug_assert!(num >= 0); + match num { + 0 => RwLockStatus::Free, + i32::MAX => RwLockStatus::WriterLocked, + _ => RwLockStatus::ReaderLocked(num), + } + } + + fn get_lock_holder_num(&self) -> i32 { + let locker = self.get_locker(); + match locker { + RwLockStatus::Free => return 0, + // One reader holder or one writer holder + RwLockStatus::ReaderLocked(1) | RwLockStatus::WriterLocked => return 1, + RwLockStatus::ReaderLocked(num) => return num, + _ => unreachable!(), // can't be Waiting + } + } + + fn get_waking_num(&self) -> i32 { + let locker = self.get_locker(); + match locker { + // For write_unlock, wake as much as possible (i32::MAX) + RwLockStatus::WriterLocked => RwLockStatus::WriterLocked.as_i32(), + // For read_unlock (last read lock holder), only one waiter which must be a writer needs to be woken; + RwLockStatus::ReaderLocked(num) => { + debug_assert!(num == 1); + return num; + } + // This function is supposed to be called only in wake(). For other situations, wake should not be called. 
+ _ => unreachable!(), + } + } + + fn is_waiting(&self) -> bool { + match self { + RwLockStatus::Waiting(_) => true, + _ => false, + } + } + + #[allow(overflowing_literals)] + fn set_waiting(&self) -> RwLockStatus { + RwLockStatus::Waiting(self.as_i32() | 0x8000_0000) + } + + fn as_i32(&self) -> i32 { + match self { + RwLockStatus::Free => 0, + RwLockStatus::ReaderLocked(num) => *num, + RwLockStatus::WriterLocked => i32::MAX, + RwLockStatus::Waiting(num) => *num, + } + } +} + +impl PartialEq for RwLockStatus { + fn eq(&self, other: &Self) -> bool { + self.as_i32() == other.as_i32() + } +} + +impl Eq for RwLockStatus {} + +impl TryFrom for RwLockStatus { + type Error = Error; + + fn try_from(v: i32) -> Result { + match v { + x if x == RwLockStatus::Free.as_i32() => Ok(RwLockStatus::Free), + x if x == RwLockStatus::WriterLocked.as_i32() => Ok(RwLockStatus::WriterLocked), + x if x > RwLockStatus::Free.as_i32() && x < RwLockStatus::WriterLocked.as_i32() => { + Ok(RwLockStatus::ReaderLocked(x)) + } + // negative means someone is waiting, and we also need to keep track of the number of lock holders + x if x < RwLockStatus::Free.as_i32() => Ok(RwLockStatus::Waiting(x)), + _ => return_errno!(EINVAL, "Invalid RwLock status"), + } + } +} diff --git a/src/libos/src/util/sync/rw_lock/mod.rs b/src/libos/src/util/sync/rw_lock/mod.rs index 0d7382e1..fd20a547 100644 --- a/src/libos/src/util/sync/rw_lock/mod.rs +++ b/src/libos/src/util/sync/rw_lock/mod.rs @@ -17,7 +17,8 @@ pub struct RwLock { } unsafe impl Send for RwLock {} -unsafe impl Sync for RwLock {} +// RwLock doesn't need T to be Sync here because RwLock doesn't access T directly. 
+unsafe impl Sync for RwLock {} // The RAII guard for read that can be held by many readers pub struct RwLockReadGuard<'a, T: ?Sized + 'a> { @@ -86,7 +87,6 @@ impl RwLock { (ptr::read(inner), ptr::read(data)) }; mem::forget(self); - inner.destroy(); drop(inner); Ok(data.into_inner()) @@ -99,13 +99,6 @@ impl RwLock { } } -// Use may_dangle to assert not to access T -unsafe impl<#[may_dangle] T: ?Sized> Drop for RwLock { - fn drop(&mut self) { - self.inner.destroy().unwrap(); - } -} - impl Default for RwLock { fn default() -> RwLock { RwLock::new(Default::default())