Skip to content

Commit

Permalink
More tests for finality relay (paritytech#816)
Browse files Browse the repository at this point in the history
* more tests for finality relay

* clippy

* remove env_logger dep

* fmt

* more clippy

* removed prune_unjustified_headers

* review
  • Loading branch information
svyatonik authored and serban300 committed Apr 9, 2024
1 parent 3cc0eb4 commit 740e1d9
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 200 deletions.
265 changes: 151 additions & 114 deletions bridges/relays/finality-relay/src/finality_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,21 @@ pub fn run<P: FinalitySyncPipeline>(
}

/// Unjustified headers container. Ordered by header number.
pub(crate) type UnjustifiedHeaders<P> = Vec<<P as FinalitySyncPipeline>::Header>;
pub(crate) type UnjustifiedHeaders<H> = Vec<H>;
/// Finality proofs container. Ordered by target header number.
pub(crate) type FinalityProofs<P> = Vec<(
<P as FinalitySyncPipeline>::Number,
<P as FinalitySyncPipeline>::FinalityProof,
)>;
/// Reference to finality proofs container.
pub(crate) type FinalityProofsRef<'a, P> = &'a [(
<P as FinalitySyncPipeline>::Number,
<P as FinalitySyncPipeline>::FinalityProof,
)];

/// Error that may happen inside finality synchronization loop.
#[derive(Debug)]
enum Error<P: FinalitySyncPipeline, SourceError, TargetError> {
pub(crate) enum Error<P: FinalitySyncPipeline, SourceError, TargetError> {
/// Source client request has failed with given error.
Source(SourceError),
/// Target client request has failed with given error.
Expand Down Expand Up @@ -182,13 +187,23 @@ struct Transaction<Number> {
}

/// Finality proofs stream that may be restarted.
struct RestartableFinalityProofsStream<S> {
pub(crate) struct RestartableFinalityProofsStream<S> {
/// Flag that the stream needs to be restarted.
needs_restart: bool,
pub(crate) needs_restart: bool,
/// The stream itself.
stream: Pin<Box<S>>,
}

#[cfg(test)]
impl<S> From<S> for RestartableFinalityProofsStream<S> {
fn from(stream: S) -> Self {
RestartableFinalityProofsStream {
needs_restart: false,
stream: Box::pin(stream),
}
}
}

/// Finality synchronization loop state.
struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> {
/// Synchronization loop progress.
Expand Down Expand Up @@ -272,6 +287,8 @@ async fn run_until_connection_lost<P: FinalitySyncPipeline>(
}
};
if finality_proofs_stream.needs_restart {
log::warn!(target: "bridge", "{} finality proofs stream is being restarted", P::SOURCE_NAME);

finality_proofs_stream.needs_restart = false;
finality_proofs_stream.stream = Box::pin(restart_finality_proofs_stream().await?);
}
Expand Down Expand Up @@ -368,7 +385,7 @@ where

async fn select_header_to_submit<P, SC, TC>(
source_client: &SC,
_target_client: &TC,
target_client: &TC,
finality_proofs_stream: &mut RestartableFinalityProofsStream<SC::FinalityProofsStream>,
recent_finality_proofs: &mut FinalityProofs<P>,
best_number_at_source: P::Number,
Expand All @@ -380,9 +397,6 @@ where
SC: SourceClient<P>,
TC: TargetClient<P>,
{
let mut selected_finality_proof = None;
let mut unjustified_headers = Vec::new();

// to see that the loop is progressing
log::trace!(
target: "bridge",
Expand All @@ -393,6 +407,70 @@ where

// read missing headers. if we see that the header schedules GRANDPA change, we need to
// submit this header
let selected_finality_proof = read_missing_headers::<P, SC, TC>(
source_client,
target_client,
best_number_at_source,
best_number_at_target,
)
.await?;
let (mut unjustified_headers, mut selected_finality_proof) = match selected_finality_proof {
SelectedFinalityProof::Mandatory(header, finality_proof) => return Ok(Some((header, finality_proof))),
SelectedFinalityProof::Regular(unjustified_headers, header, finality_proof) => {
(unjustified_headers, Some((header, finality_proof)))
}
SelectedFinalityProof::None(unjustified_headers) => (unjustified_headers, None),
};

// all headers that are missing from the target client are non-mandatory
// => even if we have already selected some header and its persistent finality proof,
// we may try to select better header by reading non-persistent proofs from the stream
read_finality_proofs_from_stream::<P, _>(finality_proofs_stream, recent_finality_proofs);
selected_finality_proof = select_better_recent_finality_proof::<P>(
recent_finality_proofs,
&mut unjustified_headers,
selected_finality_proof,
);

// remove obsolete 'recent' finality proofs + keep its size under certain limit
let oldest_finality_proof_to_keep = selected_finality_proof
.as_ref()
.map(|(header, _)| header.number())
.unwrap_or(best_number_at_target);
prune_recent_finality_proofs::<P>(
oldest_finality_proof_to_keep,
recent_finality_proofs,
sync_params.recent_finality_proofs_limit,
);

Ok(selected_finality_proof)
}

/// Finality proof that has been selected by the `read_missing_headers` function.
pub(crate) enum SelectedFinalityProof<Header, FinalityProof> {
/// Mandatory header and its proof has been selected. We shall submit proof for this header.
Mandatory(Header, FinalityProof),
/// Regular header and its proof has been selected. We may submit this proof, or proof for
/// some better header.
Regular(UnjustifiedHeaders<Header>, Header, FinalityProof),
/// We haven't found any missing header with persistent proof at the target client.
None(UnjustifiedHeaders<Header>),
}

/// Read missing headers and their persistent finality proofs from the target client.
///
/// If we have found some header with known proof, it is returned.
/// Otherwise, `SelectedFinalityProof::None` is returned.
///
/// Unless we have found mandatory header, all missing headers are collected and returned.
pub(crate) async fn read_missing_headers<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>>(
source_client: &SC,
_target_client: &TC,
best_number_at_source: P::Number,
best_number_at_target: P::Number,
) -> Result<SelectedFinalityProof<P::Header, P::FinalityProof>, Error<P, SC::Error, TC::Error>> {
let mut unjustified_headers = Vec::new();
let mut selected_finality_proof = None;
let mut header_number = best_number_at_target + One::one();
while header_number <= best_number_at_source {
let (header, finality_proof) = source_client
Expand All @@ -404,13 +482,13 @@ where
match (is_mandatory, finality_proof) {
(true, Some(finality_proof)) => {
log::trace!(target: "bridge", "Header {:?} is mandatory", header_number);
return Ok(Some((header, finality_proof)));
return Ok(SelectedFinalityProof::Mandatory(header, finality_proof));
}
(true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())),
(false, Some(finality_proof)) => {
log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number);
unjustified_headers.clear();
selected_finality_proof = Some((header, finality_proof));
prune_unjustified_headers::<P>(header_number, &mut unjustified_headers);
}
(false, None) => {
unjustified_headers.push(header);
Expand All @@ -420,37 +498,17 @@ where
header_number = header_number + One::one();
}

// see if we can improve finality by using recent finality proofs
if !unjustified_headers.is_empty() && !recent_finality_proofs.is_empty() {
const NOT_EMPTY_PROOF: &str = "we have checked that the vec is not empty; qed";

// we need proofs for headers in range unjustified_range_begin..=unjustified_range_end
let unjustified_range_begin = unjustified_headers.first().expect(NOT_EMPTY_PROOF).number();
let unjustified_range_end = unjustified_headers.last().expect(NOT_EMPTY_PROOF).number();

// we have proofs for headers in range buffered_range_begin..=buffered_range_end
let buffered_range_begin = recent_finality_proofs.first().expect(NOT_EMPTY_PROOF).0;
let buffered_range_end = recent_finality_proofs.last().expect(NOT_EMPTY_PROOF).0;

// we have two ranges => find intersection
let intersection_begin = std::cmp::max(unjustified_range_begin, buffered_range_begin);
let intersection_end = std::cmp::min(unjustified_range_end, buffered_range_end);
let intersection = intersection_begin..=intersection_end;

// find last proof from intersection
let selected_finality_proof_index = recent_finality_proofs
.binary_search_by_key(intersection.end(), |(number, _)| *number)
.unwrap_or_else(|index| index.saturating_sub(1));
let (selected_header_number, finality_proof) = &recent_finality_proofs[selected_finality_proof_index];
if intersection.contains(selected_header_number) {
// now remove all obsolete headers and extract selected header
let selected_header = prune_unjustified_headers::<P>(*selected_header_number, &mut unjustified_headers)
.expect("unjustified_headers contain all headers from intersection; qed");
selected_finality_proof = Some((selected_header, finality_proof.clone()));
}
}
Ok(match selected_finality_proof {
Some((header, proof)) => SelectedFinalityProof::Regular(unjustified_headers, header, proof),
None => SelectedFinalityProof::None(unjustified_headers),
})
}

// read all proofs from the stream, probably selecting updated proof that we're going to submit
/// Read finality proofs from the stream.
pub(crate) fn read_finality_proofs_from_stream<P: FinalitySyncPipeline, FPS: Stream<Item = P::FinalityProof>>(
finality_proofs_stream: &mut RestartableFinalityProofsStream<FPS>,
recent_finality_proofs: &mut FinalityProofs<P>,
) {
loop {
let next_proof = finality_proofs_stream.stream.next();
let finality_proof = match next_proof.now_or_never() {
Expand All @@ -461,95 +519,74 @@ where
}
None => break,
};
let finality_proof_target_header_number = match finality_proof.target_header_number() {
Some(target_header_number) => target_header_number,
None => {
continue;
}
};

let justified_header =
prune_unjustified_headers::<P>(finality_proof_target_header_number, &mut unjustified_headers);
if let Some(justified_header) = justified_header {
recent_finality_proofs.clear();
selected_finality_proof = Some((justified_header, finality_proof));
} else {
// the number of proofs read during single wakeup is expected to be low, so we aren't pruning
// `recent_finality_proofs` collection too often
recent_finality_proofs.push((finality_proof_target_header_number, finality_proof));
}
recent_finality_proofs.push((finality_proof.target_header_number(), finality_proof));
}
}

// remove obsolete 'recent' finality proofs + keep its size under certain limit
let oldest_finality_proof_to_keep = selected_finality_proof
.as_ref()
.map(|(header, _)| header.number())
.unwrap_or(best_number_at_target);
prune_recent_finality_proofs::<P>(
oldest_finality_proof_to_keep,
recent_finality_proofs,
sync_params.recent_finality_proofs_limit,
);
/// Try to select better header and its proof, given finality proofs that we
/// have recently read from the stream.
pub(crate) fn select_better_recent_finality_proof<P: FinalitySyncPipeline>(
recent_finality_proofs: FinalityProofsRef<P>,
unjustified_headers: &mut UnjustifiedHeaders<P::Header>,
selected_finality_proof: Option<(P::Header, P::FinalityProof)>,
) -> Option<(P::Header, P::FinalityProof)> {
if unjustified_headers.is_empty() || recent_finality_proofs.is_empty() {
return selected_finality_proof;
}

Ok(selected_finality_proof)
}
const NOT_EMPTY_PROOF: &str = "we have checked that the vec is not empty; qed";

/// Remove headers from `unjustified_headers` collection with number lower or equal than `justified_header_number`.
///
/// Returns the header that matches `justified_header_number` (if any).
pub(crate) fn prune_unjustified_headers<P: FinalitySyncPipeline>(
justified_header_number: P::Number,
unjustified_headers: &mut UnjustifiedHeaders<P>,
) -> Option<P::Header> {
prune_ordered_vec(justified_header_number, unjustified_headers, usize::MAX, |header| {
header.number()
})
// we need proofs for headers in range unjustified_range_begin..=unjustified_range_end
let unjustified_range_begin = unjustified_headers.first().expect(NOT_EMPTY_PROOF).number();
let unjustified_range_end = unjustified_headers.last().expect(NOT_EMPTY_PROOF).number();

// we have proofs for headers in range buffered_range_begin..=buffered_range_end
let buffered_range_begin = recent_finality_proofs.first().expect(NOT_EMPTY_PROOF).0;
let buffered_range_end = recent_finality_proofs.last().expect(NOT_EMPTY_PROOF).0;

// we have two ranges => find intersection
let intersection_begin = std::cmp::max(unjustified_range_begin, buffered_range_begin);
let intersection_end = std::cmp::min(unjustified_range_end, buffered_range_end);
let intersection = intersection_begin..=intersection_end;

// find last proof from intersection
let selected_finality_proof_index = recent_finality_proofs
.binary_search_by_key(intersection.end(), |(number, _)| *number)
.unwrap_or_else(|index| index.saturating_sub(1));
let (selected_header_number, finality_proof) = &recent_finality_proofs[selected_finality_proof_index];
if !intersection.contains(selected_header_number) {
return selected_finality_proof;
}

// now remove all obsolete headers and extract selected header
let selected_header_position = unjustified_headers
.binary_search_by_key(selected_header_number, |header| header.number())
.expect("unjustified_headers contain all headers from intersection; qed");
let selected_header = unjustified_headers.swap_remove(selected_header_position);
Some((selected_header, finality_proof.clone()))
}

pub(crate) fn prune_recent_finality_proofs<P: FinalitySyncPipeline>(
justified_header_number: P::Number,
recent_finality_proofs: &mut FinalityProofs<P>,
recent_finality_proofs_limit: usize,
) {
prune_ordered_vec(
justified_header_number,
recent_finality_proofs,
recent_finality_proofs_limit,
|(header_number, _)| *header_number,
let position =
recent_finality_proofs.binary_search_by_key(&justified_header_number, |(header_number, _)| *header_number);

// remove all obsolete elements
*recent_finality_proofs = recent_finality_proofs.split_off(
position
.map(|position| position + 1)
.unwrap_or_else(|position| position),
);
}

fn prune_ordered_vec<T, Number: relay_utils::BlockNumberBase>(
header_number: Number,
ordered_vec: &mut Vec<T>,
maximal_vec_size: usize,
extract_header_number: impl Fn(&T) -> Number,
) -> Option<T> {
let position = ordered_vec.binary_search_by_key(&header_number, extract_header_number);

// first extract element we're interested in
let extracted_element = match position {
Ok(position) => {
let updated_vec = ordered_vec.split_off(position + 1);
let extracted_element = ordered_vec.pop().expect(
"binary_search_by_key has returned Ok(); so there's element at `position`;\
we're splitting vec at `position+1`; so we have pruned at least 1 element;\
qed",
);
*ordered_vec = updated_vec;
Some(extracted_element)
}
Err(position) => {
*ordered_vec = ordered_vec.split_off(position);
None
}
};

// now - limit vec by size
let split_index = ordered_vec.len().saturating_sub(maximal_vec_size);
*ordered_vec = ordered_vec.split_off(split_index);

extracted_element
let split_index = recent_finality_proofs
.len()
.saturating_sub(recent_finality_proofs_limit);
*recent_finality_proofs = recent_finality_proofs.split_off(split_index);
}

fn print_sync_progress<P: FinalitySyncPipeline>(
Expand Down
Loading

0 comments on commit 740e1d9

Please sign in to comment.