1use std::{
5 borrow::Cow,
6 collections::BTreeSet,
7 future::IntoFuture,
8 iter,
9 net::SocketAddr,
10 num::NonZeroU16,
11 sync::{Arc, Mutex as StdMutex},
12};
13
14use async_graphql::{
15 futures_util::Stream,
16 registry::{MetaType, MetaTypeId, Registry},
17 resolver_utils::ContainerType,
18 EmptyMutation, Error, MergedObject, OutputType, Positioned, Request, Response, ScalarType,
19 Schema, SimpleObject, Subscription,
20};
21use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
22use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
23use futures::{lock::Mutex, Future, FutureExt as _, StreamExt as _, TryStreamExt as _};
24use linera_base::{
25 crypto::{CryptoError, CryptoHash},
26 data_types::{
27 Amount, ApplicationDescription, ApplicationPermissions, BlockHeight, Bytecode, Epoch,
28 TimeDelta,
29 },
30 identifiers::{
31 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
32 },
33 ownership::{ChainOwnership, TimeoutConfig},
34 vm::VmRuntime,
35 BcsHexParseError,
36};
37use linera_chain::{
38 types::{ConfirmedBlock, GenericCertificate},
39 ChainStateView,
40};
41use linera_client::chain_listener::{
42 ChainListener, ChainListenerConfig, ClientContext, ClientContextExt as _, ListenerCommand,
43};
44use linera_core::{
45 client::chain_client::{self, ChainClient},
46 data_types::ClientOutcome,
47 wallet::Wallet as _,
48 worker::{ChainStateViewReadGuard, Notification, Reason},
49};
50use linera_execution::{
51 committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse,
52 SystemOperation,
53};
54#[cfg(with_metrics)]
55use linera_metrics::monitoring_server;
56use linera_sdk::linera_base_types::BlobContent;
57use linera_storage::Storage;
58use lru::LruCache;
59use serde::{Deserialize, Serialize};
60use serde_json::json;
61use tokio::sync::mpsc::UnboundedReceiver;
62use tokio_util::sync::CancellationToken;
63use tower_http::cors::CorsLayer;
64use tracing::{debug, info, instrument, trace};
65
66use crate::util;
67
68#[derive(Clone)]
74struct RawJson(String);
75
76impl OutputType for RawJson {
77 fn type_name() -> Cow<'static, str> {
78 Cow::Borrowed("JSON")
79 }
80
81 fn create_type_info(registry: &mut Registry) -> String {
82 registry.create_output_type::<Self, _>(MetaTypeId::Scalar, |_| MetaType::Scalar {
83 name: "JSON".to_string(),
84 description: Some("A scalar that can represent any JSON value.".to_string()),
85 is_valid: None,
86 visible: None,
87 inaccessible: false,
88 tags: Default::default(),
89 specified_by_url: None,
90 directive_invocations: Default::default(),
91 requires_scopes: Default::default(),
92 })
93 }
94
95 async fn resolve(
96 &self,
97 _ctx: &async_graphql::ContextSelectionSet<'_>,
98 _field: &Positioned<async_graphql::parser::types::Field>,
99 ) -> async_graphql::ServerResult<async_graphql::Value> {
100 Ok(async_graphql::Value::Object(
105 std::iter::once((
106 async_graphql::Name::new(async_graphql_value::RAW_VALUE_TOKEN),
107 async_graphql::Value::String(self.0.clone()),
108 ))
109 .collect(),
110 ))
111 }
112}
113
114#[derive(SimpleObject, Serialize, Deserialize, Clone)]
116pub struct Chains {
117 pub list: Vec<ChainId>,
119 pub default: Option<ChainId>,
121}
122
123pub struct QueryRoot<C> {
125 context: Arc<Mutex<C>>,
126 port: NonZeroU16,
127 default_chain: Option<ChainId>,
128}
129
130pub struct SubscriptionRoot<C> {
132 context: Arc<Mutex<C>>,
133 query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
134 cancellation_token: CancellationToken,
135}
136
137pub struct MutationRoot<C> {
139 context: Arc<Mutex<C>>,
140}
141
142#[derive(Debug, thiserror::Error)]
143enum NodeServiceError {
144 #[error(transparent)]
145 ChainClient(#[from] chain_client::Error),
146 #[error(transparent)]
147 BcsHex(#[from] BcsHexParseError),
148 #[error(transparent)]
149 Json(#[from] serde_json::Error),
150 #[error("malformed chain ID: {0}")]
151 InvalidChainId(CryptoError),
152 #[error(transparent)]
153 Client(#[from] linera_client::Error),
154 #[error("scheduling operations from queries is disabled in read-only mode")]
155 ReadOnlyModeOperationsNotAllowed,
156}
157
158impl IntoResponse for NodeServiceError {
159 fn into_response(self) -> response::Response {
160 let status = match self {
161 NodeServiceError::InvalidChainId(_) | NodeServiceError::BcsHex(_) => {
162 StatusCode::BAD_REQUEST
163 }
164 NodeServiceError::ReadOnlyModeOperationsNotAllowed => StatusCode::FORBIDDEN,
165 _ => StatusCode::INTERNAL_SERVER_ERROR,
166 };
167 let body = json!({"error": self.to_string()}).to_string();
168 (status, body).into_response()
169 }
170}
171
172#[Subscription]
173impl<C> SubscriptionRoot<C>
174where
175 C: ClientContext + 'static,
176{
177 async fn notifications(
179 &self,
180 chain_id: ChainId,
181 ) -> Result<impl Stream<Item = Notification>, Error> {
182 let client = self
183 .context
184 .lock()
185 .await
186 .make_chain_client(chain_id)
187 .await?;
188 Ok(client.subscribe()?)
189 }
190
191 async fn query_result(
194 &self,
195 #[graphql(desc = "Name of the registered subscription query.")] name: String,
196 #[graphql(desc = "The chain to watch.")] chain_id: ChainId,
197 #[graphql(desc = "The application to query.")] application_id: ApplicationId,
198 ) -> Result<impl Stream<Item = RawJson>, Error> {
199 let manager = self
200 .query_subscriptions
201 .as_ref()
202 .ok_or_else(|| Error::new("no subscription queries registered"))?;
203
204 let key = crate::query_subscription::SubscriptionKey {
205 name,
206 chain_id,
207 application_id,
208 };
209
210 let receiver = manager
211 .subscribe(
212 &key,
213 Arc::clone(&self.context),
214 self.cancellation_token.clone(),
215 )
216 .map_err(|e| Error::new(e.to_string()))?;
217
218 let current = receiver.borrow().clone();
223 let changes = tokio_stream::wrappers::WatchStream::from_changes(receiver)
224 .filter_map(|value| async move { value });
225 Ok(futures::stream::iter(current).chain(changes).map(RawJson))
226 }
227}
228
229impl<C> MutationRoot<C>
230where
231 C: ClientContext,
232{
233 async fn execute_system_operation(
234 &self,
235 system_operation: SystemOperation,
236 chain_id: ChainId,
237 ) -> Result<CryptoHash, Error> {
238 let certificate = self
239 .apply_client_command(&chain_id, move |client| {
240 let operation = Operation::system(system_operation.clone());
241 async move {
242 let result = client
243 .execute_operation(operation)
244 .await
245 .map_err(Error::from);
246 (result, client)
247 }
248 })
249 .await?;
250 Ok(certificate.hash())
251 }
252
253 async fn maybe_auto_assign_preferred_owner(
256 &self,
257 chain_id: ChainId,
258 new_ownership: &ChainOwnership,
259 ) -> Result<(), Error> {
260 let context = self.context.lock().await;
261 let mut chain_client = context.make_chain_client(chain_id).await?;
262 context
263 .maybe_auto_assign_preferred_owner(&mut chain_client, new_ownership)
264 .await?;
265 Ok(())
266 }
267
268 async fn apply_client_command<F, Fut, T>(
272 &self,
273 chain_id: &ChainId,
274 mut f: F,
275 ) -> Result<T, Error>
276 where
277 F: FnMut(ChainClient<C::Environment>) -> Fut,
278 Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
279 {
280 loop {
281 let client = self
282 .context
283 .lock()
284 .await
285 .make_chain_client(*chain_id)
286 .await?;
287 let mut stream = client.subscribe()?;
288 let (result, client) = f(client).await;
289 self.context.lock().await.update_wallet(&client).await?;
290 let timeout = match result? {
291 ClientOutcome::Committed(t) => return Ok(t),
292 ClientOutcome::Conflict(certificate) => {
293 return Err(chain_client::Error::Conflict(certificate.hash()).into());
294 }
295 ClientOutcome::WaitForTimeout(timeout) => timeout,
296 };
297 drop(client);
298 util::wait_for_next_round(&mut stream, timeout).await;
299 }
300 }
301}
302
303#[async_graphql::Object(cache_control(no_cache))]
304impl<C> MutationRoot<C>
305where
306 C: ClientContext + 'static,
307{
308 async fn process_inbox(
310 &self,
311 #[graphql(desc = "The chain whose inbox is being processed.")] chain_id: ChainId,
312 ) -> Result<Vec<CryptoHash>, Error> {
313 let mut hashes = Vec::new();
314 loop {
315 let client = self
316 .context
317 .lock()
318 .await
319 .make_chain_client(chain_id)
320 .await?;
321 let result = client.process_inbox().await;
322 self.context.lock().await.update_wallet(&client).await?;
323 let (certificates, maybe_timeout) = result?;
324 hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
325 match maybe_timeout {
326 None => return Ok(hashes),
327 Some(timestamp) => {
328 let mut stream = client.subscribe()?;
329 drop(client);
330 util::wait_for_next_round(&mut stream, timestamp).await;
331 }
332 }
333 }
334 }
335
336 async fn sync(
341 &self,
342 #[graphql(desc = "The chain being synchronized.")] chain_id: ChainId,
343 ) -> Result<u64, Error> {
344 let client = self
345 .context
346 .lock()
347 .await
348 .make_chain_client(chain_id)
349 .await?;
350 let info = client.synchronize_from_validators().await?;
351 self.context.lock().await.update_wallet(&client).await?;
352 Ok(info.next_block_height.0)
353 }
354
355 async fn retry_pending_block(
357 &self,
358 #[graphql(desc = "The chain on whose block is being retried.")] chain_id: ChainId,
359 ) -> Result<Option<CryptoHash>, Error> {
360 let client = self
361 .context
362 .lock()
363 .await
364 .make_chain_client(chain_id)
365 .await?;
366 let outcome = client.process_pending_block().await?;
367 self.context.lock().await.update_wallet(&client).await?;
368 match outcome {
369 ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
370 ClientOutcome::Committed(None) => Ok(None),
371 ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
372 "Please try again at {}",
373 timeout.timestamp
374 ))),
375 ClientOutcome::Conflict(certificate) => Err(Error::from(format!(
376 "A different block was committed: {}",
377 certificate.hash()
378 ))),
379 }
380 }
381
382 async fn transfer(
385 &self,
386 #[graphql(desc = "The chain which native tokens are being transferred from.")]
387 chain_id: ChainId,
388 #[graphql(desc = "The account being debited on the chain.")] owner: AccountOwner,
389 #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
390 #[graphql(desc = "The amount being transferred.")] amount: Amount,
391 ) -> Result<CryptoHash, Error> {
392 self.apply_client_command(&chain_id, move |client| async move {
393 let result = client
394 .transfer(owner, amount, recipient)
395 .await
396 .map_err(Error::from)
397 .map(|outcome| outcome.map(|certificate| certificate.hash()));
398 (result, client)
399 })
400 .await
401 }
402
403 async fn claim(
407 &self,
408 #[graphql(desc = "The chain for whom owner is one of the owner.")] chain_id: ChainId,
409 #[graphql(desc = "The owner of chain targetId being debited.")] owner: AccountOwner,
410 #[graphql(desc = "The chain whose owner is being debited.")] target_id: ChainId,
411 #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
412 #[graphql(desc = "The amount being transferred.")] amount: Amount,
413 ) -> Result<CryptoHash, Error> {
414 self.apply_client_command(&chain_id, move |client| async move {
415 let result = client
416 .claim(owner, target_id, recipient, amount)
417 .await
418 .map_err(Error::from)
419 .map(|outcome| outcome.map(|certificate| certificate.hash()));
420 (result, client)
421 })
422 .await
423 }
424
425 async fn read_data_blob(
428 &self,
429 chain_id: ChainId,
430 hash: CryptoHash,
431 ) -> Result<CryptoHash, Error> {
432 self.apply_client_command(&chain_id, move |client| async move {
433 let result = client
434 .read_data_blob(hash)
435 .await
436 .map_err(Error::from)
437 .map(|outcome| outcome.map(|certificate| certificate.hash()));
438 (result, client)
439 })
440 .await
441 }
442
443 async fn open_chain(
445 &self,
446 #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
447 #[graphql(desc = "The owner of the new chain.")] owner: AccountOwner,
448 #[graphql(desc = "The balance of the chain being created. Zero if `None`.")]
449 balance: Option<Amount>,
450 ) -> Result<ChainId, Error> {
451 let ownership = ChainOwnership::single(owner);
452 let balance = balance.unwrap_or(Amount::ZERO);
453 let description = self
454 .apply_client_command(&chain_id, move |client| {
455 let ownership = ownership.clone();
456 async move {
457 let result = client
458 .open_chain(ownership, ApplicationPermissions::default(), balance)
459 .await
460 .map_err(Error::from)
461 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
462 (result, client)
463 }
464 })
465 .await?;
466 Ok(description.id())
467 }
468
469 #[expect(clippy::too_many_arguments)]
471 async fn open_multi_owner_chain(
472 &self,
473 #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
474 #[graphql(desc = "Permissions for applications on the new chain")]
475 application_permissions: Option<ApplicationPermissions>,
476 #[graphql(desc = "The owners of the chain")] owners: Vec<AccountOwner>,
477 #[graphql(desc = "The weights of the owners")] weights: Option<Vec<u64>>,
478 #[graphql(desc = "The number of multi-leader rounds")] multi_leader_rounds: Option<u32>,
479 #[graphql(desc = "The balance of the chain. Zero if `None`")] balance: Option<Amount>,
480 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
481 fast_round_ms: Option<u64>,
482 #[graphql(
483 desc = "The duration of the first single-leader and all multi-leader rounds",
484 default = 10_000
485 )]
486 base_timeout_ms: u64,
487 #[graphql(
488 desc = "The number of milliseconds by which the timeout increases after each \
489 single-leader round",
490 default = 1_000
491 )]
492 timeout_increment_ms: u64,
493 #[graphql(
494 desc = "The age of an incoming tracked or protected message after which the \
495 validators start transitioning the chain to fallback mode, in milliseconds.",
496 default = 86_400_000
497 )]
498 fallback_duration_ms: u64,
499 ) -> Result<ChainId, Error> {
500 let owners = if let Some(weights) = weights {
501 if weights.len() != owners.len() {
502 return Err(Error::new(format!(
503 "There are {} owners but {} weights.",
504 owners.len(),
505 weights.len()
506 )));
507 }
508 owners.into_iter().zip(weights).collect::<Vec<_>>()
509 } else {
510 owners
511 .into_iter()
512 .zip(iter::repeat(100))
513 .collect::<Vec<_>>()
514 };
515 let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
516 let timeout_config = TimeoutConfig {
517 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
518 base_timeout: TimeDelta::from_millis(base_timeout_ms),
519 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
520 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
521 };
522 let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
523 let balance = balance.unwrap_or(Amount::ZERO);
524 let description = self
525 .apply_client_command(&chain_id, move |client| {
526 let ownership = ownership.clone();
527 let application_permissions = application_permissions.clone().unwrap_or_default();
528 async move {
529 let result = client
530 .open_chain(ownership, application_permissions, balance)
531 .await
532 .map_err(Error::from)
533 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
534 (result, client)
535 }
536 })
537 .await?;
538 Ok(description.id())
539 }
540
541 async fn close_chain(
543 &self,
544 #[graphql(desc = "The chain being closed.")] chain_id: ChainId,
545 ) -> Result<Option<CryptoHash>, Error> {
546 let maybe_cert = self
547 .apply_client_command(&chain_id, |client| async move {
548 let result = client.close_chain().await.map_err(Error::from);
549 (result, client)
550 })
551 .await?;
552 Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
553 }
554
555 async fn change_owner(
557 &self,
558 #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
559 #[graphql(desc = "The new single owner of the chain")] new_owner: AccountOwner,
560 ) -> Result<CryptoHash, Error> {
561 let new_ownership = ChainOwnership::single_super(new_owner);
562 let operation = SystemOperation::ChangeOwnership {
563 super_owners: vec![new_owner],
564 owners: Vec::new(),
565 first_leader: None,
566 multi_leader_rounds: 5,
567 open_multi_leader_rounds: false,
568 timeout_config: TimeoutConfig::default(),
569 };
570 let hash = self.execute_system_operation(operation, chain_id).await?;
571 self.maybe_auto_assign_preferred_owner(chain_id, &new_ownership)
572 .await?;
573 Ok(hash)
574 }
575
576 #[expect(clippy::too_many_arguments)]
578 async fn change_multiple_owners(
579 &self,
580 #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
581 #[graphql(desc = "The new list of owners of the chain")] new_owners: Vec<AccountOwner>,
582 #[graphql(desc = "The new list of weights of the owners")] new_weights: Vec<u64>,
583 #[graphql(desc = "The multi-leader round of the chain")] multi_leader_rounds: u32,
584 #[graphql(
585 desc = "Whether multi-leader rounds are unrestricted, that is not limited to chain owners."
586 )]
587 open_multi_leader_rounds: bool,
588 #[graphql(desc = "The leader of the first single-leader round. \
589 If not set, this is random like other rounds.")]
590 first_leader: Option<AccountOwner>,
591 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
592 fast_round_ms: Option<u64>,
593 #[graphql(
594 desc = "The duration of the first single-leader and all multi-leader rounds",
595 default = 10_000
596 )]
597 base_timeout_ms: u64,
598 #[graphql(
599 desc = "The number of milliseconds by which the timeout increases after each \
600 single-leader round",
601 default = 1_000
602 )]
603 timeout_increment_ms: u64,
604 #[graphql(
605 desc = "The age of an incoming tracked or protected message after which the \
606 validators start transitioning the chain to fallback mode, in milliseconds.",
607 default = 86_400_000
608 )]
609 fallback_duration_ms: u64,
610 ) -> Result<CryptoHash, Error> {
611 let timeout_config = TimeoutConfig {
612 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
613 base_timeout: TimeDelta::from_millis(base_timeout_ms),
614 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
615 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
616 };
617 let owners = new_owners.into_iter().zip(new_weights).collect::<Vec<_>>();
618 let new_ownership = ChainOwnership {
619 super_owners: BTreeSet::new(),
620 owners: owners.iter().cloned().collect(),
621 first_leader,
622 multi_leader_rounds,
623 open_multi_leader_rounds,
624 timeout_config: timeout_config.clone(),
625 };
626 let operation = SystemOperation::ChangeOwnership {
627 super_owners: Vec::new(),
628 owners,
629 first_leader,
630 multi_leader_rounds,
631 open_multi_leader_rounds,
632 timeout_config,
633 };
634 let hash = self.execute_system_operation(operation, chain_id).await?;
635 self.maybe_auto_assign_preferred_owner(chain_id, &new_ownership)
636 .await?;
637 Ok(hash)
638 }
639
640 #[expect(clippy::too_many_arguments)]
642 async fn change_application_permissions(
643 &self,
644 #[graphql(desc = "The chain whose permissions are being changed")] chain_id: ChainId,
645 #[graphql(
646 desc = "These applications are allowed to manage the chain: close it, change \
647 application permissions, and change ownership."
648 )]
649 manage_chain: Vec<ApplicationId>,
650 #[graphql(
651 desc = "If this is `None`, all system operations and application operations are allowed.
652If it is `Some`, only operations from the specified applications are allowed,
653and no system operations."
654 )]
655 execute_operations: Option<Vec<ApplicationId>>,
656 #[graphql(
657 desc = "At least one operation or incoming message from each of these applications must occur in every block."
658 )]
659 mandatory_applications: Vec<ApplicationId>,
660 #[graphql(
661 desc = "These applications are allowed to perform calls to services as oracles."
662 )]
663 call_service_as_oracle: Option<Vec<ApplicationId>>,
664 #[graphql(desc = "These applications are allowed to perform HTTP requests.")]
665 make_http_requests: Option<Vec<ApplicationId>>,
666 ) -> Result<CryptoHash, Error> {
667 let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
668 execute_operations,
669 mandatory_applications,
670 manage_chain,
671 call_service_as_oracle,
672 make_http_requests,
673 });
674 self.execute_system_operation(operation, chain_id).await
675 }
676
677 async fn create_committee(
681 &self,
682 chain_id: ChainId,
683 committee: Committee,
684 ) -> Result<CryptoHash, Error> {
685 Ok(self
686 .apply_client_command(&chain_id, move |client| {
687 let committee = committee.clone();
688 async move {
689 let result = client
690 .stage_new_committee(committee)
691 .await
692 .map_err(Error::from);
693 (result, client)
694 }
695 })
696 .await?
697 .hash())
698 }
699
700 async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
704 let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
705 self.execute_system_operation(operation, chain_id).await
706 }
707
708 async fn publish_module(
712 &self,
713 #[graphql(desc = "The chain publishing the module")] chain_id: ChainId,
714 #[graphql(desc = "The bytecode of the contract code")] contract: Bytecode,
715 #[graphql(desc = "The bytecode of the service code (only relevant for WebAssembly)")]
716 service: Bytecode,
717 #[graphql(desc = "The virtual machine being used (either Wasm or Evm)")]
718 vm_runtime: VmRuntime,
719 #[graphql(desc = "Optional JSON-encoded `Formats` description bytes")] formats: Option<
720 Vec<u8>,
721 >,
722 ) -> Result<ModuleId, Error> {
723 self.apply_client_command(&chain_id, move |client| {
724 let contract = contract.clone();
725 let service = service.clone();
726 let formats = formats.clone();
727 async move {
728 let result = client
729 .publish_module(contract, service, vm_runtime, formats)
730 .await
731 .map_err(Error::from)
732 .map(|outcome| outcome.map(|(module_id, _)| module_id));
733 (result, client)
734 }
735 })
736 .await
737 }
738
739 async fn publish_data_blob(
741 &self,
742 #[graphql(desc = "The chain paying for the blob publication")] chain_id: ChainId,
743 #[graphql(desc = "The content of the data blob being created")] bytes: Vec<u8>,
744 ) -> Result<CryptoHash, Error> {
745 self.apply_client_command(&chain_id, |client| {
746 let bytes = bytes.clone();
747 async move {
748 let result = client.publish_data_blob(bytes).await.map_err(Error::from);
749 (result, client)
750 }
751 })
752 .await
753 .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
754 }
755
756 async fn create_application(
758 &self,
759 #[graphql(desc = "The chain paying for the creation of the application")] chain_id: ChainId,
760 #[graphql(desc = "The module ID of the application being created")] module_id: ModuleId,
761 #[graphql(desc = "The JSON serialization of the parameters of the application")]
762 parameters: String,
763 #[graphql(
764 desc = "The JSON serialization of the instantiation argument of the application"
765 )]
766 instantiation_argument: String,
767 #[graphql(desc = "The dependencies of the application being created")]
768 required_application_ids: Vec<ApplicationId>,
769 ) -> Result<ApplicationId, Error> {
770 self.apply_client_command(&chain_id, move |client| {
771 let parameters = parameters.as_bytes().to_vec();
772 let instantiation_argument = instantiation_argument.as_bytes().to_vec();
773 let required_application_ids = required_application_ids.clone();
774 async move {
775 let result = client
776 .create_application_untyped(
777 module_id,
778 parameters,
779 instantiation_argument,
780 required_application_ids,
781 )
782 .await
783 .map_err(Error::from)
784 .map(|outcome| outcome.map(|(application_id, _)| application_id));
785 (result, client)
786 }
787 })
788 .await
789 }
790}
791
792#[async_graphql::Object(cache_control(no_cache))]
793impl<C> QueryRoot<C>
794where
795 C: ClientContext + 'static,
796{
797 async fn chain(
798 &self,
799 chain_id: ChainId,
800 ) -> Result<ChainStateExtendedView<<C::Environment as linera_core::Environment>::Storage>, Error>
801 {
802 let client = self
803 .context
804 .lock()
805 .await
806 .make_chain_client(chain_id)
807 .await?;
808 let view = client.chain_state_view().await?;
809 Ok(ChainStateExtendedView::new(view))
810 }
811
812 async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
813 let client = self
814 .context
815 .lock()
816 .await
817 .make_chain_client(chain_id)
818 .await?;
819 let applications = client
820 .chain_state_view()
821 .await?
822 .execution_state
823 .list_applications()
824 .await?;
825
826 let overviews = applications
827 .into_iter()
828 .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
829 .collect();
830
831 Ok(overviews)
832 }
833
834 async fn chains(&self) -> Result<Chains, Error> {
835 Ok(Chains {
836 list: self
837 .context
838 .lock()
839 .await
840 .wallet()
841 .chain_ids()
842 .try_collect()
843 .await?,
844 default: self.default_chain,
845 })
846 }
847
848 async fn block(
849 &self,
850 hash: Option<CryptoHash>,
851 chain_id: ChainId,
852 ) -> Result<Option<Arc<ConfirmedBlock>>, Error> {
853 let client = self
854 .context
855 .lock()
856 .await
857 .make_chain_client(chain_id)
858 .await?;
859 let hash = match hash {
860 Some(hash) => Some(hash),
861 None => client.chain_info().await?.block_hash,
862 };
863 if let Some(hash) = hash {
864 Ok(Some(client.read_confirmed_block(hash).await?))
865 } else {
866 Ok(None)
867 }
868 }
869
870 async fn events_from_index(
871 &self,
872 chain_id: ChainId,
873 stream_id: StreamId,
874 start_index: u32,
875 ) -> Result<Vec<IndexAndEvent>, Error> {
876 Ok(self
877 .context
878 .lock()
879 .await
880 .make_chain_client(chain_id)
881 .await?
882 .events_from_index(stream_id, start_index)
883 .await?)
884 }
885
886 async fn blocks(
887 &self,
888 from: Option<CryptoHash>,
889 chain_id: ChainId,
890 limit: Option<u32>,
891 ) -> Result<Vec<Arc<ConfirmedBlock>>, Error> {
892 let client = self
893 .context
894 .lock()
895 .await
896 .make_chain_client(chain_id)
897 .await?;
898 let limit = limit.unwrap_or(10);
899 let from = match from {
900 Some(from) => Some(from),
901 None => client.chain_info().await?.block_hash,
902 };
903 let Some(from) = from else {
904 return Ok(vec![]);
905 };
906 let mut hash = Some(from);
907 let mut values = Vec::new();
908 for _ in 0..limit {
909 let Some(next_hash) = hash else {
910 break;
911 };
912 let value = client.read_confirmed_block(next_hash).await?;
913 hash = value.block().header.previous_block_hash;
914 values.push(value);
915 }
916 Ok(values)
917 }
918
919 async fn version(&self) -> linera_version::VersionInfo {
921 linera_version::VersionInfo::default()
922 }
923
924 async fn application_formats(
928 &self,
929 chain_id: ChainId,
930 formats_blob_hash: CryptoHash,
931 ) -> Result<Option<Vec<u8>>, Error> {
932 let client = self
933 .context
934 .lock()
935 .await
936 .make_chain_client(chain_id)
937 .await?;
938 let blob_id = linera_base::identifiers::BlobId::new(
939 formats_blob_hash,
940 linera_base::identifiers::BlobType::ApplicationFormats,
941 );
942 let blob = client.storage_client().read_blob(blob_id).await?;
943 Ok(blob.map(|b| b.bytes().to_vec()))
944 }
945}
946
947struct ChainStateViewExtension(ChainId);
951
952#[async_graphql::Object(cache_control(no_cache))]
953impl ChainStateViewExtension {
954 async fn chain_id(&self) -> ChainId {
955 self.0
956 }
957}
958
959#[derive(MergedObject)]
960struct ChainStateExtendedView<S: Storage>(ChainStateViewExtension, ReadOnlyChainStateView<S>);
961
962pub struct ReadOnlyChainStateView<S: Storage>(ChainStateViewReadGuard<S>);
965
966impl<S: Storage> ContainerType for ReadOnlyChainStateView<S>
967where
968 ChainStateView<S::Context>: ContainerType,
969{
970 async fn resolve_field(
971 &self,
972 context: &async_graphql::Context<'_>,
973 ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
974 self.0.resolve_field(context).await
975 }
976}
977
978impl<S: Storage> OutputType for ReadOnlyChainStateView<S>
979where
980 ChainStateView<S::Context>: OutputType,
981{
982 fn type_name() -> Cow<'static, str> {
983 ChainStateView::<S::Context>::type_name()
984 }
985
986 fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
987 ChainStateView::<S::Context>::create_type_info(registry)
988 }
989
990 async fn resolve(
991 &self,
992 context: &async_graphql::ContextSelectionSet<'_>,
993 field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
994 ) -> async_graphql::ServerResult<async_graphql::Value> {
995 self.0.resolve(context, field).await
996 }
997}
998
999impl<S: Storage> ChainStateExtendedView<S> {
1000 fn new(view: ChainStateViewReadGuard<S>) -> Self {
1001 Self(
1002 ChainStateViewExtension(view.chain_id()),
1003 ReadOnlyChainStateView(view),
1004 )
1005 }
1006}
1007
1008#[derive(SimpleObject)]
1010pub struct ApplicationOverview {
1011 id: ApplicationId,
1012 description: ApplicationDescription,
1013 link: String,
1014}
1015
1016impl ApplicationOverview {
1017 fn new(
1018 id: ApplicationId,
1019 description: ApplicationDescription,
1020 port: NonZeroU16,
1021 chain_id: ChainId,
1022 ) -> Self {
1023 Self {
1024 id,
1025 description,
1026 link: format!(
1027 "http://localhost:{}/chains/{}/applications/{}",
1028 port.get(),
1029 chain_id,
1030 id
1031 ),
1032 }
1033 }
1034}
1035
1036pub enum NodeServiceSchema<C>
1038where
1039 C: ClientContext + 'static,
1040{
1041 Full(Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>>),
1043 ReadOnly(Schema<QueryRoot<C>, EmptyMutation, SubscriptionRoot<C>>),
1045}
1046
1047impl<C> NodeServiceSchema<C>
1048where
1049 C: ClientContext,
1050{
1051 pub async fn execute(&self, request: impl Into<Request>) -> Response {
1053 match self {
1054 Self::Full(schema) => schema.execute(request).await,
1055 Self::ReadOnly(schema) => schema.execute(request).await,
1056 }
1057 }
1058
1059 pub fn sdl(&self) -> String {
1061 match self {
1062 Self::Full(schema) => schema.sdl(),
1063 Self::ReadOnly(schema) => schema.sdl(),
1064 }
1065 }
1066}
1067
1068impl<C> Clone for NodeServiceSchema<C>
1069where
1070 C: ClientContext,
1071{
1072 fn clone(&self) -> Self {
1073 match self {
1074 Self::Full(schema) => Self::Full(schema.clone()),
1075 Self::ReadOnly(schema) => Self::ReadOnly(schema.clone()),
1076 }
1077 }
1078}
1079
1080#[cfg(with_metrics)]
1081mod query_cache_metrics {
1082 use std::sync::LazyLock;
1083
1084 use linera_base::prometheus_util::{register_int_counter_vec, register_int_gauge};
1085 use prometheus::{IntCounterVec, IntGauge};
1086
1087 pub static QUERY_CACHE_HIT: LazyLock<IntCounterVec> = LazyLock::new(|| {
1088 register_int_counter_vec("query_response_cache_hit", "Query response cache hits", &[])
1089 });
1090
1091 pub static QUERY_CACHE_MISS: LazyLock<IntCounterVec> = LazyLock::new(|| {
1092 register_int_counter_vec(
1093 "query_response_cache_miss",
1094 "Query response cache misses",
1095 &[],
1096 )
1097 });
1098
1099 pub static QUERY_CACHE_INVALIDATION: LazyLock<IntCounterVec> = LazyLock::new(|| {
1100 register_int_counter_vec(
1101 "query_response_cache_invalidation",
1102 "Query response cache invalidations (per chain)",
1103 &[],
1104 )
1105 });
1106
1107 pub static QUERY_CACHE_ENTRIES: LazyLock<IntGauge> = LazyLock::new(|| {
1108 register_int_gauge(
1109 "query_response_cache_entries",
1110 "Current number of cached query responses across all chains",
1111 )
1112 });
1113}
1114
1115struct PerChainCache {
1118 lru: LruCache<(ApplicationId, Vec<u8>), Vec<u8>>,
1119 next_block_height: BlockHeight,
1120}
1121
1122struct QueryResponseCache {
1131 chains: papaya::HashMap<ChainId, StdMutex<PerChainCache>>,
1132 subscribed: papaya::HashSet<ChainId>,
1134 notification_sender: StdMutex<Option<tokio::sync::mpsc::UnboundedSender<Notification>>>,
1136 capacity_per_chain: std::num::NonZeroUsize,
1137}
1138
1139impl QueryResponseCache {
1140 fn new(capacity_per_chain: usize) -> Self {
1141 Self {
1142 chains: papaya::HashMap::new(),
1143 subscribed: papaya::HashSet::new(),
1144 notification_sender: StdMutex::new(None),
1145 capacity_per_chain: std::num::NonZeroUsize::new(capacity_per_chain)
1146 .expect("capacity must be > 0"),
1147 }
1148 }
1149
1150 fn set_notification_sender(&self, sender: tokio::sync::mpsc::UnboundedSender<Notification>) {
1152 *self
1153 .notification_sender
1154 .lock()
1155 .expect("sender mutex poisoned") = Some(sender);
1156 }
1157
1158 fn notification_sender(&self) -> Option<tokio::sync::mpsc::UnboundedSender<Notification>> {
1160 self.notification_sender
1161 .lock()
1162 .expect("sender mutex poisoned")
1163 .clone()
1164 }
1165
1166 fn mark_subscribed(&self, chain_id: ChainId) {
1168 self.subscribed.pin().insert(chain_id);
1169 }
1170
1171 fn needs_subscription(&self, chain_id: &ChainId) -> bool {
1173 !self.subscribed.pin().contains(chain_id)
1174 }
1175
1176 fn mark_all_subscribed(&self, chain_ids: &[ChainId]) {
1178 let pinned = self.subscribed.pin();
1179 for &chain_id in chain_ids {
1180 pinned.insert(chain_id);
1181 }
1182 }
1183
1184 fn get(&self, chain_id: ChainId, app_id: &ApplicationId, request: &[u8]) -> Option<Vec<u8>> {
1187 let pinned = self.chains.pin();
1188 let result = pinned.get(&chain_id).and_then(|mutex| {
1189 mutex
1190 .lock()
1191 .expect("LRU mutex poisoned")
1192 .lru
1193 .get(&(*app_id, request.to_vec()))
1194 .cloned()
1195 });
1196 #[cfg(with_metrics)]
1197 {
1198 let metric = if result.is_some() {
1199 &query_cache_metrics::QUERY_CACHE_HIT
1200 } else {
1201 &query_cache_metrics::QUERY_CACHE_MISS
1202 };
1203 metric.with_label_values(&[]).inc();
1204 }
1205 result
1206 }
1207
1208 fn insert(
1212 &self,
1213 chain_id: ChainId,
1214 app_id: ApplicationId,
1215 request: Vec<u8>,
1216 response: Vec<u8>,
1217 next_block_height: BlockHeight,
1218 ) {
1219 let pinned = self.chains.pin();
1220 let capacity = self.capacity_per_chain;
1221 let mutex = pinned.get_or_insert_with(chain_id, || {
1222 StdMutex::new(PerChainCache {
1223 lru: LruCache::new(capacity),
1224 next_block_height,
1225 })
1226 });
1227 let mut cache = mutex.lock().expect("LRU mutex poisoned");
1228 if next_block_height < cache.next_block_height {
1229 return; }
1231 if next_block_height > cache.next_block_height {
1235 debug!(
1236 "Unexpected query cache invalidation for chain {chain_id}:\
1237 {next_block_height} > {}",
1238 cache.next_block_height
1239 );
1240 #[cfg(with_metrics)]
1241 {
1242 #[expect(
1243 clippy::cast_possible_wrap,
1244 reason = "LRU cache size fits in i64 for any realistic cache"
1245 )]
1246 let cache_len = cache.lru.len() as i64;
1247 query_cache_metrics::QUERY_CACHE_ENTRIES.sub(cache_len);
1248 query_cache_metrics::QUERY_CACHE_INVALIDATION
1249 .with_label_values(&[])
1250 .inc();
1251 }
1252 cache.lru.clear();
1253 cache.next_block_height = next_block_height;
1254 }
1255 #[cfg(with_metrics)]
1256 let prev_len = cache.lru.len();
1257 cache.lru.put((app_id, request), response);
1258 #[cfg(with_metrics)]
1259 if cache.lru.len() != prev_len {
1260 query_cache_metrics::QUERY_CACHE_ENTRIES.inc();
1261 }
1262 }
1263
1264 fn invalidate_chain(&self, chain_id: &ChainId, next_block_height: BlockHeight) {
1267 let pinned = self.chains.pin();
1268 let capacity = self.capacity_per_chain;
1269 let mutex = pinned.get_or_insert_with(*chain_id, || {
1270 StdMutex::new(PerChainCache {
1271 lru: LruCache::new(capacity),
1272 next_block_height,
1273 })
1274 });
1275 let mut cache = mutex.lock().expect("LRU mutex poisoned");
1276 if next_block_height > cache.next_block_height {
1277 #[cfg(with_metrics)]
1278 {
1279 #[expect(
1280 clippy::cast_possible_wrap,
1281 reason = "LRU cache size fits in i64 for any realistic cache"
1282 )]
1283 let cache_len = cache.lru.len() as i64;
1284 query_cache_metrics::QUERY_CACHE_ENTRIES.sub(cache_len);
1285 query_cache_metrics::QUERY_CACHE_INVALIDATION
1286 .with_label_values(&[])
1287 .inc();
1288 }
1289 cache.lru.clear();
1290 cache.next_block_height = next_block_height;
1291 } else {
1292 debug!(
1293 "Query cache for chain {chain_id} was already invalidated:\
1294 {next_block_height} <= {}",
1295 cache.next_block_height
1296 );
1297 }
1298 }
1299}
1300
1301pub struct NodeService<C>
1304where
1305 C: ClientContext + 'static,
1306{
1307 config: ChainListenerConfig,
1308 port: NonZeroU16,
1309 #[cfg(with_metrics)]
1310 metrics_port: NonZeroU16,
1311 default_chain: Option<ChainId>,
1312 context: Arc<Mutex<C>>,
1313 read_only: bool,
1315 query_cache: Option<Arc<QueryResponseCache>>,
1317 query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
1318 cancellation_token: CancellationToken,
1319 enable_memory_profiling: bool,
1320 pause: bool,
1322}
1323
1324impl<C> Clone for NodeService<C>
1325where
1326 C: ClientContext + 'static,
1327{
1328 fn clone(&self) -> Self {
1329 Self {
1330 config: self.config.clone(),
1331 port: self.port,
1332 #[cfg(with_metrics)]
1333 metrics_port: self.metrics_port,
1334 default_chain: self.default_chain,
1335 context: Arc::clone(&self.context),
1336 read_only: self.read_only,
1337 query_cache: self.query_cache.clone(),
1338 query_subscriptions: self.query_subscriptions.clone(),
1339 cancellation_token: self.cancellation_token.clone(),
1340 enable_memory_profiling: self.enable_memory_profiling,
1341 pause: self.pause,
1342 }
1343 }
1344}
1345
1346impl<C> NodeService<C>
1347where
1348 C: ClientContext,
1349{
1350 #[expect(clippy::too_many_arguments)]
1356 pub fn new(
1357 config: ChainListenerConfig,
1358 port: NonZeroU16,
1359 #[cfg(with_metrics)] metrics_port: NonZeroU16,
1360 default_chain: Option<ChainId>,
1361 context: Arc<Mutex<C>>,
1362 read_only: bool,
1363 query_cache_size: Option<usize>,
1364 query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
1365 cancellation_token: CancellationToken,
1366 enable_memory_profiling: bool,
1367 pause: bool,
1368 ) -> Self {
1369 let query_cache = query_cache_size.map(|size| Arc::new(QueryResponseCache::new(size)));
1370 Self {
1371 config,
1372 port,
1373 #[cfg(with_metrics)]
1374 metrics_port,
1375 default_chain,
1376 context,
1377 read_only,
1378 query_cache,
1379 query_subscriptions,
1380 cancellation_token,
1381 enable_memory_profiling,
1382 pause,
1383 }
1384 }
1385
1386 #[cfg(with_metrics)]
1388 pub fn metrics_address(&self) -> SocketAddr {
1389 SocketAddr::from(([0, 0, 0, 0], self.metrics_port.get()))
1390 }
1391
1392 pub fn schema(&self) -> NodeServiceSchema<C> {
1394 let query = QueryRoot {
1395 context: Arc::clone(&self.context),
1396 port: self.port,
1397 default_chain: self.default_chain,
1398 };
1399 let subscription = SubscriptionRoot {
1400 context: Arc::clone(&self.context),
1401 query_subscriptions: self.query_subscriptions.clone(),
1402 cancellation_token: self.cancellation_token.clone(),
1403 };
1404
1405 if self.read_only {
1406 NodeServiceSchema::ReadOnly(Schema::build(query, EmptyMutation, subscription).finish())
1407 } else {
1408 NodeServiceSchema::Full(
1409 Schema::build(
1410 query,
1411 MutationRoot {
1412 context: Arc::clone(&self.context),
1413 },
1414 subscription,
1415 )
1416 .finish(),
1417 )
1418 }
1419 }
1420
1421 #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
1423 pub async fn run(
1424 self,
1425 cancellation_token: CancellationToken,
1426 command_receiver: UnboundedReceiver<ListenerCommand>,
1427 ) -> Result<(), anyhow::Error> {
1428 let port = self.port.get();
1429 let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
1430 let application_handler =
1431 axum::routing::get(util::graphiql).post(Self::application_handler);
1432
1433 #[cfg(with_metrics)]
1434 monitoring_server::start_metrics_with_profiling(
1435 self.metrics_address(),
1436 cancellation_token.clone(),
1437 self.enable_memory_profiling,
1438 )
1439 .await;
1440
1441 let base_router = Router::new()
1442 .route("/", index_handler)
1443 .route(
1444 "/chains/{chain_id}/applications/{application_id}",
1445 application_handler,
1446 )
1447 .route("/ready", axum::routing::get(|| async { "ready!" }));
1448
1449 let app = match self.schema() {
1451 NodeServiceSchema::Full(schema) => {
1452 base_router.route_service("/ws", GraphQLSubscription::new(schema))
1453 }
1454 NodeServiceSchema::ReadOnly(schema) => {
1455 base_router.route_service("/ws", GraphQLSubscription::new(schema))
1456 }
1457 }
1458 .layer(Extension(self.clone()))
1459 .layer(CorsLayer::permissive());
1461
1462 info!("GraphiQL IDE: http://localhost:{}", port);
1463
1464 if let Some(cache) = &self.query_cache {
1466 let guard = self.context.lock().await;
1467 let chain_ids: Vec<ChainId> = guard.wallet().chain_ids().try_collect().await?;
1468 let (tx, mut receiver) = tokio::sync::mpsc::unbounded_channel();
1469 guard.client().subscribe_extra(chain_ids.clone(), &tx);
1470 cache.mark_all_subscribed(&chain_ids);
1471 cache.set_notification_sender(tx);
1472 drop(guard);
1473 let cache = Arc::clone(cache);
1474 tokio::spawn(async move {
1475 while let Some(notification) = receiver.recv().await {
1476 if let Reason::NewBlock { height, .. } = notification.reason {
1477 let next_block_height = height
1478 .try_add_one()
1479 .expect("block height should not overflow");
1480 cache.invalidate_chain(¬ification.chain_id, next_block_height);
1481 }
1482 }
1483 });
1484 }
1485
1486 let tcp_listener =
1487 tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
1488 let server = axum::serve(tcp_listener, app)
1489 .with_graceful_shutdown(cancellation_token.clone().cancelled_owned())
1490 .into_future();
1491
1492 if self.pause {
1493 info!("Running in paused mode: chain synchronization is disabled");
1494 server.await?;
1495 } else {
1496 let storage = self.context.lock().await.storage().clone();
1497 let chain_listener = ChainListener::new(
1498 self.config,
1499 self.context,
1500 storage,
1501 cancellation_token.clone(),
1502 command_receiver,
1503 true,
1504 )
1505 .run()
1506 .await?;
1507 let mut chain_listener = Box::pin(chain_listener).fuse();
1508 futures::select! {
1509 result = chain_listener => result?,
1510 result = Box::pin(server).fuse() => result?,
1511 };
1512 }
1513
1514 Ok(())
1515 }
1516
1517 async fn handle_service_request(
1519 &self,
1520 application_id: ApplicationId,
1521 request: Vec<u8>,
1522 chain_id: ChainId,
1523 block_hash: Option<CryptoHash>,
1524 ) -> Result<Vec<u8>, NodeServiceError> {
1525 let cache = block_hash
1527 .is_none()
1528 .then_some(self.query_cache.as_ref())
1529 .flatten();
1530
1531 if let Some(cache) = cache {
1533 if let Some(cached) = cache.get(chain_id, &application_id, &request) {
1534 return Ok(cached);
1535 }
1536 }
1537
1538 let (
1539 QueryOutcome {
1540 response,
1541 operations,
1542 },
1543 block_height,
1544 ) = self
1545 .query_user_application(application_id, request.clone(), chain_id, block_hash)
1546 .await?;
1547 if operations.is_empty() {
1548 if let Some(cache) = cache {
1549 if cache.needs_subscription(&chain_id) {
1551 if let Some(sender) = cache.notification_sender() {
1552 self.context
1553 .lock()
1554 .await
1555 .client()
1556 .subscribe_extra(vec![chain_id], &sender);
1557 cache.mark_subscribed(chain_id);
1558 }
1559 }
1560 cache.insert(
1561 chain_id,
1562 application_id,
1563 request,
1564 response.clone(),
1565 block_height,
1566 );
1567 }
1568 return Ok(response);
1569 }
1570
1571 if self.read_only {
1572 return Err(NodeServiceError::ReadOnlyModeOperationsNotAllowed);
1573 }
1574
1575 trace!("Query requested a new block with operations: {operations:?}");
1576 let client = self
1577 .context
1578 .lock()
1579 .await
1580 .make_chain_client(chain_id)
1581 .await?;
1582 let hash = loop {
1583 let timeout = match client
1584 .execute_operations(operations.clone(), vec![])
1585 .await?
1586 {
1587 ClientOutcome::Committed(certificate) => break certificate.hash(),
1588 ClientOutcome::Conflict(certificate) => {
1589 return Err(chain_client::Error::Conflict(certificate.hash()).into());
1590 }
1591 ClientOutcome::WaitForTimeout(timeout) => timeout,
1592 };
1593 let mut stream = client.subscribe().map_err(|_| {
1594 chain_client::Error::InternalError("Could not subscribe to the local node.")
1595 })?;
1596 util::wait_for_next_round(&mut stream, timeout).await;
1597 };
1598 let response = async_graphql::Response::new(hash.to_value());
1599 Ok(serde_json::to_vec(&response)?)
1600 }
1601
1602 async fn query_user_application(
1605 &self,
1606 application_id: ApplicationId,
1607 bytes: Vec<u8>,
1608 chain_id: ChainId,
1609 block_hash: Option<CryptoHash>,
1610 ) -> Result<(QueryOutcome<Vec<u8>>, BlockHeight), NodeServiceError> {
1611 let query = Query::User {
1612 application_id,
1613 bytes,
1614 };
1615 let client = self
1616 .context
1617 .lock()
1618 .await
1619 .make_chain_client(chain_id)
1620 .await?;
1621 let (
1622 QueryOutcome {
1623 response,
1624 operations,
1625 },
1626 next_block_height,
1627 ) = client.query_application(query, block_hash).await?;
1628 match response {
1629 QueryResponse::System(_) => {
1630 unreachable!("cannot get a system response for a user query")
1631 }
1632 QueryResponse::User(user_response_bytes) => Ok((
1633 QueryOutcome {
1634 response: user_response_bytes,
1635 operations,
1636 },
1637 next_block_height,
1638 )),
1639 }
1640 }
1641
1642 async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
1644 service
1645 .0
1646 .schema()
1647 .execute(request.into_inner())
1648 .await
1649 .into()
1650 }
1651
1652 async fn application_handler(
1656 Path((chain_id, application_id)): Path<(String, String)>,
1657 service: Extension<Self>,
1658 request: String,
1659 ) -> Result<Vec<u8>, NodeServiceError> {
1660 let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
1661 let application_id: ApplicationId = application_id.parse()?;
1662
1663 debug!(
1664 %chain_id,
1665 %application_id,
1666 "processing request for application:\n{:?}",
1667 &request
1668 );
1669 let response = service
1670 .0
1671 .handle_service_request(application_id, request.into_bytes(), chain_id, None)
1672 .await?;
1673
1674 Ok(response)
1675 }
1676}
1677
1678#[cfg(test)]
1679mod tests {
1680 use linera_base::{
1681 crypto::CryptoHash,
1682 data_types::BlockHeight,
1683 identifiers::{ApplicationId, ChainId},
1684 };
1685
1686 use super::QueryResponseCache;
1687
1688 fn test_chain(n: u64) -> ChainId {
1689 ChainId(CryptoHash::test_hash(format!("chain-{n}")))
1690 }
1691
1692 fn test_app(n: u64) -> ApplicationId {
1693 ApplicationId::new(CryptoHash::test_hash(format!("app-{n}")))
1694 }
1695
1696 #[test]
1697 fn cache_hit_and_miss() {
1698 let cache = QueryResponseCache::new(100);
1699 let chain = test_chain(0);
1700 let app = test_app(0);
1701 let request = b"query { balance }".to_vec();
1702 let response = b"{ \"balance\": 42 }".to_vec();
1703
1704 assert!(cache.get(chain, &app, &request).is_none());
1706
1707 cache.insert(
1709 chain,
1710 app,
1711 request.clone(),
1712 response.clone(),
1713 BlockHeight(1),
1714 );
1715
1716 assert_eq!(cache.get(chain, &app, &request), Some(response));
1718 }
1719
1720 #[test]
1721 fn per_chain_isolation() {
1722 let cache = QueryResponseCache::new(100);
1723 let chain_a = test_chain(0);
1724 let chain_b = test_chain(1);
1725 let app = test_app(0);
1726 let request = b"q".to_vec();
1727 let response = b"r".to_vec();
1728
1729 cache.insert(
1730 chain_a,
1731 app,
1732 request.clone(),
1733 response.clone(),
1734 BlockHeight(1),
1735 );
1736
1737 cache.invalidate_chain(&chain_b, BlockHeight(1));
1739 assert_eq!(cache.get(chain_a, &app, &request), Some(response));
1740 }
1741
1742 #[test]
1743 fn invalidation_clears_all_entries() {
1744 let cache = QueryResponseCache::new(100);
1745 let chain = test_chain(0);
1746 let app = test_app(0);
1747
1748 cache.insert(chain, app, b"q1".to_vec(), b"r1".to_vec(), BlockHeight(1));
1749 cache.insert(chain, app, b"q2".to_vec(), b"r2".to_vec(), BlockHeight(1));
1750
1751 cache.invalidate_chain(&chain, BlockHeight(2));
1752 assert!(cache.get(chain, &app, b"q1").is_none());
1753 assert!(cache.get(chain, &app, b"q2").is_none());
1754 }
1755
1756 #[test]
1757 fn lru_eviction() {
1758 let cache = QueryResponseCache::new(2);
1759 let chain = test_chain(0);
1760 let app = test_app(0);
1761
1762 cache.insert(chain, app, b"q1".to_vec(), b"r1".to_vec(), BlockHeight(1));
1763 cache.insert(chain, app, b"q2".to_vec(), b"r2".to_vec(), BlockHeight(1));
1764 cache.insert(chain, app, b"q3".to_vec(), b"r3".to_vec(), BlockHeight(1));
1766
1767 assert!(cache.get(chain, &app, b"q1").is_none());
1768 assert!(cache.get(chain, &app, b"q2").is_some());
1769 assert!(cache.get(chain, &app, b"q3").is_some());
1770 }
1771
1772 #[test]
1773 fn stale_insert_rejected_after_invalidation() {
1774 let cache = QueryResponseCache::new(100);
1775 let chain = test_chain(0);
1776 let app = test_app(0);
1777
1778 cache.insert(chain, app, b"q0".to_vec(), b"r0".to_vec(), BlockHeight(3));
1780 let stale_height = BlockHeight(3);
1781
1782 cache.invalidate_chain(&chain, BlockHeight(4));
1784
1785 cache.insert(chain, app, b"q".to_vec(), b"stale".to_vec(), stale_height);
1787
1788 assert!(cache.get(chain, &app, b"q").is_none());
1790 }
1791}