1use std::{
5 borrow::Cow,
6 future::IntoFuture,
7 iter,
8 net::SocketAddr,
9 num::NonZeroU16,
10 sync::{Arc, Mutex as StdMutex},
11};
12
13use async_graphql::{
14 futures_util::Stream,
15 registry::{MetaType, MetaTypeId, Registry},
16 resolver_utils::ContainerType,
17 EmptyMutation, Error, MergedObject, OutputType, Positioned, Request, Response, ScalarType,
18 Schema, SimpleObject, Subscription,
19};
20use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
21use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
22use futures::{lock::Mutex, Future, FutureExt as _, StreamExt as _, TryStreamExt as _};
23use linera_base::{
24 crypto::{CryptoError, CryptoHash},
25 data_types::{
26 Amount, ApplicationDescription, ApplicationPermissions, BlockHeight, Bytecode, Epoch,
27 TimeDelta,
28 },
29 identifiers::{
30 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
31 },
32 ownership::{ChainOwnership, TimeoutConfig},
33 vm::VmRuntime,
34 BcsHexParseError,
35};
36use linera_chain::{
37 types::{ConfirmedBlock, GenericCertificate},
38 ChainStateView,
39};
40use linera_client::chain_listener::{
41 ChainListener, ChainListenerConfig, ClientContext, ListenerCommand,
42};
43use linera_core::{
44 client::chain_client::{self, ChainClient},
45 data_types::ClientOutcome,
46 wallet::Wallet as _,
47 worker::{ChainStateViewReadGuard, Notification, Reason},
48};
49use linera_execution::{
50 committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse,
51 SystemOperation,
52};
53#[cfg(with_metrics)]
54use linera_metrics::monitoring_server;
55use linera_sdk::linera_base_types::BlobContent;
56use linera_storage::Storage;
57use lru::LruCache;
58use serde::{Deserialize, Serialize};
59use serde_json::json;
60use tokio::sync::mpsc::UnboundedReceiver;
61use tokio_util::sync::CancellationToken;
62use tower_http::cors::CorsLayer;
63use tracing::{debug, info, instrument, trace};
64
65use crate::util;
66
/// A string wrapper that is exposed to GraphQL as the `JSON` scalar.
///
/// The wrapped string is handed to the GraphQL layer as-is by the `OutputType`
/// implementation; nothing in this type validates that it is well-formed JSON.
#[derive(Clone)]
struct RawJson(String);
74
75impl OutputType for RawJson {
76 fn type_name() -> Cow<'static, str> {
77 Cow::Borrowed("JSON")
78 }
79
80 fn create_type_info(registry: &mut Registry) -> String {
81 registry.create_output_type::<Self, _>(MetaTypeId::Scalar, |_| MetaType::Scalar {
82 name: "JSON".to_string(),
83 description: Some("A scalar that can represent any JSON value.".to_string()),
84 is_valid: None,
85 visible: None,
86 inaccessible: false,
87 tags: Default::default(),
88 specified_by_url: None,
89 directive_invocations: Default::default(),
90 requires_scopes: Default::default(),
91 })
92 }
93
94 async fn resolve(
95 &self,
96 _ctx: &async_graphql::ContextSelectionSet<'_>,
97 _field: &Positioned<async_graphql::parser::types::Field>,
98 ) -> async_graphql::ServerResult<async_graphql::Value> {
99 Ok(async_graphql::Value::Object(
104 std::iter::once((
105 async_graphql::Name::new(async_graphql_value::RAW_VALUE_TOKEN),
106 async_graphql::Value::String(self.0.clone()),
107 ))
108 .collect(),
109 ))
110 }
111}
112
// GraphQL object returned by the `chains` query: the chain IDs taken from the wallet
// plus the optional default chain.
#[derive(SimpleObject, Serialize, Deserialize, Clone)]
pub struct Chains {
    // All chain IDs reported by the wallet.
    pub list: Vec<ChainId>,
    // The default chain, if one is configured.
    pub default: Option<ChainId>,
}
118
/// Root of the GraphQL queries exposed by the node service.
pub struct QueryRoot<C> {
    /// Shared client context; locked to create chain clients and to read the wallet.
    context: Arc<Mutex<C>>,
    /// Port used when building application links (see `ApplicationOverview::new`).
    port: NonZeroU16,
    /// Chain reported as `default` by the `chains` query.
    default_chain: Option<ChainId>,
}
125
/// Root of the GraphQL subscriptions exposed by the node service.
pub struct SubscriptionRoot<C> {
    /// Shared client context; locked to create chain clients.
    context: Arc<Mutex<C>>,
    /// Manager for registered subscription queries; `None` makes `query_result` fail
    /// with "no subscription queries registered".
    query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
    /// Token cloned into every `QuerySubscriptionManager::subscribe` call.
    cancellation_token: CancellationToken,
}
132
/// Root of the GraphQL mutations exposed by the node service.
pub struct MutationRoot<C> {
    /// Shared client context; locked to create chain clients and to update the wallet.
    context: Arc<Mutex<C>>,
}
137
/// Errors produced by the node service's HTTP handlers.
///
/// The `IntoResponse` implementation maps each variant to an HTTP status code.
#[derive(Debug, thiserror::Error)]
enum NodeServiceError {
    /// An error reported by the chain client; displayed transparently.
    #[error(transparent)]
    ChainClient(#[from] chain_client::Error),
    /// A `BcsHexParseError`; displayed transparently.
    #[error(transparent)]
    BcsHex(#[from] BcsHexParseError),
    /// A JSON (de)serialization error; displayed transparently.
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// A chain ID could not be parsed.
    #[error("malformed chain ID: {0}")]
    InvalidChainId(CryptoError),
    /// An error reported by `linera_client`; displayed transparently.
    #[error(transparent)]
    Client(#[from] linera_client::Error),
    /// A query tried to schedule operations while the service is read-only.
    #[error("scheduling operations from queries is disabled in read-only mode")]
    ReadOnlyModeOperationsNotAllowed,
}
153
154impl IntoResponse for NodeServiceError {
155 fn into_response(self) -> response::Response {
156 let status = match self {
157 NodeServiceError::InvalidChainId(_) | NodeServiceError::BcsHex(_) => {
158 StatusCode::BAD_REQUEST
159 }
160 NodeServiceError::ReadOnlyModeOperationsNotAllowed => StatusCode::FORBIDDEN,
161 _ => StatusCode::INTERNAL_SERVER_ERROR,
162 };
163 let body = json!({"error": self.to_string()}).to_string();
164 (status, body).into_response()
165 }
166}
167
168#[Subscription]
169impl<C> SubscriptionRoot<C>
170where
171 C: ClientContext + 'static,
172{
173 async fn notifications(
175 &self,
176 chain_id: ChainId,
177 ) -> Result<impl Stream<Item = Notification>, Error> {
178 let client = self
179 .context
180 .lock()
181 .await
182 .make_chain_client(chain_id)
183 .await?;
184 Ok(client.subscribe()?)
185 }
186
187 async fn query_result(
190 &self,
191 #[graphql(desc = "Name of the registered subscription query.")] name: String,
192 #[graphql(desc = "The chain to watch.")] chain_id: ChainId,
193 #[graphql(desc = "The application to query.")] application_id: ApplicationId,
194 ) -> Result<impl Stream<Item = RawJson>, Error> {
195 let manager = self
196 .query_subscriptions
197 .as_ref()
198 .ok_or_else(|| Error::new("no subscription queries registered"))?;
199
200 let key = crate::query_subscription::SubscriptionKey {
201 name,
202 chain_id,
203 application_id,
204 };
205
206 let receiver = manager
207 .subscribe(
208 &key,
209 Arc::clone(&self.context),
210 self.cancellation_token.clone(),
211 )
212 .map_err(|e| Error::new(e.to_string()))?;
213
214 let current = receiver.borrow().clone();
219 let changes = tokio_stream::wrappers::WatchStream::from_changes(receiver)
220 .filter_map(|value| async move { value });
221 Ok(futures::stream::iter(current).chain(changes).map(RawJson))
222 }
223}
224
225impl<C> MutationRoot<C>
226where
227 C: ClientContext,
228{
229 async fn execute_system_operation(
230 &self,
231 system_operation: SystemOperation,
232 chain_id: ChainId,
233 ) -> Result<CryptoHash, Error> {
234 let certificate = self
235 .apply_client_command(&chain_id, move |client| {
236 let operation = Operation::system(system_operation.clone());
237 async move {
238 let result = client
239 .execute_operation(operation)
240 .await
241 .map_err(Error::from);
242 (result, client)
243 }
244 })
245 .await?;
246 Ok(certificate.hash())
247 }
248
249 async fn apply_client_command<F, Fut, T>(
253 &self,
254 chain_id: &ChainId,
255 mut f: F,
256 ) -> Result<T, Error>
257 where
258 F: FnMut(ChainClient<C::Environment>) -> Fut,
259 Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
260 {
261 loop {
262 let client = self
263 .context
264 .lock()
265 .await
266 .make_chain_client(*chain_id)
267 .await?;
268 let mut stream = client.subscribe()?;
269 let (result, client) = f(client).await;
270 self.context.lock().await.update_wallet(&client).await?;
271 let timeout = match result? {
272 ClientOutcome::Committed(t) => return Ok(t),
273 ClientOutcome::Conflict(certificate) => {
274 return Err(chain_client::Error::Conflict(certificate.hash()).into());
275 }
276 ClientOutcome::WaitForTimeout(timeout) => timeout,
277 };
278 drop(client);
279 util::wait_for_next_round(&mut stream, timeout).await;
280 }
281 }
282}
283
284#[async_graphql::Object(cache_control(no_cache))]
285impl<C> MutationRoot<C>
286where
287 C: ClientContext + 'static,
288{
289 async fn process_inbox(
291 &self,
292 #[graphql(desc = "The chain whose inbox is being processed.")] chain_id: ChainId,
293 ) -> Result<Vec<CryptoHash>, Error> {
294 let mut hashes = Vec::new();
295 loop {
296 let client = self
297 .context
298 .lock()
299 .await
300 .make_chain_client(chain_id)
301 .await?;
302 let result = client.process_inbox().await;
303 self.context.lock().await.update_wallet(&client).await?;
304 let (certificates, maybe_timeout) = result?;
305 hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
306 match maybe_timeout {
307 None => return Ok(hashes),
308 Some(timestamp) => {
309 let mut stream = client.subscribe()?;
310 drop(client);
311 util::wait_for_next_round(&mut stream, timestamp).await;
312 }
313 }
314 }
315 }
316
317 async fn sync(
322 &self,
323 #[graphql(desc = "The chain being synchronized.")] chain_id: ChainId,
324 ) -> Result<u64, Error> {
325 let client = self
326 .context
327 .lock()
328 .await
329 .make_chain_client(chain_id)
330 .await?;
331 let info = client.synchronize_from_validators().await?;
332 self.context.lock().await.update_wallet(&client).await?;
333 Ok(info.next_block_height.0)
334 }
335
336 async fn retry_pending_block(
338 &self,
339 #[graphql(desc = "The chain on whose block is being retried.")] chain_id: ChainId,
340 ) -> Result<Option<CryptoHash>, Error> {
341 let client = self
342 .context
343 .lock()
344 .await
345 .make_chain_client(chain_id)
346 .await?;
347 let outcome = client.process_pending_block().await?;
348 self.context.lock().await.update_wallet(&client).await?;
349 match outcome {
350 ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
351 ClientOutcome::Committed(None) => Ok(None),
352 ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
353 "Please try again at {}",
354 timeout.timestamp
355 ))),
356 ClientOutcome::Conflict(certificate) => Err(Error::from(format!(
357 "A different block was committed: {}",
358 certificate.hash()
359 ))),
360 }
361 }
362
363 async fn transfer(
366 &self,
367 #[graphql(desc = "The chain which native tokens are being transferred from.")]
368 chain_id: ChainId,
369 #[graphql(desc = "The account being debited on the chain.")] owner: AccountOwner,
370 #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
371 #[graphql(desc = "The amount being transferred.")] amount: Amount,
372 ) -> Result<CryptoHash, Error> {
373 self.apply_client_command(&chain_id, move |client| async move {
374 let result = client
375 .transfer(owner, amount, recipient)
376 .await
377 .map_err(Error::from)
378 .map(|outcome| outcome.map(|certificate| certificate.hash()));
379 (result, client)
380 })
381 .await
382 }
383
384 async fn claim(
388 &self,
389 #[graphql(desc = "The chain for whom owner is one of the owner.")] chain_id: ChainId,
390 #[graphql(desc = "The owner of chain targetId being debited.")] owner: AccountOwner,
391 #[graphql(desc = "The chain whose owner is being debited.")] target_id: ChainId,
392 #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
393 #[graphql(desc = "The amount being transferred.")] amount: Amount,
394 ) -> Result<CryptoHash, Error> {
395 self.apply_client_command(&chain_id, move |client| async move {
396 let result = client
397 .claim(owner, target_id, recipient, amount)
398 .await
399 .map_err(Error::from)
400 .map(|outcome| outcome.map(|certificate| certificate.hash()));
401 (result, client)
402 })
403 .await
404 }
405
406 async fn read_data_blob(
409 &self,
410 chain_id: ChainId,
411 hash: CryptoHash,
412 ) -> Result<CryptoHash, Error> {
413 self.apply_client_command(&chain_id, move |client| async move {
414 let result = client
415 .read_data_blob(hash)
416 .await
417 .map_err(Error::from)
418 .map(|outcome| outcome.map(|certificate| certificate.hash()));
419 (result, client)
420 })
421 .await
422 }
423
424 async fn open_chain(
426 &self,
427 #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
428 #[graphql(desc = "The owner of the new chain.")] owner: AccountOwner,
429 #[graphql(desc = "The balance of the chain being created. Zero if `None`.")]
430 balance: Option<Amount>,
431 ) -> Result<ChainId, Error> {
432 let ownership = ChainOwnership::single(owner);
433 let balance = balance.unwrap_or(Amount::ZERO);
434 let description = self
435 .apply_client_command(&chain_id, move |client| {
436 let ownership = ownership.clone();
437 async move {
438 let result = client
439 .open_chain(ownership, ApplicationPermissions::default(), balance)
440 .await
441 .map_err(Error::from)
442 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
443 (result, client)
444 }
445 })
446 .await?;
447 Ok(description.id())
448 }
449
450 #[expect(clippy::too_many_arguments)]
452 async fn open_multi_owner_chain(
453 &self,
454 #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
455 #[graphql(desc = "Permissions for applications on the new chain")]
456 application_permissions: Option<ApplicationPermissions>,
457 #[graphql(desc = "The owners of the chain")] owners: Vec<AccountOwner>,
458 #[graphql(desc = "The weights of the owners")] weights: Option<Vec<u64>>,
459 #[graphql(desc = "The number of multi-leader rounds")] multi_leader_rounds: Option<u32>,
460 #[graphql(desc = "The balance of the chain. Zero if `None`")] balance: Option<Amount>,
461 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
462 fast_round_ms: Option<u64>,
463 #[graphql(
464 desc = "The duration of the first single-leader and all multi-leader rounds",
465 default = 10_000
466 )]
467 base_timeout_ms: u64,
468 #[graphql(
469 desc = "The number of milliseconds by which the timeout increases after each \
470 single-leader round",
471 default = 1_000
472 )]
473 timeout_increment_ms: u64,
474 #[graphql(
475 desc = "The age of an incoming tracked or protected message after which the \
476 validators start transitioning the chain to fallback mode, in milliseconds.",
477 default = 86_400_000
478 )]
479 fallback_duration_ms: u64,
480 ) -> Result<ChainId, Error> {
481 let owners = if let Some(weights) = weights {
482 if weights.len() != owners.len() {
483 return Err(Error::new(format!(
484 "There are {} owners but {} weights.",
485 owners.len(),
486 weights.len()
487 )));
488 }
489 owners.into_iter().zip(weights).collect::<Vec<_>>()
490 } else {
491 owners
492 .into_iter()
493 .zip(iter::repeat(100))
494 .collect::<Vec<_>>()
495 };
496 let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
497 let timeout_config = TimeoutConfig {
498 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
499 base_timeout: TimeDelta::from_millis(base_timeout_ms),
500 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
501 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
502 };
503 let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
504 let balance = balance.unwrap_or(Amount::ZERO);
505 let description = self
506 .apply_client_command(&chain_id, move |client| {
507 let ownership = ownership.clone();
508 let application_permissions = application_permissions.clone().unwrap_or_default();
509 async move {
510 let result = client
511 .open_chain(ownership, application_permissions, balance)
512 .await
513 .map_err(Error::from)
514 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
515 (result, client)
516 }
517 })
518 .await?;
519 Ok(description.id())
520 }
521
522 async fn close_chain(
524 &self,
525 #[graphql(desc = "The chain being closed.")] chain_id: ChainId,
526 ) -> Result<Option<CryptoHash>, Error> {
527 let maybe_cert = self
528 .apply_client_command(&chain_id, |client| async move {
529 let result = client.close_chain().await.map_err(Error::from);
530 (result, client)
531 })
532 .await?;
533 Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
534 }
535
536 async fn change_owner(
538 &self,
539 #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
540 #[graphql(desc = "The new single owner of the chain")] new_owner: AccountOwner,
541 ) -> Result<CryptoHash, Error> {
542 let operation = SystemOperation::ChangeOwnership {
543 super_owners: vec![new_owner],
544 owners: Vec::new(),
545 first_leader: None,
546 multi_leader_rounds: 2,
547 open_multi_leader_rounds: false,
548 timeout_config: TimeoutConfig::default(),
549 };
550 self.execute_system_operation(operation, chain_id).await
551 }
552
553 #[expect(clippy::too_many_arguments)]
555 async fn change_multiple_owners(
556 &self,
557 #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
558 #[graphql(desc = "The new list of owners of the chain")] new_owners: Vec<AccountOwner>,
559 #[graphql(desc = "The new list of weights of the owners")] new_weights: Vec<u64>,
560 #[graphql(desc = "The multi-leader round of the chain")] multi_leader_rounds: u32,
561 #[graphql(
562 desc = "Whether multi-leader rounds are unrestricted, that is not limited to chain owners."
563 )]
564 open_multi_leader_rounds: bool,
565 #[graphql(desc = "The leader of the first single-leader round. \
566 If not set, this is random like other rounds.")]
567 first_leader: Option<AccountOwner>,
568 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
569 fast_round_ms: Option<u64>,
570 #[graphql(
571 desc = "The duration of the first single-leader and all multi-leader rounds",
572 default = 10_000
573 )]
574 base_timeout_ms: u64,
575 #[graphql(
576 desc = "The number of milliseconds by which the timeout increases after each \
577 single-leader round",
578 default = 1_000
579 )]
580 timeout_increment_ms: u64,
581 #[graphql(
582 desc = "The age of an incoming tracked or protected message after which the \
583 validators start transitioning the chain to fallback mode, in milliseconds.",
584 default = 86_400_000
585 )]
586 fallback_duration_ms: u64,
587 ) -> Result<CryptoHash, Error> {
588 let operation = SystemOperation::ChangeOwnership {
589 super_owners: Vec::new(),
590 owners: new_owners.into_iter().zip(new_weights).collect(),
591 first_leader,
592 multi_leader_rounds,
593 open_multi_leader_rounds,
594 timeout_config: TimeoutConfig {
595 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
596 base_timeout: TimeDelta::from_millis(base_timeout_ms),
597 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
598 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
599 },
600 };
601 self.execute_system_operation(operation, chain_id).await
602 }
603
604 #[expect(clippy::too_many_arguments)]
606 async fn change_application_permissions(
607 &self,
608 #[graphql(desc = "The chain whose permissions are being changed")] chain_id: ChainId,
609 #[graphql(
610 desc = "These applications are allowed to manage the chain: close it, change \
611 application permissions, and change ownership."
612 )]
613 manage_chain: Vec<ApplicationId>,
614 #[graphql(
615 desc = "If this is `None`, all system operations and application operations are allowed.
616If it is `Some`, only operations from the specified applications are allowed,
617and no system operations."
618 )]
619 execute_operations: Option<Vec<ApplicationId>>,
620 #[graphql(
621 desc = "At least one operation or incoming message from each of these applications must occur in every block."
622 )]
623 mandatory_applications: Vec<ApplicationId>,
624 #[graphql(
625 desc = "These applications are allowed to perform calls to services as oracles."
626 )]
627 call_service_as_oracle: Option<Vec<ApplicationId>>,
628 #[graphql(desc = "These applications are allowed to perform HTTP requests.")]
629 make_http_requests: Option<Vec<ApplicationId>>,
630 ) -> Result<CryptoHash, Error> {
631 let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
632 execute_operations,
633 mandatory_applications,
634 manage_chain,
635 call_service_as_oracle,
636 make_http_requests,
637 });
638 self.execute_system_operation(operation, chain_id).await
639 }
640
641 async fn create_committee(
645 &self,
646 chain_id: ChainId,
647 committee: Committee,
648 ) -> Result<CryptoHash, Error> {
649 Ok(self
650 .apply_client_command(&chain_id, move |client| {
651 let committee = committee.clone();
652 async move {
653 let result = client
654 .stage_new_committee(committee)
655 .await
656 .map_err(Error::from);
657 (result, client)
658 }
659 })
660 .await?
661 .hash())
662 }
663
664 async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
668 let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
669 self.execute_system_operation(operation, chain_id).await
670 }
671
672 async fn publish_module(
674 &self,
675 #[graphql(desc = "The chain publishing the module")] chain_id: ChainId,
676 #[graphql(desc = "The bytecode of the contract code")] contract: Bytecode,
677 #[graphql(desc = "The bytecode of the service code (only relevant for WebAssembly)")]
678 service: Bytecode,
679 #[graphql(desc = "The virtual machine being used (either Wasm or Evm)")]
680 vm_runtime: VmRuntime,
681 ) -> Result<ModuleId, Error> {
682 self.apply_client_command(&chain_id, move |client| {
683 let contract = contract.clone();
684 let service = service.clone();
685 async move {
686 let result = client
687 .publish_module(contract, service, vm_runtime)
688 .await
689 .map_err(Error::from)
690 .map(|outcome| outcome.map(|(module_id, _)| module_id));
691 (result, client)
692 }
693 })
694 .await
695 }
696
697 async fn publish_data_blob(
699 &self,
700 #[graphql(desc = "The chain paying for the blob publication")] chain_id: ChainId,
701 #[graphql(desc = "The content of the data blob being created")] bytes: Vec<u8>,
702 ) -> Result<CryptoHash, Error> {
703 self.apply_client_command(&chain_id, |client| {
704 let bytes = bytes.clone();
705 async move {
706 let result = client.publish_data_blob(bytes).await.map_err(Error::from);
707 (result, client)
708 }
709 })
710 .await
711 .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
712 }
713
714 async fn create_application(
716 &self,
717 #[graphql(desc = "The chain paying for the creation of the application")] chain_id: ChainId,
718 #[graphql(desc = "The module ID of the application being created")] module_id: ModuleId,
719 #[graphql(desc = "The JSON serialization of the parameters of the application")]
720 parameters: String,
721 #[graphql(
722 desc = "The JSON serialization of the instantiation argument of the application"
723 )]
724 instantiation_argument: String,
725 #[graphql(desc = "The dependencies of the application being created")]
726 required_application_ids: Vec<ApplicationId>,
727 ) -> Result<ApplicationId, Error> {
728 self.apply_client_command(&chain_id, move |client| {
729 let parameters = parameters.as_bytes().to_vec();
730 let instantiation_argument = instantiation_argument.as_bytes().to_vec();
731 let required_application_ids = required_application_ids.clone();
732 async move {
733 let result = client
734 .create_application_untyped(
735 module_id,
736 parameters,
737 instantiation_argument,
738 required_application_ids,
739 )
740 .await
741 .map_err(Error::from)
742 .map(|outcome| outcome.map(|(application_id, _)| application_id));
743 (result, client)
744 }
745 })
746 .await
747 }
748}
749
750#[async_graphql::Object(cache_control(no_cache))]
751impl<C> QueryRoot<C>
752where
753 C: ClientContext + 'static,
754{
755 async fn chain(
756 &self,
757 chain_id: ChainId,
758 ) -> Result<ChainStateExtendedView<<C::Environment as linera_core::Environment>::Storage>, Error>
759 {
760 let client = self
761 .context
762 .lock()
763 .await
764 .make_chain_client(chain_id)
765 .await?;
766 let view = client.chain_state_view().await?;
767 Ok(ChainStateExtendedView::new(view))
768 }
769
770 async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
771 let client = self
772 .context
773 .lock()
774 .await
775 .make_chain_client(chain_id)
776 .await?;
777 let applications = client
778 .chain_state_view()
779 .await?
780 .execution_state
781 .list_applications()
782 .await?;
783
784 let overviews = applications
785 .into_iter()
786 .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
787 .collect();
788
789 Ok(overviews)
790 }
791
792 async fn chains(&self) -> Result<Chains, Error> {
793 Ok(Chains {
794 list: self
795 .context
796 .lock()
797 .await
798 .wallet()
799 .chain_ids()
800 .try_collect()
801 .await?,
802 default: self.default_chain,
803 })
804 }
805
806 async fn block(
807 &self,
808 hash: Option<CryptoHash>,
809 chain_id: ChainId,
810 ) -> Result<Option<ConfirmedBlock>, Error> {
811 let client = self
812 .context
813 .lock()
814 .await
815 .make_chain_client(chain_id)
816 .await?;
817 let hash = match hash {
818 Some(hash) => Some(hash),
819 None => client.chain_info().await?.block_hash,
820 };
821 if let Some(hash) = hash {
822 let block = client.read_confirmed_block(hash).await?;
823 Ok(Some(block))
824 } else {
825 Ok(None)
826 }
827 }
828
829 async fn events_from_index(
830 &self,
831 chain_id: ChainId,
832 stream_id: StreamId,
833 start_index: u32,
834 ) -> Result<Vec<IndexAndEvent>, Error> {
835 Ok(self
836 .context
837 .lock()
838 .await
839 .make_chain_client(chain_id)
840 .await?
841 .events_from_index(stream_id, start_index)
842 .await?)
843 }
844
845 async fn blocks(
846 &self,
847 from: Option<CryptoHash>,
848 chain_id: ChainId,
849 limit: Option<u32>,
850 ) -> Result<Vec<ConfirmedBlock>, Error> {
851 let client = self
852 .context
853 .lock()
854 .await
855 .make_chain_client(chain_id)
856 .await?;
857 let limit = limit.unwrap_or(10);
858 let from = match from {
859 Some(from) => Some(from),
860 None => client.chain_info().await?.block_hash,
861 };
862 let Some(from) = from else {
863 return Ok(vec![]);
864 };
865 let mut hash = Some(from);
866 let mut values = Vec::new();
867 for _ in 0..limit {
868 let Some(next_hash) = hash else {
869 break;
870 };
871 let value = client.read_confirmed_block(next_hash).await?;
872 hash = value.block().header.previous_block_hash;
873 values.push(value);
874 }
875 Ok(values)
876 }
877
878 async fn version(&self) -> linera_version::VersionInfo {
880 linera_version::VersionInfo::default()
881 }
882}
883
/// Extra GraphQL fields merged into the chain state view; wraps the chain's ID.
struct ChainStateViewExtension(ChainId);

