Fix the dequeuing order of items in a FutexBucket
This commit is contained in:
parent
6d39587c40
commit
b04aa2d7ea
@ -75,7 +75,7 @@ pub fn futex_wait(
|
|||||||
futex_val: i32,
|
futex_val: i32,
|
||||||
timeout: &Option<timespec_t>,
|
timeout: &Option<timespec_t>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
info!(
|
debug!(
|
||||||
"futex_wait addr: {:#x}, val: {}, timeout: {:?}",
|
"futex_wait addr: {:#x}, val: {}, timeout: {:?}",
|
||||||
futex_addr as usize, futex_val, timeout
|
futex_addr as usize, futex_val, timeout
|
||||||
);
|
);
|
||||||
@ -130,6 +130,11 @@ pub fn futex_wait(
|
|||||||
|
|
||||||
/// Do futex wake
|
/// Do futex wake
|
||||||
pub fn futex_wake(futex_addr: *const i32, max_count: usize) -> Result<usize> {
|
pub fn futex_wake(futex_addr: *const i32, max_count: usize) -> Result<usize> {
|
||||||
|
debug!(
|
||||||
|
"futex_wake addr: {:#x}, max_count: {}",
|
||||||
|
futex_addr as usize, max_count
|
||||||
|
);
|
||||||
|
|
||||||
// Get and lock the futex bucket
|
// Get and lock the futex bucket
|
||||||
let futex_key = FutexKey::new(futex_addr);
|
let futex_key = FutexKey::new(futex_addr);
|
||||||
let (_, futex_bucket_ref) = FUTEX_BUCKETS.get_bucket(futex_key);
|
let (_, futex_bucket_ref) = FUTEX_BUCKETS.get_bucket(futex_key);
|
||||||
@ -266,29 +271,29 @@ impl FutexBucket {
|
|||||||
self.queue.push_back(item);
|
self.queue.push_back(item);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: this is an O(N) operation. Try to make it more efficient
|
||||||
pub fn dequeue_item(&mut self, futex_item: &FutexItem) -> Option<FutexItem> {
|
pub fn dequeue_item(&mut self, futex_item: &FutexItem) -> Option<FutexItem> {
|
||||||
let item_i = self.queue.iter().position(|item| *item == *futex_item);
|
let item_i = self.queue.iter().position(|item| *item == *futex_item);
|
||||||
if item_i.is_none() {
|
if item_i.is_none() {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
self.queue.swap_remove_back(item_i.unwrap())
|
self.queue.remove(item_i.unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: consider using std::future to improve the readability
|
// TODO: consider using std::future to improve the readability
|
||||||
pub fn dequeue_and_wake_items(&mut self, key: FutexKey, max_count: usize) -> usize {
|
pub fn dequeue_and_wake_items(&mut self, key: FutexKey, max_count: usize) -> usize {
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
let mut idx = 0;
|
|
||||||
let mut items_to_wake = Vec::new();
|
let mut items_to_wake = Vec::new();
|
||||||
while count < max_count && idx < self.queue.len() {
|
|
||||||
if key == self.queue[idx].key {
|
self.queue.retain(|item| {
|
||||||
if let Some(item) = self.queue.swap_remove_back(idx) {
|
if count >= max_count || key != item.key {
|
||||||
items_to_wake.push(item);
|
true
|
||||||
count += 1;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
idx += 1;
|
items_to_wake.push(item.clone());
|
||||||
}
|
count += 1;
|
||||||
|
false
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
FutexItem::batch_wake(&items_to_wake);
|
FutexItem::batch_wake(&items_to_wake);
|
||||||
count
|
count
|
||||||
@ -315,18 +320,18 @@ impl FutexBucket {
|
|||||||
max_nrequeues: usize,
|
max_nrequeues: usize,
|
||||||
) -> () {
|
) -> () {
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
let mut idx = 0;
|
|
||||||
while count < max_nrequeues && idx < self.queue.len() {
|
self.queue.retain(|item| {
|
||||||
if key == self.queue[idx].key {
|
if count >= max_nrequeues || key != item.key {
|
||||||
if let Some(mut item) = self.queue.swap_remove_back(idx) {
|
true
|
||||||
item.key = new_key;
|
|
||||||
another.enqueue_item(item);
|
|
||||||
count += 1;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
idx += 1;
|
let mut new_item = item.clone();
|
||||||
}
|
new_item.key = new_key;
|
||||||
|
another.enqueue_item(new_item);
|
||||||
|
count += 1;
|
||||||
|
false
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user