noodles_cram/async/io/writer.rs
1//! Async CRAM writer.
2
3mod builder;
4mod container;
5mod header;
6
7use noodles_fasta as fasta;
8use noodles_sam as sam;
9use tokio::io::{self, AsyncWrite};
10
11pub use self::builder::Builder;
12use self::{
13 container::write_container,
14 header::{write_file_definition, write_file_header, write_header},
15};
16use crate::{
17 io::writer::{Options, Record},
18 FileDefinition,
19};
20
21/// An async CRAM writer.
22///
23/// A call to [`Self::shutdown`] must be made before the writer is dropped.
24pub struct Writer<W> {
25 inner: W,
26 reference_sequence_repository: fasta::Repository,
27 options: Options,
28 records: Vec<Record>,
29 record_counter: u64,
30}
31
32impl<W> Writer<W> {
33 /// Returns a reference to the underlying writer.
34 ///
35 /// # Examples
36 ///
37 /// ```
38 /// use noodles_cram as cram;
39 /// use tokio::io;
40 /// let writer = cram::r#async::io::Writer::new(io::sink());
41 /// let _inner = writer.get_ref();
42 /// ```
43 pub fn get_ref(&self) -> &W {
44 &self.inner
45 }
46
47 /// Returns a mutable reference to the underlying writer.
48 ///
49 /// # Examples
50 ///
51 /// ```
52 /// use noodles_cram as cram;
53 /// use tokio::io;
54 /// let mut writer = cram::r#async::io::Writer::new(io::sink());
55 /// let _inner = writer.get_mut();
56 /// ```
57 pub fn get_mut(&mut self) -> &mut W {
58 &mut self.inner
59 }
60
61 /// Returns the underlying writer.
62 ///
63 /// # Examples
64 ///
65 /// ```
66 /// use noodles_cram as cram;
67 /// use tokio::io;
68 /// let mut writer = cram::r#async::io::Writer::new(io::sink());
69 /// let _inner = writer.into_inner();
70 /// ```
71 pub fn into_inner(self) -> W {
72 self.inner
73 }
74}
75
76impl<W> Writer<W>
77where
78 W: AsyncWrite + Unpin,
79{
80 /// Creates an async CRAM writer.
81 ///
82 /// # Examples
83 ///
84 /// ```
85 /// use noodles_cram as cram;
86 /// use tokio::io;
87 /// let writer = cram::r#async::io::Writer::new(io::sink());
88 /// ```
89 pub fn new(inner: W) -> Self {
90 Builder::default().build_from_writer(inner)
91 }
92
93 /// Attempts to shutdown the output stream by writing any pending containers and a final EOF
94 /// container.
95 ///
96 /// # Examples
97 ///
98 /// ```
99 /// # #[tokio::main]
100 /// # async fn main() -> tokio::io::Result<()> {
101 /// use noodles_cram as cram;
102 /// use noodles_sam as sam;
103 /// use tokio::io;
104 /// let mut writer = cram::r#async::io::Writer::new(io::sink());
105 /// let header = sam::Header::default();
106 /// writer.shutdown(&header).await?;
107 /// # Ok(())
108 /// # }
109 /// ```
110 pub async fn shutdown(&mut self, header: &sam::Header) -> io::Result<()> {
111 use self::container::write_eof_container;
112 self.flush(header).await?;
113 write_eof_container(&mut self.inner).await
114 }
115
116 /// Writes a CRAM file definition.
117 ///
118 /// The file ID is set as a blank value (`[0x00; 20]`).
119 ///
120 /// # Examples
121 ///
122 /// ```
123 /// # #[tokio::main]
124 /// # async fn main() -> tokio::io::Result<()> {
125 /// use noodles_cram as cram;
126 /// use tokio::io;
127 /// let mut writer = cram::r#async::io::Writer::new(io::sink());
128 /// writer.write_file_definition().await?;
129 /// # Ok(())
130 /// # }
131 /// ```
132 pub async fn write_file_definition(&mut self) -> io::Result<()> {
133 let file_definition = FileDefinition::new(self.options.version, Default::default());
134 write_file_definition(&mut self.inner, &file_definition).await
135 }
136
137 /// Writes a CRAM file header container.
138 ///
139 /// The position of the stream is expected to be directly after the file definition.
140 ///
141 /// Entries in the reference sequence dictionary that are missing MD5 checksums (`M5`) will
142 /// automatically be calculated and added to the written record.
143 ///
144 /// # Examples
145 ///
146 /// ```
147 /// # #[tokio::main]
148 /// # async fn main() -> tokio::io::Result<()> {
149 /// use noodles_cram as cram;
150 /// use noodles_sam as sam;
151 /// use tokio::io;
152 ///
153 /// let mut writer = cram::r#async::io::Writer::new(io::sink());
154 /// writer.write_file_definition().await?;
155 ///
156 /// let header = sam::Header::default();
157 /// writer.write_file_header(&header).await?;
158 ///
159 /// writer.shutdown(&header).await?;
160 /// # Ok(())
161 /// # }
162 /// ```
163 pub async fn write_file_header(&mut self, header: &sam::Header) -> io::Result<()> {
164 write_file_header(&mut self.inner, &self.reference_sequence_repository, header).await
165 }
166
167 /// Writes a SAM header.
168 ///
169 /// This writes the CRAM magic number, the file definition, and file header using the given SAM
170 /// header.
171 ///
172 /// ```
173 /// # #[tokio::main]
174 /// # async fn main() -> tokio::io::Result<()> {
175 /// use noodles_cram as cram;
176 /// use noodles_sam as sam;
177 /// use tokio::io;
178 ///
179 /// let mut writer = cram::r#async::io::Writer::new(io::sink());
180 ///
181 /// let header = sam::Header::default();
182 /// writer.write_header(&header).await?;
183 ///
184 /// writer.shutdown(&header).await?;
185 /// # Ok(())
186 /// # }
187 /// ```
188 pub async fn write_header(&mut self, header: &sam::Header) -> io::Result<()> {
189 let file_definition = FileDefinition::new(self.options.version, Default::default());
190
191 write_header(
192 &mut self.inner,
193 &self.reference_sequence_repository,
194 &file_definition,
195 header,
196 )
197 .await
198 }
199
200 /// Writes a CRAM record.
201 ///
202 /// # Examples
203 ///
204 /// ```
205 /// # #[tokio::main]
206 /// # async fn main() -> tokio::io::Result<()> {
207 /// use noodles_cram as cram;
208 /// use noodles_sam as sam;
209 /// use tokio::io;
210 ///
211 /// let mut writer = cram::r#async::io::Writer::new(io::sink());
212 /// writer.write_file_definition().await?;
213 ///
214 /// let header = sam::Header::default();
215 /// writer.write_file_header(&header).await?;
216 ///
217 /// let record = sam::Record::default();
218 /// writer.write_alignment_record(&header, &record).await?;
219 ///
220 /// writer.shutdown(&header).await?;
221 /// # Ok(())
222 /// # }
223 /// ```
224 pub async fn write_record(&mut self, header: &sam::Header, record: Record) -> io::Result<()> {
225 self.records.push(record);
226
227 if self.records.len() >= self.records.capacity() {
228 self.flush(header).await?;
229 }
230
231 Ok(())
232 }
233
234 /// Writes an alignment record.
235 ///
236 /// # Examples
237 ///
238 /// ```
239 /// # #[tokio::main]
240 /// # async fn main() -> tokio::io::Result<()> {
241 /// use noodles_cram as cram;
242 /// use noodles_sam as sam;
243 /// use tokio::io;
244 ///
245 /// let mut writer = cram::r#async::io::Writer::new(io::sink());
246 /// writer.write_file_definition().await?;
247 ///
248 /// let header = sam::Header::default();
249 /// writer.write_file_header(&header).await?;
250 ///
251 /// let record = sam::Record::default();
252 /// writer.write_alignment_record(&header, &record).await?;
253 ///
254 /// writer.shutdown(&header).await?;
255 /// # Ok(())
256 /// # }
257 /// ```
258 pub async fn write_alignment_record(
259 &mut self,
260 header: &sam::Header,
261 record: &dyn sam::alignment::Record,
262 ) -> io::Result<()> {
263 let record = Record::try_from_alignment_record(header, record)?;
264 self.write_record(header, record).await
265 }
266
267 async fn flush(&mut self, header: &sam::Header) -> io::Result<()> {
268 write_container(
269 &mut self.inner,
270 &self.reference_sequence_repository,
271 &self.options,
272 header,
273 self.record_counter,
274 &mut self.records,
275 )
276 .await?;
277
278 let record_count = u64::try_from(self.records.len())
279 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
280 self.record_counter += record_count;
281
282 self.records.clear();
283
284 Ok(())
285 }
286}