1use std::{
4 fs::{create_dir_all, remove_file},
5 io::{Write, stderr},
6 ops::Deref,
7 os::unix::fs::symlink,
8 path::{Path, PathBuf},
9 sync::{Arc, Mutex},
10};
11
12use anyhow::{Result, bail};
13use chrono::{DateTime, Local};
14use cj_path_util::path_util::{AppendToPath, rename_tmp_path};
15use itertools::Itertools;
16use nix::unistd::getpid;
17use regex::Regex;
18
19use crate::{
20 config_file::ron_to_string_pretty,
21 ctx,
22 git::GitHash,
23 git_tags::GitTags,
24 html_files::write_redirect_html_file,
25 info,
26 io_utils::output_capture_log::{CaptureOptions, OutputCaptureLog},
27 run::{
28 bench_tmp_dir::bench_tmp_dir,
29 benchmarking_job::BenchmarkingJob,
30 config::{RunConfig, ShareableConfig},
31 dataset_dir_env_var::dataset_dir_for,
32 env_vars::assert_evobench_env_var,
33 key::{BenchmarkingJobParameters, RunParameters},
34 output_directory::{
35 post_process::compress_file_as,
36 structure::{KeyDir, ReplaceBasePath, RunDir, ToPath},
37 },
38 run_queues::RunQueuesData,
39 versioned_dataset_dir::VersionedDatasetDir,
40 working_directory::{FetchTags, FetchedTags, WorkingDirectory},
41 },
42 serde_types::{date_and_time::DateTimeWithOffset, proper_dirname::ProperDirname},
43 utillib::{
44 arc::CloneArc,
45 cleanup_daemon::{CleanupHandler, Deletion},
46 escaped_display::AsEscapedString,
47 into_arc_path::IntoArcPath,
48 logging::{LogLevel, log_level},
49 },
50};
51
52use super::{
53 config::{BenchmarkingCommand, ScheduleCondition},
54 working_directory_pool::{WorkingDirectoryId, WorkingDirectoryPool},
55};
56
57pub fn get_commit_tags(
61 working_dir: &WorkingDirectory,
62 commit_id: &GitHash,
63 re: &Regex,
64 fetched_tags: FetchedTags,
65) -> Result<String> {
66 if fetched_tags != FetchedTags::Yes {
67 bail!("need up to date tags, but got {fetched_tags:?}")
68 }
69
70 let git_working_dir = &working_dir.git_working_dir;
71
72 let git_tags = GitTags::from_dir(git_working_dir)?;
73 let s = git_tags
76 .get_by_commit(&commit_id)
77 .filter(|s| re.is_match(s))
78 .join(",");
79 Ok(s)
80}
81
/// Job-independent state shared by all benchmark runs in one scheduling
/// round: the working-directory pool, output location, run timestamp,
/// configuration, dataset handling and the file-cleanup daemon handle.
pub struct JobRunner<'pool> {
    /// Pool from which a checkout directory is borrowed for each run.
    pub working_directory_pool: &'pool mut WorkingDirectoryPool,
    /// Base directory under which per-key/per-run output dirs are created.
    pub output_base_dir: &'pool Arc<Path>,
    /// Timestamp identifying this run (used for dir names and symlinks).
    pub timestamp: DateTimeWithOffset,
    /// Full configuration; `run_config()` gives the run-specific part.
    pub shareable_config: &'pool ShareableConfig,
    /// Resolver for versioned benchmark datasets (see `dataset_dir_for`).
    pub versioned_dataset_dir: &'pool VersionedDatasetDir,
    /// Registers temporary files so they are deleted even on abnormal exit.
    pub file_cleanup_handler: &'pool CleanupHandler,
}
97
98impl<'pool> JobRunner<'pool> {
99 pub fn timestamp_local(&self) -> DateTime<Local> {
100 self.timestamp.to_datetime().into()
102 }
103
104 pub fn run_config(&self) -> &'pool RunConfig {
105 &self.shareable_config.run_config
106 }
107}
108
/// The job-specific inputs for a single run: the job itself plus the
/// queue data needed to ask questions about sibling jobs.
pub struct JobRunnerJobData<'run_queues, 'j, 's> {
    /// The benchmarking job being executed.
    pub job: &'j BenchmarkingJob,
    /// Queue snapshot, e.g. for finding other jobs on the same commit.
    pub run_queues_data: &'s RunQueuesData<'run_queues>,
}
113
/// Pairs the shared runner state with one concrete job; `run_job` is
/// implemented on this combination.
pub struct JobRunnerWithJob<'pool, 'run_queues, 'j, 's> {
    /// Shared, job-independent runner state.
    pub job_runner: JobRunner<'pool>,
    /// The job to run plus its queue context.
    pub job_data: JobRunnerJobData<'run_queues, 'j, 's>,
}
118
119impl<'run_queues, 'j, 's> JobRunnerJobData<'run_queues, 'j, 's> {
120 pub fn have_more_job_runs_for_same_commit(&self) -> bool {
123 if self.job.state.remaining_count > 1 {
126 return true;
127 }
128
129 self.run_queues_data
133 .jobs_by_commit_id(&self.job.public.run_parameters.commit_id)
134 .len()
135 > 1
136 }
137}
138
impl<'pool, 'run_queues, 'j, 's> JobRunnerWithJob<'pool, 'run_queues, 'j, 's> {
    /// Execute one benchmarking run of the job:
    ///
    /// 1. register per-PID temporary log files with the cleanup daemon,
    /// 2. inside a borrowed working directory: check out the commit,
    ///    resolve the dataset dir and commit tags, then run the
    ///    benchmarking command with the `EVOBENCH_*` environment set,
    ///    capturing its output,
    /// 3. on success, create the per-run output directory (plus "latest"
    ///    symlink and optional URL redirect page), compress and move the
    ///    logs there, save `schedule_condition.ron` / `reason.ron`, run
    ///    post-processing and regenerate the key-dir summaries.
    ///
    /// `reason` and `schedule_condition` are recorded alongside the run
    /// for later inspection; they do not influence execution.
    pub fn run_job(
        &mut self,
        working_directory_id: WorkingDirectoryId,
        reason: &Option<String>,
        schedule_condition: &ScheduleCondition,
    ) -> Result<()> {
        let benchmarking_job_parameters = self.job_data.job.benchmarking_job_parameters();

        // Destructure the parameters once; `commit_id` / `custom_parameters`
        // are captured by the working-directory closure below.
        let BenchmarkingJobParameters {
            run_parameters,
            command,
        } = &benchmarking_job_parameters;
        let RunParameters {
            commit_id,
            custom_parameters,
        } = run_parameters.deref();

        let bench_tmp_dir = &bench_tmp_dir()?;
        // NOTE(review): these repeated "exists?" info logs look like
        // diagnostics for a disappearing-tmp-dir issue — possibly removable.
        info!(
            "bench_tmp_dir path, exists?: {:?}",
            (
                (&**bench_tmp_dir).as_escaped_string(),
                bench_tmp_dir.exists()
            )
        );

        let file_cleanup_handler = self.job_runner.file_cleanup_handler;

        // Per-PID temp log paths, registered with the cleanup daemon so
        // they are removed even if this process dies mid-run.
        let evobench_log;
        let bench_output_log;
        {
            let pid = getpid();
            evobench_log = file_cleanup_handler.register_temporary_file(Deletion::file(
                bench_tmp_dir.append(format!("evobench-{pid}.log")),
            )?)?;

            bench_output_log = file_cleanup_handler.register_temporary_file(Deletion::file(
                bench_tmp_dir.append(format!("bench-output-{pid}.log")),
            )?)?;
        }

        // Best-effort removal of leftovers from a previous run with the
        // same PID; errors (e.g. file absent) are deliberately ignored.
        let _ = remove_file(&evobench_log);
        let _ = remove_file(&bench_output_log);

        let conf = self.job_runner.run_config();

        // Borrow a working directory from the pool and run the whole
        // checkout + benchmark inside it. On success the closure returns
        // the target name and the path of the captured command output.
        let (log_extraction, cleanup) = self
            .job_runner
            .working_directory_pool
            .process_in_working_directory(
                working_directory_id,
                &self.job_runner.timestamp,
                |mut working_directory| -> Result<(&ProperDirname, PathBuf)> {
                    // Check out the requested commit, always fetching tags
                    // (needed for `get_commit_tags` below).
                    let fetched_tags = working_directory
                        .get()
                        .expect("not removed")
                        .checkout(commit_id.clone(), FetchTags::Always)?;

                    let working_directory = working_directory.into_inner().expect("not removed");

                    // Resolve the (optional) dataset directory for the
                    // custom parameters / commit combination.
                    let dataset_dir = dataset_dir_for(
                        conf.versioned_datasets_base_dir.as_deref(),
                        &custom_parameters,
                        self.job_runner.versioned_dataset_dir,
                        &working_directory.git_working_dir,
                        commit_id,
                        fetched_tags.clone(),
                    )?;

                    let commit_tags = get_commit_tags(
                        &working_directory,
                        &commit_id,
                        &conf.commit_tags_regex,
                        fetched_tags,
                    )?;

                    let BenchmarkingCommand {
                        target_name,
                        subdir,
                        command,
                        arguments,
                        pre_exec_bash_code,
                    } = command.deref();

                    // Build the actual command, wrapped with the
                    // configured pre-exec bash code (shadows `command`).
                    let mut command = pre_exec_bash_code
                        .to_run_with_pre_exec(conf)
                        .command(command, arguments);

                    let dir = working_directory
                        .git_working_dir
                        .working_dir_path_ref()
                        .append(subdir);
                    let dir_str = dir.as_escaped_string();

                    // Human-readable description reused in log messages
                    // and error reports.
                    let cmd_in_dir = format!("command {command:?} in directory {dir_str}");

                    info!(
                        "running {cmd_in_dir}, EVOBENCH_LOG={}...",
                        evobench_log.as_escaped_string()
                    );

                    // `check` panics/errors on unknown evobench env var
                    // names, catching typos at the call sites below.
                    let check = assert_evobench_env_var;

                    command
                        .envs(custom_parameters.btree_map())
                        .env(check("EVOBENCH_LOG"), &**evobench_log)
                        .env(check("BENCH_OUTPUT_LOG"), &**bench_output_log)
                        .env(check("COMMIT_ID"), commit_id.to_string())
                        .env(check("COMMIT_TAGS"), commit_tags)
                        .current_dir(&dir);
                    if let Some(dataset_dir) = &dataset_dir {
                        command.env(check("DATASET_DIR"), dataset_dir);
                    }

                    // Capture file for the command's stdout/stderr, placed
                    // in the working directory's standard log location.
                    let command_output_file = OutputCaptureLog::create(
                        &working_directory
                            .working_directory_path()
                            .standard_log_path(&self.job_runner.timestamp)?,
                    )?;

                    // Prefix the capture with the job parameters as YAML
                    // so the log is self-describing.
                    command_output_file
                        .write_str(&serde_yml::to_string(&benchmarking_job_parameters)?)?;
                    command_output_file.write_str("\n")?;

                    info!(
                        "bench_tmp_dir path, exists?: {:?}",
                        (
                            (&**bench_tmp_dir).as_escaped_string(),
                            bench_tmp_dir.exists()
                        )
                    );

                    // Run the benchmark, teeing output to stderr as well
                    // when the log level asks for it.
                    let status = {
                        let mut other_files: Vec<Box<dyn Write + Send + 'static>> = vec![];
                        if log_level() >= LogLevel::Info {
                            other_files.push(Box::new(stderr()));
                        }
                        let other_files = Arc::new(Mutex::new(other_files));

                        command_output_file.run_with_capture(
                            command,
                            other_files,
                            CaptureOptions {
                                add_source_indicator: true,
                                add_timestamp: true,
                            },
                        )?
                    };

                    info!(
                        "bench_tmp_dir path, exists?: {:?}",
                        (
                            (&**bench_tmp_dir).as_escaped_string(),
                            bench_tmp_dir.exists()
                        )
                    );

                    if status.success() {
                        info!("running {cmd_in_dir} succeeded");

                        Ok((target_name, command_output_file.into_path()))
                    } else {
                        info!("running {cmd_in_dir} failed.");

                        info!(
                            "bench_tmp_dir path, exists?: {:?}",
                            (
                                (&**bench_tmp_dir).as_escaped_string(),
                                bench_tmp_dir.exists()
                            )
                        );

                        // Show the tail of the captured output; if the log
                        // level didn't already stream it to stderr, print
                        // the excerpt there now.
                        let last_part = command_output_file.last_part(3000)?;
                        if log_level() < LogLevel::Info {
                            let mut err = stderr().lock();
                            writeln!(err, "---- run_job: error in dir {dir_str}: -------")?;
                            err.write_all(last_part.as_bytes())?;
                            writeln!(err, "---- /run_job: error in dir {dir_str} -------")?;
                        }

                        bail!(
                            "benchmarking command {cmd_in_dir} gave \
                            error status {status}, last_part {}",
                            last_part.as_escaped_string()
                        )
                    }
                },
                Some(&benchmarking_job_parameters),
                "run_job",
                // Lets the pool decide whether keeping the checkout is
                // worthwhile (more runs on this commit are coming).
                Some(&|| self.job_data.have_more_job_runs_for_same_commit()),
            )?;

        self.job_runner
            .working_directory_pool
            .working_directory_cleanup(cleanup)?;

        // From here on: move results into the permanent output tree.
        {
            // Per-(target, parameters) key directory under the output base.
            let key_dir = KeyDir::from_base_target_params(
                self.job_runner.output_base_dir.clone_arc(),
                command.target_name.clone(),
                run_parameters,
            );

            {
                // "latest/<timestamp>" symlink pointing (relatively, via
                // "..") at the key directory.
                let latest_dir = self.job_runner.output_base_dir.join("latest");
                create_dir_all(&latest_dir).map_err(ctx!("create_dir_all {latest_dir:?}"))?;

                let key_dir_relative_from_latest_dir =
                    key_dir.replace_base_path("..".into_arc_path());

                let symlink_location = latest_dir.join(self.job_runner.timestamp.as_str());
                symlink(
                    key_dir_relative_from_latest_dir.to_path(),
                    &symlink_location,
                )
                .map_err(ctx!("creating symlink at {symlink_location:?}"))?;
            }

            // The timestamped directory for this particular run.
            let run_dir: RunDir = key_dir
                .clone_arc()
                .append_subdir(self.job_runner.timestamp.clone());
            {
                let path = run_dir.to_path();
                create_dir_all(path).map_err(ctx!("create_dir_all {path:?}"))?;
            }

            // If an output URL is configured, also write a
            // "latest-redir/<timestamp>/index.html" redirect page that
            // points at the run directory under that URL.
            if let Some(output_dir_url) = &self.job_runner.run_config().output_dir.url {
                let latest_dir = self.job_runner.output_base_dir.join("latest-redir");
                let subdir: PathBuf = latest_dir.join(self.job_runner.timestamp.as_str());
                {
                    let path = &subdir;
                    create_dir_all(path).map_err(ctx!("create_dir_all {path:?}"))?;
                }

                let redirect_target = run_dir.replace_base_path(output_dir_url.into_arc_path());
                let url = redirect_target.to_path().to_string_lossy();
                write_redirect_html_file(&subdir.append("index.html"), &url)?;
            }

            {
                info!("moving files to {}", run_dir.to_path().as_escaped_string());

                // The bench output log is optional (the command may not
                // have produced one). Dropping the handle releases the
                // cleanup registration after the compressed copy exists.
                if bench_output_log.exists() {
                    compress_file_as(&bench_output_log, run_dir.bench_output_log_path(), false)?;
                    drop(bench_output_log);
                }

                // `true`: compress to a temp path first; it is renamed in
                // the post-process callback below only after evaluation
                // succeeded.
                let evobench_log_tmp =
                    compress_file_as(&evobench_log, run_dir.evobench_log_path(), true)?;

                let (target_name, standard_log_tempfile) = log_extraction;
                compress_file_as(&standard_log_tempfile, run_dir.standard_log_path(), false)?;
                // Re-register the original so the cleanup daemon deletes it.
                let standard_log_tempfile = file_cleanup_handler
                    .register_temporary_file(Deletion::file(standard_log_tempfile)?)?;

                // Persist the scheduling context next to the results.
                {
                    let target = run_dir.append_str("schedule_condition.ron")?;
                    let target_str = target.as_escaped_string();
                    info!("saving context to {target_str}");
                    let schedule_condition_str = ron_to_string_pretty(&schedule_condition)?;
                    std::fs::write(&target, &schedule_condition_str)
                        .map_err(ctx!("saving to {target_str}"))?
                }

                {
                    let target = run_dir.append_str("reason.ron")?;
                    let target_str = target.as_escaped_string();
                    info!("saving context to {target_str}");
                    let s = ron_to_string_pretty(&reason)?;
                    std::fs::write(&target, &s).map_err(ctx!("saving to {target_str}"))?
                }

                // Post-process (evaluate) the benchmark log; only on
                // success is the compressed copy renamed into place and
                // the raw log released for deletion.
                let evobench_log_path = evobench_log.to_owned();
                run_dir.post_process_single(
                    Some(&evobench_log_path),
                    move || {
                        info!("evaluating the benchmark file succeeded");

                        drop(evobench_log);

                        rename_tmp_path(evobench_log_tmp)?;

                        info!("compressed benchmark file renamed");
                        Ok(())
                    },
                    target_name,
                    &standard_log_tempfile,
                    &self.job_runner.run_config(),
                    false,
                )?;
            }

            // Refresh the summaries covering all runs for this key.
            key_dir.generate_summaries_for_key_dir(
                false,
            )?;
        }

        Ok(())
    }
}