diff --git a/one/src/migrations.rs b/one/src/migrations.rs index b0c29cd7c..252fca3df 100644 --- a/one/src/migrations.rs +++ b/one/src/migrations.rs @@ -51,6 +51,10 @@ pub struct FromIpfsOpts { #[arg(long, env = "CERAMIC_ONE_LOCAL_NETWORK_ID")] local_network_id: Option, + /// Log information about tile documents found during the migration. + #[arg(long, env = "CERAMIC_ONE_LOG_TILE_DOCS", default_value_t = false)] + log_tile_docs: bool, + #[command(flatten)] log_opts: LogOpts, } @@ -93,7 +97,9 @@ async fn from_ipfs(opts: FromIpfsOpts) -> Result<()> { let blocks = FSBlockStore { input_ipfs_path: opts.input_ipfs_path, }; - db.event_store.migrate_from_ipfs(network, blocks).await?; + db.event_store + .migrate_from_ipfs(network, blocks, opts.log_tile_docs) + .await?; Ok(()) } diff --git a/service/src/event/migration.rs b/service/src/event/migration.rs index 33bc8fce4..148ab8b0d 100644 --- a/service/src/event/migration.rs +++ b/service/src/event/migration.rs @@ -1,6 +1,6 @@ use std::collections::BTreeSet; -use anyhow::{anyhow, bail, Context as _, Result}; +use anyhow::{anyhow, Context as _}; use ceramic_core::{EventId, Network}; use ceramic_event::unvalidated::{self, signed::cacao::Capability}; use cid::Cid; @@ -21,6 +21,7 @@ pub struct Migrator<'a, S> { network: Network, blocks: S, batch: Vec>, + log_tile_docs: bool, // All unsigned init payloads we have found. unsigned_init_payloads: BTreeSet, @@ -32,6 +33,7 @@ pub struct Migrator<'a, S> { referenced_unsigned_init_payloads: BTreeSet, error_count: usize, + tile_doc_count: usize, event_count: usize, } @@ -40,15 +42,18 @@ impl<'a, S: BlockStore> Migrator<'a, S> { service: &'a CeramicEventService, network: Network, blocks: S, - ) -> Result { + log_tile_docs: bool, + ) -> anyhow::Result { Ok(Self { network, service, blocks, + log_tile_docs, batch: Default::default(), unsigned_init_payloads: Default::default(), referenced_unsigned_init_payloads: Default::default(), error_count: 0, + tile_doc_count: 0, event_count: 0, }) } @@ -57,12 +62,12 @@ impl<'a, S: BlockStore> Migrator<'a, S> { if let Some(block) = self.blocks.block_data(cid).await? { Ok(block) } else { - Err(AnalyzeError::MissingBlock(*cid).into()) + Err(Error::MissingBlock(*cid)) } } #[instrument(skip(self), ret(level = Level::DEBUG))] - pub async fn migrate(mut self) -> Result<()> { + pub async fn migrate(mut self) -> anyhow::Result<()> { const PROGRESS_COUNT: usize = 1_000; let mut all_blocks = self.blocks.blocks(); @@ -70,15 +75,33 @@ impl<'a, S: BlockStore> Migrator<'a, S> { while let Some((cid, data)) = all_blocks.try_next().await? { let ret = self.process_block(cid, &data).await; if let Err(err) = ret { - self.error_count += 1; - error!(%cid, err=format!("{err:#}"), "error processing block"); + let log = match err { + Error::FoundInitTileDoc(_) | Error::FoundDataTileDoc(_) => { + self.tile_doc_count += 1; + self.log_tile_docs + } + Error::MissingBlock(_) | Error::Fatal(_) => { + self.error_count += 1; + true + } + }; + if log { + error!(%cid, err=format!("{err:#}"), "error processing block"); + } } if self.batch.len() > 1000 { self.write_batch().await? } count += 1; if count % PROGRESS_COUNT == 0 { - info!(last_block=%cid, count, error_count = self.error_count, "migrated blocks"); + info!( + last_block=%cid, + block_count = count, + event_count = self.event_count, + error_count = self.error_count, + tile_doc_count = self.tile_doc_count, + "migrated blocks" + ); } } self.write_batch().await?; @@ -88,6 +111,7 @@ impl<'a, S: BlockStore> Migrator<'a, S> { info!( event_count = self.event_count, error_count = self.error_count, + tile_doc_count = self.tile_doc_count, "migration finished" ); Ok(()) @@ -96,7 +120,8 @@ impl<'a, S: BlockStore> Migrator<'a, S> { // event. #[instrument(skip(self, data), ret(level = Level::DEBUG))] async fn process_block(&mut self, cid: Cid, data: &[u8]) -> Result<()> { - let event: Result, _> = serde_ipld_dagcbor::from_slice(data); + let event: Result> = + serde_ipld_dagcbor::from_slice(data).map_err(Error::new_fatal); match event { Ok(unvalidated::RawEvent::Unsigned(_)) => { self.unsigned_init_payloads.insert(cid); @@ -128,7 +153,8 @@ impl<'a, S: BlockStore> Migrator<'a, S> { .load_block(&cid) .await .context("finding init event block")?; - let payload: unvalidated::init::Payload = serde_ipld_dagcbor::from_slice(&data)?; + let payload: unvalidated::init::Payload = + serde_ipld_dagcbor::from_slice(&data).map_err(Error::new_fatal)?; let event_builder = EventBuilder::new( cid, payload.header().model().to_vec(), @@ -148,7 +174,8 @@ impl<'a, S: BlockStore> Migrator<'a, S> { async fn write_batch(&mut self) -> Result<()> { self.service .insert_events(&self.batch, DeliverableRequirement::Lazy) - .await?; + .await + .map_err(Error::new_fatal)?; self.event_count += self.batch.len(); self.batch.truncate(0); Ok(()) @@ -172,9 +199,11 @@ impl<'a, S: BlockStore> Migrator<'a, S> { .context("decoding payload") .map_err(|err| { if self.is_tile_doc_data(&payload_data) { - anyhow!("found data Tile Document, skipping") + Error::FoundDataTileDoc(cid) + } else if self.is_tile_doc_init(&payload_data) { + Error::FoundInitTileDoc(cid) } else { - anyhow!("{err}") + Error::new_fatal(err) } })?; let event_builder = match &payload { @@ -205,7 +234,8 @@ impl<'a, S: BlockStore> Migrator<'a, S> { .await .context("finding capability block")?; // Parse capability to ensure it is valid - let cap: Capability = serde_ipld_dagcbor::from_slice(&data)?; + let cap: Capability = + serde_ipld_dagcbor::from_slice(&data).map_err(Error::new_fatal)?; debug!(%capability_cid, ?cap, "capability"); capability = Some((capability_cid, cap)); } @@ -241,10 +271,21 @@ impl<'a, S: BlockStore> Migrator<'a, S> { .load_block(cid) .await .context("finding init payload block")?; - let init: unvalidated::RawEvent = - serde_ipld_dagcbor::from_slice(&init_data).context("decoding init envelope")?; + let init: unvalidated::RawEvent = serde_ipld_dagcbor::from_slice(&init_data) + .context("decoding init envelope") + .map_err(|err| { + if self.is_tile_doc_init(&init_data) { + Error::FoundInitTileDoc(*cid) + } else { + Error::new_fatal(err) + } + })?; match init { - unvalidated::RawEvent::Time(_) => bail!("init event must not be a time event"), + unvalidated::RawEvent::Time(_) => { + return Err(Error::new_fatal(anyhow!( + "init event must not be a time event" + ))) + } unvalidated::RawEvent::Signed(init) => { let init_payload_data = self .load_block( @@ -259,9 +300,9 @@ impl<'a, S: BlockStore> Migrator<'a, S> { .context("decoding init payload") .map_err(|err| { if self.is_tile_doc_init(&init_payload_data) { - anyhow!("found init Tile Document, skipping") + Error::FoundInitTileDoc(*cid) } else { - anyhow!("{err}") + Error::new_fatal(err) } }) } @@ -288,7 +329,8 @@ impl<'a, S: BlockStore> Migrator<'a, S> { .load_block(&proof_id) .await .context("finding proof block")?; - let proof: unvalidated::Proof = serde_ipld_dagcbor::from_slice(&data)?; + let proof: unvalidated::Proof = + serde_ipld_dagcbor::from_slice(&data).map_err(Error::new_fatal)?; let mut curr = proof.root(); let mut proof_edges = Vec::new(); for index in event.path().split('/') { @@ -324,10 +366,23 @@ impl<'a, S: BlockStore> Migrator<'a, S> { } } +type Result = std::result::Result; + #[derive(Error, Debug)] -enum AnalyzeError { +enum Error { #[error("missing linked block from event {0}")] MissingBlock(Cid), + #[error("block is an init tile document: {0}")] + FoundInitTileDoc(Cid), + #[error("block is a data tile document: {0}")] + FoundDataTileDoc(Cid), + #[error("fatal error: {0:#}")] + Fatal(#[from] anyhow::Error), +} +impl Error { + fn new_fatal(err: impl Into) -> Self { + Self::from(err.into()) + } } struct EventBuilder { diff --git a/service/src/event/service.rs b/service/src/event/service.rs index f8106742d..fdb8cfaca 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -97,8 +97,13 @@ impl CeramicEventService { .await } - pub async fn migrate_from_ipfs(&self, network: Network, blocks: impl BlockStore) -> Result<()> { - let migrator = Migrator::new(self, network, blocks) + pub async fn migrate_from_ipfs( + &self, + network: Network, + blocks: impl BlockStore, + log_tile_docs: bool, + ) -> Result<()> { + let migrator = Migrator::new(self, network, blocks, log_tile_docs) .await .map_err(Error::new_fatal)?; migrator.migrate().await.map_err(Error::new_fatal)?; diff --git a/service/src/tests/migration.rs b/service/src/tests/migration.rs index 4060c034f..4ea82c40c 100644 --- a/service/src/tests/migration.rs +++ b/service/src/tests/migration.rs @@ -60,7 +60,7 @@ async fn test_migration(cars: Vec>) { .await .unwrap(); service - .migrate_from_ipfs(Network::Local(42), blocks) + .migrate_from_ipfs(Network::Local(42), blocks, false) .await .unwrap(); let actual_events: BTreeSet<_> = recon::Store::range_with_values(