1use std::{
5 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6 ops::RangeBounds,
7 sync::Arc,
8};
9
10use futures::stream::{self, StreamExt, TryStreamExt};
11use linera_base::{
12 crypto::{CryptoHash, ValidatorPublicKey},
13 data_types::{
14 Amount, ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight,
15 BlockHeightRangeBounds as _, Epoch, OracleResponse, Timestamp,
16 },
17 ensure,
18 identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, MessageId},
19 ownership::ChainOwnership,
20};
21use linera_execution::{
22 committee::Committee, ExecutionRuntimeContext, ExecutionStateView, Message, MessageContext,
23 Operation, OperationContext, OutgoingMessage, Query, QueryContext, QueryOutcome,
24 ResourceController, ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
25};
26use linera_views::{
27 bucket_queue_view::BucketQueueView,
28 context::Context,
29 log_view::LogView,
30 map_view::MapView,
31 reentrant_collection_view::ReentrantCollectionView,
32 register_view::RegisterView,
33 set_view::SetView,
34 store::ReadableKeyValueStore as _,
35 views::{ClonableView, CryptoHashView, RootView, View},
36};
37use serde::{Deserialize, Serialize};
38
39use crate::{
40 block::{Block, ConfirmedBlock},
41 block_tracker::BlockExecutionTracker,
42 data_types::{
43 BlockExecutionOutcome, ChainAndHeight, IncomingBundle, MessageAction, MessageBundle,
44 PostedMessage, ProposedBlock, Transaction,
45 },
46 inbox::{Cursor, InboxError, InboxStateView},
47 manager::ChainManager,
48 outbox::OutboxStateView,
49 pending_blobs::PendingBlobsView,
50 ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
51};
52
53#[cfg(test)]
54#[path = "unit_tests/chain_tests.rs"]
55mod chain_tests;
56
57#[cfg(with_metrics)]
58use linera_base::prometheus_util::MeasureLatency;
59
/// Prometheus metrics recorded by the chain state code in this file.
///
/// The whole module is only compiled when `with_metrics` is set, so the items inside do not
/// need their own `cfg` gates.
#[cfg(with_metrics)]
pub(crate) mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, exponential_bucket_latencies, register_histogram_vec,
        register_int_counter_vec,
    };
    use linera_execution::ResourceTracker;
    use prometheus::{HistogramVec, IntCounterVec};

    /// Counter: number of blocks executed.
    pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
    });

    /// Histogram: block execution latency.
    pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "block_execution_latency",
            "Block execution latency",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// Histogram: message execution latency.
    pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "message_execution_latency",
            "Message execution latency",
            &[],
            exponential_bucket_latencies(50.0),
        )
    });

    /// Histogram: operation execution latency.
    pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "operation_execution_latency",
            "Operation execution latency",
            &[],
            exponential_bucket_latencies(50.0),
        )
    });

    /// Histogram: Wasm fuel used per block.
    pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "wasm_fuel_used_per_block",
            "Wasm fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 1_000_000.0),
        )
    });

    /// Histogram: EVM fuel used per block.
    pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "evm_fuel_used_per_block",
            "EVM fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 1_000_000.0),
        )
    });

    /// Histogram: number of VM read operations per block.
    pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_num_reads_per_block",
            "VM number of reads per block",
            &[],
            exponential_bucket_interval(0.1, 100.0),
        )
    });

    /// Histogram: number of bytes read by the VM per block.
    pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_read_per_block",
            "VM number of bytes read per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram: number of bytes written by the VM per block.
    pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_written_per_block",
            "VM number of bytes written per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram: time taken to recompute the state hash.
    pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "state_hash_computation_latency",
            "Time to recompute the state hash",
            &[],
            exponential_bucket_latencies(500.0),
        )
    });

    /// Histogram: number of inboxes.
    pub static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_inboxes",
            "Number of inboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Histogram: number of outboxes.
    pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_outboxes",
            "Number of outboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Increments the executed-block counter and records the resource usage accumulated in
    /// `tracker` (Wasm/EVM fuel, read operations, bytes read and bytes written) in the
    /// corresponding per-block histograms.
    pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
        NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
        WASM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.wasm_fuel as f64);
        EVM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.evm_fuel as f64);
        VM_NUM_READS_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.read_operations as f64);
        VM_BYTES_READ_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_read as f64);
        VM_BYTES_WRITTEN_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_written as f64);
    }
}
195
/// Expected BCS-serialized size, in bytes, of an empty block: the `empty_block_size` test in
/// this file asserts that a first block with a default execution outcome serializes to exactly
/// this many bytes.
pub(crate) const EMPTY_BLOCK_SIZE: usize = 94;
198
/// A [`BundleInInbox`] paired with a timestamp.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimestampedBundleInInbox {
    /// The origin and cursor identifying the bundle.
    pub entry: BundleInInbox,
    /// Timestamp attached to the entry; `receive_message_bundle` sets it to the local time at
    /// which the bundle was newly added to the inbox.
    pub seen: Timestamp,
}
208
/// Identifies a message bundle in an inbox by its origin chain and cursor.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct BundleInInbox {
    /// The chain the bundle was sent from.
    pub origin: ChainId,
    /// The cursor derived from the bundle (see `Cursor::from(&MessageBundle)`).
    pub cursor: Cursor,
}
218
219impl BundleInInbox {
220 fn new(origin: ChainId, bundle: &MessageBundle) -> Self {
221 BundleInInbox {
222 cursor: Cursor::from(bundle),
223 origin,
224 }
225 }
226}
227
/// Bucket-size parameter of the `BucketQueueView` that stores
/// [`TimestampedBundleInInbox`] entries (`ChainStateView::unskippable_bundles`).
const TIMESTAMPBUNDLE_BUCKET_SIZE: usize = 100;
231
/// A view accessing the persisted state of a chain: execution state, tip state, chain
/// manager, block logs, inboxes and outboxes.
#[cfg_attr(
    with_graphql,
    derive(async_graphql::SimpleObject),
    graphql(cache_control(no_cache))
)]
#[derive(Debug, RootView, ClonableView)]
pub struct ChainStateView<C>
where
    C: Clone + Context + Send + Sync + 'static,
{
    /// The execution state of the chain; the system state is reached through
    /// `execution_state.system`.
    pub execution_state: ExecutionStateView<C>,
    /// Hash of the execution state. Set when the chain is first initialized and whenever a
    /// confirmed block is applied.
    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,

    /// Block-chaining state: latest block hash, next block height and cumulative counters.
    pub tip_state: RegisterView<C, ChainTipState>,

    /// The chain manager; it is reset for each new block height (see `reset_chain_manager`).
    pub manager: ChainManager<C>,
    /// Pending blobs for validated blocks; cleared whenever the chain manager is reset.
    pub pending_validated_blobs: PendingBlobsView<C>,
    /// Pending blobs for proposed blocks, keyed by account owner; cleared whenever the chain
    /// manager is reset.
    pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,

    /// Hashes of all confirmed blocks of this chain; a block's height is its index in the log.
    pub confirmed_log: LogView<C, CryptoHash>,
    /// Origin chain and height of received message bundles, appended by
    /// `receive_message_bundle` when `add_to_received_log` is set.
    pub received_log: LogView<C, ChainAndHeight>,
    /// Per-validator tracker values; `update_received_certificate_trackers` only ever
    /// increases them.
    pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,

    /// Inboxes of incoming message bundles, keyed by origin chain.
    pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
    /// Queue of non-skippable bundles newly added to an inbox, with the time they were added.
    pub unskippable_bundles:
        BucketQueueView<C, TimestampedBundleInInbox, TIMESTAMPBUNDLE_BUCKET_SIZE>,
    /// Non-skippable bundles that were removed from their inbox but whose entry has not yet
    /// been popped from `unskippable_bundles`.
    pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
    /// For each recipient chain, the height of the latest confirmed block applied here that
    /// had it as a recipient.
    pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
    /// Outboxes of outgoing messages, keyed by target chain.
    pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
    /// For each block height, the number of outboxes in which that height is scheduled and not
    /// yet marked as received. Entries are removed when they reach zero.
    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,

    /// Hashes of preprocessed blocks by height; the entry for a height is removed when the
    /// confirmed block at that height is applied.
    pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,
}
285
/// Block-chaining state: identifies the latest applied block and keeps cumulative counters.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct ChainTipState {
    /// Hash of the latest applied block, or `None` if no block has been applied yet.
    pub block_hash: Option<CryptoHash>,
    /// Height expected for the next block.
    pub next_block_height: BlockHeight,
    /// Cumulative number of incoming bundles over all applied blocks.
    pub num_incoming_bundles: u32,
    /// Cumulative number of operations over all applied blocks.
    pub num_operations: u32,
    /// Cumulative number of outgoing messages over all applied blocks.
    pub num_outgoing_messages: u32,
}
301
302impl ChainTipState {
303 pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
306 ensure!(
307 new_block.height == self.next_block_height,
308 ChainError::UnexpectedBlockHeight {
309 expected_block_height: self.next_block_height,
310 found_block_height: new_block.height
311 }
312 );
313 ensure!(
314 new_block.previous_block_hash == self.block_hash,
315 ChainError::UnexpectedPreviousBlockHash
316 );
317 Ok(())
318 }
319
320 pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
323 ensure!(
324 self.next_block_height >= height,
325 ChainError::MissingEarlierBlocks {
326 current_block_height: self.next_block_height,
327 }
328 );
329 Ok(self.next_block_height > height)
330 }
331
332 pub fn update_counters(
334 &mut self,
335 incoming_bundles: &[IncomingBundle],
336 operations: &[Operation],
337 messages: &[Vec<OutgoingMessage>],
338 ) -> Result<(), ChainError> {
339 let num_incoming_bundles =
340 u32::try_from(incoming_bundles.len()).map_err(|_| ArithmeticError::Overflow)?;
341 self.num_incoming_bundles = self
342 .num_incoming_bundles
343 .checked_add(num_incoming_bundles)
344 .ok_or(ArithmeticError::Overflow)?;
345
346 let num_operations =
347 u32::try_from(operations.len()).map_err(|_| ArithmeticError::Overflow)?;
348 self.num_operations = self
349 .num_operations
350 .checked_add(num_operations)
351 .ok_or(ArithmeticError::Overflow)?;
352
353 let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
354 .map_err(|_| ArithmeticError::Overflow)?;
355 self.num_outgoing_messages = self
356 .num_outgoing_messages
357 .checked_add(num_outgoing_messages)
358 .ok_or(ArithmeticError::Overflow)?;
359
360 Ok(())
361 }
362}
363
364impl<C> ChainStateView<C>
365where
366 C: Context + Clone + Send + Sync + 'static,
367 C::Extra: ExecutionRuntimeContext,
368{
    /// Returns the ID of this chain, as provided by the extra data of the view's context.
    pub fn chain_id(&self) -> ChainId {
        self.context().extra().chain_id()
    }
373
374 pub async fn query_application(
375 &mut self,
376 local_time: Timestamp,
377 query: Query,
378 service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
379 ) -> Result<QueryOutcome, ChainError> {
380 let context = QueryContext {
381 chain_id: self.chain_id(),
382 next_block_height: self.tip_state.get().next_block_height,
383 local_time,
384 };
385 self.execution_state
386 .query_application(context, query, service_runtime_endpoint)
387 .await
388 .with_execution_context(ChainExecutionContext::Query)
389 }
390
391 pub async fn describe_application(
392 &mut self,
393 application_id: ApplicationId,
394 ) -> Result<ApplicationDescription, ChainError> {
395 self.execution_state
396 .system
397 .describe_application(application_id, &mut TransactionTracker::default())
398 .await
399 .with_execution_context(ChainExecutionContext::DescribeApplication)
400 }
401
402 pub async fn mark_messages_as_received(
403 &mut self,
404 target: &ChainId,
405 height: BlockHeight,
406 ) -> Result<bool, ChainError> {
407 let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
408 let updates = outbox.mark_messages_as_received(height).await?;
409 if updates.is_empty() {
410 return Ok(false);
411 }
412 for update in updates {
413 let counter = self
414 .outbox_counters
415 .get_mut()
416 .get_mut(&update)
417 .expect("message counter should be present");
418 *counter = counter
419 .checked_sub(1)
420 .expect("message counter should not underflow");
421 if *counter == 0 {
422 self.outbox_counters.get_mut().remove(&update);
424 }
425 }
426 #[cfg(with_metrics)]
427 metrics::NUM_OUTBOXES
428 .with_label_values(&[])
429 .observe(self.outboxes.count().await? as f64);
430 Ok(true)
431 }
432
433 pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
436 tracing::debug!(
437 "Messages left in {:.8}'s outbox: {:?}",
438 self.chain_id(),
439 self.outbox_counters.get()
440 );
441 if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
442 key > &height
443 } else {
444 true
445 }
446 }
447
    /// Returns whether the system execution state reports this chain as active (delegates to
    /// `execution_state.system.is_active()`).
    pub fn is_active(&self) -> bool {
        self.execution_state.system.is_active()
    }
