linera_rpc/simple/
codec.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use std::{io, mem, ops::DerefMut};
5
6use bytes::{Buf, BufMut, BytesMut};
7use linera_core::node::NodeError;
8use thiserror::Error;
9use tokio_util::codec::{Decoder, Encoder};
10
11use crate::RpcMessage;
12
/// Number of bytes occupied by the frame prefix, which stores the payload size as a [`u32`].
const PREFIX_SIZE: u8 = (u32::BITS / u8::BITS) as u8;
15
/// An encoder/decoder of [`RpcMessage`]s for the RPC protocol.
///
/// Every frame starts with a little-endian [`u32`] prefix holding the size of the payload in
/// bytes, followed by the payload itself, which is the message serialized with [`bincode`].
///
/// The type carries no state, so it is freely copyable.
#[derive(Clone, Copy, Debug)]
pub struct Codec;
22
23impl Encoder<RpcMessage> for Codec {
24    type Error = Error;
25
26    fn encode(&mut self, message: RpcMessage, buffer: &mut BytesMut) -> Result<(), Self::Error> {
27        let mut frame_buffer = buffer.split_off(buffer.len());
28
29        frame_buffer.put_u32_le(0);
30
31        let mut frame_writer = frame_buffer.writer();
32
33        bincode::serialize_into(&mut frame_writer, &message)
34            .map_err(|error| Error::Serialization(*error))?;
35
36        let mut frame_buffer = frame_writer.into_inner();
37        let frame_size = frame_buffer.len();
38        let payload_size = frame_size - PREFIX_SIZE as usize;
39
40        let mut start_of_frame = frame_buffer.deref_mut();
41
42        start_of_frame.put_u32_le(
43            payload_size
44                .try_into()
45                .map_err(|_| Error::MessageTooBig { size: payload_size })?,
46        );
47
48        buffer.unsplit(frame_buffer);
49
50        Ok(())
51    }
52}
53
54impl Decoder for Codec {
55    type Item = RpcMessage;
56    type Error = Error;
57
58    fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
59        if buffer.len() < PREFIX_SIZE.into() {
60            return Ok(None);
61        }
62
63        let mut start_of_buffer: &[u8] = &*buffer;
64        let payload_size = start_of_buffer
65            .get_u32_le()
66            .try_into()
67            .expect("u32 should fit in a usize");
68
69        let frame_size = PREFIX_SIZE as usize + payload_size;
70
71        if buffer.len() < frame_size {
72            buffer.reserve(frame_size);
73            return Ok(None);
74        }
75
76        let _prefix = buffer.split_to(PREFIX_SIZE.into());
77        let payload = buffer.split_to(payload_size);
78
79        let message =
80            bincode::deserialize(&payload).map_err(|error| Error::Deserialization(*error))?;
81
82        Ok(Some(message))
83    }
84}
85
/// Errors that can arise during transmission or reception of [`RpcMessage`]s.
#[derive(Debug, Error)]
pub enum Error {
    /// An I/O error, converted automatically from [`io::Error`] through the derived `From`
    /// implementation.
    #[error("I/O error in the underlying transport: {0}")]
    IoError(#[from] io::Error),

    /// [`bincode`] could not deserialize the payload of a received frame.
    #[error("Failed to deserialize an incoming message: {0}")]
    Deserialization(#[source] bincode::ErrorKind),

    /// [`bincode`] could not serialize a message that was about to be sent.
    #[error("Failed to serialize outgoing message: {0}")]
    Serialization(#[source] bincode::ErrorKind),

    /// The serialized message is larger than what the [`u32`] size prefix can describe.
    ///
    /// `size` is the size of the serialized payload in bytes.
    #[error("RpcMessage is too big to fit in a protocol frame: \
        message is {size} bytes but can't be larger than {max} bytes.",
        max = u32::MAX)]
    MessageTooBig { size: usize },
}
103
104impl From<Error> for NodeError {
105    fn from(error: Error) -> NodeError {
106        match error {
107            Error::IoError(io_error) => NodeError::ClientIoError {
108                error: format!("{}", io_error),
109            },
110            err => {
111                tracing::error!("Unexpected decoding error: {err}");
112                NodeError::InvalidDecoding
113            }
114        }
115    }
116}
117
#[cfg(test)]
mod tests {
    use bytes::{BufMut, BytesMut};
    use linera_core::data_types::ChainInfoQuery;
    use test_strategy::proptest;
    use tokio_util::codec::{Decoder, Encoder};

    use super::{Codec, RpcMessage, PREFIX_SIZE};

    /// Checks that a frame is decoded correctly regardless of the bytes surrounding it.
    ///
    /// The leading bytes are written first and then split away from the buffer, so the decoder
    /// only sees the frame (size prefix plus a serialized dummy [`RpcMessage`]) followed by the
    /// trailing bytes.
    ///
    /// Decoding must yield the original message, leave the split-off leading bytes unchanged
    /// and leave exactly the trailing bytes in the buffer.
    #[proptest]
    fn decodes_frame_ignoring_leading_and_trailing_bytes(
        leading_bytes: Vec<u8>,
        message_contents: ChainInfoQuery,
        trailing_bytes: Vec<u8>,
    ) {
        let message = RpcMessage::ChainInfoQuery(Box::new(message_contents));
        let payload = bincode::serialize(&message).expect("RpcMessage is serializable");

        let prefix_size = usize::from(PREFIX_SIZE);
        let total_size = leading_bytes.len() + prefix_size + payload.len() + trailing_bytes.len();

        let mut buffer = BytesMut::with_capacity(total_size);
        buffer.put_slice(&leading_bytes);

        // Detach the leading bytes; `buffer` now starts right where the frame will be written.
        let start_of_buffer = buffer.split();

        buffer.put_u32_le(payload.len() as u32);
        buffer.put_slice(&payload);
        buffer.put_slice(&trailing_bytes);

        let decoded = Codec.decode(&mut buffer);

        assert!(decoded.is_ok());
        assert_eq!(decoded.unwrap(), Some(message));

        assert_eq!(&start_of_buffer, &leading_bytes);
        assert_eq!(&buffer, &trailing_bytes);
    }

    /// Checks that a message is encoded as a frame appended after the existing buffer contents.
    ///
    /// The buffer starts out holding arbitrary leading bytes. After encoding, those bytes must
    /// be unchanged and must be followed by the size prefix and then by exactly the serialized
    /// message, with nothing after it.
    #[proptest]
    fn encodes_at_the_correct_buffer_offset(
        leading_bytes: Vec<u8>,
        message_contents: ChainInfoQuery,
    ) {
        let message = RpcMessage::ChainInfoQuery(Box::new(message_contents));
        let serialized_message =
            bincode::serialize(&message).expect("Serialization should succeed");

        let mut buffer = BytesMut::from(leading_bytes.as_slice());
        let frame_start = buffer.len();

        let outcome = Codec.encode(message, &mut buffer);

        assert!(matches!(outcome, Ok(())));

        let (head, frame) = buffer.split_at(frame_start);
        assert_eq!(head, &leading_bytes);

        let (prefix_bytes, body) = frame.split_at(usize::from(PREFIX_SIZE));
        let prefix = u32::from_le_bytes(
            prefix_bytes
                .try_into()
                .expect("Incorrect prefix slice indices"),
        );

        assert_eq!(prefix as usize, serialized_message.len());
        assert_eq!(
            buffer.len(),
            leading_bytes.len() + PREFIX_SIZE as usize + prefix as usize
        );

        assert_eq!(body, &serialized_message);
    }
}