Skip to main content

linera_core/chain_worker/
state.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! The state and functionality of a chain worker.
5
6use std::{
7    borrow::Cow,
8    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
9    sync::{self, Arc},
10};
11
12use futures::future::Either;
13#[cfg(with_metrics)]
14use linera_base::prometheus_util::MeasureLatency as _;
15use linera_base::{
16    crypto::{CryptoHash, ValidatorPublicKey},
17    data_types::{
18        ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch, OracleResponse, Round,
19        Timestamp,
20    },
21    ensure,
22    hashed::Hashed,
23    identifiers::{AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, StreamId},
24};
25use linera_cache::{Arc as CacheArc, UniqueValueCache, ValueCache};
26use linera_chain::{
27    data_types::{
28        BlockProposal, BundleExecutionPolicy, IncomingBundle, MessageAction, MessageBundle,
29        OriginalProposal, ProposalContent, ProposedBlock,
30    },
31    manager::{self, ManagerSafetySnapshot},
32    types::{
33        Block, ConfirmedBlock, ConfirmedBlockCertificate, TimeoutCertificate,
34        ValidatedBlockCertificate,
35    },
36    ChainError, ChainExecutionContext, ChainIdSet, ChainStateView, ChainTipState,
37    ExecutionResultExt as _,
38};
39use linera_execution::{
40    system::{EpochEventData, EventSubscriptions, EPOCH_STREAM_NAME},
41    ExecutionRuntimeContext as _, ExecutionStateView, Query, QueryContext, QueryOutcome,
42    ResourceTracker, ServiceRuntimeEndpoint,
43};
44use linera_storage::{Clock as _, Storage};
45use linera_views::{
46    batch::Batch,
47    context::{Context, InactiveContext},
48    store::WritableKeyValueStore as _,
49    views::{ReplaceContext as _, RootView as _, View as _},
50};
51use tokio::sync::oneshot;
52use tracing::{debug, instrument, trace, warn};
53
54use crate::{
55    chain_worker::{handle::AtomicTimestamp, ChainWorkerConfig, DeliveryNotifier},
56    client::{ChainModes, ListeningMode},
57    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
58    worker::{BatchRequest, NetworkActions, Notification, Reason, WorkerError},
59};
60
/// A list of [`EventSubscriptions`], each paired with the `(ChainId, StreamId)` key it is
/// associated with.
pub(crate) type EventSubscriptionsResult = Vec<((ChainId, StreamId), EventSubscriptions)>;
63
/// Prometheus metrics recorded by the chain worker state. Only compiled when the
/// `with_metrics` cfg is enabled.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, exponential_bucket_latencies, register_histogram,
        register_histogram_vec,
    };
    use prometheus::{Histogram, HistogramVec};

    /// Histogram registered as `create_network_actions_latency`; measured around the
    /// construction of network actions in `build_network_actions`.
    pub static CREATE_NETWORK_ACTIONS_LATENCY: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "create_network_actions_latency",
            "Time (ms) to create network actions",
            exponential_bucket_latencies(10_000.0),
        )
    });

    /// Histogram registered as `num_inboxes`, with no label dimensions.
    pub static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_inboxes",
            "Number of inboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });
}
91
/// The state of the chain worker.
///
/// Bundles the state view of a single chain with the storage client, caches and runtime
/// handles that are needed to serve requests for that chain.
pub(crate) struct ChainWorkerState<StorageClient>
where
    StorageClient: Storage,
{
    /// Worker configuration (e.g. the cross-chain message chunk limit).
    config: ChainWorkerConfig,
    /// Client used to load the chain and to read blobs and confirmed blocks.
    storage: StorageClient,
    /// The state view of the chain handled by this worker.
    chain: ChainStateView<StorageClient::Context>,
    /// Endpoint of the service runtime, if any. Dropping it signals the runtime task to
    /// stop (see `clear_service_runtime`).
    service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
    /// The background task running the service runtime. Must be kept alive for the
    /// lifetime of the worker: the pool `Guard` wrapper returns the thread-pool slot
    /// when dropped, so dropping this early lets the pool schedule unrelated work on a
    /// thread that is still running the service runtime.
    service_runtime_task: Option<web_thread_pool::Task<()>>,
    /// Timestamp of the last access.
    /// Used by the keep-alive task to determine when the worker has been idle.
    /// Wrapped in `Arc` so the keep-alive task can read it without acquiring
    /// the `RwLock`.
    last_access: Arc<AtomicTimestamp>,
    /// Cache of confirmed blocks by hash; consulted before storage when reading
    /// confirmed blocks.
    block_values: Arc<ValueCache<CryptoHash, ConfirmedBlock>>,
    /// Optional cache of inactive execution state views, keyed by a hash.
    // NOTE(review): not used in the part of the file reviewed here; confirm what the key
    // is a hash of.
    execution_state_cache:
        Option<Arc<UniqueValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>>,
    /// Shared per-chain listening modes. `None` means every chain is treated as tracked
    /// in full mode (e.g. on a validator).
    chain_modes: Option<Arc<sync::RwLock<ChainModes>>>,
    // NOTE(review): presumably used to notify waiters about message delivery; not used in
    // the part of the file reviewed here; confirm.
    delivery_notifier: DeliveryNotifier,
    /// Whether this worker already knows that the chain is active (initialized). Starts
    /// out as `false` when the state is loaded.
    knows_chain_is_active: bool,
    /// Set to `true` if a database `save` failure has left storage potentially
    /// inconsistent.
    poisoned: bool,
}
121
/// The result of processing a cross-chain update.
pub(crate) enum CrossChainUpdateResult {
    /// The update was applied up to the given height. The caller must save.
    Updated(BlockHeight),
    /// All bundles were already received; nothing to do.
    NothingToDo,
    /// A gap was detected in the inbox for messages from `origin`. If
    /// `allow_revert_confirm` is enabled, a `RevertConfirm` request should be sent
    /// to retransmit bundles starting from `retransmit_from`.
    GapDetected {
        /// The sender chain whose messages have a gap.
        origin: ChainId,
        /// The height from which the sender should retransmit bundles.
        retransmit_from: BlockHeight,
    },
}
136
/// Whether the block was processed or skipped. Used for metrics.
pub enum BlockOutcome {
    /// The block was processed.
    // NOTE(review): presumably means the block was fully executed; confirm against the
    // code that produces this value.
    Processed,
    /// The block was only preprocessed.
    // NOTE(review): presumably corresponds to the preprocessing path described on
    // `ProcessConfirmedBlockMode`; confirm.
    Preprocessed,
    /// The block was skipped.
    Skipped,
}
143
/// How to handle a confirmed block: execute it, preprocess it, or pick between the two
/// depending on whether it is contiguous with the chain.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProcessConfirmedBlockMode {
    /// Execute the block if it is contiguous (or bridgeable via a checkpoint);
    /// otherwise preprocess it. Used by validators and by any caller that
    /// wants graceful fallback when there's a gap.
    Auto,
    /// Execute the block. Fail with [`WorkerError::InvalidBlockChaining`] if
    /// there's a gap that can't be bridged. Use when the caller knows the
    /// block must be contiguous and wants a hard error otherwise.
    Execute,
    /// Only preprocess the block, never execute it — even if it would be
    /// contiguous. Used by the client for sender-chain blocks it is not
    /// otherwise tracking, since their execution state is irrelevant.
    Preprocess,
}
160
161impl<StorageClient> ChainWorkerState<StorageClient>
162where
163    StorageClient: Storage + Clone + 'static,
164{
165    /// Creates a new [`ChainWorkerState`] using the provided `storage` client.
166    #[instrument(skip_all, fields(
167        chain_id = %chain_id
168    ))]
169    #[expect(clippy::too_many_arguments)]
170    pub(crate) async fn load(
171        config: ChainWorkerConfig,
172        storage: StorageClient,
173        block_values: Arc<ValueCache<CryptoHash, ConfirmedBlock>>,
174        execution_state_cache: Option<
175            Arc<UniqueValueCache<CryptoHash, ExecutionStateView<InactiveContext>>>,
176        >,
177        chain_modes: Option<Arc<sync::RwLock<ChainModes>>>,
178        delivery_notifier: DeliveryNotifier,
179        chain_id: ChainId,
180        service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
181        service_runtime_task: Option<web_thread_pool::Task<()>>,
182    ) -> Result<Self, WorkerError> {
183        let chain = storage.load_chain(chain_id).await?;
184
185        Ok(ChainWorkerState {
186            config,
187            storage,
188            chain,
189            service_runtime_endpoint,
190            service_runtime_task,
191            last_access: Arc::new(AtomicTimestamp::now()),
192            block_values,
193            execution_state_cache,
194            chain_modes,
195            delivery_notifier,
196            knows_chain_is_active: false,
197            poisoned: false,
198        })
199    }
200
201    /// Returns the [`ChainId`] of the chain handled by this worker.
202    fn chain_id(&self) -> ChainId {
203        self.chain.chain_id()
204    }
205
206    /// Returns a reference to the chain state view.
207    pub(crate) fn chain(&self) -> &ChainStateView<StorageClient::Context> {
208        &self.chain
209    }
210
211    /// Resolves the committee that signed certificates at the given epoch by
212    /// walking the admin chain's epoch event stream — works even after the
213    /// epoch has been revoked, so long as the admin chain still has the event.
214    /// Surfaces `EventsNotFound` (as a chained `ChainError::ExecutionError`)
215    /// when the admin event isn't local, so the caller's retry/sync paths can
216    /// fetch it before re-asking.
217    async fn committee_for_epoch(
218        &self,
219        epoch: Epoch,
220    ) -> Result<linera_execution::committee::Committee, WorkerError> {
221        let hash = self
222            .chain
223            .execution_state
224            .context()
225            .extra()
226            .get_committee_hashes(epoch..=epoch)
227            .await
228            .map_err(|error| {
229                ChainError::ExecutionError(Box::new(error), ChainExecutionContext::Block)
230            })?
231            .remove(&epoch)
232            .ok_or_else(|| {
233                ChainError::InternalError(format!(
234                    "missing committee for epoch {epoch}; this is a bug"
235                ))
236            })?;
237        let committee = self
238            .chain
239            .execution_state
240            .context()
241            .extra()
242            .get_or_load_committee_by_hash(hash)
243            .await
244            .map_err(|error| {
245                ChainError::ExecutionError(Box::new(error), ChainExecutionContext::Block)
246            })?;
247        Ok((*committee).clone())
248    }
249
250    /// Filters bundles destined for this chain to drop ones already received and to refuse
251    /// ones whose epoch has been revoked on the admin chain.
252    ///
253    /// A revoked-epoch bundle is still accepted if (a) it has already been executed by
254    /// anticipation (`bundle.height <= last_anticipated_block_height`), or (b) a later
255    /// bundle in the same batch is in a still-trusted epoch — that bundle's certificate
256    /// transitively re-certifies all preceding ones via prev-hash chaining.
257    pub(crate) async fn select_message_bundles(
258        &self,
259        origin: &ChainId,
260        next_height_to_receive: BlockHeight,
261        last_anticipated_block_height: Option<BlockHeight>,
262        mut bundles: Vec<(Epoch, MessageBundle)>,
263    ) -> Result<Vec<MessageBundle>, WorkerError> {
264        let recipient = self.chain_id();
265        let mut latest_height = None;
266        let mut skipped_len = 0;
267        let mut trusted_len = 0;
268        for (i, (epoch, bundle)) in bundles.iter().enumerate() {
269            ensure!(
270                latest_height <= Some(bundle.height),
271                WorkerError::InvalidCrossChainRequest
272            );
273            latest_height = Some(bundle.height);
274            if bundle.height < next_height_to_receive {
275                skipped_len = i + 1;
276            }
277            let is_revoked = self
278                .storage
279                .is_epoch_revoked(*epoch)
280                .await
281                .map_err(|error| {
282                    WorkerError::ChainError(Box::new(ChainError::ExecutionError(
283                        Box::new(error),
284                        ChainExecutionContext::Block,
285                    )))
286                })?;
287            if !is_revoked || Some(bundle.height) <= last_anticipated_block_height {
288                trusted_len = i + 1;
289            }
290        }
291        if skipped_len > 0 {
292            let (_, sample_bundle) = &bundles[skipped_len - 1];
293            debug!(
294                "Ignoring repeated messages to {recipient:.8} from {origin:} at height {}",
295                sample_bundle.height,
296            );
297        }
298        if skipped_len < bundles.len() && trusted_len < bundles.len() {
299            let (sample_epoch, sample_bundle) = &bundles[trusted_len];
300            warn!(
301                "Refusing messages to {recipient:.8} from {origin:} at height {} \
302                 because the epoch {} is not trusted any more",
303                sample_bundle.height, sample_epoch,
304            );
305        }
306        Ok(if skipped_len < trusted_len {
307            bundles
308                .drain(skipped_len..trusted_len)
309                .map(|(_, bundle)| bundle)
310                .collect()
311        } else {
312            vec![]
313        })
314    }
315
316    /// Returns whether this chain is known to be active (initialized).
317    pub(crate) fn knows_chain_is_active(&self) -> bool {
318        self.knows_chain_is_active
319    }
320
321    /// Rolls back any uncommitted changes to the chain state.
322    pub(crate) fn rollback(&mut self) {
323        self.chain.rollback();
324    }
325
326    /// Returns `WorkerError::PoisonedWorker` if the worker is poisoned due to a database
327    /// `save` failure.
328    pub(crate) fn check_not_poisoned(&self) -> Result<(), WorkerError> {
329        ensure!(!self.poisoned, WorkerError::PoisonedWorker);
330        Ok(())
331    }
332
333    /// Updates the last-access timestamp to the current time.
334    pub(crate) fn touch(&self) {
335        self.last_access.store_now();
336    }
337
338    /// Returns a clone of the last-access `Arc`, for use by the keep-alive task.
339    pub(crate) fn last_access_arc(&self) -> Arc<AtomicTimestamp> {
340        Arc::clone(&self.last_access)
341    }
342
343    /// Drops the service runtime endpoint, signaling the runtime task to stop.
344    /// Returns the runtime task so the caller can await it outside the lock.
345    pub(crate) fn clear_service_runtime(&mut self) -> Option<web_thread_pool::Task<()>> {
346        self.service_runtime_endpoint.take();
347        self.service_runtime_task.take()
348    }
349
350    /// Returns the pending cross-chain network actions if the outbox index is already
351    /// reconciled to the current tracked set, or `None` if it must first be reconciled (which
352    /// needs a write lock).
353    pub(crate) async fn cross_chain_network_actions_if_reconciled(
354        &self,
355    ) -> Result<Option<NetworkActions>, WorkerError> {
356        let tracked = self.tracked_full_chains();
357        if !self.chain.outbox_index_is_reconciled(tracked.as_deref()) {
358            return Ok(None);
359        }
360        Ok(Some(
361            self.build_network_actions(None, tracked.as_deref().map(|h| h.inner()))
362                .await?,
363        ))
364    }
365
366    /// Reconciles the outbox index with the current tracked set, then returns the pending
367    /// cross-chain network actions for this chain, without initializing the chain's execution
368    /// state. Intended for callers that only need to re-emit cross-chain requests from the
369    /// outbox of a sender chain whose `ChainDescription` we may never have needed.
370    ///
371    /// This is the slow path of [`Self::cross_chain_network_actions_if_reconciled`].
372    #[instrument(skip_all, fields(chain_id = %self.chain_id()))]
373    pub(crate) async fn reconcile_and_cross_chain_network_actions(
374        &mut self,
375    ) -> Result<NetworkActions, WorkerError> {
376        let tracked = self.tracked_full_chains();
377        self.chain
378            .reconcile_outbox_index(tracked.as_deref())
379            .await?;
380        let actions = self
381            .build_network_actions(None, tracked.as_deref().map(|h| h.inner()))
382            .await?;
383        self.save().await?;
384        Ok(actions)
385    }
386
387    /// Handles a [`ChainInfoQuery`], potentially voting on the next block.
388    #[tracing::instrument(level = "debug", skip(self))]
389    pub(crate) async fn handle_chain_info_query(
390        &mut self,
391        query: ChainInfoQuery,
392    ) -> Result<ChainInfoResponse, WorkerError> {
393        if let Some((height, round)) = query.request_leader_timeout {
394            self.vote_for_leader_timeout(height, round).await?;
395        }
396        if query.request_fallback {
397            self.vote_for_fallback().await?;
398        }
399        self.prepare_chain_info_response(query).await
400    }
401
402    /// Returns the requested blob, if it belongs to the current locking block or pending proposal.
403    #[instrument(skip_all, fields(
404        chain_id = %self.chain_id(),
405        blob_id = %blob_id
406    ))]
407    pub(crate) async fn download_pending_blob(
408        &self,
409        blob_id: BlobId,
410    ) -> Result<CacheArc<Blob>, WorkerError> {
411        if let Some(blob) = self.chain.manager.pending_blob(&blob_id).await? {
412            return Ok(self.storage.cache_blob(blob));
413        }
414        self.storage
415            .read_blob(blob_id)
416            .await?
417            .ok_or(WorkerError::BlobsNotFound(vec![blob_id]))
418    }
419
420    /// Reads the blobs from the chain manager or from storage. Returns an error if any are
421    /// missing.
422    #[instrument(skip_all, fields(
423        chain_id = %self.chain_id()
424    ))]
425    async fn get_required_blobs(
426        &self,
427        required_blob_ids: impl IntoIterator<Item = BlobId>,
428        created_blobs: BTreeMap<BlobId, Blob>,
429    ) -> Result<BTreeMap<BlobId, Blob>, WorkerError> {
430        let maybe_blobs = self
431            .maybe_get_required_blobs(required_blob_ids, Some(created_blobs))
432            .await?;
433        let not_found_blob_ids = missing_blob_ids(&maybe_blobs);
434        ensure!(
435            not_found_blob_ids.is_empty(),
436            WorkerError::BlobsNotFound(not_found_blob_ids)
437        );
438        Ok(maybe_blobs
439            .into_iter()
440            .filter_map(|(blob_id, maybe_blob)| Some((blob_id, maybe_blob?)))
441            .collect())
442    }
443
    /// Tries to find each of the given blobs, consulting several sources in order.
    ///
    /// The requested IDs are deduplicated, and each one is looked up in:
    ///
    /// 1. `created_blobs`, if provided (matching entries are moved out of the map);
    /// 2. the chain manager's pending blobs;
    /// 3. the chain's `pending_validated_blobs`;
    /// 4. every entry of the chain's `pending_proposed_blobs`;
    /// 5. storage.
    ///
    /// Each stage only queries the IDs that are still missing after the previous ones.
    /// The returned map has one entry per requested ID; the value is `None` if the blob
    /// was not found anywhere.
    #[instrument(skip_all, fields(
        chain_id = %self.chain_id()
    ))]
    async fn maybe_get_required_blobs(
        &self,
        blob_ids: impl IntoIterator<Item = BlobId>,
        mut created_blobs: Option<BTreeMap<BlobId, Blob>>,
    ) -> Result<BTreeMap<BlobId, Option<Blob>>, WorkerError> {
        // Deduplicate (and order) the IDs, then start with every blob marked as missing.
        let maybe_blobs = blob_ids.into_iter().collect::<BTreeSet<_>>();
        let mut maybe_blobs = maybe_blobs
            .into_iter()
            .map(|x| (x, None))
            .collect::<Vec<(BlobId, Option<Blob>)>>();

        // Stage 1: blobs handed in by the caller.
        if let Some(blob_map) = &mut created_blobs {
            for (blob_id, value) in &mut maybe_blobs {
                if let Some(blob) = blob_map.remove(blob_id) {
                    *value = Some(blob);
                }
            }
        }

        // Stage 2: blobs pending in the chain manager. The results are written back
        // through the indices of the entries that were still missing.
        let (missing_indices, missing_blob_ids) = missing_indices_blob_ids(&maybe_blobs);
        let second_block_blobs = self.chain.manager.pending_blobs(&missing_blob_ids).await?;
        for (index, blob) in missing_indices.into_iter().zip(second_block_blobs) {
            maybe_blobs[index].1 = blob;
        }

        // Stage 3: the chain's pending validated blobs.
        let (missing_indices, missing_blob_ids) = missing_indices_blob_ids(&maybe_blobs);
        let third_block_blobs = self
            .chain
            .pending_validated_blobs
            .multi_get(&missing_blob_ids)
            .await?;
        for (index, blob) in missing_indices.into_iter().zip(third_block_blobs) {
            maybe_blobs[index].1 = blob;
        }

        // Stage 4: the chain's pending proposed blobs. All entries are loaded only if
        // something is still missing; the first entry that has the blob wins.
        let (missing_indices, missing_blob_ids) = missing_indices_blob_ids(&maybe_blobs);
        if !missing_indices.is_empty() {
            let all_entries_pending_blobs = self
                .chain
                .pending_proposed_blobs
                .try_load_all_entries()
                .await?;
            for (index, blob_id) in missing_indices.into_iter().zip(missing_blob_ids) {
                for (_, pending_blobs) in &all_entries_pending_blobs {
                    if let Some(blob) = pending_blobs.get(&blob_id).await? {
                        maybe_blobs[index].1 = Some(blob);
                        break;
                    }
                }
            }
        }

        // Stage 5: storage. Whatever is still missing afterwards stays `None`.
        let (missing_indices, missing_blob_ids) = missing_indices_blob_ids(&maybe_blobs);
        let fourth_block_blobs = self.storage.read_blobs(&missing_blob_ids).await?;
        for (index, blob) in missing_indices.into_iter().zip(fourth_block_blobs) {
            maybe_blobs[index].1 = blob.map(CacheArc::unwrap_or_clone);
        }
        Ok(maybe_blobs.into_iter().collect())
    }
