linera_chain/
chain.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6    ops::RangeBounds,
7    sync::Arc,
8};
9
10use futures::stream::{self, StreamExt, TryStreamExt};
11use linera_base::{
12    crypto::{CryptoHash, ValidatorPublicKey},
13    data_types::{
14        ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight,
15        BlockHeightRangeBounds as _, Epoch, OracleResponse, Timestamp,
16    },
17    ensure,
18    identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
19    ownership::ChainOwnership,
20};
21use linera_execution::{
22    committee::Committee, ExecutionRuntimeContext, ExecutionStateView, Message, Operation,
23    OutgoingMessage, Query, QueryContext, QueryOutcome, ResourceController, ResourceTracker,
24    ServiceRuntimeEndpoint, TransactionTracker,
25};
26use linera_views::{
27    bucket_queue_view::BucketQueueView,
28    context::Context,
29    log_view::LogView,
30    map_view::MapView,
31    reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
32    register_view::RegisterView,
33    set_view::SetView,
34    store::ReadableKeyValueStore as _,
35    views::{ClonableView, CryptoHashView, RootView, View},
36};
37use serde::{Deserialize, Serialize};
38
39use crate::{
40    block::{Block, ConfirmedBlock},
41    block_tracker::BlockExecutionTracker,
42    data_types::{
43        BlockExecutionOutcome, ChainAndHeight, IncomingBundle, MessageBundle, ProposedBlock,
44    },
45    inbox::{Cursor, InboxError, InboxStateView},
46    manager::ChainManager,
47    outbox::OutboxStateView,
48    pending_blobs::PendingBlobsView,
49    ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
50};
51
52#[cfg(test)]
53#[path = "unit_tests/chain_tests.rs"]
54mod chain_tests;
55
56#[cfg(with_metrics)]
57use linera_base::prometheus_util::MeasureLatency;
58
/// Prometheus metrics recorded while executing blocks and maintaining the chain state.
///
/// The whole module is gated on the `with_metrics` cfg, so the items inside it do not need
/// (and should not repeat) that attribute.
#[cfg(with_metrics)]
pub(crate) mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, exponential_bucket_latencies, register_histogram_vec,
        register_int_counter_vec,
    };
    use linera_execution::ResourceTracker;
    use prometheus::{HistogramVec, IntCounterVec};

    /// Counter `num_blocks_executed`; incremented once per call to [`track_block_metrics`].
    pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
    });

    /// Histogram `block_execution_latency`.
    pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "block_execution_latency",
            "Block execution latency",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// Histogram `message_execution_latency`.
    pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "message_execution_latency",
            "Message execution latency",
            &[],
            exponential_bucket_latencies(50.0),
        )
    });

    /// Histogram `operation_execution_latency`.
    pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "operation_execution_latency",
            "Operation execution latency",
            &[],
            exponential_bucket_latencies(50.0),
        )
    });

    /// Histogram `wasm_fuel_used_per_block`; fed from `ResourceTracker::wasm_fuel`.
    pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "wasm_fuel_used_per_block",
            "Wasm fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 1_000_000.0),
        )
    });

    /// Histogram `evm_fuel_used_per_block`; fed from `ResourceTracker::evm_fuel`.
    pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "evm_fuel_used_per_block",
            "EVM fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 1_000_000.0),
        )
    });

    /// Histogram `vm_num_reads_per_block`; fed from `ResourceTracker::read_operations`.
    pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_num_reads_per_block",
            "VM number of reads per block",
            &[],
            exponential_bucket_interval(0.1, 100.0),
        )
    });

    /// Histogram `vm_bytes_read_per_block`; fed from `ResourceTracker::bytes_read`.
    pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_read_per_block",
            "VM number of bytes read per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram `vm_bytes_written_per_block`; fed from `ResourceTracker::bytes_written`.
    pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_written_per_block",
            "VM number of bytes written per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram `state_hash_computation_latency`.
    pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "state_hash_computation_latency",
            "Time to recompute the state hash",
            &[],
            exponential_bucket_latencies(500.0),
        )
    });

    /// Histogram `num_inboxes`; observed with the current number of inboxes of a chain.
    pub static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_inboxes",
            "Number of inboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Histogram `num_outboxes`; observed with the current number of outboxes of a chain.
    pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_outboxes",
            "Number of outboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Tracks block execution metrics in Prometheus.
    ///
    /// Increments [`NUM_BLOCKS_EXECUTED`] and records one observation in each per-block
    /// resource histogram, using the totals accumulated in `tracker`.
    pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
        NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
        // Prometheus histograms take `f64` samples, hence the casts below.
        WASM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.wasm_fuel as f64);
        EVM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.evm_fuel as f64);
        VM_NUM_READS_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.read_operations as f64);
        VM_BYTES_READ_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_read as f64);
        VM_BYTES_WRITTEN_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_written as f64);
    }
}
194
/// The BCS-serialized size of an empty [`Block`].
///
/// NOTE(review): this is a hard-coded byte count; it presumably has to be updated whenever
/// the serialized layout of [`Block`] changes — confirm that a test guards it.
pub(crate) const EMPTY_BLOCK_SIZE: usize = 95;
197
/// An origin, cursor and timestamp of a unskippable bundle in our inbox.
// NOTE(review): with `with_graphql`, the `SimpleObject` derive presumably exports the `///`
// comments of this type as schema descriptions — confirm before rewording them (e.g. the
// "a unskippable" typo above), since that may require regenerating a checked-in schema.
//
// Values of this type are the elements of `ChainStateView::unskippable_bundles`. They are
// created in `ChainStateView::receive_message_bundle`, with `seen` set to the `local_time`
// passed to that call.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimestampedBundleInInbox {
    /// The origin and cursor of the bundle.
    pub entry: BundleInInbox,
    /// The timestamp when the bundle was added to the inbox.
    pub seen: Timestamp,
}
207
/// An origin and cursor of a unskippable bundle that is no longer in our inbox.
// NOTE(review): with `with_graphql`, the `SimpleObject` derive presumably exports the `///`
// comments of this type as schema descriptions — confirm before rewording them.
//
// Despite the summary above, this type is used in two places: as the `entry` of a
// `TimestampedBundleInInbox` (a bundle queued in `unskippable_bundles`) and as the element
// type of `ChainStateView::removed_unskippable_bundles`. It derives `Hash` and `Eq` so that
// it can be stored in a `HashSet`, as done in `remove_bundles_from_inboxes`.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct BundleInInbox {
    /// The origin from which we received the bundle.
    pub origin: ChainId,
    /// The cursor of the bundle in the inbox.
    pub cursor: Cursor,
}
217
218impl BundleInInbox {
219    fn new(origin: ChainId, bundle: &MessageBundle) -> Self {
220        BundleInInbox {
221            cursor: Cursor::from(bundle),
222            origin,
223        }
224    }
225}
226
// Bucket size of the `unskippable_bundles` queue. A `TimestampedBundleInInbox` is a
// relatively small type, so storing 100 of them per bucket seems reasonable.
const TIMESTAMPBUNDLE_BUCKET_SIZE: usize = 100;
230
/// A view accessing the state of a chain.
// NOTE(review): with `with_graphql`, the `SimpleObject` derive presumably exports the `///`
// comments of this type and its fields as schema descriptions — confirm before rewording
// them. Additional maintainer notes below therefore use plain `//` comments.
#[cfg_attr(
    with_graphql,
    derive(async_graphql::SimpleObject),
    graphql(cache_control(no_cache))
)]
#[derive(Debug, RootView, ClonableView)]
pub struct ChainStateView<C>
where
    C: Clone + Context + Send + Sync + 'static,
{
    /// Execution state, including system and user applications.
    pub execution_state: ExecutionStateView<C>,
    /// Hash of the execution state.
    // Set in `ensure_is_active` right after the chain has been initialized.
    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,

    /// Block-chaining state.
    pub tip_state: RegisterView<C, ChainTipState>,

    /// Consensus state.
    pub manager: ChainManager<C>,
    /// Pending validated block that is still missing blobs.
    /// The incomplete set of blobs for the pending validated block.
    pub pending_validated_blobs: PendingBlobsView<C>,
    /// The incomplete sets of blobs for upcoming proposals.
    pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,

    /// Hashes of all certified blocks for this sender.
    /// This ends with `block_hash` and has length `usize::from(next_block_height)`.
    // `block_hash` and `next_block_height` presumably refer to the fields of the
    // `ChainTipState` stored in `tip_state` — confirm.
    pub confirmed_log: LogView<C, CryptoHash>,
    /// Sender chain and height of all certified blocks known as a receiver (local ordering).
    pub received_log: LogView<C, ChainAndHeight>,
    /// The number of `received_log` entries we have synchronized, for each validator.
    // Only ever moved forward; see `update_received_certificate_trackers`.
    pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,

    /// Mailboxes used to receive messages indexed by their origin.
    pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
    /// A queue of unskippable bundles, with the timestamp when we added them to the inbox.
    pub unskippable_bundles:
        BucketQueueView<C, TimestampedBundleInInbox, TIMESTAMPBUNDLE_BUCKET_SIZE>,
    /// Unskippable bundles that have been removed but are still in the queue.
    // Maintained by `remove_bundles_from_inboxes`, which only pops `unskippable_bundles`
    // from the front and records out-of-order removals here.
    pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
    /// The heights of previous blocks that sent messages to the same recipients.
    pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
    /// The heights of previous blocks that published events to the same streams.
    pub previous_event_blocks: MapView<C, StreamId, BlockHeight>,
    /// Mailboxes used to send messages, indexed by their target.
    pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
    /// Number of outgoing messages in flight for each block height.
    /// We use a `RegisterView` to prioritize speed for small maps.
    // Entries that reach zero are removed in `mark_messages_as_received`, so the first key
    // is the lowest height with a nonzero counter (see `all_messages_delivered_up_to`).
    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
    /// Outboxes with at least one pending message. This allows us to avoid loading all outboxes.
    // A target is removed in `mark_messages_as_received` once its outbox queue is empty.
    pub nonempty_outboxes: RegisterView<C, BTreeSet<ChainId>>,

    /// Blocks that have been verified but not executed yet, and that may not be contiguous.
    pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,
}
288
/// Block-chaining state.
// NOTE(review): with `with_graphql`, the `SimpleObject` derive presumably exports the `///`
// comments of this type as schema descriptions — confirm before rewording them.
//
// `block_hash` and `next_block_height` are what `verify_block_chaining` compares a proposed
// block against. The three `num_*` fields are running totals that `update_counters`
// advances with overflow-checked additions.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct ChainTipState {
    /// Hash of the latest certified block in this chain, if any.
    pub block_hash: Option<CryptoHash>,
    /// Sequence number tracking blocks.
    pub next_block_height: BlockHeight,
    /// Number of incoming message bundles.
    pub num_incoming_bundles: u32,
    /// Number of operations.
    pub num_operations: u32,
    /// Number of outgoing messages.
    pub num_outgoing_messages: u32,
}
304
305impl ChainTipState {
306    /// Checks that the proposed block is suitable, i.e. at the expected height and with the
307    /// expected parent.
308    pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
309        ensure!(
310            new_block.height == self.next_block_height,
311            ChainError::UnexpectedBlockHeight {
312                expected_block_height: self.next_block_height,
313                found_block_height: new_block.height
314            }
315        );
316        ensure!(
317            new_block.previous_block_hash == self.block_hash,
318            ChainError::UnexpectedPreviousBlockHash
319        );
320        Ok(())
321    }
322
323    /// Returns `true` if the validated block's height is below the tip height. Returns an error if
324    /// it is higher than the tip.
325    pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
326        ensure!(
327            self.next_block_height >= height,
328            ChainError::MissingEarlierBlocks {
329                current_block_height: self.next_block_height,
330            }
331        );
332        Ok(self.next_block_height > height)
333    }
334
335    /// Checks if the measurement counters would be valid.
336    pub fn update_counters(
337        &mut self,
338        incoming_bundles: &[IncomingBundle],
339        operations: &[Operation],
340        messages: &[Vec<OutgoingMessage>],
341    ) -> Result<(), ChainError> {
342        let num_incoming_bundles =
343            u32::try_from(incoming_bundles.len()).map_err(|_| ArithmeticError::Overflow)?;
344        self.num_incoming_bundles = self
345            .num_incoming_bundles
346            .checked_add(num_incoming_bundles)
347            .ok_or(ArithmeticError::Overflow)?;
348
349        let num_operations =
350            u32::try_from(operations.len()).map_err(|_| ArithmeticError::Overflow)?;
351        self.num_operations = self
352            .num_operations
353            .checked_add(num_operations)
354            .ok_or(ArithmeticError::Overflow)?;
355
356        let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
357            .map_err(|_| ArithmeticError::Overflow)?;
358        self.num_outgoing_messages = self
359            .num_outgoing_messages
360            .checked_add(num_outgoing_messages)
361            .ok_or(ArithmeticError::Overflow)?;
362
363        Ok(())
364    }
365}
366
367impl<C> ChainStateView<C>
368where
369    C: Context + Clone + Send + Sync + 'static,
370    C::Extra: ExecutionRuntimeContext,
371{
372    /// Returns the [`ChainId`] of the chain this [`ChainStateView`] represents.
373    pub fn chain_id(&self) -> ChainId {
374        self.context().extra().chain_id()
375    }
376
377    pub async fn query_application(
378        &mut self,
379        local_time: Timestamp,
380        query: Query,
381        service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
382    ) -> Result<QueryOutcome, ChainError> {
383        let context = QueryContext {
384            chain_id: self.chain_id(),
385            next_block_height: self.tip_state.get().next_block_height,
386            local_time,
387        };
388        self.execution_state
389            .query_application(context, query, service_runtime_endpoint)
390            .await
391            .with_execution_context(ChainExecutionContext::Query)
392    }
393
394    pub async fn describe_application(
395        &mut self,
396        application_id: ApplicationId,
397    ) -> Result<ApplicationDescription, ChainError> {
398        self.execution_state
399            .system
400            .describe_application(application_id, &mut TransactionTracker::default())
401            .await
402            .with_execution_context(ChainExecutionContext::DescribeApplication)
403    }
404
405    pub async fn mark_messages_as_received(
406        &mut self,
407        target: &ChainId,
408        height: BlockHeight,
409    ) -> Result<bool, ChainError> {
410        let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
411        let updates = outbox.mark_messages_as_received(height).await?;
412        if updates.is_empty() {
413            return Ok(false);
414        }
415        for update in updates {
416            let counter = self
417                .outbox_counters
418                .get_mut()
419                .get_mut(&update)
420                .ok_or_else(|| {
421                    ChainError::InternalError("message counter should be present".into())
422                })?;
423            *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
424            if *counter == 0 {
425                // Important for the test in `all_messages_delivered_up_to`.
426                self.outbox_counters.get_mut().remove(&update);
427            }
428        }
429        if outbox.queue.count() == 0 {
430            self.nonempty_outboxes.get_mut().remove(target);
431            // If the outbox is empty and not ahead of the executed blocks, remove it.
432            if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
433                self.outboxes.remove_entry(target)?;
434            }
435        }
436        #[cfg(with_metrics)]
437        metrics::NUM_OUTBOXES
438            .with_label_values(&[])
439            .observe(self.outboxes.count().await? as f64);
440        Ok(true)
441    }
442
443    /// Returns true if there are no more outgoing messages in flight up to the given
444    /// block height.
445    pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
446        tracing::debug!(
447            "Messages left in {:.8}'s outbox: {:?}",
448            self.chain_id(),
449            self.outbox_counters.get()
450        );
451        if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
452            key > &height
453        } else {
454            true
455        }
456    }
457
458    /// Invariant for the states of active chains.
459    pub fn is_active(&self) -> bool {
460        self.execution_state.system.is_active()
461    }
462
463    /// Invariant for the states of active chains.
464    pub async fn ensure_is_active(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
465        // Initialize ourselves.
466        if self
467            .execution_state
468            .system
469            .initialize_chain(self.chain_id())
470            .await
471            .with_execution_context(ChainExecutionContext::Block)?
472        {
473            // the chain was already initialized
474            return Ok(());
475        }
476        // Recompute the state hash.
477        let hash = self.execution_state.crypto_hash().await?;
478        self.execution_state_hash.set(Some(hash));
479        let maybe_committee = self.execution_state.system.current_committee().into_iter();
480        // Last, reset the consensus state based on the current ownership.
481        self.manager.reset(
482            self.execution_state.system.ownership.get().clone(),
483            BlockHeight(0),
484            local_time,
485            maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights()),
486        )?;
487        self.save().await?;
488        Ok(())
489    }
490
491    /// Verifies that this chain is up-to-date and all the messages executed ahead of time
492    /// have been properly received by now.
493    pub async fn validate_incoming_bundles(&self) -> Result<(), ChainError> {
494        let chain_id = self.chain_id();
495        let pairs = self.inboxes.try_load_all_entries().await?;
496        let max_stream_queries = self.context().store().max_stream_queries();
497        let stream = stream::iter(pairs)
498            .map(|(origin, inbox)| async move {
499                if let Some(bundle) = inbox.removed_bundles.front().await? {
500                    return Err(ChainError::MissingCrossChainUpdate {
501                        chain_id,
502                        origin,
503                        height: bundle.height,
504                    });
505                }
506                Ok::<(), ChainError>(())
507            })
508            .buffer_unordered(max_stream_queries);
509        stream.try_collect::<Vec<_>>().await?;
510        Ok(())
511    }
512
513    pub async fn next_block_height_to_receive(
514        &self,
515        origin: &ChainId,
516    ) -> Result<BlockHeight, ChainError> {
517        let inbox = self.inboxes.try_load_entry(origin).await?;
518        match inbox {
519            Some(inbox) => inbox.next_block_height_to_receive(),
520            None => Ok(BlockHeight::ZERO),
521        }
522    }
523
524    pub async fn last_anticipated_block_height(
525        &self,
526        origin: &ChainId,
527    ) -> Result<Option<BlockHeight>, ChainError> {
528        let inbox = self.inboxes.try_load_entry(origin).await?;
529        match inbox {
530            Some(inbox) => match inbox.removed_bundles.back().await? {
531                Some(bundle) => Ok(Some(bundle.height)),
532                None => Ok(None),
533            },
534            None => Ok(None),
535        }
536    }
537
538    /// Attempts to process a new `bundle` of messages from the given `origin`. Returns an
539    /// internal error if the bundle doesn't appear to be new, based on the sender's
540    /// height. The value `local_time` is specific to each validator and only used for
541    /// round timeouts.
542    ///
543    /// Returns `true` if incoming `Subscribe` messages created new outbox entries.
544    pub async fn receive_message_bundle(
545        &mut self,
546        origin: &ChainId,
547        bundle: MessageBundle,
548        local_time: Timestamp,
549        add_to_received_log: bool,
550    ) -> Result<(), ChainError> {
551        assert!(!bundle.messages.is_empty());
552        let chain_id = self.chain_id();
553        tracing::trace!(
554            "Processing new messages to {chain_id:.8} from {origin} at height {}",
555            bundle.height,
556        );
557        let chain_and_height = ChainAndHeight {
558            chain_id: *origin,
559            height: bundle.height,
560        };
561
562        match self.ensure_is_active(local_time).await {
563            Ok(_) => (),
564            // if the only issue was that we couldn't initialize the chain because of a
565            // missing chain description blob, we might still want to update the inbox
566            Err(ChainError::ExecutionError(exec_err, _))
567                if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
568                if blobs.iter().all(|blob_id| {
569                    blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
570                })) => {}
571            err => {
572                return err;
573            }
574        }
575
576        // Process the inbox bundle and update the inbox state.
577        let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
578        #[cfg(with_metrics)]
579        metrics::NUM_INBOXES
580            .with_label_values(&[])
581            .observe(self.inboxes.count().await? as f64);
582        let entry = BundleInInbox::new(*origin, &bundle);
583        let skippable = bundle.is_skippable();
584        let newly_added = inbox
585            .add_bundle(bundle)
586            .await
587            .map_err(|error| match error {
588                InboxError::ViewError(error) => ChainError::ViewError(error),
589                error => ChainError::InternalError(format!(
590                    "while processing messages in certified block: {error}"
591                )),
592            })?;
593        if newly_added && !skippable {
594            let seen = local_time;
595            self.unskippable_bundles
596                .push_back(TimestampedBundleInInbox { entry, seen });
597        }
598
599        // Remember the certificate for future validator/client synchronizations.
600        if add_to_received_log {
601            self.received_log.push(chain_and_height);
602        }
603        Ok(())
604    }
605
606    /// Updates the `received_log` trackers.
607    pub fn update_received_certificate_trackers(
608        &mut self,
609        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
610    ) {
611        for (name, tracker) in new_trackers {
612            self.received_certificate_trackers
613                .get_mut()
614                .entry(name)
615                .and_modify(|t| {
616                    // Because several synchronizations could happen in parallel, we need to make
617                    // sure to never go backward.
618                    if tracker > *t {
619                        *t = tracker;
620                    }
621                })
622                .or_insert(tracker);
623        }
624    }
625
626    pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> {
627        self.execution_state
628            .system
629            .current_committee()
630            .ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
631    }
632
633    pub fn ownership(&self) -> &ChainOwnership {
634        self.execution_state.system.ownership.get()
635    }
636
    /// Removes the incoming message bundles in the block from the inboxes.
    ///
    /// `timestamp` is the timestamp that every bundle is checked against: a bundle whose own
    /// timestamp is later is rejected.
    ///
    /// Besides updating the inboxes, this keeps `unskippable_bundles` and
    /// `removed_unskippable_bundles` in sync: unskippable bundles that were present in their
    /// inbox are popped from the front of the queue where possible, and otherwise recorded
    /// in `removed_unskippable_bundles` until they reach the front.
    ///
    /// # Errors
    ///
    /// Returns [`ChainError::IncorrectBundleTimestamp`] if a bundle is newer than
    /// `timestamp`, and propagates errors from the inboxes and from the underlying views.
    pub async fn remove_bundles_from_inboxes(
        &mut self,
        timestamp: Timestamp,
        incoming_bundles: &[IncomingBundle],
    ) -> Result<(), ChainError> {
        let chain_id = self.chain_id();
        // Group the bundles by origin (preserving their order within each origin), so that
        // every inbox is loaded once.
        let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
        for IncomingBundle { bundle, origin, .. } in incoming_bundles {
            ensure!(
                bundle.timestamp <= timestamp,
                ChainError::IncorrectBundleTimestamp {
                    chain_id,
                    bundle_timestamp: bundle.timestamp,
                    block_timestamp: timestamp,
                }
            );
            let bundles = bundles_by_origin.entry(origin).or_default();
            bundles.push(bundle);
        }
        let origins = bundles_by_origin.keys().copied();
        // NOTE(review): the `zip` below assumes that `try_load_entries_mut` returns the
        // inboxes in the same order as the requested keys — confirm.
        let inboxes = self.inboxes.try_load_entries_mut(origins).await?;
        // Unskippable bundles removed by this call that were actually present in their inbox.
        let mut removed_unskippable = HashSet::new();
        for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
            tracing::trace!(
                "Removing {:?} from {chain_id:.8}'s inbox for {origin:}",
                bundles
                    .iter()
                    .map(|bundle| bundle.height)
                    .collect::<Vec<_>>()
            );
            for bundle in bundles {
                // Mark the message as processed in the inbox.
                // The `?` converts the `(chain_id, origin, error)` tuple into a `ChainError`.
                let was_present = inbox
                    .remove_bundle(bundle)
                    .await
                    .map_err(|error| (chain_id, *origin, error))?;
                if was_present && !bundle.is_skippable() {
                    removed_unskippable.insert(BundleInInbox::new(*origin, bundle));
                }
            }
        }
        if !removed_unskippable.is_empty() {
            // Delete all removed bundles from the front of the unskippable queue.
            let maybe_front = self.unskippable_bundles.front();
            // The queue is only pruned if its current front was removed by this call.
            if maybe_front.is_some_and(|ts_entry| removed_unskippable.remove(&ts_entry.entry)) {
                self.unskippable_bundles.delete_front().await?;
                // Keep popping while the new front was removed either by this call or by an
                // earlier one (in which case it is in `removed_unskippable_bundles`).
                while let Some(ts_entry) = self.unskippable_bundles.front() {
                    if !removed_unskippable.remove(&ts_entry.entry) {
                        if !self
                            .removed_unskippable_bundles
                            .contains(&ts_entry.entry)
                            .await?
                        {
                            // The front is still pending: stop here.
                            break;
                        }
                        // The entry leaves the queue, so it no longer needs to be tracked.
                        self.removed_unskippable_bundles.remove(&ts_entry.entry)?;
                    }
                    self.unskippable_bundles.delete_front().await?;
                }
            }
            // Whatever could not be popped is still somewhere in the queue: remember it as
            // removed.
            for entry in removed_unskippable {
                self.removed_unskippable_bundles.insert(&entry)?;
            }
        }
        #[cfg(with_metrics)]
        metrics::NUM_INBOXES
            .with_label_values(&[])
            .observe(self.inboxes.count().await? as f64);
        Ok(())
    }
