1use anyhow::Result;
2
3use crate::{
4 config_file::ron_to_string_pretty,
5 info,
6 key_val_fs::{
7 key_val::{KeyVal, KeyValError},
8 queue::{Queue, QueueGetItemOptions, QueueItem, QueueIterationOptions, TimeKey},
9 },
10 run::{benchmarking_job::BenchmarkingJobState, run_job::JobRunnerWithJob},
11 serde_types::{priority::Priority, proper_filename::ProperFilename},
12 utillib::logging::{LogLevel, log_level},
13};
14
15use super::{
16 benchmarking_job::{BenchmarkingJob, BenchmarkingJobPublic},
17 config::ScheduleCondition,
18 working_directory_pool::WorkingDirectoryId,
19};
20
/// A job queue of the run pipeline, together with the schedule condition
/// that governs it (its priority, its time window, and whether jobs repeat
/// in it — see the `ScheduleCondition` variants matched in `run_job`).
#[derive(Debug, PartialEq)]
pub struct RunQueue<'conf> {
    /// Name of the queue; shown in log messages.
    pub file_name: ProperFilename,
    /// Borrowed schedule condition (presumably from the loaded config — the
    /// lifetime is named `'conf`; confirm at the construction site).
    pub schedule_condition: &'conf ScheduleCondition,
    /// The `key_val_fs` queue holding the jobs of this run queue.
    pub queue: Queue<BenchmarkingJob>,
}
27
/// A snapshot of the jobs in a [`RunQueue`], as produced by
/// [`RunQueue::data`].
#[derive(Debug, PartialEq)]
pub struct RunQueueData<'conf, 'run_queue> {
    /// The queue the snapshot was read from.
    run_queue: &'run_queue RunQueue<'conf>,
    /// One entry per job: its queue key, the job itself, and its effective
    /// priority (the job's own priority plus the queue's priority).
    queue_data: Vec<(TimeKey, BenchmarkingJob, Priority)>,
}
36
37impl<'conf, 'run_queue> RunQueueData<'conf, 'run_queue> {
38 pub fn run_queue(&self) -> &'run_queue RunQueue<'conf> {
39 self.run_queue
40 }
41 pub fn jobs(&self) -> impl Iterator<Item = &BenchmarkingJob> {
42 self.queue_data.iter().map(|(_, job, _)| job)
43 }
44 pub fn entries(&self) -> impl Iterator<Item = &(TimeKey, BenchmarkingJob, Priority)> {
46 self.queue_data.iter()
47 }
48 pub fn entry(&self, i: usize) -> &(TimeKey, BenchmarkingJob, Priority) {
50 &self.queue_data[i]
51 }
52}
53
54impl<'conf> RunQueue<'conf> {
55 pub fn push_front(&self, job: &BenchmarkingJob) -> Result<(), KeyValError> {
56 self.queue.push_front(job)
57 }
58
59 pub fn key_val(&self) -> &KeyVal<TimeKey, BenchmarkingJob> {
63 self.queue.key_val()
64 }
65
66 pub fn data<'run_queue>(&'run_queue self) -> Result<RunQueueData<'conf, 'run_queue>> {
67 let queue_data = self
68 .jobs()
69 .map(|r| -> Result<_> {
70 let (queue_item, job) = r?;
71 let queue_priority = self
72 .schedule_condition
73 .priority()
74 .expect("no inactive queue in pipeline");
75 let priority = (job.priority()? + queue_priority)?;
76 Ok((queue_item.key()?, job, priority))
77 })
78 .collect::<Result<_>>()?;
79 Ok(RunQueueData {
80 run_queue: self,
81 queue_data,
82 })
83 }
84
85 fn jobs<'s>(
89 &'s self,
90 ) -> impl Iterator<Item = Result<(QueueItem<'s, BenchmarkingJob>, BenchmarkingJob), KeyValError>>
91 + use<'s> {
92 let opts = QueueIterationOptions {
93 get_item_opts: QueueGetItemOptions {
94 no_lock: true,
95 error_when_locked: false,
96 verbose: log_level() >= LogLevel::Info,
97 delete_first: false,
98 },
99 wait: false,
100 stop_at: None,
101 reverse: false,
102 };
103 self.queue.items(opts)
104 }
105}
106
/// A pipeline queue paired with the queue that jobs leaving it are pushed
/// to (see `run_job` and `handle_timeout`).
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct RunQueueWithNext<'conf, 'r> {
    /// The queue whose jobs are being processed.
    pub current: &'r RunQueue<'conf>,
    /// The queue that receives jobs moving on from `current`; `None` if
    /// there is no following queue, in which case such jobs are dropped.
    pub next: Option<&'r RunQueue<'conf>>,
}
114
/// Like [`RunQueueWithNext`], but referring to [`RunQueueData`] snapshots
/// instead of the queues themselves; `run_queue_with_next` converts back.
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct RunQueueDataWithNext<'conf, 'run_queue, 'r> {
    /// Snapshot of the queue whose jobs are being processed.
    pub current: &'r RunQueueData<'conf, 'run_queue>,
    /// Snapshot of the following queue, if there is one.
    pub next: Option<&'r RunQueueData<'conf, 'run_queue>>,
}
122
/// Outcome of processing a single job (see `RunQueueWithNext::run_job`).
#[derive(Clone, Copy)]
#[must_use]
pub enum JobStatus {
    /// The job was pushed back into a pipeline queue (the current one or the
    /// next one) and can run again.
    Active,
    /// The job has no runs left. The flag is `true` if the job was stored in
    /// the done-jobs queue, `false` if it was discarded because no such
    /// queue was given.
    Done(bool),
    /// `run_job` never returns this variant. NOTE(review): presumably
    /// reported by callers for jobs in an inactive queue — confirm at the
    /// producer.
    Inactive,
    /// The job still had runs left, but there was no next queue to move it
    /// to, so it was discarded.
    Dropped,
    /// The job ran out of error budget. The flag is `true` if the job was
    /// stored in the erroneous-jobs queue, `false` if it was discarded
    /// because no such queue was given.
    Error(bool),
}
148
149impl JobStatus {
150 pub fn can_run_again(self) -> bool {
153 match self {
154 JobStatus::Active => true,
155 JobStatus::Done(_) | JobStatus::Inactive | JobStatus::Dropped | JobStatus::Error(_) => {
156 false
157 }
158 }
159 }
160}
161
162impl<'conf, 'r> RunQueueWithNext<'conf, 'r> {
163 pub fn run_job(
169 &self,
170 item: &QueueItem<BenchmarkingJob>,
171 job_runner_with_job: &mut JobRunnerWithJob,
172 erroneous_jobs_queue: Option<&RunQueue>,
173 done_jobs_queue: Option<&RunQueue>,
174 working_directory_id: WorkingDirectoryId,
175 ) -> Result<JobStatus> {
176 let _lock = item.lock_exclusive()?;
177
178 let BenchmarkingJobState {
179 remaining_count,
180 mut remaining_error_budget,
181 last_working_directory: _,
182 } = job_runner_with_job.job_data.job.state.clone();
183
184 let finish_completed_job = |remaining_count| -> Result<JobStatus> {
185 let job = job_runner_with_job
186 .job_data
187 .job
188 .clone_for_queue_reinsertion(BenchmarkingJobState {
189 remaining_count,
190 remaining_error_budget,
191 last_working_directory: Some(working_directory_id),
192 });
193 info!(
194 "job completed: {}",
195 ron_to_string_pretty(&job).expect("no err")
196 );
197 let retained = if let Some(done_jobs_queue) = done_jobs_queue {
198 done_jobs_queue.push_front(&job)?;
199 true
200 } else {
201 false
202 };
203 Ok(JobStatus::Done(retained))
204 };
205
206 let handle_out_of_error_budget = || -> Result<JobStatus> {
207 let job = job_runner_with_job
208 .job_data
209 .job
210 .clone_for_queue_reinsertion(BenchmarkingJobState {
211 remaining_count,
212 remaining_error_budget: 0,
213 last_working_directory: Some(working_directory_id),
214 });
215
216 let retained = if let Some(queue) = &erroneous_jobs_queue {
217 queue.push_front(&job)?;
218 true
219 } else {
220 info!(
221 "job dropped due to running out of error budget \
222 and no configured erroneous_jobs_queue: {}",
223 ron_to_string_pretty(&job).expect("no err")
224 );
225 false
226 };
227 Ok(JobStatus::Error(retained))
228 };
229
230 let BenchmarkingJobPublic {
231 reason,
232 run_parameters: _,
234 command: _,
235 } = job_runner_with_job.job_data.job.public.clone();
236
237 let job_status;
238
239 if remaining_error_budget > 0 {
240 if remaining_count > 0 {
241 if let Err(error) = job_runner_with_job.run_job(
242 working_directory_id,
243 &reason,
244 &self.current.schedule_condition,
245 ) {
246 remaining_error_budget = remaining_error_budget - 1;
247
248 info!(
252 "job gave error: {}: {error:#?}",
253 ron_to_string_pretty(&job_runner_with_job.job_data.job).expect("no err")
256 );
257 if remaining_error_budget > 0 {
258 let job = job_runner_with_job
260 .job_data
261 .job
262 .clone_for_queue_reinsertion(BenchmarkingJobState {
263 remaining_count,
264 remaining_error_budget,
265 last_working_directory: Some(working_directory_id),
266 });
267 self.current.push_front(&job)?;
268 job_status = JobStatus::Active;
269 } else {
270 job_status = handle_out_of_error_budget()?;
271 }
272 } else {
273 let remaining_count = remaining_count - 1;
274 if remaining_count > 0 {
275 let maybe_queue;
276 match self.current.schedule_condition {
277 ScheduleCondition::Immediately { situation: _ } => {
278 maybe_queue = self.next;
280 }
281 ScheduleCondition::LocalNaiveTimeWindow {
282 priority: _,
283 situation: _,
284 stop_start: _,
285 repeatedly,
286 move_when_time_window_ends: _,
287 from: _,
288 to: _,
289 } => {
290 if *repeatedly {
291 maybe_queue = Some(self.current);
300 } else {
301 maybe_queue = self.next;
302 }
303 }
304 ScheduleCondition::Inactive => {
305 unreachable!("already returned at beginning of function")
306 }
307 }
308
309 let job = job_runner_with_job
310 .job_data
311 .job
312 .clone_for_queue_reinsertion(BenchmarkingJobState {
313 remaining_count,
314 remaining_error_budget,
315 last_working_directory: Some(working_directory_id),
316 });
317 if let Some(queue) = maybe_queue {
318 queue.push_front(&job)?;
319 job_status = JobStatus::Active;
320 } else {
321 info!(
322 "job dropping off the pipeline: {}",
323 ron_to_string_pretty(&job).expect("no err")
324 );
325 job_status = JobStatus::Dropped;
326 }
327 } else {
328 job_status = finish_completed_job(remaining_count)?;
329 }
330 }
331 } else {
332 info!(
333 "should never get here normally: job stored in normal queue \
334 with remaining_count 0"
335 );
336 job_status = finish_completed_job(remaining_count)?;
337 }
338 } else {
339 info!("Job already had no error budget; should not be possible?");
340 job_status = handle_out_of_error_budget()?;
341 }
342 item.delete()?;
343 Ok(job_status)
344 }
345
346 pub fn handle_timeout(&self) -> Result<()> {
347 info!(
348 "ran out of time in queue {:?}",
349 self.current.file_name.as_str()
350 );
351 if self.current.schedule_condition.move_when_time_window_ends() {
352 let mut count = 0;
353 for entry in self.current.queue.sorted_entries(false, None, false)? {
354 let mut entry = entry?;
358 let val = entry.get()?;
359 if let Some(next) = self.next {
360 next.push_front(&val)?;
361 }
362 entry.delete()?;
363 count += 1;
364 }
365 info!(
366 "moved {count} entries to queue {:?}",
367 self.next.map(|q| &q.file_name)
368 );
369 }
370 Ok(())
371 }
372}
373
374impl<'conf, 'run_queue, 'r> RunQueueDataWithNext<'conf, 'run_queue, 'r> {
375 pub fn run_queue_with_next(&self) -> RunQueueWithNext<'conf, 'run_queue> {
376 let RunQueueDataWithNext { current, next } = self;
377 let current = current.run_queue();
378 let next = next.map(|rq| rq.run_queue());
379 RunQueueWithNext { current, next }
380 }
381}