1use std::{
5 collections::{BTreeMap, BTreeSet},
6 sync::Arc,
7 time::Duration,
8};
9
10use futures::{
11 future::{join_all, select_all},
12 lock::Mutex,
13 FutureExt as _, StreamExt,
14};
15use linera_base::{
16 crypto::{CryptoHash, Signer},
17 data_types::{ChainDescription, Timestamp},
18 identifiers::{AccountOwner, BlobId, BlobType, ChainId},
19 task::NonBlockingFuture,
20};
21use linera_core::{
22 client::{AbortOnDrop, ChainClient, ChainClientError},
23 node::NotificationStream,
24 worker::{Notification, Reason},
25 Environment,
26};
27use linera_storage::{Clock as _, Storage as _};
28use tokio_util::sync::CancellationToken;
29use tracing::{debug, info, instrument, warn, Instrument as _};
30
31use crate::{
32 wallet::{UserChain, Wallet},
33 Error,
34};
35
// Command-line / environment configuration of the chain listener.
//
// NOTE(review): plain `//` comments are used on purpose in this block. With the `clap`
// derive macros, `///` doc comments are turned into help text, so adding them would change
// the program's `--help` output.
#[derive(Debug, Default, Clone, clap::Args)]
pub struct ChainListenerConfig {
    // When set, `maybe_process_inbox` returns early and the listener never processes an
    // inbox on its own.
    #[arg(
        long = "listener-skip-process-inbox",
        env = "LINERA_LISTENER_SKIP_PROCESS_INBOX"
    )]
    pub skip_process_inbox: bool,

    // Number of milliseconds to sleep before handling each notification (`0` = no delay).
    #[arg(
        long = "listener-delay-before-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_BEFORE"
    )]
    pub delay_before_ms: u64,

    // Number of milliseconds to sleep after handling each notification (`0` = no delay).
    #[arg(
        long = "listener-delay-after-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_AFTER"
    )]
    pub delay_after_ms: u64,
}
62
/// Shorthand for the [`ChainClient`] type built on the environment of the client context `C`.
type ContextChainClient<C> = ChainClient<<C as ClientContext>::Environment>;
64
65#[cfg_attr(not(web), trait_variant::make(Send))]
66#[allow(async_fn_in_trait)]
67pub trait ClientContext {
68 type Environment: linera_core::Environment;
69
70 fn wallet(&self) -> &Wallet;
71
72 fn storage(&self) -> &<Self::Environment as linera_core::Environment>::Storage;
73
74 fn client(&self) -> &Arc<linera_core::client::Client<Self::Environment>>;
75
76 fn make_chain_client(&self, chain_id: ChainId) -> ChainClient<Self::Environment> {
77 let chain = self
78 .wallet()
79 .get(chain_id)
80 .cloned()
81 .unwrap_or_else(|| UserChain::make_other(chain_id, Timestamp::from(0)));
82 self.client().create_chain_client(
83 chain_id,
84 chain.block_hash,
85 chain.next_block_height,
86 chain.pending_proposal,
87 chain.owner,
88 )
89 }
90
91 async fn update_wallet_for_new_chain(
92 &mut self,
93 chain_id: ChainId,
94 owner: Option<AccountOwner>,
95 timestamp: Timestamp,
96 ) -> Result<(), Error>;
97
98 async fn update_wallet(&mut self, client: &ContextChainClient<Self>) -> Result<(), Error>;
99}
100
101#[allow(async_fn_in_trait)]
102pub trait ClientContextExt: ClientContext {
103 fn clients(&self) -> Vec<ContextChainClient<Self>> {
104 let chain_ids = self.wallet().chain_ids();
105 let mut clients = vec![];
106 for chain_id in chain_ids {
107 clients.push(self.make_chain_client(chain_id));
108 }
109 clients
110 }
111
112 async fn chain_description(&mut self, chain_id: ChainId) -> Result<ChainDescription, Error> {
113 let blob_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
114
115 let blob = match self.storage().read_blob(blob_id).await {
116 Ok(Some(blob)) => blob,
117 Ok(None) => {
118 self.client().ensure_has_chain_description(chain_id).await?
121 }
122 Err(err) => {
123 return Err(err.into());
124 }
125 };
126
127 Ok(bcs::from_bytes(blob.bytes())?)
128 }
129}
130
// Blanket implementation: every `ClientContext` gets the `ClientContextExt` methods.
impl<T: ClientContext> ClientContextExt for T {}
132
/// A chain client bundled with the handles of its listening task and its notification
/// stream.
struct ListeningClient<C: ClientContext> {
    /// The chain client.
    client: ContextChainClient<C>,
    /// Abort handle obtained from `ChainClient::listen`; `stop` drops it before joining the
    /// task. Presumably dropping it aborts the listening task (the type is `AbortOnDrop`).
    abort_handle: AbortOnDrop,
    /// Join handle of the spawned listening task.
    join_handle: NonBlockingFuture<()>,
    /// The notification stream obtained from `ChainClient::listen`, shareable so that a
    /// polling future can own a handle to it.
    notification_stream: Arc<Mutex<NotificationStream>>,
    /// The time at which the inbox should be processed again. It is `u64::MAX` unless the
    /// last inbox processing returned a round timeout.
    timeout: Timestamp,
}
150
151impl<C: ClientContext> ListeningClient<C> {
152 fn new(
153 client: ContextChainClient<C>,
154 abort_handle: AbortOnDrop,
155 join_handle: NonBlockingFuture<()>,
156 notification_stream: NotificationStream,
157 ) -> Self {
158 Self {
159 client,
160 abort_handle,
161 join_handle,
162 #[allow(clippy::arc_with_non_send_sync)] notification_stream: Arc::new(Mutex::new(notification_stream)),
164 timeout: Timestamp::from(u64::MAX),
165 }
166 }
167
168 async fn stop(self) {
169 drop(self.abort_handle);
170 if let Err(error) = self.join_handle.await {
171 warn!("Failed to join listening task: {error:?}");
172 }
173 }
174}
175
/// Listens to notifications of a set of chains and reacts to them: it processes inboxes,
/// updates validators and the wallet, and starts listening to further chains as needed.
pub struct ChainListener<C: ClientContext> {
    /// The shared client context (wallet, client), locked whenever it is accessed.
    context: Arc<Mutex<C>>,
    /// The storage, used to read confirmed blocks and to access the clock.
    storage: <C::Environment as Environment>::Storage,
    /// The listener configuration.
    config: Arc<ChainListenerConfig>,
    /// The chains that are currently being listened to, by chain ID.
    listening: BTreeMap<ChainId, ListeningClient<C>>,
    /// Maps each publishing chain to the listened chains that subscribe to its events.
    event_subscribers: BTreeMap<ChainId, BTreeSet<ChainId>>,
    /// When cancelled, the main loop in `run` stops.
    cancellation_token: CancellationToken,
}
188
189impl<C: ClientContext> ChainListener<C> {
190 pub fn new(
192 config: ChainListenerConfig,
193 context: Arc<Mutex<C>>,
194 storage: <C::Environment as Environment>::Storage,
195 cancellation_token: CancellationToken,
196 ) -> Self {
197 Self {
198 storage,
199 context,
200 config: Arc::new(config),
201 listening: Default::default(),
202 event_subscribers: Default::default(),
203 cancellation_token,
204 }
205 }
206
207 #[instrument(skip(self))]
209 pub async fn run(mut self) -> Result<(), Error> {
210 let chain_ids = {
211 let guard = self.context.lock().await;
212 let mut chain_ids = BTreeSet::from_iter(guard.wallet().chain_ids());
213 chain_ids.insert(guard.wallet().genesis_admin_chain());
214 chain_ids
215 };
216 self.listen_recursively(chain_ids).await?;
217 loop {
218 match self.next_action().await? {
219 Action::ProcessInbox(chain_id) => self.maybe_process_inbox(chain_id).await?,
220 Action::Notification(notification) => {
221 self.process_notification(notification).await?
222 }
223 Action::Stop => break,
224 }
225 }
226 join_all(self.listening.into_values().map(|client| client.stop())).await;
227 Ok(())
228 }
229
230 async fn process_notification(&mut self, notification: Notification) -> Result<(), Error> {
232 Self::sleep(self.config.delay_before_ms).await;
233 match ¬ification.reason {
234 Reason::NewIncomingBundle { .. } => {
235 self.maybe_process_inbox(notification.chain_id).await?;
236 }
237 Reason::NewRound { .. } => self.update_validators(¬ification).await?,
238 Reason::NewBlock { hash, .. } => {
239 self.update_validators(¬ification).await?;
240 self.update_wallet(notification.chain_id).await?;
241 self.add_new_chains(*hash).await?;
242 let publishers = self
243 .update_event_subscriptions(notification.chain_id)
244 .await?;
245 if !publishers.is_empty() {
246 self.listen_recursively(publishers).await?;
247 self.maybe_process_inbox(notification.chain_id).await?;
248 }
249 self.process_new_events(notification.chain_id).await?;
250 }
251 }
252 Self::sleep(self.config.delay_after_ms).await;
253 Ok(())
254 }
255
256 async fn add_new_chains(&mut self, hash: CryptoHash) -> Result<(), Error> {
259 let block = self.storage.read_confirmed_block(hash).await?.into_block();
260 let blobs = block.created_blobs().into_iter();
261 let new_chains = blobs
262 .filter_map(|(blob_id, blob)| {
263 if blob_id.blob_type == BlobType::ChainDescription {
264 let chain_desc: ChainDescription = bcs::from_bytes(blob.content().bytes())
265 .expect("ChainDescription should deserialize correctly");
266 let owners = chain_desc.config().ownership.all_owners().cloned();
267 Some((ChainId(blob_id.hash), owners.collect::<Vec<_>>()))
268 } else {
269 None
270 }
271 })
272 .collect::<Vec<_>>();
273 if new_chains.is_empty() {
274 return Ok(());
275 }
276 let mut new_ids = BTreeSet::new();
277 let mut context_guard = self.context.lock().await;
278 for (new_chain_id, owners) in new_chains {
279 for chain_owner in owners {
280 if context_guard
281 .client()
282 .signer()
283 .contains_key(&chain_owner)
284 .await
285 .map_err(ChainClientError::signer_failure)?
286 {
287 context_guard
288 .update_wallet_for_new_chain(
289 new_chain_id,
290 Some(chain_owner),
291 block.header.timestamp,
292 )
293 .await?;
294 new_ids.insert(new_chain_id);
295 }
296 }
297 }
298 drop(context_guard);
299 self.listen_recursively(new_ids).await?;
300 Ok(())
301 }
302
303 async fn process_new_events(&mut self, chain_id: ChainId) -> Result<(), Error> {
305 let Some(subscribers) = self.event_subscribers.get(&chain_id).cloned() else {
306 return Ok(());
307 };
308 for subscriber_id in subscribers {
309 self.maybe_process_inbox(subscriber_id).await?;
310 }
311 Ok(())
312 }
313
314 async fn listen_recursively(&mut self, mut chain_ids: BTreeSet<ChainId>) -> Result<(), Error> {
317 while let Some(chain_id) = chain_ids.pop_first() {
318 chain_ids.extend(self.listen(chain_id).await?);
319 }
320 Ok(())
321 }
322
/// Starts listening to the chain `chain_id`, unless it is already being listened to.
///
/// Returns the chains that publish events `chain_id` subscribes to (empty if the chain was
/// already listened to), so that the caller can start listening to those as well.
///
/// # Errors
///
/// Fails if subscribing to notifications, synchronizing the chain or updating the event
/// subscriptions fails.
async fn listen(&mut self, chain_id: ChainId) -> Result<BTreeSet<ChainId>, Error> {
    if self.listening.contains_key(&chain_id) {
        return Ok(BTreeSet::new());
    }
    // The context lock is only held for this statement.
    let client = self.context.lock().await.make_chain_client(chain_id);
    let (listener, abort_handle, notification_stream) = client.listen().await?;
    // Synchronize after subscribing. NOTE(review): presumably so that no update between
    // the two steps is missed — confirm.
    if client.is_tracked() {
        client.synchronize_from_validators().await?;
    } else {
        client.synchronize_chain_state(chain_id).await?;
    }
    // Run the listener future as a background task, in the current tracing span.
    let join_handle = linera_base::task::spawn(listener.in_current_span());
    let listening_client =
        ListeningClient::new(client, abort_handle, join_handle, notification_stream);
    self.listening.insert(chain_id, listening_client);
    let publishing_chains = self.update_event_subscriptions(chain_id).await?;
    self.maybe_process_inbox(chain_id).await?;
    Ok(publishing_chains)
}
345
346 async fn update_event_subscriptions(
348 &mut self,
349 chain_id: ChainId,
350 ) -> Result<BTreeSet<ChainId>, Error> {
351 let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
352 if !listening_client.client.is_tracked() {
353 return Ok(BTreeSet::new());
354 }
355 let publishing_chains = listening_client.client.event_stream_publishers().await?;
356 for publisher_id in &publishing_chains {
357 self.event_subscribers
358 .entry(*publisher_id)
359 .or_default()
360 .insert(chain_id);
361 }
362 Ok(publishing_chains)
363 }
364
/// Waits for the next thing to do: a stop request, a due inbox-processing timeout, or a
/// notification from one of the listened chains.
///
/// Chains whose notification stream has ended are removed from `listening` (with a
/// warning), and waiting resumes.
///
/// # Panics
///
/// Panics via `next_timeout` if no chain is being listened to.
async fn next_action(&mut self) -> Result<Action, Error> {
    loop {
        let (timeout_chain_id, timeout) = self.next_timeout()?;
        // One future per listened chain, each waiting for the next item of that chain's
        // stream. The futures are created in the iteration order of `self.listening`.
        let notification_futures = self
            .listening
            .values_mut()
            .map(|client| {
                let stream = client.notification_stream.clone();
                Box::pin(async move { stream.lock().await.next().await })
            })
            .collect::<Vec<_>>();
        futures::select! {
            () = self.cancellation_token.cancelled().fuse() => {
                return Ok(Action::Stop);
            }
            () = self.storage.clock().sleep_until(timeout).fuse() => {
                return Ok(Action::ProcessInbox(timeout_chain_id));
            }
            (maybe_notification, index, _) = select_all(notification_futures).fuse() => {
                let Some(notification) = maybe_notification else {
                    // The stream ended. `index` is the position of the finished future,
                    // which is also the position of its chain among the map's keys.
                    let chain_id = *self.listening.keys().nth(index).unwrap();
                    self.listening.remove(&chain_id);
                    warn!("Notification stream for {chain_id} closed");
                    continue;
                };
                return Ok(Action::Notification(notification));
            }
        }
    }
}
396
397 fn next_timeout(&self) -> Result<(ChainId, Timestamp), Error> {
399 let (chain_id, client) = self
400 .listening
401 .iter()
402 .min_by_key(|(_, client)| client.timeout)
403 .expect("No chains left to listen to");
404 Ok((*chain_id, client.timeout))
405 }
406
407 async fn update_validators(&self, notification: &Notification) -> Result<(), Error> {
409 let chain_id = notification.chain_id;
410 let listening_client = self.listening.get(&chain_id).expect("missing client");
411 if let Err(error) = listening_client.client.update_validators(None).await {
412 warn!(
413 "Failed to update validators about the local chain after \
414 receiving {notification:?} with error: {error:?}"
415 );
416 }
417 Ok(())
418 }
419
420 async fn update_wallet(&self, chain_id: ChainId) -> Result<(), Error> {
422 let client = &self
423 .listening
424 .get(&chain_id)
425 .expect("missing client")
426 .client;
427 self.context.lock().await.update_wallet(client).await?;
428 Ok(())
429 }
430
431 async fn maybe_process_inbox(&mut self, chain_id: ChainId) -> Result<(), Error> {
439 if self.config.skip_process_inbox {
440 debug!("Not processing inbox for {chain_id:.8} due to listener configuration");
441 return Ok(());
442 }
443 let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
444 if !listening_client.client.is_tracked() {
445 debug!("Not processing inbox for non-tracked chain {chain_id:.8}");
446 return Ok(());
447 }
448 debug!("Processing inbox for {chain_id:.8}");
449 listening_client.timeout = Timestamp::from(u64::MAX);
450 match listening_client
451 .client
452 .process_inbox_without_prepare()
453 .await
454 {
455 Err(ChainClientError::CannotFindKeyForChain(chain_id)) => {
456 debug!(%chain_id, "Cannot find key for chain");
457 }
458 Err(error) => warn!(%error, "Failed to process inbox."),
459 Ok((certs, None)) => info!("Done processing inbox. {} blocks created.", certs.len()),
460 Ok((certs, Some(new_timeout))) => {
461 info!(
462 "{} blocks created. Will try processing the inbox later based \
463 on the given round timeout: {new_timeout:?}",
464 certs.len(),
465 );
466 listening_client.timeout = new_timeout.timestamp;
467 }
468 }
469 let mut context_guard = self.context.lock().await;
470 context_guard
471 .update_wallet(&listening_client.client)
472 .await?;
473 Ok(())
474 }
475
476 async fn sleep(delay_ms: u64) {
478 if delay_ms > 0 {
479 linera_base::time::timer::sleep(Duration::from_millis(delay_ms)).await;
480 }
481 }
482}
483
/// What the main loop of the chain listener should do next.
enum Action {
    /// The timeout of this chain has elapsed: try to process its inbox.
    ProcessInbox(ChainId),
    /// A notification arrived from one of the listened chains.
    Notification(Notification),
    /// The cancellation token fired: stop listening.
    Stop,
}