1use std::{
5 collections::{BTreeMap, HashMap, HashSet},
6 sync::Arc,
7 time::Duration,
8 vec,
9};
10
11use async_trait::async_trait;
12use futures::{
13 future::Either,
14 lock::{Mutex, MutexGuard},
15 Future,
16};
17use linera_base::{
18 crypto::{AccountPublicKey, CryptoHash, ValidatorKeypair, ValidatorPublicKey},
19 data_types::*,
20 identifiers::{AccountOwner, BlobId, ChainId},
21 ownership::ChainOwnership,
22};
23use linera_chain::{
24 data_types::BlockProposal,
25 types::{
26 CertificateKind, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27 LiteCertificate, Timeout, ValidatedBlock,
28 },
29};
30use linera_execution::{committee::Committee, ResourceControlPolicy, WasmRuntime};
31use linera_storage::{DbStorage, ResultReadCertificates, Storage, TestClock};
32#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
33use linera_storage_service::client::StorageServiceDatabase;
34use linera_version::VersionInfo;
35#[cfg(feature = "dynamodb")]
36use linera_views::dynamo_db::DynamoDbDatabase;
37#[cfg(feature = "scylladb")]
38use linera_views::scylla_db::ScyllaDbDatabase;
39use linera_views::{
40 memory::MemoryDatabase, random::generate_test_namespace, store::TestKeyValueDatabase as _,
41};
42use tokio::sync::oneshot;
43use tokio_stream::wrappers::UnboundedReceiverStream;
44#[cfg(feature = "rocksdb")]
45use {
46 linera_views::rocks_db::RocksDbDatabase,
47 tokio::sync::{Semaphore, SemaphorePermit},
48};
49
50use crate::{
51 chain_worker::ChainWorkerConfig,
52 client::{chain_client, Client},
53 data_types::*,
54 environment::{TestSigner, TestWallet},
55 node::{
56 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
57 ValidatorNodeProvider,
58 },
59 notifier::ChannelNotifier,
60 worker::{Notification, ProcessableCertificate, WorkerState},
61};
62
/// The kind of misbehavior a `LocalValidatorClient` simulates when it serves requests.
///
/// `Eq` is derived alongside `PartialEq` because equality on this fieldless enum is total.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum FaultType {
    /// Every request is handled normally.
    Honest,
    /// Block proposals, certificates and chain info queries all fail with an "offline" I/O error.
    Offline,
    /// Block proposals and certificates fail like `Offline`, but chain info queries are answered.
    OfflineWithInfo,
    /// Block proposals, certificates and chain info queries fail with an inactive-chain error.
    NoChains,
    /// Validated-block certificates are processed, but an error is returned instead of the
    /// result.
    DontSendConfirmVote,
    /// Validated-block certificates are rejected without being processed.
    DontProcessValidated,
    /// Block proposals are processed, but an error is returned instead of the result.
    DontSendValidateVote,
}
73
74struct LocalValidator<S>
81where
82 S: Storage,
83{
84 state: WorkerState<S>,
85 notifier: Arc<ChannelNotifier<Notification>>,
86}
87
88#[derive(Clone)]
89pub struct LocalValidatorClient<S>
90where
91 S: Storage,
92{
93 public_key: ValidatorPublicKey,
94 client: Arc<Mutex<LocalValidator<S>>>,
95 fault_type: FaultType,
96}
97
98impl<S> ValidatorNode for LocalValidatorClient<S>
99where
100 S: Storage + Clone + Send + Sync + 'static,
101{
102 type NotificationStream = NotificationStream;
103
104 fn address(&self) -> String {
105 format!("local:{}", self.public_key)
106 }
107
108 async fn handle_block_proposal(
109 &self,
110 proposal: BlockProposal,
111 ) -> Result<ChainInfoResponse, NodeError> {
112 self.spawn_and_receive(move |validator, sender| {
113 validator.do_handle_block_proposal(proposal, sender)
114 })
115 .await
116 }
117
118 async fn handle_lite_certificate(
119 &self,
120 certificate: LiteCertificate<'_>,
121 _delivery: CrossChainMessageDelivery,
122 ) -> Result<ChainInfoResponse, NodeError> {
123 let certificate = certificate.cloned();
124 self.spawn_and_receive(move |validator, sender| {
125 validator.do_handle_lite_certificate(certificate, sender)
126 })
127 .await
128 }
129
130 async fn handle_timeout_certificate(
131 &self,
132 certificate: GenericCertificate<Timeout>,
133 ) -> Result<ChainInfoResponse, NodeError> {
134 self.spawn_and_receive(move |validator, sender| {
135 validator.do_handle_certificate(certificate, sender)
136 })
137 .await
138 }
139
140 async fn handle_validated_certificate(
141 &self,
142 certificate: GenericCertificate<ValidatedBlock>,
143 ) -> Result<ChainInfoResponse, NodeError> {
144 self.spawn_and_receive(move |validator, sender| {
145 validator.do_handle_certificate(certificate, sender)
146 })
147 .await
148 }
149
150 async fn handle_confirmed_certificate(
151 &self,
152 certificate: GenericCertificate<ConfirmedBlock>,
153 _delivery: CrossChainMessageDelivery,
154 ) -> Result<ChainInfoResponse, NodeError> {
155 self.spawn_and_receive(move |validator, sender| {
156 validator.do_handle_certificate(certificate, sender)
157 })
158 .await
159 }
160
161 async fn handle_chain_info_query(
162 &self,
163 query: ChainInfoQuery,
164 ) -> Result<ChainInfoResponse, NodeError> {
165 self.spawn_and_receive(move |validator, sender| {
166 validator.do_handle_chain_info_query(query, sender)
167 })
168 .await
169 }
170
171 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
172 self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
173 .await
174 }
175
176 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
177 Ok(Default::default())
178 }
179
180 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
181 Ok(self
182 .client
183 .lock()
184 .await
185 .state
186 .storage_client()
187 .read_network_description()
188 .await
189 .transpose()
190 .ok_or_else(|| NodeError::ViewError {
191 error: "missing NetworkDescription".to_owned(),
192 })??)
193 }
194
195 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
196 self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
197 .await
198 }
199
200 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
201 self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
202 .await
203 }
204
205 async fn download_pending_blob(
206 &self,
207 chain_id: ChainId,
208 blob_id: BlobId,
209 ) -> Result<BlobContent, NodeError> {
210 self.spawn_and_receive(move |validator, sender| {
211 validator.do_download_pending_blob(chain_id, blob_id, sender)
212 })
213 .await
214 }
215
216 async fn handle_pending_blob(
217 &self,
218 chain_id: ChainId,
219 blob: BlobContent,
220 ) -> Result<ChainInfoResponse, NodeError> {
221 self.spawn_and_receive(move |validator, sender| {
222 validator.do_handle_pending_blob(chain_id, blob, sender)
223 })
224 .await
225 }
226
227 async fn download_certificate(
228 &self,
229 hash: CryptoHash,
230 ) -> Result<ConfirmedBlockCertificate, NodeError> {
231 self.spawn_and_receive(move |validator, sender| {
232 validator.do_download_certificate(hash, sender)
233 })
234 .await
235 }
236
237 async fn download_certificates(
238 &self,
239 hashes: Vec<CryptoHash>,
240 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
241 self.spawn_and_receive(move |validator, sender| {
242 validator.do_download_certificates(hashes, sender)
243 })
244 .await
245 }
246
247 async fn download_certificates_by_heights(
248 &self,
249 chain_id: ChainId,
250 heights: Vec<BlockHeight>,
251 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
252 self.spawn_and_receive(move |validator, sender| {
253 validator.do_download_certificates_by_heights(chain_id, heights, sender)
254 })
255 .await
256 }
257
258 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
259 self.spawn_and_receive(move |validator, sender| {
260 validator.do_blob_last_used_by(blob_id, sender)
261 })
262 .await
263 }
264
265 async fn blob_last_used_by_certificate(
266 &self,
267 blob_id: BlobId,
268 ) -> Result<ConfirmedBlockCertificate, NodeError> {
269 self.spawn_and_receive(move |validator, sender| {
270 validator.do_blob_last_used_by_certificate(blob_id, sender)
271 })
272 .await
273 }
274
275 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
276 self.spawn_and_receive(move |validator, sender| {
277 validator.do_missing_blob_ids(blob_ids, sender)
278 })
279 .await
280 }
281
282 async fn get_shard_info(
283 &self,
284 _chain_id: ChainId,
285 ) -> Result<crate::data_types::ShardInfo, NodeError> {
286 Ok(crate::data_types::ShardInfo {
288 shard_id: 0,
289 total_shards: 1,
290 })
291 }
292}
293
294impl<S> LocalValidatorClient<S>
295where
296 S: Storage + Clone + Send + Sync + 'static,
297{
298 fn new(public_key: ValidatorPublicKey, state: WorkerState<S>) -> Self {
299 let client = LocalValidator {
300 state,
301 notifier: Arc::new(ChannelNotifier::default()),
302 };
303 Self {
304 public_key,
305 client: Arc::new(Mutex::new(client)),
306 fault_type: FaultType::Honest,
307 }
308 }
309
310 pub fn name(&self) -> ValidatorPublicKey {
311 self.public_key
312 }
313
314 fn set_fault_type(&mut self, fault_type: FaultType) {
315 self.fault_type = fault_type;
316 }
317
318 pub async fn chain_info_with_manager_values(
320 &mut self,
321 chain_id: ChainId,
322 ) -> Result<Box<ChainInfo>, NodeError> {
323 let query = ChainInfoQuery::new(chain_id).with_manager_values();
324 let response = self.handle_chain_info_query(query).await?;
325 Ok(response.info)
326 }
327
328 async fn spawn_and_receive<F, R, T>(&self, f: F) -> T
331 where
332 T: Send + 'static,
333 R: Future<Output = Result<(), T>> + Send,
334 F: FnOnce(Self, oneshot::Sender<T>) -> R + Send + 'static,
335 {
336 let validator = self.clone();
337 let (sender, receiver) = oneshot::channel();
338 tokio::spawn(async move {
339 if f(validator, sender).await.is_err() {
340 tracing::debug!("result could not be sent");
341 }
342 });
343 receiver.await.unwrap()
344 }
345
346 async fn do_handle_block_proposal(
347 self,
348 proposal: BlockProposal,
349 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
350 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
351 let result = match self.fault_type {
352 FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
353 error: "offline".to_string(),
354 }),
355 FaultType::NoChains => Err(NodeError::InactiveChain(proposal.content.block.chain_id)),
356 FaultType::DontSendValidateVote
357 | FaultType::Honest
358 | FaultType::DontSendConfirmVote
359 | FaultType::DontProcessValidated => {
360 let result = self
361 .client
362 .lock()
363 .await
364 .state
365 .handle_block_proposal(proposal)
366 .await
367 .map_err(Into::into);
368 if self.fault_type == FaultType::DontSendValidateVote {
369 Err(NodeError::ClientIoError {
370 error: "refusing to validate".to_string(),
371 })
372 } else {
373 result
374 }
375 }
376 };
377 sender.send(result.map(|(info, _actions)| info))
379 }
380
381 async fn do_handle_lite_certificate(
382 self,
383 certificate: LiteCertificate<'_>,
384 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
385 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
386 let client = self.client.clone();
387 let validator = client.lock().await;
388 let result = async move {
389 match validator.state.full_certificate(certificate).await? {
390 Either::Left(confirmed) => {
391 self.do_handle_certificate_internal(confirmed, &validator)
392 .await
393 }
394 Either::Right(validated) => {
395 self.do_handle_certificate_internal(validated, &validator)
396 .await
397 }
398 }
399 }
400 .await;
401 sender.send(result)
402 }
403
404 async fn do_handle_certificate_internal<T: ProcessableCertificate>(
405 &self,
406 certificate: GenericCertificate<T>,
407 validator: &MutexGuard<'_, LocalValidator<S>>,
408 ) -> Result<ChainInfoResponse, NodeError> {
409 match self.fault_type {
410 FaultType::DontProcessValidated if T::KIND == CertificateKind::Validated => {
411 Err(NodeError::ClientIoError {
412 error: "refusing to process validated block".to_string(),
413 })
414 }
415 FaultType::NoChains => Err(NodeError::InactiveChain(certificate.value().chain_id())),
416 FaultType::Honest
417 | FaultType::DontSendConfirmVote
418 | FaultType::DontProcessValidated
419 | FaultType::DontSendValidateVote => {
420 let result = validator
421 .state
422 .fully_handle_certificate_with_notifications(certificate, &validator.notifier)
423 .await
424 .map_err(Into::into);
425 if T::KIND == CertificateKind::Validated
426 && self.fault_type == FaultType::DontSendConfirmVote
427 {
428 Err(NodeError::ClientIoError {
429 error: "refusing to confirm".to_string(),
430 })
431 } else {
432 result
433 }
434 }
435 FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
436 error: "offline".to_string(),
437 }),
438 }
439 }
440
441 async fn do_handle_certificate<T: ProcessableCertificate>(
442 self,
443 certificate: GenericCertificate<T>,
444 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
445 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
446 let validator = self.client.lock().await;
447 let result = self
448 .do_handle_certificate_internal(certificate, &validator)
449 .await;
450 sender.send(result)
451 }
452
453 async fn do_handle_chain_info_query(
454 self,
455 query: ChainInfoQuery,
456 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
457 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
458 let validator = self.client.lock().await;
459 let result = match self.fault_type {
460 FaultType::Offline => Err(NodeError::ClientIoError {
461 error: "offline".to_string(),
462 }),
463 FaultType::NoChains => Err(NodeError::InactiveChain(query.chain_id)),
464 FaultType::Honest
465 | FaultType::DontSendConfirmVote
466 | FaultType::DontProcessValidated
467 | FaultType::DontSendValidateVote
468 | FaultType::OfflineWithInfo => validator
469 .state
470 .handle_chain_info_query(query)
471 .await
472 .map_err(Into::into),
473 };
474 sender.send(result.map(|(info, _actions)| info))
476 }
477
478 async fn do_subscribe(
479 self,
480 chains: Vec<ChainId>,
481 sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
482 ) -> Result<(), Result<NotificationStream, NodeError>> {
483 let validator = self.client.lock().await;
484 let rx = validator.notifier.subscribe(chains);
485 let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
486 sender.send(Ok(stream))
487 }
488
489 async fn do_upload_blob(
490 self,
491 content: BlobContent,
492 sender: oneshot::Sender<Result<BlobId, NodeError>>,
493 ) -> Result<(), Result<BlobId, NodeError>> {
494 let validator = self.client.lock().await;
495 let blob = Blob::new(content);
496 let id = blob.id();
497 let storage = validator.state.storage_client();
498 let result = match storage.maybe_write_blobs(&[blob]).await {
499 Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
500 Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
501 Err(error) => Err(error.into()),
502 };
503 sender.send(result)
504 }
505
506 async fn do_download_blob(
507 self,
508 blob_id: BlobId,
509 sender: oneshot::Sender<Result<BlobContent, NodeError>>,
510 ) -> Result<(), Result<BlobContent, NodeError>> {
511 let validator = self.client.lock().await;
512 let blob = validator
513 .state
514 .storage_client()
515 .read_blob(blob_id)
516 .await
517 .map_err(Into::into);
518 let blob = match blob {
519 Ok(blob) => blob.ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
520 Err(error) => Err(error),
521 };
522 sender.send(blob.map(|blob| blob.into_content()))
523 }
524
525 async fn do_download_pending_blob(
526 self,
527 chain_id: ChainId,
528 blob_id: BlobId,
529 sender: oneshot::Sender<Result<BlobContent, NodeError>>,
530 ) -> Result<(), Result<BlobContent, NodeError>> {
531 let validator = self.client.lock().await;
532 let result = validator
533 .state
534 .download_pending_blob(chain_id, blob_id)
535 .await
536 .map_err(Into::into);
537 sender.send(result.map(|blob| blob.into_content()))
538 }
539
540 async fn do_handle_pending_blob(
541 self,
542 chain_id: ChainId,
543 blob: BlobContent,
544 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
545 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
546 let validator = self.client.lock().await;
547 let result = validator
548 .state
549 .handle_pending_blob(chain_id, Blob::new(blob))
550 .await
551 .map_err(Into::into);
552 sender.send(result)
553 }
554
555 async fn do_download_certificate(
556 self,
557 hash: CryptoHash,
558 sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
559 ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
560 let validator = self.client.lock().await;
561 let certificate = validator
562 .state
563 .storage_client()
564 .read_certificate(hash)
565 .await
566 .map_err(Into::into);
567
568 let certificate = match certificate {
569 Err(error) => Err(error),
570 Ok(entry) => match entry {
571 Some(certificate) => Ok(certificate),
572 None => {
573 panic!("Missing certificate: {hash}");
574 }
575 },
576 };
577
578 sender.send(certificate)
579 }
580
581 async fn do_download_certificates(
582 self,
583 hashes: Vec<CryptoHash>,
584 sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
585 ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
586 let validator = self.client.lock().await;
587 let certificates = validator
588 .state
589 .storage_client()
590 .read_certificates(&hashes)
591 .await
592 .map_err(Into::into);
593
594 let certificates = match certificates {
595 Err(error) => Err(error),
596 Ok(certificates) => match ResultReadCertificates::new(certificates, hashes) {
597 ResultReadCertificates::Certificates(certificates) => Ok(certificates),
598 ResultReadCertificates::InvalidHashes(hashes) => {
599 panic!("Missing certificates: {:?}", hashes)
600 }
601 },
602 };
603
604 sender.send(certificates)
605 }
606
607 async fn do_download_certificates_by_heights(
608 self,
609 chain_id: ChainId,
610 heights: Vec<BlockHeight>,
611 sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
612 ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
613 let (query_sender, query_receiver) = oneshot::channel();
615 let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_by_heights(heights);
616
617 let self_clone = self.clone();
618 self.do_handle_chain_info_query(query, query_sender)
619 .await
620 .expect("Failed to handle chain info query");
621
622 let chain_info_response = query_receiver.await.map_err(|_| {
624 Err(NodeError::ClientIoError {
625 error: "Failed to receive chain info response".to_string(),
626 })
627 })?;
628
629 let hashes = match chain_info_response {
630 Ok(response) => response.info.requested_sent_certificate_hashes,
631 Err(e) => {
632 return sender.send(Err(e));
633 }
634 };
635
636 let (cert_sender, cert_receiver) = oneshot::channel();
638 self_clone
639 .do_download_certificates(hashes, cert_sender)
640 .await?;
641
642 let result = cert_receiver.await.map_err(|_| {
644 Err(NodeError::ClientIoError {
645 error: "Failed to receive certificates".to_string(),
646 })
647 })?;
648
649 sender.send(result)
650 }
651
652 async fn do_blob_last_used_by(
653 self,
654 blob_id: BlobId,
655 sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
656 ) -> Result<(), Result<CryptoHash, NodeError>> {
657 let validator = self.client.lock().await;
658 let blob_state = validator
659 .state
660 .storage_client()
661 .read_blob_state(blob_id)
662 .await
663 .map_err(Into::into);
664 let certificate_hash = match blob_state {
665 Err(err) => Err(err),
666 Ok(blob_state) => match blob_state {
667 None => Err(NodeError::BlobsNotFound(vec![blob_id])),
668 Some(blob_state) => blob_state
669 .last_used_by
670 .ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
671 },
672 };
673
674 sender.send(certificate_hash)
675 }
676
677 async fn do_blob_last_used_by_certificate(
678 self,
679 blob_id: BlobId,
680 sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
681 ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
682 match self.blob_last_used_by(blob_id).await {
683 Ok(cert_hash) => {
684 let cert = self.download_certificate(cert_hash).await;
685 sender.send(cert)
686 }
687 Err(err) => sender.send(Err(err)),
688 }
689 }
690
691 async fn do_missing_blob_ids(
692 self,
693 blob_ids: Vec<BlobId>,
694 sender: oneshot::Sender<Result<Vec<BlobId>, NodeError>>,
695 ) -> Result<(), Result<Vec<BlobId>, NodeError>> {
696 let validator = self.client.lock().await;
697 let missing_blob_ids = validator
698 .state
699 .storage_client()
700 .missing_blobs(&blob_ids)
701 .await
702 .map_err(Into::into);
703 sender.send(missing_blob_ids)
704 }
705}
706
707#[derive(Clone)]
708pub struct NodeProvider<S>(Arc<std::sync::Mutex<Vec<LocalValidatorClient<S>>>>)
709where
710 S: Storage;
711
712impl<S> NodeProvider<S>
713where
714 S: Storage + Clone,
715{
716 fn all_nodes(&self) -> Vec<LocalValidatorClient<S>> {
717 self.0.lock().unwrap().clone()
718 }
719}
720
721impl<S> ValidatorNodeProvider for NodeProvider<S>
722where
723 S: Storage + Clone + Send + Sync + 'static,
724{
725 type Node = LocalValidatorClient<S>;
726
727 fn make_node(&self, _name: &str) -> Result<Self::Node, NodeError> {
728 unimplemented!()
729 }
730
731 fn make_nodes_from_list<A>(
732 &self,
733 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
734 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
735 where
736 A: AsRef<str>,
737 {
738 let list = self.0.lock().unwrap();
739 Ok(validators
740 .into_iter()
741 .map(|(public_key, address)| {
742 list.iter()
743 .find(|client| client.public_key == public_key)
744 .ok_or_else(|| NodeError::CannotResolveValidatorAddress {
745 address: address.as_ref().to_string(),
746 })
747 .map(|client| (public_key, client.clone()))
748 })
749 .collect::<Result<Vec<_>, _>>()?
750 .into_iter())
751 }
752}
753
754impl<S> FromIterator<LocalValidatorClient<S>> for NodeProvider<S>
755where
756 S: Storage,
757{
758 fn from_iter<T>(iter: T) -> Self
759 where
760 T: IntoIterator<Item = LocalValidatorClient<S>>,
761 {
762 Self(Arc::new(std::sync::Mutex::new(iter.into_iter().collect())))
763 }
764}
765
766pub struct TestBuilder<B: StorageBuilder> {
773 storage_builder: B,
774 pub initial_committee: Committee,
775 admin_description: Option<ChainDescription>,
776 network_description: Option<NetworkDescription>,
777 genesis_storage_builder: GenesisStorageBuilder,
778 node_provider: NodeProvider<B::Storage>,
779 validator_storages: HashMap<ValidatorPublicKey, B::Storage>,
780 chain_client_storages: Vec<B::Storage>,
781 pub chain_owners: BTreeMap<ChainId, AccountOwner>,
782 pub signer: TestSigner,
783}
784
785#[async_trait]
786pub trait StorageBuilder {
787 type Storage: Storage + Clone + Send + Sync + 'static;
788
789 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error>;
790
791 fn clock(&self) -> &TestClock;
792}
793
/// Accumulates the chains registered through `add` so that `build` can create all of them in a
/// given storage.
#[derive(Default)]
struct GenesisStorageBuilder {
    /// The registered accounts, in insertion order.
    accounts: Vec<GenesisAccount>,
}
798
/// A chain registered with the `GenesisStorageBuilder`, paired with a public key.
struct GenesisAccount {
    /// The description passed to `create_chain` when a storage is built.
    description: ChainDescription,
    /// The account public key registered alongside the chain.
    public_key: AccountPublicKey,
}
803
804impl GenesisStorageBuilder {
805 fn add(&mut self, description: ChainDescription, public_key: AccountPublicKey) {
806 self.accounts.push(GenesisAccount {
807 description,
808 public_key,
809 })
810 }
811
812 async fn build<S>(&self, storage: S) -> S
813 where
814 S: Storage + Clone + Send + Sync + 'static,
815 {
816 for account in &self.accounts {
817 storage
818 .create_chain(account.description.clone())
819 .await
820 .unwrap();
821 }
822 storage
823 }
824}
825
/// The chain client type used in tests: a `crate::client::ChainClient` whose environment uses
/// storage `S` and a `NodeProvider<S>`.
pub type ChainClient<S> = crate::client::ChainClient<crate::environment::Impl<S, NodeProvider<S>>>;
827
828impl<S: Storage + Clone + Send + Sync + 'static> ChainClient<S> {
829 pub async fn read_confirmed_blocks_downward(
831 &self,
832 from: CryptoHash,
833 limit: u32,
834 ) -> anyhow::Result<Vec<ConfirmedBlock>> {
835 let mut hash = Some(from);
836 let mut values = Vec::new();
837 for _ in 0..limit {
838 let Some(next_hash) = hash else {
839 break;
840 };
841 let value = self.read_confirmed_block(next_hash).await?;
842 hash = value.block().header.previous_block_hash;
843 values.push(value);
844 }
845 Ok(values)
846 }
847}
848
849impl<B> TestBuilder<B>
850where
851 B: StorageBuilder,
852{
853 pub async fn new(
854 mut storage_builder: B,
855 count: usize,
856 with_faulty_validators: usize,
857 mut signer: TestSigner,
858 ) -> Result<Self, anyhow::Error> {
859 let mut validators = Vec::new();
860 for _ in 0..count {
861 let validator_keypair = ValidatorKeypair::generate();
862 let account_public_key = signer.generate_new();
863 validators.push((validator_keypair, account_public_key));
864 }
865 let for_committee = validators
866 .iter()
867 .map(|(validating, account)| (validating.public_key, *account))
868 .collect::<Vec<_>>();
869 let initial_committee = Committee::make_simple(for_committee);
870 let mut validator_clients = Vec::new();
871 let mut validator_storages = HashMap::new();
872 let mut faulty_validators = HashSet::new();
873 for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() {
874 let validator_public_key = validator_keypair.public_key;
875 let storage = storage_builder.build().await?;
876 let config = ChainWorkerConfig {
877 nickname: format!("Node {}", i),
878 ..ChainWorkerConfig::default()
879 }
880 .with_key_pair(Some(validator_keypair.secret_key));
881 let state = WorkerState::new(storage.clone(), config, None);
882 let mut validator = LocalValidatorClient::new(validator_public_key, state);
883 if i < with_faulty_validators {
884 faulty_validators.insert(validator_public_key);
885 validator.set_fault_type(FaultType::NoChains);
886 }
887 validator_clients.push(validator);
888 validator_storages.insert(validator_public_key, storage);
889 }
890 tracing::info!(
891 "Test will use the following faulty validators: {:?}",
892 faulty_validators
893 );
894 Ok(Self {
895 storage_builder,
896 initial_committee,
897 admin_description: None,
898 network_description: None,
899 genesis_storage_builder: GenesisStorageBuilder::default(),
900 node_provider: NodeProvider::from_iter(validator_clients),
901 validator_storages,
902 chain_client_storages: Vec::new(),
903 chain_owners: BTreeMap::new(),
904 signer,
905 })
906 }
907
908 pub fn with_policy(mut self, policy: ResourceControlPolicy) -> Self {
909 let validators = self.initial_committee.validators().clone();
910 self.initial_committee =
911 Committee::new(validators, policy).expect("committee votes should not overflow");
912 self
913 }
914
915 pub fn set_fault_type(&mut self, indexes: impl AsRef<[usize]>, fault_type: FaultType) {
916 let mut faulty_validators = vec![];
917 let mut validator_clients = self.node_provider.0.lock().unwrap();
918 for index in indexes.as_ref() {
919 let validator = &mut validator_clients[*index];
920 validator.set_fault_type(fault_type);
921 faulty_validators.push(validator.public_key);
922 }
923 tracing::info!(
924 "Making the following validators {:?}: {:?}",
925 fault_type,
926 faulty_validators
927 );
928 }
929
930 pub async fn add_root_chain(
935 &mut self,
936 index: u32,
937 balance: Amount,
938 ) -> anyhow::Result<ChainClient<B::Storage>> {
939 if self.admin_description.is_none() && index != 0 {
941 Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
942 }
943 let origin = ChainOrigin::Root(index);
944 let public_key = self.signer.generate_new();
945 let open_chain_config = InitialChainConfig {
946 ownership: ChainOwnership::single(public_key.into()),
947 epoch: Epoch(0),
948 min_active_epoch: Epoch(0),
949 max_active_epoch: Epoch(0),
950 balance,
951 application_permissions: ApplicationPermissions::default(),
952 };
953 let description = ChainDescription::new(origin, open_chain_config, Timestamp::from(0));
954 let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
955 if index == 0 {
956 self.admin_description = Some(description.clone());
957 self.network_description = Some(NetworkDescription {
958 admin_chain_id: description.id(),
959 genesis_config_hash: CryptoHash::test_hash("genesis config"),
961 genesis_timestamp: Timestamp::from(0),
962 genesis_committee_blob_hash: committee_blob.id().hash,
963 name: "test network".to_string(),
964 });
965 }
966 self.genesis_storage_builder
968 .add(description.clone(), public_key);
969
970 let network_description = self.network_description.as_ref().unwrap();
971
972 for validator in self.node_provider.all_nodes() {
973 let storage = self
974 .validator_storages
975 .get_mut(&validator.public_key)
976 .unwrap();
977 storage
978 .write_network_description(network_description)
979 .await
980 .expect("writing the NetworkDescription should succeed");
981 storage
982 .write_blob(&committee_blob)
983 .await
984 .expect("writing a blob should succeed");
985 storage.create_chain(description.clone()).await.unwrap();
986 }
987 for storage in &mut self.chain_client_storages {
988 storage.create_chain(description.clone()).await.unwrap();
989 }
990 let chain_id = description.id();
991 self.chain_owners.insert(chain_id, public_key.into());
992 self.make_client(chain_id, None, BlockHeight::ZERO).await
993 }
994
995 pub fn genesis_chains(&self) -> Vec<(AccountPublicKey, Amount)> {
996 let mut result = Vec::new();
997 for (i, genesis_account) in self.genesis_storage_builder.accounts.iter().enumerate() {
998 assert_eq!(
999 genesis_account.description.origin(),
1000 ChainOrigin::Root(i as u32)
1001 );
1002 result.push((
1003 genesis_account.public_key,
1004 genesis_account.description.config().balance,
1005 ));
1006 }
1007 result
1008 }
1009
1010 pub fn admin_chain_id(&self) -> ChainId {
1011 self.admin_description
1012 .as_ref()
1013 .expect("admin chain not initialized")
1014 .id()
1015 }
1016
1017 pub fn admin_description(&self) -> Option<&ChainDescription> {
1018 self.admin_description.as_ref()
1019 }
1020
1021 pub fn make_node_provider(&self) -> NodeProvider<B::Storage> {
1022 self.node_provider.clone()
1023 }
1024
1025 pub fn node(&mut self, index: usize) -> LocalValidatorClient<B::Storage> {
1026 self.node_provider.0.lock().unwrap()[index].clone()
1027 }
1028
1029 pub async fn make_storage(&mut self) -> anyhow::Result<B::Storage> {
1030 let storage = self.storage_builder.build().await?;
1031 let network_description = self.network_description.as_ref().unwrap();
1032 let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
1033 storage
1034 .write_network_description(network_description)
1035 .await
1036 .expect("writing the NetworkDescription should succeed");
1037 storage
1038 .write_blob(&committee_blob)
1039 .await
1040 .expect("writing a blob should succeed");
1041 Ok(self.genesis_storage_builder.build(storage).await)
1042 }
1043
1044 pub async fn make_client_with_options(
1045 &mut self,
1046 chain_id: ChainId,
1047 block_hash: Option<CryptoHash>,
1048 block_height: BlockHeight,
1049 options: chain_client::Options,
1050 follow_only: bool,
1051 ) -> anyhow::Result<ChainClient<B::Storage>> {
1052 let storage = self.make_storage().await?;
1055 self.chain_client_storages.push(storage.clone());
1056 let mode = if follow_only {
1057 crate::client::ListeningMode::FollowChain
1058 } else {
1059 crate::client::ListeningMode::FullChain
1060 };
1061 let client = Arc::new(Client::new(
1062 crate::environment::Impl {
1063 network: self.make_node_provider(),
1064 storage,
1065 signer: self.signer.clone(),
1066 wallet: TestWallet::default(),
1067 },
1068 self.admin_chain_id(),
1069 false,
1070 [(chain_id, mode)],
1071 format!("Client node for {:.8}", chain_id),
1072 Some(Duration::from_secs(30)),
1073 Some(Duration::from_secs(1)),
1074 options,
1075 5_000,
1076 10_000,
1077 &crate::client::RequestsSchedulerConfig::default(),
1078 ));
1079 Ok(client.create_chain_client(
1080 chain_id,
1081 block_hash,
1082 block_height,
1083 &None,
1084 self.chain_owners.get(&chain_id).copied(),
1085 None,
1086 follow_only,
1087 ))
1088 }
1089
1090 pub async fn make_client(
1091 &mut self,
1092 chain_id: ChainId,
1093 block_hash: Option<CryptoHash>,
1094 block_height: BlockHeight,
1095 ) -> anyhow::Result<ChainClient<B::Storage>> {
1096 self.make_client_with_options(
1097 chain_id,
1098 block_hash,
1099 block_height,
1100 chain_client::Options::test_default(),
1101 false,
1102 )
1103 .await
1104 }
1105
1106 pub async fn check_that_validators_have_certificate(
1108 &self,
1109 chain_id: ChainId,
1110 block_height: BlockHeight,
1111 target_count: usize,
1112 ) -> Option<ConfirmedBlockCertificate> {
1113 let query = ChainInfoQuery::new(chain_id)
1114 .with_sent_certificate_hashes_by_heights(vec![block_height]);
1115 let mut count = 0;
1116 let mut certificate = None;
1117 for validator in self.node_provider.all_nodes() {
1118 if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1119 if response.check(validator.public_key).is_ok() {
1120 let ChainInfo {
1121 mut requested_sent_certificate_hashes,
1122 ..
1123 } = *response.info;
1124 debug_assert!(requested_sent_certificate_hashes.len() <= 1);
1125 if let Some(cert_hash) = requested_sent_certificate_hashes.pop() {
1126 if let Ok(cert) = validator.download_certificate(cert_hash).await {
1127 if cert.inner().block().header.chain_id == chain_id
1128 && cert.inner().block().header.height == block_height
1129 {
1130 cert.check(&self.initial_committee).unwrap();
1131 count += 1;
1132 certificate = Some(cert);
1133 }
1134 }
1135 }
1136 }
1137 }
1138 }
1139 assert!(count >= target_count);
1140 certificate
1141 }
1142
1143 pub async fn check_that_validators_are_in_round(
1146 &self,
1147 chain_id: ChainId,
1148 block_height: BlockHeight,
1149 round: Round,
1150 target_count: usize,
1151 ) {
1152 let query = ChainInfoQuery::new(chain_id);
1153 let mut count = 0;
1154 for validator in self.node_provider.all_nodes() {
1155 if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1156 if response.info.manager.current_round == round
1157 && response.info.next_block_height == block_height
1158 && response.check(validator.public_key).is_ok()
1159 {
1160 count += 1;
1161 }
1162 }
1163 }
1164 assert!(count >= target_count);
1165 }
1166
1167 pub async fn check_that_validators_have_empty_outboxes(&self, chain_id: ChainId) {
1169 for validator in self.node_provider.all_nodes() {
1170 let guard = validator.client.lock().await;
1171 let chain = guard.state.chain_state_view(chain_id).await.unwrap();
1172 assert_eq!(chain.outboxes.indices().await.unwrap(), []);
1173 }
1174 }
1175}
1176
/// Limits how many `RocksDbStorageBuilder`s can exist at the same time: each builder
/// acquires one of the five permits in `RocksDbStorageBuilder::new` and keeps it in its
/// `_permit` field for as long as the builder lives.
#[cfg(feature = "rocksdb")]
static ROCKS_DB_SEMAPHORE: Semaphore = Semaphore::const_new(5);
1180
1181#[derive(Default)]
1182pub struct MemoryStorageBuilder {
1183 namespace: String,
1184 instance_counter: usize,
1185 wasm_runtime: Option<WasmRuntime>,
1186 clock: TestClock,
1187}
1188
1189#[async_trait]
1190impl StorageBuilder for MemoryStorageBuilder {
1191 type Storage = DbStorage<MemoryDatabase, TestClock>;
1192
1193 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1194 self.instance_counter += 1;
1195 let config = MemoryDatabase::new_test_config().await?;
1196 if self.namespace.is_empty() {
1197 self.namespace = generate_test_namespace();
1198 }
1199 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1200 Ok(
1201 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1202 .await?,
1203 )
1204 }
1205
1206 fn clock(&self) -> &TestClock {
1207 &self.clock
1208 }
1209}
1210
1211impl MemoryStorageBuilder {
1212 pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1215 MemoryStorageBuilder {
1216 wasm_runtime: wasm_runtime.into(),
1217 ..MemoryStorageBuilder::default()
1218 }
1219 }
1220}
1221
/// A `StorageBuilder` that creates test storages on top of `RocksDbDatabase`.
///
/// Every builder holds a permit of `ROCKS_DB_SEMAPHORE` for as long as it lives.
#[cfg(feature = "rocksdb")]
pub struct RocksDbStorageBuilder {
    /// Shared namespace prefix; generated on the first call to `build` if still empty.
    namespace: String,
    /// Number of times `build` has been called; used as the namespace suffix.
    instance_counter: usize,
    /// Wasm runtime passed to every storage that is built.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock; every storage that is built receives a clone of it.
    clock: TestClock,
    /// Permit acquired in `new`; it is never read, only kept alive with the builder.
    _permit: SemaphorePermit<'static>,
}

