linera_core/chain_worker/state/
mod.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! The state and functionality of a chain worker.
5
6mod attempted_changes;
7mod temporary_changes;
8
9use std::{
10    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
11    iter,
12    sync::{self, Arc},
13};
14
15use linera_base::{
16    crypto::{CryptoHash, ValidatorPublicKey},
17    data_types::{ApplicationDescription, Blob, BlockHeight, Epoch},
18    ensure,
19    hashed::Hashed,
20    identifiers::{ApplicationId, BlobId, BlobType, ChainId},
21};
22use linera_chain::{
23    data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
24    manager,
25    types::{Block, ConfirmedBlockCertificate, TimeoutCertificate, ValidatedBlockCertificate},
26    ChainError, ChainStateView,
27};
28use linera_execution::{ExecutionStateView, Query, QueryOutcome, ServiceRuntimeEndpoint};
29use linera_storage::{Clock as _, Storage};
30use linera_views::views::ClonableView;
31use tokio::sync::{oneshot, OwnedRwLockReadGuard, RwLock};
32
33#[cfg(test)]
34pub(crate) use self::attempted_changes::CrossChainUpdateHelper;
35use self::{
36    attempted_changes::ChainWorkerStateWithAttemptedChanges,
37    temporary_changes::ChainWorkerStateWithTemporaryChanges,
38};
39use super::{ChainWorkerConfig, DeliveryNotifier};
40use crate::{
41    data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
42    value_cache::ValueCache,
43    worker::{NetworkActions, WorkerError},
44};
45
/// The state of the chain worker.
///
/// Bundles the chain's [`ChainStateView`] with the storage client, the shared caches and the
/// shared handles that the worker's request handlers operate on.
pub struct ChainWorkerState<StorageClient>
where
    StorageClient: Storage + Clone + Send + Sync + 'static,
{
    /// The worker configuration; its `key_pair()` is used when building chain info responses.
    config: ChainWorkerConfig,
    /// The storage client used to load the chain and to read blobs and certificates.
    storage: StorageClient,
    /// The view of the chain's state, loaded from `storage` by `load`.
    chain: ChainStateView<StorageClient::Context>,
    /// A lazily created clone of `chain` that `chain_state_view` hands out to readers.
    shared_chain_view: Option<Arc<RwLock<ChainStateView<StorageClient::Context>>>>,
    /// An optional service runtime endpoint.
    // NOTE(review): not used directly in this file; presumably consumed by the submodules
    // when running application queries — confirm.
    service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
    /// A shared cache of hashed blocks, keyed by `CryptoHash`.
    block_values: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
    /// A shared cache of execution state views, keyed by `CryptoHash`.
    execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>>,
    /// The optional shared set of tracked chains. When present, only outboxes targeting
    /// these chains are considered when creating cross-chain requests.
    tracked_chains: Option<Arc<sync::RwLock<HashSet<ChainId>>>>,
    /// The delivery notifier handed to the worker on construction.
    // NOTE(review): not used directly in this file; presumably used by the submodules to
    // signal message delivery — confirm.
    delivery_notifier: DeliveryNotifier,
    /// Set to `true` once `ensure_is_active` has succeeded, so the check is not repeated.
    knows_chain_is_active: bool,
}
62
63impl<StorageClient> ChainWorkerState<StorageClient>
64where
65    StorageClient: Storage + Clone + Send + Sync + 'static,
66{
67    /// Creates a new [`ChainWorkerState`] using the provided `storage` client.
68    #[expect(clippy::too_many_arguments)]
69    pub async fn load(
70        config: ChainWorkerConfig,
71        storage: StorageClient,
72        block_values: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
73
74        execution_state_cache: Arc<
75            ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>,
76        >,
77        tracked_chains: Option<Arc<sync::RwLock<HashSet<ChainId>>>>,
78        delivery_notifier: DeliveryNotifier,
79        chain_id: ChainId,
80        service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
81    ) -> Result<Self, WorkerError> {
82        let chain = storage.load_chain(chain_id).await?;
83
84        Ok(ChainWorkerState {
85            config,
86            storage,
87            chain,
88            shared_chain_view: None,
89            service_runtime_endpoint,
90            block_values,
91            execution_state_cache,
92            tracked_chains,
93            delivery_notifier,
94            knows_chain_is_active: false,
95        })
96    }
97
    /// Returns the [`ChainId`] of the chain handled by this worker.
    ///
    /// The ID is read from the loaded chain state view.
    pub fn chain_id(&self) -> ChainId {
        self.chain.chain_id()
    }
102
103    /// Returns a read-only view of the [`ChainStateView`].
104    ///
105    /// The returned view holds a lock on the chain state, which prevents the worker from changing
106    /// it.
107    pub(super) async fn chain_state_view(
108        &mut self,
109    ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
110        if self.shared_chain_view.is_none() {
111            self.shared_chain_view = Some(Arc::new(RwLock::new(self.chain.clone_unchecked()?)));
112        }
113
114        Ok(self
115            .shared_chain_view
116            .as_ref()
117            .expect("`shared_chain_view` should be initialized above")
118            .clone()
119            .read_owned()
120            .await)
121    }
122
    /// Returns the stored [`ConfirmedBlockCertificate`] for the chain's block at the requested
    /// [`BlockHeight`], or `None` if the delegate finds none.
    ///
    /// Only compiled with the `with_testing` configuration. Delegates to
    /// `ChainWorkerStateWithTemporaryChanges::read_certificate`.
    #[cfg(with_testing)]
    pub(super) async fn read_certificate(
        &mut self,
        height: BlockHeight,
    ) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
        ChainWorkerStateWithTemporaryChanges::new(self)
            .await
            .read_certificate(height)
            .await
    }
