1#[cfg(not(web))]
6use futures::stream::BoxStream;
7#[cfg(web)]
8use futures::stream::LocalBoxStream as BoxStream;
9use futures::stream::Stream;
10use linera_base::{
11 crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
12 data_types::{ArithmeticError, BlobContent, BlockHeight, NetworkDescription},
13 identifiers::{BlobId, ChainId},
14};
15use linera_chain::{
16 data_types::BlockProposal,
17 types::{
18 ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
19 ValidatedBlock,
20 },
21 ChainError,
22};
23use linera_execution::{committee::Committee, ExecutionError};
24use linera_version::VersionInfo;
25use linera_views::ViewError;
26use serde::{Deserialize, Serialize};
27use thiserror::Error;
28
29use crate::{
30 data_types::{ChainInfoQuery, ChainInfoResponse},
31 worker::{Notification, WorkerError},
32};
33
34pub type NotificationStream = BoxStream<'static, Notification>;
36
37#[derive(Debug, Default, Clone, Copy)]
39pub enum CrossChainMessageDelivery {
40 #[default]
41 NonBlocking,
42 Blocking,
43}
44
45#[allow(async_fn_in_trait)]
47#[cfg_attr(not(web), trait_variant::make(Send))]
48pub trait ValidatorNode {
49 #[cfg(not(web))]
50 type NotificationStream: Stream<Item = Notification> + Unpin + Send;
51 #[cfg(web)]
52 type NotificationStream: Stream<Item = Notification> + Unpin;
53
54 async fn handle_block_proposal(
56 &self,
57 proposal: BlockProposal,
58 ) -> Result<ChainInfoResponse, NodeError>;
59
60 async fn handle_lite_certificate(
62 &self,
63 certificate: LiteCertificate<'_>,
64 delivery: CrossChainMessageDelivery,
65 ) -> Result<ChainInfoResponse, NodeError>;
66
67 async fn handle_confirmed_certificate(
69 &self,
70 certificate: GenericCertificate<ConfirmedBlock>,
71 delivery: CrossChainMessageDelivery,
72 ) -> Result<ChainInfoResponse, NodeError>;
73
74 async fn handle_validated_certificate(
76 &self,
77 certificate: GenericCertificate<ValidatedBlock>,
78 ) -> Result<ChainInfoResponse, NodeError>;
79
80 async fn handle_timeout_certificate(
82 &self,
83 certificate: GenericCertificate<Timeout>,
84 ) -> Result<ChainInfoResponse, NodeError>;
85
86 async fn handle_chain_info_query(
88 &self,
89 query: ChainInfoQuery,
90 ) -> Result<ChainInfoResponse, NodeError>;
91
92 async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;
94
95 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;
97
98 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;
100
101 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;
104
105 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;
107
108 async fn download_pending_blob(
110 &self,
111 chain_id: ChainId,
112 blob_id: BlobId,
113 ) -> Result<BlobContent, NodeError>;
114
115 async fn handle_pending_blob(
117 &self,
118 chain_id: ChainId,
119 blob: BlobContent,
120 ) -> Result<ChainInfoResponse, NodeError>;
121
122 async fn download_certificate(
123 &self,
124 hash: CryptoHash,
125 ) -> Result<ConfirmedBlockCertificate, NodeError>;
126
127 async fn download_certificates(
129 &self,
130 hashes: Vec<CryptoHash>,
131 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
132
133 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
135
136 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;
138}
139
140#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
142pub trait ValidatorNodeProvider: 'static {
143 #[cfg(not(web))]
144 type Node: ValidatorNode + Send + Sync + Clone + 'static;
145 #[cfg(web)]
146 type Node: ValidatorNode + Clone + 'static;
147
148 fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
149
150 fn make_nodes(
151 &self,
152 committee: &Committee,
153 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
154 let validator_addresses: Vec<_> = committee
155 .validator_addresses()
156 .map(|(node, name)| (node, name.to_owned()))
157 .collect();
158 self.make_nodes_from_list(validator_addresses)
159 }
160
161 fn make_nodes_from_list<A>(
162 &self,
163 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
164 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
165 where
166 A: AsRef<str>,
167 {
168 Ok(validators
169 .into_iter()
170 .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
171 .collect::<Result<Vec<_>, NodeError>>()?
172 .into_iter())
173 }
174}
175
176#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
181pub enum NodeError {
182 #[error("Cryptographic error: {error}")]
183 CryptoError { error: String },
184
185 #[error("Arithmetic error: {error}")]
186 ArithmeticError { error: String },
187
188 #[error("Error while accessing storage: {error}")]
189 ViewError { error: String },
190
191 #[error("Chain error: {error}")]
192 ChainError { error: String },
193
194 #[error("Worker error: {error}")]
195 WorkerError { error: String },
196
197 #[error("The chain {0} is not active in validator")]
199 InactiveChain(ChainId),
200
201 #[error(
203 "Cannot vote for block proposal of chain {chain_id:?} because a message \
204 from chain {origin:?} at height {height:?} has not been received yet"
205 )]
206 MissingCrossChainUpdate {
207 chain_id: ChainId,
208 origin: ChainId,
209 height: BlockHeight,
210 },
211
212 #[error("Blobs not found: {0:?}")]
213 BlobsNotFound(Vec<BlobId>),
214
215 #[error("We don't have the value for the certificate.")]
217 MissingCertificateValue,
218
219 #[error("Response doesn't contain requested certificates: {0:?}")]
220 MissingCertificates(Vec<CryptoHash>),
221
222 #[error("Validator's response to block proposal failed to include a vote")]
223 MissingVoteInValidatorResponse,
224
225 #[error("The received chain info response is invalid")]
226 InvalidChainInfoResponse,
227 #[error("Unexpected certificate value")]
228 UnexpectedCertificateValue,
229
230 #[error("Cannot deserialize")]
233 InvalidDecoding,
234 #[error("Unexpected message")]
235 UnexpectedMessage,
236 #[error("Grpc error: {error}")]
237 GrpcError { error: String },
238 #[error("Network error while querying service: {error}")]
239 ClientIoError { error: String },
240 #[error("Failed to resolve validator address: {address}")]
241 CannotResolveValidatorAddress { address: String },
242 #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
243 SubscriptionError { transport: String },
244 #[error("Failed to subscribe; tonic status: {status}")]
245 SubscriptionFailed { status: String },
246
247 #[error("Node failed to provide a 'last used by' certificate for the blob")]
248 InvalidCertificateForBlob(BlobId),
249 #[error("Node returned a BlobsNotFound error with duplicates")]
250 DuplicatesInBlobsNotFound,
251 #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
252 UnexpectedEntriesInBlobsNotFound,
253 #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
254 EmptyBlobsNotFound,
255 #[error("Local error handling validator response")]
256 ResponseHandlingError { error: String },
257}
258
259impl From<tonic::Status> for NodeError {
260 fn from(status: tonic::Status) -> Self {
261 Self::GrpcError {
262 error: status.to_string(),
263 }
264 }
265}
266
267impl CrossChainMessageDelivery {
268 pub fn new(wait_for_outgoing_messages: bool) -> Self {
269 if wait_for_outgoing_messages {
270 CrossChainMessageDelivery::Blocking
271 } else {
272 CrossChainMessageDelivery::NonBlocking
273 }
274 }
275
276 pub fn wait_for_outgoing_messages(self) -> bool {
277 match self {
278 CrossChainMessageDelivery::NonBlocking => false,
279 CrossChainMessageDelivery::Blocking => true,
280 }
281 }
282}
283
284impl From<ViewError> for NodeError {
285 fn from(error: ViewError) -> Self {
286 Self::ViewError {
287 error: error.to_string(),
288 }
289 }
290}
291
292impl From<ArithmeticError> for NodeError {
293 fn from(error: ArithmeticError) -> Self {
294 Self::ArithmeticError {
295 error: error.to_string(),
296 }
297 }
298}
299
300impl From<CryptoError> for NodeError {
301 fn from(error: CryptoError) -> Self {
302 Self::CryptoError {
303 error: error.to_string(),
304 }
305 }
306}
307
308impl From<ChainError> for NodeError {
309 fn from(error: ChainError) -> Self {
310 match error {
311 ChainError::MissingCrossChainUpdate {
312 chain_id,
313 origin,
314 height,
315 } => Self::MissingCrossChainUpdate {
316 chain_id,
317 origin,
318 height,
319 },
320 ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
321 ChainError::ExecutionError(execution_error, context) => {
322 if let ExecutionError::BlobsNotFound(blob_ids) = *execution_error {
323 Self::BlobsNotFound(blob_ids)
324 } else {
325 Self::ChainError {
326 error: ChainError::ExecutionError(execution_error, context).to_string(),
327 }
328 }
329 }
330 error => Self::ChainError {
331 error: error.to_string(),
332 },
333 }
334 }
335}
336
337impl From<WorkerError> for NodeError {
338 fn from(error: WorkerError) -> Self {
339 match error {
340 WorkerError::ChainError(error) => (*error).into(),
341 WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
342 WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
343 error => Self::WorkerError {
344 error: error.to_string(),
345 },
346 }
347 }
348}