#[cfg(feature = "rocksdb")]
impl RocksDbStorageBuilder {
    /// Creates a builder without a Wasm runtime, waiting until a permit of
    /// `ROCKS_DB_SEMAPHORE` is available.
    ///
    /// # Panics
    ///
    /// Panics if acquiring the permit fails.
    pub async fn new() -> Self {
        let namespace = String::new();
        let clock = TestClock::default();
        let permit = ROCKS_DB_SEMAPHORE.acquire().await.unwrap();
        Self {
            namespace,
            instance_counter: 0,
            wasm_runtime: None,
            clock,
            _permit: permit,
        }
    }

    /// Creates a builder like `new`, but with the given Wasm runtime.
    #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let wasm_runtime = wasm_runtime.into();
        let base = Self::new().await;
        Self {
            wasm_runtime,
            ..base
        }
    }
}

#[cfg(feature = "rocksdb")]
#[async_trait]
impl StorageBuilder for RocksDbStorageBuilder {
    type Storage = DbStorage<RocksDbDatabase, TestClock>;

    /// Builds a new storage in the namespace `<namespace>_<instance_counter>`.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let config = RocksDbDatabase::new_test_config().await?;
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let clock = self.clock.clone();
        let storage =
            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, clock).await?;
        Ok(storage)
    }

    /// Returns the test clock that is cloned into the storages built by this builder.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1276
