1use std::{
5 collections::{BTreeMap, HashMap, HashSet},
6 sync::Arc,
7 time::Duration,
8 vec,
9};
10
11use async_trait::async_trait;
12use futures::{
13 future::Either,
14 lock::{Mutex, MutexGuard},
15 Future,
16};
17use linera_base::{
18 crypto::{AccountPublicKey, CryptoHash, InMemorySigner, ValidatorKeypair, ValidatorPublicKey},
19 data_types::*,
20 identifiers::{AccountOwner, BlobId, ChainId},
21 ownership::ChainOwnership,
22};
23use linera_chain::{
24 data_types::BlockProposal,
25 types::{
26 CertificateKind, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27 LiteCertificate, Timeout, ValidatedBlock,
28 },
29};
30use linera_execution::{committee::Committee, ResourceControlPolicy, WasmRuntime};
31use linera_storage::{DbStorage, ResultReadCertificates, Storage, TestClock};
32#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
33use linera_storage_service::client::StorageServiceDatabase;
34use linera_version::VersionInfo;
35#[cfg(feature = "dynamodb")]
36use linera_views::dynamo_db::DynamoDbDatabase;
37#[cfg(feature = "scylladb")]
38use linera_views::scylla_db::ScyllaDbDatabase;
39use linera_views::{
40 memory::MemoryDatabase, random::generate_test_namespace, store::TestKeyValueDatabase as _,
41};
42use tokio::sync::oneshot;
43use tokio_stream::wrappers::UnboundedReceiverStream;
44#[cfg(feature = "rocksdb")]
45use {
46 linera_views::rocks_db::RocksDbDatabase,
47 tokio::sync::{Semaphore, SemaphorePermit},
48};
49
50use crate::{
51 client::{ChainClientOptions, Client},
52 data_types::*,
53 node::{
54 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
55 ValidatorNodeProvider,
56 },
57 notifier::ChannelNotifier,
58 worker::{Notification, ProcessableCertificate, WorkerState},
59};
60
/// The misbehavior, if any, that a [`LocalValidatorClient`] simulates when handling requests.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FaultType {
    /// Proposals, certificates and chain info queries are handled by the worker and its
    /// result is returned unchanged.
    Honest,
    /// Block proposals, certificates and chain info queries all fail with an "offline"
    /// I/O error.
    Offline,
    /// Like [`FaultType::Offline`] for proposals and certificates, but chain info queries
    /// are still answered.
    OfflineWithInfo,
    /// Block proposals, certificates and chain info queries fail with an inactive-chain error.
    NoChains,
    /// Validated-block certificates are processed, but an error is returned in place of
    /// the worker's response.
    DontSendConfirmVote,
    /// Validated-block certificates are rejected without being processed.
    DontProcessValidated,
    /// Block proposals are processed, but an error is returned in place of the worker's
    /// response.
    DontSendValidateVote,
}
71
72struct LocalValidator<S>
79where
80 S: Storage,
81{
82 state: WorkerState<S>,
83 notifier: Arc<ChannelNotifier<Notification>>,
84}
85
86#[derive(Clone)]
87pub struct LocalValidatorClient<S>
88where
89 S: Storage,
90{
91 public_key: ValidatorPublicKey,
92 client: Arc<Mutex<LocalValidator<S>>>,
93 fault_type: FaultType,
94}
95
96impl<S> ValidatorNode for LocalValidatorClient<S>
97where
98 S: Storage + Clone + Send + Sync + 'static,
99{
100 type NotificationStream = NotificationStream;
101
102 async fn handle_block_proposal(
103 &self,
104 proposal: BlockProposal,
105 ) -> Result<ChainInfoResponse, NodeError> {
106 self.spawn_and_receive(move |validator, sender| {
107 validator.do_handle_block_proposal(proposal, sender)
108 })
109 .await
110 }
111
112 async fn handle_lite_certificate(
113 &self,
114 certificate: LiteCertificate<'_>,
115 _delivery: CrossChainMessageDelivery,
116 ) -> Result<ChainInfoResponse, NodeError> {
117 let certificate = certificate.cloned();
118 self.spawn_and_receive(move |validator, sender| {
119 validator.do_handle_lite_certificate(certificate, sender)
120 })
121 .await
122 }
123
124 async fn handle_timeout_certificate(
125 &self,
126 certificate: GenericCertificate<Timeout>,
127 ) -> Result<ChainInfoResponse, NodeError> {
128 self.spawn_and_receive(move |validator, sender| {
129 validator.do_handle_certificate(certificate, sender)
130 })
131 .await
132 }
133
134 async fn handle_validated_certificate(
135 &self,
136 certificate: GenericCertificate<ValidatedBlock>,
137 ) -> Result<ChainInfoResponse, NodeError> {
138 self.spawn_and_receive(move |validator, sender| {
139 validator.do_handle_certificate(certificate, sender)
140 })
141 .await
142 }
143
144 async fn handle_confirmed_certificate(
145 &self,
146 certificate: GenericCertificate<ConfirmedBlock>,
147 _delivery: CrossChainMessageDelivery,
148 ) -> Result<ChainInfoResponse, NodeError> {
149 self.spawn_and_receive(move |validator, sender| {
150 validator.do_handle_certificate(certificate, sender)
151 })
152 .await
153 }
154
155 async fn handle_chain_info_query(
156 &self,
157 query: ChainInfoQuery,
158 ) -> Result<ChainInfoResponse, NodeError> {
159 self.spawn_and_receive(move |validator, sender| {
160 validator.do_handle_chain_info_query(query, sender)
161 })
162 .await
163 }
164
165 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
166 self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
167 .await
168 }
169
170 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
171 Ok(Default::default())
172 }
173
174 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
175 Ok(self
176 .client
177 .lock()
178 .await
179 .state
180 .storage_client()
181 .read_network_description()
182 .await
183 .transpose()
184 .ok_or(NodeError::ViewError {
185 error: "missing NetworkDescription".to_owned(),
186 })??)
187 }
188
189 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
190 self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
191 .await
192 }
193
194 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
195 self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
196 .await
197 }
198
199 async fn download_pending_blob(
200 &self,
201 chain_id: ChainId,
202 blob_id: BlobId,
203 ) -> Result<BlobContent, NodeError> {
204 self.spawn_and_receive(move |validator, sender| {
205 validator.do_download_pending_blob(chain_id, blob_id, sender)
206 })
207 .await
208 }
209
210 async fn handle_pending_blob(
211 &self,
212 chain_id: ChainId,
213 blob: BlobContent,
214 ) -> Result<ChainInfoResponse, NodeError> {
215 self.spawn_and_receive(move |validator, sender| {
216 validator.do_handle_pending_blob(chain_id, blob, sender)
217 })
218 .await
219 }
220
221 async fn download_certificate(
222 &self,
223 hash: CryptoHash,
224 ) -> Result<ConfirmedBlockCertificate, NodeError> {
225 self.spawn_and_receive(move |validator, sender| {
226 validator.do_download_certificate(hash, sender)
227 })
228 .await
229 }
230
231 async fn download_certificates(
232 &self,
233 hashes: Vec<CryptoHash>,
234 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
235 self.spawn_and_receive(move |validator, sender| {
236 validator.do_download_certificates(hashes, sender)
237 })
238 .await
239 }
240
241 async fn download_certificates_by_heights(
242 &self,
243 chain_id: ChainId,
244 heights: Vec<BlockHeight>,
245 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
246 self.spawn_and_receive(move |validator, sender| {
247 validator.do_download_certificates_by_heights(chain_id, heights, sender)
248 })
249 .await
250 }
251
252 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
253 self.spawn_and_receive(move |validator, sender| {
254 validator.do_blob_last_used_by(blob_id, sender)
255 })
256 .await
257 }
258
259 async fn blob_last_used_by_certificate(
260 &self,
261 blob_id: BlobId,
262 ) -> Result<ConfirmedBlockCertificate, NodeError> {
263 self.spawn_and_receive(move |validator, sender| {
264 validator.do_blob_last_used_by_certificate(blob_id, sender)
265 })
266 .await
267 }
268
269 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
270 self.spawn_and_receive(move |validator, sender| {
271 validator.do_missing_blob_ids(blob_ids, sender)
272 })
273 .await
274 }
275}
276
277impl<S> LocalValidatorClient<S>
278where
279 S: Storage + Clone + Send + Sync + 'static,
280{
281 fn new(public_key: ValidatorPublicKey, state: WorkerState<S>) -> Self {
282 let client = LocalValidator {
283 state,
284 notifier: Arc::new(ChannelNotifier::default()),
285 };
286 Self {
287 public_key,
288 client: Arc::new(Mutex::new(client)),
289 fault_type: FaultType::Honest,
290 }
291 }
292
    /// Returns the public key identifying this validator.
    pub fn name(&self) -> ValidatorPublicKey {
        self.public_key
    }
296
    /// Sets the fault simulated by this handle. Only this handle is changed: clones made
    /// earlier keep their own `fault_type`.
    fn set_fault_type(&mut self, fault_type: FaultType) {
        self.fault_type = fault_type;
    }
300
301 pub async fn chain_info_with_manager_values(
303 &mut self,
304 chain_id: ChainId,
305 ) -> Result<Box<ChainInfo>, NodeError> {
306 let query = ChainInfoQuery::new(chain_id).with_manager_values();
307 let response = self.handle_chain_info_query(query).await?;
308 Ok(response.info)
309 }
310
311 async fn spawn_and_receive<F, R, T>(&self, f: F) -> T
314 where
315 T: Send + 'static,
316 R: Future<Output = Result<(), T>> + Send,
317 F: FnOnce(Self, oneshot::Sender<T>) -> R + Send + 'static,
318 {
319 let validator = self.clone();
320 let (sender, receiver) = oneshot::channel();
321 tokio::spawn(async move {
322 if f(validator, sender).await.is_err() {
323 tracing::debug!("result could not be sent");
324 }
325 });
326 receiver.await.unwrap()
327 }
328
329 async fn do_handle_block_proposal(
330 self,
331 proposal: BlockProposal,
332 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
333 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
334 let result = match self.fault_type {
335 FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
336 error: "offline".to_string(),
337 }),
338 FaultType::NoChains => Err(NodeError::InactiveChain(proposal.content.block.chain_id)),
339 FaultType::DontSendValidateVote
340 | FaultType::Honest
341 | FaultType::DontSendConfirmVote
342 | FaultType::DontProcessValidated => {
343 let result = self
344 .client
345 .lock()
346 .await
347 .state
348 .handle_block_proposal(proposal)
349 .await
350 .map_err(Into::into);
351 if self.fault_type == FaultType::DontSendValidateVote {
352 Err(NodeError::ClientIoError {
353 error: "refusing to validate".to_string(),
354 })
355 } else {
356 result
357 }
358 }
359 };
360 sender.send(result.map(|(info, _actions)| info))
362 }
363
364 async fn do_handle_lite_certificate(
365 self,
366 certificate: LiteCertificate<'_>,
367 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
368 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
369 let client = self.client.clone();
370 let mut validator = client.lock().await;
371 let result = async move {
372 match validator.state.full_certificate(certificate).await? {
373 Either::Left(confirmed) => {
374 self.do_handle_certificate_internal(confirmed, &mut validator)
375 .await
376 }
377 Either::Right(validated) => {
378 self.do_handle_certificate_internal(validated, &mut validator)
379 .await
380 }
381 }
382 }
383 .await;
384 sender.send(result)
385 }
386
387 async fn do_handle_certificate_internal<T: ProcessableCertificate>(
388 &self,
389 certificate: GenericCertificate<T>,
390 validator: &mut MutexGuard<'_, LocalValidator<S>>,
391 ) -> Result<ChainInfoResponse, NodeError> {
392 match self.fault_type {
393 FaultType::DontProcessValidated if T::KIND == CertificateKind::Validated => {
394 Err(NodeError::ClientIoError {
395 error: "refusing to process validated block".to_string(),
396 })
397 }
398 FaultType::NoChains => Err(NodeError::InactiveChain(certificate.value().chain_id())),
399 FaultType::Honest
400 | FaultType::DontSendConfirmVote
401 | FaultType::DontProcessValidated
402 | FaultType::DontSendValidateVote => {
403 let result = validator
404 .state
405 .fully_handle_certificate_with_notifications(certificate, &validator.notifier)
406 .await
407 .map_err(Into::into);
408 if T::KIND == CertificateKind::Validated
409 && self.fault_type == FaultType::DontSendConfirmVote
410 {
411 Err(NodeError::ClientIoError {
412 error: "refusing to confirm".to_string(),
413 })
414 } else {
415 result
416 }
417 }
418 FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
419 error: "offline".to_string(),
420 }),
421 }
422 }
423
424 async fn do_handle_certificate<T: ProcessableCertificate>(
425 self,
426 certificate: GenericCertificate<T>,
427 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
428 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
429 let mut validator = self.client.lock().await;
430 let result = self
431 .do_handle_certificate_internal(certificate, &mut validator)
432 .await;
433 sender.send(result)
434 }
435
436 async fn do_handle_chain_info_query(
437 self,
438 query: ChainInfoQuery,
439 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
440 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
441 let validator = self.client.lock().await;
442 let result = match self.fault_type {
443 FaultType::Offline => Err(NodeError::ClientIoError {
444 error: "offline".to_string(),
445 }),
446 FaultType::NoChains => Err(NodeError::InactiveChain(query.chain_id)),
447 FaultType::Honest
448 | FaultType::DontSendConfirmVote
449 | FaultType::DontProcessValidated
450 | FaultType::DontSendValidateVote
451 | FaultType::OfflineWithInfo => validator
452 .state
453 .handle_chain_info_query(query)
454 .await
455 .map_err(Into::into),
456 };
457 sender.send(result.map(|(info, _actions)| info))
459 }
460
461 async fn do_subscribe(
462 self,
463 chains: Vec<ChainId>,
464 sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
465 ) -> Result<(), Result<NotificationStream, NodeError>> {
466 let validator = self.client.lock().await;
467 let rx = validator.notifier.subscribe(chains);
468 let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
469 sender.send(Ok(stream))
470 }
471
472 async fn do_upload_blob(
473 self,
474 content: BlobContent,
475 sender: oneshot::Sender<Result<BlobId, NodeError>>,
476 ) -> Result<(), Result<BlobId, NodeError>> {
477 let validator = self.client.lock().await;
478 let blob = Blob::new(content);
479 let id = blob.id();
480 let storage = validator.state.storage_client();
481 let result = match storage.maybe_write_blobs(&[blob]).await {
482 Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
483 Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
484 Err(error) => Err(error.into()),
485 };
486 sender.send(result)
487 }
488
489 async fn do_download_blob(
490 self,
491 blob_id: BlobId,
492 sender: oneshot::Sender<Result<BlobContent, NodeError>>,
493 ) -> Result<(), Result<BlobContent, NodeError>> {
494 let validator = self.client.lock().await;
495 let blob = validator
496 .state
497 .storage_client()
498 .read_blob(blob_id)
499 .await
500 .map_err(Into::into);
501 let blob = match blob {
502 Ok(blob) => blob.ok_or(NodeError::BlobsNotFound(vec![blob_id])),
503 Err(error) => Err(error),
504 };
505 sender.send(blob.map(|blob| blob.into_content()))
506 }
507
508 async fn do_download_pending_blob(
509 self,
510 chain_id: ChainId,
511 blob_id: BlobId,
512 sender: oneshot::Sender<Result<BlobContent, NodeError>>,
513 ) -> Result<(), Result<BlobContent, NodeError>> {
514 let validator = self.client.lock().await;
515 let result = validator
516 .state
517 .download_pending_blob(chain_id, blob_id)
518 .await
519 .map_err(Into::into);
520 sender.send(result.map(|blob| blob.into_content()))
521 }
522
523 async fn do_handle_pending_blob(
524 self,
525 chain_id: ChainId,
526 blob: BlobContent,
527 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
528 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
529 let validator = self.client.lock().await;
530 let result = validator
531 .state
532 .handle_pending_blob(chain_id, Blob::new(blob))
533 .await
534 .map_err(Into::into);
535 sender.send(result)
536 }
537
538 async fn do_download_certificate(
539 self,
540 hash: CryptoHash,
541 sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
542 ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
543 let validator = self.client.lock().await;
544 let certificate = validator
545 .state
546 .storage_client()
547 .read_certificate(hash)
548 .await
549 .map_err(Into::into);
550
551 let certificate = match certificate {
552 Err(error) => Err(error),
553 Ok(entry) => match entry {
554 Some(certificate) => Ok(certificate),
555 None => {
556 panic!("Missing certificate: {hash}");
557 }
558 },
559 };
560
561 sender.send(certificate)
562 }
563
564 async fn do_download_certificates(
565 self,
566 hashes: Vec<CryptoHash>,
567 sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
568 ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
569 let validator = self.client.lock().await;
570 let certificates = validator
571 .state
572 .storage_client()
573 .read_certificates(hashes.clone())
574 .await
575 .map_err(Into::into);
576
577 let certificates = match certificates {
578 Err(error) => Err(error),
579 Ok(certificates) => match ResultReadCertificates::new(certificates, hashes) {
580 ResultReadCertificates::Certificates(certificates) => Ok(certificates),
581 ResultReadCertificates::InvalidHashes(hashes) => {
582 panic!("Missing certificates: {:?}", hashes)
583 }
584 },
585 };
586
587 sender.send(certificates)
588 }
589
590 async fn do_download_certificates_by_heights(
591 self,
592 chain_id: ChainId,
593 heights: Vec<BlockHeight>,
594 sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
595 ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
596 let (query_sender, query_receiver) = oneshot::channel();
598 let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_by_heights(heights);
599
600 let self_clone = self.clone();
601 self.do_handle_chain_info_query(query, query_sender)
602 .await
603 .expect("Failed to handle chain info query");
604
605 let chain_info_response = query_receiver.await.map_err(|_| {
607 Err(NodeError::ClientIoError {
608 error: "Failed to receive chain info response".to_string(),
609 })
610 })?;
611
612 let hashes = match chain_info_response {
613 Ok(response) => response.info.requested_sent_certificate_hashes,
614 Err(e) => {
615 return sender.send(Err(e));
616 }
617 };
618
619 let (cert_sender, cert_receiver) = oneshot::channel();
621 self_clone
622 .do_download_certificates(hashes, cert_sender)
623 .await?;
624
625 let result = cert_receiver.await.map_err(|_| {
627 Err(NodeError::ClientIoError {
628 error: "Failed to receive certificates".to_string(),
629 })
630 })?;
631
632 sender.send(result)
633 }
634
635 async fn do_blob_last_used_by(
636 self,
637 blob_id: BlobId,
638 sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
639 ) -> Result<(), Result<CryptoHash, NodeError>> {
640 let validator = self.client.lock().await;
641 let blob_state = validator
642 .state
643 .storage_client()
644 .read_blob_state(blob_id)
645 .await
646 .map_err(Into::into);
647 let certificate_hash = match blob_state {
648 Err(err) => Err(err),
649 Ok(blob_state) => match blob_state {
650 None => Err(NodeError::BlobsNotFound(vec![blob_id])),
651 Some(blob_state) => blob_state
652 .last_used_by
653 .ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
654 },
655 };
656
657 sender.send(certificate_hash)
658 }
659
660 async fn do_blob_last_used_by_certificate(
661 self,
662 blob_id: BlobId,
663 sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
664 ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
665 match self.blob_last_used_by(blob_id).await {
666 Ok(cert_hash) => {
667 let cert = self.download_certificate(cert_hash).await;
668 sender.send(cert)
669 }
670 Err(err) => sender.send(Err(err)),
671 }
672 }
673
674 async fn do_missing_blob_ids(
675 self,
676 blob_ids: Vec<BlobId>,
677 sender: oneshot::Sender<Result<Vec<BlobId>, NodeError>>,
678 ) -> Result<(), Result<Vec<BlobId>, NodeError>> {
679 let validator = self.client.lock().await;
680 let missing_blob_ids = validator
681 .state
682 .storage_client()
683 .missing_blobs(&blob_ids)
684 .await
685 .map_err(Into::into);
686 sender.send(missing_blob_ids)
687 }
688}
689
690#[derive(Clone)]
691pub struct NodeProvider<S>(Arc<std::sync::Mutex<Vec<LocalValidatorClient<S>>>>)
692where
693 S: Storage;
694
695impl<S> NodeProvider<S>
696where
697 S: Storage + Clone,
698{
699 fn all_nodes(&self) -> Vec<LocalValidatorClient<S>> {
700 self.0.lock().unwrap().clone()
701 }
702}
703
704impl<S> ValidatorNodeProvider for NodeProvider<S>
705where
706 S: Storage + Clone + Send + Sync + 'static,
707{
708 type Node = LocalValidatorClient<S>;
709
710 fn make_node(&self, _name: &str) -> Result<Self::Node, NodeError> {
711 unimplemented!()
712 }
713
714 fn make_nodes_from_list<A>(
715 &self,
716 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
717 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
718 where
719 A: AsRef<str>,
720 {
721 let list = self.0.lock().unwrap();
722 Ok(validators
723 .into_iter()
724 .map(|(public_key, address)| {
725 list.iter()
726 .find(|client| client.public_key == public_key)
727 .ok_or_else(|| NodeError::CannotResolveValidatorAddress {
728 address: address.as_ref().to_string(),
729 })
730 .map(|client| (public_key, client.clone()))
731 })
732 .collect::<Result<Vec<_>, _>>()?
733 .into_iter())
734 }
735}
736
737impl<S> FromIterator<LocalValidatorClient<S>> for NodeProvider<S>
738where
739 S: Storage,
740{
741 fn from_iter<T>(iter: T) -> Self
742 where
743 T: IntoIterator<Item = LocalValidatorClient<S>>,
744 {
745 Self(Arc::new(std::sync::Mutex::new(iter.into_iter().collect())))
746 }
747}
748
749pub struct TestBuilder<B: StorageBuilder> {
756 storage_builder: B,
757 pub initial_committee: Committee,
758 admin_description: Option<ChainDescription>,
759 network_description: Option<NetworkDescription>,
760 genesis_storage_builder: GenesisStorageBuilder,
761 node_provider: NodeProvider<B::Storage>,
762 validator_storages: HashMap<ValidatorPublicKey, B::Storage>,
763 chain_client_storages: Vec<B::Storage>,
764 pub chain_owners: BTreeMap<ChainId, AccountOwner>,
765 pub signer: InMemorySigner,
766}
767
/// A factory for the storage instances used by test validators and chain clients.
#[async_trait]
pub trait StorageBuilder {
    /// The storage type produced by this builder.
    type Storage: Storage + Clone + Send + Sync + 'static;

    /// Builds a new storage instance.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error>;

    /// Returns the test clock held by this builder.
    fn clock(&self) -> &TestClock;
}
776
/// Remembers the chains that were added so they can be created in a fresh storage.
#[derive(Default)]
struct GenesisStorageBuilder {
    /// The recorded chains, in the order they were added.
    accounts: Vec<GenesisAccount>,
}
781
/// A chain recorded by [`GenesisStorageBuilder`], with the public key it was added with.
struct GenesisAccount {
    /// The description of the chain.
    description: ChainDescription,
    /// The account public key passed along with the chain.
    public_key: AccountPublicKey,
}
786
787impl GenesisStorageBuilder {
788 fn add(&mut self, description: ChainDescription, public_key: AccountPublicKey) {
789 self.accounts.push(GenesisAccount {
790 description,
791 public_key,
792 })
793 }
794
795 async fn build<S>(&self, storage: S) -> S
796 where
797 S: Storage + Clone + Send + Sync + 'static,
798 {
799 for account in &self.accounts {
800 storage
801 .create_chain(account.description.clone())
802 .await
803 .unwrap();
804 }
805 storage
806 }
807}
808
/// The chain client type used in tests: its environment combines the storage `S`, the
/// in-process [`NodeProvider`] and an [`InMemorySigner`].
pub type ChainClient<S> =
    crate::client::ChainClient<crate::environment::Impl<S, NodeProvider<S>, InMemorySigner>>;
811
812impl<S: Storage + Clone + Send + Sync + 'static> ChainClient<S> {
813 pub async fn read_confirmed_blocks_downward(
815 &self,
816 from: CryptoHash,
817 limit: u32,
818 ) -> anyhow::Result<Vec<ConfirmedBlock>> {
819 let mut hash = Some(from);
820 let mut values = Vec::new();
821 for _ in 0..limit {
822 let Some(next_hash) = hash else {
823 break;
824 };
825 let value = self.read_confirmed_block(next_hash).await?;
826 hash = value.block().header.previous_block_hash;
827 values.push(value);
828 }
829 Ok(values)
830 }
831}
832
833impl<B> TestBuilder<B>
834where
835 B: StorageBuilder,
836{
837 pub async fn new(
838 mut storage_builder: B,
839 count: usize,
840 with_faulty_validators: usize,
841 mut signer: InMemorySigner,
842 ) -> Result<Self, anyhow::Error> {
843 let mut validators = Vec::new();
844 for _ in 0..count {
845 let validator_keypair = ValidatorKeypair::generate();
846 let account_public_key = signer.generate_new();
847 validators.push((validator_keypair, account_public_key));
848 }
849 let for_committee = validators
850 .iter()
851 .map(|(validating, account)| (validating.public_key, *account))
852 .collect::<Vec<_>>();
853 let initial_committee = Committee::make_simple(for_committee);
854 let mut validator_clients = Vec::new();
855 let mut validator_storages = HashMap::new();
856 let mut faulty_validators = HashSet::new();
857 for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() {
858 let validator_public_key = validator_keypair.public_key;
859 let storage = storage_builder.build().await?;
860 let state = WorkerState::new(
861 format!("Node {}", i),
862 Some(validator_keypair.secret_key),
863 storage.clone(),
864 )
865 .with_allow_inactive_chains(false)
866 .with_allow_messages_from_deprecated_epochs(false);
867 let mut validator = LocalValidatorClient::new(validator_public_key, state);
868 if i < with_faulty_validators {
869 faulty_validators.insert(validator_public_key);
870 validator.set_fault_type(FaultType::NoChains);
871 }
872 validator_clients.push(validator);
873 validator_storages.insert(validator_public_key, storage);
874 }
875 tracing::info!(
876 "Test will use the following faulty validators: {:?}",
877 faulty_validators
878 );
879 Ok(Self {
880 storage_builder,
881 initial_committee,
882 admin_description: None,
883 network_description: None,
884 genesis_storage_builder: GenesisStorageBuilder::default(),
885 node_provider: NodeProvider::from_iter(validator_clients),
886 validator_storages,
887 chain_client_storages: Vec::new(),
888 chain_owners: BTreeMap::new(),
889 signer,
890 })
891 }
892
893 pub fn with_policy(mut self, policy: ResourceControlPolicy) -> Self {
894 let validators = self.initial_committee.validators().clone();
895 self.initial_committee = Committee::new(validators, policy);
896 self
897 }
898
899 pub fn set_fault_type(&mut self, indexes: impl AsRef<[usize]>, fault_type: FaultType) {
900 let mut faulty_validators = vec![];
901 let mut validator_clients = self.node_provider.0.lock().unwrap();
902 for index in indexes.as_ref() {
903 let validator = &mut validator_clients[*index];
904 validator.set_fault_type(fault_type);
905 faulty_validators.push(validator.public_key);
906 }
907 tracing::info!(
908 "Making the following validators {:?}: {:?}",
909 fault_type,
910 faulty_validators
911 );
912 }
913
914 pub async fn add_root_chain(
919 &mut self,
920 index: u32,
921 balance: Amount,
922 ) -> anyhow::Result<ChainClient<B::Storage>> {
923 if self.admin_description.is_none() && index != 0 {
925 Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
926 }
927 let origin = ChainOrigin::Root(index);
928 let public_key = self.signer.generate_new();
929 let open_chain_config = InitialChainConfig {
930 ownership: ChainOwnership::single(public_key.into()),
931 epoch: Epoch(0),
932 min_active_epoch: Epoch(0),
933 max_active_epoch: Epoch(0),
934 balance,
935 application_permissions: ApplicationPermissions::default(),
936 };
937 let description = ChainDescription::new(origin, open_chain_config, Timestamp::from(0));
938 let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
939 if index == 0 {
940 self.admin_description = Some(description.clone());
941 self.network_description = Some(NetworkDescription {
942 admin_chain_id: description.id(),
943 genesis_config_hash: CryptoHash::test_hash("genesis config"),
945 genesis_timestamp: Timestamp::from(0),
946 genesis_committee_blob_hash: committee_blob.id().hash,
947 name: "test network".to_string(),
948 });
949 }
950 self.genesis_storage_builder
952 .add(description.clone(), public_key);
953
954 let network_description = self.network_description.as_ref().unwrap();
955
956 for validator in self.node_provider.all_nodes() {
957 let storage = self
958 .validator_storages
959 .get_mut(&validator.public_key)
960 .unwrap();
961 storage
962 .write_network_description(network_description)
963 .await
964 .expect("writing the NetworkDescription should succeed");
965 storage
966 .write_blob(&committee_blob)
967 .await
968 .expect("writing a blob should succeed");
969 storage.create_chain(description.clone()).await.unwrap();
970 }
971 for storage in self.chain_client_storages.iter_mut() {
972 storage.create_chain(description.clone()).await.unwrap();
973 }
974 let chain_id = description.id();
975 self.chain_owners.insert(chain_id, public_key.into());
976 self.make_client(chain_id, None, BlockHeight::ZERO).await
977 }
978
979 pub fn genesis_chains(&self) -> Vec<(AccountPublicKey, Amount)> {
980 let mut result = Vec::new();
981 for (i, genesis_account) in self.genesis_storage_builder.accounts.iter().enumerate() {
982 assert_eq!(
983 genesis_account.description.origin(),
984 ChainOrigin::Root(i as u32)
985 );
986 result.push((
987 genesis_account.public_key,
988 genesis_account.description.config().balance,
989 ));
990 }
991 result
992 }
993
    /// Returns the ID of the admin chain.
    ///
    /// # Panics
    ///
    /// Panics if the admin chain has not been created yet.
    pub fn admin_id(&self) -> ChainId {
        self.admin_description
            .as_ref()
            .expect("admin chain not initialized")
            .id()
    }
1000
    /// Returns the description of the admin chain, or `None` if it has not been created.
    pub fn admin_description(&self) -> Option<&ChainDescription> {
        self.admin_description.as_ref()
    }
1004
    /// Returns a clone of the node provider; it shares the validator list with this builder.
    pub fn make_node_provider(&self) -> NodeProvider<B::Storage> {
        self.node_provider.clone()
    }
1008
    /// Returns a clone of the validator handle at position `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of range or the provider's mutex is poisoned.
    pub fn node(&mut self, index: usize) -> LocalValidatorClient<B::Storage> {
        self.node_provider.0.lock().unwrap()[index].clone()
    }
1012
1013 pub async fn make_storage(&mut self) -> anyhow::Result<B::Storage> {
1014 let storage = self.storage_builder.build().await?;
1015 let network_description = self.network_description.as_ref().unwrap();
1016 let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
1017 storage
1018 .write_network_description(network_description)
1019 .await
1020 .expect("writing the NetworkDescription should succeed");
1021 storage
1022 .write_blob(&committee_blob)
1023 .await
1024 .expect("writing a blob should succeed");
1025 Ok(self.genesis_storage_builder.build(storage).await)
1026 }
1027
1028 pub async fn make_client(
1029 &mut self,
1030 chain_id: ChainId,
1031 block_hash: Option<CryptoHash>,
1032 block_height: BlockHeight,
1033 ) -> anyhow::Result<ChainClient<B::Storage>> {
1034 let storage = self.make_storage().await?;
1037 self.chain_client_storages.push(storage.clone());
1038 let client = Arc::new(Client::new(
1039 crate::environment::Impl {
1040 network: self.make_node_provider(),
1041 storage,
1042 signer: self.signer.clone(),
1043 },
1044 self.admin_id(),
1045 false,
1046 [chain_id],
1047 format!("Client node for {:.8}", chain_id),
1048 Duration::from_secs(30),
1049 ChainClientOptions::test_default(),
1050 ));
1051 Ok(client.create_chain_client(
1052 chain_id,
1053 block_hash,
1054 block_height,
1055 None,
1056 self.chain_owners.get(&chain_id).copied(),
1057 None,
1058 ))
1059 }
1060
1061 pub async fn check_that_validators_have_certificate(
1063 &self,
1064 chain_id: ChainId,
1065 block_height: BlockHeight,
1066 target_count: usize,
1067 ) -> Option<ConfirmedBlockCertificate> {
1068 let query = ChainInfoQuery::new(chain_id)
1069 .with_sent_certificate_hashes_by_heights(vec![block_height]);
1070 let mut count = 0;
1071 let mut certificate = None;
1072 for validator in self.node_provider.all_nodes() {
1073 if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1074 if response.check(validator.public_key).is_ok() {
1075 let ChainInfo {
1076 mut requested_sent_certificate_hashes,
1077 ..
1078 } = *response.info;
1079 debug_assert!(requested_sent_certificate_hashes.len() <= 1);
1080 if let Some(cert_hash) = requested_sent_certificate_hashes.pop() {
1081 if let Ok(cert) = validator.download_certificate(cert_hash).await {
1082 if cert.inner().block().header.chain_id == chain_id
1083 && cert.inner().block().header.height == block_height
1084 {
1085 cert.check(&self.initial_committee).unwrap();
1086 count += 1;
1087 certificate = Some(cert);
1088 }
1089 }
1090 }
1091 }
1092 }
1093 }
1094 assert!(count >= target_count);
1095 certificate
1096 }
1097
1098 pub async fn check_that_validators_are_in_round(
1101 &self,
1102 chain_id: ChainId,
1103 block_height: BlockHeight,
1104 round: Round,
1105 target_count: usize,
1106 ) {
1107 let query = ChainInfoQuery::new(chain_id);
1108 let mut count = 0;
1109 for validator in self.node_provider.all_nodes() {
1110 if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1111 if response.info.manager.current_round == round
1112 && response.info.next_block_height == block_height
1113 && response.check(validator.public_key).is_ok()
1114 {
1115 count += 1;
1116 }
1117 }
1118 }
1119 assert!(count >= target_count);
1120 }
1121
1122 pub async fn check_that_validators_have_empty_outboxes(&self, chain_id: ChainId) {
1124 for validator in self.node_provider.all_nodes() {
1125 let guard = validator.client.lock().await;
1126 let chain = guard.state.chain_state_view(chain_id).await.unwrap();
1127 assert_eq!(chain.outboxes.indices().await.unwrap(), []);
1128 }
1129 }
1130}
1131
/// Caps the number of `RocksDbStorageBuilder` instances alive at the same time at 5:
/// `RocksDbStorageBuilder::new` acquires one permit and the builder keeps it in its
/// `_permit` field.
// NOTE(review): presumably this bounds the resources used by concurrently running
// RocksDB tests — confirm the rationale for the value 5.
#[cfg(feature = "rocksdb")]
static ROCKS_DB_SEMAPHORE: Semaphore = Semaphore::const_new(5);
1135
/// Builds `DbStorage` instances backed by `MemoryDatabase`, each in its own namespace.
#[derive(Default)]
pub struct MemoryStorageBuilder {
    /// Namespace prefix shared by all storages of this builder; generated on the first
    /// `build` call if still empty.
    namespace: String,
    /// Number of `build` calls so far; appended to `namespace` to name each storage.
    instance_counter: usize,
    /// Optional Wasm runtime handed to every storage that is built.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock; a clone is handed to every storage that is built.
    clock: TestClock,
}
1143
1144#[async_trait]
1145impl StorageBuilder for MemoryStorageBuilder {
1146 type Storage = DbStorage<MemoryDatabase, TestClock>;
1147
1148 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1149 self.instance_counter += 1;
1150 let config = MemoryDatabase::new_test_config().await?;
1151 if self.namespace.is_empty() {
1152 self.namespace = generate_test_namespace();
1153 }
1154 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1155 Ok(
1156 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1157 .await?,
1158 )
1159 }
1160
1161 fn clock(&self) -> &TestClock {
1162 &self.clock
1163 }
1164}
1165
1166impl MemoryStorageBuilder {
1167 pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1170 MemoryStorageBuilder {
1171 wasm_runtime: wasm_runtime.into(),
1172 ..MemoryStorageBuilder::default()
1173 }
1174 }
1175}
1176
/// Builds `DbStorage` instances backed by `RocksDbDatabase`, each in its own namespace.
///
/// Every builder holds one permit of `ROCKS_DB_SEMAPHORE` for as long as it exists.
#[cfg(feature = "rocksdb")]
pub struct RocksDbStorageBuilder {
    /// Namespace prefix shared by all storages of this builder; generated on the first
    /// `build` call if still empty.
    namespace: String,
    /// Number of `build` calls so far; appended to `namespace` to name each storage.
    instance_counter: usize,
    /// Optional Wasm runtime handed to every storage that is built.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock; a clone is handed to every storage that is built.
    clock: TestClock,
    /// Permit acquired from `ROCKS_DB_SEMAPHORE` in `new`; never read, only kept alive.
    _permit: SemaphorePermit<'static>,
}
1185
#[cfg(feature = "rocksdb")]
impl RocksDbStorageBuilder {
    /// Creates a builder without a Wasm runtime.
    ///
    /// Waits until a permit of `ROCKS_DB_SEMAPHORE` is available and keeps it in the
    /// returned builder.
    ///
    /// # Panics
    ///
    /// Panics if `ROCKS_DB_SEMAPHORE` has been closed, which nothing in the visible code
    /// does.
    pub async fn new() -> Self {
        RocksDbStorageBuilder {
            namespace: String::new(),
            instance_counter: 0,
            wasm_runtime: None,
            clock: TestClock::default(),
            // `acquire` only fails on a closed semaphore; state that invariant explicitly
            // instead of using a bare `unwrap`.
            _permit: ROCKS_DB_SEMAPHORE
                .acquire()
                .await
                .expect("ROCKS_DB_SEMAPHORE should never be closed"),
        }
    }

