linera_chain/
chain.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6    ops::RangeBounds,
7    sync::Arc,
8};
9
10use futures::stream::{self, StreamExt, TryStreamExt};
11use linera_base::{
12    crypto::{CryptoHash, ValidatorPublicKey},
13    data_types::{
14        Amount, ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight,
15        BlockHeightRangeBounds as _, Epoch, OracleResponse, Timestamp,
16    },
17    ensure,
18    identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, MessageId},
19    ownership::ChainOwnership,
20};
21use linera_execution::{
22    committee::Committee, ExecutionRuntimeContext, ExecutionStateView, Message, MessageContext,
23    Operation, OperationContext, OutgoingMessage, Query, QueryContext, QueryOutcome,
24    ResourceController, ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
25};
26use linera_views::{
27    bucket_queue_view::BucketQueueView,
28    context::Context,
29    log_view::LogView,
30    map_view::MapView,
31    reentrant_collection_view::ReentrantCollectionView,
32    register_view::RegisterView,
33    set_view::SetView,
34    store::ReadableKeyValueStore as _,
35    views::{ClonableView, CryptoHashView, RootView, View},
36};
37use serde::{Deserialize, Serialize};
38
39use crate::{
40    block::{Block, ConfirmedBlock},
41    block_tracker::BlockExecutionTracker,
42    data_types::{
43        BlockExecutionOutcome, ChainAndHeight, IncomingBundle, MessageAction, MessageBundle,
44        PostedMessage, ProposedBlock, Transaction,
45    },
46    inbox::{Cursor, InboxError, InboxStateView},
47    manager::ChainManager,
48    outbox::OutboxStateView,
49    pending_blobs::PendingBlobsView,
50    ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
51};
52
53#[cfg(test)]
54#[path = "unit_tests/chain_tests.rs"]
55mod chain_tests;
56
57#[cfg(with_metrics)]
58use linera_base::prometheus_util::MeasureLatency;
59
/// Prometheus metrics recorded while executing blocks and updating inboxes/outboxes.
///
/// The whole module is only compiled when the `with_metrics` cfg is set, so the items inside
/// do not need their own `#[cfg(with_metrics)]` attribute.
#[cfg(with_metrics)]
pub(crate) mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, exponential_bucket_latencies, register_histogram_vec,
        register_int_counter_vec,
    };
    use linera_execution::ResourceTracker;
    use prometheus::{HistogramVec, IntCounterVec};

    /// Counter of executed blocks.
    pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
    });

    /// Histogram of the latency of executing a whole block.
    pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "block_execution_latency",
            "Block execution latency",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// Histogram of the latency of executing a single message.
    pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "message_execution_latency",
            "Message execution latency",
            &[],
            exponential_bucket_latencies(50.0),
        )
    });

    /// Histogram of the latency of executing a single operation.
    pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "operation_execution_latency",
            "Operation execution latency",
            &[],
            exponential_bucket_latencies(50.0),
        )
    });

    /// Histogram of the Wasm fuel consumed by each block.
    pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "wasm_fuel_used_per_block",
            "Wasm fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 1_000_000.0),
        )
    });

    /// Histogram of the EVM fuel consumed by each block.
    pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "evm_fuel_used_per_block",
            "EVM fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 1_000_000.0),
        )
    });

    /// Histogram of the number of VM read operations performed by each block.
    pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_num_reads_per_block",
            "VM number of reads per block",
            &[],
            exponential_bucket_interval(0.1, 100.0),
        )
    });

    /// Histogram of the number of bytes read by the VM in each block.
    pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_read_per_block",
            "VM number of bytes read per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the number of bytes written by the VM in each block.
    pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_written_per_block",
            "VM number of bytes written per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the time needed to recompute the state hash.
    pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "state_hash_computation_latency",
            "Time to recompute the state hash",
            &[],
            exponential_bucket_latencies(500.0),
        )
    });

    /// Histogram of the number of inboxes of a chain.
    pub static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_inboxes",
            "Number of inboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Histogram of the number of outboxes of a chain.
    pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_outboxes",
            "Number of outboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Tracks block execution metrics in Prometheus.
    ///
    /// Increments the executed-block counter and records one observation per resource
    /// histogram from the values accumulated in `tracker`.
    pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
        NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
        // The `as f64` conversions may round very large counts, which is acceptable for
        // histogram observations.
        WASM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.wasm_fuel as f64);
        EVM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.evm_fuel as f64);
        VM_NUM_READS_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.read_operations as f64);
        VM_BYTES_READ_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_read as f64);
        VM_BYTES_WRITTEN_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_written as f64);
    }
}
195
/// The BCS-serialized size of an empty [`Block`].
///
/// NOTE(review): this is a hard-coded byte count; presumably it has to be updated whenever
/// the serialized layout of `Block` changes — confirm that a test elsewhere pins it.
pub(crate) const EMPTY_BLOCK_SIZE: usize = 94;
198
/// An origin, cursor and timestamp of an unskippable bundle in our inbox.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimestampedBundleInInbox {
    /// The origin and cursor of the bundle.
    pub entry: BundleInInbox,
    /// The timestamp when the bundle was added to the inbox.
    pub seen: Timestamp,
}
208
/// The origin and cursor identifying an unskippable bundle.
///
/// Used both as the entry of a [`TimestampedBundleInInbox`] in the queue of unskippable
/// bundles and as the element type of the set of unskippable bundles that have been removed
/// but are still in that queue.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct BundleInInbox {
    /// The origin from which we received the bundle.
    pub origin: ChainId,
    /// The cursor of the bundle in the inbox.
    pub cursor: Cursor,
}
218
219impl BundleInInbox {
220    fn new(origin: ChainId, bundle: &MessageBundle) -> Self {
221        BundleInInbox {
222            cursor: Cursor::from(bundle),
223            origin,
224        }
225    }
226}
227
// A `TimestampedBundleInInbox` is a relatively small type, so a bucket size of 100
// entries seems reasonable for storing them in the `BucketQueueView`.
const TIMESTAMPBUNDLE_BUCKET_SIZE: usize = 100;
231
/// A view accessing the state of a chain.
///
/// Groups the execution state, the block-chaining tip, the consensus manager, pending blobs,
/// the block logs and the inbox/outbox bookkeeping of a single chain.
#[cfg_attr(
    with_graphql,
    derive(async_graphql::SimpleObject),
    graphql(cache_control(no_cache))
)]
#[derive(Debug, RootView, ClonableView)]
pub struct ChainStateView<C>
where
    C: Clone + Context + Send + Sync + 'static,
{
    /// Execution state, including system and user applications.
    pub execution_state: ExecutionStateView<C>,
    /// Hash of the execution state.
    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,

    /// Block-chaining state.
    pub tip_state: RegisterView<C, ChainTipState>,

    /// Consensus state.
    pub manager: ChainManager<C>,
    /// The incomplete set of blobs for the pending validated block, i.e. the validated block
    /// that is still missing blobs.
    pub pending_validated_blobs: PendingBlobsView<C>,
    /// The incomplete sets of blobs for upcoming proposals.
    pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,

    /// Hashes of all certified blocks for this sender.
    /// This ends with `block_hash` and has length `usize::from(next_block_height)`.
    pub confirmed_log: LogView<C, CryptoHash>,
    /// Sender chain and height of all certified blocks known as a receiver (local ordering).
    pub received_log: LogView<C, ChainAndHeight>,
    /// The number of `received_log` entries we have synchronized, for each validator.
    pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,

    /// Mailboxes used to receive messages indexed by their origin.
    pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
    /// A queue of unskippable bundles, with the timestamp when we added them to the inbox.
    pub unskippable_bundles:
        BucketQueueView<C, TimestampedBundleInInbox, TIMESTAMPBUNDLE_BUCKET_SIZE>,
    /// Unskippable bundles that have been removed but are still in the queue.
    pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
    /// The heights of previous blocks that sent messages to the same recipients.
    pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
    /// Mailboxes used to send messages, indexed by their target.
    pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
    /// Number of outgoing messages in flight for each block height.
    /// We use a `RegisterView` to prioritize speed for small maps.
    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,

    /// Blocks that have been verified but not executed yet, and that may not be contiguous.
    pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,
}
285
/// Block-chaining state.
///
/// Records the hash and height at the tip of the chain, together with running totals that
/// are increased by `update_counters`.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct ChainTipState {
    /// Hash of the latest certified block in this chain, if any.
    pub block_hash: Option<CryptoHash>,
    /// Sequence number tracking blocks.
    pub next_block_height: BlockHeight,
    /// Number of incoming message bundles.
    pub num_incoming_bundles: u32,
    /// Number of operations.
    pub num_operations: u32,
    /// Number of outgoing messages.
    pub num_outgoing_messages: u32,
}
301
302impl ChainTipState {
303    /// Checks that the proposed block is suitable, i.e. at the expected height and with the
304    /// expected parent.
305    pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
306        ensure!(
307            new_block.height == self.next_block_height,
308            ChainError::UnexpectedBlockHeight {
309                expected_block_height: self.next_block_height,
310                found_block_height: new_block.height
311            }
312        );
313        ensure!(
314            new_block.previous_block_hash == self.block_hash,
315            ChainError::UnexpectedPreviousBlockHash
316        );
317        Ok(())
318    }
319
320    /// Returns `true` if the validated block's height is below the tip height. Returns an error if
321    /// it is higher than the tip.
322    pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
323        ensure!(
324            self.next_block_height >= height,
325            ChainError::MissingEarlierBlocks {
326                current_block_height: self.next_block_height,
327            }
328        );
329        Ok(self.next_block_height > height)
330    }
331
332    /// Checks if the measurement counters would be valid.
333    pub fn update_counters(
334        &mut self,
335        incoming_bundles: &[IncomingBundle],
336        operations: &[Operation],
337        messages: &[Vec<OutgoingMessage>],
338    ) -> Result<(), ChainError> {
339        let num_incoming_bundles =
340            u32::try_from(incoming_bundles.len()).map_err(|_| ArithmeticError::Overflow)?;
341        self.num_incoming_bundles = self
342            .num_incoming_bundles
343            .checked_add(num_incoming_bundles)
344            .ok_or(ArithmeticError::Overflow)?;
345
346        let num_operations =
347            u32::try_from(operations.len()).map_err(|_| ArithmeticError::Overflow)?;
348        self.num_operations = self
349            .num_operations
350            .checked_add(num_operations)
351            .ok_or(ArithmeticError::Overflow)?;
352
353        let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
354            .map_err(|_| ArithmeticError::Overflow)?;
355        self.num_outgoing_messages = self
356            .num_outgoing_messages
357            .checked_add(num_outgoing_messages)
358            .ok_or(ArithmeticError::Overflow)?;
359
360        Ok(())
361    }
362}
363
364impl<C> ChainStateView<C>
365where
366    C: Context + Clone + Send + Sync + 'static,
367    C::Extra: ExecutionRuntimeContext,
368{
    /// Returns the [`ChainId`] of the chain this [`ChainStateView`] represents.
    ///
    /// The ID is read from the extra data attached to the view's context.
    pub fn chain_id(&self) -> ChainId {
        self.context().extra().chain_id()
    }
373
374    pub async fn query_application(
375        &mut self,
376        local_time: Timestamp,
377        query: Query,
378        service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
379    ) -> Result<QueryOutcome, ChainError> {
380        let context = QueryContext {
381            chain_id: self.chain_id(),
382            next_block_height: self.tip_state.get().next_block_height,
383            local_time,
384        };
385        self.execution_state
386            .query_application(context, query, service_runtime_endpoint)
387            .await
388            .with_execution_context(ChainExecutionContext::Query)
389    }
390
391    pub async fn describe_application(
392        &mut self,
393        application_id: ApplicationId,
394    ) -> Result<ApplicationDescription, ChainError> {
395        self.execution_state
396            .system
397            .describe_application(application_id, &mut TransactionTracker::default())
398            .await
399            .with_execution_context(ChainExecutionContext::DescribeApplication)
400    }
401
402    pub async fn mark_messages_as_received(
403        &mut self,
404        target: &ChainId,
405        height: BlockHeight,
406    ) -> Result<bool, ChainError> {
407        let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
408        let updates = outbox.mark_messages_as_received(height).await?;
409        if updates.is_empty() {
410            return Ok(false);
411        }
412        for update in updates {
413            let counter = self
414                .outbox_counters
415                .get_mut()
416                .get_mut(&update)
417                .expect("message counter should be present");
418            *counter = counter
419                .checked_sub(1)
420                .expect("message counter should not underflow");
421            if *counter == 0 {
422                // Important for the test in `all_messages_delivered_up_to`.
423                self.outbox_counters.get_mut().remove(&update);
424            }
425        }
426        #[cfg(with_metrics)]
427        metrics::NUM_OUTBOXES
428            .with_label_values(&[])
429            .observe(self.outboxes.count().await? as f64);
430        Ok(true)
431    }
432
433    /// Returns true if there are no more outgoing messages in flight up to the given
434    /// block height.
435    pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
436        tracing::debug!(
437            "Messages left in {:.8}'s outbox: {:?}",
438            self.chain_id(),
439            self.outbox_counters.get()
440        );
441        if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
442            key > &height
443        } else {
444            true
445        }
446    }
447
    /// Returns whether the system execution state reports this chain as active.
    pub fn is_active(&self) -> bool {
        self.execution_state.system.is_active()
    }
