linera_core/
local_node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap, VecDeque},
7    sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12    crypto::{CryptoHash, ValidatorPublicKey},
13    data_types::{ArithmeticError, Blob, BlockHeight, Epoch},
14    identifiers::{BlobId, ChainId, StreamId},
15};
16use linera_chain::{
17    data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock},
18    types::{Block, GenericCertificate},
19};
20use linera_execution::{committee::Committee, BlobState, Query, QueryOutcome, ResourceTracker};
21use linera_storage::Storage;
22use linera_views::ViewError;
23use thiserror::Error;
24use tracing::{instrument, warn};
25
26use crate::{
27    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
28    notifier::Notifier,
29    worker::{ProcessableCertificate, WorkerError, WorkerState},
30};
31
32/// A local node with a single worker, typically used by clients.
33pub struct LocalNode<S>
34where
35    S: Storage,
36{
37    state: WorkerState<S>,
38}
39
40/// A client to a local node.
41#[derive(Clone)]
42pub struct LocalNodeClient<S>
43where
44    S: Storage,
45{
46    node: Arc<LocalNode<S>>,
47}
48
49/// Error type for the operations on a local node.
50#[derive(Debug, Error)]
51pub enum LocalNodeError {
52    #[error(transparent)]
53    ArithmeticError(#[from] ArithmeticError),
54
55    #[error(transparent)]
56    ViewError(#[from] ViewError),
57
58    #[error("Worker operation failed: {0}")]
59    WorkerError(WorkerError),
60
61    #[error("The local node doesn't have an active chain {0}")]
62    InactiveChain(ChainId),
63
64    #[error("The chain info response received from the local node is invalid")]
65    InvalidChainInfoResponse,
66
67    #[error("Blobs not found: {0:?}")]
68    BlobsNotFound(Vec<BlobId>),
69}
70
71impl From<WorkerError> for LocalNodeError {
72    fn from(error: WorkerError) -> Self {
73        match error {
74            WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
75            error => LocalNodeError::WorkerError(error),
76        }
77    }
78}
79
80impl<S> LocalNodeClient<S>
81where
82    S: Storage + Clone + 'static,
83{
84    #[instrument(level = "trace", skip_all)]
85    pub async fn handle_block_proposal(
86        &self,
87        proposal: BlockProposal,
88    ) -> Result<ChainInfoResponse, LocalNodeError> {
89        // In local nodes, cross-chain actions will be handled internally, so we discard them.
90        let (response, _actions) =
91            Box::pin(self.node.state.handle_block_proposal(proposal)).await?;
92        Ok(response)
93    }
94
95    #[instrument(level = "trace", skip_all)]
96    pub async fn handle_certificate<T>(
97        &self,
98        certificate: GenericCertificate<T>,
99        notifier: &impl Notifier,
100    ) -> Result<ChainInfoResponse, LocalNodeError>
101    where
102        T: ProcessableCertificate,
103    {
104        Ok(Box::pin(
105            self.node
106                .state
107                .fully_handle_certificate_with_notifications(certificate, notifier),
108        )
109        .await?)
110    }
111
112    #[instrument(level = "trace", skip_all)]
113    pub async fn handle_chain_info_query(
114        &self,
115        query: ChainInfoQuery,
116    ) -> Result<ChainInfoResponse, LocalNodeError> {
117        // In local nodes, cross-chain actions will be handled internally, so we discard them.
118        let (response, _actions) = self.node.state.handle_chain_info_query(query).await?;
119        Ok(response)
120    }
121
122    #[instrument(level = "trace", skip_all)]
123    pub fn new(state: WorkerState<S>) -> Self {
124        Self {
125            node: Arc::new(LocalNode { state }),
126        }
127    }
128
129    #[instrument(level = "trace", skip_all)]
130    pub(crate) fn storage_client(&self) -> S {
131        self.node.state.storage_client().clone()
132    }
133
134    #[instrument(level = "trace", skip_all)]
135    pub async fn stage_block_execution(
136        &self,
137        block: ProposedBlock,
138        round: Option<u32>,
139        published_blobs: Vec<Blob>,
140    ) -> Result<(Block, ChainInfoResponse, ResourceTracker), LocalNodeError> {
141        Ok(self
142            .node
143            .state
144            .stage_block_execution(block, round, published_blobs)
145            .await?)
146    }
147
148    /// Executes a block with a policy for handling bundle failures.
149    ///
150    /// Returns the modified block (bundles may be rejected/removed based on the policy),
151    /// the executed block, chain info response, and resource tracker.
152    #[instrument(level = "trace", skip_all)]
153    pub async fn stage_block_execution_with_policy(
154        &self,
155        block: ProposedBlock,
156        round: Option<u32>,
157        published_blobs: Vec<Blob>,
158        policy: BundleExecutionPolicy,
159    ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), LocalNodeError> {
160        Ok(self
161            .node
162            .state
163            .stage_block_execution_with_policy(block, round, published_blobs, policy)
164            .await?)
165    }
166
167    /// Reads blobs from storage.
168    pub async fn read_blobs_from_storage(
169        &self,
170        blob_ids: &[BlobId],
171    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
172        let storage = self.storage_client();
173        Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
174    }
175
176    /// Reads blob states from storage.
177    pub async fn read_blob_states_from_storage(
178        &self,
179        blob_ids: &[BlobId],
180    ) -> Result<Vec<BlobState>, LocalNodeError> {
181        let storage = self.storage_client();
182        let mut blobs_not_found = Vec::new();
183        let mut blob_states = Vec::new();
184        for (blob_state, blob_id) in storage
185            .read_blob_states(blob_ids)
186            .await?
187            .into_iter()
188            .zip(blob_ids)
189        {
190            match blob_state {
191                None => blobs_not_found.push(*blob_id),
192                Some(blob_state) => blob_states.push(blob_state),
193            }
194        }
195        if !blobs_not_found.is_empty() {
196            return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
197        }
198        Ok(blob_states)
199    }
200
201    /// Looks for the specified blobs in the local chain manager's locking blobs.
202    /// Returns `Ok(None)` if any of the blobs is not found.
203    pub async fn get_locking_blobs(
204        &self,
205        blob_ids: impl IntoIterator<Item = &BlobId>,
206        chain_id: ChainId,
207    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
208        let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
209        Ok(self
210            .node
211            .state
212            .get_locking_blobs(chain_id, blob_ids_vec)
213            .await?)
214    }
215
216    /// Writes the given blobs to storage if there is an appropriate blob state.
217    pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
218        let storage = self.storage_client();
219        storage.maybe_write_blobs(blobs).await?;
220        Ok(())
221    }
222
223    pub async fn handle_pending_blobs(
224        &self,
225        chain_id: ChainId,
226        blobs: Vec<Blob>,
227    ) -> Result<(), LocalNodeError> {
228        for blob in blobs {
229            self.node.state.handle_pending_blob(chain_id, blob).await?;
230        }
231        Ok(())
232    }
233
234    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
235    /// [`ChainId`].
236    ///
237    /// The returned view holds a lock on the chain state, which prevents the local node from
238    /// changing the state of that chain.
239    #[instrument(level = "trace", skip(self))]
240    pub async fn chain_state_view(
241        &self,
242        chain_id: ChainId,
243    ) -> Result<crate::worker::ChainStateViewReadGuard<S>, LocalNodeError> {
244        Ok(self.node.state.chain_state_view(chain_id).await?)
245    }
246
247    #[instrument(level = "trace", skip(self))]
248    pub(crate) async fn chain_info(
249        &self,
250        chain_id: ChainId,
251    ) -> Result<Box<ChainInfo>, LocalNodeError> {
252        let query = ChainInfoQuery::new(chain_id);
253        Ok(self.handle_chain_info_query(query).await?.info)
254    }
255
256    #[instrument(level = "trace", skip(self, query))]
257    pub async fn query_application(
258        &self,
259        chain_id: ChainId,
260        query: Query,
261        block_hash: Option<CryptoHash>,
262    ) -> Result<(QueryOutcome, BlockHeight), LocalNodeError> {
263        let result = self
264            .node
265            .state
266            .query_application(chain_id, query, block_hash)
267            .await?;
268        Ok(result)
269    }
270
271    /// Handles any pending local cross-chain requests.
272    #[instrument(level = "trace", skip(self, notifier))]
273    pub async fn retry_pending_cross_chain_requests(
274        &self,
275        sender_chain: ChainId,
276        notifier: &impl Notifier,
277    ) -> Result<(), LocalNodeError> {
278        let (_response, actions) = self
279            .node
280            .state
281            .handle_chain_info_query(ChainInfoQuery::new(sender_chain).with_network_actions())
282            .await?;
283        let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
284        while let Some(request) = requests.pop_front() {
285            let new_actions = self.node.state.handle_cross_chain_request(request).await?;
286            notifier.notify(&new_actions.notifications);
287            requests.extend(new_actions.cross_chain_requests);
288        }
289        Ok(())
290    }
291
292    /// Given a list of chain IDs, returns a map that assigns to each of them the next block
293    /// height to schedule, i.e. the lowest block height for which we haven't added the messages
294    /// to `receiver_id` to the outbox yet.
295    pub async fn next_outbox_heights(
296        &self,
297        chain_ids: impl IntoIterator<Item = &ChainId>,
298        receiver_id: ChainId,
299    ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
300        let futures =
301            FuturesUnordered::from_iter(chain_ids.into_iter().map(|chain_id| async move {
302                let (next_block_height, next_height_to_schedule) = match self
303                    .get_tip_state_and_outbox_info(*chain_id, receiver_id)
304                    .await
305                {
306                    Ok(info) => info,
307                    Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
308                        return Ok((*chain_id, BlockHeight::ZERO))
309                    }
310                    Err(err) => Err(err)?,
311                };
312                let next_height = if let Some(scheduled_height) = next_height_to_schedule {
313                    next_block_height.max(scheduled_height)
314                } else {
315                    next_block_height
316                };
317                Ok::<_, LocalNodeError>((*chain_id, next_height))
318            }));
319        futures.try_collect().await
320    }
321
322    pub async fn update_received_certificate_trackers(
323        &self,
324        chain_id: ChainId,
325        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
326    ) -> Result<(), LocalNodeError> {
327        self.node
328            .state
329            .update_received_certificate_trackers(chain_id, new_trackers)
330            .await?;
331        Ok(())
332    }
333
334    pub async fn get_preprocessed_block_hashes(
335        &self,
336        chain_id: ChainId,
337        start: BlockHeight,
338        end: BlockHeight,
339    ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
340        Ok(self
341            .node
342            .state
343            .get_preprocessed_block_hashes(chain_id, start, end)
344            .await?)
345    }
346
347    pub async fn get_inbox_next_height(
348        &self,
349        chain_id: ChainId,
350        origin: ChainId,
351    ) -> Result<BlockHeight, LocalNodeError> {
352        Ok(self
353            .node
354            .state
355            .get_inbox_next_height(chain_id, origin)
356            .await?)
357    }
358
359    /// Gets block hashes for the given heights.
360    pub async fn get_block_hashes(
361        &self,
362        chain_id: ChainId,
363        heights: Vec<BlockHeight>,
364    ) -> Result<Vec<CryptoHash>, LocalNodeError> {
365        Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
366    }
367
368    /// Gets proposed blobs from the manager for specified blob IDs.
369    pub async fn get_proposed_blobs(
370        &self,
371        chain_id: ChainId,
372        blob_ids: Vec<BlobId>,
373    ) -> Result<Vec<Blob>, LocalNodeError> {
374        Ok(self
375            .node
376            .state
377            .get_proposed_blobs(chain_id, blob_ids)
378            .await?)
379    }
380
381    /// Gets event subscriptions from the chain.
382    pub async fn get_event_subscriptions(
383        &self,
384        chain_id: ChainId,
385    ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
386        Ok(self.node.state.get_event_subscriptions(chain_id).await?)
387    }
388
389    /// Gets the next expected event index for a stream.
390    pub async fn get_next_expected_event(
391        &self,
392        chain_id: ChainId,
393        stream_id: StreamId,
394    ) -> Result<Option<u32>, LocalNodeError> {
395        Ok(self
396            .node
397            .state
398            .get_next_expected_event(chain_id, stream_id)
399            .await?)
400    }
401
402    /// Gets received certificate trackers.
403    pub async fn get_received_certificate_trackers(
404        &self,
405        chain_id: ChainId,
406    ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
407        Ok(self
408            .node
409            .state
410            .get_received_certificate_trackers(chain_id)
411            .await?)
412    }
413
414    /// Gets tip state and outbox info for next_outbox_heights calculation.
415    pub async fn get_tip_state_and_outbox_info(
416        &self,
417        chain_id: ChainId,
418        receiver_id: ChainId,
419    ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
420        Ok(self
421            .node
422            .state
423            .get_tip_state_and_outbox_info(chain_id, receiver_id)
424            .await?)
425    }
426
427    /// Gets the next height to preprocess.
428    pub async fn get_next_height_to_preprocess(
429        &self,
430        chain_id: ChainId,
431    ) -> Result<BlockHeight, LocalNodeError> {
432        Ok(self
433            .node
434            .state
435            .get_next_height_to_preprocess(chain_id)
436            .await?)
437    }
438}
439
440/// Extension trait for [`ChainInfo`]s from our local node. These should always be valid and
441/// contain the requested information.
442pub trait LocalChainInfoExt {
443    /// Returns the requested map of committees.
444    fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError>;
445
446    /// Returns the current committee.
447    fn into_current_committee(self) -> Result<Committee, LocalNodeError>;
448
449    /// Returns a reference to the current committee.
450    fn current_committee(&self) -> Result<&Committee, LocalNodeError>;
451}
452
453impl LocalChainInfoExt for ChainInfo {
454    fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError> {
455        self.requested_committees
456            .ok_or(LocalNodeError::InvalidChainInfoResponse)
457    }
458
459    fn into_current_committee(self) -> Result<Committee, LocalNodeError> {
460        self.requested_committees
461            .ok_or(LocalNodeError::InvalidChainInfoResponse)?
462            .remove(&self.epoch)
463            .ok_or(LocalNodeError::InactiveChain(self.chain_id))
464    }
465
466    fn current_committee(&self) -> Result<&Committee, LocalNodeError> {
467        self.requested_committees
468            .as_ref()
469            .ok_or(LocalNodeError::InvalidChainInfoResponse)?
470            .get(&self.epoch)
471            .ok_or(LocalNodeError::InactiveChain(self.chain_id))
472    }
473}