1use std::{
5 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6 ops::RangeBounds,
7 sync::Arc,
8};
9
10use linera_base::{
11 crypto::{CryptoHash, ValidatorPublicKey},
12 data_types::{
13 ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight,
14 BlockHeightRangeBounds as _, Epoch, OracleResponse, Timestamp,
15 },
16 ensure,
17 identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
18 ownership::ChainOwnership,
19};
20use linera_execution::{
21 committee::Committee, system::EPOCH_STREAM_NAME, ExecutionRuntimeContext, ExecutionStateView,
22 Message, Operation, OutgoingMessage, Query, QueryContext, QueryOutcome, ResourceController,
23 ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
24};
25use linera_views::{
26 bucket_queue_view::BucketQueueView,
27 context::Context,
28 log_view::LogView,
29 map_view::MapView,
30 reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
31 register_view::RegisterView,
32 set_view::SetView,
33 views::{ClonableView, CryptoHashView, RootView, View},
34};
35use serde::{Deserialize, Serialize};
36use tracing::instrument;
37
38use crate::{
39 block::{Block, ConfirmedBlock},
40 block_tracker::BlockExecutionTracker,
41 data_types::{
42 BlockExecutionOutcome, ChainAndHeight, IncomingBundle, MessageBundle, ProposedBlock,
43 Transaction,
44 },
45 inbox::{Cursor, InboxError, InboxStateView},
46 manager::ChainManager,
47 outbox::OutboxStateView,
48 pending_blobs::PendingBlobsView,
49 ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
50};
51
52#[cfg(test)]
53#[path = "unit_tests/chain_tests.rs"]
54mod chain_tests;
55
56#[cfg(with_metrics)]
57use linera_base::prometheus_util::MeasureLatency;
58
/// Prometheus metrics recorded while executing blocks and maintaining inboxes and outboxes.
///
/// The whole module is only compiled when `with_metrics` is set, so the items inside do not
/// need their own `cfg` gates.
#[cfg(with_metrics)]
pub(crate) mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, exponential_bucket_latencies, register_histogram_vec,
        register_int_counter_vec,
    };
    use linera_execution::ResourceTracker;
    use prometheus::{HistogramVec, IntCounterVec};

    /// Counter of executed blocks; incremented by [`track_block_metrics`].
    pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
    });

    /// Histogram of block execution latency.
    pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "block_execution_latency",
            "Block execution latency",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// Histogram of message execution latency.
    pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "message_execution_latency",
            "Message execution latency",
            &[],
            exponential_bucket_latencies(50.0),
        )
    });

    /// Histogram of operation execution latency.
    pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "operation_execution_latency",
            "Operation execution latency",
            &[],
            exponential_bucket_latencies(50.0),
        )
    });

    /// Histogram of the Wasm fuel used per block.
    pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "wasm_fuel_used_per_block",
            "Wasm fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 1_000_000.0),
        )
    });

    /// Histogram of the EVM fuel used per block.
    pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "evm_fuel_used_per_block",
            "EVM fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 1_000_000.0),
        )
    });

    /// Histogram of the number of VM read operations per block.
    pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_num_reads_per_block",
            "VM number of reads per block",
            &[],
            exponential_bucket_interval(0.1, 100.0),
        )
    });

    /// Histogram of the number of bytes read by the VM per block.
    pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_read_per_block",
            "VM number of bytes read per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the number of bytes written by the VM per block.
    pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_written_per_block",
            "VM number of bytes written per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the time needed to recompute the state hash.
    pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "state_hash_computation_latency",
            "Time to recompute the state hash",
            &[],
            exponential_bucket_latencies(500.0),
        )
    });

    /// Histogram of the number of inboxes.
    pub static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_inboxes",
            "Number of inboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Histogram of the number of outboxes.
    pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_outboxes",
            "Number of outboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Counts one executed block and observes the per-block resource totals held by `tracker`
    /// (Wasm fuel, EVM fuel, read operations, bytes read and bytes written).
    pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
        NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
        WASM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.wasm_fuel as f64);
        EVM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.evm_fuel as f64);
        VM_NUM_READS_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.read_operations as f64);
        VM_BYTES_READ_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_read as f64);
        VM_BYTES_WRITTEN_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_written as f64);
    }
}
194
/// The BCS-serialized size of a block without any transactions: the `empty_block_size` test in
/// this file asserts that a block built from `make_first_block` and a default
/// `BlockExecutionOutcome` serializes to exactly this size.
pub(crate) const EMPTY_BLOCK_SIZE: usize = 94;
197
/// An inbox bundle reference together with the local time at which it was first recorded.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimestampedBundleInInbox {
    /// The origin and cursor identifying the bundle.
    pub entry: BundleInInbox,
    /// The local time passed to `receive_message_bundle` when the bundle was queued.
    pub seen: Timestamp,
}
207
/// Identifies a message bundle in an inbox by its origin chain and its cursor.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct BundleInInbox {
    /// The chain that sent the bundle.
    pub origin: ChainId,
    /// The cursor derived from the bundle (see `Cursor::from`).
    pub cursor: Cursor,
}
217
218impl BundleInInbox {
219 fn new(origin: ChainId, bundle: &MessageBundle) -> Self {
220 BundleInInbox {
221 cursor: Cursor::from(bundle),
222 origin,
223 }
224 }
225}
226
/// Bucket-size parameter of the `BucketQueueView` that stores `unskippable_bundles`.
const TIMESTAMPBUNDLE_BUCKET_SIZE: usize = 100;
230
/// A view accessing the state of a chain: execution state, tip, consensus manager,
/// inboxes, outboxes and the associated bookkeeping.
#[cfg_attr(
    with_graphql,
    derive(async_graphql::SimpleObject),
    graphql(cache_control(no_cache))
)]
#[derive(Debug, RootView, ClonableView)]
pub struct ChainStateView<C>
where
    C: Clone + Context + Send + Sync + 'static,
{
    /// The execution state; exposes the `system` state and answers application queries.
    pub execution_state: ExecutionStateView<C>,
    /// Hash of the execution state, if known. Set when the chain is initialized and
    /// whenever a confirmed block is applied.
    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,

    /// Block-chaining state: latest block hash, next block height and counters.
    pub tip_state: RegisterView<C, ChainTipState>,

    /// The chain manager; reset with the current ownership at initialization and after
    /// each confirmed block.
    pub manager: ChainManager<C>,
    /// Pending blobs for a validated block; cleared when the chain manager is reset.
    pub pending_validated_blobs: PendingBlobsView<C>,
    /// Pending blobs for proposed blocks, per owner; cleared when the chain manager is reset.
    pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,

    /// Hashes of the confirmed blocks, in order of application (the block height is used
    /// as the index when looking up entries).
    pub confirmed_log: LogView<C, CryptoHash>,
    /// Sender chain and height of received message bundles, appended by
    /// `receive_message_bundle` when requested.
    pub received_log: LogView<C, ChainAndHeight>,
    /// Per-validator tracker values; `update_received_certificate_trackers` never decreases them.
    pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,

    /// Inboxes, indexed by origin chain.
    pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
    /// Queue of newly added, non-skippable bundles with the time they were seen.
    pub unskippable_bundles:
        BucketQueueView<C, TimestampedBundleInInbox, TIMESTAMPBUNDLE_BUCKET_SIZE>,
    /// Non-skippable bundles that were removed from their inbox but have not yet been
    /// popped from the front of `unskippable_bundles`.
    pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
    /// For each recipient, the height of the latest applied block that sent it messages.
    pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
    /// For each stream, the height of the latest applied block that emitted an event on it.
    pub previous_event_blocks: MapView<C, StreamId, BlockHeight>,
    /// Outboxes, indexed by target chain.
    pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
    /// For each stream, the index of the next event we expect to see.
    pub next_expected_events: MapView<C, StreamId, u32>,
    /// For each block height, the number of outboxes that still hold a scheduled message
    /// at that height; entries are removed when they reach zero.
    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
    /// Targets whose outbox currently has scheduled messages.
    pub nonempty_outboxes: RegisterView<C, BTreeSet<ChainId>>,

    /// Hashes of blocks that were preprocessed ahead of the tip, by height; an entry is
    /// removed when the block at that height is applied.
    pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,
}
291
/// Block-chaining state: what the next block must build on, plus running totals.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct ChainTipState {
    /// Hash of the latest applied block, if any.
    pub block_hash: Option<CryptoHash>,
    /// Height that the next block must have.
    pub next_block_height: BlockHeight,
    /// Total number of `ReceiveMessages` transactions counted so far.
    pub num_incoming_bundles: u32,
    /// Total number of `ExecuteOperation` transactions counted so far.
    pub num_operations: u32,
    /// Total number of outgoing messages counted so far.
    pub num_outgoing_messages: u32,
}
307
308impl ChainTipState {
309 pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
312 ensure!(
313 new_block.height == self.next_block_height,
314 ChainError::UnexpectedBlockHeight {
315 expected_block_height: self.next_block_height,
316 found_block_height: new_block.height
317 }
318 );
319 ensure!(
320 new_block.previous_block_hash == self.block_hash,
321 ChainError::UnexpectedPreviousBlockHash
322 );
323 Ok(())
324 }
325
326 pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
329 ensure!(
330 self.next_block_height >= height,
331 ChainError::MissingEarlierBlocks {
332 current_block_height: self.next_block_height,
333 }
334 );
335 Ok(self.next_block_height > height)
336 }
337
338 pub fn update_counters(
340 &mut self,
341 transactions: &[Transaction],
342 messages: &[Vec<OutgoingMessage>],
343 ) -> Result<(), ChainError> {
344 let mut num_incoming_bundles = 0u32;
345 let mut num_operations = 0u32;
346
347 for transaction in transactions {
348 match transaction {
349 Transaction::ReceiveMessages(_) => {
350 num_incoming_bundles = num_incoming_bundles
351 .checked_add(1)
352 .ok_or(ArithmeticError::Overflow)?;
353 }
354 Transaction::ExecuteOperation(_) => {
355 num_operations = num_operations
356 .checked_add(1)
357 .ok_or(ArithmeticError::Overflow)?;
358 }
359 }
360 }
361
362 self.num_incoming_bundles = self
363 .num_incoming_bundles
364 .checked_add(num_incoming_bundles)
365 .ok_or(ArithmeticError::Overflow)?;
366
367 self.num_operations = self
368 .num_operations
369 .checked_add(num_operations)
370 .ok_or(ArithmeticError::Overflow)?;
371
372 let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
373 .map_err(|_| ArithmeticError::Overflow)?;
374 self.num_outgoing_messages = self
375 .num_outgoing_messages
376 .checked_add(num_outgoing_messages)
377 .ok_or(ArithmeticError::Overflow)?;
378
379 Ok(())
380 }
381}
382
383impl<C> ChainStateView<C>
384where
385 C: Context + Clone + Send + Sync + 'static,
386 C::Extra: ExecutionRuntimeContext,
387{
    /// Returns the [`ChainId`] of the chain, as reported by the extra data of the view's context.
    pub fn chain_id(&self) -> ChainId {
        self.context().extra().chain_id()
    }
392
393 #[instrument(skip_all, fields(
394 chain_id = %self.chain_id(),
395 ))]
396 pub async fn query_application(
397 &mut self,
398 local_time: Timestamp,
399 query: Query,
400 service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
401 ) -> Result<QueryOutcome, ChainError> {
402 let context = QueryContext {
403 chain_id: self.chain_id(),
404 next_block_height: self.tip_state.get().next_block_height,
405 local_time,
406 };
407 self.execution_state
408 .query_application(context, query, service_runtime_endpoint)
409 .await
410 .with_execution_context(ChainExecutionContext::Query)
411 }
412
413 #[instrument(skip_all, fields(
414 chain_id = %self.chain_id(),
415 application_id = %application_id
416 ))]
417 pub async fn describe_application(
418 &mut self,
419 application_id: ApplicationId,
420 ) -> Result<ApplicationDescription, ChainError> {
421 self.execution_state
422 .system
423 .describe_application(application_id, &mut TransactionTracker::default())
424 .await
425 .with_execution_context(ChainExecutionContext::DescribeApplication)
426 }
427
428 #[instrument(skip_all, fields(
429 chain_id = %self.chain_id(),
430 target = %target,
431 height = %height
432 ))]
433 pub async fn mark_messages_as_received(
434 &mut self,
435 target: &ChainId,
436 height: BlockHeight,
437 ) -> Result<bool, ChainError> {
438 let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
439 let updates = outbox.mark_messages_as_received(height).await?;
440 if updates.is_empty() {
441 return Ok(false);
442 }
443 for update in updates {
444 let counter = self
445 .outbox_counters
446 .get_mut()
447 .get_mut(&update)
448 .ok_or_else(|| {
449 ChainError::InternalError("message counter should be present".into())
450 })?;
451 *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
452 if *counter == 0 {
453 self.outbox_counters.get_mut().remove(&update);
455 }
456 }
457 if outbox.queue.count() == 0 {
458 self.nonempty_outboxes.get_mut().remove(target);
459 if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
461 self.outboxes.remove_entry(target)?;
462 }
463 }
464 #[cfg(with_metrics)]
465 metrics::NUM_OUTBOXES
466 .with_label_values(&[])
467 .observe(self.outboxes.count().await? as f64);
468 Ok(true)
469 }
470
471 pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
474 tracing::debug!(
475 "Messages left in {:.8}'s outbox: {:?}",
476 self.chain_id(),
477 self.outbox_counters.get()
478 );
479 if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
480 key > &height
481 } else {
482 true
483 }
484 }
485
486 pub fn is_active(&self) -> bool {
488 self.execution_state.system.is_active()
489 }
490
491 pub async fn initialize_if_needed(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
493 if self
495 .execution_state
496 .system
497 .initialize_chain(self.chain_id())
498 .await
499 .with_execution_context(ChainExecutionContext::Block)?
500 {
501 return Ok(());
503 }
504 let hash = self.execution_state.crypto_hash_mut().await?;
506 self.execution_state_hash.set(Some(hash));
507 let maybe_committee = self.execution_state.system.current_committee().into_iter();
508 self.manager.reset(
510 self.execution_state.system.ownership.get().clone(),
511 BlockHeight(0),
512 local_time,
513 maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights()),
514 )?;
515 Ok(())
516 }
517
518 pub async fn next_block_height_to_receive(
519 &self,
520 origin: &ChainId,
521 ) -> Result<BlockHeight, ChainError> {
522 let inbox = self.inboxes.try_load_entry(origin).await?;
523 match inbox {
524 Some(inbox) => inbox.next_block_height_to_receive(),
525 None => Ok(BlockHeight::ZERO),
526 }
527 }
528
529 pub async fn next_height_to_preprocess(&self) -> Result<BlockHeight, ChainError> {
533 if let Some(height) = self.preprocessed_blocks.indices().await?.last() {
534 return Ok(height.saturating_add(BlockHeight(1)));
535 }
536 Ok(self.tip_state.get().next_block_height)
537 }
538
539 pub async fn last_anticipated_block_height(
540 &self,
541 origin: &ChainId,
542 ) -> Result<Option<BlockHeight>, ChainError> {
543 let inbox = self.inboxes.try_load_entry(origin).await?;
544 match inbox {
545 Some(inbox) => match inbox.removed_bundles.back().await? {
546 Some(bundle) => Ok(Some(bundle.height)),
547 None => Ok(None),
548 },
549 None => Ok(None),
550 }
551 }
552
    /// Adds a message bundle from `origin` to the corresponding inbox.
    ///
    /// The chain is initialized first if needed. If the bundle is newly added and not
    /// skippable, it is also queued in `unskippable_bundles` with `local_time` as the time it
    /// was seen. When `add_to_received_log` is set, the origin and height are appended to
    /// `received_log`.
    ///
    /// # Errors
    ///
    /// Inbox errors other than view errors are reported as `ChainError::InternalError`.
    ///
    /// # Panics
    ///
    /// Panics if `bundle.messages` is empty.
    #[instrument(skip_all, fields(
        chain_id = %self.chain_id(),
        origin = %origin,
        bundle_height = %bundle.height
    ))]
    pub async fn receive_message_bundle(
        &mut self,
        origin: &ChainId,
        bundle: MessageBundle,
        local_time: Timestamp,
        add_to_received_log: bool,
    ) -> Result<(), ChainError> {
        assert!(!bundle.messages.is_empty());
        let chain_id = self.chain_id();
        tracing::trace!(
            "Processing new messages to {chain_id:.8} from {origin} at height {}",
            bundle.height,
        );
        let chain_and_height = ChainAndHeight {
            chain_id: *origin,
            height: bundle.height,
        };

        match self.initialize_if_needed(local_time).await {
            Ok(_) => (),
            // Tolerate a failed initialization when the only missing blobs are this chain's
            // own description: the inbox is still updated in that case.
            Err(ChainError::ExecutionError(exec_err, _))
                if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
                if blobs.iter().all(|blob_id| {
                    blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
                })) => {}
            // Any other error is returned to the caller unchanged.
            err => {
                return err;
            }
        }

        // Load (or create) the inbox for this origin and try to add the bundle.
        let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
        #[cfg(with_metrics)]
        metrics::NUM_INBOXES
            .with_label_values(&[])
            .observe(self.inboxes.count().await? as f64);
        // Capture what we need from the bundle before it is moved into the inbox.
        let entry = BundleInInbox::new(*origin, &bundle);
        let skippable = bundle.is_skippable();
        let newly_added = inbox
            .add_bundle(bundle)
            .await
            .map_err(|error| match error {
                InboxError::ViewError(error) => ChainError::ViewError(error),
                error => ChainError::InternalError(format!(
                    "while processing messages in certified block: {error}"
                )),
            })?;
        // Only bundles that are both new and non-skippable are tracked in the queue.
        if newly_added && !skippable {
            let seen = local_time;
            self.unskippable_bundles
                .push_back(TimestampedBundleInInbox { entry, seen });
        }

        // Remember the origin and height if the caller asked for it.
        if add_to_received_log {
            self.received_log.push(chain_and_height);
        }
        Ok(())
    }
