1use std::{
5 collections::{BTreeMap, HashMap, HashSet},
6 num::NonZeroUsize,
7 sync::Arc,
8 vec,
9};
10
11use async_trait::async_trait;
12use futures::{
13 future::Either,
14 lock::{Mutex, MutexGuard},
15 Future,
16};
17use linera_base::{
18 crypto::{AccountPublicKey, CryptoHash, InMemorySigner, ValidatorKeypair, ValidatorPublicKey},
19 data_types::*,
20 identifiers::{AccountOwner, BlobId, ChainId},
21 ownership::ChainOwnership,
22};
23use linera_chain::{
24 data_types::BlockProposal,
25 types::{
26 CertificateKind, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27 LiteCertificate, Timeout, ValidatedBlock,
28 },
29};
30use linera_execution::{committee::Committee, ResourceControlPolicy, WasmRuntime};
31use linera_storage::{DbStorage, Storage, TestClock};
32#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
33use linera_storage_service::client::ServiceStoreClient;
34use linera_version::VersionInfo;
35#[cfg(feature = "dynamodb")]
36use linera_views::dynamo_db::DynamoDbStore;
37#[cfg(feature = "scylladb")]
38use linera_views::scylla_db::ScyllaDbStore;
39use linera_views::{
40 memory::MemoryStore, random::generate_test_namespace, store::TestKeyValueStore as _,
41};
42use tokio::sync::oneshot;
43use tokio_stream::wrappers::UnboundedReceiverStream;
44#[cfg(feature = "rocksdb")]
45use {
46 linera_views::rocks_db::RocksDbStore,
47 tokio::sync::{Semaphore, SemaphorePermit},
48};
49
50use crate::{
51 client::{ChainClientOptions, Client},
52 data_types::*,
53 node::{
54 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
55 ValidatorNodeProvider,
56 },
57 notifier::ChannelNotifier,
58 worker::{NetworkActions, Notification, ProcessableCertificate, WorkerState},
59};
60
/// The misbehavior (if any) that a local test validator simulates.
///
/// `Eq` and `Hash` are derived alongside `PartialEq` so that fault types can be used in
/// hashed collections and in generic code that requires full equality.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub enum FaultType {
    /// Processes every request and returns the genuine result.
    Honest,
    /// Processes neither block proposals nor certificates and answers them, as well as
    /// chain info queries, with an "offline" I/O error.
    Offline,
    /// Behaves like [`FaultType::Offline`] for block proposals and certificates, but still
    /// answers chain info queries.
    OfflineWithInfo,
    /// Does not process block proposals and answers them with an arithmetic overflow
    /// error; certificates are still processed.
    Malicious,
    /// Processes validated block certificates but answers them with a
    /// "refusing to confirm" error.
    DontSendConfirmVote,
    /// Does not process validated block certificates and answers them with a
    /// "refusing to confirm" error.
    DontProcessValidated,
    /// Processes block proposals but answers them with a "refusing to validate" error.
    DontSendValidateVote,
}
71
/// The state of an in-process validator used in tests.
///
/// Instances are shared behind an `Arc<Mutex<_>>` by [`LocalValidatorClient`], which
/// locks the validator for the duration of each request.
struct LocalValidator<S>
where
    S: Storage,
{
    /// The worker that actually processes proposals, certificates and queries.
    state: WorkerState<S>,
    /// The fault this validator currently simulates.
    fault_type: FaultType,
    /// Notifier used for subscriptions and handed to the worker when certificates are
    /// processed.
    notifier: Arc<ChannelNotifier<Notification>>,
}
86
/// A cloneable handle to an in-process validator.
///
/// Clones share the same underlying [`LocalValidator`] through the `Arc`.
#[derive(Clone)]
pub struct LocalValidatorClient<S>
where
    S: Storage,
{
    /// The public key identifying the validator.
    public_key: ValidatorPublicKey,
    /// The shared, lock-protected validator state.
    client: Arc<Mutex<LocalValidator<S>>>,
}
95
96impl<S> ValidatorNode for LocalValidatorClient<S>
97where
98 S: Storage + Clone + Send + Sync + 'static,
99{
100 type NotificationStream = NotificationStream;
101
102 async fn handle_block_proposal(
103 &self,
104 proposal: BlockProposal,
105 ) -> Result<ChainInfoResponse, NodeError> {
106 self.spawn_and_receive(move |validator, sender| {
107 validator.do_handle_block_proposal(proposal, sender)
108 })
109 .await
110 }
111
112 async fn handle_lite_certificate(
113 &self,
114 certificate: LiteCertificate<'_>,
115 _delivery: CrossChainMessageDelivery,
116 ) -> Result<ChainInfoResponse, NodeError> {
117 let certificate = certificate.cloned();
118 self.spawn_and_receive(move |validator, sender| {
119 validator.do_handle_lite_certificate(certificate, sender)
120 })
121 .await
122 }
123
124 async fn handle_timeout_certificate(
125 &self,
126 certificate: GenericCertificate<Timeout>,
127 ) -> Result<ChainInfoResponse, NodeError> {
128 self.spawn_and_receive(move |validator, sender| {
129 validator.do_handle_certificate(certificate, sender)
130 })
131 .await
132 }
133
134 async fn handle_validated_certificate(
135 &self,
136 certificate: GenericCertificate<ValidatedBlock>,
137 ) -> Result<ChainInfoResponse, NodeError> {
138 self.spawn_and_receive(move |validator, sender| {
139 validator.do_handle_certificate(certificate, sender)
140 })
141 .await
142 }
143
144 async fn handle_confirmed_certificate(
145 &self,
146 certificate: GenericCertificate<ConfirmedBlock>,
147 _delivery: CrossChainMessageDelivery,
148 ) -> Result<ChainInfoResponse, NodeError> {
149 self.spawn_and_receive(move |validator, sender| {
150 validator.do_handle_certificate(certificate, sender)
151 })
152 .await
153 }
154
155 async fn handle_chain_info_query(
156 &self,
157 query: ChainInfoQuery,
158 ) -> Result<ChainInfoResponse, NodeError> {
159 self.spawn_and_receive(move |validator, sender| {
160 validator.do_handle_chain_info_query(query, sender)
161 })
162 .await
163 }
164
165 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
166 self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
167 .await
168 }
169
170 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
171 Ok(Default::default())
172 }
173
174 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
175 Ok(NetworkDescription {
176 name: "test network".to_string(),
177 genesis_config_hash: CryptoHash::test_hash("genesis config"),
178 genesis_timestamp: Timestamp::default(),
179 admin_chain_id: self
180 .client
181 .lock()
182 .await
183 .state
184 .storage_client()
185 .read_network_description()
186 .await
187 .unwrap()
188 .unwrap()
189 .admin_chain_id,
190 })
191 }
192
193 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
194 self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
195 .await
196 }
197
198 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
199 self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
200 .await
201 }
202
203 async fn download_pending_blob(
204 &self,
205 chain_id: ChainId,
206 blob_id: BlobId,
207 ) -> Result<BlobContent, NodeError> {
208 self.spawn_and_receive(move |validator, sender| {
209 validator.do_download_pending_blob(chain_id, blob_id, sender)
210 })
211 .await
212 }
213
214 async fn handle_pending_blob(
215 &self,
216 chain_id: ChainId,
217 blob: BlobContent,
218 ) -> Result<ChainInfoResponse, NodeError> {
219 self.spawn_and_receive(move |validator, sender| {
220 validator.do_handle_pending_blob(chain_id, blob, sender)
221 })
222 .await
223 }
224
225 async fn download_certificate(
226 &self,
227 hash: CryptoHash,
228 ) -> Result<ConfirmedBlockCertificate, NodeError> {
229 self.spawn_and_receive(move |validator, sender| {
230 validator.do_download_certificate(hash, sender)
231 })
232 .await
233 }
234
235 async fn download_certificates(
236 &self,
237 hashes: Vec<CryptoHash>,
238 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
239 self.spawn_and_receive(move |validator, sender| {
240 validator.do_download_certificates(hashes, sender)
241 })
242 .await
243 }
244
245 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
246 self.spawn_and_receive(move |validator, sender| {
247 validator.do_blob_last_used_by(blob_id, sender)
248 })
249 .await
250 }
251
252 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
253 self.spawn_and_receive(move |validator, sender| {
254 validator.do_missing_blob_ids(blob_ids, sender)
255 })
256 .await
257 }
258}
259
260impl<S> LocalValidatorClient<S>
261where
262 S: Storage + Clone + Send + Sync + 'static,
263{
264 fn new(public_key: ValidatorPublicKey, state: WorkerState<S>) -> Self {
265 let client = LocalValidator {
266 fault_type: FaultType::Honest,
267 state,
268 notifier: Arc::new(ChannelNotifier::default()),
269 };
270 Self {
271 public_key,
272 client: Arc::new(Mutex::new(client)),
273 }
274 }
275
276 pub fn name(&self) -> ValidatorPublicKey {
277 self.public_key
278 }
279
280 async fn set_fault_type(&self, fault_type: FaultType) {
281 self.client.lock().await.fault_type = fault_type;
282 }
283
284 async fn fault_type(&self) -> FaultType {
285 self.client.lock().await.fault_type
286 }
287
288 pub async fn chain_info_with_manager_values(
290 &mut self,
291 chain_id: ChainId,
292 ) -> Result<Box<ChainInfo>, NodeError> {
293 let query = ChainInfoQuery::new(chain_id).with_manager_values();
294 let response = self.handle_chain_info_query(query).await?;
295 Ok(response.info)
296 }
297
298 async fn spawn_and_receive<F, R, T>(&self, f: F) -> T
301 where
302 T: Send + 'static,
303 R: Future<Output = Result<(), T>> + Send,
304 F: FnOnce(Self, oneshot::Sender<T>) -> R + Send + 'static,
305 {
306 let validator = self.clone();
307 let (sender, receiver) = oneshot::channel();
308 tokio::spawn(async move {
309 if f(validator, sender).await.is_err() {
310 tracing::debug!("result could not be sent");
311 }
312 });
313 receiver.await.unwrap()
314 }
315
316 async fn do_handle_block_proposal(
317 self,
318 proposal: BlockProposal,
319 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
320 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
321 let mut validator = self.client.lock().await;
322 let handle_block_proposal_result =
323 Self::handle_block_proposal(proposal, &mut validator).await;
324 let result = match handle_block_proposal_result {
325 Some(Err(NodeError::BlobsNotFound(_))) => {
326 handle_block_proposal_result.expect("handle_block_proposal_result should be Some")
327 }
328 _ => match validator.fault_type {
329 FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
330 error: "offline".to_string(),
331 }),
332 FaultType::Malicious => Err(ArithmeticError::Overflow.into()),
333 FaultType::DontSendValidateVote => Err(NodeError::ClientIoError {
334 error: "refusing to validate".to_string(),
335 }),
336 FaultType::Honest
337 | FaultType::DontSendConfirmVote
338 | FaultType::DontProcessValidated => handle_block_proposal_result
339 .expect("handle_block_proposal_result should be Some"),
340 },
341 };
342 sender.send(result.map(|(info, _actions)| info))
344 }
345
346 async fn handle_block_proposal(
347 proposal: BlockProposal,
348 validator: &mut MutexGuard<'_, LocalValidator<S>>,
349 ) -> Option<Result<(ChainInfoResponse, NetworkActions), NodeError>> {
350 match validator.fault_type {
351 FaultType::Offline | FaultType::OfflineWithInfo | FaultType::Malicious => None,
352 FaultType::Honest
353 | FaultType::DontSendConfirmVote
354 | FaultType::DontProcessValidated
355 | FaultType::DontSendValidateVote => Some(
356 validator
357 .state
358 .handle_block_proposal(proposal)
359 .await
360 .map_err(Into::into),
361 ),
362 }
363 }
364
365 async fn handle_certificate<T: ProcessableCertificate>(
366 certificate: GenericCertificate<T>,
367 validator: &mut MutexGuard<'_, LocalValidator<S>>,
368 ) -> Option<Result<ChainInfoResponse, NodeError>> {
369 match validator.fault_type {
370 FaultType::DontProcessValidated if T::KIND == CertificateKind::Validated => None,
371 FaultType::Honest
372 | FaultType::DontSendConfirmVote
373 | FaultType::Malicious
374 | FaultType::DontProcessValidated
375 | FaultType::DontSendValidateVote => Some(
376 validator
377 .state
378 .fully_handle_certificate_with_notifications(certificate, &validator.notifier)
379 .await
380 .map_err(Into::into),
381 ),
382 FaultType::Offline | FaultType::OfflineWithInfo => None,
383 }
384 }
385
386 async fn do_handle_lite_certificate(
387 self,
388 certificate: LiteCertificate<'_>,
389 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
390 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
391 let client = self.client.clone();
392 let mut validator = client.lock().await;
393 let result = async move {
394 match validator.state.full_certificate(certificate).await? {
395 Either::Left(confirmed) => {
396 self.do_handle_certificate_internal(confirmed, &mut validator)
397 .await
398 }
399 Either::Right(validated) => {
400 self.do_handle_certificate_internal(validated, &mut validator)
401 .await
402 }
403 }
404 }
405 .await;
406 sender.send(result)
407 }
408
409 async fn do_handle_certificate_internal<T: ProcessableCertificate>(
410 &self,
411 certificate: GenericCertificate<T>,
412 validator: &mut MutexGuard<'_, LocalValidator<S>>,
413 ) -> Result<ChainInfoResponse, NodeError> {
414 let handle_certificate_result = Self::handle_certificate(certificate, validator).await;
415 match handle_certificate_result {
416 Some(Err(NodeError::BlobsNotFound(_))) => {
417 handle_certificate_result.expect("handle_certificate_result should be Some")
418 }
419 _ => match validator.fault_type {
420 FaultType::DontSendConfirmVote | FaultType::DontProcessValidated
421 if T::KIND == CertificateKind::Validated =>
422 {
423 Err(NodeError::ClientIoError {
424 error: "refusing to confirm".to_string(),
425 })
426 }
427 FaultType::Honest
428 | FaultType::DontSendConfirmVote
429 | FaultType::DontProcessValidated
430 | FaultType::Malicious
431 | FaultType::DontSendValidateVote => {
432 handle_certificate_result.expect("handle_certificate_result should be Some")
433 }
434 FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
435 error: "offline".to_string(),
436 }),
437 },
438 }
439 }
440
441 async fn do_handle_certificate<T: ProcessableCertificate>(
442 self,
443 certificate: GenericCertificate<T>,
444 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
445 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
446 let mut validator = self.client.lock().await;
447 let result = self
448 .do_handle_certificate_internal(certificate, &mut validator)
449 .await;
450 sender.send(result)
451 }
452
453 async fn do_handle_chain_info_query(
454 self,
455 query: ChainInfoQuery,
456 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
457 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
458 let validator = self.client.lock().await;
459 let result = if validator.fault_type == FaultType::Offline {
460 Err(NodeError::ClientIoError {
461 error: "offline".to_string(),
462 })
463 } else {
464 validator
465 .state
466 .handle_chain_info_query(query)
467 .await
468 .map_err(Into::into)
469 };
470 sender.send(result.map(|(info, _actions)| info))
472 }
473
474 async fn do_subscribe(
475 self,
476 chains: Vec<ChainId>,
477 sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
478 ) -> Result<(), Result<NotificationStream, NodeError>> {
479 let validator = self.client.lock().await;
480 let rx = validator.notifier.subscribe(chains);
481 let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
482 sender.send(Ok(stream))
483 }
484
485 async fn do_upload_blob(
486 self,
487 content: BlobContent,
488 sender: oneshot::Sender<Result<BlobId, NodeError>>,
489 ) -> Result<(), Result<BlobId, NodeError>> {
490 let validator = self.client.lock().await;
491 let blob = Blob::new(content);
492 let id = blob.id();
493 let storage = validator.state.storage_client();
494 let result = match storage.maybe_write_blobs(&[blob]).await {
495 Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
496 Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
497 Err(error) => Err(error.into()),
498 };
499 sender.send(result)
500 }
501
502 async fn do_download_blob(
503 self,
504 blob_id: BlobId,
505 sender: oneshot::Sender<Result<BlobContent, NodeError>>,
506 ) -> Result<(), Result<BlobContent, NodeError>> {
507 let validator = self.client.lock().await;
508 let blob = validator
509 .state
510 .storage_client()
511 .read_blob(blob_id)
512 .await
513 .map_err(Into::into);
514 let blob = match blob {
515 Ok(blob) => blob.ok_or(NodeError::BlobsNotFound(vec![blob_id])),
516 Err(error) => Err(error),
517 };
518 sender.send(blob.map(|blob| blob.into_content()))
519 }
520
521 async fn do_download_pending_blob(
522 self,
523 chain_id: ChainId,
524 blob_id: BlobId,
525 sender: oneshot::Sender<Result<BlobContent, NodeError>>,
526 ) -> Result<(), Result<BlobContent, NodeError>> {
527 let validator = self.client.lock().await;
528 let result = validator
529 .state
530 .download_pending_blob(chain_id, blob_id)
531 .await
532 .map_err(Into::into);
533 sender.send(result.map(|blob| blob.into_content()))
534 }
535
536 async fn do_handle_pending_blob(
537 self,
538 chain_id: ChainId,
539 blob: BlobContent,
540 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
541 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
542 let validator = self.client.lock().await;
543 let result = validator
544 .state
545 .handle_pending_blob(chain_id, Blob::new(blob))
546 .await
547 .map_err(Into::into);
548 sender.send(result)
549 }
550
551 async fn do_download_certificate(
552 self,
553 hash: CryptoHash,
554 sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
555 ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
556 let validator = self.client.lock().await;
557 let certificate = validator
558 .state
559 .storage_client()
560 .read_certificate(hash)
561 .await
562 .map_err(Into::into);
563
564 sender.send(certificate)
565 }
566
567 async fn do_download_certificates(
568 self,
569 hashes: Vec<CryptoHash>,
570 sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
571 ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
572 let validator = self.client.lock().await;
573 let certificates = validator
574 .state
575 .storage_client()
576 .read_certificates(hashes)
577 .await
578 .map_err(Into::into);
579
580 sender.send(certificates)
581 }
582
583 async fn do_blob_last_used_by(
584 self,
585 blob_id: BlobId,
586 sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
587 ) -> Result<(), Result<CryptoHash, NodeError>> {
588 let validator = self.client.lock().await;
589 let blob_state = validator
590 .state
591 .storage_client()
592 .read_blob_state(blob_id)
593 .await
594 .map_err(Into::into);
595 let certificate_hash = match blob_state {
596 Err(err) => Err(err),
597 Ok(blob_state) => match blob_state {
598 None => Err(NodeError::BlobsNotFound(vec![blob_id])),
599 Some(blob_state) => blob_state
600 .last_used_by
601 .ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
602 },
603 };
604
605 sender.send(certificate_hash)
606 }
607
608 async fn do_missing_blob_ids(
609 self,
610 blob_ids: Vec<BlobId>,
611 sender: oneshot::Sender<Result<Vec<BlobId>, NodeError>>,
612 ) -> Result<(), Result<Vec<BlobId>, NodeError>> {
613 let validator = self.client.lock().await;
614 let missing_blob_ids = validator
615 .state
616 .storage_client()
617 .missing_blobs(&blob_ids)
618 .await
619 .map_err(Into::into);
620 sender.send(missing_blob_ids)
621 }
622}
623
/// A node provider that resolves validators to in-process [`LocalValidatorClient`]s.
///
/// Maps each validator's public key to its shared validator state.
#[derive(Clone)]
pub struct NodeProvider<S>(BTreeMap<ValidatorPublicKey, Arc<Mutex<LocalValidator<S>>>>)
where
    S: Storage;
