1use std::{
8 collections::BTreeMap,
9 fmt::Display,
10 fs::File,
11 num::NonZeroU8,
12 path::{Path, PathBuf},
13 str::FromStr,
14 sync::{
15 Arc,
16 atomic::{AtomicBool, Ordering},
17 },
18 u64,
19};
20
21use anyhow::{Result, anyhow, bail};
22use chj_unix_util::polling_signals::PollingSignalsSender;
23use cj_path_util::path_util::AppendToPath;
24use rayon::iter::{IntoParallelIterator, ParallelIterator};
25use serde::{Deserialize, Serialize};
26
27use crate::{
28 config_file::load_ron_file,
29 ctx, debug, def_linear,
30 git::GitHash,
31 info, io_utils,
32 io_utils::owning_lockable_file::{OwningExclusiveFileLock, OwningLockableFile},
33 run::{
34 key::{BenchmarkingJobParameters, RunParameters},
35 working_directory::{
36 WorkingDirectoryAutoCleanOpts, WorkingDirectoryPath, WorkingDirectoryWithPoolLock,
37 WorkingDirectoryWithPoolLockMut, WorkingDirectoryWithPoolMut,
38 },
39 },
40 serde_types::{date_and_time::DateTimeWithOffset, git_url::GitUrl},
41 utillib::arc::CloneArc,
42};
43
44use super::{
45 run_queues::RunQueuesData,
46 working_directory::{Status, WorkingDirectory, WorkingDirectoryStatus},
47};
48
/// Configuration for a pool of Git working directories, as loaded
/// from a config file (serde-renamed to "WorkingDirectoryPool").
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
#[serde(rename = "WorkingDirectoryPool")]
pub struct WorkingDirectoryPoolOpts {
    /// Base directory holding the pool; when `None`, a default is
    /// computed elsewhere (see `WorkingDirectoryPoolBaseDir::new`).
    pub base_dir: Option<PathBuf>,

    /// Maximum number of active working directories (>= 1).
    pub capacity: NonZeroU8,

