linera_chain/
chain.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6    sync::Arc,
7};
8
9use allocative::Allocative;
10use linera_base::{
11    crypto::{CryptoHash, ValidatorPublicKey},
12    data_types::{
13        ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Epoch,
14        OracleResponse, Timestamp,
15    },
16    ensure,
17    identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
18    ownership::ChainOwnership,
19};
20use linera_execution::{
21    committee::Committee, system::EPOCH_STREAM_NAME, ExecutionRuntimeContext, ExecutionStateView,
22    Message, Operation, OutgoingMessage, Query, QueryContext, QueryOutcome, ResourceController,
23    ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
24};
25use linera_views::{
26    context::Context,
27    log_view::LogView,
28    map_view::MapView,
29    reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
30    register_view::RegisterView,
31    views::{ClonableView, RootView, View},
32};
33use serde::{Deserialize, Serialize};
34use tracing::instrument;
35
36use crate::{
37    block::{Block, ConfirmedBlock},
38    block_tracker::BlockExecutionTracker,
39    data_types::{
40        BlockExecutionOutcome, ChainAndHeight, IncomingBundle, MessageBundle, ProposedBlock,
41        Transaction,
42    },
43    inbox::{InboxError, InboxStateView},
44    manager::ChainManager,
45    outbox::OutboxStateView,
46    pending_blobs::PendingBlobsView,
47    ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
48};
49
50#[cfg(test)]
51#[path = "unit_tests/chain_tests.rs"]
52mod chain_tests;
53
54#[cfg(with_metrics)]
55use linera_base::prometheus_util::MeasureLatency;
56
/// Prometheus metrics recorded while executing blocks and maintaining inboxes/outboxes.
///
/// Every metric is registered lazily on first access and uses an empty label set.
#[cfg(with_metrics)]
pub(crate) mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, exponential_bucket_latencies, register_histogram_vec,
        register_int_counter_vec,
    };
    use linera_execution::ResourceTracker;
    use prometheus::{HistogramVec, IntCounterVec};

    /// Counter incremented once per call to [`track_block_metrics`].
    pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
    });

    /// Histogram of the time spent executing a block.
    pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "block_execution_latency",
            "Block execution latency",
            &[],
            exponential_bucket_interval(50.0_f64, 10_000_000.0),
        )
    });

    /// Histogram of the time spent executing a message.
    pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "message_execution_latency",
            "Message execution latency",
            &[],
            exponential_bucket_interval(0.1_f64, 50_000.0),
        )
    });

    /// Histogram of the time spent executing an operation.
    pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "operation_execution_latency",
            "Operation execution latency",
            &[],
            exponential_bucket_interval(0.1_f64, 50_000.0),
        )
    });

    /// Histogram of the Wasm fuel reported by the resource tracker for a block.
    pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "wasm_fuel_used_per_block",
            "Wasm fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 1_000_000.0),
        )
    });

    /// Histogram of the EVM fuel reported by the resource tracker for a block.
    pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "evm_fuel_used_per_block",
            "EVM fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 1_000_000.0),
        )
    });

    /// Histogram of the number of read operations reported by the resource tracker for a block.
    pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_num_reads_per_block",
            "VM number of reads per block",
            &[],
            exponential_bucket_interval(0.1, 100.0),
        )
    });

    /// Histogram of the number of bytes read reported by the resource tracker for a block.
    pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_read_per_block",
            "VM number of bytes read per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the number of bytes written reported by the resource tracker for a block.
    pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_written_per_block",
            "VM number of bytes written per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the time needed to recompute the execution state hash.
    pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "state_hash_computation_latency",
            "Time to recompute the state hash",
            &[],
            exponential_bucket_latencies(500.0),
        )
    });

    /// Histogram of the number of inboxes of a chain, sampled when inboxes are modified.
    pub static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_inboxes",
            "Number of inboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Histogram of the number of outboxes of a chain, sampled when outboxes are modified.
    pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_outboxes",
            "Number of outboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Tracks block execution metrics in Prometheus.
    ///
    /// Increments [`NUM_BLOCKS_EXECUTED`] and records the Wasm fuel, EVM fuel, read
    /// operations, bytes read and bytes written from `tracker` in their per-block histograms.
    pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
        NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
        WASM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.wasm_fuel as f64);
        EVM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.evm_fuel as f64);
        VM_NUM_READS_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.read_operations as f64);
        VM_BYTES_READ_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_read as f64);
        VM_BYTES_WRITTEN_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_written as f64);
    }
}
192
/// The BCS-serialized size of an empty [`Block`].
///
/// NOTE(review): this is a hard-coded byte count; presumably it has to be updated whenever
/// the serialized layout of [`Block`] changes — confirm that a test pins this value.
pub(crate) const EMPTY_BLOCK_SIZE: usize = 94;
195
/// A view accessing the state of a chain.
// NOTE(review): with `with_graphql` this type derives `async_graphql::SimpleObject`;
// presumably the `///` docs below are exported as GraphQL descriptions, so they are kept
// verbatim and reviewer notes use plain `//` comments — confirm before rewording them.
#[cfg_attr(
    with_graphql,
    derive(async_graphql::SimpleObject),
    graphql(cache_control(no_cache))
)]
#[derive(Debug, RootView, ClonableView, Allocative)]
#[allocative(bound = "C")]
pub struct ChainStateView<C>
where
    C: Clone + Context + 'static,
{
    /// Execution state, including system and user applications.
    pub execution_state: ExecutionStateView<C>,
    /// Hash of the execution state.
    // Set in `initialize_if_needed` after the chain has been initialized.
    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,

    /// Block-chaining state.
    pub tip_state: RegisterView<C, ChainTipState>,

    /// Consensus state.
    pub manager: ChainManager<C>,
    /// Pending validated block that is still missing blobs.
    /// The incomplete set of blobs for the pending validated block.
    // NOTE(review): the two doc lines above overlap; the first one looks like a leftover
    // from an earlier field and could probably be dropped.
    pub pending_validated_blobs: PendingBlobsView<C>,
    /// The incomplete sets of blobs for upcoming proposals.
    pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,

    /// Hashes of all certified blocks for this sender.
    /// This ends with `block_hash` and has length `usize::from(next_block_height)`.
    // `execute_block_inner` reads entries of this log using a block height as the index.
    pub confirmed_log: LogView<C, CryptoHash>,
    /// Sender chain and height of all certified blocks known as a receiver (local ordering).
    // Appended to by `receive_message_bundle` when `add_to_received_log` is set.
    pub received_log: LogView<C, ChainAndHeight>,
    /// The number of `received_log` entries we have synchronized, for each validator.
    // Only ever increased; see `update_received_certificate_trackers`.
    pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,

    /// Mailboxes used to receive messages indexed by their origin.
    pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
    /// Mailboxes used to send messages, indexed by their target.
    pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
    /// The indices of next events we expect to see per stream (could be ahead of the last
    /// executed block in sparse chains).
    pub next_expected_events: MapView<C, StreamId, u32>,
    /// Number of outgoing messages in flight for each block height.
    /// We use a `RegisterView` to prioritize speed for small maps.
    // `mark_messages_as_received` decrements these counters and removes an entry once it
    // reaches zero; `all_messages_delivered_up_to` relies on that removal.
    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
    /// Outboxes with at least one pending message. This allows us to avoid loading all outboxes.
    // `mark_messages_as_received` removes a target from this set when its queue is empty.
    pub nonempty_outboxes: RegisterView<C, BTreeSet<ChainId>>,

    /// Blocks that have been verified but not executed yet, and that may not be contiguous.
    // The highest key is used by `next_height_to_preprocess`.
    pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,
}
248
/// Block-chaining state.
// NOTE(review): with `with_graphql` this type derives `async_graphql::SimpleObject`;
// presumably the `///` docs are exported as GraphQL descriptions, so they are kept verbatim
// and reviewer notes use plain `//` comments — confirm before rewording them.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, Allocative)]
pub struct ChainTipState {
    /// Hash of the latest certified block in this chain, if any.
    // Compared against `previous_block_hash` of a proposal in `verify_block_chaining`.
    pub block_hash: Option<CryptoHash>,
    /// Sequence number tracking blocks.
    // The height that the next block of this chain is expected to have.
    pub next_block_height: BlockHeight,
    // The three counters below are running totals that `update_counters` increases with
    // checked arithmetic.
    /// Number of incoming message bundles.
    pub num_incoming_bundles: u32,
    /// Number of operations.
    pub num_operations: u32,
    /// Number of outgoing messages.
    pub num_outgoing_messages: u32,
}
264
265impl ChainTipState {
266    /// Checks that the proposed block is suitable, i.e. at the expected height and with the
267    /// expected parent.
268    pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
269        ensure!(
270            new_block.height == self.next_block_height,
271            ChainError::UnexpectedBlockHeight {
272                expected_block_height: self.next_block_height,
273                found_block_height: new_block.height
274            }
275        );
276        ensure!(
277            new_block.previous_block_hash == self.block_hash,
278            ChainError::UnexpectedPreviousBlockHash
279        );
280        Ok(())
281    }
282
283    /// Returns `true` if the validated block's height is below the tip height. Returns an error if
284    /// it is higher than the tip.
285    pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
286        ensure!(
287            self.next_block_height >= height,
288            ChainError::MissingEarlierBlocks {
289                current_block_height: self.next_block_height,
290            }
291        );
292        Ok(self.next_block_height > height)
293    }
294
295    /// Checks if the measurement counters would be valid.
296    pub fn update_counters(
297        &mut self,
298        transactions: &[Transaction],
299        messages: &[Vec<OutgoingMessage>],
300    ) -> Result<(), ChainError> {
301        let mut num_incoming_bundles = 0u32;
302        let mut num_operations = 0u32;
303
304        for transaction in transactions {
305            match transaction {
306                Transaction::ReceiveMessages(_) => {
307                    num_incoming_bundles = num_incoming_bundles
308                        .checked_add(1)
309                        .ok_or(ArithmeticError::Overflow)?;
310                }
311                Transaction::ExecuteOperation(_) => {
312                    num_operations = num_operations
313                        .checked_add(1)
314                        .ok_or(ArithmeticError::Overflow)?;
315                }
316            }
317        }
318
319        self.num_incoming_bundles = self
320            .num_incoming_bundles
321            .checked_add(num_incoming_bundles)
322            .ok_or(ArithmeticError::Overflow)?;
323
324        self.num_operations = self
325            .num_operations
326            .checked_add(num_operations)
327            .ok_or(ArithmeticError::Overflow)?;
328
329        let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
330            .map_err(|_| ArithmeticError::Overflow)?;
331        self.num_outgoing_messages = self
332            .num_outgoing_messages
333            .checked_add(num_outgoing_messages)
334            .ok_or(ArithmeticError::Overflow)?;
335
336        Ok(())
337    }
338}
339
340impl<C> ChainStateView<C>
341where
342    C: Context + Clone + 'static,
343    C::Extra: ExecutionRuntimeContext,
344{
345    /// Returns the [`ChainId`] of the chain this [`ChainStateView`] represents.
346    pub fn chain_id(&self) -> ChainId {
347        self.context().extra().chain_id()
348    }
349
350    #[instrument(skip_all, fields(
351        chain_id = %self.chain_id(),
352    ))]
353    pub async fn query_application(
354        &mut self,
355        local_time: Timestamp,
356        query: Query,
357        service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
358    ) -> Result<QueryOutcome, ChainError> {
359        let context = QueryContext {
360            chain_id: self.chain_id(),
361            next_block_height: self.tip_state.get().next_block_height,
362            local_time,
363        };
364        self.execution_state
365            .query_application(context, query, service_runtime_endpoint)
366            .await
367            .with_execution_context(ChainExecutionContext::Query)
368    }
369
370    #[instrument(skip_all, fields(
371        chain_id = %self.chain_id(),
372        application_id = %application_id
373    ))]
374    pub async fn describe_application(
375        &mut self,
376        application_id: ApplicationId,
377    ) -> Result<ApplicationDescription, ChainError> {
378        self.execution_state
379            .system
380            .describe_application(application_id, &mut TransactionTracker::default())
381            .await
382            .with_execution_context(ChainExecutionContext::DescribeApplication)
383    }
384
385    #[instrument(skip_all, fields(
386        chain_id = %self.chain_id(),
387        target = %target,
388        height = %height
389    ))]
390    pub async fn mark_messages_as_received(
391        &mut self,
392        target: &ChainId,
393        height: BlockHeight,
394    ) -> Result<bool, ChainError> {
395        let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
396        let updates = outbox.mark_messages_as_received(height).await?;
397        if updates.is_empty() {
398            return Ok(false);
399        }
400        for update in updates {
401            let counter = self
402                .outbox_counters
403                .get_mut()
404                .get_mut(&update)
405                .ok_or_else(|| {
406                    ChainError::InternalError("message counter should be present".into())
407                })?;
408            *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
409            if *counter == 0 {
410                // Important for the test in `all_messages_delivered_up_to`.
411                self.outbox_counters.get_mut().remove(&update);
412            }
413        }
414        if outbox.queue.count() == 0 {
415            self.nonempty_outboxes.get_mut().remove(target);
416            // If the outbox is empty and not ahead of the executed blocks, remove it.
417            if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
418                self.outboxes.remove_entry(target)?;
419            }
420        }
421        #[cfg(with_metrics)]
422        metrics::NUM_OUTBOXES
423            .with_label_values(&[])
424            .observe(self.outboxes.count().await? as f64);
425        Ok(true)
426    }
427
428    /// Returns true if there are no more outgoing messages in flight up to the given
429    /// block height.
430    pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
431        tracing::debug!(
432            "Messages left in {:.8}'s outbox: {:?}",
433            self.chain_id(),
434            self.outbox_counters.get()
435        );
436        if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
437            key > &height
438        } else {
439            true
440        }
441    }
442
443    /// Invariant for the states of active chains.
444    pub fn is_active(&self) -> bool {
445        self.execution_state.system.is_active()
446    }
447
448    /// Initializes the chain if it is not active yet.
449    pub async fn initialize_if_needed(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
450        let chain_id = self.chain_id();
451        // Initialize ourselves.
452        if self
453            .execution_state
454            .system
455            .initialize_chain(chain_id)
456            .await
457            .with_execution_context(ChainExecutionContext::Block)?
458        {
459            // The chain was already initialized.
460            return Ok(());
461        }
462        // Recompute the state hash.
463        let hash = self.execution_state.crypto_hash_mut().await?;
464        self.execution_state_hash.set(Some(hash));
465        let maybe_committee = self.execution_state.system.current_committee().into_iter();
466        // Last, reset the consensus state based on the current ownership.
467        self.manager.reset(
468            self.execution_state.system.ownership.get().clone(),
469            BlockHeight(0),
470            local_time,
471            maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights()),
472        )?;
473        Ok(())
474    }
475
476    pub async fn next_block_height_to_receive(
477        &self,
478        origin: &ChainId,
479    ) -> Result<BlockHeight, ChainError> {
480        let inbox = self.inboxes.try_load_entry(origin).await?;
481        match inbox {
482            Some(inbox) => inbox.next_block_height_to_receive(),
483            None => Ok(BlockHeight::ZERO),
484        }
485    }
486
487    /// Returns the height of the highest block we have, plus one. Includes preprocessed blocks.
488    ///
489    /// The "+ 1" is so that it can be used in the same places as `next_block_height`.
490    pub async fn next_height_to_preprocess(&self) -> Result<BlockHeight, ChainError> {
491        if let Some(height) = self.preprocessed_blocks.indices().await?.last() {
492            return Ok(height.saturating_add(BlockHeight(1)));
493        }
494        Ok(self.tip_state.get().next_block_height)
495    }
496
497    pub async fn last_anticipated_block_height(
498        &self,
499        origin: &ChainId,
500    ) -> Result<Option<BlockHeight>, ChainError> {
501        let inbox = self.inboxes.try_load_entry(origin).await?;
502        match inbox {
503            Some(inbox) => match inbox.removed_bundles.back().await? {
504                Some(bundle) => Ok(Some(bundle.height)),
505                None => Ok(None),
506            },
507            None => Ok(None),
508        }
509    }
510
511    /// Attempts to process a new `bundle` of messages from the given `origin`. Returns an
512    /// internal error if the bundle doesn't appear to be new, based on the sender's
513    /// height. The value `local_time` is specific to each validator and only used for
514    /// round timeouts.
515    ///
516    /// Returns `true` if incoming `Subscribe` messages created new outbox entries.
517    #[instrument(skip_all, fields(
518        chain_id = %self.chain_id(),
519        origin = %origin,
520        bundle_height = %bundle.height
521    ))]
522    pub async fn receive_message_bundle(
523        &mut self,
524        origin: &ChainId,
525        bundle: MessageBundle,
526        local_time: Timestamp,
527        add_to_received_log: bool,
528    ) -> Result<(), ChainError> {
529        assert!(!bundle.messages.is_empty());
530        let chain_id = self.chain_id();
531        tracing::trace!(
532            "Processing new messages to {chain_id:.8} from {origin} at height {}",
533            bundle.height,
534        );
535        let chain_and_height = ChainAndHeight {
536            chain_id: *origin,
537            height: bundle.height,
538        };
539
540        match self.initialize_if_needed(local_time).await {
541            Ok(_) => (),
542            // if the only issue was that we couldn't initialize the chain because of a
543            // missing chain description blob, we might still want to update the inbox
544            Err(ChainError::ExecutionError(exec_err, _))
545                if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
546                if blobs.iter().all(|blob_id| {
547                    blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
548                })) => {}
549            err => {
550                return err;
551            }
552        }
553
554        // Process the inbox bundle and update the inbox state.
555        let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
556        #[cfg(with_metrics)]
557        metrics::NUM_INBOXES
558            .with_label_values(&[])
559            .observe(self.inboxes.count().await? as f64);
560        inbox
561            .add_bundle(bundle)
562            .await
563            .map_err(|error| match error {
564                InboxError::ViewError(error) => ChainError::ViewError(error),
565                error => ChainError::InternalError(format!(
566                    "while processing messages in certified block: {error}"
567                )),
568            })?;
569
570        // Remember the certificate for future validator/client synchronizations.
571        if add_to_received_log {
572            self.received_log.push(chain_and_height);
573        }
574        Ok(())
575    }
576
577    /// Updates the `received_log` trackers.
578    pub fn update_received_certificate_trackers(
579        &mut self,
580        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
581    ) {
582        for (name, tracker) in new_trackers {
583            self.received_certificate_trackers
584                .get_mut()
585                .entry(name)
586                .and_modify(|t| {
587                    // Because several synchronizations could happen in parallel, we need to make
588                    // sure to never go backward.
589                    if tracker > *t {
590                        *t = tracker;
591                    }
592                })
593                .or_insert(tracker);
594        }
595    }
596
597    pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> {
598        self.execution_state
599            .system
600            .current_committee()
601            .ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
602    }
603
604    pub fn ownership(&self) -> &ChainOwnership {
605        self.execution_state.system.ownership.get()
606    }
607
608    /// Removes the incoming message bundles in the block from the inboxes.
609    ///
610    /// If `must_be_present` is `true`, an error is returned if any of the bundles have not been
611    /// added to the inbox yet. So this should be `true` if the bundles are in a block _proposal_,
612    /// and `false` if the block is already confirmed.
613    #[instrument(skip_all, fields(
614        chain_id = %self.chain_id(),
615    ))]
616    pub async fn remove_bundles_from_inboxes(
617        &mut self,
618        timestamp: Timestamp,
619        must_be_present: bool,
620        incoming_bundles: impl IntoIterator<Item = &IncomingBundle>,
621    ) -> Result<(), ChainError> {
622        let chain_id = self.chain_id();
623        let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
624        for IncomingBundle { bundle, origin, .. } in incoming_bundles {
625            ensure!(
626                bundle.timestamp <= timestamp,
627                ChainError::IncorrectBundleTimestamp {
628                    chain_id,
629                    bundle_timestamp: bundle.timestamp,
630                    block_timestamp: timestamp,
631                }
632            );
633            let bundles = bundles_by_origin.entry(*origin).or_default();
634            bundles.push(bundle);
635        }
636        let origins = bundles_by_origin.keys().copied().collect::<Vec<_>>();
637        let inboxes = self.inboxes.try_load_entries_mut(&origins).await?;
638        for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
639            tracing::trace!(
640                "Removing [{}] from {chain_id:.8}'s inbox for {origin:}",
641                bundles
642                    .iter()
643                    .map(|bundle| bundle.height.to_string())
644                    .collect::<Vec<_>>()
645                    .join(", ")
646            );
647            for bundle in bundles {
648                // Mark the message as processed in the inbox.
649                let was_present = inbox
650                    .remove_bundle(bundle)
651                    .await
652                    .map_err(|error| (chain_id, origin, error))?;
653                if must_be_present {
654                    ensure!(
655                        was_present,
656                        ChainError::MissingCrossChainUpdate {
657                            chain_id,
658                            origin,
659                            height: bundle.height,
660                        }
661                    );
662                }
663            }
664        }
665        #[cfg(with_metrics)]
666        metrics::NUM_INBOXES
667            .with_label_values(&[])
668            .observe(self.inboxes.count().await? as f64);
669        Ok(())
670    }
671
672    /// Returns the chain IDs of all recipients for which a message is waiting in the outbox.
673    pub fn nonempty_outbox_chain_ids(&self) -> Vec<ChainId> {
674        self.nonempty_outboxes.get().iter().copied().collect()
675    }
676
677    /// Returns the outboxes for the given targets, or an error if any of them are missing.
678    pub async fn load_outboxes(
679        &self,
680        targets: &[ChainId],
681    ) -> Result<Vec<ReadGuardedView<OutboxStateView<C>>>, ChainError> {
682        let vec_of_options = self.outboxes.try_load_entries(targets).await?;
683        let optional_vec = vec_of_options.into_iter().collect::<Option<Vec<_>>>();
684        optional_vec.ok_or_else(|| ChainError::InternalError("Missing outboxes".into()))
685    }
686
    /// Executes a block: first the incoming messages, then the main operation.
    /// Does not update chain state other than the execution state.
    ///
    /// This is an associated function that takes the execution state (`chain`) and the
    /// `confirmed_log` as separate arguments instead of borrowing a whole `ChainStateView`.
    ///
    /// * `block`: the proposal whose transactions are executed in order.
    /// * `local_time`, `round`: passed on to the block execution tracker and to each
    ///   transaction execution, respectively.
    /// * `published_blobs`: blobs published by the block; each is size-checked against the
    ///   committee's policy and recorded in `used_blobs`.
    /// * `replaying_oracle_responses`: passed on to the block execution tracker —
    ///   presumably previously recorded oracle responses to replay; confirm against
    ///   `BlockExecutionTracker::new`.
    ///
    /// Returns the [`BlockExecutionOutcome`], including the state hash computed after all
    /// transactions ran.
    ///
    /// # Errors
    ///
    /// Returns [`ChainError::InactiveChain`] if there is no current committee, and an
    /// internal error if a previously recorded height has no entry in `confirmed_log`.
    #[instrument(skip_all, fields(
        chain_id = %block.chain_id,
        block_height = %block.height
    ))]
    async fn execute_block_inner(
        chain: &mut ExecutionStateView<C>,
        confirmed_log: &LogView<C, CryptoHash>,
        block: &ProposedBlock,
        local_time: Timestamp,
        round: Option<u32>,
        published_blobs: &[Blob],
        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
    ) -> Result<BlockExecutionOutcome, ChainError> {
        // The guard is kept alive until the function returns, so that the whole execution
        // is measured.
        #[cfg(with_metrics)]
        let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency_us();
        // The system timestamp becomes the block's timestamp before anything is executed.
        chain.system.timestamp.set(block.timestamp);

        // Clone the policy of the current committee; without a committee the chain is
        // reported as inactive.
        let policy = chain
            .system
            .current_committee()
            .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
            .1
            .policy()
            .clone();

        let mut resource_controller = ResourceController::new(
            Arc::new(policy),
            ResourceTracker::default(),
            block.authenticated_owner,
        );

        // Check each published blob against the policy's size limit and mark it as used.
        for blob in published_blobs {
            let blob_id = blob.id();
            resource_controller
                .policy()
                .check_blob_size(blob.content())
                .with_execution_context(ChainExecutionContext::Block)?;
            chain.system.used_blobs.insert(&blob_id)?;
        }

        // Execute each incoming bundle as a transaction, then each operation.
        // Collect messages, events and oracle responses, each as one list per transaction.
        let mut block_execution_tracker = BlockExecutionTracker::new(
            &mut resource_controller,
            published_blobs
                .iter()
                .map(|blob| (blob.id(), blob))
                .collect(),
            local_time,
            replaying_oracle_responses,
            block,
        )?;

        for transaction in block.transaction_refs() {
            block_execution_tracker
                .execute_transaction(transaction, round, chain)
                .await?;
        }

        // For every recipient of this block, record this block's height as the latest
        // message block, and remember the previously recorded height (if any).
        let recipients = block_execution_tracker.recipients();
        let mut recipient_heights = Vec::new();
        let mut indices = Vec::new();
        for (recipient, height) in chain
            .previous_message_blocks
            .multi_get_pairs(recipients)
            .await?
        {
            chain
                .previous_message_blocks
                .insert(&recipient, block.height)?;
            if let Some(height) = height {
                let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
                indices.push(index);
                recipient_heights.push((recipient, height));
            }
        }
        // Resolve the previous heights to block hashes; `indices` and `recipient_heights`
        // were filled in lockstep, so zipping pairs each hash with its recipient.
        let hashes = confirmed_log.multi_get(indices).await?;
        let mut previous_message_blocks = BTreeMap::new();
        for (hash, (recipient, height)) in hashes.into_iter().zip(recipient_heights) {
            let hash = hash.ok_or_else(|| {
                ChainError::InternalError("missing entry in confirmed_log".into())
            })?;
            previous_message_blocks.insert(recipient, (hash, height));
        }

        // Same bookkeeping as above, for the event streams touched by this block.
        let streams = block_execution_tracker.event_streams();
        let mut stream_heights = Vec::new();
        let mut indices = Vec::new();
        for (stream, height) in chain.previous_event_blocks.multi_get_pairs(streams).await? {
            chain.previous_event_blocks.insert(&stream, block.height)?;
            if let Some(height) = height {
                let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
                indices.push(index);
                stream_heights.push((stream, height));
            }
        }
        let hashes = confirmed_log.multi_get(indices).await?;
        let mut previous_event_blocks = BTreeMap::new();
        for (hash, (stream, height)) in hashes.into_iter().zip(stream_heights) {
            let hash = hash.ok_or_else(|| {
                ChainError::InternalError("missing entry in confirmed_log".into())
            })?;
            previous_event_blocks.insert(stream, (hash, height));
        }

        // Hash the execution state only after all the updates above have been applied.
        let state_hash = {
            #[cfg(with_metrics)]
            let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency();
            chain.crypto_hash_mut().await?
        };

        let (messages, oracle_responses, events, blobs, operation_results) =
            block_execution_tracker.finalize();

        Ok(BlockExecutionOutcome {
            messages,
            previous_message_blocks,
            previous_event_blocks,
            state_hash,
            oracle_responses,
            events,
            blobs,
            operation_results,
        })
    }