452
453 pub async fn ensure_is_active(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
455 if self
457 .execution_state
458 .system
459 .initialize_chain(self.chain_id())
460 .await
461 .with_execution_context(ChainExecutionContext::Block)?
462 {
463 return Ok(());
465 }
466 let hash = self.execution_state.crypto_hash().await?;
468 self.execution_state_hash.set(Some(hash));
469 let maybe_committee = self.execution_state.system.current_committee().into_iter();
470 self.manager.reset(
472 self.execution_state.system.ownership.get().clone(),
473 BlockHeight(0),
474 local_time,
475 maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights()),
476 )?;
477 self.save().await?;
478 Ok(())
479 }
480
481 pub async fn validate_incoming_bundles(&self) -> Result<(), ChainError> {
484 let chain_id = self.chain_id();
485 let pairs = self.inboxes.try_load_all_entries().await?;
486 let max_stream_queries = self.context().store().max_stream_queries();
487 let stream = stream::iter(pairs)
488 .map(|(origin, inbox)| async move {
489 if let Some(bundle) = inbox.removed_bundles.front().await? {
490 return Err(ChainError::MissingCrossChainUpdate {
491 chain_id,
492 origin,
493 height: bundle.height,
494 });
495 }
496 Ok::<(), ChainError>(())
497 })
498 .buffer_unordered(max_stream_queries);
499 stream.try_collect::<Vec<_>>().await?;
500 Ok(())
501 }
502
503 pub async fn next_block_height_to_receive(
504 &self,
505 origin: &ChainId,
506 ) -> Result<BlockHeight, ChainError> {
507 let inbox = self.inboxes.try_load_entry(origin).await?;
508 match inbox {
509 Some(inbox) => inbox.next_block_height_to_receive(),
510 None => Ok(BlockHeight::ZERO),
511 }
512 }
513
514 pub async fn last_anticipated_block_height(
515 &self,
516 origin: &ChainId,
517 ) -> Result<Option<BlockHeight>, ChainError> {
518 let inbox = self.inboxes.try_load_entry(origin).await?;
519 match inbox {
520 Some(inbox) => match inbox.removed_bundles.back().await? {
521 Some(bundle) => Ok(Some(bundle.height)),
522 None => Ok(None),
523 },
524 None => Ok(None),
525 }
526 }
527
528 pub async fn receive_message_bundle(
535 &mut self,
536 origin: &ChainId,
537 bundle: MessageBundle,
538 local_time: Timestamp,
539 add_to_received_log: bool,
540 ) -> Result<(), ChainError> {
541 assert!(!bundle.messages.is_empty());
542 let chain_id = self.chain_id();
543 tracing::trace!(
544 "Processing new messages to {chain_id:.8} from {origin} at height {}",
545 bundle.height,
546 );
547 let chain_and_height = ChainAndHeight {
548 chain_id: *origin,
549 height: bundle.height,
550 };
551
552 match self.ensure_is_active(local_time).await {
553 Ok(_) => (),
554 Err(ChainError::ExecutionError(exec_err, _))
557 if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
558 if blobs.iter().all(|blob_id| {
559 blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
560 })) => {}
561 err => {
562 return err;
563 }
564 }
565
566 let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
568 #[cfg(with_metrics)]
569 metrics::NUM_INBOXES
570 .with_label_values(&[])
571 .observe(self.inboxes.count().await? as f64);
572 let entry = BundleInInbox::new(*origin, &bundle);
573 let skippable = bundle.is_skippable();
574 let newly_added = inbox
575 .add_bundle(bundle)
576 .await
577 .map_err(|error| match error {
578 InboxError::ViewError(error) => ChainError::ViewError(error),
579 error => ChainError::InternalError(format!(
580 "while processing messages in certified block: {error}"
581 )),
582 })?;
583 if newly_added && !skippable {
584 let seen = local_time;
585 self.unskippable_bundles
586 .push_back(TimestampedBundleInInbox { entry, seen });
587 }
588
589 if add_to_received_log {
591 self.received_log.push(chain_and_height);
592 }
593 Ok(())
594 }
595
596 pub fn update_received_certificate_trackers(
598 &mut self,
599 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
600 ) {
601 for (name, tracker) in new_trackers {
602 self.received_certificate_trackers
603 .get_mut()
604 .entry(name)
605 .and_modify(|t| {
606 if tracker > *t {
609 *t = tracker;
610 }
611 })
612 .or_insert(tracker);
613 }
614 }
615
616 pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> {
617 self.execution_state
618 .system
619 .current_committee()
620 .ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
621 }
622
    /// Returns the chain's current ownership, as stored in the system execution state.
    pub fn ownership(&self) -> &ChainOwnership {
        self.execution_state.system.ownership.get()
    }
