1use std::{
5 mem,
6 sync::{
7 atomic::{AtomicBool, Ordering},
8 Arc,
9 },
10};
11
12use async_lock::{Semaphore, SemaphoreGuard};
13use futures::future::join_all;
14use linera_base::{ensure, util::future::FutureSyncExt as _};
15#[cfg(with_metrics)]
16use linera_views::metering::MeteredDatabase;
17#[cfg(with_testing)]
18use linera_views::store::TestKeyValueDatabase;
19use linera_views::{
20 batch::{Batch, WriteOperation},
21 lru_caching::LruCachingDatabase,
22 store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
23};
24use serde::de::DeserializeOwned;
25use tonic::transport::{Channel, Endpoint};
26
27#[cfg(with_testing)]
28use crate::common::storage_service_test_endpoint;
29use crate::{
30 common::{
31 KeyPrefix, StorageServiceStoreError, StorageServiceStoreInternalConfig, MAX_PAYLOAD_SIZE,
32 },
33 key_value_store::{
34 statement::Operation, storage_service_client::StorageServiceClient, KeyValue,
35 KeyValueAppend, ReplyContainsKey, ReplyContainsKeys, ReplyExistsNamespace,
36 ReplyFindKeyValuesByPrefix, ReplyFindKeysByPrefix, ReplyListAll, ReplyListRootKeys,
37 ReplyReadMultiValues, ReplyReadValue, ReplySpecificChunk, RequestContainsKey,
38 RequestContainsKeys, RequestCreateNamespace, RequestDeleteNamespace,
39 RequestExistsNamespace, RequestFindKeyValuesByPrefix, RequestFindKeysByPrefix,
40 RequestListRootKeys, RequestReadMultiValues, RequestReadValue, RequestSpecificChunk,
41 RequestWriteBatchExtended, Statement,
42 },
43};
44
/// Largest key (or key prefix) length, in bytes, accepted from callers of this store.
const MAX_KEY_SIZE: usize = 1_000_000;
47
/// Client-side handle on one namespace of the storage service.
///
/// Stores for individual root keys are obtained from it with `open_shared` /
/// `open_exclusive`; they reuse the channel, semaphore and limits held here.
#[derive(Clone)]
pub struct StorageServiceDatabaseInternal {
    /// Channel from which a `StorageServiceClient` is built for each request.
    channel: Channel,
    /// Optional semaphore built from `max_concurrent_queries` in the configuration;
    /// `None` when that setting is absent.
    semaphore: Option<Arc<Semaphore>>,
    /// Value copied from the configuration and handed to every opened store.
    max_stream_queries: usize,
    /// BCS serialization of the namespace name.
    namespace: Vec<u8>,
}
74
/// Key-value store bound to one root key inside one namespace of the storage service.
///
/// Every key sent to the server is `start_key` followed by the caller's key.
#[derive(Clone)]
pub struct StorageServiceStoreInternal {
    /// Channel from which a `StorageServiceClient` is built for each request.
    channel: Channel,
    /// Optional semaphore acquired before a request is sent; `None` means no guard is taken.
    semaphore: Option<Arc<Semaphore>>,
    /// Value returned by `max_stream_queries()`.
    max_stream_queries: usize,
    /// Length of the leading part of `start_key` made of the prefix byte and the
    /// serialized namespace; the BCS-encoded root key starts at this offset.
    prefix_len: usize,
    /// `KeyPrefix::Key` byte, then the serialized namespace, then the BCS-encoded root key.
    start_key: Vec<u8>,
    /// Set by the first `write_batch` call that schedules the root-key marker;
    /// shared between clones of this store.
    root_key_written: Arc<AtomicBool>,
}
84
/// Database-level operations report failures as `StorageServiceStoreError`.
impl WithError for StorageServiceDatabaseInternal {
    type Error = StorageServiceStoreError;
}
88
/// Store-level operations report failures as `StorageServiceStoreError`.
impl WithError for StorageServiceStoreInternal {
    type Error = StorageServiceStoreError;
}
92
93impl ReadableKeyValueStore for StorageServiceStoreInternal {
94 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
95
96 fn max_stream_queries(&self) -> usize {
97 self.max_stream_queries
98 }
99
100 fn root_key(&self) -> Result<Vec<u8>, StorageServiceStoreError> {
101 let root_key = bcs::from_bytes(&self.start_key[self.prefix_len..])?;
102 Ok(root_key)
103 }
104
105 async fn read_value_bytes(
106 &self,
107 key: &[u8],
108 ) -> Result<Option<Vec<u8>>, StorageServiceStoreError> {
109 ensure!(
110 key.len() <= MAX_KEY_SIZE,
111 StorageServiceStoreError::KeyTooLong
112 );
113 let mut full_key = self.start_key.clone();
114 full_key.extend(key);
115 let query = RequestReadValue { key: full_key };
116 let request = tonic::Request::new(query);
117 let channel = self.channel.clone();
118 let mut client = StorageServiceClient::new(channel);
119 let _guard = self.acquire().await;
120 let response = client.process_read_value(request).make_sync().await?;
121 let response = response.into_inner();
122 let ReplyReadValue {
123 value,
124 message_index,
125 num_chunks,
126 } = response;
127 if num_chunks == 0 {
128 Ok(value)
129 } else {
130 self.read_entries(message_index, num_chunks).await
131 }
132 }
133
134 async fn contains_key(&self, key: &[u8]) -> Result<bool, StorageServiceStoreError> {
135 ensure!(
136 key.len() <= MAX_KEY_SIZE,
137 StorageServiceStoreError::KeyTooLong
138 );
139 let mut full_key = self.start_key.clone();
140 full_key.extend(key);
141 let query = RequestContainsKey { key: full_key };
142 let request = tonic::Request::new(query);
143 let channel = self.channel.clone();
144 let mut client = StorageServiceClient::new(channel);
145 let _guard = self.acquire().await;
146 let response = client.process_contains_key(request).make_sync().await?;
147 let response = response.into_inner();
148 let ReplyContainsKey { test } = response;
149 Ok(test)
150 }
151
152 async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, StorageServiceStoreError> {
153 let mut full_keys = Vec::new();
154 for key in keys {
155 ensure!(
156 key.len() <= MAX_KEY_SIZE,
157 StorageServiceStoreError::KeyTooLong
158 );
159 let mut full_key = self.start_key.clone();
160 full_key.extend(key);
161 full_keys.push(full_key);
162 }
163 let query = RequestContainsKeys { keys: full_keys };
164 let request = tonic::Request::new(query);
165 let channel = self.channel.clone();
166 let mut client = StorageServiceClient::new(channel);
167 let _guard = self.acquire().await;
168 let response = client.process_contains_keys(request).make_sync().await?;
169 let response = response.into_inner();
170 let ReplyContainsKeys { tests } = response;
171 Ok(tests)
172 }
173
174 async fn read_multi_values_bytes(
175 &self,
176 keys: &[Vec<u8>],
177 ) -> Result<Vec<Option<Vec<u8>>>, StorageServiceStoreError> {
178 let mut full_keys = Vec::new();
179 for key in keys {
180 ensure!(
181 key.len() <= MAX_KEY_SIZE,
182 StorageServiceStoreError::KeyTooLong
183 );
184 let mut full_key = self.start_key.clone();
185 full_key.extend(key);
186 full_keys.push(full_key);
187 }
188 let query = RequestReadMultiValues { keys: full_keys };
189 let request = tonic::Request::new(query);
190 let channel = self.channel.clone();
191 let mut client = StorageServiceClient::new(channel);
192 let _guard = self.acquire().await;
193 let response = client
194 .process_read_multi_values(request)
195 .make_sync()
196 .await?;
197 let response = response.into_inner();
198 let ReplyReadMultiValues {
199 values,
200 message_index,
201 num_chunks,
202 } = response;
203 if num_chunks == 0 {
204 let values = values.into_iter().map(|x| x.value).collect::<Vec<_>>();
205 Ok(values)
206 } else {
207 self.read_entries(message_index, num_chunks).await
208 }
209 }
210
211 async fn find_keys_by_prefix(
212 &self,
213 key_prefix: &[u8],
214 ) -> Result<Vec<Vec<u8>>, StorageServiceStoreError> {
215 ensure!(
216 key_prefix.len() <= MAX_KEY_SIZE,
217 StorageServiceStoreError::KeyTooLong
218 );
219 let mut full_key_prefix = self.start_key.clone();
220 full_key_prefix.extend(key_prefix);
221 let query = RequestFindKeysByPrefix {
222 key_prefix: full_key_prefix,
223 };
224 let request = tonic::Request::new(query);
225 let channel = self.channel.clone();
226 let mut client = StorageServiceClient::new(channel);
227 let _guard = self.acquire().await;
228 let response = client
229 .process_find_keys_by_prefix(request)
230 .make_sync()
231 .await?;
232 let response = response.into_inner();
233 let ReplyFindKeysByPrefix {
234 keys,
235 message_index,
236 num_chunks,
237 } = response;
238 if num_chunks == 0 {
239 Ok(keys)
240 } else {
241 self.read_entries(message_index, num_chunks).await
242 }
243 }
244
245 async fn find_key_values_by_prefix(
246 &self,
247 key_prefix: &[u8],
248 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageServiceStoreError> {
249 ensure!(
250 key_prefix.len() <= MAX_KEY_SIZE,
251 StorageServiceStoreError::KeyTooLong
252 );
253 let mut full_key_prefix = self.start_key.clone();
254 full_key_prefix.extend(key_prefix);
255 let query = RequestFindKeyValuesByPrefix {
256 key_prefix: full_key_prefix,
257 };
258 let request = tonic::Request::new(query);
259 let channel = self.channel.clone();
260 let mut client = StorageServiceClient::new(channel);
261 let _guard = self.acquire().await;
262 let response = client
263 .process_find_key_values_by_prefix(request)
264 .make_sync()
265 .await?;
266 let response = response.into_inner();
267 let ReplyFindKeyValuesByPrefix {
268 key_values,
269 message_index,
270 num_chunks,
271 } = response;
272 if num_chunks == 0 {
273 let key_values = key_values
274 .into_iter()
275 .map(|x| (x.key, x.value))
276 .collect::<Vec<_>>();
277 Ok(key_values)
278 } else {
279 self.read_entries(message_index, num_chunks).await
280 }
281 }
282}
283
284impl WritableKeyValueStore for StorageServiceStoreInternal {
285 const MAX_VALUE_SIZE: usize = usize::MAX;
286
287 async fn write_batch(&self, batch: Batch) -> Result<(), StorageServiceStoreError> {
288 if batch.operations.is_empty() {
289 return Ok(());
290 }
291 let mut statements = Vec::new();
292 let mut chunk_size = 0;
293
294 if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
295 let mut full_key = self.start_key.clone();
296 full_key[0] = KeyPrefix::RootKey as u8;
297 let operation = Operation::Put(KeyValue {
298 key: full_key,
299 value: vec![],
300 });
301 let statement = Statement {
302 operation: Some(operation),
303 };
304 statements.push(statement);
305 chunk_size += self.start_key.len();
306 }
307
308 let bcs_root_key_len = self.start_key.len() - self.prefix_len;
309 for operation in batch.operations {
310 let (key_len, value_len) = match &operation {
311 WriteOperation::Delete { key } => (key.len(), 0),
312 WriteOperation::Put { key, value } => (key.len(), value.len()),
313 WriteOperation::DeletePrefix { key_prefix } => (key_prefix.len(), 0),
314 };
315 let operation_size = key_len + value_len + bcs_root_key_len;
316 ensure!(
317 key_len <= MAX_KEY_SIZE,
318 StorageServiceStoreError::KeyTooLong
319 );
320 if operation_size + chunk_size < MAX_PAYLOAD_SIZE {
321 let statement = self.get_statement(operation);
322 statements.push(statement);
323 chunk_size += operation_size;
324 } else {
325 self.submit_statements(mem::take(&mut statements)).await?;
326 chunk_size = 0;
327 if operation_size > MAX_PAYLOAD_SIZE {
328 let WriteOperation::Put { key, value } = operation else {
330 unreachable!();
332 };
333 let mut full_key = self.start_key.clone();
334 full_key.extend(key);
335 let value_chunks = value
336 .chunks(MAX_PAYLOAD_SIZE)
337 .map(|x| x.to_vec())
338 .collect::<Vec<_>>();
339 let num_chunks = value_chunks.len();
340 for (i_chunk, value) in value_chunks.into_iter().enumerate() {
341 let last = i_chunk + 1 == num_chunks;
342 let operation = Operation::Append(KeyValueAppend {
343 key: full_key.clone(),
344 value,
345 last,
346 });
347 statements = vec![Statement {
348 operation: Some(operation),
349 }];
350 self.submit_statements(mem::take(&mut statements)).await?;
351 }
352 } else {
353 let statement = self.get_statement(operation);
355 statements.push(statement);
356 chunk_size = operation_size;
357 }
358 }
359 }
360 self.submit_statements(mem::take(&mut statements)).await
361 }
362
363 async fn clear_journal(&self) -> Result<(), StorageServiceStoreError> {
364 Ok(())
365 }
366}
367
368impl StorageServiceStoreInternal {
369 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
371 match &self.semaphore {
372 None => None,
373 Some(count) => Some(count.acquire().await),
374 }
375 }
376
377 async fn submit_statements(
378 &self,
379 statements: Vec<Statement>,
380 ) -> Result<(), StorageServiceStoreError> {
381 if !statements.is_empty() {
382 let query = RequestWriteBatchExtended { statements };
383 let request = tonic::Request::new(query);
384 let channel = self.channel.clone();
385 let mut client = StorageServiceClient::new(channel);
386 let _guard = self.acquire().await;
387 let _response = client
388 .process_write_batch_extended(request)
389 .make_sync()
390 .await?;
391 }
392 Ok(())
393 }
394
395 fn get_statement(&self, operation: WriteOperation) -> Statement {
396 let operation = match operation {
397 WriteOperation::Delete { key } => {
398 let mut full_key = self.start_key.clone();
399 full_key.extend(key);
400 Operation::Delete(full_key)
401 }
402 WriteOperation::Put { key, value } => {
403 let mut full_key = self.start_key.clone();
404 full_key.extend(key);
405 Operation::Put(KeyValue {
406 key: full_key,
407 value,
408 })
409 }
410 WriteOperation::DeletePrefix { key_prefix } => {
411 let mut full_key_prefix = self.start_key.clone();
412 full_key_prefix.extend(key_prefix);
413 Operation::DeletePrefix(full_key_prefix)
414 }
415 };
416 Statement {
417 operation: Some(operation),
418 }
419 }
420
421 async fn read_single_entry(
422 &self,
423 message_index: i64,
424 index: i32,
425 ) -> Result<Vec<u8>, StorageServiceStoreError> {
426 let channel = self.channel.clone();
427 let query = RequestSpecificChunk {
428 message_index,
429 index,
430 };
431 let request = tonic::Request::new(query);
432 let mut client = StorageServiceClient::new(channel);
433 let response = client.process_specific_chunk(request).make_sync().await?;
434 let response = response.into_inner();
435 let ReplySpecificChunk { chunk } = response;
436 Ok(chunk)
437 }
438
439 async fn read_entries<S: DeserializeOwned>(
440 &self,
441 message_index: i64,
442 num_chunks: i32,
443 ) -> Result<S, StorageServiceStoreError> {
444 let mut handles = Vec::new();
445 for index in 0..num_chunks {
446 let handle = self.read_single_entry(message_index, index);
447 handles.push(handle);
448 }
449 let mut value = Vec::new();
450 for chunk in join_all(handles).await {
451 let chunk = chunk?;
452 value.extend(chunk);
453 }
454 Ok(bcs::from_bytes(&value)?)
455 }
456}
457
458impl KeyValueDatabase for StorageServiceDatabaseInternal {
459 type Config = StorageServiceStoreInternalConfig;
460 type Store = StorageServiceStoreInternal;
461
462 fn get_name() -> String {
463 "service store".to_string()
464 }
465
466 async fn connect(
467 config: &Self::Config,
468 namespace: &str,
469 ) -> Result<Self, StorageServiceStoreError> {
470 let semaphore = config
471 .max_concurrent_queries
472 .map(|n| Arc::new(Semaphore::new(n)));
473 let namespace = bcs::to_bytes(namespace)?;
474 let max_stream_queries = config.max_stream_queries;
475 let endpoint = config.http_address();
476 let endpoint = Endpoint::from_shared(endpoint)?;
477 let channel = endpoint.connect_lazy();
478 Ok(Self {
479 channel,
480 semaphore,
481 max_stream_queries,
482 namespace,
483 })
484 }
485
486 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, StorageServiceStoreError> {
487 let channel = self.channel.clone();
488 let semaphore = self.semaphore.clone();
489 let max_stream_queries = self.max_stream_queries;
490 let mut start_key = vec![KeyPrefix::Key as u8];
491 start_key.extend(&self.namespace);
492 start_key.extend(bcs::to_bytes(root_key)?);
493 let prefix_len = self.namespace.len() + 1;
494 Ok(StorageServiceStoreInternal {
495 channel,
496 semaphore,
497 max_stream_queries,
498 prefix_len,
499 start_key,
500 root_key_written: Arc::new(AtomicBool::new(false)),
501 })
502 }
503
504 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
505 self.open_shared(root_key)
506 }
507
508 async fn list_all(config: &Self::Config) -> Result<Vec<String>, StorageServiceStoreError> {
509 let endpoint = config.http_address();
510 let endpoint = Endpoint::from_shared(endpoint)?;
511 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
512 let response = client.process_list_all(()).make_sync().await?;
513 let response = response.into_inner();
514 let ReplyListAll { namespaces } = response;
515 let namespaces = namespaces
516 .into_iter()
517 .map(|x| bcs::from_bytes(&x))
518 .collect::<Result<_, _>>()?;
519 Ok(namespaces)
520 }
521
522 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, StorageServiceStoreError> {
523 let query = RequestListRootKeys {
524 namespace: self.namespace.clone(),
525 };
526 let request = tonic::Request::new(query);
527 let mut client = StorageServiceClient::new(self.channel.clone());
528 let response = client.process_list_root_keys(request).make_sync().await?;
529 let response = response.into_inner();
530 let ReplyListRootKeys { root_keys } = response;
531 Ok(root_keys)
532 }
533
534 async fn delete_all(config: &Self::Config) -> Result<(), StorageServiceStoreError> {
535 let endpoint = config.http_address();
536 let endpoint = Endpoint::from_shared(endpoint)?;
537 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
538 let _response = client.process_delete_all(()).make_sync().await?;
539 Ok(())
540 }
541
542 async fn exists(
543 config: &Self::Config,
544 namespace: &str,
545 ) -> Result<bool, StorageServiceStoreError> {
546 let namespace = bcs::to_bytes(namespace)?;
547 let query = RequestExistsNamespace { namespace };
548 let request = tonic::Request::new(query);
549 let endpoint = config.http_address();
550 let endpoint = Endpoint::from_shared(endpoint)?;
551 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
552 let response = client.process_exists_namespace(request).make_sync().await?;
553 let response = response.into_inner();
554 let ReplyExistsNamespace { exists } = response;
555 Ok(exists)
556 }
557
558 async fn create(
559 config: &Self::Config,
560 namespace: &str,
561 ) -> Result<(), StorageServiceStoreError> {
562 if StorageServiceDatabaseInternal::exists(config, namespace).await? {
563 return Err(StorageServiceStoreError::StoreAlreadyExists);
564 }
565 let namespace = bcs::to_bytes(namespace)?;
566 let query = RequestCreateNamespace { namespace };
567 let request = tonic::Request::new(query);
568 let endpoint = config.http_address();
569 let endpoint = Endpoint::from_shared(endpoint)?;
570 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
571 let _response = client.process_create_namespace(request).make_sync().await?;
572 Ok(())
573 }
574
575 async fn delete(
576 config: &Self::Config,
577 namespace: &str,
578 ) -> Result<(), StorageServiceStoreError> {
579 let namespace = bcs::to_bytes(namespace)?;
580 let query = RequestDeleteNamespace { namespace };
581 let request = tonic::Request::new(query);
582 let endpoint = config.http_address();
583 let endpoint = Endpoint::from_shared(endpoint)?;
584 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
585 let _response = client.process_delete_namespace(request).make_sync().await?;
586 Ok(())
587 }
588}
589
#[cfg(with_testing)]
impl TestKeyValueDatabase for StorageServiceDatabaseInternal {
    /// Builds a configuration pointing at the endpoint returned by
    /// `storage_service_test_endpoint`.
    async fn new_test_config() -> Result<StorageServiceStoreInternalConfig, StorageServiceStoreError>
    {
        service_config_from_endpoint(&storage_service_test_endpoint()?)
    }
}
598
599pub fn service_config_from_endpoint(
601 endpoint: &str,
602) -> Result<StorageServiceStoreInternalConfig, StorageServiceStoreError> {
603 Ok(StorageServiceStoreInternalConfig {
604 endpoint: endpoint.to_string(),
605 max_concurrent_queries: None,
606 max_stream_queries: 100,
607 })
608}
609
610pub async fn storage_service_check_absence(
612 endpoint: &str,
613) -> Result<bool, StorageServiceStoreError> {
614 let endpoint = Endpoint::from_shared(endpoint.to_string())?;
615 let result = StorageServiceClient::connect(endpoint).await;
616 Ok(result.is_err())
617}
618
619pub async fn storage_service_check_validity(
621 endpoint: &str,
622) -> Result<(), StorageServiceStoreError> {
623 let config = service_config_from_endpoint(endpoint).unwrap();
624 let namespace = "namespace";
625 let database = StorageServiceDatabaseInternal::connect(&config, namespace).await?;
626 let store = database.open_shared(&[])?;
627 let _value = store.read_value_bytes(&[42]).await?;
628 Ok(())
629}
630
/// Public database type when metrics are enabled: the internal database wrapped in
/// an LRU cache, with a `MeteredDatabase` layer both below and above the cache.
#[cfg(with_metrics)]
pub type StorageServiceDatabase =
    MeteredDatabase<LruCachingDatabase<MeteredDatabase<StorageServiceDatabaseInternal>>>;
635
/// Public database type when metrics are disabled: the internal database wrapped in
/// an LRU cache.
#[cfg(not(with_metrics))]
pub type StorageServiceDatabase = LruCachingDatabase<StorageServiceDatabaseInternal>;