1use std::{
5 net::{IpAddr, SocketAddr},
6 str::FromStr,
7 task::{Context, Poll},
8 time::{Duration, Instant},
9};
10
11use futures::{channel::mpsc, future::BoxFuture, FutureExt as _};
12use linera_base::{data_types::Blob, identifiers::ChainId};
13use linera_core::{
14 join_set_ext::JoinSet,
15 node::NodeError,
16 worker::{NetworkActions, Notification, Reason, WorkerError, WorkerState},
17 JoinSetExt as _, TaskHandle,
18};
19use linera_storage::Storage;
20use tokio::sync::oneshot;
21use tokio_util::sync::CancellationToken;
22use tonic::{transport::Channel, Request, Response, Status};
23use tower::{builder::ServiceBuilder, Layer, Service};
24use tracing::{debug, error, info, instrument, trace, warn};
25
26use super::{
27 api::{
28 self,
29 notifier_service_client::NotifierServiceClient,
30 validator_worker_client::ValidatorWorkerClient,
31 validator_worker_server::{ValidatorWorker as ValidatorWorkerRpc, ValidatorWorkerServer},
32 BlockProposal, ChainInfoQuery, ChainInfoResult, CrossChainRequest,
33 HandlePendingBlobRequest, LiteCertificate, PendingBlobRequest, PendingBlobResult,
34 },
35 pool::GrpcConnectionPool,
36 GrpcError, GRPC_MAX_MESSAGE_SIZE,
37};
38use crate::{
39 config::{CrossChainConfig, NotificationConfig, ShardId, ValidatorInternalNetworkConfig},
40 cross_chain_message_queue, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
41 HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
42};
43
/// Sending half of the queue of outgoing cross-chain requests; each request is paired with the
/// shard it must be delivered to.
type CrossChainSender = mpsc::Sender<(linera_core::data_types::CrossChainRequest, ShardId)>;
/// Broadcast sender used to fan worker notifications out to the notification forwarding tasks.
type NotificationSender = tokio::sync::broadcast::Sender<Notification>;
46
/// Prometheus metrics recorded by the gRPC server; only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        linear_bucket_interval, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// Latency of requests that pass through the metrics middleware, observed in milliseconds.
    // NOTE(review): the bucket arguments presumably mean (start, width, end) — confirm against
    // `linear_bucket_interval`.
    pub static SERVER_REQUEST_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "server_request_latency",
            "Server request latency",
            &[],
            linear_bucket_interval(1.0, 25.0, 2000.0),
        )
    });

    /// Number of requests counted by the metrics middleware.
    pub static SERVER_REQUEST_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("server_request_count", "Server request count", &[])
    });

    /// Number of requests handled successfully by the worker, labelled by RPC method name.
    pub static SERVER_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_success",
            "Server request success",
            &["method_name"],
        )
    });

    /// Number of requests for which the worker returned an error, labelled by RPC method name.
    pub static SERVER_REQUEST_ERROR: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_error",
            "Server request error",
            &["method_name"],
        )
    });

    /// Per-method request latency, observed in milliseconds and labelled by RPC method name.
    pub static SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "server_request_latency_per_request_type",
                "Server request latency per request type",
                &["method_name"],
                linear_bucket_interval(1.0, 25.0, 2000.0),
            )
        });

    /// Number of cross-chain requests dropped because the outgoing queue was full.
    pub static CROSS_CHAIN_MESSAGE_CHANNEL_FULL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "cross_chain_message_channel_full",
            "Cross-chain message channel full",
            &[],
        )
    });
}
103
/// gRPC front end of a validator worker shard.
///
/// Implements the `ValidatorWorker` RPC service by delegating to a [`WorkerState`] and then
/// queueing the resulting cross-chain requests and notifications for the forwarding tasks.
#[derive(Clone)]
pub struct GrpcServer<S>
where
    S: Storage,
{
    /// Worker state that actually processes each request.
    state: WorkerState<S>,
    /// Identifier of the shard this server runs for.
    shard_id: ShardId,
    /// Internal network configuration, used to map a chain to the shard responsible for it.
    network: ValidatorInternalNetworkConfig,
    /// Queue feeding the cross-chain forwarding task.
    cross_chain_sender: CrossChainSender,
    /// Broadcast channel feeding the notification forwarding tasks.
    notification_sender: NotificationSender,
}
115
116pub struct GrpcServerHandle {
117 handle: TaskHandle<Result<(), GrpcError>>,
118}
119
120impl GrpcServerHandle {
121 pub async fn join(self) -> Result<(), GrpcError> {
122 self.handle.await?
123 }
124}
125
/// Tower layer that wraps a service in [`GrpcPrometheusMetricsMiddlewareService`].
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareLayer;

