Skip to content

Commit

Permalink
feat: Ethereum RPC provider configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
stbrody committed Oct 1, 2024
1 parent 3d95a09 commit 524ae10
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 59 deletions.
1 change: 1 addition & 0 deletions event-svc/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ mod store;
mod validator;

pub use service::{BlockStore, DeliverableRequirement, EventService};
pub use validator::EthRpcProvider;
7 changes: 4 additions & 3 deletions event-svc/src/event/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use itertools::Itertools;
use recon::ReconItem;
use tracing::{trace, warn};

use crate::event::validator::EthRpcProvider;
use crate::store::{CeramicOneEvent, EventInsertable, EventRowDelivered};
use crate::{Error, Result};

Expand Down Expand Up @@ -87,13 +88,13 @@ impl EventService {
pool: SqlitePool,
process_undelivered_events: bool,
validate_events: bool,
ethereum_rpc_urls: Option<&[String]>,
ethereum_rpc_providers: Vec<EthRpcProvider>,
) -> Result<Self> {
CeramicOneEvent::init_delivered_order(&pool).await?;

let delivery_task = OrderingTask::run(pool.clone(), PENDING_EVENTS_CHANNEL_DEPTH).await;

let event_validator = EventValidator::try_new(pool.clone(), ethereum_rpc_urls).await?;
let event_validator = EventValidator::try_new(pool.clone(), ethereum_rpc_providers).await?;

let svc = Self {
pool,
Expand All @@ -113,7 +114,7 @@ impl EventService {
/// in the next pass.. but it's basically same same but different.
#[allow(dead_code)]
pub(crate) async fn new_with_event_validation(pool: SqlitePool) -> Result<Self> {
Self::try_new(pool, false, true, None).await
Self::try_new(pool, false, true, vec![]).await
}

/// Currently, we track events when the [`ValidationRequirement`] allows. Right now, this applies to
Expand Down
70 changes: 29 additions & 41 deletions event-svc/src/event/validator/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use ipld_core::ipld::Ipld;
use recon::ReconItem;
use tokio::try_join;

use crate::event::validator::EthRpcProvider;
use crate::{
event::{
service::{ValidationError, ValidationRequirement},
Expand All @@ -16,7 +17,7 @@ use crate::{
},
},
store::{EventInsertable, SqlitePool},
Error, Result,
Result,
};

#[derive(Debug)]
Expand Down Expand Up @@ -121,21 +122,16 @@ pub struct EventValidator {
/// It contains the ethereum RPC providers and lives for the live of the [`EventValidator`].
/// The [`SignedEventValidator`] is currently constructed on a per validation request basis
/// as it caches and drops events per batch.
time_event_verifier: Option<TimeEventValidator>,
time_event_verifier: TimeEventValidator,
}

impl EventValidator {
/// Create a new event validator
pub async fn try_new(pool: SqlitePool, ethereum_rpc_urls: Option<&[String]>) -> Result<Self> {
let time_event_verifier = if let Some(eth_urls) = ethereum_rpc_urls {
Some(
TimeEventValidator::try_new(eth_urls)
.await
.map_err(Error::new_fatal)?,
)
} else {
None
};
pub async fn try_new(
pool: SqlitePool,
ethereum_rpc_providers: Vec<EthRpcProvider>,
) -> Result<Self> {
let time_event_verifier = TimeEventValidator::new_with_providers(ethereum_rpc_providers);

Ok(Self {
pool,
Expand Down Expand Up @@ -205,35 +201,27 @@ impl EventValidator {
}

async fn validate_time_events(&self, events: TimeValidationBatch) -> Result<ValidatedEvents> {
if let Some(verifier) = &self.time_event_verifier {
let mut validated_events = ValidatedEvents::new_with_expected_valid(events.0.len());
for time_event in events.0 {
// TODO: better transient error handling from RPC client
match verifier
.validate_chain_inclusion(&self.pool, time_event.as_time())
.await
{
Ok(_t) => {
// TODO(AES-345): Someday, we will use `t.as_unix_ts()` and care about the actual timestamp, but for now we just consider it valid
validated_events.valid.push(time_event.into());
}
Err(err) => {
validated_events.invalid.push(Self::convert_inclusion_error(
err,
&time_event.as_inner().key,
));
}
let mut validated_events = ValidatedEvents::new_with_expected_valid(events.0.len());
for time_event in events.0 {
// TODO: better transient error handling from RPC client
match self
.time_event_verifier
.validate_chain_inclusion(&self.pool, time_event.as_time())
.await
{
Ok(_t) => {
// TODO(AES-345): Someday, we will use `t.as_unix_ts()` and care about the actual timestamp, but for now we just consider it valid
validated_events.valid.push(time_event.into());
}
Err(err) => {
validated_events.invalid.push(Self::convert_inclusion_error(
err,
&time_event.as_inner().key,
));
}
}
Ok(validated_events)
} else {
// we don't verify inclusion proofs if we don't have any RPC providers
Ok(ValidatedEvents {
valid: events.0.into_iter().map(ValidatedEvent::from).collect(),
unvalidated: Vec::new(),
invalid: Vec::new(),
})
}
Ok(validated_events)
}

/// Transforms the [`ChainInclusionError`] into a [`ValidationError`] with an appropriate message
Expand Down Expand Up @@ -302,7 +290,7 @@ mod test {
let pool = SqlitePool::connect_in_memory().await.unwrap();
let events = get_validation_events().await;

let validated = EventValidator::try_new(pool, None)
let validated = EventValidator::try_new(pool, vec![])
.await
.unwrap()
.validate_events(Some(&ValidationRequirement::new_recon()), events)
Expand All @@ -326,7 +314,7 @@ mod test {
let pool = SqlitePool::connect_in_memory().await.unwrap();
let events = get_validation_events().await;

let validated = EventValidator::try_new(pool, None)
let validated = EventValidator::try_new(pool, vec![])
.await
.unwrap()
.validate_events(Some(&ValidationRequirement::new_local()), events)
Expand All @@ -350,7 +338,7 @@ mod test {
let pool = SqlitePool::connect_in_memory().await.unwrap();
let events = get_validation_events().await;

let validated = EventValidator::try_new(pool, None)
let validated = EventValidator::try_new(pool, vec![])
.await
.unwrap()
.validate_events(None, events)
Expand Down
2 changes: 2 additions & 0 deletions event-svc/src/event/validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ mod signed;

mod time;

pub use time::EthRpcProvider;

pub use event::{EventValidator, UnvalidatedEvent, ValidatedEvent, ValidatedEvents};
4 changes: 3 additions & 1 deletion event-svc/src/event/validator/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl Timestamp {
}
}

/// Provider for a remote Ethereum RPC endpoint.
pub type EthRpcProvider = Arc<dyn EthRpc + Send + Sync>;

pub struct TimeEventValidator {
Expand All @@ -80,6 +81,7 @@ impl std::fmt::Debug for TimeEventValidator {

impl TimeEventValidator {
/// Try to construct the validator by looking building the etherum rpc providers from the given URLsƒsw
#[allow(dead_code)]
pub async fn try_new(rpc_urls: &[String]) -> Result<Self> {
let mut chain_providers = HashMap::with_capacity(rpc_urls.len());
for url in rpc_urls {
Expand All @@ -105,7 +107,6 @@ impl TimeEventValidator {

/// Create from known providers (e.g. inject mocks)
/// Currently used in tests, may switch to this from service if we want to share RPC with anchoring.
#[allow(dead_code)]
pub fn new_with_providers(providers: Vec<EthRpcProvider>) -> Self {
Self {
chain_providers: HashMap::from_iter(
Expand Down Expand Up @@ -374,6 +375,7 @@ mod test {
#[async_trait::async_trait]
impl EthRpc for EthRpcProviderTest {
fn chain_id(&self) -> &caip2::ChainId;
fn url(&self) -> String;
async fn get_block_timestamp(&self, tx_hash: &str) -> Result<Option<ceramic_validation::eth_rpc::ChainTransaction>>;
}
}
Expand Down
2 changes: 2 additions & 0 deletions event-svc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ pub mod store;
#[cfg(test)]
mod tests;

pub use ceramic_validation::eth_rpc;
pub use error::Error;
pub use event::EthRpcProvider;
pub use event::{BlockStore, EventService};

pub(crate) type Result<T> = std::result::Result<T, Error>;
4 changes: 2 additions & 2 deletions event-svc/src/tests/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ macro_rules! test_with_sqlite {
async fn [<$test_name _sqlite>]() {

let conn = $crate::store::SqlitePool::connect_in_memory().await.unwrap();
let store = $crate::EventService::try_new(conn, true, true, None).await.unwrap();
let store = $crate::EventService::try_new(conn, true, true, vec![]).await.unwrap();
$(
for stmt in $sql_stmts {
store.pool.run_statement(stmt).await.unwrap();
Expand Down Expand Up @@ -607,7 +607,7 @@ where
#[tokio::test]
async fn test_conclusion_events_since() -> Result<(), Box<dyn std::error::Error>> {
let pool = SqlitePool::connect_in_memory().await?;
let service = EventService::try_new(pool, false, false, None).await?;
let service = EventService::try_new(pool, false, false, vec![]).await?;
let test_events = generate_chained_events().await;

for event in test_events {
Expand Down
81 changes: 73 additions & 8 deletions one/src/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use std::{path::PathBuf, time::Duration};

use anyhow::{anyhow, Context, Result};
use crate::{
default_directory, handle_signals, http, http_metrics, metrics, network::Ipfs, DBOpts, Info,
LogOpts, Network,
};
use anyhow::{anyhow, bail, Context, Result};
use ceramic_anchor_remote::RemoteCas;
use ceramic_anchor_service::AnchorService;
use ceramic_core::NodeId;
use ceramic_event_svc::EventService;
use ceramic_event_svc::eth_rpc::HttpEthRpc;
use ceramic_event_svc::{EthRpcProvider, EventService};
use ceramic_interest_svc::InterestService;
use ceramic_kubo_rpc::Multiaddr;
use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle};
Expand All @@ -18,11 +23,6 @@ use swagger::{auth::MakeAllowAllAuthenticator, EmptyContext};
use tokio::sync::broadcast;
use tracing::{debug, info, warn};

use crate::{
default_directory, handle_signals, http, http_metrics, metrics, network::Ipfs, DBOpts, Info,
LogOpts, Network,
};

#[derive(Args, Debug)]
pub struct DaemonOpts {
#[command(flatten)]
Expand Down Expand Up @@ -231,6 +231,55 @@ pub struct DaemonOpts {
requires = "experimental_features"
)]
anchor_poll_retry_count: u64,

/// Ethereum RPC URLs used for time events validation. Required when connecting to mainnet and uses fallback URLs if not specified for other networks.
#[arg(
long,
use_value_delimiter = true,
value_delimiter = ',',
env = "CERAMIC_ONE_ETHEREUM_RPC_URLS"
)]
ethereum_rpc_urls: Vec<String>,
}

async fn get_rpc_providers(
ethereum_rpc_urls: Vec<String>,
network: &Network,
) -> Result<Vec<EthRpcProvider>> {
let ethereum_rpc_urls = if ethereum_rpc_urls.is_empty() {
network.default_rpc_urls()?
} else {
ethereum_rpc_urls
};

let mut providers = Vec::new();
for url in ethereum_rpc_urls {
match HttpEthRpc::try_new(&url).await {
Ok(provider) => {
// use the first valid rpc client we find rather than replace one
// could support an array of clients for a chain if desired
let provider: EthRpcProvider = Arc::new(provider);
let provider_chain = provider.chain_id();
if network
.supported_chain_ids()
.map_or(true, |ids| ids.contains(provider_chain))
{
providers.push(provider);
} else {
warn!("Eth RPC provider {} uses chainid {} which isn't supported by Ceramic network {:?}", url, provider_chain,network);
}
}
Err(err) => {
warn!("failed to create RCP client with url: '{url}': {err}");
}
}
}

if providers.is_empty() {
bail!("No usable ethereum RPC configured");
}

Ok(providers)
}

// Start the daemon process
Expand Down Expand Up @@ -274,10 +323,26 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
// Construct sqlite_pool
let sqlite_pool = opts.db_opts.get_sqlite_pool().await?;

let rpc_providers = get_rpc_providers(opts.ethereum_rpc_urls, &opts.network).await?;

info!(
"Using ethereum rpc providers: {:?}",
rpc_providers
.iter()
.map(|provider| provider.url())
.collect::<Vec<_>>()
);

// Construct services from pool
let interest_svc = Arc::new(InterestService::new(sqlite_pool.clone()));
let event_svc = Arc::new(
EventService::try_new(sqlite_pool.clone(), true, opts.event_validation, None).await?,
EventService::try_new(
sqlite_pool.clone(),
true,
opts.event_validation,
rpc_providers,
)
.await?,
);

let network = opts.network.to_network(&opts.local_network_id)?;
Expand Down
Loading

0 comments on commit 524ae10

Please sign in to comment.