1use std::{
2 borrow::Cow,
3 collections::{BTreeMap, BTreeSet},
4 fmt::{Debug, Display},
5 path::{Path, PathBuf},
6 str::FromStr,
7 sync::Arc,
8};
9
10use anyhow::{Context, Result, anyhow, bail};
11use chj_unix_util::daemon::DaemonPaths;
12use chrono::{DateTime, Local};
13use cj_path_util::path_util::AppendToPath;
14use kstring::KString;
15
16use crate::{
17 config_file::{ConfigFile, DefaultConfigPath, ron_to_string_pretty},
18 date_and_time::time_ranges::{DateTimeRange, LocalNaiveTimeRange},
19 info,
20 io_utils::{bash::bash_string_from_cmd, div::create_dir_if_not_exists},
21 run::{env_vars::AllowableCustomEnvVar, key::CustomParameters},
22 run_with_pre_exec::{BashSettings, BashSettingsLevel, RunWithPreExec, join_pre_exec_bash_code},
23 serde_types::{
24 allowed_env_var::AllowedEnvVar,
25 date_and_time::LocalNaiveTime,
26 git_branch_name::GitBranchName,
27 git_url::GitUrl,
28 priority::Priority,
29 proper_dirname::ProperDirname,
30 proper_filename::ProperFilename,
31 regex::SerializableRegex,
32 tilde_path::TildePath,
33 val_or_ref::{ValOrRef, ValOrRefTarget},
34 },
35 util::grep_diff::LogExtract,
36 utillib::arc::CloneArc,
37};
38
39use super::{
40 benchmarking_job::BenchmarkingJobSettingsOpts, custom_parameter::AllowedCustomParameter,
41 global_app_state_dir::GlobalAppStateDir, working_directory_pool::WorkingDirectoryPoolOpts,
42};
43
/// Condition under which a queue's jobs are eligible to run.
///
/// [`ScheduleCondition::is_runnable_at`] evaluates a condition against a
/// point in time; [`ScheduleCondition::priority`] yields the effective
/// priority of each variant.
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub enum ScheduleCondition {
    /// Runnable at any time; no time window is attached.
    Immediately {
        /// Name of the "situation" associated with this condition.
        /// NOTE(review): what a situation denotes is not visible in this
        /// file -- confirm against the code that consumes it.
        situation: ProperFilename,
    },

    /// Runnable only while the reference time lies inside the time window
    /// derived from `from` and `to`.
    LocalNaiveTimeWindow {
        /// Explicit priority; when `None`,
        /// `ScheduleCondition::TIMED_QUEUE_DEFAULT_PRIORITY` is used.
        priority: Option<Priority>,
        /// Name of the "situation" associated with this condition (see the
        /// review note on `Immediately::situation`).
        situation: ProperFilename,
        /// Optional command as a list of strings (the `Display` impl renders
        /// it via `bash_string_from_cmd`).
        /// NOTE(review): presumably executed to stop/start something around
        /// the time window -- confirm.
        stop_start: Option<Vec<String>>,
        /// Rendered as `repeatedly` (true) or `once` (false) by `Display`.
        /// NOTE(review): presumably whether jobs are run again in later
        /// windows -- confirm.
        repeatedly: bool,
        /// Rendered as `move` (true) or `stay` (false) by `Display`.
        /// NOTE(review): presumably whether jobs leave this queue once the
        /// window is over -- confirm.
        move_when_time_window_ends: bool,
        /// Local time of day at which the window starts.
        from: LocalNaiveTime,
        /// Local time of day at which the window ends.
        to: LocalNaiveTime,
    },

    /// Never runnable: has no priority, no situation and no time range.
    Inactive,
}
98
99impl Display for ScheduleCondition {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 match self {
102 ScheduleCondition::Immediately { situation } => {
103 write!(f, "Immediately {:?}", situation.as_str())
104 }
105 ScheduleCondition::LocalNaiveTimeWindow {
106 priority: _,
107 situation,
108 stop_start,
109 repeatedly,
110 move_when_time_window_ends,
111 from,
112 to,
113 } => {
114 let rep = if *repeatedly { "repeatedly" } else { "once" };
115 let mov = if *move_when_time_window_ends {
116 "move"
117 } else {
118 "stay"
119 };
120 let cmd = if let Some(st) = stop_start {
121 bash_string_from_cmd(st)
122 } else {
123 "-".into()
124 };
125 let pri: f64 = self
126 .priority()
127 .expect("LocalNaiveTimeWindow *does* have priority field")
128 .into();
129 write!(
130 f,
131 "LocalNaiveTimeWindow {:?} {from} - {to} pri={pri}: {rep}, {mov}, \"{cmd}\"",
132 situation.as_str()
133 )
134 }
135 ScheduleCondition::Inactive => f.write_str("Inactive"),
136 }
137 }
138}
139
140impl ScheduleCondition {
141 pub const TIMED_QUEUE_DEFAULT_PRIORITY: Priority = Priority::new_unchecked(1.5);
142
143 pub fn is_inactive(&self) -> bool {
145 match self {
146 ScheduleCondition::Inactive => true,
147 _ => false,
148 }
149 }
150
151 pub fn time_range(&self) -> Option<(LocalNaiveTime, LocalNaiveTime)> {
152 match self {
153 ScheduleCondition::Immediately { situation: _ } => None,
154 ScheduleCondition::LocalNaiveTimeWindow {
155 priority: _,
156 situation: _,
157 stop_start: _,
158 repeatedly: _,
159 move_when_time_window_ends: _,
160 from,
161 to,
162 } => Some((from.clone(), to.clone())),
163 ScheduleCondition::Inactive => None,
164 }
165 }
166
167 pub fn stop_start(&self) -> Option<&[String]> {
168 match self {
169 ScheduleCondition::Immediately { situation: _ } => None,
170 ScheduleCondition::LocalNaiveTimeWindow {
171 priority: _,
172 situation: _,
173 stop_start,
174 repeatedly: _,
175 move_when_time_window_ends: _,
176 from: _,
177 to: _,
178 } => stop_start.as_deref(),
179 ScheduleCondition::Inactive => None,
180 }
181 }
182
183 pub fn move_when_time_window_ends(&self) -> bool {
185 match self {
186 ScheduleCondition::Immediately { situation: _ } => false,
187 ScheduleCondition::LocalNaiveTimeWindow {
188 priority: _,
189 situation: _,
190 stop_start: _,
191 repeatedly: _,
192 move_when_time_window_ends,
193 from: _,
194 to: _,
195 } => *move_when_time_window_ends,
196 ScheduleCondition::Inactive => false,
197 }
198 }
199
200 pub fn situation(&self) -> Option<&ProperFilename> {
201 match self {
202 ScheduleCondition::Immediately { situation } => Some(situation),
203 ScheduleCondition::LocalNaiveTimeWindow {
204 priority: _,
205 situation,
206 stop_start: _,
207 repeatedly: _,
208 move_when_time_window_ends: _,
209 from: _,
210 to: _,
211 } => Some(situation),
212 ScheduleCondition::Inactive => None,
213 }
214 }
215
216 pub fn priority(&self) -> Option<Priority> {
217 match self {
218 ScheduleCondition::Immediately { situation: _ } => Some(Priority::default()),
219 ScheduleCondition::LocalNaiveTimeWindow {
220 priority,
221 situation: _,
222 stop_start: _,
223 repeatedly: _,
224 move_when_time_window_ends: _,
225 from: _,
226 to: _,
227 } => Some(priority.unwrap_or(Self::TIMED_QUEUE_DEFAULT_PRIORITY)),
228 ScheduleCondition::Inactive => None,
229 }
230 }
231
232 pub fn is_runnable_at(
235 &self,
236 reference_time: DateTime<Local>,
237 ) -> Option<Option<DateTimeRange<Local>>> {
238 match self {
239 ScheduleCondition::Immediately { situation: _ } => Some(None),
240 ScheduleCondition::LocalNaiveTimeWindow {
241 priority: _,
242 situation: _,
243 stop_start: _,
244 repeatedly: _,
245 move_when_time_window_ends: _,
246 from,
247 to,
248 } => {
249 let ltr = LocalNaiveTimeRange {
250 from: *from,
251 to: *to,
252 };
253 let dtr: Option<DateTimeRange<Local>> = ltr.after_datetime(&reference_time, true);
254 if let Some(dtr) = dtr {
255 if dtr.contains(&reference_time) {
256 Some(Some(dtr))
257 } else {
258 None
259 }
260 } else {
261 info!("times in {ltr} do not resolve for {reference_time}");
262 None
263 }
264 }
265 ScheduleCondition::Inactive => None,
266 }
267 }
268}
269
/// Configuration of the job queues.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct QueuesConfig {
    /// Base directory of the run queues. When `None`,
    /// `GlobalAppStateDir::run_queues_basedir` supplies the location (see
    /// `QueuesConfig::run_queues_basedir`).
    pub run_queues_basedir: Option<TildePath<PathBuf>>,

    /// List of `(name, schedule condition)` pairs.
    /// NOTE(review): presumably one entry per queue, with jobs passing
    /// through the queues in list order -- confirm.
    pub pipeline: Vec<(ProperFilename, ScheduleCondition)>,

    /// Optional `(name, schedule condition)` pair.
    /// NOTE(review): judging by the name, the queue that receives failed
    /// jobs -- confirm.
    pub erroneous_jobs_queue: Option<(ProperFilename, ScheduleCondition)>,

    /// Optional `(name, schedule condition)` pair.
    /// NOTE(review): judging by the name, the queue that receives finished
    /// jobs -- confirm.
    pub done_jobs_queue: Option<(ProperFilename, ScheduleCondition)>,

    /// NOTE(review): presumably an upper bound on the number of jobs shown
    /// when listing a queue -- confirm against the caller.
    pub view_jobs_max_len: usize,
}
302
303impl QueuesConfig {
304 pub fn run_queues_basedir(
305 &self,
306 create_if_not_exists: bool,
307 global_app_state_dir: &GlobalAppStateDir,
308 ) -> Result<PathBuf> {
309 if let Some(base_dir) = &self.run_queues_basedir {
310 let base_dir = base_dir.resolve()?;
311 if create_if_not_exists {
312 create_dir_if_not_exists(&base_dir, "queues base directory")?;
313 }
314 Ok(base_dir)
315 } else {
316 global_app_state_dir.run_queues_basedir()
317 }
318 }
319}
320
/// Configuration-file representation of the remote repository settings
/// (serialized under the name `RemoteRepository`). Turned into a
/// [`RemoteRepository`] by its `check` method.
#[derive(serde::Serialize, serde::Deserialize, Debug)]
#[serde(deny_unknown_fields)]
#[serde(rename = "RemoteRepository")]
pub struct RemoteRepositoryOpts {
    /// Git URL of the remote repository.
    pub url: GitUrl,

