evobench_tools/utillib/
ndjson_pipe.rs1use std::{
2 io::{BufRead, BufReader, PipeReader, PipeWriter, Write, pipe},
3 marker::PhantomData,
4};
5
6use anyhow::Result;
7use serde::{Serialize, de::DeserializeOwned};
8
9pub struct NdJsonPipe<T: Serialize + DeserializeOwned> {
10 _phantom: PhantomData<fn() -> T>,
11 p: (PipeReader, PipeWriter),
12}
13
14#[derive(Debug)]
15pub struct NdJsonPipeWriter<T: Serialize + DeserializeOwned> {
16 _phantom: PhantomData<fn() -> T>,
17 w: PipeWriter,
18}
19
20impl<T: Serialize + DeserializeOwned> NdJsonPipeWriter<T> {
21 pub fn send(&mut self, msg: T) -> Result<()> {
24 let mut s = serde_json::to_string(&msg)?;
25 s.push('\n');
26 self.w.write_all(s.as_bytes())?;
27 Ok(())
28 }
29}
30
31#[derive(Debug)]
32pub struct NdJsonPipeReader<T: Serialize + DeserializeOwned> {
33 _phantom: PhantomData<fn() -> T>,
34 line: String,
35 reader: BufReader<PipeReader>,
36}
37
38impl<T: Serialize + DeserializeOwned> Iterator for NdJsonPipeReader<T> {
39 type Item = Result<T>;
40
41 fn next(&mut self) -> Option<Self::Item> {
42 (|| -> Result<Option<T>> {
43 let nread = self.reader.read_line(&mut self.line)?;
44 if nread == 0 {
45 Ok(None)
46 } else {
47 let val: T = serde_json::from_str(&self.line)?;
48 self.line.clear();
49 Ok(Some(val))
50 }
51 })()
52 .transpose()
53 }
54}
55
56impl<T: Serialize + DeserializeOwned> NdJsonPipe<T> {
57 pub fn new() -> Result<Self> {
58 let p = pipe()?;
59 Ok(Self {
60 _phantom: PhantomData,
61 p,
62 })
63 }
64
65 pub fn into_reader(self) -> NdJsonPipeReader<T> {
69 let (r, _w) = self.p;
70 NdJsonPipeReader {
71 _phantom: PhantomData,
72 line: String::new(),
73 reader: BufReader::new(r),
74 }
75 }
76
77 pub fn into_writer(self) -> NdJsonPipeWriter<T> {
81 let (_r, w) = self.p;
82 NdJsonPipeWriter {
83 _phantom: PhantomData,
84 w,
85 }
86 }
87}