134
    /// Searches for a bundle in one of the chain's inboxes.
    ///
    /// The bundle is identified by the inbox's `inbox_id`, the `certificate_hash`, the block
    /// `height` and an `index`. Only compiled with the `with_testing` configuration.
    /// Delegates to `ChainWorkerStateWithTemporaryChanges::find_bundle_in_inbox`.
    // NOTE(review): `index` is presumably the bundle's position within the block — confirm
    // against the delegate.
    #[cfg(with_testing)]
    pub(super) async fn find_bundle_in_inbox(
        &mut self,
        inbox_id: ChainId,
        certificate_hash: CryptoHash,
        height: BlockHeight,
        index: u32,
    ) -> Result<Option<MessageBundle>, WorkerError> {
        ChainWorkerStateWithTemporaryChanges::new(self)
            .await
            .find_bundle_in_inbox(inbox_id, certificate_hash, height, index)
            .await
    }
149
    /// Queries an application's state on the chain.
    ///
    /// Delegates to `ChainWorkerStateWithTemporaryChanges::query_application` and returns its
    /// [`QueryOutcome`] or error unchanged.
    pub(super) async fn query_application(
        &mut self,
        query: Query,
    ) -> Result<QueryOutcome, WorkerError> {
        ChainWorkerStateWithTemporaryChanges::new(self)
            .await
            .query_application(query)
            .await
    }
160
    /// Returns an application's description.
    ///
    /// Delegates to `ChainWorkerStateWithTemporaryChanges::describe_application` and returns
    /// its [`ApplicationDescription`] or error unchanged.
    pub(super) async fn describe_application(
        &mut self,
        application_id: ApplicationId,
    ) -> Result<ApplicationDescription, WorkerError> {
        ChainWorkerStateWithTemporaryChanges::new(self)
            .await
            .describe_application(application_id)
            .await
    }
171
172    /// Executes a block without persisting any changes to the state.
173    pub(super) async fn stage_block_execution(
174        &mut self,
175        block: ProposedBlock,
176        round: Option<u32>,
177        published_blobs: &[Blob],
178    ) -> Result<(Block, ChainInfoResponse), WorkerError> {
179        let (block, response) = ChainWorkerStateWithTemporaryChanges::new(self)
180            .await
181            .stage_block_execution(block, round, published_blobs)
182            .await?;
183        Ok((block, response))
184    }
185
    /// Processes a leader timeout issued for this multi-owner chain.
    ///
    /// Delegates to `ChainWorkerStateWithAttemptedChanges::process_timeout` and returns its
    /// chain info response and network actions unchanged.
    pub(super) async fn process_timeout(
        &mut self,
        certificate: TimeoutCertificate,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        ChainWorkerStateWithAttemptedChanges::new(self)
            .await
            .process_timeout(certificate)
            .await
    }
