linera_core/
updater.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap},
7    fmt,
8    hash::Hash,
9    mem,
10};
11
12use futures::{
13    stream::{FuturesUnordered, TryStreamExt},
14    Future, StreamExt,
15};
16use linera_base::{
17    crypto::ValidatorPublicKey,
18    data_types::{BlockHeight, Round},
19    ensure,
20    identifiers::{BlobId, BlobType, ChainId, StreamId},
21    time::{timer::timeout, Duration, Instant},
22};
23use linera_chain::{
24    data_types::{BlockProposal, LiteVote},
25    manager::LockingBlock,
26    types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
27};
28use linera_execution::{committee::Committee, system::EPOCH_STREAM_NAME};
29use linera_storage::{ResultReadCertificates, Storage};
30use thiserror::Error;
31use tracing::{instrument, Level};
32
33use crate::{
34    client::chain_client,
35    data_types::{ChainInfo, ChainInfoQuery},
36    local_node::LocalNodeClient,
37    node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
38    remote_node::RemoteNode,
39    LocalNodeError,
40};
41
/// The default amount of time we wait for additional validators to contribute
/// to the result, as a fraction of how long it took to reach a quorum.
///
/// For example, when `0.2` is passed as the `grace_period` of
/// `communicate_with_quorum` and reaching a quorum took 10 seconds, the remaining
/// validators get 2 more seconds to respond.
pub const DEFAULT_GRACE_PERIOD: f64 = 0.2;
/// The maximum timeout for requests to a stake-weighted quorum if no quorum is reached.
///
/// `communicate_with_quorum` uses this as the upper bound on how long it waits for the
/// next validator response as long as no grace-period deadline has been set.
const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24); // 1 day.
47
/// Used for `communicate_chain_action`
///
/// Describes the request that `ValidatorUpdater::send_chain_update` sends to a single
/// validator in order to obtain a vote from it.
#[derive(Clone)]
pub enum CommunicateAction {
    /// Submit a block proposal; the validator's pending vote is expected in response.
    SubmitBlock {
        /// The proposal to submit.
        proposal: Box<BlockProposal>,
        /// Blob IDs consulted when the validator rejects the proposal with
        /// `BlobsNotFound` or `InactiveChain`, to work out what needs to be sent to it.
        blob_ids: Vec<BlobId>,
    },
    /// Send a certificate for a validated block; the validator's pending vote is
    /// expected in response.
    FinalizeBlock {
        /// The validated block certificate to send.
        certificate: Box<ValidatedBlockCertificate>,
        /// The cross-chain delivery mode passed along with the certificate.
        delivery: CrossChainMessageDelivery,
    },
    /// Request a timeout vote for the given chain, height and round.
    RequestTimeout {
        /// The chain for which the timeout is requested.
        chain_id: ChainId,
        /// The block height for which the timeout is requested.
        height: BlockHeight,
        /// The round that is supposed to time out.
        round: Round,
    },
}
65
66impl CommunicateAction {
67    /// The round to which this action pertains.
68    pub fn round(&self) -> Round {
69        match self {
70            CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
71            CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
72            CommunicateAction::RequestTimeout { round, .. } => *round,
73        }
74    }
75}
76
/// Sends chain data (certificates, blobs, proposals and timeout requests) from the
/// local node to one remote validator, so that the validator can catch up and vote.
#[derive(Clone)]
pub struct ValidatorUpdater<A, S>
where
    S: Storage,
{
    /// The remote validator that is being updated.
    pub remote_node: RemoteNode<A>,
    /// The local node, used as the source of chain state, certificates and blobs.
    pub local_node: LocalNodeClient<S>,
    /// The ID of the admin chain; it is sent to the validator when the validator
    /// reports missing epoch events.
    pub admin_id: ChainId,
}
86
/// An error result for requests to a stake-weighted quorum.
#[derive(Error, Debug)]
pub enum CommunicationError<E: fmt::Debug> {
    /// No consensus is possible since validators returned different possibilities
    /// for the next block
    ///
    /// The first field is the committee's quorum threshold. The second field has one
    /// entry per distinct response key: the accumulated validator weight for that key
    /// and the number of responses that were grouped under it.
    #[error(
        "No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
    )]
    NoConsensus(u64, Vec<(u64, usize)>),
    /// A single error that was returned by a sufficient number of nodes to be trusted as
    /// valid.
    #[error("Failed to communicate with a quorum of validators: {0}")]
    Trusted(E),
    /// No single error reached the validity threshold so we're returning a sample of
    /// errors for debugging purposes, together with their weight.
    #[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
    Sample(Vec<(E, u64)>),
}
105
106/// Executes a sequence of actions in parallel for all validators.
107///
108/// Tries to stop early when a quorum is reached. If `grace_period` is specified, other validators
109/// are given additional time to contribute to the result. The grace period is calculated as a fraction
110/// (defaulting to `DEFAULT_GRACE_PERIOD`) of the time taken to reach quorum.
111pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
112    validator_clients: &'a [RemoteNode<A>],
113    committee: &Committee,
114    group_by: G,
115    execute: F,
116    // Grace period as a fraction of time taken to reach quorum
117    grace_period: f64,
118) -> Result<(K, Vec<(ValidatorPublicKey, V)>), CommunicationError<NodeError>>
119where
120    A: ValidatorNode + Clone + 'static,
121    F: Clone + Fn(RemoteNode<A>) -> R,
122    R: Future<Output = Result<V, chain_client::Error>> + 'a,
123    G: Fn(&V) -> K,
124    K: Hash + PartialEq + Eq + Clone + 'static,
125    V: 'static,
126{
127    let mut responses: futures::stream::FuturesUnordered<_> = validator_clients
128        .iter()
129        .filter_map(|remote_node| {
130            if committee.weight(&remote_node.public_key) == 0 {
131                // This should not happen but better prevent it because certificates
132                // are not allowed to include votes with weight 0.
133                return None;
134            }
135            let execute = execute.clone();
136            let remote_node = remote_node.clone();
137            Some(async move { (remote_node.public_key, execute(remote_node).await) })
138        })
139        .collect();
140
141    let start_time = Instant::now();
142    let mut end_time: Option<Instant> = None;
143    let mut remaining_votes = committee.total_votes();
144    let mut highest_key_score = 0;
145    let mut value_scores: HashMap<K, (u64, Vec<(ValidatorPublicKey, V)>)> = HashMap::new();
146    let mut error_scores = HashMap::new();
147
148    'vote_wait: while let Ok(Some((name, result))) = timeout(
149        end_time.map_or(MAX_TIMEOUT, |t| t.saturating_duration_since(Instant::now())),
150        responses.next(),
151    )
152    .await
153    {
154        remaining_votes -= committee.weight(&name);
155        match result {
156            Ok(value) => {
157                let key = group_by(&value);
158                let entry = value_scores.entry(key.clone()).or_insert((0, Vec::new()));
159                entry.0 += committee.weight(&name);
160                entry.1.push((name, value));
161                highest_key_score = highest_key_score.max(entry.0);
162            }
163            Err(err) => {
164                // TODO(#2857): Handle non-remote errors properly.
165                let err = match err {
166                    chain_client::Error::RemoteNodeError(err) => err,
167                    err => NodeError::ResponseHandlingError {
168                        error: err.to_string(),
169                    },
170                };
171                let entry = error_scores.entry(err.clone()).or_insert(0);
172                *entry += committee.weight(&name);
173                if *entry >= committee.validity_threshold() {
174                    // At least one honest node returned this error.
175                    // No quorum can be reached, so return early.
176                    return Err(CommunicationError::Trusted(err));
177                }
178            }
179        }
180        // If it becomes clear that no key can reach a quorum, break early.
181        if highest_key_score + remaining_votes < committee.quorum_threshold() {
182            break 'vote_wait;
183        }
184
185        // If a key reaches a quorum, wait for the grace period to collect more values
186        // or error information and then stop.
187        if end_time.is_none() && highest_key_score >= committee.quorum_threshold() {
188            end_time = Some(Instant::now() + start_time.elapsed().mul_f64(grace_period));
189        }
190    }
191
192    let scores = value_scores
193        .values()
194        .map(|(weight, values)| (*weight, values.len()))
195        .collect();
196    // If a key has a quorum, return it with its values.
197    if let Some((key, (_, values))) = value_scores
198        .into_iter()
199        .find(|(_, (score, _))| *score >= committee.quorum_threshold())
200    {
201        return Ok((key, values));
202    }
203
204    if error_scores.is_empty() {
205        return Err(CommunicationError::NoConsensus(
206            committee.quorum_threshold(),
207            scores,
208        ));
209    }
210
211    // No specific error is available to report reliably.
212    let mut sample = error_scores.into_iter().collect::<Vec<_>>();
213    sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
214    sample.truncate(4);
215    Err(CommunicationError::Sample(sample))
216}
217
218impl<A, S> ValidatorUpdater<A, S>
219where
220    A: ValidatorNode + Clone + 'static,
221    S: Storage + Clone + Send + Sync + 'static,
222{
223    #[instrument(
224        level = "trace", skip_all, err(level = Level::WARN),
225        fields(chain_id = %certificate.block().header.chain_id)
226    )]
227    async fn send_confirmed_certificate(
228        &mut self,
229        certificate: GenericCertificate<ConfirmedBlock>,
230        delivery: CrossChainMessageDelivery,
231    ) -> Result<Box<ChainInfo>, chain_client::Error> {
232        let mut result = self
233            .remote_node
234            .handle_optimized_confirmed_certificate(&certificate, delivery)
235            .await;
236
237        let mut sent_admin_chain = false;
238        let mut sent_blobs = false;
239        loop {
240            result = match result {
241                Err(NodeError::EventsNotFound(event_ids))
242                    if !sent_admin_chain
243                        && certificate.inner().chain_id() != self.admin_id
244                        && event_ids.iter().all(|event_id| {
245                            event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
246                                && event_id.chain_id == self.admin_id
247                        }) =>
248                {
249                    // The validator doesn't have the committee that signed the certificate.
250                    self.update_admin_chain().await?;
251                    sent_admin_chain = true;
252                    self.remote_node
253                        .handle_confirmed_certificate(certificate.clone(), delivery)
254                        .await
255                }
256                Err(NodeError::BlobsNotFound(blob_ids)) if !sent_blobs => {
257                    // The validator is missing the blobs required by the certificate.
258                    self.remote_node
259                        .check_blobs_not_found(&certificate, &blob_ids)?;
260                    // The certificate is confirmed, so the blobs must be in storage.
261                    let maybe_blobs = self.local_node.read_blobs_from_storage(&blob_ids).await?;
262                    let blobs = maybe_blobs.ok_or(NodeError::BlobsNotFound(blob_ids))?;
263                    self.remote_node.node.upload_blobs(blobs).await?;
264                    sent_blobs = true;
265                    self.remote_node
266                        .handle_confirmed_certificate(certificate.clone(), delivery)
267                        .await
268                }
269                result => return Ok(result?),
270            };
271        }
272    }
273
274    async fn send_validated_certificate(
275        &mut self,
276        certificate: GenericCertificate<ValidatedBlock>,
277        delivery: CrossChainMessageDelivery,
278    ) -> Result<Box<ChainInfo>, chain_client::Error> {
279        let result = self
280            .remote_node
281            .handle_optimized_validated_certificate(&certificate, delivery)
282            .await;
283
284        Ok(match &result {
285            Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
286                self.remote_node
287                    .check_blobs_not_found(&certificate, blob_ids)?;
288                let chain_id = certificate.inner().chain_id();
289                // The certificate is for a validated block, i.e. for our locking block.
290                // Take the missing blobs from our local chain manager.
291                let blobs = self
292                    .local_node
293                    .get_locking_blobs(blob_ids, chain_id)
294                    .await?
295                    .ok_or_else(|| original_err.clone())?;
296                self.remote_node.send_pending_blobs(chain_id, blobs).await?;
297                self.remote_node
298                    .handle_validated_certificate(certificate)
299                    .await
300            }
301            _ => result,
302        }?)
303    }
304
305    /// Requests a vote for a timeout certificate for the given round from the remote node.
306    ///
307    /// If the remote node is not in that round or at that height yet, sends the chain information
308    /// to update it.
309    async fn request_timeout(
310        &mut self,
311        chain_id: ChainId,
312        round: Round,
313        height: BlockHeight,
314    ) -> Result<Box<ChainInfo>, chain_client::Error> {
315        let query = ChainInfoQuery::new(chain_id).with_timeout(height, round);
316        match self
317            .remote_node
318            .handle_chain_info_query(query.clone())
319            .await
320        {
321            Ok(info) => return Ok(info),
322            Err(NodeError::WrongRound(validator_round)) => {
323                tracing::info!(
324                    validator = ?self.remote_node.public_key, %chain_id, %validator_round, %round,
325                    "Failed to request timeout from validator: it is not in the same round.",
326                );
327            }
328            Err(NodeError::UnexpectedBlockHeight {
329                expected_block_height,
330                found_block_height,
331            }) if expected_block_height < found_block_height => {
332                tracing::info!(
333                    validator = ?self.remote_node.public_key,
334                    %chain_id,
335                    %expected_block_height,
336                    %found_block_height,
337                    "Failed to request timeout from validator: it is not at the same height.",
338                );
339            }
340            Err(err) => return Err(err.into()),
341        }
342        self.send_chain_information(chain_id, height, CrossChainMessageDelivery::NonBlocking)
343            .await?;
344        Ok(self.remote_node.handle_chain_info_query(query).await?)
345    }
346
    /// Submits a block proposal to the remote validator, repairing recoverable
    /// rejections and retrying until the proposal is accepted or an unrecoverable
    /// error occurs.
    ///
    /// The following rejections are handled before retrying:
    ///
    /// * `WrongRound`: the chain is sent up to the proposal's height.
    /// * `UnexpectedBlockHeight` where the validator is behind the proposal's height:
    ///   the chain is sent up to that height.
    /// * `MissingCrossChainUpdate` for the proposal's chain: the origin chain is sent up
    ///   to and including the reported height, with blocking cross-chain delivery.
    /// * `EventsNotFound`: the publisher chains of the missing events are sent.
    /// * `BlobsNotFound` or `InactiveChain` while `blob_ids` is nonempty: published blobs
    ///   are sent as pending blobs, and the chains recorded in the blob states of the
    ///   remaining missing blobs are sent.
    ///
    /// Any other error is returned to the caller.
    async fn send_block_proposal(
        &mut self,
        proposal: Box<BlockProposal>,
        mut blob_ids: Vec<BlobId>,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let chain_id = proposal.content.block.chain_id;
        // For each origin chain, the highest height for which we already sent an update.
        let mut sent_cross_chain_updates = BTreeMap::new();
        // Publisher chains that were already sent because of missing events.
        let mut publisher_chain_ids_sent = BTreeSet::new();
        loop {
            match self
                .remote_node
                .handle_block_proposal(proposal.clone())
                .await
            {
                Ok(info) => return Ok(info),
                Err(NodeError::WrongRound(_round)) => {
                    // The proposal is for a different round, so we need to update the validator.
                    // TODO: this should probably be more specific as to which rounds are retried.
                    tracing::debug!(
                        "Wrong round; sending chain {chain_id} to validator {}.",
                        self.remote_node.public_key
                    );
                    self.send_chain_information(
                        chain_id,
                        proposal.content.block.height,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                Err(NodeError::UnexpectedBlockHeight {
                    expected_block_height,
                    found_block_height,
                }) if expected_block_height < found_block_height
                    && found_block_height == proposal.content.block.height =>
                {
                    tracing::debug!(
                        "Wrong height; sending chain {chain_id} to validator {}.",
                        self.remote_node.public_key
                    );
                    // The proposal is for a later block height, so we need to update the validator.
                    self.send_chain_information(
                        chain_id,
                        found_block_height,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                // Only retried if we have not yet sent this origin up to the reported
                // height; otherwise the error falls through to the final arm.
                Err(NodeError::MissingCrossChainUpdate {
                    chain_id,
                    origin,
                    height,
                }) if chain_id == proposal.content.block.chain_id
                    && sent_cross_chain_updates
                        .get(&origin)
                        .is_none_or(|h| *h < height) =>
                {
                    tracing::debug!(
                        "Missing cross-chain update; sending chain {origin} to validator {}.",
                        self.remote_node.public_key
                    );
                    sent_cross_chain_updates.insert(origin, height);
                    // Some received certificates may be missing for this validator
                    // (e.g. to create the chain or make the balance sufficient) so we are going to
                    // synchronize them now and retry.
                    self.send_chain_information(
                        origin,
                        height.try_add_one()?,
                        CrossChainMessageDelivery::Blocking,
                    )
                    .await?;
                }
                Err(NodeError::EventsNotFound(event_ids)) => {
                    let mut publisher_heights = BTreeMap::new();
                    // Publisher chains of the missing events that we have not sent yet.
                    let new_chain_ids = event_ids
                        .iter()
                        .map(|event_id| event_id.chain_id)
                        .filter(|chain_id| !publisher_chain_ids_sent.contains(chain_id))
                        .collect::<BTreeSet<_>>();
                    tracing::debug!(
                        "Missing events; sending chains {new_chain_ids:?} to validator {}",
                        self.remote_node.public_key
                    );
                    // If every publisher chain was already sent, retrying cannot help:
                    // give up with the original error.
                    ensure!(
                        !new_chain_ids.is_empty(),
                        NodeError::EventsNotFound(event_ids)
                    );
                    for chain_id in new_chain_ids {
                        let height = self
                            .local_node
                            .chain_state_view(chain_id)
                            .await?
                            .next_height_to_preprocess()
                            .await?;
                        publisher_heights.insert(chain_id, height);
                        publisher_chain_ids_sent.insert(chain_id);
                    }
                    self.send_chain_info_up_to_heights(
                        publisher_heights,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                // `blob_ids` is emptied below, so this arm is taken at most once.
                Err(NodeError::BlobsNotFound(_) | NodeError::InactiveChain(_))
                    if !blob_ids.is_empty() =>
                {
                    tracing::debug!("Missing blobs");
                    // For `BlobsNotFound`, we assume that the local node should already be
                    // updated with the needed blobs, so sending the chain information about the
                    // certificates that last used the blobs to the validator node should be enough.
                    let published_blob_ids =
                        BTreeSet::from_iter(proposal.content.block.published_blob_ids());
                    // Blobs published by the proposal itself are handled separately below.
                    blob_ids.retain(|blob_id| !published_blob_ids.contains(blob_id));
                    let mut published_blobs = Vec::new();
                    {
                        // Scoped so that the chain state view is dropped before sending.
                        let chain = self.local_node.chain_state_view(chain_id).await?;
                        for blob_id in published_blob_ids {
                            published_blobs
                                .extend(chain.manager.proposed_blobs.get(&blob_id).await?);
                        }
                    }
                    self.remote_node
                        .send_pending_blobs(chain_id, published_blobs)
                        .await?;
                    // Ask the validator which of the remaining blobs it is missing.
                    let missing_blob_ids = self
                        .remote_node
                        .node
                        .missing_blob_ids(mem::take(&mut blob_ids))
                        .await?;
                    let blob_states = self
                        .local_node
                        .read_blob_states_from_storage(&missing_blob_ids)
                        .await?;
                    // For every chain, the highest height that needs to be sent: one past
                    // the block height recorded in the blob state.
                    let mut chain_heights = BTreeMap::new();
                    for blob_state in blob_states {
                        let block_chain_id = blob_state.chain_id;
                        let block_height = blob_state.block_height.try_add_one()?;
                        chain_heights
                            .entry(block_chain_id)
                            .and_modify(|h| *h = block_height.max(*h))
                            .or_insert(block_height);
                    }
                    tracing::debug!("Sending chains {chain_heights:?}");

                    self.send_chain_info_up_to_heights(
                        chain_heights,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                // Fail immediately on other errors.
                Err(err) => return Err(err.into()),
            }
        }
    }
501
502    async fn update_admin_chain(&mut self) -> Result<(), chain_client::Error> {
503        let local_admin_info = self.local_node.chain_info(self.admin_id).await?;
504        Box::pin(self.send_chain_information(
505            self.admin_id,
506            local_admin_info.next_block_height,
507            CrossChainMessageDelivery::NonBlocking,
508        ))
509        .await
510    }
511
    /// Sends the remote validator the confirmed blocks of chain `chain_id` below
    /// `target_block_height`, and then tries to bring it to the local node's current round.
    ///
    /// The work happens in three steps:
    ///
    /// 1. If `target_block_height` is greater than zero, the certificate at height
    ///    `target_block_height - 1` is sent first. The validator's response tells us its
    ///    next block height, and all certificates from there up to the target are then
    ///    sent in order.
    /// 2. If `target_block_height` is zero, the chains recorded in the blob state of this
    ///    chain's description blob are sent instead, and the validator is queried for its
    ///    chain info.
    /// 3. If the validator is at the same height as the local node but in an earlier
    ///    round, it is sent a proposal, a locking block certificate or a timeout
    ///    certificate from the local chain manager.
    ///
    /// `delivery` is used for the certificates sent in step 1; the other steps use
    /// non-blocking delivery.
    pub async fn send_chain_information(
        &mut self,
        chain_id: ChainId,
        target_block_height: BlockHeight,
        delivery: CrossChainMessageDelivery,
    ) -> Result<(), chain_client::Error> {
        // `try_sub_one` fails exactly when the target height is zero, i.e. when there is
        // no block to send.
        let info = if let Ok(height) = target_block_height.try_sub_one() {
            // Figure out which certificates this validator is missing. In many cases, it's just the
            // last one, so we optimistically send that one right away.
            let hash = self
                .local_node
                .chain_state_view(chain_id)
                .await?
                .block_hashes(height..=height)
                .await?
                .into_iter()
                .next()
                .ok_or_else(|| {
                    chain_client::Error::InternalError(
                        "send_chain_information called with invalid target_block_height",
                    )
                })?;
            let certificate = self
                .local_node
                .storage_client()
                .read_certificate(hash)
                .await?
                .ok_or_else(|| chain_client::Error::MissingConfirmedBlock(hash))?;
            let info = match self.send_confirmed_certificate(certificate, delivery).await {
                // Only epoch events of the admin chain are missing: fall back to querying
                // the validator's chain info instead of failing.
                Err(chain_client::Error::RemoteNodeError(NodeError::EventsNotFound(event_ids)))
                    if event_ids.iter().all(|event_id| {
                        event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
                            && event_id.chain_id == self.admin_id
                    }) =>
                {
                    if chain_id != self.admin_id {
                        tracing::error!(
                            "Missing epochs were not handled by send_confirmed_certificate."
                        );
                    }
                    let query = ChainInfoQuery::new(chain_id);
                    self.remote_node.handle_chain_info_query(query).await?
                }
                Err(err) => return Err(err),
                Ok(info) => info,
            };
            // Obtain the missing blocks and the manager state from the local node.
            let range = info.next_block_height..target_block_height;
            let validator_missing_hashes = self
                .local_node
                .chain_state_view(chain_id)
                .await?
                .block_hashes(range)
                .await?;
            if !validator_missing_hashes.is_empty() {
                // Send the requested certificates in order.
                let certificates = self
                    .local_node
                    .storage_client()
                    .read_certificates(validator_missing_hashes.clone())
                    .await?;
                let certificates =
                    match ResultReadCertificates::new(certificates, validator_missing_hashes) {
                        ResultReadCertificates::Certificates(certificates) => certificates,
                        ResultReadCertificates::InvalidHashes(hashes) => {
                            return Err(chain_client::Error::ReadCertificatesError(hashes))
                        }
                    };
                for certificate in certificates {
                    self.send_confirmed_certificate(certificate, delivery)
                        .await?;
                }
            }
            info
        } else {
            // The remote node might not know about the chain yet.
            let blob_states = self
                .local_node
                .read_blob_states_from_storage(&[BlobId::new(
                    chain_id.0,
                    BlobType::ChainDescription,
                )])
                .await?;
            // For every chain, the highest height that needs to be sent: one past the
            // block height recorded in the blob state.
            let mut chain_heights = BTreeMap::new();
            for blob_state in blob_states {
                let block_chain_id = blob_state.chain_id;
                let block_height = blob_state.block_height.try_add_one()?;
                chain_heights
                    .entry(block_chain_id)
                    .and_modify(|h| *h = block_height.max(*h))
                    .or_insert(block_height);
            }
            self.send_chain_info_up_to_heights(
                chain_heights,
                CrossChainMessageDelivery::NonBlocking,
            )
            .await?;
            let query = ChainInfoQuery::new(chain_id);
            self.remote_node.handle_chain_info_query(query).await?
        };
        // Note that `info` reflects the validator's state as reported in the response
        // obtained above, before any further certificates were sent.
        let (remote_height, remote_round) = (info.next_block_height, info.manager.current_round);
        let query = ChainInfoQuery::new(chain_id).with_manager_values();
        let local_info = match self.local_node.handle_chain_info_query(query).await {
            Ok(response) => response.info,
            // We don't have the full chain description.
            Err(LocalNodeError::BlobsNotFound(_)) => return Ok(()),
            Err(error) => return Err(error.into()),
        };
        let manager = local_info.manager;
        // Nothing more to do unless the validator is at our height and behind our round.
        if local_info.next_block_height != remote_height || manager.current_round <= remote_round {
            return Ok(());
        }
        // The remote node is at our height but not at the current round. Send it the proposal,
        // validated block certificate or timeout certificate that proves the current round.
        for proposal in manager
            .requested_proposed
            .into_iter()
            .chain(manager.requested_signed_proposal)
        {
            if proposal.content.round == manager.current_round {
                // Failures are only logged here, so that the remaining options below are
                // still tried.
                if let Err(err) = self.remote_node.handle_block_proposal(proposal).await {
                    tracing::info!("Failed to send block proposal: {err}");
                } else {
                    return Ok(());
                }
            }
        }
        if let Some(LockingBlock::Regular(validated)) = manager.requested_locking.map(|b| *b) {
            if validated.round == manager.current_round {
                if let Err(err) = self
                    .remote_node
                    .handle_optimized_validated_certificate(
                        &validated,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await
                {
                    tracing::info!("Failed to send locking block: {err}");
                } else {
                    return Ok(());
                }
            }
        }
        // Last resort: the timeout certificate. Unlike above, a failure is returned.
        if let Some(cert) = manager.timeout {
            if cert.round >= remote_round {
                tracing::debug!("Sending timeout for {}", cert.round);
                self.remote_node.handle_timeout_certificate(*cert).await?;
            }
        }
        Ok(())
    }
663
664    async fn send_chain_info_up_to_heights(
665        &mut self,
666        chain_heights: impl IntoIterator<Item = (ChainId, BlockHeight)>,
667        delivery: CrossChainMessageDelivery,
668    ) -> Result<(), chain_client::Error> {
669        FuturesUnordered::from_iter(chain_heights.into_iter().map(|(chain_id, height)| {
670            let mut updater = self.clone();
671            async move {
672                updater
673                    .send_chain_information(chain_id, height, delivery)
674                    .await
675            }
676        }))
677        .try_collect::<Vec<_>>()
678        .await?;
679        Ok(())
680    }
681
682    pub async fn send_chain_update(
683        &mut self,
684        action: CommunicateAction,
685    ) -> Result<LiteVote, chain_client::Error> {
686        let chain_id = match &action {
687            CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.block.chain_id,
688            CommunicateAction::FinalizeBlock { certificate, .. } => {
689                certificate.inner().block().header.chain_id
690            }
691            CommunicateAction::RequestTimeout { chain_id, .. } => *chain_id,
692        };
693        // Send the block proposal, certificate or timeout request and return a vote.
694        let vote = match action {
695            CommunicateAction::SubmitBlock { proposal, blob_ids } => {
696                let info = self.send_block_proposal(proposal, blob_ids).await?;
697                info.manager.pending.ok_or_else(|| {
698                    NodeError::MissingVoteInValidatorResponse("submit a block proposal".into())
699                })?
700            }
701            CommunicateAction::FinalizeBlock {
702                certificate,
703                delivery,
704            } => {
705                let info = self
706                    .send_validated_certificate(*certificate, delivery)
707                    .await?;
708                info.manager.pending.ok_or_else(|| {
709                    NodeError::MissingVoteInValidatorResponse("finalize a block".into())
710                })?
711            }
712            CommunicateAction::RequestTimeout { round, height, .. } => {
713                let info = self.request_timeout(chain_id, round, height).await?;
714                info.manager.timeout_vote.ok_or_else(|| {
715                    NodeError::MissingVoteInValidatorResponse("request a timeout".into())
716                })?
717            }
718        };
719        vote.check(self.remote_node.public_key)?;
720        Ok(vote)
721    }
722}