linera_core/unit_tests/
test_utils.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet},
6    sync::Arc,
7    time::Duration,
8    vec,
9};
10
11use async_trait::async_trait;
12use futures::{
13    future::Either,
14    lock::{Mutex, MutexGuard},
15    Future,
16};
17use linera_base::{
18    crypto::{AccountPublicKey, CryptoHash, InMemorySigner, ValidatorKeypair, ValidatorPublicKey},
19    data_types::*,
20    identifiers::{AccountOwner, BlobId, ChainId},
21    ownership::ChainOwnership,
22};
23use linera_chain::{
24    data_types::BlockProposal,
25    types::{
26        CertificateKind, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27        LiteCertificate, Timeout, ValidatedBlock,
28    },
29};
30use linera_execution::{committee::Committee, ResourceControlPolicy, WasmRuntime};
31use linera_storage::{DbStorage, ResultReadCertificates, Storage, TestClock};
32#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
33use linera_storage_service::client::StorageServiceDatabase;
34use linera_version::VersionInfo;
35#[cfg(feature = "dynamodb")]
36use linera_views::dynamo_db::DynamoDbDatabase;
37#[cfg(feature = "scylladb")]
38use linera_views::scylla_db::ScyllaDbDatabase;
39use linera_views::{
40    memory::MemoryDatabase, random::generate_test_namespace, store::TestKeyValueDatabase as _,
41};
42use tokio::sync::oneshot;
43use tokio_stream::wrappers::UnboundedReceiverStream;
44#[cfg(feature = "rocksdb")]
45use {
46    linera_views::rocks_db::RocksDbDatabase,
47    tokio::sync::{Semaphore, SemaphorePermit},
48};
49
50use crate::{
51    client::{ChainClientOptions, Client},
52    data_types::*,
53    node::{
54        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
55        ValidatorNodeProvider,
56    },
57    notifier::ChannelNotifier,
58    worker::{Notification, ProcessableCertificate, WorkerState},
59};
60
61#[derive(Debug, PartialEq, Clone, Copy)]
62pub enum FaultType {
63    Honest,
64    Offline,
65    OfflineWithInfo,
66    NoChains,
67    DontSendConfirmVote,
68    DontProcessValidated,
69    DontSendValidateVote,
70}
71
72/// A validator used for testing. "Faulty" validators ignore block proposals (but not
73/// certificates or info queries) and have the wrong initial balance for all chains.
74///
75/// All methods are executed in spawned Tokio tasks, so that canceling a client task doesn't cause
76/// the validator's tasks to be canceled: In a real network, a validator also wouldn't cancel
77/// tasks if the client stopped waiting for the response.
78struct LocalValidator<S>
79where
80    S: Storage,
81{
82    state: WorkerState<S>,
83    notifier: Arc<ChannelNotifier<Notification>>,
84}
85
86#[derive(Clone)]
87pub struct LocalValidatorClient<S>
88where
89    S: Storage,
90{
91    public_key: ValidatorPublicKey,
92    client: Arc<Mutex<LocalValidator<S>>>,
93    fault_type: FaultType,
94}
95
96impl<S> ValidatorNode for LocalValidatorClient<S>
97where
98    S: Storage + Clone + Send + Sync + 'static,
99{
100    type NotificationStream = NotificationStream;
101
102    async fn handle_block_proposal(
103        &self,
104        proposal: BlockProposal,
105    ) -> Result<ChainInfoResponse, NodeError> {
106        self.spawn_and_receive(move |validator, sender| {
107            validator.do_handle_block_proposal(proposal, sender)
108        })
109        .await
110    }
111
112    async fn handle_lite_certificate(
113        &self,
114        certificate: LiteCertificate<'_>,
115        _delivery: CrossChainMessageDelivery,
116    ) -> Result<ChainInfoResponse, NodeError> {
117        let certificate = certificate.cloned();
118        self.spawn_and_receive(move |validator, sender| {
119            validator.do_handle_lite_certificate(certificate, sender)
120        })
121        .await
122    }
123
124    async fn handle_timeout_certificate(
125        &self,
126        certificate: GenericCertificate<Timeout>,
127    ) -> Result<ChainInfoResponse, NodeError> {
128        self.spawn_and_receive(move |validator, sender| {
129            validator.do_handle_certificate(certificate, sender)
130        })
131        .await
132    }
133
134    async fn handle_validated_certificate(
135        &self,
136        certificate: GenericCertificate<ValidatedBlock>,
137    ) -> Result<ChainInfoResponse, NodeError> {
138        self.spawn_and_receive(move |validator, sender| {
139            validator.do_handle_certificate(certificate, sender)
140        })
141        .await
142    }
143
144    async fn handle_confirmed_certificate(
145        &self,
146        certificate: GenericCertificate<ConfirmedBlock>,
147        _delivery: CrossChainMessageDelivery,
148    ) -> Result<ChainInfoResponse, NodeError> {
149        self.spawn_and_receive(move |validator, sender| {
150            validator.do_handle_certificate(certificate, sender)
151        })
152        .await
153    }
154
155    async fn handle_chain_info_query(
156        &self,
157        query: ChainInfoQuery,
158    ) -> Result<ChainInfoResponse, NodeError> {
159        self.spawn_and_receive(move |validator, sender| {
160            validator.do_handle_chain_info_query(query, sender)
161        })
162        .await
163    }
164
165    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
166        self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
167            .await
168    }
169
170    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
171        Ok(Default::default())
172    }
173
174    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
175        Ok(self
176            .client
177            .lock()
178            .await
179            .state
180            .storage_client()
181            .read_network_description()
182            .await
183            .transpose()
184            .ok_or(NodeError::ViewError {
185                error: "missing NetworkDescription".to_owned(),
186            })??)
187    }
188
189    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
190        self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
191            .await
192    }
193
194    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
195        self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
196            .await
197    }
198
199    async fn download_pending_blob(
200        &self,
201        chain_id: ChainId,
202        blob_id: BlobId,
203    ) -> Result<BlobContent, NodeError> {
204        self.spawn_and_receive(move |validator, sender| {
205            validator.do_download_pending_blob(chain_id, blob_id, sender)
206        })
207        .await
208    }
209
210    async fn handle_pending_blob(
211        &self,
212        chain_id: ChainId,
213        blob: BlobContent,
214    ) -> Result<ChainInfoResponse, NodeError> {
215        self.spawn_and_receive(move |validator, sender| {
216            validator.do_handle_pending_blob(chain_id, blob, sender)
217        })
218        .await
219    }
220
221    async fn download_certificate(
222        &self,
223        hash: CryptoHash,
224    ) -> Result<ConfirmedBlockCertificate, NodeError> {
225        self.spawn_and_receive(move |validator, sender| {
226            validator.do_download_certificate(hash, sender)
227        })
228        .await
229    }
230
231    async fn download_certificates(
232        &self,
233        hashes: Vec<CryptoHash>,
234    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
235        self.spawn_and_receive(move |validator, sender| {
236            validator.do_download_certificates(hashes, sender)
237        })
238        .await
239    }
240
241    async fn download_certificates_by_heights(
242        &self,
243        chain_id: ChainId,
244        heights: Vec<BlockHeight>,
245    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
246        self.spawn_and_receive(move |validator, sender| {
247            validator.do_download_certificates_by_heights(chain_id, heights, sender)
248        })
249        .await
250    }
251
252    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
253        self.spawn_and_receive(move |validator, sender| {
254            validator.do_blob_last_used_by(blob_id, sender)
255        })
256        .await
257    }
258
259    async fn blob_last_used_by_certificate(
260        &self,
261        blob_id: BlobId,
262    ) -> Result<ConfirmedBlockCertificate, NodeError> {
263        self.spawn_and_receive(move |validator, sender| {
264            validator.do_blob_last_used_by_certificate(blob_id, sender)
265        })
266        .await
267    }
268
269    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
270        self.spawn_and_receive(move |validator, sender| {
271            validator.do_missing_blob_ids(blob_ids, sender)
272        })
273        .await
274    }
275}
276
277impl<S> LocalValidatorClient<S>
278where
279    S: Storage + Clone + Send + Sync + 'static,
280{
281    fn new(public_key: ValidatorPublicKey, state: WorkerState<S>) -> Self {
282        let client = LocalValidator {
283            state,
284            notifier: Arc::new(ChannelNotifier::default()),
285        };
286        Self {
287            public_key,
288            client: Arc::new(Mutex::new(client)),
289            fault_type: FaultType::Honest,
290        }
291    }
292
293    pub fn name(&self) -> ValidatorPublicKey {
294        self.public_key
295    }
296
297    fn set_fault_type(&mut self, fault_type: FaultType) {
298        self.fault_type = fault_type;
299    }
300
301    /// Obtains the basic `ChainInfo` data for the local validator chain, with chain manager values.
302    pub async fn chain_info_with_manager_values(
303        &mut self,
304        chain_id: ChainId,
305    ) -> Result<Box<ChainInfo>, NodeError> {
306        let query = ChainInfoQuery::new(chain_id).with_manager_values();
307        let response = self.handle_chain_info_query(query).await?;
308        Ok(response.info)
309    }
310
311    /// Executes the future produced by `f` in a new thread in a new Tokio runtime.
312    /// Returns the value that the future puts into the sender.
313    async fn spawn_and_receive<F, R, T>(&self, f: F) -> T
314    where
315        T: Send + 'static,
316        R: Future<Output = Result<(), T>> + Send,
317        F: FnOnce(Self, oneshot::Sender<T>) -> R + Send + 'static,
318    {
319        let validator = self.clone();
320        let (sender, receiver) = oneshot::channel();
321        tokio::spawn(async move {
322            if f(validator, sender).await.is_err() {
323                tracing::debug!("result could not be sent");
324            }
325        });
326        receiver.await.unwrap()
327    }
328
329    async fn do_handle_block_proposal(
330        self,
331        proposal: BlockProposal,
332        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
333    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
334        let result = match self.fault_type {
335            FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
336                error: "offline".to_string(),
337            }),
338            FaultType::NoChains => Err(NodeError::InactiveChain(proposal.content.block.chain_id)),
339            FaultType::DontSendValidateVote
340            | FaultType::Honest
341            | FaultType::DontSendConfirmVote
342            | FaultType::DontProcessValidated => {
343                let result = self
344                    .client
345                    .lock()
346                    .await
347                    .state
348                    .handle_block_proposal(proposal)
349                    .await
350                    .map_err(Into::into);
351                if self.fault_type == FaultType::DontSendValidateVote {
352                    Err(NodeError::ClientIoError {
353                        error: "refusing to validate".to_string(),
354                    })
355                } else {
356                    result
357                }
358            }
359        };
360        // In a local node cross-chain messages can't get lost, so we can ignore the actions here.
361        sender.send(result.map(|(info, _actions)| info))
362    }
363
364    async fn do_handle_lite_certificate(
365        self,
366        certificate: LiteCertificate<'_>,
367        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
368    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
369        let client = self.client.clone();
370        let mut validator = client.lock().await;
371        let result = async move {
372            match validator.state.full_certificate(certificate).await? {
373                Either::Left(confirmed) => {
374                    self.do_handle_certificate_internal(confirmed, &mut validator)
375                        .await
376                }
377                Either::Right(validated) => {
378                    self.do_handle_certificate_internal(validated, &mut validator)
379                        .await
380                }
381            }
382        }
383        .await;
384        sender.send(result)
385    }
386
387    async fn do_handle_certificate_internal<T: ProcessableCertificate>(
388        &self,
389        certificate: GenericCertificate<T>,
390        validator: &mut MutexGuard<'_, LocalValidator<S>>,
391    ) -> Result<ChainInfoResponse, NodeError> {
392        match self.fault_type {
393            FaultType::DontProcessValidated if T::KIND == CertificateKind::Validated => {
394                Err(NodeError::ClientIoError {
395                    error: "refusing to process validated block".to_string(),
396                })
397            }
398            FaultType::NoChains => Err(NodeError::InactiveChain(certificate.value().chain_id())),
399            FaultType::Honest
400            | FaultType::DontSendConfirmVote
401            | FaultType::DontProcessValidated
402            | FaultType::DontSendValidateVote => {
403                let result = validator
404                    .state
405                    .fully_handle_certificate_with_notifications(certificate, &validator.notifier)
406                    .await
407                    .map_err(Into::into);
408                if T::KIND == CertificateKind::Validated
409                    && self.fault_type == FaultType::DontSendConfirmVote
410                {
411                    Err(NodeError::ClientIoError {
412                        error: "refusing to confirm".to_string(),
413                    })
414                } else {
415                    result
416                }
417            }
418            FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
419                error: "offline".to_string(),
420            }),
421        }
422    }
423
424    async fn do_handle_certificate<T: ProcessableCertificate>(
425        self,
426        certificate: GenericCertificate<T>,
427        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
428    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
429        let mut validator = self.client.lock().await;
430        let result = self
431            .do_handle_certificate_internal(certificate, &mut validator)
432            .await;
433        sender.send(result)
434    }
435
436    async fn do_handle_chain_info_query(
437        self,
438        query: ChainInfoQuery,
439        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
440    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
441        let validator = self.client.lock().await;
442        let result = match self.fault_type {
443            FaultType::Offline => Err(NodeError::ClientIoError {
444                error: "offline".to_string(),
445            }),
446            FaultType::NoChains => Err(NodeError::InactiveChain(query.chain_id)),
447            FaultType::Honest
448            | FaultType::DontSendConfirmVote
449            | FaultType::DontProcessValidated
450            | FaultType::DontSendValidateVote
451            | FaultType::OfflineWithInfo => validator
452                .state
453                .handle_chain_info_query(query)
454                .await
455                .map_err(Into::into),
456        };
457        // In a local node cross-chain messages can't get lost, so we can ignore the actions here.
458        sender.send(result.map(|(info, _actions)| info))
459    }
460
461    async fn do_subscribe(
462        self,
463        chains: Vec<ChainId>,
464        sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
465    ) -> Result<(), Result<NotificationStream, NodeError>> {
466        let validator = self.client.lock().await;
467        let rx = validator.notifier.subscribe(chains);
468        let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
469        sender.send(Ok(stream))
470    }
471
472    async fn do_upload_blob(
473        self,
474        content: BlobContent,
475        sender: oneshot::Sender<Result<BlobId, NodeError>>,
476    ) -> Result<(), Result<BlobId, NodeError>> {
477        let validator = self.client.lock().await;
478        let blob = Blob::new(content);
479        let id = blob.id();
480        let storage = validator.state.storage_client();
481        let result = match storage.maybe_write_blobs(&[blob]).await {
482            Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
483            Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
484            Err(error) => Err(error.into()),
485        };
486        sender.send(result)
487    }
488
489    async fn do_download_blob(
490        self,
491        blob_id: BlobId,
492        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
493    ) -> Result<(), Result<BlobContent, NodeError>> {
494        let validator = self.client.lock().await;
495        let blob = validator
496            .state
497            .storage_client()
498            .read_blob(blob_id)
499            .await
500            .map_err(Into::into);
501        let blob = match blob {
502            Ok(blob) => blob.ok_or(NodeError::BlobsNotFound(vec![blob_id])),
503            Err(error) => Err(error),
504        };
505        sender.send(blob.map(|blob| blob.into_content()))
506    }
507
508    async fn do_download_pending_blob(
509        self,
510        chain_id: ChainId,
511        blob_id: BlobId,
512        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
513    ) -> Result<(), Result<BlobContent, NodeError>> {
514        let validator = self.client.lock().await;
515        let result = validator
516            .state
517            .download_pending_blob(chain_id, blob_id)
518            .await
519            .map_err(Into::into);
520        sender.send(result.map(|blob| blob.into_content()))
521    }
522
523    async fn do_handle_pending_blob(
524        self,
525        chain_id: ChainId,
526        blob: BlobContent,
527        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
528    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
529        let validator = self.client.lock().await;
530        let result = validator
531            .state
532            .handle_pending_blob(chain_id, Blob::new(blob))
533            .await
534            .map_err(Into::into);
535        sender.send(result)
536    }
537
538    async fn do_download_certificate(
539        self,
540        hash: CryptoHash,
541        sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
542    ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
543        let validator = self.client.lock().await;
544        let certificate = validator
545            .state
546            .storage_client()
547            .read_certificate(hash)
548            .await
549            .map_err(Into::into);
550
551        let certificate = match certificate {
552            Err(error) => Err(error),
553            Ok(entry) => match entry {
554                Some(certificate) => Ok(certificate),
555                None => {
556                    panic!("Missing certificate: {hash}");
557                }
558            },
559        };
560
561        sender.send(certificate)
562    }
563
564    async fn do_download_certificates(
565        self,
566        hashes: Vec<CryptoHash>,
567        sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
568    ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
569        let validator = self.client.lock().await;
570        let certificates = validator
571            .state
572            .storage_client()
573            .read_certificates(hashes.clone())
574            .await
575            .map_err(Into::into);
576
577        let certificates = match certificates {
578            Err(error) => Err(error),
579            Ok(certificates) => match ResultReadCertificates::new(certificates, hashes) {
580                ResultReadCertificates::Certificates(certificates) => Ok(certificates),
581                ResultReadCertificates::InvalidHashes(hashes) => {
582                    panic!("Missing certificates: {:?}", hashes)
583                }
584            },
585        };
586
587        sender.send(certificates)
588    }
589
590    async fn do_download_certificates_by_heights(
591        self,
592        chain_id: ChainId,
593        heights: Vec<BlockHeight>,
594        sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
595    ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
596        // First, use do_handle_chain_info_query to get the certificate hashes
597        let (query_sender, query_receiver) = oneshot::channel();
598        let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_by_heights(heights);
599
600        let self_clone = self.clone();
601        self.do_handle_chain_info_query(query, query_sender)
602            .await
603            .expect("Failed to handle chain info query");
604
605        // Get the response from the chain info query
606        let chain_info_response = query_receiver.await.map_err(|_| {
607            Err(NodeError::ClientIoError {
608                error: "Failed to receive chain info response".to_string(),
609            })
610        })?;
611
612        let hashes = match chain_info_response {
613            Ok(response) => response.info.requested_sent_certificate_hashes,
614            Err(e) => {
615                return sender.send(Err(e));
616            }
617        };
618
619        // Now use do_download_certificates to get the actual certificates
620        let (cert_sender, cert_receiver) = oneshot::channel();
621        self_clone
622            .do_download_certificates(hashes, cert_sender)
623            .await?;
624
625        // Forward the result to the original sender
626        let result = cert_receiver.await.map_err(|_| {
627            Err(NodeError::ClientIoError {
628                error: "Failed to receive certificates".to_string(),
629            })
630        })?;
631
632        sender.send(result)
633    }
634
635    async fn do_blob_last_used_by(
636        self,
637        blob_id: BlobId,
638        sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
639    ) -> Result<(), Result<CryptoHash, NodeError>> {
640        let validator = self.client.lock().await;
641        let blob_state = validator
642            .state
643            .storage_client()
644            .read_blob_state(blob_id)
645            .await
646            .map_err(Into::into);
647        let certificate_hash = match blob_state {
648            Err(err) => Err(err),
649            Ok(blob_state) => match blob_state {
650                None => Err(NodeError::BlobsNotFound(vec![blob_id])),
651                Some(blob_state) => blob_state
652                    .last_used_by
653                    .ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
654            },
655        };
656
657        sender.send(certificate_hash)
658    }
659
660    async fn do_blob_last_used_by_certificate(
661        self,
662        blob_id: BlobId,
663        sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
664    ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
665        match self.blob_last_used_by(blob_id).await {
666            Ok(cert_hash) => {
667                let cert = self.download_certificate(cert_hash).await;
668                sender.send(cert)
669            }
670            Err(err) => sender.send(Err(err)),
671        }
672    }
673
674    async fn do_missing_blob_ids(
675        self,
676        blob_ids: Vec<BlobId>,
677        sender: oneshot::Sender<Result<Vec<BlobId>, NodeError>>,
678    ) -> Result<(), Result<Vec<BlobId>, NodeError>> {
679        let validator = self.client.lock().await;
680        let missing_blob_ids = validator
681            .state
682            .storage_client()
683            .missing_blobs(&blob_ids)
684            .await
685            .map_err(Into::into);
686        sender.send(missing_blob_ids)
687    }
688}
689
690#[derive(Clone)]
691pub struct NodeProvider<S>(Arc<std::sync::Mutex<Vec<LocalValidatorClient<S>>>>)
692where
693    S: Storage;
694
695impl<S> NodeProvider<S>
696where
697    S: Storage + Clone,
698{
699    fn all_nodes(&self) -> Vec<LocalValidatorClient<S>> {
700        self.0.lock().unwrap().clone()
701    }
702}
703
704impl<S> ValidatorNodeProvider for NodeProvider<S>
705where
706    S: Storage + Clone + Send + Sync + 'static,
707{
708    type Node = LocalValidatorClient<S>;
709
710    fn make_node(&self, _name: &str) -> Result<Self::Node, NodeError> {
711        unimplemented!()
712    }
713
714    fn make_nodes_from_list<A>(
715        &self,
716        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
717    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
718    where
719        A: AsRef<str>,
720    {
721        let list = self.0.lock().unwrap();
722        Ok(validators
723            .into_iter()
724            .map(|(public_key, address)| {
725                list.iter()
726                    .find(|client| client.public_key == public_key)
727                    .ok_or_else(|| NodeError::CannotResolveValidatorAddress {
728                        address: address.as_ref().to_string(),
729                    })
730                    .map(|client| (public_key, client.clone()))
731            })
732            .collect::<Result<Vec<_>, _>>()?
733            .into_iter())
734    }
735}
736
737impl<S> FromIterator<LocalValidatorClient<S>> for NodeProvider<S>
738where
739    S: Storage,
740{
741    fn from_iter<T>(iter: T) -> Self
742    where
743        T: IntoIterator<Item = LocalValidatorClient<S>>,
744    {
745        Self(Arc::new(std::sync::Mutex::new(iter.into_iter().collect())))
746    }
747}
748
749// NOTE:
750// * To communicate with a quorum of validators, chain clients iterate over a copy of
751// `validator_clients` to spawn I/O tasks.
752// * When using `LocalValidatorClient`, clients communicate with an exact quorum then stop.
753// * Most tests have 1 faulty validator out 4 so that there is exactly only 1 quorum to
754// communicate with.
755pub struct TestBuilder<B: StorageBuilder> {
756    storage_builder: B,
757    pub initial_committee: Committee,
758    admin_description: Option<ChainDescription>,
759    network_description: Option<NetworkDescription>,
760    genesis_storage_builder: GenesisStorageBuilder,
761    node_provider: NodeProvider<B::Storage>,
762    validator_storages: HashMap<ValidatorPublicKey, B::Storage>,
763    chain_client_storages: Vec<B::Storage>,
764    pub chain_owners: BTreeMap<ChainId, AccountOwner>,
765    pub signer: InMemorySigner,
766}
767
768#[async_trait]
769pub trait StorageBuilder {
770    type Storage: Storage + Clone + Send + Sync + 'static;
771
772    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error>;
773
774    fn clock(&self) -> &TestClock;
775}
776
777#[derive(Default)]
778struct GenesisStorageBuilder {
779    accounts: Vec<GenesisAccount>,
780}
781
782struct GenesisAccount {
783    description: ChainDescription,
784    public_key: AccountPublicKey,
785}
786
787impl GenesisStorageBuilder {
788    fn add(&mut self, description: ChainDescription, public_key: AccountPublicKey) {
789        self.accounts.push(GenesisAccount {
790            description,
791            public_key,
792        })
793    }
794
795    async fn build<S>(&self, storage: S) -> S
796    where
797        S: Storage + Clone + Send + Sync + 'static,
798    {
799        for account in &self.accounts {
800            storage
801                .create_chain(account.description.clone())
802                .await
803                .unwrap();
804        }
805        storage
806    }
807}
808
809pub type ChainClient<S> =
810    crate::client::ChainClient<crate::environment::Impl<S, NodeProvider<S>, InMemorySigner>>;
811
812impl<S: Storage + Clone + Send + Sync + 'static> ChainClient<S> {
813    /// Reads the hashed certificate values in descending order from the given hash.
814    pub async fn read_confirmed_blocks_downward(
815        &self,
816        from: CryptoHash,
817        limit: u32,
818    ) -> anyhow::Result<Vec<ConfirmedBlock>> {
819        let mut hash = Some(from);
820        let mut values = Vec::new();
821        for _ in 0..limit {
822            let Some(next_hash) = hash else {
823                break;
824            };
825            let value = self.read_confirmed_block(next_hash).await?;
826            hash = value.block().header.previous_block_hash;
827            values.push(value);
828        }
829        Ok(values)
830    }
831}
832
833impl<B> TestBuilder<B>
834where
835    B: StorageBuilder,
836{
837    pub async fn new(
838        mut storage_builder: B,
839        count: usize,
840        with_faulty_validators: usize,
841        mut signer: InMemorySigner,
842    ) -> Result<Self, anyhow::Error> {
843        let mut validators = Vec::new();
844        for _ in 0..count {
845            let validator_keypair = ValidatorKeypair::generate();
846            let account_public_key = signer.generate_new();
847            validators.push((validator_keypair, account_public_key));
848        }
849        let for_committee = validators
850            .iter()
851            .map(|(validating, account)| (validating.public_key, *account))
852            .collect::<Vec<_>>();
853        let initial_committee = Committee::make_simple(for_committee);
854        let mut validator_clients = Vec::new();
855        let mut validator_storages = HashMap::new();
856        let mut faulty_validators = HashSet::new();
857        for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() {
858            let validator_public_key = validator_keypair.public_key;
859            let storage = storage_builder.build().await?;
860            let state = WorkerState::new(
861                format!("Node {}", i),
862                Some(validator_keypair.secret_key),
863                storage.clone(),
864            )
865            .with_allow_inactive_chains(false)
866            .with_allow_messages_from_deprecated_epochs(false);
867            let mut validator = LocalValidatorClient::new(validator_public_key, state);
868            if i < with_faulty_validators {
869                faulty_validators.insert(validator_public_key);
870                validator.set_fault_type(FaultType::NoChains);
871            }
872            validator_clients.push(validator);
873            validator_storages.insert(validator_public_key, storage);
874        }
875        tracing::info!(
876            "Test will use the following faulty validators: {:?}",
877            faulty_validators
878        );
879        Ok(Self {
880            storage_builder,
881            initial_committee,
882            admin_description: None,
883            network_description: None,
884            genesis_storage_builder: GenesisStorageBuilder::default(),
885            node_provider: NodeProvider::from_iter(validator_clients),
886            validator_storages,
887            chain_client_storages: Vec::new(),
888            chain_owners: BTreeMap::new(),
889            signer,
890        })
891    }
892
893    pub fn with_policy(mut self, policy: ResourceControlPolicy) -> Self {
894        let validators = self.initial_committee.validators().clone();
895        self.initial_committee = Committee::new(validators, policy);
896        self
897    }
898
899    pub fn set_fault_type(&mut self, indexes: impl AsRef<[usize]>, fault_type: FaultType) {
900        let mut faulty_validators = vec![];
901        let mut validator_clients = self.node_provider.0.lock().unwrap();
902        for index in indexes.as_ref() {
903            let validator = &mut validator_clients[*index];
904            validator.set_fault_type(fault_type);
905            faulty_validators.push(validator.public_key);
906        }
907        tracing::info!(
908            "Making the following validators {:?}: {:?}",
909            fault_type,
910            faulty_validators
911        );
912    }
913
914    /// Creates the root chain with the given `index`, and returns a client for it.
915    ///
916    /// Root chain 0 is the admin chain and needs to be initialized first, otherwise its balance
917    /// is automatically set to zero.
918    pub async fn add_root_chain(
919        &mut self,
920        index: u32,
921        balance: Amount,
922    ) -> anyhow::Result<ChainClient<B::Storage>> {
923        // Make sure the admin chain is initialized.
924        if self.admin_description.is_none() && index != 0 {
925            Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
926        }
927        let origin = ChainOrigin::Root(index);
928        let public_key = self.signer.generate_new();
929        let open_chain_config = InitialChainConfig {
930            ownership: ChainOwnership::single(public_key.into()),
931            epoch: Epoch(0),
932            min_active_epoch: Epoch(0),
933            max_active_epoch: Epoch(0),
934            balance,
935            application_permissions: ApplicationPermissions::default(),
936        };
937        let description = ChainDescription::new(origin, open_chain_config, Timestamp::from(0));
938        let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
939        if index == 0 {
940            self.admin_description = Some(description.clone());
941            self.network_description = Some(NetworkDescription {
942                admin_chain_id: description.id(),
943                // dummy values to fill the description
944                genesis_config_hash: CryptoHash::test_hash("genesis config"),
945                genesis_timestamp: Timestamp::from(0),
946                genesis_committee_blob_hash: committee_blob.id().hash,
947                name: "test network".to_string(),
948            });
949        }
950        // Remember what's in the genesis store for future clients to join.
951        self.genesis_storage_builder
952            .add(description.clone(), public_key);
953
954        let network_description = self.network_description.as_ref().unwrap();
955
956        for validator in self.node_provider.all_nodes() {
957            let storage = self
958                .validator_storages
959                .get_mut(&validator.public_key)
960                .unwrap();
961            storage
962                .write_network_description(network_description)
963                .await
964                .expect("writing the NetworkDescription should succeed");
965            storage
966                .write_blob(&committee_blob)
967                .await
968                .expect("writing a blob should succeed");
969            storage.create_chain(description.clone()).await.unwrap();
970        }
971        for storage in self.chain_client_storages.iter_mut() {
972            storage.create_chain(description.clone()).await.unwrap();
973        }
974        let chain_id = description.id();
975        self.chain_owners.insert(chain_id, public_key.into());
976        self.make_client(chain_id, None, BlockHeight::ZERO).await
977    }
978
979    pub fn genesis_chains(&self) -> Vec<(AccountPublicKey, Amount)> {
980        let mut result = Vec::new();
981        for (i, genesis_account) in self.genesis_storage_builder.accounts.iter().enumerate() {
982            assert_eq!(
983                genesis_account.description.origin(),
984                ChainOrigin::Root(i as u32)
985            );
986            result.push((
987                genesis_account.public_key,
988                genesis_account.description.config().balance,
989            ));
990        }
991        result
992    }
993
994    pub fn admin_id(&self) -> ChainId {
995        self.admin_description
996            .as_ref()
997            .expect("admin chain not initialized")
998            .id()
999    }
1000
1001    pub fn admin_description(&self) -> Option<&ChainDescription> {
1002        self.admin_description.as_ref()
1003    }
1004
1005    pub fn make_node_provider(&self) -> NodeProvider<B::Storage> {
1006        self.node_provider.clone()
1007    }
1008
1009    pub fn node(&mut self, index: usize) -> LocalValidatorClient<B::Storage> {
1010        self.node_provider.0.lock().unwrap()[index].clone()
1011    }
1012
1013    pub async fn make_storage(&mut self) -> anyhow::Result<B::Storage> {
1014        let storage = self.storage_builder.build().await?;
1015        let network_description = self.network_description.as_ref().unwrap();
1016        let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
1017        storage
1018            .write_network_description(network_description)
1019            .await
1020            .expect("writing the NetworkDescription should succeed");
1021        storage
1022            .write_blob(&committee_blob)
1023            .await
1024            .expect("writing a blob should succeed");
1025        Ok(self.genesis_storage_builder.build(storage).await)
1026    }
1027
1028    pub async fn make_client(
1029        &mut self,
1030        chain_id: ChainId,
1031        block_hash: Option<CryptoHash>,
1032        block_height: BlockHeight,
1033    ) -> anyhow::Result<ChainClient<B::Storage>> {
1034        // Note that new clients are only given the genesis store: they must figure out
1035        // the rest by asking validators.
1036        let storage = self.make_storage().await?;
1037        self.chain_client_storages.push(storage.clone());
1038        let client = Arc::new(Client::new(
1039            crate::environment::Impl {
1040                network: self.make_node_provider(),
1041                storage,
1042                signer: self.signer.clone(),
1043            },
1044            self.admin_id(),
1045            false,
1046            [chain_id],
1047            format!("Client node for {:.8}", chain_id),
1048            Duration::from_secs(30),
1049            ChainClientOptions::test_default(),
1050        ));
1051        Ok(client.create_chain_client(
1052            chain_id,
1053            block_hash,
1054            block_height,
1055            None,
1056            self.chain_owners.get(&chain_id).copied(),
1057            None,
1058        ))
1059    }
1060
1061    /// Tries to find a (confirmation) certificate for the given chain_id and block height.
1062    pub async fn check_that_validators_have_certificate(
1063        &self,
1064        chain_id: ChainId,
1065        block_height: BlockHeight,
1066        target_count: usize,
1067    ) -> Option<ConfirmedBlockCertificate> {
1068        let query = ChainInfoQuery::new(chain_id)
1069            .with_sent_certificate_hashes_by_heights(vec![block_height]);
1070        let mut count = 0;
1071        let mut certificate = None;
1072        for validator in self.node_provider.all_nodes() {
1073            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1074                if response.check(validator.public_key).is_ok() {
1075                    let ChainInfo {
1076                        mut requested_sent_certificate_hashes,
1077                        ..
1078                    } = *response.info;
1079                    debug_assert!(requested_sent_certificate_hashes.len() <= 1);
1080                    if let Some(cert_hash) = requested_sent_certificate_hashes.pop() {
1081                        if let Ok(cert) = validator.download_certificate(cert_hash).await {
1082                            if cert.inner().block().header.chain_id == chain_id
1083                                && cert.inner().block().header.height == block_height
1084                            {
1085                                cert.check(&self.initial_committee).unwrap();
1086                                count += 1;
1087                                certificate = Some(cert);
1088                            }
1089                        }
1090                    }
1091                }
1092            }
1093        }
1094        assert!(count >= target_count);
1095        certificate
1096    }
1097
1098    /// Tries to find a (confirmation) certificate for the given chain_id and block height, and are
1099    /// in the expected round.
1100    pub async fn check_that_validators_are_in_round(
1101        &self,
1102        chain_id: ChainId,
1103        block_height: BlockHeight,
1104        round: Round,
1105        target_count: usize,
1106    ) {
1107        let query = ChainInfoQuery::new(chain_id);
1108        let mut count = 0;
1109        for validator in self.node_provider.all_nodes() {
1110            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1111                if response.info.manager.current_round == round
1112                    && response.info.next_block_height == block_height
1113                    && response.check(validator.public_key).is_ok()
1114                {
1115                    count += 1;
1116                }
1117            }
1118        }
1119        assert!(count >= target_count);
1120    }
1121
1122    /// Panics if any validator has a nonempty outbox for the given chain.
1123    pub async fn check_that_validators_have_empty_outboxes(&self, chain_id: ChainId) {
1124        for validator in self.node_provider.all_nodes() {
1125            let guard = validator.client.lock().await;
1126            let chain = guard.state.chain_state_view(chain_id).await.unwrap();
1127            assert_eq!(chain.outboxes.indices().await.unwrap(), []);
1128        }
1129    }
1130}
1131
1132#[cfg(feature = "rocksdb")]
1133/// Limit concurrency for RocksDB tests to avoid "too many open files" errors.
1134static ROCKS_DB_SEMAPHORE: Semaphore = Semaphore::const_new(5);
1135
1136#[derive(Default)]
1137pub struct MemoryStorageBuilder {
1138    namespace: String,
1139    instance_counter: usize,
1140    wasm_runtime: Option<WasmRuntime>,
1141    clock: TestClock,
1142}
1143
1144#[async_trait]
1145impl StorageBuilder for MemoryStorageBuilder {
1146    type Storage = DbStorage<MemoryDatabase, TestClock>;
1147
1148    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1149        self.instance_counter += 1;
1150        let config = MemoryDatabase::new_test_config().await?;
1151        if self.namespace.is_empty() {
1152            self.namespace = generate_test_namespace();
1153        }
1154        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1155        Ok(
1156            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1157                .await?,
1158        )
1159    }
1160
1161    fn clock(&self) -> &TestClock {
1162        &self.clock
1163    }
1164}
1165
1166impl MemoryStorageBuilder {
1167    /// Creates a [`MemoryStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1168    /// applications.
1169    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1170        MemoryStorageBuilder {
1171            wasm_runtime: wasm_runtime.into(),
1172            ..MemoryStorageBuilder::default()
1173        }
1174    }
1175}
1176
1177#[cfg(feature = "rocksdb")]
1178pub struct RocksDbStorageBuilder {
1179    namespace: String,
1180    instance_counter: usize,
1181    wasm_runtime: Option<WasmRuntime>,
1182    clock: TestClock,
1183    _permit: SemaphorePermit<'static>,
1184}
1185
1186#[cfg(feature = "rocksdb")]
1187impl RocksDbStorageBuilder {
1188    pub async fn new() -> Self {
1189        RocksDbStorageBuilder {
1190            namespace: String::new(),
1191            instance_counter: 0,
1192            wasm_runtime: None,
1193            clock: TestClock::default(),
1194            _permit: ROCKS_DB_SEMAPHORE.acquire().await.unwrap(),
1195        }
1196    }
1197
1198    /// Creates a [`RocksDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1199    /// applications.
1200    #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
1201    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1202        RocksDbStorageBuilder {
1203            wasm_runtime: wasm_runtime.into(),
1204            ..RocksDbStorageBuilder::new().await
1205        }
1206    }
1207}
1208
1209#[cfg(feature = "rocksdb")]
1210#[async_trait]
1211impl StorageBuilder for RocksDbStorageBuilder {
1212    type Storage = DbStorage<RocksDbDatabase, TestClock>;
1213
1214    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1215        self.instance_counter += 1;
1216        let config = RocksDbDatabase::new_test_config().await?;
1217        if self.namespace.is_empty() {
1218            self.namespace = generate_test_namespace();
1219        }
1220        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1221        Ok(
1222            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1223                .await?,
1224        )
1225    }
1226
1227    fn clock(&self) -> &TestClock {
1228        &self.clock
1229    }
1230}
1231
1232#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1233#[derive(Default)]
1234pub struct ServiceStorageBuilder {
1235    namespace: String,
1236    instance_counter: usize,
1237    wasm_runtime: Option<WasmRuntime>,
1238    clock: TestClock,
1239}
1240
1241#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1242impl ServiceStorageBuilder {
1243    /// Creates a `ServiceStorage`.
1244    pub async fn new() -> Self {
1245        Self::with_wasm_runtime(None).await
1246    }
1247
1248    /// Creates a `ServiceStorage` with the given Wasm runtime.
1249    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1250        ServiceStorageBuilder {
1251            wasm_runtime: wasm_runtime.into(),
1252            ..ServiceStorageBuilder::default()
1253        }
1254    }
1255}
1256
1257#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1258#[async_trait]
1259impl StorageBuilder for ServiceStorageBuilder {
1260    type Storage = DbStorage<StorageServiceDatabase, TestClock>;
1261
1262    async fn build(&mut self) -> anyhow::Result<Self::Storage> {
1263        self.instance_counter += 1;
1264        let config = StorageServiceDatabase::new_test_config().await?;
1265        if self.namespace.is_empty() {
1266            self.namespace = generate_test_namespace();
1267        }
1268        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1269        Ok(
1270            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1271                .await?,
1272        )
1273    }
1274
1275    fn clock(&self) -> &TestClock {
1276        &self.clock
1277    }
1278}
1279
1280#[cfg(feature = "dynamodb")]
1281#[derive(Default)]
1282pub struct DynamoDbStorageBuilder {
1283    namespace: String,
1284    instance_counter: usize,
1285    wasm_runtime: Option<WasmRuntime>,
1286    clock: TestClock,
1287}
1288
1289#[cfg(feature = "dynamodb")]
1290impl DynamoDbStorageBuilder {
1291    /// Creates a [`DynamoDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1292    /// applications.
1293    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1294        DynamoDbStorageBuilder {
1295            wasm_runtime: wasm_runtime.into(),
1296            ..DynamoDbStorageBuilder::default()
1297        }
1298    }
1299}
1300
1301#[cfg(feature = "dynamodb")]
1302#[async_trait]
1303impl StorageBuilder for DynamoDbStorageBuilder {
1304    type Storage = DbStorage<DynamoDbDatabase, TestClock>;
1305
1306    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1307        self.instance_counter += 1;
1308        let config = DynamoDbDatabase::new_test_config().await?;
1309        if self.namespace.is_empty() {
1310            self.namespace = generate_test_namespace();
1311        }
1312        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1313        Ok(
1314            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1315                .await?,
1316        )
1317    }
1318
1319    fn clock(&self) -> &TestClock {
1320        &self.clock
1321    }
1322}
1323
1324#[cfg(feature = "scylladb")]
1325#[derive(Default)]
1326pub struct ScyllaDbStorageBuilder {
1327    namespace: String,
1328    instance_counter: usize,
1329    wasm_runtime: Option<WasmRuntime>,
1330    clock: TestClock,
1331}
1332
1333#[cfg(feature = "scylladb")]
1334impl ScyllaDbStorageBuilder {
1335    /// Creates a [`ScyllaDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1336    /// applications.
1337    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1338        ScyllaDbStorageBuilder {
1339            wasm_runtime: wasm_runtime.into(),
1340            ..ScyllaDbStorageBuilder::default()
1341        }
1342    }
1343}
1344
1345#[cfg(feature = "scylladb")]
1346#[async_trait]
1347impl StorageBuilder for ScyllaDbStorageBuilder {
1348    type Storage = DbStorage<ScyllaDbDatabase, TestClock>;
1349
1350    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1351        self.instance_counter += 1;
1352        let config = ScyllaDbDatabase::new_test_config().await?;
1353        if self.namespace.is_empty() {
1354            self.namespace = generate_test_namespace();
1355        }
1356        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1357        Ok(
1358            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1359                .await?,
1360        )
1361    }
1362
1363    fn clock(&self) -> &TestClock {
1364        &self.clock
1365    }
1366}
1367
1368pub trait ClientOutcomeResultExt<T, E> {
1369    fn unwrap_ok_committed(self) -> T;
1370}
1371
1372impl<T, E: std::fmt::Debug> ClientOutcomeResultExt<T, E> for Result<ClientOutcome<T>, E> {
1373    fn unwrap_ok_committed(self) -> T {
1374        self.unwrap().unwrap()
1375    }
1376}