linera_rpc/simple/
codec.rs1use std::{io, mem, ops::DerefMut};
5
6use bytes::{Buf, BufMut, BytesMut};
7use linera_core::node::NodeError;
8use thiserror::Error;
9use tokio_util::codec::{Decoder, Encoder};
10
11use crate::RpcMessage;
12
13const PREFIX_SIZE: u8 = mem::size_of::<u32>() as u8;
15
16#[derive(Clone, Copy, Debug)]
21pub struct Codec;
22
23impl Encoder<RpcMessage> for Codec {
24 type Error = Error;
25
26 fn encode(&mut self, message: RpcMessage, buffer: &mut BytesMut) -> Result<(), Self::Error> {
27 let mut frame_buffer = buffer.split_off(buffer.len());
28
29 frame_buffer.put_u32_le(0);
30
31 let mut frame_writer = frame_buffer.writer();
32
33 bincode::serialize_into(&mut frame_writer, &message)
34 .map_err(|error| Error::Serialization(*error))?;
35
36 let mut frame_buffer = frame_writer.into_inner();
37 let frame_size = frame_buffer.len();
38 let payload_size = frame_size - PREFIX_SIZE as usize;
39
40 let mut start_of_frame = frame_buffer.deref_mut();
41
42 start_of_frame.put_u32_le(
43 payload_size
44 .try_into()
45 .map_err(|_| Error::MessageTooBig { size: payload_size })?,
46 );
47
48 buffer.unsplit(frame_buffer);
49
50 Ok(())
51 }
52}
53
54impl Decoder for Codec {
55 type Item = RpcMessage;
56 type Error = Error;
57
58 fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
59 if buffer.len() < PREFIX_SIZE.into() {
60 return Ok(None);
61 }
62
63 let mut start_of_buffer: &[u8] = &*buffer;
64 let payload_size = start_of_buffer
65 .get_u32_le()
66 .try_into()
67 .expect("u32 should fit in a usize");
68
69 let frame_size = PREFIX_SIZE as usize + payload_size;
70
71 if buffer.len() < frame_size {
72 buffer.reserve(frame_size);
73 return Ok(None);
74 }
75
76 let _prefix = buffer.split_to(PREFIX_SIZE.into());
77 let payload = buffer.split_to(payload_size);
78
79 let message =
80 bincode::deserialize(&payload).map_err(|error| Error::Deserialization(*error))?;
81
82 Ok(Some(message))
83 }
84}
85
86#[derive(Debug, Error)]
88pub enum Error {
89 #[error("I/O error in the underlying transport: {0}")]
90 IoError(#[from] io::Error),
91
92 #[error("Failed to deserialize an incoming message: {0}")]
93 Deserialization(#[source] bincode::ErrorKind),
94
95 #[error("Failed to serialize outgoing message: {0}")]
96 Serialization(#[source] bincode::ErrorKind),
97
98 #[error("RpcMessage is too big to fit in a protocol frame: \
99 message is {size} bytes but can't be larger than {max} bytes.",
100 max = u32::MAX)]
101 MessageTooBig { size: usize },
102}
103
104impl From<Error> for NodeError {
105 fn from(error: Error) -> NodeError {
106 match error {
107 Error::IoError(io_error) => NodeError::ClientIoError {
108 error: format!("{}", io_error),
109 },
110 err => {
111 tracing::error!("Unexpected decoding error: {err}");
112 NodeError::InvalidDecoding
113 }
114 }
115 }
116}
117
118#[cfg(test)]
119mod tests {
120 use bytes::{BufMut, BytesMut};
121 use linera_core::data_types::ChainInfoQuery;
122 use test_strategy::proptest;
123 use tokio_util::codec::{Decoder, Encoder};
124
125 use super::{Codec, RpcMessage, PREFIX_SIZE};
126
127 #[proptest]
135 fn decodes_frame_ignoring_leading_and_trailing_bytes(
136 leading_bytes: Vec<u8>,
137 message_contents: ChainInfoQuery,
138 trailing_bytes: Vec<u8>,
139 ) {
140 let message = RpcMessage::ChainInfoQuery(Box::new(message_contents));
141 let payload = bincode::serialize(&message).expect("RpcMessage is serializable");
142
143 let mut buffer = BytesMut::with_capacity(
144 leading_bytes.len() + PREFIX_SIZE as usize + payload.len() + trailing_bytes.len(),
145 );
146
147 buffer.extend_from_slice(&leading_bytes);
148
149 let start_of_buffer = buffer.split();
150
151 buffer.put_u32_le(payload.len() as u32);
152 buffer.extend_from_slice(&payload);
153 buffer.extend_from_slice(&trailing_bytes);
154
155 let result = Codec.decode(&mut buffer);
156
157 assert!(result.is_ok());
158 assert_eq!(result.unwrap(), Some(message));
159
160 assert_eq!(&start_of_buffer, &leading_bytes);
161 assert_eq!(&buffer, &trailing_bytes);
162 }
163
164 #[proptest]
173 fn encodes_at_the_correct_buffer_offset(
174 leading_bytes: Vec<u8>,
175 message_contents: ChainInfoQuery,
176 ) {
177 let message = RpcMessage::ChainInfoQuery(Box::new(message_contents));
178 let serialized_message =
179 bincode::serialize(&message).expect("Serialization should succeed");
180
181 let mut buffer = BytesMut::new();
182
183 buffer.extend_from_slice(&leading_bytes);
184
185 let frame_start = buffer.len();
186 let prefix_end = frame_start + PREFIX_SIZE as usize;
187
188 let result = Codec.encode(message, &mut buffer);
189
190 assert!(matches!(result, Ok(())));
191 assert_eq!(&buffer[..frame_start], &leading_bytes);
192
193 let prefix = u32::from_le_bytes(
194 buffer[frame_start..prefix_end]
195 .try_into()
196 .expect("Incorrect prefix slice indices"),
197 );
198
199 assert_eq!(prefix as usize, serialized_message.len());
200 assert_eq!(
201 buffer.len(),
202 leading_bytes.len() + PREFIX_SIZE as usize + prefix as usize
203 );
204
205 assert_eq!(&buffer[prefix_end..], &serialized_message);
206 }
207}