evobench_tools/run/
run_queue.rs

1use anyhow::Result;
2
3use crate::{
4    config_file::ron_to_string_pretty,
5    info,
6    key_val_fs::{
7        key_val::{KeyVal, KeyValError},
8        queue::{Queue, QueueGetItemOptions, QueueItem, QueueIterationOptions, TimeKey},
9    },
10    run::{benchmarking_job::BenchmarkingJobState, run_job::JobRunnerWithJob},
11    serde_types::{priority::Priority, proper_filename::ProperFilename},
12    utillib::logging::{LogLevel, log_level},
13};
14
15use super::{
16    benchmarking_job::{BenchmarkingJob, BenchmarkingJobPublic},
17    config::ScheduleCondition,
18    working_directory_pool::WorkingDirectoryId,
19};
20
/// A single benchmarking run queue: the queue contents on disk
/// together with the configured condition under which its jobs are
/// scheduled.
#[derive(Debug, PartialEq)]
pub struct RunQueue<'conf> {
    /// Name of this queue (shown e.g. in log messages)
    pub file_name: ProperFilename,
    /// When/how jobs in this queue get run; borrowed from the
    /// configuration
    pub schedule_condition: &'conf ScheduleCondition,
    /// The underlying (file-system backed, see `key_val_fs`) queue of
    /// jobs
    pub queue: Queue<BenchmarkingJob>,
}
27
/// A loaded copy of the on-disk data, for on-the-fly
/// indexing/multiple traversal
#[derive(Debug, PartialEq)]
pub struct RunQueueData<'conf, 'run_queue> {
    /// The queue this snapshot was loaded from
    run_queue: &'run_queue RunQueue<'conf>,
    /// The queue items, with total priority from job and queue
    queue_data: Vec<(TimeKey, BenchmarkingJob, Priority)>,
}
36
37impl<'conf, 'run_queue> RunQueueData<'conf, 'run_queue> {
38    pub fn run_queue(&self) -> &'run_queue RunQueue<'conf> {
39        self.run_queue
40    }
41    pub fn jobs(&self) -> impl Iterator<Item = &BenchmarkingJob> {
42        self.queue_data.iter().map(|(_, job, _)| job)
43    }
44    /// Priority already includes the queue priority here.
45    pub fn entries(&self) -> impl Iterator<Item = &(TimeKey, BenchmarkingJob, Priority)> {
46        self.queue_data.iter()
47    }
48    /// Panics for invalid i
49    pub fn entry(&self, i: usize) -> &(TimeKey, BenchmarkingJob, Priority) {
50        &self.queue_data[i]
51    }
52}
53
54impl<'conf> RunQueue<'conf> {
55    pub fn push_front(&self, job: &BenchmarkingJob) -> Result<(), KeyValError> {
56        self.queue.push_front(job)
57    }
58
59    // XX OK? see warning on queue.key_val() method! -- and does that
60    // allow to mutate the queue, bypassing the use of
61    // signal_queues_change?
62    pub fn key_val(&self) -> &KeyVal<TimeKey, BenchmarkingJob> {
63        self.queue.key_val()
64    }
65
66    pub fn data<'run_queue>(&'run_queue self) -> Result<RunQueueData<'conf, 'run_queue>> {
67        let queue_data = self
68            .jobs()
69            .map(|r| -> Result<_> {
70                let (queue_item, job) = r?;
71                let queue_priority = self
72                    .schedule_condition
73                    .priority()
74                    .expect("no inactive queue in pipeline");
75                let priority = (job.priority()? + queue_priority)?;
76                Ok((queue_item.key()?, job, priority))
77            })
78            .collect::<Result<_>>()?;
79        Ok(RunQueueData {
80            run_queue: self,
81            queue_data,
82        })
83    }
84
85    /// NOTE: this returns unlocked `QueueItem`s! Call
86    /// `lock_exclusive()` on them to lock them afterwards.
87    // This is obsolete as public method and only used via the `data` method.
88    fn jobs<'s>(
89        &'s self,
90    ) -> impl Iterator<Item = Result<(QueueItem<'s, BenchmarkingJob>, BenchmarkingJob), KeyValError>>
91    + use<'s> {
92        let opts = QueueIterationOptions {
93            get_item_opts: QueueGetItemOptions {
94                no_lock: true,
95                error_when_locked: false,
96                verbose: log_level() >= LogLevel::Info,
97                delete_first: false,
98            },
99            wait: false,
100            stop_at: None,
101            reverse: false,
102        };
103        self.queue.items(opts)
104    }
105}
106
/// A `RunQueue` paired with its optional successor `RunQueue` (the
/// queue where jobs go next)
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct RunQueueWithNext<'conf, 'r> {
    /// The queue whose jobs are being processed
    pub current: &'r RunQueue<'conf>,
    /// Where jobs move after running here; `None` means jobs drop
    /// off the pipeline when leaving this queue
    pub next: Option<&'r RunQueue<'conf>>,
}
114
/// A `RunQueueData` paired with its optional successor `RunQueueData` (the
/// queue where jobs go next)
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct RunQueueDataWithNext<'conf, 'run_queue, 'r> {
    /// Snapshot of the queue whose jobs are being processed
    pub current: &'r RunQueueData<'conf, 'run_queue>,
    /// Snapshot of the successor queue, if any
    pub next: Option<&'r RunQueueData<'conf, 'run_queue>>,
}
122
/// The status that a job is in (after running
/// `RunQueueWithNext::run_job`, but perhaps also useful more
/// generally?) (This is related to, but currently independent of, the
/// `run::working_directory::Status` type which is currently used to
/// determine "R" status column in `evobench list`.)
#[derive(Clone, Copy)]
#[must_use]
pub enum JobStatus {
    /// It will still be run
    Active,
    /// Job is finished; the argument says if retained in a
    /// `done_jobs_queue`. Note that this can happen even if
    /// `remaining_count` is non-zero; relevant is having reached the
    /// end of the pipeline.
    Done(bool),
    /// Positioned in a queue with ScheduleCondition::Inactive (other
    /// than the done queue)
    Inactive,
    /// Job ran out of `remaining_count` *before* reaching the end of
    /// the pipeline
    Dropped,
    /// Ran out of error budget; the argument says if retained in an
    /// `erroneous_jobs_queue`
    Error(bool),
}
148
149impl JobStatus {
150    /// If this returns false, then e.g. cache files retained for the
151    /// whole job execution can be deleted.
152    pub fn can_run_again(self) -> bool {
153        match self {
154            JobStatus::Active => true,
155            JobStatus::Done(_) | JobStatus::Inactive | JobStatus::Dropped | JobStatus::Error(_) => {
156                false
157            }
158        }
159    }
160}
161
impl<'conf, 'r> RunQueueWithNext<'conf, 'r> {
    /// Run the given job, which must be from this queue. `item`
    /// represents the queue entry of this job, and is used for
    /// locking and deletion--it must not already be locked!
    ///
    /// Returns the status of the job after running it.
    ///
    /// Outline: the job's state (`remaining_count`,
    /// `remaining_error_budget`) is read first; the job is only
    /// executed when both are positive. Afterwards the job is
    /// re-inserted into the current queue (retry / repeat), moved to
    /// the next queue, retained in the done/erroneous queue, or
    /// dropped -- and in every path the original queue entry is
    /// deleted at the end.
    pub fn run_job(
        &self,
        item: &QueueItem<BenchmarkingJob>,
        job_runner_with_job: &mut JobRunnerWithJob,
        erroneous_jobs_queue: Option<&RunQueue>,
        done_jobs_queue: Option<&RunQueue>,
        working_directory_id: WorkingDirectoryId,
    ) -> Result<JobStatus> {
        // Hold the exclusive lock on the queue entry for the whole
        // run; released on drop.
        let _lock = item.lock_exclusive()?;

        let BenchmarkingJobState {
            remaining_count,
            mut remaining_error_budget,
            last_working_directory: _,
        } = job_runner_with_job.job_data.job.state.clone();

        // Build the job's final state and, if a `done_jobs_queue` is
        // configured, retain the job there. Captures the current
        // `remaining_error_budget`; `remaining_count` is passed in
        // explicitly since callers may have decremented it.
        let finish_completed_job = |remaining_count| -> Result<JobStatus> {
            let job = job_runner_with_job
                .job_data
                .job
                .clone_for_queue_reinsertion(BenchmarkingJobState {
                    remaining_count,
                    remaining_error_budget,
                    last_working_directory: Some(working_directory_id),
                });
            info!(
                "job completed: {}",
                ron_to_string_pretty(&job).expect("no err")
            );
            let retained = if let Some(done_jobs_queue) = done_jobs_queue {
                done_jobs_queue.push_front(&job)?;
                true
            } else {
                false
            };
            Ok(JobStatus::Done(retained))
        };

        // The job has exhausted its error budget: retain it in the
        // `erroneous_jobs_queue` if one is configured, otherwise it
        // is merely logged and dropped.
        let handle_out_of_error_budget = || -> Result<JobStatus> {
            let job = job_runner_with_job
                .job_data
                .job
                .clone_for_queue_reinsertion(BenchmarkingJobState {
                    remaining_count,
                    remaining_error_budget: 0,
                    last_working_directory: Some(working_directory_id),
                });

            let retained = if let Some(queue) = &erroneous_jobs_queue {
                queue.push_front(&job)?;
                true
            } else {
                info!(
                    "job dropped due to running out of error budget \
                     and no configured erroneous_jobs_queue: {}",
                    ron_to_string_pretty(&job).expect("no err")
                );
                false
            };
            Ok(JobStatus::Error(retained))
        };

        let BenchmarkingJobPublic {
            reason,
            // Getting these via job.benchmarking_job_parameters() instead
            run_parameters: _,
            command: _,
        } = job_runner_with_job.job_data.job.public.clone();

        // Assigned exactly once on every path below.
        let job_status;

        if remaining_error_budget > 0 {
            if remaining_count > 0 {
                // Actually execute the benchmarking job.
                if let Err(error) = job_runner_with_job.run_job(
                    working_directory_id,
                    &reason,
                    &self.current.schedule_condition,
                ) {
                    remaining_error_budget = remaining_error_budget - 1;

                    // XX this should use more important error
                    // logging than info!; (XX also, repetitive
                    // BenchmarkingJob recreation and cloning.)
                    info!(
                        "job gave error: {}: {error:#?}",
                        // XX: give job_runner_ext as the context? And
                        // anyway, todo layered error zones.
                        ron_to_string_pretty(&job_runner_with_job.job_data.job).expect("no err")
                    );
                    if remaining_error_budget > 0 {
                        // Re-schedule: budget left, so retry in the
                        // *current* queue with the decremented budget.
                        let job = job_runner_with_job
                            .job_data
                            .job
                            .clone_for_queue_reinsertion(BenchmarkingJobState {
                                remaining_count,
                                remaining_error_budget,
                                last_working_directory: Some(working_directory_id),
                            });
                        self.current.push_front(&job)?;
                        job_status = JobStatus::Active;
                    } else {
                        job_status = handle_out_of_error_budget()?;
                    }
                } else {
                    // Successful run: one repetition consumed.
                    let remaining_count = remaining_count - 1;
                    if remaining_count > 0 {
                        // Decide which queue the job continues in,
                        // depending on the current queue's schedule
                        // condition.
                        let maybe_queue;
                        match self.current.schedule_condition {
                            ScheduleCondition::Immediately { situation: _ } => {
                                // Job is always going to the next queue
                                maybe_queue = self.next;
                            }
                            ScheduleCondition::LocalNaiveTimeWindow {
                                priority: _,
                                situation: _,
                                stop_start: _,
                                repeatedly,
                                move_when_time_window_ends: _,
                                from: _,
                                to: _,
                            } => {
                                if *repeatedly {
                                    // Job is going to the current queue (as
                                    // long as `to` has not been reached,
                                    // otherwise the next queue, but then will
                                    // move them all anyway once running out,
                                    // so doesn't matter, and won't parse `to`
                                    // time here, because need to do that
                                    // before we start, hence using `stop_at`
                                    // for that. Thus, simply:)
                                    maybe_queue = Some(self.current);
                                } else {
                                    maybe_queue = self.next;
                                }
                            }
                            ScheduleCondition::Inactive => {
                                // NOTE(review): no early return for the
                                // Inactive case is visible in this function;
                                // presumably jobs are never scheduled *from*
                                // an Inactive queue -- confirm, the message
                                // below looks stale.
                                unreachable!("already returned at beginning of function")
                            }
                        }

                        let job = job_runner_with_job
                            .job_data
                            .job
                            .clone_for_queue_reinsertion(BenchmarkingJobState {
                                remaining_count,
                                remaining_error_budget,
                                last_working_directory: Some(working_directory_id),
                            });
                        if let Some(queue) = maybe_queue {
                            queue.push_front(&job)?;
                            job_status = JobStatus::Active;
                        } else {
                            // No successor queue configured: the job ends
                            // here despite remaining_count > 0.
                            info!(
                                "job dropping off the pipeline: {}",
                                ron_to_string_pretty(&job).expect("no err")
                            );
                            job_status = JobStatus::Dropped;
                        }
                    } else {
                        job_status = finish_completed_job(remaining_count)?;
                    }
                }
            } else {
                info!(
                    "should never get here normally: job stored in normal queue \
                     with remaining_count 0"
                );
                job_status = finish_completed_job(remaining_count)?;
            }
        } else {
            info!("Job already had no error budget; should not be possible?");
            job_status = handle_out_of_error_budget()?;
        }
        // In every path above the job has either been re-inserted
        // somewhere or deliberately dropped, so the original entry
        // can now be removed.
        item.delete()?;
        Ok(job_status)
    }

