linera_rpc/simple/
client.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use futures::{sink::SinkExt, stream::StreamExt};
6use linera_base::{
7    crypto::CryptoHash,
8    data_types::{BlobContent, BlockHeight, NetworkDescription},
9    identifiers::{BlobId, ChainId},
10    time::{timer, Duration},
11};
12use linera_chain::{
13    data_types::BlockProposal,
14    types::{
15        ConfirmedBlockCertificate, LiteCertificate, TimeoutCertificate, ValidatedBlockCertificate,
16    },
17};
18use linera_core::{
19    data_types::{ChainInfoQuery, ChainInfoResponse},
20    node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
21};
22use linera_version::VersionInfo;
23
24use super::{codec, transport::TransportProtocol};
25use crate::{
26    config::ValidatorPublicNetworkPreConfig, HandleConfirmedCertificateRequest,
27    HandleLiteCertRequest, HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
28    RpcMessage,
29};
30
/// A client for a single validator endpoint; it implements [`ValidatorNode`].
///
/// Every request/response exchange, as well as every subscription, opens its own
/// connection to `network.host:network.port` through `network.protocol`.
#[derive(Clone)]
pub struct SimpleClient {
    /// Protocol, host and port of the validator endpoint to connect to.
    network: ValidatorPublicNetworkPreConfig<TransportProtocol>,
    /// Upper bound on the time spent sending one message on a connection.
    send_timeout: Duration,
    /// Upper bound on the time spent waiting for the reply to a request.
    recv_timeout: Duration,
}
37
38impl SimpleClient {
39    pub(crate) fn new(
40        network: ValidatorPublicNetworkPreConfig<TransportProtocol>,
41        send_timeout: Duration,
42        recv_timeout: Duration,
43    ) -> Self {
44        Self {
45            network,
46            send_timeout,
47            recv_timeout,
48        }
49    }
50
51    async fn send_recv_internal(&self, message: RpcMessage) -> Result<RpcMessage, codec::Error> {
52        let address = format!("{}:{}", self.network.host, self.network.port);
53        let mut stream = self.network.protocol.connect(address).await?;
54        // Send message
55        timer::timeout(self.send_timeout, stream.send(message))
56            .await
57            .map_err(|timeout| codec::Error::IoError(timeout.into()))??;
58        // Wait for reply
59        timer::timeout(self.recv_timeout, stream.next())
60            .await
61            .map_err(|timeout| codec::Error::IoError(timeout.into()))?
62            .transpose()?
63            .ok_or_else(|| codec::Error::IoError(std::io::ErrorKind::UnexpectedEof.into()))
64    }
65
66    async fn query<Response>(&self, query: RpcMessage) -> Result<Response, Response::Error>
67    where
68        Response: TryFrom<RpcMessage>,
69        Response::Error: From<codec::Error>,
70    {
71        self.send_recv_internal(query).await?.try_into()
72    }
73}
74
75impl ValidatorNode for SimpleClient {
76    type NotificationStream = NotificationStream;
77
78    fn address(&self) -> String {
79        format!(
80            "{}://{}:{}",
81            self.network.protocol, self.network.host, self.network.port
82        )
83    }
84
85    /// Initiates a new block.
86    async fn handle_block_proposal(
87        &self,
88        proposal: BlockProposal,
89    ) -> Result<ChainInfoResponse, NodeError> {
90        let request = RpcMessage::BlockProposal(Box::new(proposal));
91        self.query(request).await
92    }
93
94    /// Processes a lite certificate.
95    async fn handle_lite_certificate(
96        &self,
97        certificate: LiteCertificate<'_>,
98        delivery: CrossChainMessageDelivery,
99    ) -> Result<ChainInfoResponse, NodeError> {
100        let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
101        let request = RpcMessage::LiteCertificate(Box::new(HandleLiteCertRequest {
102            certificate: certificate.cloned(),
103            wait_for_outgoing_messages,
104        }));
105        self.query(request).await
106    }
107
108    /// Processes a validated certificate.
109    async fn handle_validated_certificate(
110        &self,
111        certificate: ValidatedBlockCertificate,
112    ) -> Result<ChainInfoResponse, NodeError> {
113        let request = HandleValidatedCertificateRequest { certificate };
114        let request = RpcMessage::ValidatedCertificate(Box::new(request));
115        self.query(request).await
116    }
117
118    /// Processes a confirmed certificate.
119    async fn handle_confirmed_certificate(
120        &self,
121        certificate: ConfirmedBlockCertificate,
122        delivery: CrossChainMessageDelivery,
123    ) -> Result<ChainInfoResponse, NodeError> {
124        let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
125        let request = HandleConfirmedCertificateRequest {
126            certificate,
127            wait_for_outgoing_messages,
128        };
129        let request = RpcMessage::ConfirmedCertificate(Box::new(request));
130        self.query(request).await
131    }
132
133    /// Processes a timeout certificate.
134    async fn handle_timeout_certificate(
135        &self,
136        certificate: TimeoutCertificate,
137    ) -> Result<ChainInfoResponse, NodeError> {
138        let request = HandleTimeoutCertificateRequest { certificate };
139        let request = RpcMessage::TimeoutCertificate(Box::new(request));
140        self.query(request).await
141    }
142
143    /// Handles information queries for this chain.
144    async fn handle_chain_info_query(
145        &self,
146        query: ChainInfoQuery,
147    ) -> Result<ChainInfoResponse, NodeError> {
148        let request = RpcMessage::ChainInfoQuery(Box::new(query));
149        self.query(request).await
150    }
151
152    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
153        let mut stream = self
154            .network
155            .protocol
156            .connect((self.network.host.clone(), self.network.port))
157            .await
158            .map_err(|e| NodeError::ClientIoError {
159                error: e.to_string(),
160            })?;
161        // Send subscription request
162        timer::timeout(
163            self.send_timeout,
164            stream.send(RpcMessage::SubscribeNotifications(chains)),
165        )
166        .await
167        .map_err(|timeout| NodeError::ClientIoError {
168            error: timeout.to_string(),
169        })?
170        .map_err(|e| NodeError::ClientIoError {
171            error: e.to_string(),
172        })?;
173        // Return a stream that reads notifications from the connection
174        let notification_stream = stream.filter_map(|result| async {
175            match result {
176                Ok(RpcMessage::Notification(notification)) => Some(*notification),
177                _ => None,
178            }
179        });
180        Ok(Box::pin(notification_stream) as NotificationStream)
181    }
182
183    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
184        self.query(RpcMessage::VersionInfoQuery).await
185    }
186
187    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
188        self.query(RpcMessage::NetworkDescriptionQuery).await
189    }
190
191    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
192        self.query(RpcMessage::UploadBlob(Box::new(content))).await
193    }
194
195    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
196        self.query(RpcMessage::DownloadBlob(Box::new(blob_id)))
197            .await
198    }
199
200    async fn download_pending_blob(
201        &self,
202        chain_id: ChainId,
203        blob_id: BlobId,
204    ) -> Result<BlobContent, NodeError> {
205        self.query(RpcMessage::DownloadPendingBlob(Box::new((
206            chain_id, blob_id,
207        ))))
208        .await
209    }
210
211    async fn handle_pending_blob(
212        &self,
213        chain_id: ChainId,
214        blob: BlobContent,
215    ) -> Result<ChainInfoResponse, NodeError> {
216        self.query(RpcMessage::HandlePendingBlob(Box::new((chain_id, blob))))
217            .await
218    }
219
220    async fn download_certificate(
221        &self,
222        hash: CryptoHash,
223    ) -> Result<ConfirmedBlockCertificate, NodeError> {
224        Ok(self
225            .download_certificates(vec![hash])
226            .await?
227            .into_iter()
228            .next()
229            .unwrap()) // UNWRAP: We know there is exactly one certificate, otherwise we would have an error.
230    }
231
232    async fn download_certificates(
233        &self,
234        hashes: Vec<CryptoHash>,
235    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
236        let certificates = self
237            .query::<Vec<ConfirmedBlockCertificate>>(RpcMessage::DownloadCertificates(
238                hashes.clone(),
239            ))
240            .await?;
241
242        if certificates.len() != hashes.len() {
243            let missing_hashes: Vec<CryptoHash> = hashes
244                .into_iter()
245                .filter(|hash| !certificates.iter().any(|cert| cert.hash() == *hash))
246                .collect();
247            Err(NodeError::MissingCertificates(missing_hashes))
248        } else {
249            Ok(certificates)
250        }
251    }
252
253    async fn download_certificates_by_heights(
254        &self,
255        chain_id: ChainId,
256        heights: Vec<BlockHeight>,
257    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
258        let expected_count = heights.len();
259        let certificates: Vec<ConfirmedBlockCertificate> = self
260            .query(RpcMessage::DownloadCertificatesByHeights(
261                chain_id,
262                heights.clone(),
263            ))
264            .await?;
265
266        if certificates.len() < expected_count {
267            return Err(NodeError::MissingCertificatesByHeights { chain_id, heights });
268        }
269        Ok(certificates)
270    }
271
272    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
273        self.query(RpcMessage::BlobLastUsedBy(Box::new(blob_id)))
274            .await
275    }
276
277    async fn blob_last_used_by_certificate(
278        &self,
279        blob_id: BlobId,
280    ) -> Result<ConfirmedBlockCertificate, NodeError> {
281        self.query::<ConfirmedBlockCertificate>(RpcMessage::BlobLastUsedByCertificate(Box::new(
282            blob_id,
283        )))
284        .await
285    }
286
287    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
288        self.query(RpcMessage::MissingBlobIds(blob_ids)).await
289    }
290
291    async fn get_shard_info(
292        &self,
293        chain_id: ChainId,
294    ) -> Result<linera_core::data_types::ShardInfo, NodeError> {
295        let rpc_shard_info: crate::message::ShardInfo =
296            self.query(RpcMessage::ShardInfoQuery(chain_id)).await?;
297        Ok(linera_core::data_types::ShardInfo {
298            shard_id: rpc_shard_info.shard_id,
299            total_shards: rpc_shard_info.total_shards,
300        })
301    }
302}