1use std::{
7 collections::{btree_map::Entry, BTreeMap},
8 sync::{Arc, LazyLock, Mutex},
9};
10
11use convert_case::{Case, Casing};
12use linera_base::prometheus_util::{
13 register_histogram_vec, register_int_counter_vec, MeasureLatency as _,
14};
15use prometheus::{HistogramVec, IntCounterVec};
16
17#[cfg(with_testing)]
18use crate::store::TestKeyValueStore;
19use crate::{
20 batch::Batch,
21 store::{
22 AdminKeyValueStore, KeyIterable as _, KeyValueIterable as _, ReadableKeyValueStore,
23 WithError, WritableKeyValueStore,
24 },
25};
26
27#[derive(Clone)]
28pub struct KeyValueStoreMetrics {
30 read_value_bytes_latency: HistogramVec,
31 contains_key_latency: HistogramVec,
32 contains_keys_latency: HistogramVec,
33 read_multi_values_bytes_latency: HistogramVec,
34 find_keys_by_prefix_latency: HistogramVec,
35 find_key_values_by_prefix_latency: HistogramVec,
36 write_batch_latency: HistogramVec,
37 clear_journal_latency: HistogramVec,
38 connect_latency: HistogramVec,
39 open_exclusive_latency: HistogramVec,
40 list_all_latency: HistogramVec,
41 list_root_keys_latency: HistogramVec,
42 delete_all_latency: HistogramVec,
43 exists_latency: HistogramVec,
44 create_latency: HistogramVec,
45 delete_latency: HistogramVec,
46 read_value_none_cases: IntCounterVec,
47 read_value_key_size: HistogramVec,
48 read_value_value_size: HistogramVec,
49 read_multi_values_num_entries: HistogramVec,
50 read_multi_values_key_sizes: HistogramVec,
51 contains_keys_num_entries: HistogramVec,
52 contains_keys_key_sizes: HistogramVec,
53 contains_key_key_size: HistogramVec,
54 find_keys_by_prefix_prefix_size: HistogramVec,
55 find_keys_by_prefix_num_keys: HistogramVec,
56 find_keys_by_prefix_keys_size: HistogramVec,
57 find_key_values_by_prefix_prefix_size: HistogramVec,
58 find_key_values_by_prefix_num_keys: HistogramVec,
59 find_key_values_by_prefix_key_values_size: HistogramVec,
60 write_batch_size: HistogramVec,
61 list_all_sizes: HistogramVec,
62 exists_true_cases: IntCounterVec,
63}
64
65#[derive(Default)]
66struct StoreMetrics {
67 stores: BTreeMap<String, Arc<KeyValueStoreMetrics>>,
68}
69
70static STORE_COUNTERS: LazyLock<Mutex<StoreMetrics>> =
72 LazyLock::new(|| Mutex::new(StoreMetrics::default()));
73
74fn get_counter(name: &str) -> Arc<KeyValueStoreMetrics> {
75 let mut store_metrics = STORE_COUNTERS.lock().unwrap();
76 let key = name.to_string();
77 match store_metrics.stores.entry(key) {
78 Entry::Occupied(entry) => {
79 let entry = entry.into_mut();
80 entry.clone()
81 }
82 Entry::Vacant(entry) => {
83 let store_metric = Arc::new(KeyValueStoreMetrics::new(name.to_string()));
84 entry.insert(store_metric.clone());
85 store_metric
86 }
87 }
88}
89
90impl KeyValueStoreMetrics {
91 pub fn new(name: String) -> Self {
93 let var_name = name.replace(' ', "_");
95 let title_name = name.to_case(Case::Snake);
96
97 let entry1 = format!("{}_read_value_bytes_latency", var_name);
98 let entry2 = format!("{} read value bytes latency", title_name);
99 let read_value_bytes_latency = register_histogram_vec(&entry1, &entry2, &[], None);
100
101 let entry1 = format!("{}_contains_key_latency", var_name);
102 let entry2 = format!("{} contains key latency", title_name);
103 let contains_key_latency = register_histogram_vec(&entry1, &entry2, &[], None);
104
105 let entry1 = format!("{}_contains_keys_latency", var_name);
106 let entry2 = format!("{} contains keys latency", title_name);
107 let contains_keys_latency = register_histogram_vec(&entry1, &entry2, &[], None);
108
109 let entry1 = format!("{}_read_multi_value_bytes_latency", var_name);
110 let entry2 = format!("{} read multi value bytes latency", title_name);
111 let read_multi_values_bytes_latency = register_histogram_vec(&entry1, &entry2, &[], None);
112
113 let entry1 = format!("{}_find_keys_by_prefix_latency", var_name);
114 let entry2 = format!("{} find keys by prefix latency", title_name);
115 let find_keys_by_prefix_latency = register_histogram_vec(&entry1, &entry2, &[], None);
116
117 let entry1 = format!("{}_find_key_values_by_prefix_latency", var_name);
118 let entry2 = format!("{} find key values by prefix latency", title_name);
119 let find_key_values_by_prefix_latency = register_histogram_vec(&entry1, &entry2, &[], None);
120
121 let entry1 = format!("{}_write_batch_latency", var_name);
122 let entry2 = format!("{} write batch latency", title_name);
123 let write_batch_latency = register_histogram_vec(&entry1, &entry2, &[], None);
124
125 let entry1 = format!("{}_clear_journal_latency", var_name);
126 let entry2 = format!("{} clear journal latency", title_name);
127 let clear_journal_latency = register_histogram_vec(&entry1, &entry2, &[], None);
128
129 let entry1 = format!("{}_connect_latency", var_name);
130 let entry2 = format!("{} connect latency", title_name);
131 let connect_latency = register_histogram_vec(&entry1, &entry2, &[], None);
132
133 let entry1 = format!("{}_open_exclusive_latency", var_name);
134 let entry2 = format!("{} clone with root key latency", title_name);
135 let open_exclusive_latency = register_histogram_vec(&entry1, &entry2, &[], None);
136
137 let entry1 = format!("{}_list_all_latency", var_name);
138 let entry2 = format!("{} list all latency", title_name);
139 let list_all_latency = register_histogram_vec(&entry1, &entry2, &[], None);
140
141 let entry1 = format!("{}_list_root_keys_latency", var_name);
142 let entry2 = format!("{} list root keys latency", title_name);
143 let list_root_keys_latency = register_histogram_vec(&entry1, &entry2, &[], None);
144
145 let entry1 = format!("{}_delete_all_latency", var_name);
146 let entry2 = format!("{} delete all latency", title_name);
147 let delete_all_latency = register_histogram_vec(&entry1, &entry2, &[], None);
148
149 let entry1 = format!("{}_exists_latency", var_name);
150 let entry2 = format!("{} exists latency", title_name);
151 let exists_latency = register_histogram_vec(&entry1, &entry2, &[], None);
152
153 let entry1 = format!("{}_create_latency", var_name);
154 let entry2 = format!("{} create latency", title_name);
155 let create_latency = register_histogram_vec(&entry1, &entry2, &[], None);
156
157 let entry1 = format!("{}_delete_latency", var_name);
158 let entry2 = format!("{} delete latency", title_name);
159 let delete_latency = register_histogram_vec(&entry1, &entry2, &[], None);
160
161 let entry1 = format!("{}_read_value_none_cases", var_name);
162 let entry2 = format!("{} read value none cases", title_name);
163 let read_value_none_cases = register_int_counter_vec(&entry1, &entry2, &[]);
164
165 let entry1 = format!("{}_read_value_key_size", var_name);
166 let entry2 = format!("{} read value key size", title_name);
167 let read_value_key_size = register_histogram_vec(&entry1, &entry2, &[], None);
168
169 let entry1 = format!("{}_read_value_value_size", var_name);
170 let entry2 = format!("{} read value value size", title_name);
171 let read_value_value_size = register_histogram_vec(&entry1, &entry2, &[], None);
172
173 let entry1 = format!("{}_read_multi_values_num_entries", var_name);
174 let entry2 = format!("{} read multi values num entries", title_name);
175 let read_multi_values_num_entries = register_histogram_vec(&entry1, &entry2, &[], None);
176
177 let entry1 = format!("{}_read_multi_values_key_sizes", var_name);
178 let entry2 = format!("{} read multi values key sizes", title_name);
179 let read_multi_values_key_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
180
181 let entry1 = format!("{}_contains_keys_num_entries", var_name);
182 let entry2 = format!("{} contains keys num entries", title_name);
183 let contains_keys_num_entries = register_histogram_vec(&entry1, &entry2, &[], None);
184
185 let entry1 = format!("{}_contains_keys_key_sizes", var_name);
186 let entry2 = format!("{} contains keys key sizes", title_name);
187 let contains_keys_key_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
188
189 let entry1 = format!("{}_contains_key_key_size", var_name);
190 let entry2 = format!("{} contains key key size", title_name);
191 let contains_key_key_size = register_histogram_vec(&entry1, &entry2, &[], None);
192
193 let entry1 = format!("{}_find_keys_by_prefix_prefix_size", var_name);
194 let entry2 = format!("{} find keys by prefix prefix size", title_name);
195 let find_keys_by_prefix_prefix_size = register_histogram_vec(&entry1, &entry2, &[], None);
196
197 let entry1 = format!("{}_find_keys_by_prefix_num_keys", var_name);
198 let entry2 = format!("{} find keys by prefix num keys", title_name);
199 let find_keys_by_prefix_num_keys = register_histogram_vec(&entry1, &entry2, &[], None);
200
201 let entry1 = format!("{}_find_keys_by_prefix_keys_size", var_name);
202 let entry2 = format!("{} find keys by prefix keys size", title_name);
203 let find_keys_by_prefix_keys_size = register_histogram_vec(&entry1, &entry2, &[], None);
204
205 let entry1 = format!("{}_find_key_values_by_prefix_prefix_size", var_name);
206 let entry2 = format!("{} find key values by prefix prefix size", title_name);
207 let find_key_values_by_prefix_prefix_size =
208 register_histogram_vec(&entry1, &entry2, &[], None);
209
210 let entry1 = format!("{}_find_key_values_by_prefix_num_keys", var_name);
211 let entry2 = format!("{} find key values by prefix num keys", title_name);
212 let find_key_values_by_prefix_num_keys =
213 register_histogram_vec(&entry1, &entry2, &[], None);
214
215 let entry1 = format!("{}_find_key_values_by_prefix_key_values_size", var_name);
216 let entry2 = format!("{} find key values by prefix key values size", title_name);
217 let find_key_values_by_prefix_key_values_size =
218 register_histogram_vec(&entry1, &entry2, &[], None);
219
220 let entry1 = format!("{}_write_batch_size", var_name);
221 let entry2 = format!("{} write batch size", title_name);
222 let write_batch_size = register_histogram_vec(&entry1, &entry2, &[], None);
223
224 let entry1 = format!("{}_list_all_sizes", var_name);
225 let entry2 = format!("{} list all sizes", title_name);
226 let list_all_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
227
228 let entry1 = format!("{}_exists_true_cases", var_name);
229 let entry2 = format!("{} exists true cases", title_name);
230 let exists_true_cases = register_int_counter_vec(&entry1, &entry2, &[]);
231
232 KeyValueStoreMetrics {
233 read_value_bytes_latency,
234 contains_key_latency,
235 contains_keys_latency,
236 read_multi_values_bytes_latency,
237 find_keys_by_prefix_latency,
238 find_key_values_by_prefix_latency,
239 write_batch_latency,
240 clear_journal_latency,
241 connect_latency,
242 open_exclusive_latency,
243 list_all_latency,
244 list_root_keys_latency,
245 delete_all_latency,
246 exists_latency,
247 create_latency,
248 delete_latency,
249 read_value_none_cases,
250 read_value_key_size,
251 read_value_value_size,
252 read_multi_values_num_entries,
253 read_multi_values_key_sizes,
254 contains_keys_num_entries,
255 contains_keys_key_sizes,
256 contains_key_key_size,
257 find_keys_by_prefix_prefix_size,
258 find_keys_by_prefix_num_keys,
259 find_keys_by_prefix_keys_size,
260 find_key_values_by_prefix_prefix_size,
261 find_key_values_by_prefix_num_keys,
262 find_key_values_by_prefix_key_values_size,
263 write_batch_size,
264 list_all_sizes,
265 exists_true_cases,
266 }
267 }
268}
269
270#[derive(Clone)]
272pub struct MeteredStore<K> {
273 counter: Arc<KeyValueStoreMetrics>,
275 store: K,
277}
278
279impl<K> WithError for MeteredStore<K>
280where
281 K: WithError,
282{
283 type Error = K::Error;
284}
285
286impl<K> ReadableKeyValueStore for MeteredStore<K>
287where
288 K: ReadableKeyValueStore,
289{
290 const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE;
291 type Keys = K::Keys;
292 type KeyValues = K::KeyValues;
293
294 fn max_stream_queries(&self) -> usize {
295 self.store.max_stream_queries()
296 }
297
298 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
299 let _latency = self.counter.read_value_bytes_latency.measure_latency();
300 self.counter
301 .read_value_key_size
302 .with_label_values(&[])
303 .observe(key.len() as f64);
304 let result = self.store.read_value_bytes(key).await?;
305 match &result {
306 None => self
307 .counter
308 .read_value_none_cases
309 .with_label_values(&[])
310 .inc(),
311 Some(value) => self
312 .counter
313 .read_value_value_size
314 .with_label_values(&[])
315 .observe(value.len() as f64),
316 }
317 Ok(result)
318 }
319
320 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
321 let _latency = self.counter.contains_key_latency.measure_latency();
322 self.counter
323 .contains_key_key_size
324 .with_label_values(&[])
325 .observe(key.len() as f64);
326 self.store.contains_key(key).await
327 }
328
329 async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
330 let _latency = self.counter.contains_keys_latency.measure_latency();
331 self.counter
332 .contains_keys_num_entries
333 .with_label_values(&[])
334 .observe(keys.len() as f64);
335 let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
336 self.counter
337 .contains_keys_key_sizes
338 .with_label_values(&[])
339 .observe(key_sizes as f64);
340 self.store.contains_keys(keys).await
341 }
342
343 async fn read_multi_values_bytes(
344 &self,
345 keys: Vec<Vec<u8>>,
346 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
347 let _latency = self
348 .counter
349 .read_multi_values_bytes_latency
350 .measure_latency();
351 self.counter
352 .read_multi_values_num_entries
353 .with_label_values(&[])
354 .observe(keys.len() as f64);
355 let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
356 self.counter
357 .read_multi_values_key_sizes
358 .with_label_values(&[])
359 .observe(key_sizes as f64);
360 self.store.read_multi_values_bytes(keys).await
361 }
362
363 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::Keys, Self::Error> {
364 let _latency = self.counter.find_keys_by_prefix_latency.measure_latency();
365 self.counter
366 .find_keys_by_prefix_prefix_size
367 .with_label_values(&[])
368 .observe(key_prefix.len() as f64);
369 let result = self.store.find_keys_by_prefix(key_prefix).await?;
370 let (num_keys, keys_size) = result
371 .iterator()
372 .map(|key| key.map(|k| k.len()))
373 .collect::<Result<Vec<usize>, _>>()?
374 .into_iter()
375 .fold((0, 0), |(count, size), len| (count + 1, size + len));
376 self.counter
377 .find_keys_by_prefix_num_keys
378 .with_label_values(&[])
379 .observe(num_keys as f64);
380 self.counter
381 .find_keys_by_prefix_keys_size
382 .with_label_values(&[])
383 .observe(keys_size as f64);
384 Ok(result)
385 }
386
387 async fn find_key_values_by_prefix(
388 &self,
389 key_prefix: &[u8],
390 ) -> Result<Self::KeyValues, Self::Error> {
391 let _latency = self
392 .counter
393 .find_key_values_by_prefix_latency
394 .measure_latency();
395 self.counter
396 .find_key_values_by_prefix_prefix_size
397 .with_label_values(&[])
398 .observe(key_prefix.len() as f64);
399 let result = self.store.find_key_values_by_prefix(key_prefix).await?;
400 let (num_keys, key_values_size) = result
401 .iterator()
402 .map(|key_value| key_value.map(|(key, value)| key.len() + value.len()))
403 .collect::<Result<Vec<usize>, _>>()?
404 .into_iter()
405 .fold((0, 0), |(count, size), len| (count + 1, size + len));
406 self.counter
407 .find_key_values_by_prefix_num_keys
408 .with_label_values(&[])
409 .observe(num_keys as f64);
410 self.counter
411 .find_key_values_by_prefix_key_values_size
412 .with_label_values(&[])
413 .observe(key_values_size as f64);
414 Ok(result)
415 }
416}
417
418impl<K> WritableKeyValueStore for MeteredStore<K>
419where
420 K: WritableKeyValueStore,
421{
422 const MAX_VALUE_SIZE: usize = K::MAX_VALUE_SIZE;
423
424 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
425 let _latency = self.counter.write_batch_latency.measure_latency();
426 self.counter
427 .write_batch_size
428 .with_label_values(&[])
429 .observe(batch.size() as f64);
430 self.store.write_batch(batch).await
431 }
432
433 async fn clear_journal(&self) -> Result<(), Self::Error> {
434 let _metric = self.counter.clear_journal_latency.measure_latency();
435 self.store.clear_journal().await
436 }
437}
438
439impl<K> AdminKeyValueStore for MeteredStore<K>
440where
441 K: AdminKeyValueStore,
442{
443 type Config = K::Config;
444
445 fn get_name() -> String {
446 K::get_name()
447 }
448
449 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
450 let name = K::get_name();
451 let counter = get_counter(&name);
452 let _latency = counter.connect_latency.measure_latency();
453 let store = K::connect(config, namespace).await?;
454 let counter = get_counter(&name);
455 Ok(Self { counter, store })
456 }
457
458 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, Self::Error> {
459 let _latency = self.counter.open_exclusive_latency.measure_latency();
460 let store = self.store.open_exclusive(root_key)?;
461 let counter = self.counter.clone();
462 Ok(Self { counter, store })
463 }
464
465 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
466 let name = K::get_name();
467 let counter = get_counter(&name);
468 let _latency = counter.list_all_latency.measure_latency();
469 let namespaces = K::list_all(config).await?;
470 let counter = get_counter(&name);
471 counter
472 .list_all_sizes
473 .with_label_values(&[])
474 .observe(namespaces.len() as f64);
475 Ok(namespaces)
476 }
477
478 async fn list_root_keys(
479 config: &Self::Config,
480 namespace: &str,
481 ) -> Result<Vec<Vec<u8>>, Self::Error> {
482 let name = K::get_name();
483 let counter = get_counter(&name);
484 let _latency = counter.list_root_keys_latency.measure_latency();
485 K::list_root_keys(config, namespace).await
486 }
487
488 async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
489 let name = K::get_name();
490 let counter = get_counter(&name);
491 let _latency = counter.delete_all_latency.measure_latency();
492 K::delete_all(config).await
493 }
494
495 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
496 let name = K::get_name();
497 let counter = get_counter(&name);
498 let _latency = counter.exists_latency.measure_latency();
499 let result = K::exists(config, namespace).await?;
500 if result {
501 let counter = get_counter(&name);
502 counter.exists_true_cases.with_label_values(&[]).inc();
503 }
504 Ok(result)
505 }
506
507 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
508 let name = K::get_name();
509 let counter = get_counter(&name);
510 let _latency = counter.create_latency.measure_latency();
511 K::create(config, namespace).await
512 }
513
514 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
515 let name = K::get_name();
516 let counter = get_counter(&name);
517 let _latency = counter.delete_latency.measure_latency();
518 K::delete(config, namespace).await
519 }
520}
521
/// Test support: a metered store uses the test configuration of the store it
/// wraps.
#[cfg(with_testing)]
impl<K: TestKeyValueStore> TestKeyValueStore for MeteredStore<K> {
    /// Returns the configuration produced by `K::new_test_config`.
    async fn new_test_config() -> Result<K::Config, Self::Error> {
        let config = K::new_test_config().await?;
        Ok(config)
    }
}