evobench_tools/run/
working_directory_pool.rs

1//! A pool of `WorkingDirectory`.
2
3//! Error concept: if there are errors, the WorkingDirectory is
4//! renamed but stays in the pool directory. (Only directories with
5//! names that are parseable as u64 are treated as usable entries.)
6
7use std::{
8    collections::BTreeMap,
9    fmt::Display,
10    fs::File,
11    num::NonZeroU8,
12    path::{Path, PathBuf},
13    str::FromStr,
14    sync::{
15        Arc,
16        atomic::{AtomicBool, Ordering},
17    },
18    u64,
19};
20
21use anyhow::{Result, anyhow, bail};
22use chj_unix_util::polling_signals::PollingSignalsSender;
23use cj_path_util::path_util::AppendToPath;
24use rayon::iter::{IntoParallelIterator, ParallelIterator};
25use serde::{Deserialize, Serialize};
26
27use crate::{
28    config_file::load_ron_file,
29    ctx, debug, def_linear,
30    git::GitHash,
31    info, io_utils,
32    io_utils::owning_lockable_file::{OwningExclusiveFileLock, OwningLockableFile},
33    run::{
34        key::{BenchmarkingJobParameters, RunParameters},
35        working_directory::{
36            WorkingDirectoryAutoCleanOpts, WorkingDirectoryPath, WorkingDirectoryWithPoolLock,
37            WorkingDirectoryWithPoolLockMut, WorkingDirectoryWithPoolMut,
38        },
39    },
40    serde_types::{date_and_time::DateTimeWithOffset, git_url::GitUrl},
41    utillib::arc::CloneArc,
42};
43
44use super::{
45    run_queues::RunQueuesData,
46    working_directory::{Status, WorkingDirectory, WorkingDirectoryStatus},
47};
48
/// For `RunConfigOpts` configuration file.  `remote_repository_url`
/// is separate from this because that seemed to make more sense for
/// the `RunConfigOpts`.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
#[serde(rename = "WorkingDirectoryPool")]
pub struct WorkingDirectoryPoolOpts {
    /// Path to a directory where clones of the project to be
    /// benchmarked should be kept. By default at
    /// `.evobench/working_directory_pool/`.
    pub base_dir: Option<PathBuf>,

    /// How many clones of the target project should be maintained;
    /// more is better when multiple commits are benchmarked
    /// alternatively, to avoid needing a rebuild (and input
    /// re-preparation), but costing disk space. (Being `NonZeroU8`,
    /// this is guaranteed to be at least 1.)
    pub capacity: NonZeroU8,

