tokio_util/
compat.rs

1//! Compatibility between the `tokio::io` and `futures-io` versions of the
2//! `AsyncRead` and `AsyncWrite` traits.
3//!
4//! ## Bridging Tokio and Futures I/O with `compat()`
5//!
6//! The [`compat()`] function provides a compatibility layer that allows types implementing
7//! [`tokio::io::AsyncRead`] or [`tokio::io::AsyncWrite`] to be used as their
8//! [`futures::io::AsyncRead`] or [`futures::io::AsyncWrite`] counterparts — and vice versa.
9//!
10//! This is especially useful when working with libraries that expect I/O types from one ecosystem
11//! (usually `futures`) but you are using types from the other (usually `tokio`).
12//!
13//! ## Compatibility Overview
14//!
15//! | Inner Type Implements...    | `Compat<T>` Implements...   |
16//! |-----------------------------|-----------------------------|
17//! | [`tokio::io::AsyncRead`]    | [`futures::io::AsyncRead`]  |
18//! | [`futures::io::AsyncRead`]  | [`tokio::io::AsyncRead`]    |
19//! | [`tokio::io::AsyncWrite`]   | [`futures::io::AsyncWrite`] |
20//! | [`futures::io::AsyncWrite`] | [`tokio::io::AsyncWrite`]   |
21//!
22//! ## Feature Flag
23//!
24//! This functionality is available through the `compat` feature flag:
25//!
26//! ```toml
27//! tokio-util = { version = "...", features = ["compat"] }
28//! ```
29//!
30//! ## Example 1: Tokio -> Futures (`AsyncRead`)
31//!
32//! This example demonstrates sending data over a [`tokio::net::TcpStream`] and using
33//! [`futures::io::AsyncReadExt::read`] from the `futures` crate to read it after adapting the
34//! stream via [`compat()`].
35//!
36//! ```no_run
37//! use tokio::net::{TcpListener, TcpStream};
38//! use tokio::io::AsyncWriteExt;
39//! use tokio_util::compat::TokioAsyncReadCompatExt;
40//! use futures::io::AsyncReadExt;
41//!
42//! #[tokio::main]
43//! async fn main() -> std::io::Result<()> {
44//!     let listener = TcpListener::bind("127.0.0.1:8081").await?;
45//!
46//!     tokio::spawn(async {
47//!         let mut client = TcpStream::connect("127.0.0.1:8081").await.unwrap();
48//!         client.write_all(b"Hello World").await.unwrap();
49//!     });
50//!
51//!     let (stream, _) = listener.accept().await?;
52//!
53//!     // Adapt `tokio::TcpStream` to be used with `futures::io::AsyncReadExt`
54//!     let mut compat_stream = stream.compat();
55//!     let mut buffer = [0; 20];
56//!     let n = compat_stream.read(&mut buffer).await?;
57//!     println!("Received: {}", String::from_utf8_lossy(&buffer[..n]));
58//!
59//!     Ok(())
60//! }
61//! ```
62//!
63//! ## Example 2: Futures -> Tokio (`AsyncRead`)
64//!
65//! The reverse is also possible: you can take a [`futures::io::AsyncRead`] (e.g. a cursor) and
66//! adapt it to be used with [`tokio::io::AsyncReadExt::read_to_end`]
67//!
68//! ```
69//! use futures::io::Cursor;
70//! use tokio_util::compat::FuturesAsyncReadCompatExt;
71//! use tokio::io::AsyncReadExt;
72//!
73//! fn main() {
74//!     let future = async {
75//!         let reader = Cursor::new(b"Hello from futures");
76//!         let mut compat_reader = reader.compat();
77//!         let mut buf = Vec::new();
78//!         compat_reader.read_to_end(&mut buf).await.unwrap();
79//!         assert_eq!(&buf, b"Hello from futures");
80//!     };
81//!
82//!     // Run the future inside a Tokio runtime
83//!     tokio::runtime::Runtime::new().unwrap().block_on(future);
84//! }
85//! ```
86//!
87//! ## Common Use Cases
88//!
89//! - Using `tokio` sockets with `async-tungstenite`, `async-compression`, or `futures-rs`-based
90//!   libraries.
91//! - Bridging I/O interfaces between mixed-ecosystem libraries.
92//! - Avoiding rewrites or duplication of I/O code in async environments.
93//!
94//! ## See Also
95//!
96//! - [`Compat`] type
97//! - [`TokioAsyncReadCompatExt`]
98//! - [`FuturesAsyncReadCompatExt`]
99//! - [`tokio::io`]
100//! - [`futures::io`]
101//!
102//! [`futures::io`]: https://docs.rs/futures/latest/futures/io/
103//! [`futures::io::AsyncRead`]: https://docs.rs/futures/latest/futures/io/trait.AsyncRead.html
104//! [`futures::io::AsyncWrite`]: https://docs.rs/futures/latest/futures/io/trait.AsyncWrite.html
105//! [`futures::io::AsyncReadExt::read`]: https://docs.rs/futures/latest/futures/io/trait.AsyncReadExt.html#method.read
106//! [`compat()`]: TokioAsyncReadCompatExt::compat
107
108use pin_project_lite::pin_project;
109use std::io;
110use std::pin::Pin;
111use std::task::{ready, Context, Poll};
112
113pin_project! {
114    /// A compatibility layer that allows conversion between the
115    /// `tokio::io` and `futures-io` `AsyncRead` and `AsyncWrite` traits.
116    #[derive(Copy, Clone, Debug)]
117    pub struct Compat<T> {
118        #[pin]
119        inner: T,
120        seek_pos: Option<io::SeekFrom>,
121    }
122}
123
124/// Extension trait that allows converting a type implementing
125/// `futures_io::AsyncRead` to implement `tokio::io::AsyncRead`.
126pub trait FuturesAsyncReadCompatExt: futures_io::AsyncRead {
127    /// Wraps `self` with a compatibility layer that implements
128    /// `tokio_io::AsyncRead`.
129    fn compat(self) -> Compat<Self>
130    where
131        Self: Sized,
132    {
133        Compat::new(self)
134    }
135}
136
137impl<T: futures_io::AsyncRead> FuturesAsyncReadCompatExt for T {}
138
139/// Extension trait that allows converting a type implementing
140/// `futures_io::AsyncWrite` to implement `tokio::io::AsyncWrite`.
141pub trait FuturesAsyncWriteCompatExt: futures_io::AsyncWrite {
142    /// Wraps `self` with a compatibility layer that implements
143    /// `tokio::io::AsyncWrite`.
144    fn compat_write(self) -> Compat<Self>
145    where
146        Self: Sized,
147    {
148        Compat::new(self)
149    }
150}
151
152impl<T: futures_io::AsyncWrite> FuturesAsyncWriteCompatExt for T {}
153
154/// Extension trait that allows converting a type implementing
155/// `tokio::io::AsyncRead` to implement `futures_io::AsyncRead`.
156pub trait TokioAsyncReadCompatExt: tokio::io::AsyncRead {
157    /// Wraps `self` with a compatibility layer that implements
158    /// `futures_io::AsyncRead`.
159    fn compat(self) -> Compat<Self>
160    where
161        Self: Sized,
162    {
163        Compat::new(self)
164    }
165}
166
167impl<T: tokio::io::AsyncRead> TokioAsyncReadCompatExt for T {}
168
169/// Extension trait that allows converting a type implementing
170/// `tokio::io::AsyncWrite` to implement `futures_io::AsyncWrite`.
171pub trait TokioAsyncWriteCompatExt: tokio::io::AsyncWrite {
172    /// Wraps `self` with a compatibility layer that implements
173    /// `futures_io::AsyncWrite`.
174    fn compat_write(self) -> Compat<Self>
175    where
176        Self: Sized,
177    {
178        Compat::new(self)
179    }
180}
181
182impl<T: tokio::io::AsyncWrite> TokioAsyncWriteCompatExt for T {}
183
184// === impl Compat ===
185
186impl<T> Compat<T> {
187    fn new(inner: T) -> Self {
188        Self {
189            inner,
190            seek_pos: None,
191        }
192    }
193
194    /// Get a reference to the `Future`, `Stream`, `AsyncRead`, or `AsyncWrite` object
195    /// contained within.
196    pub fn get_ref(&self) -> &T {
197        &self.inner
198    }
199
200    /// Get a mutable reference to the `Future`, `Stream`, `AsyncRead`, or `AsyncWrite` object
201    /// contained within.
202    pub fn get_mut(&mut self) -> &mut T {
203        &mut self.inner
204    }
205
206    /// Returns the wrapped item.
207    pub fn into_inner(self) -> T {
208        self.inner
209    }
210}
211
212impl<T> tokio::io::AsyncRead for Compat<T>
213where
214    T: futures_io::AsyncRead,
215{
216    fn poll_read(
217        self: Pin<&mut Self>,
218        cx: &mut Context<'_>,
219        buf: &mut tokio::io::ReadBuf<'_>,
220    ) -> Poll<io::Result<()>> {
221        // We can't trust the inner type to not peak at the bytes,
222        // so we must defensively initialize the buffer.
223        let slice = buf.initialize_unfilled();
224        let n = ready!(futures_io::AsyncRead::poll_read(
225            self.project().inner,
226            cx,
227            slice
228        ))?;
229        buf.advance(n);
230        Poll::Ready(Ok(()))
231    }
232}
233
234impl<T> futures_io::AsyncRead for Compat<T>
235where
236    T: tokio::io::AsyncRead,
237{
238    fn poll_read(
239        self: Pin<&mut Self>,
240        cx: &mut Context<'_>,
241        slice: &mut [u8],
242    ) -> Poll<io::Result<usize>> {
243        let mut buf = tokio::io::ReadBuf::new(slice);
244        ready!(tokio::io::AsyncRead::poll_read(
245            self.project().inner,
246            cx,
247            &mut buf
248        ))?;
249        Poll::Ready(Ok(buf.filled().len()))
250    }
251}
252
253impl<T> tokio::io::AsyncBufRead for Compat<T>
254where
255    T: futures_io::AsyncBufRead,
256{
257    fn poll_fill_buf<'a>(
258        self: Pin<&'a mut Self>,
259        cx: &mut Context<'_>,
260    ) -> Poll<io::Result<&'a [u8]>> {
261        futures_io::AsyncBufRead::poll_fill_buf(self.project().inner, cx)
262    }
263
264    fn consume(self: Pin<&mut Self>, amt: usize) {
265        futures_io::AsyncBufRead::consume(self.project().inner, amt)
266    }
267}
268
269impl<T> futures_io::AsyncBufRead for Compat<T>
270where
271    T: tokio::io::AsyncBufRead,
272{
273    fn poll_fill_buf<'a>(
274        self: Pin<&'a mut Self>,
275        cx: &mut Context<'_>,
276    ) -> Poll<io::Result<&'a [u8]>> {
277        tokio::io::AsyncBufRead::poll_fill_buf(self.project().inner, cx)
278    }
279
280    fn consume(self: Pin<&mut Self>, amt: usize) {
281        tokio::io::AsyncBufRead::consume(self.project().inner, amt)
282    }
283}
284
285impl<T> tokio::io::AsyncWrite for Compat<T>
286where
287    T: futures_io::AsyncWrite,
288{
289    fn poll_write(
290        self: Pin<&mut Self>,
291        cx: &mut Context<'_>,
292        buf: &[u8],
293    ) -> Poll<io::Result<usize>> {
294        futures_io::AsyncWrite::poll_write(self.project().inner, cx, buf)
295    }
296
297    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
298        futures_io::AsyncWrite::poll_flush(self.project().inner, cx)
299    }
300
301    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
302        futures_io::AsyncWrite::poll_close(self.project().inner, cx)
303    }
304}
305
306impl<T> futures_io::AsyncWrite for Compat<T>
307where
308    T: tokio::io::AsyncWrite,
309{
310    fn poll_write(
311        self: Pin<&mut Self>,
312        cx: &mut Context<'_>,
313        buf: &[u8],
314    ) -> Poll<io::Result<usize>> {
315        tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
316    }
317
318    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
319        tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
320    }
321
322    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
323        tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
324    }
325}
326
327impl<T: tokio::io::AsyncSeek> futures_io::AsyncSeek for Compat<T> {
328    fn poll_seek(
329        mut self: Pin<&mut Self>,
330        cx: &mut Context<'_>,
331        pos: io::SeekFrom,
332    ) -> Poll<io::Result<u64>> {
333        if self.seek_pos != Some(pos) {
334            // Ensure previous seeks have finished before starting a new one
335            ready!(self.as_mut().project().inner.poll_complete(cx))?;
336            self.as_mut().project().inner.start_seek(pos)?;
337            *self.as_mut().project().seek_pos = Some(pos);
338        }
339        let res = ready!(self.as_mut().project().inner.poll_complete(cx));
340        *self.as_mut().project().seek_pos = None;
341        Poll::Ready(res)
342    }
343}
344
345impl<T: futures_io::AsyncSeek> tokio::io::AsyncSeek for Compat<T> {
346    fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
347        *self.as_mut().project().seek_pos = Some(pos);
348        Ok(())
349    }
350
351    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
352        let pos = match self.seek_pos {
353            None => {
354                // tokio 1.x AsyncSeek recommends calling poll_complete before start_seek.
355                // We don't have to guarantee that the value returned by
356                // poll_complete called without start_seek is correct,
357                // so we'll return 0.
358                return Poll::Ready(Ok(0));
359            }
360            Some(pos) => pos,
361        };
362        let res = ready!(self.as_mut().project().inner.poll_seek(cx, pos));
363        *self.as_mut().project().seek_pos = None;
364        Poll::Ready(res)
365    }
366}
367
368#[cfg(unix)]
369impl<T: std::os::unix::io::AsRawFd> std::os::unix::io::AsRawFd for Compat<T> {
370    fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
371        self.inner.as_raw_fd()
372    }
373}
374
375#[cfg(windows)]
376impl<T: std::os::windows::io::AsRawHandle> std::os::windows::io::AsRawHandle for Compat<T> {
377    fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
378        self.inner.as_raw_handle()
379    }
380}