1use std::future::Future;
6
7use futures::{sink::SinkExt, stream::StreamExt};
8use linera_base::{
9 crypto::CryptoHash,
10 data_types::{BlobContent, NetworkDescription},
11 identifiers::{BlobId, ChainId},
12 time::{timer, Duration},
13};
14use linera_chain::{
15 data_types::BlockProposal,
16 types::{
17 ConfirmedBlockCertificate, LiteCertificate, TimeoutCertificate, ValidatedBlockCertificate,
18 },
19};
20use linera_core::{
21 data_types::{ChainInfoQuery, ChainInfoResponse},
22 node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
23};
24use linera_version::VersionInfo;
25
26use super::{codec, transport::TransportProtocol};
27use crate::{
28 config::ValidatorPublicNetworkPreConfig, HandleConfirmedCertificateRequest,
29 HandleLiteCertRequest, HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
30 RpcMessage,
31};
32
33#[derive(Clone)]
34pub struct SimpleClient {
35 network: ValidatorPublicNetworkPreConfig<TransportProtocol>,
36 send_timeout: Duration,
37 recv_timeout: Duration,
38}
39
40impl SimpleClient {
41 pub(crate) fn new(
42 network: ValidatorPublicNetworkPreConfig<TransportProtocol>,
43 send_timeout: Duration,
44 recv_timeout: Duration,
45 ) -> Self {
46 Self {
47 network,
48 send_timeout,
49 recv_timeout,
50 }
51 }
52
53 async fn send_recv_internal(&self, message: RpcMessage) -> Result<RpcMessage, codec::Error> {
54 let address = format!("{}:{}", self.network.host, self.network.port);
55 let mut stream = self.network.protocol.connect(address).await?;
56 timer::timeout(self.send_timeout, stream.send(message))
58 .await
59 .map_err(|timeout| codec::Error::IoError(timeout.into()))??;
60 timer::timeout(self.recv_timeout, stream.next())
62 .await
63 .map_err(|timeout| codec::Error::IoError(timeout.into()))?
64 .transpose()?
65 .ok_or_else(|| codec::Error::IoError(std::io::ErrorKind::UnexpectedEof.into()))
66 }
67
68 async fn query<Response>(&self, query: RpcMessage) -> Result<Response, Response::Error>
69 where
70 Response: TryFrom<RpcMessage>,
71 Response::Error: From<codec::Error>,
72 {
73 self.send_recv_internal(query).await?.try_into()
74 }
75}
76
77impl ValidatorNode for SimpleClient {
78 type NotificationStream = NotificationStream;
79
80 async fn handle_block_proposal(
82 &self,
83 proposal: BlockProposal,
84 ) -> Result<ChainInfoResponse, NodeError> {
85 let request = RpcMessage::BlockProposal(Box::new(proposal));
86 self.query(request).await
87 }
88
89 async fn handle_lite_certificate(
91 &self,
92 certificate: LiteCertificate<'_>,
93 delivery: CrossChainMessageDelivery,
94 ) -> Result<ChainInfoResponse, NodeError> {
95 let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
96 let request = RpcMessage::LiteCertificate(Box::new(HandleLiteCertRequest {
97 certificate: certificate.cloned(),
98 wait_for_outgoing_messages,
99 }));
100 self.query(request).await
101 }
102
103 async fn handle_validated_certificate(
105 &self,
106 certificate: ValidatedBlockCertificate,
107 ) -> Result<ChainInfoResponse, NodeError> {
108 let request = HandleValidatedCertificateRequest { certificate };
109 let request = RpcMessage::ValidatedCertificate(Box::new(request));
110 self.query(request).await
111 }
112
113 async fn handle_confirmed_certificate(
115 &self,
116 certificate: ConfirmedBlockCertificate,
117 delivery: CrossChainMessageDelivery,
118 ) -> Result<ChainInfoResponse, NodeError> {
119 let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
120 let request = HandleConfirmedCertificateRequest {
121 certificate,
122 wait_for_outgoing_messages,
123 };
124 let request = RpcMessage::ConfirmedCertificate(Box::new(request));
125 self.query(request).await
126 }
127
128 async fn handle_timeout_certificate(
130 &self,
131 certificate: TimeoutCertificate,
132 ) -> Result<ChainInfoResponse, NodeError> {
133 let request = HandleTimeoutCertificateRequest { certificate };
134 let request = RpcMessage::TimeoutCertificate(Box::new(request));
135 self.query(request).await
136 }
137
138 async fn handle_chain_info_query(
140 &self,
141 query: ChainInfoQuery,
142 ) -> Result<ChainInfoResponse, NodeError> {
143 let request = RpcMessage::ChainInfoQuery(Box::new(query));
144 self.query(request).await
145 }
146
147 fn subscribe(
148 &self,
149 _chains: Vec<ChainId>,
150 ) -> impl Future<Output = Result<NotificationStream, NodeError>> + Send {
151 let transport = self.network.protocol.to_string();
152 async { Err(NodeError::SubscriptionError { transport }) }
153 }
154
155 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
156 self.query(RpcMessage::VersionInfoQuery).await
157 }
158
159 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
160 self.query(RpcMessage::NetworkDescriptionQuery).await
161 }
162
163 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
164 self.query(RpcMessage::UploadBlob(Box::new(content))).await
165 }
166
167 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
168 self.query(RpcMessage::DownloadBlob(Box::new(blob_id)))
169 .await
170 }
171
172 async fn download_pending_blob(
173 &self,
174 chain_id: ChainId,
175 blob_id: BlobId,
176 ) -> Result<BlobContent, NodeError> {
177 self.query(RpcMessage::DownloadPendingBlob(Box::new((
178 chain_id, blob_id,
179 ))))
180 .await
181 }
182
183 async fn handle_pending_blob(
184 &self,
185 chain_id: ChainId,
186 blob: BlobContent,
187 ) -> Result<ChainInfoResponse, NodeError> {
188 self.query(RpcMessage::HandlePendingBlob(Box::new((chain_id, blob))))
189 .await
190 }
191
192 async fn download_certificate(
193 &self,
194 hash: CryptoHash,
195 ) -> Result<ConfirmedBlockCertificate, NodeError> {
196 Ok(self
197 .download_certificates(vec![hash])
198 .await?
199 .into_iter()
200 .next()
201 .unwrap()) }
203
204 async fn download_certificates(
205 &self,
206 hashes: Vec<CryptoHash>,
207 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
208 let certificates = self
209 .query::<Vec<ConfirmedBlockCertificate>>(RpcMessage::DownloadCertificates(
210 hashes.clone(),
211 ))
212 .await?;
213
214 if certificates.len() != hashes.len() {
215 let missing_hashes: Vec<CryptoHash> = hashes
216 .into_iter()
217 .filter(|hash| !certificates.iter().any(|cert| cert.hash() == *hash))
218 .collect();
219 Err(NodeError::MissingCertificates(missing_hashes))
220 } else {
221 Ok(certificates)
222 }
223 }
224
225 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
226 self.query(RpcMessage::BlobLastUsedBy(Box::new(blob_id)))
227 .await
228 }
229
230 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
231 self.query(RpcMessage::MissingBlobIds(blob_ids)).await
232 }
233}