626
    /// Removes the given incoming bundles from their inboxes and keeps the
    /// `unskippable_bundles` queue and `removed_unskippable_bundles` set consistent.
    ///
    /// `timestamp` is the timestamp of the block containing the bundles; every bundle must
    /// have a timestamp no later than it.
    ///
    /// # Errors
    /// * `ChainError::IncorrectBundleTimestamp` if a bundle's timestamp is later than
    ///   `timestamp`.
    /// * Inbox errors from `remove_bundle`, converted together with the chain ID and origin.
    /// * View errors.
    pub async fn remove_bundles_from_inboxes(
        &mut self,
        timestamp: Timestamp,
        incoming_bundles: &[IncomingBundle],
    ) -> Result<(), ChainError> {
        let chain_id = self.chain_id();
        // Group the bundles by origin, preserving their order within each origin.
        let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
        for IncomingBundle { bundle, origin, .. } in incoming_bundles {
            ensure!(
                bundle.timestamp <= timestamp,
                ChainError::IncorrectBundleTimestamp {
                    chain_id,
                    bundle_timestamp: bundle.timestamp,
                    block_timestamp: timestamp,
                }
            );
            let bundles = bundles_by_origin.entry(origin).or_default();
            bundles.push(bundle);
        }
        // Load all affected inboxes at once; the result is zipped with the map below, so it is
        // expected to follow the order of the requested origins.
        let origins = bundles_by_origin.keys().copied();
        let inboxes = self.inboxes.try_load_entries_mut(origins).await?;
        // Non-skippable bundles for which `remove_bundle` returned `true`.
        let mut removed_unskippable = HashSet::new();
        for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
            tracing::trace!(
                "Removing {:?} from {chain_id:.8}'s inbox for {origin:}",
                bundles
                    .iter()
                    .map(|bundle| bundle.height)
                    .collect::<Vec<_>>()
            );
            for bundle in bundles {
                let was_present = inbox
                    .remove_bundle(bundle)
                    .await
                    .map_err(|error| (chain_id, *origin, error))?;
                if was_present && !bundle.is_skippable() {
                    removed_unskippable.insert(BundleInInbox::new(*origin, bundle));
                }
            }
        }
        if !removed_unskippable.is_empty() {
            // Pop entries from the front of the queue as long as they were removed, either just
            // now or earlier. This only starts if the current front was removed just now.
            let maybe_front = self.unskippable_bundles.front();
            if maybe_front.is_some_and(|ts_entry| removed_unskippable.remove(&ts_entry.entry)) {
                self.unskippable_bundles.delete_front().await?;
                while let Some(ts_entry) = self.unskippable_bundles.front() {
                    if !removed_unskippable.remove(&ts_entry.entry) {
                        // Not removed in this call: stop unless an earlier call recorded it.
                        if !self
                            .removed_unskippable_bundles
                            .contains(&ts_entry.entry)
                            .await?
                        {
                            break;
                        }
                        self.removed_unskippable_bundles.remove(&ts_entry.entry)?;
                    }
                    self.unskippable_bundles.delete_front().await?;
                }
            }
            // Whatever could not be popped is not at the front yet; remember it for later.
            for entry in removed_unskippable {
                self.removed_unskippable_bundles.insert(&entry)?;
            }
        }
        #[cfg(with_metrics)]
        metrics::NUM_INBOXES
            .with_label_values(&[])
            .observe(self.inboxes.count().await? as f64);
        Ok(())
    }
