diff --git a/core/src/stream_id.rs b/core/src/stream_id.rs index 372b1061..06de2f36 100644 --- a/core/src/stream_id.rs +++ b/core/src/stream_id.rs @@ -13,7 +13,7 @@ use unsigned_varint::{decode, encode}; /// Defined here: /// https://cips.ceramic.network/tables/streamtypes.csv #[repr(u64)] -#[derive(Copy, Clone, Debug, Eq, IntEnum, PartialEq)] +#[derive(Copy, Clone, Debug, Eq, IntEnum, PartialEq, Hash)] pub enum StreamIdType { /// A stream type representing a json document /// https://cips.ceramic.network/CIPs/cip-8 @@ -45,7 +45,7 @@ impl Serialize for StreamIdType { } /// A stream id, which is a cid with a type -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq, Hash)] pub struct StreamId { /// The type of the stream pub r#type: StreamIdType, diff --git a/event-svc/src/event/migration.rs b/event-svc/src/event/migration.rs index 94c8e068..9bf46e75 100644 --- a/event-svc/src/event/migration.rs +++ b/event-svc/src/event/migration.rs @@ -1,7 +1,10 @@ -use std::collections::BTreeSet; +use std::{ + collections::{BTreeSet, HashMap}, + fmt::Display, +}; use anyhow::{anyhow, Context as _}; -use ceramic_core::{EventId, Network}; +use ceramic_core::{EventId, Network, StreamId}; use ceramic_event::unvalidated::{self, signed::cacao::Capability}; use cid::Cid; use futures::TryStreamExt; @@ -9,6 +12,7 @@ use ipld_core::ipld::Ipld; use recon::ReconItem; use serde::Deserialize; use thiserror::Error; +use tokio::{fs::File, io::AsyncWriteExt as _}; use tracing::{debug, error, info, instrument, Level}; use crate::{ @@ -35,6 +39,7 @@ pub struct Migrator<'a, S> { error_count: usize, tile_doc_count: usize, event_count: usize, + model_error_counts: HashMap, } impl<'a, S: BlockStore> Migrator<'a, S> { @@ -55,6 +60,7 @@ impl<'a, S: BlockStore> Migrator<'a, S> { error_count: 0, tile_doc_count: 0, event_count: 0, + model_error_counts: Default::default(), }) } @@ -65,6 +71,29 @@ impl<'a, S: BlockStore> Migrator<'a, S> { Err(Error::MissingBlock(*cid)) } } + fn handle_error(&mut self, cid: Cid, err: &Error) { + let log = match err { + Error::FoundInitTileDoc(_) | Error::FoundDataTileDoc(_) => { + self.tile_doc_count += 1; + self.log_tile_docs + } + Error::MissingBlock(_) | Error::Fatal(_) => { + self.error_count += 1; + true + } + Error::Context(model, err) => { + self.model_error_counts + .entry(model.to_owned()) + .and_modify(|count| *count += 1) + .or_insert(1); + self.handle_error(cid, err); + false + } + }; + if log { + error!(%cid, err=format!("{err:#}"), "error processing block"); + } + } #[instrument(skip(self), ret(level = Level::DEBUG))] pub async fn migrate(mut self) -> anyhow::Result<()> { @@ -75,19 +104,7 @@ impl<'a, S: BlockStore> Migrator<'a, S> { while let Some((cid, data)) = all_blocks.try_next().await? { let ret = self.process_block(cid, &data).await; if let Err(err) = ret { - let log = match err { - Error::FoundInitTileDoc(_) | Error::FoundDataTileDoc(_) => { - self.tile_doc_count += 1; - self.log_tile_docs - } - Error::MissingBlock(_) | Error::Fatal(_) => { - self.error_count += 1; - true - } - }; - if log { - error!(%cid, err=format!("{err:#}"), "error processing block"); - } + self.handle_error(cid, &err) } if self.batch.len() > 1000 { self.write_batch().await? @@ -114,6 +131,16 @@ impl<'a, S: BlockStore> Migrator<'a, S> { tile_doc_count = self.tile_doc_count, "migration finished" ); + // Write out model error counts + const CSV_FILE_PATH: &str = "model_error_counts.csv"; + let mut model_csv = File::create(CSV_FILE_PATH).await?; + model_csv.write_all(b"model,count\n").await?; + for (model, count) in self.model_error_counts { + model_csv + .write_all(format!("{model},{count}\n").as_bytes()) + .await?; + } + info!(path = CSV_FILE_PATH, "wrote error counts by model"); Ok(()) } // Decodes the block and if it is a Ceramic event, it and related blocks are constructed into an @@ -155,6 +182,7 @@ impl<'a, S: BlockStore> Migrator<'a, S> { .context("finding init event block")?; let payload: unvalidated::init::Payload = serde_ipld_dagcbor::from_slice(&data).map_err(Error::new_fatal)?; + let model = ModelContext::from(payload.header().model()); let event_builder = EventBuilder::new( cid, payload.header().model().to_vec(), @@ -163,8 +191,12 @@ impl<'a, S: BlockStore> Migrator<'a, S> { ); let event = unvalidated::init::Event::new(payload); let event: unvalidated::Event = unvalidated::Event::from(Box::new(event)); - self.batch - .push(event_builder.build(&self.network, event).await?); + self.batch.push( + event_builder + .build(&self.network, event) + .await + .with_model_context(&model)?, + ); if self.batch.len() > 1000 { self.write_batch().await? } @@ -227,23 +259,32 @@ impl<'a, S: BlockStore> Migrator<'a, S> { ) } }; + + let model = ModelContext::from(event_builder.sep.as_slice()); + let mut capability = None; if let Some(capability_cid) = event.capability() { debug!(%capability_cid, "found cap chain"); let data = self .load_block(&capability_cid) .await - .context("finding capability block")?; + .context("finding capability block") + .with_model_context(&model)?; // Parse capability to ensure it is valid - let cap: Capability = - serde_ipld_dagcbor::from_slice(&data).map_err(Error::new_fatal)?; + let cap: Capability = serde_ipld_dagcbor::from_slice(&data) + .context("decoding capability block") + .with_model_context(&model)?; debug!(%capability_cid, ?cap, "capability"); capability = Some((capability_cid, cap)); } let s = unvalidated::signed::Event::new(cid, event, link, payload, capability); let event = unvalidated::Event::from(s); - self.batch - .push(event_builder.build(&self.network, event).await?); + self.batch.push( + event_builder + .build(&self.network, event) + .await + .with_model_context(&model)?, + ); Ok(()) } fn is_tile_doc_init(&self, data: &[u8]) -> bool { @@ -319,6 +360,7 @@ impl<'a, S: BlockStore> Migrator<'a, S> { ) -> Result<()> { let init = event.id(); let init_payload = self.find_init_payload(&event.id()).await?; + let model = ModelContext::from(init_payload.header().model()); let event_builder = EventBuilder::new( cid, init_payload.header().model().to_vec(), @@ -329,9 +371,11 @@ impl<'a, S: BlockStore> Migrator<'a, S> { let data = self .load_block(&proof_id) .await - .context("finding proof block")?; - let proof: unvalidated::Proof = - serde_ipld_dagcbor::from_slice(&data).map_err(Error::new_fatal)?; + .context("finding proof block") + .with_model_context(&model)?; + let proof: unvalidated::Proof = serde_ipld_dagcbor::from_slice(&data) + .context("decoding proof block") + .with_model_context(&model)?; let mut curr = proof.root(); let mut proof_edges = Vec::new(); for index in event.path().split('/') { @@ -340,13 +384,18 @@ impl<'a, S: BlockStore> Migrator<'a, S> { // That block should already be included independently no need to include it here. break; } - let idx: usize = index.parse().context("parsing path segment as index")?; + let idx: usize = index + .parse() + .context("parsing path segment as index") + .with_model_context(&model)?; let data = self .load_block(&curr) .await - .context("finding witness block")?; - let edge: unvalidated::ProofEdge = - serde_ipld_dagcbor::from_slice(&data).context("dag cbor decode")?; + .context("finding witness block") + .with_model_context(&model)?; + let edge: unvalidated::ProofEdge = serde_ipld_dagcbor::from_slice(&data) + .context("decoding proof edge") + .with_model_context(&model)?; // Follow path let maybe_link = edge.get(idx).cloned(); @@ -361,8 +410,12 @@ impl<'a, S: BlockStore> Migrator<'a, S> { } let time = unvalidated::TimeEvent::new(event, proof, proof_edges); let event: unvalidated::Event = unvalidated::Event::from(Box::new(time)); - self.batch - .push(event_builder.build(&self.network, event).await?); + self.batch.push( + event_builder + .build(&self.network, event) + .await + .with_model_context(&model)?, + ); Ok(()) } } @@ -379,13 +432,54 @@ pub enum Error { FoundDataTileDoc(Cid), #[error("fatal error: {0:#}")] Fatal(#[from] anyhow::Error), + #[error("error for model {0}: {1}")] + Context(ModelContext, Box), } + impl Error { fn new_fatal(err: impl Into) -> Self { Self::from(err.into()) } } +trait WithModelContext { + type Target; + fn with_model_context(self, model: &ModelContext) -> Self::Target; +} + +impl WithModelContext for std::result::Result +where + E: Into, +{ + type Target = std::result::Result; + + fn with_model_context(self, model: &ModelContext) -> std::result::Result { + match self { + Ok(ok) => Ok(ok), + Err(err) => Err(Error::Context(model.clone(), Box::new(err.into()))), + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct ModelContext(Option); + +impl From<&[u8]> for ModelContext { + fn from(value: &[u8]) -> Self { + Self(StreamId::try_from(value).ok()) + } +} + +impl Display for ModelContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(model) = &self.0 { + write!(f, "{model}") + } else { + write!(f, "UNKNOWN") + } + } +} + struct EventBuilder { event_cid: Cid, sep: Vec,