1use std::{
5 collections::{BTreeMap, HashMap, HashSet},
6 num::NonZeroUsize,
7 sync::Arc,
8 vec,
9};
10
11use async_trait::async_trait;
12use futures::{
13 future::Either,
14 lock::{Mutex, MutexGuard},
15 Future,
16};
17use linera_base::{
18 crypto::{AccountPublicKey, CryptoHash, InMemorySigner, ValidatorKeypair, ValidatorPublicKey},
19 data_types::*,
20 identifiers::{AccountOwner, BlobId, ChainId},
21 ownership::ChainOwnership,
22};
23use linera_chain::{
24 data_types::BlockProposal,
25 types::{
26 CertificateKind, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27 LiteCertificate, Timeout, ValidatedBlock,
28 },
29};
30use linera_execution::{committee::Committee, ResourceControlPolicy, WasmRuntime};
31use linera_storage::{DbStorage, ResultReadCertificates, Storage, TestClock};
32#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
33use linera_storage_service::client::ServiceStoreClient;
34use linera_version::VersionInfo;
35#[cfg(feature = "dynamodb")]
36use linera_views::dynamo_db::DynamoDbStore;
37#[cfg(feature = "scylladb")]
38use linera_views::scylla_db::ScyllaDbStore;
39use linera_views::{
40 memory::MemoryStore, random::generate_test_namespace, store::TestKeyValueStore as _,
41};
42use tokio::sync::oneshot;
43use tokio_stream::wrappers::UnboundedReceiverStream;
44#[cfg(feature = "rocksdb")]
45use {
46 linera_views::rocks_db::RocksDbStore,
47 tokio::sync::{Semaphore, SemaphorePermit},
48};
49
50use crate::{
51 client::{ChainClientOptions, Client},
52 data_types::*,
53 node::{
54 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
55 ValidatorNodeProvider,
56 },
57 notifier::ChannelNotifier,
58 worker::{NetworkActions, Notification, ProcessableCertificate, WorkerState},
59};
60
/// The failure mode simulated by a local test validator.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FaultType {
    /// Processes every request and returns the genuine result.
    Honest,
    /// Processes neither block proposals nor certificates and answers them, as well as
    /// chain info queries, with an "offline" I/O error.
    Offline,
    /// Like [`FaultType::Offline`] for block proposals and certificates, but chain info
    /// queries are still answered.
    OfflineWithInfo,
    /// Does not process block proposals and answers them with an arithmetic overflow
    /// error; certificates are processed normally.
    Malicious,
    /// Processes validated block certificates but answers them with a
    /// "refusing to confirm" error.
    DontSendConfirmVote,
    /// Skips processing of validated block certificates and answers them with a
    /// "refusing to confirm" error.
    DontProcessValidated,
    /// Processes block proposals but answers them with a "refusing to validate" error.
    DontSendValidateVote,
}
71
72struct LocalValidator<S>
79where
80 S: Storage,
81{
82 state: WorkerState<S>,
83 fault_type: FaultType,
84 notifier: Arc<ChannelNotifier<Notification>>,
85}
86
87#[derive(Clone)]
88pub struct LocalValidatorClient<S>
89where
90 S: Storage,
91{
92 public_key: ValidatorPublicKey,
93 client: Arc<Mutex<LocalValidator<S>>>,
94}
95
96impl<S> ValidatorNode for LocalValidatorClient<S>
97where
98 S: Storage + Clone + Send + Sync + 'static,
99{
100 type NotificationStream = NotificationStream;
101
102 async fn handle_block_proposal(
103 &self,
104 proposal: BlockProposal,
105 ) -> Result<ChainInfoResponse, NodeError> {
106 self.spawn_and_receive(move |validator, sender| {
107 validator.do_handle_block_proposal(proposal, sender)
108 })
109 .await
110 }
111
112 async fn handle_lite_certificate(
113 &self,
114 certificate: LiteCertificate<'_>,
115 _delivery: CrossChainMessageDelivery,
116 ) -> Result<ChainInfoResponse, NodeError> {
117 let certificate = certificate.cloned();
118 self.spawn_and_receive(move |validator, sender| {
119 validator.do_handle_lite_certificate(certificate, sender)
120 })
121 .await
122 }
123
124 async fn handle_timeout_certificate(
125 &self,
126 certificate: GenericCertificate<Timeout>,
127 ) -> Result<ChainInfoResponse, NodeError> {
128 self.spawn_and_receive(move |validator, sender| {
129 validator.do_handle_certificate(certificate, sender)
130 })
131 .await
132 }
133
134 async fn handle_validated_certificate(
135 &self,
136 certificate: GenericCertificate<ValidatedBlock>,
137 ) -> Result<ChainInfoResponse, NodeError> {
138 self.spawn_and_receive(move |validator, sender| {
139 validator.do_handle_certificate(certificate, sender)
140 })
141 .await
142 }
143
144 async fn handle_confirmed_certificate(
145 &self,
146 certificate: GenericCertificate<ConfirmedBlock>,
147 _delivery: CrossChainMessageDelivery,
148 ) -> Result<ChainInfoResponse, NodeError> {
149 self.spawn_and_receive(move |validator, sender| {
150 validator.do_handle_certificate(certificate, sender)
151 })
152 .await
153 }
154
155 async fn handle_chain_info_query(
156 &self,
157 query: ChainInfoQuery,
158 ) -> Result<ChainInfoResponse, NodeError> {
159 self.spawn_and_receive(move |validator, sender| {
160 validator.do_handle_chain_info_query(query, sender)
161 })
162 .await
163 }
164
165 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
166 self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
167 .await
168 }
169
170 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
171 Ok(Default::default())
172 }
173
174 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
175 Ok(self
176 .client
177 .lock()
178 .await
179 .state
180 .storage_client()
181 .read_network_description()
182 .await
183 .transpose()
184 .ok_or(NodeError::ViewError {
185 error: "missing NetworkDescription".to_owned(),
186 })??)
187 }
188
189 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
190 self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
191 .await
192 }
193
194 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
195 self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
196 .await
197 }
198
199 async fn download_pending_blob(
200 &self,
201 chain_id: ChainId,
202 blob_id: BlobId,
203 ) -> Result<BlobContent, NodeError> {
204 self.spawn_and_receive(move |validator, sender| {
205 validator.do_download_pending_blob(chain_id, blob_id, sender)
206 })
207 .await
208 }
209
210 async fn handle_pending_blob(
211 &self,
212 chain_id: ChainId,
213 blob: BlobContent,
214 ) -> Result<ChainInfoResponse, NodeError> {
215 self.spawn_and_receive(move |validator, sender| {
216 validator.do_handle_pending_blob(chain_id, blob, sender)
217 })
218 .await
219 }
220
221 async fn download_certificate(
222 &self,
223 hash: CryptoHash,
224 ) -> Result<ConfirmedBlockCertificate, NodeError> {
225 self.spawn_and_receive(move |validator, sender| {
226 validator.do_download_certificate(hash, sender)
227 })
228 .await
229 }
230
231 async fn download_certificates(
232 &self,
233 hashes: Vec<CryptoHash>,
234 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
235 self.spawn_and_receive(move |validator, sender| {
236 validator.do_download_certificates(hashes, sender)
237 })
238 .await
239 }
240
241 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
242 self.spawn_and_receive(move |validator, sender| {
243 validator.do_blob_last_used_by(blob_id, sender)
244 })
245 .await
246 }
247
248 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
249 self.spawn_and_receive(move |validator, sender| {
250 validator.do_missing_blob_ids(blob_ids, sender)
251 })
252 .await
253 }
254}
255
256impl<S> LocalValidatorClient<S>
257where
258 S: Storage + Clone + Send + Sync + 'static,
259{
260 fn new(public_key: ValidatorPublicKey, state: WorkerState<S>) -> Self {
261 let client = LocalValidator {
262 fault_type: FaultType::Honest,
263 state,
264 notifier: Arc::new(ChannelNotifier::default()),
265 };
266 Self {
267 public_key,
268 client: Arc::new(Mutex::new(client)),
269 }
270 }
271
272 pub fn name(&self) -> ValidatorPublicKey {
273 self.public_key
274 }
275
276 async fn set_fault_type(&self, fault_type: FaultType) {
277 self.client.lock().await.fault_type = fault_type;
278 }
279
280 async fn fault_type(&self) -> FaultType {
281 self.client.lock().await.fault_type
282 }
283
284 pub async fn chain_info_with_manager_values(
286 &mut self,
287 chain_id: ChainId,
288 ) -> Result<Box<ChainInfo>, NodeError> {
289 let query = ChainInfoQuery::new(chain_id).with_manager_values();
290 let response = self.handle_chain_info_query(query).await?;
291 Ok(response.info)
292 }
293
294 async fn spawn_and_receive<F, R, T>(&self, f: F) -> T
297 where
298 T: Send + 'static,
299 R: Future<Output = Result<(), T>> + Send,
300 F: FnOnce(Self, oneshot::Sender<T>) -> R + Send + 'static,
301 {
302 let validator = self.clone();
303 let (sender, receiver) = oneshot::channel();
304 tokio::spawn(async move {
305 if f(validator, sender).await.is_err() {
306 tracing::debug!("result could not be sent");
307 }
308 });
309 receiver.await.unwrap()
310 }
311
312 async fn do_handle_block_proposal(
313 self,
314 proposal: BlockProposal,
315 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
316 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
317 let mut validator = self.client.lock().await;
318 let handle_block_proposal_result =
319 Self::handle_block_proposal(proposal, &mut validator).await;
320 let result = match handle_block_proposal_result {
321 Some(Err(NodeError::BlobsNotFound(_))) => {
322 handle_block_proposal_result.expect("handle_block_proposal_result should be Some")
323 }
324 _ => match validator.fault_type {
325 FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
326 error: "offline".to_string(),
327 }),
328 FaultType::Malicious => Err(ArithmeticError::Overflow.into()),
329 FaultType::DontSendValidateVote => Err(NodeError::ClientIoError {
330 error: "refusing to validate".to_string(),
331 }),
332 FaultType::Honest
333 | FaultType::DontSendConfirmVote
334 | FaultType::DontProcessValidated => handle_block_proposal_result
335 .expect("handle_block_proposal_result should be Some"),
336 },
337 };
338 sender.send(result.map(|(info, _actions)| info))
340 }
341
342 async fn handle_block_proposal(
343 proposal: BlockProposal,
344 validator: &mut MutexGuard<'_, LocalValidator<S>>,
345 ) -> Option<Result<(ChainInfoResponse, NetworkActions), NodeError>> {
346 match validator.fault_type {
347 FaultType::Offline | FaultType::OfflineWithInfo | FaultType::Malicious => None,
348 FaultType::Honest
349 | FaultType::DontSendConfirmVote
350 | FaultType::DontProcessValidated
351 | FaultType::DontSendValidateVote => Some(
352 validator
353 .state
354 .handle_block_proposal(proposal)
355 .await
356 .map_err(Into::into),
357 ),
358 }
359 }
360
361 async fn handle_certificate<T: ProcessableCertificate>(
362 certificate: GenericCertificate<T>,
363 validator: &mut MutexGuard<'_, LocalValidator<S>>,
364 ) -> Option<Result<ChainInfoResponse, NodeError>> {
365 match validator.fault_type {
366 FaultType::DontProcessValidated if T::KIND == CertificateKind::Validated => None,
367 FaultType::Honest
368 | FaultType::DontSendConfirmVote
369 | FaultType::Malicious
370 | FaultType::DontProcessValidated
371 | FaultType::DontSendValidateVote => Some(
372 validator
373 .state
374 .fully_handle_certificate_with_notifications(certificate, &validator.notifier)
375 .await
376 .map_err(Into::into),
377 ),
378 FaultType::Offline | FaultType::OfflineWithInfo => None,
379 }
380 }
381
382 async fn do_handle_lite_certificate(
383 self,
384 certificate: LiteCertificate<'_>,
385 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
386 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
387 let client = self.client.clone();
388 let mut validator = client.lock().await;
389 let result = async move {
390 match validator.state.full_certificate(certificate).await? {
391 Either::Left(confirmed) => {
392 self.do_handle_certificate_internal(confirmed, &mut validator)
393 .await
394 }
395 Either::Right(validated) => {
396 self.do_handle_certificate_internal(validated, &mut validator)
397 .await
398 }
399 }
400 }
401 .await;
402 sender.send(result)
403 }
404
405 async fn do_handle_certificate_internal<T: ProcessableCertificate>(
406 &self,
407 certificate: GenericCertificate<T>,
408 validator: &mut MutexGuard<'_, LocalValidator<S>>,
409 ) -> Result<ChainInfoResponse, NodeError> {
410 let handle_certificate_result = Self::handle_certificate(certificate, validator).await;
411 match handle_certificate_result {
412 Some(Err(NodeError::BlobsNotFound(_))) => {
413 handle_certificate_result.expect("handle_certificate_result should be Some")
414 }
415 _ => match validator.fault_type {
416 FaultType::DontSendConfirmVote | FaultType::DontProcessValidated
417 if T::KIND == CertificateKind::Validated =>
418 {
419 Err(NodeError::ClientIoError {
420 error: "refusing to confirm".to_string(),
421 })
422 }
423 FaultType::Honest
424 | FaultType::DontSendConfirmVote
425 | FaultType::DontProcessValidated
426 | FaultType::Malicious
427 | FaultType::DontSendValidateVote => {
428 handle_certificate_result.expect("handle_certificate_result should be Some")
429 }
430 FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
431 error: "offline".to_string(),
432 }),
433 },
434 }
435 }
436
437 async fn do_handle_certificate<T: ProcessableCertificate>(
438 self,
439 certificate: GenericCertificate<T>,
440 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
441 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
442 let mut validator = self.client.lock().await;
443 let result = self
444 .do_handle_certificate_internal(certificate, &mut validator)
445 .await;
446 sender.send(result)
447 }
448
449 async fn do_handle_chain_info_query(
450 self,
451 query: ChainInfoQuery,
452 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
453 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
454 let validator = self.client.lock().await;
455 let result = if validator.fault_type == FaultType::Offline {
456 Err(NodeError::ClientIoError {
457 error: "offline".to_string(),
458 })
459 } else {
460 validator
461 .state
462 .handle_chain_info_query(query)
463 .await
464 .map_err(Into::into)
465 };
466 sender.send(result.map(|(info, _actions)| info))
468 }
469
470 async fn do_subscribe(
471 self,
472 chains: Vec<ChainId>,
473 sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
474 ) -> Result<(), Result<NotificationStream, NodeError>> {
475 let validator = self.client.lock().await;
476 let rx = validator.notifier.subscribe(chains);
477 let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
478 sender.send(Ok(stream))
479 }
480
481 async fn do_upload_blob(
482 self,
483 content: BlobContent,
484 sender: oneshot::Sender<Result<BlobId, NodeError>>,
485 ) -> Result<(), Result<BlobId, NodeError>> {
486 let validator = self.client.lock().await;
487 let blob = Blob::new(content);
488 let id = blob.id();
489 let storage = validator.state.storage_client();
490 let result = match storage.maybe_write_blobs(&[blob]).await {
491 Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
492 Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
493 Err(error) => Err(error.into()),
494 };
495 sender.send(result)
496 }
497
498 async fn do_download_blob(
499 self,
500 blob_id: BlobId,
501 sender: oneshot::Sender<Result<BlobContent, NodeError>>,
502 ) -> Result<(), Result<BlobContent, NodeError>> {
503 let validator = self.client.lock().await;
504 let blob = validator
505 .state
506 .storage_client()
507 .read_blob(blob_id)
508 .await
509 .map_err(Into::into);
510 let blob = match blob {
511 Ok(blob) => blob.ok_or(NodeError::BlobsNotFound(vec![blob_id])),
512 Err(error) => Err(error),
513 };
514 sender.send(blob.map(|blob| blob.into_content()))
515 }
516
517 async fn do_download_pending_blob(
518 self,
519 chain_id: ChainId,
520 blob_id: BlobId,
521 sender: oneshot::Sender<Result<BlobContent, NodeError>>,
522 ) -> Result<(), Result<BlobContent, NodeError>> {
523 let validator = self.client.lock().await;
524 let result = validator
525 .state
526 .download_pending_blob(chain_id, blob_id)
527 .await
528 .map_err(Into::into);
529 sender.send(result.map(|blob| blob.into_content()))
530 }
531
532 async fn do_handle_pending_blob(
533 self,
534 chain_id: ChainId,
535 blob: BlobContent,
536 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
537 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
538 let validator = self.client.lock().await;
539 let result = validator
540 .state
541 .handle_pending_blob(chain_id, Blob::new(blob))
542 .await
543 .map_err(Into::into);
544 sender.send(result)
545 }
546
547 async fn do_download_certificate(
548 self,
549 hash: CryptoHash,
550 sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
551 ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
552 let validator = self.client.lock().await;
553 let certificate = validator
554 .state
555 .storage_client()
556 .read_certificate(hash)
557 .await
558 .map_err(Into::into);
559
560 let certificate = match certificate {
561 Err(error) => Err(error),
562 Ok(entry) => match entry {
563 Some(certificate) => Ok(certificate),
564 None => {
565 panic!("Missing certificate: {hash}");
566 }
567 },
568 };
569
570 sender.send(certificate)
571 }
572
573 async fn do_download_certificates(
574 self,
575 hashes: Vec<CryptoHash>,
576 sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
577 ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
578 let validator = self.client.lock().await;
579 let certificates = validator
580 .state
581 .storage_client()
582 .read_certificates(hashes.clone())
583 .await
584 .map_err(Into::into);
585
586 let certificates = match certificates {
587 Err(error) => Err(error),
588 Ok(certificates) => match ResultReadCertificates::new(certificates, hashes) {
589 ResultReadCertificates::Certificates(certificates) => Ok(certificates),
590 ResultReadCertificates::InvalidHashes(hashes) => {
591 panic!("Missing certificates: {:?}", hashes)
592 }
593 },
594 };
595
596 sender.send(certificates)
597 }
598
599 async fn do_blob_last_used_by(
600 self,
601 blob_id: BlobId,
602 sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
603 ) -> Result<(), Result<CryptoHash, NodeError>> {
604 let validator = self.client.lock().await;
605 let blob_state = validator
606 .state
607 .storage_client()
608 .read_blob_state(blob_id)
609 .await
610 .map_err(Into::into);
611 let certificate_hash = match blob_state {
612 Err(err) => Err(err),
613 Ok(blob_state) => match blob_state {
614 None => Err(NodeError::BlobsNotFound(vec![blob_id])),
615 Some(blob_state) => blob_state
616 .last_used_by
617 .ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
618 },
619 };
620
621 sender.send(certificate_hash)
622 }
623
624 async fn do_missing_blob_ids(
625 self,
626 blob_ids: Vec<BlobId>,
627 sender: oneshot::Sender<Result<Vec<BlobId>, NodeError>>,
628 ) -> Result<(), Result<Vec<BlobId>, NodeError>> {
629 let validator = self.client.lock().await;
630 let missing_blob_ids = validator
631 .state
632 .storage_client()
633 .missing_blobs(&blob_ids)
634 .await
635 .map_err(Into::into);
636 sender.send(missing_blob_ids)
637 }
638}
639
640#[derive(Clone)]
641pub struct NodeProvider<S>(BTreeMap<ValidatorPublicKey, Arc<Mutex<LocalValidator<S>>>>)
642where
643 S: Storage;
644
645impl<S> ValidatorNodeProvider for NodeProvider<S>
646where
647 S: Storage + Clone + Send + Sync + 'static,
648{
649 type Node = LocalValidatorClient<S>;
650
651 fn make_node(&self, _name: &str) -> Result<Self::Node, NodeError> {
652 unimplemented!()
653 }
654
655 fn make_nodes_from_list<A>(
656 &self,
657 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
658 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
659 where
660 A: AsRef<str>,
661 {
662 Ok(validators
663 .into_iter()
664 .map(|(public_key, address)| {
665 self.0
666 .get(&public_key)
667 .ok_or_else(|| NodeError::CannotResolveValidatorAddress {
668 address: address.as_ref().to_string(),
669 })
670 .cloned()
671 .map(|client| (public_key, LocalValidatorClient { public_key, client }))
672 })
673 .collect::<Result<Vec<_>, _>>()?
674 .into_iter())
675 }
676}
677
678impl<S> FromIterator<LocalValidatorClient<S>> for NodeProvider<S>
679where
680 S: Storage,
681{
682 fn from_iter<T>(iter: T) -> Self
683 where
684 T: IntoIterator<Item = LocalValidatorClient<S>>,
685 {
686 let destructure =
687 |validator: LocalValidatorClient<S>| (validator.public_key, validator.client);
688 Self(iter.into_iter().map(destructure).collect())
689 }
690}
691
692#[allow(dead_code)]
699pub struct TestBuilder<B: StorageBuilder> {
700 storage_builder: B,
701 pub initial_committee: Committee,
702 admin_description: Option<ChainDescription>,
703 network_description: Option<NetworkDescription>,
704 genesis_storage_builder: GenesisStorageBuilder,
705 validator_clients: Vec<LocalValidatorClient<B::Storage>>,
706 validator_storages: HashMap<ValidatorPublicKey, B::Storage>,
707 chain_client_storages: Vec<B::Storage>,
708 pub chain_owners: BTreeMap<ChainId, AccountOwner>,
709 pub signer: InMemorySigner,
710}
711
/// A factory for the storages used by test validators and chain clients.
#[async_trait]
pub trait StorageBuilder {
    /// The storage type produced by this builder.
    type Storage: Storage + Clone + Send + Sync + 'static;

    /// Builds a new storage instance.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error>;

    /// Returns the test clock of this builder.
    fn clock(&self) -> &TestClock;
}
720
/// Collects the root chains added so far, so that they can be created in a new storage.
#[derive(Default)]
struct GenesisStorageBuilder {
    /// The recorded chains, in the order in which they were added.
    accounts: Vec<GenesisAccount>,
}
725
/// A chain recorded by [`GenesisStorageBuilder`], together with an account public key.
struct GenesisAccount {
    /// The description of the chain.
    description: ChainDescription,
    /// The public key passed to `GenesisStorageBuilder::add` along with the description.
    public_key: AccountPublicKey,
}
730
731impl GenesisStorageBuilder {
732 fn add(&mut self, description: ChainDescription, public_key: AccountPublicKey) {
733 self.accounts.push(GenesisAccount {
734 description,
735 public_key,
736 })
737 }
738
739 async fn build<S>(&self, storage: S) -> S
740 where
741 S: Storage + Clone + Send + Sync + 'static,
742 {
743 for account in &self.accounts {
744 storage
745 .create_chain(account.description.clone())
746 .await
747 .unwrap();
748 }
749 storage
750 }
751}
752
/// The chain client type used in tests: storage `S`, a [`NodeProvider`] as the network and
/// an [`InMemorySigner`].
pub type ChainClient<S> =
    crate::client::ChainClient<crate::environment::Impl<S, NodeProvider<S>, InMemorySigner>>;
755
756impl<S: Storage + Clone + Send + Sync + 'static> ChainClient<S> {
757 pub async fn read_confirmed_blocks_downward(
759 &self,
760 from: CryptoHash,
761 limit: u32,
762 ) -> anyhow::Result<Vec<ConfirmedBlock>> {
763 let mut hash = Some(from);
764 let mut values = Vec::new();
765 for _ in 0..limit {
766 let Some(next_hash) = hash else {
767 break;
768 };
769 let value = self.read_confirmed_block(next_hash).await?;
770 hash = value.block().header.previous_block_hash;
771 values.push(value);
772 }
773 Ok(values)
774 }
775}
776
777impl<B> TestBuilder<B>
778where
779 B: StorageBuilder,
780{
781 pub async fn new(
782 mut storage_builder: B,
783 count: usize,
784 with_faulty_validators: usize,
785 mut signer: InMemorySigner,
786 ) -> Result<Self, anyhow::Error> {
787 let mut validators = Vec::new();
788 for _ in 0..count {
789 let validator_keypair = ValidatorKeypair::generate();
790 let account_public_key = signer.generate_new();
791 validators.push((validator_keypair, account_public_key));
792 }
793 let for_committee = validators
794 .iter()
795 .map(|(validating, account)| (validating.public_key, *account))
796 .collect::<Vec<_>>();
797 let initial_committee = Committee::make_simple(for_committee);
798 let mut validator_clients = Vec::new();
799 let mut validator_storages = HashMap::new();
800 let mut faulty_validators = HashSet::new();
801 for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() {
802 let validator_public_key = validator_keypair.public_key;
803 let storage = storage_builder.build().await?;
804 let state = WorkerState::new(
805 format!("Node {}", i),
806 Some(validator_keypair.secret_key),
807 storage.clone(),
808 NonZeroUsize::new(100).expect("Chain worker limit should not be zero"),
809 )
810 .with_allow_inactive_chains(false)
811 .with_allow_messages_from_deprecated_epochs(false);
812 let validator = LocalValidatorClient::new(validator_public_key, state);
813 if i < with_faulty_validators {
814 faulty_validators.insert(validator_public_key);
815 validator.set_fault_type(FaultType::Malicious).await;
816 }
817 validator_clients.push(validator);
818 validator_storages.insert(validator_public_key, storage);
819 }
820 tracing::info!(
821 "Test will use the following faulty validators: {:?}",
822 faulty_validators
823 );
824 Ok(Self {
825 storage_builder,
826 initial_committee,
827 admin_description: None,
828 network_description: None,
829 genesis_storage_builder: GenesisStorageBuilder::default(),
830 validator_clients,
831 validator_storages,
832 chain_client_storages: Vec::new(),
833 chain_owners: BTreeMap::new(),
834 signer,
835 })
836 }
837
838 pub fn with_policy(mut self, policy: ResourceControlPolicy) -> Self {
839 let validators = self.initial_committee.validators().clone();
840 self.initial_committee = Committee::new(validators, policy);
841 self
842 }
843
844 pub async fn set_fault_type(&mut self, indexes: impl AsRef<[usize]>, fault_type: FaultType) {
845 let mut faulty_validators = vec![];
846 for index in indexes.as_ref() {
847 let validator = &mut self.validator_clients[*index];
848 validator.set_fault_type(fault_type).await;
849 faulty_validators.push(validator.public_key);
850 }
851 tracing::info!(
852 "Making the following validators {:?}: {:?}",
853 fault_type,
854 faulty_validators
855 );
856 }
857
858 pub async fn add_root_chain(
863 &mut self,
864 index: u32,
865 balance: Amount,
866 ) -> anyhow::Result<ChainClient<B::Storage>> {
867 if self.admin_description.is_none() && index != 0 {
869 Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
870 }
871 let origin = ChainOrigin::Root(index);
872 let public_key = self.signer.generate_new();
873 let open_chain_config = InitialChainConfig {
874 ownership: ChainOwnership::single(public_key.into()),
875 epoch: Epoch(0),
876 min_active_epoch: Epoch(0),
877 max_active_epoch: Epoch(0),
878 balance,
879 application_permissions: ApplicationPermissions::default(),
880 };
881 let description = ChainDescription::new(origin, open_chain_config, Timestamp::from(0));
882 let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
883 if index == 0 {
884 self.admin_description = Some(description.clone());
885 self.network_description = Some(NetworkDescription {
886 admin_chain_id: description.id(),
887 genesis_config_hash: CryptoHash::test_hash("genesis config"),
889 genesis_timestamp: Timestamp::from(0),
890 genesis_committee_blob_hash: committee_blob.id().hash,
891 name: "test network".to_string(),
892 });
893 }
894 self.genesis_storage_builder
896 .add(description.clone(), public_key);
897
898 let network_description = self.network_description.as_ref().unwrap();
899
900 for validator in &self.validator_clients {
901 let storage = self
902 .validator_storages
903 .get_mut(&validator.public_key)
904 .unwrap();
905 storage
906 .write_network_description(network_description)
907 .await
908 .expect("writing the NetworkDescription should succeed");
909 storage
910 .write_blob(&committee_blob)
911 .await
912 .expect("writing a blob should succeed");
913 if validator.fault_type().await == FaultType::Malicious {
914 let origin = description.origin();
915 let config = InitialChainConfig {
916 balance: Amount::ZERO,
917 ..description.config().clone()
918 };
919 storage
920 .create_chain(ChainDescription::new(origin, config, Timestamp::from(0)))
921 .await
922 .unwrap();
923 } else {
924 storage.create_chain(description.clone()).await.unwrap();
925 }
926 }
927 for storage in self.chain_client_storages.iter_mut() {
928 storage.create_chain(description.clone()).await.unwrap();
929 }
930 let chain_id = description.id();
931 self.chain_owners.insert(chain_id, public_key.into());
932 self.make_client(chain_id, None, BlockHeight::ZERO).await
933 }
934
935 pub fn genesis_chains(&self) -> Vec<(AccountPublicKey, Amount)> {
936 let mut result = Vec::new();
937 for (i, genesis_account) in self.genesis_storage_builder.accounts.iter().enumerate() {
938 assert_eq!(
939 genesis_account.description.origin(),
940 ChainOrigin::Root(i as u32)
941 );
942 result.push((
943 genesis_account.public_key,
944 genesis_account.description.config().balance,
945 ));
946 }
947 result
948 }
949
950 pub fn admin_id(&self) -> ChainId {
951 self.admin_description
952 .as_ref()
953 .expect("admin chain not initialized")
954 .id()
955 }
956
/// Returns the description of the admin chain, or `None` if it has not been set yet.
pub fn admin_description(&self) -> Option<&ChainDescription> {
    self.admin_description.as_ref()
}
960
/// Creates a node provider from clones of all validator clients.
pub fn make_node_provider(&self) -> NodeProvider<B::Storage> {
    self.validator_clients.iter().cloned().collect()
}
964
/// Returns the validator client at position `index`.
///
/// # Panics
///
/// Panics if `index` is out of bounds.
pub fn node(&mut self, index: usize) -> &mut LocalValidatorClient<B::Storage> {
    &mut self.validator_clients[index]
}
968
969 pub async fn make_storage(&mut self) -> anyhow::Result<B::Storage> {
970 let storage = self.storage_builder.build().await?;
971 let network_description = self.network_description.as_ref().unwrap();
972 let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
973 storage
974 .write_network_description(network_description)
975 .await
976 .expect("writing the NetworkDescription should succeed");
977 storage
978 .write_blob(&committee_blob)
979 .await
980 .expect("writing a blob should succeed");
981 Ok(self.genesis_storage_builder.build(storage).await)
982 }
983
984 pub async fn make_client(
985 &mut self,
986 chain_id: ChainId,
987 block_hash: Option<CryptoHash>,
988 block_height: BlockHeight,
989 ) -> anyhow::Result<ChainClient<B::Storage>> {
990 let storage = self.make_storage().await?;
993 self.chain_client_storages.push(storage.clone());
994 let client = Arc::new(Client::new(
995 crate::environment::Impl {
996 network: self.make_node_provider(),
997 storage,
998 signer: self.signer.clone(),
999 },
1000 self.admin_id(),
1001 false,
1002 [chain_id],
1003 format!("Client node for {:.8}", chain_id),
1004 NonZeroUsize::new(20).expect("Chain worker limit should not be zero"),
1005 ChainClientOptions::test_default(),
1006 ));
1007 Ok(client.create_chain_client(
1008 chain_id,
1009 block_hash,
1010 block_height,
1011 None,
1012 self.chain_owners.get(&chain_id).copied(),
1013 ))
1014 }
1015
1016 pub async fn check_that_validators_have_certificate(
1018 &self,
1019 chain_id: ChainId,
1020 block_height: BlockHeight,
1021 target_count: usize,
1022 ) -> Option<ConfirmedBlockCertificate> {
1023 let query =
1024 ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_in_range(BlockHeightRange {
1025 start: block_height,
1026 limit: Some(1),
1027 });
1028 let mut count = 0;
1029 let mut certificate = None;
1030 for validator in self.validator_clients.clone() {
1031 if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1032 if response.check(validator.public_key).is_ok() {
1033 let ChainInfo {
1034 mut requested_sent_certificate_hashes,
1035 ..
1036 } = *response.info;
1037 debug_assert!(requested_sent_certificate_hashes.len() <= 1);
1038 if let Some(cert_hash) = requested_sent_certificate_hashes.pop() {
1039 if let Ok(cert) = validator.download_certificate(cert_hash).await {
1040 if cert.inner().block().header.chain_id == chain_id
1041 && cert.inner().block().header.height == block_height
1042 {
1043 cert.check(&self.initial_committee).unwrap();
1044 count += 1;
1045 certificate = Some(cert);
1046 }
1047 }
1048 }
1049 }
1050 }
1051 }
1052 assert!(count >= target_count);
1053 certificate
1054 }
1055
1056 pub async fn check_that_validators_are_in_round(
1059 &self,
1060 chain_id: ChainId,
1061 block_height: BlockHeight,
1062 round: Round,
1063 target_count: usize,
1064 ) {
1065 let query = ChainInfoQuery::new(chain_id);
1066 let mut count = 0;
1067 for validator in self.validator_clients.clone() {
1068 if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1069 if response.info.manager.current_round == round
1070 && response.info.next_block_height == block_height
1071 && response.check(validator.public_key).is_ok()
1072 {
1073 count += 1;
1074 }
1075 }
1076 }
1077 assert!(count >= target_count);
1078 }
1079
1080 pub async fn check_that_validators_have_empty_outboxes(&self, chain_id: ChainId) {
1082 for validator in &self.validator_clients {
1083 let guard = validator.client.lock().await;
1084 let chain = guard.state.chain_state_view(chain_id).await.unwrap();
1085 assert_eq!(chain.outboxes.indices().await.unwrap(), []);
1086 }
1087 }
1088}
1089
/// Number of permits initially available in [`ROCKS_DB_SEMAPHORE`].
#[cfg(feature = "rocksdb")]
const ROCKS_DB_SEMAPHORE_PERMITS: usize = 5;

