buf_redux/lib.rs
1// Original implementation Copyright 2013 The Rust Project Developers <https://github.com/rust-lang>
2//
// Original source file: https://github.com/rust-lang/rust/blob/master/src/libstd/io/buffered.rs
4//
5// Additions copyright 2016-2018 Austin Bonander <austin.bonander@gmail.com>
6//
7// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
8// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
9// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
10// option. This file may not be copied, modified, or distributed
11// except according to those terms.
12//! Drop-in replacements for buffered I/O types in `std::io`.
13//!
14//! These replacements retain the method names/signatures and implemented traits of their stdlib
15//! counterparts, making replacement as simple as swapping the import of the type:
16//!
17//! #### `BufReader`:
18//! ```notest
19//! - use std::io::BufReader;
20//! + use buf_redux::BufReader;
21//! ```
22//! #### `BufWriter`:
23//! ```notest
24//! - use std::io::BufWriter;
25//! + use buf_redux::BufWriter;
26//! ```
27//! #### `LineWriter`:
28//! ```notest
29//! - use std::io::LineWriter;
30//! + use buf_redux::LineWriter;
31//! ```
32//!
33//! ### More Direct Control
34//! All replacement types provide methods to:
35//!
36//! * Increase the capacity of the buffer
37//! * Get the number of available bytes as well as the total capacity of the buffer
38//! * Consume the wrapper without losing data
39//!
40//! `BufReader` provides methods to:
41//!
42//! * Access the buffer through an `&`-reference without performing I/O
43//! * Force unconditional reads into the buffer
44//! * Get a `Read` adapter which empties the buffer and then pulls from the inner reader directly
45//! * Shuffle bytes down to the beginning of the buffer to make room for more reading
46//! * Get inner reader and trimmed buffer with the remaining data
47//!
//! `BufWriter` and `LineWriter` provide methods to:
49//!
50//! * Flush the buffer and unwrap the inner writer unconditionally.
51//! * Get the inner writer and trimmed buffer with the unflushed data.
52//!
53//! ### More Sensible and Customizable Buffering Behavior
54//! Tune the behavior of the buffer to your specific use-case using the types in the
55//! [`policy` module]:
56//!
57//! * Refine `BufReader`'s behavior by implementing the [`ReaderPolicy` trait] or use
58//! an existing implementation like [`MinBuffered`] to ensure the buffer always contains
59//! a minimum number of bytes (until the underlying reader is empty).
60//!
61//! * Refine `BufWriter`'s behavior by implementing the [`WriterPolicy` trait]
62//! or use an existing implementation like [`FlushOn`] to flush when a particular byte
63//! appears in the buffer (used to implement [`LineWriter`]).
64//!
65//! [`policy` module]: policy
66//! [`ReaderPolicy` trait]: policy::ReaderPolicy
67//! [`MinBuffered`]: policy::MinBuffered
//! [`WriterPolicy` trait]: policy::WriterPolicy
69//! [`FlushOn`]: policy::FlushOn
70//! [`LineWriter`]: LineWriter
71//!
72//! ### Making Room
73//! The buffered types of this crate and their `std::io` counterparts, by default, use `Box<[u8]>`
74//! as their buffer types ([`Buffer`](Buffer) is included as well since it is used internally
75//! by the other types in this crate).
76//!
77//! When one of these types inserts bytes into its buffer, via `BufRead::fill_buf()` (implicitly
78//! called by `Read::read()`) in `BufReader`'s case or `Write::write()` in `BufWriter`'s case,
79//! the entire buffer is provided to be read/written into and the number of bytes written is saved.
80//! The read/written data then resides in the `[0 .. bytes_inserted]` slice of the buffer.
81//!
82//! When bytes are consumed from the buffer, via `BufRead::consume()` or `Write::flush()`,
83//! the number of bytes consumed is added to the start of the slice such that the remaining
84//! data resides in the `[bytes_consumed .. bytes_inserted]` slice of the buffer.
85//!
86//! The `std::io` buffered types, and their counterparts in this crate with their default policies,
87//! don't have to deal with partially filled buffers as `BufReader` only reads when empty and
88//! `BufWriter` only flushes when full.
89//!
90//! However, because the replacements in this crate are capable of reading on-demand and flushing
91//! less than a full buffer, they can run out of room in their buffers to read/write data into even
92//! though there is technically free space, because this free space is at the head of the buffer
93//! where reading into it would cause the data in the buffer to become non-contiguous.
94//!
95//! This isn't technically a problem as the buffer could operate like `VecDeque` in `std` and return
//! both slices at once, but this would not fit all use-cases: the `BufRead::fill_buf()` interface only
97//! allows one slice to be returned at a time so the older data would need to be completely consumed
98//! before the newer data can be returned; `BufWriter` could support it as the `Write` interface
99//! doesn't make an opinion on how the buffer works, but because the data would be non-contiguous
100//! it would require two flushes to get it all, which could degrade performance.
101//!
102//! The obvious solution, then, is to move the existing data down to the beginning of the buffer
103//! when there is no more room at the end so that more reads/writes into the buffer can be issued.
104//! This works, and may suit some use-cases where the amount of data left is small and thus copying
105//! it would be inexpensive, but it is non-optimal. However, this option is provided
106//! as the `.make_room()` methods, and is utilized by [`policy::MinBuffered`](policy::MinBuffered)
107//! and [`policy::FlushExact`](policy::FlushExact).
108//!
109//! ### Ringbuffers / `slice-deque` Feature
110//! Instead of moving data, however, it is also possible to use virtual-memory tricks to
111//! allocate a ringbuffer that loops around on itself in memory and thus is always contiguous,
112//! as described in [the Wikipedia article on Ringbuffers][ringbuf-wikipedia].
113//!
114//! This is the exact trick used by [the `slice-deque` crate](https://crates.io/crates/slice-deque),
115//! which is now provided as an optional feature `slice-deque` exposed via the
116//! `new_ringbuf()` and `with_capacity_ringbuf()` constructors added to the buffered types here.
117//! When a buffered type is constructed using one of these functions, `.make_room()` is turned into
118//! a no-op as consuming bytes from the head of the buffer simultaneously makes room at the tail.
119//! However, this has some caveats:
120//!
121//! * It is only available on target platforms with virtual memory support, namely fully fledged
122//! OSes such as Windows and Unix-derivative platforms like Linux, OS X, BSD variants, etc.
123//!
124//! * The default capacity varies based on platform, and custom capacities are rounded up to a
125//! multiple of their minimum size, typically the page size of the platform.
126//! Windows' minimum size is comparably quite large (**64 KiB**) due to some legacy reasons,
127//! so this may be less optimal than the default capacity for a normal buffer (8 KiB) for some
128//! use-cases.
129//!
130//! * Due to the nature of the virtual-memory trick, the virtual address space the buffer
131//! allocates will be double its capacity. This means that your program will *appear* to use more
132//! memory than it would if it was using a normal buffer of the same capacity. The physical memory
133//! usage will be the same in both cases, but if address space is at a premium in your application
134//! (32-bit targets) then this may be a concern.
135//!
136//! [ringbuf-wikipedia]: https://en.wikipedia.org/wiki/Circular_buffer#Optimization
137#![warn(missing_docs)]
138#![cfg_attr(feature = "nightly", feature(alloc, read_initializer, specialization))]
139#![cfg_attr(all(test, feature = "nightly"), feature(io, test))]
140
141extern crate memchr;
142
143extern crate safemem;
144
145use std::any::Any;
146use std::cell::RefCell;
147use std::io::prelude::*;
148use std::io::SeekFrom;
149use std::mem::ManuallyDrop;
150use std::{cmp, error, fmt, io, ptr};
151
152#[cfg(all(feature = "nightly", test))]
153mod benches;
154
155// std::io's tests require exact allocation which slice_deque cannot provide
156#[cfg(test)]
157mod std_tests;
158
159#[cfg(all(test, feature = "slice-deque"))]
160mod ringbuf_tests;
161
162#[cfg(feature = "nightly")]
163mod nightly;
164
165#[cfg(feature = "nightly")]
166use nightly::init_buffer;
167
168mod buffer;
169
170use buffer::BufImpl;
171
172pub mod policy;
173
174use self::policy::{ReaderPolicy, WriterPolicy, StdPolicy, FlushOnNewline};
175
/// Default buffer capacity in bytes (8 KiB), used by `BufReader::new()` and
/// `BufReader::new_ringbuf()` when no explicit capacity is requested.
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
177
/// A drop-in replacement for `std::io::BufReader` with more functionality.
///
/// Original method names/signatures and implemented traits are left untouched,
/// making replacement as simple as swapping the import of the type.
///
/// By default this type implements the behavior of its `std` counterpart: it only reads into
/// the buffer when it is empty.
///
/// To change this type's behavior, change the policy with [`.set_policy()`] using a type
/// from the [`policy` module] or your own implementation of [`ReaderPolicy`].
///
/// Policies that perform alternating reads and consumes without completely emptying the buffer
/// may benefit from using a ringbuffer via the [`new_ringbuf()`] and [`with_capacity_ringbuf()`]
/// constructors. Ringbuffers are only available on supported platforms with the
/// `slice-deque` feature and have some other caveats; see [the crate root docs][ringbufs-root]
/// for more details.
///
/// [`.set_policy()`]: BufReader::set_policy
/// [`policy` module]: policy
/// [`ReaderPolicy`]: policy::ReaderPolicy
/// [`new_ringbuf()`]: BufReader::new_ringbuf
/// [`with_capacity_ringbuf()`]: BufReader::with_capacity_ringbuf
/// [ringbufs-root]: index.html#ringbuffers--slice-deque-feature
pub struct BufReader<R, P = StdPolicy>{
    // First field for null pointer optimization.
    // Holds the bytes that were read from `inner` but not yet consumed.
    buf: Buffer,
    // The wrapped value; it is read from whenever `R: Read`.
    inner: R,
    // Consulted before each buffered read (`before_read`) and notified after each
    // consume (`after_consume`).
    policy: P,
}
207
208impl<R> BufReader<R, StdPolicy> {
209 /// Create a new `BufReader` wrapping `inner`, utilizing a buffer of
210 /// default capacity and the default [`ReaderPolicy`](policy::ReaderPolicy).
211 pub fn new(inner: R) -> Self {
212 Self::with_capacity(DEFAULT_BUF_SIZE, inner)
213 }
214
215 /// Create a new `BufReader` wrapping `inner`, utilizing a buffer with a capacity
216 /// of *at least* `cap` bytes and the default [`ReaderPolicy`](policy::ReaderPolicy).
217 ///
218 /// The actual capacity of the buffer may vary based on implementation details of the global
219 /// allocator.
220 pub fn with_capacity(cap: usize, inner: R) -> Self {
221 Self::with_buffer(Buffer::with_capacity(cap), inner)
222 }
223
224 /// Create a new `BufReader` wrapping `inner`, utilizing a ringbuffer with the default capacity
225 /// and `ReaderPolicy`.
226 ///
227 /// A ringbuffer never has to move data to make room; consuming bytes from the head
228 /// simultaneously makes room at the tail. This is useful in conjunction with a policy like
229 /// [`MinBuffered`](policy::MinBuffered) to ensure there is always room to read more data
230 /// if necessary, without expensive copying operations.
231 ///
232 /// Only available on platforms with virtual memory support and with the `slice-deque` feature
233 /// enabled. The default capacity will differ between Windows and Unix-derivative targets.
234 /// See [`Buffer::new_ringbuf()`](struct.Buffer.html#method.new_ringbuf)
235 /// or [the crate root docs](index.html#ringbuffers--slice-deque-feature) for more info.
236 #[cfg(feature = "slice-deque")]
237 pub fn new_ringbuf(inner: R) -> Self {
238 Self::with_capacity_ringbuf(DEFAULT_BUF_SIZE, inner)
239 }
240
241 /// Create a new `BufReader` wrapping `inner`, utilizing a ringbuffer with *at least* the given
242 /// capacity and the default `ReaderPolicy`.
243 ///
244 /// A ringbuffer never has to move data to make room; consuming bytes from the head
245 /// simultaneously makes room at the tail. This is useful in conjunction with a policy like
246 /// [`MinBuffered`](policy::MinBuffered) to ensure there is always room to read more data
247 /// if necessary, without expensive copying operations.
248 ///
249 /// Only available on platforms with virtual memory support and with the `slice-deque` feature
250 /// enabled. The capacity will be rounded up to the minimum size for the target platform.
251 /// See [`Buffer::with_capacity_ringbuf()`](struct.Buffer.html#method.with_capacity_ringbuf)
252 /// or [the crate root docs](index.html#ringbuffers--slice-deque-feature) for more info.
253 #[cfg(feature = "slice-deque")]
254 pub fn with_capacity_ringbuf(cap: usize, inner: R) -> Self {
255 Self::with_buffer(Buffer::with_capacity_ringbuf(cap), inner)
256 }
257
258 /// Wrap `inner` with an existing `Buffer` instance and the default `ReaderPolicy`.
259 ///
260 /// ### Note
261 /// Does **not** clear the buffer first! If there is data already in the buffer
262 /// then it will be returned in `read()` and `fill_buf()` ahead of any data from `inner`.
263 pub fn with_buffer(buf: Buffer, inner: R) -> Self {
264 BufReader {
265 buf, inner, policy: StdPolicy
266 }
267 }
268}
269
270impl<R, P> BufReader<R, P> {
271 /// Apply a new `ReaderPolicy` to this `BufReader`, returning the transformed type.
272 pub fn set_policy<P_: ReaderPolicy>(self, policy: P_) -> BufReader<R, P_> {
273 BufReader {
274 inner: self.inner,
275 buf: self.buf,
276 policy
277 }
278 }
279
280 /// Mutate the current [`ReaderPolicy`](policy::ReaderPolicy) in-place.
281 ///
282 /// If you want to change the type, use `.set_policy()`.
283 pub fn policy_mut(&mut self) -> &mut P { &mut self.policy }
284
285 /// Inspect the current `ReaderPolicy`.
286 pub fn policy(&self) -> &P {
287 &self.policy
288 }
289
290 /// Move data to the start of the buffer, making room at the end for more
291 /// reading.
292 ///
293 /// This is a no-op with the `*_ringbuf()` constructors (requires `slice-deque` feature).
294 pub fn make_room(&mut self) {
295 self.buf.make_room();
296 }
297
298 /// Ensure room in the buffer for *at least* `additional` bytes. May not be
299 /// quite exact due to implementation details of the buffer's allocator.
300 pub fn reserve(&mut self, additional: usize) {
301 self.buf.reserve(additional);
302 }
303
304 // RFC: pub fn shrink(&mut self, new_len: usize) ?
305
306 /// Get the section of the buffer containing valid data; may be empty.
307 ///
308 /// Call `.consume()` to remove bytes from the beginning of this section.
309 pub fn buffer(&self) -> &[u8] {
310 self.buf.buf()
311 }
312
313 /// Get the current number of bytes available in the buffer.
314 pub fn buf_len(&self) -> usize {
315 self.buf.len()
316 }
317
318 /// Get the total buffer capacity.
319 pub fn capacity(&self) -> usize {
320 self.buf.capacity()
321 }
322
323 /// Get an immutable reference to the underlying reader.
324 pub fn get_ref(&self) -> &R { &self.inner }
325
326 /// Get a mutable reference to the underlying reader.
327 ///
328 /// ## Note
329 /// Reading directly from the underlying reader is not recommended, as some
330 /// data has likely already been moved into the buffer.
331 pub fn get_mut(&mut self) -> &mut R { &mut self.inner }
332
333 /// Consume `self` and return the inner reader only.
334 pub fn into_inner(self) -> R {
335 self.inner
336 }
337
338 /// Consume `self` and return both the underlying reader and the buffer.
339 ///
340 /// See also: `BufReader::unbuffer()`
341 pub fn into_inner_with_buffer(self) -> (R, Buffer) {
342 (self.inner, self.buf)
343 }
344
345 /// Consume `self` and return an adapter which implements `Read` and will
346 /// empty the buffer before reading directly from the underlying reader.
347 pub fn unbuffer(self) -> Unbuffer<R> {
348 Unbuffer {
349 inner: self.inner,
350 buf: Some(self.buf),
351 }
352 }
353}
354
355impl<R, P: ReaderPolicy> BufReader<R, P> {
356 #[inline]
357 fn should_read(&mut self) -> bool {
358 self.policy.before_read(&mut self.buf).0
359 }
360}
361
362impl<R: Read, P> BufReader<R, P> {
363 /// Unconditionally perform a read into the buffer.
364 ///
365 /// Does not invoke `ReaderPolicy` methods.
366 ///
367 /// If the read was successful, returns the number of bytes read.
368 pub fn read_into_buf(&mut self) -> io::Result<usize> {
369 self.buf.read_from(&mut self.inner)
370 }
371
372 /// Box the inner reader without losing data.
373 pub fn boxed<'a>(self) -> BufReader<Box<Read + 'a>, P> where R: 'a {
374 let inner: Box<Read + 'a> = Box::new(self.inner);
375
376 BufReader {
377 inner,
378 buf: self.buf,
379 policy: self.policy,
380 }
381 }
382}
383
384impl<R: Read, P: ReaderPolicy> Read for BufReader<R, P> {
385 fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
386 // If we don't have any buffered data and we're doing a read matching
387 // or exceeding the internal buffer's capacity, bypass the buffer.
388 if self.buf.is_empty() && out.len() >= self.buf.capacity() {
389 return self.inner.read(out);
390 }
391
392 let nread = self.fill_buf()?.read(out)?;
393 self.consume(nread);
394 Ok(nread)
395 }
396}
397
398impl<R: Read, P: ReaderPolicy> BufRead for BufReader<R, P> {
399 fn fill_buf(&mut self) -> io::Result<&[u8]> {
400 // If we've reached the end of our internal buffer then we need to fetch
401 // some more data from the underlying reader.
402 // This execution order is important; the policy may want to resize the buffer or move data
403 // before reading into it.
404 while self.should_read() && self.buf.usable_space() > 0 {
405 if self.read_into_buf()? == 0 { break; };
406 }
407
408 Ok(self.buffer())
409 }
410
411 fn consume(&mut self, mut amt: usize) {
412 amt = cmp::min(amt, self.buf_len());
413 self.buf.consume(amt);
414 self.policy.after_consume(&mut self.buf, amt);
415 }
416}
417
418impl<R: fmt::Debug, P: fmt::Debug> fmt::Debug for BufReader<R, P> {
419 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
420 fmt.debug_struct("buf_redux::BufReader")
421 .field("reader", &self.inner)
422 .field("buf_len", &self.buf_len())
423 .field("capacity", &self.capacity())
424 .field("policy", &self.policy)
425 .finish()
426 }
427}
428
429impl<R: Seek, P: ReaderPolicy> Seek for BufReader<R, P> {
430 /// Seek to an ofPet, in bytes, in the underlying reader.
431 ///
432 /// The position used for seeking with `SeekFrom::Current(_)` is the
433 /// position the underlying reader would be at if the `BufReader` had no
434 /// internal buffer.
435 ///
436 /// Seeking always discards the internal buffer, even if the seek position
437 /// would otherwise fall within it. This guarantees that calling
438 /// `.unwrap()` immediately after a seek yields the underlying reader at
439 /// the same position.
440 ///
441 /// See `std::io::Seek` for more details.
442 ///
443 /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
444 /// where `n` minus the internal buffer length underflows an `i64`, two
445 /// seeks will be performed instead of one. If the second seek returns
446 /// `Err`, the underlying reader will be left at the same position it would
447 /// have if you seeked to `SeekFrom::Current(0)`.
448 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
449 let result: u64;
450 if let SeekFrom::Current(n) = pos {
451 let remainder = self.buf_len() as i64;
452 // it should be safe to assume that remainder fits within an i64 as the alternative
453 // means we managed to allocate 8 ebibytes and that's absurd.
454 // But it's not out of the realm of possibility for some weird underlying reader to
455 // support seeking by i64::min_value() so we need to handle underflow when subtracting
456 // remainder.
457 if let Some(offset) = n.checked_sub(remainder) {
458 result = self.inner.seek(SeekFrom::Current(offset))?;
459 } else {
460 // seek backwards by our remainder, and then by the offset
461 self.inner.seek(SeekFrom::Current(-remainder))?;
462 self.buf.clear(); // empty the buffer
463 result = self.inner.seek(SeekFrom::Current(n))?;
464 }
465 } else {
466 // Seeking with Start/End doesn't care about our buffer length.
467 result = self.inner.seek(pos)?;
468 }
469 self.buf.clear();
470 Ok(result)
471 }
472}
473
/// A drop-in replacement for `std::io::BufWriter` with more functionality.
///
/// Original method names/signatures and implemented traits are left untouched,
/// making replacement as simple as swapping the import of the type.
///
/// By default this type implements the behavior of its `std` counterpart: it only flushes
/// the buffer if an incoming write is larger than the remaining space.
///
/// To change this type's behavior, change the policy with [`.set_policy()`] using a type
/// from the [`policy` module] or your own implementation of [`WriterPolicy`].
///
/// Policies that perform alternating writes and flushes without completely emptying the buffer
/// may benefit from using a ringbuffer via the [`new_ringbuf()`] and [`with_capacity_ringbuf()`]
/// constructors. Ringbuffers are only available on supported platforms with the
/// `slice-deque` feature and have some caveats; see [the docs at the crate root][ringbufs-root]
/// for more details.
///
/// [`.set_policy()`]: BufWriter::set_policy
/// [`policy` module]: policy
/// [`WriterPolicy`]: policy::WriterPolicy
/// [`new_ringbuf()`]: BufWriter::new_ringbuf
/// [`with_capacity_ringbuf()`]: BufWriter::with_capacity_ringbuf
/// [ringbufs-root]: index.html#ringbuffers--slice-deque-feature
pub struct BufWriter<W: Write, P = StdPolicy> {
    // Holds the bytes accepted by `write()` that have not been flushed to `inner` yet.
    buf: Buffer,
    // The wrapped writer that buffered data is eventually written to.
    inner: W,
    // Consulted before and after each write to decide how many bytes to flush.
    policy: P,
    // Set to `true` for the duration of every call that writes into `inner` and reset
    // afterwards; `Drop` skips its flush while this is still `true`.
    panicked: bool,
}
503
504impl<W: Write> BufWriter<W> {
505 /// Create a new `BufWriter` wrapping `inner` with the default buffer capacity and
506 /// [`WriterPolicy`](policy::WriterPolicy).
507 pub fn new(inner: W) -> Self {
508 Self::with_buffer(Buffer::new(), inner)
509 }
510
511 /// Create a new `BufWriter` wrapping `inner`, utilizing a buffer with a capacity
512 /// of *at least* `cap` bytes and the default [`WriterPolicy`](policy::WriterPolicy).
513 ///
514 /// The actual capacity of the buffer may vary based on implementation details of the global
515 /// allocator.
516 pub fn with_capacity(cap: usize, inner: W) -> Self {
517 Self::with_buffer(Buffer::with_capacity(cap), inner)
518 }
519
520 /// Create a new `BufWriter` wrapping `inner`, utilizing a ringbuffer with the default
521 /// capacity and [`WriterPolicy`](policy::WriterPolicy).
522 ///
523 /// A ringbuffer never has to move data to make room; consuming bytes from the head
524 /// simultaneously makes room at the tail. This is useful in conjunction with a policy like
525 /// [`FlushExact`](policy::FlushExact) to ensure there is always room to write more data if
526 /// necessary, without expensive copying operations.
527 ///
528 /// Only available on platforms with virtual memory support and with the `slice-deque` feature
529 /// enabled. The default capacity will differ between Windows and Unix-derivative targets.
530 /// See [`Buffer::new_ringbuf()`](Buffer::new_ringbuf)
531 /// or [the crate root docs](index.html#ringbuffers--slice-deque-feature) for more info.
532 #[cfg(feature = "slice-deque")]
533 pub fn new_ringbuf(inner: W) -> Self {
534 Self::with_buffer(Buffer::new_ringbuf(), inner)
535 }
536
537 /// Create a new `BufWriter` wrapping `inner`, utilizing a ringbuffer with *at least* `cap`
538 /// capacity and the default [`WriterPolicy`](policy::WriterPolicy).
539 ///
540 /// A ringbuffer never has to move data to make room; consuming bytes from the head
541 /// simultaneously makes room at the tail. This is useful in conjunction with a policy like
542 /// [`FlushExact`](policy::FlushExact) to ensure there is always room to write more data if
543 /// necessary, without expensive copying operations.
544 ///
545 /// Only available on platforms with virtual memory support and with the `slice-deque` feature
546 /// enabled. The capacity will be rounded up to the minimum size for the target platform.
547 /// See [`Buffer::with_capacity_ringbuf()`](Buffer::with_capacity_ringbuf)
548 /// or [the crate root docs](index.html#ringbuffers--slice-deque-feature) for more info.
549 #[cfg(feature = "slice-deque")]
550 pub fn with_capacity_ringbuf(cap: usize, inner: W) -> Self {
551 Self::with_buffer(Buffer::with_capacity_ringbuf(cap), inner)
552 }
553
554 /// Create a new `BufWriter` wrapping `inner`, utilizing the existing [`Buffer`](Buffer)
555 /// instance and the default [`WriterPolicy`](policy::WriterPolicy).
556 ///
557 /// ### Note
558 /// Does **not** clear the buffer first! If there is data already in the buffer
559 /// it will be written out on the next flush!
560 pub fn with_buffer(buf: Buffer, inner: W) -> BufWriter<W> {
561 BufWriter {
562 buf, inner, policy: StdPolicy, panicked: false,
563 }
564 }
565}
566
567impl<W: Write, P> BufWriter<W, P> {
568 /// Set a new [`WriterPolicy`](policy::WriterPolicy), returning the transformed type.
569 pub fn set_policy<P_: WriterPolicy>(self, policy: P_) -> BufWriter<W, P_> {
570 let panicked = self.panicked;
571 let (inner, buf) = self.into_inner_();
572
573 BufWriter {
574 inner, buf, policy, panicked
575 }
576 }
577
578 /// Mutate the current [`WriterPolicy`](policy::WriterPolicy).
579 pub fn policy_mut(&mut self) -> &mut P {
580 &mut self.policy
581 }
582
583 /// Inspect the current `WriterPolicy`.
584 pub fn policy(&self) -> &P {
585 &self.policy
586 }
587
588 /// Get a reference to the inner writer.
589 pub fn get_ref(&self) -> &W {
590 &self.inner
591 }
592
593 /// Get a mutable reference to the inner writer.
594 ///
595 /// ### Note
596 /// If the buffer has not been flushed, writing directly to the inner type will cause
597 /// data inconsistency.
598 pub fn get_mut(&mut self) -> &mut W {
599 &mut self.inner
600 }
601
602 /// Get the capacty of the inner buffer.
603 pub fn capacity(&self) -> usize {
604 self.buf.capacity()
605 }
606
607 /// Get the number of bytes currently in the buffer.
608 pub fn buf_len(&self) -> usize {
609 self.buf.len()
610 }
611
612 /// Reserve space in the buffer for at least `additional` bytes. May not be
613 /// quite exact due to implementation details of the buffer's allocator.
614 pub fn reserve(&mut self, additional: usize) {
615 self.buf.reserve(additional);
616 }
617
618 /// Move data to the start of the buffer, making room at the end for more
619 /// writing.
620 ///
621 /// This is a no-op with the `*_ringbuf()` constructors (requires `slice-deque` feature).
622 pub fn make_room(&mut self) {
623 self.buf.make_room();
624 }
625
626 /// Consume `self` and return both the underlying writer and the buffer
627 pub fn into_inner_with_buffer(self) -> (W, Buffer) {
628 self.into_inner_()
629 }
630
631 // copy the fields out and forget `self` to avoid dropping twice
632 fn into_inner_(self) -> (W, Buffer) {
633 let s = ManuallyDrop::new(self);
634 unsafe {
635 // safe because we immediately forget `self`
636 let inner = ptr::read(&s.inner);
637 let buf = ptr::read(&s.buf);
638 (inner, buf)
639 }
640 }
641
642 fn flush_buf(&mut self, amt: usize) -> io::Result<()> {
643 if amt == 0 || amt > self.buf.len() { return Ok(()) }
644
645 self.panicked = true;
646 let ret = self.buf.write_max(amt, &mut self.inner);
647 self.panicked = false;
648 ret
649 }
650}
651
652impl<W: Write, P: WriterPolicy> BufWriter<W, P> {
653 /// Flush the buffer and unwrap, returning the inner writer on success,
654 /// or a type wrapping `self` plus the error otherwise.
655 pub fn into_inner(mut self) -> Result<W, IntoInnerError<Self>> {
656 match self.flush() {
657 Err(e) => Err(IntoInnerError(self, e)),
658 Ok(()) => Ok(self.into_inner_().0),
659 }
660 }
661
662 /// Flush the buffer and unwrap, returning the inner writer and
663 /// any error encountered during flushing.
664 pub fn into_inner_with_err(mut self) -> (W, Option<io::Error>) {
665 let err = self.flush().err();
666 (self.into_inner_().0, err)
667 }
668}
669
670impl<W: Write, P: WriterPolicy> Write for BufWriter<W, P> {
671 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
672 let flush_amt = self.policy.before_write(&mut self.buf, buf.len()).0;
673 self.flush_buf(flush_amt)?;
674
675 let written = if self.buf.is_empty() && buf.len() >= self.buf.capacity() {
676 self.panicked = true;
677 let result = self.inner.write(buf);
678 self.panicked = false;
679 result?
680 } else {
681 self.buf.copy_from_slice(buf)
682 };
683
684 let flush_amt = self.policy.after_write(&self.buf).0;
685
686 let _ = self.flush_buf(flush_amt);
687
688 Ok(written)
689 }
690
691 fn flush(&mut self) -> io::Result<()> {
692 let flush_amt = self.buf.len();
693 self.flush_buf(flush_amt)?;
694 self.inner.flush()
695 }
696}
697
698impl<W: Write + Seek, P: WriterPolicy> Seek for BufWriter<W, P> {
699 /// Seek to the ofPet, in bytes, in the underlying writer.
700 ///
701 /// Seeking always writes out the internal buffer before seeking.
702 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
703 self.flush().and_then(|_| self.get_mut().seek(pos))
704 }
705}
706
707impl<W: Write + fmt::Debug, P: fmt::Debug> fmt::Debug for BufWriter<W, P> {
708 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
709 f.debug_struct("buf_redux::BufWriter")
710 .field("writer", &self.inner)
711 .field("capacity", &self.capacity())
712 .field("policy", &self.policy)
713 .finish()
714 }
715}
716
717
718/// Attempt to flush the buffer to the underlying writer.
719///
720/// If an error occurs, the thread-local handler is invoked, if one was previously
721/// set by [`set_drop_err_handler`](set_drop_err_handler) for this thread.
722impl<W: Write, P> Drop for BufWriter<W, P> {
723 fn drop(&mut self) {
724 if !self.panicked {
725 // instead of ignoring a failed flush, call the handler
726 let buf_len = self.buf.len();
727 if let Err(err) = self.flush_buf(buf_len) {
728 DROP_ERR_HANDLER.with(|deh| {
729 (*deh.borrow())(&mut self.inner, &mut self.buf, err)
730 });
731 }
732 }
733 }
734}
735
/// A drop-in replacement for `std::io::LineWriter` with more functionality.
///
/// This is, in fact, only a thin wrapper around
/// [`BufWriter`](BufWriter)`<W, `[`policy::FlushOnNewline`](policy::FlushOnNewline)`>`, which
/// demonstrates the power of custom [`WriterPolicy`](policy::WriterPolicy) implementations.
///
/// Every method of this type delegates to the wrapped `BufWriter`.
pub struct LineWriter<W: Write>(BufWriter<W, FlushOnNewline>);
742
743impl<W: Write> LineWriter<W> {
744 /// Wrap `inner` with the default buffer capacity.
745 pub fn new(inner: W) -> Self {
746 Self::with_buffer(Buffer::new(), inner)
747 }
748
749 /// Wrap `inner` with the given buffer capacity.
750 pub fn with_capacity(cap: usize, inner: W) -> Self {
751 Self::with_buffer(Buffer::with_capacity(cap), inner)
752 }
753
754 /// Wrap `inner` with the default buffer capacity using a ringbuffer.
755 #[cfg(feature = "slice-deque")]
756 pub fn new_ringbuf(inner: W) -> Self {
757 Self::with_buffer(Buffer::new_ringbuf(), inner)
758 }
759
760 /// Wrap `inner` with the given buffer capacity using a ringbuffer.
761 #[cfg(feature = "slice-deque")]
762 pub fn with_capacity_ringbuf(cap: usize, inner: W) -> Self {
763 Self::with_buffer(Buffer::with_capacity_ringbuf(cap), inner)
764 }
765
766 /// Wrap `inner` with an existing `Buffer` instance.
767 ///
768 /// ### Note
769 /// Does **not** clear the buffer first! If there is data already in the buffer
770 /// it will be written out on the next flush!
771 pub fn with_buffer(buf: Buffer, inner: W) -> LineWriter<W> {
772 LineWriter(BufWriter::with_buffer(buf, inner).set_policy(FlushOnNewline))
773 }
774
775 /// Get a reference to the inner writer.
776 pub fn get_ref(&self) -> &W {
777 self.0.get_ref()
778 }
779
780 /// Get a mutable reference to the inner writer.
781 ///
782 /// ### Note
783 /// If the buffer has not been flushed, writing directly to the inner type will cause
784 /// data inconsistency.
785 pub fn get_mut(&mut self) -> &mut W {
786 self.0.get_mut()
787 }
788
789 /// Get the capacity of the inner buffer.
790 pub fn capacity(&self) -> usize {
791 self.0.capacity()
792 }
793
794 /// Get the number of bytes currently in the buffer.
795 pub fn buf_len(&self) -> usize {
796 self.0.buf_len()
797 }
798
799 /// Ensure enough space in the buffer for *at least* `additional` bytes. May not be
800 /// quite exact due to implementation details of the buffer's allocator.
801 pub fn reserve(&mut self, additional: usize) {
802 self.0.reserve(additional);
803 }
804
805 /// Flush the buffer and unwrap, returning the inner writer on success,
806 /// or a type wrapping `self` plus the error otherwise.
807 pub fn into_inner(self) -> Result<W, IntoInnerError<Self>> {
808 self.0.into_inner()
809 .map_err(|IntoInnerError(inner, e)| IntoInnerError(LineWriter(inner), e))
810 }
811
812 /// Flush the buffer and unwrap, returning the inner writer and
813 /// any error encountered during flushing.
814 pub fn into_inner_with_err(self) -> (W, Option<io::Error>) {
815 self.0.into_inner_with_err()
816 }
817
818 /// Consume `self` and return both the underlying writer and the buffer.
819 pub fn into_inner_with_buf(self) -> (W, Buffer){
820 self.0.into_inner_with_buffer()
821 }
822}
823
824impl<W: Write> Write for LineWriter<W> {
825 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
826 self.0.write(buf)
827 }
828
829 fn flush(&mut self) -> io::Result<()> {
830 self.0.flush()
831 }
832}
833
834impl<W: Write + fmt::Debug> fmt::Debug for LineWriter<W> {
835 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
836 f.debug_struct("buf_redux::LineWriter")
837 .field("writer", self.get_ref())
838 .field("capacity", &self.capacity())
839 .finish()
840 }
841}
842
/// The error type for `BufWriter::into_inner()`,
/// contains the `BufWriter` as well as the error that occurred.
///
/// `LineWriter::into_inner()` returns it too, carrying the `LineWriter` instead.
///
/// Field `0` is the writer that could not be unwrapped, field `1` the I/O error; both are
/// public and are also reachable through `into_inner()` and `error()` respectively.
#[derive(Debug)]
pub struct IntoInnerError<W>(pub W, pub io::Error);
847
848impl<W> IntoInnerError<W> {
849 /// Get the error
850 pub fn error(&self) -> &io::Error {
851 &self.1
852 }
853
854 /// Take the writer.
855 pub fn into_inner(self) -> W {
856 self.0
857 }
858}
859
860impl<W> Into<io::Error> for IntoInnerError<W> {
861 fn into(self) -> io::Error {
862 self.1
863 }
864}
865
866impl<W: Any + Send + fmt::Debug> error::Error for IntoInnerError<W> {
867 fn description(&self) -> &str {
868 error::Error::description(self.error())
869 }
870
871 fn cause(&self) -> Option<&error::Error> {
872 Some(&self.1)
873 }
874}
875
876impl<W> fmt::Display for IntoInnerError<W> {
877 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
878 self.error().fmt(f)
879 }
880}
881
/// A deque-like datastructure for managing bytes.
///
/// Supports interacting via I/O traits like `Read` and `Write`, and direct access.
pub struct Buffer {
    // Backing storage; the accessors on `Buffer` delegate to it.
    buf: BufImpl,
    // Capacity for which the spare space has already been initialized for reading:
    // `read_from()` raises it to `capacity()` after initializing, and `reserve()` resets
    // it to zero when the storage was reallocated out-of-place.
    zeroed: usize,
}
889
890impl Buffer {
891 /// Create a new buffer with a default capacity.
892 pub fn new() -> Self {
893 Self::with_capacity(DEFAULT_BUF_SIZE)
894 }
895
896 /// Create a new buffer with *at least* the given capacity.
897 ///
898 /// If the global allocator returns extra capacity, `Buffer` will use all of it.
899 pub fn with_capacity(cap: usize) -> Self {
900 Buffer {
901 buf: BufImpl::with_capacity(cap),
902 zeroed: 0,
903 }
904 }
905
906 /// Allocate a buffer with a default capacity that never needs to move data to make room
907 /// (consuming from the head simultaneously makes more room at the tail).
908 ///
909 /// The default capacity varies based on the target platform:
910 ///
911 /// * Unix-derivative platforms; Linux, OS X, BSDs, etc: **8KiB** (the default buffer size for
912 /// `std::io` buffered types)
913 /// * Windows: **64KiB** because of legacy reasons, of course (see below)
914 ///
915 /// Only available on platforms with virtual memory support and with the `slice-deque` feature
916 /// enabled. The current platforms that are supported/tested are listed
917 /// [in the README for the `slice-deque` crate][slice-deque].
918 ///
919 /// [slice-deque]: https://github.com/gnzlbg/slice_deque#platform-support
920 #[cfg(feature = "slice-deque")]
921 pub fn new_ringbuf() -> Self {
922 Self::with_capacity_ringbuf(DEFAULT_BUF_SIZE)
923 }
924
925 /// Allocate a buffer with *at least* the given capacity that never needs to move data to
926 /// make room (consuming from the head simultaneously makes more room at the tail).
927 ///
928 /// The capacity will be rounded up to the minimum size for the current target:
929 ///
930 /// * Unix-derivative platforms; Linux, OS X, BSDs, etc: the next multiple of the page size
931 /// (typically 4KiB but can vary based on system configuration)
932 /// * Windows: the next muliple of **64KiB**; see [this Microsoft dev blog post][Win-why-64k]
933 /// for why it's 64KiB and not the page size (TL;DR: Alpha AXP needs it and it's applied on
934 /// all targets for consistency/portability)
935 ///
936 /// [Win-why-64k]: https://blogs.msdn.microsoft.com/oldnewthing/20031008-00/?p=42223
937 ///
938 /// Only available on platforms with virtual memory support and with the `slice-deque` feature
939 /// enabled. The current platforms that are supported/tested are listed
940 /// [in the README for the `slice-deque` crate][slice-deque].
941 ///
942 /// [slice-deque]: https://github.com/gnzlbg/slice_deque#platform-support
943 #[cfg(feature = "slice-deque")]
944 pub fn with_capacity_ringbuf(cap: usize) -> Self {
945 Buffer {
946 buf: BufImpl::with_capacity_ringbuf(cap),
947 zeroed: 0,
948 }
949 }
950
951 /// Return `true` if this is a ringbuffer.
952 pub fn is_ringbuf(&self) -> bool {
953 self.buf.is_ringbuf()
954 }
955
956 /// Return the number of bytes currently in this buffer.
957 ///
958 /// Equivalent to `self.buf().len()`.
959 pub fn len(&self) -> usize {
960 self.buf.len()
961 }
962
963 /// Return the number of bytes that can be read into this buffer before it needs
964 /// to grow or the data in the buffer needs to be moved.
965 ///
966 /// This may not constitute all free space in the buffer if bytes have been consumed
967 /// from the head. Use `free_space()` to determine the total free space in the buffer.
968 pub fn usable_space(&self) -> usize {
969 self.buf.usable_space()
970 }
971
972 /// Returns the total amount of free space in the buffer, including bytes
973 /// already consumed from the head.
974 ///
975 /// This will be greater than or equal to `usable_space()`. On supported platforms
976 /// with the `slice-deque` feature enabled, it should be equal.
977 pub fn free_space(&self) -> usize {
978 self.capacity() - self.len()
979 }
980
981 /// Return the total capacity of this buffer.
982 pub fn capacity(&self) -> usize {
983 self.buf.capacity()
984 }
985
986 /// Returns `true` if there are no bytes in the buffer, false otherwise.
987 pub fn is_empty(&self) -> bool {
988 self.len() == 0
989 }
990
991 /// Move bytes down in the buffer to maximize usable space.
992 ///
993 /// This is a no-op on supported platforms with the `slice-deque` feature enabled.
994 pub fn make_room(&mut self) {
995 self.buf.make_room();
996 }
997
998 /// Ensure space for at least `additional` more bytes in the buffer.
999 ///
1000 /// This is a no-op if `usable_space() >= additional`. Note that this will reallocate
1001 /// even if there is enough free space at the head of the buffer for `additional` bytes,
1002 /// because that free space is not at the tail where it can be read into.
1003 /// If you prefer copying data down in the buffer before attempting to reallocate you may wish
1004 /// to call `.make_room()` first.
1005 ///
1006 /// ### Panics
1007 /// If `self.capacity() + additional` overflows.
1008 pub fn reserve(&mut self, additional: usize) {
1009 // Returns `true` if we reallocated out-of-place and thus need to re-zero.
1010 if self.buf.reserve(additional) {
1011 self.zeroed = 0;
1012 }
1013 }
1014
1015 /// Get an immutable slice of the available bytes in this buffer.
1016 ///
1017 /// Call `.consume()` to remove bytes from the beginning of this slice.
1018 pub fn buf(&self) -> &[u8] { self.buf.buf() }
1019
1020 /// Get a mutable slice representing the available bytes in this buffer.
1021 ///
1022 /// Call `.consume()` to remove bytes from the beginning of this slice.
1023 pub fn buf_mut(&mut self) -> &mut [u8] { self.buf.buf_mut() }
1024
1025 /// Read from `rdr`, returning the number of bytes read or any errors.
1026 ///
1027 /// If there is no more room at the head of the buffer, this will return `Ok(0)`.
1028 ///
1029 /// Uses `Read::initializer()` to initialize the buffer if the `nightly`
1030 /// feature is enabled, otherwise the buffer is zeroed if it has never been written.
1031 ///
1032 /// ### Panics
1033 /// If the returned count from `rdr.read()` overflows the tail cursor of this buffer.
1034 pub fn read_from<R: Read + ?Sized>(&mut self, rdr: &mut R) -> io::Result<usize> {
1035 if self.usable_space() == 0 {
1036 return Ok(0);
1037 }
1038
1039 let cap = self.capacity();
1040 if self.zeroed < cap {
1041 unsafe {
1042 let buf = self.buf.write_buf();
1043 init_buffer(&rdr, buf);
1044 }
1045
1046 self.zeroed = cap;
1047 }
1048
1049 let read = {
1050 let mut buf = unsafe { self.buf.write_buf() };
1051 rdr.read(buf)?
1052 };
1053
1054 unsafe {
1055 self.buf.bytes_written(read);
1056 }
1057
1058 Ok(read)
1059 }
1060
1061 /// Copy from `src` to the tail of this buffer. Returns the number of bytes copied.
1062 ///
1063 /// This will **not** grow the buffer if `src` is larger than `self.usable_space()`; instead,
1064 /// it will fill the usable space and return the number of bytes copied. If there is no usable
1065 /// space, this returns 0.
1066 pub fn copy_from_slice(&mut self, src: &[u8]) -> usize {
1067 let len = unsafe {
1068 let mut buf = self.buf.write_buf();
1069 let len = cmp::min(buf.len(), src.len());
1070 buf[..len].copy_from_slice(&src[..len]);
1071 len
1072 };
1073
1074 unsafe {
1075 self.buf.bytes_written(len);
1076 }
1077
1078 len
1079 }
1080
1081 /// Write bytes from this buffer to `wrt`. Returns the number of bytes written or any errors.
1082 ///
1083 /// If the buffer is empty, returns `Ok(0)`.
1084 ///
1085 /// ### Panics
1086 /// If the count returned by `wrt.write()` would cause the head cursor to overflow or pass
1087 /// the tail cursor if added to it.
1088 pub fn write_to<W: Write + ?Sized>(&mut self, wrt: &mut W) -> io::Result<usize> {
1089 if self.len() == 0 {
1090 return Ok(0);
1091 }
1092
1093 let written = wrt.write(self.buf())?;
1094 self.consume(written);
1095 Ok(written)
1096 }
1097
1098 /// Write, at most, the given number of bytes from this buffer to `wrt`, continuing
1099 /// to write and ignoring interrupts until the number is reached or the buffer is empty.
1100 ///
1101 /// ### Panics
1102 /// If the count returned by `wrt.write()` would cause the head cursor to overflow or pass
1103 /// the tail cursor if added to it.
1104 pub fn write_max<W: Write + ?Sized>(&mut self, mut max: usize, wrt: &mut W) -> io::Result<()> {
1105 while self.len() > 0 && max > 0 {
1106 let len = cmp::min(self.len(), max);
1107 let n = match wrt.write(&self.buf()[..len]) {
1108 Ok(0) => return Err(io::Error::new(io::ErrorKind::WriteZero,
1109 "Buffer::write_all() got zero-sized write")),
1110 Ok(n) => n,
1111 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
1112 Err(e) => return Err(e),
1113 };
1114
1115 self.consume(n);
1116 max = max.saturating_sub(n);
1117 }
1118
1119 Ok(())
1120 }
1121
1122 /// Write all bytes in this buffer to `wrt`, ignoring interrupts. Continues writing until
1123 /// the buffer is empty or an error is returned.
1124 ///
1125 /// ### Panics
1126 /// If `self.write_to(wrt)` panics.
1127 pub fn write_all<W: Write + ?Sized>(&mut self, wrt: &mut W) -> io::Result<()> {
1128 while self.len() > 0 {
1129 match self.write_to(wrt) {
1130 Ok(0) => return Err(io::Error::new(io::ErrorKind::WriteZero,
1131 "Buffer::write_all() got zero-sized write")),
1132 Ok(_) => (),
1133 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => (),
1134 Err(e) => return Err(e),
1135 }
1136 }
1137
1138 Ok(())
1139 }
1140
1141 /// Copy bytes to `out` from this buffer, returning the number of bytes written.
1142 pub fn copy_to_slice(&mut self, out: &mut [u8]) -> usize {
1143 let len = {
1144 let buf = self.buf();
1145
1146 let len = cmp::min(buf.len(), out.len());
1147 out[..len].copy_from_slice(&buf[..len]);
1148 len
1149 };
1150
1151 self.consume(len);
1152
1153 len
1154 }
1155
1156 /// Push `bytes` to the end of the buffer, growing it if necessary.
1157 ///
1158 /// If you prefer moving bytes down in the buffer to reallocating, you may wish to call
1159 /// `.make_room()` first.
1160 pub fn push_bytes(&mut self, bytes: &[u8]) {
1161 let s_len = bytes.len();
1162
1163 if self.usable_space() < s_len {
1164 self.reserve(s_len * 2);
1165 }
1166
1167 unsafe {
1168 self.buf.write_buf()[..s_len].copy_from_slice(bytes);
1169 self.buf.bytes_written(s_len);
1170 }
1171 }
1172
1173 /// Consume `amt` bytes from the head of this buffer.
1174 pub fn consume(&mut self, amt: usize) {
1175 self.buf.consume(amt);
1176 }
1177
1178 /// Empty this buffer by consuming all bytes.
1179 pub fn clear(&mut self) {
1180 let buf_len = self.len();
1181 self.consume(buf_len);
1182 }
1183}
1184
1185impl fmt::Debug for Buffer {
1186 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1187 f.debug_struct("buf_redux::Buffer")
1188 .field("capacity", &self.capacity())
1189 .field("len", &self.len())
1190 .finish()
1191 }
1192}
1193
/// A `Read` adapter for a consumed `BufReader` which will empty bytes from the buffer before
/// reading from `R` directly. Frees the buffer when it has been emptied.
pub struct Unbuffer<R> {
    // The reader that serves `read()` calls once the buffer has nothing left to give.
    inner: R,
    // Bytes to hand out ahead of `inner`; `read()` sets this to `None`, dropping the buffer.
    buf: Option<Buffer>,
}
1200
1201impl<R> Unbuffer<R> {
1202 /// Returns `true` if the buffer still has some bytes left, `false` otherwise.
1203 pub fn is_buf_empty(&self) -> bool {
1204 !self.buf.is_some()
1205 }
1206
1207 /// Returns the number of bytes remaining in the buffer.
1208 pub fn buf_len(&self) -> usize {
1209 self.buf.as_ref().map(Buffer::len).unwrap_or(0)
1210 }
1211
1212 /// Get a slice over the available bytes in the buffer.
1213 pub fn buf(&self) -> &[u8] {
1214 self.buf.as_ref().map_or(&[], Buffer::buf)
1215 }
1216
1217 /// Return the underlying reader, releasing the buffer.
1218 pub fn into_inner(self) -> R {
1219 self.inner
1220 }
1221}
1222
1223impl<R: Read> Read for Unbuffer<R> {
1224 fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
1225 if let Some(ref mut buf) = self.buf.as_mut() {
1226 let read = buf.copy_to_slice(out);
1227
1228 if out.len() != 0 && read != 0 {
1229 return Ok(read);
1230 }
1231 }
1232
1233 self.buf = None;
1234
1235 self.inner.read(out)
1236 }
1237}
1238
1239impl<R: fmt::Debug> fmt::Debug for Unbuffer<R> {
1240 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1241 fmt.debug_struct("buf_redux::Unbuffer")
1242 .field("reader", &self.inner)
1243 .field("buffer", &self.buf)
1244 .finish()
1245 }
1246}
1247
/// Copy data between a `BufRead` and a `Write` without an intermediate buffer.
///
/// Repeatedly hands the reader's buffered bytes to `w.write()` and consumes from `b` exactly
/// what the writer accepted, until the reader reports EOF by yielding an empty buffer.
///
/// Retries on interrupts. Returns the total bytes copied or the first error;
/// even if an error is returned some bytes may still have been copied.
///
/// ### Errors
/// Besides errors from `b` and `w`, an error of kind `WriteZero` is returned if the writer
/// accepts zero bytes while the reader still has data, as the copy could not be completed.
pub fn copy_buf<B: BufRead, W: Write>(b: &mut B, w: &mut W) -> io::Result<u64> {
    let mut total_copied = 0;

    loop {
        let copied = {
            let buf = match b.fill_buf() {
                Ok(buf) => buf,
                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            };

            // An empty buffer is how `BufRead` signals EOF.
            if buf.is_empty() {
                break;
            }

            match w.write(buf) {
                // The writer cannot make progress; stopping quietly would pass off a
                // truncated copy as a success.
                Ok(0) => return Err(io::Error::new(io::ErrorKind::WriteZero,
                                                   "copy_buf() got zero-sized write")),
                Ok(written) => written,
                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            }
        };

        b.consume(copied);

        total_copied += copied as u64;
    }

    Ok(total_copied)
}
1271
// Per-thread callback that `BufWriter`'s `Drop` impl invokes when flushing the buffer fails.
// It receives the inner writer, the buffer and the error. The initial handler ignores all
// three; `set_drop_err_handler()` replaces it.
thread_local!(
    static DROP_ERR_HANDLER: RefCell<Box<Fn(&mut Write, &mut Buffer, io::Error)>>
        = RefCell::new(Box::new(|_, _, _| ()))
);
1276
1277/// Set a thread-local handler for errors thrown in `BufWriter`'s `Drop` impl.
1278///
1279/// The `Write` impl, buffer (at the time of the erroring write) and IO error are provided.
1280///
1281/// Replaces the previous handler. By default this is a no-op.
1282///
1283/// ### Panics
1284/// If called from within a handler previously provided to this function.
1285pub fn set_drop_err_handler<F: 'static>(handler: F)
1286where F: Fn(&mut Write, &mut Buffer, io::Error)
1287{
1288 DROP_ERR_HANDLER.with(|deh| *deh.borrow_mut() = Box::new(handler))
1289}
1290
1291#[cfg(not(feature = "nightly"))]
1292fn init_buffer<R: Read + ?Sized>(_r: &R, buf: &mut [u8]) {
1293 // we can't trust a reader without nightly
1294 safemem::write_bytes(buf, 0);
1295}