196
    /// Handles a proposal for the next block for this chain.
    ///
    /// The steps are, in order:
    /// 1. Ensure the chain is active.
    /// 2. Check the proposed block; if the check reports `manager::Outcome::Skip`, return the
    ///    current chain info with no network actions.
    /// 3. Load the blobs published by the proposal.
    /// 4. Validate the proposal's content against those blobs.
    /// 5. If validation yields an outcome, vote for the proposal and collect the pending
    ///    cross-chain requests; otherwise return no network actions.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by any of the steps above.
    #[tracing::instrument(level = "debug", skip(self))]
    pub(super) async fn handle_block_proposal(
        &mut self,
        proposal: BlockProposal,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        self.ensure_is_active().await?;
        if ChainWorkerStateWithTemporaryChanges::new(&mut *self)
            .await
            .check_proposed_block(&proposal)
            .await?
            == manager::Outcome::Skip
        {
            // Skipping: We already voted for this block.
            let info = ChainInfoResponse::new(&self.chain, self.config.key_pair());
            return Ok((info, NetworkActions::default()));
        };
        let published_blobs = ChainWorkerStateWithAttemptedChanges::new(&mut *self)
            .await
            .load_proposal_blobs(&proposal)
            .await?;
        let validation_outcome = ChainWorkerStateWithTemporaryChanges::new(self)
            .await
            .validate_proposal_content(&proposal.content, &published_blobs)
            .await?;

        // A `Some` outcome carries the execution outcome and the local time to vote with.
        let actions = if let Some((outcome, local_time)) = validation_outcome {
            ChainWorkerStateWithAttemptedChanges::new(&mut *self)
                .await
                .vote_for_block_proposal(proposal, outcome, local_time)
                .await?;
            // Trigger any outgoing cross-chain messages that haven't been confirmed yet.
            self.create_network_actions().await?
        } else {
            // If we just processed the same pending block, return the chain info unchanged.
            NetworkActions::default()
        };

        // Build the response from the chain state as it is after the steps above.
        let info = ChainInfoResponse::new(&self.chain, self.config.key_pair());
        Ok((info, actions))
    }
238
    /// Processes a validated block issued for this multi-owner chain.
    ///
    /// Delegates to `ChainWorkerStateWithAttemptedChanges::process_validated_block` and
    /// returns its result unchanged.
    // NOTE(review): the meaning of the returned `bool` is defined by the delegate and is not
    // visible here — confirm before relying on it.
    #[tracing::instrument(level = "debug", skip(self))]
    pub(super) async fn process_validated_block(
        &mut self,
        certificate: ValidatedBlockCertificate,
    ) -> Result<(ChainInfoResponse, NetworkActions, bool), WorkerError> {
        ChainWorkerStateWithAttemptedChanges::new(self)
            .await
            .process_validated_block(certificate)
            .await
    }
250
    /// Processes a confirmed block (aka a commit).
    ///
    /// Delegates to `ChainWorkerStateWithAttemptedChanges::process_confirmed_block`,
    /// forwarding the optional `notify_when_messages_are_delivered` sender.
    // NOTE(review): presumably the sender is signalled once the block's outgoing messages
    // are delivered — confirm against the delegate.
    #[tracing::instrument(level = "debug", skip(self))]
    pub(super) async fn process_confirmed_block(
        &mut self,
        certificate: ConfirmedBlockCertificate,
        notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        ChainWorkerStateWithAttemptedChanges::new(self)
            .await
            .process_confirmed_block(certificate, notify_when_messages_are_delivered)
            .await
    }
