linera_core/
local_node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, VecDeque},
7    sync::Arc,
8};
9
10use futures::{future::Either, stream, StreamExt as _, TryStreamExt as _};
11use linera_base::{
12    crypto::ValidatorPublicKey,
13    data_types::{ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch},
14    identifiers::{ApplicationId, BlobId, ChainId},
15};
16use linera_chain::{
17    data_types::{BlockProposal, ProposedBlock},
18    types::{Block, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate},
19    ChainStateView,
20};
21use linera_execution::{committee::Committee, BlobState, Query, QueryOutcome};
22use linera_storage::Storage;
23use linera_views::ViewError;
24use thiserror::Error;
25use tokio::sync::OwnedRwLockReadGuard;
26use tracing::{instrument, warn};
27
28use crate::{
29    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30    notifier::Notifier,
31    worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34/// A local node with a single worker, typically used by clients.
35pub struct LocalNode<S>
36where
37    S: Storage,
38{
39    state: WorkerState<S>,
40}
41
42/// A client to a local node.
43#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46    S: Storage,
47{
48    node: Arc<LocalNode<S>>,
49}
50
/// Error type for the operations on a local node.
#[derive(Debug, Error)]
pub enum LocalNodeError {
    /// An arithmetic error, displayed transparently.
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    /// A view error, displayed transparently.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// Wraps a [`WorkerError`].
    ///
    /// Note that the `From<WorkerError>` conversion in this file maps
    /// `WorkerError::BlobsNotFound` to [`LocalNodeError::BlobsNotFound`] instead of this variant.
    #[error("Worker operation failed: {0}")]
    WorkerError(WorkerError),

    /// The given blob of the given chain could not be read.
    #[error("Failed to read blob {blob_id:?} of chain {chain_id:?}")]
    CannotReadLocalBlob { chain_id: ChainId, blob_id: BlobId },

    /// The local node does not have an active chain with the given ID.
    #[error("The local node doesn't have an active chain {0}")]
    InactiveChain(ChainId),

    /// A chain info response from the local node lacked the expected information.
    #[error("The chain info response received from the local node is invalid")]
    InvalidChainInfoResponse,

    /// The listed blobs were not found.
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),
}
75
76impl From<WorkerError> for LocalNodeError {
77    fn from(error: WorkerError) -> Self {
78        match error {
79            WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
80            error => LocalNodeError::WorkerError(error),
81        }
82    }
83}
84
85impl<S> LocalNodeClient<S>
86where
87    S: Storage + Clone + Send + Sync + 'static,
88{
89    #[instrument(level = "trace", skip_all)]
90    pub async fn handle_block_proposal(
91        &self,
92        proposal: BlockProposal,
93    ) -> Result<ChainInfoResponse, LocalNodeError> {
94        // In local nodes, we can trust fully_handle_certificate to carry all actions eventually.
95        let (response, _actions) = self.node.state.handle_block_proposal(proposal).await?;
96        Ok(response)
97    }
98
99    #[instrument(level = "trace", skip_all)]
100    pub async fn handle_lite_certificate(
101        &self,
102        certificate: LiteCertificate<'_>,
103        notifier: &impl Notifier,
104    ) -> Result<ChainInfoResponse, LocalNodeError> {
105        match self.node.state.full_certificate(certificate).await? {
106            Either::Left(confirmed) => Ok(self.handle_certificate(confirmed, notifier).await?),
107            Either::Right(validated) => Ok(self.handle_certificate(validated, notifier).await?),
108        }
109    }
110
111    #[instrument(level = "trace", skip_all)]
112    pub async fn handle_certificate<T>(
113        &self,
114        certificate: GenericCertificate<T>,
115        notifier: &impl Notifier,
116    ) -> Result<ChainInfoResponse, LocalNodeError>
117    where
118        T: ProcessableCertificate,
119    {
120        Ok(Box::pin(
121            self.node
122                .state
123                .fully_handle_certificate_with_notifications(certificate, notifier),
124        )
125        .await?)
126    }
127
128    /// Preprocesses a block without executing it.
129    #[instrument(level = "trace", skip_all)]
130    pub async fn preprocess_certificate(
131        &self,
132        certificate: ConfirmedBlockCertificate,
133        notifier: &impl Notifier,
134    ) -> Result<(), LocalNodeError> {
135        self.node
136            .state
137            .fully_preprocess_certificate_with_notifications(certificate, notifier)
138            .await?;
139        Ok(())
140    }
141
142    #[instrument(level = "trace", skip_all)]
143    pub async fn handle_chain_info_query(
144        &self,
145        query: ChainInfoQuery,
146    ) -> Result<ChainInfoResponse, LocalNodeError> {
147        // In local nodes, we can trust fully_handle_certificate to carry all actions eventually.
148        let (response, _actions) = self.node.state.handle_chain_info_query(query).await?;
149        Ok(response)
150    }
151
152    #[instrument(level = "trace", skip_all)]
153    pub fn new(state: WorkerState<S>) -> Self {
154        Self {
155            node: Arc::new(LocalNode { state }),
156        }
157    }
158
159    #[instrument(level = "trace", skip_all)]
160    pub(crate) fn storage_client(&self) -> S {
161        self.node.state.storage_client().clone()
162    }
163
164    #[instrument(level = "trace", skip_all)]
165    pub async fn stage_block_execution(
166        &self,
167        block: ProposedBlock,
168        round: Option<u32>,
169        published_blobs: Vec<Blob>,
170    ) -> Result<(Block, ChainInfoResponse), LocalNodeError> {
171        Ok(self
172            .node
173            .state
174            .stage_block_execution(block, round, published_blobs)
175            .await?)
176    }
177
178    /// Reads blobs from storage.
179    pub async fn read_blobs_from_storage(
180        &self,
181        blob_ids: &[BlobId],
182    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
183        let storage = self.storage_client();
184        Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
185    }
186
187    /// Reads blob states from storage
188    pub async fn read_blob_states_from_storage(
189        &self,
190        blob_ids: &[BlobId],
191    ) -> Result<Vec<BlobState>, LocalNodeError> {
192        let storage = self.storage_client();
193        let mut blobs_not_found = Vec::new();
194        let mut blob_states = Vec::new();
195        for (blob_state, blob_id) in storage
196            .read_blob_states(blob_ids)
197            .await?
198            .into_iter()
199            .zip(blob_ids)
200        {
201            match blob_state {
202                None => blobs_not_found.push(*blob_id),
203                Some(blob_state) => blob_states.push(blob_state),
204            }
205        }
206        if !blobs_not_found.is_empty() {
207            return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
208        }
209        Ok(blob_states)
210    }
211
212    /// Looks for the specified blobs in the local chain manager's locking blobs.
213    /// Returns `Ok(None)` if any of the blobs is not found.
214    pub async fn get_locking_blobs(
215        &self,
216        blob_ids: impl IntoIterator<Item = &BlobId>,
217        chain_id: ChainId,
218    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
219        let chain = self.chain_state_view(chain_id).await?;
220        let mut blobs = Vec::new();
221        for blob_id in blob_ids {
222            match chain.manager.locking_blobs.get(blob_id).await? {
223                None => return Ok(None),
224                Some(blob) => blobs.push(blob),
225            }
226        }
227        Ok(Some(blobs))
228    }
229
230    /// Writes the given blobs to storage if there is an appropriate blob state.
231    pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
232        let storage = self.storage_client();
233        storage.maybe_write_blobs(blobs).await?;
234        Ok(())
235    }
236
237    pub async fn handle_pending_blobs(
238        &self,
239        chain_id: ChainId,
240        blobs: Vec<Blob>,
241    ) -> Result<(), LocalNodeError> {
242        for blob in blobs {
243            self.node.state.handle_pending_blob(chain_id, blob).await?;
244        }
245        Ok(())
246    }
247
248    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
249    /// [`ChainId`].
250    ///
251    /// The returned view holds a lock on the chain state, which prevents the local node from
252    /// changing the state of that chain.
253    #[instrument(level = "trace", skip(self))]
254    pub async fn chain_state_view(
255        &self,
256        chain_id: ChainId,
257    ) -> Result<OwnedRwLockReadGuard<ChainStateView<S::Context>>, LocalNodeError> {
258        Ok(self.node.state.chain_state_view(chain_id).await?)
259    }
260
261    #[instrument(level = "trace", skip(self))]
262    pub(crate) async fn chain_info(
263        &self,
264        chain_id: ChainId,
265    ) -> Result<Box<ChainInfo>, LocalNodeError> {
266        let query = ChainInfoQuery::new(chain_id);
267        Ok(self.handle_chain_info_query(query).await?.info)
268    }
269
270    #[instrument(level = "trace", skip(self, query))]
271    pub async fn query_application(
272        &self,
273        chain_id: ChainId,
274        query: Query,
275    ) -> Result<QueryOutcome, LocalNodeError> {
276        let outcome = self.node.state.query_application(chain_id, query).await?;
277        Ok(outcome)
278    }
279
280    #[instrument(level = "trace", skip(self))]
281    pub async fn describe_application(
282        &self,
283        chain_id: ChainId,
284        application_id: ApplicationId,
285    ) -> Result<ApplicationDescription, LocalNodeError> {
286        let response = self
287            .node
288            .state
289            .describe_application(chain_id, application_id)
290            .await?;
291        Ok(response)
292    }
293
294    /// Handles any pending local cross-chain requests.
295    #[instrument(level = "trace", skip(self))]
296    pub async fn retry_pending_cross_chain_requests(
297        &self,
298        sender_chain: ChainId,
299    ) -> Result<(), LocalNodeError> {
300        let (_response, actions) = self
301            .node
302            .state
303            .handle_chain_info_query(ChainInfoQuery::new(sender_chain))
304            .await?;
305        let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
306        while let Some(request) = requests.pop_front() {
307            let new_actions = self.node.state.handle_cross_chain_request(request).await?;
308            requests.extend(new_actions.cross_chain_requests);
309        }
310        Ok(())
311    }
312
313    /// Given a list of chain IDs, returns a map that assigns to each of them the next block
314    /// height, i.e. the lowest block height that we have not processed in the local node yet.
315    ///
316    /// It makes at most `chain_worker_limit` requests to the local node in parallel.
317    pub async fn next_block_heights(
318        &self,
319        chain_ids: impl IntoIterator<Item = &ChainId>,
320        chain_worker_limit: usize,
321    ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
322        let futures = chain_ids
323            .into_iter()
324            .map(|chain_id| async move {
325                let chain = self.chain_state_view(*chain_id).await?;
326                let mut next_height = chain.tip_state.get().next_block_height;
327                // TODO(#3969): This is not great for performance, but the whole function will
328                // probably go away with #3969 anyway.
329                while chain.preprocessed_blocks.contains_key(&next_height).await? {
330                    next_height.try_add_assign_one()?;
331                }
332                Ok::<_, LocalNodeError>((*chain_id, next_height))
333            })
334            .collect::<Vec<_>>();
335        stream::iter(futures)
336            .buffer_unordered(chain_worker_limit)
337            .try_collect()
338            .await
339    }
340
341    pub async fn update_received_certificate_trackers(
342        &self,
343        chain_id: ChainId,
344        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
345    ) -> Result<(), LocalNodeError> {
346        self.node
347            .state
348            .update_received_certificate_trackers(chain_id, new_trackers)
349            .await?;
350        Ok(())
351    }
352}
353
/// Extension trait for [`ChainInfo`]s from our local node. These should always be valid and
/// contain the requested information.
///
/// Every method returns a [`LocalNodeError`] when the expected information is absent.
pub trait LocalChainInfoExt {
    /// Returns the requested map of committees, keyed by epoch. Consumes `self`.
    fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError>;

    /// Returns the current committee. Consumes `self`.
    fn into_current_committee(self) -> Result<Committee, LocalNodeError>;

    /// Returns a reference to the current committee, without consuming `self`.
    fn current_committee(&self) -> Result<&Committee, LocalNodeError>;
}
366
367impl LocalChainInfoExt for ChainInfo {
368    fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError> {
369        self.requested_committees
370            .ok_or(LocalNodeError::InvalidChainInfoResponse)
371    }
372
373    fn into_current_committee(self) -> Result<Committee, LocalNodeError> {
374        self.requested_committees
375            .ok_or(LocalNodeError::InvalidChainInfoResponse)?
376            .remove(&self.epoch)
377            .ok_or(LocalNodeError::InactiveChain(self.chain_id))
378    }
379
380    fn current_committee(&self) -> Result<&Committee, LocalNodeError> {
381        self.requested_committees
382            .as_ref()
383            .ok_or(LocalNodeError::InvalidChainInfoResponse)?
384            .get(&self.epoch)
385            .ok_or(LocalNodeError::InactiveChain(self.chain_id))
386    }
387}