tokio/io/util/
buf_stream.rs1use crate::io::util::{BufReader, BufWriter};
2use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
3
4use pin_project_lite::pin_project;
5use std::io::{self, IoSlice, SeekFrom};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project! {
10    #[derive(Debug)]
18    #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
19    pub struct BufStream<RW> {
20        #[pin]
21        inner: BufReader<BufWriter<RW>>,
22    }
23}
24
25impl<RW: AsyncRead + AsyncWrite> BufStream<RW> {
26    pub fn new(stream: RW) -> BufStream<RW> {
30        BufStream {
31            inner: BufReader::new(BufWriter::new(stream)),
32        }
33    }
34
35    pub fn with_capacity(
40        reader_capacity: usize,
41        writer_capacity: usize,
42        stream: RW,
43    ) -> BufStream<RW> {
44        BufStream {
45            inner: BufReader::with_capacity(
46                reader_capacity,
47                BufWriter::with_capacity(writer_capacity, stream),
48            ),
49        }
50    }
51
52    pub fn get_ref(&self) -> &RW {
56        self.inner.get_ref().get_ref()
57    }
58
59    pub fn get_mut(&mut self) -> &mut RW {
63        self.inner.get_mut().get_mut()
64    }
65
66    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW> {
70        self.project().inner.get_pin_mut().get_pin_mut()
71    }
72
73    pub fn into_inner(self) -> RW {
77        self.inner.into_inner().into_inner()
78    }
79}
80
81impl<RW> From<BufReader<BufWriter<RW>>> for BufStream<RW> {
82    fn from(b: BufReader<BufWriter<RW>>) -> Self {
83        BufStream { inner: b }
84    }
85}
86
87impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> {
88    fn from(b: BufWriter<BufReader<RW>>) -> Self {
89        let BufWriter {
91            inner:
92                BufReader {
93                    inner,
94                    buf: rbuf,
95                    pos,
96                    cap,
97                    seek_state: rseek_state,
98                },
99            buf: wbuf,
100            written,
101            seek_state: wseek_state,
102        } = b;
103
104        BufStream {
105            inner: BufReader {
106                inner: BufWriter {
107                    inner,
108                    buf: wbuf,
109                    written,
110                    seek_state: wseek_state,
111                },
112                buf: rbuf,
113                pos,
114                cap,
115                seek_state: rseek_state,
116            },
117        }
118    }
119}
120
121impl<RW: AsyncRead + AsyncWrite> AsyncWrite for BufStream<RW> {
122    fn poll_write(
123        self: Pin<&mut Self>,
124        cx: &mut Context<'_>,
125        buf: &[u8],
126    ) -> Poll<io::Result<usize>> {
127        self.project().inner.poll_write(cx, buf)
128    }
129
130    fn poll_write_vectored(
131        self: Pin<&mut Self>,
132        cx: &mut Context<'_>,
133        bufs: &[IoSlice<'_>],
134    ) -> Poll<io::Result<usize>> {
135        self.project().inner.poll_write_vectored(cx, bufs)
136    }
137
138    fn is_write_vectored(&self) -> bool {
139        self.inner.is_write_vectored()
140    }
141
142    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
143        self.project().inner.poll_flush(cx)
144    }
145
146    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
147        self.project().inner.poll_shutdown(cx)
148    }
149}
150
151impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> {
152    fn poll_read(
153        self: Pin<&mut Self>,
154        cx: &mut Context<'_>,
155        buf: &mut ReadBuf<'_>,
156    ) -> Poll<io::Result<()>> {
157        self.project().inner.poll_read(cx, buf)
158    }
159}
160
161impl<RW: AsyncRead + AsyncWrite + AsyncSeek> AsyncSeek for BufStream<RW> {
180    fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
181        self.project().inner.start_seek(position)
182    }
183
184    fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
185        self.project().inner.poll_complete(cx)
186    }
187}
188
189impl<RW: AsyncRead + AsyncWrite> AsyncBufRead for BufStream<RW> {
190    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
191        self.project().inner.poll_fill_buf(cx)
192    }
193
194    fn consume(self: Pin<&mut Self>, amt: usize) {
195        self.project().inner.consume(amt);
196    }
197}
198
199#[cfg(test)]
200mod tests {
201    use super::*;
202
203    #[test]
204    fn assert_unpin() {
205        crate::is_unpin::<BufStream<()>>();
206    }
207}