Skip to main content

linera_core/
local_node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap, HashSet, VecDeque},
7    sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12    crypto::{CryptoHash, ValidatorPublicKey},
13    data_types::{ArithmeticError, Blob, BlockHeight},
14    identifiers::{BlobId, ChainId, EventId, StreamId},
15};
16use linera_chain::{
17    data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock},
18    types::{Block, ConfirmedBlockCertificate, GenericCertificate},
19    ChainError, ChainExecutionContext,
20};
21use linera_execution::{BlobState, ExecutionError, Query, QueryOutcome, ResourceTracker};
22use linera_storage::{Arc as CacheArc, Storage};
23use linera_views::ViewError;
24use thiserror::Error;
25use tracing::{instrument, warn};
26
27use crate::{
28    chain_worker::ProcessConfirmedBlockMode,
29    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30    notifier::Notifier,
31    worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34/// A local node with a single worker, typically used by clients.
35pub struct LocalNode<S>
36where
37    S: Storage,
38{
39    state: WorkerState<S>,
40}
41
42/// A client to a local node.
43#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46    S: Storage,
47{
48    node: Arc<LocalNode<S>>,
49}
50
51/// Error type for the operations on a local node.
52#[derive(Debug, Error)]
53pub enum LocalNodeError {
54    #[error(transparent)]
55    ArithmeticError(#[from] ArithmeticError),
56
57    #[error(transparent)]
58    ViewError(#[from] ViewError),
59
60    #[error("Worker operation failed: {0}")]
61    WorkerError(WorkerError),
62
63    #[error("The local node doesn't have an active chain {0}")]
64    InactiveChain(ChainId),
65
66    #[error("The chain info response received from the local node is invalid")]
67    InvalidChainInfoResponse,
68
69    #[error("Blobs not found: {0:?}")]
70    BlobsNotFound(Vec<BlobId>),
71
72    #[error("Events not found: {0:?}")]
73    EventsNotFound(Vec<EventId>),
74}
75
76impl From<ExecutionError> for LocalNodeError {
77    fn from(error: ExecutionError) -> Self {
78        match error {
79            ExecutionError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
80            ExecutionError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
81            ExecutionError::ViewError(view_error) => LocalNodeError::ViewError(view_error),
82            error => LocalNodeError::WorkerError(WorkerError::from(ChainError::ExecutionError(
83                Box::new(error),
84                ChainExecutionContext::Block,
85            ))),
86        }
87    }
88}
89
90impl From<WorkerError> for LocalNodeError {
91    fn from(error: WorkerError) -> Self {
92        match error {
93            WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
94            WorkerError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
95            error => LocalNodeError::WorkerError(error),
96        }
97    }
98}
99
100impl<S> LocalNodeClient<S>
101where
102    S: Storage + Clone + 'static,
103{
104    #[instrument(level = "trace", skip_all)]
105    pub async fn handle_block_proposal(
106        &self,
107        proposal: BlockProposal,
108    ) -> Result<ChainInfoResponse, LocalNodeError> {
109        // In local nodes, cross-chain actions will be handled internally, so we discard them.
110        let (response, _actions) = Box::pin(self.node.state.handle_block_proposal(proposal)).await;
111        Ok(response?)
112    }
113
114    #[instrument(level = "trace", skip_all)]
115    pub async fn handle_certificate<T>(
116        &self,
117        certificate: GenericCertificate<T>,
118        notifier: &impl Notifier,
119    ) -> Result<ChainInfoResponse, LocalNodeError>
120    where
121        T: ProcessableCertificate,
122    {
123        Ok(Box::pin(
124            self.node
125                .state
126                .fully_handle_certificate_with_notifications(certificate, notifier),
127        )
128        .await?)
129    }
130
131    /// Same as [`Self::handle_certificate`] but for a confirmed block certificate
132    /// and with an explicit [`ProcessConfirmedBlockMode`]. The generic variant
133    /// always uses [`ProcessConfirmedBlockMode::Auto`].
134    #[instrument(level = "trace", skip_all)]
135    pub async fn handle_confirmed_certificate(
136        &self,
137        certificate: ConfirmedBlockCertificate,
138        mode: ProcessConfirmedBlockMode,
139        notifier: &impl Notifier,
140    ) -> Result<ChainInfoResponse, LocalNodeError> {
141        Ok(Box::pin(
142            self.node
143                .state
144                .fully_handle_confirmed_certificate_with_notifications(certificate, mode, notifier),
145        )
146        .await?)
147    }
148
149    #[instrument(level = "trace", skip_all)]
150    pub async fn handle_chain_info_query(
151        &self,
152        query: ChainInfoQuery,
153    ) -> Result<ChainInfoResponse, LocalNodeError> {
154        Ok(self.node.state.handle_chain_info_query(query).await?)
155    }
156
157    #[instrument(level = "trace", skip_all)]
158    pub fn new(state: WorkerState<S>) -> Self {
159        Self {
160            node: Arc::new(LocalNode { state }),
161        }
162    }
163
164    #[instrument(level = "trace", skip_all)]
165    pub(crate) fn storage_client(&self) -> S {
166        self.node.state.storage_client().clone()
167    }
168
169    /// Executes a block with a policy for handling bundle failures.
170    ///
171    /// Returns the modified block (bundles may be rejected/removed based on the policy),
172    /// the executed block, chain info response, and resource tracker.
173    #[instrument(level = "trace", skip_all)]
174    pub async fn stage_block_execution(
175        &self,
176        block: ProposedBlock,
177        round: Option<u32>,
178        published_blobs: Vec<Blob>,
179        policy: BundleExecutionPolicy,
180    ) -> Result<
181        (
182            ProposedBlock,
183            Block,
184            ChainInfoResponse,
185            ResourceTracker,
186            HashSet<ChainId>,
187        ),
188        LocalNodeError,
189    > {
190        Ok(self
191            .node
192            .state
193            .stage_block_execution(block, round, published_blobs, policy)
194            .await?)
195    }
196
197    /// Reads blobs from storage.
198    pub async fn read_blobs_from_storage(
199        &self,
200        blob_ids: &[BlobId],
201    ) -> Result<Option<Vec<CacheArc<Blob>>>, LocalNodeError> {
202        let storage = self.storage_client();
203        Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
204    }
205
206    /// Reads blob states from storage.
207    pub async fn read_blob_states_from_storage(
208        &self,
209        blob_ids: &[BlobId],
210    ) -> Result<Vec<BlobState>, LocalNodeError> {
211        let storage = self.storage_client();
212        let mut blobs_not_found = Vec::new();
213        let mut blob_states = Vec::new();
214        for (blob_state, blob_id) in storage
215            .read_blob_states(blob_ids)
216            .await?
217            .into_iter()
218            .zip(blob_ids)
219        {
220            match blob_state {
221                None => blobs_not_found.push(*blob_id),
222                Some(blob_state) => blob_states.push(blob_state),
223            }
224        }
225        if !blobs_not_found.is_empty() {
226            return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
227        }
228        Ok(blob_states)
229    }
230
231    /// Looks for the specified blobs in the local chain manager's locking blobs.
232    /// Returns `Ok(None)` if any of the blobs is not found.
233    pub async fn get_locking_blobs(
234        &self,
235        blob_ids: impl IntoIterator<Item = &BlobId>,
236        chain_id: ChainId,
237    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
238        let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
239        Ok(self
240            .node
241            .state
242            .get_locking_blobs(chain_id, blob_ids_vec)
243            .await?)
244    }
245
246    /// Writes the given blobs to storage if there is an appropriate blob state.
247    pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
248        let storage = self.storage_client();
249        storage.maybe_write_blobs(blobs).await?;
250        Ok(())
251    }
252
253    pub async fn handle_pending_blobs(
254        &self,
255        chain_id: ChainId,
256        blobs: Vec<Blob>,
257    ) -> Result<(), LocalNodeError> {
258        for blob in blobs {
259            self.node.state.handle_pending_blob(chain_id, blob).await?;
260        }
261        Ok(())
262    }
263
264    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
265    /// [`ChainId`].
266    ///
267    /// The returned view holds a lock on the chain state, which prevents the local node from
268    /// changing the state of that chain.
269    #[instrument(level = "trace", skip(self))]
270    pub async fn chain_state_view(
271        &self,
272        chain_id: ChainId,
273    ) -> Result<crate::worker::ChainStateViewReadGuard<S>, LocalNodeError> {
274        Ok(self.node.state.chain_state_view(chain_id).await?)
275    }
276
277    #[instrument(level = "trace", skip(self))]
278    pub(crate) async fn chain_info(
279        &self,
280        chain_id: ChainId,
281    ) -> Result<Box<ChainInfo>, LocalNodeError> {
282        let query = ChainInfoQuery::new(chain_id);
283        Ok(self.handle_chain_info_query(query).await?.info)
284    }
285
286    #[instrument(level = "trace", skip(self, query))]
287    pub async fn query_application(
288        &self,
289        chain_id: ChainId,
290        query: Query,
291        block_hash: Option<CryptoHash>,
292    ) -> Result<(QueryOutcome, BlockHeight), LocalNodeError> {
293        let result = self
294            .node
295            .state
296            .query_application(chain_id, query, block_hash)
297            .await?;
298        Ok(result)
299    }
300
301    /// Handles any pending local cross-chain requests.
302    ///
303    /// Does not initialize the sender chain's execution state, so it is safe to
304    /// call even when the sender's `ChainDescription` blob is not in local storage.
305    /// Previously this went through `handle_chain_info_query`, which unconditionally
306    /// initialized the worker and therefore forced a `ChainDescription` download on
307    /// every call.
308    #[instrument(level = "trace", skip(self, notifier))]
309    pub async fn retry_pending_cross_chain_requests(
310        &self,
311        sender_chain: ChainId,
312        notifier: &impl Notifier,
313    ) -> Result<(), LocalNodeError> {
314        let actions = self
315            .node
316            .state
317            .cross_chain_network_actions(sender_chain)
318            .await?;
319        let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
320        while let Some(request) = requests.pop_front() {
321            let new_actions = self.node.state.handle_cross_chain_request(request).await?;
322            notifier.notify(&new_actions.notifications);
323            requests.extend(new_actions.cross_chain_requests);
324        }
325        Ok(())
326    }
327
328    /// Given a list of chain IDs, returns a map that assigns to each of them the next block
329    /// height to schedule, i.e. the lowest block height for which we haven't added the messages
330    /// to `receiver_id` to the outbox yet.
331    pub async fn next_outbox_heights(
332        &self,
333        chain_ids: impl IntoIterator<Item = &ChainId>,
334        receiver_id: ChainId,
335    ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
336        let futures = chain_ids
337            .into_iter()
338            .map(|chain_id| async move {
339                let (next_block_height, next_height_to_schedule) = match self
340                    .get_tip_state_and_outbox_info(*chain_id, receiver_id)
341                    .await
342                {
343                    Ok(info) => info,
344                    Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
345                        return Ok((*chain_id, BlockHeight::ZERO))
346                    }
347                    Err(err) => Err(err)?,
348                };
349                let next_height = if let Some(scheduled_height) = next_height_to_schedule {
350                    next_block_height.max(scheduled_height)
351                } else {
352                    next_block_height
353                };
354                Ok::<_, LocalNodeError>((*chain_id, next_height))
355            })
356            .collect::<FuturesUnordered<_>>();
357        futures.try_collect().await
358    }
359
360    pub async fn update_received_certificate_trackers(
361        &self,
362        chain_id: ChainId,
363        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
364    ) -> Result<(), LocalNodeError> {
365        self.node
366            .state
367            .update_received_certificate_trackers(chain_id, new_trackers)
368            .await?;
369        Ok(())
370    }
371
372    pub async fn get_preprocessed_block_hashes(
373        &self,
374        chain_id: ChainId,
375        start: BlockHeight,
376        end: BlockHeight,
377    ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
378        Ok(self
379            .node
380            .state
381            .get_preprocessed_block_hashes(chain_id, start, end)
382            .await?)
383    }
384
385    pub async fn get_inbox_next_height(
386        &self,
387        chain_id: ChainId,
388        origin: ChainId,
389    ) -> Result<BlockHeight, LocalNodeError> {
390        Ok(self
391            .node
392            .state
393            .get_inbox_next_height(chain_id, origin)
394            .await?)
395    }
396
397    /// Gets block hashes for the given heights.
398    pub async fn get_block_hashes(
399        &self,
400        chain_id: ChainId,
401        heights: Vec<BlockHeight>,
402    ) -> Result<Vec<CryptoHash>, LocalNodeError> {
403        Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
404    }
405
406    /// Gets proposed blobs from the manager for specified blob IDs.
407    pub async fn get_proposed_blobs(
408        &self,
409        chain_id: ChainId,
410        blob_ids: Vec<BlobId>,
411    ) -> Result<Vec<Blob>, LocalNodeError> {
412        Ok(self
413            .node
414            .state
415            .get_proposed_blobs(chain_id, blob_ids)
416            .await?)
417    }
418
419    /// Gets event subscriptions from the chain.
420    pub async fn get_event_subscriptions(
421        &self,
422        chain_id: ChainId,
423    ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
424        Ok(self.node.state.get_event_subscriptions(chain_id).await?)
425    }
426
427    /// Gets the next expected event index for a stream.
428    pub async fn get_next_expected_event(
429        &self,
430        chain_id: ChainId,
431        stream_id: StreamId,
432    ) -> Result<Option<u32>, LocalNodeError> {
433        Ok(self
434            .node
435            .state
436            .get_next_expected_event(chain_id, stream_id)
437            .await?)
438    }
439
440    /// Gets the `next_expected_events` indices for the given streams.
441    pub async fn next_expected_events(
442        &self,
443        chain_id: ChainId,
444        stream_ids: Vec<StreamId>,
445    ) -> Result<BTreeMap<StreamId, u32>, LocalNodeError> {
446        Ok(self
447            .node
448            .state
449            .next_expected_events(chain_id, stream_ids)
450            .await?)
451    }
452
453    /// Gets received certificate trackers.
454    pub async fn get_received_certificate_trackers(
455        &self,
456        chain_id: ChainId,
457    ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
458        Ok(self
459            .node
460            .state
461            .get_received_certificate_trackers(chain_id)
462            .await?)
463    }
464
465    /// Gets tip state and outbox info for next_outbox_heights calculation.
466    pub async fn get_tip_state_and_outbox_info(
467        &self,
468        chain_id: ChainId,
469        receiver_id: ChainId,
470    ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
471        Ok(self
472            .node
473            .state
474            .get_tip_state_and_outbox_info(chain_id, receiver_id)
475            .await?)
476    }
477
478    /// Gets the next height to preprocess.
479    pub async fn get_next_height_to_preprocess(
480        &self,
481        chain_id: ChainId,
482    ) -> Result<BlockHeight, LocalNodeError> {
483        Ok(self
484            .node
485            .state
486            .get_next_height_to_preprocess(chain_id)
487            .await?)
488    }
489}