/// A `StorageBuilder` that creates test storages on top of `StorageServiceDatabase`.
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[derive(Default)]
pub struct ServiceStorageBuilder {
    /// Shared namespace prefix; generated on the first call to `build` if still empty.
    namespace: String,
    /// Number of times `build` has been called; used as the namespace suffix.
    instance_counter: usize,
    /// Wasm runtime passed to every storage that is built.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock; every storage that is built receives a clone of it.
    clock: TestClock,
}

#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
impl ServiceStorageBuilder {
    /// Creates a builder without a Wasm runtime.
    pub fn new() -> Self {
        // The derived default already leaves `wasm_runtime` unset.
        Self::default()
    }

    /// Creates a builder with the given Wasm runtime and default values for all other
    /// fields.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let wasm_runtime = wasm_runtime.into();
        Self {
            wasm_runtime,
            ..Self::default()
        }
    }
}

#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[async_trait]
impl StorageBuilder for ServiceStorageBuilder {
    type Storage = DbStorage<StorageServiceDatabase, TestClock>;

    /// Builds a new storage in the namespace `<namespace>_<instance_counter>`.
    async fn build(&mut self) -> anyhow::Result<Self::Storage> {
        self.instance_counter += 1;
        let config = StorageServiceDatabase::new_test_config().await?;
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let clock = self.clock.clone();
        let storage =
            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, clock).await?;
        Ok(storage)
    }

    /// Returns the test clock that is cloned into the storages built by this builder.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1324
