rust_htslib/bam/
buffer.rs

// Copyright 2017 Johannes Köster.
// Licensed under the MIT license (http://opensource.org/licenses/MIT)
// This file may not be copied, modified, or distributed
// except according to those terms.
5
6use std::collections::{vec_deque, VecDeque};
7use std::mem;
8use std::rc::Rc;
9use std::str;
10
11use crate::bam;
12use crate::bam::Read;
13use crate::errors::{Error, Result};
14/// A buffer for BAM records. This allows access regions in a sorted BAM file while iterating
15/// over it in a single pass.
16/// The buffer is implemented as a ringbuffer, such that extension or movement to the right has
17/// linear complexity. The buffer makes use of indexed random access. Hence, when fetching a
18/// region at the very end of the BAM, everything before is omitted without cost.
19#[derive(Debug)]
20pub struct RecordBuffer {
21    reader: bam::IndexedReader,
22    inner: VecDeque<Rc<bam::Record>>,
23    overflow: Option<Rc<bam::Record>>,
24    cache_cigar: bool,
25    min_refetch_distance: u64,
26    buffer_record: Rc<bam::Record>,
27    start_pos: Option<u64>,
28}
29
30unsafe impl Sync for RecordBuffer {}
31unsafe impl Send for RecordBuffer {}
32
33impl RecordBuffer {
34    /// Create a new `RecordBuffer`.
35    ///
36    /// # Arguments
37    ///
38    /// * `bam` - BAM reader
39    /// * `cache_cigar` - whether to call `bam::Record::cache_cigar()` for each record.
40    pub fn new(bam: bam::IndexedReader, cache_cigar: bool) -> Self {
41        RecordBuffer {
42            reader: bam,
43            inner: VecDeque::new(),
44            overflow: None,
45            cache_cigar,
46            min_refetch_distance: 1,
47            buffer_record: Rc::new(bam::Record::new()),
48            start_pos: Some(0),
49        }
50    }
51
52    /// maximum distance to previous fetch window such that a
53    /// new fetch operation is performed. If the distance is smaller, buffer will simply
54    /// read through until the start of the new fetch window (probably saving some time
55    /// by avoiding the random access).
56    pub fn set_min_refetch_distance(&mut self, min_refetch_distance: u64) {
57        self.min_refetch_distance = min_refetch_distance;
58    }
59
60    /// Return start position of buffer
61    pub fn start(&self) -> Option<u64> {
62        self.inner.front().map(|rec| rec.pos() as u64)
63    }
64
65    /// Return end position of buffer.
66    pub fn end(&self) -> Option<u64> {
67        self.inner.back().map(|rec| rec.pos() as u64)
68    }
69
70    pub fn tid(&self) -> Option<i32> {
71        self.inner.back().map(|rec| rec.tid())
72    }
73
74    /// Fill buffer at the given interval. If the start coordinate is left of
75    /// the previous start coordinate, this will use an additional BAM fetch IO operation.
76    /// Coordinates are 0-based, and end is exclusive.
77    /// Returns tuple with numbers of added and deleted records since the previous fetch.
78    #[allow(unused_assignments)] // TODO this is needed because rustc thinks that deleted is unused
79    pub fn fetch(&mut self, chrom: &[u8], start: u64, end: u64) -> Result<(usize, usize)> {
80        let mut added = 0;
81        // move overflow from last fetch into ringbuffer
82        if self.overflow.is_some() {
83            added += 1;
84            self.inner.push_back(self.overflow.take().unwrap());
85        }
86
87        if let Some(tid) = self.reader.header.tid(chrom) {
88            let mut deleted = 0;
89            let window_start = start;
90            if self.inner.is_empty()
91                || window_start.saturating_sub(self.end().unwrap()) >= self.min_refetch_distance
92                || self.tid().unwrap() != tid as i32
93                || self.start().unwrap() > self.start_pos.unwrap()
94            {
95                let end = self.reader.header.target_len(tid).unwrap();
96                self.reader.fetch((tid, window_start, end))?;
97                deleted = self.inner.len();
98                self.inner.clear();
99            } else {
100                // remove records too far left
101                let to_remove = self
102                    .inner
103                    .iter()
104                    .take_while(|rec| rec.pos() < window_start as i64)
105                    .count();
106                for _ in 0..to_remove {
107                    self.inner.pop_front();
108                }
109                deleted = to_remove;
110            }
111
112            // extend to the right
113            loop {
114                match self
115                    .reader
116                    .read(Rc::get_mut(&mut self.buffer_record).unwrap())
117                {
118                    None => break,
119                    Some(res) => res?,
120                }
121
122                if self.buffer_record.is_unmapped() {
123                    continue;
124                }
125
126                let pos = self.buffer_record.pos();
127
128                // skip records before the start
129                if pos < start as i64 {
130                    continue;
131                }
132
133                // Record is kept, do not reuse it for next iteration
134                // and thus create a new one.
135                let mut record = mem::replace(&mut self.buffer_record, Rc::new(bam::Record::new()));
136
137                if self.cache_cigar {
138                    Rc::get_mut(&mut record).unwrap().cache_cigar();
139                }
140
141                if pos >= end as i64 {
142                    self.overflow = Some(record);
143                    break;
144                } else {
145                    self.inner.push_back(record);
146                    added += 1;
147                }
148            }
149            self.start_pos = Some(self.start().unwrap_or(window_start));
150
151            Ok((added, deleted))
152        } else {
153            Err(Error::UnknownSequence {
154                sequence: str::from_utf8(chrom).unwrap().to_owned(),
155            })
156        }
157    }
158
159    /// Iterate over records that have been fetched with `fetch`.
160    pub fn iter(&self) -> vec_deque::Iter<Rc<bam::Record>> {
161        self.inner.iter()
162    }
163
164    /// Iterate over mutable references to records that have been fetched with `fetch`.
165    pub fn iter_mut(&mut self) -> vec_deque::IterMut<Rc<bam::Record>> {
166        self.inner.iter_mut()
167    }
168
169    pub fn len(&self) -> usize {
170        self.inner.len()
171    }
172
173    pub fn is_empty(&self) -> bool {
174        self.len() == 0
175    }
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bam;

    /// Fetching `CHROMOSOME_I:1-5` from the test BAM must yield exactly six records,
    /// all of them located at position 1.
    #[test]
    fn test_buffer() {
        let reader = bam::IndexedReader::from_path("test/test.bam").unwrap();
        let mut buffer = RecordBuffer::new(reader, false);

        buffer.fetch(b"CHROMOSOME_I", 1, 5).unwrap();

        // Comparing the whole vector checks both the record count and every position.
        let positions: Vec<i64> = buffer.iter().map(|rec| rec.pos()).collect();
        assert_eq!(positions, vec![1; 6]);
    }
}