linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6#[cfg(not(web))]
7use std::time::Duration;
8
9use custom_debug_derive::Debug;
10use futures::{channel::mpsc, StreamExt as _};
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use linera_base::{
14    data_types::{
15        Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, Timestamp,
16    },
17    ensure, hex_debug, hex_vec_debug, http,
18    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19    ownership::ChainOwnership,
20};
21use linera_views::{batch::Batch, context::Context, views::View};
22use oneshot::Sender;
23use reqwest::{header::HeaderMap, Client, Url};
24
25use crate::{
26    system::{CreateApplicationResult, OpenChainConfig, Recipient},
27    util::RespondExt,
28    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
29    ExecutionStateView, ModuleId, OutgoingMessage, ResourceController, TransactionTracker,
30    UserContractCode, UserServiceCode,
31};
32
33#[cfg(with_metrics)]
34mod metrics {
35    use std::sync::LazyLock;
36
37    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
38    use prometheus::HistogramVec;
39
40    /// Histogram of the latency to load a contract bytecode.
41    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
42        register_histogram_vec(
43            "load_contract_latency",
44            "Load contract latency",
45            &[],
46            exponential_bucket_latencies(250.0),
47        )
48    });
49
50    /// Histogram of the latency to load a service bytecode.
51    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
52        register_histogram_vec(
53            "load_service_latency",
54            "Load service latency",
55            &[],
56            exponential_bucket_latencies(250.0),
57        )
58    });
59}
60
61pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
62
63impl<C> ExecutionStateView<C>
64where
65    C: Context + Clone + Send + Sync + 'static,
66    C::Extra: ExecutionRuntimeContext,
67{
68    pub(crate) async fn load_contract(
69        &mut self,
70        id: ApplicationId,
71        txn_tracker: &mut TransactionTracker,
72    ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
73        #[cfg(with_metrics)]
74        let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
75        let blob_id = id.description_blob_id();
76        let description = match txn_tracker.created_blobs().get(&blob_id) {
77            Some(description) => {
78                let blob = description.clone();
79                bcs::from_bytes(blob.bytes())?
80            }
81            None => self.system.describe_application(id, txn_tracker).await?,
82        };
83        let code = self
84            .context()
85            .extra()
86            .get_user_contract(&description)
87            .await?;
88        Ok((code, description))
89    }
90
91    pub(crate) async fn load_service(
92        &mut self,
93        id: ApplicationId,
94        txn_tracker: &mut TransactionTracker,
95    ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
96        #[cfg(with_metrics)]
97        let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
98        let blob_id = id.description_blob_id();
99        let description = match txn_tracker.created_blobs().get(&blob_id) {
100            Some(description) => {
101                let blob = description.clone();
102                bcs::from_bytes(blob.bytes())?
103            }
104            None => self.system.describe_application(id, txn_tracker).await?,
105        };
106        let code = self
107            .context()
108            .extra()
109            .get_user_service(&description)
110            .await?;
111        Ok((code, description))
112    }
113
114    // TODO(#1416): Support concurrent I/O.
115    pub(crate) async fn handle_request(
116        &mut self,
117        request: ExecutionRequest,
118        resource_controller: &mut ResourceController<Option<AccountOwner>>,
119    ) -> Result<(), ExecutionError> {
120        use ExecutionRequest::*;
121        match request {
122            #[cfg(not(web))]
123            LoadContract {
124                id,
125                callback,
126                mut txn_tracker,
127            } => {
128                let (code, description) = self.load_contract(id, &mut txn_tracker).await?;
129                callback.respond((code, description, txn_tracker))
130            }
131            #[cfg(not(web))]
132            LoadService {
133                id,
134                callback,
135                mut txn_tracker,
136            } => {
137                let (code, description) = self.load_service(id, &mut txn_tracker).await?;
138                callback.respond((code, description, txn_tracker))
139            }
140
141            ChainBalance { callback } => {
142                let balance = *self.system.balance.get();
143                callback.respond(balance);
144            }
145
146            OwnerBalance { owner, callback } => {
147                let balance = self.system.balances.get(&owner).await?.unwrap_or_default();
148                callback.respond(balance);
149            }
150
151            OwnerBalances { callback } => {
152                let balances = self.system.balances.index_values().await?;
153                callback.respond(balances.into_iter().collect());
154            }
155
156            BalanceOwners { callback } => {
157                let owners = self.system.balances.indices().await?;
158                callback.respond(owners);
159            }
160
161            Transfer {
162                source,
163                destination,
164                amount,
165                signer,
166                application_id,
167                callback,
168            } => callback.respond(
169                self.system
170                    .transfer(
171                        signer,
172                        Some(application_id),
173                        source,
174                        Recipient::Account(destination),
175                        amount,
176                    )
177                    .await?,
178            ),
179
180            Claim {
181                source,
182                destination,
183                amount,
184                signer,
185                application_id,
186                callback,
187            } => callback.respond(
188                self.system
189                    .claim(
190                        signer,
191                        Some(application_id),
192                        source.owner,
193                        source.chain_id,
194                        Recipient::Account(destination),
195                        amount,
196                    )
197                    .await?,
198            ),
199
200            SystemTimestamp { callback } => {
201                let timestamp = *self.system.timestamp.get();
202                callback.respond(timestamp);
203            }
204
205            ChainOwnership { callback } => {
206                let ownership = self.system.ownership.get().clone();
207                callback.respond(ownership);
208            }
209
210            ContainsKey { id, key, callback } => {
211                let view = self.users.try_load_entry(&id).await?;
212                let result = match view {
213                    Some(view) => view.contains_key(&key).await?,
214                    None => false,
215                };
216                callback.respond(result);
217            }
218
219            ContainsKeys { id, keys, callback } => {
220                let view = self.users.try_load_entry(&id).await?;
221                let result = match view {
222                    Some(view) => view.contains_keys(keys).await?,
223                    None => vec![false; keys.len()],
224                };
225                callback.respond(result);
226            }
227
228            ReadMultiValuesBytes { id, keys, callback } => {
229                let view = self.users.try_load_entry(&id).await?;
230                let values = match view {
231                    Some(view) => view.multi_get(keys).await?,
232                    None => vec![None; keys.len()],
233                };
234                callback.respond(values);
235            }
236
237            ReadValueBytes { id, key, callback } => {
238                let view = self.users.try_load_entry(&id).await?;
239                let result = match view {
240                    Some(view) => view.get(&key).await?,
241                    None => None,
242                };
243                callback.respond(result);
244            }
245
246            FindKeysByPrefix {
247                id,
248                key_prefix,
249                callback,
250            } => {
251                let view = self.users.try_load_entry(&id).await?;
252                let result = match view {
253                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
254                    None => Vec::new(),
255                };
256                callback.respond(result);
257            }
258
259            FindKeyValuesByPrefix {
260                id,
261                key_prefix,
262                callback,
263            } => {
264                let view = self.users.try_load_entry(&id).await?;
265                let result = match view {
266                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
267                    None => Vec::new(),
268                };
269                callback.respond(result);
270            }
271
272            WriteBatch {
273                id,
274                batch,
275                callback,
276            } => {
277                let mut view = self.users.try_load_entry_mut(&id).await?;
278                view.write_batch(batch).await?;
279                callback.respond(());
280            }
281
282            OpenChain {
283                ownership,
284                balance,
285                parent_id,
286                block_height,
287                application_permissions,
288                timestamp,
289                callback,
290                mut txn_tracker,
291            } => {
292                let config = OpenChainConfig {
293                    ownership,
294                    balance,
295                    application_permissions,
296                };
297                let chain_id = self
298                    .system
299                    .open_chain(config, parent_id, block_height, timestamp, &mut txn_tracker)
300                    .await?;
301                callback.respond((chain_id, txn_tracker));
302            }
303
304            CloseChain {
305                application_id,
306                callback,
307            } => {
308                let app_permissions = self.system.application_permissions.get();
309                if !app_permissions.can_close_chain(&application_id) {
310                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
311                } else {
312                    self.system.close_chain().await?;
313                    callback.respond(Ok(()));
314                }
315            }
316
317            ChangeApplicationPermissions {
318                application_id,
319                application_permissions,
320                callback,
321            } => {
322                let app_permissions = self.system.application_permissions.get();
323                if !app_permissions.can_change_application_permissions(&application_id) {
324                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
325                } else {
326                    self.system
327                        .application_permissions
328                        .set(application_permissions);
329                    callback.respond(Ok(()));
330                }
331            }
332
333            CreateApplication {
334                chain_id,
335                block_height,
336                module_id,
337                parameters,
338                required_application_ids,
339                callback,
340                txn_tracker,
341            } => {
342                let create_application_result = self
343                    .system
344                    .create_application(
345                        chain_id,
346                        block_height,
347                        module_id,
348                        parameters,
349                        required_application_ids,
350                        txn_tracker,
351                    )
352                    .await?;
353                callback.respond(Ok(create_application_result));
354            }
355
356            PerformHttpRequest {
357                request,
358                http_responses_are_oracle_responses,
359                callback,
360            } => {
361                let headers = request
362                    .headers
363                    .into_iter()
364                    .map(|http::Header { name, value }| Ok((name.parse()?, value.try_into()?)))
365                    .collect::<Result<HeaderMap, ExecutionError>>()?;
366
367                let url = Url::parse(&request.url)?;
368                let host = url
369                    .host_str()
370                    .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
371
372                let (_epoch, committee) = self
373                    .system
374                    .current_committee()
375                    .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
376                let allowed_hosts = &committee.policy().http_request_allow_list;
377
378                ensure!(
379                    allowed_hosts.contains(host),
380                    ExecutionError::UnauthorizedHttpRequest(url)
381                );
382
383                #[cfg_attr(web, allow(unused_mut))]
384                let mut request = Client::new()
385                    .request(request.method.into(), url)
386                    .body(request.body)
387                    .headers(headers);
388                #[cfg(not(web))]
389                {
390                    request = request.timeout(Duration::from_millis(
391                        committee.policy().http_request_timeout_ms,
392                    ));
393                }
394
395                let response = request.send().await?;
396
397                let mut response_size_limit = committee.policy().maximum_http_response_bytes;
398
399                if http_responses_are_oracle_responses {
400                    response_size_limit =
401                        response_size_limit.min(committee.policy().maximum_oracle_response_bytes);
402                }
403
404                callback.respond(
405                    self.receive_http_response(response, response_size_limit)
406                        .await?,
407                );
408            }
409
410            ReadBlobContent { blob_id, callback } => {
411                let blob = self.system.read_blob_content(blob_id).await?;
412                if blob_id.blob_type == BlobType::Data {
413                    resource_controller
414                        .with_state(&mut self.system)
415                        .await?
416                        .track_blob_read(blob.bytes().len() as u64)?;
417                }
418                let is_new = self
419                    .system
420                    .blob_used(&mut TransactionTracker::default(), blob_id)
421                    .await?;
422                callback.respond((blob, is_new))
423            }
424
425            AssertBlobExists { blob_id, callback } => {
426                self.system.assert_blob_exists(blob_id).await?;
427                // Treating this as reading a size-0 blob for fee purposes.
428                if blob_id.blob_type == BlobType::Data {
429                    resource_controller
430                        .with_state(&mut self.system)
431                        .await?
432                        .track_blob_read(0)?;
433                }
434                callback.respond(
435                    self.system
436                        .blob_used(&mut TransactionTracker::default(), blob_id)
437                        .await?,
438                )
439            }
440
441            NextEventIndex {
442                stream_id,
443                callback,
444            } => {
445                let count = self
446                    .stream_event_counts
447                    .get_mut_or_default(&stream_id)
448                    .await?;
449                let index = *count;
450                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
451                callback.respond(index)
452            }
453
454            ReadEvent { event_id, callback } => {
455                let event = self.context().extra().get_event(event_id.clone()).await?;
456                let event = event.ok_or(ExecutionError::EventsNotFound(vec![event_id]))?;
457                callback.respond(event);
458            }
459
460            SubscribeToEvents {
461                chain_id,
462                stream_id,
463                subscriber_app_id,
464                callback,
465            } => {
466                let subscriptions = self
467                    .system
468                    .event_subscriptions
469                    .get_mut_or_default(&(chain_id, stream_id))
470                    .await?;
471                let next_index = if subscriptions.applications.insert(subscriber_app_id) {
472                    subscriptions.next_index
473                } else {
474                    0
475                };
476                callback.respond(next_index);
477            }
478
479            UnsubscribeFromEvents {
480                chain_id,
481                stream_id,
482                subscriber_app_id,
483                callback,
484            } => {
485                let key = (chain_id, stream_id);
486                let subscriptions = self
487                    .system
488                    .event_subscriptions
489                    .get_mut_or_default(&key)
490                    .await?;
491                subscriptions.applications.remove(&subscriber_app_id);
492                if subscriptions.applications.is_empty() {
493                    self.system.event_subscriptions.remove(&key)?;
494                }
495                callback.respond(());
496            }
497
498            GetApplicationPermissions { callback } => {
499                let app_permissions = self.system.application_permissions.get();
500                callback.respond(app_permissions.clone());
501            }
502        }
503
504        Ok(())
505    }
506}
507
508impl<C> ExecutionStateView<C>
509where
510    C: Context + Clone + Send + Sync + 'static,
511    C::Extra: ExecutionRuntimeContext,
512{
513    /// Receives an HTTP response, returning the prepared [`http::Response`] instance.
514    ///
515    /// Ensures that the response does not exceed the provided `size_limit`.
516    async fn receive_http_response(
517        &mut self,
518        response: reqwest::Response,
519        size_limit: u64,
520    ) -> Result<http::Response, ExecutionError> {
521        let status = response.status().as_u16();
522        let maybe_content_length = response.content_length();
523
524        let headers = response
525            .headers()
526            .iter()
527            .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
528            .collect::<Vec<_>>();
529
530        let total_header_size = headers
531            .iter()
532            .map(|header| (header.name.len() + header.value.len()) as u64)
533            .sum();
534
535        let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
536            ExecutionError::HttpResponseSizeLimitExceeded {
537                limit: size_limit,
538                size: total_header_size,
539            },
540        )?;
541
542        if let Some(content_length) = maybe_content_length {
543            if content_length > remaining_bytes {
544                return Err(ExecutionError::HttpResponseSizeLimitExceeded {
545                    limit: size_limit,
546                    size: content_length + total_header_size,
547                });
548            }
549        }
550
551        let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
552        let mut body_stream = response.bytes_stream();
553
554        while let Some(bytes) = body_stream.next().await.transpose()? {
555            remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
556                ExecutionError::HttpResponseSizeLimitExceeded {
557                    limit: size_limit,
558                    size: bytes.len() as u64 + (size_limit - remaining_bytes),
559                },
560            )?;
561
562            body.extend(&bytes);
563        }
564
565        Ok(http::Response {
566            status,
567            headers,
568            body,
569        })
570    }
571}
572
573/// Requests to the execution state.
574#[derive(Debug)]
575pub enum ExecutionRequest {
576    #[cfg(not(web))]
577    LoadContract {
578        id: ApplicationId,
579        #[debug(skip)]
580        callback: Sender<(UserContractCode, ApplicationDescription, TransactionTracker)>,
581        #[debug(skip)]
582        txn_tracker: TransactionTracker,
583    },
584
585    #[cfg(not(web))]
586    LoadService {
587        id: ApplicationId,
588        #[debug(skip)]
589        callback: Sender<(UserServiceCode, ApplicationDescription, TransactionTracker)>,
590        #[debug(skip)]
591        txn_tracker: TransactionTracker,
592    },
593
594    ChainBalance {
595        #[debug(skip)]
596        callback: Sender<Amount>,
597    },
598
599    OwnerBalance {
600        owner: AccountOwner,
601        #[debug(skip)]
602        callback: Sender<Amount>,
603    },
604
605    OwnerBalances {
606        #[debug(skip)]
607        callback: Sender<Vec<(AccountOwner, Amount)>>,
608    },
609
610    BalanceOwners {
611        #[debug(skip)]
612        callback: Sender<Vec<AccountOwner>>,
613    },
614
615    Transfer {
616        source: AccountOwner,
617        destination: Account,
618        amount: Amount,
619        #[debug(skip_if = Option::is_none)]
620        signer: Option<AccountOwner>,
621        application_id: ApplicationId,
622        #[debug(skip)]
623        callback: Sender<Option<OutgoingMessage>>,
624    },
625
626    Claim {
627        source: Account,
628        destination: Account,
629        amount: Amount,
630        #[debug(skip_if = Option::is_none)]
631        signer: Option<AccountOwner>,
632        application_id: ApplicationId,
633        #[debug(skip)]
634        callback: Sender<OutgoingMessage>,
635    },
636
637    SystemTimestamp {
638        #[debug(skip)]
639        callback: Sender<Timestamp>,
640    },
641
642    ChainOwnership {
643        #[debug(skip)]
644        callback: Sender<ChainOwnership>,
645    },
646
647    ReadValueBytes {
648        id: ApplicationId,
649        #[debug(with = hex_debug)]
650        key: Vec<u8>,
651        #[debug(skip)]
652        callback: Sender<Option<Vec<u8>>>,
653    },
654
655    ContainsKey {
656        id: ApplicationId,
657        key: Vec<u8>,
658        #[debug(skip)]
659        callback: Sender<bool>,
660    },
661
662    ContainsKeys {
663        id: ApplicationId,
664        #[debug(with = hex_vec_debug)]
665        keys: Vec<Vec<u8>>,
666        callback: Sender<Vec<bool>>,
667    },
668
669    ReadMultiValuesBytes {
670        id: ApplicationId,
671        #[debug(with = hex_vec_debug)]
672        keys: Vec<Vec<u8>>,
673        #[debug(skip)]
674        callback: Sender<Vec<Option<Vec<u8>>>>,
675    },
676
677    FindKeysByPrefix {
678        id: ApplicationId,
679        #[debug(with = hex_debug)]
680        key_prefix: Vec<u8>,
681        #[debug(skip)]
682        callback: Sender<Vec<Vec<u8>>>,
683    },
684
685    FindKeyValuesByPrefix {
686        id: ApplicationId,
687        #[debug(with = hex_debug)]
688        key_prefix: Vec<u8>,
689        #[debug(skip)]
690        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
691    },
692
693    WriteBatch {
694        id: ApplicationId,
695        batch: Batch,
696        #[debug(skip)]
697        callback: Sender<()>,
698    },
699
700    OpenChain {
701        ownership: ChainOwnership,
702        #[debug(skip_if = Amount::is_zero)]
703        balance: Amount,
704        parent_id: ChainId,
705        block_height: BlockHeight,
706        application_permissions: ApplicationPermissions,
707        timestamp: Timestamp,
708        #[debug(skip)]
709        txn_tracker: TransactionTracker,
710        #[debug(skip)]
711        callback: Sender<(ChainId, TransactionTracker)>,
712    },
713
714    CloseChain {
715        application_id: ApplicationId,
716        #[debug(skip)]
717        callback: Sender<Result<(), ExecutionError>>,
718    },
719
720    ChangeApplicationPermissions {
721        application_id: ApplicationId,
722        application_permissions: ApplicationPermissions,
723        #[debug(skip)]
724        callback: Sender<Result<(), ExecutionError>>,
725    },
726
727    CreateApplication {
728        chain_id: ChainId,
729        block_height: BlockHeight,
730        module_id: ModuleId,
731        parameters: Vec<u8>,
732        required_application_ids: Vec<ApplicationId>,
733        #[debug(skip)]
734        txn_tracker: TransactionTracker,
735        #[debug(skip)]
736        callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
737    },
738
739    PerformHttpRequest {
740        request: http::Request,
741        http_responses_are_oracle_responses: bool,
742        #[debug(skip)]
743        callback: Sender<http::Response>,
744    },
745
746    ReadBlobContent {
747        blob_id: BlobId,
748        #[debug(skip)]
749        callback: Sender<(BlobContent, bool)>,
750    },
751
752    AssertBlobExists {
753        blob_id: BlobId,
754        #[debug(skip)]
755        callback: Sender<bool>,
756    },
757
758    NextEventIndex {
759        stream_id: StreamId,
760        #[debug(skip)]
761        callback: Sender<u32>,
762    },
763
764    ReadEvent {
765        event_id: EventId,
766        callback: oneshot::Sender<Vec<u8>>,
767    },
768
769    SubscribeToEvents {
770        chain_id: ChainId,
771        stream_id: StreamId,
772        subscriber_app_id: ApplicationId,
773        #[debug(skip)]
774        callback: Sender<u32>,
775    },
776
777    UnsubscribeFromEvents {
778        chain_id: ChainId,
779        stream_id: StreamId,
780        subscriber_app_id: ApplicationId,
781        #[debug(skip)]
782        callback: Sender<()>,
783    },
784
785    GetApplicationPermissions {
786        #[debug(skip)]
787        callback: Sender<ApplicationPermissions>,
788    },
789}