linera_rpc/simple/
client.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::future::Future;
6
7use futures::{sink::SinkExt, stream::StreamExt};
8use linera_base::{
9    crypto::CryptoHash,
10    data_types::{BlobContent, NetworkDescription},
11    identifiers::{BlobId, ChainId},
12    time::{timer, Duration},
13};
14use linera_chain::{
15    data_types::BlockProposal,
16    types::{
17        ConfirmedBlockCertificate, LiteCertificate, TimeoutCertificate, ValidatedBlockCertificate,
18    },
19};
20use linera_core::{
21    data_types::{ChainInfoQuery, ChainInfoResponse},
22    node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
23};
24use linera_version::VersionInfo;
25
26use super::{codec, transport::TransportProtocol};
27use crate::{
28    config::ValidatorPublicNetworkPreConfig, HandleConfirmedCertificateRequest,
29    HandleLiteCertRequest, HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
30    RpcMessage,
31};
32
33#[derive(Clone)]
34pub struct SimpleClient {
35    network: ValidatorPublicNetworkPreConfig<TransportProtocol>,
36    send_timeout: Duration,
37    recv_timeout: Duration,
38}
39
40impl SimpleClient {
41    pub(crate) fn new(
42        network: ValidatorPublicNetworkPreConfig<TransportProtocol>,
43        send_timeout: Duration,
44        recv_timeout: Duration,
45    ) -> Self {
46        Self {
47            network,
48            send_timeout,
49            recv_timeout,
50        }
51    }
52
53    async fn send_recv_internal(&self, message: RpcMessage) -> Result<RpcMessage, codec::Error> {
54        let address = format!("{}:{}", self.network.host, self.network.port);
55        let mut stream = self.network.protocol.connect(address).await?;
56        // Send message
57        timer::timeout(self.send_timeout, stream.send(message))
58            .await
59            .map_err(|timeout| codec::Error::IoError(timeout.into()))??;
60        // Wait for reply
61        timer::timeout(self.recv_timeout, stream.next())
62            .await
63            .map_err(|timeout| codec::Error::IoError(timeout.into()))?
64            .transpose()?
65            .ok_or_else(|| codec::Error::IoError(std::io::ErrorKind::UnexpectedEof.into()))
66    }
67
68    async fn query<Response>(&self, query: RpcMessage) -> Result<Response, Response::Error>
69    where
70        Response: TryFrom<RpcMessage>,
71        Response::Error: From<codec::Error>,
72    {
73        self.send_recv_internal(query).await?.try_into()
74    }
75}
76
77impl ValidatorNode for SimpleClient {
78    type NotificationStream = NotificationStream;
79
80    /// Initiates a new block.
81    async fn handle_block_proposal(
82        &self,
83        proposal: BlockProposal,
84    ) -> Result<ChainInfoResponse, NodeError> {
85        let request = RpcMessage::BlockProposal(Box::new(proposal));
86        self.query(request).await
87    }
88
89    /// Processes a lite certificate.
90    async fn handle_lite_certificate(
91        &self,
92        certificate: LiteCertificate<'_>,
93        delivery: CrossChainMessageDelivery,
94    ) -> Result<ChainInfoResponse, NodeError> {
95        let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
96        let request = RpcMessage::LiteCertificate(Box::new(HandleLiteCertRequest {
97            certificate: certificate.cloned(),
98            wait_for_outgoing_messages,
99        }));
100        self.query(request).await
101    }
102
103    /// Processes a validated certificate.
104    async fn handle_validated_certificate(
105        &self,
106        certificate: ValidatedBlockCertificate,
107    ) -> Result<ChainInfoResponse, NodeError> {
108        let request = HandleValidatedCertificateRequest { certificate };
109        let request = RpcMessage::ValidatedCertificate(Box::new(request));
110        self.query(request).await
111    }
112
113    /// Processes a confirmed certificate.
114    async fn handle_confirmed_certificate(
115        &self,
116        certificate: ConfirmedBlockCertificate,
117        delivery: CrossChainMessageDelivery,
118    ) -> Result<ChainInfoResponse, NodeError> {
119        let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
120        let request = HandleConfirmedCertificateRequest {
121            certificate,
122            wait_for_outgoing_messages,
123        };
124        let request = RpcMessage::ConfirmedCertificate(Box::new(request));
125        self.query(request).await
126    }
127
128    /// Processes a timeout certificate.
129    async fn handle_timeout_certificate(
130        &self,
131        certificate: TimeoutCertificate,
132    ) -> Result<ChainInfoResponse, NodeError> {
133        let request = HandleTimeoutCertificateRequest { certificate };
134        let request = RpcMessage::TimeoutCertificate(Box::new(request));
135        self.query(request).await
136    }
137
138    /// Handles information queries for this chain.
139    async fn handle_chain_info_query(
140        &self,
141        query: ChainInfoQuery,
142    ) -> Result<ChainInfoResponse, NodeError> {
143        let request = RpcMessage::ChainInfoQuery(Box::new(query));
144        self.query(request).await
145    }
146
147    fn subscribe(
148        &self,
149        _chains: Vec<ChainId>,
150    ) -> impl Future<Output = Result<NotificationStream, NodeError>> + Send {
151        let transport = self.network.protocol.to_string();
152        async { Err(NodeError::SubscriptionError { transport }) }
153    }
154
155    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
156        self.query(RpcMessage::VersionInfoQuery).await
157    }
158
159    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
160        self.query(RpcMessage::NetworkDescriptionQuery).await
161    }
162
163    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
164        self.query(RpcMessage::UploadBlob(Box::new(content))).await
165    }
166
167    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
168        self.query(RpcMessage::DownloadBlob(Box::new(blob_id)))
169            .await
170    }
171
172    async fn download_pending_blob(
173        &self,
174        chain_id: ChainId,
175        blob_id: BlobId,
176    ) -> Result<BlobContent, NodeError> {
177        self.query(RpcMessage::DownloadPendingBlob(Box::new((
178            chain_id, blob_id,
179        ))))
180        .await
181    }
182
183    async fn handle_pending_blob(
184        &self,
185        chain_id: ChainId,
186        blob: BlobContent,
187    ) -> Result<ChainInfoResponse, NodeError> {
188        self.query(RpcMessage::HandlePendingBlob(Box::new((chain_id, blob))))
189            .await
190    }
191
192    async fn download_certificate(
193        &self,
194        hash: CryptoHash,
195    ) -> Result<ConfirmedBlockCertificate, NodeError> {
196        Ok(self
197            .download_certificates(vec![hash])
198            .await?
199            .into_iter()
200            .next()
201            .unwrap()) // UNWRAP: We know there is exactly one certificate, otherwise we would have an error.
202    }
203
204    async fn download_certificates(
205        &self,
206        hashes: Vec<CryptoHash>,
207    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
208        let certificates = self
209            .query::<Vec<ConfirmedBlockCertificate>>(RpcMessage::DownloadCertificates(
210                hashes.clone(),
211            ))
212            .await?;
213
214        if certificates.len() != hashes.len() {
215            let missing_hashes: Vec<CryptoHash> = hashes
216                .into_iter()
217                .filter(|hash| !certificates.iter().any(|cert| cert.hash() == *hash))
218                .collect();
219            Err(NodeError::MissingCertificates(missing_hashes))
220        } else {
221            Ok(certificates)
222        }
223    }
224
225    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
226        self.query(RpcMessage::BlobLastUsedBy(Box::new(blob_id)))
227            .await
228    }
229
230    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
231        self.query(RpcMessage::MissingBlobIds(blob_ids)).await
232    }
233}