1use std::{
9 collections::HashMap,
10 io,
11 path::{Path, PathBuf},
12 sync::Arc,
13};
14
15use cargo_toml::Manifest;
16use futures::future;
17use linera_base::{
18 crypto::{AccountPublicKey, AccountSecretKey},
19 data_types::{
20 Amount, ApplicationDescription, Blob, BlockHeight, Bytecode, ChainDescription,
21 CompressedBytecode, Epoch,
22 },
23 identifiers::{AccountOwner, ApplicationId, ChainId, ModuleId},
24 vm::VmRuntime,
25};
26use linera_chain::{types::ConfirmedBlockCertificate, ChainExecutionContext};
27use linera_core::{data_types::ChainInfoQuery, worker::WorkerError};
28use linera_execution::{
29 system::{SystemOperation, SystemQuery, SystemResponse},
30 ExecutionError, Operation, Query, QueryOutcome, QueryResponse, ResourceTracker,
31};
32use linera_storage::Storage as _;
33use serde::Serialize;
34use tokio::{fs, sync::Mutex};
35
36use super::{BlockBuilder, TestValidator};
37use crate::{ContractAbi, ServiceAbi};
38
39pub struct ActiveChain {
41 key_pair: AccountSecretKey,
42 description: ChainDescription,
43 tip: Arc<Mutex<Option<ConfirmedBlockCertificate>>>,
44 validator: TestValidator,
45}
46
47impl Clone for ActiveChain {
48 fn clone(&self) -> Self {
49 ActiveChain {
50 key_pair: self.key_pair.copy(),
51 description: self.description.clone(),
52 tip: self.tip.clone(),
53 validator: self.validator.clone(),
54 }
55 }
56}
57
58impl ActiveChain {
59 pub fn new(
65 key_pair: AccountSecretKey,
66 description: ChainDescription,
67 validator: TestValidator,
68 ) -> Self {
69 ActiveChain {
70 key_pair,
71 description,
72 tip: Arc::default(),
73 validator,
74 }
75 }
76
77 pub fn id(&self) -> ChainId {
79 self.description.id()
80 }
81
82 pub fn public_key(&self) -> AccountPublicKey {
84 self.key_pair.public()
85 }
86
87 pub fn key_pair(&self) -> &AccountSecretKey {
89 &self.key_pair
90 }
91
92 pub fn set_key_pair(&mut self, key_pair: AccountSecretKey) {
94 self.key_pair = key_pair
95 }
96
97 pub async fn epoch(&self) -> Epoch {
99 *self
100 .validator
101 .worker()
102 .chain_state_view(self.id())
103 .await
104 .expect("Failed to load chain")
105 .execution_state
106 .system
107 .epoch
108 .get()
109 }
110
111 pub async fn chain_balance(&self) -> Amount {
113 let query = Query::System(SystemQuery);
114
115 let QueryOutcome { response, .. } = self
116 .validator
117 .worker()
118 .query_application(self.id(), query, None)
119 .await
120 .expect("Failed to query chain's balance");
121
122 let QueryResponse::System(SystemResponse { balance, .. }) = response else {
123 panic!("Unexpected response from system application");
124 };
125
126 balance
127 }
128
129 pub async fn owner_balance(&self, owner: &AccountOwner) -> Option<Amount> {
131 let chain_state = self
132 .validator
133 .worker()
134 .chain_state_view(self.id())
135 .await
136 .expect("Failed to read chain state");
137
138 chain_state
139 .execution_state
140 .system
141 .balances
142 .get(owner)
143 .await
144 .expect("Failed to read owner balance")
145 }
146
147 pub async fn owner_balances(
149 &self,
150 owners: impl IntoIterator<Item = AccountOwner>,
151 ) -> HashMap<AccountOwner, Option<Amount>> {
152 let chain_state = self
153 .validator
154 .worker()
155 .chain_state_view(self.id())
156 .await
157 .expect("Failed to read chain state");
158
159 let mut balances = HashMap::new();
160
161 for owner in owners {
162 let balance = chain_state
163 .execution_state
164 .system
165 .balances
166 .get(&owner)
167 .await
168 .expect("Failed to read an owner's balance");
169
170 balances.insert(owner, balance);
171 }
172
173 balances
174 }
175
176 pub async fn accounts(&self) -> Vec<AccountOwner> {
178 let chain_state = self
179 .validator
180 .worker()
181 .chain_state_view(self.id())
182 .await
183 .expect("Failed to read chain state");
184
185 chain_state
186 .execution_state
187 .system
188 .balances
189 .indices()
190 .await
191 .expect("Failed to list accounts on the chain")
192 }
193
194 pub async fn all_owner_balances(&self) -> HashMap<AccountOwner, Amount> {
196 self.owner_balances(self.accounts().await)
197 .await
198 .into_iter()
199 .map(|(owner, balance)| {
200 (
201 owner,
202 balance.expect("`accounts` should only return accounts with non-zero balance"),
203 )
204 })
205 .collect()
206 }
207
208 pub async fn add_block(
215 &self,
216 block_builder: impl FnOnce(&mut BlockBuilder),
217 ) -> (ConfirmedBlockCertificate, ResourceTracker) {
218 self.try_add_block(block_builder)
219 .await
220 .expect("Failed to execute block.")
221 }
222
223 pub async fn add_block_with_blobs(
230 &self,
231 block_builder: impl FnOnce(&mut BlockBuilder),
232 blobs: Vec<Blob>,
233 ) -> (ConfirmedBlockCertificate, ResourceTracker) {
234 self.try_add_block_with_blobs(block_builder, blobs)
235 .await
236 .expect("Failed to execute block.")
237 }
238
239 pub async fn try_add_block(
246 &self,
247 block_builder: impl FnOnce(&mut BlockBuilder),
248 ) -> Result<(ConfirmedBlockCertificate, ResourceTracker), WorkerError> {
249 self.try_add_block_with_blobs(block_builder, vec![]).await
250 }
251
252 async fn try_add_block_with_blobs(
262 &self,
263 block_builder: impl FnOnce(&mut BlockBuilder),
264 blobs: Vec<Blob>,
265 ) -> Result<(ConfirmedBlockCertificate, ResourceTracker), WorkerError> {
266 let mut tip = self.tip.lock().await;
267 let mut block = BlockBuilder::new(
268 self.description.id(),
269 self.key_pair.public().into(),
270 self.epoch().await,
271 tip.as_ref(),
272 self.validator.clone(),
273 );
274
275 block_builder(&mut block);
276
277 let (certificate, resource_tracker) = Box::pin(block.try_sign(&blobs)).await?;
279
280 let result = self
281 .validator
282 .worker()
283 .fully_handle_certificate_with_notifications(certificate.clone(), &())
284 .await;
285 if let Err(WorkerError::BlobsNotFound(_)) = &result {
286 self.validator.storage().maybe_write_blobs(&blobs).await?;
287 self.validator
288 .worker()
289 .fully_handle_certificate_with_notifications(certificate.clone(), &())
290 .await
291 .expect("Rejected certificate");
292 } else {
293 result.expect("Rejected certificate");
294 }
295
296 *tip = Some(certificate.clone());
297
298 Ok((certificate, resource_tracker))
299 }
300
301 pub async fn handle_received_messages(&self) -> Option<ConfirmedBlockCertificate> {
308 let chain_id = self.id();
309 let (information, _) = self
310 .validator
311 .worker()
312 .handle_chain_info_query(ChainInfoQuery::new(chain_id).with_pending_message_bundles())
313 .await
314 .expect("Failed to query chain's pending messages");
315 let messages = information.info.requested_pending_message_bundles;
316 if messages.is_empty() {
319 return None;
320 }
321 let (certificate, _) = self
322 .add_block(|block| {
323 block.with_incoming_bundles(messages);
324 })
325 .await;
326 Some(certificate)
327 }
328
329 pub async fn handle_new_events(&self) {
333 let chain_id = self.id();
334 let worker = self.validator.worker();
335 let subscription_map = worker
336 .chain_state_view(chain_id)
337 .await
338 .expect("Failed to query chain state view")
339 .execution_state
340 .system
341 .event_subscriptions
342 .index_values()
343 .await
344 .expect("Failed to query chain's event subscriptions");
345 let futures = subscription_map
347 .into_iter()
348 .map(|((chain_id, stream_id), subscriptions)| {
349 let worker = worker.clone();
350 async move {
351 worker
352 .chain_state_view(chain_id)
353 .await
354 .expect("Failed to query chain state view")
355 .execution_state
356 .system
357 .stream_event_counts
358 .get(&stream_id)
359 .await
360 .expect("Failed to query chain's event counts")
361 .filter(|next_index| *next_index > subscriptions.next_index)
362 .map(|next_index| (chain_id, stream_id, next_index))
363 }
364 });
365 let updates = future::join_all(futures)
366 .await
367 .into_iter()
368 .flatten()
369 .collect::<Vec<_>>();
370 assert!(!updates.is_empty(), "No new events to process");
371
372 self.add_block(|block| {
373 block.with_system_operation(SystemOperation::UpdateStreams(updates));
374 })
375 .await;
376 }
377
378 pub async fn publish_current_module<Abi, Parameters, InstantiationArgument>(
384 &self,
385 ) -> ModuleId<Abi, Parameters, InstantiationArgument> {
386 self.publish_bytecode_files_in(".").await
387 }
388
389 pub async fn publish_bytecode_files_in<Abi, Parameters, InstantiationArgument>(
395 &self,
396 repository_path: impl AsRef<Path>,
397 ) -> ModuleId<Abi, Parameters, InstantiationArgument> {
398 let repository_path = fs::canonicalize(repository_path)
399 .await
400 .expect("Failed to obtain absolute application repository path");
401 Self::build_bytecode_files_in(&repository_path);
402 let (contract, service) = Self::find_compressed_bytecode_files_in(&repository_path).await;
403 let contract_blob = Blob::new_contract_bytecode(contract);
404 let service_blob = Blob::new_service_bytecode(service);
405 let contract_blob_hash = contract_blob.id().hash;
406 let service_blob_hash = service_blob.id().hash;
407 let vm_runtime = VmRuntime::Wasm;
408
409 let module_id = ModuleId::new(contract_blob_hash, service_blob_hash, vm_runtime);
410
411 let (certificate, _) = self
412 .add_block_with_blobs(
413 |block| {
414 block.with_system_operation(SystemOperation::PublishModule { module_id });
415 },
416 vec![contract_blob, service_blob],
417 )
418 .await;
419
420 let block = certificate.inner().block();
421 assert_eq!(block.messages().len(), 1);
422 assert_eq!(block.messages()[0].len(), 0);
423
424 module_id.with_abi()
425 }
426
427 pub fn build_bytecode_files_in(repository: &Path) {
429 let output = std::process::Command::new("cargo")
430 .args(["build", "--release", "--target", "wasm32-unknown-unknown"])
431 .current_dir(repository)
432 .output()
433 .expect("Failed to build Wasm binaries");
434
435 assert!(
436 output.status.success(),
437 "Failed to build bytecode binaries.\nstdout: {}\nstderr: {}",
438 String::from_utf8_lossy(&output.stdout),
439 String::from_utf8_lossy(&output.stderr)
440 );
441 }
442
443 pub async fn find_bytecode_files_in(repository: &Path) -> (Bytecode, Bytecode) {
449 let manifest_path = repository.join("Cargo.toml");
450 let cargo_manifest =
451 Manifest::from_path(manifest_path).expect("Failed to load Cargo.toml manifest");
452
453 let binaries = cargo_manifest
454 .bin
455 .into_iter()
456 .filter_map(|binary| binary.name)
457 .filter(|name| name.ends_with("service") || name.ends_with("contract"))
458 .collect::<Vec<_>>();
459
460 assert_eq!(
461 binaries.len(),
462 2,
463 "Could not figure out contract and service bytecode binaries.\
464 Please specify them manually using `publish_module`."
465 );
466
467 let (contract_binary, service_binary) = if binaries[0].ends_with("contract") {
468 (&binaries[0], &binaries[1])
469 } else {
470 (&binaries[1], &binaries[0])
471 };
472
473 let base_path = Self::find_output_directory_of(repository)
474 .await
475 .expect("Failed to look for output binaries");
476 let contract_path = base_path.join(format!("{}.wasm", contract_binary));
477 let service_path = base_path.join(format!("{}.wasm", service_binary));
478
479 let contract = Bytecode::load_from_file(contract_path)
480 .await
481 .expect("Failed to load contract bytecode from file");
482 let service = Bytecode::load_from_file(service_path)
483 .await
484 .expect("Failed to load service bytecode from file");
485 (contract, service)
486 }
487
488 pub async fn find_compressed_bytecode_files_in(
491 repository: &Path,
492 ) -> (CompressedBytecode, CompressedBytecode) {
493 let (contract, service) = Self::find_bytecode_files_in(repository).await;
494 tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
495 .await
496 .expect("Failed to compress bytecode files")
497 }
498
499 async fn find_output_directory_of(repository: &Path) -> Result<PathBuf, io::Error> {
506 let output_sub_directory = Path::new("target/wasm32-unknown-unknown/release");
507 let mut current_directory = repository;
508 let mut output_path = current_directory.join(output_sub_directory);
509
510 while !fs::try_exists(&output_path).await? {
511 current_directory = current_directory.parent().unwrap_or_else(|| {
512 panic!(
513 "Failed to find Wasm binary output directory in {}",
514 repository.display()
515 )
516 });
517
518 output_path = current_directory.join(output_sub_directory);
519 }
520
521 Ok(output_path)
522 }
523
524 pub async fn get_tip_height(&self) -> BlockHeight {
526 self.tip
527 .lock()
528 .await
529 .as_ref()
530 .expect("Block was not successfully added")
531 .inner()
532 .block()
533 .header
534 .height
535 }
536
537 pub async fn create_application<Abi, Parameters, InstantiationArgument>(
548 &mut self,
549 module_id: ModuleId<Abi, Parameters, InstantiationArgument>,
550 parameters: Parameters,
551 instantiation_argument: InstantiationArgument,
552 required_application_ids: Vec<ApplicationId>,
553 ) -> ApplicationId<Abi>
554 where
555 Abi: ContractAbi,
556 Parameters: Serialize,
557 InstantiationArgument: Serialize,
558 {
559 let parameters = serde_json::to_vec(¶meters).unwrap();
560 let instantiation_argument = serde_json::to_vec(&instantiation_argument).unwrap();
561
562 let (creation_certificate, _) = self
563 .add_block(|block| {
564 block.with_system_operation(SystemOperation::CreateApplication {
565 module_id: module_id.forget_abi(),
566 parameters: parameters.clone(),
567 instantiation_argument,
568 required_application_ids: required_application_ids.clone(),
569 });
570 })
571 .await;
572
573 let block = creation_certificate.inner().block();
574 assert_eq!(block.messages().len(), 1);
575
576 let description = ApplicationDescription {
577 module_id: module_id.forget_abi(),
578 creator_chain_id: block.header.chain_id,
579 block_height: block.header.height,
580 application_index: 0,
581 parameters,
582 required_application_ids,
583 };
584
585 ApplicationId::<()>::from(&description).with_abi()
586 }
587
588 pub async fn is_closed(&self) -> bool {
590 let chain = self
591 .validator
592 .worker()
593 .chain_state_view(self.id())
594 .await
595 .expect("Failed to load chain");
596 *chain.execution_state.system.closed.get()
597 }
598
599 pub async fn query<Abi>(
603 &self,
604 application_id: ApplicationId<Abi>,
605 query: Abi::Query,
606 ) -> QueryOutcome<Abi::QueryResponse>
607 where
608 Abi: ServiceAbi,
609 {
610 self.try_query(application_id, query)
611 .await
612 .expect("Failed to execute application service query")
613 }
614
615 pub async fn try_query<Abi>(
619 &self,
620 application_id: ApplicationId<Abi>,
621 query: Abi::Query,
622 ) -> Result<QueryOutcome<Abi::QueryResponse>, TryQueryError>
623 where
624 Abi: ServiceAbi,
625 {
626 let query_bytes = serde_json::to_vec(&query)?;
627
628 let QueryOutcome {
629 response,
630 operations,
631 } = self
632 .validator
633 .worker()
634 .query_application(
635 self.id(),
636 Query::User {
637 application_id: application_id.forget_abi(),
638 bytes: query_bytes,
639 },
640 None,
641 )
642 .await?;
643
644 let deserialized_response = match response {
645 QueryResponse::User(bytes) => {
646 serde_json::from_slice(&bytes).expect("Failed to deserialize query response")
647 }
648 QueryResponse::System(_) => {
649 unreachable!("User query returned a system response")
650 }
651 };
652
653 Ok(QueryOutcome {
654 response: deserialized_response,
655 operations,
656 })
657 }
658
659 pub async fn graphql_query<Abi>(
663 &self,
664 application_id: ApplicationId<Abi>,
665 query: impl Into<async_graphql::Request>,
666 ) -> QueryOutcome<serde_json::Value>
667 where
668 Abi: ServiceAbi<Query = async_graphql::Request, QueryResponse = async_graphql::Response>,
669 {
670 let query = query.into();
671 let query_str = query.query.clone();
672
673 self.try_graphql_query(application_id, query)
674 .await
675 .unwrap_or_else(|error| panic!("Service query {query_str:?} failed: {error}"))
676 }
677
678 pub async fn try_graphql_query<Abi>(
682 &self,
683 application_id: ApplicationId<Abi>,
684 query: impl Into<async_graphql::Request>,
685 ) -> Result<QueryOutcome<serde_json::Value>, TryGraphQLQueryError>
686 where
687 Abi: ServiceAbi<Query = async_graphql::Request, QueryResponse = async_graphql::Response>,
688 {
689 let query = query.into();
690 let QueryOutcome {
691 response,
692 operations,
693 } = self.try_query(application_id, query).await?;
694
695 if !response.errors.is_empty() {
696 return Err(TryGraphQLQueryError::Service(response.errors));
697 }
698 let json_response = response.data.into_json()?;
699
700 Ok(QueryOutcome {
701 response: json_response,
702 operations,
703 })
704 }
705
706 pub async fn graphql_mutation<Abi>(
711 &self,
712 application_id: ApplicationId<Abi>,
713 query: impl Into<async_graphql::Request>,
714 ) -> ConfirmedBlockCertificate
715 where
716 Abi: ServiceAbi<Query = async_graphql::Request, QueryResponse = async_graphql::Response>,
717 {
718 self.try_graphql_mutation(application_id, query)
719 .await
720 .expect("Failed to execute service GraphQL mutation")
721 }
722
723 pub async fn try_graphql_mutation<Abi>(
728 &self,
729 application_id: ApplicationId<Abi>,
730 query: impl Into<async_graphql::Request>,
731 ) -> Result<ConfirmedBlockCertificate, TryGraphQLMutationError>
732 where
733 Abi: ServiceAbi<Query = async_graphql::Request, QueryResponse = async_graphql::Response>,
734 {
735 let QueryOutcome { operations, .. } = self.try_graphql_query(application_id, query).await?;
736
737 let (certificate, _) = self
738 .try_add_block(|block| {
739 for operation in operations {
740 match operation {
741 Operation::User {
742 application_id,
743 bytes,
744 } => {
745 block.with_raw_operation(application_id, bytes);
746 }
747 Operation::System(system_operation) => {
748 block.with_system_operation(*system_operation);
749 }
750 }
751 }
752 })
753 .await?;
754
755 Ok(certificate)
756 }
757}
758
759#[derive(Debug, thiserror::Error)]
761pub enum TryQueryError {
762 #[error("Failed to serialize query request")]
764 Serialization(#[from] serde_json::Error),
765
766 #[error("Failed to execute service query")]
768 Execution(#[from] WorkerError),
769}
770
771#[derive(Debug, thiserror::Error)]
773pub enum TryGraphQLQueryError {
774 #[error("Failed to serialize GraphQL query request")]
776 RequestSerialization(#[source] serde_json::Error),
777
778 #[error("Failed to execute service query")]
780 Execution(#[from] WorkerError),
781
782 #[error("Unexpected non-JSON service query response")]
784 ResponseDeserialization(#[from] serde_json::Error),
785
786 #[error("Service returned errors: {_0:#?}")]
788 Service(Vec<async_graphql::ServerError>),
789}
790
791impl From<TryQueryError> for TryGraphQLQueryError {
792 fn from(query_error: TryQueryError) -> Self {
793 match query_error {
794 TryQueryError::Serialization(error) => {
795 TryGraphQLQueryError::RequestSerialization(error)
796 }
797 TryQueryError::Execution(error) => TryGraphQLQueryError::Execution(error),
798 }
799 }
800}
801
802impl TryGraphQLQueryError {
803 pub fn expect_execution_error(self) -> ExecutionError {
809 let TryGraphQLQueryError::Execution(worker_error) = self else {
810 panic!("Expected an `ExecutionError`. Got: {self:#?}");
811 };
812
813 worker_error.expect_execution_error(ChainExecutionContext::Query)
814 }
815}
816
817#[derive(Debug, thiserror::Error)]
819pub enum TryGraphQLMutationError {
820 #[error(transparent)]
822 Query(#[from] TryGraphQLQueryError),
823
824 #[error("Failed to propose block with operations scheduled by the GraphQL mutation")]
826 Proposal(#[from] WorkerError),
827}
828
829impl TryGraphQLMutationError {
830 pub fn expect_proposal_execution_error(self, transaction_index: u32) -> ExecutionError {
836 let TryGraphQLMutationError::Proposal(proposal_error) = self else {
837 panic!("Expected an `ExecutionError` during the block proposal. Got: {self:#?}");
838 };
839
840 proposal_error.expect_execution_error(ChainExecutionContext::Operation(transaction_index))
841 }
842}