evobench_tools/evaluator/data/
log_data.rs

1use std::{iter::FusedIterator, path::Path, sync::Mutex};
2
3use anyhow::{Context, Result, anyhow, bail};
4use kstring::KString;
5
6use crate::{
7    evaluator::data::log_message::{LogMessage, Metadata},
8    io_utils::zstd_file::decompressed_file_mmap,
9    utillib::auto_vivify::AutoVivify,
10};
11
/// Adapter around an iterator of raw log lines that keeps track of how
/// many lines and bytes have been pulled from it so far.
///
/// Its `Iterator` implementation parses every line as a JSON-encoded
/// `LogMessage`.
struct IterWithLineAndByteCount<'t, I>
where
    I: Iterator<Item = &'t [u8]>,
{
    /// Source of the lines (without their line terminators).
    lines_iter: I,
    /// Number of lines taken from `lines_iter` so far; doubles as the
    /// 1-based number of the most recently returned line.
    linenum: usize,
    /// Bytes consumed so far, counting one terminator byte per line.
    bytepos: usize,
    /// Path of the file the lines come from; used in error messages only.
    path: &'t Path,
}
18
19impl<'t, I: Iterator<Item = &'t [u8]>> IterWithLineAndByteCount<'t, I> {
20    fn new(lines_iter: I, path: &'t Path) -> Self {
21        Self {
22            lines_iter,
23            linenum: 0,
24            bytepos: 0,
25            path,
26        }
27    }
28}
29
30impl<'t, I: Iterator<Item = &'t [u8]>> Iterator for IterWithLineAndByteCount<'t, I> {
31    type Item = Result<LogMessage>;
32
33    fn next(&mut self) -> Option<Self::Item> {
34        if let Some(line) = self.lines_iter.next() {
35            self.linenum += 1;
36            self.bytepos += line.len() + 1;
37            Some(
38                serde_json::from_slice(line)
39                    .with_context(|| anyhow!("parsing file {:?}:{}", self.path, self.linenum)),
40            )
41        } else {
42            None
43        }
44    }
45}
46
47#[derive(Debug)]
48pub struct LogData {
49    pub path: Box<Path>,
50    pub messages: Box<[Box<[LogMessage]>]>,
51    pub evobench_log_version: u32,
52    pub evobench_version: KString,
53    pub metadata: Metadata,
54}
55
56impl LogData {
57    // Size of buffer for decompressed log data (for one chunk
58    // processed in parallel)
59    const CHUNK_SIZE_BYTES: usize = 20000000;
60
61    pub fn messages(&self) -> impl DoubleEndedIterator<Item = &LogMessage> + FusedIterator {
62        self.messages.iter().flatten()
63    }
64
65    /// `path` must end in `.log` or `.zstd`. Decompresses the latter
66    /// transparently. Currently not doing streaming with the parsed
67    /// results, the in-memory representation is larger than the
68    /// file.
69    // Note: you can find versions of this function reading from
70    // `decompressed_file` instead of using mmap in the Git history,
71    // in parallel and before-parallel versions.
72    pub fn read_file(path: &Path, uncompressed_path: Option<&Path>) -> Result<Self> {
73        let input = decompressed_file_mmap(path, uncompressed_path, Some("log"))?;
74
75        let mut items = IterWithLineAndByteCount::new(input.split(|b| *b == b'\n'), path);
76
77        let msg = items
78            .next()
79            .ok_or_else(|| anyhow!("missing the first message in {path:?}"))??;
80        if let LogMessage::Start {
81            evobench_log_version,
82            evobench_version,
83        } = msg
84        {
85            let msg = items
86                .next()
87                .ok_or_else(|| anyhow!("missing the second message in {path:?}"))??;
88            if let LogMessage::Metadata(metadata) = msg {
89                // Results from chunks processing as they come in
90                let results: Mutex<Vec<Option<Result<Box<[LogMessage]>>>>> = Default::default();
91                let results_ref = &results;
92
93                rayon::scope(|scope| -> Result<()> {
94                    let mut rest = &input[items.bytepos..];
95                    let mut current_chunk_index = 0;
96
97                    while !rest.is_empty() {
98                        let buf = &rest[..Self::CHUNK_SIZE_BYTES.min(rest.len())];
99                        // Find the last line break
100                        let (i, _) = buf
101                            .iter()
102                            .rev()
103                            .enumerate()
104                            .find(|(_, b)| **b == b'\n')
105                            .ok_or_else(|| {
106                                anyhow!(
107                                    "missing a line break in chunk {current_chunk_index} (size {}) \
108                                     in file {path:?}",
109                                    buf.len()
110                                )
111                            })?;
112                        let cutoff = buf.len() - i;
113                        let buf = &buf[0..cutoff];
114                        rest = &rest[cutoff..];
115
116                        scope.spawn({
117                            let chunk_index = current_chunk_index;
118                            move |_scope| {
119                                let r = (|| -> Result<Box<[LogMessage]>> {
120                                    let mut items = IterWithLineAndByteCount::new(
121                                        buf.trim_ascii_end().split(|b| *b == b'\n'),
122                                        path,
123                                    );
124
125                                    let mut messages = Vec::new();
126                                    while let Some(msg) = items.next() {
127                                        messages.push(msg?);
128                                    }
129                                    Ok(messages.into())
130                                })();
131                                let mut results = results_ref.lock().expect("no panics");
132                                _ = results.auto_get_mut(chunk_index, || None).insert(r);
133                            }
134                        });
135                        current_chunk_index += 1;
136                    }
137                    Ok(())
138                })?;
139
140                let messages: Vec<Box<[LogMessage]>> = results
141                    .into_inner()?
142                    .into_iter()
143                    .enumerate()
144                    .map(|(i, o)| {
145                        if let Some(r) = o {
146                            r.with_context(|| anyhow!("chunk {i}"))
147                        } else {
148                            bail!("chunk {i} has not reported a result, did it panic?")
149                        }
150                    })
151                    .collect::<Result<_>>()?;
152
153                let last = (&messages)
154                    .last()
155                    .ok_or_else(|| anyhow!("log file {path:?} contains no data, and misses TEnd"))?
156                    .last()
157                    .ok_or_else(|| {
158                        anyhow!(
159                            "processing log file {path:?}: missing a value in last chunk result"
160                        )
161                    })?;
162
163                if let LogMessage::TEnd(_) = last {
164                    // OK
165                } else {
166                    bail!("log file {path:?} does not end with TEnd, it was cut off")
167                }
168
169                Ok(LogData {
170                    path: path.into(),
171                    messages: messages.into(),
172                    evobench_log_version,
173                    evobench_version,
174                    metadata,
175                })
176            } else {
177                bail!("second message is not a `Metadata` message: {msg:?}")
178            }
179        } else {
180            bail!("first message is not a `Start` message: {msg:?}")
181        }
182    }
183}