628
629impl<S> ValidatorNodeProvider for NodeProvider<S>
630where
631 S: Storage + Clone + Send + Sync + 'static,
632{
633 type Node = LocalValidatorClient<S>;
634
635 fn make_node(&self, _name: &str) -> Result<Self::Node, NodeError> {
636 unimplemented!()
637 }
638
639 fn make_nodes_from_list<A>(
640 &self,
641 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
642 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
643 where
644 A: AsRef<str>,
645 {
646 Ok(validators
647 .into_iter()
648 .map(|(public_key, address)| {
649 self.0
650 .get(&public_key)
651 .ok_or_else(|| NodeError::CannotResolveValidatorAddress {
652 address: address.as_ref().to_string(),
653 })
654 .cloned()
655 .map(|client| (public_key, LocalValidatorClient { public_key, client }))
656 })
657 .collect::<Result<Vec<_>, _>>()?
658 .into_iter())
659 }
660}
661
662impl<S> FromIterator<LocalValidatorClient<S>> for NodeProvider<S>
663where
664 S: Storage,
665{
666 fn from_iter<T>(iter: T) -> Self
667 where
668 T: IntoIterator<Item = LocalValidatorClient<S>>,
669 {
670 let destructure =
671 |validator: LocalValidatorClient<S>| (validator.public_key, validator.client);
672 Self(iter.into_iter().map(destructure).collect())
673 }
674}
675
/// Test fixture that owns a set of in-process validators and creates root chains and
/// chain clients connected to them.
#[allow(dead_code)]
pub struct TestBuilder<B: StorageBuilder> {
    /// Builds a fresh storage for every validator and chain client.
    storage_builder: B,
    /// The committee made of all validators created by the builder.
    pub initial_committee: Committee,
    /// Description of the admin chain (root chain 0), once it has been added.
    admin_description: Option<ChainDescription>,
    /// Network description, set together with the admin chain description.
    network_description: Option<NetworkDescription>,
    /// Records every root chain so that new storages can be initialized with them.
    genesis_storage_builder: GenesisStorageBuilder,
    /// Clients of all validators, in creation order.
    validator_clients: Vec<LocalValidatorClient<B::Storage>>,
    /// The storage of each validator, keyed by its public key.
    validator_storages: HashMap<ValidatorPublicKey, B::Storage>,
    /// The storages handed to chain clients created so far.
    chain_client_storages: Vec<B::Storage>,
    /// The owner of each root chain created by the builder.
    pub chain_owners: BTreeMap<ChainId, AccountOwner>,
    /// Signer used to generate the keys of validators' accounts and chain owners.
    pub signer: InMemorySigner,
}
695
/// A factory for the storages used by test validators and chain clients.
#[async_trait]
pub trait StorageBuilder {
    /// The type of storage produced by this builder.
    type Storage: Storage + Clone + Send + Sync + 'static;

    /// Creates a new storage instance.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error>;

    /// Returns the test clock of this builder.
    fn clock(&self) -> &TestClock;
}
704
/// Collects the genesis (root) chains so they can be created in any new storage.
#[derive(Default)]
struct GenesisStorageBuilder {
    /// The recorded root chains, in the order they were added.
    accounts: Vec<GenesisAccount>,
}
709
/// A root chain recorded for genesis, together with its owner's public key.
struct GenesisAccount {
    /// The description the chain is created from.
    description: ChainDescription,
    /// The public key of the chain's owner.
    public_key: AccountPublicKey,
}
714
715impl GenesisStorageBuilder {
716 fn add(&mut self, description: ChainDescription, public_key: AccountPublicKey) {
717 self.accounts.push(GenesisAccount {
718 description,
719 public_key,
720 })
721 }
722
723 async fn build<S>(&self, storage: S) -> S
724 where
725 S: Storage + Clone + Send + Sync + 'static,
726 {
727 for account in &self.accounts {
728 storage
729 .create_chain(account.description.clone())
730 .await
731 .unwrap();
732 }
733 storage
734 }
735}
736
/// A chain client whose environment uses storage `S`, the local [`NodeProvider`] as its
/// network and an [`InMemorySigner`].
pub type ChainClient<S> =
    crate::client::ChainClient<crate::environment::Impl<S, NodeProvider<S>, InMemorySigner>>;
739
740impl<B> TestBuilder<B>
741where
742 B: StorageBuilder,
743{
744 pub async fn new(
745 mut storage_builder: B,
746 count: usize,
747 with_faulty_validators: usize,
748 mut signer: InMemorySigner,
749 ) -> Result<Self, anyhow::Error> {
750 let mut validators = Vec::new();
751 for _ in 0..count {
752 let validator_keypair = ValidatorKeypair::generate();
753 let account_public_key = signer.generate_new();
754 validators.push((validator_keypair, account_public_key));
755 }
756 let for_committee = validators
757 .iter()
758 .map(|(validating, account)| (validating.public_key, *account))
759 .collect::<Vec<_>>();
760 let initial_committee = Committee::make_simple(for_committee);
761 let mut validator_clients = Vec::new();
762 let mut validator_storages = HashMap::new();
763 let mut faulty_validators = HashSet::new();
764 for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() {
765 let validator_public_key = validator_keypair.public_key;
766 let storage = storage_builder.build().await?;
767 let state = WorkerState::new(
768 format!("Node {}", i),
769 Some(validator_keypair.secret_key),
770 storage.clone(),
771 NonZeroUsize::new(100).expect("Chain worker limit should not be zero"),
772 )
773 .with_allow_inactive_chains(false)
774 .with_allow_messages_from_deprecated_epochs(false);
775 let validator = LocalValidatorClient::new(validator_public_key, state);
776 if i < with_faulty_validators {
777 faulty_validators.insert(validator_public_key);
778 validator.set_fault_type(FaultType::Malicious).await;
779 }
780 validator_clients.push(validator);
781 validator_storages.insert(validator_public_key, storage);
782 }
783 tracing::info!(
784 "Test will use the following faulty validators: {:?}",
785 faulty_validators
786 );
787 Ok(Self {
788 storage_builder,
789 initial_committee,
790 admin_description: None,
791 network_description: None,
792 genesis_storage_builder: GenesisStorageBuilder::default(),
793 validator_clients,
794 validator_storages,
795 chain_client_storages: Vec::new(),
796 chain_owners: BTreeMap::new(),
797 signer,
798 })
799 }
800
801 pub fn with_policy(mut self, policy: ResourceControlPolicy) -> Self {
802 let validators = self.initial_committee.validators().clone();
803 self.initial_committee = Committee::new(validators, policy);
804 self
805 }
806
807 pub async fn set_fault_type(&mut self, indexes: impl AsRef<[usize]>, fault_type: FaultType) {
808 let mut faulty_validators = vec![];
809 for index in indexes.as_ref() {
810 let validator = &mut self.validator_clients[*index];
811 validator.set_fault_type(fault_type).await;
812 faulty_validators.push(validator.public_key);
813 }
814 tracing::info!(
815 "Making the following validators {:?}: {:?}",
816 fault_type,
817 faulty_validators
818 );
819 }
820
821 pub async fn add_root_chain(
826 &mut self,
827 index: u32,
828 balance: Amount,
829 ) -> anyhow::Result<ChainClient<B::Storage>> {
830 if self.admin_description.is_none() && index != 0 {
832 Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
833 }
834 let origin = ChainOrigin::Root(index);
835 let mut committees = BTreeMap::new();
836 committees.insert(
837 Epoch(0),
838 bcs::to_bytes(&self.initial_committee)
839 .expect("Serializing a committee should not fail!"),
840 );
841 let public_key = self.signer.generate_new();
842 let open_chain_config = InitialChainConfig {
843 ownership: ChainOwnership::single(public_key.into()),
844 epoch: Epoch(0),
845 committees,
846 balance,
847 application_permissions: ApplicationPermissions::default(),
848 };
849 let description = ChainDescription::new(origin, open_chain_config, Timestamp::from(0));
850 if index == 0 {
851 self.admin_description = Some(description.clone());
852 self.network_description = Some(NetworkDescription {
853 admin_chain_id: description.id(),
854 genesis_config_hash: CryptoHash::test_hash("genesis config"),
856 genesis_timestamp: Timestamp::from(0),
857 name: "test network".to_string(),
858 });
859 }
860 self.genesis_storage_builder
862 .add(description.clone(), public_key);
863 for validator in &self.validator_clients {
864 let storage = self
865 .validator_storages
866 .get_mut(&validator.public_key)
867 .unwrap();
868 storage
869 .write_network_description(self.network_description.as_ref().unwrap())
870 .await
871 .expect("writing the NetworkDescription should succeed");
872 if validator.fault_type().await == FaultType::Malicious {
873 let origin = description.origin();
874 let config = InitialChainConfig {
875 balance: Amount::ZERO,
876 ..description.config().clone()
877 };
878 storage
879 .create_chain(ChainDescription::new(origin, config, Timestamp::from(0)))
880 .await
881 .unwrap();
882 } else {
883 storage.create_chain(description.clone()).await.unwrap();
884 }
885 }
886 for storage in self.chain_client_storages.iter_mut() {
887 storage.create_chain(description.clone()).await.unwrap();
888 }
889 let chain_id = description.id();
890 self.chain_owners.insert(chain_id, public_key.into());
891 self.make_client(chain_id, None, BlockHeight::ZERO).await
892 }
893
894 pub fn genesis_chains(&self) -> Vec<(AccountPublicKey, Amount)> {
895 let mut result = Vec::new();
896 for (i, genesis_account) in self.genesis_storage_builder.accounts.iter().enumerate() {
897 assert_eq!(
898 genesis_account.description.origin(),
899 ChainOrigin::Root(i as u32)
900 );
901 result.push((
902 genesis_account.public_key,
903 genesis_account.description.config().balance,
904 ));
905 }
906 result
907 }
908
909 pub fn admin_id(&self) -> ChainId {
910 self.admin_description
911 .as_ref()
912 .expect("admin chain not initialized")
913 .id()
914 }
915
916 pub fn admin_description(&self) -> Option<&ChainDescription> {
917 self.admin_description.as_ref()
918 }
919
920 pub fn make_node_provider(&self) -> NodeProvider<B::Storage> {
921 self.validator_clients.iter().cloned().collect()
922 }
923
924 pub fn node(&mut self, index: usize) -> &mut LocalValidatorClient<B::Storage> {
925 &mut self.validator_clients[index]
926 }
927
928 pub async fn make_storage(&mut self) -> anyhow::Result<B::Storage> {
929 let storage = self.storage_builder.build().await?;
930 storage
931 .write_network_description(self.network_description.as_ref().unwrap())
932 .await
933 .expect("writing the NetworkDescription should succeed");
934 Ok(self.genesis_storage_builder.build(storage).await)
935 }
936
937 pub async fn make_client(
938 &mut self,
939 chain_id: ChainId,
940 block_hash: Option<CryptoHash>,
941 block_height: BlockHeight,
942 ) -> anyhow::Result<ChainClient<B::Storage>> {
943 let storage = self.make_storage().await?;
946 self.chain_client_storages.push(storage.clone());
947 let client = Arc::new(Client::new(
948 crate::environment::Impl {
949 network: self.make_node_provider(),
950 storage,
951 signer: self.signer.clone(),
952 },
953 self.admin_id(),
954 false,
955 [chain_id],
956 format!("Client node for {:.8}", chain_id),
957 NonZeroUsize::new(20).expect("Chain worker limit should not be zero"),
958 ChainClientOptions::test_default(),
959 ));
960 Ok(client.create_chain_client(
961 chain_id,
962 block_hash,
963 block_height,
964 None,
965 self.chain_owners.get(&chain_id).copied(),
966 ))
967 }
968
969 pub async fn check_that_validators_have_certificate(
971 &self,
972 chain_id: ChainId,
973 block_height: BlockHeight,
974 target_count: usize,
975 ) -> Option<ConfirmedBlockCertificate> {
976 let query =
977 ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_in_range(BlockHeightRange {
978 start: block_height,
979 limit: Some(1),
980 });
981 let mut count = 0;
982 let mut certificate = None;
983 for validator in self.validator_clients.clone() {
984 if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
985 if response.check(&validator.public_key).is_ok() {
986 let ChainInfo {
987 mut requested_sent_certificate_hashes,
988 ..
989 } = *response.info;
990 debug_assert!(requested_sent_certificate_hashes.len() <= 1);
991 if let Some(cert_hash) = requested_sent_certificate_hashes.pop() {
992 if let Ok(cert) = validator.download_certificate(cert_hash).await {
993 if cert.inner().block().header.chain_id == chain_id
994 && cert.inner().block().header.height == block_height
995 {
996 cert.check(&self.initial_committee).unwrap();
997 count += 1;
998 certificate = Some(cert);
999 }
1000 }
1001 }
1002 }
1003 }
1004 }
1005 assert!(count >= target_count);
1006 certificate
1007 }
1008
1009 pub async fn check_that_validators_are_in_round(
1012 &self,
1013 chain_id: ChainId,
1014 block_height: BlockHeight,
1015 round: Round,
1016 target_count: usize,
1017 ) {
1018 let query = ChainInfoQuery::new(chain_id);
1019 let mut count = 0;
1020 for validator in self.validator_clients.clone() {
1021 if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1022 if response.info.manager.current_round == round
1023 && response.info.next_block_height == block_height
1024 && response.check(&validator.public_key).is_ok()
1025 {
1026 count += 1;
1027 }
1028 }
1029 }
1030 assert!(count >= target_count);
1031 }
1032
1033 pub async fn check_that_validators_have_empty_outboxes(&self, chain_id: ChainId) {
1035 for validator in &self.validator_clients {
1036 let guard = validator.client.lock().await;
1037 let chain = guard.state.chain_state_view(chain_id).await.unwrap();
1038 assert_eq!(chain.outboxes.indices().await.unwrap(), []);
1039 }
1040 }
1041}
1042
/// Semaphore with five permits. Every `RocksDbStorageBuilder` holds one permit for as
/// long as it lives, so at most five of them exist at the same time.
// NOTE(review): presumably this bounds the resources used by concurrently running
// RocksDB tests — confirm.
#[cfg(feature = "rocksdb")]
static ROCKS_DB_SEMAPHORE: Semaphore = Semaphore::const_new(5);
1046
/// A [`StorageBuilder`] producing in-memory storages.
#[derive(Default)]
pub struct MemoryStorageBuilder {
    /// Common namespace prefix; generated on the first call to `build` if empty.
    namespace: String,
    /// Number of storages requested so far; used as the namespace suffix.
    instance_counter: usize,
    /// The Wasm runtime passed to every storage, if any.
    wasm_runtime: Option<WasmRuntime>,
    /// The test clock shared by all storages built by this builder.
    clock: TestClock,
}
1054
1055#[async_trait]
1056impl StorageBuilder for MemoryStorageBuilder {
1057 type Storage = DbStorage<MemoryStore, TestClock>;
1058
1059 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1060 self.instance_counter += 1;
1061 let config = MemoryStore::new_test_config().await?;
1062 if self.namespace.is_empty() {
1063 self.namespace = generate_test_namespace();
1064 }
1065 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1066 Ok(
1067 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1068 .await?,
1069 )
1070 }
1071
1072 fn clock(&self) -> &TestClock {
1073 &self.clock
1074 }
1075}
1076
1077impl MemoryStorageBuilder {
1078 #[allow(dead_code)]
1081 pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1082 MemoryStorageBuilder {
1083 wasm_runtime: wasm_runtime.into(),
1084 ..MemoryStorageBuilder::default()
1085 }
1086 }
1087}
1088
/// A [`StorageBuilder`] producing RocksDB-backed storages.
#[cfg(feature = "rocksdb")]
pub struct RocksDbStorageBuilder {
    /// Common namespace prefix; generated on the first call to `build` if empty.
    namespace: String,
    /// Number of storages requested so far; used as the namespace suffix.
    instance_counter: usize,
    /// The Wasm runtime passed to every storage, if any.
    wasm_runtime: Option<WasmRuntime>,
    /// The test clock shared by all storages built by this builder.
    clock: TestClock,
    /// Permit from `ROCKS_DB_SEMAPHORE`, held for the lifetime of the builder.
    _permit: SemaphorePermit<'static>,
}
1097
#[cfg(feature = "rocksdb")]
impl RocksDbStorageBuilder {
    /// Creates a builder with an empty namespace, a zero instance counter, no Wasm runtime
    /// and a default test clock.
    ///
    /// The call waits until a permit of `ROCKS_DB_SEMAPHORE` is available and stores that
    /// permit in the returned builder.
    ///
    /// # Panics
    ///
    /// Panics if `ROCKS_DB_SEMAPHORE` has been closed.
    pub async fn new() -> Self {
        RocksDbStorageBuilder {
            namespace: String::new(),
            instance_counter: 0,
            wasm_runtime: None,
            clock: TestClock::default(),
            // `acquire` only fails on a closed semaphore, which would be a bug in the test
            // utilities, so the invariant is spelled out instead of a bare `unwrap`.
            _permit: ROCKS_DB_SEMAPHORE
                .acquire()
                .await
                .expect("ROCKS_DB_SEMAPHORE must never be closed"),
        }
    }