    /// Per remote branch name: the job templates for that branch, given
    /// either inline or as a reference into the
    /// `RunConfig.job_template_lists` field.
    /// NOTE(review): the field name suggests these are the branches being
    /// polled for new commits -- confirm.
    pub remote_branch_names_for_poll:
        BTreeMap<GitBranchName, ValOrRef<JobTemplateListsField, Vec<JobTemplateOpts>>>,
}
332
/// Checked form of [`RemoteRepositoryOpts`]: job template references are
/// resolved and every template has been validated.
pub struct RemoteRepository {
    /// Git URL of the remote repository.
    pub url: GitUrl,
    /// Per remote branch name: the validated job templates for that branch.
    pub remote_branch_names_for_poll: BTreeMap<GitBranchName, Arc<[JobTemplate]>>,
}
337
338impl RemoteRepositoryOpts {
339 fn check(
340 &self,
341 job_template_lists: &BTreeMap<KString, Arc<[JobTemplate]>>,
342 targets: &BTreeMap<ProperDirname, Arc<BenchmarkingTarget>>,
343 ) -> Result<RemoteRepository> {
344 let Self {
345 url,
346 remote_branch_names_for_poll,
347 } = self;
348
349 let remote_branch_names_for_poll = remote_branch_names_for_poll
350 .iter()
351 .map(|(branch_name, job_template_optss)| -> Result<_> {
352 let job_templates: ValOrRef<JobTemplateListsField, Arc<[JobTemplate]>> =
353 job_template_optss.try_map(
354 |job_template_optss: &Vec<JobTemplateOpts>| -> Result<Arc<[JobTemplate]>> {
355 job_template_optss
356 .iter()
357 .map(|job_template_opts| job_template_opts.check(targets))
358 .collect()
359 },
360 )?;
361 let job_templates = job_templates.value_with_backing(job_template_lists)?;
362 Ok((branch_name.clone(), job_templates.clone_arc()))
363 })
364 .collect::<Result<_>>()?;
365
366 Ok(RemoteRepository {
367 url: url.clone(),
368 remote_branch_names_for_poll,
369 })
370 }
371}
372
/// Optional pre-exec bash code, wrapped in a newtype.
///
/// (De)serialized transparently as `Option<Arc<str>>` via the serde
/// `from`/`into` attributes. `to_run_with_pre_exec` joins it with
/// `RunConfig::target_pre_exec_bash_code`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
#[serde(from = "Option<Arc<str>>", into = "Option<Arc<str>>")]
pub struct PreExecLevel2(Option<Arc<str>>);
376
377impl From<Option<Arc<str>>> for PreExecLevel2 {
378 fn from(value: Option<Arc<str>>) -> Self {
379 Self(value)
380 }
381}
382
383impl From<PreExecLevel2> for Option<Arc<str>> {
384 fn from(value: PreExecLevel2) -> Self {
385 value.0
386 }
387}
388
389impl PreExecLevel2 {
390 pub fn new(arg: Option<Arc<str>>) -> Self {
391 Self(arg)
392 }
393
394 pub fn to_run_with_pre_exec(&self, conf: &RunConfig) -> RunWithPreExec<'static> {
395 let code = join_pre_exec_bash_code(
396 conf.target_pre_exec_bash_code.as_deref().unwrap_or(""),
397 self.0.as_deref().unwrap_or(""),
398 );
399 RunWithPreExec {
400 pre_exec_bash_code: code.into(),
401 bash_settings: BashSettings {
402 level: BashSettingsLevel::SetMEUPipefail,
403 set_ifs: true,
404 },
405 bash_path: None,
406 }
407 }
408}
409
/// Description of the command that runs a benchmark for one target.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct BenchmarkingCommand {
    /// Name of the target. `RunConfigOpts::check` uses it as the key of
    /// `RunConfig::targets` and rejects duplicates; job templates refer to
    /// a target by this name.
    pub target_name: ProperDirname,

    /// NOTE(review): presumably the sub-directory (relative to the working
    /// directory) in which the command is run -- confirm.
    pub subdir: PathBuf,

    /// The program to run.
    /// NOTE(review): whether this is looked up via `PATH` is not visible
    /// here -- confirm.
    pub command: String,

    /// Arguments passed to `command`.
    pub arguments: Vec<String>,

    /// Optional bash code specific to this command; combined with
    /// `RunConfig::target_pre_exec_bash_code` by
    /// `PreExecLevel2::to_run_with_pre_exec`.
    pub pre_exec_bash_code: PreExecLevel2,
}
443
/// A benchmarking target: the command to run plus what job templates may
/// configure for it.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct BenchmarkingTarget {
    /// The command that runs the benchmark; also carries the target's name.
    pub benchmarking_command: Arc<BenchmarkingCommand>,

    /// The custom parameters (env variables) that job templates may set for
    /// this target; `JobTemplateOpts::check` validates against this map.
    pub allowed_custom_parameters:
        BTreeMap<AllowedEnvVar<AllowableCustomEnvVar>, AllowedCustomParameter>,

    /// NOTE(review): presumably rules for extracting parts of the job logs
    /// -- confirm against `LogExtract`.
    pub log_extracts: Option<Vec<LogExtract>>,
}
461
/// Configuration-file representation of a job template (serialized under
/// the name `JobTemplate`). Turned into a [`JobTemplate`] by
/// [`JobTemplateOpts::check`].
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename = "JobTemplate")]
pub struct JobTemplateOpts {
    /// Priority, copied unchanged into the `JobTemplate`.
    priority: Priority,
    /// Initial boost, copied unchanged into the `JobTemplate`.
    /// NOTE(review): presumably a temporary priority bonus for new jobs --
    /// confirm.
    initial_boost: Priority,
    /// Name of the target to benchmark; must match the `target_name` of a
    /// configured target.
    target_name: ProperDirname,
    /// Custom parameter values by env variable name; validated against the
    /// target's `allowed_custom_parameters`.
    custom_parameters: BTreeMap<AllowedEnvVar<AllowableCustomEnvVar>, KString>,
}
475
/// Checked form of [`JobTemplateOpts`]: the target name is resolved to its
/// command and the custom parameters are validated.
pub struct JobTemplate {
    /// Priority as given in the configuration.
    pub priority: Priority,
    /// Initial boost as given in the configuration.
    pub initial_boost: Priority,
    /// The benchmarking command of the target named in the configuration.
    pub command: Arc<BenchmarkingCommand>,
    /// The validated custom parameters.
    pub custom_parameters: Arc<CustomParameters>,
}
482
483impl JobTemplateOpts {
484 pub fn check(
485 &self,
486 targets: &BTreeMap<ProperDirname, Arc<BenchmarkingTarget>>,
487 ) -> Result<JobTemplate> {
488 let Self {
489 priority,
490 initial_boost,
491 target_name,
492 custom_parameters,
493 } = self;
494
495 let target = targets
496 .get(target_name)
497 .ok_or_else(|| anyhow!("unknown target name {:?}", target_name.as_str()))?;
498
499 let custom_parameters =
500 CustomParameters::checked_from(custom_parameters, &target.allowed_custom_parameters)
501 .with_context(|| {
502 let context = ron_to_string_pretty(self).expect("no serialisation errors");
503 anyhow!("processing {context}")
504 })?;
505
506 Ok(JobTemplate {
507 priority: *priority,
508 initial_boost: *initial_boost,
509 command: target.benchmarking_command.clone_arc(),
510 custom_parameters: custom_parameters.into(),
511 })
512 }
513}
514
/// Settings for the evaluation step.
/// NOTE(review): where these settings are consumed is not visible in this
/// file -- confirm.
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct EvalSettings {
    /// NOTE(review): presumably whether thread numbers are included in the
    /// evaluation output -- confirm.
    pub show_thread_number: bool,
}
527
/// Configuration-file representation of a daemon's directories (serialized
/// under the name `DaemonPaths`). Turned into a `DaemonPaths` by its
/// `check` method.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename = "DaemonPaths")]
pub struct DaemonPathsOpts {
    /// State directory of the daemon. When `None`, a daemon-specific
    /// sub-directory of the global app state dir is used.
    pub state_dir: Option<TildePath<PathBuf>>,
    /// Log directory of the daemon. When `None`, the `logs` entry below the
    /// (resolved) state directory is used.
    pub log_dir: Option<TildePath<PathBuf>>,
}
537
538impl DaemonPathsOpts {
539 fn check(
540 &self,
541 global_app_state_dir: &GlobalAppStateDir,
542 default_state_subdir: &str,
543 ) -> Result<DaemonPaths> {
544 let DaemonPathsOpts { state_dir, log_dir } = self;
545
546 let state_dir: Arc<Path> = if let Some(path) = state_dir {
547 path.resolve()?.into()
548 } else {
549 global_app_state_dir.subdir(default_state_subdir)?.into()
550 };
551 let log_dir = if let Some(path) = log_dir {
552 path.resolve()?.into()
553 } else {
554 (&state_dir).append("logs").into()
555 };
556 Ok(DaemonPaths { state_dir, log_dir })
557 }
558}
559
/// Configuration-file representation of the output directory (serialized
/// under the name `OutputDir`). Turned into an [`OutputDir`] by its
/// `resolve` method.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename = "OutputDir")]
pub struct OutputDirOpts {
    /// Path of the output directory; resolved via `TildePath::resolve`.
    pub path: Arc<TildePath<PathBuf>>,

