1use std::{collections::BTreeMap, vec};
5
6use futures::{FutureExt, StreamExt};
7use linera_base::{
8 data_types::{BlobContent, BlockHeight, StreamUpdate},
9 identifiers::{AccountOwner, BlobId, StreamId},
10 time::Instant,
11};
12use linera_views::{
13 context::Context,
14 key_value_store_view::KeyValueStoreView,
15 map_view::MapView,
16 reentrant_collection_view::HashedReentrantCollectionView,
17 views::{ClonableView, ReplaceContext, View},
18};
19use linera_views_derive::CryptoHashView;
20#[cfg(with_testing)]
21use {
22 crate::{
23 ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
24 },
25 linera_base::data_types::Blob,
26 linera_views::context::MemoryContext,
27 std::sync::Arc,
28};
29
30use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
31use crate::{
32 execution_state_actor::ExecutionStateActor, resources::ResourceController,
33 system::SystemExecutionStateView, ApplicationDescription, ApplicationId, ExecutionError,
34 ExecutionRuntimeConfig, ExecutionRuntimeContext, MessageContext, OperationContext,
35 ProcessStreamsContext, Query, QueryContext, QueryOutcome, ServiceSyncRuntime, Timestamp,
36 TransactionTracker,
37};
38
39#[derive(Debug, ClonableView, CryptoHashView)]
41pub struct ExecutionStateView<C> {
42 pub system: SystemExecutionStateView<C>,
44 pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
46 pub stream_event_counts: MapView<C, StreamId, u32>,
48}
49
50impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
51 type Target = ExecutionStateView<C2>;
52
53 async fn with_context(
54 &mut self,
55 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
56 ) -> Self::Target {
57 ExecutionStateView {
58 system: self.system.with_context(ctx.clone()).await,
59 users: self.users.with_context(ctx.clone()).await,
60 stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
61 }
62 }
63}
64
65pub struct ServiceRuntimeEndpoint {
67 pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
69 pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
71}
72
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
{
    /// Simulates the instantiation of an application (test-only helper).
    ///
    /// The description, contract and service blobs are recorded as used, the contract
    /// code and the blobs are registered with the test runtime context, and an
    /// `Instantiate` user action is then executed with a no-fees resource policy.
    ///
    /// # Errors
    ///
    /// Propagates any [`ExecutionError`] raised while registering the blobs or while
    /// running the instantiation.
    ///
    /// # Panics
    ///
    /// Panics if the application's creator chain is not the chain of this view.
    pub async fn simulate_instantiation(
        &mut self,
        contract: UserContractCode,
        local_time: linera_base::data_types::Timestamp,
        application_description: ApplicationDescription,
        instantiation_argument: Vec<u8>,
        contract_blob: Blob,
        service_blob: Blob,
    ) -> Result<(), ExecutionError> {
        let chain_id = application_description.creator_chain_id;
        assert_eq!(chain_id, self.context().extra().chain_id);

        // The instantiation runs unauthenticated, at the height recorded in the
        // application description.
        let context = OperationContext {
            chain_id,
            authenticated_signer: None,
            height: application_description.block_height,
            round: None,
            timestamp: local_time,
        };
        let action = UserAction::Instantiate(context, instantiation_argument);

        let next_application_index = application_description.application_index + 1;
        let next_chain_index = 0;

        let application_id = From::from(&application_description);
        let description_blob = Blob::new_application_description(&application_description);

        // Record all three blobs as used by the system state.
        for blob_id in [
            description_blob.id(),
            contract_blob.id(),
            service_blob.id(),
        ] {
            self.system.used_blobs.insert(&blob_id)?;
        }

        // Make the contract code and the blobs available through the test context.
        let extra = self.context().extra();
        extra.user_contracts().pin().insert(application_id, contract);

        let blobs_to_add = [
            contract_blob,
            service_blob,
            Blob::new_application_description(&application_description),
        ];
        self.context().extra().add_blobs(blobs_to_add).await?;

        let mut resource_controller = ResourceController::new(
            Arc::new(ResourceControlPolicy::no_fees()),
            ResourceTracker::default(),
            None,
        );
        let mut txn_tracker = TransactionTracker::new(
            local_time,
            0,
            next_application_index,
            next_chain_index,
            None,
            &[],
        );
        txn_tracker.add_created_blob(description_blob);

        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
        actor
            .run_user_action(application_id, action, context.refund_grant_to(), None)
            .await?;

        Ok(())
    }
}
143
144pub enum UserAction {
145 Instantiate(OperationContext, Vec<u8>),
146 Operation(OperationContext, Vec<u8>),
147 Message(MessageContext, Vec<u8>),
148 ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
149}
150
151impl UserAction {
152 pub(crate) fn signer(&self) -> Option<AccountOwner> {
153 match self {
154 UserAction::Instantiate(context, _) => context.authenticated_signer,
155 UserAction::Operation(context, _) => context.authenticated_signer,
156 UserAction::ProcessStreams(_, _) => None,
157 UserAction::Message(context, _) => context.authenticated_signer,
158 }
159 }
160
161 pub(crate) fn height(&self) -> BlockHeight {
162 match self {
163 UserAction::Instantiate(context, _) => context.height,
164 UserAction::Operation(context, _) => context.height,
165 UserAction::ProcessStreams(context, _) => context.height,
166 UserAction::Message(context, _) => context.height,
167 }
168 }
169
170 pub(crate) fn round(&self) -> Option<u32> {
171 match self {
172 UserAction::Instantiate(context, _) => context.round,
173 UserAction::Operation(context, _) => context.round,
174 UserAction::ProcessStreams(context, _) => context.round,
175 UserAction::Message(context, _) => context.round,
176 }
177 }
178
179 pub(crate) fn timestamp(&self) -> Timestamp {
180 match self {
181 UserAction::Instantiate(context, _) => context.timestamp,
182 UserAction::Operation(context, _) => context.timestamp,
183 UserAction::ProcessStreams(context, _) => context.timestamp,
184 UserAction::Message(context, _) => context.timestamp,
185 }
186 }
187}
188
189impl<C> ExecutionStateView<C>
190where
191 C: Context + Clone + Send + Sync + 'static,
192 C::Extra: ExecutionRuntimeContext,
193{
194 pub async fn query_application(
195 &mut self,
196 context: QueryContext,
197 query: Query,
198 endpoint: Option<&mut ServiceRuntimeEndpoint>,
199 ) -> Result<QueryOutcome, ExecutionError> {
200 assert_eq!(context.chain_id, self.context().extra().chain_id());
201 match query {
202 Query::System(query) => {
203 let outcome = self.system.handle_query(context, query);
204 Ok(outcome.into())
205 }
206 Query::User {
207 application_id,
208 bytes,
209 } => {
210 let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
211 let outcome = match endpoint {
212 Some(endpoint) => {
213 self.query_user_application_with_long_lived_service(
214 application_id,
215 context,
216 bytes,
217 &mut endpoint.incoming_execution_requests,
218 &mut endpoint.runtime_request_sender,
219 )
220 .await?
221 }
222 None => {
223 self.query_user_application(application_id, context, bytes)
224 .await?
225 }
226 };
227 Ok(outcome.into())
228 }
229 }
230 }
231
232 async fn query_user_application(
233 &mut self,
234 application_id: ApplicationId,
235 context: QueryContext,
236 query: Vec<u8>,
237 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
238 self.query_user_application_with_deadline(
239 application_id,
240 context,
241 query,
242 None,
243 BTreeMap::new(),
244 )
245 .await
246 }
247
248 pub(crate) async fn query_user_application_with_deadline(
249 &mut self,
250 application_id: ApplicationId,
251 context: QueryContext,
252 query: Vec<u8>,
253 deadline: Option<Instant>,
254 created_blobs: BTreeMap<BlobId, BlobContent>,
255 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
256 let (execution_state_sender, mut execution_state_receiver) =
257 futures::channel::mpsc::unbounded();
258 let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
259 let mut resource_controller = ResourceController::default();
260 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
261 let (code, description) = actor.load_service(application_id).await?;
262
263 let service_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
264 let mut runtime =
265 ServiceSyncRuntime::new_with_deadline(execution_state_sender, context, deadline);
266
267 async move {
268 let code = codes.next().await.expect("we send this immediately below");
269 runtime.preload_service(application_id, code, description)?;
270 runtime.run_query(application_id, query)
271 }
272 })
273 .await;
274
275 service_runtime_task.send(code)?;
276
277 while let Some(request) = execution_state_receiver.next().await {
278 actor.handle_request(request).await?;
279 }
280
281 service_runtime_task.join().await
282 }
283
284 async fn query_user_application_with_long_lived_service(
285 &mut self,
286 application_id: ApplicationId,
287 context: QueryContext,
288 query: Vec<u8>,
289 incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
290 ExecutionRequest,
291 >,
292 runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
293 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
294 let (outcome_sender, outcome_receiver) = oneshot::channel();
295 let mut outcome_receiver = outcome_receiver.fuse();
296
297 runtime_request_sender
298 .send(ServiceRuntimeRequest::Query {
299 application_id,
300 context,
301 query,
302 callback: outcome_sender,
303 })
304 .expect("Service runtime thread should only stop when `request_sender` is dropped");
305
306 let mut txn_tracker = TransactionTracker::default();
307 let mut resource_controller = ResourceController::default();
308 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
309
310 loop {
311 futures::select! {
312 maybe_request = incoming_execution_requests.next() => {
313 if let Some(request) = maybe_request {
314 actor.handle_request(request).await?;
315 }
316 }
317 outcome = &mut outcome_receiver => {
318 return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
319 }
320 }
321 }
322 }
323
324 pub async fn list_applications(
325 &self,
326 ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
327 let mut applications = vec![];
328 for app_id in self.users.indices().await? {
329 let blob_id = app_id.description_blob_id();
330 let blob_content = self.system.read_blob_content(blob_id).await?;
331 let application_description = bcs::from_bytes(blob_content.bytes())?;
332 applications.push((app_id, application_description));
333 }
334 Ok(applications)
335 }
336}