linera_core/unit_tests/
test_utils.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet},
6    num::NonZeroUsize,
7    sync::Arc,
8    vec,
9};
10
11use async_trait::async_trait;
12use futures::{
13    future::Either,
14    lock::{Mutex, MutexGuard},
15    Future,
16};
17use linera_base::{
18    crypto::{AccountPublicKey, CryptoHash, InMemorySigner, ValidatorKeypair, ValidatorPublicKey},
19    data_types::*,
20    identifiers::{AccountOwner, BlobId, ChainId},
21    ownership::ChainOwnership,
22};
23use linera_chain::{
24    data_types::BlockProposal,
25    types::{
26        CertificateKind, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27        LiteCertificate, Timeout, ValidatedBlock,
28    },
29};
30use linera_execution::{committee::Committee, ResourceControlPolicy, WasmRuntime};
31use linera_storage::{DbStorage, ResultReadCertificates, Storage, TestClock};
32#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
33use linera_storage_service::client::ServiceStoreClient;
34use linera_version::VersionInfo;
35#[cfg(feature = "dynamodb")]
36use linera_views::dynamo_db::DynamoDbStore;
37#[cfg(feature = "scylladb")]
38use linera_views::scylla_db::ScyllaDbStore;
39use linera_views::{
40    memory::MemoryStore, random::generate_test_namespace, store::TestKeyValueStore as _,
41};
42use tokio::sync::oneshot;
43use tokio_stream::wrappers::UnboundedReceiverStream;
44#[cfg(feature = "rocksdb")]
45use {
46    linera_views::rocks_db::RocksDbStore,
47    tokio::sync::{Semaphore, SemaphorePermit},
48};
49
50use crate::{
51    client::{ChainClientOptions, Client},
52    data_types::*,
53    node::{
54        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
55        ValidatorNodeProvider,
56    },
57    notifier::ChannelNotifier,
58    worker::{NetworkActions, Notification, ProcessableCertificate, WorkerState},
59};
60
61#[derive(Debug, PartialEq, Clone, Copy)]
62pub enum FaultType {
63    Honest,
64    Offline,
65    OfflineWithInfo,
66    Malicious,
67    DontSendConfirmVote,
68    DontProcessValidated,
69    DontSendValidateVote,
70}
71
72/// A validator used for testing. "Faulty" validators ignore block proposals (but not
73/// certificates or info queries) and have the wrong initial balance for all chains.
74///
75/// All methods are executed in spawned Tokio tasks, so that canceling a client task doesn't cause
76/// the validator's tasks to be canceled: In a real network, a validator also wouldn't cancel
77/// tasks if the client stopped waiting for the response.
78struct LocalValidator<S>
79where
80    S: Storage,
81{
82    state: WorkerState<S>,
83    fault_type: FaultType,
84    notifier: Arc<ChannelNotifier<Notification>>,
85}
86
87#[derive(Clone)]
88pub struct LocalValidatorClient<S>
89where
90    S: Storage,
91{
92    public_key: ValidatorPublicKey,
93    client: Arc<Mutex<LocalValidator<S>>>,
94}
95
96impl<S> ValidatorNode for LocalValidatorClient<S>
97where
98    S: Storage + Clone + Send + Sync + 'static,
99{
100    type NotificationStream = NotificationStream;
101
102    async fn handle_block_proposal(
103        &self,
104        proposal: BlockProposal,
105    ) -> Result<ChainInfoResponse, NodeError> {
106        self.spawn_and_receive(move |validator, sender| {
107            validator.do_handle_block_proposal(proposal, sender)
108        })
109        .await
110    }
111
112    async fn handle_lite_certificate(
113        &self,
114        certificate: LiteCertificate<'_>,
115        _delivery: CrossChainMessageDelivery,
116    ) -> Result<ChainInfoResponse, NodeError> {
117        let certificate = certificate.cloned();
118        self.spawn_and_receive(move |validator, sender| {
119            validator.do_handle_lite_certificate(certificate, sender)
120        })
121        .await
122    }
123
124    async fn handle_timeout_certificate(
125        &self,
126        certificate: GenericCertificate<Timeout>,
127    ) -> Result<ChainInfoResponse, NodeError> {
128        self.spawn_and_receive(move |validator, sender| {
129            validator.do_handle_certificate(certificate, sender)
130        })
131        .await
132    }
133
134    async fn handle_validated_certificate(
135        &self,
136        certificate: GenericCertificate<ValidatedBlock>,
137    ) -> Result<ChainInfoResponse, NodeError> {
138        self.spawn_and_receive(move |validator, sender| {
139            validator.do_handle_certificate(certificate, sender)
140        })
141        .await
142    }
143
144    async fn handle_confirmed_certificate(
145        &self,
146        certificate: GenericCertificate<ConfirmedBlock>,
147        _delivery: CrossChainMessageDelivery,
148    ) -> Result<ChainInfoResponse, NodeError> {
149        self.spawn_and_receive(move |validator, sender| {
150            validator.do_handle_certificate(certificate, sender)
151        })
152        .await
153    }
154
155    async fn handle_chain_info_query(
156        &self,
157        query: ChainInfoQuery,
158    ) -> Result<ChainInfoResponse, NodeError> {
159        self.spawn_and_receive(move |validator, sender| {
160            validator.do_handle_chain_info_query(query, sender)
161        })
162        .await
163    }
164
165    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
166        self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
167            .await
168    }
169
170    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
171        Ok(Default::default())
172    }
173
174    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
175        Ok(self
176            .client
177            .lock()
178            .await
179            .state
180            .storage_client()
181            .read_network_description()
182            .await
183            .transpose()
184            .ok_or(NodeError::ViewError {
185                error: "missing NetworkDescription".to_owned(),
186            })??)
187    }
188
189    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
190        self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
191            .await
192    }
193
194    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
195        self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
196            .await
197    }
198
199    async fn download_pending_blob(
200        &self,
201        chain_id: ChainId,
202        blob_id: BlobId,
203    ) -> Result<BlobContent, NodeError> {
204        self.spawn_and_receive(move |validator, sender| {
205            validator.do_download_pending_blob(chain_id, blob_id, sender)
206        })
207        .await
208    }
209
210    async fn handle_pending_blob(
211        &self,
212        chain_id: ChainId,
213        blob: BlobContent,
214    ) -> Result<ChainInfoResponse, NodeError> {
215        self.spawn_and_receive(move |validator, sender| {
216            validator.do_handle_pending_blob(chain_id, blob, sender)
217        })
218        .await
219    }
220
221    async fn download_certificate(
222        &self,
223        hash: CryptoHash,
224    ) -> Result<ConfirmedBlockCertificate, NodeError> {
225        self.spawn_and_receive(move |validator, sender| {
226            validator.do_download_certificate(hash, sender)
227        })
228        .await
229    }
230
231    async fn download_certificates(
232        &self,
233        hashes: Vec<CryptoHash>,
234    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
235        self.spawn_and_receive(move |validator, sender| {
236            validator.do_download_certificates(hashes, sender)
237        })
238        .await
239    }
240
241    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
242        self.spawn_and_receive(move |validator, sender| {
243            validator.do_blob_last_used_by(blob_id, sender)
244        })
245        .await
246    }
247
248    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
249        self.spawn_and_receive(move |validator, sender| {
250            validator.do_missing_blob_ids(blob_ids, sender)
251        })
252        .await
253    }
254}
255
256impl<S> LocalValidatorClient<S>
257where
258    S: Storage + Clone + Send + Sync + 'static,
259{
260    fn new(public_key: ValidatorPublicKey, state: WorkerState<S>) -> Self {
261        let client = LocalValidator {
262            fault_type: FaultType::Honest,
263            state,
264            notifier: Arc::new(ChannelNotifier::default()),
265        };
266        Self {
267            public_key,
268            client: Arc::new(Mutex::new(client)),
269        }
270    }
271
272    pub fn name(&self) -> ValidatorPublicKey {
273        self.public_key
274    }
275
276    async fn set_fault_type(&self, fault_type: FaultType) {
277        self.client.lock().await.fault_type = fault_type;
278    }
279
280    async fn fault_type(&self) -> FaultType {
281        self.client.lock().await.fault_type
282    }
283
284    /// Obtains the basic `ChainInfo` data for the local validator chain, with chain manager values.
285    pub async fn chain_info_with_manager_values(
286        &mut self,
287        chain_id: ChainId,
288    ) -> Result<Box<ChainInfo>, NodeError> {
289        let query = ChainInfoQuery::new(chain_id).with_manager_values();
290        let response = self.handle_chain_info_query(query).await?;
291        Ok(response.info)
292    }
293
294    /// Executes the future produced by `f` in a new thread in a new Tokio runtime.
295    /// Returns the value that the future puts into the sender.
296    async fn spawn_and_receive<F, R, T>(&self, f: F) -> T
297    where
298        T: Send + 'static,
299        R: Future<Output = Result<(), T>> + Send,
300        F: FnOnce(Self, oneshot::Sender<T>) -> R + Send + 'static,
301    {
302        let validator = self.clone();
303        let (sender, receiver) = oneshot::channel();
304        tokio::spawn(async move {
305            if f(validator, sender).await.is_err() {
306                tracing::debug!("result could not be sent");
307            }
308        });
309        receiver.await.unwrap()
310    }
311
312    async fn do_handle_block_proposal(
313        self,
314        proposal: BlockProposal,
315        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
316    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
317        let mut validator = self.client.lock().await;
318        let handle_block_proposal_result =
319            Self::handle_block_proposal(proposal, &mut validator).await;
320        let result = match handle_block_proposal_result {
321            Some(Err(NodeError::BlobsNotFound(_))) => {
322                handle_block_proposal_result.expect("handle_block_proposal_result should be Some")
323            }
324            _ => match validator.fault_type {
325                FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
326                    error: "offline".to_string(),
327                }),
328                FaultType::Malicious => Err(ArithmeticError::Overflow.into()),
329                FaultType::DontSendValidateVote => Err(NodeError::ClientIoError {
330                    error: "refusing to validate".to_string(),
331                }),
332                FaultType::Honest
333                | FaultType::DontSendConfirmVote
334                | FaultType::DontProcessValidated => handle_block_proposal_result
335                    .expect("handle_block_proposal_result should be Some"),
336            },
337        };
338        // In a local node cross-chain messages can't get lost, so we can ignore the actions here.
339        sender.send(result.map(|(info, _actions)| info))
340    }
341
342    async fn handle_block_proposal(
343        proposal: BlockProposal,
344        validator: &mut MutexGuard<'_, LocalValidator<S>>,
345    ) -> Option<Result<(ChainInfoResponse, NetworkActions), NodeError>> {
346        match validator.fault_type {
347            FaultType::Offline | FaultType::OfflineWithInfo | FaultType::Malicious => None,
348            FaultType::Honest
349            | FaultType::DontSendConfirmVote
350            | FaultType::DontProcessValidated
351            | FaultType::DontSendValidateVote => Some(
352                validator
353                    .state
354                    .handle_block_proposal(proposal)
355                    .await
356                    .map_err(Into::into),
357            ),
358        }
359    }
360
361    async fn handle_certificate<T: ProcessableCertificate>(
362        certificate: GenericCertificate<T>,
363        validator: &mut MutexGuard<'_, LocalValidator<S>>,
364    ) -> Option<Result<ChainInfoResponse, NodeError>> {
365        match validator.fault_type {
366            FaultType::DontProcessValidated if T::KIND == CertificateKind::Validated => None,
367            FaultType::Honest
368            | FaultType::DontSendConfirmVote
369            | FaultType::Malicious
370            | FaultType::DontProcessValidated
371            | FaultType::DontSendValidateVote => Some(
372                validator
373                    .state
374                    .fully_handle_certificate_with_notifications(certificate, &validator.notifier)
375                    .await
376                    .map_err(Into::into),
377            ),
378            FaultType::Offline | FaultType::OfflineWithInfo => None,
379        }
380    }
381
382    async fn do_handle_lite_certificate(
383        self,
384        certificate: LiteCertificate<'_>,
385        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
386    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
387        let client = self.client.clone();
388        let mut validator = client.lock().await;
389        let result = async move {
390            match validator.state.full_certificate(certificate).await? {
391                Either::Left(confirmed) => {
392                    self.do_handle_certificate_internal(confirmed, &mut validator)
393                        .await
394                }
395                Either::Right(validated) => {
396                    self.do_handle_certificate_internal(validated, &mut validator)
397                        .await
398                }
399            }
400        }
401        .await;
402        sender.send(result)
403    }
404
405    async fn do_handle_certificate_internal<T: ProcessableCertificate>(
406        &self,
407        certificate: GenericCertificate<T>,
408        validator: &mut MutexGuard<'_, LocalValidator<S>>,
409    ) -> Result<ChainInfoResponse, NodeError> {
410        let handle_certificate_result = Self::handle_certificate(certificate, validator).await;
411        match handle_certificate_result {
412            Some(Err(NodeError::BlobsNotFound(_))) => {
413                handle_certificate_result.expect("handle_certificate_result should be Some")
414            }
415            _ => match validator.fault_type {
416                FaultType::DontSendConfirmVote | FaultType::DontProcessValidated
417                    if T::KIND == CertificateKind::Validated =>
418                {
419                    Err(NodeError::ClientIoError {
420                        error: "refusing to confirm".to_string(),
421                    })
422                }
423                FaultType::Honest
424                | FaultType::DontSendConfirmVote
425                | FaultType::DontProcessValidated
426                | FaultType::Malicious
427                | FaultType::DontSendValidateVote => {
428                    handle_certificate_result.expect("handle_certificate_result should be Some")
429                }
430                FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
431                    error: "offline".to_string(),
432                }),
433            },
434        }
435    }
436
437    async fn do_handle_certificate<T: ProcessableCertificate>(
438        self,
439        certificate: GenericCertificate<T>,
440        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
441    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
442        let mut validator = self.client.lock().await;
443        let result = self
444            .do_handle_certificate_internal(certificate, &mut validator)
445            .await;
446        sender.send(result)
447    }
448
449    async fn do_handle_chain_info_query(
450        self,
451        query: ChainInfoQuery,
452        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
453    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
454        let validator = self.client.lock().await;
455        let result = if validator.fault_type == FaultType::Offline {
456            Err(NodeError::ClientIoError {
457                error: "offline".to_string(),
458            })
459        } else {
460            validator
461                .state
462                .handle_chain_info_query(query)
463                .await
464                .map_err(Into::into)
465        };
466        // In a local node cross-chain messages can't get lost, so we can ignore the actions here.
467        sender.send(result.map(|(info, _actions)| info))
468    }
469
470    async fn do_subscribe(
471        self,
472        chains: Vec<ChainId>,
473        sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
474    ) -> Result<(), Result<NotificationStream, NodeError>> {
475        let validator = self.client.lock().await;
476        let rx = validator.notifier.subscribe(chains);
477        let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
478        sender.send(Ok(stream))
479    }
480
481    async fn do_upload_blob(
482        self,
483        content: BlobContent,
484        sender: oneshot::Sender<Result<BlobId, NodeError>>,
485    ) -> Result<(), Result<BlobId, NodeError>> {
486        let validator = self.client.lock().await;
487        let blob = Blob::new(content);
488        let id = blob.id();
489        let storage = validator.state.storage_client();
490        let result = match storage.maybe_write_blobs(&[blob]).await {
491            Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
492            Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
493            Err(error) => Err(error.into()),
494        };
495        sender.send(result)
496    }
497
498    async fn do_download_blob(
499        self,
500        blob_id: BlobId,
501        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
502    ) -> Result<(), Result<BlobContent, NodeError>> {
503        let validator = self.client.lock().await;
504        let blob = validator
505            .state
506            .storage_client()
507            .read_blob(blob_id)
508            .await
509            .map_err(Into::into);
510        let blob = match blob {
511            Ok(blob) => blob.ok_or(NodeError::BlobsNotFound(vec![blob_id])),
512            Err(error) => Err(error),
513        };
514        sender.send(blob.map(|blob| blob.into_content()))
515    }
516
517    async fn do_download_pending_blob(
518        self,
519        chain_id: ChainId,
520        blob_id: BlobId,
521        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
522    ) -> Result<(), Result<BlobContent, NodeError>> {
523        let validator = self.client.lock().await;
524        let result = validator
525            .state
526            .download_pending_blob(chain_id, blob_id)
527            .await
528            .map_err(Into::into);
529        sender.send(result.map(|blob| blob.into_content()))
530    }
531
532    async fn do_handle_pending_blob(
533        self,
534        chain_id: ChainId,
535        blob: BlobContent,
536        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
537    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
538        let validator = self.client.lock().await;
539        let result = validator
540            .state
541            .handle_pending_blob(chain_id, Blob::new(blob))
542            .await
543            .map_err(Into::into);
544        sender.send(result)
545    }
546
547    async fn do_download_certificate(
548        self,
549        hash: CryptoHash,
550        sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
551    ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
552        let validator = self.client.lock().await;
553        let certificate = validator
554            .state
555            .storage_client()
556            .read_certificate(hash)
557            .await
558            .map_err(Into::into);
559
560        let certificate = match certificate {
561            Err(error) => Err(error),
562            Ok(entry) => match entry {
563                Some(certificate) => Ok(certificate),
564                None => {
565                    panic!("Missing certificate: {hash}");
566                }
567            },
568        };
569
570        sender.send(certificate)
571    }
572
573    async fn do_download_certificates(
574        self,
575        hashes: Vec<CryptoHash>,
576        sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
577    ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
578        let validator = self.client.lock().await;
579        let certificates = validator
580            .state
581            .storage_client()
582            .read_certificates(hashes.clone())
583            .await
584            .map_err(Into::into);
585
586        let certificates = match certificates {
587            Err(error) => Err(error),
588            Ok(certificates) => match ResultReadCertificates::new(certificates, hashes) {
589                ResultReadCertificates::Certificates(certificates) => Ok(certificates),
590                ResultReadCertificates::InvalidHashes(hashes) => {
591                    panic!("Missing certificates: {:?}", hashes)
592                }
593            },
594        };
595
596        sender.send(certificates)
597    }
598
599    async fn do_blob_last_used_by(
600        self,
601        blob_id: BlobId,
602        sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
603    ) -> Result<(), Result<CryptoHash, NodeError>> {
604        let validator = self.client.lock().await;
605        let blob_state = validator
606            .state
607            .storage_client()
608            .read_blob_state(blob_id)
609            .await
610            .map_err(Into::into);
611        let certificate_hash = match blob_state {
612            Err(err) => Err(err),
613            Ok(blob_state) => match blob_state {
614                None => Err(NodeError::BlobsNotFound(vec![blob_id])),
615                Some(blob_state) => blob_state
616                    .last_used_by
617                    .ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
618            },
619        };
620
621        sender.send(certificate_hash)
622    }
623
624    async fn do_missing_blob_ids(
625        self,
626        blob_ids: Vec<BlobId>,
627        sender: oneshot::Sender<Result<Vec<BlobId>, NodeError>>,
628    ) -> Result<(), Result<Vec<BlobId>, NodeError>> {
629        let validator = self.client.lock().await;
630        let missing_blob_ids = validator
631            .state
632            .storage_client()
633            .missing_blobs(&blob_ids)
634            .await
635            .map_err(Into::into);
636        sender.send(missing_blob_ids)
637    }
638}
639
640#[derive(Clone)]
641pub struct NodeProvider<S>(BTreeMap<ValidatorPublicKey, Arc<Mutex<LocalValidator<S>>>>)
642where
643    S: Storage;
644
645impl<S> ValidatorNodeProvider for NodeProvider<S>
646where
647    S: Storage + Clone + Send + Sync + 'static,
648{
649    type Node = LocalValidatorClient<S>;
650
651    fn make_node(&self, _name: &str) -> Result<Self::Node, NodeError> {
652        unimplemented!()
653    }
654
655    fn make_nodes_from_list<A>(
656        &self,
657        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
658    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
659    where
660        A: AsRef<str>,
661    {
662        Ok(validators
663            .into_iter()
664            .map(|(public_key, address)| {
665                self.0
666                    .get(&public_key)
667                    .ok_or_else(|| NodeError::CannotResolveValidatorAddress {
668                        address: address.as_ref().to_string(),
669                    })
670                    .cloned()
671                    .map(|client| (public_key, LocalValidatorClient { public_key, client }))
672            })
673            .collect::<Result<Vec<_>, _>>()?
674            .into_iter())
675    }
676}
677
678impl<S> FromIterator<LocalValidatorClient<S>> for NodeProvider<S>
679where
680    S: Storage,
681{
682    fn from_iter<T>(iter: T) -> Self
683    where
684        T: IntoIterator<Item = LocalValidatorClient<S>>,
685    {
686        let destructure =
687            |validator: LocalValidatorClient<S>| (validator.public_key, validator.client);
688        Self(iter.into_iter().map(destructure).collect())
689    }
690}
691
692// NOTE:
693// * To communicate with a quorum of validators, chain clients iterate over a copy of
694// `validator_clients` to spawn I/O tasks.
695// * When using `LocalValidatorClient`, clients communicate with an exact quorum then stop.
696// * Most tests have 1 faulty validator out 4 so that there is exactly only 1 quorum to
697// communicate with.
698#[allow(dead_code)]
699pub struct TestBuilder<B: StorageBuilder> {
700    storage_builder: B,
701    pub initial_committee: Committee,
702    admin_description: Option<ChainDescription>,
703    network_description: Option<NetworkDescription>,
704    genesis_storage_builder: GenesisStorageBuilder,
705    validator_clients: Vec<LocalValidatorClient<B::Storage>>,
706    validator_storages: HashMap<ValidatorPublicKey, B::Storage>,
707    chain_client_storages: Vec<B::Storage>,
708    pub chain_owners: BTreeMap<ChainId, AccountOwner>,
709    pub signer: InMemorySigner,
710}
711
712#[async_trait]
713pub trait StorageBuilder {
714    type Storage: Storage + Clone + Send + Sync + 'static;
715
716    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error>;
717
718    fn clock(&self) -> &TestClock;
719}
720
721#[derive(Default)]
722struct GenesisStorageBuilder {
723    accounts: Vec<GenesisAccount>,
724}
725
726struct GenesisAccount {
727    description: ChainDescription,
728    public_key: AccountPublicKey,
729}
730
731impl GenesisStorageBuilder {
732    fn add(&mut self, description: ChainDescription, public_key: AccountPublicKey) {
733        self.accounts.push(GenesisAccount {
734            description,
735            public_key,
736        })
737    }
738
739    async fn build<S>(&self, storage: S) -> S
740    where
741        S: Storage + Clone + Send + Sync + 'static,
742    {
743        for account in &self.accounts {
744            storage
745                .create_chain(account.description.clone())
746                .await
747                .unwrap();
748        }
749        storage
750    }
751}
752
753pub type ChainClient<S> =
754    crate::client::ChainClient<crate::environment::Impl<S, NodeProvider<S>, InMemorySigner>>;
755
756impl<S: Storage + Clone + Send + Sync + 'static> ChainClient<S> {
757    /// Reads the hashed certificate values in descending order from the given hash.
758    pub async fn read_confirmed_blocks_downward(
759        &self,
760        from: CryptoHash,
761        limit: u32,
762    ) -> anyhow::Result<Vec<ConfirmedBlock>> {
763        let mut hash = Some(from);
764        let mut values = Vec::new();
765        for _ in 0..limit {
766            let Some(next_hash) = hash else {
767                break;
768            };
769            let value = self.read_confirmed_block(next_hash).await?;
770            hash = value.block().header.previous_block_hash;
771            values.push(value);
772        }
773        Ok(values)
774    }
775}
776
777impl<B> TestBuilder<B>
778where
779    B: StorageBuilder,
780{
781    pub async fn new(
782        mut storage_builder: B,
783        count: usize,
784        with_faulty_validators: usize,
785        mut signer: InMemorySigner,
786    ) -> Result<Self, anyhow::Error> {
787        let mut validators = Vec::new();
788        for _ in 0..count {
789            let validator_keypair = ValidatorKeypair::generate();
790            let account_public_key = signer.generate_new();
791            validators.push((validator_keypair, account_public_key));
792        }
793        let for_committee = validators
794            .iter()
795            .map(|(validating, account)| (validating.public_key, *account))
796            .collect::<Vec<_>>();
797        let initial_committee = Committee::make_simple(for_committee);
798        let mut validator_clients = Vec::new();
799        let mut validator_storages = HashMap::new();
800        let mut faulty_validators = HashSet::new();
801        for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() {
802            let validator_public_key = validator_keypair.public_key;
803            let storage = storage_builder.build().await?;
804            let state = WorkerState::new(
805                format!("Node {}", i),
806                Some(validator_keypair.secret_key),
807                storage.clone(),
808                NonZeroUsize::new(100).expect("Chain worker limit should not be zero"),
809            )
810            .with_allow_inactive_chains(false)
811            .with_allow_messages_from_deprecated_epochs(false);
812            let validator = LocalValidatorClient::new(validator_public_key, state);
813            if i < with_faulty_validators {
814                faulty_validators.insert(validator_public_key);
815                validator.set_fault_type(FaultType::Malicious).await;
816            }
817            validator_clients.push(validator);
818            validator_storages.insert(validator_public_key, storage);
819        }
820        tracing::info!(
821            "Test will use the following faulty validators: {:?}",
822            faulty_validators
823        );
824        Ok(Self {
825            storage_builder,
826            initial_committee,
827            admin_description: None,
828            network_description: None,
829            genesis_storage_builder: GenesisStorageBuilder::default(),
830            validator_clients,
831            validator_storages,
832            chain_client_storages: Vec::new(),
833            chain_owners: BTreeMap::new(),
834            signer,
835        })
836    }
837
838    pub fn with_policy(mut self, policy: ResourceControlPolicy) -> Self {
839        let validators = self.initial_committee.validators().clone();
840        self.initial_committee = Committee::new(validators, policy);
841        self
842    }
843
844    pub async fn set_fault_type(&mut self, indexes: impl AsRef<[usize]>, fault_type: FaultType) {
845        let mut faulty_validators = vec![];
846        for index in indexes.as_ref() {
847            let validator = &mut self.validator_clients[*index];
848            validator.set_fault_type(fault_type).await;
849            faulty_validators.push(validator.public_key);
850        }
851        tracing::info!(
852            "Making the following validators {:?}: {:?}",
853            fault_type,
854            faulty_validators
855        );
856    }
857
858    /// Creates the root chain with the given `index`, and returns a client for it.
859    ///
860    /// Root chain 0 is the admin chain and needs to be initialized first, otherwise its balance
861    /// is automatically set to zero.
862    pub async fn add_root_chain(
863        &mut self,
864        index: u32,
865        balance: Amount,
866    ) -> anyhow::Result<ChainClient<B::Storage>> {
867        // Make sure the admin chain is initialized.
868        if self.admin_description.is_none() && index != 0 {
869            Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
870        }
871        let origin = ChainOrigin::Root(index);
872        let public_key = self.signer.generate_new();
873        let open_chain_config = InitialChainConfig {
874            ownership: ChainOwnership::single(public_key.into()),
875            epoch: Epoch(0),
876            min_active_epoch: Epoch(0),
877            max_active_epoch: Epoch(0),
878            balance,
879            application_permissions: ApplicationPermissions::default(),
880        };
881        let description = ChainDescription::new(origin, open_chain_config, Timestamp::from(0));
882        let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
883        if index == 0 {
884            self.admin_description = Some(description.clone());
885            self.network_description = Some(NetworkDescription {
886                admin_chain_id: description.id(),
887                // dummy values to fill the description
888                genesis_config_hash: CryptoHash::test_hash("genesis config"),
889                genesis_timestamp: Timestamp::from(0),
890                genesis_committee_blob_hash: committee_blob.id().hash,
891                name: "test network".to_string(),
892            });
893        }
894        // Remember what's in the genesis store for future clients to join.
895        self.genesis_storage_builder
896            .add(description.clone(), public_key);
897
898        let network_description = self.network_description.as_ref().unwrap();
899
900        for validator in &self.validator_clients {
901            let storage = self
902                .validator_storages
903                .get_mut(&validator.public_key)
904                .unwrap();
905            storage
906                .write_network_description(network_description)
907                .await
908                .expect("writing the NetworkDescription should succeed");
909            storage
910                .write_blob(&committee_blob)
911                .await
912                .expect("writing a blob should succeed");
913            if validator.fault_type().await == FaultType::Malicious {
914                let origin = description.origin();
915                let config = InitialChainConfig {
916                    balance: Amount::ZERO,
917                    ..description.config().clone()
918                };
919                storage
920                    .create_chain(ChainDescription::new(origin, config, Timestamp::from(0)))
921                    .await
922                    .unwrap();
923            } else {
924                storage.create_chain(description.clone()).await.unwrap();
925            }
926        }
927        for storage in self.chain_client_storages.iter_mut() {
928            storage.create_chain(description.clone()).await.unwrap();
929        }
930        let chain_id = description.id();
931        self.chain_owners.insert(chain_id, public_key.into());
932        self.make_client(chain_id, None, BlockHeight::ZERO).await
933    }
934
935    pub fn genesis_chains(&self) -> Vec<(AccountPublicKey, Amount)> {
936        let mut result = Vec::new();
937        for (i, genesis_account) in self.genesis_storage_builder.accounts.iter().enumerate() {
938            assert_eq!(
939                genesis_account.description.origin(),
940                ChainOrigin::Root(i as u32)
941            );
942            result.push((
943                genesis_account.public_key,
944                genesis_account.description.config().balance,
945            ));
946        }
947        result
948    }
949
950    pub fn admin_id(&self) -> ChainId {
951        self.admin_description
952            .as_ref()
953            .expect("admin chain not initialized")
954            .id()
955    }
956
957    pub fn admin_description(&self) -> Option<&ChainDescription> {
958        self.admin_description.as_ref()
959    }
960
961    pub fn make_node_provider(&self) -> NodeProvider<B::Storage> {
962        self.validator_clients.iter().cloned().collect()
963    }
964
965    pub fn node(&mut self, index: usize) -> &mut LocalValidatorClient<B::Storage> {
966        &mut self.validator_clients[index]
967    }
968
969    pub async fn make_storage(&mut self) -> anyhow::Result<B::Storage> {
970        let storage = self.storage_builder.build().await?;
971        let network_description = self.network_description.as_ref().unwrap();
972        let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
973        storage
974            .write_network_description(network_description)
975            .await
976            .expect("writing the NetworkDescription should succeed");
977        storage
978            .write_blob(&committee_blob)
979            .await
980            .expect("writing a blob should succeed");
981        Ok(self.genesis_storage_builder.build(storage).await)
982    }
983
984    pub async fn make_client(
985        &mut self,
986        chain_id: ChainId,
987        block_hash: Option<CryptoHash>,
988        block_height: BlockHeight,
989    ) -> anyhow::Result<ChainClient<B::Storage>> {
990        // Note that new clients are only given the genesis store: they must figure out
991        // the rest by asking validators.
992        let storage = self.make_storage().await?;
993        self.chain_client_storages.push(storage.clone());
994        let client = Arc::new(Client::new(
995            crate::environment::Impl {
996                network: self.make_node_provider(),
997                storage,
998                signer: self.signer.clone(),
999            },
1000            self.admin_id(),
1001            false,
1002            [chain_id],
1003            format!("Client node for {:.8}", chain_id),
1004            NonZeroUsize::new(20).expect("Chain worker limit should not be zero"),
1005            ChainClientOptions::test_default(),
1006        ));
1007        Ok(client.create_chain_client(
1008            chain_id,
1009            block_hash,
1010            block_height,
1011            None,
1012            self.chain_owners.get(&chain_id).copied(),
1013        ))
1014    }
1015
1016    /// Tries to find a (confirmation) certificate for the given chain_id and block height.
1017    pub async fn check_that_validators_have_certificate(
1018        &self,
1019        chain_id: ChainId,
1020        block_height: BlockHeight,
1021        target_count: usize,
1022    ) -> Option<ConfirmedBlockCertificate> {
1023        let query =
1024            ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_in_range(BlockHeightRange {
1025                start: block_height,
1026                limit: Some(1),
1027            });
1028        let mut count = 0;
1029        let mut certificate = None;
1030        for validator in self.validator_clients.clone() {
1031            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1032                if response.check(validator.public_key).is_ok() {
1033                    let ChainInfo {
1034                        mut requested_sent_certificate_hashes,
1035                        ..
1036                    } = *response.info;
1037                    debug_assert!(requested_sent_certificate_hashes.len() <= 1);
1038                    if let Some(cert_hash) = requested_sent_certificate_hashes.pop() {
1039                        if let Ok(cert) = validator.download_certificate(cert_hash).await {
1040                            if cert.inner().block().header.chain_id == chain_id
1041                                && cert.inner().block().header.height == block_height
1042                            {
1043                                cert.check(&self.initial_committee).unwrap();
1044                                count += 1;
1045                                certificate = Some(cert);
1046                            }
1047                        }
1048                    }
1049                }
1050            }
1051        }
1052        assert!(count >= target_count);
1053        certificate
1054    }
1055
1056    /// Tries to find a (confirmation) certificate for the given chain_id and block height, and are
1057    /// in the expected round.
1058    pub async fn check_that_validators_are_in_round(
1059        &self,
1060        chain_id: ChainId,
1061        block_height: BlockHeight,
1062        round: Round,
1063        target_count: usize,
1064    ) {
1065        let query = ChainInfoQuery::new(chain_id);
1066        let mut count = 0;
1067        for validator in self.validator_clients.clone() {
1068            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1069                if response.info.manager.current_round == round
1070                    && response.info.next_block_height == block_height
1071                    && response.check(validator.public_key).is_ok()
1072                {
1073                    count += 1;
1074                }
1075            }
1076        }
1077        assert!(count >= target_count);
1078    }
1079
1080    /// Panics if any validator has a nonempty outbox for the given chain.
1081    pub async fn check_that_validators_have_empty_outboxes(&self, chain_id: ChainId) {
1082        for validator in &self.validator_clients {
1083            let guard = validator.client.lock().await;
1084            let chain = guard.state.chain_state_view(chain_id).await.unwrap();
1085            assert_eq!(chain.outboxes.indices().await.unwrap(), []);
1086        }
1087    }
1088}
1089
1090#[cfg(feature = "rocksdb")]
1091/// Limit concurrency for RocksDB tests to avoid "too many open files" errors.
1092static ROCKS_DB_SEMAPHORE: Semaphore = Semaphore::const_new(5);
1093
1094#[derive(Default)]
1095pub struct MemoryStorageBuilder {
1096    namespace: String,
1097    instance_counter: usize,
1098    wasm_runtime: Option<WasmRuntime>,
1099    clock: TestClock,
1100}
1101
1102#[async_trait]
1103impl StorageBuilder for MemoryStorageBuilder {
1104    type Storage = DbStorage<MemoryStore, TestClock>;
1105
1106    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1107        self.instance_counter += 1;
1108        let config = MemoryStore::new_test_config().await?;
1109        if self.namespace.is_empty() {
1110            self.namespace = generate_test_namespace();
1111        }
1112        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1113        Ok(
1114            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1115                .await?,
1116        )
1117    }
1118
1119    fn clock(&self) -> &TestClock {
1120        &self.clock
1121    }
1122}
1123
1124impl MemoryStorageBuilder {
1125    /// Creates a [`MemoryStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1126    /// applications.
1127    #[allow(dead_code)]
1128    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1129        MemoryStorageBuilder {
1130            wasm_runtime: wasm_runtime.into(),
1131            ..MemoryStorageBuilder::default()
1132        }
1133    }
1134}
1135
1136#[cfg(feature = "rocksdb")]
1137pub struct RocksDbStorageBuilder {
1138    namespace: String,
1139    instance_counter: usize,
1140    wasm_runtime: Option<WasmRuntime>,
1141    clock: TestClock,
1142    _permit: SemaphorePermit<'static>,
1143}
1144
1145#[cfg(feature = "rocksdb")]
1146impl RocksDbStorageBuilder {
1147    pub async fn new() -> Self {
1148        RocksDbStorageBuilder {
1149            namespace: String::new(),
1150            instance_counter: 0,
1151            wasm_runtime: None,
1152            clock: TestClock::default(),
1153            _permit: ROCKS_DB_SEMAPHORE.acquire().await.unwrap(),
1154        }
1155    }
1156
1157    /// Creates a [`RocksDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1158    /// applications.
1159    #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
1160    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1161        RocksDbStorageBuilder {
1162            wasm_runtime: wasm_runtime.into(),
1163            ..RocksDbStorageBuilder::new().await
1164        }
1165    }
1166}
1167
1168#[cfg(feature = "rocksdb")]
1169#[async_trait]
1170impl StorageBuilder for RocksDbStorageBuilder {
1171    type Storage = DbStorage<RocksDbStore, TestClock>;
1172
1173    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1174        self.instance_counter += 1;
1175        let config = RocksDbStore::new_test_config().await?;
1176        if self.namespace.is_empty() {
1177            self.namespace = generate_test_namespace();
1178        }
1179        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1180        Ok(
1181            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1182                .await?,
1183        )
1184    }
1185
1186    fn clock(&self) -> &TestClock {
1187        &self.clock
1188    }
1189}
1190
1191#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1192#[derive(Default)]
1193pub struct ServiceStorageBuilder {
1194    namespace: String,
1195    instance_counter: usize,
1196    wasm_runtime: Option<WasmRuntime>,
1197    clock: TestClock,
1198}
1199
1200#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1201impl ServiceStorageBuilder {
1202    /// Creates a `ServiceStorage`.
1203    pub async fn new() -> Self {
1204        Self::with_wasm_runtime(None).await
1205    }
1206
1207    /// Creates a `ServiceStorage` with the given Wasm runtime.
1208    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1209        ServiceStorageBuilder {
1210            wasm_runtime: wasm_runtime.into(),
1211            ..ServiceStorageBuilder::default()
1212        }
1213    }
1214}
1215
1216#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1217#[async_trait]
1218impl StorageBuilder for ServiceStorageBuilder {
1219    type Storage = DbStorage<ServiceStoreClient, TestClock>;
1220
1221    async fn build(&mut self) -> anyhow::Result<Self::Storage> {
1222        self.instance_counter += 1;
1223        let config = ServiceStoreClient::new_test_config().await?;
1224        if self.namespace.is_empty() {
1225            self.namespace = generate_test_namespace();
1226        }
1227        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1228        Ok(
1229            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1230                .await?,
1231        )
1232    }
1233
1234    fn clock(&self) -> &TestClock {
1235        &self.clock
1236    }
1237}
1238
1239#[cfg(feature = "dynamodb")]
1240#[derive(Default)]
1241pub struct DynamoDbStorageBuilder {
1242    namespace: String,
1243    instance_counter: usize,
1244    wasm_runtime: Option<WasmRuntime>,
1245    clock: TestClock,
1246}
1247
1248#[cfg(feature = "dynamodb")]
1249impl DynamoDbStorageBuilder {
1250    /// Creates a [`DynamoDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1251    /// applications.
1252    #[allow(dead_code)]
1253    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1254        DynamoDbStorageBuilder {
1255            wasm_runtime: wasm_runtime.into(),
1256            ..DynamoDbStorageBuilder::default()
1257        }
1258    }
1259}
1260
1261#[cfg(feature = "dynamodb")]
1262#[async_trait]
1263impl StorageBuilder for DynamoDbStorageBuilder {
1264    type Storage = DbStorage<DynamoDbStore, TestClock>;
1265
1266    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1267        self.instance_counter += 1;
1268        let config = DynamoDbStore::new_test_config().await?;
1269        if self.namespace.is_empty() {
1270            self.namespace = generate_test_namespace();
1271        }
1272        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1273        Ok(
1274            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1275                .await?,
1276        )
1277    }
1278
1279    fn clock(&self) -> &TestClock {
1280        &self.clock
1281    }
1282}
1283
1284#[cfg(feature = "scylladb")]
1285#[derive(Default)]
1286pub struct ScyllaDbStorageBuilder {
1287    namespace: String,
1288    instance_counter: usize,
1289    wasm_runtime: Option<WasmRuntime>,
1290    clock: TestClock,
1291}
1292
1293#[cfg(feature = "scylladb")]
1294impl ScyllaDbStorageBuilder {
1295    /// Creates a [`ScyllaDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1296    /// applications.
1297    #[allow(dead_code)]
1298    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1299        ScyllaDbStorageBuilder {
1300            wasm_runtime: wasm_runtime.into(),
1301            ..ScyllaDbStorageBuilder::default()
1302        }
1303    }
1304}
1305
1306#[cfg(feature = "scylladb")]
1307#[async_trait]
1308impl StorageBuilder for ScyllaDbStorageBuilder {
1309    type Storage = DbStorage<ScyllaDbStore, TestClock>;
1310
1311    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1312        self.instance_counter += 1;
1313        let config = ScyllaDbStore::new_test_config().await?;
1314        if self.namespace.is_empty() {
1315            self.namespace = generate_test_namespace();
1316        }
1317        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1318        Ok(
1319            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1320                .await?,
1321        )
1322    }
1323
1324    fn clock(&self) -> &TestClock {
1325        &self.clock
1326    }
1327}