    /// To enable working directory auto-cleaning, give the
    /// cleaning options. Currently "cleaning" just means full
    /// deletion by the runner with no involvement of the target
    /// project. `None` disables auto-cleaning.
    pub auto_clean: Option<WorkingDirectoryAutoCleanOpts>,
}
73
/// Completed options, and WorkingDirectoryPoolBaseDir even has an
/// open (lockable) file.
#[derive(Debug)]
pub struct WorkingDirectoryPoolContext {
    /// See `WorkingDirectoryPoolOpts::capacity`.
    pub capacity: NonZeroU8,
    /// Option since doing cleanup is optional (not because there are
    /// values in another place), None means do not do cleanup.
    pub auto_clean: Option<WorkingDirectoryAutoCleanOpts>,
    /// URL of the repository the pool's working directories are
    /// clones of (used when cloning new entries).
    pub remote_repository_url: GitUrl,
    /// Base directory holding the numbered working directories; also
    /// carries the open file handle used for locking the pool.
    pub base_dir: Arc<WorkingDirectoryPoolBaseDir>,
    /// Where to signal changes with the "current" link or working
    /// directory status files. (Do *not* pass the
    /// `working_directory_change_signals` here, those are for
    /// signalling changes to the daemon to reload working directory
    /// state; whereas the changes that are to be sent here
    /// *originate* from the same daemon; the daemon would be
    /// signalling itself.)
    pub signal_change: Option<PollingSignalsSender>,
}
93
/// A precursor of `WorkingDirectoryId` that allows both `D123` and
/// `123` as IDs on parsing. This is to allow for `allow_bare`. Must
/// be converted before use.
#[derive(Debug, Clone)]
pub struct WorkingDirectoryIdOpt {
    // Whether the original input carried the 'D'/'d' prefix.
    has_prefix: bool,
    // The numeric part of the ID.
    id: u64,
}
102
103impl FromStr for WorkingDirectoryIdOpt {
104    type Err = anyhow::Error;
105
106    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
107        let (has_prefix, number_string) = match s.strip_prefix("D").or_else(|| s.strip_prefix("d"))
108        {
109            Some(s) => (true, s),
110            None => (false, s),
111        };
112        let id = number_string.parse()?;
113        Ok(Self { has_prefix, id })
114    }
115}
116
/// Clap argument group controlling whether prefixless working
/// directory IDs are accepted.
#[derive(Debug, Clone, Copy, clap::Args)]
pub struct WdAllowBareOpt {
    /// Allow bare numbers as working directory IDs. By default,
    /// the 'D' or 'd' prefix is required.
    #[clap(short, long)]
    allow_bare: bool,
}
124
125impl WorkingDirectoryIdOpt {
126    pub fn to_working_directory_id(self, allow_bare: WdAllowBareOpt) -> Result<WorkingDirectoryId> {
127        let WdAllowBareOpt { allow_bare } = allow_bare;
128        let Self { has_prefix, id } = self;
129        if allow_bare || has_prefix {
130            Ok(WorkingDirectoryId(id))
131        } else {
132            bail!("missing 'D' or 'd' at beginning of working directory ID: {id}")
133        }
134    }
135}
136
137pub fn finish_parsing_working_directory_ids(
138    ids: Vec<WorkingDirectoryIdOpt>,
139    allow_bare: WdAllowBareOpt,
140) -> Result<Vec<WorkingDirectoryId>> {
141    ids.into_iter()
142        .map(|id| id.to_working_directory_id(allow_bare))
143        .collect()
144}
145
/// Identifier of a working directory in the pool. Its `Display` form
/// is `D<number>`, while the on-disk directory name is the bare
/// number (see `to_directory_file_name`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct WorkingDirectoryId(u64);
148
149impl WorkingDirectoryId {
150    fn to_number_string(self) -> String {
151        format!("{}", self.0)
152    }
153    pub fn to_directory_file_name(self) -> String {
154        self.to_number_string()
155    }
156    pub fn from_prefixless_str(s: &str) -> Result<Self> {
157        let id = s.parse()?;
158        Ok(Self(id))
159    }
160}
161
162impl Display for WorkingDirectoryId {
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        write!(f, "D{}", self.0)
165    }
166}
167
/// Global toggle consulted by `WorkingDirectoryId::from_str` for
/// whether bare (prefixless) IDs are accepted. But actually use
/// `WorkingDirectoryIdOpt` and its `to_working_directory_id` method
/// instead! (NOTE(review): this previously said
/// `WorkingDirectoryPoolOpts`, which has no such method here —
/// presumably `WorkingDirectoryIdOpt` was meant; confirm.)
pub static WORKING_DIRECTORY_ID_ALLOW_WITHOUT_PREFIX: AtomicBool = AtomicBool::new(false);
171
172impl FromStr for WorkingDirectoryId {
173    type Err = anyhow::Error;
174
175    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
176        let id: WorkingDirectoryIdOpt = s.parse()?;
177        let allow_bare = WdAllowBareOpt {
178            allow_bare: WORKING_DIRECTORY_ID_ALLOW_WITHOUT_PREFIX.load(Ordering::Relaxed),
179        };
180        id.to_working_directory_id(allow_bare)
181    }
182}
183
184// -----------------------------------------------------------------------------
185
def_linear!(Linear in WorkingDirectoryCleanupToken);

