1use std::{borrow::Cow, future::IntoFuture, iter, net::SocketAddr, num::NonZeroU16, sync::Arc};
5
6use async_graphql::{
7 futures_util::Stream, resolver_utils::ContainerType, EmptyMutation, Error, MergedObject,
8 OutputType, Request, Response, ScalarType, Schema, SimpleObject, Subscription,
9};
10use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
11use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
12use futures::{lock::Mutex, Future, FutureExt as _, TryStreamExt as _};
13use linera_base::{
14 crypto::{CryptoError, CryptoHash},
15 data_types::{
16 Amount, ApplicationDescription, ApplicationPermissions, Bytecode, Epoch, TimeDelta,
17 },
18 identifiers::{
19 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
20 },
21 ownership::{ChainOwnership, TimeoutConfig},
22 vm::VmRuntime,
23 BcsHexParseError,
24};
25use linera_chain::{
26 types::{ConfirmedBlock, GenericCertificate},
27 ChainStateView,
28};
29use linera_client::chain_listener::{
30 ChainListener, ChainListenerConfig, ClientContext, ListenerCommand,
31};
32use linera_core::{
33 client::chain_client::{self, ChainClient},
34 data_types::ClientOutcome,
35 wallet::Wallet as _,
36 worker::Notification,
37};
38use linera_execution::{
39 committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse,
40 SystemOperation,
41};
42#[cfg(with_metrics)]
43use linera_metrics::monitoring_server;
44use linera_sdk::linera_base_types::BlobContent;
45use serde::{Deserialize, Serialize};
46use serde_json::json;
47use tokio::sync::{mpsc::UnboundedReceiver, OwnedRwLockReadGuard};
48use tokio_util::sync::CancellationToken;
49use tower_http::cors::CorsLayer;
50use tracing::{debug, error, info, instrument, trace};
51
52use crate::util;
53
// The chains known to the node service, as returned by the `chains` query.
#[derive(SimpleObject, Serialize, Deserialize, Clone)]
pub struct Chains {
    // The IDs of the chains collected from the wallet.
    pub list: Vec<ChainId>,
    // The default chain the service was configured with, if any.
    pub default: Option<ChainId>,
}
59
/// Root object of the GraphQL queries served by the node service.
pub struct QueryRoot<C> {
    /// Shared client context, used to create chain clients and to access the wallet.
    context: Arc<Mutex<C>>,
    /// Port used when building the links of the `applications` query.
    port: NonZeroU16,
    /// Default chain reported by the `chains` query, if any.
    default_chain: Option<ChainId>,
}
66
/// Root object of the GraphQL subscriptions served by the node service.
pub struct SubscriptionRoot<C> {
    /// Shared client context, used to create the chain client that is subscribed to.
    context: Arc<Mutex<C>>,
}
71
/// Root object of the GraphQL mutations served by the node service.
pub struct MutationRoot<C> {
    /// Shared client context, used to create chain clients and to update the wallet.
    context: Arc<Mutex<C>>,
}
76
/// Errors returned by the HTTP handlers of the node service.
///
/// The `IntoResponse` implementation maps each variant to an HTTP status code.
#[derive(Debug, thiserror::Error)]
enum NodeServiceError {
    /// An error reported by the chain client.
    #[error(transparent)]
    ChainClient(#[from] chain_client::Error),
    /// A BCS-hex encoded value (such as an application ID) could not be parsed.
    #[error(transparent)]
    BcsHex(#[from] BcsHexParseError),
    /// JSON (de)serialization failed.
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// The chain ID in the request could not be parsed.
    #[error("malformed chain ID: {0}")]
    InvalidChainId(CryptoError),
    /// An error reported by `linera_client`.
    #[error(transparent)]
    Client(#[from] linera_client::Error),
    /// A query scheduled operations while the service runs in read-only mode.
    #[error("scheduling operations from queries is disabled in read-only mode")]
    ReadOnlyModeOperationsNotAllowed,
}
92
93impl IntoResponse for NodeServiceError {
94 fn into_response(self) -> response::Response {
95 let status = match self {
96 NodeServiceError::InvalidChainId(_) | NodeServiceError::BcsHex(_) => {
97 StatusCode::BAD_REQUEST
98 }
99 NodeServiceError::ReadOnlyModeOperationsNotAllowed => StatusCode::FORBIDDEN,
100 _ => StatusCode::INTERNAL_SERVER_ERROR,
101 };
102 let body = json!({"error": self.to_string()}).to_string();
103 (status, body).into_response()
104 }
105}
106
107#[Subscription]
108impl<C> SubscriptionRoot<C>
109where
110 C: ClientContext + 'static,
111{
112 async fn notifications(
114 &self,
115 chain_id: ChainId,
116 ) -> Result<impl Stream<Item = Notification>, Error> {
117 let client = self
118 .context
119 .lock()
120 .await
121 .make_chain_client(chain_id)
122 .await?;
123 Ok(client.subscribe()?)
124 }
125}
126
127impl<C> MutationRoot<C>
128where
129 C: ClientContext,
130{
131 async fn execute_system_operation(
132 &self,
133 system_operation: SystemOperation,
134 chain_id: ChainId,
135 ) -> Result<CryptoHash, Error> {
136 let certificate = self
137 .apply_client_command(&chain_id, move |client| {
138 let operation = Operation::system(system_operation.clone());
139 async move {
140 let result = client
141 .execute_operation(operation)
142 .await
143 .map_err(Error::from);
144 (result, client)
145 }
146 })
147 .await?;
148 Ok(certificate.hash())
149 }
150
151 async fn apply_client_command<F, Fut, T>(
155 &self,
156 chain_id: &ChainId,
157 mut f: F,
158 ) -> Result<T, Error>
159 where
160 F: FnMut(ChainClient<C::Environment>) -> Fut,
161 Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
162 {
163 loop {
164 let client = self
165 .context
166 .lock()
167 .await
168 .make_chain_client(*chain_id)
169 .await?;
170 let mut stream = client.subscribe()?;
171 let (result, client) = f(client).await;
172 self.context.lock().await.update_wallet(&client).await?;
173 let timeout = match result? {
174 ClientOutcome::Committed(t) => return Ok(t),
175 ClientOutcome::WaitForTimeout(timeout) => timeout,
176 };
177 drop(client);
178 util::wait_for_next_round(&mut stream, timeout).await;
179 }
180 }
181}
182
183#[async_graphql::Object(cache_control(no_cache))]
184impl<C> MutationRoot<C>
185where
186 C: ClientContext + 'static,
187{
188 async fn process_inbox(
190 &self,
191 #[graphql(desc = "The chain whose inbox is being processed.")] chain_id: ChainId,
192 ) -> Result<Vec<CryptoHash>, Error> {
193 let mut hashes = Vec::new();
194 loop {
195 let client = self
196 .context
197 .lock()
198 .await
199 .make_chain_client(chain_id)
200 .await?;
201 let result = client.process_inbox().await;
202 self.context.lock().await.update_wallet(&client).await?;
203 let (certificates, maybe_timeout) = result?;
204 hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
205 match maybe_timeout {
206 None => return Ok(hashes),
207 Some(timestamp) => {
208 let mut stream = client.subscribe()?;
209 drop(client);
210 util::wait_for_next_round(&mut stream, timestamp).await;
211 }
212 }
213 }
214 }
215
216 async fn sync(
221 &self,
222 #[graphql(desc = "The chain being synchronized.")] chain_id: ChainId,
223 ) -> Result<u64, Error> {
224 let client = self
225 .context
226 .lock()
227 .await
228 .make_chain_client(chain_id)
229 .await?;
230 let info = client.synchronize_from_validators().await?;
231 self.context.lock().await.update_wallet(&client).await?;
232 Ok(info.next_block_height.0)
233 }
234
235 async fn retry_pending_block(
237 &self,
238 #[graphql(desc = "The chain on whose block is being retried.")] chain_id: ChainId,
239 ) -> Result<Option<CryptoHash>, Error> {
240 let client = self
241 .context
242 .lock()
243 .await
244 .make_chain_client(chain_id)
245 .await?;
246 let outcome = client.process_pending_block().await?;
247 self.context.lock().await.update_wallet(&client).await?;
248 match outcome {
249 ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
250 ClientOutcome::Committed(None) => Ok(None),
251 ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
252 "Please try again at {}",
253 timeout.timestamp
254 ))),
255 }
256 }
257
258 async fn transfer(
261 &self,
262 #[graphql(desc = "The chain which native tokens are being transferred from.")]
263 chain_id: ChainId,
264 #[graphql(desc = "The account being debited on the chain.")] owner: AccountOwner,
265 #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
266 #[graphql(desc = "The amount being transferred.")] amount: Amount,
267 ) -> Result<CryptoHash, Error> {
268 self.apply_client_command(&chain_id, move |client| async move {
269 let result = client
270 .transfer(owner, amount, recipient)
271 .await
272 .map_err(Error::from)
273 .map(|outcome| outcome.map(|certificate| certificate.hash()));
274 (result, client)
275 })
276 .await
277 }
278
279 async fn claim(
283 &self,
284 #[graphql(desc = "The chain for whom owner is one of the owner.")] chain_id: ChainId,
285 #[graphql(desc = "The owner of chain targetId being debited.")] owner: AccountOwner,
286 #[graphql(desc = "The chain whose owner is being debited.")] target_id: ChainId,
287 #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
288 #[graphql(desc = "The amount being transferred.")] amount: Amount,
289 ) -> Result<CryptoHash, Error> {
290 self.apply_client_command(&chain_id, move |client| async move {
291 let result = client
292 .claim(owner, target_id, recipient, amount)
293 .await
294 .map_err(Error::from)
295 .map(|outcome| outcome.map(|certificate| certificate.hash()));
296 (result, client)
297 })
298 .await
299 }
300
301 async fn read_data_blob(
304 &self,
305 chain_id: ChainId,
306 hash: CryptoHash,
307 ) -> Result<CryptoHash, Error> {
308 self.apply_client_command(&chain_id, move |client| async move {
309 let result = client
310 .read_data_blob(hash)
311 .await
312 .map_err(Error::from)
313 .map(|outcome| outcome.map(|certificate| certificate.hash()));
314 (result, client)
315 })
316 .await
317 }
318
319 async fn open_chain(
321 &self,
322 #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
323 #[graphql(desc = "The owner of the new chain.")] owner: AccountOwner,
324 #[graphql(desc = "The balance of the chain being created. Zero if `None`.")]
325 balance: Option<Amount>,
326 ) -> Result<ChainId, Error> {
327 let ownership = ChainOwnership::single(owner);
328 let balance = balance.unwrap_or(Amount::ZERO);
329 let description = self
330 .apply_client_command(&chain_id, move |client| {
331 let ownership = ownership.clone();
332 async move {
333 let result = client
334 .open_chain(ownership, ApplicationPermissions::default(), balance)
335 .await
336 .map_err(Error::from)
337 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
338 (result, client)
339 }
340 })
341 .await?;
342 Ok(description.id())
343 }
344
345 #[expect(clippy::too_many_arguments)]
347 async fn open_multi_owner_chain(
348 &self,
349 #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
350 #[graphql(desc = "Permissions for applications on the new chain")]
351 application_permissions: Option<ApplicationPermissions>,
352 #[graphql(desc = "The owners of the chain")] owners: Vec<AccountOwner>,
353 #[graphql(desc = "The weights of the owners")] weights: Option<Vec<u64>>,
354 #[graphql(desc = "The number of multi-leader rounds")] multi_leader_rounds: Option<u32>,
355 #[graphql(desc = "The balance of the chain. Zero if `None`")] balance: Option<Amount>,
356 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
357 fast_round_ms: Option<u64>,
358 #[graphql(
359 desc = "The duration of the first single-leader and all multi-leader rounds",
360 default = 10_000
361 )]
362 base_timeout_ms: u64,
363 #[graphql(
364 desc = "The number of milliseconds by which the timeout increases after each \
365 single-leader round",
366 default = 1_000
367 )]
368 timeout_increment_ms: u64,
369 #[graphql(
370 desc = "The age of an incoming tracked or protected message after which the \
371 validators start transitioning the chain to fallback mode, in milliseconds.",
372 default = 86_400_000
373 )]
374 fallback_duration_ms: u64,
375 ) -> Result<ChainId, Error> {
376 let owners = if let Some(weights) = weights {
377 if weights.len() != owners.len() {
378 return Err(Error::new(format!(
379 "There are {} owners but {} weights.",
380 owners.len(),
381 weights.len()
382 )));
383 }
384 owners.into_iter().zip(weights).collect::<Vec<_>>()
385 } else {
386 owners
387 .into_iter()
388 .zip(iter::repeat(100))
389 .collect::<Vec<_>>()
390 };
391 let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
392 let timeout_config = TimeoutConfig {
393 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
394 base_timeout: TimeDelta::from_millis(base_timeout_ms),
395 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
396 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
397 };
398 let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
399 let balance = balance.unwrap_or(Amount::ZERO);
400 let description = self
401 .apply_client_command(&chain_id, move |client| {
402 let ownership = ownership.clone();
403 let application_permissions = application_permissions.clone().unwrap_or_default();
404 async move {
405 let result = client
406 .open_chain(ownership, application_permissions, balance)
407 .await
408 .map_err(Error::from)
409 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
410 (result, client)
411 }
412 })
413 .await?;
414 Ok(description.id())
415 }
416
417 async fn close_chain(
419 &self,
420 #[graphql(desc = "The chain being closed.")] chain_id: ChainId,
421 ) -> Result<Option<CryptoHash>, Error> {
422 let maybe_cert = self
423 .apply_client_command(&chain_id, |client| async move {
424 let result = client.close_chain().await.map_err(Error::from);
425 (result, client)
426 })
427 .await?;
428 Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
429 }
430
431 async fn change_owner(
433 &self,
434 #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
435 #[graphql(desc = "The new single owner of the chain")] new_owner: AccountOwner,
436 ) -> Result<CryptoHash, Error> {
437 let operation = SystemOperation::ChangeOwnership {
438 super_owners: vec![new_owner],
439 owners: Vec::new(),
440 first_leader: None,
441 multi_leader_rounds: 2,
442 open_multi_leader_rounds: false,
443 timeout_config: TimeoutConfig::default(),
444 };
445 self.execute_system_operation(operation, chain_id).await
446 }
447
448 #[expect(clippy::too_many_arguments)]
450 async fn change_multiple_owners(
451 &self,
452 #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
453 #[graphql(desc = "The new list of owners of the chain")] new_owners: Vec<AccountOwner>,
454 #[graphql(desc = "The new list of weights of the owners")] new_weights: Vec<u64>,
455 #[graphql(desc = "The multi-leader round of the chain")] multi_leader_rounds: u32,
456 #[graphql(
457 desc = "Whether multi-leader rounds are unrestricted, that is not limited to chain owners."
458 )]
459 open_multi_leader_rounds: bool,
460 #[graphql(desc = "The leader of the first single-leader round. \
461 If not set, this is random like other rounds.")]
462 first_leader: Option<AccountOwner>,
463 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
464 fast_round_ms: Option<u64>,
465 #[graphql(
466 desc = "The duration of the first single-leader and all multi-leader rounds",
467 default = 10_000
468 )]
469 base_timeout_ms: u64,
470 #[graphql(
471 desc = "The number of milliseconds by which the timeout increases after each \
472 single-leader round",
473 default = 1_000
474 )]
475 timeout_increment_ms: u64,
476 #[graphql(
477 desc = "The age of an incoming tracked or protected message after which the \
478 validators start transitioning the chain to fallback mode, in milliseconds.",
479 default = 86_400_000
480 )]
481 fallback_duration_ms: u64,
482 ) -> Result<CryptoHash, Error> {
483 let operation = SystemOperation::ChangeOwnership {
484 super_owners: Vec::new(),
485 owners: new_owners.into_iter().zip(new_weights).collect(),
486 first_leader,
487 multi_leader_rounds,
488 open_multi_leader_rounds,
489 timeout_config: TimeoutConfig {
490 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
491 base_timeout: TimeDelta::from_millis(base_timeout_ms),
492 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
493 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
494 },
495 };
496 self.execute_system_operation(operation, chain_id).await
497 }
498
499 #[expect(clippy::too_many_arguments)]
501 async fn change_application_permissions(
502 &self,
503 #[graphql(desc = "The chain whose permissions are being changed")] chain_id: ChainId,
504 #[graphql(
505 desc = "These applications are allowed to manage the chain: close it, change \
506 application permissions, and change ownership."
507 )]
508 manage_chain: Vec<ApplicationId>,
509 #[graphql(
510 desc = "If this is `None`, all system operations and application operations are allowed.
511If it is `Some`, only operations from the specified applications are allowed,
512and no system operations."
513 )]
514 execute_operations: Option<Vec<ApplicationId>>,
515 #[graphql(
516 desc = "At least one operation or incoming message from each of these applications must occur in every block."
517 )]
518 mandatory_applications: Vec<ApplicationId>,
519 #[graphql(
520 desc = "These applications are allowed to perform calls to services as oracles."
521 )]
522 call_service_as_oracle: Option<Vec<ApplicationId>>,
523 #[graphql(desc = "These applications are allowed to perform HTTP requests.")]
524 make_http_requests: Option<Vec<ApplicationId>>,
525 ) -> Result<CryptoHash, Error> {
526 let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
527 execute_operations,
528 mandatory_applications,
529 manage_chain,
530 call_service_as_oracle,
531 make_http_requests,
532 });
533 self.execute_system_operation(operation, chain_id).await
534 }
535
536 async fn create_committee(
540 &self,
541 chain_id: ChainId,
542 committee: Committee,
543 ) -> Result<CryptoHash, Error> {
544 Ok(self
545 .apply_client_command(&chain_id, move |client| {
546 let committee = committee.clone();
547 async move {
548 let result = client
549 .stage_new_committee(committee)
550 .await
551 .map_err(Error::from);
552 (result, client)
553 }
554 })
555 .await?
556 .hash())
557 }
558
559 async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
563 let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
564 self.execute_system_operation(operation, chain_id).await
565 }
566
567 async fn publish_module(
569 &self,
570 #[graphql(desc = "The chain publishing the module")] chain_id: ChainId,
571 #[graphql(desc = "The bytecode of the contract code")] contract: Bytecode,
572 #[graphql(desc = "The bytecode of the service code (only relevant for WebAssembly)")]
573 service: Bytecode,
574 #[graphql(desc = "The virtual machine being used (either Wasm or Evm)")]
575 vm_runtime: VmRuntime,
576 ) -> Result<ModuleId, Error> {
577 self.apply_client_command(&chain_id, move |client| {
578 let contract = contract.clone();
579 let service = service.clone();
580 async move {
581 let result = client
582 .publish_module(contract, service, vm_runtime)
583 .await
584 .map_err(Error::from)
585 .map(|outcome| outcome.map(|(module_id, _)| module_id));
586 (result, client)
587 }
588 })
589 .await
590 }
591
592 async fn publish_data_blob(
594 &self,
595 #[graphql(desc = "The chain paying for the blob publication")] chain_id: ChainId,
596 #[graphql(desc = "The content of the data blob being created")] bytes: Vec<u8>,
597 ) -> Result<CryptoHash, Error> {
598 self.apply_client_command(&chain_id, |client| {
599 let bytes = bytes.clone();
600 async move {
601 let result = client.publish_data_blob(bytes).await.map_err(Error::from);
602 (result, client)
603 }
604 })
605 .await
606 .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
607 }
608
609 async fn create_application(
611 &self,
612 #[graphql(desc = "The chain paying for the creation of the application")] chain_id: ChainId,
613 #[graphql(desc = "The module ID of the application being created")] module_id: ModuleId,
614 #[graphql(desc = "The JSON serialization of the parameters of the application")]
615 parameters: String,
616 #[graphql(
617 desc = "The JSON serialization of the instantiation argument of the application"
618 )]
619 instantiation_argument: String,
620 #[graphql(desc = "The dependencies of the application being created")]
621 required_application_ids: Vec<ApplicationId>,
622 ) -> Result<ApplicationId, Error> {
623 self.apply_client_command(&chain_id, move |client| {
624 let parameters = parameters.as_bytes().to_vec();
625 let instantiation_argument = instantiation_argument.as_bytes().to_vec();
626 let required_application_ids = required_application_ids.clone();
627 async move {
628 let result = client
629 .create_application_untyped(
630 module_id,
631 parameters,
632 instantiation_argument,
633 required_application_ids,
634 )
635 .await
636 .map_err(Error::from)
637 .map(|outcome| outcome.map(|(application_id, _)| application_id));
638 (result, client)
639 }
640 })
641 .await
642 }
643}
644
645#[async_graphql::Object(cache_control(no_cache))]
646impl<C> QueryRoot<C>
647where
648 C: ClientContext + 'static,
649{
650 async fn chain(
651 &self,
652 chain_id: ChainId,
653 ) -> Result<
654 ChainStateExtendedView<<C::Environment as linera_core::Environment>::StorageContext>,
655 Error,
656 > {
657 let client = self
658 .context
659 .lock()
660 .await
661 .make_chain_client(chain_id)
662 .await?;
663 let view = client.chain_state_view().await?;
664 Ok(ChainStateExtendedView::new(view))
665 }
666
667 async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
668 let client = self
669 .context
670 .lock()
671 .await
672 .make_chain_client(chain_id)
673 .await?;
674 let applications = client
675 .chain_state_view()
676 .await?
677 .execution_state
678 .list_applications()
679 .await?;
680
681 let overviews = applications
682 .into_iter()
683 .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
684 .collect();
685
686 Ok(overviews)
687 }
688
689 async fn chains(&self) -> Result<Chains, Error> {
690 Ok(Chains {
691 list: self
692 .context
693 .lock()
694 .await
695 .wallet()
696 .chain_ids()
697 .try_collect()
698 .await?,
699 default: self.default_chain,
700 })
701 }
702
703 async fn block(
704 &self,
705 hash: Option<CryptoHash>,
706 chain_id: ChainId,
707 ) -> Result<Option<ConfirmedBlock>, Error> {
708 let client = self
709 .context
710 .lock()
711 .await
712 .make_chain_client(chain_id)
713 .await?;
714 let hash = match hash {
715 Some(hash) => Some(hash),
716 None => client.chain_info().await?.block_hash,
717 };
718 if let Some(hash) = hash {
719 let block = client.read_confirmed_block(hash).await?;
720 Ok(Some(block))
721 } else {
722 Ok(None)
723 }
724 }
725
726 async fn events_from_index(
727 &self,
728 chain_id: ChainId,
729 stream_id: StreamId,
730 start_index: u32,
731 ) -> Result<Vec<IndexAndEvent>, Error> {
732 Ok(self
733 .context
734 .lock()
735 .await
736 .make_chain_client(chain_id)
737 .await?
738 .events_from_index(stream_id, start_index)
739 .await?)
740 }
741
742 async fn blocks(
743 &self,
744 from: Option<CryptoHash>,
745 chain_id: ChainId,
746 limit: Option<u32>,
747 ) -> Result<Vec<ConfirmedBlock>, Error> {
748 let client = self
749 .context
750 .lock()
751 .await
752 .make_chain_client(chain_id)
753 .await?;
754 let limit = limit.unwrap_or(10);
755 let from = match from {
756 Some(from) => Some(from),
757 None => client.chain_info().await?.block_hash,
758 };
759 let Some(from) = from else {
760 return Ok(vec![]);
761 };
762 let mut hash = Some(from);
763 let mut values = Vec::new();
764 for _ in 0..limit {
765 let Some(next_hash) = hash else {
766 break;
767 };
768 let value = client.read_confirmed_block(next_hash).await?;
769 hash = value.block().header.previous_block_hash;
770 values.push(value);
771 }
772 Ok(values)
773 }
774
775 async fn version(&self) -> linera_version::VersionInfo {
777 linera_version::VersionInfo::default()
778 }
779}
780
/// Wraps a chain ID so that it can be exposed as the extra `chain_id` field of
/// `ChainStateExtendedView`.
struct ChainStateViewExtension(ChainId);
785
786#[async_graphql::Object(cache_control(no_cache))]
787impl ChainStateViewExtension {
788 async fn chain_id(&self) -> ChainId {
789 self.0
790 }
791}
792
// Merges the `chain_id` extension with the read-only chain state view into a single
// GraphQL object.
#[derive(MergedObject)]
struct ChainStateExtendedView<C>(ChainStateViewExtension, ReadOnlyChainStateView<C>)
where
    C: linera_views::context::Context + Clone + Send + Sync + 'static,
    C::Extra: linera_execution::ExecutionRuntimeContext;
