Skip to content

Commit

Permalink
feat: groundwork for event validation and some clean up (#505)
Browse files Browse the repository at this point in the history
* chore: expose validation crate as workspace dependency

* chore: put event in arc to allow cloning

we don't want to mutate events and this makes caching/etc easier

* chore: fix benchmark from recent changes in main

* feat: put event validation behind feature flag

Not sure how to best represent this and a bool is not fun but it's intended to be temporary until we feel confident that event validation is working
  • Loading branch information
dav1do authored Aug 30, 2024
1 parent cde8736 commit 9d1006c
Show file tree
Hide file tree
Showing 12 changed files with 61 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ ceramic-one = { path = "./one" }
ceramic-p2p = { path = "./p2p" }
ceramic-service = { path = "./service" }
ceramic-store = { path = "./store" }
ceramic-validation = { path = "./validation" }
chrono = "0.4.31"
cid = { version = "0.11", features = ["serde-codec"] }
clap = { version = "4", features = ["derive", "env", "string"] }
Expand Down
3 changes: 3 additions & 0 deletions one/src/feature_flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::{anyhow, Result};
pub(crate) enum ExperimentalFeatureFlags {
None,
Authentication,
EventValidation,
}

impl std::str::FromStr for ExperimentalFeatureFlags {
Expand All @@ -13,6 +14,7 @@ impl std::str::FromStr for ExperimentalFeatureFlags {
match value.to_ascii_lowercase().as_str() {
"none" => Ok(ExperimentalFeatureFlags::None),
"authentication" => Ok(ExperimentalFeatureFlags::Authentication),
"event-validation" => Ok(ExperimentalFeatureFlags::EventValidation),
_ => Err(anyhow!("invalid value")),
}
}
Expand All @@ -23,6 +25,7 @@ impl std::fmt::Display for ExperimentalFeatureFlags {
match self {
ExperimentalFeatureFlags::None => write!(f, "none"),
ExperimentalFeatureFlags::Authentication => write!(f, "authentication"),
ExperimentalFeatureFlags::EventValidation => write!(f, "event-validation"),
}
}
}
Expand Down
25 changes: 20 additions & 5 deletions one/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,11 @@ type ModelInterest = ReconInterestProvider<Sha256a>;

impl DBOpts {
/// This function will create the database directory if it does not exist.
async fn get_database(&self, process_undelivered: bool) -> Result<Databases> {
async fn get_database(
&self,
process_undelivered: bool,
validate_events: bool,
) -> Result<Databases> {
match tokio::fs::create_dir_all(&self.store_dir).await {
Ok(_) => {}
Err(err) => match err.kind() {
Expand All @@ -318,13 +322,17 @@ impl DBOpts {
},
}
let sql_db_path = self.store_dir.join("db.sqlite3").display().to_string();
Self::build_sqlite_dbs(&sql_db_path, process_undelivered).await
Self::build_sqlite_dbs(&sql_db_path, process_undelivered, validate_events).await
}

async fn build_sqlite_dbs(path: &str, process_undelivered: bool) -> Result<Databases> {
async fn build_sqlite_dbs(
path: &str,
process_undelivered: bool,
validate_events: bool,
) -> Result<Databases> {
let sql_pool =
ceramic_store::SqlitePool::connect(path, ceramic_store::Migrations::Apply).await?;
let ceramic_service = CeramicService::try_new(sql_pool).await?;
let ceramic_service = CeramicService::try_new(sql_pool, validate_events).await?;
let interest_store = ceramic_service.interest_service().to_owned();
let event_store = ceramic_service.event_service().to_owned();
if process_undelivered {
Expand Down Expand Up @@ -382,7 +390,14 @@ impl Daemon {
exe_hash = info.exe_hash,
);
debug!(?opts, "using daemon options");
let db = opts.db_opts.get_database(true).await?;
let db = opts
.db_opts
.get_database(
true,
opts.experimental_feature_flags
.contains(&ExperimentalFeatureFlags::EventValidation),
)
.await?;

// we should be able to consolidate the Store traits now that they all rely on &self, but for now we use
// static dispatch and require compile-time type information, so we pass all the types we need in, even
Expand Down
3 changes: 2 additions & 1 deletion one/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ pub async fn migrate(cmd: EventsCommand) -> Result<()> {
async fn from_ipfs(opts: FromIpfsOpts) -> Result<()> {
let network = opts.network.to_network(&opts.local_network_id)?;
let db_opts: DBOpts = (&opts).into();
let crate::Databases::Sqlite(db) = db_opts.get_database(false).await?;
// TODO: feature flags here? or just remove this entirely when enabling
let crate::Databases::Sqlite(db) = db_opts.get_database(false, false).await?;
let blocks = FSBlockStore {
input_ipfs_path: opts.input_ipfs_path,
};
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1387,7 +1387,7 @@ mod tests {
let sql_pool = SqlitePool::connect_in_memory().await.unwrap();

let metrics = Metrics::register(&mut prometheus_client::registry::Registry::default());
let store = Arc::new(CeramicEventService::new(sql_pool).await?);
let store = Arc::new(CeramicEventService::new(sql_pool, true).await?);
store.process_all_undelivered_events().await?;
let mut p2p = Node::new(
network_config,
Expand Down
14 changes: 12 additions & 2 deletions service/src/event/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const PENDING_EVENTS_CHANNEL_DEPTH: usize = 1_000_000;
/// Implements the [`recon::Store`], [`iroh_bitswap::Store`], and [`ceramic_api::EventStore`] traits for [`ceramic_core::EventId`].
pub struct CeramicEventService {
pub(crate) pool: SqlitePool,
_validate_events: bool,
delivery_task: DeliverableTask,
}
/// An object that represents a set of blocks that can produce a stream of all blocks and lookup a
Expand Down Expand Up @@ -66,17 +67,26 @@ pub enum DeliverableRequirement {

impl CeramicEventService {
/// Create a new CeramicEventStore
pub async fn new(pool: SqlitePool) -> Result<Self> {
pub async fn new(pool: SqlitePool, _validate_events: bool) -> Result<Self> {
CeramicOneEvent::init_delivered_order(&pool).await?;

let delivery_task = OrderingTask::run(pool.clone(), PENDING_EVENTS_CHANNEL_DEPTH).await;

Ok(Self {
pool,
_validate_events,
delivery_task,
})
}

/// Create a new CeramicEventStore with event validation enabled
/// This is likely temporary and only used in tests to avoid adding the bool now and then deleting it
/// in the next pass.. but it's basically same same but different.
#[allow(dead_code)]
pub(crate) async fn new_with_event_validation(pool: SqlitePool) -> Result<Self> {
Self::new(pool, true).await
}

/// Returns the number of undelivered events that were updated
pub async fn process_all_undelivered_events(&self) -> Result<usize> {
OrderingTask::process_all_undelivered_events(
Expand Down Expand Up @@ -164,7 +174,7 @@ impl CeramicEventService {
let mut signed_events = Vec::with_capacity(parsed_events.len());
let mut time_events = Vec::with_capacity(parsed_events.len());
for event in parsed_events {
match event.event() {
match event.event().as_ref() {
Event::Time(_) => {
time_events.push(event);
}
Expand Down
4 changes: 2 additions & 2 deletions service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ pub struct CeramicService {

impl CeramicService {
/// Create a new CeramicService and process undelivered events if requested
pub async fn try_new(pool: SqlitePool) -> Result<Self> {
pub async fn try_new(pool: SqlitePool, enable_event_validation: bool) -> Result<Self> {
// In the future, we may need to check the previous version to make sure we're not downgrading and risking data loss
CeramicOneVersion::insert_current(&pool).await?;
let interest = Arc::new(CeramicInterestService::new(pool.clone()));
let event = Arc::new(CeramicEventService::new(pool).await?);
let event = Arc::new(CeramicEventService::new(pool, enable_event_validation).await?);
Ok(Self { interest, event })
}

Expand Down
2 changes: 1 addition & 1 deletion service/src/tests/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ macro_rules! test_with_sqlite {
async fn [<$test_name _sqlite>]() {

let conn = ceramic_store::SqlitePool::connect_in_memory().await.unwrap();
let store = $crate::CeramicEventService::new(conn).await.unwrap();
let store = $crate::CeramicEventService::new_with_event_validation(conn).await.unwrap();
store.process_all_undelivered_events().await.unwrap();
$(
for stmt in $sql_stmts {
Expand Down
4 changes: 3 additions & 1 deletion service/src/tests/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ async fn test_migration(cars: Vec<Vec<u8>>) {
let conn = ceramic_store::SqlitePool::connect_in_memory()
.await
.unwrap();
let service = CeramicEventService::new(conn).await.unwrap();
let service = CeramicEventService::new_with_event_validation(conn)
.await
.unwrap();
service
.migrate_from_ipfs(Network::Local(42), blocks)
.await
Expand Down
4 changes: 3 additions & 1 deletion service/src/tests/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ async fn setup_service() -> CeramicEventService {
.await
.unwrap();

CeramicEventService::new(conn).await.unwrap()
CeramicEventService::new_with_event_validation(conn)
.await
.unwrap()
}

async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: ReconItem<EventId>) {
Expand Down
21 changes: 9 additions & 12 deletions store/benches/sqlite_store.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::str::FromStr;

use ceramic_core::{DidDocument, EventId, Network, StreamId};
use ceramic_core::{Cid, DidDocument, EventId, Network, StreamId};
use ceramic_event::unvalidated::{
self,
signed::{self, Signer},
Builder,
};
use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool};
use criterion2::{criterion_group, criterion_main, BatchSize, Criterion};
use ipld_core::ipld::Ipld;
use itertools::Itertools;
use rand::RngCore;

Expand All @@ -27,7 +28,7 @@ async fn generate_init_event(
model: &StreamId,
data: &[u8],
signer: impl Signer,
) -> (EventId, Vec<u8>) {
) -> (EventId, Cid, unvalidated::Event<Ipld>) {
let data = ipld_core::ipld!({
"raw": data,
});
Expand All @@ -38,16 +39,16 @@ async fn generate_init_event(
.build();
let signed = signed::Event::from_payload(unvalidated::Payload::Init(init), signer).unwrap();
let cid = signed.envelope_cid();
let data = signed.encode_car().await.unwrap();
let id = EventId::new(
&Network::DevUnstable,
"model",
&model.to_vec(),
"did:key:z6MkgSV3tAuw7gUWqKCUY7ae6uWNxqYgdwPhUJbJhF9EFXm9",
&cid,
&cid,
cid,
cid,
);
(id, data)

(id, *cid, unvalidated::Event::from(signed))
}

const INSERTION_COUNT: usize = 10_000;
Expand All @@ -74,12 +75,8 @@ async fn model_setup(tpe: ModelType, cnt: usize) -> ModelSetup {
};
rand::thread_rng().fill_bytes(&mut data);

let init = generate_init_event(&model, &data, signer.clone()).await;
events.push(
EventInsertable::try_from_carfile(init.0, &init.1)
.await
.unwrap(),
);
let (order_key, cid, event) = generate_init_event(&model, &data, signer.clone()).await;
events.push(EventInsertable::new(order_key, cid, event, true).unwrap());
}

let pool = SqlitePool::connect_in_memory().await.unwrap();
Expand Down
8 changes: 4 additions & 4 deletions store/src/sql/entities/event.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::BTreeSet;
use std::{collections::BTreeSet, sync::Arc};

use anyhow::anyhow;
use ceramic_car::{CarHeader, CarReader, CarWriter};
Expand Down Expand Up @@ -51,7 +51,7 @@ pub struct EventInsertable {
/// Whether the event is deliverable i.e. it's prev has been delivered and the chain is continuous to an init event
deliverable: bool,
/// The parsed structure containing the actual Event data.
event: unvalidated::Event<Ipld>,
event: Arc<unvalidated::Event<Ipld>>,
}

impl EventInsertable {
Expand Down Expand Up @@ -81,7 +81,7 @@ impl EventInsertable {
order_key,
cid,
deliverable,
event,
event: Arc::new(event),
})
}

Expand All @@ -96,7 +96,7 @@ impl EventInsertable {
}

/// Get the parsed Event structure.
pub fn event(&self) -> &unvalidated::Event<Ipld> {
pub fn event(&self) -> &Arc<unvalidated::Event<Ipld>> {
&self.event
}

Expand Down

0 comments on commit 9d1006c

Please sign in to comment.