    /// Creates a builder like [`RocksDbStorageBuilder::new`], but passing `wasm_runtime`
    /// (or no runtime, for `None`) to every storage it builds.
    #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        RocksDbStorageBuilder {
            wasm_runtime: wasm_runtime.into(),
            ..RocksDbStorageBuilder::new().await
        }
    }
}
1208
#[cfg(feature = "rocksdb")]
#[async_trait]
impl StorageBuilder for RocksDbStorageBuilder {
    type Storage = DbStorage<RocksDbDatabase, TestClock>;

    /// Creates a new RocksDB-backed storage under the namespace `<prefix>_<n>`, where `n`
    /// is the number of `build` calls made on this builder so far (including this one).
    ///
    /// # Errors
    ///
    /// Returns an error if the test configuration or the storage cannot be created.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let test_config = RocksDbDatabase::new_test_config().await?;
        // The namespace prefix is chosen lazily, the first time a storage is built.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let (prefix, index) = (&self.namespace, self.instance_counter);
        let instance_namespace = format!("{prefix}_{index}");
        let storage = DbStorage::new_for_testing(
            test_config,
            &instance_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns this builder's test clock, clones of which are given to the storages.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1231
/// Builds `DbStorage` instances backed by `StorageServiceDatabase`, each in its own
/// namespace.
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[derive(Default)]
pub struct ServiceStorageBuilder {
    /// Namespace prefix shared by all storages of this builder; generated on the first
    /// `build` call if still empty.
    namespace: String,
    /// Number of `build` calls so far; appended to `namespace` to name each storage.
    instance_counter: usize,
    /// Optional Wasm runtime handed to every storage that is built.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock; a clone is handed to every storage that is built.
    clock: TestClock,
}
1240
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
impl ServiceStorageBuilder {
    /// Creates a builder without a Wasm runtime.
    pub async fn new() -> Self {
        let no_runtime: Option<WasmRuntime> = None;
        Self::with_wasm_runtime(no_runtime).await
    }

    /// Creates a builder that passes `wasm_runtime` (or no runtime, for `None`) to every
    /// storage it builds. All other fields start at their `Default` values.
    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let wasm_runtime = wasm_runtime.into();
        Self {
            wasm_runtime,
            ..Self::default()
        }
    }
}
1256
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[async_trait]
impl StorageBuilder for ServiceStorageBuilder {
    type Storage = DbStorage<StorageServiceDatabase, TestClock>;

