1use std::{
7 collections::{btree_map::Entry, BTreeMap},
8 sync::{Arc, LazyLock, Mutex},
9};
10
11use convert_case::{Case, Casing};
12use linera_base::prometheus_util::{
13 register_histogram_vec, register_int_counter_vec, MeasureLatency as _,
14};
15use prometheus::{HistogramVec, IntCounterVec};
16
17#[cfg(with_testing)]
18use crate::store::TestKeyValueStore;
19use crate::{
20 batch::Batch,
21 store::{AdminKeyValueStore, ReadableKeyValueStore, WithError, WritableKeyValueStore},
22};
23
/// The Prometheus metrics recorded for one kind of key-value store.
///
/// All metrics are created by [`KeyValueStoreMetrics::new`], which registers
/// them with an empty list of label names.
#[derive(Clone)]
pub struct KeyValueStoreMetrics {
    // Latency histograms, one per wrapped store operation.
    /// Latency of `read_value_bytes`.
    read_value_bytes_latency: HistogramVec,
    /// Latency of `contains_key`.
    contains_key_latency: HistogramVec,
    /// Latency of `contains_keys`.
    contains_keys_latency: HistogramVec,
    /// Latency of `read_multi_values_bytes`.
    read_multi_values_bytes_latency: HistogramVec,
    /// Latency of `find_keys_by_prefix`.
    find_keys_by_prefix_latency: HistogramVec,
    /// Latency of `find_key_values_by_prefix`.
    find_key_values_by_prefix_latency: HistogramVec,
    /// Latency of `write_batch`.
    write_batch_latency: HistogramVec,
    /// Latency of `clear_journal`.
    clear_journal_latency: HistogramVec,
    /// Latency of `connect`.
    connect_latency: HistogramVec,
    /// Latency of `open_exclusive`.
    open_exclusive_latency: HistogramVec,
    /// Latency of `list_all`.
    list_all_latency: HistogramVec,
    /// Latency of `list_root_keys`.
    list_root_keys_latency: HistogramVec,
    /// Latency of `delete_all`.
    delete_all_latency: HistogramVec,
    /// Latency of `exists`.
    exists_latency: HistogramVec,
    /// Latency of `create`.
    create_latency: HistogramVec,
    /// Latency of `delete`.
    delete_latency: HistogramVec,

    // Counters and size histograms describing requests and responses.
    /// Number of successful `read_value_bytes` calls that returned `None`.
    read_value_none_cases: IntCounterVec,
    /// Length in bytes of the key passed to `read_value_bytes`.
    read_value_key_size: HistogramVec,
    /// Length in bytes of the value returned by `read_value_bytes`.
    read_value_value_size: HistogramVec,
    /// Number of keys passed to `read_multi_values_bytes`.
    read_multi_values_num_entries: HistogramVec,
    /// Summed length in bytes of the keys passed to `read_multi_values_bytes`.
    read_multi_values_key_sizes: HistogramVec,
    /// Number of keys passed to `contains_keys`.
    contains_keys_num_entries: HistogramVec,
    /// Summed length in bytes of the keys passed to `contains_keys`.
    contains_keys_key_sizes: HistogramVec,
    /// Length in bytes of the key passed to `contains_key`.
    contains_key_key_size: HistogramVec,
    /// Length in bytes of the prefix passed to `find_keys_by_prefix`.
    find_keys_by_prefix_prefix_size: HistogramVec,
    /// Number of keys returned by `find_keys_by_prefix`.
    find_keys_by_prefix_num_keys: HistogramVec,
    /// Summed length in bytes of the keys returned by `find_keys_by_prefix`.
    find_keys_by_prefix_keys_size: HistogramVec,
    /// Length in bytes of the prefix passed to `find_key_values_by_prefix`.
    find_key_values_by_prefix_prefix_size: HistogramVec,
    /// Number of key-value pairs returned by `find_key_values_by_prefix`.
    find_key_values_by_prefix_num_keys: HistogramVec,
    /// Summed length in bytes of the keys and values returned by
    /// `find_key_values_by_prefix`.
    find_key_values_by_prefix_key_values_size: HistogramVec,
    /// The value of `Batch::size()` for each batch passed to `write_batch`.
    write_batch_size: HistogramVec,
    /// Number of namespaces returned by `list_all`.
    list_all_sizes: HistogramVec,
    /// Number of successful `exists` calls that returned `true`.
    exists_true_cases: IntCounterVec,
}
61
/// The set of metrics created so far, indexed by store name.
#[derive(Default)]
struct StoreMetrics {
    /// Maps a store name to the metrics shared by every caller using that name.
    stores: BTreeMap<String, Arc<KeyValueStoreMetrics>>,
}
66
/// The global, mutex-protected table of per-store metrics. It starts out empty
/// and is initialized lazily on first access.
static STORE_COUNTERS: LazyLock<Mutex<StoreMetrics>> =
    LazyLock::new(|| Mutex::new(StoreMetrics::default()));