    /// Optional automatic-cleanup settings; `None` disables auto-clean.
    pub auto_clean: Option<WorkingDirectoryAutoCleanOpts>,
}
73
/// Runtime context for a working directory pool: configuration plus
/// the resolved base dir and an optional change-notification sender.
#[derive(Debug)]
pub struct WorkingDirectoryPoolContext {
    // Maximum number of active working directories.
    pub capacity: NonZeroU8,
    // Optional automatic-cleanup settings.
    pub auto_clean: Option<WorkingDirectoryAutoCleanOpts>,
    // Git URL that new working directories are cloned from.
    pub remote_repository_url: GitUrl,
    // Resolved, lockable pool base directory.
    pub base_dir: Arc<WorkingDirectoryPoolBaseDir>,
    // When present, signalled whenever the `current` symlink changes.
    pub signal_change: Option<PollingSignalsSender>,
}
93
/// A working directory ID as parsed from user input: the number plus
/// whether the `D`/`d` prefix was present (validated later against
/// `WdAllowBareOpt`).
#[derive(Debug, Clone)]
pub struct WorkingDirectoryIdOpt {
    has_prefix: bool,
    id: u64,
}
102
103impl FromStr for WorkingDirectoryIdOpt {
104 type Err = anyhow::Error;
105
106 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
107 let (has_prefix, number_string) = match s.strip_prefix("D").or_else(|| s.strip_prefix("d"))
108 {
109 Some(s) => (true, s),
110 None => (false, s),
111 };
112 let id = number_string.parse()?;
113 Ok(Self { has_prefix, id })
114 }
115}
116
/// CLI flag deciding whether working directory IDs may be given as
/// bare numbers (without the `D`/`d` prefix).
#[derive(Debug, Clone, Copy, clap::Args)]
pub struct WdAllowBareOpt {
    #[clap(short, long)]
    allow_bare: bool,
}
124
125impl WorkingDirectoryIdOpt {
126 pub fn to_working_directory_id(self, allow_bare: WdAllowBareOpt) -> Result<WorkingDirectoryId> {
127 let WdAllowBareOpt { allow_bare } = allow_bare;
128 let Self { has_prefix, id } = self;
129 if allow_bare || has_prefix {
130 Ok(WorkingDirectoryId(id))
131 } else {
132 bail!("missing 'D' or 'd' at beginning of working directory ID: {id}")
133 }
134 }
135}
136
137pub fn finish_parsing_working_directory_ids(
138 ids: Vec<WorkingDirectoryIdOpt>,
139 allow_bare: WdAllowBareOpt,
140) -> Result<Vec<WorkingDirectoryId>> {
141 ids.into_iter()
142 .map(|id| id.to_working_directory_id(allow_bare))
143 .collect()
144}
145
/// Identifier of a working directory in the pool; displayed as
/// `D<number>`, stored on disk as the bare number.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct WorkingDirectoryId(u64);
148
149impl WorkingDirectoryId {
150 fn to_number_string(self) -> String {
151 format!("{}", self.0)
152 }
153 pub fn to_directory_file_name(self) -> String {
154 self.to_number_string()
155 }
156 pub fn from_prefixless_str(s: &str) -> Result<Self> {
157 let id = s.parse()?;
158 Ok(Self(id))
159 }
160}
161
162impl Display for WorkingDirectoryId {
163 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164 write!(f, "D{}", self.0)
165 }
166}
167
168pub static WORKING_DIRECTORY_ID_ALLOW_WITHOUT_PREFIX: AtomicBool = AtomicBool::new(false);
171
172impl FromStr for WorkingDirectoryId {
173 type Err = anyhow::Error;
174
175 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
176 let id: WorkingDirectoryIdOpt = s.parse()?;
177 let allow_bare = WdAllowBareOpt {
178 allow_bare: WORKING_DIRECTORY_ID_ALLOW_WITHOUT_PREFIX.load(Ordering::Relaxed),
179 };
180 id.to_working_directory_id(allow_bare)
181 }
182}
183
184def_linear!(Linear in WorkingDirectoryCleanupToken);
187
/// Token returned after processing in a working directory; must be
/// passed to `WorkingDirectoryPool::working_directory_cleanup`, which
/// buries the linear token and deletes the directory if needed.
#[must_use]
pub struct WorkingDirectoryCleanupToken {
    // Linear token ensuring the cleanup is explicitly consumed.
    linear_token: Linear,
    // Which working directory this token refers to.
    working_directory_id: WorkingDirectoryId,
    // Whether the directory should be deleted during cleanup.
    needs_cleanup: bool,
}
/// Base directory of the working directory pool, kept open as a
/// lockable file so pool operations can be serialized (see `get_lock`).
#[derive(Debug)]
pub struct WorkingDirectoryPoolBaseDir {
    // Path of the base directory (shared, cheap to clone).
    path: Arc<Path>,
    // The opened directory, used for exclusive file locking.
    dir_file: OwningLockableFile<File>,
}
210
impl WorkingDirectoryPoolBaseDir {
    /// Resolves the pool base directory (an explicit `base_dir` wins
    /// over the computed default) and opens it as a lockable file so
    /// the pool can be protected against concurrent access.
    pub fn new(
        base_dir: Option<PathBuf>,
        get_working_directory_pool_base: &dyn Fn() -> Result<PathBuf>,
    ) -> Result<Self> {
        let path: Arc<Path> = if let Some(path) = base_dir {
            path
        } else {
            get_working_directory_pool_base()?
        }
        .into();
        let dir_file = OwningLockableFile::open(path.clone_arc())
            .map_err(ctx!("opening working directory base dir {path:?}"))?;
        Ok(Self { path, dir_file })
    }

    /// The pool base directory path.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Path of the given working directory inside the pool
    /// (base dir + bare numeric file name); purely computed.
    fn get_working_directory_path(&self, working_directory_id: WorkingDirectoryId) -> PathBuf {
        self.path
            .append(working_directory_id.to_directory_file_name())
    }

    /// Path of the `current` symlink that points at the working
    /// directory most recently selected for processing.
    fn current_working_directory_symlink_path(&self) -> PathBuf {
        self.path().append("current")
    }

