evobench_tools/run/
run_queues.rs

1use std::{
2    collections::{BTreeMap, BTreeSet, btree_map::Entry},
3    ops::Neg,
4    path::PathBuf,
5    sync::Arc,
6    time::SystemTime,
7};
8
9use anyhow::{Result, bail};
10use chj_unix_util::polling_signals::PollingSignalsSender;
11use chrono::{DateTime, Local};
12use cj_path_util::path_util::AppendToPath;
13use genawaiter::rc::Gen;
14use itertools::{EitherOrBoth, Itertools};
15
16use crate::{
17    date_and_time::time_ranges::DateTimeRange,
18    git::GitHash,
19    info,
20    key_val_fs::{
21        key_val::{KeyValConfig, KeyValSync},
22        queue::{Queue, QueueGetItemOptions, QueueItem, TimeKey},
23    },
24    run::{
25        run_job::{JobRunnerJobData, JobRunnerWithJob},
26        run_queue::JobStatus,
27    },
28    serde_types::{priority::Priority, proper_filename::ProperFilename},
29    utillib::logging::{LogLevel, log_level},
30};
31
32use super::{
33    benchmarking_job::BenchmarkingJob,
34    config::{QueuesConfig, ScheduleCondition},
35    global_app_state_dir::GlobalAppStateDir,
36    run_context::RunContext,
37    run_job::JobRunner,
38    run_queue::{RunQueue, RunQueueData, RunQueueDataWithNext, RunQueueWithNext},
39};
40
41// Move, where?
42pub fn get_now_chrono() -> DateTime<Local> {
43    SystemTime::now().into()
44}
45
/// All job queues the runner operates on: an ordered `pipeline` of
/// work queues plus optional terminal queues for erroneous and done
/// jobs. Self-referencing (via ouroboros) so each `RunQueue` can
/// borrow its `ScheduleCondition` from the shared `QueuesConfig`.
#[ouroboros::self_referencing]
pub struct RunQueues {
    pub config: Arc<QueuesConfig>,
    // Checked by `check_run_queues`: non-empty, no duplicate names,
    // at most one `Inactive` queue which must come last, etc.
    // (private field to prevent by-passing the constructor)
    #[borrows(config)]
    #[covariant]
    pipeline: Vec<RunQueue<'this>>,

    // Destination for failed jobs (passed to `run_job` in
    // `run_next_job`); checked to be of kind `Inactive`.
    #[borrows(config)]
    #[covariant]
    erroneous_jobs_queue: Option<RunQueue<'this>>,

