linera_core/
local_node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, VecDeque},
7    sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12    crypto::ValidatorPublicKey,
13    data_types::{ArithmeticError, Blob, BlockHeight, Epoch},
14    identifiers::{BlobId, ChainId},
15};
16use linera_chain::{
17    data_types::{BlockProposal, ProposedBlock},
18    types::{Block, GenericCertificate},
19    ChainStateView,
20};
21use linera_execution::{committee::Committee, BlobState, Query, QueryOutcome};
22use linera_storage::Storage;
23use linera_views::ViewError;
24use thiserror::Error;
25use tokio::sync::OwnedRwLockReadGuard;
26use tracing::{instrument, warn};
27
28use crate::{
29    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30    notifier::Notifier,
31    worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34/// A local node with a single worker, typically used by clients.
35pub struct LocalNode<S>
36where
37    S: Storage,
38{
39    state: WorkerState<S>,
40}
41
42/// A client to a local node.
43#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46    S: Storage,
47{
48    node: Arc<LocalNode<S>>,
49}
50
51/// Error type for the operations on a local node.
52#[derive(Debug, Error)]
53pub enum LocalNodeError {
54    #[error(transparent)]
55    ArithmeticError(#[from] ArithmeticError),
56
57    #[error(transparent)]
58    ViewError(#[from] ViewError),
59
60    #[error("Worker operation failed: {0}")]
61    WorkerError(WorkerError),
62
63    #[error("The local node doesn't have an active chain {0}")]
64    InactiveChain(ChainId),
65
66    #[error("The chain info response received from the local node is invalid")]
67    InvalidChainInfoResponse,
68
69    #[error("Blobs not found: {0:?}")]
70    BlobsNotFound(Vec<BlobId>),
71}
72
73impl From<WorkerError> for LocalNodeError {
74    fn from(error: WorkerError) -> Self {
75        match error {
76            WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
77            error => LocalNodeError::WorkerError(error),
78        }
79    }
80}
81
82impl<S> LocalNodeClient<S>
83where
84    S: Storage + Clone + Send + Sync + 'static,
85{
86    #[instrument(level = "trace", skip_all)]
87    pub async fn handle_block_proposal(
88        &self,
89        proposal: BlockProposal,
90    ) -> Result<ChainInfoResponse, LocalNodeError> {
91        // In local nodes, we can trust fully_handle_certificate to carry all actions eventually.
92        let (response, _actions) = self.node.state.handle_block_proposal(proposal).await?;
93        Ok(response)
94    }
95
96    #[instrument(level = "trace", skip_all)]
97    pub async fn handle_certificate<T>(
98        &self,
99        certificate: GenericCertificate<T>,
100        notifier: &impl Notifier,
101    ) -> Result<ChainInfoResponse, LocalNodeError>
102    where
103        T: ProcessableCertificate,
104    {
105        Ok(Box::pin(
106            self.node
107                .state
108                .fully_handle_certificate_with_notifications(certificate, notifier),
109        )
110        .await?)
111    }
112
113    #[instrument(level = "trace", skip_all)]
114    pub async fn handle_chain_info_query(
115        &self,
116        query: ChainInfoQuery,
117    ) -> Result<ChainInfoResponse, LocalNodeError> {
118        // In local nodes, we can trust fully_handle_certificate to carry all actions eventually.
119        let (response, _actions) = self.node.state.handle_chain_info_query(query).await?;
120        Ok(response)
121    }
122
123    #[instrument(level = "trace", skip_all)]
124    pub fn new(state: WorkerState<S>) -> Self {
125        Self {
126            node: Arc::new(LocalNode { state }),
127        }
128    }
129
130    #[instrument(level = "trace", skip_all)]
131    pub(crate) fn storage_client(&self) -> S {
132        self.node.state.storage_client().clone()
133    }
134
135    #[instrument(level = "trace", skip_all)]
136    pub async fn stage_block_execution(
137        &self,
138        block: ProposedBlock,
139        round: Option<u32>,
140        published_blobs: Vec<Blob>,
141    ) -> Result<(Block, ChainInfoResponse), LocalNodeError> {
142        Ok(self
143            .node
144            .state
145            .stage_block_execution(block, round, published_blobs)
146            .await?)
147    }
148
149    /// Reads blobs from storage.
150    pub async fn read_blobs_from_storage(
151        &self,
152        blob_ids: &[BlobId],
153    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
154        let storage = self.storage_client();
155        Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
156    }
157
158    /// Reads blob states from storage.
159    pub async fn read_blob_states_from_storage(
160        &self,
161        blob_ids: &[BlobId],
162    ) -> Result<Vec<BlobState>, LocalNodeError> {
163        let storage = self.storage_client();
164        let mut blobs_not_found = Vec::new();
165        let mut blob_states = Vec::new();
166        for (blob_state, blob_id) in storage
167            .read_blob_states(blob_ids)
168            .await?
169            .into_iter()
170            .zip(blob_ids)
171        {
172            match blob_state {
173                None => blobs_not_found.push(*blob_id),
174                Some(blob_state) => blob_states.push(blob_state),
175            }
176        }
177        if !blobs_not_found.is_empty() {
178            return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
179        }
180        Ok(blob_states)
181    }
182
183    /// Looks for the specified blobs in the local chain manager's locking blobs.
184    /// Returns `Ok(None)` if any of the blobs is not found.
185    pub async fn get_locking_blobs(
186        &self,
187        blob_ids: impl IntoIterator<Item = &BlobId>,
188        chain_id: ChainId,
189    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
190        let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
191        Ok(self
192            .node
193            .state
194            .get_locking_blobs(chain_id, blob_ids_vec)
195            .await?)
196    }
197
198    /// Writes the given blobs to storage if there is an appropriate blob state.
199    pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
200        let storage = self.storage_client();
201        storage.maybe_write_blobs(blobs).await?;
202        Ok(())
203    }
204
205    pub async fn handle_pending_blobs(
206        &self,
207        chain_id: ChainId,
208        blobs: Vec<Blob>,
209    ) -> Result<(), LocalNodeError> {
210        for blob in blobs {
211            self.node.state.handle_pending_blob(chain_id, blob).await?;
212        }
213        Ok(())
214    }
215
216    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
217    /// [`ChainId`].
218    ///
219    /// The returned view holds a lock on the chain state, which prevents the local node from
220    /// changing the state of that chain.
221    #[instrument(level = "trace", skip(self))]
222    pub async fn chain_state_view(
223        &self,
224        chain_id: ChainId,
225    ) -> Result<OwnedRwLockReadGuard<ChainStateView<S::Context>>, LocalNodeError> {
226        Ok(self.node.state.chain_state_view(chain_id).await?)
227    }
228
229    #[instrument(level = "trace", skip(self))]
230    pub(crate) async fn chain_info(
231        &self,
232        chain_id: ChainId,
233    ) -> Result<Box<ChainInfo>, LocalNodeError> {
234        let query = ChainInfoQuery::new(chain_id);
235        Ok(self.handle_chain_info_query(query).await?.info)
236    }
237
238    #[instrument(level = "trace", skip(self, query))]
239    pub async fn query_application(
240        &self,
241        chain_id: ChainId,
242        query: Query,
243    ) -> Result<QueryOutcome, LocalNodeError> {
244        let outcome = self.node.state.query_application(chain_id, query).await?;
245        Ok(outcome)
246    }
247
248    /// Handles any pending local cross-chain requests.
249    #[instrument(level = "trace", skip(self))]
250    pub async fn retry_pending_cross_chain_requests(
251        &self,
252        sender_chain: ChainId,
253    ) -> Result<(), LocalNodeError> {
254        let (_response, actions) = self
255            .node
256            .state
257            .handle_chain_info_query(ChainInfoQuery::new(sender_chain).with_network_actions())
258            .await?;
259        let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
260        while let Some(request) = requests.pop_front() {
261            let new_actions = self.node.state.handle_cross_chain_request(request).await?;
262            requests.extend(new_actions.cross_chain_requests);
263        }
264        Ok(())
265    }
266
267    /// Given a list of chain IDs, returns a map that assigns to each of them the next block
268    /// height to schedule, i.e. the lowest block height for which we haven't added the messages
269    /// to `receiver_id` to the outbox yet.
270    pub async fn next_outbox_heights(
271        &self,
272        chain_ids: impl IntoIterator<Item = &ChainId>,
273        receiver_id: ChainId,
274    ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
275        let futures =
276            FuturesUnordered::from_iter(chain_ids.into_iter().map(|chain_id| async move {
277                let chain = match self.chain_state_view(*chain_id).await {
278                    Ok(chain) => chain,
279                    Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
280                        return Ok((*chain_id, BlockHeight::ZERO))
281                    }
282                    Err(err) => Err(err)?,
283                };
284                let mut next_height = chain.tip_state.get().next_block_height;
285                if let Some(outbox) = chain.outboxes.try_load_entry(&receiver_id).await? {
286                    next_height = next_height.max(*outbox.next_height_to_schedule.get());
287                }
288                Ok::<_, LocalNodeError>((*chain_id, next_height))
289            }));
290        futures.try_collect().await
291    }
292
293    pub async fn update_received_certificate_trackers(
294        &self,
295        chain_id: ChainId,
296        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
297    ) -> Result<(), LocalNodeError> {
298        self.node
299            .state
300            .update_received_certificate_trackers(chain_id, new_trackers)
301            .await?;
302        Ok(())
303    }
304
305    pub async fn get_preprocessed_block_hashes(
306        &self,
307        chain_id: ChainId,
308        start: BlockHeight,
309        end: BlockHeight,
310    ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
311        Ok(self
312            .node
313            .state
314            .get_preprocessed_block_hashes(chain_id, start, end)
315            .await?)
316    }
317
318    pub async fn get_inbox_next_height(
319        &self,
320        chain_id: ChainId,
321        origin: ChainId,
322    ) -> Result<BlockHeight, LocalNodeError> {
323        Ok(self
324            .node
325            .state
326            .get_inbox_next_height(chain_id, origin)
327            .await?)
328    }
329}
330
331/// Extension trait for [`ChainInfo`]s from our local node. These should always be valid and
332/// contain the requested information.
333pub trait LocalChainInfoExt {
334    /// Returns the requested map of committees.
335    fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError>;
336
337    /// Returns the current committee.
338    fn into_current_committee(self) -> Result<Committee, LocalNodeError>;
339
340    /// Returns a reference to the current committee.
341    fn current_committee(&self) -> Result<&Committee, LocalNodeError>;
342}
343
344impl LocalChainInfoExt for ChainInfo {
345    fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError> {
346        self.requested_committees
347            .ok_or(LocalNodeError::InvalidChainInfoResponse)
348    }
349
350    fn into_current_committee(self) -> Result<Committee, LocalNodeError> {
351        self.requested_committees
352            .ok_or(LocalNodeError::InvalidChainInfoResponse)?
353            .remove(&self.epoch)
354            .ok_or(LocalNodeError::InactiveChain(self.chain_id))
355    }
356
357    fn current_committee(&self) -> Result<&Committee, LocalNodeError> {
358        self.requested_committees
359            .as_ref()
360            .ok_or(LocalNodeError::InvalidChainInfoResponse)?
361            .get(&self.epoch)
362            .ok_or(LocalNodeError::InactiveChain(self.chain_id))
363    }
364}