    /// Takes an exclusive file lock on the base directory; `locker`
    /// only identifies the caller in the debug log message.
    fn get_lock<'s>(&'s self, locker: &str) -> Result<OwningExclusiveFileLock<File>> {
        let path = self.path();
        debug!(
            "getting working directory pool lock on {:?} for {locker}",
            self.path(),
        );
        self.dir_file
            .lock_exclusive()
            .map_err(ctx!("locking working directory pool base dir {path:?}"))
    }

    /// Takes the exclusive lock and pairs it with a reference to this
    /// base dir, for symlink/status operations under the lock.
    pub fn lock<'s>(&'s self, locker: &str) -> Result<WorkingDirectoryPoolBaseDirLock<'s>> {
        let lock = self.get_lock(locker)?;
        Ok(WorkingDirectoryPoolBaseDirLock {
            base_dir: self,
            _lock: Some(lock),
        })
    }
}
270
/// Witness that the pool base-directory lock is (or is known to be)
/// held; `_lock` is `None` when the lock is held elsewhere.
pub struct WorkingDirectoryPoolBaseDirLock<'t> {
    base_dir: &'t WorkingDirectoryPoolBaseDir,
    _lock: Option<OwningExclusiveFileLock<File>>,
}
278
impl<'t> WorkingDirectoryPoolBaseDirLock<'t> {
    /// Reads the `current` symlink and returns the working directory
    /// ID it points at, or `None` if the symlink does not exist.
    pub fn read_current_working_directory(&self) -> Result<Option<WorkingDirectoryId>> {
        let path = self.base_dir.current_working_directory_symlink_path();
        match std::fs::read_link(&path) {
            Ok(val) => {
                let s = val
                    .to_str()
                    .ok_or_else(|| anyhow!("missing symlink target in {path:?}"))?;
                // The symlink target is the bare numeric directory name.
                let id = WorkingDirectoryId::from_prefixless_str(s)?;
                Ok(Some(id))
            }
            Err(e) => match e.kind() {
                // Missing symlink simply means no current directory.
                std::io::ErrorKind::NotFound => Ok(None),
                _ => Err(e).map_err(ctx!("reading symlink {path:?}")),
            },
        }
    }

    /// Loads the status file of the given working directory from disk.
    pub fn read_working_directory_status(
        &self,
        id: WorkingDirectoryId,
    ) -> Result<WorkingDirectoryStatus> {
        let path = self.base_dir.get_working_directory_path(id);
        let status_path = WorkingDirectory::status_path_from_working_dir_path(&path)?;
        load_ron_file(&status_path)
    }
}
308
/// Mutable state of the pool: the next free ID and all known working
/// directories (including set-aside ones not usable for jobs).
#[derive(Debug)]
struct WorkingDirectoryPoolState {
    next_id: u64,
    all_entries: BTreeMap<WorkingDirectoryId, WorkingDirectory>,
}
317
/// A pool of Git working directories, bounded by `context.capacity`.
/// Access is mediated through file-lock guards (`lock` / `lock_mut`).
#[derive(Debug)]
pub struct WorkingDirectoryPool {
    context: WorkingDirectoryPoolContext,
    state: WorkingDirectoryPoolState,
}
325
/// Shared (read-only) access to the pool state, optionally owning the
/// base-dir file lock (`None` when the lock is held elsewhere).
pub struct WorkingDirectoryPoolGuard<'pool> {
    _lock: Option<OwningExclusiveFileLock<File>>,
    state: &'pool WorkingDirectoryPoolState,
}
331
impl<'pool> WorkingDirectoryPoolGuard<'pool> {
    /// Pairs the pool state with an (optional) held file lock.
    /// `_lock` may be `None` when the caller already holds the lock
    /// through another guard.
    fn new(
        state: &'pool WorkingDirectoryPoolState,
        _lock: Option<OwningExclusiveFileLock<File>>,
    ) -> Self {
        WorkingDirectoryPoolGuard { _lock, state }
    }