698
    /// Executes all transactions of `block` against `chain` and assembles the resulting
    /// [`BlockExecutionOutcome`].
    ///
    /// This is an associated function that receives the individual views it needs instead of
    /// `&mut self`. It:
    /// 1. sets the system timestamp to the block's timestamp;
    /// 2. checks each published blob's size against the current committee's policy and records
    ///    the blob ID in `used_blobs`;
    /// 3. runs every transaction of the block in order, feeding each transaction's outcome into
    ///    a [`BlockExecutionTracker`];
    /// 4. for every recipient that has an entry in `previous_message_blocks_view`, looks up the
    ///    hash of that earlier block in `confirmed_log`;
    /// 5. computes the execution state hash.
    ///
    /// # Errors
    /// * `ChainError::InactiveChain` if the system state has no current committee.
    /// * Execution errors, tagged with the execution context of the failing transaction.
    /// * `ChainError::InternalError` if `confirmed_log` has no entry at a height recorded in
    ///   `previous_message_blocks_view`.
    #[expect(clippy::too_many_arguments)]
    async fn execute_block_inner(
        chain: &mut ExecutionStateView<C>,
        confirmed_log: &LogView<C, CryptoHash>,
        previous_message_blocks_view: &MapView<C, ChainId, BlockHeight>,
        block: &ProposedBlock,
        local_time: Timestamp,
        round: Option<u32>,
        published_blobs: &[Blob],
        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
    ) -> Result<BlockExecutionOutcome, ChainError> {
        // The guard is held until the end of the function to measure the latency.
        #[cfg(with_metrics)]
        let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency();
        chain.system.timestamp.set(block.timestamp);

        let policy = chain
            .system
            .current_committee()
            .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
            .1
            .policy()
            .clone();

        let mut resource_controller = ResourceController::new(
            Arc::new(policy),
            ResourceTracker::default(),
            block.authenticated_signer,
        );

        // Check the size of every published blob and record it as used.
        for blob in published_blobs {
            let blob_id = blob.id();
            resource_controller
                .policy()
                .check_blob_size(blob.content())
                .with_execution_context(ChainExecutionContext::Block)?;
            chain.system.used_blobs.insert(&blob_id)?;
        }

        // The tracker borrows the resource controller; from here on the controller is reached
        // through `resource_controller_mut()`.
        let mut block_execution_tracker = BlockExecutionTracker::new(
            &mut resource_controller,
            published_blobs
                .iter()
                .map(|blob| (blob.id(), blob))
                .collect(),
            local_time,
            replaying_oracle_responses,
            block,
        )?;

        for transaction in block.transactions() {
            let chain_execution_context =
                block_execution_tracker.chain_execution_context(&transaction);
            let mut txn_tracker = block_execution_tracker.new_transaction_tracker()?;
            match transaction {
                Transaction::ReceiveMessages(incoming_bundle) => {
                    block_execution_tracker
                        .resource_controller_mut()
                        .track_block_size_of(&incoming_bundle)
                        .with_execution_context(chain_execution_context)?;
                    // All messages of a bundle share the same transaction tracker.
                    for (message_id, posted_message) in incoming_bundle.messages_and_ids() {
                        Box::pin(Self::execute_message_in_block(
                            chain,
                            message_id,
                            posted_message,
                            incoming_bundle,
                            block,
                            round,
                            &mut txn_tracker,
                            block_execution_tracker.resource_controller_mut(),
                        ))
                        .await?;
                    }
                }
                Transaction::ExecuteOperation(operation) => {
                    block_execution_tracker
                        .resource_controller_mut()
                        .with_state(&mut chain.system)
                        .await?
                        .track_block_size_of(&operation)
                        .with_execution_context(chain_execution_context)?;
                    #[cfg(with_metrics)]
                    let _operation_latency = metrics::OPERATION_EXECUTION_LATENCY.measure_latency();
                    let context = OperationContext {
                        chain_id: block.chain_id,
                        height: block.height,
                        round,
                        authenticated_signer: block.authenticated_signer,
                        authenticated_caller_id: None,
                        timestamp: block.timestamp,
                    };
                    Box::pin(chain.execute_operation(
                        context,
                        operation.clone(),
                        &mut txn_tracker,
                        block_execution_tracker.resource_controller_mut(),
                    ))
                    .await
                    .with_execution_context(chain_execution_context)?;
                    // The operation itself is tracked only after it executed successfully.
                    block_execution_tracker
                        .resource_controller_mut()
                        .with_state(&mut chain.system)
                        .await?
                        .track_operation(operation)
                        .with_execution_context(chain_execution_context)?;
                }
            }

            let txn_outcome = txn_tracker
                .into_outcome()
                .with_execution_context(chain_execution_context)?;

            block_execution_tracker
                .process_txn_outcome(&txn_outcome, &mut chain.system, chain_execution_context)
                .await?;
        }

        // For each recipient, find the hash of the block recorded for it in
        // `previous_message_blocks_view`, if any.
        let recipients = block_execution_tracker.recipients();
        let mut previous_message_blocks = BTreeMap::new();
        for recipient in recipients {
            if let Some(height) = previous_message_blocks_view.get(&recipient).await? {
                let hash = confirmed_log
                    .get(usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?)
                    .await?
                    .ok_or_else(|| {
                        ChainError::InternalError("missing entry in confirmed_log".into())
                    })?;
                previous_message_blocks.insert(recipient, hash);
            }
        }

        let state_hash = {
            #[cfg(with_metrics)]
            let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency();
            chain.crypto_hash().await?
        };

        let (messages, oracle_responses, events, blobs, operation_results) =
            block_execution_tracker.finalize();

        Ok(BlockExecutionOutcome {
            messages,
            previous_message_blocks,
            state_hash,
            oracle_responses,
            events,
            blobs,
            operation_results,
        })
    }
