From c82a685ff5d350b3d7181f04ff5bfa20b62f82ec Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Fri, 6 Sep 2024 08:06:45 -0600 Subject: [PATCH] fix: updates detection of tile docs and doesn't log them by default (#514) * fix: updates detection of tile docs and doesn't log them by default With this change we catch two more cases where we can detect tile docs. Additionally by default we only count the number of tile docs we find and do not log any errors about them. This way any tile docs do not create a feeling of failure when migrating. * make it clear default is to not log tile docs * fix clippy and test --- one/src/migrations.rs | 8 ++- service/src/event/migration.rs | 95 +++++++++++++++++++++++++++------- service/src/event/service.rs | 9 +++- service/src/tests/migration.rs | 2 +- 4 files changed, 90 insertions(+), 24 deletions(-) diff --git a/one/src/migrations.rs b/one/src/migrations.rs index b0c29cd7c..252fca3df 100644 --- a/one/src/migrations.rs +++ b/one/src/migrations.rs @@ -51,6 +51,10 @@ pub struct FromIpfsOpts { #[arg(long, env = "CERAMIC_ONE_LOCAL_NETWORK_ID")] local_network_id: Option, + /// Log information about tile documents found during the migration. + #[arg(long, env = "CERAMIC_ONE_LOG_TILE_DOCS", default_value_t = false)] + log_tile_docs: bool, + #[command(flatten)] log_opts: LogOpts, } @@ -93,7 +97,9 @@ async fn from_ipfs(opts: FromIpfsOpts) -> Result<()> { let blocks = FSBlockStore { input_ipfs_path: opts.input_ipfs_path, }; - db.event_store.migrate_from_ipfs(network, blocks).await?; + db.event_store + .migrate_from_ipfs(network, blocks, opts.log_tile_docs) + .await?; Ok(()) } diff --git a/service/src/event/migration.rs b/service/src/event/migration.rs index 33bc8fce4..148ab8b0d 100644 --- a/service/src/event/migration.rs +++ b/service/src/event/migration.rs @@ -1,6 +1,6 @@ use std::collections::BTreeSet; -use anyhow::{anyhow, bail, Context as _, Result}; +use anyhow::{anyhow, Context as _}; use ceramic_core::{EventId, Network}; use ceramic_event::unvalidated::{self, signed::cacao::Capability}; use cid::Cid; @@ -21,6 +21,7 @@ pub struct Migrator<'a, S> { network: Network, blocks: S, batch: Vec>, + log_tile_docs: bool, // All unsigned init payloads we have found. unsigned_init_payloads: BTreeSet, @@ -32,6 +33,7 @@ pub struct Migrator<'a, S> { referenced_unsigned_init_payloads: BTreeSet, error_count: usize, + tile_doc_count: usize, event_count: usize, } @@ -40,15 +42,18 @@ impl<'a, S: BlockStore> Migrator<'a, S> { service: &'a CeramicEventService, network: Network, blocks: S, - ) -> Result { + log_tile_docs: bool, + ) -> anyhow::Result { Ok(Self { network, service, blocks, + log_tile_docs, batch: Default::default(), unsigned_init_payloads: Default::default(), referenced_unsigned_init_payloads: Default::default(), error_count: 0, + tile_doc_count: 0, event_count: 0, }) } @@ -57,12 +62,12 @@ impl<'a, S: BlockStore> Migrator<'a, S> { if let Some(block) = self.blocks.block_data(cid).await? { Ok(block) } else { - Err(AnalyzeError::MissingBlock(*cid).into()) + Err(Error::MissingBlock(*cid)) } } #[instrument(skip(self), ret(level = Level::DEBUG))] - pub async fn migrate(mut self) -> Result<()> { + pub async fn migrate(mut self) -> anyhow::Result<()> { const PROGRESS_COUNT: usize = 1_000; let mut all_blocks = self.blocks.blocks(); @@ -70,15 +75,33 @@ impl<'a, S: BlockStore> Migrator<'a, S> { while let Some((cid, data)) = all_blocks.try_next().await? { let ret = self.process_block(cid, &data).await; if let Err(err) = ret { - self.error_count += 1; - error!(%cid, err=format!("{err:#}"), "error processing block"); + let log = match err { + Error::FoundInitTileDoc(_) | Error::FoundDataTileDoc(_) => { + self.tile_doc_count += 1; + self.log_tile_docs + } + Error::MissingBlock(_) | Error::Fatal(_) => { + self.error_count += 1; + true + } + }; + if log { + error!(%cid, err=format!("{err:#}"), "error processing block"); + } } if self.batch.len() > 1000 { self.write_batch().await? } count += 1; if count % PROGRESS_COUNT == 0 { - info!(last_block=%cid, count, error_count = self.error_count, "migrated blocks"); + info!( + last_block=%cid, + block_count = count, + event_count = self.event_count, + error_count = self.error_count, + tile_doc_count = self.tile_doc_count, + "migrated blocks" + ); } } self.write_batch().await?; @@ -88,6 +111,7 @@ impl<'a, S: BlockStore> Migrator<'a, S> { info!( event_count = self.event_count, error_count = self.error_count, + tile_doc_count = self.tile_doc_count, "migration finished" ); Ok(()) @@ -96,7 +120,8 @@ impl<'a, S: BlockStore> Migrator<'a, S> { // event. #[instrument(skip(self, data), ret(level = Level::DEBUG))] async fn process_block(&mut self, cid: Cid, data: &[u8]) -> Result<()> { - let event: Result, _> = serde_ipld_dagcbor::from_slice(data); + let event: Result> = + serde_ipld_dagcbor::from_slice(data).map_err(Error::new_fatal); match event { Ok(unvalidated::RawEvent::Unsigned(_)) => { self.unsigned_init_payloads.insert(cid); @@ -128,7 +153,8 @@ impl<'a, S: BlockStore> Migrator<'a, S> { .load_block(&cid) .await .context("finding init event block")?; - let payload: unvalidated::init::Payload = serde_ipld_dagcbor::from_slice(&data)?; + let payload: unvalidated::init::Payload = + serde_ipld_dagcbor::from_slice(&data).map_err(Error::new_fatal)?; let event_builder = EventBuilder::new( cid, payload.header().model().to_vec(), @@ -148,7 +174,8 @@ impl<'a, S: BlockStore> Migrator<'a, S> { async fn write_batch(&mut self) -> Result<()> { self.service .insert_events(&self.batch, DeliverableRequirement::Lazy) - .await?; + .await + .map_err(Error::new_fatal)?; self.event_count += self.batch.len(); self.batch.truncate(0); Ok(()) @@ -172,9 +199,11 @@ impl<'a, S: BlockStore> Migrator<'a, S> { .context("decoding payload") .map_err(|err| { if self.is_tile_doc_data(&payload_data) { - anyhow!("found data Tile Document, skipping") + Error::FoundDataTileDoc(cid) + } else if self.is_tile_doc_init(&payload_data) { + Error::FoundInitTileDoc(cid) } else { - anyhow!("{err}") + Error::new_fatal(err) } })?; let event_builder = match &payload { @@ -205,7 +234,8 @@ impl<'a, S: BlockStore> Migrator<'a, S> { .await .context("finding capability block")?; // Parse capability to ensure it is valid - let cap: Capability = serde_ipld_dagcbor::from_slice(&data)?; + let cap: Capability = + serde_ipld_dagcbor::from_slice(&data).map_err(Error::new_fatal)?; debug!(%capability_cid, ?cap, "capability"); capability = Some((capability_cid, cap)); } @@ -241,10 +271,21 @@ impl<'a, S: BlockStore> Migrator<'a, S> { .load_block(cid) .await .context("finding init payload block")?; - let init: unvalidated::RawEvent = - serde_ipld_dagcbor::from_slice(&init_data).context("decoding init envelope")?; + let init: unvalidated::RawEvent = serde_ipld_dagcbor::from_slice(&init_data) + .context("decoding init envelope") + .map_err(|err| { + if self.is_tile_doc_init(&init_data) { + Error::FoundInitTileDoc(*cid) + } else { + Error::new_fatal(err) + } + })?; match init { - unvalidated::RawEvent::Time(_) => bail!("init event must not be a time event"), + unvalidated::RawEvent::Time(_) => { + return Err(Error::new_fatal(anyhow!( + "init event must not be a time event" + ))) + } unvalidated::RawEvent::Signed(init) => { let init_payload_data = self .load_block( @@ -259,9 +300,9 @@ impl<'a, S: BlockStore> Migrator<'a, S> { .context("decoding init payload") .map_err(|err| { if self.is_tile_doc_init(&init_payload_data) { - anyhow!("found init Tile Document, skipping") + Error::FoundInitTileDoc(*cid) } else { - anyhow!("{err}") + Error::new_fatal(err) } }) } @@ -288,7 +329,8 @@ impl<'a, S: BlockStore> Migrator<'a, S> { .load_block(&proof_id) .await .context("finding proof block")?; - let proof: unvalidated::Proof = serde_ipld_dagcbor::from_slice(&data)?; + let proof: unvalidated::Proof = + serde_ipld_dagcbor::from_slice(&data).map_err(Error::new_fatal)?; let mut curr = proof.root(); let mut proof_edges = Vec::new(); for index in event.path().split('/') { @@ -324,10 +366,23 @@ impl<'a, S: BlockStore> Migrator<'a, S> { } } +type Result = std::result::Result; + #[derive(Error, Debug)] -enum AnalyzeError { +enum Error { #[error("missing linked block from event {0}")] MissingBlock(Cid), + #[error("block is an init tile document: {0}")] + FoundInitTileDoc(Cid), + #[error("block is a data tile document: {0}")] + FoundDataTileDoc(Cid), + #[error("fatal error: {0:#}")] + Fatal(#[from] anyhow::Error), +} +impl Error { + fn new_fatal(err: impl Into) -> Self { + Self::from(err.into()) + } } struct EventBuilder { diff --git a/service/src/event/service.rs b/service/src/event/service.rs index f8106742d..fdb8cfaca 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -97,8 +97,13 @@ impl CeramicEventService { .await } - pub async fn migrate_from_ipfs(&self, network: Network, blocks: impl BlockStore) -> Result<()> { - let migrator = Migrator::new(self, network, blocks) + pub async fn migrate_from_ipfs( + &self, + network: Network, + blocks: impl BlockStore, + log_tile_docs: bool, + ) -> Result<()> { + let migrator = Migrator::new(self, network, blocks, log_tile_docs) .await .map_err(Error::new_fatal)?; migrator.migrate().await.map_err(Error::new_fatal)?; diff --git a/service/src/tests/migration.rs b/service/src/tests/migration.rs index 4060c034f..4ea82c40c 100644 --- a/service/src/tests/migration.rs +++ b/service/src/tests/migration.rs @@ -60,7 +60,7 @@ async fn test_migration(cars: Vec>) { .await .unwrap(); service - .migrate_from_ipfs(Network::Local(42), blocks) + .migrate_from_ipfs(Network::Local(42), blocks, false) .await .unwrap(); let actual_events: BTreeSet<_> = recon::Store::range_with_values(