Skip to main content

linera_rpc/simple/
codec.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{io, mem, ops::DerefMut};
5
6use bytes::{Buf, BufMut, BytesMut};
7use linera_core::node::NodeError;
8use thiserror::Error;
9use tokio_util::codec::{Decoder, Encoder};
10
11use crate::RpcMessage;
12
/// Number of bytes taken by the frame prefix, i.e. the encoded size of the [`u32`] that
/// announces how long the payload is.
#[expect(
    clippy::cast_possible_truncation,
    reason = "size_of::<u32>() is always 4"
)]
const PREFIX_SIZE: u8 = mem::size_of::<u32>() as u8;
19
/// Stateless codec that turns [`RpcMessage`]s into RPC protocol frames and back.
///
/// Every frame consists of a little-endian [`u32`] holding the payload length, followed by
/// the payload itself, which is the [`bincode`] serialization of the message.
#[derive(Debug, Clone, Copy)]
pub struct Codec;
26
27impl Encoder<RpcMessage> for Codec {
28    type Error = Error;
29
30    fn encode(&mut self, message: RpcMessage, buffer: &mut BytesMut) -> Result<(), Self::Error> {
31        let mut frame_buffer = buffer.split_off(buffer.len());
32
33        frame_buffer.put_u32_le(0);
34
35        let mut frame_writer = frame_buffer.writer();
36
37        bincode::serialize_into(&mut frame_writer, &message)
38            .map_err(|error| Error::Serialization(*error))?;
39
40        let mut frame_buffer = frame_writer.into_inner();
41        let frame_size = frame_buffer.len();
42        let payload_size = frame_size - PREFIX_SIZE as usize;
43
44        let mut start_of_frame = frame_buffer.deref_mut();
45
46        start_of_frame.put_u32_le(
47            payload_size
48                .try_into()
49                .map_err(|_| Error::MessageTooBig { size: payload_size })?,
50        );
51
52        buffer.unsplit(frame_buffer);
53
54        Ok(())
55    }
56}
57
58impl Decoder for Codec {
59    type Item = RpcMessage;
60    type Error = Error;
61
62    fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
63        if buffer.len() < PREFIX_SIZE.into() {
64            return Ok(None);
65        }
66
67        let mut start_of_buffer: &[u8] = &*buffer;
68        let payload_size = start_of_buffer
69            .get_u32_le()
70            .try_into()
71            .expect("u32 should fit in a usize");
72
73        let frame_size = PREFIX_SIZE as usize + payload_size;
74
75        if buffer.len() < frame_size {
76            buffer.reserve(frame_size);
77            return Ok(None);
78        }
79
80        let _prefix = buffer.split_to(PREFIX_SIZE.into());
81        let payload = buffer.split_to(payload_size);
82
83        let message =
84            bincode::deserialize(&payload).map_err(|error| Error::Deserialization(*error))?;
85
86        Ok(Some(message))
87    }
88}
89
90/// Errors that can arise during transmission or reception of [`RpcMessage`]s.
91#[derive(Debug, Error)]
92pub enum Error {
93    #[error("I/O error in the underlying transport: {0}")]
94    IoError(#[from] io::Error),
95
96    #[error("Failed to deserialize an incoming message: {0}")]
97    Deserialization(#[source] bincode::ErrorKind),
98
99    #[error("Failed to serialize outgoing message: {0}")]
100    Serialization(#[source] bincode::ErrorKind),
101
102    #[error("RpcMessage is too big to fit in a protocol frame: \
103        message is {size} bytes but can't be larger than {max} bytes.",
104        max = u32::MAX)]
105    MessageTooBig { size: usize },
106}
107
108impl From<Error> for NodeError {
109    fn from(error: Error) -> NodeError {
110        match error {
111            Error::IoError(io_error) => NodeError::ClientIoError {
112                error: format!("{io_error}"),
113            },
114            err => {
115                tracing::error!("Unexpected decoding error: {err}");
116                NodeError::InvalidDecoding
117            }
118        }
119    }
120}
121
#[cfg(test)]
mod tests {
    #![allow(clippy::cast_possible_truncation)]

    use bytes::{BufMut, BytesMut};
    use linera_core::data_types::ChainInfoQuery;
    use test_strategy::proptest;
    use tokio_util::codec::{Decoder, Encoder};

    use super::{Codec, RpcMessage, PREFIX_SIZE};

    /// Checks that a frame is decoded correctly when other bytes surround it in the same
    /// allocation.
    ///
    /// The leading bytes are written first and then split off, so the buffer handed to the
    /// decoder starts exactly at the frame. The frame is a size prefix followed by a serialized
    /// dummy [`RpcMessage`], and arbitrary trailing bytes come right after it.
    ///
    /// Decoding must yield the original message, leave the split-off leading bytes unchanged
    /// and leave exactly the trailing bytes in the buffer.
    #[proptest]
    fn decodes_frame_ignoring_leading_and_trailing_bytes(
        leading_bytes: Vec<u8>,
        message_contents: ChainInfoQuery,
        trailing_bytes: Vec<u8>,
    ) {
        let message = RpcMessage::ChainInfoQuery(Box::new(message_contents));
        let payload = bincode::serialize(&message).expect("RpcMessage is serializable");

        let prefix_len = usize::from(PREFIX_SIZE);
        let total_len = leading_bytes.len() + prefix_len + payload.len() + trailing_bytes.len();
        let mut buffer = BytesMut::with_capacity(total_len);

        // Detach the leading bytes so that `buffer` begins at the start of the frame.
        buffer.extend_from_slice(&leading_bytes);
        let start_of_buffer = buffer.split();

        // Frame: little-endian size prefix, then the payload, then unrelated trailing bytes.
        buffer.put_u32_le(payload.len() as u32);
        buffer.extend_from_slice(&payload);
        buffer.extend_from_slice(&trailing_bytes);

        let result = Codec.decode(&mut buffer);

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(message));

        assert_eq!(&start_of_buffer, &leading_bytes);
        assert_eq!(&buffer, &trailing_bytes);
    }

    /// Checks that a message is encoded at the end of a buffer that already holds data.
    ///
    /// The buffer is pre-filled with arbitrary leading bytes before the encoder runs.
    ///
    /// After encoding, the leading bytes must be unchanged, followed by a prefix equal to the
    /// length of the serialized message, followed by the serialized message and nothing else.
    #[proptest]
    fn encodes_at_the_correct_buffer_offset(
        leading_bytes: Vec<u8>,
        message_contents: ChainInfoQuery,
    ) {
        let message = RpcMessage::ChainInfoQuery(Box::new(message_contents));
        let serialized_message =
            bincode::serialize(&message).expect("Serialization should succeed");

        let mut buffer = BytesMut::new();
        buffer.extend_from_slice(&leading_bytes);

        let prefix_len = usize::from(PREFIX_SIZE);
        let frame_start = buffer.len();
        let prefix_end = frame_start + prefix_len;

        let result = Codec.encode(message, &mut buffer);

        assert!(matches!(result, Ok(())));
        assert_eq!(&buffer[..frame_start], &leading_bytes);

        let prefix_bytes = buffer[frame_start..prefix_end]
            .try_into()
            .expect("Incorrect prefix slice indices");
        let prefix = u32::from_le_bytes(prefix_bytes);

        assert_eq!(prefix as usize, serialized_message.len());
        assert_eq!(
            buffer.len(),
            leading_bytes.len() + prefix_len + prefix as usize
        );

        assert_eq!(&buffer[prefix_end..], &serialized_message);
    }
}