Skip to content

Commit

Permalink
feat: add logic to migration to count errors by model
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Sep 26, 2024
1 parent d21ab8a commit b4bb8e0
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 33 deletions.
4 changes: 2 additions & 2 deletions core/src/stream_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use unsigned_varint::{decode, encode};
/// Defined here:
/// https://cips.ceramic.network/tables/streamtypes.csv
#[repr(u64)]
#[derive(Copy, Clone, Debug, Eq, IntEnum, PartialEq)]
#[derive(Copy, Clone, Debug, Eq, IntEnum, PartialEq, Hash)]
pub enum StreamIdType {
/// A stream type representing a json document
/// https://cips.ceramic.network/CIPs/cip-8
Expand Down Expand Up @@ -45,7 +45,7 @@ impl Serialize for StreamIdType {
}

/// A stream id, which is a cid with a type
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct StreamId {
/// The type of the stream
pub r#type: StreamIdType,
Expand Down
156 changes: 125 additions & 31 deletions event-svc/src/event/migration.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use std::collections::BTreeSet;
use std::{
collections::{BTreeSet, HashMap},
fmt::Display,
};

use anyhow::{anyhow, Context as _};
use ceramic_core::{EventId, Network};
use ceramic_core::{EventId, Network, StreamId};
use ceramic_event::unvalidated::{self, signed::cacao::Capability};
use cid::Cid;
use futures::TryStreamExt;
use ipld_core::ipld::Ipld;
use recon::ReconItem;
use serde::Deserialize;
use thiserror::Error;
use tokio::{fs::File, io::AsyncWriteExt as _};
use tracing::{debug, error, info, instrument, Level};

use crate::{
Expand All @@ -35,6 +39,7 @@ pub struct Migrator<'a, S> {
error_count: usize,
tile_doc_count: usize,
event_count: usize,
model_error_counts: HashMap<ModelContext, usize>,
}

impl<'a, S: BlockStore> Migrator<'a, S> {
Expand All @@ -55,6 +60,7 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
error_count: 0,
tile_doc_count: 0,
event_count: 0,
model_error_counts: Default::default(),
})
}

Expand All @@ -65,6 +71,29 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
Err(Error::MissingBlock(*cid))
}
}
fn handle_error(&mut self, cid: Cid, err: &Error) {
let log = match err {
Error::FoundInitTileDoc(_) | Error::FoundDataTileDoc(_) => {
self.tile_doc_count += 1;
self.log_tile_docs
}
Error::MissingBlock(_) | Error::Fatal(_) => {
self.error_count += 1;
true
}
Error::Context(model, err) => {
self.model_error_counts
.entry(model.to_owned())
.and_modify(|count| *count += 1)
.or_insert(1);
self.handle_error(cid, err);
false
}
};
if log {
error!(%cid, err=format!("{err:#}"), "error processing block");
}
}

#[instrument(skip(self), ret(level = Level::DEBUG))]
pub async fn migrate(mut self) -> anyhow::Result<()> {
Expand All @@ -75,19 +104,7 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
while let Some((cid, data)) = all_blocks.try_next().await? {
let ret = self.process_block(cid, &data).await;
if let Err(err) = ret {
let log = match err {
Error::FoundInitTileDoc(_) | Error::FoundDataTileDoc(_) => {
self.tile_doc_count += 1;
self.log_tile_docs
}
Error::MissingBlock(_) | Error::Fatal(_) => {
self.error_count += 1;
true
}
};
if log {
error!(%cid, err=format!("{err:#}"), "error processing block");
}
self.handle_error(cid, &err)
}
if self.batch.len() > 1000 {
self.write_batch().await?
Expand All @@ -114,6 +131,16 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
tile_doc_count = self.tile_doc_count,
"migration finished"
);
// Write out model error counts
const CSV_FILE_PATH: &str = "model_error_counts.csv";
let mut model_csv = File::create(CSV_FILE_PATH).await?;
model_csv.write_all(b"model,count\n").await?;
for (model, count) in self.model_error_counts {
model_csv
.write_all(format!("{model},{count}\n").as_bytes())
.await?;
}
info!(path = CSV_FILE_PATH, "wrote error counts by model");
Ok(())
}
// Decodes the block and if it is a Ceramic event, it and related blocks are constructed into an
Expand Down Expand Up @@ -155,6 +182,7 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
.context("finding init event block")?;
let payload: unvalidated::init::Payload<Ipld> =
serde_ipld_dagcbor::from_slice(&data).map_err(Error::new_fatal)?;
let model = ModelContext::from(payload.header().model());
let event_builder = EventBuilder::new(
cid,
payload.header().model().to_vec(),
Expand All @@ -163,8 +191,12 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
);
let event = unvalidated::init::Event::new(payload);
let event: unvalidated::Event<Ipld> = unvalidated::Event::from(Box::new(event));
self.batch
.push(event_builder.build(&self.network, event).await?);
self.batch.push(
event_builder
.build(&self.network, event)
.await
.with_model_context(&model)?,
);
if self.batch.len() > 1000 {
self.write_batch().await?
}
Expand Down Expand Up @@ -227,23 +259,32 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
)
}
};