452
    /// Makes sure the chain is initialized, initializing it if necessary.
    ///
    /// Calls `initialize_chain` on the system state. If that reports that the chain was
    /// already initialized, nothing else happens. Otherwise the execution state hash is
    /// recomputed, the chain manager is reset at height 0 using the current ownership,
    /// `local_time` and the current committee (if any), and the view is saved.
    ///
    /// # Errors
    ///
    /// Returns an error if initialization, hashing, resetting the manager or saving fails.
    /// Initialization errors are tagged with [`ChainExecutionContext::Block`].
    pub async fn ensure_is_active(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
        // Initialize ourselves.
        if self
            .execution_state
            .system
            .initialize_chain(self.chain_id())
            .await
            .with_execution_context(ChainExecutionContext::Block)?
        {
            // The chain was already initialized.
            return Ok(());
        }
        // Recompute the state hash.
        let hash = self.execution_state.crypto_hash().await?;
        self.execution_state_hash.set(Some(hash));
        // Turn the optional committee into an iterator, so that "no committee" simply yields
        // no validator keys below.
        let maybe_committee = self.execution_state.system.current_committee().into_iter();
        // Last, reset the consensus state based on the current ownership.
        self.manager.reset(
            self.execution_state.system.ownership.get().clone(),
            BlockHeight(0),
            local_time,
            maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights()),
        )?;
        self.save().await?;
        Ok(())
    }
