evobench_tools/run/
run_context.rs

1use std::collections::BTreeMap;
2
3use anyhow::Result;
4use chrono::{DateTime, Local};
5
6use crate::{
7    date_and_time::time_ranges::DateTimeRange, info, serde_types::proper_filename::ProperFilename,
8};
9
10use super::{run_queue::RunQueue, run_queues::RunQueues, stop_start_status::StopStartStatus};
11
12#[derive(Default)]
13pub struct RunContext {
14    stop_start_status: StopStartStatus,
15
16    /// Queues with time ranges and flag `move_when_time_window_ends`,
17    /// that were started processing entries from, with the end of the
18    /// time range. To be closed off after the current time is past
19    /// the end time.
20    open_queues: BTreeMap<ProperFilename, DateTime<Local>>,
21}
22
23impl RunContext {
24    pub fn stop_start_be(&mut self, stop_start: Option<&[String]>) -> Result<()> {
25        self.stop_start_status.be(stop_start)
26    }
27
28    /// Notify the RunContext that this queue is being used (i.e. add
29    /// it for later closing)
30    pub fn running_job_in_windowed_queue(&mut self, queue: &RunQueue, dtr: DateTimeRange<Local>) {
31        if queue.schedule_condition.move_when_time_window_ends() {
32            self.open_queues.insert(queue.file_name.clone(), dtr.to);
33        }
34    }
35
36    /// Check all open queues and run closing actions for those for
37    /// which the time window has closed.
38    // passing in RunQueues feels ugly; but want to have the queue
39    // closing logic (handle_timeout) there.
40    pub fn close_open_queues(
41        &mut self,
42        now: DateTime<Local>,
43        run_queues: &RunQueues,
44    ) -> Result<()> {
45        let close: Vec<_> = self
46            .open_queues
47            .iter()
48            .filter_map(|(file_name, dtr)| {
49                if now >= *dtr {
50                    Some(file_name.clone())
51                } else {
52                    None
53                }
54            })
55            .collect();
56        for file_name in close {
57            if let Some(rqwn) = run_queues.get_run_queue_with_next_by_name(&file_name) {
58                rqwn.handle_timeout()?;
59                self.open_queues.remove(&file_name);
60            } else {
61                info!(
62                    "couldn't find RunQueue with name {:?}, \
63                     might have been config change",
64                    file_name.as_str()
65                )
66            }
67        }
68        Ok(())
69    }
70}