linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6#[cfg(not(web))]
7use std::time::Duration;
8
9use custom_debug_derive::Debug;
10use futures::{channel::mpsc, StreamExt as _};
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use linera_base::{
14    data_types::{
15        Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, Timestamp,
16    },
17    ensure, hex_debug, hex_vec_debug, http,
18    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19    ownership::ChainOwnership,
20};
21use linera_views::{batch::Batch, context::Context, views::View};
22use oneshot::Sender;
23use reqwest::{header::HeaderMap, Client, Url};
24
25use crate::{
26    system::{CreateApplicationResult, OpenChainConfig, Recipient},
27    util::RespondExt,
28    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
29    ExecutionStateView, ModuleId, OutgoingMessage, ResourceController, TransactionTracker,
30    UserContractCode, UserServiceCode,
31};
32
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a latency histogram without labels, using the exponential buckets
    /// produced by `exponential_bucket_latencies(250.0)`.
    fn latency_histogram(name: &'static str, description: &'static str) -> HistogramVec {
        let buckets = exponential_bucket_latencies(250.0);
        register_histogram_vec(name, description, &[], buckets)
    }

    /// Histogram of the latency to load a contract bytecode.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_contract_latency", "Load contract latency"));

    /// Histogram of the latency to load a service bytecode.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_service_latency", "Load service latency"));
}
60
/// Sending half of the unbounded channel that carries [`ExecutionRequest`]s.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
62
63impl<C> ExecutionStateView<C>
64where
65    C: Context + Clone + Send + Sync + 'static,
66    C::Extra: ExecutionRuntimeContext,
67{
68    pub(crate) async fn load_contract(
69        &mut self,
70        id: ApplicationId,
71        txn_tracker: &mut TransactionTracker,
72    ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
73        #[cfg(with_metrics)]
74        let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
75        let blob_id = id.description_blob_id();
76        let description = match txn_tracker.created_blobs().get(&blob_id) {
77            Some(blob) => bcs::from_bytes(blob.bytes())?,
78            None => self.system.describe_application(id, txn_tracker).await?,
79        };
80        let code = self
81            .context()
82            .extra()
83            .get_user_contract(&description)
84            .await?;
85        Ok((code, description))
86    }
87
88    pub(crate) async fn load_service(
89        &mut self,
90        id: ApplicationId,
91        txn_tracker: &mut TransactionTracker,
92    ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
93        #[cfg(with_metrics)]
94        let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
95        let blob_id = id.description_blob_id();
96        let description = match txn_tracker.created_blobs().get(&blob_id) {
97            Some(blob) => bcs::from_bytes(blob.bytes())?,
98            None => self.system.describe_application(id, txn_tracker).await?,
99        };
100        let code = self
101            .context()
102            .extra()
103            .get_user_service(&description)
104            .await?;
105        Ok((code, description))
106    }
107
108    // TODO(#1416): Support concurrent I/O.
109    pub(crate) async fn handle_request(
110        &mut self,
111        request: ExecutionRequest,
112        resource_controller: &mut ResourceController<Option<AccountOwner>>,
113    ) -> Result<(), ExecutionError> {
114        use ExecutionRequest::*;
115        match request {
116            #[cfg(not(web))]
117            LoadContract {
118                id,
119                callback,
120                mut txn_tracker,
121            } => {
122                let (code, description) = self.load_contract(id, &mut txn_tracker).await?;
123                callback.respond((code, description, txn_tracker))
124            }
125            #[cfg(not(web))]
126            LoadService {
127                id,
128                callback,
129                mut txn_tracker,
130            } => {
131                let (code, description) = self.load_service(id, &mut txn_tracker).await?;
132                callback.respond((code, description, txn_tracker))
133            }
134
135            ChainBalance { callback } => {
136                let balance = *self.system.balance.get();
137                callback.respond(balance);
138            }
139
140            OwnerBalance { owner, callback } => {
141                let balance = self.system.balances.get(&owner).await?.unwrap_or_default();
142                callback.respond(balance);
143            }
144
145            OwnerBalances { callback } => {
146                let balances = self.system.balances.index_values().await?;
147                callback.respond(balances.into_iter().collect());
148            }
149
150            BalanceOwners { callback } => {
151                let owners = self.system.balances.indices().await?;
152                callback.respond(owners);
153            }
154
155            Transfer {
156                source,
157                destination,
158                amount,
159                signer,
160                application_id,
161                callback,
162            } => callback.respond(
163                self.system
164                    .transfer(
165                        signer,
166                        Some(application_id),
167                        source,
168                        Recipient::Account(destination),
169                        amount,
170                    )
171                    .await?,
172            ),
173
174            Claim {
175                source,
176                destination,
177                amount,
178                signer,
179                application_id,
180                callback,
181            } => callback.respond(
182                self.system
183                    .claim(
184                        signer,
185                        Some(application_id),
186                        source.owner,
187                        source.chain_id,
188                        Recipient::Account(destination),
189                        amount,
190                    )
191                    .await?,
192            ),
193
194            SystemTimestamp { callback } => {
195                let timestamp = *self.system.timestamp.get();
196                callback.respond(timestamp);
197            }
198
199            ChainOwnership { callback } => {
200                let ownership = self.system.ownership.get().clone();
201                callback.respond(ownership);
202            }
203
204            ContainsKey { id, key, callback } => {
205                let view = self.users.try_load_entry(&id).await?;
206                let result = match view {
207                    Some(view) => view.contains_key(&key).await?,
208                    None => false,
209                };
210                callback.respond(result);
211            }
212
213            ContainsKeys { id, keys, callback } => {
214                let view = self.users.try_load_entry(&id).await?;
215                let result = match view {
216                    Some(view) => view.contains_keys(keys).await?,
217                    None => vec![false; keys.len()],
218                };
219                callback.respond(result);
220            }
221
222            ReadMultiValuesBytes { id, keys, callback } => {
223                let view = self.users.try_load_entry(&id).await?;
224                let values = match view {
225                    Some(view) => view.multi_get(keys).await?,
226                    None => vec![None; keys.len()],
227                };
228                callback.respond(values);
229            }
230
231            ReadValueBytes { id, key, callback } => {
232                let view = self.users.try_load_entry(&id).await?;
233                let result = match view {
234                    Some(view) => view.get(&key).await?,
235                    None => None,
236                };
237                callback.respond(result);
238            }
239
240            FindKeysByPrefix {
241                id,
242                key_prefix,
243                callback,
244            } => {
245                let view = self.users.try_load_entry(&id).await?;
246                let result = match view {
247                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
248                    None => Vec::new(),
249                };
250                callback.respond(result);
251            }
252
253            FindKeyValuesByPrefix {
254                id,
255                key_prefix,
256                callback,
257            } => {
258                let view = self.users.try_load_entry(&id).await?;
259                let result = match view {
260                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
261                    None => Vec::new(),
262                };
263                callback.respond(result);
264            }
265
266            WriteBatch {
267                id,
268                batch,
269                callback,
270            } => {
271                let mut view = self.users.try_load_entry_mut(&id).await?;
272                view.write_batch(batch).await?;
273                callback.respond(());
274            }
275
276            OpenChain {
277                ownership,
278                balance,
279                parent_id,
280                block_height,
281                application_permissions,
282                timestamp,
283                callback,
284                mut txn_tracker,
285            } => {
286                let config = OpenChainConfig {
287                    ownership,
288                    balance,
289                    application_permissions,
290                };
291                let chain_id = self
292                    .system
293                    .open_chain(config, parent_id, block_height, timestamp, &mut txn_tracker)
294                    .await?;
295                callback.respond((chain_id, txn_tracker));
296            }
297
298            CloseChain {
299                application_id,
300                callback,
301            } => {
302                let app_permissions = self.system.application_permissions.get();
303                if !app_permissions.can_close_chain(&application_id) {
304                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
305                } else {
306                    self.system.close_chain().await?;
307                    callback.respond(Ok(()));
308                }
309            }
310
311            ChangeApplicationPermissions {
312                application_id,
313                application_permissions,
314                callback,
315            } => {
316                let app_permissions = self.system.application_permissions.get();
317                if !app_permissions.can_change_application_permissions(&application_id) {
318                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
319                } else {
320                    self.system
321                        .application_permissions
322                        .set(application_permissions);
323                    callback.respond(Ok(()));
324                }
325            }
326
327            CreateApplication {
328                chain_id,
329                block_height,
330                module_id,
331                parameters,
332                required_application_ids,
333                callback,
334                txn_tracker,
335            } => {
336                let create_application_result = self
337                    .system
338                    .create_application(
339                        chain_id,
340                        block_height,
341                        module_id,
342                        parameters,
343                        required_application_ids,
344                        txn_tracker,
345                    )
346                    .await?;
347                callback.respond(Ok(create_application_result));
348            }
349
350            PerformHttpRequest {
351                request,
352                http_responses_are_oracle_responses,
353                callback,
354            } => {
355                let headers = request
356                    .headers
357                    .into_iter()
358                    .map(|http::Header { name, value }| Ok((name.parse()?, value.try_into()?)))
359                    .collect::<Result<HeaderMap, ExecutionError>>()?;
360
361                let url = Url::parse(&request.url)?;
362                let host = url
363                    .host_str()
364                    .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
365
366                let (_epoch, committee) = self
367                    .system
368                    .current_committee()
369                    .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
370                let allowed_hosts = &committee.policy().http_request_allow_list;
371
372                ensure!(
373                    allowed_hosts.contains(host),
374                    ExecutionError::UnauthorizedHttpRequest(url)
375                );
376
377                #[cfg_attr(web, allow(unused_mut))]
378                let mut request = Client::new()
379                    .request(request.method.into(), url)
380                    .body(request.body)
381                    .headers(headers);
382                #[cfg(not(web))]
383                {
384                    request = request.timeout(Duration::from_millis(
385                        committee.policy().http_request_timeout_ms,
386                    ));
387                }
388
389                let response = request.send().await?;
390
391                let mut response_size_limit = committee.policy().maximum_http_response_bytes;
392
393                if http_responses_are_oracle_responses {
394                    response_size_limit =
395                        response_size_limit.min(committee.policy().maximum_oracle_response_bytes);
396                }
397
398                callback.respond(
399                    self.receive_http_response(response, response_size_limit)
400                        .await?,
401                );
402            }
403
404            ReadBlobContent { blob_id, callback } => {
405                let blob = self.system.read_blob_content(blob_id).await?;
406                if blob_id.blob_type == BlobType::Data {
407                    resource_controller
408                        .with_state(&mut self.system)
409                        .await?
410                        .track_blob_read(blob.bytes().len() as u64)?;
411                }
412                let is_new = self
413                    .system
414                    .blob_used(&mut TransactionTracker::default(), blob_id)
415                    .await?;
416                callback.respond((blob, is_new))
417            }
418
419            AssertBlobExists { blob_id, callback } => {
420                self.system.assert_blob_exists(blob_id).await?;
421                // Treating this as reading a size-0 blob for fee purposes.
422                if blob_id.blob_type == BlobType::Data {
423                    resource_controller
424                        .with_state(&mut self.system)
425                        .await?
426                        .track_blob_read(0)?;
427                }
428                callback.respond(
429                    self.system
430                        .blob_used(&mut TransactionTracker::default(), blob_id)
431                        .await?,
432                )
433            }
434
435            NextEventIndex {
436                stream_id,
437                callback,
438            } => {
439                let count = self
440                    .stream_event_counts
441                    .get_mut_or_default(&stream_id)
442                    .await?;
443                let index = *count;
444                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
445                callback.respond(index)
446            }
447
448            ReadEvent { event_id, callback } => {
449                let event = self.context().extra().get_event(event_id.clone()).await?;
450                let event = event.ok_or(ExecutionError::EventsNotFound(vec![event_id]))?;
451                callback.respond(event);
452            }
453
454            SubscribeToEvents {
455                chain_id,
456                stream_id,
457                subscriber_app_id,
458                callback,
459            } => {
460                let subscriptions = self
461                    .system
462                    .event_subscriptions
463                    .get_mut_or_default(&(chain_id, stream_id))
464                    .await?;
465                let next_index = if subscriptions.applications.insert(subscriber_app_id) {
466                    subscriptions.next_index
467                } else {
468                    0
469                };
470                callback.respond(next_index);
471            }
472
473            UnsubscribeFromEvents {
474                chain_id,
475                stream_id,
476                subscriber_app_id,
477                callback,
478            } => {
479                let key = (chain_id, stream_id);
480                let subscriptions = self
481                    .system
482                    .event_subscriptions
483                    .get_mut_or_default(&key)
484                    .await?;
485                subscriptions.applications.remove(&subscriber_app_id);
486                if subscriptions.applications.is_empty() {
487                    self.system.event_subscriptions.remove(&key)?;
488                }
489                callback.respond(());
490            }
491
492            GetApplicationPermissions { callback } => {
493                let app_permissions = self.system.application_permissions.get();
494                callback.respond(app_permissions.clone());
495            }
496        }
497
498        Ok(())
499    }
500}
501
502impl<C> ExecutionStateView<C>
503where
504    C: Context + Clone + Send + Sync + 'static,
505    C::Extra: ExecutionRuntimeContext,
506{
507    /// Receives an HTTP response, returning the prepared [`http::Response`] instance.
508    ///
509    /// Ensures that the response does not exceed the provided `size_limit`.
510    async fn receive_http_response(
511        &mut self,
512        response: reqwest::Response,
513        size_limit: u64,
514    ) -> Result<http::Response, ExecutionError> {
515        let status = response.status().as_u16();
516        let maybe_content_length = response.content_length();
517
518        let headers = response
519            .headers()
520            .iter()
521            .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
522            .collect::<Vec<_>>();
523
524        let total_header_size = headers
525            .iter()
526            .map(|header| (header.name.len() + header.value.len()) as u64)
527            .sum();
528
529        let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
530            ExecutionError::HttpResponseSizeLimitExceeded {
531                limit: size_limit,
532                size: total_header_size,
533            },
534        )?;
535
536        if let Some(content_length) = maybe_content_length {
537            if content_length > remaining_bytes {
538                return Err(ExecutionError::HttpResponseSizeLimitExceeded {
539                    limit: size_limit,
540                    size: content_length + total_header_size,
541                });
542            }
543        }
544
545        let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
546        let mut body_stream = response.bytes_stream();
547
548        while let Some(bytes) = body_stream.next().await.transpose()? {
549            remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
550                ExecutionError::HttpResponseSizeLimitExceeded {
551                    limit: size_limit,
552                    size: bytes.len() as u64 + (size_limit - remaining_bytes),
553                },
554            )?;
555
556            body.extend(&bytes);
557        }
558
559        Ok(http::Response {
560            status,
561            headers,
562            body,
563        })
564    }
565}
566
567/// Requests to the execution state.
568#[derive(Debug)]
569pub enum ExecutionRequest {
570    #[cfg(not(web))]
571    LoadContract {
572        id: ApplicationId,
573        #[debug(skip)]
574        callback: Sender<(UserContractCode, ApplicationDescription, TransactionTracker)>,
575        #[debug(skip)]
576        txn_tracker: TransactionTracker,
577    },
578
579    #[cfg(not(web))]
580    LoadService {
581        id: ApplicationId,
582        #[debug(skip)]
583        callback: Sender<(UserServiceCode, ApplicationDescription, TransactionTracker)>,
584        #[debug(skip)]
585        txn_tracker: TransactionTracker,
586    },
587
588    ChainBalance {
589        #[debug(skip)]
590        callback: Sender<Amount>,
591    },
592
593    OwnerBalance {
594        owner: AccountOwner,
595        #[debug(skip)]
596        callback: Sender<Amount>,
597    },
598
599    OwnerBalances {
600        #[debug(skip)]
601        callback: Sender<Vec<(AccountOwner, Amount)>>,
602    },
603
604    BalanceOwners {
605        #[debug(skip)]
606        callback: Sender<Vec<AccountOwner>>,
607    },
608
609    Transfer {
610        source: AccountOwner,
611        destination: Account,
612        amount: Amount,
613        #[debug(skip_if = Option::is_none)]
614        signer: Option<AccountOwner>,
615        application_id: ApplicationId,
616        #[debug(skip)]
617        callback: Sender<Option<OutgoingMessage>>,
618    },
619
620    Claim {
621        source: Account,
622        destination: Account,
623        amount: Amount,
624        #[debug(skip_if = Option::is_none)]
625        signer: Option<AccountOwner>,
626        application_id: ApplicationId,
627        #[debug(skip)]
628        callback: Sender<OutgoingMessage>,
629    },
630
631    SystemTimestamp {
632        #[debug(skip)]
633        callback: Sender<Timestamp>,
634    },
635
636    ChainOwnership {
637        #[debug(skip)]
638        callback: Sender<ChainOwnership>,
639    },
640
641    ReadValueBytes {
642        id: ApplicationId,
643        #[debug(with = hex_debug)]
644        key: Vec<u8>,
645        #[debug(skip)]
646        callback: Sender<Option<Vec<u8>>>,
647    },
648
649    ContainsKey {
650        id: ApplicationId,
651        key: Vec<u8>,
652        #[debug(skip)]
653        callback: Sender<bool>,
654    },
655
656    ContainsKeys {
657        id: ApplicationId,
658        #[debug(with = hex_vec_debug)]
659        keys: Vec<Vec<u8>>,
660        callback: Sender<Vec<bool>>,
661    },
662
663    ReadMultiValuesBytes {
664        id: ApplicationId,
665        #[debug(with = hex_vec_debug)]
666        keys: Vec<Vec<u8>>,
667        #[debug(skip)]
668        callback: Sender<Vec<Option<Vec<u8>>>>,
669    },
670
671    FindKeysByPrefix {
672        id: ApplicationId,
673        #[debug(with = hex_debug)]
674        key_prefix: Vec<u8>,
675        #[debug(skip)]
676        callback: Sender<Vec<Vec<u8>>>,
677    },
678
679    FindKeyValuesByPrefix {
680        id: ApplicationId,
681        #[debug(with = hex_debug)]
682        key_prefix: Vec<u8>,
683        #[debug(skip)]
684        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
685    },
686
687    WriteBatch {
688        id: ApplicationId,
689        batch: Batch,
690        #[debug(skip)]
691        callback: Sender<()>,
692    },
693
694    OpenChain {
695        ownership: ChainOwnership,
696        #[debug(skip_if = Amount::is_zero)]
697        balance: Amount,
698        parent_id: ChainId,
699        block_height: BlockHeight,
700        application_permissions: ApplicationPermissions,
701        timestamp: Timestamp,
702        #[debug(skip)]
703        txn_tracker: TransactionTracker,
704        #[debug(skip)]
705        callback: Sender<(ChainId, TransactionTracker)>,
706    },
707
708    CloseChain {
709        application_id: ApplicationId,
710        #[debug(skip)]
711        callback: Sender<Result<(), ExecutionError>>,
712    },
713
714    ChangeApplicationPermissions {
715        application_id: ApplicationId,
716        application_permissions: ApplicationPermissions,
717        #[debug(skip)]
718        callback: Sender<Result<(), ExecutionError>>,
719    },
720
721    CreateApplication {
722        chain_id: ChainId,
723        block_height: BlockHeight,
724        module_id: ModuleId,
725        parameters: Vec<u8>,
726        required_application_ids: Vec<ApplicationId>,
727        #[debug(skip)]
728        txn_tracker: TransactionTracker,
729        #[debug(skip)]
730        callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
731    },
732
733    PerformHttpRequest {
734        request: http::Request,
735        http_responses_are_oracle_responses: bool,
736        #[debug(skip)]
737        callback: Sender<http::Response>,
738    },
739
740    ReadBlobContent {
741        blob_id: BlobId,
742        #[debug(skip)]
743        callback: Sender<(BlobContent, bool)>,
744    },
745
746    AssertBlobExists {
747        blob_id: BlobId,
748        #[debug(skip)]
749        callback: Sender<bool>,
750    },
751
752    NextEventIndex {
753        stream_id: StreamId,
754        #[debug(skip)]
755        callback: Sender<u32>,
756    },
757
758    ReadEvent {
759        event_id: EventId,
760        callback: oneshot::Sender<Vec<u8>>,
761    },
762
763    SubscribeToEvents {
764        chain_id: ChainId,
765        stream_id: StreamId,
766        subscriber_app_id: ApplicationId,
767        #[debug(skip)]
768        callback: Sender<u32>,
769    },
770
771    UnsubscribeFromEvents {
772        chain_id: ChainId,
773        stream_id: StreamId,
774        subscriber_app_id: ApplicationId,
775        #[debug(skip)]
776        callback: Sender<()>,
777    },
778
779    GetApplicationPermissions {
780        #[debug(skip)]
781        callback: Sender<ApplicationPermissions>,
782    },
783}