1use std::{
5 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6 sync::Arc,
7};
8
9use allocative::Allocative;
10use linera_base::{
11 crypto::{CryptoHash, ValidatorPublicKey},
12 data_types::{
13 ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Cursor,
14 Epoch, NonCanonicalBTreeMap, NonCanonicalBTreeSet, OracleResponse, Timestamp,
15 },
16 ensure,
17 hashed::Hashed,
18 identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
19 ownership::ChainOwnership,
20 time::{Duration, Instant},
21};
22use linera_execution::{
23 committee::Committee, system::EPOCH_STREAM_NAME, ExecutionRuntimeContext, ExecutionStateView,
24 Message, Operation, OutgoingMessage, PreparedCheckpoint, Query, QueryContext, QueryOutcome,
25 ResourceController, ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
26};
27use linera_views::{
28 context::Context,
29 log_view::LogView,
30 map_view::{CustomMapView, MapView},
31 reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
32 register_view::RegisterView,
33 set_view::SetView,
34 views::{ClonableView, RootView, View},
35};
36use serde::{Deserialize, Serialize};
37use tracing::{info, instrument, warn};
38
39use crate::{
40 block::{Block, ConfirmedBlock},
41 block_tracker::BlockExecutionTracker,
42 data_types::{
43 BlockExecutionOutcome, BundleExecutionPolicy, BundleFailurePolicy, ChainAndHeight,
44 IncomingBundle, MessageAction, MessageBundle, ProposedBlock, Transaction,
45 },
46 inbox::{InboxError, InboxStateView},
47 manager::ChainManager,
48 outbox::OutboxStateView,
49 pending_blobs::PendingBlobsView,
50 ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
51};
52
53#[cfg(test)]
54#[path = "unit_tests/chain_tests.rs"]
55mod chain_tests;
56
57#[cfg(with_metrics)]
58use linera_base::prometheus_util::MeasureLatency;
59
60#[cfg(with_metrics)]
61pub(crate) mod metrics {
62 use std::sync::LazyLock;
63
64 use linera_base::prometheus_util::{
65 exponential_bucket_interval, register_histogram_vec, register_int_counter_vec,
66 };
67 use linera_execution::ResourceTracker;
68 use prometheus::{HistogramVec, IntCounterVec};
69
70 pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
71 register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
72 });
73
74 pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
75 register_histogram_vec(
76 "block_execution_latency",
77 "Block execution latency",
78 &[],
79 exponential_bucket_interval(50.0_f64, 10_000_000.0),
80 )
81 });
82
83 #[cfg(with_metrics)]
84 pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
85 register_histogram_vec(
86 "message_execution_latency",
87 "Message execution latency",
88 &[],
89 exponential_bucket_interval(0.1_f64, 50_000.0),
90 )
91 });
92
93 pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
94 register_histogram_vec(
95 "operation_execution_latency",
96 "Operation execution latency",
97 &[],
98 exponential_bucket_interval(0.1_f64, 50_000.0),
99 )
100 });
101
102 pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
103 register_histogram_vec(
104 "wasm_fuel_used_per_block",
105 "Wasm fuel used per block",
106 &[],
107 exponential_bucket_interval(10.0, 100_000_000.0),
108 )
109 });
110
111 pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
112 register_histogram_vec(
113 "evm_fuel_used_per_block",
114 "EVM fuel used per block",
115 &[],
116 exponential_bucket_interval(10.0, 100_000_000.0),
117 )
118 });
119
120 pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
121 register_histogram_vec(
122 "vm_num_reads_per_block",
123 "VM number of reads per block",
124 &[],
125 exponential_bucket_interval(0.1, 100.0),
126 )
127 });
128
129 pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
130 register_histogram_vec(
131 "vm_bytes_read_per_block",
132 "VM number of bytes read per block",
133 &[],
134 exponential_bucket_interval(0.1, 10_000_000.0),
135 )
136 });
137
138 pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
139 register_histogram_vec(
140 "vm_bytes_written_per_block",
141 "VM number of bytes written per block",
142 &[],
143 exponential_bucket_interval(0.1, 10_000_000.0),
144 )
145 });
146
147 pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
148 register_histogram_vec(
149 "state_hash_computation_latency",
150 "Time to recompute the state hash, in microseconds",
151 &[],
152 exponential_bucket_interval(1.0, 2_000_000.0),
153 )
154 });
155
156 pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
157 register_histogram_vec(
158 "num_outboxes",
159 "Number of outboxes",
160 &[],
161 exponential_bucket_interval(1.0, 1_000_000.0),
162 )
163 });
164
165 pub static OUTBOX_COUNTERS_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
166 register_histogram_vec(
167 "outbox_counters_size",
168 "Number of entries in the outbox_counters map (in-flight message heights)",
169 &[],
170 exponential_bucket_interval(1.0, 1_000_000.0),
171 )
172 });
173
174 pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
176 NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
177 WASM_FUEL_USED_PER_BLOCK
178 .with_label_values(&[])
179 .observe(tracker.wasm_fuel as f64);
180 EVM_FUEL_USED_PER_BLOCK
181 .with_label_values(&[])
182 .observe(tracker.evm_fuel as f64);
183 VM_NUM_READS_PER_BLOCK
184 .with_label_values(&[])
185 .observe(tracker.read_operations as f64);
186 VM_BYTES_READ_PER_BLOCK
187 .with_label_values(&[])
188 .observe(tracker.bytes_read as f64);
189 VM_BYTES_WRITTEN_PER_BLOCK
190 .with_label_values(&[])
191 .observe(tracker.bytes_written as f64);
192 }
193}
194
195pub(crate) const EMPTY_BLOCK_SIZE: usize = 94;
197
198#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
204pub struct ChainIdSet(pub BTreeSet<ChainId>);
205
206impl linera_base::crypto::BcsHashable<'_> for ChainIdSet {}
207
208impl std::ops::Deref for ChainIdSet {
209 type Target = BTreeSet<ChainId>;
210
211 fn deref(&self) -> &Self::Target {
212 &self.0
213 }
214}
215
216#[cfg_attr(
218 with_graphql,
219 derive(async_graphql::SimpleObject),
220 graphql(cache_control(no_cache))
221)]
222#[derive(Debug, RootView, ClonableView, Allocative)]
223#[allocative(bound = "C")]
224pub struct ChainStateView<C>
225where
226 C: Clone + Context + 'static,
227{
228 pub execution_state: ExecutionStateView<C>,
230
231 pub tip_state: RegisterView<C, ChainTipState>,
233
234 pub manager: ChainManager<C>,
236 pub pending_validated_blobs: PendingBlobsView<C>,
239 pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,
241
242 pub block_hashes: CustomMapView<C, BlockHeight, CryptoHash>,
246 pub received_log: LogView<C, ChainAndHeight>,
248 pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,
250
251 pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
253 pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
255 pub next_expected_events: MapView<C, StreamId, u32>,
258 pub outbox_counters: RegisterView<C, NonCanonicalBTreeMap<BlockHeight, u32>>,
261 pub nonempty_outboxes: RegisterView<C, NonCanonicalBTreeSet<ChainId>>,
263
264 pub nonempty_inboxes: RegisterView<C, NonCanonicalBTreeSet<ChainId>>,
266
267 pub block_zero_executed_at: RegisterView<C, Timestamp>,
271
272 pub next_height_to_preprocess: RegisterView<C, BlockHeight>,
281
282 pub latest_checkpoint_height: RegisterView<C, Option<BlockHeight>>,
286
287 pub pre_checkpoint_block_trust: SetView<C, CryptoHash>,
295
296 pub outbox_index_tracked_hash: RegisterView<C, Option<CryptoHash>>,
303}
304
305#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
307#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, Allocative)]
308pub struct ChainTipState {
309 pub block_hash: Option<CryptoHash>,
311 pub next_block_height: BlockHeight,
313 pub num_incoming_bundles: u32,
315 pub num_operations: u32,
317 pub num_outgoing_messages: u32,
319}
320
321impl ChainTipState {
322 pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
325 ensure!(
326 new_block.height == self.next_block_height,
327 ChainError::UnexpectedBlockHeight {
328 expected_block_height: self.next_block_height,
329 found_block_height: new_block.height
330 }
331 );
332 ensure!(
333 new_block.previous_block_hash == self.block_hash,
334 ChainError::UnexpectedPreviousBlockHash
335 );
336 Ok(())
337 }
338
339 pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
342 ensure!(
343 self.next_block_height >= height,
344 ChainError::MissingEarlierBlocks {
345 current_block_height: self.next_block_height,
346 }
347 );
348 Ok(self.next_block_height > height)
349 }
350
351 pub fn update_counters(
353 &mut self,
354 transactions: &[Transaction],
355 messages: &[Vec<OutgoingMessage>],
356 ) -> Result<(), ChainError> {
357 let mut num_incoming_bundles = 0u32;
358 let mut num_operations = 0u32;
359
360 for transaction in transactions {
361 match transaction {
362 Transaction::ReceiveMessages(_) => {
363 num_incoming_bundles = num_incoming_bundles
364 .checked_add(1)
365 .ok_or(ArithmeticError::Overflow)?;
366 }
367 Transaction::ExecuteOperation(_) => {
368 num_operations = num_operations
369 .checked_add(1)
370 .ok_or(ArithmeticError::Overflow)?;
371 }
372 }
373 }
374
375 self.num_incoming_bundles = self
376 .num_incoming_bundles
377 .checked_add(num_incoming_bundles)
378 .ok_or(ArithmeticError::Overflow)?;
379
380 self.num_operations = self
381 .num_operations
382 .checked_add(num_operations)
383 .ok_or(ArithmeticError::Overflow)?;
384
385 let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
386 .map_err(|_| ArithmeticError::Overflow)?;
387 self.num_outgoing_messages = self
388 .num_outgoing_messages
389 .checked_add(num_outgoing_messages)
390 .ok_or(ArithmeticError::Overflow)?;
391
392 Ok(())
393 }
394}
395
396impl<C> ChainStateView<C>
397where
398 C: Context + Clone + 'static,
399 C::Extra: ExecutionRuntimeContext,
400{
401 pub fn chain_id(&self) -> ChainId {
403 self.context().extra().chain_id()
404 }
405
406 #[instrument(skip_all, fields(
407 chain_id = %self.chain_id(),
408 ))]
409 pub async fn query_application(
410 &mut self,
411 local_time: Timestamp,
412 query: Query,
413 service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
414 ) -> Result<QueryOutcome, ChainError> {
415 let context = QueryContext {
416 chain_id: self.chain_id(),
417 next_block_height: self.tip_state.get().next_block_height,
418 local_time,
419 };
420 self.execution_state
421 .query_application(context, query, service_runtime_endpoint)
422 .await
423 .with_execution_context(ChainExecutionContext::Query)
424 }
425
426 #[instrument(skip_all, fields(
427 chain_id = %self.chain_id(),
428 application_id = %application_id
429 ))]
430 pub async fn describe_application(
431 &mut self,
432 application_id: ApplicationId,
433 ) -> Result<ApplicationDescription, ChainError> {
434 self.execution_state
435 .system
436 .describe_application(application_id, &mut TransactionTracker::default())
437 .await
438 .with_execution_context(ChainExecutionContext::DescribeApplication)
439 }
440
441 #[instrument(skip_all, fields(
442 chain_id = %self.chain_id(),
443 target = %target,
444 height = %height
445 ))]
446 pub async fn mark_messages_as_received(
447 &mut self,
448 target: &ChainId,
449 height: BlockHeight,
450 tracked: Option<&ChainIdSet>,
451 ) -> Result<bool, ChainError> {
452 let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
453 let updates = outbox.mark_messages_as_received(height).await?;
454 if updates.is_empty() {
455 return Ok(false);
456 }
457 if tracked.is_none_or(|tracked| tracked.contains(target)) {
464 for update in updates {
465 let counter = self
466 .outbox_counters
467 .get_mut()
468 .get_mut(&update)
469 .ok_or_else(|| {
470 ChainError::CorruptedChainState("message counter should be present".into())
471 })?;
472 *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
473 if *counter == 0 {
474 self.outbox_counters.get_mut().remove(&update);
476 }
477 }
478 }
479 if outbox.queue.count() == 0 {
480 self.nonempty_outboxes.get_mut().remove(target);
481 if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
483 self.outboxes.remove_entry(target)?;
484 }
485 }
486 #[cfg(with_metrics)]
487 metrics::NUM_OUTBOXES
488 .with_label_values(&[])
489 .observe(self.nonempty_outboxes.get().len() as f64);
490 #[cfg(with_metrics)]
491 metrics::OUTBOX_COUNTERS_SIZE
492 .with_label_values(&[])
493 .observe(self.outbox_counters.get().len() as f64);
494 Ok(true)
495 }
496
497 pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
500 tracing::debug!(
501 "Messages left in {:.8}'s outbox: {:?}",
502 self.chain_id(),
503 self.outbox_counters.get()
504 );
505 if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
506 key > &height
507 } else {
508 true
509 }
510 }
511
512 pub async fn is_active(&self) -> Result<bool, ChainError> {
514 Ok(self.execution_state.system.is_active().await?)
515 }
516
517 pub async fn initialize_if_needed(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
519 let chain_id = self.chain_id();
520 if self
522 .execution_state
523 .system
524 .initialize_chain(chain_id)
525 .await
526 .with_execution_context(ChainExecutionContext::Block)?
527 {
528 return Ok(());
530 }
531 let maybe_committee = self
532 .execution_state
533 .system
534 .current_committee()
535 .await
536 .with_execution_context(ChainExecutionContext::Block)?;
537 self.manager.reset(
539 self.execution_state.system.ownership.get().await?.clone(),
540 BlockHeight(0),
541 local_time,
542 maybe_committee
543 .iter()
544 .flat_map(|(_, committee)| committee.account_keys_and_weights()),
545 )?;
546 Ok(())
547 }
548
549 fn insert_block_hash(
553 &mut self,
554 height: BlockHeight,
555 hash: CryptoHash,
556 ) -> Result<(), ChainError> {
557 self.block_hashes.insert(&height, hash)?;
558 let next = self.next_height_to_preprocess.get_mut();
559 if *next <= height {
560 *next = height.try_add_one()?;
561 }
562 Ok(())
563 }
564
565 #[instrument(skip_all, fields(
572 chain_id = %self.chain_id(),
573 origin = %origin,
574 bundle_height = %bundle.height
575 ))]
576 pub async fn receive_message_bundle_with_inbox(
577 &mut self,
578 inbox: &mut InboxStateView<C>,
579 origin: &ChainId,
580 bundle: MessageBundle,
581 local_time: Timestamp,
582 add_to_received_log: bool,
583 ) -> Result<(), ChainError> {
584 assert!(!bundle.messages.is_empty());
585 let chain_id = self.chain_id();
586 tracing::trace!(
587 "Processing new messages from {origin} at height {}",
588 bundle.height,
589 );
590 let chain_and_height = ChainAndHeight {
591 chain_id: *origin,
592 height: bundle.height,
593 };
594
595 match self.initialize_if_needed(local_time).await {
596 Ok(_) => (),
597 Err(ChainError::ExecutionError(exec_err, _))
600 if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
601 if blobs.iter().all(|blob_id| {
602 blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
603 })) => {}
604 err => {
605 return err;
606 }
607 }
608
609 let newly_added = inbox
611 .add_bundle(bundle)
612 .await
613 .map_err(|error| match error {
614 InboxError::ViewError(error) => ChainError::ViewError(error),
615 error => ChainError::CorruptedChainState(format!(
616 "while processing messages in certified block: {error}"
617 )),
618 })?;
619 if newly_added {
620 self.nonempty_inboxes.get_mut().insert(*origin);
621 }
622
623 if add_to_received_log {
625 self.received_log.push(chain_and_height);
626 }
627 Ok(())
628 }
629
630 pub fn update_received_certificate_trackers(
632 &mut self,
633 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
634 ) {
635 for (name, tracker) in new_trackers {
636 self.received_certificate_trackers
637 .get_mut()
638 .entry(name)
639 .and_modify(|t| {
640 if tracker > *t {
643 *t = tracker;
644 }
645 })
646 .or_insert(tracker);
647 }
648 }
649
650 pub async fn current_committee(&self) -> Result<(Epoch, Arc<Committee>), ChainError> {
651 let chain_id = self.chain_id();
652 self.execution_state
653 .system
654 .current_committee()
655 .await
656 .with_execution_context(ChainExecutionContext::Block)?
657 .ok_or(ChainError::InactiveChain(chain_id))
658 }
659
660 pub async fn ownership(&self) -> Result<&ChainOwnership, ChainError> {
661 Ok(self.execution_state.system.ownership.get().await?)
662 }
663
664 #[instrument(skip_all, fields(
670 chain_id = %self.chain_id(),
671 ))]
672 pub async fn remove_bundles_from_inboxes(
673 &mut self,
674 timestamp: Timestamp,
675 must_be_present: bool,
676 incoming_bundles: impl IntoIterator<Item = &IncomingBundle>,
677 ) -> Result<(), ChainError> {
678 let chain_id = self.chain_id();
679 let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
680 for IncomingBundle { bundle, origin, .. } in incoming_bundles {
681 ensure!(
682 bundle.timestamp <= timestamp,
683 ChainError::IncorrectBundleTimestamp {
684 chain_id,
685 bundle_timestamp: bundle.timestamp,
686 block_timestamp: timestamp,
687 }
688 );
689 let bundles = bundles_by_origin.entry(*origin).or_default();
690 bundles.push(bundle);
691 }
692 let origins = bundles_by_origin.keys().copied().collect::<Vec<_>>();
693 let inboxes = self.inboxes.try_load_entries_mut(&origins).await?;
694 for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
695 tracing::trace!(
696 "Removing [{}] from inbox for {origin}",
697 bundles
698 .iter()
699 .map(|bundle| bundle.height.to_string())
700 .collect::<Vec<_>>()
701 .join(", ")
702 );
703 for bundle in bundles {
704 let was_present = inbox
706 .remove_bundle(bundle)
707 .await
708 .map_err(|error| (chain_id, origin, error))?;
709 if must_be_present {
710 ensure!(
711 was_present,
712 ChainError::MissingCrossChainUpdate {
713 chain_id,
714 origin,
715 height: bundle.height,
716 }
717 );
718 }
719 }
720 inbox.observe_size_metric();
721 if inbox.added_bundles.count() == 0 {
722 self.nonempty_inboxes.get_mut().remove(&origin);
723 }
724 }
725 Ok(())
726 }
727
728 pub fn nonempty_outbox_chain_ids(&self) -> Vec<ChainId> {
730 self.nonempty_outboxes.get().iter().copied().collect()
731 }
732
733 pub async fn load_outboxes(
735 &self,
736 targets: &[ChainId],
737 ) -> Result<Vec<ReadGuardedView<OutboxStateView<C>>>, ChainError> {
738 let vec_of_options = self.outboxes.try_load_entries(targets).await?;
739 let optional_vec = vec_of_options.into_iter().collect::<Option<Vec<_>>>();
740 optional_vec.ok_or_else(|| ChainError::CorruptedChainState("Missing outboxes".into()))
741 }
742
743 pub async fn reconcile_outbox_index(
749 &mut self,
750 tracked: Option<&Hashed<ChainIdSet>>,
751 ) -> Result<bool, ChainError> {
752 if *self.outbox_index_tracked_hash.get() == tracked.map(|tracked| tracked.hash()) {
753 return Ok(false);
754 }
755 self.rebuild_outbox_index(tracked).await?;
756 Ok(true)
757 }
758
759 async fn rebuild_outbox_index(
765 &mut self,
766 tracked: Option<&Hashed<ChainIdSet>>,
767 ) -> Result<(), ChainError> {
768 self.nonempty_outboxes.get_mut().clear();
769 self.outbox_counters.get_mut().clear();
770 let targets = match tracked {
773 Some(tracked) => tracked.inner().iter().copied().collect::<Vec<_>>(),
774 None => self.outboxes.indices().await?,
775 };
776 for target in &targets {
777 let heights = {
778 let Some(outbox) = self.outboxes.try_load_entry(target).await? else {
779 continue;
780 };
781 outbox.queue.elements().await?
782 };
783 if heights.is_empty() {
784 continue;
785 }
786 for height in heights {
787 *self.outbox_counters.get_mut().entry(height).or_default() += 1;
788 }
789 self.nonempty_outboxes.get_mut().insert(*target);
790 }
791 self.outbox_index_tracked_hash
792 .set(tracked.map(|tracked| tracked.hash()));
793 Ok(())
794 }
795
796 pub fn outbox_index_is_reconciled(&self, tracked: Option<&Hashed<ChainIdSet>>) -> bool {
799 *self.outbox_index_tracked_hash.get() == tracked.map(|tracked| tracked.hash())
800 }
801
802 #[expect(clippy::too_many_arguments)]
804 #[instrument(skip_all, fields(
805 chain_id = %block.chain_id,
806 block_height = %block.height
807 ))]
808 async fn execute_block_inner(
809 chain: &mut ExecutionStateView<C>,
810 block_hashes: &CustomMapView<C, BlockHeight, CryptoHash>,
811 block: &mut ProposedBlock,
812 local_time: Timestamp,
813 round: Option<u32>,
814 published_blobs: &[Blob],
815 replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
816 exec_policy: BundleExecutionPolicy,
817 checkpoint_origin_cursors: Vec<(ChainId, Cursor)>,
818 checkpoint_inbox_cursors: Vec<(ChainId, Cursor)>,
819 checkpoint_outbox_block_hashes: Vec<CryptoHash>,
820 ) -> Result<(BlockExecutionOutcome, ResourceTracker, HashSet<ChainId>), ChainError> {
821 if !matches!(&exec_policy.on_failure, BundleFailurePolicy::Abort) {
824 assert!(
825 replaying_oracle_responses.is_none(),
826 "Cannot use AutoRetry policy when replaying oracle responses"
827 );
828 }
829
830 #[cfg(with_metrics)]
831 let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency_us();
832
833 let committee_policy = chain
838 .system
839 .current_committee()
840 .await
841 .with_execution_context(ChainExecutionContext::Block)?
842 .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
843 .1
844 .policy()
845 .clone();
846
847 let prepared_checkpoint = if block.starts_with_checkpoint() {
856 let blobs = chain
857 .prepare_checkpoint(committee_policy.maximum_blob_size)
858 .await
859 .with_execution_context(ChainExecutionContext::Block)?;
860 Some(PreparedCheckpoint {
861 blobs,
862 origin_cursors: checkpoint_origin_cursors,
863 inbox_cursors: checkpoint_inbox_cursors,
864 outbox_block_hashes: checkpoint_outbox_block_hashes,
865 })
866 } else {
867 None
868 };
869
870 chain.system.timestamp.set(block.timestamp);
871
872 let mut resource_controller = ResourceController::new(
873 Arc::new(committee_policy),
874 ResourceTracker::default(),
875 block.authenticated_owner,
876 );
877
878 for blob in published_blobs {
879 let blob_id = blob.id();
880 resource_controller
881 .policy()
882 .check_blob_size(blob.content())
883 .with_execution_context(ChainExecutionContext::Block)?;
884 chain.system.used_blobs.insert(&blob_id)?;
885 }
886
887 let mut block_execution_tracker = BlockExecutionTracker::new(
888 &mut resource_controller,
889 published_blobs
890 .iter()
891 .map(|blob| (blob.id(), blob))
892 .collect(),
893 local_time,
894 replaying_oracle_responses,
895 block,
896 )?;
897 if let Some(prepared) = prepared_checkpoint {
898 block_execution_tracker.set_prepared_checkpoint(prepared);
899 }
900
901 let (max_failures, never_reject_application_ids) = match &exec_policy.on_failure {
903 BundleFailurePolicy::Abort => (0, Arc::new(HashSet::new())),
904 BundleFailurePolicy::AutoRetry {
905 max_failures,
906 never_reject_application_ids,
907 } => (*max_failures, never_reject_application_ids.clone()),
908 };
909 let auto_retry = !matches!(exec_policy.on_failure, BundleFailurePolicy::Abort);
910 let mut failure_count = 0u32;
911 let mut never_reject_discarded_origins = HashSet::new();
912
913 let time_budget = exec_policy.time_budget;
914 let mut cumulative_bundle_time = Duration::ZERO;
915
916 let mut i = 0;
917 while i < block.transactions.len() {
918 let transaction = &mut block.transactions[i];
919 let is_bundle = matches!(transaction, Transaction::ReceiveMessages(_));
920 let is_stream_update = transaction.is_update_stream();
921
922 if is_bundle && time_budget.is_some_and(|budget| cumulative_bundle_time >= budget) {
924 info!(
925 ?cumulative_bundle_time,
926 ?time_budget,
927 "Time budget for bundle staging exceeded, discarding remaining bundles"
928 );
929 Self::discard_remaining_bundles(block, i, None);
930 continue;
931 }
932
933 let checkpoint = if auto_retry && (is_bundle || is_stream_update) {
934 Some((
935 chain.clone_unchecked()?,
936 block_execution_tracker.create_checkpoint(),
937 ))
938 } else {
939 None
940 };
941
942 let bundle_start = if is_bundle && time_budget.is_some() {
943 Some(Instant::now())
944 } else {
945 None
946 };
947
948 let result = block_execution_tracker
949 .execute_transaction(&*transaction, round, chain)
950 .await;
951
952 if let Some(start) = bundle_start {
953 cumulative_bundle_time += start.elapsed();
954 }
955
956 match (result, transaction, checkpoint) {
961 (Ok(()), _, _) => {
962 i += 1;
963 }
964 (
965 Err(ChainError::ExecutionError(error, _context)),
966 Transaction::ReceiveMessages(incoming_bundle),
967 Some((saved_chain, saved_tracker)),
968 ) if !error.is_transient_error() && error.is_limit_error() && i > 0 => {
969 *chain = saved_chain;
971 block_execution_tracker.restore_checkpoint(&saved_tracker);
972 failure_count += 1;
973 let maybe_sender = if failure_count > max_failures {
975 info!(
976 failure_count,
977 max_failures,
978 "Exceeded max bundle failures, discarding all remaining message \
979 bundles and stream updates"
980 );
981 Self::discard_remaining_stream_updates(block, i);
982 None
983 } else {
984 info!(
986 %error,
987 index = i,
988 origin = %incoming_bundle.origin,
989 "Message bundle exceeded block limits and will be discarded for \
990 retry in a later block"
991 );
992 Some(incoming_bundle.origin)
993 };
994 Self::discard_remaining_bundles(block, i, maybe_sender);
995 }
997 (
998 Err(ChainError::ExecutionError(error, context)),
999 Transaction::ReceiveMessages(incoming_bundle),
1000 Some((saved_chain, saved_tracker)),
1001 ) if !error.is_transient_error() => {
1002 *chain = saved_chain;
1004 block_execution_tracker.restore_checkpoint(&saved_tracker);
1005
1006 let all_messages_never_reject = !never_reject_application_ids.is_empty()
1007 && incoming_bundle.messages().all(|posted_msg| {
1008 never_reject_application_ids
1009 .contains(&posted_msg.message.application_id())
1010 });
1011 if (all_messages_never_reject || incoming_bundle.bundle.is_protected())
1012 && incoming_bundle.action != MessageAction::Reject
1013 {
1014 let origin = incoming_bundle.origin;
1015 never_reject_discarded_origins.insert(origin);
1016 warn!(
1017 %error,
1018 index = i,
1019 %origin,
1020 "Message bundle cannot be rejected (protected or never-reject); \
1021 discarding the bundle (and same-sender subsequent bundles) for retry \
1022 in a later block"
1023 );
1024 Self::discard_remaining_bundles(block, i, Some(origin));
1025 } else if incoming_bundle.action == MessageAction::Reject {
1026 return Err(ChainError::ExecutionError(error, context));
1028 } else {
1029 info!(
1032 %error,
1033 index = i,
1034 origin = %incoming_bundle.origin,
1035 "Message bundle failed to execute and will be rejected"
1036 );
1037 incoming_bundle.action = MessageAction::Reject;
1038 }
1039 }
1041 (
1042 Err(ChainError::ExecutionError(error, _context)),
1043 transaction,
1044 Some((saved_chain, saved_tracker)),
1045 ) if transaction.is_update_stream()
1046 && !error.is_transient_error()
1047 && error.is_limit_error()
1048 && i > 0 =>
1049 {
1050 *chain = saved_chain;
1052 block_execution_tracker.restore_checkpoint(&saved_tracker);
1053 failure_count += 1;
1054 if failure_count > max_failures {
1055 info!(
1056 failure_count,
1057 max_failures,
1058 "Exceeded max failures, discarding all remaining stream updates and \
1059 message bundles"
1060 );
1061 Self::discard_remaining_bundles(block, i, None);
1062 Self::discard_remaining_stream_updates(block, i);
1063 } else {
1064 info!(
1065 %error,
1066 index = i,
1067 "UpdateStream exceeded block limits, discarding for retry"
1068 );
1069 block.transactions.remove(i);
1070 }
1071 }
1073 (Err(e), _, _) => return Err(e),
1074 };
1075 }
1076
1077 ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
1080
1081 let recipients = block_execution_tracker.recipients();
1082 let non_ack_tx_indices = block_execution_tracker.non_checkpoint_ack_tx_indices();
1083 let mut recipient_heights = Vec::new();
1084 for (recipient, height) in chain
1085 .previous_message_blocks
1086 .multi_get_pairs(recipients)
1087 .await?
1088 {
1089 if let Some(tx_indices) = non_ack_tx_indices.get(&recipient) {
1097 chain
1098 .previous_message_blocks
1099 .insert(&recipient, block.height)?;
1100 let mut cursors = chain
1106 .system
1107 .unfinalized_message_blocks
1108 .get(&recipient)
1109 .await?
1110 .unwrap_or_default();
1111 for index in tx_indices {
1112 cursors.insert(Cursor {
1113 height: block.height,
1114 index: *index,
1115 });
1116 }
1117 chain
1118 .system
1119 .unfinalized_message_blocks
1120 .insert(&recipient, cursors)?;
1121 }
1122 if let Some(height) = height {
1123 recipient_heights.push((recipient, height));
1124 }
1125 }
1126 let hashes = block_hashes
1127 .multi_get(recipient_heights.iter().map(|(_, height)| height))
1128 .await?;
1129 let mut previous_message_blocks = BTreeMap::new();
1130 for (hash, (recipient, height)) in hashes.into_iter().zip(recipient_heights) {
1131 let hash = hash.ok_or_else(|| {
1132 ChainError::CorruptedChainState("missing entry in block_hashes".into())
1133 })?;
1134 previous_message_blocks.insert(recipient, (hash, height));
1135 }
1136
1137 let streams = block_execution_tracker.event_streams();
1138 let mut stream_heights = Vec::new();
1139 for (stream, height) in chain.previous_event_blocks.multi_get_pairs(streams).await? {
1140 chain.previous_event_blocks.insert(&stream, block.height)?;
1141 if let Some(height) = height {
1142 stream_heights.push((stream, height));
1143 }
1144 }
1145 let hashes = block_hashes
1146 .multi_get(stream_heights.iter().map(|(_, height)| height))
1147 .await?;
1148 let mut previous_event_blocks = BTreeMap::new();
1149 for (hash, (stream, height)) in hashes.into_iter().zip(stream_heights) {
1150 let hash = hash.ok_or_else(|| {
1151 ChainError::CorruptedChainState("missing entry in block_hashes".into())
1152 })?;
1153 previous_event_blocks.insert(stream, (hash, height));
1154 }
1155
1156 let state_hash = {
1157 #[cfg(with_metrics)]
1158 let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency_us();
1159 chain.crypto_hash_mut().await?
1160 };
1161
1162 let (messages, oracle_responses, events, blobs, operation_results, resource_tracker) =
1163 block_execution_tracker.finalize(block.transactions.len());
1164
1165 Ok((
1166 BlockExecutionOutcome {
1167 messages,
1168 previous_message_blocks,
1169 previous_event_blocks,
1170 state_hash,
1171 oracle_responses,
1172 events,
1173 blobs,
1174 operation_results,
1175 },
1176 resource_tracker,
1177 never_reject_discarded_origins,
1178 ))
1179 }
1180
1181 fn discard_remaining_stream_updates(block: &mut ProposedBlock, mut index: usize) {
1182 while index < block.transactions.len() {
1183 if block.transactions[index].is_update_stream() {
1184 block.transactions.remove(index);
1185 } else {
1186 index += 1;
1187 }
1188 }
1189 }
1190
1191 fn discard_remaining_bundles(
1192 block: &mut ProposedBlock,
1193 mut index: usize,
1194 maybe_origin: Option<ChainId>,
1195 ) {
1196 while index < block.transactions.len() {
1197 if matches!(
1198 &block.transactions[index],
1199 Transaction::ReceiveMessages(bundle)
1200 if maybe_origin.is_none_or(|origin| bundle.origin == origin)
1201 ) {
1202 block.transactions.remove(index);
1203 } else {
1204 index += 1;
1205 }
1206 }
1207 }
1208
1209 #[instrument(skip_all, fields(
1220 chain_id = %self.chain_id(),
1221 block_height = %block.height
1222 ))]
1223 pub async fn execute_block(
1224 &mut self,
1225 mut block: ProposedBlock,
1226 local_time: Timestamp,
1227 round: Option<u32>,
1228 published_blobs: &[Blob],
1229 replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
1230 policy: BundleExecutionPolicy,
1231 ) -> Result<
1232 (
1233 ProposedBlock,
1234 BlockExecutionOutcome,
1235 ResourceTracker,
1236 HashSet<ChainId>,
1237 ),
1238 ChainError,
1239 > {
1240 assert_eq!(
1241 block.chain_id,
1242 self.execution_state.context().extra().chain_id()
1243 );
1244
1245 self.initialize_if_needed(local_time).await?;
1246
1247 let chain_timestamp = *self.execution_state.system.timestamp.get();
1248 ensure!(
1249 chain_timestamp <= block.timestamp,
1250 ChainError::InvalidBlockTimestamp {
1251 parent: chain_timestamp,
1252 new: block.timestamp
1253 }
1254 );
1255 ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
1256
1257 ensure!(
1258 block.published_blob_ids()
1259 == published_blobs
1260 .iter()
1261 .map(|blob| blob.id())
1262 .collect::<BTreeSet<_>>(),
1263 ChainError::InternalError("published_blobs mismatch".to_string())
1264 );
1265
1266 if *self.execution_state.system.closed.get() {
1267 ensure!(block.has_only_rejected_messages(), ChainError::ClosedChain);
1268 }
1269
1270 Self::check_app_permissions(
1271 self.execution_state
1272 .system
1273 .application_permissions
1274 .get()
1275 .await?,
1276 &block,
1277 )?;
1278
1279 ensure!(
1280 !block
1281 .transactions
1282 .iter()
1283 .skip(1)
1284 .any(Transaction::is_checkpoint),
1285 ChainError::CheckpointPreconditionFailed(
1286 "Checkpoint must be the first transaction in its block",
1287 )
1288 );
1289 let (origin_cursors, inbox_cursors, outbox_block_hashes) = if block.starts_with_checkpoint()
1290 {
1291 self.check_checkpoint_preconditions().await?;
1292 let origin_cursors = self.collect_inbox_cursors().await?;
1293 let inbox_cursors = self.collect_all_inbox_cursors().await?;
1294 let hashes = self.collect_unfinalized_block_hashes().await?;
1295 (origin_cursors, inbox_cursors, hashes)
1296 } else {
1297 (Vec::new(), Vec::new(), Vec::new())
1298 };
1299
1300 Self::execute_block_inner(
1301 &mut self.execution_state,
1302 &self.block_hashes,
1303 &mut block,
1304 local_time,
1305 round,
1306 published_blobs,
1307 replaying_oracle_responses,
1308 policy,
1309 origin_cursors,
1310 inbox_cursors,
1311 outbox_block_hashes,
1312 )
1313 .await
1314 .map(|(outcome, tracker, never_reject_origins)| {
1315 (block, outcome, tracker, never_reject_origins)
1316 })
1317 }
1318
1319 async fn collect_inbox_cursors(&self) -> Result<Vec<(ChainId, Cursor)>, ChainError> {
1327 let targets = self
1328 .execution_state
1329 .system
1330 .pending_checkpoint_ack_targets
1331 .indices()
1332 .await?;
1333 let mut cursors = Vec::with_capacity(targets.len());
1334 for origin in targets {
1335 let Some(inbox) = self.inboxes.try_load_entry(&origin).await? else {
1336 continue;
1337 };
1338 cursors.push((origin, *inbox.next_cursor_to_remove.get()));
1339 }
1340 Ok(cursors)
1341 }
1342
1343 async fn collect_all_inbox_cursors(&self) -> Result<Vec<(ChainId, Cursor)>, ChainError> {
1348 let origins = self.inboxes.indices().await?;
1349 let mut cursors = Vec::new();
1350 for origin in origins {
1351 let Some(inbox) = self.inboxes.try_load_entry(&origin).await? else {
1352 continue;
1353 };
1354 let cursor = *inbox.next_cursor_to_remove.get();
1355 if cursor != Cursor::default() {
1356 cursors.push((origin, cursor));
1357 }
1358 }
1359 Ok(cursors)
1360 }
1361
1362 pub async fn restore_outboxes_from_unfinalized(
1370 &mut self,
1371 tracked: Option<&Hashed<ChainIdSet>>,
1372 ) -> Result<(), ChainError> {
1373 let prior_recipients = self.outboxes.indices().await?;
1379 for recipient in prior_recipients {
1380 let mut outbox = self.outboxes.try_load_entry_mut(&recipient).await?;
1381 outbox.queue.clear();
1382 outbox.next_height_to_schedule.set(BlockHeight::ZERO);
1383 }
1384 let entries = self
1385 .execution_state
1386 .system
1387 .unfinalized_message_blocks
1388 .index_values()
1389 .await?;
1390 for (recipient, cursors) in entries {
1391 if cursors.is_empty() {
1392 continue;
1393 }
1394 let heights = cursors
1397 .into_iter()
1398 .map(|cursor| cursor.height)
1399 .collect::<BTreeSet<_>>();
1400 let mut outbox = self.outboxes.try_load_entry_mut(&recipient).await?;
1401 for height in &heights {
1402 outbox.queue.push_back(*height);
1403 }
1404 let max_height = *heights
1405 .last()
1406 .expect("the empty case was filtered out above");
1407 outbox
1408 .next_height_to_schedule
1409 .set(max_height.try_add_one()?);
1410 }
1411 self.rebuild_outbox_index(tracked).await?;
1413 Ok(())
1414 }
1415
1416 async fn collect_unfinalized_block_hashes(&self) -> Result<Vec<CryptoHash>, ChainError> {
1422 let heights = self.collect_unfinalized_heights().await?;
1423 let mut hashes = Vec::with_capacity(heights.len());
1424 for height in heights {
1425 let hash = self.block_hashes.get(&height).await?.ok_or_else(|| {
1426 ChainError::CorruptedChainState(format!(
1427 "missing entry in block_hashes at height {height}"
1428 ))
1429 })?;
1430 hashes.push(hash);
1431 }
1432 Ok(hashes)
1433 }
1434
1435 pub async fn collect_unfinalized_heights(&self) -> Result<BTreeSet<BlockHeight>, ChainError> {
1442 let mut heights = BTreeSet::new();
1443 let entries = self
1444 .execution_state
1445 .system
1446 .unfinalized_message_blocks
1447 .index_values()
1448 .await?;
1449 for (_, per_recipient) in entries {
1450 heights.extend(per_recipient.into_iter().map(|cursor| cursor.height));
1451 }
1452 Ok(heights)
1453 }
1454
1455 #[instrument(skip_all, fields(
1459 chain_id = %self.chain_id(),
1460 block_height = %block.inner().inner().header.height
1461 ))]
1462 pub async fn apply_confirmed_block(
1463 &mut self,
1464 block: &ConfirmedBlock,
1465 local_time: Timestamp,
1466 tracked: Option<&ChainIdSet>,
1467 ) -> Result<BTreeSet<StreamId>, ChainError> {
1468 let hash = block.inner().hash();
1469 let block = block.inner().inner();
1470 if block.header.height == BlockHeight::ZERO {
1471 self.block_zero_executed_at.set(local_time);
1472 }
1473 let updated_streams = self.process_emitted_events(block).await?;
1474 self.process_outgoing_messages(block, tracked).await?;
1475
1476 self.reset_chain_manager(block.header.height.try_add_one()?, local_time)
1478 .await?;
1479
1480 let tip = self.tip_state.get_mut();
1482 tip.block_hash = Some(hash);
1483 tip.next_block_height.try_add_assign_one()?;
1484 tip.update_counters(&block.body.transactions, &block.body.messages)?;
1485 self.insert_block_hash(block.header.height, hash)?;
1486 if block.body.starts_with_checkpoint() {
1487 self.latest_checkpoint_height.set(Some(block.header.height));
1488 }
1489 Ok(updated_streams)
1490 }
1491
1492 #[instrument(skip_all, fields(
1495 chain_id = %self.chain_id(),
1496 block_height = %block.inner().inner().header.height
1497 ))]
1498 pub async fn preprocess_block(
1499 &mut self,
1500 block: &ConfirmedBlock,
1501 tracked: Option<&ChainIdSet>,
1502 ) -> Result<BTreeSet<StreamId>, ChainError> {
1503 let hash = block.inner().hash();
1504 let block = block.inner().inner();
1505 let height = block.header.height;
1506 if height < self.tip_state.get().next_block_height {
1507 return Ok(BTreeSet::new());
1508 }
1509 self.process_outgoing_messages(block, tracked).await?;
1510 let updated_streams = self.process_emitted_events(block).await?;
1511 self.insert_block_hash(height, hash)?;
1512 Ok(updated_streams)
1513 }
1514
1515 #[instrument(skip_all, fields(
1517 block_height = %block.height,
1518 num_transactions = %block.transactions.len()
1519 ))]
1520 fn check_app_permissions(
1521 app_permissions: &ApplicationPermissions,
1522 block: &ProposedBlock,
1523 ) -> Result<(), ChainError> {
1524 let mut mandatory = app_permissions
1525 .mandatory_applications
1526 .iter()
1527 .copied()
1528 .collect::<HashSet<ApplicationId>>();
1529 for transaction in &block.transactions {
1530 match transaction {
1531 Transaction::ExecuteOperation(operation)
1532 if operation.is_exempt_from_permissions() =>
1533 {
1534 mandatory.clear()
1535 }
1536 Transaction::ExecuteOperation(operation) => {
1537 ensure!(
1538 app_permissions.can_execute_operations(&operation.application_id()),
1539 ChainError::AuthorizedApplications(
1540 app_permissions.execute_operations.clone().unwrap()
1541 )
1542 );
1543 if let Operation::User { application_id, .. } = operation {
1544 mandatory.remove(application_id);
1545 }
1546 }
1547 Transaction::ReceiveMessages(incoming_bundle)
1548 if incoming_bundle.action == MessageAction::Accept =>
1549 {
1550 for pending in incoming_bundle.messages() {
1551 if let Message::User { application_id, .. } = &pending.message {
1552 mandatory.remove(application_id);
1553 }
1554 }
1555 }
1556 Transaction::ReceiveMessages(_) => {}
1557 }
1558 }
1559 ensure!(
1560 mandatory.is_empty(),
1561 ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
1562 );
1563 Ok(())
1564 }
1565
1566 async fn check_checkpoint_preconditions(&self) -> Result<(), ChainError> {
1578 let mut had_event_tracker = false;
1579 self.next_expected_events
1580 .for_each_index_while(|_| {
1581 had_event_tracker = true;
1582 Ok(false)
1583 })
1584 .await?;
1585 ensure!(
1586 !had_event_tracker,
1587 ChainError::CheckpointPreconditionFailed("chain has consumed events")
1588 );
1589
1590 Ok(())
1591 }
1592
1593 #[instrument(skip_all, fields(
1596 chain_id = %self.chain_id(),
1597 next_block_height = %self.tip_state.get().next_block_height,
1598 ))]
1599 pub async fn block_hashes_for_heights(
1600 &self,
1601 heights: impl IntoIterator<Item = BlockHeight>,
1602 ) -> Result<Vec<CryptoHash>, ChainError> {
1603 let heights = heights.into_iter().collect::<Vec<_>>();
1604 Ok(self
1605 .block_hashes
1606 .multi_get(&heights)
1607 .await?
1608 .into_iter()
1609 .flatten()
1610 .collect())
1611 }
1612
1613 async fn reset_chain_manager(
1615 &mut self,
1616 next_height: BlockHeight,
1617 local_time: Timestamp,
1618 ) -> Result<(), ChainError> {
1619 let maybe_committee = self
1620 .execution_state
1621 .system
1622 .current_committee()
1623 .await
1624 .with_execution_context(ChainExecutionContext::Block)?;
1625 let ownership = self.execution_state.system.ownership.get().await?.clone();
1626 let fallback_owners = maybe_committee
1627 .iter()
1628 .flat_map(|(_, committee)| committee.account_keys_and_weights());
1629 self.pending_validated_blobs.clear();
1630 self.pending_proposed_blobs.clear();
1631 self.manager
1632 .reset(ownership, next_height, local_time, fallback_owners)
1633 }
1634
1635 #[instrument(skip_all, fields(
1639 chain_id = %self.chain_id(),
1640 block_height = %block.header.height
1641 ))]
1642 async fn process_outgoing_messages(
1643 &mut self,
1644 block: &Block,
1645 tracked: Option<&ChainIdSet>,
1646 ) -> Result<Vec<ChainId>, ChainError> {
1647 let recipients = block.recipients();
1650 let block_height = block.header.height;
1651 let next_height = self.tip_state.get().next_block_height;
1652
1653 let targets = recipients.into_iter().collect::<Vec<_>>();
1656 let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1657 let mut scheduled_tracked = Vec::new();
1658 for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1659 if block_height > next_height {
1660 if *outbox.next_height_to_schedule.get() > block_height {
1663 continue; }
1665 let maybe_prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok()
1666 {
1667 Some(height) => {
1668 Some(self.block_hashes.get(&height).await?.ok_or_else(|| {
1669 ChainError::CorruptedChainState("missing entry in block_hashes".into())
1670 })?)
1671 }
1672 None => None, };
1674 match (
1676 maybe_prev_hash,
1677 block.body.previous_message_blocks.get(target),
1678 ) {
1679 (None, None) => {
1680 }
1683 (Some(_), None) => {
1684 }
1692 (None, Some((_, prev_msg_block_height))) => {
1693 if *prev_msg_block_height >= next_height {
1698 continue;
1699 }
1700 }
1701 (Some(ref prev_hash), Some((prev_msg_block_hash, _))) => {
1702 if prev_hash != prev_msg_block_hash {
1708 continue;
1709 }
1710 }
1711 }
1712 }
1713 if outbox.schedule_message(block_height)?
1714 && tracked.is_none_or(|set| set.contains(target))
1715 {
1716 scheduled_tracked.push(*target);
1717 }
1718 #[cfg(with_metrics)]
1719 crate::outbox::metrics::OUTBOX_SIZE
1720 .with_label_values(&[])
1721 .observe(outbox.queue.count() as f64);
1722 }
1723
1724 if !scheduled_tracked.is_empty() {
1725 let scheduled_count =
1727 u32::try_from(scheduled_tracked.len()).map_err(|_| ArithmeticError::Overflow)?;
1728 *self
1729 .outbox_counters
1730 .get_mut()
1731 .entry(block_height)
1732 .or_default() += scheduled_count;
1733 let nonempty_outboxes = self.nonempty_outboxes.get_mut();
1734 for target in &scheduled_tracked {
1735 nonempty_outboxes.insert(*target);
1736 }
1737 }
1738
1739 #[cfg(with_metrics)]
1740 metrics::NUM_OUTBOXES
1741 .with_label_values(&[])
1742 .observe(self.nonempty_outboxes.get().len() as f64);
1743 #[cfg(with_metrics)]
1744 metrics::OUTBOX_COUNTERS_SIZE
1745 .with_label_values(&[])
1746 .observe(self.outbox_counters.get().len() as f64);
1747 Ok(targets)
1748 }
1749
1750 #[instrument(skip_all, fields(
1754 chain_id = %self.chain_id(),
1755 block_height = %block.header.height
1756 ))]
1757 async fn process_emitted_events(
1758 &mut self,
1759 block: &Block,
1760 ) -> Result<BTreeSet<StreamId>, ChainError> {
1761 let mut emitted_streams = BTreeMap::<StreamId, BTreeSet<u32>>::new();
1762 for event in block.body.events.iter().flatten() {
1763 emitted_streams
1764 .entry(event.stream_id.clone())
1765 .or_default()
1766 .insert(event.index);
1767 }
1768 let mut stream_ids = Vec::new();
1769 let mut list_indices = Vec::new();
1770 for (stream_id, indices) in emitted_streams {
1771 stream_ids.push(stream_id);
1772 list_indices.push(indices);
1773 }
1774
1775 let mut updated_streams = BTreeSet::new();
1776 for ((stream_id, next_index), indices) in self
1777 .next_expected_events
1778 .multi_get_pairs(stream_ids)
1779 .await?
1780 .into_iter()
1781 .zip(list_indices)
1782 {
1783 let initial_index = if stream_id == StreamId::system(EPOCH_STREAM_NAME) {
1784 1
1786 } else {
1787 0
1788 };
1789 let mut current_expected_index = next_index.unwrap_or(initial_index);
1790 for index in indices {
1791 if index == current_expected_index {
1792 updated_streams.insert(stream_id.clone());
1793 current_expected_index = index.saturating_add(1);
1794 }
1795 }
1796 if current_expected_index != 0 {
1797 self.next_expected_events
1798 .insert(&stream_id, current_expected_index)?;
1799 }
1800 }
1801 Ok(updated_streams)
1802 }
1803}
1804
1805#[test]
1806fn empty_block_size() {
1807 let size = bcs::serialized_size(&crate::block::Block::new(
1808 crate::test::make_first_block(
1809 linera_execution::test_utils::dummy_chain_description(0).id(),
1810 ),
1811 crate::data_types::BlockExecutionOutcome::default(),
1812 ))
1813 .unwrap();
1814 assert_eq!(size, EMPTY_BLOCK_SIZE);
1815}