noodles_bgzf/async/
writer.rs

1//! Async BGZF writer.
2
3mod builder;
4pub(crate) mod deflate;
5mod deflater;
6
7use std::{
8    pin::Pin,
9    task::{ready, Context, Poll},
10};
11
12use bytes::{Buf, Bytes, BytesMut};
13use futures::{sink::Buffer, Sink};
14use pin_project_lite::pin_project;
15use tokio::io::{self, AsyncWrite};
16
17pub use self::builder::Builder;
18use self::{deflate::Deflate, deflater::Deflater};
19use crate::writer::MAX_BUF_SIZE;
20
21#[cfg(feature = "libdeflate")]
22type CompressionLevel = libdeflater::CompressionLvl;
23#[cfg(not(feature = "libdeflate"))]
24type CompressionLevel = flate2::Compression;
25
26pin_project! {
27    /// An async BGZF writer.
28    pub struct Writer<W> {
29        #[pin]
30        sink: Buffer<Deflater<W>, Deflate>,
31        buf: BytesMut,
32        #[pin]
33        eof_buf: Bytes,
34        compression_level: CompressionLevel,
35    }
36}
37
38impl<W> Writer<W>
39where
40    W: AsyncWrite + Unpin,
41{
42    /// Creates an async BGZF writer with a default compression level.
43    ///
44    /// # Examples
45    ///
46    /// ```
47    /// use noodles_bgzf as bgzf;
48    /// let writer = bgzf::r#async::Writer::new(Vec::new());
49    /// ```
50    pub fn new(inner: W) -> Self {
51        Builder::default().build_from_writer(inner)
52    }
53
54    /// Returns the underlying writer.
55    ///
56    /// # Examples
57    ///
58    /// ```
59    /// use noodles_bgzf as bgzf;
60    /// let writer = bgzf::r#async::Writer::new(Vec::new());
61    /// assert!(writer.into_inner().is_empty());
62    /// ```
63    pub fn into_inner(self) -> W {
64        self.sink.into_inner().into_inner()
65    }
66
67    fn remaining(&self) -> usize {
68        MAX_BUF_SIZE - self.buf.len()
69    }
70
71    fn has_remaining(&self) -> bool {
72        self.buf.len() < MAX_BUF_SIZE
73    }
74}
75
76impl<W> AsyncWrite for Writer<W>
77where
78    W: AsyncWrite + Unpin,
79{
80    fn poll_write(
81        mut self: Pin<&mut Self>,
82        cx: &mut Context<'_>,
83        buf: &[u8],
84    ) -> Poll<io::Result<usize>> {
85        if !self.has_remaining() {
86            if let Err(e) = ready!(self.as_mut().poll_flush(cx)) {
87                return Poll::Ready(Err(e));
88            }
89        }
90
91        let amt = self.remaining().min(buf.len());
92        self.as_mut().buf.extend_from_slice(&buf[..amt]);
93
94        Poll::Ready(Ok(amt))
95    }
96
97    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
98        let mut this = self.project();
99
100        if this.buf.is_empty() {
101            return Poll::Ready(Ok(()));
102        }
103
104        ready!(this.sink.as_mut().poll_ready(cx))?;
105
106        let buf = this.buf.split();
107        this.sink
108            .as_mut()
109            .start_send(Deflate::new(buf, *this.compression_level))?;
110
111        Poll::Ready(Ok(()))
112    }
113
114    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
115        ready!(self.as_mut().poll_flush(cx))?;
116
117        let mut this = self.project();
118        let mut sink = this.sink.as_mut();
119
120        ready!(sink.as_mut().poll_close(cx))?;
121
122        let mut inner = sink.get_mut().get_mut().get_mut();
123
124        while this.eof_buf.has_remaining() {
125            let bytes_written = ready!(Pin::new(&mut inner).poll_write(cx, this.eof_buf.chunk()))?;
126
127            this.eof_buf.advance(bytes_written);
128
129            if bytes_written == 0 {
130                return Poll::Ready(Err(io::Error::from(io::ErrorKind::WriteZero)));
131            }
132        }
133
134        Poll::Ready(Ok(()))
135    }
136}