use crate::sampling::Sample64;
use crate::streams::{
AddingDecoder, CompressStream, Decoder, DeltaStream, F64ToU64Stream, InflateStream, Stream,
StreamStageStats, Streamable, U64ToF64Decoder, ZigZagDecoder, ZigZagStream,
};
use alloc::sync::Arc;
use core::hash::Hash;
use irox_bits::{
Bits, BitsError, BitsErrorKind, BitsWrapper, Error, MutBits, ReadFromBEBits,
SharedCountingBits, SharedROCounter,
};
use irox_time::Time64;
use irox_tools::buf::{Buffer, RoundBuffer};
use irox_tools::codec::{GroupVarintCodeDecoder, GroupVarintCodeEncoder};
use irox_tools::map::OrderedHashMap;
use irox_tools::read::{MultiStreamReader, MultiStreamWriter, StreamWriter};
use std::path::Path;
macro_rules! new_bdc {
($writer:ident) => {
Box::new(CompressStream::new(BitsWrapper::Owned(
$writer.new_stream(),
)))
};
}
pub struct EightByteStream<'a> {
pub(crate) fb1: Box<CompressStream<'a, StreamWriter>>,
pub(crate) fb2: Box<CompressStream<'a, StreamWriter>>,
pub(crate) fb3: Box<CompressStream<'a, StreamWriter>>,
pub(crate) fb4: Box<CompressStream<'a, StreamWriter>>,
pub(crate) fb5: Box<CompressStream<'a, StreamWriter>>,
pub(crate) fb6: Box<CompressStream<'a, StreamWriter>>,
pub(crate) fb7: Box<CompressStream<'a, StreamWriter>>,
pub(crate) fb8: Box<CompressStream<'a, StreamWriter>>,
}
impl<'a> EightByteStream<'a> {
pub fn new(writer: &Arc<MultiStreamWriter>) -> Self {
Self {
fb1: new_bdc!(writer),
fb2: new_bdc!(writer),
fb3: new_bdc!(writer),
fb4: new_bdc!(writer),
fb5: new_bdc!(writer),
fb6: new_bdc!(writer),
fb7: new_bdc!(writer),
fb8: new_bdc!(writer),
}
}
pub fn written(&self) -> u64 {
let mut out = 0u64;
out = out.wrapping_add(self.fb1.written());
out = out.wrapping_add(self.fb2.written());
out = out.wrapping_add(self.fb3.written());
out = out.wrapping_add(self.fb4.written());
out = out.wrapping_add(self.fb5.written());
out = out.wrapping_add(self.fb6.written());
out = out.wrapping_add(self.fb7.written());
out = out.wrapping_add(self.fb8.written());
out
}
pub fn written_stats(&self) -> [u64; 8] {
[
self.fb1.written(),
self.fb2.written(),
self.fb3.written(),
self.fb4.written(),
self.fb5.written(),
self.fb6.written(),
self.fb7.written(),
self.fb8.written(),
]
}
}
impl<'a> Stream<u64> for EightByteStream<'a> {
fn write_value(&mut self, v: u64) -> Result<usize, Error> {
let [a, b, c, d, e, f, g, h] = v.to_be_bytes();
self.fb1.write_value(a as i8)?;
self.fb2.write_value(b as i8)?;
self.fb3.write_value(c as i8)?;
self.fb4.write_value(d as i8)?;
self.fb5.write_value(e as i8)?;
self.fb6.write_value(f as i8)?;
self.fb7.write_value(g as i8)?;
self.fb8.write_value(h as i8)?;
Ok(8)
}
fn flush(&mut self) -> Result<(), Error> {
self.fb1.flush()?;
self.fb2.flush()?;
self.fb3.flush()?;
self.fb4.flush()?;
self.fb5.flush()?;
self.fb6.flush()?;
self.fb7.flush()?;
self.fb8.flush()?;
Ok(())
}
fn written_stats(&self) -> String {
format!("{:?} = {}", self.written_stats(), self.written())
}
}
pub struct SPDPWriter<'a> {
writer: Arc<MultiStreamWriter>,
time_stream: EightByteStream<'a>,
float_stream: EightByteStream<'a>,
semi_last_value: f64,
last_value: f64,
}
pub fn rot54(value: u64) -> u64 {
let [_, b, c, d, e, f, g, h] = value.to_be_bytes();
irox_bits::FromBEBytes::from_be_bytes([0, h, g, f, e, d, c, b])
}
impl<'a> SPDPWriter<'a> {
pub fn new<T: AsRef<Path>>(path: T) -> Result<Self, Error> {
let writer = MultiStreamWriter::new(path)?;
let time_stream = EightByteStream::new(&writer);
let float_stream = EightByteStream::new(&writer);
Ok(SPDPWriter {
writer,
time_stream,
float_stream,
last_value: f64::default(),
semi_last_value: f64::default(),
})
}
pub fn write_sample(&mut self, sample: &Sample64) -> Result<(), Error> {
let Sample64 { time, value } = sample;
self.time_stream.write_value(time.as_u64())?;
let delta = value; self.semi_last_value = self.last_value;
self.last_value = *value;
self.float_stream.write_value(delta.to_bits())?;
Ok(())
}
pub fn flush(&mut self) -> Result<(), Error> {
self.time_stream.flush()?;
self.float_stream.flush()?;
Ok(())
}
pub fn len(&self) -> Result<u64, Error> {
self.writer.len()
}
pub fn is_empty(&self) -> Result<bool, Error> {
Ok(self.len()? == 0)
}
}
pub struct Rot54Stream {
writer: Box<dyn Stream<u64>>,
}
impl Rot54Stream {
pub fn new(writer: Box<dyn Stream<u64>>) -> Self {
Self { writer }
}
}
impl Stream<u64> for Rot54Stream {
fn write_value(&mut self, value: u64) -> Result<usize, Error> {
let v = rot54(value);
self.writer.write_value(v)
}
fn flush(&mut self) -> Result<(), Error> {
self.writer.flush()
}
}
pub struct FloatSplitter {
mantissa_writer: Box<dyn Stream<u64>>,
exponent_writer: Box<dyn Stream<u64>>,
}
impl FloatSplitter {
pub fn new(
mantissa_writer: Box<dyn Stream<u64>>,
exponent_writer: Box<dyn Stream<u64>>,
) -> Self {
Self {
mantissa_writer,
exponent_writer,
}
}
}
impl Stream<f64> for FloatSplitter {
fn write_value(&mut self, value: f64) -> Result<usize, Error> {
let bits = value.to_bits();
let exponent = bits >> 52;
let mantissa = bits & 0xFFFFFFFFFFFFF;
self.mantissa_writer.write_value(mantissa)?;
self.exponent_writer.write_value(exponent)?;
Ok(8)
}
fn flush(&mut self) -> Result<(), Error> {
self.mantissa_writer.flush()?;
self.exponent_writer.flush()?;
Ok(())
}
}
pub struct GroupCodingStream<'a, T: core::hash::Hash + Eq + Streamable, B: MutBits> {
buf: RoundBuffer<4, T>,
inner: GroupVarintCodeEncoder<'a, T, B>,
}
impl<'a, T: core::hash::Hash + Eq + Default + Streamable, B: MutBits> GroupCodingStream<'a, T, B> {
pub fn new(inner: BitsWrapper<'a, B>) -> Self {
Self {
buf: RoundBuffer::new(),
inner: GroupVarintCodeEncoder::new(inner),
}
}
pub fn counter(&self) -> SharedROCounter {
self.inner.counter()
}
}
impl<'a, T: core::hash::Hash + Eq + Streamable, B: MutBits> Stream<T>
for GroupCodingStream<'a, T, B>
{
fn write_value(&mut self, value: T) -> Result<usize, Error> {
let _ = self.buf.push_back(value);
if self.buf.is_full() {
let a = self.buf.pop_front().unwrap_or_default();
let b = self.buf.pop_front().unwrap_or_default();
let c = self.buf.pop_front().unwrap_or_default();
let d = self.buf.pop_front().unwrap_or_default();
self.inner.encode_4(&[a, b, c, d])
} else {
Ok(0)
}
}
fn flush(&mut self) -> Result<(), Error> {
self.inner.flush()
}
}
impl<'a, T: core::hash::Hash + Eq + Streamable, B: MutBits> Drop for GroupCodingStream<'a, T, B> {
fn drop(&mut self) {
let len = self.buf.len();
if len > 0 {
let needed = 4 - len;
for _ in 0..needed {
let _ = self.write_value(T::default());
}
}
let _ = self.inner.flush();
}
}
pub struct GroupDecodingStream<'a, T: Hash + Eq + Default, B: Bits> {
inner: GroupVarintCodeDecoder<'a, T, B>,
buf: RoundBuffer<4, T>,
}
impl<'a, T: Hash + Eq + Default + ReadFromBEBits + Copy, B: Bits> GroupDecodingStream<'a, T, B> {
pub fn new(inner: BitsWrapper<'a, B>) -> Self {
Self {
inner: GroupVarintCodeDecoder::new(inner),
buf: RoundBuffer::new(),
}
}
}
impl<'a, T: Hash + Eq + Default + ReadFromBEBits + Copy, B: Bits> Decoder<T>
for GroupDecodingStream<'a, T, B>
{
fn next(&mut self) -> Result<Option<T>, Error> {
if self.buf.is_empty() {
let Some(val) = self.inner.decode_4()? else {
return Ok(None);
};
for v in val {
let _ = self.buf.push_back(v);
}
}
Ok(self.buf.pop_front())
}
}
pub struct CodedTimeSeriesWriter {
float_stream: Box<dyn Stream<f64>>,
time_stream: Box<dyn Stream<u64>>,
meta_stream: Box<dyn for<'a> Stream<&'a str>>,
stats: StreamStageStats,
}
impl CodedTimeSeriesWriter {
pub fn new<T: AsRef<Path>>(path: T) -> Result<Self, Error> {
let mut stats = StreamStageStats::default();
let writer = MultiStreamWriter::new(path)?;
let meta_stream = writer.new_stream();
let meta_stream = CompressStream::new(BitsWrapper::Owned(meta_stream));
let float_stream = writer.new_stream();
let float_stream = SharedCountingBits::new(BitsWrapper::Owned(float_stream));
stats.stage_counting("1.data_gz", float_stream.get_count());
let float_stream = CompressStream::new(BitsWrapper::Owned(float_stream));
let float_stream = SharedCountingBits::new(BitsWrapper::Owned(float_stream));
stats.stage_counting("2.data_vgb", float_stream.get_count());
let float_stream = GroupCodingStream::<u64, _>::new(BitsWrapper::Owned(float_stream));
stats.stage_counting("3.data_codes", float_stream.counter());
let float_stream = F64ToU64Stream::new(Box::new(float_stream));
let time_stream = writer.new_stream();
let time_stream = SharedCountingBits::new(BitsWrapper::Owned(time_stream));
stats.stage_counting("1.time_gz", time_stream.get_count());
let time_stream = CompressStream::new(BitsWrapper::Owned(time_stream));
let time_stream = SharedCountingBits::new(BitsWrapper::Owned(time_stream));
stats.stage_counting("2.time_vgb", time_stream.get_count());
let time_stream = GroupCodingStream::<u64, _>::new(BitsWrapper::Owned(time_stream));
stats.stage_counting("3.time_codes", time_stream.counter());
let time_stream = ZigZagStream::new(Box::new(time_stream));
let time_stream = DeltaStream::<i64>::new(Box::new(time_stream));
Ok(Self {
float_stream: Box::new(float_stream),
time_stream: Box::new(time_stream),
meta_stream: Box::new(meta_stream),
stats,
})
}
pub fn write_sample(&mut self, sample: &Sample64) -> Result<(), Error> {
let Sample64 { time, value } = sample;
self.time_stream.write_value(time.as_u64())?;
self.float_stream.write_value(*value)?;
Ok(())
}
pub fn flush(&mut self) -> Result<(), Error> {
self.time_stream.flush()?;
self.float_stream.flush()?;
Ok(())
}
pub fn written_stats(&self) -> Vec<String> {
let stats = self.float_stream.written_stats();
let mut out = self.stats.stats();
out.push(stats);
out
}
pub fn metadata<K: AsRef<str>, V: AsRef<str>>(
&mut self,
key: K,
value: V,
) -> Result<(), Error> {
let key = key.as_ref();
let value = value.as_ref();
self.meta_stream.write_value(key)?;
self.meta_stream.write_value(value)?;
Ok(())
}
}
pub enum TimeSeriesError {
BitsError(BitsError),
MissingMetadataStream,
MissingFloatStream,
MissingTimeStream,
}
impl TimeSeriesError {
pub fn name(&self) -> &'static str {
match self {
TimeSeriesError::BitsError(..) => "BitsError",
TimeSeriesError::MissingMetadataStream => "MissingMetadataStream",
TimeSeriesError::MissingFloatStream => "MissingFloatStream",
TimeSeriesError::MissingTimeStream => "MissingTimeStream",
}
}
}
impl From<BitsError> for TimeSeriesError {
fn from(e: BitsError) -> Self {
TimeSeriesError::BitsError(e)
}
}
impl From<TimeSeriesError> for BitsError {
fn from(e: TimeSeriesError) -> Self {
match e {
TimeSeriesError::BitsError(e) => e,
_ => BitsError::new(BitsErrorKind::InvalidData, e.name()),
}
}
}
pub struct CodedTimeSeriesReader {
metadata: OrderedHashMap<String, String>,
float_decoder: Box<dyn Decoder<f64>>,
time_decoder: Box<dyn Decoder<u64>>,
last_item: Option<Sample64>,
}
impl CodedTimeSeriesReader {
pub fn new<T: AsRef<Path>>(path: T) -> Result<Self, TimeSeriesError> {
let mut reader = MultiStreamReader::open(path)?;
let mut streams = reader.drain(..);
let Some(meta_stream) = streams.next() else {
return Err(TimeSeriesError::MissingMetadataStream);
};
let mut meta_stream = InflateStream::new(BitsWrapper::Owned(meta_stream));
let mut metadata = OrderedHashMap::<String, String>::new();
while meta_stream.has_more()? {
let key = String::read_from_be_bits(&mut meta_stream)?;
let value = String::read_from_be_bits(&mut meta_stream)?;
metadata.insert(key, value);
}
let Some(float_stream) = streams.next() else {
return Err(TimeSeriesError::MissingFloatStream);
};
let float_stream = InflateStream::new(BitsWrapper::Owned(float_stream));
let float_stream = GroupDecodingStream::<u64, _>::new(BitsWrapper::Owned(float_stream));
let float_stream = U64ToF64Decoder::new(Box::new(float_stream));
let Some(time_stream) = streams.next() else {
return Err(TimeSeriesError::MissingTimeStream);
};
let time_stream = InflateStream::new(BitsWrapper::Owned(time_stream));
let time_stream = GroupDecodingStream::<u64, _>::new(BitsWrapper::Owned(time_stream));
let time_stream = ZigZagDecoder::new(Box::new(time_stream));
let time_stream = AddingDecoder::new(Box::new(time_stream));
Ok(Self {
metadata,
float_decoder: Box::new(float_stream),
time_decoder: Box::new(time_stream),
last_item: None,
})
}
pub fn metadata(&self) -> impl Iterator<Item = (&String, &String)> {
self.metadata.iter()
}
pub fn peek(&mut self) -> Result<&mut Option<Sample64>, Error> {
if self.last_item.is_some() {
Ok(&mut self.last_item)
} else {
if let Some(v) = self.next() {
let v = v?;
self.last_item = Some(v);
}
Ok(&mut self.last_item)
}
}
}
impl Iterator for CodedTimeSeriesReader {
type Item = Result<Sample64, Error>;
fn next(&mut self) -> Option<Result<Sample64, Error>> {
let r1 = self.float_decoder.next();
let r2 = self.time_decoder.next();
let float = match r1 {
Ok(v) => v,
Err(e) => return Some(Err(e)),
};
let time = match r2 {
Ok(v) => v,
Err(e) => return Some(Err(e)),
};
let float = float?;
let time = time?;
let samp = Sample64 {
value: float,
time: Time64::from_unix_raw(time),
};
self.last_item = Some(samp);
Some(Ok(samp))
}
}
#[cfg(test)]
mod tests {
use crate::tsdf::Sample64;
use crate::tsdf::{CodedTimeSeriesReader, CodedTimeSeriesWriter};
use irox_bits::Error;
use irox_time::Time64;
use irox_tools::buf::UnlimitedBuffer;
use irox_tools::random::{Random, PRNG};
use irox_units::units::duration::Duration;
use std::time::Instant;
#[test]
pub fn test() -> Result<(), Error> {
let mut data = UnlimitedBuffer::<Sample64>::new();
{
let mut file = CodedTimeSeriesWriter::new("test_file.tsd")?;
let mut input = Time64::now();
let incr = Duration::from_millis(100);
let start = Instant::now();
let count = 20_000_000u64;
let center = 100f64;
let variance = 0.001f64;
let mut rand = Random::default();
{
for _i in 0..count {
let val = rand.next_in_range(0., 4096.); let val = center + val.round() * variance - variance / 2f64;
let samp = Sample64::new(input, val);
data.push_back(samp);
file.write_sample(&samp)?;
input += incr;
}
file.flush()?;
}
let written = std::fs::metadata("test_file.tsd")?.len();
let input_size = count * 16;
let end = start.elapsed();
let ratio = 1. - (written as f64 / input_size as f64);
let ratio = ratio * 100.;
let ubps = input_size as f64 / end.as_secs_f64() / 1e6;
println!(
"Turned {input_size} bytes into {written} = {ratio:02.}% reduction = {ubps:02.02}MB/s"
);
println!("{:#?}", file.written_stats());
drop(file);
}
let mut file = CodedTimeSeriesReader::new("test_file.tsd")?;
let num_samps = data.len();
assert!(num_samps > 0);
let mut idx = 0;
loop {
let res = file.peek()?;
let Some(val) = res.take() else {
break;
};
let Some(v) = data.pop_front() else {
panic!("should not happen");
};
assert_eq!(val, v, "{idx}");
idx += 1;
}
assert_eq!(num_samps, idx);
Ok(())
}
}