evobench_tools/evaluator/data/
log_data_tree.rs

1//! Recreated call tree representation for scoped probes
2//!
3//! `LogDataTree::from_logdata` both pairs the start and end timings
4//! and builds up a tree of all spans. Also indexes spans by probe name.
5//!
//! `Timing` and contextual info remain in the parsed log file
//! (sequence of `LogMessage`, now via `Box<[Box<[LogMessage]>]>`);
//! the index only holds references into those.
9
10use std::{
11    collections::{BTreeMap, HashMap, btree_map, hash_map::Entry},
12    fmt::{Display, Write},
13    marker::PhantomData,
14    num::NonZeroU32,
15    ops::Deref,
16};
17
18use anyhow::{Result, anyhow, bail};
19
20use crate::{
21    debug,
22    evaluator::data::{
23        log_data::LogData,
24        log_message::{DataMessage, KeyValue, PointKind, ThreadId, Timing},
25    },
26    utillib::micro_vec::MicroVec,
27};
28
/// Call tree recreated from a parsed log: owns all spans (process,
/// thread, scope and key/value spans) plus an index of closed scope
/// spans by probe name, while the timings themselves stay in the
/// borrowed `LogData`.
#[derive(Debug)]
pub struct LogDataTree<'t> {
    /// The parsed log that all spans reference into.
    log_data: &'t LogData,
    /// Storage for all spans; a `SpanId` is the 1-based position in
    /// this vector.
    spans: Vec<Span<'t>>,
    /// For a probe name, all the (scope) spans in sequence as their
    /// *end* messages occur in the log file (which isn't necessarily
    /// by time when multiple threads are running), regardless of
    /// thread and their `parent` inside the thread. Spans whose end
    /// message never appeared are not listed here.
    spans_by_pn: HashMap<&'t str, Vec<SpanId<'t>>>,
}
39
// Defines a typed index id `$TId` (e.g. `SpanId<'t>`) into the vector
// field `$db_field` of `LogDataTree`, together with `$add_method` on
// `LogDataTree`, which appends a value and returns its id.
//
// Ids are stored 1-based in a `NonZeroU32`, so `Option<$TId>` costs no
// extra space.
macro_rules! def_log_data_index_id {
    { {$TId:tt $($TIdLifetime:tt)*} | $T:tt | $db_field:tt | $add_method:tt } => {
        /// Typed id referring to one entry of a `LogDataTree` vector.
        #[derive(Debug, Clone, Copy)]
        pub struct $TId $($TIdLifetime)* {
            // Ties the id to the element type it indexes without owning
            // one; the `fn() -> T` form keeps the id covariant and
            // independent of `T`'s auto traits.
            t: PhantomData<fn() -> $T $($TIdLifetime)* >,
            id: NonZeroU32,
        }

        impl $($TIdLifetime)* $TId $($TIdLifetime)* {
            // `index` is 1-based (the vector length right after the
            // push). Panics for 0 or values beyond `u32`.
            fn new(index: usize) -> Self {
                let id: u32 = index.try_into().expect("index not outside u32 range");
                let id: NonZeroU32 = id.try_into().expect("index 1-based");
                Self { id, t: PhantomData::default() }
            }

            // We use len after insertion as the id, so that id 0 is
            // never used, so that Option is cheap.
            fn index(self) -> usize { usize::try_from(u32::from(self.id)).unwrap() - 1 }

            /// Shared access to the entry this id refers to. Panics if
            /// the id is out of range for `db`.
            pub fn get_from_db<'d>(self, db: &'d LogDataTree<'t>) -> &'d $T$($TIdLifetime)*
                // XX are these even required or helpful?:
                // NOTE(review): probably redundant, since `&'d
                // LogDataTree<'t>` already implies `'t: 'd`.
                where 't: 'd
            {
                &db.$db_field[self.index()]
            }

            /// Mutable access to the entry this id refers to. Panics if
            /// the id is out of range for `db`.
            pub fn get_mut_from_db<'d>(self, db: &'d mut LogDataTree<'t>) -> &'d mut $T$($TIdLifetime)*
                where 't: 'd
            {
                &mut db.$db_field[self.index()]
            }
        }

        impl<'t> LogDataTree<'t> {
            /// Appends `value` and returns the id referring to it.
            pub fn $add_method(&mut self, value: $T $($TIdLifetime)*) -> $TId $($TIdLifetime)* {
                self.$db_field.push(value);
                $TId::new(self.$db_field.len())
            }
        }
    }
}
81
// `SpanId<'t>` indexes `LogDataTree::spans`; `LogDataTree::add_span`
// appends a `Span` and returns its `SpanId`.
def_log_data_index_id! {{SpanId<'t>} | Span | spans | add_span }
83
/// Which pair of start/end log messages a `SpanData::Scope` was
/// created from.
///
/// `Eq` and `Hash` are derived (besides `PartialEq`) so that the kind
/// can be used as a key in maps and sets.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ScopeKind {
    /// The whole process (`PointKind::TStart` .. `PointKind::TEnd`)
    Process,
    /// A thread (`PointKind::TThreadStart` .. `PointKind::TThreadEnd`)
    Thread,
    /// An `EVOBENCH_SCOPE` (`PointKind::TS` .. `PointKind::TE`)
    Scope,
}
90
/// How a point log message should be handled
enum PointDispatch {
    /// Opens (`is_ending: false`) or closes (`is_ending: true`) a span
    /// of the given kind.
    Scope { kind: ScopeKind, is_ending: bool },
    /// A `PointKind::T` message (currently ignored by `from_logdata`).
    T,
    /// A `PointKind::TIO` message (currently ignored by `from_logdata`).
    TIO,
}
97
98impl PointDispatch {
99    pub fn from_point_kind(kind: PointKind) -> Self {
100        use ScopeKind::*;
101        match kind {
102            PointKind::TStart => PointDispatch::Scope {
103                kind: Process,
104                is_ending: false,
105            },
106            PointKind::T => PointDispatch::T,
107            PointKind::TS => PointDispatch::Scope {
108                kind: Scope,
109                is_ending: false,
110            },
111            PointKind::TE => PointDispatch::Scope {
112                kind: Scope,
113                is_ending: true,
114            },
115            PointKind::TThreadStart => PointDispatch::Scope {
116                kind: Thread,
117                is_ending: false,
118            },
119            PointKind::TThreadEnd => PointDispatch::Scope {
120                kind: Thread,
121                is_ending: true,
122            },
123            PointKind::TEnd => PointDispatch::Scope {
124                kind: Process,
125                is_ending: true,
126            },
127            PointKind::TIO => PointDispatch::TIO,
128        }
129    }
130}
131
/// Payload of a `Span`: either a scope delimited by start/end messages,
/// or a key/value annotation.
#[derive(Debug, Clone, Copy)]
pub enum SpanData<'t> {
    /// Process and thread creation and destruction, as well as
    /// `EVOBENCH_SCOPE`, ended by a message from the destructor
    Scope {
        kind: ScopeKind,
        /// The internally-allocated thread number, 0-based
        thread_number: ThreadNumber,
        /// Timing of the message that opened the span
        start: &'t Timing,
        /// Option just because we allocate the Scope before we get the
        /// closing Timing, as we need it as parent for inner scopes. All
        /// `end` fields should be set by the time `from_logdata`
        /// finishes.
        /// NOTE: `from_logdata` does not verify this; a span whose end
        /// message is missing (e.g. truncated log) keeps `None`, and
        /// `Span::start_and_end` panics on it.
        end: Option<&'t Timing>,
    },
    /// `EVOBENCH_KEY_VALUE`, scoped from where it is issued to the next
    /// end of an `EVOBENCH_SCOPE` on the same thread
    KeyValue(&'t KeyValue),
}
151
/// A node in the recreated call tree.
#[derive(Debug)]
pub struct Span<'t> {
    /// The most recently started span on the same thread that was still
    /// open when this one was created (key/value spans stay open until
    /// the enclosing scope ends); `None` if there was none.
    pub parent: Option<SpanId<'t>>,
    /// Child spans, as registered by `LogDataTree::from_logdata`.
    pub children: MicroVec<SpanId<'t>>,
    /// What kind of span this is, and its timings or key/value.
    pub data: SpanData<'t>,
}
158
/// Options controlling how `Span::path_string` renders the path from the
/// root of the tree to a span.
///
/// `Debug` and `Clone` are derived so that option sets can be logged and
/// duplicated (all fields are plain `&'static str`s and `bool`s).
#[derive(Debug, Clone)]
pub struct PathStringOptions {
    /// Separator placed between a parent and its child in normal
    /// (root-first) order
    pub normal_separator: &'static str,
    /// Separator placed between a child and its parent in reversed
    /// (leaf-first) order
    pub reverse_separator: &'static str,
    /// Stop when reaching a `ScopeKind::Process`, showing the
    /// placeholder "main thread" instead
    pub ignore_process: bool,
    /// Skip showing the process completely (`ignore_process` just
    /// stops before it and shows a placeholder)
    pub skip_process: bool,
    /// Stop when reaching a `ScopeKind::Thread`, showing a placeholder
    pub ignore_thread: bool,
    /// Add thread number (0..) in path strings
    pub include_thread_number_in_path: bool,
    /// Whether to show the top of the tree on the left (default) or the
    /// leaves on the left (reversed)
    pub reversed: bool,
    /// A prefix to distinguish this kind of path from others (feel
    /// free to use ""). Only emitted when `ignore_process` or
    /// `ignore_thread` stops the path at a process or thread span!
    pub prefix: &'static str,
}
178
179impl<'t> Span<'t> {
180    /// Also returns the `ScopeKind`, since you want to verify that at
181    /// the same time as mutating the `end` field.
182    pub fn end_mut(&mut self) -> Option<(&mut Option<&'t Timing>, ScopeKind)> {
183        match &mut self.data {
184            SpanData::Scope {
185                kind,
186                start: _,
187                thread_number: _,
188                end,
189            } => Some((end, *kind)),
190            SpanData::KeyValue(_) => None,
191        }
192    }
193
194    /// Checks that the `pn` on the start and end timings
195    /// match. Panics if they don't.
196    pub fn assert_consistency(&self) {
197        match &self.data {
198            SpanData::Scope {
199                kind: _,
200                start,
201                thread_number: _,
202                end,
203            } => {
204                assert_eq!(start.pn, end.unwrap().pn)
205            }
206            SpanData::KeyValue(_) => todo!(),
207        }
208    }
209
210    pub fn pn(&self) -> Option<&'t str> {
211        match &self.data {
212            SpanData::Scope {
213                kind: _,
214                start,
215                thread_number: _,
216                end: _,
217            } => Some(&start.pn),
218            SpanData::KeyValue(_) => None,
219        }
220    }
221
222    /// Show the path to a node in the tree (towards the right, show
223    /// the child node; can also be in reverse (via opts): towards the
224    /// right, show the parents up the tree). `out_prefix` receives
225    /// the prefix (always meant to be shown on the left), `out_main`
226    /// receives the main part of the path (in reversed or normal
227    /// form). The outputs are *not* cleared by this method! The idea
228    /// is to `out_prefix.push_str(&out_main)` after this call, then
229    /// clear both buffers before re-using them.
230    pub fn path_string(
231        &self,
232        opts: &PathStringOptions,
233        db: &LogDataTree<'t>,
234        out_prefix: &mut String,
235        out_main: &mut String,
236    ) {
237        //
238        let PathStringOptions {
239            ignore_process,
240            skip_process,
241            ignore_thread,
242            include_thread_number_in_path,
243            reversed,
244            prefix,
245            normal_separator,
246            reverse_separator,
247        } = opts;
248        // Stop recursion via opts?--XX how useful is this even, have
249        // display below, too ("P:" etc.).
250        match &self.data {
251            SpanData::Scope {
252                kind,
253                thread_number,
254                start: _,
255                end: _,
256            } => match kind {
257                ScopeKind::Process => {
258                    if *skip_process {
259                        return;
260                    }
261                    if *ignore_process {
262                        // Show this as "main thread", not "process",
263                        // because Timing currently still contains
264                        // `RUSAGE_THREAD` data in this context, too!
265                        // And there is no thread start message for
266                        // that thread, too, so data would be missing
267                        // if not using that as main thread data.
268                        out_prefix.push_str(prefix);
269                        out_main.push_str("main thread");
270                        return;
271                    }
272                }
273                ScopeKind::Thread => {
274                    if *ignore_thread {
275                        out_prefix.push_str(prefix);
276                        if *include_thread_number_in_path {
277                            out_main
278                                .write_fmt(format_args!("{thread_number}"))
279                                .expect("string writes don't fail");
280                        } else {
281                            out_main.push_str("thread");
282                        };
283                        return;
284                    }
285                }
286                ScopeKind::Scope => (),
287            },
288            SpanData::KeyValue(_) => (),
289        }
290
291        let push_self = |out_prefix: &mut String, out_main: &mut String| {
292            match &self.data {
293                SpanData::Scope {
294                    kind,
295                    thread_number,
296                    start,
297                    end: _,
298                } => {
299                    match kind {
300                        ScopeKind::Process => {
301                            out_prefix.push_str("P:");
302                        }
303                        ScopeKind::Thread => {
304                            // Push to out_prefix ? But, we're not at the
305                            // end, so no--XX or what options do we have?
306                            out_main.push_str("T:");
307                            if *include_thread_number_in_path {
308                                out_main.push_str(&thread_number.to_string());
309                            }
310                        }
311                        ScopeKind::Scope => (),
312                    }
313                    let pn = &start.pn;
314                    out_main.push_str(pn);
315                }
316                SpanData::KeyValue(KeyValue { tid: _, k, v }) => {
317                    out_main.push_str(k);
318                    out_main.push_str("=");
319                    out_main.push_str(v);
320                }
321            }
322        };
323
324        if let Some(parent_id) = self.parent {
325            let parent = parent_id.get_from_db(db);
326            if *reversed {
327                push_self(out_prefix, out_main);
328                out_main.push_str(reverse_separator);
329                parent.path_string(opts, db, out_prefix, out_main);
330            } else {
331                parent.path_string(opts, db, out_prefix, out_main);
332                out_main.push_str(normal_separator);
333                push_self(out_prefix, out_main);
334            }
335        } else {
336            push_self(out_prefix, out_main);
337        }
338    }
339
340    #[inline]
341    pub fn start_and_end(&self) -> Option<(&'t Timing, &'t Timing)> {
342        match &self.data {
343            SpanData::Scope {
344                kind: _,
345                thread_number: _,
346                start,
347                end,
348            } => Some((*start, end.expect("properly balanced spans"))),
349            SpanData::KeyValue(_) => None,
350        }
351    }
352}
353
/// Internally-allocated, 0-based thread number (handed out by
/// `ThreadIdMapper` in order of first appearance).
///
/// `Eq` and `Hash` are derived (besides `PartialEq`) so that thread
/// numbers can be used as map or set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ThreadNumber(u32);
356
357impl Deref for ThreadNumber {
358    type Target = u32;
359
360    fn deref(&self) -> &Self::Target {
361        &self.0
362    }
363}
364
365impl Display for ThreadNumber {
366    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
367        f.write_fmt(format_args!("thread{:02}", self.0))
368    }
369}
370
/// Map from thread id to internally-allocated thread_number, both for
/// correctness (in case a tid is re-used), as well as for more
/// consistent output (try to have the same numbers across benchmark
/// runs, although this depends on the order of thread creation
/// (including their initialization messages) remaining the
/// same).
struct ThreadIdMapper {
    // The number that will be handed out to the next unmapped
    // `ThreadId`; starts at 0 and only ever increases, so a number is
    // never handed out twice by the same mapper.
    current_thread_number: u32,
    // Mappings are removed here when a thread ends! To enforce a
    // new mapping when the same ThreadId shows up again.
    thread_number_by_thread_id: HashMap<ThreadId, ThreadNumber>,
}
383
384impl ThreadIdMapper {
385    fn new() -> Self {
386        Self {
387            current_thread_number: 0,
388            thread_number_by_thread_id: HashMap::new(),
389        }
390    }
391
392    /// Automatically inserts a mapping if there is none yet
393    fn to_thread_number(&mut self, thread_id: ThreadId) -> ThreadNumber {
394        match self.thread_number_by_thread_id.entry(thread_id) {
395            Entry::Occupied(occupied_entry) => *occupied_entry.get(),
396            Entry::Vacant(vacant_entry) => {
397                let thread_number = ThreadNumber(self.current_thread_number);
398                self.current_thread_number += 1;
399                vacant_entry.insert(thread_number);
400                thread_number
401            }
402        }
403    }
404
405    /// NOTE: mappings are to be removed during parsing when a thread
406    /// ends, so that when the same ThreadId is re-used, it gets a new
407    /// mapping
408    fn remove_thread_id(&mut self, thread_id: ThreadId) -> Option<ThreadNumber> {
409        self.thread_number_by_thread_id.remove(&thread_id)
410    }
411}
412
413impl<'t> LogDataTree<'t> {
414    /// For dev use: get info on the size of the leaf `Vec`s of
415    /// `spans_by_pn`
416    pub fn spans_by_pn_stats(&self) -> BTreeMap<usize, usize> {
417        let mut map: BTreeMap<usize, usize> = Default::default();
418        for vec in self.spans_by_pn.values() {
419            let len = vec.len();
420            match map.entry(len) {
421                btree_map::Entry::Vacant(vacant_entry) => {
422                    vacant_entry.insert(1);
423                }
424                btree_map::Entry::Occupied(occupied_entry) => {
425                    *occupied_entry.into_mut() += 1;
426                }
427            }
428        }
429        map
430    }
431
432    pub fn spans_children_stats(&self) -> BTreeMap<usize, usize> {
433        let mut map: BTreeMap<usize, usize> = Default::default();
434        for span in &self.spans {
435            let len = span.children.len();
436            match map.entry(len) {
437                btree_map::Entry::Vacant(vacant_entry) => {
438                    vacant_entry.insert(1);
439                }
440                btree_map::Entry::Occupied(occupied_entry) => {
441                    *occupied_entry.into_mut() += 1;
442                }
443            }
444        }
445        map
446    }
447
448    pub fn from_logdata(log_data: &'t LogData) -> Result<Self> {
449        let mut slf = Self {
450            log_data,
451            spans: Default::default(),
452            spans_by_pn: Default::default(),
453        };
454
455        let mut thread_id_mapper = ThreadIdMapper::new();
456        let mut start_by_thread: HashMap<ThreadId, Vec<SpanId<'t>>> = HashMap::new();
457
458        for message in log_data.messages() {
459            match message.data_message() {
460                DataMessage::KeyValue(kv) => {
461                    // Make it a Span
462                    let mut span_with_parent = |parent| -> SpanId<'t> {
463                        slf.add_span(Span {
464                            data: SpanData::KeyValue(kv),
465                            parent,
466                            children: Default::default(),
467                        })
468                    };
469                    match start_by_thread.entry(kv.tid) {
470                        Entry::Occupied(mut e) => {
471                            let opt_parent_id: Option<SpanId<'t>> = e.get().last().copied();
472                            let span_id = span_with_parent(opt_parent_id);
473                            if let Some(parent_id) = opt_parent_id {
474                                // Add us, span_id, to the parent's child list.
475                                let parent = parent_id.get_mut_from_db(&mut slf);
476                                parent.children.push(span_id);
477                            }
478                            e.get_mut().push(span_id);
479                        }
480                        Entry::Vacant(_e) => {
481                            bail!(
482                                "KeyValue must be below some span (but creating a thread counts, too)"
483                            )
484                        }
485                    }
486                }
487                DataMessage::Timing(kind, timing) => {
488                    match PointDispatch::from_point_kind(kind) {
489                        // Process / thread / scope start
490                        PointDispatch::Scope {
491                            kind,
492                            is_ending: false,
493                        } => {
494                            let mut scope_with_parent = |parent| -> SpanId<'t> {
495                                let thread_number = thread_id_mapper.to_thread_number(timing.tid);
496                                slf.add_span(Span {
497                                    data: SpanData::Scope {
498                                        kind,
499                                        thread_number,
500                                        start: timing,
501                                        end: None,
502                                    },
503                                    parent,
504                                    children: Default::default(),
505                                })
506                            };
507                            match start_by_thread.entry(timing.tid) {
508                                Entry::Occupied(mut e) => {
509                                    let parent: Option<SpanId<'t>> = e.get().last().copied();
510                                    e.get_mut().push(scope_with_parent(parent));
511                                }
512                                Entry::Vacant(e) => {
513                                    e.insert(vec![scope_with_parent(None)]);
514                                }
515                            }
516                        }
517
518                        // Process / thread / scope end
519                        PointDispatch::Scope {
520                            kind,
521                            is_ending: true,
522                        } => match start_by_thread.entry(timing.tid) {
523                            Entry::Occupied(mut e) => loop {
524                                let span_id = e.get_mut().pop().ok_or_else(|| {
525                                    anyhow!("missing messages incl. TS before TE for thread")
526                                })?;
527                                let span = span_id.get_mut_from_db(&mut slf);
528
529                                if let Some((end, opening_scope_kind)) = span.end_mut() {
530                                    if opening_scope_kind != kind {
531                                        // XX line location report
532                                        bail!(
533                                            "expected closing of scope kind \
534                                             {opening_scope_kind:?}, \
535                                             but got {kind:?} ({span:?} vs. message \
536                                             {message:?})"
537                                        )
538                                    }
539
540                                    *end = Some(timing);
541                                    span.assert_consistency();
542
543                                    let pn = span.pn().expect("scopes have a pn");
544                                    match slf.spans_by_pn.entry(pn) {
545                                        Entry::Occupied(mut e) => {
546                                            e.get_mut().push(span_id);
547                                        }
548                                        Entry::Vacant(e) => {
549                                            e.insert(vec![span_id]);
550                                        }
551                                    }
552
553                                    if kind == ScopeKind::Thread {
554                                        thread_id_mapper.remove_thread_id(timing.tid);
555                                    }
556
557                                    break;
558                                }
559                                // else: it was no Scope, go on pop
560                                // the next frame in the next loop
561                                // iteration.
562                            },
563                            Entry::Vacant(_e) => {
564                                // XX line location report
565                                bail!("should never happen as TS comes before TE")
566                            }
567                        },
568
569                        PointDispatch::T => (),   // XX
570                        PointDispatch::TIO => (), // XX
571                    }
572                }
573            }
574        }
575
576        debug!("spans_by_pn_stats={:?}", slf.spans_by_pn_stats());
577        debug!("spans_children_stats={:?}", slf.spans_children_stats());
578
579        Ok(slf)
580    }
581
582    pub fn log_data(&self) -> &'t LogData {
583        self.log_data
584    }
585
586    pub fn probe_names(&self) -> Vec<&'t str> {
587        let mut probe_names: Vec<&'t str> = self.spans_by_pn.keys().copied().collect();
588        probe_names.sort();
589        probe_names
590    }
591
592    pub fn spans(&self) -> &[Span<'t>] {
593        &self.spans
594    }
595
596    pub fn span_ids(&self) -> impl Iterator<Item = SpanId<'t>> {
597        (1..=self.spans.len()).map(SpanId::new)
598    }
599
600    pub fn spans_by_pn<'s>(&'s self, pn: &str) -> Option<&'s [SpanId<'t>]> {
601        self.spans_by_pn.get(pn).map(AsRef::as_ref)
602    }
603}