1use std::{
7 borrow::Cow,
8 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
9 sync::{self, Arc},
10};
11
12use futures::future::Either;
13#[cfg(with_metrics)]
14use linera_base::prometheus_util::MeasureLatency as _;
15use linera_base::{
16 crypto::{CryptoHash, ValidatorPublicKey},
17 data_types::{
18 ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, OracleResponse, Round,
19 Timestamp,
20 },
21 ensure,
22 hashed::Hashed,
23 identifiers::{AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, StreamId},
24};
25use linera_cache::{Arc as CacheArc, UniqueValueCache, ValueCache};
26use linera_chain::{
27 data_types::{
28 BlockProposal, BundleExecutionPolicy, IncomingBundle, MessageAction, MessageBundle,
29 OriginalProposal, ProposalContent, ProposedBlock,
30 },
31 manager::{self, ManagerSafetySnapshot},
32 types::{
33 Block, ConfirmedBlock, ConfirmedBlockCertificate, TimeoutCertificate,
34 ValidatedBlockCertificate,
35 },
36 ChainError, ChainExecutionContext, ChainIdSet, ChainStateView, ChainTipState,
37 ExecutionResultExt as _,
38};
39use linera_execution::{
40 system::{EpochEventData, EventSubscriptions, EPOCH_STREAM_NAME},
41 ExecutionRuntimeContext as _, ExecutionStateView, Query, QueryContext, QueryOutcome,
42 ResourceTracker, ServiceRuntimeEndpoint,
43};
44use linera_storage::{Clock as _, Storage};
45use linera_views::{
46 batch::Batch,
47 context::{Context, InactiveContext},
48 store::WritableKeyValueStore as _,
49 views::{ReplaceContext as _, RootView as _, View as _},
50};
51use tokio::sync::oneshot;
52use tracing::{debug, instrument, trace, warn};
53
54use crate::{
55 chain_worker::{handle::AtomicTimestamp, ChainWorkerConfig, DeliveryNotifier},
56 client::{ChainModes, ListeningMode},
57 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
58 worker::{BatchRequest, NetworkActions, Notification, Reason, WorkerError},
59};
60
/// A list of event subscriptions, each paired with a `(ChainId, StreamId)` tuple.
// NOTE(review): presumably the tuple identifies the publishing chain and stream the
// subscription refers to — confirm against the code that produces this value.
61pub(crate) type EventSubscriptionsResult = Vec<((ChainId, StreamId), EventSubscriptions)>;
63
/// Prometheus histograms used by this module; only compiled when `with_metrics` is set.
64#[cfg(with_metrics)]
65mod metrics {
66 use std::sync::LazyLock;
67
68 use linera_base::prometheus_util::{
69 exponential_bucket_interval, exponential_bucket_latencies, register_histogram,
70 register_histogram_vec,
71 };
72 use prometheus::{Histogram, HistogramVec};
73
/// Histogram registered as `create_network_actions_latency`; it is measured around
/// `build_network_actions`. The registered help text states the unit is milliseconds.
// NOTE(review): `10_000.0` is presumably the largest latency bucket — confirm against
// `exponential_bucket_latencies`.
74 pub static CREATE_NETWORK_ACTIONS_LATENCY: LazyLock<Histogram> = LazyLock::new(|| {
75 register_histogram(
76 "create_network_actions_latency",
77 "Time (ms) to create network actions",
78 exponential_bucket_latencies(10_000.0),
79 )
80 });
81
/// Histogram vector registered as `num_inboxes`, with no label names.
// NOTE(review): the bucket arguments `(1.0, 10_000.0)` look like a lower and upper
// bound — confirm against `exponential_bucket_interval`.
82 pub static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
83 register_histogram_vec(
84 "num_inboxes",
85 "Number of inboxes",
86 &[],
87 exponential_bucket_interval(1.0, 10_000.0),
88 )
89 });
90}
91
/// The state of a worker responsible for a single chain: the chain's state view plus
/// the storage client, caches and bookkeeping needed to process requests for it.
92pub(crate) struct ChainWorkerState<StorageClient>
94where
95 StorageClient: Storage,
96{
/// Worker configuration (read in this file for the key pair, the cross-chain chunk
/// limit and the block time grace period).
97 config: ChainWorkerConfig,
/// Storage client used to load the chain and to read and write blobs, certificates
/// and events.
98 storage: StorageClient,
/// The state view of the chain, loaded from `storage` in `load`.
99 chain: ChainStateView<StorageClient::Context>,
/// Optional service runtime endpoint; dropped by `clear_service_runtime`.
100 service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
/// Optional service runtime task; handed back to the caller by `clear_service_runtime`.
101 service_runtime_task: Option<web_thread_pool::Task<()>>,
/// Shared timestamp of the last access; refreshed by `touch`.
106 last_access: Arc<AtomicTimestamp>,
/// Cache of confirmed blocks keyed by hash; consulted before storage in
/// `read_confirmed_blocks`.
111 block_values: Arc<ValueCache<CryptoHash, ConfirmedBlock>>,
/// Optional cache of execution state views keyed by a `CryptoHash`.
// NOTE(review): `execute_contiguous_block` removes the entry for the block hash, so
// the key is presumably the hash of the block that produced the state — confirm.
112 execution_state_cache:
113 Option<Arc<UniqueValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>>,
/// Optional shared listening modes of chains. When `None`, `is_tracked` treats every
/// chain as tracked and `tracked_full_chains` returns `None`.
114 chain_modes: Option<Arc<sync::RwLock<ChainModes>>>,
/// Notifier for message delivery.
// NOTE(review): its use is not visible in this part of the file — confirm semantics.
115 delivery_notifier: DeliveryNotifier,
/// Flag exposed through `knows_chain_is_active`; starts as `false` in `load`.
116 knows_chain_is_active: bool,
/// When `true`, `check_not_poisoned` fails with `WorkerError::PoisonedWorker`.
117 poisoned: bool,
120}
121
/// The result of handling a cross-chain update.
// NOTE(review): no producer or consumer of this enum is visible in this part of the
// file; the variant descriptions below are inferred from names and should be confirmed.
122pub(crate) enum CrossChainUpdateResult {
/// Presumably: the update was applied, with the associated block height.
124 Updated(BlockHeight),
/// Presumably: the update contained nothing new.
126 NothingToDo,
/// Presumably: bundles are missing between what was received and what was sent.
128 GapDetected {
/// The chain the update came from.
132 origin: ChainId,
/// Presumably: the height from which the origin should resend its bundles.
133 retransmit_from: BlockHeight,
134 },
135}
136
/// What a worker did with a certified block it was asked to process.
137pub enum BlockOutcome {
/// Returned by `process_validated_block` once the final vote has been created.
// NOTE(review): the confirmed-block execution path presumably returns this too, but
// that code is not fully visible here.
139 Processed,
/// Returned by `preprocess_certified_block`: the block was preprocessed, not executed.
140 Preprocessed,
/// Returned when nothing needed to be done: the validated block was already validated
/// or the chain manager asked to skip it, or the confirmed block is below the chain
/// tip and not in the pre-checkpoint trust set.
141 Skipped,
142}
143
/// Selects how `process_confirmed_block` handles a confirmed block.
///
/// A "gap" below means the block's height differs from the chain's next expected height.
144#[derive(Clone, Copy, Debug, Eq, PartialEq)]
146pub enum ProcessConfirmedBlockMode {
/// Execute when there is no gap, or when there is a gap but the block starts with a
/// checkpoint (the state is restored first); otherwise only preprocess.
147 Auto,
/// Like `Auto`, except that a gap without a checkpoint is rejected with
/// `WorkerError::InvalidBlockChaining` instead of being preprocessed.
151 Execute,
/// Always preprocess, never execute.
155 Preprocess,
159}
160
161impl<StorageClient> ChainWorkerState<StorageClient>
162where
163 StorageClient: Storage + Clone + 'static,
164{
165 #[instrument(skip_all, fields(
167 chain_id = %chain_id
168 ))]
169 #[expect(clippy::too_many_arguments)]
170 pub(crate) async fn load(
171 config: ChainWorkerConfig,
172 storage: StorageClient,
173 block_values: Arc<ValueCache<CryptoHash, ConfirmedBlock>>,
174 execution_state_cache: Option<
175 Arc<UniqueValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
176 >,
177 chain_modes: Option<Arc<sync::RwLock<ChainModes>>>,
178 delivery_notifier: DeliveryNotifier,
179 chain_id: ChainId,
180 service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
181 service_runtime_task: Option<web_thread_pool::Task<()>>,
182 ) -> Result<Self, WorkerError> {
183 let chain = storage.load_chain(chain_id).await?;
184
185 Ok(ChainWorkerState {
186 config,
187 storage,
188 chain,
189 service_runtime_endpoint,
190 service_runtime_task,
191 last_access: Arc::new(AtomicTimestamp::now()),
192 block_values,
193 execution_state_cache,
194 chain_modes,
195 delivery_notifier,
196 knows_chain_is_active: false,
197 poisoned: false,
198 })
199 }
200
201 fn chain_id(&self) -> ChainId {
203 self.chain.chain_id()
204 }
205
206 pub(crate) fn chain(&self) -> &ChainStateView<StorageClient::Context> {
208 &self.chain
209 }
210
211 async fn committee_for_epoch(
218 &self,
219 epoch: Epoch,
220 ) -> Result<linera_execution::committee::Committee, WorkerError> {
221 let hash = self
222 .chain
223 .execution_state
224 .context()
225 .extra()
226 .get_committee_hashes(epoch..=epoch)
227 .await
228 .map_err(|error| {
229 ChainError::ExecutionError(Box::new(error), ChainExecutionContext::Block)
230 })?
231 .remove(&epoch)
232 .ok_or_else(|| {
233 ChainError::InternalError(format!(
234 "missing committee for epoch {epoch}; this is a bug"
235 ))
236 })?;
237 let committee = self
238 .chain
239 .execution_state
240 .context()
241 .extra()
242 .get_or_load_committee_by_hash(hash)
243 .await
244 .map_err(|error| {
245 ChainError::ExecutionError(Box::new(error), ChainExecutionContext::Block)
246 })?;
247 Ok((*committee).clone())
248 }
249
250 pub(crate) async fn select_message_bundles(
258 &self,
259 origin: &ChainId,
260 next_height_to_receive: BlockHeight,
261 last_anticipated_block_height: Option<BlockHeight>,
262 mut bundles: Vec<(Epoch, MessageBundle)>,
263 ) -> Result<Vec<MessageBundle>, WorkerError> {
264 let recipient = self.chain_id();
265 let mut latest_height = None;
266 let mut skipped_len = 0;
267 let mut trusted_len = 0;
268 for (i, (epoch, bundle)) in bundles.iter().enumerate() {
269 ensure!(
270 latest_height <= Some(bundle.height),
271 WorkerError::InvalidCrossChainRequest
272 );
273 latest_height = Some(bundle.height);
274 if bundle.height < next_height_to_receive {
275 skipped_len = i + 1;
276 }
277 let is_revoked = self
278 .storage
279 .is_epoch_revoked(*epoch)
280 .await
281 .map_err(|error| {
282 WorkerError::ChainError(Box::new(ChainError::ExecutionError(
283 Box::new(error),
284 ChainExecutionContext::Block,
285 )))
286 })?;
287 if !is_revoked || Some(bundle.height) <= last_anticipated_block_height {
288 trusted_len = i + 1;
289 }
290 }
291 if skipped_len > 0 {
292 let (_, sample_bundle) = &bundles[skipped_len - 1];
293 debug!(
294 "Ignoring repeated messages to {recipient:.8} from {origin:} at height {}",
295 sample_bundle.height,
296 );
297 }
298 if skipped_len < bundles.len() && trusted_len < bundles.len() {
299 let (sample_epoch, sample_bundle) = &bundles[trusted_len];
300 warn!(
301 "Refusing messages to {recipient:.8} from {origin:} at height {} \
302 because the epoch {} is not trusted any more",
303 sample_bundle.height, sample_epoch,
304 );
305 }
306 Ok(if skipped_len < trusted_len {
307 bundles
308 .drain(skipped_len..trusted_len)
309 .map(|(_, bundle)| bundle)
310 .collect()
311 } else {
312 vec![]
313 })
314 }
315
316 pub(crate) fn knows_chain_is_active(&self) -> bool {
318 self.knows_chain_is_active
319 }
320
321 pub(crate) fn rollback(&mut self) {
323 self.chain.rollback();
324 }
325
326 pub(crate) fn check_not_poisoned(&self) -> Result<(), WorkerError> {
329 ensure!(!self.poisoned, WorkerError::PoisonedWorker);
330 Ok(())
331 }
332
333 pub(crate) fn touch(&self) {
335 self.last_access.store_now();
336 }
337
338 pub(crate) fn last_access_arc(&self) -> Arc<AtomicTimestamp> {
340 Arc::clone(&self.last_access)
341 }
342
343 pub(crate) fn clear_service_runtime(&mut self) -> Option<web_thread_pool::Task<()>> {
346 self.service_runtime_endpoint.take();
347 self.service_runtime_task.take()
348 }
349
350 pub(crate) async fn cross_chain_network_actions_if_reconciled(
354 &self,
355 ) -> Result<Option<NetworkActions>, WorkerError> {
356 let tracked = self.tracked_full_chains();
357 if !self.chain.outbox_index_is_reconciled(tracked.as_deref()) {
358 return Ok(None);
359 }
360 Ok(Some(
361 self.build_network_actions(None, tracked.as_deref().map(|h| h.inner()))
362 .await?,
363 ))
364 }
365
366 #[instrument(skip_all, fields(chain_id = %self.chain_id()))]
373 pub(crate) async fn reconcile_and_cross_chain_network_actions(
374 &mut self,
375 ) -> Result<NetworkActions, WorkerError> {
376 let tracked = self.tracked_full_chains();
377 self.chain
378 .reconcile_outbox_index(tracked.as_deref())
379 .await?;
380 let actions = self
381 .build_network_actions(None, tracked.as_deref().map(|h| h.inner()))
382 .await?;
383 self.save().await?;
384 Ok(actions)
385 }
386
387 #[tracing::instrument(level = "debug", skip(self))]
389 pub(crate) async fn handle_chain_info_query(
390 &mut self,
391 query: ChainInfoQuery,
392 ) -> Result<ChainInfoResponse, WorkerError> {
393 if let Some((height, round)) = query.request_leader_timeout {
394 self.vote_for_leader_timeout(height, round).await?;
395 }
396 if query.request_fallback {
397 self.vote_for_fallback().await?;
398 }
399 self.prepare_chain_info_response(query).await
400 }
401
402 #[instrument(skip_all, fields(
404 chain_id = %self.chain_id(),
405 blob_id = %blob_id
406 ))]
407 pub(crate) async fn download_pending_blob(
408 &self,
409 blob_id: BlobId,
410 ) -> Result<CacheArc<Blob>, WorkerError> {
411 if let Some(blob) = self.chain.manager.pending_blob(&blob_id).await? {
412 return Ok(self.storage.cache_blob(blob));
413 }
414 self.storage
415 .read_blob(blob_id)
416 .await?
417 .ok_or(WorkerError::BlobsNotFound(vec![blob_id]))
418 }
419
420 #[instrument(skip_all, fields(
423 chain_id = %self.chain_id()
424 ))]
425 async fn get_required_blobs(
426 &self,
427 required_blob_ids: impl IntoIterator<Item = BlobId>,
428 created_blobs: BTreeMap<BlobId, Blob>,
429 ) -> Result<BTreeMap<BlobId, Blob>, WorkerError> {
430 let maybe_blobs = self
431 .maybe_get_required_blobs(required_blob_ids, Some(created_blobs))
432 .await?;
433 let not_found_blob_ids = missing_blob_ids(&maybe_blobs);
434 ensure!(
435 not_found_blob_ids.is_empty(),
436 WorkerError::BlobsNotFound(not_found_blob_ids)
437 );
438 Ok(maybe_blobs
439 .into_iter()
440 .filter_map(|(blob_id, maybe_blob)| Some((blob_id, maybe_blob?)))
441 .collect())
442 }
443
444 #[instrument(skip_all, fields(
446 chain_id = %self.chain_id()
447 ))]
448 async fn maybe_get_required_blobs(
449 &self,
450 blob_ids: impl IntoIterator<Item = BlobId>,
451 mut created_blobs: Option<BTreeMap<BlobId, Blob>>,
452 ) -> Result<BTreeMap<BlobId, Option<Blob>>, WorkerError> {
453 let maybe_blobs = blob_ids.into_iter().collect::<BTreeSet<_>>();
454 let mut maybe_blobs = maybe_blobs
455 .into_iter()
456 .map(|x| (x, None))
457 .collect::<Vec<(BlobId, Option<Blob>)>>();
458
459 if let Some(blob_map) = &mut created_blobs {
460 for (blob_id, value) in &mut maybe_blobs {
461 if let Some(blob) = blob_map.remove(blob_id) {
462 *value = Some(blob);
463 }
464 }
465 }
466
467 let (missing_indices, missing_blob_ids) = missing_indices_blob_ids(&maybe_blobs);
468 let second_block_blobs = self.chain.manager.pending_blobs(&missing_blob_ids).await?;
469 for (index, blob) in missing_indices.into_iter().zip(second_block_blobs) {
470 maybe_blobs[index].1 = blob;
471 }
472
473 let (missing_indices, missing_blob_ids) = missing_indices_blob_ids(&maybe_blobs);
474 let third_block_blobs = self
475 .chain
476 .pending_validated_blobs
477 .multi_get(&missing_blob_ids)
478 .await?;
479 for (index, blob) in missing_indices.into_iter().zip(third_block_blobs) {
480 maybe_blobs[index].1 = blob;
481 }
482
483 let (missing_indices, missing_blob_ids) = missing_indices_blob_ids(&maybe_blobs);
484 if !missing_indices.is_empty() {
485 let all_entries_pending_blobs = self
486 .chain
487 .pending_proposed_blobs
488 .try_load_all_entries()
489 .await?;
490 for (index, blob_id) in missing_indices.into_iter().zip(missing_blob_ids) {
491 for (_, pending_blobs) in &all_entries_pending_blobs {
492 if let Some(blob) = pending_blobs.get(&blob_id).await? {
493 maybe_blobs[index].1 = Some(blob);
494 break;
495 }
496 }
497 }
498 }
499
500 let (missing_indices, missing_blob_ids) = missing_indices_blob_ids(&maybe_blobs);
501 let fourth_block_blobs = self.storage.read_blobs(&missing_blob_ids).await?;
502 for (index, blob) in missing_indices.into_iter().zip(fourth_block_blobs) {
503 maybe_blobs[index].1 = blob.map(CacheArc::unwrap_or_clone);
504 }
505 Ok(maybe_blobs.into_iter().collect())
506 }
507
508 #[instrument(skip_all, fields(
510 chain_id = %self.chain_id()
511 ))]
512 async fn create_cross_chain_actions_for_recipient(
513 &self,
514 recipient: ChainId,
515 ) -> Result<NetworkActions, WorkerError> {
516 let outbox = self.chain.outboxes.try_load_entry(&recipient).await?;
517 let Some(outbox) = outbox else {
518 return Ok(NetworkActions::default());
519 };
520 let heights = outbox.queue.elements().await?;
521 if heights.is_empty() {
522 return Ok(NetworkActions::default());
523 }
524 let heights_by_recipient = BTreeMap::from([(recipient, heights)]);
525 let cross_chain_requests = self
526 .create_cross_chain_requests(heights_by_recipient)
527 .await?;
528 Ok(NetworkActions {
529 cross_chain_requests,
530 notifications: Vec::new(),
531 })
532 }
533
534 fn tracked_full_chains(&self) -> Option<Arc<Hashed<ChainIdSet>>> {
537 let chain_modes = self.chain_modes.as_ref()?;
538 let full = chain_modes
539 .read()
540 .expect("Panics should not happen while holding a lock to `chain_modes`")
541 .full();
542 Some(full)
543 }
544
545 fn is_tracked(&self, chain_id: &ChainId) -> bool {
548 self.chain_modes.as_ref().is_none_or(|chain_modes| {
549 chain_modes
550 .read()
551 .expect("Panics should not happen while holding a lock to `chain_modes`")
552 .get(chain_id)
553 .is_some_and(ListeningMode::is_full)
554 })
555 }
556
557 async fn reconcile_tracked_outboxes(
560 &mut self,
561 ) -> Result<Option<Arc<Hashed<ChainIdSet>>>, WorkerError> {
562 let full_chains = self.tracked_full_chains();
563 self.chain
564 .reconcile_outbox_index(full_chains.as_deref())
565 .await?;
566 Ok(full_chains)
567 }
568
569 async fn create_network_actions(
572 &mut self,
573 old_round: Option<Round>,
574 ) -> Result<NetworkActions, WorkerError> {
575 let tracked = self.reconcile_tracked_outboxes().await?;
578 self.build_network_actions(old_round, tracked.as_deref().map(|h| h.inner()))
579 .await
580 }
581
582 async fn build_network_actions(
584 &self,
585 old_round: Option<Round>,
586 tracked: Option<&ChainIdSet>,
587 ) -> Result<NetworkActions, WorkerError> {
588 #[cfg(with_metrics)]
589 let _latency = metrics::CREATE_NETWORK_ACTIONS_LATENCY.measure_latency();
590 let mut heights_by_recipient = BTreeMap::<_, Vec<_>>::new();
591 let targets = self.chain.nonempty_outbox_chain_ids();
592 if let Some(tracked) = tracked {
593 if let Some(target) = targets.iter().find(|target| !tracked.contains(*target)) {
594 return Err(ChainError::CorruptedChainState(format!(
595 "outbox index contains untracked target {target}"
596 ))
597 .into());
598 }
599 }
600 let outboxes = self.chain.load_outboxes(&targets).await?;
601 for (target, outbox) in targets.into_iter().zip(outboxes) {
602 let heights = outbox.queue.elements().await?;
603 heights_by_recipient.insert(target, heights);
604 }
605 let cross_chain_requests = self
606 .create_cross_chain_requests(heights_by_recipient)
607 .await?;
608 let mut notifications = Vec::new();
609 if let Some(old_round) = old_round {
610 let round = self.chain.manager.current_round();
611 if round > old_round {
612 let height = self.chain.tip_state.get().next_block_height;
613 notifications.push(Notification {
614 chain_id: self.chain_id(),
615 reason: Reason::NewRound { height, round },
616 });
617 }
618 }
619 Ok(NetworkActions {
620 cross_chain_requests,
621 notifications,
622 })
623 }
624
625 async fn read_confirmed_blocks(
628 &self,
629 hashes: &[CryptoHash],
630 ) -> Result<Vec<Option<CacheArc<ConfirmedBlock>>>, WorkerError> {
631 let mut blocks = Vec::with_capacity(hashes.len());
632 let mut uncached_indices = Vec::new();
633 let mut uncached_hashes = Vec::new();
634
635 for (i, hash) in hashes.iter().enumerate() {
636 if let Some(block) = self.block_values.get(hash) {
637 blocks.push(Some(block));
638 } else {
639 blocks.push(None);
640 uncached_indices.push(i);
641 uncached_hashes.push(*hash);
642 }
643 }
644
645 if !uncached_hashes.is_empty() {
646 let from_storage = self.storage.read_confirmed_blocks(uncached_hashes).await?;
647 for (i, maybe_block) in uncached_indices.into_iter().zip(from_storage) {
648 blocks[i] = maybe_block;
649 }
650 }
651
652 Ok(blocks)
653 }
654
655 #[instrument(skip_all, fields(
656 chain_id = %self.chain_id(),
657 num_recipients = %heights_by_recipient.len()
658 ))]
659 async fn create_cross_chain_requests(
660 &self,
661 heights_by_recipient: BTreeMap<ChainId, Vec<BlockHeight>>,
662 ) -> Result<Vec<CrossChainRequest>, WorkerError> {
663 let heights = heights_by_recipient
665 .values()
666 .flatten()
667 .copied()
668 .collect::<BTreeSet<_>>();
669 let hashes = self
670 .chain
671 .block_hashes_for_heights(heights.iter().copied())
672 .await?;
673
674 let blocks = self.read_confirmed_blocks(&hashes).await?;
675
676 let mut height_to_blocks = HashMap::new();
677 for (block, hash) in blocks.into_iter().zip(hashes) {
678 let block = block.ok_or_else(|| WorkerError::ReadCertificatesError(vec![hash]))?;
679 height_to_blocks.insert(block.height(), block);
680 }
681
682 let sender = self.chain.chain_id();
683 let mut cross_chain_requests = Vec::new();
684 for (recipient, heights) in heights_by_recipient {
685 let previous_height = heights.first().and_then(|first_height| {
689 let block = height_to_blocks.get(first_height)?;
690 let (_, prev_height) =
691 block.block().body.previous_message_blocks.get(&recipient)?;
692 Some(*prev_height)
693 });
694 let mut bundles = Vec::new();
695 let mut bundles_size = 0;
696 for height in heights {
697 let Some(confirmed_block) = height_to_blocks.get(&height) else {
698 tracing::warn!(
699 %height,
700 %recipient,
701 "spurious entry in outbox; skipping this and higher sender blocks"
702 );
703 break;
704 };
705 let new_bundles = confirmed_block
706 .block()
707 .message_bundles_for(recipient, confirmed_block.inner().hash())
708 .collect::<Vec<_>>();
709 let new_size = new_bundles
710 .iter()
711 .map(|(_epoch, bundle)| bundle.estimated_size())
712 .sum::<usize>();
713 if bundles_size + new_size > self.config.cross_chain_message_chunk_limit {
716 if bundles.is_empty() {
717 warn!(
718 "Single block at height {height} produces an UpdateRecipient \
719 of ~{new_size} bytes, exceeding the chunk limit of {}",
720 self.config.cross_chain_message_chunk_limit
721 );
722 } else {
723 debug!(
724 "Stopping cross-chain batch for {recipient} at height {height}: \
725 adding ~{new_size} bytes would exceed chunk limit of {} \
726 (current batch ~{bundles_size} bytes)",
727 self.config.cross_chain_message_chunk_limit
728 );
729 break;
730 }
731 }
732 bundles.extend(new_bundles);
733 bundles_size += new_size;
734 }
735 if !bundles.is_empty() {
736 cross_chain_requests.push(CrossChainRequest::UpdateRecipient {
737 sender,
738 recipient,
739 bundles,
740 previous_height,
741 });
742 }
743 }
744 Ok(cross_chain_requests)
745 }
746
747 #[instrument(skip_all, fields(
749 chain_id = %self.chain_id(),
750 height = %certificate.inner().height()
751 ))]
752 pub(crate) async fn process_timeout(
753 &mut self,
754 certificate: TimeoutCertificate,
755 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
756 self.initialize_and_save_if_needed().await?;
759 let (chain_epoch, committee) = self.chain.current_committee().await?;
760 certificate.check(&committee)?;
761 if self
762 .chain
763 .tip_state
764 .get()
765 .already_validated_block(certificate.inner().height())?
766 {
767 return Ok((self.chain_info_response().await?, NetworkActions::default()));
768 }
769 ensure!(
770 certificate.inner().epoch() == chain_epoch,
771 WorkerError::InvalidEpoch {
772 chain_id: certificate.inner().chain_id(),
773 chain_epoch,
774 epoch: certificate.inner().epoch()
775 }
776 );
777 let old_round = self.chain.manager.current_round();
778 self.chain
779 .manager
780 .handle_timeout_certificate(certificate, self.storage.clock().current_time());
781 self.save().await?;
782 let actions = self.create_network_actions(Some(old_round)).await?;
783 Ok((self.chain_info_response().await?, actions))
784 }
785
786 #[instrument(skip_all, fields(
791 chain_id = %self.chain_id(),
792 block_height = %proposal.content.block.height
793 ))]
794 async fn load_proposal_blobs(
795 &mut self,
796 proposal: &BlockProposal,
797 ) -> Result<Vec<Blob>, WorkerError> {
798 let owner = proposal.owner();
799 let BlockProposal {
800 content:
801 ProposalContent {
802 block,
803 round,
804 outcome: _,
805 },
806 original_proposal,
807 signature: _,
808 } = proposal;
809
810 let mut maybe_blobs = self
811 .maybe_get_required_blobs(proposal.required_blob_ids(), None)
812 .await?;
813 let missing_blob_ids = missing_blob_ids(&maybe_blobs);
814 if !missing_blob_ids.is_empty() {
815 let chain = &mut self.chain;
816 if chain.ownership().await?.open_multi_leader_rounds {
817 chain.pending_proposed_blobs.clear();
819 }
820 let validated = matches!(original_proposal, Some(OriginalProposal::Regular { .. }));
821 chain
822 .pending_proposed_blobs
823 .try_load_entry_mut(&owner)
824 .await?
825 .update(*round, validated, maybe_blobs)?;
826 self.save().await?;
827 return Err(WorkerError::BlobsNotFound(missing_blob_ids));
828 }
829 let published_blobs = block
830 .published_blob_ids()
831 .iter()
832 .filter_map(|blob_id| maybe_blobs.remove(blob_id).flatten())
833 .collect::<Vec<_>>();
834 Ok(published_blobs)
835 }
836
837 #[instrument(skip_all, fields(
839 chain_id = %self.chain_id(),
840 block_height = %certificate.block().header.height
841 ))]
842 pub(crate) async fn process_validated_block(
843 &mut self,
844 certificate: ValidatedBlockCertificate,
845 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
846 let block = certificate.block();
847
848 let header = &block.header;
849 let height = header.height;
850 self.initialize_and_save_if_needed().await?;
853 let tip_state = self.chain.tip_state.get();
854 ensure!(
855 header.height == tip_state.next_block_height,
856 ChainError::UnexpectedBlockHeight {
857 expected_block_height: tip_state.next_block_height,
858 found_block_height: header.height,
859 }
860 );
861 let (epoch, committee) = self.chain.current_committee().await?;
862 check_block_epoch(epoch, header.chain_id, header.epoch)?;
863 certificate.check(&committee)?;
864 let already_committed_block = self.chain.tip_state.get().already_validated_block(height)?;
865 let should_skip_validated_block = || {
866 self.chain
867 .manager
868 .check_validated_block(&certificate)
869 .map(|outcome| outcome == manager::Outcome::Skip)
870 };
871 if already_committed_block || should_skip_validated_block()? {
872 return Ok((
874 self.chain_info_response().await?,
875 NetworkActions::default(),
876 BlockOutcome::Skipped,
877 ));
878 }
879
880 self.block_values
881 .insert_hashed(Cow::Borrowed(certificate.inner().inner()));
882 let required_blob_ids = block.required_blob_ids();
883 let maybe_blobs = self
884 .maybe_get_required_blobs(required_blob_ids, Some(block.created_blobs()))
885 .await?;
886 let missing_blob_ids = missing_blob_ids(&maybe_blobs);
887 if !missing_blob_ids.is_empty() {
888 self.chain
889 .pending_validated_blobs
890 .update(certificate.round, true, maybe_blobs)?;
891 self.save().await?;
892 return Err(WorkerError::BlobsNotFound(missing_blob_ids));
893 }
894 let blobs = maybe_blobs
895 .into_iter()
896 .filter_map(|(blob_id, maybe_blob)| Some((blob_id, maybe_blob?)))
897 .collect();
898 let old_round = self.chain.manager.current_round();
899 self.chain.manager.create_final_vote(
900 certificate,
901 self.config.key_pair(),
902 self.storage.clock().current_time(),
903 blobs,
904 )?;
905 self.save().await?;
906 let actions = self.create_network_actions(Some(old_round)).await?;
907 Ok((
908 self.chain_info_response().await?,
909 actions,
910 BlockOutcome::Processed,
911 ))
912 }
913
914 #[instrument(skip_all, fields(
916 chain_id = %certificate.block().header.chain_id,
917 height = %certificate.block().header.height,
918 block_hash = %certificate.hash(),
919 ))]
920 pub(crate) async fn process_confirmed_block(
921 &mut self,
922 certificate: ConfirmedBlockCertificate,
923 mode: ProcessConfirmedBlockMode,
924 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
925 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
926 let block = certificate.block();
927 let block_hash = certificate.hash();
928 let height = block.header.height;
929 let chain_id = block.header.chain_id;
930
931 let in_trust_set = self
939 .chain
940 .pre_checkpoint_block_trust
941 .contains(&block_hash)
942 .await?;
943 if in_trust_set {
944 self.chain.pre_checkpoint_block_trust.remove(&block_hash)?;
945 }
946
947 let tip = self.chain.tip_state.get().clone();
949 if !in_trust_set && tip.next_block_height > height {
950 let actions = self.create_network_actions(None).await?;
951 self.register_delivery_notifier(height, &actions, notify_when_messages_are_delivered)
952 .await;
953 return Ok((
954 self.chain_info_response().await?,
955 actions,
956 BlockOutcome::Skipped,
957 ));
958 }
959
960 let committee = self.committee_for_epoch(block.header.epoch).await?;
962 certificate.check(&committee)?;
963
964 let required_blob_ids = block.required_blob_ids();
968 let blobs_result = self
969 .get_required_blobs(required_blob_ids.iter().copied(), block.created_blobs())
970 .await
971 .map(|blobs| blobs.into_values().collect::<Vec<_>>());
972
973 if let Ok(blobs) = &blobs_result {
974 self.storage
975 .write_blobs_and_certificate(blobs, &certificate)
976 .await?;
977 let events = block
978 .body
979 .events
980 .iter()
981 .flatten()
982 .map(|event| (event.id(chain_id), event.value.clone()));
983 self.storage.write_events(events).await?;
984 }
985
986 let blob_state = certificate.value().to_blob_state(blobs_result.is_ok());
988 let blob_ids = required_blob_ids.into_iter().collect::<Vec<_>>();
989 self.storage
990 .maybe_write_blob_states(&blob_ids, blob_state)
991 .await?;
992
993 let blobs = blobs_result?
994 .into_iter()
995 .map(|blob| (blob.id(), blob))
996 .collect::<BTreeMap<_, _>>();
997
998 use ProcessConfirmedBlockMode::{Auto, Execute, Preprocess};
1006 let gap = tip.next_block_height != height;
1007 let starts_with_checkpoint = block.starts_with_checkpoint();
1008 match (mode, gap, starts_with_checkpoint) {
1009 (Preprocess, _, _) | (Auto, true, false) => {
1010 self.preprocess_certified_block(certificate, notify_when_messages_are_delivered)
1011 .await
1012 }
1013 (Execute, true, false) => Err(WorkerError::InvalidBlockChaining),
1014 (Auto | Execute, true, true) => {
1015 self.execute_block_with_checkpoint_restore(
1016 certificate,
1017 blobs,
1018 notify_when_messages_are_delivered,
1019 )
1020 .await
1021 }
1022 (Auto | Execute, false, _) => {
1023 self.execute_contiguous_block(
1024 certificate,
1025 blobs,
1026 tip,
1027 notify_when_messages_are_delivered,
1028 )
1029 .await
1030 }
1031 }
1032 }
1033
1034 async fn preprocess_certified_block(
1037 &mut self,
1038 certificate: ConfirmedBlockCertificate,
1039 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1040 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
1041 let block_hash = certificate.hash();
1042 let block = certificate.block();
1043 let chain_id = block.header.chain_id;
1044 let height = block.header.height;
1045
1046 let tracked = self.reconcile_tracked_outboxes().await?;
1047 let updated_event_streams = self
1048 .chain
1049 .preprocess_block(certificate.value(), tracked.as_deref().map(|h| h.inner()))
1050 .await?;
1051 self.save().await?;
1052 let mut actions = self.create_network_actions(None).await?;
1053 if !updated_event_streams.is_empty() {
1054 actions.notifications.push(Notification {
1055 chain_id,
1056 reason: Reason::NewEvents {
1057 height,
1058 block_hash,
1059 event_streams: updated_event_streams,
1060 },
1061 });
1062 }
1063 trace!("Preprocessed confirmed block {height}");
1064 self.register_delivery_notifier(height, &actions, notify_when_messages_are_delivered)
1065 .await;
1066 Ok((
1067 self.chain_info_response().await?,
1068 actions,
1069 BlockOutcome::Preprocessed,
1070 ))
1071 }
1072
/// Restores the chain state from the checkpoint carried by `certificate`'s block and
/// then executes the block as a contiguous block on top of the restored state.
///
/// `blobs` must contain the checkpoint's execution state blobs.
///
/// # Errors
///
/// * `ChainError::InternalError` if the block's first oracle response is not a
///   `OracleResponse::Checkpoint`, or if the number of outbox block hashes differs from
///   the number of unfinalized heights in the restored state.
/// * `WorkerError::BlobsNotFound` if a checkpoint execution state blob is not in `blobs`.
/// * `WorkerError::BlocksNotFound` if storage lacks the certificate of a block listed in
///   the checkpoint's outbox block hashes.
1073 async fn execute_block_with_checkpoint_restore(
1078 &mut self,
1079 certificate: ConfirmedBlockCertificate,
1080 blobs: BTreeMap<BlobId, Blob>,
1081 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1082 ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
// Extract everything needed from the block up front, as owned values, so that the
// borrow of `certificate` ends before it is moved into `execute_contiguous_block`.
1083 let (bytes, chain_id, height, previous_block_hash, outbox_block_hashes, inbox_cursors) = {
1084 let block = certificate.block();
// Only the first response of the first response list is inspected.
1085 let Some(OracleResponse::Checkpoint {
1086 execution_state_blobs,
1087 outbox_block_hashes,
1088 inbox_cursors,
1089 ..
1090 }) = block.body.oracle_responses.first().and_then(|r| r.first())
1091 else {
1092 return Err(ChainError::InternalError(
1093 "Checkpoint block missing OracleResponse::Checkpoint".into(),
1094 )
1095 .into());
1096 };
// Concatenate the checkpoint blobs' bytes in the order the checkpoint lists them,
// collecting all missing blob IDs so that they are reported together.
1097 let mut bytes = Vec::new();
1098 let mut missing = Vec::new();
1099 for hash in execution_state_blobs {
1100 let blob_id = BlobId::new(*hash, BlobType::CheckpointExecutionState);
1101 match blobs.get(&blob_id) {
1102 Some(blob) => bytes.extend_from_slice(blob.bytes()),
1103 None => missing.push(blob_id),
1104 }
1105 }
1106 ensure!(missing.is_empty(), WorkerError::BlobsNotFound(missing));
1107 (
1108 bytes,
1109 block.header.chain_id,
1110 block.header.height,
1111 block.header.previous_block_hash,
1112 outbox_block_hashes.clone(),
1113 inbox_cursors.clone(),
1114 )
1115 };
// Every block referenced by the checkpoint's outboxes must have its certificate in
// storage before the state is touched.
1116 let mut missing_blocks = Vec::new();
1124 for hash in &outbox_block_hashes {
1125 if !self.storage.contains_certificate(*hash).await? {
1126 missing_blocks.push(*hash);
1127 }
1128 }
// Record the missing blocks in the pre-checkpoint trust set and persist that before
// failing. `process_confirmed_block` consults this set when such a block arrives.
1129 if !missing_blocks.is_empty() {
1130 for hash in &missing_blocks {
1131 self.chain.pre_checkpoint_block_trust.insert(hash)?;
1132 }
1133 self.save().await?;
1134 return Err(WorkerError::BlocksNotFound(missing_blocks));
1135 }
// Restore the execution state from the checkpoint bytes, then reload the chain view
// from storage.
// NOTE(review): the reload implies `restore_from_content` writes to storage directly
// rather than staging changes in the view — confirm.
1136 self.chain
1137 .execution_state
1138 .restore_from_content(&bytes)
1139 .await?;
1140 self.chain = self.storage.load_chain(chain_id).await?;
// The unfinalized heights of the restored state are paired positionally with the
// checkpoint's outbox block hashes, so the two lists must have the same length.
// NOTE(review): this assumes both lists are in the same (height) order — confirm.
1143 let heights = self.chain.collect_unfinalized_heights().await?;
1151 ensure!(
1152 heights.len() == outbox_block_hashes.len(),
1153 ChainError::InternalError(format!(
1154 "checkpoint oracle response has {} outbox block hashes but the \
1155 restored state references {} distinct heights",
1156 outbox_block_hashes.len(),
1157 heights.len(),
1158 ))
1159 );
1160 for (height, hash) in heights.into_iter().zip(outbox_block_hashes) {
1161 self.chain.block_hashes.insert(&height, hash)?;
1162 }
// Rebuild the outboxes from the unfinalized data, limited to the tracked chains if
// this worker has a tracked set.
1163 let tracked = self.tracked_full_chains();
1170 self.chain
1171 .restore_outboxes_from_unfinalized(tracked.as_deref())
1172 .await?;
// Restore each inbox to the cursor recorded in the checkpoint.
1173 for (origin, cursor) in inbox_cursors {
1174 let mut inbox = self.chain.inboxes.try_load_entry_mut(&origin).await?;
1175 inbox.restore_from_checkpoint(cursor).await?;
1176 }
// Point the tip just below the checkpoint block so that the block chains onto it;
// all other tip fields take their default values.
1177 let new_tip = ChainTipState {
1188 block_hash: previous_block_hash,
1189 next_block_height: height,
1190 ..Default::default()
1191 };
1192 self.chain.tip_state.set(new_tip.clone());
// Persist the restored state before executing the block on top of it.
1193 self.save().await?;
1194 self.execute_contiguous_block(
1195 certificate,
1196 blobs,
1197 new_tip,
1198 notify_when_messages_are_delivered,
1199 )
1200 .await
1201 }
1202
/// Processes a confirmed block that directly extends the given chain tip.
///
/// The block's execution state is either taken from the execution state cache (when an entry
/// for the block hash is present) or obtained by re-executing the block and comparing the
/// result with the certified outcome. The block is then applied to the chain, the chain is
/// saved, and the resulting notifications and network actions are returned.
///
/// # Errors
///
/// Returns `WorkerError::InvalidBlockChaining` if `tip.block_hash` differs from the block's
/// `previous_block_hash`, and `ChainError::CorruptedChainState` if re-execution produces an
/// outcome that differs from the one in the certificate. Errors from the epoch check, inbox
/// update, execution, block application and save steps are propagated.
async fn execute_contiguous_block(
    &mut self,
    certificate: ConfirmedBlockCertificate,
    mut blobs: BTreeMap<BlobId, Blob>,
    tip: ChainTipState,
    notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
    let block_hash = certificate.hash();
    let block = certificate.block();
    let chain_id = block.header.chain_id;
    let height = block.header.height;

    // The block must chain onto the tip that the caller handed in.
    ensure!(
        tip.block_hash == block.header.previous_block_hash,
        WorkerError::InvalidBlockChaining
    );

    self.initialize_and_save_if_needed().await?;
    // The block must have been created in the chain's current epoch.
    let (epoch, _) = self.chain.current_committee().await?;
    check_block_epoch(epoch, chain_id, block.header.epoch)?;

    // Take the blobs published by this block out of `blobs`; IDs that are not present in the
    // map are silently skipped by `filter_map`.
    let published_blobs = block
        .published_blob_ids()
        .iter()
        .filter_map(|blob_id| blobs.remove(blob_id))
        .collect::<Vec<_>>();

    let local_time = self.storage.clock().current_time();
    // A timestamp too far in the future is only logged here; the block is still processed.
    if block.header.timestamp.duration_since(local_time) > self.config.block_time_grace_period {
        warn!(
            block_timestamp = %block.header.timestamp,
            %local_time,
            "Confirmed block has a timestamp in the future beyond the block time grace period"
        );
    }
    let tracked = self.reconcile_tracked_outboxes().await?;
    let chain = &mut self.chain;
    chain
        .remove_bundles_from_inboxes(
            block.header.timestamp,
            false,
            block.body.incoming_bundles(),
        )
        .await?;
    // Fast path: an execution state cached under this block hash replaces the chain's
    // execution state, so the block does not need to be executed again.
    let confirmed_block = if let Some(mut execution_state) = self
        .execution_state_cache
        .as_ref()
        .and_then(|cache| cache.remove(&block_hash))
    {
        // Rebind the cached state to a clone of the chain's execution-state context that
        // uses the cached state's base key.
        chain.execution_state = execution_state
            .with_context(|ctx| {
                chain
                    .execution_state
                    .context()
                    .clone_with_base_key(ctx.base_key().bytes.clone())
            })
            .await;
        certificate.into_value()
    } else {
        // Slow path: re-execute the proposed block, replaying the certified oracle
        // responses, and check that the outcome matches the certificate.
        let (proposed_block, outcome) = certificate.into_value().into_block().into_proposal();
        let oracle_responses = Some(outcome.oracle_responses.clone());
        let (proposed_block, verified, _resource_tracker, _) = chain
            .execute_block(
                proposed_block,
                local_time,
                None,
                &published_blobs,
                oracle_responses,
                BundleExecutionPolicy::committed(),
            )
            .await?;
        if outcome != verified {
            return Err(ChainError::CorruptedChainState(format!(
                "computed block outcome differs from the certificate.\n\
                 Computed: {verified:#?}\n\
                 Submitted: {outcome:#?}"
            ))
            .into());
        }
        ConfirmedBlock::new(Block::new(proposed_block, verified))
    };

    let updated_streams = chain
        .apply_confirmed_block(
            &confirmed_block,
            local_time,
            tracked.as_deref().map(|h| h.inner()),
        )
        .await?;
    let mut actions = self.create_network_actions(None).await?;
    trace!("Processed confirmed block {height}");
    actions.notifications.push(Notification {
        chain_id,
        reason: Reason::NewBlock {
            height,
            hash: block_hash,
        },
    });
    // Only announce new events if the block actually updated some stream.
    if !updated_streams.is_empty() {
        actions.notifications.push(Notification {
            chain_id,
            reason: Reason::NewEvents {
                height,
                block_hash,
                event_streams: updated_streams,
            },
        });
    }
    self.save().await?;

    // The block value is cached only after the chain has been saved successfully.
    self.block_values
        .insert_hashed(Cow::Owned(confirmed_block.into_inner()));

    self.register_delivery_notifier(height, &actions, notify_when_messages_are_delivered)
        .await;

    Ok((
        self.chain_info_response().await?,
        actions,
        BlockOutcome::Processed,
    ))
}
1331
1332 #[instrument(level = "trace", skip(self, notify_when_messages_are_delivered))]
1335 async fn register_delivery_notifier(
1336 &self,
1337 height: BlockHeight,
1338 actions: &NetworkActions,
1339 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1340 ) {
1341 if let Some(notifier) = notify_when_messages_are_delivered {
1342 if actions
1343 .cross_chain_requests
1344 .iter()
1345 .any(|request| request.has_messages_lower_or_equal_than(height))
1346 {
1347 self.delivery_notifier.register(height, notifier);
1348 } else {
1349 if let Err(()) = notifier.send(()) {
1352 debug!("Failed to notify message delivery to caller (early case)");
1353 }
1354 }
1355 }
1356 }
1357
/// Adds the message bundles sent by `origin` to this chain's inbox for that origin.
///
/// `sender_previous_height`, when given, is the height the sender declares as its previous
/// one; it is used to detect a gap between what the sender has sent and what this inbox has
/// received so far.
///
/// Returns:
/// * `CrossChainUpdateResult::GapDetected` if a gap is found and recovery is enabled for
///   this chain in the configuration;
/// * `CrossChainUpdateResult::NothingToDo` if no bundle is selected, or if the chain is
///   inactive and inactive chains are not allowed;
/// * `CrossChainUpdateResult::Updated` with the height of the last selected bundle
///   otherwise.
///
/// # Errors
///
/// Returns `ChainError::InboxGapDetected` if a gap is found and recovery is not enabled, and
/// propagates errors from loading the inbox, selecting bundles and receiving them.
///
/// This method does not save the chain.
#[instrument(level = "debug", skip(self, bundles), fields(chain_id = %self.chain_id()))]
pub(crate) async fn process_cross_chain_update(
    &mut self,
    origin: ChainId,
    bundles: Vec<(Epoch, MessageBundle)>,
    sender_previous_height: Option<BlockHeight>,
) -> Result<CrossChainUpdateResult, WorkerError> {
    let mut inbox = self.chain.inboxes.try_load_entry_mut(&origin).await?;
    let next_height_to_receive = inbox.next_block_height_to_receive()?;
    // Height of the last entry in the inbox's `removed_bundles` queue, if there is one.
    let last_anticipated_block_height = inbox
        .removed_bundles
        .back()
        .await?
        .map(|bundle| bundle.height);

    if let Some(prev) = sender_previous_height {
        // The sender's declared previous height is at or beyond the next height this inbox
        // expects, so at least one height has not been received.
        if prev >= next_height_to_receive {
            let chain_id = self.chain_id();
            if self.config.allow_revert_confirm && self.config.recovery_allowed_for(&chain_id) {
                warn!(
                    %chain_id,
                    "Inbox gap detected from {origin}: \
                     sender declares previous height {prev} but we only have up to \
                     {next_height_to_receive}; requesting resend",
                );
                return Ok(CrossChainUpdateResult::GapDetected {
                    origin,
                    retransmit_from: next_height_to_receive,
                });
            }
            return Err(ChainError::InboxGapDetected {
                chain_id,
                origin,
                expected_height: prev,
                // Height of the first bundle in the update, or the default height if the
                // update is empty.
                actual_height: bundles.first().map(|(_, b)| b.height).unwrap_or_default(),
            }
            .into());
        }
    }

    let bundles = self
        .select_message_bundles(
            &origin,
            next_height_to_receive,
            last_anticipated_block_height,
            bundles,
        )
        .await?;
    let Some(last_updated_height) = bundles.last().map(|bundle| bundle.height) else {
        return Ok(CrossChainUpdateResult::NothingToDo);
    };
    let local_time = self.storage.clock().current_time();
    let mut previous_height = None;
    for bundle in bundles {
        // Only the first of consecutive bundles with the same height is flagged for the
        // received log.
        let add_to_received_log = previous_height != Some(bundle.height);
        previous_height = Some(bundle.height);
        self.chain
            .receive_message_bundle_with_inbox(
                &mut inbox,
                &origin,
                bundle,
                local_time,
                add_to_received_log,
            )
            .await?;
    }
    inbox.observe_size_metric();
    drop(inbox);
    // This check runs after the bundles were received in memory; reporting `NothingToDo`
    // keeps `process_batch` from treating this update as a reason to save.
    if !self.config.allow_inactive_chains && !self.chain.is_active().await? {
        warn!(
            chain_id = %self.chain_id(),
            "Refusing to deliver messages from {origin} \
             at height {last_updated_height} because the recipient is still inactive",
        );
        return Ok(CrossChainUpdateResult::NothingToDo);
    }
    Ok(CrossChainUpdateResult::Updated(last_updated_height))
}
1445
1446 #[instrument(skip_all, fields(
1448 chain_id = %self.chain_id(),
1449 %recipient,
1450 %latest_height
1451 ))]
1452 pub(crate) async fn confirm_updated_recipient(
1453 &mut self,
1454 recipient: ChainId,
1455 latest_height: BlockHeight,
1456 ) -> Result<bool, WorkerError> {
1457 let tracked = self.reconcile_tracked_outboxes().await?;
1460 Ok(self
1463 .chain
1464 .mark_messages_as_received(
1465 &recipient,
1466 latest_height,
1467 tracked.as_deref().map(|h| h.inner()),
1468 )
1469 .await?
1470 && self.chain.all_messages_delivered_up_to(latest_height))
1471 }
1472
1473 pub(crate) fn notify_delivery(&self, height: BlockHeight) {
1475 self.delivery_notifier.notify(height);
1476 }
1477
/// Processes a batch of cross-chain update and confirmation requests with a single save.
///
/// Requests are handled in order. Results of successful requests are held back until the
/// whole batch has been processed and, if needed, saved. As soon as one request fails, it
/// receives its own error and every other request in the batch — both the ones already
/// processed and the ones still to come — is answered with `WorkerError::BatchRolledBack`.
/// A failure to save is logged and also answers every held-back request with
/// `WorkerError::BatchRolledBack`.
///
/// NOTE(review): this method does not itself undo the in-memory changes made by the requests
/// that succeeded before a failure; presumably the caller discards or rolls back the chain
/// view — confirm.
pub(crate) async fn process_batch(&mut self, requests: Vec<BatchRequest>) {
    // Successful results, held back until the batch outcome is known.
    let mut update_results = Vec::new();
    let mut confirm_results = Vec::new();
    let mut need_save = false;
    let mut need_rollback = false;
    // Highest confirmed height for which all messages were reported as delivered.
    let mut max_delivered_height: Option<BlockHeight> = None;

    for request in requests {
        match request {
            BatchRequest::Update {
                origin,
                bundles,
                previous_height,
                result_sender,
            } => {
                // After a failure, remaining requests are rejected without being processed.
                if need_rollback {
                    send_result(result_sender, Err(WorkerError::BatchRolledBack));
                    continue;
                }
                let result = self
                    .process_cross_chain_update(origin, bundles, previous_height)
                    .await;
                let update_result = match result {
                    Ok(update_result) => update_result,
                    Err(error) => {
                        need_rollback = true;
                        send_result(result_sender, Err(error));
                        continue;
                    }
                };
                // Only an actual inbox update requires the chain to be saved.
                match &update_result {
                    CrossChainUpdateResult::Updated(_) => need_save = true,
                    CrossChainUpdateResult::GapDetected { .. }
                    | CrossChainUpdateResult::NothingToDo => {}
                }
                update_results.push((result_sender, update_result));
            }
            BatchRequest::Confirm {
                recipient,
                latest_height,
                result_sender,
            } => {
                if need_rollback {
                    send_result(result_sender, Err(WorkerError::BatchRolledBack));
                    continue;
                }
                match self
                    .confirm_updated_recipient(recipient, latest_height)
                    .await
                {
                    Ok(fully_delivered) => {
                        // A successful confirmation always triggers a save.
                        need_save = true;
                        if fully_delivered {
                            max_delivered_height = Some(
                                max_delivered_height
                                    .map_or(latest_height, |h| h.max(latest_height)),
                            );
                        }
                        confirm_results.push((result_sender, recipient));
                    }
                    Err(error) => {
                        need_rollback = true;
                        send_result(result_sender, Err(error));
                    }
                }
            }
        }
    }
    if !need_rollback && need_save {
        if let Err(error) = self.save().await {
            tracing::error!(%error, "failed to save batch; rolling back");
            need_rollback = true;
        }
    }
    if need_rollback {
        // Requests that had succeeded individually are reported as rolled back.
        for (result_sender, _) in update_results {
            send_result(result_sender, Err(WorkerError::BatchRolledBack));
        }
        for (result_sender, _) in confirm_results {
            send_result(result_sender, Err(WorkerError::BatchRolledBack));
        }
        return;
    }

    // Delivery is announced only once the batch has been committed.
    if let Some(height) = max_delivered_height {
        self.notify_delivery(height);
    }

    for (result_sender, update_result) in update_results {
        send_result(result_sender, Ok(update_result));
    }
    // Confirmations are answered with the cross-chain actions for their recipient, computed
    // after the save.
    for (result_sender, recipient) in confirm_results {
        let result = self
            .create_cross_chain_actions_for_recipient(recipient)
            .await;
        send_result(result_sender, result);
    }
}
1580
/// Re-adds block heights to the outbox for `recipient`, so that messages from height
/// `retransmit_from` onwards are sent again.
///
/// The heights are found by starting at the latest height recorded for `recipient` in the
/// execution state's `previous_message_blocks` and following each block's own
/// `previous_message_blocks` entry for the recipient backwards, for as long as the heights
/// stay at or above `retransmit_from`.
///
/// Returns default (empty) network actions if there is no record of sending to `recipient`,
/// or if the outbox's `revert` reports no new heights. Otherwise returns the cross-chain
/// actions for the recipient; the chain is saved after those actions are created.
///
/// # Errors
///
/// Returns `WorkerError::BlockHashNotFound` if the hash lookup for a visited height does not
/// yield exactly one hash, and `WorkerError::LocalBlockNotFound` if the block for that hash
/// cannot be read.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    %recipient,
    %retransmit_from,
))]
pub(crate) async fn handle_revert_confirm(
    &mut self,
    recipient: ChainId,
    retransmit_from: BlockHeight,
) -> Result<NetworkActions, WorkerError> {
    self.reconcile_tracked_outboxes().await?;
    let Some(latest_height) = self
        .chain
        .execution_state
        .previous_message_blocks
        .get(&recipient)
        .await?
    else {
        warn!("RevertConfirm: no record of sending to {recipient}");
        return Ok(NetworkActions::default());
    };

    // Collect the heights from newest to oldest by following the per-recipient links.
    let mut heights_to_re_add = Vec::new();
    let mut current_height = latest_height;
    while current_height >= retransmit_from {
        heights_to_re_add.push(current_height);
        // Exactly one hash is expected for the single requested height.
        let hash = match &*self
            .chain
            .block_hashes_for_heights([current_height])
            .await?
        {
            [hash] => *hash,
            _ => {
                return Err(WorkerError::BlockHashNotFound {
                    height: current_height,
                    chain_id: self.chain_id(),
                })
            }
        };
        let block = self
            .read_confirmed_blocks(&[hash])
            .await?
            .pop()
            .flatten()
            .ok_or_else(|| WorkerError::LocalBlockNotFound {
                height: current_height,
                chain_id: self.chain_id(),
            })?;
        // Continue with the previous block that has an entry for this recipient, unless it
        // is below the requested starting height.
        match block.block().body.previous_message_blocks.get(&recipient) {
            Some((_, prev_height)) if *prev_height >= retransmit_from => {
                current_height = *prev_height;
            }
            _ => break,
        }
    }

    let new_heights = self
        .chain
        .outboxes
        .try_load_entry_mut(&recipient)
        .await?
        .revert(&heights_to_re_add)
        .await?;

    if new_heights.is_empty() {
        debug!("RevertConfirm: all heights already in outbox for {recipient}");
        return Ok(NetworkActions::default());
    }

    let new_heights_len = new_heights.len();
    // For a tracked recipient, also bump the per-height outbox counters and record the
    // recipient in `nonempty_outboxes`.
    if self.is_tracked(&recipient) {
        for h in new_heights {
            *self.chain.outbox_counters.get_mut().entry(h).or_default() += 1;
        }
        self.chain.nonempty_outboxes.get_mut().insert(recipient);
    }

    let actions = self
        .create_cross_chain_actions_for_recipient(recipient)
        .await?;

    self.save().await?;

    warn!(
        "RevertConfirm: re-added {new_heights_len} heights to outbox for {recipient}, \
         starting from height {retransmit_from}"
    );

    Ok(actions)
}
1686
1687 pub(crate) async fn maybe_reset_corrupted_chain_state(
1691 &mut self,
1692 ) -> Result<Option<Vec<CrossChainRequest>>, WorkerError> {
1693 let Some(min_duration) = self.config.reset_on_corrupted_chain_state else {
1694 return Ok(None);
1695 };
1696 let chain_id = self.chain_id();
1697 if !self.config.recovery_allowed_for(&chain_id) {
1698 return Ok(None);
1699 }
1700 let local_time = self.storage.clock().current_time();
1701 let block_zero_time = *self.chain.block_zero_executed_at.get();
1702 let elapsed = local_time.duration_since(block_zero_time);
1703 if elapsed < min_duration {
1704 warn!(
1705 %chain_id, ?elapsed, ?min_duration,
1706 "Not resetting corrupted chain state; not enough time elapsed \
1707 since last block 0 execution"
1708 );
1709 return Ok(None);
1710 }
1711 warn!(%chain_id, "Corrupted chain state detected; resetting and re-executing");
1712 Ok(Some(self.reset_and_reexecute_chain().await?))
1713 }
1714
1715 #[instrument(skip_all, fields(
1719 chain_id = %self.chain_id(),
1720 ))]
1721 pub(crate) async fn reset_and_reexecute_chain(
1722 &mut self,
1723 ) -> Result<Vec<CrossChainRequest>, WorkerError> {
1724 let chain_id = self.chain_id();
1725 let tip_height = self.chain.tip_state.get().next_block_height;
1726
1727 let sender_ids = self.chain.inboxes.indices().await?;
1729 let block_hashes = self.chain.block_hashes.index_values().await?;
1730
1731 let manager_snapshot = ManagerSafetySnapshot::capture(&self.chain.manager).await?;
1734
1735 self.wipe_and_reload_chain().await?;
1744 self.knows_chain_is_active = false;
1745 warn!(
1746 %chain_id,
1747 "Cleared chain state up to height {tip_height}; \
1748 re-executing all blocks"
1749 );
1750
1751 for (height, hash) in block_hashes {
1753 let cert = self
1754 .storage
1755 .read_certificate(hash)
1756 .await?
1757 .map(CacheArc::unwrap_or_clone)
1758 .ok_or_else(|| WorkerError::LocalBlockNotFound { height, chain_id })?;
1759 Box::pin(self.process_confirmed_block(cert, ProcessConfirmedBlockMode::Execute, None))
1760 .await?;
1761 }
1762
1763 let new_tip_height = self.chain.tip_state.get().next_block_height;
1771 if new_tip_height == tip_height {
1772 manager_snapshot.restore(&mut self.chain.manager)?;
1773 self.save().await?;
1774 } else {
1775 warn!(
1776 %tip_height, %new_tip_height,
1777 "Dropping manager snapshot: pre-reset tip differs from post-reset tip"
1778 );
1779 }
1780
1781 let revert_requests = sender_ids
1784 .into_iter()
1785 .map(|sender| CrossChainRequest::RevertConfirm {
1786 sender,
1787 recipient: chain_id,
1788 retransmit_from: BlockHeight::ZERO,
1789 })
1790 .collect::<Vec<_>>();
1791
1792 warn!(
1793 tip_height = %self.chain.tip_state.get().next_block_height,
1794 num_revert_confirms = revert_requests.len(),
1795 "Chain reset and re-executed; sending RevertConfirm to senders"
1796 );
1797
1798 Ok(revert_requests)
1799 }
1800
1801 #[instrument(skip_all, fields(
1802 chain_id = %self.chain_id(),
1803 num_trackers = %new_trackers.len()
1804 ))]
1805 pub(crate) async fn update_received_certificate_trackers(
1806 &mut self,
1807 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1808 ) -> Result<(), WorkerError> {
1809 self.chain
1810 .update_received_certificate_trackers(new_trackers);
1811 self.save().await?;
1812 Ok(())
1813 }
1814
1815 #[instrument(skip_all, fields(
1817 chain_id = %self.chain_id(),
1818 start = %start,
1819 end = %end
1820 ))]
1821 pub(crate) async fn get_preprocessed_block_hashes(
1822 &self,
1823 start: BlockHeight,
1824 end: BlockHeight,
1825 ) -> Result<Vec<CryptoHash>, WorkerError> {
1826 let mut hashes = Vec::new();
1827 let mut height = start;
1828 while height < end {
1829 match self.chain.block_hashes.get(&height).await? {
1830 Some(hash) => hashes.push(hash),
1831 None => break,
1832 }
1833 height = height.try_add_one()?;
1834 }
1835 Ok(hashes)
1836 }
1837
1838 #[instrument(skip_all, fields(
1840 chain_id = %self.chain_id(),
1841 origin = %origin
1842 ))]
1843 pub(crate) async fn get_inbox_next_height(
1844 &self,
1845 origin: ChainId,
1846 ) -> Result<BlockHeight, WorkerError> {
1847 Ok(match self.chain.inboxes.try_load_entry(&origin).await? {
1848 Some(inbox) => inbox.next_block_height_to_receive()?,
1849 None => BlockHeight::ZERO,
1850 })
1851 }
1852
1853 #[instrument(skip_all, fields(
1856 chain_id = %self.chain_id(),
1857 num_blob_ids = %blob_ids.len()
1858 ))]
1859 pub(crate) async fn get_locking_blobs(
1860 &self,
1861 blob_ids: Vec<BlobId>,
1862 ) -> Result<Option<Vec<Blob>>, WorkerError> {
1863 let results = self
1864 .chain
1865 .manager
1866 .locking_blobs
1867 .multi_get(&blob_ids)
1868 .await?;
1869 Ok(results.into_iter().collect())
1870 }
1871
1872 pub(crate) async fn get_block_hashes(
1874 &self,
1875 heights: Vec<BlockHeight>,
1876 ) -> Result<Vec<CryptoHash>, WorkerError> {
1877 Ok(self.chain.block_hashes_for_heights(heights).await?)
1878 }
1879
1880 pub(crate) async fn get_proposed_blobs(
1882 &self,
1883 blob_ids: Vec<BlobId>,
1884 ) -> Result<Vec<Blob>, WorkerError> {
1885 let results = self
1886 .chain
1887 .manager
1888 .proposed_blobs
1889 .multi_get(&blob_ids)
1890 .await?;
1891 let mut blobs = Vec::with_capacity(blob_ids.len());
1892 let mut missing = Vec::new();
1893 for (blob_id, maybe_blob) in blob_ids.into_iter().zip(results) {
1894 match maybe_blob {
1895 Some(blob) => blobs.push(blob),
1896 None => missing.push(blob_id),
1897 }
1898 }
1899 if !missing.is_empty() {
1900 return Err(WorkerError::BlobsNotFound(missing));
1901 }
1902 Ok(blobs)
1903 }
1904
1905 pub(crate) async fn get_event_subscriptions(
1907 &self,
1908 ) -> Result<EventSubscriptionsResult, WorkerError> {
1909 Ok(self
1910 .chain
1911 .execution_state
1912 .system
1913 .event_subscriptions
1914 .index_values()
1915 .await?)
1916 }
1917
1918 pub(crate) async fn get_next_expected_event(
1920 &self,
1921 stream_id: StreamId,
1922 ) -> Result<Option<u32>, WorkerError> {
1923 Ok(self.chain.next_expected_events.get(&stream_id).await?)
1924 }
1925
1926 pub(crate) async fn get_next_expected_events(
1928 &self,
1929 stream_ids: Vec<StreamId>,
1930 ) -> Result<BTreeMap<StreamId, u32>, WorkerError> {
1931 let values = self
1932 .chain
1933 .next_expected_events
1934 .multi_get(&stream_ids)
1935 .await?;
1936 Ok(stream_ids
1937 .into_iter()
1938 .zip(values)
1939 .filter_map(|(id, val)| Some((id, val?)))
1940 .collect())
1941 }
1942
1943 pub(crate) async fn get_received_certificate_trackers(
1945 &self,
1946 ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1947 Ok(self.chain.received_certificate_trackers.get().clone())
1948 }
1949
1950 pub(crate) async fn get_tip_state_and_outbox_info(
1952 &self,
1953 receiver_id: ChainId,
1954 ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1955 let next_block_height = self.chain.tip_state.get().next_block_height;
1956 let next_height_to_schedule = self
1957 .chain
1958 .outboxes
1959 .try_load_entry(&receiver_id)
1960 .await?
1961 .map(|outbox| *outbox.next_height_to_schedule.get());
1962 Ok((next_block_height, next_height_to_schedule))
1963 }
1964
1965 pub(crate) fn get_next_height_to_preprocess(&self) -> BlockHeight {
1967 *self.chain.next_height_to_preprocess.get()
1968 }
1969
1970 #[instrument(skip_all, fields(
1972 chain_id = %self.chain_id(),
1973 height = %height,
1974 round = %round
1975 ))]
1976 async fn vote_for_leader_timeout(
1977 &mut self,
1978 height: BlockHeight,
1979 round: Round,
1980 ) -> Result<(), WorkerError> {
1981 let chain = &mut self.chain;
1982 ensure!(
1983 height == chain.tip_state.get().next_block_height,
1984 WorkerError::UnexpectedBlockHeight {
1985 expected_block_height: chain.tip_state.get().next_block_height,
1986 found_block_height: height
1987 }
1988 );
1989 let epoch = chain.execution_state.system.epoch.get();
1990 let chain_id = chain.chain_id();
1991 let key_pair = self.config.key_pair();
1992 let local_time = self.storage.clock().current_time();
1993 if chain
1994 .manager
1995 .create_timeout_vote(chain_id, height, round, *epoch, key_pair, local_time)?
1996 {
1997 self.save().await?;
1998 }
1999 Ok(())
2000 }
2001
2002 #[instrument(skip_all, fields(
2007 chain_id = %self.chain_id()
2008 ))]
2009 async fn vote_for_fallback(&mut self) -> Result<(), WorkerError> {
2010 let chain = &mut self.chain;
2011 let epoch = *chain.execution_state.system.epoch.get();
2012 let Some(admin_chain_id) = chain.execution_state.system.admin_chain_id.get() else {
2013 return Ok(());
2014 };
2015
2016 let next_epoch_index = epoch.0.saturating_add(1);
2018 let event_id = EventId {
2019 chain_id: *admin_chain_id,
2020 stream_id: StreamId::system(EPOCH_STREAM_NAME),
2021 index: next_epoch_index,
2022 };
2023
2024 let Some(event_bytes) = self.storage.read_event(event_id).await? else {
2025 return Ok(()); };
2027
2028 let event_data: EpochEventData = bcs::from_bytes(&event_bytes)?;
2029 let elapsed = self
2030 .storage
2031 .clock()
2032 .current_time()
2033 .delta_since(event_data.timestamp);
2034 if elapsed >= chain.ownership().await?.timeout_config.fallback_duration {
2035 let chain_id = chain.chain_id();
2036 let height = chain.tip_state.get().next_block_height;
2037 let key_pair = self.config.key_pair();
2038 if chain
2039 .manager
2040 .vote_fallback(chain_id, height, epoch, key_pair)
2041 {
2042 self.save().await?;
2043 }
2044 }
2045 Ok(())
2046 }
2047
2048 #[instrument(skip_all, fields(
2049 chain_id = %self.chain_id(),
2050 blob_id = %blob.id()
2051 ))]
2052 pub(crate) async fn handle_pending_blob(
2053 &mut self,
2054 blob: Blob,
2055 ) -> Result<ChainInfoResponse, WorkerError> {
2056 let mut was_expected = self
2057 .chain
2058 .pending_validated_blobs
2059 .maybe_insert(&blob)
2060 .await?;
2061 for (_, mut pending_blobs) in self
2062 .chain
2063 .pending_proposed_blobs
2064 .try_load_all_entries_mut()
2065 .await?
2066 {
2067 if !pending_blobs.validated.get() {
2068 let (_, committee) = self.chain.current_committee().await?;
2069 let policy = committee.policy();
2070 policy
2071 .check_blob_size(blob.content())
2072 .with_execution_context(ChainExecutionContext::Block)?;
2073 ensure!(
2074 u64::try_from(pending_blobs.pending_blobs.iterative_count().await?)
2075 .is_ok_and(|count| count < policy.maximum_published_blobs),
2076 WorkerError::TooManyPublishedBlobs(policy.maximum_published_blobs)
2077 );
2078 }
2079 was_expected = was_expected || pending_blobs.maybe_insert(&blob).await?;
2080 }
2081 ensure!(was_expected, WorkerError::UnexpectedBlob);
2082 self.save().await?;
2083 self.chain_info_response().await
2084 }
2085
/// Reads the confirmed block certificate recorded for `height`.
///
/// Returns `Ok(None)` when no block hash is recorded for `height`.
///
/// # Errors
///
/// Returns `WorkerError::BlocksNotFound` if a hash is recorded but storage has no
/// certificate for it.
#[cfg(with_testing)]
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    height = %height
))]
pub(crate) async fn read_certificate(
    &self,
    height: BlockHeight,
) -> Result<Option<CacheArc<ConfirmedBlockCertificate>>, WorkerError> {
    let Some(certificate_hash) = self.chain.block_hashes.get(&height).await? else {
        return Ok(None);
    };
    let stored = self.storage.read_certificate(certificate_hash).await?;
    match stored {
        Some(certificate) => Ok(Some(certificate)),
        None => Err(WorkerError::BlocksNotFound(vec![certificate_hash])),
    }
}
2110
/// Runs an application query and returns its outcome together with a block height.
///
/// If `block_hash` is given and the execution state cache holds a state for it, the query
/// runs against that cached state and the returned height is the chain's next block height
/// plus one. Otherwise the query runs against the chain's own state and the returned height
/// is the chain's next block height.
///
/// # Panics
///
/// Panics if incrementing the next block height overflows on the cached-state path.
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    query_application_id = %query.application_id()
))]
pub(crate) async fn query_application(
    &mut self,
    query: Query,
    block_hash: Option<CryptoHash>,
) -> Result<(QueryOutcome, BlockHeight), WorkerError> {
    self.initialize_and_save_if_needed().await?;
    let next_block_height = self.chain.tip_state.get().next_block_height;
    let local_time = self.storage.clock().current_time();
    // `Some((hash, state))` only if a hash was requested, a cache is configured, and the
    // cache had an entry for that hash. The entry is removed from the cache here.
    let cached_state = block_hash
        .zip(self.execution_state_cache.as_ref())
        .and_then(|(h, cache)| Some(h).zip(cache.remove(&h)));
    if let Some((requested_block, mut state)) = cached_state {
        let next_block_height = next_block_height
            .try_add_one()
            .expect("block height to not overflow");
        let context = QueryContext {
            chain_id: self.chain_id(),
            next_block_height,
            local_time,
        };
        // Rebind the cached state to a clone of the chain's execution-state context that
        // uses the cached state's base key, then run the query on it.
        let outcome = state
            .with_context(|ctx| {
                self.chain
                    .execution_state
                    .context()
                    .clone_with_base_key(ctx.base_key().bytes.clone())
            })
            .await
            .query_application(context, query, self.service_runtime_endpoint.as_mut())
            .await
            .with_execution_context(ChainExecutionContext::Query)?;
        // Put the state back into the cache. NOTE: if the query above fails, the early
        // return skips this step and the removed state is not re-inserted.
        if let Some(cache) = &self.execution_state_cache {
            cache.insert(&requested_block, state);
        }
        Ok((outcome, next_block_height))
    } else {
        if block_hash.is_some() {
            tracing::debug!(
                "requested block hash not found in cache, querying committed state"
            );
        }
        let outcome = self
            .chain
            .query_application(local_time, query, self.service_runtime_endpoint.as_mut())
            .await?;
        Ok((outcome, next_block_height))
    }
}
2166
2167 #[instrument(skip_all, fields(
2173 chain_id = %self.chain_id(),
2174 application_id = %application_id
2175 ))]
2176 pub(crate) async fn describe_application_readonly(
2177 &self,
2178 application_id: ApplicationId,
2179 ) -> Result<ApplicationDescription, WorkerError> {
2180 let blob_id = application_id.description_blob_id();
2181 let blob = self
2182 .storage
2183 .read_blob(blob_id)
2184 .await?
2185 .ok_or(WorkerError::BlobsNotFound(vec![blob_id]))?;
2186 Ok(bcs::from_bytes(blob.bytes())?)
2187 }
2188
2189 #[instrument(skip_all, fields(
2195 chain_id = %self.chain_id(),
2196 block_height = %block.height
2197 ))]
2198 pub(crate) async fn stage_block_execution(
2199 &mut self,
2200 block: ProposedBlock,
2201 round: Option<u32>,
2202 published_blobs: &[Blob],
2203 policy: BundleExecutionPolicy,
2204 ) -> Result<
2205 (
2206 ProposedBlock,
2207 Block,
2208 ChainInfoResponse,
2209 ResourceTracker,
2210 HashSet<ChainId>,
2211 ),
2212 WorkerError,
2213 > {
2214 self.initialize_and_save_if_needed().await?;
2215 let local_time = self.storage.clock().current_time();
2216 let (_, committee) = self.chain.current_committee().await?;
2217 block.check_proposal_size(committee.policy().maximum_block_proposal_size)?;
2218
2219 self.chain
2220 .remove_bundles_from_inboxes(block.timestamp, true, block.incoming_bundles())
2221 .await?;
2222 let (executed_block, resource_tracker, never_reject_origins) =
2223 Box::pin(self.execute_block(block, local_time, round, published_blobs, policy)).await?;
2224
2225 let info = ChainInfo::from_chain_view(&mut self.chain).await?;
2227 let mut response = ChainInfoResponse::new(info, None);
2228 if let Some(owner) = executed_block.header.authenticated_owner {
2229 response.info.requested_owner_balance = self
2230 .chain
2231 .execution_state
2232 .system
2233 .balances
2234 .get(&owner)
2235 .await?;
2236 }
2237
2238 let (proposed_block, _) = executed_block.clone().into_proposal();
2239 Ok((
2240 proposed_block,
2241 executed_block,
2242 response,
2243 resource_tracker,
2244 never_reject_origins,
2245 ))
2246 }
2247
2248 #[instrument(skip_all, fields(
2255 chain_id = %self.chain_id(),
2256 block_height = %proposal.content.block.height
2257 ))]
2258 pub(crate) async fn handle_block_proposal(
2259 &mut self,
2260 proposal: BlockProposal,
2261 ) -> (Result<ChainInfoResponse, WorkerError>, NetworkActions) {
2262 let old_round = self.chain.manager.current_round();
2263 match self.try_handle_block_proposal(proposal).await {
2264 Ok((response, actions)) => (Ok(response), actions),
2265 Err(err) => {
2266 let actions = if self.chain.manager.current_round() != old_round {
2271 self.create_network_actions(Some(old_round))
2272 .await
2273 .unwrap_or_default()
2274 } else {
2275 NetworkActions::default()
2276 };
2277 (Err(err), actions)
2278 }
2279 }
2280 }
2281
/// Validates a block proposal and, if it is acceptable, lets the chain manager vote on it.
///
/// The steps, in order: structural and signature checks, block chaining and epoch checks,
/// proposal size and proposer permission checks, validation of any original proposal, the
/// manager's own check, recording of the signed proposal, loading of published blobs, the
/// timestamp check, block execution (unless the proposal already carries an outcome), and
/// finally vote creation and save.
///
/// Returns the chain info response and the network actions created relative to the round
/// the manager was in before the proposal was handled.
///
/// # Errors
///
/// Returns, among others, `WorkerError::InvalidBlockProposal`, `WorkerError::InvalidOwner`,
/// `WorkerError::InvalidSigner`, `WorkerError::InvalidTimestamp` and
/// `WorkerError::FastBlockUsingOracles`, and propagates chain, execution and storage errors.
async fn try_handle_block_proposal(
    &mut self,
    proposal: BlockProposal,
) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
    self.initialize_and_save_if_needed().await?;
    proposal
        .check_invariants()
        .map_err(|msg| WorkerError::InvalidBlockProposal(msg.to_string()))?;
    proposal.check_signature()?;
    let owner = proposal.owner();
    let BlockProposal {
        content,
        original_proposal,
        signature: _,
    } = &proposal;
    let block = &content.block;
    let chain = &self.chain;
    // The proposed block must extend the current tip.
    chain.tip_state.get().verify_block_chaining(block)?;
    let (epoch, committee) = chain.current_committee().await?;
    check_block_epoch(epoch, block.chain_id, block.epoch)?;
    let policy = committee.policy().clone();
    block.check_proposal_size(policy.maximum_block_proposal_size)?;
    // The signer of the proposal must be allowed to propose in this round.
    ensure!(
        chain.manager.can_propose(&owner, proposal.content.round),
        WorkerError::InvalidOwner
    );
    let old_round = self.chain.manager.current_round();
    match original_proposal {
        None => {
            // A fresh proposal: an authenticated owner, if set, must be the proposer.
            if let Some(signer) = block.authenticated_owner {
                ensure!(signer == owner, WorkerError::InvalidSigner(owner));
            }
        }
        Some(OriginalProposal::Regular { certificate }) => {
            // The proposal refers to a certificate; check it against the committee.
            certificate.check(&committee)?;
        }
        Some(OriginalProposal::Fast(signature)) => {
            // Rebuild the fast-round proposal that the given signature is supposed to
            // cover, so its signer can be derived and its signature checked.
            let original_proposal = BlockProposal {
                content: ProposalContent {
                    block: content.block.clone(),
                    round: Round::Fast,
                    outcome: None,
                },
                signature: *signature,
                original_proposal: None,
            };
            let super_owner = original_proposal.owner();
            ensure!(
                chain
                    .manager
                    .ownership
                    .get()
                    .super_owners
                    .contains(&super_owner),
                WorkerError::InvalidOwner
            );
            if let Some(signer) = block.authenticated_owner {
                ensure!(signer == super_owner, WorkerError::InvalidSigner(signer));
            }
            original_proposal.check_signature()?;
        }
    }
    let local_time = self.storage.clock().current_time();
    match chain.manager.check_proposed_block(&proposal) {
        Ok(manager::Outcome::Skip) => {
            // Nothing to do for this proposal: answer with the current chain info.
            return Ok((self.chain_info_response().await?, NetworkActions::default()));
        }
        Ok(manager::Outcome::Accept) => {}
        Err(err) => {
            // For an incompatible confirmed vote, the signed proposal is still offered to
            // the manager, and the chain is saved if that changed anything. The error is
            // returned in every case.
            if matches!(err, ChainError::HasIncompatibleConfirmedVote(_, _))
                && self
                    .chain
                    .manager
                    .update_signed_proposal(&proposal, local_time)
            {
                self.save().await?;
            }
            return Err(err.into());
        }
    }

    // Record the signed proposal before executing the block; save if the manager changed.
    if self
        .chain
        .manager
        .update_signed_proposal(&proposal, local_time)
    {
        self.save().await?;
    }

    let published_blobs = self.load_proposal_blobs(&proposal).await?;
    let ProposalContent {
        block,
        round,
        outcome,
    } = content;

    // The timestamp bound is enforced only when this worker has a key pair configured.
    if self.config.key_pair().is_some()
        && block.timestamp.duration_since(local_time) > self.config.block_time_grace_period
    {
        return Err(WorkerError::InvalidTimestamp {
            local_time,
            block_timestamp: block.timestamp,
            block_time_grace_period: self.config.block_time_grace_period,
        });
    }
    self.chain
        .remove_bundles_from_inboxes(block.timestamp, true, block.incoming_bundles())
        .await?;
    // Use the outcome carried by the proposal if there is one; otherwise execute the block.
    let block = if let Some(outcome) = outcome {
        outcome.clone().with(proposal.content.block.clone())
    } else {
        let (executed_block, _resource_tracker, _) = Box::pin(self.execute_block(
            block.clone(),
            local_time,
            round.multi_leader(),
            &published_blobs,
            BundleExecutionPolicy::committed(),
        ))
        .await?;
        executed_block
    };

    // Blocks proposed in the fast round must not contain oracle responses.
    ensure!(
        !round.is_fast() || !block.has_oracle_responses(),
        WorkerError::FastBlockUsingOracles
    );
    let chain = &mut self.chain;
    // Check that the tip-state counters can be advanced for this block (the update is
    // fallible).
    chain
        .tip_state
        .get_mut()
        .update_counters(&block.body.transactions, &block.body.messages)?;
    // NOTE(review): presumably discards the in-memory changes staged above (inbox removal,
    // execution, counters) so the save below does not persist them — confirm.
    chain.rollback();

    let blobs = self
        .get_required_blobs(proposal.expected_blob_ids(), block.created_blobs())
        .await?;
    let key_pair = self.config.key_pair();
    let manager = &mut self.chain.manager;
    // Whichever kind of vote was created, cache the value it votes for.
    match manager.create_vote(&proposal, block, key_pair, local_time, blobs)? {
        Some(Either::Left(vote)) => {
            self.block_values
                .insert_hashed(Cow::Borrowed(vote.value.inner()));
        }
        Some(Either::Right(vote)) => {
            self.block_values
                .insert_hashed(Cow::Borrowed(vote.value.inner()));
        }
        None => (),
    }
    self.save().await?;
    let actions = self.create_network_actions(Some(old_round)).await?;
    Ok((self.chain_info_response().await?, actions))
}
2459
2460 #[instrument(skip_all, fields(
2462 chain_id = %self.chain_id()
2463 ))]
2464 async fn prepare_chain_info_response(
2465 &mut self,
2466 query: ChainInfoQuery,
2467 ) -> Result<ChainInfoResponse, WorkerError> {
2468 self.initialize_and_save_if_needed().await?;
2469 let mut info = ChainInfo::from_chain_view(&mut self.chain).await?;
2470 let chain = &self.chain;
2471 if query.request_owner_balance == AccountOwner::CHAIN {
2472 info.requested_owner_balance = Some(*chain.execution_state.system.balance.get());
2473 } else {
2474 info.requested_owner_balance = chain
2475 .execution_state
2476 .system
2477 .balances
2478 .get(&query.request_owner_balance)
2479 .await?;
2480 }
2481 if let Some(next_block_height) = query.test_next_block_height {
2482 ensure!(
2484 chain.tip_state.get().next_block_height == next_block_height,
2485 WorkerError::UnexpectedBlockHeight {
2486 expected_block_height: chain.tip_state.get().next_block_height,
2487 found_block_height: next_block_height,
2488 }
2489 );
2490 }
2491 if query.request_pending_message_bundles {
2492 let mut bundles = Vec::new();
2493 let nonempty_origins: Vec<ChainId> =
2494 chain.nonempty_inboxes.get().iter().copied().collect();
2495 #[cfg(with_metrics)]
2496 metrics::NUM_INBOXES
2497 .with_label_values(&[])
2498 .observe(nonempty_origins.len() as f64);
2499 let action = if *chain.execution_state.system.closed.get() {
2500 MessageAction::Reject
2501 } else {
2502 MessageAction::Accept
2503 };
2504 let inboxes = chain.inboxes.try_load_entries(&nonempty_origins).await?;
2505 for (origin, inbox) in nonempty_origins.into_iter().zip(inboxes) {
2506 let inbox = inbox.ok_or_else(|| {
2507 ChainError::InternalError(format!("Missing inbox for origin {origin}"))
2508 })?;
2509 for bundle in inbox.added_bundles.elements().await? {
2510 bundles.push(IncomingBundle {
2511 origin,
2512 bundle,
2513 action,
2514 });
2515 }
2516 }
2517 info.requested_pending_message_bundles = bundles;
2518 }
2519 let hashes = chain
2520 .block_hashes_for_heights(query.request_sent_certificate_hashes_by_heights)
2521 .await?;
2522 info.requested_sent_certificate_hashes = hashes;
2523 if let Some(start) = query.request_received_log_excluding_first_n {
2524 let start = usize::try_from(start).map_err(|_| ArithmeticError::Overflow)?;
2525 let max_received_log_entries = self.config.chain_info_max_received_log_entries;
2526 let end = start
2527 .saturating_add(max_received_log_entries)
2528 .min(chain.received_log.count());
2529 info.requested_received_log = chain.received_log.read(start..end).await?;
2530 }
2531 if query.request_manager_values {
2532 info.manager.add_values(&chain.manager);
2533 }
2534 if !query.request_previous_event_blocks.is_empty() {
2535 let stream_ids = query.request_previous_event_blocks;
2536 let heights = chain
2537 .execution_state
2538 .previous_event_blocks
2539 .multi_get(&stream_ids)
2540 .await?;
2541 let mut streams_with_heights = Vec::new();
2542 for (stream_id, height) in stream_ids.into_iter().zip(heights) {
2543 if let Some(height) = height {
2544 streams_with_heights.push((stream_id, height));
2545 }
2546 }
2547 let hashes = chain
2548 .block_hashes
2549 .multi_get(streams_with_heights.iter().map(|(_, height)| height))
2550 .await?;
2551 for (maybe_hash, (stream_id, height)) in hashes.into_iter().zip(streams_with_heights) {
2552 let hash = maybe_hash.ok_or_else(|| WorkerError::BlockHashNotFound {
2553 height,
2554 chain_id: info.chain_id,
2555 })?;
2556 info.requested_previous_event_blocks
2557 .insert(stream_id, (height, hash));
2558 }
2559 }
2560 if query.request_latest_checkpoint_height {
2561 info.requested_latest_checkpoint_height = *self.chain.latest_checkpoint_height.get();
2562 }
2563 Ok(ChainInfoResponse::new(info, self.config.key_pair()))
2564 }
2565
2566 #[instrument(skip_all, fields(
2570 chain_id = %self.chain_id(),
2571 block_height = %block.height
2572 ))]
2573 async fn execute_block(
2574 &mut self,
2575 block: ProposedBlock,
2576 local_time: Timestamp,
2577 round: Option<u32>,
2578 published_blobs: &[Blob],
2579 policy: BundleExecutionPolicy,
2580 ) -> Result<(Block, ResourceTracker, HashSet<ChainId>), WorkerError> {
2581 let (proposed_block, outcome, resource_tracker, never_reject_origins) = Box::pin(
2582 self.chain
2583 .execute_block(block, local_time, round, published_blobs, None, policy),
2584 )
2585 .await?;
2586 let executed_block = Block::new(proposed_block, outcome);
2587 let block_hash = CryptoHash::new(&executed_block);
2588 if let Some(cache) = &self.execution_state_cache {
2589 cache.insert(
2590 &block_hash,
2591 Box::pin(
2592 self.chain
2593 .execution_state
2594 .with_context(|ctx| InactiveContext(ctx.base_key().clone())),
2595 )
2596 .await,
2597 );
2598 }
2599 Ok((executed_block, resource_tracker, never_reject_origins))
2600 }
2601
2602 #[instrument(skip_all, fields(
2604 chain_id = %self.chain_id()
2605 ))]
2606 pub(crate) async fn initialize_and_save_if_needed(&mut self) -> Result<(), WorkerError> {
2607 if !self.knows_chain_is_active {
2608 let local_time = self.storage.clock().current_time();
2609 self.chain.initialize_if_needed(local_time).await?;
2610 self.save().await?;
2611 self.knows_chain_is_active = true;
2612 }
2613 Ok(())
2614 }
2615
2616 pub(crate) async fn chain_info_response(&mut self) -> Result<ChainInfoResponse, WorkerError> {
2617 let info = ChainInfo::from_chain_view(&mut self.chain).await?;
2618 Ok(ChainInfoResponse::new(info, self.config.key_pair()))
2619 }
2620
2621 #[instrument(skip_all, fields(
2625 chain_id = %self.chain_id()
2626 ))]
2627 pub(crate) async fn save(&mut self) -> Result<(), WorkerError> {
2628 if let Err(error) = self.chain.save().await {
2629 if error.must_reload_view() {
2630 tracing::error!(
2631 ?error,
2632 chain_id = %self.chain_id(),
2633 "Chain save failed with a nonrecoverable error; marking worker as poisoned"
2634 );
2635 self.poisoned = true;
2636 }
2637 return Err(WorkerError::ViewError(error));
2638 }
2639 Ok(())
2640 }
2641
2642 #[instrument(skip_all, fields(
2647 chain_id = %self.chain_id()
2648 ))]
2649 async fn wipe_and_reload_chain(&mut self) -> Result<(), WorkerError> {
2650 let context = self.chain.context().clone();
2651 let mut batch = Batch::new();
2652 batch.delete_key_prefix(Vec::new());
2653 if let Err(error) = context.store().write_batch(batch).await {
2654 tracing::error!(
2655 ?error,
2656 chain_id = %self.chain_id(),
2657 "Wiping chain storage failed; marking worker as poisoned"
2658 );
2659 self.poisoned = true;
2660 return Err(WorkerError::PoisonedWorker);
2661 }
2662 match ChainStateView::load(context).await {
2663 Ok(chain) => {
2664 self.chain = chain;
2665 Ok(())
2666 }
2667 Err(error) => {
2668 tracing::error!(
2669 ?error,
2670 chain_id = %self.chain_id(),
2671 "Reloading chain after wipe failed; marking worker as poisoned"
2672 );
2673 self.poisoned = true;
2674 Err(WorkerError::PoisonedWorker)
2675 }
2676 }
2677 }
2678}
2679
2680pub(crate) fn send_result<T>(sender: oneshot::Sender<T>, value: T) {
2683 if sender.send(value).is_err() {
2684 tracing::debug!("cannot send cross-chain result; receiver dropped");
2685 }
2686}
2687
2688fn missing_indices_blob_ids(maybe_blobs: &[(BlobId, Option<Blob>)]) -> (Vec<usize>, Vec<BlobId>) {
2690 let mut missing_indices = Vec::new();
2691 let mut missing_blob_ids = Vec::new();
2692 for (index, (blob_id, blob)) in maybe_blobs.iter().enumerate() {
2693 if blob.is_none() {
2694 missing_indices.push(index);
2695 missing_blob_ids.push(*blob_id);
2696 }
2697 }
2698 (missing_indices, missing_blob_ids)
2699}
2700
2701fn missing_blob_ids<'a>(
2703 maybe_blobs: impl IntoIterator<Item = (&'a BlobId, &'a Option<Blob>)>,
2704) -> Vec<BlobId> {
2705 maybe_blobs
2706 .into_iter()
2707 .filter(|(_, maybe_blob)| maybe_blob.is_none())
2708 .map(|(blob_id, _)| *blob_id)
2709 .collect()
2710}
2711
2712fn check_block_epoch(
2714 chain_epoch: Epoch,
2715 block_chain: ChainId,
2716 block_epoch: Epoch,
2717) -> Result<(), WorkerError> {
2718 ensure!(
2719 block_epoch == chain_epoch,
2720 WorkerError::InvalidEpoch {
2721 chain_id: block_chain,
2722 epoch: block_epoch,
2723 chain_epoch
2724 }
2725 );
2726 Ok(())
2727}