1use std::future::Future;
6
7use futures::{sink::SinkExt, stream::StreamExt};
8use linera_base::{
9 crypto::CryptoHash,
10 data_types::{BlobContent, BlockHeight, NetworkDescription},
11 identifiers::{BlobId, ChainId},
12 time::{timer, Duration},
13};
14use linera_chain::{
15 data_types::BlockProposal,
16 types::{
17 ConfirmedBlockCertificate, LiteCertificate, TimeoutCertificate, ValidatedBlockCertificate,
18 },
19};
20use linera_core::{
21 data_types::{ChainInfoQuery, ChainInfoResponse},
22 node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
23};
24use linera_version::VersionInfo;
25
26use super::{codec, transport::TransportProtocol};
27use crate::{
28 config::ValidatorPublicNetworkPreConfig, HandleConfirmedCertificateRequest,
29 HandleLiteCertRequest, HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
30 RpcMessage,
31};
32
/// Client that talks to a single validator by exchanging [`RpcMessage`]s over
/// the configured [`TransportProtocol`].
///
/// Every request made through `send_recv_internal` opens its own connection to
/// `network.host:network.port`.
#[derive(Clone)]
pub struct SimpleClient {
    /// Host, port and transport protocol of the validator endpoint.
    network: ValidatorPublicNetworkPreConfig<TransportProtocol>,
    /// Upper bound on the time spent sending a request message.
    send_timeout: Duration,
    /// Upper bound on the time spent waiting for the response message.
    recv_timeout: Duration,
}
39
40impl SimpleClient {
41 pub(crate) fn new(
42 network: ValidatorPublicNetworkPreConfig<TransportProtocol>,
43 send_timeout: Duration,
44 recv_timeout: Duration,
45 ) -> Self {
46 Self {
47 network,
48 send_timeout,
49 recv_timeout,
50 }
51 }
52
53 async fn send_recv_internal(&self, message: RpcMessage) -> Result<RpcMessage, codec::Error> {
54 let address = format!("{}:{}", self.network.host, self.network.port);
55 let mut stream = self.network.protocol.connect(address).await?;
56 timer::timeout(self.send_timeout, stream.send(message))
58 .await
59 .map_err(|timeout| codec::Error::IoError(timeout.into()))??;
60 timer::timeout(self.recv_timeout, stream.next())
62 .await
63 .map_err(|timeout| codec::Error::IoError(timeout.into()))?
64 .transpose()?
65 .ok_or_else(|| codec::Error::IoError(std::io::ErrorKind::UnexpectedEof.into()))
66 }
67
68 async fn query<Response>(&self, query: RpcMessage) -> Result<Response, Response::Error>
69 where
70 Response: TryFrom<RpcMessage>,
71 Response::Error: From<codec::Error>,
72 {
73 self.send_recv_internal(query).await?.try_into()
74 }
75}
76
77impl ValidatorNode for SimpleClient {
78 type NotificationStream = NotificationStream;
79
80 fn address(&self) -> String {
81 format!(
82 "{}://{}:{}",
83 self.network.protocol, self.network.host, self.network.port
84 )
85 }
86
87 async fn handle_block_proposal(
89 &self,
90 proposal: BlockProposal,
91 ) -> Result<ChainInfoResponse, NodeError> {
92 let request = RpcMessage::BlockProposal(Box::new(proposal));
93 self.query(request).await
94 }
95
96 async fn handle_lite_certificate(
98 &self,
99 certificate: LiteCertificate<'_>,
100 delivery: CrossChainMessageDelivery,
101 ) -> Result<ChainInfoResponse, NodeError> {
102 let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
103 let request = RpcMessage::LiteCertificate(Box::new(HandleLiteCertRequest {
104 certificate: certificate.cloned(),
105 wait_for_outgoing_messages,
106 }));
107 self.query(request).await
108 }
109
110 async fn handle_validated_certificate(
112 &self,
113 certificate: ValidatedBlockCertificate,
114 ) -> Result<ChainInfoResponse, NodeError> {
115 let request = HandleValidatedCertificateRequest { certificate };
116 let request = RpcMessage::ValidatedCertificate(Box::new(request));
117 self.query(request).await
118 }
119
120 async fn handle_confirmed_certificate(
122 &self,
123 certificate: ConfirmedBlockCertificate,
124 delivery: CrossChainMessageDelivery,
125 ) -> Result<ChainInfoResponse, NodeError> {
126 let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
127 let request = HandleConfirmedCertificateRequest {
128 certificate,
129 wait_for_outgoing_messages,
130 };
131 let request = RpcMessage::ConfirmedCertificate(Box::new(request));
132 self.query(request).await
133 }
134
135 async fn handle_timeout_certificate(
137 &self,
138 certificate: TimeoutCertificate,
139 ) -> Result<ChainInfoResponse, NodeError> {
140 let request = HandleTimeoutCertificateRequest { certificate };
141 let request = RpcMessage::TimeoutCertificate(Box::new(request));
142 self.query(request).await
143 }
144
145 async fn handle_chain_info_query(
147 &self,
148 query: ChainInfoQuery,
149 ) -> Result<ChainInfoResponse, NodeError> {
150 let request = RpcMessage::ChainInfoQuery(Box::new(query));
151 self.query(request).await
152 }
153
154 fn subscribe(
155 &self,
156 _chains: Vec<ChainId>,
157 ) -> impl Future<Output = Result<NotificationStream, NodeError>> + Send {
158 let transport = self.network.protocol.to_string();
159 async { Err(NodeError::SubscriptionError { transport }) }
160 }
161
162 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
163 self.query(RpcMessage::VersionInfoQuery).await
164 }
165
166 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
167 self.query(RpcMessage::NetworkDescriptionQuery).await
168 }
169
170 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
171 self.query(RpcMessage::UploadBlob(Box::new(content))).await
172 }
173
174 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
175 self.query(RpcMessage::DownloadBlob(Box::new(blob_id)))
176 .await
177 }
178
179 async fn download_pending_blob(
180 &self,
181 chain_id: ChainId,
182 blob_id: BlobId,
183 ) -> Result<BlobContent, NodeError> {
184 self.query(RpcMessage::DownloadPendingBlob(Box::new((
185 chain_id, blob_id,
186 ))))
187 .await
188 }
189
190 async fn handle_pending_blob(
191 &self,
192 chain_id: ChainId,
193 blob: BlobContent,
194 ) -> Result<ChainInfoResponse, NodeError> {
195 self.query(RpcMessage::HandlePendingBlob(Box::new((chain_id, blob))))
196 .await
197 }
198
199 async fn download_certificate(
200 &self,
201 hash: CryptoHash,
202 ) -> Result<ConfirmedBlockCertificate, NodeError> {
203 Ok(self
204 .download_certificates(vec![hash])
205 .await?
206 .into_iter()
207 .next()
208 .unwrap()) }
210
211 async fn download_certificates(
212 &self,
213 hashes: Vec<CryptoHash>,
214 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
215 let certificates = self
216 .query::<Vec<ConfirmedBlockCertificate>>(RpcMessage::DownloadCertificates(
217 hashes.clone(),
218 ))
219 .await?;
220
221 if certificates.len() != hashes.len() {
222 let missing_hashes: Vec<CryptoHash> = hashes
223 .into_iter()
224 .filter(|hash| !certificates.iter().any(|cert| cert.hash() == *hash))
225 .collect();
226 Err(NodeError::MissingCertificates(missing_hashes))
227 } else {
228 Ok(certificates)
229 }
230 }
231
232 async fn download_certificates_by_heights(
233 &self,
234 chain_id: ChainId,
235 heights: Vec<BlockHeight>,
236 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
237 let expected_count = heights.len();
238 let certificates: Vec<ConfirmedBlockCertificate> = self
239 .query(RpcMessage::DownloadCertificatesByHeights(
240 chain_id,
241 heights.clone(),
242 ))
243 .await?;
244
245 if certificates.len() < expected_count {
246 return Err(NodeError::MissingCertificatesByHeights { chain_id, heights });
247 }
248 Ok(certificates)
249 }
250
251 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
252 self.query(RpcMessage::BlobLastUsedBy(Box::new(blob_id)))
253 .await
254 }
255
256 async fn blob_last_used_by_certificate(
257 &self,
258 blob_id: BlobId,
259 ) -> Result<ConfirmedBlockCertificate, NodeError> {
260 self.query::<ConfirmedBlockCertificate>(RpcMessage::BlobLastUsedByCertificate(Box::new(
261 blob_id,
262 )))
263 .await
264 }
265
266 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
267 self.query(RpcMessage::MissingBlobIds(blob_ids)).await
268 }
269
270 async fn get_shard_info(
271 &self,
272 chain_id: ChainId,
273 ) -> Result<linera_core::data_types::ShardInfo, NodeError> {
274 let rpc_shard_info: crate::message::ShardInfo =
275 self.query(RpcMessage::ShardInfoQuery(chain_id)).await?;
276 Ok(linera_core::data_types::ShardInfo {
277 shard_id: rpc_shard_info.shard_id,
278 total_shards: rpc_shard_info.total_shards,
279 })
280 }
281}