507
508    /// Creates cross-chain requests for a single recipient from its outbox.
509    #[instrument(skip_all, fields(
510        chain_id = %self.chain_id()
511    ))]
512    async fn create_cross_chain_actions_for_recipient(
513        &self,
514        recipient: ChainId,
515    ) -> Result<NetworkActions, WorkerError> {
516        let outbox = self.chain.outboxes.try_load_entry(&recipient).await?;
517        let Some(outbox) = outbox else {
518            return Ok(NetworkActions::default());
519        };
520        let heights = outbox.queue.elements().await?;
521        if heights.is_empty() {
522            return Ok(NetworkActions::default());
523        }
524        let heights_by_recipient = BTreeMap::from([(recipient, heights)]);
525        let cross_chain_requests = self
526            .create_cross_chain_requests(heights_by_recipient)
527            .await?;
528        Ok(NetworkActions {
529            cross_chain_requests,
530            notifications: Vec::new(),
531        })
532    }
533
534    /// Returns the set of chains tracked in full mode together with its memoized hash, or `None` if
535    /// every chain is implicitely in full-mode (e.g. on validator).
536    fn tracked_full_chains(&self) -> Option<Arc<Hashed<ChainIdSet>>> {
537        let chain_modes = self.chain_modes.as_ref()?;
538        let full = chain_modes
539            .read()
540            .expect("Panics should not happen while holding a lock to `chain_modes`")
541            .full();
542        Some(full)
543    }
544
545    /// Returns whether the given chain is tracked in full mode (always `true` on a validator),
546    /// i.e. whether its outbox should be indexed.
547    fn is_tracked(&self, chain_id: &ChainId) -> bool {
548        self.chain_modes.as_ref().is_none_or(|chain_modes| {
549            chain_modes
550                .read()
551                .expect("Panics should not happen while holding a lock to `chain_modes`")
552                .get(chain_id)
553                .is_some_and(ListeningMode::is_full)
554        })
555    }
556
557    /// Reconciles the chain's `nonempty_outboxes` and `outbox_counters` indices with the current
558    /// set of fully-tracked chains.
559    async fn reconcile_tracked_outboxes(
560        &mut self,
561    ) -> Result<Option<Arc<Hashed<ChainIdSet>>>, WorkerError> {
562        let full_chains = self.tracked_full_chains();
563        self.chain
564            .reconcile_outbox_index(full_chains.as_deref())
565            .await?;
566        Ok(full_chains)
567    }
568
569    /// Reconciles the outbox index with the current tracked set, then loads pending cross-chain
570    /// requests and adds `NewRound` notifications where appropriate.
571    async fn create_network_actions(
572        &mut self,
573        old_round: Option<Round>,
574    ) -> Result<NetworkActions, WorkerError> {
575        // Make the outbox index authoritative for the current tracked set first, so it already
576        // holds only `is_full` targets and needs no read-time filtering.
577        let tracked = self.reconcile_tracked_outboxes().await?;
578        self.build_network_actions(old_round, tracked.as_deref().map(|h| h.inner()))
579            .await
580    }
581
582    /// Builds the pending cross-chain actions from the already-reconciled outbox index.
583    async fn build_network_actions(
584        &self,
585        old_round: Option<Round>,
586        tracked: Option<&ChainIdSet>,
587    ) -> Result<NetworkActions, WorkerError> {
588        #[cfg(with_metrics)]
589        let _latency = metrics::CREATE_NETWORK_ACTIONS_LATENCY.measure_latency();
590        let mut heights_by_recipient = BTreeMap::<_, Vec<_>>::new();
591        let targets = self.chain.nonempty_outbox_chain_ids();
592        if let Some(tracked) = tracked {
593            if let Some(target) = targets.iter().find(|target| !tracked.contains(*target)) {
594                return Err(ChainError::CorruptedChainState(format!(
595                    "outbox index contains untracked target {target}"
596                ))
597                .into());
598            }
599        }
600        let outboxes = self.chain.load_outboxes(&targets).await?;
601        for (target, outbox) in targets.into_iter().zip(outboxes) {
602            let heights = outbox.queue.elements().await?;
603            heights_by_recipient.insert(target, heights);
604        }
605        let cross_chain_requests = self
606            .create_cross_chain_requests(heights_by_recipient)
607            .await?;
608        let mut notifications = Vec::new();
609        if let Some(old_round) = old_round {
610            let round = self.chain.manager.current_round();
611            if round > old_round {
612                let height = self.chain.tip_state.get().next_block_height;
613                notifications.push(Notification {
614                    chain_id: self.chain_id(),
615                    reason: Reason::NewRound { height, round },
616                });
617            }
618        }
619        Ok(NetworkActions {
620            cross_chain_requests,
621            notifications,
622        })
623    }
624
625    /// Returns confirmed blocks by hash, checking the cache first and batch-loading the rest
626    /// from storage. The order of the returned blocks matches the order of the input hashes.
627    async fn read_confirmed_blocks(
628        &self,
629        hashes: &[CryptoHash],
630    ) -> Result<Vec<Option<CacheArc<ConfirmedBlock>>>, WorkerError> {
631        let mut blocks = Vec::with_capacity(hashes.len());
632        let mut uncached_indices = Vec::new();
633        let mut uncached_hashes = Vec::new();
634
635        for (i, hash) in hashes.iter().enumerate() {
636            if let Some(block) = self.block_values.get(hash) {
637                blocks.push(Some(block));
638            } else {
639                blocks.push(None);
640                uncached_indices.push(i);
641                uncached_hashes.push(*hash);
642            }
643        }
644
645        if !uncached_hashes.is_empty() {
646            let from_storage = self.storage.read_confirmed_blocks(uncached_hashes).await?;
647            for (i, maybe_block) in uncached_indices.into_iter().zip(from_storage) {
648                blocks[i] = maybe_block;
649            }
650        }
651
652        Ok(blocks)
653    }
654
    /// Builds at most one `UpdateRecipient` request per recipient from the given outbox
    /// heights.
    ///
    /// All confirmed blocks at the referenced heights are loaded once up front. For each
    /// recipient, the message bundles of its blocks are then appended in the order of its
    /// heights until adding another block's bundles would exceed
    /// `config.cross_chain_message_chunk_limit`; at least one block's bundles are always
    /// included. Recipients that end up with no bundles get no request.
    ///
    /// # Errors
    ///
    /// Returns `WorkerError::ReadCertificatesError` if a confirmed block is found neither
    /// in the cache nor in storage; failures to read hashes or blocks are propagated.
    #[instrument(skip_all, fields(
        chain_id = %self.chain_id(),
        num_recipients = %heights_by_recipient.len()
    ))]
    async fn create_cross_chain_requests(
        &self,
        heights_by_recipient: BTreeMap<ChainId, Vec<BlockHeight>>,
    ) -> Result<Vec<CrossChainRequest>, WorkerError> {
        // Load all the certificates we will need, regardless of the medium.
        // Heights shared by several recipients are deduplicated by the set.
        let heights = heights_by_recipient
            .values()
            .flatten()
            .copied()
            .collect::<BTreeSet<_>>();
        let hashes = self
            .chain
            .block_hashes_for_heights(heights.iter().copied())
            .await?;

        let blocks = self.read_confirmed_blocks(&hashes).await?;

        // Index the loaded blocks by their own height; a missing block is an error.
        let mut height_to_blocks = HashMap::new();
        for (block, hash) in blocks.into_iter().zip(hashes) {
            let block = block.ok_or_else(|| WorkerError::ReadCertificatesError(vec![hash]))?;
            height_to_blocks.insert(block.height(), block);
        }

        let sender = self.chain.chain_id();
        let mut cross_chain_requests = Vec::new();
        for (recipient, heights) in heights_by_recipient {
            // Extract the predecessor height for this recipient from the first
            // block's `previous_message_blocks`. This lets the recipient detect
            // gaps even before it consumes the missing message.
            let previous_height = heights.first().and_then(|first_height| {
                let block = height_to_blocks.get(first_height)?;
                let (_, prev_height) =
                    block.block().body.previous_message_blocks.get(&recipient)?;
                Some(*prev_height)
            });
            let mut bundles = Vec::new();
            // Running total of the estimated sizes of the bundles collected so far.
            let mut bundles_size = 0;
            for height in heights {
                let Some(confirmed_block) = height_to_blocks.get(&height) else {
                    tracing::warn!(
                        %height,
                        %recipient,
                        "spurious entry in outbox; skipping this and higher sender blocks"
                    );
                    break;
                };
                let new_bundles = confirmed_block
                    .block()
                    .message_bundles_for(recipient, confirmed_block.inner().hash())
                    .collect::<Vec<_>>();
                let new_size = new_bundles
                    .iter()
                    .map(|(_epoch, bundle)| bundle.estimated_size())
                    .sum::<usize>();
                // If adding this block's bundles would exceed the chunk limit,
                // stop here. Always include at least one block's bundles.
                if bundles_size + new_size > self.config.cross_chain_message_chunk_limit {
                    if bundles.is_empty() {
                        // An oversized first block is still sent, with a warning.
                        warn!(
                            "Single block at height {height} produces an UpdateRecipient \
                            of ~{new_size} bytes, exceeding the chunk limit of {}",
                            self.config.cross_chain_message_chunk_limit
                        );
                    } else {
                        debug!(
                            "Stopping cross-chain batch for {recipient} at height {height}: \
                            adding ~{new_size} bytes would exceed chunk limit of {} \
                            (current batch ~{bundles_size} bytes)",
                            self.config.cross_chain_message_chunk_limit
                        );
                        break;
                    }
                }
                bundles.extend(new_bundles);
                bundles_size += new_size;
            }
            // Do not emit empty requests.
            if !bundles.is_empty() {
                cross_chain_requests.push(CrossChainRequest::UpdateRecipient {
                    sender,
                    recipient,
                    bundles,
                    previous_height,
                });
            }
        }
        Ok(cross_chain_requests)
    }
