linera_rpc/simple/
server.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{pin::Pin, sync::Arc};
5
6use async_trait::async_trait;
7use futures::{channel::mpsc, lock::Mutex, Stream, StreamExt as _};
8use linera_base::{data_types::Blob, identifiers::ChainId, time::Duration};
9use linera_core::{
10    data_types::CrossChainRequest,
11    node::NodeError,
12    worker::{NetworkActions, Notification, WorkerError, WorkerState},
13    JoinSetExt as _,
14};
15use linera_storage::Storage;
16use tokio::{sync, sync::oneshot, task::JoinSet};
17use tokio_stream::wrappers::BroadcastStream;
18use tokio_util::sync::CancellationToken;
19use tracing::{debug, error, info, instrument};
20
21use super::transport::{MessageHandler, ServerHandle, TransportProtocol};
22use crate::{
23    config::{CrossChainConfig, ShardId, ValidatorInternalNetworkPreConfig},
24    cross_chain_message_queue, RpcMessage,
25};
26
27#[derive(Clone)]
28pub struct Server<S>
29where
30    S: Storage,
31{
32    network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
33    host: String,
34    port: u16,
35    state: WorkerState<S>,
36    shard_id: ShardId,
37    cross_chain_config: CrossChainConfig,
38    // Stats
39    packets_processed: u64,
40    user_errors: u64,
41}
42
43impl<S> Server<S>
44where
45    S: Storage,
46{
47    pub fn new(
48        network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
49        host: String,
50        port: u16,
51        state: WorkerState<S>,
52        shard_id: ShardId,
53        cross_chain_config: CrossChainConfig,
54    ) -> Self {
55        Self {
56            network,
57            host,
58            port,
59            state,
60            shard_id,
61            cross_chain_config,
62            packets_processed: 0,
63            user_errors: 0,
64        }
65    }
66
67    pub fn packets_processed(&self) -> u64 {
68        self.packets_processed
69    }
70
71    pub fn user_errors(&self) -> u64 {
72        self.user_errors
73    }
74}
75
76impl<S> Server<S>
77where
78    S: Storage + Clone + Send + Sync + 'static,
79{
80    #[expect(clippy::too_many_arguments)]
81    async fn forward_cross_chain_queries(
82        nickname: String,
83        network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
84        cross_chain_max_retries: u32,
85        cross_chain_retry_delay: Duration,
86        cross_chain_max_backoff: Duration,
87        cross_chain_sender_delay: Duration,
88        cross_chain_sender_failure_rate: f32,
89        this_shard: ShardId,
90        receiver: mpsc::Receiver<(CrossChainRequest, ShardId)>,
91    ) {
92        let pool = Arc::new(Mutex::new(
93            network
94                .protocol
95                .make_outgoing_connection_pool()
96                .await
97                .expect("Initialization should not fail"),
98        ));
99        let handle_request = move |shard_id, request| {
100            let pool = pool.clone();
101            let shard = network.shard(shard_id);
102            let remote_address = format!("{}:{}", shard.host, shard.port);
103            let message = RpcMessage::CrossChainRequest(Box::new(request));
104            async move {
105                pool.lock()
106                    .await
107                    .send_message_to(message.clone(), &remote_address)
108                    .await?;
109                anyhow::Result::<_, anyhow::Error>::Ok(())
110            }
111        };
112        cross_chain_message_queue::forward_cross_chain_queries(
113            nickname,
114            cross_chain_max_retries,
115            cross_chain_retry_delay,
116            cross_chain_max_backoff,
117            cross_chain_sender_delay,
118            cross_chain_sender_failure_rate,
119            this_shard,
120            receiver,
121            handle_request,
122        )
123        .await;
124    }
125
126    pub fn spawn(
127        mut self,
128        shutdown_signal: CancellationToken,
129        join_set: &mut JoinSet<()>,
130    ) -> ServerHandle {
131        info!(
132            "Listening to {:?} traffic on {}:{}",
133            self.network.protocol, self.host, self.port
134        );
135        let address = (self.host.clone(), self.port);
136
137        let (cross_chain_sender, cross_chain_receiver) =
138            mpsc::channel(self.cross_chain_config.queue_size);
139
140        let (notification_sender, _) = sync::broadcast::channel(1000);
141
142        // Give the worker a shard-routing sender for cross-chain requests generated
143        // outside the normal `NetworkActions` return path (specifically, the
144        // `RevertConfirm`s emitted after resetting a corrupted chain).
145        {
146            let routing_network = self.network.clone();
147            let routing_sender = cross_chain_sender.clone();
148            self.state = self
149                .state
150                .clone()
151                .with_outbound_cross_chain_sender(Arc::new(move |request| {
152                    let shard_id = routing_network.get_shard_id(request.target_chain_id());
153                    if let Err(error) = routing_sender.clone().try_send((request, shard_id)) {
154                        tracing::error!(%error, "dropping cross-chain request");
155                    }
156                }));
157        }
158
159        join_set.spawn_task(Self::forward_cross_chain_queries(
160            self.state.nickname().to_string(),
161            self.network.clone(),
162            self.cross_chain_config.max_retries,
163            Duration::from_millis(self.cross_chain_config.retry_delay_ms),
164            Duration::from_millis(self.cross_chain_config.max_backoff_ms),
165            Duration::from_millis(self.cross_chain_config.sender_delay_ms),
166            self.cross_chain_config.sender_failure_rate,
167            self.shard_id,
168            cross_chain_receiver,
169        ));
170
171        let protocol = self.network.protocol;
172        let state = RunningServerState {
173            server: self,
174            cross_chain_sender,
175            notification_sender,
176        };
177        // Launch server for the appropriate protocol.
178        protocol.spawn_server(address, state, shutdown_signal, join_set)
179    }
180}
181
182#[derive(Clone)]
183struct RunningServerState<S>
184where
185    S: Storage,
186{
187    server: Server<S>,
188    cross_chain_sender: mpsc::Sender<(CrossChainRequest, ShardId)>,
189    notification_sender: sync::broadcast::Sender<Notification>,
190}
191
192#[async_trait]
193impl<S> MessageHandler for RunningServerState<S>
194where
195    S: Storage + Clone + Send + Sync + 'static,
196{
197    #[instrument(
198        target = "simple_server",
199        skip_all,
200        fields(
201            nickname = self.server.state.nickname(),
202            chain_id = ?message.target_chain_id()
203        )
204    )]
205    async fn handle_message(&mut self, message: RpcMessage) -> Option<RpcMessage> {
206        let reply = match message {
207            RpcMessage::BlockProposal(message) => {
208                match self.server.state.handle_block_proposal(*message).await {
209                    Ok((info, actions)) => {
210                        // Cross-shard requests
211                        self.handle_network_actions(actions);
212                        // Response
213                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
214                    }
215                    Err(error) => {
216                        self.log_error(&error, "Failed to handle block proposal");
217                        Err(error.into())
218                    }
219                }
220            }
221            RpcMessage::LiteCertificate(request) => {
222                let (sender, receiver) = request
223                    .wait_for_outgoing_messages
224                    .then(oneshot::channel)
225                    .unzip();
226                match Box::pin(
227                    self.server
228                        .state
229                        .handle_lite_certificate(request.certificate, sender),
230                )
231                .await
232                {
233                    Ok((info, actions)) => {
234                        // Cross-shard requests
235                        self.handle_network_actions(actions);
236                        if let Some(receiver) = receiver {
237                            if let Err(e) = receiver.await {
238                                error!("Failed to wait for message delivery: {e}");
239                            }
240                        }
241                        // Response
242                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
243                    }
244                    Err(error) => {
245                        let nickname = self.server.state.nickname();
246                        if let WorkerError::MissingCertificateValue = &error {
247                            debug!(nickname, %error, "Failed to handle lite certificate");
248                        } else {
249                            error!(nickname, %error, "Failed to handle lite certificate");
250                        }
251                        Err(error.into())
252                    }
253                }
254            }
255            RpcMessage::TimeoutCertificate(request) => {
256                match self
257                    .server
258                    .state
259                    .handle_timeout_certificate(request.certificate)
260                    .await
261                {
262                    Ok((info, actions)) => {
263                        // Cross-shard requests
264                        self.handle_network_actions(actions);
265                        // Response
266                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
267                    }
268                    Err(error) => {
269                        self.log_error(&error, "Failed to handle timeout certificate");
270                        Err(error.into())
271                    }
272                }
273            }
274            RpcMessage::ValidatedCertificate(request) => {
275                match self
276                    .server
277                    .state
278                    .handle_validated_certificate(request.certificate)
279                    .await
280                {
281                    Ok((info, actions)) => {
282                        // Cross-shard requests
283                        self.handle_network_actions(actions);
284                        // Response
285                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
286                    }
287                    Err(error) => {
288                        self.log_error(&error, "Failed to handle validated certificate");
289                        Err(error.into())
290                    }
291                }
292            }
293            RpcMessage::ConfirmedCertificate(request) => {
294                let (sender, receiver) = request
295                    .wait_for_outgoing_messages
296                    .then(oneshot::channel)
297                    .unzip();
298                match self
299                    .server
300                    .state
301                    .handle_confirmed_certificate(request.certificate, sender)
302                    .await
303                {
304                    Ok((info, actions)) => {
305                        // Cross-shard requests
306                        self.handle_network_actions(actions);
307                        if let Some(receiver) = receiver {
308                            if let Err(e) = receiver.await {
309                                error!("Failed to wait for message delivery: {e}");
310                            }
311                        }
312                        // Response
313                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
314                    }
315                    Err(error) => {
316                        self.log_error(&error, "Failed to handle confirmed certificate");
317                        Err(error.into())
318                    }
319                }
320            }
321            RpcMessage::ChainInfoQuery(message) => {
322                match self.server.state.handle_chain_info_query(*message).await {
323                    Ok(info) => Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info)))),
324                    Err(error) => {
325                        self.log_error(&error, "Failed to handle chain info query");
326                        Err(error.into())
327                    }
328                }
329            }
330            RpcMessage::CrossChainRequest(request) => {
331                match self.server.state.handle_cross_chain_request(*request).await {
332                    Ok(actions) => {
333                        self.handle_network_actions(actions);
334                    }
335                    Err(error) => {
336                        self.log_error(&error, "Failed to handle cross-chain request");
337                    }
338                }
339                // No user to respond to.
340                Ok(None)
341            }
342            RpcMessage::DownloadPendingBlob(request) => {
343                let (chain_id, blob_id) = *request;
344                match self
345                    .server
346                    .state
347                    .download_pending_blob(chain_id, blob_id)
348                    .await
349                {
350                    Ok(blob) => Ok(Some(RpcMessage::DownloadPendingBlobResponse(Box::new(
351                        blob.content().clone(),
352                    )))),
353                    Err(error) => {
354                        self.log_error(&error, "Failed to handle pending blob request");
355                        Err(error.into())
356                    }
357                }
358            }
359            RpcMessage::HandlePendingBlob(request) => {
360                let (chain_id, blob_content) = *request;
361                match self
362                    .server
363                    .state
364                    .handle_pending_blob(chain_id, Blob::new(blob_content))
365                    .await
366                {
367                    Ok(info) => Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info)))),
368                    Err(error) => {
369                        self.log_error(&error, "Failed to handle pending blob");
370                        Err(error.into())
371                    }
372                }
373            }
374
375            RpcMessage::VersionInfoQuery => {
376                Ok(Some(RpcMessage::VersionInfoResponse(Box::default())))
377            }
378
379            RpcMessage::SubscribeNotifications(_) | RpcMessage::Notification(_) => {
380                // Subscriptions are handled at the transport level, not here.
381                Err(NodeError::UnexpectedMessage)
382            }
383
384            RpcMessage::Vote(_)
385            | RpcMessage::Error(_)
386            | RpcMessage::ChainInfoResponse(_)
387            | RpcMessage::VersionInfoResponse(_)
388            | RpcMessage::NetworkDescriptionQuery
389            | RpcMessage::NetworkDescriptionResponse(_)
390            | RpcMessage::ShardInfoQuery(_)
391            | RpcMessage::ShardInfoResponse(_)
392            | RpcMessage::DownloadBlob(_)
393            | RpcMessage::DownloadBlobs(_)
394            | RpcMessage::DownloadBlobResponse(_)
395            | RpcMessage::DownloadPendingBlobResponse(_)
396            | RpcMessage::DownloadConfirmedBlock(_)
397            | RpcMessage::DownloadConfirmedBlockResponse(_)
398            | RpcMessage::BlobLastUsedBy(_)
399            | RpcMessage::BlobLastUsedByResponse(_)
400            | RpcMessage::BlobLastUsedByCertificate(_)
401            | RpcMessage::BlobLastUsedByCertificateResponse(_)
402            | RpcMessage::MissingBlobIds(_)
403            | RpcMessage::MissingBlobIdsResponse(_)
404            | RpcMessage::EventBlockHeights(_)
405            | RpcMessage::EventBlockHeightsResponse(_)
406            | RpcMessage::DownloadCertificates(_)
407            | RpcMessage::DownloadCertificatesResponse(_)
408            | RpcMessage::UploadBlob(_)
409            | RpcMessage::UploadBlobResponse(_)
410            | RpcMessage::DownloadCertificatesByHeights(_, _)
411            | RpcMessage::DownloadCertificatesByHeightsResponse(_) => {
412                Err(NodeError::UnexpectedMessage)
413            }
414        };
415
416        self.server.packets_processed += 1;
417        // We allow this because `is_multiple_of` is still unstable in our MSRV.
418        #[allow(unknown_lints)]
419        #[expect(clippy::manual_is_multiple_of)]
420        if self.server.packets_processed % 5000 == 0 {
421            debug!(
422                "[{}] {}:{} (shard {}) has processed {} packets",
423                self.server.state.nickname(),
424                self.server.host,
425                self.server.port,
426                self.server.shard_id,
427                self.server.packets_processed
428            );
429        }
430
431        match reply {
432            Ok(x) => x,
433            Err(error) => {
434                // TODO(#459): Make it a warning or an error again.
435                debug!(
436                    "[{}] User query failed: {}",
437                    self.server.state.nickname(),
438                    error
439                );
440                self.server.user_errors += 1;
441                Some(error.into())
442            }
443        }
444    }
445
446    async fn handle_subscribe(
447        &mut self,
448        chains: Vec<ChainId>,
449    ) -> Option<Pin<Box<dyn Stream<Item = RpcMessage> + Send>>> {
450        RunningServerState::subscribe_to_notifications(self, chains).await
451    }
452}
453
454impl<S> RunningServerState<S>
455where
456    S: Storage + Clone + Send + Sync + 'static,
457{
458    async fn subscribe_to_notifications(
459        &self,
460        chains: Vec<ChainId>,
461    ) -> Option<Pin<Box<dyn Stream<Item = RpcMessage> + Send>>> {
462        let receiver = self.notification_sender.subscribe();
463        let stream = BroadcastStream::new(receiver).filter_map(move |result| {
464            let chains = chains.clone();
465            async move {
466                match result {
467                    Ok(notification) if chains.contains(&notification.chain_id) => {
468                        Some(RpcMessage::Notification(Box::new(notification)))
469                    }
470                    _ => None,
471                }
472            }
473        });
474        Some(Box::pin(stream))
475    }
476}
477
478impl<S> RunningServerState<S>
479where
480    S: Storage + Send,
481{
482    fn handle_network_actions(&mut self, actions: NetworkActions) {
483        for request in actions.cross_chain_requests {
484            let shard_id = self.server.network.get_shard_id(request.target_chain_id());
485            debug!(
486                "[{}] Scheduling cross-chain query: {} -> {}",
487                self.server.state.nickname(),
488                self.server.shard_id,
489                shard_id
490            );
491            if let Err(error) = self.cross_chain_sender.try_send((request, shard_id)) {
492                error!(%error, "dropping cross-chain request");
493                break;
494            }
495        }
496        for notification in actions.notifications {
497            debug!("Scheduling notification query");
498            if let Err(error) = self.notification_sender.send(notification) {
499                debug!(%error, "dropping notification (no receivers)");
500            }
501        }
502    }
503
504    fn log_error(&self, error: &WorkerError, context: &str) {
505        let nickname = self.server.state.nickname();
506        if error.is_local() {
507            error!(nickname, %error, "{}", context);
508        } else {
509            debug!(nickname, %error, "{}", context);
510        }
511    }
512}