linera_core/
local_node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap, HashSet, VecDeque},
7    sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12    crypto::{CryptoHash, ValidatorPublicKey},
13    data_types::{ArithmeticError, Blob, BlockHeight},
14    identifiers::{BlobId, ChainId, EventId, StreamId},
15};
16use linera_chain::{
17    data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock},
18    types::{Block, GenericCertificate},
19    ChainError, ChainExecutionContext,
20};
21use linera_execution::{BlobState, ExecutionError, Query, QueryOutcome, ResourceTracker};
22use linera_storage::Storage;
23use linera_views::ViewError;
24use thiserror::Error;
25use tracing::{instrument, warn};
26
27use crate::{
28    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
29    notifier::Notifier,
30    worker::{ProcessableCertificate, WorkerError, WorkerState},
31};
32
33/// A local node with a single worker, typically used by clients.
34pub struct LocalNode<S>
35where
36    S: Storage,
37{
38    state: WorkerState<S>,
39}
40
41/// A client to a local node.
42#[derive(Clone)]
43pub struct LocalNodeClient<S>
44where
45    S: Storage,
46{
47    node: Arc<LocalNode<S>>,
48}
49
/// Error type for the operations on a local node.
///
/// `ArithmeticError` and `ViewError` convert automatically through `#[from]`.
/// `WorkerError` and `ExecutionError` convert through the hand-written `From`
/// implementations in this file, which route "not found" errors to the dedicated
/// `BlobsNotFound` / `EventsNotFound` variants.
#[derive(Debug, Error)]
pub enum LocalNodeError {
    /// An arithmetic error, reported with the inner error's own message.
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    /// A view error, reported with the inner error's own message.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// A worker error that has no more specific variant in this enum.
    ///
    /// There is intentionally no `#[from]` here: the manual `From<WorkerError>`
    /// implementation first extracts `BlobsNotFound` and `EventsNotFound`.
    #[error("Worker operation failed: {0}")]
    WorkerError(WorkerError),

    /// The local node has no active chain with the given ID.
    #[error("The local node doesn't have an active chain {0}")]
    InactiveChain(ChainId),

    /// The chain info response obtained from the local node was not valid.
    #[error("The chain info response received from the local node is invalid")]
    InvalidChainInfoResponse,

