linera_core/
node.rs

// Copyright (c) Facebook, Inc. and its affiliates.
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(not(web))]
6use futures::stream::BoxStream;
7#[cfg(web)]
8use futures::stream::LocalBoxStream as BoxStream;
9use futures::stream::Stream;
10use linera_base::{
11    crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
12    data_types::{ArithmeticError, BlobContent, BlockHeight, NetworkDescription},
13    identifiers::{BlobId, ChainId},
14};
15use linera_chain::{
16    data_types::BlockProposal,
17    types::{
18        ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
19        ValidatedBlock,
20    },
21    ChainError,
22};
23use linera_execution::{committee::Committee, ExecutionError};
24use linera_version::VersionInfo;
25use linera_views::ViewError;
26use serde::{Deserialize, Serialize};
27use thiserror::Error;
28
29use crate::{
30    data_types::{ChainInfoQuery, ChainInfoResponse},
31    worker::{Notification, WorkerError},
32};
33
/// A pinned, boxed [`Stream`] of [`Notification`]s.
///
/// `BoxStream` is `futures::stream::BoxStream` on non-web builds and
/// `futures::stream::LocalBoxStream` on web builds (see the imports at the top of
/// this file).
pub type NotificationStream = BoxStream<'static, Notification>;
36
/// Whether to wait for the delivery of outgoing cross-chain messages.
///
/// The default is [`NonBlocking`](Self::NonBlocking).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum CrossChainMessageDelivery {
    /// Do not wait for outgoing cross-chain messages.
    #[default]
    NonBlocking,
    /// Wait for outgoing cross-chain messages.
    Blocking,
}
44
45/// How to communicate with a validator node.
46#[allow(async_fn_in_trait)]
47#[cfg_attr(not(web), trait_variant::make(Send))]
48pub trait ValidatorNode {
49    #[cfg(not(web))]
50    type NotificationStream: Stream<Item = Notification> + Unpin + Send;
51    #[cfg(web)]
52    type NotificationStream: Stream<Item = Notification> + Unpin;
53
54    /// Proposes a new block.
55    async fn handle_block_proposal(
56        &self,
57        proposal: BlockProposal,
58    ) -> Result<ChainInfoResponse, NodeError>;
59
60    /// Processes a certificate without a value.
61    async fn handle_lite_certificate(
62        &self,
63        certificate: LiteCertificate<'_>,
64        delivery: CrossChainMessageDelivery,
65    ) -> Result<ChainInfoResponse, NodeError>;
66
67    /// Processes a confirmed certificate.
68    async fn handle_confirmed_certificate(
69        &self,
70        certificate: GenericCertificate<ConfirmedBlock>,
71        delivery: CrossChainMessageDelivery,
72    ) -> Result<ChainInfoResponse, NodeError>;
73
74    /// Processes a validated certificate.
75    async fn handle_validated_certificate(
76        &self,
77        certificate: GenericCertificate<ValidatedBlock>,
78    ) -> Result<ChainInfoResponse, NodeError>;
79
80    /// Processes a timeout certificate.
81    async fn handle_timeout_certificate(
82        &self,
83        certificate: GenericCertificate<Timeout>,
84    ) -> Result<ChainInfoResponse, NodeError>;
85
86    /// Handles information queries for this chain.
87    async fn handle_chain_info_query(
88        &self,
89        query: ChainInfoQuery,
90    ) -> Result<ChainInfoResponse, NodeError>;
91
92    /// Gets the version info for this validator node.
93    async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;
94
95    /// Gets the network's description.
96    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;
97
98    /// Subscribes to receiving notifications for a collection of chains.
99    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;
100
101    // Uploads a blob. Returns an error if the validator has not seen a
102    // certificate using this blob.
103    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;
104
105    /// Downloads a blob. Returns an error if the validator does not have the blob.
106    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;
107
108    /// Downloads a blob that belongs to a pending proposal or the locking block on a chain.
109    async fn download_pending_blob(
110        &self,
111        chain_id: ChainId,
112        blob_id: BlobId,
113    ) -> Result<BlobContent, NodeError>;
114
115    /// Handles a blob that belongs to a pending proposal or validated block certificate.
116    async fn handle_pending_blob(
117        &self,
118        chain_id: ChainId,
119        blob: BlobContent,
120    ) -> Result<ChainInfoResponse, NodeError>;
121
122    async fn download_certificate(
123        &self,
124        hash: CryptoHash,
125    ) -> Result<ConfirmedBlockCertificate, NodeError>;
126
127    /// Requests a batch of certificates from the validator.
128    async fn download_certificates(
129        &self,
130        hashes: Vec<CryptoHash>,
131    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
132
133    /// Returns the hash of the `Certificate` that last used a blob.
134    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
135
136    /// Returns the missing `Blob`s by their IDs.
137    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;
138}
139
140/// Turn an address into a validator node.
141#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
142pub trait ValidatorNodeProvider: 'static {
143    #[cfg(not(web))]
144    type Node: ValidatorNode + Send + Sync + Clone + 'static;
145    #[cfg(web)]
146    type Node: ValidatorNode + Clone + 'static;
147
148    fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
149
150    fn make_nodes(
151        &self,
152        committee: &Committee,
153    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
154        let validator_addresses: Vec<_> = committee
155            .validator_addresses()
156            .map(|(node, name)| (node, name.to_owned()))
157            .collect();
158        self.make_nodes_from_list(validator_addresses)
159    }
160
161    fn make_nodes_from_list<A>(
162        &self,
163        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
164    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
165    where
166        A: AsRef<str>,
167    {
168        Ok(validators
169            .into_iter()
170            .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
171            .collect::<Result<Vec<_>, NodeError>>()?
172            .into_iter())
173    }
174}
175
176/// Error type for node queries.
177///
178/// This error is meant to be serialized over the network and aggregated by clients (i.e.
179/// clients will track validator votes on each error value).
180#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
181pub enum NodeError {
182    #[error("Cryptographic error: {error}")]
183    CryptoError { error: String },
184
185    #[error("Arithmetic error: {error}")]
186    ArithmeticError { error: String },
187
188    #[error("Error while accessing storage: {error}")]
189    ViewError { error: String },
190
191    #[error("Chain error: {error}")]
192    ChainError { error: String },
193
194    #[error("Worker error: {error}")]
195    WorkerError { error: String },
196
197    // This error must be normalized during conversions.
198    #[error("The chain {0} is not active in validator")]
199    InactiveChain(ChainId),
200
201    // This error must be normalized during conversions.
202    #[error(
203        "Cannot vote for block proposal of chain {chain_id:?} because a message \
204         from chain {origin:?} at height {height:?} has not been received yet"
205    )]
206    MissingCrossChainUpdate {
207        chain_id: ChainId,
208        origin: ChainId,
209        height: BlockHeight,
210    },
211
212    #[error("Blobs not found: {0:?}")]
213    BlobsNotFound(Vec<BlobId>),
214
215    // This error must be normalized during conversions.
216    #[error("We don't have the value for the certificate.")]
217    MissingCertificateValue,
218
219    #[error("Response doesn't contain requested certificates: {0:?}")]
220    MissingCertificates(Vec<CryptoHash>),
221
222    #[error("Validator's response to block proposal failed to include a vote")]
223    MissingVoteInValidatorResponse,
224
225    #[error("The received chain info response is invalid")]
226    InvalidChainInfoResponse,
227    #[error("Unexpected certificate value")]
228    UnexpectedCertificateValue,
229
230    // Networking errors.
231    // TODO(#258): These errors should be defined in linera-rpc.
232    #[error("Cannot deserialize")]
233    InvalidDecoding,
234    #[error("Unexpected message")]
235    UnexpectedMessage,
236    #[error("Grpc error: {error}")]
237    GrpcError { error: String },
238    #[error("Network error while querying service: {error}")]
239    ClientIoError { error: String },
240    #[error("Failed to resolve validator address: {address}")]
241    CannotResolveValidatorAddress { address: String },
242    #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
243    SubscriptionError { transport: String },
244    #[error("Failed to subscribe; tonic status: {status}")]
245    SubscriptionFailed { status: String },
246
247    #[error("Node failed to provide a 'last used by' certificate for the blob")]
248    InvalidCertificateForBlob(BlobId),
249    #[error("Node returned a BlobsNotFound error with duplicates")]
250    DuplicatesInBlobsNotFound,
251    #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
252    UnexpectedEntriesInBlobsNotFound,
253    #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
254    EmptyBlobsNotFound,
255    #[error("Local error handling validator response")]
256    ResponseHandlingError { error: String },
257}
258
259impl From<tonic::Status> for NodeError {
260    fn from(status: tonic::Status) -> Self {
261        Self::GrpcError {
262            error: status.to_string(),
263        }
264    }
265}
266
267impl CrossChainMessageDelivery {
268    pub fn new(wait_for_outgoing_messages: bool) -> Self {
269        if wait_for_outgoing_messages {
270            CrossChainMessageDelivery::Blocking
271        } else {
272            CrossChainMessageDelivery::NonBlocking
273        }
274    }
275
276    pub fn wait_for_outgoing_messages(self) -> bool {
277        match self {
278            CrossChainMessageDelivery::NonBlocking => false,
279            CrossChainMessageDelivery::Blocking => true,
280        }
281    }
282}
283
284impl From<ViewError> for NodeError {
285    fn from(error: ViewError) -> Self {
286        Self::ViewError {
287            error: error.to_string(),
288        }
289    }
290}
291
292impl From<ArithmeticError> for NodeError {
293    fn from(error: ArithmeticError) -> Self {
294        Self::ArithmeticError {
295            error: error.to_string(),
296        }
297    }
298}
299
300impl From<CryptoError> for NodeError {
301    fn from(error: CryptoError) -> Self {
302        Self::CryptoError {
303            error: error.to_string(),
304        }
305    }
306}
307
308impl From<ChainError> for NodeError {
309    fn from(error: ChainError) -> Self {
310        match error {
311            ChainError::MissingCrossChainUpdate {
312                chain_id,
313                origin,
314                height,
315            } => Self::MissingCrossChainUpdate {
316                chain_id,
317                origin,
318                height,
319            },
320            ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
321            ChainError::ExecutionError(execution_error, context) => {
322                if let ExecutionError::BlobsNotFound(blob_ids) = *execution_error {
323                    Self::BlobsNotFound(blob_ids)
324                } else {
325                    Self::ChainError {
326                        error: ChainError::ExecutionError(execution_error, context).to_string(),
327                    }
328                }
329            }
330            error => Self::ChainError {
331                error: error.to_string(),
332            },
333        }
334    }
335}
336
337impl From<WorkerError> for NodeError {
338    fn from(error: WorkerError) -> Self {
339        match error {
340            WorkerError::ChainError(error) => (*error).into(),
341            WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
342            WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
343            error => Self::WorkerError {
344                error: error.to_string(),
345            },
346        }
347    }
348}