1use std::{
5 collections::BTreeMap,
6 ops::{Bound, Range, RangeBounds},
7};
8
9use allocative::Allocative;
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use serde::{de::DeserializeOwned, Serialize};
13
14use crate::{
15 batch::Batch,
16 common::{from_bytes_option_or_default, HasherOutput},
17 context::Context,
18 hashable_wrapper::WrappedHashableContainerView,
19 historical_hash_wrapper::HistoricallyHashableView,
20 store::ReadableKeyValueStore as _,
21 views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
22};
23
24#[cfg(with_metrics)]
25mod metrics {
26 use std::sync::LazyLock;
27
28 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
29 use prometheus::HistogramVec;
30
31 pub static LOG_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
33 register_histogram_vec(
34 "log_view_hash_runtime",
35 "LogView hash runtime",
36 &[],
37 exponential_bucket_latencies(5.0),
38 )
39 });
40}
41
42#[repr(u8)]
44enum KeyTag {
45 Count = MIN_VIEW_TAG,
47 Index,
49}
50
51#[derive(Debug, Allocative)]
53#[allocative(bound = "C, T: Allocative")]
54pub struct LogView<C, T> {
55 #[allocative(skip)]
57 context: C,
58 delete_storage_first: bool,
60 stored_count: usize,
62 new_values: Vec<T>,
64}
65
66impl<C, T> View for LogView<C, T>
67where
68 C: Context,
69 T: Send + Sync + Serialize,
70{
71 const NUM_INIT_KEYS: usize = 1;
72
73 type Context = C;
74
75 fn context(&self) -> C {
76 self.context.clone()
77 }
78
79 fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
80 Ok(vec![context.base_key().base_tag(KeyTag::Count as u8)])
81 }
82
83 fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
84 let stored_count =
85 from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
86 Ok(Self {
87 context,
88 delete_storage_first: false,
89 stored_count,
90 new_values: Vec::new(),
91 })
92 }
93
94 fn rollback(&mut self) {
95 self.delete_storage_first = false;
96 self.new_values.clear();
97 }
98
99 async fn has_pending_changes(&self) -> bool {
100 if self.delete_storage_first {
101 return true;
102 }
103 !self.new_values.is_empty()
104 }
105
106 fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
107 let mut delete_view = false;
108 if self.delete_storage_first {
109 batch.delete_key_prefix(self.context.base_key().bytes.clone());
110 delete_view = true;
111 }
112 if !self.new_values.is_empty() {
113 delete_view = false;
114 let mut count = self.stored_count;
115 for value in &self.new_values {
116 let key = self
117 .context
118 .base_key()
119 .derive_tag_key(KeyTag::Index as u8, &count)?;
120 batch.put_key_value(key, value)?;
121 count += 1;
122 }
123 let key = self.context.base_key().base_tag(KeyTag::Count as u8);
124 batch.put_key_value(key, &count)?;
125 }
126 Ok(delete_view)
127 }
128
129 fn post_save(&mut self) {
130 if self.delete_storage_first {
131 self.stored_count = 0;
132 }
133 self.stored_count += self.new_values.len();
134 self.new_values.clear();
135 self.delete_storage_first = false;
136 }
137
138 fn clear(&mut self) {
139 self.delete_storage_first = true;
140 self.new_values.clear();
141 }
142}
143
144impl<C, T> ClonableView for LogView<C, T>
145where
146 C: Context,
147 T: Clone + Send + Sync + Serialize,
148{
149 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
150 Ok(LogView {
151 context: self.context.clone(),
152 delete_storage_first: self.delete_storage_first,
153 stored_count: self.stored_count,
154 new_values: self.new_values.clone(),
155 })
156 }
157}
158
159impl<C, T> LogView<C, T>
160where
161 C: Context,
162{
163 pub fn push(&mut self, value: T) {
175 self.new_values.push(value);
176 }
177
178 pub fn count(&self) -> usize {
192 if self.delete_storage_first {
193 self.new_values.len()
194 } else {
195 self.stored_count + self.new_values.len()
196 }
197 }
198
199 pub fn extra(&self) -> &C::Extra {
201 self.context.extra()
202 }
203}
204
205impl<C, T> LogView<C, T>
206where
207 C: Context,
208 T: Clone + DeserializeOwned + Serialize + Send + Sync,
209{
210 pub async fn get(&self, index: usize) -> Result<Option<T>, ViewError> {
223 let value = if self.delete_storage_first {
224 self.new_values.get(index).cloned()
225 } else if index < self.stored_count {
226 let key = self
227 .context
228 .base_key()
229 .derive_tag_key(KeyTag::Index as u8, &index)?;
230 self.context.store().read_value(&key).await?
231 } else {
232 self.new_values.get(index - self.stored_count).cloned()
233 };
234 Ok(value)
235 }
236
237 pub async fn multi_get(&self, indices: Vec<usize>) -> Result<Vec<Option<T>>, ViewError> {
254 let mut result = Vec::new();
255 if self.delete_storage_first {
256 for index in indices {
257 result.push(self.new_values.get(index).cloned());
258 }
259 } else {
260 let mut index_to_positions = BTreeMap::<usize, Vec<usize>>::new();
261 for (pos, index) in indices.into_iter().enumerate() {
262 if index < self.stored_count {
263 index_to_positions.entry(index).or_default().push(pos);
264 result.push(None);
265 } else {
266 result.push(self.new_values.get(index - self.stored_count).cloned());
267 }
268 }
269 let mut keys = Vec::new();
270 let mut vec_positions = Vec::new();
271 for (index, positions) in index_to_positions {
272 let key = self
273 .context
274 .base_key()
275 .derive_tag_key(KeyTag::Index as u8, &index)?;
276 keys.push(key);
277 vec_positions.push(positions);
278 }
279 let values = self.context.store().read_multi_values(&keys).await?;
280 for (positions, value) in vec_positions.into_iter().zip(values) {
281 if let Some((&last, rest)) = positions.split_last() {
282 for &position in rest {
283 *result.get_mut(position).unwrap() = value.clone();
284 }
285 *result.get_mut(last).unwrap() = value;
286 }
287 }
288 }
289 Ok(result)
290 }
291
292 pub async fn multi_get_pairs(
309 &self,
310 indices: Vec<usize>,
311 ) -> Result<Vec<(usize, Option<T>)>, ViewError> {
312 let values = self.multi_get(indices.clone()).await?;
313 Ok(indices.into_iter().zip(values).collect())
314 }
315
316 async fn read_context(&self, range: Range<usize>) -> Result<Vec<T>, ViewError> {
317 let count = range.len();
318 let mut keys = Vec::with_capacity(count);
319 for index in range {
320 let key = self
321 .context
322 .base_key()
323 .derive_tag_key(KeyTag::Index as u8, &index)?;
324 keys.push(key);
325 }
326 let mut values = Vec::with_capacity(count);
327 for entry in self.context.store().read_multi_values(&keys).await? {
328 match entry {
329 None => {
330 return Err(ViewError::MissingEntries("LogView".into()));
331 }
332 Some(value) => values.push(value),
333 }
334 }
335 Ok(values)
336 }
337
338 pub async fn read<R>(&self, range: R) -> Result<Vec<T>, ViewError>
353 where
354 R: RangeBounds<usize>,
355 {
356 let effective_stored_count = if self.delete_storage_first {
357 0
358 } else {
359 self.stored_count
360 };
361 let end = match range.end_bound() {
362 Bound::Included(end) => *end + 1,
363 Bound::Excluded(end) => *end,
364 Bound::Unbounded => self.count(),
365 }
366 .min(self.count());
367 let start = match range.start_bound() {
368 Bound::Included(start) => *start,
369 Bound::Excluded(start) => *start + 1,
370 Bound::Unbounded => 0,
371 };
372 if start >= end {
373 return Ok(Vec::new());
374 }
375 if start < effective_stored_count {
376 if end <= effective_stored_count {
377 self.read_context(start..end).await
378 } else {
379 let mut values = self.read_context(start..effective_stored_count).await?;
380 values.extend(
381 self.new_values[0..(end - effective_stored_count)]
382 .iter()
383 .cloned(),
384 );
385 Ok(values)
386 }
387 } else {
388 Ok(
389 self.new_values[(start - effective_stored_count)..(end - effective_stored_count)]
390 .to_vec(),
391 )
392 }
393 }
394}
395
396impl<C, T> HashableView for LogView<C, T>
397where
398 C: Context,
399 T: Send + Sync + Clone + Serialize + DeserializeOwned,
400{
401 type Hasher = sha3::Sha3_256;
402
403 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
404 self.hash().await
405 }
406
407 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
408 #[cfg(with_metrics)]
409 let _hash_latency = metrics::LOG_VIEW_HASH_RUNTIME.measure_latency();
410 let elements = self.read(..).await?;
411 let mut hasher = sha3::Sha3_256::default();
412 hasher.update_with_bcs_bytes(&elements)?;
413 Ok(hasher.finalize())
414 }
415}
416
417pub type HashedLogView<C, T> = WrappedHashableContainerView<C, LogView<C, T>, HasherOutput>;
419
420pub type HistoricallyHashedLogView<C, T> = HistoricallyHashableView<C, LogView<C, T>>;
422
423#[cfg(not(web))]
424mod graphql {
425 use std::borrow::Cow;
426
427 use super::LogView;
428 use crate::{
429 context::Context,
430 graphql::{hash_name, mangle},
431 };
432
433 impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for LogView<C, T> {
434 fn type_name() -> Cow<'static, str> {
435 format!(
436 "LogView_{}_{:08x}",
437 mangle(T::type_name()),
438 hash_name::<T>()
439 )
440 .into()
441 }
442 }
443
444 #[async_graphql::Object(cache_control(no_cache), name_type)]
445 impl<C: Context, T: async_graphql::OutputType> LogView<C, T>
446 where
447 T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
448 {
449 #[graphql(derived(name = "count"))]
450 async fn count_(&self) -> Result<u32, async_graphql::Error> {
451 Ok(self.count() as u32)
452 }
453
454 async fn entries(
455 &self,
456 start: Option<usize>,
457 end: Option<usize>,
458 ) -> async_graphql::Result<Vec<T>> {
459 Ok(self
460 .read(start.unwrap_or_default()..end.unwrap_or_else(|| self.count()))
461 .await?)
462 }
463 }
464}