Skip to main content

linera_chain/
chain.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6    sync::Arc,
7};
8
9use allocative::Allocative;
10use linera_base::{
11    crypto::{CryptoHash, ValidatorPublicKey},
12    data_types::{
13        ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Cursor,
14        Epoch, NonCanonicalBTreeMap, NonCanonicalBTreeSet, OracleResponse, Timestamp,
15    },
16    ensure,
17    hashed::Hashed,
18    identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
19    ownership::ChainOwnership,
20    time::{Duration, Instant},
21};
22use linera_execution::{
23    committee::Committee, system::EPOCH_STREAM_NAME, ExecutionRuntimeContext, ExecutionStateView,
24    Message, Operation, OutgoingMessage, PreparedCheckpoint, Query, QueryContext, QueryOutcome,
25    ResourceController, ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
26};
27use linera_views::{
28    context::Context,
29    log_view::LogView,
30    map_view::{CustomMapView, MapView},
31    reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
32    register_view::RegisterView,
33    set_view::SetView,
34    views::{ClonableView, RootView, View},
35};
36use serde::{Deserialize, Serialize};
37use tracing::{info, instrument, warn};
38
39use crate::{
40    block::{Block, ConfirmedBlock},
41    block_tracker::BlockExecutionTracker,
42    data_types::{
43        BlockExecutionOutcome, BundleExecutionPolicy, BundleFailurePolicy, ChainAndHeight,
44        IncomingBundle, MessageAction, MessageBundle, ProposedBlock, Transaction,
45    },
46    inbox::{InboxError, InboxStateView},
47    manager::ChainManager,
48    outbox::OutboxStateView,
49    pending_blobs::PendingBlobsView,
50    ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
51};
52
53#[cfg(test)]
54#[path = "unit_tests/chain_tests.rs"]
55mod chain_tests;
56
57#[cfg(with_metrics)]
58use linera_base::prometheus_util::MeasureLatency;
59
/// Prometheus metrics recorded while executing blocks and maintaining outboxes.
///
/// The whole module is only compiled when `with_metrics` is set, so the items inside do not
/// need their own `cfg` gate.
#[cfg(with_metrics)]
pub(crate) mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram_vec, register_int_counter_vec,
    };
    use linera_execution::ResourceTracker;
    use prometheus::{HistogramVec, IntCounterVec};

    /// Counter of executed blocks.
    pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
    });

    /// Histogram of block execution latency.
    pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "block_execution_latency",
            "Block execution latency",
            &[],
            exponential_bucket_interval(50.0_f64, 10_000_000.0),
        )
    });

    /// Histogram of message execution latency.
    pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "message_execution_latency",
            "Message execution latency",
            &[],
            exponential_bucket_interval(0.1_f64, 50_000.0),
        )
    });

    /// Histogram of operation execution latency.
    pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "operation_execution_latency",
            "Operation execution latency",
            &[],
            exponential_bucket_interval(0.1_f64, 50_000.0),
        )
    });

    /// Histogram of the Wasm fuel used per block.
    pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "wasm_fuel_used_per_block",
            "Wasm fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 100_000_000.0),
        )
    });

    /// Histogram of the EVM fuel used per block.
    pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "evm_fuel_used_per_block",
            "EVM fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 100_000_000.0),
        )
    });

    /// Histogram of the number of VM read operations per block.
    pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_num_reads_per_block",
            "VM number of reads per block",
            &[],
            exponential_bucket_interval(0.1, 100.0),
        )
    });

    /// Histogram of the number of bytes read by the VM per block.
    pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_read_per_block",
            "VM number of bytes read per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the number of bytes written by the VM per block.
    pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_written_per_block",
            "VM number of bytes written per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the time needed to recompute the state hash, in microseconds.
    pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "state_hash_computation_latency",
            "Time to recompute the state hash, in microseconds",
            &[],
            exponential_bucket_interval(1.0, 2_000_000.0),
        )
    });

    /// Histogram of the number of outboxes.
    pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_outboxes",
            "Number of outboxes",
            &[],
            exponential_bucket_interval(1.0, 1_000_000.0),
        )
    });

    /// Histogram of the number of entries in the `outbox_counters` map.
    pub static OUTBOX_COUNTERS_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "outbox_counters_size",
            "Number of entries in the outbox_counters map (in-flight message heights)",
            &[],
            exponential_bucket_interval(1.0, 1_000_000.0),
        )
    });

    /// Tracks block execution metrics in Prometheus.
    ///
    /// Increments the executed-block counter and records the tracker's fuel, read and write
    /// totals in the corresponding per-block histograms.
    pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
        NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
        // Histograms observe `f64` values, hence the casts from the tracker's integer counters.
        WASM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.wasm_fuel as f64);
        EVM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.evm_fuel as f64);
        VM_NUM_READS_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.read_operations as f64);
        VM_BYTES_READ_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_read as f64);
        VM_BYTES_WRITTEN_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_written as f64);
    }
}
194
/// The BCS-serialized size of an empty [`Block`].
///
/// NOTE(review): presumably measured in bytes and tied to the current layout of `Block`; the
/// literal would need to be re-derived if that layout changes — confirm against the tests.
pub(crate) const EMPTY_BLOCK_SIZE: usize = 94;
197
/// A set of fully-tracked chains. Wrapped in [`Hashed`] (as `Hashed<ChainIdSet>`) so the hash that
/// identifies the set — stored in [`ChainStateView::outbox_index_tracked_hash`] to detect when the
/// outbox indices must be reconciled — is computed once when the tracked set changes rather than on
/// every cross-chain operation. The hash is order-independent because `BTreeSet` iterates in sorted
/// order.
///
/// This is a newtype over `BTreeSet<ChainId>`; the inner set is public and is also reachable
/// through `Deref`.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ChainIdSet(pub BTreeSet<ChainId>);

