Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Ethereum RPC provider configuration #547

Merged
merged 3 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions event-svc/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ mod store;
mod validator;

pub use service::{BlockStore, DeliverableRequirement, EventService};
pub use validator::EthRpcProvider;
7 changes: 4 additions & 3 deletions event-svc/src/event/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use itertools::Itertools;
use recon::ReconItem;
use tracing::{trace, warn};

use crate::event::validator::EthRpcProvider;
use crate::store::{CeramicOneEvent, EventInsertable, EventRowDelivered};
use crate::{Error, Result};

Expand Down Expand Up @@ -87,13 +88,13 @@ impl EventService {
pool: SqlitePool,
process_undelivered_events: bool,
validate_events: bool,
ethereum_rpc_urls: Option<&[String]>,
ethereum_rpc_providers: Vec<EthRpcProvider>,
) -> Result<Self> {
CeramicOneEvent::init_delivered_order(&pool).await?;

let delivery_task = OrderingTask::run(pool.clone(), PENDING_EVENTS_CHANNEL_DEPTH).await;

let event_validator = EventValidator::try_new(pool.clone(), ethereum_rpc_urls).await?;
let event_validator = EventValidator::try_new(pool.clone(), ethereum_rpc_providers).await?;

let svc = Self {
pool,
Expand All @@ -113,7 +114,7 @@ impl EventService {
/// in the next pass.. but it's basically same same but different.
#[allow(dead_code)]
pub(crate) async fn new_with_event_validation(pool: SqlitePool) -> Result<Self> {
Self::try_new(pool, false, true, None).await
Self::try_new(pool, false, true, vec![]).await
}

/// Currently, we track events when the [`ValidationRequirement`] allows. Right now, this applies to
Expand Down
70 changes: 29 additions & 41 deletions event-svc/src/event/validator/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use ipld_core::ipld::Ipld;
use recon::ReconItem;
use tokio::try_join;

use crate::event::validator::EthRpcProvider;
use crate::{
event::{
service::{ValidationError, ValidationRequirement},
Expand All @@ -16,7 +17,7 @@ use crate::{
},
},
store::{EventInsertable, SqlitePool},
Error, Result,
Result,
};

#[derive(Debug)]
Expand Down Expand Up @@ -121,21 +122,16 @@ pub struct EventValidator {
/// It contains the ethereum RPC providers and lives for the live of the [`EventValidator`].
/// The [`SignedEventValidator`] is currently constructed on a per validation request basis
/// as it caches and drops events per batch.
time_event_verifier: Option<TimeEventValidator>,
time_event_verifier: TimeEventValidator,
}

impl EventValidator {
/// Create a new event validator
pub async fn try_new(pool: SqlitePool, ethereum_rpc_urls: Option<&[String]>) -> Result<Self> {
let time_event_verifier = if let Some(eth_urls) = ethereum_rpc_urls {
Some(
TimeEventValidator::try_new(eth_urls)
.await
.map_err(Error::new_fatal)?,
)
} else {
None
};
pub async fn try_new(
pool: SqlitePool,
ethereum_rpc_providers: Vec<EthRpcProvider>,
) -> Result<Self> {
let time_event_verifier = TimeEventValidator::new_with_providers(ethereum_rpc_providers);

Ok(Self {
pool,
Expand Down Expand Up @@ -205,35 +201,27 @@ impl EventValidator {
}

async fn validate_time_events(&self, events: TimeValidationBatch) -> Result<ValidatedEvents> {
if let Some(verifier) = &self.time_event_verifier {
let mut validated_events = ValidatedEvents::new_with_expected_valid(events.0.len());
for time_event in events.0 {
// TODO: better transient error handling from RPC client
match verifier
.validate_chain_inclusion(&self.pool, time_event.as_time())
.await
{
Ok(_t) => {
// TODO(AES-345): Someday, we will use `t.as_unix_ts()` and care about the actual timestamp, but for now we just consider it valid
validated_events.valid.push(time_event.into());
}
Err(err) => {
validated_events.invalid.push(Self::convert_inclusion_error(
err,
&time_event.as_inner().key,
));
}
let mut validated_events = ValidatedEvents::new_with_expected_valid(events.0.len());
for time_event in events.0 {
// TODO: better transient error handling from RPC client
match self
.time_event_verifier
.validate_chain_inclusion(&self.pool, time_event.as_time())
.await
{
Ok(_t) => {
// TODO(AES-345): Someday, we will use `t.as_unix_ts()` and care about the actual timestamp, but for now we just consider it valid
validated_events.valid.push(time_event.into());
}
Err(err) => {
validated_events.invalid.push(Self::convert_inclusion_error(
err,
&time_event.as_inner().key,
));
}
}
Ok(validated_events)
} else {
// we don't verify inclusion proofs if we don't have any RPC providers
Ok(ValidatedEvents {
valid: events.0.into_iter().map(ValidatedEvent::from).collect(),
unvalidated: Vec::new(),
invalid: Vec::new(),
})
}
Ok(validated_events)
}

/// Transforms the [`ChainInclusionError`] into a [`ValidationError`] with an appropriate message
Expand Down Expand Up @@ -302,7 +290,7 @@ mod test {
let pool = SqlitePool::connect_in_memory().await.unwrap();
let events = get_validation_events().await;

let validated = EventValidator::try_new(pool, None)
let validated = EventValidator::try_new(pool, vec![])
.await
.unwrap()
.validate_events(Some(&ValidationRequirement::new_recon()), events)
Expand All @@ -326,7 +314,7 @@ mod test {
let pool = SqlitePool::connect_in_memory().await.unwrap();
let events = get_validation_events().await;

let validated = EventValidator::try_new(pool, None)
let validated = EventValidator::try_new(pool, vec![])
.await
.unwrap()
.validate_events(Some(&ValidationRequirement::new_local()), events)
Expand All @@ -350,7 +338,7 @@ mod test {
let pool = SqlitePool::connect_in_memory().await.unwrap();
let events = get_validation_events().await;

let validated = EventValidator::try_new(pool, None)
let validated = EventValidator::try_new(pool, vec![])
.await
.unwrap()
.validate_events(None, events)
Expand Down
2 changes: 2 additions & 0 deletions event-svc/src/event/validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ mod signed;

mod time;

pub use time::EthRpcProvider;

pub use event::{EventValidator, UnvalidatedEvent, ValidatedEvent, ValidatedEvents};
4 changes: 3 additions & 1 deletion event-svc/src/event/validator/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl Timestamp {
}
}

/// Provider for a remote Ethereum RPC endpoint.
pub type EthRpcProvider = Arc<dyn EthRpc + Send + Sync>;

pub struct TimeEventValidator {
Expand All @@ -80,6 +81,7 @@ impl std::fmt::Debug for TimeEventValidator {

impl TimeEventValidator {
/// Try to construct the validator by looking building the etherum rpc providers from the given URLsƒsw
#[allow(dead_code)]
pub async fn try_new(rpc_urls: &[String]) -> Result<Self> {
let mut chain_providers = HashMap::with_capacity(rpc_urls.len());
for url in rpc_urls {
Expand All @@ -105,7 +107,6 @@ impl TimeEventValidator {

/// Create from known providers (e.g. inject mocks)
/// Currently used in tests, may switch to this from service if we want to share RPC with anchoring.
#[allow(dead_code)]
pub fn new_with_providers(providers: Vec<EthRpcProvider>) -> Self {
Self {
chain_providers: HashMap::from_iter(
Expand Down Expand Up @@ -374,6 +375,7 @@ mod test {
#[async_trait::async_trait]
impl EthRpc for EthRpcProviderTest {
fn chain_id(&self) -> &caip2::ChainId;
fn url(&self) -> String;
async fn get_block_timestamp(&self, tx_hash: &str) -> Result<Option<ceramic_validation::eth_rpc::ChainTransaction>>;
}
}
Expand Down
2 changes: 2 additions & 0 deletions event-svc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ pub mod store;
#[cfg(test)]
mod tests;

pub use ceramic_validation::eth_rpc;
pub use error::Error;
pub use event::EthRpcProvider;
pub use event::{BlockStore, EventService};

pub(crate) type Result<T> = std::result::Result<T, Error>;
4 changes: 2 additions & 2 deletions event-svc/src/tests/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ macro_rules! test_with_sqlite {
async fn [<$test_name _sqlite>]() {

let conn = $crate::store::SqlitePool::connect_in_memory().await.unwrap();
let store = $crate::EventService::try_new(conn, true, true, None).await.unwrap();
let store = $crate::EventService::try_new(conn, true, true, vec![]).await.unwrap();
$(
for stmt in $sql_stmts {
store.pool.run_statement(stmt).await.unwrap();
Expand Down Expand Up @@ -607,7 +607,7 @@ where
#[tokio::test]
async fn test_conclusion_events_since() -> Result<(), Box<dyn std::error::Error>> {
let pool = SqlitePool::connect_in_memory().await?;
let service = EventService::try_new(pool, false, false, None).await?;
let service = EventService::try_new(pool, false, false, vec![]).await?;
let test_events = generate_chained_events().await;

for event in test_events {
Expand Down
79 changes: 71 additions & 8 deletions one/src/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use std::{path::PathBuf, time::Duration};

use anyhow::{anyhow, Context, Result};
use crate::{
default_directory, handle_signals, http, http_metrics, metrics, network::Ipfs, DBOpts, Info,
LogOpts, Network,
};
use anyhow::{anyhow, bail, Context, Result};
use ceramic_anchor_remote::RemoteCas;
use ceramic_anchor_service::AnchorService;
use ceramic_core::NodeId;
use ceramic_event_svc::EventService;
use ceramic_event_svc::eth_rpc::HttpEthRpc;
use ceramic_event_svc::{EthRpcProvider, EventService};
use ceramic_interest_svc::InterestService;
use ceramic_kubo_rpc::Multiaddr;
use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle};
Expand All @@ -18,11 +23,6 @@ use swagger::{auth::MakeAllowAllAuthenticator, EmptyContext};
use tokio::sync::broadcast;
use tracing::{debug, info, warn};

use crate::{
default_directory, handle_signals, http, http_metrics, metrics, network::Ipfs, DBOpts, Info,
LogOpts, Network,
};

#[derive(Args, Debug)]
pub struct DaemonOpts {
#[command(flatten)]
Expand Down Expand Up @@ -231,6 +231,53 @@ pub struct DaemonOpts {
requires = "experimental_features"
)]
anchor_poll_retry_count: u64,

/// Ethereum RPC URLs used for time events validation. Required when connecting to mainnet and uses fallback URLs if not specified for other networks.
#[arg(
long,
use_value_delimiter = true,
value_delimiter = ',',
env = "CERAMIC_ONE_ETHEREUM_RPC_URLS"
)]
ethereum_rpc_urls: Vec<String>,
}

async fn get_rpc_providers(
ethereum_rpc_urls: Vec<String>,
network: &Network,
) -> Result<Vec<EthRpcProvider>> {
let ethereum_rpc_urls = if ethereum_rpc_urls.is_empty() {
network.default_rpc_urls()?
} else {
ethereum_rpc_urls
};

let mut providers = Vec::new();
for url in ethereum_rpc_urls {
match HttpEthRpc::try_new(&url).await {
Ok(provider) => {
let provider: EthRpcProvider = Arc::new(provider);
let provider_chain = provider.chain_id();
if network
.supported_chain_ids()
.map_or(true, |ids| ids.contains(provider_chain))
{
providers.push(provider);
} else {
warn!("Eth RPC provider {} uses chainid {} which isn't supported by Ceramic network {:?}", url, provider_chain,network);
}
}
Err(err) => {
warn!("failed to create RCP client with url: '{url}': {err}");
dav1do marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

if providers.is_empty() {
bail!("No usable ethereum RPC configured");
}

Ok(providers)
}

// Start the daemon process
Expand Down Expand Up @@ -274,10 +321,26 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
// Construct sqlite_pool
let sqlite_pool = opts.db_opts.get_sqlite_pool().await?;

let rpc_providers = get_rpc_providers(opts.ethereum_rpc_urls, &opts.network).await?;

info!(
"Using ethereum rpc providers: {:?}",
rpc_providers
.iter()
.map(|provider| provider.url())
.collect::<Vec<_>>()
);

// Construct services from pool
let interest_svc = Arc::new(InterestService::new(sqlite_pool.clone()));
let event_svc = Arc::new(
EventService::try_new(sqlite_pool.clone(), true, opts.event_validation, None).await?,
EventService::try_new(
sqlite_pool.clone(),
true,
opts.event_validation,
rpc_providers,
)
.await?,
);

let network = opts.network.to_network(&opts.local_network_id)?;
Expand Down
Loading
Loading