708
709    /// Returns the chain IDs of all recipients for which a message is waiting in the outbox.
710    pub fn nonempty_outbox_chain_ids(&self) -> Vec<ChainId> {
711        self.nonempty_outboxes.get().iter().copied().collect()
712    }
713
714    /// Returns the outboxes for the given targets, or an error if any of them are missing.
715    pub async fn load_outboxes(
716        &self,
717        targets: &[ChainId],
718    ) -> Result<Vec<ReadGuardedView<OutboxStateView<C>>>, ChainError> {
719        let vec_of_options = self.outboxes.try_load_entries(targets).await?;
720        let optional_vec = vec_of_options.into_iter().collect::<Option<Vec<_>>>();
721        optional_vec.ok_or_else(|| ChainError::InternalError("Missing outboxes".into()))
722    }
723
    /// Executes a block: first the incoming messages, then the main operation.
    /// Does not update chain state other than the execution state.
    ///
    /// The transactions of `block` are executed in order against `chain`, and the results
    /// are assembled into a [`BlockExecutionOutcome`].
    ///
    /// * `chain`: the execution state to run the block on; it is modified in place.
    /// * `confirmed_log`: used to resolve block heights to block hashes.
    /// * `previous_message_blocks_view` / `previous_event_blocks_view`: give, for each
    ///   recipient or event stream, the height of the previous block that touched it.
    /// * `local_time`, `round`, `replaying_oracle_responses`: passed on to the
    ///   [`BlockExecutionTracker`] and to the execution of each transaction.
    /// * `published_blobs`: the blobs published by the block; each is size-checked against
    ///   the committee's policy and recorded as used.
    ///
    /// # Errors
    ///
    /// Returns [`ChainError::InactiveChain`] if `chain` has no current committee, an
    /// internal error if a previous block height has no entry in `confirmed_log`, and
    /// propagates execution and view errors.
    #[expect(clippy::too_many_arguments)]
    async fn execute_block_inner(
        chain: &mut ExecutionStateView<C>,
        confirmed_log: &LogView<C, CryptoHash>,
        previous_message_blocks_view: &MapView<C, ChainId, BlockHeight>,
        previous_event_blocks_view: &MapView<C, StreamId, BlockHeight>,
        block: &ProposedBlock,
        local_time: Timestamp,
        round: Option<u32>,
        published_blobs: &[Blob],
        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
    ) -> Result<BlockExecutionOutcome, ChainError> {
        // Kept alive until the function returns; presumably records the elapsed time when
        // dropped — confirm against `MeasureLatency`.
        #[cfg(with_metrics)]
        let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency();
        // The system timestamp becomes the block's timestamp before anything is executed.
        chain.system.timestamp.set(block.timestamp);

        // The resource policy comes from the current committee; without a committee the
        // chain is considered inactive.
        let policy = chain
            .system
            .current_committee()
            .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
            .1
            .policy()
            .clone();

        let mut resource_controller = ResourceController::new(
            Arc::new(policy),
            ResourceTracker::default(),
            block.authenticated_signer,
        );

        // Check the size of every published blob and mark it as used by this chain.
        for blob in published_blobs {
            let blob_id = blob.id();
            resource_controller
                .policy()
                .check_blob_size(blob.content())
                .with_execution_context(ChainExecutionContext::Block)?;
            chain.system.used_blobs.insert(&blob_id)?;
        }

        // Execute each incoming bundle as a transaction, then each operation.
        // Collect messages, events and oracle responses, each as one list per transaction.
        let mut block_execution_tracker = BlockExecutionTracker::new(
            &mut resource_controller,
            published_blobs
                .iter()
                .map(|blob| (blob.id(), blob))
                .collect(),
            local_time,
            replaying_oracle_responses,
            block,
        )?;

        for transaction in block.transactions() {
            block_execution_tracker
                .execute_transaction(transaction, round, chain)
                .await?;
        }

        // For every recipient of this block's messages, look up the previous block that
        // sent messages to it and resolve that height to a hash. Recipients without a
        // previous block are simply left out of the map.
        let recipients = block_execution_tracker.recipients();
        let mut previous_message_blocks = BTreeMap::new();
        for recipient in recipients {
            if let Some(height) = previous_message_blocks_view.get(&recipient).await? {
                let hash = confirmed_log
                    .get(usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?)
                    .await?
                    .ok_or_else(|| {
                        ChainError::InternalError("missing entry in confirmed_log".into())
                    })?;
                previous_message_blocks.insert(recipient, (hash, height));
            }
        }

        // Same lookup for every event stream this block published to.
        let streams = block_execution_tracker.event_streams();
        let mut previous_event_blocks = BTreeMap::new();
        for stream in streams {
            if let Some(height) = previous_event_blocks_view.get(&stream).await? {
                let hash = confirmed_log
                    .get(usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?)
                    .await?
                    .ok_or_else(|| {
                        ChainError::InternalError("missing entry in confirmed_log".into())
                    })?;
                previous_event_blocks.insert(stream, (hash, height));
            }
        }

        // Hash the execution state as it is after all transactions have run. The block
        // scope limits the latency guard to the hash computation.
        let state_hash = {
            #[cfg(with_metrics)]
            let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency();
            chain.crypto_hash().await?
        };

        // Consume the tracker to obtain the per-transaction results.
        let (messages, oracle_responses, events, blobs, operation_results) =
            block_execution_tracker.finalize();

        Ok(BlockExecutionOutcome {
            messages,
            previous_message_blocks,
            previous_event_blocks,
            state_hash,
            oracle_responses,
            events,
            blobs,
            operation_results,
        })
    }
