1use std::{
5 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6 ops::RangeBounds,
7 sync::Arc,
8};
9
10use futures::stream::{self, StreamExt, TryStreamExt};
11use linera_base::{
12 crypto::{CryptoHash, ValidatorPublicKey},
13 data_types::{
14 ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight,
15 BlockHeightRangeBounds as _, Epoch, OracleResponse, Timestamp,
16 },
17 ensure,
18 identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
19 ownership::ChainOwnership,
20};
21use linera_execution::{
22 committee::Committee, ExecutionRuntimeContext, ExecutionStateView, Message, Operation,
23 OutgoingMessage, Query, QueryContext, QueryOutcome, ResourceController, ResourceTracker,
24 ServiceRuntimeEndpoint, TransactionTracker,
25};
26use linera_views::{
27 bucket_queue_view::BucketQueueView,
28 context::Context,
29 log_view::LogView,
30 map_view::MapView,
31 reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
32 register_view::RegisterView,
33 set_view::SetView,
34 store::ReadableKeyValueStore as _,
35 views::{ClonableView, CryptoHashView, RootView, View},
36};
37use serde::{Deserialize, Serialize};
38
39use crate::{
40 block::{Block, ConfirmedBlock},
41 block_tracker::BlockExecutionTracker,
42 data_types::{
43 BlockExecutionOutcome, ChainAndHeight, IncomingBundle, MessageBundle, ProposedBlock,
44 },
45 inbox::{Cursor, InboxError, InboxStateView},
46 manager::ChainManager,
47 outbox::OutboxStateView,
48 pending_blobs::PendingBlobsView,
49 ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
50};
51
52#[cfg(test)]
53#[path = "unit_tests/chain_tests.rs"]
54mod chain_tests;
55
56#[cfg(with_metrics)]
57use linera_base::prometheus_util::MeasureLatency;
58
59#[cfg(with_metrics)]
60pub(crate) mod metrics {
61 use std::sync::LazyLock;
62
63 use linera_base::prometheus_util::{
64 exponential_bucket_interval, exponential_bucket_latencies, register_histogram_vec,
65 register_int_counter_vec,
66 };
67 use linera_execution::ResourceTracker;
68 use prometheus::{HistogramVec, IntCounterVec};
69
70 pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
71 register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
72 });
73
74 pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
75 register_histogram_vec(
76 "block_execution_latency",
77 "Block execution latency",
78 &[],
79 exponential_bucket_latencies(1000.0),
80 )
81 });
82
83 #[cfg(with_metrics)]
84 pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
85 register_histogram_vec(
86 "message_execution_latency",
87 "Message execution latency",
88 &[],
89 exponential_bucket_latencies(50.0),
90 )
91 });
92
93 pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
94 register_histogram_vec(
95 "operation_execution_latency",
96 "Operation execution latency",
97 &[],
98 exponential_bucket_latencies(50.0),
99 )
100 });
101
102 pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
103 register_histogram_vec(
104 "wasm_fuel_used_per_block",
105 "Wasm fuel used per block",
106 &[],
107 exponential_bucket_interval(10.0, 1_000_000.0),
108 )
109 });
110
111 pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
112 register_histogram_vec(
113 "evm_fuel_used_per_block",
114 "EVM fuel used per block",
115 &[],
116 exponential_bucket_interval(10.0, 1_000_000.0),
117 )
118 });
119
120 pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
121 register_histogram_vec(
122 "vm_num_reads_per_block",
123 "VM number of reads per block",
124 &[],
125 exponential_bucket_interval(0.1, 100.0),
126 )
127 });
128
129 pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
130 register_histogram_vec(
131 "vm_bytes_read_per_block",
132 "VM number of bytes read per block",
133 &[],
134 exponential_bucket_interval(0.1, 10_000_000.0),
135 )
136 });
137
138 pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
139 register_histogram_vec(
140 "vm_bytes_written_per_block",
141 "VM number of bytes written per block",
142 &[],
143 exponential_bucket_interval(0.1, 10_000_000.0),
144 )
145 });
146
147 pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
148 register_histogram_vec(
149 "state_hash_computation_latency",
150 "Time to recompute the state hash",
151 &[],
152 exponential_bucket_latencies(500.0),
153 )
154 });
155
156 pub static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
157 register_histogram_vec(
158 "num_inboxes",
159 "Number of inboxes",
160 &[],
161 exponential_bucket_interval(1.0, 10_000.0),
162 )
163 });
164
165 pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
166 register_histogram_vec(
167 "num_outboxes",
168 "Number of outboxes",
169 &[],
170 exponential_bucket_interval(1.0, 10_000.0),
171 )
172 });
173
174 pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
176 NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
177 WASM_FUEL_USED_PER_BLOCK
178 .with_label_values(&[])
179 .observe(tracker.wasm_fuel as f64);
180 EVM_FUEL_USED_PER_BLOCK
181 .with_label_values(&[])
182 .observe(tracker.evm_fuel as f64);
183 VM_NUM_READS_PER_BLOCK
184 .with_label_values(&[])
185 .observe(tracker.read_operations as f64);
186 VM_BYTES_READ_PER_BLOCK
187 .with_label_values(&[])
188 .observe(tracker.bytes_read as f64);
189 VM_BYTES_WRITTEN_PER_BLOCK
190 .with_label_values(&[])
191 .observe(tracker.bytes_written as f64);
192 }
193}
194
195pub(crate) const EMPTY_BLOCK_SIZE: usize = 95;
197
198#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
200#[derive(Debug, Clone, Serialize, Deserialize)]
201pub struct TimestampedBundleInInbox {
202 pub entry: BundleInInbox,
204 pub seen: Timestamp,
206}
207
208#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
210#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
211pub struct BundleInInbox {
212 pub origin: ChainId,
214 pub cursor: Cursor,
216}
217
218impl BundleInInbox {
219 fn new(origin: ChainId, bundle: &MessageBundle) -> Self {
220 BundleInInbox {
221 cursor: Cursor::from(bundle),
222 origin,
223 }
224 }
225}
226
227const TIMESTAMPBUNDLE_BUCKET_SIZE: usize = 100;
230
231#[cfg_attr(
233 with_graphql,
234 derive(async_graphql::SimpleObject),
235 graphql(cache_control(no_cache))
236)]
237#[derive(Debug, RootView, ClonableView)]
238pub struct ChainStateView<C>
239where
240 C: Clone + Context + Send + Sync + 'static,
241{
242 pub execution_state: ExecutionStateView<C>,
244 pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,
246
247 pub tip_state: RegisterView<C, ChainTipState>,
249
250 pub manager: ChainManager<C>,
252 pub pending_validated_blobs: PendingBlobsView<C>,
255 pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,
257
258 pub confirmed_log: LogView<C, CryptoHash>,
261 pub received_log: LogView<C, ChainAndHeight>,
263 pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,
265
266 pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
268 pub unskippable_bundles:
270 BucketQueueView<C, TimestampedBundleInInbox, TIMESTAMPBUNDLE_BUCKET_SIZE>,
271 pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
273 pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
275 pub previous_event_blocks: MapView<C, StreamId, BlockHeight>,
277 pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
279 pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
282 pub nonempty_outboxes: RegisterView<C, BTreeSet<ChainId>>,
284
285 pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,
287}
288
289#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
291#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
292pub struct ChainTipState {
293 pub block_hash: Option<CryptoHash>,
295 pub next_block_height: BlockHeight,
297 pub num_incoming_bundles: u32,
299 pub num_operations: u32,
301 pub num_outgoing_messages: u32,
303}
304
305impl ChainTipState {
306 pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
309 ensure!(
310 new_block.height == self.next_block_height,
311 ChainError::UnexpectedBlockHeight {
312 expected_block_height: self.next_block_height,
313 found_block_height: new_block.height
314 }
315 );
316 ensure!(
317 new_block.previous_block_hash == self.block_hash,
318 ChainError::UnexpectedPreviousBlockHash
319 );
320 Ok(())
321 }
322
323 pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
326 ensure!(
327 self.next_block_height >= height,
328 ChainError::MissingEarlierBlocks {
329 current_block_height: self.next_block_height,
330 }
331 );
332 Ok(self.next_block_height > height)
333 }
334
335 pub fn update_counters(
337 &mut self,
338 incoming_bundles: &[IncomingBundle],
339 operations: &[Operation],
340 messages: &[Vec<OutgoingMessage>],
341 ) -> Result<(), ChainError> {
342 let num_incoming_bundles =
343 u32::try_from(incoming_bundles.len()).map_err(|_| ArithmeticError::Overflow)?;
344 self.num_incoming_bundles = self
345 .num_incoming_bundles
346 .checked_add(num_incoming_bundles)
347 .ok_or(ArithmeticError::Overflow)?;
348
349 let num_operations =
350 u32::try_from(operations.len()).map_err(|_| ArithmeticError::Overflow)?;
351 self.num_operations = self
352 .num_operations
353 .checked_add(num_operations)
354 .ok_or(ArithmeticError::Overflow)?;
355
356 let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
357 .map_err(|_| ArithmeticError::Overflow)?;
358 self.num_outgoing_messages = self
359 .num_outgoing_messages
360 .checked_add(num_outgoing_messages)
361 .ok_or(ArithmeticError::Overflow)?;
362
363 Ok(())
364 }
365}
366
367impl<C> ChainStateView<C>
368where
369 C: Context + Clone + Send + Sync + 'static,
370 C::Extra: ExecutionRuntimeContext,
371{
372 pub fn chain_id(&self) -> ChainId {
374 self.context().extra().chain_id()
375 }
376
377 pub async fn query_application(
378 &mut self,
379 local_time: Timestamp,
380 query: Query,
381 service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
382 ) -> Result<QueryOutcome, ChainError> {
383 let context = QueryContext {
384 chain_id: self.chain_id(),
385 next_block_height: self.tip_state.get().next_block_height,
386 local_time,
387 };
388 self.execution_state
389 .query_application(context, query, service_runtime_endpoint)
390 .await
391 .with_execution_context(ChainExecutionContext::Query)
392 }
393
394 pub async fn describe_application(
395 &mut self,
396 application_id: ApplicationId,
397 ) -> Result<ApplicationDescription, ChainError> {
398 self.execution_state
399 .system
400 .describe_application(application_id, &mut TransactionTracker::default())
401 .await
402 .with_execution_context(ChainExecutionContext::DescribeApplication)
403 }
404
405 pub async fn mark_messages_as_received(
406 &mut self,
407 target: &ChainId,
408 height: BlockHeight,
409 ) -> Result<bool, ChainError> {
410 let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
411 let updates = outbox.mark_messages_as_received(height).await?;
412 if updates.is_empty() {
413 return Ok(false);
414 }
415 for update in updates {
416 let counter = self
417 .outbox_counters
418 .get_mut()
419 .get_mut(&update)
420 .ok_or_else(|| {
421 ChainError::InternalError("message counter should be present".into())
422 })?;
423 *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
424 if *counter == 0 {
425 self.outbox_counters.get_mut().remove(&update);
427 }
428 }
429 if outbox.queue.count() == 0 {
430 self.nonempty_outboxes.get_mut().remove(target);
431 if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
433 self.outboxes.remove_entry(target)?;
434 }
435 }
436 #[cfg(with_metrics)]
437 metrics::NUM_OUTBOXES
438 .with_label_values(&[])
439 .observe(self.outboxes.count().await? as f64);
440 Ok(true)
441 }
442
443 pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
446 tracing::debug!(
447 "Messages left in {:.8}'s outbox: {:?}",
448 self.chain_id(),
449 self.outbox_counters.get()
450 );
451 if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
452 key > &height
453 } else {
454 true
455 }
456 }
457
458 pub fn is_active(&self) -> bool {
460 self.execution_state.system.is_active()
461 }
462
463 pub async fn ensure_is_active(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
465 if self
467 .execution_state
468 .system
469 .initialize_chain(self.chain_id())
470 .await
471 .with_execution_context(ChainExecutionContext::Block)?
472 {
473 return Ok(());
475 }
476 let hash = self.execution_state.crypto_hash().await?;
478 self.execution_state_hash.set(Some(hash));
479 let maybe_committee = self.execution_state.system.current_committee().into_iter();
480 self.manager.reset(
482 self.execution_state.system.ownership.get().clone(),
483 BlockHeight(0),
484 local_time,
485 maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights()),
486 )?;
487 self.save().await?;
488 Ok(())
489 }
490
491 pub async fn validate_incoming_bundles(&self) -> Result<(), ChainError> {
494 let chain_id = self.chain_id();
495 let pairs = self.inboxes.try_load_all_entries().await?;
496 let max_stream_queries = self.context().store().max_stream_queries();
497 let stream = stream::iter(pairs)
498 .map(|(origin, inbox)| async move {
499 if let Some(bundle) = inbox.removed_bundles.front().await? {
500 return Err(ChainError::MissingCrossChainUpdate {
501 chain_id,
502 origin,
503 height: bundle.height,
504 });
505 }
506 Ok::<(), ChainError>(())
507 })
508 .buffer_unordered(max_stream_queries);
509 stream.try_collect::<Vec<_>>().await?;
510 Ok(())
511 }
512
513 pub async fn next_block_height_to_receive(
514 &self,
515 origin: &ChainId,
516 ) -> Result<BlockHeight, ChainError> {
517 let inbox = self.inboxes.try_load_entry(origin).await?;
518 match inbox {
519 Some(inbox) => inbox.next_block_height_to_receive(),
520 None => Ok(BlockHeight::ZERO),
521 }
522 }
523
524 pub async fn last_anticipated_block_height(
525 &self,
526 origin: &ChainId,
527 ) -> Result<Option<BlockHeight>, ChainError> {
528 let inbox = self.inboxes.try_load_entry(origin).await?;
529 match inbox {
530 Some(inbox) => match inbox.removed_bundles.back().await? {
531 Some(bundle) => Ok(Some(bundle.height)),
532 None => Ok(None),
533 },
534 None => Ok(None),
535 }
536 }
537
538 pub async fn receive_message_bundle(
545 &mut self,
546 origin: &ChainId,
547 bundle: MessageBundle,
548 local_time: Timestamp,
549 add_to_received_log: bool,
550 ) -> Result<(), ChainError> {
551 assert!(!bundle.messages.is_empty());
552 let chain_id = self.chain_id();
553 tracing::trace!(
554 "Processing new messages to {chain_id:.8} from {origin} at height {}",
555 bundle.height,
556 );
557 let chain_and_height = ChainAndHeight {
558 chain_id: *origin,
559 height: bundle.height,
560 };
561
562 match self.ensure_is_active(local_time).await {
563 Ok(_) => (),
564 Err(ChainError::ExecutionError(exec_err, _))
567 if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
568 if blobs.iter().all(|blob_id| {
569 blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
570 })) => {}
571 err => {
572 return err;
573 }
574 }
575
576 let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
578 #[cfg(with_metrics)]
579 metrics::NUM_INBOXES
580 .with_label_values(&[])
581 .observe(self.inboxes.count().await? as f64);
582 let entry = BundleInInbox::new(*origin, &bundle);
583 let skippable = bundle.is_skippable();
584 let newly_added = inbox
585 .add_bundle(bundle)
586 .await
587 .map_err(|error| match error {
588 InboxError::ViewError(error) => ChainError::ViewError(error),
589 error => ChainError::InternalError(format!(
590 "while processing messages in certified block: {error}"
591 )),
592 })?;
593 if newly_added && !skippable {
594 let seen = local_time;
595 self.unskippable_bundles
596 .push_back(TimestampedBundleInInbox { entry, seen });
597 }
598
599 if add_to_received_log {
601 self.received_log.push(chain_and_height);
602 }
603 Ok(())
604 }
605
606 pub fn update_received_certificate_trackers(
608 &mut self,
609 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
610 ) {
611 for (name, tracker) in new_trackers {
612 self.received_certificate_trackers
613 .get_mut()
614 .entry(name)
615 .and_modify(|t| {
616 if tracker > *t {
619 *t = tracker;
620 }
621 })
622 .or_insert(tracker);
623 }
624 }
625
626 pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> {
627 self.execution_state
628 .system
629 .current_committee()
630 .ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
631 }
632
633 pub fn ownership(&self) -> &ChainOwnership {
634 self.execution_state.system.ownership.get()
635 }
636
637 pub async fn remove_bundles_from_inboxes(
639 &mut self,
640 timestamp: Timestamp,
641 incoming_bundles: &[IncomingBundle],
642 ) -> Result<(), ChainError> {
643 let chain_id = self.chain_id();
644 let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
645 for IncomingBundle { bundle, origin, .. } in incoming_bundles {
646 ensure!(
647 bundle.timestamp <= timestamp,
648 ChainError::IncorrectBundleTimestamp {
649 chain_id,
650 bundle_timestamp: bundle.timestamp,
651 block_timestamp: timestamp,
652 }
653 );
654 let bundles = bundles_by_origin.entry(origin).or_default();
655 bundles.push(bundle);
656 }
657 let origins = bundles_by_origin.keys().copied();
658 let inboxes = self.inboxes.try_load_entries_mut(origins).await?;
659 let mut removed_unskippable = HashSet::new();
660 for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
661 tracing::trace!(
662 "Removing {:?} from {chain_id:.8}'s inbox for {origin:}",
663 bundles
664 .iter()
665 .map(|bundle| bundle.height)
666 .collect::<Vec<_>>()
667 );
668 for bundle in bundles {
669 let was_present = inbox
671 .remove_bundle(bundle)
672 .await
673 .map_err(|error| (chain_id, *origin, error))?;
674 if was_present && !bundle.is_skippable() {
675 removed_unskippable.insert(BundleInInbox::new(*origin, bundle));
676 }
677 }
678 }
679 if !removed_unskippable.is_empty() {
680 let maybe_front = self.unskippable_bundles.front();
682 if maybe_front.is_some_and(|ts_entry| removed_unskippable.remove(&ts_entry.entry)) {
683 self.unskippable_bundles.delete_front().await?;
684 while let Some(ts_entry) = self.unskippable_bundles.front() {
685 if !removed_unskippable.remove(&ts_entry.entry) {
686 if !self
687 .removed_unskippable_bundles
688 .contains(&ts_entry.entry)
689 .await?
690 {
691 break;
692 }
693 self.removed_unskippable_bundles.remove(&ts_entry.entry)?;
694 }
695 self.unskippable_bundles.delete_front().await?;
696 }
697 }
698 for entry in removed_unskippable {
699 self.removed_unskippable_bundles.insert(&entry)?;
700 }
701 }
702 #[cfg(with_metrics)]
703 metrics::NUM_INBOXES
704 .with_label_values(&[])
705 .observe(self.inboxes.count().await? as f64);
706 Ok(())
707 }
708
709 pub fn nonempty_outbox_chain_ids(&self) -> Vec<ChainId> {
711 self.nonempty_outboxes.get().iter().copied().collect()
712 }
713
714 pub async fn load_outboxes(
716 &self,
717 targets: &[ChainId],
718 ) -> Result<Vec<ReadGuardedView<OutboxStateView<C>>>, ChainError> {
719 let vec_of_options = self.outboxes.try_load_entries(targets).await?;
720 let optional_vec = vec_of_options.into_iter().collect::<Option<Vec<_>>>();
721 optional_vec.ok_or_else(|| ChainError::InternalError("Missing outboxes".into()))
722 }
723
724 #[expect(clippy::too_many_arguments)]
727 async fn execute_block_inner(
728 chain: &mut ExecutionStateView<C>,
729 confirmed_log: &LogView<C, CryptoHash>,
730 previous_message_blocks_view: &MapView<C, ChainId, BlockHeight>,
731 previous_event_blocks_view: &MapView<C, StreamId, BlockHeight>,
732 block: &ProposedBlock,
733 local_time: Timestamp,
734 round: Option<u32>,
735 published_blobs: &[Blob],
736 replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
737 ) -> Result<BlockExecutionOutcome, ChainError> {
738 #[cfg(with_metrics)]
739 let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency();
740 chain.system.timestamp.set(block.timestamp);
741
742 let policy = chain
743 .system
744 .current_committee()
745 .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
746 .1
747 .policy()
748 .clone();
749
750 let mut resource_controller = ResourceController::new(
751 Arc::new(policy),
752 ResourceTracker::default(),
753 block.authenticated_signer,
754 );
755
756 for blob in published_blobs {
757 let blob_id = blob.id();
758 resource_controller
759 .policy()
760 .check_blob_size(blob.content())
761 .with_execution_context(ChainExecutionContext::Block)?;
762 chain.system.used_blobs.insert(&blob_id)?;
763 }
764
765 let mut block_execution_tracker = BlockExecutionTracker::new(
768 &mut resource_controller,
769 published_blobs
770 .iter()
771 .map(|blob| (blob.id(), blob))
772 .collect(),
773 local_time,
774 replaying_oracle_responses,
775 block,
776 )?;
777
778 for transaction in block.transactions() {
779 block_execution_tracker
780 .execute_transaction(transaction, round, chain)
781 .await?;
782 }
783
784 let recipients = block_execution_tracker.recipients();
785 let mut previous_message_blocks = BTreeMap::new();
786 for recipient in recipients {
787 if let Some(height) = previous_message_blocks_view.get(&recipient).await? {
788 let hash = confirmed_log
789 .get(usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?)
790 .await?
791 .ok_or_else(|| {
792 ChainError::InternalError("missing entry in confirmed_log".into())
793 })?;
794 previous_message_blocks.insert(recipient, (hash, height));
795 }
796 }
797
798 let streams = block_execution_tracker.event_streams();
799 let mut previous_event_blocks = BTreeMap::new();
800 for stream in streams {
801 if let Some(height) = previous_event_blocks_view.get(&stream).await? {
802 let hash = confirmed_log
803 .get(usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?)
804 .await?
805 .ok_or_else(|| {
806 ChainError::InternalError("missing entry in confirmed_log".into())
807 })?;
808 previous_event_blocks.insert(stream, (hash, height));
809 }
810 }
811
812 let state_hash = {
813 #[cfg(with_metrics)]
814 let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency();
815 chain.crypto_hash().await?
816 };
817
818 let (messages, oracle_responses, events, blobs, operation_results) =
819 block_execution_tracker.finalize();
820
821 Ok(BlockExecutionOutcome {
822 messages,
823 previous_message_blocks,
824 previous_event_blocks,
825 state_hash,
826 oracle_responses,
827 events,
828 blobs,
829 operation_results,
830 })
831 }
832
833 pub async fn execute_block(
836 &mut self,
837 block: &ProposedBlock,
838 local_time: Timestamp,
839 round: Option<u32>,
840 published_blobs: &[Blob],
841 replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
842 ) -> Result<BlockExecutionOutcome, ChainError> {
843 assert_eq!(
844 block.chain_id,
845 self.execution_state.context().extra().chain_id()
846 );
847
848 self.ensure_is_active(local_time).await?;
849
850 let chain_timestamp = *self.execution_state.system.timestamp.get();
851 ensure!(
852 chain_timestamp <= block.timestamp,
853 ChainError::InvalidBlockTimestamp {
854 parent: chain_timestamp,
855 new: block.timestamp
856 }
857 );
858 ensure!(
859 !block.incoming_bundles.is_empty() || !block.operations.is_empty(),
860 ChainError::EmptyBlock
861 );
862
863 ensure!(
864 block.published_blob_ids()
865 == published_blobs
866 .iter()
867 .map(|blob| blob.id())
868 .collect::<BTreeSet<_>>(),
869 ChainError::InternalError("published_blobs mismatch".to_string())
870 );
871
872 if *self.execution_state.system.closed.get() {
873 ensure!(
874 !block.incoming_bundles.is_empty() && block.has_only_rejected_messages(),
875 ChainError::ClosedChain
876 );
877 }
878
879 Self::check_app_permissions(
880 self.execution_state.system.application_permissions.get(),
881 block,
882 )?;
883
884 Self::execute_block_inner(
885 &mut self.execution_state,
886 &self.confirmed_log,
887 &self.previous_message_blocks,
888 &self.previous_event_blocks,
889 block,
890 local_time,
891 round,
892 published_blobs,
893 replaying_oracle_responses,
894 )
895 .await
896 }
897
898 pub async fn apply_confirmed_block(
901 &mut self,
902 block: &ConfirmedBlock,
903 local_time: Timestamp,
904 ) -> Result<(), ChainError> {
905 let hash = block.inner().hash();
906 let block = block.inner().inner();
907 self.execution_state_hash.set(Some(block.header.state_hash));
908 let recipients = self.process_outgoing_messages(block).await?;
909
910 for recipient in recipients {
911 self.previous_message_blocks
912 .insert(&recipient, block.header.height)?;
913 }
914 for event in block.body.events.iter().flatten() {
915 self.previous_event_blocks
916 .insert(&event.stream_id, block.header.height)?;
917 }
918 self.reset_chain_manager(block.header.height.try_add_one()?, local_time)?;
920
921 let tip = self.tip_state.get_mut();
923 tip.block_hash = Some(hash);
924 tip.next_block_height.try_add_assign_one()?;
925 tip.update_counters(
926 &block.body.incoming_bundles,
927 &block.body.operations,
928 &block.body.messages,
929 )?;
930 self.confirmed_log.push(hash);
931 self.preprocessed_blocks.remove(&block.header.height)?;
932 Ok(())
933 }
934
935 pub async fn preprocess_block(&mut self, block: &ConfirmedBlock) -> Result<(), ChainError> {
937 let hash = block.inner().hash();
938 let block = block.inner().inner();
939 let height = block.header.height;
940 if height < self.tip_state.get().next_block_height {
941 return Ok(());
942 }
943 self.process_outgoing_messages(block).await?;
944 self.preprocessed_blocks.insert(&height, hash)?;
945 Ok(())
946 }
947
948 pub fn is_child(&self) -> bool {
950 let Some(description) = self.execution_state.system.description.get() else {
951 return true;
953 };
954 description.is_child()
955 }
956
957 fn check_app_permissions(
959 app_permissions: &ApplicationPermissions,
960 block: &ProposedBlock,
961 ) -> Result<(), ChainError> {
962 let mut mandatory = HashSet::<ApplicationId>::from_iter(
963 app_permissions.mandatory_applications.iter().cloned(),
964 );
965 for operation in &block.operations {
966 if operation.is_exempt_from_permissions() {
967 mandatory.clear();
968 continue;
969 }
970 ensure!(
971 app_permissions.can_execute_operations(&operation.application_id()),
972 ChainError::AuthorizedApplications(
973 app_permissions.execute_operations.clone().unwrap()
974 )
975 );
976 if let Operation::User { application_id, .. } = operation {
977 mandatory.remove(application_id);
978 }
979 }
980 for pending in block.incoming_messages() {
981 if mandatory.is_empty() {
982 break;
983 }
984 if let Message::User { application_id, .. } = &pending.message {
985 mandatory.remove(application_id);
986 }
987 }
988 ensure!(
989 mandatory.is_empty(),
990 ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
991 );
992 Ok(())
993 }
994
995 pub async fn block_hashes(
997 &self,
998 range: impl RangeBounds<BlockHeight>,
999 ) -> Result<Vec<CryptoHash>, ChainError> {
1000 let next_height = self.tip_state.get().next_block_height;
1001 let Some((start, end)) = range.to_inclusive() else {
1003 return Ok(Vec::new());
1004 };
1005 let mut hashes = if let Ok(last_height) = next_height.try_sub_one() {
1007 let usize_start = usize::try_from(start)?;
1008 let usize_end = usize::try_from(end.min(last_height))?;
1009 self.confirmed_log.read(usize_start..=usize_end).await?
1010 } else {
1011 Vec::new()
1012 };
1013 for height in start.max(next_height).0..=end.0 {
1015 if let Some(hash) = self.preprocessed_blocks.get(&BlockHeight(height)).await? {
1016 hashes.push(hash);
1017 }
1018 }
1019 Ok(hashes)
1020 }
1021
1022 fn reset_chain_manager(
1024 &mut self,
1025 next_height: BlockHeight,
1026 local_time: Timestamp,
1027 ) -> Result<(), ChainError> {
1028 let maybe_committee = self.execution_state.system.current_committee().into_iter();
1029 let ownership = self.execution_state.system.ownership.get().clone();
1030 let fallback_owners =
1031 maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights());
1032 self.pending_validated_blobs.clear();
1033 self.pending_proposed_blobs.clear();
1034 self.manager
1035 .reset(ownership, next_height, local_time, fallback_owners)
1036 }
1037
1038 async fn process_outgoing_messages(
1042 &mut self,
1043 block: &Block,
1044 ) -> Result<Vec<ChainId>, ChainError> {
1045 let recipients = block.recipients();
1048 let block_height = block.header.height;
1049 let next_height = self.tip_state.get().next_block_height;
1050
1051 let outbox_counters = self.outbox_counters.get_mut();
1053 let nonempty_outboxes = self.nonempty_outboxes.get_mut();
1054 let targets = recipients.into_iter().collect::<Vec<_>>();
1055 let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1056 for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1057 if block_height > next_height {
1058 let maybe_prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok()
1061 {
1062 Some(height) if height < next_height => {
1065 let index =
1066 usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
1067 Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
1068 ChainError::InternalError("missing entry in confirmed_log".into())
1069 })?)
1070 }
1071 Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
1074 || ChainError::InternalError("missing entry in preprocessed_blocks".into()),
1075 )?),
1076 None => None, };
1078 match (
1080 maybe_prev_hash,
1081 block.body.previous_message_blocks.get(target),
1082 ) {
1083 (None, None) => {
1084 }
1087 (Some(_), None) => {
1088 return Err(ChainError::InternalError(
1091 "block indicates no previous message block,\
1092 but we have one in the outbox"
1093 .into(),
1094 ));
1095 }
1096 (None, Some((_, prev_msg_block_height))) => {
1097 if *prev_msg_block_height >= next_height {
1102 continue;
1103 }
1104 }
1105 (Some(ref prev_hash), Some((prev_msg_block_hash, _))) => {
1106 if prev_hash != prev_msg_block_hash {
1108 continue;
1109 }
1110 }
1111 }
1112 }
1113 if outbox.schedule_message(block_height)? {
1114 *outbox_counters.entry(block_height).or_default() += 1;
1115 nonempty_outboxes.insert(*target);
1116 }
1117 }
1118
1119 #[cfg(with_metrics)]
1120 metrics::NUM_OUTBOXES
1121 .with_label_values(&[])
1122 .observe(self.outboxes.count().await? as f64);
1123 Ok(targets)
1124 }
1125}
1126
1127#[test]
1128fn empty_block_size() {
1129 let size = bcs::serialized_size(&crate::block::Block::new(
1130 crate::test::make_first_block(
1131 linera_execution::test_utils::dummy_chain_description(0).id(),
1132 ),
1133 crate::data_types::BlockExecutionOutcome::default(),
1134 ))
1135 .unwrap();
1136 assert_eq!(size, EMPTY_BLOCK_SIZE);
1137}