linera_chain/
chain.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6    ops::RangeBounds,
7    sync::Arc,
8};
9
10use futures::stream::{self, StreamExt, TryStreamExt};
11use linera_base::{
12    crypto::{CryptoHash, ValidatorPublicKey},
13    data_types::{
14        ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight,
15        BlockHeightRangeBounds as _, Epoch, OracleResponse, Timestamp,
16    },
17    ensure,
18    identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
19    ownership::ChainOwnership,
20};
21use linera_execution::{
22    committee::Committee, system::EPOCH_STREAM_NAME, ExecutionRuntimeContext, ExecutionStateView,
23    Message, Operation, OutgoingMessage, Query, QueryContext, QueryOutcome, ResourceController,
24    ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
25};
26use linera_views::{
27    bucket_queue_view::BucketQueueView,
28    context::Context,
29    log_view::LogView,
30    map_view::MapView,
31    reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
32    register_view::RegisterView,
33    set_view::SetView,
34    store::ReadableKeyValueStore as _,
35    views::{ClonableView, CryptoHashView, RootView, View},
36};
37use serde::{Deserialize, Serialize};
38
39use crate::{
40    block::{Block, ConfirmedBlock},
41    block_tracker::BlockExecutionTracker,
42    data_types::{
43        BlockExecutionOutcome, ChainAndHeight, IncomingBundle, MessageBundle, ProposedBlock,
44        Transaction,
45    },
46    inbox::{Cursor, InboxError, InboxStateView},
47    manager::ChainManager,
48    outbox::OutboxStateView,
49    pending_blobs::PendingBlobsView,
50    ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
51};
52
53#[cfg(test)]
54#[path = "unit_tests/chain_tests.rs"]
55mod chain_tests;
56
57#[cfg(with_metrics)]
58use linera_base::prometheus_util::MeasureLatency;
59
60#[cfg(with_metrics)]
61pub(crate) mod metrics {
62    use std::sync::LazyLock;
63
64    use linera_base::prometheus_util::{
65        exponential_bucket_interval, exponential_bucket_latencies, register_histogram_vec,
66        register_int_counter_vec,
67    };
68    use linera_execution::ResourceTracker;
69    use prometheus::{HistogramVec, IntCounterVec};
70
71    pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
72        register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
73    });
74
75    pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
76        register_histogram_vec(
77            "block_execution_latency",
78            "Block execution latency",
79            &[],
80            exponential_bucket_latencies(1000.0),
81        )
82    });
83
84    #[cfg(with_metrics)]
85    pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
86        register_histogram_vec(
87            "message_execution_latency",
88            "Message execution latency",
89            &[],
90            exponential_bucket_latencies(50.0),
91        )
92    });
93
94    pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
95        register_histogram_vec(
96            "operation_execution_latency",
97            "Operation execution latency",
98            &[],
99            exponential_bucket_latencies(50.0),
100        )
101    });
102
103    pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
104        register_histogram_vec(
105            "wasm_fuel_used_per_block",
106            "Wasm fuel used per block",
107            &[],
108            exponential_bucket_interval(10.0, 1_000_000.0),
109        )
110    });
111
112    pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
113        register_histogram_vec(
114            "evm_fuel_used_per_block",
115            "EVM fuel used per block",
116            &[],
117            exponential_bucket_interval(10.0, 1_000_000.0),
118        )
119    });
120
121    pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
122        register_histogram_vec(
123            "vm_num_reads_per_block",
124            "VM number of reads per block",
125            &[],
126            exponential_bucket_interval(0.1, 100.0),
127        )
128    });
129
130    pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
131        register_histogram_vec(
132            "vm_bytes_read_per_block",
133            "VM number of bytes read per block",
134            &[],
135            exponential_bucket_interval(0.1, 10_000_000.0),
136        )
137    });
138
139    pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
140        register_histogram_vec(
141            "vm_bytes_written_per_block",
142            "VM number of bytes written per block",
143            &[],
144            exponential_bucket_interval(0.1, 10_000_000.0),
145        )
146    });
147
148    pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
149        register_histogram_vec(
150            "state_hash_computation_latency",
151            "Time to recompute the state hash",
152            &[],
153            exponential_bucket_latencies(500.0),
154        )
155    });
156
157    pub static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
158        register_histogram_vec(
159            "num_inboxes",
160            "Number of inboxes",
161            &[],
162            exponential_bucket_interval(1.0, 10_000.0),
163        )
164    });
165
166    pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
167        register_histogram_vec(
168            "num_outboxes",
169            "Number of outboxes",
170            &[],
171            exponential_bucket_interval(1.0, 10_000.0),
172        )
173    });
174
175    /// Tracks block execution metrics in Prometheus.
176    pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
177        NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
178        WASM_FUEL_USED_PER_BLOCK
179            .with_label_values(&[])
180            .observe(tracker.wasm_fuel as f64);
181        EVM_FUEL_USED_PER_BLOCK
182            .with_label_values(&[])
183            .observe(tracker.evm_fuel as f64);
184        VM_NUM_READS_PER_BLOCK
185            .with_label_values(&[])
186            .observe(tracker.read_operations as f64);
187        VM_BYTES_READ_PER_BLOCK
188            .with_label_values(&[])
189            .observe(tracker.bytes_read as f64);
190        VM_BYTES_WRITTEN_PER_BLOCK
191            .with_label_values(&[])
192            .observe(tracker.bytes_written as f64);
193    }
194}
195
196/// The BCS-serialized size of an empty [`Block`].
197pub(crate) const EMPTY_BLOCK_SIZE: usize = 94;
198
199/// An origin, cursor and timestamp of a unskippable bundle in our inbox.
200#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct TimestampedBundleInInbox {
203    /// The origin and cursor of the bundle.
204    pub entry: BundleInInbox,
205    /// The timestamp when the bundle was added to the inbox.
206    pub seen: Timestamp,
207}
208
209/// An origin and cursor of a unskippable bundle that is no longer in our inbox.
210#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
211#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
212pub struct BundleInInbox {
213    /// The origin from which we received the bundle.
214    pub origin: ChainId,
215    /// The cursor of the bundle in the inbox.
216    pub cursor: Cursor,
217}
218
219impl BundleInInbox {
220    fn new(origin: ChainId, bundle: &MessageBundle) -> Self {
221        BundleInInbox {
222            cursor: Cursor::from(bundle),
223            origin,
224        }
225    }
226}
227
228// The `TimestampedBundleInInbox` is a relatively small type, so a total
229// of 100 seems reasonable for the storing of the data.
230const TIMESTAMPBUNDLE_BUCKET_SIZE: usize = 100;
231
232/// A view accessing the state of a chain.
233#[cfg_attr(
234    with_graphql,
235    derive(async_graphql::SimpleObject),
236    graphql(cache_control(no_cache))
237)]
238#[derive(Debug, RootView, ClonableView)]
239pub struct ChainStateView<C>
240where
241    C: Clone + Context + Send + Sync + 'static,
242{
243    /// Execution state, including system and user applications.
244    pub execution_state: ExecutionStateView<C>,
245    /// Hash of the execution state.
246    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,
247
248    /// Block-chaining state.
249    pub tip_state: RegisterView<C, ChainTipState>,
250
251    /// Consensus state.
252    pub manager: ChainManager<C>,
253    /// Pending validated block that is still missing blobs.
254    /// The incomplete set of blobs for the pending validated block.
255    pub pending_validated_blobs: PendingBlobsView<C>,
256    /// The incomplete sets of blobs for upcoming proposals.
257    pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,
258
259    /// Hashes of all certified blocks for this sender.
260    /// This ends with `block_hash` and has length `usize::from(next_block_height)`.
261    pub confirmed_log: LogView<C, CryptoHash>,
262    /// Sender chain and height of all certified blocks known as a receiver (local ordering).
263    pub received_log: LogView<C, ChainAndHeight>,
264    /// The number of `received_log` entries we have synchronized, for each validator.
265    pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,
266
267    /// Mailboxes used to receive messages indexed by their origin.
268    pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
269    /// A queue of unskippable bundles, with the timestamp when we added them to the inbox.
270    pub unskippable_bundles:
271        BucketQueueView<C, TimestampedBundleInInbox, TIMESTAMPBUNDLE_BUCKET_SIZE>,
272    /// Unskippable bundles that have been removed but are still in the queue.
273    pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
274    /// The heights of previous blocks that sent messages to the same recipients.
275    pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
276    /// The heights of previous blocks that published events to the same streams.
277    pub previous_event_blocks: MapView<C, StreamId, BlockHeight>,
278    /// Mailboxes used to send messages, indexed by their target.
279    pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
280    /// The indices of next events we expect to see per stream (could be ahead of the last
281    /// executed block in sparse chains).
282    pub next_expected_events: MapView<C, StreamId, u32>,
283    /// Number of outgoing messages in flight for each block height.
284    /// We use a `RegisterView` to prioritize speed for small maps.
285    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
286    /// Outboxes with at least one pending message. This allows us to avoid loading all outboxes.
287    pub nonempty_outboxes: RegisterView<C, BTreeSet<ChainId>>,
288
289    /// Blocks that have been verified but not executed yet, and that may not be contiguous.
290    pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,
291}
292
293/// Block-chaining state.
294#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
295#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
296pub struct ChainTipState {
297    /// Hash of the latest certified block in this chain, if any.
298    pub block_hash: Option<CryptoHash>,
299    /// Sequence number tracking blocks.
300    pub next_block_height: BlockHeight,
301    /// Number of incoming message bundles.
302    pub num_incoming_bundles: u32,
303    /// Number of operations.
304    pub num_operations: u32,
305    /// Number of outgoing messages.
306    pub num_outgoing_messages: u32,
307}
308
309impl ChainTipState {
310    /// Checks that the proposed block is suitable, i.e. at the expected height and with the
311    /// expected parent.
312    pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
313        ensure!(
314            new_block.height == self.next_block_height,
315            ChainError::UnexpectedBlockHeight {
316                expected_block_height: self.next_block_height,
317                found_block_height: new_block.height
318            }
319        );
320        ensure!(
321            new_block.previous_block_hash == self.block_hash,
322            ChainError::UnexpectedPreviousBlockHash
323        );
324        Ok(())
325    }
326
327    /// Returns `true` if the validated block's height is below the tip height. Returns an error if
328    /// it is higher than the tip.
329    pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
330        ensure!(
331            self.next_block_height >= height,
332            ChainError::MissingEarlierBlocks {
333                current_block_height: self.next_block_height,
334            }
335        );
336        Ok(self.next_block_height > height)
337    }
338
339    /// Checks if the measurement counters would be valid.
340    pub fn update_counters(
341        &mut self,
342        transactions: &[Transaction],
343        messages: &[Vec<OutgoingMessage>],
344    ) -> Result<(), ChainError> {
345        let mut num_incoming_bundles = 0u32;
346        let mut num_operations = 0u32;
347
348        for transaction in transactions {
349            match transaction {
350                Transaction::ReceiveMessages(_) => {
351                    num_incoming_bundles = num_incoming_bundles
352                        .checked_add(1)
353                        .ok_or(ArithmeticError::Overflow)?;
354                }
355                Transaction::ExecuteOperation(_) => {
356                    num_operations = num_operations
357                        .checked_add(1)
358                        .ok_or(ArithmeticError::Overflow)?;
359                }
360            }
361        }
362
363        self.num_incoming_bundles = self
364            .num_incoming_bundles
365            .checked_add(num_incoming_bundles)
366            .ok_or(ArithmeticError::Overflow)?;
367
368        self.num_operations = self
369            .num_operations
370            .checked_add(num_operations)
371            .ok_or(ArithmeticError::Overflow)?;
372
373        let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
374            .map_err(|_| ArithmeticError::Overflow)?;
375        self.num_outgoing_messages = self
376            .num_outgoing_messages
377            .checked_add(num_outgoing_messages)
378            .ok_or(ArithmeticError::Overflow)?;
379
380        Ok(())
381    }
382}
383
384impl<C> ChainStateView<C>
385where
386    C: Context + Clone + Send + Sync + 'static,
387    C::Extra: ExecutionRuntimeContext,
388{
389    /// Returns the [`ChainId`] of the chain this [`ChainStateView`] represents.
390    pub fn chain_id(&self) -> ChainId {
391        self.context().extra().chain_id()
392    }
393
394    pub async fn query_application(
395        &mut self,
396        local_time: Timestamp,
397        query: Query,
398        service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
399    ) -> Result<QueryOutcome, ChainError> {
400        let context = QueryContext {
401            chain_id: self.chain_id(),
402            next_block_height: self.tip_state.get().next_block_height,
403            local_time,
404        };
405        self.execution_state
406            .query_application(context, query, service_runtime_endpoint)
407            .await
408            .with_execution_context(ChainExecutionContext::Query)
409    }
410
411    pub async fn describe_application(
412        &mut self,
413        application_id: ApplicationId,
414    ) -> Result<ApplicationDescription, ChainError> {
415        self.execution_state
416            .system
417            .describe_application(application_id, &mut TransactionTracker::default())
418            .await
419            .with_execution_context(ChainExecutionContext::DescribeApplication)
420    }
421
422    pub async fn mark_messages_as_received(
423        &mut self,
424        target: &ChainId,
425        height: BlockHeight,
426    ) -> Result<bool, ChainError> {
427        let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
428        let updates = outbox.mark_messages_as_received(height).await?;
429        if updates.is_empty() {
430            return Ok(false);
431        }
432        for update in updates {
433            let counter = self
434                .outbox_counters
435                .get_mut()
436                .get_mut(&update)
437                .ok_or_else(|| {
438                    ChainError::InternalError("message counter should be present".into())
439                })?;
440            *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
441            if *counter == 0 {
442                // Important for the test in `all_messages_delivered_up_to`.
443                self.outbox_counters.get_mut().remove(&update);
444            }
445        }
446        if outbox.queue.count() == 0 {
447            self.nonempty_outboxes.get_mut().remove(target);
448            // If the outbox is empty and not ahead of the executed blocks, remove it.
449            if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
450                self.outboxes.remove_entry(target)?;
451            }
452        }
453        #[cfg(with_metrics)]
454        metrics::NUM_OUTBOXES
455            .with_label_values(&[])
456            .observe(self.outboxes.count().await? as f64);
457        Ok(true)
458    }
459
460    /// Returns true if there are no more outgoing messages in flight up to the given
461    /// block height.
462    pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
463        tracing::debug!(
464            "Messages left in {:.8}'s outbox: {:?}",
465            self.chain_id(),
466            self.outbox_counters.get()
467        );
468        if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
469            key > &height
470        } else {
471            true
472        }
473    }
474
475    /// Invariant for the states of active chains.
476    pub fn is_active(&self) -> bool {
477        self.execution_state.system.is_active()
478    }
479
480    /// Invariant for the states of active chains.
481    pub async fn ensure_is_active(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
482        // Initialize ourselves.
483        if self
484            .execution_state
485            .system
486            .initialize_chain(self.chain_id())
487            .await
488            .with_execution_context(ChainExecutionContext::Block)?
489        {
490            // the chain was already initialized
491            return Ok(());
492        }
493        // Recompute the state hash.
494        let hash = self.execution_state.crypto_hash().await?;
495        self.execution_state_hash.set(Some(hash));
496        let maybe_committee = self.execution_state.system.current_committee().into_iter();
497        // Last, reset the consensus state based on the current ownership.
498        self.manager.reset(
499            self.execution_state.system.ownership.get().clone(),
500            BlockHeight(0),
501            local_time,
502            maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights()),
503        )?;
504        Ok(())
505    }
506
507    /// Verifies that this chain is up-to-date and all the messages executed ahead of time
508    /// have been properly received by now.
509    pub async fn validate_incoming_bundles(&self) -> Result<(), ChainError> {
510        let chain_id = self.chain_id();
511        let pairs = self.inboxes.try_load_all_entries().await?;
512        let max_stream_queries = self.context().store().max_stream_queries();
513        let stream = stream::iter(pairs)
514            .map(|(origin, inbox)| async move {
515                if let Some(bundle) = inbox.removed_bundles.front().await? {
516                    return Err(ChainError::MissingCrossChainUpdate {
517                        chain_id,
518                        origin,
519                        height: bundle.height,
520                    });
521                }
522                Ok::<(), ChainError>(())
523            })
524            .buffer_unordered(max_stream_queries);
525        stream.try_collect::<Vec<_>>().await?;
526        Ok(())
527    }
528
529    pub async fn next_block_height_to_receive(
530        &self,
531        origin: &ChainId,
532    ) -> Result<BlockHeight, ChainError> {
533        let inbox = self.inboxes.try_load_entry(origin).await?;
534        match inbox {
535            Some(inbox) => inbox.next_block_height_to_receive(),
536            None => Ok(BlockHeight::ZERO),
537        }
538    }
539
540    /// Returns the height of the highest block we have, plus one. Includes preprocessed blocks.
541    ///
542    /// The "+ 1" is so that it can be used in the same places as `next_block_height`.
543    pub async fn next_height_to_preprocess(&self) -> Result<BlockHeight, ChainError> {
544        if let Some(height) = self.preprocessed_blocks.indices().await?.last() {
545            return Ok(height.saturating_add(BlockHeight(1)));
546        }
547        Ok(self.tip_state.get().next_block_height)
548    }
549
550    pub async fn last_anticipated_block_height(
551        &self,
552        origin: &ChainId,
553    ) -> Result<Option<BlockHeight>, ChainError> {
554        let inbox = self.inboxes.try_load_entry(origin).await?;
555        match inbox {
556            Some(inbox) => match inbox.removed_bundles.back().await? {
557                Some(bundle) => Ok(Some(bundle.height)),
558                None => Ok(None),
559            },
560            None => Ok(None),
561        }
562    }
563
564    /// Attempts to process a new `bundle` of messages from the given `origin`. Returns an
565    /// internal error if the bundle doesn't appear to be new, based on the sender's
566    /// height. The value `local_time` is specific to each validator and only used for
567    /// round timeouts.
568    ///
569    /// Returns `true` if incoming `Subscribe` messages created new outbox entries.
570    pub async fn receive_message_bundle(
571        &mut self,
572        origin: &ChainId,
573        bundle: MessageBundle,
574        local_time: Timestamp,
575        add_to_received_log: bool,
576    ) -> Result<(), ChainError> {
577        assert!(!bundle.messages.is_empty());
578        let chain_id = self.chain_id();
579        tracing::trace!(
580            "Processing new messages to {chain_id:.8} from {origin} at height {}",
581            bundle.height,
582        );
583        let chain_and_height = ChainAndHeight {
584            chain_id: *origin,
585            height: bundle.height,
586        };
587
588        match self.ensure_is_active(local_time).await {
589            Ok(_) => (),
590            // if the only issue was that we couldn't initialize the chain because of a
591            // missing chain description blob, we might still want to update the inbox
592            Err(ChainError::ExecutionError(exec_err, _))
593                if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
594                if blobs.iter().all(|blob_id| {
595                    blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
596                })) => {}
597            err => {
598                return err;
599            }
600        }
601
602        // Process the inbox bundle and update the inbox state.
603        let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
604        #[cfg(with_metrics)]
605        metrics::NUM_INBOXES
606            .with_label_values(&[])
607            .observe(self.inboxes.count().await? as f64);
608        let entry = BundleInInbox::new(*origin, &bundle);
609        let skippable = bundle.is_skippable();
610        let newly_added = inbox
611            .add_bundle(bundle)
612            .await
613            .map_err(|error| match error {
614                InboxError::ViewError(error) => ChainError::ViewError(error),
615                error => ChainError::InternalError(format!(
616                    "while processing messages in certified block: {error}"
617                )),
618            })?;
619        if newly_added && !skippable {
620            let seen = local_time;
621            self.unskippable_bundles
622                .push_back(TimestampedBundleInInbox { entry, seen });
623        }
624
625        // Remember the certificate for future validator/client synchronizations.
626        if add_to_received_log {
627            self.received_log.push(chain_and_height);
628        }
629        Ok(())
630    }
631
632    /// Updates the `received_log` trackers.
633    pub fn update_received_certificate_trackers(
634        &mut self,
635        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
636    ) {
637        for (name, tracker) in new_trackers {
638            self.received_certificate_trackers
639                .get_mut()
640                .entry(name)
641                .and_modify(|t| {
642                    // Because several synchronizations could happen in parallel, we need to make
643                    // sure to never go backward.
644                    if tracker > *t {
645                        *t = tracker;
646                    }
647                })
648                .or_insert(tracker);
649        }
650    }
651
652    pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> {
653        self.execution_state
654            .system
655            .current_committee()
656            .ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
657    }
658
659    pub fn ownership(&self) -> &ChainOwnership {
660        self.execution_state.system.ownership.get()
661    }
662
663    /// Removes the incoming message bundles in the block from the inboxes.
664    pub async fn remove_bundles_from_inboxes(
665        &mut self,
666        timestamp: Timestamp,
667        incoming_bundles: impl IntoIterator<Item = &IncomingBundle>,
668    ) -> Result<(), ChainError> {
669        let chain_id = self.chain_id();
670        let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
671        for IncomingBundle { bundle, origin, .. } in incoming_bundles {
672            ensure!(
673                bundle.timestamp <= timestamp,
674                ChainError::IncorrectBundleTimestamp {
675                    chain_id,
676                    bundle_timestamp: bundle.timestamp,
677                    block_timestamp: timestamp,
678                }
679            );
680            let bundles = bundles_by_origin.entry(*origin).or_default();
681            bundles.push(bundle);
682        }
683        let origins = bundles_by_origin.keys().copied().collect::<Vec<_>>();
684        let inboxes = self.inboxes.try_load_entries_mut(&origins).await?;
685        let mut removed_unskippable = HashSet::new();
686        for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
687            tracing::trace!(
688                "Removing [{}] from {chain_id:.8}'s inbox for {origin:}",
689                bundles
690                    .iter()
691                    .map(|bundle| bundle.height.to_string())
692                    .collect::<Vec<_>>()
693                    .join(", ")
694            );
695            for bundle in bundles {
696                // Mark the message as processed in the inbox.
697                let was_present = inbox
698                    .remove_bundle(bundle)
699                    .await
700                    .map_err(|error| (chain_id, origin, error))?;
701                if was_present && !bundle.is_skippable() {
702                    removed_unskippable.insert(BundleInInbox::new(origin, bundle));
703                }
704            }
705        }
706        if !removed_unskippable.is_empty() {
707            // Delete all removed bundles from the front of the unskippable queue.
708            let maybe_front = self.unskippable_bundles.front();
709            if maybe_front.is_some_and(|ts_entry| removed_unskippable.remove(&ts_entry.entry)) {
710                self.unskippable_bundles.delete_front().await?;
711                while let Some(ts_entry) = self.unskippable_bundles.front() {
712                    if !removed_unskippable.remove(&ts_entry.entry) {
713                        if !self
714                            .removed_unskippable_bundles
715                            .contains(&ts_entry.entry)
716                            .await?
717                        {
718                            break;
719                        }
720                        self.removed_unskippable_bundles.remove(&ts_entry.entry)?;
721                    }
722                    self.unskippable_bundles.delete_front().await?;
723                }
724            }
725            for entry in removed_unskippable {
726                self.removed_unskippable_bundles.insert(&entry)?;
727            }
728        }
729        #[cfg(with_metrics)]
730        metrics::NUM_INBOXES
731            .with_label_values(&[])
732            .observe(self.inboxes.count().await? as f64);
733        Ok(())
734    }
735
736    /// Returns the chain IDs of all recipients for which a message is waiting in the outbox.
737    pub fn nonempty_outbox_chain_ids(&self) -> Vec<ChainId> {
738        self.nonempty_outboxes.get().iter().copied().collect()
739    }
740
741    /// Returns the outboxes for the given targets, or an error if any of them are missing.
742    pub async fn load_outboxes(
743        &self,
744        targets: &[ChainId],
745    ) -> Result<Vec<ReadGuardedView<OutboxStateView<C>>>, ChainError> {
746        let vec_of_options = self.outboxes.try_load_entries(targets).await?;
747        let optional_vec = vec_of_options.into_iter().collect::<Option<Vec<_>>>();
748        optional_vec.ok_or_else(|| ChainError::InternalError("Missing outboxes".into()))
749    }
750
751    /// Executes a block: first the incoming messages, then the main operation.
752    /// Does not update chain state other than the execution state.
753    #[expect(clippy::too_many_arguments)]
754    async fn execute_block_inner(
755        chain: &mut ExecutionStateView<C>,
756        confirmed_log: &LogView<C, CryptoHash>,
757        previous_message_blocks_view: &MapView<C, ChainId, BlockHeight>,
758        previous_event_blocks_view: &MapView<C, StreamId, BlockHeight>,
759        block: &ProposedBlock,
760        local_time: Timestamp,
761        round: Option<u32>,
762        published_blobs: &[Blob],
763        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
764    ) -> Result<BlockExecutionOutcome, ChainError> {
765        #[cfg(with_metrics)]
766        let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency();
767        chain.system.timestamp.set(block.timestamp);
768
769        let policy = chain
770            .system
771            .current_committee()
772            .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
773            .1
774            .policy()
775            .clone();
776
777        let mut resource_controller = ResourceController::new(
778            Arc::new(policy),
779            ResourceTracker::default(),
780            block.authenticated_signer,
781        );
782
783        for blob in published_blobs {
784            let blob_id = blob.id();
785            resource_controller
786                .policy()
787                .check_blob_size(blob.content())
788                .with_execution_context(ChainExecutionContext::Block)?;
789            chain.system.used_blobs.insert(&blob_id)?;
790        }
791
792        // Execute each incoming bundle as a transaction, then each operation.
793        // Collect messages, events and oracle responses, each as one list per transaction.
794        let mut block_execution_tracker = BlockExecutionTracker::new(
795            &mut resource_controller,
796            published_blobs
797                .iter()
798                .map(|blob| (blob.id(), blob))
799                .collect(),
800            local_time,
801            replaying_oracle_responses,
802            block,
803        )?;
804
805        for transaction in block.transaction_refs() {
806            block_execution_tracker
807                .execute_transaction(transaction, round, chain)
808                .await?;
809        }
810
811        let recipients = block_execution_tracker.recipients();
812        let mut previous_message_blocks = BTreeMap::new();
813        for recipient in recipients {
814            if let Some(height) = previous_message_blocks_view.get(&recipient).await? {
815                let hash = confirmed_log
816                    .get(usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?)
817                    .await?
818                    .ok_or_else(|| {
819                        ChainError::InternalError("missing entry in confirmed_log".into())
820                    })?;
821                previous_message_blocks.insert(recipient, (hash, height));
822            }
823        }
824
825        let streams = block_execution_tracker.event_streams();
826        let mut previous_event_blocks = BTreeMap::new();
827        for stream in streams {
828            if let Some(height) = previous_event_blocks_view.get(&stream).await? {
829                let hash = confirmed_log
830                    .get(usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?)
831                    .await?
832                    .ok_or_else(|| {
833                        ChainError::InternalError("missing entry in confirmed_log".into())
834                    })?;
835                previous_event_blocks.insert(stream, (hash, height));
836            }
837        }
838
839        let state_hash = {
840            #[cfg(with_metrics)]
841            let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency();
842            chain.crypto_hash().await?
843        };
844
845        let (messages, oracle_responses, events, blobs, operation_results) =
846            block_execution_tracker.finalize();
847
848        Ok(BlockExecutionOutcome {
849            messages,
850            previous_message_blocks,
851            previous_event_blocks,
852            state_hash,
853            oracle_responses,
854            events,
855            blobs,
856            operation_results,
857        })
858    }
859
860    /// Executes a block: first the incoming messages, then the main operation.
861    /// Does not update chain state other than the execution state.
862    pub async fn execute_block(
863        &mut self,
864        block: &ProposedBlock,
865        local_time: Timestamp,
866        round: Option<u32>,
867        published_blobs: &[Blob],
868        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
869    ) -> Result<BlockExecutionOutcome, ChainError> {
870        assert_eq!(
871            block.chain_id,
872            self.execution_state.context().extra().chain_id()
873        );
874
875        self.ensure_is_active(local_time).await?;
876
877        let chain_timestamp = *self.execution_state.system.timestamp.get();
878        ensure!(
879            chain_timestamp <= block.timestamp,
880            ChainError::InvalidBlockTimestamp {
881                parent: chain_timestamp,
882                new: block.timestamp
883            }
884        );
885        ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
886
887        ensure!(
888            block.published_blob_ids()
889                == published_blobs
890                    .iter()
891                    .map(|blob| blob.id())
892                    .collect::<BTreeSet<_>>(),
893            ChainError::InternalError("published_blobs mismatch".to_string())
894        );
895
896        if *self.execution_state.system.closed.get() {
897            ensure!(block.has_only_rejected_messages(), ChainError::ClosedChain);
898        }
899
900        Self::check_app_permissions(
901            self.execution_state.system.application_permissions.get(),
902            block,
903        )?;
904
905        Self::execute_block_inner(
906            &mut self.execution_state,
907            &self.confirmed_log,
908            &self.previous_message_blocks,
909            &self.previous_event_blocks,
910            block,
911            local_time,
912            round,
913            published_blobs,
914            replaying_oracle_responses,
915        )
916        .await
917    }
918
919    /// Applies an execution outcome to the chain, updating the outboxes, state hash and chain
920    /// manager. This does not touch the execution state itself, which must be updated separately.
921    /// Returns the set of event streams that were updated as a result of applying the block.
922    pub async fn apply_confirmed_block(
923        &mut self,
924        block: &ConfirmedBlock,
925        local_time: Timestamp,
926    ) -> Result<BTreeSet<StreamId>, ChainError> {
927        let hash = block.inner().hash();
928        let block = block.inner().inner();
929        self.execution_state_hash.set(Some(block.header.state_hash));
930        let updated_streams = self.process_emitted_events(block).await?;
931        let recipients = self.process_outgoing_messages(block).await?;
932
933        for recipient in recipients {
934            self.previous_message_blocks
935                .insert(&recipient, block.header.height)?;
936        }
937        for event in block.body.events.iter().flatten() {
938            self.previous_event_blocks
939                .insert(&event.stream_id, block.header.height)?;
940        }
941        // Last, reset the consensus state based on the current ownership.
942        self.reset_chain_manager(block.header.height.try_add_one()?, local_time)?;
943
944        // Advance to next block height.
945        let tip = self.tip_state.get_mut();
946        tip.block_hash = Some(hash);
947        tip.next_block_height.try_add_assign_one()?;
948        tip.update_counters(&block.body.transactions, &block.body.messages)?;
949        self.confirmed_log.push(hash);
950        self.preprocessed_blocks.remove(&block.header.height)?;
951        Ok(updated_streams)
952    }
953
954    /// Adds a block to `preprocessed_blocks`, and updates the outboxes where possible.
955    /// Returns the set of streams that were updated as a result of preprocessing the block.
956    pub async fn preprocess_block(
957        &mut self,
958        block: &ConfirmedBlock,
959    ) -> Result<BTreeSet<StreamId>, ChainError> {
960        let hash = block.inner().hash();
961        let block = block.inner().inner();
962        let height = block.header.height;
963        if height < self.tip_state.get().next_block_height {
964            return Ok(BTreeSet::new());
965        }
966        self.process_outgoing_messages(block).await?;
967        let updated_streams = self.process_emitted_events(block).await?;
968        self.preprocessed_blocks.insert(&height, hash)?;
969        Ok(updated_streams)
970    }
971
972    /// Returns whether this is a child chain.
973    pub fn is_child(&self) -> bool {
974        let Some(description) = self.execution_state.system.description.get() else {
975            // Root chains are always initialized, so this must be a child chain.
976            return true;
977        };
978        description.is_child()
979    }
980
981    /// Verifies that the block is valid according to the chain's application permission settings.
982    fn check_app_permissions(
983        app_permissions: &ApplicationPermissions,
984        block: &ProposedBlock,
985    ) -> Result<(), ChainError> {
986        let mut mandatory = HashSet::<ApplicationId>::from_iter(
987            app_permissions.mandatory_applications.iter().copied(),
988        );
989        for transaction in &block.transactions {
990            match transaction {
991                Transaction::ExecuteOperation(operation)
992                    if operation.is_exempt_from_permissions() =>
993                {
994                    mandatory.clear()
995                }
996                Transaction::ExecuteOperation(operation) => {
997                    ensure!(
998                        app_permissions.can_execute_operations(&operation.application_id()),
999                        ChainError::AuthorizedApplications(
1000                            app_permissions.execute_operations.clone().unwrap()
1001                        )
1002                    );
1003                    if let Operation::User { application_id, .. } = operation {
1004                        mandatory.remove(application_id);
1005                    }
1006                }
1007                Transaction::ReceiveMessages(incoming_bundle) => {
1008                    for pending in incoming_bundle.messages() {
1009                        if let Message::User { application_id, .. } = &pending.message {
1010                            mandatory.remove(application_id);
1011                        }
1012                    }
1013                }
1014            }
1015        }
1016        ensure!(
1017            mandatory.is_empty(),
1018            ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
1019        );
1020        Ok(())
1021    }
1022
1023    /// Returns the hashes of all blocks we have in the given range.
1024    pub async fn block_hashes(
1025        &self,
1026        range: impl RangeBounds<BlockHeight>,
1027    ) -> Result<Vec<CryptoHash>, ChainError> {
1028        let next_height = self.tip_state.get().next_block_height;
1029        // If the range is not empty, it can always be represented as start..=end.
1030        let Some((start, end)) = range.to_inclusive() else {
1031            return Ok(Vec::new());
1032        };
1033        // Everything up to (excluding) next_height is in confirmed_log.
1034        let mut hashes = if let Ok(last_height) = next_height.try_sub_one() {
1035            let usize_start = usize::try_from(start)?;
1036            let usize_end = usize::try_from(end.min(last_height))?;
1037            self.confirmed_log.read(usize_start..=usize_end).await?
1038        } else {
1039            Vec::new()
1040        };
1041        // Everything after (including) next_height in preprocessed_blocks if we have it.
1042        for height in start.max(next_height).0..=end.0 {
1043            if let Some(hash) = self.preprocessed_blocks.get(&BlockHeight(height)).await? {
1044                hashes.push(hash);
1045            }
1046        }
1047        Ok(hashes)
1048    }
1049
1050    /// Resets the chain manager for the next block height.
1051    fn reset_chain_manager(
1052        &mut self,
1053        next_height: BlockHeight,
1054        local_time: Timestamp,
1055    ) -> Result<(), ChainError> {
1056        let maybe_committee = self.execution_state.system.current_committee().into_iter();
1057        let ownership = self.execution_state.system.ownership.get().clone();
1058        let fallback_owners =
1059            maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights());
1060        self.pending_validated_blobs.clear();
1061        self.pending_proposed_blobs.clear();
1062        self.manager
1063            .reset(ownership, next_height, local_time, fallback_owners)
1064    }
1065
1066    /// Updates the outboxes with the messages sent in the block.
1067    ///
1068    /// Returns the set of all recipients.
1069    async fn process_outgoing_messages(
1070        &mut self,
1071        block: &Block,
1072    ) -> Result<Vec<ChainId>, ChainError> {
1073        // Record the messages of the execution. Messages are understood within an
1074        // application.
1075        let recipients = block.recipients();
1076        let block_height = block.header.height;
1077        let next_height = self.tip_state.get().next_block_height;
1078
1079        // Update the outboxes.
1080        let outbox_counters = self.outbox_counters.get_mut();
1081        let nonempty_outboxes = self.nonempty_outboxes.get_mut();
1082        let targets = recipients.into_iter().collect::<Vec<_>>();
1083        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1084        for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1085            if block_height > next_height {
1086                // There may be a gap in the chain before this block. We can only add it to this
1087                // outbox if the previous message to the same recipient has already been added.
1088                if *outbox.next_height_to_schedule.get() > block_height {
1089                    continue; // We already added this recipient's messages to the outbox.
1090                }
1091                let maybe_prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok()
1092                {
1093                    // The block with the last added message has already been executed; look up its
1094                    // hash in the confirmed_log.
1095                    Some(height) if height < next_height => {
1096                        let index =
1097                            usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
1098                        Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
1099                            ChainError::InternalError("missing entry in confirmed_log".into())
1100                        })?)
1101                    }
1102                    // The block with last added message has not been executed yet. If we have it,
1103                    // it's in preprocessed_blocks.
1104                    Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
1105                        || ChainError::InternalError("missing entry in preprocessed_blocks".into()),
1106                    )?),
1107                    None => None, // No message to that sender was added yet.
1108                };
1109                // Only schedule if this block contains the next message for that recipient.
1110                match (
1111                    maybe_prev_hash,
1112                    block.body.previous_message_blocks.get(target),
1113                ) {
1114                    (None, None) => {
1115                        // No previous message block expected and none indicated by the outbox -
1116                        // all good
1117                    }
1118                    (Some(_), None) => {
1119                        // Outbox indicates there was a previous message block, but
1120                        // previous_message_blocks has no idea about it - possible bug
1121                        return Err(ChainError::InternalError(
1122                            "block indicates no previous message block,\
1123                            but we have one in the outbox"
1124                                .into(),
1125                        ));
1126                    }
1127                    (None, Some((_, prev_msg_block_height))) => {
1128                        // We have no previously processed block in the outbox, but we are
1129                        // expecting one - this could be due to an empty outbox having been pruned.
1130                        // Only process the outbox if the height of the previous message block is
1131                        // lower than the tip
1132                        if *prev_msg_block_height >= next_height {
1133                            continue;
1134                        }
1135                    }
1136                    (Some(ref prev_hash), Some((prev_msg_block_hash, _))) => {
1137                        // Only process the outbox if the hashes match.
1138                        if prev_hash != prev_msg_block_hash {
1139                            continue;
1140                        }
1141                    }
1142                }
1143            }
1144            if outbox.schedule_message(block_height)? {
1145                *outbox_counters.entry(block_height).or_default() += 1;
1146                nonempty_outboxes.insert(*target);
1147            }
1148        }
1149
1150        #[cfg(with_metrics)]
1151        metrics::NUM_OUTBOXES
1152            .with_label_values(&[])
1153            .observe(self.outboxes.count().await? as f64);
1154        Ok(targets)
1155    }
1156
1157    /// Updates the event streams with events emitted by the block if they form a contiguous
1158    /// sequence (might not be the case when preprocessing a block).
1159    /// Returns the set of updated event streams.
1160    async fn process_emitted_events(
1161        &mut self,
1162        block: &Block,
1163    ) -> Result<BTreeSet<StreamId>, ChainError> {
1164        let mut emitted_streams: BTreeMap<StreamId, BTreeSet<u32>> = BTreeMap::new();
1165        for event in block.body.events.iter().flatten() {
1166            emitted_streams
1167                .entry(event.stream_id.clone())
1168                .or_default()
1169                .insert(event.index);
1170        }
1171
1172        let mut updated_streams = BTreeSet::new();
1173        for (stream_id, indices) in emitted_streams {
1174            let initial_index = if stream_id == StreamId::system(EPOCH_STREAM_NAME) {
1175                // we don't expect the epoch stream to contain event 0
1176                1
1177            } else {
1178                0
1179            };
1180            let mut current_expected_index = self
1181                .next_expected_events
1182                .get(&stream_id)
1183                .await?
1184                .unwrap_or(initial_index);
1185            for index in indices {
1186                if index == current_expected_index {
1187                    updated_streams.insert(stream_id.clone());
1188                    current_expected_index = index.saturating_add(1);
1189                }
1190            }
1191            if current_expected_index != 0 {
1192                self.next_expected_events
1193                    .insert(&stream_id, current_expected_index)?;
1194            }
1195        }
1196        Ok(updated_streams)
1197    }
1198}
1199
1200#[test]
1201fn empty_block_size() {
1202    let size = bcs::serialized_size(&crate::block::Block::new(
1203        crate::test::make_first_block(
1204            linera_execution::test_utils::dummy_chain_description(0).id(),
1205        ),
1206        crate::data_types::BlockExecutionOutcome::default(),
1207    ))
1208    .unwrap();
1209    assert_eq!(size, EMPTY_BLOCK_SIZE);
1210}