625
626 pub fn update_received_certificate_trackers(
628 &mut self,
629 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
630 ) {
631 for (name, tracker) in new_trackers {
632 self.received_certificate_trackers
633 .get_mut()
634 .entry(name)
635 .and_modify(|t| {
636 if tracker > *t {
639 *t = tracker;
640 }
641 })
642 .or_insert(tracker);
643 }
644 }
645
646 pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> {
647 self.execution_state
648 .system
649 .current_committee()
650 .ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
651 }
652
653 pub fn ownership(&self) -> &ChainOwnership {
654 self.execution_state.system.ownership.get()
655 }
656
    /// Removes the given incoming bundles from their inboxes.
    ///
    /// * `timestamp`: every bundle's timestamp must be at most this value, otherwise
    ///   `ChainError::IncorrectBundleTimestamp` is returned.
    /// * `must_be_present`: if `true`, a bundle that was not present in its inbox causes
    ///   `ChainError::MissingCrossChainUpdate`.
    /// * `incoming_bundles`: the bundles to remove; they are grouped by origin first.
    ///
    /// Non-skippable bundles that were present are also cleaned up from the
    /// `unskippable_bundles` queue, using `removed_unskippable_bundles` to remember those
    /// that are not yet at the front of the queue.
    #[instrument(skip_all, fields(
        chain_id = %self.chain_id(),
    ))]
    pub async fn remove_bundles_from_inboxes(
        &mut self,
        timestamp: Timestamp,
        must_be_present: bool,
        incoming_bundles: impl IntoIterator<Item = &IncomingBundle>,
    ) -> Result<(), ChainError> {
        let chain_id = self.chain_id();
        // Group the bundles by origin, checking the timestamps along the way.
        let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
        for IncomingBundle { bundle, origin, .. } in incoming_bundles {
            ensure!(
                bundle.timestamp <= timestamp,
                ChainError::IncorrectBundleTimestamp {
                    chain_id,
                    bundle_timestamp: bundle.timestamp,
                    block_timestamp: timestamp,
                }
            );
            let bundles = bundles_by_origin.entry(*origin).or_default();
            bundles.push(bundle);
        }
        // Load all affected inboxes at once; they are zipped with the map entries below, so
        // this relies on `try_load_entries_mut` returning them in the order of `origins`.
        let origins = bundles_by_origin.keys().copied().collect::<Vec<_>>();
        let inboxes = self.inboxes.try_load_entries_mut(&origins).await?;
        let mut removed_unskippable = HashSet::new();
        for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
            tracing::trace!(
                "Removing [{}] from {chain_id:.8}'s inbox for {origin:}",
                bundles
                    .iter()
                    .map(|bundle| bundle.height.to_string())
                    .collect::<Vec<_>>()
                    .join(", ")
            );
            for bundle in bundles {
                // The `(chain_id, origin, error)` tuple is converted into a `ChainError` by `?`.
                let was_present = inbox
                    .remove_bundle(bundle)
                    .await
                    .map_err(|error| (chain_id, origin, error))?;
                if must_be_present {
                    ensure!(
                        was_present,
                        ChainError::MissingCrossChainUpdate {
                            chain_id,
                            origin,
                            height: bundle.height,
                        }
                    );
                }
                if was_present && !bundle.is_skippable() {
                    removed_unskippable.insert(BundleInInbox::new(origin, bundle));
                }
            }
        }
        if !removed_unskippable.is_empty() {
            // If the front of the queue was just removed, pop it and keep popping while the
            // new front was either just removed or recorded earlier as removed.
            let maybe_front = self.unskippable_bundles.front();
            if maybe_front.is_some_and(|ts_entry| removed_unskippable.remove(&ts_entry.entry)) {
                self.unskippable_bundles.delete_front().await?;
                while let Some(ts_entry) = self.unskippable_bundles.front() {
                    if !removed_unskippable.remove(&ts_entry.entry) {
                        if !self
                            .removed_unskippable_bundles
                            .contains(&ts_entry.entry)
                            .await?
                        {
                            // The front is still pending in its inbox: stop here.
                            break;
                        }
                        self.removed_unskippable_bundles.remove(&ts_entry.entry)?;
                    }
                    self.unskippable_bundles.delete_front().await?;
                }
            }
            // Whatever was removed but not popped from the queue is remembered for later.
            for entry in removed_unskippable {
                self.removed_unskippable_bundles.insert(&entry)?;
            }
        }
        #[cfg(with_metrics)]
        metrics::NUM_INBOXES
            .with_label_values(&[])
            .observe(self.inboxes.count().await? as f64);
        Ok(())
    }
