1use std::{
5 collections::{BTreeMap, BTreeSet},
6 future::Future,
7 mem, vec,
8};
9
10use custom_debug_derive::Debug;
11use linera_base::{
12 data_types::{Blob, BlobContent, Event, OracleResponse, StreamUpdate, Timestamp},
13 ensure,
14 identifiers::{ApplicationId, BlobId, ChainId, StreamId},
15};
16
17use crate::{ExecutionError, OutgoingMessage};
18
19type AppStreamUpdates = BTreeMap<(ChainId, StreamId), (u32, u32)>;
20
21#[derive(Debug, Default)]
24pub struct TransactionTracker {
25 #[debug(skip_if = Option::is_none)]
26 replaying_oracle_responses: Option<vec::IntoIter<OracleResponse>>,
27 #[debug(skip_if = Vec::is_empty)]
28 oracle_responses: Vec<OracleResponse>,
29 #[debug(skip_if = Vec::is_empty)]
30 outgoing_messages: Vec<OutgoingMessage>,
31 local_time: Timestamp,
33 transaction_index: u32,
35 next_application_index: u32,
36 next_chain_index: u32,
37 events: Vec<Event>,
39 blobs: BTreeMap<BlobId, BlobContent>,
49 previously_created_blobs: BTreeMap<BlobId, BlobContent>,
51 operation_result: Option<Vec<u8>>,
53 streams_to_process: BTreeMap<ApplicationId, AppStreamUpdates>,
55 blobs_published: BTreeSet<BlobId>,
57 free_blob_ids: BTreeSet<BlobId>,
59}
60
61#[derive(Debug, Default)]
63pub struct TransactionOutcome {
64 #[debug(skip_if = Vec::is_empty)]
65 pub oracle_responses: Vec<OracleResponse>,
66 #[debug(skip_if = Vec::is_empty)]
67 pub outgoing_messages: Vec<OutgoingMessage>,
68 pub next_application_index: u32,
69 pub next_chain_index: u32,
70 pub events: Vec<Event>,
72 pub blobs: Vec<Blob>,
74 pub operation_result: Vec<u8>,
76 pub blobs_published: BTreeSet<BlobId>,
78 pub free_blob_ids: BTreeSet<BlobId>,
80}
81
82impl TransactionTracker {
83 pub fn new(
84 local_time: Timestamp,
85 transaction_index: u32,
86 next_application_index: u32,
87 next_chain_index: u32,
88 oracle_responses: Option<Vec<OracleResponse>>,
89 blobs: &[Vec<Blob>],
90 ) -> Self {
91 let mut previously_created_blobs = BTreeMap::new();
92 for tx_blobs in blobs {
93 for blob in tx_blobs {
94 previously_created_blobs.insert(blob.id(), blob.content().clone());
95 }
96 }
97 TransactionTracker {
98 local_time,
99 transaction_index,
100 next_application_index,
101 next_chain_index,
102 replaying_oracle_responses: oracle_responses.map(Vec::into_iter),
103 previously_created_blobs,
104 ..Self::default()
105 }
106 }
107
108 pub fn with_blobs(mut self, blobs: BTreeMap<BlobId, BlobContent>) -> Self {
109 self.blobs = blobs;
110 self
111 }
112
113 pub fn local_time(&self) -> Timestamp {
114 self.local_time
115 }
116
117 pub fn set_local_time(&mut self, local_time: Timestamp) {
118 self.local_time = local_time;
119 }
120
121 pub fn transaction_index(&self) -> u32 {
122 self.transaction_index
123 }
124
125 pub fn peek_application_index(&self) -> u32 {
126 self.next_application_index
127 }
128
129 pub fn next_application_index(&mut self) -> u32 {
130 let index = self.next_application_index;
131 self.next_application_index += 1;
132 index
133 }
134
135 pub fn next_chain_index(&mut self) -> u32 {
136 let index = self.next_chain_index;
137 self.next_chain_index += 1;
138 index
139 }
140
141 pub fn add_outgoing_message(&mut self, message: OutgoingMessage) {
142 self.outgoing_messages.push(message);
143 }
144
145 pub fn add_outgoing_messages(&mut self, messages: impl IntoIterator<Item = OutgoingMessage>) {
146 for message in messages {
147 self.add_outgoing_message(message);
148 }
149 }
150
151 pub fn add_event(&mut self, stream_id: StreamId, index: u32, value: Vec<u8>) {
152 self.events.push(Event {
153 stream_id,
154 index,
155 value,
156 });
157 }
158
159 pub fn get_blob_content(&self, blob_id: &BlobId) -> Option<&BlobContent> {
160 if let Some(content) = self.blobs.get(blob_id) {
161 return Some(content);
162 }
163 self.previously_created_blobs.get(blob_id)
164 }
165
166 pub fn add_created_blob(&mut self, blob: Blob) {
167 self.blobs.insert(blob.id(), blob.into_content());
168 }
169
170 pub fn add_published_blob(&mut self, blob_id: BlobId) {
171 self.blobs_published.insert(blob_id);
172 }
173
174 pub fn mark_blob_free(&mut self, blob_id: BlobId) {
176 self.free_blob_ids.insert(blob_id);
177 }
178
179 pub fn created_blobs(&self) -> &BTreeMap<BlobId, BlobContent> {
180 &self.blobs
181 }
182
183 pub fn add_operation_result(&mut self, result: Option<Vec<u8>>) {
184 self.operation_result = result
185 }
186
187 pub async fn oracle<F, G>(&mut self, f: F) -> Result<&OracleResponse, ExecutionError>
191 where
192 F: FnOnce() -> G,
193 G: Future<Output = Result<OracleResponse, ExecutionError>>,
194 {
195 let response = match self.next_replayed_oracle_response()? {
196 Some(response) => response,
197 None => f().await?,
198 };
199 self.oracle_responses.push(response);
200 Ok(self.oracle_responses.last().unwrap())
201 }
202
203 pub fn add_stream_to_process(
204 &mut self,
205 application_id: ApplicationId,
206 chain_id: ChainId,
207 stream_id: StreamId,
208 previous_index: u32,
209 next_index: u32,
210 ) {
211 if next_index == previous_index {
212 return; }
214 self.streams_to_process
215 .entry(application_id)
216 .or_default()
217 .entry((chain_id, stream_id))
218 .and_modify(|(pi, ni)| {
219 *pi = (*pi).min(previous_index);
220 *ni = (*ni).max(next_index);
221 })
222 .or_insert_with(|| (previous_index, next_index));
223 }
224
225 pub fn remove_stream_to_process(
226 &mut self,
227 application_id: ApplicationId,
228 chain_id: ChainId,
229 stream_id: StreamId,
230 ) {
231 let Some(streams) = self.streams_to_process.get_mut(&application_id) else {
232 return;
233 };
234 if streams.remove(&(chain_id, stream_id)).is_some() && streams.is_empty() {
235 self.streams_to_process.remove(&application_id);
236 }
237 }
238
239 pub fn take_streams_to_process(&mut self) -> BTreeMap<ApplicationId, Vec<StreamUpdate>> {
240 mem::take(&mut self.streams_to_process)
241 .into_iter()
242 .map(|(app_id, streams)| {
243 let updates = streams
244 .into_iter()
245 .map(
246 |((chain_id, stream_id), (previous_index, next_index))| StreamUpdate {
247 chain_id,
248 stream_id,
249 previous_index,
250 next_index,
251 },
252 )
253 .collect();
254 (app_id, updates)
255 })
256 .collect()
257 }
258
259 pub fn replay_oracle_response(
262 &mut self,
263 oracle_response: OracleResponse,
264 ) -> Result<bool, ExecutionError> {
265 let replaying = if let Some(recorded_response) = self.next_replayed_oracle_response()? {
266 ensure!(
267 recorded_response == oracle_response,
268 ExecutionError::OracleResponseMismatch
269 );
270 true
271 } else {
272 false
273 };
274 self.oracle_responses.push(oracle_response);
275 Ok(replaying)
276 }
277
278 fn next_replayed_oracle_response(&mut self) -> Result<Option<OracleResponse>, ExecutionError> {
286 let Some(responses) = &mut self.replaying_oracle_responses else {
287 return Ok(None); };
289 let response = responses
290 .next()
291 .ok_or_else(|| ExecutionError::MissingOracleResponse)?;
292 Ok(Some(response))
293 }
294
295 pub fn into_outcome(self) -> Result<TransactionOutcome, ExecutionError> {
296 let TransactionTracker {
297 replaying_oracle_responses,
298 oracle_responses,
299 outgoing_messages,
300 local_time: _,
301 transaction_index: _,
302 next_application_index,
303 next_chain_index,
304 events,
305 blobs,
306 previously_created_blobs: _,
307 operation_result,
308 streams_to_process,
309 blobs_published,
310 free_blob_ids,
311 } = self;
312 ensure!(
313 streams_to_process.is_empty(),
314 ExecutionError::UnprocessedStreams
315 );
316 if let Some(mut responses) = replaying_oracle_responses {
317 ensure!(
318 responses.next().is_none(),
319 ExecutionError::UnexpectedOracleResponse
320 );
321 }
322 let blobs = blobs
323 .into_iter()
324 .map(|(blob_id, content)| Blob::new_with_hash_unchecked(blob_id, content))
325 .collect::<Vec<_>>();
326 Ok(TransactionOutcome {
327 outgoing_messages,
328 oracle_responses,
329 next_application_index,
330 next_chain_index,
331 events,
332 blobs,
333 operation_result: operation_result.unwrap_or_default(),
334 blobs_published,
335 free_blob_ids,
336 })
337 }
338}
339
340#[cfg(with_testing)]
341impl TransactionTracker {
342 pub fn new_replaying(oracle_responses: Vec<OracleResponse>) -> Self {
345 TransactionTracker::new(Timestamp::from(0), 0, 0, 0, Some(oracle_responses), &[])
346 }
347
348 pub fn new_replaying_blobs<T>(blob_ids: T) -> Self
351 where
352 T: IntoIterator,
353 T::Item: std::borrow::Borrow<BlobId>,
354 {
355 use std::borrow::Borrow;
356
357 let oracle_responses = blob_ids
358 .into_iter()
359 .map(|blob_id| OracleResponse::Blob(*blob_id.borrow()))
360 .collect();
361 TransactionTracker::new_replaying(oracle_responses)
362 }
363}