1use std::{fmt, future::Future, iter};
5
6use futures::{future, stream, StreamExt};
7use linera_base::{
8 crypto::CryptoHash,
9 data_types::{BlobContent, NetworkDescription},
10 ensure,
11 identifiers::{BlobId, ChainId},
12 time::Duration,
13};
14use linera_chain::{
15 data_types::{self},
16 types::{
17 self, Certificate, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, Timeout,
18 ValidatedBlock,
19 },
20};
21use linera_core::{
22 data_types::ChainInfoResponse,
23 node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
24 worker::Notification,
25};
26use linera_version::VersionInfo;
27use tonic::{Code, IntoRequest, Request, Status};
28use tracing::{debug, error, info, instrument, warn};
29
30use super::{
31 api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest},
32 transport, GRPC_MAX_MESSAGE_SIZE,
33};
34use crate::{
35 HandleConfirmedCertificateRequest, HandleLiteCertRequest, HandleTimeoutCertificateRequest,
36 HandleValidatedCertificateRequest,
37};
38
/// A gRPC client that talks to a validator node through a tonic `ValidatorNodeClient`.
///
/// Requests that fail with a retryable status are attempted again up to `max_retries`
/// times; the pause before each new attempt is `retry_delay` multiplied by the number
/// of retries already made.
#[derive(Clone)]
pub struct GrpcClient {
    /// Address of the validator; recorded as the `address` field of the tracing spans.
    address: String,
    /// The generated tonic client; it is cloned for every request that is sent.
    client: ValidatorNodeClient<transport::Channel>,
    /// Base delay between retries, scaled by the number of retries already made.
    retry_delay: Duration,
    /// Maximum number of retries before the failure is reported to the caller.
    max_retries: u32,
}
46
47impl GrpcClient {
48 pub fn new(
49 address: String,
50 channel: transport::Channel,
51 retry_delay: Duration,
52 max_retries: u32,
53 ) -> Self {
54 let client = ValidatorNodeClient::new(channel)
55 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
56 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
57 Self {
58 address,
59 client,
60 retry_delay,
61 max_retries,
62 }
63 }
64
65 fn is_retryable(status: &Status) -> bool {
68 match status.code() {
69 Code::DeadlineExceeded | Code::Aborted | Code::Unavailable | Code::Unknown => {
70 info!("gRPC request interrupted: {}; retrying", status);
71 true
72 }
73 Code::Ok | Code::Cancelled | Code::ResourceExhausted => {
74 error!("Unexpected gRPC status: {}; retrying", status);
75 true
76 }
77 Code::InvalidArgument
78 | Code::NotFound
79 | Code::AlreadyExists
80 | Code::PermissionDenied
81 | Code::FailedPrecondition
82 | Code::OutOfRange
83 | Code::Unimplemented
84 | Code::Internal
85 | Code::DataLoss
86 | Code::Unauthenticated => {
87 error!("Unexpected gRPC status: {}", status);
88 false
89 }
90 }
91 }
92
93 async fn delegate<F, Fut, R, S>(
94 &self,
95 f: F,
96 request: impl TryInto<R> + fmt::Debug + Clone,
97 handler: &str,
98 ) -> Result<S, NodeError>
99 where
100 F: Fn(ValidatorNodeClient<transport::Channel>, Request<R>) -> Fut,
101 Fut: Future<Output = Result<tonic::Response<S>, Status>>,
102 R: IntoRequest<R> + Clone,
103 {
104 let mut retry_count = 0;
105 let request_inner = request.try_into().map_err(|_| NodeError::GrpcError {
106 error: "could not convert request to proto".to_string(),
107 })?;
108 loop {
109 match f(self.client.clone(), Request::new(request_inner.clone())).await {
110 Err(s) if Self::is_retryable(&s) && retry_count < self.max_retries => {
111 let delay = self.retry_delay.saturating_mul(retry_count);
112 retry_count += 1;
113 linera_base::time::timer::sleep(delay).await;
114 continue;
115 }
116 Err(s) => {
117 return Err(NodeError::GrpcError {
118 error: format!("remote request [{handler}] failed with status: {s:?}"),
119 });
120 }
121 Ok(result) => return Ok(result.into_inner()),
122 };
123 }
124 }
125
126 fn try_into_chain_info(
127 result: api::ChainInfoResult,
128 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
129 let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
130 error: "missing body from response".to_string(),
131 })?;
132 match inner {
133 api::chain_info_result::Inner::ChainInfoResponse(response) => {
134 Ok(response.try_into().map_err(|err| NodeError::GrpcError {
135 error: format!("failed to unmarshal response: {}", err),
136 })?)
137 }
138 api::chain_info_result::Inner::Error(error) => Err(bincode::deserialize(&error)
139 .map_err(|err| NodeError::GrpcError {
140 error: format!("failed to unmarshal error message: {}", err),
141 })?),
142 }
143 }
144}
145
146impl TryFrom<api::PendingBlobResult> for BlobContent {
147 type Error = NodeError;
148
149 fn try_from(result: api::PendingBlobResult) -> Result<Self, Self::Error> {
150 let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
151 error: "missing body from response".to_string(),
152 })?;
153 match inner {
154 api::pending_blob_result::Inner::Blob(blob) => {
155 Ok(blob.try_into().map_err(|err| NodeError::GrpcError {
156 error: format!("failed to unmarshal response: {}", err),
157 })?)
158 }
159 api::pending_blob_result::Inner::Error(error) => Err(bincode::deserialize(&error)
160 .map_err(|err| NodeError::GrpcError {
161 error: format!("failed to unmarshal error message: {}", err),
162 })?),
163 }
164 }
165}
166
/// Sends the request bound to `$req` through the generated client method `$handler`.
///
/// Emits a debug event carrying the handler name and the request, then awaits
/// `$self.delegate(..)`; the expansion therefore evaluates to the `Result` returned by
/// `delegate` and has to be used inside an `async` context. All three arguments must
/// be plain identifiers.
macro_rules! client_delegate {
    ($self:ident, $handler:ident, $req:ident) => {{
        let handler_name = stringify!($handler);
        debug!(
            handler = handler_name,
            request = ?$req,
            "sending gRPC request"
        );
        // The closure is written inline so that its parameter types are inferred from
        // the `Fn` bound on `delegate`.
        $self
            .delegate(
                |mut client, req| async move { client.$handler(req).await },
                $req,
                handler_name,
            )
            .await
    }};
}
183
184impl ValidatorNode for GrpcClient {
185 type NotificationStream = NotificationStream;
186
187 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
188 async fn handle_block_proposal(
189 &self,
190 proposal: data_types::BlockProposal,
191 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
192 GrpcClient::try_into_chain_info(client_delegate!(self, handle_block_proposal, proposal)?)
193 }
194
195 #[instrument(target = "grpc_client", skip_all, fields(address = self.address))]
196 async fn handle_lite_certificate(
197 &self,
198 certificate: types::LiteCertificate<'_>,
199 delivery: CrossChainMessageDelivery,
200 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
201 let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
202 let request = HandleLiteCertRequest {
203 certificate,
204 wait_for_outgoing_messages,
205 };
206 GrpcClient::try_into_chain_info(client_delegate!(self, handle_lite_certificate, request)?)
207 }
208
209 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
210 async fn handle_confirmed_certificate(
211 &self,
212 certificate: GenericCertificate<ConfirmedBlock>,
213 delivery: CrossChainMessageDelivery,
214 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
215 let wait_for_outgoing_messages: bool = delivery.wait_for_outgoing_messages();
216 let request = HandleConfirmedCertificateRequest {
217 certificate,
218 wait_for_outgoing_messages,
219 };
220 GrpcClient::try_into_chain_info(client_delegate!(
221 self,
222 handle_confirmed_certificate,
223 request
224 )?)
225 }
226
227 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
228 async fn handle_validated_certificate(
229 &self,
230 certificate: GenericCertificate<ValidatedBlock>,
231 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
232 let request = HandleValidatedCertificateRequest { certificate };
233 GrpcClient::try_into_chain_info(client_delegate!(
234 self,
235 handle_validated_certificate,
236 request
237 )?)
238 }
239
240 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
241 async fn handle_timeout_certificate(
242 &self,
243 certificate: GenericCertificate<Timeout>,
244 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
245 let request = HandleTimeoutCertificateRequest { certificate };
246 GrpcClient::try_into_chain_info(client_delegate!(
247 self,
248 handle_timeout_certificate,
249 request
250 )?)
251 }
252
253 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
254 async fn handle_chain_info_query(
255 &self,
256 query: linera_core::data_types::ChainInfoQuery,
257 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
258 GrpcClient::try_into_chain_info(client_delegate!(self, handle_chain_info_query, query)?)
259 }
260
261 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
262 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> {
263 let retry_delay = self.retry_delay;
264 let max_retries = self.max_retries;
265 let mut retry_count = 0;
266 let subscription_request = SubscriptionRequest {
267 chain_ids: chains.into_iter().map(|chain| chain.into()).collect(),
268 };
269 let mut client = self.client.clone();
270
271 let mut stream = Some(
273 client
274 .subscribe(subscription_request.clone())
275 .await
276 .map_err(|status| NodeError::SubscriptionFailed {
277 status: status.to_string(),
278 })?
279 .into_inner(),
280 );
281
282 let endlessly_retrying_notification_stream = stream::unfold((), move |()| {
285 let mut client = client.clone();
286 let subscription_request = subscription_request.clone();
287 let mut stream = stream.take();
288 async move {
289 let stream = if let Some(stream) = stream.take() {
290 future::Either::Right(stream)
291 } else {
292 match client.subscribe(subscription_request.clone()).await {
293 Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))),
294 Ok(response) => future::Either::Right(response.into_inner()),
295 }
296 };
297 Some((stream, ()))
298 }
299 })
300 .flatten();
301
302 let span = tracing::info_span!("notification stream");
303 let notification_stream = endlessly_retrying_notification_stream
306 .map(|result| {
307 Option::<Notification>::try_from(result?).map_err(|err| {
308 let message = format!("Could not deserialize notification: {}", err);
309 tonic::Status::new(Code::Internal, message)
310 })
311 })
312 .take_while(move |result| {
313 let Err(status) = result else {
314 retry_count = 0;
315 return future::Either::Left(future::ready(true));
316 };
317
318 if !span.in_scope(|| Self::is_retryable(status)) || retry_count >= max_retries {
319 return future::Either::Left(future::ready(false));
320 }
321 let delay = retry_delay.saturating_mul(retry_count);
322 retry_count += 1;
323 future::Either::Right(async move {
324 linera_base::time::timer::sleep(delay).await;
325 true
326 })
327 })
328 .filter_map(|result| {
329 future::ready(match result {
330 Ok(notification @ Some(_)) => notification,
331 Ok(None) => None,
332 Err(err) => {
333 warn!("{}", err);
334 None
335 }
336 })
337 });
338
339 Ok(Box::pin(notification_stream))
340 }
341
342 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
343 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
344 let req = ();
345 Ok(client_delegate!(self, get_version_info, req)?.into())
346 }
347
348 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
349 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
350 let req = ();
351 Ok(client_delegate!(self, get_network_description, req)?.try_into()?)
352 }
353
354 #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
355 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
356 Ok(client_delegate!(self, upload_blob, content)?.try_into()?)
357 }
358
359 #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
360 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
361 Ok(client_delegate!(self, download_blob, blob_id)?.try_into()?)
362 }
363
364 #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
365 async fn download_pending_blob(
366 &self,
367 chain_id: ChainId,
368 blob_id: BlobId,
369 ) -> Result<BlobContent, NodeError> {
370 let req = (chain_id, blob_id);
371 client_delegate!(self, download_pending_blob, req)?.try_into()
372 }
373
374 #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
375 async fn handle_pending_blob(
376 &self,
377 chain_id: ChainId,
378 blob: BlobContent,
379 ) -> Result<ChainInfoResponse, NodeError> {
380 let req = (chain_id, blob);
381 GrpcClient::try_into_chain_info(client_delegate!(self, handle_pending_blob, req)?)
382 }
383
384 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
385 async fn download_certificate(
386 &self,
387 hash: CryptoHash,
388 ) -> Result<ConfirmedBlockCertificate, NodeError> {
389 ConfirmedBlockCertificate::try_from(Certificate::try_from(client_delegate!(
390 self,
391 download_certificate,
392 hash
393 )?)?)
394 .map_err(|_| NodeError::UnexpectedCertificateValue)
395 }
396
397 #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
398 async fn download_certificates(
399 &self,
400 hashes: Vec<CryptoHash>,
401 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
402 let mut missing_hashes = hashes;
403 let mut certs_collected = Vec::with_capacity(missing_hashes.len());
404 loop {
405 let missing = missing_hashes.clone();
407 let mut received: Vec<ConfirmedBlockCertificate> = Vec::<Certificate>::try_from(
408 client_delegate!(self, download_certificates, missing)?,
409 )?
410 .into_iter()
411 .map(|cert| {
412 ConfirmedBlockCertificate::try_from(cert)
413 .map_err(|_| NodeError::UnexpectedCertificateValue)
414 })
415 .collect::<Result<_, _>>()?;
416
417 if received.is_empty() {
419 break;
420 }
421
422 missing_hashes = missing_hashes[received.len()..].to_vec();
424 certs_collected.append(&mut received);
425 }
426 ensure!(
427 missing_hashes.is_empty(),
428 NodeError::MissingCertificates(missing_hashes)
429 );
430 Ok(certs_collected)
431 }
432
433 #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
434 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
435 Ok(client_delegate!(self, blob_last_used_by, blob_id)?.try_into()?)
436 }
437
438 #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
439 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
440 Ok(client_delegate!(self, missing_blob_ids, blob_ids)?.try_into()?)
441 }
442}