1use std::{
9 collections::HashMap,
10 io,
11 path::{Path, PathBuf},
12 sync::Arc,
13};
14
15use cargo_toml::Manifest;
16use futures::future;
17use linera_base::{
18 crypto::{AccountPublicKey, AccountSecretKey},
19 data_types::{
20 Amount, ApplicationDescription, Blob, BlockHeight, Bytecode, ChainDescription,
21 CompressedBytecode, Epoch,
22 },
23 identifiers::{AccountOwner, ApplicationId, ChainId, ModuleId},
24 vm::VmRuntime,
25};
26use linera_chain::{types::ConfirmedBlockCertificate, ChainExecutionContext};
27use linera_core::{data_types::ChainInfoQuery, worker::WorkerError};
28use linera_execution::{
29 system::{SystemOperation, SystemQuery, SystemResponse},
30 ExecutionError, Operation, Query, QueryOutcome, QueryResponse,
31};
32use linera_storage::Storage as _;
33use serde::Serialize;
34use tokio::{fs, sync::Mutex};
35
36use super::{BlockBuilder, TestValidator};
37use crate::{ContractAbi, ServiceAbi};
38
39pub struct ActiveChain {
41 key_pair: AccountSecretKey,
42 description: ChainDescription,
43 tip: Arc<Mutex<Option<ConfirmedBlockCertificate>>>,
44 validator: TestValidator,
45}
46
47impl Clone for ActiveChain {
48 fn clone(&self) -> Self {
49 ActiveChain {
50 key_pair: self.key_pair.copy(),
51 description: self.description.clone(),
52 tip: self.tip.clone(),
53 validator: self.validator.clone(),
54 }
55 }
56}
57
58impl ActiveChain {
59 pub fn new(
65 key_pair: AccountSecretKey,
66 description: ChainDescription,
67 validator: TestValidator,
68 ) -> Self {
69 ActiveChain {
70 key_pair,
71 description,
72 tip: Arc::default(),
73 validator,
74 }
75 }
76
77 pub fn id(&self) -> ChainId {
79 self.description.id()
80 }
81
82 pub fn public_key(&self) -> AccountPublicKey {
84 self.key_pair.public()
85 }
86
87 pub fn key_pair(&self) -> &AccountSecretKey {
89 &self.key_pair
90 }
91
92 pub fn set_key_pair(&mut self, key_pair: AccountSecretKey) {
94 self.key_pair = key_pair
95 }
96
97 pub async fn epoch(&self) -> Epoch {
99 *self
100 .validator
101 .worker()
102 .chain_state_view(self.id())
103 .await
104 .expect("Failed to load chain")
105 .execution_state
106 .system
107 .epoch
108 .get()
109 }
110
111 pub async fn chain_balance(&self) -> Amount {
113 let query = Query::System(SystemQuery);
114
115 let QueryOutcome { response, .. } = self
116 .validator
117 .worker()
118 .query_application(self.id(), query)
119 .await
120 .expect("Failed to query chain's balance");
121
122 let QueryResponse::System(SystemResponse { balance, .. }) = response else {
123 panic!("Unexpected response from system application");
124 };
125
126 balance
127 }
128
129 pub async fn owner_balance(&self, owner: &AccountOwner) -> Option<Amount> {
131 let chain_state = self
132 .validator
133 .worker()
134 .chain_state_view(self.id())
135 .await
136 .expect("Failed to read chain state");
137
138 chain_state
139 .execution_state
140 .system
141 .balances
142 .get(owner)
143 .await
144 .expect("Failed to read owner balance")
145 }
146
147 pub async fn owner_balances(
149 &self,
150 owners: impl IntoIterator<Item = AccountOwner>,
151 ) -> HashMap<AccountOwner, Option<Amount>> {
152 let chain_state = self
153 .validator
154 .worker()
155 .chain_state_view(self.id())
156 .await
157 .expect("Failed to read chain state");
158
159 let mut balances = HashMap::new();
160
161 for owner in owners {
162 let balance = chain_state
163 .execution_state
164 .system
165 .balances
166 .get(&owner)
167 .await
168 .expect("Failed to read an owner's balance");
169
170 balances.insert(owner, balance);
171 }
172
173 balances
174 }
175
176 pub async fn accounts(&self) -> Vec<AccountOwner> {
178 let chain_state = self
179 .validator
180 .worker()
181 .chain_state_view(self.id())
182 .await
183 .expect("Failed to read chain state");
184
185 chain_state
186 .execution_state
187 .system
188 .balances
189 .indices()
190 .await
191 .expect("Failed to list accounts on the chain")
192 }
193
194 pub async fn all_owner_balances(&self) -> HashMap<AccountOwner, Amount> {
196 self.owner_balances(self.accounts().await)
197 .await
198 .into_iter()
199 .map(|(owner, balance)| {
200 (
201 owner,
202 balance.expect("`accounts` should only return accounts with non-zero balance"),
203 )
204 })
205 .collect()
206 }
207
208 pub async fn add_block(
213 &self,
214 block_builder: impl FnOnce(&mut BlockBuilder),
215 ) -> ConfirmedBlockCertificate {
216 self.try_add_block(block_builder)
217 .await
218 .expect("Failed to execute block.")
219 }
220
221 pub async fn add_block_with_blobs(
226 &self,
227 block_builder: impl FnOnce(&mut BlockBuilder),
228 blobs: Vec<Blob>,
229 ) -> ConfirmedBlockCertificate {
230 self.try_add_block_with_blobs(block_builder, blobs)
231 .await
232 .expect("Failed to execute block.")
233 }
234
235 pub async fn try_add_block(
240 &self,
241 block_builder: impl FnOnce(&mut BlockBuilder),
242 ) -> Result<ConfirmedBlockCertificate, WorkerError> {
243 self.try_add_block_with_blobs(block_builder, vec![]).await
244 }
245
246 async fn try_add_block_with_blobs(
254 &self,
255 block_builder: impl FnOnce(&mut BlockBuilder),
256 blobs: Vec<Blob>,
257 ) -> Result<ConfirmedBlockCertificate, WorkerError> {
258 let mut tip = self.tip.lock().await;
259 let mut block = BlockBuilder::new(
260 self.description.id(),
261 self.key_pair.public().into(),
262 self.epoch().await,
263 tip.as_ref(),
264 self.validator.clone(),
265 );
266
267 block_builder(&mut block);
268
269 let certificate = Box::pin(block.try_sign(&blobs)).await?;
271
272 let result = self
273 .validator
274 .worker()
275 .fully_handle_certificate_with_notifications(certificate.clone(), &())
276 .await;
277 if let Err(WorkerError::BlobsNotFound(_)) = &result {
278 self.validator.storage().maybe_write_blobs(&blobs).await?;
279 self.validator
280 .worker()
281 .fully_handle_certificate_with_notifications(certificate.clone(), &())
282 .await
283 .expect("Rejected certificate");
284 } else {
285 result.expect("Rejected certificate");
286 }
287
288 *tip = Some(certificate.clone());
289
290 Ok(certificate)
291 }
292
293 pub async fn handle_received_messages(&self) {
298 let chain_id = self.id();
299 let (information, _) = self
300 .validator
301 .worker()
302 .handle_chain_info_query(ChainInfoQuery::new(chain_id).with_pending_message_bundles())
303 .await
304 .expect("Failed to query chain's pending messages");
305 let messages = information.info.requested_pending_message_bundles;
306 if messages.is_empty() {
309 return;
310 }
311 self.add_block(|block| {
312 block.with_incoming_bundles(messages);
313 })
314 .await;
315 }
316
317 pub async fn handle_new_events(&self) {
321 let chain_id = self.id();
322 let worker = self.validator.worker();
323 let subscription_map = worker
324 .chain_state_view(chain_id)
325 .await
326 .expect("Failed to query chain state view")
327 .execution_state
328 .system
329 .event_subscriptions
330 .index_values()
331 .await
332 .expect("Failed to query chain's event subscriptions");
333 let futures = subscription_map
335 .into_iter()
336 .map(|((chain_id, stream_id), subscriptions)| {
337 let worker = worker.clone();
338 async move {
339 worker
340 .chain_state_view(chain_id)
341 .await
342 .expect("Failed to query chain state view")
343 .execution_state
344 .system
345 .stream_event_counts
346 .get(&stream_id)
347 .await
348 .expect("Failed to query chain's event counts")
349 .filter(|next_index| *next_index > subscriptions.next_index)
350 .map(|next_index| (chain_id, stream_id, next_index))
351 }
352 });
353 let updates = future::join_all(futures)
354 .await
355 .into_iter()
356 .flatten()
357 .collect::<Vec<_>>();
358 assert!(!updates.is_empty(), "No new events to process");
359
360 self.add_block(|block| {
361 block.with_system_operation(SystemOperation::UpdateStreams(updates));
362 })
363 .await;
364 }
365
366 pub async fn publish_current_module<Abi, Parameters, InstantiationArgument>(
372 &self,
373 ) -> ModuleId<Abi, Parameters, InstantiationArgument> {
374 self.publish_bytecode_files_in(".").await
375 }
376
377 pub async fn publish_bytecode_files_in<Abi, Parameters, InstantiationArgument>(
383 &self,
384 repository_path: impl AsRef<Path>,
385 ) -> ModuleId<Abi, Parameters, InstantiationArgument> {
386 let repository_path = fs::canonicalize(repository_path)
387 .await
388 .expect("Failed to obtain absolute application repository path");
389 Self::build_bytecode_files_in(&repository_path);
390 let (contract, service) = Self::find_compressed_bytecode_files_in(&repository_path).await;
391 let contract_blob = Blob::new_contract_bytecode(contract);
392 let service_blob = Blob::new_service_bytecode(service);
393 let contract_blob_hash = contract_blob.id().hash;
394 let service_blob_hash = service_blob.id().hash;
395 let vm_runtime = VmRuntime::Wasm;
396
397 let module_id = ModuleId::new(contract_blob_hash, service_blob_hash, vm_runtime);
398
399 let certificate = self
400 .add_block_with_blobs(
401 |block| {
402 block.with_system_operation(SystemOperation::PublishModule { module_id });
403 },
404 vec![contract_blob, service_blob],
405 )
406 .await;
407
408 let block = certificate.inner().block();
409 assert_eq!(block.messages().len(), 1);
410 assert_eq!(block.messages()[0].len(), 0);
411
412 module_id.with_abi()
413 }
414
415 pub fn build_bytecode_files_in(repository: &Path) {
417 let output = std::process::Command::new("cargo")
418 .args(["build", "--release", "--target", "wasm32-unknown-unknown"])
419 .current_dir(repository)
420 .output()
421 .expect("Failed to build Wasm binaries");
422
423 assert!(
424 output.status.success(),
425 "Failed to build bytecode binaries.\nstdout: {}\nstderr: {}",
426 String::from_utf8_lossy(&output.stdout),
427 String::from_utf8_lossy(&output.stderr)
428 );
429 }
430
431 pub async fn find_bytecode_files_in(repository: &Path) -> (Bytecode, Bytecode) {
437 let manifest_path = repository.join("Cargo.toml");
438 let cargo_manifest =
439 Manifest::from_path(manifest_path).expect("Failed to load Cargo.toml manifest");
440
441 let binaries = cargo_manifest
442 .bin
443 .into_iter()
444 .filter_map(|binary| binary.name)
445 .filter(|name| name.ends_with("service") || name.ends_with("contract"))
446 .collect::<Vec<_>>();
447
448 assert_eq!(
449 binaries.len(),
450 2,
451 "Could not figure out contract and service bytecode binaries.\
452 Please specify them manually using `publish_module`."
453 );
454
455 let (contract_binary, service_binary) = if binaries[0].ends_with("contract") {
456 (&binaries[0], &binaries[1])
457 } else {
458 (&binaries[1], &binaries[0])
459 };
460
461 let base_path = Self::find_output_directory_of(repository)
462 .await
463 .expect("Failed to look for output binaries");
464 let contract_path = base_path.join(format!("{}.wasm", contract_binary));
465 let service_path = base_path.join(format!("{}.wasm", service_binary));
466
467 let contract = Bytecode::load_from_file(contract_path)
468 .expect("Failed to load contract bytecode from file");
469 let service = Bytecode::load_from_file(service_path)
470 .expect("Failed to load service bytecode from file");
471 (contract, service)
472 }
473
474 pub async fn find_compressed_bytecode_files_in(
477 repository: &Path,
478 ) -> (CompressedBytecode, CompressedBytecode) {
479 let (contract, service) = Self::find_bytecode_files_in(repository).await;
480 tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
481 .await
482 .expect("Failed to compress bytecode files")
483 }
484
485 async fn find_output_directory_of(repository: &Path) -> Result<PathBuf, io::Error> {
492 let output_sub_directory = Path::new("target/wasm32-unknown-unknown/release");
493 let mut current_directory = repository;
494 let mut output_path = current_directory.join(output_sub_directory);
495
496 while !fs::try_exists(&output_path).await? {
497 current_directory = current_directory.parent().unwrap_or_else(|| {
498 panic!(
499 "Failed to find Wasm binary output directory in {}",
500 repository.display()
501 )
502 });
503
504 output_path = current_directory.join(output_sub_directory);
505 }
506
507 Ok(output_path)
508 }
509
510 pub async fn get_tip_height(&self) -> BlockHeight {
512 self.tip
513 .lock()
514 .await
515 .as_ref()
516 .expect("Block was not successfully added")
517 .inner()
518 .block()
519 .header
520 .height
521 }
522
523 pub async fn create_application<Abi, Parameters, InstantiationArgument>(
534 &mut self,
535 module_id: ModuleId<Abi, Parameters, InstantiationArgument>,
536 parameters: Parameters,
537 instantiation_argument: InstantiationArgument,
538 required_application_ids: Vec<ApplicationId>,
539 ) -> ApplicationId<Abi>
540 where
541 Abi: ContractAbi,
542 Parameters: Serialize,
543 InstantiationArgument: Serialize,
544 {
545 let parameters = serde_json::to_vec(¶meters).unwrap();
546 let instantiation_argument = serde_json::to_vec(&instantiation_argument).unwrap();
547
548 let creation_certificate = self
549 .add_block(|block| {
550 block.with_system_operation(SystemOperation::CreateApplication {
551 module_id: module_id.forget_abi(),
552 parameters: parameters.clone(),
553 instantiation_argument,
554 required_application_ids: required_application_ids.clone(),
555 });
556 })
557 .await;
558
559 let block = creation_certificate.inner().block();
560 assert_eq!(block.messages().len(), 1);
561
562 let description = ApplicationDescription {
563 module_id: module_id.forget_abi(),
564 creator_chain_id: block.header.chain_id,
565 block_height: block.header.height,
566 application_index: 0,
567 parameters,
568 required_application_ids,
569 };
570
571 ApplicationId::<()>::from(&description).with_abi()
572 }
573
574 pub async fn is_closed(&self) -> bool {
576 let chain = self
577 .validator
578 .worker()
579 .chain_state_view(self.id())
580 .await
581 .expect("Failed to load chain");
582 *chain.execution_state.system.closed.get()
583 }
584
585 pub async fn query<Abi>(
589 &self,
590 application_id: ApplicationId<Abi>,
591 query: Abi::Query,
592 ) -> QueryOutcome<Abi::QueryResponse>
593 where
594 Abi: ServiceAbi,
595 {
596 self.try_query(application_id, query)
597 .await
598 .expect("Failed to execute application service query")
599 }
600
601 pub async fn try_query<Abi>(
605 &self,
606 application_id: ApplicationId<Abi>,
607 query: Abi::Query,
608 ) -> Result<QueryOutcome<Abi::QueryResponse>, TryQueryError>
609 where
610 Abi: ServiceAbi,
611 {
612 let query_bytes = serde_json::to_vec(&query)?;
613
614 let QueryOutcome {
615 response,
616 operations,
617 } = self
618 .validator
619 .worker()
620 .query_application(
621 self.id(),
622 Query::User {
623 application_id: application_id.forget_abi(),
624 bytes: query_bytes,
625 },
626 )
627 .await?;
628
629 let deserialized_response = match response {
630 QueryResponse::User(bytes) => {
631 serde_json::from_slice(&bytes).expect("Failed to deserialize query response")
632 }
633 QueryResponse::System(_) => {
634 unreachable!("User query returned a system response")
635 }
636 };
637
638 Ok(QueryOutcome {
639 response: deserialized_response,
640 operations,
641 })
642 }
643
644 pub async fn graphql_query<Abi>(
648 &self,
649 application_id: ApplicationId<Abi>,
650 query: impl Into<async_graphql::Request>,
651 ) -> QueryOutcome<serde_json::Value>
652 where
653 Abi: ServiceAbi<Query = async_graphql::Request, QueryResponse = async_graphql::Response>,
654 {
655 let query = query.into();
656 let query_str = query.query.clone();
657
658 self.try_graphql_query(application_id, query)
659 .await
660 .unwrap_or_else(|error| panic!("Service query {query_str:?} failed: {error}"))
661 }
662
663 pub async fn try_graphql_query<Abi>(
667 &self,
668 application_id: ApplicationId<Abi>,
669 query: impl Into<async_graphql::Request>,
670 ) -> Result<QueryOutcome<serde_json::Value>, TryGraphQLQueryError>
671 where
672 Abi: ServiceAbi<Query = async_graphql::Request, QueryResponse = async_graphql::Response>,
673 {
674 let query = query.into();
675 let QueryOutcome {
676 response,
677 operations,
678 } = self.try_query(application_id, query).await?;
679
680 if !response.errors.is_empty() {
681 return Err(TryGraphQLQueryError::Service(response.errors));
682 }
683 let json_response = response.data.into_json()?;
684
685 Ok(QueryOutcome {
686 response: json_response,
687 operations,
688 })
689 }
690
691 pub async fn graphql_mutation<Abi>(
696 &self,
697 application_id: ApplicationId<Abi>,
698 query: impl Into<async_graphql::Request>,
699 ) -> ConfirmedBlockCertificate
700 where
701 Abi: ServiceAbi<Query = async_graphql::Request, QueryResponse = async_graphql::Response>,
702 {
703 self.try_graphql_mutation(application_id, query)
704 .await
705 .expect("Failed to execute service GraphQL mutation")
706 }
707
708 pub async fn try_graphql_mutation<Abi>(
713 &self,
714 application_id: ApplicationId<Abi>,
715 query: impl Into<async_graphql::Request>,
716 ) -> Result<ConfirmedBlockCertificate, TryGraphQLMutationError>
717 where
718 Abi: ServiceAbi<Query = async_graphql::Request, QueryResponse = async_graphql::Response>,
719 {
720 let QueryOutcome { operations, .. } = self.try_graphql_query(application_id, query).await?;
721
722 let certificate = self
723 .try_add_block(|block| {
724 for operation in operations {
725 match operation {
726 Operation::User {
727 application_id,
728 bytes,
729 } => {
730 block.with_raw_operation(application_id, bytes);
731 }
732 Operation::System(system_operation) => {
733 block.with_system_operation(*system_operation);
734 }
735 }
736 }
737 })
738 .await?;
739
740 Ok(certificate)
741 }
742}
743
744#[derive(Debug, thiserror::Error)]
746pub enum TryQueryError {
747 #[error("Failed to serialize query request")]
749 Serialization(#[from] serde_json::Error),
750
751 #[error("Failed to execute service query")]
753 Execution(#[from] WorkerError),
754}
755
756#[derive(Debug, thiserror::Error)]
758pub enum TryGraphQLQueryError {
759 #[error("Failed to serialize GraphQL query request")]
761 RequestSerialization(#[source] serde_json::Error),
762
763 #[error("Failed to execute service query")]
765 Execution(#[from] WorkerError),
766
767 #[error("Unexpected non-JSON service query response")]
769 ResponseDeserialization(#[from] serde_json::Error),
770
771 #[error("Service returned errors: {_0:#?}")]
773 Service(Vec<async_graphql::ServerError>),
774}
775
776impl From<TryQueryError> for TryGraphQLQueryError {
777 fn from(query_error: TryQueryError) -> Self {
778 match query_error {
779 TryQueryError::Serialization(error) => {
780 TryGraphQLQueryError::RequestSerialization(error)
781 }
782 TryQueryError::Execution(error) => TryGraphQLQueryError::Execution(error),
783 }
784 }
785}
786
787impl TryGraphQLQueryError {
788 pub fn expect_execution_error(self) -> ExecutionError {
794 let TryGraphQLQueryError::Execution(worker_error) = self else {
795 panic!("Expected an `ExecutionError`. Got: {self:#?}");
796 };
797
798 worker_error.expect_execution_error(ChainExecutionContext::Query)
799 }
800}
801
802#[derive(Debug, thiserror::Error)]
804pub enum TryGraphQLMutationError {
805 #[error(transparent)]
807 Query(#[from] TryGraphQLQueryError),
808
809 #[error("Failed to propose block with operations scheduled by the GraphQL mutation")]
811 Proposal(#[from] WorkerError),
812}
813
814impl TryGraphQLMutationError {
815 pub fn expect_proposal_execution_error(self, transaction_index: u32) -> ExecutionError {
821 let TryGraphQLMutationError::Proposal(proposal_error) = self else {
822 panic!("Expected an `ExecutionError` during the block proposal. Got: {self:#?}");
823 };
824
825 proposal_error.expect_execution_error(ChainExecutionContext::Operation(transaction_index))
826 }
827}