Skip to main content

linera_core/
local_node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap, HashSet, VecDeque},
7    sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12    crypto::{CryptoHash, ValidatorPublicKey},
13    data_types::{ArithmeticError, Blob, BlockHeight},
14    identifiers::{BlobId, ChainId, EventId, StreamId},
15};
16use linera_chain::{
17    data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock},
18    types::{Block, ConfirmedBlockCertificate, GenericCertificate},
19    ChainError, ChainExecutionContext,
20};
21use linera_execution::{BlobState, ExecutionError, Query, QueryOutcome, ResourceTracker};
22use linera_storage::{Arc as CacheArc, Storage};
23use linera_views::ViewError;
24use thiserror::Error;
25use tracing::{instrument, warn};
26
27use crate::{
28    chain_worker::ProcessConfirmedBlockMode,
29    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30    notifier::Notifier,
31    worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34/// A local node with a single worker, typically used by clients.
35pub struct LocalNode<S>
36where
37    S: Storage,
38{
39    state: WorkerState<S>,
40}
41
42/// A client to a local node.
43#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46    S: Storage,
47{
48    node: Arc<LocalNode<S>>,
49}
50
51/// Error type for the operations on a local node.
52#[derive(Debug, Error)]
53pub enum LocalNodeError {
54    #[error(transparent)]
55    ArithmeticError(#[from] ArithmeticError),
56
57    #[error(transparent)]
58    ViewError(#[from] ViewError),
59
60    #[error("Worker operation failed: {0}")]
61    WorkerError(WorkerError),
62
63    #[error("The local node doesn't have an active chain {0}")]
64    InactiveChain(ChainId),
65
66    #[error("The chain info response received from the local node is invalid")]
67    InvalidChainInfoResponse,
68
69    #[error("Blobs not found: {0:?}")]
70    BlobsNotFound(Vec<BlobId>),
71
72    #[error("Blocks not found: {0:?}")]
73    BlocksNotFound(Vec<CryptoHash>),
74
75    #[error("Events not found: {0:?}")]
76    EventsNotFound(Vec<EventId>),
77}
78
79impl From<ExecutionError> for LocalNodeError {
80    fn from(error: ExecutionError) -> Self {
81        match error {
82            ExecutionError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
83            ExecutionError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
84            ExecutionError::ViewError(view_error) => LocalNodeError::ViewError(view_error),
85            error => LocalNodeError::WorkerError(WorkerError::from(ChainError::ExecutionError(
86                Box::new(error),
87                ChainExecutionContext::Block,
88            ))),
89        }
90    }
91}
92
93impl From<WorkerError> for LocalNodeError {
94    fn from(error: WorkerError) -> Self {
95        match error {
96            WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
97            WorkerError::BlocksNotFound(hashes) => LocalNodeError::BlocksNotFound(hashes),
98            WorkerError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
99            error => LocalNodeError::WorkerError(error),
100        }
101    }
102}
103
104impl<S> LocalNodeClient<S>
105where
106    S: Storage + Clone + 'static,
107{
108    #[instrument(level = "trace", skip_all)]
109    pub async fn handle_block_proposal(
110        &self,
111        proposal: BlockProposal,
112    ) -> Result<ChainInfoResponse, LocalNodeError> {
113        // In local nodes, cross-chain actions will be handled internally, so we discard them.
114        let (response, _actions) = Box::pin(self.node.state.handle_block_proposal(proposal)).await;
115        Ok(response?)
116    }
117
118    #[instrument(level = "trace", skip_all)]
119    pub async fn handle_certificate<T>(
120        &self,
121        certificate: GenericCertificate<T>,
122        notifier: &impl Notifier,
123    ) -> Result<ChainInfoResponse, LocalNodeError>
124    where
125        T: ProcessableCertificate,
126    {
127        Ok(Box::pin(
128            self.node
129                .state
130                .fully_handle_certificate_with_notifications(certificate, notifier),
131        )
132        .await?)
133    }
134
135    /// Same as [`Self::handle_certificate`] but for a confirmed block certificate
136    /// and with an explicit [`ProcessConfirmedBlockMode`]. The generic variant
137    /// always uses [`ProcessConfirmedBlockMode::Auto`].
138    #[instrument(level = "trace", skip_all)]
139    pub async fn handle_confirmed_certificate(
140        &self,
141        certificate: ConfirmedBlockCertificate,
142        mode: ProcessConfirmedBlockMode,
143        notifier: &impl Notifier,
144    ) -> Result<ChainInfoResponse, LocalNodeError> {
145        Ok(Box::pin(
146            self.node
147                .state
148                .fully_handle_confirmed_certificate_with_notifications(certificate, mode, notifier),
149        )
150        .await?)
151    }
152
153    /// Test helper: returns the stored [`ConfirmedBlockCertificate`] for a
154    /// chain's block at a given height (or [`WorkerError::BlocksNotFound`] if
155    /// the height is referenced but the bytes aren't local).
156    #[cfg(with_testing)]
157    pub async fn read_certificate(
158        &self,
159        chain_id: linera_base::identifiers::ChainId,
160        height: linera_base::data_types::BlockHeight,
161    ) -> Result<Option<CacheArc<ConfirmedBlockCertificate>>, LocalNodeError> {
162        Ok(self.node.state.read_certificate(chain_id, height).await?)
163    }
164
165    #[instrument(level = "trace", skip_all)]
166    pub async fn handle_chain_info_query(
167        &self,
168        query: ChainInfoQuery,
169    ) -> Result<ChainInfoResponse, LocalNodeError> {
170        Ok(self.node.state.handle_chain_info_query(query).await?)
171    }
172
173    #[instrument(level = "trace", skip_all)]
174    pub fn new(state: WorkerState<S>) -> Self {
175        Self {
176            node: Arc::new(LocalNode { state }),
177        }
178    }
179
180    #[instrument(level = "trace", skip_all)]
181    pub(crate) fn storage_client(&self) -> S {
182        self.node.state.storage_client().clone()
183    }
184
185    /// Executes a block with a policy for handling bundle failures.
186    ///
187    /// Returns the modified block (bundles may be rejected/removed based on the policy),
188    /// the executed block, chain info response, and resource tracker.
189    #[instrument(level = "trace", skip_all)]
190    pub async fn stage_block_execution(
191        &self,
192        block: ProposedBlock,
193        round: Option<u32>,
194        published_blobs: Vec<Blob>,
195        policy: BundleExecutionPolicy,
196    ) -> Result<
197        (
198            ProposedBlock,
199            Block,
200            ChainInfoResponse,
201            ResourceTracker,
202            HashSet<ChainId>,
203        ),
204        LocalNodeError,
205    > {
206        Ok(self
207            .node
208            .state
209            .stage_block_execution(block, round, published_blobs, policy)
210            .await?)
211    }
212
213    /// Reads blobs from storage.
214    pub async fn read_blobs_from_storage(
215        &self,
216        blob_ids: &[BlobId],
217    ) -> Result<Option<Vec<CacheArc<Blob>>>, LocalNodeError> {
218        let storage = self.storage_client();
219        Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
220    }
221
222    /// Reads blob states from storage.
223    pub async fn read_blob_states_from_storage(
224        &self,
225        blob_ids: &[BlobId],
226    ) -> Result<Vec<BlobState>, LocalNodeError> {
227        let storage = self.storage_client();
228        let mut blobs_not_found = Vec::new();
229        let mut blob_states = Vec::new();
230        for (blob_state, blob_id) in storage
231            .read_blob_states(blob_ids)
232            .await?
233            .into_iter()
234            .zip(blob_ids)
235        {
236            match blob_state {
237                None => blobs_not_found.push(*blob_id),
238                Some(blob_state) => blob_states.push(blob_state),
239            }
240        }
241        if !blobs_not_found.is_empty() {
242            return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
243        }
244        Ok(blob_states)
245    }
246
247    /// Looks for the specified blobs in the local chain manager's locking blobs.
248    /// Returns `Ok(None)` if any of the blobs is not found.
249    pub async fn get_locking_blobs(
250        &self,
251        blob_ids: impl IntoIterator<Item = &BlobId>,
252        chain_id: ChainId,
253    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
254        let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
255        Ok(self
256            .node
257            .state
258            .get_locking_blobs(chain_id, blob_ids_vec)
259            .await?)
260    }
261
262    /// Writes the given blobs to storage if there is an appropriate blob state.
263    pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
264        let storage = self.storage_client();
265        storage.maybe_write_blobs(blobs).await?;
266        Ok(())
267    }
268
269    pub async fn handle_pending_blobs(
270        &self,
271        chain_id: ChainId,
272        blobs: Vec<Blob>,
273    ) -> Result<(), LocalNodeError> {
274        for blob in blobs {
275            self.node.state.handle_pending_blob(chain_id, blob).await?;
276        }
277        Ok(())
278    }
279
280    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
281    /// [`ChainId`].
282    ///
283    /// The returned view holds a lock on the chain state, which prevents the local node from
284    /// changing the state of that chain.
285    #[instrument(level = "trace", skip(self))]
286    pub async fn chain_state_view(
287        &self,
288        chain_id: ChainId,
289    ) -> Result<crate::worker::ChainStateViewReadGuard<S>, LocalNodeError> {
290        Ok(self.node.state.chain_state_view(chain_id).await?)
291    }
292
293    #[instrument(level = "trace", skip(self))]
294    pub(crate) async fn chain_info(
295        &self,
296        chain_id: ChainId,
297    ) -> Result<Box<ChainInfo>, LocalNodeError> {
298        let query = ChainInfoQuery::new(chain_id);
299        Ok(self.handle_chain_info_query(query).await?.info)
300    }
301
302    #[instrument(level = "trace", skip(self, query))]
303    pub async fn query_application(
304        &self,
305        chain_id: ChainId,
306        query: Query,
307        block_hash: Option<CryptoHash>,
308    ) -> Result<(QueryOutcome, BlockHeight), LocalNodeError> {
309        let result = self
310            .node
311            .state
312            .query_application(chain_id, query, block_hash)
313            .await?;
314        Ok(result)
315    }
316
317    /// Handles any pending local cross-chain requests.
318    ///
319    /// Does not initialize the sender chain's execution state, so it is safe to
320    /// call even when the sender's `ChainDescription` blob is not in local storage.
321    /// Previously this went through `handle_chain_info_query`, which unconditionally
322    /// initialized the worker and therefore forced a `ChainDescription` download on
323    /// every call.
324    #[instrument(level = "trace", skip(self, notifier))]
325    pub async fn retry_pending_cross_chain_requests(
326        &self,
327        sender_chain: ChainId,
328        notifier: &impl Notifier,
329    ) -> Result<(), LocalNodeError> {
330        let actions = self
331            .node
332            .state
333            .cross_chain_network_actions(sender_chain)
334            .await?;
335        let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
336        while let Some(request) = requests.pop_front() {
337            let new_actions = self.node.state.handle_cross_chain_request(request).await?;
338            notifier.notify(&new_actions.notifications);
339            requests.extend(new_actions.cross_chain_requests);
340        }
341        Ok(())
342    }
343
344    /// Given a list of chain IDs, returns a map that assigns to each of them the next block
345    /// height to schedule, i.e. the lowest block height for which we haven't added the messages
346    /// to `receiver_id` to the outbox yet.
347    pub async fn next_outbox_heights(
348        &self,
349        chain_ids: impl IntoIterator<Item = &ChainId>,
350        receiver_id: ChainId,
351    ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
352        let futures = chain_ids
353            .into_iter()
354            .map(|chain_id| async move {
355                let (next_block_height, next_height_to_schedule) = match self
356                    .get_tip_state_and_outbox_info(*chain_id, receiver_id)
357                    .await
358                {
359                    Ok(info) => info,
360                    Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
361                        return Ok((*chain_id, BlockHeight::ZERO))
362                    }
363                    Err(err) => Err(err)?,
364                };
365                let next_height = if let Some(scheduled_height) = next_height_to_schedule {
366                    next_block_height.max(scheduled_height)
367                } else {
368                    next_block_height
369                };
370                Ok::<_, LocalNodeError>((*chain_id, next_height))
371            })
372            .collect::<FuturesUnordered<_>>();
373        futures.try_collect().await
374    }
375
376    pub async fn update_received_certificate_trackers(
377        &self,
378        chain_id: ChainId,
379        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
380    ) -> Result<(), LocalNodeError> {
381        self.node
382            .state
383            .update_received_certificate_trackers(chain_id, new_trackers)
384            .await?;
385        Ok(())
386    }
387
388    pub async fn get_preprocessed_block_hashes(
389        &self,
390        chain_id: ChainId,
391        start: BlockHeight,
392        end: BlockHeight,
393    ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
394        Ok(self
395            .node
396            .state
397            .get_preprocessed_block_hashes(chain_id, start, end)
398            .await?)
399    }
400
401    pub async fn get_inbox_next_height(
402        &self,
403        chain_id: ChainId,
404        origin: ChainId,
405    ) -> Result<BlockHeight, LocalNodeError> {
406        Ok(self
407            .node
408            .state
409            .get_inbox_next_height(chain_id, origin)
410            .await?)
411    }
412
413    /// Gets block hashes for the given heights.
414    pub async fn get_block_hashes(
415        &self,
416        chain_id: ChainId,
417        heights: Vec<BlockHeight>,
418    ) -> Result<Vec<CryptoHash>, LocalNodeError> {
419        Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
420    }
421
422    /// Gets proposed blobs from the manager for specified blob IDs.
423    pub async fn get_proposed_blobs(
424        &self,
425        chain_id: ChainId,
426        blob_ids: Vec<BlobId>,
427    ) -> Result<Vec<Blob>, LocalNodeError> {
428        Ok(self
429            .node
430            .state
431            .get_proposed_blobs(chain_id, blob_ids)
432            .await?)
433    }
434
435    /// Gets event subscriptions from the chain.
436    pub async fn get_event_subscriptions(
437        &self,
438        chain_id: ChainId,
439    ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
440        Ok(self.node.state.get_event_subscriptions(chain_id).await?)
441    }
442
443    /// Gets the next expected event index for a stream.
444    pub async fn get_next_expected_event(
445        &self,
446        chain_id: ChainId,
447        stream_id: StreamId,
448    ) -> Result<Option<u32>, LocalNodeError> {
449        Ok(self
450            .node
451            .state
452            .get_next_expected_event(chain_id, stream_id)
453            .await?)
454    }
455
456    /// Gets the `next_expected_events` indices for the given streams.
457    pub async fn next_expected_events(
458        &self,
459        chain_id: ChainId,
460        stream_ids: Vec<StreamId>,
461    ) -> Result<BTreeMap<StreamId, u32>, LocalNodeError> {
462        Ok(self
463            .node
464            .state
465            .next_expected_events(chain_id, stream_ids)
466            .await?)
467    }
468
469    /// Gets received certificate trackers.
470    pub async fn get_received_certificate_trackers(
471        &self,
472        chain_id: ChainId,
473    ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
474        Ok(self
475            .node
476            .state
477            .get_received_certificate_trackers(chain_id)
478            .await?)
479    }
480
481    /// Gets tip state and outbox info for next_outbox_heights calculation.
482    pub async fn get_tip_state_and_outbox_info(
483        &self,
484        chain_id: ChainId,
485        receiver_id: ChainId,
486    ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
487        Ok(self
488            .node
489            .state
490            .get_tip_state_and_outbox_info(chain_id, receiver_id)
491            .await?)
492    }
493
494    /// Gets the next height to preprocess.
495    pub async fn get_next_height_to_preprocess(
496        &self,
497        chain_id: ChainId,
498    ) -> Result<BlockHeight, LocalNodeError> {
499        Ok(self
500            .node
501            .state
502            .get_next_height_to_preprocess(chain_id)
503            .await?)
504    }
505}