// evobench_tools/run/migrate.rs

1//! Database migration. Structs with incompatible changes should be
2//! copied here and an upgrade function provided.
3
4use std::{
5    collections::btree_map,
6    collections::{BTreeMap, BTreeSet},
7    fmt::Debug,
8    path::PathBuf,
9    sync::Arc,
10    time::SystemTime,
11};
12
13use anyhow::{Result, anyhow, bail};
14use serde::{Deserialize, Serialize, de::DeserializeOwned};
15
16use crate::{
17    ctx,
18    key_val_fs::{
19        as_key::AsKey,
20        key_val::{KeyVal, KeyValError},
21    },
22    run::{
23        benchmarking_job::{BenchmarkingJob, BenchmarkingJobPublic, BenchmarkingJobState},
24        config::{BenchmarkingCommand, PreExecLevel2},
25        key::{BenchmarkingJobParameters, BenchmarkingJobParametersHash, RunParameters},
26        run_queue::RunQueue,
27    },
28    serde_types::{priority::Priority, proper_dirname::ProperDirname},
29    util::grep_diff::LogExtract,
30    utillib::type_name_short::type_name_short,
31    warn,
32};
33
/// Decoding hook used by `migrate_key_val`: parse a value from its
/// on-disk string representation, upgrading from older layouts when
/// necessary.
trait FromStrMigrating: Sized + DeserializeOwned + Serialize {
    /// Parse from whatever serialisation format that's appropriate
    /// for your type. Returns whether migration was needed as the
    /// second result (`true` means the entry must be re-written).
    fn from_str_migrating(s: &str) -> Result<(Self, bool)>;
}
40
/// Returns how many items were migrated. `handle_conflict` receives
/// two values resulting from migration or pre-existing entry in the
/// table, that both yield the same key; its return value is stored
/// for the key. It can return an error to stop migration.
///
/// `gen_key` derives the (possibly new) key for an entry from its old
/// key and its decoded value; returning the old key unchanged means
/// the entry is rewritten in place.
///
/// The migration runs in three phases under one exclusive table lock:
/// (1) read and decode every entry, collecting pending writes and
/// deletions; (2) delete entries whose key changed; (3) write back
/// the collected values, resolving duplicates via `handle_conflict`.
fn migrate_key_val<K: AsKey + Debug + Clone + PartialEq + Ord, T: FromStrMigrating>(
    table: &KeyVal<K, T>,
    gen_key: impl Fn(&K, &T) -> K,
    handle_conflict: impl Fn(&K, T, T) -> Result<T>,
) -> Result<usize> {
    let mut num_migrated = 0;
    // Take a lock on the whole table since we need to ensure files
    // still exist when we are ready to overwrite them (the
    // `_entry_lock` is not enough for this). Still lock the items,
    // too, OK? XX What about deadlocks? (Table users are not required
    // to take the dir lock first! Todo: at least document that
    // ordering is required!)
    let _table_lock = table.lock_exclusive()?;

    // Collect changes until after iteration has finished, to avoid
    // the "iterator invalidation" problem.
    let mut saves: BTreeMap<K, Vec<(bool, T)>> = BTreeMap::new();
    let mut deletions: BTreeSet<K> = BTreeSet::new();

    // Phase 1: decode every entry via `FromStrMigrating` and record
    // what needs rewriting.
    for old_key in table.keys(false, None)? {
        let old_key = old_key?;
        // `Entry` works for us as it does not transparently decode.
        if let Some(mut entry) = table.entry_opt(&old_key)? {
            let _entry_lock = entry
                .take_lockable_file()
                .expect("succeeds since calling it the first time");
            let path = entry.target_path();
            let s = std::fs::read_to_string(path).map_err(ctx!("reading file {path:?}"))?;
            let (value, needs_saving) = T::from_str_migrating(&s)?;
            let new_key = gen_key(&old_key, &value);
            let key_changed = new_key != old_key;
            if needs_saving || key_changed {
                if key_changed {
                    deletions.insert(old_key);
                }
                // Several entries can migrate to the same new key;
                // collect them all, resolution happens in phase 3.
                match saves.entry(new_key) {
                    btree_map::Entry::Vacant(vacant_entry) => {
                        vacant_entry.insert(vec![(key_changed, value)]);
                    }
                    btree_map::Entry::Occupied(mut occupied_entry) => {
                        occupied_entry.get_mut().push((key_changed, value));
                    }
                }
                num_migrated += 1;
            }
        }
    }

    // Phase 2: remove entries under their old keys before
    // re-inserting under the new ones.
    for old_key in &deletions {
        table.delete(old_key)?;
    }
    // Phase 3: write back, funnelling duplicates through
    // `handle_conflict`.
    for (new_key, mut values) in saves {
        let (key_changed, value) = {
            // I don't really know what I'm doing here: if we have
            // multiple values that were migrated *and* are hashing to
            // the same key, still try to apply `handle_conflict`,
            // does that make sense? In what order, how do the changed
            // flags matter?
            let (mut key_changed, mut value) =
                values.pop().expect("at least one must have been inserted");
            for (_other_key_changed, other_value) in values {
                value = handle_conflict(&new_key, value, other_value)?;
                key_changed = false; // at that point, overwrite will have to happen.
            }
            (key_changed, value)
        };
        // If key_changed is false, then it's OK to
        // overwrite. If the key changed, and there is a
        // conflict, then that probably means that migrated
        // data clashes with pre-existing data that wasn't
        // modified, "or something like that".
        match table.insert(&new_key, &value, key_changed) {
            Ok(()) => (),
            Err(e) => match e {
                KeyValError::KeyExists {
                    base_dir: _,
                    key_debug_string: _,
                } => {
                    // The table lock guarantees the entry is still
                    // there; a missing one is an invariant violation.
                    let old_value = table.get(&new_key)?.ok_or_else(|| {
                        anyhow!("entry {new_key:?} has vanished while we held the lock")
                    })?;
                    let value = handle_conflict(&new_key, old_value, value)?;
                    // Now overwrite it.
                    table.insert(&new_key, &value, false)?;
                }
                _ => Err(e)?,
            },
        }
    }

    Ok(num_migrated)
}
137
138/// Migrate a queue. Returns how many items were migrated.
139pub fn migrate_queue(run_queue: &RunQueue) -> Result<usize> {
140    migrate_key_val(
141        run_queue.key_val(),
142        // The key (a `TimeKey`) remains the same
143        |k, _v| k.clone(),
144        // Conflicts can't happen since we never change the key
145        |_, _, _| bail!("can't happen"),
146    )
147}
148
149/// Migrate the already_inserted table. Returns how many items
150/// (buckets of time stamps) were migrated.
151pub fn migrate_already_inserted(
152    table: &KeyVal<BenchmarkingJobParametersHash, (BenchmarkingJobParameters, Vec<SystemTime>)>,
153) -> Result<usize> {
154    migrate_key_val(
155        table,
156        // Recalculate the key from the `BenchmarkingJobParameters`
157        |_k, v| v.0.slow_hash(),
158        |key, (params1, times1), (_params2, times2)| {
159            warn!(
160                "note: after migration, two buckets for key {key:?} exist; \
161                 taking the older one, assuming that the newer entry was erroneous"
162            );
163            Ok((params1, times1.min(times2)))
164        },
165    )
166}
167
/// Copy of `BenchmarkingCommand` from before `pre_exec_bash_code`
/// existed; kept only so old serialized entries can be decoded and
/// migrated (see the `From<Arc<BenchmarkingCommand1>>` impl).
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename = "BenchmarkingCommand")]
pub struct BenchmarkingCommand1 {
    pub target_name: ProperDirname,
    pub subdir: PathBuf,
    /// In this version the command was a path; migration converts it
    /// (lossily) to the current `String` representation.
    pub command: PathBuf,
    pub arguments: Vec<String>,
    /// Dropped during migration to the current type.
    pub log_extracts: Option<Vec<LogExtract>>,
}
178
179impl From<Arc<BenchmarkingCommand1>> for BenchmarkingCommand {
180    fn from(value: Arc<BenchmarkingCommand1>) -> Self {
181        let BenchmarkingCommand1 {
182            target_name,
183            subdir,
184            command,
185            arguments,
186            log_extracts: _ignore,
187        } = Arc::into_inner(value).expect("guaranteed 1 reference");
188        let command = command.to_string_lossy().to_string();
189        // ^ XX lossy. But have not been using any such paths.
190        BenchmarkingCommand {
191            target_name,
192            subdir,
193            command,
194            arguments,
195            pre_exec_bash_code: PreExecLevel2::new(None),
196        }
197    }
198}
199
/// Copy of `BenchmarkingJobPublic` that still references the old
/// `BenchmarkingCommand1`; kept for decoding old entries during
/// migration.
#[derive(Debug, PartialEq, Clone, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
// #[serde(rename = "BenchmarkingJobPublic")]
pub struct BenchmarkingJobPublic1 {
    pub reason: Option<String>,
    pub run_parameters: Arc<RunParameters>,
    pub command: Arc<BenchmarkingCommand1>,
}
208
/// Copy of `BenchmarkingJob` built on the v1 public part; decoded as
/// a fallback in `FromStrMigrating for BenchmarkingJob`.
// NOTE(review): serde documents `deny_unknown_fields` as unsupported
// in combination with `flatten` — confirm unknown fields are actually
// rejected here if that matters for version detection.
#[derive(Debug, PartialEq, Clone, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename = "BenchmarkingJob")]
pub struct BenchmarkingJob1 {
    #[serde(flatten)]
    pub benchmarking_job_public: BenchmarkingJobPublic1,
    #[serde(flatten)]
    pub benchmarking_job_state: BenchmarkingJobState,
    // Private fields; re-assembled via `BenchmarkingJob::new` during
    // migration.
    priority: Priority,
    current_boost: Priority,
}
220
/// Copy of `BenchmarkingJobParameters` referencing the old command
/// type; used when migrating the already_inserted table.
// NOTE(review): unlike the other v1 structs here, this one does not
// carry `#[serde(deny_unknown_fields)]` — confirm whether that is
// intentional.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename = "BenchmarkingJobParameters")]
pub struct BenchmarkingJobParameters1 {
    pub run_parameters: Arc<RunParameters>,
    pub command: Arc<BenchmarkingCommand1>,
}
227
228impl From<BenchmarkingJobParameters1> for BenchmarkingJobParameters {
229    fn from(value: BenchmarkingJobParameters1) -> Self {
230        let BenchmarkingJobParameters1 {
231            run_parameters,
232            command,
233        } = value;
234        BenchmarkingJobParameters {
235            run_parameters,
236            command: Arc::new(command.into()),
237        }
238    }
239}
240
241impl FromStrMigrating for (BenchmarkingJobParameters, Vec<SystemTime>) {
242    fn from_str_migrating(s: &str) -> Result<(Self, bool)> {
243        if let Ok(v) = serde_json::from_str::<(BenchmarkingJobParameters, Vec<SystemTime>)>(s) {
244            return Ok((v, false));
245        }
246        if let Ok((params, times)) =
247            serde_json::from_str::<(BenchmarkingJobParameters1, Vec<SystemTime>)>(s)
248        {
249            return Ok(((params.into(), times), true));
250        }
251        bail!(
252            "can't parse/migrate as {}: {s:?}",
253            type_name_short::<Self>()
254        )
255    }
256}
257
258impl FromStrMigrating for BenchmarkingJob {
259    fn from_str_migrating(s: &str) -> Result<(Self, bool)> {
260        if let Ok(v) = serde_json::from_str::<BenchmarkingJob>(s) {
261            return Ok((v, false));
262        }
263        if let Ok(v) = serde_json::from_str::<BenchmarkingJob1>(s) {
264            let BenchmarkingJob1 {
265                benchmarking_job_public,
266                benchmarking_job_state,
267                priority,
268                current_boost,
269            } = v;
270            let BenchmarkingJobPublic1 {
271                reason,
272                run_parameters,
273                command,
274            } = benchmarking_job_public;
275
276            let v = BenchmarkingJob::new(
277                BenchmarkingJobPublic {
278                    reason,
279                    run_parameters,
280                    command: Arc::new(command.into()),
281                },
282                benchmarking_job_state,
283                priority,
284                current_boost,
285            );
286            return Ok((v, true));
287        }
288        bail!(
289            "can't parse/migrate as {}: {s:?}",
290            type_name_short::<Self>()
291        )
292    }
293}