Struct SyncIoBridge

Source
pub struct SyncIoBridge<T> { /* private fields */ }
Available on crate features io-util and io only.
Expand description

Use a tokio::io::AsyncRead synchronously as a std::io::Read or a tokio::io::AsyncWrite synchronously as a std::io::Write.

§Alternatives

In many cases, there are better alternatives to using SyncIoBridge, especially if you want to avoid blocking the async runtime. Consider the following scenarios:

When hashing data, using SyncIoBridge can lead to suboptimal performance and might not fully leverage the async capabilities of the system.

§Why It Matters:

SyncIoBridge allows you to use asynchronous I/O operations in an synchronous context by blocking the current thread. However, this can be inefficient because:

  • Inefficient Resource Usage: SyncIoBridge takes up an entire OS thread, which is inefficient compared to asynchronous code that can multiplex many tasks on a single thread.
  • Thread Pool Saturation: Excessive use of SyncIoBridge can exhaust the async runtime’s thread pool, reducing the number of threads available for other tasks and impacting overall performance.
  • Missed Concurrency Benefits: By using synchronous operations with SyncIoBridge, you lose the ability to interleave tasks efficiently, which is a key advantage of asynchronous programming.

§Example 1: Hashing Data

The use of SyncIoBridge is unnecessary when hashing data. Instead, you can process the data asynchronously by reading it into memory, which avoids blocking the async runtime.

There are two strategies for avoiding SyncIoBridge when hashing data. When the data fits into memory, the easiest is to read the data into a Vec<u8> and hash it:

Explanation: This example demonstrates how to asynchronously read data from a reader into memory and hash it using a synchronous hashing function. The SyncIoBridge is avoided, ensuring that the async runtime is not blocked.

use tokio::io::AsyncReadExt;
use tokio::io::AsyncRead;
use std::io::Cursor;

async fn hash_contents(mut reader: impl AsyncRead + Unpin) -> Result<(), std::io::Error> {
   // Read all data from the reader into a Vec<u8>.
   let mut data = Vec::new();
   reader.read_to_end(&mut data).await?;

   // Hash the data using the blake3 hashing function.
   let hash = blake3::hash(&data);

   Ok(hash)
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    // Example: In-memory data.
    let data = b"Hello, world!"; // A byte slice.
    let reader = Cursor::new(data); // Create an in-memory AsyncRead.
    hash_contents(reader).await
}

When the data doesn’t fit into memory, the hashing library will usually provide a hasher that you can repeatedly call update on to hash the data one chunk at the time.

Explanation: This example demonstrates how to asynchronously stream data in chunks for hashing. Each chunk is read asynchronously, and the hash is updated incrementally. This avoids blocking and improves performance over using SyncIoBridge.

use tokio::io::AsyncReadExt;
use tokio::io::AsyncRead;
use std::io::Cursor;

/// Asynchronously streams data from an async reader, processes it in chunks,
/// and hashes the data incrementally.
async fn hash_stream(mut reader: impl AsyncRead + Unpin, mut hasher: Hasher) -> Result<(), std::io::Error> {
   // Create a buffer to read data into, sized for performance.
   let mut data = vec![0; 64 * 1024];
   loop {
       // Read data from the reader into the buffer.
       let len = reader.read(&mut data).await?;
       if len == 0 { break; } // Exit loop if no more data.

       // Update the hash with the data read.
       hasher.update(&data[..len]);
   }

   // Finalize the hash after all data has been processed.
   let hash = hasher.finalize();

   Ok(hash)
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    // Example: In-memory data.
    let data = b"Hello, world!"; // A byte slice.
    let reader = Cursor::new(data); // Create an in-memory AsyncRead.
    let hasher = Hasher;
    hash_stream(reader, hasher).await
}

§Example 2: Compressing Data

When compressing data, the use of SyncIoBridge is unnecessary as it introduces blocking and inefficient code. Instead, you can utilize an async compression library such as the async-compression crate, which is built to handle asynchronous data streams efficiently.

Explanation: This example shows how to asynchronously compress data using an async compression library. By reading and writing asynchronously, it avoids blocking and is more efficient than using SyncIoBridge with a non-async compression library.