747
748 pub fn nonempty_outbox_chain_ids(&self) -> Vec<ChainId> {
750 self.nonempty_outboxes.get().iter().copied().collect()
751 }
752
753 pub async fn load_outboxes(
755 &self,
756 targets: &[ChainId],
757 ) -> Result<Vec<ReadGuardedView<OutboxStateView<C>>>, ChainError> {
758 let vec_of_options = self.outboxes.try_load_entries(targets).await?;
759 let optional_vec = vec_of_options.into_iter().collect::<Option<Vec<_>>>();
760 optional_vec.ok_or_else(|| ChainError::InternalError("Missing outboxes".into()))
761 }
762
    /// Executes `block` against the execution state `chain` and returns the outcome.
    ///
    /// This is an associated function rather than a method so that it only borrows the
    /// parts of the chain state it needs: the execution state mutably, and the confirmed
    /// log and the two "previous block" maps immutably.
    ///
    /// * `published_blobs`: blobs published by the block; their sizes are checked against
    ///   the policy and their IDs are recorded as used.
    /// * `replaying_oracle_responses`: passed on to the `BlockExecutionTracker`.
    ///
    /// # Errors
    ///
    /// Returns `ChainError::InactiveChain` if there is no current committee, and
    /// `ChainError::InternalError` if a previous block height has no entry in `confirmed_log`.
    #[instrument(skip_all, fields(
        chain_id = %block.chain_id,
        block_height = %block.height
    ))]
    #[expect(clippy::too_many_arguments)]
    async fn execute_block_inner(
        chain: &mut ExecutionStateView<C>,
        confirmed_log: &LogView<C, CryptoHash>,
        previous_message_blocks_view: &MapView<C, ChainId, BlockHeight>,
        previous_event_blocks_view: &MapView<C, StreamId, BlockHeight>,
        block: &ProposedBlock,
        local_time: Timestamp,
        round: Option<u32>,
        published_blobs: &[Blob],
        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
    ) -> Result<BlockExecutionOutcome, ChainError> {
        #[cfg(with_metrics)]
        let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency();
        // The system timestamp is set to the block's timestamp before anything executes.
        chain.system.timestamp.set(block.timestamp);

        // The resource policy comes from the current committee.
        let policy = chain
            .system
            .current_committee()
            .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
            .1
            .policy()
            .clone();

        let mut resource_controller = ResourceController::new(
            Arc::new(policy),
            ResourceTracker::default(),
            block.authenticated_owner,
        );

        // Check each published blob's size and mark it as used.
        for blob in published_blobs {
            let blob_id = blob.id();
            resource_controller
                .policy()
                .check_blob_size(blob.content())
                .with_execution_context(ChainExecutionContext::Block)?;
            chain.system.used_blobs.insert(&blob_id)?;
        }

        let mut block_execution_tracker = BlockExecutionTracker::new(
            &mut resource_controller,
            published_blobs
                .iter()
                .map(|blob| (blob.id(), blob))
                .collect(),
            local_time,
            replaying_oracle_responses,
            block,
        )?;

        // Execute the transactions in block order.
        for transaction in block.transaction_refs() {
            block_execution_tracker
                .execute_transaction(transaction, round, chain)
                .await?;
        }

        // For each recipient that has a previous message block, look up that block's hash
        // in the confirmed log (indexed by height).
        let recipients = block_execution_tracker.recipients();
        let mut recipient_heights = Vec::new();
        let mut indices = Vec::new();
        for (recipient, height) in previous_message_blocks_view
            .multi_get_pairs(recipients)
            .await?
        {
            if let Some(height) = height {
                let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
                indices.push(index);
                recipient_heights.push((recipient, height));
            }
        }
        let hashes = confirmed_log.multi_get(indices).await?;
        let mut previous_message_blocks = BTreeMap::new();
        for (hash, (recipient, height)) in hashes.into_iter().zip(recipient_heights) {
            let hash = hash.ok_or_else(|| {
                ChainError::InternalError("missing entry in confirmed_log".into())
            })?;
            previous_message_blocks.insert(recipient, (hash, height));
        }

        // Same lookup for each event stream that has a previous event block.
        let streams = block_execution_tracker.event_streams();
        let mut stream_heights = Vec::new();
        let mut indices = Vec::new();
        for (stream, height) in previous_event_blocks_view.multi_get_pairs(streams).await? {
            if let Some(height) = height {
                let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
                indices.push(index);
                stream_heights.push((stream, height));
            }
        }
        let hashes = confirmed_log.multi_get(indices).await?;
        let mut previous_event_blocks = BTreeMap::new();
        for (hash, (stream, height)) in hashes.into_iter().zip(stream_heights) {
            let hash = hash.ok_or_else(|| {
                ChainError::InternalError("missing entry in confirmed_log".into())
            })?;
            previous_event_blocks.insert(stream, (hash, height));
        }

        // Hash the execution state after all transactions have run.
        let state_hash = {
            #[cfg(with_metrics)]
            let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency();
            chain.crypto_hash_mut().await?
        };

        let (messages, oracle_responses, events, blobs, operation_results) =
            block_execution_tracker.finalize();

        Ok(BlockExecutionOutcome {
            messages,
            previous_message_blocks,
            previous_event_blocks,
            state_hash,
            oracle_responses,
            events,
            blobs,
            operation_results,
        })
    }
