rust_htslib/bam/
buffer.rs1use std::collections::{vec_deque, VecDeque};
7use std::mem;
8use std::rc::Rc;
9use std::str;
10
11use crate::bam;
12use crate::bam::Read;
13use crate::errors::{Error, Result};
/// A buffer of BAM records for a genomic window that is moved with `fetch`.
///
/// Records are read from an indexed BAM reader and stored as
/// `Rc<bam::Record>` in a `VecDeque`, in the order they were read.
#[derive(Debug)]
pub struct RecordBuffer {
    /// Indexed BAM reader the records are pulled from.
    reader: bam::IndexedReader,
    /// Buffered records, front = first record read.
    inner: VecDeque<Rc<bam::Record>>,
    /// A record that was read at or beyond the end of the last requested
    /// window; it is carried over to the next `fetch`.
    overflow: Option<Rc<bam::Record>>,
    /// Whether `cache_cigar()` is called on records kept by `fetch`.
    cache_cigar: bool,
    /// Distance threshold (bp) `fetch` uses to decide on a new indexed fetch;
    /// set to 1 by `new`, changeable via `set_min_refetch_distance`.
    min_refetch_distance: u64,
    /// Scratch record that reads go into; swapped for a fresh record whenever
    /// the one just read is kept.
    buffer_record: Rc<bam::Record>,
    /// Start-position bookkeeping consulted by `fetch` when deciding whether
    /// the reader has to be repositioned.
    start_pos: Option<u64>,
}
29
// NOTE(review): these impls assert thread-safety by hand. `RecordBuffer`
// stores `Rc<bam::Record>` values, and `Rc` is neither `Send` nor `Sync`
// because its reference count is not atomic. `iter()` / `iter_mut()` expose
// those `Rc`s, so a caller can clone one; moving or sharing the buffer across
// threads while such a clone is alive elsewhere is a data race on the count.
// Callers must not keep `Rc` clones of buffered records when the buffer
// crosses threads — TODO confirm this holds for all users. Switching the
// storage to `Arc` would make these impls unnecessary, but changes the
// public `iter()` / `iter_mut()` item types.
unsafe impl Sync for RecordBuffer {}
unsafe impl Send for RecordBuffer {}
32
33impl RecordBuffer {
34 pub fn new(bam: bam::IndexedReader, cache_cigar: bool) -> Self {
41 RecordBuffer {
42 reader: bam,
43 inner: VecDeque::new(),
44 overflow: None,
45 cache_cigar,
46 min_refetch_distance: 1,
47 buffer_record: Rc::new(bam::Record::new()),
48 start_pos: Some(0),
49 }
50 }
51
52 pub fn set_min_refetch_distance(&mut self, min_refetch_distance: u64) {
57 self.min_refetch_distance = min_refetch_distance;
58 }
59
60 pub fn start(&self) -> Option<u64> {
62 self.inner.front().map(|rec| rec.pos() as u64)
63 }
64
65 pub fn end(&self) -> Option<u64> {
67 self.inner.back().map(|rec| rec.pos() as u64)
68 }
69
70 pub fn tid(&self) -> Option<i32> {
71 self.inner.back().map(|rec| rec.tid())
72 }
73
74 #[allow(unused_assignments)] pub fn fetch(&mut self, chrom: &[u8], start: u64, end: u64) -> Result<(usize, usize)> {
80 let mut added = 0;
81 if self.overflow.is_some() {
83 added += 1;
84 self.inner.push_back(self.overflow.take().unwrap());
85 }
86
87 if let Some(tid) = self.reader.header.tid(chrom) {
88 let mut deleted = 0;
89 let window_start = start;
90 if self.inner.is_empty()
91 || window_start.saturating_sub(self.end().unwrap()) >= self.min_refetch_distance
92 || self.tid().unwrap() != tid as i32
93 || self.start().unwrap() > self.start_pos.unwrap()
94 {
95 let end = self.reader.header.target_len(tid).unwrap();
96 self.reader.fetch((tid, window_start, end))?;
97 deleted = self.inner.len();
98 self.inner.clear();
99 } else {
100 let to_remove = self
102 .inner
103 .iter()
104 .take_while(|rec| rec.pos() < window_start as i64)
105 .count();
106 for _ in 0..to_remove {
107 self.inner.pop_front();
108 }
109 deleted = to_remove;
110 }
111
112 loop {
114 match self
115 .reader
116 .read(Rc::get_mut(&mut self.buffer_record).unwrap())
117 {
118 None => break,
119 Some(res) => res?,
120 }
121
122 if self.buffer_record.is_unmapped() {
123 continue;
124 }
125
126 let pos = self.buffer_record.pos();
127
128 if pos < start as i64 {
130 continue;
131 }
132
133 let mut record = mem::replace(&mut self.buffer_record, Rc::new(bam::Record::new()));
136
137 if self.cache_cigar {
138 Rc::get_mut(&mut record).unwrap().cache_cigar();
139 }
140
141 if pos >= end as i64 {
142 self.overflow = Some(record);
143 break;
144 } else {
145 self.inner.push_back(record);
146 added += 1;
147 }
148 }
149 self.start_pos = Some(self.start().unwrap_or(window_start));
150
151 Ok((added, deleted))
152 } else {
153 Err(Error::UnknownSequence {
154 sequence: str::from_utf8(chrom).unwrap().to_owned(),
155 })
156 }
157 }
158
159 pub fn iter(&self) -> vec_deque::Iter<Rc<bam::Record>> {
161 self.inner.iter()
162 }
163
164 pub fn iter_mut(&mut self) -> vec_deque::IterMut<Rc<bam::Record>> {
166 self.inner.iter_mut()
167 }
168
169 pub fn len(&self) -> usize {
170 self.inner.len()
171 }
172
173 pub fn is_empty(&self) -> bool {
174 self.len() == 0
175 }
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bam;

    /// Fetching a small window on CHROMOSOME_I of the test BAM yields exactly
    /// the six reads that start at position 1.
    #[test]
    fn test_buffer() {
        let reader = bam::IndexedReader::from_path(&"test/test.bam").unwrap();
        let mut buffer = RecordBuffer::new(reader, false);

        buffer.fetch(b"CHROMOSOME_I", 1, 5).unwrap();

        let positions: Vec<i64> = buffer.iter().map(|rec| rec.pos()).collect();
        assert_eq!(positions, vec![1; 6]);
    }
}