Skip to main content

linera_rpc/simple/
server.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{pin::Pin, sync::Arc};
5
6use async_trait::async_trait;
7use futures::{channel::mpsc, lock::Mutex, Stream, StreamExt as _};
8use linera_base::{data_types::Blob, identifiers::ChainId, time::Duration};
9use linera_core::{
10    data_types::CrossChainRequest,
11    node::NodeError,
12    worker::{NetworkActions, Notification, WorkerError, WorkerState},
13    JoinSetExt as _, ProcessConfirmedBlockMode,
14};
15use linera_storage::Storage;
16use tokio::{sync, sync::oneshot, task::JoinSet};
17use tokio_stream::wrappers::BroadcastStream;
18use tokio_util::sync::CancellationToken;
19use tracing::{debug, error, info, instrument};
20
21use super::transport::{MessageHandler, ServerHandle, TransportProtocol};
22use crate::{
23    config::{CrossChainConfig, ShardId, ValidatorInternalNetworkPreConfig},
24    cross_chain_message_queue, RpcMessage,
25};
26
27#[derive(Clone)]
28pub struct Server<S>
29where
30    S: Storage,
31{
32    network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
33    host: String,
34    port: u16,
35    state: WorkerState<S>,
36    shard_id: ShardId,
37    cross_chain_config: CrossChainConfig,
38    // Stats
39    packets_processed: u64,
40    user_errors: u64,
41}
42
43impl<S> Server<S>
44where
45    S: Storage,
46{
47    pub fn new(
48        network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
49        host: String,
50        port: u16,
51        state: WorkerState<S>,
52        shard_id: ShardId,
53        cross_chain_config: CrossChainConfig,
54    ) -> Self {
55        Self {
56            network,
57            host,
58            port,
59            state,
60            shard_id,
61            cross_chain_config,
62            packets_processed: 0,
63            user_errors: 0,
64        }
65    }
66
67    pub fn packets_processed(&self) -> u64 {
68        self.packets_processed
69    }
70
71    pub fn user_errors(&self) -> u64 {
72        self.user_errors
73    }
74}
75
76impl<S> Server<S>
77where
78    S: Storage + Clone + Send + Sync + 'static,
79{
80    #[expect(clippy::too_many_arguments)]
81    async fn forward_cross_chain_queries(
82        nickname: String,
83        network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
84        cross_chain_max_retries: u32,
85        cross_chain_retry_delay: Duration,
86        cross_chain_max_backoff: Duration,
87        cross_chain_sender_delay: Duration,
88        cross_chain_sender_failure_rate: f32,
89        this_shard: ShardId,
90        receiver: mpsc::Receiver<(CrossChainRequest, ShardId)>,
91    ) {
92        let pool = Arc::new(Mutex::new(
93            network
94                .protocol
95                .make_outgoing_connection_pool()
96                .await
97                .expect("Initialization should not fail"),
98        ));
99        let handle_request = move |shard_id, request| {
100            let pool = pool.clone();
101            let shard = network.shard(shard_id);
102            let remote_address = format!("{}:{}", shard.host, shard.port);
103            let message = RpcMessage::CrossChainRequest(Box::new(request));
104            async move {
105                pool.lock()
106                    .await
107                    .send_message_to(message.clone(), &remote_address)
108                    .await?;
109                anyhow::Result::<_, anyhow::Error>::Ok(())
110            }
111        };
112        cross_chain_message_queue::forward_cross_chain_queries(
113            nickname,
114            cross_chain_max_retries,
115            cross_chain_retry_delay,
116            cross_chain_max_backoff,
117            cross_chain_sender_delay,
118            cross_chain_sender_failure_rate,
119            this_shard,
120            receiver,
121            handle_request,
122        )
123        .await;
124    }
125
126    pub fn spawn(
127        mut self,
128        shutdown_signal: CancellationToken,
129        join_set: &mut JoinSet<()>,
130    ) -> ServerHandle {
131        info!(
132            "Listening to {:?} traffic on {}:{}",
133            self.network.protocol, self.host, self.port
134        );
135        let address = (self.host.clone(), self.port);
136
137        let (cross_chain_sender, cross_chain_receiver) =
138            mpsc::channel(self.cross_chain_config.queue_size);
139
140        let (notification_sender, _) = sync::broadcast::channel(1000);
141
142        // Give the worker a shard-routing sender for cross-chain requests generated
143        // outside the normal `NetworkActions` return path (specifically, the
144        // `RevertConfirm`s emitted after resetting a corrupted chain).
145        {
146            let routing_network = self.network.clone();
147            let routing_sender = cross_chain_sender.clone();
148            self.state = self
149                .state
150                .clone()
151                .with_outbound_cross_chain_sender(Arc::new(move |request| {
152                    let shard_id = routing_network.get_shard_id(request.target_chain_id());
153                    if let Err(error) = routing_sender.clone().try_send((request, shard_id)) {
154                        tracing::error!(%error, "dropping cross-chain request");
155                    }
156                }));
157        }
158
159        join_set.spawn_task(Self::forward_cross_chain_queries(
160            self.state.nickname().to_string(),
161            self.network.clone(),
162            self.cross_chain_config.max_retries,
163            Duration::from_millis(self.cross_chain_config.retry_delay_ms),
164            Duration::from_millis(self.cross_chain_config.max_backoff_ms),
165            Duration::from_millis(self.cross_chain_config.sender_delay_ms),
166            self.cross_chain_config.sender_failure_rate,
167            self.shard_id,
168            cross_chain_receiver,
169        ));
170
171        let protocol = self.network.protocol;
172        let state = RunningServerState {
173            server: self,
174            cross_chain_sender,
175            notification_sender,
176        };
177        // Launch server for the appropriate protocol.
178        protocol.spawn_server(address, state, shutdown_signal, join_set)
179    }
180}
181
182#[derive(Clone)]
183struct RunningServerState<S>
184where
185    S: Storage,
186{
187    server: Server<S>,
188    cross_chain_sender: mpsc::Sender<(CrossChainRequest, ShardId)>,
189    notification_sender: sync::broadcast::Sender<Notification>,
190}
191
192#[async_trait]
193impl<S> MessageHandler for RunningServerState<S>
194where
195    S: Storage + Clone + Send + Sync + 'static,
196{
197    #[instrument(
198        target = "simple_server",
199        skip_all,
200        fields(
201            nickname = self.server.state.nickname(),
202            chain_id = ?message.target_chain_id()
203        )
204    )]
205    async fn handle_message(&mut self, message: RpcMessage) -> Option<RpcMessage> {
206        let reply = match message {
207            RpcMessage::BlockProposal(message) => {
208                let (result, actions) = self.server.state.handle_block_proposal(*message).await;
209                // Dispatch actions whether or not the proposal was accepted: a
210                // rejected proposal can still advance the manager's `current_round`
211                // (via `update_signed_proposal` on the `HasIncompatibleConfirmedVote`
212                // recovery path), and subscribers need the resulting `NewRound`
213                // notification.
214                self.handle_network_actions(actions);
215                match result {
216                    Ok(info) => Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info)))),
217                    Err(error) => {
218                        self.log_error(&error, "Failed to handle block proposal");
219                        Err(error.into())
220                    }
221                }
222            }
223            RpcMessage::LiteCertificate(request) => {
224                let (sender, receiver) = request
225                    .wait_for_outgoing_messages
226                    .then(oneshot::channel)
227                    .unzip();
228                match Box::pin(
229                    self.server
230                        .state
231                        .handle_lite_certificate(request.certificate, sender),
232                )
233                .await
234                {
235                    Ok((info, actions)) => {
236                        // Cross-shard requests
237                        self.handle_network_actions(actions);
238                        if let Some(receiver) = receiver {
239                            if let Err(e) = receiver.await {
240                                error!("Failed to wait for message delivery: {e}");
241                            }
242                        }
243                        // Response
244                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
245                    }
246                    Err(error) => {
247                        let nickname = self.server.state.nickname();
248                        if let WorkerError::MissingCertificateValue = &error {
249                            debug!(nickname, %error, "Failed to handle lite certificate");
250                        } else {
251                            error!(nickname, %error, "Failed to handle lite certificate");
252                        }
253                        Err(error.into())
254                    }
255                }
256            }
257            RpcMessage::TimeoutCertificate(request) => {
258                match self
259                    .server
260                    .state
261                    .handle_timeout_certificate(request.certificate)
262                    .await
263                {
264                    Ok((info, actions)) => {
265                        // Cross-shard requests
266                        self.handle_network_actions(actions);
267                        // Response
268                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
269                    }
270                    Err(error) => {
271                        self.log_error(&error, "Failed to handle timeout certificate");
272                        Err(error.into())
273                    }
274                }
275            }
276            RpcMessage::ValidatedCertificate(request) => {
277                match self
278                    .server
279                    .state
280                    .handle_validated_certificate(request.certificate)
281                    .await
282                {
283                    Ok((info, actions)) => {
284                        // Cross-shard requests
285                        self.handle_network_actions(actions);
286                        // Response
287                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
288                    }
289                    Err(error) => {
290                        self.log_error(&error, "Failed to handle validated certificate");
291                        Err(error.into())
292                    }
293                }
294            }
295            RpcMessage::ConfirmedCertificate(request) => {
296                let (sender, receiver) = request
297                    .wait_for_outgoing_messages
298                    .then(oneshot::channel)
299                    .unzip();
300                match self
301                    .server
302                    .state
303                    .handle_confirmed_certificate(
304                        request.certificate,
305                        ProcessConfirmedBlockMode::Auto,
306                        sender,
307                    )
308                    .await
309                {
310                    Ok((info, actions)) => {
311                        // Cross-shard requests
312                        self.handle_network_actions(actions);
313                        if let Some(receiver) = receiver {
314                            if let Err(e) = receiver.await {
315                                error!("Failed to wait for message delivery: {e}");
316                            }
317                        }
318                        // Response
319                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
320                    }
321                    Err(error) => {
322                        self.log_error(&error, "Failed to handle confirmed certificate");
323                        Err(error.into())
324                    }
325                }
326            }
327            RpcMessage::ChainInfoQuery(message) => {
328                match self.server.state.handle_chain_info_query(*message).await {
329                    Ok(info) => Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info)))),
330                    Err(error) => {
331                        self.log_error(&error, "Failed to handle chain info query");
332                        Err(error.into())
333                    }
334                }
335            }
336            RpcMessage::CrossChainRequest(request) => {
337                match self.server.state.handle_cross_chain_request(*request).await {
338                    Ok(actions) => {
339                        self.handle_network_actions(actions);
340                    }
341                    Err(error) => {
342                        self.log_error(&error, "Failed to handle cross-chain request");
343                    }
344                }
345                // No user to respond to.
346                Ok(None)
347            }
348            RpcMessage::DownloadPendingBlob(request) => {
349                let (chain_id, blob_id) = *request;
350                match self
351                    .server
352                    .state
353                    .download_pending_blob(chain_id, blob_id)
354                    .await
355                {
356                    Ok(blob) => Ok(Some(RpcMessage::DownloadPendingBlobResponse(Box::new(
357                        blob.content().clone(),
358                    )))),
359                    Err(error) => {
360                        self.log_error(&error, "Failed to handle pending blob request");
361                        Err(error.into())
362                    }
363                }
364            }
365            RpcMessage::HandlePendingBlob(request) => {
366                let (chain_id, blob_content) = *request;
367                match self
368                    .server
369                    .state
370                    .handle_pending_blob(chain_id, Blob::new(blob_content))
371                    .await
372                {
373                    Ok(info) => Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info)))),
374                    Err(error) => {
375                        self.log_error(&error, "Failed to handle pending blob");
376                        Err(error.into())
377                    }
378                }
379            }
380
381            RpcMessage::VersionInfoQuery => {
382                Ok(Some(RpcMessage::VersionInfoResponse(Box::default())))
383            }
384
385            RpcMessage::SubscribeNotifications(_) | RpcMessage::Notification(_) => {
386                // Subscriptions are handled at the transport level, not here.
387                Err(NodeError::UnexpectedMessage)
388            }
389
390            RpcMessage::Vote(_)
391            | RpcMessage::Error(_)
392            | RpcMessage::ChainInfoResponse(_)
393            | RpcMessage::VersionInfoResponse(_)
394            | RpcMessage::NetworkDescriptionQuery
395            | RpcMessage::NetworkDescriptionResponse(_)
396            | RpcMessage::ShardInfoQuery(_)
397            | RpcMessage::ShardInfoResponse(_)
398            | RpcMessage::DownloadBlob(_)
399            | RpcMessage::DownloadBlobs(_)
400            | RpcMessage::DownloadBlobResponse(_)
401            | RpcMessage::DownloadPendingBlobResponse(_)
402            | RpcMessage::DownloadConfirmedBlock(_)
403            | RpcMessage::DownloadConfirmedBlockResponse(_)
404            | RpcMessage::BlobLastUsedBy(_)
405            | RpcMessage::BlobLastUsedByResponse(_)
406            | RpcMessage::BlobLastUsedByCertificate(_)
407            | RpcMessage::BlobLastUsedByCertificateResponse(_)
408            | RpcMessage::MissingBlobIds(_)
409            | RpcMessage::MissingBlobIdsResponse(_)
410            | RpcMessage::EventBlockHeights(_)
411            | RpcMessage::EventBlockHeightsResponse(_)
412            | RpcMessage::DownloadCertificates(_)
413            | RpcMessage::DownloadCertificatesResponse(_)
414            | RpcMessage::UploadBlob(_)
415            | RpcMessage::UploadBlobResponse(_)
416            | RpcMessage::DownloadCertificatesByHeights(_, _)
417            | RpcMessage::DownloadCertificatesByHeightsResponse(_) => {
418                Err(NodeError::UnexpectedMessage)
419            }
420        };
421
422        self.server.packets_processed += 1;
423        // We allow this because `is_multiple_of` is still unstable in our MSRV.
424        #[allow(unknown_lints)]
425        #[expect(clippy::manual_is_multiple_of)]
426        if self.server.packets_processed % 5000 == 0 {
427            debug!(
428                "[{}] {}:{} (shard {}) has processed {} packets",
429                self.server.state.nickname(),
430                self.server.host,
431                self.server.port,
432                self.server.shard_id,
433                self.server.packets_processed
434            );
435        }
436
437        match reply {
438            Ok(x) => x,
439            Err(error) => {
440                // TODO(#459): Make it a warning or an error again.
441                debug!(
442                    "[{}] User query failed: {}",
443                    self.server.state.nickname(),
444                    error
445                );
446                self.server.user_errors += 1;
447                Some(error.into())
448            }
449        }
450    }
451
452    async fn handle_subscribe(
453        &mut self,
454        chains: Vec<ChainId>,
455    ) -> Option<Pin<Box<dyn Stream<Item = RpcMessage> + Send>>> {
456        RunningServerState::subscribe_to_notifications(self, chains).await
457    }
458}
459
460impl<S> RunningServerState<S>
461where
462    S: Storage + Clone + Send + Sync + 'static,
463{
464    async fn subscribe_to_notifications(
465        &self,
466        chains: Vec<ChainId>,
467    ) -> Option<Pin<Box<dyn Stream<Item = RpcMessage> + Send>>> {
468        let receiver = self.notification_sender.subscribe();
469        let stream = BroadcastStream::new(receiver).filter_map(move |result| {
470            let chains = chains.clone();
471            async move {
472                match result {
473                    Ok(notification) if chains.contains(&notification.chain_id) => {
474                        Some(RpcMessage::Notification(Box::new(notification)))
475                    }
476                    _ => None,
477                }
478            }
479        });
480        Some(Box::pin(stream))
481    }
482}
483
484impl<S> RunningServerState<S>
485where
486    S: Storage + Send,
487{
488    fn handle_network_actions(&mut self, actions: NetworkActions) {
489        for request in actions.cross_chain_requests {
490            let shard_id = self.server.network.get_shard_id(request.target_chain_id());
491            debug!(
492                "[{}] Scheduling cross-chain query: {} -> {}",
493                self.server.state.nickname(),
494                self.server.shard_id,
495                shard_id
496            );
497            if let Err(error) = self.cross_chain_sender.try_send((request, shard_id)) {
498                error!(%error, "dropping cross-chain request");
499                break;
500            }
501        }
502        for notification in actions.notifications {
503            debug!("Scheduling notification query");
504            if let Err(error) = self.notification_sender.send(notification) {
505                debug!(%error, "dropping notification (no receivers)");
506            }
507        }
508    }
509
510    fn log_error(&self, error: &WorkerError, context: &str) {
511        let nickname = self.server.state.nickname();
512        if error.is_local() {
513            error!(nickname, %error, "{}", context);
514        } else {
515            debug!(nickname, %error, "{}", context);
516        }
517    }
518}