832
833    /// Executes a block: first the incoming messages, then the main operation.
834    /// Does not update chain state other than the execution state.
835    pub async fn execute_block(
836        &mut self,
837        block: &ProposedBlock,
838        local_time: Timestamp,
839        round: Option<u32>,
840        published_blobs: &[Blob],
841        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
842    ) -> Result<BlockExecutionOutcome, ChainError> {
843        assert_eq!(
844            block.chain_id,
845            self.execution_state.context().extra().chain_id()
846        );
847
848        self.ensure_is_active(local_time).await?;
849
850        let chain_timestamp = *self.execution_state.system.timestamp.get();
851        ensure!(
852            chain_timestamp <= block.timestamp,
853            ChainError::InvalidBlockTimestamp {
854                parent: chain_timestamp,
855                new: block.timestamp
856            }
857        );
858        ensure!(
859            !block.incoming_bundles.is_empty() || !block.operations.is_empty(),
860            ChainError::EmptyBlock
861        );
862
863        ensure!(
864            block.published_blob_ids()
865                == published_blobs
866                    .iter()
867                    .map(|blob| blob.id())
868                    .collect::<BTreeSet<_>>(),
869            ChainError::InternalError("published_blobs mismatch".to_string())
870        );
871
872        if *self.execution_state.system.closed.get() {
873            ensure!(
874                !block.incoming_bundles.is_empty() && block.has_only_rejected_messages(),
875                ChainError::ClosedChain
876            );
877        }
878
879        Self::check_app_permissions(
880            self.execution_state.system.application_permissions.get(),
881            block,
882        )?;
883
884        Self::execute_block_inner(
885            &mut self.execution_state,
886            &self.confirmed_log,
887            &self.previous_message_blocks,
888            &self.previous_event_blocks,
889            block,
890            local_time,
891            round,
892            published_blobs,
893            replaying_oracle_responses,
894        )
895        .await
896    }
897
898    /// Applies an execution outcome to the chain, updating the outboxes, state hash and chain
899    /// manager. This does not touch the execution state itself, which must be updated separately.
900    pub async fn apply_confirmed_block(
901        &mut self,
902        block: &ConfirmedBlock,
903        local_time: Timestamp,
904    ) -> Result<(), ChainError> {
905        let hash = block.inner().hash();
906        let block = block.inner().inner();
907        self.execution_state_hash.set(Some(block.header.state_hash));
908        let recipients = self.process_outgoing_messages(block).await?;
909
910        for recipient in recipients {
911            self.previous_message_blocks
912                .insert(&recipient, block.header.height)?;
913        }
914        for event in block.body.events.iter().flatten() {
915            self.previous_event_blocks
916                .insert(&event.stream_id, block.header.height)?;
917        }
918        // Last, reset the consensus state based on the current ownership.
919        self.reset_chain_manager(block.header.height.try_add_one()?, local_time)?;
920
921        // Advance to next block height.
922        let tip = self.tip_state.get_mut();
923        tip.block_hash = Some(hash);
924        tip.next_block_height.try_add_assign_one()?;
925        tip.update_counters(
926            &block.body.incoming_bundles,
927            &block.body.operations,
928            &block.body.messages,
929        )?;
930        self.confirmed_log.push(hash);
931        self.preprocessed_blocks.remove(&block.header.height)?;
932        Ok(())
933    }
934
935    /// Adds a block to `preprocessed_blocks`, and updates the outboxes where possible.
936    pub async fn preprocess_block(&mut self, block: &ConfirmedBlock) -> Result<(), ChainError> {
937        let hash = block.inner().hash();
938        let block = block.inner().inner();
939        let height = block.header.height;
940        if height < self.tip_state.get().next_block_height {
941            return Ok(());
942        }
943        self.process_outgoing_messages(block).await?;
944        self.preprocessed_blocks.insert(&height, hash)?;
945        Ok(())
946    }
947
948    /// Returns whether this is a child chain.
949    pub fn is_child(&self) -> bool {
950        let Some(description) = self.execution_state.system.description.get() else {
951            // Root chains are always initialized, so this must be a child chain.
952            return true;
953        };
954        description.is_child()
955    }
956
957    /// Verifies that the block is valid according to the chain's application permission settings.
958    fn check_app_permissions(
959        app_permissions: &ApplicationPermissions,
960        block: &ProposedBlock,
961    ) -> Result<(), ChainError> {
962        let mut mandatory = HashSet::<ApplicationId>::from_iter(
963            app_permissions.mandatory_applications.iter().cloned(),
964        );
965        for operation in &block.operations {
966            if operation.is_exempt_from_permissions() {
967                mandatory.clear();
968                continue;
969            }
970            ensure!(
971                app_permissions.can_execute_operations(&operation.application_id()),
972                ChainError::AuthorizedApplications(
973                    app_permissions.execute_operations.clone().unwrap()
974                )
975            );
976            if let Operation::User { application_id, .. } = operation {
977                mandatory.remove(application_id);
978            }
979        }
980        for pending in block.incoming_messages() {
981            if mandatory.is_empty() {
982                break;
983            }
984            if let Message::User { application_id, .. } = &pending.message {
985                mandatory.remove(application_id);
986            }
987        }
988        ensure!(
989            mandatory.is_empty(),
990            ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
991        );
992        Ok(())
993    }
994
995    /// Returns the hashes of all blocks we have in the given range.
996    pub async fn block_hashes(
997        &self,
998        range: impl RangeBounds<BlockHeight>,
999    ) -> Result<Vec<CryptoHash>, ChainError> {
1000        let next_height = self.tip_state.get().next_block_height;
1001        // If the range is not empty, it can always be represented as start..=end.
1002        let Some((start, end)) = range.to_inclusive() else {
1003            return Ok(Vec::new());
1004        };
1005        // Everything up to (excluding) next_height is in confirmed_log.
1006        let mut hashes = if let Ok(last_height) = next_height.try_sub_one() {
1007            let usize_start = usize::try_from(start)?;
1008            let usize_end = usize::try_from(end.min(last_height))?;
1009            self.confirmed_log.read(usize_start..=usize_end).await?
1010        } else {
1011            Vec::new()
1012        };
1013        // Everything after (including) next_height in preprocessed_blocks if we have it.
1014        for height in start.max(next_height).0..=end.0 {
1015            if let Some(hash) = self.preprocessed_blocks.get(&BlockHeight(height)).await? {
1016                hashes.push(hash);
1017            }
1018        }
1019        Ok(hashes)
1020    }
1021
1022    /// Resets the chain manager for the next block height.
1023    fn reset_chain_manager(
1024        &mut self,
1025        next_height: BlockHeight,
1026        local_time: Timestamp,
1027    ) -> Result<(), ChainError> {
1028        let maybe_committee = self.execution_state.system.current_committee().into_iter();
1029        let ownership = self.execution_state.system.ownership.get().clone();
1030        let fallback_owners =
1031            maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights());
1032        self.pending_validated_blobs.clear();
1033        self.pending_proposed_blobs.clear();
1034        self.manager
1035            .reset(ownership, next_height, local_time, fallback_owners)
1036    }
1037
1038    /// Updates the outboxes with the messages sent in the block.
1039    ///
1040    /// Returns the set of all recipients.
1041    async fn process_outgoing_messages(
1042        &mut self,
1043        block: &Block,
1044    ) -> Result<Vec<ChainId>, ChainError> {
1045        // Record the messages of the execution. Messages are understood within an
1046        // application.
1047        let recipients = block.recipients();
1048        let block_height = block.header.height;
1049        let next_height = self.tip_state.get().next_block_height;
1050
1051        // Update the outboxes.
1052        let outbox_counters = self.outbox_counters.get_mut();
1053        let nonempty_outboxes = self.nonempty_outboxes.get_mut();
1054        let targets = recipients.into_iter().collect::<Vec<_>>();
1055        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1056        for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1057            if block_height > next_height {
1058                // There may be a gap in the chain before this block. We can only add it to this
1059                // outbox if the previous message to the same recipient has already been added.
1060                let maybe_prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok()
1061                {
1062                    // The block with the last added message has already been executed; look up its
1063                    // hash in the confirmed_log.
1064                    Some(height) if height < next_height => {
1065                        let index =
1066                            usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
1067                        Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
1068                            ChainError::InternalError("missing entry in confirmed_log".into())
1069                        })?)
1070                    }
1071                    // The block with last added message has not been executed yet. If we have it,
1072                    // it's in preprocessed_blocks.
1073                    Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
1074                        || ChainError::InternalError("missing entry in preprocessed_blocks".into()),
1075                    )?),
1076                    None => None, // No message to that sender was added yet.
1077                };
1078                // Only schedule if this block contains the next message for that recipient.
1079                match (
1080                    maybe_prev_hash,
1081                    block.body.previous_message_blocks.get(target),
1082                ) {
1083                    (None, None) => {
1084                        // No previous message block expected and none indicated by the outbox -
1085                        // all good
1086                    }
1087                    (Some(_), None) => {
1088                        // Outbox indicates there was a previous message block, but
1089                        // previous_message_blocks has no idea about it - possible bug
1090                        return Err(ChainError::InternalError(
1091                            "block indicates no previous message block,\
1092                            but we have one in the outbox"
1093                                .into(),
1094                        ));
1095                    }
1096                    (None, Some((_, prev_msg_block_height))) => {
1097                        // We have no previously processed block in the outbox, but we are
1098                        // expecting one - this could be due to an empty outbox having been pruned.
1099                        // Only process the outbox if the height of the previous message block is
1100                        // lower than the tip
1101                        if *prev_msg_block_height >= next_height {
1102                            continue;
1103                        }
1104                    }
1105                    (Some(ref prev_hash), Some((prev_msg_block_hash, _))) => {
1106                        // Only process the outbox if the hashes match.
1107                        if prev_hash != prev_msg_block_hash {
1108                            continue;
1109                        }
1110                    }
1111                }
1112            }
1113            if outbox.schedule_message(block_height)? {
1114                *outbox_counters.entry(block_height).or_default() += 1;
1115                nonempty_outboxes.insert(*target);
1116            }
1117        }
1118
1119        #[cfg(with_metrics)]
1120        metrics::NUM_OUTBOXES
1121            .with_label_values(&[])
1122            .observe(self.outboxes.count().await? as f64);
1123        Ok(targets)
1124    }
1125}
1126
#[test]
fn empty_block_size() {
    // Build the first block of a dummy chain with a default (empty) execution outcome and
    // check that its serialized size matches the constant.
    let chain_id = linera_execution::test_utils::dummy_chain_description(0).id();
    let proposed_block = crate::test::make_first_block(chain_id);
    let outcome = crate::data_types::BlockExecutionOutcome::default();
    let block = crate::block::Block::new(proposed_block, outcome);
    let size = bcs::serialized_size(&block).unwrap();
    assert_eq!(size, EMPTY_BLOCK_SIZE);
}