852
853 pub async fn execute_block(
856 &mut self,
857 block: &ProposedBlock,
858 local_time: Timestamp,
859 round: Option<u32>,
860 published_blobs: &[Blob],
861 replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
862 ) -> Result<BlockExecutionOutcome, ChainError> {
863 assert_eq!(
864 block.chain_id,
865 self.execution_state.context().extra().chain_id()
866 );
867
868 self.ensure_is_active(local_time).await?;
869
870 let chain_timestamp = *self.execution_state.system.timestamp.get();
871 ensure!(
872 chain_timestamp <= block.timestamp,
873 ChainError::InvalidBlockTimestamp {
874 parent: chain_timestamp,
875 new: block.timestamp
876 }
877 );
878 ensure!(
879 !block.incoming_bundles.is_empty() || !block.operations.is_empty(),
880 ChainError::EmptyBlock
881 );
882
883 ensure!(
884 block.published_blob_ids()
885 == published_blobs
886 .iter()
887 .map(|blob| blob.id())
888 .collect::<BTreeSet<_>>(),
889 ChainError::InternalError("published_blobs mismatch".to_string())
890 );
891
892 if *self.execution_state.system.closed.get() {
893 ensure!(
894 !block.incoming_bundles.is_empty() && block.has_only_rejected_messages(),
895 ChainError::ClosedChain
896 );
897 }
898
899 Self::check_app_permissions(
900 self.execution_state.system.application_permissions.get(),
901 block,
902 )?;
903
904 Self::execute_block_inner(
905 &mut self.execution_state,
906 &self.confirmed_log,
907 &self.previous_message_blocks,
908 block,
909 local_time,
910 round,
911 published_blobs,
912 replaying_oracle_responses,
913 )
914 .await
915 }
916
917 pub async fn apply_confirmed_block(
920 &mut self,
921 block: &ConfirmedBlock,
922 local_time: Timestamp,
923 ) -> Result<(), ChainError> {
924 let hash = block.inner().hash();
925 let block = block.inner().inner();
926 self.execution_state_hash.set(Some(block.header.state_hash));
927 let recipients = self.process_outgoing_messages(block).await?;
928
929 for recipient in recipients {
930 self.previous_message_blocks
931 .insert(&recipient, block.header.height)?;
932 }
933 self.reset_chain_manager(block.header.height.try_add_one()?, local_time)?;
935
936 let tip = self.tip_state.get_mut();
938 tip.block_hash = Some(hash);
939 tip.next_block_height.try_add_assign_one()?;
940 tip.update_counters(
941 &block.body.incoming_bundles,
942 &block.body.operations,
943 &block.body.messages,
944 )?;
945 self.confirmed_log.push(hash);
946 self.preprocessed_blocks.remove(&block.header.height)?;
947 Ok(())
948 }
949
950 pub async fn preprocess_block(&mut self, block: &ConfirmedBlock) -> Result<(), ChainError> {
952 let hash = block.inner().hash();
953 let block = block.inner().inner();
954 let height = block.header.height;
955 if height < self.tip_state.get().next_block_height {
956 return Ok(());
957 }
958 self.process_outgoing_messages(block).await?;
959 self.preprocessed_blocks.insert(&height, hash)?;
960 Ok(())
961 }
962
963 #[expect(clippy::too_many_arguments)]
965 async fn execute_message_in_block(
966 chain: &mut ExecutionStateView<C>,
967 message_id: MessageId,
968 posted_message: &PostedMessage,
969 incoming_bundle: &IncomingBundle,
970 block: &ProposedBlock,
971 round: Option<u32>,
972 txn_tracker: &mut TransactionTracker,
973 resource_controller: &mut ResourceController<Option<AccountOwner>>,
974 ) -> Result<(), ChainError> {
975 #[cfg(with_metrics)]
976 let _message_latency = metrics::MESSAGE_EXECUTION_LATENCY.measure_latency();
977 let context = MessageContext {
978 chain_id: block.chain_id,
979 is_bouncing: posted_message.is_bouncing(),
980 height: block.height,
981 round,
982 message_id,
983 authenticated_signer: posted_message.authenticated_signer,
984 refund_grant_to: posted_message.refund_grant_to,
985 timestamp: block.timestamp,
986 };
987 let mut grant = posted_message.grant;
988 match incoming_bundle.action {
989 MessageAction::Accept => {
990 let chain_execution_context =
991 ChainExecutionContext::IncomingBundle(txn_tracker.transaction_index());
992 ensure!(!chain.system.closed.get(), ChainError::ClosedChain);
994
995 Box::pin(chain.execute_message(
996 context,
997 posted_message.message.clone(),
998 (grant > Amount::ZERO).then_some(&mut grant),
999 txn_tracker,
1000 resource_controller,
1001 ))
1002 .await
1003 .with_execution_context(chain_execution_context)?;
1004 chain
1005 .send_refund(context, grant, txn_tracker)
1006 .await
1007 .with_execution_context(chain_execution_context)?;
1008 }
1009 MessageAction::Reject => {
1010 ensure!(
1013 !posted_message.is_protected() || *chain.system.closed.get(),
1014 ChainError::CannotRejectMessage {
1015 chain_id: block.chain_id,
1016 origin: incoming_bundle.origin,
1017 posted_message: Box::new(posted_message.clone()),
1018 }
1019 );
1020 if posted_message.is_tracked() {
1021 chain
1023 .bounce_message(context, grant, posted_message.message.clone(), txn_tracker)
1024 .await
1025 .with_execution_context(ChainExecutionContext::Block)?;
1026 } else {
1027 chain
1029 .send_refund(context, grant, txn_tracker)
1030 .await
1031 .with_execution_context(ChainExecutionContext::Block)?;
1032 }
1033 }
1034 }
1035 Ok(())
1036 }
1037
1038 pub fn is_child(&self) -> bool {
1040 let Some(description) = self.execution_state.system.description.get() else {
1041 return true;
1043 };
1044 description.is_child()
1045 }
1046
1047 fn check_app_permissions(
1049 app_permissions: &ApplicationPermissions,
1050 block: &ProposedBlock,
1051 ) -> Result<(), ChainError> {
1052 let mut mandatory = HashSet::<ApplicationId>::from_iter(
1053 app_permissions.mandatory_applications.iter().cloned(),
1054 );
1055 for operation in &block.operations {
1056 if operation.is_exempt_from_permissions() {
1057 mandatory.clear();
1058 continue;
1059 }
1060 ensure!(
1061 app_permissions.can_execute_operations(&operation.application_id()),
1062 ChainError::AuthorizedApplications(
1063 app_permissions.execute_operations.clone().unwrap()
1064 )
1065 );
1066 if let Operation::User { application_id, .. } = operation {
1067 mandatory.remove(application_id);
1068 }
1069 }
1070 for pending in block.incoming_messages() {
1071 if mandatory.is_empty() {
1072 break;
1073 }
1074 if let Message::User { application_id, .. } = &pending.message {
1075 mandatory.remove(application_id);
1076 }
1077 }
1078 ensure!(
1079 mandatory.is_empty(),
1080 ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
1081 );
1082 Ok(())
1083 }
1084
1085 pub async fn block_hashes(
1088 &self,
1089 range: impl RangeBounds<BlockHeight>,
1090 ) -> Result<Vec<CryptoHash>, ChainError> {
1091 let next_height = self.tip_state.get().next_block_height;
1092 let Some((start, end)) = range.to_inclusive() else {
1094 return Ok(Vec::new());
1095 };
1096 let mut hashes = if let Ok(last_height) = next_height.try_sub_one() {
1097 let usize_start = usize::try_from(start)?;
1098 let usize_end = usize::try_from(end.min(last_height))?;
1099 self.confirmed_log.read(usize_start..=usize_end).await?
1100 } else {
1101 Vec::new()
1102 };
1103 for height in start.max(next_height).0..=end.0 {
1104 hashes.push(
1105 self.preprocessed_blocks
1106 .get(&BlockHeight(height))
1107 .await?
1108 .ok_or_else(|| {
1109 ChainError::InternalError("missing entry in preprocessed_blocks".into())
1110 })?,
1111 );
1112 }
1113 Ok(hashes)
1114 }
1115
1116 fn reset_chain_manager(
1118 &mut self,
1119 next_height: BlockHeight,
1120 local_time: Timestamp,
1121 ) -> Result<(), ChainError> {
1122 let maybe_committee = self.execution_state.system.current_committee().into_iter();
1123 let ownership = self.execution_state.system.ownership.get().clone();
1124 let fallback_owners =
1125 maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights());
1126 self.pending_validated_blobs.clear();
1127 self.pending_proposed_blobs.clear();
1128 self.manager
1129 .reset(ownership, next_height, local_time, fallback_owners)
1130 }
1131
    /// Schedules the height of `block` in the outbox of each of the block's recipients and
    /// returns the recipients.
    ///
    /// For a block above the next expected height, a target is only scheduled if the block's
    /// `previous_message_blocks` entry for that target matches the hash of the block at the
    /// outbox's last scheduled height (`next_height_to_schedule - 1`), or if both are absent;
    /// otherwise the target is skipped. Whenever `schedule_message` returns `true`, the counter
    /// of the block's height in `outbox_counters` is incremented.
    ///
    /// # Errors
    /// `ChainError::InternalError` if the hash of the previously scheduled block cannot be
    /// found in `confirmed_log` or `preprocessed_blocks`; view and arithmetic errors.
    async fn process_outgoing_messages(
        &mut self,
        block: &Block,
    ) -> Result<Vec<ChainId>, ChainError> {
        let recipients = block.recipients();
        let block_height = block.header.height;
        let next_height = self.tip_state.get().next_block_height;

        let outbox_counters = self.outbox_counters.get_mut();
        let targets = recipients.into_iter().collect::<Vec<_>>();
        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
        for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
            if block_height > next_height {
                // Hash of the block at the last height scheduled in this outbox, if any:
                // heights below `next_height` are in the confirmed log, later ones must have
                // been preprocessed.
                let prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok() {
                    Some(height) if height < next_height => {
                        let index =
                            usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
                        Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
                            ChainError::InternalError("missing entry in confirmed_log".into())
                        })?)
                    }
                    Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
                        || ChainError::InternalError("missing entry in preprocessed_blocks".into()),
                    )?),
                    None => None,
                };
                // The block does not directly follow what this outbox has scheduled: skip it.
                if prev_hash.as_ref() != block.body.previous_message_blocks.get(target) {
                    continue;
                }
            }
            if outbox.schedule_message(block_height)? {
                *outbox_counters.entry(block_height).or_default() += 1;
            }
        }

        #[cfg(with_metrics)]
        metrics::NUM_OUTBOXES
            .with_label_values(&[])
            .observe(self.outboxes.count().await? as f64);
        Ok(targets)
    }
1181}
1182
/// Checks that `EMPTY_BLOCK_SIZE` matches the BCS-serialized size of a first block with a
/// default execution outcome.
#[test]
fn empty_block_size() {
    let chain_id = linera_execution::test_utils::dummy_chain_description(0).id();
    let proposed_block = crate::test::make_first_block(chain_id);
    let outcome = crate::data_types::BlockExecutionOutcome::default();
    let block = crate::block::Block::new(proposed_block, outcome);
    let size = bcs::serialized_size(&block).unwrap();
    assert_eq!(size, EMPTY_BLOCK_SIZE);
}