    /// Wraps a mutable working directory reference as "locked",
    /// borrowing this guard as witness that the pool lock is held.
    pub(crate) fn locked_working_directory_mut<'s: 'pool>(
        &'s self,
        wd: &'pool mut WorkingDirectory,
    ) -> WorkingDirectoryWithPoolLockMut<'pool> {
        WorkingDirectoryWithPoolLockMut { wd }
    }
}
352
/// Exclusive access to the pool while holding the base-dir file lock.
pub struct WorkingDirectoryPoolGuardMut<'pool> {
    pub(crate) _lock: OwningExclusiveFileLock<File>,
    pub(crate) pool: &'pool mut WorkingDirectoryPool,
}
357
impl<'pool> WorkingDirectoryPoolGuardMut<'pool> {
    /// Shared view of this mutable guard; the lock itself stays here,
    /// so the returned guard carries `None` in its lock slot.
    pub fn shared<'s: 'pool>(&'s self) -> WorkingDirectoryPoolGuard<'s> {
        WorkingDirectoryPoolGuard::new(
            &self.pool.state,
            None,
        )
    }

    /// Base-dir lock view backed by this guard (its `_lock` slot is
    /// `None` because this guard already holds the lock).
    pub fn locked_base_dir<'s>(&'s self) -> WorkingDirectoryPoolBaseDirLock<'s> {
        WorkingDirectoryPoolBaseDirLock {
            base_dir: &self.pool.context.base_dir,
            _lock: None,
        }
    }
}
380
381pub struct WorkingDirectoryPoolAndLock(WorkingDirectoryPool, Option<OwningExclusiveFileLock<File>>);
382
383impl WorkingDirectoryPoolAndLock {
384 pub fn take_guard<'t>(&'t mut self) -> Option<WorkingDirectoryPoolGuard<'t>> {
386 Some(WorkingDirectoryPoolGuard::new(
387 &mut self.0.state,
389 Some(self.1.take()?),
390 ))
391 }
392
393 pub fn into_inner(self) -> WorkingDirectoryPool {
395 self.0
396 }
397}
398
/// Error record written (as YAML, see `save_processing_error`) next to
/// a working directory when a processing action fails.
#[derive(Debug, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ProcessingError {
    // The job parameters being processed, if any.
    benchmarking_job_parameters: Option<BenchmarkingJobParameters>,
    // Free-form description of what was being done.
    context: String,
    // Debug-formatted error text.
    error: String,
}
408
409impl WorkingDirectoryPool {
410 pub fn lock<'t>(&'t self, locker: &str) -> Result<WorkingDirectoryPoolGuard<'t>> {
412 let _lock = Some(self.context.base_dir.get_lock(locker)?);
413 Ok(WorkingDirectoryPoolGuard::new(&self.state, _lock))
414 }
415
    /// Locks the pool for mutation; `locker` identifies the caller in
    /// debug logs.
    pub fn lock_mut<'t>(&'t mut self, locker: &str) -> Result<WorkingDirectoryPoolGuardMut<'t>> {
        let _lock = self.context.base_dir.get_lock(locker)?;
        Ok(WorkingDirectoryPoolGuardMut { _lock, pool: self })
    }
