1use std::{
2 collections::{BTreeMap, BTreeSet, btree_map::Entry},
3 ops::Neg,
4 path::PathBuf,
5 sync::Arc,
6 time::SystemTime,
7};
8
9use anyhow::{Result, bail};
10use chj_unix_util::polling_signals::PollingSignalsSender;
11use chrono::{DateTime, Local};
12use cj_path_util::path_util::AppendToPath;
13use genawaiter::rc::Gen;
14use itertools::{EitherOrBoth, Itertools};
15
16use crate::{
17 date_and_time::time_ranges::DateTimeRange,
18 git::GitHash,
19 info,
20 key_val_fs::{
21 key_val::{KeyValConfig, KeyValSync},
22 queue::{Queue, QueueGetItemOptions, QueueItem, TimeKey},
23 },
24 run::{
25 run_job::{JobRunnerJobData, JobRunnerWithJob},
26 run_queue::JobStatus,
27 },
28 serde_types::{priority::Priority, proper_filename::ProperFilename},
29 utillib::logging::{LogLevel, log_level},
30};
31
32use super::{
33 benchmarking_job::BenchmarkingJob,
34 config::{QueuesConfig, ScheduleCondition},
35 global_app_state_dir::GlobalAppStateDir,
36 run_context::RunContext,
37 run_job::JobRunner,
38 run_queue::{RunQueue, RunQueueData, RunQueueDataWithNext, RunQueueWithNext},
39};
40
41pub fn get_now_chrono() -> DateTime<Local> {
43 SystemTime::now().into()
44}
45
// All run queues of the application. Self-referencing (via `ouroboros`):
// every `RunQueue` borrows its name / schedule condition out of `config`,
// so accessors are generated (`borrow_pipeline` etc.).
#[ouroboros::self_referencing]
pub struct RunQueues {
    pub config: Arc<QueuesConfig>,
    // The ordered pipeline of queues that jobs progress through.
    #[borrows(config)]
    #[covariant]
    pipeline: Vec<RunQueue<'this>>,

    // Optional extra queue; presumably where failed jobs end up (it is
    // passed to `run_job` in `run_next_job`). Must be `Inactive`
    // (enforced by `check_run_queues`).
    #[borrows(config)]
    #[covariant]
    erroneous_jobs_queue: Option<RunQueue<'this>>,

    // Optional extra queue; presumably where finished jobs end up.
    // Must be `Inactive` (enforced by `check_run_queues`).
    #[borrows(config)]
    #[covariant]
    done_jobs_queue: Option<RunQueue<'this>>,
}
63
/// A point-in-time snapshot of the jobs in all pipeline queues, plus an
/// index of job positions by commit id. Created via `RunQueues::data`.
pub struct RunQueuesData<'run_queues> {
    run_queues: &'run_queues RunQueues,
    // One `RunQueueData` snapshot per pipeline queue, in pipeline order.
    pipeline_data: Vec<RunQueueData<'run_queues, 'run_queues>>,
    // commit id -> all `(pipeline index, job index)` positions into
    // `pipeline_data` holding a job for that commit.
    jobs_by_commit_id: BTreeMap<GitHash, Vec<(usize, usize)>>,
}
72
73impl RunQueues {
74 pub fn all_queues<'s>(&'s self) -> impl Iterator<Item = &'s RunQueue<'s>> {
76 Gen::new(|co| async move {
77 for queue in self.pipeline() {
78 co.yield_(queue).await;
79 }
80 if let Some(queue) = self.borrow_erroneous_jobs_queue().as_ref() {
81 co.yield_(queue).await;
82 }
83 if let Some(queue) = self.borrow_done_jobs_queue().as_ref() {
84 co.yield_(queue).await;
85 }
86 })
87 .into_iter()
88 }
89
    /// The ordered pipeline of queues (ouroboros-generated accessor).
    pub fn pipeline(&self) -> &[RunQueue<'_>] {
        self.borrow_pipeline()
    }
93
    /// The optional queue for erroneous jobs, if configured.
    pub fn erroneous_jobs_queue(&self) -> Option<&RunQueue<'_>> {
        self.borrow_erroneous_jobs_queue().as_ref()
    }
97
    /// The optional queue for done jobs, if configured.
    pub fn done_jobs_queue(&self) -> Option<&RunQueue<'_>> {
        self.borrow_done_jobs_queue().as_ref()
    }