70
71fn get_counter(name: &str) -> Arc<KeyValueStoreMetrics> {
72 let mut store_metrics = STORE_COUNTERS.lock().unwrap();
73 let key = name.to_string();
74 match store_metrics.stores.entry(key) {
75 Entry::Occupied(entry) => {
76 let entry = entry.into_mut();
77 entry.clone()
78 }
79 Entry::Vacant(entry) => {
80 let store_metric = Arc::new(KeyValueStoreMetrics::new(name.to_string()));
81 entry.insert(store_metric.clone());
82 store_metric
83 }
84 }
85}
86
87impl KeyValueStoreMetrics {
88 pub fn new(name: String) -> Self {
90 let var_name = name.replace(' ', "_");
92 let title_name = name.to_case(Case::Snake);
93
94 let entry1 = format!("{}_read_value_bytes_latency", var_name);
95 let entry2 = format!("{} read value bytes latency", title_name);
96 let read_value_bytes_latency = register_histogram_vec(&entry1, &entry2, &[], None);
97
98 let entry1 = format!("{}_contains_key_latency", var_name);
99 let entry2 = format!("{} contains key latency", title_name);
100 let contains_key_latency = register_histogram_vec(&entry1, &entry2, &[], None);
101
102 let entry1 = format!("{}_contains_keys_latency", var_name);
103 let entry2 = format!("{} contains keys latency", title_name);
104 let contains_keys_latency = register_histogram_vec(&entry1, &entry2, &[], None);
105
106 let entry1 = format!("{}_read_multi_value_bytes_latency", var_name);
107 let entry2 = format!("{} read multi value bytes latency", title_name);
108 let read_multi_values_bytes_latency = register_histogram_vec(&entry1, &entry2, &[], None);
109
110 let entry1 = format!("{}_find_keys_by_prefix_latency", var_name);
111 let entry2 = format!("{} find keys by prefix latency", title_name);
112 let find_keys_by_prefix_latency = register_histogram_vec(&entry1, &entry2, &[], None);
113
114 let entry1 = format!("{}_find_key_values_by_prefix_latency", var_name);
115 let entry2 = format!("{} find key values by prefix latency", title_name);
116 let find_key_values_by_prefix_latency = register_histogram_vec(&entry1, &entry2, &[], None);
117
118 let entry1 = format!("{}_write_batch_latency", var_name);
119 let entry2 = format!("{} write batch latency", title_name);
120 let write_batch_latency = register_histogram_vec(&entry1, &entry2, &[], None);
121
122 let entry1 = format!("{}_clear_journal_latency", var_name);
123 let entry2 = format!("{} clear journal latency", title_name);
124 let clear_journal_latency = register_histogram_vec(&entry1, &entry2, &[], None);
125
126 let entry1 = format!("{}_connect_latency", var_name);
127 let entry2 = format!("{} connect latency", title_name);
128 let connect_latency = register_histogram_vec(&entry1, &entry2, &[], None);
129
130 let entry1 = format!("{}_open_exclusive_latency", var_name);
131 let entry2 = format!("{} clone with root key latency", title_name);
132 let open_exclusive_latency = register_histogram_vec(&entry1, &entry2, &[], None);
133
134 let entry1 = format!("{}_list_all_latency", var_name);
135 let entry2 = format!("{} list all latency", title_name);
136 let list_all_latency = register_histogram_vec(&entry1, &entry2, &[], None);
137
138 let entry1 = format!("{}_list_root_keys_latency", var_name);
139 let entry2 = format!("{} list root keys latency", title_name);
140 let list_root_keys_latency = register_histogram_vec(&entry1, &entry2, &[], None);
141
142 let entry1 = format!("{}_delete_all_latency", var_name);
143 let entry2 = format!("{} delete all latency", title_name);
144 let delete_all_latency = register_histogram_vec(&entry1, &entry2, &[], None);
145
146 let entry1 = format!("{}_exists_latency", var_name);
147 let entry2 = format!("{} exists latency", title_name);
148 let exists_latency = register_histogram_vec(&entry1, &entry2, &[], None);
149
150 let entry1 = format!("{}_create_latency", var_name);
151 let entry2 = format!("{} create latency", title_name);
152 let create_latency = register_histogram_vec(&entry1, &entry2, &[], None);
153
154 let entry1 = format!("{}_delete_latency", var_name);
155 let entry2 = format!("{} delete latency", title_name);
156 let delete_latency = register_histogram_vec(&entry1, &entry2, &[], None);
157
158 let entry1 = format!("{}_read_value_none_cases", var_name);
159 let entry2 = format!("{} read value none cases", title_name);
160 let read_value_none_cases = register_int_counter_vec(&entry1, &entry2, &[]);
161
162 let entry1 = format!("{}_read_value_key_size", var_name);
163 let entry2 = format!("{} read value key size", title_name);
164 let read_value_key_size = register_histogram_vec(&entry1, &entry2, &[], None);
165
166 let entry1 = format!("{}_read_value_value_size", var_name);
167 let entry2 = format!("{} read value value size", title_name);
168 let read_value_value_size = register_histogram_vec(&entry1, &entry2, &[], None);
169
170 let entry1 = format!("{}_read_multi_values_num_entries", var_name);
171 let entry2 = format!("{} read multi values num entries", title_name);
172 let read_multi_values_num_entries = register_histogram_vec(&entry1, &entry2, &[], None);
173
174 let entry1 = format!("{}_read_multi_values_key_sizes", var_name);
175 let entry2 = format!("{} read multi values key sizes", title_name);
176 let read_multi_values_key_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
177
178 let entry1 = format!("{}_contains_keys_num_entries", var_name);
179 let entry2 = format!("{} contains keys num entries", title_name);
180 let contains_keys_num_entries = register_histogram_vec(&entry1, &entry2, &[], None);
181
182 let entry1 = format!("{}_contains_keys_key_sizes", var_name);
183 let entry2 = format!("{} contains keys key sizes", title_name);
184 let contains_keys_key_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
185
186 let entry1 = format!("{}_contains_key_key_size", var_name);
187 let entry2 = format!("{} contains key key size", title_name);
188 let contains_key_key_size = register_histogram_vec(&entry1, &entry2, &[], None);
189
190 let entry1 = format!("{}_find_keys_by_prefix_prefix_size", var_name);
191 let entry2 = format!("{} find keys by prefix prefix size", title_name);
192 let find_keys_by_prefix_prefix_size = register_histogram_vec(&entry1, &entry2, &[], None);
193
194 let entry1 = format!("{}_find_keys_by_prefix_num_keys", var_name);
195 let entry2 = format!("{} find keys by prefix num keys", title_name);
196 let find_keys_by_prefix_num_keys = register_histogram_vec(&entry1, &entry2, &[], None);
197
198 let entry1 = format!("{}_find_keys_by_prefix_keys_size", var_name);
199 let entry2 = format!("{} find keys by prefix keys size", title_name);
200 let find_keys_by_prefix_keys_size = register_histogram_vec(&entry1, &entry2, &[], None);
201
202 let entry1 = format!("{}_find_key_values_by_prefix_prefix_size", var_name);
203 let entry2 = format!("{} find key values by prefix prefix size", title_name);
204 let find_key_values_by_prefix_prefix_size =
205 register_histogram_vec(&entry1, &entry2, &[], None);
206
207 let entry1 = format!("{}_find_key_values_by_prefix_num_keys", var_name);
208 let entry2 = format!("{} find key values by prefix num keys", title_name);
209 let find_key_values_by_prefix_num_keys =
210 register_histogram_vec(&entry1, &entry2, &[], None);
211
212 let entry1 = format!("{}_find_key_values_by_prefix_key_values_size", var_name);
213 let entry2 = format!("{} find key values by prefix key values size", title_name);
214 let find_key_values_by_prefix_key_values_size =
215 register_histogram_vec(&entry1, &entry2, &[], None);
216
217 let entry1 = format!("{}_write_batch_size", var_name);
218 let entry2 = format!("{} write batch size", title_name);
219 let write_batch_size = register_histogram_vec(&entry1, &entry2, &[], None);
220
221 let entry1 = format!("{}_list_all_sizes", var_name);
222 let entry2 = format!("{} list all sizes", title_name);
223 let list_all_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
224
225 let entry1 = format!("{}_exists_true_cases", var_name);
226 let entry2 = format!("{} exists true cases", title_name);
227 let exists_true_cases = register_int_counter_vec(&entry1, &entry2, &[]);
228
229 KeyValueStoreMetrics {
230 read_value_bytes_latency,
231 contains_key_latency,
232 contains_keys_latency,
233 read_multi_values_bytes_latency,
234 find_keys_by_prefix_latency,
235 find_key_values_by_prefix_latency,
236 write_batch_latency,
237 clear_journal_latency,
238 connect_latency,
239 open_exclusive_latency,
240 list_all_latency,
241 list_root_keys_latency,
242 delete_all_latency,
243 exists_latency,
244 create_latency,
245 delete_latency,
246 read_value_none_cases,
247 read_value_key_size,
248 read_value_value_size,
249 read_multi_values_num_entries,
250 read_multi_values_key_sizes,
251 contains_keys_num_entries,
252 contains_keys_key_sizes,
253 contains_key_key_size,
254 find_keys_by_prefix_prefix_size,
255 find_keys_by_prefix_num_keys,
256 find_keys_by_prefix_keys_size,
257 find_key_values_by_prefix_prefix_size,
258 find_key_values_by_prefix_num_keys,
259 find_key_values_by_prefix_key_values_size,
260 write_batch_size,
261 list_all_sizes,
262 exists_true_cases,
263 }
264 }
265}
266
/// A wrapper around a key-value store that records metrics for the operations
/// it forwards to that store.
#[derive(Clone)]
pub struct MeteredStore<K> {
    /// The metrics that operations on this store are recorded into.
    counter: Arc<KeyValueStoreMetrics>,
    /// The wrapped store that actually performs the operations.
    store: K,
}
275
/// The metered store reports exactly the errors of the store it wraps.
impl<K> WithError for MeteredStore<K>
where
    K: WithError,
{
    type Error = K::Error;
}
282
283impl<K> ReadableKeyValueStore for MeteredStore<K>
284where
285 K: ReadableKeyValueStore,
286{
287 const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE;
288
289 fn max_stream_queries(&self) -> usize {
290 self.store.max_stream_queries()
291 }
292
293 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
294 let _latency = self.counter.read_value_bytes_latency.measure_latency();
295 self.counter
296 .read_value_key_size
297 .with_label_values(&[])
298 .observe(key.len() as f64);
299 let result = self.store.read_value_bytes(key).await?;
300 match &result {
301 None => self
302 .counter
303 .read_value_none_cases
304 .with_label_values(&[])
305 .inc(),
306 Some(value) => self
307 .counter
308 .read_value_value_size
309 .with_label_values(&[])
310 .observe(value.len() as f64),
311 }
312 Ok(result)
313 }
314
315 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
316 let _latency = self.counter.contains_key_latency.measure_latency();
317 self.counter
318 .contains_key_key_size
319 .with_label_values(&[])
320 .observe(key.len() as f64);
321 self.store.contains_key(key).await
322 }
323
324 async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
325 let _latency = self.counter.contains_keys_latency.measure_latency();
326 self.counter
327 .contains_keys_num_entries
328 .with_label_values(&[])
329 .observe(keys.len() as f64);
330 let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
331 self.counter
332 .contains_keys_key_sizes
333 .with_label_values(&[])
334 .observe(key_sizes as f64);
335 self.store.contains_keys(keys).await
336 }
337
338 async fn read_multi_values_bytes(
339 &self,
340 keys: Vec<Vec<u8>>,
341 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
342 let _latency = self
343 .counter
344 .read_multi_values_bytes_latency
345 .measure_latency();
346 self.counter
347 .read_multi_values_num_entries
348 .with_label_values(&[])
349 .observe(keys.len() as f64);
350 let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
351 self.counter
352 .read_multi_values_key_sizes
353 .with_label_values(&[])
354 .observe(key_sizes as f64);
355 self.store.read_multi_values_bytes(keys).await
356 }
357
358 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
359 let _latency = self.counter.find_keys_by_prefix_latency.measure_latency();
360 self.counter
361 .find_keys_by_prefix_prefix_size
362 .with_label_values(&[])
363 .observe(key_prefix.len() as f64);
364 let result = self.store.find_keys_by_prefix(key_prefix).await?;
365 let (num_keys, keys_size) = result
366 .iter()
367 .map(|key| key.len())
368 .fold((0, 0), |(count, size), len| (count + 1, size + len));
369 self.counter
370 .find_keys_by_prefix_num_keys
371 .with_label_values(&[])
372 .observe(num_keys as f64);
373 self.counter
374 .find_keys_by_prefix_keys_size
375 .with_label_values(&[])
376 .observe(keys_size as f64);
377 Ok(result)
378 }
379
380 async fn find_key_values_by_prefix(
381 &self,
382 key_prefix: &[u8],
383 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
384 let _latency = self
385 .counter
386 .find_key_values_by_prefix_latency
387 .measure_latency();
388 self.counter
389 .find_key_values_by_prefix_prefix_size
390 .with_label_values(&[])
391 .observe(key_prefix.len() as f64);
392 let result = self.store.find_key_values_by_prefix(key_prefix).await?;
393 let (num_keys, key_values_size) = result
394 .iter()
395 .map(|(key, value)| key.len() + value.len())
396 .fold((0, 0), |(count, size), len| (count + 1, size + len));
397 self.counter
398 .find_key_values_by_prefix_num_keys
399 .with_label_values(&[])
400 .observe(num_keys as f64);
401 self.counter
402 .find_key_values_by_prefix_key_values_size
403 .with_label_values(&[])
404 .observe(key_values_size as f64);
405 Ok(result)
406 }
407}
408
409impl<K> WritableKeyValueStore for MeteredStore<K>
410where
411 K: WritableKeyValueStore,
412{
413 const MAX_VALUE_SIZE: usize = K::MAX_VALUE_SIZE;
414
415 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
416 let _latency = self.counter.write_batch_latency.measure_latency();
417 self.counter
418 .write_batch_size
419 .with_label_values(&[])
420 .observe(batch.size() as f64);
421 self.store.write_batch(batch).await
422 }
423
424 async fn clear_journal(&self) -> Result<(), Self::Error> {
425 let _metric = self.counter.clear_journal_latency.measure_latency();
426 self.store.clear_journal().await
427 }
428}
429
430impl<K> AdminKeyValueStore for MeteredStore<K>
431where
432 K: AdminKeyValueStore,
433{
434 type Config = K::Config;
435
436 fn get_name() -> String {
437 K::get_name()
438 }
439
440 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
441 let name = K::get_name();
442 let counter = get_counter(&name);
443 let _latency = counter.connect_latency.measure_latency();
444 let store = K::connect(config, namespace).await?;
445 let counter = get_counter(&name);
446 Ok(Self { counter, store })
447 }
448
449 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, Self::Error> {
450 let _latency = self.counter.open_exclusive_latency.measure_latency();
451 let store = self.store.open_exclusive(root_key)?;
452 let counter = self.counter.clone();
453 Ok(Self { counter, store })
454 }
455
456 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
457 let name = K::get_name();
458 let counter = get_counter(&name);
459 let _latency = counter.list_all_latency.measure_latency();
460 let namespaces = K::list_all(config).await?;
461 let counter = get_counter(&name);
462 counter
463 .list_all_sizes
464 .with_label_values(&[])
465 .observe(namespaces.len() as f64);
466 Ok(namespaces)
467 }
468
469 async fn list_root_keys(
470 config: &Self::Config,
471 namespace: &str,
472 ) -> Result<Vec<Vec<u8>>, Self::Error> {
473 let name = K::get_name();
474 let counter = get_counter(&name);
475 let _latency = counter.list_root_keys_latency.measure_latency();
476 K::list_root_keys(config, namespace).await
477 }
478
479 async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
480 let name = K::get_name();
481 let counter = get_counter(&name);
482 let _latency = counter.delete_all_latency.measure_latency();
483 K::delete_all(config).await
484 }
485
486 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
487 let name = K::get_name();
488 let counter = get_counter(&name);
489 let _latency = counter.exists_latency.measure_latency();
490 let result = K::exists(config, namespace).await?;
491 if result {
492 let counter = get_counter(&name);
493 counter.exists_true_cases.with_label_values(&[]).inc();
494 }
495 Ok(result)
496 }
497
498 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
499 let name = K::get_name();
500 let counter = get_counter(&name);
501 let _latency = counter.create_latency.measure_latency();
502 K::create(config, namespace).await
503 }
504
505 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
506 let name = K::get_name();
507 let counter = get_counter(&name);
508 let _latency = counter.delete_latency.measure_latency();
509 K::delete(config, namespace).await
510 }
511}
512
/// Test support: the metered store uses the test configuration of the store it
/// wraps.
#[cfg(with_testing)]
impl<K> TestKeyValueStore for MeteredStore<K>
where
    K: TestKeyValueStore,
{
    /// Returns the test configuration produced by the wrapped store type.
    async fn new_test_config() -> Result<K::Config, Self::Error> {
        K::new_test_config().await
    }
}