evobench_tools/utillib/rayon_util/
semaphore.rs

1use std::{
2    num::NonZeroU32,
3    sync::atomic::{AtomicU32, Ordering},
4    thread,
5};
6
7use crate::debug;
8
9pub struct Semaphore(AtomicU32);
10
11pub struct SemaphoreHolder<'t>(&'t Semaphore);
12
13impl<'t> Drop for SemaphoreHolder<'t> {
14    fn drop(&mut self) {
15        self.0.0.fetch_add(1, Ordering::SeqCst);
16    }
17}
18
19impl Semaphore {
20    pub fn new(parallelism: NonZeroU32) -> Self {
21        Self(parallelism.get().into())
22    }
23
24    /// Meant to be run from a thread in a rayon thread pool, so that
25    /// it can efficiently yield to other rayon work. If you give
26    /// `panic_if_not_rayon == true` then it will panic if this is not
27    /// the case.
28    pub fn acquire_rayon(&self, panic_if_not_rayon: bool) -> SemaphoreHolder<'_> {
29        let mut os_yields: usize = 0;
30        loop {
31            // Tempting: instead use signed atomic, decrement with
32            // atomic dec, then check if the old value is >= 0, and if
33            // not increment again. But that leads to potentially
34            // multiple parties trying to acquire to all need to
35            // increment again (plus a drop from a holder) until one
36            // can succeed again, which seems problematic; and it will
37            // always have two writes at the limit, which might even
38            // make it slower without the scheduling problem. Thus,
39            // leave it at fetch_update.
40            if self
41                .0
42                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |old| old.checked_sub(1))
43                .is_ok()
44            {
45                if os_yields > 0 {
46                    debug!("acquire_rayon: yielded to the OS {os_yields} times");
47                }
48                return SemaphoreHolder(self);
49            }
50            match rayon::yield_now() {
51                Some(x) => match x {
52                    rayon::Yield::Executed => continue,
53                    rayon::Yield::Idle => (),
54                },
55                None => {
56                    if panic_if_not_rayon {
57                        panic!("not running in a rayon thread pool")
58                    }
59                }
60            }
61            os_yields += 1;
62            thread::yield_now();
63        }
64    }
65
66    /// Not efficient, yields to the OS on each failure, and without
67    /// blocking!
68    pub fn acquire_os(&self) -> SemaphoreHolder<'_> {
69        loop {
70            if self
71                .0
72                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |old| old.checked_sub(1))
73                .is_ok()
74            {
75                return SemaphoreHolder(self);
76            }
77            thread::yield_now();
78        }
79    }
80}