1use std::{
5 collections::{BTreeMap, BTreeSet},
6 mem, vec,
7};
8
9use custom_debug_derive::Debug;
10use linera_base::{
11 data_types::{ArithmeticError, Blob, Event, OracleResponse, StreamUpdate, Timestamp},
12 ensure,
13 identifiers::{ApplicationId, BlobId, ChainId, StreamId},
14};
15
16use crate::{ExecutionError, OutgoingMessage};
17
/// Pending stream updates of a single application.
///
/// Keys are `(ChainId, StreamId)` pairs; values are `(previous_index, next_index)` pairs, as
/// written by `TransactionTracker::add_stream_to_process` and read back by
/// `TransactionTracker::take_streams_to_process`.
type AppStreamUpdates = BTreeMap<(ChainId, StreamId), (u32, u32)>;
19
/// Accumulates the effects of a transaction while it executes: oracle responses, outgoing
/// messages, events, blobs, the operation result and pending stream updates.
///
/// The accumulated state is turned into a [`TransactionOutcome`] by
/// `TransactionTracker::into_outcome`.
#[derive(Debug, Default)]
pub struct TransactionTracker {
    /// Recorded oracle responses that are still to be replayed, or `None` when the tracker is
    /// not replaying. Omitted from the `Debug` output when `None`.
    #[debug(skip_if = Option::is_none)]
    replaying_oracle_responses: Option<vec::IntoIter<OracleResponse>>,
    /// Oracle responses recorded so far. Omitted from the `Debug` output when empty.
    #[debug(skip_if = Vec::is_empty)]
    oracle_responses: Vec<OracleResponse>,
    /// Outgoing messages added so far. Omitted from the `Debug` output when empty.
    #[debug(skip_if = Vec::is_empty)]
    outgoing_messages: Vec<OutgoingMessage>,
    /// The local time; set by `new` and `set_local_time`.
    local_time: Timestamp,
    /// The transaction index passed to `new`.
    // NOTE(review): presumably the position of the transaction within its block — confirm.
    transaction_index: u32,
    /// Message counter; incremented (with an overflow check) for every outgoing message added.
    next_message_index: u32,
    /// Application counter; handed out and then incremented by `next_application_index()`.
    next_application_index: u32,
    /// Chain counter; handed out and then incremented by `next_chain_index()`.
    next_chain_index: u32,
    /// Events appended by `add_event`.
    events: Vec<Event>,
    /// Blobs keyed by their ID; filled by `with_blobs` and `add_created_blob`.
    blobs: BTreeMap<BlobId, Blob>,
    /// The most recently stored operation result, if any.
    operation_result: Option<Vec<u8>>,
    /// Per-application stream updates that have been registered and not yet removed or taken.
    streams_to_process: BTreeMap<ApplicationId, AppStreamUpdates>,
    /// IDs registered through `add_published_blob`.
    blobs_published: BTreeSet<BlobId>,
}
52
/// The final, publicly readable result of a transaction, as produced by
/// `TransactionTracker::into_outcome`.
#[derive(Debug, Default)]
pub struct TransactionOutcome {
    /// Oracle responses recorded during the transaction. Omitted from `Debug` when empty.
    #[debug(skip_if = Vec::is_empty)]
    pub oracle_responses: Vec<OracleResponse>,
    /// Outgoing messages added during the transaction. Omitted from `Debug` when empty.
    #[debug(skip_if = Vec::is_empty)]
    pub outgoing_messages: Vec<OutgoingMessage>,
    /// Value of the tracker's message counter at the end of the transaction.
    pub next_message_index: u32,
    /// Value of the tracker's application counter at the end of the transaction.
    pub next_application_index: u32,
    /// Value of the tracker's chain counter at the end of the transaction.
    pub next_chain_index: u32,
    /// Events recorded during the transaction.
    pub events: Vec<Event>,
    /// The values of the tracker's blob map, in the order of their `BlobId` keys.
    pub blobs: Vec<Blob>,
    /// The stored operation result, or an empty vector if none was stored.
    pub operation_result: Vec<u8>,
    /// IDs that were registered as published blobs.
    pub blobs_published: BTreeSet<BlobId>,
}
72
73impl TransactionTracker {
74 pub fn new(
75 local_time: Timestamp,
76 transaction_index: u32,
77 next_message_index: u32,
78 next_application_index: u32,
79 next_chain_index: u32,
80 oracle_responses: Option<Vec<OracleResponse>>,
81 ) -> Self {
82 TransactionTracker {
83 local_time,
84 transaction_index,
85 next_message_index,
86 next_application_index,
87 next_chain_index,
88 replaying_oracle_responses: oracle_responses.map(Vec::into_iter),
89 ..Self::default()
90 }
91 }
92
93 pub fn with_blobs(mut self, blobs: BTreeMap<BlobId, Blob>) -> Self {
94 self.blobs = blobs;
95 self
96 }
97
98 pub fn local_time(&self) -> Timestamp {
99 self.local_time
100 }
101
102 pub fn set_local_time(&mut self, local_time: Timestamp) {
103 self.local_time = local_time;
104 }
105
106 pub fn transaction_index(&self) -> u32 {
107 self.transaction_index
108 }
109
110 pub fn next_message_index(&self) -> u32 {
111 self.next_message_index
112 }
113
114 pub fn next_application_index(&mut self) -> u32 {
115 let index = self.next_application_index;
116 self.next_application_index += 1;
117 index
118 }
119
120 pub fn next_chain_index(&mut self) -> u32 {
121 let index = self.next_chain_index;
122 self.next_chain_index += 1;
123 index
124 }
125
126 pub fn add_outgoing_message(
127 &mut self,
128 message: OutgoingMessage,
129 ) -> Result<(), ArithmeticError> {
130 self.next_message_index = self
131 .next_message_index
132 .checked_add(1)
133 .ok_or(ArithmeticError::Overflow)?;
134 self.outgoing_messages.push(message);
135 Ok(())
136 }
137
138 pub fn add_outgoing_messages(
139 &mut self,
140 messages: impl IntoIterator<Item = OutgoingMessage>,
141 ) -> Result<(), ArithmeticError> {
142 for message in messages {
143 self.add_outgoing_message(message)?;
144 }
145 Ok(())
146 }
147
148 pub fn add_event(&mut self, stream_id: StreamId, index: u32, value: Vec<u8>) {
149 self.events.push(Event {
150 stream_id,
151 index,
152 value,
153 });
154 }
155
156 pub fn add_created_blob(&mut self, blob: Blob) {
157 self.blobs.insert(blob.id(), blob);
158 }
159
160 pub fn add_published_blob(&mut self, blob_id: BlobId) {
161 self.blobs_published.insert(blob_id);
162 }
163
164 pub fn created_blobs(&self) -> &BTreeMap<BlobId, Blob> {
165 &self.blobs
166 }
167
168 pub fn add_oracle_response(&mut self, oracle_response: OracleResponse) {
169 self.oracle_responses.push(oracle_response);
170 }
171
172 pub fn add_operation_result(&mut self, result: Option<Vec<u8>>) {
173 self.operation_result = result
174 }
175
176 pub fn add_stream_to_process(
177 &mut self,
178 application_id: ApplicationId,
179 chain_id: ChainId,
180 stream_id: StreamId,
181 previous_index: u32,
182 next_index: u32,
183 ) {
184 if next_index == previous_index {
185 return; }
187 self.streams_to_process
188 .entry(application_id)
189 .or_default()
190 .entry((chain_id, stream_id))
191 .and_modify(|(pi, ni)| {
192 *pi = (*pi).min(previous_index);
193 *ni = (*ni).max(next_index);
194 })
195 .or_insert_with(|| (previous_index, next_index));
196 }
197
198 pub fn remove_stream_to_process(
199 &mut self,
200 application_id: ApplicationId,
201 chain_id: ChainId,
202 stream_id: StreamId,
203 ) {
204 let Some(streams) = self.streams_to_process.get_mut(&application_id) else {
205 return;
206 };
207 if streams.remove(&(chain_id, stream_id)).is_some() && streams.is_empty() {
208 self.streams_to_process.remove(&application_id);
209 }
210 }
211
212 pub fn take_streams_to_process(&mut self) -> BTreeMap<ApplicationId, Vec<StreamUpdate>> {
213 mem::take(&mut self.streams_to_process)
214 .into_iter()
215 .map(|(app_id, streams)| {
216 let updates = streams
217 .into_iter()
218 .map(
219 |((chain_id, stream_id), (previous_index, next_index))| StreamUpdate {
220 chain_id,
221 stream_id,
222 previous_index,
223 next_index,
224 },
225 )
226 .collect();
227 (app_id, updates)
228 })
229 .collect()
230 }
231
232 pub fn replay_oracle_response(
235 &mut self,
236 oracle_response: OracleResponse,
237 ) -> Result<bool, ExecutionError> {
238 let replaying = if let Some(recorded_response) = self.next_replayed_oracle_response()? {
239 ensure!(
240 recorded_response == oracle_response,
241 ExecutionError::OracleResponseMismatch
242 );
243 true
244 } else {
245 false
246 };
247 self.add_oracle_response(oracle_response);
248 Ok(replaying)
249 }
250
251 pub fn next_replayed_oracle_response(
259 &mut self,
260 ) -> Result<Option<OracleResponse>, ExecutionError> {
261 let Some(responses) = &mut self.replaying_oracle_responses else {
262 return Ok(None); };
264 let response = responses
265 .next()
266 .ok_or_else(|| ExecutionError::MissingOracleResponse)?;
267 Ok(Some(response))
268 }
269
270 pub fn into_outcome(self) -> Result<TransactionOutcome, ExecutionError> {
271 let TransactionTracker {
272 replaying_oracle_responses,
273 oracle_responses,
274 outgoing_messages,
275 local_time: _,
276 transaction_index: _,
277 next_message_index,
278 next_application_index,
279 next_chain_index,
280 events,
281 blobs,
282 operation_result,
283 streams_to_process,
284 blobs_published,
285 } = self;
286 ensure!(
287 streams_to_process.is_empty(),
288 ExecutionError::UnprocessedStreams
289 );
290 if let Some(mut responses) = replaying_oracle_responses {
291 ensure!(
292 responses.next().is_none(),
293 ExecutionError::UnexpectedOracleResponse
294 );
295 }
296 Ok(TransactionOutcome {
297 outgoing_messages,
298 oracle_responses,
299 next_message_index,
300 next_application_index,
301 next_chain_index,
302 events,
303 blobs: blobs.into_values().collect(),
304 operation_result: operation_result.unwrap_or_default(),
305 blobs_published,
306 })
307 }
308}
309
#[cfg(with_testing)]
impl TransactionTracker {
    /// Creates a tracker that replays `oracle_responses`, with a timestamp built from `0` and
    /// the transaction index and all counters set to `0`.
    pub fn new_replaying(oracle_responses: Vec<OracleResponse>) -> Self {
        let recorded = Some(oracle_responses);
        Self::new(Timestamp::from(0), 0, 0, 0, 0, recorded)
    }

    /// Creates a replaying tracker (see [`Self::new_replaying`]) whose recorded responses are
    /// one `OracleResponse::Blob` per item of `blob_ids`, in iteration order.
    ///
    /// Items may be `BlobId`s or anything that borrows as one.
    pub fn new_replaying_blobs<T>(blob_ids: T) -> Self
    where
        T: IntoIterator,
        T::Item: std::borrow::Borrow<BlobId>,
    {
        use std::borrow::Borrow;

        let mut oracle_responses = Vec::new();
        for blob_id in blob_ids {
            oracle_responses.push(OracleResponse::Blob(*blob_id.borrow()));
        }
        Self::new_replaying(oracle_responses)
    }
}