Skip to main content

linera_core/
local_node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap, HashSet, VecDeque},
7    sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12    crypto::{CryptoHash, ValidatorPublicKey},
13    data_types::{ArithmeticError, Blob, BlockHeight},
14    identifiers::{BlobId, ChainId, EventId, StreamId},
15};
16use linera_chain::{
17    data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock},
18    types::{Block, ConfirmedBlockCertificate, GenericCertificate},
19    ChainError, ChainExecutionContext, StreamCounts,
20};
21use linera_execution::{BlobState, ExecutionError, Query, QueryOutcome, ResourceTracker};
22use linera_storage::{Arc as CacheArc, Storage};
23use linera_views::ViewError;
24use thiserror::Error;
25use tracing::{instrument, warn};
26
27use crate::{
28    chain_worker::ProcessConfirmedBlockMode,
29    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30    notifier::Notifier,
31    worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34/// A local node with a single worker, typically used by clients.
35pub struct LocalNode<S>
36where
37    S: Storage,
38{
39    state: WorkerState<S>,
40}
41
42/// A client to a local node.
43#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46    S: Storage,
47{
48    node: Arc<LocalNode<S>>,
49}
50
51/// Error type for the operations on a local node.
52#[derive(Debug, Error)]
53#[allow(missing_docs)]
54pub enum LocalNodeError {
55    #[error(transparent)]
56    ArithmeticError(#[from] ArithmeticError),
57
58    #[error(transparent)]
59    ViewError(#[from] ViewError),
60
61    #[error("Worker operation failed: {0}")]
62    WorkerError(WorkerError),
63
64    #[error("The local node doesn't have an active chain {0}")]
65    InactiveChain(ChainId),
66
67    #[error("The chain info response received from the local node is invalid")]
68    InvalidChainInfoResponse,
69
70    #[error("Blobs not found: {0:?}")]
71    BlobsNotFound(Vec<BlobId>),
72
73    #[error("Blocks not found: {0:?}")]
74    BlocksNotFound(Vec<CryptoHash>),
75
76    #[error("Events not found: {0:?}")]
77    EventsNotFound(Vec<EventId>),
78}
79
80impl From<ExecutionError> for LocalNodeError {
81    fn from(error: ExecutionError) -> Self {
82        match error {
83            ExecutionError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
84            ExecutionError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
85            ExecutionError::ViewError(view_error) => LocalNodeError::ViewError(view_error),
86            error => LocalNodeError::WorkerError(WorkerError::from(ChainError::ExecutionError(
87                Box::new(error),
88                ChainExecutionContext::Block,
89            ))),
90        }
91    }
92}
93
94impl From<WorkerError> for LocalNodeError {
95    fn from(error: WorkerError) -> Self {
96        match error {
97            WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
98            WorkerError::BlocksNotFound(hashes) => LocalNodeError::BlocksNotFound(hashes),
99            WorkerError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
100            error => LocalNodeError::WorkerError(error),
101        }
102    }
103}
104
105impl<S> LocalNodeClient<S>
106where
107    S: Storage + Clone + 'static,
108{
109    #[instrument(level = "trace", skip_all)]
110    pub async fn handle_block_proposal(
111        &self,
112        proposal: BlockProposal,
113    ) -> Result<ChainInfoResponse, LocalNodeError> {
114        // In local nodes, cross-chain actions will be handled internally, so we discard them.
115        let (response, _actions) = Box::pin(self.node.state.handle_block_proposal(proposal)).await;
116        Ok(response?)
117    }
118
119    #[instrument(level = "trace", skip_all)]
120    pub async fn handle_certificate<T>(
121        &self,
122        certificate: GenericCertificate<T>,
123        notifier: &impl Notifier,
124    ) -> Result<ChainInfoResponse, LocalNodeError>
125    where
126        T: ProcessableCertificate,
127    {
128        Ok(Box::pin(
129            self.node
130                .state
131                .fully_handle_certificate_with_notifications(certificate, notifier),
132        )
133        .await?)
134    }
135
136    /// Same as [`Self::handle_certificate`] but for a confirmed block certificate
137    /// and with an explicit [`ProcessConfirmedBlockMode`]. The generic variant
138    /// always uses [`ProcessConfirmedBlockMode::Auto`].
139    #[instrument(level = "trace", skip_all)]
140    pub async fn handle_confirmed_certificate(
141        &self,
142        certificate: ConfirmedBlockCertificate,
143        mode: ProcessConfirmedBlockMode,
144        notifier: &impl Notifier,
145    ) -> Result<ChainInfoResponse, LocalNodeError> {
146        Ok(Box::pin(
147            self.node
148                .state
149                .fully_handle_confirmed_certificate_with_notifications(certificate, mode, notifier),
150        )
151        .await?)
152    }
153
154    /// Test helper: returns the stored [`ConfirmedBlockCertificate`] for a
155    /// chain's block at a given height (or [`WorkerError::BlocksNotFound`] if
156    /// the height is referenced but the bytes aren't local).
157    #[cfg(with_testing)]
158    pub async fn read_certificate(
159        &self,
160        chain_id: linera_base::identifiers::ChainId,
161        height: linera_base::data_types::BlockHeight,
162    ) -> Result<Option<CacheArc<ConfirmedBlockCertificate>>, LocalNodeError> {
163        Ok(self.node.state.read_certificate(chain_id, height).await?)
164    }
165
166    #[instrument(level = "trace", skip_all)]
167    pub async fn handle_chain_info_query(
168        &self,
169        query: ChainInfoQuery,
170    ) -> Result<ChainInfoResponse, LocalNodeError> {
171        Ok(self.node.state.handle_chain_info_query(query).await?)
172    }
173
174    #[instrument(level = "trace", skip_all)]
175    pub fn new(state: WorkerState<S>) -> Self {
176        Self {
177            node: Arc::new(LocalNode { state }),
178        }
179    }
180
181    #[instrument(level = "trace", skip_all)]
182    pub(crate) fn storage_client(&self) -> S {
183        self.node.state.storage_client().clone()
184    }
185
186    /// Executes a block with a policy for handling bundle failures.
187    ///
188    /// Returns the modified block (bundles may be rejected/removed based on the policy),
189    /// the executed block, chain info response, and resource tracker.
190    #[instrument(level = "trace", skip_all)]
191    pub async fn stage_block_execution(
192        &self,
193        block: ProposedBlock,
194        round: Option<u32>,
195        published_blobs: Vec<Blob>,
196        policy: BundleExecutionPolicy,
197    ) -> Result<
198        (
199            ProposedBlock,
200            Block,
201            ChainInfoResponse,
202            ResourceTracker,
203            HashSet<ChainId>,
204        ),
205        LocalNodeError,
206    > {
207        Ok(self
208            .node
209            .state
210            .stage_block_execution(block, round, published_blobs, policy)
211            .await?)
212    }
213
214    /// Reads blobs from storage.
215    pub async fn read_blobs_from_storage(
216        &self,
217        blob_ids: &[BlobId],
218    ) -> Result<Option<Vec<CacheArc<Blob>>>, LocalNodeError> {
219        let storage = self.storage_client();
220        Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
221    }
222
223    /// Reads blob states from storage.
224    pub async fn read_blob_states_from_storage(
225        &self,
226        blob_ids: &[BlobId],
227    ) -> Result<Vec<BlobState>, LocalNodeError> {
228        let storage = self.storage_client();
229        let mut blobs_not_found = Vec::new();
230        let mut blob_states = Vec::new();
231        for (blob_state, blob_id) in storage
232            .read_blob_states(blob_ids)
233            .await?
234            .into_iter()
235            .zip(blob_ids)
236        {
237            match blob_state {
238                None => blobs_not_found.push(*blob_id),
239                Some(blob_state) => blob_states.push(blob_state),
240            }
241        }
242        if !blobs_not_found.is_empty() {
243            return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
244        }
245        Ok(blob_states)
246    }
247
248    /// Looks for the specified blobs in the local chain manager's locking blobs.
249    /// Returns `Ok(None)` if any of the blobs is not found.
250    pub async fn get_locking_blobs(
251        &self,
252        blob_ids: impl IntoIterator<Item = &BlobId>,
253        chain_id: ChainId,
254    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
255        let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
256        Ok(self
257            .node
258            .state
259            .get_locking_blobs(chain_id, blob_ids_vec)
260            .await?)
261    }
262
263    /// Writes the given blobs to storage if there is an appropriate blob state.
264    pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
265        let storage = self.storage_client();
266        storage.maybe_write_blobs(blobs).await?;
267        Ok(())
268    }
269
270    pub async fn handle_pending_blobs(
271        &self,
272        chain_id: ChainId,
273        blobs: Vec<Blob>,
274    ) -> Result<(), LocalNodeError> {
275        for blob in blobs {
276            self.node.state.handle_pending_blob(chain_id, blob).await?;
277        }
278        Ok(())
279    }
280
281    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
282    /// [`ChainId`].
283    ///
284    /// The returned view holds a lock on the chain state, which prevents the local node from
285    /// changing the state of that chain.
286    #[instrument(level = "trace", skip(self))]
287    pub async fn chain_state_view(
288        &self,
289        chain_id: ChainId,
290    ) -> Result<crate::worker::ChainStateViewReadGuard<S>, LocalNodeError> {
291        Ok(self.node.state.chain_state_view(chain_id).await?)
292    }
293
294    #[instrument(level = "trace", skip(self))]
295    pub(crate) async fn chain_info(
296        &self,
297        chain_id: ChainId,
298    ) -> Result<Box<ChainInfo>, LocalNodeError> {
299        let query = ChainInfoQuery::new(chain_id);
300        Ok(self.handle_chain_info_query(query).await?.info)
301    }
302
303    #[instrument(level = "trace", skip(self, query))]
304    pub async fn query_application(
305        &self,
306        chain_id: ChainId,
307        query: Query,
308        block_hash: Option<CryptoHash>,
309    ) -> Result<(QueryOutcome, BlockHeight), LocalNodeError> {
310        let result = self
311            .node
312            .state
313            .query_application(chain_id, query, block_hash)
314            .await?;
315        Ok(result)
316    }
317
318    /// Handles any pending local cross-chain requests.
319    ///
320    /// Does not initialize the sender chain's execution state, so it is safe to
321    /// call even when the sender's `ChainDescription` blob is not in local storage.
322    /// Previously this went through `handle_chain_info_query`, which unconditionally
323    /// initialized the worker and therefore forced a `ChainDescription` download on
324    /// every call.
325    #[instrument(level = "trace", skip(self, notifier))]
326    pub async fn retry_pending_cross_chain_requests(
327        &self,
328        sender_chain: ChainId,
329        notifier: &impl Notifier,
330    ) -> Result<(), LocalNodeError> {
331        let actions = self
332            .node
333            .state
334            .cross_chain_network_actions(sender_chain)
335            .await?;
336        let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
337        while let Some(request) = requests.pop_front() {
338            let new_actions = self.node.state.handle_cross_chain_request(request).await?;
339            notifier.notify(&new_actions.notifications);
340            requests.extend(new_actions.cross_chain_requests);
341        }
342        Ok(())
343    }
344
345    /// Given a list of chain IDs, returns a map that assigns to each of them the next block
346    /// height to schedule, i.e. the lowest block height for which we haven't added the messages
347    /// to `receiver_id` to the outbox yet.
348    pub async fn next_outbox_heights(
349        &self,
350        chain_ids: impl IntoIterator<Item = &ChainId>,
351        receiver_id: ChainId,
352    ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
353        let futures = chain_ids
354            .into_iter()
355            .map(|chain_id| async move {
356                let (next_block_height, next_height_to_schedule) = match self
357                    .get_tip_state_and_outbox_info(*chain_id, receiver_id)
358                    .await
359                {
360                    Ok(info) => info,
361                    Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
362                        return Ok((*chain_id, BlockHeight::ZERO))
363                    }
364                    Err(err) => Err(err)?,
365                };
366                let next_height = if let Some(scheduled_height) = next_height_to_schedule {
367                    next_block_height.max(scheduled_height)
368                } else {
369                    next_block_height
370                };
371                Ok::<_, LocalNodeError>((*chain_id, next_height))
372            })
373            .collect::<FuturesUnordered<_>>();
374        futures.try_collect().await
375    }
376
377    pub async fn update_received_certificate_trackers(
378        &self,
379        chain_id: ChainId,
380        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
381    ) -> Result<(), LocalNodeError> {
382        self.node
383            .state
384            .update_received_certificate_trackers(chain_id, new_trackers)
385            .await?;
386        Ok(())
387    }
388
389    pub async fn get_preprocessed_block_hashes(
390        &self,
391        chain_id: ChainId,
392        start: BlockHeight,
393        end: BlockHeight,
394    ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
395        Ok(self
396            .node
397            .state
398            .get_preprocessed_block_hashes(chain_id, start, end)
399            .await?)
400    }
401
402    pub async fn get_inbox_next_height(
403        &self,
404        chain_id: ChainId,
405        origin: ChainId,
406    ) -> Result<BlockHeight, LocalNodeError> {
407        Ok(self
408            .node
409            .state
410            .get_inbox_next_height(chain_id, origin)
411            .await?)
412    }
413
414    /// Gets block hashes for the given heights.
415    pub async fn get_block_hashes(
416        &self,
417        chain_id: ChainId,
418        heights: Vec<BlockHeight>,
419    ) -> Result<Vec<CryptoHash>, LocalNodeError> {
420        Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
421    }
422
423    /// Gets proposed blobs from the manager for specified blob IDs.
424    pub async fn get_proposed_blobs(
425        &self,
426        chain_id: ChainId,
427        blob_ids: Vec<BlobId>,
428    ) -> Result<Vec<Blob>, LocalNodeError> {
429        Ok(self
430            .node
431            .state
432            .get_proposed_blobs(chain_id, blob_ids)
433            .await?)
434    }
435
436    /// Gets event subscriptions from the chain.
437    pub async fn get_event_subscriptions(
438        &self,
439        chain_id: ChainId,
440    ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
441        Ok(self.node.state.get_event_subscriptions(chain_id).await?)
442    }
443
444    /// Gets a stream's [`StreamCounts`]: its next expected event index and its lowest readable
445    /// index, read from a single chain state view so they are mutually consistent.
446    pub async fn get_stream_indices(
447        &self,
448        chain_id: ChainId,
449        stream_id: StreamId,
450    ) -> Result<StreamCounts, LocalNodeError> {
451        Ok(self
452            .node
453            .state
454            .get_stream_indices(chain_id, stream_id)
455            .await?)
456    }
457
458    /// Gets the `next_expected_events` indices for the given streams.
459    pub async fn next_expected_events(
460        &self,
461        chain_id: ChainId,
462        stream_ids: Vec<StreamId>,
463    ) -> Result<BTreeMap<StreamId, u32>, LocalNodeError> {
464        Ok(self
465            .node
466            .state
467            .next_expected_events(chain_id, stream_ids)
468            .await?)
469    }
470
471    /// Test helper: resets a chain and re-executes it from its latest checkpoint.
472    #[cfg(with_testing)]
473    pub async fn reset_and_reexecute_chain(
474        &self,
475        chain_id: ChainId,
476    ) -> Result<Vec<crate::data_types::CrossChainRequest>, LocalNodeError> {
477        Ok(self.node.state.reset_and_reexecute_chain(chain_id).await?)
478    }
479
480    /// Gets received certificate trackers.
481    pub async fn get_received_certificate_trackers(
482        &self,
483        chain_id: ChainId,
484    ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
485        Ok(self
486            .node
487            .state
488            .get_received_certificate_trackers(chain_id)
489            .await?)
490    }
491
492    /// Gets tip state and outbox info for next_outbox_heights calculation.
493    pub async fn get_tip_state_and_outbox_info(
494        &self,
495        chain_id: ChainId,
496        receiver_id: ChainId,
497    ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
498        Ok(self
499            .node
500            .state
501            .get_tip_state_and_outbox_info(chain_id, receiver_id)
502            .await?)
503    }
504
505    /// Gets the next height to preprocess.
506    pub async fn get_next_height_to_preprocess(
507        &self,
508        chain_id: ChainId,
509    ) -> Result<BlockHeight, LocalNodeError> {
510        Ok(self
511            .node
512            .state
513            .get_next_height_to_preprocess(chain_id)
514            .await?)
515    }
516}