linera_chain/
inbox.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use allocative::Allocative;
5use async_graphql::SimpleObject;
6use linera_base::{
7    data_types::{ArithmeticError, BlockHeight},
8    ensure,
9    identifiers::ChainId,
10};
11#[cfg(with_testing)]
12use linera_views::context::MemoryContext;
13use linera_views::{
14    context::Context,
15    queue_view::QueueView,
16    register_view::RegisterView,
17    views::{ClonableView, View},
18    ViewError,
19};
20use serde::{Deserialize, Serialize};
21use thiserror::Error;
22
23use crate::{data_types::MessageBundle, ChainError};
24
25#[cfg(test)]
26#[path = "unit_tests/inbox_tests.rs"]
27mod inbox_tests;
28
29#[cfg(with_metrics)]
30mod metrics {
31    use std::sync::LazyLock;
32
33    use linera_base::prometheus_util::{exponential_bucket_interval, register_histogram_vec};
34    use prometheus::HistogramVec;
35
36    pub static INBOX_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
37        register_histogram_vec(
38            "inbox_size",
39            "Inbox size",
40            &[],
41            exponential_bucket_interval(1.0, 2_000_000.0),
42        )
43    });
44
45    pub static REMOVED_BUNDLES: LazyLock<HistogramVec> = LazyLock::new(|| {
46        register_histogram_vec(
47            "removed_bundles",
48            "Number of bundles removed by anticipation",
49            &[],
50            exponential_bucket_interval(1.0, 10_000.0),
51        )
52    });
53}
54
55/// The state of an inbox.
56/// * An inbox is used to track bundles received and executed locally.
57/// * A `MessageBundle` consists of a logical cursor `(height, index)` and some message
58///   content `messages`.
59/// * On the surface, an inbox looks like a FIFO queue: the main APIs are `add_bundle` and
60///   `remove_bundle`.
61/// * However, bundles can also be removed before they are added. When this happens,
62///   the bundles removed by anticipation are tracked in a separate queue. Any bundle added
63///   later will be required to match the first removed bundle and so on.
64/// * The cursors of added bundles (resp. removed bundles) must be increasing over time.
65/// * Reconciliation of added and removed bundles is allowed to skip some added bundles.
66///   However, the opposite is not true: every removed bundle must be eventually added.
67#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
68#[derive(Allocative, Debug, ClonableView, View)]
69#[allocative(bound = "C")]
70pub struct InboxStateView<C>
71where
72    C: Clone + Context,
73{
74    /// We have already added all the messages below this height and index.
75    pub next_cursor_to_add: RegisterView<C, Cursor>,
76    /// We have already removed all the messages below this height and index.
77    pub next_cursor_to_remove: RegisterView<C, Cursor>,
78    /// These bundles have been added and are waiting to be removed.
79    pub added_bundles: QueueView<C, MessageBundle>,
80    /// These bundles have been removed by anticipation and are waiting to be added.
81    /// At least one of `added_bundles` and `removed_bundles` should be empty.
82    pub removed_bundles: QueueView<C, MessageBundle>,
83}
84
85#[derive(
86    Debug,
87    Default,
88    Clone,
89    Copy,
90    Hash,
91    Eq,
92    PartialEq,
93    Ord,
94    PartialOrd,
95    Serialize,
96    Deserialize,
97    SimpleObject,
98    Allocative,
99)]
100pub struct Cursor {
101    height: BlockHeight,
102    index: u32,
103}
104
105#[derive(Error, Debug)]
106pub(crate) enum InboxError {
107    #[error(transparent)]
108    ViewError(#[from] ViewError),
109    #[error(transparent)]
110    ArithmeticError(#[from] ArithmeticError),
111    #[error("Cannot reconcile {bundle:?} with {previous_bundle:?}")]
112    UnexpectedBundle {
113        bundle: MessageBundle,
114        previous_bundle: MessageBundle,
115    },
116    #[error("{bundle:?} is out of order. Block and height should be at least: {next_cursor:?}")]
117    IncorrectOrder {
118        bundle: MessageBundle,
119        next_cursor: Cursor,
120    },
121    #[error(
122        "{bundle:?} cannot be skipped: it must be received before the next \
123        messages from the same origin"
124    )]
125    UnskippableBundle { bundle: MessageBundle },
126}
127
128impl From<&MessageBundle> for Cursor {
129    #[inline]
130    fn from(bundle: &MessageBundle) -> Self {
131        Self {
132            height: bundle.height,
133            index: bundle.transaction_index,
134        }
135    }
136}
137
138impl Cursor {
139    fn try_add_one(self) -> Result<Self, ArithmeticError> {
140        let value = Self {
141            height: self.height,
142            index: self.index.checked_add(1).ok_or(ArithmeticError::Overflow)?,
143        };
144        Ok(value)
145    }
146}
147
148impl From<(ChainId, ChainId, InboxError)> for ChainError {
149    fn from(value: (ChainId, ChainId, InboxError)) -> Self {
150        let (chain_id, origin, error) = value;
151        match error {
152            InboxError::ViewError(e) => ChainError::ViewError(e),
153            InboxError::ArithmeticError(e) => ChainError::ArithmeticError(e),
154            InboxError::UnexpectedBundle {
155                bundle,
156                previous_bundle,
157            } => ChainError::UnexpectedMessage {
158                chain_id,
159                origin,
160                bundle: Box::new(bundle),
161                previous_bundle: Box::new(previous_bundle),
162            },
163            InboxError::IncorrectOrder {
164                bundle,
165                next_cursor,
166            } => ChainError::IncorrectMessageOrder {
167                chain_id,
168                origin,
169                bundle: Box::new(bundle),
170                next_height: next_cursor.height,
171                next_index: next_cursor.index,
172            },
173            InboxError::UnskippableBundle { bundle } => ChainError::CannotSkipMessage {
174                chain_id,
175                origin,
176                bundle: Box::new(bundle),
177            },
178        }
179    }
180}
181
182impl<C> InboxStateView<C>
183where
184    C: Context + Clone + 'static,
185{
186    /// Converts the internal cursor for added bundles into an externally-visible block height.
187    /// This makes sense because the rest of the system always adds bundles one block at a time.
188    pub fn next_block_height_to_receive(&self) -> Result<BlockHeight, ChainError> {
189        let cursor = self.next_cursor_to_add.get();
190        if cursor.index == 0 {
191            Ok(cursor.height)
192        } else {
193            Ok(cursor.height.try_add_one()?)
194        }
195    }
196
197    /// Consumes a bundle from the inbox.
198    ///
199    /// Returns `true` if the bundle was already known, i.e. it was present in `added_bundles`.
200    pub(crate) async fn remove_bundle(
201        &mut self,
202        bundle: &MessageBundle,
203    ) -> Result<bool, InboxError> {
204        // Record the latest cursor.
205        let cursor = Cursor::from(bundle);
206        ensure!(
207            cursor >= *self.next_cursor_to_remove.get(),
208            InboxError::IncorrectOrder {
209                bundle: bundle.clone(),
210                next_cursor: *self.next_cursor_to_remove.get(),
211            }
212        );
213        // Discard added bundles with lower cursors (if any).
214        while let Some(previous_bundle) = self.added_bundles.front().await? {
215            if Cursor::from(&previous_bundle) >= cursor {
216                break;
217            }
218            ensure!(
219                previous_bundle.is_skippable(),
220                InboxError::UnskippableBundle {
221                    bundle: previous_bundle
222                }
223            );
224            self.added_bundles.delete_front();
225            #[cfg(with_metrics)]
226            metrics::INBOX_SIZE
227                .with_label_values(&[])
228                .observe(self.added_bundles.count() as f64);
229            tracing::trace!("Skipping previously received bundle {:?}", previous_bundle);
230        }
231        // Reconcile the bundle with the next added bundle, or mark it as removed.
232        let already_known = match self.added_bundles.front().await? {
233            Some(previous_bundle) => {
234                // Rationale: If the two cursors are equal, then the bundles should match.
235                // Otherwise, at this point we know that `self.next_cursor_to_add >
236                // Cursor::from(&previous_bundle) > cursor`. Notably, `bundle` will never be
237                // added in the future. Therefore, we should fail instead of adding
238                // it to `self.removed_bundles`.
239                ensure!(
240                    bundle == &previous_bundle,
241                    InboxError::UnexpectedBundle {
242                        previous_bundle,
243                        bundle: bundle.clone(),
244                    }
245                );
246                self.added_bundles.delete_front();
247                #[cfg(with_metrics)]
248                metrics::INBOX_SIZE
249                    .with_label_values(&[])
250                    .observe(self.added_bundles.count() as f64);
251                tracing::trace!("Consuming bundle {:?}", bundle);
252                true
253            }
254            None => {
255                tracing::trace!("Marking bundle as expected: {:?}", bundle);
256                self.removed_bundles.push_back(bundle.clone());
257                #[cfg(with_metrics)]
258                metrics::REMOVED_BUNDLES
259                    .with_label_values(&[])
260                    .observe(self.removed_bundles.count() as f64);
261                false
262            }
263        };
264        self.next_cursor_to_remove.set(cursor.try_add_one()?);
265        Ok(already_known)
266    }
267
268    /// Pushes a bundle to the inbox. The verifications should not fail in production unless
269    /// many validators are faulty.
270    ///
271    /// Returns `true` if the bundle was new, `false` if it was already in `removed_bundles`.
272    pub(crate) async fn add_bundle(&mut self, bundle: MessageBundle) -> Result<bool, InboxError> {
273        // Record the latest cursor.
274        let cursor = Cursor::from(&bundle);
275        ensure!(
276            cursor >= *self.next_cursor_to_add.get(),
277            InboxError::IncorrectOrder {
278                bundle: bundle.clone(),
279                next_cursor: *self.next_cursor_to_add.get(),
280            }
281        );
282        // Find if the bundle was removed ahead of time.
283        let newly_added = match self.removed_bundles.front().await? {
284            Some(previous_bundle) => {
285                if Cursor::from(&previous_bundle) == cursor {
286                    // We already executed this bundle by anticipation. Remove it from
287                    // the queue.
288                    ensure!(
289                        bundle == previous_bundle,
290                        InboxError::UnexpectedBundle {
291                            previous_bundle,
292                            bundle,
293                        }
294                    );
295                    self.removed_bundles.delete_front();
296                    #[cfg(with_metrics)]
297                    metrics::REMOVED_BUNDLES
298                        .with_label_values(&[])
299                        .observe(self.removed_bundles.count() as f64);
300                } else {
301                    // The receiver has already executed a later bundle from the same
302                    // sender ahead of time so we should skip this one.
303                    ensure!(
304                        cursor < Cursor::from(&previous_bundle) && bundle.is_skippable(),
305                        InboxError::UnexpectedBundle {
306                            previous_bundle,
307                            bundle,
308                        }
309                    );
310                }
311                false
312            }
313            None => {
314                // Otherwise, schedule the messages for execution.
315                self.added_bundles.push_back(bundle);
316                #[cfg(with_metrics)]
317                metrics::INBOX_SIZE
318                    .with_label_values(&[])
319                    .observe(self.added_bundles.count() as f64);
320                true
321            }
322        };
323        self.next_cursor_to_add.set(cursor.try_add_one()?);
324        Ok(newly_added)
325    }
326}
327
328#[cfg(with_testing)]
329impl InboxStateView<MemoryContext<()>>
330where
331    MemoryContext<()>: Context + Clone + Send + Sync + 'static,
332{
333    pub async fn new() -> Self {
334        let context = MemoryContext::new_for_testing(());
335        Self::load(context)
336            .await
337            .expect("Loading from memory should work")
338    }
339}