101
102 pub fn first(&self) -> &RunQueue<'_> {
103 &self.pipeline()[0]
104 }
105
106 pub fn data<'run_queues>(&'run_queues self) -> Result<RunQueuesData<'run_queues>> {
107 let pipeline_data: Vec<RunQueueData> = self
108 .pipeline()
109 .iter()
110 .map(|rq| rq.data())
111 .collect::<Result<_>>()?;
112 let mut jobs_by_commit_id = BTreeMap::default();
113 for (i, rqd) in pipeline_data.iter().enumerate() {
114 for (j, job) in rqd.jobs().enumerate() {
115 let commit_id = &job.public.run_parameters.commit_id;
116 match jobs_by_commit_id.entry(commit_id.clone()) {
117 Entry::Vacant(vacant_entry) => {
118 vacant_entry.insert(vec![(i, j)]);
119 }
120 Entry::Occupied(mut occupied_entry) => {
121 occupied_entry.get_mut().push((i, j));
122 }
123 }
124 }
125 }
126 Ok(RunQueuesData {
127 run_queues: self,
128 pipeline_data,
129 jobs_by_commit_id,
130 })
131 }
132
133 pub fn get_run_queue_with_next_by_name(
135 &self,
136 file_name: &ProperFilename,
137 ) -> Option<RunQueueWithNext<'_, '_>> {
138 let mut queues = self.pipeline().iter();
139 while let Some(current) = queues.next() {
140 if current.file_name == *file_name {
141 let next = queues.next();
142 return Some(RunQueueWithNext { current, next });
143 }
144 }
145 None
146 }
147
    /// Validate the opened queue set.
    ///
    /// Enforced invariants:
    /// - the pipeline contains at least one queue,
    /// - queue file names are unique (extra queues included),
    /// - `LocalNaiveTimeWindow.stop_start`, when given, is non-empty,
    /// - at most one `Inactive` queue exists, and it is the last one,
    /// - the optional `erroneous_jobs_queue` / `done_jobs_queue` are of
    ///   kind `Inactive`.
    fn check_run_queues(&self) -> Result<()> {
        let pipeline = self.pipeline();
        let erroneous_jobs_queue = self.erroneous_jobs_queue();
        let done_jobs_queue = self.done_jobs_queue();
        if pipeline.is_empty() {
            bail!(
                "no queues defined -- need at least one, also \
                suggested is to add a `Inactive` as the last"
            )
        }

        // Stateful closure detecting duplicate names across both the
        // pipeline and the extra queues.
        let mut check_seen = {
            let mut seen = BTreeSet::new();
            move |file_name: &ProperFilename| -> Result<()> {
                if seen.contains(file_name) {
                    bail!("duplicate queue name {file_name:?}")
                }
                seen.insert(file_name.clone());
                Ok(())
            }
        };

        let mut inactive_count = 0;
        for run_queue in pipeline {
            check_seen(&run_queue.file_name)?;
            match run_queue.schedule_condition {
                ScheduleCondition::Immediately { situation: _ } => (),
                ScheduleCondition::LocalNaiveTimeWindow {
                    priority: _,
                    situation: _,
                    stop_start,
                    repeatedly: _,
                    move_when_time_window_ends: _,
                    from: _,
                    to: _,
                } => {
                    // An explicitly given but empty stop/start command
                    // list is almost certainly a configuration mistake.
                    if let Some(stop_start) = &stop_start {
                        if stop_start.is_empty() {
                            bail!(
                                "`LocalNaiveTimeWindow.stop_start` was given \
                                but is the empty list, require at least a program name/path"
                            )
                        }
                    }
                }
                ScheduleCondition::Inactive => inactive_count += 1,
            }
        }
        if inactive_count > 1 {
            bail!("can have at most one `Inactive` queue");
        }
        if inactive_count > 0 {
            // The single `Inactive` queue must be the pipeline's tail.
            if *pipeline
                .last()
                .expect("checked in the if condition")
                .schedule_condition
                != ScheduleCondition::Inactive
            {
                bail!("`Inactive` queue must be the last in the pipeline")
            }
        }

        // Shared validation for the two optional extra queues.
        let mut check_extra_queue = |name: &str, run_queue: Option<&RunQueue>| -> Result<()> {
            if let Some(run_queue) = run_queue {
                check_seen(&run_queue.file_name)?;
                if !run_queue.schedule_condition.is_inactive() {
                    bail!("the `{name}` must be of kind `Inactive`")
                }
            }
            Ok(())
        };
        check_extra_queue("erroneous_jobs_queue", erroneous_jobs_queue)?;
        check_extra_queue("done_jobs_queue", done_jobs_queue)?;

        Ok(())
    }
