1use allocative::Allocative;
5use linera_base::{
6 data_types::{ArithmeticError, BlockHeight, Cursor},
7 ensure,
8 identifiers::ChainId,
9};
10#[cfg(with_testing)]
11use linera_views::context::MemoryContext;
12use linera_views::{
13 context::Context,
14 queue_view::QueueView,
15 register_view::RegisterView,
16 views::{ClonableView, View},
17 ViewError,
18};
19use thiserror::Error;
20
21use crate::{data_types::MessageBundle, ChainError};
22
23#[cfg(test)]
24#[path = "unit_tests/inbox_tests.rs"]
25mod inbox_tests;
26
/// Prometheus histograms for inboxes; only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util;
    use prometheus::HistogramVec;

    /// Label-less histogram registered as `inbox_size`, created on first use.
    pub static INBOX_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = prometheus_util::exponential_bucket_interval(1.0, 2_000_000.0);
        prometheus_util::register_histogram_vec("inbox_size", "Inbox size", &[], buckets)
    });

    /// Label-less histogram registered as `removed_bundles`, created on first use.
    pub static REMOVED_BUNDLES: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = prometheus_util::exponential_bucket_interval(1.0, 10_000.0);
        prometheus_util::register_histogram_vec(
            "removed_bundles",
            "Number of bundles removed by anticipation",
            &[],
            buckets,
        )
    });
}
52
53#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
66#[derive(Allocative, Debug, ClonableView, View)]
67#[allocative(bound = "C")]
68pub struct InboxStateView<C>
69where
70 C: Clone + Context,
71{
72 pub next_cursor_to_add: RegisterView<C, Cursor>,
74 pub next_cursor_to_remove: RegisterView<C, Cursor>,
76 pub added_bundles: QueueView<C, MessageBundle>,
78 pub removed_bundles: QueueView<C, MessageBundle>,
81 pub restored_cursor: RegisterView<C, Cursor>,
88}
89
90#[derive(Error, Debug)]
91pub(crate) enum InboxError {
92 #[error(transparent)]
93 ViewError(#[from] ViewError),
94 #[error(transparent)]
95 ArithmeticError(#[from] ArithmeticError),
96 #[error("Cannot reconcile {bundle:?} with {previous_bundle:?}")]
97 UnexpectedBundle {
98 bundle: MessageBundle,
99 previous_bundle: MessageBundle,
100 },
101 #[error("{bundle:?} is out of order. Block and height should be at least: {next_cursor:?}")]
102 IncorrectOrder {
103 bundle: MessageBundle,
104 next_cursor: Cursor,
105 },
106 #[error(
107 "{bundle:?} cannot be skipped: it must be received before the next \
108 messages from the same origin"
109 )]
110 UnskippableBundle { bundle: MessageBundle },
111}
112
113impl From<(ChainId, ChainId, InboxError)> for ChainError {
114 fn from(value: (ChainId, ChainId, InboxError)) -> Self {
115 let (chain_id, origin, error) = value;
116 match error {
117 InboxError::ViewError(e) => ChainError::ViewError(e),
118 InboxError::ArithmeticError(e) => ChainError::ArithmeticError(e),
119 InboxError::UnexpectedBundle {
120 bundle,
121 previous_bundle,
122 } => ChainError::UnexpectedMessage {
123 chain_id,
124 origin,
125 bundle: Box::new(bundle),
126 previous_bundle: Box::new(previous_bundle),
127 },
128 InboxError::IncorrectOrder {
129 bundle,
130 next_cursor,
131 } => ChainError::IncorrectMessageOrder {
132 chain_id,
133 origin,
134 bundle: Box::new(bundle),
135 next_height: next_cursor.height,
136 next_index: next_cursor.index,
137 },
138 InboxError::UnskippableBundle { bundle } => ChainError::CannotSkipMessage {
139 chain_id,
140 origin,
141 bundle: Box::new(bundle),
142 },
143 }
144 }
145}
146
147impl<C> InboxStateView<C>
148where
149 C: Context + Clone + 'static,
150{
151 pub fn next_block_height_to_receive(&self) -> Result<BlockHeight, ChainError> {
154 let cursor = self.next_cursor_to_add.get();
155 if cursor.index == 0 {
156 Ok(cursor.height)
157 } else {
158 Ok(cursor.height.try_add_one()?)
159 }
160 }
161
162 pub fn observe_size_metric(&self) {
164 #[cfg(with_metrics)]
165 metrics::INBOX_SIZE
166 .with_label_values(&[])
167 .observe(self.added_bundles.count() as f64);
168 }
169
170 pub async fn restore_from_checkpoint(&mut self, cursor: Cursor) -> Result<(), ChainError> {
187 ensure!(
188 *self.restored_cursor.get() <= cursor,
189 ChainError::InternalError(format!(
190 "cannot restore inbox at cursor {cursor:?}: already bootstrapped \
191 from a later checkpoint at cursor {previous:?}",
192 previous = *self.restored_cursor.get(),
193 ))
194 );
195 self.restored_cursor.set(cursor);
196 if *self.next_cursor_to_add.get() < cursor {
197 self.next_cursor_to_add.set(cursor);
198 }
199 self.next_cursor_to_remove.set(cursor);
200 while let Some(front) = self.added_bundles.front().await? {
201 if front.cursor() >= cursor {
202 break;
203 }
204 self.added_bundles.delete_front();
205 }
206 self.removed_bundles.clear();
207 Ok(())
208 }
209
210 pub(crate) async fn remove_bundle(
214 &mut self,
215 bundle: &MessageBundle,
216 ) -> Result<bool, InboxError> {
217 let cursor = bundle.cursor();
219 if cursor < *self.restored_cursor.get() {
220 return Ok(true);
224 }
225 ensure!(
226 cursor >= *self.next_cursor_to_remove.get(),
227 InboxError::IncorrectOrder {
228 bundle: bundle.clone(),
229 next_cursor: *self.next_cursor_to_remove.get(),
230 }
231 );
232 while let Some(previous_bundle) = self.added_bundles.front().await? {
234 if previous_bundle.cursor() >= cursor {
235 break;
236 }
237 ensure!(
238 previous_bundle.is_skippable(),
239 InboxError::UnskippableBundle {
240 bundle: previous_bundle
241 }
242 );
243 self.added_bundles.delete_front();
244 tracing::trace!("Skipping previously received bundle {:?}", previous_bundle);
245 }
246 let already_known = match self.added_bundles.front().await? {
248 Some(previous_bundle) => {
249 ensure!(
255 bundle == &previous_bundle,
256 InboxError::UnexpectedBundle {
257 previous_bundle,
258 bundle: bundle.clone(),
259 }
260 );
261 self.added_bundles.delete_front();
262 tracing::trace!("Consuming bundle {:?}", bundle);
263 true
264 }
265 None => {
266 tracing::trace!("Marking bundle as expected: {:?}", bundle);
267 self.removed_bundles.push_back(bundle.clone());
268 #[cfg(with_metrics)]
269 metrics::REMOVED_BUNDLES
270 .with_label_values(&[])
271 .observe(self.removed_bundles.count() as f64);
272 false
273 }
274 };
275 self.next_cursor_to_remove.set(cursor.try_add_one()?);
276 Ok(already_known)
277 }
278
279 pub(crate) async fn add_bundle(&mut self, bundle: MessageBundle) -> Result<bool, InboxError> {
284 let cursor = bundle.cursor();
286 if cursor < *self.restored_cursor.get() {
287 return Ok(false);
290 }
291 ensure!(
292 cursor >= *self.next_cursor_to_add.get(),
293 InboxError::IncorrectOrder {
294 bundle: bundle.clone(),
295 next_cursor: *self.next_cursor_to_add.get(),
296 }
297 );
298 let newly_added = match self.removed_bundles.front().await? {
300 Some(previous_bundle) => {
301 if previous_bundle.cursor() == cursor {
302 ensure!(
305 bundle == previous_bundle,
306 InboxError::UnexpectedBundle {
307 previous_bundle,
308 bundle,
309 }
310 );
311 self.removed_bundles.delete_front();
312 #[cfg(with_metrics)]
313 metrics::REMOVED_BUNDLES
314 .with_label_values(&[])
315 .observe(self.removed_bundles.count() as f64);
316 } else {
317 ensure!(
320 cursor < previous_bundle.cursor() && bundle.is_skippable(),
321 InboxError::UnexpectedBundle {
322 previous_bundle,
323 bundle,
324 }
325 );
326 }
327 false
328 }
329 None => {
330 self.added_bundles.push_back(bundle);
332 true
333 }
334 };
335 self.next_cursor_to_add.set(cursor.try_add_one()?);
336 Ok(newly_added)
337 }
338}
339
#[cfg(with_testing)]
impl InboxStateView<MemoryContext<()>>
where
    MemoryContext<()>: Context + Clone + Send + Sync + 'static,
{
    /// Loads an inbox view from a context created by `MemoryContext::new_for_testing(())`.
    ///
    /// # Panics
    ///
    /// Panics if loading the view fails.
    pub async fn new() -> Self {
        Self::load(MemoryContext::new_for_testing(()))
            .await
            .expect("Loading from memory should work")
    }
}