/// Semaphore from which `RocksDbStorageBuilder::new` acquires one permit; the builder keeps
/// that permit for as long as it lives, so only a bounded number of builders created that
/// way can exist at the same time.
#[cfg(feature = "rocksdb")]
static ROCKS_DB_SEMAPHORE: Semaphore = Semaphore::const_new(ROCKS_DB_SEMAPHORE_PERMITS);
1093
1094#[derive(Default)]
1095pub struct MemoryStorageBuilder {
1096 namespace: String,
1097 instance_counter: usize,
1098 wasm_runtime: Option<WasmRuntime>,
1099 clock: TestClock,
1100}
1101
1102#[async_trait]
1103impl StorageBuilder for MemoryStorageBuilder {
1104 type Storage = DbStorage<MemoryStore, TestClock>;
1105
1106 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1107 self.instance_counter += 1;
1108 let config = MemoryStore::new_test_config().await?;
1109 if self.namespace.is_empty() {
1110 self.namespace = generate_test_namespace();
1111 }
1112 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1113 Ok(
1114 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1115 .await?,
1116 )
1117 }
1118
1119 fn clock(&self) -> &TestClock {
1120 &self.clock
1121 }
1122}
1123
1124impl MemoryStorageBuilder {
1125 #[allow(dead_code)]
1128 pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1129 MemoryStorageBuilder {
1130 wasm_runtime: wasm_runtime.into(),
1131 ..MemoryStorageBuilder::default()
1132 }
1133 }
1134}
1135
/// A [`StorageBuilder`] producing [`DbStorage`] instances on top of [`RocksDbStore`].
///
/// Each call to `build` uses its own namespace, made of a base namespace shared by the
/// builder and the value of a per-builder instance counter.
#[cfg(feature = "rocksdb")]
pub struct RocksDbStorageBuilder {
    /// Base namespace; stays empty until `build` fills it with `generate_test_namespace()`.
    namespace: String,
    /// Incremented at the start of every `build` call and used as the namespace suffix.
    instance_counter: usize,
    /// Optional Wasm runtime handed to every storage created by `build`.
    wasm_runtime: Option<WasmRuntime>,
    /// Clock returned by `clock`; a clone is handed to every storage created by `build`.
    clock: TestClock,
    /// Permit taken from `ROCKS_DB_SEMAPHORE`; it is never read, only kept alive with the
    /// builder.
    _permit: SemaphorePermit<'static>,
}