421
    /// Opens (and optionally creates) the pool base directory, scans
    /// it for existing working directories, opens each of them in
    /// parallel, and returns the pool together with the still-held
    /// base-dir lock.
    ///
    /// `omit_check` is passed through to `WorkingDirectory::open`.
    pub fn open(
        context: WorkingDirectoryPoolContext,
        create_dir_if_not_exists: bool,
        omit_check: bool,
    ) -> Result<WorkingDirectoryPoolAndLock> {
        if create_dir_if_not_exists {
            io_utils::div::create_dir_if_not_exists(
                context.base_dir.path(),
                "working pool directory",
            )?;
        }

        let lock = context.base_dir.get_lock("WorkingDirectoryPool::open")?;

        // Highest seen directory ID + 1; updated while scanning below.
        let mut next_id: u64 = 0;

        // Temporary empty state so a guard (holding `lock`) can be
        // handed to `WorkingDirectory::open` below; the real state is
        // built afterwards from the scan results.
        let fake_state = WorkingDirectoryPoolState {
            next_id,
            all_entries: Default::default(),
        };
        let mut guard = WorkingDirectoryPoolGuard::new(&fake_state, Some(lock));

        let all_entries: Vec<(WorkingDirectoryId, PathBuf)> =
            std::fs::read_dir(context.base_dir.path())
                .map_err(ctx!(
                    "opening working pool directory {:?}",
                    context.base_dir.path()
                ))?
                .map(|entry| -> Result<Option<(WorkingDirectoryId, PathBuf)>> {
                    let entry = entry?;
                    let ft = entry.file_type()?;
                    if !ft.is_dir() {
                        return Ok(None);
                    }
                    let id = if let Some(fname) = entry.file_name().to_str() {
                        if let Some((id_str, _rest)) = fname.split_once('.') {
                            // Names like `<id>.something` still bump
                            // `next_id` but are not working
                            // directories themselves.
                            if let Ok(id) = u64::from_str(id_str) {
                                if id >= next_id {
                                    next_id = id + 1;
                                }
                            }
                            return Ok(None);
                        } else {
                            // Bare numeric names are working directories.
                            if let Ok(id) = fname.parse() {
                                if id >= next_id {
                                    next_id = id + 1;
                                }
                                WorkingDirectoryId(id)
                            } else {
                                return Ok(None);
                            }
                        }
                    } else {
                        // Non-UTF-8 file names are ignored.
                        return Ok(None);
                    };
                    let path = entry.path();
                    Ok(Some((id, path)))
                })
                .filter_map(Result::transpose)
                .collect::<Result<_>>()
                .map_err(ctx!(
                    "reading contents of working pool directory {:?}",
                    context.base_dir.path()
                ))?;

        // Open all found directories in parallel.
        let all_entries: BTreeMap<WorkingDirectoryId, WorkingDirectory> = all_entries
            .into_par_iter()
            .map(
                |(id, path)| -> Result<(WorkingDirectoryId, WorkingDirectory)> {
                    let wd = WorkingDirectory::open(
                        path,
                        &context.remote_repository_url,
                        &guard,
                        omit_check,
                        context.signal_change.clone(),
                    )?;
                    Ok((id, wd))
                },
            )
            .collect::<Result<_>>()
            .map_err(ctx!(
                "opening working directories {:?}",
                context.base_dir.path()
            ))?;

        // Recover the lock from the temporary guard so it can be
        // returned to the caller.
        let lock = guard._lock.take().expect("we put it there above");

        let slf = WorkingDirectoryPool {
            context,
            state: WorkingDirectoryPoolState {
                next_id,
                all_entries,
            },
        };

        info!(
            "opened directory pool {:?} with next_id {next_id}, len {}/{}",
            slf.context.base_dir,
            slf.active_len(),
            slf.capacity()
        );
        debug!("{slf:#?}");

        Ok(WorkingDirectoryPoolAndLock(slf, Some(lock)))
    }
538
539 pub fn get_working_directory(
541 &self,
542 working_directory_id: WorkingDirectoryId,
543 ) -> Option<&WorkingDirectory> {
544 self.state.all_entries.get(&working_directory_id)
545 }
546
547 pub fn get_working_directory_path(
551 &self,
552 working_directory_id: WorkingDirectoryId,
553 ) -> WorkingDirectoryPath {
554 Arc::new(
555 self.context
556 .base_dir
557 .get_working_directory_path(working_directory_id),
558 )
559 .into()
560 }
561
562 pub fn get_working_directory_mut(
564 &mut self,
565 working_directory_id: WorkingDirectoryId,
566 ) -> Option<&mut WorkingDirectory> {
567 self.state.all_entries.get_mut(&working_directory_id)
568 }
569
    /// The (lockable) pool base directory.
    pub fn base_dir(&self) -> &Arc<WorkingDirectoryPoolBaseDir> {
        &self.context.base_dir
    }
573
574 pub fn capacity(&self) -> usize {
577 self.context.capacity.get().into()
578 }
579
    /// The remote repository URL that working directories clone from.
    pub fn git_url(&self) -> &GitUrl {
        &self.context.remote_repository_url
    }
583
584 pub fn all_entries(&self) -> impl Iterator<Item = (WorkingDirectoryId, &WorkingDirectory)> {
588 self.state.all_entries.iter().map(|(id, wd)| (*id, wd))
589 }
590
591 pub fn all_entries_mut(
592 &mut self,
593 ) -> impl Iterator<Item = (WorkingDirectoryId, &mut WorkingDirectory)> {
594 self.state.all_entries.iter_mut().map(|(id, wd)| (*id, wd))
595 }
596
    /// Iterator over the working directories currently usable for jobs.
    pub fn active_entries(&self) -> impl Iterator<Item = (WorkingDirectoryId, &WorkingDirectory)> {
        self.all_entries()
            .filter(|(_, wd)| wd.working_directory_status.status.can_be_used_for_jobs())
    }
603
    /// Mutable iterator over the working directories currently usable
    /// for jobs.
    pub fn active_entries_mut(
        &mut self,
    ) -> impl Iterator<Item = (WorkingDirectoryId, &mut WorkingDirectory)> {
        self.all_entries_mut()
            .filter(|(_, wd)| wd.working_directory_status.status.can_be_used_for_jobs())
    }
