diff --git a/bridges/relays/finality-relay/src/finality_loop.rs b/bridges/relays/finality-relay/src/finality_loop.rs index 50c23b375723..af5da42cee70 100644 --- a/bridges/relays/finality-relay/src/finality_loop.rs +++ b/bridges/relays/finality-relay/src/finality_loop.rs @@ -136,16 +136,21 @@ pub fn run( } /// Unjustified headers container. Ordered by header number. -pub(crate) type UnjustifiedHeaders

= Vec<

::Header>; +pub(crate) type UnjustifiedHeaders = Vec; /// Finality proofs container. Ordered by target header number. pub(crate) type FinalityProofs

= Vec<(

::Number,

::FinalityProof, )>; +/// Reference to finality proofs container. +pub(crate) type FinalityProofsRef<'a, P> = &'a [( +

::Number, +

::FinalityProof, +)]; /// Error that may happen inside finality synchronization loop. #[derive(Debug)] -enum Error { +pub(crate) enum Error { /// Source client request has failed with given error. Source(SourceError), /// Target client request has failed with given error. @@ -182,13 +187,23 @@ struct Transaction { } /// Finality proofs stream that may be restarted. -struct RestartableFinalityProofsStream { +pub(crate) struct RestartableFinalityProofsStream { /// Flag that the stream needs to be restarted. - needs_restart: bool, + pub(crate) needs_restart: bool, /// The stream itself. stream: Pin>, } +#[cfg(test)] +impl From for RestartableFinalityProofsStream { + fn from(stream: S) -> Self { + RestartableFinalityProofsStream { + needs_restart: false, + stream: Box::pin(stream), + } + } +} + /// Finality synchronization loop state. struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> { /// Synchronization loop progress. @@ -272,6 +287,8 @@ async fn run_until_connection_lost( } }; if finality_proofs_stream.needs_restart { + log::warn!(target: "bridge", "{} finality proofs stream is being restarted", P::SOURCE_NAME); + finality_proofs_stream.needs_restart = false; finality_proofs_stream.stream = Box::pin(restart_finality_proofs_stream().await?); } @@ -368,7 +385,7 @@ where async fn select_header_to_submit( source_client: &SC, - _target_client: &TC, + target_client: &TC, finality_proofs_stream: &mut RestartableFinalityProofsStream, recent_finality_proofs: &mut FinalityProofs

, best_number_at_source: P::Number, @@ -380,9 +397,6 @@ where SC: SourceClient

, TC: TargetClient

, { - let mut selected_finality_proof = None; - let mut unjustified_headers = Vec::new(); - // to see that the loop is progressing log::trace!( target: "bridge", @@ -393,6 +407,70 @@ where // read missing headers. if we see that the header schedules GRANDPA change, we need to // submit this header + let selected_finality_proof = read_missing_headers::( + source_client, + target_client, + best_number_at_source, + best_number_at_target, + ) + .await?; + let (mut unjustified_headers, mut selected_finality_proof) = match selected_finality_proof { + SelectedFinalityProof::Mandatory(header, finality_proof) => return Ok(Some((header, finality_proof))), + SelectedFinalityProof::Regular(unjustified_headers, header, finality_proof) => { + (unjustified_headers, Some((header, finality_proof))) + } + SelectedFinalityProof::None(unjustified_headers) => (unjustified_headers, None), + }; + + // all headers that are missing from the target client are non-mandatory + // => even if we have already selected some header and its persistent finality proof, + // we may try to select better header by reading non-persistent proofs from the stream + read_finality_proofs_from_stream::(finality_proofs_stream, recent_finality_proofs); + selected_finality_proof = select_better_recent_finality_proof::

( + recent_finality_proofs, + &mut unjustified_headers, + selected_finality_proof, + ); + + // remove obsolete 'recent' finality proofs + keep its size under certain limit + let oldest_finality_proof_to_keep = selected_finality_proof + .as_ref() + .map(|(header, _)| header.number()) + .unwrap_or(best_number_at_target); + prune_recent_finality_proofs::

( + oldest_finality_proof_to_keep, + recent_finality_proofs, + sync_params.recent_finality_proofs_limit, + ); + + Ok(selected_finality_proof) +} + +/// Finality proof that has been selected by the `read_missing_headers` function. +pub(crate) enum SelectedFinalityProof { + /// Mandatory header and its proof has been selected. We shall submit proof for this header. + Mandatory(Header, FinalityProof), + /// Regular header and its proof has been selected. We may submit this proof, or proof for + /// some better header. + Regular(UnjustifiedHeaders

