linera_rpc/simple/
client.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::future::Future;
6
7use futures::{sink::SinkExt, stream::StreamExt};
8use linera_base::{
9    crypto::CryptoHash,
10    data_types::{BlobContent, BlockHeight, NetworkDescription},
11    identifiers::{BlobId, ChainId},
12    time::{timer, Duration},
13};
14use linera_chain::{
15    data_types::BlockProposal,
16    types::{
17        ConfirmedBlockCertificate, LiteCertificate, TimeoutCertificate, ValidatedBlockCertificate,
18    },
19};
20use linera_core::{
21    data_types::{ChainInfoQuery, ChainInfoResponse},
22    node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
23};
24use linera_version::VersionInfo;
25
26use super::{codec, transport::TransportProtocol};
27use crate::{
28    config::ValidatorPublicNetworkPreConfig, HandleConfirmedCertificateRequest,
29    HandleLiteCertRequest, HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
30    RpcMessage,
31};
32
33#[derive(Clone)]
34pub struct SimpleClient {
35    network: ValidatorPublicNetworkPreConfig<TransportProtocol>,
36    send_timeout: Duration,
37    recv_timeout: Duration,
38}
39
40impl SimpleClient {
41    pub(crate) fn new(
42        network: ValidatorPublicNetworkPreConfig<TransportProtocol>,
43        send_timeout: Duration,
44        recv_timeout: Duration,
45    ) -> Self {
46        Self {
47            network,
48            send_timeout,
49            recv_timeout,
50        }
51    }
52
53    async fn send_recv_internal(&self, message: RpcMessage) -> Result<RpcMessage, codec::Error> {
54        let address = format!("{}:{}", self.network.host, self.network.port);
55        let mut stream = self.network.protocol.connect(address).await?;
56        // Send message
57        timer::timeout(self.send_timeout, stream.send(message))
58            .await
59            .map_err(|timeout| codec::Error::IoError(timeout.into()))??;
60        // Wait for reply
61        timer::timeout(self.recv_timeout, stream.next())
62            .await
63            .map_err(|timeout| codec::Error::IoError(timeout.into()))?
64            .transpose()?
65            .ok_or_else(|| codec::Error::IoError(std::io::ErrorKind::UnexpectedEof.into()))
66    }
67
68    async fn query<Response>(&self, query: RpcMessage) -> Result<Response, Response::Error>
69    where
70        Response: TryFrom<RpcMessage>,
71        Response::Error: From<codec::Error>,
72    {
73        self.send_recv_internal(query).await?.try_into()
74    }
75}
76
77impl ValidatorNode for SimpleClient {
78    type NotificationStream = NotificationStream;
79
80    fn address(&self) -> String {
81        format!(
82            "{}://{}:{}",
83            self.network.protocol, self.network.host, self.network.port
84        )
85    }
86
87    /// Initiates a new block.
88    async fn handle_block_proposal(
89        &self,
90        proposal: BlockProposal,
91    ) -> Result<ChainInfoResponse, NodeError> {
92        let request = RpcMessage::BlockProposal(Box::new(proposal));
93        self.query(request).await
94    }
95
96    /// Processes a lite certificate.
97    async fn handle_lite_certificate(
98        &self,
99        certificate: LiteCertificate<'_>,
100        delivery: CrossChainMessageDelivery,
101    ) -> Result<ChainInfoResponse, NodeError> {
102        let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
103        let request = RpcMessage::LiteCertificate(Box::new(HandleLiteCertRequest {
104            certificate: certificate.cloned(),
105            wait_for_outgoing_messages,
106        }));
107        self.query(request).await
108    }
109
110    /// Processes a validated certificate.
111    async fn handle_validated_certificate(
112        &self,
113        certificate: ValidatedBlockCertificate,
114    ) -> Result<ChainInfoResponse, NodeError> {
115        let request = HandleValidatedCertificateRequest { certificate };
116        let request = RpcMessage::ValidatedCertificate(Box::new(request));
117        self.query(request).await
118    }
119
120    /// Processes a confirmed certificate.
121    async fn handle_confirmed_certificate(
122        &self,
123        certificate: ConfirmedBlockCertificate,
124        delivery: CrossChainMessageDelivery,
125    ) -> Result<ChainInfoResponse, NodeError> {
126        let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
127        let request = HandleConfirmedCertificateRequest {
128            certificate,
129            wait_for_outgoing_messages,
130        };
131        let request = RpcMessage::ConfirmedCertificate(Box::new(request));
132        self.query(request).await
133    }
134
135    /// Processes a timeout certificate.
136    async fn handle_timeout_certificate(
137        &self,
138        certificate: TimeoutCertificate,
139    ) -> Result<ChainInfoResponse, NodeError> {
140        let request = HandleTimeoutCertificateRequest { certificate };
141        let request = RpcMessage::TimeoutCertificate(Box::new(request));
142        self.query(request).await
143    }
144
145    /// Handles information queries for this chain.
146    async fn handle_chain_info_query(
147        &self,
148        query: ChainInfoQuery,
149    ) -> Result<ChainInfoResponse, NodeError> {
150        let request = RpcMessage::ChainInfoQuery(Box::new(query));
151        self.query(request).await
152    }
153
154    fn subscribe(
155        &self,
156        _chains: Vec<ChainId>,
157    ) -> impl Future<Output = Result<NotificationStream, NodeError>> + Send {
158        let transport = self.network.protocol.to_string();
159        async { Err(NodeError::SubscriptionError { transport }) }
160    }
161
162    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
163        self.query(RpcMessage::VersionInfoQuery).await
164    }
165
166    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
167        self.query(RpcMessage::NetworkDescriptionQuery).await
168    }
169
170    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
171        self.query(RpcMessage::UploadBlob(Box::new(content))).await
172    }
173
174    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
175        self.query(RpcMessage::DownloadBlob(Box::new(blob_id)))
176            .await
177    }
178
179    async fn download_pending_blob(
180        &self,
181        chain_id: ChainId,
182        blob_id: BlobId,
183    ) -> Result<BlobContent, NodeError> {
184        self.query(RpcMessage::DownloadPendingBlob(Box::new((
185            chain_id, blob_id,
186        ))))
187        .await
188    }
189
190    async fn handle_pending_blob(
191        &self,
192        chain_id: ChainId,
193        blob: BlobContent,
194    ) -> Result<ChainInfoResponse, NodeError> {
195        self.query(RpcMessage::HandlePendingBlob(Box::new((chain_id, blob))))
196            .await
197    }
198
199    async fn download_certificate(
200        &self,
201        hash: CryptoHash,
202    ) -> Result<ConfirmedBlockCertificate, NodeError> {
203        Ok(self
204            .download_certificates(vec![hash])
205            .await?
206            .into_iter()
207            .next()
208            .unwrap()) // UNWRAP: We know there is exactly one certificate, otherwise we would have an error.
209    }
210
211    async fn download_certificates(
212        &self,
213        hashes: Vec<CryptoHash>,
214    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
215        let certificates = self
216            .query::<Vec<ConfirmedBlockCertificate>>(RpcMessage::DownloadCertificates(
217                hashes.clone(),
218            ))
219            .await?;
220
221        if certificates.len() != hashes.len() {
222            let missing_hashes: Vec<CryptoHash> = hashes
223                .into_iter()
224                .filter(|hash| !certificates.iter().any(|cert| cert.hash() == *hash))
225                .collect();
226            Err(NodeError::MissingCertificates(missing_hashes))
227        } else {
228            Ok(certificates)
229        }
230    }
231
232    async fn download_certificates_by_heights(
233        &self,
234        chain_id: ChainId,
235        heights: Vec<BlockHeight>,
236    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
237        let expected_count = heights.len();
238        let certificates: Vec<ConfirmedBlockCertificate> = self
239            .query(RpcMessage::DownloadCertificatesByHeights(
240                chain_id,
241                heights.clone(),
242            ))
243            .await?;
244
245        if certificates.len() < expected_count {
246            return Err(NodeError::MissingCertificatesByHeights { chain_id, heights });
247        }
248        Ok(certificates)
249    }
250
251    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
252        self.query(RpcMessage::BlobLastUsedBy(Box::new(blob_id)))
253            .await
254    }
255
256    async fn blob_last_used_by_certificate(
257        &self,
258        blob_id: BlobId,
259    ) -> Result<ConfirmedBlockCertificate, NodeError> {
260        self.query::<ConfirmedBlockCertificate>(RpcMessage::BlobLastUsedByCertificate(Box::new(
261            blob_id,
262        )))
263        .await
264    }
265
266    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
267        self.query(RpcMessage::MissingBlobIds(blob_ids)).await
268    }
269
270    async fn get_shard_info(
271        &self,
272        chain_id: ChainId,
273    ) -> Result<linera_core::data_types::ShardInfo, NodeError> {
274        let rpc_shard_info: crate::message::ShardInfo =
275            self.query(RpcMessage::ShardInfoQuery(chain_id)).await?;
276        Ok(linera_core::data_types::ShardInfo {
277            shard_id: rpc_shard_info.shard_id,
278            total_shards: rpc_shard_info.total_shards,
279        })
280    }
281}