1use std::{
5 collections::BTreeMap,
6 ops::{Bound, Range, RangeBounds},
7};
8
9#[cfg(with_metrics)]
10use linera_base::prometheus_util::MeasureLatency as _;
11use serde::{de::DeserializeOwned, Serialize};
12
13use crate::{
14 batch::Batch,
15 common::{from_bytes_option_or_default, HasherOutput},
16 context::Context,
17 hashable_wrapper::WrappedHashableContainerView,
18 historical_hash_wrapper::HistoricallyHashableView,
19 store::ReadableKeyValueStore as _,
20 views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
21};
22
23#[cfg(with_metrics)]
24mod metrics {
25 use std::sync::LazyLock;
26
27 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
28 use prometheus::HistogramVec;
29
30 pub static LOG_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
32 register_histogram_vec(
33 "log_view_hash_runtime",
34 "LogView hash runtime",
35 &[],
36 exponential_bucket_latencies(5.0),
37 )
38 });
39}
40
41#[repr(u8)]
43enum KeyTag {
44 Count = MIN_VIEW_TAG,
46 Index,
48}
49
50#[derive(Debug)]
52pub struct LogView<C, T> {
53 context: C,
54 delete_storage_first: bool,
55 stored_count: usize,
56 new_values: Vec<T>,
57}
58
59impl<C, T> View for LogView<C, T>
60where
61 C: Context,
62 T: Send + Sync + Serialize,
63{
64 const NUM_INIT_KEYS: usize = 1;
65
66 type Context = C;
67
68 fn context(&self) -> &C {
69 &self.context
70 }
71
72 fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
73 Ok(vec![context.base_key().base_tag(KeyTag::Count as u8)])
74 }
75
76 fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
77 let stored_count =
78 from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
79 Ok(Self {
80 context,
81 delete_storage_first: false,
82 stored_count,
83 new_values: Vec::new(),
84 })
85 }
86
87 fn rollback(&mut self) {
88 self.delete_storage_first = false;
89 self.new_values.clear();
90 }
91
92 async fn has_pending_changes(&self) -> bool {
93 if self.delete_storage_first {
94 return true;
95 }
96 !self.new_values.is_empty()
97 }
98
99 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
100 let mut delete_view = false;
101 if self.delete_storage_first {
102 batch.delete_key_prefix(self.context.base_key().bytes.clone());
103 self.stored_count = 0;
104 delete_view = true;
105 }
106 if !self.new_values.is_empty() {
107 delete_view = false;
108 for value in &self.new_values {
109 let key = self
110 .context
111 .base_key()
112 .derive_tag_key(KeyTag::Index as u8, &self.stored_count)?;
113 batch.put_key_value(key, value)?;
114 self.stored_count += 1;
115 }
116 let key = self.context.base_key().base_tag(KeyTag::Count as u8);
117 batch.put_key_value(key, &self.stored_count)?;
118 self.new_values.clear();
119 }
120 self.delete_storage_first = false;
121 Ok(delete_view)
122 }
123
124 fn clear(&mut self) {
125 self.delete_storage_first = true;
126 self.new_values.clear();
127 }
128}
129
130impl<C, T> ClonableView for LogView<C, T>
131where
132 C: Context,
133 T: Clone + Send + Sync + Serialize,
134{
135 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
136 Ok(LogView {
137 context: self.context.clone(),
138 delete_storage_first: self.delete_storage_first,
139 stored_count: self.stored_count,
140 new_values: self.new_values.clone(),
141 })
142 }
143}
144
145impl<C, T> LogView<C, T>
146where
147 C: Context,
148{
149 pub fn push(&mut self, value: T) {
161 self.new_values.push(value);
162 }
163
164 pub fn count(&self) -> usize {
178 if self.delete_storage_first {
179 self.new_values.len()
180 } else {
181 self.stored_count + self.new_values.len()
182 }
183 }
184
185 pub fn extra(&self) -> &C::Extra {
187 self.context.extra()
188 }
189}
190
191impl<C, T> LogView<C, T>
192where
193 C: Context,
194 T: Clone + DeserializeOwned + Serialize + Send + Sync,
195{
196 pub async fn get(&self, index: usize) -> Result<Option<T>, ViewError> {
209 let value = if self.delete_storage_first {
210 self.new_values.get(index).cloned()
211 } else if index < self.stored_count {
212 let key = self
213 .context
214 .base_key()
215 .derive_tag_key(KeyTag::Index as u8, &index)?;
216 self.context.store().read_value(&key).await?
217 } else {
218 self.new_values.get(index - self.stored_count).cloned()
219 };
220 Ok(value)
221 }
222
223 pub async fn multi_get(&self, indices: Vec<usize>) -> Result<Vec<Option<T>>, ViewError> {
240 let mut result = Vec::new();
241 if self.delete_storage_first {
242 for index in indices {
243 result.push(self.new_values.get(index).cloned());
244 }
245 } else {
246 let mut index_to_positions = BTreeMap::<usize, Vec<usize>>::new();
247 for (pos, index) in indices.into_iter().enumerate() {
248 if index < self.stored_count {
249 index_to_positions.entry(index).or_default().push(pos);
250 result.push(None);
251 } else {
252 result.push(self.new_values.get(index - self.stored_count).cloned());
253 }
254 }
255 let mut keys = Vec::new();
256 let mut vec_positions = Vec::new();
257 for (index, positions) in index_to_positions {
258 let key = self
259 .context
260 .base_key()
261 .derive_tag_key(KeyTag::Index as u8, &index)?;
262 keys.push(key);
263 vec_positions.push(positions);
264 }
265 let values = self.context.store().read_multi_values(keys).await?;
266 for (positions, value) in vec_positions.into_iter().zip(values) {
267 if let Some((&last, rest)) = positions.split_last() {
268 for &position in rest {
269 *result.get_mut(position).unwrap() = value.clone();
270 }
271 *result.get_mut(last).unwrap() = value;
272 }
273 }
274 }
275 Ok(result)
276 }
277
278 pub async fn multi_get_pairs(
295 &self,
296 indices: Vec<usize>,
297 ) -> Result<Vec<(usize, Option<T>)>, ViewError> {
298 let values = self.multi_get(indices.clone()).await?;
299 Ok(indices.into_iter().zip(values).collect())
300 }
301
302 async fn read_context(&self, range: Range<usize>) -> Result<Vec<T>, ViewError> {
303 let count = range.len();
304 let mut keys = Vec::with_capacity(count);
305 for index in range {
306 let key = self
307 .context
308 .base_key()
309 .derive_tag_key(KeyTag::Index as u8, &index)?;
310 keys.push(key);
311 }
312 let mut values = Vec::with_capacity(count);
313 for entry in self.context.store().read_multi_values(keys).await? {
314 match entry {
315 None => {
316 let root_key = self.context.store().root_key()?;
317 return Err(ViewError::MissingEntries(root_key));
318 }
319 Some(value) => values.push(value),
320 }
321 }
322 Ok(values)
323 }
324
325 pub async fn read<R>(&self, range: R) -> Result<Vec<T>, ViewError>
340 where
341 R: RangeBounds<usize>,
342 {
343 let effective_stored_count = if self.delete_storage_first {
344 0
345 } else {
346 self.stored_count
347 };
348 let end = match range.end_bound() {
349 Bound::Included(end) => *end + 1,
350 Bound::Excluded(end) => *end,
351 Bound::Unbounded => self.count(),
352 }
353 .min(self.count());
354 let start = match range.start_bound() {
355 Bound::Included(start) => *start,
356 Bound::Excluded(start) => *start + 1,
357 Bound::Unbounded => 0,
358 };
359 if start >= end {
360 return Ok(Vec::new());
361 }
362 if start < effective_stored_count {
363 if end <= effective_stored_count {
364 self.read_context(start..end).await
365 } else {
366 let mut values = self.read_context(start..effective_stored_count).await?;
367 values.extend(
368 self.new_values[0..(end - effective_stored_count)]
369 .iter()
370 .cloned(),
371 );
372 Ok(values)
373 }
374 } else {
375 Ok(
376 self.new_values[(start - effective_stored_count)..(end - effective_stored_count)]
377 .to_vec(),
378 )
379 }
380 }
381}
382
383impl<C, T> HashableView for LogView<C, T>
384where
385 C: Context,
386 T: Send + Sync + Clone + Serialize + DeserializeOwned,
387{
388 type Hasher = sha3::Sha3_256;
389
390 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
391 self.hash().await
392 }
393
394 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
395 #[cfg(with_metrics)]
396 let _hash_latency = metrics::LOG_VIEW_HASH_RUNTIME.measure_latency();
397 let elements = self.read(..).await?;
398 let mut hasher = sha3::Sha3_256::default();
399 hasher.update_with_bcs_bytes(&elements)?;
400 Ok(hasher.finalize())
401 }
402}
403
404pub type HashedLogView<C, T> = WrappedHashableContainerView<C, LogView<C, T>, HasherOutput>;
406
407pub type HistoricallyHashedLogView<C, T> = HistoricallyHashableView<C, LogView<C, T>>;
409
410#[cfg(not(web))]
411mod graphql {
412 use std::borrow::Cow;
413
414 use super::LogView;
415 use crate::{
416 context::Context,
417 graphql::{hash_name, mangle},
418 };
419
420 impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for LogView<C, T> {
421 fn type_name() -> Cow<'static, str> {
422 format!(
423 "LogView_{}_{:08x}",
424 mangle(T::type_name()),
425 hash_name::<T>()
426 )
427 .into()
428 }
429 }
430
431 #[async_graphql::Object(cache_control(no_cache), name_type)]
432 impl<C: Context, T: async_graphql::OutputType> LogView<C, T>
433 where
434 T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
435 {
436 #[graphql(derived(name = "count"))]
437 async fn count_(&self) -> Result<u32, async_graphql::Error> {
438 Ok(self.count() as u32)
439 }
440
441 async fn entries(
442 &self,
443 start: Option<usize>,
444 end: Option<usize>,
445 ) -> async_graphql::Result<Vec<T>> {
446 Ok(self
447 .read(start.unwrap_or_default()..end.unwrap_or_else(|| self.count()))
448 .await?)
449 }
450 }
451}