1use futures::{sink::SinkExt, stream::StreamExt};
6use linera_base::{
7 crypto::CryptoHash,
8 data_types::{BlobContent, BlockHeight, NetworkDescription},
9 identifiers::{BlobId, ChainId},
10 time::{timer, Duration},
11};
12use linera_chain::{
13 data_types::BlockProposal,
14 types::{
15 ConfirmedBlockCertificate, LiteCertificate, TimeoutCertificate, ValidatedBlockCertificate,
16 },
17};
18use linera_core::{
19 data_types::{ChainInfoQuery, ChainInfoResponse},
20 node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
21};
22use linera_version::VersionInfo;
23
24use super::{codec, transport::TransportProtocol};
25use crate::{
26 config::ValidatorPublicNetworkPreConfig, HandleConfirmedCertificateRequest,
27 HandleLiteCertRequest, HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
28 RpcMessage,
29};
30
31#[derive(Clone)]
32pub struct SimpleClient {
33 network: ValidatorPublicNetworkPreConfig<TransportProtocol>,
34 send_timeout: Duration,
35 recv_timeout: Duration,
36}
37
38impl SimpleClient {
39 pub(crate) fn new(
40 network: ValidatorPublicNetworkPreConfig<TransportProtocol>,
41 send_timeout: Duration,
42 recv_timeout: Duration,
43 ) -> Self {
44 Self {
45 network,
46 send_timeout,
47 recv_timeout,
48 }
49 }
50
51 async fn send_recv_internal(&self, message: RpcMessage) -> Result<RpcMessage, codec::Error> {
52 let address = format!("{}:{}", self.network.host, self.network.port);
53 let mut stream = self.network.protocol.connect(address).await?;
54 timer::timeout(self.send_timeout, stream.send(message))
56 .await
57 .map_err(|timeout| codec::Error::IoError(timeout.into()))??;
58 timer::timeout(self.recv_timeout, stream.next())
60 .await
61 .map_err(|timeout| codec::Error::IoError(timeout.into()))?
62 .transpose()?
63 .ok_or_else(|| codec::Error::IoError(std::io::ErrorKind::UnexpectedEof.into()))
64 }
65
66 async fn query<Response>(&self, query: RpcMessage) -> Result<Response, Response::Error>
67 where
68 Response: TryFrom<RpcMessage>,
69 Response::Error: From<codec::Error>,
70 {
71 self.send_recv_internal(query).await?.try_into()
72 }
73}
74
75impl ValidatorNode for SimpleClient {
76 type NotificationStream = NotificationStream;
77
78 fn address(&self) -> String {
79 format!(
80 "{}://{}:{}",
81 self.network.protocol, self.network.host, self.network.port
82 )
83 }
84
85 async fn handle_block_proposal(
87 &self,
88 proposal: BlockProposal,
89 ) -> Result<ChainInfoResponse, NodeError> {
90 let request = RpcMessage::BlockProposal(Box::new(proposal));
91 self.query(request).await
92 }
93
94 async fn handle_lite_certificate(
96 &self,
97 certificate: LiteCertificate<'_>,
98 delivery: CrossChainMessageDelivery,
99 ) -> Result<ChainInfoResponse, NodeError> {
100 let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
101 let request = RpcMessage::LiteCertificate(Box::new(HandleLiteCertRequest {
102 certificate: certificate.cloned(),
103 wait_for_outgoing_messages,
104 }));
105 self.query(request).await
106 }
107
108 async fn handle_validated_certificate(
110 &self,
111 certificate: ValidatedBlockCertificate,
112 ) -> Result<ChainInfoResponse, NodeError> {
113 let request = HandleValidatedCertificateRequest { certificate };
114 let request = RpcMessage::ValidatedCertificate(Box::new(request));
115 self.query(request).await
116 }
117
118 async fn handle_confirmed_certificate(
120 &self,
121 certificate: ConfirmedBlockCertificate,
122 delivery: CrossChainMessageDelivery,
123 ) -> Result<ChainInfoResponse, NodeError> {
124 let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
125 let request = HandleConfirmedCertificateRequest {
126 certificate,
127 wait_for_outgoing_messages,
128 };
129 let request = RpcMessage::ConfirmedCertificate(Box::new(request));
130 self.query(request).await
131 }
132
133 async fn handle_timeout_certificate(
135 &self,
136 certificate: TimeoutCertificate,
137 ) -> Result<ChainInfoResponse, NodeError> {
138 let request = HandleTimeoutCertificateRequest { certificate };
139 let request = RpcMessage::TimeoutCertificate(Box::new(request));
140 self.query(request).await
141 }
142
143 async fn handle_chain_info_query(
145 &self,
146 query: ChainInfoQuery,
147 ) -> Result<ChainInfoResponse, NodeError> {
148 let request = RpcMessage::ChainInfoQuery(Box::new(query));
149 self.query(request).await
150 }
151
152 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
153 let mut stream = self
154 .network
155 .protocol
156 .connect((self.network.host.clone(), self.network.port))
157 .await
158 .map_err(|e| NodeError::ClientIoError {
159 error: e.to_string(),
160 })?;
161 timer::timeout(
163 self.send_timeout,
164 stream.send(RpcMessage::SubscribeNotifications(chains)),
165 )
166 .await
167 .map_err(|timeout| NodeError::ClientIoError {
168 error: timeout.to_string(),
169 })?
170 .map_err(|e| NodeError::ClientIoError {
171 error: e.to_string(),
172 })?;
173 let notification_stream = stream.filter_map(|result| async {
175 match result {
176 Ok(RpcMessage::Notification(notification)) => Some(*notification),
177 _ => None,
178 }
179 });
180 Ok(Box::pin(notification_stream) as NotificationStream)
181 }
182
183 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
184 self.query(RpcMessage::VersionInfoQuery).await
185 }
186
187 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
188 self.query(RpcMessage::NetworkDescriptionQuery).await
189 }
190
191 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
192 self.query(RpcMessage::UploadBlob(Box::new(content))).await
193 }
194
195 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
196 self.query(RpcMessage::DownloadBlob(Box::new(blob_id)))
197 .await
198 }
199
200 async fn download_pending_blob(
201 &self,
202 chain_id: ChainId,
203 blob_id: BlobId,
204 ) -> Result<BlobContent, NodeError> {
205 self.query(RpcMessage::DownloadPendingBlob(Box::new((
206 chain_id, blob_id,
207 ))))
208 .await
209 }
210
211 async fn handle_pending_blob(
212 &self,
213 chain_id: ChainId,
214 blob: BlobContent,
215 ) -> Result<ChainInfoResponse, NodeError> {
216 self.query(RpcMessage::HandlePendingBlob(Box::new((chain_id, blob))))
217 .await
218 }
219
220 async fn download_certificate(
221 &self,
222 hash: CryptoHash,
223 ) -> Result<ConfirmedBlockCertificate, NodeError> {
224 Ok(self
225 .download_certificates(vec![hash])
226 .await?
227 .into_iter()
228 .next()
229 .unwrap()) }
231
232 async fn download_certificates(
233 &self,
234 hashes: Vec<CryptoHash>,
235 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
236 let certificates = self
237 .query::<Vec<ConfirmedBlockCertificate>>(RpcMessage::DownloadCertificates(
238 hashes.clone(),
239 ))
240 .await?;
241
242 if certificates.len() != hashes.len() {
243 let missing_hashes: Vec<CryptoHash> = hashes
244 .into_iter()
245 .filter(|hash| !certificates.iter().any(|cert| cert.hash() == *hash))
246 .collect();
247 Err(NodeError::MissingCertificates(missing_hashes))
248 } else {
249 Ok(certificates)
250 }
251 }
252
253 async fn download_certificates_by_heights(
254 &self,
255 chain_id: ChainId,
256 heights: Vec<BlockHeight>,
257 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
258 let expected_count = heights.len();
259 let certificates: Vec<ConfirmedBlockCertificate> = self
260 .query(RpcMessage::DownloadCertificatesByHeights(
261 chain_id,
262 heights.clone(),
263 ))
264 .await?;
265
266 if certificates.len() < expected_count {
267 return Err(NodeError::MissingCertificatesByHeights { chain_id, heights });
268 }
269 Ok(certificates)
270 }
271
272 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
273 self.query(RpcMessage::BlobLastUsedBy(Box::new(blob_id)))
274 .await
275 }
276
277 async fn blob_last_used_by_certificate(
278 &self,
279 blob_id: BlobId,
280 ) -> Result<ConfirmedBlockCertificate, NodeError> {
281 self.query::<ConfirmedBlockCertificate>(RpcMessage::BlobLastUsedByCertificate(Box::new(
282 blob_id,
283 )))
284 .await
285 }
286
287 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
288 self.query(RpcMessage::MissingBlobIds(blob_ids)).await
289 }
290
291 async fn get_shard_info(
292 &self,
293 chain_id: ChainId,
294 ) -> Result<linera_core::data_types::ShardInfo, NodeError> {
295 let rpc_shard_info: crate::message::ShardInfo =
296 self.query(RpcMessage::ShardInfoQuery(chain_id)).await?;
297 Ok(linera_core::data_types::ShardInfo {
298 shard_id: rpc_shard_info.shard_id,
299 total_shards: rpc_shard_info.total_shards,
300 })
301 }
302}