linera_chain/
inbox.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use async_graphql::SimpleObject;
5use linera_base::{
6    data_types::{ArithmeticError, BlockHeight},
7    ensure,
8    identifiers::ChainId,
9};
10#[cfg(with_testing)]
11use linera_views::context::MemoryContext;
12use linera_views::{
13    context::Context,
14    queue_view::QueueView,
15    register_view::RegisterView,
16    views::{ClonableView, View},
17    ViewError,
18};
19use serde::{Deserialize, Serialize};
20use thiserror::Error;
21
22use crate::{data_types::MessageBundle, ChainError};
23
24#[cfg(test)]
25#[path = "unit_tests/inbox_tests.rs"]
26mod inbox_tests;
27
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_interval, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Histogram of the length of `added_bundles`, observed every time a bundle is pushed to
    /// or deleted from that queue.
    pub static INBOX_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = exponential_bucket_interval(1.0, 2_000_000.0);
        register_histogram_vec("inbox_size", "Inbox size", &[], buckets)
    });

    /// Histogram of the length of `removed_bundles`, observed every time a bundle is pushed to
    /// or deleted from that queue.
    pub static REMOVED_BUNDLES: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = exponential_bucket_interval(1.0, 10_000.0);
        register_histogram_vec(
            "removed_bundles",
            "Number of bundles removed by anticipation",
            &[],
            buckets,
        )
    });
}
53
54/// The state of an inbox.
55/// * An inbox is used to track bundles received and executed locally.
56/// * A `MessageBundle` consists of a logical cursor `(height, index)` and some message
57///   content `messages`.
58/// * On the surface, an inbox looks like a FIFO queue: the main APIs are `add_bundle` and
59///   `remove_bundle`.
60/// * However, bundles can also be removed before they are added. When this happens,
61///   the bundles removed by anticipation are tracked in a separate queue. Any bundle added
62///   later will be required to match the first removed bundle and so on.
63/// * The cursors of added bundles (resp. removed bundles) must be increasing over time.
64/// * Reconciliation of added and removed bundles is allowed to skip some added bundles.
65///   However, the opposite is not true: every removed bundle must be eventually added.
66#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
67#[derive(Debug, ClonableView, View)]
68pub struct InboxStateView<C>
69where
70    C: Clone + Context + Send + Sync,
71{
72    /// We have already added all the messages below this height and index.
73    pub next_cursor_to_add: RegisterView<C, Cursor>,
74    /// We have already removed all the messages below this height and index.
75    pub next_cursor_to_remove: RegisterView<C, Cursor>,
76    /// These bundles have been added and are waiting to be removed.
77    pub added_bundles: QueueView<C, MessageBundle>,
78    /// These bundles have been removed by anticipation and are waiting to be added.
79    /// At least one of `added_bundles` and `removed_bundles` should be empty.
80    pub removed_bundles: QueueView<C, MessageBundle>,
81}
82
83#[derive(
84    Debug,
85    Default,
86    Clone,
87    Copy,
88    Hash,
89    Eq,
90    PartialEq,
91    Ord,
92    PartialOrd,
93    Serialize,
94    Deserialize,
95    SimpleObject,
96)]
97pub struct Cursor {
98    height: BlockHeight,
99    index: u32,
100}
101
102#[derive(Error, Debug)]
103pub(crate) enum InboxError {
104    #[error(transparent)]
105    ViewError(#[from] ViewError),
106    #[error(transparent)]
107    ArithmeticError(#[from] ArithmeticError),
108    #[error("Cannot reconcile {bundle:?} with {previous_bundle:?}")]
109    UnexpectedBundle {
110        bundle: MessageBundle,
111        previous_bundle: MessageBundle,
112    },
113    #[error("{bundle:?} is out of order. Block and height should be at least: {next_cursor:?}")]
114    IncorrectOrder {
115        bundle: MessageBundle,
116        next_cursor: Cursor,
117    },
118    #[error(
119        "{bundle:?} cannot be skipped: it must be received before the next \
120        messages from the same origin"
121    )]
122    UnskippableBundle { bundle: MessageBundle },
123}
124
125impl From<&MessageBundle> for Cursor {
126    #[inline]
127    fn from(bundle: &MessageBundle) -> Self {
128        Self {
129            height: bundle.height,
130            index: bundle.transaction_index,
131        }
132    }
133}
134
135impl Cursor {
136    fn try_add_one(self) -> Result<Self, ArithmeticError> {
137        let value = Self {
138            height: self.height,
139            index: self.index.checked_add(1).ok_or(ArithmeticError::Overflow)?,
140        };
141        Ok(value)
142    }
143}
144
145impl From<(ChainId, ChainId, InboxError)> for ChainError {
146    fn from(value: (ChainId, ChainId, InboxError)) -> Self {
147        let (chain_id, origin, error) = value;
148        match error {
149            InboxError::ViewError(e) => ChainError::ViewError(e),
150            InboxError::ArithmeticError(e) => ChainError::ArithmeticError(e),
151            InboxError::UnexpectedBundle {
152                bundle,
153                previous_bundle,
154            } => ChainError::UnexpectedMessage {
155                chain_id,
156                origin,
157                bundle: Box::new(bundle),
158                previous_bundle: Box::new(previous_bundle),
159            },
160            InboxError::IncorrectOrder {
161                bundle,
162                next_cursor,
163            } => ChainError::IncorrectMessageOrder {
164                chain_id,
165                origin,
166                bundle: Box::new(bundle),
167                next_height: next_cursor.height,
168                next_index: next_cursor.index,
169            },
170            InboxError::UnskippableBundle { bundle } => ChainError::CannotSkipMessage {
171                chain_id,
172                origin,
173                bundle: Box::new(bundle),
174            },
175        }
176    }
177}
178
179impl<C> InboxStateView<C>
180where
181    C: Context + Clone + Send + Sync + 'static,
182{
183    /// Converts the internal cursor for added bundles into an externally-visible block height.
184    /// This makes sense because the rest of the system always adds bundles one block at a time.
185    pub fn next_block_height_to_receive(&self) -> Result<BlockHeight, ChainError> {
186        let cursor = self.next_cursor_to_add.get();
187        if cursor.index == 0 {
188            Ok(cursor.height)
189        } else {
190            Ok(cursor.height.try_add_one()?)
191        }
192    }
193
194    /// Consumes a bundle from the inbox.
195    ///
196    /// Returns `true` if the bundle was already known, i.e. it was present in `added_bundles`.
197    pub(crate) async fn remove_bundle(
198        &mut self,
199        bundle: &MessageBundle,
200    ) -> Result<bool, InboxError> {
201        // Record the latest cursor.
202        let cursor = Cursor::from(bundle);
203        ensure!(
204            cursor >= *self.next_cursor_to_remove.get(),
205            InboxError::IncorrectOrder {
206                bundle: bundle.clone(),
207                next_cursor: *self.next_cursor_to_remove.get(),
208            }
209        );
210        // Discard added bundles with lower cursors (if any).
211        while let Some(previous_bundle) = self.added_bundles.front().await? {
212            if Cursor::from(&previous_bundle) >= cursor {
213                break;
214            }
215            ensure!(
216                previous_bundle.is_skippable(),
217                InboxError::UnskippableBundle {
218                    bundle: previous_bundle
219                }
220            );
221            self.added_bundles.delete_front();
222            #[cfg(with_metrics)]
223            metrics::INBOX_SIZE
224                .with_label_values(&[])
225                .observe(self.added_bundles.count() as f64);
226            tracing::trace!("Skipping previously received bundle {:?}", previous_bundle);
227        }
228        // Reconcile the bundle with the next added bundle, or mark it as removed.
229        let already_known = match self.added_bundles.front().await? {
230            Some(previous_bundle) => {
231                // Rationale: If the two cursors are equal, then the bundles should match.
232                // Otherwise, at this point we know that `self.next_cursor_to_add >
233                // Cursor::from(&previous_bundle) > cursor`. Notably, `bundle` will never be
234                // added in the future. Therefore, we should fail instead of adding
235                // it to `self.removed_bundles`.
236                ensure!(
237                    bundle == &previous_bundle,
238                    InboxError::UnexpectedBundle {
239                        previous_bundle,
240                        bundle: bundle.clone(),
241                    }
242                );
243                self.added_bundles.delete_front();
244                #[cfg(with_metrics)]
245                metrics::INBOX_SIZE
246                    .with_label_values(&[])
247                    .observe(self.added_bundles.count() as f64);
248                tracing::trace!("Consuming bundle {:?}", bundle);
249                true
250            }
251            None => {
252                tracing::trace!("Marking bundle as expected: {:?}", bundle);
253                self.removed_bundles.push_back(bundle.clone());
254                #[cfg(with_metrics)]
255                metrics::REMOVED_BUNDLES
256                    .with_label_values(&[])
257                    .observe(self.removed_bundles.count() as f64);
258                false
259            }
260        };
261        self.next_cursor_to_remove.set(cursor.try_add_one()?);
262        Ok(already_known)
263    }
264
265    /// Pushes a bundle to the inbox. The verifications should not fail in production unless
266    /// many validators are faulty.
267    ///
268    /// Returns `true` if the bundle was new, `false` if it was already in `removed_bundles`.
269    pub(crate) async fn add_bundle(&mut self, bundle: MessageBundle) -> Result<bool, InboxError> {
270        // Record the latest cursor.
271        let cursor = Cursor::from(&bundle);
272        ensure!(
273            cursor >= *self.next_cursor_to_add.get(),
274            InboxError::IncorrectOrder {
275                bundle: bundle.clone(),
276                next_cursor: *self.next_cursor_to_add.get(),
277            }
278        );
279        // Find if the bundle was removed ahead of time.
280        let newly_added = match self.removed_bundles.front().await? {
281            Some(previous_bundle) => {
282                if Cursor::from(&previous_bundle) == cursor {
283                    // We already executed this bundle by anticipation. Remove it from
284                    // the queue.
285                    ensure!(
286                        bundle == previous_bundle,
287                        InboxError::UnexpectedBundle {
288                            previous_bundle,
289                            bundle,
290                        }
291                    );
292                    self.removed_bundles.delete_front();
293                    #[cfg(with_metrics)]
294                    metrics::REMOVED_BUNDLES
295                        .with_label_values(&[])
296                        .observe(self.removed_bundles.count() as f64);
297                } else {
298                    // The receiver has already executed a later bundle from the same
299                    // sender ahead of time so we should skip this one.
300                    ensure!(
301                        cursor < Cursor::from(&previous_bundle) && bundle.is_skippable(),
302                        InboxError::UnexpectedBundle {
303                            previous_bundle,
304                            bundle,
305                        }
306                    );
307                }
308                false
309            }
310            None => {
311                // Otherwise, schedule the messages for execution.
312                self.added_bundles.push_back(bundle);
313                #[cfg(with_metrics)]
314                metrics::INBOX_SIZE
315                    .with_label_values(&[])
316                    .observe(self.added_bundles.count() as f64);
317                true
318            }
319        };
320        self.next_cursor_to_add.set(cursor.try_add_one()?);
321        Ok(newly_added)
322    }
323}
324
#[cfg(with_testing)]
impl InboxStateView<MemoryContext<()>>
where
    MemoryContext<()>: Context + Clone + Send + Sync + 'static,
{
    /// Loads an inbox state from a fresh in-memory testing context.
    ///
    /// # Panics
    ///
    /// Panics if loading the view from the memory context fails.
    pub async fn new() -> Self {
        let loaded = Self::load(MemoryContext::new_for_testing(())).await;
        loaded.expect("Loading from memory should work")
    }
}
336}