/// This is a linear type (i.e. it cannot be dropped) and has to be
/// passed to `working_directory_cleanup`, which will potentially
/// clean up or delete the working directory that this represents. If
/// you want to prevent that, call `prohibit_cleanup()` on it before
/// passing it, or call its `force_drop()` method (easier to do in
/// error handlers).
/// (NOTE(review): the comment below says those methods were removed;
/// this paragraph may be stale — confirm.)
#[must_use]
pub struct WorkingDirectoryCleanupToken {
    // Enforces linearity; explicitly buried in
    // `working_directory_cleanup` rather than silently dropped.
    linear_token: Linear,
    // Which pool entry this token refers to.
    working_directory_id: WorkingDirectoryId,
    // Decided when the token is created (see
    // `process_in_working_directory`); if true,
    // `working_directory_cleanup` deletes the directory.
    needs_cleanup: bool,
}
// For impl WorkingDirectoryCleanupToken: `force_drop` and
// `prohibiting_cleanup` methods, see git history.
202
203// -----------------------------------------------------------------------------
204
/// The base directory of the pool: the path plus an open handle on
/// the directory, used for exclusive locking of the whole pool (see
/// `get_lock`).
#[derive(Debug)]
pub struct WorkingDirectoryPoolBaseDir {
    path: Arc<Path>,
    dir_file: OwningLockableFile<File>,
}
210
211impl WorkingDirectoryPoolBaseDir {
212    pub fn new(
213        base_dir: Option<PathBuf>,
214        get_working_directory_pool_base: &dyn Fn() -> Result<PathBuf>,
215    ) -> Result<Self> {
216        let path: Arc<Path> = if let Some(path) = base_dir {
217            path
218        } else {
219            get_working_directory_pool_base()?
220        }
221        .into();
222        let dir_file = OwningLockableFile::open(path.clone_arc())
223            .map_err(ctx!("opening working directory base dir {path:?}"))?;
224        Ok(Self { path, dir_file })
225    }
226
227    pub fn path(&self) -> &Path {
228        &self.path
229    }
230
231    // Keep private!, pub use should be the typed
232    // `get_working_directory_path` method on `WorkingDirectoryPool`.
233    fn get_working_directory_path(&self, working_directory_id: WorkingDirectoryId) -> PathBuf {
234        self.path
235            .append(working_directory_id.to_directory_file_name())
236    }
237
238    /// The path to the symlink to the currently used working
239    /// directory
240    fn current_working_directory_symlink_path(&self) -> PathBuf {
241        self.path().append("current")
242    }
243
244    /// Lock the base dir of the pool, blocking (this is *not* the
245    /// global job-running lock any more!)
246    // XX *not* &mut, bc dont need, keep it private, ok?, yet won't
247    // help the problem anyway. Anyway, keep private, since
248    // OwningExclusiveFileLock does not have a lifetime!
249    fn get_lock<'s>(&'s self, locker: &str) -> Result<OwningExclusiveFileLock<File>> {
250        let path = self.path();
251        debug!(
252            "getting working directory pool lock on {:?} for {locker}",
253            self.path(),
254        );
255        self.dir_file
256            .lock_exclusive()
257            .map_err(ctx!("locking working directory pool base dir {path:?}"))
258    }
259
260    /// Lock the base dir of the pool, blocking (this is *not* the
261    /// global job-running lock any more!)
262    pub fn lock<'s>(&'s self, locker: &str) -> Result<WorkingDirectoryPoolBaseDirLock<'s>> {
263        let lock = self.get_lock(locker)?;
264        Ok(WorkingDirectoryPoolBaseDirLock {
265            base_dir: self,
266            _lock: Some(lock),
267        })
268    }
269}
270
/// A public lock on a WorkingDirectoryPoolBaseDir. Takes exclusive
/// access to the WorkingDirectoryPoolBaseDir.
// do *not* allow to clone, or even move or share between threads, OK?
pub struct WorkingDirectoryPoolBaseDirLock<'t> {
    base_dir: &'t WorkingDirectoryPoolBaseDir,
    // None when the lock is already held by an enclosing guard (see
    // `WorkingDirectoryPoolGuardMut::locked_base_dir`).
    _lock: Option<OwningExclusiveFileLock<File>>,
}
278
279impl<'t> WorkingDirectoryPoolBaseDirLock<'t> {
280    /// Read the working directory from symlink, if present
281    pub fn read_current_working_directory(&self) -> Result<Option<WorkingDirectoryId>> {
282        let path = self.base_dir.current_working_directory_symlink_path();
283        match std::fs::read_link(&path) {
284            Ok(val) => {
285                let s = val
286                    .to_str()
287                    .ok_or_else(|| anyhow!("missing symlink target in {path:?}"))?;
288                let id = WorkingDirectoryId::from_prefixless_str(s)?;
289                Ok(Some(id))
290            }
291            Err(e) => match e.kind() {
292                std::io::ErrorKind::NotFound => Ok(None),
293                _ => Err(e).map_err(ctx!("reading symlink {path:?}")),
294            },
295        }
296    }
297
298    pub fn read_working_directory_status(
299        &self,
300        id: WorkingDirectoryId,
301    ) -> Result<WorkingDirectoryStatus> {
302        let path = self.base_dir.get_working_directory_path(id);
303        // XX partial copy paste from WorkingDirectory::open (ok not too much though)
304        let status_path = WorkingDirectory::status_path_from_working_dir_path(&path)?;
305        load_ron_file(&status_path)
306    }
307}
308
/// The mutable state
#[derive(Debug)]
struct WorkingDirectoryPoolState {
    // Next id to hand out for a newly cloned working directory; kept
    // above every numeric id seen on disk (including numeric prefixes
    // of renamed/set-aside entries), see `WorkingDirectoryPool::open`.
    next_id: u64,
    /// Contains working dirs with Status::Error, too, must be ignored
    /// when picking a dir!
    all_entries: BTreeMap<WorkingDirectoryId, WorkingDirectory>,
}
317
/// The pool of working directories: immutable context plus the
/// mutable map of entries. Access under the pool lock goes through
/// `lock` / `lock_mut`.
#[derive(Debug)]
pub struct WorkingDirectoryPool {
    /// Immutable, but contains an open lockable file handle
    context: WorkingDirectoryPoolContext,
    /// The mutable state
    state: WorkingDirectoryPoolState,
}
325
/// Shared (read-only) access to the pool state while the pool's
/// exclusive file lock is held (either owned here or by an enclosing
/// guard, in which case `_lock` is `None`).
pub struct WorkingDirectoryPoolGuard<'pool> {
    // Option since it is also used via `to_non_mut`
    _lock: Option<OwningExclusiveFileLock<File>>,
    state: &'pool WorkingDirectoryPoolState,
}
331
impl<'pool> WorkingDirectoryPoolGuard<'pool> {
    // Option since it is also used via `to_non_mut`. Must be done for
    // a particular WorkingDirectoryPool instance and tie it down, but
    // its state is enough to make it easier for
    // WorkingDirectoryPool::open, that's why it takes a reference to
    // the state only. Keep this all internal to this module, please.
    fn new(
        state: &'pool WorkingDirectoryPoolState,
        _lock: Option<OwningExclusiveFileLock<File>>,
    ) -> Self {
        WorkingDirectoryPoolGuard { _lock, state }
    }

