1use std::{
5 collections::btree_map,
6 collections::{BTreeMap, BTreeSet},
7 fmt::Debug,
8 path::PathBuf,
9 sync::Arc,
10 time::SystemTime,
11};
12
13use anyhow::{Result, anyhow, bail};
14use serde::{Deserialize, Serialize, de::DeserializeOwned};
15
16use crate::{
17 ctx,
18 key_val_fs::{
19 as_key::AsKey,
20 key_val::{KeyVal, KeyValError},
21 },
22 run::{
23 benchmarking_job::{BenchmarkingJob, BenchmarkingJobPublic, BenchmarkingJobState},
24 config::{BenchmarkingCommand, PreExecLevel2},
25 key::{BenchmarkingJobParameters, BenchmarkingJobParametersHash, RunParameters},
26 run_queue::RunQueue,
27 },
28 serde_types::{priority::Priority, proper_dirname::ProperDirname},
29 util::grep_diff::LogExtract,
30 utillib::type_name_short::type_name_short,
31 warn,
32};
33
34trait FromStrMigrating: Sized + DeserializeOwned + Serialize {
35 fn from_str_migrating(s: &str) -> Result<(Self, bool)>;
39}
40
41fn migrate_key_val<K: AsKey + Debug + Clone + PartialEq + Ord, T: FromStrMigrating>(
46 table: &KeyVal<K, T>,
47 gen_key: impl Fn(&K, &T) -> K,
48 handle_conflict: impl Fn(&K, T, T) -> Result<T>,
49) -> Result<usize> {
50 let mut num_migrated = 0;
51 let _table_lock = table.lock_exclusive()?;
58
59 let mut saves: BTreeMap<K, Vec<(bool, T)>> = BTreeMap::new();
62 let mut deletions: BTreeSet<K> = BTreeSet::new();
63
64 for old_key in table.keys(false, None)? {
65 let old_key = old_key?;
66 if let Some(mut entry) = table.entry_opt(&old_key)? {
68 let _entry_lock = entry
69 .take_lockable_file()
70 .expect("succeeds since calling it the first time");
71 let path = entry.target_path();
72 let s = std::fs::read_to_string(path).map_err(ctx!("reading file {path:?}"))?;
73 let (value, needs_saving) = T::from_str_migrating(&s)?;
74 let new_key = gen_key(&old_key, &value);
75 let key_changed = new_key != old_key;
76 if needs_saving || key_changed {
77 if key_changed {
78 deletions.insert(old_key);
79 }
80 match saves.entry(new_key) {
81 btree_map::Entry::Vacant(vacant_entry) => {
82 vacant_entry.insert(vec![(key_changed, value)]);
83 }
84 btree_map::Entry::Occupied(mut occupied_entry) => {
85 occupied_entry.get_mut().push((key_changed, value));
86 }
87 }
88 num_migrated += 1;
89 }
90 }
91 }
92
93 for old_key in &deletions {
94 table.delete(old_key)?;
95 }
96 for (new_key, mut values) in saves {
97 let (key_changed, value) = {
98 let (mut key_changed, mut value) =
104 values.pop().expect("at least one must have been inserted");
105 for (_other_key_changed, other_value) in values {
106 value = handle_conflict(&new_key, value, other_value)?;
107 key_changed = false; }
109 (key_changed, value)
110 };
111 match table.insert(&new_key, &value, key_changed) {
117 Ok(()) => (),
118 Err(e) => match e {
119 KeyValError::KeyExists {
120 base_dir: _,
121 key_debug_string: _,
122 } => {
123 let old_value = table.get(&new_key)?.ok_or_else(|| {
124 anyhow!("entry {new_key:?} has vanished while we held the lock")
125 })?;
126 let value = handle_conflict(&new_key, old_value, value)?;
127 table.insert(&new_key, &value, false)?;
129 }
130 _ => Err(e)?,
131 },
132 }
133 }
134
135 Ok(num_migrated)
136}
137
138pub fn migrate_queue(run_queue: &RunQueue) -> Result<usize> {
140 migrate_key_val(
141 run_queue.key_val(),
142 |k, _v| k.clone(),
144 |_, _, _| bail!("can't happen"),
146 )
147}
148
149pub fn migrate_already_inserted(
152 table: &KeyVal<BenchmarkingJobParametersHash, (BenchmarkingJobParameters, Vec<SystemTime>)>,
153) -> Result<usize> {
154 migrate_key_val(
155 table,
156 |_k, v| v.0.slow_hash(),
158 |key, (params1, times1), (_params2, times2)| {
159 warn!(
160 "note: after migration, two buckets for key {key:?} exist; \
161 taking the older one, assuming that the newer entry was erroneous"
162 );
163 Ok((params1, times1.min(times2)))
164 },
165 )
166}
167
168#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
169#[serde(deny_unknown_fields)]
170#[serde(rename = "BenchmarkingCommand")]
171pub struct BenchmarkingCommand1 {
172 pub target_name: ProperDirname,
173 pub subdir: PathBuf,
174 pub command: PathBuf,
175 pub arguments: Vec<String>,
176 pub log_extracts: Option<Vec<LogExtract>>,
177}
178
179impl From<Arc<BenchmarkingCommand1>> for BenchmarkingCommand {
180 fn from(value: Arc<BenchmarkingCommand1>) -> Self {
181 let BenchmarkingCommand1 {
182 target_name,
183 subdir,
184 command,
185 arguments,
186 log_extracts: _ignore,
187 } = Arc::into_inner(value).expect("guaranteed 1 reference");
188 let command = command.to_string_lossy().to_string();
189 BenchmarkingCommand {
191 target_name,
192 subdir,
193 command,
194 arguments,
195 pre_exec_bash_code: PreExecLevel2::new(None),
196 }
197 }
198}
199
200#[derive(Debug, PartialEq, Clone, serde::Serialize, serde::Deserialize)]
201#[serde(deny_unknown_fields)]
202pub struct BenchmarkingJobPublic1 {
204 pub reason: Option<String>,
205 pub run_parameters: Arc<RunParameters>,
206 pub command: Arc<BenchmarkingCommand1>,
207}
208
209#[derive(Debug, PartialEq, Clone, serde::Serialize, serde::Deserialize)]
210#[serde(deny_unknown_fields)]
211#[serde(rename = "BenchmarkingJob")]
212pub struct BenchmarkingJob1 {
213 #[serde(flatten)]
214 pub benchmarking_job_public: BenchmarkingJobPublic1,
215 #[serde(flatten)]
216 pub benchmarking_job_state: BenchmarkingJobState,
217 priority: Priority,
218 current_boost: Priority,
219}
220
221#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
222#[serde(rename = "BenchmarkingJobParameters")]
223pub struct BenchmarkingJobParameters1 {
224 pub run_parameters: Arc<RunParameters>,
225 pub command: Arc<BenchmarkingCommand1>,
226}
227
228impl From<BenchmarkingJobParameters1> for BenchmarkingJobParameters {
229 fn from(value: BenchmarkingJobParameters1) -> Self {
230 let BenchmarkingJobParameters1 {
231 run_parameters,
232 command,
233 } = value;
234 BenchmarkingJobParameters {
235 run_parameters,
236 command: Arc::new(command.into()),
237 }
238 }
239}
240
241impl FromStrMigrating for (BenchmarkingJobParameters, Vec<SystemTime>) {
242 fn from_str_migrating(s: &str) -> Result<(Self, bool)> {
243 if let Ok(v) = serde_json::from_str::<(BenchmarkingJobParameters, Vec<SystemTime>)>(s) {
244 return Ok((v, false));
245 }
246 if let Ok((params, times)) =
247 serde_json::from_str::<(BenchmarkingJobParameters1, Vec<SystemTime>)>(s)
248 {
249 return Ok(((params.into(), times), true));
250 }
251 bail!(
252 "can't parse/migrate as {}: {s:?}",
253 type_name_short::<Self>()
254 )
255 }
256}
257
258impl FromStrMigrating for BenchmarkingJob {
259 fn from_str_migrating(s: &str) -> Result<(Self, bool)> {
260 if let Ok(v) = serde_json::from_str::<BenchmarkingJob>(s) {
261 return Ok((v, false));
262 }
263 if let Ok(v) = serde_json::from_str::<BenchmarkingJob1>(s) {
264 let BenchmarkingJob1 {
265 benchmarking_job_public,
266 benchmarking_job_state,
267 priority,
268 current_boost,
269 } = v;
270 let BenchmarkingJobPublic1 {
271 reason,
272 run_parameters,
273 command,
274 } = benchmarking_job_public;
275
276 let v = BenchmarkingJob::new(
277 BenchmarkingJobPublic {
278 reason,
279 run_parameters,
280 command: Arc::new(command.into()),
281 },
282 benchmarking_job_state,
283 priority,
284 current_boost,
285 );
286 return Ok((v, true));
287 }
288 bail!(
289 "can't parse/migrate as {}: {s:?}",
290 type_name_short::<Self>()
291 )
292 }
293}