evobench_tools/run/
insert_jobs.rs1use std::{io::stdout, time::SystemTime};
2
3use anyhow::{Result, bail};
4use itertools::Itertools;
5
6use crate::{
7 config_file::ron_to_string_pretty,
8 debug,
9 key_val_fs::key_val::{KeyVal, KeyValSync},
10 run::{
11 config::ShareableConfig,
12 key::{BenchmarkingJobParameters, BenchmarkingJobParametersHash},
13 sub_command::open_polling_pool,
14 },
15 serde_types::date_and_time::system_time_to_rfc3339,
16};
17
18use super::{
19 benchmarking_job::BenchmarkingJob, global_app_state_dir::GlobalAppStateDir,
20 run_queues::RunQueues,
21};
22
23pub fn open_already_inserted(
25 global_app_state_dir: &GlobalAppStateDir,
26) -> Result<KeyVal<BenchmarkingJobParametersHash, (BenchmarkingJobParameters, Vec<SystemTime>)>> {
27 Ok(KeyVal::open(
28 global_app_state_dir.already_inserted_base()?,
29 crate::key_val_fs::key_val::KeyValConfig {
30 sync: KeyValSync::All,
31 create_dir_if_not_exists: false,
33 },
34 None,
37 )?)
38}
39
40#[derive(Debug, Clone, clap::Args)]
41pub struct ForceOpt {
42 #[clap(long)]
46 pub force: bool,
47}
48
49#[derive(Debug, Clone, clap::Args)]
50pub struct QuietOpt {
51 #[clap(long)]
54 pub quiet: bool,
55}
56
57#[derive(Debug, Clone, clap::Args)]
58pub struct DryRunOpt {
59 #[clap(long)]
62 dry_run: bool,
63}
64
65pub fn insert_jobs(
74 benchmarking_jobs: Vec<BenchmarkingJob>,
75 config: &ShareableConfig,
76 dry_run_opt: DryRunOpt,
77 force_opt: ForceOpt,
78 quiet_opt: QuietOpt,
79 queues: &RunQueues,
80) -> Result<usize> {
81 let DryRunOpt { dry_run } = dry_run_opt;
82 let ForceOpt { force } = force_opt;
83 let QuietOpt { quiet } = quiet_opt;
84
85 let already_inserted = open_already_inserted(&config.global_app_state_dir)?;
86 let _lock = already_inserted.lock_exclusive()?;
87
88 let mut polling_pool = open_polling_pool(config)?;
89
90 let mut jobs_to_insert: Vec<(
91 BenchmarkingJob,
92 BenchmarkingJobParametersHash,
93 Vec<SystemTime>,
94 )> = Vec::new();
95 let mut jobs_already_inserted: Vec<String> = Vec::new();
97
98 for benchmarking_job in benchmarking_jobs {
99 let run_parameters_hash =
100 BenchmarkingJobParametersHash::from(&benchmarking_job.benchmarking_job_parameters());
101
102 let insertion_times;
104
105 if let Some(mut entry) = already_inserted.entry_opt(&run_parameters_hash)? {
107 let params;
108 (params, insertion_times) = entry.get()?;
109 if force {
110 debug!("already inserted, but --force was given");
111 } else {
113 if !quiet {
114 let insertion_times = insertion_times
115 .iter()
116 .cloned()
117 .map(|t| system_time_to_rfc3339(t, None))
118 .join(", ");
119 jobs_already_inserted.push(format!(
120 "These parameters have already been inserted at {insertion_times}:\n{}",
121 ron_to_string_pretty(¶ms).expect("no err")
122 ));
123 }
124 debug!("already inserted, skip insertion");
125 continue;
126 }
127 } else {
128 insertion_times = Vec::new()
129 }
130
131 {
132 let commit = &benchmarking_job.public.run_parameters.commit_id;
133 if !polling_pool.commit_is_valid(commit)? {
134 bail!(
135 "commit {commit} does not exist in the repository at {:?}",
136 config.run_config.remote_repository.url.as_str()
137 )
138 }
139 }
140 jobs_to_insert.push((benchmarking_job, run_parameters_hash, insertion_times));
143 }
144
145 if !jobs_already_inserted.is_empty() {
146 bail!(
147 "there are jobs that were already inserted:\n\n{}",
148 jobs_already_inserted.join("\n\n")
149 )
150 }
151
152 if dry_run {
153 use std::io::Write;
154 let mut out = stdout().lock();
155 for (i, (benchmarking_job, _run_parameters_hash, _insertion_times)) in
156 jobs_to_insert.into_iter().enumerate()
157 {
158 writeln!(
159 &mut out,
160 "would insert job {}:\n{}",
161 i + 1,
162 ron_to_string_pretty(&benchmarking_job).expect("no err")
163 )?;
164 }
165 out.flush()?;
166 Ok(0)
167 } else {
168 let mut num_inserted = 0;
170 for (benchmarking_job, run_parameters_hash, mut insertion_times) in jobs_to_insert {
171 queues.first().push_front(&benchmarking_job)?;
172 num_inserted += 1;
173
174 if let Some(entry) = already_inserted.entry_opt(&run_parameters_hash)? {
178 entry.delete()?;
179 }
180 insertion_times.push(SystemTime::now());
181 already_inserted.insert(
182 &run_parameters_hash,
183 &(
184 benchmarking_job.benchmarking_job_parameters(),
185 insertion_times,
186 ),
187 true,
188 )?;
189 }
190
191 Ok(num_inserted)
192 }
193}