610
    /// Number of working directories currently usable for jobs.
    pub fn active_len(&self) -> usize {
        self.active_entries().count()
    }
617
    /// Runs `action` in the given working directory, tracking status
    /// transitions (`Processing` → `Finished` / `Error`) and pointing
    /// the `current` symlink at the directory first.
    ///
    /// On success, returns the action's result plus a cleanup token
    /// that MUST be passed to `working_directory_cleanup`. On failure,
    /// the error is persisted next to the directory and returned.
    pub fn process_in_working_directory<'pool, T>(
        &'pool mut self,
        working_directory_id: WorkingDirectoryId,
        timestamp: &DateTimeWithOffset,
        action: impl FnOnce(WorkingDirectoryWithPoolMut) -> Result<T>,
        benchmarking_job_parameters: Option<&BenchmarkingJobParameters>,
        context: &str,
        have_other_jobs_for_same_commit: Option<&dyn Fn() -> bool>,
    ) -> Result<(T, WorkingDirectoryCleanupToken)> {
        let mut guard =
            self.lock_mut("WorkingDirectoryPool.process_in_working_directory for action")?;

        guard.set_current_working_directory(working_directory_id)?;

        let mut wd = guard
            .get_working_directory_mut(working_directory_id)
            .ok_or_else(|| anyhow!("working directory id must still exist"))?;

        // Refuse directories that have been set aside.
        if !wd.working_directory_status.status.can_be_used_for_jobs() {
            bail!(
                "working directory {working_directory_id} is set aside (in '{}' state)",
                wd.working_directory_status.status
            )
        }

        wd.set_and_save_status(Status::Processing)?;

        info!(
            "process_working_directory {working_directory_id} \
             ({:?} for {context} at_{timestamp})...",
            benchmarking_job_parameters.map(BenchmarkingJobParameters::slow_hash)
        );

        // The guard (and thus the lock) is consumed by the action; the
        // pool is re-locked afterwards for the status update.
        match action(guard.into_get_working_directory_mut(working_directory_id)) {
            Ok(v) => {
                self.lock_mut("WorkingDirectoryPool.process_in_working_directory after action Ok")?
                    .get_working_directory_mut(working_directory_id)
                    .expect("we're not removing it in the mean time")
                    .set_and_save_status(Status::Finished)?;

                info!(
                    "process_working_directory {working_directory_id} \
                     ({:?} for {context} at_{timestamp}) succeeded.",
                    benchmarking_job_parameters.map(BenchmarkingJobParameters::slow_hash)
                );

                let wd = self
                    .get_working_directory(working_directory_id)
                    .expect("we're not removing it in the mean time");

                // Decide (per auto-clean policy and pending jobs)
                // whether the directory should be deleted once the
                // caller consumes the token.
                let needs_cleanup = wd.needs_cleanup(
                    self.context.auto_clean.as_ref(),
                    have_other_jobs_for_same_commit,
                )?;
                let token = WorkingDirectoryCleanupToken {
                    working_directory_id,
                    needs_cleanup,
                    linear_token: Linear::new(false),
                };
                Ok((v, token))
            }
            Err(error) => {
                let mut lock = self.lock_mut(
                    "WorkingDirectoryPool.process_in_working_directory after action Err",
                )?;
                lock.get_working_directory_mut(working_directory_id)
                    .expect("we're not removing it in the mean time")
                    .set_and_save_status(Status::Error)?;

                // Persist the error next to the pool for inspection.
                let err = format!("{error:#?}");
                lock.save_processing_error(
                    working_directory_id,
                    ProcessingError {
                        benchmarking_job_parameters: benchmarking_job_parameters.cloned(),
                        context: context.to_string(),
                        error: err.clone(),
                    },
                    timestamp,
                )
                .map_err(ctx!("error storing the error {err:#}"))?;

                info!(
                    "process_working_directory {working_directory_id} \
                     ({:?} for {context} at_{timestamp}) failed.",
                    benchmarking_job_parameters.map(BenchmarkingJobParameters::slow_hash)
                );

                Err(error)
            }
        }
    }
