1use std::{
5 collections::{BTreeMap, BTreeSet},
6 future::Future,
7 mem, vec,
8};
9
10use custom_debug_derive::Debug;
11use linera_base::{
12 data_types::{Blob, BlobContent, Event, OracleResponse, StreamUpdate, Timestamp},
13 ensure,
14 identifiers::{ApplicationId, BlobId, ChainId, StreamId},
15};
16
17use crate::{ExecutionError, OutgoingMessage};
18
19type AppStreamUpdates = BTreeMap<(ChainId, StreamId), (u32, u32)>;
20
21#[derive(Debug, Default)]
24pub struct TransactionTracker {
25 #[debug(skip_if = Option::is_none)]
26 replaying_oracle_responses: Option<vec::IntoIter<OracleResponse>>,
27 #[debug(skip_if = Vec::is_empty)]
28 oracle_responses: Vec<OracleResponse>,
29 #[debug(skip_if = Vec::is_empty)]
30 outgoing_messages: Vec<OutgoingMessage>,
31 local_time: Timestamp,
33 transaction_index: u32,
35 next_application_index: u32,
36 next_chain_index: u32,
37 events: Vec<Event>,
39 blobs: BTreeMap<BlobId, BlobContent>,
49 previously_created_blobs: BTreeMap<BlobId, BlobContent>,
51 operation_result: Option<Vec<u8>>,
53 streams_to_process: BTreeMap<ApplicationId, AppStreamUpdates>,
55 blobs_published: BTreeSet<BlobId>,
57}
58
59#[derive(Debug, Default)]
61pub struct TransactionOutcome {
62 #[debug(skip_if = Vec::is_empty)]
63 pub oracle_responses: Vec<OracleResponse>,
64 #[debug(skip_if = Vec::is_empty)]
65 pub outgoing_messages: Vec<OutgoingMessage>,
66 pub next_application_index: u32,
67 pub next_chain_index: u32,
68 pub events: Vec<Event>,
70 pub blobs: Vec<Blob>,
72 pub operation_result: Vec<u8>,
74 pub blobs_published: BTreeSet<BlobId>,
76}
77
78impl TransactionTracker {
79 pub fn new(
80 local_time: Timestamp,
81 transaction_index: u32,
82 next_application_index: u32,
83 next_chain_index: u32,
84 oracle_responses: Option<Vec<OracleResponse>>,
85 blobs: &[Vec<Blob>],
86 ) -> Self {
87 let mut previously_created_blobs = BTreeMap::new();
88 for tx_blobs in blobs {
89 for blob in tx_blobs {
90 previously_created_blobs.insert(blob.id(), blob.content().clone());
91 }
92 }
93 TransactionTracker {
94 local_time,
95 transaction_index,
96 next_application_index,
97 next_chain_index,
98 replaying_oracle_responses: oracle_responses.map(Vec::into_iter),
99 previously_created_blobs,
100 ..Self::default()
101 }
102 }
103
104 pub fn with_blobs(mut self, blobs: BTreeMap<BlobId, BlobContent>) -> Self {
105 self.blobs = blobs;
106 self
107 }
108
109 pub fn local_time(&self) -> Timestamp {
110 self.local_time
111 }
112
113 pub fn set_local_time(&mut self, local_time: Timestamp) {
114 self.local_time = local_time;
115 }
116
117 pub fn transaction_index(&self) -> u32 {
118 self.transaction_index
119 }
120
121 pub fn next_application_index(&mut self) -> u32 {
122 let index = self.next_application_index;
123 self.next_application_index += 1;
124 index
125 }
126
127 pub fn next_chain_index(&mut self) -> u32 {
128 let index = self.next_chain_index;
129 self.next_chain_index += 1;
130 index
131 }
132
133 pub fn add_outgoing_message(&mut self, message: OutgoingMessage) {
134 self.outgoing_messages.push(message);
135 }
136
137 pub fn add_outgoing_messages(&mut self, messages: impl IntoIterator<Item = OutgoingMessage>) {
138 for message in messages {
139 self.add_outgoing_message(message);
140 }
141 }
142
143 pub fn add_event(&mut self, stream_id: StreamId, index: u32, value: Vec<u8>) {
144 self.events.push(Event {
145 stream_id,
146 index,
147 value,
148 });
149 }
150
151 pub fn get_blob_content(&self, blob_id: &BlobId) -> Option<&BlobContent> {
152 if let Some(content) = self.blobs.get(blob_id) {
153 return Some(content);
154 }
155 self.previously_created_blobs.get(blob_id)
156 }
157
158 pub fn add_created_blob(&mut self, blob: Blob) {
159 self.blobs.insert(blob.id(), blob.into_content());
160 }
161
162 pub fn add_published_blob(&mut self, blob_id: BlobId) {
163 self.blobs_published.insert(blob_id);
164 }
165
166 pub fn created_blobs(&self) -> &BTreeMap<BlobId, BlobContent> {
167 &self.blobs
168 }
169
170 pub fn add_operation_result(&mut self, result: Option<Vec<u8>>) {
171 self.operation_result = result
172 }
173
174 pub async fn oracle<F, G>(&mut self, f: F) -> Result<&OracleResponse, ExecutionError>
178 where
179 F: FnOnce() -> G,
180 G: Future<Output = Result<OracleResponse, ExecutionError>>,
181 {
182 let response = match self.next_replayed_oracle_response()? {
183 Some(response) => response,
184 None => f().await?,
185 };
186 self.oracle_responses.push(response);
187 Ok(self.oracle_responses.last().unwrap())
188 }
189
190 pub fn add_stream_to_process(
191 &mut self,
192 application_id: ApplicationId,
193 chain_id: ChainId,
194 stream_id: StreamId,
195 previous_index: u32,
196 next_index: u32,
197 ) {
198 if next_index == previous_index {
199 return; }
201 self.streams_to_process
202 .entry(application_id)
203 .or_default()
204 .entry((chain_id, stream_id))
205 .and_modify(|(pi, ni)| {
206 *pi = (*pi).min(previous_index);
207 *ni = (*ni).max(next_index);
208 })
209 .or_insert_with(|| (previous_index, next_index));
210 }
211
212 pub fn remove_stream_to_process(
213 &mut self,
214 application_id: ApplicationId,
215 chain_id: ChainId,
216 stream_id: StreamId,
217 ) {
218 let Some(streams) = self.streams_to_process.get_mut(&application_id) else {
219 return;
220 };
221 if streams.remove(&(chain_id, stream_id)).is_some() && streams.is_empty() {
222 self.streams_to_process.remove(&application_id);
223 }
224 }
225
226 pub fn take_streams_to_process(&mut self) -> BTreeMap<ApplicationId, Vec<StreamUpdate>> {
227 mem::take(&mut self.streams_to_process)
228 .into_iter()
229 .map(|(app_id, streams)| {
230 let updates = streams
231 .into_iter()
232 .map(
233 |((chain_id, stream_id), (previous_index, next_index))| StreamUpdate {
234 chain_id,
235 stream_id,
236 previous_index,
237 next_index,
238 },
239 )
240 .collect();
241 (app_id, updates)
242 })
243 .collect()
244 }
245
246 pub fn replay_oracle_response(
249 &mut self,
250 oracle_response: OracleResponse,
251 ) -> Result<bool, ExecutionError> {
252 let replaying = if let Some(recorded_response) = self.next_replayed_oracle_response()? {
253 ensure!(
254 recorded_response == oracle_response,
255 ExecutionError::OracleResponseMismatch
256 );
257 true
258 } else {
259 false
260 };
261 self.oracle_responses.push(oracle_response);
262 Ok(replaying)
263 }
264
265 fn next_replayed_oracle_response(&mut self) -> Result<Option<OracleResponse>, ExecutionError> {
273 let Some(responses) = &mut self.replaying_oracle_responses else {
274 return Ok(None); };
276 let response = responses
277 .next()
278 .ok_or_else(|| ExecutionError::MissingOracleResponse)?;
279 Ok(Some(response))
280 }
281
282 pub fn into_outcome(self) -> Result<TransactionOutcome, ExecutionError> {
283 let TransactionTracker {
284 replaying_oracle_responses,
285 oracle_responses,
286 outgoing_messages,
287 local_time: _,
288 transaction_index: _,
289 next_application_index,
290 next_chain_index,
291 events,
292 blobs,
293 previously_created_blobs: _,
294 operation_result,
295 streams_to_process,
296 blobs_published,
297 } = self;
298 ensure!(
299 streams_to_process.is_empty(),
300 ExecutionError::UnprocessedStreams
301 );
302 if let Some(mut responses) = replaying_oracle_responses {
303 ensure!(
304 responses.next().is_none(),
305 ExecutionError::UnexpectedOracleResponse
306 );
307 }
308 let blobs = blobs
309 .into_iter()
310 .map(|(blob_id, content)| Blob::new_with_hash_unchecked(blob_id, content))
311 .collect::<Vec<_>>();
312 Ok(TransactionOutcome {
313 outgoing_messages,
314 oracle_responses,
315 next_application_index,
316 next_chain_index,
317 events,
318 blobs,
319 operation_result: operation_result.unwrap_or_default(),
320 blobs_published,
321 })
322 }
323}
324
/// Constructors that are only compiled when the `with_testing` cfg is set.
#[cfg(with_testing)]
impl TransactionTracker {
    /// Creates a tracker in replay mode that will hand out `oracle_responses` in order.
    ///
    /// The local time is `Timestamp::from(0)`, all indices start at zero, and no
    /// previously created blobs are registered.
    pub fn new_replaying(oracle_responses: Vec<OracleResponse>) -> Self {
        let no_previous_blobs: &[Vec<Blob>] = &[];
        Self::new(
            Timestamp::from(0),
            0,
            0,
            0,
            Some(oracle_responses),
            no_previous_blobs,
        )
    }

    /// Creates a tracker in replay mode whose recorded responses are one
    /// `OracleResponse::Blob` per item of `blob_ids`, in iteration order.
    ///
    /// Items may be `BlobId`s or anything that borrows as a `BlobId`.
    pub fn new_replaying_blobs<T>(blob_ids: T) -> Self
    where
        T: IntoIterator,
        T::Item: std::borrow::Borrow<BlobId>,
    {
        use std::borrow::Borrow;

        let mut oracle_responses = Vec::new();
        oracle_responses.extend(
            blob_ids
                .into_iter()
                .map(|blob_id| OracleResponse::Blob(*blob_id.borrow())),
        );
        Self::new_replaying(oracle_responses)
    }
}