1use std::{fmt, future::Future, iter};
5
6use futures::{future, stream, StreamExt};
7use linera_base::{
8 crypto::CryptoHash,
9 data_types::{BlobContent, NetworkDescription},
10 ensure,
11 identifiers::{BlobId, ChainId},
12 time::Duration,
13};
14use linera_chain::{
15 data_types::{self},
16 types::{
17 self, Certificate, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, Timeout,
18 ValidatedBlock,
19 },
20};
21use linera_core::{
22 data_types::ChainInfoResponse,
23 node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
24 worker::Notification,
25};
26use linera_version::VersionInfo;
27use tonic::{Code, IntoRequest, Request, Status};
28use tracing::{debug, error, info, instrument, warn};
29
30use super::{
31 api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest},
32 transport, GRPC_MAX_MESSAGE_SIZE,
33};
34use crate::{
35 HandleConfirmedCertificateRequest, HandleLiteCertRequest, HandleTimeoutCertificateRequest,
36 HandleValidatedCertificateRequest,
37};
38
/// A gRPC client talking to a single validator node.
///
/// Requests that fail with a retryable status (see `is_retryable`) are re-sent by
/// `delegate` and `subscribe`, at most `max_retries` times.
#[derive(Clone)]
pub struct GrpcClient {
    /// Address of the validator; returned by `address()` and recorded in tracing spans.
    address: String,
    /// The generated gRPC client; it is cloned for every request attempt.
    client: ValidatorNodeClient<transport::Channel>,
    /// Base delay between retries. The delay actually used is this value multiplied by the
    /// number of retries already performed.
    retry_delay: Duration,
    /// Maximum number of retries before a retryable failure is given up on.
    max_retries: u32,
}
46
47impl GrpcClient {
48 pub fn new(
49 address: String,
50 channel: transport::Channel,
51 retry_delay: Duration,
52 max_retries: u32,
53 ) -> Self {
54 let client = ValidatorNodeClient::new(channel)
55 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
56 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
57 Self {
58 address,
59 client,
60 retry_delay,
61 max_retries,
62 }
63 }
64
65 pub fn address(&self) -> &str {
66 &self.address
67 }
68
69 fn is_retryable(status: &Status) -> bool {
72 match status.code() {
73 Code::DeadlineExceeded | Code::Aborted | Code::Unavailable | Code::Unknown => {
74 info!("gRPC request interrupted: {}; retrying", status);
75 true
76 }
77 Code::Ok | Code::Cancelled | Code::ResourceExhausted => {
78 error!("Unexpected gRPC status: {}; retrying", status);
79 true
80 }
81 Code::InvalidArgument
82 | Code::NotFound
83 | Code::AlreadyExists
84 | Code::PermissionDenied
85 | Code::FailedPrecondition
86 | Code::OutOfRange
87 | Code::Unimplemented
88 | Code::Internal
89 | Code::DataLoss
90 | Code::Unauthenticated => {
91 error!("Unexpected gRPC status: {}", status);
92 false
93 }
94 }
95 }
96
97 async fn delegate<F, Fut, R, S>(
98 &self,
99 f: F,
100 request: impl TryInto<R> + fmt::Debug + Clone,
101 handler: &str,
102 ) -> Result<S, NodeError>
103 where
104 F: Fn(ValidatorNodeClient<transport::Channel>, Request<R>) -> Fut,
105 Fut: Future<Output = Result<tonic::Response<S>, Status>>,
106 R: IntoRequest<R> + Clone,
107 {
108 let mut retry_count = 0;
109 let request_inner = request.try_into().map_err(|_| NodeError::GrpcError {
110 error: "could not convert request to proto".to_string(),
111 })?;
112 loop {
113 match f(self.client.clone(), Request::new(request_inner.clone())).await {
114 Err(s) if Self::is_retryable(&s) && retry_count < self.max_retries => {
115 let delay = self.retry_delay.saturating_mul(retry_count);
116 retry_count += 1;
117 linera_base::time::timer::sleep(delay).await;
118 continue;
119 }
120 Err(s) => {
121 return Err(NodeError::GrpcError {
122 error: format!("remote request [{handler}] failed with status: {s:?}"),
123 });
124 }
125 Ok(result) => return Ok(result.into_inner()),
126 };
127 }
128 }
129
130 fn try_into_chain_info(
131 result: api::ChainInfoResult,
132 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
133 let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
134 error: "missing body from response".to_string(),
135 })?;
136 match inner {
137 api::chain_info_result::Inner::ChainInfoResponse(response) => {
138 Ok(response.try_into().map_err(|err| NodeError::GrpcError {
139 error: format!("failed to unmarshal response: {}", err),
140 })?)
141 }
142 api::chain_info_result::Inner::Error(error) => Err(bincode::deserialize(&error)
143 .map_err(|err| NodeError::GrpcError {
144 error: format!("failed to unmarshal error message: {}", err),
145 })?),
146 }
147 }
148}
149
150impl TryFrom<api::PendingBlobResult> for BlobContent {
151 type Error = NodeError;
152
153 fn try_from(result: api::PendingBlobResult) -> Result<Self, Self::Error> {
154 let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
155 error: "missing body from response".to_string(),
156 })?;
157 match inner {
158 api::pending_blob_result::Inner::Blob(blob) => {
159 Ok(blob.try_into().map_err(|err| NodeError::GrpcError {
160 error: format!("failed to unmarshal response: {}", err),
161 })?)
162 }
163 api::pending_blob_result::Inner::Error(error) => Err(bincode::deserialize(&error)
164 .map_err(|err| NodeError::GrpcError {
165 error: format!("failed to unmarshal error message: {}", err),
166 })?),
167 }
168 }
169}
170
/// Logs a request at `debug` level and sends it through `GrpcClient::delegate`, calling the
/// generated client method named by the second argument.
///
/// All three arguments must be plain identifiers: the client (usually `self`), the gRPC
/// method, and a local variable holding the request. The macro evaluates to the awaited
/// result of `delegate`, so it can only be used inside an `async` context.
macro_rules! client_delegate {
    ($this:ident, $method:ident, $request:ident) => {{
        debug!(
            handler = stringify!($method),
            request = ?$request,
            "sending gRPC request"
        );
        let pending = $this.delegate(
            |mut stub, wrapped| async move { stub.$method(wrapped).await },
            $request,
            stringify!($method),
        );
        pending.await
    }};
}
187
188impl ValidatorNode for GrpcClient {
189 type NotificationStream = NotificationStream;
190
191 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
192 async fn handle_block_proposal(
193 &self,
194 proposal: data_types::BlockProposal,
195 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
196 GrpcClient::try_into_chain_info(client_delegate!(self, handle_block_proposal, proposal)?)
197 }
198
199 #[instrument(target = "grpc_client", skip_all, fields(address = self.address))]
200 async fn handle_lite_certificate(
201 &self,
202 certificate: types::LiteCertificate<'_>,
203 delivery: CrossChainMessageDelivery,
204 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
205 let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
206 let request = HandleLiteCertRequest {
207 certificate,
208 wait_for_outgoing_messages,
209 };
210 GrpcClient::try_into_chain_info(client_delegate!(self, handle_lite_certificate, request)?)
211 }
212
213 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
214 async fn handle_confirmed_certificate(
215 &self,
216 certificate: GenericCertificate<ConfirmedBlock>,
217 delivery: CrossChainMessageDelivery,
218 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
219 let wait_for_outgoing_messages: bool = delivery.wait_for_outgoing_messages();
220 let request = HandleConfirmedCertificateRequest {
221 certificate,
222 wait_for_outgoing_messages,
223 };
224 GrpcClient::try_into_chain_info(client_delegate!(
225 self,
226 handle_confirmed_certificate,
227 request
228 )?)
229 }
230
231 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
232 async fn handle_validated_certificate(
233 &self,
234 certificate: GenericCertificate<ValidatedBlock>,
235 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
236 let request = HandleValidatedCertificateRequest { certificate };
237 GrpcClient::try_into_chain_info(client_delegate!(
238 self,
239 handle_validated_certificate,
240 request
241 )?)
242 }
243
244 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
245 async fn handle_timeout_certificate(
246 &self,
247 certificate: GenericCertificate<Timeout>,
248 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
249 let request = HandleTimeoutCertificateRequest { certificate };
250 GrpcClient::try_into_chain_info(client_delegate!(
251 self,
252 handle_timeout_certificate,
253 request
254 )?)
255 }
256
257 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
258 async fn handle_chain_info_query(
259 &self,
260 query: linera_core::data_types::ChainInfoQuery,
261 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
262 GrpcClient::try_into_chain_info(client_delegate!(self, handle_chain_info_query, query)?)
263 }
264
265 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
266 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> {
267 let retry_delay = self.retry_delay;
268 let max_retries = self.max_retries;
269 let mut retry_count = 0;
270 let subscription_request = SubscriptionRequest {
271 chain_ids: chains.into_iter().map(|chain| chain.into()).collect(),
272 };
273 let mut client = self.client.clone();
274
275 let mut stream = Some(
277 client
278 .subscribe(subscription_request.clone())
279 .await
280 .map_err(|status| NodeError::SubscriptionFailed {
281 status: status.to_string(),
282 })?
283 .into_inner(),
284 );
285
286 let endlessly_retrying_notification_stream = stream::unfold((), move |()| {
289 let mut client = client.clone();
290 let subscription_request = subscription_request.clone();
291 let mut stream = stream.take();
292 async move {
293 let stream = if let Some(stream) = stream.take() {
294 future::Either::Right(stream)
295 } else {
296 match client.subscribe(subscription_request.clone()).await {
297 Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))),
298 Ok(response) => future::Either::Right(response.into_inner()),
299 }
300 };
301 Some((stream, ()))
302 }
303 })
304 .flatten();
305
306 let span = tracing::info_span!("notification stream");
307 let notification_stream = endlessly_retrying_notification_stream
310 .map(|result| {
311 Option::<Notification>::try_from(result?).map_err(|err| {
312 let message = format!("Could not deserialize notification: {}", err);
313 tonic::Status::new(Code::Internal, message)
314 })
315 })
316 .take_while(move |result| {
317 let Err(status) = result else {
318 retry_count = 0;
319 return future::Either::Left(future::ready(true));
320 };
321
322 if !span.in_scope(|| Self::is_retryable(status)) || retry_count >= max_retries {
323 return future::Either::Left(future::ready(false));
324 }
325 let delay = retry_delay.saturating_mul(retry_count);
326 retry_count += 1;
327 future::Either::Right(async move {
328 linera_base::time::timer::sleep(delay).await;
329 true
330 })
331 })
332 .filter_map(|result| {
333 future::ready(match result {
334 Ok(notification @ Some(_)) => notification,
335 Ok(None) => None,
336 Err(err) => {
337 warn!("{}", err);
338 None
339 }
340 })
341 });
342
343 Ok(Box::pin(notification_stream))
344 }
345
346 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
347 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
348 let req = ();
349 Ok(client_delegate!(self, get_version_info, req)?.into())
350 }
351
352 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
353 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
354 let req = ();
355 Ok(client_delegate!(self, get_network_description, req)?.try_into()?)
356 }
357
358 #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
359 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
360 Ok(client_delegate!(self, upload_blob, content)?.try_into()?)
361 }
362
363 #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
364 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
365 Ok(client_delegate!(self, download_blob, blob_id)?.try_into()?)
366 }
367
368 #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
369 async fn download_pending_blob(
370 &self,
371 chain_id: ChainId,
372 blob_id: BlobId,
373 ) -> Result<BlobContent, NodeError> {
374 let req = (chain_id, blob_id);
375 client_delegate!(self, download_pending_blob, req)?.try_into()
376 }
377
378 #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
379 async fn handle_pending_blob(
380 &self,
381 chain_id: ChainId,
382 blob: BlobContent,
383 ) -> Result<ChainInfoResponse, NodeError> {
384 let req = (chain_id, blob);
385 GrpcClient::try_into_chain_info(client_delegate!(self, handle_pending_blob, req)?)
386 }
387
388 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
389 async fn download_certificate(
390 &self,
391 hash: CryptoHash,
392 ) -> Result<ConfirmedBlockCertificate, NodeError> {
393 ConfirmedBlockCertificate::try_from(Certificate::try_from(client_delegate!(
394 self,
395 download_certificate,
396 hash
397 )?)?)
398 .map_err(|_| NodeError::UnexpectedCertificateValue)
399 }
400
401 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
402 async fn download_certificates(
403 &self,
404 hashes: Vec<CryptoHash>,
405 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
406 let mut missing_hashes = hashes;
407 let mut certs_collected = Vec::with_capacity(missing_hashes.len());
408 loop {
409 let missing = missing_hashes.clone();
411 let mut received: Vec<ConfirmedBlockCertificate> = Vec::<Certificate>::try_from(
412 client_delegate!(self, download_certificates, missing)?,
413 )?
414 .into_iter()
415 .map(|cert| {
416 ConfirmedBlockCertificate::try_from(cert)
417 .map_err(|_| NodeError::UnexpectedCertificateValue)
418 })
419 .collect::<Result<_, _>>()?;
420
421 if received.is_empty() {
423 break;
424 }
425
426 missing_hashes = missing_hashes[received.len()..].to_vec();
428 certs_collected.append(&mut received);
429 }
430 ensure!(
431 missing_hashes.is_empty(),
432 NodeError::MissingCertificates(missing_hashes)
433 );
434 Ok(certs_collected)
435 }
436
437 #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
438 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
439 Ok(client_delegate!(self, blob_last_used_by, blob_id)?.try_into()?)
440 }
441
442 #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
443 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
444 Ok(client_delegate!(self, missing_blob_ids, blob_ids)?.try_into()?)
445 }
446}