1use std::ops::{Bound, Range, RangeBounds};
5
6#[cfg(with_metrics)]
7use linera_base::prometheus_util::MeasureLatency as _;
8use serde::{de::DeserializeOwned, Serialize};
9
10use crate::{
11 batch::Batch,
12 common::{from_bytes_option_or_default, HasherOutput},
13 context::Context,
14 hashable_wrapper::WrappedHashableContainerView,
15 store::ReadableKeyValueStore as _,
16 views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
17};
18
19#[cfg(with_metrics)]
20mod metrics {
21 use std::sync::LazyLock;
22
23 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
24 use prometheus::HistogramVec;
25
26 pub static LOG_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
28 register_histogram_vec(
29 "log_view_hash_runtime",
30 "LogView hash runtime",
31 &[],
32 exponential_bucket_latencies(5.0),
33 )
34 });
35}
36
37#[repr(u8)]
39enum KeyTag {
40 Count = MIN_VIEW_TAG,
42 Index,
44}
45
46#[derive(Debug)]
48pub struct LogView<C, T> {
49 context: C,
50 delete_storage_first: bool,
51 stored_count: usize,
52 new_values: Vec<T>,
53}
54
55impl<C, T> View for LogView<C, T>
56where
57 C: Context,
58 T: Send + Sync + Serialize,
59{
60 const NUM_INIT_KEYS: usize = 1;
61
62 type Context = C;
63
64 fn context(&self) -> &C {
65 &self.context
66 }
67
68 fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
69 Ok(vec![context.base_key().base_tag(KeyTag::Count as u8)])
70 }
71
72 fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
73 let stored_count =
74 from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
75 Ok(Self {
76 context,
77 delete_storage_first: false,
78 stored_count,
79 new_values: Vec::new(),
80 })
81 }
82
83 async fn load(context: C) -> Result<Self, ViewError> {
84 let keys = Self::pre_load(&context)?;
85 let values = context.store().read_multi_values_bytes(keys).await?;
86 Self::post_load(context, &values)
87 }
88
89 fn rollback(&mut self) {
90 self.delete_storage_first = false;
91 self.new_values.clear();
92 }
93
94 async fn has_pending_changes(&self) -> bool {
95 if self.delete_storage_first {
96 return true;
97 }
98 !self.new_values.is_empty()
99 }
100
101 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
102 let mut delete_view = false;
103 if self.delete_storage_first {
104 batch.delete_key_prefix(self.context.base_key().bytes.clone());
105 self.stored_count = 0;
106 delete_view = true;
107 }
108 if !self.new_values.is_empty() {
109 delete_view = false;
110 for value in &self.new_values {
111 let key = self
112 .context
113 .base_key()
114 .derive_tag_key(KeyTag::Index as u8, &self.stored_count)?;
115 batch.put_key_value(key, value)?;
116 self.stored_count += 1;
117 }
118 let key = self.context.base_key().base_tag(KeyTag::Count as u8);
119 batch.put_key_value(key, &self.stored_count)?;
120 self.new_values.clear();
121 }
122 self.delete_storage_first = false;
123 Ok(delete_view)
124 }
125
126 fn clear(&mut self) {
127 self.delete_storage_first = true;
128 self.new_values.clear();
129 }
130}
131
132impl<C, T> ClonableView for LogView<C, T>
133where
134 C: Context,
135 T: Clone + Send + Sync + Serialize,
136{
137 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
138 Ok(LogView {
139 context: self.context.clone(),
140 delete_storage_first: self.delete_storage_first,
141 stored_count: self.stored_count,
142 new_values: self.new_values.clone(),
143 })
144 }
145}
146
147impl<C, T> LogView<C, T>
148where
149 C: Context,
150{
151 pub fn push(&mut self, value: T) {
163 self.new_values.push(value);
164 }
165
166 pub fn count(&self) -> usize {
180 if self.delete_storage_first {
181 self.new_values.len()
182 } else {
183 self.stored_count + self.new_values.len()
184 }
185 }
186
187 pub fn extra(&self) -> &C::Extra {
189 self.context.extra()
190 }
191}
192
193impl<C, T> LogView<C, T>
194where
195 C: Context,
196 T: Clone + DeserializeOwned + Serialize + Send + Sync,
197{
198 pub async fn get(&self, index: usize) -> Result<Option<T>, ViewError> {
211 let value = if self.delete_storage_first {
212 self.new_values.get(index).cloned()
213 } else if index < self.stored_count {
214 let key = self
215 .context
216 .base_key()
217 .derive_tag_key(KeyTag::Index as u8, &index)?;
218 self.context.store().read_value(&key).await?
219 } else {
220 self.new_values.get(index - self.stored_count).cloned()
221 };
222 Ok(value)
223 }
224
225 pub async fn multi_get(&self, indices: Vec<usize>) -> Result<Vec<Option<T>>, ViewError> {
242 let mut result = Vec::new();
243 if self.delete_storage_first {
244 for index in indices {
245 result.push(self.new_values.get(index).cloned());
246 }
247 } else {
248 let mut keys = Vec::new();
249 let mut positions = Vec::new();
250 for (pos, index) in indices.into_iter().enumerate() {
251 if index < self.stored_count {
252 let key = self
253 .context
254 .base_key()
255 .derive_tag_key(KeyTag::Index as u8, &index)?;
256 keys.push(key);
257 positions.push(pos);
258 result.push(None);
259 } else {
260 result.push(self.new_values.get(index - self.stored_count).cloned());
261 }
262 }
263 let values = self.context.store().read_multi_values(keys).await?;
264 for (pos, value) in positions.into_iter().zip(values) {
265 *result.get_mut(pos).unwrap() = value;
266 }
267 }
268 Ok(result)
269 }
270
271 async fn read_context(&self, range: Range<usize>) -> Result<Vec<T>, ViewError> {
272 let count = range.len();
273 let mut keys = Vec::with_capacity(count);
274 for index in range {
275 let key = self
276 .context
277 .base_key()
278 .derive_tag_key(KeyTag::Index as u8, &index)?;
279 keys.push(key);
280 }
281 let mut values = Vec::with_capacity(count);
282 for entry in self.context.store().read_multi_values(keys).await? {
283 match entry {
284 None => {
285 return Err(ViewError::MissingEntries);
286 }
287 Some(value) => values.push(value),
288 }
289 }
290 Ok(values)
291 }
292
293 pub async fn read<R>(&self, range: R) -> Result<Vec<T>, ViewError>
308 where
309 R: RangeBounds<usize>,
310 {
311 let effective_stored_count = if self.delete_storage_first {
312 0
313 } else {
314 self.stored_count
315 };
316 let end = match range.end_bound() {
317 Bound::Included(end) => *end + 1,
318 Bound::Excluded(end) => *end,
319 Bound::Unbounded => self.count(),
320 }
321 .min(self.count());
322 let start = match range.start_bound() {
323 Bound::Included(start) => *start,
324 Bound::Excluded(start) => *start + 1,
325 Bound::Unbounded => 0,
326 };
327 if start >= end {
328 return Ok(Vec::new());
329 }
330 if start < effective_stored_count {
331 if end <= effective_stored_count {
332 self.read_context(start..end).await
333 } else {
334 let mut values = self.read_context(start..effective_stored_count).await?;
335 values.extend(
336 self.new_values[0..(end - effective_stored_count)]
337 .iter()
338 .cloned(),
339 );
340 Ok(values)
341 }
342 } else {
343 Ok(
344 self.new_values[(start - effective_stored_count)..(end - effective_stored_count)]
345 .to_vec(),
346 )
347 }
348 }
349}
350
351impl<C, T> HashableView for LogView<C, T>
352where
353 C: Context,
354 T: Send + Sync + Clone + Serialize + DeserializeOwned,
355{
356 type Hasher = sha3::Sha3_256;
357
358 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
359 self.hash().await
360 }
361
362 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
363 #[cfg(with_metrics)]
364 let _hash_latency = metrics::LOG_VIEW_HASH_RUNTIME.measure_latency();
365 let elements = self.read(..).await?;
366 let mut hasher = sha3::Sha3_256::default();
367 hasher.update_with_bcs_bytes(&elements)?;
368 Ok(hasher.finalize())
369 }
370}
371
372pub type HashedLogView<C, T> = WrappedHashableContainerView<C, LogView<C, T>, HasherOutput>;
374
375#[cfg(not(web))]
376mod graphql {
377 use std::borrow::Cow;
378
379 use super::LogView;
380 use crate::{
381 context::Context,
382 graphql::{hash_name, mangle},
383 };
384
385 impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for LogView<C, T> {
386 fn type_name() -> Cow<'static, str> {
387 format!(
388 "LogView_{}_{:08x}",
389 mangle(T::type_name()),
390 hash_name::<T>()
391 )
392 .into()
393 }
394 }
395
396 #[async_graphql::Object(cache_control(no_cache), name_type)]
397 impl<C: Context, T: async_graphql::OutputType> LogView<C, T>
398 where
399 T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
400 {
401 async fn entries(
402 &self,
403 start: Option<usize>,
404 end: Option<usize>,
405 ) -> async_graphql::Result<Vec<T>> {
406 Ok(self
407 .read(start.unwrap_or_default()..end.unwrap_or_else(|| self.count()))
408 .await?)
409 }
410 }
411}