1use allocative::Allocative;
5use async_graphql::SimpleObject;
6use linera_base::{
7 data_types::{ArithmeticError, BlockHeight},
8 ensure,
9 identifiers::ChainId,
10};
11#[cfg(with_testing)]
12use linera_views::context::MemoryContext;
13use linera_views::{
14 context::Context,
15 queue_view::QueueView,
16 register_view::RegisterView,
17 views::{ClonableView, View},
18 ViewError,
19};
20use serde::{Deserialize, Serialize};
21use thiserror::Error;
22
23use crate::{data_types::MessageBundle, ChainError};
24
25#[cfg(test)]
26#[path = "unit_tests/inbox_tests.rs"]
27mod inbox_tests;
28
/// Prometheus histograms for the two inbox queues. Only compiled when the
/// `with_metrics` cfg flag is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_interval, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Histogram registered as `inbox_size`. In this file it is fed with
    /// `added_bundles.count()` every time that queue is modified.
    pub static INBOX_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
        // NOTE(review): the arguments look like the lower/upper bucket bounds — confirm
        // against `exponential_bucket_interval`.
        let buckets = exponential_bucket_interval(1.0, 2_000_000.0);
        register_histogram_vec("inbox_size", "Inbox size", &[], buckets)
    });

    /// Histogram registered as `removed_bundles`. In this file it is fed with
    /// `removed_bundles.count()` every time that queue is modified.
    pub static REMOVED_BUNDLES: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = exponential_bucket_interval(1.0, 10_000.0);
        register_histogram_vec(
            "removed_bundles",
            "Number of bundles removed by anticipation",
            &[],
            buckets,
        )
    });
}
54
/// Persistent state of an inbox, tracking bundles that were added and bundles that
/// were removed.
///
/// * `add_bundle` and `remove_bundle` are the two mutating entry points.
/// * A bundle may be removed before it has been added: in that case it is parked in
///   `removed_bundles` and a later `add_bundle` call is checked against it.
/// * When removing, queued bundles with a smaller cursor may be dropped, but only if
///   they report themselves as skippable.
/// * Both cursors only move forward: each successful call sets the corresponding
///   cursor to one past the bundle that was just handled.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Allocative, Debug, ClonableView, View)]
#[allocative(bound = "C")]
pub struct InboxStateView<C>
where
    C: Clone + Context,
{
    /// Lower bound (inclusive) on the cursor of the next bundle accepted by `add_bundle`.
    pub next_cursor_to_add: RegisterView<C, Cursor>,
    /// Lower bound (inclusive) on the cursor of the next bundle accepted by `remove_bundle`.
    pub next_cursor_to_remove: RegisterView<C, Cursor>,
    /// Bundles that have been added and are waiting to be removed.
    pub added_bundles: QueueView<C, MessageBundle>,
    /// Bundles that were removed before being added and are waiting for the matching
    /// `add_bundle` call. The methods in this file only push to one of the two queues
    /// when the front of the other one is empty.
    pub removed_bundles: QueueView<C, MessageBundle>,
}
84
/// Position of a bundle inside an inbox, expressed as a `(height, index)` pair.
///
/// The derived ordering is lexicographic: `height` is compared first, then `index`.
/// The default value is built from the defaults of both fields.
#[derive(
    Debug,
    Default,
    Clone,
    Copy,
    Hash,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    SimpleObject,
    Allocative,
)]
pub struct Cursor {
    /// Block height of the bundle (taken from `MessageBundle::height`).
    height: BlockHeight,
    /// Index of the bundle within that height (taken from
    /// `MessageBundle::transaction_index`).
    index: u32,
}
104
/// Errors produced by the inbox operations in this module. They are turned into a
/// `ChainError` by attaching the two chain IDs involved.
#[derive(Error, Debug)]
pub(crate) enum InboxError {
    /// A failure reported by the underlying views.
    #[error(transparent)]
    ViewError(#[from] ViewError),
    /// An arithmetic failure, e.g. an overflow while advancing a cursor.
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),
    /// `bundle` does not agree with `previous_bundle`, the bundle found at the front of
    /// the opposite queue.
    #[error("Cannot reconcile {bundle:?} with {previous_bundle:?}")]
    UnexpectedBundle {
        bundle: MessageBundle,
        previous_bundle: MessageBundle,
    },
    /// The cursor of `bundle` is smaller than `next_cursor`, the minimum cursor
    /// currently accepted by the operation.
    #[error("{bundle:?} is out of order. Block and height should be at least: {next_cursor:?}")]
    IncorrectOrder {
        bundle: MessageBundle,
        next_cursor: Cursor,
    },
    /// Removing a later bundle would require dropping `bundle`, which is not skippable.
    #[error(
        "{bundle:?} cannot be skipped: it must be received before the next \
         messages from the same origin"
    )]
    UnskippableBundle { bundle: MessageBundle },
}
127
128impl From<&MessageBundle> for Cursor {
129 #[inline]
130 fn from(bundle: &MessageBundle) -> Self {
131 Self {
132 height: bundle.height,
133 index: bundle.transaction_index,
134 }
135 }
136}
137
138impl Cursor {
139 fn try_add_one(self) -> Result<Self, ArithmeticError> {
140 let value = Self {
141 height: self.height,
142 index: self.index.checked_add(1).ok_or(ArithmeticError::Overflow)?,
143 };
144 Ok(value)
145 }
146}
147
148impl From<(ChainId, ChainId, InboxError)> for ChainError {
149 fn from(value: (ChainId, ChainId, InboxError)) -> Self {
150 let (chain_id, origin, error) = value;
151 match error {
152 InboxError::ViewError(e) => ChainError::ViewError(e),
153 InboxError::ArithmeticError(e) => ChainError::ArithmeticError(e),
154 InboxError::UnexpectedBundle {
155 bundle,
156 previous_bundle,
157 } => ChainError::UnexpectedMessage {
158 chain_id,
159 origin,
160 bundle: Box::new(bundle),
161 previous_bundle: Box::new(previous_bundle),
162 },
163 InboxError::IncorrectOrder {
164 bundle,
165 next_cursor,
166 } => ChainError::IncorrectMessageOrder {
167 chain_id,
168 origin,
169 bundle: Box::new(bundle),
170 next_height: next_cursor.height,
171 next_index: next_cursor.index,
172 },
173 InboxError::UnskippableBundle { bundle } => ChainError::CannotSkipMessage {
174 chain_id,
175 origin,
176 bundle: Box::new(bundle),
177 },
178 }
179 }
180}
181
182impl<C> InboxStateView<C>
183where
184 C: Context + Clone + 'static,
185{
186 pub fn next_block_height_to_receive(&self) -> Result<BlockHeight, ChainError> {
189 let cursor = self.next_cursor_to_add.get();
190 if cursor.index == 0 {
191 Ok(cursor.height)
192 } else {
193 Ok(cursor.height.try_add_one()?)
194 }
195 }
196
197 pub(crate) async fn remove_bundle(
201 &mut self,
202 bundle: &MessageBundle,
203 ) -> Result<bool, InboxError> {
204 let cursor = Cursor::from(bundle);
206 ensure!(
207 cursor >= *self.next_cursor_to_remove.get(),
208 InboxError::IncorrectOrder {
209 bundle: bundle.clone(),
210 next_cursor: *self.next_cursor_to_remove.get(),
211 }
212 );
213 while let Some(previous_bundle) = self.added_bundles.front().await? {
215 if Cursor::from(&previous_bundle) >= cursor {
216 break;
217 }
218 ensure!(
219 previous_bundle.is_skippable(),
220 InboxError::UnskippableBundle {
221 bundle: previous_bundle
222 }
223 );
224 self.added_bundles.delete_front();
225 #[cfg(with_metrics)]
226 metrics::INBOX_SIZE
227 .with_label_values(&[])
228 .observe(self.added_bundles.count() as f64);
229 tracing::trace!("Skipping previously received bundle {:?}", previous_bundle);
230 }
231 let already_known = match self.added_bundles.front().await? {
233 Some(previous_bundle) => {
234 ensure!(
240 bundle == &previous_bundle,
241 InboxError::UnexpectedBundle {
242 previous_bundle,
243 bundle: bundle.clone(),
244 }
245 );
246 self.added_bundles.delete_front();
247 #[cfg(with_metrics)]
248 metrics::INBOX_SIZE
249 .with_label_values(&[])
250 .observe(self.added_bundles.count() as f64);
251 tracing::trace!("Consuming bundle {:?}", bundle);
252 true
253 }
254 None => {
255 tracing::trace!("Marking bundle as expected: {:?}", bundle);
256 self.removed_bundles.push_back(bundle.clone());
257 #[cfg(with_metrics)]
258 metrics::REMOVED_BUNDLES
259 .with_label_values(&[])
260 .observe(self.removed_bundles.count() as f64);
261 false
262 }
263 };
264 self.next_cursor_to_remove.set(cursor.try_add_one()?);
265 Ok(already_known)
266 }
267
268 pub(crate) async fn add_bundle(&mut self, bundle: MessageBundle) -> Result<bool, InboxError> {
273 let cursor = Cursor::from(&bundle);
275 ensure!(
276 cursor >= *self.next_cursor_to_add.get(),
277 InboxError::IncorrectOrder {
278 bundle: bundle.clone(),
279 next_cursor: *self.next_cursor_to_add.get(),
280 }
281 );
282 let newly_added = match self.removed_bundles.front().await? {
284 Some(previous_bundle) => {
285 if Cursor::from(&previous_bundle) == cursor {
286 ensure!(
289 bundle == previous_bundle,
290 InboxError::UnexpectedBundle {
291 previous_bundle,
292 bundle,
293 }
294 );
295 self.removed_bundles.delete_front();
296 #[cfg(with_metrics)]
297 metrics::REMOVED_BUNDLES
298 .with_label_values(&[])
299 .observe(self.removed_bundles.count() as f64);
300 } else {
301 ensure!(
304 cursor < Cursor::from(&previous_bundle) && bundle.is_skippable(),
305 InboxError::UnexpectedBundle {
306 previous_bundle,
307 bundle,
308 }
309 );
310 }
311 false
312 }
313 None => {
314 self.added_bundles.push_back(bundle);
316 #[cfg(with_metrics)]
317 metrics::INBOX_SIZE
318 .with_label_values(&[])
319 .observe(self.added_bundles.count() as f64);
320 true
321 }
322 };
323 self.next_cursor_to_add.set(cursor.try_add_one()?);
324 Ok(newly_added)
325 }
326}
327
#[cfg(with_testing)]
impl InboxStateView<MemoryContext<()>>
where
    MemoryContext<()>: Context + Clone + Send + Sync + 'static,
{
    /// Loads an inbox view from a fresh `MemoryContext::new_for_testing(())` context.
    ///
    /// # Panics
    ///
    /// Panics if loading the view fails.
    pub async fn new() -> Self {
        let loaded = Self::load(MemoryContext::new_for_testing(())).await;
        loaded.expect("Loading from memory should work")
    }
}