linera_chain/
chain.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6    sync::Arc,
7};
8
9use allocative::Allocative;
10use linera_base::{
11    crypto::{CryptoHash, ValidatorPublicKey},
12    data_types::{
13        ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Epoch,
14        OracleResponse, Timestamp,
15    },
16    ensure,
17    identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
18    ownership::ChainOwnership,
19    time::{Duration, Instant},
20};
21use linera_execution::{
22    committee::Committee, system::EPOCH_STREAM_NAME, ExecutionRuntimeContext, ExecutionStateView,
23    Message, Operation, OutgoingMessage, Query, QueryContext, QueryOutcome, ResourceController,
24    ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
25};
26use linera_views::{
27    context::Context,
28    log_view::LogView,
29    map_view::MapView,
30    reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
31    register_view::RegisterView,
32    views::{ClonableView, RootView, View},
33};
34use serde::{Deserialize, Serialize};
35use tracing::{info, instrument};
36
37use crate::{
38    block::{Block, ConfirmedBlock},
39    block_tracker::BlockExecutionTracker,
40    data_types::{
41        BlockExecutionOutcome, BundleExecutionPolicy, BundleFailurePolicy, ChainAndHeight,
42        IncomingBundle, MessageAction, MessageBundle, ProposedBlock, Transaction,
43    },
44    inbox::{InboxError, InboxStateView},
45    manager::ChainManager,
46    outbox::OutboxStateView,
47    pending_blobs::PendingBlobsView,
48    ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
49};
50
51#[cfg(test)]
52#[path = "unit_tests/chain_tests.rs"]
53mod chain_tests;
54
55#[cfg(with_metrics)]
56use linera_base::prometheus_util::MeasureLatency;
57
#[cfg(with_metrics)]
pub(crate) mod metrics {
    //! Prometheus metrics recorded while executing blocks on a chain.
    //!
    //! The whole module is compiled only when `with_metrics` is set, so the items inside do
    //! not need their own `#[cfg(with_metrics)]` attribute.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram_vec, register_int_counter_vec,
    };
    use linera_execution::ResourceTracker;
    use prometheus::{HistogramVec, IntCounterVec};

    /// Counter incremented once per call to [`track_block_metrics`].
    pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
    });

    /// Histogram of the time spent executing a block.
    pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "block_execution_latency",
            "Block execution latency",
            &[],
            exponential_bucket_interval(50.0_f64, 10_000_000.0),
        )
    });

    /// Histogram of the time spent executing a message.
    pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "message_execution_latency",
            "Message execution latency",
            &[],
            exponential_bucket_interval(0.1_f64, 50_000.0),
        )
    });

    /// Histogram of the time spent executing an operation.
    pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "operation_execution_latency",
            "Operation execution latency",
            &[],
            exponential_bucket_interval(0.1_f64, 50_000.0),
        )
    });

    /// Histogram of the Wasm fuel used per block.
    pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "wasm_fuel_used_per_block",
            "Wasm fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 100_000_000.0),
        )
    });

    /// Histogram of the EVM fuel used per block.
    pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "evm_fuel_used_per_block",
            "EVM fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 100_000_000.0),
        )
    });

    /// Histogram of the number of VM read operations per block.
    pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_num_reads_per_block",
            "VM number of reads per block",
            &[],
            exponential_bucket_interval(0.1, 100.0),
        )
    });

    /// Histogram of the number of bytes read by the VM per block.
    pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_read_per_block",
            "VM number of bytes read per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the number of bytes written by the VM per block.
    pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_written_per_block",
            "VM number of bytes written per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the time needed to recompute the state hash, in microseconds.
    pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "state_hash_computation_latency",
            "Time to recompute the state hash, in microseconds",
            &[],
            exponential_bucket_interval(1.0, 2_000_000.0),
        )
    });

    /// Histogram of the number of outboxes.
    pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_outboxes",
            "Number of outboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Tracks block execution metrics in Prometheus.
    ///
    /// Increments [`NUM_BLOCKS_EXECUTED`] and records the fuel, read and write totals found in
    /// `tracker` into the corresponding per-block histograms.
    pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
        NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
        WASM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.wasm_fuel as f64);
        EVM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.evm_fuel as f64);
        VM_NUM_READS_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.read_operations as f64);
        VM_BYTES_READ_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_read as f64);
        VM_BYTES_WRITTEN_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_written as f64);
    }
}
183
/// The BCS-serialized size of an empty [`Block`].
// NOTE(review): presumably expressed in bytes, and presumably has to be kept in sync with the
// serialized layout of `Block` — confirm against the code that consumes this constant.
pub(crate) const EMPTY_BLOCK_SIZE: usize = 94;
186
/// A view accessing the state of a chain.
// NOTE: when `with_graphql` is set, this type derives `async_graphql::SimpleObject`, which
// publishes the `///` comments below as GraphQL schema descriptions. They are therefore left
// untouched here; reviewer notes use plain `//` comments, which are not exported.
#[cfg_attr(
    with_graphql,
    derive(async_graphql::SimpleObject),
    graphql(cache_control(no_cache))
)]
#[derive(Debug, RootView, ClonableView, Allocative)]
#[allocative(bound = "C")]
pub struct ChainStateView<C>
where
    C: Clone + Context + 'static,
{
    /// Execution state, including system and user applications.
    pub execution_state: ExecutionStateView<C>,
    /// Hash of the execution state.
    // Set to `Some(hash)` by `initialize_if_needed` right after the hash is recomputed.
    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,

    /// Block-chaining state.
    pub tip_state: RegisterView<C, ChainTipState>,

    /// Consensus state.
    // Reset by `initialize_if_needed` from the current ownership and committee.
    pub manager: ChainManager<C>,
    // NOTE(review): the two doc lines below describe the same thing twice; merge them when the
    // exported GraphQL schema can be regenerated alongside.
    /// Pending validated block that is still missing blobs.
    /// The incomplete set of blobs for the pending validated block.
    pub pending_validated_blobs: PendingBlobsView<C>,
    /// The incomplete sets of blobs for upcoming proposals.
    pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,

    /// Hashes of all certified blocks for this sender.
    /// This ends with `block_hash` and has length `usize::from(next_block_height)`.
    pub confirmed_log: LogView<C, CryptoHash>,
    /// Sender chain and height of all certified blocks known as a receiver (local ordering).
    // Appended to by `receive_message_bundle_with_inbox` when `add_to_received_log` is set.
    pub received_log: LogView<C, ChainAndHeight>,
    /// The number of `received_log` entries we have synchronized, for each validator.
    // `update_received_certificate_trackers` only ever raises these values.
    pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,

    /// Mailboxes used to receive messages indexed by their origin.
    pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
    /// Mailboxes used to send messages, indexed by their target.
    pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
    /// The indices of next events we expect to see per stream (could be ahead of the last
    /// executed block in sparse chains).
    pub next_expected_events: MapView<C, StreamId, u32>,
    /// Number of outgoing messages in flight for each block height.
    /// We use a `RegisterView` to prioritize speed for small maps.
    // `mark_messages_as_received` decrements the entries and removes those that reach zero;
    // `all_messages_delivered_up_to` relies on zero entries never being left behind.
    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
    /// Outboxes with at least one pending message. This allows us to avoid loading all outboxes.
    // `mark_messages_as_received` removes a target once its outbox queue is empty.
    pub nonempty_outboxes: RegisterView<C, BTreeSet<ChainId>>,

    /// Blocks that have been verified but not executed yet, and that may not be contiguous.
    pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,
    /// Inboxes with at least one pending added bundle. This allows us to avoid loading all inboxes.
    // `receive_message_bundle_with_inbox` inserts an origin when a bundle is newly added;
    // `remove_bundles_from_inboxes` removes it once the inbox has no added bundles left.
    pub nonempty_inboxes: RegisterView<C, BTreeSet<ChainId>>,

    /// The local wall-clock time when block 0 was last executed. Used to prevent
    /// reset-on-incorrect-outcome from looping: if not enough time has elapsed since
    /// the last reset, the error is returned instead.
    pub block_zero_executed_at: RegisterView<C, Timestamp>,
}
246
/// Block-chaining state.
// NOTE: when `with_graphql` is set, the `///` comments of this type are published as GraphQL
// schema descriptions by `async_graphql::SimpleObject`, so they are left untouched here and
// reviewer notes use plain `//` comments.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, Allocative)]
pub struct ChainTipState {
    /// Hash of the latest certified block in this chain, if any.
    // `verify_block_chaining` requires a proposal's `previous_block_hash` to equal this value.
    pub block_hash: Option<CryptoHash>,
    /// Sequence number tracking blocks.
    // This is the height the next proposed block must have (see `verify_block_chaining`).
    pub next_block_height: BlockHeight,
    /// Number of incoming message bundles.
    // Running total: `update_counters` adds each block's bundle count to it.
    pub num_incoming_bundles: u32,
    /// Number of operations.
    // Running total: `update_counters` adds each block's operation count to it.
    pub num_operations: u32,
    /// Number of outgoing messages.
    // Running total: `update_counters` adds each block's outgoing message count to it.
    pub num_outgoing_messages: u32,
}
262
263impl ChainTipState {
264    /// Checks that the proposed block is suitable, i.e. at the expected height and with the
265    /// expected parent.
266    pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
267        ensure!(
268            new_block.height == self.next_block_height,
269            ChainError::UnexpectedBlockHeight {
270                expected_block_height: self.next_block_height,
271                found_block_height: new_block.height
272            }
273        );
274        ensure!(
275            new_block.previous_block_hash == self.block_hash,
276            ChainError::UnexpectedPreviousBlockHash
277        );
278        Ok(())
279    }
280
281    /// Returns `true` if the validated block's height is below the tip height. Returns an error if
282    /// it is higher than the tip.
283    pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
284        ensure!(
285            self.next_block_height >= height,
286            ChainError::MissingEarlierBlocks {
287                current_block_height: self.next_block_height,
288            }
289        );
290        Ok(self.next_block_height > height)
291    }
292
293    /// Checks if the measurement counters would be valid.
294    pub fn update_counters(
295        &mut self,
296        transactions: &[Transaction],
297        messages: &[Vec<OutgoingMessage>],
298    ) -> Result<(), ChainError> {
299        let mut num_incoming_bundles = 0u32;
300        let mut num_operations = 0u32;
301
302        for transaction in transactions {
303            match transaction {
304                Transaction::ReceiveMessages(_) => {
305                    num_incoming_bundles = num_incoming_bundles
306                        .checked_add(1)
307                        .ok_or(ArithmeticError::Overflow)?;
308                }
309                Transaction::ExecuteOperation(_) => {
310                    num_operations = num_operations
311                        .checked_add(1)
312                        .ok_or(ArithmeticError::Overflow)?;
313                }
314            }
315        }
316
317        self.num_incoming_bundles = self
318            .num_incoming_bundles
319            .checked_add(num_incoming_bundles)
320            .ok_or(ArithmeticError::Overflow)?;
321
322        self.num_operations = self
323            .num_operations
324            .checked_add(num_operations)
325            .ok_or(ArithmeticError::Overflow)?;
326
327        let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
328            .map_err(|_| ArithmeticError::Overflow)?;
329        self.num_outgoing_messages = self
330            .num_outgoing_messages
331            .checked_add(num_outgoing_messages)
332            .ok_or(ArithmeticError::Overflow)?;
333
334        Ok(())
335    }
336}
337
338impl<C> ChainStateView<C>
339where
340    C: Context + Clone + 'static,
341    C::Extra: ExecutionRuntimeContext,
342{
343    /// Returns the [`ChainId`] of the chain this [`ChainStateView`] represents.
344    pub fn chain_id(&self) -> ChainId {
345        self.context().extra().chain_id()
346    }
347
348    #[instrument(skip_all, fields(
349        chain_id = %self.chain_id(),
350    ))]
351    pub async fn query_application(
352        &mut self,
353        local_time: Timestamp,
354        query: Query,
355        service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
356    ) -> Result<QueryOutcome, ChainError> {
357        let context = QueryContext {
358            chain_id: self.chain_id(),
359            next_block_height: self.tip_state.get().next_block_height,
360            local_time,
361        };
362        self.execution_state
363            .query_application(context, query, service_runtime_endpoint)
364            .await
365            .with_execution_context(ChainExecutionContext::Query)
366    }
367
368    #[instrument(skip_all, fields(
369        chain_id = %self.chain_id(),
370        application_id = %application_id
371    ))]
372    pub async fn describe_application(
373        &mut self,
374        application_id: ApplicationId,
375    ) -> Result<ApplicationDescription, ChainError> {
376        self.execution_state
377            .system
378            .describe_application(application_id, &mut TransactionTracker::default())
379            .await
380            .with_execution_context(ChainExecutionContext::DescribeApplication)
381    }
382
383    #[instrument(skip_all, fields(
384        chain_id = %self.chain_id(),
385        target = %target,
386        height = %height
387    ))]
388    pub async fn mark_messages_as_received(
389        &mut self,
390        target: &ChainId,
391        height: BlockHeight,
392    ) -> Result<bool, ChainError> {
393        let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
394        let updates = outbox.mark_messages_as_received(height).await?;
395        if updates.is_empty() {
396            return Ok(false);
397        }
398        for update in updates {
399            let counter = self
400                .outbox_counters
401                .get_mut()
402                .get_mut(&update)
403                .ok_or_else(|| {
404                    ChainError::CorruptedChainState("message counter should be present".into())
405                })?;
406            *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
407            if *counter == 0 {
408                // Important for the test in `all_messages_delivered_up_to`.
409                self.outbox_counters.get_mut().remove(&update);
410            }
411        }
412        if outbox.queue.count() == 0 {
413            self.nonempty_outboxes.get_mut().remove(target);
414            // If the outbox is empty and not ahead of the executed blocks, remove it.
415            if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
416                self.outboxes.remove_entry(target)?;
417            }
418        }
419        #[cfg(with_metrics)]
420        metrics::NUM_OUTBOXES
421            .with_label_values(&[])
422            .observe(self.nonempty_outboxes.get().len() as f64);
423        Ok(true)
424    }
425
426    /// Returns true if there are no more outgoing messages in flight up to the given
427    /// block height.
428    pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
429        tracing::debug!(
430            "Messages left in {:.8}'s outbox: {:?}",
431            self.chain_id(),
432            self.outbox_counters.get()
433        );
434        if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
435            key > &height
436        } else {
437            true
438        }
439    }
440
441    /// Invariant for the states of active chains.
442    pub async fn is_active(&self) -> Result<bool, ChainError> {
443        Ok(self.execution_state.system.is_active().await?)
444    }
445
446    /// Initializes the chain if it is not active yet.
447    pub async fn initialize_if_needed(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
448        let chain_id = self.chain_id();
449        // Initialize ourselves.
450        if self
451            .execution_state
452            .system
453            .initialize_chain(chain_id)
454            .await
455            .with_execution_context(ChainExecutionContext::Block)?
456        {
457            // The chain was already initialized.
458            return Ok(());
459        }
460        // Recompute the state hash.
461        let hash = self.execution_state.crypto_hash_mut().await?;
462        self.execution_state_hash.set(Some(hash));
463        let maybe_committee = self
464            .execution_state
465            .system
466            .current_committee()
467            .await
468            .with_execution_context(ChainExecutionContext::Block)?;
469        // Last, reset the consensus state based on the current ownership.
470        self.manager.reset(
471            self.execution_state.system.ownership.get().await?.clone(),
472            BlockHeight(0),
473            local_time,
474            maybe_committee
475                .iter()
476                .flat_map(|(_, committee)| committee.account_keys_and_weights()),
477        )?;
478        Ok(())
479    }
480
481    /// Returns the height of the highest block we have, plus one. Includes preprocessed blocks.
482    ///
483    /// The "+ 1" is so that it can be used in the same places as `next_block_height`.
484    pub async fn next_height_to_preprocess(&self) -> Result<BlockHeight, ChainError> {
485        if let Some(height) = self.preprocessed_blocks.indices().await?.last() {
486            return Ok(height.saturating_add(BlockHeight(1)));
487        }
488        Ok(self.tip_state.get().next_block_height)
489    }
490
491    /// Attempts to process a new `bundle` of messages from the given `origin`. Returns an
492    /// internal error if the bundle doesn't appear to be new, based on the sender's
493    /// height. The value `local_time` is specific to each validator and only used for
494    /// round timeouts.
495    ///
496    /// Returns `true` if incoming `Subscribe` messages created new outbox entries.
497    #[instrument(skip_all, fields(
498        chain_id = %self.chain_id(),
499        origin = %origin,
500        bundle_height = %bundle.height
501    ))]
502    pub async fn receive_message_bundle_with_inbox(
503        &mut self,
504        inbox: &mut InboxStateView<C>,
505        origin: &ChainId,
506        bundle: MessageBundle,
507        local_time: Timestamp,
508        add_to_received_log: bool,
509    ) -> Result<(), ChainError> {
510        assert!(!bundle.messages.is_empty());
511        let chain_id = self.chain_id();
512        tracing::trace!(
513            "Processing new messages from {origin} at height {}",
514            bundle.height,
515        );
516        let chain_and_height = ChainAndHeight {
517            chain_id: *origin,
518            height: bundle.height,
519        };
520
521        match self.initialize_if_needed(local_time).await {
522            Ok(_) => (),
523            // if the only issue was that we couldn't initialize the chain because of a
524            // missing chain description blob, we might still want to update the inbox
525            Err(ChainError::ExecutionError(exec_err, _))
526                if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
527                if blobs.iter().all(|blob_id| {
528                    blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
529                })) => {}
530            err => {
531                return err;
532            }
533        }
534
535        // Process the inbox bundle and update the inbox state.
536        let newly_added = inbox
537            .add_bundle(bundle)
538            .await
539            .map_err(|error| match error {
540                InboxError::ViewError(error) => ChainError::ViewError(error),
541                error => ChainError::CorruptedChainState(format!(
542                    "while processing messages in certified block: {error}"
543                )),
544            })?;
545        if newly_added {
546            self.nonempty_inboxes.get_mut().insert(*origin);
547        }
548
549        // Remember the certificate for future validator/client synchronizations.
550        if add_to_received_log {
551            self.received_log.push(chain_and_height);
552        }
553        Ok(())
554    }
555
556    /// Updates the `received_log` trackers.
557    pub fn update_received_certificate_trackers(
558        &mut self,
559        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
560    ) {
561        for (name, tracker) in new_trackers {
562            self.received_certificate_trackers
563                .get_mut()
564                .entry(name)
565                .and_modify(|t| {
566                    // Because several synchronizations could happen in parallel, we need to make
567                    // sure to never go backward.
568                    if tracker > *t {
569                        *t = tracker;
570                    }
571                })
572                .or_insert(tracker);
573        }
574    }
575
576    pub async fn current_committee(&self) -> Result<(Epoch, Arc<Committee>), ChainError> {
577        let chain_id = self.chain_id();
578        self.execution_state
579            .system
580            .current_committee()
581            .await
582            .with_execution_context(ChainExecutionContext::Block)?
583            .ok_or(ChainError::InactiveChain(chain_id))
584    }
585
586    pub async fn ownership(&self) -> Result<&ChainOwnership, ChainError> {
587        Ok(self.execution_state.system.ownership.get().await?)
588    }
589
590    /// Removes the incoming message bundles in the block from the inboxes.
591    ///
592    /// If `must_be_present` is `true`, an error is returned if any of the bundles have not been
593    /// added to the inbox yet. So this should be `true` if the bundles are in a block _proposal_,
594    /// and `false` if the block is already confirmed.
595    #[instrument(skip_all, fields(
596        chain_id = %self.chain_id(),
597    ))]
598    pub async fn remove_bundles_from_inboxes(
599        &mut self,
600        timestamp: Timestamp,
601        must_be_present: bool,
602        incoming_bundles: impl IntoIterator<Item = &IncomingBundle>,
603    ) -> Result<(), ChainError> {
604        let chain_id = self.chain_id();
605        let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
606        for IncomingBundle { bundle, origin, .. } in incoming_bundles {
607            ensure!(
608                bundle.timestamp <= timestamp,
609                ChainError::IncorrectBundleTimestamp {
610                    chain_id,
611                    bundle_timestamp: bundle.timestamp,
612                    block_timestamp: timestamp,
613                }
614            );
615            let bundles = bundles_by_origin.entry(*origin).or_default();
616            bundles.push(bundle);
617        }
618        let origins = bundles_by_origin.keys().copied().collect::<Vec<_>>();
619        let inboxes = self.inboxes.try_load_entries_mut(&origins).await?;
620        for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
621            tracing::trace!(
622                "Removing [{}] from inbox for {origin}",
623                bundles
624                    .iter()
625                    .map(|bundle| bundle.height.to_string())
626                    .collect::<Vec<_>>()
627                    .join(", ")
628            );
629            for bundle in bundles {
630                // Mark the message as processed in the inbox.
631                let was_present = inbox
632                    .remove_bundle(bundle)
633                    .await
634                    .map_err(|error| (chain_id, origin, error))?;
635                if must_be_present {
636                    ensure!(
637                        was_present,
638                        ChainError::MissingCrossChainUpdate {
639                            chain_id,
640                            origin,
641                            height: bundle.height,
642                        }
643                    );
644                }
645            }
646            inbox.observe_size_metric();
647            if inbox.added_bundles.count() == 0 {
648                self.nonempty_inboxes.get_mut().remove(&origin);
649            }
650        }
651        Ok(())
652    }
653
654    /// Returns the chain IDs of all recipients for which a message is waiting in the outbox.
655    pub fn nonempty_outbox_chain_ids(&self) -> Vec<ChainId> {
656        self.nonempty_outboxes.get().iter().copied().collect()
657    }
658
659    /// Returns the outboxes for the given targets, or an error if any of them are missing.
660    pub async fn load_outboxes(
661        &self,
662        targets: &[ChainId],
663    ) -> Result<Vec<ReadGuardedView<OutboxStateView<C>>>, ChainError> {
664        let vec_of_options = self.outboxes.try_load_entries(targets).await?;
665        let optional_vec = vec_of_options.into_iter().collect::<Option<Vec<_>>>();
666        optional_vec.ok_or_else(|| ChainError::CorruptedChainState("Missing outboxes".into()))
667    }
668
669    /// Executes a block with a specified policy for handling bundle failures.
670    #[allow(clippy::too_many_arguments)]
671    #[instrument(skip_all, fields(
672        chain_id = %block.chain_id,
673        block_height = %block.height
674    ))]
675    async fn execute_block_inner(
676        chain: &mut ExecutionStateView<C>,
677        confirmed_log: &LogView<C, CryptoHash>,
678        block: &mut ProposedBlock,
679        local_time: Timestamp,
680        round: Option<u32>,
681        published_blobs: &[Blob],
682        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
683        exec_policy: BundleExecutionPolicy,
684    ) -> Result<(BlockExecutionOutcome, ResourceTracker), ChainError> {
685        // AutoRetry is incompatible with replaying oracle responses because discarding or
686        // rejecting bundles would change which transactions execute.
687        if !matches!(exec_policy.on_failure, BundleFailurePolicy::Abort) {
688            assert!(
689                replaying_oracle_responses.is_none(),
690                "Cannot use AutoRetry policy when replaying oracle responses"
691            );
692        }
693
694        #[cfg(with_metrics)]
695        let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency_us();
696        chain.system.timestamp.set(block.timestamp);
697
698        let committee_policy = chain
699            .system
700            .current_committee()
701            .await
702            .with_execution_context(ChainExecutionContext::Block)?
703            .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
704            .1
705            .policy()
706            .clone();
707
708        let mut resource_controller = ResourceController::new(
709            Arc::new(committee_policy),
710            ResourceTracker::default(),
711            block.authenticated_owner,
712        );
713
714        for blob in published_blobs {
715            let blob_id = blob.id();
716            resource_controller
717                .policy()
718                .check_blob_size(blob.content())
719                .with_execution_context(ChainExecutionContext::Block)?;
720            chain.system.used_blobs.insert(&blob_id)?;
721        }
722
723        let mut block_execution_tracker = BlockExecutionTracker::new(
724            &mut resource_controller,
725            published_blobs
726                .iter()
727                .map(|blob| (blob.id(), blob))
728                .collect(),
729            local_time,
730            replaying_oracle_responses,
731            block,
732        )?;
733
734        // Extract max_failures from exec_policy.
735        let max_failures = match exec_policy.on_failure {
736            BundleFailurePolicy::Abort => 0,
737            BundleFailurePolicy::AutoRetry { max_failures } => max_failures,
738        };
739        let auto_retry = !matches!(exec_policy.on_failure, BundleFailurePolicy::Abort);
740        let mut failure_count = 0u32;
741
742        let time_budget = exec_policy.time_budget;
743        let mut cumulative_bundle_time = Duration::ZERO;
744
745        let mut i = 0;
746        while i < block.transactions.len() {
747            let transaction = &mut block.transactions[i];
748            let is_bundle = matches!(transaction, Transaction::ReceiveMessages(_));
749
750            // If we have a time budget and it's been exceeded, discard remaining bundles.
751            if is_bundle && time_budget.is_some_and(|budget| cumulative_bundle_time >= budget) {
752                info!(
753                    ?cumulative_bundle_time,
754                    ?time_budget,
755                    "Time budget for bundle staging exceeded, discarding remaining bundles"
756                );
757                Self::discard_remaining_bundles(block, i, None);
758                continue;
759            }
760
761            // Checkpoint before bundle transactions if using auto-retry.
762            let checkpoint = if auto_retry && is_bundle {
763                Some((
764                    chain.clone_unchecked()?,
765                    block_execution_tracker.create_checkpoint(),
766                ))
767            } else {
768                None
769            };
770
771            let bundle_start = if is_bundle && time_budget.is_some() {
772                Some(Instant::now())
773            } else {
774                None
775            };
776
777            let result = block_execution_tracker
778                .execute_transaction(&*transaction, round, chain)
779                .await;
780
781            if let Some(start) = bundle_start {
782                cumulative_bundle_time += start.elapsed();
783            }
784
785            // If the transaction executed successfully, we move on to the next one.
786            // On transient errors (e.g. missing blobs) we fail, so it can be retried after
787            // syncing. In auto-retry mode, we can discard or reject message bundles that failed
788            // with non-transient errors.
789            let (error, context, incoming_bundle, saved_chain, saved_tracker) =
790                match (result, transaction, checkpoint) {
791                    (Ok(()), _, _) => {
792                        i += 1;
793                        continue;
794                    }
795                    (
796                        Err(ChainError::ExecutionError(error, context)),
797                        Transaction::ReceiveMessages(incoming_bundle),
798                        Some((saved_chain, saved_tracker)),
799                    ) if !error.is_transient_error() => {
800                        (error, context, incoming_bundle, saved_chain, saved_tracker)
801                    }
802                    (Err(e), _, _) => return Err(e),
803                };
804
805            // Restore checkpoint.
806            *chain = saved_chain;
807            block_execution_tracker.restore_checkpoint(&saved_tracker);
808
809            if error.is_limit_error() && i > 0 {
810                failure_count += 1;
811                // If we've exceeded max failures, discard all remaining message bundles.
812                let maybe_sender = if failure_count > max_failures {
813                    info!(
814                        failure_count,
815                        max_failures,
816                        "Exceeded max bundle failures, discarding all remaining message bundles"
817                    );
818                    None
819                } else {
820                    // Not the first - discard it and same-sender subsequent bundles.
821                    info!(
822                        %error,
823                        index = i,
824                        origin = %incoming_bundle.origin,
825                        "Message bundle exceeded block limits and will be discarded for \
826                        retry in a later block"
827                    );
828                    Some(incoming_bundle.origin)
829                };
830                Self::discard_remaining_bundles(block, i, maybe_sender);
831                // Continue without incrementing i (next transaction is now at i).
832            } else if incoming_bundle.bundle.is_protected()
833                || incoming_bundle.action == MessageAction::Reject
834            {
835                // Protected bundles cannot be rejected. Failed rejected bundles fail the block.
836                return Err(ChainError::ExecutionError(error, context));
837            } else {
838                // Reject the bundle: either a non-limit error, or the first bundle
839                // exceeded limits (and is inherently too large for any block).
840                info!(
841                    %error,
842                    index = i,
843                    origin = %incoming_bundle.origin,
844                    "Message bundle failed to execute and will be rejected"
845                );
846                incoming_bundle.action = MessageAction::Reject;
847                // Retry the transaction as rejected (don't increment i).
848            }
849        }
850
851        // This can only happen if all transactions were incoming bundles that all got discarded
852        // due to resource limit errors. This is unlikely in practice but theoretically possible.
853        ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
854
855        let recipients = block_execution_tracker.recipients();
856        let mut recipient_heights = Vec::new();
857        let mut indices = Vec::new();
858        for (recipient, height) in chain
859            .previous_message_blocks
860            .multi_get_pairs(recipients)
861            .await?
862        {
863            chain
864                .previous_message_blocks
865                .insert(&recipient, block.height)?;
866            if let Some(height) = height {
867                let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
868                indices.push(index);
869                recipient_heights.push((recipient, height));
870            }
871        }
872        let hashes = confirmed_log.multi_get(indices).await?;
873        let mut previous_message_blocks = BTreeMap::new();
874        for (hash, (recipient, height)) in hashes.into_iter().zip(recipient_heights) {
875            let hash = hash.ok_or_else(|| {
876                ChainError::CorruptedChainState("missing entry in confirmed_log".into())
877            })?;
878            previous_message_blocks.insert(recipient, (hash, height));
879        }
880
881        let streams = block_execution_tracker.event_streams();
882        let mut stream_heights = Vec::new();
883        let mut indices = Vec::new();
884        for (stream, height) in chain.previous_event_blocks.multi_get_pairs(streams).await? {
885            chain.previous_event_blocks.insert(&stream, block.height)?;
886            if let Some(height) = height {
887                let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
888                indices.push(index);
889                stream_heights.push((stream, height));
890            }
891        }
892        let hashes = confirmed_log.multi_get(indices).await?;
893        let mut previous_event_blocks = BTreeMap::new();
894        for (hash, (stream, height)) in hashes.into_iter().zip(stream_heights) {
895            let hash = hash.ok_or_else(|| {
896                ChainError::CorruptedChainState("missing entry in confirmed_log".into())
897            })?;
898            previous_event_blocks.insert(stream, (hash, height));
899        }
900
901        let state_hash = {
902            #[cfg(with_metrics)]
903            let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency_us();
904            chain.crypto_hash_mut().await?
905        };
906
907        let (messages, oracle_responses, events, blobs, operation_results, resource_tracker) =
908            block_execution_tracker.finalize(block.transactions.len());
909
910        Ok((
911            BlockExecutionOutcome {
912                messages,
913                previous_message_blocks,
914                previous_event_blocks,
915                state_hash,
916                oracle_responses,
917                events,
918                blobs,
919                operation_results,
920            },
921            resource_tracker,
922        ))
923    }
924
925    /// Discards all bundles from the given origin (or all if `None`), starting at the given index.
926    fn discard_remaining_bundles(
927        block: &mut ProposedBlock,
928        mut index: usize,
929        maybe_origin: Option<ChainId>,
930    ) {
931        while index < block.transactions.len() {
932            if matches!(
933                &block.transactions[index],
934                Transaction::ReceiveMessages(bundle)
935                if maybe_origin.is_none_or(|origin| bundle.origin == origin)
936            ) {
937                block.transactions.remove(index);
938            } else {
939                index += 1;
940            }
941        }
942    }
943
944    /// Executes a block with a specified policy for handling bundle failures.
945    ///
946    /// This method supports automatic retry with checkpointing when bundles fail:
947    /// - For limit errors (block too large, fuel exceeded, etc.): the bundle is discarded
948    ///   so it can be retried in a later block, unless it's the first transaction
949    ///   (which gets rejected as inherently too large).
950    /// - For non-limit errors: the bundle is rejected (triggering bounced messages).
951    /// - After `max_failures` failed bundles, all remaining message bundles are discarded.
952    ///
953    /// The block may be modified to reflect the actual executed transactions.
954    #[instrument(skip_all, fields(
955        chain_id = %self.chain_id(),
956        block_height = %block.height
957    ))]
958    pub async fn execute_block(
959        &mut self,
960        mut block: ProposedBlock,
961        local_time: Timestamp,
962        round: Option<u32>,
963        published_blobs: &[Blob],
964        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
965        policy: BundleExecutionPolicy,
966    ) -> Result<(ProposedBlock, BlockExecutionOutcome, ResourceTracker), ChainError> {
967        assert_eq!(
968            block.chain_id,
969            self.execution_state.context().extra().chain_id()
970        );
971
972        self.initialize_if_needed(local_time).await?;
973
974        let chain_timestamp = *self.execution_state.system.timestamp.get();
975        ensure!(
976            chain_timestamp <= block.timestamp,
977            ChainError::InvalidBlockTimestamp {
978                parent: chain_timestamp,
979                new: block.timestamp
980            }
981        );
982        ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
983
984        ensure!(
985            block.published_blob_ids()
986                == published_blobs
987                    .iter()
988                    .map(|blob| blob.id())
989                    .collect::<BTreeSet<_>>(),
990            ChainError::InternalError("published_blobs mismatch".to_string())
991        );
992
993        if *self.execution_state.system.closed.get() {
994            ensure!(block.has_only_rejected_messages(), ChainError::ClosedChain);
995        }
996
997        Self::check_app_permissions(
998            self.execution_state
999                .system
1000                .application_permissions
1001                .get()
1002                .await?,
1003            &block,
1004        )?;
1005
1006        Self::execute_block_inner(
1007            &mut self.execution_state,
1008            &self.confirmed_log,
1009            &mut block,
1010            local_time,
1011            round,
1012            published_blobs,
1013            replaying_oracle_responses,
1014            policy,
1015        )
1016        .await
1017        .map(|(outcome, tracker)| (block, outcome, tracker))
1018    }
1019
1020    /// Applies an execution outcome to the chain, updating the outboxes, state hash and chain
1021    /// manager. This does not touch the execution state itself, which must be updated separately.
1022    /// Returns the set of event streams that were updated as a result of applying the block.
1023    #[instrument(skip_all, fields(
1024        chain_id = %self.chain_id(),
1025        block_height = %block.inner().inner().header.height
1026    ))]
1027    pub async fn apply_confirmed_block(
1028        &mut self,
1029        block: &ConfirmedBlock,
1030        local_time: Timestamp,
1031    ) -> Result<BTreeSet<StreamId>, ChainError> {
1032        let hash = block.inner().hash();
1033        let block = block.inner().inner();
1034        if block.header.height == BlockHeight::ZERO {
1035            self.block_zero_executed_at.set(local_time);
1036        }
1037        self.execution_state_hash.set(Some(block.header.state_hash));
1038        let updated_streams = self.process_emitted_events(block).await?;
1039        self.process_outgoing_messages(block).await?;
1040
1041        // Last, reset the consensus state based on the current ownership.
1042        self.reset_chain_manager(block.header.height.try_add_one()?, local_time)
1043            .await?;
1044
1045        // Advance to next block height.
1046        let tip = self.tip_state.get_mut();
1047        tip.block_hash = Some(hash);
1048        tip.next_block_height.try_add_assign_one()?;
1049        tip.update_counters(&block.body.transactions, &block.body.messages)?;
1050        self.confirmed_log.push(hash);
1051        self.preprocessed_blocks.remove(&block.header.height)?;
1052        Ok(updated_streams)
1053    }
1054
1055    /// Adds a block to `preprocessed_blocks`, and updates the outboxes where possible.
1056    /// Returns the set of streams that were updated as a result of preprocessing the block.
1057    #[instrument(skip_all, fields(
1058        chain_id = %self.chain_id(),
1059        block_height = %block.inner().inner().header.height
1060    ))]
1061    pub async fn preprocess_block(
1062        &mut self,
1063        block: &ConfirmedBlock,
1064    ) -> Result<BTreeSet<StreamId>, ChainError> {
1065        let hash = block.inner().hash();
1066        let block = block.inner().inner();
1067        let height = block.header.height;
1068        if height < self.tip_state.get().next_block_height {
1069            return Ok(BTreeSet::new());
1070        }
1071        self.process_outgoing_messages(block).await?;
1072        let updated_streams = self.process_emitted_events(block).await?;
1073        self.preprocessed_blocks.insert(&height, hash)?;
1074        Ok(updated_streams)
1075    }
1076
1077    /// Verifies that the block is valid according to the chain's application permission settings.
1078    #[instrument(skip_all, fields(
1079        block_height = %block.height,
1080        num_transactions = %block.transactions.len()
1081    ))]
1082    fn check_app_permissions(
1083        app_permissions: &ApplicationPermissions,
1084        block: &ProposedBlock,
1085    ) -> Result<(), ChainError> {
1086        let mut mandatory = app_permissions
1087            .mandatory_applications
1088            .iter()
1089            .copied()
1090            .collect::<HashSet<ApplicationId>>();
1091        for transaction in &block.transactions {
1092            match transaction {
1093                Transaction::ExecuteOperation(operation)
1094                    if operation.is_exempt_from_permissions() =>
1095                {
1096                    mandatory.clear()
1097                }
1098                Transaction::ExecuteOperation(operation) => {
1099                    ensure!(
1100                        app_permissions.can_execute_operations(&operation.application_id()),
1101                        ChainError::AuthorizedApplications(
1102                            app_permissions.execute_operations.clone().unwrap()
1103                        )
1104                    );
1105                    if let Operation::User { application_id, .. } = operation {
1106                        mandatory.remove(application_id);
1107                    }
1108                }
1109                Transaction::ReceiveMessages(incoming_bundle)
1110                    if incoming_bundle.action == MessageAction::Accept =>
1111                {
1112                    for pending in incoming_bundle.messages() {
1113                        if let Message::User { application_id, .. } = &pending.message {
1114                            mandatory.remove(application_id);
1115                        }
1116                    }
1117                }
1118                Transaction::ReceiveMessages(_) => {}
1119            }
1120        }
1121        ensure!(
1122            mandatory.is_empty(),
1123            ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
1124        );
1125        Ok(())
1126    }
1127
1128    /// Returns the hashes of all blocks we have in the given range.
1129    ///
1130    /// If the input heights are in ascending order, the hashes will be in the same order.
1131    /// Otherwise they may be unordered.
1132    #[instrument(skip_all, fields(
1133        chain_id = %self.chain_id(),
1134        next_block_height = %self.tip_state.get().next_block_height,
1135    ))]
1136    pub async fn block_hashes(
1137        &self,
1138        heights: impl IntoIterator<Item = BlockHeight>,
1139    ) -> Result<Vec<CryptoHash>, ChainError> {
1140        let next_height = self.tip_state.get().next_block_height;
1141        // Everything up to (excluding) next_height is in confirmed_log.
1142        let (confirmed_heights, unconfirmed_heights) = heights
1143            .into_iter()
1144            .partition::<Vec<_>, _>(|height| *height < next_height);
1145        let confirmed_indices = confirmed_heights
1146            .into_iter()
1147            .map(|height| usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow))
1148            .collect::<Result<_, _>>()?;
1149        let confirmed_hashes = self.confirmed_log.multi_get(confirmed_indices).await?;
1150        // Everything after (including) next_height in preprocessed_blocks if we have it.
1151        let unconfirmed_hashes = self
1152            .preprocessed_blocks
1153            .multi_get(&unconfirmed_heights)
1154            .await?;
1155        Ok(confirmed_hashes
1156            .into_iter()
1157            .chain(unconfirmed_hashes)
1158            .flatten()
1159            .collect())
1160    }
1161
1162    /// Resets the chain manager for the next block height.
1163    async fn reset_chain_manager(
1164        &mut self,
1165        next_height: BlockHeight,
1166        local_time: Timestamp,
1167    ) -> Result<(), ChainError> {
1168        let maybe_committee = self
1169            .execution_state
1170            .system
1171            .current_committee()
1172            .await
1173            .with_execution_context(ChainExecutionContext::Block)?;
1174        let ownership = self.execution_state.system.ownership.get().await?.clone();
1175        let fallback_owners = maybe_committee
1176            .iter()
1177            .flat_map(|(_, committee)| committee.account_keys_and_weights());
1178        self.pending_validated_blobs.clear();
1179        self.pending_proposed_blobs.clear();
1180        self.manager
1181            .reset(ownership, next_height, local_time, fallback_owners)
1182    }
1183
1184    /// Updates the outboxes with the messages sent in the block.
1185    ///
1186    /// Returns the set of all recipients.
1187    #[instrument(skip_all, fields(
1188        chain_id = %self.chain_id(),
1189        block_height = %block.header.height
1190    ))]
1191    async fn process_outgoing_messages(
1192        &mut self,
1193        block: &Block,
1194    ) -> Result<Vec<ChainId>, ChainError> {
1195        // Record the messages of the execution. Messages are understood within an
1196        // application.
1197        let recipients = block.recipients();
1198        let block_height = block.header.height;
1199        let next_height = self.tip_state.get().next_block_height;
1200
1201        // Update the outboxes.
1202        let outbox_counters = self.outbox_counters.get_mut();
1203        let nonempty_outboxes = self.nonempty_outboxes.get_mut();
1204        let targets = recipients.into_iter().collect::<Vec<_>>();
1205        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1206        for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1207            if block_height > next_height {
1208                // There may be a gap in the chain before this block. We can only add it to this
1209                // outbox if the previous message to the same recipient has already been added.
1210                if *outbox.next_height_to_schedule.get() > block_height {
1211                    continue; // We already added this recipient's messages to the outbox.
1212                }
1213                let maybe_prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok()
1214                {
1215                    // The block with the last added message has already been executed; look up its
1216                    // hash in the confirmed_log.
1217                    Some(height) if height < next_height => {
1218                        let index =
1219                            usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
1220                        Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
1221                            ChainError::CorruptedChainState("missing entry in confirmed_log".into())
1222                        })?)
1223                    }
1224                    // The block with last added message has not been executed yet. If we have it,
1225                    // it's in preprocessed_blocks.
1226                    Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
1227                        || {
1228                            ChainError::CorruptedChainState(
1229                                "missing entry in preprocessed_blocks".into(),
1230                            )
1231                        },
1232                    )?),
1233                    None => None, // No message to that sender was added yet.
1234                };
1235                // Only schedule if this block contains the next message for that recipient.
1236                match (
1237                    maybe_prev_hash,
1238                    block.body.previous_message_blocks.get(target),
1239                ) {
1240                    (None, None) => {
1241                        // No previous message block expected and none indicated by the outbox -
1242                        // all good
1243                    }
1244                    (Some(_), None) => {
1245                        // Outbox indicates there was a previous message block, but
1246                        // previous_message_blocks has no idea about it - possible bug
1247                        return Err(ChainError::CorruptedChainState(
1248                            "block indicates no previous message block,\
1249                            but we have one in the outbox"
1250                                .into(),
1251                        ));
1252                    }
1253                    (None, Some((_, prev_msg_block_height))) => {
1254                        // We have no previously processed block in the outbox, but we are
1255                        // expecting one - this could be due to an empty outbox having been pruned.
1256                        // Only process the outbox if the height of the previous message block is
1257                        // lower than the tip
1258                        if *prev_msg_block_height >= next_height {
1259                            continue;
1260                        }
1261                    }
1262                    (Some(ref prev_hash), Some((prev_msg_block_hash, _))) => {
1263                        // Only process the outbox if the hashes match.
1264                        if prev_hash != prev_msg_block_hash {
1265                            continue;
1266                        }
1267                    }
1268                }
1269            }
1270            if outbox.schedule_message(block_height)? {
1271                *outbox_counters.entry(block_height).or_default() += 1;
1272                nonempty_outboxes.insert(*target);
1273            }
1274            #[cfg(with_metrics)]
1275            crate::outbox::metrics::OUTBOX_SIZE
1276                .with_label_values(&[])
1277                .observe(outbox.queue.count() as f64);
1278        }
1279
1280        #[cfg(with_metrics)]
1281        metrics::NUM_OUTBOXES
1282            .with_label_values(&[])
1283            .observe(nonempty_outboxes.len() as f64);
1284        Ok(targets)
1285    }
1286
1287    /// Updates the event streams with events emitted by the block if they form a contiguous
1288    /// sequence (might not be the case when preprocessing a block).
1289    /// Returns the set of updated event streams.
1290    #[instrument(skip_all, fields(
1291        chain_id = %self.chain_id(),
1292        block_height = %block.header.height
1293    ))]
1294    async fn process_emitted_events(
1295        &mut self,
1296        block: &Block,
1297    ) -> Result<BTreeSet<StreamId>, ChainError> {
1298        let mut emitted_streams = BTreeMap::<StreamId, BTreeSet<u32>>::new();
1299        for event in block.body.events.iter().flatten() {
1300            emitted_streams
1301                .entry(event.stream_id.clone())
1302                .or_default()
1303                .insert(event.index);
1304        }
1305        let mut stream_ids = Vec::new();
1306        let mut list_indices = Vec::new();
1307        for (stream_id, indices) in emitted_streams {
1308            stream_ids.push(stream_id);
1309            list_indices.push(indices);
1310        }
1311
1312        let mut updated_streams = BTreeSet::new();
1313        for ((stream_id, next_index), indices) in self
1314            .next_expected_events
1315            .multi_get_pairs(stream_ids)
1316            .await?
1317            .into_iter()
1318            .zip(list_indices)
1319        {
1320            let initial_index = if stream_id == StreamId::system(EPOCH_STREAM_NAME) {
1321                // we don't expect the epoch stream to contain event 0
1322                1
1323            } else {
1324                0
1325            };
1326            let mut current_expected_index = next_index.unwrap_or(initial_index);
1327            for index in indices {
1328                if index == current_expected_index {
1329                    updated_streams.insert(stream_id.clone());
1330                    current_expected_index = index.saturating_add(1);
1331                }
1332            }
1333            if current_expected_index != 0 {
1334                self.next_expected_events
1335                    .insert(&stream_id, current_expected_index)?;
1336            }
1337        }
1338        Ok(updated_streams)
1339    }
1340}
1341
/// Checks that `EMPTY_BLOCK_SIZE` matches the BCS-serialized size of a first block with a
/// default execution outcome.
#[test]
fn empty_block_size() {
    let chain_id = linera_execution::test_utils::dummy_chain_description(0).id();
    let proposed_block = crate::test::make_first_block(chain_id);
    let outcome = crate::data_types::BlockExecutionOutcome::default();
    let block = crate::block::Block::new(proposed_block, outcome);
    let size = bcs::serialized_size(&block).unwrap();
    assert_eq!(size, EMPTY_BLOCK_SIZE);
}