linera_core/chain_worker/state/
mod.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! The state and functionality of a chain worker.
5
6mod attempted_changes;
7mod temporary_changes;
8
9use std::{
10    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
11    iter,
12    sync::{self, Arc},
13};
14
15use linera_base::{
16    crypto::{CryptoHash, ValidatorPublicKey},
17    data_types::{ApplicationDescription, Blob, BlockHeight, Epoch},
18    ensure,
19    hashed::Hashed,
20    identifiers::{ApplicationId, BlobId, BlobType, ChainId},
21};
22use linera_chain::{
23    data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
24    manager,
25    types::{Block, ConfirmedBlockCertificate, TimeoutCertificate, ValidatedBlockCertificate},
26    ChainError, ChainStateView,
27};
28use linera_execution::{ExecutionStateView, Query, QueryOutcome, ServiceRuntimeEndpoint};
29use linera_storage::{Clock as _, ResultReadCertificates, Storage};
30use linera_views::views::ClonableView;
31use tokio::sync::{oneshot, OwnedRwLockReadGuard, RwLock, RwLockWriteGuard};
32use tracing::{instrument, warn};
33
34#[cfg(test)]
35pub(crate) use self::attempted_changes::CrossChainUpdateHelper;
36use self::{
37    attempted_changes::ChainWorkerStateWithAttemptedChanges,
38    temporary_changes::ChainWorkerStateWithTemporaryChanges,
39};
40use super::{ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier};
41use crate::{
42    data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
43    value_cache::ValueCache,
44    worker::{NetworkActions, WorkerError},
45};
46
/// The state of the chain worker.
///
/// Holds the chain view loaded from storage together with the caches and handles that the
/// request handlers in this module (and its `attempted_changes` / `temporary_changes`
/// submodules) operate on.
pub struct ChainWorkerState<StorageClient>
where
    StorageClient: Storage + Clone + Send + Sync + 'static,
{
    /// Worker configuration; this module reads `key_pair()` from it when building responses.
    config: ChainWorkerConfig,
    /// Storage client used to load the chain and to read blobs, certificates and the clock.
    storage: StorageClient,
    /// The chain state view loaded by [`ChainWorkerState::load`].
    chain: ChainStateView<StorageClient::Context>,
    /// Lazily created clone of `chain` handed out to readers by `chain_state_view`; reset to
    /// `None` by `clear_shared_chain_view`.
    shared_chain_view: Option<Arc<RwLock<ChainStateView<StorageClient::Context>>>>,
    /// Optional service runtime endpoint; not used directly in this module.
    // NOTE(review): presumably consumed by the query path in the submodules; confirm there.
    service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
    /// Shared cache of hashed blocks, keyed by their hash.
    block_values: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
    /// Shared cache of execution state views, keyed by hash.
    execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>>,
    /// Optional set of tracked chains. When present, it restricts which outboxes produce
    /// cross-chain requests and is extended with newly created child chains.
    tracked_chains: Option<Arc<sync::RwLock<HashSet<ChainId>>>>,
    /// Delivery notifier handle; not used directly in this module.
    delivery_notifier: DeliveryNotifier,
    /// Set to `true` once `ensure_is_active` has succeeded, so the check is not repeated.
    knows_chain_is_active: bool,
}
63
64impl<StorageClient> ChainWorkerState<StorageClient>
65where
66    StorageClient: Storage + Clone + Send + Sync + 'static,
67{
68    /// Creates a new [`ChainWorkerState`] using the provided `storage` client.
69    #[expect(clippy::too_many_arguments)]
70    pub async fn load(
71        config: ChainWorkerConfig,
72        storage: StorageClient,
73        block_values: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
74        execution_state_cache: Arc<
75            ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>,
76        >,
77        tracked_chains: Option<Arc<sync::RwLock<HashSet<ChainId>>>>,
78        delivery_notifier: DeliveryNotifier,
79        chain_id: ChainId,
80        service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
81    ) -> Result<Self, WorkerError> {
82        let chain = storage.load_chain(chain_id).await?;
83
84        Ok(ChainWorkerState {
85            config,
86            storage,
87            chain,
88            shared_chain_view: None,
89            service_runtime_endpoint,
90            block_values,
91            execution_state_cache,
92            tracked_chains,
93            delivery_notifier,
94            knows_chain_is_active: false,
95        })
96    }
97
98    /// Returns the [`ChainId`] of the chain handled by this worker.
99    pub fn chain_id(&self) -> ChainId {
100        self.chain.chain_id()
101    }
102
103    /// Handles a request and applies it to the chain state.
104    #[instrument(skip_all)]
105    pub async fn handle_request(&mut self, request: ChainWorkerRequest<StorageClient::Context>) {
106        tracing::trace!("Handling chain worker request: {request:?}");
107        // TODO(#2237): Spawn concurrent tasks for read-only operations
108        let responded = match request {
109            #[cfg(with_testing)]
110            ChainWorkerRequest::ReadCertificate { height, callback } => {
111                callback.send(self.read_certificate(height).await).is_ok()
112            }
113            #[cfg(with_testing)]
114            ChainWorkerRequest::FindBundleInInbox {
115                inbox_id,
116                certificate_hash,
117                height,
118                index,
119                callback,
120            } => callback
121                .send(
122                    self.find_bundle_in_inbox(inbox_id, certificate_hash, height, index)
123                        .await,
124                )
125                .is_ok(),
126            ChainWorkerRequest::GetChainStateView { callback } => {
127                callback.send(self.chain_state_view().await).is_ok()
128            }
129            ChainWorkerRequest::QueryApplication { query, callback } => {
130                callback.send(self.query_application(query).await).is_ok()
131            }
132            ChainWorkerRequest::DescribeApplication {
133                application_id,
134                callback,
135            } => callback
136                .send(self.describe_application(application_id).await)
137                .is_ok(),
138            ChainWorkerRequest::StageBlockExecution {
139                block,
140                round,
141                published_blobs,
142                callback,
143            } => callback
144                .send(
145                    self.stage_block_execution(block, round, &published_blobs)
146                        .await,
147                )
148                .is_ok(),
149            ChainWorkerRequest::ProcessTimeout {
150                certificate,
151                callback,
152            } => callback
153                .send(self.process_timeout(certificate).await)
154                .is_ok(),
155            ChainWorkerRequest::HandleBlockProposal { proposal, callback } => callback
156                .send(self.handle_block_proposal(proposal).await)
157                .is_ok(),
158            ChainWorkerRequest::ProcessValidatedBlock {
159                certificate,
160                callback,
161            } => callback
162                .send(self.process_validated_block(certificate).await)
163                .is_ok(),
164            ChainWorkerRequest::ProcessConfirmedBlock {
165                certificate,
166                notify_when_messages_are_delivered,
167                callback,
168            } => callback
169                .send(
170                    self.process_confirmed_block(certificate, notify_when_messages_are_delivered)
171                        .await,
172                )
173                .is_ok(),
174            ChainWorkerRequest::ProcessCrossChainUpdate {
175                origin,
176                bundles,
177                callback,
178            } => callback
179                .send(self.process_cross_chain_update(origin, bundles).await)
180                .is_ok(),
181            ChainWorkerRequest::ConfirmUpdatedRecipient {
182                recipient,
183                latest_height,
184                callback,
185            } => callback
186                .send(
187                    self.confirm_updated_recipient(recipient, latest_height)
188                        .await,
189                )
190                .is_ok(),
191            ChainWorkerRequest::HandleChainInfoQuery { query, callback } => callback
192                .send(self.handle_chain_info_query(query).await)
193                .is_ok(),
194            ChainWorkerRequest::DownloadPendingBlob { blob_id, callback } => callback
195                .send(self.download_pending_blob(blob_id).await)
196                .is_ok(),
197            ChainWorkerRequest::HandlePendingBlob { blob, callback } => {
198                callback.send(self.handle_pending_blob(blob).await).is_ok()
199            }
200            ChainWorkerRequest::UpdateReceivedCertificateTrackers {
201                new_trackers,
202                callback,
203            } => callback
204                .send(
205                    self.update_received_certificate_trackers(new_trackers)
206                        .await,
207                )
208                .is_ok(),
209        };
210
211        if !responded {
212            warn!("Callback for `ChainWorkerActor` was dropped before a response was sent");
213        }
214    }
215
216    /// Returns a read-only view of the [`ChainStateView`].
217    ///
218    /// The returned view holds a lock on the chain state, which prevents the worker from changing
219    /// it.
220    pub(super) async fn chain_state_view(
221        &mut self,
222    ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
223        if self.shared_chain_view.is_none() {
224            self.shared_chain_view = Some(Arc::new(RwLock::new(self.chain.clone_unchecked()?)));
225        }
226
227        Ok(self
228            .shared_chain_view
229            .as_ref()
230            .expect("`shared_chain_view` should be initialized above")
231            .clone()
232            .read_owned()
233            .await)
234    }
235
236    /// Returns a stored [`Certificate`] for the chain's block at the requested [`BlockHeight`].
237    #[cfg(with_testing)]
238    pub(super) async fn read_certificate(
239        &mut self,
240        height: BlockHeight,
241    ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
242        ChainWorkerStateWithTemporaryChanges::new(self)
243            .await
244            .read_certificate(height)
245            .await
246    }
247
248    /// Searches for a bundle in one of the chain's inboxes.
249    #[cfg(with_testing)]
250    pub(super) async fn find_bundle_in_inbox(
251        &mut self,
252        inbox_id: ChainId,
253        certificate_hash: CryptoHash,
254        height: BlockHeight,
255        index: u32,
256    ) -> Result<Option<MessageBundle>, WorkerError> {
257        ChainWorkerStateWithTemporaryChanges::new(self)
258            .await
259            .find_bundle_in_inbox(inbox_id, certificate_hash, height, index)
260            .await
261    }
262
263    /// Queries an application's state on the chain.
264    pub(super) async fn query_application(
265        &mut self,
266        query: Query,
267    ) -> Result<QueryOutcome, WorkerError> {
268        ChainWorkerStateWithTemporaryChanges::new(self)
269            .await
270            .query_application(query)
271            .await
272    }
273
274    /// Returns an application's description.
275    pub(super) async fn describe_application(
276        &mut self,
277        application_id: ApplicationId,
278    ) -> Result<ApplicationDescription, WorkerError> {
279        ChainWorkerStateWithTemporaryChanges::new(self)
280            .await
281            .describe_application(application_id)
282            .await
283    }
284
285    /// Executes a block without persisting any changes to the state.
286    pub(super) async fn stage_block_execution(
287        &mut self,
288        block: ProposedBlock,
289        round: Option<u32>,
290        published_blobs: &[Blob],
291    ) -> Result<(Block, ChainInfoResponse), WorkerError> {
292        let (block, response) = ChainWorkerStateWithTemporaryChanges::new(self)
293            .await
294            .stage_block_execution(block, round, published_blobs)
295            .await?;
296        Ok((block, response))
297    }
298
299    /// Processes a leader timeout issued for this multi-owner chain.
300    pub(super) async fn process_timeout(
301        &mut self,
302        certificate: TimeoutCertificate,
303    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
304        ChainWorkerStateWithAttemptedChanges::new(self)
305            .await
306            .process_timeout(certificate)
307            .await
308    }
309
310    /// Handles a proposal for the next block for this chain.
311    #[tracing::instrument(level = "debug", skip(self))]
312    pub(super) async fn handle_block_proposal(
313        &mut self,
314        proposal: BlockProposal,
315    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
316        self.ensure_is_active().await?;
317        if ChainWorkerStateWithTemporaryChanges::new(&mut *self)
318            .await
319            .check_proposed_block(&proposal)
320            .await?
321            == manager::Outcome::Skip
322        {
323            // Skipping: We already voted for this block.
324            let info = ChainInfoResponse::new(&self.chain, self.config.key_pair());
325            return Ok((info, NetworkActions::default()));
326        };
327        let published_blobs = ChainWorkerStateWithAttemptedChanges::new(&mut *self)
328            .await
329            .load_proposal_blobs(&proposal)
330            .await?;
331        let validation_outcome = ChainWorkerStateWithTemporaryChanges::new(self)
332            .await
333            .validate_proposal_content(&proposal.content, &published_blobs)
334            .await?;
335
336        let actions = if let Some((outcome, local_time)) = validation_outcome {
337            ChainWorkerStateWithAttemptedChanges::new(&mut *self)
338                .await
339                .vote_for_block_proposal(proposal, outcome, local_time)
340                .await?;
341            // Trigger any outgoing cross-chain messages that haven't been confirmed yet.
342            self.create_network_actions().await?
343        } else {
344            // If we just processed the same pending block, return the chain info unchanged.
345            NetworkActions::default()
346        };
347
348        let info = ChainInfoResponse::new(&self.chain, self.config.key_pair());
349        Ok((info, actions))
350    }
351
352    /// Clears the shared chain view, and acquires and drops its write lock.
353    ///
354    /// This is the only place a write lock is acquired, and read locks are acquired in
355    /// the `chain_state_view` method, which has a `&mut self` receiver like this one.
356    /// That means that when this function returns, no readers will be waiting to acquire
357    /// the lock and it is safe to write the chain state to storage without any readers
358    /// having a stale view of it.
359    pub(super) async fn clear_shared_chain_view(&mut self) {
360        if let Some(shared_chain_view) = self.shared_chain_view.take() {
361            let _: RwLockWriteGuard<_> = shared_chain_view.write().await;
362        }
363    }
364
365    /// Processes a validated block issued for this multi-owner chain.
366    #[tracing::instrument(level = "debug", skip(self))]
367    pub(super) async fn process_validated_block(
368        &mut self,
369        certificate: ValidatedBlockCertificate,
370    ) -> Result<(ChainInfoResponse, NetworkActions, bool), WorkerError> {
371        ChainWorkerStateWithAttemptedChanges::new(self)
372            .await
373            .process_validated_block(certificate)
374            .await
375    }
376
377    /// Processes a confirmed block (aka a commit).
378    #[tracing::instrument(level = "debug", skip(self))]
379    pub(super) async fn process_confirmed_block(
380        &mut self,
381        certificate: ConfirmedBlockCertificate,
382        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
383    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
384        ChainWorkerStateWithAttemptedChanges::new(self)
385            .await
386            .process_confirmed_block(certificate, notify_when_messages_are_delivered)
387            .await
388    }
389
390    /// Updates the chain's inboxes, receiving messages from a cross-chain update.
391    #[tracing::instrument(level = "debug", skip(self))]
392    pub(super) async fn process_cross_chain_update(
393        &mut self,
394        origin: ChainId,
395        bundles: Vec<(Epoch, MessageBundle)>,
396    ) -> Result<Option<BlockHeight>, WorkerError> {
397        ChainWorkerStateWithAttemptedChanges::new(self)
398            .await
399            .process_cross_chain_update(origin, bundles)
400            .await
401    }
402
403    /// Handles the cross-chain request confirming that the recipient was updated.
404    pub(super) async fn confirm_updated_recipient(
405        &mut self,
406        recipient: ChainId,
407        latest_height: BlockHeight,
408    ) -> Result<(), WorkerError> {
409        ChainWorkerStateWithAttemptedChanges::new(self)
410            .await
411            .confirm_updated_recipient(recipient, latest_height)
412            .await
413    }
414
415    /// Handles a [`ChainInfoQuery`], potentially voting on the next block.
416    #[tracing::instrument(level = "debug", skip(self))]
417    pub(super) async fn handle_chain_info_query(
418        &mut self,
419        query: ChainInfoQuery,
420    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
421        if query.request_leader_timeout {
422            ChainWorkerStateWithAttemptedChanges::new(&mut *self)
423                .await
424                .vote_for_leader_timeout()
425                .await?;
426        }
427        if query.request_fallback {
428            ChainWorkerStateWithAttemptedChanges::new(&mut *self)
429                .await
430                .vote_for_fallback()
431                .await?;
432        }
433        let response = ChainWorkerStateWithTemporaryChanges::new(self)
434            .await
435            .prepare_chain_info_response(query)
436            .await?;
437        // Trigger any outgoing cross-chain messages that haven't been confirmed yet.
438        let actions = self.create_network_actions().await?;
439        Ok((response, actions))
440    }
441
442    /// Returns the requested blob, if it belongs to the current locking block or pending proposal.
443    pub(super) async fn download_pending_blob(&self, blob_id: BlobId) -> Result<Blob, WorkerError> {
444        if let Some(blob) = self.chain.manager.pending_blob(&blob_id).await? {
445            return Ok(blob);
446        }
447        let blob = self.storage.read_blob(blob_id).await?;
448        blob.ok_or(WorkerError::BlobsNotFound(vec![blob_id]))
449    }
450
451    /// Adds the blob to pending blocks or validated block certificates that are missing it.
452    #[tracing::instrument(level = "debug", skip(self))]
453    pub(super) async fn handle_pending_blob(
454        &mut self,
455        blob: Blob,
456    ) -> Result<ChainInfoResponse, WorkerError> {
457        ChainWorkerStateWithAttemptedChanges::new(&mut *self)
458            .await
459            .handle_pending_blob(blob)
460            .await
461    }
462
463    /// Ensures that the current chain is active, returning an error otherwise.
464    async fn ensure_is_active(&mut self) -> Result<(), WorkerError> {
465        if !self.knows_chain_is_active {
466            let local_time = self.storage.clock().current_time();
467            self.chain.ensure_is_active(local_time).await?;
468            self.knows_chain_is_active = true;
469        }
470        Ok(())
471    }
472
473    /// Reads the blobs from the chain manager or from storage. Returns an error if any are
474    /// missing.
475    async fn get_required_blobs(
476        &self,
477        required_blob_ids: impl IntoIterator<Item = BlobId>,
478        created_blobs: &BTreeMap<BlobId, Blob>,
479    ) -> Result<BTreeMap<BlobId, Blob>, WorkerError> {
480        let maybe_blobs = self
481            .maybe_get_required_blobs(required_blob_ids, Some(created_blobs))
482            .await?;
483        let not_found_blob_ids = missing_blob_ids(&maybe_blobs);
484        ensure!(
485            not_found_blob_ids.is_empty(),
486            WorkerError::BlobsNotFound(not_found_blob_ids)
487        );
488        Ok(maybe_blobs
489            .into_iter()
490            .filter_map(|(blob_id, maybe_blob)| Some((blob_id, maybe_blob?)))
491            .collect())
492    }
493
494    /// Tries to read the blobs from the chain manager or storage. Returns `None` if not found.
495    async fn maybe_get_required_blobs(
496        &self,
497        blob_ids: impl IntoIterator<Item = BlobId>,
498        created_blobs: Option<&BTreeMap<BlobId, Blob>>,
499    ) -> Result<BTreeMap<BlobId, Option<Blob>>, WorkerError> {
500        let mut maybe_blobs = BTreeMap::from_iter(blob_ids.into_iter().zip(iter::repeat(None)));
501
502        for (blob_id, maybe_blob) in &mut maybe_blobs {
503            if let Some(blob) = created_blobs.and_then(|blob_map| blob_map.get(blob_id)) {
504                *maybe_blob = Some(blob.clone());
505            } else if let Some(blob) = self.chain.manager.pending_blob(blob_id).await? {
506                *maybe_blob = Some(blob);
507            } else if let Some(blob) = self.chain.pending_validated_blobs.get(blob_id).await? {
508                *maybe_blob = Some(blob);
509            } else {
510                for (_, pending_blobs) in self
511                    .chain
512                    .pending_proposed_blobs
513                    .try_load_all_entries()
514                    .await?
515                {
516                    if let Some(blob) = pending_blobs.get(blob_id).await? {
517                        *maybe_blob = Some(blob);
518                        break;
519                    }
520                }
521            }
522        }
523        let missing_blob_ids = missing_blob_ids(&maybe_blobs);
524        let blobs_from_storage = self.storage.read_blobs(&missing_blob_ids).await?;
525        for (blob_id, maybe_blob) in missing_blob_ids.into_iter().zip(blobs_from_storage) {
526            maybe_blobs.insert(blob_id, maybe_blob);
527        }
528        Ok(maybe_blobs)
529    }
530
531    /// Adds any newly created chains to the set of `tracked_chains`, if the parent chain is
532    /// also tracked.
533    ///
534    /// Chains that are not tracked are usually processed only because they sent some message
535    /// to one of the tracked chains. In most use cases, their children won't be of interest.
536    fn track_newly_created_chains(
537        &self,
538        proposed_block: &ProposedBlock,
539        outcome: &BlockExecutionOutcome,
540    ) {
541        if let Some(tracked_chains) = self.tracked_chains.as_ref() {
542            if !tracked_chains
543                .read()
544                .expect("Panics should not happen while holding a lock to `tracked_chains`")
545                .contains(&proposed_block.chain_id)
546            {
547                return; // The parent chain is not tracked; don't track the child.
548            }
549            let new_chain_ids = outcome
550                .created_blobs_ids()
551                .into_iter()
552                .filter(|blob_id| blob_id.blob_type == BlobType::ChainDescription)
553                .map(|blob_id| ChainId(blob_id.hash));
554
555            tracked_chains
556                .write()
557                .expect("Panics should not happen while holding a lock to `tracked_chains`")
558                .extend(new_chain_ids);
559        }
560    }
561
562    /// Loads pending cross-chain requests.
563    async fn create_network_actions(&self) -> Result<NetworkActions, WorkerError> {
564        let mut heights_by_recipient = BTreeMap::<_, Vec<_>>::new();
565        let mut targets = self.chain.nonempty_outbox_chain_ids();
566        if let Some(tracked_chains) = self.tracked_chains.as_ref() {
567            let tracked_chains = tracked_chains
568                .read()
569                .expect("Panics should not happen while holding a lock to `tracked_chains`");
570            targets.retain(|target| tracked_chains.contains(target));
571        }
572        let outboxes = self.chain.load_outboxes(&targets).await?;
573        for (target, outbox) in targets.into_iter().zip(outboxes) {
574            let heights = outbox.queue.elements().await?;
575            heights_by_recipient.insert(target, heights);
576        }
577        self.create_cross_chain_requests(heights_by_recipient).await
578    }
579
580    async fn create_cross_chain_requests(
581        &self,
582        heights_by_recipient: BTreeMap<ChainId, Vec<BlockHeight>>,
583    ) -> Result<NetworkActions, WorkerError> {
584        // Load all the certificates we will need, regardless of the medium.
585        let heights = BTreeSet::from_iter(heights_by_recipient.values().flatten().copied());
586        let next_block_height = self.chain.tip_state.get().next_block_height;
587        let log_heights = heights
588            .range(..next_block_height)
589            .copied()
590            .map(usize::try_from)
591            .collect::<Result<Vec<_>, _>>()?;
592        let mut hashes = self
593            .chain
594            .confirmed_log
595            .multi_get(log_heights)
596            .await?
597            .into_iter()
598            .zip(&heights)
599            .map(|(maybe_hash, height)| {
600                maybe_hash.ok_or_else(|| WorkerError::ConfirmedLogEntryNotFound {
601                    height: *height,
602                    chain_id: self.chain_id(),
603                })
604            })
605            .collect::<Result<Vec<_>, _>>()?;
606        for height in heights.range(next_block_height..) {
607            hashes.push(
608                self.chain
609                    .preprocessed_blocks
610                    .get(height)
611                    .await?
612                    .ok_or_else(|| WorkerError::PreprocessedBlocksEntryNotFound {
613                        height: *height,
614                        chain_id: self.chain_id(),
615                    })?,
616            );
617        }
618        let certificates = self.storage.read_certificates(hashes.clone()).await?;
619        let certificates = match ResultReadCertificates::new(certificates, hashes) {
620            ResultReadCertificates::Certificates(certificates) => certificates,
621            ResultReadCertificates::InvalidHashes(hashes) => {
622                return Err(WorkerError::ReadCertificatesError(hashes))
623            }
624        };
625        let certificates = heights
626            .into_iter()
627            .zip(certificates)
628            .collect::<HashMap<_, _>>();
629        // For each medium, select the relevant messages.
630        let mut actions = NetworkActions::default();
631        for (recipient, heights) in heights_by_recipient {
632            let mut bundles = Vec::new();
633            for height in heights {
634                let cert = certificates
635                    .get(&height)
636                    .ok_or_else(|| ChainError::InternalError("missing certificates".to_string()))?;
637                bundles.extend(cert.message_bundles_for(recipient));
638            }
639            let request = CrossChainRequest::UpdateRecipient {
640                sender: self.chain.chain_id(),
641                recipient,
642                bundles,
643            };
644            actions.cross_chain_requests.push(request);
645        }
646        Ok(actions)
647    }
648
649    /// Returns true if there are no more outgoing messages in flight up to the given
650    /// block height.
651    pub async fn all_messages_to_tracked_chains_delivered_up_to(
652        &self,
653        height: BlockHeight,
654    ) -> Result<bool, WorkerError> {
655        if self.chain.all_messages_delivered_up_to(height) {
656            return Ok(true);
657        }
658        let Some(tracked_chains) = self.tracked_chains.as_ref() else {
659            return Ok(false);
660        };
661        let mut targets = self.chain.nonempty_outbox_chain_ids();
662        {
663            let tracked_chains = tracked_chains.read().unwrap();
664            targets.retain(|target| tracked_chains.contains(target));
665        }
666        let outboxes = self.chain.load_outboxes(&targets).await?;
667        for outbox in outboxes {
668            let front = outbox.queue.front();
669            if front.is_some_and(|key| *key <= height) {
670                return Ok(false);
671            }
672        }
673        Ok(true)
674    }
675
676    /// Updates the received certificate trackers to at least the given values.
677    pub async fn update_received_certificate_trackers(
678        &mut self,
679        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
680    ) -> Result<(), WorkerError> {
681        ChainWorkerStateWithAttemptedChanges::new(self)
682            .await
683            .update_received_certificate_trackers(new_trackers)
684            .await
685    }
686}
687
688/// Returns the keys whose value is `None`.
689fn missing_blob_ids(maybe_blobs: &BTreeMap<BlobId, Option<Blob>>) -> Vec<BlobId> {
690    maybe_blobs
691        .iter()
692        .filter(|(_, maybe_blob)| maybe_blob.is_none())
693        .map(|(blob_id, _)| *blob_id)
694        .collect()
695}
696
697/// Returns an error if the block is not at the expected epoch.
698fn check_block_epoch(
699    chain_epoch: Epoch,
700    block_chain: ChainId,
701    block_epoch: Epoch,
702) -> Result<(), WorkerError> {
703    ensure!(
704        block_epoch == chain_epoch,
705        WorkerError::InvalidEpoch {
706            chain_id: block_chain,
707            epoch: block_epoch,
708            chain_epoch
709        }
710    );
711    Ok(())
712}