1#[cfg(not(web))]
6use futures::stream::BoxStream;
7#[cfg(web)]
8use futures::stream::LocalBoxStream as BoxStream;
9use futures::stream::Stream;
10use linera_base::{
11 crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
12 data_types::{ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription, Round},
13 identifiers::{BlobId, ChainId, EventId},
14};
15use linera_chain::{
16 data_types::BlockProposal,
17 types::{
18 ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
19 ValidatedBlock,
20 },
21 ChainError,
22};
23use linera_execution::{committee::Committee, ExecutionError};
24use linera_version::VersionInfo;
25use linera_views::ViewError;
26use serde::{Deserialize, Serialize};
27use thiserror::Error;
28
29use crate::{
30 data_types::{ChainInfoQuery, ChainInfoResponse},
31 worker::{Notification, WorkerError},
32};
33
/// A boxed, `'static` stream of [`Notification`]s.
///
/// `BoxStream` resolves to `futures::stream::BoxStream` on non-web targets and to
/// `futures::stream::LocalBoxStream` on web targets (see the `cfg`-gated imports above).
pub type NotificationStream = BoxStream<'static, Notification>;
36
/// Delivery mode for cross-chain messages: blocking or non-blocking.
///
/// The default is [`CrossChainMessageDelivery::NonBlocking`].
#[derive(Debug, Default, Clone, Copy)]
pub enum CrossChainMessageDelivery {
    /// Non-blocking mode; `wait_for_outgoing_messages()` returns `false` for it.
    #[default]
    NonBlocking,
    /// Blocking mode; `wait_for_outgoing_messages()` returns `true` for it.
    Blocking,
}
44
45#[allow(async_fn_in_trait)]
47#[cfg_attr(not(web), trait_variant::make(Send))]
48pub trait ValidatorNode {
49 #[cfg(not(web))]
50 type NotificationStream: Stream<Item = Notification> + Unpin + Send;
51 #[cfg(web)]
52 type NotificationStream: Stream<Item = Notification> + Unpin;
53
54 async fn handle_block_proposal(
56 &self,
57 proposal: BlockProposal,
58 ) -> Result<ChainInfoResponse, NodeError>;
59
60 async fn handle_lite_certificate(
62 &self,
63 certificate: LiteCertificate<'_>,
64 delivery: CrossChainMessageDelivery,
65 ) -> Result<ChainInfoResponse, NodeError>;
66
67 async fn handle_confirmed_certificate(
69 &self,
70 certificate: GenericCertificate<ConfirmedBlock>,
71 delivery: CrossChainMessageDelivery,
72 ) -> Result<ChainInfoResponse, NodeError>;
73
74 async fn handle_validated_certificate(
76 &self,
77 certificate: GenericCertificate<ValidatedBlock>,
78 ) -> Result<ChainInfoResponse, NodeError>;
79
80 async fn handle_timeout_certificate(
82 &self,
83 certificate: GenericCertificate<Timeout>,
84 ) -> Result<ChainInfoResponse, NodeError>;
85
86 async fn handle_chain_info_query(
88 &self,
89 query: ChainInfoQuery,
90 ) -> Result<ChainInfoResponse, NodeError>;
91
92 async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;
94
95 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;
97
98 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;
100
101 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;
104
105 fn upload_blobs(
110 &self,
111 blobs: Vec<Blob>,
112 ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
113 let tasks: Vec<_> = blobs
114 .into_iter()
115 .map(|blob| self.upload_blob(blob.into()))
116 .collect();
117 futures::future::try_join_all(tasks)
118 }
119
120 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;
122
123 async fn download_pending_blob(
125 &self,
126 chain_id: ChainId,
127 blob_id: BlobId,
128 ) -> Result<BlobContent, NodeError>;
129
130 async fn handle_pending_blob(
132 &self,
133 chain_id: ChainId,
134 blob: BlobContent,
135 ) -> Result<ChainInfoResponse, NodeError>;
136
137 async fn download_certificate(
138 &self,
139 hash: CryptoHash,
140 ) -> Result<ConfirmedBlockCertificate, NodeError>;
141
142 async fn download_certificates(
144 &self,
145 hashes: Vec<CryptoHash>,
146 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
147
148 async fn download_certificates_by_heights(
150 &self,
151 chain_id: ChainId,
152 heights: Vec<BlockHeight>,
153 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
154
155 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
157
158 async fn blob_last_used_by_certificate(
160 &self,
161 blob_id: BlobId,
162 ) -> Result<ConfirmedBlockCertificate, NodeError>;
163
164 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;
166
167 async fn get_shard_info(
169 &self,
170 chain_id: ChainId,
171 ) -> Result<crate::data_types::ShardInfo, NodeError>;
172}
173
174#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
176pub trait ValidatorNodeProvider: 'static {
177 #[cfg(not(web))]
178 type Node: ValidatorNode + Send + Sync + Clone + 'static;
179 #[cfg(web)]
180 type Node: ValidatorNode + Clone + 'static;
181
182 fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
183
184 fn make_nodes(
185 &self,
186 committee: &Committee,
187 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
188 let validator_addresses: Vec<_> = committee
189 .validator_addresses()
190 .map(|(node, name)| (node, name.to_owned()))
191 .collect();
192 self.make_nodes_from_list(validator_addresses)
193 }
194
195 fn make_nodes_from_list<A>(
196 &self,
197 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
198 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
199 where
200 A: AsRef<str>,
201 {
202 Ok(validators
203 .into_iter()
204 .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
205 .collect::<Result<Vec<_>, NodeError>>()?
206 .into_iter())
207 }
208}
209
/// Error returned by validator node requests.
///
/// The type derives `Serialize`/`Deserialize`, `Eq` and `Hash`. Variants that wrap another
/// error type carry only that error's `to_string()` text (see the `From` impls in this file).
#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
pub enum NodeError {
    /// Text of a `CryptoError` (see `From<CryptoError>`).
    #[error("Cryptographic error: {error}")]
    CryptoError { error: String },

    /// Text of an `ArithmeticError` (see `From<ArithmeticError>`).
    #[error("Arithmetic error: {error}")]
    ArithmeticError { error: String },

    /// Text of a `ViewError` (see `From<ViewError>`).
    #[error("Error while accessing storage: {error}")]
    ViewError { error: String },

    /// Text of a `ChainError` that has no dedicated variant here (see `From<ChainError>`).
    #[error("Chain error: {error}")]
    ChainError { error: String },

    /// Text of a `WorkerError` that has no dedicated variant here (see `From<WorkerError>`).
    #[error("Worker error: {error}")]
    WorkerError { error: String },

    /// The given chain is not active; mapped from `ChainError::InactiveChain`.
    #[error("The chain {0} is not active in validator")]
    InactiveChain(ChainId),

    /// Carries the round that was expected; mapped from `ChainError::WrongRound`.
    #[error("Round number should be {0:?}")]
    WrongRound(Round),

    /// The block height did not match the expected next height; mapped from the
    /// `UnexpectedBlockHeight` variants of `ChainError` and `WorkerError`.
    #[error(
        "Chain is expecting a next block at height {expected_block_height} but the given block \
         is at height {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },

    /// A message from `origin` at `height` has not been received yet by `chain_id`; mapped
    /// from `ChainError::MissingCrossChainUpdate`.
    #[error(
        "Cannot vote for block proposal of chain {chain_id} because a message \
         from chain {origin} at height {height} has not been received yet"
    )]
    MissingCrossChainUpdate {
        chain_id: ChainId,
        origin: ChainId,
        height: BlockHeight,
    },

    /// The listed blobs were not found; mapped from `ExecutionError::BlobsNotFound` and
    /// `WorkerError::BlobsNotFound`.
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),

    /// The listed events were not found; mapped from `ExecutionError::EventsNotFound` and
    /// `WorkerError::EventsNotFound`.
    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),

    /// Mapped from `WorkerError::MissingCertificateValue`.
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,

    /// The response lacked the certificates with the listed hashes.
    #[error("Response doesn't contain requested certificates: {0:?}")]
    MissingCertificates(Vec<CryptoHash>),

    /// The validator's response contained no vote; the payload names the attempted action
    /// (it completes the sentence in the message).
    #[error("Validator's response failed to include a vote when trying to {0}")]
    MissingVoteInValidatorResponse(String),

    /// The received chain info response was invalid.
    #[error("The received chain info response is invalid")]
    InvalidChainInfoResponse,
    /// A certificate carried an unexpected kind of value.
    #[error("Unexpected certificate value")]
    UnexpectedCertificateValue,

    /// Data could not be deserialized.
    #[error("Cannot deserialize")]
    InvalidDecoding,
    /// A message of an unexpected kind was received.
    #[error("Unexpected message")]
    UnexpectedMessage,
    /// Text of a `tonic::Status` (see `From<tonic::Status>`).
    #[error("Grpc error: {error}")]
    GrpcError { error: String },
    /// Network error while querying a service; `error` holds its description.
    #[error("Network error while querying service: {error}")]
    ClientIoError { error: String },
    /// The validator address `address` could not be resolved.
    #[error("Failed to resolve validator address: {address}")]
    CannotResolveValidatorAddress { address: String },
    /// A subscription was attempted over a transport other than gRPC; `transport` names
    /// the transport that was found.
    #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
    SubscriptionError { transport: String },
    /// Subscribing failed; `status` holds the reported tonic status as text.
    #[error("Failed to subscribe; tonic status: {status:?}")]
    SubscriptionFailed { status: String },

    /// The node did not provide a valid "last used by" certificate for the given blob.
    // Note: the blob ID is carried in the payload but is not part of the message.
    #[error("Node failed to provide a 'last used by' certificate for the blob")]
    InvalidCertificateForBlob(BlobId),
    /// A `BlobsNotFound` error from the node listed the same blob ID more than once.
    #[error("Node returned a BlobsNotFound error with duplicates")]
    DuplicatesInBlobsNotFound,
    /// A `BlobsNotFound` error from the node listed unexpected blob IDs.
    #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
    UnexpectedEntriesInBlobsNotFound,
    /// The certificates returned by the node differ from the requested ones.
    #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
    UnexpectedCertificates {
        returned: Vec<CryptoHash>,
        requested: Vec<CryptoHash>,
    },
    /// A `BlobsNotFound` error from the node contained no blob IDs.
    #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
    EmptyBlobsNotFound,
    /// A local error occurred while handling the validator's response.
    #[error("Local error handling validator response: {error}")]
    ResponseHandlingError { error: String },

    /// Certificates at the listed `heights` of `chain_id` were missing.
    #[error("Missing certificates for chain {chain_id} in heights {heights:?}")]
    MissingCertificatesByHeights {
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
    },

    /// `remote_node` returned more certificates for `chain_id` than expected.
    #[error("Too many certificates returned for chain {chain_id} from {remote_node}")]
    TooManyCertificatesReturned {
        chain_id: ChainId,
        remote_node: Box<ValidatorPublicKey>,
    },
}
324
325impl From<tonic::Status> for NodeError {
326 fn from(status: tonic::Status) -> Self {
327 Self::GrpcError {
328 error: status.to_string(),
329 }
330 }
331}
332
333impl CrossChainMessageDelivery {
334 pub fn new(wait_for_outgoing_messages: bool) -> Self {
335 if wait_for_outgoing_messages {
336 CrossChainMessageDelivery::Blocking
337 } else {
338 CrossChainMessageDelivery::NonBlocking
339 }
340 }
341
342 pub fn wait_for_outgoing_messages(self) -> bool {
343 match self {
344 CrossChainMessageDelivery::NonBlocking => false,
345 CrossChainMessageDelivery::Blocking => true,
346 }
347 }
348}
349
350impl From<ViewError> for NodeError {
351 fn from(error: ViewError) -> Self {
352 Self::ViewError {
353 error: error.to_string(),
354 }
355 }
356}
357
358impl From<ArithmeticError> for NodeError {
359 fn from(error: ArithmeticError) -> Self {
360 Self::ArithmeticError {
361 error: error.to_string(),
362 }
363 }
364}
365
366impl From<CryptoError> for NodeError {
367 fn from(error: CryptoError) -> Self {
368 Self::CryptoError {
369 error: error.to_string(),
370 }
371 }
372}
373
374impl From<ChainError> for NodeError {
375 fn from(error: ChainError) -> Self {
376 match error {
377 ChainError::MissingCrossChainUpdate {
378 chain_id,
379 origin,
380 height,
381 } => Self::MissingCrossChainUpdate {
382 chain_id,
383 origin,
384 height,
385 },
386 ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
387 ChainError::ExecutionError(execution_error, context) => match *execution_error {
388 ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
389 ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
390 _ => Self::ChainError {
391 error: ChainError::ExecutionError(execution_error, context).to_string(),
392 },
393 },
394 ChainError::UnexpectedBlockHeight {
395 expected_block_height,
396 found_block_height,
397 } => Self::UnexpectedBlockHeight {
398 expected_block_height,
399 found_block_height,
400 },
401 ChainError::WrongRound(round) => Self::WrongRound(round),
402 error => Self::ChainError {
403 error: error.to_string(),
404 },
405 }
406 }
407}
408
409impl From<WorkerError> for NodeError {
410 fn from(error: WorkerError) -> Self {
411 match error {
412 WorkerError::ChainError(error) => (*error).into(),
413 WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
414 WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
415 WorkerError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
416 WorkerError::UnexpectedBlockHeight {
417 expected_block_height,
418 found_block_height,
419 } => NodeError::UnexpectedBlockHeight {
420 expected_block_height,
421 found_block_height,
422 },
423 error => Self::WorkerError {
424 error: error.to_string(),
425 },
426 }
427 }
428}