#[async_graphql::Object(cache_control(no_cache))]
impl ChainStateViewExtension {
    // Returns the wrapped chain ID.
    async fn chain_id(&self) -> ChainId {
        self.0
    }
}
895
// GraphQL object merging the fields of `ChainStateViewExtension` with those of the
// read-only chain state view.
#[derive(MergedObject)]
struct ChainStateExtendedView<S: Storage>(ChainStateViewExtension, ReadOnlyChainStateView<S>);
898
/// Wrapper around a [`ChainStateViewReadGuard`] whose GraphQL implementations delegate
/// to those of `ChainStateView<S::Context>`.
pub struct ReadOnlyChainStateView<S: Storage>(ChainStateViewReadGuard<S>);
902
903impl<S: Storage> ContainerType for ReadOnlyChainStateView<S>
904where
905 ChainStateView<S::Context>: ContainerType,
906{
907 async fn resolve_field(
908 &self,
909 context: &async_graphql::Context<'_>,
910 ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
911 self.0.resolve_field(context).await
912 }
913}
914
915impl<S: Storage> OutputType for ReadOnlyChainStateView<S>
916where
917 ChainStateView<S::Context>: OutputType,
918{
919 fn type_name() -> Cow<'static, str> {
920 ChainStateView::<S::Context>::type_name()
921 }
922
923 fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
924 ChainStateView::<S::Context>::create_type_info(registry)
925 }
926
927 async fn resolve(
928 &self,
929 context: &async_graphql::ContextSelectionSet<'_>,
930 field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
931 ) -> async_graphql::ServerResult<async_graphql::Value> {
932 self.0.resolve(context, field).await
933 }
934}
935
936impl<S: Storage> ChainStateExtendedView<S> {
937 fn new(view: ChainStateViewReadGuard<S>) -> Self {
938 Self(
939 ChainStateViewExtension(view.chain_id()),
940 ReadOnlyChainStateView(view),
941 )
942 }
943}
944
// GraphQL object describing one application of a chain.
#[derive(SimpleObject)]
pub struct ApplicationOverview {
    // The application's ID.
    id: ApplicationId,
    // The application's description.
    description: ApplicationDescription,
    // URL of the application's endpoint on this node service (built in `new`).
    link: String,
}
951
952impl ApplicationOverview {
953 fn new(
954 id: ApplicationId,
955 description: ApplicationDescription,
956 port: NonZeroU16,
957 chain_id: ChainId,
958 ) -> Self {
959 Self {
960 id,
961 description,
962 link: format!(
963 "http://localhost:{}/chains/{}/applications/{}",
964 port.get(),
965 chain_id,
966 id
967 ),
968 }
969 }
970}
971
/// The GraphQL schema served by the node service, in one of two flavors.
pub enum NodeServiceSchema<C>
where
    C: ClientContext + 'static,
{
    /// Schema with queries, mutations and subscriptions.
    Full(Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>>),
    /// Schema without mutations (`EmptyMutation`).
    ReadOnly(Schema<QueryRoot<C>, EmptyMutation, SubscriptionRoot<C>>),
}
982
983impl<C> NodeServiceSchema<C>
984where
985 C: ClientContext,
986{
987 pub async fn execute(&self, request: impl Into<Request>) -> Response {
989 match self {
990 Self::Full(schema) => schema.execute(request).await,
991 Self::ReadOnly(schema) => schema.execute(request).await,
992 }
993 }
994
995 pub fn sdl(&self) -> String {
997 match self {
998 Self::Full(schema) => schema.sdl(),
999 Self::ReadOnly(schema) => schema.sdl(),
1000 }
1001 }
1002}
1003
1004impl<C> Clone for NodeServiceSchema<C>
1005where
1006 C: ClientContext,
1007{
1008 fn clone(&self) -> Self {
1009 match self {
1010 Self::Full(schema) => Self::Full(schema.clone()),
1011 Self::ReadOnly(schema) => Self::ReadOnly(schema.clone()),
1012 }
1013 }
1014}
1015
/// Prometheus metrics for the query response cache; only compiled with `with_metrics`.
#[cfg(with_metrics)]
mod query_cache_metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{register_int_counter_vec, register_int_gauge};
    use prometheus::{IntCounterVec, IntGauge};

    /// Counts cache lookups that found a response.
    pub static QUERY_CACHE_HIT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("query_response_cache_hit", "Query response cache hits", &[])
    });

    /// Counts cache lookups that found nothing.
    pub static QUERY_CACHE_MISS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "query_response_cache_miss",
            "Query response cache misses",
            &[],
        )
    });

    /// Counts invalidations of a chain's cache.
    pub static QUERY_CACHE_INVALIDATION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "query_response_cache_invalidation",
            "Query response cache invalidations (per chain)",
            &[],
        )
    });

    /// Tracks the number of responses currently cached, summed over all chains.
    pub static QUERY_CACHE_ENTRIES: LazyLock<IntGauge> = LazyLock::new(|| {
        register_int_gauge(
            "query_response_cache_entries",
            "Current number of cached query responses across all chains",
        )
    });
}
1050
/// The cached query responses of a single chain.
struct PerChainCache {
    /// Maps `(application ID, request bytes)` to the response bytes.
    lru: LruCache<(ApplicationId, Vec<u8>), Vec<u8>>,
    /// The block height the cached entries belong to; `QueryResponseCache::insert`
    /// ignores responses for a lower height and clears the cache for a higher one.
    next_block_height: BlockHeight,
}
1057
/// Cache of application query responses, kept separately per chain.
struct QueryResponseCache {
    /// One mutex-protected LRU cache per chain.
    chains: papaya::HashMap<ChainId, StdMutex<PerChainCache>>,
    /// Chains that have been marked as subscribed (see `mark_subscribed`).
    subscribed: papaya::HashSet<ChainId>,
    /// Notification sender installed via `set_notification_sender`, if any.
    notification_sender: StdMutex<Option<tokio::sync::mpsc::UnboundedSender<Notification>>>,
    /// Capacity given to each newly created per-chain LRU cache.
    capacity_per_chain: std::num::NonZeroUsize,
}
1074
1075impl QueryResponseCache {
1076 fn new(capacity_per_chain: usize) -> Self {
1077 Self {
1078 chains: papaya::HashMap::new(),
1079 subscribed: papaya::HashSet::new(),
1080 notification_sender: StdMutex::new(None),
1081 capacity_per_chain: std::num::NonZeroUsize::new(capacity_per_chain)
1082 .expect("capacity must be > 0"),
1083 }
1084 }
1085
1086 fn set_notification_sender(&self, sender: tokio::sync::mpsc::UnboundedSender<Notification>) {
1088 *self
1089 .notification_sender
1090 .lock()
1091 .expect("sender mutex poisoned") = Some(sender);
1092 }
1093
1094 fn notification_sender(&self) -> Option<tokio::sync::mpsc::UnboundedSender<Notification>> {
1096 self.notification_sender
1097 .lock()
1098 .expect("sender mutex poisoned")
1099 .clone()
1100 }
1101
1102 fn mark_subscribed(&self, chain_id: ChainId) {
1104 self.subscribed.pin().insert(chain_id);
1105 }
1106
1107 fn needs_subscription(&self, chain_id: &ChainId) -> bool {
1109 !self.subscribed.pin().contains(chain_id)
1110 }
1111
1112 fn mark_all_subscribed(&self, chain_ids: &[ChainId]) {
1114 let pinned = self.subscribed.pin();
1115 for &chain_id in chain_ids {
1116 pinned.insert(chain_id);
1117 }
1118 }
1119
1120 #[allow(clippy::question_mark)]
1123 fn get(&self, chain_id: ChainId, app_id: &ApplicationId, request: &[u8]) -> Option<Vec<u8>> {
1124 let pinned = self.chains.pin();
1125 let Some(mutex) = pinned.get(&chain_id) else {
1126 #[cfg(with_metrics)]
1127 query_cache_metrics::QUERY_CACHE_MISS
1128 .with_label_values(&[])
1129 .inc();
1130 return None;
1131 };
1132 let mut cache = mutex.lock().expect("LRU mutex poisoned");
1133 let key = (*app_id, request.to_vec());
1134 let result = cache.lru.get(&key).cloned();
1135 #[cfg(with_metrics)]
1136 {
1137 if result.is_some() {
1138 query_cache_metrics::QUERY_CACHE_HIT
1139 .with_label_values(&[])
1140 .inc();
1141 } else {
1142 query_cache_metrics::QUERY_CACHE_MISS
1143 .with_label_values(&[])
1144 .inc();
1145 }
1146 }
1147 result
1148 }
1149
1150 fn insert(
1154 &self,
1155 chain_id: ChainId,
1156 app_id: ApplicationId,
1157 request: Vec<u8>,
1158 response: Vec<u8>,
1159 next_block_height: BlockHeight,
1160 ) {
1161 let pinned = self.chains.pin();
1162 let capacity = self.capacity_per_chain;
1163 let mutex = pinned.get_or_insert_with(chain_id, || {
1164 StdMutex::new(PerChainCache {
1165 lru: LruCache::new(capacity),
1166 next_block_height,
1167 })
1168 });
1169 let mut cache = mutex.lock().expect("LRU mutex poisoned");
1170 if next_block_height < cache.next_block_height {
1171 return; }
1173 if next_block_height > cache.next_block_height {
1177 debug!(
1178 "Unexpected query cache invalidation for chain {chain_id}:\
1179 {next_block_height} > {}",
1180 cache.next_block_height
1181 );
1182 #[cfg(with_metrics)]
1183 {
1184 query_cache_metrics::QUERY_CACHE_ENTRIES.sub(cache.lru.len() as i64);
1185 query_cache_metrics::QUERY_CACHE_INVALIDATION
1186 .with_label_values(&[])
1187 .inc();
1188 }
1189 cache.lru.clear();
1190 cache.next_block_height = next_block_height;
1191 }
1192 #[cfg(with_metrics)]
1193 let prev_len = cache.lru.len();
1194 cache.lru.put((app_id, request), response);
1195 #[cfg(with_metrics)]
1196 if cache.lru.len() != prev_len {
1197 query_cache_metrics::QUERY_CACHE_ENTRIES.inc();
1198 }
1199 }
1200
1201 fn invalidate_chain(&self, chain_id: &ChainId, next_block_height: BlockHeight) {
1204 let pinned = self.chains.pin();
1205 let capacity = self.capacity_per_chain;
1206 let mutex = pinned.get_or_insert_with(*chain_id, || {
1207 StdMutex::new(PerChainCache {
1208 lru: LruCache::new(capacity),
1209 next_block_height,
1210 })
1211 });
1212 let mut cache = mutex.lock().expect("LRU mutex poisoned");
1213 if next_block_height > cache.next_block_height {
1214 #[cfg(with_metrics)]
1215 {
1216 query_cache_metrics::QUERY_CACHE_ENTRIES.sub(cache.lru.len() as i64);
1217 query_cache_metrics::QUERY_CACHE_INVALIDATION
1218 .with_label_values(&[])
1219 .inc();
1220 }
1221 cache.lru.clear();
1222 cache.next_block_height = next_block_height;
1223 } else {
1224 debug!(
1225 "Query cache for chain {chain_id} was already invalidated:\
1226 {next_block_height} <= {}",
1227 cache.next_block_height
1228 );
1229 }
1230 }
1231}
1232
/// The node service: serves the GraphQL API over HTTP and WebSocket and runs a
/// [`ChainListener`] alongside it.
pub struct NodeService<C>
where
    C: ClientContext + 'static,
{
    /// Configuration handed to the chain listener in `run`.
    config: ChainListenerConfig,
    /// Port the HTTP server binds to.
    port: NonZeroU16,
    /// Port used for the metrics server address.
    #[cfg(with_metrics)]
    metrics_port: NonZeroU16,
    /// Default chain passed to the query root of the schema.
    default_chain: Option<ChainId>,
    /// The client context, shared with the schema roots and the chain listener.
    context: Arc<Mutex<C>>,
    /// When `true`, the schema is built without mutations and application queries that
    /// produce operations are rejected.
    read_only: bool,
    /// Cache of application query responses; `None` disables caching.
    query_cache: Option<Arc<QueryResponseCache>>,
    /// Optional query subscription manager passed to the subscription root.
    query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
    /// Cancellation token passed to the subscription root.
    cancellation_token: CancellationToken,
}
1252
1253impl<C> Clone for NodeService<C>
1254where
1255 C: ClientContext + 'static,
1256{
1257 fn clone(&self) -> Self {
1258 Self {
1259 config: self.config.clone(),
1260 port: self.port,
1261 #[cfg(with_metrics)]
1262 metrics_port: self.metrics_port,
1263 default_chain: self.default_chain,
1264 context: Arc::clone(&self.context),
1265 read_only: self.read_only,
1266 query_cache: self.query_cache.clone(),
1267 query_subscriptions: self.query_subscriptions.clone(),
1268 cancellation_token: self.cancellation_token.clone(),
1269 }
1270 }
1271}
1272
1273impl<C> NodeService<C>
1274where
1275 C: ClientContext,
1276{
1277 #[allow(clippy::too_many_arguments)]
1283 pub fn new(
1284 config: ChainListenerConfig,
1285 port: NonZeroU16,
1286 #[cfg(with_metrics)] metrics_port: NonZeroU16,
1287 default_chain: Option<ChainId>,
1288 context: Arc<Mutex<C>>,
1289 read_only: bool,
1290 query_cache_size: Option<usize>,
1291 query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
1292 cancellation_token: CancellationToken,
1293 ) -> Self {
1294 let query_cache = query_cache_size.map(|size| Arc::new(QueryResponseCache::new(size)));
1295 Self {
1296 config,
1297 port,
1298 #[cfg(with_metrics)]
1299 metrics_port,
1300 default_chain,
1301 context,
1302 read_only,
1303 query_cache,
1304 query_subscriptions,
1305 cancellation_token,
1306 }
1307 }
1308
1309 #[cfg(with_metrics)]
1310 pub fn metrics_address(&self) -> SocketAddr {
1311 SocketAddr::from(([0, 0, 0, 0], self.metrics_port.get()))
1312 }
1313
1314 pub fn schema(&self) -> NodeServiceSchema<C> {
1315 let query = QueryRoot {
1316 context: Arc::clone(&self.context),
1317 port: self.port,
1318 default_chain: self.default_chain,
1319 };
1320 let subscription = SubscriptionRoot {
1321 context: Arc::clone(&self.context),
1322 query_subscriptions: self.query_subscriptions.clone(),
1323 cancellation_token: self.cancellation_token.clone(),
1324 };
1325
1326 if self.read_only {
1327 NodeServiceSchema::ReadOnly(Schema::build(query, EmptyMutation, subscription).finish())
1328 } else {
1329 NodeServiceSchema::Full(
1330 Schema::build(
1331 query,
1332 MutationRoot {
1333 context: Arc::clone(&self.context),
1334 },
1335 subscription,
1336 )
1337 .finish(),
1338 )
1339 }
1340 }
1341
1342 #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
1344 pub async fn run(
1345 self,
1346 cancellation_token: CancellationToken,
1347 command_receiver: UnboundedReceiver<ListenerCommand>,
1348 ) -> Result<(), anyhow::Error> {
1349 let port = self.port.get();
1350 let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
1351 let application_handler =
1352 axum::routing::get(util::graphiql).post(Self::application_handler);
1353
1354 #[cfg(with_metrics)]
1355 monitoring_server::start_metrics(self.metrics_address(), cancellation_token.clone());
1356
1357 let base_router = Router::new()
1358 .route("/", index_handler)
1359 .route(
1360 "/chains/{chain_id}/applications/{application_id}",
1361 application_handler,
1362 )
1363 .route("/ready", axum::routing::get(|| async { "ready!" }));
1364
1365 let app = match self.schema() {
1367 NodeServiceSchema::Full(schema) => {
1368 base_router.route_service("/ws", GraphQLSubscription::new(schema))
1369 }
1370 NodeServiceSchema::ReadOnly(schema) => {
1371 base_router.route_service("/ws", GraphQLSubscription::new(schema))
1372 }
1373 }
1374 .layer(Extension(self.clone()))
1375 .layer(CorsLayer::permissive());
1377
1378 info!("GraphiQL IDE: http://localhost:{}", port);
1379
1380 if let Some(cache) = &self.query_cache {
1382 let guard = self.context.lock().await;
1383 let chain_ids: Vec<ChainId> = guard.wallet().chain_ids().try_collect().await?;
1384 let (tx, mut receiver) = tokio::sync::mpsc::unbounded_channel();
1385 guard.client().subscribe_extra(chain_ids.clone(), &tx);
1386 cache.mark_all_subscribed(&chain_ids);
1387 cache.set_notification_sender(tx);
1388 drop(guard);
1389 let cache = Arc::clone(cache);
1390 tokio::spawn(async move {
1391 while let Some(notification) = receiver.recv().await {
1392 if let Reason::NewBlock { height, .. } = notification.reason {
1393 let next_block_height = height
1394 .try_add_one()
1395 .expect("block height should not overflow");
1396 cache.invalidate_chain(¬ification.chain_id, next_block_height);
1397 }
1398 }
1399 });
1400 }
1401
1402 let storage = self.context.lock().await.storage().clone();
1403
1404 let chain_listener = ChainListener::new(
1405 self.config,
1406 self.context,
1407 storage,
1408 cancellation_token.clone(),
1409 command_receiver,
1410 true,
1411 )
1412 .run()
1413 .await?;
1414 let mut chain_listener = Box::pin(chain_listener).fuse();
1415 let tcp_listener =
1416 tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
1417 let server = axum::serve(tcp_listener, app)
1418 .with_graceful_shutdown(cancellation_token.cancelled_owned())
1419 .into_future();
1420 futures::select! {
1421 result = chain_listener => result?,
1422 result = Box::pin(server).fuse() => result?,
1423 };
1424
1425 Ok(())
1426 }
1427
1428 async fn handle_service_request(
1430 &self,
1431 application_id: ApplicationId,
1432 request: Vec<u8>,
1433 chain_id: ChainId,
1434 block_hash: Option<CryptoHash>,
1435 ) -> Result<Vec<u8>, NodeServiceError> {
1436 let cache = block_hash
1438 .is_none()
1439 .then_some(self.query_cache.as_ref())
1440 .flatten();
1441
1442 if let Some(cache) = cache {
1444 if let Some(cached) = cache.get(chain_id, &application_id, &request) {
1445 return Ok(cached);
1446 }
1447 }
1448
1449 let (
1450 QueryOutcome {
1451 response,
1452 operations,
1453 },
1454 block_height,
1455 ) = self
1456 .query_user_application(application_id, request.clone(), chain_id, block_hash)
1457 .await?;
1458 if operations.is_empty() {
1459 if let Some(cache) = cache {
1460 if cache.needs_subscription(&chain_id) {
1462 if let Some(sender) = cache.notification_sender() {
1463 self.context
1464 .lock()
1465 .await
1466 .client()
1467 .subscribe_extra(vec![chain_id], &sender);
1468 cache.mark_subscribed(chain_id);
1469 }
1470 }
1471 cache.insert(
1472 chain_id,
1473 application_id,
1474 request,
1475 response.clone(),
1476 block_height,
1477 );
1478 }
1479 return Ok(response);
1480 }
1481
1482 if self.read_only {
1483 return Err(NodeServiceError::ReadOnlyModeOperationsNotAllowed);
1484 }
1485
1486 trace!("Query requested a new block with operations: {operations:?}");
1487 let client = self
1488 .context
1489 .lock()
1490 .await
1491 .make_chain_client(chain_id)
1492 .await?;
1493 let hash = loop {
1494 let timeout = match client
1495 .execute_operations(operations.clone(), vec![])
1496 .await?
1497 {
1498 ClientOutcome::Committed(certificate) => break certificate.hash(),
1499 ClientOutcome::Conflict(certificate) => {
1500 return Err(chain_client::Error::Conflict(certificate.hash()).into());
1501 }
1502 ClientOutcome::WaitForTimeout(timeout) => timeout,
1503 };
1504 let mut stream = client.subscribe().map_err(|_| {
1505 chain_client::Error::InternalError("Could not subscribe to the local node.")
1506 })?;
1507 util::wait_for_next_round(&mut stream, timeout).await;
1508 };
1509 let response = async_graphql::Response::new(hash.to_value());
1510 Ok(serde_json::to_vec(&response)?)
1511 }
1512
1513 async fn query_user_application(
1516 &self,
1517 application_id: ApplicationId,
1518 bytes: Vec<u8>,
1519 chain_id: ChainId,
1520 block_hash: Option<CryptoHash>,
1521 ) -> Result<(QueryOutcome<Vec<u8>>, BlockHeight), NodeServiceError> {
1522 let query = Query::User {
1523 application_id,
1524 bytes,
1525 };
1526 let client = self
1527 .context
1528 .lock()
1529 .await
1530 .make_chain_client(chain_id)
1531 .await?;
1532 let (
1533 QueryOutcome {
1534 response,
1535 operations,
1536 },
1537 next_block_height,
1538 ) = client.query_application(query, block_hash).await?;
1539 match response {
1540 QueryResponse::System(_) => {
1541 unreachable!("cannot get a system response for a user query")
1542 }
1543 QueryResponse::User(user_response_bytes) => Ok((
1544 QueryOutcome {
1545 response: user_response_bytes,
1546 operations,
1547 },
1548 next_block_height,
1549 )),
1550 }
1551 }
1552
1553 async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
1555 service
1556 .0
1557 .schema()
1558 .execute(request.into_inner())
1559 .await
1560 .into()
1561 }
1562
1563 async fn application_handler(
1567 Path((chain_id, application_id)): Path<(String, String)>,
1568 service: Extension<Self>,
1569 request: String,
1570 ) -> Result<Vec<u8>, NodeServiceError> {
1571 let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
1572 let application_id: ApplicationId = application_id.parse()?;
1573
1574 debug!(
1575 %chain_id,
1576 %application_id,
1577 "processing request for application:\n{:?}",
1578 &request
1579 );
1580 let response = service
1581 .0
1582 .handle_service_request(application_id, request.into_bytes(), chain_id, None)
1583 .await?;
1584
1585 Ok(response)
1586 }
1587}
1588
#[cfg(test)]
mod tests {
    use linera_base::{
        crypto::CryptoHash,
        data_types::BlockHeight,
        identifiers::{ApplicationId, ChainId},
    };

