Skip to main content

linera_chain/
inbox.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use allocative::Allocative;
5use linera_base::{
6    data_types::{ArithmeticError, BlockHeight, Cursor},
7    ensure,
8    identifiers::ChainId,
9};
10#[cfg(with_testing)]
11use linera_views::context::MemoryContext;
12use linera_views::{
13    context::Context,
14    queue_view::QueueView,
15    register_view::RegisterView,
16    views::{ClonableView, View},
17    ViewError,
18};
19use thiserror::Error;
20
21use crate::{data_types::MessageBundle, ChainError};
22
23#[cfg(test)]
24#[path = "unit_tests/inbox_tests.rs"]
25mod inbox_tests;
26
27#[cfg(with_metrics)]
28mod metrics {
29    use std::sync::LazyLock;
30
31    use linera_base::prometheus_util::{exponential_bucket_interval, register_histogram_vec};
32    use prometheus::HistogramVec;
33
34    pub static INBOX_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
35        register_histogram_vec(
36            "inbox_size",
37            "Inbox size",
38            &[],
39            exponential_bucket_interval(1.0, 2_000_000.0),
40        )
41    });
42
43    pub static REMOVED_BUNDLES: LazyLock<HistogramVec> = LazyLock::new(|| {
44        register_histogram_vec(
45            "removed_bundles",
46            "Number of bundles removed by anticipation",
47            &[],
48            exponential_bucket_interval(1.0, 10_000.0),
49        )
50    });
51}
52
// NOTE(review): when `with_graphql` is enabled this type derives
// `async_graphql::SimpleObject`, which presumably turns the `///` comments on the type
// and its fields into GraphQL schema descriptions. Keep review notes in `//` comments so
// that schema text does not change by accident.
/// The state of an inbox.
/// * An inbox is used to track bundles received and executed locally.
/// * A `MessageBundle` consists of a logical cursor `(height, index)` and some message
///   content `messages`.
/// * On the surface, an inbox looks like a FIFO queue: the main APIs are `add_bundle` and
///   `remove_bundle`.
/// * However, bundles can also be removed before they are added. When this happens,
///   the bundles removed by anticipation are tracked in a separate queue. Any bundle added
///   later will be required to match the first removed bundle and so on.
/// * The cursors of added bundles (resp. removed bundles) must be increasing over time.
/// * Reconciliation of added and removed bundles is allowed to skip some added bundles.
///   However, the opposite is not true: every removed bundle must be eventually added.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Allocative, Debug, ClonableView, View)]
#[allocative(bound = "C")]
pub struct InboxStateView<C>
where
    C: Clone + Context,
{
    // NOTE(review): the `View` derive presumably derives storage keys from the field
    // declaration order; treat reordering or inserting fields as a storage-layout change
    // and confirm before doing so.
    //
    // Set to `cursor.try_add_one()` for each bundle accepted by `add_bundle`; raised
    // (never lowered) by `restore_from_checkpoint`.
    /// We have already added all the messages below this height and index.
    pub next_cursor_to_add: RegisterView<C, Cursor>,
    // Set to `cursor.try_add_one()` for each bundle consumed by `remove_bundle`;
    // overwritten, possibly to a lower value, by `restore_from_checkpoint`.
    /// We have already removed all the messages below this height and index.
    pub next_cursor_to_remove: RegisterView<C, Cursor>,
    // Appended to by `add_bundle`; drained from the front by `remove_bundle` and by
    // `restore_from_checkpoint`.
    /// These bundles have been added and are waiting to be removed.
    pub added_bundles: QueueView<C, MessageBundle>,
    // Appended to by `remove_bundle`, dequeued by `add_bundle`, and cleared entirely by
    // `restore_from_checkpoint`.
    /// These bundles have been removed by anticipation and are waiting to be added.
    /// At least one of `added_bundles` and `removed_bundles` should be empty.
    pub removed_bundles: QueueView<C, MessageBundle>,
    // Set only by `restore_from_checkpoint`, which refuses to move it backwards.
    /// When this inbox was restored from a checkpoint, the sender's
    /// `next_cursor_to_remove` at that time. Bundles with cursors strictly below this
    /// are silently dropped from `add_bundle` / `remove_bundle`: their effects are
    /// already baked into the restored execution state, so re-delivering them on this
    /// validator (e.g. when a sender re-pushes) must be a no-op rather than re-enter
    /// the inbox or the removed-by-anticipation queue.
    pub restored_cursor: RegisterView<C, Cursor>,
}
89
/// Errors raised while adding bundles to, or removing bundles from, an inbox.
///
/// These errors carry no chain identifiers. Callers attach the two chain ids through
/// the `From<(ChainId, ChainId, InboxError)>` conversion into `ChainError`.
#[derive(Error, Debug)]
pub(crate) enum InboxError {
    /// A storage error from one of the underlying views.
    #[error(transparent)]
    ViewError(#[from] ViewError),
    /// An arithmetic error, e.g. from the `try_add_one` calls that advance cursors.
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),
    /// `bundle` could not be reconciled with `previous_bundle`, the bundle already
    /// queued (added, or removed by anticipation) that it was compared against.
    #[error("Cannot reconcile {bundle:?} with {previous_bundle:?}")]
    UnexpectedBundle {
        bundle: MessageBundle,
        previous_bundle: MessageBundle,
    },
    /// `bundle`'s cursor is lower than `next_cursor`, the smallest cursor still accepted.
    #[error("{bundle:?} is out of order. Block and height should be at least: {next_cursor:?}")]
    IncorrectOrder {
        bundle: MessageBundle,
        next_cursor: Cursor,
    },
    /// An added bundle would have to be discarded, but `is_skippable()` is false for it.
    #[error(
        "{bundle:?} cannot be skipped: it must be received before the next \
        messages from the same origin"
    )]
    UnskippableBundle { bundle: MessageBundle },
}
112
113impl From<(ChainId, ChainId, InboxError)> for ChainError {
114    fn from(value: (ChainId, ChainId, InboxError)) -> Self {
115        let (chain_id, origin, error) = value;
116        match error {
117            InboxError::ViewError(e) => ChainError::ViewError(e),
118            InboxError::ArithmeticError(e) => ChainError::ArithmeticError(e),
119            InboxError::UnexpectedBundle {
120                bundle,
121                previous_bundle,
122            } => ChainError::UnexpectedMessage {
123                chain_id,
124                origin,
125                bundle: Box::new(bundle),
126                previous_bundle: Box::new(previous_bundle),
127            },
128            InboxError::IncorrectOrder {
129                bundle,
130                next_cursor,
131            } => ChainError::IncorrectMessageOrder {
132                chain_id,
133                origin,
134                bundle: Box::new(bundle),
135                next_height: next_cursor.height,
136                next_index: next_cursor.index,
137            },
138            InboxError::UnskippableBundle { bundle } => ChainError::CannotSkipMessage {
139                chain_id,
140                origin,
141                bundle: Box::new(bundle),
142            },
143        }
144    }
145}
146
147impl<C> InboxStateView<C>
148where
149    C: Context + Clone + 'static,
150{
151    /// Converts the internal cursor for added bundles into an externally-visible block height.
152    /// This makes sense because the rest of the system always adds bundles one block at a time.
153    pub fn next_block_height_to_receive(&self) -> Result<BlockHeight, ChainError> {
154        let cursor = self.next_cursor_to_add.get();
155        if cursor.index == 0 {
156            Ok(cursor.height)
157        } else {
158            Ok(cursor.height.try_add_one()?)
159        }
160    }
161
162    /// Observes the current inbox size in the metrics histogram.
163    pub fn observe_size_metric(&self) {
164        #[cfg(with_metrics)]
165        metrics::INBOX_SIZE
166            .with_label_values(&[])
167            .observe(self.added_bundles.count() as f64);
168    }
169
170    /// Reconciles this inbox with the producer's snapshot at checkpoint time.
171    ///
172    /// `next_cursor_to_remove` is chain-state-derived and takes the snapshot's
173    /// value (a lagging validator's higher pre-restore advancement is being
174    /// rolled back along with the execution state). `next_cursor_to_add` only
175    /// ratchets up — a lagging validator may already have received bundles past
176    /// the snapshot, and forgetting those deliveries would either lose them or
177    /// trip the inbox-gap check on the next cross-chain update. Pre-existing
178    /// `added_bundles` entries strictly below the cutoff are dropped (their
179    /// effects are baked into the restored execution state) and the
180    /// anticipated-remove queue is cleared since it came from pre-restore
181    /// blocks the rollback has invalidated.
182    ///
183    /// Errors if `restored_cursor` already sits past `cursor`: that means a
184    /// later checkpoint has already bootstrapped this inbox, and the chain
185    /// worker's dispatch should never have routed an earlier one here.
186    pub async fn restore_from_checkpoint(&mut self, cursor: Cursor) -> Result<(), ChainError> {
187        ensure!(
188            *self.restored_cursor.get() <= cursor,
189            ChainError::InternalError(format!(
190                "cannot restore inbox at cursor {cursor:?}: already bootstrapped \
191                 from a later checkpoint at cursor {previous:?}",
192                previous = *self.restored_cursor.get(),
193            ))
194        );
195        self.restored_cursor.set(cursor);
196        if *self.next_cursor_to_add.get() < cursor {
197            self.next_cursor_to_add.set(cursor);
198        }
199        self.next_cursor_to_remove.set(cursor);
200        while let Some(front) = self.added_bundles.front().await? {
201            if front.cursor() >= cursor {
202                break;
203            }
204            self.added_bundles.delete_front();
205        }
206        self.removed_bundles.clear();
207        Ok(())
208    }
209
210    /// Consumes a bundle from the inbox.
211    ///
212    /// Returns `true` if the bundle was already known, i.e. it was present in `added_bundles`.
213    pub(crate) async fn remove_bundle(
214        &mut self,
215        bundle: &MessageBundle,
216    ) -> Result<bool, InboxError> {
217        // Record the latest cursor.
218        let cursor = bundle.cursor();
219        if cursor < *self.restored_cursor.get() {
220            // Bundle's effects are already in the restored execution state; treat the
221            // consumption as a no-op without touching `removed_bundles` so the queue
222            // doesn't fill with bundles a sender will never push again.
223            return Ok(true);
224        }
225        ensure!(
226            cursor >= *self.next_cursor_to_remove.get(),
227            InboxError::IncorrectOrder {
228                bundle: bundle.clone(),
229                next_cursor: *self.next_cursor_to_remove.get(),
230            }
231        );
232        // Discard added bundles with lower cursors (if any).
233        while let Some(previous_bundle) = self.added_bundles.front().await? {
234            if previous_bundle.cursor() >= cursor {
235                break;
236            }
237            ensure!(
238                previous_bundle.is_skippable(),
239                InboxError::UnskippableBundle {
240                    bundle: previous_bundle
241                }
242            );
243            self.added_bundles.delete_front();
244            tracing::trace!("Skipping previously received bundle {:?}", previous_bundle);
245        }
246        // Reconcile the bundle with the next added bundle, or mark it as removed.
247        let already_known = match self.added_bundles.front().await? {
248            Some(previous_bundle) => {
249                // Rationale: If the two cursors are equal, then the bundles should match.
250                // Otherwise, at this point we know that `self.next_cursor_to_add >
251                // previous_bundle.cursor() > cursor`. Notably, `bundle` will never be
252                // added in the future. Therefore, we should fail instead of adding
253                // it to `self.removed_bundles`.
254                ensure!(
255                    bundle == &previous_bundle,
256                    InboxError::UnexpectedBundle {
257                        previous_bundle,
258                        bundle: bundle.clone(),
259                    }
260                );
261                self.added_bundles.delete_front();
262                tracing::trace!("Consuming bundle {:?}", bundle);
263                true
264            }
265            None => {
266                tracing::trace!("Marking bundle as expected: {:?}", bundle);
267                self.removed_bundles.push_back(bundle.clone());
268                #[cfg(with_metrics)]
269                metrics::REMOVED_BUNDLES
270                    .with_label_values(&[])
271                    .observe(self.removed_bundles.count() as f64);
272                false
273            }
274        };
275        self.next_cursor_to_remove.set(cursor.try_add_one()?);
276        Ok(already_known)
277    }
278
279    /// Pushes a bundle to the inbox. The verifications should not fail in production unless
280    /// many validators are faulty.
281    ///
282    /// Returns `true` if the bundle was new, `false` if it was already in `removed_bundles`.
283    pub(crate) async fn add_bundle(&mut self, bundle: MessageBundle) -> Result<bool, InboxError> {
284        // Record the latest cursor.
285        let cursor = bundle.cursor();
286        if cursor < *self.restored_cursor.get() {
287            // The sender is re-delivering a bundle whose effects are already baked
288            // into our restored execution state. Silently drop it.
289            return Ok(false);
290        }
291        ensure!(
292            cursor >= *self.next_cursor_to_add.get(),
293            InboxError::IncorrectOrder {
294                bundle: bundle.clone(),
295                next_cursor: *self.next_cursor_to_add.get(),
296            }
297        );
298        // Find if the bundle was removed ahead of time.
299        let newly_added = match self.removed_bundles.front().await? {
300            Some(previous_bundle) => {
301                if previous_bundle.cursor() == cursor {
302                    // We already executed this bundle by anticipation. Remove it from
303                    // the queue.
304                    ensure!(
305                        bundle == previous_bundle,
306                        InboxError::UnexpectedBundle {
307                            previous_bundle,
308                            bundle,
309                        }
310                    );
311                    self.removed_bundles.delete_front();
312                    #[cfg(with_metrics)]
313                    metrics::REMOVED_BUNDLES
314                        .with_label_values(&[])
315                        .observe(self.removed_bundles.count() as f64);
316                } else {
317                    // The receiver has already executed a later bundle from the same
318                    // sender ahead of time so we should skip this one.
319                    ensure!(
320                        cursor < previous_bundle.cursor() && bundle.is_skippable(),
321                        InboxError::UnexpectedBundle {
322                            previous_bundle,
323                            bundle,
324                        }
325                    );
326                }
327                false
328            }
329            None => {
330                // Otherwise, schedule the messages for execution.
331                self.added_bundles.push_back(bundle);
332                true
333            }
334        };
335        self.next_cursor_to_add.set(cursor.try_add_one()?);
336        Ok(newly_added)
337    }
338}
339
#[cfg(with_testing)]
impl InboxStateView<MemoryContext<()>>
where
    MemoryContext<()>: Context + Clone + Send + Sync + 'static,
{
    /// Loads an inbox view from a newly created in-memory test context.
    ///
    /// # Panics
    ///
    /// Panics if loading the view from the memory context fails.
    pub async fn new() -> Self {
        Self::load(MemoryContext::new_for_testing(()))
            .await
            .expect("Loading from memory should work")
    }
}