    /// Creates a new storage-service-backed storage under the namespace `<prefix>_<n>`,
    /// where `n` is the number of `build` calls made on this builder so far (including
    /// this one).
    ///
    /// # Errors
    ///
    /// Returns an error if the test configuration or the storage cannot be created.
    async fn build(&mut self) -> anyhow::Result<Self::Storage> {
        self.instance_counter += 1;
        let test_config = StorageServiceDatabase::new_test_config().await?;
        // The namespace prefix is chosen lazily, the first time a storage is built.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let (prefix, index) = (&self.namespace, self.instance_counter);
        let instance_namespace = format!("{prefix}_{index}");
        let storage = DbStorage::new_for_testing(
            test_config,
            &instance_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns this builder's test clock, clones of which are given to the storages.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1279
/// Builds `DbStorage` instances backed by `DynamoDbDatabase`, each in its own namespace.
#[cfg(feature = "dynamodb")]
#[derive(Default)]
pub struct DynamoDbStorageBuilder {
    /// Namespace prefix shared by all storages of this builder; generated on the first
    /// `build` call if still empty.
    namespace: String,
    /// Number of `build` calls so far; appended to `namespace` to name each storage.
    instance_counter: usize,
    /// Optional Wasm runtime handed to every storage that is built.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock; a clone is handed to every storage that is built.
    clock: TestClock,
}
1288
#[cfg(feature = "dynamodb")]
impl DynamoDbStorageBuilder {
    /// Creates a builder that passes `wasm_runtime` (or no runtime, for `None`) to every
    /// storage it builds. All other fields start at their `Default` values.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let wasm_runtime = wasm_runtime.into();
        Self {
            wasm_runtime,
            ..Self::default()
        }
    }
}
1300
#[cfg(feature = "dynamodb")]
#[async_trait]
impl StorageBuilder for DynamoDbStorageBuilder {
    type Storage = DbStorage<DynamoDbDatabase, TestClock>;

    /// Creates a new DynamoDB-backed storage under the namespace `<prefix>_<n>`, where
    /// `n` is the number of `build` calls made on this builder so far (including this
    /// one).
    ///
    /// # Errors
    ///
    /// Returns an error if the test configuration or the storage cannot be created.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let test_config = DynamoDbDatabase::new_test_config().await?;
        // The namespace prefix is chosen lazily, the first time a storage is built.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let (prefix, index) = (&self.namespace, self.instance_counter);
        let instance_namespace = format!("{prefix}_{index}");
        let storage = DbStorage::new_for_testing(
            test_config,
            &instance_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns this builder's test clock, clones of which are given to the storages.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1323
/// Builds `DbStorage` instances backed by `ScyllaDbDatabase`, each in its own namespace.
#[cfg(feature = "scylladb")]
#[derive(Default)]
pub struct ScyllaDbStorageBuilder {
    /// Namespace prefix shared by all storages of this builder; generated on the first
    /// `build` call if still empty.
    namespace: String,
    /// Number of `build` calls so far; appended to `namespace` to name each storage.
    instance_counter: usize,
    /// Optional Wasm runtime handed to every storage that is built.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock; a clone is handed to every storage that is built.
    clock: TestClock,
}
1332
#[cfg(feature = "scylladb")]
impl ScyllaDbStorageBuilder {
    /// Creates a builder that passes `wasm_runtime` (or no runtime, for `None`) to every
    /// storage it builds. All other fields start at their `Default` values.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let wasm_runtime = wasm_runtime.into();
        Self {
            wasm_runtime,
            ..Self::default()
        }
    }
}
1344
#[cfg(feature = "scylladb")]
#[async_trait]
impl StorageBuilder for ScyllaDbStorageBuilder {
    type Storage = DbStorage<ScyllaDbDatabase, TestClock>;

    /// Creates a new ScyllaDB-backed storage under the namespace `<prefix>_<n>`, where
    /// `n` is the number of `build` calls made on this builder so far (including this
    /// one).
    ///
    /// # Errors
    ///
    /// Returns an error if the test configuration or the storage cannot be created.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let test_config = ScyllaDbDatabase::new_test_config().await?;
        // The namespace prefix is chosen lazily, the first time a storage is built.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let (prefix, index) = (&self.namespace, self.instance_counter);
        let instance_namespace = format!("{prefix}_{index}");
        let storage = DbStorage::new_for_testing(
            test_config,
            &instance_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns this builder's test clock, clones of which are given to the storages.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1367
/// Extension trait for unwrapping a nested result down to its final value in one call.
///
/// Implemented in this file for `Result<ClientOutcome<T>, E>`.
pub trait ClientOutcomeResultExt<T, E> {
    /// Consumes `self` and returns the inner `T`, panicking if it cannot be extracted.
    // NOTE(review): the name suggests this expects an `Ok` result holding a committed
    // outcome — confirm against `ClientOutcome::unwrap`.
    fn unwrap_ok_committed(self) -> T;
}
1371
1372impl<T, E: std::fmt::Debug> ClientOutcomeResultExt<T, E> for Result<ClientOutcome<T>, E> {
1373 fn unwrap_ok_committed(self) -> T {
1374 self.unwrap().unwrap()
1375 }
1376}