798
/// A chain state view held behind an owned read guard.
///
/// Its GraphQL implementations (`ContainerType`, `OutputType`) delegate to the wrapped
/// `ChainStateView`.
pub struct ReadOnlyChainStateView<C>(OwnedRwLockReadGuard<ChainStateView<C>>)
where
    C: linera_views::context::Context + Clone + Send + Sync + 'static;
804
805impl<C> ContainerType for ReadOnlyChainStateView<C>
806where
807 C: linera_views::context::Context + Clone + Send + Sync + 'static,
808{
809 async fn resolve_field(
810 &self,
811 context: &async_graphql::Context<'_>,
812 ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
813 self.0.resolve_field(context).await
814 }
815}
816
817impl<C> OutputType for ReadOnlyChainStateView<C>
818where
819 C: linera_views::context::Context + Clone + Send + Sync + 'static,
820{
821 fn type_name() -> Cow<'static, str> {
822 ChainStateView::<C>::type_name()
823 }
824
825 fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
826 ChainStateView::<C>::create_type_info(registry)
827 }
828
829 async fn resolve(
830 &self,
831 context: &async_graphql::ContextSelectionSet<'_>,
832 field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
833 ) -> async_graphql::ServerResult<async_graphql::Value> {
834 self.0.resolve(context, field).await
835 }
836}
837
838impl<C> ChainStateExtendedView<C>
839where
840 C: linera_views::context::Context + Clone + Send + Sync + 'static,
841 C::Extra: linera_execution::ExecutionRuntimeContext,
842{
843 fn new(view: OwnedRwLockReadGuard<ChainStateView<C>>) -> Self {
844 Self(
845 ChainStateViewExtension(view.chain_id()),
846 ReadOnlyChainStateView(view),
847 )
848 }
849}
850
// Summary of an application, as returned by the `applications` query.
#[derive(SimpleObject)]
pub struct ApplicationOverview {
    // The ID of the application.
    id: ApplicationId,
    // The description of the application.
    description: ApplicationDescription,
    // Local URL of the application's endpoint on this service.
    link: String,
}
857
858impl ApplicationOverview {
859 fn new(
860 id: ApplicationId,
861 description: ApplicationDescription,
862 port: NonZeroU16,
863 chain_id: ChainId,
864 ) -> Self {
865 Self {
866 id,
867 description,
868 link: format!(
869 "http://localhost:{}/chains/{}/applications/{}",
870 port.get(),
871 chain_id,
872 id
873 ),
874 }
875 }
876}
877
/// The GraphQL schema of the node service, with or without mutations.
pub enum NodeServiceSchema<C>
where
    C: ClientContext + 'static,
{
    /// Schema with queries, mutations, and subscriptions.
    Full(Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>>),
    /// Schema with queries and subscriptions only (`EmptyMutation`).
    ReadOnly(Schema<QueryRoot<C>, EmptyMutation, SubscriptionRoot<C>>),
}
888
889impl<C> NodeServiceSchema<C>
890where
891 C: ClientContext,
892{
893 pub async fn execute(&self, request: impl Into<Request>) -> Response {
895 match self {
896 Self::Full(schema) => schema.execute(request).await,
897 Self::ReadOnly(schema) => schema.execute(request).await,
898 }
899 }
900
901 pub fn sdl(&self) -> String {
903 match self {
904 Self::Full(schema) => schema.sdl(),
905 Self::ReadOnly(schema) => schema.sdl(),
906 }
907 }
908}
909
910impl<C> Clone for NodeServiceSchema<C>
911where
912 C: ClientContext,
913{
914 fn clone(&self) -> Self {
915 match self {
916 Self::Full(schema) => Self::Full(schema.clone()),
917 Self::ReadOnly(schema) => Self::ReadOnly(schema.clone()),
918 }
919 }
920}
921
/// The node service: serves the GraphQL API over HTTP and runs the chain listener.
pub struct NodeService<C>
where
    C: ClientContext + 'static,
{
    /// Configuration passed to the `ChainListener`.
    config: ChainListenerConfig,
    /// Port the HTTP server listens on.
    port: NonZeroU16,
    /// Port of the metrics server.
    #[cfg(with_metrics)]
    metrics_port: NonZeroU16,
    /// Default chain reported by the `chains` query, if any.
    default_chain: Option<ChainId>,
    /// Client context shared with the schema roots and the chain listener.
    context: Arc<Mutex<C>>,
    /// If `true`, the schema has no mutations and queries may not schedule operations.
    read_only: bool,
}
937
938impl<C> Clone for NodeService<C>
939where
940 C: ClientContext + 'static,
941{
942 fn clone(&self) -> Self {
943 Self {
944 config: self.config.clone(),
945 port: self.port,
946 #[cfg(with_metrics)]
947 metrics_port: self.metrics_port,
948 default_chain: self.default_chain,
949 context: Arc::clone(&self.context),
950 read_only: self.read_only,
951 }
952 }
953}
954
955impl<C> NodeService<C>
956where
957 C: ClientContext,
958{
959 pub fn new(
961 config: ChainListenerConfig,
962 port: NonZeroU16,
963 #[cfg(with_metrics)] metrics_port: NonZeroU16,
964 default_chain: Option<ChainId>,
965 context: Arc<Mutex<C>>,
966 read_only: bool,
967 ) -> Self {
968 Self {
969 config,
970 port,
971 #[cfg(with_metrics)]
972 metrics_port,
973 default_chain,
974 context,
975 read_only,
976 }
977 }
978
979 #[cfg(with_metrics)]
980 pub fn metrics_address(&self) -> SocketAddr {
981 SocketAddr::from(([0, 0, 0, 0], self.metrics_port.get()))
982 }
983
984 pub fn schema(&self) -> NodeServiceSchema<C> {
985 let query = QueryRoot {
986 context: Arc::clone(&self.context),
987 port: self.port,
988 default_chain: self.default_chain,
989 };
990 let subscription = SubscriptionRoot {
991 context: Arc::clone(&self.context),
992 };
993
994 if self.read_only {
995 NodeServiceSchema::ReadOnly(Schema::build(query, EmptyMutation, subscription).finish())
996 } else {
997 NodeServiceSchema::Full(
998 Schema::build(
999 query,
1000 MutationRoot {
1001 context: Arc::clone(&self.context),
1002 },
1003 subscription,
1004 )
1005 .finish(),
1006 )
1007 }
1008 }
1009
    /// Runs the node service until the chain listener or the HTTP server finishes.
    ///
    /// Sets up the HTTP routes, starts the chain listener, binds the TCP listener on
    /// `0.0.0.0:<port>`, and then drives both the listener and the server. The server
    /// shuts down gracefully when `cancellation_token` is cancelled.
    ///
    /// # Errors
    ///
    /// Returns an error if the chain listener fails to start or run, if the port
    /// cannot be bound, or if the server fails.
    #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
    pub async fn run(
        self,
        cancellation_token: CancellationToken,
        command_receiver: UnboundedReceiver<ListenerCommand>,
    ) -> Result<(), anyhow::Error> {
        let port = self.port.get();
        // GET requests are answered by `util::graphiql`, POST requests by the handlers.
        let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
        let application_handler =
            axum::routing::get(util::graphiql).post(Self::application_handler);

        #[cfg(with_metrics)]
        monitoring_server::start_metrics(self.metrics_address(), cancellation_token.clone());

        let base_router = Router::new()
            .route("/", index_handler)
            .route(
                "/chains/{chain_id}/applications/{application_id}",
                application_handler,
            )
            .route("/ready", axum::routing::get(|| async { "ready!" }));

        // The two arms are textually identical but use different schema types, so
        // they cannot be merged.
        let app = match self.schema() {
            NodeServiceSchema::Full(schema) => {
                base_router.route_service("/ws", GraphQLSubscription::new(schema))
            }
            NodeServiceSchema::ReadOnly(schema) => {
                base_router.route_service("/ws", GraphQLSubscription::new(schema))
            }
        }
        // Gives the handlers access to a clone of this service.
        .layer(Extension(self.clone()))
        .layer(CorsLayer::permissive());

        info!("GraphiQL IDE: http://localhost:{}", port);

        let storage = self.context.lock().await.storage().clone();

        // `self.config` and `self.context` are moved into the listener here, so `self`
        // cannot be used as a whole after this point.
        let chain_listener = ChainListener::new(
            self.config,
            self.context,
            storage,
            cancellation_token.clone(),
            command_receiver,
            true,
        )
        .run()
        .await?;
        let mut chain_listener = Box::pin(chain_listener).fuse();
        let tcp_listener =
            tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
        let server = axum::serve(tcp_listener, app)
            .with_graceful_shutdown(cancellation_token.cancelled_owned())
            .into_future();
        // Return as soon as either the chain listener or the server completes.
        futures::select! {
            result = chain_listener => result?,
            result = Box::pin(server).fuse() => result?,
        };

        Ok(())
    }
1073
1074 async fn handle_service_request(
1076 &self,
1077 application_id: ApplicationId,
1078 request: Vec<u8>,
1079 chain_id: ChainId,
1080 block_hash: Option<CryptoHash>,
1081 ) -> Result<Vec<u8>, NodeServiceError> {
1082 let QueryOutcome {
1083 response,
1084 operations,
1085 } = self
1086 .query_user_application(application_id, request, chain_id, block_hash)
1087 .await?;
1088 if operations.is_empty() {
1089 return Ok(response);
1090 }
1091
1092 if self.read_only {
1093 return Err(NodeServiceError::ReadOnlyModeOperationsNotAllowed);
1094 }
1095
1096 trace!("Query requested a new block with operations: {operations:?}");
1097 let client = self
1098 .context
1099 .lock()
1100 .await
1101 .make_chain_client(chain_id)
1102 .await?;
1103 let hash = loop {
1104 let timeout = match client
1105 .execute_operations(operations.clone(), vec![])
1106 .await?
1107 {
1108 ClientOutcome::Committed(certificate) => break certificate.hash(),
1109 ClientOutcome::WaitForTimeout(timeout) => timeout,
1110 };
1111 let mut stream = client.subscribe().map_err(|_| {
1112 chain_client::Error::InternalError("Could not subscribe to the local node.")
1113 })?;
1114 util::wait_for_next_round(&mut stream, timeout).await;
1115 };
1116 let response = async_graphql::Response::new(hash.to_value());
1117 Ok(serde_json::to_vec(&response)?)
1118 }
1119
1120 async fn query_user_application(
1122 &self,
1123 application_id: ApplicationId,
1124 bytes: Vec<u8>,
1125 chain_id: ChainId,
1126 block_hash: Option<CryptoHash>,
1127 ) -> Result<QueryOutcome<Vec<u8>>, NodeServiceError> {
1128 let query = Query::User {
1129 application_id,
1130 bytes,
1131 };
1132 let client = self
1133 .context
1134 .lock()
1135 .await
1136 .make_chain_client(chain_id)
1137 .await?;
1138 let QueryOutcome {
1139 response,
1140 operations,
1141 } = client.query_application(query, block_hash).await?;
1142 match response {
1143 QueryResponse::System(_) => {
1144 unreachable!("cannot get a system response for a user query")
1145 }
1146 QueryResponse::User(user_response_bytes) => Ok(QueryOutcome {
1147 response: user_response_bytes,
1148 operations,
1149 }),
1150 }
1151 }
1152
1153 async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
1155 service
1156 .0
1157 .schema()
1158 .execute(request.into_inner())
1159 .await
1160 .into()
1161 }
1162
1163 async fn application_handler(
1167 Path((chain_id, application_id)): Path<(String, String)>,
1168 service: Extension<Self>,
1169 request: String,
1170 ) -> Result<Vec<u8>, NodeServiceError> {
1171 let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
1172 let application_id: ApplicationId = application_id.parse()?;
1173
1174 debug!(
1175 %chain_id,
1176 %application_id,
1177 "processing request for application:\n{:?}",
1178 &request
1179 );
1180 let response = service
1181 .0
1182 .handle_service_request(application_id, request.into_bytes(), chain_id, None)
1183 .await?;
1184
1185 Ok(response)
1186 }
1187}