linera_rpc/simple/
codec.rs1use std::{io, mem, ops::DerefMut};
5
6use bytes::{Buf, BufMut, BytesMut};
7use linera_core::node::NodeError;
8use thiserror::Error;
9use tokio_util::codec::{Decoder, Encoder};
10
11use crate::RpcMessage;
12
13#[expect(
15 clippy::cast_possible_truncation,
16 reason = "size_of::<u32>() is always 4"
17)]
18const PREFIX_SIZE: u8 = mem::size_of::<u32>() as u8;
19
20#[derive(Clone, Copy, Debug)]
25pub struct Codec;
26
27impl Encoder<RpcMessage> for Codec {
28 type Error = Error;
29
30 fn encode(&mut self, message: RpcMessage, buffer: &mut BytesMut) -> Result<(), Self::Error> {
31 let mut frame_buffer = buffer.split_off(buffer.len());
32
33 frame_buffer.put_u32_le(0);
34
35 let mut frame_writer = frame_buffer.writer();
36
37 bincode::serialize_into(&mut frame_writer, &message)
38 .map_err(|error| Error::Serialization(*error))?;
39
40 let mut frame_buffer = frame_writer.into_inner();
41 let frame_size = frame_buffer.len();
42 let payload_size = frame_size - PREFIX_SIZE as usize;
43
44 let mut start_of_frame = frame_buffer.deref_mut();
45
46 start_of_frame.put_u32_le(
47 payload_size
48 .try_into()
49 .map_err(|_| Error::MessageTooBig { size: payload_size })?,
50 );
51
52 buffer.unsplit(frame_buffer);
53
54 Ok(())
55 }
56}
57
58impl Decoder for Codec {
59 type Item = RpcMessage;
60 type Error = Error;
61
62 fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
63 if buffer.len() < PREFIX_SIZE.into() {
64 return Ok(None);
65 }
66
67 let mut start_of_buffer: &[u8] = &*buffer;
68 let payload_size = start_of_buffer
69 .get_u32_le()
70 .try_into()
71 .expect("u32 should fit in a usize");
72
73 let frame_size = PREFIX_SIZE as usize + payload_size;
74
75 if buffer.len() < frame_size {
76 buffer.reserve(frame_size);
77 return Ok(None);
78 }
79
80 let _prefix = buffer.split_to(PREFIX_SIZE.into());
81 let payload = buffer.split_to(payload_size);
82
83 let message =
84 bincode::deserialize(&payload).map_err(|error| Error::Deserialization(*error))?;
85
86 Ok(Some(message))
87 }
88}
89
90#[derive(Debug, Error)]
92pub enum Error {
93 #[error("I/O error in the underlying transport: {0}")]
94 IoError(#[from] io::Error),
95
96 #[error("Failed to deserialize an incoming message: {0}")]
97 Deserialization(#[source] bincode::ErrorKind),
98
99 #[error("Failed to serialize outgoing message: {0}")]
100 Serialization(#[source] bincode::ErrorKind),
101
102 #[error("RpcMessage is too big to fit in a protocol frame: \
103 message is {size} bytes but can't be larger than {max} bytes.",
104 max = u32::MAX)]
105 MessageTooBig { size: usize },
106}
107
108impl From<Error> for NodeError {
109 fn from(error: Error) -> NodeError {
110 match error {
111 Error::IoError(io_error) => NodeError::ClientIoError {
112 error: format!("{io_error}"),
113 },
114 err => {
115 tracing::error!("Unexpected decoding error: {err}");
116 NodeError::InvalidDecoding
117 }
118 }
119 }
120}
121
122#[cfg(test)]
123mod tests {
124 #![allow(clippy::cast_possible_truncation)]
125
126 use bytes::{BufMut, BytesMut};
127 use linera_core::data_types::ChainInfoQuery;
128 use test_strategy::proptest;
129 use tokio_util::codec::{Decoder, Encoder};
130
131 use super::{Codec, RpcMessage, PREFIX_SIZE};
132
133 #[proptest]
141 fn decodes_frame_ignoring_leading_and_trailing_bytes(
142 leading_bytes: Vec<u8>,
143 message_contents: ChainInfoQuery,
144 trailing_bytes: Vec<u8>,
145 ) {
146 let message = RpcMessage::ChainInfoQuery(Box::new(message_contents));
147 let payload = bincode::serialize(&message).expect("RpcMessage is serializable");
148
149 let mut buffer = BytesMut::with_capacity(
150 leading_bytes.len() + PREFIX_SIZE as usize + payload.len() + trailing_bytes.len(),
151 );
152
153 buffer.extend_from_slice(&leading_bytes);
154
155 let start_of_buffer = buffer.split();
156
157 buffer.put_u32_le(payload.len() as u32);
158 buffer.extend_from_slice(&payload);
159 buffer.extend_from_slice(&trailing_bytes);
160
161 let result = Codec.decode(&mut buffer);
162
163 assert!(result.is_ok());
164 assert_eq!(result.unwrap(), Some(message));
165
166 assert_eq!(&start_of_buffer, &leading_bytes);
167 assert_eq!(&buffer, &trailing_bytes);
168 }
169
170 #[proptest]
179 fn encodes_at_the_correct_buffer_offset(
180 leading_bytes: Vec<u8>,
181 message_contents: ChainInfoQuery,
182 ) {
183 let message = RpcMessage::ChainInfoQuery(Box::new(message_contents));
184 let serialized_message =
185 bincode::serialize(&message).expect("Serialization should succeed");
186
187 let mut buffer = BytesMut::new();
188
189 buffer.extend_from_slice(&leading_bytes);
190
191 let frame_start = buffer.len();
192 let prefix_end = frame_start + PREFIX_SIZE as usize;
193
194 let result = Codec.encode(message, &mut buffer);
195
196 assert!(matches!(result, Ok(())));
197 assert_eq!(&buffer[..frame_start], &leading_bytes);
198
199 let prefix = u32::from_le_bytes(
200 buffer[frame_start..prefix_end]
201 .try_into()
202 .expect("Incorrect prefix slice indices"),
203 );
204
205 assert_eq!(prefix as usize, serialized_message.len());
206 assert_eq!(
207 buffer.len(),
208 leading_bytes.len() + PREFIX_SIZE as usize + prefix as usize
209 );
210
211 assert_eq!(&buffer[prefix_end..], &serialized_message);
212 }
213}