linera_rpc/simple/
server.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::{channel::mpsc, lock::Mutex};
8use linera_base::{data_types::Blob, time::Duration};
9use linera_core::{
10    data_types::CrossChainRequest,
11    node::NodeError,
12    worker::{NetworkActions, WorkerError, WorkerState},
13    JoinSetExt as _,
14};
15use linera_storage::Storage;
16use tokio::{sync::oneshot, task::JoinSet};
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, error, info, instrument};
19
20use super::transport::{MessageHandler, ServerHandle, TransportProtocol};
21use crate::{
22    config::{CrossChainConfig, ShardId, ValidatorInternalNetworkPreConfig},
23    cross_chain_message_queue, RpcMessage,
24};
25
26#[derive(Clone)]
27pub struct Server<S>
28where
29    S: Storage,
30{
31    network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
32    host: String,
33    port: u16,
34    state: WorkerState<S>,
35    shard_id: ShardId,
36    cross_chain_config: CrossChainConfig,
37    // Stats
38    packets_processed: u64,
39    user_errors: u64,
40}
41
42impl<S> Server<S>
43where
44    S: Storage,
45{
46    pub fn new(
47        network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
48        host: String,
49        port: u16,
50        state: WorkerState<S>,
51        shard_id: ShardId,
52        cross_chain_config: CrossChainConfig,
53    ) -> Self {
54        Self {
55            network,
56            host,
57            port,
58            state,
59            shard_id,
60            cross_chain_config,
61            packets_processed: 0,
62            user_errors: 0,
63        }
64    }
65
66    pub fn packets_processed(&self) -> u64 {
67        self.packets_processed
68    }
69
70    pub fn user_errors(&self) -> u64 {
71        self.user_errors
72    }
73}
74
75impl<S> Server<S>
76where
77    S: Storage + Clone + Send + Sync + 'static,
78{
79    #[expect(clippy::too_many_arguments)]
80    async fn forward_cross_chain_queries(
81        nickname: String,
82        network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
83        cross_chain_max_retries: u32,
84        cross_chain_retry_delay: Duration,
85        cross_chain_sender_delay: Duration,
86        cross_chain_sender_failure_rate: f32,
87        this_shard: ShardId,
88        receiver: mpsc::Receiver<(CrossChainRequest, ShardId)>,
89    ) {
90        let pool = Arc::new(Mutex::new(
91            network
92                .protocol
93                .make_outgoing_connection_pool()
94                .await
95                .expect("Initialization should not fail"),
96        ));
97        let handle_request = move |shard_id, request| {
98            let pool = pool.clone();
99            let shard = network.shard(shard_id);
100            let remote_address = format!("{}:{}", shard.host, shard.port);
101            let message = RpcMessage::CrossChainRequest(Box::new(request));
102            async move {
103                pool.lock()
104                    .await
105                    .send_message_to(message.clone(), &remote_address)
106                    .await?;
107                anyhow::Result::<_, anyhow::Error>::Ok(())
108            }
109        };
110        cross_chain_message_queue::forward_cross_chain_queries(
111            nickname,
112            cross_chain_max_retries,
113            cross_chain_retry_delay,
114            cross_chain_sender_delay,
115            cross_chain_sender_failure_rate,
116            this_shard,
117            receiver,
118            handle_request,
119        )
120        .await;
121    }
122
123    pub fn spawn(
124        self,
125        shutdown_signal: CancellationToken,
126        join_set: &mut JoinSet<()>,
127    ) -> ServerHandle {
128        info!(
129            "Listening to {:?} traffic on {}:{}",
130            self.network.protocol, self.host, self.port
131        );
132        let address = (self.host.clone(), self.port);
133
134        let (cross_chain_sender, cross_chain_receiver) =
135            mpsc::channel(self.cross_chain_config.queue_size);
136
137        join_set.spawn_task(Self::forward_cross_chain_queries(
138            self.state.nickname().to_string(),
139            self.network.clone(),
140            self.cross_chain_config.max_retries,
141            Duration::from_millis(self.cross_chain_config.retry_delay_ms),
142            Duration::from_millis(self.cross_chain_config.sender_delay_ms),
143            self.cross_chain_config.sender_failure_rate,
144            self.shard_id,
145            cross_chain_receiver,
146        ));
147
148        let protocol = self.network.protocol;
149        let state = RunningServerState {
150            server: self,
151            cross_chain_sender,
152        };
153        // Launch server for the appropriate protocol.
154        protocol.spawn_server(address, state, shutdown_signal, join_set)
155    }
156}
157
158#[derive(Clone)]
159struct RunningServerState<S>
160where
161    S: Storage,
162{
163    server: Server<S>,
164    cross_chain_sender: mpsc::Sender<(CrossChainRequest, ShardId)>,
165}
166
167#[async_trait]
168impl<S> MessageHandler for RunningServerState<S>
169where
170    S: Storage + Clone + Send + Sync + 'static,
171{
172    #[instrument(
173        target = "simple_server",
174        skip_all,
175        fields(
176            nickname = self.server.state.nickname(),
177            chain_id = ?message.target_chain_id()
178        )
179    )]
180    async fn handle_message(&mut self, message: RpcMessage) -> Option<RpcMessage> {
181        let reply = match message {
182            RpcMessage::BlockProposal(message) => {
183                match self.server.state.handle_block_proposal(*message).await {
184                    Ok((info, actions)) => {
185                        // Cross-shard requests
186                        self.handle_network_actions(actions);
187                        // Response
188                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
189                    }
190                    Err(error) => {
191                        self.log_error(&error, "Failed to handle block proposal");
192                        Err(error.into())
193                    }
194                }
195            }
196            RpcMessage::LiteCertificate(request) => {
197                let (sender, receiver) = request
198                    .wait_for_outgoing_messages
199                    .then(oneshot::channel)
200                    .unzip();
201                match Box::pin(
202                    self.server
203                        .state
204                        .handle_lite_certificate(request.certificate, sender),
205                )
206                .await
207                {
208                    Ok((info, actions)) => {
209                        // Cross-shard requests
210                        self.handle_network_actions(actions);
211                        if let Some(receiver) = receiver {
212                            if let Err(e) = receiver.await {
213                                error!("Failed to wait for message delivery: {e}");
214                            }
215                        }
216                        // Response
217                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
218                    }
219                    Err(error) => {
220                        let nickname = self.server.state.nickname();
221                        if let WorkerError::MissingCertificateValue = &error {
222                            debug!(nickname, %error, "Failed to handle lite certificate");
223                        } else {
224                            error!(nickname, %error, "Failed to handle lite certificate");
225                        }
226                        Err(error.into())
227                    }
228                }
229            }
230            RpcMessage::TimeoutCertificate(request) => {
231                match self
232                    .server
233                    .state
234                    .handle_timeout_certificate(request.certificate)
235                    .await
236                {
237                    Ok((info, actions)) => {
238                        // Cross-shard requests
239                        self.handle_network_actions(actions);
240                        // Response
241                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
242                    }
243                    Err(error) => {
244                        self.log_error(&error, "Failed to handle timeout certificate");
245                        Err(error.into())
246                    }
247                }
248            }
249            RpcMessage::ValidatedCertificate(request) => {
250                match self
251                    .server
252                    .state
253                    .handle_validated_certificate(request.certificate)
254                    .await
255                {
256                    Ok((info, actions)) => {
257                        // Cross-shard requests
258                        self.handle_network_actions(actions);
259                        // Response
260                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
261                    }
262                    Err(error) => {
263                        self.log_error(&error, "Failed to handle validated certificate");
264                        Err(error.into())
265                    }
266                }
267            }
268            RpcMessage::ConfirmedCertificate(request) => {
269                let (sender, receiver) = request
270                    .wait_for_outgoing_messages
271                    .then(oneshot::channel)
272                    .unzip();
273                match self
274                    .server
275                    .state
276                    .handle_confirmed_certificate(request.certificate, sender)
277                    .await
278                {
279                    Ok((info, actions)) => {
280                        // Cross-shard requests
281                        self.handle_network_actions(actions);
282                        if let Some(receiver) = receiver {
283                            if let Err(e) = receiver.await {
284                                error!("Failed to wait for message delivery: {e}");
285                            }
286                        }
287                        // Response
288                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
289                    }
290                    Err(error) => {
291                        self.log_error(&error, "Failed to handle confirmed certificate");
292                        Err(error.into())
293                    }
294                }
295            }
296            RpcMessage::ChainInfoQuery(message) => {
297                match self.server.state.handle_chain_info_query(*message).await {
298                    Ok((info, actions)) => {
299                        // Cross-shard requests
300                        self.handle_network_actions(actions);
301                        // Response
302                        Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
303                    }
304                    Err(error) => {
305                        self.log_error(&error, "Failed to handle chain info query");
306                        Err(error.into())
307                    }
308                }
309            }
310            RpcMessage::CrossChainRequest(request) => {
311                match self.server.state.handle_cross_chain_request(*request).await {
312                    Ok(actions) => {
313                        self.handle_network_actions(actions);
314                    }
315                    Err(error) => {
316                        self.log_error(&error, "Failed to handle cross-chain request");
317                    }
318                }
319                // No user to respond to.
320                Ok(None)
321            }
322            RpcMessage::DownloadPendingBlob(request) => {
323                let (chain_id, blob_id) = *request;
324                match self
325                    .server
326                    .state
327                    .download_pending_blob(chain_id, blob_id)
328                    .await
329                {
330                    Ok(blob) => Ok(Some(RpcMessage::DownloadPendingBlobResponse(Box::new(
331                        blob.into(),
332                    )))),
333                    Err(error) => {
334                        self.log_error(&error, "Failed to handle pending blob request");
335                        Err(error.into())
336                    }
337                }
338            }
339            RpcMessage::HandlePendingBlob(request) => {
340                let (chain_id, blob_content) = *request;
341                match self
342                    .server
343                    .state
344                    .handle_pending_blob(chain_id, Blob::new(blob_content))
345                    .await
346                {
347                    Ok(info) => Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info)))),
348                    Err(error) => {
349                        self.log_error(&error, "Failed to handle pending blob");
350                        Err(error.into())
351                    }
352                }
353            }
354
355            RpcMessage::VersionInfoQuery => {
356                Ok(Some(RpcMessage::VersionInfoResponse(Box::default())))
357            }
358
359            RpcMessage::Vote(_)
360            | RpcMessage::Error(_)
361            | RpcMessage::ChainInfoResponse(_)
362            | RpcMessage::VersionInfoResponse(_)
363            | RpcMessage::NetworkDescriptionQuery
364            | RpcMessage::NetworkDescriptionResponse(_)
365            | RpcMessage::ShardInfoQuery(_)
366            | RpcMessage::ShardInfoResponse(_)
367            | RpcMessage::DownloadBlob(_)
368            | RpcMessage::DownloadBlobResponse(_)
369            | RpcMessage::DownloadPendingBlobResponse(_)
370            | RpcMessage::DownloadConfirmedBlock(_)
371            | RpcMessage::DownloadConfirmedBlockResponse(_)
372            | RpcMessage::BlobLastUsedBy(_)
373            | RpcMessage::BlobLastUsedByResponse(_)
374            | RpcMessage::BlobLastUsedByCertificate(_)
375            | RpcMessage::BlobLastUsedByCertificateResponse(_)
376            | RpcMessage::MissingBlobIds(_)
377            | RpcMessage::MissingBlobIdsResponse(_)
378            | RpcMessage::DownloadCertificates(_)
379            | RpcMessage::DownloadCertificatesResponse(_)
380            | RpcMessage::UploadBlob(_)
381            | RpcMessage::UploadBlobResponse(_)
382            | RpcMessage::DownloadCertificatesByHeights(_, _)
383            | RpcMessage::DownloadCertificatesByHeightsResponse(_) => {
384                Err(NodeError::UnexpectedMessage)
385            }
386        };
387
388        self.server.packets_processed += 1;
389        if self.server.packets_processed % 5000 == 0 {
390            debug!(
391                "[{}] {}:{} (shard {}) has processed {} packets",
392                self.server.state.nickname(),
393                self.server.host,
394                self.server.port,
395                self.server.shard_id,
396                self.server.packets_processed
397            );
398        }
399
400        match reply {
401            Ok(x) => x,
402            Err(error) => {
403                // TODO(#459): Make it a warning or an error again.
404                debug!(
405                    "[{}] User query failed: {}",
406                    self.server.state.nickname(),
407                    error
408                );
409                self.server.user_errors += 1;
410                Some(error.into())
411            }
412        }
413    }
414}
415
416impl<S> RunningServerState<S>
417where
418    S: Storage + Send,
419{
420    fn handle_network_actions(&mut self, actions: NetworkActions) {
421        for request in actions.cross_chain_requests {
422            let shard_id = self.server.network.get_shard_id(request.target_chain_id());
423            debug!(
424                "[{}] Scheduling cross-chain query: {} -> {}",
425                self.server.state.nickname(),
426                self.server.shard_id,
427                shard_id
428            );
429            if let Err(error) = self.cross_chain_sender.try_send((request, shard_id)) {
430                error!(%error, "dropping cross-chain request");
431                break;
432            }
433        }
434    }
435
436    fn log_error(&self, error: &WorkerError, context: &str) {
437        let nickname = self.server.state.nickname();
438        if error.is_local() {
439            error!(nickname, %error, "{}", context);
440        } else {
441            debug!(nickname, %error, "{}", context);
442        }
443    }
444}