rust_htslib/bcf/buffer.rs

// Copyright 2017 Johannes Köster.
// Licensed under the MIT license (http://opensource.org/licenses/MIT)
// This file may not be copied, modified, or distributed
// except according to those terms.

use std::cmp::Ordering;
use std::collections::{vec_deque, VecDeque};
use std::mem;

use crate::bcf::{self, Read};
use crate::errors::Result;

/// A buffer for BCF records. This allows accessing regions in a sorted BCF file while iterating
/// over it in a single pass.
/// The buffer is implemented as a ring buffer, such that extension or movement to the right has
/// linear complexity. The buffer does not use any indexed random access. Hence, to get a region
/// near the end of the BCF, you will have to wait until all records before it have been read.
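///
/// # Example
///
/// A minimal usage sketch; the file path and window coordinates are placeholders.
///
/// ```no_run
/// use rust_htslib::bcf;
/// use rust_htslib::bcf::buffer::RecordBuffer;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let reader = bcf::Reader::from_path("variants.bcf")?;
/// let mut buffer = RecordBuffer::new(reader);
///
/// // Fetch a window on chromosome "1"; coordinates are 0-based, end-exclusive.
/// let (added, deleted) = buffer.fetch(b"1", 10_000, 20_000)?;
/// println!("window holds {} records (+{added}, -{deleted})", buffer.len());
///
/// // Iterate over the records currently in the window.
/// for rec in buffer.iter() {
///     println!("record at position {}", rec.pos());
/// }
/// # Ok(())
/// # }
/// ```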
#[derive(Debug)]
pub struct RecordBuffer {
    reader: bcf::Reader,
    ringbuffer: VecDeque<bcf::Record>,
    ringbuffer2: VecDeque<bcf::Record>,
    overflow: Option<bcf::Record>,
}

unsafe impl Sync for RecordBuffer {}
unsafe impl Send for RecordBuffer {}

impl RecordBuffer {
    /// Create a new buffer from the given reader.
    pub fn new(reader: bcf::Reader) -> Self {
        RecordBuffer {
            reader,
            ringbuffer: VecDeque::new(),
            ringbuffer2: VecDeque::new(),
            overflow: None,
        }
    }

    fn last_rid(&self) -> Option<u32> {
        self.ringbuffer.back().map(|rec| rec.rid().unwrap())
    }

    fn next_rid(&self) -> Option<u32> {
        self.ringbuffer2.back().map(|rec| rec.rid().unwrap())
    }

    fn swap_buffers(&mut self) {
        // swap with buffer for next rid
        mem::swap(&mut self.ringbuffer2, &mut self.ringbuffer);
        // clear second buffer
        self.ringbuffer2.clear();
    }

    fn drain_left(&mut self, rid: u32, window_start: u64) -> usize {
        // Remove records that are too far left or come from the wrong rid.
        // rec.rid() always yields Some() here, because records without a rid are never put into
        // the buffer.
        let to_remove = self
            .ringbuffer
            .iter()
            .take_while(|rec| (rec.pos() as u64) < window_start || rec.rid().unwrap() != rid)
            .count();
        self.ringbuffer.drain(..to_remove);
        to_remove
    }

    /// Fill the buffer with variants in the given window. The start coordinate has to be to the
    /// right of the start coordinate of any previous `fetch` operation.
    /// Coordinates are 0-based, and `end` is exclusive.
    /// Returns a tuple with the numbers of records added and deleted compared to the previous
    /// fetch.
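    ///
    /// # Example
    ///
    /// A sketch of sliding a window to the right over successive calls; the file path and
    /// coordinates are placeholders.
    ///
    /// ```no_run
    /// use rust_htslib::bcf;
    /// use rust_htslib::bcf::buffer::RecordBuffer;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let reader = bcf::Reader::from_path("variants.bcf")?;
    /// let mut buffer = RecordBuffer::new(reader);
    ///
    /// // Windows must be fetched with non-decreasing start coordinates.
    /// for (start, end) in [(0, 1_000), (500, 1_500), (1_000, 2_000)] {
    ///     let (added, deleted) = buffer.fetch(b"1", start, end)?;
    ///     println!("[{start}, {end}): {} records (+{added}, -{deleted})", buffer.len());
    /// }
    /// # Ok(())
    /// # }
    /// ```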
    pub fn fetch(&mut self, chrom: &[u8], start: u64, end: u64) -> Result<(usize, usize)> {
        // TODO panic if start is left of previous start or we have moved past the given chrom
        // before.
        let rid = self.reader.header.name2rid(chrom)?;
        let mut added = 0;
        let mut deleted = 0;

        // shrink and swap
        match (self.last_rid(), self.next_rid()) {
            (Some(last_rid), _) => {
                if last_rid != rid {
                    deleted = self.ringbuffer.len();
                    self.swap_buffers();
                    added = self.ringbuffer.len();
                // TODO drain left?
                } else {
                    deleted = self.drain_left(rid, start);
                }
            }
            (_, Some(_)) => {
                // TODO is this really necessary? If there was no fetch before, there is nothing
                // to delete.
                deleted = self.ringbuffer.len();
                self.swap_buffers();
                deleted += self.drain_left(rid, start);
                added = self.ringbuffer.len();
            }
            _ => (),
        }

        if !self.ringbuffer2.is_empty() {
            // We have already read beyond the current rid. Hence we can't extend to the right for
            // this rid.
            return Ok((added, deleted));
        }

        // Move overflow from the last fetch into the ringbuffer.
        if let Some(rec) = self.overflow.take() {
            let pos = rec.pos() as u64;
            if pos >= start {
                if pos <= end {
                    self.ringbuffer.push_back(rec);
                    added += 1;
                } else {
                    // The window ends before the overflow record; keep it for a later fetch.
                    self.overflow = Some(rec);
                    return Ok((added, deleted));
                }
            }
            // Otherwise the record lies left of the window and is discarded.
        }

        // extend to the right
        loop {
            let mut rec = self.reader.empty_record();

            match self.reader.read(&mut rec) {
                // EOF
                None => break,
                // Propagate read errors instead of silently using a partially read record.
                Some(res) => res?,
            }
            let pos = rec.pos() as u64;
            if let Some(rec_rid) = rec.rid() {
                match rec_rid.cmp(&rid) {
                    Ordering::Equal => {
                        if pos >= end {
                            // Record is beyond our window. Store it anyway, but stop.
                            self.overflow = Some(rec);
                            break;
                        } else if pos >= start {
                            // Record is within our window.
                            self.ringbuffer.push_back(rec);
                            added += 1;
                        } else {
                            // Record is upstream of our window, ignore it.
                            continue;
                        }
                    }
                    Ordering::Greater => {
                        // Record comes from the next rid. Store it in the second buffer but stop
                        // filling.
                        self.ringbuffer2.push_back(rec);
                        break;
                    }
                    _ => {
                        // Record comes from a previous rid. Ignore it.
                        continue;
                    }
                }
            } else {
                // Skip records without a proper rid.
                continue;
            }
        }

        Ok((added, deleted))
    }

    /// Iterate over the records that have been fetched with `fetch`.
    pub fn iter(&self) -> vec_deque::Iter<'_, bcf::Record> {
        self.ringbuffer.iter()
    }

    /// Iterate over mutable references to the records that have been fetched with `fetch`.
    pub fn iter_mut(&mut self) -> vec_deque::IterMut<'_, bcf::Record> {
        self.ringbuffer.iter_mut()
    }

    /// Return the number of records currently held in the buffer.
    pub fn len(&self) -> usize {
        self.ringbuffer.len()
    }

    /// Return `true` if the buffer currently holds no records.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::bcf;

    #[test]
    fn test_buffer() {
        let reader = bcf::Reader::from_path("test/test.bcf").unwrap();
        let mut buffer = RecordBuffer::new(reader);

        buffer.fetch(b"1", 100, 10023).unwrap();
        {
            let records: Vec<_> = buffer.iter().collect();
            assert_eq!(records.len(), 2);
            assert_eq!(records[0].pos(), 10021);
            assert_eq!(records[1].pos(), 10022);
        }

        buffer.fetch(b"1", 10023, 10024).unwrap();
        {
            let records: Vec<_> = buffer.iter().collect();
            assert_eq!(records.len(), 1);
            assert_eq!(records[0].pos(), 10023);
        }
    }
}