, Header, FinalityProof), + /// We haven't found any missing header with persistent proof at the target client. + None(UnjustifiedHeaders
), +} + +/// Read missing headers and their persistent finality proofs from the target client. +/// +/// If we have found some header with known proof, it is returned. +/// Otherwise, `SelectedFinalityProof::None` is returned. +/// +/// Unless we have found mandatory header, all missing headers are collected and returned. +pub(crate) async fn read_missing_headers, TC: TargetClient

>( + source_client: &SC, + _target_client: &TC, + best_number_at_source: P::Number, + best_number_at_target: P::Number, +) -> Result, Error> { + let mut unjustified_headers = Vec::new(); + let mut selected_finality_proof = None; let mut header_number = best_number_at_target + One::one(); while header_number <= best_number_at_source { let (header, finality_proof) = source_client @@ -404,13 +482,13 @@ where match (is_mandatory, finality_proof) { (true, Some(finality_proof)) => { log::trace!(target: "bridge", "Header {:?} is mandatory", header_number); - return Ok(Some((header, finality_proof))); + return Ok(SelectedFinalityProof::Mandatory(header, finality_proof)); } (true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())), (false, Some(finality_proof)) => { log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number); + unjustified_headers.clear(); selected_finality_proof = Some((header, finality_proof)); - prune_unjustified_headers::

(header_number, &mut unjustified_headers); } (false, None) => { unjustified_headers.push(header); @@ -420,37 +498,17 @@ where header_number = header_number + One::one(); } - // see if we can improve finality by using recent finality proofs - if !unjustified_headers.is_empty() && !recent_finality_proofs.is_empty() { - const NOT_EMPTY_PROOF: &str = "we have checked that the vec is not empty; qed"; - - // we need proofs for headers in range unjustified_range_begin..=unjustified_range_end - let unjustified_range_begin = unjustified_headers.first().expect(NOT_EMPTY_PROOF).number(); - let unjustified_range_end = unjustified_headers.last().expect(NOT_EMPTY_PROOF).number(); - - // we have proofs for headers in range buffered_range_begin..=buffered_range_end - let buffered_range_begin = recent_finality_proofs.first().expect(NOT_EMPTY_PROOF).0; - let buffered_range_end = recent_finality_proofs.last().expect(NOT_EMPTY_PROOF).0; - - // we have two ranges => find intersection - let intersection_begin = std::cmp::max(unjustified_range_begin, buffered_range_begin); - let intersection_end = std::cmp::min(unjustified_range_end, buffered_range_end); - let intersection = intersection_begin..=intersection_end; - - // find last proof from intersection - let selected_finality_proof_index = recent_finality_proofs - .binary_search_by_key(intersection.end(), |(number, _)| *number) - .unwrap_or_else(|index| index.saturating_sub(1)); - let (selected_header_number, finality_proof) = &recent_finality_proofs[selected_finality_proof_index]; - if intersection.contains(selected_header_number) { - // now remove all obsolete headers and extract selected header - let selected_header = prune_unjustified_headers::

(*selected_header_number, &mut unjustified_headers) - .expect("unjustified_headers contain all headers from intersection; qed"); - selected_finality_proof = Some((selected_header, finality_proof.clone())); - } - } + Ok(match selected_finality_proof { + Some((header, proof)) => SelectedFinalityProof::Regular(unjustified_headers, header, proof), + None => SelectedFinalityProof::None(unjustified_headers), + }) +} - // read all proofs from the stream, probably selecting updated proof that we're going to submit +/// Read finality proofs from the stream. +pub(crate) fn read_finality_proofs_from_stream>( + finality_proofs_stream: &mut RestartableFinalityProofsStream, + recent_finality_proofs: &mut FinalityProofs