746
    /// Processes a leader timeout issued for this multi-owner chain.
    ///
    /// The certificate is checked against the chain's current committee. If the tip state
    /// reports the certificate's height as already validated, the current chain info is
    /// returned without any network actions. Otherwise the certificate's epoch must match
    /// the chain's current epoch; the certificate is then passed to the chain manager,
    /// the chain is saved, and the resulting network actions are returned alongside the
    /// chain info (including a `NewRound` notification if the round has advanced).
    ///
    /// # Errors
    ///
    /// Returns `WorkerError::InvalidEpoch` if the certificate's epoch differs from the
    /// chain's current epoch; initialization, certificate check and storage failures are
    /// propagated.
    #[instrument(skip_all, fields(
        chain_id = %self.chain_id(),
        height = %certificate.inner().height()
    ))]
    pub(crate) async fn process_timeout(
        &mut self,
        certificate: TimeoutCertificate,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        // Check that the chain is active and ready for this timeout.
        self.initialize_and_save_if_needed().await?;
        // Verify the certificate. Returns a catch-all error to make client code more robust.
        let (chain_epoch, committee) = self.chain.current_committee().await?;
        certificate.check(&committee)?;
        // A timeout for a height that was already validated changes nothing.
        if self
            .chain
            .tip_state
            .get()
            .already_validated_block(certificate.inner().height())?
        {
            return Ok((self.chain_info_response().await?, NetworkActions::default()));
        }
        ensure!(
            certificate.inner().epoch() == chain_epoch,
            WorkerError::InvalidEpoch {
                chain_id: certificate.inner().chain_id(),
                chain_epoch,
                epoch: certificate.inner().epoch()
            }
        );
        // Remember the round before handling the certificate, so that a round change can
        // be detected when the network actions are created.
        let old_round = self.chain.manager.current_round();
        self.chain
            .manager
            .handle_timeout_certificate(certificate, self.storage.clock().current_time());
        self.save().await?;
        let actions = self.create_network_actions(Some(old_round)).await?;
        Ok((self.chain_info_response().await?, actions))
    }
