1use std::{
5 collections::{BTreeMap, HashMap, HashSet},
6 sync::Arc,
7 time::Duration,
8 vec,
9};
10
11use async_trait::async_trait;
12use futures::{
13 future::Either,
14 lock::{Mutex, MutexGuard},
15 Future,
16};
17use linera_base::{
18 crypto::{AccountPublicKey, CryptoHash, ValidatorKeypair, ValidatorPublicKey},
19 data_types::*,
20 identifiers::{AccountOwner, BlobId, ChainId},
21 ownership::ChainOwnership,
22};
23use linera_chain::{
24 data_types::BlockProposal,
25 types::{
26 CertificateKind, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27 LiteCertificate, Timeout, ValidatedBlock,
28 },
29};
30use linera_execution::{committee::Committee, ResourceControlPolicy, WasmRuntime};
31use linera_storage::{DbStorage, ResultReadCertificates, Storage, TestClock};
32#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
33use linera_storage_service::client::StorageServiceDatabase;
34use linera_version::VersionInfo;
35#[cfg(feature = "dynamodb")]
36use linera_views::dynamo_db::DynamoDbDatabase;
37#[cfg(feature = "scylladb")]
38use linera_views::scylla_db::ScyllaDbDatabase;
39use linera_views::{
40 memory::MemoryDatabase, random::generate_test_namespace, store::TestKeyValueDatabase as _,
41};
42use tokio::sync::oneshot;
43use tokio_stream::wrappers::UnboundedReceiverStream;
44#[cfg(feature = "rocksdb")]
45use {
46 linera_views::rocks_db::RocksDbDatabase,
47 tokio::sync::{Semaphore, SemaphorePermit},
48};
49
50use crate::{
51 client::{chain_client, Client},
52 data_types::*,
53 environment::{TestSigner, TestWallet},
54 node::{
55 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
56 ValidatorNodeProvider,
57 },
58 notifier::ChannelNotifier,
59 worker::{Notification, ProcessableCertificate, WorkerState},
60};
61
62#[derive(Debug, PartialEq, Clone, Copy)]
63pub enum FaultType {
64 Honest,
65 Offline,
66 OfflineWithInfo,
67 NoChains,
68 DontSendConfirmVote,
69 DontProcessValidated,
70 DontSendValidateVote,
71}
72
73struct LocalValidator<S>
80where
81 S: Storage,
82{
83 state: WorkerState<S>,
84 notifier: Arc<ChannelNotifier<Notification>>,
85}
86
87#[derive(Clone)]
88pub struct LocalValidatorClient<S>
89where
90 S: Storage,
91{
92 public_key: ValidatorPublicKey,
93 client: Arc<Mutex<LocalValidator<S>>>,
94 fault_type: FaultType,
95}
96
97impl<S> ValidatorNode for LocalValidatorClient<S>
98where
99 S: Storage + Clone + Send + Sync + 'static,
100{
101 type NotificationStream = NotificationStream;
102
103 fn address(&self) -> String {
104 format!("local:{}", self.public_key)
105 }
106
107 async fn handle_block_proposal(
108 &self,
109 proposal: BlockProposal,
110 ) -> Result<ChainInfoResponse, NodeError> {
111 self.spawn_and_receive(move |validator, sender| {
112 validator.do_handle_block_proposal(proposal, sender)
113 })
114 .await
115 }
116
117 async fn handle_lite_certificate(
118 &self,
119 certificate: LiteCertificate<'_>,
120 _delivery: CrossChainMessageDelivery,
121 ) -> Result<ChainInfoResponse, NodeError> {
122 let certificate = certificate.cloned();
123 self.spawn_and_receive(move |validator, sender| {
124 validator.do_handle_lite_certificate(certificate, sender)
125 })
126 .await
127 }
128
129 async fn handle_timeout_certificate(
130 &self,
131 certificate: GenericCertificate<Timeout>,
132 ) -> Result<ChainInfoResponse, NodeError> {
133 self.spawn_and_receive(move |validator, sender| {
134 validator.do_handle_certificate(certificate, sender)
135 })
136 .await
137 }
138
139 async fn handle_validated_certificate(
140 &self,
141 certificate: GenericCertificate<ValidatedBlock>,
142 ) -> Result<ChainInfoResponse, NodeError> {
143 self.spawn_and_receive(move |validator, sender| {
144 validator.do_handle_certificate(certificate, sender)
145 })
146 .await
147 }
148
149 async fn handle_confirmed_certificate(
150 &self,
151 certificate: GenericCertificate<ConfirmedBlock>,
152 _delivery: CrossChainMessageDelivery,
153 ) -> Result<ChainInfoResponse, NodeError> {
154 self.spawn_and_receive(move |validator, sender| {
155 validator.do_handle_certificate(certificate, sender)
156 })
157 .await
158 }
159
160 async fn handle_chain_info_query(
161 &self,
162 query: ChainInfoQuery,
163 ) -> Result<ChainInfoResponse, NodeError> {
164 self.spawn_and_receive(move |validator, sender| {
165 validator.do_handle_chain_info_query(query, sender)
166 })
167 .await
168 }
169
170 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
171 self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
172 .await
173 }
174
175 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
176 Ok(Default::default())
177 }
178
179 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
180 Ok(self
181 .client
182 .lock()
183 .await
184 .state
185 .storage_client()
186 .read_network_description()
187 .await
188 .transpose()
189 .ok_or(NodeError::ViewError {
190 error: "missing NetworkDescription".to_owned(),
191 })??)
192 }
193
194 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
195 self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
196 .await
197 }
198
199 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
200 self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
201 .await
202 }
203
204 async fn download_pending_blob(
205 &self,
206 chain_id: ChainId,
207 blob_id: BlobId,
208 ) -> Result<BlobContent, NodeError> {
209 self.spawn_and_receive(move |validator, sender| {
210 validator.do_download_pending_blob(chain_id, blob_id, sender)
211 })
212 .await
213 }
214
215 async fn handle_pending_blob(
216 &self,
217 chain_id: ChainId,
218 blob: BlobContent,
219 ) -> Result<ChainInfoResponse, NodeError> {
220 self.spawn_and_receive(move |validator, sender| {
221 validator.do_handle_pending_blob(chain_id, blob, sender)
222 })
223 .await
224 }
225
226 async fn download_certificate(
227 &self,
228 hash: CryptoHash,
229 ) -> Result<ConfirmedBlockCertificate, NodeError> {
230 self.spawn_and_receive(move |validator, sender| {
231 validator.do_download_certificate(hash, sender)
232 })
233 .await
234 }
235
236 async fn download_certificates(
237 &self,
238 hashes: Vec<CryptoHash>,
239 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
240 self.spawn_and_receive(move |validator, sender| {
241 validator.do_download_certificates(hashes, sender)
242 })
243 .await
244 }
245
246 async fn download_certificates_by_heights(
247 &self,
248 chain_id: ChainId,
249 heights: Vec<BlockHeight>,
250 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
251 self.spawn_and_receive(move |validator, sender| {
252 validator.do_download_certificates_by_heights(chain_id, heights, sender)
253 })
254 .await
255 }
256
257 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
258 self.spawn_and_receive(move |validator, sender| {
259 validator.do_blob_last_used_by(blob_id, sender)
260 })
261 .await
262 }
263
264 async fn blob_last_used_by_certificate(
265 &self,
266 blob_id: BlobId,
267 ) -> Result<ConfirmedBlockCertificate, NodeError> {
268 self.spawn_and_receive(move |validator, sender| {
269 validator.do_blob_last_used_by_certificate(blob_id, sender)
270 })
271 .await
272 }
273
274 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
275 self.spawn_and_receive(move |validator, sender| {
276 validator.do_missing_blob_ids(blob_ids, sender)
277 })
278 .await
279 }
280
281 async fn get_shard_info(
282 &self,
283 _chain_id: ChainId,
284 ) -> Result<crate::data_types::ShardInfo, NodeError> {
285 Ok(crate::data_types::ShardInfo {
287 shard_id: 0,
288 total_shards: 1,
289 })
290 }
291}
292
293impl<S> LocalValidatorClient<S>
294where
295 S: Storage + Clone + Send + Sync + 'static,
296{
297 fn new(public_key: ValidatorPublicKey, state: WorkerState<S>) -> Self {
298 let client = LocalValidator {
299 state,
300 notifier: Arc::new(ChannelNotifier::default()),
301 };
302 Self {
303 public_key,
304 client: Arc::new(Mutex::new(client)),
305 fault_type: FaultType::Honest,
306 }
307 }
308
309 pub fn name(&self) -> ValidatorPublicKey {
310 self.public_key
311 }
312
313 fn set_fault_type(&mut self, fault_type: FaultType) {
314 self.fault_type = fault_type;
315 }
316
317 pub async fn chain_info_with_manager_values(
319 &mut self,
320 chain_id: ChainId,
321 ) -> Result<Box<ChainInfo>, NodeError> {
322 let query = ChainInfoQuery::new(chain_id).with_manager_values();
323 let response = self.handle_chain_info_query(query).await?;
324 Ok(response.info)
325 }
326
327 async fn spawn_and_receive<F, R, T>(&self, f: F) -> T
330 where
331 T: Send + 'static,
332 R: Future<Output = Result<(), T>> + Send,
333 F: FnOnce(Self, oneshot::Sender<T>) -> R + Send + 'static,
334 {
335 let validator = self.clone();
336 let (sender, receiver) = oneshot::channel();
337 tokio::spawn(async move {
338 if f(validator, sender).await.is_err() {
339 tracing::debug!("result could not be sent");
340 }
341 });
342 receiver.await.unwrap()
343 }
344
345 async fn do_handle_block_proposal(
346 self,
347 proposal: BlockProposal,
348 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
349 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
350 let result = match self.fault_type {
351 FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
352 error: "offline".to_string(),
353 }),
354 FaultType::NoChains => Err(NodeError::InactiveChain(proposal.content.block.chain_id)),
355 FaultType::DontSendValidateVote
356 | FaultType::Honest
357 | FaultType::DontSendConfirmVote
358 | FaultType::DontProcessValidated => {
359 let result = self
360 .client
361 .lock()
362 .await
363 .state
364 .handle_block_proposal(proposal)
365 .await
366 .map_err(Into::into);
367 if self.fault_type == FaultType::DontSendValidateVote {
368 Err(NodeError::ClientIoError {
369 error: "refusing to validate".to_string(),
370 })
371 } else {
372 result
373 }
374 }
375 };
376 sender.send(result.map(|(info, _actions)| info))
378 }
379
380 async fn do_handle_lite_certificate(
381 self,
382 certificate: LiteCertificate<'_>,
383 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
384 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
385 let client = self.client.clone();
386 let mut validator = client.lock().await;
387 let result = async move {
388 match validator.state.full_certificate(certificate).await? {
389 Either::Left(confirmed) => {
390 self.do_handle_certificate_internal(confirmed, &mut validator)
391 .await
392 }
393 Either::Right(validated) => {
394 self.do_handle_certificate_internal(validated, &mut validator)
395 .await
396 }
397 }
398 }
399 .await;
400 sender.send(result)
401 }
402
403 async fn do_handle_certificate_internal<T: ProcessableCertificate>(
404 &self,
405 certificate: GenericCertificate<T>,
406 validator: &mut MutexGuard<'_, LocalValidator<S>>,
407 ) -> Result<ChainInfoResponse, NodeError> {
408 match self.fault_type {
409 FaultType::DontProcessValidated if T::KIND == CertificateKind::Validated => {
410 Err(NodeError::ClientIoError {
411 error: "refusing to process validated block".to_string(),
412 })
413 }
414 FaultType::NoChains => Err(NodeError::InactiveChain(certificate.value().chain_id())),
415 FaultType::Honest
416 | FaultType::DontSendConfirmVote
417 | FaultType::DontProcessValidated
418 | FaultType::DontSendValidateVote => {
419 let result = validator
420 .state
421 .fully_handle_certificate_with_notifications(certificate, &validator.notifier)
422 .await
423 .map_err(Into::into);
424 if T::KIND == CertificateKind::Validated
425 && self.fault_type == FaultType::DontSendConfirmVote
426 {
427 Err(NodeError::ClientIoError {
428 error: "refusing to confirm".to_string(),
429 })
430 } else {
431 result
432 }
433 }
434 FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
435 error: "offline".to_string(),
436 }),
437 }
438 }
439
440 async fn do_handle_certificate<T: ProcessableCertificate>(
441 self,
442 certificate: GenericCertificate<T>,
443 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
444 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
445 let mut validator = self.client.lock().await;
446 let result = self
447 .do_handle_certificate_internal(certificate, &mut validator)
448 .await;
449 sender.send(result)
450 }
451
452 async fn do_handle_chain_info_query(
453 self,
454 query: ChainInfoQuery,
455 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
456 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
457 let validator = self.client.lock().await;
458 let result = match self.fault_type {
459 FaultType::Offline => Err(NodeError::ClientIoError {
460 error: "offline".to_string(),
461 }),
462 FaultType::NoChains => Err(NodeError::InactiveChain(query.chain_id)),
463 FaultType::Honest
464 | FaultType::DontSendConfirmVote
465 | FaultType::DontProcessValidated
466 | FaultType::DontSendValidateVote
467 | FaultType::OfflineWithInfo => validator
468 .state
469 .handle_chain_info_query(query)
470 .await
471 .map_err(Into::into),
472 };
473 sender.send(result.map(|(info, _actions)| info))
475 }
476
477 async fn do_subscribe(
478 self,
479 chains: Vec<ChainId>,
480 sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
481 ) -> Result<(), Result<NotificationStream, NodeError>> {
482 let validator = self.client.lock().await;
483 let rx = validator.notifier.subscribe(chains);
484 let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
485 sender.send(Ok(stream))
486 }
487
488 async fn do_upload_blob(
489 self,
490 content: BlobContent,
491 sender: oneshot::Sender<Result<BlobId, NodeError>>,
492 ) -> Result<(), Result<BlobId, NodeError>> {
493 let validator = self.client.lock().await;
494 let blob = Blob::new(content);
495 let id = blob.id();
496 let storage = validator.state.storage_client();
497 let result = match storage.maybe_write_blobs(&[blob]).await {
498 Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
499 Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
500 Err(error) => Err(error.into()),
501 };
502 sender.send(result)
503 }
504
505 async fn do_download_blob(
506 self,
507 blob_id: BlobId,
508 sender: oneshot::Sender<Result<BlobContent, NodeError>>,
509 ) -> Result<(), Result<BlobContent, NodeError>> {
510 let validator = self.client.lock().await;
511 let blob = validator
512 .state
513 .storage_client()
514 .read_blob(blob_id)
515 .await
516 .map_err(Into::into);
517 let blob = match blob {
518 Ok(blob) => blob.ok_or(NodeError::BlobsNotFound(vec![blob_id])),
519 Err(error) => Err(error),
520 };
521 sender.send(blob.map(|blob| blob.into_content()))
522 }
523
524 async fn do_download_pending_blob(
525 self,
526 chain_id: ChainId,
527 blob_id: BlobId,
528 sender: oneshot::Sender<Result<BlobContent, NodeError>>,
529 ) -> Result<(), Result<BlobContent, NodeError>> {
530 let validator = self.client.lock().await;
531 let result = validator
532 .state
533 .download_pending_blob(chain_id, blob_id)
534 .await
535 .map_err(Into::into);
536 sender.send(result.map(|blob| blob.into_content()))
537 }
538
539 async fn do_handle_pending_blob(
540 self,
541 chain_id: ChainId,
542 blob: BlobContent,
543 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
544 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
545 let validator = self.client.lock().await;
546 let result = validator
547 .state
548 .handle_pending_blob(chain_id, Blob::new(blob))
549 .await
550 .map_err(Into::into);
551 sender.send(result)
552 }
553
554 async fn do_download_certificate(
555 self,
556 hash: CryptoHash,
557 sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
558 ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
559 let validator = self.client.lock().await;
560 let certificate = validator
561 .state
562 .storage_client()
563 .read_certificate(hash)
564 .await
565 .map_err(Into::into);
566
567 let certificate = match certificate {
568 Err(error) => Err(error),
569 Ok(entry) => match entry {
570 Some(certificate) => Ok(certificate),
571 None => {
572 panic!("Missing certificate: {hash}");
573 }
574 },
575 };
576
577 sender.send(certificate)
578 }
579
580 async fn do_download_certificates(
581 self,
582 hashes: Vec<CryptoHash>,
583 sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
584 ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
585 let validator = self.client.lock().await;
586 let certificates = validator
587 .state
588 .storage_client()
589 .read_certificates(hashes.clone())
590 .await
591 .map_err(Into::into);
592
593 let certificates = match certificates {
594 Err(error) => Err(error),
595 Ok(certificates) => match ResultReadCertificates::new(certificates, hashes) {
596 ResultReadCertificates::Certificates(certificates) => Ok(certificates),
597 ResultReadCertificates::InvalidHashes(hashes) => {
598 panic!("Missing certificates: {:?}", hashes)
599 }
600 },
601 };
602
603 sender.send(certificates)
604 }
605
606 async fn do_download_certificates_by_heights(
607 self,
608 chain_id: ChainId,
609 heights: Vec<BlockHeight>,
610 sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
611 ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
612 let (query_sender, query_receiver) = oneshot::channel();
614 let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_by_heights(heights);
615
616 let self_clone = self.clone();
617 self.do_handle_chain_info_query(query, query_sender)
618 .await
619 .expect("Failed to handle chain info query");
620
621 let chain_info_response = query_receiver.await.map_err(|_| {
623 Err(NodeError::ClientIoError {
624 error: "Failed to receive chain info response".to_string(),
625 })
626 })?;
627
628 let hashes = match chain_info_response {
629 Ok(response) => response.info.requested_sent_certificate_hashes,
630 Err(e) => {
631 return sender.send(Err(e));
632 }
633 };
634
635 let (cert_sender, cert_receiver) = oneshot::channel();
637 self_clone
638 .do_download_certificates(hashes, cert_sender)
639 .await?;
640
641 let result = cert_receiver.await.map_err(|_| {
643 Err(NodeError::ClientIoError {
644 error: "Failed to receive certificates".to_string(),
645 })
646 })?;
647
648 sender.send(result)
649 }
650
651 async fn do_blob_last_used_by(
652 self,
653 blob_id: BlobId,
654 sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
655 ) -> Result<(), Result<CryptoHash, NodeError>> {
656 let validator = self.client.lock().await;
657 let blob_state = validator
658 .state
659 .storage_client()
660 .read_blob_state(blob_id)
661 .await
662 .map_err(Into::into);
663 let certificate_hash = match blob_state {
664 Err(err) => Err(err),
665 Ok(blob_state) => match blob_state {
666 None => Err(NodeError::BlobsNotFound(vec![blob_id])),
667 Some(blob_state) => blob_state
668 .last_used_by
669 .ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
670 },
671 };
672
673 sender.send(certificate_hash)
674 }
675
676 async fn do_blob_last_used_by_certificate(
677 self,
678 blob_id: BlobId,
679 sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
680 ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
681 match self.blob_last_used_by(blob_id).await {
682 Ok(cert_hash) => {
683 let cert = self.download_certificate(cert_hash).await;
684 sender.send(cert)
685 }
686 Err(err) => sender.send(Err(err)),
687 }
688 }
689
690 async fn do_missing_blob_ids(
691 self,
692 blob_ids: Vec<BlobId>,
693 sender: oneshot::Sender<Result<Vec<BlobId>, NodeError>>,
694 ) -> Result<(), Result<Vec<BlobId>, NodeError>> {
695 let validator = self.client.lock().await;
696 let missing_blob_ids = validator
697 .state
698 .storage_client()
699 .missing_blobs(&blob_ids)
700 .await
701 .map_err(Into::into);
702 sender.send(missing_blob_ids)
703 }
704}
705
706#[derive(Clone)]
707pub struct NodeProvider<S>(Arc<std::sync::Mutex<Vec<LocalValidatorClient<S>>>>)
708where
709 S: Storage;
710
711impl<S> NodeProvider<S>
712where
713 S: Storage + Clone,
714{
715 fn all_nodes(&self) -> Vec<LocalValidatorClient<S>> {
716 self.0.lock().unwrap().clone()
717 }
718}
719
720impl<S> ValidatorNodeProvider for NodeProvider<S>
721where
722 S: Storage + Clone + Send + Sync + 'static,
723{
724 type Node = LocalValidatorClient<S>;
725
726 fn make_node(&self, _name: &str) -> Result<Self::Node, NodeError> {
727 unimplemented!()
728 }
729
730 fn make_nodes_from_list<A>(
731 &self,
732 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
733 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
734 where
735 A: AsRef<str>,
736 {
737 let list = self.0.lock().unwrap();
738 Ok(validators
739 .into_iter()
740 .map(|(public_key, address)| {
741 list.iter()
742 .find(|client| client.public_key == public_key)
743 .ok_or_else(|| NodeError::CannotResolveValidatorAddress {
744 address: address.as_ref().to_string(),
745 })
746 .map(|client| (public_key, client.clone()))
747 })
748 .collect::<Result<Vec<_>, _>>()?
749 .into_iter())
750 }
751}
752
753impl<S> FromIterator<LocalValidatorClient<S>> for NodeProvider<S>
754where
755 S: Storage,
756{
757 fn from_iter<T>(iter: T) -> Self
758 where
759 T: IntoIterator<Item = LocalValidatorClient<S>>,
760 {
761 Self(Arc::new(std::sync::Mutex::new(iter.into_iter().collect())))
762 }
763}
764
765pub struct TestBuilder<B: StorageBuilder> {
772 storage_builder: B,
773 pub initial_committee: Committee,
774 admin_description: Option<ChainDescription>,
775 network_description: Option<NetworkDescription>,
776 genesis_storage_builder: GenesisStorageBuilder,
777 node_provider: NodeProvider<B::Storage>,
778 validator_storages: HashMap<ValidatorPublicKey, B::Storage>,
779 chain_client_storages: Vec<B::Storage>,
780 pub chain_owners: BTreeMap<ChainId, AccountOwner>,
781 pub signer: TestSigner,
782}
783
784#[async_trait]
785pub trait StorageBuilder {
786 type Storage: Storage + Clone + Send + Sync + 'static;
787
788 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error>;
789
790 fn clock(&self) -> &TestClock;
791}
792
793#[derive(Default)]
794struct GenesisStorageBuilder {
795 accounts: Vec<GenesisAccount>,
796}
797
798struct GenesisAccount {
799 description: ChainDescription,
800 public_key: AccountPublicKey,
801}
802
803impl GenesisStorageBuilder {
804 fn add(&mut self, description: ChainDescription, public_key: AccountPublicKey) {
805 self.accounts.push(GenesisAccount {
806 description,
807 public_key,
808 })
809 }
810
811 async fn build<S>(&self, storage: S) -> S
812 where
813 S: Storage + Clone + Send + Sync + 'static,
814 {
815 for account in &self.accounts {
816 storage
817 .create_chain(account.description.clone())
818 .await
819 .unwrap();
820 }
821 storage
822 }
823}
824
825pub type ChainClient<S> = crate::client::ChainClient<crate::environment::Impl<S, NodeProvider<S>>>;
826
827impl<S: Storage + Clone + Send + Sync + 'static> ChainClient<S> {
828 pub async fn read_confirmed_blocks_downward(
830 &self,
831 from: CryptoHash,
832 limit: u32,
833 ) -> anyhow::Result<Vec<ConfirmedBlock>> {
834 let mut hash = Some(from);
835 let mut values = Vec::new();
836 for _ in 0..limit {
837 let Some(next_hash) = hash else {
838 break;
839 };
840 let value = self.read_confirmed_block(next_hash).await?;
841 hash = value.block().header.previous_block_hash;
842 values.push(value);
843 }
844 Ok(values)
845 }
846}
847
848impl<B> TestBuilder<B>
849where
850 B: StorageBuilder,
851{
852 pub async fn new(
853 mut storage_builder: B,
854 count: usize,
855 with_faulty_validators: usize,
856 mut signer: TestSigner,
857 ) -> Result<Self, anyhow::Error> {
858 let mut validators = Vec::new();
859 for _ in 0..count {
860 let validator_keypair = ValidatorKeypair::generate();
861 let account_public_key = signer.generate_new();
862 validators.push((validator_keypair, account_public_key));
863 }
864 let for_committee = validators
865 .iter()
866 .map(|(validating, account)| (validating.public_key, *account))
867 .collect::<Vec<_>>();
868 let initial_committee = Committee::make_simple(for_committee);
869 let mut validator_clients = Vec::new();
870 let mut validator_storages = HashMap::new();
871 let mut faulty_validators = HashSet::new();
872 for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() {
873 let validator_public_key = validator_keypair.public_key;
874 let storage = storage_builder.build().await?;
875 let state = WorkerState::new(
876 format!("Node {}", i),
877 Some(validator_keypair.secret_key),
878 storage.clone(),
879 5_000,
880 10_000,
881 )
882 .with_allow_inactive_chains(false)
883 .with_allow_messages_from_deprecated_epochs(false);
884 let mut validator = LocalValidatorClient::new(validator_public_key, state);
885 if i < with_faulty_validators {
886 faulty_validators.insert(validator_public_key);
887 validator.set_fault_type(FaultType::NoChains);
888 }
889 validator_clients.push(validator);
890 validator_storages.insert(validator_public_key, storage);
891 }
892 tracing::info!(
893 "Test will use the following faulty validators: {:?}",
894 faulty_validators
895 );
896 Ok(Self {
897 storage_builder,
898 initial_committee,
899 admin_description: None,
900 network_description: None,
901 genesis_storage_builder: GenesisStorageBuilder::default(),
902 node_provider: NodeProvider::from_iter(validator_clients),
903 validator_storages,
904 chain_client_storages: Vec::new(),
905 chain_owners: BTreeMap::new(),
906 signer,
907 })
908 }
909
910 pub fn with_policy(mut self, policy: ResourceControlPolicy) -> Self {
911 let validators = self.initial_committee.validators().clone();
912 self.initial_committee = Committee::new(validators, policy);
913 self
914 }
915
916 pub fn set_fault_type(&mut self, indexes: impl AsRef<[usize]>, fault_type: FaultType) {
917 let mut faulty_validators = vec![];
918 let mut validator_clients = self.node_provider.0.lock().unwrap();
919 for index in indexes.as_ref() {
920 let validator = &mut validator_clients[*index];
921 validator.set_fault_type(fault_type);
922 faulty_validators.push(validator.public_key);
923 }
924 tracing::info!(
925 "Making the following validators {:?}: {:?}",
926 fault_type,
927 faulty_validators
928 );
929 }
930
931 pub async fn add_root_chain(
936 &mut self,
937 index: u32,
938 balance: Amount,
939 ) -> anyhow::Result<ChainClient<B::Storage>> {
940 if self.admin_description.is_none() && index != 0 {
942 Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
943 }
944 let origin = ChainOrigin::Root(index);
945 let public_key = self.signer.generate_new();
946 let open_chain_config = InitialChainConfig {
947 ownership: ChainOwnership::single(public_key.into()),
948 epoch: Epoch(0),
949 min_active_epoch: Epoch(0),
950 max_active_epoch: Epoch(0),
951 balance,
952 application_permissions: ApplicationPermissions::default(),
953 };
954 let description = ChainDescription::new(origin, open_chain_config, Timestamp::from(0));
955 let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
956 if index == 0 {
957 self.admin_description = Some(description.clone());
958 self.network_description = Some(NetworkDescription {
959 admin_chain_id: description.id(),
960 genesis_config_hash: CryptoHash::test_hash("genesis config"),
962 genesis_timestamp: Timestamp::from(0),
963 genesis_committee_blob_hash: committee_blob.id().hash,
964 name: "test network".to_string(),
965 });
966 }
967 self.genesis_storage_builder
969 .add(description.clone(), public_key);
970
971 let network_description = self.network_description.as_ref().unwrap();
972
973 for validator in self.node_provider.all_nodes() {
974 let storage = self
975 .validator_storages
976 .get_mut(&validator.public_key)
977 .unwrap();
978 storage
979 .write_network_description(network_description)
980 .await
981 .expect("writing the NetworkDescription should succeed");
982 storage
983 .write_blob(&committee_blob)
984 .await
985 .expect("writing a blob should succeed");
986 storage.create_chain(description.clone()).await.unwrap();
987 }
988 for storage in &mut self.chain_client_storages {
989 storage.create_chain(description.clone()).await.unwrap();
990 }
991 let chain_id = description.id();
992 self.chain_owners.insert(chain_id, public_key.into());
993 self.make_client(chain_id, None, BlockHeight::ZERO).await
994 }
995
996 pub fn genesis_chains(&self) -> Vec<(AccountPublicKey, Amount)> {
997 let mut result = Vec::new();
998 for (i, genesis_account) in self.genesis_storage_builder.accounts.iter().enumerate() {
999 assert_eq!(
1000 genesis_account.description.origin(),
1001 ChainOrigin::Root(i as u32)
1002 );
1003 result.push((
1004 genesis_account.public_key,
1005 genesis_account.description.config().balance,
1006 ));
1007 }
1008 result
1009 }
1010
1011 pub fn admin_id(&self) -> ChainId {
1012 self.admin_description
1013 .as_ref()
1014 .expect("admin chain not initialized")
1015 .id()
1016 }
1017
1018 pub fn admin_description(&self) -> Option<&ChainDescription> {
1019 self.admin_description.as_ref()
1020 }
1021
1022 pub fn make_node_provider(&self) -> NodeProvider<B::Storage> {
1023 self.node_provider.clone()
1024 }
1025
1026 pub fn node(&mut self, index: usize) -> LocalValidatorClient<B::Storage> {
1027 self.node_provider.0.lock().unwrap()[index].clone()
1028 }
1029
1030 pub async fn make_storage(&mut self) -> anyhow::Result<B::Storage> {
1031 let storage = self.storage_builder.build().await?;
1032 let network_description = self.network_description.as_ref().unwrap();
1033 let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
1034 storage
1035 .write_network_description(network_description)
1036 .await
1037 .expect("writing the NetworkDescription should succeed");
1038 storage
1039 .write_blob(&committee_blob)
1040 .await
1041 .expect("writing a blob should succeed");
1042 Ok(self.genesis_storage_builder.build(storage).await)
1043 }
1044
1045 pub async fn make_client_with_options(
1046 &mut self,
1047 chain_id: ChainId,
1048 block_hash: Option<CryptoHash>,
1049 block_height: BlockHeight,
1050 options: chain_client::Options,
1051 follow_only: bool,
1052 ) -> anyhow::Result<ChainClient<B::Storage>> {
1053 let storage = self.make_storage().await?;
1056 self.chain_client_storages.push(storage.clone());
1057 let mode = if follow_only {
1058 crate::client::ListeningMode::FollowChain
1059 } else {
1060 crate::client::ListeningMode::FullChain
1061 };
1062 let client = Arc::new(Client::new(
1063 crate::environment::Impl {
1064 network: self.make_node_provider(),
1065 storage,
1066 signer: self.signer.clone(),
1067 wallet: TestWallet::default(),
1068 },
1069 self.admin_id(),
1070 false,
1071 [(chain_id, mode)],
1072 format!("Client node for {:.8}", chain_id),
1073 Duration::from_secs(30),
1074 Duration::from_secs(1),
1075 options,
1076 5_000,
1077 10_000,
1078 crate::client::RequestsSchedulerConfig::default(),
1079 ));
1080 Ok(client.create_chain_client(
1081 chain_id,
1082 block_hash,
1083 block_height,
1084 None,
1085 self.chain_owners.get(&chain_id).copied(),
1086 None,
1087 follow_only,
1088 ))
1089 }
1090
1091 pub async fn make_client(
1092 &mut self,
1093 chain_id: ChainId,
1094 block_hash: Option<CryptoHash>,
1095 block_height: BlockHeight,
1096 ) -> anyhow::Result<ChainClient<B::Storage>> {
1097 self.make_client_with_options(
1098 chain_id,
1099 block_hash,
1100 block_height,
1101 chain_client::Options::test_default(),
1102 false,
1103 )
1104 .await
1105 }
1106
1107 pub async fn check_that_validators_have_certificate(
1109 &self,
1110 chain_id: ChainId,
1111 block_height: BlockHeight,
1112 target_count: usize,
1113 ) -> Option<ConfirmedBlockCertificate> {
1114 let query = ChainInfoQuery::new(chain_id)
1115 .with_sent_certificate_hashes_by_heights(vec![block_height]);
1116 let mut count = 0;
1117 let mut certificate = None;
1118 for validator in self.node_provider.all_nodes() {
1119 if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1120 if response.check(validator.public_key).is_ok() {
1121 let ChainInfo {
1122 mut requested_sent_certificate_hashes,
1123 ..
1124 } = *response.info;
1125 debug_assert!(requested_sent_certificate_hashes.len() <= 1);
1126 if let Some(cert_hash) = requested_sent_certificate_hashes.pop() {
1127 if let Ok(cert) = validator.download_certificate(cert_hash).await {
1128 if cert.inner().block().header.chain_id == chain_id
1129 && cert.inner().block().header.height == block_height
1130 {
1131 cert.check(&self.initial_committee).unwrap();
1132 count += 1;
1133 certificate = Some(cert);
1134 }
1135 }
1136 }
1137 }
1138 }
1139 }
1140 assert!(count >= target_count);
1141 certificate
1142 }
1143
1144 pub async fn check_that_validators_are_in_round(
1147 &self,
1148 chain_id: ChainId,
1149 block_height: BlockHeight,
1150 round: Round,
1151 target_count: usize,
1152 ) {
1153 let query = ChainInfoQuery::new(chain_id);
1154 let mut count = 0;
1155 for validator in self.node_provider.all_nodes() {
1156 if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1157 if response.info.manager.current_round == round
1158 && response.info.next_block_height == block_height
1159 && response.check(validator.public_key).is_ok()
1160 {
1161 count += 1;
1162 }
1163 }
1164 }
1165 assert!(count >= target_count);
1166 }
1167
1168 pub async fn check_that_validators_have_empty_outboxes(&self, chain_id: ChainId) {
1170 for validator in self.node_provider.all_nodes() {
1171 let guard = validator.client.lock().await;
1172 let chain = guard.state.chain_state_view(chain_id).await.unwrap();
1173 assert_eq!(chain.outboxes.indices().await.unwrap(), []);
1174 }
1175 }
1176}
1177
1178#[cfg(feature = "rocksdb")]
1179static ROCKS_DB_SEMAPHORE: Semaphore = Semaphore::const_new(5);
1181
1182#[derive(Default)]
1183pub struct MemoryStorageBuilder {
1184 namespace: String,
1185 instance_counter: usize,
1186 wasm_runtime: Option<WasmRuntime>,
1187 clock: TestClock,
1188}
1189
1190#[async_trait]
1191impl StorageBuilder for MemoryStorageBuilder {
1192 type Storage = DbStorage<MemoryDatabase, TestClock>;
1193
1194 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1195 self.instance_counter += 1;
1196 let config = MemoryDatabase::new_test_config().await?;
1197 if self.namespace.is_empty() {
1198 self.namespace = generate_test_namespace();
1199 }
1200 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1201 Ok(
1202 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1203 .await?,
1204 )
1205 }
1206
1207 fn clock(&self) -> &TestClock {
1208 &self.clock
1209 }
1210}
1211
1212impl MemoryStorageBuilder {
1213 pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1216 MemoryStorageBuilder {
1217 wasm_runtime: wasm_runtime.into(),
1218 ..MemoryStorageBuilder::default()
1219 }
1220 }
1221}
1222
1223#[cfg(feature = "rocksdb")]
1224pub struct RocksDbStorageBuilder {
1225 namespace: String,
1226 instance_counter: usize,
1227 wasm_runtime: Option<WasmRuntime>,
1228 clock: TestClock,
1229 _permit: SemaphorePermit<'static>,
1230}
1231
1232#[cfg(feature = "rocksdb")]
1233impl RocksDbStorageBuilder {
1234 pub async fn new() -> Self {
1235 RocksDbStorageBuilder {
1236 namespace: String::new(),
1237 instance_counter: 0,
1238 wasm_runtime: None,
1239 clock: TestClock::default(),
1240 _permit: ROCKS_DB_SEMAPHORE.acquire().await.unwrap(),
1241 }
1242 }
1243
1244 #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
1247 pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1248 RocksDbStorageBuilder {
1249 wasm_runtime: wasm_runtime.into(),
1250 ..RocksDbStorageBuilder::new().await
1251 }
1252 }
1253}
1254
1255#[cfg(feature = "rocksdb")]
1256#[async_trait]
1257impl StorageBuilder for RocksDbStorageBuilder {
1258 type Storage = DbStorage<RocksDbDatabase, TestClock>;
1259
1260 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1261 self.instance_counter += 1;
1262 let config = RocksDbDatabase::new_test_config().await?;
1263 if self.namespace.is_empty() {
1264 self.namespace = generate_test_namespace();
1265 }
1266 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1267 Ok(
1268 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1269 .await?,
1270 )
1271 }
1272
1273 fn clock(&self) -> &TestClock {
1274 &self.clock
1275 }
1276}
1277
1278#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1279#[derive(Default)]
1280pub struct ServiceStorageBuilder {
1281 namespace: String,
1282 instance_counter: usize,
1283 wasm_runtime: Option<WasmRuntime>,
1284 clock: TestClock,
1285}
1286
1287#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1288impl ServiceStorageBuilder {
1289 pub fn new() -> Self {
1291 Self::with_wasm_runtime(None)
1292 }
1293
1294 pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1296 ServiceStorageBuilder {
1297 wasm_runtime: wasm_runtime.into(),
1298 ..ServiceStorageBuilder::default()
1299 }
1300 }
1301}
1302
1303#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1304#[async_trait]
1305impl StorageBuilder for ServiceStorageBuilder {
1306 type Storage = DbStorage<StorageServiceDatabase, TestClock>;
1307
1308 async fn build(&mut self) -> anyhow::Result<Self::Storage> {
1309 self.instance_counter += 1;
1310 let config = StorageServiceDatabase::new_test_config().await?;
1311 if self.namespace.is_empty() {
1312 self.namespace = generate_test_namespace();
1313 }
1314 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1315 Ok(
1316 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1317 .await?,
1318 )
1319 }
1320
1321 fn clock(&self) -> &TestClock {
1322 &self.clock
1323 }
1324}
1325
1326#[cfg(feature = "dynamodb")]
1327#[derive(Default)]
1328pub struct DynamoDbStorageBuilder {
1329 namespace: String,
1330 instance_counter: usize,
1331 wasm_runtime: Option<WasmRuntime>,
1332 clock: TestClock,
1333}
1334
1335#[cfg(feature = "dynamodb")]
1336impl DynamoDbStorageBuilder {
1337 pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1340 DynamoDbStorageBuilder {
1341 wasm_runtime: wasm_runtime.into(),
1342 ..DynamoDbStorageBuilder::default()
1343 }
1344 }
1345}
1346
1347#[cfg(feature = "dynamodb")]
1348#[async_trait]
1349impl StorageBuilder for DynamoDbStorageBuilder {
1350 type Storage = DbStorage<DynamoDbDatabase, TestClock>;
1351
1352 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1353 self.instance_counter += 1;
1354 let config = DynamoDbDatabase::new_test_config().await?;
1355 if self.namespace.is_empty() {
1356 self.namespace = generate_test_namespace();
1357 }
1358 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1359 Ok(
1360 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1361 .await?,
1362 )
1363 }
1364
1365 fn clock(&self) -> &TestClock {
1366 &self.clock
1367 }
1368}
1369
1370#[cfg(feature = "scylladb")]
1371#[derive(Default)]
1372pub struct ScyllaDbStorageBuilder {
1373 namespace: String,
1374 instance_counter: usize,
1375 wasm_runtime: Option<WasmRuntime>,
1376 clock: TestClock,
1377}
1378
1379#[cfg(feature = "scylladb")]
1380impl ScyllaDbStorageBuilder {
1381 pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1384 ScyllaDbStorageBuilder {
1385 wasm_runtime: wasm_runtime.into(),
1386 ..ScyllaDbStorageBuilder::default()
1387 }
1388 }
1389}
1390
1391#[cfg(feature = "scylladb")]
1392#[async_trait]
1393impl StorageBuilder for ScyllaDbStorageBuilder {
1394 type Storage = DbStorage<ScyllaDbDatabase, TestClock>;
1395
1396 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1397 self.instance_counter += 1;
1398 let config = ScyllaDbDatabase::new_test_config().await?;
1399 if self.namespace.is_empty() {
1400 self.namespace = generate_test_namespace();
1401 }
1402 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1403 Ok(
1404 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1405 .await?,
1406 )
1407 }
1408
1409 fn clock(&self) -> &TestClock {
1410 &self.clock
1411 }
1412}
1413
1414pub trait ClientOutcomeResultExt<T, E> {
1415 fn unwrap_ok_committed(self) -> T;
1416}
1417
1418impl<T, E: std::fmt::Debug> ClientOutcomeResultExt<T, E> for Result<ClientOutcome<T>, E> {
1419 fn unwrap_ok_committed(self) -> T {
1420 self.unwrap().unwrap()
1421 }
1422}