Skip to main content

linera_core/
local_node.rs

// Copyright (c) Facebook, Inc. and its affiliates.
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap, HashSet, VecDeque},
7    sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12    crypto::{CryptoHash, ValidatorPublicKey},
13    data_types::{ArithmeticError, Blob, BlockHeight},
14    identifiers::{BlobId, ChainId, EventId, StreamId},
15};
16use linera_chain::{
17    data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock},
18    types::{Block, ConfirmedBlockCertificate, GenericCertificate},
19    ChainError, ChainExecutionContext, StreamCounts,
20};
21use linera_execution::{BlobState, ExecutionError, Query, QueryOutcome, ResourceTracker};
22use linera_storage::{Arc as CacheArc, Storage};
23use linera_views::ViewError;
24use thiserror::Error;
25use tracing::{instrument, warn};
26
27use crate::{
28    chain_worker::ProcessConfirmedBlockMode,
29    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30    notifier::Notifier,
31    worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34/// A local node with a single worker, typically used by clients.
35pub struct LocalNode<S>
36where
37    S: Storage,
38{
39    state: WorkerState<S>,
40}
41
42/// A client to a local node.
43#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46    S: Storage,
47{
48    node: Arc<LocalNode<S>>,
49}
50
/// Error type for the operations on a local node.
///
/// The `strum::IntoStaticStr` derive supplies the variant name that
/// [`LocalNodeError::error_type`] uses for non-worker errors.
#[derive(Debug, Error, strum::IntoStaticStr)]
#[allow(missing_docs)]
pub enum LocalNodeError {
    /// An arithmetic error; displayed as the wrapped error itself.
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    /// A view error; displayed as the wrapped error itself.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// Any worker error that has no dedicated variant below. There is deliberately no
    /// `#[from]` here: the hand-written `From<WorkerError>` impl reroutes the
    /// "not found" worker errors to their own variants first.
    #[error("Worker operation failed: {0}")]
    WorkerError(WorkerError),

    /// The given chain is not active on the local node.
    #[error("The local node doesn't have an active chain {0}")]
    InactiveChain(ChainId),

    /// The local node returned a chain info response that is invalid.
    #[error("The chain info response received from the local node is invalid")]
    InvalidChainInfoResponse,

    /// The listed blobs could not be found.
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),

    /// The blocks with the listed hashes could not be found.
    #[error("Blocks not found: {0:?}")]
    BlocksNotFound(Vec<CryptoHash>),

    /// The listed events could not be found.
    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),
}
79
80impl LocalNodeError {
81    /// Returns the qualified error variant name for the `error_type` metric label,
82    /// delegating to [`WorkerError::error_type`] for wrapped worker errors.
83    pub fn error_type(&self) -> String {
84        match self {
85            LocalNodeError::WorkerError(worker_error) => worker_error.error_type(),
86            other => {
87                let variant: &'static str = other.into();
88                format!("LocalNodeError::{variant}")
89            }
90        }
91    }
92}
93
94impl From<ExecutionError> for LocalNodeError {
95    fn from(error: ExecutionError) -> Self {
96        match error {
97            ExecutionError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
98            ExecutionError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
99            ExecutionError::ViewError(view_error) => LocalNodeError::ViewError(view_error),
100            error => LocalNodeError::WorkerError(WorkerError::from(ChainError::ExecutionError(
101                Box::new(error),
102                ChainExecutionContext::Block,
103            ))),
104        }
105    }
106}
107
108impl From<WorkerError> for LocalNodeError {
109    fn from(error: WorkerError) -> Self {
110        match error {
111            WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
112            WorkerError::BlocksNotFound(hashes) => LocalNodeError::BlocksNotFound(hashes),
113            WorkerError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
114            error => LocalNodeError::WorkerError(error),
115        }
116    }
117}
118
119impl<S> LocalNodeClient<S>
120where
121    S: Storage + Clone + 'static,
122{
123    #[instrument(level = "trace", skip_all)]
124    pub async fn handle_block_proposal(
125        &self,
126        proposal: BlockProposal,
127    ) -> Result<ChainInfoResponse, LocalNodeError> {
128        // In local nodes, cross-chain actions will be handled internally, so we discard them.
129        let (response, _actions) = Box::pin(self.node.state.handle_block_proposal(proposal)).await;
130        Ok(response?)
131    }
132
133    #[instrument(level = "trace", skip_all)]
134    pub async fn handle_certificate<T>(
135        &self,
136        certificate: GenericCertificate<T>,
137        notifier: &impl Notifier,
138    ) -> Result<ChainInfoResponse, LocalNodeError>
139    where
140        T: ProcessableCertificate,
141    {
142        Ok(Box::pin(
143            self.node
144                .state
145                .fully_handle_certificate_with_notifications(certificate, notifier),
146        )
147        .await?)
148    }
149
150    /// Same as [`Self::handle_certificate`] but for a confirmed block certificate
151    /// and with an explicit [`ProcessConfirmedBlockMode`]. The generic variant
152    /// always uses [`ProcessConfirmedBlockMode::Auto`].
153    #[instrument(level = "trace", skip_all)]
154    pub async fn handle_confirmed_certificate(
155        &self,
156        certificate: ConfirmedBlockCertificate,
157        mode: ProcessConfirmedBlockMode,
158        notifier: &impl Notifier,
159    ) -> Result<ChainInfoResponse, LocalNodeError> {
160        Ok(Box::pin(
161            self.node
162                .state
163                .fully_handle_confirmed_certificate_with_notifications(certificate, mode, notifier),
164        )
165        .await?)
166    }
167
168    /// Test helper: returns the stored [`ConfirmedBlockCertificate`] for a
169    /// chain's block at a given height (or [`WorkerError::BlocksNotFound`] if
170    /// the height is referenced but the bytes aren't local).
171    #[cfg(with_testing)]
172    pub async fn read_certificate(
173        &self,
174        chain_id: linera_base::identifiers::ChainId,
175        height: linera_base::data_types::BlockHeight,
176    ) -> Result<Option<CacheArc<ConfirmedBlockCertificate>>, LocalNodeError> {
177        Ok(self.node.state.read_certificate(chain_id, height).await?)
178    }
179
180    #[instrument(level = "trace", skip_all)]
181    pub async fn handle_chain_info_query(
182        &self,
183        query: ChainInfoQuery,
184    ) -> Result<ChainInfoResponse, LocalNodeError> {
185        Ok(self.node.state.handle_chain_info_query(query).await?)
186    }
187
188    #[instrument(level = "trace", skip_all)]
189    pub fn new(state: WorkerState<S>) -> Self {
190        Self {
191            node: Arc::new(LocalNode { state }),
192        }
193    }
194
195    #[instrument(level = "trace", skip_all)]
196    pub(crate) fn storage_client(&self) -> S {
197        self.node.state.storage_client().clone()
198    }
199
200    /// Executes a block with a policy for handling bundle failures.
201    ///
202    /// Returns the modified block (bundles may be rejected/removed based on the policy),
203    /// the executed block, chain info response, and resource tracker.
204    #[instrument(level = "trace", skip_all)]
205    pub async fn stage_block_execution(
206        &self,
207        block: ProposedBlock,
208        round: Option<u32>,
209        published_blobs: Vec<Blob>,
210        policy: BundleExecutionPolicy,
211    ) -> Result<
212        (
213            ProposedBlock,
214            Block,
215            ChainInfoResponse,
216            ResourceTracker,
217            HashSet<ChainId>,
218        ),
219        LocalNodeError,
220    > {
221        Ok(self
222            .node
223            .state
224            .stage_block_execution(block, round, published_blobs, policy)
225            .await?)
226    }
227
228    /// Reads blobs from storage.
229    pub async fn read_blobs_from_storage(
230        &self,
231        blob_ids: &[BlobId],
232    ) -> Result<Option<Vec<CacheArc<Blob>>>, LocalNodeError> {
233        let storage = self.storage_client();
234        Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
235    }
236
237    /// Reads blob states from storage.
238    pub async fn read_blob_states_from_storage(
239        &self,
240        blob_ids: &[BlobId],
241    ) -> Result<Vec<BlobState>, LocalNodeError> {
242        let storage = self.storage_client();
243        let mut blobs_not_found = Vec::new();
244        let mut blob_states = Vec::new();
245        for (blob_state, blob_id) in storage
246            .read_blob_states(blob_ids)
247            .await?
248            .into_iter()
249            .zip(blob_ids)
250        {
251            match blob_state {
252                None => blobs_not_found.push(*blob_id),
253                Some(blob_state) => blob_states.push(blob_state),
254            }
255        }
256        if !blobs_not_found.is_empty() {
257            return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
258        }
259        Ok(blob_states)
260    }
261
262    /// Looks for the specified blobs in the local chain manager's locking blobs.
263    /// Returns `Ok(None)` if any of the blobs is not found.
264    pub async fn get_locking_blobs(
265        &self,
266        blob_ids: impl IntoIterator<Item = &BlobId>,
267        chain_id: ChainId,
268    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
269        let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
270        Ok(self
271            .node
272            .state
273            .get_locking_blobs(chain_id, blob_ids_vec)
274            .await?)
275    }
276
277    /// Writes the given blobs to storage if there is an appropriate blob state.
278    pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
279        let storage = self.storage_client();
280        storage.maybe_write_blobs(blobs).await?;
281        Ok(())
282    }
283
284    pub async fn handle_pending_blobs(
285        &self,
286        chain_id: ChainId,
287        blobs: Vec<Blob>,
288    ) -> Result<(), LocalNodeError> {
289        for blob in blobs {
290            self.node.state.handle_pending_blob(chain_id, blob).await?;
291        }
292        Ok(())
293    }
294
295    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
296    /// [`ChainId`].
297    ///
298    /// The returned view holds a lock on the chain state, which prevents the local node from
299    /// changing the state of that chain.
300    #[instrument(level = "trace", skip(self))]
301    pub async fn chain_state_view(
302        &self,
303        chain_id: ChainId,
304    ) -> Result<crate::worker::ChainStateViewReadGuard<S>, LocalNodeError> {
305        Ok(self.node.state.chain_state_view(chain_id).await?)
306    }
307
308    #[instrument(level = "trace", skip(self))]
309    pub(crate) async fn chain_info(
310        &self,
311        chain_id: ChainId,
312    ) -> Result<Box<ChainInfo>, LocalNodeError> {
313        let query = ChainInfoQuery::new(chain_id);
314        Ok(self.handle_chain_info_query(query).await?.info)
315    }
316
317    #[instrument(level = "trace", skip(self, query))]
318    pub async fn query_application(
319        &self,
320        chain_id: ChainId,
321        query: Query,
322        block_hash: Option<CryptoHash>,
323    ) -> Result<(QueryOutcome, BlockHeight), LocalNodeError> {
324        let result = self
325            .node
326            .state
327            .query_application(chain_id, query, block_hash)
328            .await?;
329        Ok(result)
330    }
331
332    /// Handles any pending local cross-chain requests.
333    ///
334    /// Does not initialize the sender chain's execution state, so it is safe to
335    /// call even when the sender's `ChainDescription` blob is not in local storage.
336    /// Previously this went through `handle_chain_info_query`, which unconditionally
337    /// initialized the worker and therefore forced a `ChainDescription` download on
338    /// every call.
339    #[instrument(level = "trace", skip(self, notifier))]
340    pub async fn retry_pending_cross_chain_requests(
341        &self,
342        sender_chain: ChainId,
343        notifier: &impl Notifier,
344    ) -> Result<(), LocalNodeError> {
345        let actions = self
346            .node
347            .state
348            .cross_chain_network_actions(sender_chain)
349            .await?;
350        let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
351        while let Some(request) = requests.pop_front() {
352            let new_actions = self.node.state.handle_cross_chain_request(request).await?;
353            notifier.notify(&new_actions.notifications);
354            requests.extend(new_actions.cross_chain_requests);
355        }
356        Ok(())
357    }
358
359    /// Given a list of chain IDs, returns a map that assigns to each of them the next block
360    /// height to schedule, i.e. the lowest block height for which we haven't added the messages
361    /// to `receiver_id` to the outbox yet.
362    pub async fn next_outbox_heights(
363        &self,
364        chain_ids: impl IntoIterator<Item = &ChainId>,
365        receiver_id: ChainId,
366    ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
367        let futures = chain_ids
368            .into_iter()
369            .map(|chain_id| async move {
370                let (next_block_height, next_height_to_schedule) = match self
371                    .get_tip_state_and_outbox_info(*chain_id, receiver_id)
372                    .await
373                {
374                    Ok(info) => info,
375                    Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
376                        return Ok((*chain_id, BlockHeight::ZERO))
377                    }
378                    Err(err) => Err(err)?,
379                };
380                let next_height = if let Some(scheduled_height) = next_height_to_schedule {
381                    next_block_height.max(scheduled_height)
382                } else {
383                    next_block_height
384                };
385                Ok::<_, LocalNodeError>((*chain_id, next_height))
386            })
387            .collect::<FuturesUnordered<_>>();
388        futures.try_collect().await
389    }
390
391    pub async fn update_received_certificate_trackers(
392        &self,
393        chain_id: ChainId,
394        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
395    ) -> Result<(), LocalNodeError> {
396        self.node
397            .state
398            .update_received_certificate_trackers(chain_id, new_trackers)
399            .await?;
400        Ok(())
401    }
402
403    pub async fn get_preprocessed_block_hashes(
404        &self,
405        chain_id: ChainId,
406        start: BlockHeight,
407        end: BlockHeight,
408    ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
409        Ok(self
410            .node
411            .state
412            .get_preprocessed_block_hashes(chain_id, start, end)
413            .await?)
414    }
415
416    pub async fn get_inbox_next_height(
417        &self,
418        chain_id: ChainId,
419        origin: ChainId,
420    ) -> Result<BlockHeight, LocalNodeError> {
421        Ok(self
422            .node
423            .state
424            .get_inbox_next_height(chain_id, origin)
425            .await?)
426    }
427
428    /// Gets block hashes for the given heights.
429    pub async fn get_block_hashes(
430        &self,
431        chain_id: ChainId,
432        heights: Vec<BlockHeight>,
433    ) -> Result<Vec<CryptoHash>, LocalNodeError> {
434        Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
435    }
436
437    /// Gets proposed blobs from the manager for specified blob IDs.
438    pub async fn get_proposed_blobs(
439        &self,
440        chain_id: ChainId,
441        blob_ids: Vec<BlobId>,
442    ) -> Result<Vec<Blob>, LocalNodeError> {
443        Ok(self
444            .node
445            .state
446            .get_proposed_blobs(chain_id, blob_ids)
447            .await?)
448    }
449
450    /// Gets event subscriptions from the chain.
451    pub async fn get_event_subscriptions(
452        &self,
453        chain_id: ChainId,
454    ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
455        Ok(self.node.state.get_event_subscriptions(chain_id).await?)
456    }
457
458    /// Gets a stream's [`StreamCounts`]: its next expected event index and its lowest readable
459    /// index, read from a single chain state view so they are mutually consistent.
460    pub async fn get_stream_indices(
461        &self,
462        chain_id: ChainId,
463        stream_id: StreamId,
464    ) -> Result<StreamCounts, LocalNodeError> {
465        Ok(self
466            .node
467            .state
468            .get_stream_indices(chain_id, stream_id)
469            .await?)
470    }
471
472    /// Gets the `next_expected_events` indices for the given streams.
473    pub async fn next_expected_events(
474        &self,
475        chain_id: ChainId,
476        stream_ids: Vec<StreamId>,
477    ) -> Result<BTreeMap<StreamId, u32>, LocalNodeError> {
478        Ok(self
479            .node
480            .state
481            .next_expected_events(chain_id, stream_ids)
482            .await?)
483    }
484
485    /// Test helper: resets a chain and re-executes it from its latest checkpoint.
486    #[cfg(with_testing)]
487    pub async fn reset_and_reexecute_chain(
488        &self,
489        chain_id: ChainId,
490    ) -> Result<Vec<crate::data_types::CrossChainRequest>, LocalNodeError> {
491        Ok(self.node.state.reset_and_reexecute_chain(chain_id).await?)
492    }
493
494    /// Gets received certificate trackers.
495    pub async fn get_received_certificate_trackers(
496        &self,
497        chain_id: ChainId,
498    ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
499        Ok(self
500            .node
501            .state
502            .get_received_certificate_trackers(chain_id)
503            .await?)
504    }
505
506    /// Gets tip state and outbox info for next_outbox_heights calculation.
507    pub async fn get_tip_state_and_outbox_info(
508        &self,
509        chain_id: ChainId,
510        receiver_id: ChainId,
511    ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
512        Ok(self
513            .node
514            .state
515            .get_tip_state_and_outbox_info(chain_id, receiver_id)
516            .await?)
517    }
518
519    /// Gets the next height to preprocess.
520    pub async fn get_next_height_to_preprocess(
521        &self,
522        chain_id: ChainId,
523    ) -> Result<BlockHeight, LocalNodeError> {
524        Ok(self
525            .node
526            .state
527            .get_next_height_to_preprocess(chain_id)
528            .await?)
529    }
530}
531
#[cfg(test)]
mod tests {
    use super::*;

    /// A wrapped worker error reports the worker's own qualified variant name.
    #[test]
    fn error_type_delegates_to_worker_error() {
        let error = LocalNodeError::WorkerError(WorkerError::InvalidOwner);
        assert_eq!(error.error_type(), "WorkerError::InvalidOwner");
    }

    /// A non-worker variant reports its name with the `LocalNodeError::` prefix.
    #[test]
    fn error_type_falls_back_to_local_node_variant() {
        let error = LocalNodeError::InvalidChainInfoResponse;
        assert_eq!(
            error.error_type(),
            "LocalNodeError::InvalidChainInfoResponse"
        );
    }
}
551}