263
    /// Preprocesses a block without executing it.
    ///
    /// Delegates to `ChainWorkerStateWithAttemptedChanges::preprocess_certificate` and
    /// returns the resulting network actions unchanged.
    #[tracing::instrument(level = "debug", skip(self))]
    pub(super) async fn preprocess_certificate(
        &mut self,
        certificate: ConfirmedBlockCertificate,
    ) -> Result<NetworkActions, WorkerError> {
        ChainWorkerStateWithAttemptedChanges::new(self)
            .await
            .preprocess_certificate(certificate)
            .await
    }
275
    /// Updates the chain's inboxes, receiving messages from a cross-chain update.
    ///
    /// `origin` is the chain the bundles come from; each bundle is paired with an [`Epoch`].
    /// Delegates to `ChainWorkerStateWithAttemptedChanges::process_cross_chain_update`.
    // NOTE(review): the returned `Option<BlockHeight>` is presumably the latest height that
    // was received, if any — confirm against the delegate.
    #[tracing::instrument(level = "debug", skip(self))]
    pub(super) async fn process_cross_chain_update(
        &mut self,
        origin: ChainId,
        bundles: Vec<(Epoch, MessageBundle)>,
    ) -> Result<Option<BlockHeight>, WorkerError> {
        ChainWorkerStateWithAttemptedChanges::new(self)
            .await
            .process_cross_chain_update(origin, bundles)
            .await
    }
288
    /// Handles the cross-chain request confirming that the recipient was updated.
    ///
    /// Delegates to `ChainWorkerStateWithAttemptedChanges::confirm_updated_recipient` with
    /// the `recipient` chain and the `latest_height` it confirmed.
    pub(super) async fn confirm_updated_recipient(
        &mut self,
        recipient: ChainId,
        latest_height: BlockHeight,
    ) -> Result<(), WorkerError> {
        ChainWorkerStateWithAttemptedChanges::new(self)
            .await
            .confirm_updated_recipient(recipient, latest_height)
            .await
    }
300
    /// Handles a [`ChainInfoQuery`], potentially voting on the next block.
    ///
    /// If `query.request_leader_timeout` is set, a vote for a leader timeout is attempted
    /// first. If `query.request_fallback` is set, a vote for fallback is attempted next. The
    /// response is prepared only after both, and is returned together with the pending
    /// cross-chain requests.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by a vote attempt, by preparing the response, or
    /// by collecting the network actions.
    #[tracing::instrument(level = "debug", skip(self))]
    pub(super) async fn handle_chain_info_query(
        &mut self,
        query: ChainInfoQuery,
    ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
        if query.request_leader_timeout {
            ChainWorkerStateWithAttemptedChanges::new(&mut *self)
                .await
                .vote_for_leader_timeout()
                .await?;
        }
        if query.request_fallback {
            ChainWorkerStateWithAttemptedChanges::new(&mut *self)
                .await
                .vote_for_fallback()
                .await?;
        }
        // Prepared after the votes above, so the response is built from the updated state.
        let response = ChainWorkerStateWithTemporaryChanges::new(self)
            .await
            .prepare_chain_info_response(query)
            .await?;
        // Trigger any outgoing cross-chain messages that haven't been confirmed yet.
        let actions = self.create_network_actions().await?;
        Ok((response, actions))
    }
327
328    /// Returns the requested blob, if it belongs to the current locking block or pending proposal.
329    pub(super) async fn download_pending_blob(&self, blob_id: BlobId) -> Result<Blob, WorkerError> {
330        if let Some(blob) = self.chain.manager.pending_blob(&blob_id).await? {
331            return Ok(blob);
332        }
333        let blob = self.storage.read_blob(blob_id).await?;
334        blob.ok_or(WorkerError::BlobsNotFound(vec![blob_id]))
335    }
336
    /// Adds the blob to pending blocks or validated block certificates that are missing it.
    ///
    /// Delegates to `ChainWorkerStateWithAttemptedChanges::handle_pending_blob` and returns
    /// its chain info response unchanged.
    #[tracing::instrument(level = "debug", skip(self))]
    pub(super) async fn handle_pending_blob(
        &mut self,
        blob: Blob,
    ) -> Result<ChainInfoResponse, WorkerError> {
        ChainWorkerStateWithAttemptedChanges::new(&mut *self)
            .await
            .handle_pending_blob(blob)
            .await
    }