785
786    /// Tries to load all blobs published in this proposal.
787    ///
788    /// If they cannot be found, it creates an entry in `pending_proposed_blobs` so they can be
789    /// submitted one by one.
790    #[instrument(skip_all, fields(
791        chain_id = %self.chain_id(),
792        block_height = %proposal.content.block.height
793    ))]
794    async fn load_proposal_blobs(
795        &mut self,
796        proposal: &BlockProposal,
797    ) -> Result<Vec<Blob>, WorkerError> {
798        let owner = proposal.owner();
799        let BlockProposal {
800            content:
801                ProposalContent {
802                    block,
803                    round,
804                    outcome: _,
805                },
806            original_proposal,
807            signature: _,
808        } = proposal;
809
810        let mut maybe_blobs = self
811            .maybe_get_required_blobs(proposal.required_blob_ids(), None)
812            .await?;
813        let missing_blob_ids = missing_blob_ids(&maybe_blobs);
814        if !missing_blob_ids.is_empty() {
815            let chain = &mut self.chain;
816            if chain.ownership().await?.open_multi_leader_rounds {
817                // TODO(#3203): Allow multiple pending proposals on permissionless chains.
818                chain.pending_proposed_blobs.clear();
819            }
820            let validated = matches!(original_proposal, Some(OriginalProposal::Regular { .. }));
821            chain
822                .pending_proposed_blobs
823                .try_load_entry_mut(&owner)
824                .await?
825                .update(*round, validated, maybe_blobs)?;
826            self.save().await?;
827            return Err(WorkerError::BlobsNotFound(missing_blob_ids));
828        }
829        let published_blobs = block
830            .published_blob_ids()
831            .iter()
832            .filter_map(|blob_id| maybe_blobs.remove(blob_id).flatten())
833            .collect::<Vec<_>>();
834        Ok(published_blobs)
835    }
836
837    /// Processes a validated block issued for this multi-owner chain.
838    #[instrument(skip_all, fields(
839        chain_id = %self.chain_id(),
840        block_height = %certificate.block().header.height
841    ))]
842    pub(crate) async fn process_validated_block(
843        &mut self,
844        certificate: ValidatedBlockCertificate,
845    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
846        let block = certificate.block();
847
848        let header = &block.header;
849        let height = header.height;
850        // Check that the chain is active and ready for this validated block.
851        // Verify the certificate. Returns a catch-all error to make client code more robust.
852        self.initialize_and_save_if_needed().await?;
853        let tip_state = self.chain.tip_state.get();
854        ensure!(
855            header.height == tip_state.next_block_height,
856            ChainError::UnexpectedBlockHeight {
857                expected_block_height: tip_state.next_block_height,
858                found_block_height: header.height,
859            }
860        );
861        let (epoch, committee) = self.chain.current_committee().await?;
862        check_block_epoch(epoch, header.chain_id, header.epoch)?;
863        certificate.check(&committee)?;
864        let already_committed_block = self.chain.tip_state.get().already_validated_block(height)?;
865        let should_skip_validated_block = || {
866            self.chain
867                .manager
868                .check_validated_block(&certificate)
869                .map(|outcome| outcome == manager::Outcome::Skip)
870        };
871        if already_committed_block || should_skip_validated_block()? {
872            // If we just processed the same pending block, return the chain info unchanged.
873            return Ok((
874                self.chain_info_response().await?,
875                NetworkActions::default(),
876                BlockOutcome::Skipped,
877            ));
878        }
879
880        self.block_values
881            .insert_hashed(Cow::Borrowed(certificate.inner().inner()));
882        let required_blob_ids = block.required_blob_ids();
883        let maybe_blobs = self
884            .maybe_get_required_blobs(required_blob_ids, Some(block.created_blobs()))
885            .await?;
886        let missing_blob_ids = missing_blob_ids(&maybe_blobs);
887        if !missing_blob_ids.is_empty() {
888            self.chain
889                .pending_validated_blobs
890                .update(certificate.round, true, maybe_blobs)?;
891            self.save().await?;
892            return Err(WorkerError::BlobsNotFound(missing_blob_ids));
893        }
894        let blobs = maybe_blobs
895            .into_iter()
896            .filter_map(|(blob_id, maybe_blob)| Some((blob_id, maybe_blob?)))
897            .collect();
898        let old_round = self.chain.manager.current_round();
899        self.chain.manager.create_final_vote(
900            certificate,
901            self.config.key_pair(),
902            self.storage.clock().current_time(),
903            blobs,
904        )?;
905        self.save().await?;
906        let actions = self.create_network_actions(Some(old_round)).await?;
907        Ok((
908            self.chain_info_response().await?,
909            actions,
910            BlockOutcome::Processed,
911        ))
912    }
913
914    /// Processes a confirmed block (aka a commit).
915    #[instrument(skip_all, fields(
916        chain_id = %certificate.block().header.chain_id,
917        height = %certificate.block().header.height,
918        block_hash = %certificate.hash(),
919    ))]
920    pub(crate) async fn process_confirmed_block(
921        &mut self,
922        certificate: ConfirmedBlockCertificate,
923        mode: ProcessConfirmedBlockMode,
924        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
925    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
926        let block = certificate.block();
927        let block_hash = certificate.hash();
928        let height = block.header.height;
929        let chain_id = block.header.chain_id;
930
931        // Trust-mark accept path: an earlier checkpoint cert recorded this block's
932        // hash in `pre_checkpoint_block_trust`. Removing the trust mark here is
933        // only an in-memory change; if anything below fails before `save`, the
934        // mark survives on the next reload. The dispatch's `gap` definition
935        // (`!=` rather than `<`) routes both above-tip and below-tip trust
936        // uploads to `preprocess_certified_block` so the chain can write the
937        // cert without advancing the tip.
938        let in_trust_set = self
939            .chain
940            .pre_checkpoint_block_trust
941            .contains(&block_hash)
942            .await?;
943        if in_trust_set {
944            self.chain.pre_checkpoint_block_trust.remove(&block_hash)?;
945        }
946
947        // Check if we already processed this block.
948        let tip = self.chain.tip_state.get().clone();
949        if !in_trust_set && tip.next_block_height > height {
950            let actions = self.create_network_actions(None).await?;
951            self.register_delivery_notifier(height, &actions, notify_when_messages_are_delivered)
952                .await;
953            return Ok((
954                self.chain_info_response().await?,
955                actions,
956                BlockOutcome::Skipped,
957            ));
958        }
959
960        // We haven't processed the block - verify the certificate first.
961        let committee = self.committee_for_epoch(block.header.epoch).await?;
962        certificate.check(&committee)?;
963
964        // Certificate check passed - which means the blobs the block requires are legitimate and
965        // we can take note of it, so that if any are missing, we will accept them when the client
966        // sends them.
967        let required_blob_ids = block.required_blob_ids();
968        let blobs_result = self
969            .get_required_blobs(required_blob_ids.iter().copied(), block.created_blobs())
970            .await
971            .map(|blobs| blobs.into_values().collect::<Vec<_>>());
972
973        if let Ok(blobs) = &blobs_result {
974            self.storage
975                .write_blobs_and_certificate(blobs, &certificate)
976                .await?;
977            let events = block
978                .body
979                .events
980                .iter()
981                .flatten()
982                .map(|event| (event.id(chain_id), event.value.clone()));
983            self.storage.write_events(events).await?;
984        }
985
986        // Update the blob state with last used certificate hash.
987        let blob_state = certificate.value().to_blob_state(blobs_result.is_ok());
988        let blob_ids = required_blob_ids.into_iter().collect::<Vec<_>>();
989        self.storage
990            .maybe_write_blob_states(&blob_ids, blob_state)
991            .await?;
992
993        let blobs = blobs_result?
994            .into_iter()
995            .map(|blob| (blob.id(), blob))
996            .collect::<BTreeMap<_, _>>();
997
998        // Dispatch on the actual outcome (preprocess / checkpoint-restore-then-execute
999        // / contiguous execute):
1000        //  - `Preprocess` mode, or `Auto` with an unbridgeable gap: preprocess.
1001        //  - `Execute` mode with an unbridgeable gap: error.
1002        //  - `Auto`/`Execute` mode + gap + checkpoint block: install the snapshot,
1003        //    then execute.
1004        //  - `Auto`/`Execute` mode + contiguous: execute directly.
1005        use ProcessConfirmedBlockMode::{Auto, Execute, Preprocess};
1006        let gap = tip.next_block_height != height;
1007        let starts_with_checkpoint = block.starts_with_checkpoint();
1008        match (mode, gap, starts_with_checkpoint) {
1009            (Preprocess, _, _) | (Auto, true, false) => {
1010                self.preprocess_certified_block(certificate, notify_when_messages_are_delivered)
1011                    .await
1012            }
1013            (Execute, true, false) => Err(WorkerError::InvalidBlockChaining),
1014            (Auto | Execute, true, true) => {
1015                self.execute_block_with_checkpoint_restore(
1016                    certificate,
1017                    blobs,
1018                    notify_when_messages_are_delivered,
1019                )
1020                .await
1021            }
1022            (Auto | Execute, false, _) => {
1023                self.execute_contiguous_block(
1024                    certificate,
1025                    blobs,
1026                    tip,
1027                    notify_when_messages_are_delivered,
1028                )
1029                .await
1030            }
1031        }
1032    }
1033
1034    /// Preprocesses a confirmed block: updates outboxes and event streams without
1035    /// executing it, and does not advance the chain tip.
1036    async fn preprocess_certified_block(
1037        &mut self,
1038        certificate: ConfirmedBlockCertificate,
1039        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1040    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
1041        let block_hash = certificate.hash();
1042        let block = certificate.block();
1043        let chain_id = block.header.chain_id;
1044        let height = block.header.height;
1045
1046        let tracked = self.reconcile_tracked_outboxes().await?;
1047        let updated_event_streams = self
1048            .chain
1049            .preprocess_block(certificate.value(), tracked.as_deref().map(|h| h.inner()))
1050            .await?;
1051        self.save().await?;
1052        let mut actions = self.create_network_actions(None).await?;
1053        if !updated_event_streams.is_empty() {
1054            actions.notifications.push(Notification {
1055                chain_id,
1056                reason: Reason::NewEvents {
1057                    height,
1058                    block_hash,
1059                    event_streams: updated_event_streams,
1060                },
1061            });
1062        }
1063        trace!("Preprocessed confirmed block {height}");
1064        self.register_delivery_notifier(height, &actions, notify_when_messages_are_delivered)
1065            .await;
1066        Ok((
1067            self.chain_info_response().await?,
1068            actions,
1069            BlockOutcome::Preprocessed,
1070        ))
1071    }
1072
1073    /// Restores the chain's execution state from the checkpoint blob embedded in
1074    /// the block's first oracle response, fast-forwards the tip to the block's
1075    /// height, then executes the block as if it were contiguous. Re-running the
1076    /// block applies its fees and re-derives any post-checkpoint state.
1077    async fn execute_block_with_checkpoint_restore(
1078        &mut self,
1079        certificate: ConfirmedBlockCertificate,
1080        blobs: BTreeMap<BlobId, Blob>,
1081        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1082    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
1083        let (bytes, chain_id, height, previous_block_hash, outbox_block_hashes, inbox_cursors) = {
1084            let block = certificate.block();
1085            let Some(OracleResponse::Checkpoint {
1086                execution_state_blobs,
1087                outbox_block_hashes,
1088                inbox_cursors,
1089                ..
1090            }) = block.body.oracle_responses.first().and_then(|r| r.first())
1091            else {
1092                return Err(ChainError::InternalError(
1093                    "Checkpoint block missing OracleResponse::Checkpoint".into(),
1094                )
1095                .into());
1096            };
1097            let mut bytes = Vec::new();
1098            let mut missing = Vec::new();
1099            for hash in execution_state_blobs {
1100                let blob_id = BlobId::new(*hash, BlobType::CheckpointExecutionState);
1101                match blobs.get(&blob_id) {
1102                    Some(blob) => bytes.extend_from_slice(blob.bytes()),
1103                    None => missing.push(blob_id),
1104                }
1105            }
1106            ensure!(missing.is_empty(), WorkerError::BlobsNotFound(missing));
1107            (
1108                bytes,
1109                block.header.chain_id,
1110                block.header.height,
1111                block.header.previous_block_hash,
1112                outbox_block_hashes.clone(),
1113                inbox_cursors.clone(),
1114            )
1115        };
1116        // Every pre-checkpoint sender block the oracle response names must already
1117        // be in storage before we touch any chain state. If some aren't, record
1118        // their hashes as trusted-by-this-checkpoint and error with `BlocksNotFound`
1119        // — the client uploads each missing cert (the trust-mark path at the top
1120        // of `process_confirmed_block` accepts them regardless of their possibly
1121        // revoked epoch), then retries the checkpoint. This way the worker never
1122        // exposes a half-restored chain whose outboxes reference unknown blocks.
1123        let mut missing_blocks = Vec::new();
1124        for hash in &outbox_block_hashes {
1125            if !self.storage.contains_certificate(*hash).await? {
1126                missing_blocks.push(*hash);
1127            }
1128        }
1129        if !missing_blocks.is_empty() {
1130            for hash in &missing_blocks {
1131                self.chain.pre_checkpoint_block_trust.insert(hash)?;
1132            }
1133            self.save().await?;
1134            return Err(WorkerError::BlocksNotFound(missing_blocks));
1135        }
1136        self.chain
1137            .execution_state
1138            .restore_from_content(&bytes)
1139            .await?;
1140        // `restore_from_content` writes directly to storage and leaves the
1141        // in-memory view in an undefined state — reload from storage.
1142        self.chain = self.storage.load_chain(chain_id).await?;
1143        // Re-populate `block_hashes` for every pre-checkpoint sender block the
1144        // chain still needs. The heights live in the just-restored execution
1145        // state (`unfinalized_message_blocks`); the matching hashes are the
1146        // ones the producer recorded in the oracle response, certified by the
1147        // checkpoint cert we already trust. Without this, the next step
1148        // (re-executing the checkpoint to verify its outcome) would fail
1149        // because `collect_unfinalized_block_hashes` looks these up.
1150        let heights = self.chain.collect_unfinalized_heights().await?;
1151        ensure!(
1152            heights.len() == outbox_block_hashes.len(),
1153            ChainError::InternalError(format!(
1154                "checkpoint oracle response has {} outbox block hashes but the \
1155                 restored state references {} distinct heights",
1156                outbox_block_hashes.len(),
1157                heights.len(),
1158            ))
1159        );
1160        for (height, hash) in heights.into_iter().zip(outbox_block_hashes) {
1161            self.chain.block_hashes.insert(&height, hash)?;
1162        }
1163        // Rebuild the off-chain outbox state (queues, counters,
1164        // nonempty_outboxes) from the on-chain unfinalized map so that this
1165        // node can resume pushing pre-checkpoint messages forward. The outbox
1166        // isn't part of the certified checkpoint blob, so without this a
1167        // bootstrapped validator would silently stop delivering pending
1168        // messages.
1169        let tracked = self.tracked_full_chains();
1170        self.chain
1171            .restore_outboxes_from_unfinalized(tracked.as_deref())
1172            .await?;
1173        for (origin, cursor) in inbox_cursors {
1174            let mut inbox = self.chain.inboxes.try_load_entry_mut(&origin).await?;
1175            inbox.restore_from_checkpoint(cursor).await?;
1176        }
1177        // We reset `execution_state` (via restore), `tip_state`, `block_hashes`
1178        // (for outbox-referenced pre-checkpoint heights), and the outbox views.
1179        // The other `ChainStateView` fields are either (a) already default for
1180        // a fresh bootstrap node (`inboxes`, `received_log`, …), (b) about to
1181        // be overwritten by `apply_confirmed_block` when the cert is applied
1182        // (`manager`, `block_hashes` for height `height`), or (c) outside the
1183        // protocol state hash so divergence from the producer is fine
1184        // (inboxes; subsequent blocks reconcile by anticipation if needed).
1185        // The `num_*` counters on `ChainTipState` are write-only in current
1186        // code, so leaving them at zero has no functional impact.
1187        let new_tip = ChainTipState {
1188            block_hash: previous_block_hash,
1189            next_block_height: height,
1190            ..Default::default()
1191        };
1192        self.chain.tip_state.set(new_tip.clone());
1193        self.save().await?;
1194        self.execute_contiguous_block(
1195            certificate,
1196            blobs,
1197            new_tip,
1198            notify_when_messages_are_delivered,
1199        )
1200        .await
1201    }
1202
1203    /// Executes a confirmed block whose height equals `tip.next_block_height`,
1204    /// updating inboxes, applying the block, and persisting the chain.
1205    async fn execute_contiguous_block(
1206        &mut self,
1207        certificate: ConfirmedBlockCertificate,
1208        mut blobs: BTreeMap<BlobId, Blob>,
1209        tip: ChainTipState,
1210        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1211    ) -> Result<(ChainInfoResponse, NetworkActions, BlockOutcome), WorkerError> {
1212        let block_hash = certificate.hash();
1213        let block = certificate.block();
1214        let chain_id = block.header.chain_id;
1215        let height = block.header.height;
1216
1217        // This should always be true for valid certificates.
1218        ensure!(
1219            tip.block_hash == block.header.previous_block_hash,
1220            WorkerError::InvalidBlockChaining
1221        );
1222
1223        // Verify that the chain is active and that the epoch we used for verifying
1224        // the certificate is actually the active one on the chain.
1225        self.initialize_and_save_if_needed().await?;
1226        let (epoch, _) = self.chain.current_committee().await?;
1227        check_block_epoch(epoch, chain_id, block.header.epoch)?;
1228
1229        let published_blobs = block
1230            .published_blob_ids()
1231            .iter()
1232            .filter_map(|blob_id| blobs.remove(blob_id))
1233            .collect::<Vec<_>>();
1234
1235        let local_time = self.storage.clock().current_time();
1236        if block.header.timestamp.duration_since(local_time) > self.config.block_time_grace_period {
1237            warn!(
1238                block_timestamp = %block.header.timestamp,
1239                %local_time,
1240                "Confirmed block has a timestamp in the future beyond the block time grace period"
1241            );
1242        }
1243        let tracked = self.reconcile_tracked_outboxes().await?;
1244        let chain = &mut self.chain;
1245        chain
1246            .remove_bundles_from_inboxes(
1247                block.header.timestamp,
1248                false,
1249                block.body.incoming_bundles(),
1250            )
1251            .await?;
1252        let confirmed_block = if let Some(mut execution_state) = self
1253            .execution_state_cache
1254            .as_ref()
1255            .and_then(|cache| cache.remove(&block_hash))
1256        {
1257            chain.execution_state = execution_state
1258                .with_context(|ctx| {
1259                    chain
1260                        .execution_state
1261                        .context()
1262                        .clone_with_base_key(ctx.base_key().bytes.clone())
1263                })
1264                .await;
1265            certificate.into_value()
1266        } else {
1267            let (proposed_block, outcome) = certificate.into_value().into_block().into_proposal();
1268            let oracle_responses = Some(outcome.oracle_responses.clone());
1269            let (proposed_block, verified, _resource_tracker, _) = chain
1270                .execute_block(
1271                    proposed_block,
1272                    local_time,
1273                    None,
1274                    &published_blobs,
1275                    oracle_responses,
1276                    BundleExecutionPolicy::committed(),
1277                )
1278                .await?;
1279            // We should always agree on the messages and state hash.
1280            if outcome != verified {
1281                return Err(ChainError::CorruptedChainState(format!(
1282                    "computed block outcome differs from the certificate.\n\
1283                    Computed: {verified:#?}\n\
1284                    Submitted: {outcome:#?}"
1285                ))
1286                .into());
1287            }
1288            ConfirmedBlock::new(Block::new(proposed_block, verified))
1289        };
1290
1291        let updated_streams = chain
1292            .apply_confirmed_block(
1293                &confirmed_block,
1294                local_time,
1295                tracked.as_deref().map(|h| h.inner()),
1296            )
1297            .await?;
1298        let mut actions = self.create_network_actions(None).await?;
1299        trace!("Processed confirmed block {height}");
1300        actions.notifications.push(Notification {
1301            chain_id,
1302            reason: Reason::NewBlock {
1303                height,
1304                hash: block_hash,
1305            },
1306        });
1307        if !updated_streams.is_empty() {
1308            actions.notifications.push(Notification {
1309                chain_id,
1310                reason: Reason::NewEvents {
1311                    height,
1312                    block_hash,
1313                    event_streams: updated_streams,
1314                },
1315            });
1316        }
1317        self.save().await?;
1318
1319        self.block_values
1320            .insert_hashed(Cow::Owned(confirmed_block.into_inner()));
1321
1322        self.register_delivery_notifier(height, &actions, notify_when_messages_are_delivered)
1323            .await;
1324
1325        Ok((
1326            self.chain_info_response().await?,
1327            actions,
1328            BlockOutcome::Processed,
1329        ))
1330    }
1331
1332    /// Schedules a notification for when cross-chain messages are delivered up to the given
1333    /// `height`.
1334    #[instrument(level = "trace", skip(self, notify_when_messages_are_delivered))]
1335    async fn register_delivery_notifier(
1336        &self,
1337        height: BlockHeight,
1338        actions: &NetworkActions,
1339        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
1340    ) {
1341        if let Some(notifier) = notify_when_messages_are_delivered {
1342            if actions
1343                .cross_chain_requests
1344                .iter()
1345                .any(|request| request.has_messages_lower_or_equal_than(height))
1346            {
1347                self.delivery_notifier.register(height, notifier);
1348            } else {
1349                // No need to wait. Also, cross-chain requests may not trigger the
1350                // notifier later, even if we register it.
1351                if let Err(()) = notifier.send(()) {
1352                    debug!("Failed to notify message delivery to caller (early case)");
1353                }
1354            }
1355        }
1356    }
1357
1358    /// Updates the chain's inboxes, receiving messages from a cross-chain update.
1359    #[instrument(level = "debug", skip(self, bundles), fields(chain_id = %self.chain_id()))]
1360    pub(crate) async fn process_cross_chain_update(
1361        &mut self,
1362        origin: ChainId,
1363        bundles: Vec<(Epoch, MessageBundle)>,
1364        sender_previous_height: Option<BlockHeight>,
1365    ) -> Result<CrossChainUpdateResult, WorkerError> {
1366        // Only process certificates with relevant heights and epochs.
1367        let mut inbox = self.chain.inboxes.try_load_entry_mut(&origin).await?;
1368        let next_height_to_receive = inbox.next_block_height_to_receive()?;
1369        let last_anticipated_block_height = inbox
1370            .removed_bundles
1371            .back()
1372            .await?
1373            .map(|bundle| bundle.height);
1374
1375        // Proactive gap detection: if the sender declares a predecessor height that
1376        // we haven't received yet, the inbox has a gap.
1377        if let Some(prev) = sender_previous_height {
1378            if prev >= next_height_to_receive {
1379                let chain_id = self.chain_id();
1380                if self.config.allow_revert_confirm && self.config.recovery_allowed_for(&chain_id) {
1381                    warn!(
1382                        %chain_id,
1383                        "Inbox gap detected from {origin}: \
1384                        sender declares previous height {prev} but we only have up to \
1385                        {next_height_to_receive}; requesting resend",
1386                    );
1387                    return Ok(CrossChainUpdateResult::GapDetected {
1388                        origin,
1389                        retransmit_from: next_height_to_receive,
1390                    });
1391                }
1392                return Err(ChainError::InboxGapDetected {
1393                    chain_id,
1394                    origin,
1395                    expected_height: prev,
1396                    actual_height: bundles.first().map(|(_, b)| b.height).unwrap_or_default(),
1397                }
1398                .into());
1399            }
1400        }
1401
1402        let bundles = self
1403            .select_message_bundles(
1404                &origin,
1405                next_height_to_receive,
1406                last_anticipated_block_height,
1407                bundles,
1408            )
1409            .await?;
1410        let Some(last_updated_height) = bundles.last().map(|bundle| bundle.height) else {
1411            return Ok(CrossChainUpdateResult::NothingToDo);
1412        };
1413        // Process the received messages in certificates.
1414        let local_time = self.storage.clock().current_time();
1415        let mut previous_height = None;
1416        for bundle in bundles {
1417            let add_to_received_log = previous_height != Some(bundle.height);
1418            previous_height = Some(bundle.height);
1419            // Update the staged chain state with the received block.
1420            self.chain
1421                .receive_message_bundle_with_inbox(
1422                    &mut inbox,
1423                    &origin,
1424                    bundle,
1425                    local_time,
1426                    add_to_received_log,
1427                )
1428                .await?;
1429        }
1430        inbox.observe_size_metric();
1431        drop(inbox);
1432        if !self.config.allow_inactive_chains && !self.chain.is_active().await? {
1433            // Refuse to create a chain state if the chain is still inactive by
1434            // now. Accordingly, do not send a confirmation, so that the
1435            // cross-chain update is retried later.
1436            warn!(
1437                chain_id = %self.chain_id(),
1438                "Refusing to deliver messages from {origin} \
1439                at height {last_updated_height} because the recipient is still inactive",
1440            );
1441            return Ok(CrossChainUpdateResult::NothingToDo);
1442        }
1443        Ok(CrossChainUpdateResult::Updated(last_updated_height))
1444    }
1445
1446    /// Handles the cross-chain request confirming that the recipient was updated.
1447    #[instrument(skip_all, fields(
1448        chain_id = %self.chain_id(),
1449        %recipient,
1450        %latest_height
1451    ))]
1452    pub(crate) async fn confirm_updated_recipient(
1453        &mut self,
1454        recipient: ChainId,
1455        latest_height: BlockHeight,
1456    ) -> Result<bool, WorkerError> {
1457        // Reconcile the outbox indices with the *current* tracked set before draining the counter
1458        // and checking delivery.
1459        let tracked = self.reconcile_tracked_outboxes().await?;
1460        // The indices are now reconciled to the tracked set, so `all_messages_delivered_up_to`
1461        // (the `outbox_counters` fast path) is already the complete answer for tracked chains.
1462        Ok(self
1463            .chain
1464            .mark_messages_as_received(
1465                &recipient,
1466                latest_height,
1467                tracked.as_deref().map(|h| h.inner()),
1468            )
1469            .await?
1470            && self.chain.all_messages_delivered_up_to(latest_height))
1471    }
1472
1473    /// Notifies delivery waiters that all messages up to `height` have been delivered.
1474    pub(crate) fn notify_delivery(&self, height: BlockHeight) {
1475        self.delivery_notifier.notify(height);
1476    }
1477
1478    /// Processes a batch of cross-chain requests, performing at most one `save()`.
1479    ///
1480    /// Both update and confirmation requests are handled together so that a
1481    /// single write-lock acquisition covers all pending work for the chain.
1482    pub(crate) async fn process_batch(&mut self, requests: Vec<BatchRequest>) {
1483        let mut update_results = Vec::new();
1484        let mut confirm_results = Vec::new();
1485        let mut need_save = false;
1486        let mut need_rollback = false;
1487        let mut max_delivered_height: Option<BlockHeight> = None;
1488
1489        for request in requests {
1490            match request {
1491                BatchRequest::Update {
1492                    origin,
1493                    bundles,
1494                    previous_height,
1495                    result_sender,
1496                } => {
1497                    if need_rollback {
1498                        send_result(result_sender, Err(WorkerError::BatchRolledBack));
1499                        continue;
1500                    }
1501                    let result = self
1502                        .process_cross_chain_update(origin, bundles, previous_height)
1503                        .await;
1504                    let update_result = match result {
1505                        Ok(update_result) => update_result,
1506                        Err(error) => {
1507                            need_rollback = true;
1508                            send_result(result_sender, Err(error));
1509                            continue;
1510                        }
1511                    };
1512                    match &update_result {
1513                        CrossChainUpdateResult::Updated(_) => need_save = true,
1514                        CrossChainUpdateResult::GapDetected { .. }
1515                        | CrossChainUpdateResult::NothingToDo => {}
1516                    }
1517                    update_results.push((result_sender, update_result));
1518                }
1519                BatchRequest::Confirm {
1520                    recipient,
1521                    latest_height,
1522                    result_sender,
1523                } => {
1524                    if need_rollback {
1525                        send_result(result_sender, Err(WorkerError::BatchRolledBack));
1526                        continue;
1527                    }
1528                    match self
1529                        .confirm_updated_recipient(recipient, latest_height)
1530                        .await
1531                    {
1532                        Ok(fully_delivered) => {
1533                            need_save = true;
1534                            if fully_delivered {
1535                                max_delivered_height = Some(
1536                                    max_delivered_height
1537                                        .map_or(latest_height, |h| h.max(latest_height)),
1538                                );
1539                            }
1540                            confirm_results.push((result_sender, recipient));
1541                        }
1542                        Err(error) => {
1543                            need_rollback = true;
1544                            send_result(result_sender, Err(error));
1545                        }
1546                    }
1547                }
1548            }
1549        }
1550        if !need_rollback && need_save {
1551            if let Err(error) = self.save().await {
1552                tracing::error!(%error, "failed to save batch; rolling back");
1553                need_rollback = true;
1554            }
1555        }
1556        if need_rollback {
1557            for (result_sender, _) in update_results {
1558                send_result(result_sender, Err(WorkerError::BatchRolledBack));
1559            }
1560            for (result_sender, _) in confirm_results {
1561                send_result(result_sender, Err(WorkerError::BatchRolledBack));
1562            }
1563            return;
1564        }
1565
1566        if let Some(height) = max_delivered_height {
1567            self.notify_delivery(height);
1568        }
1569
1570        for (result_sender, update_result) in update_results {
1571            send_result(result_sender, Ok(update_result));
1572        }
1573        for (result_sender, recipient) in confirm_results {
1574            let result = self
1575                .create_cross_chain_actions_for_recipient(recipient)
1576                .await;
1577            send_result(result_sender, result);
1578        }
1579    }
1580
1581    /// Handles a `RevertConfirm` request: walks backward through
1582    /// `previous_message_blocks` to find all block heights that sent messages to
1583    /// `recipient` starting from the latest down to `retransmit_from`, re-adds them
1584    /// to the outbox, and creates cross-chain update actions to resend the bundles.
1585    #[instrument(skip_all, fields(
1586        chain_id = %self.chain_id(),
1587        %recipient,
1588        %retransmit_from,
1589    ))]
1590    pub(crate) async fn handle_revert_confirm(
1591        &mut self,
1592        recipient: ChainId,
1593        retransmit_from: BlockHeight,
1594    ) -> Result<NetworkActions, WorkerError> {
1595        self.reconcile_tracked_outboxes().await?;
1596        // 1. Walk backward through previous_message_blocks to collect all heights
1597        //    that sent messages to this recipient, from the latest down to retransmit_from.
1598        let Some(latest_height) = self
1599            .chain
1600            .execution_state
1601            .previous_message_blocks
1602            .get(&recipient)
1603            .await?
1604        else {
1605            warn!("RevertConfirm: no record of sending to {recipient}");
1606            return Ok(NetworkActions::default());
1607        };
1608
1609        let mut heights_to_re_add = Vec::new();
1610        let mut current_height = latest_height;
1611        while current_height >= retransmit_from {
1612            // We arrived at current_height via previous_message_blocks links, starting from the
1613            // chain state and following the links downwards. So these blocks should all be in
1614            // `block_hashes` already.
1615            heights_to_re_add.push(current_height);
1616            // Load the block at current_height to find the previous message block
1617            let hash = match &*self
1618                .chain
1619                .block_hashes_for_heights([current_height])
1620                .await?
1621            {
1622                [hash] => *hash,
1623                _ => {
1624                    return Err(WorkerError::BlockHashNotFound {
1625                        height: current_height,
1626                        chain_id: self.chain_id(),
1627                    })
1628                }
1629            };
1630            let block = self
1631                .read_confirmed_blocks(&[hash])
1632                .await?
1633                .pop()
1634                .flatten()
1635                .ok_or_else(|| WorkerError::LocalBlockNotFound {
1636                    height: current_height,
1637                    chain_id: self.chain_id(),
1638                })?;
1639            match block.block().body.previous_message_blocks.get(&recipient) {
1640                Some((_, prev_height)) if *prev_height >= retransmit_from => {
1641                    current_height = *prev_height;
1642                }
1643                _ => break,
1644            }
1645        }
1646
1647        // 2. Re-add the heights to the outbox.
1648        let new_heights = self
1649            .chain
1650            .outboxes
1651            .try_load_entry_mut(&recipient)
1652            .await?
1653            .revert(&heights_to_re_add)
1654            .await?;
1655
1656        if new_heights.is_empty() {
1657            debug!("RevertConfirm: all heights already in outbox for {recipient}");
1658            return Ok(NetworkActions::default());
1659        }
1660
1661        // 3. Update the indices only for tracked recipients (mirroring `process_outgoing_messages`):
1662        //    an untracked recipient keeps its re-added outbox queue but is not counted or indexed.
1663        let new_heights_len = new_heights.len();
1664        if self.is_tracked(&recipient) {
1665            for h in new_heights {
1666                *self.chain.outbox_counters.get_mut().entry(h).or_default() += 1;
1667            }
1668            self.chain.nonempty_outboxes.get_mut().insert(recipient);
1669        }
1670
1671        // 4. Create cross-chain requests for this recipient.
1672        let actions = self
1673            .create_cross_chain_actions_for_recipient(recipient)
1674            .await?;
1675
1676        // 5. Save chain state.
1677        self.save().await?;
1678
1679        warn!(
1680            "RevertConfirm: re-added {new_heights_len} heights to outbox for {recipient}, \
1681            starting from height {retransmit_from}"
1682        );
1683
1684        Ok(actions)
1685    }
1686
1687    /// If the config enables corruption recovery and the min-duration guard is
1688    /// satisfied, resets the chain state and re-executes all confirmed blocks.
1689    /// Returns `RevertConfirm` requests to dispatch, or `None` if no reset happened.
1690    pub(crate) async fn maybe_reset_corrupted_chain_state(
1691        &mut self,
1692    ) -> Result<Option<Vec<CrossChainRequest>>, WorkerError> {
1693        let Some(min_duration) = self.config.reset_on_corrupted_chain_state else {
1694            return Ok(None);
1695        };
1696        let chain_id = self.chain_id();
1697        if !self.config.recovery_allowed_for(&chain_id) {
1698            return Ok(None);
1699        }
1700        let local_time = self.storage.clock().current_time();
1701        let block_zero_time = *self.chain.block_zero_executed_at.get();
1702        let elapsed = local_time.duration_since(block_zero_time);
1703        if elapsed < min_duration {
1704            warn!(
1705                %chain_id, ?elapsed, ?min_duration,
1706                "Not resetting corrupted chain state; not enough time elapsed \
1707                since last block 0 execution"
1708            );
1709            return Ok(None);
1710        }
1711        warn!(%chain_id, "Corrupted chain state detected; resetting and re-executing");
1712        Ok(Some(self.reset_and_reexecute_chain().await?))
1713    }
1714
1715    /// Resets the chain state completely and re-executes all confirmed blocks from storage.
1716    /// Returns a `RevertConfirm` request for every known sender so they resend cross-chain
1717    /// messages that may have been lost during the reset.
1718    #[instrument(skip_all, fields(
1719        chain_id = %self.chain_id(),
1720    ))]
1721    pub(crate) async fn reset_and_reexecute_chain(
1722        &mut self,
1723    ) -> Result<Vec<CrossChainRequest>, WorkerError> {
1724        let chain_id = self.chain_id();
1725        let tip_height = self.chain.tip_state.get().next_block_height;
1726
1727        // 1. Collect all sender chain IDs and block hashes before clearing.
1728        let sender_ids = self.chain.inboxes.indices().await?;
1729        let block_hashes = self.chain.block_hashes.index_values().await?;
1730
1731        // 2. Snapshot safety-critical manager state so that we cannot be tricked
1732        //    into double-signing if the reset wipes votes we already cast.
1733        let manager_snapshot = ManagerSafetySnapshot::capture(&self.chain.manager).await?;
1734
1735        // 3. Wipe every key under this chain's storage root and reload a fresh
1736        //    `ChainStateView`. `ChainStateView::clear` + `save` would only reset
1737        //    the in-memory view; `HistoricallyHashableView` deliberately keeps
1738        //    its `stored_hash` across clears, so the historical hash would carry
1739        //    over from the pre-reset state and the replayed block outcomes would
1740        //    never match the original certificates' `state_hash`. Deleting the
1741        //    storage prefix and reloading is equivalent to creating the chain
1742        //    from scratch, which is what replay expects.
1743        self.wipe_and_reload_chain().await?;
1744        self.knows_chain_is_active = false;
1745        warn!(
1746            %chain_id,
1747            "Cleared chain state up to height {tip_height}; \
1748            re-executing all blocks"
1749        );
1750
1751        // 4. Re-load certificates one at a time by hash and re-process them.
1752        for (height, hash) in block_hashes {
1753            let cert = self
1754                .storage
1755                .read_certificate(hash)
1756                .await?
1757                .map(CacheArc::unwrap_or_clone)
1758                .ok_or_else(|| WorkerError::LocalBlockNotFound { height, chain_id })?;
1759            Box::pin(self.process_confirmed_block(cert, ProcessConfirmedBlockMode::Execute, None))
1760                .await?;
1761        }
1762
1763        // 5. Restore any previously cast votes and locking block so we cannot be
1764        //    asked to sign a conflicting statement at the same height/round. Votes
1765        //    in the manager always belong to the pending height (one past the tip),
1766        //    so restoring is only meaningful if re-execution landed at the same
1767        //    tip. Otherwise the restored state would refer to a stale pending
1768        //    height and could only break the manager's invariants without any
1769        //    safety benefit — so we drop the snapshot in that case.
1770        let new_tip_height = self.chain.tip_state.get().next_block_height;
1771        if new_tip_height == tip_height {
1772            manager_snapshot.restore(&mut self.chain.manager)?;
1773            self.save().await?;
1774        } else {
1775            warn!(
1776                %tip_height, %new_tip_height,
1777                "Dropping manager snapshot: pre-reset tip differs from post-reset tip"
1778            );
1779        }
1780
1781        // 6. Build RevertConfirm requests so each sender resends messages we may
1782        //    have lost during the reset.
1783        let revert_requests = sender_ids
1784            .into_iter()
1785            .map(|sender| CrossChainRequest::RevertConfirm {
1786                sender,
1787                recipient: chain_id,
1788                retransmit_from: BlockHeight::ZERO,
1789            })
1790            .collect::<Vec<_>>();
1791
1792        warn!(
1793            tip_height = %self.chain.tip_state.get().next_block_height,
1794            num_revert_confirms = revert_requests.len(),
1795            "Chain reset and re-executed; sending RevertConfirm to senders"
1796        );
1797
1798        Ok(revert_requests)
1799    }
1800
1801    #[instrument(skip_all, fields(
1802        chain_id = %self.chain_id(),
1803        num_trackers = %new_trackers.len()
1804    ))]
1805    pub(crate) async fn update_received_certificate_trackers(
1806        &mut self,
1807        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
1808    ) -> Result<(), WorkerError> {
1809        self.chain
1810            .update_received_certificate_trackers(new_trackers);
1811        self.save().await?;
1812        Ok(())
1813    }
1814
1815    /// Returns the preprocessed block hashes in the given height range.
1816    #[instrument(skip_all, fields(
1817        chain_id = %self.chain_id(),
1818        start = %start,
1819        end = %end
1820    ))]
1821    pub(crate) async fn get_preprocessed_block_hashes(
1822        &self,
1823        start: BlockHeight,
1824        end: BlockHeight,
1825    ) -> Result<Vec<CryptoHash>, WorkerError> {
1826        let mut hashes = Vec::new();
1827        let mut height = start;
1828        while height < end {
1829            match self.chain.block_hashes.get(&height).await? {
1830                Some(hash) => hashes.push(hash),
1831                None => break,
1832            }
1833            height = height.try_add_one()?;
1834        }
1835        Ok(hashes)
1836    }
1837
1838    /// Returns the next block height to receive from an inbox.
1839    #[instrument(skip_all, fields(
1840        chain_id = %self.chain_id(),
1841        origin = %origin
1842    ))]
1843    pub(crate) async fn get_inbox_next_height(
1844        &self,
1845        origin: ChainId,
1846    ) -> Result<BlockHeight, WorkerError> {
1847        Ok(match self.chain.inboxes.try_load_entry(&origin).await? {
1848            Some(inbox) => inbox.next_block_height_to_receive()?,
1849            None => BlockHeight::ZERO,
1850        })
1851    }
1852
1853    /// Returns the locking blobs for the given blob IDs.
1854    /// Returns `Ok(None)` if any of the blobs is not found.
1855    #[instrument(skip_all, fields(
1856        chain_id = %self.chain_id(),
1857        num_blob_ids = %blob_ids.len()
1858    ))]
1859    pub(crate) async fn get_locking_blobs(
1860        &self,
1861        blob_ids: Vec<BlobId>,
1862    ) -> Result<Option<Vec<Blob>>, WorkerError> {
1863        let results = self
1864            .chain
1865            .manager
1866            .locking_blobs
1867            .multi_get(&blob_ids)
1868            .await?;
1869        Ok(results.into_iter().collect())
1870    }
1871
1872    /// Gets block hashes for specified heights.
1873    pub(crate) async fn get_block_hashes(
1874        &self,
1875        heights: Vec<BlockHeight>,
1876    ) -> Result<Vec<CryptoHash>, WorkerError> {
1877        Ok(self.chain.block_hashes_for_heights(heights).await?)
1878    }
1879
1880    /// Gets proposed blobs from the manager for specified blob IDs.
1881    pub(crate) async fn get_proposed_blobs(
1882        &self,
1883        blob_ids: Vec<BlobId>,
1884    ) -> Result<Vec<Blob>, WorkerError> {
1885        let results = self
1886            .chain
1887            .manager
1888            .proposed_blobs
1889            .multi_get(&blob_ids)
1890            .await?;
1891        let mut blobs = Vec::with_capacity(blob_ids.len());
1892        let mut missing = Vec::new();
1893        for (blob_id, maybe_blob) in blob_ids.into_iter().zip(results) {
1894            match maybe_blob {
1895                Some(blob) => blobs.push(blob),
1896                None => missing.push(blob_id),
1897            }
1898        }
1899        if !missing.is_empty() {
1900            return Err(WorkerError::BlobsNotFound(missing));
1901        }
1902        Ok(blobs)
1903    }
1904
1905    /// Gets event subscriptions.
1906    pub(crate) async fn get_event_subscriptions(
1907        &self,
1908    ) -> Result<EventSubscriptionsResult, WorkerError> {
1909        Ok(self
1910            .chain
1911            .execution_state
1912            .system
1913            .event_subscriptions
1914            .index_values()
1915            .await?)
1916    }
1917
1918    /// Gets the next expected event index for a stream.
1919    pub(crate) async fn get_next_expected_event(
1920        &self,
1921        stream_id: StreamId,
1922    ) -> Result<Option<u32>, WorkerError> {
1923        Ok(self.chain.next_expected_events.get(&stream_id).await?)
1924    }
1925
1926    /// Gets the `next_expected_events` indices for the given streams.
1927    pub(crate) async fn get_next_expected_events(
1928        &self,
1929        stream_ids: Vec<StreamId>,
1930    ) -> Result<BTreeMap<StreamId, u32>, WorkerError> {
1931        let values = self
1932            .chain
1933            .next_expected_events
1934            .multi_get(&stream_ids)
1935            .await?;
1936        Ok(stream_ids
1937            .into_iter()
1938            .zip(values)
1939            .filter_map(|(id, val)| Some((id, val?)))
1940            .collect())
1941    }
1942
1943    /// Gets received certificate trackers.
1944    pub(crate) async fn get_received_certificate_trackers(
1945        &self,
1946    ) -> Result<HashMap<ValidatorPublicKey, u64>, WorkerError> {
1947        Ok(self.chain.received_certificate_trackers.get().clone())
1948    }
1949
1950    /// Gets tip state and outbox info for next_outbox_heights calculation.
1951    pub(crate) async fn get_tip_state_and_outbox_info(
1952        &self,
1953        receiver_id: ChainId,
1954    ) -> Result<(BlockHeight, Option<BlockHeight>), WorkerError> {
1955        let next_block_height = self.chain.tip_state.get().next_block_height;
1956        let next_height_to_schedule = self
1957            .chain
1958            .outboxes
1959            .try_load_entry(&receiver_id)
1960            .await?
1961            .map(|outbox| *outbox.next_height_to_schedule.get());
1962        Ok((next_block_height, next_height_to_schedule))
1963    }
1964
1965    /// Gets the next height to preprocess.
1966    pub(crate) fn get_next_height_to_preprocess(&self) -> BlockHeight {
1967        *self.chain.next_height_to_preprocess.get()
1968    }
1969
1970    /// Attempts to vote for a leader timeout, if possible.
1971    #[instrument(skip_all, fields(
1972        chain_id = %self.chain_id(),
1973        height = %height,
1974        round = %round
1975    ))]
1976    async fn vote_for_leader_timeout(
1977        &mut self,
1978        height: BlockHeight,
1979        round: Round,
1980    ) -> Result<(), WorkerError> {
1981        let chain = &mut self.chain;
1982        ensure!(
1983            height == chain.tip_state.get().next_block_height,
1984            WorkerError::UnexpectedBlockHeight {
1985                expected_block_height: chain.tip_state.get().next_block_height,
1986                found_block_height: height
1987            }
1988        );
1989        let epoch = chain.execution_state.system.epoch.get();
1990        let chain_id = chain.chain_id();
1991        let key_pair = self.config.key_pair();
1992        let local_time = self.storage.clock().current_time();
1993        if chain
1994            .manager
1995            .create_timeout_vote(chain_id, height, round, *epoch, key_pair, local_time)?
1996        {
1997            self.save().await?;
1998        }
1999        Ok(())
2000    }
2001
2002    /// Votes for falling back to a public chain.
2003    ///
2004    /// Fallback is triggered when the chain is in epoch `e` and epoch `e+1` has been created
2005    /// on the admin chain longer than the configured `fallback_duration` ago.
2006    #[instrument(skip_all, fields(
2007        chain_id = %self.chain_id()
2008    ))]
2009    async fn vote_for_fallback(&mut self) -> Result<(), WorkerError> {
2010        let chain = &mut self.chain;
2011        let epoch = *chain.execution_state.system.epoch.get();
2012        let Some(admin_chain_id) = chain.execution_state.system.admin_chain_id.get() else {
2013            return Ok(());
2014        };
2015
2016        // Check if epoch e+1 exists on the admin chain and when it was created.
2017        let next_epoch_index = epoch.0.saturating_add(1);
2018        let event_id = EventId {
2019            chain_id: *admin_chain_id,
2020            stream_id: StreamId::system(EPOCH_STREAM_NAME),
2021            index: next_epoch_index,
2022        };
2023
2024        let Some(event_bytes) = self.storage.read_event(event_id).await? else {
2025            return Ok(()); // Next epoch doesn't exist yet.
2026        };
2027
2028        let event_data: EpochEventData = bcs::from_bytes(&event_bytes)?;
2029        let elapsed = self
2030            .storage
2031            .clock()
2032            .current_time()
2033            .delta_since(event_data.timestamp);
2034        if elapsed >= chain.ownership().await?.timeout_config.fallback_duration {
2035            let chain_id = chain.chain_id();
2036            let height = chain.tip_state.get().next_block_height;
2037            let key_pair = self.config.key_pair();
2038            if chain
2039                .manager
2040                .vote_fallback(chain_id, height, epoch, key_pair)
2041            {
2042                self.save().await?;
2043            }
2044        }
2045        Ok(())
2046    }
2047
2048    #[instrument(skip_all, fields(
2049        chain_id = %self.chain_id(),
2050        blob_id = %blob.id()
2051    ))]
2052    pub(crate) async fn handle_pending_blob(
2053        &mut self,
2054        blob: Blob,
2055    ) -> Result<ChainInfoResponse, WorkerError> {
2056        let mut was_expected = self
2057            .chain
2058            .pending_validated_blobs
2059            .maybe_insert(&blob)
2060            .await?;
2061        for (_, mut pending_blobs) in self
2062            .chain
2063            .pending_proposed_blobs
2064            .try_load_all_entries_mut()
2065            .await?
2066        {
2067            if !pending_blobs.validated.get() {
2068                let (_, committee) = self.chain.current_committee().await?;
2069                let policy = committee.policy();
2070                policy
2071                    .check_blob_size(blob.content())
2072                    .with_execution_context(ChainExecutionContext::Block)?;
2073                ensure!(
2074                    u64::try_from(pending_blobs.pending_blobs.iterative_count().await?)
2075                        .is_ok_and(|count| count < policy.maximum_published_blobs),
2076                    WorkerError::TooManyPublishedBlobs(policy.maximum_published_blobs)
2077                );
2078            }
2079            was_expected = was_expected || pending_blobs.maybe_insert(&blob).await?;
2080        }
2081        ensure!(was_expected, WorkerError::UnexpectedBlob);
2082        self.save().await?;
2083        self.chain_info_response().await
2084    }
2085
/// Reads the stored certificate of this chain's block at the given [`BlockHeight`].
///
/// Returns `Ok(None)` if no block hash is recorded for `height`. If a hash is recorded
/// but storage has no certificate for it, fails with `WorkerError::BlocksNotFound`.
///
/// `&self` is enough here because the chain is eagerly initialized when the chain
/// handle is created.
#[cfg(with_testing)]
#[instrument(skip_all, fields(
    chain_id = %self.chain_id(),
    height = %height
))]
pub(crate) async fn read_certificate(
    &self,
    height: BlockHeight,
) -> Result<Option<CacheArc<ConfirmedBlockCertificate>>, WorkerError> {
    let Some(certificate_hash) = self.chain.block_hashes.get(&height).await? else {
        return Ok(None);
    };
    let Some(certificate) = self.storage.read_certificate(certificate_hash).await? else {
        return Err(WorkerError::BlocksNotFound(vec![certificate_hash]));
    };
    Ok(Some(certificate))
}
2110
2111    /// Queries an application's state on the chain.
2112    #[instrument(skip_all, fields(
2113        chain_id = %self.chain_id(),
2114        query_application_id = %query.application_id()
2115    ))]
2116    pub(crate) async fn query_application(
2117        &mut self,
2118        query: Query,
2119        block_hash: Option<CryptoHash>,
2120    ) -> Result<(QueryOutcome, BlockHeight), WorkerError> {
2121        self.initialize_and_save_if_needed().await?;
2122        let next_block_height = self.chain.tip_state.get().next_block_height;
2123        let local_time = self.storage.clock().current_time();
2124        // Try to use a cached execution state for the requested block.
2125        // We want to pretend that this block is committed, so we set the next block height.
2126        let cached_state = block_hash
2127            .zip(self.execution_state_cache.as_ref())
2128            .and_then(|(h, cache)| Some(h).zip(cache.remove(&h)));
2129        if let Some((requested_block, mut state)) = cached_state {
2130            let next_block_height = next_block_height
2131                .try_add_one()
2132                .expect("block height to not overflow");
2133            let context = QueryContext {
2134                chain_id: self.chain_id(),
2135                next_block_height,
2136                local_time,
2137            };
2138            let outcome = state
2139                .with_context(|ctx| {
2140                    self.chain
2141                        .execution_state
2142                        .context()
2143                        .clone_with_base_key(ctx.base_key().bytes.clone())
2144                })
2145                .await
2146                .query_application(context, query, self.service_runtime_endpoint.as_mut())
2147                .await
2148                .with_execution_context(ChainExecutionContext::Query)?;
2149            if let Some(cache) = &self.execution_state_cache {
2150                cache.insert(&requested_block, state);
2151            }
2152            Ok((outcome, next_block_height))
2153        } else {
2154            if block_hash.is_some() {
2155                tracing::debug!(
2156                    "requested block hash not found in cache, querying committed state"
2157                );
2158            }
2159            let outcome = self
2160                .chain
2161                .query_application(local_time, query, self.service_runtime_endpoint.as_mut())
2162                .await?;
2163            Ok((outcome, next_block_height))
2164        }
2165    }
2166
2167    /// Returns an application's description by reading the blob directly from storage.
2168    ///
2169    /// Does not track blob usage (which requires `&mut self`), making it safe for
2170    /// concurrent reads. Blob tracking is only relevant during block execution and is
2171    /// always rolled back for read-only queries.
2172    #[instrument(skip_all, fields(
2173        chain_id = %self.chain_id(),
2174        application_id = %application_id
2175    ))]
2176    pub(crate) async fn describe_application_readonly(
2177        &self,
2178        application_id: ApplicationId,
2179    ) -> Result<ApplicationDescription, WorkerError> {
2180        let blob_id = application_id.description_blob_id();
2181        let blob = self
2182            .storage
2183            .read_blob(blob_id)
2184            .await?
2185            .ok_or(WorkerError::BlobsNotFound(vec![blob_id]))?;
2186        Ok(bcs::from_bytes(blob.bytes())?)
2187    }
2188
2189    /// Executes a block without persisting any changes to the state, with a specified
2190    /// policy for handling bundle failures.
2191    ///
2192    /// The block may be modified to reflect the actual executed transactions
2193    /// (bundles may be rejected or removed based on the policy).
2194    #[instrument(skip_all, fields(
2195        chain_id = %self.chain_id(),
2196        block_height = %block.height
2197    ))]
2198    pub(crate) async fn stage_block_execution(
2199        &mut self,
2200        block: ProposedBlock,
2201        round: Option<u32>,
2202        published_blobs: &[Blob],
2203        policy: BundleExecutionPolicy,
2204    ) -> Result<
2205        (
2206            ProposedBlock,
2207            Block,
2208            ChainInfoResponse,
2209            ResourceTracker,
2210            HashSet<ChainId>,
2211        ),
2212        WorkerError,
2213    > {
2214        self.initialize_and_save_if_needed().await?;
2215        let local_time = self.storage.clock().current_time();
2216        let (_, committee) = self.chain.current_committee().await?;
2217        block.check_proposal_size(committee.policy().maximum_block_proposal_size)?;
2218
2219        self.chain
2220            .remove_bundles_from_inboxes(block.timestamp, true, block.incoming_bundles())
2221            .await?;
2222        let (executed_block, resource_tracker, never_reject_origins) =
2223            Box::pin(self.execute_block(block, local_time, round, published_blobs, policy)).await?;
2224
2225        // No need to sign: only used internally.
2226        let info = ChainInfo::from_chain_view(&mut self.chain).await?;
2227        let mut response = ChainInfoResponse::new(info, None);
2228        if let Some(owner) = executed_block.header.authenticated_owner {
2229            response.info.requested_owner_balance = self
2230                .chain
2231                .execution_state
2232                .system
2233                .balances
2234                .get(&owner)
2235                .await?;
2236        }
2237
2238        let (proposed_block, _) = executed_block.clone().into_proposal();
2239        Ok((
2240            proposed_block,
2241            executed_block,
2242            response,
2243            resource_tracker,
2244            never_reject_origins,
2245        ))
2246    }
2247
2248    /// Validates and executes a block proposed to extend this chain.
2249    ///
2250    /// Returns network actions alongside the result so the caller can dispatch them
2251    /// even when the proposal is rejected: a `HasIncompatibleConfirmedVote` rejection
2252    /// can still advance `current_round` via `update_signed_proposal`, and subscribers
2253    /// need the resulting `NewRound` notification.
2254    #[instrument(skip_all, fields(
2255        chain_id = %self.chain_id(),
2256        block_height = %proposal.content.block.height
2257    ))]
2258    pub(crate) async fn handle_block_proposal(
2259        &mut self,
2260        proposal: BlockProposal,
2261    ) -> (Result<ChainInfoResponse, WorkerError>, NetworkActions) {
2262        let old_round = self.chain.manager.current_round();
2263        match self.try_handle_block_proposal(proposal).await {
2264            Ok((response, actions)) => (Ok(response), actions),
2265            Err(err) => {
2266                // Even on error, the manager's `current_round` may have advanced
2267                // (the `HasIncompatibleConfirmedVote` recovery path calls
2268                // `update_signed_proposal`). Surface the resulting `NewRound`
2269                // notification so subscribers can react.
2270                let actions = if self.chain.manager.current_round() != old_round {
2271                    self.create_network_actions(Some(old_round))
2272                        .await
2273                        .unwrap_or_default()
2274                } else {
2275                    NetworkActions::default()
2276                };
2277                (Err(err), actions)
2278            }
2279        }
2280    }
2281
    /// Runs the checks and execution for a block proposal, returning the chain info
    /// response together with the resulting network actions on success.
    ///
    /// Steps, in order: initialize the chain if needed; check the proposal's
    /// invariants and signature; check block chaining, epoch, proposal size and that
    /// the owner may propose in this round; validate the `original_proposal` (if any);
    /// ask the chain manager whether to accept or skip the proposal; record the signed
    /// proposal; load the proposal's blobs; check the block timestamp; execute the
    /// block (unless the proposal already carries an outcome); roll back the chain
    /// view; create and cache the vote; save; and build the network actions.
    ///
    /// The chain state may have been saved even when an error is returned (see the
    /// `HasIncompatibleConfirmedVote` handling below).
    async fn try_handle_block_proposal(
        &mut self,
        proposal: BlockProposal,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        self.initialize_and_save_if_needed().await?;
        proposal
            .check_invariants()
            .map_err(|msg| WorkerError::InvalidBlockProposal(msg.to_string()))?;
        proposal.check_signature()?;
        let owner = proposal.owner();
        let BlockProposal {
            content,
            original_proposal,
            signature: _,
        } = &proposal;
        let block = &content.block;
        let chain = &self.chain;
        // Check if the chain is ready for this new block proposal.
        chain.tip_state.get().verify_block_chaining(block)?;
        // Check the epoch.
        let (epoch, committee) = chain.current_committee().await?;
        check_block_epoch(epoch, block.chain_id, block.epoch)?;
        let policy = committee.policy().clone();
        block.check_proposal_size(policy.maximum_block_proposal_size)?;
        // Check the authentication of the block.
        ensure!(
            chain.manager.can_propose(&owner, proposal.content.round),
            WorkerError::InvalidOwner
        );
        // Remember the round before any manager update, so that the network actions
        // created at the end can be computed relative to it.
        let old_round = self.chain.manager.current_round();
        match original_proposal {
            None => {
                if let Some(signer) = block.authenticated_owner {
                    // Check the authentication of the operations in the new block.
                    ensure!(signer == owner, WorkerError::InvalidSigner(owner));
                }
            }
            Some(OriginalProposal::Regular { certificate }) => {
                // Verify that this block has been validated by a quorum before.
                certificate.check(&committee)?;
            }
            Some(OriginalProposal::Fast(signature)) => {
                // Rebuild the fast-round proposal for the same block so that its
                // owner and signature can be checked.
                let original_proposal = BlockProposal {
                    content: ProposalContent {
                        block: content.block.clone(),
                        round: Round::Fast,
                        outcome: None,
                    },
                    signature: *signature,
                    original_proposal: None,
                };
                let super_owner = original_proposal.owner();
                ensure!(
                    chain
                        .manager
                        .ownership
                        .get()
                        .super_owners
                        .contains(&super_owner),
                    WorkerError::InvalidOwner
                );
                if let Some(signer) = block.authenticated_owner {
                    // Check the authentication of the operations in the new block.
                    // NOTE(review): the `None` branch above reports `InvalidSigner(owner)`
                    // whereas this one reports `InvalidSigner(signer)` — confirm which
                    // value the variant is meant to carry.
                    ensure!(signer == super_owner, WorkerError::InvalidSigner(signer));
                }
                original_proposal.check_signature()?;
            }
        }
        let local_time = self.storage.clock().current_time();
        match chain.manager.check_proposed_block(&proposal) {
            Ok(manager::Outcome::Skip) => {
                // We already voted for this block.
                return Ok((self.chain_info_response().await?, NetworkActions::default()));
            }
            Ok(manager::Outcome::Accept) => {}
            Err(err) => {
                // A `HasIncompatibleConfirmedVote` rejection means the proposer is at a
                // round we'd otherwise be happy to sign at; only our prior confirmed vote
                // prevents us from voting. Record the proposal so `current_round` still
                // tracks the round the proposer is in — without it, the chain can wedge.
                // Other rejections (e.g. `WrongRound`, `InsufficientRound`) mean the
                // proposal is not actually valid for the chain's current state, so we
                // shouldn't let it advance our round.
                if matches!(err, ChainError::HasIncompatibleConfirmedVote(_, _))
                    && self
                        .chain
                        .manager
                        .update_signed_proposal(&proposal, local_time)
                {
                    self.save().await?;
                }
                return Err(err.into());
            }
        }

        // Make sure we remember that a proposal was signed, to determine the correct round to
        // propose in.
        if self
            .chain
            .manager
            .update_signed_proposal(&proposal, local_time)
        {
            self.save().await?;
        }

        let published_blobs = self.load_proposal_blobs(&proposal).await?;
        let ProposalContent {
            block,
            round,
            outcome,
        } = content;

        // The timestamp bound is only enforced when this worker has a key pair configured.
        if self.config.key_pair().is_some()
            && block.timestamp.duration_since(local_time) > self.config.block_time_grace_period
        {
            return Err(WorkerError::InvalidTimestamp {
                local_time,
                block_timestamp: block.timestamp,
                block_time_grace_period: self.config.block_time_grace_period,
            });
        }
        // Note: WorkerState::handle_block_proposal delays processing proposals with future
        // timestamps (within the grace period) before acquiring the chain lock. By the time
        // we reach here, the block timestamp should be in the past or very close to current time.

        self.chain
            .remove_bundles_from_inboxes(block.timestamp, true, block.incoming_bundles())
            .await?;
        // If the proposal already carries an execution outcome, combine it with the
        // proposed block instead of executing the block here.
        let block = if let Some(outcome) = outcome {
            outcome.clone().with(proposal.content.block.clone())
        } else {
            let (executed_block, _resource_tracker, _) = Box::pin(self.execute_block(
                block.clone(),
                local_time,
                round.multi_leader(),
                &published_blobs,
                BundleExecutionPolicy::committed(),
            ))
            .await?;
            executed_block
        };

        ensure!(
            !round.is_fast() || !block.has_oracle_responses(),
            WorkerError::FastBlockUsingOracles
        );
        let chain = &mut self.chain;
        // Check if the counters of tip_state would be valid.
        chain
            .tip_state
            .get_mut()
            .update_counters(&block.body.transactions, &block.body.messages)?;
        // Don't save the changes since the block is not confirmed yet.
        chain.rollback();

        // Create the vote and store it in the chain state.
        let blobs = self
            .get_required_blobs(proposal.expected_blob_ids(), block.created_blobs())
            .await?;
        let key_pair = self.config.key_pair();
        let manager = &mut self.chain.manager;
        match manager.create_vote(&proposal, block, key_pair, local_time, blobs)? {
            // Cache the value we voted on, so the client doesn't have to send it again.
            Some(Either::Left(vote)) => {
                self.block_values
                    .insert_hashed(Cow::Borrowed(vote.value.inner()));
            }
            Some(Either::Right(vote)) => {
                self.block_values
                    .insert_hashed(Cow::Borrowed(vote.value.inner()));
            }
            None => (),
        }
        // Persist the manager state (including any vote created above).
        self.save().await?;
        let actions = self.create_network_actions(Some(old_round)).await?;
        Ok((self.chain_info_response().await?, actions))
    }
