1use std::{
5 collections::{BTreeMap, BTreeSet},
6 mem, vec,
7};
8
9use custom_debug_derive::Debug;
10use linera_base::{
11 data_types::{ArithmeticError, Blob, Event, OracleResponse, StreamUpdate, Timestamp},
12 ensure,
13 identifiers::{ApplicationId, BlobId, ChainId, StreamId},
14};
15
16use crate::{ExecutionError, OutgoingMessage};
17
/// Pending stream updates of a single application: maps a `(ChainId, StreamId)` pair to a
/// `(previous_index, next_index)` tuple.
type AppStreamUpdates = BTreeMap<(ChainId, StreamId), (u32, u32)>;
19
/// Accumulates the data that [`TransactionTracker::into_outcome`] later packages into a
/// [`TransactionOutcome`]: oracle responses, outgoing messages, events, blobs and so on.
#[derive(Debug, Default)]
pub struct TransactionTracker {
    /// Recorded oracle responses that `next_replayed_oracle_response` hands out one by one.
    /// `None` means there is nothing to replay.
    #[debug(skip_if = Option::is_none)]
    replaying_oracle_responses: Option<vec::IntoIter<OracleResponse>>,
    /// Oracle responses appended through `add_oracle_response` (and therefore also through
    /// `replay_oracle_response`).
    #[debug(skip_if = Vec::is_empty)]
    oracle_responses: Vec<OracleResponse>,
    /// Messages appended through `add_outgoing_message`.
    #[debug(skip_if = Vec::is_empty)]
    outgoing_messages: Vec<OutgoingMessage>,
    /// Time value given at construction; readable via `local_time` and replaceable via
    /// `set_local_time`.
    local_time: Timestamp,
    /// Index given at construction and exposed by `transaction_index`.
    // NOTE(review): presumably the position of the transaction in its block; confirm.
    transaction_index: u32,
    /// Counter handed out and then incremented by `next_application_index`.
    next_application_index: u32,
    /// Counter handed out and then incremented by `next_chain_index`.
    next_chain_index: u32,
    /// Events appended through `add_event`.
    events: Vec<Event>,
    /// Blobs keyed by their ID; replaced wholesale by `with_blobs` and extended by
    /// `add_created_blob`.
    blobs: BTreeMap<BlobId, Blob>,
    /// Value last passed to `add_operation_result`; `into_outcome` turns `None` into an
    /// empty vector.
    operation_result: Option<Vec<u8>>,
    /// Per application, the stream index ranges recorded by `add_stream_to_process`.
    /// `into_outcome` fails unless this map is empty.
    streams_to_process: BTreeMap<ApplicationId, AppStreamUpdates>,
    /// Blob IDs recorded through `add_published_blob`.
    blobs_published: BTreeSet<BlobId>,
}
51
/// The data extracted from a [`TransactionTracker`] by `TransactionTracker::into_outcome`.
#[derive(Debug, Default)]
pub struct TransactionOutcome {
    /// The oracle responses the tracker accumulated.
    #[debug(skip_if = Vec::is_empty)]
    pub oracle_responses: Vec<OracleResponse>,
    /// The outgoing messages the tracker accumulated.
    #[debug(skip_if = Vec::is_empty)]
    pub outgoing_messages: Vec<OutgoingMessage>,
    /// The tracker's application index counter at the time of conversion.
    pub next_application_index: u32,
    /// The tracker's chain index counter at the time of conversion.
    pub next_chain_index: u32,
    /// The events the tracker accumulated.
    pub events: Vec<Event>,
    /// The values of the tracker's blob map, in the order of their `BlobId` keys.
    pub blobs: Vec<Blob>,
    /// The operation result set on the tracker, or an empty vector if none was set.
    pub operation_result: Vec<u8>,
    /// The blob IDs recorded on the tracker as published.
    pub blobs_published: BTreeSet<BlobId>,
}
70
71impl TransactionTracker {
72 pub fn new(
73 local_time: Timestamp,
74 transaction_index: u32,
75 next_application_index: u32,
76 next_chain_index: u32,
77 oracle_responses: Option<Vec<OracleResponse>>,
78 ) -> Self {
79 TransactionTracker {
80 local_time,
81 transaction_index,
82 next_application_index,
83 next_chain_index,
84 replaying_oracle_responses: oracle_responses.map(Vec::into_iter),
85 ..Self::default()
86 }
87 }
88
89 pub fn with_blobs(mut self, blobs: BTreeMap<BlobId, Blob>) -> Self {
90 self.blobs = blobs;
91 self
92 }
93
94 pub fn local_time(&self) -> Timestamp {
95 self.local_time
96 }
97
98 pub fn set_local_time(&mut self, local_time: Timestamp) {
99 self.local_time = local_time;
100 }
101
102 pub fn transaction_index(&self) -> u32 {
103 self.transaction_index
104 }
105
106 pub fn next_application_index(&mut self) -> u32 {
107 let index = self.next_application_index;
108 self.next_application_index += 1;
109 index
110 }
111
112 pub fn next_chain_index(&mut self) -> u32 {
113 let index = self.next_chain_index;
114 self.next_chain_index += 1;
115 index
116 }
117
118 pub fn add_outgoing_message(
119 &mut self,
120 message: OutgoingMessage,
121 ) -> Result<(), ArithmeticError> {
122 self.outgoing_messages.push(message);
123 Ok(())
124 }
125
126 pub fn add_outgoing_messages(
127 &mut self,
128 messages: impl IntoIterator<Item = OutgoingMessage>,
129 ) -> Result<(), ArithmeticError> {
130 for message in messages {
131 self.add_outgoing_message(message)?;
132 }
133 Ok(())
134 }
135
136 pub fn add_event(&mut self, stream_id: StreamId, index: u32, value: Vec<u8>) {
137 self.events.push(Event {
138 stream_id,
139 index,
140 value,
141 });
142 }
143
144 pub fn add_created_blob(&mut self, blob: Blob) {
145 self.blobs.insert(blob.id(), blob);
146 }
147
148 pub fn add_published_blob(&mut self, blob_id: BlobId) {
149 self.blobs_published.insert(blob_id);
150 }
151
152 pub fn created_blobs(&self) -> &BTreeMap<BlobId, Blob> {
153 &self.blobs
154 }
155
156 pub fn add_oracle_response(&mut self, oracle_response: OracleResponse) {
157 self.oracle_responses.push(oracle_response);
158 }
159
160 pub fn add_operation_result(&mut self, result: Option<Vec<u8>>) {
161 self.operation_result = result
162 }
163
164 pub fn add_stream_to_process(
165 &mut self,
166 application_id: ApplicationId,
167 chain_id: ChainId,
168 stream_id: StreamId,
169 previous_index: u32,
170 next_index: u32,
171 ) {
172 if next_index == previous_index {
173 return; }
175 self.streams_to_process
176 .entry(application_id)
177 .or_default()
178 .entry((chain_id, stream_id))
179 .and_modify(|(pi, ni)| {
180 *pi = (*pi).min(previous_index);
181 *ni = (*ni).max(next_index);
182 })
183 .or_insert_with(|| (previous_index, next_index));
184 }
185
186 pub fn remove_stream_to_process(
187 &mut self,
188 application_id: ApplicationId,
189 chain_id: ChainId,
190 stream_id: StreamId,
191 ) {
192 let Some(streams) = self.streams_to_process.get_mut(&application_id) else {
193 return;
194 };
195 if streams.remove(&(chain_id, stream_id)).is_some() && streams.is_empty() {
196 self.streams_to_process.remove(&application_id);
197 }
198 }
199
200 pub fn take_streams_to_process(&mut self) -> BTreeMap<ApplicationId, Vec<StreamUpdate>> {
201 mem::take(&mut self.streams_to_process)
202 .into_iter()
203 .map(|(app_id, streams)| {
204 let updates = streams
205 .into_iter()
206 .map(
207 |((chain_id, stream_id), (previous_index, next_index))| StreamUpdate {
208 chain_id,
209 stream_id,
210 previous_index,
211 next_index,
212 },
213 )
214 .collect();
215 (app_id, updates)
216 })
217 .collect()
218 }
219
220 pub fn replay_oracle_response(
223 &mut self,
224 oracle_response: OracleResponse,
225 ) -> Result<bool, ExecutionError> {
226 let replaying = if let Some(recorded_response) = self.next_replayed_oracle_response()? {
227 ensure!(
228 recorded_response == oracle_response,
229 ExecutionError::OracleResponseMismatch
230 );
231 true
232 } else {
233 false
234 };
235 self.add_oracle_response(oracle_response);
236 Ok(replaying)
237 }
238
239 pub fn next_replayed_oracle_response(
247 &mut self,
248 ) -> Result<Option<OracleResponse>, ExecutionError> {
249 let Some(responses) = &mut self.replaying_oracle_responses else {
250 return Ok(None); };
252 let response = responses
253 .next()
254 .ok_or_else(|| ExecutionError::MissingOracleResponse)?;
255 Ok(Some(response))
256 }
257
258 pub fn into_outcome(self) -> Result<TransactionOutcome, ExecutionError> {
259 let TransactionTracker {
260 replaying_oracle_responses,
261 oracle_responses,
262 outgoing_messages,
263 local_time: _,
264 transaction_index: _,
265 next_application_index,
266 next_chain_index,
267 events,
268 blobs,
269 operation_result,
270 streams_to_process,
271 blobs_published,
272 } = self;
273 ensure!(
274 streams_to_process.is_empty(),
275 ExecutionError::UnprocessedStreams
276 );
277 if let Some(mut responses) = replaying_oracle_responses {
278 ensure!(
279 responses.next().is_none(),
280 ExecutionError::UnexpectedOracleResponse
281 );
282 }
283 Ok(TransactionOutcome {
284 outgoing_messages,
285 oracle_responses,
286 next_application_index,
287 next_chain_index,
288 events,
289 blobs: blobs.into_values().collect(),
290 operation_result: operation_result.unwrap_or_default(),
291 blobs_published,
292 })
293 }
294}
295
296#[cfg(with_testing)]
297impl TransactionTracker {
298 pub fn new_replaying(oracle_responses: Vec<OracleResponse>) -> Self {
301 TransactionTracker::new(Timestamp::from(0), 0, 0, 0, Some(oracle_responses))
302 }
303
304 pub fn new_replaying_blobs<T>(blob_ids: T) -> Self
307 where
308 T: IntoIterator,
309 T::Item: std::borrow::Borrow<BlobId>,
310 {
311 use std::borrow::Borrow;
312
313 let oracle_responses = blob_ids
314 .into_iter()
315 .map(|blob_id| OracleResponse::Blob(*blob_id.borrow()))
316 .collect();
317 TransactionTracker::new_replaying(oracle_responses)
318 }
319}