480
481    /// Verifies that this chain is up-to-date and all the messages executed ahead of time
482    /// have been properly received by now.
483    pub async fn validate_incoming_bundles(&self) -> Result<(), ChainError> {
484        let chain_id = self.chain_id();
485        let pairs = self.inboxes.try_load_all_entries().await?;
486        let max_stream_queries = self.context().store().max_stream_queries();
487        let stream = stream::iter(pairs)
488            .map(|(origin, inbox)| async move {
489                if let Some(bundle) = inbox.removed_bundles.front().await? {
490                    return Err(ChainError::MissingCrossChainUpdate {
491                        chain_id,
492                        origin,
493                        height: bundle.height,
494                    });
495                }
496                Ok::<(), ChainError>(())
497            })
498            .buffer_unordered(max_stream_queries);
499        stream.try_collect::<Vec<_>>().await?;
500        Ok(())
501    }
502
503    pub async fn next_block_height_to_receive(
504        &self,
505        origin: &ChainId,
506    ) -> Result<BlockHeight, ChainError> {
507        let inbox = self.inboxes.try_load_entry(origin).await?;
508        match inbox {
509            Some(inbox) => inbox.next_block_height_to_receive(),
510            None => Ok(BlockHeight::ZERO),
511        }
512    }
513
514    pub async fn last_anticipated_block_height(
515        &self,
516        origin: &ChainId,
517    ) -> Result<Option<BlockHeight>, ChainError> {
518        let inbox = self.inboxes.try_load_entry(origin).await?;
519        match inbox {
520            Some(inbox) => match inbox.removed_bundles.back().await? {
521                Some(bundle) => Ok(Some(bundle.height)),
522                None => Ok(None),
523            },
524            None => Ok(None),
525        }
526    }
527
    /// Attempts to process a new `bundle` of messages from the given `origin`. Returns an
    /// internal error if the inbox rejects the bundle. The value `local_time` is specific to
    /// each validator; here it is passed to `ensure_is_active` and recorded as the time an
    /// unskippable bundle was first seen.
    ///
    /// The bundle is added to the inbox for `origin`. If it was newly added and is not
    /// skippable, it is also appended to the queue of unskippable bundles. If
    /// `add_to_received_log` is set, the sender chain and height are appended to
    /// `received_log`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `ensure_is_active`, except for missing-blob errors that only
    /// concern this chain's own description blob. Inbox errors other than view errors are
    /// reported as [`ChainError::InternalError`].
    ///
    /// # Panics
    ///
    /// Panics if `bundle.messages` is empty.
    pub async fn receive_message_bundle(
        &mut self,
        origin: &ChainId,
        bundle: MessageBundle,
        local_time: Timestamp,
        add_to_received_log: bool,
    ) -> Result<(), ChainError> {
        assert!(!bundle.messages.is_empty());
        let chain_id = self.chain_id();
        tracing::trace!(
            "Processing new messages to {chain_id:.8} from {origin} at height {}",
            bundle.height,
        );
        let chain_and_height = ChainAndHeight {
            chain_id: *origin,
            height: bundle.height,
        };

        match self.ensure_is_active(local_time).await {
            Ok(_) => (),
            // If the only issue was that we couldn't initialize the chain because of a
            // missing chain description blob, we might still want to update the inbox.
            Err(ChainError::ExecutionError(exec_err, _))
                if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
                if blobs.iter().all(|blob_id| {
                    blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
                })) => {}
            // Any other failure is returned to the caller unchanged.
            err => {
                return err;
            }
        }

        // Process the inbox bundle and update the inbox state.
        let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
        #[cfg(with_metrics)]
        metrics::NUM_INBOXES
            .with_label_values(&[])
            .observe(self.inboxes.count().await? as f64);
        // Compute these before `bundle` is moved into the inbox.
        let entry = BundleInInbox::new(*origin, &bundle);
        let skippable = bundle.is_skippable();
        let newly_added = inbox
            .add_bundle(bundle)
            .await
            .map_err(|error| match error {
                InboxError::ViewError(error) => ChainError::ViewError(error),
                error => ChainError::InternalError(format!(
                    "while processing messages in certified block: {error}"
                )),
            })?;
        if newly_added && !skippable {
            let seen = local_time;
            self.unskippable_bundles
                .push_back(TimestampedBundleInInbox { entry, seen });
        }

        // Remember the certificate for future validator/client synchronizations.
        if add_to_received_log {
            self.received_log.push(chain_and_height);
        }
        Ok(())
    }