    /// Creates a builder like [`RocksDbStorageBuilder::new`], but forwarding the given Wasm
    /// runtime setting to every storage it builds.
    ///
    /// # Panics
    ///
    /// Panics if `ROCKS_DB_SEMAPHORE` has been closed.
    #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        RocksDbStorageBuilder {
            wasm_runtime: wasm_runtime.into(),
            ..RocksDbStorageBuilder::new().await
        }
    }
}
1120
#[cfg(feature = "rocksdb")]
#[async_trait]
impl StorageBuilder for RocksDbStorageBuilder {
    type Storage = DbStorage<RocksDbStore, TestClock>;

    /// Creates a new storage using the test configuration of `RocksDbStore`.
    ///
    /// Every call increments the instance counter and names the storage
    /// `<namespace>_<counter>`, so successive calls use different namespaces. The base
    /// namespace is generated the first time it is found empty.
    ///
    /// # Errors
    ///
    /// Returns the error of `RocksDbStore::new_test_config` or of
    /// `DbStorage::new_for_testing`, converted into `anyhow::Error`.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        // The counter is advanced first, so it also moves forward when a later step fails.
        self.instance_counter += 1;
        let store_config = RocksDbStore::new_test_config().await?;
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let storage_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage = DbStorage::new_for_testing(
            store_config,
            &storage_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns the test clock whose clones are passed to the storages built here.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1143
/// Builder of test storages of type `DbStorage<ServiceStoreClient, TestClock>`.
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[derive(Default)]
pub struct ServiceStorageBuilder {
    /// Base namespace of the storages created by this builder. It starts empty and is
    /// replaced by a generated test namespace during the first successful `build` call.
    namespace: String,
    /// Number of `build` calls made so far; appended to `namespace` to name each storage.
    instance_counter: usize,
    /// Optional Wasm runtime forwarded to `DbStorage::new_for_testing` for every storage.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock; every storage that is built receives a clone of it.
    clock: TestClock,
}
1152
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
impl ServiceStorageBuilder {
    /// Creates a builder without a Wasm runtime; equivalent to calling
    /// `with_wasm_runtime(None)`.
    pub async fn new() -> Self {
        Self::with_wasm_runtime(None).await
    }

    /// Creates a builder that forwards the given Wasm runtime setting to every storage it
    /// builds; all other fields take their default values.
    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let wasm_runtime = wasm_runtime.into();
        Self {
            wasm_runtime,
            ..Self::default()
        }
    }
}
1168
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[async_trait]
impl StorageBuilder for ServiceStorageBuilder {
    type Storage = DbStorage<ServiceStoreClient, TestClock>;