/// A `StorageBuilder` that creates test storages on top of `DynamoDbDatabase`.
#[cfg(feature = "dynamodb")]
#[derive(Default)]
pub struct DynamoDbStorageBuilder {
    /// Shared namespace prefix; generated on the first call to `build` if still empty.
    namespace: String,
    /// Number of times `build` has been called; used as the namespace suffix.
    instance_counter: usize,
    /// Wasm runtime passed to every storage that is built.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock; every storage that is built receives a clone of it.
    clock: TestClock,
}

#[cfg(feature = "dynamodb")]
impl DynamoDbStorageBuilder {
    /// Creates a builder with the given Wasm runtime and default values for all other
    /// fields.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let wasm_runtime = wasm_runtime.into();
        Self {
            wasm_runtime,
            ..Self::default()
        }
    }
}

#[cfg(feature = "dynamodb")]
#[async_trait]
impl StorageBuilder for DynamoDbStorageBuilder {
    type Storage = DbStorage<DynamoDbDatabase, TestClock>;

    /// Builds a new storage in the namespace `<namespace>_<instance_counter>`.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let config = DynamoDbDatabase::new_test_config().await?;
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let clock = self.clock.clone();
        let storage =
            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, clock).await?;
        Ok(storage)
    }

    /// Returns the test clock that is cloned into the storages built by this builder.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1368