729
730 pub fn working_directory_cleanup(
734 &mut self,
735 cleanup: WorkingDirectoryCleanupToken,
736 ) -> Result<()> {
737 let WorkingDirectoryCleanupToken {
738 linear_token,
739 working_directory_id,
740 needs_cleanup,
741 } = cleanup;
742 linear_token.bury();
743 if needs_cleanup {
744 let mut lock = self.lock_mut("WorkingDirectoryPool.working_directory_cleanup")?;
745 lock.delete_working_directory(working_directory_id)?;
746 }
747 Ok(())
748 }
749}
750
impl<'pool> WorkingDirectoryPoolGuard<'pool> {
    /// Looks up a working directory by ID, wrapped as "locked" since
    /// this guard witnesses that the pool lock is held.
    pub fn get_working_directory<'guard: 'pool>(
        &'guard self,
        working_directory_id: WorkingDirectoryId,
    ) -> Option<WorkingDirectoryWithPoolLock<'guard>> {
        Some(WorkingDirectoryWithPoolLock {
            wd: self.state.all_entries.get(&working_directory_id)?,
        })
    }
}
762
763impl<'pool> WorkingDirectoryPoolGuardMut<'pool> {
764 pub fn get_working_directory_mut<'guard>(
766 &'guard mut self,
767 working_directory_id: WorkingDirectoryId,
768 ) -> Option<WorkingDirectoryWithPoolLockMut<'guard>> {
769 Some(WorkingDirectoryWithPoolLockMut {
770 wd: self.pool.state.all_entries.get_mut(&working_directory_id)?,
771 })
772 }
773
    /// Consumes the guard, pairing it with a working directory ID for
    /// an action that needs both the lock and the directory.
    pub fn into_get_working_directory_mut(
        self,
        working_directory_id: WorkingDirectoryId,
    ) -> WorkingDirectoryWithPoolMut<'pool> {
        WorkingDirectoryWithPoolMut {
            guard: self,
            working_directory_id,
        }
    }
785
786 pub fn get_first(&mut self) -> Result<WorkingDirectoryId> {
789 if let Some((key, _)) = self.pool.active_entries().next() {
790 return Ok(key);
791 }
792 self.get_new()
793 }
794
    /// Clones the repository into a fresh working directory under the
    /// next free ID and registers it in the pool.
    fn get_new(&mut self) -> Result<WorkingDirectoryId> {
        let id = self.next_id();
        debug!("get_new: using {id:?}");
        let dir = WorkingDirectory::clone_repo(
            self.pool.base_dir().path(),
            &id.to_directory_file_name(),
            self.pool.git_url(),
            &self.shared(),
            self.pool.context.signal_change.clone(),
        )?;
        self.pool.state.all_entries.insert(id, dir);
        Ok(id)
    }
810
    /// Writes `processing_error` as YAML to
    /// `<base>/<id>.error_at_<timestamp>` for later inspection.
    fn save_processing_error(
        &mut self,
        id: WorkingDirectoryId,
        processing_error: ProcessingError,
        timestamp: &DateTimeWithOffset,
    ) -> Result<()> {
        let error_file_path = self.pool.base_dir().path().append(format!(
            "{}.error_at_{timestamp}",
            id.to_directory_file_name()
        ));
        let processing_error_string = serde_yml::to_string(&processing_error)?;
        std::fs::write(&error_file_path, &processing_error_string)
            .map_err(ctx!("writing to {error_file_path:?}"))?;

        info!("saved processing error to {error_file_path:?}");

        Ok(())
    }
833
834 pub fn delete_working_directory(
837 &mut self,
838 working_directory_id: WorkingDirectoryId,
839 ) -> Result<()> {
840 let wd = self
841 .pool
842 .state
843 .all_entries
844 .get_mut(&working_directory_id)
845 .ok_or_else(|| anyhow!("working directory id must still exist"))?;
846 let path = wd.git_working_dir.working_dir_path_arc();
847 info!("delete_working_directory: deleting directory {path:?}");
848 self.pool.state.all_entries.remove(&working_directory_id);
849 std::fs::remove_dir_all(&*path).map_err(ctx!("deleting directory {path:?}"))?;
850 Ok(())
851 }
852
853 fn next_id(&mut self) -> WorkingDirectoryId {
854 let id = self.pool.state.next_id;
855 self.pool.state.next_id += 1;
856 WorkingDirectoryId(id)
857 }
858
    /// Ensures `wd.commit` is populated for every active working
    /// directory (required before matching directories by commit in
    /// `try_get_fitting_working_directory_for`).
    fn init_active_commit_ids(&mut self) -> Result<()> {
        for (_, wd) in self.pool.active_entries_mut() {
            let mut wd = WorkingDirectoryWithPoolLockMut { wd };
            wd.commit()?;
        }
        Ok(())
    }