/// Tower service that records request count and latency metrics around an inner service.
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareService<T> {
    /// The wrapped service that handles the request.
    service: T,
}
133
134impl<S> Layer<S> for GrpcPrometheusMetricsMiddlewareLayer {
135 type Service = GrpcPrometheusMetricsMiddlewareService<S>;
136
137 fn layer(&self, service: S) -> Self::Service {
138 GrpcPrometheusMetricsMiddlewareService { service }
139 }
140}
141
142impl<S, Req> Service<Req> for GrpcPrometheusMetricsMiddlewareService<S>
143where
144 S::Future: Send + 'static,
145 S: Service<Req> + std::marker::Send,
146{
147 type Response = S::Response;
148 type Error = S::Error;
149 type Future = BoxFuture<'static, Result<S::Response, S::Error>>;
150
151 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
152 self.service.poll_ready(cx)
153 }
154
155 fn call(&mut self, request: Req) -> Self::Future {
156 #[cfg(with_metrics)]
157 let start = Instant::now();
158 let future = self.service.call(request);
159 async move {
160 let response = future.await?;
161 #[cfg(with_metrics)]
162 {
163 metrics::SERVER_REQUEST_LATENCY
164 .with_label_values(&[])
165 .observe(start.elapsed().as_secs_f64() * 1000.0);
166 metrics::SERVER_REQUEST_COUNT.with_label_values(&[]).inc();
167 }
168 Ok(response)
169 }
170 .boxed()
171 }
172}
173
174impl<S> GrpcServer<S>
175where
176 S: Storage + Clone + Send + Sync + 'static,
177{
178 #[expect(clippy::too_many_arguments)]
179 pub fn spawn(
180 host: String,
181 port: u16,
182 state: WorkerState<S>,
183 shard_id: ShardId,
184 internal_network: ValidatorInternalNetworkConfig,
185 cross_chain_config: CrossChainConfig,
186 notification_config: NotificationConfig,
187 shutdown_signal: CancellationToken,
188 join_set: &mut JoinSet,
189 ) -> GrpcServerHandle {
190 info!(
191 "spawning gRPC server on {}:{} for shard {}",
192 host, port, shard_id
193 );
194
195 let (cross_chain_sender, cross_chain_receiver) =
196 mpsc::channel(cross_chain_config.queue_size);
197
198 let (notification_sender, _) =
199 tokio::sync::broadcast::channel(notification_config.notification_queue_size);
200
201 join_set.spawn_task({
202 info!(
203 nickname = state.nickname(),
204 "spawning cross-chain queries thread on {} for shard {}", host, shard_id
205 );
206 Self::forward_cross_chain_queries(
207 state.nickname().to_string(),
208 internal_network.clone(),
209 cross_chain_config.max_retries,
210 Duration::from_millis(cross_chain_config.retry_delay_ms),
211 Duration::from_millis(cross_chain_config.sender_delay_ms),
212 cross_chain_config.sender_failure_rate,
213 shard_id,
214 cross_chain_receiver,
215 )
216 });
217
218 for proxy in &internal_network.proxies {
219 let receiver = notification_sender.subscribe();
220 join_set.spawn_task({
221 info!(
222 nickname = state.nickname(),
223 "spawning notifications thread on {} for shard {}", host, shard_id
224 );
225 Self::forward_notifications(
226 state.nickname().to_string(),
227 proxy.internal_address(&internal_network.protocol),
228 internal_network.exporter_addresses(),
229 receiver,
230 )
231 });
232 }
233
234 let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
235
236 let grpc_server = GrpcServer {
237 state,
238 shard_id,
239 network: internal_network,
240 cross_chain_sender,
241 notification_sender,
242 };
243
244 let worker_node = ValidatorWorkerServer::new(grpc_server)
245 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
246 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
247
248 let handle = join_set.spawn_task(async move {
249 let server_address = SocketAddr::from((IpAddr::from_str(&host)?, port));
250
251 let reflection_service = tonic_reflection::server::Builder::configure()
252 .register_encoded_file_descriptor_set(crate::FILE_DESCRIPTOR_SET)
253 .build_v1()?;
254
255 health_reporter
256 .set_serving::<ValidatorWorkerServer<Self>>()
257 .await;
258
259 tonic::transport::Server::builder()
260 .layer(
261 ServiceBuilder::new()
262 .layer(GrpcPrometheusMetricsMiddlewareLayer)
263 .into_inner(),
264 )
265 .add_service(health_service)
266 .add_service(reflection_service)
267 .add_service(worker_node)
268 .serve_with_shutdown(server_address, shutdown_signal.cancelled_owned())
269 .await?;
270
271 Ok(())
272 });
273
274 GrpcServerHandle { handle }
275 }
276
277 #[instrument(skip(receiver))]
280 async fn forward_notifications(
281 nickname: String,
282 proxy_address: String,
283 exporter_addresses: Vec<String>,
284 mut receiver: tokio::sync::broadcast::Receiver<Notification>,
285 ) {
286 let channel = tonic::transport::Channel::from_shared(proxy_address.clone())
287 .expect("Proxy URI should be valid")
288 .connect_lazy();
289 let mut client = NotifierServiceClient::new(channel)
290 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
291 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
292
293 let mut exporter_clients: Vec<NotifierServiceClient<Channel>> = exporter_addresses
294 .iter()
295 .map(|address| {
296 let channel = tonic::transport::Channel::from_shared(address.clone())
297 .expect("Exporter URI should be valid")
298 .connect_lazy();
299 NotifierServiceClient::new(channel)
300 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
301 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
302 })
303 .collect::<Vec<_>>();
304
305 while let Ok(notification) = receiver.recv().await {
306 let reason = ¬ification.reason;
307 let notification: api::Notification = match notification.clone().try_into() {
308 Ok(notification) => notification,
309 Err(error) => {
310 warn!(%error, nickname, "could not deserialize notification");
311 continue;
312 }
313 };
314 let request = tonic::Request::new(notification.clone());
315 if let Err(error) = client.notify(request).await {
316 error!(
317 %error,
318 nickname,
319 ?notification,
320 "could not send notification",
321 )
322 }
323
324 if let Reason::NewBlock { height: _, hash: _ } = reason {
325 for exporter_client in &mut exporter_clients {
326 let request = tonic::Request::new(notification.clone());
327 if let Err(error) = exporter_client.notify(request).await {
328 error!(
329 %error,
330 nickname,
331 ?notification,
332 "could not send notification",
333 )
334 }
335 }
336 }
337 }
338 }
339
340 fn handle_network_actions(&self, actions: NetworkActions) {
341 let mut cross_chain_sender = self.cross_chain_sender.clone();
342 let notification_sender = self.notification_sender.clone();
343
344 for request in actions.cross_chain_requests {
345 let shard_id = self.network.get_shard_id(request.target_chain_id());
346 trace!(
347 source_shard_id = self.shard_id,
348 target_shard_id = shard_id,
349 "Scheduling cross-chain query",
350 );
351
352 if let Err(error) = cross_chain_sender.try_send((request, shard_id)) {
353 error!(%error, "dropping cross-chain request");
354 #[cfg(with_metrics)]
355 if error.is_full() {
356 metrics::CROSS_CHAIN_MESSAGE_CHANNEL_FULL
357 .with_label_values(&[])
358 .inc();
359 }
360 }
361 }
362
363 for notification in actions.notifications {
364 trace!("Scheduling notification query");
365 if let Err(error) = notification_sender.send(notification) {
366 error!(%error, "dropping notification");
367 break;
368 }
369 }
370 }
371
372 #[instrument(skip_all, fields(nickname, %this_shard))]
373 #[expect(clippy::too_many_arguments)]
374 async fn forward_cross_chain_queries(
375 nickname: String,
376 network: ValidatorInternalNetworkConfig,
377 cross_chain_max_retries: u32,
378 cross_chain_retry_delay: Duration,
379 cross_chain_sender_delay: Duration,
380 cross_chain_sender_failure_rate: f32,
381 this_shard: ShardId,
382 receiver: mpsc::Receiver<(linera_core::data_types::CrossChainRequest, ShardId)>,
383 ) {
384 let pool = GrpcConnectionPool::default();
385 let handle_request =
386 move |shard_id: ShardId, request: linera_core::data_types::CrossChainRequest| {
387 let channel_result = pool.channel(network.shard(shard_id).http_address());
388 async move {
389 let mut client = ValidatorWorkerClient::new(channel_result?)
390 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
391 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
392 client
393 .handle_cross_chain_request(Request::new(request.try_into()?))
394 .await?;
395 anyhow::Result::<_, anyhow::Error>::Ok(())
396 }
397 };
398 cross_chain_message_queue::forward_cross_chain_queries(
399 nickname,
400 cross_chain_max_retries,
401 cross_chain_retry_delay,
402 cross_chain_sender_delay,
403 cross_chain_sender_failure_rate,
404 this_shard,
405 receiver,
406 handle_request,
407 )
408 .await;
409 }
410
/// Records per-method metrics for one handled request.
///
/// With metrics enabled, observes the time elapsed since `start` (in milliseconds) and
/// increments either the success or the error counter, all labelled with `method_name`.
/// Without metrics this is a no-op.
// The parameters are only read when `with_metrics` is set.
#[allow(unused_variables)]
fn log_request_outcome_and_latency(start: Instant, success: bool, method_name: &str) {
    #[cfg(with_metrics)]
    {
        metrics::SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE
            .with_label_values(&[method_name])
            .observe(start.elapsed().as_secs_f64() * 1000.0);
        let outcome_counter = if success {
            &metrics::SERVER_REQUEST_SUCCESS
        } else {
            &metrics::SERVER_REQUEST_ERROR
        };
        outcome_counter.with_label_values(&[method_name]).inc();
    }
}
429}
430
431#[tonic::async_trait]
432impl<S> ValidatorWorkerRpc for GrpcServer<S>
433where
434 S: Storage + Clone + Send + Sync + 'static,
435{
436 #[instrument(
437 target = "grpc_server",
438 skip_all,
439 err,
440 fields(
441 nickname = self.state.nickname(),
442 chain_id = ?request.get_ref().chain_id()
443 )
444 )]
445 async fn handle_block_proposal(
446 &self,
447 request: Request<BlockProposal>,
448 ) -> Result<Response<ChainInfoResult>, Status> {
449 let start = Instant::now();
450 let proposal = request.into_inner().try_into()?;
451 trace!(?proposal, "Handling block proposal");
452 Ok(Response::new(
453 match self.state.clone().handle_block_proposal(proposal).await {
454 Ok((info, actions)) => {
455 Self::log_request_outcome_and_latency(start, true, "handle_block_proposal");
456 self.handle_network_actions(actions);
457 info.try_into()?
458 }
459 Err(error) => {
460 Self::log_request_outcome_and_latency(start, false, "handle_block_proposal");
461 let nickname = self.state.nickname();
462 warn!(nickname, %error, "Failed to handle block proposal");
463 NodeError::from(error).try_into()?
464 }
465 },
466 ))
467 }
468
469 #[instrument(
470 target = "grpc_server",
471 skip_all,
472 err,
473 fields(
474 nickname = self.state.nickname(),
475 chain_id = ?request.get_ref().chain_id()
476 )
477 )]
478 async fn handle_lite_certificate(
479 &self,
480 request: Request<LiteCertificate>,
481 ) -> Result<Response<ChainInfoResult>, Status> {
482 let start = Instant::now();
483 let HandleLiteCertRequest {
484 certificate,
485 wait_for_outgoing_messages,
486 } = request.into_inner().try_into()?;
487 trace!(?certificate, "Handling lite certificate");
488 let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
489 match Box::pin(
490 self.state
491 .clone()
492 .handle_lite_certificate(certificate, sender),
493 )
494 .await
495 {
496 Ok((info, actions)) => {
497 Self::log_request_outcome_and_latency(start, true, "handle_lite_certificate");
498 self.handle_network_actions(actions);
499 if let Some(receiver) = receiver {
500 if let Err(e) = receiver.await {
501 error!("Failed to wait for message delivery: {e}");
502 }
503 }
504 Ok(Response::new(info.try_into()?))
505 }
506 Err(error) => {
507 Self::log_request_outcome_and_latency(start, false, "handle_lite_certificate");
508 let nickname = self.state.nickname();
509 if let WorkerError::MissingCertificateValue = &error {
510 debug!(nickname, %error, "Failed to handle lite certificate");
511 } else {
512 error!(nickname, %error, "Failed to handle lite certificate");
513 }
514 Ok(Response::new(NodeError::from(error).try_into()?))
515 }
516 }
517 }
518
519 #[instrument(
520 target = "grpc_server",
521 skip_all,
522 err,
523 fields(
524 nickname = self.state.nickname(),
525 chain_id = ?request.get_ref().chain_id()
526 )
527 )]
528 async fn handle_confirmed_certificate(
529 &self,
530 request: Request<api::HandleConfirmedCertificateRequest>,
531 ) -> Result<Response<ChainInfoResult>, Status> {
532 let start = Instant::now();
533 let HandleConfirmedCertificateRequest {
534 certificate,
535 wait_for_outgoing_messages,
536 } = request.into_inner().try_into()?;
537 trace!(?certificate, "Handling certificate");
538 let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
539 match self
540 .state
541 .clone()
542 .handle_confirmed_certificate(certificate, sender)
543 .await
544 {
545 Ok((info, actions)) => {
546 Self::log_request_outcome_and_latency(start, true, "handle_confirmed_certificate");
547 self.handle_network_actions(actions);
548 if let Some(receiver) = receiver {
549 if let Err(e) = receiver.await {
550 error!("Failed to wait for message delivery: {e}");
551 }
552 }
553 Ok(Response::new(info.try_into()?))
554 }
555 Err(error) => {
556 Self::log_request_outcome_and_latency(start, false, "handle_confirmed_certificate");
557 let nickname = self.state.nickname();
558 error!(nickname, %error, "Failed to handle confirmed certificate");
559 Ok(Response::new(NodeError::from(error).try_into()?))
560 }
561 }
562 }
563
564 #[instrument(
565 target = "grpc_server",
566 skip_all,
567 err,
568 fields(
569 nickname = self.state.nickname(),
570 chain_id = ?request.get_ref().chain_id()
571 )
572 )]
573 async fn handle_validated_certificate(
574 &self,
575 request: Request<api::HandleValidatedCertificateRequest>,
576 ) -> Result<Response<ChainInfoResult>, Status> {
577 let start = Instant::now();
578 let HandleValidatedCertificateRequest { certificate } = request.into_inner().try_into()?;
579 trace!(?certificate, "Handling certificate");
580 match self
581 .state
582 .clone()
583 .handle_validated_certificate(certificate)
584 .await
585 {
586 Ok((info, actions)) => {
587 Self::log_request_outcome_and_latency(start, true, "handle_validated_certificate");
588 self.handle_network_actions(actions);
589 Ok(Response::new(info.try_into()?))
590 }
591 Err(error) => {
592 Self::log_request_outcome_and_latency(start, false, "handle_validated_certificate");
593 let nickname = self.state.nickname();
594 error!(nickname, %error, "Failed to handle validated certificate");
595 Ok(Response::new(NodeError::from(error).try_into()?))
596 }
597 }
598 }
599
600 #[instrument(
601 target = "grpc_server",
602 skip_all,
603 err,
604 fields(
605 nickname = self.state.nickname(),
606 chain_id = ?request.get_ref().chain_id()
607 )
608 )]
609 async fn handle_timeout_certificate(
610 &self,
611 request: Request<api::HandleTimeoutCertificateRequest>,
612 ) -> Result<Response<ChainInfoResult>, Status> {
613 let start = Instant::now();
614 let HandleTimeoutCertificateRequest { certificate } = request.into_inner().try_into()?;
615 trace!(?certificate, "Handling Timeout certificate");
616 match self
617 .state
618 .clone()
619 .handle_timeout_certificate(certificate)
620 .await
621 {
622 Ok((info, _actions)) => {
623 Self::log_request_outcome_and_latency(start, true, "handle_timeout_certificate");
624 Ok(Response::new(info.try_into()?))
625 }
626 Err(error) => {
627 Self::log_request_outcome_and_latency(start, false, "handle_timeout_certificate");
628 let nickname = self.state.nickname();
629 error!(nickname, %error, "Failed to handle timeout certificate");
630 Ok(Response::new(NodeError::from(error).try_into()?))
631 }
632 }
633 }
634
635 #[instrument(
636 target = "grpc_server",
637 skip_all,
638 err,
639 fields(
640 nickname = self.state.nickname(),
641 chain_id = ?request.get_ref().chain_id()
642 )
643 )]
644 async fn handle_chain_info_query(
645 &self,
646 request: Request<ChainInfoQuery>,
647 ) -> Result<Response<ChainInfoResult>, Status> {
648 let start = Instant::now();
649 let query = request.into_inner().try_into()?;
650 trace!(?query, "Handling chain info query");
651 match self.state.clone().handle_chain_info_query(query).await {
652 Ok((info, actions)) => {
653 Self::log_request_outcome_and_latency(start, true, "handle_chain_info_query");
654 self.handle_network_actions(actions);
655 Ok(Response::new(info.try_into()?))
656 }
657 Err(error) => {
658 Self::log_request_outcome_and_latency(start, false, "handle_chain_info_query");
659 let nickname = self.state.nickname();
660 error!(nickname, %error, "Failed to handle chain info query");
661 Ok(Response::new(NodeError::from(error).try_into()?))
662 }
663 }
664 }
665
666 #[instrument(
667 target = "grpc_server",
668 skip_all,
669 err,
670 fields(
671 nickname = self.state.nickname(),
672 chain_id = ?request.get_ref().chain_id()
673 )
674 )]
675 async fn download_pending_blob(
676 &self,
677 request: Request<PendingBlobRequest>,
678 ) -> Result<Response<PendingBlobResult>, Status> {
679 let start = Instant::now();
680 let (chain_id, blob_id) = request.into_inner().try_into()?;
681 trace!(?chain_id, ?blob_id, "Download pending blob");
682 match self
683 .state
684 .clone()
685 .download_pending_blob(chain_id, blob_id)
686 .await
687 {
688 Ok(blob) => {
689 Self::log_request_outcome_and_latency(start, true, "download_pending_blob");
690 Ok(Response::new(blob.into_content().try_into()?))
691 }
692 Err(error) => {
693 Self::log_request_outcome_and_latency(start, false, "download_pending_blob");
694 let nickname = self.state.nickname();
695 error!(nickname, %error, "Failed to download pending blob");
696 Ok(Response::new(NodeError::from(error).try_into()?))
697 }
698 }
699 }
700
701 #[instrument(
702 target = "grpc_server",
703 skip_all,
704 err,
705 fields(
706 nickname = self.state.nickname(),
707 chain_id = ?request.get_ref().chain_id
708 )
709 )]
710 async fn handle_pending_blob(
711 &self,
712 request: Request<HandlePendingBlobRequest>,
713 ) -> Result<Response<ChainInfoResult>, Status> {
714 let start = Instant::now();
715 let (chain_id, blob_content) = request.into_inner().try_into()?;
716 let blob = Blob::new(blob_content);
717 let blob_id = blob.id();
718 trace!(?chain_id, ?blob_id, "Handle pending blob");
719 match self.state.clone().handle_pending_blob(chain_id, blob).await {
720 Ok(info) => {
721 Self::log_request_outcome_and_latency(start, true, "handle_pending_blob");
722 Ok(Response::new(info.try_into()?))
723 }
724 Err(error) => {
725 Self::log_request_outcome_and_latency(start, false, "handle_pending_blob");
726 let nickname = self.state.nickname();
727 error!(nickname, %error, "Failed to handle pending blob");
728 Ok(Response::new(NodeError::from(error).try_into()?))
729 }
730 }
731 }
732
733 #[instrument(
734 target = "grpc_server",
735 skip_all,
736 err,
737 fields(
738 nickname = self.state.nickname(),
739 chain_id = ?request.get_ref().chain_id()
740 )
741 )]
742 async fn handle_cross_chain_request(
743 &self,
744 request: Request<CrossChainRequest>,
745 ) -> Result<Response<()>, Status> {
746 let start = Instant::now();
747 let request = request.into_inner().try_into()?;
748 trace!(?request, "Handling cross-chain request");
749 match self.state.clone().handle_cross_chain_request(request).await {
750 Ok(actions) => {
751 Self::log_request_outcome_and_latency(start, true, "handle_cross_chain_request");
752 self.handle_network_actions(actions)
753 }
754 Err(error) => {
755 Self::log_request_outcome_and_latency(start, false, "handle_cross_chain_request");
756 let nickname = self.state.nickname();
757 error!(nickname, %error, "Failed to handle cross-chain request");
758 }
759 }
760 Ok(Response::new(()))
761 }
762}
763
/// Implemented by gRPC request messages that can report which chain they concern.
// NOTE(review): the name suggests the proxy uses this for routing — confirm against callers.
pub trait GrpcProxyable {
    /// Returns the chain this message refers to, or `None` when the identifier is absent or
    /// cannot be converted into a [`ChainId`].
    fn chain_id(&self) -> Option<ChainId>;
}
769
770impl GrpcProxyable for BlockProposal {
771 fn chain_id(&self) -> Option<ChainId> {
772 self.chain_id.clone()?.try_into().ok()
773 }
774}
775
776impl GrpcProxyable for LiteCertificate {
777 fn chain_id(&self) -> Option<ChainId> {
778 self.chain_id.clone()?.try_into().ok()
779 }
780}
781
782impl GrpcProxyable for api::HandleConfirmedCertificateRequest {
783 fn chain_id(&self) -> Option<ChainId> {
784 self.chain_id.clone()?.try_into().ok()
785 }
786}
787
788impl GrpcProxyable for api::HandleTimeoutCertificateRequest {
789 fn chain_id(&self) -> Option<ChainId> {
790 self.chain_id.clone()?.try_into().ok()
791 }
792}
793
794impl GrpcProxyable for api::HandleValidatedCertificateRequest {
795 fn chain_id(&self) -> Option<ChainId> {
796 self.chain_id.clone()?.try_into().ok()
797 }
798}
799
800impl GrpcProxyable for ChainInfoQuery {
801 fn chain_id(&self) -> Option<ChainId> {
802 self.chain_id.clone()?.try_into().ok()
803 }
804}
805
806impl GrpcProxyable for PendingBlobRequest {
807 fn chain_id(&self) -> Option<ChainId> {
808 self.chain_id.clone()?.try_into().ok()
809 }
810}
811
812impl GrpcProxyable for HandlePendingBlobRequest {
813 fn chain_id(&self) -> Option<ChainId> {
814 self.chain_id.clone()?.try_into().ok()
815 }
816}
817
818impl GrpcProxyable for CrossChainRequest {
819 fn chain_id(&self) -> Option<ChainId> {
820 use super::api::cross_chain_request::Inner;
821
822 match self.inner.as_ref()? {
823 Inner::UpdateRecipient(api::UpdateRecipient { recipient, .. })
824 | Inner::ConfirmUpdatedRecipient(api::ConfirmUpdatedRecipient { recipient, .. }) => {
825 recipient.clone()?.try_into().ok()
826 }
827 }
828 }
829}