348
349    /// Ensures that the current chain is active, returning an error otherwise.
350    async fn ensure_is_active(&mut self) -> Result<(), WorkerError> {
351        if !self.knows_chain_is_active {
352            let local_time = self.storage.clock().current_time();
353            self.chain.ensure_is_active(local_time).await?;
354            self.knows_chain_is_active = true;
355        }
356        Ok(())
357    }
358
359    /// Reads the blobs from the chain manager or from storage. Returns an error if any are
360    /// missing.
361    async fn get_required_blobs(
362        &self,
363        required_blob_ids: impl IntoIterator<Item = BlobId>,
364        created_blobs: &BTreeMap<BlobId, Blob>,
365    ) -> Result<BTreeMap<BlobId, Blob>, WorkerError> {
366        let maybe_blobs = self
367            .maybe_get_required_blobs(required_blob_ids, Some(created_blobs))
368            .await?;
369        let not_found_blob_ids = missing_blob_ids(&maybe_blobs);
370        ensure!(
371            not_found_blob_ids.is_empty(),
372            WorkerError::BlobsNotFound(not_found_blob_ids)
373        );
374        Ok(maybe_blobs
375            .into_iter()
376            .filter_map(|(blob_id, maybe_blob)| Some((blob_id, maybe_blob?)))
377            .collect())
378    }
379
    /// Tries to read the blobs from the chain manager or storage. Maps each requested blob ID
    /// to `None` if the blob is not found.
    ///
    /// Each blob ID is looked up, in order, in:
    /// 1. `created_blobs`, if provided;
    /// 2. the chain manager's pending blobs;
    /// 3. the chain's `pending_validated_blobs`;
    /// 4. every entry of the chain's `pending_proposed_blobs`.
    ///
    /// IDs that are still unresolved afterwards are read from storage in a single batch.
    ///
    /// # Errors
    ///
    /// Propagates any error from the chain views or from storage.
    async fn maybe_get_required_blobs(
        &self,
        blob_ids: impl IntoIterator<Item = BlobId>,
        created_blobs: Option<&BTreeMap<BlobId, Blob>>,
    ) -> Result<BTreeMap<BlobId, Option<Blob>>, WorkerError> {
        // Start with every requested ID mapped to `None`; duplicates collapse in the map.
        let mut maybe_blobs = BTreeMap::from_iter(blob_ids.into_iter().zip(iter::repeat(None)));

        for (blob_id, maybe_blob) in &mut maybe_blobs {
            if let Some(blob) = created_blobs.and_then(|blob_map| blob_map.get(blob_id)) {
                *maybe_blob = Some(blob.clone());
            } else if let Some(blob) = self.chain.manager.pending_blob(blob_id).await? {
                *maybe_blob = Some(blob);
            } else if let Some(blob) = self.chain.pending_validated_blobs.get(blob_id).await? {
                *maybe_blob = Some(blob);
            } else {
                // NOTE(review): all entries are loaded again for each blob that reaches this
                // branch; consider loading them once if this shows up in profiles.
                for (_, pending_blobs) in self
                    .chain
                    .pending_proposed_blobs
                    .try_load_all_entries()
                    .await?
                {
                    if let Some(blob) = pending_blobs.get(blob_id).await? {
                        *maybe_blob = Some(blob);
                        break;
                    }
                }
            }
        }
        // Fall back to storage for whatever is still unresolved. The results are zipped with
        // the requested IDs, so `read_blobs` is presumably order-preserving — confirm.
        let missing_blob_ids = missing_blob_ids(&maybe_blobs);
        let blobs_from_storage = self.storage.read_blobs(&missing_blob_ids).await?;
        for (blob_id, maybe_blob) in missing_blob_ids.into_iter().zip(blobs_from_storage) {
            maybe_blobs.insert(blob_id, maybe_blob);
        }
        Ok(maybe_blobs)
    }
