chj_unix_util/
polling_signals.rs1use std::{
2 fmt::Debug,
3 fs::OpenOptions,
4 io::Write,
5 mem::transmute,
6 os::unix::fs::OpenOptionsExt,
7 path::Path,
8 sync::{
9 atomic::{AtomicU64, Ordering},
10 Arc,
11 },
12};
13
14use memmap2::{MmapMut, MmapOptions};
15
/// Atomic integer type stored in the backing file; its size determines the
/// only non-empty file length that `IPCAtomicU64::open` accepts.
type PollingSignalsAtomic = AtomicU64;
17
18pub struct IPCAtomicU64 {
20 mmap: MmapMut,
21}
22
23impl Debug for IPCAtomicU64 {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 f.debug_struct("IPCAtomicU64").finish()
26 }
27}
28
29#[derive(thiserror::Error, Debug)]
30pub enum IPCAtomicError {
31 #[error("IO error: {0}")]
32 IOError(#[from] std::io::Error),
33 #[error("invalid length {0} of file contents")]
34 InvalidFileContentsLength(u64),
35}
36
37impl IPCAtomicU64 {
38 pub fn open(path: &Path, initial_value: u64) -> Result<Self, IPCAtomicError> {
39 let mut opts = OpenOptions::new();
40 opts.read(true);
41 opts.write(true);
42 opts.truncate(false);
43 opts.create(true);
44 opts.mode(0o600); let mut file = opts.open(path)?;
46 let m = file.metadata()?;
47 let l = m.len();
48 const PSA_SIZE: usize = size_of::<PollingSignalsAtomic>();
49 const PSA_LEN: u64 = PSA_SIZE as u64;
50 match l {
51 0 => {
52 let a = PollingSignalsAtomic::new(initial_value);
53 let b: &[u8; PSA_SIZE] = unsafe { transmute(&a) };
54 file.write_all(b)?;
55 }
56 PSA_LEN => (),
57 _ => Err(IPCAtomicError::InvalidFileContentsLength(l))?,
58 }
59 let mmap = unsafe { MmapOptions::new().len(PSA_SIZE).map(&file)?.make_mut()? };
60 Ok(Self { mmap })
61 }
62
63 #[inline]
64 pub fn atomic(&self) -> &AtomicU64 {
65 let Self { mmap } = self;
66 let value: &[u8; size_of::<PollingSignalsAtomic>()] = (&(**mmap)
67 [0..size_of::<PollingSignalsAtomic>()])
68 .try_into()
69 .expect("same size of PollingSignalsAtomic bytes");
70 let ptr = value.as_ptr() as *const AtomicU64;
71 unsafe { &*ptr }
72 }
73
74 #[inline]
75 pub fn load(&self) -> u64 {
76 self.atomic().load(Ordering::SeqCst)
77 }
78
79 #[inline]
80 pub fn store(&self, val: u64) {
81 self.atomic().store(val, Ordering::SeqCst)
82 }
83
84 #[inline]
85 pub fn inc(&self) -> u64 {
86 self.atomic().fetch_add(1, Ordering::SeqCst)
87 }
88
89 #[inline]
94 pub fn fetch_update(&self, f: impl FnMut(u64) -> Option<u64>) -> Result<u64, u64> {
95 self.atomic()
96 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, f)
97 }
98}
99
100#[derive(Debug, Clone)]
104pub struct PollingSignals {
105 seen: u64,
106 atomic: Arc<IPCAtomicU64>,
107}
108
109#[derive(Debug, Clone)]
118pub struct SharedPollingSignals {
119 done: Arc<IPCAtomicU64>,
120 atomic: Arc<IPCAtomicU64>,
121}
122
123#[derive(Debug, Clone)]
127pub struct PollingSignalsSender {
128 atomic: Arc<IPCAtomicU64>,
129}
130
131impl PollingSignals {
132 pub fn open(path: &Path, initial_value: u64) -> Result<Self, IPCAtomicError> {
133 let atomic = IPCAtomicU64::open(path, initial_value)?.into();
134 let mut s = Self {
135 seen: initial_value,
136 atomic,
137 };
138 s.get_number_of_signals();
139 Ok(s)
140 }
141
142 pub fn get_number_of_signals(&mut self) -> u64 {
144 let Self { seen, atomic } = self;
145 let new_seen = atomic.load();
146 let d = new_seen.wrapping_sub(*seen);
147 *seen = new_seen;
148 d
149 }
150
151 pub fn got_signals(&mut self) -> bool {
154 self.get_number_of_signals() > 0
155 }
156
157 pub fn send_signal(&mut self) -> u64 {
161 let Self { seen, atomic } = self;
162 *seen = seen.wrapping_add(1);
163 atomic.inc()
164 }
165
166 pub fn sender(&self) -> PollingSignalsSender {
167 let atomic = self.atomic.clone();
168 PollingSignalsSender { atomic }
169 }
170}
171
172#[derive(Debug)]
176#[must_use]
177pub struct Signal<'t> {
178 seen: u64,
179 done: &'t IPCAtomicU64,
180 }
183
184impl<'t> Drop for Signal<'t> {
185 fn drop(&mut self) {
186 panic!("{self:?} must be passed to the confirm or ignore methods but was dropped")
187 }
188}
189
190impl<'t> Signal<'t> {
191 pub fn confirm(self) -> bool {
194 let Self { seen, done } = self;
195 std::mem::forget(self);
196 match done.fetch_update(|new_seen| if seen > new_seen { Some(seen) } else { None }) {
197 Ok(_) => true,
198 Err(_) => false,
199 }
200 }
201
202 pub fn ignore(self) {
203 std::mem::forget(self)
204 }
205}
206
207impl SharedPollingSignals {
208 pub fn open(
209 channel_path: &Path,
210 done_path: &Path,
211 initial_value: u64,
212 ) -> Result<Self, IPCAtomicError> {
213 let atomic = IPCAtomicU64::open(channel_path, initial_value)?.into();
214 let done = IPCAtomicU64::open(done_path, initial_value)?.into();
215 Ok(Self { done, atomic })
216 }
217
218 pub fn get_latest_signal(&self) -> Option<Signal<'_>> {
223 let Self { done, atomic } = self;
224 let seen = atomic.load();
227 let done_value = done.load();
228 if seen > done_value {
229 Some(Signal {
230 seen,
231 done: &*self.done,
232 })
233 } else {
234 None
235 }
236 }
237
238 pub fn sender(&self) -> PollingSignalsSender {
239 let atomic = self.atomic.clone();
240 PollingSignalsSender { atomic }
241 }
242
243 }
246
247impl PollingSignalsSender {
248 pub fn send_signal(&self) -> u64 {
250 self.atomic.inc()
251 }
252}