evobench_tools/evaluator/data/
log_data.rs1use std::{iter::FusedIterator, path::Path, sync::Mutex};
2
3use anyhow::{Context, Result, anyhow, bail};
4use kstring::KString;
5
6use crate::{
7 evaluator::data::log_message::{LogMessage, Metadata},
8 io_utils::zstd_file::decompressed_file_mmap,
9 utillib::auto_vivify::AutoVivify,
10};
11
/// Iterator adapter that turns raw input lines into parsed `LogMessage`s
/// while keeping track of how far into the input it has advanced.
struct IterWithLineAndByteCount<'t, I>
where
    I: Iterator<Item = &'t [u8]>,
{
    /// Underlying iterator yielding one raw line (without its separator)
    /// per step.
    lines_iter: I,
    /// Number of lines taken from `lines_iter` so far; used as the
    /// 1-based line number in parse error messages.
    linenum: usize,
    /// Running total of `line.len() + 1` over all lines taken so far,
    /// i.e. the offset just past the separator of the last line taken.
    bytepos: usize,
    /// Path of the file being parsed; only used for error messages.
    path: &'t Path,
}
18
19impl<'t, I: Iterator<Item = &'t [u8]>> IterWithLineAndByteCount<'t, I> {
20 fn new(lines_iter: I, path: &'t Path) -> Self {
21 Self {
22 lines_iter,
23 linenum: 0,
24 bytepos: 0,
25 path,
26 }
27 }
28}
29
30impl<'t, I: Iterator<Item = &'t [u8]>> Iterator for IterWithLineAndByteCount<'t, I> {
31 type Item = Result<LogMessage>;
32
33 fn next(&mut self) -> Option<Self::Item> {
34 if let Some(line) = self.lines_iter.next() {
35 self.linenum += 1;
36 self.bytepos += line.len() + 1;
37 Some(
38 serde_json::from_slice(line)
39 .with_context(|| anyhow!("parsing file {:?}:{}", self.path, self.linenum)),
40 )
41 } else {
42 None
43 }
44 }
45}
46
47#[derive(Debug)]
48pub struct LogData {
49 pub path: Box<Path>,
50 pub messages: Box<[Box<[LogMessage]>]>,
51 pub evobench_log_version: u32,
52 pub evobench_version: KString,
53 pub metadata: Metadata,
54}
55
56impl LogData {
57 const CHUNK_SIZE_BYTES: usize = 20000000;
60
61 pub fn messages(&self) -> impl DoubleEndedIterator<Item = &LogMessage> + FusedIterator {
62 self.messages.iter().flatten()
63 }
64
65 pub fn read_file(path: &Path, uncompressed_path: Option<&Path>) -> Result<Self> {
73 let input = decompressed_file_mmap(path, uncompressed_path, Some("log"))?;
74
75 let mut items = IterWithLineAndByteCount::new(input.split(|b| *b == b'\n'), path);
76
77 let msg = items
78 .next()
79 .ok_or_else(|| anyhow!("missing the first message in {path:?}"))??;
80 if let LogMessage::Start {
81 evobench_log_version,
82 evobench_version,
83 } = msg
84 {
85 let msg = items
86 .next()
87 .ok_or_else(|| anyhow!("missing the second message in {path:?}"))??;
88 if let LogMessage::Metadata(metadata) = msg {
89 let results: Mutex<Vec<Option<Result<Box<[LogMessage]>>>>> = Default::default();
91 let results_ref = &results;
92
93 rayon::scope(|scope| -> Result<()> {
94 let mut rest = &input[items.bytepos..];
95 let mut current_chunk_index = 0;
96
97 while !rest.is_empty() {
98 let buf = &rest[..Self::CHUNK_SIZE_BYTES.min(rest.len())];
99 let (i, _) = buf
101 .iter()
102 .rev()
103 .enumerate()
104 .find(|(_, b)| **b == b'\n')
105 .ok_or_else(|| {
106 anyhow!(
107 "missing a line break in chunk {current_chunk_index} (size {}) \
108 in file {path:?}",
109 buf.len()
110 )
111 })?;
112 let cutoff = buf.len() - i;
113 let buf = &buf[0..cutoff];
114 rest = &rest[cutoff..];
115
116 scope.spawn({
117 let chunk_index = current_chunk_index;
118 move |_scope| {
119 let r = (|| -> Result<Box<[LogMessage]>> {
120 let mut items = IterWithLineAndByteCount::new(
121 buf.trim_ascii_end().split(|b| *b == b'\n'),
122 path,
123 );
124
125 let mut messages = Vec::new();
126 while let Some(msg) = items.next() {
127 messages.push(msg?);
128 }
129 Ok(messages.into())
130 })();
131 let mut results = results_ref.lock().expect("no panics");
132 _ = results.auto_get_mut(chunk_index, || None).insert(r);
133 }
134 });
135 current_chunk_index += 1;
136 }
137 Ok(())
138 })?;
139
140 let messages: Vec<Box<[LogMessage]>> = results
141 .into_inner()?
142 .into_iter()
143 .enumerate()
144 .map(|(i, o)| {
145 if let Some(r) = o {
146 r.with_context(|| anyhow!("chunk {i}"))
147 } else {
148 bail!("chunk {i} has not reported a result, did it panic?")
149 }
150 })
151 .collect::<Result<_>>()?;
152
153 let last = (&messages)
154 .last()
155 .ok_or_else(|| anyhow!("log file {path:?} contains no data, and misses TEnd"))?
156 .last()
157 .ok_or_else(|| {
158 anyhow!(
159 "processing log file {path:?}: missing a value in last chunk result"
160 )
161 })?;
162
163 if let LogMessage::TEnd(_) = last {
164 } else {
166 bail!("log file {path:?} does not end with TEnd, it was cut off")
167 }
168
169 Ok(LogData {
170 path: path.into(),
171 messages: messages.into(),
172 evobench_log_version,
173 evobench_version,
174 metadata,
175 })
176 } else {
177 bail!("second message is not a `Metadata` message: {msg:?}")
178 }
179 } else {
180 bail!("first message is not a `Start` message: {msg:?}")
181 }
182 }
183}