1use std::{borrow::Cow, ffi::OsStr, io::stdout, process::exit, sync::Arc, time::SystemTime};
2
3use anyhow::{Result, anyhow, bail};
4use chj_rustbin::duu::{GetDirDiskUsage, bytes_to_gib_string};
5use chj_unix_util::polling_signals::{PollingSignals, PollingSignalsSender, SharedPollingSignals};
6use cj_path_util::path_util::AppendToPath;
7use itertools::Itertools;
8use rayon::iter::{IntoParallelIterator, ParallelIterator};
9
10use crate::output_table::terminal::{TerminalTable, TerminalTableOpts};
11use crate::output_table::{OutputTable, OutputTableTitle};
12use crate::run::config::ShareableConfig;
13use crate::run::global_app_state_dir::GlobalAppStateDir;
14use crate::utillib::ask::ask_yn;
15use crate::{
16 ctx, info,
17 io_utils::lockable_file::{StandaloneExclusiveFileLock, StandaloneFileLockError},
18 io_utils::{
19 bash::{bash_export_variable_string, bash_string_from_program_path_and_args},
20 shell::preferred_shell,
21 },
22 lazyresult,
23 run::{
24 command_log_file::CommandLogFile,
25 config::{BenchmarkingCommand, RunConfig},
26 dataset_dir_env_var::dataset_dir_for,
27 env_vars::assert_evobench_env_var,
28 key::{BenchmarkingJobParameters, RunParameters},
29 run_job::get_commit_tags,
30 sub_command::{open_working_directory_pool, wd_log::LogOrLogf},
31 versioned_dataset_dir::VersionedDatasetDir,
32 working_directory::{FetchedTags, Status, WorkingDirectory, WorkingDirectoryStatus},
33 working_directory_pool::{
34 WdAllowBareOpt, WorkingDirectoryId, WorkingDirectoryIdOpt, WorkingDirectoryPoolBaseDir,
35 finish_parsing_working_directory_ids,
36 },
37 },
38 serde_types::date_and_time::system_time_to_rfc3339,
39 utillib::unix::ToExitCode,
40 warn,
41};
42
43pub fn open_working_directory_change_signals(conf: &RunConfig) -> Result<PollingSignals> {
46 let signals_path = conf.working_directory_change_signals_path();
47 PollingSignals::open(&signals_path, 0).map_err(ctx!("opening signals path {signals_path:?}"))
48}
49
50pub fn open_queue_change_signals(
56 global_app_state_dir: &GlobalAppStateDir,
57) -> Result<SharedPollingSignals> {
58 let signals_path = global_app_state_dir.run_queue_signal_change_path();
59 let done_path = global_app_state_dir.run_queue_change_done_path();
60 SharedPollingSignals::open(&signals_path, &done_path, 0)
61 .map_err(ctx!("open_queue_change_signals"))
62}
63
/// Failure modes of [`get_run_lock`].
///
/// Both variants display as the wrapped error's own message.
#[derive(Debug, thiserror::Error)]
pub enum GetRunLockError {
    /// The lock is already held; wraps the
    /// `StandaloneFileLockError::AlreadyLocked` error reported by the lock
    /// attempt.
    #[error("{0}")]
    AlreadyLocked(StandaloneFileLockError),
    /// Any other failure while trying to take the lock (currently the
    /// `StandaloneFileLockError::IOError` case), converted to `anyhow::Error`.
    #[error("{0}")]
    Generic(anyhow::Error),
}
71
72pub fn get_run_lock(conf: &RunConfig) -> Result<StandaloneExclusiveFileLock, GetRunLockError> {
76 let run_lock_path = &conf.run_jobs_daemon.state_dir;
77
78 match StandaloneExclusiveFileLock::try_lock_path(run_lock_path, || {
79 "getting the global lock for running jobs".into()
80 }) {
81 Ok(run_lock) => Ok(run_lock),
82 Err(e) => match &e {
83 StandaloneFileLockError::IOError { path: _, error: _ } => {
84 Err(GetRunLockError::Generic(e.into()))
85 }
86 StandaloneFileLockError::AlreadyLocked { path: _, msg: _ } => {
87 Err(GetRunLockError::AlreadyLocked(e))
88 }
89 },
90 }
91}
92
93fn daemon_is_running(conf: &RunConfig) -> Result<bool> {
97 match get_run_lock(conf) {
98 Ok(_) => Ok(false),
99 Err(e) => match &e {
100 GetRunLockError::AlreadyLocked(_) => Ok(true),
101 GetRunLockError::Generic(_) => Err(e.into()),
102 },
103 }
104}
105
// Actions on the pool of working directories; executed by `Wd::run`.
//
// NOTE: plain `//` comments are used here on purpose: with clap's derive,
// `///` doc comments on variants and fields become help text.
#[derive(Debug, clap::Subcommand)]
pub enum Wd {
    // Print the working directories, as a table or as bare ids.
    List {
        // Formatting options handed to the terminal table writer.
        #[clap(flatten)]
        terminal_table_opts: TerminalTableOpts,

        // Show only directories whose status can be used for jobs. Giving
        // both `--active` and `--error` shows everything, like giving neither.
        #[clap(long)]
        active: bool,

        // Show only directories whose status can not be used for jobs.
        #[clap(long)]
        error: bool,

        // Keep the order in which the pool yields its entries; without this
        // flag entries are sorted by their last-use time.
        #[clap(short, long)]
        numeric_sort: bool,

        // Sort the table rows by the disk usage column. Conflicts with
        // `--no-du` and `--id-only`.
        #[clap(short, long)]
        du_sort: bool,

        // Print only the ids, one per line, instead of a table.
        #[clap(short, long)]
        id_only: bool,

        // Omit the `du_GiB` column (no disk usage is calculated).
        #[clap(long)]
        no_du: bool,

        // Omit the `commit_id` column.
        #[clap(long)]
        no_commit: bool,
    },
    // Delete working directories in `Error` status that have not been used
    // for longer than the period selected by `mode`.
    Cleanup {
        // Only report (on stderr) which directories would be deleted.
        #[clap(long)]
        dry_run: bool,

        // Print the id of each selected directory on stdout.
        #[clap(short, long)]
        verbose: bool,

        // How stale a directory must be to be selected.
        #[clap(subcommand)]
        mode: WdCleanupMode,
    },
    // Delete the given working directories.
    Delete {
        // Only report (on stderr) which directories would be deleted.
        #[clap(long)]
        dry_run: bool,

        // Skip the check that the directory is in `Error` status. Deleting
        // the current working directory is still refused while it is in
        // `CheckedOut` or `Processing` status and the run lock is held.
        #[clap(short, long)]
        force: bool,

        // Print the id of each deleted directory on stdout.
        #[clap(short, long)]
        verbose: bool,

        #[clap(flatten)]
        allow_bare: WdAllowBareOpt,

        // The working directories to delete.
        ids: Vec<WorkingDirectoryIdOpt>,
    },
    // Runs `LogOrLogf::run` with `false` as its first argument.
    // NOTE(review): presumably shows the log once -- confirm in `wd_log`.
    Log(LogOrLogf),
    // Runs `LogOrLogf::run` with `true` as its first argument.
    // NOTE(review): presumably follows the log -- confirm in `wd_log`.
    Logf(LogOrLogf),
    // Set the status of the given directories to `Examination`. Refused for
    // directories whose current status can be used for jobs.
    Mark {
        #[clap(flatten)]
        allow_bare: WdAllowBareOpt,

        ids: Vec<WorkingDirectoryIdOpt>,
    },
    // Set the status of the given directories to `Error`. Refused for
    // directories whose current status can be used for jobs.
    Unmark {
        #[clap(flatten)]
        allow_bare: WdAllowBareOpt,

        ids: Vec<WorkingDirectoryIdOpt>,
    },
    // Set the status of the given directories to `CheckedOut` and send a
    // working-directory change signal. Refused for directories whose current
    // status can be used for jobs.
    Recycle {
        #[clap(flatten)]
        allow_bare: WdAllowBareOpt,

        ids: Vec<WorkingDirectoryIdOpt>,
    },
    // Mark the directory for examination, print how to re-run its last job,
    // then start a shell inside it with the job's environment variables set.
    Enter {
        // After the shell exits, leave the status as is without asking.
        #[clap(long)]
        mark: bool,

        // After the shell exits, set the status to `Error` without asking.
        // Mutually exclusive with `--mark`.
        #[clap(long)]
        unmark: bool,

        // Enter the directory even if its status can be used for jobs; the
        // status is then left untouched.
        #[clap(long)]
        force: bool,

        // Do not fetch from the remote; the tags are treated as if they had
        // been fetched.
        #[clap(long)]
        no_fetch: bool,

        #[clap(flatten)]
        allow_bare: WdAllowBareOpt,

        // The working directory to enter.
        id: WorkingDirectoryIdOpt,
    },
}
252
// Selects how long a working directory must have been unused before
// `Wd::Cleanup` deletes it.
//
// NOTE: plain `//` comments are used here on purpose: with clap's derive,
// `///` doc comments on variants and fields become help text.
#[derive(Debug, clap::Subcommand)]
pub enum WdCleanupMode {
    // Use a threshold of 0 days.
    All,
    // Use a threshold of `x` days.
    StaleForDays {
        // Number of days; `Wd::run` rejects negative values, NaN and values
        // above 1000.
        x: f32,
    },
}
264
265impl Wd {
266 pub fn run(
267 self,
268 shareable_config: &ShareableConfig,
269 working_directory_base_dir: &Arc<WorkingDirectoryPoolBaseDir>,
270 queue_change_signals: PollingSignalsSender,
271 ) -> Result<()> {
272 let omit_check = true;
275
276 let conf = &shareable_config.run_config;
277
278 let mut working_directory_pool = open_working_directory_pool(
279 conf,
280 working_directory_base_dir.clone(),
281 omit_check,
282 Some(queue_change_signals),
283 )?
284 .into_inner();
286
287 let check_original_status =
288 |wd: &WorkingDirectory, allowed_statuses: &str| -> Result<Status> {
289 let status = wd.working_directory_status.status;
290 if status.can_be_used_for_jobs() {
291 bail!(
292 "this action is only for working directories in {allowed_statuses} \
293 status, but directory {} has status '{}' (you can use --force to \
294 bypass this check)",
295 wd.working_directory_path().parent_path_and_id()?.1,
296 status
297 )
298 } else {
303 Ok(status)
304 }
305 };
306
307 #[derive(Debug, thiserror::Error)]
308 enum DoMarkError {
309 #[error("{0}")]
310 Check(anyhow::Error),
311 #[error("{0}")]
312 Generic(anyhow::Error),
313 }
314
315 enum Marked {
316 OldStatus(Status),
317 Unchanged,
318 }
319
320 let mut do_mark = |wanted_status: Status,
325 ignore_if_already_wanted_status: bool,
326 id: WorkingDirectoryId,
327 working_directory_change_signals: Option<&mut PollingSignals>|
328 -> Result<Option<Marked>, DoMarkError> {
329 let mut guard = working_directory_pool
330 .lock_mut("evobench SubCommand::Wd do_mark")
331 .map_err(DoMarkError::Generic)?;
332 if let Some(mut wd) = guard.get_working_directory_mut(id) {
333 if ignore_if_already_wanted_status
334 && wd.working_directory_status.status == wanted_status
335 {
336 return Ok(Some(Marked::Unchanged));
337 }
338 let original_status =
339 check_original_status(&*wd, "error/examination").map_err(DoMarkError::Check)?;
340 wd.set_and_save_status(wanted_status)
341 .map_err(DoMarkError::Generic)?;
342 if let Some(working_directory_change_signals) = working_directory_change_signals {
343 working_directory_change_signals.send_signal();
344 }
345 Ok(Some(Marked::OldStatus(original_status)))
346 } else {
347 Ok(None)
348 }
349 };
350
351 let mut working_directory_change_signals =
352 lazyresult!(open_working_directory_change_signals(conf));
353
354 match self {
355 Wd::List {
356 terminal_table_opts,
357 active,
358 error,
359 id_only,
360 no_commit,
361 numeric_sort,
362 no_du,
363 du_sort,
364 } => {
365 if no_du && du_sort {
366 bail!("both --no-du and --du-sort were given, these options are conflicting");
367 }
368 if id_only && du_sort {
369 bail!("both --id-only and --du-sort were given, these options are conflicting");
370 }
371 let show_commit = !no_commit;
375 let show_du = !no_du;
376
377 let widths = {
378 let mut widths = vec![5 + 2, Status::MAX_STR_LEN + 2, 8 + 2, 35 + 2, 35 + 2];
379 if show_commit {
380 widths.push(40 + 2);
381 }
382 if show_du {
383 widths.push(7 + 2);
384 }
385 widths.pop();
386 widths
387 };
388 let titles: Vec<OutputTableTitle> = {
389 let mut titles =
390 vec!["id", "status", "num_runs", "creation_timestamp", "last_use"];
391 if show_commit {
392 titles.push("commit_id");
393 }
394 if show_du {
395 titles.push("du_GiB");
396 }
397 titles
398 .into_iter()
399 .map(|s| OutputTableTitle {
400 text: Cow::Borrowed(s),
401 span: 1,
402 anchor_name: None,
403 })
404 .collect()
405 };
406
407 let table = if id_only {
408 None
409 } else {
410 let mut table =
411 TerminalTable::new(&widths, terminal_table_opts, stdout().lock());
412 table.write_title_row(&titles, None)?;
413 Some(table)
414 };
415
416 let all_ids: Vec<_> = {
417 let mut all_entries: Vec<_> = working_directory_pool.all_entries().collect();
418 if numeric_sort {
419 } else {
421 all_entries.sort_by(|a, b| a.1.last_use.cmp(&b.1.last_use))
422 }
423 all_entries.into_iter().map(|(id, _)| id).collect()
426 };
427
428 let mut show_as_table = Vec::new();
429
430 for id in all_ids {
431 let wd = working_directory_pool
432 .get_working_directory(id)
433 .expect("got it from all_entries");
434 let WorkingDirectoryStatus {
435 creation_timestamp,
436 num_runs,
437 status,
438 } = &wd.working_directory_status;
439
440 let show = match (active, error) {
441 (true, true) | (false, false) => true,
442 (true, false) => status.can_be_used_for_jobs(),
443 (false, true) => !status.can_be_used_for_jobs(),
444 };
445 if show {
446 if table.is_some() {
447 show_as_table.push((
448 vec![
449 id.to_string(),
450 status.to_string(),
451 num_runs.to_string(),
452 creation_timestamp.to_string(),
453 system_time_to_rfc3339(wd.last_use, None),
454 ],
455 wd.working_directory_path(),
456 wd.commit.clone(),
457 ));
458 } else {
459 println!("{id}");
460 }
461 }
462 }
463
464 if let Some(mut table) = table {
465 let mut rows = show_as_table
466 .into_par_iter()
467 .map(|(mut row, wdp, opt_commit)| -> Result<_> {
468 let get_commit = || -> Result<String> {
469 let commit = if let Some(commit) = opt_commit {
470 info!("already had a commit! how comes?");
471 commit
472 } else {
473 wdp.noncached_commit()?
474 };
475 Ok(commit.to_string())
476 };
477 let get_du = || -> Result<String> {
478 let gdu = GetDirDiskUsage {
479 one_file_system: false,
480 share_globally: false,
481 shared_inodes: Default::default(),
482 };
483 let du = gdu.dir_disk_usage(wdp.clone().into(), 0)?;
484 let shared_inodes = gdu.shared_inodes.lock().expect("no crash");
485 let bytes = du.total(&shared_inodes);
486 Ok(bytes_to_gib_string(bytes))
487 };
488 if show_commit && show_du {
489 let (commit, du) = rayon::join(get_commit, get_du);
490 row.push(commit?);
491 row.push(du?);
492 } else {
493 if show_commit {
494 row.push(get_commit()?);
495 }
496 if show_du {
497 row.push(get_du()?);
498 }
499 }
500 Ok(row)
501 })
502 .collect::<Result<Vec<Vec<String>>>>()?;
503
504 if du_sort {
505 let mut col = 5;
508 if show_commit {
509 col += 1;
510 }
511 rows.sort_by(|a, b| a[col].cmp(&b[col]));
512 }
513
514 for row in rows {
515 table.write_data_row(&row, None)?;
516 }
517 let _ = table.finish()?;
518 }
519 Ok(())
520 }
521 Wd::Cleanup {
522 dry_run,
523 verbose,
524 mode,
525 } => {
526 let stale_days = match mode {
527 WdCleanupMode::All => 0.,
528 WdCleanupMode::StaleForDays { x } => x,
529 };
530 if stale_days < 0. {
531 bail!("number of days must be non-negative");
532 }
533 if stale_days > 1000. || stale_days.is_nan() {
534 bail!("number of days must be reasonable");
535 }
536
537 let stale_seconds = (stale_days * 24. * 3600.) as u64;
538
539 let now = SystemTime::now();
540
541 let mut cleanup_ids = Vec::new();
542 for (id, wd) in working_directory_pool
543 .all_entries()
544 .filter(|(_, wd)| wd.working_directory_status.status == Status::Error)
545 {
546 let d = now.duration_since(wd.last_use).map_err(ctx!(
547 "calculating time since last use of working directory {id}"
548 ))?;
549 if d.as_secs() > stale_seconds {
550 cleanup_ids.push(id);
551 }
552 }
553
554 {
555 let mut lock = working_directory_pool.lock_mut("evobench Wd::Cleanup")?;
556 for id in cleanup_ids {
557 if dry_run {
558 eprintln!("would delete working directory {id}");
559 } else {
560 lock.delete_working_directory(id)?;
563 }
564 if verbose {
565 println!("{id}");
566 }
567 }
568 }
569 Ok(())
570 }
571 Wd::Delete {
572 dry_run,
573 force,
574 verbose,
575 ids,
576 allow_bare,
577 } => {
578 let ids = finish_parsing_working_directory_ids(ids, allow_bare)?;
579
580 let mut lock_mut = working_directory_pool.lock_mut("evobench Wd::Delete")?;
581 let opt_current_wd_id = lock_mut
582 .locked_base_dir()
583 .read_current_working_directory()?;
584 for id in ids {
585 let lock = lock_mut.shared();
586 let wd = lock
587 .get_working_directory(id)
588 .ok_or_else(|| anyhow!("working directory {id} does not exist"))?;
589 let status = wd.working_directory_status.status;
590 if force {
591 if Some(id) == opt_current_wd_id {
592 let status_is_in_use = match status {
594 Status::CheckedOut => true, Status::Processing => true,
596 Status::Error => false,
597 Status::Finished => false,
598 Status::Examination => false,
599 };
600 if status_is_in_use {
601 if daemon_is_running(conf)? {
602 bail!(
603 "working directory {id} is in use and \
604 the daemon is running"
605 );
606 } else {
607 }
609 }
610 }
611 } else {
612 if status != Status::Error {
613 let tip = if status == Status::Examination {
614 "; please first use the `unmark` action to move it \
615 out of examination"
616 } else {
617 "; use the `--force` option if you're sure"
618 };
619 bail!(
620 "working directory {id} is not in `error`, but `{status}` \
621 status{tip}"
622 );
623 }
624 }
625 if dry_run {
626 let path = wd.git_working_dir.working_dir_path_ref();
627 eprintln!("would delete working directory at {path:?}");
628 } else {
629 if status.can_be_used_for_jobs() {
630 working_directory_change_signals.force_mut()?.send_signal();
631 }
635
636 lock_mut.delete_working_directory(id)?;
641 if verbose {
642 println!("{id}");
643 }
644 }
645 }
646 Ok(())
647 }
648 Wd::Log(opts) => {
649 opts.run(false, &working_directory_pool)?;
650 Ok(())
651 }
652 Wd::Logf(opts) => {
653 opts.run(true, &working_directory_pool)?;
654 Ok(())
655 }
656 Wd::Mark { ids, allow_bare } => {
657 let ids = finish_parsing_working_directory_ids(ids, allow_bare)?;
658 for id in ids {
659 if do_mark(Status::Examination, true, id, None)?.is_none() {
660 warn!("there is no working directory for id {id}");
661 }
662 }
663 Ok(())
664 }
665 Wd::Unmark { ids, allow_bare } => {
666 let ids = finish_parsing_working_directory_ids(ids, allow_bare)?;
667 for id in ids {
668 if do_mark(Status::Error, true, id, None)?.is_none() {
669 warn!("there is no working directory for id {id}");
670 }
671 }
672 Ok(())
673 }
674 Wd::Recycle { ids, allow_bare } => {
675 let ids = finish_parsing_working_directory_ids(ids, allow_bare)?;
676 for id in ids {
677 if do_mark(
678 Status::CheckedOut,
679 true,
680 id,
681 Some(working_directory_change_signals.force_mut()?),
682 )?
683 .is_none()
684 {
685 warn!("there is no working directory for id {id}");
686 }
687 }
688 Ok(())
689 }
690 Wd::Enter {
691 mark,
692 unmark,
693 force,
694 no_fetch,
695 id,
696 allow_bare,
697 } => {
698 let id = id.to_working_directory_id(allow_bare)?;
699 if mark && unmark {
700 bail!("please only give one of the --mark or --unmark options")
701 }
702
703 let no_exist = || anyhow!("there is no working directory for id {id}");
704
705 let original_status: Option<Status> =
709 match do_mark(Status::Examination, false, id, None) {
710 Ok(status) => {
711 if let Some(status) = status {
712 match status {
713 Marked::OldStatus(status) => Some(status),
714 Marked::Unchanged => unreachable!("we gave it false"),
715 }
716 } else {
717 Err(no_exist())?
718 }
719 }
720 Err(DoMarkError::Check(e)) => {
721 if force {
722 None
723 } else {
724 Err(e)?
725 }
726 }
727 Err(DoMarkError::Generic(e)) => Err(e)?,
728 };
729
730 let working_directory = working_directory_pool
731 .get_working_directory(id)
732 .ok_or_else(&no_exist)?;
733
734 let (standard_log_path, _id) = working_directory
735 .working_directory_path()
736 .last_standard_log_path()?
737 .ok_or_else(|| {
738 anyhow!("could not find a log file for working directory {id}")
739 })?;
740
741 let command_log_file = CommandLogFile::from(&standard_log_path);
742 let command_log = command_log_file.command_log()?;
743
744 let BenchmarkingJobParameters {
745 run_parameters,
746 command,
747 } = command_log.parse_log_file_params()?;
748
749 let RunParameters {
750 commit_id,
751 custom_parameters,
752 } = &*run_parameters;
753
754 let BenchmarkingCommand {
755 target_name: _,
756 subdir,
757 command,
758 arguments,
759 pre_exec_bash_code,
760 } = &*command;
761
762 let fetched_tags = if no_fetch {
763 FetchedTags::Yes
766 } else {
767 working_directory.fetch(Some(commit_id))?
768 };
769
770 let commit_tags = get_commit_tags(
771 working_directory,
772 commit_id,
773 &conf.commit_tags_regex,
774 fetched_tags.clone(),
775 )?;
776
777 let mut vars: Vec<(&str, &OsStr)> = custom_parameters
778 .btree_map()
779 .iter()
780 .map(|(k, v)| (k.as_str(), v.as_ref()))
781 .collect();
782
783 let check = assert_evobench_env_var;
784
785 let commit_id_str = commit_id.to_string();
786 vars.push((check("COMMIT_ID"), commit_id_str.as_ref()));
787 vars.push((check("COMMIT_TAGS"), commit_tags.as_ref()));
788
789 let versioned_dataset_dir = VersionedDatasetDir::new();
790 let dataset_dir_;
791 if let Some(dataset_dir) = dataset_dir_for(
792 conf.versioned_datasets_base_dir.as_deref(),
793 &custom_parameters,
794 &versioned_dataset_dir,
795 &working_directory.git_working_dir,
796 &commit_id,
797 fetched_tags,
798 )? {
799 dataset_dir_ = dataset_dir;
800 vars.push((check("DATASET_DIR"), dataset_dir_.as_ref()));
801 }
802
803 let exports = vars
804 .iter()
805 .map(|(k, v)| bash_export_variable_string(k, &v.to_string_lossy(), " ", "\n"))
806 .join("");
807
808 let shell = preferred_shell()?;
809
810 println!(
813 "The log file from this job execution is:\n\
814 {standard_log_path:?}\n"
815 );
816
817 if shell != "bash" && shell != "/bin/bash" {
818 println!(
819 "Note: SHELL is set to {shell:?}, but the following syntax \
820 is for bash.\n"
821 );
822 }
823
824 println!("The following environment variables have been set:\n\n{exports}");
825
826 println!(
827 "To rerun the benchmarking, please set `BENCH_OUTPUT_LOG` \
828 and optionally `EVOBENCH_LOG` to some suitable paths, \
829 then run:\n\n {}\n",
830 bash_string_from_program_path_and_args(command, arguments)
831 );
832
833 let actual_commit = working_directory.git_working_dir.get_head_commit_id()?;
834 if commit_id_str != actual_commit {
835 println!(
836 "*** WARNING: the checked-out commit in this directory \
837 does not match the commit id for the job! ***\n"
838 );
839 }
840
841 if original_status.is_none() {
842 println!(
843 "*** WARNING: processing is ongoing, entering this directory \
844 by force! Please do not hinder the benchmarking process! ***\n"
845 );
846 }
847
848 let mut cmd = pre_exec_bash_code
852 .to_run_with_pre_exec(conf)
853 .command::<&str>(&shell, []);
854 cmd.envs(vars);
855 cmd.current_dir(
856 working_directory
857 .git_working_dir
858 .working_dir_path_ref()
859 .append(subdir),
860 );
861 let status = cmd.status()?;
862
863 if unmark || original_status != Some(Status::Examination) {
864 if mark {
865 } else {
867 if let Some(original_status) = original_status {
868 let do_revert = unmark
869 || ask_yn(&format!(
870 "Should the working directory status be reverted to \
871 '{original_status}' (i.e. are you done)?"
872 ))?;
873
874 if do_revert {
875 let mut wd = working_directory_pool
876 .lock_mut("evobench Wd::Enter do_revert")?
877 .into_get_working_directory_mut(id);
878 let mut working_directory = wd.get().ok_or_else(|| {
879 anyhow!("there is no working directory for id {id}")
880 })?;
881 let wanted_status = Status::Error;
882 assert!(
883 original_status == wanted_status
884 || original_status == Status::Examination
885 );
886 working_directory.set_and_save_status(wanted_status)?;
887 println!("Changed status to '{wanted_status}'");
888 } else {
889 println!("Leaving status at 'examination'");
890 }
891 } else {
892 }
896 }
897 } else {
898 if !mark {
899 let status_str = if let Some(original_status) = original_status {
900 &original_status.to_string()
901 } else {
902 "processing(?)"
903 };
904 println!(
905 "Leaving working directory status at the original status, \
906 {status_str}",
907 );
908 }
909 }
910
911 exit(status.to_exit_code());
912 }
913 }
914 }
915}