linera_storage_service/
client.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    mem,
6    sync::{
7        atomic::{AtomicBool, Ordering},
8        Arc,
9    },
10};
11
12use async_lock::{Semaphore, SemaphoreGuard};
13use futures::future::join_all;
14use linera_base::ensure;
15#[cfg(with_metrics)]
16use linera_views::metering::MeteredDatabase;
17#[cfg(with_testing)]
18use linera_views::store::TestKeyValueDatabase;
19use linera_views::{
20    batch::{Batch, WriteOperation},
21    lru_caching::LruCachingDatabase,
22    store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
23    FutureSyncExt as _,
24};
25use serde::de::DeserializeOwned;
26use tonic::transport::{Channel, Endpoint};
27
28#[cfg(with_testing)]
29use crate::common::storage_service_test_endpoint;
30use crate::{
31    common::{
32        KeyPrefix, StorageServiceStoreError, StorageServiceStoreInternalConfig, MAX_PAYLOAD_SIZE,
33    },
34    key_value_store::{
35        statement::Operation, storage_service_client::StorageServiceClient, KeyValue,
36        KeyValueAppend, ReplyContainsKey, ReplyContainsKeys, ReplyExistsNamespace,
37        ReplyFindKeyValuesByPrefix, ReplyFindKeysByPrefix, ReplyListAll, ReplyListRootKeys,
38        ReplyReadMultiValues, ReplyReadValue, ReplySpecificChunk, RequestContainsKey,
39        RequestContainsKeys, RequestCreateNamespace, RequestDeleteNamespace,
40        RequestExistsNamespace, RequestFindKeyValuesByPrefix, RequestFindKeysByPrefix,
41        RequestListRootKeys, RequestReadMultiValues, RequestReadValue, RequestSpecificChunk,
42        RequestWriteBatchExtended, Statement,
43    },
44};
45
/// Largest key length (in bytes) accepted by the store. The 1 MB bound is somewhat arbitrary.
const MAX_KEY_SIZE: usize = 1_000_000;
48
/// The shared client of the storage service database.
///
/// * Requests are sent over `channel`. The generated `StorageServiceClient` RPC methods
///   are called on a mutable client while the key-value traits only provide `&self`,
///   so every request builds a fresh client from a clone of the channel.
/// * `semaphore` (if any) and `max_stream_queries` bound concurrency in the same way
///   as for the other stores.
///
/// Namespaces are encoded by their BCS serialization (`bcs::to_bytes`). This works
/// because the set of serializations of strings is prefix-free.
///
/// The data is stored in the following way:
/// * A `key` under a `root_key` of a `namespace` is stored at
///   `KeyPrefix::Key` + namespace + root_key + key.
/// * A key with an empty value is stored at `KeyPrefix::Namespace` + namespace
///   to indicate the existence of the namespace.
/// * A key with an empty value is stored at `KeyPrefix::RootKey` + namespace + root_key
///   to indicate the existence of the root key.
#[derive(Clone)]
pub struct StorageServiceDatabaseInternal {
    /// gRPC channel to the storage service (created with `connect_lazy` in `connect`).
    channel: Channel,
    /// Optional limit on the number of concurrent queries, shared with opened stores.
    semaphore: Option<Arc<Semaphore>>,
    /// Value passed on to the stores opened from this database.
    max_stream_queries: usize,
    /// BCS serialization of the namespace name.
    namespace: Vec<u8>,
}
75
/// A key-value store handle for one root key inside a namespace of the storage service.
#[derive(Clone)]
pub struct StorageServiceStoreInternal {
    /// gRPC channel; a new client is built from a clone of it for every request.
    channel: Channel,
    /// Optional limit on concurrent queries, cloned from the parent database.
    semaphore: Option<Arc<Semaphore>>,
    /// Value returned by `max_stream_queries()`.
    max_stream_queries: usize,
    /// Length of `KeyPrefix::Key` + namespace, i.e. the part of `start_key` before the root key.
    prefix_len: usize,
    /// `KeyPrefix::Key` + namespace + root_key; prepended to every user key.
    start_key: Vec<u8>,
    /// Tracks whether the root-key marker has been handled by this store;
    /// shared between clones through the `Arc`.
    root_key_written: Arc<AtomicBool>,
}
85
// Database-level operations report failures as `StorageServiceStoreError`.
impl WithError for StorageServiceDatabaseInternal {
    type Error = StorageServiceStoreError;
}

