1use std::sync::Arc;
6
7#[cfg(not(web))]
8use futures::stream::BoxStream;
9#[cfg(web)]
10use futures::stream::LocalBoxStream as BoxStream;
11use futures::stream::Stream;
12use linera_base::{
13 crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
14 data_types::{
15 ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription, Round, Timestamp,
16 },
17 identifiers::{BlobId, ChainId, EventId},
18 task::{MaybeSend, MaybeSync},
19};
20use linera_chain::{
21 data_types::BlockProposal,
22 types::{
23 ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
24 ValidatedBlock,
25 },
26 ChainError,
27};
28use linera_execution::{committee::Committee, ExecutionError};
29use linera_version::VersionInfo;
30use linera_views::ViewError;
31use serde::{Deserialize, Serialize};
32use thiserror::Error;
33
34use crate::{
35 data_types::{ChainInfoQuery, ChainInfoResponse},
36 worker::{Notification, WorkerError},
37};
38
/// A boxed, `'static` stream of [`Notification`]s.
///
/// `BoxStream` resolves to `futures::stream::BoxStream` on non-web builds and to
/// `futures::stream::LocalBoxStream` on web builds, as selected by the `cfg`-gated imports.
pub type NotificationStream = BoxStream<'static, Notification>;
41
/// A boxed, `'static` stream whose items are either a [`BlobContent`] or a [`NodeError`].
pub type BlobStream = BoxStream<'static, Result<BlobContent, NodeError>>;
44
/// Selects whether outgoing cross-chain messages are waited for.
///
/// `CrossChainMessageDelivery::wait_for_outgoing_messages` maps `Blocking` to `true` and
/// `NonBlocking` to `false`.
#[derive(Debug, Default, Clone, Copy)]
pub enum CrossChainMessageDelivery {
    /// Do not wait for outgoing messages. This is the `Default` value.
    #[default]
    NonBlocking,
    /// Wait for outgoing messages.
    Blocking,
}
52
53#[allow(async_fn_in_trait)]
55#[cfg_attr(not(web), trait_variant::make(Send))]
56pub trait ValidatorNode {
57 type NotificationStream: Stream<Item = Notification> + Unpin + MaybeSend;
58
59 fn address(&self) -> String;
60
61 async fn handle_block_proposal(
63 &self,
64 proposal: BlockProposal,
65 ) -> Result<ChainInfoResponse, NodeError>;
66
67 async fn handle_lite_certificate(
69 &self,
70 certificate: LiteCertificate<'_>,
71 delivery: CrossChainMessageDelivery,
72 ) -> Result<ChainInfoResponse, NodeError>;
73
74 async fn handle_confirmed_certificate(
76 &self,
77 certificate: Arc<GenericCertificate<ConfirmedBlock>>,
78 delivery: CrossChainMessageDelivery,
79 ) -> Result<ChainInfoResponse, NodeError>;
80
81 async fn handle_validated_certificate(
83 &self,
84 certificate: GenericCertificate<ValidatedBlock>,
85 ) -> Result<ChainInfoResponse, NodeError>;
86
87 async fn handle_timeout_certificate(
89 &self,
90 certificate: GenericCertificate<Timeout>,
91 ) -> Result<ChainInfoResponse, NodeError>;
92
93 async fn handle_chain_info_query(
95 &self,
96 query: ChainInfoQuery,
97 ) -> Result<ChainInfoResponse, NodeError>;
98
99 async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;
101
102 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;
104
105 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;
107
108 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;
111
112 fn upload_blobs(
117 &self,
118 blobs: Vec<Arc<Blob>>,
119 ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
120 let tasks: Vec<_> = blobs
121 .into_iter()
122 .map(|blob| self.upload_blob(blob.into()))
123 .collect();
124 futures::future::try_join_all(tasks)
125 }
126
127 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;
129
130 async fn download_blobs(&self, blob_ids: Vec<BlobId>) -> Result<BlobStream, NodeError>;
134
135 async fn download_pending_blob(
137 &self,
138 chain_id: ChainId,
139 blob_id: BlobId,
140 ) -> Result<BlobContent, NodeError>;
141
142 async fn handle_pending_blob(
144 &self,
145 chain_id: ChainId,
146 blob: BlobContent,
147 ) -> Result<ChainInfoResponse, NodeError>;
148
149 async fn download_certificate(
150 &self,
151 hash: CryptoHash,
152 ) -> Result<ConfirmedBlockCertificate, NodeError>;
153
154 async fn download_certificates(
156 &self,
157 hashes: Vec<CryptoHash>,
158 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
159
160 async fn download_certificates_by_heights(
166 &self,
167 chain_id: ChainId,
168 heights: Vec<BlockHeight>,
169 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
170
171 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
173
174 async fn blob_last_used_by_certificate(
176 &self,
177 blob_id: BlobId,
178 ) -> Result<ConfirmedBlockCertificate, NodeError>;
179
180 async fn event_block_heights(
183 &self,
184 event_ids: Vec<EventId>,
185 ) -> Result<Vec<Option<BlockHeight>>, NodeError>;
186
187 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;
189
190 async fn get_shard_info(
192 &self,
193 chain_id: ChainId,
194 ) -> Result<crate::data_types::ShardInfo, NodeError>;
195}
196
197#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
199pub trait ValidatorNodeProvider: 'static {
200 type Node: ValidatorNode + MaybeSend + MaybeSync + Clone + 'static;
201
202 fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
203
204 fn make_nodes(
205 &self,
206 committee: &Committee,
207 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
208 let validator_addresses: Vec<_> = committee
209 .validator_addresses()
210 .map(|(node, name)| (node, name.to_owned()))
211 .collect();
212 self.make_nodes_from_list(validator_addresses)
213 }
214
215 fn make_nodes_from_list<A>(
216 &self,
217 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
218 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
219 where
220 A: AsRef<str>,
221 {
222 Ok(validators
223 .into_iter()
224 .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
225 .collect::<Result<Vec<_>, NodeError>>()?
226 .into_iter())
227 }
228}
229
/// Errors reported by node operations in this module.
///
/// The type derives `Serialize`/`Deserialize` as well as `Eq`/`Hash`, and it carries
/// underlying errors as rendered strings (see the `From` implementations in this file)
/// rather than as the original error values.
#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
pub enum NodeError {
    /// A cryptographic error, stored as its rendered message.
    #[error("Cryptographic error: {error}")]
    CryptoError { error: String },

    /// An arithmetic error, stored as its rendered message.
    #[error("Arithmetic error: {error}")]
    ArithmeticError { error: String },

    /// A storage (view) error, stored as its rendered message.
    #[error("Error while accessing storage: {error}")]
    ViewError { error: String },

    /// A chain error without a dedicated variant, stored as its rendered message.
    #[error("Chain error: {error}")]
    ChainError { error: String },

    /// A worker error without a dedicated variant, stored as its rendered message.
    #[error("Worker error: {error}")]
    WorkerError { error: String },

    /// The given chain is not active in the validator.
    #[error("The chain {0} is not active in validator")]
    InactiveChain(ChainId),

    /// The round number was wrong; the payload is the round that was expected.
    #[error("Round number should be {0:?}")]
    WrongRound(Round),

    /// A block was given at a height other than the next expected one.
    #[error(
        "Chain is expecting a next block at height {expected_block_height} but the given block \
         is at height {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },

    /// A message from `origin` at `height` has not been received yet by `chain_id`.
    #[error(
        "Cannot vote for block proposal of chain {chain_id} because a message \
         from chain {origin} at height {height} has not been received yet"
    )]
    MissingCrossChainUpdate {
        chain_id: ChainId,
        origin: ChainId,
        height: BlockHeight,
    },

    /// The listed blobs were not found.
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),

    /// The listed events were not found.
    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),

    /// The value for a certificate is not available.
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,

    /// A response did not contain the requested certificates with these hashes.
    #[error("Response doesn't contain requested certificates: {0:?}")]
    MissingCertificates(Vec<CryptoHash>),

    /// A validator response lacked a vote; the payload names the attempted action.
    #[error("Validator's response failed to include a vote when trying to {0}")]
    MissingVoteInValidatorResponse(String),

    /// The received chain info response is invalid.
    #[error("The received chain info response is invalid")]
    InvalidChainInfoResponse,
    /// A certificate value was not the expected one.
    #[error("Unexpected certificate value")]
    UnexpectedCertificateValue,

    /// Deserialization failed.
    #[error("Cannot deserialize")]
    InvalidDecoding,
    /// An unexpected message was encountered.
    #[error("Unexpected message")]
    UnexpectedMessage,
    /// A gRPC error, stored as its rendered message.
    #[error("Grpc error: {error}")]
    GrpcError { error: String },
    /// A network error while querying a service, stored as its rendered message.
    #[error("Network error while querying service: {error}")]
    ClientIoError { error: String },
    /// The validator address could not be resolved.
    #[error("Failed to resolve validator address: {address}")]
    CannotResolveValidatorAddress { address: String },
    /// A subscription was attempted over a transport other than gRPC.
    #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
    SubscriptionError { transport: String },
    /// Subscribing failed; `status` holds the tonic status as text.
    #[error("Failed to subscribe; tonic status: {status:?}")]
    SubscriptionFailed { status: String },

    /// The node did not provide a valid "last used by" certificate for the given blob.
    #[error("Node failed to provide a 'last used by' certificate for the blob")]
    InvalidCertificateForBlob(BlobId),
    /// A `BlobsNotFound` error from a node contained duplicate entries.
    #[error("Node returned a BlobsNotFound error with duplicates")]
    DuplicatesInBlobsNotFound,
    /// A `BlobsNotFound` error from a node listed unexpected blob IDs.
    #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
    UnexpectedEntriesInBlobsNotFound,
    /// The node returned certificates other than the requested ones.
    #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
    UnexpectedCertificates {
        returned: Vec<CryptoHash>,
        requested: Vec<CryptoHash>,
    },
    /// A `BlobsNotFound` error from a node listed no blob IDs at all.
    #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
    EmptyBlobsNotFound,
    /// A local error occurred while handling a validator response.
    #[error("Local error handling validator response: {error}")]
    ResponseHandlingError { error: String },

    /// Certificates are missing for `chain_id` at the listed heights.
    #[error("Missing certificates for chain {chain_id} in heights {heights:?}")]
    MissingCertificatesByHeights {
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
    },

    /// `remote_node` returned too many certificates for `chain_id`.
    #[error("Too many certificates returned for chain {chain_id} from {remote_node}")]
    TooManyCertificatesReturned {
        chain_id: ChainId,
        remote_node: Box<ValidatorPublicKey>,
    },

    /// The block timestamp is ahead of local time by more than the grace period, which is
    /// given in milliseconds.
    #[error(
        "Block timestamp ({block_timestamp}) is further in the future from local time \
         ({local_time}) than block time grace period ({block_time_grace_period_ms} ms)"
    )]
    InvalidTimestamp {
        block_timestamp: Timestamp,
        local_time: Timestamp,
        block_time_grace_period_ms: u64,
    },

    /// No validators are available to handle the request.
    #[error("No validators available to handle the request")]
    NoValidators,
}
357
358impl NodeError {
359 pub fn is_expected(&self) -> bool {
366 match self {
367 NodeError::BlobsNotFound(_)
370 | NodeError::EventsNotFound(_)
371 | NodeError::MissingCrossChainUpdate { .. }
372 | NodeError::WrongRound(_)
373 | NodeError::UnexpectedBlockHeight { .. }
374 | NodeError::InactiveChain(_)
375 | NodeError::InvalidTimestamp { .. }
376 | NodeError::MissingCertificateValue => true,
377
378 NodeError::CryptoError { .. }
380 | NodeError::ArithmeticError { .. }
381 | NodeError::ViewError { .. }
382 | NodeError::ChainError { .. }
383 | NodeError::WorkerError { .. }
384 | NodeError::MissingCertificates(_)
385 | NodeError::MissingVoteInValidatorResponse(_)
386 | NodeError::InvalidChainInfoResponse
387 | NodeError::UnexpectedCertificateValue
388 | NodeError::InvalidDecoding
389 | NodeError::UnexpectedMessage
390 | NodeError::GrpcError { .. }
391 | NodeError::ClientIoError { .. }
392 | NodeError::CannotResolveValidatorAddress { .. }
393 | NodeError::SubscriptionError { .. }
394 | NodeError::SubscriptionFailed { .. }
395 | NodeError::InvalidCertificateForBlob(_)
396 | NodeError::DuplicatesInBlobsNotFound
397 | NodeError::UnexpectedEntriesInBlobsNotFound
398 | NodeError::UnexpectedCertificates { .. }
399 | NodeError::EmptyBlobsNotFound
400 | NodeError::ResponseHandlingError { .. }
401 | NodeError::MissingCertificatesByHeights { .. }
402 | NodeError::TooManyCertificatesReturned { .. }
403 | NodeError::NoValidators => false,
404 }
405 }
406}
407
408impl From<tonic::Status> for NodeError {
409 fn from(status: tonic::Status) -> Self {
410 Self::GrpcError {
411 error: status.to_string(),
412 }
413 }
414}
415
416impl CrossChainMessageDelivery {
417 pub fn new(wait_for_outgoing_messages: bool) -> Self {
418 if wait_for_outgoing_messages {
419 CrossChainMessageDelivery::Blocking
420 } else {
421 CrossChainMessageDelivery::NonBlocking
422 }
423 }
424
425 pub fn wait_for_outgoing_messages(self) -> bool {
426 match self {
427 CrossChainMessageDelivery::NonBlocking => false,
428 CrossChainMessageDelivery::Blocking => true,
429 }
430 }
431}
432
433impl From<ViewError> for NodeError {
434 fn from(error: ViewError) -> Self {
435 Self::ViewError {
436 error: error.to_string(),
437 }
438 }
439}
440
441impl From<ArithmeticError> for NodeError {
442 fn from(error: ArithmeticError) -> Self {
443 Self::ArithmeticError {
444 error: error.to_string(),
445 }
446 }
447}
448
449impl From<CryptoError> for NodeError {
450 fn from(error: CryptoError) -> Self {
451 Self::CryptoError {
452 error: error.to_string(),
453 }
454 }
455}
456
457impl From<ChainError> for NodeError {
458 fn from(error: ChainError) -> Self {
459 match error {
460 ChainError::MissingCrossChainUpdate {
461 chain_id,
462 origin,
463 height,
464 } => Self::MissingCrossChainUpdate {
465 chain_id,
466 origin,
467 height,
468 },
469 ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
470 ChainError::ExecutionError(execution_error, context) => match *execution_error {
471 ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
472 ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
473 _ => Self::ChainError {
474 error: ChainError::ExecutionError(execution_error, context).to_string(),
475 },
476 },
477 ChainError::UnexpectedBlockHeight {
478 expected_block_height,
479 found_block_height,
480 } => Self::UnexpectedBlockHeight {
481 expected_block_height,
482 found_block_height,
483 },
484 ChainError::WrongRound(round) => Self::WrongRound(round),
485 error => Self::ChainError {
486 error: error.to_string(),
487 },
488 }
489 }
490}
491
492impl From<WorkerError> for NodeError {
493 fn from(error: WorkerError) -> Self {
494 match error {
495 WorkerError::ChainError(error) => (*error).into(),
496 WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
497 WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
498 WorkerError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
499 WorkerError::UnexpectedBlockHeight {
500 expected_block_height,
501 found_block_height,
502 } => NodeError::UnexpectedBlockHeight {
503 expected_block_height,
504 found_block_height,
505 },
506 WorkerError::InvalidTimestamp {
507 block_timestamp,
508 local_time,
509 block_time_grace_period,
510 } => NodeError::InvalidTimestamp {
511 block_timestamp,
512 local_time,
513 block_time_grace_period_ms: block_time_grace_period.as_millis() as u64,
514 },
515 error => Self::WorkerError {
516 error: error.to_string(),
517 },
518 }
519 }
520}