1use std::{
5 net::{IpAddr, SocketAddr},
6 str::FromStr,
7 task::{Context, Poll},
8};
9
10use futures::{channel::mpsc, future::BoxFuture, FutureExt as _};
11use linera_base::{
12 data_types::Blob,
13 identifiers::ChainId,
14 time::{Duration, Instant},
15};
16use linera_core::{
17 join_set_ext::JoinSet,
18 node::NodeError,
19 worker::{NetworkActions, Notification, Reason, WorkerState},
20 JoinSetExt as _, TaskHandle,
21};
22use linera_storage::Storage;
23use tokio::sync::{broadcast::error::RecvError, oneshot};
24use tokio_util::sync::CancellationToken;
25use tonic::{transport::Channel, Request, Response, Status};
26use tower::{builder::ServiceBuilder, Layer, Service};
27use tracing::{debug, error, info, instrument, trace, warn};
28
29use super::{
30 api::{
31 self,
32 notifier_service_client::NotifierServiceClient,
33 validator_worker_client::ValidatorWorkerClient,
34 validator_worker_server::{ValidatorWorker as ValidatorWorkerRpc, ValidatorWorkerServer},
35 BlockProposal, ChainInfoQuery, ChainInfoResult, CrossChainRequest,
36 HandlePendingBlobRequest, LiteCertificate, PendingBlobRequest, PendingBlobResult,
37 },
38 pool::GrpcConnectionPool,
39 GrpcError, GRPC_MAX_MESSAGE_SIZE,
40};
41use crate::{
42 config::{CrossChainConfig, NotificationConfig, ShardId, ValidatorInternalNetworkConfig},
43 cross_chain_message_queue, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
44 HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
45};
46
/// Sending half of the queue of outgoing cross-chain requests; each request is paired
/// with the shard it is addressed to.
type CrossChainSender = mpsc::Sender<(linera_core::data_types::CrossChainRequest, ShardId)>;
/// Sending half of the broadcast channel on which worker notifications are published.
type NotificationSender = tokio::sync::broadcast::Sender<Notification>;
49
/// Prometheus metrics recorded by the gRPC server.
///
/// The module only exists when the `with_metrics` cfg is set; every use site in this
/// file is guarded by the same cfg.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        linear_bucket_interval, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// Unlabelled histogram of request latency. The values observed in this file are
    /// milliseconds.
    // NOTE(review): the bucket arguments are presumably (start, width, end) — confirm
    // against `linear_bucket_interval`.
    pub static SERVER_REQUEST_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "server_request_latency",
            "Server request latency",
            &[],
            linear_bucket_interval(1.0, 25.0, 2000.0),
        )
    });

    /// Unlabelled counter of requests.
    pub static SERVER_REQUEST_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("server_request_count", "Server request count", &[])
    });

    /// Counter of successfully handled requests, labelled by `method_name`.
    pub static SERVER_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_success",
            "Server request success",
            &["method_name"],
        )
    });

    /// Counter of requests that ended in an error, labelled by `method_name`.
    pub static SERVER_REQUEST_ERROR: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_error",
            "Server request error",
            &["method_name"],
        )
    });

    /// Histogram of request latency, labelled by `method_name`. The values observed in
    /// this file are milliseconds.
    pub static SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "server_request_latency_per_request_type",
                "Server request latency per request type",
                &["method_name"],
                linear_bucket_interval(1.0, 25.0, 2000.0),
            )
        });

    /// Unlabelled counter of cross-chain requests that were dropped because the
    /// outgoing queue was full.
    pub static CROSS_CHAIN_MESSAGE_CHANNEL_FULL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "cross_chain_message_channel_full",
            "Cross-chain message channel full",
            &[],
        )
    });
}
106
/// gRPC front end of a validator worker shard.
///
/// Implements the `ValidatorWorker` RPC service by delegating each request to the
/// wrapped [`WorkerState`] and then queueing the resulting cross-chain requests and
/// notifications for the background forwarding tasks.
#[derive(Clone)]
pub struct GrpcServer<S>
where
    S: Storage,
{
    /// Worker state that every request is delegated to.
    state: WorkerState<S>,
    /// Identifier of the shard served by this instance.
    shard_id: ShardId,
    /// Internal network configuration, used to find the shard of a target chain.
    network: ValidatorInternalNetworkConfig,
    /// Queue feeding the cross-chain forwarding task.
    cross_chain_sender: CrossChainSender,
    /// Broadcast channel feeding the notification forwarding tasks.
    notification_sender: NotificationSender,
}
118
/// Handle to the task running a spawned gRPC server.
pub struct GrpcServerHandle {
    /// Task that resolves to the server's final result.
    handle: TaskHandle<Result<(), GrpcError>>,
}
122
123impl GrpcServerHandle {
124 pub async fn join(self) -> Result<(), GrpcError> {
125 self.handle.await?
126 }
127}
128
/// Tower layer that wraps a service in a [`GrpcPrometheusMetricsMiddlewareService`].
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareLayer;
131
/// Tower service wrapper that records request metrics around the wrapped service
/// when the `with_metrics` cfg is set.
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareService<T> {
    /// The wrapped service that actually handles requests.
    service: T,
}
136
impl<S> Layer<S> for GrpcPrometheusMetricsMiddlewareLayer {
    type Service = GrpcPrometheusMetricsMiddlewareService<S>;