#[cfg(feature = "rocksdb")]
impl RocksDbStorageBuilder {
    /// Creates a builder without a Wasm runtime, waiting until a permit can be acquired
    /// from `ROCKS_DB_SEMAPHORE`.
    ///
    /// # Panics
    ///
    /// Panics if acquiring the permit fails.
    pub async fn new() -> Self {
        let clock = TestClock::default();
        let permit = ROCKS_DB_SEMAPHORE.acquire().await.unwrap();
        Self {
            namespace: String::new(),
            instance_counter: 0,
            wasm_runtime: None,
            clock,
            _permit: permit,
        }
    }

    /// Same as `new`, but with the given optional Wasm runtime.
    #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        // Convert the argument before waiting for the permit.
        let wasm_runtime = wasm_runtime.into();
        let base = Self::new().await;
        Self {
            wasm_runtime,
            ..base
        }
    }
}

#[cfg(feature = "rocksdb")]
#[async_trait]
impl StorageBuilder for RocksDbStorageBuilder {
    type Storage = DbStorage<RocksDbStore, TestClock>;

    /// Creates a new storage in the namespace `<base namespace>_<instance counter>`.
    ///
    /// # Errors
    ///
    /// Returns an error if the test configuration or the storage itself cannot be created.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let store_config = RocksDbStore::new_test_config().await?;
        // The base namespace is chosen once, the first time the configuration is available.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let instance_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let runtime = self.wasm_runtime;
        let clock = self.clock.clone();
        let storage =
            DbStorage::new_for_testing(store_config, &instance_namespace, runtime, clock).await?;
        Ok(storage)
    }

    /// Returns the clock owned by this builder.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1190
