evobench_tools/run/
run_job.rs

1//! Running a benchmarking job
2
3use std::{
4    fs::{create_dir_all, remove_file},
5    io::{Write, stderr},
6    ops::Deref,
7    os::unix::fs::symlink,
8    path::{Path, PathBuf},
9    sync::{Arc, Mutex},
10};
11
12use anyhow::{Result, bail};
13use chrono::{DateTime, Local};
14use cj_path_util::path_util::{AppendToPath, rename_tmp_path};
15use itertools::Itertools;
16use nix::unistd::getpid;
17use regex::Regex;
18
19use crate::{
20    config_file::ron_to_string_pretty,
21    ctx,
22    git::GitHash,
23    git_tags::GitTags,
24    html_files::write_redirect_html_file,
25    info,
26    io_utils::output_capture_log::{CaptureOptions, OutputCaptureLog},
27    run::{
28        bench_tmp_dir::bench_tmp_dir,
29        benchmarking_job::BenchmarkingJob,
30        config::{RunConfig, ShareableConfig},
31        dataset_dir_env_var::dataset_dir_for,
32        env_vars::assert_evobench_env_var,
33        key::{BenchmarkingJobParameters, RunParameters},
34        output_directory::{
35            post_process::compress_file_as,
36            structure::{KeyDir, ReplaceBasePath, RunDir, ToPath},
37        },
38        run_queues::RunQueuesData,
39        versioned_dataset_dir::VersionedDatasetDir,
40        working_directory::{FetchTags, FetchedTags, WorkingDirectory},
41    },
42    serde_types::{date_and_time::DateTimeWithOffset, proper_dirname::ProperDirname},
43    utillib::{
44        arc::CloneArc,
45        cleanup_daemon::{CleanupHandler, Deletion},
46        escaped_display::AsEscapedString,
47        into_arc_path::IntoArcPath,
48        logging::{LogLevel, log_level},
49    },
50};
51
52use super::{
53    config::{BenchmarkingCommand, ScheduleCondition},
54    working_directory_pool::{WorkingDirectoryId, WorkingDirectoryPool},
55};
56
57/// Get the string for the `COMMIT_TAGS` env var, e.g. "" or
58/// "foo,v1.2.3". Wants to be assured that `git fetch --tags` was run
59/// (see methods that return a `FetchedTags`).
60pub fn get_commit_tags(
61    working_dir: &WorkingDirectory,
62    commit_id: &GitHash,
63    re: &Regex,
64    fetched_tags: FetchedTags,
65) -> Result<String> {
66    if fetched_tags != FetchedTags::Yes {
67        bail!("need up to date tags, but got {fetched_tags:?}")
68    }
69
70    let git_working_dir = &working_dir.git_working_dir;
71
72    let git_tags = GitTags::from_dir(git_working_dir)?;
73    // (Huh, `let s = ` being required here makes
74    // no sense to me. rustc 1.90.0)
75    let s = git_tags
76        .get_by_commit(&commit_id)
77        .filter(|s| re.is_match(s))
78        .join(",");
79    Ok(s)
80}
81
/// The context for running a job (information that should not be part
/// of `Key`). Only used for one run, as some fields
/// (e.g. `timestamp`) change. Independent of jobs: this context is
/// bundled before selecting the job to run.
pub struct JobRunner<'pool> {
    /// Pool from which the working directory for the run is taken
    /// (see `process_in_working_directory` in `run_job`).
    pub working_directory_pool: &'pool mut WorkingDirectoryPool,
    /// Base directory under which all run output (key dirs, "latest",
    /// "latest-redir") is placed.
    pub output_base_dir: &'pool Arc<Path>,
    /// The timestamp for this run.
    pub timestamp: DateTimeWithOffset,
    // Separate lifetime?
    /// Shared configuration; `run_config()` reads the run config from it.
    pub shareable_config: &'pool ShareableConfig,
    // ditto?
    /// Versioned dataset directories, consulted via `dataset_dir_for`.
    pub versioned_dataset_dir: &'pool VersionedDatasetDir,
    /// Handler with which temporary files are registered for deletion.
    pub file_cleanup_handler: &'pool CleanupHandler,
}
97
98impl<'pool> JobRunner<'pool> {
99    pub fn timestamp_local(&self) -> DateTime<Local> {
100        // (A little bit costly.)
101        self.timestamp.to_datetime().into()
102    }
103
104    pub fn run_config(&self) -> &'pool RunConfig {
105        &self.shareable_config.run_config
106    }
107}
108
/// The job-specific data that accompanies a `JobRunner` for one run.
pub struct JobRunnerJobData<'run_queues, 'j, 's> {
    /// The job being run. Note: its state still holds the counts from
    /// before this run (see `have_more_job_runs_for_same_commit`).
    pub job: &'j BenchmarkingJob,
    /// Queue data, used to look up jobs for the same commit id.
    pub run_queues_data: &'s RunQueuesData<'run_queues>,
}
113
/// A `JobRunner` (job-independent run context) paired with the data
/// for the particular job to run.
pub struct JobRunnerWithJob<'pool, 'run_queues, 'j, 's> {
    /// The job-independent context for this run.
    pub job_runner: JobRunner<'pool>,
    /// The job to run, plus queue information.
    pub job_data: JobRunnerJobData<'run_queues, 'j, 's>,
}
118
119impl<'run_queues, 'j, 's> JobRunnerJobData<'run_queues, 'j, 's> {
120    /// Whether more job runs need to be done for the same commit, be
121    /// it for the same job, or others.
122    pub fn have_more_job_runs_for_same_commit(&self) -> bool {
123        // Check if this the last run for the current job. `job` still
124        // contains the count from before running it this time.
125        if self.job.state.remaining_count > 1 {
126            return true;
127        }
128
129        // Otherwise, look for *other* jobs than the current one. Not
130        // so easy since jobs still don't contain an id? Except,
131        // simply check if there is more than 1 entry.
132        self.run_queues_data
133            .jobs_by_commit_id(&self.job.public.run_parameters.commit_id)
134            .len()
135            > 1
136    }
137}
138
impl<'pool, 'run_queues, 'j, 's> JobRunnerWithJob<'pool, 'run_queues, 'j, 's> {
    /// Run the job once in the working directory identified by
    /// `working_directory_id`: check out the job's commit, execute the
    /// benchmarking command with the evobench environment variables
    /// set, then move the captured logs into the output directory tree
    /// (key dir / run dir, plus "latest" and "latest-redir" entries)
    /// and regenerate the per-key summaries.
    ///
    /// `reason` is saved into the run dir as `reason.ron`;
    /// `schedule_condition` as `schedule_condition.ron`.
    ///
    /// Errors from the benchmarking command itself (non-success exit
    /// status) are reported via the returned `Err`.
    pub fn run_job(
        &mut self,
        working_directory_id: WorkingDirectoryId,
        reason: &Option<String>,
        schedule_condition: &ScheduleCondition,
    ) -> Result<()> {
        // XX put that here, "for backwards compat", but could now use
        // something else for logging?
        let benchmarking_job_parameters = self.job_data.job.benchmarking_job_parameters();

        let BenchmarkingJobParameters {
            run_parameters,
            command,
        } = &benchmarking_job_parameters;
        let RunParameters {
            commit_id,
            custom_parameters,
        } = run_parameters.deref();

        let bench_tmp_dir = &bench_tmp_dir()?;
        // NOTE(review): the repeated "bench_tmp_dir path, exists?"
        // logging below looks like temporary debugging output --
        // consider removing once the underlying issue is resolved.
        info!(
            "bench_tmp_dir path, exists?: {:?}",
            (
                (&**bench_tmp_dir).as_escaped_string(),
                bench_tmp_dir.exists()
            )
        );

        let file_cleanup_handler = self.job_runner.file_cleanup_handler;

        // File for evobench library output
        let evobench_log;
        // File for other output, for optional use by target application
        let bench_output_log;
        {
            // Temp file names include our pid so that independent
            // daemon processes do not collide.
            let pid = getpid();
            evobench_log = file_cleanup_handler.register_temporary_file(Deletion::file(
                bench_tmp_dir.append(format!("evobench-{pid}.log")),
            )?)?;

            bench_output_log = file_cleanup_handler.register_temporary_file(Deletion::file(
                bench_tmp_dir.append(format!("bench-output-{pid}.log")),
            )?)?;
        }

        // Remove any stale files from previous runs (we're not
        // removing all possible files (we leave files from other
        // processes alone (in case running multiple independent
        // daemons might be useful)), just those that would get in the
        // way).
        let _ = remove_file(&evobench_log);
        let _ = remove_file(&bench_output_log);

        let conf = self.job_runner.run_config();

        // Phase 1: run the benchmarking command inside the pool's
        // working directory; the closure returns the target name and
        // the path of the captured command output.
        let (log_extraction, cleanup) = self
            .job_runner
            .working_directory_pool
            .process_in_working_directory(
                working_directory_id,
                &self.job_runner.timestamp,
                |mut working_directory| -> Result<(&ProperDirname, PathBuf)> {
                    // Have `checkout` always run git fetch to update
                    // the remote tags, to get them even if there have
                    // been past runs where they were not present yet;
                    // this is so that when the user changes the
                    // dataset directory and then makes a matching
                    // tag, we must have that matching tag. Also, for
                    // the release hack feature, we need to learn when
                    // the tag was added later. Thus always try to
                    // update from the git repository (failures
                    // leading to the working directory ending in
                    // error state and on repetition the job being
                    // aborted).
                    let fetched_tags = working_directory
                        .get()
                        .expect("not removed")
                        .checkout(commit_id.clone(), FetchTags::Always)?;

                    // Drop the lock on the pool
                    let working_directory = working_directory.into_inner().expect("not removed");

                    let dataset_dir = dataset_dir_for(
                        conf.versioned_datasets_base_dir.as_deref(),
                        &custom_parameters,
                        self.job_runner.versioned_dataset_dir,
                        &working_directory.git_working_dir,
                        commit_id,
                        fetched_tags.clone(),
                    )?;

                    let commit_tags = get_commit_tags(
                        &working_directory,
                        &commit_id,
                        &conf.commit_tags_regex,
                        fetched_tags,
                    )?;

                    let BenchmarkingCommand {
                        target_name,
                        subdir,
                        command,
                        arguments,
                        pre_exec_bash_code,
                    } = command.deref();

                    let mut command = pre_exec_bash_code
                        .to_run_with_pre_exec(conf)
                        .command(command, arguments);

                    let dir = working_directory
                        .git_working_dir
                        .working_dir_path_ref()
                        .append(subdir);
                    let dir_str = dir.as_escaped_string();

                    // for debugging info only:
                    let cmd_in_dir = format!("command {command:?} in directory {dir_str}");

                    info!(
                        "running {cmd_in_dir}, EVOBENCH_LOG={}...",
                        evobench_log.as_escaped_string()
                    );

                    // `check` asserts at runtime that the given name
                    // is a known evobench env var (typo guard).
                    let check = assert_evobench_env_var;

                    command
                        .envs(custom_parameters.btree_map())
                        .env(check("EVOBENCH_LOG"), &**evobench_log)
                        .env(check("BENCH_OUTPUT_LOG"), &**bench_output_log)
                        .env(check("COMMIT_ID"), commit_id.to_string())
                        .env(check("COMMIT_TAGS"), commit_tags)
                        .current_dir(&dir);
                    if let Some(dataset_dir) = &dataset_dir {
                        command.env(check("DATASET_DIR"), dataset_dir);
                    }

                    let command_output_file = OutputCaptureLog::create(
                        &working_directory
                            .working_directory_path()
                            .standard_log_path(&self.job_runner.timestamp)?,
                    )?;

                    // Add info header in YAML -- XX abstraction, and
                    // move to / merge with `command_log_file.rs`?
                    command_output_file
                        .write_str(&serde_yml::to_string(&benchmarking_job_parameters)?)?;
                    command_output_file.write_str("\n")?;

                    info!(
                        "bench_tmp_dir path, exists?: {:?}",
                        (
                            (&**bench_tmp_dir).as_escaped_string(),
                            bench_tmp_dir.exists()
                        )
                    );

                    let status = {
                        // Mirror the captured output to stderr when
                        // logging verbosely enough.
                        let mut other_files: Vec<Box<dyn Write + Send + 'static>> = vec![];
                        // Is it evil to use log_level() for this and not a
                        // function argument?
                        if log_level() >= LogLevel::Info {
                            other_files.push(Box::new(stderr()));
                        }
                        let other_files = Arc::new(Mutex::new(other_files));

                        command_output_file.run_with_capture(
                            command,
                            other_files,
                            CaptureOptions {
                                add_source_indicator: true,
                                add_timestamp: true,
                            },
                        )?
                    };

                    info!(
                        "bench_tmp_dir path, exists?: {:?}",
                        (
                            (&**bench_tmp_dir).as_escaped_string(),
                            bench_tmp_dir.exists()
                        )
                    );

                    if status.success() {
                        info!("running {cmd_in_dir} succeeded");

                        Ok((target_name, command_output_file.into_path()))
                    } else {
                        info!("running {cmd_in_dir} failed.");

                        info!(
                            "bench_tmp_dir path, exists?: {:?}",
                            (
                                (&**bench_tmp_dir).as_escaped_string(),
                                bench_tmp_dir.exists()
                            )
                        );

                        // Show the tail of the captured output (both
                        // on stderr when quiet, and in the error).
                        let last_part = command_output_file.last_part(3000)?;
                        if log_level() < LogLevel::Info {
                            let mut err = stderr().lock();
                            writeln!(err, "---- run_job: error in dir {dir_str}: -------")?;
                            err.write_all(last_part.as_bytes())?;
                            writeln!(err, "---- /run_job: error in dir {dir_str} -------")?;
                        }

                        bail!(
                            "benchmarking command {cmd_in_dir} gave \
                             error status {status}, last_part {}",
                            last_part.as_escaped_string()
                        )
                    }
                },
                Some(&benchmarking_job_parameters),
                "run_job",
                Some(&|| self.job_data.have_more_job_runs_for_same_commit()),
            )?;

        // Can clean up right away, we're not currently accessing the
        // working directory any longer, OK?
        self.job_runner
            .working_directory_pool
            .working_directory_cleanup(cleanup)?;

        // Phase 2: move the results to the right location below the
        // `output_base_dir`, and create/update the extracts
        {
            // The directory holding the results of all of the runs for
            // the same "key" information
            let key_dir = KeyDir::from_base_target_params(
                self.job_runner.output_base_dir.clone_arc(),
                command.target_name.clone(),
                run_parameters,
            );

            // Maintain a "latest" subdirectory at the top of the output
            // dir tree: symlink the new entry into it, with timestamp as
            // name. Create the symlink first so that it is visible even
            // in case part of the actions creating the run output
            // directory fails. -- Also see "latest-redir" below.
            {
                let latest_dir = self.job_runner.output_base_dir.join("latest");
                create_dir_all(&latest_dir).map_err(ctx!("create_dir_all {latest_dir:?}"))?;

                // To get a relative path, simply use ".." as the base
                // dir.
                let key_dir_relative_from_latest_dir =
                    key_dir.replace_base_path("..".into_arc_path());

                let symlink_location = latest_dir.join(self.job_runner.timestamp.as_str());
                symlink(
                    key_dir_relative_from_latest_dir.to_path(),
                    &symlink_location,
                )
                .map_err(ctx!("creating symlink at {symlink_location:?}"))?;
            }

            // Inside the key_dir, make a dir for this particular run
            let run_dir: RunDir = key_dir
                .clone_arc()
                .append_subdir(self.job_runner.timestamp.clone());
            {
                let path = run_dir.to_path();
                create_dir_all(path).map_err(ctx!("create_dir_all {path:?}"))?;
            }

            // Maintain a "latest-redir" subdirectory at the top of
            // the output dir tree: make a new subdirectory with an
            // index.html containing a redirect to the actual
            // directory. This keeps the URL canonical, letting one
            // know what the context is, and allows to go to the
            // parent directory--hence redirect into the run dir, not
            // key dir.  -- Also see "latest" above.
            if let Some(output_dir_url) = &self.job_runner.run_config().output_dir.url {
                let latest_dir = self.job_runner.output_base_dir.join("latest-redir");
                let subdir: PathBuf = latest_dir.join(self.job_runner.timestamp.as_str());
                {
                    let path = &subdir;
                    create_dir_all(path).map_err(ctx!("create_dir_all {path:?}"))?;
                }

                // To get a relative path, simply use `output_dir_url` as the base
                // dir.
                let redirect_target = run_dir.replace_base_path(output_dir_url.into_arc_path());
                let url = redirect_target.to_path().to_string_lossy();
                write_redirect_html_file(&subdir.append("index.html"), &url)?;
            }

            // Move the result files to the run_dir and generate the
            // files with the extracts.
            {
                info!("moving files to {}", run_dir.to_path().as_escaped_string());

                // First try to compress the log file, here we check whether
                // it exists; before we expect to compress evobench.log
                // without checking its existence.
                if bench_output_log.exists() {
                    compress_file_as(&bench_output_log, run_dir.bench_output_log_path(), false)?;
                    drop(bench_output_log);
                }

                let evobench_log_tmp =
                    compress_file_as(&evobench_log, run_dir.evobench_log_path(), true)?;

                let (target_name, standard_log_tempfile) = log_extraction;
                compress_file_as(&standard_log_tempfile, run_dir.standard_log_path(), false)?;
                // It's OK to delete the original now, but we'll make use of
                // it for reading back.
                let standard_log_tempfile = file_cleanup_handler
                    .register_temporary_file(Deletion::file(standard_log_tempfile)?)?;

                // Save the scheduling context alongside the results.
                {
                    let target = run_dir.append_str("schedule_condition.ron")?;
                    let target_str = target.as_escaped_string();
                    info!("saving context to {target_str}");
                    let schedule_condition_str = ron_to_string_pretty(&schedule_condition)?;
                    std::fs::write(&target, &schedule_condition_str)
                        .map_err(ctx!("saving to {target_str}"))?
                }

                {
                    let target = run_dir.append_str("reason.ron")?;
                    let target_str = target.as_escaped_string();
                    info!("saving context to {target_str}");
                    let s = ron_to_string_pretty(&reason)?;
                    std::fs::write(&target, &s).map_err(ctx!("saving to {target_str}"))?
                }

                // Post-process; on success of the evaluation, the
                // compressed benchmark file is moved into place.
                let evobench_log_path = evobench_log.to_owned();
                run_dir.post_process_single(
                    Some(&evobench_log_path),
                    move || {
                        info!("evaluating the benchmark file succeeded");

                        drop(evobench_log);

                        rename_tmp_path(evobench_log_tmp)?;

                        info!("compressed benchmark file renamed");
                        Ok(())
                    },
                    // log extraction:
                    target_name,
                    &standard_log_tempfile,
                    &self.job_runner.run_config(),
                    // Do not omit generation of evobench.log stats
                    false,
                )?;
            }

            // Now that the results for the run are in the right place, we
            // can update the summaries for the key
            key_dir.generate_summaries_for_key_dir(
                // Do not omit generation of evobench.log stats
                false,
            )?;
        }

        Ok(())
    }
}
501}