872
    /// Tries to find an existing working directory for the run:
    /// first one already on the requested commit, then one whose
    /// commit has no queued jobs anymore (i.e. obsolete); ties are
    /// broken by the highest-ordered status. Returns `None` if
    /// neither exists.
    fn try_get_fitting_working_directory_for(
        &mut self,
        run_parameters: &RunParameters,
        run_queues_data: &RunQueuesData,
    ) -> Result<Option<WorkingDirectoryId>> {
        let commit: &GitHash = &run_parameters.commit_id;

        // Makes the `expect("initialized above")` calls below sound.
        self.init_active_commit_ids()?;
        if let Some((id, _dir)) = self
            .pool
            .active_entries_mut()
            .filter(|(_, wd)| wd.commit.as_ref().expect("initialized above") == commit)
            .max_by_key(|(_, dir)| dir.working_directory_status.status)
        {
            info!("try_get_best_working_directory_for: found by commit {commit}");
            return Ok(Some(id));
        }

        if let Some((id, _dir)) = self
            .pool
            .active_entries()
            .filter(|(_, dir)| {
                !run_queues_data
                    .have_job_with_commit_id(dir.commit.as_ref().expect("initialized above"))
            })
            .max_by_key(|(_, dir)| dir.working_directory_status.status)
        {
            info!("try_get_best_working_directory_for: found as obsolete");
            return Ok(Some(id));
        }

        Ok(None)
    }
919
920 pub fn get_a_working_directory_for<'s>(
928 &'s mut self,
929 run_parameters: &RunParameters,
930 run_queues_data: &RunQueuesData,
931 ) -> Result<WorkingDirectoryId> {
932 if let Some(id) =
933 self.try_get_fitting_working_directory_for(run_parameters, run_queues_data)?
934 {
935 info!("get_a_working_directory_for -> good old {id:?}");
936 Ok(id)
937 } else {
938 if self.pool.active_len() < self.pool.capacity() {
939 let id = self.get_new()?;
941 info!("get_a_working_directory_for -> new {id:?}");
942 Ok(id)
943 } else {
944 let id = self
946 .pool
947 .active_entries()
948 .min_by_key(|(_, entry)| entry.last_use)
949 .expect("capacity is guaranteed >= 1")
950 .0
951 .clone();
952 info!("get_a_working_directory_for -> lru old {id:?}");
953 Ok(id)
954 }
955 }
956 }
957
    /// Removes the `current` symlink (if present) and signals the
    /// change; a missing symlink is not an error.
    pub fn clear_current_working_directory(&self) -> Result<()> {
        let path = self
            .pool
            .context
            .base_dir
            .current_working_directory_symlink_path();
        if let Err(e) = std::fs::remove_file(&path) {
            match e.kind() {
                std::io::ErrorKind::NotFound => (),
                _ => Err(e).map_err(ctx!("removing symlink {path:?}"))?,
            }
        }
        if let Some(signal_change) = &self.pool.context.signal_change {
            signal_change.send_signal();
        }
        Ok(())
    }
983
    /// Points the `current` symlink at the given working directory and
    /// signals the change.
    ///
    /// NOTE(review): `symlink` fails with `EEXIST` when `current`
    /// already exists; this appears to rely on the symlink having been
    /// cleared beforehand — confirm against callers.
    fn set_current_working_directory(&self, id: WorkingDirectoryId) -> Result<()> {
        let source_path = id.to_directory_file_name();
        let target_path = self
            .pool
            .context
            .base_dir
            .current_working_directory_symlink_path();
        std::os::unix::fs::symlink(&source_path, &target_path).map_err(ctx!(
            "creating symlink at {target_path:?} to {source_path:?}"
        ))?;
        if let Some(signal_change) = &self.pool.context.signal_change {
            signal_change.send_signal();
        }
        Ok(())
    }
1002}