1use std::{borrow::Cow, future::IntoFuture, iter, net::SocketAddr, num::NonZeroU16, sync::Arc};
5
6use async_graphql::{
7 futures_util::Stream, resolver_utils::ContainerType, Error, MergedObject, OutputType,
8 ScalarType, Schema, SimpleObject, Subscription,
9};
10use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
11use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
12use futures::{lock::Mutex, Future, FutureExt as _};
13use linera_base::{
14 crypto::{CryptoError, CryptoHash},
15 data_types::{
16 Amount, ApplicationDescription, ApplicationPermissions, Bytecode, Epoch, TimeDelta,
17 },
18 identifiers::{
19 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
20 },
21 ownership::{ChainOwnership, TimeoutConfig},
22 vm::VmRuntime,
23 BcsHexParseError,
24};
25use linera_chain::{
26 types::{ConfirmedBlock, GenericCertificate},
27 ChainStateView,
28};
29use linera_client::chain_listener::{ChainListener, ChainListenerConfig, ClientContext};
30use linera_core::{
31 client::{ChainClient, ChainClientError},
32 data_types::ClientOutcome,
33 worker::Notification,
34};
35use linera_execution::{
36 committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse,
37 SystemOperation,
38};
39use linera_sdk::linera_base_types::BlobContent;
40use serde::{Deserialize, Serialize};
41use serde_json::json;
42use thiserror::Error as ThisError;
43use tokio::sync::OwnedRwLockReadGuard;
44use tokio_util::sync::CancellationToken;
45use tower_http::cors::CorsLayer;
46use tracing::{debug, error, info, instrument, trace};
47
48use crate::util;
49
50#[derive(SimpleObject, Serialize, Deserialize, Clone)]
51pub struct Chains {
52 pub list: Vec<ChainId>,
53 pub default: Option<ChainId>,
54}
55
56pub struct QueryRoot<C> {
58 context: Arc<Mutex<C>>,
59 port: NonZeroU16,
60 default_chain: Option<ChainId>,
61}
62
63pub struct SubscriptionRoot<C> {
65 context: Arc<Mutex<C>>,
66}
67
68pub struct MutationRoot<C> {
70 context: Arc<Mutex<C>>,
71}
72
73#[derive(Debug, ThisError)]
74enum NodeServiceError {
75 #[error(transparent)]
76 ChainClientError(#[from] ChainClientError),
77 #[error(transparent)]
78 BcsHexError(#[from] BcsHexParseError),
79 #[error(transparent)]
80 JsonError(#[from] serde_json::Error),
81 #[error("malformed chain ID: {0}")]
82 InvalidChainId(CryptoError),
83}
84
85impl IntoResponse for NodeServiceError {
86 fn into_response(self) -> response::Response {
87 let tuple = match self {
88 NodeServiceError::BcsHexError(e) => (StatusCode::BAD_REQUEST, vec![e.to_string()]),
89 NodeServiceError::ChainClientError(e) => {
90 (StatusCode::INTERNAL_SERVER_ERROR, vec![e.to_string()])
91 }
92 NodeServiceError::JsonError(e) => {
93 (StatusCode::INTERNAL_SERVER_ERROR, vec![e.to_string()])
94 }
95 NodeServiceError::InvalidChainId(_) => (
96 StatusCode::BAD_REQUEST,
97 vec!["invalid chain ID".to_string()],
98 ),
99 };
100 let tuple = (tuple.0, json!({"error": tuple.1}).to_string());
101 tuple.into_response()
102 }
103}
104
105#[Subscription]
106impl<C> SubscriptionRoot<C>
107where
108 C: ClientContext + 'static,
109{
110 async fn notifications(
112 &self,
113 chain_id: ChainId,
114 ) -> Result<impl Stream<Item = Notification>, Error> {
115 let client = self.context.lock().await.make_chain_client(chain_id);
116 Ok(client.subscribe()?)
117 }
118}
119
120impl<C> MutationRoot<C>
121where
122 C: ClientContext,
123{
124 async fn execute_system_operation(
125 &self,
126 system_operation: SystemOperation,
127 chain_id: ChainId,
128 ) -> Result<CryptoHash, Error> {
129 let certificate = self
130 .apply_client_command(&chain_id, move |client| {
131 let operation = Operation::system(system_operation.clone());
132 async move {
133 let result = client
134 .execute_operation(operation)
135 .await
136 .map_err(Error::from);
137 (result, client)
138 }
139 })
140 .await?;
141 Ok(certificate.hash())
142 }
143
144 async fn apply_client_command<F, Fut, T>(
148 &self,
149 chain_id: &ChainId,
150 mut f: F,
151 ) -> Result<T, Error>
152 where
153 F: FnMut(ChainClient<C::Environment>) -> Fut,
154 Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
155 {
156 loop {
157 let client = self.context.lock().await.make_chain_client(*chain_id);
158 let mut stream = client.subscribe()?;
159 let (result, client) = f(client).await;
160 self.context.lock().await.update_wallet(&client).await?;
161 let timeout = match result? {
162 ClientOutcome::Committed(t) => return Ok(t),
163 ClientOutcome::WaitForTimeout(timeout) => timeout,
164 };
165 drop(client);
166 util::wait_for_next_round(&mut stream, timeout).await;
167 }
168 }
169}
170
171#[async_graphql::Object(cache_control(no_cache))]
172impl<C> MutationRoot<C>
173where
174 C: ClientContext + 'static,
175{
176 async fn process_inbox(
178 &self,
179 #[graphql(desc = "The chain whose inbox is being processed.")] chain_id: ChainId,
180 ) -> Result<Vec<CryptoHash>, Error> {
181 let mut hashes = Vec::new();
182 loop {
183 let client = self.context.lock().await.make_chain_client(chain_id);
184 client.synchronize_from_validators().await?;
185 let result = client.process_inbox_without_prepare().await;
186 self.context.lock().await.update_wallet(&client).await?;
187 let (certificates, maybe_timeout) = result?;
188 hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
189 match maybe_timeout {
190 None => return Ok(hashes),
191 Some(timestamp) => {
192 let mut stream = client.subscribe()?;
193 drop(client);
194 util::wait_for_next_round(&mut stream, timestamp).await;
195 }
196 }
197 }
198 }
199
200 async fn retry_pending_block(
202 &self,
203 #[graphql(desc = "The chain on whose block is being retried.")] chain_id: ChainId,
204 ) -> Result<Option<CryptoHash>, Error> {
205 let client = self.context.lock().await.make_chain_client(chain_id);
206 let outcome = client.process_pending_block().await?;
207 self.context.lock().await.update_wallet(&client).await?;
208 match outcome {
209 ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
210 ClientOutcome::Committed(None) => Ok(None),
211 ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
212 "Please try again at {}",
213 timeout.timestamp
214 ))),
215 }
216 }
217
218 async fn transfer(
221 &self,
222 #[graphql(desc = "The chain which native tokens are being transferred from.")]
223 chain_id: ChainId,
224 #[graphql(desc = "The account being debited on the chain.")] owner: AccountOwner,
225 #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
226 #[graphql(desc = "The amount being transferred.")] amount: Amount,
227 ) -> Result<CryptoHash, Error> {
228 self.apply_client_command(&chain_id, move |client| async move {
229 let result = client
230 .transfer(owner, amount, recipient)
231 .await
232 .map_err(Error::from)
233 .map(|outcome| outcome.map(|certificate| certificate.hash()));
234 (result, client)
235 })
236 .await
237 }
238
239 async fn claim(
243 &self,
244 #[graphql(desc = "The chain for whom owner is one of the owner.")] chain_id: ChainId,
245 #[graphql(desc = "The owner of chain targetId being debited.")] owner: AccountOwner,
246 #[graphql(desc = "The chain whose owner is being debited.")] target_id: ChainId,
247 #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
248 #[graphql(desc = "The amount being transferred.")] amount: Amount,
249 ) -> Result<CryptoHash, Error> {
250 self.apply_client_command(&chain_id, move |client| async move {
251 let result = client
252 .claim(owner, target_id, recipient, amount)
253 .await
254 .map_err(Error::from)
255 .map(|outcome| outcome.map(|certificate| certificate.hash()));
256 (result, client)
257 })
258 .await
259 }
260
261 async fn read_data_blob(
264 &self,
265 chain_id: ChainId,
266 hash: CryptoHash,
267 ) -> Result<CryptoHash, Error> {
268 self.apply_client_command(&chain_id, move |client| async move {
269 let result = client
270 .read_data_blob(hash)
271 .await
272 .map_err(Error::from)
273 .map(|outcome| outcome.map(|certificate| certificate.hash()));
274 (result, client)
275 })
276 .await
277 }
278
279 async fn open_chain(
281 &self,
282 #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
283 #[graphql(desc = "The owner of the new chain.")] owner: AccountOwner,
284 #[graphql(desc = "The balance of the chain being created. Zero if `None`.")]
285 balance: Option<Amount>,
286 ) -> Result<ChainId, Error> {
287 let ownership = ChainOwnership::single(owner);
288 let balance = balance.unwrap_or(Amount::ZERO);
289 let description = self
290 .apply_client_command(&chain_id, move |client| {
291 let ownership = ownership.clone();
292 async move {
293 let result = client
294 .open_chain(ownership, ApplicationPermissions::default(), balance)
295 .await
296 .map_err(Error::from)
297 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
298 (result, client)
299 }
300 })
301 .await?;
302 Ok(description.id())
303 }
304
305 #[expect(clippy::too_many_arguments)]
307 async fn open_multi_owner_chain(
308 &self,
309 #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
310 #[graphql(desc = "Permissions for applications on the new chain")]
311 application_permissions: Option<ApplicationPermissions>,
312 #[graphql(desc = "The owners of the chain")] owners: Vec<AccountOwner>,
313 #[graphql(desc = "The weights of the owners")] weights: Option<Vec<u64>>,
314 #[graphql(desc = "The number of multi-leader rounds")] multi_leader_rounds: Option<u32>,
315 #[graphql(desc = "The balance of the chain. Zero if `None`")] balance: Option<Amount>,
316 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
317 fast_round_ms: Option<u64>,
318 #[graphql(
319 desc = "The duration of the first single-leader and all multi-leader rounds",
320 default = 10_000
321 )]
322 base_timeout_ms: u64,
323 #[graphql(
324 desc = "The number of milliseconds by which the timeout increases after each \
325 single-leader round",
326 default = 1_000
327 )]
328 timeout_increment_ms: u64,
329 #[graphql(
330 desc = "The age of an incoming tracked or protected message after which the \
331 validators start transitioning the chain to fallback mode, in milliseconds.",
332 default = 86_400_000
333 )]
334 fallback_duration_ms: u64,
335 ) -> Result<ChainId, Error> {
336 let owners = if let Some(weights) = weights {
337 if weights.len() != owners.len() {
338 return Err(Error::new(format!(
339 "There are {} owners but {} weights.",
340 owners.len(),
341 weights.len()
342 )));
343 }
344 owners.into_iter().zip(weights).collect::<Vec<_>>()
345 } else {
346 owners
347 .into_iter()
348 .zip(iter::repeat(100))
349 .collect::<Vec<_>>()
350 };
351 let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
352 let timeout_config = TimeoutConfig {
353 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
354 base_timeout: TimeDelta::from_millis(base_timeout_ms),
355 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
356 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
357 };
358 let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
359 let balance = balance.unwrap_or(Amount::ZERO);
360 let description = self
361 .apply_client_command(&chain_id, move |client| {
362 let ownership = ownership.clone();
363 let application_permissions = application_permissions.clone().unwrap_or_default();
364 async move {
365 let result = client
366 .open_chain(ownership, application_permissions, balance)
367 .await
368 .map_err(Error::from)
369 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
370 (result, client)
371 }
372 })
373 .await?;
374 Ok(description.id())
375 }
376
377 async fn close_chain(
379 &self,
380 #[graphql(desc = "The chain being closed.")] chain_id: ChainId,
381 ) -> Result<Option<CryptoHash>, Error> {
382 let maybe_cert = self
383 .apply_client_command(&chain_id, |client| async move {
384 let result = client.close_chain().await.map_err(Error::from);
385 (result, client)
386 })
387 .await?;
388 Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
389 }
390
391 async fn change_owner(
393 &self,
394 #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
395 #[graphql(desc = "The new single owner of the chain")] new_owner: AccountOwner,
396 ) -> Result<CryptoHash, Error> {
397 let operation = SystemOperation::ChangeOwnership {
398 super_owners: vec![new_owner],
399 owners: Vec::new(),
400 multi_leader_rounds: 2,
401 open_multi_leader_rounds: false,
402 timeout_config: TimeoutConfig::default(),
403 };
404 self.execute_system_operation(operation, chain_id).await
405 }
406
407 #[expect(clippy::too_many_arguments)]
409 async fn change_multiple_owners(
410 &self,
411 #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
412 #[graphql(desc = "The new list of owners of the chain")] new_owners: Vec<AccountOwner>,
413 #[graphql(desc = "The new list of weights of the owners")] new_weights: Vec<u64>,
414 #[graphql(desc = "The multi-leader round of the chain")] multi_leader_rounds: u32,
415 #[graphql(
416 desc = "Whether multi-leader rounds are unrestricted, that is not limited to chain owners."
417 )]
418 open_multi_leader_rounds: bool,
419 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
420 fast_round_ms: Option<u64>,
421 #[graphql(
422 desc = "The duration of the first single-leader and all multi-leader rounds",
423 default = 10_000
424 )]
425 base_timeout_ms: u64,
426 #[graphql(
427 desc = "The number of milliseconds by which the timeout increases after each \
428 single-leader round",
429 default = 1_000
430 )]
431 timeout_increment_ms: u64,
432 #[graphql(
433 desc = "The age of an incoming tracked or protected message after which the \
434 validators start transitioning the chain to fallback mode, in milliseconds.",
435 default = 86_400_000
436 )]
437 fallback_duration_ms: u64,
438 ) -> Result<CryptoHash, Error> {
439 let operation = SystemOperation::ChangeOwnership {
440 super_owners: Vec::new(),
441 owners: new_owners.into_iter().zip(new_weights).collect(),
442 multi_leader_rounds,
443 open_multi_leader_rounds,
444 timeout_config: TimeoutConfig {
445 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
446 base_timeout: TimeDelta::from_millis(base_timeout_ms),
447 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
448 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
449 },
450 };
451 self.execute_system_operation(operation, chain_id).await
452 }
453
454 #[expect(clippy::too_many_arguments)]
456 async fn change_application_permissions(
457 &self,
458 #[graphql(desc = "The chain whose permissions are being changed")] chain_id: ChainId,
459 #[graphql(desc = "These applications are allowed to close the current chain.")]
460 close_chain: Vec<ApplicationId>,
461 #[graphql(
462 desc = "If this is `None`, all system operations and application operations are allowed.
463If it is `Some`, only operations from the specified applications are allowed,
464and no system operations."
465 )]
466 execute_operations: Option<Vec<ApplicationId>>,
467 #[graphql(
468 desc = "At least one operation or incoming message from each of these applications must occur in every block."
469 )]
470 mandatory_applications: Vec<ApplicationId>,
471 #[graphql(desc = "These applications are allowed to change the application permissions.")]
472 change_application_permissions: Vec<ApplicationId>,
473 #[graphql(
474 desc = "These applications are allowed to perform calls to services as oracles."
475 )]
476 call_service_as_oracle: Option<Vec<ApplicationId>>,
477 #[graphql(desc = "These applications are allowed to perform HTTP requests.")]
478 make_http_requests: Option<Vec<ApplicationId>>,
479 ) -> Result<CryptoHash, Error> {
480 let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
481 execute_operations,
482 mandatory_applications,
483 close_chain,
484 change_application_permissions,
485 call_service_as_oracle,
486 make_http_requests,
487 });
488 self.execute_system_operation(operation, chain_id).await
489 }
490
491 async fn create_committee(
495 &self,
496 chain_id: ChainId,
497 committee: Committee,
498 ) -> Result<CryptoHash, Error> {
499 Ok(self
500 .apply_client_command(&chain_id, move |client| {
501 let committee = committee.clone();
502 async move {
503 let result = client
504 .stage_new_committee(committee)
505 .await
506 .map_err(Error::from);
507 (result, client)
508 }
509 })
510 .await?
511 .hash())
512 }
513
514 async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
518 let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
519 self.execute_system_operation(operation, chain_id).await
520 }
521
522 async fn publish_module(
524 &self,
525 #[graphql(desc = "The chain publishing the module")] chain_id: ChainId,
526 #[graphql(desc = "The bytecode of the contract code")] contract: Bytecode,
527 #[graphql(desc = "The bytecode of the service code (only relevant for WebAssembly)")]
528 service: Bytecode,
529 #[graphql(desc = "The virtual machine being used (either Wasm or Evm)")]
530 vm_runtime: VmRuntime,
531 ) -> Result<ModuleId, Error> {
532 self.apply_client_command(&chain_id, move |client| {
533 let contract = contract.clone();
534 let service = service.clone();
535 async move {
536 let result = client
537 .publish_module(contract, service, vm_runtime)
538 .await
539 .map_err(Error::from)
540 .map(|outcome| outcome.map(|(module_id, _)| module_id));
541 (result, client)
542 }
543 })
544 .await
545 }
546
547 async fn publish_data_blob(
549 &self,
550 #[graphql(desc = "The chain paying for the blob publication")] chain_id: ChainId,
551 #[graphql(desc = "The content of the data blob being created")] bytes: Vec<u8>,
552 ) -> Result<CryptoHash, Error> {
553 self.apply_client_command(&chain_id, |client| {
554 let bytes = bytes.clone();
555 async move {
556 let result = client.publish_data_blob(bytes).await.map_err(Error::from);
557 (result, client)
558 }
559 })
560 .await
561 .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
562 }
563
564 async fn create_application(
566 &self,
567 #[graphql(desc = "The chain paying for the creation of the application")] chain_id: ChainId,
568 #[graphql(desc = "The module ID of the application being created")] module_id: ModuleId,
569 #[graphql(desc = "The JSON serialization of the parameters of the application")]
570 parameters: String,
571 #[graphql(
572 desc = "The JSON serialization of the instantiation argument of the application"
573 )]
574 instantiation_argument: String,
575 #[graphql(desc = "The dependencies of the application being created")]
576 required_application_ids: Vec<ApplicationId>,
577 ) -> Result<ApplicationId, Error> {
578 self.apply_client_command(&chain_id, move |client| {
579 let parameters = parameters.as_bytes().to_vec();
580 let instantiation_argument = instantiation_argument.as_bytes().to_vec();
581 let required_application_ids = required_application_ids.clone();
582 async move {
583 let result = client
584 .create_application_untyped(
585 module_id,
586 parameters,
587 instantiation_argument,
588 required_application_ids,
589 )
590 .await
591 .map_err(Error::from)
592 .map(|outcome| outcome.map(|(application_id, _)| application_id));
593 (result, client)
594 }
595 })
596 .await
597 }
598}
599
600#[async_graphql::Object(cache_control(no_cache))]
601impl<C> QueryRoot<C>
602where
603 C: ClientContext + 'static,
604{
605 async fn chain(
606 &self,
607 chain_id: ChainId,
608 ) -> Result<
609 ChainStateExtendedView<<C::Environment as linera_core::Environment>::StorageContext>,
610 Error,
611 > {
612 let client = self.context.lock().await.make_chain_client(chain_id);
613 let view = client.chain_state_view().await?;
614 Ok(ChainStateExtendedView::new(view))
615 }
616
617 async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
618 let client = self.context.lock().await.make_chain_client(chain_id);
619 let applications = client
620 .chain_state_view()
621 .await?
622 .execution_state
623 .list_applications()
624 .await?;
625
626 let overviews = applications
627 .into_iter()
628 .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
629 .collect();
630
631 Ok(overviews)
632 }
633
634 async fn chains(&self) -> Result<Chains, Error> {
635 Ok(Chains {
636 list: self.context.lock().await.wallet().chain_ids(),
637 default: self.default_chain,
638 })
639 }
640
641 async fn block(
642 &self,
643 hash: Option<CryptoHash>,
644 chain_id: ChainId,
645 ) -> Result<Option<ConfirmedBlock>, Error> {
646 let client = self.context.lock().await.make_chain_client(chain_id);
647 let hash = match hash {
648 Some(hash) => Some(hash),
649 None => {
650 let view = client.chain_state_view().await?;
651 view.tip_state.get().block_hash
652 }
653 };
654 if let Some(hash) = hash {
655 let block = client.read_confirmed_block(hash).await?;
656 Ok(Some(block))
657 } else {
658 Ok(None)
659 }
660 }
661
662 async fn events_from_index(
663 &self,
664 chain_id: ChainId,
665 stream_id: StreamId,
666 start_index: u32,
667 ) -> Result<Vec<IndexAndEvent>, Error> {
668 Ok(self
669 .context
670 .lock()
671 .await
672 .make_chain_client(chain_id)
673 .events_from_index(stream_id, start_index)
674 .await?)
675 }
676
677 async fn blocks(
678 &self,
679 from: Option<CryptoHash>,
680 chain_id: ChainId,
681 limit: Option<u32>,
682 ) -> Result<Vec<ConfirmedBlock>, Error> {
683 let client = self.context.lock().await.make_chain_client(chain_id);
684 let limit = limit.unwrap_or(10);
685 let from = match from {
686 Some(from) => Some(from),
687 None => {
688 let view = client.chain_state_view().await?;
689 view.tip_state.get().block_hash
690 }
691 };
692 let Some(from) = from else {
693 return Ok(vec![]);
694 };
695 let mut hash = Some(from);
696 let mut values = Vec::new();
697 for _ in 0..limit {
698 let Some(next_hash) = hash else {
699 break;
700 };
701 let value = client.read_confirmed_block(next_hash).await?;
702 hash = value.block().header.previous_block_hash;
703 values.push(value);
704 }
705 Ok(values)
706 }
707
708 async fn version(&self) -> linera_version::VersionInfo {
710 linera_version::VersionInfo::default()
711 }
712}
713
714struct ChainStateViewExtension(ChainId);
718
719#[async_graphql::Object(cache_control(no_cache))]
720impl ChainStateViewExtension {
721 async fn chain_id(&self) -> ChainId {
722 self.0
723 }
724}
725
726#[derive(MergedObject)]
727struct ChainStateExtendedView<C>(ChainStateViewExtension, ReadOnlyChainStateView<C>)
728where
729 C: linera_views::context::Context + Clone + Send + Sync + 'static,
730 C::Extra: linera_execution::ExecutionRuntimeContext;
731
732pub struct ReadOnlyChainStateView<C>(OwnedRwLockReadGuard<ChainStateView<C>>)
735where
736 C: linera_views::context::Context + Clone + Send + Sync + 'static;
737
738impl<C> ContainerType for ReadOnlyChainStateView<C>
739where
740 C: linera_views::context::Context + Clone + Send + Sync + 'static,
741{
742 async fn resolve_field(
743 &self,
744 context: &async_graphql::Context<'_>,
745 ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
746 self.0.resolve_field(context).await
747 }
748}
749
750impl<C> OutputType for ReadOnlyChainStateView<C>
751where
752 C: linera_views::context::Context + Clone + Send + Sync + 'static,
753{
754 fn type_name() -> Cow<'static, str> {
755 ChainStateView::<C>::type_name()
756 }
757
758 fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
759 ChainStateView::<C>::create_type_info(registry)
760 }
761
762 async fn resolve(
763 &self,
764 context: &async_graphql::ContextSelectionSet<'_>,
765 field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
766 ) -> async_graphql::ServerResult<async_graphql::Value> {
767 self.0.resolve(context, field).await
768 }
769}
770
771impl<C> ChainStateExtendedView<C>
772where
773 C: linera_views::context::Context + Clone + Send + Sync + 'static,
774 C::Extra: linera_execution::ExecutionRuntimeContext,
775{
776 fn new(view: OwnedRwLockReadGuard<ChainStateView<C>>) -> Self {
777 Self(
778 ChainStateViewExtension(view.chain_id()),
779 ReadOnlyChainStateView(view),
780 )
781 }
782}
783
784#[derive(SimpleObject)]
785pub struct ApplicationOverview {
786 id: ApplicationId,
787 description: ApplicationDescription,
788 link: String,
789}
790
791impl ApplicationOverview {
792 fn new(
793 id: ApplicationId,
794 description: ApplicationDescription,
795 port: NonZeroU16,
796 chain_id: ChainId,
797 ) -> Self {
798 Self {
799 id,
800 description,
801 link: format!(
802 "http://localhost:{}/chains/{}/applications/{}",
803 port.get(),
804 chain_id,
805 id
806 ),
807 }
808 }
809}
810
811pub struct NodeService<C>
814where
815 C: ClientContext + 'static,
816{
817 config: ChainListenerConfig,
818 port: NonZeroU16,
819 default_chain: Option<ChainId>,
820 context: Arc<Mutex<C>>,
821}
822
823impl<C> Clone for NodeService<C>
824where
825 C: ClientContext + 'static,
826{
827 fn clone(&self) -> Self {
828 Self {
829 config: self.config.clone(),
830 port: self.port,
831 default_chain: self.default_chain,
832 context: Arc::clone(&self.context),
833 }
834 }
835}
836
837impl<C> NodeService<C>
838where
839 C: ClientContext,
840{
841 pub async fn new(
843 config: ChainListenerConfig,
844 port: NonZeroU16,
845 default_chain: Option<ChainId>,
846 context: C,
847 ) -> Self {
848 Self {
849 config,
850 port,
851 default_chain,
852 context: Arc::new(Mutex::new(context)),
853 }
854 }
855
856 pub fn schema(&self) -> Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>> {
857 Schema::build(
858 QueryRoot {
859 context: Arc::clone(&self.context),
860 port: self.port,
861 default_chain: self.default_chain,
862 },
863 MutationRoot {
864 context: Arc::clone(&self.context),
865 },
866 SubscriptionRoot {
867 context: Arc::clone(&self.context),
868 },
869 )
870 .finish()
871 }
872
873 #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
875 pub async fn run(self, cancellation_token: CancellationToken) -> Result<(), anyhow::Error> {
876 let port = self.port.get();
877 let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
878 let application_handler =
879 axum::routing::get(util::graphiql).post(Self::application_handler);
880
881 let app = Router::new()
882 .route("/", index_handler)
883 .route(
884 "/chains/{chain_id}/applications/{application_id}",
885 application_handler,
886 )
887 .route("/ready", axum::routing::get(|| async { "ready!" }))
888 .route_service("/ws", GraphQLSubscription::new(self.schema()))
889 .layer(Extension(self.clone()))
890 .layer(CorsLayer::permissive());
892
893 info!("GraphiQL IDE: http://localhost:{}", port);
894
895 let storage = self.context.lock().await.storage().clone();
896
897 let chain_listener =
898 ChainListener::new(self.config, self.context, storage, cancellation_token)
899 .run()
900 .await?;
901 let mut chain_listener = Box::pin(chain_listener).fuse();
902 let tcp_listener =
903 tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
904 let server = axum::serve(tcp_listener, app).into_future();
905 futures::select! {
906 result = chain_listener => result?,
907 result = Box::pin(server).fuse() => result?,
908 };
909
910 Ok(())
911 }
912
913 async fn handle_service_request(
915 &self,
916 application_id: ApplicationId,
917 request: Vec<u8>,
918 chain_id: ChainId,
919 ) -> Result<Vec<u8>, NodeServiceError> {
920 let QueryOutcome {
921 response,
922 operations,
923 } = self
924 .query_user_application(application_id, request, chain_id)
925 .await?;
926 if operations.is_empty() {
927 return Ok(response);
928 }
929
930 trace!("Query requested a new block with operations: {operations:?}");
931 let client = self.context.lock().await.make_chain_client(chain_id);
932 let hash = loop {
933 let timeout = match client
934 .execute_operations(operations.clone(), vec![])
935 .await?
936 {
937 ClientOutcome::Committed(certificate) => break certificate.hash(),
938 ClientOutcome::WaitForTimeout(timeout) => timeout,
939 };
940 let mut stream = client.subscribe().map_err(|_| {
941 ChainClientError::InternalError("Could not subscribe to the local node.")
942 })?;
943 util::wait_for_next_round(&mut stream, timeout).await;
944 };
945 let response = async_graphql::Response::new(hash.to_value());
946 Ok(serde_json::to_vec(&response)?)
947 }
948
949 async fn query_user_application(
951 &self,
952 application_id: ApplicationId,
953 bytes: Vec<u8>,
954 chain_id: ChainId,
955 ) -> Result<QueryOutcome<Vec<u8>>, NodeServiceError> {
956 let query = Query::User {
957 application_id,
958 bytes,
959 };
960 let client = self.context.lock().await.make_chain_client(chain_id);
961 let QueryOutcome {
962 response,
963 operations,
964 } = client.query_application(query).await?;
965 match response {
966 QueryResponse::System(_) => {
967 unreachable!("cannot get a system response for a user query")
968 }
969 QueryResponse::User(user_response_bytes) => Ok(QueryOutcome {
970 response: user_response_bytes,
971 operations,
972 }),
973 }
974 }
975
976 async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
978 service
979 .0
980 .schema()
981 .execute(request.into_inner())
982 .await
983 .into()
984 }
985
986 async fn application_handler(
990 Path((chain_id, application_id)): Path<(String, String)>,
991 service: Extension<Self>,
992 request: String,
993 ) -> Result<Vec<u8>, NodeServiceError> {
994 let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
995 let application_id: ApplicationId = application_id.parse()?;
996
997 debug!(
998 "Processing request for application {application_id} on chain {chain_id}:\n{:?}",
999 &request
1000 );
1001 let response = service
1002 .0
1003 .handle_service_request(application_id, request.into_bytes(), chain_id)
1004 .await?;
1005
1006 Ok(response)
1007 }
1008}