evobench_tools/utillib/rayon_util/semaphore.rs
1use std::{
2 num::NonZeroU32,
3 sync::atomic::{AtomicU32, Ordering},
4 thread,
5};
6
7use crate::debug;
8
9pub struct Semaphore(AtomicU32);
10
11pub struct SemaphoreHolder<'t>(&'t Semaphore);
12
13impl<'t> Drop for SemaphoreHolder<'t> {
14 fn drop(&mut self) {
15 self.0.0.fetch_add(1, Ordering::SeqCst);
16 }
17}
18
19impl Semaphore {
20 pub fn new(parallelism: NonZeroU32) -> Self {
21 Self(parallelism.get().into())
22 }
23
24 /// Meant to be run from a thread in a rayon thread pool, so that
25 /// it can efficiently yield to other rayon work. If you give
26 /// `panic_if_not_rayon == true` then it will panic if this is not
27 /// the case.
28 pub fn acquire_rayon(&self, panic_if_not_rayon: bool) -> SemaphoreHolder<'_> {
29 let mut os_yields: usize = 0;
30 loop {
31 // Tempting: instead use signed atomic, decrement with
32 // atomic dec, then check if the old value is >= 0, and if
33 // not increment again. But that leads to potentially
34 // multiple parties trying to acquire to all need to
35 // increment again (plus a drop from a holder) until one
36 // can succeed again, which seems problematic; and it will
37 // always have two writes at the limit, which might even
38 // make it slower without the scheduling problem. Thus,
39 // leave it at fetch_update.
40 if self
41 .0
42 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |old| old.checked_sub(1))
43 .is_ok()
44 {
45 if os_yields > 0 {
46 debug!("acquire_rayon: yielded to the OS {os_yields} times");
47 }
48 return SemaphoreHolder(self);
49 }
50 match rayon::yield_now() {
51 Some(x) => match x {
52 rayon::Yield::Executed => continue,
53 rayon::Yield::Idle => (),
54 },
55 None => {
56 if panic_if_not_rayon {
57 panic!("not running in a rayon thread pool")
58 }
59 }
60 }
61 os_yields += 1;
62 thread::yield_now();
63 }
64 }
65
66 /// Not efficient, yields to the OS on each failure, and without
67 /// blocking!
68 pub fn acquire_os(&self) -> SemaphoreHolder<'_> {
69 loop {
70 if self
71 .0
72 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |old| old.checked_sub(1))
73 .is_ok()
74 {
75 return SemaphoreHolder(self);
76 }
77 thread::yield_now();
78 }
79 }
80}