1use std::{
5 collections::BTreeMap,
6 ops::{Deref, DerefMut},
7 vec,
8};
9
10use allocative::Allocative;
11use futures::{FutureExt, StreamExt};
12use linera_base::{
13 crypto::{BcsHashable, CryptoHash},
14 data_types::{BlobContent, BlockHeight, StreamUpdate},
15 identifiers::{AccountOwner, BlobId, ChainId, StreamId},
16 time::Instant,
17};
18use linera_views::{
19 context::Context,
20 historical_hash_wrapper::HistoricallyHashableView,
21 key_value_store_view::KeyValueStoreView,
22 map_view::MapView,
23 reentrant_collection_view::ReentrantCollectionView,
24 views::{ClonableView, ReplaceContext, View},
25 ViewError,
26};
27use serde::{Deserialize, Serialize};
28#[cfg(with_testing)]
29use {
30 crate::{
31 ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
32 },
33 linera_base::data_types::Blob,
34 linera_views::context::MemoryContext,
35 std::sync::Arc,
36};
37
38use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
39use crate::{
40 execution_state_actor::ExecutionStateActor, resources::ResourceController,
41 system::SystemExecutionStateView, ApplicationDescription, ApplicationId, ExecutionError,
42 ExecutionRuntimeConfig, ExecutionRuntimeContext, JsVec, MessageContext, OperationContext,
43 ProcessStreamsContext, Query, QueryContext, QueryOutcome, ServiceSyncRuntime, Timestamp,
44 TransactionTracker,
45};
46
47#[derive(Debug, ClonableView, View, Allocative)]
49#[allocative(bound = "C")]
50pub struct ExecutionStateViewInner<C> {
51 pub system: SystemExecutionStateView<C>,
53 pub users: ReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
55 pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
57 pub previous_event_blocks: MapView<C, StreamId, BlockHeight>,
59}
60
61impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateViewInner<C> {
62 type Target = ExecutionStateViewInner<C2>;
63
64 async fn with_context(
65 &mut self,
66 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
67 ) -> Self::Target {
68 ExecutionStateViewInner {
69 system: self.system.with_context(ctx.clone()).await,
70 users: self.users.with_context(ctx.clone()).await,
71 previous_message_blocks: self.previous_message_blocks.with_context(ctx.clone()).await,
72 previous_event_blocks: self.previous_event_blocks.with_context(ctx.clone()).await,
73 }
74 }
75}
76
77#[derive(Debug, ClonableView, View, Allocative)]
79#[allocative(bound = "C")]
80pub struct ExecutionStateView<C> {
81 inner: HistoricallyHashableView<C, ExecutionStateViewInner<C>>,
82}
83
84impl<C> Deref for ExecutionStateView<C> {
85 type Target = ExecutionStateViewInner<C>;
86
87 fn deref(&self) -> &ExecutionStateViewInner<C> {
88 self.inner.deref()
89 }
90}
91
92impl<C> DerefMut for ExecutionStateView<C> {
93 fn deref_mut(&mut self) -> &mut ExecutionStateViewInner<C> {
94 self.inner.deref_mut()
95 }
96}
97
98impl<C> ExecutionStateView<C>
99where
100 C: Context + Clone + Send + Sync + 'static,
101 C::Extra: ExecutionRuntimeContext,
102{
103 pub async fn crypto_hash_mut(&mut self) -> Result<CryptoHash, ViewError> {
104 #[derive(Serialize, Deserialize)]
105 struct ExecutionStateViewHash([u8; 32]);
106 impl BcsHashable<'_> for ExecutionStateViewHash {}
107 let hash = self.inner.historical_hash().await?;
108 Ok(CryptoHash::new(&ExecutionStateViewHash(hash.into())))
109 }
110}
111
112impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
113 type Target = ExecutionStateView<C2>;
114
115 async fn with_context(
116 &mut self,
117 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
118 ) -> Self::Target {
119 ExecutionStateView {
120 inner: self.inner.with_context(ctx.clone()).await,
121 }
122 }
123}
124
125pub struct ServiceRuntimeEndpoint {
127 pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
129 pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
131}
132
133#[cfg(with_testing)]
134impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
135where
136 MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
137{
138 pub async fn simulate_instantiation(
140 &mut self,
141 contract: UserContractCode,
142 local_time: linera_base::data_types::Timestamp,
143 application_description: ApplicationDescription,
144 instantiation_argument: Vec<u8>,
145 contract_blob: Blob,
146 service_blob: Blob,
147 ) -> Result<(), ExecutionError> {
148 let chain_id = application_description.creator_chain_id;
149 assert_eq!(chain_id, self.context().extra().chain_id);
150 let context = OperationContext {
151 chain_id,
152 authenticated_owner: None,
153 height: application_description.block_height,
154 round: None,
155 timestamp: local_time,
156 };
157
158 let action = UserAction::Instantiate(context, instantiation_argument);
159 let next_application_index = application_description.application_index + 1;
160 let next_chain_index = 0;
161
162 let application_id = From::from(&application_description);
163 let blob = Blob::new_application_description(&application_description);
164
165 self.system.used_blobs.insert(&blob.id())?;
166 self.system.used_blobs.insert(&contract_blob.id())?;
167 self.system.used_blobs.insert(&service_blob.id())?;
168
169 self.context()
170 .extra()
171 .user_contracts()
172 .pin()
173 .insert(application_id, contract);
174
175 self.context()
176 .extra()
177 .add_blobs([
178 contract_blob,
179 service_blob,
180 Blob::new_application_description(&application_description),
181 ])
182 .await?;
183
184 let tracker = ResourceTracker::default();
185 let policy = ResourceControlPolicy::no_fees();
186 let mut resource_controller = ResourceController::new(Arc::new(policy), tracker, None);
187 let mut txn_tracker = TransactionTracker::new(
188 local_time,
189 0,
190 next_application_index,
191 next_chain_index,
192 None,
193 &[],
194 );
195 txn_tracker.add_created_blob(blob);
196 Box::pin(
197 ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
198 .run_user_action(application_id, action, context.refund_grant_to(), None),
199 )
200 .await?;
201
202 Ok(())
203 }
204}
205
206pub enum UserAction {
207 Instantiate(OperationContext, Vec<u8>),
208 Operation(OperationContext, Vec<u8>),
209 Message(MessageContext, Vec<u8>),
210 ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
211}
212
213impl UserAction {
214 pub(crate) fn signer(&self) -> Option<AccountOwner> {
215 match self {
216 UserAction::Instantiate(context, _) => context.authenticated_owner,
217 UserAction::Operation(context, _) => context.authenticated_owner,
218 UserAction::ProcessStreams(_, _) => None,
219 UserAction::Message(context, _) => context.authenticated_owner,
220 }
221 }
222
223 pub(crate) fn height(&self) -> BlockHeight {
224 match self {
225 UserAction::Instantiate(context, _) => context.height,
226 UserAction::Operation(context, _) => context.height,
227 UserAction::ProcessStreams(context, _) => context.height,
228 UserAction::Message(context, _) => context.height,
229 }
230 }
231
232 pub(crate) fn round(&self) -> Option<u32> {
233 match self {
234 UserAction::Instantiate(context, _) => context.round,
235 UserAction::Operation(context, _) => context.round,
236 UserAction::ProcessStreams(context, _) => context.round,
237 UserAction::Message(context, _) => context.round,
238 }
239 }
240
241 pub(crate) fn timestamp(&self) -> Timestamp {
242 match self {
243 UserAction::Instantiate(context, _) => context.timestamp,
244 UserAction::Operation(context, _) => context.timestamp,
245 UserAction::ProcessStreams(context, _) => context.timestamp,
246 UserAction::Message(context, _) => context.timestamp,
247 }
248 }
249}
250
251impl<C> ExecutionStateView<C>
252where
253 C: Context + Clone + Send + Sync + 'static,
254 C::Extra: ExecutionRuntimeContext,
255{
256 pub async fn query_application(
257 &mut self,
258 context: QueryContext,
259 query: Query,
260 endpoint: Option<&mut ServiceRuntimeEndpoint>,
261 ) -> Result<QueryOutcome, ExecutionError> {
262 assert_eq!(context.chain_id, self.context().extra().chain_id());
263 match query {
264 Query::System(query) => {
265 let outcome = self.system.handle_query(context, query);
266 Ok(outcome.into())
267 }
268 Query::User {
269 application_id,
270 bytes,
271 } => {
272 let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
273 let outcome = match endpoint {
274 Some(endpoint) => {
275 self.query_user_application_with_long_lived_service(
276 application_id,
277 context,
278 bytes,
279 &mut endpoint.incoming_execution_requests,
280 &mut endpoint.runtime_request_sender,
281 )
282 .await?
283 }
284 None => {
285 self.query_user_application(application_id, context, bytes)
286 .await?
287 }
288 };
289 Ok(outcome.into())
290 }
291 }
292 }
293
294 async fn query_user_application(
295 &mut self,
296 application_id: ApplicationId,
297 context: QueryContext,
298 query: Vec<u8>,
299 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
300 self.query_user_application_with_deadline(
301 application_id,
302 context,
303 query,
304 None,
305 BTreeMap::new(),
306 )
307 .await
308 }
309
310 pub(crate) async fn query_user_application_with_deadline(
311 &mut self,
312 application_id: ApplicationId,
313 context: QueryContext,
314 query: Vec<u8>,
315 deadline: Option<Instant>,
316 created_blobs: BTreeMap<BlobId, BlobContent>,
317 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
318 let (execution_state_sender, mut execution_state_receiver) =
319 futures::channel::mpsc::unbounded();
320 let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
321 let mut resource_controller = ResourceController::default();
322 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
323
324 let (codes, descriptions) = actor.service_and_dependencies(application_id).await?;
325
326 let thread = web_thread::Thread::new();
327 let service_runtime_task = thread.run_send(JsVec(codes), move |codes| async move {
328 let mut runtime =
329 ServiceSyncRuntime::new_with_deadline(execution_state_sender, context, deadline);
330
331 for (code, description) in codes.0.into_iter().zip(descriptions) {
332 runtime.preload_service(ApplicationId::from(&description), code, description)?;
333 }
334
335 runtime.run_query(application_id, query)
336 });
337
338 while let Some(request) = execution_state_receiver.next().await {
339 actor.handle_request(request).await?;
340 }
341
342 service_runtime_task.await?
343 }
344
345 async fn query_user_application_with_long_lived_service(
346 &mut self,
347 application_id: ApplicationId,
348 context: QueryContext,
349 query: Vec<u8>,
350 incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
351 ExecutionRequest,
352 >,
353 runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
354 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
355 let (outcome_sender, outcome_receiver) = oneshot::channel();
356 let mut outcome_receiver = outcome_receiver.fuse();
357
358 runtime_request_sender
359 .send(ServiceRuntimeRequest::Query {
360 application_id,
361 context,
362 query,
363 callback: outcome_sender,
364 })
365 .expect("Service runtime thread should only stop when `request_sender` is dropped");
366
367 let mut txn_tracker = TransactionTracker::default();
368 let mut resource_controller = ResourceController::default();
369 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
370
371 loop {
372 futures::select! {
373 maybe_request = incoming_execution_requests.next() => {
374 if let Some(request) = maybe_request {
375 actor.handle_request(request).await?;
376 }
377 }
378 outcome = &mut outcome_receiver => {
379 return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
380 }
381 }
382 }
383 }
384
385 pub async fn list_applications(
386 &self,
387 ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
388 let mut applications = vec![];
389 for app_id in self.users.indices().await? {
390 let blob_id = app_id.description_blob_id();
391 let blob_content = self.system.read_blob_content(blob_id).await?;
392 let application_description = bcs::from_bytes(blob_content.bytes())?;
393 applications.push((app_id, application_description));
394 }
395 Ok(applications)
396 }
397}