    /// Called when the current queue has run out of its time window:
    /// if the schedule condition says `move_when_time_window_ends`,
    /// move all remaining entries from the current queue to the next
    /// one.
    pub fn handle_timeout(&self) -> Result<()> {
        info!(
            "ran out of time in queue {:?}",
            self.current.file_name.as_str()
        );
        if self.current.schedule_condition.move_when_time_window_ends() {
            let mut count = 0;
            for entry in self.current.queue.sorted_entries(false, None, false)? {
                // XX continue in the face of
                // errors? Just globally in
                // the queue?
                let mut entry = entry?;
                let val = entry.get()?;
                // NOTE(review): when `self.next` is `None`, entries
                // are deleted below without being re-queued anywhere
                // -- confirm that silently dropping them is intended.
                if let Some(next) = self.next {
                    next.push_front(&val)?;
                }
                entry.delete()?;
                count += 1;
            }
            info!(
                "moved {count} entries to queue {:?}",
                self.next.map(|q| &q.file_name)
            );
        }
        Ok(())
    }
}
373
374impl<'conf, 'run_queue, 'r> RunQueueDataWithNext<'conf, 'run_queue, 'r> {
375    pub fn run_queue_with_next(&self) -> RunQueueWithNext<'conf, 'run_queue> {
376        let RunQueueDataWithNext { current, next } = self;
377        let current = current.run_queue();
378        let next = next.map(|rq| rq.run_queue());
379        RunQueueWithNext { current, next }
380    }
381}