/// A [`StorageBuilder`] producing [`DbStorage`] instances on top of [`ServiceStoreClient`].
///
/// Each call to `build` uses its own namespace, made of a base namespace shared by the
/// builder and the value of a per-builder instance counter.
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[derive(Default)]
pub struct ServiceStorageBuilder {
    /// Base namespace; stays empty until `build` fills it with `generate_test_namespace()`.
    namespace: String,
    /// Incremented at the start of every `build` call and used as the namespace suffix.
    instance_counter: usize,
    /// Optional Wasm runtime handed to every storage created by `build`.
    wasm_runtime: Option<WasmRuntime>,
    /// Clock returned by `clock`; a clone is handed to every storage created by `build`.
    clock: TestClock,
}

#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
impl ServiceStorageBuilder {
    /// Creates a builder with default settings and no Wasm runtime.
    pub async fn new() -> Self {
        Self::with_wasm_runtime(None).await
    }

    /// Creates a builder with default settings, except for the given optional Wasm runtime.
    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        Self {
            wasm_runtime: wasm_runtime.into(),
            ..Self::default()
        }
    }
}

#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[async_trait]
impl StorageBuilder for ServiceStorageBuilder {
    type Storage = DbStorage<ServiceStoreClient, TestClock>;

    /// Creates a new storage in the namespace `<base namespace>_<instance counter>`.
    ///
    /// # Errors
    ///
    /// Returns an error if the test configuration or the storage itself cannot be created.
    async fn build(&mut self) -> anyhow::Result<Self::Storage> {
        self.instance_counter += 1;
        let store_config = ServiceStoreClient::new_test_config().await?;
        // The base namespace is chosen once, the first time the configuration is available.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let instance_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let runtime = self.wasm_runtime;
        let clock = self.clock.clone();
        let storage =
            DbStorage::new_for_testing(store_config, &instance_namespace, runtime, clock).await?;
        Ok(storage)
    }