/// A `StorageBuilder` that creates test storages on top of `ScyllaDbDatabase`.
#[cfg(feature = "scylladb")]
#[derive(Default)]
pub struct ScyllaDbStorageBuilder {
    /// Shared namespace prefix; generated on the first call to `build` if still empty.
    namespace: String,
    /// Number of times `build` has been called; used as the namespace suffix.
    instance_counter: usize,
    /// Wasm runtime passed to every storage that is built.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock; every storage that is built receives a clone of it.
    clock: TestClock,
}

#[cfg(feature = "scylladb")]
impl ScyllaDbStorageBuilder {
    /// Creates a builder with the given Wasm runtime and default values for all other
    /// fields.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let wasm_runtime = wasm_runtime.into();
        Self {
            wasm_runtime,
            ..Self::default()
        }
    }
}

#[cfg(feature = "scylladb")]
#[async_trait]
impl StorageBuilder for ScyllaDbStorageBuilder {
    type Storage = DbStorage<ScyllaDbDatabase, TestClock>;

    /// Builds a new storage in the namespace `<namespace>_<instance_counter>`.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let config = ScyllaDbDatabase::new_test_config().await?;
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let clock = self.clock.clone();
        let storage =
            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, clock).await?;
        Ok(storage)
    }

    /// Returns the test clock that is cloned into the storages built by this builder.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1412
