evobench_tools/io_utils/
output_capture_log.rs1use std::{
10 borrow::Cow,
11 ffi::OsStr,
12 fs::File,
13 io::{BufRead, BufReader, Read, Seek, SeekFrom, Write},
14 path::{Path, PathBuf},
15 process::{Command, ExitStatus, Stdio},
16 sync::{Arc, Mutex, atomic::Ordering},
17 thread::{Scope, ScopedJoinHandle},
18};
19
20use anyhow::{Result, anyhow};
21
22use crate::{
23 ctx,
24 serde_types::date_and_time::{DateTimeWithOffset, LOCAL_TIME},
25};
26
27use super::bash::bash_string_from_cmd;
28
/// Returns the program name of `cmd` followed by all of its arguments.
///
/// Each element is converted with `to_string_lossy`, so entries that are
/// valid UTF-8 are borrowed from `cmd` and only others are allocated.
pub fn get_cmd_and_args(cmd: &Command) -> Vec<Cow<'_, str>> {
    let program = cmd.get_program().to_string_lossy();
    let arguments = cmd.get_args().map(|arg: &OsStr| arg.to_string_lossy());
    // The program name always comes first, then the arguments in order.
    std::iter::once(program).chain(arguments).collect()
}
40
41pub fn get_cmd_and_args_as_bash_string(cmd: &Command) -> String {
42 bash_string_from_cmd(get_cmd_and_args(cmd))
43}
44
45pub fn new_proxy_thread<'scope, 'file, 'm, F: Read + Send + 'static>(
46 scope: &'scope Scope<'scope, 'file>,
47 child_output: F,
48 main_file: Arc<Mutex<File>>,
49 other_files: Arc<Mutex<Vec<Box<dyn Write + 'static + Send>>>>,
50 source_indicator: Option<&'file str>,
51 add_timestamp: bool,
52) -> Result<ScopedJoinHandle<'scope, Result<()>>>
53where
54 'file: 'scope,
55 'file: 'm,
56 'm: 'scope,
57{
58 let mut child_output = BufReader::new(child_output);
59 std::thread::Builder::new()
60 .name("output proxy".into())
61 .spawn_scoped(scope, move || -> Result<()> {
62 let local_time = Some(LOCAL_TIME.load(Ordering::Relaxed));
63 let mut input_line = String::new();
67 let mut line = String::new();
68 while child_output.read_line(&mut input_line)? > 0 {
69 {
70 line.clear();
71 if add_timestamp {
72 line.push_str(&DateTimeWithOffset::now(local_time).into_string());
73 line.push_str("\t");
74 }
75 if let Some(source_indicator) = source_indicator.as_ref() {
76 line.push_str(source_indicator);
77 line.push_str("\t");
78 }
79 line.push_str(&input_line);
80 input_line.clear();
81 }
82 if !line.ends_with("\n") {
83 line.push_str("\n");
84 }
85 let mut output = main_file.lock().expect("no panics in proxy threads");
86 output.write_all(line.as_bytes())?;
87 {
88 let mut outputs = other_files.lock().expect("no panics in proxy threads");
89 for output in outputs.iter_mut() {
90 output.write_all(line.as_bytes())?;
91 }
92 }
93 }
94 Ok(())
95 })
96 .map_err(move |e| anyhow!("{e:#}"))
97}
98
/// Formatting options for the lines written by
/// `OutputCaptureLog::run_with_capture`.
#[derive(Debug, Clone)]
pub struct CaptureOptions {
    /// Prefix each captured line with `O` (stdout) or `E` (stderr) and a
    /// tab.
    pub add_source_indicator: bool,
    /// Prefix each captured line with a timestamp and a tab.
    pub add_timestamp: bool,
}
104
/// A log file that receives the captured stdout/stderr of child processes
/// as well as directly written strings.
#[derive(Debug)]
pub struct OutputCaptureLog {
    /// Location of the log file; used to re-open it for reading and in
    /// error messages.
    path: PathBuf,
    /// Write handle for the log file, shared with the output proxy
    /// threads.
    file: Arc<Mutex<File>>,
}
110
111impl OutputCaptureLog {
112 pub fn create(path: &Path) -> Result<Self> {
113 let file =
114 File::create(path).map_err(ctx!("opening OutputCaptureLog {path:?} for writing"))?;
115 let path = path.to_owned();
116
117 Ok(Self {
118 path,
119 file: Arc::new(Mutex::new(file)),
120 })
121 }
122
123 pub fn last_part(&self, len: u16) -> Result<String> {
126 let mut v = Vec::new();
127 let have_all;
128 {
129 let mut file =
135 File::open(&self.path).map_err(ctx!("re-opening {:?} for reading", self.path))?;
136 let meta = file.metadata().map_err(ctx!("metadata"))?;
139 let existing_len = meta.len();
140 let offset = if let Some(offset) = existing_len.checked_sub(u64::from(len)) {
141 have_all = false;
142 offset
143 } else {
144 have_all = true;
145 0
146 };
147 file.seek(SeekFrom::Start(offset)).map_err(ctx!("seek"))?;
148 file.read_to_end(&mut v)
149 .map_err(ctx!("reading {:?}", self.path))?;
150 }
151 let s = String::from_utf8_lossy(&v);
152 if have_all {
153 Ok(s.into())
154 } else {
155 Ok(format!("...\n{s}"))
156 }
157 }
158
159 pub fn write_str(&self, s: &str) -> Result<()> {
163 self.file
164 .lock()
165 .expect("no panics")
166 .write_all(s.as_bytes())
167 .map_err(ctx!("writing to {:?}", self.path))
168 }
169
170 pub fn run_with_capture<'a: 'file, 'file>(
173 &self,
174 mut cmd: Command,
175 other_files: Arc<Mutex<Vec<Box<dyn Write + Send + 'static>>>>,
176 opts: CaptureOptions,
177 ) -> Result<ExitStatus> {
178 let CaptureOptions {
179 add_source_indicator,
180 add_timestamp,
181 } = opts;
182
183 let mut child = cmd
184 .stdout(Stdio::piped())
185 .stderr(Stdio::piped())
186 .spawn()
187 .map_err(ctx!("running {}", get_cmd_and_args_as_bash_string(&cmd)))?;
188
189 std::thread::scope(move |scope| -> Result<ExitStatus> {
190 let stdout_thread = new_proxy_thread(
191 scope,
192 child.stdout.take().expect("configured above"),
193 self.file.clone(),
194 other_files.clone(),
195 if add_source_indicator {
196 Some("O")
197 } else {
198 None
199 },
200 add_timestamp,
201 )?;
202 let stderr_thread = new_proxy_thread(
203 scope,
204 child.stderr.take().expect("configured above"),
205 self.file.clone(),
206 other_files.clone(),
207 if add_source_indicator {
208 Some("E")
209 } else {
210 None
211 },
212 add_timestamp,
213 )?;
214
215 let status = child.wait()?;
216
217 stdout_thread
218 .join()
219 .map_err(|e| anyhow!("stdout proxy thread panicked: {e:?}"))?
220 .map_err(ctx!("stdout proxy thread"))?;
221 stderr_thread
222 .join()
223 .map_err(|e| anyhow!("stderr proxy thread panicked: {e:?}"))?
224 .map_err(ctx!("stderr proxy thread"))?;
225
226 Ok(status)
227 })
228 }
229
230 pub fn into_path(self) -> PathBuf {
231 self.path
232 }
233}