// evobench_tools/run/insert_jobs.rs

1use std::{io::stdout, time::SystemTime};
2
3use anyhow::{Result, bail};
4use itertools::Itertools;
5
6use crate::{
7    config_file::ron_to_string_pretty,
8    debug,
9    key_val_fs::key_val::{KeyVal, KeyValSync},
10    run::{
11        config::ShareableConfig,
12        key::{BenchmarkingJobParameters, BenchmarkingJobParametersHash},
13        sub_command::open_polling_pool,
14    },
15    serde_types::date_and_time::system_time_to_rfc3339,
16};
17
18use super::{
19    benchmarking_job::BenchmarkingJob, global_app_state_dir::GlobalAppStateDir,
20    run_queues::RunQueues,
21};
22
23/// Open the table of the already-inserted jobs
24pub fn open_already_inserted(
25    global_app_state_dir: &GlobalAppStateDir,
26) -> Result<KeyVal<BenchmarkingJobParametersHash, (BenchmarkingJobParameters, Vec<SystemTime>)>> {
27    Ok(KeyVal::open(
28        global_app_state_dir.already_inserted_base()?,
29        crate::key_val_fs::key_val::KeyValConfig {
30            sync: KeyValSync::All,
31            // already created anyway
32            create_dir_if_not_exists: false,
33        },
34        // Do not currently send signals for that, nothing is
35        // currently listening.
36        None,
37    )?)
38}
39
40#[derive(Debug, Clone, clap::Args)]
41pub struct ForceOpt {
42    /// Normally, the same job parameters can only be inserted
43    /// once, subsequent attempts yield an error. This overrides
44    /// the check and allows insertion anyway.
45    #[clap(long)]
46    pub force: bool,
47}
48
49#[derive(Debug, Clone, clap::Args)]
50pub struct QuietOpt {
51    /// Skip attempts at insertion quietly if the given job parameters
52    /// were already inserted before (by default, give an error)
53    #[clap(long)]
54    pub quiet: bool,
55}
56
57#[derive(Debug, Clone, clap::Args)]
58pub struct DryRunOpt {
59    /// Show the details of the jobs that would be inserted
60    /// instead of inserting them.
61    #[clap(long)]
62    dry_run: bool,
63}
64
65/// Unless `dry_run` is true (in which a report is printed to stdout),
66/// inserts the jobs into the first queue and the `already_inserted`
67/// table. Returns the number of jobs actually inserted. If a job has
68/// already been inserted in the past and `force_opt` and `quiet_opt`
69/// are both false, no job is inserted and an error listing all the
70/// already-inserted jobs is returned instead. Also, if the commit id
71/// of any job is not present in upstream, returns an error without
72/// inserting any jobs.
73pub fn insert_jobs(
74    benchmarking_jobs: Vec<BenchmarkingJob>,
75    config: &ShareableConfig,
76    dry_run_opt: DryRunOpt,
77    force_opt: ForceOpt,
78    quiet_opt: QuietOpt,
79    queues: &RunQueues,
80) -> Result<usize> {
81    let DryRunOpt { dry_run } = dry_run_opt;
82    let ForceOpt { force } = force_opt;
83    let QuietOpt { quiet } = quiet_opt;
84
85    let already_inserted = open_already_inserted(&config.global_app_state_dir)?;
86    let _lock = already_inserted.lock_exclusive()?;
87
88    let mut polling_pool = open_polling_pool(config)?;
89
90    let mut jobs_to_insert: Vec<(
91        BenchmarkingJob,
92        BenchmarkingJobParametersHash,
93        Vec<SystemTime>,
94    )> = Vec::new();
95    // Only if !quiet
96    let mut jobs_already_inserted: Vec<String> = Vec::new();
97
98    for benchmarking_job in benchmarking_jobs {
99        let run_parameters_hash =
100            BenchmarkingJobParametersHash::from(&benchmarking_job.benchmarking_job_parameters());
101
102        // All insertion times, for adding the new ones below
103        let insertion_times;
104
105        // Check if already inserted
106        if let Some(mut entry) = already_inserted.entry_opt(&run_parameters_hash)? {
107            let params;
108            (params, insertion_times) = entry.get()?;
109            if force {
110                debug!("already inserted, but --force was given");
111                // fall through and try to do insertion anyway, below
112            } else {
113                if !quiet {
114                    let insertion_times = insertion_times
115                        .iter()
116                        .cloned()
117                        .map(|t| system_time_to_rfc3339(t, None))
118                        .join(", ");
119                    jobs_already_inserted.push(format!(
120                        "These parameters have already been inserted at {insertion_times}:\n{}",
121                        ron_to_string_pretty(&params).expect("no err")
122                    ));
123                }
124                debug!("already inserted, skip insertion");
125                continue;
126            }
127        } else {
128            insertion_times = Vec::new()
129        }
130
131        {
132            let commit = &benchmarking_job.public.run_parameters.commit_id;
133            if !polling_pool.commit_is_valid(commit)? {
134                bail!(
135                    "commit {commit} does not exist in the repository at {:?}",
136                    config.run_config.remote_repository.url.as_str()
137                )
138            }
139        }
140        // ^ XX so, always checks upstream repo for the commit! hm OK sure?
141
142        jobs_to_insert.push((benchmarking_job, run_parameters_hash, insertion_times));
143    }
144
145    if !jobs_already_inserted.is_empty() {
146        bail!(
147            "there are jobs that were already inserted:\n\n{}",
148            jobs_already_inserted.join("\n\n")
149        )
150    }
151
152    if dry_run {
153        use std::io::Write;
154        let mut out = stdout().lock();
155        for (i, (benchmarking_job, _run_parameters_hash, _insertion_times)) in
156            jobs_to_insert.into_iter().enumerate()
157        {
158            writeln!(
159                &mut out,
160                "would insert job {}:\n{}",
161                i + 1,
162                ron_to_string_pretty(&benchmarking_job).expect("no err")
163            )?;
164        }
165        out.flush()?;
166        Ok(0)
167    } else {
168        // Insert the jobs
169        let mut num_inserted = 0;
170        for (benchmarking_job, run_parameters_hash, mut insertion_times) in jobs_to_insert {
171            queues.first().push_front(&benchmarking_job)?;
172            num_inserted += 1;
173
174            // Update the `already_inserted` table (have to re-request
175            // entry as it contains a file handle (and lock?, but even
176            // without a lock it's bad)) -- XXX do we get a lock on the whole table, though?
177            if let Some(entry) = already_inserted.entry_opt(&run_parameters_hash)? {
178                entry.delete()?;
179            }
180            insertion_times.push(SystemTime::now());
181            already_inserted.insert(
182                &run_parameters_hash,
183                &(
184                    benchmarking_job.benchmarking_job_parameters(),
185                    insertion_times,
186                ),
187                true,
188            )?;
189        }
190
191        Ok(num_inserted)
192    }
193}