    /// Returns the clock owned by this builder.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1238
/// A [`StorageBuilder`] producing [`DbStorage`] instances on top of [`DynamoDbStore`].
///
/// Each call to `build` uses its own namespace, made of a base namespace shared by the
/// builder and the value of a per-builder instance counter.
#[cfg(feature = "dynamodb")]
#[derive(Default)]
pub struct DynamoDbStorageBuilder {
    /// Base namespace; stays empty until `build` fills it with `generate_test_namespace()`.
    namespace: String,
    /// Incremented at the start of every `build` call and used as the namespace suffix.
    instance_counter: usize,
    /// Optional Wasm runtime handed to every storage created by `build`.
    wasm_runtime: Option<WasmRuntime>,
    /// Clock returned by `clock`; a clone is handed to every storage created by `build`.
    clock: TestClock,
}

#[cfg(feature = "dynamodb")]
impl DynamoDbStorageBuilder {
    /// Creates a builder with default settings, except for the given optional Wasm runtime.
    // NOTE(review): presumably unused under some feature combinations; confirm before
    // removing the `allow`.
    #[allow(dead_code)]
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        Self {
            wasm_runtime: wasm_runtime.into(),
            ..Self::default()
        }
    }
}

#[cfg(feature = "dynamodb")]
#[async_trait]
impl StorageBuilder for DynamoDbStorageBuilder {
    type Storage = DbStorage<DynamoDbStore, TestClock>;

