1use std::{
5 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6 ops::RangeBounds,
7 sync::Arc,
8};
9
10use futures::stream::{self, StreamExt, TryStreamExt};
11use linera_base::{
12 crypto::{CryptoHash, ValidatorPublicKey},
13 data_types::{
14 ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight,
15 BlockHeightRangeBounds as _, Epoch, OracleResponse, Timestamp,
16 },
17 ensure,
18 identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
19 ownership::ChainOwnership,
20};
21use linera_execution::{
22 committee::Committee, system::EPOCH_STREAM_NAME, ExecutionRuntimeContext, ExecutionStateView,
23 Message, Operation, OutgoingMessage, Query, QueryContext, QueryOutcome, ResourceController,
24 ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
25};
26use linera_views::{
27 bucket_queue_view::BucketQueueView,
28 context::Context,
29 log_view::LogView,
30 map_view::MapView,
31 reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
32 register_view::RegisterView,
33 set_view::SetView,
34 store::ReadableKeyValueStore as _,
35 views::{ClonableView, CryptoHashView, RootView, View},
36};
37use serde::{Deserialize, Serialize};
38
39use crate::{
40 block::{Block, ConfirmedBlock},
41 block_tracker::BlockExecutionTracker,
42 data_types::{
43 BlockExecutionOutcome, ChainAndHeight, IncomingBundle, MessageBundle, ProposedBlock,
44 Transaction,
45 },
46 inbox::{Cursor, InboxError, InboxStateView},
47 manager::ChainManager,
48 outbox::OutboxStateView,
49 pending_blobs::PendingBlobsView,
50 ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
51};
52
53#[cfg(test)]
54#[path = "unit_tests/chain_tests.rs"]
55mod chain_tests;
56
57#[cfg(with_metrics)]
58use linera_base::prometheus_util::MeasureLatency;
59
/// Prometheus metrics recorded while executing blocks and maintaining inboxes/outboxes.
///
/// The whole module is compiled only when `with_metrics` is set, so the items inside it
/// do not need a `cfg` gate of their own.
#[cfg(with_metrics)]
pub(crate) mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, exponential_bucket_latencies, register_histogram_vec,
        register_int_counter_vec,
    };
    use linera_execution::ResourceTracker;
    use prometheus::{HistogramVec, IntCounterVec};

    /// Counter `num_blocks_executed`; incremented once per call to [`track_block_metrics`].
    pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
    });

    /// Histogram `block_execution_latency`, with buckets from
    /// `exponential_bucket_latencies(1000.0)`.
    pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "block_execution_latency",
            "Block execution latency",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// Histogram `message_execution_latency`, with buckets from
    /// `exponential_bucket_latencies(50.0)`.
    pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "message_execution_latency",
            "Message execution latency",
            &[],
            exponential_bucket_latencies(50.0),
        )
    });

    /// Histogram `operation_execution_latency`, with buckets from
    /// `exponential_bucket_latencies(50.0)`.
    pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "operation_execution_latency",
            "Operation execution latency",
            &[],
            exponential_bucket_latencies(50.0),
        )
    });

    /// Histogram `wasm_fuel_used_per_block`; fed from `ResourceTracker::wasm_fuel`.
    pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "wasm_fuel_used_per_block",
            "Wasm fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 1_000_000.0),
        )
    });

    /// Histogram `evm_fuel_used_per_block`; fed from `ResourceTracker::evm_fuel`.
    pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "evm_fuel_used_per_block",
            "EVM fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 1_000_000.0),
        )
    });

    /// Histogram `vm_num_reads_per_block`; fed from `ResourceTracker::read_operations`.
    pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_num_reads_per_block",
            "VM number of reads per block",
            &[],
            exponential_bucket_interval(0.1, 100.0),
        )
    });

    /// Histogram `vm_bytes_read_per_block`; fed from `ResourceTracker::bytes_read`.
    pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_read_per_block",
            "VM number of bytes read per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram `vm_bytes_written_per_block`; fed from `ResourceTracker::bytes_written`.
    pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_written_per_block",
            "VM number of bytes written per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram `state_hash_computation_latency`, with buckets from
    /// `exponential_bucket_latencies(500.0)`.
    pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "state_hash_computation_latency",
            "Time to recompute the state hash",
            &[],
            exponential_bucket_latencies(500.0),
        )
    });

    /// Histogram `num_inboxes`; observations are inbox counts.
    pub static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_inboxes",
            "Number of inboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Histogram `num_outboxes`; observations are outbox counts.
    pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_outboxes",
            "Number of outboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Records the per-block resource metrics taken from `tracker`.
    ///
    /// Bumps [`NUM_BLOCKS_EXECUTED`] and feeds the fuel, read-count and byte-count
    /// histograms. The tracker values are converted with `as f64`, which is lossy for
    /// very large integers; that is accepted here because the values only feed
    /// histogram buckets.
    pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
        NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
        WASM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.wasm_fuel as f64);
        EVM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.evm_fuel as f64);
        VM_NUM_READS_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.read_operations as f64);
        VM_BYTES_READ_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_read as f64);
        VM_BYTES_WRITTEN_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_written as f64);
    }
}
195
/// Expected BCS-serialized size of the `Block` built by the `empty_block_size` test in
/// this file (the result of `make_first_block` combined with a default
/// `BlockExecutionOutcome`). That test fails if the serialized size drifts from this value.
pub(crate) const EMPTY_BLOCK_SIZE: usize = 94;
198
// An inbox bundle reference paired with the time at which it was recorded; this is the
// element type of `ChainStateView::unskippable_bundles`.
//
// NOTE(review): these notes are plain `//` comments on purpose. With `with_graphql` the
// `SimpleObject` derive presumably turns `///` docs into schema descriptions; promote
// them to `///` only if that schema change is intended.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimestampedBundleInInbox {
    // Identifies the bundle (origin chain and cursor).
    pub entry: BundleInInbox,
    // Set to the caller-supplied `local_time` when `receive_message_bundle` enqueues the bundle.
    pub seen: Timestamp,
}
208
// Identifies a message bundle in an inbox by its origin chain and its cursor. It is
// hashable and comparable so it can be kept in sets.
//
// NOTE(review): these notes are plain `//` comments on purpose. With `with_graphql` the
// `SimpleObject` derive presumably turns `///` docs into schema descriptions; promote
// them to `///` only if that schema change is intended.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct BundleInInbox {
    // The chain the bundle was received from.
    pub origin: ChainId,
    // The bundle's cursor, built with `Cursor::from(&MessageBundle)`.
    pub cursor: Cursor,
}
218
219impl BundleInInbox {
220 fn new(origin: ChainId, bundle: &MessageBundle) -> Self {
221 BundleInInbox {
222 cursor: Cursor::from(bundle),
223 origin,
224 }
225 }
226}
227
/// Bucket-size parameter passed to the `BucketQueueView` that stores
/// `ChainStateView::unskippable_bundles`.
const TIMESTAMPBUNDLE_BUCKET_SIZE: usize = 100;
231
// The persistent state of one chain: execution state, tip, consensus manager, block
// logs, inboxes/outboxes and event bookkeeping.
//
// NOTE(review): field notes are plain `//` comments on purpose. With `with_graphql` the
// `SimpleObject` derive presumably turns `///` docs into schema descriptions; promote
// them to `///` only if that schema change is intended.
#[cfg_attr(
    with_graphql,
    derive(async_graphql::SimpleObject),
    graphql(cache_control(no_cache))
)]
#[derive(Debug, RootView, ClonableView)]
pub struct ChainStateView<C>
where
    C: Clone + Context + Send + Sync + 'static,
{
    // Execution state of the chain, including the system state (`.system`).
    pub execution_state: ExecutionStateView<C>,
    // Hash of the execution state: set from `execution_state.crypto_hash()` in
    // `ensure_is_active` and from the block header's `state_hash` when a confirmed
    // block is applied.
    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,

    // Hash of the latest applied block, next block height and cumulative counters.
    pub tip_state: RegisterView<C, ChainTipState>,

    // Consensus state; reset in `ensure_is_active` and after every applied confirmed block.
    pub manager: ChainManager<C>,
    // Pending blobs; cleared whenever the chain manager is reset for a new height.
    pub pending_validated_blobs: PendingBlobsView<C>,
    // Pending blobs per account owner; cleared together with `pending_validated_blobs`.
    pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,

    // Hashes of applied confirmed blocks, in application order (looked up by height).
    pub confirmed_log: LogView<C, CryptoHash>,
    // Origin chain and height of received bundles (appended when the caller asks for it).
    pub received_log: LogView<C, ChainAndHeight>,
    // Per-validator tracker values; updates only ever increase an entry.
    pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,

    // One inbox per origin chain.
    pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
    // Non-skippable bundles newly added to an inbox, with the time they were recorded.
    pub unskippable_bundles:
        BucketQueueView<C, TimestampedBundleInInbox, TIMESTAMPBUNDLE_BUCKET_SIZE>,
    // Non-skippable bundles already removed from their inbox but not yet dropped from
    // `unskippable_bundles`.
    pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
    // Per recipient chain: height of the latest applied block that addressed it.
    pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
    // Per event stream: height of the latest applied block that emitted to it.
    pub previous_event_blocks: MapView<C, StreamId, BlockHeight>,
    // One outbox per target chain.
    pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
    // Per event stream: the next event index expected in sequence.
    pub next_expected_events: MapView<C, StreamId, u32>,
    // Per block height: number of outboxes in which that height is still scheduled.
    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
    // Target chains whose outbox had a height scheduled and has not been emptied since.
    pub nonempty_outboxes: RegisterView<C, BTreeSet<ChainId>>,

    // Hashes of blocks handled by `preprocess_block`, keyed by height; an entry is
    // removed when the block at that height is applied.
    pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,
}
292
// Summary of the chain tip: the latest applied block and running totals.
//
// NOTE(review): these notes are plain `//` comments on purpose. With `with_graphql` the
// `SimpleObject` derive presumably turns `///` docs into schema descriptions; promote
// them to `///` only if that schema change is intended.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct ChainTipState {
    // Hash of the latest applied block; `None` by default.
    pub block_hash: Option<CryptoHash>,
    // Height the next block must have.
    pub next_block_height: BlockHeight,
    // Running total of `Transaction::ReceiveMessages` entries across applied blocks.
    pub num_incoming_bundles: u32,
    // Running total of `Transaction::ExecuteOperation` entries across applied blocks.
    pub num_operations: u32,
    // Running total of outgoing messages across applied blocks.
    pub num_outgoing_messages: u32,
}
308
309impl ChainTipState {
310 pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
313 ensure!(
314 new_block.height == self.next_block_height,
315 ChainError::UnexpectedBlockHeight {
316 expected_block_height: self.next_block_height,
317 found_block_height: new_block.height
318 }
319 );
320 ensure!(
321 new_block.previous_block_hash == self.block_hash,
322 ChainError::UnexpectedPreviousBlockHash
323 );
324 Ok(())
325 }
326
327 pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
330 ensure!(
331 self.next_block_height >= height,
332 ChainError::MissingEarlierBlocks {
333 current_block_height: self.next_block_height,
334 }
335 );
336 Ok(self.next_block_height > height)
337 }
338
339 pub fn update_counters(
341 &mut self,
342 transactions: &[Transaction],
343 messages: &[Vec<OutgoingMessage>],
344 ) -> Result<(), ChainError> {
345 let mut num_incoming_bundles = 0u32;
346 let mut num_operations = 0u32;
347
348 for transaction in transactions {
349 match transaction {
350 Transaction::ReceiveMessages(_) => {
351 num_incoming_bundles = num_incoming_bundles
352 .checked_add(1)
353 .ok_or(ArithmeticError::Overflow)?;
354 }
355 Transaction::ExecuteOperation(_) => {
356 num_operations = num_operations
357 .checked_add(1)
358 .ok_or(ArithmeticError::Overflow)?;
359 }
360 }
361 }
362
363 self.num_incoming_bundles = self
364 .num_incoming_bundles
365 .checked_add(num_incoming_bundles)
366 .ok_or(ArithmeticError::Overflow)?;
367
368 self.num_operations = self
369 .num_operations
370 .checked_add(num_operations)
371 .ok_or(ArithmeticError::Overflow)?;
372
373 let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
374 .map_err(|_| ArithmeticError::Overflow)?;
375 self.num_outgoing_messages = self
376 .num_outgoing_messages
377 .checked_add(num_outgoing_messages)
378 .ok_or(ArithmeticError::Overflow)?;
379
380 Ok(())
381 }
382}
383
384impl<C> ChainStateView<C>
385where
386 C: Context + Clone + Send + Sync + 'static,
387 C::Extra: ExecutionRuntimeContext,
388{
389 pub fn chain_id(&self) -> ChainId {
391 self.context().extra().chain_id()
392 }
393
394 pub async fn query_application(
395 &mut self,
396 local_time: Timestamp,
397 query: Query,
398 service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
399 ) -> Result<QueryOutcome, ChainError> {
400 let context = QueryContext {
401 chain_id: self.chain_id(),
402 next_block_height: self.tip_state.get().next_block_height,
403 local_time,
404 };
405 self.execution_state
406 .query_application(context, query, service_runtime_endpoint)
407 .await
408 .with_execution_context(ChainExecutionContext::Query)
409 }
410
411 pub async fn describe_application(
412 &mut self,
413 application_id: ApplicationId,
414 ) -> Result<ApplicationDescription, ChainError> {
415 self.execution_state
416 .system
417 .describe_application(application_id, &mut TransactionTracker::default())
418 .await
419 .with_execution_context(ChainExecutionContext::DescribeApplication)
420 }
421
422 pub async fn mark_messages_as_received(
423 &mut self,
424 target: &ChainId,
425 height: BlockHeight,
426 ) -> Result<bool, ChainError> {
427 let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
428 let updates = outbox.mark_messages_as_received(height).await?;
429 if updates.is_empty() {
430 return Ok(false);
431 }
432 for update in updates {
433 let counter = self
434 .outbox_counters
435 .get_mut()
436 .get_mut(&update)
437 .ok_or_else(|| {
438 ChainError::InternalError("message counter should be present".into())
439 })?;
440 *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
441 if *counter == 0 {
442 self.outbox_counters.get_mut().remove(&update);
444 }
445 }
446 if outbox.queue.count() == 0 {
447 self.nonempty_outboxes.get_mut().remove(target);
448 if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
450 self.outboxes.remove_entry(target)?;
451 }
452 }
453 #[cfg(with_metrics)]
454 metrics::NUM_OUTBOXES
455 .with_label_values(&[])
456 .observe(self.outboxes.count().await? as f64);
457 Ok(true)
458 }
459
460 pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
463 tracing::debug!(
464 "Messages left in {:.8}'s outbox: {:?}",
465 self.chain_id(),
466 self.outbox_counters.get()
467 );
468 if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
469 key > &height
470 } else {
471 true
472 }
473 }
474
475 pub fn is_active(&self) -> bool {
477 self.execution_state.system.is_active()
478 }
479
480 pub async fn ensure_is_active(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
482 if self
484 .execution_state
485 .system
486 .initialize_chain(self.chain_id())
487 .await
488 .with_execution_context(ChainExecutionContext::Block)?
489 {
490 return Ok(());
492 }
493 let hash = self.execution_state.crypto_hash().await?;
495 self.execution_state_hash.set(Some(hash));
496 let maybe_committee = self.execution_state.system.current_committee().into_iter();
497 self.manager.reset(
499 self.execution_state.system.ownership.get().clone(),
500 BlockHeight(0),
501 local_time,
502 maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights()),
503 )?;
504 Ok(())
505 }
506
507 pub async fn validate_incoming_bundles(&self) -> Result<(), ChainError> {
510 let chain_id = self.chain_id();
511 let pairs = self.inboxes.try_load_all_entries().await?;
512 let max_stream_queries = self.context().store().max_stream_queries();
513 let stream = stream::iter(pairs)
514 .map(|(origin, inbox)| async move {
515 if let Some(bundle) = inbox.removed_bundles.front().await? {
516 return Err(ChainError::MissingCrossChainUpdate {
517 chain_id,
518 origin,
519 height: bundle.height,
520 });
521 }
522 Ok::<(), ChainError>(())
523 })
524 .buffer_unordered(max_stream_queries);
525 stream.try_collect::<Vec<_>>().await?;
526 Ok(())
527 }
528
529 pub async fn next_block_height_to_receive(
530 &self,
531 origin: &ChainId,
532 ) -> Result<BlockHeight, ChainError> {
533 let inbox = self.inboxes.try_load_entry(origin).await?;
534 match inbox {
535 Some(inbox) => inbox.next_block_height_to_receive(),
536 None => Ok(BlockHeight::ZERO),
537 }
538 }
539
540 pub async fn next_height_to_preprocess(&self) -> Result<BlockHeight, ChainError> {
544 if let Some(height) = self.preprocessed_blocks.indices().await?.last() {
545 return Ok(height.saturating_add(BlockHeight(1)));
546 }
547 Ok(self.tip_state.get().next_block_height)
548 }
549
550 pub async fn last_anticipated_block_height(
551 &self,
552 origin: &ChainId,
553 ) -> Result<Option<BlockHeight>, ChainError> {
554 let inbox = self.inboxes.try_load_entry(origin).await?;
555 match inbox {
556 Some(inbox) => match inbox.removed_bundles.back().await? {
557 Some(bundle) => Ok(Some(bundle.height)),
558 None => Ok(None),
559 },
560 None => Ok(None),
561 }
562 }
563
    /// Adds a message bundle received from chain `origin` to that origin's inbox.
    ///
    /// * `local_time` is used to initialize the chain if needed and is recorded as the
    ///   `seen` time of a newly added, non-skippable bundle.
    /// * `add_to_received_log` controls whether the (origin, height) pair is appended to
    ///   `received_log`.
    ///
    /// # Panics
    ///
    /// Panics if `bundle.messages` is empty.
    ///
    /// # Errors
    ///
    /// Returns any error from `ensure_is_active` other than the tolerated
    /// missing-chain-description case described in the body, as well as view errors.
    /// Any other inbox error is reported as `ChainError::InternalError`.
    pub async fn receive_message_bundle(
        &mut self,
        origin: &ChainId,
        bundle: MessageBundle,
        local_time: Timestamp,
        add_to_received_log: bool,
    ) -> Result<(), ChainError> {
        assert!(!bundle.messages.is_empty());
        let chain_id = self.chain_id();
        tracing::trace!(
            "Processing new messages to {chain_id:.8} from {origin} at height {}",
            bundle.height,
        );
        // Captured now because `bundle` is moved into the inbox below.
        let chain_and_height = ChainAndHeight {
            chain_id: *origin,
            height: bundle.height,
        };

        match self.ensure_is_active(local_time).await {
            Ok(_) => (),
            // Tolerated failure: initialization failed only because blobs are missing,
            // and every missing blob is a `ChainDescription` blob whose hash equals this
            // chain's ID. The inbox is still updated in that case.
            Err(ChainError::ExecutionError(exec_err, _))
                if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
                if blobs.iter().all(|blob_id| {
                    blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
                })) => {}
            // Only `Err` values can reach this arm; the result is returned unchanged.
            err => {
                return err;
            }
        }

        let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
        #[cfg(with_metrics)]
        metrics::NUM_INBOXES
            .with_label_values(&[])
            .observe(self.inboxes.count().await? as f64);
        // Both values are derived before `bundle` is consumed by `add_bundle`.
        let entry = BundleInInbox::new(*origin, &bundle);
        let skippable = bundle.is_skippable();
        let newly_added = inbox
            .add_bundle(bundle)
            .await
            .map_err(|error| match error {
                InboxError::ViewError(error) => ChainError::ViewError(error),
                error => ChainError::InternalError(format!(
                    "while processing messages in certified block: {error}"
                )),
            })?;
        // Only bundles that are new to the inbox and not skippable are queued.
        if newly_added && !skippable {
            let seen = local_time;
            self.unskippable_bundles
                .push_back(TimestampedBundleInInbox { entry, seen });
        }

        if add_to_received_log {
            self.received_log.push(chain_and_height);
        }
        Ok(())
    }
