feat: Implement rollback buffer #149

Merged
merged 1 commit on Feb 17, 2022
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -12,7 +12,7 @@ authors = ["Santiago Carmuega <santiago@carmuega.me>"]


[dependencies]
pallas = "0.5.0-alpha.1"
pallas = "0.5.0-alpha.2"
# pallas = { path = "../pallas/pallas" }
hex = "0.4.3"
net2 = "0.2.37"
2 changes: 2 additions & 0 deletions src/bin/oura/dump.rs
@@ -114,13 +114,15 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
address: AddressArg(bearer, socket),
magic: Some(magic),
well_known: None,
min_depth: 0,
mapper,
since,
}),
PeerMode::AsClient => DumpSource::N2C(N2CConfig {
address: AddressArg(bearer, socket),
magic: Some(magic),
well_known: None,
min_depth: 0,
mapper,
since,
}),
2 changes: 2 additions & 0 deletions src/bin/oura/watch.rs
@@ -92,13 +92,15 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
address: AddressArg(bearer, socket),
magic: Some(magic),
well_known: None,
min_depth: 0,
mapper,
since,
}),
PeerMode::AsClient => WatchSource::N2C(N2CConfig {
address: AddressArg(bearer, socket),
magic: Some(magic),
well_known: None,
min_depth: 0,
mapper,
since,
}),
3 changes: 1 addition & 2 deletions src/sources/n2c/blocks.rs
@@ -36,9 +36,8 @@ impl TryFrom<BlockContent> for MultiEraBlock {
}
}

#[allow(unused)]
impl MultiEraBlock {
fn read_cursor(&self) -> Result<Point, Error> {
pub(crate) fn read_cursor(&self) -> Result<Point, Error> {
match self {
MultiEraBlock::Byron(x) => match x.deref() {
byron::Block::EbBlock(x) => {
117 changes: 80 additions & 37 deletions src/sources/n2c/run.rs
@@ -1,6 +1,4 @@
use std::{fmt::Debug, ops::Deref};

use log::{error, info};
use std::{collections::HashMap, fmt::Debug, ops::Deref};

use pallas::network::{
miniprotocols::{chainsync, run_agent, Point},
@@ -11,7 +9,12 @@ use crate::{mapper::EventWriter, model::EventData, Error};

use super::blocks::MultiEraBlock;

struct ChainObserver(EventWriter);
struct ChainObserver {
chain_buffer: chainsync::RollbackBuffer,
min_depth: usize,
blocks: HashMap<Point, MultiEraBlock>,
event_writer: EventWriter,
}

// workaround to put a stop on excessive debug requirement coming from Pallas
impl Debug for ChainObserver {
@@ -20,64 +23,104 @@ impl Debug for ChainObserver {
}
}

impl Deref for ChainObserver {
type Target = EventWriter;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl ChainObserver {
fn new(writer: EventWriter) -> Self {
Self(writer)
}
fn log_buffer_state(buffer: &chainsync::RollbackBuffer) {
log::info!(
"rollback buffer state, size: {}, oldest: {:?}, latest: {:?}",
buffer.size(),
buffer.oldest().map(|x| x.0),
buffer.latest().map(|x| x.0)
);
}

impl chainsync::Observer<chainsync::BlockContent> for ChainObserver {
fn on_roll_forward(
&self,
&mut self,
content: chainsync::BlockContent,
_tip: &chainsync::Tip,
) -> Result<(), Box<dyn std::error::Error>> {
// parse the block and extract the point of the chain
let cbor = Vec::from(content.deref());
let block = MultiEraBlock::try_from(content)?;

match block {
MultiEraBlock::Byron(model) => self.crawl_byron_with_cbor(&model, &cbor)?,
MultiEraBlock::Shelley(model) => self.crawl_shelley_with_cbor(&model, &cbor)?,
};
let point = block.read_cursor()?;

// store the block for later retrieval
self.blocks.insert(point.clone(), block);

// track the new point in our memory buffer
log::info!("rolling forward to point {:?}", point);
self.chain_buffer.roll_forward(point);

// see if we have points that already reached certain depth
let ready = self.chain_buffer.pop_with_depth(self.min_depth);
log::debug!("found {} points with required min depth", ready.len());

// find confirmed block in memory and send down the pipeline
for point in ready {
let block = self
.blocks
.remove(&point)
.expect("required block not found in memory");

match block {
MultiEraBlock::Byron(model) => {
self.event_writer.crawl_byron_with_cbor(&model, &cbor)?
}
MultiEraBlock::Shelley(model) => {
self.event_writer.crawl_shelley_with_cbor(&model, &cbor)?
}
};
}

log_buffer_state(&self.chain_buffer);

Ok(())
}

fn on_rollback(&self, point: &Point) -> Result<(), Error> {
self.0.append(EventData::RollBack {
block_slot: point.0,
block_hash: hex::encode(&point.1),
})
}
fn on_rollback(&mut self, point: &Point) -> Result<(), Error> {
log::info!("rolling block to point {:?}", point);

fn on_intersect_found(&self, point: &Point, _tip: &chainsync::Tip) -> Result<(), Error> {
info!("intersect found {:?}", point);
Ok(())
}
match self.chain_buffer.roll_back(point) {
chainsync::RollbackEffect::Handled => {
log::debug!("handled rollback within buffer {:?}", point);

// drain memory blocks after the rollback slot
self.blocks.retain(|x, _| x.0 <= point.0);
}
chainsync::RollbackEffect::OutOfScope => {
log::debug!("rollback out of buffer scope, sending event down the pipeline");

// clear all the blocks in memory, they are orphans
self.blocks.clear();

self.event_writer.append(EventData::RollBack {
block_slot: point.0,
block_hash: hex::encode(&point.1),
})?;
}
}

log_buffer_state(&self.chain_buffer);

fn on_tip_reached(&self) -> Result<(), Error> {
info!("tip reached");
Ok(())
}
}

pub(crate) fn observe_forever(
mut channel: Channel,
writer: EventWriter,
event_writer: EventWriter,
from: Point,
min_depth: usize,
) -> Result<(), Error> {
let observer = ChainObserver::new(writer);
let observer = ChainObserver {
chain_buffer: Default::default(),
blocks: HashMap::new(),
min_depth,
event_writer,
};

let agent = chainsync::BlockConsumer::initial(vec![from], observer);
let agent = run_agent(agent, &mut channel)?;
error!("chainsync agent final state: {:?}", agent.state);
log::warn!("chainsync agent final state: {:?}", agent.state);

Ok(())
}
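
For reference, the buffer interaction above can be exercised on its own. The following is a minimal sketch, not part of the PR, assuming the `chainsync::RollbackBuffer` API exactly as used in this diff (`roll_forward`, `pop_with_depth`, `roll_back`, `RollbackEffect`, `size`) and the tuple-style `Point(slot, hash)`; the slots and hashes are invented for illustration.

```rust
// Sketch only (not part of the PR): exercises the same RollbackBuffer calls the
// observer above relies on. Assumes pallas 0.5.0-alpha.2 and that Point is the
// (slot, hash) tuple struct used throughout this diff.
use pallas::network::miniprotocols::{chainsync, Point};

fn main() {
    let mut buffer = chainsync::RollbackBuffer::default();
    let min_depth = 2; // require 2 confirmations before a block leaves the buffer

    // roll forward through three hypothetical points
    for slot in [100u64, 101, 102] {
        buffer.roll_forward(Point(slot, vec![slot as u8]));
    }

    // only points buried at least `min_depth` deep should come back as "ready";
    // with depth 2, that would be the slot-100 point only
    let ready = buffer.pop_with_depth(min_depth);
    println!("{} point(s) ready, {} still buffered", ready.len(), buffer.size());

    // a rollback to a point still inside the buffer is absorbed silently; anything
    // older than the buffer must be surfaced as a RollBack event downstream
    match buffer.roll_back(&Point(101, vec![101])) {
        chainsync::RollbackEffect::Handled => println!("rollback handled within buffer"),
        chainsync::RollbackEffect::OutOfScope => println!("rollback escapes the buffer"),
    }
}
```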
13 changes: 12 additions & 1 deletion src/sources/n2c/setup.rs
@@ -40,6 +40,16 @@ pub struct Config {

#[serde(default)]
pub mapper: MapperConfig,

/// Min block depth (# confirmations) required
///
/// The min depth a block requires to be considered safe to send down the
/// pipeline. This value is used to configure a rollback buffer used
/// internally by the stage. A high value (eg: ~6) will reduce the
/// probability of seeing rollback events. The trade-off is that the stage
/// will need some time to fill up the buffer before sending the 1st event.
#[serde(default)]
pub min_depth: usize,
}

fn do_handshake(channel: &mut Channel, magic: u64) -> Result<(), Error> {
@@ -94,8 +104,9 @@ impl SourceProvider for WithUtils<Config> {

info!("starting from chain point: {:?}", &since);

let min_depth = self.inner.min_depth;
let handle = std::thread::spawn(move || {
observe_forever(cs_channel, writer, since).expect("chainsync loop failed");
observe_forever(cs_channel, writer, since, min_depth).expect("chainsync loop failed");
});

Ok((handle, output_rx))
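
Because the new field is annotated with `#[serde(default)]`, configs written before this PR keep working and simply get `min_depth = 0` (every block is emitted immediately, as before). A minimal sketch of that defaulting behaviour, using `serde_json` as a stand-in for oura's real configuration format and a trimmed, hypothetical `SourceConfig` struct:

```rust
// Sketch only: shows how #[serde(default)] makes `min_depth` fall back to 0.
// Requires serde (with the `derive` feature) and serde_json; the struct below
// is a hypothetical stand-in, not oura's real Config.
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct SourceConfig {
    magic: Option<u64>,

    #[serde(default)]
    min_depth: usize,
}

fn main() -> Result<(), serde_json::Error> {
    // field omitted: min_depth defaults to 0 (no confirmation delay)
    let legacy: SourceConfig = serde_json::from_str(r#"{ "magic": 764824073 }"#)?;
    assert_eq!(legacy.min_depth, 0);

    // field present: wait for ~6 confirmations before emitting events
    let buffered: SourceConfig =
        serde_json::from_str(r#"{ "magic": 764824073, "min_depth": 6 }"#)?;
    assert_eq!(buffered.min_depth, 6);

    println!("{:?} / {:?}", legacy, buffered);
    Ok(())
}
```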