    /// Optional URL.
    /// NOTE(review): presumably the URL under which the contents of `path`
    /// are published -- confirm.
    pub url: Option<Arc<str>>,
}
573
574impl OutputDirOpts {
575 fn resolve(&self) -> Result<OutputDir> {
576 let Self { path, url } = self;
577 let path = path.resolve()?.into();
578 let url = url.clone();
579 Ok(OutputDir { path, url })
580 }
581}
582
/// Resolved form of [`OutputDirOpts`].
pub struct OutputDir {
    /// The resolved path of the output directory.
    pub path: Arc<Path>,
    /// Optional URL, as given in the configuration.
    /// NOTE(review): presumably the URL under which `path` is published --
    /// confirm.
    pub url: Option<Arc<str>>,
}
587
/// Configuration-file representation of the run configuration (serialized
/// under the name `RunConfig`). Validated and turned into a [`RunConfig`]
/// by [`RunConfigOpts::check`].
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename = "RunConfig")]
pub struct RunConfigOpts {
    /// Configuration of the job queues.
    pub queues: Arc<QueuesConfig>,

    /// Configuration of the working directory pool.
    pub working_directory_pool: Arc<WorkingDirectoryPoolOpts>,

    /// Optional bash code shared by all targets; combined with each
    /// command's own `pre_exec_bash_code` by
    /// `PreExecLevel2::to_run_with_pre_exec`.
    pub target_pre_exec_bash_code: Option<Arc<str>>,