    /// Wraps `service` in the metrics middleware.
    fn layer(&self, service: S) -> Self::Service {
        GrpcPrometheusMetricsMiddlewareService { service }
    }
}
144
145impl<S, Req> Service<Req> for GrpcPrometheusMetricsMiddlewareService<S>
146where
147 S::Future: Send + 'static,
148 S: Service<Req> + std::marker::Send,
149{
150 type Response = S::Response;
151 type Error = S::Error;
152 type Future = BoxFuture<'static, Result<S::Response, S::Error>>;
153
154 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
155 self.service.poll_ready(cx)
156 }
157
158 fn call(&mut self, request: Req) -> Self::Future {
159 #[cfg(with_metrics)]
160 let start = Instant::now();
161 let future = self.service.call(request);
162 async move {
163 let response = future.await?;
164 #[cfg(with_metrics)]
165 {
166 metrics::SERVER_REQUEST_LATENCY
167 .with_label_values(&[])
168 .observe(start.elapsed().as_secs_f64() * 1000.0);
169 metrics::SERVER_REQUEST_COUNT.with_label_values(&[]).inc();
170 }
171 Ok(response)
172 }
173 .boxed()
174 }
175}
176
177impl<S> GrpcServer<S>
178where
179 S: Storage + Clone + Send + Sync + 'static,
180{
181 #[expect(clippy::too_many_arguments)]
182 pub fn spawn(
183 host: String,
184 port: u16,
185 state: WorkerState<S>,
186 shard_id: ShardId,
187 internal_network: ValidatorInternalNetworkConfig,
188 cross_chain_config: CrossChainConfig,
189 notification_config: NotificationConfig,
190 shutdown_signal: CancellationToken,
191 join_set: &mut JoinSet,
192 ) -> GrpcServerHandle {
193 info!(
194 "spawning gRPC server on {}:{} for shard {}",
195 host, port, shard_id
196 );
197
198 let (cross_chain_sender, cross_chain_receiver) =
199 mpsc::channel(cross_chain_config.queue_size);
200
201 let (notification_sender, _) =
202 tokio::sync::broadcast::channel(notification_config.notification_queue_size);
203
204 join_set.spawn_task({
205 info!(
206 nickname = state.nickname(),
207 "spawning cross-chain queries thread on {} for shard {}", host, shard_id
208 );
209 Self::forward_cross_chain_queries(
210 state.nickname().to_string(),
211 internal_network.clone(),
212 cross_chain_config.max_retries,
213 Duration::from_millis(cross_chain_config.retry_delay_ms),
214 Duration::from_millis(cross_chain_config.sender_delay_ms),
215 cross_chain_config.sender_failure_rate,
216 shard_id,
217 cross_chain_receiver,
218 )
219 });
220
221 let mut exporter_forwarded = false;
222 for proxy in &internal_network.proxies {
223 let receiver = notification_sender.subscribe();
224 join_set.spawn_task({
225 info!(
226 nickname = state.nickname(),
227 "spawning notifications thread on {} for shard {}", host, shard_id
228 );
229 let exporter_addresses = if exporter_forwarded {
230 vec![]
231 } else {
232 exporter_forwarded = true;
233 internal_network.exporter_addresses()
234 };
235 Self::forward_notifications(
236 state.nickname().to_string(),
237 proxy.internal_address(&internal_network.protocol),
238 exporter_addresses,
239 receiver,
240 )
241 });
242 }
243
244 let (health_reporter, health_service) = tonic_health::server::health_reporter();
245
246 let grpc_server = GrpcServer {
247 state,
248 shard_id,
249 network: internal_network,
250 cross_chain_sender,
251 notification_sender,
252 };
253
254 let worker_node = ValidatorWorkerServer::new(grpc_server)
255 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
256 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
257
258 let handle = join_set.spawn_task(async move {
259 let server_address = SocketAddr::from((IpAddr::from_str(&host)?, port));
260
261 let reflection_service = tonic_reflection::server::Builder::configure()
262 .register_encoded_file_descriptor_set(crate::FILE_DESCRIPTOR_SET)
263 .build_v1()?;
264
265 health_reporter
266 .set_serving::<ValidatorWorkerServer<Self>>()
267 .await;
268
269 tonic::transport::Server::builder()
270 .layer(
271 ServiceBuilder::new()
272 .layer(GrpcPrometheusMetricsMiddlewareLayer)
273 .into_inner(),
274 )
275 .add_service(health_service)
276 .add_service(reflection_service)
277 .add_service(worker_node)
278 .serve_with_shutdown(server_address, shutdown_signal.cancelled_owned())
279 .await?;
280
281 Ok(())
282 });
283
284 GrpcServerHandle { handle }
285 }
286
287 #[instrument(skip(receiver))]
290 async fn forward_notifications(
291 nickname: String,
292 proxy_address: String,
293 exporter_addresses: Vec<String>,
294 mut receiver: tokio::sync::broadcast::Receiver<Notification>,
295 ) {
296 let channel = tonic::transport::Channel::from_shared(proxy_address.clone())
297 .expect("Proxy URI should be valid")
298 .connect_lazy();
299 let mut client = NotifierServiceClient::new(channel)
300 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
301 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
302
303 let mut exporter_clients: Vec<NotifierServiceClient<Channel>> = exporter_addresses
304 .iter()
305 .map(|address| {
306 let channel = tonic::transport::Channel::from_shared(address.clone())
307 .expect("Exporter URI should be valid")
308 .connect_lazy();
309 NotifierServiceClient::new(channel)
310 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
311 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
312 })
313 .collect::<Vec<_>>();
314
315 loop {
316 let notification = match receiver.recv().await {
317 Ok(notification) => notification,
318 Err(RecvError::Lagged(skipped_count)) => {
319 warn!(
320 nickname,
321 skipped_count, "notification receiver lagged, messages were skipped"
322 );
323 continue;
324 }
325 Err(RecvError::Closed) => {
326 warn!(
327 nickname,
328 "notification channel closed, exiting forwarding loop"
329 );
330 break;
331 }
332 };
333
334 let reason = ¬ification.reason;
335 let chain_id = notification.chain_id;
336 let notification: api::Notification = match notification.clone().try_into() {
337 Ok(notification) => notification,
338 Err(error) => {
339 warn!(%error, nickname, "could not deserialize notification");
340 continue;
341 }
342 };
343 let request = tonic::Request::new(notification.clone());
344 if let Err(error) = client.notify(request).await {
345 error!(
346 %error,
347 nickname,
348 ?chain_id,
349 ?reason,
350 "proxy: could not send notification",
351 )
352 }
353
354 if let Reason::NewBlock { height: _, hash: _ } = reason {
355 for exporter_client in &mut exporter_clients {
356 let request = tonic::Request::new(notification.clone());
357 if let Err(error) = exporter_client.notify(request).await {
358 error!(
359 %error,
360 nickname,
361 ?chain_id,
362 ?reason,
363 "block exporter: could not send notification",
364 )
365 }
366 }
367 }
368 }
369 }
370
371 fn handle_network_actions(&self, actions: NetworkActions) {
372 let mut cross_chain_sender = self.cross_chain_sender.clone();
373 let notification_sender = self.notification_sender.clone();
374
375 for request in actions.cross_chain_requests {
376 let shard_id = self.network.get_shard_id(request.target_chain_id());
377 trace!(
378 source_shard_id = self.shard_id,
379 target_shard_id = shard_id,
380 "Scheduling cross-chain query",
381 );
382
383 if let Err(error) = cross_chain_sender.try_send((request, shard_id)) {
384 error!(%error, "dropping cross-chain request");
385 #[cfg(with_metrics)]
386 if error.is_full() {
387 metrics::CROSS_CHAIN_MESSAGE_CHANNEL_FULL
388 .with_label_values(&[])
389 .inc();
390 }
391 }
392 }
393
394 for notification in actions.notifications {
395 trace!("Scheduling notification query");
396 if let Err(error) = notification_sender.send(notification) {
397 error!(%error, "dropping notification");
398 break;
399 }
400 }
401 }
402
403 #[instrument(skip_all, fields(nickname, %this_shard))]
404 #[expect(clippy::too_many_arguments)]
405 async fn forward_cross_chain_queries(
406 nickname: String,
407 network: ValidatorInternalNetworkConfig,
408 cross_chain_max_retries: u32,
409 cross_chain_retry_delay: Duration,
410 cross_chain_sender_delay: Duration,
411 cross_chain_sender_failure_rate: f32,
412 this_shard: ShardId,
413 receiver: mpsc::Receiver<(linera_core::data_types::CrossChainRequest, ShardId)>,
414 ) {
415 let pool = GrpcConnectionPool::default();
416 let handle_request =
417 move |shard_id: ShardId, request: linera_core::data_types::CrossChainRequest| {
418 let channel_result = pool.channel(network.shard(shard_id).http_address());
419 async move {
420 let mut client = ValidatorWorkerClient::new(channel_result?)
421 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
422 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
423 client
424 .handle_cross_chain_request(Request::new(request.try_into()?))
425 .await?;
426 anyhow::Result::<_, anyhow::Error>::Ok(())
427 }
428 };
429 cross_chain_message_queue::forward_cross_chain_queries(
430 nickname,
431 cross_chain_max_retries,
432 cross_chain_retry_delay,
433 cross_chain_sender_delay,
434 cross_chain_sender_failure_rate,
435 this_shard,
436 receiver,
437 handle_request,
438 )
439 .await;
440 }
441
442 fn log_request_outcome_and_latency(start: Instant, success: bool, method_name: &str) {
443 #![cfg_attr(not(with_metrics), allow(unused_variables))]
444 #[cfg(with_metrics)]
445 {
446 metrics::SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE
447 .with_label_values(&[method_name])
448 .observe(start.elapsed().as_secs_f64() * 1000.0);
449 if success {
450 metrics::SERVER_REQUEST_SUCCESS
451 .with_label_values(&[method_name])
452 .inc();
453 } else {
454 metrics::SERVER_REQUEST_ERROR
455 .with_label_values(&[method_name])
456 .inc();
457 }
458 }
459 }
460
461 fn log_error(&self, error: &linera_core::worker::WorkerError, context: &str) {
462 let nickname = self.state.nickname();
463 if error.is_local() {
464 error!(nickname, %error, "{}", context);
465 } else {
466 debug!(nickname, %error, "{}", context);
467 }
468 }
469}
470
471#[tonic::async_trait]
472impl<S> ValidatorWorkerRpc for GrpcServer<S>
473where
474 S: Storage + Clone + Send + Sync + 'static,
475{
476 #[instrument(
477 target = "grpc_server",
478 skip_all,
479 err,
480 fields(
481 nickname = self.state.nickname(),
482 chain_id = ?request.get_ref().chain_id()
483 )
484 )]
485 async fn handle_block_proposal(
486 &self,
487 request: Request<BlockProposal>,
488 ) -> Result<Response<ChainInfoResult>, Status> {
489 let start = Instant::now();
490 let proposal = request.into_inner().try_into()?;
491 trace!(?proposal, "Handling block proposal");
492 Ok(Response::new(
493 match self.state.clone().handle_block_proposal(proposal).await {
494 Ok((info, actions)) => {
495 Self::log_request_outcome_and_latency(start, true, "handle_block_proposal");
496 self.handle_network_actions(actions);
497 info.try_into()?
498 }
499 Err(error) => {
500 Self::log_request_outcome_and_latency(start, false, "handle_block_proposal");
501 self.log_error(&error, "Failed to handle block proposal");
502 NodeError::from(error).try_into()?
503 }
504 },
505 ))
506 }
507
508 #[instrument(
509 target = "grpc_server",
510 skip_all,
511 err,
512 fields(
513 nickname = self.state.nickname(),
514 chain_id = ?request.get_ref().chain_id()
515 )
516 )]
517 async fn handle_lite_certificate(
518 &self,
519 request: Request<LiteCertificate>,
520 ) -> Result<Response<ChainInfoResult>, Status> {
521 let start = Instant::now();
522 let HandleLiteCertRequest {
523 certificate,
524 wait_for_outgoing_messages,
525 } = request.into_inner().try_into()?;
526 trace!(?certificate, "Handling lite certificate");
527 let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
528 match Box::pin(
529 self.state
530 .clone()
531 .handle_lite_certificate(certificate, sender),
532 )
533 .await
534 {
535 Ok((info, actions)) => {
536 Self::log_request_outcome_and_latency(start, true, "handle_lite_certificate");
537 self.handle_network_actions(actions);
538 if let Some(receiver) = receiver {
539 if let Err(e) = receiver.await {
540 error!("Failed to wait for message delivery: {e}");
541 }
542 }
543 Ok(Response::new(info.try_into()?))
544 }
545 Err(error) => {
546 Self::log_request_outcome_and_latency(start, false, "handle_lite_certificate");
547 self.log_error(&error, "Failed to handle lite certificate");
548 Ok(Response::new(NodeError::from(error).try_into()?))
549 }
550 }
551 }
552
553 #[instrument(
554 target = "grpc_server",
555 skip_all,
556 err,
557 fields(
558 nickname = self.state.nickname(),
559 chain_id = ?request.get_ref().chain_id()
560 )
561 )]
562 async fn handle_confirmed_certificate(
563 &self,
564 request: Request<api::HandleConfirmedCertificateRequest>,
565 ) -> Result<Response<ChainInfoResult>, Status> {
566 let start = Instant::now();
567 let HandleConfirmedCertificateRequest {
568 certificate,
569 wait_for_outgoing_messages,
570 } = request.into_inner().try_into()?;
571 trace!(?certificate, "Handling certificate");
572 let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
573 match self
574 .state
575 .clone()
576 .handle_confirmed_certificate(certificate, sender)
577 .await
578 {
579 Ok((info, actions)) => {
580 Self::log_request_outcome_and_latency(start, true, "handle_confirmed_certificate");
581 self.handle_network_actions(actions);
582 if let Some(receiver) = receiver {
583 if let Err(e) = receiver.await {
584 error!("Failed to wait for message delivery: {e}");
585 }
586 }
587 Ok(Response::new(info.try_into()?))
588 }
589 Err(error) => {
590 Self::log_request_outcome_and_latency(start, false, "handle_confirmed_certificate");
591 self.log_error(&error, "Failed to handle confirmed certificate");
592 Ok(Response::new(NodeError::from(error).try_into()?))
593 }
594 }
595 }
596
597 #[instrument(
598 target = "grpc_server",
599 skip_all,
600 err,
601 fields(
602 nickname = self.state.nickname(),
603 chain_id = ?request.get_ref().chain_id()
604 )
605 )]
606 async fn handle_validated_certificate(
607 &self,
608 request: Request<api::HandleValidatedCertificateRequest>,
609 ) -> Result<Response<ChainInfoResult>, Status> {
610 let start = Instant::now();
611 let HandleValidatedCertificateRequest { certificate } = request.into_inner().try_into()?;
612 trace!(?certificate, "Handling certificate");
613 match self
614 .state
615 .clone()
616 .handle_validated_certificate(certificate)
617 .await
618 {
619 Ok((info, actions)) => {
620 Self::log_request_outcome_and_latency(start, true, "handle_validated_certificate");
621 self.handle_network_actions(actions);
622 Ok(Response::new(info.try_into()?))
623 }
624 Err(error) => {
625 Self::log_request_outcome_and_latency(start, false, "handle_validated_certificate");
626 self.log_error(&error, "Failed to handle validated certificate");
627 Ok(Response::new(NodeError::from(error).try_into()?))
628 }
629 }
630 }
631
632 #[instrument(
633 target = "grpc_server",
634 skip_all,
635 err,
636 fields(
637 nickname = self.state.nickname(),
638 chain_id = ?request.get_ref().chain_id()
639 )
640 )]
641 async fn handle_timeout_certificate(
642 &self,
643 request: Request<api::HandleTimeoutCertificateRequest>,
644 ) -> Result<Response<ChainInfoResult>, Status> {
645 let start = Instant::now();
646 let HandleTimeoutCertificateRequest { certificate } = request.into_inner().try_into()?;
647 trace!(?certificate, "Handling Timeout certificate");
648 match self
649 .state
650 .clone()
651 .handle_timeout_certificate(certificate)
652 .await
653 {
654 Ok((info, _actions)) => {
655 Self::log_request_outcome_and_latency(start, true, "handle_timeout_certificate");
656 Ok(Response::new(info.try_into()?))
657 }
658 Err(error) => {
659 Self::log_request_outcome_and_latency(start, false, "handle_timeout_certificate");
660 self.log_error(&error, "Failed to handle timeout certificate");
661 Ok(Response::new(NodeError::from(error).try_into()?))
662 }
663 }
664 }
665
666 #[instrument(
667 target = "grpc_server",
668 skip_all,
669 err,
670 fields(
671 nickname = self.state.nickname(),
672 chain_id = ?request.get_ref().chain_id()
673 )
674 )]
675 async fn handle_chain_info_query(
676 &self,
677 request: Request<ChainInfoQuery>,
678 ) -> Result<Response<ChainInfoResult>, Status> {
679 let start = Instant::now();
680 let query = request.into_inner().try_into()?;
681 trace!(?query, "Handling chain info query");
682 match self.state.clone().handle_chain_info_query(query).await {
683 Ok((info, actions)) => {
684 Self::log_request_outcome_and_latency(start, true, "handle_chain_info_query");
685 self.handle_network_actions(actions);
686 Ok(Response::new(info.try_into()?))
687 }
688 Err(error) => {
689 Self::log_request_outcome_and_latency(start, false, "handle_chain_info_query");
690 self.log_error(&error, "Failed to handle chain info query");
691 Ok(Response::new(NodeError::from(error).try_into()?))
692 }
693 }
694 }
695
696 #[instrument(
697 target = "grpc_server",
698 skip_all,
699 err,
700 fields(
701 nickname = self.state.nickname(),
702 chain_id = ?request.get_ref().chain_id()
703 )
704 )]
705 async fn download_pending_blob(
706 &self,
707 request: Request<PendingBlobRequest>,
708 ) -> Result<Response<PendingBlobResult>, Status> {
709 let start = Instant::now();
710 let (chain_id, blob_id) = request.into_inner().try_into()?;
711 trace!(?chain_id, ?blob_id, "Download pending blob");
712 match self
713 .state
714 .clone()
715 .download_pending_blob(chain_id, blob_id)
716 .await
717 {
718 Ok(blob) => {
719 Self::log_request_outcome_and_latency(start, true, "download_pending_blob");
720 Ok(Response::new(blob.into_content().try_into()?))
721 }
722 Err(error) => {
723 Self::log_request_outcome_and_latency(start, false, "download_pending_blob");
724 self.log_error(&error, "Failed to download pending blob");
725 Ok(Response::new(NodeError::from(error).try_into()?))
726 }
727 }
728 }
729
730 #[instrument(
731 target = "grpc_server",
732 skip_all,
733 err,
734 fields(
735 nickname = self.state.nickname(),
736 chain_id = ?request.get_ref().chain_id
737 )
738 )]
739 async fn handle_pending_blob(
740 &self,
741 request: Request<HandlePendingBlobRequest>,
742 ) -> Result<Response<ChainInfoResult>, Status> {
743 let start = Instant::now();
744 let (chain_id, blob_content) = request.into_inner().try_into()?;
745 let blob = Blob::new(blob_content);
746 let blob_id = blob.id();
747 trace!(?chain_id, ?blob_id, "Handle pending blob");
748 match self.state.clone().handle_pending_blob(chain_id, blob).await {
749 Ok(info) => {
750 Self::log_request_outcome_and_latency(start, true, "handle_pending_blob");
751 Ok(Response::new(info.try_into()?))
752 }
753 Err(error) => {
754 Self::log_request_outcome_and_latency(start, false, "handle_pending_blob");
755 self.log_error(&error, "Failed to handle pending blob");
756 Ok(Response::new(NodeError::from(error).try_into()?))
757 }
758 }
759 }
760
761 #[instrument(
762 target = "grpc_server",
763 skip_all,
764 err,
765 fields(
766 nickname = self.state.nickname(),
767 chain_id = ?request.get_ref().chain_id()
768 )
769 )]
770 async fn handle_cross_chain_request(
771 &self,
772 request: Request<CrossChainRequest>,
773 ) -> Result<Response<()>, Status> {
774 let start = Instant::now();
775 let request = request.into_inner().try_into()?;
776 trace!(?request, "Handling cross-chain request");
777 match self.state.clone().handle_cross_chain_request(request).await {
778 Ok(actions) => {
779 Self::log_request_outcome_and_latency(start, true, "handle_cross_chain_request");
780 self.handle_network_actions(actions)
781 }
782 Err(error) => {
783 Self::log_request_outcome_and_latency(start, false, "handle_cross_chain_request");
784 let nickname = self.state.nickname();
785 error!(nickname, %error, "Failed to handle cross-chain request");
786 }
787 }
788 Ok(Response::new(()))
789 }
790}
791
/// Implemented by gRPC request types from which a chain ID can be extracted.
pub trait GrpcProxyable {
    /// Returns the chain ID carried by the message, or `None` if it is absent or
    /// cannot be converted to a [`ChainId`].
    fn chain_id(&self) -> Option<ChainId>;
}
797
798impl GrpcProxyable for BlockProposal {
799 fn chain_id(&self) -> Option<ChainId> {
800 self.chain_id.clone()?.try_into().ok()
801 }
802}
803
804impl GrpcProxyable for LiteCertificate {
805 fn chain_id(&self) -> Option<ChainId> {
806 self.chain_id.clone()?.try_into().ok()
807 }
808}
809
810impl GrpcProxyable for api::HandleConfirmedCertificateRequest {
811 fn chain_id(&self) -> Option<ChainId> {
812 self.chain_id.clone()?.try_into().ok()
813 }
814}
815
816impl GrpcProxyable for api::HandleTimeoutCertificateRequest {
817 fn chain_id(&self) -> Option<ChainId> {
818 self.chain_id.clone()?.try_into().ok()
819 }
820}
821
822impl GrpcProxyable for api::HandleValidatedCertificateRequest {
823 fn chain_id(&self) -> Option<ChainId> {
824 self.chain_id.clone()?.try_into().ok()
825 }
826}
827
828impl GrpcProxyable for ChainInfoQuery {
829 fn chain_id(&self) -> Option<ChainId> {
830 self.chain_id.clone()?.try_into().ok()
831 }
832}
833
834impl GrpcProxyable for PendingBlobRequest {
835 fn chain_id(&self) -> Option<ChainId> {
836 self.chain_id.clone()?.try_into().ok()
837 }
838}
839
840impl GrpcProxyable for HandlePendingBlobRequest {
841 fn chain_id(&self) -> Option<ChainId> {
842 self.chain_id.clone()?.try_into().ok()
843 }
844}
845
846impl GrpcProxyable for CrossChainRequest {
847 fn chain_id(&self) -> Option<ChainId> {
848 use super::api::cross_chain_request::Inner;
849
850 match self.inner.as_ref()? {
851 Inner::UpdateRecipient(api::UpdateRecipient { recipient, .. })
852 | Inner::ConfirmUpdatedRecipient(api::ConfirmUpdatedRecipient { recipient, .. }) => {
853 recipient.clone()?.try_into().ok()
854 }
855 }
856 }
857}