linera_storage_service/
client.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    mem,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};

use async_lock::{Semaphore, SemaphoreGuard};
use futures::future::join_all;
use linera_base::{ensure, util::future::FutureSyncExt as _};
#[cfg(with_metrics)]
use linera_views::metering::MeteredDatabase;
#[cfg(with_testing)]
use linera_views::store::TestKeyValueDatabase;
use linera_views::{
    batch::{Batch, WriteOperation},
    lru_caching::LruCachingDatabase,
    store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
};
use serde::de::DeserializeOwned;
use tonic::transport::{Channel, Endpoint};

#[cfg(with_testing)]
use crate::common::storage_service_test_endpoint;
use crate::{
    common::{
        KeyPrefix, StorageServiceStoreError, StorageServiceStoreInternalConfig, MAX_PAYLOAD_SIZE,
    },
    key_value_store::{
        statement::Operation, storage_service_client::StorageServiceClient, KeyValue,
        KeyValueAppend, ReplyContainsKey, ReplyContainsKeys, ReplyExistsNamespace,
        ReplyFindKeyValuesByPrefix, ReplyFindKeysByPrefix, ReplyListAll, ReplyListRootKeys,
        ReplyReadMultiValues, ReplyReadValue, ReplySpecificChunk, RequestContainsKey,
        RequestContainsKeys, RequestCreateNamespace, RequestDeleteNamespace,
        RequestExistsNamespace, RequestFindKeyValuesByPrefix, RequestFindKeysByPrefix,
        RequestListRootKeys, RequestReadMultiValues, RequestReadValue, RequestSpecificChunk,
        RequestWriteBatchExtended, Statement,
    },
};

// The maximum key size is set to 1M rather arbitrarily.
const MAX_KEY_SIZE: usize = 1_000_000;

// The shared store client.
// * Interior mutability is required because the gRPC client needs
//   `&mut self` for its calls, while the `KeyValueStore` traits only
//   provide shared access.
// * The semaphore and `max_stream_queries` work as in the other stores.
//
// Namespaces are encoded by their serialization; this works because
// the set of serializations of strings is prefix-free.
// The data is stored in the following way:
// * A `key` in a `namespace` is stored as
//   [`KeyPrefix::Key`] + namespace + key.
// * An additional key with an empty value is stored at
//   [`KeyPrefix::Namespace`] + namespace
//   to indicate the existence of the namespace.
// * A key with an empty value is stored at
//   [`KeyPrefix::RootKey`] + namespace + root_key
//   to indicate the existence of the root key.
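//
// Illustrative sketch (the concrete values are hypothetical, for exposition
// only): for a namespace serializing to bytes `ns` and a root key serializing
// to bytes `rk`, `open_shared` builds `start_key = [KeyPrefix::Key as u8] ++ ns ++ rk`,
// so an entry `key -> value` lives under the service key `start_key ++ key`,
// while `[KeyPrefix::Namespace as u8] ++ ns` and `[KeyPrefix::RootKey as u8] ++ ns ++ rk`
// hold empty marker values.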
#[derive(Clone)]
pub struct StorageServiceDatabaseInternal {
    /// The gRPC channel to the storage service.
    channel: Channel,
    /// The semaphore limiting the number of concurrent queries, if any.
    semaphore: Option<Arc<Semaphore>>,
    /// The maximum number of queries used for a stream.
    max_stream_queries: usize,
    /// The serialized namespace.
    namespace: Vec<u8>,
}

#[derive(Clone)]
pub struct StorageServiceStoreInternal {
    /// The gRPC channel to the storage service.
    channel: Channel,
    /// The semaphore limiting the number of concurrent queries, if any.
    semaphore: Option<Arc<Semaphore>>,
    /// The maximum number of queries used for a stream.
    max_stream_queries: usize,
    /// The length of the prefix `KeyPrefix::Key` + namespace, without the root key.
    prefix_len: usize,
    /// The prefix `KeyPrefix::Key` + namespace + serialized root key, prepended to every key of this store.
    start_key: Vec<u8>,
    /// Whether the root key marker has already been written.
    root_key_written: Arc<AtomicBool>,
}

impl WithError for StorageServiceDatabaseInternal {
    type Error = StorageServiceStoreError;
}

impl WithError for StorageServiceStoreInternal {
    type Error = StorageServiceStoreError;
}

impl ReadableKeyValueStore for StorageServiceStoreInternal {
    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;

    fn max_stream_queries(&self) -> usize {
        self.max_stream_queries
    }

    fn root_key(&self) -> Result<Vec<u8>, StorageServiceStoreError> {
        let root_key = bcs::from_bytes(&self.start_key[self.prefix_len..])?;
        Ok(root_key)
    }

    async fn read_value_bytes(
        &self,
        key: &[u8],
    ) -> Result<Option<Vec<u8>>, StorageServiceStoreError> {
        ensure!(
            key.len() <= MAX_KEY_SIZE,
            StorageServiceStoreError::KeyTooLong
        );
        let mut full_key = self.start_key.clone();
        full_key.extend(key);
        let query = RequestReadValue { key: full_key };
        let request = tonic::Request::new(query);
        let channel = self.channel.clone();
        let mut client = StorageServiceClient::new(channel);
        let _guard = self.acquire().await;
        let response = client.process_read_value(request).make_sync().await?;
        let response = response.into_inner();
        let ReplyReadValue {
            value,
            message_index,
            num_chunks,
        } = response;
        // `num_chunks == 0` means the value fit in a single reply;
        // otherwise it has to be reassembled from chunks.
        if num_chunks == 0 {
            Ok(value)
        } else {
            self.read_entries(message_index, num_chunks).await
        }
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, StorageServiceStoreError> {
        ensure!(
            key.len() <= MAX_KEY_SIZE,
            StorageServiceStoreError::KeyTooLong
        );
        let mut full_key = self.start_key.clone();
        full_key.extend(key);
        let query = RequestContainsKey { key: full_key };
        let request = tonic::Request::new(query);
        let channel = self.channel.clone();
        let mut client = StorageServiceClient::new(channel);
        let _guard = self.acquire().await;
        let response = client.process_contains_key(request).make_sync().await?;
        let response = response.into_inner();
        let ReplyContainsKey { test } = response;
        Ok(test)
    }

    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, StorageServiceStoreError> {
        let mut full_keys = Vec::new();
        for key in keys {
            ensure!(
                key.len() <= MAX_KEY_SIZE,
                StorageServiceStoreError::KeyTooLong
            );
            let mut full_key = self.start_key.clone();
            full_key.extend(key);
            full_keys.push(full_key);
        }
        let query = RequestContainsKeys { keys: full_keys };
        let request = tonic::Request::new(query);
        let channel = self.channel.clone();
        let mut client = StorageServiceClient::new(channel);
        let _guard = self.acquire().await;
        let response = client.process_contains_keys(request).make_sync().await?;
        let response = response.into_inner();
        let ReplyContainsKeys { tests } = response;
        Ok(tests)
    }

    async fn read_multi_values_bytes(
        &self,
        keys: &[Vec<u8>],
    ) -> Result<Vec<Option<Vec<u8>>>, StorageServiceStoreError> {
        let mut full_keys = Vec::new();
        for key in keys {
            ensure!(
                key.len() <= MAX_KEY_SIZE,
                StorageServiceStoreError::KeyTooLong
            );
            let mut full_key = self.start_key.clone();
            full_key.extend(key);
            full_keys.push(full_key);
        }
        let query = RequestReadMultiValues { keys: full_keys };
        let request = tonic::Request::new(query);
        let channel = self.channel.clone();
        let mut client = StorageServiceClient::new(channel);
        let _guard = self.acquire().await;
        let response = client
            .process_read_multi_values(request)
            .make_sync()
            .await?;
        let response = response.into_inner();
        let ReplyReadMultiValues {
            values,
            message_index,
            num_chunks,
        } = response;
        if num_chunks == 0 {
            let values = values.into_iter().map(|x| x.value).collect::<Vec<_>>();
            Ok(values)
        } else {
            self.read_entries(message_index, num_chunks).await
        }
    }

    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<Vec<u8>>, StorageServiceStoreError> {
        ensure!(
            key_prefix.len() <= MAX_KEY_SIZE,
            StorageServiceStoreError::KeyTooLong
        );
        let mut full_key_prefix = self.start_key.clone();
        full_key_prefix.extend(key_prefix);
        let query = RequestFindKeysByPrefix {
            key_prefix: full_key_prefix,
        };
        let request = tonic::Request::new(query);
        let channel = self.channel.clone();
        let mut client = StorageServiceClient::new(channel);
        let _guard = self.acquire().await;
        let response = client
            .process_find_keys_by_prefix(request)
            .make_sync()
            .await?;
        let response = response.into_inner();
        let ReplyFindKeysByPrefix {
            keys,
            message_index,
            num_chunks,
        } = response;
        if num_chunks == 0 {
            Ok(keys)
        } else {
            self.read_entries(message_index, num_chunks).await
        }
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageServiceStoreError> {
        ensure!(
            key_prefix.len() <= MAX_KEY_SIZE,
            StorageServiceStoreError::KeyTooLong
        );
        let mut full_key_prefix = self.start_key.clone();
        full_key_prefix.extend(key_prefix);
        let query = RequestFindKeyValuesByPrefix {
            key_prefix: full_key_prefix,
        };
        let request = tonic::Request::new(query);
        let channel = self.channel.clone();
        let mut client = StorageServiceClient::new(channel);
        let _guard = self.acquire().await;
        let response = client
            .process_find_key_values_by_prefix(request)
            .make_sync()
            .await?;
        let response = response.into_inner();
        let ReplyFindKeyValuesByPrefix {
            key_values,
            message_index,
            num_chunks,
        } = response;
        if num_chunks == 0 {
            let key_values = key_values
                .into_iter()
                .map(|x| (x.key, x.value))
                .collect::<Vec<_>>();
            Ok(key_values)
        } else {
            self.read_entries(message_index, num_chunks).await
        }
    }
}

impl WritableKeyValueStore for StorageServiceStoreInternal {
    const MAX_VALUE_SIZE: usize = usize::MAX;

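    // A batch is sent as gRPC requests whose accumulated payload stays below
    // `MAX_PAYLOAD_SIZE`. A single `Put` whose value exceeds the limit on its
    // own is instead streamed as a sequence of `Append` statements, the last
    // one carrying `last: true`.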
    async fn write_batch(&self, batch: Batch) -> Result<(), StorageServiceStoreError> {
        if batch.operations.is_empty() {
            return Ok(());
        }
        let mut statements = Vec::new();
        let mut chunk_size = 0;

        // On the first write of this store, also record the root key marker.
        if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
            let mut full_key = self.start_key.clone();
            full_key[0] = KeyPrefix::RootKey as u8;
            let operation = Operation::Put(KeyValue {
                key: full_key,
                value: vec![],
            });
            let statement = Statement {
                operation: Some(operation),
            };
            statements.push(statement);
            chunk_size += self.start_key.len();
        }

        let bcs_root_key_len = self.start_key.len() - self.prefix_len;
        for operation in batch.operations {
            let (key_len, value_len) = match &operation {
                WriteOperation::Delete { key } => (key.len(), 0),
                WriteOperation::Put { key, value } => (key.len(), value.len()),
                WriteOperation::DeletePrefix { key_prefix } => (key_prefix.len(), 0),
            };
            let operation_size = key_len + value_len + bcs_root_key_len;
            ensure!(
                key_len <= MAX_KEY_SIZE,
                StorageServiceStoreError::KeyTooLong
            );
            if operation_size + chunk_size < MAX_PAYLOAD_SIZE {
                let statement = self.get_statement(operation);
                statements.push(statement);
                chunk_size += operation_size;
            } else {
                self.submit_statements(mem::take(&mut statements)).await?;
                chunk_size = 0;
                if operation_size > MAX_PAYLOAD_SIZE {
                    // A single operation is too big on its own, so split its value into chunks.
                    let WriteOperation::Put { key, value } = operation else {
                        // Only a `Put` can exceed the limit.
                        unreachable!();
                    };
                    let mut full_key = self.start_key.clone();
                    full_key.extend(key);
                    let value_chunks = value
                        .chunks(MAX_PAYLOAD_SIZE)
                        .map(|x| x.to_vec())
                        .collect::<Vec<_>>();
                    let num_chunks = value_chunks.len();
                    for (i_chunk, value) in value_chunks.into_iter().enumerate() {
                        let last = i_chunk + 1 == num_chunks;
                        let operation = Operation::Append(KeyValueAppend {
                            key: full_key.clone(),
                            value,
                            last,
                        });
                        statements = vec![Statement {
                            operation: Some(operation),
                        }];
                        self.submit_statements(mem::take(&mut statements)).await?;
                    }
                } else {
                    // The operation is small enough on its own; the batch just has to be split here.
                    let statement = self.get_statement(operation);
                    statements.push(statement);
                    chunk_size = operation_size;
                }
            }
        }
        self.submit_statements(mem::take(&mut statements)).await
    }

    async fn clear_journal(&self) -> Result<(), StorageServiceStoreError> {
        Ok(())
    }
}

impl StorageServiceStoreInternal {
    /// Obtains the semaphore lock on the database if needed.
    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
        match &self.semaphore {
            None => None,
            Some(count) => Some(count.acquire().await),
        }
    }

    /// Sends the accumulated statements to the storage service in one request.
    async fn submit_statements(
        &self,
        statements: Vec<Statement>,
    ) -> Result<(), StorageServiceStoreError> {
        if !statements.is_empty() {
            let query = RequestWriteBatchExtended { statements };
            let request = tonic::Request::new(query);
            let channel = self.channel.clone();
            let mut client = StorageServiceClient::new(channel);
            let _guard = self.acquire().await;
            let _response = client
                .process_write_batch_extended(request)
                .make_sync()
                .await?;
        }
        Ok(())
    }

    /// Converts a `WriteOperation` into a `Statement` on the full key space.
    fn get_statement(&self, operation: WriteOperation) -> Statement {
        let operation = match operation {
            WriteOperation::Delete { key } => {
                let mut full_key = self.start_key.clone();
                full_key.extend(key);
                Operation::Delete(full_key)
            }
            WriteOperation::Put { key, value } => {
                let mut full_key = self.start_key.clone();
                full_key.extend(key);
                Operation::Put(KeyValue {
                    key: full_key,
                    value,
                })
            }
            WriteOperation::DeletePrefix { key_prefix } => {
                let mut full_key_prefix = self.start_key.clone();
                full_key_prefix.extend(key_prefix);
                Operation::DeletePrefix(full_key_prefix)
            }
        };
        Statement {
            operation: Some(operation),
        }
    }

    /// Fetches one specific chunk of a large reply from the storage service.
    async fn read_single_entry(
        &self,
        message_index: i64,
        index: i32,
    ) -> Result<Vec<u8>, StorageServiceStoreError> {
        let channel = self.channel.clone();
        let query = RequestSpecificChunk {
            message_index,
            index,
        };
        let request = tonic::Request::new(query);
        let mut client = StorageServiceClient::new(channel);
        let response = client.process_specific_chunk(request).make_sync().await?;
        let response = response.into_inner();
        let ReplySpecificChunk { chunk } = response;
        Ok(chunk)
    }

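    /// Reassembles a reply that was too large for a single message: fetches all
    /// `num_chunks` chunks of `message_index` concurrently, concatenates them,
    /// and deserializes the result into the requested type.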
    async fn read_entries<S: DeserializeOwned>(
        &self,
        message_index: i64,
        num_chunks: i32,
    ) -> Result<S, StorageServiceStoreError> {
        let mut handles = Vec::new();
        for index in 0..num_chunks {
            let handle = self.read_single_entry(message_index, index);
            handles.push(handle);
        }
        let mut value = Vec::new();
        for chunk in join_all(handles).await {
            let chunk = chunk?;
            value.extend(chunk);
        }
        Ok(bcs::from_bytes(&value)?)
    }
}

impl KeyValueDatabase for StorageServiceDatabaseInternal {
    type Config = StorageServiceStoreInternalConfig;
    type Store = StorageServiceStoreInternal;

    fn get_name() -> String {
        "service store".to_string()
    }

    async fn connect(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<Self, StorageServiceStoreError> {
        let semaphore = config
            .max_concurrent_queries
            .map(|n| Arc::new(Semaphore::new(n)));
        let namespace = bcs::to_bytes(namespace)?;
        let max_stream_queries = config.max_stream_queries;
        let endpoint = config.http_address();
        let endpoint = Endpoint::from_shared(endpoint)?;
        let channel = endpoint.connect_lazy();
        Ok(Self {
            channel,
            semaphore,
            max_stream_queries,
            namespace,
        })
    }

    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, StorageServiceStoreError> {
        let channel = self.channel.clone();
        let semaphore = self.semaphore.clone();
        let max_stream_queries = self.max_stream_queries;
        let mut start_key = vec![KeyPrefix::Key as u8];
        start_key.extend(&self.namespace);
        start_key.extend(bcs::to_bytes(root_key)?);
        let prefix_len = self.namespace.len() + 1;
        Ok(StorageServiceStoreInternal {
            channel,
            semaphore,
            max_stream_queries,
            prefix_len,
            start_key,
            root_key_written: Arc::new(AtomicBool::new(false)),
        })
    }

    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
        self.open_shared(root_key)
    }

    async fn list_all(config: &Self::Config) -> Result<Vec<String>, StorageServiceStoreError> {
        let endpoint = config.http_address();
        let endpoint = Endpoint::from_shared(endpoint)?;
        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
        let response = client.process_list_all(()).make_sync().await?;
        let response = response.into_inner();
        let ReplyListAll { namespaces } = response;
        let namespaces = namespaces
            .into_iter()
            .map(|x| bcs::from_bytes(&x))
            .collect::<Result<_, _>>()?;
        Ok(namespaces)
    }

    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, StorageServiceStoreError> {
        let query = RequestListRootKeys {
            namespace: self.namespace.clone(),
        };
        let request = tonic::Request::new(query);
        let mut client = StorageServiceClient::new(self.channel.clone());
        let response = client.process_list_root_keys(request).make_sync().await?;
        let response = response.into_inner();
        let ReplyListRootKeys { root_keys } = response;
        Ok(root_keys)
    }

    async fn delete_all(config: &Self::Config) -> Result<(), StorageServiceStoreError> {
        let endpoint = config.http_address();
        let endpoint = Endpoint::from_shared(endpoint)?;
        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
        let _response = client.process_delete_all(()).make_sync().await?;
        Ok(())
    }

    async fn exists(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<bool, StorageServiceStoreError> {
        let namespace = bcs::to_bytes(namespace)?;
        let query = RequestExistsNamespace { namespace };
        let request = tonic::Request::new(query);
        let endpoint = config.http_address();
        let endpoint = Endpoint::from_shared(endpoint)?;
        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
        let response = client.process_exists_namespace(request).make_sync().await?;
        let response = response.into_inner();
        let ReplyExistsNamespace { exists } = response;
        Ok(exists)
    }

    async fn create(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<(), StorageServiceStoreError> {
        if StorageServiceDatabaseInternal::exists(config, namespace).await? {
            return Err(StorageServiceStoreError::StoreAlreadyExists);
        }
        let namespace = bcs::to_bytes(namespace)?;
        let query = RequestCreateNamespace { namespace };
        let request = tonic::Request::new(query);
        let endpoint = config.http_address();
        let endpoint = Endpoint::from_shared(endpoint)?;
        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
        let _response = client.process_create_namespace(request).make_sync().await?;
        Ok(())
    }

    async fn delete(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<(), StorageServiceStoreError> {
        let namespace = bcs::to_bytes(namespace)?;
        let query = RequestDeleteNamespace { namespace };
        let request = tonic::Request::new(query);
        let endpoint = config.http_address();
        let endpoint = Endpoint::from_shared(endpoint)?;
        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
        let _response = client.process_delete_namespace(request).make_sync().await?;
        Ok(())
    }
}

#[cfg(with_testing)]
impl TestKeyValueDatabase for StorageServiceDatabaseInternal {
    async fn new_test_config() -> Result<StorageServiceStoreInternalConfig, StorageServiceStoreError>
    {
        let endpoint = storage_service_test_endpoint()?;
        service_config_from_endpoint(&endpoint)
    }
}

/// Creates a `StorageServiceStoreInternalConfig` from an endpoint.
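///
/// A minimal usage sketch (the endpoint address and namespace are hypothetical):
/// ```ignore
/// let config = service_config_from_endpoint("127.0.0.1:1235")?;
/// let database = StorageServiceDatabaseInternal::connect(&config, "my_namespace").await?;
/// ```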
pub fn service_config_from_endpoint(
    endpoint: &str,
) -> Result<StorageServiceStoreInternalConfig, StorageServiceStoreError> {
    Ok(StorageServiceStoreInternalConfig {
        endpoint: endpoint.to_string(),
        max_concurrent_queries: None,
        max_stream_queries: 100,
    })
}

/// Checks that the endpoint is truly absent, i.e. that connecting to it fails.
pub async fn storage_service_check_absence(
    endpoint: &str,
) -> Result<bool, StorageServiceStoreError> {
    let endpoint = Endpoint::from_shared(endpoint.to_string())?;
    let result = StorageServiceClient::connect(endpoint).await;
    Ok(result.is_err())
}

/// Checks whether an endpoint is valid by performing a test read.
pub async fn storage_service_check_validity(
    endpoint: &str,
) -> Result<(), StorageServiceStoreError> {
    let config = service_config_from_endpoint(endpoint)?;
    let namespace = "namespace";
    let database = StorageServiceDatabaseInternal::connect(&config, namespace).await?;
    let store = database.open_shared(&[])?;
    let _value = store.read_value_bytes(&[42]).await?;
    Ok(())
}

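// The exported database type wraps the internal client in an LRU cache; with
// metrics enabled, metering layers sit on both sides of the cache, measuring
// accesses both before and after caching.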
/// The service database client with metrics.
#[cfg(with_metrics)]
pub type StorageServiceDatabase =
    MeteredDatabase<LruCachingDatabase<MeteredDatabase<StorageServiceDatabaseInternal>>>;

/// The service database client without metrics.
#[cfg(not(with_metrics))]
pub type StorageServiceDatabase = LruCachingDatabase<StorageServiceDatabaseInternal>;