evobench_tools/key_val_fs/
queue.rs

1use std::{
2    borrow::Cow,
3    fmt::{Debug, Display},
4    fs::File,
5    path::{Path, PathBuf},
6    sync::atomic::AtomicU64,
7    time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use chj_unix_util::polling_signals::PollingSignalsSender;
11use chrono::{DateTime, Local};
12use genawaiter::rc::Gen;
13use ouroboros::self_referencing;
14use serde::{Serialize, de::DeserializeOwned};
15
16use crate::{
17    info_if,
18    io_utils::lockable_file::{ExclusiveFileLock, LockableFile, SharedFileLock},
19    utillib::slice_or_box::SliceOrBox,
20};
21
22use super::{
23    as_key::AsKey,
24    key_val::{Entry, KeyVal, KeyValConfig, KeyValError},
25};
26
/// Hand out the next value of a process-wide counter, starting at 0.
fn next_id() -> u64 {
    use std::sync::atomic::Ordering;

    static COUNTER: AtomicU64 = AtomicU64::new(0);
    // `Relaxed` gives no ordering with regard to other memory
    // operations of the calling threads, but the increment itself is
    // atomic, so every caller still receives a distinct value, which
    // is all that is needed here.
    COUNTER.fetch_add(1, Ordering::Relaxed)
}
34
/// Queue key consisting of a timestamp, a process id and a counter
/// value (see `TimeKey::now` for how the fields are filled in).
///
/// The derived `PartialOrd`/`Ord` compare the fields in declaration
/// order: first `nanos`, then `pid`, then `id`. Do not reorder the
/// fields without intending to change the sort order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct TimeKey {
    /// Nanoseconds since UNIX_EPOCH
    nanos: u128,
    /// Process id (`std::process::id()` when created via `now`)
    pid: u32,
    /// Counter value (from `next_id()` when created via `now`)
    id: u64,
}
42
43impl Display for TimeKey {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        let Self { nanos: _, pid, id } = self;
46
47        let t = self.datetime();
48        write!(f, "{t} ({pid}-{id})")
49    }
50}
51
52impl TimeKey {
53    /// Possibly panics if the system clock is outside the range
54    /// representable as duration by `std::time`.
55    pub fn now() -> Self {
56        let time = SystemTime::now();
57        let t = time
58            .duration_since(UNIX_EPOCH)
59            .expect("now is never out of range");
60        let nanos: u128 = t.as_nanos();
61        let pid = std::process::id();
62        let id = next_id();
63        Self { nanos, pid, id }
64    }
65
66    pub fn unixtime_nanoseconds(&self) -> u128 {
67        self.nanos
68    }
69
70    pub fn unixtime_seconds_and_nanoseconds(&self) -> (u64, u32) {
71        let nanos = self.nanos;
72        let secs = (nanos / 1_000_000_000) as u64;
73        let nanos = (nanos % 1_000_000_000) as u32;
74        (secs, nanos)
75    }
76
77    pub fn system_time(&self) -> SystemTime {
78        let (secs, nanos) = self.unixtime_seconds_and_nanoseconds();
79        UNIX_EPOCH + Duration::new(secs, nanos)
80    }
81
82    pub fn datetime(&self) -> DateTime<Local> {
83        self.system_time().into()
84    }
85}
86
87impl AsKey for TimeKey {
88    fn as_filename_str(&self) -> Cow<'_, str> {
89        let Self { nanos, pid, id } = self;
90        format!("{nanos}-{pid}-{id}").into()
91    }
92
93    fn try_from_filename_str(file_name: &str) -> Option<Self> {
94        let (nanos, pid_id) = file_name.split_once('-')?;
95        let (pid, id) = pid_id.split_once('-')?;
96        let nanos: u128 = nanos.parse().ok()?;
97        let pid: u32 = pid.parse().ok()?;
98        let id: u64 = id.parse().ok()?;
99        Some(Self { nanos, pid, id })
100    }
101}
102
/// Options controlling how a single `QueueItem` is obtained from an
/// entry (see `QueueItem::from_entry`).
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct QueueGetItemOptions {
    /// Used for debugging in some places
    pub verbose: bool,
    /// Do not attempt to lock entries (default: false). A lock can
    /// still be taken later via `QueueItem::lock_exclusive`.
    pub no_lock: bool,
    /// Instead of blocking to get a lock on an entry, return with an
    /// error if an entry is locked.
    pub error_when_locked: bool,
    /// Delete entry before returning the item (alternatively, call
    /// `delete`).
    pub delete_first: bool,
}
116
/// Options for iterating over the items of a queue (see
/// `Queue::items`).
#[derive(Debug, PartialEq, Clone)]
pub struct QueueIterationOptions {
    /// Wait for entries if the queue is empty (i.e. go on forever)
    pub wait: bool,
    /// Stop at this time if given. Unblocks "wait" (waiting for new
    /// messages), but not currently blocking on locks of entries!
    pub stop_at: Option<SystemTime>,
    /// Sort in reverse
    pub reverse: bool,