631
632 pub fn update_received_certificate_trackers(
634 &mut self,
635 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
636 ) {
637 for (name, tracker) in new_trackers {
638 self.received_certificate_trackers
639 .get_mut()
640 .entry(name)
641 .and_modify(|t| {
642 if tracker > *t {
645 *t = tracker;
646 }
647 })
648 .or_insert(tracker);
649 }
650 }
651
652 pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> {
653 self.execution_state
654 .system
655 .current_committee()
656 .ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
657 }
658
659 pub fn ownership(&self) -> &ChainOwnership {
660 self.execution_state.system.ownership.get()
661 }
662
    /// Removes the given incoming bundles from their inboxes and updates the bookkeeping
    /// of non-skippable bundles.
    ///
    /// `timestamp` is the block timestamp that every bundle timestamp is checked against.
    ///
    /// # Errors
    ///
    /// * `ChainError::IncorrectBundleTimestamp` if a bundle's timestamp is later than
    ///   `timestamp`.
    /// * An error converted from `(chain_id, origin, InboxError)` if an inbox rejects a
    ///   removal.
    /// * View errors from loading or updating the underlying views.
    pub async fn remove_bundles_from_inboxes(
        &mut self,
        timestamp: Timestamp,
        incoming_bundles: impl IntoIterator<Item = &IncomingBundle>,
    ) -> Result<(), ChainError> {
        let chain_id = self.chain_id();
        // Group the bundles by origin (keeping their relative order) so that each inbox
        // is loaded once.
        let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
        for IncomingBundle { bundle, origin, .. } in incoming_bundles {
            ensure!(
                bundle.timestamp <= timestamp,
                ChainError::IncorrectBundleTimestamp {
                    chain_id,
                    bundle_timestamp: bundle.timestamp,
                    block_timestamp: timestamp,
                }
            );
            let bundles = bundles_by_origin.entry(*origin).or_default();
            bundles.push(bundle);
        }
        // The origins are collected from the map's keys, and the zip below pairs the
        // loaded inboxes with the map entries positionally.
        let origins = bundles_by_origin.keys().copied().collect::<Vec<_>>();
        let inboxes = self.inboxes.try_load_entries_mut(&origins).await?;
        // Non-skippable bundles that were actually present in (and removed from) an inbox.
        let mut removed_unskippable = HashSet::new();
        for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
            tracing::trace!(
                "Removing [{}] from {chain_id:.8}'s inbox for {origin:}",
                bundles
                    .iter()
                    .map(|bundle| bundle.height.to_string())
                    .collect::<Vec<_>>()
                    .join(", ")
            );
            for bundle in bundles {
                let was_present = inbox
                    .remove_bundle(bundle)
                    .await
                    .map_err(|error| (chain_id, origin, error))?;
                if was_present && !bundle.is_skippable() {
                    removed_unskippable.insert(BundleInInbox::new(origin, bundle));
                }
            }
        }
        if !removed_unskippable.is_empty() {
            let maybe_front = self.unskippable_bundles.front();
            // Note that `remove` inside the condition also takes the entry out of the
            // local set when the front of the queue is one of the removed bundles.
            if maybe_front.is_some_and(|ts_entry| removed_unskippable.remove(&ts_entry.entry)) {
                self.unskippable_bundles.delete_front().await?;
                // Keep popping the queue while its front was removed either just now
                // (local set) or earlier (persisted `removed_unskippable_bundles`).
                while let Some(ts_entry) = self.unskippable_bundles.front() {
                    if !removed_unskippable.remove(&ts_entry.entry) {
                        if !self
                            .removed_unskippable_bundles
                            .contains(&ts_entry.entry)
                            .await?
                        {
                            // The front bundle has not been removed yet: stop here.
                            break;
                        }
                        self.removed_unskippable_bundles.remove(&ts_entry.entry)?;
                    }
                    self.unskippable_bundles.delete_front().await?;
                }
            }
            // Whatever is left was removed from an inbox but is not at the front of the
            // queue; remember it so a later call can pop it.
            for entry in removed_unskippable {
                self.removed_unskippable_bundles.insert(&entry)?;
            }
        }
        #[cfg(with_metrics)]
        metrics::NUM_INBOXES
            .with_label_values(&[])
            .observe(self.inboxes.count().await? as f64);
        Ok(())
    }
