1use std::sync::Arc;
6
7#[cfg(not(web))]
8use futures::stream::BoxStream;
9#[cfg(web)]
10use futures::stream::LocalBoxStream as BoxStream;
11use futures::stream::Stream;
12use linera_base::{
13 crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
14 data_types::{
15 ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription, Round, Timestamp,
16 },
17 identifiers::{BlobId, ChainId, EventId},
18};
19use linera_chain::{
20 data_types::BlockProposal,
21 types::{
22 ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
23 ValidatedBlock,
24 },
25 ChainError,
26};
27use linera_execution::{committee::Committee, ExecutionError};
28use linera_version::VersionInfo;
29use linera_views::ViewError;
30use serde::{Deserialize, Serialize};
31use thiserror::Error;
32
33use crate::{
34 data_types::{ChainInfoQuery, ChainInfoResponse},
35 worker::{Notification, WorkerError},
36};
37
38pub type NotificationStream = BoxStream<'static, Notification>;
40
41pub type BlobStream = BoxStream<'static, Result<BlobContent, NodeError>>;
43
44#[derive(Debug, Default, Clone, Copy)]
46pub enum CrossChainMessageDelivery {
47 #[default]
48 NonBlocking,
49 Blocking,
50}
51
52#[allow(async_fn_in_trait)]
54#[cfg_attr(not(web), trait_variant::make(Send))]
55pub trait ValidatorNode {
56 #[cfg(not(web))]
57 type NotificationStream: Stream<Item = Notification> + Unpin + Send;
58 #[cfg(web)]
59 type NotificationStream: Stream<Item = Notification> + Unpin;
60
61 fn address(&self) -> String;
62
63 async fn handle_block_proposal(
65 &self,
66 proposal: BlockProposal,
67 ) -> Result<ChainInfoResponse, NodeError>;
68
69 async fn handle_lite_certificate(
71 &self,
72 certificate: LiteCertificate<'_>,
73 delivery: CrossChainMessageDelivery,
74 ) -> Result<ChainInfoResponse, NodeError>;
75
76 async fn handle_confirmed_certificate(
78 &self,
79 certificate: Arc<GenericCertificate<ConfirmedBlock>>,
80 delivery: CrossChainMessageDelivery,
81 ) -> Result<ChainInfoResponse, NodeError>;
82
83 async fn handle_validated_certificate(
85 &self,
86 certificate: GenericCertificate<ValidatedBlock>,
87 ) -> Result<ChainInfoResponse, NodeError>;
88
89 async fn handle_timeout_certificate(
91 &self,
92 certificate: GenericCertificate<Timeout>,
93 ) -> Result<ChainInfoResponse, NodeError>;
94
95 async fn handle_chain_info_query(
97 &self,
98 query: ChainInfoQuery,
99 ) -> Result<ChainInfoResponse, NodeError>;
100
101 async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;
103
104 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;
106
107 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;
109
110 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;
113
114 fn upload_blobs(
119 &self,
120 blobs: Vec<Arc<Blob>>,
121 ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
122 let tasks: Vec<_> = blobs
123 .into_iter()
124 .map(|blob| self.upload_blob(blob.into()))
125 .collect();
126 futures::future::try_join_all(tasks)
127 }
128
129 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;
131
132 async fn download_blobs(&self, blob_ids: Vec<BlobId>) -> Result<BlobStream, NodeError>;
136
137 async fn download_pending_blob(
139 &self,
140 chain_id: ChainId,
141 blob_id: BlobId,
142 ) -> Result<BlobContent, NodeError>;
143
144 async fn handle_pending_blob(
146 &self,
147 chain_id: ChainId,
148 blob: BlobContent,
149 ) -> Result<ChainInfoResponse, NodeError>;
150
151 async fn download_certificate(
152 &self,
153 hash: CryptoHash,
154 ) -> Result<ConfirmedBlockCertificate, NodeError>;
155
156 async fn download_certificates(
158 &self,
159 hashes: Vec<CryptoHash>,
160 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
161
162 async fn download_certificates_by_heights(
168 &self,
169 chain_id: ChainId,
170 heights: Vec<BlockHeight>,
171 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
172
173 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
175
176 async fn blob_last_used_by_certificate(
178 &self,
179 blob_id: BlobId,
180 ) -> Result<ConfirmedBlockCertificate, NodeError>;
181
182 async fn event_block_heights(
185 &self,
186 event_ids: Vec<EventId>,
187 ) -> Result<Vec<Option<BlockHeight>>, NodeError>;
188
189 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;
191
192 async fn get_shard_info(
194 &self,
195 chain_id: ChainId,
196 ) -> Result<crate::data_types::ShardInfo, NodeError>;
197}
198
199#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
201pub trait ValidatorNodeProvider: 'static {
202 #[cfg(not(web))]
203 type Node: ValidatorNode + Send + Sync + Clone + 'static;
204 #[cfg(web)]
205 type Node: ValidatorNode + Clone + 'static;
206
207 fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
208
209 fn make_nodes(
210 &self,
211 committee: &Committee,
212 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
213 let validator_addresses: Vec<_> = committee
214 .validator_addresses()
215 .map(|(node, name)| (node, name.to_owned()))
216 .collect();
217 self.make_nodes_from_list(validator_addresses)
218 }
219
220 fn make_nodes_from_list<A>(
221 &self,
222 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
223 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
224 where
225 A: AsRef<str>,
226 {
227 Ok(validators
228 .into_iter()
229 .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
230 .collect::<Result<Vec<_>, NodeError>>()?
231 .into_iter())
232 }
233}
234
235#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
240pub enum NodeError {
241 #[error("Cryptographic error: {error}")]
242 CryptoError { error: String },
243
244 #[error("Arithmetic error: {error}")]
245 ArithmeticError { error: String },
246
247 #[error("Error while accessing storage: {error}")]
248 ViewError { error: String },
249
250 #[error("Chain error: {error}")]
251 ChainError { error: String },
252
253 #[error("Worker error: {error}")]
254 WorkerError { error: String },
255
256 #[error("The chain {0} is not active in validator")]
258 InactiveChain(ChainId),
259
260 #[error("Round number should be {0:?}")]
261 WrongRound(Round),
262
263 #[error(
264 "Chain is expecting a next block at height {expected_block_height} but the given block \
265 is at height {found_block_height} instead"
266 )]
267 UnexpectedBlockHeight {
268 expected_block_height: BlockHeight,
269 found_block_height: BlockHeight,
270 },
271
272 #[error(
274 "Cannot vote for block proposal of chain {chain_id} because a message \
275 from chain {origin} at height {height} has not been received yet"
276 )]
277 MissingCrossChainUpdate {
278 chain_id: ChainId,
279 origin: ChainId,
280 height: BlockHeight,
281 },
282
283 #[error("Blobs not found: {0:?}")]
284 BlobsNotFound(Vec<BlobId>),
285
286 #[error("Events not found: {0:?}")]
287 EventsNotFound(Vec<EventId>),
288
289 #[error("We don't have the value for the certificate.")]
291 MissingCertificateValue,
292
293 #[error("Response doesn't contain requested certificates: {0:?}")]
294 MissingCertificates(Vec<CryptoHash>),
295
296 #[error("Validator's response failed to include a vote when trying to {0}")]
297 MissingVoteInValidatorResponse(String),
298
299 #[error("The received chain info response is invalid")]
300 InvalidChainInfoResponse,
301 #[error("Unexpected certificate value")]
302 UnexpectedCertificateValue,
303
304 #[error("Cannot deserialize")]
307 InvalidDecoding,
308 #[error("Unexpected message")]
309 UnexpectedMessage,
310 #[error("Grpc error: {error}")]
311 GrpcError { error: String },
312 #[error("Network error while querying service: {error}")]
313 ClientIoError { error: String },
314 #[error("Failed to resolve validator address: {address}")]
315 CannotResolveValidatorAddress { address: String },
316 #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
317 SubscriptionError { transport: String },
318 #[error("Failed to subscribe; tonic status: {status:?}")]
319 SubscriptionFailed { status: String },
320
321 #[error("Node failed to provide a 'last used by' certificate for the blob")]
322 InvalidCertificateForBlob(BlobId),
323 #[error("Node returned a BlobsNotFound error with duplicates")]
324 DuplicatesInBlobsNotFound,
325 #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
326 UnexpectedEntriesInBlobsNotFound,
327 #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
328 UnexpectedCertificates {
329 returned: Vec<CryptoHash>,
330 requested: Vec<CryptoHash>,
331 },
332 #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
333 EmptyBlobsNotFound,
334 #[error("Local error handling validator response: {error}")]
335 ResponseHandlingError { error: String },
336
337 #[error("Missing certificates for chain {chain_id} in heights {heights:?}")]
338 MissingCertificatesByHeights {
339 chain_id: ChainId,
340 heights: Vec<BlockHeight>,
341 },
342
343 #[error("Too many certificates returned for chain {chain_id} from {remote_node}")]
344 TooManyCertificatesReturned {
345 chain_id: ChainId,
346 remote_node: Box<ValidatorPublicKey>,
347 },
348
349 #[error(
350 "Block timestamp ({block_timestamp}) is further in the future from local time \
351 ({local_time}) than block time grace period ({block_time_grace_period_ms} ms)"
352 )]
353 InvalidTimestamp {
354 block_timestamp: Timestamp,
355 local_time: Timestamp,
356 block_time_grace_period_ms: u64,
357 },
358
359 #[error("No validators available to handle the request")]
360 NoValidators,
361}
362
363impl NodeError {
364 pub fn is_expected(&self) -> bool {
371 match self {
372 NodeError::BlobsNotFound(_)
375 | NodeError::EventsNotFound(_)
376 | NodeError::MissingCrossChainUpdate { .. }
377 | NodeError::WrongRound(_)
378 | NodeError::UnexpectedBlockHeight { .. }
379 | NodeError::InactiveChain(_)
380 | NodeError::InvalidTimestamp { .. }
381 | NodeError::MissingCertificateValue => true,
382
383 NodeError::CryptoError { .. }
385 | NodeError::ArithmeticError { .. }
386 | NodeError::ViewError { .. }
387 | NodeError::ChainError { .. }
388 | NodeError::WorkerError { .. }
389 | NodeError::MissingCertificates(_)
390 | NodeError::MissingVoteInValidatorResponse(_)
391 | NodeError::InvalidChainInfoResponse
392 | NodeError::UnexpectedCertificateValue
393 | NodeError::InvalidDecoding
394 | NodeError::UnexpectedMessage
395 | NodeError::GrpcError { .. }
396 | NodeError::ClientIoError { .. }
397 | NodeError::CannotResolveValidatorAddress { .. }
398 | NodeError::SubscriptionError { .. }
399 | NodeError::SubscriptionFailed { .. }
400 | NodeError::InvalidCertificateForBlob(_)
401 | NodeError::DuplicatesInBlobsNotFound
402 | NodeError::UnexpectedEntriesInBlobsNotFound
403 | NodeError::UnexpectedCertificates { .. }
404 | NodeError::EmptyBlobsNotFound
405 | NodeError::ResponseHandlingError { .. }
406 | NodeError::MissingCertificatesByHeights { .. }
407 | NodeError::TooManyCertificatesReturned { .. }
408 | NodeError::NoValidators => false,
409 }
410 }
411}
412
413impl From<tonic::Status> for NodeError {
414 fn from(status: tonic::Status) -> Self {
415 Self::GrpcError {
416 error: status.to_string(),
417 }
418 }
419}
420
421impl CrossChainMessageDelivery {
422 pub fn new(wait_for_outgoing_messages: bool) -> Self {
423 if wait_for_outgoing_messages {
424 CrossChainMessageDelivery::Blocking
425 } else {
426 CrossChainMessageDelivery::NonBlocking
427 }
428 }
429
430 pub fn wait_for_outgoing_messages(self) -> bool {
431 match self {
432 CrossChainMessageDelivery::NonBlocking => false,
433 CrossChainMessageDelivery::Blocking => true,
434 }
435 }
436}
437
438impl From<ViewError> for NodeError {
439 fn from(error: ViewError) -> Self {
440 Self::ViewError {
441 error: error.to_string(),
442 }
443 }
444}
445
446impl From<ArithmeticError> for NodeError {
447 fn from(error: ArithmeticError) -> Self {
448 Self::ArithmeticError {
449 error: error.to_string(),
450 }
451 }
452}
453
454impl From<CryptoError> for NodeError {
455 fn from(error: CryptoError) -> Self {
456 Self::CryptoError {
457 error: error.to_string(),
458 }
459 }
460}
461
462impl From<ChainError> for NodeError {
463 fn from(error: ChainError) -> Self {
464 match error {
465 ChainError::MissingCrossChainUpdate {
466 chain_id,
467 origin,
468 height,
469 } => Self::MissingCrossChainUpdate {
470 chain_id,
471 origin,
472 height,
473 },
474 ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
475 ChainError::ExecutionError(execution_error, context) => match *execution_error {
476 ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
477 ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
478 _ => Self::ChainError {
479 error: ChainError::ExecutionError(execution_error, context).to_string(),
480 },
481 },
482 ChainError::UnexpectedBlockHeight {
483 expected_block_height,
484 found_block_height,
485 } => Self::UnexpectedBlockHeight {
486 expected_block_height,
487 found_block_height,
488 },
489 ChainError::WrongRound(round) => Self::WrongRound(round),
490 error => Self::ChainError {
491 error: error.to_string(),
492 },
493 }
494 }
495}
496
497impl From<WorkerError> for NodeError {
498 fn from(error: WorkerError) -> Self {
499 match error {
500 WorkerError::ChainError(error) => (*error).into(),
501 WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
502 WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
503 WorkerError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
504 WorkerError::UnexpectedBlockHeight {
505 expected_block_height,
506 found_block_height,
507 } => NodeError::UnexpectedBlockHeight {
508 expected_block_height,
509 found_block_height,
510 },
511 WorkerError::InvalidTimestamp {
512 block_timestamp,
513 local_time,
514 block_time_grace_period,
515 } => NodeError::InvalidTimestamp {
516 block_timestamp,
517 local_time,
518 block_time_grace_period_ms: block_time_grace_period.as_millis() as u64,
519 },
520 error => Self::WorkerError {
521 error: error.to_string(),
522 },
523 }
524 }
525}