From df7f4abe68fd41141875c326624edb0f28c9e1f1 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Wed, 16 Feb 2022 19:29:25 -0300 Subject: [PATCH] feat: Implement rollback buffer in N2N / N2C sources --- Cargo.lock | 20 +++---- Cargo.toml | 2 +- src/bin/oura/dump.rs | 2 + src/bin/oura/watch.rs | 2 + src/sources/n2c/blocks.rs | 3 +- src/sources/n2c/run.rs | 117 ++++++++++++++++++++++++++------------ src/sources/n2c/setup.rs | 13 ++++- src/sources/n2n/run.rs | 77 ++++++++++++++++++------- src/sources/n2n/setup.rs | 13 ++++- 9 files changed, 176 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 065df74d..57c0cbc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1069,9 +1069,9 @@ dependencies = [ [[package]] name = "pallas" -version = "0.5.0-alpha.1" +version = "0.5.0-alpha.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdd5e773cae1309528041ddd3ee2b6300cf2c9db57f9e2507cbca2abb46f28c2" +checksum = "94643e265185da36ee52026351f4b06fcc371b04eefaf433867536318c5e7d70" dependencies = [ "pallas-crypto", "pallas-miniprotocols", @@ -1081,9 +1081,9 @@ dependencies = [ [[package]] name = "pallas-crypto" -version = "0.5.0-alpha.1" +version = "0.5.0-alpha.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "671daf38b854986595c3417c22b5a2851470c349ac592e6b496351548a3d87a5" +checksum = "c43362b13ca989390ad322af164722f3a14f6f7ddb58afa43588c43d7c94766b" dependencies = [ "cryptoxide", "hex", @@ -1094,9 +1094,9 @@ dependencies = [ [[package]] name = "pallas-miniprotocols" -version = "0.5.0-alpha.1" +version = "0.5.0-alpha.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a03b8dea411d41577a7bf88433d89b85b4c5f0b9be34c5e397951e19719eaa3" +checksum = "a713a31478ee300028772d455271c4fffe0983240c61ffd29c2974ae9dd41211" dependencies = [ "hex", "itertools", @@ -1108,9 +1108,9 @@ dependencies = [ [[package]] name = "pallas-multiplexer" -version = "0.5.0-alpha.1" +version = "0.5.0-alpha.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c2d8ce543d4b759988b3e41a2c6b65a6f31519b0e700cfc01dec36686e10643" +checksum = "eb08a73b3a9ed19a20f2527488fa8b3bdc8dffdf05feb6b7d732ee5604faac1d" dependencies = [ "byteorder 1.4.3", "hex", @@ -1119,9 +1119,9 @@ dependencies = [ [[package]] name = "pallas-primitives" -version = "0.5.0-alpha.1" +version = "0.5.0-alpha.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07ccef5506625a02c35b89eb09dbeb5490bfdb035e9b68a6290b5acbd3e68cd2" +checksum = "1f91d1c7cd2ab5d8a46b632eab1823d73ef5ff112a564521fa0983337addc7fa" dependencies = [ "base58", "hex", diff --git a/Cargo.toml b/Cargo.toml index bb5d1fee..5295e024 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ authors = ["Santiago Carmuega "] [dependencies] -pallas = "0.5.0-alpha.1" +pallas = "0.5.0-alpha.2" # pallas = { path = "../pallas/pallas" } hex = "0.4.3" net2 = "0.2.37" diff --git a/src/bin/oura/dump.rs b/src/bin/oura/dump.rs index e93f6163..c15f2d16 100644 --- a/src/bin/oura/dump.rs +++ b/src/bin/oura/dump.rs @@ -114,6 +114,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { address: AddressArg(bearer, socket), magic: Some(magic), well_known: None, + min_depth: 0, mapper, since, }), @@ -121,6 +122,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { address: AddressArg(bearer, socket), magic: Some(magic), well_known: None, + min_depth: 0, mapper, since, }), diff --git a/src/bin/oura/watch.rs b/src/bin/oura/watch.rs index 243f5de5..02186f45 100644 --- a/src/bin/oura/watch.rs +++ b/src/bin/oura/watch.rs @@ -92,6 +92,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { address: AddressArg(bearer, socket), magic: Some(magic), well_known: None, + min_depth: 0, mapper, since, }), @@ -99,6 +100,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { address: AddressArg(bearer, socket), magic: Some(magic), well_known: None, + min_depth: 0, mapper, since, }), diff --git a/src/sources/n2c/blocks.rs b/src/sources/n2c/blocks.rs index f1be2c82..7c4087db 100644 --- a/src/sources/n2c/blocks.rs +++ b/src/sources/n2c/blocks.rs @@ -36,9 +36,8 @@ impl TryFrom for MultiEraBlock { } } -#[allow(unused)] impl MultiEraBlock { - fn read_cursor(&self) -> Result { + pub(crate) fn read_cursor(&self) -> Result { match self { MultiEraBlock::Byron(x) => match x.deref() { byron::Block::EbBlock(x) => { diff --git a/src/sources/n2c/run.rs b/src/sources/n2c/run.rs index 58013cdb..8f0b43db 100644 --- a/src/sources/n2c/run.rs +++ b/src/sources/n2c/run.rs @@ -1,6 +1,4 @@ -use std::{fmt::Debug, ops::Deref}; - -use log::{error, info}; +use std::{collections::HashMap, fmt::Debug, ops::Deref}; use pallas::network::{ miniprotocols::{chainsync, run_agent, Point}, @@ -11,7 +9,12 @@ use crate::{mapper::EventWriter, model::EventData, Error}; use super::blocks::MultiEraBlock; -struct ChainObserver(EventWriter); +struct ChainObserver { + chain_buffer: chainsync::RollbackBuffer, + min_depth: usize, + blocks: HashMap, + event_writer: EventWriter, +} // workaround to put a stop on excessive debug requirement coming from Pallas impl Debug for ChainObserver { @@ -20,64 +23,104 @@ impl Debug for ChainObserver { } } -impl Deref for ChainObserver { - type Target = EventWriter; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl ChainObserver { - fn new(writer: EventWriter) -> Self { - Self(writer) - } +fn log_buffer_state(buffer: &chainsync::RollbackBuffer) { + log::info!( + "rollback buffer state, size: {}, oldest: {:?}, latest: {:?}", + buffer.size(), + buffer.oldest().map(|x| x.0), + buffer.latest().map(|x| x.0) + ); } impl chainsync::Observer for ChainObserver { fn on_roll_forward( - &self, + &mut self, content: chainsync::BlockContent, _tip: &chainsync::Tip, ) -> Result<(), Box> { + // parse the block and extract the point of the chain let cbor = Vec::from(content.deref()); let block = MultiEraBlock::try_from(content)?; - - match block { - MultiEraBlock::Byron(model) => self.crawl_byron_with_cbor(&model, &cbor)?, - MultiEraBlock::Shelley(model) => self.crawl_shelley_with_cbor(&model, &cbor)?, - }; + let point = block.read_cursor()?; + + // store the block for later retrieval + self.blocks.insert(point.clone(), block); + + // track the new point in our memory buffer + log::info!("rolling forward to point {:?}", point); + self.chain_buffer.roll_forward(point); + + // see if we have points that already reached certain depth + let ready = self.chain_buffer.pop_with_depth(self.min_depth); + log::debug!("found {} points with required min depth", ready.len()); + + // find confirmed block in memory and send down the pipeline + for point in ready { + let block = self + .blocks + .remove(&point) + .expect("required block not found in memory"); + + match block { + MultiEraBlock::Byron(model) => { + self.event_writer.crawl_byron_with_cbor(&model, &cbor)? + } + MultiEraBlock::Shelley(model) => { + self.event_writer.crawl_shelley_with_cbor(&model, &cbor)? + } + }; + } + + log_buffer_state(&self.chain_buffer); Ok(()) } - fn on_rollback(&self, point: &Point) -> Result<(), Error> { - self.0.append(EventData::RollBack { - block_slot: point.0, - block_hash: hex::encode(&point.1), - }) - } + fn on_rollback(&mut self, point: &Point) -> Result<(), Error> { + log::info!("rolling block to point {:?}", point); - fn on_intersect_found(&self, point: &Point, _tip: &chainsync::Tip) -> Result<(), Error> { - info!("intersect found {:?}", point); - Ok(()) - } + match self.chain_buffer.roll_back(point) { + chainsync::RollbackEffect::Handled => { + log::debug!("handled rollback within buffer {:?}", point); + + // drain memory blocks afther the rollback slot + self.blocks.retain(|x, _| x.0 <= point.0); + } + chainsync::RollbackEffect::OutOfScope => { + log::debug!("rollback out of buffer scope, sending event down the pipeline"); + + // clear all the blocks in memory, they are orphan + self.blocks.clear(); + + self.event_writer.append(EventData::RollBack { + block_slot: point.0, + block_hash: hex::encode(&point.1), + })?; + } + } + + log_buffer_state(&self.chain_buffer); - fn on_tip_reached(&self) -> Result<(), Error> { - info!("tip reached"); Ok(()) } } pub(crate) fn observe_forever( mut channel: Channel, - writer: EventWriter, + event_writer: EventWriter, from: Point, + min_depth: usize, ) -> Result<(), Error> { - let observer = ChainObserver::new(writer); + let observer = ChainObserver { + chain_buffer: Default::default(), + blocks: HashMap::new(), + min_depth, + event_writer, + }; + let agent = chainsync::BlockConsumer::initial(vec![from], observer); let agent = run_agent(agent, &mut channel)?; - error!("chainsync agent final state: {:?}", agent.state); + log::warn!("chainsync agent final state: {:?}", agent.state); Ok(()) } diff --git a/src/sources/n2c/setup.rs b/src/sources/n2c/setup.rs index 492da9a8..c1049833 100644 --- a/src/sources/n2c/setup.rs +++ b/src/sources/n2c/setup.rs @@ -40,6 +40,16 @@ pub struct Config { #[serde(default)] pub mapper: MapperConfig, + + /// Min block depth (# confirmations) required + /// + /// The min depth a block requires to be considered safe to send down the + /// pipeline. This value is used to configure a rollback buffer used + /// internally by the stage. A high value (eg: ~6) will reduce the + /// probability of seeing rollbacks events. The trade-off is that the stage + /// will need some time to fill up the buffer before sending the 1st event. + #[serde(default)] + pub min_depth: usize, } fn do_handshake(channel: &mut Channel, magic: u64) -> Result<(), Error> { @@ -94,8 +104,9 @@ impl SourceProvider for WithUtils { info!("starting from chain point: {:?}", &since); + let min_depth = self.inner.min_depth; let handle = std::thread::spawn(move || { - observe_forever(cs_channel, writer, since).expect("chainsync loop failed"); + observe_forever(cs_channel, writer, since, min_depth).expect("chainsync loop failed"); }); Ok((handle, output_rx)) diff --git a/src/sources/n2n/run.rs b/src/sources/n2n/run.rs index ee39959b..62b43710 100644 --- a/src/sources/n2n/run.rs +++ b/src/sources/n2n/run.rs @@ -1,15 +1,9 @@ use std::fmt::Debug; -use log::{info, warn}; - use pallas::{ ledger::primitives::probing, network::{ - miniprotocols::{ - blockfetch, - chainsync::{self, HeaderContent}, - run_agent, Point, - }, + miniprotocols::{blockfetch, chainsync, run_agent, Point}, multiplexer::Channel, }, }; @@ -53,6 +47,8 @@ impl blockfetch::Observer for Block2EventMapper { } struct ChainObserver { + min_depth: usize, + chain_buffer: chainsync::RollbackBuffer, block_requests: SyncSender, event_writer: EventWriter, } @@ -64,28 +60,64 @@ impl Debug for ChainObserver { } } -impl chainsync::Observer for ChainObserver { +fn log_buffer_state(buffer: &chainsync::RollbackBuffer) { + log::info!( + "rollback buffer state, size: {}, oldest: {:?}, latest: {:?}", + buffer.size(), + buffer.oldest().map(|x| x.0), + buffer.latest().map(|x| x.0) + ); +} + +impl chainsync::Observer for &mut ChainObserver { fn on_roll_forward( - &self, + &mut self, content: chainsync::HeaderContent, _tip: &chainsync::Tip, ) -> Result<(), Error> { + // parse the header and extract the point of the chain let header = MultiEraHeader::try_from(content)?; - let cursor = header.read_cursor()?; + let point = header.read_cursor()?; + + // track the new point in our memory buffer + log::info!("rolling forward to point {:?}", point); + self.chain_buffer.roll_forward(point); + + // see if we have points that already reached certain depth + let ready = self.chain_buffer.pop_with_depth(self.min_depth); + log::debug!("found {} points with required min depth", ready.len()); + + // request download of blocks for confirmed points + for point in ready { + log::debug!("requesting block fetch for point {:?}", point); + self.block_requests.send(point)?; + } - info!("requesting block fetch for point {:?}", cursor); - self.block_requests.send(cursor)?; + log_buffer_state(&self.chain_buffer); Ok(()) } - fn on_rollback(&self, point: &Point) -> Result<(), Error> { - info!("rolling block to point {:?}", point); + fn on_rollback(&mut self, point: &Point) -> Result<(), Error> { + log::info!("rolling block to point {:?}", point); - self.event_writer.append(EventData::RollBack { - block_slot: point.0, - block_hash: hex::encode(&point.1), - }) + match self.chain_buffer.roll_back(point) { + chainsync::RollbackEffect::Handled => { + log::debug!("handled rollback within buffer {:?}", point); + } + chainsync::RollbackEffect::OutOfScope => { + log::debug!("rollback out of buffer scope, sending event down the pipeline"); + + self.event_writer.append(EventData::RollBack { + block_slot: point.0, + block_hash: hex::encode(&point.1), + })?; + } + } + + log_buffer_state(&self.chain_buffer); + + Ok(()) } } @@ -97,7 +129,7 @@ pub(crate) fn fetch_blocks_forever( let observer = Block2EventMapper(event_writer); let agent = blockfetch::OnDemandClient::initial(input, observer); let agent = run_agent(agent, &mut channel)?; - warn!("chainsync agent final state: {:?}", agent.state); + log::warn!("chainsync agent final state: {:?}", agent.state); Ok(()) } @@ -107,15 +139,18 @@ pub(crate) fn observe_headers_forever( event_writer: EventWriter, from: Point, block_requests: SyncSender, + min_depth: usize, ) -> Result<(), Error> { - let observer = ChainObserver { + let observer = &mut ChainObserver { + chain_buffer: Default::default(), + min_depth, event_writer, block_requests, }; let agent = chainsync::HeaderConsumer::initial(vec![from], observer); let agent = run_agent(agent, &mut channel)?; - warn!("chainsync agent final state: {:?}", agent.state); + log::warn!("chainsync agent final state: {:?}", agent.state); Ok(()) } diff --git a/src/sources/n2n/setup.rs b/src/sources/n2n/setup.rs index 108b17fb..2ba79d54 100644 --- a/src/sources/n2n/setup.rs +++ b/src/sources/n2n/setup.rs @@ -40,6 +40,16 @@ pub struct Config { #[serde(default)] pub mapper: MapperConfig, + + /// Min block depth (# confirmations) required + /// + /// The min depth a block requires to be considered safe to send down the + /// pipeline. This value is used to configure a rollback buffer used + /// internally by the stage. A high value (eg: ~6) will reduce the + /// probability of seeing rollbacks events. The trade-off is that the stage + /// will need some time to fill up the buffer before sending the 1st event. + #[serde(default)] + pub min_depth: usize, } fn do_handshake(channel: &mut Channel, magic: u64) -> Result<(), Error> { @@ -96,9 +106,10 @@ impl SourceProvider for WithUtils { let (headers_tx, headers_rx) = std::sync::mpsc::sync_channel(100); + let min_depth = self.inner.min_depth; let cs_writer = writer.clone(); let cs_handle = std::thread::spawn(move || { - observe_headers_forever(cs_channel, cs_writer, since, headers_tx) + observe_headers_forever(cs_channel, cs_writer, since, headers_tx, min_depth) .expect("chainsync loop failed"); });