    /// Creates a new storage using the test configuration of `ServiceStoreClient`.
    ///
    /// Every call increments the instance counter and names the storage
    /// `<namespace>_<counter>`, so successive calls use different namespaces. The base
    /// namespace is generated the first time it is found empty.
    ///
    /// # Errors
    ///
    /// Returns the error of `ServiceStoreClient::new_test_config` or of
    /// `DbStorage::new_for_testing`, converted into `anyhow::Error`.
    async fn build(&mut self) -> anyhow::Result<Self::Storage> {
        // The counter is advanced first, so it also moves forward when a later step fails.
        self.instance_counter += 1;
        let store_config = ServiceStoreClient::new_test_config().await?;
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let storage_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage = DbStorage::new_for_testing(
            store_config,
            &storage_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns the test clock whose clones are passed to the storages built here.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1191
/// Builder of test storages of type `DbStorage<DynamoDbStore, TestClock>`.
#[cfg(feature = "dynamodb")]
#[derive(Default)]
pub struct DynamoDbStorageBuilder {
    /// Base namespace of the storages created by this builder. It starts empty and is
    /// replaced by a generated test namespace during the first successful `build` call.
    namespace: String,
    /// Number of `build` calls made so far; appended to `namespace` to name each storage.
    instance_counter: usize,
    /// Optional Wasm runtime forwarded to `DbStorage::new_for_testing` for every storage.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock; every storage that is built receives a clone of it.
    clock: TestClock,
}
1200
#[cfg(feature = "dynamodb")]
impl DynamoDbStorageBuilder {
    /// Creates a builder that forwards the given Wasm runtime setting to every storage it
    /// builds; all other fields take their default values.
    // NOTE(review): `dead_code` is presumably allowed because not every build configuration
    // calls this constructor — confirm.
    #[allow(dead_code)]
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let wasm_runtime = wasm_runtime.into();
        Self {
            wasm_runtime,
            ..Self::default()
        }
    }
}
1213
#[cfg(feature = "dynamodb")]
#[async_trait]
impl StorageBuilder for DynamoDbStorageBuilder {
    type Storage = DbStorage<DynamoDbStore, TestClock>;

    /// Creates a new storage using the test configuration of `DynamoDbStore`.
    ///
    /// Every call increments the instance counter and names the storage
    /// `<namespace>_<counter>`, so successive calls use different namespaces. The base
    /// namespace is generated the first time it is found empty.
    ///
    /// # Errors
    ///
    /// Returns the error of `DynamoDbStore::new_test_config` or of
    /// `DbStorage::new_for_testing`, converted into `anyhow::Error`.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        // The counter is advanced first, so it also moves forward when a later step fails.
        self.instance_counter += 1;
        let store_config = DynamoDbStore::new_test_config().await?;
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let storage_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage = DbStorage::new_for_testing(
            store_config,
            &storage_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns the test clock whose clones are passed to the storages built here.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1236
/// Builder of test storages of type `DbStorage<ScyllaDbStore, TestClock>`.
#[cfg(feature = "scylladb")]
#[derive(Default)]
pub struct ScyllaDbStorageBuilder {
    /// Base namespace of the storages created by this builder. It starts empty and is
    /// replaced by a generated test namespace during the first successful `build` call.
    namespace: String,
    /// Number of `build` calls made so far; appended to `namespace` to name each storage.
    instance_counter: usize,
    /// Optional Wasm runtime forwarded to `DbStorage::new_for_testing` for every storage.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock; every storage that is built receives a clone of it.
    clock: TestClock,
}
1245
#[cfg(feature = "scylladb")]
impl ScyllaDbStorageBuilder {
    /// Creates a builder that forwards the given Wasm runtime setting to every storage it
    /// builds; all other fields take their default values.
    // NOTE(review): `dead_code` is presumably allowed because not every build configuration
    // calls this constructor — confirm.
    #[allow(dead_code)]
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let wasm_runtime = wasm_runtime.into();
        Self {
            wasm_runtime,
            ..Self::default()
        }
    }
}
1258
#[cfg(feature = "scylladb")]
#[async_trait]
impl StorageBuilder for ScyllaDbStorageBuilder {
    type Storage = DbStorage<ScyllaDbStore, TestClock>;

    /// Creates a new storage using the test configuration of `ScyllaDbStore`.
    ///
    /// Every call increments the instance counter and names the storage
    /// `<namespace>_<counter>`, so successive calls use different namespaces. The base
    /// namespace is generated the first time it is found empty.
    ///
    /// # Errors
    ///
    /// Returns the error of `ScyllaDbStore::new_test_config` or of
    /// `DbStorage::new_for_testing`, converted into `anyhow::Error`.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        // The counter is advanced first, so it also moves forward when a later step fails.
        self.instance_counter += 1;
        let store_config = ScyllaDbStore::new_test_config().await?;
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let storage_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage = DbStorage::new_for_testing(
            store_config,
            &storage_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns the test clock whose clones are passed to the storages built here.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}