    /// Options used for obtaining each individual item
    pub get_item_opts: QueueGetItemOptions,
}
129
/// Lock state held by a `QueueItem`; `T` is the type of the lock
/// guard in the `Lock` case.
#[derive(Debug)]
enum PerhapsLock<'l, T> {
    /// User asked not to lock (`no_lock`); keeps the lockable file
    /// so that a lock can still be taken later.
    NoLock(&'l mut LockableFile<File>),
    /// Entry vanished thus let go lock again
    EntryGone,
    /// Lock
    Lock(T),
}
139
140impl<'l, T> PartialEq for PerhapsLock<'l, T> {
141    fn eq(&self, other: &Self) -> bool {
142        match (self, other) {
143            (Self::NoLock(_), Self::NoLock(_)) => true,
144            (Self::EntryGone, Self::EntryGone) => true,
145            (Self::Lock(_), Self::Lock(_)) => true,
146            _ => false,
147        }
148    }
149}
150
151impl<'l, T> PerhapsLock<'l, T> {
152    fn is_gone(&self) -> bool {
153        match self {
154            PerhapsLock::NoLock(_) => false,
155            PerhapsLock::EntryGone => true,
156            PerhapsLock::Lock(_) => false,
157        }
158    }
159}
160
// A queue entry together with its lock state. Self-referencing (via
// `ouroboros`): `perhaps_lock` mutably borrows from the `entry` and
// `lockable` fields stored in the same struct.
#[self_referencing]
pub struct QueueItem<'basedir, V: DeserializeOwned + Serialize + 'static> {
    // Whether to emit `info_if!` diagnostics
    verbose: bool,
    // The lockable file taken out of `entry` (see `from_entry`)
    lockable: LockableFile<File>,
    // The underlying key-value store entry
    entry: Entry<'basedir, TimeKey, V>,
    #[borrows(mut entry, mut lockable)]
    #[covariant]
    // The lock unless `no_lock` was given.
    perhaps_lock: (
        &'this mut Entry<'basedir, TimeKey, V>,
        PerhapsLock<'this, ExclusiveFileLock<'this, File>>,
    ),
}
174
175impl<'basedir, V: DeserializeOwned + Serialize + 'static + Debug> PartialEq
176    for QueueItem<'basedir, V>
177{
178    fn eq(&self, other: &Self) -> bool {
179        self.borrow_verbose() == other.borrow_verbose()
180            && self.borrow_perhaps_lock() == other.borrow_perhaps_lock()
181    }
182}
183
184impl<'basedir, V: DeserializeOwned + Serialize + 'static + Debug> Debug for QueueItem<'basedir, V> {
185    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186        let perhaps_lock = self.with_perhaps_lock(|v| v);
187        write!(
188            f,
189            "QueueItem {{ verbose: {}, perhaps_lock: {perhaps_lock:?} }}",
190            self.borrow_verbose()
191        )
192    }
193}
194
195impl<'basedir, V: DeserializeOwned + Serialize> QueueItem<'basedir, V> {
196    pub fn from_entry<'s>(
197        mut entry: Entry<'s, TimeKey, V>,
198        base_dir: &PathBuf,
199        opts: QueueGetItemOptions,
200    ) -> Result<QueueItem<'s, V>, KeyValError> {
201        let QueueGetItemOptions {
202            no_lock,
203            error_when_locked,
204            delete_first,
205            verbose,
206        } = opts;
207        let lockable = entry
208            .take_lockable_file()
209            .expect("we have not taken it yet");
210
211        QueueItem::try_new(
212            verbose,
213            lockable,
214            entry,
215            |entry, lockable: &mut LockableFile<File>| -> Result<_, _> {
216                if no_lock {
217                    if delete_first {
218                        entry.delete()?;
219                    }
220                    Ok((entry, PerhapsLock::NoLock(lockable)))
221                } else {
222                    let lock = if error_when_locked {
223                        keyvalerror_from_lock_error(
224                            lockable.try_lock_exclusive(),
225                            base_dir,
226                            entry.target_path(),
227                        )?
228                        .ok_or_else(|| KeyValError::LockTaken {
229                            base_dir: base_dir.clone(),
230                            path: entry.target_path().to_owned(),
231                        })?
232                    } else {
233                        keyvalerror_from_lock_error(
234                            lockable.lock_exclusive(),
235                            base_dir,
236                            entry.target_path(),
237                        )?
238                    };
239                    info_if!(verbose, "got lock");
240                    let exists = entry.exists();
241                    if !exists {
242                        info_if!(verbose, "but entry now deleted by another process");
243                        return Ok((entry, PerhapsLock::EntryGone));
244                    }
245                    if delete_first {
246                        entry.delete()?;
247                    }
248                    Ok((entry, PerhapsLock::Lock(lock)))
249                }
250            },
251        )
252    }
253
254    /// Get the key inside this queue, usable with `get_entry`
255    /// (careful, there is no check against using it in the wrong
256    /// queue!)
257    pub fn key(&self) -> Result<TimeKey, KeyValError> {
258        // (Does not take a lock, in spite of the name of this method
259        // making it sound like.)
260        self.with_perhaps_lock(|(entry, _lock)| entry.key())
261    }
262
263    /// Delete this item now (alternatively, give `delete_first` in
264    /// `QueueIterationOptions`)
265    pub fn delete(&self) -> Result<(), KeyValError> {
266        let deleted = self.with_perhaps_lock(|(entry, _lock)| entry.delete())?;
267        info_if!(*self.borrow_verbose(), "deleted entry: {:?}", deleted);
268        Ok(())
269    }
270
271    /// Lock this item lazily (when `no_lock` == true given, but now a
272    /// lock is needed). If `no_lock` was false, then this gives a
273    /// `KeyValError::AlreadyLocked` error rather than dead-locking.
274    pub fn lock_exclusive<'s>(&'s self) -> Result<ExclusiveFileLock<'s, File>, KeyValError> {
275        let (entry, perhaps_lock) = self.borrow_perhaps_lock();
276        match perhaps_lock {
277            PerhapsLock::NoLock(lockable_file) => {
278                return lockable_file.lock_exclusive().map_err(|error| {
279                    let path = entry.target_path().to_owned();
280                    let base_dir = path.parent().expect("always has a parent dir").to_owned();
281                    KeyValError::IO {
282                        base_dir,
283                        path,
284                        ctx: "QueueItem.lock_exclusive",
285                        error,
286                    }
287                });
288            }
289            PerhapsLock::EntryGone => (),
290            PerhapsLock::Lock(_) => (),
291        }
292        let path = entry.target_path().to_owned();
293        let base_dir = path.parent().expect("always has a parent dir").to_owned();
294        Err(KeyValError::AlreadyLocked { base_dir, path })
295    }
296}
297
/// A queue of values of type `V`, stored in a `KeyVal` store that is
/// keyed by `TimeKey`.
#[derive(Debug, PartialEq)]
pub struct Queue<V: DeserializeOwned + Serialize>(KeyVal<TimeKey, V>);
300
301fn keyvalerror_from_lock_error<V>(
302    res: Result<V, std::io::Error>,
303    base_dir: &PathBuf,
304    path: &Path,
305) -> Result<V, KeyValError> {
306    res.map_err(|error| KeyValError::IO {
307        ctx: "getting lock on file",
308        base_dir: base_dir.clone(),
309        path: path.to_owned(),
310        error,
311    })
312}
313
314impl<V: DeserializeOwned + Serialize + 'static> Queue<V> {
315    pub fn open(
316        base_dir: impl AsRef<Path>,
317        config: KeyValConfig,
318        signal_change: Option<PollingSignalsSender>,
319    ) -> Result<Self, KeyValError> {
320        Ok(Queue(KeyVal::open(base_dir, config, signal_change)?))
321    }
322
323    /// Give access to the underlying key-value database. WARNING: it
324    /// *will* be possible to mess up the queue this way.
325    pub fn key_val(&self) -> &KeyVal<TimeKey, V> {
326        &self.0
327    }
328
329    /// Give access to the underlying key-value database. WARNING: it
330    /// *will* be possible to mess up the queue this way.
331    pub fn key_val_mut(&mut self) -> &mut KeyVal<TimeKey, V> {
332        &mut self.0
333    }
334
335    pub fn base_dir(&self) -> &PathBuf {
336        &self.0.base_dir
337    }
338
339    pub fn get_entry<'s>(
340        &'s self,
341        key: &TimeKey,
342    ) -> Result<Option<Entry<'s, TimeKey, V>>, KeyValError> {
343        self.0.entry_opt(key)
344    }
345
346    pub fn get_item<'s>(
347        &'s self,
348        key: &TimeKey,
349        opts: QueueGetItemOptions,
350    ) -> Result<Option<QueueItem<'s, V>>, KeyValError> {
351        if let Some(entry) = self.0.entry_opt(key)? {
352            Ok(Some(QueueItem::from_entry(entry, self.base_dir(), opts)?))
353        } else {
354            Ok(None)
355        }
356    }
357
358    pub fn lock_exclusive(&self) -> Result<ExclusiveFileLock<'_, File>, KeyValError> {
359        self.0.lock_exclusive()
360    }
361    pub fn lock_shared(&self) -> Result<SharedFileLock<'_, File>, KeyValError> {
362        self.0.lock_shared()
363    }
364
365    pub fn push_front(&self, val: &V) -> Result<(), KeyValError> {
366        let key = TimeKey::now();
367        self.0.insert(&key, val, true)
368    }
369
370    pub fn resolve_entries<'s>(
371        &'s self,
372        keys: SliceOrBox<'s, TimeKey>,
373    ) -> impl Iterator<Item = Result<Entry<'s, TimeKey, V>, KeyValError>> + use<'s, V> {
374        keys.into_iter()
375            .filter_map(|key| self.0.entry_opt(key.as_ref()).transpose())
376    }
377
378    pub fn sorted_keys(
379        &self,
380        wait_for_entries: bool,
381        stop_at: Option<SystemTime>,
382        reverse: bool,
383    ) -> Result<Vec<TimeKey>, KeyValError> {
384        self.0.sorted_keys(wait_for_entries, stop_at, reverse)
385    }
386
387    /// Get all entries in order of insertion according to hires
388    /// system time (assumes correct clocks!). The entries are
389    /// collected at the time of this method call; entries
390    /// disappearing later are skipped, but no entries inserted after
391    /// this method call are returned from the iterator. Because this
392    /// has O(n) cost with the number of entries, and there's no more
393    /// efficient possibility for a `pop_back`, this should be used
394    /// and amortized by handling all entries if possible. If that's
395    /// not possible, just taking the first entry is still currently
396    /// the best the underlying storage can do.
397    pub fn sorted_entries<'s>(
398        &'s self,
399        wait_for_entries: bool,
400        stop_at: Option<SystemTime>,
401        reverse: bool,
402    ) -> Result<
403        impl Iterator<Item = Result<Entry<'s, TimeKey, V>, KeyValError>> + use<'s, V>,
404        KeyValError,
405    > {
406        Ok(self.resolve_entries(self.sorted_keys(wait_for_entries, stop_at, reverse)?.into()))
407    }
408
409    /// Like `sorted_entries`, but (1) allows to lock entries and in
410    /// this case skips over entries that have been deleted by the
411    /// time we have the lock, (2) allows to go on forever, (3) always
412    /// retrieves the values, and offers an easy method to delete the
413    /// entry as well as delete it automatically immediately.
414    pub fn items<'s>(
415        &'s self,
416        opts: QueueIterationOptions,
417    ) -> impl Iterator<Item = Result<(QueueItem<'s, V>, V), KeyValError>> + use<'s, V> {
418        let base_dir = self.0.base_dir.clone();
419        Gen::new(|co| async move {
420            let QueueIterationOptions {
421                wait,
422                stop_at,
423                get_item_opts,
424                reverse,
425            } = opts;
426
427            let mut entries = None;
428            loop {
429                if entries.is_none() {
430                    match self.sorted_entries(wait, stop_at, reverse) {
431                        Ok(v) => entries = Some(v),
432                        Err(e) => {
433                            co.yield_(Err(e)).await;
434                            return;
435                        }
436                    }
437                }
438                if let Some(entry) = entries.as_mut().expect("set 2 lines above").next() {
439                    match entry {
440                        Ok(mut entry) => match entry.get() {
441                            Ok(value) => {
442                                match QueueItem::from_entry(entry, &base_dir, get_item_opts) {
443                                    Ok(item) => {
444                                        if !item.borrow_perhaps_lock().1.is_gone() {
445                                            co.yield_(Ok((item, value))).await;
446                                        }
447                                    }
448                                    Err(e) => {
449                                        co.yield_(Err(e)).await;
450                                        return;
451                                    }
452                                }
453                            }
454                            Err(e) => {
455                                co.yield_(Err(e)).await;
456                                return;
457                            }
458                        },
459                        Err(e) => {
460                            co.yield_(Err(e)).await;
461                            return;
462                        }
463                    }
464                } else {
465                    entries = None;
466                    if !wait {
467                        break;
468                    }
469                }
470            }
471        })
472        .into_iter()
473    }
474}