rust_htslib/bcf/buffer.rs
// Copyright 2017 Johannes Köster.
// Licensed under the MIT license (http://opensource.org/licenses/MIT)
// This file may not be copied, modified, or distributed
// except according to those terms.

use std::cmp::Ordering;
use std::collections::{vec_deque, VecDeque};
use std::mem;

use crate::bcf::{self, Read};
use crate::errors::Result;

/// A buffer for BCF records. This allows accessing regions of a sorted BCF file while iterating
/// over it in a single pass.
/// The buffer is implemented as a ring buffer, such that extension or movement to the right has
/// linear complexity. The buffer does not use any indexed random access. Hence, to get a
/// region at the very end of the BCF file, you will have to wait until all records before it
/// have been read.
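///
/// # Example
///
/// A minimal usage sketch; the BCF path is hypothetical:
///
/// ```no_run
/// use rust_htslib::bcf;
/// use rust_htslib::bcf::buffer::RecordBuffer;
///
/// let reader = bcf::Reader::from_path("variants.bcf").expect("valid BCF");
/// let mut buffer = RecordBuffer::new(reader);
/// // Fetch all records on chromosome "1" within the 0-based, half-open window [1000, 2000).
/// buffer.fetch(b"1", 1000, 2000).expect("fetch succeeded");
/// for record in buffer.iter() {
///     println!("record at position {}", record.pos());
/// }
/// ```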
#[derive(Debug)]
pub struct RecordBuffer {
    reader: bcf::Reader,
    ringbuffer: VecDeque<bcf::Record>,
    ringbuffer2: VecDeque<bcf::Record>,
    overflow: Option<bcf::Record>,
}

unsafe impl Sync for RecordBuffer {}
unsafe impl Send for RecordBuffer {}

impl RecordBuffer {
    /// Create new buffer.
    pub fn new(reader: bcf::Reader) -> Self {
        RecordBuffer {
            reader,
            ringbuffer: VecDeque::new(),
            ringbuffer2: VecDeque::new(),
            overflow: None,
        }
    }

    fn last_rid(&self) -> Option<u32> {
        self.ringbuffer.back().map(|rec| rec.rid().unwrap())
    }

    fn next_rid(&self) -> Option<u32> {
        self.ringbuffer2.back().map(|rec| rec.rid().unwrap())
    }

    fn swap_buffers(&mut self) {
        // swap with buffer for next rid
        mem::swap(&mut self.ringbuffer2, &mut self.ringbuffer);
        // clear second buffer
        self.ringbuffer2.clear();
    }

    fn drain_left(&mut self, rid: u32, window_start: u64) -> usize {
        // remove records too far left or from wrong rid
        // rec.rid() will always yield Some(), because otherwise we would not have put the
        // record into the buffer.
        let to_remove = self
            .ringbuffer
            .iter()
            .take_while(|rec| (rec.pos() as u64) < window_start || rec.rid().unwrap() != rid)
            .count();
        self.ringbuffer.drain(..to_remove);
        to_remove
    }

    /// Fill the buffer with variants in the given window. The start coordinate has to lie to
    /// the right of the start coordinate of any previous `fetch` operation.
    /// Coordinates are 0-based, and end is exclusive.
    /// Returns a tuple with the numbers of added and deleted records compared to the previous
    /// fetch.
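    ///
    /// # Example
    ///
    /// A sketch of successive fetches; path and coordinates are illustrative:
    ///
    /// ```no_run
    /// # use rust_htslib::bcf;
    /// # use rust_htslib::bcf::buffer::RecordBuffer;
    /// let reader = bcf::Reader::from_path("variants.bcf").expect("valid BCF");
    /// let mut buffer = RecordBuffer::new(reader);
    /// buffer.fetch(b"1", 100, 200).expect("fetch succeeded");
    /// // A later window on the same contig; its start must not lie left of the previous start.
    /// let (added, deleted) = buffer.fetch(b"1", 150, 250).expect("fetch succeeded");
    /// println!("{} records entered the window, {} left it", added, deleted);
    /// ```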
    pub fn fetch(&mut self, chrom: &[u8], start: u64, end: u64) -> Result<(usize, usize)> {
        // TODO panic if start is left of previous start or we have moved past the given chrom
        // before.
        let rid = self.reader.header.name2rid(chrom)?;
        let mut added = 0;
        let mut deleted = 0;

        // shrink and swap
        match (self.last_rid(), self.next_rid()) {
            (Some(last_rid), _) => {
                if last_rid != rid {
                    deleted = self.ringbuffer.len();
                    self.swap_buffers();
                    added = self.ringbuffer.len();
                    // TODO drain left?
                } else {
                    deleted = self.drain_left(rid, start);
                }
            }
            (_, Some(_)) => {
                // TODO is this really necessary? If there was no fetch before, there is nothing
                // to delete.
                deleted = self.ringbuffer.len();
                self.swap_buffers();
                deleted += self.drain_left(rid, start);
                added = self.ringbuffer.len();
            }
            _ => (),
        }

        if !self.ringbuffer2.is_empty() {
            // We have already read beyond the current rid. Hence we can't extend to the right for
            // this rid.
            return Ok((added, deleted));
        }

        // Move overflow from the last fetch into the ring buffer.
        if let Some(pos) = self.overflow.as_ref().map(|rec| rec.pos() as u64) {
            if pos >= start {
                if pos < end {
                    // The overflow record lies within the new window (end is exclusive).
                    self.ringbuffer.push_back(self.overflow.take().unwrap());
                    added += 1;
                } else {
                    // The overflow record is still beyond the window; keep it for a later
                    // fetch and stop here.
                    return Ok((added, deleted));
                }
            } else {
                // The overflow record is upstream of the window; discard it.
                self.overflow.take();
            }
        }

        // extend to the right
        loop {
            let mut rec = self.reader.empty_record();

            if let Some(res) = self.reader.read(&mut rec) {
                // Propagate read errors instead of silently stopping.
                res?;
            } else {
                // EOF
                break;
            }
            let pos = rec.pos() as u64;
            if let Some(rec_rid) = rec.rid() {
                match rec_rid.cmp(&rid) {
                    Ordering::Equal => {
                        if pos >= end {
                            // Record is beyond our window. Store it anyway, but stop.
                            self.overflow = Some(rec);
                            break;
                        } else if pos >= start {
                            // Record is within our window.
                            self.ringbuffer.push_back(rec);
                            added += 1;
                        } else {
                            // Record is upstream of our window; ignore it.
                            continue;
                        }
                    }
                    Ordering::Greater => {
                        // Record belongs to the next rid. Store it in the second buffer, but
                        // stop filling.
                        self.ringbuffer2.push_back(rec);
                        break;
                    }
                    Ordering::Less => {
                        // Record belongs to a previous rid. Ignore it.
                        continue;
                    }
                }
            } else {
                // Skip records without a proper rid.
                continue;
            }
        }

        Ok((added, deleted))
    }

    /// Iterate over records that have been fetched with `fetch`.
    pub fn iter(&self) -> vec_deque::Iter<'_, bcf::Record> {
        self.ringbuffer.iter()
    }

    /// Iterate over mutable references to records that have been fetched with `fetch`.
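    ///
    /// # Example
    ///
    /// A sketch of in-place modification; the path and the edit itself are illustrative only:
    ///
    /// ```no_run
    /// # use rust_htslib::bcf;
    /// # use rust_htslib::bcf::buffer::RecordBuffer;
    /// let reader = bcf::Reader::from_path("variants.bcf").expect("valid BCF");
    /// let mut buffer = RecordBuffer::new(reader);
    /// buffer.fetch(b"1", 1000, 2000).expect("fetch succeeded");
    /// for record in buffer.iter_mut() {
    ///     // E.g. shift every record one base to the right.
    ///     let pos = record.pos();
    ///     record.set_pos(pos + 1);
    /// }
    /// ```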
    pub fn iter_mut(&mut self) -> vec_deque::IterMut<'_, bcf::Record> {
        self.ringbuffer.iter_mut()
    }

    /// Return the number of records currently held in the buffer.
    pub fn len(&self) -> usize {
        self.ringbuffer.len()
    }

    /// Return whether the buffer is empty.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::bcf;

    #[test]
    fn test_buffer() {
        let reader = bcf::Reader::from_path("test/test.bcf").unwrap();
        let mut buffer = RecordBuffer::new(reader);

        buffer.fetch(b"1", 100, 10023).unwrap();
        {
            let records: Vec<_> = buffer.iter().collect();
            assert_eq!(records.len(), 2);
            assert_eq!(records[0].pos(), 10021);
            assert_eq!(records[1].pos(), 10022);
        }

        buffer.fetch(b"1", 10023, 10024).unwrap();
        {
            let records: Vec<_> = buffer.iter().collect();
            assert_eq!(records.len(), 1);
            assert_eq!(records[0].pos(), 10023);
        }
    }
}
213}