1use std::{
5 mem,
6 sync::{
7 atomic::{AtomicBool, Ordering},
8 Arc,
9 },
10};
11
12use async_lock::{Semaphore, SemaphoreGuard};
13use futures::future::join_all;
14use linera_base::ensure;
15#[cfg(with_metrics)]
16use linera_views::metering::MeteredDatabase;
17#[cfg(with_testing)]
18use linera_views::store::TestKeyValueDatabase;
19use linera_views::{
20 batch::{Batch, WriteOperation},
21 lru_caching::LruCachingDatabase,
22 store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
23 FutureSyncExt as _,
24};
25use serde::de::DeserializeOwned;
26use tonic::transport::{Channel, Endpoint};
27
28#[cfg(with_testing)]
29use crate::common::storage_service_test_endpoint;
30use crate::{
31 common::{
32 KeyPrefix, StorageServiceStoreError, StorageServiceStoreInternalConfig, MAX_PAYLOAD_SIZE,
33 },
34 key_value_store::{
35 statement::Operation, storage_service_client::StorageServiceClient, KeyValue,
36 KeyValueAppend, ReplyContainsKey, ReplyContainsKeys, ReplyExistsNamespace,
37 ReplyFindKeyValuesByPrefix, ReplyFindKeysByPrefix, ReplyListAll, ReplyListRootKeys,
38 ReplyReadMultiValues, ReplyReadValue, ReplySpecificChunk, RequestContainsKey,
39 RequestContainsKeys, RequestCreateNamespace, RequestDeleteNamespace,
40 RequestExistsNamespace, RequestFindKeyValuesByPrefix, RequestFindKeysByPrefix,
41 RequestListRootKeys, RequestReadMultiValues, RequestReadValue, RequestSpecificChunk,
42 RequestWriteBatchExtended, Statement,
43 },
44};
45
46const MAX_KEY_SIZE: usize = 1000000;
48
49#[derive(Clone)]
69pub struct StorageServiceDatabaseInternal {
70 channel: Channel,
71 semaphore: Option<Arc<Semaphore>>,
72 max_stream_queries: usize,
73 namespace: Vec<u8>,
74}
75
76#[derive(Clone)]
77pub struct StorageServiceStoreInternal {
78 channel: Channel,
79 semaphore: Option<Arc<Semaphore>>,
80 max_stream_queries: usize,
81 prefix_len: usize,
82 start_key: Vec<u8>,
83 root_key_written: Arc<AtomicBool>,
84}
85
86impl WithError for StorageServiceDatabaseInternal {
87 type Error = StorageServiceStoreError;
88}
89
90impl WithError for StorageServiceStoreInternal {
91 type Error = StorageServiceStoreError;
92}
93
94impl ReadableKeyValueStore for StorageServiceStoreInternal {
95 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
96
97 fn max_stream_queries(&self) -> usize {
98 self.max_stream_queries
99 }
100
101 async fn read_value_bytes(
102 &self,
103 key: &[u8],
104 ) -> Result<Option<Vec<u8>>, StorageServiceStoreError> {
105 ensure!(
106 key.len() <= MAX_KEY_SIZE,
107 StorageServiceStoreError::KeyTooLong
108 );
109 let mut full_key = self.start_key.clone();
110 full_key.extend(key);
111 let query = RequestReadValue { key: full_key };
112 let request = tonic::Request::new(query);
113 let channel = self.channel.clone();
114 let mut client = StorageServiceClient::new(channel);
115 let _guard = self.acquire().await;
116 let response = client.process_read_value(request).make_sync().await?;
117 let response = response.into_inner();
118 let ReplyReadValue {
119 value,
120 message_index,
121 num_chunks,
122 } = response;
123 if num_chunks == 0 {
124 Ok(value)
125 } else {
126 self.read_entries(message_index, num_chunks).await
127 }
128 }
129
130 async fn contains_key(&self, key: &[u8]) -> Result<bool, StorageServiceStoreError> {
131 ensure!(
132 key.len() <= MAX_KEY_SIZE,
133 StorageServiceStoreError::KeyTooLong
134 );
135 let mut full_key = self.start_key.clone();
136 full_key.extend(key);
137 let query = RequestContainsKey { key: full_key };
138 let request = tonic::Request::new(query);
139 let channel = self.channel.clone();
140 let mut client = StorageServiceClient::new(channel);
141 let _guard = self.acquire().await;
142 let response = client.process_contains_key(request).make_sync().await?;
143 let response = response.into_inner();
144 let ReplyContainsKey { test } = response;
145 Ok(test)
146 }
147
148 async fn contains_keys(
149 &self,
150 keys: Vec<Vec<u8>>,
151 ) -> Result<Vec<bool>, StorageServiceStoreError> {
152 let mut full_keys = Vec::new();
153 for key in keys {
154 ensure!(
155 key.len() <= MAX_KEY_SIZE,
156 StorageServiceStoreError::KeyTooLong
157 );
158 let mut full_key = self.start_key.clone();
159 full_key.extend(&key);
160 full_keys.push(full_key);
161 }
162 let query = RequestContainsKeys { keys: full_keys };
163 let request = tonic::Request::new(query);
164 let channel = self.channel.clone();
165 let mut client = StorageServiceClient::new(channel);
166 let _guard = self.acquire().await;
167 let response = client.process_contains_keys(request).make_sync().await?;
168 let response = response.into_inner();
169 let ReplyContainsKeys { tests } = response;
170 Ok(tests)
171 }
172
173 async fn read_multi_values_bytes(
174 &self,
175 keys: Vec<Vec<u8>>,
176 ) -> Result<Vec<Option<Vec<u8>>>, StorageServiceStoreError> {
177 let mut full_keys = Vec::new();
178 for key in keys {
179 ensure!(
180 key.len() <= MAX_KEY_SIZE,
181 StorageServiceStoreError::KeyTooLong
182 );
183 let mut full_key = self.start_key.clone();
184 full_key.extend(&key);
185 full_keys.push(full_key);
186 }
187 let query = RequestReadMultiValues { keys: full_keys };
188 let request = tonic::Request::new(query);
189 let channel = self.channel.clone();
190 let mut client = StorageServiceClient::new(channel);
191 let _guard = self.acquire().await;
192 let response = client
193 .process_read_multi_values(request)
194 .make_sync()
195 .await?;
196 let response = response.into_inner();
197 let ReplyReadMultiValues {
198 values,
199 message_index,
200 num_chunks,
201 } = response;
202 if num_chunks == 0 {
203 let values = values.into_iter().map(|x| x.value).collect::<Vec<_>>();
204 Ok(values)
205 } else {
206 self.read_entries(message_index, num_chunks).await
207 }
208 }
209
210 async fn find_keys_by_prefix(
211 &self,
212 key_prefix: &[u8],
213 ) -> Result<Vec<Vec<u8>>, StorageServiceStoreError> {
214 ensure!(
215 key_prefix.len() <= MAX_KEY_SIZE,
216 StorageServiceStoreError::KeyTooLong
217 );
218 let mut full_key_prefix = self.start_key.clone();
219 full_key_prefix.extend(key_prefix);
220 let query = RequestFindKeysByPrefix {
221 key_prefix: full_key_prefix,
222 };
223 let request = tonic::Request::new(query);
224 let channel = self.channel.clone();
225 let mut client = StorageServiceClient::new(channel);
226 let _guard = self.acquire().await;
227 let response = client
228 .process_find_keys_by_prefix(request)
229 .make_sync()
230 .await?;
231 let response = response.into_inner();
232 let ReplyFindKeysByPrefix {
233 keys,
234 message_index,
235 num_chunks,
236 } = response;
237 if num_chunks == 0 {
238 Ok(keys)
239 } else {
240 self.read_entries(message_index, num_chunks).await
241 }
242 }
243
244 async fn find_key_values_by_prefix(
245 &self,
246 key_prefix: &[u8],
247 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageServiceStoreError> {
248 ensure!(
249 key_prefix.len() <= MAX_KEY_SIZE,
250 StorageServiceStoreError::KeyTooLong
251 );
252 let mut full_key_prefix = self.start_key.clone();
253 full_key_prefix.extend(key_prefix);
254 let query = RequestFindKeyValuesByPrefix {
255 key_prefix: full_key_prefix,
256 };
257 let request = tonic::Request::new(query);
258 let channel = self.channel.clone();
259 let mut client = StorageServiceClient::new(channel);
260 let _guard = self.acquire().await;
261 let response = client
262 .process_find_key_values_by_prefix(request)
263 .make_sync()
264 .await?;
265 let response = response.into_inner();
266 let ReplyFindKeyValuesByPrefix {
267 key_values,
268 message_index,
269 num_chunks,
270 } = response;
271 if num_chunks == 0 {
272 let key_values = key_values
273 .into_iter()
274 .map(|x| (x.key, x.value))
275 .collect::<Vec<_>>();
276 Ok(key_values)
277 } else {
278 self.read_entries(message_index, num_chunks).await
279 }
280 }
281}
282
283impl WritableKeyValueStore for StorageServiceStoreInternal {
284 const MAX_VALUE_SIZE: usize = usize::MAX;
285
286 async fn write_batch(&self, batch: Batch) -> Result<(), StorageServiceStoreError> {
287 if batch.operations.is_empty() {
288 return Ok(());
289 }
290 let mut statements = Vec::new();
291 let mut chunk_size = 0;
292
293 if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
294 let mut full_key = self.start_key.clone();
295 full_key[0] = KeyPrefix::RootKey as u8;
296 let operation = Operation::Put(KeyValue {
297 key: full_key,
298 value: vec![],
299 });
300 let statement = Statement {
301 operation: Some(operation),
302 };
303 statements.push(statement);
304 chunk_size += self.start_key.len();
305 }
306
307 let root_key_len = self.start_key.len() - self.prefix_len;
308 for operation in batch.operations {
309 let (key_len, value_len) = match &operation {
310 WriteOperation::Delete { key } => (key.len(), 0),
311 WriteOperation::Put { key, value } => (key.len(), value.len()),
312 WriteOperation::DeletePrefix { key_prefix } => (key_prefix.len(), 0),
313 };
314 let operation_size = key_len + value_len + root_key_len;
315 ensure!(
316 key_len <= MAX_KEY_SIZE,
317 StorageServiceStoreError::KeyTooLong
318 );
319 if operation_size + chunk_size < MAX_PAYLOAD_SIZE {
320 let statement = self.get_statement(operation);
321 statements.push(statement);
322 chunk_size += operation_size;
323 } else {
324 self.submit_statements(mem::take(&mut statements)).await?;
325 chunk_size = 0;
326 if operation_size > MAX_PAYLOAD_SIZE {
327 let WriteOperation::Put { key, value } = operation else {
329 unreachable!();
331 };
332 let mut full_key = self.start_key.clone();
333 full_key.extend(key);
334 let value_chunks = value
335 .chunks(MAX_PAYLOAD_SIZE)
336 .map(|x| x.to_vec())
337 .collect::<Vec<_>>();
338 let num_chunks = value_chunks.len();
339 for (i_chunk, value) in value_chunks.into_iter().enumerate() {
340 let last = i_chunk + 1 == num_chunks;
341 let operation = Operation::Append(KeyValueAppend {
342 key: full_key.clone(),
343 value,
344 last,
345 });
346 statements = vec![Statement {
347 operation: Some(operation),
348 }];
349 self.submit_statements(mem::take(&mut statements)).await?;
350 }
351 } else {
352 let statement = self.get_statement(operation);
354 statements.push(statement);
355 chunk_size = operation_size;
356 }
357 }
358 }
359 self.submit_statements(mem::take(&mut statements)).await
360 }
361
362 async fn clear_journal(&self) -> Result<(), StorageServiceStoreError> {
363 Ok(())
364 }
365}
366
367impl StorageServiceStoreInternal {
368 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
370 match &self.semaphore {
371 None => None,
372 Some(count) => Some(count.acquire().await),
373 }
374 }
375
376 async fn submit_statements(
377 &self,
378 statements: Vec<Statement>,
379 ) -> Result<(), StorageServiceStoreError> {
380 if !statements.is_empty() {
381 let query = RequestWriteBatchExtended { statements };
382 let request = tonic::Request::new(query);
383 let channel = self.channel.clone();
384 let mut client = StorageServiceClient::new(channel);
385 let _guard = self.acquire().await;
386 let _response = client
387 .process_write_batch_extended(request)
388 .make_sync()
389 .await?;
390 }
391 Ok(())
392 }
393
394 fn get_statement(&self, operation: WriteOperation) -> Statement {
395 let operation = match operation {
396 WriteOperation::Delete { key } => {
397 let mut full_key = self.start_key.clone();
398 full_key.extend(key);
399 Operation::Delete(full_key)
400 }
401 WriteOperation::Put { key, value } => {
402 let mut full_key = self.start_key.clone();
403 full_key.extend(key);
404 Operation::Put(KeyValue {
405 key: full_key,
406 value,
407 })
408 }
409 WriteOperation::DeletePrefix { key_prefix } => {
410 let mut full_key_prefix = self.start_key.clone();
411 full_key_prefix.extend(key_prefix);
412 Operation::DeletePrefix(full_key_prefix)
413 }
414 };
415 Statement {
416 operation: Some(operation),
417 }
418 }
419
420 async fn read_single_entry(
421 &self,
422 message_index: i64,
423 index: i32,
424 ) -> Result<Vec<u8>, StorageServiceStoreError> {
425 let channel = self.channel.clone();
426 let query = RequestSpecificChunk {
427 message_index,
428 index,
429 };
430 let request = tonic::Request::new(query);
431 let mut client = StorageServiceClient::new(channel);
432 let response = client.process_specific_chunk(request).make_sync().await?;
433 let response = response.into_inner();
434 let ReplySpecificChunk { chunk } = response;
435 Ok(chunk)
436 }
437
438 async fn read_entries<S: DeserializeOwned>(
439 &self,
440 message_index: i64,
441 num_chunks: i32,
442 ) -> Result<S, StorageServiceStoreError> {
443 let mut handles = Vec::new();
444 for index in 0..num_chunks {
445 let handle = self.read_single_entry(message_index, index);
446 handles.push(handle);
447 }
448 let mut value = Vec::new();
449 for chunk in join_all(handles).await {
450 let chunk = chunk?;
451 value.extend(chunk);
452 }
453 Ok(bcs::from_bytes(&value)?)
454 }
455}
456
457impl KeyValueDatabase for StorageServiceDatabaseInternal {
458 type Config = StorageServiceStoreInternalConfig;
459 type Store = StorageServiceStoreInternal;
460
461 fn get_name() -> String {
462 "service store".to_string()
463 }
464
465 async fn connect(
466 config: &Self::Config,
467 namespace: &str,
468 ) -> Result<Self, StorageServiceStoreError> {
469 let semaphore = config
470 .max_concurrent_queries
471 .map(|n| Arc::new(Semaphore::new(n)));
472 let namespace = bcs::to_bytes(namespace)?;
473 let max_stream_queries = config.max_stream_queries;
474 let endpoint = config.http_address();
475 let endpoint = Endpoint::from_shared(endpoint)?;
476 let channel = endpoint.connect_lazy();
477 Ok(Self {
478 channel,
479 semaphore,
480 max_stream_queries,
481 namespace,
482 })
483 }
484
485 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, StorageServiceStoreError> {
486 let channel = self.channel.clone();
487 let semaphore = self.semaphore.clone();
488 let max_stream_queries = self.max_stream_queries;
489 let mut start_key = vec![KeyPrefix::Key as u8];
490 start_key.extend(&self.namespace);
491 start_key.extend(root_key);
492 let prefix_len = self.namespace.len() + 1;
493 Ok(StorageServiceStoreInternal {
494 channel,
495 semaphore,
496 max_stream_queries,
497 prefix_len,
498 start_key,
499 root_key_written: Arc::new(AtomicBool::new(false)),
500 })
501 }
502
503 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
504 self.open_shared(root_key)
505 }
506
507 async fn list_all(config: &Self::Config) -> Result<Vec<String>, StorageServiceStoreError> {
508 let endpoint = config.http_address();
509 let endpoint = Endpoint::from_shared(endpoint)?;
510 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
511 let response = client.process_list_all(()).make_sync().await?;
512 let response = response.into_inner();
513 let ReplyListAll { namespaces } = response;
514 let namespaces = namespaces
515 .into_iter()
516 .map(|x| bcs::from_bytes(&x))
517 .collect::<Result<_, _>>()?;
518 Ok(namespaces)
519 }
520
521 async fn list_root_keys(
522 config: &Self::Config,
523 namespace: &str,
524 ) -> Result<Vec<Vec<u8>>, StorageServiceStoreError> {
525 let namespace = bcs::to_bytes(namespace)?;
526 let query = RequestListRootKeys { namespace };
527 let request = tonic::Request::new(query);
528 let endpoint = config.http_address();
529 let endpoint = Endpoint::from_shared(endpoint)?;
530 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
531 let response = client.process_list_root_keys(request).make_sync().await?;
532 let response = response.into_inner();
533 let ReplyListRootKeys { root_keys } = response;
534 Ok(root_keys)
535 }
536
537 async fn delete_all(config: &Self::Config) -> Result<(), StorageServiceStoreError> {
538 let endpoint = config.http_address();
539 let endpoint = Endpoint::from_shared(endpoint)?;
540 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
541 let _response = client.process_delete_all(()).make_sync().await?;
542 Ok(())
543 }
544
545 async fn exists(
546 config: &Self::Config,
547 namespace: &str,
548 ) -> Result<bool, StorageServiceStoreError> {
549 let namespace = bcs::to_bytes(namespace)?;
550 let query = RequestExistsNamespace { namespace };
551 let request = tonic::Request::new(query);
552 let endpoint = config.http_address();
553 let endpoint = Endpoint::from_shared(endpoint)?;
554 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
555 let response = client.process_exists_namespace(request).make_sync().await?;
556 let response = response.into_inner();
557 let ReplyExistsNamespace { exists } = response;
558 Ok(exists)
559 }
560
561 async fn create(
562 config: &Self::Config,
563 namespace: &str,
564 ) -> Result<(), StorageServiceStoreError> {
565 if StorageServiceDatabaseInternal::exists(config, namespace).await? {
566 return Err(StorageServiceStoreError::StoreAlreadyExists);
567 }
568 let namespace = bcs::to_bytes(namespace)?;
569 let query = RequestCreateNamespace { namespace };
570 let request = tonic::Request::new(query);
571 let endpoint = config.http_address();
572 let endpoint = Endpoint::from_shared(endpoint)?;
573 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
574 let _response = client.process_create_namespace(request).make_sync().await?;
575 Ok(())
576 }
577
578 async fn delete(
579 config: &Self::Config,
580 namespace: &str,
581 ) -> Result<(), StorageServiceStoreError> {
582 let namespace = bcs::to_bytes(namespace)?;
583 let query = RequestDeleteNamespace { namespace };
584 let request = tonic::Request::new(query);
585 let endpoint = config.http_address();
586 let endpoint = Endpoint::from_shared(endpoint)?;
587 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
588 let _response = client.process_delete_namespace(request).make_sync().await?;
589 Ok(())
590 }
591}
592
593#[cfg(with_testing)]
594impl TestKeyValueDatabase for StorageServiceDatabaseInternal {
595 async fn new_test_config() -> Result<StorageServiceStoreInternalConfig, StorageServiceStoreError>
596 {
597 let endpoint = storage_service_test_endpoint()?;
598 service_config_from_endpoint(&endpoint)
599 }
600}
601
602pub fn service_config_from_endpoint(
604 endpoint: &str,
605) -> Result<StorageServiceStoreInternalConfig, StorageServiceStoreError> {
606 Ok(StorageServiceStoreInternalConfig {
607 endpoint: endpoint.to_string(),
608 max_concurrent_queries: None,
609 max_stream_queries: 100,
610 })
611}
612
613pub async fn storage_service_check_absence(
615 endpoint: &str,
616) -> Result<bool, StorageServiceStoreError> {
617 let endpoint = Endpoint::from_shared(endpoint.to_string())?;
618 let result = StorageServiceClient::connect(endpoint).await;
619 Ok(result.is_err())
620}
621
622pub async fn storage_service_check_validity(
624 endpoint: &str,
625) -> Result<(), StorageServiceStoreError> {
626 let config = service_config_from_endpoint(endpoint).unwrap();
627 let namespace = "namespace";
628 let database = StorageServiceDatabaseInternal::connect(&config, namespace).await?;
629 let store = database.open_shared(&[])?;
630 let _value = store.read_value_bytes(&[42]).await?;
631 Ok(())
632}
633
634#[cfg(with_metrics)]
636pub type StorageServiceDatabase =
637 MeteredDatabase<LruCachingDatabase<MeteredDatabase<StorageServiceDatabaseInternal>>>;
638
639#[cfg(not(with_metrics))]
641pub type StorageServiceDatabase = LruCachingDatabase<StorageServiceDatabaseInternal>;