416
417    /// Adds any newly created chains to the set of `tracked_chains`, if the parent chain is
418    /// also tracked.
419    ///
420    /// Chains that are not tracked are usually processed only because they sent some message
421    /// to one of the tracked chains. In most use cases, their children won't be of interest.
422    fn track_newly_created_chains(
423        &self,
424        proposed_block: &ProposedBlock,
425        outcome: &BlockExecutionOutcome,
426    ) {
427        if let Some(tracked_chains) = self.tracked_chains.as_ref() {
428            if !tracked_chains
429                .read()
430                .expect("Panics should not happen while holding a lock to `tracked_chains`")
431                .contains(&proposed_block.chain_id)
432            {
433                return; // The parent chain is not tracked; don't track the child.
434            }
435            let new_chain_ids = outcome
436                .created_blobs_ids()
437                .into_iter()
438                .filter(|blob_id| blob_id.blob_type == BlobType::ChainDescription)
439                .map(|blob_id| ChainId(blob_id.hash));
440
441            tracked_chains
442                .write()
443                .expect("Panics should not happen while holding a lock to `tracked_chains`")
444                .extend(new_chain_ids);
445        }
446    }
447
448    /// Loads pending cross-chain requests.
449    async fn create_network_actions(&self) -> Result<NetworkActions, WorkerError> {
450        let mut heights_by_recipient = BTreeMap::<_, Vec<_>>::new();
451        let mut targets = self.chain.outboxes.indices().await?;
452        if let Some(tracked_chains) = self.tracked_chains.as_ref() {
453            let tracked_chains = tracked_chains
454                .read()
455                .expect("Panics should not happen while holding a lock to `tracked_chains`");
456            targets.retain(|target| tracked_chains.contains(target));
457        }
458        let outboxes = self.chain.outboxes.try_load_entries(&targets).await?;
459        for (target, outbox) in targets.into_iter().zip(outboxes) {
460            let outbox = outbox.expect("Only existing outboxes should be referenced by `indices`");
461            let heights = outbox.queue.elements().await?;
462            heights_by_recipient.insert(target, heights);
463        }
464        self.create_cross_chain_requests(heights_by_recipient).await
465    }
466
    /// Builds one `CrossChainRequest::UpdateRecipient` per recipient in `heights_by_recipient`.
    ///
    /// For every requested height, the certificate hash is taken from the chain's
    /// `confirmed_log` when the height is below the chain's next block height, and from
    /// `preprocessed_blocks` otherwise. The certificates are then read from storage, and the
    /// message bundles addressed to each recipient are extracted from them.
    ///
    /// # Errors
    ///
    /// Returns `WorkerError::ConfirmedLogEntryNotFound` or
    /// `WorkerError::PreprocessedBlocksEntryNotFound` if a hash is missing for a requested
    /// height, an internal chain error if a certificate is missing for a height, and
    /// propagates view, storage and height-conversion errors.
    async fn create_cross_chain_requests(
        &self,
        heights_by_recipient: BTreeMap<ChainId, Vec<BlockHeight>>,
    ) -> Result<NetworkActions, WorkerError> {
        // Load all the certificates we will need, deduplicated across recipients.
        // `heights` is a sorted set, so the heights below `next_block_height` come first.
        let heights = BTreeSet::from_iter(heights_by_recipient.values().flatten().copied());
        let next_block_height = self.chain.tip_state.get().next_block_height;
        let log_heights = heights
            .range(..next_block_height)
            .copied()
            .map(usize::try_from)
            .collect::<Result<Vec<_>, _>>()?;
        // Zipping with `&heights` pairs each looked-up hash with its height, because the
        // looked-up heights are exactly the leading elements of the sorted set (this assumes
        // `multi_get` returns results in request order — confirm).
        let mut hashes = self
            .chain
            .confirmed_log
            .multi_get(log_heights)
            .await?
            .into_iter()
            .zip(&heights)
            .map(|(maybe_hash, height)| {
                maybe_hash.ok_or_else(|| WorkerError::ConfirmedLogEntryNotFound {
                    height: *height,
                    chain_id: self.chain_id(),
                })
            })
            .collect::<Result<Vec<_>, _>>()?;
        // The remaining heights are not in the confirmed log; their hashes are appended in
        // ascending order, keeping `hashes` aligned with `heights`.
        for height in heights.range(next_block_height..) {
            hashes.push(
                self.chain
                    .preprocessed_blocks
                    .get(height)
                    .await?
                    .ok_or_else(|| WorkerError::PreprocessedBlocksEntryNotFound {
                        height: *height,
                        chain_id: self.chain_id(),
                    })?,
            );
        }
        let certificates = self.storage.read_certificates(hashes).await?;
        // Index the certificates by height for the per-recipient lookups below.
        let certificates = heights
            .into_iter()
            .zip(certificates)
            .collect::<HashMap<_, _>>();
        // For each recipient, select the relevant messages.
        let mut actions = NetworkActions::default();
        for (recipient, heights) in heights_by_recipient {
            let mut bundles = Vec::new();
            for height in heights {
                let cert = certificates
                    .get(&height)
                    .ok_or_else(|| ChainError::InternalError("missing certificates".to_string()))?;
                bundles.extend(cert.message_bundles_for(recipient));
            }
            let request = CrossChainRequest::UpdateRecipient {
                sender: self.chain.chain_id(),
                recipient,
                bundles,
            };
            actions.cross_chain_requests.push(request);
        }
        Ok(actions)
    }