    // Destination for finished jobs (passed to `run_job` in
    // `run_next_job`); checked to be of kind `Inactive`.
    #[borrows(config)]
    #[covariant]
    done_jobs_queue: Option<RunQueue<'this>>,
}
63
/// A loaded copy of the on-disk data, for on-the-fly
/// indexing/multiple traversal
pub struct RunQueuesData<'run_queues> {
    // Back-reference to the queues this snapshot was loaded from.
    run_queues: &'run_queues RunQueues,
    // One `RunQueueData` per pipeline queue, in configured order.
    pipeline_data: Vec<RunQueueData<'run_queues, 'run_queues>>,
    /// Value is (index in pipeline_data, index within its queue_data)
    jobs_by_commit_id: BTreeMap<GitHash, Vec<(usize, usize)>>,
}
72
impl RunQueues {
    /// Returns all queues of all kinds; e.g. for migrating items.
    pub fn all_queues<'s>(&'s self) -> impl Iterator<Item = &'s RunQueue<'s>> {
        // Generator-based so the three sources can be chained without
        // an intermediate collection.
        Gen::new(|co| async move {
            for queue in self.pipeline() {
                co.yield_(queue).await;
            }
            if let Some(queue) = self.borrow_erroneous_jobs_queue().as_ref() {
                co.yield_(queue).await;
            }
            if let Some(queue) = self.borrow_done_jobs_queue().as_ref() {
                co.yield_(queue).await;
            }
        })
        .into_iter()
    }

    /// The ordered pipeline of work queues (wraps the
    /// ouroboros-generated borrow accessor).
    pub fn pipeline(&self) -> &[RunQueue<'_>] {
        self.borrow_pipeline()
    }

    /// The queue receiving failed jobs, if configured.
    pub fn erroneous_jobs_queue(&self) -> Option<&RunQueue<'_>> {
        self.borrow_erroneous_jobs_queue().as_ref()
    }

    /// The queue receiving finished jobs, if configured.
    pub fn done_jobs_queue(&self) -> Option<&RunQueue<'_>> {
        self.borrow_done_jobs_queue().as_ref()
    }

    /// The first queue of the pipeline. Safe to index: the
    /// constructor runs `check_run_queues`, which bails on an empty
    /// pipeline.
    pub fn first(&self) -> &RunQueue<'_> {
        &self.pipeline()[0]
    }

    /// Load a snapshot of all pipeline queues' on-disk data and index
    /// the jobs by commit id for fast lookup.
    pub fn data<'run_queues>(&'run_queues self) -> Result<RunQueuesData<'run_queues>> {
        let pipeline_data: Vec<RunQueueData> = self
            .pipeline()
            .iter()
            .map(|rq| rq.data())
            .collect::<Result<_>>()?;
        // Build the (pipeline index, within-queue index) multi-map;
        // the same commit id can appear in several queues/jobs.
        let mut jobs_by_commit_id = BTreeMap::default();
        for (i, rqd) in pipeline_data.iter().enumerate() {
            for (j, job) in rqd.jobs().enumerate() {
                let commit_id = &job.public.run_parameters.commit_id;
                match jobs_by_commit_id.entry(commit_id.clone()) {
                    Entry::Vacant(vacant_entry) => {
                        vacant_entry.insert(vec![(i, j)]);
                    }
                    Entry::Occupied(mut occupied_entry) => {
                        occupied_entry.get_mut().push((i, j));
                    }
                }
            }
        }
        Ok(RunQueuesData {
            run_queues: self,
            pipeline_data,
            jobs_by_commit_id,
        })
    }

    /// Also returns the queue following the requested one, if any
    pub fn get_run_queue_with_next_by_name(
        &self,
        file_name: &ProperFilename,
    ) -> Option<RunQueueWithNext<'_, '_>> {
        let mut queues = self.pipeline().iter();
        while let Some(current) = queues.next() {
            if current.file_name == *file_name {
                // The iterator is already positioned past `current`,
                // so the next item (if any) is its successor.
                let next = queues.next();
                return Some(RunQueueWithNext { current, next });
            }
        }
        None
    }

    /// Verify that the queue configuration is valid: non-empty
    /// pipeline, unique queue names across all kinds, at most one
    /// `Inactive` queue which must be last, non-empty `stop_start`
    /// lists, and `Inactive` kind for the extra queues.
    fn check_run_queues(&self) -> Result<()> {
        let pipeline = self.pipeline();
        let erroneous_jobs_queue = self.erroneous_jobs_queue();
        let done_jobs_queue = self.done_jobs_queue();
        if pipeline.is_empty() {
            bail!(
                "no queues defined -- need at least one, also \
                 suggested is to add a `Inactive` as the last"
            )
        }

        // Closure tracking seen names so duplicates are rejected
        // across pipeline *and* extra queues.
        let mut check_seen = {
            let mut seen = BTreeSet::new();
            move |file_name: &ProperFilename| -> Result<()> {
                if seen.contains(file_name) {
                    bail!("duplicate queue name {file_name:?}")
                }
                seen.insert(file_name.clone());
                Ok(())
            }
        };

        let mut inactive_count = 0;
        for run_queue in pipeline {
            check_seen(&run_queue.file_name)?;
            match run_queue.schedule_condition {
                ScheduleCondition::Immediately { situation: _ } => (),
                ScheduleCondition::LocalNaiveTimeWindow {
                    priority: _,
                    situation: _,
                    stop_start,
                    repeatedly: _,
                    move_when_time_window_ends: _,
                    from: _,
                    to: _,
                } => {
                    if let Some(stop_start) = &stop_start {
                        // An empty command list would be meaningless;
                        // require at least the program name/path.
                        if stop_start.is_empty() {
                            bail!(
                                "`LocalNaiveTimeWindow.stop_start` was given \
                                 but is the empty list, require at least a program name/path"
                            )
                        }
                    }
                }
                ScheduleCondition::Inactive => inactive_count += 1,
            }
        }
        if inactive_count > 1 {
            bail!("can have at most one `Inactive` queue");
        }
        if inactive_count > 0 {
            // The single `Inactive` queue must terminate the pipeline.
            if *pipeline
                .last()
                .expect("checked in the if condition")
                .schedule_condition
                != ScheduleCondition::Inactive
            {
                bail!("`Inactive` queue must be the last in the pipeline")
            }
        }

        // The terminal queues never schedule jobs themselves, hence
        // must be `Inactive`.
        let mut check_extra_queue = |name: &str, run_queue: Option<&RunQueue>| -> Result<()> {
            if let Some(run_queue) = run_queue {
                check_seen(&run_queue.file_name)?;
                if !run_queue.schedule_condition.is_inactive() {
                    bail!("the `{name}` must be of kind `Inactive`")
                }
            }
            Ok(())
        };
        check_extra_queue("erroneous_jobs_queue", erroneous_jobs_queue)?;
        check_extra_queue("done_jobs_queue", done_jobs_queue)?;

        Ok(())
    }

    /// Open (and optionally create) all queues from `config` under
    /// the base directory derived from `global_app_state_dir`, then
    /// validate the configuration via `check_run_queues`.
    ///
    /// `signal_change`, if given, is cloned into every queue
    /// (presumably to notify pollers of queue changes -- see
    /// `Queue::open`).
    pub fn open(
        config: Arc<QueuesConfig>,
        create_dirs_if_not_exist: bool,
        global_app_state_dir: &GlobalAppStateDir,
        signal_change: Option<PollingSignalsSender>,
    ) -> Result<Self> {
        let run_queues_basedir =
            config.run_queues_basedir(create_dirs_if_not_exist, global_app_state_dir)?;

        // Helper opening a single queue; `'this` ties the borrowed
        // `ScheduleCondition` to the config entry it came from.
        fn make_run_queue<'this>(
            (filename, schedule_condition): &'this (ProperFilename, ScheduleCondition),
            run_queues_basedir: &PathBuf,
            create_dirs_if_not_exist: bool,
            signal_change: Option<PollingSignalsSender>,
        ) -> Result<RunQueue<'this>> {
            let run_queue_path = (&run_queues_basedir).append(filename.as_str());
            Ok(RunQueue {
                file_name: filename.clone(),
                schedule_condition,
                queue: Queue::<BenchmarkingJob>::open(
                    &run_queue_path,
                    KeyValConfig {
                        sync: KeyValSync::All,
                        create_dir_if_not_exists: create_dirs_if_not_exist,
                    },
                    signal_change,
                )?,
            })
        }

        // ouroboros constructor: each closure receives the borrowed
        // `config` and builds the corresponding self-referencing field.
        let slf = Self::try_new(
            config,
            // pipeline:
            |config| -> Result<_> {
                let queues = config
                    .pipeline
                    .iter()
                    .map(|cfg| {
                        make_run_queue(
                            cfg,
                            &run_queues_basedir,
                            create_dirs_if_not_exist,
                            signal_change.clone(),
                        )
                    })
                    .collect::<Result<_>>()?;
                Ok(queues)
            },
            // erroneous_jobs_queue:
            |config| {
                if let Some(cfg) = config.erroneous_jobs_queue.as_ref() {
                    Ok(Some(make_run_queue(
                        cfg,
                        &run_queues_basedir,
                        create_dirs_if_not_exist,
                        signal_change.clone(),
                    )?))
                } else {
                    Ok(None)
                }
            },
            // done_jobs_queue:
            |config| {
                if let Some(cfg) = config.done_jobs_queue.as_ref() {
                    Ok(Some(make_run_queue(
                        cfg,
                        &run_queues_basedir,
                        create_dirs_if_not_exist,
                        signal_change.clone(),
                    )?))
                } else {
                    Ok(None)
                }
            },
        )?;

        // Validate after construction so the checks can use the
        // normal accessors.
        slf.check_run_queues()?;

        Ok(slf)
    }
}
307
impl<'run_queues> RunQueuesData<'run_queues> {
    /// For things to be run from e.g. run_job
    /// (e.g. `regenerate_index_files` function)
    pub fn run_queues(&self) -> &'run_queues RunQueues {
        self.run_queues
    }

    /// Values are (index in pipeline_data, index within its queue_data)
    pub fn jobs_by_commit_id(&self, commit_id: &GitHash) -> &[(usize, usize)] {
        // Unknown commit ids yield the empty slice instead of None.
        self.jobs_by_commit_id
            .get(commit_id)
            .map(|v| v.as_slice())
            .unwrap_or([].as_slice())
    }

    /// Whether any job for `commit_id` exists in any pipeline queue.
    pub fn have_job_with_commit_id(&self, commit_id: &GitHash) -> bool {
        !self.jobs_by_commit_id(commit_id).is_empty()
    }

    /// Iterator over all entries for that commit id. Still efficient,
    /// since it just returns references to existing tuples.--Not
    /// actually used, might it be useful in the future?
    #[allow(unused)]
    fn entries_by_commit_id(
        &self,
        commit_id: &GitHash,
    ) -> impl Iterator<Item = &(TimeKey, BenchmarkingJob, Priority)> {
        self.jobs_by_commit_id(commit_id)
            .iter()
            .map(|(i, j)| self.pipeline_data[*i].entry(*j))
    }

    /// The `RunQueueData`s paired with their successor (still in the
    /// original, configured, order)
    pub fn run_queue_with_nexts<'s>(
        &'s self,
    ) -> impl Iterator<Item = RunQueueDataWithNext<'run_queues, 'run_queues, 's>> {
        // zip_longest of the sequence with itself shifted by one:
        // every element is paired with its successor, the last with
        // None.
        self.pipeline_data
            .iter()
            .zip_longest(self.pipeline_data.iter().skip(1))
            .map(|either_or_both| match either_or_both {
                EitherOrBoth::Both(current, next) => RunQueueDataWithNext {
                    current,
                    next: Some(next),
                },
                EitherOrBoth::Left(current) => RunQueueDataWithNext {
                    current,
                    next: None,
                },
                EitherOrBoth::Right(_) => unreachable!("because the left sequence is longer"),
            })
    }

    /// All queues which are runnable at the given time, with their
    /// successor queue, and calculated time window if any
    fn active_queues<'s>(
        &'s self,
        reference_time: DateTime<Local>,
    ) -> impl Iterator<
        Item = (
            RunQueueDataWithNext<'run_queues, 'run_queues, 's>,
            Option<DateTimeRange<Local>>,
        ),
    > {
        self.run_queue_with_nexts().filter_map(move |rq| {
            // `is_runnable_at` returning Some means runnable; the
            // inner Option is the time window, if the condition has
            // one.
            if let Some(range) = rq
                .current
                .run_queue()
                .schedule_condition
                .is_runnable_at(reference_time)
            {
                Some((rq, range))
            } else {
                None
            }
        })
    }

    /// The most prioritized job across all runnable queues, as
    /// (queue with successor, time window if any, the fetched queue
    /// item, the job, its priority). Returns `Ok(None)` if no
    /// runnable queue has a job, or the winning entry vanished from
    /// disk before it could be fetched.
    // NOTE(review): the lifetime parameters 'conf, 'r, 'rc appear
    // unused in this signature.
    fn most_prioritized_job<'s, 'conf, 'r, 'rc>(
        &'s self,
        now: DateTime<Local>,
    ) -> Result<
        Option<(
            RunQueueDataWithNext<'run_queues, 'run_queues, 's>,
            Option<DateTimeRange<Local>>,
            QueueItem<'run_queues, BenchmarkingJob>,
            &'s BenchmarkingJob,
            Priority,
        )>,
    > {
        let verbose = log_level() >= LogLevel::Info;

        // Get the single most prioritized job from each queue (if
        // any), then of those the most prioritized one. Using
        // min_by_key since this takes the first of the equal jobs,
        // unlike max_by_key.
        if let Some(((key, job, prio), rq, dtr)) = self
            .active_queues(now)
            .filter_map(|(rq, dtr)| -> Option<_> {
                // Negate the priority so min_by_key selects the
                // *highest* priority while keeping first-wins tie
                // breaking.
                let entry = rq
                    .current
                    .entries()
                    .min_by_key(|(_, _, job_priority)| job_priority.neg())?;

                Some((entry, rq, dtr))
            })
            .min_by_key(|((_, _, job_priority), _, _)| job_priority.neg())
        {
            // Re-fetch the entry from the on-disk queue; the snapshot
            // may be stale (no_lock/error_when_locked: best-effort
            // fetch without taking or requiring the item lock).
            if let Some(item) = rq.current.run_queue().queue.get_item(
                key,
                QueueGetItemOptions {
                    verbose,
                    no_lock: true,
                    error_when_locked: false,
                    delete_first: false,
                },
            )? {
                Ok(Some((rq, dtr, item, job, *prio)))
            } else {
                info!("entry {key} has disappeared in the mean time, skipping it");
                Ok(None)
            }
        } else {
            Ok(None)
        }
    }

    /// Run the first or most prioritized job in the queues. Returns
    /// the job and its status after running it if one was found,
    /// None if all runnable queues are empty.
    ///
    /// This method needs to be run in a loop forever for daemon style
    /// processing. The reason this doesn't do the looping inside is
    /// to allow for a reload of the queue config and then queues
    /// between iterations. Any active `stop_start` command state is
    /// carried in `run_context`: `stop_start_be` is called with the
    /// current queue's command when a job is run, and with `None`
    /// when no job was found. The reference time is taken from
    /// `job_runner.timestamp_local()`.
    // NOTE(review): the lifetime parameters 'conf, 'r, 'rc appear
    // unused in this signature.
    pub fn run_next_job<'s, 'conf, 'r, 'rc>(
        &'s self,
        job_runner: JobRunner,
        run_context: &mut RunContext,
    ) -> Result<Option<(&'s BenchmarkingJob, JobStatus)>> {
        let timestamp_local = job_runner.timestamp_local();

        let job = self.most_prioritized_job(timestamp_local)?;

        if let Some((rqdwn, dtr, item, job, _)) = job {
            let rq = rqdwn.current.run_queue();
            // Ensure the queue's stop_start command (if any) is in
            // the `stop`ed state before running.
            run_context.stop_start_be(rq.schedule_condition.stop_start())?;
            if let Some(dtr) = dtr {
                run_context.running_job_in_windowed_queue(rq, dtr);
            }

            // Reserve a working directory while holding the pool
            // lock; the lock is released (scope end) before the
            // potentially long-running job starts.
            let working_directory_id;
            {
                let mut lock = job_runner
                    .working_directory_pool
                    .lock_mut("RunQueuesData.run_next_job")?;
                working_directory_id =
                    lock.get_a_working_directory_for(&job.public.run_parameters, self)?;

                lock.clear_current_working_directory()?;
            }

            let job_status = rqdwn.run_queue_with_next().run_job(
                &item,
                &mut JobRunnerWithJob {
                    job_runner,
                    job_data: JobRunnerJobData {
                        job,
                        run_queues_data: self,
                    },
                },
                self.run_queues.erroneous_jobs_queue(),
                self.run_queues.done_jobs_queue(),
                working_directory_id,
            )?;

            Ok(Some((job, job_status)))
        } else {
            // No job to run: release any active stop_start command.
            run_context.stop_start_be(None)?;

            Ok(None)
        }
    }
}
501}