// rust_htslib/bcf/buffer.rs
// Copyright 2017 Johannes Köster.
// Licensed under the MIT license (http://opensource.org/licenses/MIT)
// This file may not be copied, modified, or distributed
// except according to those terms.
use std::cmp::Ordering;
use std::collections::{vec_deque, VecDeque};
use std::mem;
use crate::bcf::{self, Read};
use crate::errors::Result;
/// A buffer for BCF records. This allows accessing regions in a sorted BCF file while iterating
/// over it in a single pass.
/// The buffer is implemented as a ringbuffer, such that extension or movement to the right has
/// linear complexity. The buffer does not use any indexed random access. Hence, for getting a
/// region at the very end of the BCF, you will have to wait until all records before have
/// been read.
#[derive(Debug)]
pub struct RecordBuffer {
/// Reader the records are pulled from, strictly front to back.
reader: bcf::Reader,
/// Records of the currently fetched window; this is what `iter`, `iter_mut` and `len` expose.
ringbuffer: VecDeque<bcf::Record>,
/// Holding area for a record that was read while extending the window but belongs to a
/// later rid. It is swapped into `ringbuffer` by a subsequent `fetch`.
ringbuffer2: VecDeque<bcf::Record>,
/// Record of the requested rid that was read but lies at or beyond the end of the window.
/// It is kept so that a subsequent `fetch` can still consider it.
overflow: Option<bcf::Record>,
}
// NOTE(review): these impls assert that `RecordBuffer` may be shared between and moved across
// threads. Nothing in this file establishes why that is sound; it presumably relies on
// `bcf::Reader` and `bcf::Record` tolerating such use — confirm against their implementations.
unsafe impl Sync for RecordBuffer {}
unsafe impl Send for RecordBuffer {}
impl RecordBuffer {
    /// Create a new, empty buffer that pulls its records from `reader`.
    pub fn new(reader: bcf::Reader) -> Self {
        RecordBuffer {
            reader,
            ringbuffer: VecDeque::new(),
            ringbuffer2: VecDeque::new(),
            overflow: None,
        }
    }

    /// Rid of the right-most record in the current window, or `None` if the window is empty.
    fn last_rid(&self) -> Option<u32> {
        self.ringbuffer
            .back()
            .map(|rec| rec.rid().expect("records without rid are never buffered"))
    }

    /// Rid of the right-most record held back for a later rid, or `None` if there is none.
    fn next_rid(&self) -> Option<u32> {
        self.ringbuffer2
            .back()
            .map(|rec| rec.rid().expect("records without rid are never buffered"))
    }

    /// Make the records held back for the next rid the current window and empty the
    /// holding area.
    fn swap_buffers(&mut self) {
        // swap with buffer for next rid
        mem::swap(&mut self.ringbuffer2, &mut self.ringbuffer);
        // clear second buffer
        self.ringbuffer2.clear();
    }

    /// Remove the leading records that start before `window_start` or belong to a rid other
    /// than `rid`. Returns the number of removed records.
    fn drain_left(&mut self, rid: u32, window_start: u64) -> usize {
        // rec.rid() always yields Some() here, because records without rid are never put
        // into the buffer.
        let to_remove = self
            .ringbuffer
            .iter()
            .take_while(|rec| {
                (rec.pos() as u64) < window_start
                    || rec.rid().expect("records without rid are never buffered") != rid
            })
            .count();
        self.ringbuffer.drain(..to_remove);
        to_remove
    }