529
530    /// Returns true if there are no more outgoing messages in flight up to the given
531    /// block height.
532    pub async fn all_messages_to_tracked_chains_delivered_up_to(
533        &self,
534        height: BlockHeight,
535    ) -> Result<bool, WorkerError> {
536        if self.chain.all_messages_delivered_up_to(height) {
537            return Ok(true);
538        }
539        let Some(tracked_chains) = self.tracked_chains.as_ref() else {
540            return Ok(false);
541        };
542        let mut targets = self.chain.outboxes.indices().await?;
543        {
544            let tracked_chains = tracked_chains.read().unwrap();
545            targets.retain(|target| tracked_chains.contains(target));
546        }
547        let outboxes = self.chain.outboxes.try_load_entries(&targets).await?;
548        for outbox in outboxes {
549            let outbox = outbox.expect("Only existing outboxes should be referenced by `indices`");
550            let front = outbox.queue.front();
551            if front.is_some_and(|key| *key <= height) {
552                return Ok(false);
553            }
554        }
555        Ok(true)
556    }
557
    /// Updates the received certificate trackers to at least the given values.
    ///
    /// `new_trackers` maps each validator's public key to its new tracker value. Delegates to
    /// `ChainWorkerStateWithAttemptedChanges::update_received_certificate_trackers`.
    pub async fn update_received_certificate_trackers(
        &mut self,
        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
    ) -> Result<(), WorkerError> {
        ChainWorkerStateWithAttemptedChanges::new(self)
            .await
            .update_received_certificate_trackers(new_trackers)
            .await
    }
568}
569
570/// Returns the keys whose value is `None`.
571fn missing_blob_ids(maybe_blobs: &BTreeMap<BlobId, Option<Blob>>) -> Vec<BlobId> {
572    maybe_blobs
573        .iter()
574        .filter(|(_, maybe_blob)| maybe_blob.is_none())
575        .map(|(blob_id, _)| *blob_id)
576        .collect()
577}
578
579/// Returns an error if the block is not at the expected epoch.
580fn check_block_epoch(
581    chain_epoch: Epoch,
582    block_chain: ChainId,
583    block_epoch: Epoch,
584) -> Result<(), WorkerError> {
585    ensure!(
586        block_epoch == chain_epoch,
587        WorkerError::InvalidEpoch {
588            chain_id: block_chain,
589            epoch: block_epoch,
590            chain_epoch
591        }
592    );
593    Ok(())
594}