// Store-level operations report failures as `StorageServiceStoreError`.
impl WithError for StorageServiceStoreInternal {
    type Error = StorageServiceStoreError;
}
93
94impl ReadableKeyValueStore for StorageServiceStoreInternal {
95    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
96
97    fn max_stream_queries(&self) -> usize {
98        self.max_stream_queries
99    }
100
101    async fn read_value_bytes(
102        &self,
103        key: &[u8],
104    ) -> Result<Option<Vec<u8>>, StorageServiceStoreError> {
105        ensure!(
106            key.len() <= MAX_KEY_SIZE,
107            StorageServiceStoreError::KeyTooLong
108        );
109        let mut full_key = self.start_key.clone();
110        full_key.extend(key);
111        let query = RequestReadValue { key: full_key };
112        let request = tonic::Request::new(query);
113        let channel = self.channel.clone();
114        let mut client = StorageServiceClient::new(channel);
115        let _guard = self.acquire().await;
116        let response = client.process_read_value(request).make_sync().await?;
117        let response = response.into_inner();
118        let ReplyReadValue {
119            value,
120            message_index,
121            num_chunks,
122        } = response;
123        if num_chunks == 0 {
124            Ok(value)
125        } else {
126            self.read_entries(message_index, num_chunks).await
127        }
128    }
129
130    async fn contains_key(&self, key: &[u8]) -> Result<bool, StorageServiceStoreError> {
131        ensure!(
132            key.len() <= MAX_KEY_SIZE,
133            StorageServiceStoreError::KeyTooLong
134        );
135        let mut full_key = self.start_key.clone();
136        full_key.extend(key);
137        let query = RequestContainsKey { key: full_key };
138        let request = tonic::Request::new(query);
139        let channel = self.channel.clone();
140        let mut client = StorageServiceClient::new(channel);
141        let _guard = self.acquire().await;
142        let response = client.process_contains_key(request).make_sync().await?;
143        let response = response.into_inner();
144        let ReplyContainsKey { test } = response;
145        Ok(test)
146    }
147
148    async fn contains_keys(
149        &self,
150        keys: Vec<Vec<u8>>,
151    ) -> Result<Vec<bool>, StorageServiceStoreError> {
152        let mut full_keys = Vec::new();
153        for key in keys {
154            ensure!(
155                key.len() <= MAX_KEY_SIZE,
156                StorageServiceStoreError::KeyTooLong
157            );
158            let mut full_key = self.start_key.clone();
159            full_key.extend(&key);
160            full_keys.push(full_key);
161        }
162        let query = RequestContainsKeys { keys: full_keys };
163        let request = tonic::Request::new(query);
164        let channel = self.channel.clone();
165        let mut client = StorageServiceClient::new(channel);
166        let _guard = self.acquire().await;
167        let response = client.process_contains_keys(request).make_sync().await?;
168        let response = response.into_inner();
169        let ReplyContainsKeys { tests } = response;
170        Ok(tests)
171    }
172
173    async fn read_multi_values_bytes(
174        &self,
175        keys: Vec<Vec<u8>>,
176    ) -> Result<Vec<Option<Vec<u8>>>, StorageServiceStoreError> {
177        let mut full_keys = Vec::new();
178        for key in keys {
179            ensure!(
180                key.len() <= MAX_KEY_SIZE,
181                StorageServiceStoreError::KeyTooLong
182            );
183            let mut full_key = self.start_key.clone();
184            full_key.extend(&key);
185            full_keys.push(full_key);
186        }
187        let query = RequestReadMultiValues { keys: full_keys };
188        let request = tonic::Request::new(query);
189        let channel = self.channel.clone();
190        let mut client = StorageServiceClient::new(channel);
191        let _guard = self.acquire().await;
192        let response = client
193            .process_read_multi_values(request)
194            .make_sync()
195            .await?;
196        let response = response.into_inner();
197        let ReplyReadMultiValues {
198            values,
199            message_index,
200            num_chunks,
201        } = response;
202        if num_chunks == 0 {
203            let values = values.into_iter().map(|x| x.value).collect::<Vec<_>>();
204            Ok(values)
205        } else {
206            self.read_entries(message_index, num_chunks).await
207        }
208    }
209
210    async fn find_keys_by_prefix(
211        &self,
212        key_prefix: &[u8],
213    ) -> Result<Vec<Vec<u8>>, StorageServiceStoreError> {
214        ensure!(
215            key_prefix.len() <= MAX_KEY_SIZE,
216            StorageServiceStoreError::KeyTooLong
217        );
218        let mut full_key_prefix = self.start_key.clone();
219        full_key_prefix.extend(key_prefix);
220        let query = RequestFindKeysByPrefix {
221            key_prefix: full_key_prefix,
222        };
223        let request = tonic::Request::new(query);
224        let channel = self.channel.clone();
225        let mut client = StorageServiceClient::new(channel);
226        let _guard = self.acquire().await;
227        let response = client
228            .process_find_keys_by_prefix(request)
229            .make_sync()
230            .await?;
231        let response = response.into_inner();
232        let ReplyFindKeysByPrefix {
233            keys,
234            message_index,
235            num_chunks,
236        } = response;
237        if num_chunks == 0 {
238            Ok(keys)
239        } else {
240            self.read_entries(message_index, num_chunks).await
241        }
242    }
243
244    async fn find_key_values_by_prefix(
245        &self,
246        key_prefix: &[u8],
247    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageServiceStoreError> {
248        ensure!(
249            key_prefix.len() <= MAX_KEY_SIZE,
250            StorageServiceStoreError::KeyTooLong
251        );
252        let mut full_key_prefix = self.start_key.clone();
253        full_key_prefix.extend(key_prefix);
254        let query = RequestFindKeyValuesByPrefix {
255            key_prefix: full_key_prefix,
256        };
257        let request = tonic::Request::new(query);
258        let channel = self.channel.clone();
259        let mut client = StorageServiceClient::new(channel);
260        let _guard = self.acquire().await;
261        let response = client
262            .process_find_key_values_by_prefix(request)
263            .make_sync()
264            .await?;
265        let response = response.into_inner();
266        let ReplyFindKeyValuesByPrefix {
267            key_values,
268            message_index,
269            num_chunks,
270        } = response;
271        if num_chunks == 0 {
272            let key_values = key_values
273                .into_iter()
274                .map(|x| (x.key, x.value))
275                .collect::<Vec<_>>();
276            Ok(key_values)
277        } else {
278            self.read_entries(message_index, num_chunks).await
279        }
280    }
281}
282
283impl WritableKeyValueStore for StorageServiceStoreInternal {
284    const MAX_VALUE_SIZE: usize = usize::MAX;
285
286    async fn write_batch(&self, batch: Batch) -> Result<(), StorageServiceStoreError> {
287        if batch.operations.is_empty() {
288            return Ok(());
289        }
290        let mut statements = Vec::new();
291        let mut chunk_size = 0;
292
293        if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
294            let mut full_key = self.start_key.clone();
295            full_key[0] = KeyPrefix::RootKey as u8;
296            let operation = Operation::Put(KeyValue {
297                key: full_key,
298                value: vec![],
299            });
300            let statement = Statement {
301                operation: Some(operation),
302            };
303            statements.push(statement);
304            chunk_size += self.start_key.len();
305        }
306
307        let root_key_len = self.start_key.len() - self.prefix_len;
308        for operation in batch.operations {
309            let (key_len, value_len) = match &operation {
310                WriteOperation::Delete { key } => (key.len(), 0),
311                WriteOperation::Put { key, value } => (key.len(), value.len()),
312                WriteOperation::DeletePrefix { key_prefix } => (key_prefix.len(), 0),
313            };
314            let operation_size = key_len + value_len + root_key_len;
315            ensure!(
316                key_len <= MAX_KEY_SIZE,
317                StorageServiceStoreError::KeyTooLong
318            );
319            if operation_size + chunk_size < MAX_PAYLOAD_SIZE {
320                let statement = self.get_statement(operation);
321                statements.push(statement);
322                chunk_size += operation_size;
323            } else {
324                self.submit_statements(mem::take(&mut statements)).await?;
325                chunk_size = 0;
326                if operation_size > MAX_PAYLOAD_SIZE {
327                    // One single operation is especially big. So split it in chunks.
328                    let WriteOperation::Put { key, value } = operation else {
329                        // Only the put can go over the limit
330                        unreachable!();
331                    };
332                    let mut full_key = self.start_key.clone();
333                    full_key.extend(key);
334                    let value_chunks = value
335                        .chunks(MAX_PAYLOAD_SIZE)
336                        .map(|x| x.to_vec())
337                        .collect::<Vec<_>>();
338                    let num_chunks = value_chunks.len();
339                    for (i_chunk, value) in value_chunks.into_iter().enumerate() {
340                        let last = i_chunk + 1 == num_chunks;
341                        let operation = Operation::Append(KeyValueAppend {
342                            key: full_key.clone(),
343                            value,
344                            last,
345                        });
346                        statements = vec![Statement {
347                            operation: Some(operation),
348                        }];
349                        self.submit_statements(mem::take(&mut statements)).await?;
350                    }
351                } else {
352                    // The operation is small enough, it is just that we have many so we need to split.
353                    let statement = self.get_statement(operation);
354                    statements.push(statement);
355                    chunk_size = operation_size;
356                }
357            }
358        }
359        self.submit_statements(mem::take(&mut statements)).await
360    }
361
362    async fn clear_journal(&self) -> Result<(), StorageServiceStoreError> {
363        Ok(())
364    }
365}
366
367impl StorageServiceStoreInternal {
368    /// Obtains the semaphore lock on the database if needed.
369    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
370        match &self.semaphore {
371            None => None,
372            Some(count) => Some(count.acquire().await),
373        }
374    }
375
376    async fn submit_statements(
377        &self,
378        statements: Vec<Statement>,
379    ) -> Result<(), StorageServiceStoreError> {
380        if !statements.is_empty() {
381            let query = RequestWriteBatchExtended { statements };
382            let request = tonic::Request::new(query);
383            let channel = self.channel.clone();
384            let mut client = StorageServiceClient::new(channel);
385            let _guard = self.acquire().await;
386            let _response = client
387                .process_write_batch_extended(request)
388                .make_sync()
389                .await?;
390        }
391        Ok(())
392    }
393
394    fn get_statement(&self, operation: WriteOperation) -> Statement {
395        let operation = match operation {
396            WriteOperation::Delete { key } => {
397                let mut full_key = self.start_key.clone();
398                full_key.extend(key);
399                Operation::Delete(full_key)
400            }
401            WriteOperation::Put { key, value } => {
402                let mut full_key = self.start_key.clone();
403                full_key.extend(key);
404                Operation::Put(KeyValue {
405                    key: full_key,
406                    value,
407                })
408            }
409            WriteOperation::DeletePrefix { key_prefix } => {
410                let mut full_key_prefix = self.start_key.clone();
411                full_key_prefix.extend(key_prefix);
412                Operation::DeletePrefix(full_key_prefix)
413            }
414        };
415        Statement {
416            operation: Some(operation),
417        }
418    }
419
420    async fn read_single_entry(
421        &self,
422        message_index: i64,
423        index: i32,
424    ) -> Result<Vec<u8>, StorageServiceStoreError> {
425        let channel = self.channel.clone();
426        let query = RequestSpecificChunk {
427            message_index,
428            index,
429        };
430        let request = tonic::Request::new(query);
431        let mut client = StorageServiceClient::new(channel);
432        let response = client.process_specific_chunk(request).make_sync().await?;
433        let response = response.into_inner();
434        let ReplySpecificChunk { chunk } = response;
435        Ok(chunk)
436    }
437
438    async fn read_entries<S: DeserializeOwned>(
439        &self,
440        message_index: i64,
441        num_chunks: i32,
442    ) -> Result<S, StorageServiceStoreError> {
443        let mut handles = Vec::new();
444        for index in 0..num_chunks {
445            let handle = self.read_single_entry(message_index, index);
446            handles.push(handle);
447        }
448        let mut value = Vec::new();
449        for chunk in join_all(handles).await {
450            let chunk = chunk?;
451            value.extend(chunk);
452        }
453        Ok(bcs::from_bytes(&value)?)
454    }
455}
456
457impl KeyValueDatabase for StorageServiceDatabaseInternal {
458    type Config = StorageServiceStoreInternalConfig;
459    type Store = StorageServiceStoreInternal;
460
461    fn get_name() -> String {
462        "service store".to_string()
463    }
464
465    async fn connect(
466        config: &Self::Config,
467        namespace: &str,
468    ) -> Result<Self, StorageServiceStoreError> {
469        let semaphore = config
470            .max_concurrent_queries
471            .map(|n| Arc::new(Semaphore::new(n)));
472        let namespace = bcs::to_bytes(namespace)?;
473        let max_stream_queries = config.max_stream_queries;
474        let endpoint = config.http_address();
475        let endpoint = Endpoint::from_shared(endpoint)?;
476        let channel = endpoint.connect_lazy();
477        Ok(Self {
478            channel,
479            semaphore,
480            max_stream_queries,
481            namespace,
482        })
483    }
484
485    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, StorageServiceStoreError> {
486        let channel = self.channel.clone();
487        let semaphore = self.semaphore.clone();
488        let max_stream_queries = self.max_stream_queries;
489        let mut start_key = vec![KeyPrefix::Key as u8];
490        start_key.extend(&self.namespace);
491        start_key.extend(root_key);
492        let prefix_len = self.namespace.len() + 1;
493        Ok(StorageServiceStoreInternal {
494            channel,
495            semaphore,
496            max_stream_queries,
497            prefix_len,
498            start_key,
499            root_key_written: Arc::new(AtomicBool::new(false)),
500        })
501    }
502
503    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
504        self.open_shared(root_key)
505    }
506
507    async fn list_all(config: &Self::Config) -> Result<Vec<String>, StorageServiceStoreError> {
508        let endpoint = config.http_address();
509        let endpoint = Endpoint::from_shared(endpoint)?;
510        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
511        let response = client.process_list_all(()).make_sync().await?;
512        let response = response.into_inner();
513        let ReplyListAll { namespaces } = response;
514        let namespaces = namespaces
515            .into_iter()
516            .map(|x| bcs::from_bytes(&x))
517            .collect::<Result<_, _>>()?;
518        Ok(namespaces)
519    }
520
521    async fn list_root_keys(
522        config: &Self::Config,
523        namespace: &str,
524    ) -> Result<Vec<Vec<u8>>, StorageServiceStoreError> {
525        let namespace = bcs::to_bytes(namespace)?;
526        let query = RequestListRootKeys { namespace };
527        let request = tonic::Request::new(query);
528        let endpoint = config.http_address();
529        let endpoint = Endpoint::from_shared(endpoint)?;
530        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
531        let response = client.process_list_root_keys(request).make_sync().await?;
532        let response = response.into_inner();
533        let ReplyListRootKeys { root_keys } = response;
534        Ok(root_keys)
535    }
536
537    async fn delete_all(config: &Self::Config) -> Result<(), StorageServiceStoreError> {
538        let endpoint = config.http_address();
539        let endpoint = Endpoint::from_shared(endpoint)?;
540        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
541        let _response = client.process_delete_all(()).make_sync().await?;
542        Ok(())
543    }
544
545    async fn exists(
546        config: &Self::Config,
547        namespace: &str,
548    ) -> Result<bool, StorageServiceStoreError> {
549        let namespace = bcs::to_bytes(namespace)?;
550        let query = RequestExistsNamespace { namespace };
551        let request = tonic::Request::new(query);
552        let endpoint = config.http_address();
553        let endpoint = Endpoint::from_shared(endpoint)?;
554        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
555        let response = client.process_exists_namespace(request).make_sync().await?;
556        let response = response.into_inner();
557        let ReplyExistsNamespace { exists } = response;
558        Ok(exists)
559    }
560
561    async fn create(
562        config: &Self::Config,
563        namespace: &str,
564    ) -> Result<(), StorageServiceStoreError> {
565        if StorageServiceDatabaseInternal::exists(config, namespace).await? {
566            return Err(StorageServiceStoreError::StoreAlreadyExists);
567        }
568        let namespace = bcs::to_bytes(namespace)?;
569        let query = RequestCreateNamespace { namespace };
570        let request = tonic::Request::new(query);
571        let endpoint = config.http_address();
572        let endpoint = Endpoint::from_shared(endpoint)?;
573        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
574        let _response = client.process_create_namespace(request).make_sync().await?;
575        Ok(())
576    }
577
578    async fn delete(
579        config: &Self::Config,
580        namespace: &str,
581    ) -> Result<(), StorageServiceStoreError> {
582        let namespace = bcs::to_bytes(namespace)?;
583        let query = RequestDeleteNamespace { namespace };
584        let request = tonic::Request::new(query);
585        let endpoint = config.http_address();
586        let endpoint = Endpoint::from_shared(endpoint)?;
587        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
588        let _response = client.process_delete_namespace(request).make_sync().await?;
589        Ok(())
590    }
591}
592
// Test support: configurations point at the endpoint given by `storage_service_test_endpoint`.
#[cfg(with_testing)]
impl TestKeyValueDatabase for StorageServiceDatabaseInternal {
    /// Builds a test configuration for the endpoint returned by
    /// `storage_service_test_endpoint()`, via `service_config_from_endpoint`.
    async fn new_test_config() -> Result<StorageServiceStoreInternalConfig, StorageServiceStoreError>
    {
        let endpoint = storage_service_test_endpoint()?;
        service_config_from_endpoint(&endpoint)
    }
}
601
602/// Creates a `StorageServiceStoreConfig` from an endpoint.
603pub fn service_config_from_endpoint(
604    endpoint: &str,
605) -> Result<StorageServiceStoreInternalConfig, StorageServiceStoreError> {
606    Ok(StorageServiceStoreInternalConfig {
607        endpoint: endpoint.to_string(),
608        max_concurrent_queries: None,
609        max_stream_queries: 100,
610    })
611}
612
613/// Checks that endpoint is truly absent.
614pub async fn storage_service_check_absence(
615    endpoint: &str,
616) -> Result<bool, StorageServiceStoreError> {
617    let endpoint = Endpoint::from_shared(endpoint.to_string())?;
618    let result = StorageServiceClient::connect(endpoint).await;
619    Ok(result.is_err())
620}
621
622/// Checks whether an endpoint is valid or not.
623pub async fn storage_service_check_validity(
624    endpoint: &str,
625) -> Result<(), StorageServiceStoreError> {
626    let config = service_config_from_endpoint(endpoint).unwrap();
627    let namespace = "namespace";
628    let database = StorageServiceDatabaseInternal::connect(&config, namespace).await?;
629    let store = database.open_shared(&[])?;
630    let _value = store.read_value_bytes(&[42]).await?;
631    Ok(())
632}
633
/// The storage service database client, with metrics.
///
/// The raw service client is wrapped in `LruCachingDatabase`, and `MeteredDatabase`
/// is applied both inside and outside that caching layer.
#[cfg(with_metrics)]
pub type StorageServiceDatabase =
    MeteredDatabase<LruCachingDatabase<MeteredDatabase<StorageServiceDatabaseInternal>>>;

/// The storage service database client, without metrics.
///
/// The raw service client is wrapped in `LruCachingDatabase`.
#[cfg(not(with_metrics))]
pub type StorageServiceDatabase = LruCachingDatabase<StorageServiceDatabaseInternal>;