linera_rpc/simple/
server.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::{channel::mpsc, lock::Mutex};
8use linera_base::{data_types::Blob, time::Duration};
9use linera_core::{
10    data_types::CrossChainRequest,
11    node::NodeError,
12    worker::{NetworkActions, WorkerError, WorkerState},
13    JoinSetExt as _,
14};
15use linera_storage::Storage;
16use tokio::{sync::oneshot, task::JoinSet};
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, error, info, instrument, warn};
19
20use super::transport::{MessageHandler, ServerHandle, TransportProtocol};
21use crate::{
22    config::{CrossChainConfig, ShardId, ValidatorInternalNetworkPreConfig},
23    cross_chain_message_queue, RpcMessage,
24};
25
26#[derive(Clone)]
27pub struct Server<S>
28where
29    S: Storage,
30{
31    network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
32    host: String,
33    port: u16,
34    state: WorkerState<S>,
35    shard_id: ShardId,
36    cross_chain_config: CrossChainConfig,
37    // Stats
38    packets_processed: u64,
39    user_errors: u64,
40}
41
42impl<S> Server<S>
43where
44    S: Storage,
45{
46    pub fn new(
47        network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
48        host: String,
49        port: u16,
50        state: WorkerState<S>,
51        shard_id: ShardId,
52        cross_chain_config: CrossChainConfig,
53    ) -> Self {
54        Self {
55            network,
56            host,
57            port,
58            state,
59            shard_id,
60            cross_chain_config,
61            packets_processed: 0,
62            user_errors: 0,
63        }
64    }
65
66    pub fn packets_processed(&self) -> u64 {
67        self.packets_processed
68    }
69
70    pub fn user_errors(&self) -> u64 {
71        self.user_errors
72    }
73}
74
75impl<S> Server<S>
76where
77    S: Storage + Clone + Send + Sync + 'static,
78{
79    #[expect(clippy::too_many_arguments)]
80    async fn forward_cross_chain_queries(
81        nickname: String,
82        network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
83        cross_chain_max_retries: u32,
84        cross_chain_retry_delay: Duration,
85        cross_chain_sender_delay: Duration,
86        cross_chain_sender_failure_rate: f32,
87        this_shard: ShardId,
88        receiver: mpsc::Receiver<(CrossChainRequest, ShardId)>,
89    ) {
90        let pool = Arc::new(Mutex::new(
91            network
92                .protocol
93                .make_outgoing_connection_pool()
94                .await
95                .expect("Initialization should not fail"),
96        ));
97        let handle_request = move |shard_id, request| {
98            let pool = pool.clone();
99            let shard = network.shard(shard_id);
100            let remote_address = format!("{}:{}", shard.host, shard.port);
101            let message = RpcMessage::CrossChainRequest(Box::new(request));
102            async move {
103                pool.lock()
104                    .await
105                    .send_message_to(message.clone(), &remote_address)
106                    .await?;
107                anyhow::Result::<_, anyhow::Error>::Ok(())
108            }
109        };
110        cross_chain_message_queue::forward_cross_chain_queries(
111            nickname,
112            cross_chain_max_retries,
113            cross_chain_retry_delay,
114            cross_chain_sender_delay,
115            cross_chain_sender_failure_rate,
116            this_shard,
117            receiver,
118            handle_request,
119        )
120        .await;
121    }
122
123    pub fn spawn(
124        self,
125        shutdown_signal: CancellationToken,
126        join_set: &mut JoinSet<()>,
127    ) -> ServerHandle {
128        info!(
129            "Listening to {:?} traffic on {}:{}",
130            self.network.protocol, self.host, self.port
131        );
132        let address = (self.host.clone(), self.port);
133
134        let (cross_chain_sender, cross_chain_receiver) =
135            mpsc::channel(self.cross_chain_config.queue_size);
136
137        join_set.spawn_task(Self::forward_cross_chain_queries(
138            self.state.nickname().to_string(),
139            self.network.clone(),
140            self.cross_chain_config.max_retries,
141            Duration::from_millis(self.cross_chain_config.retry_delay_ms),
142            Duration::from_millis(self.cross_chain_config.sender_delay_ms),
143            self.cross_chain_config.sender_failure_rate,
144            self.shard_id,
145            cross_chain_receiver,
146        ));
147
148        let protocol = self.network.protocol;
149        let state = RunningServerState {
150            server: self,
151            cross_chain_sender,
152        };
153        // Launch server for the appropriate protocol.
154        protocol.spawn_server(address, state, shutdown_signal, join_set)
155    }
156}
157
158#[derive(Clone)]
159struct RunningServerState<S>
160where
161    S: Storage,
162{
163    server: Server<S>,
164    cross_chain_sender: mpsc::Sender<(CrossChainRequest, ShardId)>,
165}
166
167#[async_trait]
168impl<S> MessageHandler for RunningServerState<S>
169where
170    S: Storage + Clone + Send + Sync + 'static,
171{
172    #[instrument(
173        target = "simple_server",
174        skip_all,
175        fields(
176            nickname = self.server.state.nickname(),
177            chain_id = ?message.target_chain_id()
178        )
179    )]
180    async fn handle_message(&mut self, message: RpcMessage) -> Option<RpcMessage> {
181        let reply = match message {
182            RpcMessage::BlockProposal(message) => {
183                match self.server.state.handle_block_proposal(*message).await {
184                    Ok((info, actions)) => {
185                        // Cross-shard requests
186                        self.handle_network_actions(actions);
187                        // Response
188                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
189                    }
190                    Err(error) => {
191                        let nickname = self.server.state.nickname();
192                        warn!(nickname, %error, "Failed to handle block proposal");
193                        Err(error.into())
194                    }
195                }
196            }
197            RpcMessage::LiteCertificate(request) => {
198                let (sender, receiver) = request
199                    .wait_for_outgoing_messages
200                    .then(oneshot::channel)
201                    .unzip();
202                match Box::pin(
203                    self.server
204                        .state
205                        .handle_lite_certificate(request.certificate, sender),
206                )
207                .await
208                {
209                    Ok((info, actions)) => {
210                        // Cross-shard requests
211                        self.handle_network_actions(actions);
212                        if let Some(receiver) = receiver {
213                            if let Err(e) = receiver.await {
214                                error!("Failed to wait for message delivery: {e}");
215                            }
216                        }
217                        // Response
218                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
219                    }
220                    Err(error) => {
221                        let nickname = self.server.state.nickname();
222                        if let WorkerError::MissingCertificateValue = &error {
223                            debug!(nickname, %error, "Failed to handle lite certificate");
224                        } else {
225                            error!(nickname, %error, "Failed to handle lite certificate");
226                        }
227                        Err(error.into())
228                    }
229                }
230            }
231            RpcMessage::TimeoutCertificate(request) => {
232                match self
233                    .server
234                    .state
235                    .handle_timeout_certificate(request.certificate)
236                    .await
237                {
238                    Ok((info, actions)) => {
239                        // Cross-shard requests
240                        self.handle_network_actions(actions);
241                        // Response
242                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
243                    }
244                    Err(error) => {
245                        let nickname = self.server.state.nickname();
246                        error!(nickname, %error, "Failed to handle timeout certificate");
247                        Err(error.into())
248                    }
249                }
250            }
251            RpcMessage::ValidatedCertificate(request) => {
252                match self
253                    .server
254                    .state
255                    .handle_validated_certificate(request.certificate)
256                    .await
257                {
258                    Ok((info, actions)) => {
259                        // Cross-shard requests
260                        self.handle_network_actions(actions);
261                        // Response
262                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
263                    }
264                    Err(error) => {
265                        error!(
266                            nickname = self.server.state.nickname(), %error,
267                            "Failed to handle validated certificate"
268                        );
269                        Err(error.into())
270                    }
271                }
272            }
273            RpcMessage::ConfirmedCertificate(request) => {
274                let (sender, receiver) = request
275                    .wait_for_outgoing_messages
276                    .then(oneshot::channel)
277                    .unzip();
278                match self
279                    .server
280                    .state
281                    .handle_confirmed_certificate(request.certificate, sender)
282                    .await
283                {
284                    Ok((info, actions)) => {
285                        // Cross-shard requests
286                        self.handle_network_actions(actions);
287                        if let Some(receiver) = receiver {
288                            if let Err(e) = receiver.await {
289                                error!("Failed to wait for message delivery: {e}");
290                            }
291                        }
292                        // Response
293                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
294                    }
295                    Err(error) => {
296                        let nickname = self.server.state.nickname();
297                        error!(nickname, %error, "Failed to handle confirmed certificate");
298                        Err(error.into())
299                    }
300                }
301            }
302            RpcMessage::ChainInfoQuery(message) => {
303                match self.server.state.handle_chain_info_query(*message).await {
304                    Ok((info, actions)) => {
305                        // Cross-shard requests
306                        self.handle_network_actions(actions);
307                        // Response
308                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
309                    }
310                    Err(error) => {
311                        let nickname = self.server.state.nickname();
312                        error!(nickname, %error, "Failed to handle chain info query");
313                        Err(error.into())
314                    }
315                }
316            }
317            RpcMessage::CrossChainRequest(request) => {
318                match self.server.state.handle_cross_chain_request(*request).await {
319                    Ok(actions) => {
320                        self.handle_network_actions(actions);
321                    }
322                    Err(error) => {
323                        let nickname = self.server.state.nickname();
324                        error!(nickname, %error, "Failed to handle cross-chain request");
325                    }
326                }
327                // No user to respond to.
328                Ok(None)
329            }
330            RpcMessage::DownloadPendingBlob(request) => {
331                let (chain_id, blob_id) = *request;
332                match self
333                    .server
334                    .state
335                    .download_pending_blob(chain_id, blob_id)
336                    .await
337                {
338                    Ok(blob) => Ok(Some(RpcMessage::DownloadPendingBlobResponse(Box::new(
339                        blob.into(),
340                    )))),
341                    Err(error) => {
342                        let nickname = self.server.state.nickname();
343                        error!(nickname, %error, "Failed to handle pending blob request");
344                        Err(error.into())
345                    }
346                }
347            }
348            RpcMessage::HandlePendingBlob(request) => {
349                let (chain_id, blob_content) = *request;
350                match self
351                    .server
352                    .state
353                    .handle_pending_blob(chain_id, Blob::new(blob_content))
354                    .await
355                {
356                    Ok(info) => Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info)))),
357                    Err(error) => {
358                        let nickname = self.server.state.nickname();
359                        error!(nickname, %error, "Failed to handle pending blob");
360                        Err(error.into())
361                    }
362                }
363            }
364
365            RpcMessage::VersionInfoQuery => {
366                Ok(Some(RpcMessage::VersionInfoResponse(Box::default())))
367            }
368
369            RpcMessage::Vote(_)
370            | RpcMessage::Error(_)
371            | RpcMessage::ChainInfoResponse(_)
372            | RpcMessage::VersionInfoResponse(_)
373            | RpcMessage::NetworkDescriptionQuery
374            | RpcMessage::NetworkDescriptionResponse(_)
375            | RpcMessage::DownloadBlob(_)
376            | RpcMessage::DownloadBlobResponse(_)
377            | RpcMessage::DownloadPendingBlobResponse(_)
378            | RpcMessage::DownloadConfirmedBlock(_)
379            | RpcMessage::DownloadConfirmedBlockResponse(_)
380            | RpcMessage::BlobLastUsedBy(_)
381            | RpcMessage::BlobLastUsedByResponse(_)
382            | RpcMessage::MissingBlobIds(_)
383            | RpcMessage::MissingBlobIdsResponse(_)
384            | RpcMessage::DownloadCertificates(_)
385            | RpcMessage::DownloadCertificatesResponse(_)
386            | RpcMessage::UploadBlob(_)
387            | RpcMessage::UploadBlobResponse(_) => Err(NodeError::UnexpectedMessage),
388        };
389
390        self.server.packets_processed += 1;
391        if self.server.packets_processed % 5000 == 0 {
392            debug!(
393                "[{}] {}:{} (shard {}) has processed {} packets",
394                self.server.state.nickname(),
395                self.server.host,
396                self.server.port,
397                self.server.shard_id,
398                self.server.packets_processed
399            );
400        }
401
402        match reply {
403            Ok(x) => x,
404            Err(error) => {
405                // TODO(#459): Make it a warning or an error again.
406                debug!(
407                    "[{}] User query failed: {}",
408                    self.server.state.nickname(),
409                    error
410                );
411                self.server.user_errors += 1;
412                Some(error.into())
413            }
414        }
415    }
416}
417
418impl<S> RunningServerState<S>
419where
420    S: Storage + Send,
421{
422    fn handle_network_actions(&mut self, actions: NetworkActions) {
423        for request in actions.cross_chain_requests {
424            let shard_id = self.server.network.get_shard_id(request.target_chain_id());
425            debug!(
426                "[{}] Scheduling cross-chain query: {} -> {}",
427                self.server.state.nickname(),
428                self.server.shard_id,
429                shard_id
430            );
431            if let Err(error) = self.cross_chain_sender.try_send((request, shard_id)) {
432                error!(%error, "dropping cross-chain request");
433                break;
434            }
435        }
436    }
437}