    /// The listed blobs could not be found.
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),

    /// The listed events could not be found.
    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),
}
74
75impl From<ExecutionError> for LocalNodeError {
76    fn from(error: ExecutionError) -> Self {
77        match error {
78            ExecutionError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
79            ExecutionError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
80            ExecutionError::ViewError(view_error) => LocalNodeError::ViewError(view_error),
81            error => LocalNodeError::WorkerError(WorkerError::from(ChainError::ExecutionError(
82                Box::new(error),
83                ChainExecutionContext::Block,
84            ))),
85        }
86    }
87}
88
89impl From<WorkerError> for LocalNodeError {
90    fn from(error: WorkerError) -> Self {
91        match error {
92            WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
93            WorkerError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
94            error => LocalNodeError::WorkerError(error),
95        }
96    }
97}
98
99impl<S> LocalNodeClient<S>
100where
101    S: Storage + Clone + 'static,
102{
103    #[instrument(level = "trace", skip_all)]
104    pub async fn handle_block_proposal(
105        &self,
106        proposal: BlockProposal,
107    ) -> Result<ChainInfoResponse, LocalNodeError> {
108        // In local nodes, cross-chain actions will be handled internally, so we discard them.
109        let (response, _actions) =
110            Box::pin(self.node.state.handle_block_proposal(proposal)).await?;
111        Ok(response)
112    }
113
114    #[instrument(level = "trace", skip_all)]
115    pub async fn handle_certificate<T>(
116        &self,
117        certificate: GenericCertificate<T>,
118        notifier: &impl Notifier,
119    ) -> Result<ChainInfoResponse, LocalNodeError>
120    where
121        T: ProcessableCertificate,
122    {
123        Ok(Box::pin(
124            self.node
125                .state
126                .fully_handle_certificate_with_notifications(certificate, notifier),
127        )
128        .await?)
129    }
130
131    #[instrument(level = "trace", skip_all)]
132    pub async fn handle_chain_info_query(
133        &self,
134        query: ChainInfoQuery,
135    ) -> Result<ChainInfoResponse, LocalNodeError> {
136        Ok(self.node.state.handle_chain_info_query(query).await?)
137    }
138
139    #[instrument(level = "trace", skip_all)]
140    pub fn new(state: WorkerState<S>) -> Self {
141        Self {
142            node: Arc::new(LocalNode { state }),
143        }
144    }
145
146    #[instrument(level = "trace", skip_all)]
147    pub(crate) fn storage_client(&self) -> S {
148        self.node.state.storage_client().clone()
149    }
150
151    /// Executes a block with a policy for handling bundle failures.
152    ///
153    /// Returns the modified block (bundles may be rejected/removed based on the policy),
154    /// the executed block, chain info response, and resource tracker.
155    #[instrument(level = "trace", skip_all)]
156    pub async fn stage_block_execution(
157        &self,
158        block: ProposedBlock,
159        round: Option<u32>,
160        published_blobs: Vec<Blob>,
161        policy: BundleExecutionPolicy,
162    ) -> Result<
163        (
164            ProposedBlock,
165            Block,
166            ChainInfoResponse,
167            ResourceTracker,
168            HashSet<ChainId>,
169        ),
170        LocalNodeError,
171    > {
172        Ok(self
173            .node
174            .state
175            .stage_block_execution(block, round, published_blobs, policy)
176            .await?)
177    }
178
179    /// Reads blobs from storage.
180    pub async fn read_blobs_from_storage(
181        &self,
182        blob_ids: &[BlobId],
183    ) -> Result<Option<Vec<Arc<Blob>>>, LocalNodeError> {
184        let storage = self.storage_client();
185        Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
186    }
187
188    /// Reads blob states from storage.
189    pub async fn read_blob_states_from_storage(
190        &self,
191        blob_ids: &[BlobId],
192    ) -> Result<Vec<BlobState>, LocalNodeError> {
193        let storage = self.storage_client();
194        let mut blobs_not_found = Vec::new();
195        let mut blob_states = Vec::new();
196        for (blob_state, blob_id) in storage
197            .read_blob_states(blob_ids)
198            .await?
199            .into_iter()
200            .zip(blob_ids)
201        {
202            match blob_state {
203                None => blobs_not_found.push(*blob_id),
204                Some(blob_state) => blob_states.push(blob_state),
205            }
206        }
207        if !blobs_not_found.is_empty() {
208            return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
209        }
210        Ok(blob_states)
211    }
212
213    /// Looks for the specified blobs in the local chain manager's locking blobs.
214    /// Returns `Ok(None)` if any of the blobs is not found.
215    pub async fn get_locking_blobs(
216        &self,
217        blob_ids: impl IntoIterator<Item = &BlobId>,
218        chain_id: ChainId,
219    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
220        let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
221        Ok(self
222            .node
223            .state
224            .get_locking_blobs(chain_id, blob_ids_vec)
225            .await?)
226    }
227
228    /// Writes the given blobs to storage if there is an appropriate blob state.
229    pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
230        let storage = self.storage_client();
231        storage.maybe_write_blobs(blobs).await?;
232        Ok(())
233    }
234
235    pub async fn handle_pending_blobs(
236        &self,
237        chain_id: ChainId,
238        blobs: Vec<Blob>,
239    ) -> Result<(), LocalNodeError> {
240        for blob in blobs {
241            self.node.state.handle_pending_blob(chain_id, blob).await?;
242        }
243        Ok(())
244    }
245
246    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
247    /// [`ChainId`].
248    ///
249    /// The returned view holds a lock on the chain state, which prevents the local node from
250    /// changing the state of that chain.
251    #[instrument(level = "trace", skip(self))]
252    pub async fn chain_state_view(
253        &self,
254        chain_id: ChainId,
255    ) -> Result<crate::worker::ChainStateViewReadGuard<S>, LocalNodeError> {
256        Ok(self.node.state.chain_state_view(chain_id).await?)
257    }
258
259    #[instrument(level = "trace", skip(self))]
260    pub(crate) async fn chain_info(
261        &self,
262        chain_id: ChainId,
263    ) -> Result<Box<ChainInfo>, LocalNodeError> {
264        let query = ChainInfoQuery::new(chain_id);
265        Ok(self.handle_chain_info_query(query).await?.info)
266    }
267
268    #[instrument(level = "trace", skip(self, query))]
269    pub async fn query_application(
270        &self,
271        chain_id: ChainId,
272        query: Query,
273        block_hash: Option<CryptoHash>,
274    ) -> Result<(QueryOutcome, BlockHeight), LocalNodeError> {
275        let result = self
276            .node
277            .state
278            .query_application(chain_id, query, block_hash)
279            .await?;
280        Ok(result)
281    }
282
283    /// Handles any pending local cross-chain requests.
284    ///
285    /// Does not initialize the sender chain's execution state, so it is safe to
286    /// call even when the sender's `ChainDescription` blob is not in local storage.
287    /// Previously this went through `handle_chain_info_query`, which unconditionally
288    /// initialized the worker and therefore forced a `ChainDescription` download on
289    /// every call.
290    #[instrument(level = "trace", skip(self, notifier))]
291    pub async fn retry_pending_cross_chain_requests(
292        &self,
293        sender_chain: ChainId,
294        notifier: &impl Notifier,
295    ) -> Result<(), LocalNodeError> {
296        let actions = self
297            .node
298            .state
299            .cross_chain_network_actions(sender_chain)
300            .await?;
301        let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
302        while let Some(request) = requests.pop_front() {
303            let new_actions = self.node.state.handle_cross_chain_request(request).await?;
304            notifier.notify(&new_actions.notifications);
305            requests.extend(new_actions.cross_chain_requests);
306        }
307        Ok(())
308    }
309
310    /// Given a list of chain IDs, returns a map that assigns to each of them the next block
311    /// height to schedule, i.e. the lowest block height for which we haven't added the messages
312    /// to `receiver_id` to the outbox yet.
313    pub async fn next_outbox_heights(
314        &self,
315        chain_ids: impl IntoIterator<Item = &ChainId>,
316        receiver_id: ChainId,
317    ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
318        let futures = chain_ids
319            .into_iter()
320            .map(|chain_id| async move {
321                let (next_block_height, next_height_to_schedule) = match self
322                    .get_tip_state_and_outbox_info(*chain_id, receiver_id)
323                    .await
324                {
325                    Ok(info) => info,
326                    Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
327                        return Ok((*chain_id, BlockHeight::ZERO))
328                    }
329                    Err(err) => Err(err)?,
330                };
331                let next_height = if let Some(scheduled_height) = next_height_to_schedule {
332                    next_block_height.max(scheduled_height)
333                } else {
334                    next_block_height
335                };
336                Ok::<_, LocalNodeError>((*chain_id, next_height))
337            })
338            .collect::<FuturesUnordered<_>>();
339        futures.try_collect().await
340    }
341
342    pub async fn update_received_certificate_trackers(
343        &self,
344        chain_id: ChainId,
345        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
346    ) -> Result<(), LocalNodeError> {
347        self.node
348            .state
349            .update_received_certificate_trackers(chain_id, new_trackers)
350            .await?;
351        Ok(())
352    }
353
354    pub async fn get_preprocessed_block_hashes(
355        &self,
356        chain_id: ChainId,
357        start: BlockHeight,
358        end: BlockHeight,
359    ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
360        Ok(self
361            .node
362            .state
363            .get_preprocessed_block_hashes(chain_id, start, end)
364            .await?)
365    }
366
367    pub async fn get_inbox_next_height(
368        &self,
369        chain_id: ChainId,
370        origin: ChainId,
371    ) -> Result<BlockHeight, LocalNodeError> {
372        Ok(self
373            .node
374            .state
375            .get_inbox_next_height(chain_id, origin)
376            .await?)
377    }
378
379    /// Gets block hashes for the given heights.
380    pub async fn get_block_hashes(
381        &self,
382        chain_id: ChainId,
383        heights: Vec<BlockHeight>,
384    ) -> Result<Vec<CryptoHash>, LocalNodeError> {
385        Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
386    }
387
388    /// Gets proposed blobs from the manager for specified blob IDs.
389    pub async fn get_proposed_blobs(
390        &self,
391        chain_id: ChainId,
392        blob_ids: Vec<BlobId>,
393    ) -> Result<Vec<Blob>, LocalNodeError> {
394        Ok(self
395            .node
396            .state
397            .get_proposed_blobs(chain_id, blob_ids)
398            .await?)
399    }
400
401    /// Gets event subscriptions from the chain.
402    pub async fn get_event_subscriptions(
403        &self,
404        chain_id: ChainId,
405    ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
406        Ok(self.node.state.get_event_subscriptions(chain_id).await?)
407    }
408
409    /// Gets the next expected event index for a stream.
410    pub async fn get_next_expected_event(
411        &self,
412        chain_id: ChainId,
413        stream_id: StreamId,
414    ) -> Result<Option<u32>, LocalNodeError> {
415        Ok(self
416            .node
417            .state
418            .get_next_expected_event(chain_id, stream_id)
419            .await?)
420    }
421
422    /// Gets the `next_expected_events` indices for the given streams.
423    pub async fn next_expected_events(
424        &self,
425        chain_id: ChainId,
426        stream_ids: Vec<StreamId>,
427    ) -> Result<BTreeMap<StreamId, u32>, LocalNodeError> {
428        Ok(self
429            .node
430            .state
431            .next_expected_events(chain_id, stream_ids)
432            .await?)
433    }
434
435    /// Gets received certificate trackers.
436    pub async fn get_received_certificate_trackers(
437        &self,
438        chain_id: ChainId,
439    ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
440        Ok(self
441            .node
442            .state
443            .get_received_certificate_trackers(chain_id)
444            .await?)
445    }
446
447    /// Gets tip state and outbox info for next_outbox_heights calculation.
448    pub async fn get_tip_state_and_outbox_info(
449        &self,
450        chain_id: ChainId,
451        receiver_id: ChainId,
452    ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
453        Ok(self
454            .node
455            .state
456            .get_tip_state_and_outbox_info(chain_id, receiver_id)
457            .await?)
458    }
459
460    /// Gets the next height to preprocess.
461    pub async fn get_next_height_to_preprocess(
462        &self,
463        chain_id: ChainId,
464    ) -> Result<BlockHeight, LocalNodeError> {
465        Ok(self
466            .node
467            .state
468            .get_next_height_to_preprocess(chain_id)
469            .await?)
470    }
471}