595
596    /// Updates the `received_log` trackers.
597    pub fn update_received_certificate_trackers(
598        &mut self,
599        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
600    ) {
601        for (name, tracker) in new_trackers {
602            self.received_certificate_trackers
603                .get_mut()
604                .entry(name)
605                .and_modify(|t| {
606                    // Because several synchronizations could happen in parallel, we need to make
607                    // sure to never go backward.
608                    if tracker > *t {
609                        *t = tracker;
610                    }
611                })
612                .or_insert(tracker);
613        }
614    }
615
616    pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> {
617        self.execution_state
618            .system
619            .current_committee()
620            .ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
621    }
622
    /// Returns the chain ownership stored in the system execution state.
    pub fn ownership(&self) -> &ChainOwnership {
        self.execution_state.system.ownership.get()
    }
626
    /// Removes the incoming message bundles in the block from the inboxes.
    ///
    /// `timestamp` is the block timestamp; every bundle must have a timestamp that is not
    /// later than it. Bundles are grouped by origin and removed from the matching inbox.
    /// Unskippable bundles that were present in their inbox are then dropped from the front
    /// of the `unskippable_bundles` queue where possible; the ones that cannot be dropped yet
    /// are remembered in `removed_unskippable_bundles`.
    ///
    /// # Errors
    ///
    /// Returns [`ChainError::IncorrectBundleTimestamp`] if a bundle is newer than the block,
    /// and propagates inbox and view errors.
    pub async fn remove_bundles_from_inboxes(
        &mut self,
        timestamp: Timestamp,
        incoming_bundles: &[IncomingBundle],
    ) -> Result<(), ChainError> {
        let chain_id = self.chain_id();
        // Group the bundles by origin, keeping their order within each origin.
        let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
        for IncomingBundle { bundle, origin, .. } in incoming_bundles {
            ensure!(
                bundle.timestamp <= timestamp,
                ChainError::IncorrectBundleTimestamp {
                    chain_id,
                    bundle_timestamp: bundle.timestamp,
                    block_timestamp: timestamp,
                }
            );
            let bundles = bundles_by_origin.entry(origin).or_default();
            bundles.push(bundle);
        }
        let origins = bundles_by_origin.keys().copied();
        let inboxes = self.inboxes.try_load_entries_mut(origins).await?;
        // Unskippable bundles removed by this call that were actually present in their inbox.
        let mut removed_unskippable = HashSet::new();
        // The inboxes were requested in the map's key order, so they are zipped with the
        // map's entries in that same order.
        for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
            tracing::trace!(
                "Removing {:?} from {chain_id:.8}'s inbox for {origin:}",
                bundles
                    .iter()
                    .map(|bundle| bundle.height)
                    .collect::<Vec<_>>()
            );
            for bundle in bundles {
                // Mark the message as processed in the inbox.
                let was_present = inbox
                    .remove_bundle(bundle)
                    .await
                    .map_err(|error| (chain_id, *origin, error))?;
                if was_present && !bundle.is_skippable() {
                    removed_unskippable.insert(BundleInInbox::new(*origin, bundle));
                }
            }
        }
        if !removed_unskippable.is_empty() {
            // Delete all removed bundles from the front of the unskippable queue.
            let maybe_front = self.unskippable_bundles.front();
            if maybe_front.is_some_and(|ts_entry| removed_unskippable.remove(&ts_entry.entry)) {
                self.unskippable_bundles.delete_front().await?;
                // Keep popping while the front was removed now or in an earlier call.
                while let Some(ts_entry) = self.unskippable_bundles.front() {
                    if !removed_unskippable.remove(&ts_entry.entry) {
                        if !self
                            .removed_unskippable_bundles
                            .contains(&ts_entry.entry)
                            .await?
                        {
                            // The front bundle has not been removed yet: stop here.
                            break;
                        }
                        self.removed_unskippable_bundles.remove(&ts_entry.entry)?;
                    }
                    self.unskippable_bundles.delete_front().await?;
                }
            }
            // Whatever is left was not reached at the front of the queue; remember it so
            // that a later call can drop it from the queue.
            for entry in removed_unskippable {
                self.removed_unskippable_bundles.insert(&entry)?;
            }
        }
        #[cfg(with_metrics)]
        metrics::NUM_INBOXES
            .with_label_values(&[])
            .observe(self.inboxes.count().await? as f64);
        Ok(())
    }
