Skip to content

Commit

Permalink
feat(node): Allow reading from arbitrary initial chain point (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Dec 25, 2021
1 parent 2e839f2 commit 76fcc5c
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 14 deletions.
8 changes: 8 additions & 0 deletions src/bin/oura/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ fn main() {
.possible_values(&["tcp", "unix"]),
)
.arg(Arg::with_name("magic").long("magic").takes_value(true))
.arg(
Arg::with_name("since")
.long("since")
.takes_value(true)
.help(
"point in the chain to start reading from, expects format `slot,hex-hash`",
),
)
.arg(
Arg::with_name("mode")
.long("mode")
Expand Down
9 changes: 8 additions & 1 deletion src/bin/oura/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{str::FromStr, sync::mpsc::Sender};
use clap::{value_t, ArgMatches};
use oura::{
framework::*,
sources::common::{AddressArg, BearerKind, MagicArg},
sources::common::{AddressArg, BearerKind, MagicArg, PointArg},
};

use serde_derive::Deserialize;
Expand Down Expand Up @@ -56,6 +56,11 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
false => None,
};

let since = match args.is_present("since") {
true => Some(value_t!(args, "since", PointArg)?),
false => None,
};

let mode = match (args.is_present("mode"), &bearer) {
(true, _) => value_t!(args, "mode", PeerMode).expect("invalid value for 'mode' arg"),
(false, BearerKind::Tcp) => PeerMode::AsNode,
Expand All @@ -67,11 +72,13 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
address: AddressArg(bearer, socket),
magic,
well_known: None,
since,
}),
PeerMode::AsClient => WatchSource::N2C(N2CConfig {
address: AddressArg(bearer, socket),
magic,
well_known: None,
since,
}),
};

Expand Down
38 changes: 36 additions & 2 deletions src/sources/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use pallas::ouroboros::network::{
multiplexer::Channel,
};
use serde::{de::Visitor, Deserializer};
use serde_derive::Deserialize;
use serde_derive::{Deserialize, Serialize};

use crate::framework::ChainWellKnownInfo;
use crate::framework::{ChainWellKnownInfo, Error};

#[derive(Debug, Deserialize)]
pub enum BearerKind {
Expand All @@ -34,6 +34,40 @@ impl FromStr for BearerKind {
}
}

/// A serialization-friendly chain Point struct using a hex-encoded hash
#[derive(Debug, Serialize, Deserialize)]
pub struct PointArg(u64, String);

impl TryInto<Point> for &PointArg {
type Error = Error;

fn try_into(self) -> Result<Point, Self::Error> {
let hash = hex::decode(&self.1)?;
Ok(Point(self.0, hash))
}
}

impl From<&Point> for PointArg {
fn from(other: &Point) -> Self {
PointArg(other.0, hex::encode(&other.1))
}
}

impl FromStr for PointArg {
type Err = Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.contains(",") {
let mut parts: Vec<_> = s.split(",").collect();
let slot = parts.remove(0).parse()?;
let hash = parts.remove(0).to_owned();
Ok(PointArg(slot, hash))
} else {
Err("Can't parse chain point value, expecting `slot,hex-hash` format".into())
}
}
}

#[derive(Debug, Deserialize)]
pub struct MagicArg(u64);

Expand Down
15 changes: 10 additions & 5 deletions src/sources/n2c/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use log::info;

use pallas::ouroboros::network::{
handshake::{n2c, MAINNET_MAGIC},
machines::run_agent,
machines::{primitives::Point, run_agent},
multiplexer::{Channel, Multiplexer},
};

use serde_derive::Deserialize;

use crate::{
framework::{BootstrapResult, ChainWellKnownInfo, Error, Event, SourceConfig},
sources::common::{find_end_of_chain, AddressArg, BearerKind, MagicArg},
sources::common::{find_end_of_chain, AddressArg, BearerKind, MagicArg, PointArg},
};

use super::observe_forever;
Expand All @@ -26,6 +26,8 @@ pub struct Config {
#[serde(deserialize_with = "crate::sources::common::deserialize_magic_arg")]
pub magic: Option<MagicArg>,

pub since: Option<PointArg>,

pub well_known: Option<ChainWellKnownInfo>,
}

Expand Down Expand Up @@ -76,12 +78,15 @@ impl SourceConfig for Config {

let mut cs_channel = muxer.use_channel(5);

let node_tip = find_end_of_chain(&mut cs_channel, &well_known)?;
let since: Point = match &self.since {
Some(arg) => arg.try_into()?,
None => find_end_of_chain(&mut cs_channel, &well_known)?,
};

info!("node tip: {:?}", &node_tip);
info!("starting from chain point: {:?}", &since);

let handle = std::thread::spawn(move || {
observe_forever(cs_channel, well_known, node_tip, output)
observe_forever(cs_channel, well_known, since, output)
.expect("chainsync loop failed");
});

Expand Down
23 changes: 17 additions & 6 deletions src/sources/n2n/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use log::info;

use pallas::ouroboros::network::{
handshake::{n2n, MAINNET_MAGIC},
machines::run_agent,
machines::{primitives::Point, run_agent},
multiplexer::{Channel, Multiplexer},
};

Expand All @@ -15,7 +15,7 @@ use serde_derive::Deserialize;
use crate::{
framework::{BootstrapResult, ChainWellKnownInfo, Error, Event, SourceConfig},
sources::{
common::{find_end_of_chain, AddressArg, BearerKind, MagicArg},
common::{find_end_of_chain, AddressArg, BearerKind, MagicArg, PointArg},
n2n::{fetch_blocks_forever, observe_headers_forever},
},
};
Expand All @@ -27,6 +27,8 @@ pub struct Config {
#[serde(deserialize_with = "crate::sources::common::deserialize_magic_arg")]
pub magic: Option<MagicArg>,

pub since: Option<PointArg>,

pub well_known: Option<ChainWellKnownInfo>,
}

Expand Down Expand Up @@ -77,17 +79,26 @@ impl SourceConfig for Config {

let mut cs_channel = muxer.use_channel(2);

let node_tip = find_end_of_chain(&mut cs_channel, &well_known)?;
let since: Point = match &self.since {
Some(arg) => arg.try_into()?,
None => find_end_of_chain(&mut cs_channel, &well_known)?,
};

info!("node tip: {:?}", &node_tip);
info!("starting from chain point: {:?}", &since);

let (headers_tx, headers_rx) = std::sync::mpsc::channel();

let cs_events = output.clone();
let cs_chain_info = well_known.clone();
let cs_handle = std::thread::spawn(move || {
observe_headers_forever(cs_channel, cs_chain_info, node_tip, cs_events, headers_tx)
.expect("chainsync loop failed");
observe_headers_forever(
cs_channel,
cs_chain_info,
since,
cs_events,
headers_tx,
)
.expect("chainsync loop failed");
});

let bf_channel = muxer.use_channel(3);
Expand Down

0 comments on commit 76fcc5c

Please sign in to comment.