noodles_cram/async/io/
writer.rs

1//! Async CRAM writer.
2
3mod builder;
4mod container;
5mod header;
6
7use noodles_fasta as fasta;
8use noodles_sam as sam;
9use tokio::io::{self, AsyncWrite};
10
11pub use self::builder::Builder;
12use self::{
13    container::write_container,
14    header::{write_file_definition, write_file_header, write_header},
15};
16use crate::{
17    io::writer::{Options, Record},
18    FileDefinition,
19};
20
21/// An async CRAM writer.
22///
23/// A call to [`Self::shutdown`] must be made before the writer is dropped.
24pub struct Writer<W> {
25    inner: W,
26    reference_sequence_repository: fasta::Repository,
27    options: Options,
28    records: Vec<Record>,
29    record_counter: u64,
30}
31
32impl<W> Writer<W> {
33    /// Returns a reference to the underlying writer.
34    ///
35    /// # Examples
36    ///
37    /// ```
38    /// use noodles_cram as cram;
39    /// use tokio::io;
40    /// let writer = cram::r#async::io::Writer::new(io::sink());
41    /// let _inner = writer.get_ref();
42    /// ```
43    pub fn get_ref(&self) -> &W {
44        &self.inner
45    }
46
47    /// Returns a mutable reference to the underlying writer.
48    ///
49    /// # Examples
50    ///
51    /// ```
52    /// use noodles_cram as cram;
53    /// use tokio::io;
54    /// let mut writer = cram::r#async::io::Writer::new(io::sink());
55    /// let _inner = writer.get_mut();
56    /// ```
57    pub fn get_mut(&mut self) -> &mut W {
58        &mut self.inner
59    }
60
61    /// Returns the underlying writer.
62    ///
63    /// # Examples
64    ///
65    /// ```
66    /// use noodles_cram as cram;
67    /// use tokio::io;
68    /// let mut writer = cram::r#async::io::Writer::new(io::sink());
69    /// let _inner = writer.into_inner();
70    /// ```
71    pub fn into_inner(self) -> W {
72        self.inner
73    }
74}
75
76impl<W> Writer<W>
77where
78    W: AsyncWrite + Unpin,
79{
80    /// Creates an async CRAM writer.
81    ///
82    /// # Examples
83    ///
84    /// ```
85    /// use noodles_cram as cram;
86    /// use tokio::io;
87    /// let writer = cram::r#async::io::Writer::new(io::sink());
88    /// ```
89    pub fn new(inner: W) -> Self {
90        Builder::default().build_from_writer(inner)
91    }
92
93    /// Attempts to shutdown the output stream by writing any pending containers and a final EOF
94    /// container.
95    ///
96    /// # Examples
97    ///
98    /// ```
99    /// # #[tokio::main]
100    /// # async fn main() -> tokio::io::Result<()> {
101    /// use noodles_cram as cram;
102    /// use noodles_sam as sam;
103    /// use tokio::io;
104    /// let mut writer = cram::r#async::io::Writer::new(io::sink());
105    /// let header = sam::Header::default();
106    /// writer.shutdown(&header).await?;
107    /// # Ok(())
108    /// # }
109    /// ```
110    pub async fn shutdown(&mut self, header: &sam::Header) -> io::Result<()> {
111        use self::container::write_eof_container;
112        self.flush(header).await?;
113        write_eof_container(&mut self.inner).await
114    }
115
116    /// Writes a CRAM file definition.
117    ///
118    /// The file ID is set as a blank value (`[0x00; 20]`).
119    ///
120    /// # Examples
121    ///
122    /// ```
123    /// # #[tokio::main]
124    /// # async fn main() -> tokio::io::Result<()> {
125    /// use noodles_cram as cram;
126    /// use tokio::io;
127    /// let mut writer = cram::r#async::io::Writer::new(io::sink());
128    /// writer.write_file_definition().await?;
129    /// # Ok(())
130    /// # }
131    /// ```
132    pub async fn write_file_definition(&mut self) -> io::Result<()> {
133        let file_definition = FileDefinition::new(self.options.version, Default::default());
134        write_file_definition(&mut self.inner, &file_definition).await
135    }
136
137    /// Writes a CRAM file header container.
138    ///
139    /// The position of the stream is expected to be directly after the file definition.
140    ///
141    /// Entries in the reference sequence dictionary that are missing MD5 checksums (`M5`) will
142    /// automatically be calculated and added to the written record.
143    ///
144    /// # Examples
145    ///
146    /// ```
147    /// # #[tokio::main]
148    /// # async fn main() -> tokio::io::Result<()> {
149    /// use noodles_cram as cram;
150    /// use noodles_sam as sam;
151    /// use tokio::io;
152    ///
153    /// let mut writer = cram::r#async::io::Writer::new(io::sink());
154    /// writer.write_file_definition().await?;
155    ///
156    /// let header = sam::Header::default();
157    /// writer.write_file_header(&header).await?;
158    ///
159    /// writer.shutdown(&header).await?;
160    /// # Ok(())
161    /// # }
162    /// ```
163    pub async fn write_file_header(&mut self, header: &sam::Header) -> io::Result<()> {
164        write_file_header(&mut self.inner, &self.reference_sequence_repository, header).await
165    }
166
167    /// Writes a SAM header.
168    ///
169    /// This writes the CRAM magic number, the file definition, and file header using the given SAM
170    /// header.
171    ///
172    /// ```
173    /// # #[tokio::main]
174    /// # async fn main() -> tokio::io::Result<()> {
175    /// use noodles_cram as cram;
176    /// use noodles_sam as sam;
177    /// use tokio::io;
178    ///
179    /// let mut writer = cram::r#async::io::Writer::new(io::sink());
180    ///
181    /// let header = sam::Header::default();
182    /// writer.write_header(&header).await?;
183    ///
184    /// writer.shutdown(&header).await?;
185    /// # Ok(())
186    /// # }
187    /// ```
188    pub async fn write_header(&mut self, header: &sam::Header) -> io::Result<()> {
189        let file_definition = FileDefinition::new(self.options.version, Default::default());
190
191        write_header(
192            &mut self.inner,
193            &self.reference_sequence_repository,
194            &file_definition,
195            header,
196        )
197        .await
198    }
199
200    /// Writes a CRAM record.
201    ///
202    /// # Examples
203    ///
204    /// ```
205    /// # #[tokio::main]
206    /// # async fn main() -> tokio::io::Result<()> {
207    /// use noodles_cram as cram;
208    /// use noodles_sam as sam;
209    /// use tokio::io;
210    ///
211    /// let mut writer = cram::r#async::io::Writer::new(io::sink());
212    /// writer.write_file_definition().await?;
213    ///
214    /// let header = sam::Header::default();
215    /// writer.write_file_header(&header).await?;
216    ///
217    /// let record = sam::Record::default();
218    /// writer.write_alignment_record(&header, &record).await?;
219    ///
220    /// writer.shutdown(&header).await?;
221    /// # Ok(())
222    /// # }
223    /// ```
224    pub async fn write_record(&mut self, header: &sam::Header, record: Record) -> io::Result<()> {
225        self.records.push(record);
226
227        if self.records.len() >= self.records.capacity() {
228            self.flush(header).await?;
229        }
230
231        Ok(())
232    }
233
234    /// Writes an alignment record.
235    ///
236    /// # Examples
237    ///
238    /// ```
239    /// # #[tokio::main]
240    /// # async fn main() -> tokio::io::Result<()> {
241    /// use noodles_cram as cram;
242    /// use noodles_sam as sam;
243    /// use tokio::io;
244    ///
245    /// let mut writer = cram::r#async::io::Writer::new(io::sink());
246    /// writer.write_file_definition().await?;
247    ///
248    /// let header = sam::Header::default();
249    /// writer.write_file_header(&header).await?;
250    ///
251    /// let record = sam::Record::default();
252    /// writer.write_alignment_record(&header, &record).await?;
253    ///
254    /// writer.shutdown(&header).await?;
255    /// # Ok(())
256    /// # }
257    /// ```
258    pub async fn write_alignment_record(
259        &mut self,
260        header: &sam::Header,
261        record: &dyn sam::alignment::Record,
262    ) -> io::Result<()> {
263        let record = Record::try_from_alignment_record(header, record)?;
264        self.write_record(header, record).await
265    }
266
267    async fn flush(&mut self, header: &sam::Header) -> io::Result<()> {
268        write_container(
269            &mut self.inner,
270            &self.reference_sequence_repository,
271            &self.options,
272            header,
273            self.record_counter,
274            &mut self.records,
275        )
276        .await?;
277
278        let record_count = u64::try_from(self.records.len())
279            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
280        self.record_counter += record_count;
281
282        self.records.clear();
283
284        Ok(())
285    }
286}