225
    /// Open (and possibly create) all queues on disk according to
    /// `config`, then validate them via `check_run_queues`.
    ///
    /// `create_dirs_if_not_exist` controls whether missing queue
    /// directories are created; `signal_change`, when given, is handed to
    /// every queue so that modifications can signal pollers.
    pub fn open(
        config: Arc<QueuesConfig>,
        create_dirs_if_not_exist: bool,
        global_app_state_dir: &GlobalAppStateDir,
        signal_change: Option<PollingSignalsSender>,
    ) -> Result<Self> {
        let run_queues_basedir =
            config.run_queues_basedir(create_dirs_if_not_exist, global_app_state_dir)?;

        // Open one on-disk queue below `run_queues_basedir`. The returned
        // `RunQueue` borrows the name and schedule condition from the
        // config entry ('this), which is why this is a named fn with an
        // explicit lifetime rather than a closure.
        fn make_run_queue<'this>(
            (filename, schedule_condition): &'this (ProperFilename, ScheduleCondition),
            run_queues_basedir: &PathBuf,
            create_dirs_if_not_exist: bool,
            signal_change: Option<PollingSignalsSender>,
        ) -> Result<RunQueue<'this>> {
            let run_queue_path = (&run_queues_basedir).append(filename.as_str());
            Ok(RunQueue {
                file_name: filename.clone(),
                schedule_condition,
                queue: Queue::<BenchmarkingJob>::open(
                    &run_queue_path,
                    KeyValConfig {
                        sync: KeyValSync::All,
                        create_dir_if_not_exists: create_dirs_if_not_exist,
                    },
                    signal_change,
                )?,
            })
        }

        // `try_new` is generated by `#[ouroboros::self_referencing]`:
        // each closure builds one field borrowing from `config`.
        let slf = Self::try_new(
            config,
            |config| -> Result<_> {
                let queues = config
                    .pipeline
                    .iter()
                    .map(|cfg| {
                        make_run_queue(
                            cfg,
                            &run_queues_basedir,
                            create_dirs_if_not_exist,
                            signal_change.clone(),
                        )
                    })
                    .collect::<Result<_>>()?;
                Ok(queues)
            },
            |config| {
                if let Some(cfg) = config.erroneous_jobs_queue.as_ref() {
                    Ok(Some(make_run_queue(
                        cfg,
                        &run_queues_basedir,
                        create_dirs_if_not_exist,
                        signal_change.clone(),
                    )?))
                } else {
                    Ok(None)
                }
            },
            |config| {
                if let Some(cfg) = config.done_jobs_queue.as_ref() {
                    Ok(Some(make_run_queue(
                        cfg,
                        &run_queues_basedir,
                        create_dirs_if_not_exist,
                        signal_change.clone(),
                    )?))
                } else {
                    Ok(None)
                }
            },
        )?;

        // Reject invalid configurations before handing out the value.
        slf.check_run_queues()?;

        Ok(slf)
    }
306}
307
308impl<'run_queues> RunQueuesData<'run_queues> {
    /// The `RunQueues` this snapshot was taken from.
    pub fn run_queues(&self) -> &'run_queues RunQueues {
        self.run_queues
    }
