linera_core/
local_node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, VecDeque},
7    sync::Arc,
8};
9
10use futures::{future::Either, stream, StreamExt as _, TryStreamExt as _};
11use linera_base::{
12    crypto::ValidatorPublicKey,
13    data_types::{ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch},
14    identifiers::{ApplicationId, BlobId, ChainId},
15};
16use linera_chain::{
17    data_types::{BlockProposal, ProposedBlock},
18    types::{Block, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate},
19    ChainStateView,
20};
21use linera_execution::{committee::Committee, BlobState, Query, QueryOutcome};
22use linera_storage::Storage;
23use linera_views::ViewError;
24use thiserror::Error;
25use tokio::sync::OwnedRwLockReadGuard;
26use tracing::{instrument, warn};
27
28use crate::{
29    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30    notifier::Notifier,
31    worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34/// A local node with a single worker, typically used by clients.
35pub struct LocalNode<S>
36where
37    S: Storage,
38{
39    state: WorkerState<S>,
40}
41
42/// A client to a local node.
43#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46    S: Storage,
47{
48    node: Arc<LocalNode<S>>,
49}
50
51/// Error type for the operations on a local node.
52#[derive(Debug, Error)]
53pub enum LocalNodeError {
54    #[error(transparent)]
55    ArithmeticError(#[from] ArithmeticError),
56
57    #[error(transparent)]
58    ViewError(#[from] ViewError),
59
60    #[error("Worker operation failed: {0}")]
61    WorkerError(WorkerError),
62
63    #[error("The local node doesn't have an active chain {0}")]
64    InactiveChain(ChainId),
65
66    #[error("The chain info response received from the local node is invalid")]
67    InvalidChainInfoResponse,
68
69    #[error("Blobs not found: {0:?}")]
70    BlobsNotFound(Vec<BlobId>),
71}
72
73impl From<WorkerError> for LocalNodeError {
74    fn from(error: WorkerError) -> Self {
75        match error {
76            WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
77            error => LocalNodeError::WorkerError(error),
78        }
79    }
80}
81
82impl<S> LocalNodeClient<S>
83where
84    S: Storage + Clone + Send + Sync + 'static,
85{
86    #[instrument(level = "trace", skip_all)]
87    pub async fn handle_block_proposal(
88        &self,
89        proposal: BlockProposal,
90    ) -> Result<ChainInfoResponse, LocalNodeError> {
91        // In local nodes, we can trust fully_handle_certificate to carry all actions eventually.
92        let (response, _actions) = self.node.state.handle_block_proposal(proposal).await?;
93        Ok(response)
94    }
95
96    #[instrument(level = "trace", skip_all)]
97    pub async fn handle_lite_certificate(
98        &self,
99        certificate: LiteCertificate<'_>,
100        notifier: &impl Notifier,
101    ) -> Result<ChainInfoResponse, LocalNodeError> {
102        match self.node.state.full_certificate(certificate).await? {
103            Either::Left(confirmed) => Ok(self.handle_certificate(confirmed, notifier).await?),
104            Either::Right(validated) => Ok(self.handle_certificate(validated, notifier).await?),
105        }
106    }
107
108    #[instrument(level = "trace", skip_all)]
109    pub async fn handle_certificate<T>(
110        &self,
111        certificate: GenericCertificate<T>,
112        notifier: &impl Notifier,
113    ) -> Result<ChainInfoResponse, LocalNodeError>
114    where
115        T: ProcessableCertificate,
116    {
117        Ok(Box::pin(
118            self.node
119                .state
120                .fully_handle_certificate_with_notifications(certificate, notifier),
121        )
122        .await?)
123    }
124
125    /// Preprocesses a block without executing it.
126    #[instrument(level = "trace", skip_all)]
127    pub async fn preprocess_certificate(
128        &self,
129        certificate: ConfirmedBlockCertificate,
130        notifier: &impl Notifier,
131    ) -> Result<(), LocalNodeError> {
132        self.node
133            .state
134            .fully_preprocess_certificate_with_notifications(certificate, notifier)
135            .await?;
136        Ok(())
137    }
138
139    #[instrument(level = "trace", skip_all)]
140    pub async fn handle_chain_info_query(
141        &self,
142        query: ChainInfoQuery,
143    ) -> Result<ChainInfoResponse, LocalNodeError> {
144        // In local nodes, we can trust fully_handle_certificate to carry all actions eventually.
145        let (response, _actions) = self.node.state.handle_chain_info_query(query).await?;
146        Ok(response)
147    }
148
149    #[instrument(level = "trace", skip_all)]
150    pub fn new(state: WorkerState<S>) -> Self {
151        Self {
152            node: Arc::new(LocalNode { state }),
153        }
154    }
155
156    #[instrument(level = "trace", skip_all)]
157    pub(crate) fn storage_client(&self) -> S {
158        self.node.state.storage_client().clone()
159    }
160
161    #[instrument(level = "trace", skip_all)]
162    pub async fn stage_block_execution(
163        &self,
164        block: ProposedBlock,
165        round: Option<u32>,
166        published_blobs: Vec<Blob>,
167    ) -> Result<(Block, ChainInfoResponse), LocalNodeError> {
168        Ok(self
169            .node
170            .state
171            .stage_block_execution(block, round, published_blobs)
172            .await?)
173    }
174
175    /// Reads blobs from storage.
176    pub async fn read_blobs_from_storage(
177        &self,
178        blob_ids: &[BlobId],
179    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
180        let storage = self.storage_client();
181        Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
182    }
183
184    /// Reads blob states from storage
185    pub async fn read_blob_states_from_storage(
186        &self,
187        blob_ids: &[BlobId],
188    ) -> Result<Vec<BlobState>, LocalNodeError> {
189        let storage = self.storage_client();
190        let mut blobs_not_found = Vec::new();
191        let mut blob_states = Vec::new();
192        for (blob_state, blob_id) in storage
193            .read_blob_states(blob_ids)
194            .await?
195            .into_iter()
196            .zip(blob_ids)
197        {
198            match blob_state {
199                None => blobs_not_found.push(*blob_id),
200                Some(blob_state) => blob_states.push(blob_state),
201            }
202        }
203        if !blobs_not_found.is_empty() {
204            return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
205        }
206        Ok(blob_states)
207    }
208
209    /// Looks for the specified blobs in the local chain manager's locking blobs.
210    /// Returns `Ok(None)` if any of the blobs is not found.
211    pub async fn get_locking_blobs(
212        &self,
213        blob_ids: impl IntoIterator<Item = &BlobId>,
214        chain_id: ChainId,
215    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
216        let chain = self.chain_state_view(chain_id).await?;
217        let mut blobs = Vec::new();
218        for blob_id in blob_ids {
219            match chain.manager.locking_blobs.get(blob_id).await? {
220                None => return Ok(None),
221                Some(blob) => blobs.push(blob),
222            }
223        }
224        Ok(Some(blobs))
225    }
226
227    /// Writes the given blobs to storage if there is an appropriate blob state.
228    pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
229        let storage = self.storage_client();
230        storage.maybe_write_blobs(blobs).await?;
231        Ok(())
232    }
233
234    pub async fn handle_pending_blobs(
235        &self,
236        chain_id: ChainId,
237        blobs: Vec<Blob>,
238    ) -> Result<(), LocalNodeError> {
239        for blob in blobs {
240            self.node.state.handle_pending_blob(chain_id, blob).await?;
241        }
242        Ok(())
243    }
244
245    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
246    /// [`ChainId`].
247    ///
248    /// The returned view holds a lock on the chain state, which prevents the local node from
249    /// changing the state of that chain.
250    #[instrument(level = "trace", skip(self))]
251    pub async fn chain_state_view(
252        &self,
253        chain_id: ChainId,
254    ) -> Result<OwnedRwLockReadGuard<ChainStateView<S::Context>>, LocalNodeError> {
255        Ok(self.node.state.chain_state_view(chain_id).await?)
256    }
257
258    #[instrument(level = "trace", skip(self))]
259    pub(crate) async fn chain_info(
260        &self,
261        chain_id: ChainId,
262    ) -> Result<Box<ChainInfo>, LocalNodeError> {
263        let query = ChainInfoQuery::new(chain_id);
264        Ok(self.handle_chain_info_query(query).await?.info)
265    }
266
267    #[instrument(level = "trace", skip(self, query))]
268    pub async fn query_application(
269        &self,
270        chain_id: ChainId,
271        query: Query,
272    ) -> Result<QueryOutcome, LocalNodeError> {
273        let outcome = self.node.state.query_application(chain_id, query).await?;
274        Ok(outcome)
275    }
276
277    #[instrument(level = "trace", skip(self))]
278    pub async fn describe_application(
279        &self,
280        chain_id: ChainId,
281        application_id: ApplicationId,
282    ) -> Result<ApplicationDescription, LocalNodeError> {
283        let response = self
284            .node
285            .state
286            .describe_application(chain_id, application_id)
287            .await?;
288        Ok(response)
289    }
290
291    /// Handles any pending local cross-chain requests.
292    #[instrument(level = "trace", skip(self))]
293    pub async fn retry_pending_cross_chain_requests(
294        &self,
295        sender_chain: ChainId,
296    ) -> Result<(), LocalNodeError> {
297        let (_response, actions) = self
298            .node
299            .state
300            .handle_chain_info_query(ChainInfoQuery::new(sender_chain))
301            .await?;
302        let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
303        while let Some(request) = requests.pop_front() {
304            let new_actions = self.node.state.handle_cross_chain_request(request).await?;
305            requests.extend(new_actions.cross_chain_requests);
306        }
307        Ok(())
308    }
309
310    /// Given a list of chain IDs, returns a map that assigns to each of them the next block
311    /// height to schedule, i.e. the lowest block height for which we haven't added the messages
312    /// to `receiver_id` to the outbox yet.
313    ///
314    /// It makes at most `chain_worker_limit` requests to the local node in parallel.
315    pub async fn next_outbox_heights(
316        &self,
317        chain_ids: impl IntoIterator<Item = &ChainId>,
318        chain_worker_limit: usize,
319        receiver_id: ChainId,
320    ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
321        let futures = chain_ids
322            .into_iter()
323            .map(|chain_id| async move {
324                let chain = self.chain_state_view(*chain_id).await?;
325                let mut next_height = chain.tip_state.get().next_block_height;
326                if let Some(outbox) = chain.outboxes.try_load_entry(&receiver_id).await? {
327                    next_height = next_height.max(*outbox.next_height_to_schedule.get());
328                }
329                Ok::<_, LocalNodeError>((*chain_id, next_height))
330            })
331            .collect::<Vec<_>>();
332        stream::iter(futures)
333            .buffer_unordered(chain_worker_limit)
334            .try_collect()
335            .await
336    }
337
338    pub async fn update_received_certificate_trackers(
339        &self,
340        chain_id: ChainId,
341        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
342    ) -> Result<(), LocalNodeError> {
343        self.node
344            .state
345            .update_received_certificate_trackers(chain_id, new_trackers)
346            .await?;
347        Ok(())
348    }
349}
350
351/// Extension trait for [`ChainInfo`]s from our local node. These should always be valid and
352/// contain the requested information.
353pub trait LocalChainInfoExt {
354    /// Returns the requested map of committees.
355    fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError>;
356
357    /// Returns the current committee.
358    fn into_current_committee(self) -> Result<Committee, LocalNodeError>;
359
360    /// Returns a reference to the current committee.
361    fn current_committee(&self) -> Result<&Committee, LocalNodeError>;
362}
363
364impl LocalChainInfoExt for ChainInfo {
365    fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError> {
366        self.requested_committees
367            .ok_or(LocalNodeError::InvalidChainInfoResponse)
368    }
369
370    fn into_current_committee(self) -> Result<Committee, LocalNodeError> {
371        self.requested_committees
372            .ok_or(LocalNodeError::InvalidChainInfoResponse)?
373            .remove(&self.epoch)
374            .ok_or(LocalNodeError::InactiveChain(self.chain_id))
375    }
376
377    fn current_committee(&self) -> Result<&Committee, LocalNodeError> {
378        self.requested_committees
379            .as_ref()
380            .ok_or(LocalNodeError::InvalidChainInfoResponse)?
381            .get(&self.epoch)
382            .ok_or(LocalNodeError::InactiveChain(self.chain_id))
383    }
384}