1use std::{
5 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6 sync::Arc,
7};
8
9use allocative::Allocative;
10use linera_base::{
11 crypto::{CryptoHash, ValidatorPublicKey},
12 data_types::{
13 ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Epoch,
14 OracleResponse, Timestamp,
15 },
16 ensure,
17 identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
18 ownership::ChainOwnership,
19};
20use linera_execution::{
21 committee::Committee, system::EPOCH_STREAM_NAME, ExecutionRuntimeContext, ExecutionStateView,
22 Message, Operation, OutgoingMessage, Query, QueryContext, QueryOutcome, ResourceController,
23 ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
24};
25use linera_views::{
26 context::Context,
27 log_view::LogView,
28 map_view::MapView,
29 reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
30 register_view::RegisterView,
31 views::{ClonableView, RootView, View},
32};
33use serde::{Deserialize, Serialize};
34use tracing::instrument;
35
36use crate::{
37 block::{Block, ConfirmedBlock},
38 block_tracker::BlockExecutionTracker,
39 data_types::{
40 BlockExecutionOutcome, ChainAndHeight, IncomingBundle, MessageBundle, ProposedBlock,
41 Transaction,
42 },
43 inbox::{InboxError, InboxStateView},
44 manager::ChainManager,
45 outbox::OutboxStateView,
46 pending_blobs::PendingBlobsView,
47 ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
48};
49
50#[cfg(test)]
51#[path = "unit_tests/chain_tests.rs"]
52mod chain_tests;
53
54#[cfg(with_metrics)]
55use linera_base::prometheus_util::MeasureLatency;
56
57#[cfg(with_metrics)]
58pub(crate) mod metrics {
59 use std::sync::LazyLock;
60
61 use linera_base::prometheus_util::{
62 exponential_bucket_interval, exponential_bucket_latencies, register_histogram_vec,
63 register_int_counter_vec,
64 };
65 use linera_execution::ResourceTracker;
66 use prometheus::{HistogramVec, IntCounterVec};
67
68 pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
69 register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
70 });
71
72 pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
73 register_histogram_vec(
74 "block_execution_latency",
75 "Block execution latency",
76 &[],
77 exponential_bucket_interval(50.0_f64, 10_000_000.0),
78 )
79 });
80
81 #[cfg(with_metrics)]
82 pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
83 register_histogram_vec(
84 "message_execution_latency",
85 "Message execution latency",
86 &[],
87 exponential_bucket_interval(0.1_f64, 50_000.0),
88 )
89 });
90
91 pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
92 register_histogram_vec(
93 "operation_execution_latency",
94 "Operation execution latency",
95 &[],
96 exponential_bucket_interval(0.1_f64, 50_000.0),
97 )
98 });
99
100 pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
101 register_histogram_vec(
102 "wasm_fuel_used_per_block",
103 "Wasm fuel used per block",
104 &[],
105 exponential_bucket_interval(10.0, 1_000_000.0),
106 )
107 });
108
109 pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
110 register_histogram_vec(
111 "evm_fuel_used_per_block",
112 "EVM fuel used per block",
113 &[],
114 exponential_bucket_interval(10.0, 1_000_000.0),
115 )
116 });
117
118 pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
119 register_histogram_vec(
120 "vm_num_reads_per_block",
121 "VM number of reads per block",
122 &[],
123 exponential_bucket_interval(0.1, 100.0),
124 )
125 });
126
127 pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
128 register_histogram_vec(
129 "vm_bytes_read_per_block",
130 "VM number of bytes read per block",
131 &[],
132 exponential_bucket_interval(0.1, 10_000_000.0),
133 )
134 });
135
136 pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
137 register_histogram_vec(
138 "vm_bytes_written_per_block",
139 "VM number of bytes written per block",
140 &[],
141 exponential_bucket_interval(0.1, 10_000_000.0),
142 )
143 });
144
145 pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
146 register_histogram_vec(
147 "state_hash_computation_latency",
148 "Time to recompute the state hash",
149 &[],
150 exponential_bucket_latencies(500.0),
151 )
152 });
153
154 pub static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
155 register_histogram_vec(
156 "num_inboxes",
157 "Number of inboxes",
158 &[],
159 exponential_bucket_interval(1.0, 10_000.0),
160 )
161 });
162
163 pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
164 register_histogram_vec(
165 "num_outboxes",
166 "Number of outboxes",
167 &[],
168 exponential_bucket_interval(1.0, 10_000.0),
169 )
170 });
171
172 pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
174 NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
175 WASM_FUEL_USED_PER_BLOCK
176 .with_label_values(&[])
177 .observe(tracker.wasm_fuel as f64);
178 EVM_FUEL_USED_PER_BLOCK
179 .with_label_values(&[])
180 .observe(tracker.evm_fuel as f64);
181 VM_NUM_READS_PER_BLOCK
182 .with_label_values(&[])
183 .observe(tracker.read_operations as f64);
184 VM_BYTES_READ_PER_BLOCK
185 .with_label_values(&[])
186 .observe(tracker.bytes_read as f64);
187 VM_BYTES_WRITTEN_PER_BLOCK
188 .with_label_values(&[])
189 .observe(tracker.bytes_written as f64);
190 }
191}
192
193pub(crate) const EMPTY_BLOCK_SIZE: usize = 94;
195
196#[cfg_attr(
198 with_graphql,
199 derive(async_graphql::SimpleObject),
200 graphql(cache_control(no_cache))
201)]
202#[derive(Debug, RootView, ClonableView, Allocative)]
203#[allocative(bound = "C")]
204pub struct ChainStateView<C>
205where
206 C: Clone + Context + 'static,
207{
208 pub execution_state: ExecutionStateView<C>,
210 pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,
212
213 pub tip_state: RegisterView<C, ChainTipState>,
215
216 pub manager: ChainManager<C>,
218 pub pending_validated_blobs: PendingBlobsView<C>,
221 pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,
223
224 pub confirmed_log: LogView<C, CryptoHash>,
227 pub received_log: LogView<C, ChainAndHeight>,
229 pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,
231
232 pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
234 pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
236 pub next_expected_events: MapView<C, StreamId, u32>,
239 pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
242 pub nonempty_outboxes: RegisterView<C, BTreeSet<ChainId>>,
244
245 pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,
247}
248
249#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
251#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, Allocative)]
252pub struct ChainTipState {
253 pub block_hash: Option<CryptoHash>,
255 pub next_block_height: BlockHeight,
257 pub num_incoming_bundles: u32,
259 pub num_operations: u32,
261 pub num_outgoing_messages: u32,
263}
264
265impl ChainTipState {
266 pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
269 ensure!(
270 new_block.height == self.next_block_height,
271 ChainError::UnexpectedBlockHeight {
272 expected_block_height: self.next_block_height,
273 found_block_height: new_block.height
274 }
275 );
276 ensure!(
277 new_block.previous_block_hash == self.block_hash,
278 ChainError::UnexpectedPreviousBlockHash
279 );
280 Ok(())
281 }
282
283 pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
286 ensure!(
287 self.next_block_height >= height,
288 ChainError::MissingEarlierBlocks {
289 current_block_height: self.next_block_height,
290 }
291 );
292 Ok(self.next_block_height > height)
293 }
294
295 pub fn update_counters(
297 &mut self,
298 transactions: &[Transaction],
299 messages: &[Vec<OutgoingMessage>],
300 ) -> Result<(), ChainError> {
301 let mut num_incoming_bundles = 0u32;
302 let mut num_operations = 0u32;
303
304 for transaction in transactions {
305 match transaction {
306 Transaction::ReceiveMessages(_) => {
307 num_incoming_bundles = num_incoming_bundles
308 .checked_add(1)
309 .ok_or(ArithmeticError::Overflow)?;
310 }
311 Transaction::ExecuteOperation(_) => {
312 num_operations = num_operations
313 .checked_add(1)
314 .ok_or(ArithmeticError::Overflow)?;
315 }
316 }
317 }
318
319 self.num_incoming_bundles = self
320 .num_incoming_bundles
321 .checked_add(num_incoming_bundles)
322 .ok_or(ArithmeticError::Overflow)?;
323
324 self.num_operations = self
325 .num_operations
326 .checked_add(num_operations)
327 .ok_or(ArithmeticError::Overflow)?;
328
329 let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
330 .map_err(|_| ArithmeticError::Overflow)?;
331 self.num_outgoing_messages = self
332 .num_outgoing_messages
333 .checked_add(num_outgoing_messages)
334 .ok_or(ArithmeticError::Overflow)?;
335
336 Ok(())
337 }
338}
339
340impl<C> ChainStateView<C>
341where
342 C: Context + Clone + 'static,
343 C::Extra: ExecutionRuntimeContext,
344{
345 pub fn chain_id(&self) -> ChainId {
347 self.context().extra().chain_id()
348 }
349
350 #[instrument(skip_all, fields(
351 chain_id = %self.chain_id(),
352 ))]
353 pub async fn query_application(
354 &mut self,
355 local_time: Timestamp,
356 query: Query,
357 service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
358 ) -> Result<QueryOutcome, ChainError> {
359 let context = QueryContext {
360 chain_id: self.chain_id(),
361 next_block_height: self.tip_state.get().next_block_height,
362 local_time,
363 };
364 self.execution_state
365 .query_application(context, query, service_runtime_endpoint)
366 .await
367 .with_execution_context(ChainExecutionContext::Query)
368 }
369
370 #[instrument(skip_all, fields(
371 chain_id = %self.chain_id(),
372 application_id = %application_id
373 ))]
374 pub async fn describe_application(
375 &mut self,
376 application_id: ApplicationId,
377 ) -> Result<ApplicationDescription, ChainError> {
378 self.execution_state
379 .system
380 .describe_application(application_id, &mut TransactionTracker::default())
381 .await
382 .with_execution_context(ChainExecutionContext::DescribeApplication)
383 }
384
385 #[instrument(skip_all, fields(
386 chain_id = %self.chain_id(),
387 target = %target,
388 height = %height
389 ))]
390 pub async fn mark_messages_as_received(
391 &mut self,
392 target: &ChainId,
393 height: BlockHeight,
394 ) -> Result<bool, ChainError> {
395 let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
396 let updates = outbox.mark_messages_as_received(height).await?;
397 if updates.is_empty() {
398 return Ok(false);
399 }
400 for update in updates {
401 let counter = self
402 .outbox_counters
403 .get_mut()
404 .get_mut(&update)
405 .ok_or_else(|| {
406 ChainError::InternalError("message counter should be present".into())
407 })?;
408 *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
409 if *counter == 0 {
410 self.outbox_counters.get_mut().remove(&update);
412 }
413 }
414 if outbox.queue.count() == 0 {
415 self.nonempty_outboxes.get_mut().remove(target);
416 if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
418 self.outboxes.remove_entry(target)?;
419 }
420 }
421 #[cfg(with_metrics)]
422 metrics::NUM_OUTBOXES
423 .with_label_values(&[])
424 .observe(self.outboxes.count().await? as f64);
425 Ok(true)
426 }
427
428 pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
431 tracing::debug!(
432 "Messages left in {:.8}'s outbox: {:?}",
433 self.chain_id(),
434 self.outbox_counters.get()
435 );
436 if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
437 key > &height
438 } else {
439 true
440 }
441 }
442
443 pub fn is_active(&self) -> bool {
445 self.execution_state.system.is_active()
446 }
447
448 pub async fn initialize_if_needed(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
450 let chain_id = self.chain_id();
451 if self
453 .execution_state
454 .system
455 .initialize_chain(chain_id)
456 .await
457 .with_execution_context(ChainExecutionContext::Block)?
458 {
459 return Ok(());
461 }
462 let hash = self.execution_state.crypto_hash_mut().await?;
464 self.execution_state_hash.set(Some(hash));
465 let maybe_committee = self.execution_state.system.current_committee().into_iter();
466 self.manager.reset(
468 self.execution_state.system.ownership.get().clone(),
469 BlockHeight(0),
470 local_time,
471 maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights()),
472 )?;
473 Ok(())
474 }
475
476 pub async fn next_block_height_to_receive(
477 &self,
478 origin: &ChainId,
479 ) -> Result<BlockHeight, ChainError> {
480 let inbox = self.inboxes.try_load_entry(origin).await?;
481 match inbox {
482 Some(inbox) => inbox.next_block_height_to_receive(),
483 None => Ok(BlockHeight::ZERO),
484 }
485 }
486
487 pub async fn next_height_to_preprocess(&self) -> Result<BlockHeight, ChainError> {
491 if let Some(height) = self.preprocessed_blocks.indices().await?.last() {
492 return Ok(height.saturating_add(BlockHeight(1)));
493 }
494 Ok(self.tip_state.get().next_block_height)
495 }
496
497 pub async fn last_anticipated_block_height(
498 &self,
499 origin: &ChainId,
500 ) -> Result<Option<BlockHeight>, ChainError> {
501 let inbox = self.inboxes.try_load_entry(origin).await?;
502 match inbox {
503 Some(inbox) => match inbox.removed_bundles.back().await? {
504 Some(bundle) => Ok(Some(bundle.height)),
505 None => Ok(None),
506 },
507 None => Ok(None),
508 }
509 }
510
511 #[instrument(skip_all, fields(
518 chain_id = %self.chain_id(),
519 origin = %origin,
520 bundle_height = %bundle.height
521 ))]
522 pub async fn receive_message_bundle(
523 &mut self,
524 origin: &ChainId,
525 bundle: MessageBundle,
526 local_time: Timestamp,
527 add_to_received_log: bool,
528 ) -> Result<(), ChainError> {
529 assert!(!bundle.messages.is_empty());
530 let chain_id = self.chain_id();
531 tracing::trace!(
532 "Processing new messages to {chain_id:.8} from {origin} at height {}",
533 bundle.height,
534 );
535 let chain_and_height = ChainAndHeight {
536 chain_id: *origin,
537 height: bundle.height,
538 };
539
540 match self.initialize_if_needed(local_time).await {
541 Ok(_) => (),
542 Err(ChainError::ExecutionError(exec_err, _))
545 if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
546 if blobs.iter().all(|blob_id| {
547 blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
548 })) => {}
549 err => {
550 return err;
551 }
552 }
553
554 let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
556 #[cfg(with_metrics)]
557 metrics::NUM_INBOXES
558 .with_label_values(&[])
559 .observe(self.inboxes.count().await? as f64);
560 inbox
561 .add_bundle(bundle)
562 .await
563 .map_err(|error| match error {
564 InboxError::ViewError(error) => ChainError::ViewError(error),
565 error => ChainError::InternalError(format!(
566 "while processing messages in certified block: {error}"
567 )),
568 })?;
569
570 if add_to_received_log {
572 self.received_log.push(chain_and_height);
573 }
574 Ok(())
575 }
576
577 pub fn update_received_certificate_trackers(
579 &mut self,
580 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
581 ) {
582 for (name, tracker) in new_trackers {
583 self.received_certificate_trackers
584 .get_mut()
585 .entry(name)
586 .and_modify(|t| {
587 if tracker > *t {
590 *t = tracker;
591 }
592 })
593 .or_insert(tracker);
594 }
595 }
596
597 pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> {
598 self.execution_state
599 .system
600 .current_committee()
601 .ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
602 }
603
604 pub fn ownership(&self) -> &ChainOwnership {
605 self.execution_state.system.ownership.get()
606 }
607
608 #[instrument(skip_all, fields(
614 chain_id = %self.chain_id(),
615 ))]
616 pub async fn remove_bundles_from_inboxes(
617 &mut self,
618 timestamp: Timestamp,
619 must_be_present: bool,
620 incoming_bundles: impl IntoIterator<Item = &IncomingBundle>,
621 ) -> Result<(), ChainError> {
622 let chain_id = self.chain_id();
623 let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
624 for IncomingBundle { bundle, origin, .. } in incoming_bundles {
625 ensure!(
626 bundle.timestamp <= timestamp,
627 ChainError::IncorrectBundleTimestamp {
628 chain_id,
629 bundle_timestamp: bundle.timestamp,
630 block_timestamp: timestamp,
631 }
632 );
633 let bundles = bundles_by_origin.entry(*origin).or_default();
634 bundles.push(bundle);
635 }
636 let origins = bundles_by_origin.keys().copied().collect::<Vec<_>>();
637 let inboxes = self.inboxes.try_load_entries_mut(&origins).await?;
638 for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
639 tracing::trace!(
640 "Removing [{}] from {chain_id:.8}'s inbox for {origin:}",
641 bundles
642 .iter()
643 .map(|bundle| bundle.height.to_string())
644 .collect::<Vec<_>>()
645 .join(", ")
646 );
647 for bundle in bundles {
648 let was_present = inbox
650 .remove_bundle(bundle)
651 .await
652 .map_err(|error| (chain_id, origin, error))?;
653 if must_be_present {
654 ensure!(
655 was_present,
656 ChainError::MissingCrossChainUpdate {
657 chain_id,
658 origin,
659 height: bundle.height,
660 }
661 );
662 }
663 }
664 }
665 #[cfg(with_metrics)]
666 metrics::NUM_INBOXES
667 .with_label_values(&[])
668 .observe(self.inboxes.count().await? as f64);
669 Ok(())
670 }
671
672 pub fn nonempty_outbox_chain_ids(&self) -> Vec<ChainId> {
674 self.nonempty_outboxes.get().iter().copied().collect()
675 }
676
677 pub async fn load_outboxes(
679 &self,
680 targets: &[ChainId],
681 ) -> Result<Vec<ReadGuardedView<OutboxStateView<C>>>, ChainError> {
682 let vec_of_options = self.outboxes.try_load_entries(targets).await?;
683 let optional_vec = vec_of_options.into_iter().collect::<Option<Vec<_>>>();
684 optional_vec.ok_or_else(|| ChainError::InternalError("Missing outboxes".into()))
685 }
686
687 #[instrument(skip_all, fields(
690 chain_id = %block.chain_id,
691 block_height = %block.height
692 ))]
693 async fn execute_block_inner(
694 chain: &mut ExecutionStateView<C>,
695 confirmed_log: &LogView<C, CryptoHash>,
696 block: &ProposedBlock,
697 local_time: Timestamp,
698 round: Option<u32>,
699 published_blobs: &[Blob],
700 replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
701 ) -> Result<BlockExecutionOutcome, ChainError> {
702 #[cfg(with_metrics)]
703 let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency_us();
704 chain.system.timestamp.set(block.timestamp);
705
706 let policy = chain
707 .system
708 .current_committee()
709 .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
710 .1
711 .policy()
712 .clone();
713
714 let mut resource_controller = ResourceController::new(
715 Arc::new(policy),
716 ResourceTracker::default(),
717 block.authenticated_owner,
718 );
719
720 for blob in published_blobs {
721 let blob_id = blob.id();
722 resource_controller
723 .policy()
724 .check_blob_size(blob.content())
725 .with_execution_context(ChainExecutionContext::Block)?;
726 chain.system.used_blobs.insert(&blob_id)?;
727 }
728
729 let mut block_execution_tracker = BlockExecutionTracker::new(
732 &mut resource_controller,
733 published_blobs
734 .iter()
735 .map(|blob| (blob.id(), blob))
736 .collect(),
737 local_time,
738 replaying_oracle_responses,
739 block,
740 )?;
741
742 for transaction in block.transaction_refs() {
743 block_execution_tracker
744 .execute_transaction(transaction, round, chain)
745 .await?;
746 }
747
748 let recipients = block_execution_tracker.recipients();
749 let mut recipient_heights = Vec::new();
750 let mut indices = Vec::new();
751 for (recipient, height) in chain
752 .previous_message_blocks
753 .multi_get_pairs(recipients)
754 .await?
755 {
756 chain
757 .previous_message_blocks
758 .insert(&recipient, block.height)?;
759 if let Some(height) = height {
760 let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
761 indices.push(index);
762 recipient_heights.push((recipient, height));
763 }
764 }
765 let hashes = confirmed_log.multi_get(indices).await?;
766 let mut previous_message_blocks = BTreeMap::new();
767 for (hash, (recipient, height)) in hashes.into_iter().zip(recipient_heights) {
768 let hash = hash.ok_or_else(|| {
769 ChainError::InternalError("missing entry in confirmed_log".into())
770 })?;
771 previous_message_blocks.insert(recipient, (hash, height));
772 }
773
774 let streams = block_execution_tracker.event_streams();
775 let mut stream_heights = Vec::new();
776 let mut indices = Vec::new();
777 for (stream, height) in chain.previous_event_blocks.multi_get_pairs(streams).await? {
778 chain.previous_event_blocks.insert(&stream, block.height)?;
779 if let Some(height) = height {
780 let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
781 indices.push(index);
782 stream_heights.push((stream, height));
783 }
784 }
785 let hashes = confirmed_log.multi_get(indices).await?;
786 let mut previous_event_blocks = BTreeMap::new();
787 for (hash, (stream, height)) in hashes.into_iter().zip(stream_heights) {
788 let hash = hash.ok_or_else(|| {
789 ChainError::InternalError("missing entry in confirmed_log".into())
790 })?;
791 previous_event_blocks.insert(stream, (hash, height));
792 }
793
794 let state_hash = {
795 #[cfg(with_metrics)]
796 let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency();
797 chain.crypto_hash_mut().await?
798 };
799
800 let (messages, oracle_responses, events, blobs, operation_results) =
801 block_execution_tracker.finalize();
802
803 Ok(BlockExecutionOutcome {
804 messages,
805 previous_message_blocks,
806 previous_event_blocks,
807 state_hash,
808 oracle_responses,
809 events,
810 blobs,
811 operation_results,
812 })
813 }
814
815 #[instrument(skip_all, fields(
818 chain_id = %self.chain_id(),
819 block_height = %block.height
820 ))]
821 pub async fn execute_block(
822 &mut self,
823 block: &ProposedBlock,
824 local_time: Timestamp,
825 round: Option<u32>,
826 published_blobs: &[Blob],
827 replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
828 ) -> Result<BlockExecutionOutcome, ChainError> {
829 assert_eq!(
830 block.chain_id,
831 self.execution_state.context().extra().chain_id()
832 );
833
834 self.initialize_if_needed(local_time).await?;
835
836 let chain_timestamp = *self.execution_state.system.timestamp.get();
837 ensure!(
838 chain_timestamp <= block.timestamp,
839 ChainError::InvalidBlockTimestamp {
840 parent: chain_timestamp,
841 new: block.timestamp
842 }
843 );
844 ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
845
846 ensure!(
847 block.published_blob_ids()
848 == published_blobs
849 .iter()
850 .map(|blob| blob.id())
851 .collect::<BTreeSet<_>>(),
852 ChainError::InternalError("published_blobs mismatch".to_string())
853 );
854
855 if *self.execution_state.system.closed.get() {
856 ensure!(block.has_only_rejected_messages(), ChainError::ClosedChain);
857 }
858
859 Self::check_app_permissions(
860 self.execution_state.system.application_permissions.get(),
861 block,
862 )?;
863
864 Self::execute_block_inner(
865 &mut self.execution_state,
866 &self.confirmed_log,
867 block,
868 local_time,
869 round,
870 published_blobs,
871 replaying_oracle_responses,
872 )
873 .await
874 }
875
876 #[instrument(skip_all, fields(
880 chain_id = %self.chain_id(),
881 block_height = %block.inner().inner().header.height
882 ))]
883 pub async fn apply_confirmed_block(
884 &mut self,
885 block: &ConfirmedBlock,
886 local_time: Timestamp,
887 ) -> Result<BTreeSet<StreamId>, ChainError> {
888 let hash = block.inner().hash();
889 let block = block.inner().inner();
890 self.execution_state_hash.set(Some(block.header.state_hash));
891 let updated_streams = self.process_emitted_events(block).await?;
892 self.process_outgoing_messages(block).await?;
893
894 self.reset_chain_manager(block.header.height.try_add_one()?, local_time)?;
896
897 let tip = self.tip_state.get_mut();
899 tip.block_hash = Some(hash);
900 tip.next_block_height.try_add_assign_one()?;
901 tip.update_counters(&block.body.transactions, &block.body.messages)?;
902 self.confirmed_log.push(hash);
903 self.preprocessed_blocks.remove(&block.header.height)?;
904 Ok(updated_streams)
905 }
906
907 #[instrument(skip_all, fields(
910 chain_id = %self.chain_id(),
911 block_height = %block.inner().inner().header.height
912 ))]
913 pub async fn preprocess_block(
914 &mut self,
915 block: &ConfirmedBlock,
916 ) -> Result<BTreeSet<StreamId>, ChainError> {
917 let hash = block.inner().hash();
918 let block = block.inner().inner();
919 let height = block.header.height;
920 if height < self.tip_state.get().next_block_height {
921 return Ok(BTreeSet::new());
922 }
923 self.process_outgoing_messages(block).await?;
924 let updated_streams = self.process_emitted_events(block).await?;
925 self.preprocessed_blocks.insert(&height, hash)?;
926 Ok(updated_streams)
927 }
928
929 pub fn is_child(&self) -> bool {
931 let Some(description) = self.execution_state.system.description.get() else {
932 return true;
934 };
935 description.is_child()
936 }
937
938 #[instrument(skip_all, fields(
940 block_height = %block.height,
941 num_transactions = %block.transactions.len()
942 ))]
943 fn check_app_permissions(
944 app_permissions: &ApplicationPermissions,
945 block: &ProposedBlock,
946 ) -> Result<(), ChainError> {
947 let mut mandatory = HashSet::<ApplicationId>::from_iter(
948 app_permissions.mandatory_applications.iter().copied(),
949 );
950 for transaction in &block.transactions {
951 match transaction {
952 Transaction::ExecuteOperation(operation)
953 if operation.is_exempt_from_permissions() =>
954 {
955 mandatory.clear()
956 }
957 Transaction::ExecuteOperation(operation) => {
958 ensure!(
959 app_permissions.can_execute_operations(&operation.application_id()),
960 ChainError::AuthorizedApplications(
961 app_permissions.execute_operations.clone().unwrap()
962 )
963 );
964 if let Operation::User { application_id, .. } = operation {
965 mandatory.remove(application_id);
966 }
967 }
968 Transaction::ReceiveMessages(incoming_bundle) => {
969 for pending in incoming_bundle.messages() {
970 if let Message::User { application_id, .. } = &pending.message {
971 mandatory.remove(application_id);
972 }
973 }
974 }
975 }
976 }
977 ensure!(
978 mandatory.is_empty(),
979 ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
980 );
981 Ok(())
982 }
983
984 #[instrument(skip_all, fields(
989 chain_id = %self.chain_id(),
990 next_block_height = %self.tip_state.get().next_block_height,
991 ))]
992 pub async fn block_hashes(
993 &self,
994 heights: impl IntoIterator<Item = BlockHeight>,
995 ) -> Result<Vec<CryptoHash>, ChainError> {
996 let next_height = self.tip_state.get().next_block_height;
997 let (confirmed_heights, unconfirmed_heights) = heights
999 .into_iter()
1000 .partition::<Vec<_>, _>(|height| *height < next_height);
1001 let confirmed_indices = confirmed_heights
1002 .into_iter()
1003 .map(|height| usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow))
1004 .collect::<Result<_, _>>()?;
1005 let confirmed_hashes = self.confirmed_log.multi_get(confirmed_indices).await?;
1006 let unconfirmed_hashes = self
1008 .preprocessed_blocks
1009 .multi_get(&unconfirmed_heights)
1010 .await?;
1011 Ok(confirmed_hashes
1012 .into_iter()
1013 .chain(unconfirmed_hashes)
1014 .flatten()
1015 .collect())
1016 }
1017
1018 fn reset_chain_manager(
1020 &mut self,
1021 next_height: BlockHeight,
1022 local_time: Timestamp,
1023 ) -> Result<(), ChainError> {
1024 let maybe_committee = self.execution_state.system.current_committee().into_iter();
1025 let ownership = self.execution_state.system.ownership.get().clone();
1026 let fallback_owners =
1027 maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights());
1028 self.pending_validated_blobs.clear();
1029 self.pending_proposed_blobs.clear();
1030 self.manager
1031 .reset(ownership, next_height, local_time, fallback_owners)
1032 }
1033
1034 #[instrument(skip_all, fields(
1038 chain_id = %self.chain_id(),
1039 block_height = %block.header.height
1040 ))]
1041 async fn process_outgoing_messages(
1042 &mut self,
1043 block: &Block,
1044 ) -> Result<Vec<ChainId>, ChainError> {
1045 let recipients = block.recipients();
1048 let block_height = block.header.height;
1049 let next_height = self.tip_state.get().next_block_height;
1050
1051 let outbox_counters = self.outbox_counters.get_mut();
1053 let nonempty_outboxes = self.nonempty_outboxes.get_mut();
1054 let targets = recipients.into_iter().collect::<Vec<_>>();
1055 let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1056 for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1057 if block_height > next_height {
1058 if *outbox.next_height_to_schedule.get() > block_height {
1061 continue; }
1063 let maybe_prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok()
1064 {
1065 Some(height) if height < next_height => {
1068 let index =
1069 usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
1070 Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
1071 ChainError::InternalError("missing entry in confirmed_log".into())
1072 })?)
1073 }
1074 Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
1077 || ChainError::InternalError("missing entry in preprocessed_blocks".into()),
1078 )?),
1079 None => None, };
1081 match (
1083 maybe_prev_hash,
1084 block.body.previous_message_blocks.get(target),
1085 ) {
1086 (None, None) => {
1087 }
1090 (Some(_), None) => {
1091 return Err(ChainError::InternalError(
1094 "block indicates no previous message block,\
1095 but we have one in the outbox"
1096 .into(),
1097 ));
1098 }
1099 (None, Some((_, prev_msg_block_height))) => {
1100 if *prev_msg_block_height >= next_height {
1105 continue;
1106 }
1107 }
1108 (Some(ref prev_hash), Some((prev_msg_block_hash, _))) => {
1109 if prev_hash != prev_msg_block_hash {
1111 continue;
1112 }
1113 }
1114 }
1115 }
1116 if outbox.schedule_message(block_height)? {
1117 *outbox_counters.entry(block_height).or_default() += 1;
1118 nonempty_outboxes.insert(*target);
1119 }
1120 }
1121
1122 #[cfg(with_metrics)]
1123 metrics::NUM_OUTBOXES
1124 .with_label_values(&[])
1125 .observe(self.outboxes.count().await? as f64);
1126 Ok(targets)
1127 }
1128
1129 #[instrument(skip_all, fields(
1133 chain_id = %self.chain_id(),
1134 block_height = %block.header.height
1135 ))]
1136 async fn process_emitted_events(
1137 &mut self,
1138 block: &Block,
1139 ) -> Result<BTreeSet<StreamId>, ChainError> {
1140 let mut emitted_streams = BTreeMap::<StreamId, BTreeSet<u32>>::new();
1141 for event in block.body.events.iter().flatten() {
1142 emitted_streams
1143 .entry(event.stream_id.clone())
1144 .or_default()
1145 .insert(event.index);
1146 }
1147 let mut stream_ids = Vec::new();
1148 let mut list_indices = Vec::new();
1149 for (stream_id, indices) in emitted_streams {
1150 stream_ids.push(stream_id);
1151 list_indices.push(indices);
1152 }
1153
1154 let mut updated_streams = BTreeSet::new();
1155 for ((stream_id, next_index), indices) in self
1156 .next_expected_events
1157 .multi_get_pairs(stream_ids)
1158 .await?
1159 .into_iter()
1160 .zip(list_indices)
1161 {
1162 let initial_index = if stream_id == StreamId::system(EPOCH_STREAM_NAME) {
1163 1
1165 } else {
1166 0
1167 };
1168 let mut current_expected_index = next_index.unwrap_or(initial_index);
1169 for index in indices {
1170 if index == current_expected_index {
1171 updated_streams.insert(stream_id.clone());
1172 current_expected_index = index.saturating_add(1);
1173 }
1174 }
1175 if current_expected_index != 0 {
1176 self.next_expected_events
1177 .insert(&stream_id, current_expected_index)?;
1178 }
1179 }
1180 Ok(updated_streams)
1181 }
1182}
1183
1184#[test]
1185fn empty_block_size() {
1186 let size = bcs::serialized_size(&crate::block::Block::new(
1187 crate::test::make_first_block(
1188 linera_execution::test_utils::dummy_chain_description(0).id(),
1189 ),
1190 crate::data_types::BlockExecutionOutcome::default(),
1191 ))
1192 .unwrap();
1193 assert_eq!(size, EMPTY_BLOCK_SIZE);
1194}