1use std::{collections::HashSet, time::Duration};
5
6use custom_debug_derive::Debug;
7use futures::{future::try_join_all, stream::FuturesUnordered, StreamExt};
8use linera_base::{
9 crypto::{CryptoHash, ValidatorPublicKey},
10 data_types::{Blob, BlockHeight},
11 ensure,
12 identifiers::{BlobId, ChainId},
13};
14use linera_chain::{
15 data_types::BlockProposal,
16 types::{
17 CertificateValue, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate,
18 TimeoutCertificate, ValidatedBlockCertificate,
19 },
20};
21use rand::seq::SliceRandom as _;
22use tracing::{instrument, warn};
23
24use crate::{
25 data_types::{BlockHeightRange, ChainInfo, ChainInfoQuery, ChainInfoResponse},
26 node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
27};
28
29#[derive(Clone, Debug)]
31pub struct RemoteNode<N> {
32 pub public_key: ValidatorPublicKey,
33 #[debug(skip)]
34 pub node: N,
35}
36
37impl<N: ValidatorNode> RemoteNode<N> {
38 pub(crate) async fn handle_chain_info_query(
39 &self,
40 query: ChainInfoQuery,
41 ) -> Result<Box<ChainInfo>, NodeError> {
42 let chain_id = query.chain_id;
43 let response = self.node.handle_chain_info_query(query).await?;
44 self.check_and_return_info(response, chain_id)
45 }
46
47 #[instrument(level = "trace")]
48 pub(crate) async fn handle_block_proposal(
49 &self,
50 proposal: Box<BlockProposal>,
51 ) -> Result<Box<ChainInfo>, NodeError> {
52 let chain_id = proposal.content.block.chain_id;
53 let response = self.node.handle_block_proposal(*proposal).await?;
54 self.check_and_return_info(response, chain_id)
55 }
56
57 pub(crate) async fn handle_timeout_certificate(
58 &self,
59 certificate: TimeoutCertificate,
60 ) -> Result<Box<ChainInfo>, NodeError> {
61 let chain_id = certificate.inner().chain_id();
62 let response = self.node.handle_timeout_certificate(certificate).await?;
63 self.check_and_return_info(response, chain_id)
64 }
65
66 pub(crate) async fn handle_confirmed_certificate(
67 &self,
68 certificate: ConfirmedBlockCertificate,
69 delivery: CrossChainMessageDelivery,
70 ) -> Result<Box<ChainInfo>, NodeError> {
71 let chain_id = certificate.inner().chain_id();
72 let response = self
73 .node
74 .handle_confirmed_certificate(certificate, delivery)
75 .await?;
76 self.check_and_return_info(response, chain_id)
77 }
78
79 pub(crate) async fn handle_validated_certificate(
80 &self,
81 certificate: ValidatedBlockCertificate,
82 ) -> Result<Box<ChainInfo>, NodeError> {
83 let chain_id = certificate.inner().chain_id();
84 let response = self.node.handle_validated_certificate(certificate).await?;
85 self.check_and_return_info(response, chain_id)
86 }
87
88 #[instrument(level = "trace")]
89 pub(crate) async fn handle_lite_certificate(
90 &self,
91 certificate: LiteCertificate<'_>,
92 delivery: CrossChainMessageDelivery,
93 ) -> Result<Box<ChainInfo>, NodeError> {
94 let chain_id = certificate.value.chain_id;
95 let response = self
96 .node
97 .handle_lite_certificate(certificate, delivery)
98 .await?;
99 self.check_and_return_info(response, chain_id)
100 }
101
102 pub(crate) async fn handle_optimized_validated_certificate(
103 &mut self,
104 certificate: &ValidatedBlockCertificate,
105 delivery: CrossChainMessageDelivery,
106 ) -> Result<Box<ChainInfo>, NodeError> {
107 if certificate.is_signed_by(&self.public_key) {
108 let result = self
109 .handle_lite_certificate(certificate.lite_certificate(), delivery)
110 .await;
111 match result {
112 Err(NodeError::MissingCertificateValue) => {
113 warn!(
114 "Validator {} forgot a certificate value that they signed before",
115 self.public_key
116 );
117 }
118 _ => return result,
119 }
120 }
121 self.handle_validated_certificate(certificate.clone()).await
122 }
123
124 pub(crate) async fn handle_optimized_confirmed_certificate(
125 &mut self,
126 certificate: &ConfirmedBlockCertificate,
127 delivery: CrossChainMessageDelivery,
128 ) -> Result<Box<ChainInfo>, NodeError> {
129 if certificate.is_signed_by(&self.public_key) {
130 let result = self
131 .handle_lite_certificate(certificate.lite_certificate(), delivery)
132 .await;
133 match result {
134 Err(NodeError::MissingCertificateValue) => {
135 warn!(
136 "Validator {} forgot a certificate value that they signed before",
137 self.public_key
138 );
139 }
140 _ => return result,
141 }
142 }
143 self.handle_confirmed_certificate(certificate.clone(), delivery)
144 .await
145 }
146
147 fn check_and_return_info(
148 &self,
149 response: ChainInfoResponse,
150 chain_id: ChainId,
151 ) -> Result<Box<ChainInfo>, NodeError> {
152 let manager = &response.info.manager;
153 let proposed = manager.requested_proposed.as_ref();
154 let locking = manager.requested_locking.as_ref();
155 ensure!(
156 proposed.is_none_or(|proposal| proposal.content.block.chain_id == chain_id)
157 && locking.is_none_or(|cert| cert.chain_id() == chain_id)
158 && response.check(self.public_key).is_ok(),
159 NodeError::InvalidChainInfoResponse
160 );
161 Ok(response.info)
162 }
163
164 #[instrument(level = "trace", skip_all)]
165 pub(crate) async fn query_certificates_from(
166 &self,
167 chain_id: ChainId,
168 start: BlockHeight,
169 limit: u64,
170 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
171 tracing::debug!(name = ?self.public_key, ?chain_id, ?start, ?limit, "Querying certificates");
172 let range = BlockHeightRange {
173 start,
174 limit: Some(limit),
175 };
176 let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_in_range(range);
177 let info = self.handle_chain_info_query(query).await?;
178 self.node
179 .download_certificates(info.requested_sent_certificate_hashes)
180 .await?
181 .into_iter()
182 .map(|c| {
183 ensure!(
184 c.inner().chain_id() == chain_id,
185 NodeError::UnexpectedCertificateValue
186 );
187 ConfirmedBlockCertificate::try_from(c)
188 .map_err(|_| NodeError::InvalidChainInfoResponse)
189 })
190 .collect()
191 }
192
193 #[instrument(level = "trace")]
194 pub(crate) async fn download_certificate_for_blob(
195 &self,
196 blob_id: BlobId,
197 ) -> Result<ConfirmedBlockCertificate, NodeError> {
198 let last_used_hash = self.node.blob_last_used_by(blob_id).await?;
199 let certificate = self.node.download_certificate(last_used_hash).await?;
200 if !certificate.block().requires_or_creates_blob(&blob_id) {
201 warn!(
202 "Got invalid last used by certificate for blob {} from validator {}",
203 blob_id, self.public_key
204 );
205 return Err(NodeError::InvalidCertificateForBlob(blob_id));
206 }
207 Ok(certificate)
208 }
209
210 #[instrument(level = "trace")]
212 pub(crate) async fn send_pending_blobs(
213 &self,
214 chain_id: ChainId,
215 blobs: Vec<Blob>,
216 ) -> Result<(), NodeError> {
217 let tasks = blobs
218 .into_iter()
219 .map(|blob| self.node.handle_pending_blob(chain_id, blob.into_content()));
220 try_join_all(tasks).await?;
221 Ok(())
222 }
223
224 #[instrument(level = "trace")]
225 async fn try_download_blob(&self, blob_id: BlobId) -> Option<Blob> {
226 match self.node.download_blob(blob_id).await {
227 Ok(blob) => {
228 let blob = Blob::new(blob);
229 if blob.id() != blob_id {
230 tracing::info!(
231 "Validator {} sent an invalid blob {blob_id}.",
232 self.public_key
233 );
234 None
235 } else {
236 Some(blob)
237 }
238 }
239 Err(error) => {
240 tracing::debug!(
241 "Failed to fetch blob {blob_id} from validator {}: {error}",
242 self.public_key
243 );
244 None
245 }
246 }
247 }
248
249 #[instrument(level = "trace")]
252 pub(crate) async fn fetch_sent_certificate_hashes(
253 &self,
254 chain_id: ChainId,
255 range: BlockHeightRange,
256 ) -> Result<Vec<CryptoHash>, NodeError> {
257 let query =
258 ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_in_range(range.clone());
259 let response = self.handle_chain_info_query(query).await?;
260 let hashes = response.requested_sent_certificate_hashes;
261
262 if range
263 .limit
264 .is_some_and(|limit| hashes.len() as u64 != limit)
265 {
266 warn!(
267 ?range,
268 received_num = hashes.len(),
269 "Validator sent invalid number of certificate hashes."
270 );
271 return Err(NodeError::InvalidChainInfoResponse);
272 }
273 Ok(hashes)
274 }
275
276 #[instrument(level = "trace")]
277 pub async fn download_certificates(
278 &self,
279 hashes: Vec<CryptoHash>,
280 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
281 if hashes.is_empty() {
282 return Ok(Vec::new());
283 }
284 let certificates = self.node.download_certificates(hashes.clone()).await?;
285 let returned = certificates
286 .iter()
287 .map(ConfirmedBlockCertificate::hash)
288 .collect();
289 ensure!(
290 returned == hashes,
291 NodeError::UnexpectedCertificates {
292 returned,
293 requested: hashes
294 }
295 );
296 Ok(certificates)
297 }
298
299 #[instrument(level = "trace", skip(validators))]
302 pub async fn download_blob(
303 validators: &[Self],
304 blob_id: BlobId,
305 timeout: Duration,
306 ) -> Option<Blob> {
307 let mut validators = validators.iter().collect::<Vec<_>>();
309 validators.shuffle(&mut rand::thread_rng());
310 let mut stream = validators
311 .into_iter()
312 .zip(0..)
313 .map(|(remote_node, i)| async move {
314 linera_base::time::timer::sleep(timeout * i * i).await;
315 remote_node.try_download_blob(blob_id).await
316 })
317 .collect::<FuturesUnordered<_>>();
318 while let Some(maybe_blob) = stream.next().await {
319 if let Some(blob) = maybe_blob {
320 return Some(blob);
321 }
322 }
323 None
324 }
325
326 #[instrument(level = "trace", skip(validators))]
330 pub async fn download_blobs(
331 blob_ids: &[BlobId],
332 validators: &[Self],
333 timeout: Duration,
334 ) -> Option<Vec<Blob>> {
335 let mut stream = blob_ids
336 .iter()
337 .map(|blob_id| Self::download_blob(validators, *blob_id, timeout))
338 .collect::<FuturesUnordered<_>>();
339 let mut blobs = Vec::new();
340 while let Some(maybe_blob) = stream.next().await {
341 blobs.push(maybe_blob?);
342 }
343 Some(blobs)
344 }
345
346 pub fn check_blobs_not_found<T: CertificateValue>(
349 &self,
350 certificate: &GenericCertificate<T>,
351 blob_ids: &[BlobId],
352 ) -> Result<(), NodeError> {
353 ensure!(!blob_ids.is_empty(), NodeError::EmptyBlobsNotFound);
354 let required = certificate.inner().required_blob_ids();
355 let public_key = &self.public_key;
356 for blob_id in blob_ids {
357 if !required.contains(blob_id) {
358 warn!("validator {public_key} requested blob {blob_id:?} but it is not required");
359 return Err(NodeError::UnexpectedEntriesInBlobsNotFound);
360 }
361 }
362 let unique_missing_blob_ids = blob_ids.iter().cloned().collect::<HashSet<_>>();
363 if blob_ids.len() > unique_missing_blob_ids.len() {
364 warn!("blobs requested by validator {public_key} contain duplicates");
365 return Err(NodeError::DuplicatesInBlobsNotFound);
366 }
367 Ok(())
368 }
369}