    /// Wrap an already-borrowed working directory so it carries
    /// proof that the pool lock is held (`self` is that proof; the
    /// lock itself is not touched here).
    pub(crate) fn locked_working_directory_mut<'s: 'pool>(
        &'s self,
        wd: &'pool mut WorkingDirectory,
    ) -> WorkingDirectoryWithPoolLockMut<'pool> {
        WorkingDirectoryWithPoolLockMut { wd }
    }
}
352
/// Exclusive access to the pool together with the held pool lock.
pub struct WorkingDirectoryPoolGuardMut<'pool> {
    pub(crate) _lock: OwningExclusiveFileLock<File>,
    pub(crate) pool: &'pool mut WorkingDirectoryPool,
}
357
impl<'pool> WorkingDirectoryPoolGuardMut<'pool> {
    /// The mut guard can also do shared operations; XX todo: just
    /// Deref, ah, would be double use of deref?
    // NOTE: do *not* give 'pool life time to
    // WorkingDirectoryPoolGuard! Only as long as the lock is
    // guaranteed to be held!
    pub fn shared<'s: 'pool>(&'s self) -> WorkingDirectoryPoolGuard<'s> {
        WorkingDirectoryPoolGuard::new(
            &self.pool.state,
            // OK since self is guaranteed to outlive us
            None,
        )
    }

    /// A base-dir lock view that relies on this guard's already-held
    /// lock (hence `_lock: None` below).
    pub fn locked_base_dir<'s>(&'s self) -> WorkingDirectoryPoolBaseDirLock<'s> {
        WorkingDirectoryPoolBaseDirLock {
            base_dir: &self.pool.context.base_dir,
            // OK since self is guaranteed to outlive us
            _lock: None,
        }
    }
}
380
/// A freshly opened pool bundled with the still-held lock from
/// `WorkingDirectoryPool::open`; see `take_guard` / `into_inner`.
pub struct WorkingDirectoryPoolAndLock(WorkingDirectoryPool, Option<OwningExclusiveFileLock<File>>);
382
383impl WorkingDirectoryPoolAndLock {
384    /// Take out the lock/guard; can only be done once
385    pub fn take_guard<'t>(&'t mut self) -> Option<WorkingDirectoryPoolGuard<'t>> {
386        Some(WorkingDirectoryPoolGuard::new(
387            // hmm reborrowed, right, could just take & right away?
388            &mut self.0.state,
389            Some(self.1.take()?),
390        ))
391    }
392
393    /// Drop the lock and get the bare pool
394    pub fn into_inner(self) -> WorkingDirectoryPool {
395        self.0
396    }
397}
398
/// What gets serialized (as YAML, see `save_processing_error`) next
/// to a working directory when processing in it failed.
#[derive(Debug, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ProcessingError {
    /// An Option since working directory pools are also used for
    /// things that are not benchmark runs
    benchmarking_job_parameters: Option<BenchmarkingJobParameters>,
    // Free-text description of what was being attempted.
    context: String,
    // The failure, Debug-formatted (see
    // `process_in_working_directory`).
    error: String,
}
408
impl WorkingDirectoryPool {
    /// Get exclusive lock, but sharing self
    pub fn lock<'t>(&'t self, locker: &str) -> Result<WorkingDirectoryPoolGuard<'t>> {
        let _lock = Some(self.context.base_dir.get_lock(locker)?);
        Ok(WorkingDirectoryPoolGuard::new(&self.state, _lock))
    }

    /// Get exclusive lock, for exclusive access to self
    pub fn lock_mut<'t>(&'t mut self, locker: &str) -> Result<WorkingDirectoryPoolGuardMut<'t>> {
        let _lock = self.context.base_dir.get_lock(locker)?;
        Ok(WorkingDirectoryPoolGuardMut { _lock, pool: self })
    }

    /// Open the pool at `context.base_dir`, scanning the existing
    /// entries on disk. `omit_check` is passed on to
    /// `WorkingDirectory::open` (it should only be set to true for
    /// dir listings). Returns the pool together with the still-held
    /// base dir lock.
    pub fn open(
        context: WorkingDirectoryPoolContext,
        create_dir_if_not_exists: bool,
        omit_check: bool,
    ) -> Result<WorkingDirectoryPoolAndLock> {
        if create_dir_if_not_exists {
            io_utils::div::create_dir_if_not_exists(
                context.base_dir.path(),
                "working pool directory",
            )?;
        }

        // Need to have exclusive access while, at least, reading ron
        // files
        let lock = context.base_dir.get_lock("WorkingDirectoryPool::open")?;

        let mut next_id: u64 = 0;

        // To tell WorkingDirectory::open that we do have the lock we
        // need to make a guard, and for that we need a
        // WorkingDirectoryPool already, or, we have arranged for
        // WorkingDirectoryPoolState to be enough. Thus, create a fake
        // WorkingDirectoryPoolState.
        let fake_state = WorkingDirectoryPoolState {
            next_id,
            all_entries: Default::default(),
        };
        let mut guard = WorkingDirectoryPoolGuard::new(&fake_state, Some(lock));

        // First pass: collect the usable (purely-numeric) directory
        // entries, and compute `next_id` as one above any numeric id
        // seen -- including numeric prefixes of dotted names
        // (presumably renamed/set-aside entries; see module doc), so
        // that their ids are never reused.
        let all_entries: Vec<(WorkingDirectoryId, PathBuf)> =
            std::fs::read_dir(context.base_dir.path())
                .map_err(ctx!(
                    "opening working pool directory {:?}",
                    context.base_dir.path()
                ))?
                .map(|entry| -> Result<Option<(WorkingDirectoryId, PathBuf)>> {
                    let entry = entry?;
                    let ft = entry.file_type()?;
                    if !ft.is_dir() {
                        return Ok(None);
                    }
                    let id = if let Some(fname) = entry.file_name().to_str() {
                        if let Some((id_str, _rest)) = fname.split_once('.') {
                            // Dotted name: not a usable entry, but
                            // still advance next_id past its numeric
                            // prefix (if any).
                            if let Ok(id) = u64::from_str(id_str) {
                                if id >= next_id {
                                    next_id = id + 1;
                                }
                            }
                            return Ok(None);
                        } else {
                            if let Ok(id) = fname.parse() {
                                if id >= next_id {
                                    next_id = id + 1;
                                }
                                WorkingDirectoryId(id)
                            } else {
                                return Ok(None);
                            }
                        }
                    } else {
                        // Non-UTF8 file name: ignore.
                        return Ok(None);
                    };
                    let path = entry.path();
                    Ok(Some((id, path)))
                })
                .filter_map(Result::transpose)
                .collect::<Result<_>>()
                .map_err(ctx!(
                    "reading contents of working pool directory {:?}",
                    context.base_dir.path()
                ))?;

        // Second pass: open the entries in parallel (rayon), each
        // under the (single) lock held via `guard`.
        let all_entries: BTreeMap<WorkingDirectoryId, WorkingDirectory> = all_entries
            .into_par_iter()
            .map(
                |(id, path)| -> Result<(WorkingDirectoryId, WorkingDirectory)> {
                    let wd = WorkingDirectory::open(
                        path,
                        &context.remote_repository_url,
                        &guard,
                        omit_check,
                        context.signal_change.clone(),
                    )?;
                    Ok((id, wd))
                },
            )
            .collect::<Result<_>>()
            .map_err(ctx!(
                "opening working directories {:?}",
                context.base_dir.path()
            ))?;

        // Remove the lock since we need to hand it out with the
        // finished instance
        let lock = guard._lock.take().expect("we put it there above");

        let slf = WorkingDirectoryPool {
            context,
            state: WorkingDirectoryPoolState {
                next_id,
                all_entries,
            },
        };

        info!(
            "opened directory pool {:?} with next_id {next_id}, len {}/{}",
            slf.context.base_dir,
            slf.active_len(),
            slf.capacity()
        );
        debug!("{slf:#?}");

        Ok(WorkingDirectoryPoolAndLock(slf, Some(lock)))
    }

    /// Also see the method on `WorkingDirectoryPoolGuard`!
    pub fn get_working_directory(
        &self,
        working_directory_id: WorkingDirectoryId,
    ) -> Option<&WorkingDirectory> {
        self.state.all_entries.get(&working_directory_id)
    }

    /// For cases where the working directory existing does not matter
    // Also see private get_working_directory_path() method on
    // WorkingDirectoryPoolBaseDir.
    pub fn get_working_directory_path(
        &self,
        working_directory_id: WorkingDirectoryId,
    ) -> WorkingDirectoryPath {
        Arc::new(
            self.context
                .base_dir
                .get_working_directory_path(working_directory_id),
        )
        .into()
    }

    /// Also see the method on `WorkingDirectoryPoolGuard`!
    pub fn get_working_directory_mut(
        &mut self,
        working_directory_id: WorkingDirectoryId,
    ) -> Option<&mut WorkingDirectory> {
        self.state.all_entries.get_mut(&working_directory_id)
    }

    pub fn base_dir(&self) -> &Arc<WorkingDirectoryPoolBaseDir> {
        &self.context.base_dir
    }

    /// The value from the configuration as `usize`. Guaranteed to be
    /// at least 1.
    pub fn capacity(&self) -> usize {
        self.context.capacity.get().into()
    }

    pub fn git_url(&self) -> &GitUrl {
        &self.context.remote_repository_url
    }

    /// This includes working dirs with errors, that (normally) must
    /// be left aside and not used for processing!  The returned
    /// entries are sorted by `WorkingDirectoryId`
    pub fn all_entries(&self) -> impl Iterator<Item = (WorkingDirectoryId, &WorkingDirectory)> {
        self.state.all_entries.iter().map(|(id, wd)| (*id, wd))
    }

    /// Mutable variant of `all_entries`; same ordering and caveats.
    pub fn all_entries_mut(
        &mut self,
    ) -> impl Iterator<Item = (WorkingDirectoryId, &mut WorkingDirectory)> {
        self.state.all_entries.iter_mut().map(|(id, wd)| (*id, wd))
    }

    /// The entries that can be used for processing. The returned
    /// entries are sorted by `WorkingDirectoryId`
    pub fn active_entries(&self) -> impl Iterator<Item = (WorkingDirectoryId, &WorkingDirectory)> {
        self.all_entries()
            .filter(|(_, wd)| wd.working_directory_status.status.can_be_used_for_jobs())
    }

    /// Mutable variant of `active_entries`; same ordering.
    pub fn active_entries_mut(
        &mut self,
    ) -> impl Iterator<Item = (WorkingDirectoryId, &mut WorkingDirectory)> {
        self.all_entries_mut()
            .filter(|(_, wd)| wd.working_directory_status.status.can_be_used_for_jobs())
    }

    /// The number of entries that are not of Status::Error
    pub fn active_len(&self) -> usize {
        // Could optimize to not scan, but would want an abstraction
        // around the entries for that, don't do now.
        self.active_entries().count()
    }

    ///  Runs the given action on the requested working directory with
    ///  the pool lock; the lock allows to use working directory
    ///  actions that require the lock, but it's important to release
    ///  the lock as soon as possible via `into_inner()` (giving the
    ///  bare working directory, which can still be used for methods
    ///  that don't require the lock), so that e.g. `evobench wd`
    ///  actions don't block for the whole duration of an action
    ///  (i.e. a whole benchmarking run)!  If the action returns with
    ///  an error, stores it as metadata with the directory and
    ///  changes the working directory to status `Error`. Returns an
    ///  error if a working directory with the given id doesn't
    ///  exist. The returned `WorkingDirectoryCleanupToken` must be
    ///  passed to `working_directory_cleanup`. NOTE: is getting the
    ///  lock internally (multiple times for short durations, but also
    ///  passes the lock to `action` as mentioned above).
    pub fn process_in_working_directory<'pool, T>(
        &'pool mut self,
        working_directory_id: WorkingDirectoryId,
        timestamp: &DateTimeWithOffset,
        action: impl FnOnce(WorkingDirectoryWithPoolMut) -> Result<T>,
        benchmarking_job_parameters: Option<&BenchmarkingJobParameters>,
        context: &str,
        have_other_jobs_for_same_commit: Option<&dyn Fn() -> bool>,
    ) -> Result<(T, WorkingDirectoryCleanupToken)> {
        let mut guard =
            self.lock_mut("WorkingDirectoryPool.process_in_working_directory for action")?;

        guard.set_current_working_directory(working_directory_id)?;

        let mut wd = guard
            .get_working_directory_mut(working_directory_id)
            // Can't just .expect here because the use cases seem too
            // complex (concurrency means that a working directory
            // very well might disappear), thus:
            .ok_or_else(|| anyhow!("working directory id must still exist"))?;

        if !wd.working_directory_status.status.can_be_used_for_jobs() {
            bail!(
                "working directory {working_directory_id} is set aside (in '{}' state)",
                wd.working_directory_status.status
            )
        }

        wd.set_and_save_status(Status::Processing)?;

        info!(
            "process_working_directory {working_directory_id} \
             ({:?} for {context} at_{timestamp})...",
            benchmarking_job_parameters.map(BenchmarkingJobParameters::slow_hash)
        );

        // `guard` (and with it the lock) is moved into the value
        // passed to `action`; see the doc comment about releasing it
        // early via `into_inner()`.
        match action(guard.into_get_working_directory_mut(working_directory_id)) {
            Ok(v) => {
                // Re-acquire the lock briefly to persist the status.
                self.lock_mut("WorkingDirectoryPool.process_in_working_directory after action Ok")?
                    .get_working_directory_mut(working_directory_id)
                    .expect("we're not removing it in the mean time")
                    .set_and_save_status(Status::Finished)?;

                info!(
                    "process_working_directory {working_directory_id} \
                     ({:?} for {context} at_{timestamp}) succeeded.",
                    benchmarking_job_parameters.map(BenchmarkingJobParameters::slow_hash)
                );

                let wd = self
                    .get_working_directory(working_directory_id)
                    .expect("we're not removing it in the mean time");

                // Decide *now* whether the directory should be
                // cleaned up; the decision is carried by the token.
                let needs_cleanup = wd.needs_cleanup(
                    self.context.auto_clean.as_ref(),
                    have_other_jobs_for_same_commit,
                )?;
                let token = WorkingDirectoryCleanupToken {
                    working_directory_id,
                    needs_cleanup,
                    linear_token: Linear::new(false),
                };
                Ok((v, token))
            }
            Err(error) => {
                let mut lock = self.lock_mut(
                    "WorkingDirectoryPool.process_in_working_directory after action Err",
                )?;
                lock.get_working_directory_mut(working_directory_id)
                    .expect("we're not removing it in the mean time")
                    .set_and_save_status(Status::Error)?;

                let err = format!("{error:#?}");
                lock.save_processing_error(
                    working_directory_id,
                    ProcessingError {
                        benchmarking_job_parameters: benchmarking_job_parameters.cloned(),
                        context: context.to_string(),
                        error: err.clone(),
                    },
                    timestamp,
                )
                .map_err(ctx!("error storing the error {err:#}"))?;

                info!(
                    // Do not show error as it might be large; XX
                    // which is a mis-feature!
                    "process_working_directory {working_directory_id} \
                     ({:?} for {context} at_{timestamp}) failed.",
                    benchmarking_job_parameters.map(BenchmarkingJobParameters::slow_hash)
                );

                Err(error)
            }
        }
    }

    /// Possibly calls `delete_working_directory`, depending on what
    /// the token says. NOTE: takes the lock internally, only when
    /// needed.
    pub fn working_directory_cleanup(
        &mut self,
        cleanup: WorkingDirectoryCleanupToken,
    ) -> Result<()> {
        let WorkingDirectoryCleanupToken {
            linear_token,
            working_directory_id,
            needs_cleanup,
        } = cleanup;
        // Consume the linear token; this is the mandatory endpoint
        // for it (it cannot simply be dropped).
        linear_token.bury();
        if needs_cleanup {
            let mut lock = self.lock_mut("WorkingDirectoryPool.working_directory_cleanup")?;
            lock.delete_working_directory(working_directory_id)?;
        }
        Ok(())
    }
}
750
751impl<'pool> WorkingDirectoryPoolGuard<'pool> {
752    /// There's also a method on `WorkingDirectoryPool`!
753    pub fn get_working_directory<'guard: 'pool>(
754        &'guard self,
755        working_directory_id: WorkingDirectoryId,
756    ) -> Option<WorkingDirectoryWithPoolLock<'guard>> {
757        Some(WorkingDirectoryWithPoolLock {
758            wd: self.state.all_entries.get(&working_directory_id)?,
759        })
760    }
761}
762
763impl<'pool> WorkingDirectoryPoolGuardMut<'pool> {
764    /// There's also a method on `WorkingDirectoryPool`!
765    pub fn get_working_directory_mut<'guard>(
766        &'guard mut self,
767        working_directory_id: WorkingDirectoryId,
768    ) -> Option<WorkingDirectoryWithPoolLockMut<'guard>> {
769        Some(WorkingDirectoryWithPoolLockMut {
770            wd: self.pool.state.all_entries.get_mut(&working_directory_id)?,
771        })
772    }
773
774    /// Similar to `get_working_directory_mut` but transfer ownership
775    /// of the guard into the result (does *not* unlock!).
776    pub fn into_get_working_directory_mut(
777        self,
778        working_directory_id: WorkingDirectoryId,
779    ) -> WorkingDirectoryWithPoolMut<'pool> {
780        WorkingDirectoryWithPoolMut {
781            guard: self,
782            working_directory_id,
783        }
784    }
785
786    /// Always gets a working directory, but doesn't check for any
787    /// best fit. If none was cloned yet, that is done now.
788    pub fn get_first(&mut self) -> Result<WorkingDirectoryId> {
789        if let Some((key, _)) = self.pool.active_entries().next() {
790            return Ok(key);
791        }
792        self.get_new()
793    }
794
795    /// This is *not* public as it is not checking whether there is
796    /// capacity left for a new one!
797    fn get_new(&mut self) -> Result<WorkingDirectoryId> {
798        let id = self.next_id();
799        debug!("get_new: using {id:?}");
800        let dir = WorkingDirectory::clone_repo(
801            self.pool.base_dir().path(),
802            &id.to_directory_file_name(),
803            self.pool.git_url(),
804            &self.shared(),
805            self.pool.context.signal_change.clone(),
806        )?;
807        self.pool.state.all_entries.insert(id, dir);
808        Ok(id)
809    }
810
811    /// Save a processing error (not doing that to the status since
812    /// that would get overwritten when changing it back to an active
813    /// status). This method does *not* change the status of the
814    /// working directory, that must be done separately.
815    fn save_processing_error(
816        &mut self,
817        id: WorkingDirectoryId,
818        processing_error: ProcessingError,
819        timestamp: &DateTimeWithOffset,
820    ) -> Result<()> {
821        let error_file_path = self.pool.base_dir().path().append(format!(
822            "{}.error_at_{timestamp}",
823            id.to_directory_file_name()
824        ));
825        let processing_error_string = serde_yml::to_string(&processing_error)?;
826        std::fs::write(&error_file_path, &processing_error_string)
827            .map_err(ctx!("writing to {error_file_path:?}"))?;
828
829        info!("saved processing error to {error_file_path:?}");
830
831        Ok(())
832    }
833
834    /// Note: may leave behind a broken `current` symlink, but that's
835    /// probably the way it should be?
836    pub fn delete_working_directory(
837        &mut self,
838        working_directory_id: WorkingDirectoryId,
839    ) -> Result<()> {
840        let wd = self
841            .pool
842            .state
843            .all_entries
844            .get_mut(&working_directory_id)
845            .ok_or_else(|| anyhow!("working directory id must still exist"))?;
846        let path = wd.git_working_dir.working_dir_path_arc();
847        info!("delete_working_directory: deleting directory {path:?}");
848        self.pool.state.all_entries.remove(&working_directory_id);
849        std::fs::remove_dir_all(&*path).map_err(ctx!("deleting directory {path:?}"))?;
850        Ok(())
851    }
852
853    fn next_id(&mut self) -> WorkingDirectoryId {
854        let id = self.pool.state.next_id;
855        self.pool.state.next_id += 1;
856        WorkingDirectoryId(id)
857    }
858
859    /// Ensure all *active* working directories have their commit field
860    /// initialized
861    fn init_active_commit_ids(&mut self) -> Result<()> {
862        for (_, wd) in self.pool.active_entries_mut() {
863            // SAFETY: It's OK to claim that the working dir has the
864            // lock as we are a method of
865            // `WorkingDirectoryPoolGuardMut` and locking currently
866            // works on the whole pool.
867            let mut wd = WorkingDirectoryWithPoolLockMut { wd };
868            wd.commit()?;
869        }
870        Ok(())
871    }
872
873    /// Pick a working directory already checked out for the given
874    /// commit, and if possible already built or even tested for
875    /// it. Returns its id so that the right kind of fresh borrow can
876    /// be done.
877    fn try_get_fitting_working_directory_for(
878        &mut self,
879        run_parameters: &RunParameters,
880        run_queues_data: &RunQueuesData,
881    ) -> Result<Option<WorkingDirectoryId>> {
882        // (todo?: is the working dir used last time for the same job
883        // available? Maybe doesn't really matter any more though?)
884
885        let commit: &GitHash = &run_parameters.commit_id;
886
887        // Find one with the same commit. First ensure all commit
888        // fields are set to avoid dealing with IO errors.
889        self.init_active_commit_ids()?;
890        if let Some((id, _dir)) = self
891            .pool
892            .active_entries_mut()
893            .filter(|(_, wd)| wd.commit.as_ref().expect("initialized above") == commit)
894            // Prefer one that proceeded further and is matching
895            // closely: todo: store parameters for dir.
896            .max_by_key(|(_, dir)| dir.working_directory_status.status)
897        {
898            info!("try_get_best_working_directory_for: found by commit {commit}");
899            return Ok(Some(id));
900        }
901
902        // Find one that is *not* used by other jobs in the pipeline (i.e. obsolete),
903        // and todo: similar parameters
904        if let Some((id, _dir)) = self
905            .pool
906            .active_entries()
907            .filter(|(_, dir)| {
908                !run_queues_data
909                    .have_job_with_commit_id(dir.commit.as_ref().expect("initialized above"))
910            })
911            .max_by_key(|(_, dir)| dir.working_directory_status.status)
912        {
913            info!("try_get_best_working_directory_for: found as obsolete");
914            return Ok(Some(id));
915        }
916
917        Ok(None)
918    }
919
920    /// Return the ~best working directory for the given
921    /// run_parameters (e.g. with the requested commit checked out)
922    /// and queue pipeline situation (e.g. if forced to change the
923    /// checked out commit in a working directory, choose one that
924    /// doesn't have a commit checked out that is in the
925    /// pipeline). Does *not* check out the commit needed for
926    /// run_parameters!
927    pub fn get_a_working_directory_for<'s>(
928        &'s mut self,
929        run_parameters: &RunParameters,
930        run_queues_data: &RunQueuesData,
931    ) -> Result<WorkingDirectoryId> {
932        if let Some(id) =
933            self.try_get_fitting_working_directory_for(run_parameters, run_queues_data)?
934        {
935            info!("get_a_working_directory_for -> good old {id:?}");
936            Ok(id)
937        } else {
938            if self.pool.active_len() < self.pool.capacity() {
939                // allocate a new one
940                let id = self.get_new()?;
941                info!("get_a_working_directory_for -> new {id:?}");
942                Ok(id)
943            } else {
944                // get the least-recently used one
945                let id = self
946                    .pool
947                    .active_entries()
948                    .min_by_key(|(_, entry)| entry.last_use)
949                    .expect("capacity is guaranteed >= 1")
950                    .0
951                    .clone();
952                info!("get_a_working_directory_for -> lru old {id:?}");
953                Ok(id)
954            }
955        }
956    }
957
958    /// Remove the symlink to the currently used working
959    /// directory. TODO: this is a mess, always forgetting; at least
960    /// move to a compile time checked API? What was the purpose,
961    /// really: sure, it was to put in some check that the dir was
962    /// actually removed normally? But then that 'never' happens
963    /// anyway? Do the removal statically (and for the case of errors
964    /// preventing the removal, just always remove at runtime when
965    /// setting it anew / do tmp-and-rename)?
966    pub fn clear_current_working_directory(&self) -> Result<()> {
967        let path = self
968            .pool
969            .context
970            .base_dir
971            .current_working_directory_symlink_path();
972        if let Err(e) = std::fs::remove_file(&path) {
973            match e.kind() {
974                std::io::ErrorKind::NotFound => (),
975                _ => Err(e).map_err(ctx!("removing symlink {path:?}"))?,
976            }
977        }
978        if let Some(signal_change) = &self.pool.context.signal_change {
979            signal_change.send_signal();
980        }
981        Ok(())
982    }
983
984    /// Set the symlink to the currently used working directory. The
985    /// old one must be removed beforehand via
986    /// `clear_current_working_directory`.
987    fn set_current_working_directory(&self, id: WorkingDirectoryId) -> Result<()> {
988        let source_path = id.to_directory_file_name();
989        let target_path = self
990            .pool
991            .context
992            .base_dir
993            .current_working_directory_symlink_path();
994        std::os::unix::fs::symlink(&source_path, &target_path).map_err(ctx!(
995            "creating symlink at {target_path:?} to {source_path:?}"
996        ))?;
997        if let Some(signal_change) = &self.pool.context.signal_change {
998            signal_change.send_signal();
999        }
1000        Ok(())
1001    }
1002}