698
699    /// Executes a block: first the incoming messages, then the main operation.
700    /// Does not update chain state other than the execution state.
701    #[expect(clippy::too_many_arguments)]
702    async fn execute_block_inner(
703        chain: &mut ExecutionStateView<C>,
704        confirmed_log: &LogView<C, CryptoHash>,
705        previous_message_blocks_view: &MapView<C, ChainId, BlockHeight>,
706        block: &ProposedBlock,
707        local_time: Timestamp,
708        round: Option<u32>,
709        published_blobs: &[Blob],
710        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
711    ) -> Result<BlockExecutionOutcome, ChainError> {
712        #[cfg(with_metrics)]
713        let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency();
714        chain.system.timestamp.set(block.timestamp);
715
716        let policy = chain
717            .system
718            .current_committee()
719            .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
720            .1
721            .policy()
722            .clone();
723
724        let mut resource_controller = ResourceController::new(
725            Arc::new(policy),
726            ResourceTracker::default(),
727            block.authenticated_signer,
728        );
729
730        for blob in published_blobs {
731            let blob_id = blob.id();
732            resource_controller
733                .policy()
734                .check_blob_size(blob.content())
735                .with_execution_context(ChainExecutionContext::Block)?;
736            chain.system.used_blobs.insert(&blob_id)?;
737        }
738
739        // Execute each incoming bundle as a transaction, then each operation.
740        // Collect messages, events and oracle responses, each as one list per transaction.
741        let mut block_execution_tracker = BlockExecutionTracker::new(
742            &mut resource_controller,
743            published_blobs
744                .iter()
745                .map(|blob| (blob.id(), blob))
746                .collect(),
747            local_time,
748            replaying_oracle_responses,
749            block,
750        )?;
751
752        for transaction in block.transactions() {
753            let chain_execution_context =
754                block_execution_tracker.chain_execution_context(&transaction);
755            let mut txn_tracker = block_execution_tracker.new_transaction_tracker()?;
756            match transaction {
757                Transaction::ReceiveMessages(incoming_bundle) => {
758                    block_execution_tracker
759                        .resource_controller_mut()
760                        .track_block_size_of(&incoming_bundle)
761                        .with_execution_context(chain_execution_context)?;
762                    for (message_id, posted_message) in incoming_bundle.messages_and_ids() {
763                        Box::pin(Self::execute_message_in_block(
764                            chain,
765                            message_id,
766                            posted_message,
767                            incoming_bundle,
768                            block,
769                            round,
770                            &mut txn_tracker,
771                            block_execution_tracker.resource_controller_mut(),
772                        ))
773                        .await?;
774                    }
775                }
776                Transaction::ExecuteOperation(operation) => {
777                    block_execution_tracker
778                        .resource_controller_mut()
779                        .with_state(&mut chain.system)
780                        .await?
781                        .track_block_size_of(&operation)
782                        .with_execution_context(chain_execution_context)?;
783                    #[cfg(with_metrics)]
784                    let _operation_latency = metrics::OPERATION_EXECUTION_LATENCY.measure_latency();
785                    let context = OperationContext {
786                        chain_id: block.chain_id,
787                        height: block.height,
788                        round,
789                        authenticated_signer: block.authenticated_signer,
790                        authenticated_caller_id: None,
791                        timestamp: block.timestamp,
792                    };
793                    Box::pin(chain.execute_operation(
794                        context,
795                        operation.clone(),
796                        &mut txn_tracker,
797                        block_execution_tracker.resource_controller_mut(),
798                    ))
799                    .await
800                    .with_execution_context(chain_execution_context)?;
801                    block_execution_tracker
802                        .resource_controller_mut()
803                        .with_state(&mut chain.system)
804                        .await?
805                        .track_operation(operation)
806                        .with_execution_context(chain_execution_context)?;
807                }
808            }
809
810            let txn_outcome = txn_tracker
811                .into_outcome()
812                .with_execution_context(chain_execution_context)?;
813
814            block_execution_tracker
815                .process_txn_outcome(&txn_outcome, &mut chain.system, chain_execution_context)
816                .await?;
817        }
818
819        let recipients = block_execution_tracker.recipients();
820        let mut previous_message_blocks = BTreeMap::new();
821        for recipient in recipients {
822            if let Some(height) = previous_message_blocks_view.get(&recipient).await? {
823                let hash = confirmed_log
824                    .get(usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?)
825                    .await?
826                    .ok_or_else(|| {
827                        ChainError::InternalError("missing entry in confirmed_log".into())
828                    })?;
829                previous_message_blocks.insert(recipient, hash);
830            }
831        }
832
833        let state_hash = {
834            #[cfg(with_metrics)]
835            let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency();
836            chain.crypto_hash().await?
837        };
838
839        let (messages, oracle_responses, events, blobs, operation_results) =
840            block_execution_tracker.finalize();
841
842        Ok(BlockExecutionOutcome {
843            messages,
844            previous_message_blocks,
845            state_hash,
846            oracle_responses,
847            events,
848            blobs,
849            operation_results,
850        })
851    }
852
853    /// Executes a block: first the incoming messages, then the main operation.
854    /// Does not update chain state other than the execution state.
855    pub async fn execute_block(
856        &mut self,
857        block: &ProposedBlock,
858        local_time: Timestamp,
859        round: Option<u32>,
860        published_blobs: &[Blob],
861        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
862    ) -> Result<BlockExecutionOutcome, ChainError> {
863        assert_eq!(
864            block.chain_id,
865            self.execution_state.context().extra().chain_id()
866        );
867
868        self.ensure_is_active(local_time).await?;
869
870        let chain_timestamp = *self.execution_state.system.timestamp.get();
871        ensure!(
872            chain_timestamp <= block.timestamp,
873            ChainError::InvalidBlockTimestamp {
874                parent: chain_timestamp,
875                new: block.timestamp
876            }
877        );
878        ensure!(
879            !block.incoming_bundles.is_empty() || !block.operations.is_empty(),
880            ChainError::EmptyBlock
881        );
882
883        ensure!(
884            block.published_blob_ids()
885                == published_blobs
886                    .iter()
887                    .map(|blob| blob.id())
888                    .collect::<BTreeSet<_>>(),
889            ChainError::InternalError("published_blobs mismatch".to_string())
890        );
891
892        if *self.execution_state.system.closed.get() {
893            ensure!(
894                !block.incoming_bundles.is_empty() && block.has_only_rejected_messages(),
895                ChainError::ClosedChain
896            );
897        }
898
899        Self::check_app_permissions(
900            self.execution_state.system.application_permissions.get(),
901            block,
902        )?;
903
904        Self::execute_block_inner(
905            &mut self.execution_state,
906            &self.confirmed_log,
907            &self.previous_message_blocks,
908            block,
909            local_time,
910            round,
911            published_blobs,
912            replaying_oracle_responses,
913        )
914        .await
915    }
916
917    /// Applies an execution outcome to the chain, updating the outboxes, state hash and chain
918    /// manager. This does not touch the execution state itself, which must be updated separately.
919    pub async fn apply_confirmed_block(
920        &mut self,
921        block: &ConfirmedBlock,
922        local_time: Timestamp,
923    ) -> Result<(), ChainError> {
924        let hash = block.inner().hash();
925        let block = block.inner().inner();
926        self.execution_state_hash.set(Some(block.header.state_hash));
927        let recipients = self.process_outgoing_messages(block).await?;
928
929        for recipient in recipients {
930            self.previous_message_blocks
931                .insert(&recipient, block.header.height)?;
932        }
933        // Last, reset the consensus state based on the current ownership.
934        self.reset_chain_manager(block.header.height.try_add_one()?, local_time)?;
935
936        // Advance to next block height.
937        let tip = self.tip_state.get_mut();
938        tip.block_hash = Some(hash);
939        tip.next_block_height.try_add_assign_one()?;
940        tip.update_counters(
941            &block.body.incoming_bundles,
942            &block.body.operations,
943            &block.body.messages,
944        )?;
945        self.confirmed_log.push(hash);
946        self.preprocessed_blocks.remove(&block.header.height)?;
947        Ok(())
948    }
949
950    /// Adds a block to `preprocessed_blocks`, and updates the outboxes where possible.
951    pub async fn preprocess_block(&mut self, block: &ConfirmedBlock) -> Result<(), ChainError> {
952        let hash = block.inner().hash();
953        let block = block.inner().inner();
954        let height = block.header.height;
955        if height < self.tip_state.get().next_block_height {
956            return Ok(());
957        }
958        self.process_outgoing_messages(block).await?;
959        self.preprocessed_blocks.insert(&height, hash)?;
960        Ok(())
961    }
962
963    /// Executes a message as part of an incoming bundle in a block.
964    #[expect(clippy::too_many_arguments)]
965    async fn execute_message_in_block(
966        chain: &mut ExecutionStateView<C>,
967        message_id: MessageId,
968        posted_message: &PostedMessage,
969        incoming_bundle: &IncomingBundle,
970        block: &ProposedBlock,
971        round: Option<u32>,
972        txn_tracker: &mut TransactionTracker,
973        resource_controller: &mut ResourceController<Option<AccountOwner>>,
974    ) -> Result<(), ChainError> {
975        #[cfg(with_metrics)]
976        let _message_latency = metrics::MESSAGE_EXECUTION_LATENCY.measure_latency();
977        let context = MessageContext {
978            chain_id: block.chain_id,
979            is_bouncing: posted_message.is_bouncing(),
980            height: block.height,
981            round,
982            message_id,
983            authenticated_signer: posted_message.authenticated_signer,
984            refund_grant_to: posted_message.refund_grant_to,
985            timestamp: block.timestamp,
986        };
987        let mut grant = posted_message.grant;
988        match incoming_bundle.action {
989            MessageAction::Accept => {
990                let chain_execution_context =
991                    ChainExecutionContext::IncomingBundle(txn_tracker.transaction_index());
992                // Once a chain is closed, accepting incoming messages is not allowed.
993                ensure!(!chain.system.closed.get(), ChainError::ClosedChain);
994
995                Box::pin(chain.execute_message(
996                    context,
997                    posted_message.message.clone(),
998                    (grant > Amount::ZERO).then_some(&mut grant),
999                    txn_tracker,
1000                    resource_controller,
1001                ))
1002                .await
1003                .with_execution_context(chain_execution_context)?;
1004                chain
1005                    .send_refund(context, grant, txn_tracker)
1006                    .await
1007                    .with_execution_context(chain_execution_context)?;
1008            }
1009            MessageAction::Reject => {
1010                // If rejecting a message fails, the entire block proposal should be
1011                // scrapped.
1012                ensure!(
1013                    !posted_message.is_protected() || *chain.system.closed.get(),
1014                    ChainError::CannotRejectMessage {
1015                        chain_id: block.chain_id,
1016                        origin: incoming_bundle.origin,
1017                        posted_message: Box::new(posted_message.clone()),
1018                    }
1019                );
1020                if posted_message.is_tracked() {
1021                    // Bounce the message.
1022                    chain
1023                        .bounce_message(context, grant, posted_message.message.clone(), txn_tracker)
1024                        .await
1025                        .with_execution_context(ChainExecutionContext::Block)?;
1026                } else {
1027                    // Nothing to do except maybe refund the grant.
1028                    chain
1029                        .send_refund(context, grant, txn_tracker)
1030                        .await
1031                        .with_execution_context(ChainExecutionContext::Block)?;
1032                }
1033            }
1034        }
1035        Ok(())
1036    }
1037
1038    /// Returns whether this is a child chain.
1039    pub fn is_child(&self) -> bool {
1040        let Some(description) = self.execution_state.system.description.get() else {
1041            // Root chains are always initialized, so this must be a child chain.
1042            return true;
1043        };
1044        description.is_child()
1045    }
1046
1047    /// Verifies that the block is valid according to the chain's application permission settings.
1048    fn check_app_permissions(
1049        app_permissions: &ApplicationPermissions,
1050        block: &ProposedBlock,
1051    ) -> Result<(), ChainError> {
1052        let mut mandatory = HashSet::<ApplicationId>::from_iter(
1053            app_permissions.mandatory_applications.iter().cloned(),
1054        );
1055        for operation in &block.operations {
1056            if operation.is_exempt_from_permissions() {
1057                mandatory.clear();
1058                continue;
1059            }
1060            ensure!(
1061                app_permissions.can_execute_operations(&operation.application_id()),
1062                ChainError::AuthorizedApplications(
1063                    app_permissions.execute_operations.clone().unwrap()
1064                )
1065            );
1066            if let Operation::User { application_id, .. } = operation {
1067                mandatory.remove(application_id);
1068            }
1069        }
1070        for pending in block.incoming_messages() {
1071            if mandatory.is_empty() {
1072                break;
1073            }
1074            if let Message::User { application_id, .. } = &pending.message {
1075                mandatory.remove(application_id);
1076            }
1077        }
1078        ensure!(
1079            mandatory.is_empty(),
1080            ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
1081        );
1082        Ok(())
1083    }
1084
1085    /// Returns the hashes of all blocks in the given range. Returns an error if we are missing
1086    /// any of those blocks.
1087    pub async fn block_hashes(
1088        &self,
1089        range: impl RangeBounds<BlockHeight>,
1090    ) -> Result<Vec<CryptoHash>, ChainError> {
1091        let next_height = self.tip_state.get().next_block_height;
1092        // If the range is not empty, it can always be represented as start..=end.
1093        let Some((start, end)) = range.to_inclusive() else {
1094            return Ok(Vec::new());
1095        };
1096        let mut hashes = if let Ok(last_height) = next_height.try_sub_one() {
1097            let usize_start = usize::try_from(start)?;
1098            let usize_end = usize::try_from(end.min(last_height))?;
1099            self.confirmed_log.read(usize_start..=usize_end).await?
1100        } else {
1101            Vec::new()
1102        };
1103        for height in start.max(next_height).0..=end.0 {
1104            hashes.push(
1105                self.preprocessed_blocks
1106                    .get(&BlockHeight(height))
1107                    .await?
1108                    .ok_or_else(|| {
1109                        ChainError::InternalError("missing entry in preprocessed_blocks".into())
1110                    })?,
1111            );
1112        }
1113        Ok(hashes)
1114    }
1115
1116    /// Resets the chain manager for the next block height.
1117    fn reset_chain_manager(
1118        &mut self,
1119        next_height: BlockHeight,
1120        local_time: Timestamp,
1121    ) -> Result<(), ChainError> {
1122        let maybe_committee = self.execution_state.system.current_committee().into_iter();
1123        let ownership = self.execution_state.system.ownership.get().clone();
1124        let fallback_owners =
1125            maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights());
1126        self.pending_validated_blobs.clear();
1127        self.pending_proposed_blobs.clear();
1128        self.manager
1129            .reset(ownership, next_height, local_time, fallback_owners)
1130    }
1131
1132    /// Updates the outboxes with the messages sent in the block.
1133    ///
1134    /// Returns the set of all recipients.
1135    async fn process_outgoing_messages(
1136        &mut self,
1137        block: &Block,
1138    ) -> Result<Vec<ChainId>, ChainError> {
1139        // Record the messages of the execution. Messages are understood within an
1140        // application.
1141        let recipients = block.recipients();
1142        let block_height = block.header.height;
1143        let next_height = self.tip_state.get().next_block_height;
1144
1145        // Update the outboxes.
1146        let outbox_counters = self.outbox_counters.get_mut();
1147        let targets = recipients.into_iter().collect::<Vec<_>>();
1148        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1149        for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1150            if block_height > next_height {
1151                // Find the hash of the block that was most recently added to the outbox.
1152                let prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok() {
1153                    Some(height) if height < next_height => {
1154                        let index =
1155                            usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
1156                        Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
1157                            ChainError::InternalError("missing entry in confirmed_log".into())
1158                        })?)
1159                    }
1160                    Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
1161                        || ChainError::InternalError("missing entry in preprocessed_blocks".into()),
1162                    )?),
1163                    None => None,
1164                };
1165                // Schedule only if no block is missing that sent something to the same recipient.
1166                if prev_hash.as_ref() != block.body.previous_message_blocks.get(target) {
1167                    continue;
1168                }
1169            }
1170            if outbox.schedule_message(block_height)? {
1171                *outbox_counters.entry(block_height).or_default() += 1;
1172            }
1173        }
1174
1175        #[cfg(with_metrics)]
1176        metrics::NUM_OUTBOXES
1177            .with_label_values(&[])
1178            .observe(self.outboxes.count().await? as f64);
1179        Ok(targets)
1180    }
1181}
1182
/// Checks that the serialized size of an empty first block equals `EMPTY_BLOCK_SIZE`.
#[test]
fn empty_block_size() {
    let chain_id = linera_execution::test_utils::dummy_chain_description(0).id();
    let empty_block = crate::block::Block::new(
        crate::test::make_first_block(chain_id),
        crate::data_types::BlockExecutionOutcome::default(),
    );
    let size = bcs::serialized_size(&empty_block).unwrap();
    assert_eq!(size, EMPTY_BLOCK_SIZE);
}