    use super::QueryResponseCache;

    /// Returns a deterministic chain ID derived from `n`.
    fn test_chain(n: u64) -> ChainId {
        let hash = CryptoHash::test_hash(format!("chain-{n}"));
        ChainId(hash)
    }

    /// Returns a deterministic application ID derived from `n`.
    fn test_app(n: u64) -> ApplicationId {
        let hash = CryptoHash::test_hash(format!("app-{n}"));
        ApplicationId::new(hash)
    }

    /// A lookup misses before the insertion and hits afterwards.
    #[test]
    fn cache_hit_and_miss() {
        let cache = QueryResponseCache::new(100);
        let (chain, app) = (test_chain(0), test_app(0));
        let query = b"query { balance }".to_vec();
        let reply = b"{ \"balance\": 42 }".to_vec();

        // Nothing has been stored yet.
        assert!(cache.get(chain, &app, &query).is_none());

        cache.insert(chain, app, query.clone(), reply.clone(), BlockHeight(1));

        // The stored response is now returned.
        assert_eq!(cache.get(chain, &app, &query), Some(reply));
    }

    /// Invalidating one chain leaves the entries of another chain untouched.
    #[test]
    fn per_chain_isolation() {
        let cache = QueryResponseCache::new(100);
        let (chain_a, chain_b) = (test_chain(0), test_chain(1));
        let app = test_app(0);
        let query = b"q".to_vec();
        let reply = b"r".to_vec();

        cache.insert(chain_a, app, query.clone(), reply.clone(), BlockHeight(1));

        cache.invalidate_chain(&chain_b, BlockHeight(1));
        assert_eq!(cache.get(chain_a, &app, &query), Some(reply));
    }