314
315 pub fn jobs_by_commit_id(&self, commit_id: &GitHash) -> &[(usize, usize)] {
317 self.jobs_by_commit_id
318 .get(commit_id)
319 .map(|v| v.as_slice())
320 .unwrap_or([].as_slice())
321 }
322
323 pub fn have_job_with_commit_id(&self, commit_id: &GitHash) -> bool {
324 !self.jobs_by_commit_id(commit_id).is_empty()
325 }
326
327 #[allow(unused)]
331 fn entries_by_commit_id(
332 &self,
333 commit_id: &GitHash,
334 ) -> impl Iterator<Item = &(TimeKey, BenchmarkingJob, Priority)> {
335 self.jobs_by_commit_id(commit_id)
336 .iter()
337 .map(|(i, j)| self.pipeline_data[*i].entry(*j))
338 }
339
340 pub fn run_queue_with_nexts<'s>(
343 &'s self,
344 ) -> impl Iterator<Item = RunQueueDataWithNext<'run_queues, 'run_queues, 's>> {
345 self.pipeline_data
346 .iter()
347 .zip_longest(self.pipeline_data.iter().skip(1))
348 .map(|either_or_both| match either_or_both {
349 EitherOrBoth::Both(current, next) => RunQueueDataWithNext {
350 current,
351 next: Some(next),
352 },
353 EitherOrBoth::Left(current) => RunQueueDataWithNext {
354 current,
355 next: None,
356 },
357 EitherOrBoth::Right(_) => unreachable!("because the left sequence is longer"),
358 })
359 }
360
361 fn active_queues<'s>(
364 &'s self,
365 reference_time: DateTime<Local>,
366 ) -> impl Iterator<
367 Item = (
368 RunQueueDataWithNext<'run_queues, 'run_queues, 's>,
369 Option<DateTimeRange<Local>>,
370 ),
371 > {
372 self.run_queue_with_nexts().filter_map(move |rq| {
373 if let Some(range) = rq
374 .current
375 .run_queue()
376 .schedule_condition
377 .is_runnable_at(reference_time)
378 {
379 Some((rq, range))
380 } else {
381 None
382 }
383 })
384 }
385
386 fn most_prioritized_job<'s, 'conf, 'r, 'rc>(
388 &'s self,
389 now: DateTime<Local>,
390 ) -> Result<
391 Option<(
392 RunQueueDataWithNext<'run_queues, 'run_queues, 's>,
393 Option<DateTimeRange<Local>>,
394 QueueItem<'run_queues, BenchmarkingJob>,
395 &'s BenchmarkingJob,
396 Priority,
397 )>,
398 > {
399 let verbose = log_level() >= LogLevel::Info;
400
401 if let Some(((key, job, prio), rq, dtr)) = self
406 .active_queues(now)
407 .filter_map(|(rq, dtr)| -> Option<_> {
408 let entry = rq
409 .current
410 .entries()
411 .min_by_key(|(_, _, job_priority)| job_priority.neg())?;
412
413 Some((entry, rq, dtr))
414 })
415 .min_by_key(|((_, _, job_priority), _, _)| job_priority.neg())
416 {
417 if let Some(item) = rq.current.run_queue().queue.get_item(
418 key,
419 QueueGetItemOptions {
420 verbose,
421 no_lock: true,
422 error_when_locked: false,
423 delete_first: false,
424 },
425 )? {
426 Ok(Some((rq, dtr, item, job, *prio)))
427 } else {
428 info!("entry {key} has disappeared in the mean time, skipping it");
429 Ok(None)
430 }
431 } else {
432 Ok(None)
433 }
434 }
435
    /// Pick the most prioritized runnable job (see
    /// `most_prioritized_job`) and run it.
    ///
    /// Returns the job together with the resulting `JobStatus`, or
    /// `Ok(None)` when nothing is runnable at the runner's timestamp --
    /// in which case `stop_start_be(None)` is still called on
    /// `run_context`.
    pub fn run_next_job<'s, 'conf, 'r, 'rc>(
        &'s self,
        job_runner: JobRunner,
        run_context: &mut RunContext,
    ) -> Result<Option<(&'s BenchmarkingJob, JobStatus)>> {
        let timestamp_local = job_runner.timestamp_local();

        let job = self.most_prioritized_job(timestamp_local)?;

        if let Some((rqdwn, dtr, item, job, _)) = job {
            let rq = rqdwn.current.run_queue();
            // Apply the queue's stop/start handling (presumably stopping
            // / starting external programs around the run -- see
            // `ScheduleCondition::stop_start`; confirm in RunContext).
            run_context.stop_start_be(rq.schedule_condition.stop_start())?;
            if let Some(dtr) = dtr {
                // Record that we are running inside a time window.
                run_context.running_job_in_windowed_queue(rq, dtr);
            }

            // Reserve a working directory while holding the pool lock,
            // releasing the lock before actually running the job.
            let working_directory_id;
            {
                let mut lock = job_runner
                    .working_directory_pool
                    .lock_mut("RunQueuesData.run_next_job")?;
                working_directory_id =
                    lock.get_a_working_directory_for(&job.public.run_parameters, self)?;

                lock.clear_current_working_directory()?;
            }

            let job_status = rqdwn.run_queue_with_next().run_job(
                &item,
                &mut JobRunnerWithJob {
                    job_runner,
                    job_data: JobRunnerJobData {
                        job,
                        run_queues_data: self,
                    },
                },
                self.run_queues.erroneous_jobs_queue(),
                self.run_queues.done_jobs_queue(),
                working_directory_id,
            )?;

            Ok(Some((job, job_status)))
        } else {
            // No runnable job: still inform the run context.
            run_context.stop_start_be(None)?;

            Ok(None)
        }
    }
501}