    /// The benchmarking targets. Their `target_name` values must be unique.
    pub targets: Vec<Arc<BenchmarkingTarget>>,

    /// Named lists of job templates, which
    /// `remote_repository.remote_branch_names_for_poll` entries can refer
    /// to by name.
    pub job_template_lists: BTreeMap<KString, Vec<JobTemplateOpts>>,

    /// Settings for benchmarking jobs.
    pub benchmarking_job_settings: Arc<BenchmarkingJobSettingsOpts>,

    /// Settings for the evaluation step.
    pub eval_settings: Arc<EvalSettings>,

    /// The remote repository and its branches.
    pub remote_repository: RemoteRepositoryOpts,

    /// The output directory.
    pub output_dir: OutputDirOpts,

    /// Directories of the run-jobs daemon; the default state directory is
    /// the `run_jobs_daemon` sub-directory of the global app state dir.
    run_jobs_daemon: DaemonPathsOpts,

    /// Directories of the polling daemon; the default state directory is
    /// the `polling_daemon` sub-directory of the global app state dir.
    polling_daemon: DaemonPathsOpts,

    /// Optional base directory of versioned datasets.
    /// NOTE(review): how datasets are selected below this directory is not
    /// visible in this file -- confirm.
    pub versioned_datasets_base_dir: Option<Arc<TildePath<PathBuf>>>,