let model = ModelContext::from(event_builder.sep.as_slice());

let mut capability = None;
if let Some(capability_cid) = event.capability() {
debug!(%capability_cid, "found cap chain");
let data = self
.load_block(&capability_cid)
.await
.context("finding capability block")?;
.context("finding capability block")
.with_model_context(&model)?;
// Parse capability to ensure it is valid
let cap: Capability =
serde_ipld_dagcbor::from_slice(&data).map_err(Error::new_fatal)?;
let cap: Capability = serde_ipld_dagcbor::from_slice(&data)
.context("decoding capability block")
.with_model_context(&model)?;
debug!(%capability_cid, ?cap, "capability");
capability = Some((capability_cid, cap));
}
let s = unvalidated::signed::Event::new(cid, event, link, payload, capability);
let event = unvalidated::Event::from(s);
self.batch
.push(event_builder.build(&self.network, event).await?);
self.batch.push(
event_builder
.build(&self.network, event)
.await
.with_model_context(&model)?,
);
Ok(())
}
fn is_tile_doc_init(&self, data: &[u8]) -> bool {
Expand Down Expand Up @@ -319,6 +360,7 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
) -> Result<()> {
let init = event.id();
let init_payload = self.find_init_payload(&event.id()).await?;
let model = ModelContext::from(init_payload.header().model());
let event_builder = EventBuilder::new(
cid,
init_payload.header().model().to_vec(),
Expand All @@ -329,9 +371,11 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
let data = self
.load_block(&proof_id)
.await
.context("finding proof block")?;
let proof: unvalidated::Proof =
serde_ipld_dagcbor::from_slice(&data).map_err(Error::new_fatal)?;
.context("finding proof block")
.with_model_context(&model)?;
let proof: unvalidated::Proof = serde_ipld_dagcbor::from_slice(&data)
.context("decoding proof block")
.with_model_context(&model)?;
let mut curr = proof.root();
let mut proof_edges = Vec::new();
for index in event.path().split('/') {
Expand All @@ -340,13 +384,18 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
// That block should already be included independently no need to include it here.
break;
}
let idx: usize = index.parse().context("parsing path segment as index")?;
let idx: usize = index
.parse()
.context("parsing path segment as index")
.with_model_context(&model)?;
let data = self
.load_block(&curr)
.await
.context("finding witness block")?;
let edge: unvalidated::ProofEdge =
serde_ipld_dagcbor::from_slice(&data).context("dag cbor decode")?;
.context("finding witness block")
.with_model_context(&model)?;
let edge: unvalidated::ProofEdge = serde_ipld_dagcbor::from_slice(&data)
.context("decoding proof edge")
.with_model_context(&model)?;

// Follow path
let maybe_link = edge.get(idx).cloned();
Expand All @@ -361,8 +410,12 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
}
let time = unvalidated::TimeEvent::new(event, proof, proof_edges);
let event: unvalidated::Event<Ipld> = unvalidated::Event::from(Box::new(time));
self.batch
.push(event_builder.build(&self.network, event).await?);
self.batch.push(
event_builder
.build(&self.network, event)
.await
.with_model_context(&model)?,
);
Ok(())
}
}
Expand All @@ -379,13 +432,54 @@ pub enum Error {
FoundDataTileDoc(Cid),
#[error("fatal error: {0:#}")]
Fatal(#[from] anyhow::Error),
#[error("error for model {0}: {1}")]
Context(ModelContext, Box<Error>),
}

impl Error {
fn new_fatal(err: impl Into<anyhow::Error>) -> Self {
Self::from(err.into())
}
}

trait WithModelContext {
type Target;
fn with_model_context(self, model: &ModelContext) -> Self::Target;
}

impl<T, E> WithModelContext for std::result::Result<T, E>
where
E: Into<Error>,
{
type Target = std::result::Result<T, Error>;

fn with_model_context(self, model: &ModelContext) -> std::result::Result<T, Error> {
match self {
Ok(ok) => Ok(ok),
Err(err) => Err(Error::Context(model.clone(), Box::new(err.into()))),
}
}
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ModelContext(Option<StreamId>);

impl From<&[u8]> for ModelContext {
fn from(value: &[u8]) -> Self {
Self(StreamId::try_from(value).ok())
}
}

impl Display for ModelContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(model) = &self.0 {
write!(f, "{model}")
} else {
write!(f, "UNKNOWN")
}
}
}

struct EventBuilder {
event_cid: Cid,
sep: Vec<u8>,
Expand Down

0 comments on commit b4bb8e0

Please sign in to comment.