, +) { loop { let next_proof = finality_proofs_stream.stream.next(); let finality_proof = match next_proof.now_or_never() { @@ -461,49 +519,52 @@ where } None => break, }; - let finality_proof_target_header_number = match finality_proof.target_header_number() { - Some(target_header_number) => target_header_number, - None => { - continue; - } - }; - let justified_header = - prune_unjustified_headers::

(finality_proof_target_header_number, &mut unjustified_headers); - if let Some(justified_header) = justified_header { - recent_finality_proofs.clear(); - selected_finality_proof = Some((justified_header, finality_proof)); - } else { - // the number of proofs read during single wakeup is expected to be low, so we aren't pruning - // `recent_finality_proofs` collection too often - recent_finality_proofs.push((finality_proof_target_header_number, finality_proof)); - } + recent_finality_proofs.push((finality_proof.target_header_number(), finality_proof)); } +} - // remove obsolete 'recent' finality proofs + keep its size under certain limit - let oldest_finality_proof_to_keep = selected_finality_proof - .as_ref() - .map(|(header, _)| header.number()) - .unwrap_or(best_number_at_target); - prune_recent_finality_proofs::

( - oldest_finality_proof_to_keep, - recent_finality_proofs, - sync_params.recent_finality_proofs_limit, - ); +/// Try to select better header and its proof, given finality proofs that we +/// have recently read from the stream. +pub(crate) fn select_better_recent_finality_proof( + recent_finality_proofs: FinalityProofsRef

, + unjustified_headers: &mut UnjustifiedHeaders, + selected_finality_proof: Option<(P::Header, P::FinalityProof)>, +) -> Option<(P::Header, P::FinalityProof)> { + if unjustified_headers.is_empty() || recent_finality_proofs.is_empty() { + return selected_finality_proof; + } - Ok(selected_finality_proof) -} + const NOT_EMPTY_PROOF: &str = "we have checked that the vec is not empty; qed"; -/// Remove headers from `unjustified_headers` collection with number lower or equal than `justified_header_number`. -/// -/// Returns the header that matches `justified_header_number` (if any). -pub(crate) fn prune_unjustified_headers( - justified_header_number: P::Number, - unjustified_headers: &mut UnjustifiedHeaders

, -) -> Option { - prune_ordered_vec(justified_header_number, unjustified_headers, usize::MAX, |header| { - header.number() - }) + // we need proofs for headers in range unjustified_range_begin..=unjustified_range_end + let unjustified_range_begin = unjustified_headers.first().expect(NOT_EMPTY_PROOF).number(); + let unjustified_range_end = unjustified_headers.last().expect(NOT_EMPTY_PROOF).number(); + + // we have proofs for headers in range buffered_range_begin..=buffered_range_end + let buffered_range_begin = recent_finality_proofs.first().expect(NOT_EMPTY_PROOF).0; + let buffered_range_end = recent_finality_proofs.last().expect(NOT_EMPTY_PROOF).0; + + // we have two ranges => find intersection + let intersection_begin = std::cmp::max(unjustified_range_begin, buffered_range_begin); + let intersection_end = std::cmp::min(unjustified_range_end, buffered_range_end); + let intersection = intersection_begin..=intersection_end; + + // find last proof from intersection + let selected_finality_proof_index = recent_finality_proofs + .binary_search_by_key(intersection.end(), |(number, _)| *number) + .unwrap_or_else(|index| index.saturating_sub(1)); + let (selected_header_number, finality_proof) = &recent_finality_proofs[selected_finality_proof_index]; + if !intersection.contains(selected_header_number) { + return selected_finality_proof; + } + + // now remove all obsolete headers and extract selected header + let selected_header_position = unjustified_headers + .binary_search_by_key(selected_header_number, |header| header.number()) + .expect("unjustified_headers contain all headers from intersection; qed"); + let selected_header = unjustified_headers.swap_remove(selected_header_position); + Some((selected_header, finality_proof.clone())) } pub(crate) fn prune_recent_finality_proofs( @@ -511,45 +572,21 @@ pub(crate) fn prune_recent_finality_proofs( recent_finality_proofs: &mut FinalityProofs

, recent_finality_proofs_limit: usize, ) { - prune_ordered_vec( - justified_header_number, - recent_finality_proofs, - recent_finality_proofs_limit, - |(header_number, _)| *header_number, + let position = + recent_finality_proofs.binary_search_by_key(&justified_header_number, |(header_number, _)| *header_number); + + // remove all obsolete elements + *recent_finality_proofs = recent_finality_proofs.split_off( + position + .map(|position| position + 1) + .unwrap_or_else(|position| position), ); -} - -fn prune_ordered_vec( - header_number: Number, - ordered_vec: &mut Vec, - maximal_vec_size: usize, - extract_header_number: impl Fn(&T) -> Number, -) -> Option { - let position = ordered_vec.binary_search_by_key(&header_number, extract_header_number); - - // first extract element we're interested in - let extracted_element = match position { - Ok(position) => { - let updated_vec = ordered_vec.split_off(position + 1); - let extracted_element = ordered_vec.pop().expect( - "binary_search_by_key has returned Ok(); so there's element at `position`;\ - we're splitting vec at `position+1`; so we have pruned at least 1 element;\ - qed", - ); - *ordered_vec = updated_vec; - Some(extracted_element) - } - Err(position) => { - *ordered_vec = ordered_vec.split_off(position); - None - } - }; // now - limit vec by size - let split_index = ordered_vec.len().saturating_sub(maximal_vec_size); - *ordered_vec = ordered_vec.split_off(split_index); - - extracted_element + let split_index = recent_finality_proofs + .len() + .saturating_sub(recent_finality_proofs_limit); + *recent_finality_proofs = recent_finality_proofs.split_off(split_index); } fn print_sync_progress( diff --git a/bridges/relays/finality-relay/src/finality_loop_tests.rs b/bridges/relays/finality-relay/src/finality_loop_tests.rs index 5dfe8edd2124..53f5225ab7e3 100644 --- a/bridges/relays/finality-relay/src/finality_loop_tests.rs +++ b/bridges/relays/finality-relay/src/finality_loop_tests.rs @@ -19,8 +19,8 @@ #![cfg(test)] use crate::finality_loop::{ - prune_recent_finality_proofs, prune_unjustified_headers, run, FinalityProofs, FinalitySyncParams, SourceClient, - TargetClient, UnjustifiedHeaders, + prune_recent_finality_proofs, read_finality_proofs_from_stream, run, select_better_recent_finality_proof, + FinalityProofs, FinalitySyncParams, SourceClient, TargetClient, }; use crate::{FinalityProof, FinalitySyncPipeline, SourceHeader}; @@ -71,10 +71,10 @@ impl SourceHeader for TestSourceHeader { } #[derive(Debug, Clone, PartialEq)] -struct TestFinalityProof(Option); +struct TestFinalityProof(TestNumber); impl FinalityProof for TestFinalityProof { - fn target_header_number(&self) -> Option { + fn target_header_number(&self) -> TestNumber { self.0 } } @@ -176,14 +176,14 @@ fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync source_best_block_number: 10, source_headers: vec![ (6, (TestSourceHeader(false, 6), None)), - (7, (TestSourceHeader(false, 7), Some(TestFinalityProof(Some(7))))), - (8, (TestSourceHeader(true, 8), Some(TestFinalityProof(Some(8))))), - (9, (TestSourceHeader(false, 9), Some(TestFinalityProof(Some(9))))), + (7, (TestSourceHeader(false, 7), Some(TestFinalityProof(7)))), + (8, (TestSourceHeader(true, 8), Some(TestFinalityProof(8)))), + (9, (TestSourceHeader(false, 9), Some(TestFinalityProof(9)))), (10, (TestSourceHeader(false, 10), None)), ] .into_iter() .collect(), - source_proofs: vec![TestFinalityProof(Some(12)), TestFinalityProof(Some(14))], + source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)], target_best_block_number: 5, target_headers: vec![], @@ -222,22 +222,22 @@ fn finality_sync_loop_works() { // header#9 has persistent finality proof, but it isn't mandatory => it is submitted, because // there are no more persistent finality proofs // - // once this ^^^ is done, we generate more blocks && read proof for blocks 12, 14 and 16 from the stream - // but we only submit proof for 16 - // - // proof for block 15 is ignored - we haven't managed to decode it + // once this ^^^ is done, we generate more blocks && read proof for blocks 12 and 14 from the stream if data.target_best_block_number == 9 { - data.source_best_block_number = 17; + data.source_best_block_number = 14; data.source_headers.insert(11, (TestSourceHeader(false, 11), None)); data.source_headers - .insert(12, (TestSourceHeader(false, 12), Some(TestFinalityProof(Some(12))))); + .insert(12, (TestSourceHeader(false, 12), Some(TestFinalityProof(12)))); data.source_headers.insert(13, (TestSourceHeader(false, 13), None)); data.source_headers - .insert(14, (TestSourceHeader(false, 14), Some(TestFinalityProof(Some(14))))); - data.source_headers - .insert(15, (TestSourceHeader(false, 15), Some(TestFinalityProof(None)))); + .insert(14, (TestSourceHeader(false, 14), Some(TestFinalityProof(14)))); + } + // once this ^^^ is done, we generate more blocks && read persistent proof for block 16 + if data.target_best_block_number == 14 { + data.source_best_block_number = 17; + data.source_headers.insert(15, (TestSourceHeader(false, 15), None)); data.source_headers - .insert(16, (TestSourceHeader(false, 16), Some(TestFinalityProof(Some(16))))); + .insert(16, (TestSourceHeader(false, 16), Some(TestFinalityProof(16)))); data.source_headers.insert(17, (TestSourceHeader(false, 17), None)); } @@ -247,67 +247,132 @@ fn finality_sync_loop_works() { assert_eq!( client_data.target_headers, vec![ - (TestSourceHeader(true, 8), TestFinalityProof(Some(8))), - (TestSourceHeader(false, 9), TestFinalityProof(Some(9))), - (TestSourceHeader(false, 16), TestFinalityProof(Some(16))), + // before adding 11..14: finality proof for mandatory header#8 + (TestSourceHeader(true, 8), TestFinalityProof(8)), + // before adding 11..14: persistent finality proof for non-mandatory header#9 + (TestSourceHeader(false, 9), TestFinalityProof(9)), + // after adding 11..14: ephemeral finality proof for non-mandatory header#14 + (TestSourceHeader(false, 14), TestFinalityProof(14)), + // after adding 15..17: persistent finality proof for non-mandatory header#16 + (TestSourceHeader(false, 16), TestFinalityProof(16)), ], ); } #[test] -fn prune_unjustified_headers_works() { - let original_unjustified_headers: UnjustifiedHeaders = vec![ - TestSourceHeader(false, 10), - TestSourceHeader(false, 13), - TestSourceHeader(false, 15), - TestSourceHeader(false, 17), - TestSourceHeader(false, 19), - ] - .into_iter() - .collect(); +fn select_better_recent_finality_proof_works() { + // if there are no unjustified headers, nothing is changed + assert_eq!( + select_better_recent_finality_proof::( + &[(5, TestFinalityProof(5))], + &mut vec![], + Some((TestSourceHeader(false, 2), TestFinalityProof(2))), + ), + Some((TestSourceHeader(false, 2), TestFinalityProof(2))), + ); - // when header is in the collection - let mut unjustified_headers = original_unjustified_headers.clone(); + // if there are no recent finality proofs, nothing is changed assert_eq!( - prune_unjustified_headers::(10, &mut unjustified_headers), - Some(TestSourceHeader(false, 10)), + select_better_recent_finality_proof::( + &[], + &mut vec![TestSourceHeader(false, 5)], + Some((TestSourceHeader(false, 2), TestFinalityProof(2))), + ), + Some((TestSourceHeader(false, 2), TestFinalityProof(2))), ); - assert_eq!(&original_unjustified_headers[1..], unjustified_headers,); - // when the header doesn't exist in the collection - let mut unjustified_headers = original_unjustified_headers.clone(); + // if there's no intersection between recent finality proofs and unjustified headers, nothing is changed + let mut unjustified_headers = vec![TestSourceHeader(false, 9), TestSourceHeader(false, 10)]; assert_eq!( - prune_unjustified_headers::(11, &mut unjustified_headers), - None, + select_better_recent_finality_proof::( + &[(1, TestFinalityProof(1)), (4, TestFinalityProof(4))], + &mut unjustified_headers, + Some((TestSourceHeader(false, 2), TestFinalityProof(2))), + ), + Some((TestSourceHeader(false, 2), TestFinalityProof(2))), ); - assert_eq!(&original_unjustified_headers[1..], unjustified_headers,); - // when last entry is pruned - let mut unjustified_headers = original_unjustified_headers.clone(); + // if there's intersection between recent finality proofs and unjustified headers, but there are no + // proofs in this intersection, nothing is changed + let mut unjustified_headers = vec![ + TestSourceHeader(false, 8), + TestSourceHeader(false, 9), + TestSourceHeader(false, 10), + ]; assert_eq!( - prune_unjustified_headers::(19, &mut unjustified_headers), - Some(TestSourceHeader(false, 19)), + select_better_recent_finality_proof::( + &[(7, TestFinalityProof(7)), (11, TestFinalityProof(11))], + &mut unjustified_headers, + Some((TestSourceHeader(false, 2), TestFinalityProof(2))), + ), + Some((TestSourceHeader(false, 2), TestFinalityProof(2))), + ); + assert_eq!( + unjustified_headers, + vec![ + TestSourceHeader(false, 8), + TestSourceHeader(false, 9), + TestSourceHeader(false, 10) + ] ); - assert_eq!(&original_unjustified_headers[5..], unjustified_headers,); + // if there's intersection between recent finality proofs and unjustified headers and there's + // a proof in this intersection: + // - this better (last from intersection) proof is selected; + // - 'obsolete' unjustified headers are pruned. + let mut unjustified_headers = vec![ + TestSourceHeader(false, 8), + TestSourceHeader(false, 9), + TestSourceHeader(false, 10), + ]; + assert_eq!( + select_better_recent_finality_proof::( + &[(7, TestFinalityProof(7)), (9, TestFinalityProof(9))], + &mut unjustified_headers, + Some((TestSourceHeader(false, 2), TestFinalityProof(2))), + ), + Some((TestSourceHeader(false, 9), TestFinalityProof(9))), + ); +} - // when we try and prune past last entry - let mut unjustified_headers = original_unjustified_headers.clone(); +#[test] +fn read_finality_proofs_from_stream_works() { + // when stream is currently empty, nothing is changed + let mut recent_finality_proofs = vec![(1, TestFinalityProof(1))]; + let mut stream = futures::stream::pending().into(); + read_finality_proofs_from_stream::(&mut stream, &mut recent_finality_proofs); + assert_eq!(recent_finality_proofs, vec![(1, TestFinalityProof(1))]); + assert_eq!(stream.needs_restart, false); + + // when stream has entry with target, it is added to the recent proofs container + let mut stream = futures::stream::iter(vec![TestFinalityProof(4)]) + .chain(futures::stream::pending()) + .into(); + read_finality_proofs_from_stream::(&mut stream, &mut recent_finality_proofs); assert_eq!( - prune_unjustified_headers::(20, &mut unjustified_headers), - None, + recent_finality_proofs, + vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))] + ); + assert_eq!(stream.needs_restart, false); + + // when stream has ended, we'll need to restart it + let mut stream = futures::stream::empty().into(); + read_finality_proofs_from_stream::(&mut stream, &mut recent_finality_proofs); + assert_eq!( + recent_finality_proofs, + vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))] ); - assert_eq!(&original_unjustified_headers[5..], unjustified_headers,); + assert_eq!(stream.needs_restart, true); } #[test] fn prune_recent_finality_proofs_works() { let original_recent_finality_proofs: FinalityProofs = vec![ - (10, TestFinalityProof(Some(10))), - (13, TestFinalityProof(Some(13))), - (15, TestFinalityProof(Some(15))), - (17, TestFinalityProof(Some(17))), - (19, TestFinalityProof(Some(19))), + (10, TestFinalityProof(10)), + (13, TestFinalityProof(13)), + (15, TestFinalityProof(15)), + (17, TestFinalityProof(17)), + (19, TestFinalityProof(19)), ] .into_iter() .collect(); diff --git a/bridges/relays/finality-relay/src/lib.rs b/bridges/relays/finality-relay/src/lib.rs index e9d946e27f18..a246e4bd954c 100644 --- a/bridges/relays/finality-relay/src/lib.rs +++ b/bridges/relays/finality-relay/src/lib.rs @@ -53,8 +53,6 @@ pub trait SourceHeader: Clone + Debug + PartialEq + Send + Sync { /// Abstract finality proof that is justifying block finality. pub trait FinalityProof: Clone + Send + Sync + Debug { - /// Return header id that this proof is generated for. - /// - /// None is returned if proof is invalid from relayer PoV. - fn target_header_number(&self) -> Option; + /// Return number of header that this proof is generated for. + fn target_header_number(&self) -> Number; } diff --git a/bridges/relays/substrate-client/src/finality_source.rs b/bridges/relays/substrate-client/src/finality_source.rs index 2c76619e867f..18293efa128f 100644 --- a/bridges/relays/substrate-client/src/finality_source.rs +++ b/bridges/relays/substrate-client/src/finality_source.rs @@ -22,6 +22,7 @@ use crate::error::Error; use crate::sync_header::SyncHeader; use async_trait::async_trait; +use bp_header_chain::justification::decode_justification_target; use finality_relay::{FinalityProof, FinalitySyncPipeline, SourceClient, SourceHeader}; use futures::stream::{unfold, Stream, StreamExt}; use relay_utils::relay_loop::Client as RelayClient; @@ -30,26 +31,23 @@ use std::{marker::PhantomData, pin::Pin}; /// Wrapped raw Justification. #[derive(Debug, Clone)] -pub struct Justification

{ +pub struct Justification { + /// Header number decoded from the [`raw_justification`]. + target_header_number: Number, + /// Raw, encoded justification bytes. raw_justification: sp_runtime::Justification, - _phantom: PhantomData
, } -impl
Justification
{ +impl Justification { /// Extract raw justification. pub fn into_inner(self) -> sp_runtime::Justification { self.raw_justification } } -impl
FinalityProof for Justification
-where - Header: HeaderT, -{ - fn target_header_number(&self) -> Option { - bp_header_chain::justification::decode_justification_target::
(&self.raw_justification) - .ok() - .map(|(_, number)| number) +impl FinalityProof for Justification { + fn target_header_number(&self) -> Number { + self.target_header_number } } @@ -96,11 +94,11 @@ where Hash = C::Hash, Number = C::BlockNumber, Header = SyncHeader, - FinalityProof = Justification, + FinalityProof = Justification, >, P::Header: SourceHeader, { - type FinalityProofsStream = Pin>>>; + type FinalityProofsStream = Pin>>>; async fn best_finalized_block_number(&self) -> Result { // we **CAN** continue to relay finality proofs if source node is out of sync, because @@ -122,8 +120,8 @@ where .justification() .cloned() .map(|raw_justification| Justification { + target_header_number: number, raw_justification, - _phantom: Default::default(), }), )) } @@ -132,14 +130,31 @@ where Ok(unfold( self.client.clone().subscribe_justifications().await?, move |mut subscription| async move { - let next_justification = subscription.next().await?; - Some(( - Justification { - raw_justification: next_justification.0, - _phantom: Default::default(), - }, - subscription, - )) + loop { + let next_justification = subscription.next().await?; + let decoded_target = decode_justification_target::(&next_justification.0); + let target_header_number = match decoded_target { + Ok((_, number)) => number, + Err(err) => { + log::error!( + target: "bridge", + "Failed to decode justification target from the {} justifications stream: {:?}", + P::SOURCE_NAME, + err, + ); + + continue; + } + }; + + return Some(( + Justification { + target_header_number, + raw_justification: next_justification.0, + }, + subscription, + )); + } }, ) .boxed()) diff --git a/bridges/relays/substrate/src/finality_pipeline.rs b/bridges/relays/substrate/src/finality_pipeline.rs index 7ec592d5dee1..21865b6c4485 100644 --- a/bridges/relays/substrate/src/finality_pipeline.rs +++ b/bridges/relays/substrate/src/finality_pipeline.rs @@ -89,7 +89,7 @@ where type Hash = HashOf; type Number = BlockNumberOf; type Header = SyncHeader; - type FinalityProof = Justification; + type FinalityProof = Justification; } /// Run Substrate-to-Substrate finality sync. @@ -103,7 +103,7 @@ pub async fn run( Hash = HashOf, Number = BlockNumberOf, Header = SyncHeader, - FinalityProof = Justification, + FinalityProof = Justification, >, SourceChain: Clone + Chain, BlockNumberOf: BlockNumberBase, diff --git a/bridges/relays/substrate/src/millau_headers_to_rialto.rs b/bridges/relays/substrate/src/millau_headers_to_rialto.rs index 889676d673bf..f84eee03a9fd 100644 --- a/bridges/relays/substrate/src/millau_headers_to_rialto.rs +++ b/bridges/relays/substrate/src/millau_headers_to_rialto.rs @@ -39,7 +39,7 @@ impl SubstrateFinalitySyncPipeline for MillauFinalityToRialto { async fn make_submit_finality_proof_transaction( &self, header: MillauSyncHeader, - proof: Justification, + proof: Justification, ) -> Result { let account_id = self.target_sign.signer.public().as_array_ref().clone().into(); let nonce = self.target_client.next_account_index(account_id).await?; diff --git a/bridges/relays/substrate/src/rialto_headers_to_millau.rs b/bridges/relays/substrate/src/rialto_headers_to_millau.rs index 12b2086728a7..5a9bbb12133a 100644 --- a/bridges/relays/substrate/src/rialto_headers_to_millau.rs +++ b/bridges/relays/substrate/src/rialto_headers_to_millau.rs @@ -39,7 +39,7 @@ impl SubstrateFinalitySyncPipeline for RialtoFinalityToMillau { async fn make_submit_finality_proof_transaction( &self, header: RialtoSyncHeader, - proof: Justification, + proof: Justification, ) -> Result { let account_id = self.target_sign.signer.public().as_array_ref().clone().into(); let nonce = self.target_client.next_account_index(account_id).await?;