1use std::{
5 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6 sync::Arc,
7};
8
9use allocative::Allocative;
10use linera_base::{
11 crypto::{CryptoHash, ValidatorPublicKey},
12 data_types::{
13 ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Epoch,
14 OracleResponse, Timestamp,
15 },
16 ensure,
17 identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
18 ownership::ChainOwnership,
19};
20use linera_execution::{
21 committee::Committee, system::EPOCH_STREAM_NAME, ExecutionRuntimeContext, ExecutionStateView,
22 Message, Operation, OutgoingMessage, Query, QueryContext, QueryOutcome, ResourceController,
23 ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
24};
25use linera_views::{
26 context::Context,
27 log_view::LogView,
28 map_view::MapView,
29 reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
30 register_view::RegisterView,
31 views::{ClonableView, RootView, View},
32};
33use serde::{Deserialize, Serialize};
34use tracing::{info, instrument};
35
36use crate::{
37 block::{Block, ConfirmedBlock},
38 block_tracker::BlockExecutionTracker,
39 data_types::{
40 BlockExecutionOutcome, BundleExecutionPolicy, ChainAndHeight, IncomingBundle,
41 MessageAction, MessageBundle, ProposedBlock, Transaction,
42 },
43 inbox::{InboxError, InboxStateView},
44 manager::ChainManager,
45 outbox::OutboxStateView,
46 pending_blobs::PendingBlobsView,
47 ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
48};
49
50#[cfg(test)]
51#[path = "unit_tests/chain_tests.rs"]
52mod chain_tests;
53
54#[cfg(with_metrics)]
55use linera_base::prometheus_util::MeasureLatency;
56
57#[cfg(with_metrics)]
58pub(crate) mod metrics {
59 use std::sync::LazyLock;
60
61 use linera_base::prometheus_util::{
62 exponential_bucket_interval, exponential_bucket_latencies, register_histogram_vec,
63 register_int_counter_vec,
64 };
65 use linera_execution::ResourceTracker;
66 use prometheus::{HistogramVec, IntCounterVec};
67
68 pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
69 register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
70 });
71
72 pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
73 register_histogram_vec(
74 "block_execution_latency",
75 "Block execution latency",
76 &[],
77 exponential_bucket_interval(50.0_f64, 10_000_000.0),
78 )
79 });
80
81 #[cfg(with_metrics)]
82 pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
83 register_histogram_vec(
84 "message_execution_latency",
85 "Message execution latency",
86 &[],
87 exponential_bucket_interval(0.1_f64, 50_000.0),
88 )
89 });
90
91 pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
92 register_histogram_vec(
93 "operation_execution_latency",
94 "Operation execution latency",
95 &[],
96 exponential_bucket_interval(0.1_f64, 50_000.0),
97 )
98 });
99
100 pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
101 register_histogram_vec(
102 "wasm_fuel_used_per_block",
103 "Wasm fuel used per block",
104 &[],
105 exponential_bucket_interval(10.0, 1_000_000.0),
106 )
107 });
108
109 pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
110 register_histogram_vec(
111 "evm_fuel_used_per_block",
112 "EVM fuel used per block",
113 &[],
114 exponential_bucket_interval(10.0, 1_000_000.0),
115 )
116 });
117
118 pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
119 register_histogram_vec(
120 "vm_num_reads_per_block",
121 "VM number of reads per block",
122 &[],
123 exponential_bucket_interval(0.1, 100.0),
124 )
125 });
126
127 pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
128 register_histogram_vec(
129 "vm_bytes_read_per_block",
130 "VM number of bytes read per block",
131 &[],
132 exponential_bucket_interval(0.1, 10_000_000.0),
133 )
134 });
135
136 pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
137 register_histogram_vec(
138 "vm_bytes_written_per_block",
139 "VM number of bytes written per block",
140 &[],
141 exponential_bucket_interval(0.1, 10_000_000.0),
142 )
143 });
144
145 pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
146 register_histogram_vec(
147 "state_hash_computation_latency",
148 "Time to recompute the state hash",
149 &[],
150 exponential_bucket_latencies(2000.0),
151 )
152 });
153
154 pub static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
155 register_histogram_vec(
156 "num_inboxes",
157 "Number of inboxes",
158 &[],
159 exponential_bucket_interval(1.0, 10_000.0),
160 )
161 });
162
163 pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
164 register_histogram_vec(
165 "num_outboxes",
166 "Number of outboxes",
167 &[],
168 exponential_bucket_interval(1.0, 10_000.0),
169 )
170 });
171
172 pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
174 NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
175 WASM_FUEL_USED_PER_BLOCK
176 .with_label_values(&[])
177 .observe(tracker.wasm_fuel as f64);
178 EVM_FUEL_USED_PER_BLOCK
179 .with_label_values(&[])
180 .observe(tracker.evm_fuel as f64);
181 VM_NUM_READS_PER_BLOCK
182 .with_label_values(&[])
183 .observe(tracker.read_operations as f64);
184 VM_BYTES_READ_PER_BLOCK
185 .with_label_values(&[])
186 .observe(tracker.bytes_read as f64);
187 VM_BYTES_WRITTEN_PER_BLOCK
188 .with_label_values(&[])
189 .observe(tracker.bytes_written as f64);
190 }
191}
192
193pub(crate) const EMPTY_BLOCK_SIZE: usize = 94;
195
196#[cfg_attr(
198 with_graphql,
199 derive(async_graphql::SimpleObject),
200 graphql(cache_control(no_cache))
201)]
202#[derive(Debug, RootView, ClonableView, Allocative)]
203#[allocative(bound = "C")]
204pub struct ChainStateView<C>
205where
206 C: Clone + Context + 'static,
207{
208 pub execution_state: ExecutionStateView<C>,
210 pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,
212
213 pub tip_state: RegisterView<C, ChainTipState>,
215
216 pub manager: ChainManager<C>,
218 pub pending_validated_blobs: PendingBlobsView<C>,
221 pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,
223
224 pub confirmed_log: LogView<C, CryptoHash>,
227 pub received_log: LogView<C, ChainAndHeight>,
229 pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,
231
232 pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
234 pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
236 pub next_expected_events: MapView<C, StreamId, u32>,
239 pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
242 pub nonempty_outboxes: RegisterView<C, BTreeSet<ChainId>>,
244
245 pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,
247}
248
249#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
251#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, Allocative)]
252pub struct ChainTipState {
253 pub block_hash: Option<CryptoHash>,
255 pub next_block_height: BlockHeight,
257 pub num_incoming_bundles: u32,
259 pub num_operations: u32,
261 pub num_outgoing_messages: u32,
263}
264
265impl ChainTipState {
266 pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
269 ensure!(
270 new_block.height == self.next_block_height,
271 ChainError::UnexpectedBlockHeight {
272 expected_block_height: self.next_block_height,
273 found_block_height: new_block.height
274 }
275 );
276 ensure!(
277 new_block.previous_block_hash == self.block_hash,
278 ChainError::UnexpectedPreviousBlockHash
279 );
280 Ok(())
281 }
282
283 pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
286 ensure!(
287 self.next_block_height >= height,
288 ChainError::MissingEarlierBlocks {
289 current_block_height: self.next_block_height,
290 }
291 );
292 Ok(self.next_block_height > height)
293 }
294
295 pub fn update_counters(
297 &mut self,
298 transactions: &[Transaction],
299 messages: &[Vec<OutgoingMessage>],
300 ) -> Result<(), ChainError> {
301 let mut num_incoming_bundles = 0u32;
302 let mut num_operations = 0u32;
303
304 for transaction in transactions {
305 match transaction {
306 Transaction::ReceiveMessages(_) => {
307 num_incoming_bundles = num_incoming_bundles
308 .checked_add(1)
309 .ok_or(ArithmeticError::Overflow)?;
310 }
311 Transaction::ExecuteOperation(_) => {
312 num_operations = num_operations
313 .checked_add(1)
314 .ok_or(ArithmeticError::Overflow)?;
315 }
316 }
317 }
318
319 self.num_incoming_bundles = self
320 .num_incoming_bundles
321 .checked_add(num_incoming_bundles)
322 .ok_or(ArithmeticError::Overflow)?;
323
324 self.num_operations = self
325 .num_operations
326 .checked_add(num_operations)
327 .ok_or(ArithmeticError::Overflow)?;
328
329 let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
330 .map_err(|_| ArithmeticError::Overflow)?;
331 self.num_outgoing_messages = self
332 .num_outgoing_messages
333 .checked_add(num_outgoing_messages)
334 .ok_or(ArithmeticError::Overflow)?;
335
336 Ok(())
337 }
338}
339
340impl<C> ChainStateView<C>
341where
342 C: Context + Clone + 'static,
343 C::Extra: ExecutionRuntimeContext,
344{
345 pub fn chain_id(&self) -> ChainId {
347 self.context().extra().chain_id()
348 }
349
350 #[instrument(skip_all, fields(
351 chain_id = %self.chain_id(),
352 ))]
353 pub async fn query_application(
354 &mut self,
355 local_time: Timestamp,
356 query: Query,
357 service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
358 ) -> Result<QueryOutcome, ChainError> {
359 let context = QueryContext {
360 chain_id: self.chain_id(),
361 next_block_height: self.tip_state.get().next_block_height,
362 local_time,
363 };
364 self.execution_state
365 .query_application(context, query, service_runtime_endpoint)
366 .await
367 .with_execution_context(ChainExecutionContext::Query)
368 }
369
370 #[instrument(skip_all, fields(
371 chain_id = %self.chain_id(),
372 application_id = %application_id
373 ))]
374 pub async fn describe_application(
375 &mut self,
376 application_id: ApplicationId,
377 ) -> Result<ApplicationDescription, ChainError> {
378 self.execution_state
379 .system
380 .describe_application(application_id, &mut TransactionTracker::default())
381 .await
382 .with_execution_context(ChainExecutionContext::DescribeApplication)
383 }
384
385 #[instrument(skip_all, fields(
386 chain_id = %self.chain_id(),
387 target = %target,
388 height = %height
389 ))]
390 pub async fn mark_messages_as_received(
391 &mut self,
392 target: &ChainId,
393 height: BlockHeight,
394 ) -> Result<bool, ChainError> {
395 let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
396 let updates = outbox.mark_messages_as_received(height).await?;
397 if updates.is_empty() {
398 return Ok(false);
399 }
400 for update in updates {
401 let counter = self
402 .outbox_counters
403 .get_mut()
404 .get_mut(&update)
405 .ok_or_else(|| {
406 ChainError::InternalError("message counter should be present".into())
407 })?;
408 *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
409 if *counter == 0 {
410 self.outbox_counters.get_mut().remove(&update);
412 }
413 }
414 if outbox.queue.count() == 0 {
415 self.nonempty_outboxes.get_mut().remove(target);
416 if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
418 self.outboxes.remove_entry(target)?;
419 }
420 }
421 #[cfg(with_metrics)]
422 metrics::NUM_OUTBOXES
423 .with_label_values(&[])
424 .observe(self.outboxes.count().await? as f64);
425 Ok(true)
426 }
427
428 pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
431 tracing::debug!(
432 "Messages left in {:.8}'s outbox: {:?}",
433 self.chain_id(),
434 self.outbox_counters.get()
435 );
436 if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
437 key > &height
438 } else {
439 true
440 }
441 }
442
443 pub fn is_active(&self) -> bool {
445 self.execution_state.system.is_active()
446 }
447
448 pub async fn initialize_if_needed(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
450 let chain_id = self.chain_id();
451 if self
453 .execution_state
454 .system
455 .initialize_chain(chain_id)
456 .await
457 .with_execution_context(ChainExecutionContext::Block)?
458 {
459 return Ok(());
461 }
462 let hash = self.execution_state.crypto_hash_mut().await?;
464 self.execution_state_hash.set(Some(hash));
465 let maybe_committee = self.execution_state.system.current_committee().into_iter();
466 self.manager.reset(
468 self.execution_state.system.ownership.get().clone(),
469 BlockHeight(0),
470 local_time,
471 maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights()),
472 )?;
473 Ok(())
474 }
475
476 pub async fn next_block_height_to_receive(
477 &self,
478 origin: &ChainId,
479 ) -> Result<BlockHeight, ChainError> {
480 let inbox = self.inboxes.try_load_entry(origin).await?;
481 match inbox {
482 Some(inbox) => inbox.next_block_height_to_receive(),
483 None => Ok(BlockHeight::ZERO),
484 }
485 }
486
487 pub async fn next_height_to_preprocess(&self) -> Result<BlockHeight, ChainError> {
491 if let Some(height) = self.preprocessed_blocks.indices().await?.last() {
492 return Ok(height.saturating_add(BlockHeight(1)));
493 }
494 Ok(self.tip_state.get().next_block_height)
495 }
496
497 pub async fn last_anticipated_block_height(
498 &self,
499 origin: &ChainId,
500 ) -> Result<Option<BlockHeight>, ChainError> {
501 let inbox = self.inboxes.try_load_entry(origin).await?;
502 match inbox {
503 Some(inbox) => match inbox.removed_bundles.back().await? {
504 Some(bundle) => Ok(Some(bundle.height)),
505 None => Ok(None),
506 },
507 None => Ok(None),
508 }
509 }
510
511 #[instrument(skip_all, fields(
518 chain_id = %self.chain_id(),
519 origin = %origin,
520 bundle_height = %bundle.height
521 ))]
522 pub async fn receive_message_bundle(
523 &mut self,
524 origin: &ChainId,
525 bundle: MessageBundle,
526 local_time: Timestamp,
527 add_to_received_log: bool,
528 ) -> Result<(), ChainError> {
529 assert!(!bundle.messages.is_empty());
530 let chain_id = self.chain_id();
531 tracing::trace!(
532 "Processing new messages to {chain_id:.8} from {origin} at height {}",
533 bundle.height,
534 );
535 let chain_and_height = ChainAndHeight {
536 chain_id: *origin,
537 height: bundle.height,
538 };
539
540 match self.initialize_if_needed(local_time).await {
541 Ok(_) => (),
542 Err(ChainError::ExecutionError(exec_err, _))
545 if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
546 if blobs.iter().all(|blob_id| {
547 blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
548 })) => {}
549 err => {
550 return err;
551 }
552 }
553
554 let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
556 #[cfg(with_metrics)]
557 metrics::NUM_INBOXES
558 .with_label_values(&[])
559 .observe(self.inboxes.count().await? as f64);
560 inbox
561 .add_bundle(bundle)
562 .await
563 .map_err(|error| match error {
564 InboxError::ViewError(error) => ChainError::ViewError(error),
565 error => ChainError::InternalError(format!(
566 "while processing messages in certified block: {error}"
567 )),
568 })?;
569
570 if add_to_received_log {
572 self.received_log.push(chain_and_height);
573 }
574 Ok(())
575 }
576
577 pub fn update_received_certificate_trackers(
579 &mut self,
580 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
581 ) {
582 for (name, tracker) in new_trackers {
583 self.received_certificate_trackers
584 .get_mut()
585 .entry(name)
586 .and_modify(|t| {
587 if tracker > *t {
590 *t = tracker;
591 }
592 })
593 .or_insert(tracker);
594 }
595 }
596
597 pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> {
598 self.execution_state
599 .system
600 .current_committee()
601 .ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
602 }
603
604 pub fn ownership(&self) -> &ChainOwnership {
605 self.execution_state.system.ownership.get()
606 }
607
608 #[instrument(skip_all, fields(
614 chain_id = %self.chain_id(),
615 ))]
616 pub async fn remove_bundles_from_inboxes(
617 &mut self,
618 timestamp: Timestamp,
619 must_be_present: bool,
620 incoming_bundles: impl IntoIterator<Item = &IncomingBundle>,
621 ) -> Result<(), ChainError> {
622 let chain_id = self.chain_id();
623 let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
624 for IncomingBundle { bundle, origin, .. } in incoming_bundles {
625 ensure!(
626 bundle.timestamp <= timestamp,
627 ChainError::IncorrectBundleTimestamp {
628 chain_id,
629 bundle_timestamp: bundle.timestamp,
630 block_timestamp: timestamp,
631 }
632 );
633 let bundles = bundles_by_origin.entry(*origin).or_default();
634 bundles.push(bundle);
635 }
636 let origins = bundles_by_origin.keys().copied().collect::<Vec<_>>();
637 let inboxes = self.inboxes.try_load_entries_mut(&origins).await?;
638 for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
639 tracing::trace!(
640 "Removing [{}] from {chain_id:.8}'s inbox for {origin:}",
641 bundles
642 .iter()
643 .map(|bundle| bundle.height.to_string())
644 .collect::<Vec<_>>()
645 .join(", ")
646 );
647 for bundle in bundles {
648 let was_present = inbox
650 .remove_bundle(bundle)
651 .await
652 .map_err(|error| (chain_id, origin, error))?;
653 if must_be_present {
654 ensure!(
655 was_present,
656 ChainError::MissingCrossChainUpdate {
657 chain_id,
658 origin,
659 height: bundle.height,
660 }
661 );
662 }
663 }
664 }
665 #[cfg(with_metrics)]
666 metrics::NUM_INBOXES
667 .with_label_values(&[])
668 .observe(self.inboxes.count().await? as f64);
669 Ok(())
670 }
671
672 pub fn nonempty_outbox_chain_ids(&self) -> Vec<ChainId> {
674 self.nonempty_outboxes.get().iter().copied().collect()
675 }
676
677 pub async fn load_outboxes(
679 &self,
680 targets: &[ChainId],
681 ) -> Result<Vec<ReadGuardedView<OutboxStateView<C>>>, ChainError> {
682 let vec_of_options = self.outboxes.try_load_entries(targets).await?;
683 let optional_vec = vec_of_options.into_iter().collect::<Option<Vec<_>>>();
684 optional_vec.ok_or_else(|| ChainError::InternalError("Missing outboxes".into()))
685 }
686
687 #[allow(clippy::too_many_arguments)]
689 #[instrument(skip_all, fields(
690 chain_id = %block.chain_id,
691 block_height = %block.height
692 ))]
693 async fn execute_block_inner(
694 chain: &mut ExecutionStateView<C>,
695 confirmed_log: &LogView<C, CryptoHash>,
696 block: &mut ProposedBlock,
697 local_time: Timestamp,
698 round: Option<u32>,
699 published_blobs: &[Blob],
700 replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
701 exec_policy: BundleExecutionPolicy,
702 ) -> Result<(BlockExecutionOutcome, ResourceTracker), ChainError> {
703 if !matches!(exec_policy, BundleExecutionPolicy::Abort) {
706 assert!(
707 replaying_oracle_responses.is_none(),
708 "Cannot use AutoRetry policy when replaying oracle responses"
709 );
710 }
711
712 #[cfg(with_metrics)]
713 let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency_us();
714 chain.system.timestamp.set(block.timestamp);
715
716 let committee_policy = chain
717 .system
718 .current_committee()
719 .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
720 .1
721 .policy()
722 .clone();
723
724 let mut resource_controller = ResourceController::new(
725 Arc::new(committee_policy),
726 ResourceTracker::default(),
727 block.authenticated_owner,
728 );
729
730 for blob in published_blobs {
731 let blob_id = blob.id();
732 resource_controller
733 .policy()
734 .check_blob_size(blob.content())
735 .with_execution_context(ChainExecutionContext::Block)?;
736 chain.system.used_blobs.insert(&blob_id)?;
737 }
738
739 let mut block_execution_tracker = BlockExecutionTracker::new(
740 &mut resource_controller,
741 published_blobs
742 .iter()
743 .map(|blob| (blob.id(), blob))
744 .collect(),
745 local_time,
746 replaying_oracle_responses,
747 block,
748 )?;
749
750 let max_failures = match exec_policy {
752 BundleExecutionPolicy::Abort => 0,
753 BundleExecutionPolicy::AutoRetry { max_failures } => max_failures,
754 };
755 let auto_retry = !matches!(exec_policy, BundleExecutionPolicy::Abort);
756 let mut failure_count = 0u32;
757
758 let mut i = 0;
759 while i < block.transactions.len() {
760 let transaction = &mut block.transactions[i];
761 let is_bundle = matches!(transaction, Transaction::ReceiveMessages(_));
762
763 let checkpoint = if auto_retry && is_bundle {
765 Some((
766 chain.clone_unchecked()?,
767 block_execution_tracker.create_checkpoint(),
768 ))
769 } else {
770 None
771 };
772
773 let result = block_execution_tracker
774 .execute_transaction(&*transaction, round, chain)
775 .await;
776
777 let (error, context, incoming_bundle, saved_chain, saved_tracker) =
782 match (result, transaction, checkpoint) {
783 (Ok(()), _, _) => {
784 i += 1;
785 continue;
786 }
787 (
788 Err(ChainError::ExecutionError(error, context)),
789 Transaction::ReceiveMessages(incoming_bundle),
790 Some((saved_chain, saved_tracker)),
791 ) if !error.is_transient_error() => {
792 (error, context, incoming_bundle, saved_chain, saved_tracker)
793 }
794 (Err(e), _, _) => return Err(e),
795 };
796
797 *chain = saved_chain;
799 block_execution_tracker.restore_checkpoint(&saved_tracker);
800
801 if error.is_limit_error() && i > 0 {
802 failure_count += 1;
803 let maybe_sender = if failure_count > max_failures {
805 info!(
806 failure_count,
807 max_failures,
808 "Exceeded max bundle failures, discarding all remaining message bundles"
809 );
810 None
811 } else {
812 info!(
814 %error,
815 index = i,
816 origin = %incoming_bundle.origin,
817 "Message bundle exceeded block limits and will be discarded for \
818 retry in a later block"
819 );
820 Some(incoming_bundle.origin)
821 };
822 Self::discard_remaining_bundles(block, i, maybe_sender);
823 } else if incoming_bundle.bundle.is_protected()
825 || incoming_bundle.action == MessageAction::Reject
826 {
827 return Err(ChainError::ExecutionError(error, context));
829 } else {
830 info!(
833 %error,
834 index = i,
835 origin = %incoming_bundle.origin,
836 "Message bundle failed to execute and will be rejected"
837 );
838 incoming_bundle.action = MessageAction::Reject;
839 }
841 }
842
843 ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
846
847 let recipients = block_execution_tracker.recipients();
848 let mut recipient_heights = Vec::new();
849 let mut indices = Vec::new();
850 for (recipient, height) in chain
851 .previous_message_blocks
852 .multi_get_pairs(recipients)
853 .await?
854 {
855 chain
856 .previous_message_blocks
857 .insert(&recipient, block.height)?;
858 if let Some(height) = height {
859 let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
860 indices.push(index);
861 recipient_heights.push((recipient, height));
862 }
863 }
864 let hashes = confirmed_log.multi_get(indices).await?;
865 let mut previous_message_blocks = BTreeMap::new();
866 for (hash, (recipient, height)) in hashes.into_iter().zip(recipient_heights) {
867 let hash = hash.ok_or_else(|| {
868 ChainError::InternalError("missing entry in confirmed_log".into())
869 })?;
870 previous_message_blocks.insert(recipient, (hash, height));
871 }
872
873 let streams = block_execution_tracker.event_streams();
874 let mut stream_heights = Vec::new();
875 let mut indices = Vec::new();
876 for (stream, height) in chain.previous_event_blocks.multi_get_pairs(streams).await? {
877 chain.previous_event_blocks.insert(&stream, block.height)?;
878 if let Some(height) = height {
879 let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
880 indices.push(index);
881 stream_heights.push((stream, height));
882 }
883 }
884 let hashes = confirmed_log.multi_get(indices).await?;
885 let mut previous_event_blocks = BTreeMap::new();
886 for (hash, (stream, height)) in hashes.into_iter().zip(stream_heights) {
887 let hash = hash.ok_or_else(|| {
888 ChainError::InternalError("missing entry in confirmed_log".into())
889 })?;
890 previous_event_blocks.insert(stream, (hash, height));
891 }
892
893 let state_hash = {
894 #[cfg(with_metrics)]
895 let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency();
896 chain.crypto_hash_mut().await?
897 };
898
899 let (messages, oracle_responses, events, blobs, operation_results, resource_tracker) =
900 block_execution_tracker.finalize(block.transactions.len());
901
902 Ok((
903 BlockExecutionOutcome {
904 messages,
905 previous_message_blocks,
906 previous_event_blocks,
907 state_hash,
908 oracle_responses,
909 events,
910 blobs,
911 operation_results,
912 },
913 resource_tracker,
914 ))
915 }
916
917 fn discard_remaining_bundles(
919 block: &mut ProposedBlock,
920 mut index: usize,
921 maybe_origin: Option<ChainId>,
922 ) {
923 while index < block.transactions.len() {
924 if matches!(
925 &block.transactions[index],
926 Transaction::ReceiveMessages(bundle)
927 if maybe_origin.is_none_or(|origin| bundle.origin == origin)
928 ) {
929 block.transactions.remove(index);
930 } else {
931 index += 1;
932 }
933 }
934 }
935
936 #[instrument(skip_all, fields(
947 chain_id = %self.chain_id(),
948 block_height = %block.height
949 ))]
950 pub async fn execute_block(
951 &mut self,
952 mut block: ProposedBlock,
953 local_time: Timestamp,
954 round: Option<u32>,
955 published_blobs: &[Blob],
956 replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
957 policy: BundleExecutionPolicy,
958 ) -> Result<(ProposedBlock, BlockExecutionOutcome, ResourceTracker), ChainError> {
959 assert_eq!(
960 block.chain_id,
961 self.execution_state.context().extra().chain_id()
962 );
963
964 self.initialize_if_needed(local_time).await?;
965
966 let chain_timestamp = *self.execution_state.system.timestamp.get();
967 ensure!(
968 chain_timestamp <= block.timestamp,
969 ChainError::InvalidBlockTimestamp {
970 parent: chain_timestamp,
971 new: block.timestamp
972 }
973 );
974 ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
975
976 ensure!(
977 block.published_blob_ids()
978 == published_blobs
979 .iter()
980 .map(|blob| blob.id())
981 .collect::<BTreeSet<_>>(),
982 ChainError::InternalError("published_blobs mismatch".to_string())
983 );
984
985 if *self.execution_state.system.closed.get() {
986 ensure!(block.has_only_rejected_messages(), ChainError::ClosedChain);
987 }
988
989 Self::check_app_permissions(
990 self.execution_state.system.application_permissions.get(),
991 &block,
992 )?;
993
994 Self::execute_block_inner(
995 &mut self.execution_state,
996 &self.confirmed_log,
997 &mut block,
998 local_time,
999 round,
1000 published_blobs,
1001 replaying_oracle_responses,
1002 policy,
1003 )
1004 .await
1005 .map(|(outcome, tracker)| (block, outcome, tracker))
1006 }
1007
1008 #[instrument(skip_all, fields(
1012 chain_id = %self.chain_id(),
1013 block_height = %block.inner().inner().header.height
1014 ))]
1015 pub async fn apply_confirmed_block(
1016 &mut self,
1017 block: &ConfirmedBlock,
1018 local_time: Timestamp,
1019 ) -> Result<BTreeSet<StreamId>, ChainError> {
1020 let hash = block.inner().hash();
1021 let block = block.inner().inner();
1022 self.execution_state_hash.set(Some(block.header.state_hash));
1023 let updated_streams = self.process_emitted_events(block).await?;
1024 self.process_outgoing_messages(block).await?;
1025
1026 self.reset_chain_manager(block.header.height.try_add_one()?, local_time)?;
1028
1029 let tip = self.tip_state.get_mut();
1031 tip.block_hash = Some(hash);
1032 tip.next_block_height.try_add_assign_one()?;
1033 tip.update_counters(&block.body.transactions, &block.body.messages)?;
1034 self.confirmed_log.push(hash);
1035 self.preprocessed_blocks.remove(&block.header.height)?;
1036 Ok(updated_streams)
1037 }
1038
1039 #[instrument(skip_all, fields(
1042 chain_id = %self.chain_id(),
1043 block_height = %block.inner().inner().header.height
1044 ))]
1045 pub async fn preprocess_block(
1046 &mut self,
1047 block: &ConfirmedBlock,
1048 ) -> Result<BTreeSet<StreamId>, ChainError> {
1049 let hash = block.inner().hash();
1050 let block = block.inner().inner();
1051 let height = block.header.height;
1052 if height < self.tip_state.get().next_block_height {
1053 return Ok(BTreeSet::new());
1054 }
1055 self.process_outgoing_messages(block).await?;
1056 let updated_streams = self.process_emitted_events(block).await?;
1057 self.preprocessed_blocks.insert(&height, hash)?;
1058 Ok(updated_streams)
1059 }
1060
1061 pub fn is_child(&self) -> bool {
1063 let Some(description) = self.execution_state.system.description.get() else {
1064 return true;
1066 };
1067 description.is_child()
1068 }
1069
1070 #[instrument(skip_all, fields(
1072 block_height = %block.height,
1073 num_transactions = %block.transactions.len()
1074 ))]
1075 fn check_app_permissions(
1076 app_permissions: &ApplicationPermissions,
1077 block: &ProposedBlock,
1078 ) -> Result<(), ChainError> {
1079 let mut mandatory = HashSet::<ApplicationId>::from_iter(
1080 app_permissions.mandatory_applications.iter().copied(),
1081 );
1082 for transaction in &block.transactions {
1083 match transaction {
1084 Transaction::ExecuteOperation(operation)
1085 if operation.is_exempt_from_permissions() =>
1086 {
1087 mandatory.clear()
1088 }
1089 Transaction::ExecuteOperation(operation) => {
1090 ensure!(
1091 app_permissions.can_execute_operations(&operation.application_id()),
1092 ChainError::AuthorizedApplications(
1093 app_permissions.execute_operations.clone().unwrap()
1094 )
1095 );
1096 if let Operation::User { application_id, .. } = operation {
1097 mandatory.remove(application_id);
1098 }
1099 }
1100 Transaction::ReceiveMessages(incoming_bundle)
1101 if incoming_bundle.action == MessageAction::Accept =>
1102 {
1103 for pending in incoming_bundle.messages() {
1104 if let Message::User { application_id, .. } = &pending.message {
1105 mandatory.remove(application_id);
1106 }
1107 }
1108 }
1109 Transaction::ReceiveMessages(_) => {}
1110 }
1111 }
1112 ensure!(
1113 mandatory.is_empty(),
1114 ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
1115 );
1116 Ok(())
1117 }
1118
1119 #[instrument(skip_all, fields(
1124 chain_id = %self.chain_id(),
1125 next_block_height = %self.tip_state.get().next_block_height,
1126 ))]
1127 pub async fn block_hashes(
1128 &self,
1129 heights: impl IntoIterator<Item = BlockHeight>,
1130 ) -> Result<Vec<CryptoHash>, ChainError> {
1131 let next_height = self.tip_state.get().next_block_height;
1132 let (confirmed_heights, unconfirmed_heights) = heights
1134 .into_iter()
1135 .partition::<Vec<_>, _>(|height| *height < next_height);
1136 let confirmed_indices = confirmed_heights
1137 .into_iter()
1138 .map(|height| usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow))
1139 .collect::<Result<_, _>>()?;
1140 let confirmed_hashes = self.confirmed_log.multi_get(confirmed_indices).await?;
1141 let unconfirmed_hashes = self
1143 .preprocessed_blocks
1144 .multi_get(&unconfirmed_heights)
1145 .await?;
1146 Ok(confirmed_hashes
1147 .into_iter()
1148 .chain(unconfirmed_hashes)
1149 .flatten()
1150 .collect())
1151 }
1152
1153 fn reset_chain_manager(
1155 &mut self,
1156 next_height: BlockHeight,
1157 local_time: Timestamp,
1158 ) -> Result<(), ChainError> {
1159 let maybe_committee = self.execution_state.system.current_committee().into_iter();
1160 let ownership = self.execution_state.system.ownership.get().clone();
1161 let fallback_owners =
1162 maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights());
1163 self.pending_validated_blobs.clear();
1164 self.pending_proposed_blobs.clear();
1165 self.manager
1166 .reset(ownership, next_height, local_time, fallback_owners)
1167 }
1168
1169 #[instrument(skip_all, fields(
1173 chain_id = %self.chain_id(),
1174 block_height = %block.header.height
1175 ))]
1176 async fn process_outgoing_messages(
1177 &mut self,
1178 block: &Block,
1179 ) -> Result<Vec<ChainId>, ChainError> {
1180 let recipients = block.recipients();
1183 let block_height = block.header.height;
1184 let next_height = self.tip_state.get().next_block_height;
1185
1186 let outbox_counters = self.outbox_counters.get_mut();
1188 let nonempty_outboxes = self.nonempty_outboxes.get_mut();
1189 let targets = recipients.into_iter().collect::<Vec<_>>();
1190 let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1191 for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1192 if block_height > next_height {
1193 if *outbox.next_height_to_schedule.get() > block_height {
1196 continue; }
1198 let maybe_prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok()
1199 {
1200 Some(height) if height < next_height => {
1203 let index =
1204 usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
1205 Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
1206 ChainError::InternalError("missing entry in confirmed_log".into())
1207 })?)
1208 }
1209 Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
1212 || ChainError::InternalError("missing entry in preprocessed_blocks".into()),
1213 )?),
1214 None => None, };
1216 match (
1218 maybe_prev_hash,
1219 block.body.previous_message_blocks.get(target),
1220 ) {
1221 (None, None) => {
1222 }
1225 (Some(_), None) => {
1226 return Err(ChainError::InternalError(
1229 "block indicates no previous message block,\
1230 but we have one in the outbox"
1231 .into(),
1232 ));
1233 }
1234 (None, Some((_, prev_msg_block_height))) => {
1235 if *prev_msg_block_height >= next_height {
1240 continue;
1241 }
1242 }
1243 (Some(ref prev_hash), Some((prev_msg_block_hash, _))) => {
1244 if prev_hash != prev_msg_block_hash {
1246 continue;
1247 }
1248 }
1249 }
1250 }
1251 if outbox.schedule_message(block_height)? {
1252 *outbox_counters.entry(block_height).or_default() += 1;
1253 nonempty_outboxes.insert(*target);
1254 }
1255 }
1256
1257 #[cfg(with_metrics)]
1258 metrics::NUM_OUTBOXES
1259 .with_label_values(&[])
1260 .observe(self.outboxes.count().await? as f64);
1261 Ok(targets)
1262 }
1263
1264 #[instrument(skip_all, fields(
1268 chain_id = %self.chain_id(),
1269 block_height = %block.header.height
1270 ))]
1271 async fn process_emitted_events(
1272 &mut self,
1273 block: &Block,
1274 ) -> Result<BTreeSet<StreamId>, ChainError> {
1275 let mut emitted_streams = BTreeMap::<StreamId, BTreeSet<u32>>::new();
1276 for event in block.body.events.iter().flatten() {
1277 emitted_streams
1278 .entry(event.stream_id.clone())
1279 .or_default()
1280 .insert(event.index);
1281 }
1282 let mut stream_ids = Vec::new();
1283 let mut list_indices = Vec::new();
1284 for (stream_id, indices) in emitted_streams {
1285 stream_ids.push(stream_id);
1286 list_indices.push(indices);
1287 }
1288
1289 let mut updated_streams = BTreeSet::new();
1290 for ((stream_id, next_index), indices) in self
1291 .next_expected_events
1292 .multi_get_pairs(stream_ids)
1293 .await?
1294 .into_iter()
1295 .zip(list_indices)
1296 {
1297 let initial_index = if stream_id == StreamId::system(EPOCH_STREAM_NAME) {
1298 1
1300 } else {
1301 0
1302 };
1303 let mut current_expected_index = next_index.unwrap_or(initial_index);
1304 for index in indices {
1305 if index == current_expected_index {
1306 updated_streams.insert(stream_id.clone());
1307 current_expected_index = index.saturating_add(1);
1308 }
1309 }
1310 if current_expected_index != 0 {
1311 self.next_expected_events
1312 .insert(&stream_id, current_expected_index)?;
1313 }
1314 }
1315 Ok(updated_streams)
1316 }
1317}
1318
1319#[test]
1320fn empty_block_size() {
1321 let size = bcs::serialized_size(&crate::block::Block::new(
1322 crate::test::make_first_block(
1323 linera_execution::test_utils::dummy_chain_description(0).id(),
1324 ),
1325 crate::data_types::BlockExecutionOutcome::default(),
1326 ))
1327 .unwrap();
1328 assert_eq!(size, EMPTY_BLOCK_SIZE);
1329}