1use std::{
2 borrow::Cow,
3 fmt::{Debug, Display},
4 fs::File,
5 path::{Path, PathBuf},
6 sync::atomic::AtomicU64,
7 time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use chj_unix_util::polling_signals::PollingSignalsSender;
11use chrono::{DateTime, Local};
12use genawaiter::rc::Gen;
13use ouroboros::self_referencing;
14use serde::{Serialize, de::DeserializeOwned};
15
16use crate::{
17 info_if,
18 io_utils::lockable_file::{ExclusiveFileLock, LockableFile, SharedFileLock},
19 utillib::slice_or_box::SliceOrBox,
20};
21
22use super::{
23 as_key::AsKey,
24 key_val::{Entry, KeyVal, KeyValConfig, KeyValError},
25};
26
/// Hands out the next value of a process-wide counter.
///
/// The first call returns 0; every further call returns a value one
/// higher than the previous call.
fn next_id() -> u64 {
    use std::sync::atomic::Ordering;

    static COUNTER: AtomicU64 = AtomicU64::new(0);
    // `fetch_add` yields the value held *before* the increment.
    COUNTER.fetch_add(1, Ordering::Relaxed)
}
34
/// Key of a queue entry: a timestamp plus the creating process id and a
/// per-process sequence number.
///
/// The derived ordering compares `nanos`, then `pid`, then `id` (the
/// field declaration order), so the fields must not be reordered.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct TimeKey {
    /// Nanoseconds since the Unix epoch.
    nanos: u128,
    /// Process id; `TimeKey::now` stores the id of the running process.
    pid: u32,
    /// Sequence number; `TimeKey::now` takes it from a process-wide counter.
    id: u64,
}
42
43impl Display for TimeKey {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 let Self { nanos: _, pid, id } = self;
46
47 let t = self.datetime();
48 write!(f, "{t} ({pid}-{id})")
49 }
50}
51
52impl TimeKey {
53 pub fn now() -> Self {
56 let time = SystemTime::now();
57 let t = time
58 .duration_since(UNIX_EPOCH)
59 .expect("now is never out of range");
60 let nanos: u128 = t.as_nanos();
61 let pid = std::process::id();
62 let id = next_id();
63 Self { nanos, pid, id }
64 }
65
66 pub fn unixtime_nanoseconds(&self) -> u128 {
67 self.nanos
68 }
69
70 pub fn unixtime_seconds_and_nanoseconds(&self) -> (u64, u32) {
71 let nanos = self.nanos;
72 let secs = (nanos / 1_000_000_000) as u64;
73 let nanos = (nanos % 1_000_000_000) as u32;
74 (secs, nanos)
75 }
76
77 pub fn system_time(&self) -> SystemTime {
78 let (secs, nanos) = self.unixtime_seconds_and_nanoseconds();
79 UNIX_EPOCH + Duration::new(secs, nanos)
80 }
81
82 pub fn datetime(&self) -> DateTime<Local> {
83 self.system_time().into()
84 }
85}
86
87impl AsKey for TimeKey {
88 fn as_filename_str(&self) -> Cow<'_, str> {
89 let Self { nanos, pid, id } = self;
90 format!("{nanos}-{pid}-{id}").into()
91 }
92
93 fn try_from_filename_str(file_name: &str) -> Option<Self> {
94 let (nanos, pid_id) = file_name.split_once('-')?;
95 let (pid, id) = pid_id.split_once('-')?;
96 let nanos: u128 = nanos.parse().ok()?;
97 let pid: u32 = pid.parse().ok()?;
98 let id: u64 = id.parse().ok()?;
99 Some(Self { nanos, pid, id })
100 }
101}
102
/// Options controlling how a `QueueItem` is created from an entry.
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct QueueGetItemOptions {
    /// Emit informational messages while locking / deleting.
    pub verbose: bool,
    /// Do not take a lock on the entry when creating the item.
    pub no_lock: bool,
    /// When locking, only *try* to take the lock and report an error if
    /// it is not available. Has no effect when `no_lock` is set.
    pub error_when_locked: bool,
    /// Delete the entry right after any lock was taken, before the item
    /// is returned.
    pub delete_first: bool,
}
116
117#[derive(Debug, PartialEq, Clone)]
118pub struct QueueIterationOptions {
119 pub wait: bool,
121 pub stop_at: Option<SystemTime>,
124 pub reverse: bool,
126
127 pub get_item_opts: QueueGetItemOptions,
128}
129
130#[derive(Debug)]
131enum PerhapsLock<'l, T> {
132 NoLock(&'l mut LockableFile<File>),
134 EntryGone,
136 Lock(T),
138}
139
140impl<'l, T> PartialEq for PerhapsLock<'l, T> {
141 fn eq(&self, other: &Self) -> bool {
142 match (self, other) {
143 (Self::NoLock(_), Self::NoLock(_)) => true,
144 (Self::EntryGone, Self::EntryGone) => true,
145 (Self::Lock(_), Self::Lock(_)) => true,
146 _ => false,
147 }
148 }
149}
150
151impl<'l, T> PerhapsLock<'l, T> {
152 fn is_gone(&self) -> bool {
153 match self {
154 PerhapsLock::NoLock(_) => false,
155 PerhapsLock::EntryGone => true,
156 PerhapsLock::Lock(_) => false,
157 }
158 }
159}
160
161#[self_referencing]
162pub struct QueueItem<'basedir, V: DeserializeOwned + Serialize + 'static> {
163 verbose: bool,
164 lockable: LockableFile<File>,
165 entry: Entry<'basedir, TimeKey, V>,
166 #[borrows(mut entry, mut lockable)]
167 #[covariant]
168 perhaps_lock: (
170 &'this mut Entry<'basedir, TimeKey, V>,
171 PerhapsLock<'this, ExclusiveFileLock<'this, File>>,
172 ),
173}
174
175impl<'basedir, V: DeserializeOwned + Serialize + 'static + Debug> PartialEq
176 for QueueItem<'basedir, V>
177{
178 fn eq(&self, other: &Self) -> bool {
179 self.borrow_verbose() == other.borrow_verbose()
180 && self.borrow_perhaps_lock() == other.borrow_perhaps_lock()
181 }
182}
183
184impl<'basedir, V: DeserializeOwned + Serialize + 'static + Debug> Debug for QueueItem<'basedir, V> {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 let perhaps_lock = self.with_perhaps_lock(|v| v);
187 write!(
188 f,
189 "QueueItem {{ verbose: {}, perhaps_lock: {perhaps_lock:?} }}",
190 self.borrow_verbose()
191 )
192 }
193}
194
195impl<'basedir, V: DeserializeOwned + Serialize> QueueItem<'basedir, V> {
196 pub fn from_entry<'s>(
197 mut entry: Entry<'s, TimeKey, V>,
198 base_dir: &PathBuf,
199 opts: QueueGetItemOptions,
200 ) -> Result<QueueItem<'s, V>, KeyValError> {
201 let QueueGetItemOptions {
202 no_lock,
203 error_when_locked,
204 delete_first,
205 verbose,
206 } = opts;
207 let lockable = entry
208 .take_lockable_file()
209 .expect("we have not taken it yet");
210
211 QueueItem::try_new(
212 verbose,
213 lockable,
214 entry,
215 |entry, lockable: &mut LockableFile<File>| -> Result<_, _> {
216 if no_lock {
217 if delete_first {
218 entry.delete()?;
219 }
220 Ok((entry, PerhapsLock::NoLock(lockable)))
221 } else {
222 let lock = if error_when_locked {
223 keyvalerror_from_lock_error(
224 lockable.try_lock_exclusive(),
225 base_dir,
226 entry.target_path(),
227 )?
228 .ok_or_else(|| KeyValError::LockTaken {
229 base_dir: base_dir.clone(),
230 path: entry.target_path().to_owned(),
231 })?
232 } else {
233 keyvalerror_from_lock_error(
234 lockable.lock_exclusive(),
235 base_dir,
236 entry.target_path(),
237 )?
238 };
239 info_if!(verbose, "got lock");
240 let exists = entry.exists();
241 if !exists {
242 info_if!(verbose, "but entry now deleted by another process");
243 return Ok((entry, PerhapsLock::EntryGone));
244 }
245 if delete_first {
246 entry.delete()?;
247 }
248 Ok((entry, PerhapsLock::Lock(lock)))
249 }
250 },
251 )
252 }
253
254 pub fn key(&self) -> Result<TimeKey, KeyValError> {
258 self.with_perhaps_lock(|(entry, _lock)| entry.key())
261 }
262
263 pub fn delete(&self) -> Result<(), KeyValError> {
266 let deleted = self.with_perhaps_lock(|(entry, _lock)| entry.delete())?;
267 info_if!(*self.borrow_verbose(), "deleted entry: {:?}", deleted);
268 Ok(())
269 }
270
271 pub fn lock_exclusive<'s>(&'s self) -> Result<ExclusiveFileLock<'s, File>, KeyValError> {
275 let (entry, perhaps_lock) = self.borrow_perhaps_lock();
276 match perhaps_lock {
277 PerhapsLock::NoLock(lockable_file) => {
278 return lockable_file.lock_exclusive().map_err(|error| {
279 let path = entry.target_path().to_owned();
280 let base_dir = path.parent().expect("always has a parent dir").to_owned();
281 KeyValError::IO {
282 base_dir,
283 path,
284 ctx: "QueueItem.lock_exclusive",
285 error,
286 }
287 });
288 }
289 PerhapsLock::EntryGone => (),
290 PerhapsLock::Lock(_) => (),
291 }
292 let path = entry.target_path().to_owned();
293 let base_dir = path.parent().expect("always has a parent dir").to_owned();
294 Err(KeyValError::AlreadyLocked { base_dir, path })
295 }
296}
297
298#[derive(Debug, PartialEq)]
299pub struct Queue<V: DeserializeOwned + Serialize>(KeyVal<TimeKey, V>);
300
301fn keyvalerror_from_lock_error<V>(
302 res: Result<V, std::io::Error>,
303 base_dir: &PathBuf,
304 path: &Path,
305) -> Result<V, KeyValError> {
306 res.map_err(|error| KeyValError::IO {
307 ctx: "getting lock on file",
308 base_dir: base_dir.clone(),
309 path: path.to_owned(),
310 error,
311 })
312}
313
314impl<V: DeserializeOwned + Serialize + 'static> Queue<V> {
315 pub fn open(
316 base_dir: impl AsRef<Path>,
317 config: KeyValConfig,
318 signal_change: Option<PollingSignalsSender>,
319 ) -> Result<Self, KeyValError> {
320 Ok(Queue(KeyVal::open(base_dir, config, signal_change)?))
321 }
322
323 pub fn key_val(&self) -> &KeyVal<TimeKey, V> {
326 &self.0
327 }
328
329 pub fn key_val_mut(&mut self) -> &mut KeyVal<TimeKey, V> {
332 &mut self.0
333 }
334
335 pub fn base_dir(&self) -> &PathBuf {
336 &self.0.base_dir
337 }
338
339 pub fn get_entry<'s>(
340 &'s self,
341 key: &TimeKey,
342 ) -> Result<Option<Entry<'s, TimeKey, V>>, KeyValError> {
343 self.0.entry_opt(key)
344 }
345
346 pub fn get_item<'s>(
347 &'s self,
348 key: &TimeKey,
349 opts: QueueGetItemOptions,
350 ) -> Result<Option<QueueItem<'s, V>>, KeyValError> {
351 if let Some(entry) = self.0.entry_opt(key)? {
352 Ok(Some(QueueItem::from_entry(entry, self.base_dir(), opts)?))
353 } else {
354 Ok(None)
355 }
356 }
357
358 pub fn lock_exclusive(&self) -> Result<ExclusiveFileLock<'_, File>, KeyValError> {
359 self.0.lock_exclusive()
360 }
361 pub fn lock_shared(&self) -> Result<SharedFileLock<'_, File>, KeyValError> {
362 self.0.lock_shared()
363 }
364
365 pub fn push_front(&self, val: &V) -> Result<(), KeyValError> {
366 let key = TimeKey::now();
367 self.0.insert(&key, val, true)
368 }
369
370 pub fn resolve_entries<'s>(
371 &'s self,
372 keys: SliceOrBox<'s, TimeKey>,
373 ) -> impl Iterator<Item = Result<Entry<'s, TimeKey, V>, KeyValError>> + use<'s, V> {
374 keys.into_iter()
375 .filter_map(|key| self.0.entry_opt(key.as_ref()).transpose())
376 }
377
378 pub fn sorted_keys(
379 &self,
380 wait_for_entries: bool,
381 stop_at: Option<SystemTime>,
382 reverse: bool,
383 ) -> Result<Vec<TimeKey>, KeyValError> {
384 self.0.sorted_keys(wait_for_entries, stop_at, reverse)
385 }
386
387 pub fn sorted_entries<'s>(
398 &'s self,
399 wait_for_entries: bool,
400 stop_at: Option<SystemTime>,
401 reverse: bool,
402 ) -> Result<
403 impl Iterator<Item = Result<Entry<'s, TimeKey, V>, KeyValError>> + use<'s, V>,
404 KeyValError,
405 > {
406 Ok(self.resolve_entries(self.sorted_keys(wait_for_entries, stop_at, reverse)?.into()))
407 }
408
409 pub fn items<'s>(
415 &'s self,
416 opts: QueueIterationOptions,
417 ) -> impl Iterator<Item = Result<(QueueItem<'s, V>, V), KeyValError>> + use<'s, V> {
418 let base_dir = self.0.base_dir.clone();
419 Gen::new(|co| async move {
420 let QueueIterationOptions {
421 wait,
422 stop_at,
423 get_item_opts,
424 reverse,
425 } = opts;
426
427 let mut entries = None;
428 loop {
429 if entries.is_none() {
430 match self.sorted_entries(wait, stop_at, reverse) {
431 Ok(v) => entries = Some(v),
432 Err(e) => {
433 co.yield_(Err(e)).await;
434 return;
435 }
436 }
437 }
438 if let Some(entry) = entries.as_mut().expect("set 2 lines above").next() {
439 match entry {
440 Ok(mut entry) => match entry.get() {
441 Ok(value) => {
442 match QueueItem::from_entry(entry, &base_dir, get_item_opts) {
443 Ok(item) => {
444 if !item.borrow_perhaps_lock().1.is_gone() {
445 co.yield_(Ok((item, value))).await;
446 }
447 }
448 Err(e) => {
449 co.yield_(Err(e)).await;
450 return;
451 }
452 }
453 }
454 Err(e) => {
455 co.yield_(Err(e)).await;
456 return;
457 }
458 },
459 Err(e) => {
460 co.yield_(Err(e)).await;
461 return;
462 }
463 }
464 } else {
465 entries = None;
466 if !wait {
467 break;
468 }
469 }
470 }
471 })
472 .into_iter()
473 }
474}