pub struct SyncIoBridge<T> { /* private fields */ }
Available on crate features io-util and io only.
Use a tokio::io::AsyncRead synchronously as a std::io::Read or a tokio::io::AsyncWrite synchronously as a std::io::Write.
§Alternatives
In many cases, there are better alternatives to using SyncIoBridge, especially if you want to avoid blocking the async runtime. Consider the following scenarios:
When hashing data, using SyncIoBridge can lead to suboptimal performance and might not fully leverage the async capabilities of the system.
§Why It Matters:
SyncIoBridge allows you to use asynchronous I/O operations in a synchronous context by blocking the current thread. However, this can be inefficient because:
- Inefficient Resource Usage: SyncIoBridge takes up an entire OS thread, which is inefficient compared to asynchronous code that can multiplex many tasks on a single thread.
- Thread Pool Saturation: Excessive use of SyncIoBridge can exhaust the async runtime’s thread pool, reducing the number of threads available for other tasks and impacting overall performance.
- Missed Concurrency Benefits: By using synchronous operations with SyncIoBridge, you lose the ability to interleave tasks efficiently, which is a key advantage of asynchronous programming.
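To make "blocking the current thread" concrete, here is a minimal sketch of a toy block_on built only from the standard library. It is not how Tokio's Handle::block_on is implemented, and the names ThreadWaker and block_on are illustrative assumptions. The point is only that the calling thread is parked, and so unavailable for any other work, until the future completes.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical waker that unparks the thread that is blocked on the future.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy `block_on`: drives one future to completion on the current thread.
/// While the future is pending, the thread is parked and does nothing else.
fn block_on<F: Future>(fut: F) -> F::Output {
    // Pin the future on the stack so it can be polled.
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until the waker unparks this thread, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}
```

Calling block_on(async { 1 + 2 }) from an ordinary thread returns 3. Every such call occupies one whole thread for as long as the future takes, which is the cost described in the list above.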
§Example 1: Hashing Data
The use of SyncIoBridge is unnecessary when hashing data. Instead, you can process the data asynchronously by reading it into memory, which avoids blocking the async runtime.
There are two strategies for avoiding SyncIoBridge when hashing data. When the data fits into memory, the easiest is to read the data into a Vec<u8> and hash it:
Explanation: This example demonstrates how to asynchronously read data from a reader into memory and hash it using a synchronous hashing function. The SyncIoBridge is avoided, ensuring that the async runtime is not blocked.
use tokio::io::AsyncReadExt;
use tokio::io::AsyncRead;
use std::io::Cursor;

async fn hash_contents(mut reader: impl AsyncRead + Unpin) -> Result<blake3::Hash, std::io::Error> {
    // Read all data from the reader into a Vec<u8>.
    let mut data = Vec::new();
    reader.read_to_end(&mut data).await?;

    // Hash the data using the blake3 hashing function.
    let hash = blake3::hash(&data);

    Ok(hash)
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    // Example: In-memory data.
    let data = b"Hello, world!"; // A byte slice.
    let reader = Cursor::new(data); // Create an in-memory AsyncRead.
    let hash = hash_contents(reader).await?;
    println!("{}", hash.to_hex());
    Ok(())
}
When the data doesn’t fit into memory, the hashing library will usually provide a hasher that you can repeatedly call update on to hash the data one chunk at a time.
Explanation: This example demonstrates how to asynchronously stream data in chunks for hashing. Each chunk is read asynchronously, and the hash is updated incrementally. This avoids blocking and improves performance over using SyncIoBridge.
use tokio::io::AsyncReadExt;
use tokio::io::AsyncRead;
use std::io::Cursor;
// Assumption: the blake3 hasher is used here; any hasher with
// `update` and `finalize` methods follows the same pattern.
use blake3::Hasher;

/// Asynchronously streams data from an async reader, processes it in chunks,
/// and hashes the data incrementally.
async fn hash_stream(mut reader: impl AsyncRead + Unpin, mut hasher: Hasher) -> Result<blake3::Hash, std::io::Error> {
    // Create a buffer to read data into, sized for performance.
    let mut data = vec![0; 64 * 1024];
    loop {
        // Read data from the reader into the buffer.
        let len = reader.read(&mut data).await?;
        if len == 0 { break; } // Exit loop if no more data.

        // Update the hash with the data read.
        hasher.update(&data[..len]);
    }

    // Finalize the hash after all data has been processed.
    let hash = hasher.finalize();

    Ok(hash)
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    // Example: In-memory data.
    let data = b"Hello, world!"; // A byte slice.
    let reader = Cursor::new(data); // Create an in-memory AsyncRead.
    let hasher = Hasher::new();
    let hash = hash_stream(reader, hasher).await?;
    println!("{}", hash.to_hex());
    Ok(())
}
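The chunked strategy relies on one property of such hashers: feeding the input piece by piece through update gives the same result as hashing it in one call. The following is a small sketch of that property using a toy FNV-1a hasher written only for illustration. Fnv1a and hash_in_chunks are hypothetical names, and FNV-1a is not a cryptographic hash, so it is no substitute for a real hashing library.

```rust
/// Toy 64-bit FNV-1a hasher with an `update`/`finalize` interface.
/// Illustrative only: not cryptographic and not part of any library.
struct Fnv1a {
    state: u64,
}

impl Fnv1a {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    fn new() -> Self {
        Self { state: Self::OFFSET_BASIS }
    }

    /// Mixes one chunk into the running state, byte by byte.
    fn update(&mut self, chunk: &[u8]) {
        for &byte in chunk {
            self.state ^= u64::from(byte);
            self.state = self.state.wrapping_mul(Self::PRIME);
        }
    }

    /// Returns the hash of everything passed to `update` so far.
    fn finalize(&self) -> u64 {
        self.state
    }
}

/// Hashes `data` by feeding it to the hasher `chunk_size` bytes at a time.
/// `chunk_size` must be non-zero, because `chunks` panics on zero.
fn hash_in_chunks(data: &[u8], chunk_size: usize) -> u64 {
    let mut hasher = Fnv1a::new();
    for chunk in data.chunks(chunk_size) {
        hasher.update(chunk);
    }
    hasher.finalize()
}
```

Because only the running state is kept between calls, the full input never has to be in memory at once, which is why the async loop above needs just a fixed-size buffer.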
§Example 2: Compressing Data
When compressing data, the use of SyncIoBridge is unnecessary as it introduces blocking and inefficient code. Instead, you can utilize an async compression library such as the async-compression crate, which is built to handle asynchronous data streams efficiently.
Explanation: This example shows how to asynchronously compress data using an async compression library. By reading and writing asynchronously, it avoids blocking and is more efficient than using SyncIoBridge with a non-async compression library.
use async_compression::tokio::write::GzipEncoder;
use std::io::Cursor;
use tokio::io::{AsyncRead, AsyncWriteExt};

/// Asynchronously compresses data from an async reader using Gzip and an async encoder.
async fn compress_data(mut reader: impl AsyncRead + Unpin) -> Result<(), std::io::Error> {
    let writer = tokio::io::sink();

    // Create a Gzip encoder that wraps the writer.
    let mut encoder = GzipEncoder::new(writer);

    // Copy data from the reader to the encoder, compressing it.
    tokio::io::copy(&mut reader, &mut encoder).await?;

    // Shut down the encoder so that any buffered data and the Gzip trailer are written.
    encoder.shutdown().await?;

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    // Example: In-memory data.
    let data = b"Hello, world!"; // A byte slice.
    let reader = Cursor::new(data); // Create an in-memory AsyncRead.
    compress_data(reader).await?;
    Ok(())
}
§Example 3: Parsing Data Formats
SyncIoBridge is not ideal when parsing data formats such as JSON, as it blocks async operations. A more efficient approach is to read data asynchronously into memory and then deserialize it, avoiding unnecessary synchronization overhead.
Explanation: This example shows how to asynchronously read data into memory and then parse it as JSON. By avoiding SyncIoBridge, the asynchronous runtime remains unblocked, leading to better performance when working with asynchronous I/O streams.
use serde::Deserialize;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use std::io::Cursor;

// Illustrative target type; the `message` field is an assumption made for this example.
#[derive(Deserialize)]
struct MyStruct {
    message: String,
}

async fn parse_json(mut reader: impl AsyncRead + Unpin) -> Result<MyStruct, std::io::Error> {
    // Read all data from the reader into a Vec<u8>.
    let mut data = Vec::new();
    reader.read_to_end(&mut data).await?;

    // Deserialize the data from the Vec<u8> into a MyStruct instance.
    let value: MyStruct = serde_json::from_slice(&data)?;

    Ok(value)
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    // Example: In-memory JSON data.
    let data = br#"{"message": "Hello, world!"}"#; // A byte slice.
    let reader = Cursor::new(data); // Create an in-memory AsyncRead.
    let value = parse_json(reader).await?;
    println!("{}", value.message);
    Ok(())
}
§Correct Usage of SyncIoBridge inside spawn_blocking
SyncIoBridge is mainly useful when you need to interface with synchronous libraries from an asynchronous context.
Explanation: This example shows how to use SyncIoBridge inside a spawn_blocking task to safely perform synchronous I/O without blocking the async runtime. The spawn_blocking call ensures that the synchronous code is offloaded to a dedicated thread pool, preventing it from interfering with the async tasks.
use tokio::task::spawn_blocking;
use tokio_util::io::SyncIoBridge;
use tokio::io::AsyncRead;
use std::marker::Unpin;
use std::io::Cursor;

/// Wraps an async reader with `SyncIoBridge` and performs synchronous I/O operations in a blocking task.
async fn process_sync_io(reader: impl AsyncRead + Unpin + Send + 'static) -> Result<Vec<u8>, std::io::Error> {
    // Wrap the async reader with `SyncIoBridge` to allow synchronous reading.
    let mut sync_reader = SyncIoBridge::new(reader);

    // Spawn a blocking task to perform synchronous I/O operations.
    let result = spawn_blocking(move || {
        // Create an in-memory buffer to hold the copied data.
        let mut buffer = Vec::new();
        // Copy data from the sync_reader to the buffer.
        std::io::copy(&mut sync_reader, &mut buffer)?;
        // Return the buffer containing the copied data.
        Ok::<_, std::io::Error>(buffer)
    })
    .await??;

    // Return the result from the blocking task.
    Ok(result)
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    // Example: In-memory data.
    let data = b"Hello, world!"; // A byte slice.
    let reader = Cursor::new(data); // Create an in-memory AsyncRead.
    let result = process_sync_io(reader).await?;

    // You can use `result` here as needed.

    Ok(())
}
Implementations§
impl<T: AsyncWrite> SyncIoBridge<T>
pub fn is_write_vectored(&self) -> bool
Determines if the underlying tokio::io::AsyncWrite target supports efficient vectored writes.
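For readers unfamiliar with the term, a vectored write hands several separate buffers to the writer in a single call instead of one call per buffer. The sketch below shows the idea with the standard library's std::io::Write::write_vectored and a Vec<u8> as the target. It does not use SyncIoBridge itself, and write_parts is a hypothetical helper name.

```rust
use std::io::{self, IoSlice, Write};

/// Submits several separate buffers to the writer in one vectored call.
/// Returns how many bytes the writer accepted, which for some writers
/// can be fewer than the total length of all parts.
fn write_parts(out: &mut impl Write, parts: &[&[u8]]) -> io::Result<usize> {
    // Wrap each buffer in an `IoSlice` without copying the bytes.
    let slices: Vec<IoSlice<'_>> = parts.iter().map(|part| IoSlice::new(part)).collect();
    out.write_vectored(&slices)
}
```

A writer without real vectored support can still satisfy the call by writing only the first non-empty buffer, which is why a method such as is_write_vectored is useful for deciding whether batching buffers is worthwhile.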
impl<T: AsyncWrite + Unpin> SyncIoBridge<T>
pub fn shutdown(&mut self) -> Result<()>
Shut down this writer. This method provides a way to call the AsyncWriteExt::shutdown function of the inner tokio::io::AsyncWrite instance.
§Errors
This method returns the same errors as AsyncWriteExt::shutdown.
impl<T: Unpin> SyncIoBridge<T>
pub fn new(src: T) -> Self
Use a tokio::io::AsyncRead synchronously as a std::io::Read or a tokio::io::AsyncWrite as a std::io::Write.
When this struct is created, it captures a handle to the current thread’s runtime with tokio::runtime::Handle::current. It is hence OK to move this struct into a separate thread outside the runtime, as created by e.g. tokio::task::spawn_blocking.
Stated even more strongly: to make use of this bridge, you must move it into a separate thread outside the runtime. The synchronous I/O will use the underlying handle to block on the backing asynchronous source, via tokio::runtime::Handle::block_on. As noted in the documentation for that function, an attempt to block_on from an asynchronous execution context will panic.
§Wrapping !Unpin types
Use e.g. SyncIoBridge::new(Box::pin(src)).
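The reason Box::pin helps is that Pin<Box<T>> is Unpin for every T, even when T itself is not. Here is a minimal standard-library sketch of that fact. NotUnpin, require_unpin, and pin_and_read are hypothetical names used only for this illustration, and require_unpin merely stands in for any API with a T: Unpin bound, such as SyncIoBridge::new.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

/// A type that opts out of `Unpin`, as many hand-written futures and streams do.
struct NotUnpin {
    value: u32,
    _pin: PhantomPinned,
}

/// Stand-in for an API that requires `Unpin`.
fn require_unpin<T: Unpin>(value: T) -> T {
    value
}

/// Pins a `!Unpin` value on the heap and passes it through the `Unpin` bound.
fn pin_and_read(value: u32) -> u32 {
    let src = NotUnpin { value, _pin: PhantomPinned };
    // `require_unpin(src)` would not compile: `NotUnpin` is `!Unpin`.
    // The pinned box is `Unpin`, because moving the box never moves its contents.
    let boxed: Pin<Box<NotUnpin>> = Box::pin(src);
    let boxed = require_unpin(boxed);
    boxed.value
}
```

The cost is one heap allocation per wrapped source.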
§Panics
This will panic if called outside the context of a Tokio runtime.
pub fn new_with_handle(src: T, rt: Handle) -> Self
Use a tokio::io::AsyncRead synchronously as a std::io::Read or a tokio::io::AsyncWrite as a std::io::Write.
This is the same as SyncIoBridge::new, but allows passing an arbitrary handle and hence may be initially invoked outside of an asynchronous context.
pub fn into_inner(self) -> T
Consume this bridge, returning the underlying stream.
Trait Implementations§
impl<T> AsMut<T> for SyncIoBridge<T>
impl<T> AsRef<T> for SyncIoBridge<T>
impl<T: AsyncBufRead + Unpin> BufRead for SyncIoBridge<T>
fn fill_buf(&mut self) -> Result<&[u8]>
fn consume(&mut self, amt: usize)
Tells this buffer that amt bytes have been consumed from the buffer, so they should no longer be returned in calls to read.
fn read_line(&mut self, buf: &mut String) -> Result<usize>
Reads all bytes until a newline (the 0xA byte) is reached, and appends them to the provided String buffer.
fn has_data_left(&mut self) -> Result<bool, Error>
Nightly-only experimental API (buf_read_has_data_left). Checks if the underlying Read has any data left to be read.
fn skip_until(&mut self, byte: u8) -> Result<usize, Error>
Since 1.83.0. Skips all bytes until the delimiter byte or EOF is reached.
impl<T: Debug> Debug for SyncIoBridge<T>
impl<T: AsyncRead + Unpin> Read for SyncIoBridge<T>
fn read(&mut self, buf: &mut [u8]) -> Result<usize>
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize>
Reads all bytes until EOF in this source, placing them into buf.
fn read_to_string(&mut self, buf: &mut String) -> Result<usize>
Reads all bytes until EOF in this source, appending them to buf.
fn read_exact(&mut self, buf: &mut [u8]) -> Result<()>
Reads the exact number of bytes required to fill buf.
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize, Error>
Since 1.36.0. Like read, except that it reads into a slice of buffers.
fn is_read_vectored(&self) -> bool
Nightly-only experimental API (can_vector).
fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> Result<(), Error>
Nightly-only experimental API (read_buf).
fn read_buf_exact(&mut self, cursor: BorrowedCursor<'_>) -> Result<(), Error>
Nightly-only experimental API (read_buf). Reads the exact number of bytes required to fill cursor.
fn by_ref(&mut self) -> &mut Self where Self: Sized
Since 1.0.0. Creates a "by reference" adaptor for this instance of Read.
impl<T: AsyncSeek + Unpin> Seek for SyncIoBridge<T>
fn seek(&mut self, pos: SeekFrom) -> Result<u64>
fn rewind(&mut self) -> Result<(), Error>
Since 1.55.0.
fn stream_len(&mut self) -> Result<u64, Error>
Nightly-only experimental API (seek_stream_len).
impl<T: AsyncWrite + Unpin> Write for SyncIoBridge<T>
fn write(&mut self, buf: &[u8]) -> Result<usize>
fn flush(&mut self) -> Result<()>
fn write_all(&mut self, buf: &[u8]) -> Result<()>
fn is_write_vectored(&self) -> bool
Nightly-only experimental API (can_vector).
fn write_all_vectored(&mut self, bufs: &mut [IoSlice<'_>]) -> Result<(), Error>
Nightly-only experimental API (write_all_vectored).
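These trait implementations are what let existing synchronous code accept a bridge unchanged. As a sketch, the function below is written only against std::io::BufRead. Since SyncIoBridge<T> implements BufRead when T: AsyncBufRead + Unpin, a function of this shape could be handed a bridge that has been moved into spawn_blocking. The name count_non_empty_lines is hypothetical, and a std::io::Cursor is the natural stand-in for trying it out without a runtime.

```rust
use std::io::{self, BufRead};

/// Counts the lines that contain something other than whitespace.
/// Works with any synchronous buffered reader.
fn count_non_empty_lines(reader: impl BufRead) -> io::Result<usize> {
    let mut count = 0;
    for line in reader.lines() {
        // Propagate I/O errors (including invalid UTF-8) to the caller.
        if !line?.trim().is_empty() {
            count += 1;
        }
    }
    Ok(count)
}
```

Keeping such functions generic over the std traits, rather than over a concrete reader type, means the same code serves plain files, in-memory buffers, and bridged async sources.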