linera_core/unit_tests/
test_utils.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4#![allow(clippy::cast_possible_truncation)]
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    sync::Arc,
9    time::Duration,
10    vec,
11};
12
13use async_trait::async_trait;
14use futures::{
15    future::Either,
16    lock::{Mutex, MutexGuard},
17    Future,
18};
19use linera_base::{
20    crypto::{AccountPublicKey, CryptoHash, ValidatorKeypair, ValidatorPublicKey},
21    data_types::*,
22    identifiers::{AccountOwner, BlobId, ChainId, EventId},
23    ownership::ChainOwnership,
24};
25use linera_chain::{
26    data_types::BlockProposal,
27    types::{
28        CertificateKind, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
29        LiteCertificate, Timeout, ValidatedBlock,
30    },
31};
32use linera_execution::{committee::Committee, ResourceControlPolicy, WasmRuntime};
33use linera_storage::{DbStorage, ResultReadCertificates, Storage, TestClock};
34#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
35use linera_storage_service::client::StorageServiceDatabase;
36use linera_version::VersionInfo;
37#[cfg(feature = "dynamodb")]
38use linera_views::dynamo_db::DynamoDbDatabase;
39#[cfg(feature = "scylladb")]
40use linera_views::scylla_db::ScyllaDbDatabase;
41use linera_views::{
42    memory::MemoryDatabase, random::generate_test_namespace, store::TestKeyValueDatabase as _,
43};
44use tokio::sync::oneshot;
45use tokio_stream::wrappers::UnboundedReceiverStream;
46#[cfg(feature = "rocksdb")]
47use {
48    linera_views::rocks_db::RocksDbDatabase,
49    tokio::sync::{Semaphore, SemaphorePermit},
50};
51
52use crate::{
53    chain_worker::ChainWorkerConfig,
54    client::{chain_client, Client},
55    data_types::*,
56    environment::{TestSigner, TestWallet},
57    node::{
58        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
59        ValidatorNodeProvider,
60    },
61    notifier::ChannelNotifier,
62    worker::{
63        Notification, ProcessableCertificate, WorkerState, DEFAULT_BLOCK_CACHE_SIZE,
64        DEFAULT_EXECUTION_STATE_CACHE_SIZE,
65    },
66};
67
68#[derive(Debug, PartialEq, Clone, Copy)]
69pub enum FaultType {
70    Honest,
71    Offline,
72    OfflineWithInfo,
73    NoChains,
74    DontSendConfirmVote,
75    DontProcessValidated,
76    DontSendValidateVote,
77}
78
79/// A validator used for testing. "Faulty" validators ignore block proposals (but not
80/// certificates or info queries) and have the wrong initial balance for all chains.
81///
82/// All methods are executed in spawned Tokio tasks, so that canceling a client task doesn't cause
83/// the validator's tasks to be canceled: In a real network, a validator also wouldn't cancel
84/// tasks if the client stopped waiting for the response.
85struct LocalValidator<S>
86where
87    S: Storage,
88{
89    state: WorkerState<S>,
90    notifier: Arc<ChannelNotifier<Notification>>,
91}
92
93#[derive(Clone)]
94pub struct LocalValidatorClient<S>
95where
96    S: Storage,
97{
98    public_key: ValidatorPublicKey,
99    client: Arc<Mutex<LocalValidator<S>>>,
100    fault_type: FaultType,
101}
102
103impl<S> ValidatorNode for LocalValidatorClient<S>
104where
105    S: Storage + Clone + Send + Sync + 'static,
106{
107    type NotificationStream = NotificationStream;
108
109    fn address(&self) -> String {
110        format!("local:{}", self.public_key)
111    }
112
113    async fn handle_block_proposal(
114        &self,
115        proposal: BlockProposal,
116    ) -> Result<ChainInfoResponse, NodeError> {
117        self.spawn_and_receive(move |validator, sender| {
118            validator.do_handle_block_proposal(proposal, sender)
119        })
120        .await
121    }
122
123    async fn handle_lite_certificate(
124        &self,
125        certificate: LiteCertificate<'_>,
126        _delivery: CrossChainMessageDelivery,
127    ) -> Result<ChainInfoResponse, NodeError> {
128        let certificate = certificate.cloned();
129        self.spawn_and_receive(move |validator, sender| {
130            validator.do_handle_lite_certificate(certificate, sender)
131        })
132        .await
133    }
134
135    async fn handle_timeout_certificate(
136        &self,
137        certificate: GenericCertificate<Timeout>,
138    ) -> Result<ChainInfoResponse, NodeError> {
139        self.spawn_and_receive(move |validator, sender| {
140            validator.do_handle_certificate(certificate, sender)
141        })
142        .await
143    }
144
145    async fn handle_validated_certificate(
146        &self,
147        certificate: GenericCertificate<ValidatedBlock>,
148    ) -> Result<ChainInfoResponse, NodeError> {
149        self.spawn_and_receive(move |validator, sender| {
150            validator.do_handle_certificate(certificate, sender)
151        })
152        .await
153    }
154
155    async fn handle_confirmed_certificate(
156        &self,
157        certificate: Arc<GenericCertificate<ConfirmedBlock>>,
158        _delivery: CrossChainMessageDelivery,
159    ) -> Result<ChainInfoResponse, NodeError> {
160        self.spawn_and_receive(move |validator, sender| {
161            validator.do_handle_certificate(Arc::unwrap_or_clone(certificate), sender)
162        })
163        .await
164    }
165
166    async fn handle_chain_info_query(
167        &self,
168        query: ChainInfoQuery,
169    ) -> Result<ChainInfoResponse, NodeError> {
170        self.spawn_and_receive(move |validator, sender| {
171            validator.do_handle_chain_info_query(query, sender)
172        })
173        .await
174    }
175
176    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
177        self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
178            .await
179    }
180
181    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
182        Ok(Default::default())
183    }
184
185    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
186        Ok(self
187            .client
188            .lock()
189            .await
190            .state
191            .storage_client()
192            .read_network_description()
193            .await
194            .transpose()
195            .ok_or_else(|| NodeError::ViewError {
196                error: "missing NetworkDescription".to_owned(),
197            })??)
198    }
199
200    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
201        self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
202            .await
203    }
204
205    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
206        self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
207            .await
208    }
209
210    async fn download_blobs(
211        &self,
212        blob_ids: Vec<BlobId>,
213    ) -> Result<crate::node::BlobStream, NodeError> {
214        let this = self.clone();
215        let stream = futures::stream::unfold(blob_ids.into_iter(), move |mut iter| {
216            let this = this.clone();
217            async move {
218                let blob_id = iter.next()?;
219                let result = this.download_blob(blob_id).await;
220                Some((result, iter))
221            }
222        });
223        Ok(Box::pin(stream))
224    }
225
226    async fn download_pending_blob(
227        &self,
228        chain_id: ChainId,
229        blob_id: BlobId,
230    ) -> Result<BlobContent, NodeError> {
231        self.spawn_and_receive(move |validator, sender| {
232            validator.do_download_pending_blob(chain_id, blob_id, sender)
233        })
234        .await
235    }
236
237    async fn handle_pending_blob(
238        &self,
239        chain_id: ChainId,
240        blob: BlobContent,
241    ) -> Result<ChainInfoResponse, NodeError> {
242        self.spawn_and_receive(move |validator, sender| {
243            validator.do_handle_pending_blob(chain_id, blob, sender)
244        })
245        .await
246    }
247
248    async fn download_certificate(
249        &self,
250        hash: CryptoHash,
251    ) -> Result<ConfirmedBlockCertificate, NodeError> {
252        self.spawn_and_receive(move |validator, sender| {
253            validator.do_download_certificate(hash, sender)
254        })
255        .await
256    }
257
258    async fn download_certificates(
259        &self,
260        hashes: Vec<CryptoHash>,
261    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
262        self.spawn_and_receive(move |validator, sender| {
263            validator.do_download_certificates(hashes, sender)
264        })
265        .await
266    }
267
268    async fn download_certificates_by_heights(
269        &self,
270        chain_id: ChainId,
271        heights: Vec<BlockHeight>,
272    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
273        self.spawn_and_receive(move |validator, sender| {
274            validator.do_download_certificates_by_heights(chain_id, heights, sender)
275        })
276        .await
277    }
278
279    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
280        self.spawn_and_receive(move |validator, sender| {
281            validator.do_blob_last_used_by(blob_id, sender)
282        })
283        .await
284    }
285
286    async fn blob_last_used_by_certificate(
287        &self,
288        blob_id: BlobId,
289    ) -> Result<ConfirmedBlockCertificate, NodeError> {
290        self.spawn_and_receive(move |validator, sender| {
291            validator.do_blob_last_used_by_certificate(blob_id, sender)
292        })
293        .await
294    }
295
296    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
297        self.spawn_and_receive(move |validator, sender| {
298            validator.do_missing_blob_ids(blob_ids, sender)
299        })
300        .await
301    }
302
303    async fn event_block_heights(
304        &self,
305        event_ids: Vec<EventId>,
306    ) -> Result<Vec<Option<BlockHeight>>, NodeError> {
307        self.spawn_and_receive(move |validator, sender| {
308            validator.do_event_block_heights(event_ids, sender)
309        })
310        .await
311    }
312
313    async fn get_shard_info(
314        &self,
315        _chain_id: ChainId,
316    ) -> Result<crate::data_types::ShardInfo, NodeError> {
317        // For test purposes, return a dummy shard info
318        Ok(crate::data_types::ShardInfo {
319            shard_id: 0,
320            total_shards: 1,
321        })
322    }
323}
324
325impl<S> LocalValidatorClient<S>
326where
327    S: Storage + Clone + Send + Sync + 'static,
328{
329    fn new(public_key: ValidatorPublicKey, state: WorkerState<S>) -> Self {
330        let client = LocalValidator {
331            state,
332            notifier: Arc::new(ChannelNotifier::default()),
333        };
334        Self {
335            public_key,
336            client: Arc::new(Mutex::new(client)),
337            fault_type: FaultType::Honest,
338        }
339    }
340
341    pub fn name(&self) -> ValidatorPublicKey {
342        self.public_key
343    }
344
345    pub fn fault_type(&self) -> FaultType {
346        self.fault_type
347    }
348
349    fn set_fault_type(&mut self, fault_type: FaultType) {
350        self.fault_type = fault_type;
351    }
352
353    /// Obtains the basic `ChainInfo` data for the local validator chain, with chain manager values.
354    pub async fn chain_info_with_manager_values(
355        &mut self,
356        chain_id: ChainId,
357    ) -> Result<Box<ChainInfo>, NodeError> {
358        let query = ChainInfoQuery::new(chain_id).with_manager_values();
359        let response = self.handle_chain_info_query(query).await?;
360        Ok(response.info)
361    }
362
363    /// Executes the future produced by `f` in a new thread in a new Tokio runtime.
364    /// Returns the value that the future puts into the sender.
365    async fn spawn_and_receive<F, R, T>(&self, f: F) -> T
366    where
367        T: Send + 'static,
368        R: Future<Output = Result<(), T>> + Send,
369        F: FnOnce(Self, oneshot::Sender<T>) -> R + Send + 'static,
370    {
371        let validator = self.clone();
372        let (sender, receiver) = oneshot::channel();
373        tokio::spawn(async move {
374            if f(validator, sender).await.is_err() {
375                tracing::debug!("result could not be sent");
376            }
377        });
378        receiver.await.unwrap()
379    }
380
381    async fn do_handle_block_proposal(
382        self,
383        proposal: BlockProposal,
384        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
385    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
386        let result = match self.fault_type {
387            FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
388                error: "offline".to_string(),
389            }),
390            FaultType::NoChains => Err(NodeError::InactiveChain(proposal.content.block.chain_id)),
391            FaultType::DontSendValidateVote
392            | FaultType::Honest
393            | FaultType::DontSendConfirmVote
394            | FaultType::DontProcessValidated => {
395                let result = self
396                    .client
397                    .lock()
398                    .await
399                    .state
400                    .handle_block_proposal(proposal)
401                    .await
402                    .map_err(Into::into);
403                if self.fault_type == FaultType::DontSendValidateVote {
404                    Err(NodeError::ClientIoError {
405                        error: "refusing to validate".to_string(),
406                    })
407                } else {
408                    result
409                }
410            }
411        };
412        // In a local node cross-chain messages can't get lost, so we can ignore the actions here.
413        sender.send(result.map(|(info, _actions)| info))
414    }
415
416    async fn do_handle_lite_certificate(
417        self,
418        certificate: LiteCertificate<'_>,
419        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
420    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
421        let client = self.client.clone();
422        let validator = client.lock().await;
423        let result = async move {
424            match validator.state.full_certificate(certificate).await? {
425                Either::Left(confirmed) => {
426                    self.do_handle_certificate_internal(confirmed, &validator)
427                        .await
428                }
429                Either::Right(validated) => {
430                    self.do_handle_certificate_internal(validated, &validator)
431                        .await
432                }
433            }
434        }
435        .await;
436        sender.send(result)
437    }
438
439    async fn do_handle_certificate_internal<T: ProcessableCertificate>(
440        &self,
441        certificate: GenericCertificate<T>,
442        validator: &MutexGuard<'_, LocalValidator<S>>,
443    ) -> Result<ChainInfoResponse, NodeError> {
444        match self.fault_type {
445            FaultType::DontProcessValidated if T::KIND == CertificateKind::Validated => {
446                Err(NodeError::ClientIoError {
447                    error: "refusing to process validated block".to_string(),
448                })
449            }
450            FaultType::NoChains => Err(NodeError::InactiveChain(certificate.value().chain_id())),
451            FaultType::Honest
452            | FaultType::DontSendConfirmVote
453            | FaultType::DontProcessValidated
454            | FaultType::DontSendValidateVote => {
455                let result = validator
456                    .state
457                    .fully_handle_certificate_with_notifications(certificate, &validator.notifier)
458                    .await
459                    .map_err(Into::into);
460                if T::KIND == CertificateKind::Validated
461                    && self.fault_type == FaultType::DontSendConfirmVote
462                {
463                    Err(NodeError::ClientIoError {
464                        error: "refusing to confirm".to_string(),
465                    })
466                } else {
467                    result
468                }
469            }
470            FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
471                error: "offline".to_string(),
472            }),
473        }
474    }
475
476    async fn do_handle_certificate<T: ProcessableCertificate>(
477        self,
478        certificate: GenericCertificate<T>,
479        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
480    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
481        let validator = self.client.lock().await;
482        let result = self
483            .do_handle_certificate_internal(certificate, &validator)
484            .await;
485        sender.send(result)
486    }
487
488    async fn do_handle_chain_info_query(
489        self,
490        query: ChainInfoQuery,
491        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
492    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
493        let validator = self.client.lock().await;
494        let result = match self.fault_type {
495            FaultType::Offline => Err(NodeError::ClientIoError {
496                error: "offline".to_string(),
497            }),
498            FaultType::NoChains => Err(NodeError::InactiveChain(query.chain_id)),
499            FaultType::Honest
500            | FaultType::DontSendConfirmVote
501            | FaultType::DontProcessValidated
502            | FaultType::DontSendValidateVote
503            | FaultType::OfflineWithInfo => validator
504                .state
505                .handle_chain_info_query(query)
506                .await
507                .map_err(Into::into),
508        };
509        sender.send(result)
510    }
511
512    async fn do_subscribe(
513        self,
514        chains: Vec<ChainId>,
515        sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
516    ) -> Result<(), Result<NotificationStream, NodeError>> {
517        let validator = self.client.lock().await;
518        let rx = validator.notifier.subscribe(chains);
519        let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
520        sender.send(Ok(stream))
521    }
522
523    async fn do_upload_blob(
524        self,
525        content: BlobContent,
526        sender: oneshot::Sender<Result<BlobId, NodeError>>,
527    ) -> Result<(), Result<BlobId, NodeError>> {
528        let validator = self.client.lock().await;
529        let blob = Blob::new(content);
530        let id = blob.id();
531        let storage = validator.state.storage_client();
532        let result = match storage.maybe_write_blobs(&[blob]).await {
533            Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
534            Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
535            Err(error) => Err(error.into()),
536        };
537        sender.send(result)
538    }
539
540    async fn do_download_blob(
541        self,
542        blob_id: BlobId,
543        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
544    ) -> Result<(), Result<BlobContent, NodeError>> {
545        let validator = self.client.lock().await;
546        let blob = validator
547            .state
548            .storage_client()
549            .read_blob(blob_id)
550            .await
551            .map_err(Into::into);
552        let blob = match blob {
553            Ok(blob) => blob.ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
554            Err(error) => Err(error),
555        };
556        sender.send(blob.map(|blob| Arc::unwrap_or_clone(blob).into_content()))
557    }
558
559    async fn do_download_pending_blob(
560        self,
561        chain_id: ChainId,
562        blob_id: BlobId,
563        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
564    ) -> Result<(), Result<BlobContent, NodeError>> {
565        let validator = self.client.lock().await;
566        let result = validator
567            .state
568            .download_pending_blob(chain_id, blob_id)
569            .await
570            .map_err(Into::into);
571        sender.send(result.map(|blob| blob.content().clone()))
572    }
573
574    async fn do_handle_pending_blob(
575        self,
576        chain_id: ChainId,
577        blob: BlobContent,
578        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
579    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
580        let validator = self.client.lock().await;
581        let result = validator
582            .state
583            .handle_pending_blob(chain_id, Blob::new(blob))
584            .await
585            .map_err(Into::into);
586        sender.send(result)
587    }
588
589    async fn do_download_certificate(
590        self,
591        hash: CryptoHash,
592        sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
593    ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
594        let validator = self.client.lock().await;
595        let certificate = validator
596            .state
597            .storage_client()
598            .read_certificate(hash)
599            .await
600            .map_err(Into::into);
601
602        let certificate = match certificate {
603            Err(error) => Err(error),
604            Ok(entry) => match entry {
605                Some(certificate) => Ok(Arc::unwrap_or_clone(certificate)),
606                None => {
607                    panic!("Missing certificate: {hash}");
608                }
609            },
610        };
611
612        sender.send(certificate)
613    }
614
615    async fn do_download_certificates(
616        self,
617        hashes: Vec<CryptoHash>,
618        sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
619    ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
620        let validator = self.client.lock().await;
621        let certificates = validator
622            .state
623            .storage_client()
624            .read_certificates(&hashes)
625            .await
626            .map_err(Into::into);
627
628        let certificates = match certificates {
629            Err(error) => Err(error),
630            Ok(certificates) => match ResultReadCertificates::new(certificates, hashes) {
631                ResultReadCertificates::Certificates(certificates) => Ok(certificates),
632                ResultReadCertificates::InvalidHashes(hashes) => {
633                    panic!("Missing certificates: {hashes:?}")
634                }
635            },
636        };
637
638        sender.send(certificates)
639    }
640
641    async fn do_download_certificates_by_heights(
642        self,
643        chain_id: ChainId,
644        heights: Vec<BlockHeight>,
645        sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
646    ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
647        // First, use do_handle_chain_info_query to get the certificate hashes
648        let (query_sender, query_receiver) = oneshot::channel();
649        let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_by_heights(heights);
650
651        let self_clone = self.clone();
652        self.do_handle_chain_info_query(query, query_sender)
653            .await
654            .expect("Failed to handle chain info query");
655
656        // Get the response from the chain info query
657        let chain_info_response = query_receiver.await.map_err(|_| {
658            Err(NodeError::ClientIoError {
659                error: "Failed to receive chain info response".to_string(),
660            })
661        })?;
662
663        let hashes = match chain_info_response {
664            Ok(response) => response.info.requested_sent_certificate_hashes,
665            Err(e) => {
666                return sender.send(Err(e));
667            }
668        };
669
670        // Now use do_download_certificates to get the actual certificates
671        let (cert_sender, cert_receiver) = oneshot::channel();
672        self_clone
673            .do_download_certificates(hashes, cert_sender)
674            .await?;
675
676        // Forward the result to the original sender
677        let result = cert_receiver.await.map_err(|_| {
678            Err(NodeError::ClientIoError {
679                error: "Failed to receive certificates".to_string(),
680            })
681        })?;
682
683        sender.send(result)
684    }
685
686    async fn do_blob_last_used_by(
687        self,
688        blob_id: BlobId,
689        sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
690    ) -> Result<(), Result<CryptoHash, NodeError>> {
691        let validator = self.client.lock().await;
692        let blob_state = validator
693            .state
694            .storage_client()
695            .read_blob_state(blob_id)
696            .await
697            .map_err(Into::into);
698        let certificate_hash = match blob_state {
699            Err(err) => Err(err),
700            Ok(blob_state) => match blob_state {
701                None => Err(NodeError::BlobsNotFound(vec![blob_id])),
702                Some(blob_state) => blob_state
703                    .last_used_by
704                    .ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
705            },
706        };
707
708        sender.send(certificate_hash)
709    }
710
711    async fn do_blob_last_used_by_certificate(
712        self,
713        blob_id: BlobId,
714        sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
715    ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
716        match self.blob_last_used_by(blob_id).await {
717            Ok(cert_hash) => {
718                let cert = self.download_certificate(cert_hash).await;
719                sender.send(cert)
720            }
721            Err(err) => sender.send(Err(err)),
722        }
723    }
724
725    async fn do_missing_blob_ids(
726        self,
727        blob_ids: Vec<BlobId>,
728        sender: oneshot::Sender<Result<Vec<BlobId>, NodeError>>,
729    ) -> Result<(), Result<Vec<BlobId>, NodeError>> {
730        let validator = self.client.lock().await;
731        let missing_blob_ids = validator
732            .state
733            .storage_client()
734            .missing_blobs(&blob_ids)
735            .await
736            .map_err(Into::into);
737        sender.send(missing_blob_ids)
738    }
739
740    async fn do_event_block_heights(
741        self,
742        event_ids: Vec<EventId>,
743        sender: oneshot::Sender<Result<Vec<Option<BlockHeight>>, NodeError>>,
744    ) -> Result<(), Result<Vec<Option<BlockHeight>>, NodeError>> {
745        let validator = self.client.lock().await;
746        let heights = validator
747            .state
748            .storage_client()
749            .read_event_block_heights(&event_ids)
750            .await
751            .map_err(Into::into);
752        sender.send(heights)
753    }
754}
755
756#[derive(Clone)]
757pub struct NodeProvider<S>(Arc<std::sync::Mutex<Vec<LocalValidatorClient<S>>>>)
758where
759    S: Storage;
760
761impl<S> NodeProvider<S>
762where
763    S: Storage + Clone,
764{
765    fn all_nodes(&self) -> Vec<LocalValidatorClient<S>> {
766        self.0.lock().unwrap().clone()
767    }
768}
769
770impl<S> ValidatorNodeProvider for NodeProvider<S>
771where
772    S: Storage + Clone + Send + Sync + 'static,
773{
774    type Node = LocalValidatorClient<S>;
775
776    fn make_node(&self, _name: &str) -> Result<Self::Node, NodeError> {
777        unimplemented!()
778    }
779
780    fn make_nodes_from_list<A>(
781        &self,
782        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
783    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
784    where
785        A: AsRef<str>,
786    {
787        let list = self.0.lock().unwrap();
788        Ok(validators
789            .into_iter()
790            .map(|(public_key, address)| {
791                list.iter()
792                    .find(|client| client.public_key == public_key)
793                    .ok_or_else(|| NodeError::CannotResolveValidatorAddress {
794                        address: address.as_ref().to_string(),
795                    })
796                    .map(|client| (public_key, client.clone()))
797            })
798            .collect::<Result<Vec<_>, _>>()?
799            .into_iter())
800    }
801}
802
803impl<S> FromIterator<LocalValidatorClient<S>> for NodeProvider<S>
804where
805    S: Storage,
806{
807    fn from_iter<T>(iter: T) -> Self
808    where
809        T: IntoIterator<Item = LocalValidatorClient<S>>,
810    {
811        Self(Arc::new(std::sync::Mutex::new(iter.into_iter().collect())))
812    }
813}
814
815// NOTE:
816// * To communicate with a quorum of validators, chain clients iterate over a copy of
817// `validator_clients` to spawn I/O tasks.
818// * When using `LocalValidatorClient`, clients communicate with an exact quorum then stop.
819// * Most tests have 1 faulty validator out 4 so that there is exactly only 1 quorum to
820// communicate with.
821pub struct TestBuilder<B: StorageBuilder> {
822    storage_builder: B,
823    pub initial_committee: Committee,
824    admin_description: Option<ChainDescription>,
825    network_description: Option<NetworkDescription>,
826    genesis_storage_builder: GenesisStorageBuilder,
827    node_provider: NodeProvider<B::Storage>,
828    validator_storages: HashMap<ValidatorPublicKey, B::Storage>,
829    chain_client_storages: Vec<B::Storage>,
830    pub chain_owners: BTreeMap<ChainId, AccountOwner>,
831    pub signer: TestSigner,
832}
833
834#[async_trait]
835pub trait StorageBuilder {
836    type Storage: Storage + Clone + Send + Sync + 'static;
837
838    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error>;
839
840    fn clock(&self) -> &TestClock;
841}
842
843#[derive(Default)]
844struct GenesisStorageBuilder {
845    accounts: Vec<GenesisAccount>,
846}
847
848struct GenesisAccount {
849    description: ChainDescription,
850    public_key: AccountPublicKey,
851}
852
853impl GenesisStorageBuilder {
854    fn add(&mut self, description: ChainDescription, public_key: AccountPublicKey) {
855        self.accounts.push(GenesisAccount {
856            description,
857            public_key,
858        })
859    }
860
861    async fn build<S>(&self, storage: S) -> S
862    where
863        S: Storage + Clone + Send + Sync + 'static,
864    {
865        for account in &self.accounts {
866            storage
867                .create_chain(account.description.clone())
868                .await
869                .unwrap();
870        }
871        storage
872    }
873}
874
875pub type ChainClient<S> = crate::client::ChainClient<crate::environment::Impl<S, NodeProvider<S>>>;
876
877impl<S: Storage + Clone + Send + Sync + 'static> ChainClient<S> {
878    /// Reads the hashed certificate values in descending order from the given hash.
879    pub async fn read_confirmed_blocks_downward(
880        &self,
881        from: CryptoHash,
882        limit: u32,
883    ) -> anyhow::Result<Vec<Arc<ConfirmedBlock>>> {
884        let mut hash = Some(from);
885        let mut values = Vec::new();
886        for _ in 0..limit {
887            let Some(next_hash) = hash else {
888                break;
889            };
890            let value = self.read_confirmed_block(next_hash).await?;
891            hash = value.block().header.previous_block_hash;
892            values.push(value);
893        }
894        Ok(values)
895    }
896}
897
898impl<B> TestBuilder<B>
899where
900    B: StorageBuilder,
901{
902    pub async fn new(
903        mut storage_builder: B,
904        count: usize,
905        with_faulty_validators: usize,
906        mut signer: TestSigner,
907    ) -> Result<Self, anyhow::Error> {
908        let mut validators = Vec::new();
909        for _ in 0..count {
910            let validator_keypair = ValidatorKeypair::generate();
911            let account_public_key = signer.generate_new();
912            validators.push((validator_keypair, account_public_key));
913        }
914        let for_committee = validators
915            .iter()
916            .map(|(validating, account)| (validating.public_key, *account))
917            .collect::<Vec<_>>();
918        let initial_committee = Committee::make_simple(for_committee);
919        let mut validator_clients = Vec::new();
920        let mut validator_storages = HashMap::new();
921        let mut faulty_validators = HashSet::new();
922        for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() {
923            let validator_public_key = validator_keypair.public_key;
924            let storage = storage_builder.build().await?;
925            let config = ChainWorkerConfig {
926                nickname: format!("Node {i}"),
927                ..ChainWorkerConfig::default()
928            }
929            .with_key_pair(Some(validator_keypair.secret_key));
930            let state = WorkerState::new(storage.clone(), config, None);
931            let mut validator = LocalValidatorClient::new(validator_public_key, state);
932            if i < with_faulty_validators {
933                faulty_validators.insert(validator_public_key);
934                validator.set_fault_type(FaultType::NoChains);
935            }
936            validator_clients.push(validator);
937            validator_storages.insert(validator_public_key, storage);
938        }
939        tracing::info!(
940            "Test will use the following faulty validators: {:?}",
941            faulty_validators
942        );
943        Ok(Self {
944            storage_builder,
945            initial_committee,
946            admin_description: None,
947            network_description: None,
948            genesis_storage_builder: GenesisStorageBuilder::default(),
949            node_provider: NodeProvider::from_iter(validator_clients),
950            validator_storages,
951            chain_client_storages: Vec::new(),
952            chain_owners: BTreeMap::new(),
953            signer,
954        })
955    }
956
957    pub fn with_policy(mut self, policy: ResourceControlPolicy) -> Self {
958        let validators = self.initial_committee.validators().clone();
959        self.initial_committee =
960            Committee::new(validators, policy).expect("committee votes should not overflow");
961        self
962    }
963
964    pub fn with_cross_chain_message_chunk_limit(self, limit: usize) -> Self {
965        let validator_clients = self.node_provider.0.lock().unwrap();
966        for validator in validator_clients.iter() {
967            let mut inner = validator.client.try_lock().expect("no contention at setup");
968            inner.state.set_cross_chain_message_chunk_limit(limit);
969        }
970        drop(validator_clients);
971        self
972    }
973
974    /// Returns the [`FaultType`] currently configured for the given validator, or `None`
975    /// if no validator with that key is in the test setup.
976    pub fn fault_type(&self, public_key: &ValidatorPublicKey) -> Option<FaultType> {
977        self.node_provider
978            .0
979            .lock()
980            .unwrap()
981            .iter()
982            .find(|client| client.public_key == *public_key)
983            .map(|client| client.fault_type())
984    }
985
986    pub fn set_fault_type(&mut self, indexes: impl AsRef<[usize]>, fault_type: FaultType) {
987        let mut faulty_validators = vec![];
988        let mut validator_clients = self.node_provider.0.lock().unwrap();
989        for index in indexes.as_ref() {
990            let validator = &mut validator_clients[*index];
991            validator.set_fault_type(fault_type);
992            faulty_validators.push(validator.public_key);
993        }
994        tracing::info!(
995            "Making the following validators {:?}: {:?}",
996            fault_type,
997            faulty_validators
998        );
999    }
1000
1001    /// Creates the root chain with the given `index`, and returns a client for it.
1002    ///
1003    /// Root chain 0 is the admin chain and needs to be initialized first, otherwise its balance
1004    /// is automatically set to zero.
1005    pub async fn add_root_chain(
1006        &mut self,
1007        index: u32,
1008        balance: Amount,
1009    ) -> anyhow::Result<ChainClient<B::Storage>> {
1010        // Make sure the admin chain is initialized.
1011        if self.admin_description.is_none() && index != 0 {
1012            Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
1013        }
1014        let origin = ChainOrigin::Root(index);
1015        let public_key = self.signer.generate_new();
1016        let open_chain_config = InitialChainConfig {
1017            ownership: ChainOwnership::single(public_key.into()),
1018            epoch: Epoch(0),
1019            balance,
1020            application_permissions: ApplicationPermissions::default(),
1021        };
1022        let description = ChainDescription::new(origin, open_chain_config, Timestamp::from(0));
1023        let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
1024        if index == 0 {
1025            self.admin_description = Some(description.clone());
1026            self.network_description = Some(NetworkDescription {
1027                admin_chain_id: description.id(),
1028                // dummy values to fill the description
1029                genesis_config_hash: CryptoHash::test_hash("genesis config"),
1030                genesis_timestamp: Timestamp::from(0),
1031                genesis_committee_blob_hash: committee_blob.id().hash,
1032                name: "test network".to_string(),
1033            });
1034        }
1035        // Remember what's in the genesis store for future clients to join.
1036        self.genesis_storage_builder
1037            .add(description.clone(), public_key);
1038
1039        let network_description = self.network_description.as_ref().unwrap();
1040
1041        for validator in self.node_provider.all_nodes() {
1042            let storage = self
1043                .validator_storages
1044                .get_mut(&validator.public_key)
1045                .unwrap();
1046            storage
1047                .write_network_description(network_description)
1048                .await
1049                .expect("writing the NetworkDescription should succeed");
1050            storage
1051                .write_blob(&committee_blob)
1052                .await
1053                .expect("writing a blob should succeed");
1054            storage.create_chain(description.clone()).await.unwrap();
1055        }
1056        for storage in &mut self.chain_client_storages {
1057            storage.create_chain(description.clone()).await.unwrap();
1058        }
1059        let chain_id = description.id();
1060        self.chain_owners.insert(chain_id, public_key.into());
1061        self.make_client(chain_id, None, BlockHeight::ZERO).await
1062    }
1063
1064    pub fn genesis_chains(&self) -> Vec<(AccountPublicKey, Amount)> {
1065        let mut result = Vec::new();
1066        for (i, genesis_account) in self.genesis_storage_builder.accounts.iter().enumerate() {
1067            assert_eq!(
1068                genesis_account.description.origin(),
1069                ChainOrigin::Root(i as u32)
1070            );
1071            result.push((
1072                genesis_account.public_key,
1073                genesis_account.description.config().balance,
1074            ));
1075        }
1076        result
1077    }
1078
1079    pub fn admin_chain_id(&self) -> ChainId {
1080        self.admin_description
1081            .as_ref()
1082            .expect("admin chain not initialized")
1083            .id()
1084    }
1085
1086    pub fn admin_description(&self) -> Option<&ChainDescription> {
1087        self.admin_description.as_ref()
1088    }
1089
1090    pub fn make_node_provider(&self) -> NodeProvider<B::Storage> {
1091        self.node_provider.clone()
1092    }
1093
1094    pub fn node(&mut self, index: usize) -> LocalValidatorClient<B::Storage> {
1095        self.node_provider.0.lock().unwrap()[index].clone()
1096    }
1097
1098    pub async fn make_storage(&mut self) -> anyhow::Result<B::Storage> {
1099        let storage = self.storage_builder.build().await?;
1100        let network_description = self.network_description.as_ref().unwrap();
1101        let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
1102        storage
1103            .write_network_description(network_description)
1104            .await
1105            .expect("writing the NetworkDescription should succeed");
1106        storage
1107            .write_blob(&committee_blob)
1108            .await
1109            .expect("writing a blob should succeed");
1110        Ok(self.genesis_storage_builder.build(storage).await)
1111    }
1112
1113    pub async fn make_client_with_options(
1114        &mut self,
1115        chain_id: ChainId,
1116        block_hash: Option<CryptoHash>,
1117        block_height: BlockHeight,
1118        options: chain_client::Options,
1119        follow_only: bool,
1120    ) -> anyhow::Result<ChainClient<B::Storage>> {
1121        // Note that new clients are only given the genesis store: they must figure out
1122        // the rest by asking validators.
1123        let storage = self.make_storage().await?;
1124        self.chain_client_storages.push(storage.clone());
1125        let mode = if follow_only {
1126            crate::client::ListeningMode::FollowChain
1127        } else {
1128            crate::client::ListeningMode::FullChain
1129        };
1130        let client = Arc::new(Client::new(
1131            crate::environment::Impl {
1132                network: self.make_node_provider(),
1133                storage,
1134                signer: self.signer.clone(),
1135                wallet: TestWallet::default(),
1136            },
1137            self.admin_chain_id(),
1138            false,
1139            [(chain_id, mode)],
1140            format!("Client node for {chain_id:.8}"),
1141            Some(Duration::from_secs(30)),
1142            Some(Duration::from_secs(1)),
1143            1000,
1144            options,
1145            DEFAULT_BLOCK_CACHE_SIZE,
1146            DEFAULT_EXECUTION_STATE_CACHE_SIZE,
1147            &crate::client::RequestsSchedulerConfig::default(),
1148        ));
1149        Ok(client.create_chain_client(
1150            chain_id,
1151            block_hash,
1152            block_height,
1153            &None,
1154            self.chain_owners.get(&chain_id).copied(),
1155            None,
1156            follow_only,
1157        ))
1158    }
1159
1160    pub async fn make_client(
1161        &mut self,
1162        chain_id: ChainId,
1163        block_hash: Option<CryptoHash>,
1164        block_height: BlockHeight,
1165    ) -> anyhow::Result<ChainClient<B::Storage>> {
1166        self.make_client_with_options(
1167            chain_id,
1168            block_hash,
1169            block_height,
1170            chain_client::Options::test_default(),
1171            false,
1172        )
1173        .await
1174    }
1175
1176    /// Tries to find a (confirmation) certificate for the given chain_id and block height.
1177    pub async fn check_that_validators_have_certificate(
1178        &self,
1179        chain_id: ChainId,
1180        block_height: BlockHeight,
1181        target_count: usize,
1182    ) -> Option<ConfirmedBlockCertificate> {
1183        let query = ChainInfoQuery::new(chain_id)
1184            .with_sent_certificate_hashes_by_heights(vec![block_height]);
1185        let mut count = 0;
1186        let mut certificate = None;
1187        for validator in self.node_provider.all_nodes() {
1188            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1189                if response.check(validator.public_key).is_ok() {
1190                    let ChainInfo {
1191                        mut requested_sent_certificate_hashes,
1192                        ..
1193                    } = *response.info;
1194                    debug_assert!(requested_sent_certificate_hashes.len() <= 1);
1195                    if let Some(cert_hash) = requested_sent_certificate_hashes.pop() {
1196                        if let Ok(cert) = validator.download_certificate(cert_hash).await {
1197                            if cert.inner().block().header.chain_id == chain_id
1198                                && cert.inner().block().header.height == block_height
1199                            {
1200                                cert.check(&self.initial_committee).unwrap();
1201                                count += 1;
1202                                certificate = Some(cert);
1203                            }
1204                        }
1205                    }
1206                }
1207            }
1208        }
1209        assert!(count >= target_count);
1210        certificate
1211    }
1212
1213    /// Tries to find a (confirmation) certificate for the given chain_id and block height, and are
1214    /// in the expected round.
1215    pub async fn check_that_validators_are_in_round(
1216        &self,
1217        chain_id: ChainId,
1218        block_height: BlockHeight,
1219        round: Round,
1220        target_count: usize,
1221    ) {
1222        let query = ChainInfoQuery::new(chain_id);
1223        let mut count = 0;
1224        for validator in self.node_provider.all_nodes() {
1225            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1226                if response.info.manager.current_round == round
1227                    && response.info.next_block_height == block_height
1228                    && response.check(validator.public_key).is_ok()
1229                {
1230                    count += 1;
1231                }
1232            }
1233        }
1234        assert!(count >= target_count);
1235    }
1236
1237    /// Panics if any validator has a nonempty outbox for the given chain.
1238    pub async fn check_that_validators_have_empty_outboxes(&self, chain_id: ChainId) {
1239        for validator in self.node_provider.all_nodes() {
1240            let guard = validator.client.lock().await;
1241            let chain = guard.state.chain_state_view(chain_id).await.unwrap();
1242            assert_eq!(chain.outboxes.indices().await.unwrap(), []);
1243        }
1244    }
1245}
1246
1247#[cfg(feature = "rocksdb")]
1248/// Limit concurrency for RocksDB tests to avoid "too many open files" errors.
1249static ROCKS_DB_SEMAPHORE: Semaphore = Semaphore::const_new(5);
1250
1251#[derive(Default)]
1252pub struct MemoryStorageBuilder {
1253    namespace: String,
1254    instance_counter: usize,
1255    wasm_runtime: Option<WasmRuntime>,
1256    clock: TestClock,
1257}
1258
1259#[async_trait]
1260impl StorageBuilder for MemoryStorageBuilder {
1261    type Storage = DbStorage<MemoryDatabase, TestClock>;
1262
1263    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1264        self.instance_counter += 1;
1265        let config = MemoryDatabase::new_test_config().await?;
1266        if self.namespace.is_empty() {
1267            self.namespace = generate_test_namespace();
1268        }
1269        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1270        Ok(
1271            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1272                .await?,
1273        )
1274    }
1275
1276    fn clock(&self) -> &TestClock {
1277        &self.clock
1278    }
1279}
1280
1281impl MemoryStorageBuilder {
1282    /// Creates a [`MemoryStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1283    /// applications.
1284    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1285        MemoryStorageBuilder {
1286            wasm_runtime: wasm_runtime.into(),
1287            ..MemoryStorageBuilder::default()
1288        }
1289    }
1290}
1291
1292#[cfg(feature = "rocksdb")]
1293pub struct RocksDbStorageBuilder {
1294    namespace: String,
1295    instance_counter: usize,
1296    wasm_runtime: Option<WasmRuntime>,
1297    clock: TestClock,
1298    _permit: SemaphorePermit<'static>,
1299}
1300
1301#[cfg(feature = "rocksdb")]
1302impl RocksDbStorageBuilder {
1303    pub async fn new() -> Self {
1304        RocksDbStorageBuilder {
1305            namespace: String::new(),
1306            instance_counter: 0,
1307            wasm_runtime: None,
1308            clock: TestClock::default(),
1309            _permit: ROCKS_DB_SEMAPHORE.acquire().await.unwrap(),
1310        }
1311    }
1312
1313    /// Creates a [`RocksDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1314    /// applications.
1315    #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
1316    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1317        RocksDbStorageBuilder {
1318            wasm_runtime: wasm_runtime.into(),
1319            ..RocksDbStorageBuilder::new().await
1320        }
1321    }
1322}
1323
1324#[cfg(feature = "rocksdb")]
1325#[async_trait]
1326impl StorageBuilder for RocksDbStorageBuilder {
1327    type Storage = DbStorage<RocksDbDatabase, TestClock>;
1328
1329    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1330        self.instance_counter += 1;
1331        let config = RocksDbDatabase::new_test_config().await?;
1332        if self.namespace.is_empty() {
1333            self.namespace = generate_test_namespace();
1334        }
1335        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1336        Ok(
1337            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1338                .await?,
1339        )
1340    }
1341
1342    fn clock(&self) -> &TestClock {
1343        &self.clock
1344    }
1345}
1346
1347#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1348#[derive(Default)]
1349pub struct ServiceStorageBuilder {
1350    namespace: String,
1351    instance_counter: usize,
1352    wasm_runtime: Option<WasmRuntime>,
1353    clock: TestClock,
1354}
1355
1356#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1357impl ServiceStorageBuilder {
1358    /// Creates a `ServiceStorage`.
1359    pub fn new() -> Self {
1360        Self::with_wasm_runtime(None)
1361    }
1362
1363    /// Creates a `ServiceStorage` with the given Wasm runtime.
1364    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1365        ServiceStorageBuilder {
1366            wasm_runtime: wasm_runtime.into(),
1367            ..ServiceStorageBuilder::default()
1368        }
1369    }
1370}
1371
1372#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1373#[async_trait]
1374impl StorageBuilder for ServiceStorageBuilder {
1375    type Storage = DbStorage<StorageServiceDatabase, TestClock>;
1376
1377    async fn build(&mut self) -> anyhow::Result<Self::Storage> {
1378        self.instance_counter += 1;
1379        let config = StorageServiceDatabase::new_test_config().await?;
1380        if self.namespace.is_empty() {
1381            self.namespace = generate_test_namespace();
1382        }
1383        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1384        Ok(
1385            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1386                .await?,
1387        )
1388    }
1389
1390    fn clock(&self) -> &TestClock {
1391        &self.clock
1392    }
1393}
1394
1395#[cfg(feature = "dynamodb")]
1396#[derive(Default)]
1397pub struct DynamoDbStorageBuilder {
1398    namespace: String,
1399    instance_counter: usize,
1400    wasm_runtime: Option<WasmRuntime>,
1401    clock: TestClock,
1402}
1403
1404#[cfg(feature = "dynamodb")]
1405impl DynamoDbStorageBuilder {
1406    /// Creates a [`DynamoDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1407    /// applications.
1408    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1409        DynamoDbStorageBuilder {
1410            wasm_runtime: wasm_runtime.into(),
1411            ..DynamoDbStorageBuilder::default()
1412        }
1413    }
1414}
1415
1416#[cfg(feature = "dynamodb")]
1417#[async_trait]
1418impl StorageBuilder for DynamoDbStorageBuilder {
1419    type Storage = DbStorage<DynamoDbDatabase, TestClock>;
1420
1421    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1422        self.instance_counter += 1;
1423        let config = DynamoDbDatabase::new_test_config().await?;
1424        if self.namespace.is_empty() {
1425            self.namespace = generate_test_namespace();
1426        }
1427        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1428        Ok(
1429            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1430                .await?,
1431        )
1432    }
1433
1434    fn clock(&self) -> &TestClock {
1435        &self.clock
1436    }
1437}
1438
1439#[cfg(feature = "scylladb")]
1440#[derive(Default)]
1441pub struct ScyllaDbStorageBuilder {
1442    namespace: String,
1443    instance_counter: usize,
1444    wasm_runtime: Option<WasmRuntime>,
1445    clock: TestClock,
1446}
1447
1448#[cfg(feature = "scylladb")]
1449impl ScyllaDbStorageBuilder {
1450    /// Creates a [`ScyllaDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1451    /// applications.
1452    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1453        ScyllaDbStorageBuilder {
1454            wasm_runtime: wasm_runtime.into(),
1455            ..ScyllaDbStorageBuilder::default()
1456        }
1457    }
1458}
1459
1460#[cfg(feature = "scylladb")]
1461#[async_trait]
1462impl StorageBuilder for ScyllaDbStorageBuilder {
1463    type Storage = DbStorage<ScyllaDbDatabase, TestClock>;
1464
1465    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1466        self.instance_counter += 1;
1467        let config = ScyllaDbDatabase::new_test_config().await?;
1468        if self.namespace.is_empty() {
1469            self.namespace = generate_test_namespace();
1470        }
1471        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1472        Ok(
1473            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1474                .await?,
1475        )
1476    }
1477
1478    fn clock(&self) -> &TestClock {
1479        &self.clock
1480    }
1481}
1482
1483pub trait ClientOutcomeResultExt<T, E> {
1484    /// Unwraps the result and panics if it's not `Committed`.
1485    /// Use this when you expect the operation to succeed without conflicts.
1486    fn unwrap_ok_committed(self) -> T;
1487
1488    /// Unwraps the result, accepting both `Committed` and `Conflict` outcomes.
1489    /// Returns the committed value or the conflicting certificate (boxed).
1490    fn unwrap_ok_or_conflict(self) -> Result<T, Box<ConfirmedBlockCertificate>>;
1491}
1492
1493impl<T, E: std::fmt::Debug> ClientOutcomeResultExt<T, E> for Result<ClientOutcome<T>, E> {
1494    fn unwrap_ok_committed(self) -> T {
1495        match self.unwrap() {
1496            ClientOutcome::Committed(t) => t,
1497            ClientOutcome::WaitForTimeout(timeout) => {
1498                panic!("unexpected timeout: {timeout}")
1499            }
1500            ClientOutcome::Conflict(certificate) => {
1501                panic!("unexpected conflict: {}", certificate.hash())
1502            }
1503        }
1504    }
1505
1506    fn unwrap_ok_or_conflict(self) -> Result<T, Box<ConfirmedBlockCertificate>> {
1507        match self.unwrap() {
1508            ClientOutcome::Committed(t) => Ok(t),
1509            ClientOutcome::Conflict(certificate) => Err(certificate),
1510            ClientOutcome::WaitForTimeout(timeout) => {
1511                panic!("unexpected timeout: {timeout}")
1512            }
1513        }
1514    }
1515}