linera_rpc/simple/server.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::{channel::mpsc, lock::Mutex};
8use linera_base::{data_types::Blob, time::Duration};
9use linera_core::{
10    data_types::CrossChainRequest,
11    node::NodeError,
12    worker::{NetworkActions, WorkerError, WorkerState},
13    JoinSetExt as _,
14};
15use linera_storage::Storage;
16use tokio::{sync::oneshot, task::JoinSet};
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, error, info, instrument};
19
20use super::transport::{MessageHandler, ServerHandle, TransportProtocol};
21use crate::{
22    config::{CrossChainConfig, ShardId, ValidatorInternalNetworkPreConfig},
23    cross_chain_message_queue, RpcMessage,
24};
25
26#[derive(Clone)]
27pub struct Server<S>
28where
29    S: Storage,
30{
31    network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
32    host: String,
33    port: u16,
34    state: WorkerState<S>,
35    shard_id: ShardId,
36    cross_chain_config: CrossChainConfig,
37    // Stats
38    packets_processed: u64,
39    user_errors: u64,
40}
41
42impl<S> Server<S>
43where
44    S: Storage,
45{
46    pub fn new(
47        network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
48        host: String,
49        port: u16,
50        state: WorkerState<S>,
51        shard_id: ShardId,
52        cross_chain_config: CrossChainConfig,
53    ) -> Self {
54        Self {
55            network,
56            host,
57            port,
58            state,
59            shard_id,
60            cross_chain_config,
61            packets_processed: 0,
62            user_errors: 0,
63        }
64    }
65
66    pub fn packets_processed(&self) -> u64 {
67        self.packets_processed
68    }
69
70    pub fn user_errors(&self) -> u64 {
71        self.user_errors
72    }
73}
74
75impl<S> Server<S>
76where
77    S: Storage + Clone + Send + Sync + 'static,
78{
79    #[expect(clippy::too_many_arguments)]
80    async fn forward_cross_chain_queries(
81        nickname: String,
82        network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
83        cross_chain_max_retries: u32,
84        cross_chain_retry_delay: Duration,
85        cross_chain_max_backoff: Duration,
86        cross_chain_sender_delay: Duration,
87        cross_chain_sender_failure_rate: f32,
88        this_shard: ShardId,
89        receiver: mpsc::Receiver<(CrossChainRequest, ShardId)>,
90    ) {
91        let pool = Arc::new(Mutex::new(
92            network
93                .protocol
94                .make_outgoing_connection_pool()
95                .await
96                .expect("Initialization should not fail"),
97        ));
98        let handle_request = move |shard_id, request| {
99            let pool = pool.clone();
100            let shard = network.shard(shard_id);
101            let remote_address = format!("{}:{}", shard.host, shard.port);
102            let message = RpcMessage::CrossChainRequest(Box::new(request));
103            async move {
104                pool.lock()
105                    .await
106                    .send_message_to(message.clone(), &remote_address)
107                    .await?;
108                anyhow::Result::<_, anyhow::Error>::Ok(())
109            }
110        };
111        cross_chain_message_queue::forward_cross_chain_queries(
112            nickname,
113            cross_chain_max_retries,
114            cross_chain_retry_delay,
115            cross_chain_max_backoff,
116            cross_chain_sender_delay,
117            cross_chain_sender_failure_rate,
118            this_shard,
119            receiver,
120            handle_request,
121        )
122        .await;
123    }
124
125    pub fn spawn(
126        self,
127        shutdown_signal: CancellationToken,
128        join_set: &mut JoinSet<()>,
129    ) -> ServerHandle {
130        info!(
131            "Listening to {:?} traffic on {}:{}",
132            self.network.protocol, self.host, self.port
133        );
134        let address = (self.host.clone(), self.port);
135
136        let (cross_chain_sender, cross_chain_receiver) =
137            mpsc::channel(self.cross_chain_config.queue_size);
138
139        join_set.spawn_task(Self::forward_cross_chain_queries(
140            self.state.nickname().to_string(),
141            self.network.clone(),
142            self.cross_chain_config.max_retries,
143            Duration::from_millis(self.cross_chain_config.retry_delay_ms),
144            Duration::from_millis(self.cross_chain_config.max_backoff_ms),
145            Duration::from_millis(self.cross_chain_config.sender_delay_ms),
146            self.cross_chain_config.sender_failure_rate,
147            self.shard_id,
148            cross_chain_receiver,
149        ));
150
151        let protocol = self.network.protocol;
152        let state = RunningServerState {
153            server: self,
154            cross_chain_sender,
155        };
156        // Launch server for the appropriate protocol.
157        protocol.spawn_server(address, state, shutdown_signal, join_set)
158    }
159}
160
161#[derive(Clone)]
162struct RunningServerState<S>
163where
164    S: Storage,
165{
166    server: Server<S>,
167    cross_chain_sender: mpsc::Sender<(CrossChainRequest, ShardId)>,
168}
169
/// Dispatches incoming [`RpcMessage`]s to the shard's [`WorkerState`].
#[async_trait]
impl<S> MessageHandler for RunningServerState<S>
where
    S: Storage + Clone + Send + Sync + 'static,
{
    /// Handles one incoming message and returns the reply to send back, if any.
    ///
    /// - Block proposals, certificates, chain-info queries and pending-blob
    ///   messages are forwarded to the worker, and its result becomes the reply.
    /// - Any cross-chain requests the worker produces are queued through
    ///   `handle_network_actions`.
    /// - Cross-chain requests coming from other shards get no reply.
    /// - Message kinds this server does not serve are answered with
    ///   `NodeError::UnexpectedMessage`.
    ///
    /// Every call increments `packets_processed`. Every error reply increments
    /// `user_errors`.
    #[instrument(
        target = "simple_server",
        skip_all,
        fields(
            nickname = self.server.state.nickname(),
            chain_id = ?message.target_chain_id()
        )
    )]
    async fn handle_message(&mut self, message: RpcMessage) -> Option<RpcMessage> {
        // `reply` is `Ok(Some(_))` for a normal response, `Ok(None)` when no response
        // is sent, and `Err(_)` when an error reply must be sent.
        let reply = match message {
            RpcMessage::BlockProposal(message) => {
                match self.server.state.handle_block_proposal(*message).await {
                    Ok((info, actions)) => {
                        // Cross-shard requests
                        self.handle_network_actions(actions);
                        // Response
                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
                    }
                    Err(error) => {
                        self.log_error(&error, "Failed to handle block proposal");
                        Err(error.into())
                    }
                }
            }
            RpcMessage::LiteCertificate(request) => {
                // If the client asked to wait for outgoing messages, create a oneshot
                // channel. The worker gets the sender, and the receiver is awaited
                // below before replying.
                let (sender, receiver) = request
                    .wait_for_outgoing_messages
                    .then(oneshot::channel)
                    .unzip();
                // NOTE(review): boxing presumably keeps this large future off the
                // stack — confirm.
                match Box::pin(
                    self.server
                        .state
                        .handle_lite_certificate(request.certificate, sender),
                )
                .await
                {
                    Ok((info, actions)) => {
                        // Cross-shard requests
                        self.handle_network_actions(actions);
                        // Only after the cross-chain requests are queued, wait (if asked)
                        // for the delivery notification. A failure is logged, not returned.
                        if let Some(receiver) = receiver {
                            if let Err(e) = receiver.await {
                                error!("Failed to wait for message delivery: {e}");
                            }
                        }
                        // Response
                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
                    }
                    Err(error) => {
                        // Unlike `log_error`, this arm logs `MissingCertificateValue` at
                        // debug level regardless of `is_local()`, and every other error at
                        // error level.
                        // NOTE(review): presumably a missing value is an expected outcome
                        // for lite certificates — confirm.
                        let nickname = self.server.state.nickname();
                        if let WorkerError::MissingCertificateValue = &error {
                            debug!(nickname, %error, "Failed to handle lite certificate");
                        } else {
                            error!(nickname, %error, "Failed to handle lite certificate");
                        }
                        Err(error.into())
                    }
                }
            }
            RpcMessage::TimeoutCertificate(request) => {
                match self
                    .server
                    .state
                    .handle_timeout_certificate(request.certificate)
                    .await
                {
                    Ok((info, actions)) => {
                        // Cross-shard requests
                        self.handle_network_actions(actions);
                        // Response
                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
                    }
                    Err(error) => {
                        self.log_error(&error, "Failed to handle timeout certificate");
                        Err(error.into())
                    }
                }
            }
            RpcMessage::ValidatedCertificate(request) => {
                match self
                    .server
                    .state
                    .handle_validated_certificate(request.certificate)
                    .await
                {
                    Ok((info, actions)) => {
                        // Cross-shard requests
                        self.handle_network_actions(actions);
                        // Response
                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
                    }
                    Err(error) => {
                        self.log_error(&error, "Failed to handle validated certificate");
                        Err(error.into())
                    }
                }
            }
            RpcMessage::ConfirmedCertificate(request) => {
                // Same optional wait-for-delivery handshake as for lite certificates.
                let (sender, receiver) = request
                    .wait_for_outgoing_messages
                    .then(oneshot::channel)
                    .unzip();
                match self
                    .server
                    .state
                    .handle_confirmed_certificate(request.certificate, sender)
                    .await
                {
                    Ok((info, actions)) => {
                        // Cross-shard requests
                        self.handle_network_actions(actions);
                        // Wait (if asked) only after the cross-chain requests are queued.
                        if let Some(receiver) = receiver {
                            if let Err(e) = receiver.await {
                                error!("Failed to wait for message delivery: {e}");
                            }
                        }
                        // Response
                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
                    }
                    Err(error) => {
                        self.log_error(&error, "Failed to handle confirmed certificate");
                        Err(error.into())
                    }
                }
            }
            RpcMessage::ChainInfoQuery(message) => {
                match self.server.state.handle_chain_info_query(*message).await {
                    Ok((info, actions)) => {
                        // Cross-shard requests
                        self.handle_network_actions(actions);
                        // Response
                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
                    }
                    Err(error) => {
                        self.log_error(&error, "Failed to handle chain info query");
                        Err(error.into())
                    }
                }
            }
            RpcMessage::CrossChainRequest(request) => {
                // Errors are only logged. The arm always yields `Ok(None)`, so they
                // are not counted in `user_errors`.
                match self.server.state.handle_cross_chain_request(*request).await {
                    Ok(actions) => {
                        self.handle_network_actions(actions);
                    }
                    Err(error) => {
                        self.log_error(&error, "Failed to handle cross-chain request");
                    }
                }
                // No user to respond to.
                Ok(None)
            }
            RpcMessage::DownloadPendingBlob(request) => {
                let (chain_id, blob_id) = *request;
                match self
                    .server
                    .state
                    .download_pending_blob(chain_id, blob_id)
                    .await
                {
                    Ok(blob) => Ok(Some(RpcMessage::DownloadPendingBlobResponse(Box::new(
                        blob.into(),
                    )))),
                    Err(error) => {
                        self.log_error(&error, "Failed to handle pending blob request");
                        Err(error.into())
                    }
                }
            }
            RpcMessage::HandlePendingBlob(request) => {
                let (chain_id, blob_content) = *request;
                match self
                    .server
                    .state
                    .handle_pending_blob(chain_id, Blob::new(blob_content))
                    .await
                {
                    Ok(info) => Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info)))),
                    Err(error) => {
                        self.log_error(&error, "Failed to handle pending blob");
                        Err(error.into())
                    }
                }
            }

            // Replies with a `Default`-constructed version info.
            // NOTE(review): presumably this describes the running binary — confirm.
            RpcMessage::VersionInfoQuery => {
                Ok(Some(RpcMessage::VersionInfoResponse(Box::default())))
            }

            // Responses, and requests served elsewhere, are rejected by this shard
            // server.
            RpcMessage::Vote(_)
            | RpcMessage::Error(_)
            | RpcMessage::ChainInfoResponse(_)
            | RpcMessage::VersionInfoResponse(_)
            | RpcMessage::NetworkDescriptionQuery
            | RpcMessage::NetworkDescriptionResponse(_)
            | RpcMessage::ShardInfoQuery(_)
            | RpcMessage::ShardInfoResponse(_)
            | RpcMessage::DownloadBlob(_)
            | RpcMessage::DownloadBlobResponse(_)
            | RpcMessage::DownloadPendingBlobResponse(_)
            | RpcMessage::DownloadConfirmedBlock(_)
            | RpcMessage::DownloadConfirmedBlockResponse(_)
            | RpcMessage::BlobLastUsedBy(_)
            | RpcMessage::BlobLastUsedByResponse(_)
            | RpcMessage::BlobLastUsedByCertificate(_)
            | RpcMessage::BlobLastUsedByCertificateResponse(_)
            | RpcMessage::MissingBlobIds(_)
            | RpcMessage::MissingBlobIdsResponse(_)
            | RpcMessage::DownloadCertificates(_)
            | RpcMessage::DownloadCertificatesResponse(_)
            | RpcMessage::UploadBlob(_)
            | RpcMessage::UploadBlobResponse(_)
            | RpcMessage::DownloadCertificatesByHeights(_, _)
            | RpcMessage::DownloadCertificatesByHeightsResponse(_) => {
                Err(NodeError::UnexpectedMessage)
            }
        };

        // Counters live in `self.server`, which `#[derive(Clone)]` copies.
        // NOTE(review): if the transport clones this handler per connection, the
        // counts are per clone rather than server-wide — confirm.
        self.server.packets_processed += 1;
        // We allow this because `is_multiple_of` is still unstable in our MSRV.
        #[allow(unknown_lints)]
        #[allow(clippy::manual_is_multiple_of)]
        if self.server.packets_processed % 5000 == 0 {
            debug!(
                "[{}] {}:{} (shard {}) has processed {} packets",
                self.server.state.nickname(),
                self.server.host,
                self.server.port,
                self.server.shard_id,
                self.server.packets_processed
            );
        }

        match reply {
            Ok(x) => x,
            Err(error) => {
                // TODO(#459): Make it a warning or an error again.
                debug!(
                    "[{}] User query failed: {}",
                    self.server.state.nickname(),
                    error
                );
                self.server.user_errors += 1;
                // The `NodeError` is converted into an error reply message.
                Some(error.into())
            }
        }
    }
}
421
422impl<S> RunningServerState<S>
423where
424    S: Storage + Send,
425{
426    fn handle_network_actions(&mut self, actions: NetworkActions) {
427        for request in actions.cross_chain_requests {
428            let shard_id = self.server.network.get_shard_id(request.target_chain_id());
429            debug!(
430                "[{}] Scheduling cross-chain query: {} -> {}",
431                self.server.state.nickname(),
432                self.server.shard_id,
433                shard_id
434            );
435            if let Err(error) = self.cross_chain_sender.try_send((request, shard_id)) {
436                error!(%error, "dropping cross-chain request");
437                break;
438            }
439        }
440    }
441
442    fn log_error(&self, error: &WorkerError, context: &str) {
443        let nickname = self.server.state.nickname();
444        if error.is_local() {
445            error!(nickname, %error, "{}", context);
446        } else {
447            debug!(nickname, %error, "{}", context);
448        }
449    }
450}