// chj_unix_util/polling_signals.rs

1use std::{
2    fmt::Debug,
3    fs::OpenOptions,
4    io::Write,
5    mem::transmute,
6    os::unix::fs::OpenOptionsExt,
7    path::Path,
8    sync::{
9        atomic::{AtomicU64, Ordering},
10        Arc,
11    },
12};
13
14use memmap2::{MmapMut, MmapOptions};
15
/// The atomic integer type backing the file-mapped counter. Its size
/// determines the expected length of the backing file.
type PollingSignalsAtomic = AtomicU64;
17
/// A filesystem path based cross-process atomic counter.
///
/// The counter value is stored in a file that is memory-mapped; every
/// process mapping the same path operates on the same value.
pub struct IPCAtomicU64 {
    /// Writable mapping of the backing file. Its first
    /// `size_of::<PollingSignalsAtomic>()` bytes are accessed as an
    /// `AtomicU64`.
    mmap: MmapMut,
}
22
23impl Debug for IPCAtomicU64 {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        f.debug_struct("IPCAtomicU64").finish()
26    }
27}
28
/// Errors that can occur while opening an `IPCAtomicU64`.
#[derive(thiserror::Error, Debug)]
pub enum IPCAtomicError {
    /// An underlying I/O operation (opening, inspecting, initializing or
    /// memory-mapping the backing file) failed.
    #[error("IO error: {0}")]
    IOError(#[from] std::io::Error),
    /// The backing file exists but its length (the payload, in bytes) is
    /// neither 0 nor the size of the counter.
    #[error("invalid length {0} of file contents")]
    InvalidFileContentsLength(u64),
}
36
37impl IPCAtomicU64 {
38    pub fn open(path: &Path, initial_value: u64) -> Result<Self, IPCAtomicError> {
39        let mut opts = OpenOptions::new();
40        opts.read(true);
41        opts.write(true);
42        opts.truncate(false);
43        opts.create(true);
44        opts.mode(0o600); // XX how to make portable?
45        let mut file = opts.open(path)?;
46        let m = file.metadata()?;
47        let l = m.len();
48        const PSA_SIZE: usize = size_of::<PollingSignalsAtomic>();
49        const PSA_LEN: u64 = PSA_SIZE as u64;
50        match l {
51            0 => {
52                let a = PollingSignalsAtomic::new(initial_value);
53                let b: &[u8; PSA_SIZE] = unsafe { transmute(&a) };
54                file.write_all(b)?;
55            }
56            PSA_LEN => (),
57            _ => Err(IPCAtomicError::InvalidFileContentsLength(l))?,
58        }
59        let mmap = unsafe { MmapOptions::new().len(PSA_SIZE).map(&file)?.make_mut()? };
60        Ok(Self { mmap })
61    }
62
63    #[inline]
64    pub fn atomic(&self) -> &AtomicU64 {
65        let Self { mmap } = self;
66        let value: &[u8; size_of::<PollingSignalsAtomic>()] = (&(**mmap)
67            [0..size_of::<PollingSignalsAtomic>()])
68            .try_into()
69            .expect("same size of PollingSignalsAtomic bytes");
70        let ptr = value.as_ptr() as *const AtomicU64;
71        unsafe { &*ptr }
72    }
73
74    #[inline]
75    pub fn load(&self) -> u64 {
76        self.atomic().load(Ordering::SeqCst)
77    }
78
79    #[inline]
80    pub fn store(&self, val: u64) {
81        self.atomic().store(val, Ordering::SeqCst)
82    }
83
84    #[inline]
85    pub fn inc(&self) -> u64 {
86        self.atomic().fetch_add(1, Ordering::SeqCst)
87    }
88
89    /// Returns the last-seen value from reading when `f` returns None
90    /// as Err; returns Ok with the new value if `f` returned Some
91    /// (and storing succeeded--if it failed, it retries until `f`
92    /// returns None).
93    #[inline]
94    pub fn fetch_update(&self, f: impl FnMut(u64) -> Option<u64>) -> Result<u64, u64> {
95        self.atomic()
96            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, f)
97    }
98}
99
/// A filesystem path based way to poll for 'signals'. Cloning makes
/// another receiver that checks independently from the original (but
/// both receive the same 'signals').
#[derive(Debug, Clone)]
pub struct PollingSignals {
    /// Counter value as of this receiver's last check; signals are
    /// reported as the wrapping difference between the shared counter and
    /// this value.
    seen: u64,
    /// The shared, file-backed counter that sending a signal increments.
    atomic: Arc<IPCAtomicU64>,
}
108
/// A filesystem path based way to poll for 'signals'. When receiving
/// a signal, it has to be confirmed explicitly, and the confirmation
/// is shared amongst all readers (usable for "work was done" on a
/// 'versioned' input; the work confirmed refers to the version of the
/// signal that was read; the confirmation with the highest signal
/// value is stored). Note that multiple readers might start doing the
/// same work at the same time. Cloning shares the same receiver and
/// sender.
#[derive(Debug, Clone)]
pub struct SharedPollingSignals {
    /// File-backed counter holding the highest signal value that has been
    /// confirmed so far.
    done: Arc<IPCAtomicU64>,
    /// File-backed counter that sending a signal increments.
    atomic: Arc<IPCAtomicU64>,
}
122
/// Sending-only 'end' of a `PollingSignals` or
/// `SharedPollingSignals`. Can be cloned to give yet another sending
/// end.
#[derive(Debug, Clone)]
pub struct PollingSignalsSender {
    /// The file-backed signal counter, shared with the receiver this
    /// sender was created from.
    atomic: Arc<IPCAtomicU64>,
}
130
131impl PollingSignals {
132    pub fn open(path: &Path, initial_value: u64) -> Result<Self, IPCAtomicError> {
133        let atomic = IPCAtomicU64::open(path, initial_value)?.into();
134        let mut s = Self {
135            seen: initial_value,
136            atomic,
137        };
138        s.get_number_of_signals();
139        Ok(s)
140    }
141
142    /// Check how many signals were received since the last check.
143    pub fn get_number_of_signals(&mut self) -> u64 {
144        let Self { seen, atomic } = self;
145        let new_seen = atomic.load();
146        let d = new_seen.wrapping_sub(*seen);
147        *seen = new_seen;
148        d
149    }
150
151    /// Check whether there were any signals (just
152    /// `get_number_of_signals() > 0`)
153    pub fn got_signals(&mut self) -> bool {
154        self.get_number_of_signals() > 0
155    }
156
157    /// Send one signal. This is excluded from this `PollingSignals`
158    /// instance, i.e. `get_number_of_signals()` will not report
159    /// it. Returns the previous value.
160    pub fn send_signal(&mut self) -> u64 {
161        let Self { seen, atomic } = self;
162        *seen = seen.wrapping_add(1);
163        atomic.inc()
164    }
165
166    pub fn sender(&self) -> PollingSignalsSender {
167        let atomic = self.atomic.clone();
168        PollingSignalsSender { atomic }
169    }
170}
171
/// A received signal. This is a (dynamically-checked) 'linear type'
/// value: it must be confirmed via `confirm` or `ignore`; dropping it
/// panics!
#[derive(Debug)]
#[must_use]
pub struct Signal<'t> {
    /// Value of the signal counter that was read when this signal was
    /// handed out; `confirm` records this value.
    seen: u64,
    /// The shared confirmation counter that `confirm` updates.
    done: &'t IPCAtomicU64,
    // careful: this struct is forgotten, do not add any non-Copy
    // types!
}
183
184impl<'t> Drop for Signal<'t> {
185    fn drop(&mut self) {
186        panic!("{self:?} must be passed to the confirm or ignore methods but was dropped")
187    }
188}
189
190impl<'t> Signal<'t> {
191    /// Returns true if we prevailed, false if another actor updated
192    /// past us (which is usually fine, too).
193    pub fn confirm(self) -> bool {
194        let Self { seen, done } = self;
195        std::mem::forget(self);
196        match done.fetch_update(|new_seen| if seen > new_seen { Some(seen) } else { None }) {
197            Ok(_) => true,
198            Err(_) => false,
199        }
200    }
201
202    pub fn ignore(self) {
203        std::mem::forget(self)
204    }
205}
206
207impl SharedPollingSignals {
208    pub fn open(
209        channel_path: &Path,
210        done_path: &Path,
211        initial_value: u64,
212    ) -> Result<Self, IPCAtomicError> {
213        let atomic = IPCAtomicU64::open(channel_path, initial_value)?.into();
214        let done = IPCAtomicU64::open(done_path, initial_value)?.into();
215        Ok(Self { done, atomic })
216    }
217
218    /// If there were signals, returns a representation of the last
219    /// one. `confirm` or `ignore` must be called on it when the
220    /// action warranted by the signal has been carried out, dropping
221    /// it will panic!
222    pub fn get_latest_signal(&self) -> Option<Signal<'_>> {
223        let Self { done, atomic } = self;
224        // XX these are two separate loads. Still OK since loading
225        // with SeqCst and we have a sequence?
226        let seen = atomic.load();
227        let done_value = done.load();
228        if seen > done_value {
229            Some(Signal {
230                seen,
231                done: &*self.done,
232            })
233        } else {
234            None
235        }
236    }
237
238    pub fn sender(&self) -> PollingSignalsSender {
239        let atomic = self.atomic.clone();
240        PollingSignalsSender { atomic }
241    }
242
243    // Do we really want a send_signal() method here? Just always use
244    // sender().send_signal() instead?
245}
246
247impl PollingSignalsSender {
248    /// Send one signal. Returns the previous value.
249    pub fn send_signal(&self) -> u64 {
250        self.atomic.inc()
251    }
252}