1use std::collections::{HashSet, VecDeque};
5
6use custom_debug_derive::Debug;
7use futures::future::try_join_all;
8use linera_base::{
9 crypto::ValidatorPublicKey,
10 data_types::{Blob, BlockHeight},
11 ensure,
12 identifiers::{BlobId, ChainId},
13};
14use linera_chain::{
15 data_types::BlockProposal,
16 types::{
17 CertificateValue, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate,
18 TimeoutCertificate, ValidatedBlockCertificate,
19 },
20};
21use tracing::{debug, info, instrument};
22
23use crate::{
24 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
25 node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
26};
27
28#[derive(Clone, Debug)]
30pub struct RemoteNode<N> {
31 pub public_key: ValidatorPublicKey,
32 #[debug(skip)]
33 pub node: N,
34}
35
36impl<N: ValidatorNode> RemoteNode<N> {
37 pub(crate) async fn handle_chain_info_query(
38 &self,
39 query: ChainInfoQuery,
40 ) -> Result<Box<ChainInfo>, NodeError> {
41 let chain_id = query.chain_id;
42 let response = self.node.handle_chain_info_query(query).await?;
43 self.check_and_return_info(response, chain_id)
44 }
45
46 #[instrument(level = "trace")]
47 pub(crate) async fn handle_block_proposal(
48 &self,
49 proposal: Box<BlockProposal>,
50 ) -> Result<Box<ChainInfo>, NodeError> {
51 let chain_id = proposal.content.block.chain_id;
52 let response = self.node.handle_block_proposal(*proposal).await?;
53 self.check_and_return_info(response, chain_id)
54 }
55
56 pub(crate) async fn handle_timeout_certificate(
57 &self,
58 certificate: TimeoutCertificate,
59 ) -> Result<Box<ChainInfo>, NodeError> {
60 let chain_id = certificate.inner().chain_id();
61 let response = self.node.handle_timeout_certificate(certificate).await?;
62 self.check_and_return_info(response, chain_id)
63 }
64
65 pub(crate) async fn handle_confirmed_certificate(
66 &self,
67 certificate: ConfirmedBlockCertificate,
68 delivery: CrossChainMessageDelivery,
69 ) -> Result<Box<ChainInfo>, NodeError> {
70 let chain_id = certificate.inner().chain_id();
71 let response = self
72 .node
73 .handle_confirmed_certificate(certificate, delivery)
74 .await?;
75 self.check_and_return_info(response, chain_id)
76 }
77
78 pub(crate) async fn handle_validated_certificate(
79 &self,
80 certificate: ValidatedBlockCertificate,
81 ) -> Result<Box<ChainInfo>, NodeError> {
82 let chain_id = certificate.inner().chain_id();
83 let response = self.node.handle_validated_certificate(certificate).await?;
84 self.check_and_return_info(response, chain_id)
85 }
86
87 #[instrument(level = "trace")]
88 pub(crate) async fn handle_lite_certificate(
89 &self,
90 certificate: LiteCertificate<'_>,
91 delivery: CrossChainMessageDelivery,
92 ) -> Result<Box<ChainInfo>, NodeError> {
93 let chain_id = certificate.value.chain_id;
94 let response = self
95 .node
96 .handle_lite_certificate(certificate, delivery)
97 .await?;
98 self.check_and_return_info(response, chain_id)
99 }
100
101 pub(crate) async fn handle_optimized_validated_certificate(
102 &mut self,
103 certificate: &ValidatedBlockCertificate,
104 delivery: CrossChainMessageDelivery,
105 ) -> Result<Box<ChainInfo>, NodeError> {
106 if certificate.is_signed_by(&self.public_key) {
107 let result = self
108 .handle_lite_certificate(certificate.lite_certificate(), delivery)
109 .await;
110 match result {
111 Err(NodeError::MissingCertificateValue) => {
112 debug!(
113 address = self.address(),
114 certificate_hash = %certificate.hash(),
115 "validator forgot a validated block value that they signed before",
116 );
117 }
118 _ => return result,
119 }
120 }
121 self.handle_validated_certificate(certificate.clone()).await
122 }
123
124 pub(crate) async fn handle_optimized_confirmed_certificate(
125 &mut self,
126 certificate: &ConfirmedBlockCertificate,
127 delivery: CrossChainMessageDelivery,
128 ) -> Result<Box<ChainInfo>, NodeError> {
129 if certificate.is_signed_by(&self.public_key) {
130 let result = self
131 .handle_lite_certificate(certificate.lite_certificate(), delivery)
132 .await;
133 match result {
134 Err(NodeError::MissingCertificateValue) => {
135 debug!(
136 address = self.address(),
137 certificate_hash = %certificate.hash(),
138 "validator forgot a confirmed block value that they signed before",
139 );
140 }
141 _ => return result,
142 }
143 }
144 self.handle_confirmed_certificate(certificate.clone(), delivery)
145 .await
146 }
147
148 fn check_and_return_info(
149 &self,
150 response: ChainInfoResponse,
151 chain_id: ChainId,
152 ) -> Result<Box<ChainInfo>, NodeError> {
153 let manager = &response.info.manager;
154 let proposed = manager.requested_proposed.as_ref();
155 let locking = manager.requested_locking.as_ref();
156 ensure!(
157 proposed.is_none_or(|proposal| proposal.content.block.chain_id == chain_id)
158 && locking.is_none_or(|cert| cert.chain_id() == chain_id)
159 && response.check(self.public_key).is_ok(),
160 NodeError::InvalidChainInfoResponse
161 );
162 Ok(response.info)
163 }
164
165 #[instrument(level = "trace")]
166 pub(crate) async fn download_certificate_for_blob(
167 &self,
168 blob_id: BlobId,
169 ) -> Result<ConfirmedBlockCertificate, NodeError> {
170 let certificate = self.node.blob_last_used_by_certificate(blob_id).await?;
171 if !certificate.block().requires_or_creates_blob(&blob_id) {
172 info!(
173 address = self.address(),
174 %blob_id,
175 "got invalid last used by certificate for blob from validator",
176 );
177 return Err(NodeError::InvalidCertificateForBlob(blob_id));
178 }
179 Ok(certificate)
180 }
181
182 #[instrument(level = "trace")]
184 pub(crate) async fn send_pending_blobs(
185 &self,
186 chain_id: ChainId,
187 blobs: Vec<Blob>,
188 ) -> Result<(), NodeError> {
189 let tasks = blobs
190 .into_iter()
191 .map(|blob| self.node.handle_pending_blob(chain_id, blob.into_content()));
192 try_join_all(tasks).await?;
193 Ok(())
194 }
195
196 #[instrument(level = "trace")]
197 pub async fn download_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, NodeError> {
198 match self.node.download_blob(blob_id).await {
199 Ok(blob) => {
200 let blob = Blob::new(blob);
201 if blob.id() != blob_id {
202 tracing::info!(
203 address = self.address(),
204 %blob_id,
205 "validator sent an invalid blob.",
206 );
207 Ok(None)
208 } else {
209 Ok(Some(blob))
210 }
211 }
212 Err(NodeError::BlobsNotFound(_error)) => {
213 tracing::debug!(
214 ?blob_id,
215 address = self.address(),
216 "validator is missing the blob",
217 );
218 Ok(None)
219 }
220 Err(error) => Err(error),
221 }
222 }
223
224 #[instrument(level = "trace")]
226 pub async fn download_certificates_by_heights(
227 &self,
228 chain_id: ChainId,
229 heights: Vec<BlockHeight>,
230 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
231 let mut expected_heights = VecDeque::from(heights.clone());
232 let certificates = self
233 .node
234 .download_certificates_by_heights(chain_id, heights)
235 .await?;
236
237 if certificates.len() > expected_heights.len() {
238 return Err(NodeError::TooManyCertificatesReturned {
239 chain_id,
240 remote_node: Box::new(self.public_key),
241 });
242 }
243
244 for certificate in &certificates {
245 ensure!(
246 certificate.inner().chain_id() == chain_id,
247 NodeError::UnexpectedCertificateValue
248 );
249 if let Some(expected_height) = expected_heights.pop_front() {
250 ensure!(
251 expected_height == certificate.inner().height(),
252 NodeError::UnexpectedCertificateValue
253 );
254 } else {
255 return Err(NodeError::UnexpectedCertificateValue);
256 }
257 }
258
259 ensure!(
260 expected_heights.is_empty(),
261 NodeError::MissingCertificatesByHeights {
262 chain_id,
263 heights: expected_heights.into_iter().collect(),
264 }
265 );
266 Ok(certificates)
267 }
268
269 pub fn check_blobs_not_found<T: CertificateValue>(
272 &self,
273 certificate: &GenericCertificate<T>,
274 blob_ids: &[BlobId],
275 ) -> Result<(), NodeError> {
276 ensure!(!blob_ids.is_empty(), NodeError::EmptyBlobsNotFound);
277 let required = certificate.inner().required_blob_ids();
278 for blob_id in blob_ids {
279 if !required.contains(blob_id) {
280 info!(
281 address = self.address(),
282 %blob_id,
283 "validator requested blob but it is not required",
284 );
285 return Err(NodeError::UnexpectedEntriesInBlobsNotFound);
286 }
287 }
288 let unique_missing_blob_ids = blob_ids.iter().copied().collect::<HashSet<_>>();
289 if blob_ids.len() > unique_missing_blob_ids.len() {
290 info!(
291 address = self.address(),
292 "blobs requested by validator contain duplicates",
293 );
294 return Err(NodeError::DuplicatesInBlobsNotFound);
295 }
296 Ok(())
297 }
298
299 pub fn address(&self) -> String {
301 self.node.address()
302 }
303}
304
305impl<N: ValidatorNode> PartialEq for RemoteNode<N> {
306 fn eq(&self, other: &Self) -> bool {
307 self.public_key == other.public_key
308 }
309}
310
311impl<N: ValidatorNode> Eq for RemoteNode<N> {}