// Marker impl with no methods of its own.
// NOTE(review): presumably this is what lets the set be hashed through its BCS serialization
// when wrapped in `Hashed` — confirm against `linera_base::crypto`.
impl linera_base::crypto::BcsHashable<'_> for ChainIdSet {}
207
208impl std::ops::Deref for ChainIdSet {
209    type Target = BTreeSet<ChainId>;
210
211    fn deref(&self) -> &Self::Target {
212        &self.0
213    }
214}
215
// NOTE(review): when `with_graphql` is set this struct derives `async_graphql::SimpleObject`.
// The `///` doc comments below may then be exported as GraphQL descriptions, so they are left
// untouched here and review notes use plain `//` comments — confirm before rewording any `///`.
//
// The fields group into: execution state, block-chaining tip, consensus manager, pending
// blobs, known block hashes, the received-certificate log and its trackers, inboxes/outboxes
// with their indices, and checkpoint bookkeeping.
/// A view accessing the state of a chain.
#[cfg_attr(
    with_graphql,
    derive(async_graphql::SimpleObject),
    graphql(cache_control(no_cache))
)]
#[derive(Debug, RootView, ClonableView, Allocative)]
#[allocative(bound = "C")]
pub struct ChainStateView<C>
where
    C: Clone + Context + 'static,
{
    /// Execution state, including system and user applications.
    pub execution_state: ExecutionStateView<C>,

    /// Block-chaining state.
    pub tip_state: RegisterView<C, ChainTipState>,

    /// Consensus state.
    pub manager: ChainManager<C>,
    // NOTE(review): the two doc sentences below overlap; the first looks like a leftover from
    // an earlier wording. The second one describes what the field actually stores.
    /// Pending validated block that is still missing blobs.
    /// The incomplete set of blobs for the pending validated block.
    pub pending_validated_blobs: PendingBlobsView<C>,
    /// The incomplete sets of blobs for upcoming proposals.
    pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,

    /// Hashes of all known blocks in this chain, indexed by their height. A block at
    /// `height < next_block_height` is executed; a block at `height >= next_block_height`
    /// is preprocessed (verified but not yet executed) and may not be contiguous.
    // Writes are expected to go through `insert_block_hash` so that
    // `next_height_to_preprocess` stays in sync.
    pub block_hashes: CustomMapView<C, BlockHeight, CryptoHash>,
    /// Sender chain and height of all certified blocks known as a receiver (local ordering).
    pub received_log: LogView<C, ChainAndHeight>,
    /// The number of `received_log` entries we have synchronized, for each validator.
    // `update_received_certificate_trackers` only ever moves these values forward.
    pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,

    /// Mailboxes used to receive messages indexed by their origin.
    pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
    /// Mailboxes used to send messages, indexed by their target.
    pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
    /// The indices of next events we expect to see per stream (could be ahead of the last
    /// executed block in sparse chains).
    pub next_expected_events: MapView<C, StreamId, u32>,
    /// Number of outgoing messages in flight for each block height.
    /// We use a `RegisterView` to prioritize speed for small maps.
    // Entries are removed as soon as they reach zero; `all_messages_delivered_up_to` relies on
    // the smallest key being a height that still has messages in flight.
    pub outbox_counters: RegisterView<C, NonCanonicalBTreeMap<BlockHeight, u32>>,
    /// Outboxes with at least one pending message. This allows us to avoid loading all outboxes.
    pub nonempty_outboxes: RegisterView<C, NonCanonicalBTreeSet<ChainId>>,

    /// Inboxes with at least one pending added bundle. This allows us to avoid loading all inboxes.
    pub nonempty_inboxes: RegisterView<C, NonCanonicalBTreeSet<ChainId>>,

    /// The local wall-clock time when block 0 was last executed. Used to prevent
    /// reset-on-incorrect-outcome from looping: if not enough time has elapsed since
    /// the last reset, the error is returned instead.
    pub block_zero_executed_at: RegisterView<C, Timestamp>,

    /// The height at which the next block can be preprocessed: one past the highest
    /// height in `block_hashes` (executed or preprocessed), or `next_block_height` if
    /// `block_hashes` is empty.
    ///
    /// Maintained as an O(1) shortcut for `next_height_to_preprocess`, since
    /// `CustomMapView` does not yet expose a `last_index` lookup. Once
    /// `linera-views` gains efficient first/last key support, this field can be
    /// removed in favor of `block_hashes.last_index()`.
    pub next_height_to_preprocess: RegisterView<C, BlockHeight>,

    /// The height of the most recent checkpoint block applied to this chain, if any.
    /// Maintained by `apply_confirmed_block` whenever a block starting with
    /// `SystemOperation::Checkpoint` is executed.
    pub latest_checkpoint_height: RegisterView<C, Option<BlockHeight>>,

    /// Hashes of pre-checkpoint sender blocks the chain has seen a checkpoint cert
    /// vouch for via `outbox_block_hashes`, but whose actual cert bytes are not yet
    /// in storage. The worker errors a checkpoint push with
    /// `MissingPreCheckpointBlocks` when this set is non-empty, then accepts each
    /// referenced cert (regardless of its own — possibly revoked — epoch) and
    /// removes the entry. Once the set is empty, the checkpoint restoration can run
    /// end-to-end.
    pub pre_checkpoint_block_trust: SetView<C, CryptoHash>,

    /// The hash of the set of fully-tracked chains that `nonempty_outboxes` and
    /// `outbox_counters` were last reconciled against. On a client these two indices only hold
    /// entries for tracked targets; when the tracked set changes this hash stops matching and the
    /// indices are reconciled (`reconcile_outbox_index`). `None` means
    /// they have never been filtered — a pre-existing database entry (migration), or a validator
    /// that tracks all chains and never filters.
    pub outbox_index_tracked_hash: RegisterView<C, Option<CryptoHash>>,
}
304
// NOTE(review): under `with_graphql` this struct derives `async_graphql::SimpleObject`, so the
// `///` text below may be exported as GraphQL descriptions; review notes therefore use `//`.
/// Block-chaining state.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, Allocative)]
pub struct ChainTipState {
    /// Hash of the latest certified block in this chain, if any.
    // `verify_block_chaining` requires a proposal's `previous_block_hash` to equal this.
    pub block_hash: Option<CryptoHash>,
    /// Sequence number tracking blocks.
    // This is the height a new proposal must have (see `verify_block_chaining`).
    pub next_block_height: BlockHeight,
    /// Number of incoming message bundles.
    // Running total: `update_counters` adds each block's count to it.
    pub num_incoming_bundles: u32,
    /// Number of operations.
    // Running total: `update_counters` adds each block's count to it.
    pub num_operations: u32,
    /// Number of outgoing messages.
    // Running total: `update_counters` adds each block's count to it.
    pub num_outgoing_messages: u32,
}
320
321impl ChainTipState {
322    /// Checks that the proposed block is suitable, i.e. at the expected height and with the
323    /// expected parent.
324    pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
325        ensure!(
326            new_block.height == self.next_block_height,
327            ChainError::UnexpectedBlockHeight {
328                expected_block_height: self.next_block_height,
329                found_block_height: new_block.height
330            }
331        );
332        ensure!(
333            new_block.previous_block_hash == self.block_hash,
334            ChainError::UnexpectedPreviousBlockHash
335        );
336        Ok(())
337    }
338
339    /// Returns `true` if the validated block's height is below the tip height. Returns an error if
340    /// it is higher than the tip.
341    pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
342        ensure!(
343            self.next_block_height >= height,
344            ChainError::MissingEarlierBlocks {
345                current_block_height: self.next_block_height,
346            }
347        );
348        Ok(self.next_block_height > height)
349    }
350
351    /// Checks if the measurement counters would be valid.
352    pub fn update_counters(
353        &mut self,
354        transactions: &[Transaction],
355        messages: &[Vec<OutgoingMessage>],
356    ) -> Result<(), ChainError> {
357        let mut num_incoming_bundles = 0u32;
358        let mut num_operations = 0u32;
359
360        for transaction in transactions {
361            match transaction {
362                Transaction::ReceiveMessages(_) => {
363                    num_incoming_bundles = num_incoming_bundles
364                        .checked_add(1)
365                        .ok_or(ArithmeticError::Overflow)?;
366                }
367                Transaction::ExecuteOperation(_) => {
368                    num_operations = num_operations
369                        .checked_add(1)
370                        .ok_or(ArithmeticError::Overflow)?;
371                }
372            }
373        }
374
375        self.num_incoming_bundles = self
376            .num_incoming_bundles
377            .checked_add(num_incoming_bundles)
378            .ok_or(ArithmeticError::Overflow)?;
379
380        self.num_operations = self
381            .num_operations
382            .checked_add(num_operations)
383            .ok_or(ArithmeticError::Overflow)?;
384
385        let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
386            .map_err(|_| ArithmeticError::Overflow)?;
387        self.num_outgoing_messages = self
388            .num_outgoing_messages
389            .checked_add(num_outgoing_messages)
390            .ok_or(ArithmeticError::Overflow)?;
391
392        Ok(())
393    }
394}
395
396impl<C> ChainStateView<C>
397where
398    C: Context + Clone + 'static,
399    C::Extra: ExecutionRuntimeContext,
400{
401    /// Returns the [`ChainId`] of the chain this [`ChainStateView`] represents.
402    pub fn chain_id(&self) -> ChainId {
403        self.context().extra().chain_id()
404    }
405
406    #[instrument(skip_all, fields(
407        chain_id = %self.chain_id(),
408    ))]
409    pub async fn query_application(
410        &mut self,
411        local_time: Timestamp,
412        query: Query,
413        service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
414    ) -> Result<QueryOutcome, ChainError> {
415        let context = QueryContext {
416            chain_id: self.chain_id(),
417            next_block_height: self.tip_state.get().next_block_height,
418            local_time,
419        };
420        self.execution_state
421            .query_application(context, query, service_runtime_endpoint)
422            .await
423            .with_execution_context(ChainExecutionContext::Query)
424    }
425
426    #[instrument(skip_all, fields(
427        chain_id = %self.chain_id(),
428        application_id = %application_id
429    ))]
430    pub async fn describe_application(
431        &mut self,
432        application_id: ApplicationId,
433    ) -> Result<ApplicationDescription, ChainError> {
434        self.execution_state
435            .system
436            .describe_application(application_id, &mut TransactionTracker::default())
437            .await
438            .with_execution_context(ChainExecutionContext::DescribeApplication)
439    }
440
441    #[instrument(skip_all, fields(
442        chain_id = %self.chain_id(),
443        target = %target,
444        height = %height
445    ))]
446    pub async fn mark_messages_as_received(
447        &mut self,
448        target: &ChainId,
449        height: BlockHeight,
450        tracked: Option<&ChainIdSet>,
451    ) -> Result<bool, ChainError> {
452        let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
453        let updates = outbox.mark_messages_as_received(height).await?;
454        if updates.is_empty() {
455            return Ok(false);
456        }
457        // `outbox_counters` is keyed by block height and shared across all recipients of that
458        // block, but only counts targets we index: every chain on a validator (`tracked == None`),
459        // or tracked targets on a client. An untracked target was never counted, so confirming it
460        // must NOT touch the counters at all — a present `counter[height]` belongs to a tracked
461        // sibling recipient of the same block and must be left intact. We only drain the queue
462        // (done above) for such a target.
463        if tracked.is_none_or(|tracked| tracked.contains(target)) {
464            for update in updates {
465                let counter = self
466                    .outbox_counters
467                    .get_mut()
468                    .get_mut(&update)
469                    .ok_or_else(|| {
470                        ChainError::CorruptedChainState("message counter should be present".into())
471                    })?;
472                *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
473                if *counter == 0 {
474                    // Important for the test in `all_messages_delivered_up_to`.
475                    self.outbox_counters.get_mut().remove(&update);
476                }
477            }
478        }
479        if outbox.queue.count() == 0 {
480            self.nonempty_outboxes.get_mut().remove(target);
481            // If the outbox is empty and not ahead of the executed blocks, remove it.
482            if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
483                self.outboxes.remove_entry(target)?;
484            }
485        }
486        #[cfg(with_metrics)]
487        metrics::NUM_OUTBOXES
488            .with_label_values(&[])
489            .observe(self.nonempty_outboxes.get().len() as f64);
490        #[cfg(with_metrics)]
491        metrics::OUTBOX_COUNTERS_SIZE
492            .with_label_values(&[])
493            .observe(self.outbox_counters.get().len() as f64);
494        Ok(true)
495    }
496
497    /// Returns true if there are no more outgoing messages in flight up to the given
498    /// block height.
499    pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
500        tracing::debug!(
501            "Messages left in {:.8}'s outbox: {:?}",
502            self.chain_id(),
503            self.outbox_counters.get()
504        );
505        if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
506            key > &height
507        } else {
508            true
509        }
510    }
511
512    /// Invariant for the states of active chains.
513    pub async fn is_active(&self) -> Result<bool, ChainError> {
514        Ok(self.execution_state.system.is_active().await?)
515    }
516
517    /// Initializes the chain if it is not active yet.
518    pub async fn initialize_if_needed(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
519        let chain_id = self.chain_id();
520        // Initialize ourselves.
521        if self
522            .execution_state
523            .system
524            .initialize_chain(chain_id)
525            .await
526            .with_execution_context(ChainExecutionContext::Block)?
527        {
528            // The chain was already initialized.
529            return Ok(());
530        }
531        let maybe_committee = self
532            .execution_state
533            .system
534            .current_committee()
535            .await
536            .with_execution_context(ChainExecutionContext::Block)?;
537        // Last, reset the consensus state based on the current ownership.
538        self.manager.reset(
539            self.execution_state.system.ownership.get().await?.clone(),
540            BlockHeight(0),
541            local_time,
542            maybe_committee
543                .iter()
544                .flat_map(|(_, committee)| committee.account_keys_and_weights()),
545        )?;
546        Ok(())
547    }
548
549    /// Inserts `(height, hash)` into `block_hashes` and updates the
550    /// `next_height_to_preprocess` register accordingly. Every write to
551    /// `block_hashes` must go through this helper so the register stays in sync.
552    fn insert_block_hash(
553        &mut self,
554        height: BlockHeight,
555        hash: CryptoHash,
556    ) -> Result<(), ChainError> {
557        self.block_hashes.insert(&height, hash)?;
558        let next = self.next_height_to_preprocess.get_mut();
559        if *next <= height {
560            *next = height.try_add_one()?;
561        }
562        Ok(())
563    }
564
565    /// Attempts to process a new `bundle` of messages from the given `origin`. Returns an
566    /// internal error if the bundle doesn't appear to be new, based on the sender's
567    /// height. The value `local_time` is specific to each validator and only used for
568    /// round timeouts.
569    ///
570    /// Returns `true` if incoming `Subscribe` messages created new outbox entries.
571    #[instrument(skip_all, fields(
572        chain_id = %self.chain_id(),
573        origin = %origin,
574        bundle_height = %bundle.height
575    ))]
576    pub async fn receive_message_bundle_with_inbox(
577        &mut self,
578        inbox: &mut InboxStateView<C>,
579        origin: &ChainId,
580        bundle: MessageBundle,
581        local_time: Timestamp,
582        add_to_received_log: bool,
583    ) -> Result<(), ChainError> {
584        assert!(!bundle.messages.is_empty());
585        let chain_id = self.chain_id();
586        tracing::trace!(
587            "Processing new messages from {origin} at height {}",
588            bundle.height,
589        );
590        let chain_and_height = ChainAndHeight {
591            chain_id: *origin,
592            height: bundle.height,
593        };
594
595        match self.initialize_if_needed(local_time).await {
596            Ok(_) => (),
597            // if the only issue was that we couldn't initialize the chain because of a
598            // missing chain description blob, we might still want to update the inbox
599            Err(ChainError::ExecutionError(exec_err, _))
600                if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
601                if blobs.iter().all(|blob_id| {
602                    blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
603                })) => {}
604            err => {
605                return err;
606            }
607        }
608
609        // Process the inbox bundle and update the inbox state.
610        let newly_added = inbox
611            .add_bundle(bundle)
612            .await
613            .map_err(|error| match error {
614                InboxError::ViewError(error) => ChainError::ViewError(error),
615                error => ChainError::CorruptedChainState(format!(
616                    "while processing messages in certified block: {error}"
617                )),
618            })?;
619        if newly_added {
620            self.nonempty_inboxes.get_mut().insert(*origin);
621        }
622
623        // Remember the certificate for future validator/client synchronizations.
624        if add_to_received_log {
625            self.received_log.push(chain_and_height);
626        }
627        Ok(())
628    }
629
630    /// Updates the `received_log` trackers.
631    pub fn update_received_certificate_trackers(
632        &mut self,
633        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
634    ) {
635        for (name, tracker) in new_trackers {
636            self.received_certificate_trackers
637                .get_mut()
638                .entry(name)
639                .and_modify(|t| {
640                    // Because several synchronizations could happen in parallel, we need to make
641                    // sure to never go backward.
642                    if tracker > *t {
643                        *t = tracker;
644                    }
645                })
646                .or_insert(tracker);
647        }
648    }
649
650    pub async fn current_committee(&self) -> Result<(Epoch, Arc<Committee>), ChainError> {
651        let chain_id = self.chain_id();
652        self.execution_state
653            .system
654            .current_committee()
655            .await
656            .with_execution_context(ChainExecutionContext::Block)?
657            .ok_or(ChainError::InactiveChain(chain_id))
658    }
659
660    pub async fn ownership(&self) -> Result<&ChainOwnership, ChainError> {
661        Ok(self.execution_state.system.ownership.get().await?)
662    }
663
664    /// Removes the incoming message bundles in the block from the inboxes.
665    ///
666    /// If `must_be_present` is `true`, an error is returned if any of the bundles have not been
667    /// added to the inbox yet. So this should be `true` if the bundles are in a block _proposal_,
668    /// and `false` if the block is already confirmed.
669    #[instrument(skip_all, fields(
670        chain_id = %self.chain_id(),
671    ))]
672    pub async fn remove_bundles_from_inboxes(
673        &mut self,
674        timestamp: Timestamp,
675        must_be_present: bool,
676        incoming_bundles: impl IntoIterator<Item = &IncomingBundle>,
677    ) -> Result<(), ChainError> {
678        let chain_id = self.chain_id();
679        let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
680        for IncomingBundle { bundle, origin, .. } in incoming_bundles {
681            ensure!(
682                bundle.timestamp <= timestamp,
683                ChainError::IncorrectBundleTimestamp {
684                    chain_id,
685                    bundle_timestamp: bundle.timestamp,
686                    block_timestamp: timestamp,
687                }
688            );
689            let bundles = bundles_by_origin.entry(*origin).or_default();
690            bundles.push(bundle);
691        }
692        let origins = bundles_by_origin.keys().copied().collect::<Vec<_>>();
693        let inboxes = self.inboxes.try_load_entries_mut(&origins).await?;
694        for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
695            tracing::trace!(
696                "Removing [{}] from inbox for {origin}",
697                bundles
698                    .iter()
699                    .map(|bundle| bundle.height.to_string())
700                    .collect::<Vec<_>>()
701                    .join(", ")
702            );
703            for bundle in bundles {
704                // Mark the message as processed in the inbox.
705                let was_present = inbox
706                    .remove_bundle(bundle)
707                    .await
708                    .map_err(|error| (chain_id, origin, error))?;
709                if must_be_present {
710                    ensure!(
711                        was_present,
712                        ChainError::MissingCrossChainUpdate {
713                            chain_id,
714                            origin,
715                            height: bundle.height,
716                        }
717                    );
718                }
719            }
720            inbox.observe_size_metric();
721            if inbox.added_bundles.count() == 0 {
722                self.nonempty_inboxes.get_mut().remove(&origin);
723            }
724        }
725        Ok(())
726    }
727
728    /// Returns the chain IDs of all recipients for which a message is waiting in the outbox.
729    pub fn nonempty_outbox_chain_ids(&self) -> Vec<ChainId> {
730        self.nonempty_outboxes.get().iter().copied().collect()
731    }
732
733    /// Returns the outboxes for the given targets, or an error if any of them are missing.
734    pub async fn load_outboxes(
735        &self,
736        targets: &[ChainId],
737    ) -> Result<Vec<ReadGuardedView<OutboxStateView<C>>>, ChainError> {
738        let vec_of_options = self.outboxes.try_load_entries(targets).await?;
739        let optional_vec = vec_of_options.into_iter().collect::<Option<Vec<_>>>();
740        optional_vec.ok_or_else(|| ChainError::CorruptedChainState("Missing outboxes".into()))
741    }
742
743    /// Reconciles the `nonempty_outboxes` and `outbox_counters` indices from the retained outbox
744    /// queues, rebuilding only when the tracked set changed since the last call (i.e. the stored
745    /// [`Self::outbox_index_tracked_hash`] no longer matches). The per-target outbox *queues* in
746    /// `outboxes` are always kept; only these indices are filtered. Returns whether a rebuild
747    /// actually happened, so a read-only caller can skip persisting when nothing changed.
748    pub async fn reconcile_outbox_index(
749        &mut self,
750        tracked: Option<&Hashed<ChainIdSet>>,
751    ) -> Result<bool, ChainError> {
752        if *self.outbox_index_tracked_hash.get() == tracked.map(|tracked| tracked.hash()) {
753            return Ok(false);
754        }
755        self.rebuild_outbox_index(tracked).await?;
756        Ok(true)
757    }
758
759    /// Rebuilds `nonempty_outboxes` and `outbox_counters` from the retained outbox queues for the
760    /// tracked set (every retained queue on a validator, `tracked == None`), and stamps
761    /// [`Self::outbox_index_tracked_hash`]. Unlike [`Self::reconcile_outbox_index`] this always
762    /// rebuilds, so callers that have just rewritten the queues can refresh the indices regardless
763    /// of the stored stamp.
764    async fn rebuild_outbox_index(
765        &mut self,
766        tracked: Option<&Hashed<ChainIdSet>>,
767    ) -> Result<(), ChainError> {
768        self.nonempty_outboxes.get_mut().clear();
769        self.outbox_counters.get_mut().clear();
770        // In full mode (`None`) there is no tracked subset to iterate, so re-index from the keys of
771        // every retained outbox queue.
772        let targets = match tracked {
773            Some(tracked) => tracked.inner().iter().copied().collect::<Vec<_>>(),
774            None => self.outboxes.indices().await?,
775        };
776        for target in &targets {
777            let heights = {
778                let Some(outbox) = self.outboxes.try_load_entry(target).await? else {
779                    continue;
780                };
781                outbox.queue.elements().await?
782            };
783            if heights.is_empty() {
784                continue;
785            }
786            for height in heights {
787                *self.outbox_counters.get_mut().entry(height).or_default() += 1;
788            }
789            self.nonempty_outboxes.get_mut().insert(*target);
790        }
791        self.outbox_index_tracked_hash
792            .set(tracked.map(|tracked| tracked.hash()));
793        Ok(())
794    }
795
796    /// Returns whether the outbox index is already reconciled to `tracked` (the stored hash
797    /// matches), so the read-only network-actions path can read it without a write-lock rebuild.
798    pub fn outbox_index_is_reconciled(&self, tracked: Option<&Hashed<ChainIdSet>>) -> bool {
799        *self.outbox_index_tracked_hash.get() == tracked.map(|tracked| tracked.hash())
800    }
801
802    /// Executes a block with a specified policy for handling bundle failures.
    ///
    /// Executes the transactions of `block` in order against `chain`, then records the
    /// previous-block links for message recipients and event streams and computes the
    /// resulting state hash.
    ///
    /// `block` may be modified: when the failure policy is not `BundleFailurePolicy::Abort`,
    /// message bundles and stream updates can be removed from it, and a failing bundle's action
    /// can be changed to `MessageAction::Reject` before the bundle is executed again.
    ///
    /// Returns the execution outcome, the resource tracker, and the origins whose bundles were
    /// discarded because they could not be rejected (protected or never-reject).
    ///
    /// # Errors
    ///
    /// Returns `ChainError::InactiveChain` if `current_committee` yields `None`,
    /// `ChainError::EmptyBlock` if no transactions remain after discarding, and any error
    /// from transaction execution that the failure policy does not handle.
    ///
    /// # Panics
    ///
    /// Panics if `replaying_oracle_responses` is `Some` while the failure policy is not
    /// `BundleFailurePolicy::Abort`.