use async_compression::tokio::write::GzipEncoder;
use std::io::Cursor;
use tokio::io::AsyncRead;

/// Asynchronously compresses data from an async reader using Gzip and an async encoder.
async fn compress_data(mut reader: impl AsyncRead + Unpin) -> Result<(), std::io::Error> {
   let writer = tokio::io::sink();

   // Create a Gzip encoder that wraps the writer.
   let mut encoder = GzipEncoder::new(writer);

   // Copy data from the reader to the encoder, compressing it.
   tokio::io::copy(&mut reader, &mut encoder).await?;

   Ok(())
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    // Example: In-memory data.
    let data = b"Hello, world!"; // A byte slice.
    let reader = Cursor::new(data); // Create an in-memory AsyncRead.
    compress_data(reader).await?;

  Ok(())
}

§Example 3: Parsing Data Formats

SyncIoBridge is not ideal when parsing data formats such as JSON, as it blocks async operations. A more efficient approach is to read data asynchronously into memory and then deserialize it, avoiding unnecessary synchronization overhead.

Explanation: This example shows how to asynchronously read data into memory and then parse it as JSON. By avoiding SyncIoBridge, the asynchronous runtime remains unblocked, leading to better performance when working with asynchronous I/O streams.

use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use std::io::Cursor;


async fn parse_json(mut reader: impl AsyncRead + Unpin) -> Result<MyStruct, std::io::Error> {
   // Read all data from the reader into a Vec<u8>.
   let mut data = Vec::new();
   reader.read_to_end(&mut data).await?;

   // Deserialize the data from the Vec<u8> into a MyStruct instance.
   let value: MyStruct = serde_json::from_slice(&data)?;

   Ok(value)
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    // Example: In-memory data.
    let data = b"Hello, world!"; // A byte slice.
    let reader = Cursor::new(data); // Create an in-memory AsyncRead.
    parse_json(reader).await?;
    Ok(())
}

§Correct Usage of SyncIoBridge inside spawn_blocking

SyncIoBridge is mainly useful when you need to interface with synchronous libraries from an asynchronous context.

Explanation: This example shows how to use SyncIoBridge inside a spawn_blocking task to safely perform synchronous I/O without blocking the async runtime. The spawn_blocking ensures that the synchronous code is offloaded to a dedicated thread pool, preventing it from interfering with the async tasks.

use tokio::task::spawn_blocking;
use tokio_util::io::SyncIoBridge;
use tokio::io::AsyncRead;
use std::marker::Unpin;
use std::io::Cursor;

/// Wraps an async reader with `SyncIoBridge` and performs synchronous I/O operations in a blocking task.
async fn process_sync_io(reader: impl AsyncRead + Unpin + Send + 'static) -> Result<Vec<u8>, std::io::Error> {
   // Wrap the async reader with `SyncIoBridge` to allow synchronous reading.
   let mut sync_reader = SyncIoBridge::new(reader);

   // Spawn a blocking task to perform synchronous I/O operations.
   let result = spawn_blocking(move || {
       // Create an in-memory buffer to hold the copied data.
       let mut buffer = Vec::new();
       // Copy data from the sync_reader to the buffer.
       std::io::copy(&mut sync_reader, &mut buffer)?;
       // Return the buffer containing the copied data.
       Ok::<_, std::io::Error>(buffer)
   })
   .await??;

   // Return the result from the blocking task.
   Ok(result)
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    // Example: In-memory data.
    let data = b"Hello, world!"; // A byte slice.
    let reader = Cursor::new(data); // Create an in-memory AsyncRead.
    let result = process_sync_io(reader).await?;

    // You can use `result` here as needed.

    Ok(())
}

Implementations§

Source§

impl<T: AsyncWrite> SyncIoBridge<T>

Source

pub fn is_write_vectored(&self) -> bool

Determines if the underlying tokio::io::AsyncWrite target supports efficient vectored writes.

See tokio::io::AsyncWrite::is_write_vectored.

Source§

impl<T: AsyncWrite + Unpin> SyncIoBridge<T>

