1use std::{
5 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6 sync::Arc,
7};
8
9use allocative::Allocative;
10use linera_base::{
11 crypto::{CryptoHash, ValidatorPublicKey},
12 data_types::{
13 ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Epoch,
14 OracleResponse, Timestamp,
15 },
16 ensure,
17 identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
18 ownership::ChainOwnership,
19 time::{Duration, Instant},
20};
21use linera_execution::{
22 committee::Committee, system::EPOCH_STREAM_NAME, ExecutionRuntimeContext, ExecutionStateView,
23 Message, Operation, OutgoingMessage, Query, QueryContext, QueryOutcome, ResourceController,
24 ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
25};
26use linera_views::{
27 context::Context,
28 log_view::LogView,
29 map_view::MapView,
30 reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
31 register_view::RegisterView,
32 views::{ClonableView, RootView, View},
33};
34use serde::{Deserialize, Serialize};
35use tracing::{info, instrument};
36
37use crate::{
38 block::{Block, ConfirmedBlock},
39 block_tracker::BlockExecutionTracker,
40 data_types::{
41 BlockExecutionOutcome, BundleExecutionPolicy, BundleFailurePolicy, ChainAndHeight,
42 IncomingBundle, MessageAction, MessageBundle, ProposedBlock, Transaction,
43 },
44 inbox::{InboxError, InboxStateView},
45 manager::ChainManager,
46 outbox::OutboxStateView,
47 pending_blobs::PendingBlobsView,
48 ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
49};
50
51#[cfg(test)]
52#[path = "unit_tests/chain_tests.rs"]
53mod chain_tests;
54
55#[cfg(with_metrics)]
56use linera_base::prometheus_util::MeasureLatency;
57
58#[cfg(with_metrics)]
59pub(crate) mod metrics {
60 use std::sync::LazyLock;
61
62 use linera_base::prometheus_util::{
63 exponential_bucket_interval, register_histogram_vec, register_int_counter_vec,
64 };
65 use linera_execution::ResourceTracker;
66 use prometheus::{HistogramVec, IntCounterVec};
67
68 pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
69 register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
70 });
71
72 pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
73 register_histogram_vec(
74 "block_execution_latency",
75 "Block execution latency",
76 &[],
77 exponential_bucket_interval(50.0_f64, 10_000_000.0),
78 )
79 });
80
81 #[cfg(with_metrics)]
82 pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
83 register_histogram_vec(
84 "message_execution_latency",
85 "Message execution latency",
86 &[],
87 exponential_bucket_interval(0.1_f64, 50_000.0),
88 )
89 });
90
91 pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
92 register_histogram_vec(
93 "operation_execution_latency",
94 "Operation execution latency",
95 &[],
96 exponential_bucket_interval(0.1_f64, 50_000.0),
97 )
98 });
99
100 pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
101 register_histogram_vec(
102 "wasm_fuel_used_per_block",
103 "Wasm fuel used per block",
104 &[],
105 exponential_bucket_interval(10.0, 100_000_000.0),
106 )
107 });
108
109 pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
110 register_histogram_vec(
111 "evm_fuel_used_per_block",
112 "EVM fuel used per block",
113 &[],
114 exponential_bucket_interval(10.0, 100_000_000.0),
115 )
116 });
117
118 pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
119 register_histogram_vec(
120 "vm_num_reads_per_block",
121 "VM number of reads per block",
122 &[],
123 exponential_bucket_interval(0.1, 100.0),
124 )
125 });
126
127 pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
128 register_histogram_vec(
129 "vm_bytes_read_per_block",
130 "VM number of bytes read per block",
131 &[],
132 exponential_bucket_interval(0.1, 10_000_000.0),
133 )
134 });
135
136 pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
137 register_histogram_vec(
138 "vm_bytes_written_per_block",
139 "VM number of bytes written per block",
140 &[],
141 exponential_bucket_interval(0.1, 10_000_000.0),
142 )
143 });
144
145 pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
146 register_histogram_vec(
147 "state_hash_computation_latency",
148 "Time to recompute the state hash, in microseconds",
149 &[],
150 exponential_bucket_interval(1.0, 2_000_000.0),
151 )
152 });
153
154 pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
155 register_histogram_vec(
156 "num_outboxes",
157 "Number of outboxes",
158 &[],
159 exponential_bucket_interval(1.0, 10_000.0),
160 )
161 });
162
163 pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
165 NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
166 WASM_FUEL_USED_PER_BLOCK
167 .with_label_values(&[])
168 .observe(tracker.wasm_fuel as f64);
169 EVM_FUEL_USED_PER_BLOCK
170 .with_label_values(&[])
171 .observe(tracker.evm_fuel as f64);
172 VM_NUM_READS_PER_BLOCK
173 .with_label_values(&[])
174 .observe(tracker.read_operations as f64);
175 VM_BYTES_READ_PER_BLOCK
176 .with_label_values(&[])
177 .observe(tracker.bytes_read as f64);
178 VM_BYTES_WRITTEN_PER_BLOCK
179 .with_label_values(&[])
180 .observe(tracker.bytes_written as f64);
181 }
182}
183
184pub(crate) const EMPTY_BLOCK_SIZE: usize = 94;
186
187#[cfg_attr(
189 with_graphql,
190 derive(async_graphql::SimpleObject),
191 graphql(cache_control(no_cache))
192)]
193#[derive(Debug, RootView, ClonableView, Allocative)]
194#[allocative(bound = "C")]
195pub struct ChainStateView<C>
196where
197 C: Clone + Context + 'static,
198{
199 pub execution_state: ExecutionStateView<C>,
201 pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,
203
204 pub tip_state: RegisterView<C, ChainTipState>,
206
207 pub manager: ChainManager<C>,
209 pub pending_validated_blobs: PendingBlobsView<C>,
212 pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,
214
215 pub confirmed_log: LogView<C, CryptoHash>,
218 pub received_log: LogView<C, ChainAndHeight>,
220 pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,
222
223 pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
225 pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
227 pub next_expected_events: MapView<C, StreamId, u32>,
230 pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
233 pub nonempty_outboxes: RegisterView<C, BTreeSet<ChainId>>,
235
236 pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,
238 pub nonempty_inboxes: RegisterView<C, BTreeSet<ChainId>>,
240
241 pub block_zero_executed_at: RegisterView<C, Timestamp>,
245}
246
247#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
249#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, Allocative)]
250pub struct ChainTipState {
251 pub block_hash: Option<CryptoHash>,
253 pub next_block_height: BlockHeight,
255 pub num_incoming_bundles: u32,
257 pub num_operations: u32,
259 pub num_outgoing_messages: u32,
261}
262
263impl ChainTipState {
264 pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
267 ensure!(
268 new_block.height == self.next_block_height,
269 ChainError::UnexpectedBlockHeight {
270 expected_block_height: self.next_block_height,
271 found_block_height: new_block.height
272 }
273 );
274 ensure!(
275 new_block.previous_block_hash == self.block_hash,
276 ChainError::UnexpectedPreviousBlockHash
277 );
278 Ok(())
279 }
280
281 pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
284 ensure!(
285 self.next_block_height >= height,
286 ChainError::MissingEarlierBlocks {
287 current_block_height: self.next_block_height,
288 }
289 );
290 Ok(self.next_block_height > height)
291 }
292
293 pub fn update_counters(
295 &mut self,
296 transactions: &[Transaction],
297 messages: &[Vec<OutgoingMessage>],
298 ) -> Result<(), ChainError> {
299 let mut num_incoming_bundles = 0u32;
300 let mut num_operations = 0u32;
301
302 for transaction in transactions {
303 match transaction {
304 Transaction::ReceiveMessages(_) => {
305 num_incoming_bundles = num_incoming_bundles
306 .checked_add(1)
307 .ok_or(ArithmeticError::Overflow)?;
308 }
309 Transaction::ExecuteOperation(_) => {
310 num_operations = num_operations
311 .checked_add(1)
312 .ok_or(ArithmeticError::Overflow)?;
313 }
314 }
315 }
316
317 self.num_incoming_bundles = self
318 .num_incoming_bundles
319 .checked_add(num_incoming_bundles)
320 .ok_or(ArithmeticError::Overflow)?;
321
322 self.num_operations = self
323 .num_operations
324 .checked_add(num_operations)
325 .ok_or(ArithmeticError::Overflow)?;
326
327 let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
328 .map_err(|_| ArithmeticError::Overflow)?;
329 self.num_outgoing_messages = self
330 .num_outgoing_messages
331 .checked_add(num_outgoing_messages)
332 .ok_or(ArithmeticError::Overflow)?;
333
334 Ok(())
335 }
336}
337
338impl<C> ChainStateView<C>
339where
340 C: Context + Clone + 'static,
341 C::Extra: ExecutionRuntimeContext,
342{
343 pub fn chain_id(&self) -> ChainId {
345 self.context().extra().chain_id()
346 }
347
348 #[instrument(skip_all, fields(
349 chain_id = %self.chain_id(),
350 ))]
351 pub async fn query_application(
352 &mut self,
353 local_time: Timestamp,
354 query: Query,
355 service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
356 ) -> Result<QueryOutcome, ChainError> {
357 let context = QueryContext {
358 chain_id: self.chain_id(),
359 next_block_height: self.tip_state.get().next_block_height,
360 local_time,
361 };
362 self.execution_state
363 .query_application(context, query, service_runtime_endpoint)
364 .await
365 .with_execution_context(ChainExecutionContext::Query)
366 }
367
368 #[instrument(skip_all, fields(
369 chain_id = %self.chain_id(),
370 application_id = %application_id
371 ))]
372 pub async fn describe_application(
373 &mut self,
374 application_id: ApplicationId,
375 ) -> Result<ApplicationDescription, ChainError> {
376 self.execution_state
377 .system
378 .describe_application(application_id, &mut TransactionTracker::default())
379 .await
380 .with_execution_context(ChainExecutionContext::DescribeApplication)
381 }
382
383 #[instrument(skip_all, fields(
384 chain_id = %self.chain_id(),
385 target = %target,
386 height = %height
387 ))]
388 pub async fn mark_messages_as_received(
389 &mut self,
390 target: &ChainId,
391 height: BlockHeight,
392 ) -> Result<bool, ChainError> {
393 let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
394 let updates = outbox.mark_messages_as_received(height).await?;
395 if updates.is_empty() {
396 return Ok(false);
397 }
398 for update in updates {
399 let counter = self
400 .outbox_counters
401 .get_mut()
402 .get_mut(&update)
403 .ok_or_else(|| {
404 ChainError::CorruptedChainState("message counter should be present".into())
405 })?;
406 *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
407 if *counter == 0 {
408 self.outbox_counters.get_mut().remove(&update);
410 }
411 }
412 if outbox.queue.count() == 0 {
413 self.nonempty_outboxes.get_mut().remove(target);
414 if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
416 self.outboxes.remove_entry(target)?;
417 }
418 }
419 #[cfg(with_metrics)]
420 metrics::NUM_OUTBOXES
421 .with_label_values(&[])
422 .observe(self.nonempty_outboxes.get().len() as f64);
423 Ok(true)
424 }
425
426 pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
429 tracing::debug!(
430 "Messages left in {:.8}'s outbox: {:?}",
431 self.chain_id(),
432 self.outbox_counters.get()
433 );
434 if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
435 key > &height
436 } else {
437 true
438 }
439 }
440
441 pub async fn is_active(&self) -> Result<bool, ChainError> {
443 Ok(self.execution_state.system.is_active().await?)
444 }
445
446 pub async fn initialize_if_needed(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
448 let chain_id = self.chain_id();
449 if self
451 .execution_state
452 .system
453 .initialize_chain(chain_id)
454 .await
455 .with_execution_context(ChainExecutionContext::Block)?
456 {
457 return Ok(());
459 }
460 let hash = self.execution_state.crypto_hash_mut().await?;
462 self.execution_state_hash.set(Some(hash));
463 let maybe_committee = self
464 .execution_state
465 .system
466 .current_committee()
467 .await
468 .with_execution_context(ChainExecutionContext::Block)?;
469 self.manager.reset(
471 self.execution_state.system.ownership.get().await?.clone(),
472 BlockHeight(0),
473 local_time,
474 maybe_committee
475 .iter()
476 .flat_map(|(_, committee)| committee.account_keys_and_weights()),
477 )?;
478 Ok(())
479 }
480
481 pub async fn next_height_to_preprocess(&self) -> Result<BlockHeight, ChainError> {
485 if let Some(height) = self.preprocessed_blocks.indices().await?.last() {
486 return Ok(height.saturating_add(BlockHeight(1)));
487 }
488 Ok(self.tip_state.get().next_block_height)
489 }
490
491 #[instrument(skip_all, fields(
498 chain_id = %self.chain_id(),
499 origin = %origin,
500 bundle_height = %bundle.height
501 ))]
502 pub async fn receive_message_bundle_with_inbox(
503 &mut self,
504 inbox: &mut InboxStateView<C>,
505 origin: &ChainId,
506 bundle: MessageBundle,
507 local_time: Timestamp,
508 add_to_received_log: bool,
509 ) -> Result<(), ChainError> {
510 assert!(!bundle.messages.is_empty());
511 let chain_id = self.chain_id();
512 tracing::trace!(
513 "Processing new messages from {origin} at height {}",
514 bundle.height,
515 );
516 let chain_and_height = ChainAndHeight {
517 chain_id: *origin,
518 height: bundle.height,
519 };
520
521 match self.initialize_if_needed(local_time).await {
522 Ok(_) => (),
523 Err(ChainError::ExecutionError(exec_err, _))
526 if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
527 if blobs.iter().all(|blob_id| {
528 blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
529 })) => {}
530 err => {
531 return err;
532 }
533 }
534
535 let newly_added = inbox
537 .add_bundle(bundle)
538 .await
539 .map_err(|error| match error {
540 InboxError::ViewError(error) => ChainError::ViewError(error),
541 error => ChainError::CorruptedChainState(format!(
542 "while processing messages in certified block: {error}"
543 )),
544 })?;
545 if newly_added {
546 self.nonempty_inboxes.get_mut().insert(*origin);
547 }
548
549 if add_to_received_log {
551 self.received_log.push(chain_and_height);
552 }
553 Ok(())
554 }
555
556 pub fn update_received_certificate_trackers(
558 &mut self,
559 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
560 ) {
561 for (name, tracker) in new_trackers {
562 self.received_certificate_trackers
563 .get_mut()
564 .entry(name)
565 .and_modify(|t| {
566 if tracker > *t {
569 *t = tracker;
570 }
571 })
572 .or_insert(tracker);
573 }
574 }
575
576 pub async fn current_committee(&self) -> Result<(Epoch, Arc<Committee>), ChainError> {
577 let chain_id = self.chain_id();
578 self.execution_state
579 .system
580 .current_committee()
581 .await
582 .with_execution_context(ChainExecutionContext::Block)?
583 .ok_or(ChainError::InactiveChain(chain_id))
584 }
585
586 pub async fn ownership(&self) -> Result<&ChainOwnership, ChainError> {
587 Ok(self.execution_state.system.ownership.get().await?)
588 }
589
590 #[instrument(skip_all, fields(
596 chain_id = %self.chain_id(),
597 ))]
598 pub async fn remove_bundles_from_inboxes(
599 &mut self,
600 timestamp: Timestamp,
601 must_be_present: bool,
602 incoming_bundles: impl IntoIterator<Item = &IncomingBundle>,
603 ) -> Result<(), ChainError> {
604 let chain_id = self.chain_id();
605 let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
606 for IncomingBundle { bundle, origin, .. } in incoming_bundles {
607 ensure!(
608 bundle.timestamp <= timestamp,
609 ChainError::IncorrectBundleTimestamp {
610 chain_id,
611 bundle_timestamp: bundle.timestamp,
612 block_timestamp: timestamp,
613 }
614 );
615 let bundles = bundles_by_origin.entry(*origin).or_default();
616 bundles.push(bundle);
617 }
618 let origins = bundles_by_origin.keys().copied().collect::<Vec<_>>();
619 let inboxes = self.inboxes.try_load_entries_mut(&origins).await?;
620 for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
621 tracing::trace!(
622 "Removing [{}] from inbox for {origin}",
623 bundles
624 .iter()
625 .map(|bundle| bundle.height.to_string())
626 .collect::<Vec<_>>()
627 .join(", ")
628 );
629 for bundle in bundles {
630 let was_present = inbox
632 .remove_bundle(bundle)
633 .await
634 .map_err(|error| (chain_id, origin, error))?;
635 if must_be_present {
636 ensure!(
637 was_present,
638 ChainError::MissingCrossChainUpdate {
639 chain_id,
640 origin,
641 height: bundle.height,
642 }
643 );
644 }
645 }
646 inbox.observe_size_metric();
647 if inbox.added_bundles.count() == 0 {
648 self.nonempty_inboxes.get_mut().remove(&origin);
649 }
650 }
651 Ok(())
652 }
653
654 pub fn nonempty_outbox_chain_ids(&self) -> Vec<ChainId> {
656 self.nonempty_outboxes.get().iter().copied().collect()
657 }
658
659 pub async fn load_outboxes(
661 &self,
662 targets: &[ChainId],
663 ) -> Result<Vec<ReadGuardedView<OutboxStateView<C>>>, ChainError> {
664 let vec_of_options = self.outboxes.try_load_entries(targets).await?;
665 let optional_vec = vec_of_options.into_iter().collect::<Option<Vec<_>>>();
666 optional_vec.ok_or_else(|| ChainError::CorruptedChainState("Missing outboxes".into()))
667 }
668
669 #[allow(clippy::too_many_arguments)]
671 #[instrument(skip_all, fields(
672 chain_id = %block.chain_id,
673 block_height = %block.height
674 ))]
675 async fn execute_block_inner(
676 chain: &mut ExecutionStateView<C>,
677 confirmed_log: &LogView<C, CryptoHash>,
678 block: &mut ProposedBlock,
679 local_time: Timestamp,
680 round: Option<u32>,
681 published_blobs: &[Blob],
682 replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
683 exec_policy: BundleExecutionPolicy,
684 ) -> Result<(BlockExecutionOutcome, ResourceTracker), ChainError> {
685 if !matches!(exec_policy.on_failure, BundleFailurePolicy::Abort) {
688 assert!(
689 replaying_oracle_responses.is_none(),
690 "Cannot use AutoRetry policy when replaying oracle responses"
691 );
692 }
693
694 #[cfg(with_metrics)]
695 let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency_us();
696 chain.system.timestamp.set(block.timestamp);
697
698 let committee_policy = chain
699 .system
700 .current_committee()
701 .await
702 .with_execution_context(ChainExecutionContext::Block)?
703 .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
704 .1
705 .policy()
706 .clone();
707
708 let mut resource_controller = ResourceController::new(
709 Arc::new(committee_policy),
710 ResourceTracker::default(),
711 block.authenticated_owner,
712 );
713
714 for blob in published_blobs {
715 let blob_id = blob.id();
716 resource_controller
717 .policy()
718 .check_blob_size(blob.content())
719 .with_execution_context(ChainExecutionContext::Block)?;
720 chain.system.used_blobs.insert(&blob_id)?;
721 }
722
723 let mut block_execution_tracker = BlockExecutionTracker::new(
724 &mut resource_controller,
725 published_blobs
726 .iter()
727 .map(|blob| (blob.id(), blob))
728 .collect(),
729 local_time,
730 replaying_oracle_responses,
731 block,
732 )?;
733
734 let max_failures = match exec_policy.on_failure {
736 BundleFailurePolicy::Abort => 0,
737 BundleFailurePolicy::AutoRetry { max_failures } => max_failures,
738 };
739 let auto_retry = !matches!(exec_policy.on_failure, BundleFailurePolicy::Abort);
740 let mut failure_count = 0u32;
741
742 let time_budget = exec_policy.time_budget;
743 let mut cumulative_bundle_time = Duration::ZERO;
744
745 let mut i = 0;
746 while i < block.transactions.len() {
747 let transaction = &mut block.transactions[i];
748 let is_bundle = matches!(transaction, Transaction::ReceiveMessages(_));
749
750 if is_bundle && time_budget.is_some_and(|budget| cumulative_bundle_time >= budget) {
752 info!(
753 ?cumulative_bundle_time,
754 ?time_budget,
755 "Time budget for bundle staging exceeded, discarding remaining bundles"
756 );
757 Self::discard_remaining_bundles(block, i, None);
758 continue;
759 }
760
761 let checkpoint = if auto_retry && is_bundle {
763 Some((
764 chain.clone_unchecked()?,
765 block_execution_tracker.create_checkpoint(),
766 ))
767 } else {
768 None
769 };
770
771 let bundle_start = if is_bundle && time_budget.is_some() {
772 Some(Instant::now())
773 } else {
774 None
775 };
776
777 let result = block_execution_tracker
778 .execute_transaction(&*transaction, round, chain)
779 .await;
780
781 if let Some(start) = bundle_start {
782 cumulative_bundle_time += start.elapsed();
783 }
784
785 let (error, context, incoming_bundle, saved_chain, saved_tracker) =
790 match (result, transaction, checkpoint) {
791 (Ok(()), _, _) => {
792 i += 1;
793 continue;
794 }
795 (
796 Err(ChainError::ExecutionError(error, context)),
797 Transaction::ReceiveMessages(incoming_bundle),
798 Some((saved_chain, saved_tracker)),
799 ) if !error.is_transient_error() => {
800 (error, context, incoming_bundle, saved_chain, saved_tracker)
801 }
802 (Err(e), _, _) => return Err(e),
803 };
804
805 *chain = saved_chain;
807 block_execution_tracker.restore_checkpoint(&saved_tracker);
808
809 if error.is_limit_error() && i > 0 {
810 failure_count += 1;
811 let maybe_sender = if failure_count > max_failures {
813 info!(
814 failure_count,
815 max_failures,
816 "Exceeded max bundle failures, discarding all remaining message bundles"
817 );
818 None
819 } else {
820 info!(
822 %error,
823 index = i,
824 origin = %incoming_bundle.origin,
825 "Message bundle exceeded block limits and will be discarded for \
826 retry in a later block"
827 );
828 Some(incoming_bundle.origin)
829 };
830 Self::discard_remaining_bundles(block, i, maybe_sender);
831 } else if incoming_bundle.bundle.is_protected()
833 || incoming_bundle.action == MessageAction::Reject
834 {
835 return Err(ChainError::ExecutionError(error, context));
837 } else {
838 info!(
841 %error,
842 index = i,
843 origin = %incoming_bundle.origin,
844 "Message bundle failed to execute and will be rejected"
845 );
846 incoming_bundle.action = MessageAction::Reject;
847 }
849 }
850
851 ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
854
855 let recipients = block_execution_tracker.recipients();
856 let mut recipient_heights = Vec::new();
857 let mut indices = Vec::new();
858 for (recipient, height) in chain
859 .previous_message_blocks
860 .multi_get_pairs(recipients)
861 .await?
862 {
863 chain
864 .previous_message_blocks
865 .insert(&recipient, block.height)?;
866 if let Some(height) = height {
867 let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
868 indices.push(index);
869 recipient_heights.push((recipient, height));
870 }
871 }
872 let hashes = confirmed_log.multi_get(indices).await?;
873 let mut previous_message_blocks = BTreeMap::new();
874 for (hash, (recipient, height)) in hashes.into_iter().zip(recipient_heights) {
875 let hash = hash.ok_or_else(|| {
876 ChainError::CorruptedChainState("missing entry in confirmed_log".into())
877 })?;
878 previous_message_blocks.insert(recipient, (hash, height));
879 }
880
881 let streams = block_execution_tracker.event_streams();
882 let mut stream_heights = Vec::new();
883 let mut indices = Vec::new();
884 for (stream, height) in chain.previous_event_blocks.multi_get_pairs(streams).await? {
885 chain.previous_event_blocks.insert(&stream, block.height)?;
886 if let Some(height) = height {
887 let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
888 indices.push(index);
889 stream_heights.push((stream, height));
890 }
891 }
892 let hashes = confirmed_log.multi_get(indices).await?;
893 let mut previous_event_blocks = BTreeMap::new();
894 for (hash, (stream, height)) in hashes.into_iter().zip(stream_heights) {
895 let hash = hash.ok_or_else(|| {
896 ChainError::CorruptedChainState("missing entry in confirmed_log".into())
897 })?;
898 previous_event_blocks.insert(stream, (hash, height));
899 }
900
901 let state_hash = {
902 #[cfg(with_metrics)]
903 let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency_us();
904 chain.crypto_hash_mut().await?
905 };
906
907 let (messages, oracle_responses, events, blobs, operation_results, resource_tracker) =
908 block_execution_tracker.finalize(block.transactions.len());
909
910 Ok((
911 BlockExecutionOutcome {
912 messages,
913 previous_message_blocks,
914 previous_event_blocks,
915 state_hash,
916 oracle_responses,
917 events,
918 blobs,
919 operation_results,
920 },
921 resource_tracker,
922 ))
923 }
924
925 fn discard_remaining_bundles(
927 block: &mut ProposedBlock,
928 mut index: usize,
929 maybe_origin: Option<ChainId>,
930 ) {
931 while index < block.transactions.len() {
932 if matches!(
933 &block.transactions[index],
934 Transaction::ReceiveMessages(bundle)
935 if maybe_origin.is_none_or(|origin| bundle.origin == origin)
936 ) {
937 block.transactions.remove(index);
938 } else {
939 index += 1;
940 }
941 }
942 }
943
944 #[instrument(skip_all, fields(
955 chain_id = %self.chain_id(),
956 block_height = %block.height
957 ))]
958 pub async fn execute_block(
959 &mut self,
960 mut block: ProposedBlock,
961 local_time: Timestamp,
962 round: Option<u32>,
963 published_blobs: &[Blob],
964 replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
965 policy: BundleExecutionPolicy,
966 ) -> Result<(ProposedBlock, BlockExecutionOutcome, ResourceTracker), ChainError> {
967 assert_eq!(
968 block.chain_id,
969 self.execution_state.context().extra().chain_id()
970 );
971
972 self.initialize_if_needed(local_time).await?;
973
974 let chain_timestamp = *self.execution_state.system.timestamp.get();
975 ensure!(
976 chain_timestamp <= block.timestamp,
977 ChainError::InvalidBlockTimestamp {
978 parent: chain_timestamp,
979 new: block.timestamp
980 }
981 );
982 ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
983
984 ensure!(
985 block.published_blob_ids()
986 == published_blobs
987 .iter()
988 .map(|blob| blob.id())
989 .collect::<BTreeSet<_>>(),
990 ChainError::InternalError("published_blobs mismatch".to_string())
991 );
992
993 if *self.execution_state.system.closed.get() {
994 ensure!(block.has_only_rejected_messages(), ChainError::ClosedChain);
995 }
996
997 Self::check_app_permissions(
998 self.execution_state
999 .system
1000 .application_permissions
1001 .get()
1002 .await?,
1003 &block,
1004 )?;
1005
1006 Self::execute_block_inner(
1007 &mut self.execution_state,
1008 &self.confirmed_log,
1009 &mut block,
1010 local_time,
1011 round,
1012 published_blobs,
1013 replaying_oracle_responses,
1014 policy,
1015 )
1016 .await
1017 .map(|(outcome, tracker)| (block, outcome, tracker))
1018 }
1019
1020 #[instrument(skip_all, fields(
1024 chain_id = %self.chain_id(),
1025 block_height = %block.inner().inner().header.height
1026 ))]
1027 pub async fn apply_confirmed_block(
1028 &mut self,
1029 block: &ConfirmedBlock,
1030 local_time: Timestamp,
1031 ) -> Result<BTreeSet<StreamId>, ChainError> {
1032 let hash = block.inner().hash();
1033 let block = block.inner().inner();
1034 if block.header.height == BlockHeight::ZERO {
1035 self.block_zero_executed_at.set(local_time);
1036 }
1037 self.execution_state_hash.set(Some(block.header.state_hash));
1038 let updated_streams = self.process_emitted_events(block).await?;
1039 self.process_outgoing_messages(block).await?;
1040
1041 self.reset_chain_manager(block.header.height.try_add_one()?, local_time)
1043 .await?;
1044
1045 let tip = self.tip_state.get_mut();
1047 tip.block_hash = Some(hash);
1048 tip.next_block_height.try_add_assign_one()?;
1049 tip.update_counters(&block.body.transactions, &block.body.messages)?;
1050 self.confirmed_log.push(hash);
1051 self.preprocessed_blocks.remove(&block.header.height)?;
1052 Ok(updated_streams)
1053 }
1054
1055 #[instrument(skip_all, fields(
1058 chain_id = %self.chain_id(),
1059 block_height = %block.inner().inner().header.height
1060 ))]
1061 pub async fn preprocess_block(
1062 &mut self,
1063 block: &ConfirmedBlock,
1064 ) -> Result<BTreeSet<StreamId>, ChainError> {
1065 let hash = block.inner().hash();
1066 let block = block.inner().inner();
1067 let height = block.header.height;
1068 if height < self.tip_state.get().next_block_height {
1069 return Ok(BTreeSet::new());
1070 }
1071 self.process_outgoing_messages(block).await?;
1072 let updated_streams = self.process_emitted_events(block).await?;
1073 self.preprocessed_blocks.insert(&height, hash)?;
1074 Ok(updated_streams)
1075 }
1076
1077 #[instrument(skip_all, fields(
1079 block_height = %block.height,
1080 num_transactions = %block.transactions.len()
1081 ))]
1082 fn check_app_permissions(
1083 app_permissions: &ApplicationPermissions,
1084 block: &ProposedBlock,
1085 ) -> Result<(), ChainError> {
1086 let mut mandatory = app_permissions
1087 .mandatory_applications
1088 .iter()
1089 .copied()
1090 .collect::<HashSet<ApplicationId>>();
1091 for transaction in &block.transactions {
1092 match transaction {
1093 Transaction::ExecuteOperation(operation)
1094 if operation.is_exempt_from_permissions() =>
1095 {
1096 mandatory.clear()
1097 }
1098 Transaction::ExecuteOperation(operation) => {
1099 ensure!(
1100 app_permissions.can_execute_operations(&operation.application_id()),
1101 ChainError::AuthorizedApplications(
1102 app_permissions.execute_operations.clone().unwrap()
1103 )
1104 );
1105 if let Operation::User { application_id, .. } = operation {
1106 mandatory.remove(application_id);
1107 }
1108 }
1109 Transaction::ReceiveMessages(incoming_bundle)
1110 if incoming_bundle.action == MessageAction::Accept =>
1111 {
1112 for pending in incoming_bundle.messages() {
1113 if let Message::User { application_id, .. } = &pending.message {
1114 mandatory.remove(application_id);
1115 }
1116 }
1117 }
1118 Transaction::ReceiveMessages(_) => {}
1119 }
1120 }
1121 ensure!(
1122 mandatory.is_empty(),
1123 ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
1124 );
1125 Ok(())
1126 }
1127
1128 #[instrument(skip_all, fields(
1133 chain_id = %self.chain_id(),
1134 next_block_height = %self.tip_state.get().next_block_height,
1135 ))]
1136 pub async fn block_hashes(
1137 &self,
1138 heights: impl IntoIterator<Item = BlockHeight>,
1139 ) -> Result<Vec<CryptoHash>, ChainError> {
1140 let next_height = self.tip_state.get().next_block_height;
1141 let (confirmed_heights, unconfirmed_heights) = heights
1143 .into_iter()
1144 .partition::<Vec<_>, _>(|height| *height < next_height);
1145 let confirmed_indices = confirmed_heights
1146 .into_iter()
1147 .map(|height| usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow))
1148 .collect::<Result<_, _>>()?;
1149 let confirmed_hashes = self.confirmed_log.multi_get(confirmed_indices).await?;
1150 let unconfirmed_hashes = self
1152 .preprocessed_blocks
1153 .multi_get(&unconfirmed_heights)
1154 .await?;
1155 Ok(confirmed_hashes
1156 .into_iter()
1157 .chain(unconfirmed_hashes)
1158 .flatten()
1159 .collect())
1160 }
1161
1162 async fn reset_chain_manager(
1164 &mut self,
1165 next_height: BlockHeight,
1166 local_time: Timestamp,
1167 ) -> Result<(), ChainError> {
1168 let maybe_committee = self
1169 .execution_state
1170 .system
1171 .current_committee()
1172 .await
1173 .with_execution_context(ChainExecutionContext::Block)?;
1174 let ownership = self.execution_state.system.ownership.get().await?.clone();
1175 let fallback_owners = maybe_committee
1176 .iter()
1177 .flat_map(|(_, committee)| committee.account_keys_and_weights());
1178 self.pending_validated_blobs.clear();
1179 self.pending_proposed_blobs.clear();
1180 self.manager
1181 .reset(ownership, next_height, local_time, fallback_owners)
1182 }
1183
1184 #[instrument(skip_all, fields(
1188 chain_id = %self.chain_id(),
1189 block_height = %block.header.height
1190 ))]
1191 async fn process_outgoing_messages(
1192 &mut self,
1193 block: &Block,
1194 ) -> Result<Vec<ChainId>, ChainError> {
1195 let recipients = block.recipients();
1198 let block_height = block.header.height;
1199 let next_height = self.tip_state.get().next_block_height;
1200
1201 let outbox_counters = self.outbox_counters.get_mut();
1203 let nonempty_outboxes = self.nonempty_outboxes.get_mut();
1204 let targets = recipients.into_iter().collect::<Vec<_>>();
1205 let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1206 for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1207 if block_height > next_height {
1208 if *outbox.next_height_to_schedule.get() > block_height {
1211 continue; }
1213 let maybe_prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok()
1214 {
1215 Some(height) if height < next_height => {
1218 let index =
1219 usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
1220 Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
1221 ChainError::CorruptedChainState("missing entry in confirmed_log".into())
1222 })?)
1223 }
1224 Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
1227 || {
1228 ChainError::CorruptedChainState(
1229 "missing entry in preprocessed_blocks".into(),
1230 )
1231 },
1232 )?),
1233 None => None, };
1235 match (
1237 maybe_prev_hash,
1238 block.body.previous_message_blocks.get(target),
1239 ) {
1240 (None, None) => {
1241 }
1244 (Some(_), None) => {
1245 return Err(ChainError::CorruptedChainState(
1248 "block indicates no previous message block,\
1249 but we have one in the outbox"
1250 .into(),
1251 ));
1252 }
1253 (None, Some((_, prev_msg_block_height))) => {
1254 if *prev_msg_block_height >= next_height {
1259 continue;
1260 }
1261 }
1262 (Some(ref prev_hash), Some((prev_msg_block_hash, _))) => {
1263 if prev_hash != prev_msg_block_hash {
1265 continue;
1266 }
1267 }
1268 }
1269 }
1270 if outbox.schedule_message(block_height)? {
1271 *outbox_counters.entry(block_height).or_default() += 1;
1272 nonempty_outboxes.insert(*target);
1273 }
1274 #[cfg(with_metrics)]
1275 crate::outbox::metrics::OUTBOX_SIZE
1276 .with_label_values(&[])
1277 .observe(outbox.queue.count() as f64);
1278 }
1279
1280 #[cfg(with_metrics)]
1281 metrics::NUM_OUTBOXES
1282 .with_label_values(&[])
1283 .observe(nonempty_outboxes.len() as f64);
1284 Ok(targets)
1285 }
1286
1287 #[instrument(skip_all, fields(
1291 chain_id = %self.chain_id(),
1292 block_height = %block.header.height
1293 ))]
1294 async fn process_emitted_events(
1295 &mut self,
1296 block: &Block,
1297 ) -> Result<BTreeSet<StreamId>, ChainError> {
1298 let mut emitted_streams = BTreeMap::<StreamId, BTreeSet<u32>>::new();
1299 for event in block.body.events.iter().flatten() {
1300 emitted_streams
1301 .entry(event.stream_id.clone())
1302 .or_default()
1303 .insert(event.index);
1304 }
1305 let mut stream_ids = Vec::new();
1306 let mut list_indices = Vec::new();
1307 for (stream_id, indices) in emitted_streams {
1308 stream_ids.push(stream_id);
1309 list_indices.push(indices);
1310 }
1311
1312 let mut updated_streams = BTreeSet::new();
1313 for ((stream_id, next_index), indices) in self
1314 .next_expected_events
1315 .multi_get_pairs(stream_ids)
1316 .await?
1317 .into_iter()
1318 .zip(list_indices)
1319 {
1320 let initial_index = if stream_id == StreamId::system(EPOCH_STREAM_NAME) {
1321 1
1323 } else {
1324 0
1325 };
1326 let mut current_expected_index = next_index.unwrap_or(initial_index);
1327 for index in indices {
1328 if index == current_expected_index {
1329 updated_streams.insert(stream_id.clone());
1330 current_expected_index = index.saturating_add(1);
1331 }
1332 }
1333 if current_expected_index != 0 {
1334 self.next_expected_events
1335 .insert(&stream_id, current_expected_index)?;
1336 }
1337 }
1338 Ok(updated_streams)
1339 }
1340}
1341
1342#[test]
1343fn empty_block_size() {
1344 let size = bcs::serialized_size(&crate::block::Block::new(
1345 crate::test::make_first_block(
1346 linera_execution::test_utils::dummy_chain_description(0).id(),
1347 ),
1348 crate::data_types::BlockExecutionOutcome::default(),
1349 ))
1350 .unwrap();
1351 assert_eq!(size, EMPTY_BLOCK_SIZE);
1352}