    /// An invalidation at a higher block height removes every entry of the chain.
    #[test]
    fn invalidation_clears_all_entries() {
        let cache = QueryResponseCache::new(100);
        let (chain, app) = (test_chain(0), test_app(0));

        for (query, reply) in [(b"q1", b"r1"), (b"q2", b"r2")] {
            cache.insert(chain, app, query.to_vec(), reply.to_vec(), BlockHeight(1));
        }

        cache.invalidate_chain(&chain, BlockHeight(2));
        for query in [b"q1", b"q2"] {
            assert!(cache.get(chain, &app, query).is_none());
        }
    }

    /// With a capacity of two, a third insertion evicts the least recently used entry.
    #[test]
    fn lru_eviction() {
        let cache = QueryResponseCache::new(2);
        let (chain, app) = (test_chain(0), test_app(0));

        for (query, reply) in [(b"q1", b"r1"), (b"q2", b"r2"), (b"q3", b"r3")] {
            cache.insert(chain, app, query.to_vec(), reply.to_vec(), BlockHeight(1));
        }

        // The oldest entry is gone, the two most recent ones remain.
        assert!(cache.get(chain, &app, b"q1").is_none());
        assert!(cache.get(chain, &app, b"q2").is_some());
        assert!(cache.get(chain, &app, b"q3").is_some());
    }

    /// A response computed before an invalidation must not be stored after it.
    #[test]
    fn stale_insert_rejected_after_invalidation() {
        let cache = QueryResponseCache::new(100);
        let (chain, app) = (test_chain(0), test_app(0));

        // The cache tracks height 3; a query started now would be tagged with it.
        cache.insert(chain, app, b"q0".to_vec(), b"r0".to_vec(), BlockHeight(3));
        let height_before_invalidation = BlockHeight(3);

        // A new block moves the chain to height 4.
        cache.invalidate_chain(&chain, BlockHeight(4));

        // The late insertion carries the old height and must be dropped.
        cache.insert(
            chain,
            app,
            b"q".to_vec(),
            b"stale".to_vec(),
            height_before_invalidation,
        );

        assert!(cache.get(chain, &app, b"q").is_none());
    }
}