1use async_graphql::SimpleObject;
5use linera_base::{
6 data_types::{ArithmeticError, BlockHeight},
7 ensure,
8 identifiers::ChainId,
9};
10#[cfg(with_testing)]
11use linera_views::context::MemoryContext;
12use linera_views::{
13 context::Context,
14 queue_view::QueueView,
15 register_view::RegisterView,
16 views::{ClonableView, View},
17 ViewError,
18};
19use serde::{Deserialize, Serialize};
20use thiserror::Error;
21
22use crate::{data_types::MessageBundle, ChainError};
23
24#[cfg(test)]
25#[path = "unit_tests/inbox_tests.rs"]
26mod inbox_tests;
27
#[cfg(with_metrics)]
/// Prometheus histograms that track the lengths of the two inbox queues.
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_interval, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Histogram `inbox_size`: observed with the length of `added_bundles` each time that
    /// queue is modified by `add_bundle` or `remove_bundle`.
    pub static INBOX_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = exponential_bucket_interval(1.0, 2_000_000.0);
        register_histogram_vec("inbox_size", "Inbox size", &[], buckets)
    });

    /// Histogram `removed_bundles`: observed with the length of `removed_bundles` each time
    /// that queue is modified by `add_bundle` or `remove_bundle`.
    pub static REMOVED_BUNDLES: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = exponential_bucket_interval(1.0, 10_000.0);
        register_histogram_vec(
            "removed_bundles",
            "Number of bundles removed by anticipation",
            &[],
            buckets,
        )
    });
}
53
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
/// The state of an inbox, stored as a view.
///
/// The inbox reconciles two streams of message bundles: those passed to `add_bundle` and
/// those passed to `remove_bundle`. A bundle may be removed before it has been added; in
/// that case it is remembered in `removed_bundles` until the matching bundle is added.
/// Both operations require the bundle's cursor to be at least the corresponding
/// "next cursor" below.
#[derive(Debug, ClonableView, View)]
pub struct InboxStateView<C>
where
    C: Clone + Context + Send + Sync,
{
    /// The smallest cursor that `add_bundle` still accepts. After each successful
    /// `add_bundle` it is set to the cursor following the bundle that was just added.
    pub next_cursor_to_add: RegisterView<C, Cursor>,
    /// The smallest cursor that `remove_bundle` still accepts. After each successful
    /// `remove_bundle` it is set to the cursor following the bundle that was just removed.
    pub next_cursor_to_remove: RegisterView<C, Cursor>,
    /// Bundles that have been added and not yet removed. `remove_bundle` consumes them
    /// from the front.
    pub added_bundles: QueueView<C, MessageBundle>,
    /// Bundles that were removed while `added_bundles` had nothing left to match them.
    /// `add_bundle` compares incoming bundles against the front of this queue.
    pub removed_bundles: QueueView<C, MessageBundle>,
}
82
/// The position of a message bundle in the stream of bundles coming from one origin.
///
/// A cursor is built from a bundle's block height and transaction index. The derived
/// ordering compares `height` first and `index` second, following the field order.
#[derive(
    Debug,
    Default,
    Clone,
    Copy,
    Hash,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    SimpleObject,
)]
pub struct Cursor {
    /// The height of the block that the bundle belongs to.
    height: BlockHeight,
    /// The transaction index of the bundle (taken from `MessageBundle::transaction_index`).
    index: u32,
}
101
/// Errors produced while adding bundles to, or removing bundles from, an inbox.
///
/// These are crate-internal; they are turned into a `ChainError` together with the
/// chain ID and origin by the `From<(ChainId, ChainId, InboxError)>` conversion.
#[derive(Error, Debug)]
pub(crate) enum InboxError {
    /// An error reported by the underlying view storage.
    #[error(transparent)]
    ViewError(#[from] ViewError),
    /// An arithmetic error, e.g. an overflow while advancing a cursor.
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),
    /// The given bundle does not agree with the bundle it was reconciled against
    /// (the front of the opposite queue).
    #[error("Cannot reconcile {bundle:?} with {previous_bundle:?}")]
    UnexpectedBundle {
        bundle: MessageBundle,
        previous_bundle: MessageBundle,
    },
    /// The bundle's cursor is lower than the next cursor expected by the operation.
    #[error("{bundle:?} is out of order. Block and height should be at least: {next_cursor:?}")]
    IncorrectOrder {
        bundle: MessageBundle,
        next_cursor: Cursor,
    },
    /// Removing a later bundle would require skipping this previously added bundle,
    /// but it is not skippable.
    #[error(
        "{bundle:?} cannot be skipped: it must be received before the next \
         messages from the same origin"
    )]
    UnskippableBundle { bundle: MessageBundle },
}
124
125impl From<&MessageBundle> for Cursor {
126 #[inline]
127 fn from(bundle: &MessageBundle) -> Self {
128 Self {
129 height: bundle.height,
130 index: bundle.transaction_index,
131 }
132 }
133}
134
135impl Cursor {
136 fn try_add_one(self) -> Result<Self, ArithmeticError> {
137 let value = Self {
138 height: self.height,
139 index: self.index.checked_add(1).ok_or(ArithmeticError::Overflow)?,
140 };
141 Ok(value)
142 }
143}
144
145impl From<(ChainId, ChainId, InboxError)> for ChainError {
146 fn from(value: (ChainId, ChainId, InboxError)) -> Self {
147 let (chain_id, origin, error) = value;
148 match error {
149 InboxError::ViewError(e) => ChainError::ViewError(e),
150 InboxError::ArithmeticError(e) => ChainError::ArithmeticError(e),
151 InboxError::UnexpectedBundle {
152 bundle,
153 previous_bundle,
154 } => ChainError::UnexpectedMessage {
155 chain_id,
156 origin,
157 bundle: Box::new(bundle),
158 previous_bundle: Box::new(previous_bundle),
159 },
160 InboxError::IncorrectOrder {
161 bundle,
162 next_cursor,
163 } => ChainError::IncorrectMessageOrder {
164 chain_id,
165 origin,
166 bundle: Box::new(bundle),
167 next_height: next_cursor.height,
168 next_index: next_cursor.index,
169 },
170 InboxError::UnskippableBundle { bundle } => ChainError::CannotSkipMessage {
171 chain_id,
172 origin,
173 bundle: Box::new(bundle),
174 },
175 }
176 }
177}
178
179impl<C> InboxStateView<C>
180where
181 C: Context + Clone + Send + Sync + 'static,
182{
183 pub fn next_block_height_to_receive(&self) -> Result<BlockHeight, ChainError> {
186 let cursor = self.next_cursor_to_add.get();
187 if cursor.index == 0 {
188 Ok(cursor.height)
189 } else {
190 Ok(cursor.height.try_add_one()?)
191 }
192 }
193
194 pub(crate) async fn remove_bundle(
198 &mut self,
199 bundle: &MessageBundle,
200 ) -> Result<bool, InboxError> {
201 let cursor = Cursor::from(bundle);
203 ensure!(
204 cursor >= *self.next_cursor_to_remove.get(),
205 InboxError::IncorrectOrder {
206 bundle: bundle.clone(),
207 next_cursor: *self.next_cursor_to_remove.get(),
208 }
209 );
210 while let Some(previous_bundle) = self.added_bundles.front().await? {
212 if Cursor::from(&previous_bundle) >= cursor {
213 break;
214 }
215 ensure!(
216 previous_bundle.is_skippable(),
217 InboxError::UnskippableBundle {
218 bundle: previous_bundle
219 }
220 );
221 self.added_bundles.delete_front();
222 #[cfg(with_metrics)]
223 metrics::INBOX_SIZE
224 .with_label_values(&[])
225 .observe(self.added_bundles.count() as f64);
226 tracing::trace!("Skipping previously received bundle {:?}", previous_bundle);
227 }
228 let already_known = match self.added_bundles.front().await? {
230 Some(previous_bundle) => {
231 ensure!(
237 bundle == &previous_bundle,
238 InboxError::UnexpectedBundle {
239 previous_bundle,
240 bundle: bundle.clone(),
241 }
242 );
243 self.added_bundles.delete_front();
244 #[cfg(with_metrics)]
245 metrics::INBOX_SIZE
246 .with_label_values(&[])
247 .observe(self.added_bundles.count() as f64);
248 tracing::trace!("Consuming bundle {:?}", bundle);
249 true
250 }
251 None => {
252 tracing::trace!("Marking bundle as expected: {:?}", bundle);
253 self.removed_bundles.push_back(bundle.clone());
254 #[cfg(with_metrics)]
255 metrics::REMOVED_BUNDLES
256 .with_label_values(&[])
257 .observe(self.removed_bundles.count() as f64);
258 false
259 }
260 };
261 self.next_cursor_to_remove.set(cursor.try_add_one()?);
262 Ok(already_known)
263 }
264
265 pub(crate) async fn add_bundle(&mut self, bundle: MessageBundle) -> Result<bool, InboxError> {
270 let cursor = Cursor::from(&bundle);
272 ensure!(
273 cursor >= *self.next_cursor_to_add.get(),
274 InboxError::IncorrectOrder {
275 bundle: bundle.clone(),
276 next_cursor: *self.next_cursor_to_add.get(),
277 }
278 );
279 let newly_added = match self.removed_bundles.front().await? {
281 Some(previous_bundle) => {
282 if Cursor::from(&previous_bundle) == cursor {
283 ensure!(
286 bundle == previous_bundle,
287 InboxError::UnexpectedBundle {
288 previous_bundle,
289 bundle,
290 }
291 );
292 self.removed_bundles.delete_front();
293 #[cfg(with_metrics)]
294 metrics::REMOVED_BUNDLES
295 .with_label_values(&[])
296 .observe(self.removed_bundles.count() as f64);
297 } else {
298 ensure!(
301 cursor < Cursor::from(&previous_bundle) && bundle.is_skippable(),
302 InboxError::UnexpectedBundle {
303 previous_bundle,
304 bundle,
305 }
306 );
307 }
308 false
309 }
310 None => {
311 self.added_bundles.push_back(bundle);
313 #[cfg(with_metrics)]
314 metrics::INBOX_SIZE
315 .with_label_values(&[])
316 .observe(self.added_bundles.count() as f64);
317 true
318 }
319 };
320 self.next_cursor_to_add.set(cursor.try_add_one()?);
321 Ok(newly_added)
322 }
323}
324
#[cfg(with_testing)]
impl InboxStateView<MemoryContext<()>>
where
    MemoryContext<()>: Context + Clone + Send + Sync + 'static,
{
    /// Loads an inbox view from a fresh in-memory testing context.
    ///
    /// # Panics
    ///
    /// Panics if loading the view from the memory context fails.
    pub async fn new() -> Self {
        let loaded = Self::load(MemoryContext::new_for_testing(())).await;
        loaded.expect("Loading from memory should work")
    }
}