noodles_bgzf/async/
writer.rs1mod builder;
4pub(crate) mod deflate;
5mod deflater;
6
7use std::{
8 pin::Pin,
9 task::{ready, Context, Poll},
10};
11
12use bytes::{Buf, Bytes, BytesMut};
13use futures::{sink::Buffer, Sink};
14use pin_project_lite::pin_project;
15use tokio::io::{self, AsyncWrite};
16
17pub use self::builder::Builder;
18use self::{deflate::Deflate, deflater::Deflater};
19use crate::writer::MAX_BUF_SIZE;
20
21#[cfg(feature = "libdeflate")]
22type CompressionLevel = libdeflater::CompressionLvl;
23#[cfg(not(feature = "libdeflate"))]
24type CompressionLevel = flate2::Compression;
25
26pin_project! {
27 pub struct Writer<W> {
29 #[pin]
30 sink: Buffer<Deflater<W>, Deflate>,
31 buf: BytesMut,
32 #[pin]
33 eof_buf: Bytes,
34 compression_level: CompressionLevel,
35 }
36}
37
38impl<W> Writer<W>
39where
40 W: AsyncWrite + Unpin,
41{
42 pub fn new(inner: W) -> Self {
51 Builder::default().build_from_writer(inner)
52 }
53
54 pub fn into_inner(self) -> W {
64 self.sink.into_inner().into_inner()
65 }
66
67 fn remaining(&self) -> usize {
68 MAX_BUF_SIZE - self.buf.len()
69 }
70
71 fn has_remaining(&self) -> bool {
72 self.buf.len() < MAX_BUF_SIZE
73 }
74}
75
76impl<W> AsyncWrite for Writer<W>
77where
78 W: AsyncWrite + Unpin,
79{
80 fn poll_write(
81 mut self: Pin<&mut Self>,
82 cx: &mut Context<'_>,
83 buf: &[u8],
84 ) -> Poll<io::Result<usize>> {
85 if !self.has_remaining() {
86 if let Err(e) = ready!(self.as_mut().poll_flush(cx)) {
87 return Poll::Ready(Err(e));
88 }
89 }
90
91 let amt = self.remaining().min(buf.len());
92 self.as_mut().buf.extend_from_slice(&buf[..amt]);
93
94 Poll::Ready(Ok(amt))
95 }
96
97 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
98 let mut this = self.project();
99
100 if this.buf.is_empty() {
101 return Poll::Ready(Ok(()));
102 }
103
104 ready!(this.sink.as_mut().poll_ready(cx))?;
105
106 let buf = this.buf.split();
107 this.sink
108 .as_mut()
109 .start_send(Deflate::new(buf, *this.compression_level))?;
110
111 Poll::Ready(Ok(()))
112 }
113
114 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
115 ready!(self.as_mut().poll_flush(cx))?;
116
117 let mut this = self.project();
118 let mut sink = this.sink.as_mut();
119
120 ready!(sink.as_mut().poll_close(cx))?;
121
122 let mut inner = sink.get_mut().get_mut().get_mut();
123
124 while this.eof_buf.has_remaining() {
125 let bytes_written = ready!(Pin::new(&mut inner).poll_write(cx, this.eof_buf.chunk()))?;
126
127 this.eof_buf.advance(bytes_written);
128
129 if bytes_written == 0 {
130 return Poll::Ready(Err(io::Error::from(io::ErrorKind::WriteZero)));
131 }
132 }
133
134 Poll::Ready(Ok(()))
135 }
136}