1use std::{borrow::Cow, future::IntoFuture, iter, net::SocketAddr, num::NonZeroU16, sync::Arc};
5
6use async_graphql::{
7 futures_util::Stream, resolver_utils::ContainerType, Error, MergedObject, OutputType,
8 ScalarType, Schema, SimpleObject, Subscription,
9};
10use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
11use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
12use futures::{lock::Mutex, Future, FutureExt as _};
13use linera_base::{
14 crypto::{CryptoError, CryptoHash},
15 data_types::{
16 Amount, ApplicationDescription, ApplicationPermissions, Bytecode, Epoch, TimeDelta,
17 },
18 identifiers::{
19 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
20 },
21 ownership::{ChainOwnership, TimeoutConfig},
22 vm::VmRuntime,
23 BcsHexParseError,
24};
25use linera_chain::{
26 types::{ConfirmedBlock, GenericCertificate},
27 ChainStateView,
28};
29use linera_client::chain_listener::{ChainListener, ChainListenerConfig, ClientContext};
30use linera_core::{
31 client::chain_client::{self, ChainClient},
32 data_types::ClientOutcome,
33 worker::Notification,
34};
35use linera_execution::{
36 committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse,
37 SystemOperation,
38};
39#[cfg(with_metrics)]
40use linera_metrics::monitoring_server;
41use linera_sdk::linera_base_types::BlobContent;
42use serde::{Deserialize, Serialize};
43use serde_json::json;
44use thiserror::Error as ThisError;
45use tokio::sync::OwnedRwLockReadGuard;
46use tokio_util::sync::CancellationToken;
47use tower_http::cors::CorsLayer;
48use tracing::{debug, error, info, instrument, trace};
49
50use crate::util;
51
52#[derive(SimpleObject, Serialize, Deserialize, Clone)]
53pub struct Chains {
54 pub list: Vec<ChainId>,
55 pub default: Option<ChainId>,
56}
57
58pub struct QueryRoot<C> {
60 context: Arc<Mutex<C>>,
61 port: NonZeroU16,
62 default_chain: Option<ChainId>,
63}
64
65pub struct SubscriptionRoot<C> {
67 context: Arc<Mutex<C>>,
68}
69
70pub struct MutationRoot<C> {
72 context: Arc<Mutex<C>>,
73}
74
75#[derive(Debug, ThisError)]
76enum NodeServiceError {
77 #[error(transparent)]
78 ChainClientError(#[from] chain_client::Error),
79 #[error(transparent)]
80 BcsHexError(#[from] BcsHexParseError),
81 #[error(transparent)]
82 JsonError(#[from] serde_json::Error),
83 #[error("malformed chain ID: {0}")]
84 InvalidChainId(CryptoError),
85}
86
87impl IntoResponse for NodeServiceError {
88 fn into_response(self) -> response::Response {
89 let tuple = match self {
90 NodeServiceError::BcsHexError(e) => (StatusCode::BAD_REQUEST, vec![e.to_string()]),
91 NodeServiceError::ChainClientError(e) => {
92 (StatusCode::INTERNAL_SERVER_ERROR, vec![e.to_string()])
93 }
94 NodeServiceError::JsonError(e) => {
95 (StatusCode::INTERNAL_SERVER_ERROR, vec![e.to_string()])
96 }
97 NodeServiceError::InvalidChainId(_) => (
98 StatusCode::BAD_REQUEST,
99 vec!["invalid chain ID".to_string()],
100 ),
101 };
102 let tuple = (tuple.0, json!({"error": tuple.1}).to_string());
103 tuple.into_response()
104 }
105}
106
107#[Subscription]
108impl<C> SubscriptionRoot<C>
109where
110 C: ClientContext + 'static,
111{
112 async fn notifications(
114 &self,
115 chain_id: ChainId,
116 ) -> Result<impl Stream<Item = Notification>, Error> {
117 let client = self.context.lock().await.make_chain_client(chain_id);
118 Ok(client.subscribe()?)
119 }
120}
121
122impl<C> MutationRoot<C>
123where
124 C: ClientContext,
125{
126 async fn execute_system_operation(
127 &self,
128 system_operation: SystemOperation,
129 chain_id: ChainId,
130 ) -> Result<CryptoHash, Error> {
131 let certificate = self
132 .apply_client_command(&chain_id, move |client| {
133 let operation = Operation::system(system_operation.clone());
134 async move {
135 let result = client
136 .execute_operation(operation)
137 .await
138 .map_err(Error::from);
139 (result, client)
140 }
141 })
142 .await?;
143 Ok(certificate.hash())
144 }
145
146 async fn apply_client_command<F, Fut, T>(
150 &self,
151 chain_id: &ChainId,
152 mut f: F,
153 ) -> Result<T, Error>
154 where
155 F: FnMut(ChainClient<C::Environment>) -> Fut,
156 Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
157 {
158 loop {
159 let client = self.context.lock().await.make_chain_client(*chain_id);
160 let mut stream = client.subscribe()?;
161 let (result, client) = f(client).await;
162 self.context.lock().await.update_wallet(&client).await?;
163 let timeout = match result? {
164 ClientOutcome::Committed(t) => return Ok(t),
165 ClientOutcome::WaitForTimeout(timeout) => timeout,
166 };
167 drop(client);
168 util::wait_for_next_round(&mut stream, timeout).await;
169 }
170 }
171}
172
173#[async_graphql::Object(cache_control(no_cache))]
174impl<C> MutationRoot<C>
175where
176 C: ClientContext + 'static,
177{
178 async fn process_inbox(
180 &self,
181 #[graphql(desc = "The chain whose inbox is being processed.")] chain_id: ChainId,
182 ) -> Result<Vec<CryptoHash>, Error> {
183 let mut hashes = Vec::new();
184 loop {
185 let client = self.context.lock().await.make_chain_client(chain_id);
186 let result = client.process_inbox().await;
187 self.context.lock().await.update_wallet(&client).await?;
188 let (certificates, maybe_timeout) = result?;
189 hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
190 match maybe_timeout {
191 None => return Ok(hashes),
192 Some(timestamp) => {
193 let mut stream = client.subscribe()?;
194 drop(client);
195 util::wait_for_next_round(&mut stream, timestamp).await;
196 }
197 }
198 }
199 }
200
201 async fn sync(
206 &self,
207 #[graphql(desc = "The chain being synchronized.")] chain_id: ChainId,
208 ) -> Result<u64, Error> {
209 let client = self.context.lock().await.make_chain_client(chain_id);
210 let info = client.synchronize_from_validators().await?;
211 self.context.lock().await.update_wallet(&client).await?;
212 Ok(info.next_block_height.0)
213 }
214
215 async fn retry_pending_block(
217 &self,
218 #[graphql(desc = "The chain on whose block is being retried.")] chain_id: ChainId,
219 ) -> Result<Option<CryptoHash>, Error> {
220 let client = self.context.lock().await.make_chain_client(chain_id);
221 let outcome = client.process_pending_block().await?;
222 self.context.lock().await.update_wallet(&client).await?;
223 match outcome {
224 ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
225 ClientOutcome::Committed(None) => Ok(None),
226 ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
227 "Please try again at {}",
228 timeout.timestamp
229 ))),
230 }
231 }
232
233 async fn transfer(
236 &self,
237 #[graphql(desc = "The chain which native tokens are being transferred from.")]
238 chain_id: ChainId,
239 #[graphql(desc = "The account being debited on the chain.")] owner: AccountOwner,
240 #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
241 #[graphql(desc = "The amount being transferred.")] amount: Amount,
242 ) -> Result<CryptoHash, Error> {
243 self.apply_client_command(&chain_id, move |client| async move {
244 let result = client
245 .transfer(owner, amount, recipient)
246 .await
247 .map_err(Error::from)
248 .map(|outcome| outcome.map(|certificate| certificate.hash()));
249 (result, client)
250 })
251 .await
252 }
253
254 async fn claim(
258 &self,
259 #[graphql(desc = "The chain for whom owner is one of the owner.")] chain_id: ChainId,
260 #[graphql(desc = "The owner of chain targetId being debited.")] owner: AccountOwner,
261 #[graphql(desc = "The chain whose owner is being debited.")] target_id: ChainId,
262 #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
263 #[graphql(desc = "The amount being transferred.")] amount: Amount,
264 ) -> Result<CryptoHash, Error> {
265 self.apply_client_command(&chain_id, move |client| async move {
266 let result = client
267 .claim(owner, target_id, recipient, amount)
268 .await
269 .map_err(Error::from)
270 .map(|outcome| outcome.map(|certificate| certificate.hash()));
271 (result, client)
272 })
273 .await
274 }
275
276 async fn read_data_blob(
279 &self,
280 chain_id: ChainId,
281 hash: CryptoHash,
282 ) -> Result<CryptoHash, Error> {
283 self.apply_client_command(&chain_id, move |client| async move {
284 let result = client
285 .read_data_blob(hash)
286 .await
287 .map_err(Error::from)
288 .map(|outcome| outcome.map(|certificate| certificate.hash()));
289 (result, client)
290 })
291 .await
292 }
293
294 async fn open_chain(
296 &self,
297 #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
298 #[graphql(desc = "The owner of the new chain.")] owner: AccountOwner,
299 #[graphql(desc = "The balance of the chain being created. Zero if `None`.")]
300 balance: Option<Amount>,
301 ) -> Result<ChainId, Error> {
302 let ownership = ChainOwnership::single(owner);
303 let balance = balance.unwrap_or(Amount::ZERO);
304 let description = self
305 .apply_client_command(&chain_id, move |client| {
306 let ownership = ownership.clone();
307 async move {
308 let result = client
309 .open_chain(ownership, ApplicationPermissions::default(), balance)
310 .await
311 .map_err(Error::from)
312 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
313 (result, client)
314 }
315 })
316 .await?;
317 Ok(description.id())
318 }
319
320 #[expect(clippy::too_many_arguments)]
322 async fn open_multi_owner_chain(
323 &self,
324 #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
325 #[graphql(desc = "Permissions for applications on the new chain")]
326 application_permissions: Option<ApplicationPermissions>,
327 #[graphql(desc = "The owners of the chain")] owners: Vec<AccountOwner>,
328 #[graphql(desc = "The weights of the owners")] weights: Option<Vec<u64>>,
329 #[graphql(desc = "The number of multi-leader rounds")] multi_leader_rounds: Option<u32>,
330 #[graphql(desc = "The balance of the chain. Zero if `None`")] balance: Option<Amount>,
331 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
332 fast_round_ms: Option<u64>,
333 #[graphql(
334 desc = "The duration of the first single-leader and all multi-leader rounds",
335 default = 10_000
336 )]
337 base_timeout_ms: u64,
338 #[graphql(
339 desc = "The number of milliseconds by which the timeout increases after each \
340 single-leader round",
341 default = 1_000
342 )]
343 timeout_increment_ms: u64,
344 #[graphql(
345 desc = "The age of an incoming tracked or protected message after which the \
346 validators start transitioning the chain to fallback mode, in milliseconds.",
347 default = 86_400_000
348 )]
349 fallback_duration_ms: u64,
350 ) -> Result<ChainId, Error> {
351 let owners = if let Some(weights) = weights {
352 if weights.len() != owners.len() {
353 return Err(Error::new(format!(
354 "There are {} owners but {} weights.",
355 owners.len(),
356 weights.len()
357 )));
358 }
359 owners.into_iter().zip(weights).collect::<Vec<_>>()
360 } else {
361 owners
362 .into_iter()
363 .zip(iter::repeat(100))
364 .collect::<Vec<_>>()
365 };
366 let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
367 let timeout_config = TimeoutConfig {
368 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
369 base_timeout: TimeDelta::from_millis(base_timeout_ms),
370 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
371 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
372 };
373 let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
374 let balance = balance.unwrap_or(Amount::ZERO);
375 let description = self
376 .apply_client_command(&chain_id, move |client| {
377 let ownership = ownership.clone();
378 let application_permissions = application_permissions.clone().unwrap_or_default();
379 async move {
380 let result = client
381 .open_chain(ownership, application_permissions, balance)
382 .await
383 .map_err(Error::from)
384 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
385 (result, client)
386 }
387 })
388 .await?;
389 Ok(description.id())
390 }
391
392 async fn close_chain(
394 &self,
395 #[graphql(desc = "The chain being closed.")] chain_id: ChainId,
396 ) -> Result<Option<CryptoHash>, Error> {
397 let maybe_cert = self
398 .apply_client_command(&chain_id, |client| async move {
399 let result = client.close_chain().await.map_err(Error::from);
400 (result, client)
401 })
402 .await?;
403 Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
404 }
405
406 async fn change_owner(
408 &self,
409 #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
410 #[graphql(desc = "The new single owner of the chain")] new_owner: AccountOwner,
411 ) -> Result<CryptoHash, Error> {
412 let operation = SystemOperation::ChangeOwnership {
413 super_owners: vec![new_owner],
414 owners: Vec::new(),
415 multi_leader_rounds: 2,
416 open_multi_leader_rounds: false,
417 timeout_config: TimeoutConfig::default(),
418 };
419 self.execute_system_operation(operation, chain_id).await
420 }
421
422 #[expect(clippy::too_many_arguments)]
424 async fn change_multiple_owners(
425 &self,
426 #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
427 #[graphql(desc = "The new list of owners of the chain")] new_owners: Vec<AccountOwner>,
428 #[graphql(desc = "The new list of weights of the owners")] new_weights: Vec<u64>,
429 #[graphql(desc = "The multi-leader round of the chain")] multi_leader_rounds: u32,
430 #[graphql(
431 desc = "Whether multi-leader rounds are unrestricted, that is not limited to chain owners."
432 )]
433 open_multi_leader_rounds: bool,
434 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
435 fast_round_ms: Option<u64>,
436 #[graphql(
437 desc = "The duration of the first single-leader and all multi-leader rounds",
438 default = 10_000
439 )]
440 base_timeout_ms: u64,
441 #[graphql(
442 desc = "The number of milliseconds by which the timeout increases after each \
443 single-leader round",
444 default = 1_000
445 )]
446 timeout_increment_ms: u64,
447 #[graphql(
448 desc = "The age of an incoming tracked or protected message after which the \
449 validators start transitioning the chain to fallback mode, in milliseconds.",
450 default = 86_400_000
451 )]
452 fallback_duration_ms: u64,
453 ) -> Result<CryptoHash, Error> {
454 let operation = SystemOperation::ChangeOwnership {
455 super_owners: Vec::new(),
456 owners: new_owners.into_iter().zip(new_weights).collect(),
457 multi_leader_rounds,
458 open_multi_leader_rounds,
459 timeout_config: TimeoutConfig {
460 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
461 base_timeout: TimeDelta::from_millis(base_timeout_ms),
462 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
463 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
464 },
465 };
466 self.execute_system_operation(operation, chain_id).await
467 }
468
469 #[expect(clippy::too_many_arguments)]
471 async fn change_application_permissions(
472 &self,
473 #[graphql(desc = "The chain whose permissions are being changed")] chain_id: ChainId,
474 #[graphql(desc = "These applications are allowed to close the current chain.")]
475 close_chain: Vec<ApplicationId>,
476 #[graphql(
477 desc = "If this is `None`, all system operations and application operations are allowed.
478If it is `Some`, only operations from the specified applications are allowed,
479and no system operations."
480 )]
481 execute_operations: Option<Vec<ApplicationId>>,
482 #[graphql(
483 desc = "At least one operation or incoming message from each of these applications must occur in every block."
484 )]
485 mandatory_applications: Vec<ApplicationId>,
486 #[graphql(desc = "These applications are allowed to change the application permissions.")]
487 change_application_permissions: Vec<ApplicationId>,
488 #[graphql(
489 desc = "These applications are allowed to perform calls to services as oracles."
490 )]
491 call_service_as_oracle: Option<Vec<ApplicationId>>,
492 #[graphql(desc = "These applications are allowed to perform HTTP requests.")]
493 make_http_requests: Option<Vec<ApplicationId>>,
494 ) -> Result<CryptoHash, Error> {
495 let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
496 execute_operations,
497 mandatory_applications,
498 close_chain,
499 change_application_permissions,
500 call_service_as_oracle,
501 make_http_requests,
502 });
503 self.execute_system_operation(operation, chain_id).await
504 }
505
506 async fn create_committee(
510 &self,
511 chain_id: ChainId,
512 committee: Committee,
513 ) -> Result<CryptoHash, Error> {
514 Ok(self
515 .apply_client_command(&chain_id, move |client| {
516 let committee = committee.clone();
517 async move {
518 let result = client
519 .stage_new_committee(committee)
520 .await
521 .map_err(Error::from);
522 (result, client)
523 }
524 })
525 .await?
526 .hash())
527 }
528
529 async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
533 let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
534 self.execute_system_operation(operation, chain_id).await
535 }
536
537 async fn publish_module(
539 &self,
540 #[graphql(desc = "The chain publishing the module")] chain_id: ChainId,
541 #[graphql(desc = "The bytecode of the contract code")] contract: Bytecode,
542 #[graphql(desc = "The bytecode of the service code (only relevant for WebAssembly)")]
543 service: Bytecode,
544 #[graphql(desc = "The virtual machine being used (either Wasm or Evm)")]
545 vm_runtime: VmRuntime,
546 ) -> Result<ModuleId, Error> {
547 self.apply_client_command(&chain_id, move |client| {
548 let contract = contract.clone();
549 let service = service.clone();
550 async move {
551 let result = client
552 .publish_module(contract, service, vm_runtime)
553 .await
554 .map_err(Error::from)
555 .map(|outcome| outcome.map(|(module_id, _)| module_id));
556 (result, client)
557 }
558 })
559 .await
560 }
561
562 async fn publish_data_blob(
564 &self,
565 #[graphql(desc = "The chain paying for the blob publication")] chain_id: ChainId,
566 #[graphql(desc = "The content of the data blob being created")] bytes: Vec<u8>,
567 ) -> Result<CryptoHash, Error> {
568 self.apply_client_command(&chain_id, |client| {
569 let bytes = bytes.clone();
570 async move {
571 let result = client.publish_data_blob(bytes).await.map_err(Error::from);
572 (result, client)
573 }
574 })
575 .await
576 .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
577 }
578
579 async fn create_application(
581 &self,
582 #[graphql(desc = "The chain paying for the creation of the application")] chain_id: ChainId,
583 #[graphql(desc = "The module ID of the application being created")] module_id: ModuleId,
584 #[graphql(desc = "The JSON serialization of the parameters of the application")]
585 parameters: String,
586 #[graphql(
587 desc = "The JSON serialization of the instantiation argument of the application"
588 )]
589 instantiation_argument: String,
590 #[graphql(desc = "The dependencies of the application being created")]
591 required_application_ids: Vec<ApplicationId>,
592 ) -> Result<ApplicationId, Error> {
593 self.apply_client_command(&chain_id, move |client| {
594 let parameters = parameters.as_bytes().to_vec();
595 let instantiation_argument = instantiation_argument.as_bytes().to_vec();
596 let required_application_ids = required_application_ids.clone();
597 async move {
598 let result = client
599 .create_application_untyped(
600 module_id,
601 parameters,
602 instantiation_argument,
603 required_application_ids,
604 )
605 .await
606 .map_err(Error::from)
607 .map(|outcome| outcome.map(|(application_id, _)| application_id));
608 (result, client)
609 }
610 })
611 .await
612 }
613}
614
615#[async_graphql::Object(cache_control(no_cache))]
616impl<C> QueryRoot<C>
617where
618 C: ClientContext + 'static,
619{
620 async fn chain(
621 &self,
622 chain_id: ChainId,
623 ) -> Result<
624 ChainStateExtendedView<<C::Environment as linera_core::Environment>::StorageContext>,
625 Error,
626 > {
627 let client = self.context.lock().await.make_chain_client(chain_id);
628 let view = client.chain_state_view().await?;
629 Ok(ChainStateExtendedView::new(view))
630 }
631
632 async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
633 let client = self.context.lock().await.make_chain_client(chain_id);
634 let applications = client
635 .chain_state_view()
636 .await?
637 .execution_state
638 .list_applications()
639 .await?;
640
641 let overviews = applications
642 .into_iter()
643 .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
644 .collect();
645
646 Ok(overviews)
647 }
648
649 async fn chains(&self) -> Result<Chains, Error> {
650 Ok(Chains {
651 list: self.context.lock().await.wallet().chain_ids(),
652 default: self.default_chain,
653 })
654 }
655
656 async fn block(
657 &self,
658 hash: Option<CryptoHash>,
659 chain_id: ChainId,
660 ) -> Result<Option<ConfirmedBlock>, Error> {
661 let client = self.context.lock().await.make_chain_client(chain_id);
662 let hash = match hash {
663 Some(hash) => Some(hash),
664 None => client.chain_info().await?.block_hash,
665 };
666 if let Some(hash) = hash {
667 let block = client.read_confirmed_block(hash).await?;
668 Ok(Some(block))
669 } else {
670 Ok(None)
671 }
672 }
673
674 async fn events_from_index(
675 &self,
676 chain_id: ChainId,
677 stream_id: StreamId,
678 start_index: u32,
679 ) -> Result<Vec<IndexAndEvent>, Error> {
680 Ok(self
681 .context
682 .lock()
683 .await
684 .make_chain_client(chain_id)
685 .events_from_index(stream_id, start_index)
686 .await?)
687 }
688
689 async fn blocks(
690 &self,
691 from: Option<CryptoHash>,
692 chain_id: ChainId,
693 limit: Option<u32>,
694 ) -> Result<Vec<ConfirmedBlock>, Error> {
695 let client = self.context.lock().await.make_chain_client(chain_id);
696 let limit = limit.unwrap_or(10);
697 let from = match from {
698 Some(from) => Some(from),
699 None => client.chain_info().await?.block_hash,
700 };
701 let Some(from) = from else {
702 return Ok(vec![]);
703 };
704 let mut hash = Some(from);
705 let mut values = Vec::new();
706 for _ in 0..limit {
707 let Some(next_hash) = hash else {
708 break;
709 };
710 let value = client.read_confirmed_block(next_hash).await?;
711 hash = value.block().header.previous_block_hash;
712 values.push(value);
713 }
714 Ok(values)
715 }
716
717 async fn version(&self) -> linera_version::VersionInfo {
719 linera_version::VersionInfo::default()
720 }
721}
722
723struct ChainStateViewExtension(ChainId);
727
728#[async_graphql::Object(cache_control(no_cache))]
729impl ChainStateViewExtension {
730 async fn chain_id(&self) -> ChainId {
731 self.0
732 }
733}
734
735#[derive(MergedObject)]
736struct ChainStateExtendedView<C>(ChainStateViewExtension, ReadOnlyChainStateView<C>)
737where
738 C: linera_views::context::Context + Clone + Send + Sync + 'static,
739 C::Extra: linera_execution::ExecutionRuntimeContext;
740
741pub struct ReadOnlyChainStateView<C>(OwnedRwLockReadGuard<ChainStateView<C>>)
744where
745 C: linera_views::context::Context + Clone + Send + Sync + 'static;
746
747impl<C> ContainerType for ReadOnlyChainStateView<C>
748where
749 C: linera_views::context::Context + Clone + Send + Sync + 'static,
750{
751 async fn resolve_field(
752 &self,
753 context: &async_graphql::Context<'_>,
754 ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
755 self.0.resolve_field(context).await
756 }
757}
758
759impl<C> OutputType for ReadOnlyChainStateView<C>
760where
761 C: linera_views::context::Context + Clone + Send + Sync + 'static,
762{
763 fn type_name() -> Cow<'static, str> {
764 ChainStateView::<C>::type_name()
765 }
766
767 fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
768 ChainStateView::<C>::create_type_info(registry)
769 }
770
771 async fn resolve(
772 &self,
773 context: &async_graphql::ContextSelectionSet<'_>,
774 field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
775 ) -> async_graphql::ServerResult<async_graphql::Value> {
776 self.0.resolve(context, field).await
777 }
778}
779
780impl<C> ChainStateExtendedView<C>
781where
782 C: linera_views::context::Context + Clone + Send + Sync + 'static,
783 C::Extra: linera_execution::ExecutionRuntimeContext,
784{
785 fn new(view: OwnedRwLockReadGuard<ChainStateView<C>>) -> Self {
786 Self(
787 ChainStateViewExtension(view.chain_id()),
788 ReadOnlyChainStateView(view),
789 )
790 }
791}
792
793#[derive(SimpleObject)]
794pub struct ApplicationOverview {
795 id: ApplicationId,
796 description: ApplicationDescription,
797 link: String,
798}
799
800impl ApplicationOverview {
801 fn new(
802 id: ApplicationId,
803 description: ApplicationDescription,
804 port: NonZeroU16,
805 chain_id: ChainId,
806 ) -> Self {
807 Self {
808 id,
809 description,
810 link: format!(
811 "http://localhost:{}/chains/{}/applications/{}",
812 port.get(),
813 chain_id,
814 id
815 ),
816 }
817 }
818}
819
820pub struct NodeService<C>
823where
824 C: ClientContext + 'static,
825{
826 config: ChainListenerConfig,
827 port: NonZeroU16,
828 #[cfg(with_metrics)]
829 metrics_port: NonZeroU16,
830 default_chain: Option<ChainId>,
831 context: Arc<Mutex<C>>,
832}
833
834impl<C> Clone for NodeService<C>
835where
836 C: ClientContext + 'static,
837{
838 fn clone(&self) -> Self {
839 Self {
840 config: self.config.clone(),
841 port: self.port,
842 #[cfg(with_metrics)]
843 metrics_port: self.metrics_port,
844 default_chain: self.default_chain,
845 context: Arc::clone(&self.context),
846 }
847 }
848}
849
850impl<C> NodeService<C>
851where
852 C: ClientContext,
853{
854 pub fn new(
856 config: ChainListenerConfig,
857 port: NonZeroU16,
858 #[cfg(with_metrics)] metrics_port: NonZeroU16,
859 default_chain: Option<ChainId>,
860 context: C,
861 ) -> Self {
862 Self {
863 config,
864 port,
865 #[cfg(with_metrics)]
866 metrics_port,
867 default_chain,
868 context: Arc::new(Mutex::new(context)),
869 }
870 }
871
872 #[cfg(with_metrics)]
873 pub fn metrics_address(&self) -> SocketAddr {
874 SocketAddr::from(([0, 0, 0, 0], self.metrics_port.get()))
875 }
876
877 pub fn schema(&self) -> Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>> {
878 Schema::build(
879 QueryRoot {
880 context: Arc::clone(&self.context),
881 port: self.port,
882 default_chain: self.default_chain,
883 },
884 MutationRoot {
885 context: Arc::clone(&self.context),
886 },
887 SubscriptionRoot {
888 context: Arc::clone(&self.context),
889 },
890 )
891 .finish()
892 }
893
894 #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
896 pub async fn run(self, cancellation_token: CancellationToken) -> Result<(), anyhow::Error> {
897 let port = self.port.get();
898 let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
899 let application_handler =
900 axum::routing::get(util::graphiql).post(Self::application_handler);
901
902 #[cfg(with_metrics)]
903 monitoring_server::start_metrics(self.metrics_address(), cancellation_token.clone());
904
905 let app = Router::new()
906 .route("/", index_handler)
907 .route(
908 "/chains/{chain_id}/applications/{application_id}",
909 application_handler,
910 )
911 .route("/ready", axum::routing::get(|| async { "ready!" }))
912 .route_service("/ws", GraphQLSubscription::new(self.schema()))
913 .layer(Extension(self.clone()))
914 .layer(CorsLayer::permissive());
916
917 info!("GraphiQL IDE: http://localhost:{}", port);
918
919 let storage = self.context.lock().await.storage().clone();
920
921 let chain_listener = ChainListener::new(
922 self.config,
923 self.context,
924 storage,
925 cancellation_token.clone(),
926 )
927 .run(true)
928 .await?;
929 let mut chain_listener = Box::pin(chain_listener).fuse();
930 let tcp_listener =
931 tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
932 let server = axum::serve(tcp_listener, app)
933 .with_graceful_shutdown(cancellation_token.cancelled_owned())
934 .into_future();
935 futures::select! {
936 result = chain_listener => result?,
937 result = Box::pin(server).fuse() => result?,
938 };
939
940 Ok(())
941 }
942
943 async fn handle_service_request(
945 &self,
946 application_id: ApplicationId,
947 request: Vec<u8>,
948 chain_id: ChainId,
949 block_hash: Option<CryptoHash>,
950 ) -> Result<Vec<u8>, NodeServiceError> {
951 let QueryOutcome {
952 response,
953 operations,
954 } = self
955 .query_user_application(application_id, request, chain_id, block_hash)
956 .await?;
957 if operations.is_empty() {
958 return Ok(response);
959 }
960
961 trace!("Query requested a new block with operations: {operations:?}");
962 let client = self.context.lock().await.make_chain_client(chain_id);
963 let hash = loop {
964 let timeout = match client
965 .execute_operations(operations.clone(), vec![])
966 .await?
967 {
968 ClientOutcome::Committed(certificate) => break certificate.hash(),
969 ClientOutcome::WaitForTimeout(timeout) => timeout,
970 };
971 let mut stream = client.subscribe().map_err(|_| {
972 chain_client::Error::InternalError("Could not subscribe to the local node.")
973 })?;
974 util::wait_for_next_round(&mut stream, timeout).await;
975 };
976 let response = async_graphql::Response::new(hash.to_value());
977 Ok(serde_json::to_vec(&response)?)
978 }
979
980 async fn query_user_application(
982 &self,
983 application_id: ApplicationId,
984 bytes: Vec<u8>,
985 chain_id: ChainId,
986 block_hash: Option<CryptoHash>,
987 ) -> Result<QueryOutcome<Vec<u8>>, NodeServiceError> {
988 let query = Query::User {
989 application_id,
990 bytes,
991 };
992 let client = self.context.lock().await.make_chain_client(chain_id);
993 let QueryOutcome {
994 response,
995 operations,
996 } = client.query_application(query, block_hash).await?;
997 match response {
998 QueryResponse::System(_) => {
999 unreachable!("cannot get a system response for a user query")
1000 }
1001 QueryResponse::User(user_response_bytes) => Ok(QueryOutcome {
1002 response: user_response_bytes,
1003 operations,
1004 }),
1005 }
1006 }
1007
1008 async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
1010 service
1011 .0
1012 .schema()
1013 .execute(request.into_inner())
1014 .await
1015 .into()
1016 }
1017
1018 async fn application_handler(
1022 Path((chain_id, application_id)): Path<(String, String)>,
1023 service: Extension<Self>,
1024 request: String,
1025 ) -> Result<Vec<u8>, NodeServiceError> {
1026 let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
1027 let application_id: ApplicationId = application_id.parse()?;
1028
1029 debug!(
1030 %chain_id,
1031 %application_id,
1032 "processing request for application:\n{:?}",
1033 &request
1034 );
1035 let response = service
1036 .0
1037 .handle_service_request(application_id, request.into_bytes(), chain_id, None)
1038 .await?;
1039
1040 Ok(response)
1041 }
1042}