linera_chain/
chain.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6    ops::RangeBounds,
7    sync::Arc,
8};
9
10use linera_base::{
11    crypto::{CryptoHash, ValidatorPublicKey},
12    data_types::{
13        ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight,
14        BlockHeightRangeBounds as _, Epoch, OracleResponse, Timestamp,
15    },
16    ensure,
17    identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
18    ownership::ChainOwnership,
19};
20use linera_execution::{
21    committee::Committee, system::EPOCH_STREAM_NAME, ExecutionRuntimeContext, ExecutionStateView,
22    Message, Operation, OutgoingMessage, Query, QueryContext, QueryOutcome, ResourceController,
23    ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
24};
25use linera_views::{
26    bucket_queue_view::BucketQueueView,
27    context::Context,
28    log_view::LogView,
29    map_view::MapView,
30    reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
31    register_view::RegisterView,
32    set_view::SetView,
33    views::{ClonableView, CryptoHashView, RootView, View},
34};
35use serde::{Deserialize, Serialize};
36use tracing::instrument;
37
38use crate::{
39    block::{Block, ConfirmedBlock},
40    block_tracker::BlockExecutionTracker,
41    data_types::{
42        BlockExecutionOutcome, ChainAndHeight, IncomingBundle, MessageBundle, ProposedBlock,
43        Transaction,
44    },
45    inbox::{Cursor, InboxError, InboxStateView},
46    manager::ChainManager,
47    outbox::OutboxStateView,
48    pending_blobs::PendingBlobsView,
49    ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
50};
51
52#[cfg(test)]
53#[path = "unit_tests/chain_tests.rs"]
54mod chain_tests;
55
56#[cfg(with_metrics)]
57use linera_base::prometheus_util::MeasureLatency;
58
59#[cfg(with_metrics)]
60pub(crate) mod metrics {
61    use std::sync::LazyLock;
62
63    use linera_base::prometheus_util::{
64        exponential_bucket_interval, exponential_bucket_latencies, register_histogram_vec,
65        register_int_counter_vec,
66    };
67    use linera_execution::ResourceTracker;
68    use prometheus::{HistogramVec, IntCounterVec};
69
70    pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
71        register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
72    });
73
74    pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
75        register_histogram_vec(
76            "block_execution_latency",
77            "Block execution latency",
78            &[],
79            exponential_bucket_latencies(1000.0),
80        )
81    });
82
83    #[cfg(with_metrics)]
84    pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
85        register_histogram_vec(
86            "message_execution_latency",
87            "Message execution latency",
88            &[],
89            exponential_bucket_latencies(50.0),
90        )
91    });
92
93    pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
94        register_histogram_vec(
95            "operation_execution_latency",
96            "Operation execution latency",
97            &[],
98            exponential_bucket_latencies(50.0),
99        )
100    });
101
102    pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
103        register_histogram_vec(
104            "wasm_fuel_used_per_block",
105            "Wasm fuel used per block",
106            &[],
107            exponential_bucket_interval(10.0, 1_000_000.0),
108        )
109    });
110
111    pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
112        register_histogram_vec(
113            "evm_fuel_used_per_block",
114            "EVM fuel used per block",
115            &[],
116            exponential_bucket_interval(10.0, 1_000_000.0),
117        )
118    });
119
120    pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
121        register_histogram_vec(
122            "vm_num_reads_per_block",
123            "VM number of reads per block",
124            &[],
125            exponential_bucket_interval(0.1, 100.0),
126        )
127    });
128
129    pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
130        register_histogram_vec(
131            "vm_bytes_read_per_block",
132            "VM number of bytes read per block",
133            &[],
134            exponential_bucket_interval(0.1, 10_000_000.0),
135        )
136    });
137
138    pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
139        register_histogram_vec(
140            "vm_bytes_written_per_block",
141            "VM number of bytes written per block",
142            &[],
143            exponential_bucket_interval(0.1, 10_000_000.0),
144        )
145    });
146
147    pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
148        register_histogram_vec(
149            "state_hash_computation_latency",
150            "Time to recompute the state hash",
151            &[],
152            exponential_bucket_latencies(500.0),
153        )
154    });
155
156    pub static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
157        register_histogram_vec(
158            "num_inboxes",
159            "Number of inboxes",
160            &[],
161            exponential_bucket_interval(1.0, 10_000.0),
162        )
163    });
164
165    pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
166        register_histogram_vec(
167            "num_outboxes",
168            "Number of outboxes",
169            &[],
170            exponential_bucket_interval(1.0, 10_000.0),
171        )
172    });
173
174    /// Tracks block execution metrics in Prometheus.
175    pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
176        NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
177        WASM_FUEL_USED_PER_BLOCK
178            .with_label_values(&[])
179            .observe(tracker.wasm_fuel as f64);
180        EVM_FUEL_USED_PER_BLOCK
181            .with_label_values(&[])
182            .observe(tracker.evm_fuel as f64);
183        VM_NUM_READS_PER_BLOCK
184            .with_label_values(&[])
185            .observe(tracker.read_operations as f64);
186        VM_BYTES_READ_PER_BLOCK
187            .with_label_values(&[])
188            .observe(tracker.bytes_read as f64);
189        VM_BYTES_WRITTEN_PER_BLOCK
190            .with_label_values(&[])
191            .observe(tracker.bytes_written as f64);
192    }
193}
194
195/// The BCS-serialized size of an empty [`Block`].
196pub(crate) const EMPTY_BLOCK_SIZE: usize = 94;
197
198/// An origin, cursor and timestamp of a unskippable bundle in our inbox.
199#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
200#[derive(Debug, Clone, Serialize, Deserialize)]
201pub struct TimestampedBundleInInbox {
202    /// The origin and cursor of the bundle.
203    pub entry: BundleInInbox,
204    /// The timestamp when the bundle was added to the inbox.
205    pub seen: Timestamp,
206}
207
208/// An origin and cursor of a unskippable bundle that is no longer in our inbox.
209#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
210#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
211pub struct BundleInInbox {
212    /// The origin from which we received the bundle.
213    pub origin: ChainId,
214    /// The cursor of the bundle in the inbox.
215    pub cursor: Cursor,
216}
217
218impl BundleInInbox {
219    fn new(origin: ChainId, bundle: &MessageBundle) -> Self {
220        BundleInInbox {
221            cursor: Cursor::from(bundle),
222            origin,
223        }
224    }
225}
226
227// The `TimestampedBundleInInbox` is a relatively small type, so a total
228// of 100 seems reasonable for the storing of the data.
229const TIMESTAMPBUNDLE_BUCKET_SIZE: usize = 100;
230
231/// A view accessing the state of a chain.
232#[cfg_attr(
233    with_graphql,
234    derive(async_graphql::SimpleObject),
235    graphql(cache_control(no_cache))
236)]
237#[derive(Debug, RootView, ClonableView)]
238pub struct ChainStateView<C>
239where
240    C: Clone + Context + Send + Sync + 'static,
241{
242    /// Execution state, including system and user applications.
243    pub execution_state: ExecutionStateView<C>,
244    /// Hash of the execution state.
245    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,
246
247    /// Block-chaining state.
248    pub tip_state: RegisterView<C, ChainTipState>,
249
250    /// Consensus state.
251    pub manager: ChainManager<C>,
252    /// Pending validated block that is still missing blobs.
253    /// The incomplete set of blobs for the pending validated block.
254    pub pending_validated_blobs: PendingBlobsView<C>,
255    /// The incomplete sets of blobs for upcoming proposals.
256    pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,
257
258    /// Hashes of all certified blocks for this sender.
259    /// This ends with `block_hash` and has length `usize::from(next_block_height)`.
260    pub confirmed_log: LogView<C, CryptoHash>,
261    /// Sender chain and height of all certified blocks known as a receiver (local ordering).
262    pub received_log: LogView<C, ChainAndHeight>,
263    /// The number of `received_log` entries we have synchronized, for each validator.
264    pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,
265
266    /// Mailboxes used to receive messages indexed by their origin.
267    pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
268    /// A queue of unskippable bundles, with the timestamp when we added them to the inbox.
269    pub unskippable_bundles:
270        BucketQueueView<C, TimestampedBundleInInbox, TIMESTAMPBUNDLE_BUCKET_SIZE>,
271    /// Unskippable bundles that have been removed but are still in the queue.
272    pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
273    /// The heights of previous blocks that sent messages to the same recipients.
274    pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
275    /// The heights of previous blocks that published events to the same streams.
276    pub previous_event_blocks: MapView<C, StreamId, BlockHeight>,
277    /// Mailboxes used to send messages, indexed by their target.
278    pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
279    /// The indices of next events we expect to see per stream (could be ahead of the last
280    /// executed block in sparse chains).
281    pub next_expected_events: MapView<C, StreamId, u32>,
282    /// Number of outgoing messages in flight for each block height.
283    /// We use a `RegisterView` to prioritize speed for small maps.
284    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
285    /// Outboxes with at least one pending message. This allows us to avoid loading all outboxes.
286    pub nonempty_outboxes: RegisterView<C, BTreeSet<ChainId>>,
287
288    /// Blocks that have been verified but not executed yet, and that may not be contiguous.
289    pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,
290}
291
292/// Block-chaining state.
293#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
294#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
295pub struct ChainTipState {
296    /// Hash of the latest certified block in this chain, if any.
297    pub block_hash: Option<CryptoHash>,
298    /// Sequence number tracking blocks.
299    pub next_block_height: BlockHeight,
300    /// Number of incoming message bundles.
301    pub num_incoming_bundles: u32,
302    /// Number of operations.
303    pub num_operations: u32,
304    /// Number of outgoing messages.
305    pub num_outgoing_messages: u32,
306}
307
308impl ChainTipState {
309    /// Checks that the proposed block is suitable, i.e. at the expected height and with the
310    /// expected parent.
311    pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
312        ensure!(
313            new_block.height == self.next_block_height,
314            ChainError::UnexpectedBlockHeight {
315                expected_block_height: self.next_block_height,
316                found_block_height: new_block.height
317            }
318        );
319        ensure!(
320            new_block.previous_block_hash == self.block_hash,
321            ChainError::UnexpectedPreviousBlockHash
322        );
323        Ok(())
324    }
325
326    /// Returns `true` if the validated block's height is below the tip height. Returns an error if
327    /// it is higher than the tip.
328    pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
329        ensure!(
330            self.next_block_height >= height,
331            ChainError::MissingEarlierBlocks {
332                current_block_height: self.next_block_height,
333            }
334        );
335        Ok(self.next_block_height > height)
336    }
337
338    /// Checks if the measurement counters would be valid.
339    pub fn update_counters(
340        &mut self,
341        transactions: &[Transaction],
342        messages: &[Vec<OutgoingMessage>],
343    ) -> Result<(), ChainError> {
344        let mut num_incoming_bundles = 0u32;
345        let mut num_operations = 0u32;
346
347        for transaction in transactions {
348            match transaction {
349                Transaction::ReceiveMessages(_) => {
350                    num_incoming_bundles = num_incoming_bundles
351                        .checked_add(1)
352                        .ok_or(ArithmeticError::Overflow)?;
353                }
354                Transaction::ExecuteOperation(_) => {
355                    num_operations = num_operations
356                        .checked_add(1)
357                        .ok_or(ArithmeticError::Overflow)?;
358                }
359            }
360        }
361
362        self.num_incoming_bundles = self
363            .num_incoming_bundles
364            .checked_add(num_incoming_bundles)
365            .ok_or(ArithmeticError::Overflow)?;
366
367        self.num_operations = self
368            .num_operations
369            .checked_add(num_operations)
370            .ok_or(ArithmeticError::Overflow)?;
371
372        let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
373            .map_err(|_| ArithmeticError::Overflow)?;
374        self.num_outgoing_messages = self
375            .num_outgoing_messages
376            .checked_add(num_outgoing_messages)
377            .ok_or(ArithmeticError::Overflow)?;
378
379        Ok(())
380    }
381}
382
383impl<C> ChainStateView<C>
384where
385    C: Context + Clone + Send + Sync + 'static,
386    C::Extra: ExecutionRuntimeContext,
387{
388    /// Returns the [`ChainId`] of the chain this [`ChainStateView`] represents.
389    pub fn chain_id(&self) -> ChainId {
390        self.context().extra().chain_id()
391    }
392
393    #[instrument(skip_all, fields(
394        chain_id = %self.chain_id(),
395    ))]
396    pub async fn query_application(
397        &mut self,
398        local_time: Timestamp,
399        query: Query,
400        service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
401    ) -> Result<QueryOutcome, ChainError> {
402        let context = QueryContext {
403            chain_id: self.chain_id(),
404            next_block_height: self.tip_state.get().next_block_height,
405            local_time,
406        };
407        self.execution_state
408            .query_application(context, query, service_runtime_endpoint)
409            .await
410            .with_execution_context(ChainExecutionContext::Query)
411    }
412
413    #[instrument(skip_all, fields(
414        chain_id = %self.chain_id(),
415        application_id = %application_id
416    ))]
417    pub async fn describe_application(
418        &mut self,
419        application_id: ApplicationId,
420    ) -> Result<ApplicationDescription, ChainError> {
421        self.execution_state
422            .system
423            .describe_application(application_id, &mut TransactionTracker::default())
424            .await
425            .with_execution_context(ChainExecutionContext::DescribeApplication)
426    }
427
428    #[instrument(skip_all, fields(
429        chain_id = %self.chain_id(),
430        target = %target,
431        height = %height
432    ))]
433    pub async fn mark_messages_as_received(
434        &mut self,
435        target: &ChainId,
436        height: BlockHeight,
437    ) -> Result<bool, ChainError> {
438        let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
439        let updates = outbox.mark_messages_as_received(height).await?;
440        if updates.is_empty() {
441            return Ok(false);
442        }
443        for update in updates {
444            let counter = self
445                .outbox_counters
446                .get_mut()
447                .get_mut(&update)
448                .ok_or_else(|| {
449                    ChainError::InternalError("message counter should be present".into())
450                })?;
451            *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
452            if *counter == 0 {
453                // Important for the test in `all_messages_delivered_up_to`.
454                self.outbox_counters.get_mut().remove(&update);
455            }
456        }
457        if outbox.queue.count() == 0 {
458            self.nonempty_outboxes.get_mut().remove(target);
459            // If the outbox is empty and not ahead of the executed blocks, remove it.
460            if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
461                self.outboxes.remove_entry(target)?;
462            }
463        }
464        #[cfg(with_metrics)]
465        metrics::NUM_OUTBOXES
466            .with_label_values(&[])
467            .observe(self.outboxes.count().await? as f64);
468        Ok(true)
469    }
470
471    /// Returns true if there are no more outgoing messages in flight up to the given
472    /// block height.
473    pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
474        tracing::debug!(
475            "Messages left in {:.8}'s outbox: {:?}",
476            self.chain_id(),
477            self.outbox_counters.get()
478        );
479        if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
480            key > &height
481        } else {
482            true
483        }
484    }
485
486    /// Invariant for the states of active chains.
487    pub fn is_active(&self) -> bool {
488        self.execution_state.system.is_active()
489    }
490
491    /// Initializes the chain if it is not active yet.
492    pub async fn initialize_if_needed(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
493        // Initialize ourselves.
494        if self
495            .execution_state
496            .system
497            .initialize_chain(self.chain_id())
498            .await
499            .with_execution_context(ChainExecutionContext::Block)?
500        {
501            // The chain was already initialized.
502            return Ok(());
503        }
504        // Recompute the state hash.
505        let hash = self.execution_state.crypto_hash_mut().await?;
506        self.execution_state_hash.set(Some(hash));
507        let maybe_committee = self.execution_state.system.current_committee().into_iter();
508        // Last, reset the consensus state based on the current ownership.
509        self.manager.reset(
510            self.execution_state.system.ownership.get().clone(),
511            BlockHeight(0),
512            local_time,
513            maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights()),
514        )?;
515        Ok(())
516    }
517
518    pub async fn next_block_height_to_receive(
519        &self,
520        origin: &ChainId,
521    ) -> Result<BlockHeight, ChainError> {
522        let inbox = self.inboxes.try_load_entry(origin).await?;
523        match inbox {
524            Some(inbox) => inbox.next_block_height_to_receive(),
525            None => Ok(BlockHeight::ZERO),
526        }
527    }
528
529    /// Returns the height of the highest block we have, plus one. Includes preprocessed blocks.
530    ///
531    /// The "+ 1" is so that it can be used in the same places as `next_block_height`.
532    pub async fn next_height_to_preprocess(&self) -> Result<BlockHeight, ChainError> {
533        if let Some(height) = self.preprocessed_blocks.indices().await?.last() {
534            return Ok(height.saturating_add(BlockHeight(1)));
535        }
536        Ok(self.tip_state.get().next_block_height)
537    }
538
539    pub async fn last_anticipated_block_height(
540        &self,
541        origin: &ChainId,
542    ) -> Result<Option<BlockHeight>, ChainError> {
543        let inbox = self.inboxes.try_load_entry(origin).await?;
544        match inbox {
545            Some(inbox) => match inbox.removed_bundles.back().await? {
546                Some(bundle) => Ok(Some(bundle.height)),
547                None => Ok(None),
548            },
549            None => Ok(None),
550        }
551    }
552
553    /// Attempts to process a new `bundle` of messages from the given `origin`. Returns an
554    /// internal error if the bundle doesn't appear to be new, based on the sender's
555    /// height. The value `local_time` is specific to each validator and only used for
556    /// round timeouts.
557    ///
558    /// Returns `true` if incoming `Subscribe` messages created new outbox entries.
559    #[instrument(skip_all, fields(
560        chain_id = %self.chain_id(),
561        origin = %origin,
562        bundle_height = %bundle.height
563    ))]
564    pub async fn receive_message_bundle(
565        &mut self,
566        origin: &ChainId,
567        bundle: MessageBundle,
568        local_time: Timestamp,
569        add_to_received_log: bool,
570    ) -> Result<(), ChainError> {
571        assert!(!bundle.messages.is_empty());
572        let chain_id = self.chain_id();
573        tracing::trace!(
574            "Processing new messages to {chain_id:.8} from {origin} at height {}",
575            bundle.height,
576        );
577        let chain_and_height = ChainAndHeight {
578            chain_id: *origin,
579            height: bundle.height,
580        };
581
582        match self.initialize_if_needed(local_time).await {
583            Ok(_) => (),
584            // if the only issue was that we couldn't initialize the chain because of a
585            // missing chain description blob, we might still want to update the inbox
586            Err(ChainError::ExecutionError(exec_err, _))
587                if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
588                if blobs.iter().all(|blob_id| {
589                    blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
590                })) => {}
591            err => {
592                return err;
593            }
594        }
595
596        // Process the inbox bundle and update the inbox state.
597        let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
598        #[cfg(with_metrics)]
599        metrics::NUM_INBOXES
600            .with_label_values(&[])
601            .observe(self.inboxes.count().await? as f64);
602        let entry = BundleInInbox::new(*origin, &bundle);
603        let skippable = bundle.is_skippable();
604        let newly_added = inbox
605            .add_bundle(bundle)
606            .await
607            .map_err(|error| match error {
608                InboxError::ViewError(error) => ChainError::ViewError(error),
609                error => ChainError::InternalError(format!(
610                    "while processing messages in certified block: {error}"
611                )),
612            })?;
613        if newly_added && !skippable {
614            let seen = local_time;
615            self.unskippable_bundles
616                .push_back(TimestampedBundleInInbox { entry, seen });
617        }
618
619        // Remember the certificate for future validator/client synchronizations.
620        if add_to_received_log {
621            self.received_log.push(chain_and_height);
622        }
623        Ok(())
624    }
625
626    /// Updates the `received_log` trackers.
627    pub fn update_received_certificate_trackers(
628        &mut self,
629        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
630    ) {
631        for (name, tracker) in new_trackers {
632            self.received_certificate_trackers
633                .get_mut()
634                .entry(name)
635                .and_modify(|t| {
636                    // Because several synchronizations could happen in parallel, we need to make
637                    // sure to never go backward.
638                    if tracker > *t {
639                        *t = tracker;
640                    }
641                })
642                .or_insert(tracker);
643        }
644    }
645
646    pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> {
647        self.execution_state
648            .system
649            .current_committee()
650            .ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
651    }
652
653    pub fn ownership(&self) -> &ChainOwnership {
654        self.execution_state.system.ownership.get()
655    }
656
657    /// Removes the incoming message bundles in the block from the inboxes.
658    ///
659    /// If `must_be_present` is `true`, an error is returned if any of the bundles have not been
660    /// added to the inbox yet. So this should be `true` if the bundles are in a block _proposal_,
661    /// and `false` if the block is already confirmed.
662    #[instrument(skip_all, fields(
663        chain_id = %self.chain_id(),
664    ))]
665    pub async fn remove_bundles_from_inboxes(
666        &mut self,
667        timestamp: Timestamp,
668        must_be_present: bool,
669        incoming_bundles: impl IntoIterator<Item = &IncomingBundle>,
670    ) -> Result<(), ChainError> {
671        let chain_id = self.chain_id();
672        let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
673        for IncomingBundle { bundle, origin, .. } in incoming_bundles {
674            ensure!(
675                bundle.timestamp <= timestamp,
676                ChainError::IncorrectBundleTimestamp {
677                    chain_id,
678                    bundle_timestamp: bundle.timestamp,
679                    block_timestamp: timestamp,
680                }
681            );
682            let bundles = bundles_by_origin.entry(*origin).or_default();
683            bundles.push(bundle);
684        }
685        let origins = bundles_by_origin.keys().copied().collect::<Vec<_>>();
686        let inboxes = self.inboxes.try_load_entries_mut(&origins).await?;
687        let mut removed_unskippable = HashSet::new();
688        for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
689            tracing::trace!(
690                "Removing [{}] from {chain_id:.8}'s inbox for {origin:}",
691                bundles
692                    .iter()
693                    .map(|bundle| bundle.height.to_string())
694                    .collect::<Vec<_>>()
695                    .join(", ")
696            );
697            for bundle in bundles {
698                // Mark the message as processed in the inbox.
699                let was_present = inbox
700                    .remove_bundle(bundle)
701                    .await
702                    .map_err(|error| (chain_id, origin, error))?;
703                if must_be_present {
704                    ensure!(
705                        was_present,
706                        ChainError::MissingCrossChainUpdate {
707                            chain_id,
708                            origin,
709                            height: bundle.height,
710                        }
711                    );
712                }
713                if was_present && !bundle.is_skippable() {
714                    removed_unskippable.insert(BundleInInbox::new(origin, bundle));
715                }
716            }
717        }
718        if !removed_unskippable.is_empty() {
719            // Delete all removed bundles from the front of the unskippable queue.
720            let maybe_front = self.unskippable_bundles.front();
721            if maybe_front.is_some_and(|ts_entry| removed_unskippable.remove(&ts_entry.entry)) {
722                self.unskippable_bundles.delete_front().await?;
723                while let Some(ts_entry) = self.unskippable_bundles.front() {
724                    if !removed_unskippable.remove(&ts_entry.entry) {
725                        if !self
726                            .removed_unskippable_bundles
727                            .contains(&ts_entry.entry)
728                            .await?
729                        {
730                            break;
731                        }
732                        self.removed_unskippable_bundles.remove(&ts_entry.entry)?;
733                    }
734                    self.unskippable_bundles.delete_front().await?;
735                }
736            }
737            for entry in removed_unskippable {
738                self.removed_unskippable_bundles.insert(&entry)?;
739            }
740        }
741        #[cfg(with_metrics)]
742        metrics::NUM_INBOXES
743            .with_label_values(&[])
744            .observe(self.inboxes.count().await? as f64);
745        Ok(())
746    }
747
748    /// Returns the chain IDs of all recipients for which a message is waiting in the outbox.
749    pub fn nonempty_outbox_chain_ids(&self) -> Vec<ChainId> {
750        self.nonempty_outboxes.get().iter().copied().collect()
751    }
752
753    /// Returns the outboxes for the given targets, or an error if any of them are missing.
754    pub async fn load_outboxes(
755        &self,
756        targets: &[ChainId],
757    ) -> Result<Vec<ReadGuardedView<OutboxStateView<C>>>, ChainError> {
758        let vec_of_options = self.outboxes.try_load_entries(targets).await?;
759        let optional_vec = vec_of_options.into_iter().collect::<Option<Vec<_>>>();
760        optional_vec.ok_or_else(|| ChainError::InternalError("Missing outboxes".into()))
761    }
762
763    /// Executes a block: first the incoming messages, then the main operation.
764    /// Does not update chain state other than the execution state.
765    #[instrument(skip_all, fields(
766        chain_id = %block.chain_id,
767        block_height = %block.height
768    ))]
769    #[expect(clippy::too_many_arguments)]
770    async fn execute_block_inner(
771        chain: &mut ExecutionStateView<C>,
772        confirmed_log: &LogView<C, CryptoHash>,
773        previous_message_blocks_view: &MapView<C, ChainId, BlockHeight>,
774        previous_event_blocks_view: &MapView<C, StreamId, BlockHeight>,
775        block: &ProposedBlock,
776        local_time: Timestamp,
777        round: Option<u32>,
778        published_blobs: &[Blob],
779        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
780    ) -> Result<BlockExecutionOutcome, ChainError> {
781        #[cfg(with_metrics)]
782        let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency();
783        chain.system.timestamp.set(block.timestamp);
784
785        let policy = chain
786            .system
787            .current_committee()
788            .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
789            .1
790            .policy()
791            .clone();
792
793        let mut resource_controller = ResourceController::new(
794            Arc::new(policy),
795            ResourceTracker::default(),
796            block.authenticated_owner,
797        );
798
799        for blob in published_blobs {
800            let blob_id = blob.id();
801            resource_controller
802                .policy()
803                .check_blob_size(blob.content())
804                .with_execution_context(ChainExecutionContext::Block)?;
805            chain.system.used_blobs.insert(&blob_id)?;
806        }
807
808        // Execute each incoming bundle as a transaction, then each operation.
809        // Collect messages, events and oracle responses, each as one list per transaction.
810        let mut block_execution_tracker = BlockExecutionTracker::new(
811            &mut resource_controller,
812            published_blobs
813                .iter()
814                .map(|blob| (blob.id(), blob))
815                .collect(),
816            local_time,
817            replaying_oracle_responses,
818            block,
819        )?;
820
821        for transaction in block.transaction_refs() {
822            block_execution_tracker
823                .execute_transaction(transaction, round, chain)
824                .await?;
825        }
826
827        let recipients = block_execution_tracker.recipients();
828        let mut recipient_heights = Vec::new();
829        let mut indices = Vec::new();
830        for (recipient, height) in previous_message_blocks_view
831            .multi_get_pairs(recipients)
832            .await?
833        {
834            if let Some(height) = height {
835                let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
836                indices.push(index);
837                recipient_heights.push((recipient, height));
838            }
839        }
840        let hashes = confirmed_log.multi_get(indices).await?;
841        let mut previous_message_blocks = BTreeMap::new();
842        for (hash, (recipient, height)) in hashes.into_iter().zip(recipient_heights) {
843            let hash = hash.ok_or_else(|| {
844                ChainError::InternalError("missing entry in confirmed_log".into())
845            })?;
846            previous_message_blocks.insert(recipient, (hash, height));
847        }
848
849        let streams = block_execution_tracker.event_streams();
850        let mut stream_heights = Vec::new();
851        let mut indices = Vec::new();
852        for (stream, height) in previous_event_blocks_view.multi_get_pairs(streams).await? {
853            if let Some(height) = height {
854                let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
855                indices.push(index);
856                stream_heights.push((stream, height));
857            }
858        }
859        let hashes = confirmed_log.multi_get(indices).await?;
860        let mut previous_event_blocks = BTreeMap::new();
861        for (hash, (stream, height)) in hashes.into_iter().zip(stream_heights) {
862            let hash = hash.ok_or_else(|| {
863                ChainError::InternalError("missing entry in confirmed_log".into())
864            })?;
865            previous_event_blocks.insert(stream, (hash, height));
866        }
867
868        let state_hash = {
869            #[cfg(with_metrics)]
870            let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency();
871            chain.crypto_hash_mut().await?
872        };
873
874        let (messages, oracle_responses, events, blobs, operation_results) =
875            block_execution_tracker.finalize();
876
877        Ok(BlockExecutionOutcome {
878            messages,
879            previous_message_blocks,
880            previous_event_blocks,
881            state_hash,
882            oracle_responses,
883            events,
884            blobs,
885            operation_results,
886        })
887    }
888
889    /// Executes a block: first the incoming messages, then the main operation.
890    /// Does not update chain state other than the execution state.
891    #[instrument(skip_all, fields(
892        chain_id = %self.chain_id(),
893        block_height = %block.height
894    ))]
895    pub async fn execute_block(
896        &mut self,
897        block: &ProposedBlock,
898        local_time: Timestamp,
899        round: Option<u32>,
900        published_blobs: &[Blob],
901        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
902    ) -> Result<BlockExecutionOutcome, ChainError> {
903        assert_eq!(
904            block.chain_id,
905            self.execution_state.context().extra().chain_id()
906        );
907
908        self.initialize_if_needed(local_time).await?;
909
910        let chain_timestamp = *self.execution_state.system.timestamp.get();
911        ensure!(
912            chain_timestamp <= block.timestamp,
913            ChainError::InvalidBlockTimestamp {
914                parent: chain_timestamp,
915                new: block.timestamp
916            }
917        );
918        ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
919
920        ensure!(
921            block.published_blob_ids()
922                == published_blobs
923                    .iter()
924                    .map(|blob| blob.id())
925                    .collect::<BTreeSet<_>>(),
926            ChainError::InternalError("published_blobs mismatch".to_string())
927        );
928
929        if *self.execution_state.system.closed.get() {
930            ensure!(block.has_only_rejected_messages(), ChainError::ClosedChain);
931        }
932
933        Self::check_app_permissions(
934            self.execution_state.system.application_permissions.get(),
935            block,
936        )?;
937
938        Self::execute_block_inner(
939            &mut self.execution_state,
940            &self.confirmed_log,
941            &self.previous_message_blocks,
942            &self.previous_event_blocks,
943            block,
944            local_time,
945            round,
946            published_blobs,
947            replaying_oracle_responses,
948        )
949        .await
950    }
951
952    /// Applies an execution outcome to the chain, updating the outboxes, state hash and chain
953    /// manager. This does not touch the execution state itself, which must be updated separately.
954    /// Returns the set of event streams that were updated as a result of applying the block.
955    #[instrument(skip_all, fields(
956        chain_id = %self.chain_id(),
957        block_height = %block.inner().inner().header.height
958    ))]
959    pub async fn apply_confirmed_block(
960        &mut self,
961        block: &ConfirmedBlock,
962        local_time: Timestamp,
963    ) -> Result<BTreeSet<StreamId>, ChainError> {
964        let hash = block.inner().hash();
965        let block = block.inner().inner();
966        self.execution_state_hash.set(Some(block.header.state_hash));
967        let updated_streams = self.process_emitted_events(block).await?;
968        let recipients = self.process_outgoing_messages(block).await?;
969
970        for recipient in recipients {
971            self.previous_message_blocks
972                .insert(&recipient, block.header.height)?;
973        }
974        for event in block.body.events.iter().flatten() {
975            self.previous_event_blocks
976                .insert(&event.stream_id, block.header.height)?;
977        }
978        // Last, reset the consensus state based on the current ownership.
979        self.reset_chain_manager(block.header.height.try_add_one()?, local_time)?;
980
981        // Advance to next block height.
982        let tip = self.tip_state.get_mut();
983        tip.block_hash = Some(hash);
984        tip.next_block_height.try_add_assign_one()?;
985        tip.update_counters(&block.body.transactions, &block.body.messages)?;
986        self.confirmed_log.push(hash);
987        self.preprocessed_blocks.remove(&block.header.height)?;
988        Ok(updated_streams)
989    }
990
991    /// Adds a block to `preprocessed_blocks`, and updates the outboxes where possible.
992    /// Returns the set of streams that were updated as a result of preprocessing the block.
993    #[instrument(skip_all, fields(
994        chain_id = %self.chain_id(),
995        block_height = %block.inner().inner().header.height
996    ))]
997    pub async fn preprocess_block(
998        &mut self,
999        block: &ConfirmedBlock,
1000    ) -> Result<BTreeSet<StreamId>, ChainError> {
1001        let hash = block.inner().hash();
1002        let block = block.inner().inner();
1003        let height = block.header.height;
1004        if height < self.tip_state.get().next_block_height {
1005            return Ok(BTreeSet::new());
1006        }
1007        self.process_outgoing_messages(block).await?;
1008        let updated_streams = self.process_emitted_events(block).await?;
1009        self.preprocessed_blocks.insert(&height, hash)?;
1010        Ok(updated_streams)
1011    }
1012
1013    /// Returns whether this is a child chain.
1014    pub fn is_child(&self) -> bool {
1015        let Some(description) = self.execution_state.system.description.get() else {
1016            // Root chains are always initialized, so this must be a child chain.
1017            return true;
1018        };
1019        description.is_child()
1020    }
1021
1022    /// Verifies that the block is valid according to the chain's application permission settings.
1023    #[instrument(skip_all, fields(
1024        block_height = %block.height,
1025        num_transactions = %block.transactions.len()
1026    ))]
1027    fn check_app_permissions(
1028        app_permissions: &ApplicationPermissions,
1029        block: &ProposedBlock,
1030    ) -> Result<(), ChainError> {
1031        let mut mandatory = HashSet::<ApplicationId>::from_iter(
1032            app_permissions.mandatory_applications.iter().copied(),
1033        );
1034        for transaction in &block.transactions {
1035            match transaction {
1036                Transaction::ExecuteOperation(operation)
1037                    if operation.is_exempt_from_permissions() =>
1038                {
1039                    mandatory.clear()
1040                }
1041                Transaction::ExecuteOperation(operation) => {
1042                    ensure!(
1043                        app_permissions.can_execute_operations(&operation.application_id()),
1044                        ChainError::AuthorizedApplications(
1045                            app_permissions.execute_operations.clone().unwrap()
1046                        )
1047                    );
1048                    if let Operation::User { application_id, .. } = operation {
1049                        mandatory.remove(application_id);
1050                    }
1051                }
1052                Transaction::ReceiveMessages(incoming_bundle) => {
1053                    for pending in incoming_bundle.messages() {
1054                        if let Message::User { application_id, .. } = &pending.message {
1055                            mandatory.remove(application_id);
1056                        }
1057                    }
1058                }
1059            }
1060        }
1061        ensure!(
1062            mandatory.is_empty(),
1063            ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
1064        );
1065        Ok(())
1066    }
1067
1068    /// Returns the hashes of all blocks we have in the given range.
1069    #[instrument(skip_all, fields(
1070        chain_id = %self.chain_id(),
1071        next_block_height = %self.tip_state.get().next_block_height,
1072        start_height = ?range.start_bound(),
1073        end_height = ?range.end_bound()
1074    ))]
1075    pub async fn block_hashes(
1076        &self,
1077        range: impl RangeBounds<BlockHeight>,
1078    ) -> Result<Vec<CryptoHash>, ChainError> {
1079        let next_height = self.tip_state.get().next_block_height;
1080        // If the range is not empty, it can always be represented as start..=end.
1081        let Some((start, end)) = range.to_inclusive() else {
1082            return Ok(Vec::new());
1083        };
1084        // Everything up to (excluding) next_height is in confirmed_log.
1085        let mut hashes = if let Ok(last_height) = next_height.try_sub_one() {
1086            let usize_start = usize::try_from(start)?;
1087            let usize_end = usize::try_from(end.min(last_height))?;
1088            self.confirmed_log.read(usize_start..=usize_end).await?
1089        } else {
1090            Vec::new()
1091        };
1092        // Everything after (including) next_height in preprocessed_blocks if we have it.
1093        let block_heights = (start.max(next_height).0..=end.0)
1094            .map(BlockHeight)
1095            .collect::<Vec<_>>();
1096        for hash in self
1097            .preprocessed_blocks
1098            .multi_get(&block_heights)
1099            .await?
1100            .into_iter()
1101            .flatten()
1102        {
1103            hashes.push(hash);
1104        }
1105        Ok(hashes)
1106    }
1107
1108    /// Resets the chain manager for the next block height.
1109    fn reset_chain_manager(
1110        &mut self,
1111        next_height: BlockHeight,
1112        local_time: Timestamp,
1113    ) -> Result<(), ChainError> {
1114        let maybe_committee = self.execution_state.system.current_committee().into_iter();
1115        let ownership = self.execution_state.system.ownership.get().clone();
1116        let fallback_owners =
1117            maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights());
1118        self.pending_validated_blobs.clear();
1119        self.pending_proposed_blobs.clear();
1120        self.manager
1121            .reset(ownership, next_height, local_time, fallback_owners)
1122    }
1123
1124    /// Updates the outboxes with the messages sent in the block.
1125    ///
1126    /// Returns the set of all recipients.
1127    #[instrument(skip_all, fields(
1128        chain_id = %self.chain_id(),
1129        block_height = %block.header.height
1130    ))]
1131    async fn process_outgoing_messages(
1132        &mut self,
1133        block: &Block,
1134    ) -> Result<Vec<ChainId>, ChainError> {
1135        // Record the messages of the execution. Messages are understood within an
1136        // application.
1137        let recipients = block.recipients();
1138        let block_height = block.header.height;
1139        let next_height = self.tip_state.get().next_block_height;
1140
1141        // Update the outboxes.
1142        let outbox_counters = self.outbox_counters.get_mut();
1143        let nonempty_outboxes = self.nonempty_outboxes.get_mut();
1144        let targets = recipients.into_iter().collect::<Vec<_>>();
1145        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1146        for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1147            if block_height > next_height {
1148                // There may be a gap in the chain before this block. We can only add it to this
1149                // outbox if the previous message to the same recipient has already been added.
1150                if *outbox.next_height_to_schedule.get() > block_height {
1151                    continue; // We already added this recipient's messages to the outbox.
1152                }
1153                let maybe_prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok()
1154                {
1155                    // The block with the last added message has already been executed; look up its
1156                    // hash in the confirmed_log.
1157                    Some(height) if height < next_height => {
1158                        let index =
1159                            usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
1160                        Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
1161                            ChainError::InternalError("missing entry in confirmed_log".into())
1162                        })?)
1163                    }
1164                    // The block with last added message has not been executed yet. If we have it,
1165                    // it's in preprocessed_blocks.
1166                    Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
1167                        || ChainError::InternalError("missing entry in preprocessed_blocks".into()),
1168                    )?),
1169                    None => None, // No message to that sender was added yet.
1170                };
1171                // Only schedule if this block contains the next message for that recipient.
1172                match (
1173                    maybe_prev_hash,
1174                    block.body.previous_message_blocks.get(target),
1175                ) {
1176                    (None, None) => {
1177                        // No previous message block expected and none indicated by the outbox -
1178                        // all good
1179                    }
1180                    (Some(_), None) => {
1181                        // Outbox indicates there was a previous message block, but
1182                        // previous_message_blocks has no idea about it - possible bug
1183                        return Err(ChainError::InternalError(
1184                            "block indicates no previous message block,\
1185                            but we have one in the outbox"
1186                                .into(),
1187                        ));
1188                    }
1189                    (None, Some((_, prev_msg_block_height))) => {
1190                        // We have no previously processed block in the outbox, but we are
1191                        // expecting one - this could be due to an empty outbox having been pruned.
1192                        // Only process the outbox if the height of the previous message block is
1193                        // lower than the tip
1194                        if *prev_msg_block_height >= next_height {
1195                            continue;
1196                        }
1197                    }
1198                    (Some(ref prev_hash), Some((prev_msg_block_hash, _))) => {
1199                        // Only process the outbox if the hashes match.
1200                        if prev_hash != prev_msg_block_hash {
1201                            continue;
1202                        }
1203                    }
1204                }
1205            }
1206            if outbox.schedule_message(block_height)? {
1207                *outbox_counters.entry(block_height).or_default() += 1;
1208                nonempty_outboxes.insert(*target);
1209            }
1210        }
1211
1212        #[cfg(with_metrics)]
1213        metrics::NUM_OUTBOXES
1214            .with_label_values(&[])
1215            .observe(self.outboxes.count().await? as f64);
1216        Ok(targets)
1217    }
1218
1219    /// Updates the event streams with events emitted by the block if they form a contiguous
1220    /// sequence (might not be the case when preprocessing a block).
1221    /// Returns the set of updated event streams.
1222    #[instrument(skip_all, fields(
1223        chain_id = %self.chain_id(),
1224        block_height = %block.header.height
1225    ))]
1226    async fn process_emitted_events(
1227        &mut self,
1228        block: &Block,
1229    ) -> Result<BTreeSet<StreamId>, ChainError> {
1230        let mut emitted_streams = BTreeMap::<StreamId, BTreeSet<u32>>::new();
1231        for event in block.body.events.iter().flatten() {
1232            emitted_streams
1233                .entry(event.stream_id.clone())
1234                .or_default()
1235                .insert(event.index);
1236        }
1237        let mut stream_ids = Vec::new();
1238        let mut list_indices = Vec::new();
1239        for (stream_id, indices) in emitted_streams {
1240            stream_ids.push(stream_id);
1241            list_indices.push(indices);
1242        }
1243
1244        let mut updated_streams = BTreeSet::new();
1245        for ((stream_id, next_index), indices) in self
1246            .next_expected_events
1247            .multi_get_pairs(stream_ids)
1248            .await?
1249            .into_iter()
1250            .zip(list_indices)
1251        {
1252            let initial_index = if stream_id == StreamId::system(EPOCH_STREAM_NAME) {
1253                // we don't expect the epoch stream to contain event 0
1254                1
1255            } else {
1256                0
1257            };
1258            let mut current_expected_index = next_index.unwrap_or(initial_index);
1259            for index in indices {
1260                if index == current_expected_index {
1261                    updated_streams.insert(stream_id.clone());
1262                    current_expected_index = index.saturating_add(1);
1263                }
1264            }
1265            if current_expected_index != 0 {
1266                self.next_expected_events
1267                    .insert(&stream_id, current_expected_index)?;
1268            }
1269        }
1270        Ok(updated_streams)
1271    }
1272}
1273
1274#[test]
1275fn empty_block_size() {
1276    let size = bcs::serialized_size(&crate::block::Block::new(
1277        crate::test::make_first_block(
1278            linera_execution::test_utils::dummy_chain_description(0).id(),
1279        ),
1280        crate::data_types::BlockExecutionOutcome::default(),
1281    ))
1282    .unwrap();
1283    assert_eq!(size, EMPTY_BLOCK_SIZE);
1284}