linera_views/backends/
metering.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Adds metrics to a key-value store.
5
6use std::{
7    collections::{btree_map::Entry, BTreeMap},
8    sync::{Arc, LazyLock, Mutex},
9};
10
11use convert_case::{Case, Casing};
12use linera_base::prometheus_util::{
13    register_histogram_vec, register_int_counter_vec, MeasureLatency as _,
14};
15use prometheus::{HistogramVec, IntCounterVec};
16
17#[cfg(with_testing)]
18use crate::store::TestKeyValueDatabase;
19use crate::{
20    batch::Batch,
21    store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
22};
23
24#[derive(Clone)]
25/// The implementation of the `KeyValueStoreMetrics` for the `KeyValueStore`.
26pub struct KeyValueStoreMetrics {
27    read_value_bytes_latency: HistogramVec,
28    contains_key_latency: HistogramVec,
29    contains_keys_latency: HistogramVec,
30    read_multi_values_bytes_latency: HistogramVec,
31    find_keys_by_prefix_latency: HistogramVec,
32    find_key_values_by_prefix_latency: HistogramVec,
33    write_batch_latency: HistogramVec,
34    clear_journal_latency: HistogramVec,
35    connect_latency: HistogramVec,
36    open_shared_latency: HistogramVec,
37    open_exclusive_latency: HistogramVec,
38    list_all_latency: HistogramVec,
39    list_root_keys_latency: HistogramVec,
40    delete_all_latency: HistogramVec,
41    exists_latency: HistogramVec,
42    create_latency: HistogramVec,
43    delete_latency: HistogramVec,
44    read_value_none_cases: IntCounterVec,
45    read_value_key_size: HistogramVec,
46    read_value_value_size: HistogramVec,
47    read_multi_values_num_entries: HistogramVec,
48    read_multi_values_key_sizes: HistogramVec,
49    contains_keys_num_entries: HistogramVec,
50    contains_keys_key_sizes: HistogramVec,
51    contains_key_key_size: HistogramVec,
52    find_keys_by_prefix_prefix_size: HistogramVec,
53    find_keys_by_prefix_num_keys: HistogramVec,
54    find_keys_by_prefix_keys_size: HistogramVec,
55    find_key_values_by_prefix_prefix_size: HistogramVec,
56    find_key_values_by_prefix_num_keys: HistogramVec,
57    find_key_values_by_prefix_key_values_size: HistogramVec,
58    write_batch_size: HistogramVec,
59    list_all_sizes: HistogramVec,
60    exists_true_cases: IntCounterVec,
61}
62
63#[derive(Default)]
64struct StoreMetrics {
65    stores: BTreeMap<String, Arc<KeyValueStoreMetrics>>,
66}
67
68/// The global variables of the RocksDB stores
69static STORE_COUNTERS: LazyLock<Mutex<StoreMetrics>> =
70    LazyLock::new(|| Mutex::new(StoreMetrics::default()));
71
72fn get_counter(name: &str) -> Arc<KeyValueStoreMetrics> {
73    let mut store_metrics = STORE_COUNTERS.lock().unwrap();
74    let key = name.to_string();
75    match store_metrics.stores.entry(key) {
76        Entry::Occupied(entry) => {
77            let entry = entry.into_mut();
78            entry.clone()
79        }
80        Entry::Vacant(entry) => {
81            let store_metric = Arc::new(KeyValueStoreMetrics::new(name.to_string()));
82            entry.insert(store_metric.clone());
83            store_metric
84        }
85    }
86}
87
88impl KeyValueStoreMetrics {
89    /// Creation of a named Metered counter.
90    pub fn new(name: String) -> Self {
91        // name can be "rocks db". Then var_name = "rocks_db" and title_name = "RocksDb"
92        let var_name = name.replace(' ', "_");
93        let title_name = name.to_case(Case::Snake);
94
95        let entry1 = format!("{}_read_value_bytes_latency", var_name);
96        let entry2 = format!("{} read value bytes latency", title_name);
97        let read_value_bytes_latency = register_histogram_vec(&entry1, &entry2, &[], None);
98
99        let entry1 = format!("{}_contains_key_latency", var_name);
100        let entry2 = format!("{} contains key latency", title_name);
101        let contains_key_latency = register_histogram_vec(&entry1, &entry2, &[], None);
102
103        let entry1 = format!("{}_contains_keys_latency", var_name);
104        let entry2 = format!("{} contains keys latency", title_name);
105        let contains_keys_latency = register_histogram_vec(&entry1, &entry2, &[], None);
106
107        let entry1 = format!("{}_read_multi_value_bytes_latency", var_name);
108        let entry2 = format!("{} read multi value bytes latency", title_name);
109        let read_multi_values_bytes_latency = register_histogram_vec(&entry1, &entry2, &[], None);
110
111        let entry1 = format!("{}_find_keys_by_prefix_latency", var_name);
112        let entry2 = format!("{} find keys by prefix latency", title_name);
113        let find_keys_by_prefix_latency = register_histogram_vec(&entry1, &entry2, &[], None);
114
115        let entry1 = format!("{}_find_key_values_by_prefix_latency", var_name);
116        let entry2 = format!("{} find key values by prefix latency", title_name);
117        let find_key_values_by_prefix_latency = register_histogram_vec(&entry1, &entry2, &[], None);
118
119        let entry1 = format!("{}_write_batch_latency", var_name);
120        let entry2 = format!("{} write batch latency", title_name);
121        let write_batch_latency = register_histogram_vec(&entry1, &entry2, &[], None);
122
123        let entry1 = format!("{}_clear_journal_latency", var_name);
124        let entry2 = format!("{} clear journal latency", title_name);
125        let clear_journal_latency = register_histogram_vec(&entry1, &entry2, &[], None);
126
127        let entry1 = format!("{}_connect_latency", var_name);
128        let entry2 = format!("{} connect latency", title_name);
129        let connect_latency = register_histogram_vec(&entry1, &entry2, &[], None);
130
131        let entry1 = format!("{}_open_shared_latency", var_name);
132        let entry2 = format!("{} open shared partition", title_name);
133        let open_shared_latency = register_histogram_vec(&entry1, &entry2, &[], None);
134
135        let entry1 = format!("{}_open_exclusive_latency", var_name);
136        let entry2 = format!("{} open exclusive partition", title_name);
137        let open_exclusive_latency = register_histogram_vec(&entry1, &entry2, &[], None);
138
139        let entry1 = format!("{}_list_all_latency", var_name);
140        let entry2 = format!("{} list all latency", title_name);
141        let list_all_latency = register_histogram_vec(&entry1, &entry2, &[], None);
142
143        let entry1 = format!("{}_list_root_keys_latency", var_name);
144        let entry2 = format!("{} list root keys latency", title_name);
145        let list_root_keys_latency = register_histogram_vec(&entry1, &entry2, &[], None);
146
147        let entry1 = format!("{}_delete_all_latency", var_name);
148        let entry2 = format!("{} delete all latency", title_name);
149        let delete_all_latency = register_histogram_vec(&entry1, &entry2, &[], None);
150
151        let entry1 = format!("{}_exists_latency", var_name);
152        let entry2 = format!("{} exists latency", title_name);
153        let exists_latency = register_histogram_vec(&entry1, &entry2, &[], None);
154
155        let entry1 = format!("{}_create_latency", var_name);
156        let entry2 = format!("{} create latency", title_name);
157        let create_latency = register_histogram_vec(&entry1, &entry2, &[], None);
158
159        let entry1 = format!("{}_delete_latency", var_name);
160        let entry2 = format!("{} delete latency", title_name);
161        let delete_latency = register_histogram_vec(&entry1, &entry2, &[], None);
162
163        let entry1 = format!("{}_read_value_none_cases", var_name);
164        let entry2 = format!("{} read value none cases", title_name);
165        let read_value_none_cases = register_int_counter_vec(&entry1, &entry2, &[]);
166
167        let entry1 = format!("{}_read_value_key_size", var_name);
168        let entry2 = format!("{} read value key size", title_name);
169        let read_value_key_size = register_histogram_vec(&entry1, &entry2, &[], None);
170
171        let entry1 = format!("{}_read_value_value_size", var_name);
172        let entry2 = format!("{} read value value size", title_name);
173        let read_value_value_size = register_histogram_vec(&entry1, &entry2, &[], None);
174
175        let entry1 = format!("{}_read_multi_values_num_entries", var_name);
176        let entry2 = format!("{} read multi values num entries", title_name);
177        let read_multi_values_num_entries = register_histogram_vec(&entry1, &entry2, &[], None);
178
179        let entry1 = format!("{}_read_multi_values_key_sizes", var_name);
180        let entry2 = format!("{} read multi values key sizes", title_name);
181        let read_multi_values_key_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
182
183        let entry1 = format!("{}_contains_keys_num_entries", var_name);
184        let entry2 = format!("{} contains keys num entries", title_name);
185        let contains_keys_num_entries = register_histogram_vec(&entry1, &entry2, &[], None);
186
187        let entry1 = format!("{}_contains_keys_key_sizes", var_name);
188        let entry2 = format!("{} contains keys key sizes", title_name);
189        let contains_keys_key_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
190
191        let entry1 = format!("{}_contains_key_key_size", var_name);
192        let entry2 = format!("{} contains key key size", title_name);
193        let contains_key_key_size = register_histogram_vec(&entry1, &entry2, &[], None);
194
195        let entry1 = format!("{}_find_keys_by_prefix_prefix_size", var_name);
196        let entry2 = format!("{} find keys by prefix prefix size", title_name);
197        let find_keys_by_prefix_prefix_size = register_histogram_vec(&entry1, &entry2, &[], None);
198
199        let entry1 = format!("{}_find_keys_by_prefix_num_keys", var_name);
200        let entry2 = format!("{} find keys by prefix num keys", title_name);
201        let find_keys_by_prefix_num_keys = register_histogram_vec(&entry1, &entry2, &[], None);
202
203        let entry1 = format!("{}_find_keys_by_prefix_keys_size", var_name);
204        let entry2 = format!("{} find keys by prefix keys size", title_name);
205        let find_keys_by_prefix_keys_size = register_histogram_vec(&entry1, &entry2, &[], None);
206
207        let entry1 = format!("{}_find_key_values_by_prefix_prefix_size", var_name);
208        let entry2 = format!("{} find key values by prefix prefix size", title_name);
209        let find_key_values_by_prefix_prefix_size =
210            register_histogram_vec(&entry1, &entry2, &[], None);
211
212        let entry1 = format!("{}_find_key_values_by_prefix_num_keys", var_name);
213        let entry2 = format!("{} find key values by prefix num keys", title_name);
214        let find_key_values_by_prefix_num_keys =
215            register_histogram_vec(&entry1, &entry2, &[], None);
216
217        let entry1 = format!("{}_find_key_values_by_prefix_key_values_size", var_name);
218        let entry2 = format!("{} find key values by prefix key values size", title_name);
219        let find_key_values_by_prefix_key_values_size =
220            register_histogram_vec(&entry1, &entry2, &[], None);
221
222        let entry1 = format!("{}_write_batch_size", var_name);
223        let entry2 = format!("{} write batch size", title_name);
224        let write_batch_size = register_histogram_vec(&entry1, &entry2, &[], None);
225
226        let entry1 = format!("{}_list_all_sizes", var_name);
227        let entry2 = format!("{} list all sizes", title_name);
228        let list_all_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
229
230        let entry1 = format!("{}_exists_true_cases", var_name);
231        let entry2 = format!("{} exists true cases", title_name);
232        let exists_true_cases = register_int_counter_vec(&entry1, &entry2, &[]);
233
234        KeyValueStoreMetrics {
235            read_value_bytes_latency,
236            contains_key_latency,
237            contains_keys_latency,
238            read_multi_values_bytes_latency,
239            find_keys_by_prefix_latency,
240            find_key_values_by_prefix_latency,
241            write_batch_latency,
242            clear_journal_latency,
243            connect_latency,
244            open_shared_latency,
245            open_exclusive_latency,
246            list_all_latency,
247            list_root_keys_latency,
248            delete_all_latency,
249            exists_latency,
250            create_latency,
251            delete_latency,
252            read_value_none_cases,
253            read_value_key_size,
254            read_value_value_size,
255            read_multi_values_num_entries,
256            read_multi_values_key_sizes,
257            contains_keys_num_entries,
258            contains_keys_key_sizes,
259            contains_key_key_size,
260            find_keys_by_prefix_prefix_size,
261            find_keys_by_prefix_num_keys,
262            find_keys_by_prefix_keys_size,
263            find_key_values_by_prefix_prefix_size,
264            find_key_values_by_prefix_num_keys,
265            find_key_values_by_prefix_key_values_size,
266            write_batch_size,
267            list_all_sizes,
268            exists_true_cases,
269        }
270    }
271}
272
273/// A metered database that keeps track of every operation.
274#[derive(Clone)]
275pub struct MeteredDatabase<D> {
276    /// The metrics being computed.
277    counter: Arc<KeyValueStoreMetrics>,
278    /// The underlying database.
279    database: D,
280}
281
282/// A metered store that keeps track of every operation.
283#[derive(Clone)]
284pub struct MeteredStore<S> {
285    /// The metrics being computed.
286    counter: Arc<KeyValueStoreMetrics>,
287    /// The underlying store.
288    store: S,
289}
290
291impl<D> WithError for MeteredDatabase<D>
292where
293    D: WithError,
294{
295    type Error = D::Error;
296}
297
298impl<S> WithError for MeteredStore<S>
299where
300    S: WithError,
301{
302    type Error = S::Error;
303}
304
305impl<S> ReadableKeyValueStore for MeteredStore<S>
306where
307    S: ReadableKeyValueStore,
308{
309    const MAX_KEY_SIZE: usize = S::MAX_KEY_SIZE;
310
311    fn max_stream_queries(&self) -> usize {
312        self.store.max_stream_queries()
313    }
314
315    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
316        let _latency = self.counter.read_value_bytes_latency.measure_latency();
317        self.counter
318            .read_value_key_size
319            .with_label_values(&[])
320            .observe(key.len() as f64);
321        let result = self.store.read_value_bytes(key).await?;
322        match &result {
323            None => self
324                .counter
325                .read_value_none_cases
326                .with_label_values(&[])
327                .inc(),
328            Some(value) => self
329                .counter
330                .read_value_value_size
331                .with_label_values(&[])
332                .observe(value.len() as f64),
333        }
334        Ok(result)
335    }
336
337    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
338        let _latency = self.counter.contains_key_latency.measure_latency();
339        self.counter
340            .contains_key_key_size
341            .with_label_values(&[])
342            .observe(key.len() as f64);
343        self.store.contains_key(key).await
344    }
345
346    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
347        let _latency = self.counter.contains_keys_latency.measure_latency();
348        self.counter
349            .contains_keys_num_entries
350            .with_label_values(&[])
351            .observe(keys.len() as f64);
352        let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
353        self.counter
354            .contains_keys_key_sizes
355            .with_label_values(&[])
356            .observe(key_sizes as f64);
357        self.store.contains_keys(keys).await
358    }
359
360    async fn read_multi_values_bytes(
361        &self,
362        keys: Vec<Vec<u8>>,
363    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
364        let _latency = self
365            .counter
366            .read_multi_values_bytes_latency
367            .measure_latency();
368        self.counter
369            .read_multi_values_num_entries
370            .with_label_values(&[])
371            .observe(keys.len() as f64);
372        let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
373        self.counter
374            .read_multi_values_key_sizes
375            .with_label_values(&[])
376            .observe(key_sizes as f64);
377        self.store.read_multi_values_bytes(keys).await
378    }
379
380    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
381        let _latency = self.counter.find_keys_by_prefix_latency.measure_latency();
382        self.counter
383            .find_keys_by_prefix_prefix_size
384            .with_label_values(&[])
385            .observe(key_prefix.len() as f64);
386        let result = self.store.find_keys_by_prefix(key_prefix).await?;
387        let (num_keys, keys_size) = result
388            .iter()
389            .map(|key| key.len())
390            .fold((0, 0), |(count, size), len| (count + 1, size + len));
391        self.counter
392            .find_keys_by_prefix_num_keys
393            .with_label_values(&[])
394            .observe(num_keys as f64);
395        self.counter
396            .find_keys_by_prefix_keys_size
397            .with_label_values(&[])
398            .observe(keys_size as f64);
399        Ok(result)
400    }
401
402    async fn find_key_values_by_prefix(
403        &self,
404        key_prefix: &[u8],
405    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
406        let _latency = self
407            .counter
408            .find_key_values_by_prefix_latency
409            .measure_latency();
410        self.counter
411            .find_key_values_by_prefix_prefix_size
412            .with_label_values(&[])
413            .observe(key_prefix.len() as f64);
414        let result = self.store.find_key_values_by_prefix(key_prefix).await?;
415        let (num_keys, key_values_size) = result
416            .iter()
417            .map(|(key, value)| key.len() + value.len())
418            .fold((0, 0), |(count, size), len| (count + 1, size + len));
419        self.counter
420            .find_key_values_by_prefix_num_keys
421            .with_label_values(&[])
422            .observe(num_keys as f64);
423        self.counter
424            .find_key_values_by_prefix_key_values_size
425            .with_label_values(&[])
426            .observe(key_values_size as f64);
427        Ok(result)
428    }
429}
430
431impl<S> WritableKeyValueStore for MeteredStore<S>
432where
433    S: WritableKeyValueStore,
434{
435    const MAX_VALUE_SIZE: usize = S::MAX_VALUE_SIZE;
436
437    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
438        let _latency = self.counter.write_batch_latency.measure_latency();
439        self.counter
440            .write_batch_size
441            .with_label_values(&[])
442            .observe(batch.size() as f64);
443        self.store.write_batch(batch).await
444    }
445
446    async fn clear_journal(&self) -> Result<(), Self::Error> {
447        let _metric = self.counter.clear_journal_latency.measure_latency();
448        self.store.clear_journal().await
449    }
450}
451
452impl<D> KeyValueDatabase for MeteredDatabase<D>
453where
454    D: KeyValueDatabase,
455{
456    type Config = D::Config;
457    type Store = MeteredStore<D::Store>;
458
459    fn get_name() -> String {
460        D::get_name()
461    }
462
463    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
464        let name = D::get_name();
465        let counter = get_counter(&name);
466        let _latency = counter.connect_latency.measure_latency();
467        let database = D::connect(config, namespace).await?;
468        let counter = get_counter(&name);
469        Ok(Self { counter, database })
470    }
471
472    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
473        let _latency = self.counter.open_shared_latency.measure_latency();
474        let store = self.database.open_shared(root_key)?;
475        let counter = self.counter.clone();
476        Ok(MeteredStore { counter, store })
477    }
478
479    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
480        let _latency = self.counter.open_exclusive_latency.measure_latency();
481        let store = self.database.open_exclusive(root_key)?;
482        let counter = self.counter.clone();
483        Ok(MeteredStore { counter, store })
484    }
485
486    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
487        let name = D::get_name();
488        let counter = get_counter(&name);
489        let _latency = counter.list_all_latency.measure_latency();
490        let namespaces = D::list_all(config).await?;
491        let counter = get_counter(&name);
492        counter
493            .list_all_sizes
494            .with_label_values(&[])
495            .observe(namespaces.len() as f64);
496        Ok(namespaces)
497    }
498
499    async fn list_root_keys(
500        config: &Self::Config,
501        namespace: &str,
502    ) -> Result<Vec<Vec<u8>>, Self::Error> {
503        let name = D::get_name();
504        let counter = get_counter(&name);
505        let _latency = counter.list_root_keys_latency.measure_latency();
506        D::list_root_keys(config, namespace).await
507    }
508
509    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
510        let name = D::get_name();
511        let counter = get_counter(&name);
512        let _latency = counter.delete_all_latency.measure_latency();
513        D::delete_all(config).await
514    }
515
516    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
517        let name = D::get_name();
518        let counter = get_counter(&name);
519        let _latency = counter.exists_latency.measure_latency();
520        let result = D::exists(config, namespace).await?;
521        if result {
522            let counter = get_counter(&name);
523            counter.exists_true_cases.with_label_values(&[]).inc();
524        }
525        Ok(result)
526    }
527
528    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
529        let name = D::get_name();
530        let counter = get_counter(&name);
531        let _latency = counter.create_latency.measure_latency();
532        D::create(config, namespace).await
533    }
534
535    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
536        let name = D::get_name();
537        let counter = get_counter(&name);
538        let _latency = counter.delete_latency.measure_latency();
539        D::delete(config, namespace).await
540    }
541}
542
/// Test support: a metered database is configured exactly like the database it wraps.
#[cfg(with_testing)]
impl<D: TestKeyValueDatabase> TestKeyValueDatabase for MeteredDatabase<D> {
    /// Returns the test configuration produced by the wrapped database.
    async fn new_test_config() -> Result<D::Config, Self::Error> {
        D::new_test_config().await
    }
}
551}