// rust_htslib/bam/buffer.rs
// Copyright 2017 Johannes Köster.
// Licensed under the MIT license (http://opensource.org/licenses/MIT)
// This file may not be copied, modified, or distributed
// except according to those terms.
use std::collections::{vec_deque, VecDeque};
use std::mem;
use std::rc::Rc;
use std::str;
use crate::bam;
use crate::bam::Read;
use crate::errors::{Error, Result};
/// A buffer for BAM records. This allows access regions in a sorted BAM file while iterating
/// over it in a single pass.
/// The buffer is implemented as a ringbuffer, such that extension or movement to the right has
/// linear complexity. The buffer makes use of indexed random access. Hence, when fetching a
/// region at the very end of the BAM, everything before is omitted without cost.
#[derive(Debug)]
pub struct RecordBuffer {
// Indexed BAM reader used both for random-access fetches and sequential reads.
reader: bam::IndexedReader,
// Ringbuffer of records currently inside (or left of the end of) the fetch window.
inner: VecDeque<Rc<bam::Record>>,
// The first record read past the end of the current window; kept so the next
// `fetch` can reuse it instead of re-reading.
overflow: Option<Rc<bam::Record>>,
// Whether to call `bam::Record::cache_cigar()` on each record kept in the buffer.
cache_cigar: bool,
// Minimum distance between the old buffer end and a new window start at which a
// random-access fetch is performed instead of reading through sequentially.
min_refetch_distance: u64,
// Scratch record reused for reading; swapped out for a fresh one when kept.
buffer_record: Rc<bam::Record>,
// Start coordinate of the most recent fetch window (initially Some(0)).
start_pos: Option<u64>,
}
// SAFETY(review): `Rc<bam::Record>` uses non-atomic reference counts, so
// `RecordBuffer` is not inherently thread-safe. These impls are only sound if a
// `RecordBuffer` (and any `Rc` clones handed out by `iter`) is never accessed
// from more than one thread at a time — TODO confirm all users uphold this.
unsafe impl Sync for RecordBuffer {}
unsafe impl Send for RecordBuffer {}
impl RecordBuffer {
    /// Create a new `RecordBuffer`.
    ///
    /// # Arguments
    ///
    /// * `bam` - indexed BAM reader
    /// * `cache_cigar` - whether to call `bam::Record::cache_cigar()` for each record.
    pub fn new(bam: bam::IndexedReader, cache_cigar: bool) -> Self {
        RecordBuffer {
            reader: bam,
            inner: VecDeque::new(),
            overflow: None,
            cache_cigar,
            min_refetch_distance: 1,
            buffer_record: Rc::new(bam::Record::new()),
            start_pos: Some(0),
        }
    }

    /// Set the minimum distance to the previous fetch window at which a new
    /// random-access fetch operation is performed. If the distance is smaller,
    /// the buffer will simply read through until the start of the new fetch
    /// window (probably saving some time by avoiding the random access).
    pub fn set_min_refetch_distance(&mut self, min_refetch_distance: u64) {
        self.min_refetch_distance = min_refetch_distance;
    }

    /// Return the 0-based start position of the first (leftmost) record in the
    /// buffer, or `None` if the buffer is empty.
    pub fn start(&self) -> Option<u64> {
        self.inner.front().map(|rec| rec.pos() as u64)
    }

    /// Return the 0-based start position of the last (rightmost) record in the
    /// buffer, or `None` if the buffer is empty.
    ///
    /// NOTE(review): this is the *start* coordinate of the last record, not its
    /// alignment end — callers (including `fetch` below) rely on exactly this.
    pub fn end(&self) -> Option<u64> {
        self.inner.back().map(|rec| rec.pos() as u64)
    }

    /// Return the target id (tid) of the buffered records, or `None` if empty.
    pub fn tid(&self) -> Option<i32> {
        self.inner.back().map(|rec| rec.tid())
    }