1413pub trait ClientOutcomeResultExt<T, E> {
1414 fn unwrap_ok_committed(self) -> T;
1417
1418 fn unwrap_ok_or_conflict(self) -> Result<T, Box<ConfirmedBlockCertificate>>;
1421}
1422
1423impl<T, E: std::fmt::Debug> ClientOutcomeResultExt<T, E> for Result<ClientOutcome<T>, E> {
1424 fn unwrap_ok_committed(self) -> T {
1425 match self.unwrap() {
1426 ClientOutcome::Committed(t) => t,
1427 ClientOutcome::WaitForTimeout(timeout) => {
1428 panic!("unexpected timeout: {timeout}")
1429 }
1430 ClientOutcome::Conflict(certificate) => {
1431 panic!("unexpected conflict: {}", certificate.hash())
1432 }
1433 }
1434 }
1435
1436 fn unwrap_ok_or_conflict(self) -> Result<T, Box<ConfirmedBlockCertificate>> {
1437 match self.unwrap() {
1438 ClientOutcome::Committed(t) => Ok(t),
1439 ClientOutcome::Conflict(certificate) => Err(certificate),
1440 ClientOutcome::WaitForTimeout(timeout) => {
1441 panic!("unexpected timeout: {timeout}")
1442 }
1443 }
1444 }
1445}