803    #[expect(clippy::too_many_arguments)]
804    #[instrument(skip_all, fields(
805        chain_id = %block.chain_id,
806        block_height = %block.height
807    ))]
808    async fn execute_block_inner(
809        chain: &mut ExecutionStateView<C>,
810        block_hashes: &CustomMapView<C, BlockHeight, CryptoHash>,
811        block: &mut ProposedBlock,
812        local_time: Timestamp,
813        round: Option<u32>,
814        published_blobs: &[Blob],
815        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
816        exec_policy: BundleExecutionPolicy,
817        checkpoint_origin_cursors: Vec<(ChainId, Cursor)>,
818        checkpoint_inbox_cursors: Vec<(ChainId, Cursor)>,
819        checkpoint_outbox_block_hashes: Vec<CryptoHash>,
820    ) -> Result<(BlockExecutionOutcome, ResourceTracker, HashSet<ChainId>), ChainError> {
821        // AutoRetry is incompatible with replaying oracle responses because discarding or
822        // rejecting bundles would change which transactions execute.
823        if !matches!(&exec_policy.on_failure, BundleFailurePolicy::Abort) {
824            assert!(
825                replaying_oracle_responses.is_none(),
826                "Cannot use AutoRetry policy when replaying oracle responses"
827            );
828        }
829
830        #[cfg(with_metrics)]
831        let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency_us();
832
833        // Resolve the current epoch's resource policy first: `prepare_checkpoint` needs
834        // `maximum_blob_size` to chunk the dump, and `current_committee` is a pure read
835        // so it can run before the dump without tainting the inner view's pending-changes
836        // set.
837        let committee_policy = chain
838            .system
839            .current_committee()
840            .await
841            .with_execution_context(ChainExecutionContext::Block)?
842            .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
843            .1
844            .policy()
845            .clone();
846
847        // Pre-block hook: if this block contains a `SystemOperation::Checkpoint`, dump the
848        // execution state from storage *now*, before any block-level mutation taints the
849        // inner view's pending-changes set. The matching operation handler will publish
850        // the resulting blobs without re-dumping; subsequent fees and other state changes
851        // accumulate normally and end up persisted with the override hash on save. The
852        // inbox snapshot was already taken by the outer `execute_block` and passed in as
853        // `checkpoint_origin_cursors`; we bundle the two halves into a
854        // [`PreparedCheckpoint`] for the handler.
855        let prepared_checkpoint = if block.starts_with_checkpoint() {
856            let blobs = chain
857                .prepare_checkpoint(committee_policy.maximum_blob_size)
858                .await
859                .with_execution_context(ChainExecutionContext::Block)?;
860            Some(PreparedCheckpoint {
861                blobs,
862                origin_cursors: checkpoint_origin_cursors,
863                inbox_cursors: checkpoint_inbox_cursors,
864                outbox_block_hashes: checkpoint_outbox_block_hashes,
865            })
866        } else {
867            None
868        };
869
870        chain.system.timestamp.set(block.timestamp);
871
872        let mut resource_controller = ResourceController::new(
873            Arc::new(committee_policy),
874            ResourceTracker::default(),
875            block.authenticated_owner,
876        );
877
878        for blob in published_blobs {
879            let blob_id = blob.id();
880            resource_controller
881                .policy()
882                .check_blob_size(blob.content())
883                .with_execution_context(ChainExecutionContext::Block)?;
884            chain.system.used_blobs.insert(&blob_id)?;
885        }
886
887        let mut block_execution_tracker = BlockExecutionTracker::new(
888            &mut resource_controller,
889            published_blobs
890                .iter()
891                .map(|blob| (blob.id(), blob))
892                .collect(),
893            local_time,
894            replaying_oracle_responses,
895            block,
896        )?;
897        if let Some(prepared) = prepared_checkpoint {
898            block_execution_tracker.set_prepared_checkpoint(prepared);
899        }
900
901        // Extract failure-policy parameters from exec_policy.
902        let (max_failures, never_reject_application_ids) = match &exec_policy.on_failure {
903            BundleFailurePolicy::Abort => (0, Arc::new(HashSet::new())),
904            BundleFailurePolicy::AutoRetry {
905                max_failures,
906                never_reject_application_ids,
907            } => (*max_failures, never_reject_application_ids.clone()),
908        };
909        let auto_retry = !matches!(exec_policy.on_failure, BundleFailurePolicy::Abort);
910        let mut failure_count = 0u32;
911        let mut never_reject_discarded_origins = HashSet::new();
912
913        let time_budget = exec_policy.time_budget;
914        let mut cumulative_bundle_time = Duration::ZERO;
915
916        let mut i = 0;
917        while i < block.transactions.len() {
918            let transaction = &mut block.transactions[i];
919            let is_bundle = matches!(transaction, Transaction::ReceiveMessages(_));
920            let is_stream_update = transaction.is_update_stream();
921
922            // If we have a time budget and it's been exceeded, discard remaining bundles.
923            if is_bundle && time_budget.is_some_and(|budget| cumulative_bundle_time >= budget) {
924                info!(
925                    ?cumulative_bundle_time,
926                    ?time_budget,
927                    "Time budget for bundle staging exceeded, discarding remaining bundles"
928                );
929                Self::discard_remaining_bundles(block, i, None);
930                continue;
931            }
932
            // In auto-retry mode, snapshot the state before each bundle or stream update so
            // that a failure can be rolled back by the error-handling arms below.
933            let checkpoint = if auto_retry && (is_bundle || is_stream_update) {
934                Some((
935                    chain.clone_unchecked()?,
936                    block_execution_tracker.create_checkpoint(),
937                ))
938            } else {
939                None
940            };
941
942            let bundle_start = if is_bundle && time_budget.is_some() {
943                Some(Instant::now())
944            } else {
945                None
946            };
947
948            let result = block_execution_tracker
949                .execute_transaction(&*transaction, round, chain)
950                .await;
951
952            if let Some(start) = bundle_start {
953                cumulative_bundle_time += start.elapsed();
954            }
955
956            // If the transaction executed successfully, we move on to the next one.
957            // On transient errors (e.g. missing blobs) we fail, so it can be retried after
958            // syncing. In auto-retry mode, we can discard or reject message bundles that failed
959            // with non-transient errors.
960            match (result, transaction, checkpoint) {
961                (Ok(()), _, _) => {
962                    i += 1;
963                }
964                (
965                    Err(ChainError::ExecutionError(error, _context)),
966                    Transaction::ReceiveMessages(incoming_bundle),
967                    Some((saved_chain, saved_tracker)),
968                ) if !error.is_transient_error() && error.is_limit_error() && i > 0 => {
969                    // Restore checkpoint.
970                    *chain = saved_chain;
971                    block_execution_tracker.restore_checkpoint(&saved_tracker);
972                    failure_count += 1;
973                    // If we've exceeded max failures, discard all remaining message bundles.
974                    let maybe_sender = if failure_count > max_failures {
975                        info!(
976                            failure_count,
977                            max_failures,
978                            "Exceeded max bundle failures, discarding all remaining message \
979                            bundles and stream updates"
980                        );
981                        Self::discard_remaining_stream_updates(block, i);
982                        None
983                    } else {
984                        // Not the first - discard it and same-sender subsequent bundles.
985                        info!(
986                            %error,
987                            index = i,
988                            origin = %incoming_bundle.origin,
989                            "Message bundle exceeded block limits and will be discarded for \
990                            retry in a later block"
991                        );
992                        Some(incoming_bundle.origin)
993                    };
994                    Self::discard_remaining_bundles(block, i, maybe_sender);
995                    // Do not increment i - the next transaction is now at i.
996                }
997                (
998                    Err(ChainError::ExecutionError(error, context)),
999                    Transaction::ReceiveMessages(incoming_bundle),
1000                    Some((saved_chain, saved_tracker)),
1001                ) if !error.is_transient_error() => {
1002                    // Restore checkpoint.
1003                    *chain = saved_chain;
1004                    block_execution_tracker.restore_checkpoint(&saved_tracker);
1005
1006                    let all_messages_never_reject = !never_reject_application_ids.is_empty()
1007                        && incoming_bundle.messages().all(|posted_msg| {
1008                            never_reject_application_ids
1009                                .contains(&posted_msg.message.application_id())
1010                        });
1011                    if (all_messages_never_reject || incoming_bundle.bundle.is_protected())
1012                        && incoming_bundle.action != MessageAction::Reject
1013                    {
1014                        let origin = incoming_bundle.origin;
1015                        never_reject_discarded_origins.insert(origin);
1016                        warn!(
1017                            %error,
1018                            index = i,
1019                            %origin,
1020                            "Message bundle cannot be rejected (protected or never-reject); \
1021                            discarding the bundle (and same-sender subsequent bundles) for retry \
1022                            in a later block"
1023                        );
1024                        Self::discard_remaining_bundles(block, i, Some(origin));
1025                    } else if incoming_bundle.action == MessageAction::Reject {
1026                        // Failed rejected bundles fail the block.
1027                        return Err(ChainError::ExecutionError(error, context));
1028                    } else {
1029                        // Reject the bundle: either a non-limit error, or the first bundle
1030                        // exceeded limits (and is inherently too large for any block).
1031                        info!(
1032                            %error,
1033                            index = i,
1034                            origin = %incoming_bundle.origin,
1035                            "Message bundle failed to execute and will be rejected"
1036                        );
1037                        incoming_bundle.action = MessageAction::Reject;
1038                    }
1039                    // Do not increment i - retry the transaction after modification.
1040                }
1041                (
1042                    Err(ChainError::ExecutionError(error, _context)),
1043                    transaction,
1044                    Some((saved_chain, saved_tracker)),
1045                ) if transaction.is_update_stream()
1046                    && !error.is_transient_error()
1047                    && error.is_limit_error()
1048                    && i > 0 =>
1049                {
1050                    // Restore checkpoint.
1051                    *chain = saved_chain;
1052                    block_execution_tracker.restore_checkpoint(&saved_tracker);
1053                    failure_count += 1;
1054                    if failure_count > max_failures {
1055                        info!(
1056                            failure_count,
1057                            max_failures,
1058                            "Exceeded max failures, discarding all remaining stream updates and \
1059                            message bundles"
1060                        );
1061                        Self::discard_remaining_bundles(block, i, None);
1062                        Self::discard_remaining_stream_updates(block, i);
1063                    } else {
1064                        info!(
1065                            %error,
1066                            index = i,
1067                            "UpdateStream exceeded block limits, discarding for retry"
1068                        );
1069                        block.transactions.remove(i);
1070                    }
1071                    // Do not increment i - the next transaction is now at i.
1072                }
1073                (Err(e), _, _) => return Err(e),
1074            };
1075        }
1076
1077        // This can only happen if every transaction was a message bundle or stream update and all
1078        // of them were discarded above. This is unlikely in practice but theoretically possible.
1079        ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
1080
1081        let recipients = block_execution_tracker.recipients();
1082        let non_ack_tx_indices = block_execution_tracker.non_checkpoint_ack_tx_indices();
1083        let mut recipient_heights = Vec::new();
1084        for (recipient, height) in chain
1085            .previous_message_blocks
1086            .multi_get_pairs(recipients)
1087            .await?
1088        {
1089            // Only `CheckpointAck`-only blocks are excluded from the chain-level
1090            // tracking. Otherwise the recipient never acknowledges (a
1091            // `CheckpointAck` doesn't trigger a return `CheckpointAck`), so the
1092            // entry would never get trimmed. Off-chain outbox bookkeeping further
1093            // down still queues these for delivery; only the
1094            // `previous_message_blocks` / `unfinalized_message_blocks` chain skips
1095            // them.
1096            if let Some(tx_indices) = non_ack_tx_indices.get(&recipient) {
1097                chain
1098                    .previous_message_blocks
1099                    .insert(&recipient, block.height)?;
1100                // Track each non-CheckpointAck bundle's cursor as pending
1101                // acknowledgement from the recipient. We don't know this block's
1102                // hash yet (we're mid-execution); the checkpoint pre-block hook
1103                // resolves heights to hashes via `block_hashes` when it builds the
1104                // oracle response.
1105                let mut cursors = chain
1106                    .system
1107                    .unfinalized_message_blocks
1108                    .get(&recipient)
1109                    .await?
1110                    .unwrap_or_default();
1111                for index in tx_indices {
1112                    cursors.insert(Cursor {
1113                        height: block.height,
1114                        index: *index,
1115                    });
1116                }
1117                chain
1118                    .system
1119                    .unfinalized_message_blocks
1120                    .insert(&recipient, cursors)?;
1121            }
1122            if let Some(height) = height {
1123                recipient_heights.push((recipient, height));
1124            }
1125        }
1126        let hashes = block_hashes
1127            .multi_get(recipient_heights.iter().map(|(_, height)| height))
1128            .await?;
1129        let mut previous_message_blocks = BTreeMap::new();
1130        for (hash, (recipient, height)) in hashes.into_iter().zip(recipient_heights) {
1131            let hash = hash.ok_or_else(|| {
1132                ChainError::CorruptedChainState("missing entry in block_hashes".into())
1133            })?;
1134            previous_message_blocks.insert(recipient, (hash, height));
1135        }
1136
1137        let streams = block_execution_tracker.event_streams();
1138        let mut stream_heights = Vec::new();
1139        for (stream, height) in chain.previous_event_blocks.multi_get_pairs(streams).await? {
1140            chain.previous_event_blocks.insert(&stream, block.height)?;
1141            if let Some(height) = height {
1142                stream_heights.push((stream, height));
1143            }
1144        }
1145        let hashes = block_hashes
1146            .multi_get(stream_heights.iter().map(|(_, height)| height))
1147            .await?;
1148        let mut previous_event_blocks = BTreeMap::new();
1149        for (hash, (stream, height)) in hashes.into_iter().zip(stream_heights) {
1150            let hash = hash.ok_or_else(|| {
1151                ChainError::CorruptedChainState("missing entry in block_hashes".into())
1152            })?;
1153            previous_event_blocks.insert(stream, (hash, height));
1154        }
1155
1156        let state_hash = {
1157            #[cfg(with_metrics)]
1158            let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency_us();
1159            chain.crypto_hash_mut().await?
1160        };
1161
1162        let (messages, oracle_responses, events, blobs, operation_results, resource_tracker) =
1163            block_execution_tracker.finalize(block.transactions.len());
1164
1165        Ok((
1166            BlockExecutionOutcome {
1167                messages,
1168                previous_message_blocks,
1169                previous_event_blocks,
1170                state_hash,
1171                oracle_responses,
1172                events,
1173                blobs,
1174                operation_results,
1175            },
1176            resource_tracker,
1177            never_reject_discarded_origins,
1178        ))
1179    }
1180
1181    fn discard_remaining_stream_updates(block: &mut ProposedBlock, mut index: usize) {
1182        while index < block.transactions.len() {
1183            if block.transactions[index].is_update_stream() {
1184                block.transactions.remove(index);
1185            } else {
1186                index += 1;
1187            }
1188        }
1189    }
1190
1191    fn discard_remaining_bundles(
1192        block: &mut ProposedBlock,
1193        mut index: usize,
1194        maybe_origin: Option<ChainId>,
1195    ) {
1196        while index < block.transactions.len() {
1197            if matches!(
1198                &block.transactions[index],
1199                Transaction::ReceiveMessages(bundle)
1200                if maybe_origin.is_none_or(|origin| bundle.origin == origin)
1201            ) {
1202                block.transactions.remove(index);
1203            } else {
1204                index += 1;
1205            }
1206        }
1207    }
1208
1209    /// Executes a block with a specified policy for handling bundle failures.
1210    ///
1211    /// This method supports automatic retry with checkpointing when bundles fail:
1212    /// - For limit errors (block too large, fuel exceeded, etc.): the bundle is discarded
1213    ///   so it can be retried in a later block, unless it's the first transaction
1214    ///   (which gets rejected as inherently too large).
1215    /// - For non-limit errors: the bundle is rejected (triggering bounced messages).
1216    /// - After `max_failures` failed bundles, all remaining message bundles are discarded.
1217    ///
1218    /// The block may be modified to reflect the actual executed transactions.
1219    #[instrument(skip_all, fields(
1220        chain_id = %self.chain_id(),
1221        block_height = %block.height
1222    ))]
1223    pub async fn execute_block(
1224        &mut self,
1225        mut block: ProposedBlock,
1226        local_time: Timestamp,
1227        round: Option<u32>,
1228        published_blobs: &[Blob],
1229        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
1230        policy: BundleExecutionPolicy,
1231    ) -> Result<
1232        (
1233            ProposedBlock,
1234            BlockExecutionOutcome,
1235            ResourceTracker,
1236            HashSet<ChainId>,
1237        ),
1238        ChainError,
1239    > {
1240        assert_eq!(
1241            block.chain_id,
1242            self.execution_state.context().extra().chain_id()
1243        );
1244
1245        self.initialize_if_needed(local_time).await?;
1246
1247        let chain_timestamp = *self.execution_state.system.timestamp.get();
1248        ensure!(
1249            chain_timestamp <= block.timestamp,
1250            ChainError::InvalidBlockTimestamp {
1251                parent: chain_timestamp,
1252                new: block.timestamp
1253            }
1254        );
1255        ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
1256
1257        ensure!(
1258            block.published_blob_ids()
1259                == published_blobs
1260                    .iter()
1261                    .map(|blob| blob.id())
1262                    .collect::<BTreeSet<_>>(),
1263            ChainError::InternalError("published_blobs mismatch".to_string())
1264        );
1265
1266        if *self.execution_state.system.closed.get() {
1267            ensure!(block.has_only_rejected_messages(), ChainError::ClosedChain);
1268        }
1269
1270        Self::check_app_permissions(
1271            self.execution_state
1272                .system
1273                .application_permissions
1274                .get()
1275                .await?,
1276            &block,
1277        )?;
1278
1279        ensure!(
1280            !block
1281                .transactions
1282                .iter()
1283                .skip(1)
1284                .any(Transaction::is_checkpoint),
1285            ChainError::CheckpointPreconditionFailed(
1286                "Checkpoint must be the first transaction in its block",
1287            )
1288        );
1289        let (origin_cursors, inbox_cursors, outbox_block_hashes) = if block.starts_with_checkpoint()
1290        {
1291            self.check_checkpoint_preconditions().await?;
1292            let origin_cursors = self.collect_inbox_cursors().await?;
1293            let inbox_cursors = self.collect_all_inbox_cursors().await?;
1294            let hashes = self.collect_unfinalized_block_hashes().await?;
1295            (origin_cursors, inbox_cursors, hashes)
1296        } else {
1297            (Vec::new(), Vec::new(), Vec::new())
1298        };
1299
1300        Self::execute_block_inner(
1301            &mut self.execution_state,
1302            &self.block_hashes,
1303            &mut block,
1304            local_time,
1305            round,
1306            published_blobs,
1307            replaying_oracle_responses,
1308            policy,
1309            origin_cursors,
1310            inbox_cursors,
1311            outbox_block_hashes,
1312        )
1313        .await
1314        .map(|(outcome, tracker, never_reject_origins)| {
1315            (block, outcome, tracker, never_reject_origins)
1316        })
1317    }
1318
1319    /// Snapshots `(origin, next_cursor_to_remove)` for each chain we've received a
1320    /// non-`Checkpoint` message from since our last own checkpoint. Used by the
1321    /// pre-block hook so the matching operation handler can emit a
1322    /// [`SystemMessage::CheckpointAck`] to each origin chain. Iterating
1323    /// `pending_checkpoint_ack_targets` (instead of every inbox) is what prevents the
1324    /// notification ping-pong: a chain that only ever sent us `Checkpoint`s is
1325    /// excluded here, so we never reply with a `Checkpoint` of our own.
1326    async fn collect_inbox_cursors(&self) -> Result<Vec<(ChainId, Cursor)>, ChainError> {
1327        let targets = self
1328            .execution_state
1329            .system
1330            .pending_checkpoint_ack_targets
1331            .indices()
1332            .await?;
1333        let mut cursors = Vec::with_capacity(targets.len());
1334        for origin in targets {
1335            let Some(inbox) = self.inboxes.try_load_entry(&origin).await? else {
1336                continue;
1337            };
1338            cursors.push((origin, *inbox.next_cursor_to_remove.get()));
1339        }
1340        Ok(cursors)
1341    }
1342
1343    /// Snapshots `(origin, next_cursor_to_remove)` for every inbox with a non-default
1344    /// `next_cursor_to_remove`. Recorded in the checkpoint's oracle response so a
1345    /// bootstrapping node can seed each inbox's `restored_cursor` and silently drop
1346    /// any sender re-pushes whose effects are already in the restored execution state.
1347    async fn collect_all_inbox_cursors(&self) -> Result<Vec<(ChainId, Cursor)>, ChainError> {
1348        let origins = self.inboxes.indices().await?;
1349        let mut cursors = Vec::new();
1350        for origin in origins {
1351            let Some(inbox) = self.inboxes.try_load_entry(&origin).await? else {
1352                continue;
1353            };
1354            let cursor = *inbox.next_cursor_to_remove.get();
1355            if cursor != Cursor::default() {
1356                cursors.push((origin, cursor));
1357            }
1358        }
1359        Ok(cursors)
1360    }
1361
1362    /// Re-populates `outboxes`, `outbox_counters`, and `nonempty_outboxes` from
1363    /// the on-chain `unfinalized_message_blocks` map after a checkpoint
1364    /// bootstrap. Called once after `execution_state.restore_from_content` so
1365    /// the freshly-restored chain can pick up cross-chain delivery for
1366    /// pre-checkpoint messages — the off-chain outbox state isn't part of the
1367    /// certified checkpoint blob, so without this a bootstrapped node would
1368    /// silently stop pushing pending messages forward.
1369    pub async fn restore_outboxes_from_unfinalized(
1370        &mut self,
1371        tracked: Option<&Hashed<ChainIdSet>>,
1372    ) -> Result<(), ChainError> {
1373        // A lagging validator hit by a checkpoint push may already have outbox
1374        // queues from blocks it processed before the gap. Clear them so the
1375        // rebuild from `unfinalized_message_blocks` doesn't append duplicate
1376        // heights onto stale queues (the counters/nonempty set below `set`
1377        // wholesale, but the queues are appended to per recipient).
1378        let prior_recipients = self.outboxes.indices().await?;
1379        for recipient in prior_recipients {
1380            let mut outbox = self.outboxes.try_load_entry_mut(&recipient).await?;
1381            outbox.queue.clear();
1382            outbox.next_height_to_schedule.set(BlockHeight::ZERO);
1383        }
1384        let entries = self
1385            .execution_state
1386            .system
1387            .unfinalized_message_blocks
1388            .index_values()
1389            .await?;
1390        for (recipient, cursors) in entries {
1391            if cursors.is_empty() {
1392                continue;
1393            }
1394            // Dedup by height: the outbox queue tracks block heights only (multiple
1395            // bundles at the same height share a single queue entry).
1396            let heights = cursors
1397                .into_iter()
1398                .map(|cursor| cursor.height)
1399                .collect::<BTreeSet<_>>();
1400            let mut outbox = self.outboxes.try_load_entry_mut(&recipient).await?;
1401            for height in &heights {
1402                outbox.queue.push_back(*height);
1403            }
1404            let max_height = *heights
1405                .last()
1406                .expect("the empty case was filtered out above");
1407            outbox
1408                .next_height_to_schedule
1409                .set(max_height.try_add_one()?);
1410        }
1411        // The queues are now authoritative; rebuild the tracked-only indices from them.
1412        self.rebuild_outbox_index(tracked).await?;
1413        Ok(())
1414    }
1415
1416    /// Collects the hashes of every block on this chain still listed in the on-chain
1417    /// `unfinalized_message_blocks` map. The checkpoint pre-block hook calls this to
1418    /// build the oracle response's `outbox_block_hashes`, so the checkpoint
1419    /// certificate transitively re-certifies those older (possibly revoked-epoch)
1420    /// blocks.
1421    async fn collect_unfinalized_block_hashes(&self) -> Result<Vec<CryptoHash>, ChainError> {
1422        let heights = self.collect_unfinalized_heights().await?;
1423        let mut hashes = Vec::with_capacity(heights.len());
1424        for height in heights {
1425            let hash = self.block_hashes.get(&height).await?.ok_or_else(|| {
1426                ChainError::CorruptedChainState(format!(
1427                    "missing entry in block_hashes at height {height}"
1428                ))
1429            })?;
1430            hashes.push(hash);
1431        }
1432        Ok(hashes)
1433    }
1434
1435    /// Returns the sorted, deduplicated set of block heights referenced by the on-chain
1436    /// `unfinalized_message_blocks` map (one entry per height even if multiple bundles
1437    /// at that height are still unfinalized). Used both when building the checkpoint
1438    /// oracle response (to resolve heights to hashes via `block_hashes`) and on the
1439    /// bootstrap path (to zip with the certified `outbox_block_hashes` from the
1440    /// response).
1441    pub async fn collect_unfinalized_heights(&self) -> Result<BTreeSet<BlockHeight>, ChainError> {
1442        let mut heights = BTreeSet::new();
1443        let entries = self
1444            .execution_state
1445            .system
1446            .unfinalized_message_blocks
1447            .index_values()
1448            .await?;
1449        for (_, per_recipient) in entries {
1450            heights.extend(per_recipient.into_iter().map(|cursor| cursor.height));
1451        }
1452        Ok(heights)
1453    }
1454
1455    /// Applies an execution outcome to the chain, updating the outboxes, state hash and chain
1456    /// manager. This does not touch the execution state itself, which must be updated separately.
1457    /// Returns the set of event streams that were updated as a result of applying the block.
1458    #[instrument(skip_all, fields(
1459        chain_id = %self.chain_id(),
1460        block_height = %block.inner().inner().header.height
1461    ))]
1462    pub async fn apply_confirmed_block(
1463        &mut self,
1464        block: &ConfirmedBlock,
1465        local_time: Timestamp,
1466        tracked: Option<&ChainIdSet>,
1467    ) -> Result<BTreeSet<StreamId>, ChainError> {
1468        let hash = block.inner().hash();
1469        let block = block.inner().inner();
1470        if block.header.height == BlockHeight::ZERO {
1471            self.block_zero_executed_at.set(local_time);
1472        }
1473        let updated_streams = self.process_emitted_events(block).await?;
1474        self.process_outgoing_messages(block, tracked).await?;
1475
1476        // Last, reset the consensus state based on the current ownership.
1477        self.reset_chain_manager(block.header.height.try_add_one()?, local_time)
1478            .await?;
1479
1480        // Advance to next block height.
1481        let tip = self.tip_state.get_mut();
1482        tip.block_hash = Some(hash);
1483        tip.next_block_height.try_add_assign_one()?;
1484        tip.update_counters(&block.body.transactions, &block.body.messages)?;
1485        self.insert_block_hash(block.header.height, hash)?;
1486        if block.body.starts_with_checkpoint() {
1487            self.latest_checkpoint_height.set(Some(block.header.height));
1488        }
1489        Ok(updated_streams)
1490    }
1491
1492    /// Adds a block to `block_hashes` as preprocessed, and updates the outboxes where possible.
1493    /// Returns the set of streams that were updated as a result of preprocessing the block.
1494    #[instrument(skip_all, fields(
1495        chain_id = %self.chain_id(),
1496        block_height = %block.inner().inner().header.height
1497    ))]
1498    pub async fn preprocess_block(
1499        &mut self,
1500        block: &ConfirmedBlock,
1501        tracked: Option<&ChainIdSet>,
1502    ) -> Result<BTreeSet<StreamId>, ChainError> {
1503        let hash = block.inner().hash();
1504        let block = block.inner().inner();
1505        let height = block.header.height;
1506        if height < self.tip_state.get().next_block_height {
1507            return Ok(BTreeSet::new());
1508        }
1509        self.process_outgoing_messages(block, tracked).await?;
1510        let updated_streams = self.process_emitted_events(block).await?;
1511        self.insert_block_hash(height, hash)?;
1512        Ok(updated_streams)
1513    }
1514
1515    /// Verifies that the block is valid according to the chain's application permission settings.
1516    #[instrument(skip_all, fields(
1517        block_height = %block.height,
1518        num_transactions = %block.transactions.len()
1519    ))]
1520    fn check_app_permissions(
1521        app_permissions: &ApplicationPermissions,
1522        block: &ProposedBlock,
1523    ) -> Result<(), ChainError> {
1524        let mut mandatory = app_permissions
1525            .mandatory_applications
1526            .iter()
1527            .copied()
1528            .collect::<HashSet<ApplicationId>>();
1529        for transaction in &block.transactions {
1530            match transaction {
1531                Transaction::ExecuteOperation(operation)
1532                    if operation.is_exempt_from_permissions() =>
1533                {
1534                    mandatory.clear()
1535                }
1536                Transaction::ExecuteOperation(operation) => {
1537                    ensure!(
1538                        app_permissions.can_execute_operations(&operation.application_id()),
1539                        ChainError::AuthorizedApplications(
1540                            app_permissions.execute_operations.clone().unwrap()
1541                        )
1542                    );
1543                    if let Operation::User { application_id, .. } = operation {
1544                        mandatory.remove(application_id);
1545                    }
1546                }
1547                Transaction::ReceiveMessages(incoming_bundle)
1548                    if incoming_bundle.action == MessageAction::Accept =>
1549                {
1550                    for pending in incoming_bundle.messages() {
1551                        if let Message::User { application_id, .. } = &pending.message {
1552                            mandatory.remove(application_id);
1553                        }
1554                    }
1555                }
1556                Transaction::ReceiveMessages(_) => {}
1557            }
1558        }
1559        ensure!(
1560            mandatory.is_empty(),
1561            ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
1562        );
1563        Ok(())
1564    }
1565
1566    /// Validates the chain-state-level preconditions for a `SystemOperation::Checkpoint`:
1567    /// no event stream tracker is set.
1568    ///
1569    /// The structural invariant that Checkpoint must be the *first* transaction in its
1570    /// block is enforced unconditionally in `execute_block`, independently of these
1571    /// preconditions. Sender-side event conditions (no events ever published) are
1572    /// validated inside `ExecutionStateView::prepare_checkpoint`. Outgoing messages
1573    /// and consumed incoming bundles are no longer preconditions: the on-chain
1574    /// `unfinalized_message_blocks` map and the per-inbox `restored_cursor` (seeded
1575    /// from the checkpoint's oracle response) together carry everything a
1576    /// bootstrapping node needs.
1577    async fn check_checkpoint_preconditions(&self) -> Result<(), ChainError> {
1578        let mut had_event_tracker = false;
1579        self.next_expected_events
1580            .for_each_index_while(|_| {
1581                had_event_tracker = true;
1582                Ok(false)
1583            })
1584            .await?;
1585        ensure!(
1586            !had_event_tracker,
1587            ChainError::CheckpointPreconditionFailed("chain has consumed events")
1588        );
1589
1590        Ok(())
1591    }
1592
1593    /// Returns the hashes of all blocks we have at the given heights, in input order.
1594    /// Unknown heights are skipped.
1595    #[instrument(skip_all, fields(
1596        chain_id = %self.chain_id(),
1597        next_block_height = %self.tip_state.get().next_block_height,
1598    ))]
1599    pub async fn block_hashes_for_heights(
1600        &self,
1601        heights: impl IntoIterator<Item = BlockHeight>,
1602    ) -> Result<Vec<CryptoHash>, ChainError> {
1603        let heights = heights.into_iter().collect::<Vec<_>>();
1604        Ok(self
1605            .block_hashes
1606            .multi_get(&heights)
1607            .await?
1608            .into_iter()
1609            .flatten()
1610            .collect())
1611    }
1612
1613    /// Resets the chain manager for the next block height.
1614    async fn reset_chain_manager(
1615        &mut self,
1616        next_height: BlockHeight,
1617        local_time: Timestamp,
1618    ) -> Result<(), ChainError> {
1619        let maybe_committee = self
1620            .execution_state
1621            .system
1622            .current_committee()
1623            .await
1624            .with_execution_context(ChainExecutionContext::Block)?;
1625        let ownership = self.execution_state.system.ownership.get().await?.clone();
1626        let fallback_owners = maybe_committee
1627            .iter()
1628            .flat_map(|(_, committee)| committee.account_keys_and_weights());
1629        self.pending_validated_blobs.clear();
1630        self.pending_proposed_blobs.clear();
1631        self.manager
1632            .reset(ownership, next_height, local_time, fallback_owners)
1633    }
1634
1635    /// Updates the outboxes with the messages sent in the block.
1636    ///
1637    /// Returns the set of all recipients.
1638    #[instrument(skip_all, fields(
1639        chain_id = %self.chain_id(),
1640        block_height = %block.header.height
1641    ))]
1642    async fn process_outgoing_messages(
1643        &mut self,
1644        block: &Block,
1645        tracked: Option<&ChainIdSet>,
1646    ) -> Result<Vec<ChainId>, ChainError> {
1647        // Record the messages of the execution. Messages are understood within an
1648        // application.
1649        let recipients = block.recipients();
1650        let block_height = block.header.height;
1651        let next_height = self.tip_state.get().next_block_height;
1652
1653        // Update the outboxes. Every recipient's per-target outbox queue is updated, but the
1654        // `nonempty_outboxes` and `outbox_counters` indices are only populated for targets we track.
1655        let targets = recipients.into_iter().collect::<Vec<_>>();
1656        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1657        let mut scheduled_tracked = Vec::new();
1658        for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1659            if block_height > next_height {
1660                // There may be a gap in the chain before this block. We can only add it to this
1661                // outbox if the previous message to the same recipient has already been added.
1662                if *outbox.next_height_to_schedule.get() > block_height {
1663                    continue; // We already added this recipient's messages to the outbox.
1664                }
1665                let maybe_prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok()
1666                {
1667                    Some(height) => {
1668                        Some(self.block_hashes.get(&height).await?.ok_or_else(|| {
1669                            ChainError::CorruptedChainState("missing entry in block_hashes".into())
1670                        })?)
1671                    }
1672                    None => None, // No message to that sender was added yet.
1673                };
1674                // Only schedule if this block contains the next message for that recipient.
1675                match (
1676                    maybe_prev_hash,
1677                    block.body.previous_message_blocks.get(target),
1678                ) {
1679                    (None, None) => {
1680                        // No previous message block expected and none indicated by the outbox -
1681                        // all good
1682                    }
1683                    (Some(_), None) => {
1684                        // The outbox already has a previous height for this recipient,
1685                        // but this block's body recorded no predecessor — that means
1686                        // this is the first non-`CheckpointAck` send to this recipient,
1687                        // even though earlier `CheckpointAck`-only blocks have already
1688                        // been added to the off-chain outbox. We can still schedule:
1689                        // the bundle will carry `previous_height = None`, which the
1690                        // receiver accepts as "first ever".
1691                    }
1692                    (None, Some((_, prev_msg_block_height))) => {
1693                        // We have no previously processed block in the outbox, but we are
1694                        // expecting one - this could be due to an empty outbox having been pruned.
1695                        // Only process the outbox if the height of the previous message block is
1696                        // lower than the tip
1697                        if *prev_msg_block_height >= next_height {
1698                            continue;
1699                        }
1700                    }
1701                    (Some(ref prev_hash), Some((prev_msg_block_hash, _))) => {
1702                        // Only process the outbox if the hashes match. A mismatch can
1703                        // arise legitimately when intermediate `CheckpointAck`-only
1704                        // blocks sit in the off-chain outbox but are skipped from
1705                        // `body.previous_message_blocks`; same fallback as the
1706                        // `(Some, None)` arm above.
1707                        if prev_hash != prev_msg_block_hash {
1708                            continue;
1709                        }
1710                    }
1711                }
1712            }
1713            if outbox.schedule_message(block_height)?
1714                && tracked.is_none_or(|set| set.contains(target))
1715            {
1716                scheduled_tracked.push(*target);
1717            }
1718            #[cfg(with_metrics)]
1719            crate::outbox::metrics::OUTBOX_SIZE
1720                .with_label_values(&[])
1721                .observe(outbox.queue.count() as f64);
1722        }
1723
1724        if !scheduled_tracked.is_empty() {
1725            // All scheduled messages are at `block_height`.
1726            let scheduled_count =
1727                u32::try_from(scheduled_tracked.len()).map_err(|_| ArithmeticError::Overflow)?;
1728            *self
1729                .outbox_counters
1730                .get_mut()
1731                .entry(block_height)
1732                .or_default() += scheduled_count;
1733            let nonempty_outboxes = self.nonempty_outboxes.get_mut();
1734            for target in &scheduled_tracked {
1735                nonempty_outboxes.insert(*target);
1736            }
1737        }
1738
1739        #[cfg(with_metrics)]
1740        metrics::NUM_OUTBOXES
1741            .with_label_values(&[])
1742            .observe(self.nonempty_outboxes.get().len() as f64);
1743        #[cfg(with_metrics)]
1744        metrics::OUTBOX_COUNTERS_SIZE
1745            .with_label_values(&[])
1746            .observe(self.outbox_counters.get().len() as f64);
1747        Ok(targets)
1748    }
1749
1750    /// Updates the event streams with events emitted by the block if they form a contiguous
1751    /// sequence (might not be the case when preprocessing a block).
1752    /// Returns the set of updated event streams.
1753    #[instrument(skip_all, fields(
1754        chain_id = %self.chain_id(),
1755        block_height = %block.header.height
1756    ))]
1757    async fn process_emitted_events(
1758        &mut self,
1759        block: &Block,
1760    ) -> Result<BTreeSet<StreamId>, ChainError> {
1761        let mut emitted_streams = BTreeMap::<StreamId, BTreeSet<u32>>::new();
1762        for event in block.body.events.iter().flatten() {
1763            emitted_streams
1764                .entry(event.stream_id.clone())
1765                .or_default()
1766                .insert(event.index);
1767        }
1768        let mut stream_ids = Vec::new();
1769        let mut list_indices = Vec::new();
1770        for (stream_id, indices) in emitted_streams {
1771            stream_ids.push(stream_id);
1772            list_indices.push(indices);
1773        }
1774
1775        let mut updated_streams = BTreeSet::new();
1776        for ((stream_id, next_index), indices) in self
1777            .next_expected_events
1778            .multi_get_pairs(stream_ids)
1779            .await?
1780            .into_iter()
1781            .zip(list_indices)
1782        {
1783            let initial_index = if stream_id == StreamId::system(EPOCH_STREAM_NAME) {
1784                // we don't expect the epoch stream to contain event 0
1785                1
1786            } else {
1787                0
1788            };
1789            let mut current_expected_index = next_index.unwrap_or(initial_index);
1790            for index in indices {
1791                if index == current_expected_index {
1792                    updated_streams.insert(stream_id.clone());
1793                    current_expected_index = index.saturating_add(1);
1794                }
1795            }
1796            if current_expected_index != 0 {
1797                self.next_expected_events
1798                    .insert(&stream_id, current_expected_index)?;
1799            }
1800        }
1801        Ok(updated_streams)
1802    }
1803}
1804
/// Checks that `EMPTY_BLOCK_SIZE` equals the BCS-serialized size of a block built from
/// the first proposed block of a dummy chain and a default execution outcome.
#[test]
fn empty_block_size() {
    let chain_id = linera_execution::test_utils::dummy_chain_description(0).id();
    let proposal = crate::test::make_first_block(chain_id);
    let outcome = crate::data_types::BlockExecutionOutcome::default();
    let empty_block = crate::block::Block::new(proposal, outcome);
    let size = bcs::serialized_size(&empty_block).unwrap();
    assert_eq!(size, EMPTY_BLOCK_SIZE);
}