1use std::{
5 collections::BTreeMap,
6 ops::{Bound, Range, RangeBounds},
7};
8
9use allocative::Allocative;
10use linera_base::data_types::ArithmeticError;
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use serde::{de::DeserializeOwned, Serialize};
14
15use crate::{
16 batch::Batch,
17 common::{from_bytes_option_or_default, HasherOutput},
18 context::Context,
19 hashable_wrapper::WrappedHashableContainerView,
20 historical_hash_wrapper::HistoricallyHashableView,
21 store::ReadableKeyValueStore as _,
22 views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
23};
24
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Label-less histogram registered as `log_view_hash_runtime`.
    ///
    /// It is registered lazily on first access; `LogView::hash` measures its latency with it.
    pub static LOG_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        let name = "log_view_hash_runtime";
        let description = "LogView hash runtime";
        let buckets = exponential_bucket_latencies(5.0);
        register_histogram_vec(name, description, &[], buckets)
    });
}
42
43#[repr(u8)]
45enum KeyTag {
46 Count = MIN_VIEW_TAG,
48 Index,
50}
51
/// An append-only log of values of type `T`, with entries keyed by their index.
///
/// Changes (`push`, `clear`) are buffered in memory and only reach the batch in `pre_save`.
#[derive(Debug, Allocative)]
#[allocative(bound = "C, T: Allocative")]
pub struct LogView<C, T> {
    /// The context providing the base key, the store and the extra data.
    #[allocative(skip)]
    context: C,
    /// Set by `clear`: the stored entries are to be deleted on the next save and are
    /// ignored by reads until then. Reset by `rollback` and `post_save`.
    delete_storage_first: bool,
    /// The number of entries recorded in storage, as loaded by `post_load` and
    /// updated by `post_save`.
    stored_count: u32,
    /// The values pushed since the last load/save, in insertion order.
    new_values: Vec<T>,
}
66
67impl<C, T> View for LogView<C, T>
68where
69 C: Context,
70 T: Send + Sync + Serialize,
71{
72 const NUM_INIT_KEYS: usize = 1;
73
74 type Context = C;
75
76 fn context(&self) -> C {
77 self.context.clone()
78 }
79
80 fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
81 Ok(vec![context.base_key().base_tag(KeyTag::Count as u8)])
82 }
83
84 fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
85 let stored_count =
86 from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
87 Ok(Self {
88 context,
89 delete_storage_first: false,
90 stored_count,
91 new_values: Vec::new(),
92 })
93 }
94
95 fn rollback(&mut self) {
96 self.delete_storage_first = false;
97 self.new_values.clear();
98 }
99
100 async fn has_pending_changes(&self) -> bool {
101 if self.delete_storage_first {
102 return true;
103 }
104 !self.new_values.is_empty()
105 }
106
107 fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
108 let mut delete_view = false;
109 if self.delete_storage_first {
110 batch.delete_key_prefix(self.context.base_key().bytes.clone());
111 delete_view = true;
112 }
113 if !self.new_values.is_empty() {
114 delete_view = false;
115 let new_values_len =
116 u32::try_from(self.new_values.len()).map_err(|_| ArithmeticError::Overflow)?;
117 let new_count = self
118 .stored_count
119 .checked_add(new_values_len)
120 .ok_or(ArithmeticError::Overflow)?;
121 for (index, value) in (self.stored_count..).zip(&self.new_values) {
122 let key = self
123 .context
124 .base_key()
125 .derive_tag_key(KeyTag::Index as u8, &index)?;
126 batch.put_key_value(key, value)?;
127 }
128 let key = self.context.base_key().base_tag(KeyTag::Count as u8);
129 batch.put_key_value(key, &new_count)?;
130 }
131 Ok(delete_view)
132 }
133
134 fn post_save(&mut self) {
135 if self.delete_storage_first {
136 self.stored_count = 0;
137 }
138 self.stored_count += u32::try_from(self.new_values.len()).expect("verified in pre_save");
139 self.new_values.clear();
140 self.delete_storage_first = false;
141 }
142
143 fn clear(&mut self) {
144 self.delete_storage_first = true;
145 self.new_values.clear();
146 }
147}
148
149impl<C, T> ClonableView for LogView<C, T>
150where
151 C: Context,
152 T: Clone + Send + Sync + Serialize,
153{
154 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
155 Ok(LogView {
156 context: self.context.clone(),
157 delete_storage_first: self.delete_storage_first,
158 stored_count: self.stored_count,
159 new_values: self.new_values.clone(),
160 })
161 }
162}
163
164impl<C, T> LogView<C, T>
165where
166 C: Context,
167{
168 pub fn push(&mut self, value: T) {
180 self.new_values.push(value);
181 }
182
183 pub fn count(&self) -> usize {
197 if self.delete_storage_first {
198 self.new_values.len()
199 } else {
200 self.stored_count as usize + self.new_values.len()
201 }
202 }
203
204 pub fn extra(&self) -> &C::Extra {
206 self.context.extra()
207 }
208}
209
210impl<C, T> LogView<C, T>
211where
212 C: Context,
213 T: Clone + DeserializeOwned + Serialize + Send + Sync,
214{
215 pub async fn get(&self, index: usize) -> Result<Option<T>, ViewError> {
228 let stored_count = self.stored_count as usize;
229 let value = if self.delete_storage_first {
230 self.new_values.get(index).cloned()
231 } else if index < stored_count {
232 let index = u32::try_from(index).map_err(|_| ArithmeticError::Overflow)?;
233 let key = self
234 .context
235 .base_key()
236 .derive_tag_key(KeyTag::Index as u8, &index)?;
237 self.context.store().read_value(&key).await?
238 } else {
239 self.new_values.get(index - stored_count).cloned()
240 };
241 Ok(value)
242 }
243
244 pub async fn multi_get(&self, indices: Vec<usize>) -> Result<Vec<Option<T>>, ViewError> {
261 let mut result = Vec::new();
262 if self.delete_storage_first {
263 for index in indices {
264 result.push(self.new_values.get(index).cloned());
265 }
266 } else {
267 let stored_count = self.stored_count as usize;
268 let mut index_to_positions = BTreeMap::<usize, Vec<usize>>::new();
269 for (pos, index) in indices.into_iter().enumerate() {
270 if index < stored_count {
271 index_to_positions.entry(index).or_default().push(pos);
272 result.push(None);
273 } else {
274 result.push(self.new_values.get(index - stored_count).cloned());
275 }
276 }
277 let mut keys = Vec::new();
278 let mut vec_positions = Vec::new();
279 for (index, positions) in index_to_positions {
280 let index = u32::try_from(index).map_err(|_| ArithmeticError::Overflow)?;
281 let key = self
282 .context
283 .base_key()
284 .derive_tag_key(KeyTag::Index as u8, &index)?;
285 keys.push(key);
286 vec_positions.push(positions);
287 }
288 let values = self.context.store().read_multi_values(&keys).await?;
289 for (positions, value) in vec_positions.into_iter().zip(values) {
290 if let Some((&last, rest)) = positions.split_last() {
291 for &position in rest {
292 *result.get_mut(position).unwrap() = value.clone();
293 }
294 *result.get_mut(last).unwrap() = value;
295 }
296 }
297 }
298 Ok(result)
299 }
300
301 pub async fn multi_get_pairs(
318 &self,
319 indices: Vec<usize>,
320 ) -> Result<Vec<(usize, Option<T>)>, ViewError> {
321 let values = self.multi_get(indices.clone()).await?;
322 Ok(indices.into_iter().zip(values).collect())
323 }
324
325 async fn read_context(&self, range: Range<usize>) -> Result<Vec<T>, ViewError> {
326 let count = range.len();
327 let mut keys = Vec::with_capacity(count);
328 for index in range {
329 let index = u32::try_from(index).map_err(|_| ArithmeticError::Overflow)?;
330 let key = self
331 .context
332 .base_key()
333 .derive_tag_key(KeyTag::Index as u8, &index)?;
334 keys.push(key);
335 }
336 let mut values = Vec::with_capacity(count);
337 for entry in self.context.store().read_multi_values(&keys).await? {
338 match entry {
339 None => {
340 return Err(ViewError::MissingEntries("LogView".into()));
341 }
342 Some(value) => values.push(value),
343 }
344 }
345 Ok(values)
346 }
347
348 pub async fn read<R>(&self, range: R) -> Result<Vec<T>, ViewError>
363 where
364 R: RangeBounds<usize>,
365 {
366 let effective_stored_count = if self.delete_storage_first {
367 0
368 } else {
369 self.stored_count as usize
370 };
371 let end = match range.end_bound() {
372 Bound::Included(end) => *end + 1,
373 Bound::Excluded(end) => *end,
374 Bound::Unbounded => self.count(),
375 }
376 .min(self.count());
377 let start = match range.start_bound() {
378 Bound::Included(start) => *start,
379 Bound::Excluded(start) => *start + 1,
380 Bound::Unbounded => 0,
381 };
382 if start >= end {
383 return Ok(Vec::new());
384 }
385 if start < effective_stored_count {
386 if end <= effective_stored_count {
387 self.read_context(start..end).await
388 } else {
389 let mut values = self.read_context(start..effective_stored_count).await?;
390 values.extend(
391 self.new_values[0..(end - effective_stored_count)]
392 .iter()
393 .cloned(),
394 );
395 Ok(values)
396 }
397 } else {
398 Ok(
399 self.new_values[(start - effective_stored_count)..(end - effective_stored_count)]
400 .to_vec(),
401 )
402 }
403 }
404}
405
406impl<C, T> HashableView for LogView<C, T>
407where
408 C: Context,
409 T: Send + Sync + Clone + Serialize + DeserializeOwned,
410{
411 type Hasher = sha3::Sha3_256;
412
413 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
414 self.hash().await
415 }
416
417 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
418 #[cfg(with_metrics)]
419 let _hash_latency = metrics::LOG_VIEW_HASH_RUNTIME.measure_latency();
420 let elements = self.read(..).await?;
421 let mut hasher = sha3::Sha3_256::default();
422 hasher.update_with_bcs_bytes(&elements)?;
423 Ok(hasher.finalize())
424 }
425}
426
/// A [`LogView`] wrapped in a [`WrappedHashableContainerView`], with [`HasherOutput`] as
/// the wrapper's third type argument.
// NOTE(review): presumably the wrapper caches the hash of the inner view — confirm in
// `hashable_wrapper`.
pub type HashedLogView<C, T> = WrappedHashableContainerView<C, LogView<C, T>, HasherOutput>;
429
/// A [`LogView`] wrapped in a [`HistoricallyHashableView`].
// NOTE(review): the hashing semantics are defined by `historical_hash_wrapper` — confirm there.
pub type HistoricallyHashedLogView<C, T> = HistoricallyHashableView<C, LogView<C, T>>;
432
433#[cfg(not(web))]
434mod graphql {
435 use std::borrow::Cow;
436
437 use linera_base::data_types::ArithmeticError;
438
439 use super::LogView;
440 use crate::{
441 context::Context,
442 graphql::{hash_name, mangle},
443 };
444
445 impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for LogView<C, T> {
446 fn type_name() -> Cow<'static, str> {
447 format!(
448 "LogView_{}_{:08x}",
449 mangle(T::type_name()),
450 hash_name::<T>()
451 )
452 .into()
453 }
454 }
455
456 #[async_graphql::Object(cache_control(no_cache), name_type)]
457 impl<C: Context, T: async_graphql::OutputType> LogView<C, T>
458 where
459 T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
460 {
461 #[graphql(derived(name = "count"))]
462 async fn count_(&self) -> Result<u32, async_graphql::Error> {
463 Ok(u32::try_from(self.count()).map_err(|_| ArithmeticError::Overflow)?)
464 }
465
466 async fn entries(
467 &self,
468 start: Option<usize>,
469 end: Option<usize>,
470 ) -> async_graphql::Result<Vec<T>> {
471 Ok(self
472 .read(start.unwrap_or_default()..end.unwrap_or_else(|| self.count()))
473 .await?)
474 }
475 }
476}