Source

pub fn shutdown(&mut self) -> Result<()>

Shutdown this writer. This method provides a way to call the AsyncWriteExt::shutdown function of the inner tokio::io::AsyncWrite instance.

§Errors

This method returns the same errors as AsyncWriteExt::shutdown.

Source§

impl<T: Unpin> SyncIoBridge<T>

Source

pub fn new(src: T) -> Self

Use a tokio::io::AsyncRead synchronously as a std::io::Read or a tokio::io::AsyncWrite as a std::io::Write.

When this struct is created, it captures a handle to the current thread’s runtime with tokio::runtime::Handle::current. It is hence OK to move this struct into a separate thread outside the runtime, as created by e.g. tokio::task::spawn_blocking.

Stated even more strongly: to make use of this bridge, you must move it into a separate thread outside the runtime. The synchronous I/O will use the underlying handle to block on the backing asynchronous source, via tokio::runtime::Handle::block_on. As noted in the documentation for that function, an attempt to block_on from an asynchronous execution context will panic.

§Wrapping !Unpin types

Use e.g. SyncIoBridge::new(Box::pin(src)).

§Panics

This will panic if called outside the context of a Tokio runtime.

Source

pub fn new_with_handle(src: T, rt: Handle) -> Self

Use a tokio::io::AsyncRead synchronously as a std::io::Read or a tokio::io::AsyncWrite as a std::io::Write.

This is the same as SyncIoBridge::new, but allows passing an arbitrary handle and hence may be initially invoked outside of an asynchronous context.

Source

pub fn into_inner(self) -> T

Consume this bridge, returning the underlying stream.

Trait Implementations§

Source§

impl<T> AsMut<T> for SyncIoBridge<T>

Source§

fn as_mut(&mut self) -> &mut T

Converts this type into a mutable reference of the (usually inferred) input type.
Source§

impl<T> AsRef<T> for SyncIoBridge<T>

Source§

fn as_ref(&self) -> &T

Converts this type into a shared reference of the (usually inferred) input type.
Source§

impl<T: AsyncBufRead + Unpin> BufRead for SyncIoBridge<T>

Source§

fn fill_buf(&mut self) -> Result<&[u8]>

Returns the contents of the internal buffer, filling it with more data from the inner reader if it is empty. Read more
Source§

fn consume(&mut self, amt: usize)

Tells this buffer that amt bytes have been consumed from the buffer, so they should no longer be returned in calls to read. Read more
Source§

fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> Result<usize>

Reads all bytes into buf until the delimiter byte or EOF is reached. Read more
Source§

fn read_line(&mut self, buf: &mut String) -> Result<usize>

Reads all bytes until a newline (the 0xA byte) is reached, and append them to the provided String buffer. Read more
Source§

fn has_data_left(&mut self) -> Result<bool, Error>

🔬This is a nightly-only experimental API. (buf_read_has_data_left)
Checks if the underlying Read has any data left to be read. Read more
1.83.0 · Source§

fn skip_until(&mut self, byte: u8) -> Result<usize, Error>

Skips all bytes until the delimiter byte or EOF is reached. Read more
1.0.0 · Source§

fn split(self, byte: u8) -> Split<Self>
where Self: Sized,

Returns an iterator over the contents of this reader split on the byte byte. Read more
1.0.0 · Source§

fn lines(self) -> Lines<Self>
where Self: Sized,

Returns an iterator over the lines of this reader. Read more
Source§

impl<T: Debug> Debug for SyncIoBridge<T>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl<T: AsyncRead + Unpin> Read for SyncIoBridge<T>

Source§

fn read(&mut self, buf: &mut [u8]) -> Result<usize>

Pull some bytes from this source into the specified buffer, returning how many bytes were read. Read more
Source§

fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize>

Reads all bytes until EOF in this source, placing them into buf. Read more
Source§

fn read_to_string(&mut self, buf: &mut String) -> Result<usize>

Reads all bytes until EOF in this source, appending them to buf. Read more
Source§

fn read_exact(&mut self, buf: &mut [u8]) -> Result<()>

