linera_rpc/grpc/
pool.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use linera_base::time::Duration;
5
6use super::{transport, GrpcError};
7
/// A pool of transport channels to be used by gRPC.
///
/// Channels are cached per address string: [`GrpcConnectionPool::channel`] hands out a clone
/// of the cached channel when one exists and creates (then caches) one otherwise.
#[derive(Clone, Default)]
pub struct GrpcConnectionPool {
    /// Transport options (including the `connect_timeout` and `timeout` settings) passed to
    /// `transport::create_channel` whenever a new channel is built.
    options: transport::Options,
    /// Channels created so far, keyed by the address they were created for.
    channels: papaya::HashMap<String, transport::Channel>,
}
14
15impl GrpcConnectionPool {
16    pub fn new(options: transport::Options) -> Self {
17        Self {
18            options,
19            channels: papaya::HashMap::default(),
20        }
21    }
22
23    pub fn with_connect_timeout(mut self, connect_timeout: impl Into<Option<Duration>>) -> Self {
24        self.options.connect_timeout = connect_timeout.into();
25        self
26    }
27
28    pub fn with_timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
29        self.options.timeout = timeout.into();
30        self
31    }
32
33    /// Obtains a channel for the current address. Either clones an existing one (thereby
34    /// reusing the connection), or creates one if needed. New channels do not create a
35    /// connection immediately.
36    pub fn channel(&self, address: String) -> Result<transport::Channel, GrpcError> {
37        let pinned = self.channels.pin();
38        if let Some(channel) = pinned.get(&address) {
39            return Ok(channel.clone());
40        }
41        let channel = transport::create_channel(address.clone(), &self.options)?;
42        Ok(pinned.get_or_insert(address, channel).clone())
43    }
44}