1use std::{borrow::Cow, future::IntoFuture, iter, net::SocketAddr, num::NonZeroU16, sync::Arc};
5
6use async_graphql::{
7 futures_util::Stream, resolver_utils::ContainerType, Error, MergedObject, OutputType,
8 ScalarType, Schema, SimpleObject, Subscription,
9};
10use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
11use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
12use futures::{lock::Mutex, Future, FutureExt as _};
13use linera_base::{
14 crypto::{CryptoError, CryptoHash},
15 data_types::{
16 Amount, ApplicationDescription, ApplicationPermissions, Bytecode, Epoch, TimeDelta,
17 },
18 identifiers::{AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId},
19 ownership::{ChainOwnership, TimeoutConfig},
20 vm::VmRuntime,
21 BcsHexParseError,
22};
23use linera_chain::{
24 types::{ConfirmedBlock, GenericCertificate},
25 ChainStateView,
26};
27use linera_client::chain_listener::{ChainListener, ChainListenerConfig, ClientContext};
28use linera_core::{
29 client::{ChainClient, ChainClientError},
30 data_types::ClientOutcome,
31 worker::Notification,
32};
33use linera_execution::{
34 committee::Committee,
35 system::{AdminOperation, Recipient},
36 Operation, Query, QueryOutcome, QueryResponse, SystemOperation,
37};
38use linera_sdk::linera_base_types::BlobContent;
39use serde::{Deserialize, Serialize};
40use serde_json::json;
41use thiserror::Error as ThisError;
42use tokio::sync::OwnedRwLockReadGuard;
43use tokio_util::sync::CancellationToken;
44use tower_http::cors::CorsLayer;
45use tracing::{debug, error, info, instrument, trace};
46
47use crate::util;
48
49#[derive(SimpleObject, Serialize, Deserialize, Clone)]
50pub struct Chains {
51 pub list: Vec<ChainId>,
52 pub default: Option<ChainId>,
53}
54
55pub struct QueryRoot<C> {
57 context: Arc<Mutex<C>>,
58 port: NonZeroU16,
59 default_chain: Option<ChainId>,
60}
61
62pub struct SubscriptionRoot<C> {
64 context: Arc<Mutex<C>>,
65}
66
67pub struct MutationRoot<C> {
69 context: Arc<Mutex<C>>,
70}
71
72#[derive(Debug, ThisError)]
73enum NodeServiceError {
74 #[error(transparent)]
75 ChainClientError(#[from] ChainClientError),
76 #[error(transparent)]
77 BcsHexError(#[from] BcsHexParseError),
78 #[error(transparent)]
79 JsonError(#[from] serde_json::Error),
80 #[error("malformed chain ID: {0}")]
81 InvalidChainId(CryptoError),
82}
83
84impl IntoResponse for NodeServiceError {
85 fn into_response(self) -> response::Response {
86 let tuple = match self {
87 NodeServiceError::BcsHexError(e) => (StatusCode::BAD_REQUEST, vec![e.to_string()]),
88 NodeServiceError::ChainClientError(e) => {
89 (StatusCode::INTERNAL_SERVER_ERROR, vec![e.to_string()])
90 }
91 NodeServiceError::JsonError(e) => {
92 (StatusCode::INTERNAL_SERVER_ERROR, vec![e.to_string()])
93 }
94 NodeServiceError::InvalidChainId(_) => (
95 StatusCode::BAD_REQUEST,
96 vec!["invalid chain ID".to_string()],
97 ),
98 };
99 let tuple = (tuple.0, json!({"error": tuple.1}).to_string());
100 tuple.into_response()
101 }
102}
103
104#[Subscription]
105impl<C> SubscriptionRoot<C>
106where
107 C: ClientContext + 'static,
108{
109 async fn notifications(
111 &self,
112 chain_id: ChainId,
113 ) -> Result<impl Stream<Item = Notification>, Error> {
114 let client = self.context.lock().await.make_chain_client(chain_id);
115 Ok(client.subscribe().await?)
116 }
117}
118
119impl<C> MutationRoot<C>
120where
121 C: ClientContext,
122{
123 async fn execute_system_operation(
124 &self,
125 system_operation: SystemOperation,
126 chain_id: ChainId,
127 ) -> Result<CryptoHash, Error> {
128 let certificate = self
129 .apply_client_command(&chain_id, move |client| {
130 let operation = Operation::system(system_operation.clone());
131 async move {
132 let result = client
133 .execute_operation(operation)
134 .await
135 .map_err(Error::from);
136 (result, client)
137 }
138 })
139 .await?;
140 Ok(certificate.hash())
141 }
142
143 async fn apply_client_command<F, Fut, T>(
147 &self,
148 chain_id: &ChainId,
149 mut f: F,
150 ) -> Result<T, Error>
151 where
152 F: FnMut(ChainClient<C::Environment>) -> Fut,
153 Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
154 {
155 loop {
156 let client = self.context.lock().await.make_chain_client(*chain_id);
157 let mut stream = client.subscribe().await?;
158 let (result, client) = f(client).await;
159 self.context.lock().await.update_wallet(&client).await?;
160 let timeout = match result? {
161 ClientOutcome::Committed(t) => return Ok(t),
162 ClientOutcome::WaitForTimeout(timeout) => timeout,
163 };
164 drop(client);
165 util::wait_for_next_round(&mut stream, timeout).await;
166 }
167 }
168}
169
170#[async_graphql::Object(cache_control(no_cache))]
171impl<C> MutationRoot<C>
172where
173 C: ClientContext + 'static,
174{
175 async fn process_inbox(&self, chain_id: ChainId) -> Result<Vec<CryptoHash>, Error> {
177 let mut hashes = Vec::new();
178 loop {
179 let client = self.context.lock().await.make_chain_client(chain_id);
180 client.synchronize_from_validators().await?;
181 let result = client.process_inbox_without_prepare().await;
182 self.context.lock().await.update_wallet(&client).await?;
183 let (certificates, maybe_timeout) = result?;
184 hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
185 match maybe_timeout {
186 None => return Ok(hashes),
187 Some(timestamp) => {
188 let mut stream = client.subscribe().await?;
189 drop(client);
190 util::wait_for_next_round(&mut stream, timestamp).await;
191 }
192 }
193 }
194 }
195
196 async fn retry_pending_block(&self, chain_id: ChainId) -> Result<Option<CryptoHash>, Error> {
198 let client = self.context.lock().await.make_chain_client(chain_id);
199 let outcome = client.process_pending_block().await?;
200 self.context.lock().await.update_wallet(&client).await?;
201 match outcome {
202 ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
203 ClientOutcome::Committed(None) => Ok(None),
204 ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
205 "Please try again at {}",
206 timeout.timestamp
207 ))),
208 }
209 }
210
211 async fn transfer(
214 &self,
215 chain_id: ChainId,
216 owner: AccountOwner,
217 recipient: Recipient,
218 amount: Amount,
219 ) -> Result<CryptoHash, Error> {
220 self.apply_client_command(&chain_id, move |client| async move {
221 let result = client
222 .transfer(owner, amount, recipient)
223 .await
224 .map_err(Error::from)
225 .map(|outcome| outcome.map(|certificate| certificate.hash()));
226 (result, client)
227 })
228 .await
229 }
230
231 async fn claim(
235 &self,
236 chain_id: ChainId,
237 owner: AccountOwner,
238 target_id: ChainId,
239 recipient: Recipient,
240 amount: Amount,
241 ) -> Result<CryptoHash, Error> {
242 self.apply_client_command(&chain_id, move |client| async move {
243 let result = client
244 .claim(owner, target_id, recipient, amount)
245 .await
246 .map_err(Error::from)
247 .map(|outcome| outcome.map(|certificate| certificate.hash()));
248 (result, client)
249 })
250 .await
251 }
252
253 async fn read_data_blob(
256 &self,
257 chain_id: ChainId,
258 hash: CryptoHash,
259 ) -> Result<CryptoHash, Error> {
260 self.apply_client_command(&chain_id, move |client| async move {
261 let result = client
262 .read_data_blob(hash)
263 .await
264 .map_err(Error::from)
265 .map(|outcome| outcome.map(|certificate| certificate.hash()));
266 (result, client)
267 })
268 .await
269 }
270
271 async fn open_chain(
274 &self,
275 chain_id: ChainId,
276 owner: AccountOwner,
277 balance: Option<Amount>,
278 ) -> Result<ChainId, Error> {
279 let ownership = ChainOwnership::single(owner);
280 let balance = balance.unwrap_or(Amount::ZERO);
281 let description = self
282 .apply_client_command(&chain_id, move |client| {
283 let ownership = ownership.clone();
284 async move {
285 let result = client
286 .open_chain(ownership, ApplicationPermissions::default(), balance)
287 .await
288 .map_err(Error::from)
289 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
290 (result, client)
291 }
292 })
293 .await?;
294 Ok(description.id())
295 }
296
297 #[expect(clippy::too_many_arguments)]
300 async fn open_multi_owner_chain(
301 &self,
302 chain_id: ChainId,
303 application_permissions: Option<ApplicationPermissions>,
304 owners: Vec<AccountOwner>,
305 weights: Option<Vec<u64>>,
306 multi_leader_rounds: Option<u32>,
307 balance: Option<Amount>,
308 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
309 fast_round_ms: Option<u64>,
310 #[graphql(
311 desc = "The duration of the first single-leader and all multi-leader rounds",
312 default = 10_000
313 )]
314 base_timeout_ms: u64,
315 #[graphql(
316 desc = "The number of milliseconds by which the timeout increases after each \
317 single-leader round",
318 default = 1_000
319 )]
320 timeout_increment_ms: u64,
321 #[graphql(
322 desc = "The age of an incoming tracked or protected message after which the \
323 validators start transitioning the chain to fallback mode, in milliseconds.",
324 default = 86_400_000
325 )]
326 fallback_duration_ms: u64,
327 ) -> Result<ChainId, Error> {
328 let owners = if let Some(weights) = weights {
329 if weights.len() != owners.len() {
330 return Err(Error::new(format!(
331 "There are {} owners but {} weights.",
332 owners.len(),
333 weights.len()
334 )));
335 }
336 owners.into_iter().zip(weights).collect::<Vec<_>>()
337 } else {
338 owners
339 .into_iter()
340 .zip(iter::repeat(100))
341 .collect::<Vec<_>>()
342 };
343 let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
344 let timeout_config = TimeoutConfig {
345 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
346 base_timeout: TimeDelta::from_millis(base_timeout_ms),
347 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
348 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
349 };
350 let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
351 let balance = balance.unwrap_or(Amount::ZERO);
352 let description = self
353 .apply_client_command(&chain_id, move |client| {
354 let ownership = ownership.clone();
355 let application_permissions = application_permissions.clone().unwrap_or_default();
356 async move {
357 let result = client
358 .open_chain(ownership, application_permissions, balance)
359 .await
360 .map_err(Error::from)
361 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
362 (result, client)
363 }
364 })
365 .await?;
366 Ok(description.id())
367 }
368
369 async fn close_chain(&self, chain_id: ChainId) -> Result<Option<CryptoHash>, Error> {
371 let maybe_cert = self
372 .apply_client_command(&chain_id, |client| async move {
373 let result = client.close_chain().await.map_err(Error::from);
374 (result, client)
375 })
376 .await?;
377 Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
378 }
379
380 async fn change_owner(
382 &self,
383 chain_id: ChainId,
384 new_owner: AccountOwner,
385 ) -> Result<CryptoHash, Error> {
386 let operation = SystemOperation::ChangeOwnership {
387 super_owners: vec![new_owner],
388 owners: Vec::new(),
389 multi_leader_rounds: 2,
390 open_multi_leader_rounds: false,
391 timeout_config: TimeoutConfig::default(),
392 };
393 self.execute_system_operation(operation, chain_id).await
394 }
395
396 #[expect(clippy::too_many_arguments)]
398 async fn change_multiple_owners(
399 &self,
400 chain_id: ChainId,
401 new_owners: Vec<AccountOwner>,
402 new_weights: Vec<u64>,
403 multi_leader_rounds: u32,
404 open_multi_leader_rounds: bool,
405 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
406 fast_round_ms: Option<u64>,
407 #[graphql(
408 desc = "The duration of the first single-leader and all multi-leader rounds",
409 default = 10_000
410 )]
411 base_timeout_ms: u64,
412 #[graphql(
413 desc = "The number of milliseconds by which the timeout increases after each \
414 single-leader round",
415 default = 1_000
416 )]
417 timeout_increment_ms: u64,
418 #[graphql(
419 desc = "The age of an incoming tracked or protected message after which the \
420 validators start transitioning the chain to fallback mode, in milliseconds.",
421 default = 86_400_000
422 )]
423 fallback_duration_ms: u64,
424 ) -> Result<CryptoHash, Error> {
425 let operation = SystemOperation::ChangeOwnership {
426 super_owners: Vec::new(),
427 owners: new_owners.into_iter().zip(new_weights).collect(),
428 multi_leader_rounds,
429 open_multi_leader_rounds,
430 timeout_config: TimeoutConfig {
431 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
432 base_timeout: TimeDelta::from_millis(base_timeout_ms),
433 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
434 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
435 },
436 };
437 self.execute_system_operation(operation, chain_id).await
438 }
439
440 #[expect(clippy::too_many_arguments)]
442 async fn change_application_permissions(
443 &self,
444 chain_id: ChainId,
445 close_chain: Vec<ApplicationId>,
446 execute_operations: Option<Vec<ApplicationId>>,
447 mandatory_applications: Vec<ApplicationId>,
448 change_application_permissions: Vec<ApplicationId>,
449 call_service_as_oracle: Option<Vec<ApplicationId>>,
450 make_http_requests: Option<Vec<ApplicationId>>,
451 ) -> Result<CryptoHash, Error> {
452 let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
453 execute_operations,
454 mandatory_applications,
455 close_chain,
456 change_application_permissions,
457 call_service_as_oracle,
458 make_http_requests,
459 });
460 self.execute_system_operation(operation, chain_id).await
461 }
462
463 async fn create_committee(
467 &self,
468 chain_id: ChainId,
469 committee: Committee,
470 ) -> Result<CryptoHash, Error> {
471 Ok(self
472 .apply_client_command(&chain_id, move |client| {
473 let committee = committee.clone();
474 async move {
475 let result = client
476 .stage_new_committee(committee)
477 .await
478 .map_err(Error::from);
479 (result, client)
480 }
481 })
482 .await?
483 .hash())
484 }
485
486 async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
490 let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
491 self.execute_system_operation(operation, chain_id).await
492 }
493
494 async fn publish_module(
496 &self,
497 chain_id: ChainId,
498 contract: Bytecode,
499 service: Bytecode,
500 vm_runtime: VmRuntime,
501 ) -> Result<ModuleId, Error> {
502 self.apply_client_command(&chain_id, move |client| {
503 let contract = contract.clone();
504 let service = service.clone();
505 async move {
506 let result = client
507 .publish_module(contract, service, vm_runtime)
508 .await
509 .map_err(Error::from)
510 .map(|outcome| outcome.map(|(module_id, _)| module_id));
511 (result, client)
512 }
513 })
514 .await
515 }
516
517 async fn publish_data_blob(
519 &self,
520 chain_id: ChainId,
521 bytes: Vec<u8>,
522 ) -> Result<CryptoHash, Error> {
523 self.apply_client_command(&chain_id, |client| {
524 let bytes = bytes.clone();
525 async move {
526 let result = client.publish_data_blob(bytes).await.map_err(Error::from);
527 (result, client)
528 }
529 })
530 .await
531 .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
532 }
533
534 async fn create_application(
536 &self,
537 chain_id: ChainId,
538 module_id: ModuleId,
539 parameters: String,
540 instantiation_argument: String,
541 required_application_ids: Vec<ApplicationId>,
542 ) -> Result<ApplicationId, Error> {
543 self.apply_client_command(&chain_id, move |client| {
544 let parameters = parameters.as_bytes().to_vec();
545 let instantiation_argument = instantiation_argument.as_bytes().to_vec();
546 let required_application_ids = required_application_ids.clone();
547 async move {
548 let result = client
549 .create_application_untyped(
550 module_id,
551 parameters,
552 instantiation_argument,
553 required_application_ids,
554 )
555 .await
556 .map_err(Error::from)
557 .map(|outcome| outcome.map(|(application_id, _)| application_id));
558 (result, client)
559 }
560 })
561 .await
562 }
563}
564
565#[async_graphql::Object(cache_control(no_cache))]
566impl<C> QueryRoot<C>
567where
568 C: ClientContext + 'static,
569{
570 async fn chain(
571 &self,
572 chain_id: ChainId,
573 ) -> Result<
574 ChainStateExtendedView<<C::Environment as linera_core::Environment>::StorageContext>,
575 Error,
576 > {
577 let client = self.context.lock().await.make_chain_client(chain_id);
578 let view = client.chain_state_view().await?;
579 Ok(ChainStateExtendedView::new(view))
580 }
581
582 async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
583 let client = self.context.lock().await.make_chain_client(chain_id);
584 let applications = client
585 .chain_state_view()
586 .await?
587 .execution_state
588 .list_applications()
589 .await?;
590
591 let overviews = applications
592 .into_iter()
593 .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
594 .collect();
595
596 Ok(overviews)
597 }
598
599 async fn chains(&self) -> Result<Chains, Error> {
600 Ok(Chains {
601 list: self.context.lock().await.wallet().chain_ids(),
602 default: self.default_chain,
603 })
604 }
605
606 async fn block(
607 &self,
608 hash: Option<CryptoHash>,
609 chain_id: ChainId,
610 ) -> Result<Option<ConfirmedBlock>, Error> {
611 let client = self.context.lock().await.make_chain_client(chain_id);
612 let hash = match hash {
613 Some(hash) => Some(hash),
614 None => {
615 let view = client.chain_state_view().await?;
616 view.tip_state.get().block_hash
617 }
618 };
619 if let Some(hash) = hash {
620 let block = client.read_confirmed_block(hash).await?;
621 Ok(Some(block))
622 } else {
623 Ok(None)
624 }
625 }
626
627 async fn events_from_index(
628 &self,
629 chain_id: ChainId,
630 stream_id: StreamId,
631 start_index: u32,
632 ) -> Result<Vec<IndexAndEvent>, Error> {
633 Ok(self
634 .context
635 .lock()
636 .await
637 .make_chain_client(chain_id)
638 .events_from_index(stream_id, start_index)
639 .await?)
640 }
641
642 async fn blocks(
643 &self,
644 from: Option<CryptoHash>,
645 chain_id: ChainId,
646 limit: Option<u32>,
647 ) -> Result<Vec<ConfirmedBlock>, Error> {
648 let client = self.context.lock().await.make_chain_client(chain_id);
649 let limit = limit.unwrap_or(10);
650 let from = match from {
651 Some(from) => Some(from),
652 None => {
653 let view = client.chain_state_view().await?;
654 view.tip_state.get().block_hash
655 }
656 };
657 let Some(from) = from else {
658 return Ok(vec![]);
659 };
660 let mut hash = Some(from);
661 let mut values = Vec::new();
662 for _ in 0..limit {
663 let Some(next_hash) = hash else {
664 break;
665 };
666 let value = client.read_confirmed_block(next_hash).await?;
667 hash = value.block().header.previous_block_hash;
668 values.push(value);
669 }
670 Ok(values)
671 }
672
673 async fn version(&self) -> linera_version::VersionInfo {
675 linera_version::VersionInfo::default()
676 }
677}
678
679struct ChainStateViewExtension(ChainId);
683
684#[async_graphql::Object(cache_control(no_cache))]
685impl ChainStateViewExtension {
686 async fn chain_id(&self) -> ChainId {
687 self.0
688 }
689}
690
691#[derive(MergedObject)]
692struct ChainStateExtendedView<C>(ChainStateViewExtension, ReadOnlyChainStateView<C>)
693where
694 C: linera_views::context::Context + Clone + Send + Sync + 'static,
695 C::Extra: linera_execution::ExecutionRuntimeContext;
696
697pub struct ReadOnlyChainStateView<C>(OwnedRwLockReadGuard<ChainStateView<C>>)
700where
701 C: linera_views::context::Context + Clone + Send + Sync + 'static;
702
703impl<C> ContainerType for ReadOnlyChainStateView<C>
704where
705 C: linera_views::context::Context + Clone + Send + Sync + 'static,
706{
707 async fn resolve_field(
708 &self,
709 context: &async_graphql::Context<'_>,
710 ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
711 self.0.resolve_field(context).await
712 }
713}
714
715impl<C> OutputType for ReadOnlyChainStateView<C>
716where
717 C: linera_views::context::Context + Clone + Send + Sync + 'static,
718{
719 fn type_name() -> Cow<'static, str> {
720 ChainStateView::<C>::type_name()
721 }
722
723 fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
724 ChainStateView::<C>::create_type_info(registry)
725 }
726
727 async fn resolve(
728 &self,
729 context: &async_graphql::ContextSelectionSet<'_>,
730 field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
731 ) -> async_graphql::ServerResult<async_graphql::Value> {
732 self.0.resolve(context, field).await
733 }
734}
735
736impl<C> ChainStateExtendedView<C>
737where
738 C: linera_views::context::Context + Clone + Send + Sync + 'static,
739 C::Extra: linera_execution::ExecutionRuntimeContext,
740{
741 fn new(view: OwnedRwLockReadGuard<ChainStateView<C>>) -> Self {
742 Self(
743 ChainStateViewExtension(view.chain_id()),
744 ReadOnlyChainStateView(view),
745 )
746 }
747}
748
749#[derive(SimpleObject)]
750pub struct ApplicationOverview {
751 id: ApplicationId,
752 description: ApplicationDescription,
753 link: String,
754}
755
756impl ApplicationOverview {
757 fn new(
758 id: ApplicationId,
759 description: ApplicationDescription,
760 port: NonZeroU16,
761 chain_id: ChainId,
762 ) -> Self {
763 Self {
764 id,
765 description,
766 link: format!(
767 "http://localhost:{}/chains/{}/applications/{}",
768 port.get(),
769 chain_id,
770 id
771 ),
772 }
773 }
774}
775
776pub struct NodeService<C>
779where
780 C: ClientContext + 'static,
781{
782 config: ChainListenerConfig,
783 port: NonZeroU16,
784 default_chain: Option<ChainId>,
785 context: Arc<Mutex<C>>,
786}
787
788impl<C> Clone for NodeService<C>
789where
790 C: ClientContext + 'static,
791{
792 fn clone(&self) -> Self {
793 Self {
794 config: self.config.clone(),
795 port: self.port,
796 default_chain: self.default_chain,
797 context: Arc::clone(&self.context),
798 }
799 }
800}
801
802impl<C> NodeService<C>
803where
804 C: ClientContext,
805{
806 pub async fn new(
808 config: ChainListenerConfig,
809 port: NonZeroU16,
810 default_chain: Option<ChainId>,
811 context: C,
812 ) -> Self {
813 Self {
814 config,
815 port,
816 default_chain,
817 context: Arc::new(Mutex::new(context)),
818 }
819 }
820
821 pub fn schema(&self) -> Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>> {
822 Schema::build(
823 QueryRoot {
824 context: Arc::clone(&self.context),
825 port: self.port,
826 default_chain: self.default_chain,
827 },
828 MutationRoot {
829 context: Arc::clone(&self.context),
830 },
831 SubscriptionRoot {
832 context: Arc::clone(&self.context),
833 },
834 )
835 .finish()
836 }
837
838 #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
840 pub async fn run(self, cancellation_token: CancellationToken) -> Result<(), anyhow::Error> {
841 let port = self.port.get();
842 let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
843 let application_handler =
844 axum::routing::get(util::graphiql).post(Self::application_handler);
845
846 let app = Router::new()
847 .route("/", index_handler)
848 .route(
849 "/chains/{chain_id}/applications/{application_id}",
850 application_handler,
851 )
852 .route("/ready", axum::routing::get(|| async { "ready!" }))
853 .route_service("/ws", GraphQLSubscription::new(self.schema()))
854 .layer(Extension(self.clone()))
855 .layer(CorsLayer::permissive());
857
858 info!("GraphiQL IDE: http://localhost:{}", port);
859
860 let storage = self.context.lock().await.storage().clone();
861
862 let chain_listener =
863 ChainListener::new(self.config, self.context, storage, cancellation_token).run();
864 let mut chain_listener = Box::pin(chain_listener).fuse();
865 let tcp_listener =
866 tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
867 let server = axum::serve(tcp_listener, app).into_future();
868 futures::select! {
869 result = chain_listener => result?,
870 result = Box::pin(server).fuse() => result?,
871 };
872
873 Ok(())
874 }
875
876 async fn handle_service_request(
878 &self,
879 application_id: ApplicationId,
880 request: Vec<u8>,
881 chain_id: ChainId,
882 ) -> Result<Vec<u8>, NodeServiceError> {
883 let QueryOutcome {
884 response,
885 operations,
886 } = self
887 .query_user_application(application_id, request, chain_id)
888 .await?;
889 if operations.is_empty() {
890 return Ok(response);
891 }
892
893 trace!("Query requested a new block with operations: {operations:?}");
894 let client = self.context.lock().await.make_chain_client(chain_id);
895 let hash = loop {
896 let timeout = match client
897 .execute_operations(operations.clone(), vec![])
898 .await?
899 {
900 ClientOutcome::Committed(certificate) => break certificate.hash(),
901 ClientOutcome::WaitForTimeout(timeout) => timeout,
902 };
903 let mut stream = client.subscribe().await.map_err(|_| {
904 ChainClientError::InternalError("Could not subscribe to the local node.")
905 })?;
906 util::wait_for_next_round(&mut stream, timeout).await;
907 };
908 let response = async_graphql::Response::new(hash.to_value());
909 Ok(serde_json::to_vec(&response)?)
910 }
911
912 async fn query_user_application(
914 &self,
915 application_id: ApplicationId,
916 bytes: Vec<u8>,
917 chain_id: ChainId,
918 ) -> Result<QueryOutcome<Vec<u8>>, NodeServiceError> {
919 let query = Query::User {
920 application_id,
921 bytes,
922 };
923 let client = self.context.lock().await.make_chain_client(chain_id);
924 let QueryOutcome {
925 response,
926 operations,
927 } = client.query_application(query).await?;
928 match response {
929 QueryResponse::System(_) => {
930 unreachable!("cannot get a system response for a user query")
931 }
932 QueryResponse::User(user_response_bytes) => Ok(QueryOutcome {
933 response: user_response_bytes,
934 operations,
935 }),
936 }
937 }
938
939 async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
941 service
942 .0
943 .schema()
944 .execute(request.into_inner())
945 .await
946 .into()
947 }
948
949 async fn application_handler(
953 Path((chain_id, application_id)): Path<(String, String)>,
954 service: Extension<Self>,
955 request: String,
956 ) -> Result<Vec<u8>, NodeServiceError> {
957 let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
958 let application_id: ApplicationId = application_id.parse()?;
959
960 debug!(
961 "Processing request for application {application_id} on chain {chain_id}:\n{:?}",
962 &request
963 );
964 let response = service
965 .0
966 .handle_service_request(application_id, request.into_bytes(), chain_id)
967 .await?;
968
969 Ok(response)
970 }
971}