814
815    /// Executes a block: first the incoming messages, then the main operation.
816    /// Does not update chain state other than the execution state.
817    #[instrument(skip_all, fields(
818        chain_id = %self.chain_id(),
819        block_height = %block.height
820    ))]
821    pub async fn execute_block(
822        &mut self,
823        block: &ProposedBlock,
824        local_time: Timestamp,
825        round: Option<u32>,
826        published_blobs: &[Blob],
827        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
828    ) -> Result<BlockExecutionOutcome, ChainError> {
829        assert_eq!(
830            block.chain_id,
831            self.execution_state.context().extra().chain_id()
832        );
833
834        self.initialize_if_needed(local_time).await?;
835
836        let chain_timestamp = *self.execution_state.system.timestamp.get();
837        ensure!(
838            chain_timestamp <= block.timestamp,
839            ChainError::InvalidBlockTimestamp {
840                parent: chain_timestamp,
841                new: block.timestamp
842            }
843        );
844        ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
845
846        ensure!(
847            block.published_blob_ids()
848                == published_blobs
849                    .iter()
850                    .map(|blob| blob.id())
851                    .collect::<BTreeSet<_>>(),
852            ChainError::InternalError("published_blobs mismatch".to_string())
853        );
854
855        if *self.execution_state.system.closed.get() {
856            ensure!(block.has_only_rejected_messages(), ChainError::ClosedChain);
857        }
858
859        Self::check_app_permissions(
860            self.execution_state.system.application_permissions.get(),
861            block,
862        )?;
863
864        Self::execute_block_inner(
865            &mut self.execution_state,
866            &self.confirmed_log,
867            block,
868            local_time,
869            round,
870            published_blobs,
871            replaying_oracle_responses,
872        )
873        .await
874    }
875
876    /// Applies an execution outcome to the chain, updating the outboxes, state hash and chain
877    /// manager. This does not touch the execution state itself, which must be updated separately.
878    /// Returns the set of event streams that were updated as a result of applying the block.
879    #[instrument(skip_all, fields(
880        chain_id = %self.chain_id(),
881        block_height = %block.inner().inner().header.height
882    ))]
883    pub async fn apply_confirmed_block(
884        &mut self,
885        block: &ConfirmedBlock,
886        local_time: Timestamp,
887    ) -> Result<BTreeSet<StreamId>, ChainError> {
888        let hash = block.inner().hash();
889        let block = block.inner().inner();
890        self.execution_state_hash.set(Some(block.header.state_hash));
891        let updated_streams = self.process_emitted_events(block).await?;
892        self.process_outgoing_messages(block).await?;
893
894        // Last, reset the consensus state based on the current ownership.
895        self.reset_chain_manager(block.header.height.try_add_one()?, local_time)?;
896
897        // Advance to next block height.
898        let tip = self.tip_state.get_mut();
899        tip.block_hash = Some(hash);
900        tip.next_block_height.try_add_assign_one()?;
901        tip.update_counters(&block.body.transactions, &block.body.messages)?;
902        self.confirmed_log.push(hash);
903        self.preprocessed_blocks.remove(&block.header.height)?;
904        Ok(updated_streams)
905    }
906
907    /// Adds a block to `preprocessed_blocks`, and updates the outboxes where possible.
908    /// Returns the set of streams that were updated as a result of preprocessing the block.
909    #[instrument(skip_all, fields(
910        chain_id = %self.chain_id(),
911        block_height = %block.inner().inner().header.height
912    ))]
913    pub async fn preprocess_block(
914        &mut self,
915        block: &ConfirmedBlock,
916    ) -> Result<BTreeSet<StreamId>, ChainError> {
917        let hash = block.inner().hash();
918        let block = block.inner().inner();
919        let height = block.header.height;
920        if height < self.tip_state.get().next_block_height {
921            return Ok(BTreeSet::new());
922        }
923        self.process_outgoing_messages(block).await?;
924        let updated_streams = self.process_emitted_events(block).await?;
925        self.preprocessed_blocks.insert(&height, hash)?;
926        Ok(updated_streams)
927    }
928
929    /// Returns whether this is a child chain.
930    pub fn is_child(&self) -> bool {
931        let Some(description) = self.execution_state.system.description.get() else {
932            // Root chains are always initialized, so this must be a child chain.
933            return true;
934        };
935        description.is_child()
936    }
937
938    /// Verifies that the block is valid according to the chain's application permission settings.
939    #[instrument(skip_all, fields(
940        block_height = %block.height,
941        num_transactions = %block.transactions.len()
942    ))]
943    fn check_app_permissions(
944        app_permissions: &ApplicationPermissions,
945        block: &ProposedBlock,
946    ) -> Result<(), ChainError> {
947        let mut mandatory = HashSet::<ApplicationId>::from_iter(
948            app_permissions.mandatory_applications.iter().copied(),
949        );
950        for transaction in &block.transactions {
951            match transaction {
952                Transaction::ExecuteOperation(operation)
953                    if operation.is_exempt_from_permissions() =>
954                {
955                    mandatory.clear()
956                }
957                Transaction::ExecuteOperation(operation) => {
958                    ensure!(
959                        app_permissions.can_execute_operations(&operation.application_id()),
960                        ChainError::AuthorizedApplications(
961                            app_permissions.execute_operations.clone().unwrap()
962                        )
963                    );
964                    if let Operation::User { application_id, .. } = operation {
965                        mandatory.remove(application_id);
966                    }
967                }
968                Transaction::ReceiveMessages(incoming_bundle) => {
969                    for pending in incoming_bundle.messages() {
970                        if let Message::User { application_id, .. } = &pending.message {
971                            mandatory.remove(application_id);
972                        }
973                    }
974                }
975            }
976        }
977        ensure!(
978            mandatory.is_empty(),
979            ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
980        );
981        Ok(())
982    }
983
984    /// Returns the hashes of all blocks we have in the given range.
985    ///
986    /// If the input heights are in ascending order, the hashes will be in the same order.
987    /// Otherwise they may be unordered.
988    #[instrument(skip_all, fields(
989        chain_id = %self.chain_id(),
990        next_block_height = %self.tip_state.get().next_block_height,
991    ))]
992    pub async fn block_hashes(
993        &self,
994        heights: impl IntoIterator<Item = BlockHeight>,
995    ) -> Result<Vec<CryptoHash>, ChainError> {
996        let next_height = self.tip_state.get().next_block_height;
997        // Everything up to (excluding) next_height is in confirmed_log.
998        let (confirmed_heights, unconfirmed_heights) = heights
999            .into_iter()
1000            .partition::<Vec<_>, _>(|height| *height < next_height);
1001        let confirmed_indices = confirmed_heights
1002            .into_iter()
1003            .map(|height| usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow))
1004            .collect::<Result<_, _>>()?;
1005        let confirmed_hashes = self.confirmed_log.multi_get(confirmed_indices).await?;
1006        // Everything after (including) next_height in preprocessed_blocks if we have it.
1007        let unconfirmed_hashes = self
1008            .preprocessed_blocks
1009            .multi_get(&unconfirmed_heights)
1010            .await?;
1011        Ok(confirmed_hashes
1012            .into_iter()
1013            .chain(unconfirmed_hashes)
1014            .flatten()
1015            .collect())
1016    }
1017
1018    /// Resets the chain manager for the next block height.
1019    fn reset_chain_manager(
1020        &mut self,
1021        next_height: BlockHeight,
1022        local_time: Timestamp,
1023    ) -> Result<(), ChainError> {
1024        let maybe_committee = self.execution_state.system.current_committee().into_iter();
1025        let ownership = self.execution_state.system.ownership.get().clone();
1026        let fallback_owners =
1027            maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights());
1028        self.pending_validated_blobs.clear();
1029        self.pending_proposed_blobs.clear();
1030        self.manager
1031            .reset(ownership, next_height, local_time, fallback_owners)
1032    }
1033
1034    /// Updates the outboxes with the messages sent in the block.
1035    ///
1036    /// Returns the set of all recipients.
1037    #[instrument(skip_all, fields(
1038        chain_id = %self.chain_id(),
1039        block_height = %block.header.height
1040    ))]
1041    async fn process_outgoing_messages(
1042        &mut self,
1043        block: &Block,
1044    ) -> Result<Vec<ChainId>, ChainError> {
1045        // Record the messages of the execution. Messages are understood within an
1046        // application.
1047        let recipients = block.recipients();
1048        let block_height = block.header.height;
1049        let next_height = self.tip_state.get().next_block_height;
1050
1051        // Update the outboxes.
1052        let outbox_counters = self.outbox_counters.get_mut();
1053        let nonempty_outboxes = self.nonempty_outboxes.get_mut();
1054        let targets = recipients.into_iter().collect::<Vec<_>>();
1055        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1056        for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1057            if block_height > next_height {
1058                // There may be a gap in the chain before this block. We can only add it to this
1059                // outbox if the previous message to the same recipient has already been added.
1060                if *outbox.next_height_to_schedule.get() > block_height {
1061                    continue; // We already added this recipient's messages to the outbox.
1062                }
1063                let maybe_prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok()
1064                {
1065                    // The block with the last added message has already been executed; look up its
1066                    // hash in the confirmed_log.
1067                    Some(height) if height < next_height => {
1068                        let index =
1069                            usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
1070                        Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
1071                            ChainError::InternalError("missing entry in confirmed_log".into())
1072                        })?)
1073                    }
1074                    // The block with last added message has not been executed yet. If we have it,
1075                    // it's in preprocessed_blocks.
1076                    Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
1077                        || ChainError::InternalError("missing entry in preprocessed_blocks".into()),
1078                    )?),
1079                    None => None, // No message to that sender was added yet.
1080                };
1081                // Only schedule if this block contains the next message for that recipient.
1082                match (
1083                    maybe_prev_hash,
1084                    block.body.previous_message_blocks.get(target),
1085                ) {
1086                    (None, None) => {
1087                        // No previous message block expected and none indicated by the outbox -
1088                        // all good
1089                    }
1090                    (Some(_), None) => {
1091                        // Outbox indicates there was a previous message block, but
1092                        // previous_message_blocks has no idea about it - possible bug
1093                        return Err(ChainError::InternalError(
1094                            "block indicates no previous message block,\
1095                            but we have one in the outbox"
1096                                .into(),
1097                        ));
1098                    }
1099                    (None, Some((_, prev_msg_block_height))) => {
1100                        // We have no previously processed block in the outbox, but we are
1101                        // expecting one - this could be due to an empty outbox having been pruned.
1102                        // Only process the outbox if the height of the previous message block is
1103                        // lower than the tip
1104                        if *prev_msg_block_height >= next_height {
1105                            continue;
1106                        }
1107                    }
1108                    (Some(ref prev_hash), Some((prev_msg_block_hash, _))) => {
1109                        // Only process the outbox if the hashes match.
1110                        if prev_hash != prev_msg_block_hash {
1111                            continue;
1112                        }
1113                    }
1114                }
1115            }
1116            if outbox.schedule_message(block_height)? {
1117                *outbox_counters.entry(block_height).or_default() += 1;
1118                nonempty_outboxes.insert(*target);
1119            }
1120        }
1121
1122        #[cfg(with_metrics)]
1123        metrics::NUM_OUTBOXES
1124            .with_label_values(&[])
1125            .observe(self.outboxes.count().await? as f64);
1126        Ok(targets)
1127    }
1128
1129    /// Updates the event streams with events emitted by the block if they form a contiguous
1130    /// sequence (might not be the case when preprocessing a block).
1131    /// Returns the set of updated event streams.
1132    #[instrument(skip_all, fields(
1133        chain_id = %self.chain_id(),
1134        block_height = %block.header.height
1135    ))]
1136    async fn process_emitted_events(
1137        &mut self,
1138        block: &Block,
1139    ) -> Result<BTreeSet<StreamId>, ChainError> {
1140        let mut emitted_streams = BTreeMap::<StreamId, BTreeSet<u32>>::new();
1141        for event in block.body.events.iter().flatten() {
1142            emitted_streams
1143                .entry(event.stream_id.clone())
1144                .or_default()
1145                .insert(event.index);
1146        }
1147        let mut stream_ids = Vec::new();
1148        let mut list_indices = Vec::new();
1149        for (stream_id, indices) in emitted_streams {
1150            stream_ids.push(stream_id);
1151            list_indices.push(indices);
1152        }
1153
1154        let mut updated_streams = BTreeSet::new();
1155        for ((stream_id, next_index), indices) in self
1156            .next_expected_events
1157            .multi_get_pairs(stream_ids)
1158            .await?
1159            .into_iter()
1160            .zip(list_indices)
1161        {
1162            let initial_index = if stream_id == StreamId::system(EPOCH_STREAM_NAME) {
1163                // we don't expect the epoch stream to contain event 0
1164                1
1165            } else {
1166                0
1167            };
1168            let mut current_expected_index = next_index.unwrap_or(initial_index);
1169            for index in indices {
1170                if index == current_expected_index {
1171                    updated_streams.insert(stream_id.clone());
1172                    current_expected_index = index.saturating_add(1);
1173                }
1174            }
1175            if current_expected_index != 0 {
1176                self.next_expected_events
1177                    .insert(&stream_id, current_expected_index)?;
1178            }
1179        }
1180        Ok(updated_streams)
1181    }
1182}
1183
// Checks that the BCS-serialized size of a first block with a default execution outcome
// equals `EMPTY_BLOCK_SIZE`.
#[test]
fn empty_block_size() {
    let chain_id = linera_execution::test_utils::dummy_chain_description(0).id();
    let proposed_block = crate::test::make_first_block(chain_id);
    let outcome = crate::data_types::BlockExecutionOutcome::default();
    let block = crate::block::Block::new(proposed_block, outcome);
    let size = bcs::serialized_size(&block).unwrap();
    assert_eq!(size, EMPTY_BLOCK_SIZE);
}