1use std::{collections::BTreeMap, vec};
5
6use futures::{FutureExt, StreamExt};
7use linera_base::{
8 data_types::{BlobContent, BlockHeight, StreamUpdate},
9 identifiers::{AccountOwner, BlobId},
10 time::Instant,
11};
12use linera_views::{
13 context::Context,
14 key_value_store_view::KeyValueStoreView,
15 reentrant_collection_view::HashedReentrantCollectionView,
16 views::{ClonableView, ReplaceContext, View},
17};
18use linera_views_derive::CryptoHashView;
19#[cfg(with_testing)]
20use {
21 crate::{
22 ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
23 },
24 linera_base::data_types::Blob,
25 linera_views::context::MemoryContext,
26 std::sync::Arc,
27};
28
29use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
30use crate::{
31 execution_state_actor::ExecutionStateActor, resources::ResourceController,
32 system::SystemExecutionStateView, ApplicationDescription, ApplicationId, ExecutionError,
33 ExecutionRuntimeConfig, ExecutionRuntimeContext, MessageContext, OperationContext,
34 ProcessStreamsContext, Query, QueryContext, QueryOutcome, ServiceSyncRuntime, Timestamp,
35 TransactionTracker,
36};
37
38#[derive(Debug, ClonableView, CryptoHashView)]
40pub struct ExecutionStateView<C> {
41 pub system: SystemExecutionStateView<C>,
43 pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
45}
46
47impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
48 type Target = ExecutionStateView<C2>;
49
50 async fn with_context(
51 &mut self,
52 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
53 ) -> Self::Target {
54 ExecutionStateView {
55 system: self.system.with_context(ctx.clone()).await,
56 users: self.users.with_context(ctx.clone()).await,
57 }
58 }
59}
60
61pub struct ServiceRuntimeEndpoint {
63 pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
65 pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
67}
68
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
{
    /// Test helper that registers an application and runs its instantiation.
    ///
    /// The description, contract and service blobs are recorded as used, the contract
    /// code and the blobs are registered with the test runtime context, and finally a
    /// [`UserAction::Instantiate`] action is executed with `instantiation_argument`,
    /// using a fee-less resource policy.
    ///
    /// # Errors
    ///
    /// Returns an error if recording or storing the blobs fails, or if running the
    /// instantiation action fails.
    ///
    /// # Panics
    ///
    /// Panics if the creator chain of `application_description` is not the chain of
    /// this view's runtime context.
    pub async fn simulate_instantiation(
        &mut self,
        contract: UserContractCode,
        local_time: linera_base::data_types::Timestamp,
        application_description: ApplicationDescription,
        instantiation_argument: Vec<u8>,
        contract_blob: Blob,
        service_blob: Blob,
    ) -> Result<(), ExecutionError> {
        let creator_chain_id = application_description.creator_chain_id;
        assert_eq!(creator_chain_id, self.context().extra().chain_id);

        let operation_context = OperationContext {
            chain_id: creator_chain_id,
            authenticated_owner: None,
            height: application_description.block_height,
            round: None,
            timestamp: local_time,
        };
        let instantiate = UserAction::Instantiate(operation_context, instantiation_argument);

        // Indices handed to the transaction tracker further down.
        let next_application_index = application_description.application_index + 1;
        let next_chain_index = 0;

        let application_id = ApplicationId::from(&application_description);
        let description_blob = Blob::new_application_description(&application_description);

        // Record the three blobs the application depends on as used.
        for blob_id in [
            description_blob.id(),
            contract_blob.id(),
            service_blob.id(),
        ] {
            self.system.used_blobs.insert(&blob_id)?;
        }

        // Register the contract code with the test runtime context.
        self.context()
            .extra()
            .user_contracts()
            .pin()
            .insert(application_id, contract);

        // Store the blobs in the test runtime context.
        self.context()
            .extra()
            .add_blobs([
                contract_blob,
                service_blob,
                Blob::new_application_description(&application_description),
            ])
            .await?;

        let mut resource_controller = ResourceController::new(
            Arc::new(ResourceControlPolicy::no_fees()),
            ResourceTracker::default(),
            None,
        );
        let mut txn_tracker = TransactionTracker::new(
            local_time,
            0,
            next_application_index,
            next_chain_index,
            None,
            &[],
        );
        txn_tracker.add_created_blob(description_blob);

        ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
            .run_user_action(
                application_id,
                instantiate,
                operation_context.refund_grant_to(),
                None,
            )
            .await?;

        Ok(())
    }
}
139
140pub enum UserAction {
141 Instantiate(OperationContext, Vec<u8>),
142 Operation(OperationContext, Vec<u8>),
143 Message(MessageContext, Vec<u8>),
144 ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
145}
146
147impl UserAction {
148 pub(crate) fn signer(&self) -> Option<AccountOwner> {
149 match self {
150 UserAction::Instantiate(context, _) => context.authenticated_owner,
151 UserAction::Operation(context, _) => context.authenticated_owner,
152 UserAction::ProcessStreams(_, _) => None,
153 UserAction::Message(context, _) => context.authenticated_owner,
154 }
155 }
156
157 pub(crate) fn height(&self) -> BlockHeight {
158 match self {
159 UserAction::Instantiate(context, _) => context.height,
160 UserAction::Operation(context, _) => context.height,
161 UserAction::ProcessStreams(context, _) => context.height,
162 UserAction::Message(context, _) => context.height,
163 }
164 }
165
166 pub(crate) fn round(&self) -> Option<u32> {
167 match self {
168 UserAction::Instantiate(context, _) => context.round,
169 UserAction::Operation(context, _) => context.round,
170 UserAction::ProcessStreams(context, _) => context.round,
171 UserAction::Message(context, _) => context.round,
172 }
173 }
174
175 pub(crate) fn timestamp(&self) -> Timestamp {
176 match self {
177 UserAction::Instantiate(context, _) => context.timestamp,
178 UserAction::Operation(context, _) => context.timestamp,
179 UserAction::ProcessStreams(context, _) => context.timestamp,
180 UserAction::Message(context, _) => context.timestamp,
181 }
182 }
183}
184
185impl<C> ExecutionStateView<C>
186where
187 C: Context + Clone + Send + Sync + 'static,
188 C::Extra: ExecutionRuntimeContext,
189{
190 pub async fn query_application(
191 &mut self,
192 context: QueryContext,
193 query: Query,
194 endpoint: Option<&mut ServiceRuntimeEndpoint>,
195 ) -> Result<QueryOutcome, ExecutionError> {
196 assert_eq!(context.chain_id, self.context().extra().chain_id());
197 match query {
198 Query::System(query) => {
199 let outcome = self.system.handle_query(context, query);
200 Ok(outcome.into())
201 }
202 Query::User {
203 application_id,
204 bytes,
205 } => {
206 let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
207 let outcome = match endpoint {
208 Some(endpoint) => {
209 self.query_user_application_with_long_lived_service(
210 application_id,
211 context,
212 bytes,
213 &mut endpoint.incoming_execution_requests,
214 &mut endpoint.runtime_request_sender,
215 )
216 .await?
217 }
218 None => {
219 self.query_user_application(application_id, context, bytes)
220 .await?
221 }
222 };
223 Ok(outcome.into())
224 }
225 }
226 }
227
228 async fn query_user_application(
229 &mut self,
230 application_id: ApplicationId,
231 context: QueryContext,
232 query: Vec<u8>,
233 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
234 self.query_user_application_with_deadline(
235 application_id,
236 context,
237 query,
238 None,
239 BTreeMap::new(),
240 )
241 .await
242 }
243
244 pub(crate) async fn query_user_application_with_deadline(
245 &mut self,
246 application_id: ApplicationId,
247 context: QueryContext,
248 query: Vec<u8>,
249 deadline: Option<Instant>,
250 created_blobs: BTreeMap<BlobId, BlobContent>,
251 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
252 let (execution_state_sender, mut execution_state_receiver) =
253 futures::channel::mpsc::unbounded();
254 let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
255 let mut resource_controller = ResourceController::default();
256 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
257 let (code, description) = actor.load_service(application_id).await?;
258
259 let service_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
260 let mut runtime =
261 ServiceSyncRuntime::new_with_deadline(execution_state_sender, context, deadline);
262
263 async move {
264 let code = codes.next().await.expect("we send this immediately below");
265 runtime.preload_service(application_id, code, description)?;
266 runtime.run_query(application_id, query)
267 }
268 })
269 .await;
270
271 service_runtime_task.send(code)?;
272
273 while let Some(request) = execution_state_receiver.next().await {
274 actor.handle_request(request).await?;
275 }
276
277 service_runtime_task.join().await
278 }
279
280 async fn query_user_application_with_long_lived_service(
281 &mut self,
282 application_id: ApplicationId,
283 context: QueryContext,
284 query: Vec<u8>,
285 incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
286 ExecutionRequest,
287 >,
288 runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
289 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
290 let (outcome_sender, outcome_receiver) = oneshot::channel();
291 let mut outcome_receiver = outcome_receiver.fuse();
292
293 runtime_request_sender
294 .send(ServiceRuntimeRequest::Query {
295 application_id,
296 context,
297 query,
298 callback: outcome_sender,
299 })
300 .expect("Service runtime thread should only stop when `request_sender` is dropped");
301
302 let mut txn_tracker = TransactionTracker::default();
303 let mut resource_controller = ResourceController::default();
304 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
305
306 loop {
307 futures::select! {
308 maybe_request = incoming_execution_requests.next() => {
309 if let Some(request) = maybe_request {
310 actor.handle_request(request).await?;
311 }
312 }
313 outcome = &mut outcome_receiver => {
314 return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
315 }
316 }
317 }
318 }
319
320 pub async fn list_applications(
321 &self,
322 ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
323 let mut applications = vec![];
324 for app_id in self.users.indices().await? {
325 let blob_id = app_id.description_blob_id();
326 let blob_content = self.system.read_blob_content(blob_id).await?;
327 let application_description = bcs::from_bytes(blob_content.bytes())?;
328 applications.push((app_id, application_description));
329 }
330 Ok(applications)
331 }
332}