linera_core/unit_tests/
test_utils.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet},
6    num::NonZeroUsize,
7    sync::Arc,
8    vec,
9};
10
11use async_trait::async_trait;
12use futures::{
13    future::Either,
14    lock::{Mutex, MutexGuard},
15    Future,
16};
17use linera_base::{
18    crypto::{AccountPublicKey, CryptoHash, InMemorySigner, ValidatorKeypair, ValidatorPublicKey},
19    data_types::*,
20    identifiers::{AccountOwner, BlobId, ChainId},
21    ownership::ChainOwnership,
22};
23use linera_chain::{
24    data_types::BlockProposal,
25    types::{
26        CertificateKind, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27        LiteCertificate, Timeout, ValidatedBlock,
28    },
29};
30use linera_execution::{committee::Committee, ResourceControlPolicy, WasmRuntime};
31use linera_storage::{DbStorage, Storage, TestClock};
32#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
33use linera_storage_service::client::ServiceStoreClient;
34use linera_version::VersionInfo;
35#[cfg(feature = "dynamodb")]
36use linera_views::dynamo_db::DynamoDbStore;
37#[cfg(feature = "scylladb")]
38use linera_views::scylla_db::ScyllaDbStore;
39use linera_views::{
40    memory::MemoryStore, random::generate_test_namespace, store::TestKeyValueStore as _,
41};
42use tokio::sync::oneshot;
43use tokio_stream::wrappers::UnboundedReceiverStream;
44#[cfg(feature = "rocksdb")]
45use {
46    linera_views::rocks_db::RocksDbStore,
47    tokio::sync::{Semaphore, SemaphorePermit},
48};
49
50use crate::{
51    client::{ChainClientOptions, Client},
52    data_types::*,
53    node::{
54        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
55        ValidatorNodeProvider,
56    },
57    notifier::ChannelNotifier,
58    worker::{NetworkActions, Notification, ProcessableCertificate, WorkerState},
59};
60
61#[derive(Debug, PartialEq, Clone, Copy)]
62pub enum FaultType {
63    Honest,
64    Offline,
65    OfflineWithInfo,
66    Malicious,
67    DontSendConfirmVote,
68    DontProcessValidated,
69    DontSendValidateVote,
70}
71
72/// A validator used for testing. "Faulty" validators ignore block proposals (but not
73/// certificates or info queries) and have the wrong initial balance for all chains.
74///
75/// All methods are executed in spawned Tokio tasks, so that canceling a client task doesn't cause
76/// the validator's tasks to be canceled: In a real network, a validator also wouldn't cancel
77/// tasks if the client stopped waiting for the response.
78struct LocalValidator<S>
79where
80    S: Storage,
81{
82    state: WorkerState<S>,
83    fault_type: FaultType,
84    notifier: Arc<ChannelNotifier<Notification>>,
85}
86
87#[derive(Clone)]
88pub struct LocalValidatorClient<S>
89where
90    S: Storage,
91{
92    public_key: ValidatorPublicKey,
93    client: Arc<Mutex<LocalValidator<S>>>,
94}
95
96impl<S> ValidatorNode for LocalValidatorClient<S>
97where
98    S: Storage + Clone + Send + Sync + 'static,
99{
100    type NotificationStream = NotificationStream;
101
102    async fn handle_block_proposal(
103        &self,
104        proposal: BlockProposal,
105    ) -> Result<ChainInfoResponse, NodeError> {
106        self.spawn_and_receive(move |validator, sender| {
107            validator.do_handle_block_proposal(proposal, sender)
108        })
109        .await
110    }
111
112    async fn handle_lite_certificate(
113        &self,
114        certificate: LiteCertificate<'_>,
115        _delivery: CrossChainMessageDelivery,
116    ) -> Result<ChainInfoResponse, NodeError> {
117        let certificate = certificate.cloned();
118        self.spawn_and_receive(move |validator, sender| {
119            validator.do_handle_lite_certificate(certificate, sender)
120        })
121        .await
122    }
123
124    async fn handle_timeout_certificate(
125        &self,
126        certificate: GenericCertificate<Timeout>,
127    ) -> Result<ChainInfoResponse, NodeError> {
128        self.spawn_and_receive(move |validator, sender| {
129            validator.do_handle_certificate(certificate, sender)
130        })
131        .await
132    }
133
134    async fn handle_validated_certificate(
135        &self,
136        certificate: GenericCertificate<ValidatedBlock>,
137    ) -> Result<ChainInfoResponse, NodeError> {
138        self.spawn_and_receive(move |validator, sender| {
139            validator.do_handle_certificate(certificate, sender)
140        })
141        .await
142    }
143
144    async fn handle_confirmed_certificate(
145        &self,
146        certificate: GenericCertificate<ConfirmedBlock>,
147        _delivery: CrossChainMessageDelivery,
148    ) -> Result<ChainInfoResponse, NodeError> {
149        self.spawn_and_receive(move |validator, sender| {
150            validator.do_handle_certificate(certificate, sender)
151        })
152        .await
153    }
154
155    async fn handle_chain_info_query(
156        &self,
157        query: ChainInfoQuery,
158    ) -> Result<ChainInfoResponse, NodeError> {
159        self.spawn_and_receive(move |validator, sender| {
160            validator.do_handle_chain_info_query(query, sender)
161        })
162        .await
163    }
164
165    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
166        self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
167            .await
168    }
169
170    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
171        Ok(Default::default())
172    }
173
174    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
175        Ok(NetworkDescription {
176            name: "test network".to_string(),
177            genesis_config_hash: CryptoHash::test_hash("genesis config"),
178            genesis_timestamp: Timestamp::default(),
179            admin_chain_id: self
180                .client
181                .lock()
182                .await
183                .state
184                .storage_client()
185                .read_network_description()
186                .await
187                .unwrap()
188                .unwrap()
189                .admin_chain_id,
190        })
191    }
192
193    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
194        self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
195            .await
196    }
197
198    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
199        self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
200            .await
201    }
202
203    async fn download_pending_blob(
204        &self,
205        chain_id: ChainId,
206        blob_id: BlobId,
207    ) -> Result<BlobContent, NodeError> {
208        self.spawn_and_receive(move |validator, sender| {
209            validator.do_download_pending_blob(chain_id, blob_id, sender)
210        })
211        .await
212    }
213
214    async fn handle_pending_blob(
215        &self,
216        chain_id: ChainId,
217        blob: BlobContent,
218    ) -> Result<ChainInfoResponse, NodeError> {
219        self.spawn_and_receive(move |validator, sender| {
220            validator.do_handle_pending_blob(chain_id, blob, sender)
221        })
222        .await
223    }
224
225    async fn download_certificate(
226        &self,
227        hash: CryptoHash,
228    ) -> Result<ConfirmedBlockCertificate, NodeError> {
229        self.spawn_and_receive(move |validator, sender| {
230            validator.do_download_certificate(hash, sender)
231        })
232        .await
233    }
234
235    async fn download_certificates(
236        &self,
237        hashes: Vec<CryptoHash>,
238    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
239        self.spawn_and_receive(move |validator, sender| {
240            validator.do_download_certificates(hashes, sender)
241        })
242        .await
243    }
244
245    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
246        self.spawn_and_receive(move |validator, sender| {
247            validator.do_blob_last_used_by(blob_id, sender)
248        })
249        .await
250    }
251
252    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
253        self.spawn_and_receive(move |validator, sender| {
254            validator.do_missing_blob_ids(blob_ids, sender)
255        })
256        .await
257    }
258}
259
260impl<S> LocalValidatorClient<S>
261where
262    S: Storage + Clone + Send + Sync + 'static,
263{
264    fn new(public_key: ValidatorPublicKey, state: WorkerState<S>) -> Self {
265        let client = LocalValidator {
266            fault_type: FaultType::Honest,
267            state,
268            notifier: Arc::new(ChannelNotifier::default()),
269        };
270        Self {
271            public_key,
272            client: Arc::new(Mutex::new(client)),
273        }
274    }
275
276    pub fn name(&self) -> ValidatorPublicKey {
277        self.public_key
278    }
279
280    async fn set_fault_type(&self, fault_type: FaultType) {
281        self.client.lock().await.fault_type = fault_type;
282    }
283
284    async fn fault_type(&self) -> FaultType {
285        self.client.lock().await.fault_type
286    }
287
288    /// Obtains the basic `ChainInfo` data for the local validator chain, with chain manager values.
289    pub async fn chain_info_with_manager_values(
290        &mut self,
291        chain_id: ChainId,
292    ) -> Result<Box<ChainInfo>, NodeError> {
293        let query = ChainInfoQuery::new(chain_id).with_manager_values();
294        let response = self.handle_chain_info_query(query).await?;
295        Ok(response.info)
296    }
297
298    /// Executes the future produced by `f` in a new thread in a new Tokio runtime.
299    /// Returns the value that the future puts into the sender.
300    async fn spawn_and_receive<F, R, T>(&self, f: F) -> T
301    where
302        T: Send + 'static,
303        R: Future<Output = Result<(), T>> + Send,
304        F: FnOnce(Self, oneshot::Sender<T>) -> R + Send + 'static,
305    {
306        let validator = self.clone();
307        let (sender, receiver) = oneshot::channel();
308        tokio::spawn(async move {
309            if f(validator, sender).await.is_err() {
310                tracing::debug!("result could not be sent");
311            }
312        });
313        receiver.await.unwrap()
314    }
315
316    async fn do_handle_block_proposal(
317        self,
318        proposal: BlockProposal,
319        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
320    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
321        let mut validator = self.client.lock().await;
322        let handle_block_proposal_result =
323            Self::handle_block_proposal(proposal, &mut validator).await;
324        let result = match handle_block_proposal_result {
325            Some(Err(NodeError::BlobsNotFound(_))) => {
326                handle_block_proposal_result.expect("handle_block_proposal_result should be Some")
327            }
328            _ => match validator.fault_type {
329                FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
330                    error: "offline".to_string(),
331                }),
332                FaultType::Malicious => Err(ArithmeticError::Overflow.into()),
333                FaultType::DontSendValidateVote => Err(NodeError::ClientIoError {
334                    error: "refusing to validate".to_string(),
335                }),
336                FaultType::Honest
337                | FaultType::DontSendConfirmVote
338                | FaultType::DontProcessValidated => handle_block_proposal_result
339                    .expect("handle_block_proposal_result should be Some"),
340            },
341        };
342        // In a local node cross-chain messages can't get lost, so we can ignore the actions here.
343        sender.send(result.map(|(info, _actions)| info))
344    }
345
346    async fn handle_block_proposal(
347        proposal: BlockProposal,
348        validator: &mut MutexGuard<'_, LocalValidator<S>>,
349    ) -> Option<Result<(ChainInfoResponse, NetworkActions), NodeError>> {
350        match validator.fault_type {
351            FaultType::Offline | FaultType::OfflineWithInfo | FaultType::Malicious => None,
352            FaultType::Honest
353            | FaultType::DontSendConfirmVote
354            | FaultType::DontProcessValidated
355            | FaultType::DontSendValidateVote => Some(
356                validator
357                    .state
358                    .handle_block_proposal(proposal)
359                    .await
360                    .map_err(Into::into),
361            ),
362        }
363    }
364
365    async fn handle_certificate<T: ProcessableCertificate>(
366        certificate: GenericCertificate<T>,
367        validator: &mut MutexGuard<'_, LocalValidator<S>>,
368    ) -> Option<Result<ChainInfoResponse, NodeError>> {
369        match validator.fault_type {
370            FaultType::DontProcessValidated if T::KIND == CertificateKind::Validated => None,
371            FaultType::Honest
372            | FaultType::DontSendConfirmVote
373            | FaultType::Malicious
374            | FaultType::DontProcessValidated
375            | FaultType::DontSendValidateVote => Some(
376                validator
377                    .state
378                    .fully_handle_certificate_with_notifications(certificate, &validator.notifier)
379                    .await
380                    .map_err(Into::into),
381            ),
382            FaultType::Offline | FaultType::OfflineWithInfo => None,
383        }
384    }
385
386    async fn do_handle_lite_certificate(
387        self,
388        certificate: LiteCertificate<'_>,
389        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
390    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
391        let client = self.client.clone();
392        let mut validator = client.lock().await;
393        let result = async move {
394            match validator.state.full_certificate(certificate).await? {
395                Either::Left(confirmed) => {
396                    self.do_handle_certificate_internal(confirmed, &mut validator)
397                        .await
398                }
399                Either::Right(validated) => {
400                    self.do_handle_certificate_internal(validated, &mut validator)
401                        .await
402                }
403            }
404        }
405        .await;
406        sender.send(result)
407    }
408
409    async fn do_handle_certificate_internal<T: ProcessableCertificate>(
410        &self,
411        certificate: GenericCertificate<T>,
412        validator: &mut MutexGuard<'_, LocalValidator<S>>,
413    ) -> Result<ChainInfoResponse, NodeError> {
414        let handle_certificate_result = Self::handle_certificate(certificate, validator).await;
415        match handle_certificate_result {
416            Some(Err(NodeError::BlobsNotFound(_))) => {
417                handle_certificate_result.expect("handle_certificate_result should be Some")
418            }
419            _ => match validator.fault_type {
420                FaultType::DontSendConfirmVote | FaultType::DontProcessValidated
421                    if T::KIND == CertificateKind::Validated =>
422                {
423                    Err(NodeError::ClientIoError {
424                        error: "refusing to confirm".to_string(),
425                    })
426                }
427                FaultType::Honest
428                | FaultType::DontSendConfirmVote
429                | FaultType::DontProcessValidated
430                | FaultType::Malicious
431                | FaultType::DontSendValidateVote => {
432                    handle_certificate_result.expect("handle_certificate_result should be Some")
433                }
434                FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
435                    error: "offline".to_string(),
436                }),
437            },
438        }
439    }
440
441    async fn do_handle_certificate<T: ProcessableCertificate>(
442        self,
443        certificate: GenericCertificate<T>,
444        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
445    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
446        let mut validator = self.client.lock().await;
447        let result = self
448            .do_handle_certificate_internal(certificate, &mut validator)
449            .await;
450        sender.send(result)
451    }
452
453    async fn do_handle_chain_info_query(
454        self,
455        query: ChainInfoQuery,
456        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
457    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
458        let validator = self.client.lock().await;
459        let result = if validator.fault_type == FaultType::Offline {
460            Err(NodeError::ClientIoError {
461                error: "offline".to_string(),
462            })
463        } else {
464            validator
465                .state
466                .handle_chain_info_query(query)
467                .await
468                .map_err(Into::into)
469        };
470        // In a local node cross-chain messages can't get lost, so we can ignore the actions here.
471        sender.send(result.map(|(info, _actions)| info))
472    }
473
474    async fn do_subscribe(
475        self,
476        chains: Vec<ChainId>,
477        sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
478    ) -> Result<(), Result<NotificationStream, NodeError>> {
479        let validator = self.client.lock().await;
480        let rx = validator.notifier.subscribe(chains);
481        let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
482        sender.send(Ok(stream))
483    }
484
485    async fn do_upload_blob(
486        self,
487        content: BlobContent,
488        sender: oneshot::Sender<Result<BlobId, NodeError>>,
489    ) -> Result<(), Result<BlobId, NodeError>> {
490        let validator = self.client.lock().await;
491        let blob = Blob::new(content);
492        let id = blob.id();
493        let storage = validator.state.storage_client();
494        let result = match storage.maybe_write_blobs(&[blob]).await {
495            Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
496            Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
497            Err(error) => Err(error.into()),
498        };
499        sender.send(result)
500    }
501
502    async fn do_download_blob(
503        self,
504        blob_id: BlobId,
505        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
506    ) -> Result<(), Result<BlobContent, NodeError>> {
507        let validator = self.client.lock().await;
508        let blob = validator
509            .state
510            .storage_client()
511            .read_blob(blob_id)
512            .await
513            .map_err(Into::into);
514        let blob = match blob {
515            Ok(blob) => blob.ok_or(NodeError::BlobsNotFound(vec![blob_id])),
516            Err(error) => Err(error),
517        };
518        sender.send(blob.map(|blob| blob.into_content()))
519    }
520
521    async fn do_download_pending_blob(
522        self,
523        chain_id: ChainId,
524        blob_id: BlobId,
525        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
526    ) -> Result<(), Result<BlobContent, NodeError>> {
527        let validator = self.client.lock().await;
528        let result = validator
529            .state
530            .download_pending_blob(chain_id, blob_id)
531            .await
532            .map_err(Into::into);
533        sender.send(result.map(|blob| blob.into_content()))
534    }
535
536    async fn do_handle_pending_blob(
537        self,
538        chain_id: ChainId,
539        blob: BlobContent,
540        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
541    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
542        let validator = self.client.lock().await;
543        let result = validator
544            .state
545            .handle_pending_blob(chain_id, Blob::new(blob))
546            .await
547            .map_err(Into::into);
548        sender.send(result)
549    }
550
551    async fn do_download_certificate(
552        self,
553        hash: CryptoHash,
554        sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
555    ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
556        let validator = self.client.lock().await;
557        let certificate = validator
558            .state
559            .storage_client()
560            .read_certificate(hash)
561            .await
562            .map_err(Into::into);
563
564        sender.send(certificate)
565    }
566
567    async fn do_download_certificates(
568        self,
569        hashes: Vec<CryptoHash>,
570        sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
571    ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
572        let validator = self.client.lock().await;
573        let certificates = validator
574            .state
575            .storage_client()
576            .read_certificates(hashes)
577            .await
578            .map_err(Into::into);
579
580        sender.send(certificates)
581    }
582
583    async fn do_blob_last_used_by(
584        self,
585        blob_id: BlobId,
586        sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
587    ) -> Result<(), Result<CryptoHash, NodeError>> {
588        let validator = self.client.lock().await;
589        let blob_state = validator
590            .state
591            .storage_client()
592            .read_blob_state(blob_id)
593            .await
594            .map_err(Into::into);
595        let certificate_hash = match blob_state {
596            Err(err) => Err(err),
597            Ok(blob_state) => match blob_state {
598                None => Err(NodeError::BlobsNotFound(vec![blob_id])),
599                Some(blob_state) => blob_state
600                    .last_used_by
601                    .ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
602            },
603        };
604
605        sender.send(certificate_hash)
606    }
607
608    async fn do_missing_blob_ids(
609        self,
610        blob_ids: Vec<BlobId>,
611        sender: oneshot::Sender<Result<Vec<BlobId>, NodeError>>,
612    ) -> Result<(), Result<Vec<BlobId>, NodeError>> {
613        let validator = self.client.lock().await;
614        let missing_blob_ids = validator
615            .state
616            .storage_client()
617            .missing_blobs(&blob_ids)
618            .await
619            .map_err(Into::into);
620        sender.send(missing_blob_ids)
621    }
622}
623
624#[derive(Clone)]
625pub struct NodeProvider<S>(BTreeMap<ValidatorPublicKey, Arc<Mutex<LocalValidator<S>>>>)
626where
627    S: Storage;
628
629impl<S> ValidatorNodeProvider for NodeProvider<S>
630where
631    S: Storage + Clone + Send + Sync + 'static,
632{
633    type Node = LocalValidatorClient<S>;
634
635    fn make_node(&self, _name: &str) -> Result<Self::Node, NodeError> {
636        unimplemented!()
637    }
638
639    fn make_nodes_from_list<A>(
640        &self,
641        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
642    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
643    where
644        A: AsRef<str>,
645    {
646        Ok(validators
647            .into_iter()
648            .map(|(public_key, address)| {
649                self.0
650                    .get(&public_key)
651                    .ok_or_else(|| NodeError::CannotResolveValidatorAddress {
652                        address: address.as_ref().to_string(),
653                    })
654                    .cloned()
655                    .map(|client| (public_key, LocalValidatorClient { public_key, client }))
656            })
657            .collect::<Result<Vec<_>, _>>()?
658            .into_iter())
659    }
660}
661
662impl<S> FromIterator<LocalValidatorClient<S>> for NodeProvider<S>
663where
664    S: Storage,
665{
666    fn from_iter<T>(iter: T) -> Self
667    where
668        T: IntoIterator<Item = LocalValidatorClient<S>>,
669    {
670        let destructure =
671            |validator: LocalValidatorClient<S>| (validator.public_key, validator.client);
672        Self(iter.into_iter().map(destructure).collect())
673    }
674}
675
676// NOTE:
677// * To communicate with a quorum of validators, chain clients iterate over a copy of
678// `validator_clients` to spawn I/O tasks.
679// * When using `LocalValidatorClient`, clients communicate with an exact quorum then stop.
680// * Most tests have 1 faulty validator out 4 so that there is exactly only 1 quorum to
681// communicate with.
682#[allow(dead_code)]
683pub struct TestBuilder<B: StorageBuilder> {
684    storage_builder: B,
685    pub initial_committee: Committee,
686    admin_description: Option<ChainDescription>,
687    network_description: Option<NetworkDescription>,
688    genesis_storage_builder: GenesisStorageBuilder,
689    validator_clients: Vec<LocalValidatorClient<B::Storage>>,
690    validator_storages: HashMap<ValidatorPublicKey, B::Storage>,
691    chain_client_storages: Vec<B::Storage>,
692    pub chain_owners: BTreeMap<ChainId, AccountOwner>,
693    pub signer: InMemorySigner,
694}
695
696#[async_trait]
697pub trait StorageBuilder {
698    type Storage: Storage + Clone + Send + Sync + 'static;
699
700    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error>;
701
702    fn clock(&self) -> &TestClock;
703}
704
705#[derive(Default)]
706struct GenesisStorageBuilder {
707    accounts: Vec<GenesisAccount>,
708}
709
710struct GenesisAccount {
711    description: ChainDescription,
712    public_key: AccountPublicKey,
713}
714
715impl GenesisStorageBuilder {
716    fn add(&mut self, description: ChainDescription, public_key: AccountPublicKey) {
717        self.accounts.push(GenesisAccount {
718            description,
719            public_key,
720        })
721    }
722
723    async fn build<S>(&self, storage: S) -> S
724    where
725        S: Storage + Clone + Send + Sync + 'static,
726    {
727        for account in &self.accounts {
728            storage
729                .create_chain(account.description.clone())
730                .await
731                .unwrap();
732        }
733        storage
734    }
735}
736
737pub type ChainClient<S> =
738    crate::client::ChainClient<crate::environment::Impl<S, NodeProvider<S>, InMemorySigner>>;
739
740impl<B> TestBuilder<B>
741where
742    B: StorageBuilder,
743{
744    pub async fn new(
745        mut storage_builder: B,
746        count: usize,
747        with_faulty_validators: usize,
748        mut signer: InMemorySigner,
749    ) -> Result<Self, anyhow::Error> {
750        let mut validators = Vec::new();
751        for _ in 0..count {
752            let validator_keypair = ValidatorKeypair::generate();
753            let account_public_key = signer.generate_new();
754            validators.push((validator_keypair, account_public_key));
755        }
756        let for_committee = validators
757            .iter()
758            .map(|(validating, account)| (validating.public_key, *account))
759            .collect::<Vec<_>>();
760        let initial_committee = Committee::make_simple(for_committee);
761        let mut validator_clients = Vec::new();
762        let mut validator_storages = HashMap::new();
763        let mut faulty_validators = HashSet::new();
764        for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() {
765            let validator_public_key = validator_keypair.public_key;
766            let storage = storage_builder.build().await?;
767            let state = WorkerState::new(
768                format!("Node {}", i),
769                Some(validator_keypair.secret_key),
770                storage.clone(),
771                NonZeroUsize::new(100).expect("Chain worker limit should not be zero"),
772            )
773            .with_allow_inactive_chains(false)
774            .with_allow_messages_from_deprecated_epochs(false);
775            let validator = LocalValidatorClient::new(validator_public_key, state);
776            if i < with_faulty_validators {
777                faulty_validators.insert(validator_public_key);
778                validator.set_fault_type(FaultType::Malicious).await;
779            }
780            validator_clients.push(validator);
781            validator_storages.insert(validator_public_key, storage);
782        }
783        tracing::info!(
784            "Test will use the following faulty validators: {:?}",
785            faulty_validators
786        );
787        Ok(Self {
788            storage_builder,
789            initial_committee,
790            admin_description: None,
791            network_description: None,
792            genesis_storage_builder: GenesisStorageBuilder::default(),
793            validator_clients,
794            validator_storages,
795            chain_client_storages: Vec::new(),
796            chain_owners: BTreeMap::new(),
797            signer,
798        })
799    }
800
801    pub fn with_policy(mut self, policy: ResourceControlPolicy) -> Self {
802        let validators = self.initial_committee.validators().clone();
803        self.initial_committee = Committee::new(validators, policy);
804        self
805    }
806
807    pub async fn set_fault_type(&mut self, indexes: impl AsRef<[usize]>, fault_type: FaultType) {
808        let mut faulty_validators = vec![];
809        for index in indexes.as_ref() {
810            let validator = &mut self.validator_clients[*index];
811            validator.set_fault_type(fault_type).await;
812            faulty_validators.push(validator.public_key);
813        }
814        tracing::info!(
815            "Making the following validators {:?}: {:?}",
816            fault_type,
817            faulty_validators
818        );
819    }
820
821    /// Creates the root chain with the given `index`, and returns a client for it.
822    ///
823    /// Root chain 0 is the admin chain and needs to be initialized first, otherwise its balance
824    /// is automatically set to zero.
825    pub async fn add_root_chain(
826        &mut self,
827        index: u32,
828        balance: Amount,
829    ) -> anyhow::Result<ChainClient<B::Storage>> {
830        // Make sure the admin chain is initialized.
831        if self.admin_description.is_none() && index != 0 {
832            Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
833        }
834        let origin = ChainOrigin::Root(index);
835        let mut committees = BTreeMap::new();
836        committees.insert(
837            Epoch(0),
838            bcs::to_bytes(&self.initial_committee)
839                .expect("Serializing a committee should not fail!"),
840        );
841        let public_key = self.signer.generate_new();
842        let open_chain_config = InitialChainConfig {
843            ownership: ChainOwnership::single(public_key.into()),
844            epoch: Epoch(0),
845            committees,
846            balance,
847            application_permissions: ApplicationPermissions::default(),
848        };
849        let description = ChainDescription::new(origin, open_chain_config, Timestamp::from(0));
850        if index == 0 {
851            self.admin_description = Some(description.clone());
852            self.network_description = Some(NetworkDescription {
853                admin_chain_id: description.id(),
854                // dummy values to fill the description
855                genesis_config_hash: CryptoHash::test_hash("genesis config"),
856                genesis_timestamp: Timestamp::from(0),
857                name: "test network".to_string(),
858            });
859        }
860        // Remember what's in the genesis store for future clients to join.
861        self.genesis_storage_builder
862            .add(description.clone(), public_key);
863        for validator in &self.validator_clients {
864            let storage = self
865                .validator_storages
866                .get_mut(&validator.public_key)
867                .unwrap();
868            storage
869                .write_network_description(self.network_description.as_ref().unwrap())
870                .await
871                .expect("writing the NetworkDescription should succeed");
872            if validator.fault_type().await == FaultType::Malicious {
873                let origin = description.origin();
874                let config = InitialChainConfig {
875                    balance: Amount::ZERO,
876                    ..description.config().clone()
877                };
878                storage
879                    .create_chain(ChainDescription::new(origin, config, Timestamp::from(0)))
880                    .await
881                    .unwrap();
882            } else {
883                storage.create_chain(description.clone()).await.unwrap();
884            }
885        }
886        for storage in self.chain_client_storages.iter_mut() {
887            storage.create_chain(description.clone()).await.unwrap();
888        }
889        let chain_id = description.id();
890        self.chain_owners.insert(chain_id, public_key.into());
891        self.make_client(chain_id, None, BlockHeight::ZERO).await
892    }
893
894    pub fn genesis_chains(&self) -> Vec<(AccountPublicKey, Amount)> {
895        let mut result = Vec::new();
896        for (i, genesis_account) in self.genesis_storage_builder.accounts.iter().enumerate() {
897            assert_eq!(
898                genesis_account.description.origin(),
899                ChainOrigin::Root(i as u32)
900            );
901            result.push((
902                genesis_account.public_key,
903                genesis_account.description.config().balance,
904            ));
905        }
906        result
907    }
908
909    pub fn admin_id(&self) -> ChainId {
910        self.admin_description
911            .as_ref()
912            .expect("admin chain not initialized")
913            .id()
914    }
915
916    pub fn admin_description(&self) -> Option<&ChainDescription> {
917        self.admin_description.as_ref()
918    }
919
920    pub fn make_node_provider(&self) -> NodeProvider<B::Storage> {
921        self.validator_clients.iter().cloned().collect()
922    }
923
924    pub fn node(&mut self, index: usize) -> &mut LocalValidatorClient<B::Storage> {
925        &mut self.validator_clients[index]
926    }
927
928    pub async fn make_storage(&mut self) -> anyhow::Result<B::Storage> {
929        let storage = self.storage_builder.build().await?;
930        storage
931            .write_network_description(self.network_description.as_ref().unwrap())
932            .await
933            .expect("writing the NetworkDescription should succeed");
934        Ok(self.genesis_storage_builder.build(storage).await)
935    }
936
937    pub async fn make_client(
938        &mut self,
939        chain_id: ChainId,
940        block_hash: Option<CryptoHash>,
941        block_height: BlockHeight,
942    ) -> anyhow::Result<ChainClient<B::Storage>> {
943        // Note that new clients are only given the genesis store: they must figure out
944        // the rest by asking validators.
945        let storage = self.make_storage().await?;
946        self.chain_client_storages.push(storage.clone());
947        let client = Arc::new(Client::new(
948            crate::environment::Impl {
949                network: self.make_node_provider(),
950                storage,
951                signer: self.signer.clone(),
952            },
953            self.admin_id(),
954            false,
955            [chain_id],
956            format!("Client node for {:.8}", chain_id),
957            NonZeroUsize::new(20).expect("Chain worker limit should not be zero"),
958            ChainClientOptions::test_default(),
959        ));
960        Ok(client.create_chain_client(
961            chain_id,
962            block_hash,
963            block_height,
964            None,
965            self.chain_owners.get(&chain_id).copied(),
966        ))
967    }
968
969    /// Tries to find a (confirmation) certificate for the given chain_id and block height.
970    pub async fn check_that_validators_have_certificate(
971        &self,
972        chain_id: ChainId,
973        block_height: BlockHeight,
974        target_count: usize,
975    ) -> Option<ConfirmedBlockCertificate> {
976        let query =
977            ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_in_range(BlockHeightRange {
978                start: block_height,
979                limit: Some(1),
980            });
981        let mut count = 0;
982        let mut certificate = None;
983        for validator in self.validator_clients.clone() {
984            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
985                if response.check(&validator.public_key).is_ok() {
986                    let ChainInfo {
987                        mut requested_sent_certificate_hashes,
988                        ..
989                    } = *response.info;
990                    debug_assert!(requested_sent_certificate_hashes.len() <= 1);
991                    if let Some(cert_hash) = requested_sent_certificate_hashes.pop() {
992                        if let Ok(cert) = validator.download_certificate(cert_hash).await {
993                            if cert.inner().block().header.chain_id == chain_id
994                                && cert.inner().block().header.height == block_height
995                            {
996                                cert.check(&self.initial_committee).unwrap();
997                                count += 1;
998                                certificate = Some(cert);
999                            }
1000                        }
1001                    }
1002                }
1003            }
1004        }
1005        assert!(count >= target_count);
1006        certificate
1007    }
1008
1009    /// Tries to find a (confirmation) certificate for the given chain_id and block height, and are
1010    /// in the expected round.
1011    pub async fn check_that_validators_are_in_round(
1012        &self,
1013        chain_id: ChainId,
1014        block_height: BlockHeight,
1015        round: Round,
1016        target_count: usize,
1017    ) {
1018        let query = ChainInfoQuery::new(chain_id);
1019        let mut count = 0;
1020        for validator in self.validator_clients.clone() {
1021            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1022                if response.info.manager.current_round == round
1023                    && response.info.next_block_height == block_height
1024                    && response.check(&validator.public_key).is_ok()
1025                {
1026                    count += 1;
1027                }
1028            }
1029        }
1030        assert!(count >= target_count);
1031    }
1032
1033    /// Panics if any validator has a nonempty outbox for the given chain.
1034    pub async fn check_that_validators_have_empty_outboxes(&self, chain_id: ChainId) {
1035        for validator in &self.validator_clients {
1036            let guard = validator.client.lock().await;
1037            let chain = guard.state.chain_state_view(chain_id).await.unwrap();
1038            assert_eq!(chain.outboxes.indices().await.unwrap(), []);
1039        }
1040    }
1041}
1042
1043#[cfg(feature = "rocksdb")]
1044/// Limit concurrency for RocksDB tests to avoid "too many open files" errors.
1045static ROCKS_DB_SEMAPHORE: Semaphore = Semaphore::const_new(5);
1046
1047#[derive(Default)]
1048pub struct MemoryStorageBuilder {
1049    namespace: String,
1050    instance_counter: usize,
1051    wasm_runtime: Option<WasmRuntime>,
1052    clock: TestClock,
1053}
1054
1055#[async_trait]
1056impl StorageBuilder for MemoryStorageBuilder {
1057    type Storage = DbStorage<MemoryStore, TestClock>;
1058
1059    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1060        self.instance_counter += 1;
1061        let config = MemoryStore::new_test_config().await?;
1062        if self.namespace.is_empty() {
1063            self.namespace = generate_test_namespace();
1064        }
1065        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1066        Ok(
1067            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1068                .await?,
1069        )
1070    }
1071
1072    fn clock(&self) -> &TestClock {
1073        &self.clock
1074    }
1075}
1076
1077impl MemoryStorageBuilder {
1078    /// Creates a [`MemoryStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1079    /// applications.
1080    #[allow(dead_code)]
1081    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1082        MemoryStorageBuilder {
1083            wasm_runtime: wasm_runtime.into(),
1084            ..MemoryStorageBuilder::default()
1085        }
1086    }
1087}
1088
1089#[cfg(feature = "rocksdb")]
1090pub struct RocksDbStorageBuilder {
1091    namespace: String,
1092    instance_counter: usize,
1093    wasm_runtime: Option<WasmRuntime>,
1094    clock: TestClock,
1095    _permit: SemaphorePermit<'static>,
1096}
1097
1098#[cfg(feature = "rocksdb")]
1099impl RocksDbStorageBuilder {
1100    pub async fn new() -> Self {
1101        RocksDbStorageBuilder {
1102            namespace: String::new(),
1103            instance_counter: 0,
1104            wasm_runtime: None,
1105            clock: TestClock::default(),
1106            _permit: ROCKS_DB_SEMAPHORE.acquire().await.unwrap(),
1107        }
1108    }
1109
1110    /// Creates a [`RocksDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1111    /// applications.
1112    #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
1113    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1114        RocksDbStorageBuilder {
1115            wasm_runtime: wasm_runtime.into(),
1116            ..RocksDbStorageBuilder::new().await
1117        }
1118    }
1119}
1120
1121#[cfg(feature = "rocksdb")]
1122#[async_trait]
1123impl StorageBuilder for RocksDbStorageBuilder {
1124    type Storage = DbStorage<RocksDbStore, TestClock>;
1125
1126    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1127        self.instance_counter += 1;
1128        let config = RocksDbStore::new_test_config().await?;
1129        if self.namespace.is_empty() {
1130            self.namespace = generate_test_namespace();
1131        }
1132        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1133        Ok(
1134            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1135                .await?,
1136        )
1137    }
1138
1139    fn clock(&self) -> &TestClock {
1140        &self.clock
1141    }
1142}
1143
1144#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1145#[derive(Default)]
1146pub struct ServiceStorageBuilder {
1147    namespace: String,
1148    instance_counter: usize,
1149    wasm_runtime: Option<WasmRuntime>,
1150    clock: TestClock,
1151}
1152
1153#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1154impl ServiceStorageBuilder {
1155    /// Creates a `ServiceStorage`.
1156    pub async fn new() -> Self {
1157        Self::with_wasm_runtime(None).await
1158    }
1159
1160    /// Creates a `ServiceStorage` with the given Wasm runtime.
1161    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1162        ServiceStorageBuilder {
1163            wasm_runtime: wasm_runtime.into(),
1164            ..ServiceStorageBuilder::default()
1165        }
1166    }
1167}
1168
1169#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1170#[async_trait]
1171impl StorageBuilder for ServiceStorageBuilder {
1172    type Storage = DbStorage<ServiceStoreClient, TestClock>;
1173
1174    async fn build(&mut self) -> anyhow::Result<Self::Storage> {
1175        self.instance_counter += 1;
1176        let config = ServiceStoreClient::new_test_config().await?;
1177        if self.namespace.is_empty() {
1178            self.namespace = generate_test_namespace();
1179        }
1180        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1181        Ok(
1182            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1183                .await?,
1184        )
1185    }
1186
1187    fn clock(&self) -> &TestClock {
1188        &self.clock
1189    }
1190}
1191
1192#[cfg(feature = "dynamodb")]
1193#[derive(Default)]
1194pub struct DynamoDbStorageBuilder {
1195    namespace: String,
1196    instance_counter: usize,
1197    wasm_runtime: Option<WasmRuntime>,
1198    clock: TestClock,
1199}
1200
1201#[cfg(feature = "dynamodb")]
1202impl DynamoDbStorageBuilder {
1203    /// Creates a [`DynamoDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1204    /// applications.
1205    #[allow(dead_code)]
1206    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1207        DynamoDbStorageBuilder {
1208            wasm_runtime: wasm_runtime.into(),
1209            ..DynamoDbStorageBuilder::default()
1210        }
1211    }
1212}
1213
1214#[cfg(feature = "dynamodb")]
1215#[async_trait]
1216impl StorageBuilder for DynamoDbStorageBuilder {
1217    type Storage = DbStorage<DynamoDbStore, TestClock>;
1218
1219    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1220        self.instance_counter += 1;
1221        let config = DynamoDbStore::new_test_config().await?;
1222        if self.namespace.is_empty() {
1223            self.namespace = generate_test_namespace();
1224        }
1225        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1226        Ok(
1227            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1228                .await?,
1229        )
1230    }
1231
1232    fn clock(&self) -> &TestClock {
1233        &self.clock
1234    }
1235}
1236
1237#[cfg(feature = "scylladb")]
1238#[derive(Default)]
1239pub struct ScyllaDbStorageBuilder {
1240    namespace: String,
1241    instance_counter: usize,
1242    wasm_runtime: Option<WasmRuntime>,
1243    clock: TestClock,
1244}
1245
1246#[cfg(feature = "scylladb")]
1247impl ScyllaDbStorageBuilder {
1248    /// Creates a [`ScyllaDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1249    /// applications.
1250    #[allow(dead_code)]
1251    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1252        ScyllaDbStorageBuilder {
1253            wasm_runtime: wasm_runtime.into(),
1254            ..ScyllaDbStorageBuilder::default()
1255        }
1256    }
1257}
1258
1259#[cfg(feature = "scylladb")]
1260#[async_trait]
1261impl StorageBuilder for ScyllaDbStorageBuilder {
1262    type Storage = DbStorage<ScyllaDbStore, TestClock>;
1263
1264    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1265        self.instance_counter += 1;
1266        let config = ScyllaDbStore::new_test_config().await?;
1267        if self.namespace.is_empty() {
1268            self.namespace = generate_test_namespace();
1269        }
1270        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1271        Ok(
1272            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1273                .await?,
1274        )
1275    }
1276
1277    fn clock(&self) -> &TestClock {
1278        &self.clock
1279    }
1280}