    /// Creates a new storage in the namespace `<base namespace>_<instance counter>`.
    ///
    /// # Errors
    ///
    /// Returns an error if the test configuration or the storage itself cannot be created.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let store_config = DynamoDbStore::new_test_config().await?;
        // The base namespace is chosen once, the first time the configuration is available.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let instance_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let runtime = self.wasm_runtime;
        let clock = self.clock.clone();
        let storage =
            DbStorage::new_for_testing(store_config, &instance_namespace, runtime, clock).await?;
        Ok(storage)
    }

    /// Returns the clock owned by this builder.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1283
/// A [`StorageBuilder`] producing [`DbStorage`] instances on top of [`ScyllaDbStore`].
///
/// Each call to `build` uses its own namespace, made of a base namespace shared by the
/// builder and the value of a per-builder instance counter.
#[cfg(feature = "scylladb")]
#[derive(Default)]
pub struct ScyllaDbStorageBuilder {
    /// Base namespace; stays empty until `build` fills it with `generate_test_namespace()`.
    namespace: String,
    /// Incremented at the start of every `build` call and used as the namespace suffix.
    instance_counter: usize,
    /// Optional Wasm runtime handed to every storage created by `build`.
    wasm_runtime: Option<WasmRuntime>,
    /// Clock returned by `clock`; a clone is handed to every storage created by `build`.
    clock: TestClock,
}

#[cfg(feature = "scylladb")]
impl ScyllaDbStorageBuilder {
    /// Creates a builder with default settings, except for the given optional Wasm runtime.
    // NOTE(review): presumably unused under some feature combinations; confirm before
    // removing the `allow`.
    #[allow(dead_code)]
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        Self {
            wasm_runtime: wasm_runtime.into(),
            ..Self::default()
        }
    }
}

#[cfg(feature = "scylladb")]
#[async_trait]
impl StorageBuilder for ScyllaDbStorageBuilder {
    type Storage = DbStorage<ScyllaDbStore, TestClock>;

    /// Creates a new storage in the namespace `<base namespace>_<instance counter>`.
    ///
    /// # Errors
    ///
    /// Returns an error if the test configuration or the storage itself cannot be created.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let store_config = ScyllaDbStore::new_test_config().await?;
        // The base namespace is chosen once, the first time the configuration is available.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let instance_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let runtime = self.wasm_runtime;
        let clock = self.clock.clone();
        let storage =
            DbStorage::new_for_testing(store_config, &instance_namespace, runtime, clock).await?;
        Ok(storage)
    }

    /// Returns the clock owned by this builder.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}