evobench_tools/key_val_fs/
key_val.rs

1use std::{
2    ffi::{OsStr, OsString},
3    fmt::Debug,
4    fs::{File, create_dir},
5    io::{Read, Write},
6    marker::PhantomData,
7    path::{Path, PathBuf},
8    thread::sleep,
9    time::{Duration, SystemTime},
10};
11
12use chj_unix_util::polling_signals::PollingSignalsSender;
13use cj_path_util::path_util::AppendToPath;
14use serde::{Serialize, de::DeserializeOwned};
15
16use crate::io_utils::lockable_file::{ExclusiveFileLock, LockableFile, SharedFileLock};
17
18use super::as_key::AsKey;
19
20/// Returns `None` if `file_name` is for a tmp file
21fn key_from_file_name<K: AsKey>(
22    file_name: &OsStr,
23    base_dir: &PathBuf,
24) -> Result<Option<K>, KeyValError> {
25    let file_name = file_name
26        .to_str()
27        .ok_or_else(|| KeyValError::InvalidFileNameInStorage {
28            base_dir: base_dir.clone(),
29            ctx: "does not decode as string",
30            invalid_file_name: file_name.to_owned(),
31        })?;
32    if file_name.starts_with('.') {
33        // tmp file
34        Ok(None)
35    } else {
36        if let Some(key) = K::try_from_filename_str(&file_name) {
37            Ok(Some(key))
38        } else {
39            Err(KeyValError::InvalidFileNameInStorage {
40                base_dir: base_dir.clone(),
41                ctx: "can't be parsed back into key",
42                invalid_file_name: file_name.into(),
43            })
44        }
45    }
46}
47
48// Locking the dir (whole `KeyVal` data structure)
49macro_rules! define_lock_helper {
50    { $name:ident, $lock_type:tt, $method:tt } =>  {
51        fn $name<'l>(
52            lock_file: &'l LockableFile<File>,
53            base_dir: &PathBuf,
54        ) -> Result<$lock_type<'l, File>, KeyValError> {
55            lock_file.$method().map_err(|error| KeyValError::IO {
56                ctx: "getting lock on dir",
57                base_dir: base_dir.clone(),
58                path: base_dir.clone(),
59                error,
60            })
61        }
62    }
63}
64define_lock_helper! {lock_exclusive, ExclusiveFileLock, lock_exclusive}
65define_lock_helper! {lock_shared, SharedFileLock, lock_shared}
66
/// Accessor for an entry, embeds an open file handle, allows to load
/// or lock-and-delete etc.
///
/// Created by `KeyVal::entry_opt`, which opens the value file once; the
/// handle is kept until `take_lockable_file` takes it out. NOTE(review):
/// on Unix the handle keeps referring to the file opened at that time,
/// even if the mapping is replaced or deleted afterwards.
#[derive(Debug)]
pub struct Entry<'p, K: AsKey, V: DeserializeOwned + Serialize> {
    // Markers only; no `K` or `V` values are stored here.
    key_type: PhantomData<fn() -> K>,
    val_type: PhantomData<fn() -> V>,
    // Directory of the `KeyVal` this entry was obtained from.
    base_dir: &'p PathBuf,
    // Path of the value file for the entry's key (inside `base_dir`).
    target_path: PathBuf,
    // Becoming None when calling `take_lockable_file`
    value_file: Option<File>,
    // Signalled after a successful `delete`, if present.
    signal_change: Option<&'p PollingSignalsSender>,
}
79
80impl<'p, K: AsKey, V: DeserializeOwned + Serialize> PartialEq for Entry<'p, K, V> {
81    fn eq(&self, other: &Self) -> bool {
82        self.base_dir == other.base_dir && self.target_path == other.target_path
83    }
84}
85
86impl<'p, K: AsKey, V: DeserializeOwned + Serialize> Entry<'p, K, V> {
87    pub fn target_path(&self) -> &Path {
88        &self.target_path
89    }
90
91    pub fn file_name(&self) -> &OsStr {
92        self.target_path
93            .file_name()
94            .expect("entries always have a file name")
95    }
96
97    /// Can return file name decoding errors for files that were
98    /// inserted into the directory via other means than this library.
99    pub fn key(&self) -> Result<K, KeyValError> {
100        let maybe_key = key_from_file_name(self.file_name(), &self.base_dir)?;
101        Ok(maybe_key.expect("an `Entry` is never created from a tmp file"))
102    }
103
104    pub fn get(&mut self) -> Result<V, KeyValError> {
105        let mut s = String::new();
106        if let Some(file) = &mut self.value_file {
107            file.read_to_string(&mut s)
108                .map_err(|error| KeyValError::IO {
109                    ctx: "reading/UTF-decoding value file",
110                    base_dir: self.base_dir.clone(),
111                    path: self.target_path.clone(),
112                    error,
113                })?;
114            let val: V =
115                serde_json::from_str(&s).map_err(|error| KeyValError::Deserialization {
116                    base_dir: self.base_dir.clone(),
117                    path: self.target_path.clone(),
118                    error,
119                })?;
120            Ok(val)
121        } else {
122            Err(KeyValError::FileTaken {
123                base_dir: self.base_dir.clone(),
124                path: self.target_path.clone(),
125            })
126        }
127    }
128
129    /// Giving up the Entry accessor, means you have to call `get`
130    /// first. (Yes, locking wouldn't protect that anyway! This lock
131    /// is not for that!) Returns None if already taken.
132    pub fn take_lockable_file(&mut self) -> Option<LockableFile<File>> {
133        Some(LockableFile::from(self.value_file.take()?))
134    }
135
136    /// Whether the entry currently (still) exists -- XX But there is
137    /// no guarantee it is still the old one!
138    pub fn exists(&mut self) -> bool {
139        self.target_path.exists()
140    }
141
142    /// Returns whether a file deletion actually happened (concurrent
143    /// deletes might already have removed it). XX Same caveat as with
144    /// `exists`.
145    pub fn delete(&self) -> Result<bool, KeyValError> {
146        match std::fs::remove_file(&self.target_path) {
147            Ok(()) => {
148                if let Some(signal) = &self.signal_change {
149                    signal.send_signal();
150                }
151                Ok(true)
152            }
153            Err(error) => match error.kind() {
154                std::io::ErrorKind::NotFound => Ok(false),
155                _ => Err(KeyValError::IO {
156                    base_dir: self.base_dir.clone(),
157                    path: self.target_path.clone(),
158                    ctx: "deleting the value file",
159                    error,
160                }),
161            },
162        }
163    }
164}
165
166#[derive(thiserror::Error, Debug)]
167pub enum KeyValError {
168    #[error("creating directory {base_dir:?}: {error:#}")]
169    CreateDir {
170        base_dir: PathBuf,
171        error: std::io::Error,
172    },
173    #[error("key_val_fs db at {base_dir:?}: {ctx} {path:?}: {error:#}")]
174    IO {
175        base_dir: PathBuf,
176        path: PathBuf,
177        ctx: &'static str,
178        error: std::io::Error,
179    },
180    #[error("serializing value to JSON")]
181    Serialization {
182        #[from]
183        error: serde_json::Error,
184    },
185    #[error("deserializing value from JSON at {base_dir:?}: file {path:?}: {error:#}")]
186    Deserialization {
187        base_dir: PathBuf,
188        path: PathBuf,
189        error: serde_json::Error,
190    },
191    #[error("mapping already exists for key {key_debug_string} in {base_dir:?}")]
192    KeyExists {
193        base_dir: PathBuf,
194        key_debug_string: String,
195    },
196    #[error("invalid file name in directory in {base_dir:?}: {ctx} {invalid_file_name:?}")]
197    InvalidFileNameInStorage {
198        base_dir: PathBuf,
199        ctx: &'static str,
200        invalid_file_name: OsString,
201    },
202    #[error("usage error: file handle already taken out (in {base_dir:?}: {path:?})")]
203    FileTaken { base_dir: PathBuf, path: PathBuf },
204    #[error("lock is already taken in {base_dir:?} for {path:?}")]
205    LockTaken { base_dir: PathBuf, path: PathBuf },
206    #[error("bug: lock has already been taken, `no_lock` was false")]
207    AlreadyLocked { base_dir: PathBuf, path: PathBuf },
208}
209
/// How much fsyncing `KeyVal` does when it makes changes (selected via
/// `KeyValConfig::sync`).
///
/// NOTE(review): `Entry::delete` removes the value file without syncing
/// the directory, so the `All` guarantee below does not cover it.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum KeyValSync {
    /// Do not call sync; fastest, but may end up with corrupt files
    /// in the database
    No,
    /// Call fsync on files before moving them in place inside the
    /// database. This should prevent the possibility for corrupt
    /// files, but does not guarantee that entries are persisted after
    /// returning from modifying functions.
    Files,
    /// Call fsync on files and then also on the containing dir. This
    /// should guarantee that changes are persisted by the time
    /// functions return.
    All,
}
225
226impl KeyValSync {
227    fn do_sync_files(self) -> bool {
228        match self {
229            KeyValSync::No => false,
230            KeyValSync::Files => true,
231            KeyValSync::All => true,
232        }
233    }
234    fn do_sync_dirs(self) -> bool {
235        match self {
236            KeyValSync::No => false,
237            KeyValSync::Files => false,
238            KeyValSync::All => true,
239        }
240    }
241
242    fn flush_and_perhaps_sync_file(
243        self,
244        file: &mut File,
245        flush_ctx: &'static str,
246        sync_ctx: &'static str,
247        base_dir: &Path,
248        path: &Path,
249    ) -> Result<(), KeyValError> {
250        file.flush().map_err(|error| KeyValError::IO {
251            ctx: flush_ctx,
252            base_dir: base_dir.to_owned(),
253            path: path.to_owned(),
254            error,
255        })?;
256        if self.do_sync_files() {
257            file.sync_all().map_err(|error| KeyValError::IO {
258                ctx: sync_ctx,
259                base_dir: base_dir.to_owned(),
260                path: path.to_owned(),
261                error,
262            })?;
263        }
264        Ok(())
265    }
266
267    fn perhaps_sync_dir(self, dir_file: &File, base_dir: &Path) -> Result<(), KeyValError> {
268        if self.do_sync_dirs() {
269            dir_file.sync_all().map_err(|error| KeyValError::IO {
270                ctx: "sync of the directory",
271                base_dir: base_dir.to_owned(),
272                path: base_dir.to_owned(),
273                error,
274            })?;
275        }
276        Ok(())
277    }
278}
279
/// Configuration for `KeyVal`, passed to `KeyVal::open`.
#[derive(Debug, Clone, PartialEq)]
pub struct KeyValConfig {
    /// Whether to call fsync on files and the containing directory
    /// when doing changes (default: All).
    pub sync: KeyValSync,
    /// Whether to create the directory holding the data, if it
    /// doesn't exist already (default: true). Only the directory
    /// itself is created, not missing parent directories.
    pub create_dir_if_not_exists: bool,
}
290
291impl Default for KeyValConfig {
292    fn default() -> Self {
293        KeyValConfig {
294            sync: KeyValSync::All,
295            create_dir_if_not_exists: true,
296        }
297    }
298}
299
/// A map stored in a directory: each mapping is one file, whose name is
/// the key's file name form (`AsKey`) and whose contents are the value
/// serialized as JSON. Files whose names start with `.` are tmp files
/// used while inserting and are not entries.
#[derive(Debug)]
pub struct KeyVal<K: AsKey, V: DeserializeOwned + Serialize> {
    // Markers only; no keys or values are stored in this struct.
    keys: PhantomData<fn() -> K>,
    vals: PhantomData<fn() -> V>,
    pub config: KeyValConfig,
    // The directory holding the value files.
    pub base_dir: PathBuf,
    // Filehandle to the directory, for flock (was a .lock file, but
    // dir itself works, too, on Linux anyway)
    lock_file: LockableFile<File>,
    // Filehandle to the directory again, for sync, since the one
    // above is wrapped
    dir_file: File,
    // An optional signalling handle for changes (signalled after
    // successful inserts and deletes)
    signal_change: Option<PollingSignalsSender>,
}
315
316// Just for testing
317impl<K: AsKey, V: DeserializeOwned + Serialize> PartialEq for KeyVal<K, V> {
318    fn eq(&self, other: &Self) -> bool {
319        self.config.eq(&other.config) && self.base_dir.eq(&other.base_dir)
320    }
321}
322
323impl<K: AsKey, V: DeserializeOwned + Serialize> KeyVal<K, V> {
324    pub fn open(
325        base_dir: impl AsRef<Path>,
326        config: KeyValConfig,
327        signal_change: Option<PollingSignalsSender>,
328    ) -> Result<Self, KeyValError> {
329        let base_dir = base_dir.as_ref().to_owned();
330        let dir_needs_sync;
331        if config.create_dir_if_not_exists {
332            if let Err(error) = create_dir(&base_dir) {
333                match error.kind() {
334                    std::io::ErrorKind::AlreadyExists => dir_needs_sync = false,
335                    _ => return Err(KeyValError::CreateDir { base_dir, error }),
336                }
337            } else {
338                dir_needs_sync = true;
339            }
340        } else {
341            dir_needs_sync = false;
342        }
343        let open_base_dir = || {
344            File::open(&base_dir).map_err(|error| KeyValError::IO {
345                ctx: "opening directory as file",
346                base_dir: base_dir.to_owned(),
347                path: base_dir.to_owned(),
348                error,
349            })
350        };
351        let mut dir_file = open_base_dir()?;
352        /// Can we use try_clone and then call both fsync and flock on
353        /// all supported architectures? It's fine on Linux. Try for
354        /// now.
355        const CAN_USE_DIR_FILE_CLONE: bool = true;
356        let lock_file = if CAN_USE_DIR_FILE_CLONE {
357            dir_file.try_clone().map_err(|error| KeyValError::IO {
358                ctx: "cloning directory filehandle",
359                base_dir: base_dir.to_owned(),
360                path: base_dir.to_owned(),
361                error,
362            })
363        } else {
364            open_base_dir()
365        }?
366        .into();
367
368        if dir_needs_sync {
369            config.sync.perhaps_sync_dir(&mut dir_file, &base_dir)?;
370        }
371        Ok(Self {
372            keys: PhantomData,
373            vals: PhantomData,
374            config,
375            base_dir,
376            lock_file,
377            dir_file,
378            signal_change,
379        })
380    }
381
382    pub fn lock_exclusive(&self) -> Result<ExclusiveFileLock<'_, File>, KeyValError> {
383        lock_exclusive(&self.lock_file, &self.base_dir)
384    }
385
386    pub fn lock_shared(&self) -> Result<SharedFileLock<'_, File>, KeyValError> {
387        lock_shared(&self.lock_file, &self.base_dir)
388    }
389
390    /// Insert a mapping; if `exclusive` is true, give an error if
391    /// `key` is already in the map (otherwise the previous value is
392    /// silently overwritten).
393    pub fn insert(&self, key: &K, val: &V, exclusive: bool) -> Result<(), KeyValError>
394    where
395        K: Debug,
396    {
397        let valstr = serde_json::to_string(val)?;
398        let key_filename = key.verified_as_filename_str();
399        let tmp_path = {
400            let tmp_filename = format!(".{}", key_filename);
401            (&self.base_dir).append(tmp_filename)
402        };
403        let target_path = (&self.base_dir).append(key_filename.as_ref());
404
405        // The lock is required since we only use 1 tmp file path for
406        // all processes! Also, for a race-free existence check.
407        let _lock = lock_exclusive(&self.lock_file, &self.base_dir)?;
408
409        if exclusive && target_path.exists() {
410            return Err(KeyValError::KeyExists {
411                base_dir: self.base_dir.clone(),
412                key_debug_string: format!("{key:?}"),
413            });
414        }
415        let mut out = File::create(&tmp_path).map_err(|error| KeyValError::IO {
416            ctx: "creating file",
417            base_dir: self.base_dir.clone(),
418            path: tmp_path.clone(),
419            error,
420        })?;
421        out.write_all(valstr.as_bytes())
422            .map_err(|error| KeyValError::IO {
423                ctx: "writing to file",
424                base_dir: self.base_dir.clone(),
425                path: tmp_path.clone(),
426                error,
427            })?;
428        self.config.sync.flush_and_perhaps_sync_file(
429            &mut out,
430            "flush of the file",
431            "sync of the file",
432            &self.base_dir,
433            &tmp_path,
434        )?;
435        drop(out);
436        std::fs::rename(&tmp_path, &target_path).map_err(|error| KeyValError::IO {
437            ctx: "renaming to file",
438            base_dir: self.base_dir.clone(),
439            path: target_path.clone(),
440            error,
441        })?;
442        self.config
443            .sync
444            .perhaps_sync_dir(&self.dir_file, &self.base_dir)?;
445        if let Some(signal) = &self.signal_change {
446            signal.send_signal();
447        }
448        Ok(())
449    }
450
451    /// Returns whether an entry was actually deleted (false means
452    /// that no entry for `key` existed)
453    pub fn delete(&self, key: &K) -> Result<bool, KeyValError> {
454        let key_filename = key.verified_as_filename_str();
455        let target_path = (&self.base_dir).append(key_filename.as_ref());
456        match std::fs::remove_file(&target_path) {
457            Ok(()) => {
458                if let Some(signal) = &self.signal_change {
459                    signal.send_signal();
460                }
461                Ok(true)
462            }
463            Err(error) => match error.kind() {
464                std::io::ErrorKind::NotFound => Ok(false),
465                _ => Err(KeyValError::IO {
466                    base_dir: self.base_dir.clone(),
467                    path: target_path,
468                    ctx: "delete",
469                    error,
470                }),
471            },
472        }
473    }
474
475    /// Get access to an entry, if it exists. Note: does not lock,
476    /// and on Windows might possibly deadlock with the rename calls
477    /// of `insert`?  Only tested on Linux (and macOS?)
478    pub fn entry_opt(&self, key: &K) -> Result<Option<Entry<'_, K, V>>, KeyValError> {
479        let key_filename = key.verified_as_filename_str();
480        let target_path = (&self.base_dir).append(key_filename.as_ref());
481        match File::open(&target_path) {
482            Ok(value_file) => Ok(Some(Entry {
483                key_type: PhantomData,
484                val_type: PhantomData,
485                base_dir: &self.base_dir,
486                target_path,
487                value_file: Some(value_file),
488                signal_change: self.signal_change.as_ref(),
489            })),
490            Err(error) => match error.kind() {
491                std::io::ErrorKind::NotFound => Ok(None),
492                _ => Err(KeyValError::IO {
493                    ctx: "opening value file",
494                    base_dir: self.base_dir.clone(),
495                    path: target_path,
496                    error,
497                }),
498            },
499        }
500    }
501
502    /// Note: does not lock, and on Windows might possibly deadlock
503    /// with the rename calls of `insert`! Only tested on Linux (and
504    /// macOS?)
505    pub fn get(&self, key: &K) -> Result<Option<V>, KeyValError> {
506        if let Some(mut entry) = self.entry_opt(key)? {
507            Some(entry.get()).transpose()
508        } else {
509            Ok(None)
510        }
511    }
512
513    /// Stops waiting at `stop_at` if given. Returns true if it found
514    /// an entry, false if it timed out.
515    pub fn wait_for_entries(&self, stop_at: Option<SystemTime>) -> Result<bool, KeyValError> {
516        // XX hack, use file notifications instead
517        let mut sleep_time = 1000;
518        loop {
519            let mut dir = std::fs::read_dir(&self.base_dir).map_err(|error| KeyValError::IO {
520                ctx: "opening directory",
521                base_dir: self.base_dir.clone(),
522                path: self.base_dir.clone(),
523                error,
524            })?;
525            if dir.next().is_some() {
526                return Ok(true);
527            }
528
529            if let Some(stop_at) = stop_at {
530                let now = SystemTime::now();
531                if now >= stop_at {
532                    return Ok(false);
533                }
534            }
535
536            // dbg!(sleep_time);
537            sleep(Duration::from_nanos(sleep_time));
538            if sleep_time < 2_000_000_000 {
539                sleep_time = (sleep_time * 101) / 100;
540            }
541        }
542    }
543
544    /// Get all the keys contained in the map. Their order is not
545    /// defined. Note that the returned entries may not exist any more
546    /// by the time your code looks at them, since exclusivity can't
547    /// be statically ensured, and taking a lock for the iterator's
548    /// life time seems excessive. If `wait_for_entries` is true,
549    /// blocks until entries exist (but note that with concurrent
550    /// deletions, by the time the entries are read, they may be gone
551    /// again--the returned sequence might still be empty).
552    pub fn keys<'s>(
553        &'s self,
554        wait_for_entries: bool,
555        stop_at: Option<SystemTime>,
556    ) -> Result<impl Iterator<Item = Result<K, KeyValError>> + use<'s, K, V>, KeyValError> {
557        if wait_for_entries {
558            self.wait_for_entries(stop_at)?;
559        }
560
561        let dir = std::fs::read_dir(&self.base_dir).map_err(|error| KeyValError::IO {
562            ctx: "opening directory",
563            base_dir: self.base_dir.clone(),
564            path: self.base_dir.clone(),
565            error,
566        })?;
567        Ok(dir
568            .map(|entry| -> Result<Option<K>, KeyValError> {
569                let entry = entry.map_err(|error| KeyValError::IO {
570                    ctx: "reading directory entry",
571                    base_dir: self.base_dir.clone(),
572                    path: self.base_dir.clone(),
573                    error,
574                })?;
575                key_from_file_name(&entry.file_name(), &self.base_dir)
576            })
577            .filter_map(|val| val.transpose()))
578    }
579
580    /// Sorted output of `keys()`.
581    pub fn sorted_keys(
582        &self,
583        wait_for_entries: bool,
584        stop_at: Option<SystemTime>,
585        reverse: bool,
586    ) -> Result<Vec<K>, KeyValError>
587    where
588        K: Ord,
589    {
590        let mut keys: Vec<_> = self
591            .keys(wait_for_entries, stop_at)?
592            .collect::<Result<_, _>>()?;
593        // No way to sort in reverse from the get go?
594        keys.sort();
595        if reverse {
596            keys.reverse();
597        }
598        Ok(keys)
599    }
600}