1use std::{
5 collections::BTreeMap,
6 ops::{Bound, Range, RangeBounds},
7};
8
9use allocative::Allocative;
10use linera_base::data_types::ArithmeticError;
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use serde::{de::DeserializeOwned, Serialize};
14
15use crate::{
16 batch::Batch,
17 common::{from_bytes_option_or_default, HasherOutput},
18 context::Context,
19 hashable_wrapper::WrappedHashableContainerView,
20 historical_hash_wrapper::HistoricallyHashableView,
21 store::ReadableKeyValueStore as _,
22 views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
23};
24
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util;
    use prometheus::HistogramVec;

    /// Histogram registered as `log_view_hash_runtime`; it is created lazily on first
    /// access and has no label dimensions.
    pub static LOG_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = prometheus_util::exponential_bucket_latencies(5.0);
        prometheus_util::register_histogram_vec(
            "log_view_hash_runtime",
            "LogView hash runtime",
            &[],
            buckets,
        )
    });
}
42
/// Tags appended to the base key to build the storage keys used by a `LogView`.
#[repr(u8)]
enum KeyTag {
    /// Tag of the single key under which the number of stored entries is saved.
    Count = MIN_VIEW_TAG,
    /// Tag prefixing the per-entry keys; the entry index is serialized after it.
    Index,
}
51
/// An append-only log of values of type `T`, backed by a key-value store reached
/// through the context `C`.
#[derive(Debug, Allocative)]
#[allocative(bound = "C, T: Allocative")]
pub struct LogView<C, T> {
    /// The context used to derive keys and to access the store.
    #[allocative(skip)]
    context: C,
    /// Set by `clear`: when true, saving first deletes everything under the base key
    /// and previously stored entries are ignored by readers.
    delete_storage_first: bool,
    /// The number of entries recorded in storage, as loaded or last saved.
    stored_count: u32,
    /// Values pushed since the last save, in insertion order.
    new_values: Vec<T>,
}
66
67impl<C, T> View for LogView<C, T>
68where
69 C: Context,
70 T: Send + Sync + Serialize,
71{
72 const NUM_INIT_KEYS: usize = 1;
73
74 type Context = C;
75
76 fn context(&self) -> C {
77 self.context.clone()
78 }
79
80 fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
81 Ok(vec![context.base_key().base_tag(KeyTag::Count as u8)])
82 }
83
84 fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
85 let stored_count =
86 from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
87 Ok(Self {
88 context,
89 delete_storage_first: false,
90 stored_count,
91 new_values: Vec::new(),
92 })
93 }
94
95 fn rollback(&mut self) {
96 self.delete_storage_first = false;
97 self.new_values.clear();
98 }
99
100 async fn has_pending_changes(&self) -> bool {
101 if self.delete_storage_first {
102 return true;
103 }
104 !self.new_values.is_empty()
105 }
106
107 fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
108 let mut delete_view = false;
109 if self.delete_storage_first {
110 batch.delete_key_prefix(self.context.base_key().bytes.clone());
111 delete_view = true;
112 }
113 if !self.new_values.is_empty() {
114 delete_view = false;
115 let new_values_len =
116 u32::try_from(self.new_values.len()).map_err(|_| ArithmeticError::Overflow)?;
117 let new_count = self
118 .stored_count
119 .checked_add(new_values_len)
120 .ok_or(ArithmeticError::Overflow)?;
121 let mut index = self.stored_count;
122 for value in &self.new_values {
123 let key = self
124 .context
125 .base_key()
126 .derive_tag_key(KeyTag::Index as u8, &index)?;
127 batch.put_key_value(key, value)?;
128 index += 1;
129 }
130 let key = self.context.base_key().base_tag(KeyTag::Count as u8);
131 batch.put_key_value(key, &new_count)?;
132 }
133 Ok(delete_view)
134 }
135
136 fn post_save(&mut self) {
137 if self.delete_storage_first {
138 self.stored_count = 0;
139 }
140 self.stored_count += u32::try_from(self.new_values.len()).expect("verified in pre_save");
141 self.new_values.clear();
142 self.delete_storage_first = false;
143 }
144
145 fn clear(&mut self) {
146 self.delete_storage_first = true;
147 self.new_values.clear();
148 }
149}
150
151impl<C, T> ClonableView for LogView<C, T>
152where
153 C: Context,
154 T: Clone + Send + Sync + Serialize,
155{
156 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
157 Ok(LogView {
158 context: self.context.clone(),
159 delete_storage_first: self.delete_storage_first,
160 stored_count: self.stored_count,
161 new_values: self.new_values.clone(),
162 })
163 }
164}
165
166impl<C, T> LogView<C, T>
167where
168 C: Context,
169{
170 pub fn push(&mut self, value: T) {
182 self.new_values.push(value);
183 }
184
185 pub fn count(&self) -> usize {
199 if self.delete_storage_first {
200 self.new_values.len()
201 } else {
202 self.stored_count as usize + self.new_values.len()
203 }
204 }
205
206 pub fn extra(&self) -> &C::Extra {
208 self.context.extra()
209 }
210}
211
212impl<C, T> LogView<C, T>
213where
214 C: Context,
215 T: Clone + DeserializeOwned + Serialize + Send + Sync,
216{
217 pub async fn get(&self, index: usize) -> Result<Option<T>, ViewError> {
230 let stored_count = self.stored_count as usize;
231 let value = if self.delete_storage_first {
232 self.new_values.get(index).cloned()
233 } else if index < stored_count {
234 let index = u32::try_from(index).map_err(|_| ArithmeticError::Overflow)?;
235 let key = self
236 .context
237 .base_key()
238 .derive_tag_key(KeyTag::Index as u8, &index)?;
239 self.context.store().read_value(&key).await?
240 } else {
241 self.new_values.get(index - stored_count).cloned()
242 };
243 Ok(value)
244 }
245
246 pub async fn multi_get(&self, indices: Vec<usize>) -> Result<Vec<Option<T>>, ViewError> {
263 let mut result = Vec::new();
264 if self.delete_storage_first {
265 for index in indices {
266 result.push(self.new_values.get(index).cloned());
267 }
268 } else {
269 let stored_count = self.stored_count as usize;
270 let mut index_to_positions = BTreeMap::<usize, Vec<usize>>::new();
271 for (pos, index) in indices.into_iter().enumerate() {
272 if index < stored_count {
273 index_to_positions.entry(index).or_default().push(pos);
274 result.push(None);
275 } else {
276 result.push(self.new_values.get(index - stored_count).cloned());
277 }
278 }
279 let mut keys = Vec::new();
280 let mut vec_positions = Vec::new();
281 for (index, positions) in index_to_positions {
282 let index = u32::try_from(index).map_err(|_| ArithmeticError::Overflow)?;
283 let key = self
284 .context
285 .base_key()
286 .derive_tag_key(KeyTag::Index as u8, &index)?;
287 keys.push(key);
288 vec_positions.push(positions);
289 }
290 let values = self.context.store().read_multi_values(&keys).await?;
291 for (positions, value) in vec_positions.into_iter().zip(values) {
292 if let Some((&last, rest)) = positions.split_last() {
293 for &position in rest {
294 *result.get_mut(position).unwrap() = value.clone();
295 }
296 *result.get_mut(last).unwrap() = value;
297 }
298 }
299 }
300 Ok(result)
301 }
302
303 pub async fn multi_get_pairs(
320 &self,
321 indices: Vec<usize>,
322 ) -> Result<Vec<(usize, Option<T>)>, ViewError> {
323 let values = self.multi_get(indices.clone()).await?;
324 Ok(indices.into_iter().zip(values).collect())
325 }
326
327 async fn read_context(&self, range: Range<usize>) -> Result<Vec<T>, ViewError> {
328 let count = range.len();
329 let mut keys = Vec::with_capacity(count);
330 for index in range {
331 let index = u32::try_from(index).map_err(|_| ArithmeticError::Overflow)?;
332 let key = self
333 .context
334 .base_key()
335 .derive_tag_key(KeyTag::Index as u8, &index)?;
336 keys.push(key);
337 }
338 let mut values = Vec::with_capacity(count);
339 for entry in self.context.store().read_multi_values(&keys).await? {
340 match entry {
341 None => {
342 return Err(ViewError::MissingEntries("LogView".into()));
343 }
344 Some(value) => values.push(value),
345 }
346 }
347 Ok(values)
348 }
349
350 pub async fn read<R>(&self, range: R) -> Result<Vec<T>, ViewError>
365 where
366 R: RangeBounds<usize>,
367 {
368 let effective_stored_count = if self.delete_storage_first {
369 0
370 } else {
371 self.stored_count as usize
372 };
373 let end = match range.end_bound() {
374 Bound::Included(end) => *end + 1,
375 Bound::Excluded(end) => *end,
376 Bound::Unbounded => self.count(),
377 }
378 .min(self.count());
379 let start = match range.start_bound() {
380 Bound::Included(start) => *start,
381 Bound::Excluded(start) => *start + 1,
382 Bound::Unbounded => 0,
383 };
384 if start >= end {
385 return Ok(Vec::new());
386 }
387 if start < effective_stored_count {
388 if end <= effective_stored_count {
389 self.read_context(start..end).await
390 } else {
391 let mut values = self.read_context(start..effective_stored_count).await?;
392 values.extend(
393 self.new_values[0..(end - effective_stored_count)]
394 .iter()
395 .cloned(),
396 );
397 Ok(values)
398 }
399 } else {
400 Ok(
401 self.new_values[(start - effective_stored_count)..(end - effective_stored_count)]
402 .to_vec(),
403 )
404 }
405 }
406}
407
408impl<C, T> HashableView for LogView<C, T>
409where
410 C: Context,
411 T: Send + Sync + Clone + Serialize + DeserializeOwned,
412{
413 type Hasher = sha3::Sha3_256;
414
415 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
416 self.hash().await
417 }
418
419 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
420 #[cfg(with_metrics)]
421 let _hash_latency = metrics::LOG_VIEW_HASH_RUNTIME.measure_latency();
422 let elements = self.read(..).await?;
423 let mut hasher = sha3::Sha3_256::default();
424 hasher.update_with_bcs_bytes(&elements)?;
425 Ok(hasher.finalize())
426 }
427}
428
/// Type alias for a [`LogView`] wrapped in a [`WrappedHashableContainerView`], with
/// [`HasherOutput`] as the wrapper's third type parameter.
pub type HashedLogView<C, T> = WrappedHashableContainerView<C, LogView<C, T>, HasherOutput>;
431
/// Type alias for a [`LogView`] wrapped in a [`HistoricallyHashableView`].
pub type HistoricallyHashedLogView<C, T> = HistoricallyHashableView<C, LogView<C, T>>;
434
435#[cfg(not(web))]
436mod graphql {
437 use std::borrow::Cow;
438
439 use linera_base::data_types::ArithmeticError;
440
441 use super::LogView;
442 use crate::{
443 context::Context,
444 graphql::{hash_name, mangle},
445 };
446
447 impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for LogView<C, T> {
448 fn type_name() -> Cow<'static, str> {
449 format!(
450 "LogView_{}_{:08x}",
451 mangle(T::type_name()),
452 hash_name::<T>()
453 )
454 .into()
455 }
456 }
457
458 #[async_graphql::Object(cache_control(no_cache), name_type)]
459 impl<C: Context, T: async_graphql::OutputType> LogView<C, T>
460 where
461 T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
462 {
463 #[graphql(derived(name = "count"))]
464 async fn count_(&self) -> Result<u32, async_graphql::Error> {
465 Ok(u32::try_from(self.count()).map_err(|_| ArithmeticError::Overflow)?)
466 }
467
468 async fn entries(
469 &self,
470 start: Option<usize>,
471 end: Option<usize>,
472 ) -> async_graphql::Result<Vec<T>> {
473 Ok(self
474 .read(start.unwrap_or_default()..end.unwrap_or_else(|| self.count()))
475 .await?)
476 }
477 }
478}