735
736 pub fn nonempty_outbox_chain_ids(&self) -> Vec<ChainId> {
738 self.nonempty_outboxes.get().iter().copied().collect()
739 }
740
741 pub async fn load_outboxes(
743 &self,
744 targets: &[ChainId],
745 ) -> Result<Vec<ReadGuardedView<OutboxStateView<C>>>, ChainError> {
746 let vec_of_options = self.outboxes.try_load_entries(targets).await?;
747 let optional_vec = vec_of_options.into_iter().collect::<Option<Vec<_>>>();
748 optional_vec.ok_or_else(|| ChainError::InternalError("Missing outboxes".into()))
749 }
750
    /// Executes `block` on the execution state `chain` and assembles the outcome.
    ///
    /// This is an associated function rather than a method so that the caller can lend
    /// out individual fields of the chain state (the execution state mutably, the logs
    /// and maps immutably) at the same time.
    ///
    /// * `confirmed_log` is used to resolve block heights to block hashes.
    /// * `previous_message_blocks_view` / `previous_event_blocks_view` give, per
    ///   recipient / stream, the height of the previous block that addressed it.
    /// * `published_blobs` are size-checked against the policy and recorded as used.
    /// * `local_time`, `round` and `replaying_oracle_responses` are handed through to the
    ///   `BlockExecutionTracker`.
    ///
    /// # Errors
    ///
    /// `ChainError::InactiveChain` if there is no current committee,
    /// `ChainError::InternalError` if a referenced height is missing from
    /// `confirmed_log`, and any error from blob checks, transaction execution or views.
    #[expect(clippy::too_many_arguments)]
    async fn execute_block_inner(
        chain: &mut ExecutionStateView<C>,
        confirmed_log: &LogView<C, CryptoHash>,
        previous_message_blocks_view: &MapView<C, ChainId, BlockHeight>,
        previous_event_blocks_view: &MapView<C, StreamId, BlockHeight>,
        block: &ProposedBlock,
        local_time: Timestamp,
        round: Option<u32>,
        published_blobs: &[Blob],
        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
    ) -> Result<BlockExecutionOutcome, ChainError> {
        // The guard lives until the function returns, so the whole call is measured.
        #[cfg(with_metrics)]
        let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency();
        chain.system.timestamp.set(block.timestamp);

        // The policy is cloned so that it no longer borrows `chain`, which is mutated below.
        let policy = chain
            .system
            .current_committee()
            .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
            .1
            .policy()
            .clone();

        let mut resource_controller = ResourceController::new(
            Arc::new(policy),
            ResourceTracker::default(),
            block.authenticated_signer,
        );

        for blob in published_blobs {
            let blob_id = blob.id();
            resource_controller
                .policy()
                .check_blob_size(blob.content())
                .with_execution_context(ChainExecutionContext::Block)?;
            chain.system.used_blobs.insert(&blob_id)?;
        }

        // The tracker borrows the resource controller mutably for the rest of the function.
        let mut block_execution_tracker = BlockExecutionTracker::new(
            &mut resource_controller,
            published_blobs
                .iter()
                .map(|blob| (blob.id(), blob))
                .collect(),
            local_time,
            replaying_oracle_responses,
            block,
        )?;

        for transaction in block.transaction_refs() {
            block_execution_tracker
                .execute_transaction(transaction, round, chain)
                .await?;
        }

        // For each recipient with a recorded previous block, resolve that block's height
        // to its hash through the confirmed log.
        let recipients = block_execution_tracker.recipients();
        let mut previous_message_blocks = BTreeMap::new();
        for recipient in recipients {
            if let Some(height) = previous_message_blocks_view.get(&recipient).await? {
                let hash = confirmed_log
                    .get(usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?)
                    .await?
                    .ok_or_else(|| {
                        ChainError::InternalError("missing entry in confirmed_log".into())
                    })?;
                previous_message_blocks.insert(recipient, (hash, height));
            }
        }

        // Same resolution for each event stream the tracker reports.
        let streams = block_execution_tracker.event_streams();
        let mut previous_event_blocks = BTreeMap::new();
        for stream in streams {
            if let Some(height) = previous_event_blocks_view.get(&stream).await? {
                let hash = confirmed_log
                    .get(usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?)
                    .await?
                    .ok_or_else(|| {
                        ChainError::InternalError("missing entry in confirmed_log".into())
                    })?;
                previous_event_blocks.insert(stream, (hash, height));
            }
        }

        // The block scope limits the latency guard to the hash computation itself.
        let state_hash = {
            #[cfg(with_metrics)]
            let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency();
            chain.crypto_hash().await?
        };

        let (messages, oracle_responses, events, blobs, operation_results) =
            block_execution_tracker.finalize();

        Ok(BlockExecutionOutcome {
            messages,
            previous_message_blocks,
            previous_event_blocks,
            state_hash,
            oracle_responses,
            events,
            blobs,
            operation_results,
        })
    }
