1use std::{
5 mem,
6 sync::{
7 atomic::{AtomicBool, Ordering},
8 Arc,
9 },
10};
11
12use async_lock::{Semaphore, SemaphoreGuard};
13use futures::future::join_all;
14use linera_base::ensure;
15#[cfg(with_metrics)]
16use linera_views::metering::MeteredDatabase;
17#[cfg(with_testing)]
18use linera_views::store::TestKeyValueDatabase;
19use linera_views::{
20 batch::{Batch, WriteOperation},
21 lru_caching::LruCachingDatabase,
22 store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
23 FutureSyncExt as _,
24};
25use serde::de::DeserializeOwned;
26use tonic::transport::{Channel, Endpoint};
27
28#[cfg(with_testing)]
29use crate::common::storage_service_test_endpoint;
30use crate::{
31 common::{
32 KeyPrefix, StorageServiceStoreError, StorageServiceStoreInternalConfig, MAX_PAYLOAD_SIZE,
33 },
34 key_value_store::{
35 statement::Operation, storage_service_client::StorageServiceClient, KeyValue,
36 KeyValueAppend, ReplyContainsKey, ReplyContainsKeys, ReplyExistsNamespace,
37 ReplyFindKeyValuesByPrefix, ReplyFindKeysByPrefix, ReplyListAll, ReplyListRootKeys,
38 ReplyReadMultiValues, ReplyReadValue, ReplySpecificChunk, RequestContainsKey,
39 RequestContainsKeys, RequestCreateNamespace, RequestDeleteNamespace,
40 RequestExistsNamespace, RequestFindKeyValuesByPrefix, RequestFindKeysByPrefix,
41 RequestListRootKeys, RequestReadMultiValues, RequestReadValue, RequestSpecificChunk,
42 RequestWriteBatchExtended, Statement,
43 },
44};
45
/// Maximum length, in bytes, of a key or key prefix accepted by this client.
/// Longer keys are rejected with `StorageServiceStoreError::KeyTooLong`.
const MAX_KEY_SIZE: usize = 1_000_000;
48
/// Client-side handle to one namespace of the storage service.
///
/// It carries everything that the stores opened from it share: the gRPC channel, the
/// optional semaphore, the stream-query limit and the serialized namespace.
#[derive(Clone)]
pub struct StorageServiceDatabaseInternal {
    /// The gRPC channel used to reach the storage service.
    channel: Channel,
    /// Optional semaphore handed to the stores to bound their concurrent queries;
    /// `None` when no `max_concurrent_queries` is configured.
    semaphore: Option<Arc<Semaphore>>,
    /// The value reported by `max_stream_queries()` on stores opened from this database.
    max_stream_queries: usize,
    /// The BCS-serialized namespace name.
    namespace: Vec<u8>,
}
75
/// Client-side store bound to one root key inside a namespace of the storage service.
#[derive(Clone)]
pub struct StorageServiceStoreInternal {
    /// The gRPC channel used to reach the storage service.
    channel: Channel,
    /// Optional semaphore bounding the number of concurrent queries; `None` means no limit.
    semaphore: Option<Arc<Semaphore>>,
    /// The value reported by `max_stream_queries()`.
    max_stream_queries: usize,
    /// Length of the part of `start_key` that precedes the serialized root key
    /// (one tag byte plus the serialized namespace).
    prefix_len: usize,
    /// Prefix prepended to every key sent to the service: a `KeyPrefix::Key` tag byte,
    /// the serialized namespace, then the BCS-serialized root key.
    start_key: Vec<u8>,
    /// Whether the root-key marker has already been written; shared between clones.
    root_key_written: Arc<AtomicBool>,
}
85
// Every fallible database operation reports its failures as `StorageServiceStoreError`.
impl WithError for StorageServiceDatabaseInternal {
    type Error = StorageServiceStoreError;
}
89
// Every fallible store operation reports its failures as `StorageServiceStoreError`.
impl WithError for StorageServiceStoreInternal {
    type Error = StorageServiceStoreError;
}
93
94impl ReadableKeyValueStore for StorageServiceStoreInternal {
95 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
96
97 fn max_stream_queries(&self) -> usize {
98 self.max_stream_queries
99 }
100
101 fn root_key(&self) -> Result<Vec<u8>, StorageServiceStoreError> {
102 let root_key = bcs::from_bytes(&self.start_key[self.prefix_len..])?;
103 Ok(root_key)
104 }
105
106 async fn read_value_bytes(
107 &self,
108 key: &[u8],
109 ) -> Result<Option<Vec<u8>>, StorageServiceStoreError> {
110 ensure!(
111 key.len() <= MAX_KEY_SIZE,
112 StorageServiceStoreError::KeyTooLong
113 );
114 let mut full_key = self.start_key.clone();
115 full_key.extend(key);
116 let query = RequestReadValue { key: full_key };
117 let request = tonic::Request::new(query);
118 let channel = self.channel.clone();
119 let mut client = StorageServiceClient::new(channel);
120 let _guard = self.acquire().await;
121 let response = client.process_read_value(request).make_sync().await?;
122 let response = response.into_inner();
123 let ReplyReadValue {
124 value,
125 message_index,
126 num_chunks,
127 } = response;
128 if num_chunks == 0 {
129 Ok(value)
130 } else {
131 self.read_entries(message_index, num_chunks).await
132 }
133 }
134
135 async fn contains_key(&self, key: &[u8]) -> Result<bool, StorageServiceStoreError> {
136 ensure!(
137 key.len() <= MAX_KEY_SIZE,
138 StorageServiceStoreError::KeyTooLong
139 );
140 let mut full_key = self.start_key.clone();
141 full_key.extend(key);
142 let query = RequestContainsKey { key: full_key };
143 let request = tonic::Request::new(query);
144 let channel = self.channel.clone();
145 let mut client = StorageServiceClient::new(channel);
146 let _guard = self.acquire().await;
147 let response = client.process_contains_key(request).make_sync().await?;
148 let response = response.into_inner();
149 let ReplyContainsKey { test } = response;
150 Ok(test)
151 }
152
153 async fn contains_keys(
154 &self,
155 keys: Vec<Vec<u8>>,
156 ) -> Result<Vec<bool>, StorageServiceStoreError> {
157 let mut full_keys = Vec::new();
158 for key in keys {
159 ensure!(
160 key.len() <= MAX_KEY_SIZE,
161 StorageServiceStoreError::KeyTooLong
162 );
163 let mut full_key = self.start_key.clone();
164 full_key.extend(&key);
165 full_keys.push(full_key);
166 }
167 let query = RequestContainsKeys { keys: full_keys };
168 let request = tonic::Request::new(query);
169 let channel = self.channel.clone();
170 let mut client = StorageServiceClient::new(channel);
171 let _guard = self.acquire().await;
172 let response = client.process_contains_keys(request).make_sync().await?;
173 let response = response.into_inner();
174 let ReplyContainsKeys { tests } = response;
175 Ok(tests)
176 }
177
178 async fn read_multi_values_bytes(
179 &self,
180 keys: Vec<Vec<u8>>,
181 ) -> Result<Vec<Option<Vec<u8>>>, StorageServiceStoreError> {
182 let mut full_keys = Vec::new();
183 for key in keys {
184 ensure!(
185 key.len() <= MAX_KEY_SIZE,
186 StorageServiceStoreError::KeyTooLong
187 );
188 let mut full_key = self.start_key.clone();
189 full_key.extend(&key);
190 full_keys.push(full_key);
191 }
192 let query = RequestReadMultiValues { keys: full_keys };
193 let request = tonic::Request::new(query);
194 let channel = self.channel.clone();
195 let mut client = StorageServiceClient::new(channel);
196 let _guard = self.acquire().await;
197 let response = client
198 .process_read_multi_values(request)
199 .make_sync()
200 .await?;
201 let response = response.into_inner();
202 let ReplyReadMultiValues {
203 values,
204 message_index,
205 num_chunks,
206 } = response;
207 if num_chunks == 0 {
208 let values = values.into_iter().map(|x| x.value).collect::<Vec<_>>();
209 Ok(values)
210 } else {
211 self.read_entries(message_index, num_chunks).await
212 }
213 }
214
215 async fn find_keys_by_prefix(
216 &self,
217 key_prefix: &[u8],
218 ) -> Result<Vec<Vec<u8>>, StorageServiceStoreError> {
219 ensure!(
220 key_prefix.len() <= MAX_KEY_SIZE,
221 StorageServiceStoreError::KeyTooLong
222 );
223 let mut full_key_prefix = self.start_key.clone();
224 full_key_prefix.extend(key_prefix);
225 let query = RequestFindKeysByPrefix {
226 key_prefix: full_key_prefix,
227 };
228 let request = tonic::Request::new(query);
229 let channel = self.channel.clone();
230 let mut client = StorageServiceClient::new(channel);
231 let _guard = self.acquire().await;
232 let response = client
233 .process_find_keys_by_prefix(request)
234 .make_sync()
235 .await?;
236 let response = response.into_inner();
237 let ReplyFindKeysByPrefix {
238 keys,
239 message_index,
240 num_chunks,
241 } = response;
242 if num_chunks == 0 {
243 Ok(keys)
244 } else {
245 self.read_entries(message_index, num_chunks).await
246 }
247 }
248
249 async fn find_key_values_by_prefix(
250 &self,
251 key_prefix: &[u8],
252 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageServiceStoreError> {
253 ensure!(
254 key_prefix.len() <= MAX_KEY_SIZE,
255 StorageServiceStoreError::KeyTooLong
256 );
257 let mut full_key_prefix = self.start_key.clone();
258 full_key_prefix.extend(key_prefix);
259 let query = RequestFindKeyValuesByPrefix {
260 key_prefix: full_key_prefix,
261 };
262 let request = tonic::Request::new(query);
263 let channel = self.channel.clone();
264 let mut client = StorageServiceClient::new(channel);
265 let _guard = self.acquire().await;
266 let response = client
267 .process_find_key_values_by_prefix(request)
268 .make_sync()
269 .await?;
270 let response = response.into_inner();
271 let ReplyFindKeyValuesByPrefix {
272 key_values,
273 message_index,
274 num_chunks,
275 } = response;
276 if num_chunks == 0 {
277 let key_values = key_values
278 .into_iter()
279 .map(|x| (x.key, x.value))
280 .collect::<Vec<_>>();
281 Ok(key_values)
282 } else {
283 self.read_entries(message_index, num_chunks).await
284 }
285 }
286}
287
288impl WritableKeyValueStore for StorageServiceStoreInternal {
289 const MAX_VALUE_SIZE: usize = usize::MAX;
290
291 async fn write_batch(&self, batch: Batch) -> Result<(), StorageServiceStoreError> {
292 if batch.operations.is_empty() {
293 return Ok(());
294 }
295 let mut statements = Vec::new();
296 let mut chunk_size = 0;
297
298 if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
299 let mut full_key = self.start_key.clone();
300 full_key[0] = KeyPrefix::RootKey as u8;
301 let operation = Operation::Put(KeyValue {
302 key: full_key,
303 value: vec![],
304 });
305 let statement = Statement {
306 operation: Some(operation),
307 };
308 statements.push(statement);
309 chunk_size += self.start_key.len();
310 }
311
312 let bcs_root_key_len = self.start_key.len() - self.prefix_len;
313 for operation in batch.operations {
314 let (key_len, value_len) = match &operation {
315 WriteOperation::Delete { key } => (key.len(), 0),
316 WriteOperation::Put { key, value } => (key.len(), value.len()),
317 WriteOperation::DeletePrefix { key_prefix } => (key_prefix.len(), 0),
318 };
319 let operation_size = key_len + value_len + bcs_root_key_len;
320 ensure!(
321 key_len <= MAX_KEY_SIZE,
322 StorageServiceStoreError::KeyTooLong
323 );
324 if operation_size + chunk_size < MAX_PAYLOAD_SIZE {
325 let statement = self.get_statement(operation);
326 statements.push(statement);
327 chunk_size += operation_size;
328 } else {
329 self.submit_statements(mem::take(&mut statements)).await?;
330 chunk_size = 0;
331 if operation_size > MAX_PAYLOAD_SIZE {
332 let WriteOperation::Put { key, value } = operation else {
334 unreachable!();
336 };
337 let mut full_key = self.start_key.clone();
338 full_key.extend(key);
339 let value_chunks = value
340 .chunks(MAX_PAYLOAD_SIZE)
341 .map(|x| x.to_vec())
342 .collect::<Vec<_>>();
343 let num_chunks = value_chunks.len();
344 for (i_chunk, value) in value_chunks.into_iter().enumerate() {
345 let last = i_chunk + 1 == num_chunks;
346 let operation = Operation::Append(KeyValueAppend {
347 key: full_key.clone(),
348 value,
349 last,
350 });
351 statements = vec![Statement {
352 operation: Some(operation),
353 }];
354 self.submit_statements(mem::take(&mut statements)).await?;
355 }
356 } else {
357 let statement = self.get_statement(operation);
359 statements.push(statement);
360 chunk_size = operation_size;
361 }
362 }
363 }
364 self.submit_statements(mem::take(&mut statements)).await
365 }
366
367 async fn clear_journal(&self) -> Result<(), StorageServiceStoreError> {
368 Ok(())
369 }
370}
371
372impl StorageServiceStoreInternal {
373 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
375 match &self.semaphore {
376 None => None,
377 Some(count) => Some(count.acquire().await),
378 }
379 }
380
381 async fn submit_statements(
382 &self,
383 statements: Vec<Statement>,
384 ) -> Result<(), StorageServiceStoreError> {
385 if !statements.is_empty() {
386 let query = RequestWriteBatchExtended { statements };
387 let request = tonic::Request::new(query);
388 let channel = self.channel.clone();
389 let mut client = StorageServiceClient::new(channel);
390 let _guard = self.acquire().await;
391 let _response = client
392 .process_write_batch_extended(request)
393 .make_sync()
394 .await?;
395 }
396 Ok(())
397 }
398
399 fn get_statement(&self, operation: WriteOperation) -> Statement {
400 let operation = match operation {
401 WriteOperation::Delete { key } => {
402 let mut full_key = self.start_key.clone();
403 full_key.extend(key);
404 Operation::Delete(full_key)
405 }
406 WriteOperation::Put { key, value } => {
407 let mut full_key = self.start_key.clone();
408 full_key.extend(key);
409 Operation::Put(KeyValue {
410 key: full_key,
411 value,
412 })
413 }
414 WriteOperation::DeletePrefix { key_prefix } => {
415 let mut full_key_prefix = self.start_key.clone();
416 full_key_prefix.extend(key_prefix);
417 Operation::DeletePrefix(full_key_prefix)
418 }
419 };
420 Statement {
421 operation: Some(operation),
422 }
423 }
424
425 async fn read_single_entry(
426 &self,
427 message_index: i64,
428 index: i32,
429 ) -> Result<Vec<u8>, StorageServiceStoreError> {
430 let channel = self.channel.clone();
431 let query = RequestSpecificChunk {
432 message_index,
433 index,
434 };
435 let request = tonic::Request::new(query);
436 let mut client = StorageServiceClient::new(channel);
437 let response = client.process_specific_chunk(request).make_sync().await?;
438 let response = response.into_inner();
439 let ReplySpecificChunk { chunk } = response;
440 Ok(chunk)
441 }
442
443 async fn read_entries<S: DeserializeOwned>(
444 &self,
445 message_index: i64,
446 num_chunks: i32,
447 ) -> Result<S, StorageServiceStoreError> {
448 let mut handles = Vec::new();
449 for index in 0..num_chunks {
450 let handle = self.read_single_entry(message_index, index);
451 handles.push(handle);
452 }
453 let mut value = Vec::new();
454 for chunk in join_all(handles).await {
455 let chunk = chunk?;
456 value.extend(chunk);
457 }
458 Ok(bcs::from_bytes(&value)?)
459 }
460}
461
462impl KeyValueDatabase for StorageServiceDatabaseInternal {
463 type Config = StorageServiceStoreInternalConfig;
464 type Store = StorageServiceStoreInternal;
465
466 fn get_name() -> String {
467 "service store".to_string()
468 }
469
470 async fn connect(
471 config: &Self::Config,
472 namespace: &str,
473 ) -> Result<Self, StorageServiceStoreError> {
474 let semaphore = config
475 .max_concurrent_queries
476 .map(|n| Arc::new(Semaphore::new(n)));
477 let namespace = bcs::to_bytes(namespace)?;
478 let max_stream_queries = config.max_stream_queries;
479 let endpoint = config.http_address();
480 let endpoint = Endpoint::from_shared(endpoint)?;
481 let channel = endpoint.connect_lazy();
482 Ok(Self {
483 channel,
484 semaphore,
485 max_stream_queries,
486 namespace,
487 })
488 }
489
490 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, StorageServiceStoreError> {
491 let channel = self.channel.clone();
492 let semaphore = self.semaphore.clone();
493 let max_stream_queries = self.max_stream_queries;
494 let mut start_key = vec![KeyPrefix::Key as u8];
495 start_key.extend(&self.namespace);
496 start_key.extend(bcs::to_bytes(root_key)?);
497 let prefix_len = self.namespace.len() + 1;
498 Ok(StorageServiceStoreInternal {
499 channel,
500 semaphore,
501 max_stream_queries,
502 prefix_len,
503 start_key,
504 root_key_written: Arc::new(AtomicBool::new(false)),
505 })
506 }
507
508 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
509 self.open_shared(root_key)
510 }
511
512 async fn list_all(config: &Self::Config) -> Result<Vec<String>, StorageServiceStoreError> {
513 let endpoint = config.http_address();
514 let endpoint = Endpoint::from_shared(endpoint)?;
515 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
516 let response = client.process_list_all(()).make_sync().await?;
517 let response = response.into_inner();
518 let ReplyListAll { namespaces } = response;
519 let namespaces = namespaces
520 .into_iter()
521 .map(|x| bcs::from_bytes(&x))
522 .collect::<Result<_, _>>()?;
523 Ok(namespaces)
524 }
525
526 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, StorageServiceStoreError> {
527 let query = RequestListRootKeys {
528 namespace: self.namespace.clone(),
529 };
530 let request = tonic::Request::new(query);
531 let mut client = StorageServiceClient::new(self.channel.clone());
532 let response = client.process_list_root_keys(request).make_sync().await?;
533 let response = response.into_inner();
534 let ReplyListRootKeys { root_keys } = response;
535 Ok(root_keys)
536 }
537
538 async fn delete_all(config: &Self::Config) -> Result<(), StorageServiceStoreError> {
539 let endpoint = config.http_address();
540 let endpoint = Endpoint::from_shared(endpoint)?;
541 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
542 let _response = client.process_delete_all(()).make_sync().await?;
543 Ok(())
544 }
545
546 async fn exists(
547 config: &Self::Config,
548 namespace: &str,
549 ) -> Result<bool, StorageServiceStoreError> {
550 let namespace = bcs::to_bytes(namespace)?;
551 let query = RequestExistsNamespace { namespace };
552 let request = tonic::Request::new(query);
553 let endpoint = config.http_address();
554 let endpoint = Endpoint::from_shared(endpoint)?;
555 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
556 let response = client.process_exists_namespace(request).make_sync().await?;
557 let response = response.into_inner();
558 let ReplyExistsNamespace { exists } = response;
559 Ok(exists)
560 }
561
562 async fn create(
563 config: &Self::Config,
564 namespace: &str,
565 ) -> Result<(), StorageServiceStoreError> {
566 if StorageServiceDatabaseInternal::exists(config, namespace).await? {
567 return Err(StorageServiceStoreError::StoreAlreadyExists);
568 }
569 let namespace = bcs::to_bytes(namespace)?;
570 let query = RequestCreateNamespace { namespace };
571 let request = tonic::Request::new(query);
572 let endpoint = config.http_address();
573 let endpoint = Endpoint::from_shared(endpoint)?;
574 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
575 let _response = client.process_create_namespace(request).make_sync().await?;
576 Ok(())
577 }
578
579 async fn delete(
580 config: &Self::Config,
581 namespace: &str,
582 ) -> Result<(), StorageServiceStoreError> {
583 let namespace = bcs::to_bytes(namespace)?;
584 let query = RequestDeleteNamespace { namespace };
585 let request = tonic::Request::new(query);
586 let endpoint = config.http_address();
587 let endpoint = Endpoint::from_shared(endpoint)?;
588 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
589 let _response = client.process_delete_namespace(request).make_sync().await?;
590 Ok(())
591 }
592}
593
#[cfg(with_testing)]
impl TestKeyValueDatabase for StorageServiceDatabaseInternal {
    /// Builds a configuration pointing at the endpoint returned by
    /// `storage_service_test_endpoint`.
    async fn new_test_config() -> Result<StorageServiceStoreInternalConfig, StorageServiceStoreError>
    {
        let test_endpoint = storage_service_test_endpoint()?;
        let config = service_config_from_endpoint(&test_endpoint)?;
        Ok(config)
    }
}
602
603pub fn service_config_from_endpoint(
605 endpoint: &str,
606) -> Result<StorageServiceStoreInternalConfig, StorageServiceStoreError> {
607 Ok(StorageServiceStoreInternalConfig {
608 endpoint: endpoint.to_string(),
609 max_concurrent_queries: None,
610 max_stream_queries: 100,
611 })
612}
613
614pub async fn storage_service_check_absence(
616 endpoint: &str,
617) -> Result<bool, StorageServiceStoreError> {
618 let endpoint = Endpoint::from_shared(endpoint.to_string())?;
619 let result = StorageServiceClient::connect(endpoint).await;
620 Ok(result.is_err())
621}
622
623pub async fn storage_service_check_validity(
625 endpoint: &str,
626) -> Result<(), StorageServiceStoreError> {
627 let config = service_config_from_endpoint(endpoint).unwrap();
628 let namespace = "namespace";
629 let database = StorageServiceDatabaseInternal::connect(&config, namespace).await?;
630 let store = database.open_shared(&[])?;
631 let _value = store.read_value_bytes(&[42]).await?;
632 Ok(())
633}
634
/// The storage service database wrapped in an LRU caching layer, with a metering
/// layer both beneath and above the cache (build with metrics enabled).
#[cfg(with_metrics)]
pub type StorageServiceDatabase =
    MeteredDatabase<LruCachingDatabase<MeteredDatabase<StorageServiceDatabaseInternal>>>;
639
/// The storage service database wrapped in an LRU caching layer (build without metrics).
#[cfg(not(with_metrics))]
pub type StorageServiceDatabase = LruCachingDatabase<StorageServiceDatabaseInternal>;