888
889 #[instrument(skip_all, fields(
892 chain_id = %self.chain_id(),
893 block_height = %block.height
894 ))]
895 pub async fn execute_block(
896 &mut self,
897 block: &ProposedBlock,
898 local_time: Timestamp,
899 round: Option<u32>,
900 published_blobs: &[Blob],
901 replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
902 ) -> Result<BlockExecutionOutcome, ChainError> {
903 assert_eq!(
904 block.chain_id,
905 self.execution_state.context().extra().chain_id()
906 );
907
908 self.initialize_if_needed(local_time).await?;
909
910 let chain_timestamp = *self.execution_state.system.timestamp.get();
911 ensure!(
912 chain_timestamp <= block.timestamp,
913 ChainError::InvalidBlockTimestamp {
914 parent: chain_timestamp,
915 new: block.timestamp
916 }
917 );
918 ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
919
920 ensure!(
921 block.published_blob_ids()
922 == published_blobs
923 .iter()
924 .map(|blob| blob.id())
925 .collect::<BTreeSet<_>>(),
926 ChainError::InternalError("published_blobs mismatch".to_string())
927 );
928
929 if *self.execution_state.system.closed.get() {
930 ensure!(block.has_only_rejected_messages(), ChainError::ClosedChain);
931 }
932
933 Self::check_app_permissions(
934 self.execution_state.system.application_permissions.get(),
935 block,
936 )?;
937
938 Self::execute_block_inner(
939 &mut self.execution_state,
940 &self.confirmed_log,
941 &self.previous_message_blocks,
942 &self.previous_event_blocks,
943 block,
944 local_time,
945 round,
946 published_blobs,
947 replaying_oracle_responses,
948 )
949 .await
950 }
951
952 #[instrument(skip_all, fields(
956 chain_id = %self.chain_id(),
957 block_height = %block.inner().inner().header.height
958 ))]
959 pub async fn apply_confirmed_block(
960 &mut self,
961 block: &ConfirmedBlock,
962 local_time: Timestamp,
963 ) -> Result<BTreeSet<StreamId>, ChainError> {
964 let hash = block.inner().hash();
965 let block = block.inner().inner();
966 self.execution_state_hash.set(Some(block.header.state_hash));
967 let updated_streams = self.process_emitted_events(block).await?;
968 let recipients = self.process_outgoing_messages(block).await?;
969
970 for recipient in recipients {
971 self.previous_message_blocks
972 .insert(&recipient, block.header.height)?;
973 }
974 for event in block.body.events.iter().flatten() {
975 self.previous_event_blocks
976 .insert(&event.stream_id, block.header.height)?;
977 }
978 self.reset_chain_manager(block.header.height.try_add_one()?, local_time)?;
980
981 let tip = self.tip_state.get_mut();
983 tip.block_hash = Some(hash);
984 tip.next_block_height.try_add_assign_one()?;
985 tip.update_counters(&block.body.transactions, &block.body.messages)?;
986 self.confirmed_log.push(hash);
987 self.preprocessed_blocks.remove(&block.header.height)?;
988 Ok(updated_streams)
989 }
990
991 #[instrument(skip_all, fields(
994 chain_id = %self.chain_id(),
995 block_height = %block.inner().inner().header.height
996 ))]
997 pub async fn preprocess_block(
998 &mut self,
999 block: &ConfirmedBlock,
1000 ) -> Result<BTreeSet<StreamId>, ChainError> {
1001 let hash = block.inner().hash();
1002 let block = block.inner().inner();
1003 let height = block.header.height;
1004 if height < self.tip_state.get().next_block_height {
1005 return Ok(BTreeSet::new());
1006 }
1007 self.process_outgoing_messages(block).await?;
1008 let updated_streams = self.process_emitted_events(block).await?;
1009 self.preprocessed_blocks.insert(&height, hash)?;
1010 Ok(updated_streams)
1011 }
1012
1013 pub fn is_child(&self) -> bool {
1015 let Some(description) = self.execution_state.system.description.get() else {
1016 return true;
1018 };
1019 description.is_child()
1020 }
1021
1022 #[instrument(skip_all, fields(
1024 block_height = %block.height,
1025 num_transactions = %block.transactions.len()
1026 ))]
1027 fn check_app_permissions(
1028 app_permissions: &ApplicationPermissions,
1029 block: &ProposedBlock,
1030 ) -> Result<(), ChainError> {
1031 let mut mandatory = HashSet::<ApplicationId>::from_iter(
1032 app_permissions.mandatory_applications.iter().copied(),
1033 );
1034 for transaction in &block.transactions {
1035 match transaction {
1036 Transaction::ExecuteOperation(operation)
1037 if operation.is_exempt_from_permissions() =>
1038 {
1039 mandatory.clear()
1040 }
1041 Transaction::ExecuteOperation(operation) => {
1042 ensure!(
1043 app_permissions.can_execute_operations(&operation.application_id()),
1044 ChainError::AuthorizedApplications(
1045 app_permissions.execute_operations.clone().unwrap()
1046 )
1047 );
1048 if let Operation::User { application_id, .. } = operation {
1049 mandatory.remove(application_id);
1050 }
1051 }
1052 Transaction::ReceiveMessages(incoming_bundle) => {
1053 for pending in incoming_bundle.messages() {
1054 if let Message::User { application_id, .. } = &pending.message {
1055 mandatory.remove(application_id);
1056 }
1057 }
1058 }
1059 }
1060 }
1061 ensure!(
1062 mandatory.is_empty(),
1063 ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
1064 );
1065 Ok(())
1066 }
1067
1068 #[instrument(skip_all, fields(
1070 chain_id = %self.chain_id(),
1071 next_block_height = %self.tip_state.get().next_block_height,
1072 start_height = ?range.start_bound(),
1073 end_height = ?range.end_bound()
1074 ))]
1075 pub async fn block_hashes(
1076 &self,
1077 range: impl RangeBounds<BlockHeight>,
1078 ) -> Result<Vec<CryptoHash>, ChainError> {
1079 let next_height = self.tip_state.get().next_block_height;
1080 let Some((start, end)) = range.to_inclusive() else {
1082 return Ok(Vec::new());
1083 };
1084 let mut hashes = if let Ok(last_height) = next_height.try_sub_one() {
1086 let usize_start = usize::try_from(start)?;
1087 let usize_end = usize::try_from(end.min(last_height))?;
1088 self.confirmed_log.read(usize_start..=usize_end).await?
1089 } else {
1090 Vec::new()
1091 };
1092 let block_heights = (start.max(next_height).0..=end.0)
1094 .map(BlockHeight)
1095 .collect::<Vec<_>>();
1096 for hash in self
1097 .preprocessed_blocks
1098 .multi_get(&block_heights)
1099 .await?
1100 .into_iter()
1101 .flatten()
1102 {
1103 hashes.push(hash);
1104 }
1105 Ok(hashes)
1106 }
1107
1108 fn reset_chain_manager(
1110 &mut self,
1111 next_height: BlockHeight,
1112 local_time: Timestamp,
1113 ) -> Result<(), ChainError> {
1114 let maybe_committee = self.execution_state.system.current_committee().into_iter();
1115 let ownership = self.execution_state.system.ownership.get().clone();
1116 let fallback_owners =
1117 maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights());
1118 self.pending_validated_blobs.clear();
1119 self.pending_proposed_blobs.clear();
1120 self.manager
1121 .reset(ownership, next_height, local_time, fallback_owners)
1122 }
1123
1124 #[instrument(skip_all, fields(
1128 chain_id = %self.chain_id(),
1129 block_height = %block.header.height
1130 ))]
1131 async fn process_outgoing_messages(
1132 &mut self,
1133 block: &Block,
1134 ) -> Result<Vec<ChainId>, ChainError> {
1135 let recipients = block.recipients();
1138 let block_height = block.header.height;
1139 let next_height = self.tip_state.get().next_block_height;
1140
1141 let outbox_counters = self.outbox_counters.get_mut();
1143 let nonempty_outboxes = self.nonempty_outboxes.get_mut();
1144 let targets = recipients.into_iter().collect::<Vec<_>>();
1145 let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1146 for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1147 if block_height > next_height {
1148 if *outbox.next_height_to_schedule.get() > block_height {
1151 continue; }
1153 let maybe_prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok()
1154 {
1155 Some(height) if height < next_height => {
1158 let index =
1159 usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
1160 Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
1161 ChainError::InternalError("missing entry in confirmed_log".into())
1162 })?)
1163 }
1164 Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
1167 || ChainError::InternalError("missing entry in preprocessed_blocks".into()),
1168 )?),
1169 None => None, };
1171 match (
1173 maybe_prev_hash,
1174 block.body.previous_message_blocks.get(target),
1175 ) {
1176 (None, None) => {
1177 }
1180 (Some(_), None) => {
1181 return Err(ChainError::InternalError(
1184 "block indicates no previous message block,\
1185 but we have one in the outbox"
1186 .into(),
1187 ));
1188 }
1189 (None, Some((_, prev_msg_block_height))) => {
1190 if *prev_msg_block_height >= next_height {
1195 continue;
1196 }
1197 }
1198 (Some(ref prev_hash), Some((prev_msg_block_hash, _))) => {
1199 if prev_hash != prev_msg_block_hash {
1201 continue;
1202 }
1203 }
1204 }
1205 }
1206 if outbox.schedule_message(block_height)? {
1207 *outbox_counters.entry(block_height).or_default() += 1;
1208 nonempty_outboxes.insert(*target);
1209 }
1210 }
1211
1212 #[cfg(with_metrics)]
1213 metrics::NUM_OUTBOXES
1214 .with_label_values(&[])
1215 .observe(self.outboxes.count().await? as f64);
1216 Ok(targets)
1217 }
1218
1219 #[instrument(skip_all, fields(
1223 chain_id = %self.chain_id(),
1224 block_height = %block.header.height
1225 ))]
1226 async fn process_emitted_events(
1227 &mut self,
1228 block: &Block,
1229 ) -> Result<BTreeSet<StreamId>, ChainError> {
1230 let mut emitted_streams = BTreeMap::<StreamId, BTreeSet<u32>>::new();
1231 for event in block.body.events.iter().flatten() {
1232 emitted_streams
1233 .entry(event.stream_id.clone())
1234 .or_default()
1235 .insert(event.index);
1236 }
1237 let mut stream_ids = Vec::new();
1238 let mut list_indices = Vec::new();
1239 for (stream_id, indices) in emitted_streams {
1240 stream_ids.push(stream_id);
1241 list_indices.push(indices);
1242 }
1243
1244 let mut updated_streams = BTreeSet::new();
1245 for ((stream_id, next_index), indices) in self
1246 .next_expected_events
1247 .multi_get_pairs(stream_ids)
1248 .await?
1249 .into_iter()
1250 .zip(list_indices)
1251 {
1252 let initial_index = if stream_id == StreamId::system(EPOCH_STREAM_NAME) {
1253 1
1255 } else {
1256 0
1257 };
1258 let mut current_expected_index = next_index.unwrap_or(initial_index);
1259 for index in indices {
1260 if index == current_expected_index {
1261 updated_streams.insert(stream_id.clone());
1262 current_expected_index = index.saturating_add(1);
1263 }
1264 }
1265 if current_expected_index != 0 {
1266 self.next_expected_events
1267 .insert(&stream_id, current_expected_index)?;
1268 }
1269 }
1270 Ok(updated_streams)
1271 }
1272}
1273
/// Checks that `EMPTY_BLOCK_SIZE` matches the BCS-serialized size of a first block with a
/// default execution outcome.
#[test]
fn empty_block_size() {
    let chain_id = linera_execution::test_utils::dummy_chain_description(0).id();
    let proposed_block = crate::test::make_first_block(chain_id);
    let outcome = crate::data_types::BlockExecutionOutcome::default();
    let block = crate::block::Block::new(proposed_block, outcome);
    let size = bcs::serialized_size(&block).unwrap();
    assert_eq!(size, EMPTY_BLOCK_SIZE);
}