    /// Fill the buffer with variants in the given window. The start coordinate has to be right of
    /// the start coordinate of any previous `fetch` operation.
    /// Coordinates are 0-based, and end is exclusive.
    /// Returns tuple with numbers of added and deleted records compared to previous fetch.
    ///
    /// # Errors
    ///
    /// Returns an error if `chrom` cannot be resolved to a rid via the reader's header, or if
    /// reading a record from the underlying reader fails.
    pub fn fetch(&mut self, chrom: &[u8], start: u64, end: u64) -> Result<(usize, usize)> {
        // TODO panic if start is left of previous start or we have moved past the given chrom
        // before.
        let rid = self.reader.header.name2rid(chrom)?;
        let mut added = 0;
        let mut deleted = 0;

        // shrink and swap
        match (self.last_rid(), self.next_rid()) {
            (Some(last_rid), _) => {
                if last_rid != rid {
                    deleted = self.ringbuffer.len();
                    self.swap_buffers();
                    added = self.ringbuffer.len();
                    // TODO drain left?
                } else {
                    deleted = self.drain_left(rid, start);
                }
            }
            (_, Some(_)) => {
                // TODO is this really necessary? If there was no fetch before, there is nothing
                // to delete.
                deleted = self.ringbuffer.len();
                self.swap_buffers();
                deleted += self.drain_left(rid, start);
                added = self.ringbuffer.len();
            }
            _ => (),
        }

        if !self.ringbuffer2.is_empty() {
            // We have already read beyond the current rid. Hence we can't extend to the right for
            // this rid.
            return Ok((added, deleted));
        }

        // Reconsider the overflow record left behind by the previous fetch.
        let stashed = self
            .overflow
            .as_ref()
            .map(|rec| (rec.rid(), rec.pos() as u64));
        if let Some((stashed_rid, pos)) = stashed {
            match stashed_rid.map(|r| r.cmp(&rid)) {
                Some(Ordering::Equal) if pos >= start => {
                    // `end` is exclusive, exactly as in the read loop below.
                    if pos < end {
                        if let Some(rec) = self.overflow.take() {
                            self.ringbuffer.push_back(rec);
                            added += 1;
                        }
                    } else {
                        // Still right of the window: the reader is already past it, so
                        // there is nothing to extend with.
                        return Ok((added, deleted));
                    }
                }
                Some(Ordering::Greater) => {
                    // The stashed record belongs to a later rid, i.e. the reader has already
                    // moved past the requested rid. Keep it for a later fetch.
                    return Ok((added, deleted));
                }
                _ => {
                    // Upstream of the window or from an earlier rid: discard overflow.
                    self.overflow = None;
                }
            }
        }

        // extend to the right
        loop {
            let mut rec = self.reader.empty_record();
            match self.reader.read(&mut rec) {
                // EOF
                None => break,
                // Do not treat a failed read as a valid record; report it to the caller.
                Some(outcome) => outcome?,
            }

            let pos = rec.pos() as u64;
            let rec_rid = match rec.rid() {
                Some(rec_rid) => rec_rid,
                // skip records without proper rid
                None => continue,
            };

            match rec_rid.cmp(&rid) {
                Ordering::Equal => {
                    if pos >= end {
                        // Record is beyond our window. Store it anyways but stop.
                        self.overflow = Some(rec);
                        break;
                    } else if pos >= start {
                        // Record is within our window.
                        self.ringbuffer.push_back(rec);
                        added += 1;
                    }
                    // Otherwise the record is upstream of our window and is ignored.
                }
                Ordering::Greater => {
                    // Record comes from next rid. Store it in second buffer but stop filling.
                    self.ringbuffer2.push_back(rec);
                    break;
                }
                Ordering::Less => {
                    // Record comes from previous rid. Ignore it.
                }
            }
        }

        Ok((added, deleted))
    }

    /// Iterate over records that have been fetched with `fetch`.
    pub fn iter(&self) -> vec_deque::Iter<'_, bcf::Record> {
        self.ringbuffer.iter()
    }

    /// Iterate over mutable references to records that have been fetched with `fetch`.
    pub fn iter_mut(&mut self) -> vec_deque::IterMut<'_, bcf::Record> {
        self.ringbuffer.iter_mut()
    }

    /// Number of records in the currently fetched window.
    pub fn len(&self) -> usize {
        self.ringbuffer.len()
    }

    /// Whether the currently fetched window holds no records.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bcf;

    /// Slide a window over the test BCF and check which record positions end up buffered.
    #[test]
    fn test_buffer() {
        let mut buffer = RecordBuffer::new(bcf::Reader::from_path(&"test/test.bcf").unwrap());

        // First window: expect exactly the records at 10021 and 10022, in that order.
        buffer.fetch(b"1", 100, 10023).unwrap();
        let first_window: Vec<_> = buffer.iter().map(|rec| rec.pos()).collect();
        assert_eq!(first_window, vec![10021, 10022]);

        // Second window starts where the first one ended: expect only the record at 10023.
        buffer.fetch(b"1", 10023, 10024).unwrap();
        let second_window: Vec<_> = buffer.iter().map(|rec| rec.pos()).collect();
        assert_eq!(second_window, vec![10023]);
    }
}