    /// Fill buffer at the given interval. If the start coordinate is left of
    /// the previous start coordinate, this will use an additional BAM fetch IO
    /// operation. Coordinates are 0-based, and `end` is exclusive.
    ///
    /// Returns a tuple with the numbers of added and deleted records since the
    /// previous fetch, or `Error::UnknownSequence` if `chrom` is not in the header.
    pub fn fetch(&mut self, chrom: &[u8], start: u64, end: u64) -> Result<(usize, usize)> {
        let mut added = 0;
        // Move the overflow record from the last fetch into the ringbuffer.
        if let Some(record) = self.overflow.take() {
            self.inner.push_back(record);
            added += 1;
        }
        if let Some(tid) = self.reader.header.tid(chrom) {
            let window_start = start;
            // Decide whether the current buffer contents can be reused (read
            // through sequentially) or a fresh random-access fetch is needed.
            let deleted = if self.inner.is_empty()
                || window_start.saturating_sub(self.end().unwrap()) >= self.min_refetch_distance
                || self.tid().unwrap() != tid as i32
                || self.start().unwrap() > self.start_pos.unwrap()
            {
                // Random access: fetch from window_start up to the end of the
                // target sequence; records right of `end` are cut off below.
                // (Renamed from `end` to avoid shadowing the parameter.)
                let target_end = self.reader.header.target_len(tid).unwrap();
                self.reader.fetch((tid, window_start, target_end))?;
                let removed = self.inner.len();
                self.inner.clear();
                removed
            } else {
                // Read-through: drop buffered records left of the new window.
                let to_remove = self
                    .inner
                    .iter()
                    .take_while(|rec| rec.pos() < window_start as i64)
                    .count();
                // O(n) bulk removal instead of popping one record at a time.
                self.inner.drain(..to_remove);
                to_remove
            };
            // Extend to the right until a record starts right of the window.
            loop {
                match self
                    .reader
                    .read(Rc::get_mut(&mut self.buffer_record).unwrap())
                {
                    None => break, // no more records
                    Some(res) => res?,
                }
                if self.buffer_record.is_unmapped() {
                    continue;
                }
                let pos = self.buffer_record.pos();
                // Skip records that start before the window.
                if pos < start as i64 {
                    continue;
                }
                // Record is kept; do not reuse its allocation for the next
                // iteration but swap in a fresh scratch record.
                let mut record = mem::replace(&mut self.buffer_record, Rc::new(bam::Record::new()));
                if self.cache_cigar {
                    Rc::get_mut(&mut record).unwrap().cache_cigar();
                }
                if pos >= end as i64 {
                    // First record right of the window: keep for the next fetch.
                    self.overflow = Some(record);
                    break;
                } else {
                    self.inner.push_back(record);
                    added += 1;
                }
            }
            self.start_pos = Some(self.start().unwrap_or(window_start));
            Ok((added, deleted))
        } else {
            Err(Error::UnknownSequence {
                // Lossy conversion instead of unwrap: a non-UTF-8 chrom name
                // should produce a readable error, not a panic.
                sequence: String::from_utf8_lossy(chrom).into_owned(),
            })
        }
    }

    /// Iterate over records that have been fetched with `fetch`.
    pub fn iter(&self) -> vec_deque::Iter<Rc<bam::Record>> {
        self.inner.iter()
    }

    /// Iterate over mutable references to records that have been fetched with `fetch`.
    pub fn iter_mut(&mut self) -> vec_deque::IterMut<Rc<bam::Record>> {
        self.inner.iter_mut()
    }

    /// Number of records currently held in the buffer.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Whether the buffer currently holds no records.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bam;

    #[test]
    fn test_buffer() {
        // Fetch a small window and verify that exactly six records land in the
        // buffer, each starting at position 1.
        let reader = bam::IndexedReader::from_path(&"test/test.bam").unwrap();
        let mut buffer = RecordBuffer::new(reader, false);
        buffer.fetch(b"CHROMOSOME_I", 1, 5).unwrap();

        assert_eq!(buffer.len(), 6);
        for rec in buffer.iter() {
            assert_eq!(rec.pos(), 1);
        }
    }
}