859
860 pub async fn execute_block(
863 &mut self,
864 block: &ProposedBlock,
865 local_time: Timestamp,
866 round: Option<u32>,
867 published_blobs: &[Blob],
868 replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
869 ) -> Result<BlockExecutionOutcome, ChainError> {
870 assert_eq!(
871 block.chain_id,
872 self.execution_state.context().extra().chain_id()
873 );
874
875 self.ensure_is_active(local_time).await?;
876
877 let chain_timestamp = *self.execution_state.system.timestamp.get();
878 ensure!(
879 chain_timestamp <= block.timestamp,
880 ChainError::InvalidBlockTimestamp {
881 parent: chain_timestamp,
882 new: block.timestamp
883 }
884 );
885 ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
886
887 ensure!(
888 block.published_blob_ids()
889 == published_blobs
890 .iter()
891 .map(|blob| blob.id())
892 .collect::<BTreeSet<_>>(),
893 ChainError::InternalError("published_blobs mismatch".to_string())
894 );
895
896 if *self.execution_state.system.closed.get() {
897 ensure!(block.has_only_rejected_messages(), ChainError::ClosedChain);
898 }
899
900 Self::check_app_permissions(
901 self.execution_state.system.application_permissions.get(),
902 block,
903 )?;
904
905 Self::execute_block_inner(
906 &mut self.execution_state,
907 &self.confirmed_log,
908 &self.previous_message_blocks,
909 &self.previous_event_blocks,
910 block,
911 local_time,
912 round,
913 published_blobs,
914 replaying_oracle_responses,
915 )
916 .await
917 }
918
919 pub async fn apply_confirmed_block(
923 &mut self,
924 block: &ConfirmedBlock,
925 local_time: Timestamp,
926 ) -> Result<BTreeSet<StreamId>, ChainError> {
927 let hash = block.inner().hash();
928 let block = block.inner().inner();
929 self.execution_state_hash.set(Some(block.header.state_hash));
930 let updated_streams = self.process_emitted_events(block).await?;
931 let recipients = self.process_outgoing_messages(block).await?;
932
933 for recipient in recipients {
934 self.previous_message_blocks
935 .insert(&recipient, block.header.height)?;
936 }
937 for event in block.body.events.iter().flatten() {
938 self.previous_event_blocks
939 .insert(&event.stream_id, block.header.height)?;
940 }
941 self.reset_chain_manager(block.header.height.try_add_one()?, local_time)?;
943
944 let tip = self.tip_state.get_mut();
946 tip.block_hash = Some(hash);
947 tip.next_block_height.try_add_assign_one()?;
948 tip.update_counters(&block.body.transactions, &block.body.messages)?;
949 self.confirmed_log.push(hash);
950 self.preprocessed_blocks.remove(&block.header.height)?;
951 Ok(updated_streams)
952 }
953
954 pub async fn preprocess_block(
957 &mut self,
958 block: &ConfirmedBlock,
959 ) -> Result<BTreeSet<StreamId>, ChainError> {
960 let hash = block.inner().hash();
961 let block = block.inner().inner();
962 let height = block.header.height;
963 if height < self.tip_state.get().next_block_height {
964 return Ok(BTreeSet::new());
965 }
966 self.process_outgoing_messages(block).await?;
967 let updated_streams = self.process_emitted_events(block).await?;
968 self.preprocessed_blocks.insert(&height, hash)?;
969 Ok(updated_streams)
970 }
971
972 pub fn is_child(&self) -> bool {
974 let Some(description) = self.execution_state.system.description.get() else {
975 return true;
977 };
978 description.is_child()
979 }
980
981 fn check_app_permissions(
983 app_permissions: &ApplicationPermissions,
984 block: &ProposedBlock,
985 ) -> Result<(), ChainError> {
986 let mut mandatory = HashSet::<ApplicationId>::from_iter(
987 app_permissions.mandatory_applications.iter().copied(),
988 );
989 for transaction in &block.transactions {
990 match transaction {
991 Transaction::ExecuteOperation(operation)
992 if operation.is_exempt_from_permissions() =>
993 {
994 mandatory.clear()
995 }
996 Transaction::ExecuteOperation(operation) => {
997 ensure!(
998 app_permissions.can_execute_operations(&operation.application_id()),
999 ChainError::AuthorizedApplications(
1000 app_permissions.execute_operations.clone().unwrap()
1001 )
1002 );
1003 if let Operation::User { application_id, .. } = operation {
1004 mandatory.remove(application_id);
1005 }
1006 }
1007 Transaction::ReceiveMessages(incoming_bundle) => {
1008 for pending in incoming_bundle.messages() {
1009 if let Message::User { application_id, .. } = &pending.message {
1010 mandatory.remove(application_id);
1011 }
1012 }
1013 }
1014 }
1015 }
1016 ensure!(
1017 mandatory.is_empty(),
1018 ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
1019 );
1020 Ok(())
1021 }
1022
1023 pub async fn block_hashes(
1025 &self,
1026 range: impl RangeBounds<BlockHeight>,
1027 ) -> Result<Vec<CryptoHash>, ChainError> {
1028 let next_height = self.tip_state.get().next_block_height;
1029 let Some((start, end)) = range.to_inclusive() else {
1031 return Ok(Vec::new());
1032 };
1033 let mut hashes = if let Ok(last_height) = next_height.try_sub_one() {
1035 let usize_start = usize::try_from(start)?;
1036 let usize_end = usize::try_from(end.min(last_height))?;
1037 self.confirmed_log.read(usize_start..=usize_end).await?
1038 } else {
1039 Vec::new()
1040 };
1041 for height in start.max(next_height).0..=end.0 {
1043 if let Some(hash) = self.preprocessed_blocks.get(&BlockHeight(height)).await? {
1044 hashes.push(hash);
1045 }
1046 }
1047 Ok(hashes)
1048 }
1049
1050 fn reset_chain_manager(
1052 &mut self,
1053 next_height: BlockHeight,
1054 local_time: Timestamp,
1055 ) -> Result<(), ChainError> {
1056 let maybe_committee = self.execution_state.system.current_committee().into_iter();
1057 let ownership = self.execution_state.system.ownership.get().clone();
1058 let fallback_owners =
1059 maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights());
1060 self.pending_validated_blobs.clear();
1061 self.pending_proposed_blobs.clear();
1062 self.manager
1063 .reset(ownership, next_height, local_time, fallback_owners)
1064 }
1065
1066 async fn process_outgoing_messages(
1070 &mut self,
1071 block: &Block,
1072 ) -> Result<Vec<ChainId>, ChainError> {
1073 let recipients = block.recipients();
1076 let block_height = block.header.height;
1077 let next_height = self.tip_state.get().next_block_height;
1078
1079 let outbox_counters = self.outbox_counters.get_mut();
1081 let nonempty_outboxes = self.nonempty_outboxes.get_mut();
1082 let targets = recipients.into_iter().collect::<Vec<_>>();
1083 let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1084 for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1085 if block_height > next_height {
1086 if *outbox.next_height_to_schedule.get() > block_height {
1089 continue; }
1091 let maybe_prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok()
1092 {
1093 Some(height) if height < next_height => {
1096 let index =
1097 usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
1098 Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
1099 ChainError::InternalError("missing entry in confirmed_log".into())
1100 })?)
1101 }
1102 Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
1105 || ChainError::InternalError("missing entry in preprocessed_blocks".into()),
1106 )?),
1107 None => None, };
1109 match (
1111 maybe_prev_hash,
1112 block.body.previous_message_blocks.get(target),
1113 ) {
1114 (None, None) => {
1115 }
1118 (Some(_), None) => {
1119 return Err(ChainError::InternalError(
1122 "block indicates no previous message block,\
1123 but we have one in the outbox"
1124 .into(),
1125 ));
1126 }
1127 (None, Some((_, prev_msg_block_height))) => {
1128 if *prev_msg_block_height >= next_height {
1133 continue;
1134 }
1135 }
1136 (Some(ref prev_hash), Some((prev_msg_block_hash, _))) => {
1137 if prev_hash != prev_msg_block_hash {
1139 continue;
1140 }
1141 }
1142 }
1143 }
1144 if outbox.schedule_message(block_height)? {
1145 *outbox_counters.entry(block_height).or_default() += 1;
1146 nonempty_outboxes.insert(*target);
1147 }
1148 }
1149
1150 #[cfg(with_metrics)]
1151 metrics::NUM_OUTBOXES
1152 .with_label_values(&[])
1153 .observe(self.outboxes.count().await? as f64);
1154 Ok(targets)
1155 }
1156
1157 async fn process_emitted_events(
1161 &mut self,
1162 block: &Block,
1163 ) -> Result<BTreeSet<StreamId>, ChainError> {
1164 let mut emitted_streams: BTreeMap<StreamId, BTreeSet<u32>> = BTreeMap::new();
1165 for event in block.body.events.iter().flatten() {
1166 emitted_streams
1167 .entry(event.stream_id.clone())
1168 .or_default()
1169 .insert(event.index);
1170 }
1171
1172 let mut updated_streams = BTreeSet::new();
1173 for (stream_id, indices) in emitted_streams {
1174 let initial_index = if stream_id == StreamId::system(EPOCH_STREAM_NAME) {
1175 1
1177 } else {
1178 0
1179 };
1180 let mut current_expected_index = self
1181 .next_expected_events
1182 .get(&stream_id)
1183 .await?
1184 .unwrap_or(initial_index);
1185 for index in indices {
1186 if index == current_expected_index {
1187 updated_streams.insert(stream_id.clone());
1188 current_expected_index = index.saturating_add(1);
1189 }
1190 }
1191 if current_expected_index != 0 {
1192 self.next_expected_events
1193 .insert(&stream_id, current_expected_index)?;
1194 }
1195 }
1196 Ok(updated_streams)
1197 }
1198}
1199
/// Guards `EMPTY_BLOCK_SIZE`: the BCS-serialized size of a first block with a default
/// execution outcome must equal the constant.
#[test]
fn empty_block_size() {
    let chain_id = linera_execution::test_utils::dummy_chain_description(0).id();
    let proposed_block = crate::test::make_first_block(chain_id);
    let outcome = crate::data_types::BlockExecutionOutcome::default();
    let block = crate::block::Block::new(proposed_block, outcome);
    let size = bcs::serialized_size(&block).unwrap();
    assert_eq!(size, EMPTY_BLOCK_SIZE);
}