    /// Optional regex for commit tags; when `None`, `.*` is used.
    /// NOTE(review): presumably selects which Git tags are considered --
    /// confirm.
    pub commit_tags_regex: Option<SerializableRegex>,
}
661
662#[derive(Debug)]
663pub struct JobTemplateListsField;
664impl ValOrRefTarget for JobTemplateListsField {
665 fn target_desc() -> Cow<'static, str> {
666 "`RunConfig.job_template_lists` field".into()
667 }
668}
669
670impl DefaultConfigPath for RunConfigOpts {
671 fn default_config_file_name_without_suffix() -> Result<Option<ProperFilename>> {
672 Ok(Some("evobench".parse().map_err(|e| anyhow!("{e:#}"))?))
673 }
674}
675
/// Checked form of [`RunConfigOpts`], produced by `RunConfigOpts::check`:
/// paths are resolved, targets are indexed by name, and job templates are
/// validated.
pub struct RunConfig {
    /// Configuration of the job queues (shared with the `RunConfigOpts`).
    pub queues: Arc<QueuesConfig>,
    /// Resolved directories of the run-jobs daemon.
    pub run_jobs_daemon: DaemonPaths,
    /// Resolved directories of the polling daemon.
    pub polling_daemon: DaemonPaths,
    /// Configuration of the working directory pool.
    pub working_directory_pool: Arc<WorkingDirectoryPoolOpts>,
    /// Optional bash code shared by all targets.
    pub target_pre_exec_bash_code: Option<Arc<str>>,
    /// The validated named job template lists.
    pub job_template_lists: BTreeMap<KString, Arc<[JobTemplate]>>,
    /// Settings for benchmarking jobs.
    pub benchmarking_job_settings: Arc<BenchmarkingJobSettingsOpts>,
    /// Settings for the evaluation step.
    pub eval_settings: Arc<EvalSettings>,
    /// The remote repository with validated per-branch job templates.
    pub remote_repository: RemoteRepository,
    /// The resolved output directory.
    pub output_dir: OutputDir,
    /// The resolved base directory of versioned datasets, if configured.
    pub versioned_datasets_base_dir: Option<Arc<Path>>,
    /// The benchmarking targets, keyed by their (unique) `target_name`.
    pub targets: BTreeMap<ProperDirname, Arc<BenchmarkingTarget>>,
    /// Regex for commit tags; `.*` if none was configured.
    pub commit_tags_regex: SerializableRegex,
}
693
694impl RunConfig {
695 pub fn working_directory_change_signals_path(&self) -> PathBuf {
696 (&self.run_jobs_daemon.state_dir).append("working_directory_change.signals")
697 }
698}
699
700impl RunConfigOpts {
701 pub fn check(&self, global_app_state_dir: &GlobalAppStateDir) -> Result<RunConfig> {
703 let RunConfigOpts {
704 queues,
705 working_directory_pool,
706 target_pre_exec_bash_code,
707 targets,
708 job_template_lists,
709 benchmarking_job_settings,
710 eval_settings,
711 remote_repository,
712 output_dir,
713 run_jobs_daemon,
714 polling_daemon,
715 versioned_datasets_base_dir,
716 commit_tags_regex,
717 } = self;
718
719 let targets: BTreeMap<ProperDirname, Arc<BenchmarkingTarget>> = {
720 let mut seen = BTreeSet::new();
721 targets
722 .iter()
723 .map(|benchmarking_target| {
724 let name = &benchmarking_target.benchmarking_command.target_name;
725 if seen.contains(&name) {
726 bail!("duplicate `target_name` value {:?}", name.as_str())
727 }
728 seen.insert(name);
729 Ok((name.clone(), benchmarking_target.clone_arc()))
730 })
731 .collect::<Result<_>>()?
732 };
733
734 let job_template_lists: BTreeMap<KString, Arc<[JobTemplate]>> = job_template_lists
735 .iter()
736 .map(
737 |(template_list_name, template_list)| -> Result<(KString, Arc<[JobTemplate]>)> {
738 Ok((
739 template_list_name.clone(),
740 template_list
741 .iter()
742 .map(|job_template_opts| job_template_opts.check(&targets))
743 .collect::<Result<_>>()?,
744 ))
745 },
746 )
747 .collect::<Result<_>>()?;
748
749 let remote_repository = remote_repository.check(&job_template_lists, &targets)?;
750
751 let commit_tags_regex: SerializableRegex =
752 if let Some(commit_tags_regex) = commit_tags_regex {
753 (*commit_tags_regex).clone()
754 } else {
755 SerializableRegex::from_str(".*")?
756 };
757
758 Ok(RunConfig {
759 queues: queues.clone_arc(),
760 working_directory_pool: working_directory_pool.clone_arc(),
761 target_pre_exec_bash_code: target_pre_exec_bash_code.clone(),
762 job_template_lists,
763 benchmarking_job_settings: benchmarking_job_settings.clone_arc(),
764 eval_settings: eval_settings.clone_arc(),
765 remote_repository,
766 output_dir: output_dir.resolve()?,
767 targets,
768 versioned_datasets_base_dir: versioned_datasets_base_dir
769 .as_ref()
770 .map(|d| d.resolve())
771 .transpose()?
772 .map(Arc::<Path>::from),
773 commit_tags_regex,
774 run_jobs_daemon: run_jobs_daemon.check(global_app_state_dir, "run_jobs_daemon")?,
775 polling_daemon: polling_daemon.check(global_app_state_dir, "polling_daemon")?,
776 })
777 }
778}
779
/// The parts of the loaded configuration that are handed around together.
/// Both fields are `Arc`s, so cloning only bumps reference counts.
#[derive(Clone)]
pub struct ShareableConfig {
    /// The checked run configuration.
    pub run_config: Arc<RunConfig>,
    /// The global app state directory that `run_config` was checked
    /// against.
    pub global_app_state_dir: Arc<GlobalAppStateDir>,
}
786
/// A loaded configuration file together with the checked configuration
/// derived from it. Created by [`RunConfigBundle::load`].
pub struct RunConfigBundle {
    /// The configuration file as loaded (unchecked `RunConfigOpts`).
    pub config_file: Arc<ConfigFile<RunConfigOpts>>,
    /// The checked configuration plus the global app state dir.
    pub shareable: ShareableConfig,
}
795
796impl RunConfigBundle {
797 pub fn load(
798 provided_path: Option<Arc<Path>>,
799 or_else: impl FnOnce(&str) -> Result<RunConfigOpts>,
800 global_app_state_dir: GlobalAppStateDir,
801 ) -> Result<Self> {
802 let config_file = Arc::new(ConfigFile::<RunConfigOpts>::load_config(
803 provided_path,
804 or_else,
805 )?);
806 let run_config = config_file.check(&global_app_state_dir)?.into();
807 Ok(Self {
808 config_file,
809 shareable: ShareableConfig {
810 run_config,
811 global_app_state_dir: global_app_state_dir.into(),
812 },
813 })
814 }
815}