linera_core/
local_node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap, VecDeque},
7    sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12    crypto::{CryptoHash, ValidatorPublicKey},
13    data_types::{ArithmeticError, Blob, BlockHeight, Epoch},
14    identifiers::{BlobId, ChainId, StreamId},
15};
16use linera_chain::{
17    data_types::{BlockProposal, ProposedBlock},
18    types::{Block, GenericCertificate},
19    ChainStateView,
20};
21use linera_execution::{committee::Committee, BlobState, Query, QueryOutcome, ResourceTracker};
22use linera_storage::Storage;
23use linera_views::ViewError;
24use thiserror::Error;
25use tokio::sync::OwnedRwLockReadGuard;
26use tracing::{instrument, warn};
27
28use crate::{
29    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30    notifier::Notifier,
31    worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34/// A local node with a single worker, typically used by clients.
35pub struct LocalNode<S>
36where
37    S: Storage,
38{
39    state: WorkerState<S>,
40}
41
42/// A client to a local node.
43#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46    S: Storage,
47{
48    node: Arc<LocalNode<S>>,
49}
50
51/// Error type for the operations on a local node.
52#[derive(Debug, Error)]
53pub enum LocalNodeError {
54    #[error(transparent)]
55    ArithmeticError(#[from] ArithmeticError),
56
57    #[error(transparent)]
58    ViewError(#[from] ViewError),
59
60    #[error("Worker operation failed: {0}")]
61    WorkerError(WorkerError),
62
63    #[error("The local node doesn't have an active chain {0}")]
64    InactiveChain(ChainId),
65
66    #[error("The chain info response received from the local node is invalid")]
67    InvalidChainInfoResponse,
68
69    #[error("Blobs not found: {0:?}")]
70    BlobsNotFound(Vec<BlobId>),
71}
72
73impl From<WorkerError> for LocalNodeError {
74    fn from(error: WorkerError) -> Self {
75        match error {
76            WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
77            error => LocalNodeError::WorkerError(error),
78        }
79    }
80}
81
82impl<S> LocalNodeClient<S>
83where
84    S: Storage + Clone + 'static,
85{
86    #[instrument(level = "trace", skip_all)]
87    pub async fn handle_block_proposal(
88        &self,
89        proposal: BlockProposal,
90    ) -> Result<ChainInfoResponse, LocalNodeError> {
91        // In local nodes, we can trust fully_handle_certificate to carry all actions eventually.
92        let (response, _actions) =
93            Box::pin(self.node.state.handle_block_proposal(proposal)).await?;
94        Ok(response)
95    }
96
97    #[instrument(level = "trace", skip_all)]
98    pub async fn handle_certificate<T>(
99        &self,
100        certificate: GenericCertificate<T>,
101        notifier: &impl Notifier,
102    ) -> Result<ChainInfoResponse, LocalNodeError>
103    where
104        T: ProcessableCertificate,
105    {
106        Ok(Box::pin(
107            self.node
108                .state
109                .fully_handle_certificate_with_notifications(certificate, notifier),
110        )
111        .await?)
112    }
113
114    #[instrument(level = "trace", skip_all)]
115    pub async fn handle_chain_info_query(
116        &self,
117        query: ChainInfoQuery,
118    ) -> Result<ChainInfoResponse, LocalNodeError> {
119        // In local nodes, we can trust fully_handle_certificate to carry all actions eventually.
120        let (response, _actions) = self.node.state.handle_chain_info_query(query).await?;
121        Ok(response)
122    }
123
124    #[instrument(level = "trace", skip_all)]
125    pub fn new(state: WorkerState<S>) -> Self {
126        Self {
127            node: Arc::new(LocalNode { state }),
128        }
129    }
130
131    #[instrument(level = "trace", skip_all)]
132    pub(crate) fn storage_client(&self) -> S {
133        self.node.state.storage_client().clone()
134    }
135
136    #[instrument(level = "trace", skip_all)]
137    pub async fn stage_block_execution(
138        &self,
139        block: ProposedBlock,
140        round: Option<u32>,
141        published_blobs: Vec<Blob>,
142    ) -> Result<(Block, ChainInfoResponse, ResourceTracker), LocalNodeError> {
143        Ok(self
144            .node
145            .state
146            .stage_block_execution(block, round, published_blobs)
147            .await?)
148    }
149
150    /// Reads blobs from storage.
151    pub async fn read_blobs_from_storage(
152        &self,
153        blob_ids: &[BlobId],
154    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
155        let storage = self.storage_client();
156        Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
157    }
158
159    /// Reads blob states from storage.
160    pub async fn read_blob_states_from_storage(
161        &self,
162        blob_ids: &[BlobId],
163    ) -> Result<Vec<BlobState>, LocalNodeError> {
164        let storage = self.storage_client();
165        let mut blobs_not_found = Vec::new();
166        let mut blob_states = Vec::new();
167        for (blob_state, blob_id) in storage
168            .read_blob_states(blob_ids)
169            .await?
170            .into_iter()
171            .zip(blob_ids)
172        {
173            match blob_state {
174                None => blobs_not_found.push(*blob_id),
175                Some(blob_state) => blob_states.push(blob_state),
176            }
177        }
178        if !blobs_not_found.is_empty() {
179            return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
180        }
181        Ok(blob_states)
182    }
183
184    /// Looks for the specified blobs in the local chain manager's locking blobs.
185    /// Returns `Ok(None)` if any of the blobs is not found.
186    pub async fn get_locking_blobs(
187        &self,
188        blob_ids: impl IntoIterator<Item = &BlobId>,
189        chain_id: ChainId,
190    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
191        let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
192        Ok(self
193            .node
194            .state
195            .get_locking_blobs(chain_id, blob_ids_vec)
196            .await?)
197    }
198
199    /// Writes the given blobs to storage if there is an appropriate blob state.
200    pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
201        let storage = self.storage_client();
202        storage.maybe_write_blobs(blobs).await?;
203        Ok(())
204    }
205
206    pub async fn handle_pending_blobs(
207        &self,
208        chain_id: ChainId,
209        blobs: Vec<Blob>,
210    ) -> Result<(), LocalNodeError> {
211        for blob in blobs {
212            self.node.state.handle_pending_blob(chain_id, blob).await?;
213        }
214        Ok(())
215    }
216
217    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
218    /// [`ChainId`].
219    ///
220    /// The returned view holds a lock on the chain state, which prevents the local node from
221    /// changing the state of that chain.
222    #[instrument(level = "trace", skip(self))]
223    pub async fn chain_state_view(
224        &self,
225        chain_id: ChainId,
226    ) -> Result<OwnedRwLockReadGuard<ChainStateView<S::Context>>, LocalNodeError> {
227        Ok(self.node.state.chain_state_view(chain_id).await?)
228    }
229
230    #[instrument(level = "trace", skip(self))]
231    pub(crate) async fn chain_info(
232        &self,
233        chain_id: ChainId,
234    ) -> Result<Box<ChainInfo>, LocalNodeError> {
235        let query = ChainInfoQuery::new(chain_id);
236        Ok(self.handle_chain_info_query(query).await?.info)
237    }
238
239    #[instrument(level = "trace", skip(self, query))]
240    pub async fn query_application(
241        &self,
242        chain_id: ChainId,
243        query: Query,
244        block_hash: Option<CryptoHash>,
245    ) -> Result<QueryOutcome, LocalNodeError> {
246        let outcome = self
247            .node
248            .state
249            .query_application(chain_id, query, block_hash)
250            .await?;
251        Ok(outcome)
252    }
253
254    /// Handles any pending local cross-chain requests.
255    #[instrument(level = "trace", skip(self))]
256    pub async fn retry_pending_cross_chain_requests(
257        &self,
258        sender_chain: ChainId,
259    ) -> Result<(), LocalNodeError> {
260        let (_response, actions) = self
261            .node
262            .state
263            .handle_chain_info_query(ChainInfoQuery::new(sender_chain).with_network_actions())
264            .await?;
265        let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
266        while let Some(request) = requests.pop_front() {
267            let new_actions = self.node.state.handle_cross_chain_request(request).await?;
268            requests.extend(new_actions.cross_chain_requests);
269        }
270        Ok(())
271    }
272
273    /// Given a list of chain IDs, returns a map that assigns to each of them the next block
274    /// height to schedule, i.e. the lowest block height for which we haven't added the messages
275    /// to `receiver_id` to the outbox yet.
276    pub async fn next_outbox_heights(
277        &self,
278        chain_ids: impl IntoIterator<Item = &ChainId>,
279        receiver_id: ChainId,
280    ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
281        let futures =
282            FuturesUnordered::from_iter(chain_ids.into_iter().map(|chain_id| async move {
283                let (next_block_height, next_height_to_schedule) = match self
284                    .get_tip_state_and_outbox_info(*chain_id, receiver_id)
285                    .await
286                {
287                    Ok(info) => info,
288                    Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
289                        return Ok((*chain_id, BlockHeight::ZERO))
290                    }
291                    Err(err) => Err(err)?,
292                };
293                let next_height = if let Some(scheduled_height) = next_height_to_schedule {
294                    next_block_height.max(scheduled_height)
295                } else {
296                    next_block_height
297                };
298                Ok::<_, LocalNodeError>((*chain_id, next_height))
299            }));
300        futures.try_collect().await
301    }
302
303    pub async fn update_received_certificate_trackers(
304        &self,
305        chain_id: ChainId,
306        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
307    ) -> Result<(), LocalNodeError> {
308        self.node
309            .state
310            .update_received_certificate_trackers(chain_id, new_trackers)
311            .await?;
312        Ok(())
313    }
314
315    pub async fn get_preprocessed_block_hashes(
316        &self,
317        chain_id: ChainId,
318        start: BlockHeight,
319        end: BlockHeight,
320    ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
321        Ok(self
322            .node
323            .state
324            .get_preprocessed_block_hashes(chain_id, start, end)
325            .await?)
326    }
327
328    pub async fn get_inbox_next_height(
329        &self,
330        chain_id: ChainId,
331        origin: ChainId,
332    ) -> Result<BlockHeight, LocalNodeError> {
333        Ok(self
334            .node
335            .state
336            .get_inbox_next_height(chain_id, origin)
337            .await?)
338    }
339
340    /// Gets block hashes for the given heights.
341    pub async fn get_block_hashes(
342        &self,
343        chain_id: ChainId,
344        heights: Vec<BlockHeight>,
345    ) -> Result<Vec<CryptoHash>, LocalNodeError> {
346        Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
347    }
348
349    /// Gets proposed blobs from the manager for specified blob IDs.
350    pub async fn get_proposed_blobs(
351        &self,
352        chain_id: ChainId,
353        blob_ids: Vec<BlobId>,
354    ) -> Result<Vec<Blob>, LocalNodeError> {
355        Ok(self
356            .node
357            .state
358            .get_proposed_blobs(chain_id, blob_ids)
359            .await?)
360    }
361
362    /// Gets event subscriptions from the chain.
363    pub async fn get_event_subscriptions(
364        &self,
365        chain_id: ChainId,
366    ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
367        Ok(self.node.state.get_event_subscriptions(chain_id).await?)
368    }
369
370    /// Gets the next expected event index for a stream.
371    pub async fn get_next_expected_event(
372        &self,
373        chain_id: ChainId,
374        stream_id: StreamId,
375    ) -> Result<Option<u32>, LocalNodeError> {
376        Ok(self
377            .node
378            .state
379            .get_next_expected_event(chain_id, stream_id)
380            .await?)
381    }
382
383    /// Gets received certificate trackers.
384    pub async fn get_received_certificate_trackers(
385        &self,
386        chain_id: ChainId,
387    ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
388        Ok(self
389            .node
390            .state
391            .get_received_certificate_trackers(chain_id)
392            .await?)
393    }
394
395    /// Gets tip state and outbox info for next_outbox_heights calculation.
396    pub async fn get_tip_state_and_outbox_info(
397        &self,
398        chain_id: ChainId,
399        receiver_id: ChainId,
400    ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
401        Ok(self
402            .node
403            .state
404            .get_tip_state_and_outbox_info(chain_id, receiver_id)
405            .await?)
406    }
407
408    /// Gets the next height to preprocess.
409    pub async fn get_next_height_to_preprocess(
410        &self,
411        chain_id: ChainId,
412    ) -> Result<BlockHeight, LocalNodeError> {
413        Ok(self
414            .node
415            .state
416            .get_next_height_to_preprocess(chain_id)
417            .await?)
418    }
419}
420
421/// Extension trait for [`ChainInfo`]s from our local node. These should always be valid and
422/// contain the requested information.
423pub trait LocalChainInfoExt {
424    /// Returns the requested map of committees.
425    fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError>;
426
427    /// Returns the current committee.
428    fn into_current_committee(self) -> Result<Committee, LocalNodeError>;
429
430    /// Returns a reference to the current committee.
431    fn current_committee(&self) -> Result<&Committee, LocalNodeError>;
432}
433
434impl LocalChainInfoExt for ChainInfo {
435    fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError> {
436        self.requested_committees
437            .ok_or(LocalNodeError::InvalidChainInfoResponse)
438    }
439
440    fn into_current_committee(self) -> Result<Committee, LocalNodeError> {
441        self.requested_committees
442            .ok_or(LocalNodeError::InvalidChainInfoResponse)?
443            .remove(&self.epoch)
444            .ok_or(LocalNodeError::InactiveChain(self.chain_id))
445    }
446
447    fn current_committee(&self) -> Result<&Committee, LocalNodeError> {
448        self.requested_committees
449            .as_ref()
450            .ok_or(LocalNodeError::InvalidChainInfoResponse)?
451            .get(&self.epoch)
452            .ok_or(LocalNodeError::InactiveChain(self.chain_id))
453    }
454}