tokio_util/io/
sync_bridge.rs

1use std::io::{BufRead, Read, Seek, Write};
2use tokio::io::{
3    AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
4    AsyncWriteExt,
5};
6
7/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
8/// a [`tokio::io::AsyncWrite`] synchronously as a [`std::io::Write`].
9///
10/// # Alternatives
11///
12/// In many cases, there are better alternatives to using `SyncIoBridge`, especially
13/// if you want to avoid blocking the async runtime. Consider the following scenarios:
14///
15/// When hashing data, using `SyncIoBridge` can lead to suboptimal performance and
16/// might not fully leverage the async capabilities of the system.
17///
18/// ### Why It Matters:
19///
/// `SyncIoBridge` allows you to use asynchronous I/O operations in a synchronous
21/// context by blocking the current thread. However, this can be inefficient because:
22/// - **Inefficient Resource Usage**: `SyncIoBridge` takes up an entire OS thread,
23///   which is inefficient compared to asynchronous code that can multiplex many
24///   tasks on a single thread.
25/// - **Thread Pool Saturation**: Excessive use of `SyncIoBridge` can exhaust the
26///   async runtime's thread pool, reducing the number of threads available for
27///   other tasks and impacting overall performance.
28/// - **Missed Concurrency Benefits**: By using synchronous operations with
29///   `SyncIoBridge`, you lose the ability to interleave tasks efficiently,
30///   which is a key advantage of asynchronous programming.
31///
32/// ## Example 1: Hashing Data
33///
34/// The use of `SyncIoBridge` is unnecessary when hashing data. Instead, you can
35/// process the data asynchronously by reading it into memory, which avoids blocking
36/// the async runtime.
37///
38/// There are two strategies for avoiding `SyncIoBridge` when hashing data. When
39/// the data fits into memory, the easiest is to read the data into a `Vec<u8>`
40/// and hash it:
41///
42/// Explanation: This example demonstrates how to asynchronously read data from a
43/// reader into memory and hash it using a synchronous hashing function. The
44/// `SyncIoBridge` is avoided, ensuring that the async runtime is not blocked.
45/// ```rust
46/// use tokio::io::AsyncReadExt;
47/// use tokio::io::AsyncRead;
48/// use std::io::Cursor;
49/// # mod blake3 { pub fn hash(_: &[u8]) {} }
50///
51/// async fn hash_contents(mut reader: impl AsyncRead + Unpin) -> Result<(), std::io::Error> {
52///    // Read all data from the reader into a Vec<u8>.
53///    let mut data = Vec::new();
54///    reader.read_to_end(&mut data).await?;
55///
56///    // Hash the data using the blake3 hashing function.
57///    let hash = blake3::hash(&data);
58///
59///    Ok(hash)
60///}
61///
62/// #[tokio::main]
63/// async fn main() -> Result<(), std::io::Error> {
64///     // Example: In-memory data.
65///     let data = b"Hello, world!"; // A byte slice.
66///     let reader = Cursor::new(data); // Create an in-memory AsyncRead.
67///     hash_contents(reader).await
68/// }
69/// ```
70///
71/// When the data doesn't fit into memory, the hashing library will usually
72/// provide a `hasher` that you can repeatedly call `update` on to hash the data
/// one chunk at a time.
74///
75/// Explanation: This example demonstrates how to asynchronously stream data in
76/// chunks for hashing. Each chunk is read asynchronously, and the hash is updated
77/// incrementally. This avoids blocking and improves performance over using
78/// `SyncIoBridge`.
79///
80/// ```rust
81/// use tokio::io::AsyncReadExt;
82/// use tokio::io::AsyncRead;
83/// use std::io::Cursor;
84/// # struct Hasher;
85/// # impl Hasher { pub fn update(&mut self, _: &[u8]) {} pub fn finalize(&self) {} }
86///
87/// /// Asynchronously streams data from an async reader, processes it in chunks,
88/// /// and hashes the data incrementally.
89/// async fn hash_stream(mut reader: impl AsyncRead + Unpin, mut hasher: Hasher) -> Result<(), std::io::Error> {
90///    // Create a buffer to read data into, sized for performance.
91///    let mut data = vec![0; 64 * 1024];
92///    loop {
93///        // Read data from the reader into the buffer.
94///        let len = reader.read(&mut data).await?;
95///        if len == 0 { break; } // Exit loop if no more data.
96///
97///        // Update the hash with the data read.
98///        hasher.update(&data[..len]);
99///    }
100///
101///    // Finalize the hash after all data has been processed.
102///    let hash = hasher.finalize();
103///
104///    Ok(hash)
105///}
106///
107/// #[tokio::main]
108/// async fn main() -> Result<(), std::io::Error> {
109///     // Example: In-memory data.
110///     let data = b"Hello, world!"; // A byte slice.
111///     let reader = Cursor::new(data); // Create an in-memory AsyncRead.
112///     let hasher = Hasher;
113///     hash_stream(reader, hasher).await
114/// }
115/// ```
116///
117///
118/// ## Example 2: Compressing Data
119///
120/// When compressing data, the use of `SyncIoBridge` is unnecessary as it introduces
121/// blocking and inefficient code. Instead, you can utilize an async compression library
122/// such as the [`async-compression`](https://docs.rs/async-compression/latest/async_compression/)
123/// crate, which is built to handle asynchronous data streams efficiently.
124///
125/// Explanation: This example shows how to asynchronously compress data using an
126/// async compression library. By reading and writing asynchronously, it avoids
127/// blocking and is more efficient than using `SyncIoBridge` with a non-async
128/// compression library.
129///
130/// ```ignore
/// use async_compression::tokio::write::GzipEncoder;
/// use std::io::Cursor;
/// use tokio::io::AsyncRead;
/// use tokio::io::AsyncWriteExt;
///
/// /// Asynchronously compresses data from an async reader using Gzip and an async encoder.
/// async fn compress_data(mut reader: impl AsyncRead + Unpin) -> Result<(), std::io::Error> {
///    let writer = tokio::io::sink();
///
///    // Create a Gzip encoder that wraps the writer.
///    let mut encoder = GzipEncoder::new(writer);
///
///    // Copy data from the reader to the encoder, compressing it.
///    tokio::io::copy(&mut reader, &mut encoder).await?;
///
///    // Shut the encoder down so that any buffered data and the gzip trailer
///    // are written to the underlying writer.
///    encoder.shutdown().await?;
///
///    Ok(())
///}
147///
148/// #[tokio::main]
149/// async fn main() -> Result<(), std::io::Error> {
150///     // Example: In-memory data.
151///     let data = b"Hello, world!"; // A byte slice.
152///     let reader = Cursor::new(data); // Create an in-memory AsyncRead.
153///     compress_data(reader).await?;
154///
155///   Ok(())
156/// }
157/// ```
158///
159///
160/// ## Example 3: Parsing Data Formats
161///
162///
163/// `SyncIoBridge` is not ideal when parsing data formats such as `JSON`, as it
164/// blocks async operations. A more efficient approach is to read data asynchronously
165/// into memory and then `deserialize` it, avoiding unnecessary synchronization overhead.
166///
167/// Explanation: This example shows how to asynchronously read data into memory
168/// and then parse it as `JSON`. By avoiding `SyncIoBridge`, the asynchronous runtime
169/// remains unblocked, leading to better performance when working with asynchronous
170/// I/O streams.
171///
172/// ```rust,no_run
173/// use tokio::io::AsyncRead;
174/// use tokio::io::AsyncReadExt;
175/// use std::io::Cursor;
176/// # mod serde {
177/// #     pub trait DeserializeOwned: 'static {}
178/// #     impl<T: 'static> DeserializeOwned for T {}
179/// # }
180/// # mod serde_json {
181/// #     use super::serde::DeserializeOwned;
182/// #     pub fn from_slice<T: DeserializeOwned>(_: &[u8]) -> Result<T, std::io::Error> {
183/// #         unimplemented!()
184/// #     }
185/// # }
186/// # #[derive(Debug)] struct MyStruct;
187///
188///
189/// async fn parse_json(mut reader: impl AsyncRead + Unpin) -> Result<MyStruct, std::io::Error> {
190///    // Read all data from the reader into a Vec<u8>.
191///    let mut data = Vec::new();
192///    reader.read_to_end(&mut data).await?;
193///
194///    // Deserialize the data from the Vec<u8> into a MyStruct instance.
195///    let value: MyStruct = serde_json::from_slice(&data)?;
196///
197///    Ok(value)
198///}
199///
200/// #[tokio::main]
201/// async fn main() -> Result<(), std::io::Error> {
202///     // Example: In-memory data.
203///     let data = b"Hello, world!"; // A byte slice.
204///     let reader = Cursor::new(data); // Create an in-memory AsyncRead.
205///     parse_json(reader).await?;
206///     Ok(())
207/// }
208/// ```
209///
210/// ## Correct Usage of `SyncIoBridge` inside `spawn_blocking`
211///
212/// `SyncIoBridge` is mainly useful when you need to interface with synchronous
213/// libraries from an asynchronous context.
214///
215/// Explanation: This example shows how to use `SyncIoBridge` inside a `spawn_blocking`
216/// task to safely perform synchronous I/O without blocking the async runtime. The
217/// `spawn_blocking` ensures that the synchronous code is offloaded to a dedicated
218/// thread pool, preventing it from interfering with the async tasks.
219///
220/// ```rust
221/// use tokio::task::spawn_blocking;
222/// use tokio_util::io::SyncIoBridge;
223/// use tokio::io::AsyncRead;
224/// use std::marker::Unpin;
225/// use std::io::Cursor;
226///
227/// /// Wraps an async reader with `SyncIoBridge` and performs synchronous I/O operations in a blocking task.
228/// async fn process_sync_io(reader: impl AsyncRead + Unpin + Send + 'static) -> Result<Vec<u8>, std::io::Error> {
229///    // Wrap the async reader with `SyncIoBridge` to allow synchronous reading.
230///    let mut sync_reader = SyncIoBridge::new(reader);
231///
232///    // Spawn a blocking task to perform synchronous I/O operations.
233///    let result = spawn_blocking(move || {
234///        // Create an in-memory buffer to hold the copied data.
235///        let mut buffer = Vec::new();
236///        // Copy data from the sync_reader to the buffer.
237///        std::io::copy(&mut sync_reader, &mut buffer)?;
238///        // Return the buffer containing the copied data.
239///        Ok::<_, std::io::Error>(buffer)
240///    })
241///    .await??;
242///
243///    // Return the result from the blocking task.
244///    Ok(result)
245///}
246///
247/// #[tokio::main]
248/// async fn main() -> Result<(), std::io::Error> {
249///     // Example: In-memory data.
250///     let data = b"Hello, world!"; // A byte slice.
251///     let reader = Cursor::new(data); // Create an in-memory AsyncRead.
252///     let result = process_sync_io(reader).await?;
253///
254///     // You can use `result` here as needed.
255///
256///     Ok(())
257/// }
258/// ```
259///
#[derive(Debug)]
pub struct SyncIoBridge<T> {
    /// The wrapped asynchronous I/O object that every synchronous call is
    /// forwarded to.
    src: T,
    /// Handle of the Tokio runtime used to `block_on` the forwarded
    /// asynchronous operations.
    rt: tokio::runtime::Handle,
}
265
266impl<T: AsyncBufRead + Unpin> BufRead for SyncIoBridge<T> {
267    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
268        let src = &mut self.src;
269        self.rt.block_on(AsyncBufReadExt::fill_buf(src))
270    }
271
272    fn consume(&mut self, amt: usize) {
273        let src = &mut self.src;
274        AsyncBufReadExt::consume(src, amt)
275    }
276
277    fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> std::io::Result<usize> {
278        let src = &mut self.src;
279        self.rt
280            .block_on(AsyncBufReadExt::read_until(src, byte, buf))
281    }
282    fn read_line(&mut self, buf: &mut String) -> std::io::Result<usize> {
283        let src = &mut self.src;
284        self.rt.block_on(AsyncBufReadExt::read_line(src, buf))
285    }
286}
287
288impl<T: AsyncRead + Unpin> Read for SyncIoBridge<T> {
289    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
290        let src = &mut self.src;
291        self.rt.block_on(AsyncReadExt::read(src, buf))
292    }
293
294    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> std::io::Result<usize> {
295        let src = &mut self.src;
296        self.rt.block_on(src.read_to_end(buf))
297    }
298
299    fn read_to_string(&mut self, buf: &mut String) -> std::io::Result<usize> {
300        let src = &mut self.src;
301        self.rt.block_on(src.read_to_string(buf))
302    }
303
304    fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
305        let src = &mut self.src;
306        // The AsyncRead trait returns the count, synchronous doesn't.
307        let _n = self.rt.block_on(src.read_exact(buf))?;
308        Ok(())
309    }
310}
311
312impl<T: AsyncWrite + Unpin> Write for SyncIoBridge<T> {
313    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
314        let src = &mut self.src;
315        self.rt.block_on(src.write(buf))
316    }
317
318    fn flush(&mut self) -> std::io::Result<()> {
319        let src = &mut self.src;
320        self.rt.block_on(src.flush())
321    }
322
323    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
324        let src = &mut self.src;
325        self.rt.block_on(src.write_all(buf))
326    }
327
328    fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
329        let src = &mut self.src;
330        self.rt.block_on(src.write_vectored(bufs))
331    }
332}
333
334impl<T: AsyncSeek + Unpin> Seek for SyncIoBridge<T> {
335    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
336        let src = &mut self.src;
337        self.rt.block_on(AsyncSeekExt::seek(src, pos))
338    }
339}
340
341// Because https://doc.rust-lang.org/std/io/trait.Write.html#method.is_write_vectored is at the time
342// of this writing still unstable, we expose this as part of a standalone method.
343impl<T: AsyncWrite> SyncIoBridge<T> {
344    /// Determines if the underlying [`tokio::io::AsyncWrite`] target supports efficient vectored writes.
345    ///
346    /// See [`tokio::io::AsyncWrite::is_write_vectored`].
347    pub fn is_write_vectored(&self) -> bool {
348        self.src.is_write_vectored()
349    }
350}
351
352impl<T: AsyncWrite + Unpin> SyncIoBridge<T> {
353    /// Shutdown this writer. This method provides a way to call the [`AsyncWriteExt::shutdown`]
354    /// function of the inner [`tokio::io::AsyncWrite`] instance.
355    ///
356    /// # Errors
357    ///
358    /// This method returns the same errors as [`AsyncWriteExt::shutdown`].
359    ///
360    /// [`AsyncWriteExt::shutdown`]: tokio::io::AsyncWriteExt::shutdown
361    pub fn shutdown(&mut self) -> std::io::Result<()> {
362        let src = &mut self.src;
363        self.rt.block_on(src.shutdown())
364    }
365}
366
367impl<T: Unpin> SyncIoBridge<T> {
368    /// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
369    /// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
370    ///
371    /// When this struct is created, it captures a handle to the current thread's runtime with [`tokio::runtime::Handle::current`].
372    /// It is hence OK to move this struct into a separate thread outside the runtime, as created
373    /// by e.g. [`tokio::task::spawn_blocking`].
374    ///
375    /// Stated even more strongly: to make use of this bridge, you *must* move
376    /// it into a separate thread outside the runtime.  The synchronous I/O will use the
377    /// underlying handle to block on the backing asynchronous source, via
378    /// [`tokio::runtime::Handle::block_on`].  As noted in the documentation for that
379    /// function, an attempt to `block_on` from an asynchronous execution context
380    /// will panic.
381    ///
382    /// # Wrapping `!Unpin` types
383    ///
384    /// Use e.g. `SyncIoBridge::new(Box::pin(src))`.
385    ///
386    /// # Panics
387    ///
388    /// This will panic if called outside the context of a Tokio runtime.
389    #[track_caller]
390    pub fn new(src: T) -> Self {
391        Self::new_with_handle(src, tokio::runtime::Handle::current())
392    }
393
394    /// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
395    /// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
396    ///
397    /// This is the same as [`SyncIoBridge::new`], but allows passing an arbitrary handle and hence may
398    /// be initially invoked outside of an asynchronous context.
399    pub fn new_with_handle(src: T, rt: tokio::runtime::Handle) -> Self {
400        Self { src, rt }
401    }
402
403    /// Consume this bridge, returning the underlying stream.
404    pub fn into_inner(self) -> T {
405        self.src
406    }
407}
408
409impl<T> AsMut<T> for SyncIoBridge<T> {
410    fn as_mut(&mut self) -> &mut T {
411        &mut self.src
412    }
413}
414
415impl<T> AsRef<T> for SyncIoBridge<T> {
416    fn as_ref(&self) -> &T {
417        &self.src
418    }
419}