1use std::{collections::HashSet, num::ParseIntError, str::FromStr};
5
6use futures::future;
7use linera_base::{
8 crypto::CryptoError,
9 data_types::{TimeDelta, Timestamp},
10 identifiers::{ApplicationId, ChainId, GenericApplicationId},
11 time::Duration,
12};
13use linera_core::{data_types::RoundTimeout, node::NotificationStream, worker::Reason};
14use tokio_stream::StreamExt as _;
15
16pub fn parse_json<T: serde::de::DeserializeOwned>(s: &str) -> anyhow::Result<T> {
17 Ok(serde_json::from_str(s.trim())?)
18}
19
/// Interprets `s` as a whole number of milliseconds and converts it into a [`Duration`].
///
/// The input is parsed as-is; it is not trimmed first.
///
/// # Errors
///
/// Returns a [`ParseIntError`] when `s` is not a valid `u64`.
pub fn parse_millis(s: &str) -> Result<Duration, ParseIntError> {
    s.parse::<u64>().map(Duration::from_millis)
}
23
/// Turns a zero-length duration into `None`; any other duration is returned unchanged
/// inside `Some`.
pub fn non_zero_duration(d: Duration) -> Option<Duration> {
    Some(d).filter(|duration| !duration.is_zero())
}
32
/// Interprets `s` as a whole number of seconds and converts it into a [`Duration`].
///
/// The input is parsed as-is; it is not trimmed first.
///
/// # Errors
///
/// Returns a [`ParseIntError`] when `s` is not a valid `u64`.
pub fn parse_secs(s: &str) -> Result<Duration, ParseIntError> {
    s.parse::<u64>().map(Duration::from_secs)
}
36
37pub fn parse_millis_delta(s: &str) -> Result<TimeDelta, ParseIntError> {
38 Ok(TimeDelta::from_millis(s.parse()?))
39}
40
41pub fn parse_json_optional_millis_delta(s: &str) -> anyhow::Result<Option<TimeDelta>> {
42 Ok(parse_json::<Option<u64>>(s)?.map(TimeDelta::from_millis))
43}
44
45pub fn parse_chain_set(s: &str) -> Result<HashSet<ChainId>, CryptoError> {
46 match s.trim() {
47 "" => Ok(HashSet::new()),
48 s => s.split(",").map(ChainId::from_str).collect(),
49 }
50}
51
52pub fn parse_app_set(s: &str) -> anyhow::Result<HashSet<GenericApplicationId>> {
53 s.trim()
54 .split(",")
55 .map(|app_str| {
56 GenericApplicationId::from_str(app_str)
57 .or_else(|_| Ok(ApplicationId::from_str(app_str)?.into()))
58 })
59 .collect()
60}
61
62pub async fn wait_for_next_round(stream: &mut NotificationStream, timeout: RoundTimeout) {
64 let mut stream = stream.filter(|notification| match ¬ification.reason {
65 Reason::NewBlock { height, .. } | Reason::NewEvents { height, .. } => {
66 *height >= timeout.next_block_height
67 }
68 Reason::NewRound { round, .. } => *round > timeout.current_round,
69 Reason::NewIncomingBundle { .. } | Reason::BlockExecuted { .. } => false,
70 });
71 future::select(
72 Box::pin(stream.next()),
73 Box::pin(linera_base::time::timer::sleep(
74 timeout.timestamp.duration_since(Timestamp::now()),
75 )),
76 )
77 .await;
78}
79
/// Generates `impl From<Infallible> for $target`.
///
/// `Infallible` has no values, so the generated conversion can never actually be executed;
/// its body is an empty `match` on the uninhabited argument.
macro_rules! impl_from_infallible {
    ($target:path) => {
        impl ::std::convert::From<::std::convert::Infallible> for $target {
            fn from(never: ::std::convert::Infallible) -> Self {
                match never {}
            }
        }
    };
}
89
90pub(crate) use impl_from_infallible;