1use std::{
2 ffi::{OsStr, OsString},
3 fmt::Debug,
4 fs::{File, create_dir},
5 io::{Read, Write},
6 marker::PhantomData,
7 path::{Path, PathBuf},
8 thread::sleep,
9 time::{Duration, SystemTime},
10};
11
12use chj_unix_util::polling_signals::PollingSignalsSender;
13use cj_path_util::path_util::AppendToPath;
14use serde::{Serialize, de::DeserializeOwned};
15
16use crate::io_utils::lockable_file::{ExclusiveFileLock, LockableFile, SharedFileLock};
17
18use super::as_key::AsKey;
19
20fn key_from_file_name<K: AsKey>(
22 file_name: &OsStr,
23 base_dir: &PathBuf,
24) -> Result<Option<K>, KeyValError> {
25 let file_name = file_name
26 .to_str()
27 .ok_or_else(|| KeyValError::InvalidFileNameInStorage {
28 base_dir: base_dir.clone(),
29 ctx: "does not decode as string",
30 invalid_file_name: file_name.to_owned(),
31 })?;
32 if file_name.starts_with('.') {
33 Ok(None)
35 } else {
36 if let Some(key) = K::try_from_filename_str(&file_name) {
37 Ok(Some(key))
38 } else {
39 Err(KeyValError::InvalidFileNameInStorage {
40 base_dir: base_dir.clone(),
41 ctx: "can't be parsed back into key",
42 invalid_file_name: file_name.into(),
43 })
44 }
45 }
46}
47
48macro_rules! define_lock_helper {
50 { $name:ident, $lock_type:tt, $method:tt } => {
51 fn $name<'l>(
52 lock_file: &'l LockableFile<File>,
53 base_dir: &PathBuf,
54 ) -> Result<$lock_type<'l, File>, KeyValError> {
55 lock_file.$method().map_err(|error| KeyValError::IO {
56 ctx: "getting lock on dir",
57 base_dir: base_dir.clone(),
58 path: base_dir.clone(),
59 error,
60 })
61 }
62 }
63}
64define_lock_helper! {lock_exclusive, ExclusiveFileLock, lock_exclusive}
65define_lock_helper! {lock_shared, SharedFileLock, lock_shared}
66
/// A single key/value mapping of a `KeyVal` store: the value file at
/// `target_path` inside the store directory `base_dir`.
///
/// `KeyVal::entry_opt` creates entries with the value file already opened
/// for reading.
#[derive(Debug)]
pub struct Entry<'p, K: AsKey, V: DeserializeOwned + Serialize> {
    // Zero-sized markers tying the entry to its key and value types without
    // storing a `K` or `V`.
    key_type: PhantomData<fn() -> K>,
    val_type: PhantomData<fn() -> V>,
    // Directory of the store this entry belongs to (borrowed from the store).
    base_dir: &'p PathBuf,
    // Full path of the value file; its last component is the key's file name.
    target_path: PathBuf,
    // Open handle on the value file; `None` once `take_lockable_file` has
    // moved it out.
    value_file: Option<File>,
    // If present, signalled after `delete` successfully removed the file.
    signal_change: Option<&'p PollingSignalsSender>,
}
79
80impl<'p, K: AsKey, V: DeserializeOwned + Serialize> PartialEq for Entry<'p, K, V> {
81 fn eq(&self, other: &Self) -> bool {
82 self.base_dir == other.base_dir && self.target_path == other.target_path
83 }
84}
85
86impl<'p, K: AsKey, V: DeserializeOwned + Serialize> Entry<'p, K, V> {
87 pub fn target_path(&self) -> &Path {
88 &self.target_path
89 }
90
91 pub fn file_name(&self) -> &OsStr {
92 self.target_path
93 .file_name()
94 .expect("entries always have a file name")
95 }
96
97 pub fn key(&self) -> Result<K, KeyValError> {
100 let maybe_key = key_from_file_name(self.file_name(), &self.base_dir)?;
101 Ok(maybe_key.expect("an `Entry` is never created from a tmp file"))
102 }
103
104 pub fn get(&mut self) -> Result<V, KeyValError> {
105 let mut s = String::new();
106 if let Some(file) = &mut self.value_file {
107 file.read_to_string(&mut s)
108 .map_err(|error| KeyValError::IO {
109 ctx: "reading/UTF-decoding value file",
110 base_dir: self.base_dir.clone(),
111 path: self.target_path.clone(),
112 error,
113 })?;
114 let val: V =
115 serde_json::from_str(&s).map_err(|error| KeyValError::Deserialization {
116 base_dir: self.base_dir.clone(),
117 path: self.target_path.clone(),
118 error,
119 })?;
120 Ok(val)
121 } else {
122 Err(KeyValError::FileTaken {
123 base_dir: self.base_dir.clone(),
124 path: self.target_path.clone(),
125 })
126 }
127 }
128
129 pub fn take_lockable_file(&mut self) -> Option<LockableFile<File>> {
133 Some(LockableFile::from(self.value_file.take()?))
134 }
135
136 pub fn exists(&mut self) -> bool {
139 self.target_path.exists()
140 }
141
142 pub fn delete(&self) -> Result<bool, KeyValError> {
146 match std::fs::remove_file(&self.target_path) {
147 Ok(()) => {
148 if let Some(signal) = &self.signal_change {
149 signal.send_signal();
150 }
151 Ok(true)
152 }
153 Err(error) => match error.kind() {
154 std::io::ErrorKind::NotFound => Ok(false),
155 _ => Err(KeyValError::IO {
156 base_dir: self.base_dir.clone(),
157 path: self.target_path.clone(),
158 ctx: "deleting the value file",
159 error,
160 }),
161 },
162 }
163 }
164}
165
/// Errors returned by the key/value store operations in this file.
///
/// Most variants carry the store directory (`base_dir`) and, where
/// applicable, the affected `path`, for context in the message.
#[derive(thiserror::Error, Debug)]
pub enum KeyValError {
    /// Creating the store directory failed for a reason other than the
    /// directory already existing.
    #[error("creating directory {base_dir:?}: {error:#}")]
    CreateDir {
        base_dir: PathBuf,
        error: std::io::Error,
    },
    /// An I/O operation on `path` failed; `ctx` names the operation.
    #[error("key_val_fs db at {base_dir:?}: {ctx} {path:?}: {error:#}")]
    IO {
        base_dir: PathBuf,
        path: PathBuf,
        ctx: &'static str,
        error: std::io::Error,
    },
    /// Serializing a value to JSON failed (converted automatically from
    /// `serde_json::Error` via `?`).
    #[error("serializing value to JSON")]
    Serialization {
        #[from]
        error: serde_json::Error,
    },
    /// The contents of the value file at `path` could not be deserialized.
    #[error("deserializing value from JSON at {base_dir:?}: file {path:?}: {error:#}")]
    Deserialization {
        base_dir: PathBuf,
        path: PathBuf,
        error: serde_json::Error,
    },
    /// An exclusive insert found an existing mapping for the key.
    #[error("mapping already exists for key {key_debug_string} in {base_dir:?}")]
    KeyExists {
        base_dir: PathBuf,
        key_debug_string: String,
    },
    /// A file name in the store directory is not valid UTF-8 or does not
    /// parse back into a key; `ctx` says which.
    #[error("invalid file name in directory in {base_dir:?}: {ctx} {invalid_file_name:?}")]
    InvalidFileNameInStorage {
        base_dir: PathBuf,
        ctx: &'static str,
        invalid_file_name: OsString,
    },
    /// The value file handle of an entry was requested after it had been
    /// moved out of the entry.
    #[error("usage error: file handle already taken out (in {base_dir:?}: {path:?})")]
    FileTaken { base_dir: PathBuf, path: PathBuf },
    // NOTE(review): `LockTaken` and `AlreadyLocked` are not constructed in
    // the code visible here; presumably they are used elsewhere — confirm.
    #[error("lock is already taken in {base_dir:?} for {path:?}")]
    LockTaken { base_dir: PathBuf, path: PathBuf },
    #[error("bug: lock has already been taken, `no_lock` was false")]
    AlreadyLocked { base_dir: PathBuf, path: PathBuf },
}
209
/// How much explicit syncing (`File::sync_all`) the store performs.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum KeyValSync {
    /// Sync neither value files nor the directory.
    No,
    /// Sync value files after writing them, but not the directory.
    Files,
    /// Sync value files and the directory.
    All,
}
225
226impl KeyValSync {
227 fn do_sync_files(self) -> bool {
228 match self {
229 KeyValSync::No => false,
230 KeyValSync::Files => true,
231 KeyValSync::All => true,
232 }
233 }
234 fn do_sync_dirs(self) -> bool {
235 match self {
236 KeyValSync::No => false,
237 KeyValSync::Files => false,
238 KeyValSync::All => true,
239 }
240 }
241
242 fn flush_and_perhaps_sync_file(
243 self,
244 file: &mut File,
245 flush_ctx: &'static str,
246 sync_ctx: &'static str,
247 base_dir: &Path,
248 path: &Path,
249 ) -> Result<(), KeyValError> {
250 file.flush().map_err(|error| KeyValError::IO {
251 ctx: flush_ctx,
252 base_dir: base_dir.to_owned(),
253 path: path.to_owned(),
254 error,
255 })?;
256 if self.do_sync_files() {
257 file.sync_all().map_err(|error| KeyValError::IO {
258 ctx: sync_ctx,
259 base_dir: base_dir.to_owned(),
260 path: path.to_owned(),
261 error,
262 })?;
263 }
264 Ok(())
265 }
266
267 fn perhaps_sync_dir(self, dir_file: &File, base_dir: &Path) -> Result<(), KeyValError> {
268 if self.do_sync_dirs() {
269 dir_file.sync_all().map_err(|error| KeyValError::IO {
270 ctx: "sync of the directory",
271 base_dir: base_dir.to_owned(),
272 path: base_dir.to_owned(),
273 error,
274 })?;
275 }
276 Ok(())
277 }
278}
279
/// Settings for opening and operating a `KeyVal` store.
#[derive(Debug, Clone, PartialEq)]
pub struct KeyValConfig {
    /// Which file system objects are explicitly synced on writes.
    pub sync: KeyValSync,
    /// If true, `KeyVal::open` tries to create the base directory first
    /// (an already existing directory is not an error).
    pub create_dir_if_not_exists: bool,
}
290
291impl Default for KeyValConfig {
292 fn default() -> Self {
293 KeyValConfig {
294 sync: KeyValSync::All,
295 create_dir_if_not_exists: true,
296 }
297 }
298}
299
/// A key/value store backed by a directory: each mapping is one file in
/// `base_dir`, named after the key and containing the JSON-serialized value.
#[derive(Debug)]
pub struct KeyVal<K: AsKey, V: DeserializeOwned + Serialize> {
    // Zero-sized markers for the key and value types.
    keys: PhantomData<fn() -> K>,
    vals: PhantomData<fn() -> V>,
    /// Settings the store was opened with.
    pub config: KeyValConfig,
    /// Directory holding the value files.
    pub base_dir: PathBuf,
    // Handle on the base directory used for taking the store locks.
    lock_file: LockableFile<File>,
    // Handle on the base directory used for syncing it.
    dir_file: File,
    // If present, signalled after a successful insert or delete.
    signal_change: Option<PollingSignalsSender>,
}
315
316impl<K: AsKey, V: DeserializeOwned + Serialize> PartialEq for KeyVal<K, V> {
318 fn eq(&self, other: &Self) -> bool {
319 self.config.eq(&other.config) && self.base_dir.eq(&other.base_dir)
320 }
321}
322
323impl<K: AsKey, V: DeserializeOwned + Serialize> KeyVal<K, V> {
324 pub fn open(
325 base_dir: impl AsRef<Path>,
326 config: KeyValConfig,
327 signal_change: Option<PollingSignalsSender>,
328 ) -> Result<Self, KeyValError> {
329 let base_dir = base_dir.as_ref().to_owned();
330 let dir_needs_sync;
331 if config.create_dir_if_not_exists {
332 if let Err(error) = create_dir(&base_dir) {
333 match error.kind() {
334 std::io::ErrorKind::AlreadyExists => dir_needs_sync = false,
335 _ => return Err(KeyValError::CreateDir { base_dir, error }),
336 }
337 } else {
338 dir_needs_sync = true;
339 }
340 } else {
341 dir_needs_sync = false;
342 }
343 let open_base_dir = || {
344 File::open(&base_dir).map_err(|error| KeyValError::IO {
345 ctx: "opening directory as file",
346 base_dir: base_dir.to_owned(),
347 path: base_dir.to_owned(),
348 error,
349 })
350 };
351 let mut dir_file = open_base_dir()?;
352 const CAN_USE_DIR_FILE_CLONE: bool = true;
356 let lock_file = if CAN_USE_DIR_FILE_CLONE {
357 dir_file.try_clone().map_err(|error| KeyValError::IO {
358 ctx: "cloning directory filehandle",
359 base_dir: base_dir.to_owned(),
360 path: base_dir.to_owned(),
361 error,
362 })
363 } else {
364 open_base_dir()
365 }?
366 .into();
367
368 if dir_needs_sync {
369 config.sync.perhaps_sync_dir(&mut dir_file, &base_dir)?;
370 }
371 Ok(Self {
372 keys: PhantomData,
373 vals: PhantomData,
374 config,
375 base_dir,
376 lock_file,
377 dir_file,
378 signal_change,
379 })
380 }
381
382 pub fn lock_exclusive(&self) -> Result<ExclusiveFileLock<'_, File>, KeyValError> {
383 lock_exclusive(&self.lock_file, &self.base_dir)
384 }
385
386 pub fn lock_shared(&self) -> Result<SharedFileLock<'_, File>, KeyValError> {
387 lock_shared(&self.lock_file, &self.base_dir)
388 }
389
390 pub fn insert(&self, key: &K, val: &V, exclusive: bool) -> Result<(), KeyValError>
394 where
395 K: Debug,
396 {
397 let valstr = serde_json::to_string(val)?;
398 let key_filename = key.verified_as_filename_str();
399 let tmp_path = {
400 let tmp_filename = format!(".{}", key_filename);
401 (&self.base_dir).append(tmp_filename)
402 };
403 let target_path = (&self.base_dir).append(key_filename.as_ref());
404
405 let _lock = lock_exclusive(&self.lock_file, &self.base_dir)?;
408
409 if exclusive && target_path.exists() {
410 return Err(KeyValError::KeyExists {
411 base_dir: self.base_dir.clone(),
412 key_debug_string: format!("{key:?}"),
413 });
414 }
415 let mut out = File::create(&tmp_path).map_err(|error| KeyValError::IO {
416 ctx: "creating file",
417 base_dir: self.base_dir.clone(),
418 path: tmp_path.clone(),
419 error,
420 })?;
421 out.write_all(valstr.as_bytes())
422 .map_err(|error| KeyValError::IO {
423 ctx: "writing to file",
424 base_dir: self.base_dir.clone(),
425 path: tmp_path.clone(),
426 error,
427 })?;
428 self.config.sync.flush_and_perhaps_sync_file(
429 &mut out,
430 "flush of the file",
431 "sync of the file",
432 &self.base_dir,
433 &tmp_path,
434 )?;
435 drop(out);
436 std::fs::rename(&tmp_path, &target_path).map_err(|error| KeyValError::IO {
437 ctx: "renaming to file",
438 base_dir: self.base_dir.clone(),
439 path: target_path.clone(),
440 error,
441 })?;
442 self.config
443 .sync
444 .perhaps_sync_dir(&self.dir_file, &self.base_dir)?;
445 if let Some(signal) = &self.signal_change {
446 signal.send_signal();
447 }
448 Ok(())
449 }
450
451 pub fn delete(&self, key: &K) -> Result<bool, KeyValError> {
454 let key_filename = key.verified_as_filename_str();
455 let target_path = (&self.base_dir).append(key_filename.as_ref());
456 match std::fs::remove_file(&target_path) {
457 Ok(()) => {
458 if let Some(signal) = &self.signal_change {
459 signal.send_signal();
460 }
461 Ok(true)
462 }
463 Err(error) => match error.kind() {
464 std::io::ErrorKind::NotFound => Ok(false),
465 _ => Err(KeyValError::IO {
466 base_dir: self.base_dir.clone(),
467 path: target_path,
468 ctx: "delete",
469 error,
470 }),
471 },
472 }
473 }
474
475 pub fn entry_opt(&self, key: &K) -> Result<Option<Entry<'_, K, V>>, KeyValError> {
479 let key_filename = key.verified_as_filename_str();
480 let target_path = (&self.base_dir).append(key_filename.as_ref());
481 match File::open(&target_path) {
482 Ok(value_file) => Ok(Some(Entry {
483 key_type: PhantomData,
484 val_type: PhantomData,
485 base_dir: &self.base_dir,
486 target_path,
487 value_file: Some(value_file),
488 signal_change: self.signal_change.as_ref(),
489 })),
490 Err(error) => match error.kind() {
491 std::io::ErrorKind::NotFound => Ok(None),
492 _ => Err(KeyValError::IO {
493 ctx: "opening value file",
494 base_dir: self.base_dir.clone(),
495 path: target_path,
496 error,
497 }),
498 },
499 }
500 }
501
502 pub fn get(&self, key: &K) -> Result<Option<V>, KeyValError> {
506 if let Some(mut entry) = self.entry_opt(key)? {
507 Some(entry.get()).transpose()
508 } else {
509 Ok(None)
510 }
511 }
512
513 pub fn wait_for_entries(&self, stop_at: Option<SystemTime>) -> Result<bool, KeyValError> {
516 let mut sleep_time = 1000;
518 loop {
519 let mut dir = std::fs::read_dir(&self.base_dir).map_err(|error| KeyValError::IO {
520 ctx: "opening directory",
521 base_dir: self.base_dir.clone(),
522 path: self.base_dir.clone(),
523 error,
524 })?;
525 if dir.next().is_some() {
526 return Ok(true);
527 }
528
529 if let Some(stop_at) = stop_at {
530 let now = SystemTime::now();
531 if now >= stop_at {
532 return Ok(false);
533 }
534 }
535
536 sleep(Duration::from_nanos(sleep_time));
538 if sleep_time < 2_000_000_000 {
539 sleep_time = (sleep_time * 101) / 100;
540 }
541 }
542 }
543
544 pub fn keys<'s>(
553 &'s self,
554 wait_for_entries: bool,
555 stop_at: Option<SystemTime>,
556 ) -> Result<impl Iterator<Item = Result<K, KeyValError>> + use<'s, K, V>, KeyValError> {
557 if wait_for_entries {
558 self.wait_for_entries(stop_at)?;
559 }
560
561 let dir = std::fs::read_dir(&self.base_dir).map_err(|error| KeyValError::IO {
562 ctx: "opening directory",
563 base_dir: self.base_dir.clone(),
564 path: self.base_dir.clone(),
565 error,
566 })?;
567 Ok(dir
568 .map(|entry| -> Result<Option<K>, KeyValError> {
569 let entry = entry.map_err(|error| KeyValError::IO {
570 ctx: "reading directory entry",
571 base_dir: self.base_dir.clone(),
572 path: self.base_dir.clone(),
573 error,
574 })?;
575 key_from_file_name(&entry.file_name(), &self.base_dir)
576 })
577 .filter_map(|val| val.transpose()))
578 }
579
580 pub fn sorted_keys(
582 &self,
583 wait_for_entries: bool,
584 stop_at: Option<SystemTime>,
585 reverse: bool,
586 ) -> Result<Vec<K>, KeyValError>
587 where
588 K: Ord,
589 {
590 let mut keys: Vec<_> = self
591 .keys(wait_for_entries, stop_at)?
592 .collect::<Result<_, _>>()?;
593 keys.sort();
595 if reverse {
596 keys.reverse();
597 }
598 Ok(keys)
599 }
600}