2459
2460    /// Prepares a [`ChainInfoResponse`] for a [`ChainInfoQuery`].
2461    #[instrument(skip_all, fields(
2462        chain_id = %self.chain_id()
2463    ))]
2464    async fn prepare_chain_info_response(
2465        &mut self,
2466        query: ChainInfoQuery,
2467    ) -> Result<ChainInfoResponse, WorkerError> {
2468        self.initialize_and_save_if_needed().await?;
2469        let mut info = ChainInfo::from_chain_view(&mut self.chain).await?;
2470        let chain = &self.chain;
2471        if query.request_owner_balance == AccountOwner::CHAIN {
2472            info.requested_owner_balance = Some(*chain.execution_state.system.balance.get());
2473        } else {
2474            info.requested_owner_balance = chain
2475                .execution_state
2476                .system
2477                .balances
2478                .get(&query.request_owner_balance)
2479                .await?;
2480        }
2481        if let Some(next_block_height) = query.test_next_block_height {
2482            // If not, send the same error as if a block with next_block_height was proposed.
2483            ensure!(
2484                chain.tip_state.get().next_block_height == next_block_height,
2485                WorkerError::UnexpectedBlockHeight {
2486                    expected_block_height: chain.tip_state.get().next_block_height,
2487                    found_block_height: next_block_height,
2488                }
2489            );
2490        }
2491        if query.request_pending_message_bundles {
2492            let mut bundles = Vec::new();
2493            let nonempty_origins: Vec<ChainId> =
2494                chain.nonempty_inboxes.get().iter().copied().collect();
2495            #[cfg(with_metrics)]
2496            metrics::NUM_INBOXES
2497                .with_label_values(&[])
2498                .observe(nonempty_origins.len() as f64);
2499            let action = if *chain.execution_state.system.closed.get() {
2500                MessageAction::Reject
2501            } else {
2502                MessageAction::Accept
2503            };
2504            let inboxes = chain.inboxes.try_load_entries(&nonempty_origins).await?;
2505            for (origin, inbox) in nonempty_origins.into_iter().zip(inboxes) {
2506                let inbox = inbox.ok_or_else(|| {
2507                    ChainError::InternalError(format!("Missing inbox for origin {origin}"))
2508                })?;
2509                for bundle in inbox.added_bundles.elements().await? {
2510                    bundles.push(IncomingBundle {
2511                        origin,
2512                        bundle,
2513                        action,
2514                    });
2515                }
2516            }
2517            info.requested_pending_message_bundles = bundles;
2518        }
2519        let hashes = chain
2520            .block_hashes_for_heights(query.request_sent_certificate_hashes_by_heights)
2521            .await?;
2522        info.requested_sent_certificate_hashes = hashes;
2523        if let Some(start) = query.request_received_log_excluding_first_n {
2524            let start = usize::try_from(start).map_err(|_| ArithmeticError::Overflow)?;
2525            let max_received_log_entries = self.config.chain_info_max_received_log_entries;
2526            let end = start
2527                .saturating_add(max_received_log_entries)
2528                .min(chain.received_log.count());
2529            info.requested_received_log = chain.received_log.read(start..end).await?;
2530        }
2531        if query.request_manager_values {
2532            info.manager.add_values(&chain.manager);
2533        }
2534        if !query.request_previous_event_blocks.is_empty() {
2535            let stream_ids = query.request_previous_event_blocks;
2536            let heights = chain
2537                .execution_state
2538                .previous_event_blocks
2539                .multi_get(&stream_ids)
2540                .await?;
2541            let mut streams_with_heights = Vec::new();
2542            for (stream_id, height) in stream_ids.into_iter().zip(heights) {
2543                if let Some(height) = height {
2544                    streams_with_heights.push((stream_id, height));
2545                }
2546            }
2547            let hashes = chain
2548                .block_hashes
2549                .multi_get(streams_with_heights.iter().map(|(_, height)| height))
2550                .await?;
2551            for (maybe_hash, (stream_id, height)) in hashes.into_iter().zip(streams_with_heights) {
2552                let hash = maybe_hash.ok_or_else(|| WorkerError::BlockHashNotFound {
2553                    height,
2554                    chain_id: info.chain_id,
2555                })?;
2556                info.requested_previous_event_blocks
2557                    .insert(stream_id, (height, hash));
2558            }
2559        }
2560        if query.request_latest_checkpoint_height {
2561            info.requested_latest_checkpoint_height = *self.chain.latest_checkpoint_height.get();
2562        }
2563        Ok(ChainInfoResponse::new(info, self.config.key_pair()))
2564    }
2565
2566    /// Executes a block with a specified policy for handling bundle failures.
2567    ///
2568    /// The block may be modified to reflect the actual executed transactions.
2569    #[instrument(skip_all, fields(
2570        chain_id = %self.chain_id(),
2571        block_height = %block.height
2572    ))]
2573    async fn execute_block(
2574        &mut self,
2575        block: ProposedBlock,
2576        local_time: Timestamp,
2577        round: Option<u32>,
2578        published_blobs: &[Blob],
2579        policy: BundleExecutionPolicy,
2580    ) -> Result<(Block, ResourceTracker, HashSet<ChainId>), WorkerError> {
2581        let (proposed_block, outcome, resource_tracker, never_reject_origins) = Box::pin(
2582            self.chain
2583                .execute_block(block, local_time, round, published_blobs, None, policy),
2584        )
2585        .await?;
2586        let executed_block = Block::new(proposed_block, outcome);
2587        let block_hash = CryptoHash::new(&executed_block);
2588        if let Some(cache) = &self.execution_state_cache {
2589            cache.insert(
2590                &block_hash,
2591                Box::pin(
2592                    self.chain
2593                        .execution_state
2594                        .with_context(|ctx| InactiveContext(ctx.base_key().clone())),
2595                )
2596                .await,
2597            );
2598        }
2599        Ok((executed_block, resource_tracker, never_reject_origins))
2600    }
2601
2602    /// Initializes and saves the current chain if it is not active yet.
2603    #[instrument(skip_all, fields(
2604        chain_id = %self.chain_id()
2605    ))]
2606    pub(crate) async fn initialize_and_save_if_needed(&mut self) -> Result<(), WorkerError> {
2607        if !self.knows_chain_is_active {
2608            let local_time = self.storage.clock().current_time();
2609            self.chain.initialize_if_needed(local_time).await?;
2610            self.save().await?;
2611            self.knows_chain_is_active = true;
2612        }
2613        Ok(())
2614    }
2615
2616    pub(crate) async fn chain_info_response(&mut self) -> Result<ChainInfoResponse, WorkerError> {
2617        let info = ChainInfo::from_chain_view(&mut self.chain).await?;
2618        Ok(ChainInfoResponse::new(info, self.config.key_pair()))
2619    }
2620
2621    /// Stores the chain state in persistent storage.
2622    ///
2623    /// If the save fails, the worker is marked as poisoned and must be reloaded.
2624    #[instrument(skip_all, fields(
2625        chain_id = %self.chain_id()
2626    ))]
2627    pub(crate) async fn save(&mut self) -> Result<(), WorkerError> {
2628        if let Err(error) = self.chain.save().await {
2629            if error.must_reload_view() {
2630                tracing::error!(
2631                    ?error,
2632                    chain_id = %self.chain_id(),
2633                    "Chain save failed with a nonrecoverable error; marking worker as poisoned"
2634                );
2635                self.poisoned = true;
2636            }
2637            return Err(WorkerError::ViewError(error));
2638        }
2639        Ok(())
2640    }
2641
2642    /// Deletes every key under this chain's storage root and replaces `self.chain`
2643    /// with a freshly loaded (empty) view. If either the write or the reload fails,
2644    /// the worker is marked as poisoned: the partial deletion may have left
2645    /// storage inconsistent with the in-memory view, so it cannot be reused.
2646    #[instrument(skip_all, fields(
2647        chain_id = %self.chain_id()
2648    ))]
2649    async fn wipe_and_reload_chain(&mut self) -> Result<(), WorkerError> {
2650        let context = self.chain.context().clone();
2651        let mut batch = Batch::new();
2652        batch.delete_key_prefix(Vec::new());
2653        if let Err(error) = context.store().write_batch(batch).await {
2654            tracing::error!(
2655                ?error,
2656                chain_id = %self.chain_id(),
2657                "Wiping chain storage failed; marking worker as poisoned"
2658            );
2659            self.poisoned = true;
2660            return Err(WorkerError::PoisonedWorker);
2661        }
2662        match ChainStateView::load(context).await {
2663            Ok(chain) => {
2664                self.chain = chain;
2665                Ok(())
2666            }
2667            Err(error) => {
2668                tracing::error!(
2669                    ?error,
2670                    chain_id = %self.chain_id(),
2671                    "Reloading chain after wipe failed; marking worker as poisoned"
2672                );
2673                self.poisoned = true;
2674                Err(WorkerError::PoisonedWorker)
2675            }
2676        }
2677    }
2678}
2679
2680/// Sends a result through a oneshot channel, logging at `debug` level if the
2681/// receiver has been dropped.
2682pub(crate) fn send_result<T>(sender: oneshot::Sender<T>, value: T) {
2683    if sender.send(value).is_err() {
2684        tracing::debug!("cannot send cross-chain result; receiver dropped");
2685    }
2686}
2687
2688/// Returns the missing indices and corresponding blob_ids.
2689fn missing_indices_blob_ids(maybe_blobs: &[(BlobId, Option<Blob>)]) -> (Vec<usize>, Vec<BlobId>) {
2690    let mut missing_indices = Vec::new();
2691    let mut missing_blob_ids = Vec::new();
2692    for (index, (blob_id, blob)) in maybe_blobs.iter().enumerate() {
2693        if blob.is_none() {
2694            missing_indices.push(index);
2695            missing_blob_ids.push(*blob_id);
2696        }
2697    }
2698    (missing_indices, missing_blob_ids)
2699}
2700
2701/// Returns the blob IDs whose corresponding value is `None`.
2702fn missing_blob_ids<'a>(
2703    maybe_blobs: impl IntoIterator<Item = (&'a BlobId, &'a Option<Blob>)>,
2704) -> Vec<BlobId> {
2705    maybe_blobs
2706        .into_iter()
2707        .filter(|(_, maybe_blob)| maybe_blob.is_none())
2708        .map(|(blob_id, _)| *blob_id)
2709        .collect()
2710}
2711
2712/// Returns an error if the block is not at the expected epoch.
2713fn check_block_epoch(
2714    chain_epoch: Epoch,
2715    block_chain: ChainId,
2716    block_epoch: Epoch,
2717) -> Result<(), WorkerError> {
2718    ensure!(
2719        block_epoch == chain_epoch,
2720        WorkerError::InvalidEpoch {
2721            chain_id: block_chain,
2722            epoch: block_epoch,
2723            chain_epoch
2724        }
2725    );
2726    Ok(())
2727}