Reads the exact number of bytes required to fill buf. Read more
1.36.0 · Source§

fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize, Error>

Like read, except that it reads into a slice of buffers. Read more
Source§

fn is_read_vectored(&self) -> bool

🔬This is a nightly-only experimental API. (can_vector)
Determines if this Reader has an efficient read_vectored implementation. Read more
Source§

fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> Result<(), Error>

🔬This is a nightly-only experimental API. (read_buf)
Pull some bytes from this source into the specified buffer. Read more
Source§

fn read_buf_exact(&mut self, cursor: BorrowedCursor<'_>) -> Result<(), Error>

🔬This is a nightly-only experimental API. (read_buf)
Reads the exact number of bytes required to fill cursor. Read more
1.0.0 · Source§

fn by_ref(&mut self) -> &mut Self
where Self: Sized,

Creates a “by reference” adaptor for this instance of Read. Read more
1.0.0 · Source§

fn bytes(self) -> Bytes<Self>
where Self: Sized,

Transforms this Read instance to an Iterator over its bytes. Read more
1.0.0 · Source§

fn chain<R>(self, next: R) -> Chain<Self, R>
where R: Read, Self: Sized,

Creates an adapter which will chain this stream with another. Read more
1.0.0 · Source§

fn take(self, limit: u64) -> Take<Self>
where Self: Sized,

Creates an adapter which will read at most limit bytes from it. Read more
Source§

impl<T: AsyncSeek + Unpin> Seek for SyncIoBridge<T>

Source§

fn seek(&mut self, pos: SeekFrom) -> Result<u64>

Seek to an offset, in bytes, in a stream. Read more
1.55.0 · Source§

fn rewind(&mut self) -> Result<(), Error>

Rewind to the beginning of a stream. Read more
Source§

fn stream_len(&mut self) -> Result<u64, Error>

🔬This is a nightly-only experimental API. (seek_stream_len)
Returns the length of this stream (in bytes). Read more
1.51.0 · Source§

fn stream_position(&mut self) -> Result<u64, Error>

Returns the current seek position from the start of the stream. Read more
1.80.0 · Source§

fn seek_relative(&mut self, offset: i64) -> Result<(), Error>

Seeks relative to the current position. Read more
Source§

impl<T: AsyncWrite + Unpin> Write for SyncIoBridge<T>

Source§

fn write(&mut self, buf: &[u8]) -> Result<usize>

Writes a buffer into this writer, returning how many bytes were written. Read more
Source§

fn flush(&mut self) -> Result<()>

Flushes this output stream, ensuring that all intermediately buffered contents reach their destination. Read more
Source§

fn write_all(&mut self, buf: &[u8]) -> Result<()>

Attempts to write an entire buffer into this writer. Read more
Source§

fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize>

Like write, except that it writes from a slice of buffers. Read more
Source§

fn is_write_vectored(&self) -> bool

🔬This is a nightly-only experimental API. (can_vector)
Determines if this Writer has an efficient write_vectored implementation. Read more
Source§

fn write_all_vectored(&mut self, bufs: &mut [IoSlice<'_>]) -> Result<(), Error>

🔬This is a nightly-only experimental API. (write_all_vectored)
Attempts to write multiple buffers into this writer. Read more
1.0.0 · Source§

fn write_fmt(&mut self, fmt: Arguments<'_>) -> Result<(), Error>

Writes a formatted string into this writer, returning any error encountered. Read more
1.0.0 · Source§

fn by_ref(&mut self) -> &mut Self
where Self: Sized,

Creates a “by reference” adapter for this instance of Write. Read more

Auto Trait Implementations§

§

impl<T> Freeze for SyncIoBridge<T>
where T: Freeze,

§

impl<T> !RefUnwindSafe for SyncIoBridge<T>

§

impl<T> Send for SyncIoBridge<T>
where T: Send,

§

impl<T> Sync for SyncIoBridge<T>
where T: Sync,

§

impl<T> Unpin for SyncIoBridge<T>
where T: Unpin,

§

impl<T> !UnwindSafe for SyncIoBridge<T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more