feat: aes 287 create conclusion event type #499

Merged
merged 20 commits into from
Aug 28, 2024

Conversation

samika98
Contributor

Added a new flight crate, which currently holds the conclusion event type and the logic to convert a feed of conclusion events into an Arrow RecordBatch (https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html).

Type information is available here: https://www.notion.so/MVP-OLAP-Aggregator-on-Ceramic-Basin-74f26d19cc8b4f508dbda3a57204228a?d=331986c9e56846f48fd942f8ef7d66c9#89cf865fceec40aba4c6edafe955f5c4

@samika98 samika98 requested a review from a team as a code owner August 21, 2024 19:34
@samika98 samika98 requested review from dav1do and removed request for a team August 21, 2024 19:34

linear bot commented Aug 21, 2024

@samika98 samika98 changed the title Feature/aes 287 create conclusion event type feat: aes 287 create conclusion event type Aug 21, 2024
@@ -0,0 +1,44 @@
[package]
Contributor Author

will remove this directory, committed by mistake

Collaborator

@nathanielc nathanielc left a comment

Looking great! Some questions below to get us started on the details

// TODO : rethink this, add a checkpoint to make it work in datafusion query
pub previous: Vec<Cid>,
pub before: Option<CeramicTime>,
pub after: Option<CeramicTime>,
Collaborator

Let's remove both before and after from the data events. We have decided that we are not going to support conclusions about the temporality of data events. Consumers of the feed can draw their own conclusions when a time event is emitted.

// WIP : Need clearer structure for time events
pub before: Option<CeramicTime>,
pub after: Option<CeramicTime>,
pub data: ByteBuf,
Collaborator

What is the data for a time event? I don't think we need this one.

Contributor Author

Proof of anchor maybe?

Collaborator

I think you just need the timestamp

pub struct TimeEvent {
// WIP : Need clearer structure for time events
pub before: Option<CeramicTime>,
pub after: Option<CeramicTime>,
Collaborator

Let's remove before and after for the time events; we can add them back in later. We do not need the time information yet. Populating the before and after fields requires that we validate time events. Once event validation is complete we can implement these more directly.

pub before: Option<CeramicTime>,
pub after: Option<CeramicTime>,
pub data: ByteBuf,
pub id: Cid,
Collaborator

We need a prev for the time event so we know which event it follows.

pub id: Cid,
pub init: ConclusionInit,
// TODO : rethink this, add a checkpoint to make it work in datafusion query
pub previous: Vec<Cid>,
Collaborator

FWIW I am close to figuring this out....

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConclusionInit {
pub stream_type: String,
pub controllers: String,
Collaborator

Should we rename this to controller, since we are stating there is only ever going to be one?

/// ```
pub fn conclusion_events_to_record_batch(events: &[ConclusionEvent]) -> Result<RecordBatch> {
let schema = Arc::new(Schema::new(vec![
Field::new("event_type", DataType::Utf8, false),
Collaborator

Should we use an integer type here instead as it is expected to be an enumeration of types?

let schema = Arc::new(Schema::new(vec![
Field::new("event_type", DataType::Utf8, false),
Field::new("stream_id", DataType::Binary, false),
Field::new("stream_type", DataType::Utf8, false),
Collaborator

Same here: we should use an integer large enough to handle all future stream type values.

Contributor Author

UInt8 should be good? We won't need more than 256 types, I think.

true,
),
Field::new("data", DataType::Binary, false),
]));
Collaborator

We need previous and it should be a list of binary values.

let mut controllers = Vec::new();
let mut befores = Vec::new();
let mut afters = Vec::new();
let mut data = Vec::new();
Collaborator

We can avoid allocating the vectors and then reallocating the same data into the Arrow arrays if we use their builders. For an example, see https://github.com/ceramicnetwork/rust-ceramic/blob/feat/olap-udaf/olap/src/aggregator/ceramic_car.rs#L86-L135

I know I misled a bit on this with the initial code example I sent. However, the change to the builder pattern should be very similar to what you already have.

Collaborator

Also, I found this example in the Arrow docs. I really like the use of the Extend trait here: https://arrow.apache.org/rust/arrow_array/builder/index.html#custom-builders. It might be nice to follow this pattern.

Contributor Author

Thanks for sharing the example, it was super helpful!

Collaborator

@nathanielc nathanielc left a comment

Changes are headed in the right direction. A few more comments below.

// TODO: We should be able to set nullable as false in the field. But doing so breaks the ListBuilder :(
Field::new(
"previous",
DataType::List(Box::new(Field::new("item", DataType::Binary, true))),
Collaborator

Looks like we should be using with_field on the builder to override the field to match here.

let mut stream_type_builder = UInt8Builder::new(events.len());
let mut controller_builder = StringBuilder::new(events.len());
let mut data_builder = BinaryBuilder::new(events.len());
let mut previous_builder = ListBuilder::new(BinaryBuilder::new(events.len()));
Collaborator

Use with_field here so the builder's field matches the schema.


// TODO_Discussion: Do we still support all these types?
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum StreamType {
Collaborator

We have a type like this already in ceramic-core. Can we reuse the same type?

Collaborator

I thought the goal was that the core protocol didn't need to know anything about streamtypes. Can the flight API just pass through the streamtype as an integer without knowing anything about what it represents? The aggregator will of course need to be able to interpret the streamtype and know what it means, but does the conclusion feed need that level of knowledge?

As written now, if C1 started seeing events with a new streamtype, would the conclusion feed break? I don't think we want that, it should be possible to use an unmodified C1 with a custom aggregator that supports a new custom streamtype without needing to update C1.

Contributor Author

Yeah, the custom feed would break in this code with an Unknown Stream Type error. We can make the Conclusion Feed unaware of stream types and have it just store whatever stream type it gets in the database? But this means we might be storing arbitrary stream types. Is that what we want?

Collaborator

Yeah, this is a good question. If a user writes a custom aggregator with a new streamtype and starts using it, but neglects to update the streamtype table to reserve the id number for their new streamtype, what should happen? It would be kind of nice if C1 enforced that every event belongs to some registered streamtype, without caring which. I could imagine in the future C1 loading the streamtype list at startup and rejecting events with unregistered streamtypes. So maybe for now we should just have a hardcoded list of acceptable ID numbers and enforce that the streamtype we see is in the allowed set?
But still without parsing or interpreting which specific streamtype id is used, just validating that it is one of the valid ones.

WDYT @nathanielc? Open to pushback or other ideas here.
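
A minimal sketch of the allow-list idea above, assuming a hardcoded set of IDs. The function name `check_stream_type`, the error type, and the ID values used in the example are hypothetical placeholders, not the real streamtype registry or anything in this PR. The check only tests membership and never interprets what an ID means.

```rust
/// Error returned when an event carries a stream type ID that is not in the
/// allow-list. Hypothetical type, for illustration only.
#[derive(Debug, PartialEq, Eq)]
pub struct UnregisteredStreamType(pub u8);

/// Pass the stream type through unchanged if it is in `allowed`.
/// The ID stays an opaque integer: nothing here knows what it represents.
pub fn check_stream_type(id: u8, allowed: &[u8]) -> Result<u8, UnregisteredStreamType> {
    if allowed.contains(&id) {
        Ok(id)
    } else {
        Err(UnregisteredStreamType(id))
    }
}
```

Loading `allowed` from a table at startup instead of hardcoding it would not change the shape of this check.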

Collaborator

I think this code doesn't care which stream type it is and just needs to pass that information along. If we want to validate stream types on ingest we can do that as part of the ceramic-service validation logic.

The conclusion feed should treat it as an opaque integer.

Collaborator

Yeah, agreed in the context of this PR nothing more to do, I was more trying to ask the question in general of how we think this should be handled. If we think it's a good idea to validate that incoming events are part of a known streamtype, I can add a ticket to the Event Validation project to do that.

}

pub type CeramicTime = chrono::DateTime<chrono::Utc>;
pub type Ipld = ByteBuf;
Collaborator

What is this for? It's confusing to me to rename a byte buffer to Ipld.

Field::new("stream_id", DataType::Binary, false),
Field::new("stream_type", DataType::UInt8, false),
Field::new("controller", DataType::Utf8, false),
Field::new("data", DataType::Binary, false),
Collaborator

We also need the dimensions column. This can be a list of fixed-size lists of two elements (i.e. key-value pairs).

/// Err(e) => eprintln!("Error creating RecordBatch: {}", e),
/// }
/// ```
pub fn conclusion_events_to_record_batch(events: &[ConclusionEvent]) -> Result<RecordBatch> {
Collaborator

Can you create a ConclusionEventBuilder type and have it implement the Extend trait as shown here: https://arrow.apache.org/rust/arrow_array/builder/index.html#custom-builders? That will make this API work well with lots of different collections of ConclusionEvents instead of just slices.
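
A rough, std-only sketch of the shape being asked for. Plain `Vec` columns stand in for the Arrow array builders so the example needs no external crates, and the `ConclusionEvent` fields here are simplified assumptions rather than the PR's actual struct. The point is only that implementing `Extend` lets the builder consume any iterator of events, not just slices.

```rust
/// Simplified stand-in for the PR's event type; the fields are assumptions.
#[derive(Debug, Clone)]
pub struct ConclusionEvent {
    pub event_type: u8,
    pub stream_type: u8,
    pub controller: String,
    pub data: Option<Vec<u8>>,
}

/// Column-oriented builder. Each Vec stands in for one Arrow array builder.
#[derive(Debug, Default)]
pub struct ConclusionEventBuilder {
    event_type: Vec<u8>,
    stream_type: Vec<u8>,
    controller: Vec<String>,
    data: Vec<Option<Vec<u8>>>,
}

impl ConclusionEventBuilder {
    /// Append one event as one row across all columns.
    fn append(&mut self, event: &ConclusionEvent) {
        self.event_type.push(event.event_type);
        self.stream_type.push(event.stream_type);
        self.controller.push(event.controller.clone());
        self.data.push(event.data.clone());
    }

    /// Number of rows appended so far.
    pub fn len(&self) -> usize {
        self.event_type.len()
    }
}

impl<'a> Extend<&'a ConclusionEvent> for ConclusionEventBuilder {
    fn extend<T: IntoIterator<Item = &'a ConclusionEvent>>(&mut self, iter: T) {
        for event in iter {
            self.append(event);
        }
    }
}
```

With this in place, `builder.extend(&events)` works for a `Vec`, and so does `builder.extend(events.iter().filter(..))` or any other iterator over `&ConclusionEvent`.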

@@ -20,6 +20,7 @@ members = [
"beetle/iroh-util",
"service",
"olap",
"flight",
Collaborator

Can you also add ceramic-flight as a dep below?

@@ -0,0 +1,24 @@
[package]
name = "flight"
Collaborator

nit, rename to ceramic-flight

cid.workspace = true
serde.workspace = true
serde_bytes.workspace = true
arrow = "9.0.0"
Collaborator

Also, just noticed this. It should be arrow.workspace = true so all crates use the same arrow version (i.e. 52).


#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConclusionData {
pub id: Cid,
Collaborator

We need both the Cid of the init event and the Cid of the event itself.

I suggest field names, stream_cid and event_cid. We will also need both of these as columns on the data.

Contributor Author

Should I include an id field in the ConclusionInit struct?
How will the stream_cid differ from the event_cid?

Contributor Author

The dimension in the init will include the id of init event? CMIIW

Collaborator

The stream_cid is the CID of the init event of the stream. The event_cid is the CID of the event itself.
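
To make the distinction concrete, here is a small sketch under stated assumptions: `CidBytes` is a byte-vector stand-in for `cid::Cid`, and `EventRef` with its `init` and `follow` helpers is hypothetical, not a type in this PR. It shows that the init event is the only one whose event_cid equals the stream_cid, and that later events keep the same stream_cid while `previous` points at the preceding event.

```rust
/// Stand-in for `cid::Cid` so the sketch needs no external crates.
pub type CidBytes = Vec<u8>;

/// Hypothetical minimal view of an event's identifiers.
#[derive(Debug, Clone)]
pub struct EventRef {
    /// CID of the stream's init event; identical for every event in the stream.
    pub stream_cid: CidBytes,
    /// CID of this event itself.
    pub event_cid: CidBytes,
    /// CIDs of the events this one follows (empty for the init event).
    pub previous: Vec<CidBytes>,
}

impl EventRef {
    /// The init event: its own CID is also the stream CID.
    pub fn init(cid: CidBytes) -> Self {
        Self { stream_cid: cid.clone(), event_cid: cid, previous: Vec::new() }
    }

    /// A later event in the same stream that follows `self`.
    pub fn follow(&self, event_cid: CidBytes) -> Self {
        Self {
            stream_cid: self.stream_cid.clone(),
            event_cid,
            previous: vec![self.event_cid.clone()],
        }
    }
}
```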


use cid::Cid;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
Collaborator

Why do we need serde for these types? Can we use a simple Vec instead of the ByteBuf, since we are not serializing or deserializing these types?

/// - event_type: UInt8, non-nullable
/// - stream_id: Binary, non-nullable
/// - stream_type: UInt8, non-nullable
/// - controllers: Utf8, non-nullable (comma-separated list for Data events)
Collaborator

We can probably just have a single "controller" instead of "controllers".


@@ -22,25 +20,24 @@ fn test_conclusion_events_to_record_batch() {
ConclusionEvent::Data(ConclusionData {
id: Cid::default(),
init: ConclusionInit {
- stream_type: StreamType::Tile,
+ stream_type: 0,
Collaborator

I actually think using named StreamTypes in test code is still nice and improves readability (assuming we have an existing type we can re-use, probably not worth adding one just for use in the tests). I'm only concerned about keeping the concrete streamtypes out of the production code, not the test code.

We should probably use MIDs in our tests, though, not TileDocuments, since C1 doesn't actually support TileDocuments.

@samika98 samika98 force-pushed the feature/aes-287-create-conclusion-event-type branch from 4c61204 to 8188d0c Compare August 23, 2024 19:51
Collaborator

@nathanielc nathanielc left a comment

I really like the new builder pattern; it came together well. Once we implement time events this should be good to go.

serde.workspace = true
chrono = { version = "0.4", features = ["serde"] }
arrow.workspace = true
ceramic-core = { path = "../core" }
Collaborator

nit, can you depend on the workspace version of ceramic-core?

self.previous.append(true);
}
ConclusionEvent::Time(_) => {
todo!("implement time event once we know what its structure looks like");
Collaborator

We will need this now. Can you add an implementation so we can discuss?

let controller_field = Arc::new(Field::new("controller", DataType::Utf8, false));

let data = Arc::new(self.data.finish()) as ArrayRef;
let data_field = Arc::new(Field::new("data", DataType::Binary, false));
Collaborator

The data field will need to be nullable because time events will not have data values.
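
A small sketch of what nullability means for the conversion, assuming a simplified event enum (the variant shapes and the helper name `data_cell` are hypothetical). Data events yield a value for the column and time events yield a null, which is why the schema field has to be declared nullable.

```rust
/// Simplified stand-in for the PR's event enum; variant shapes are assumptions.
#[derive(Debug, Clone)]
pub enum ConclusionEvent {
    Data { data: Vec<u8> },
    Time,
}

/// Value to write into the nullable `data` column for one event.
/// `None` corresponds to appending a null to the column builder.
pub fn data_cell(event: &ConclusionEvent) -> Option<&[u8]> {
    match event {
        ConclusionEvent::Data { data } => Some(data.as_slice()),
        ConclusionEvent::Time => None,
    }
}
```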

fn default() -> Self {
Self {
event_type: PrimitiveBuilder::new(),
stream_id: BinaryBuilder::new(),
Collaborator

nit: rename stream_id to stream_cid. A stream ID is an encoding of the stream type plus its CID. Since we have the stream type explicitly as a column on the data, we only want the stream CID, so we should name the column accordingly.

assert_eq!(record_batch.num_rows(), 2);

// Assert the schema
let schema = record_batch.schema();
Collaborator

Let's validate these tests using the expect_test pattern. This way they are much easier to audit and update.

You can convert a RecordBatch into a string like this: https://github.com/ceramicnetwork/rust-ceramic/pull/500/files#diff-eb701a6fc846dc0a4b096209cdf1b5b1a75028561fcc2e3c6629862757eec348R287

Then you can use expect! like this to validate it's what we expect: https://github.com/ceramicnetwork/rust-ceramic/pull/500/files#diff-eb701a6fc846dc0a4b096209cdf1b5b1a75028561fcc2e3c6629862757eec348R310-R315

If you haven't used expect_test before, I recommend a quick read on how to use it and update tests: https://docs.rs/expect-test/latest/expect_test/#introduction

pub type CeramicTime = chrono::DateTime<chrono::Utc>;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeEvent {
Collaborator

Let's rename this to ConclusionTime to be consistent.

// WIP : Need clearer structure for time events
pub id: Cid,
pub previous: Vec<Cid>,
}
Collaborator

Time events need stream_cid, stream_type, event_cid, controller and previous for now. We can add more later.

Collaborator

Should the bulk of those come in via a ConclusionInit field, the same way it is done for data events, as opposed to being included at the top level directly?

Contributor Author

Yeah, I think they should
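
One way the agreed layout could look, sketched with byte vectors standing in for `cid::Cid`. The exact field set and the `init()` accessor are assumptions based on the comments in this thread, not the merged code. Both event kinds carry the stream-level fields through a shared `ConclusionInit`, so the conversion code can read them the same way for either variant.

```rust
/// Stream-level fields shared by every event in a stream.
#[derive(Debug, Clone)]
pub struct ConclusionInit {
    pub stream_cid: Vec<u8>,
    pub stream_type: u8,
    pub controller: String,
}

#[derive(Debug, Clone)]
pub struct ConclusionData {
    pub event_cid: Vec<u8>,
    pub init: ConclusionInit,
    pub previous: Vec<Vec<u8>>,
    pub data: Vec<u8>,
}

/// Time events carry no data payload, only identity and linkage.
#[derive(Debug, Clone)]
pub struct ConclusionTime {
    pub event_cid: Vec<u8>,
    pub init: ConclusionInit,
    pub previous: Vec<Vec<u8>>,
}

#[derive(Debug, Clone)]
pub enum ConclusionEvent {
    Data(ConclusionData),
    Time(ConclusionTime),
}

impl ConclusionEvent {
    /// Hypothetical accessor: stream-level fields regardless of variant.
    pub fn init(&self) -> &ConclusionInit {
        match self {
            ConclusionEvent::Data(d) => &d.init,
            ConclusionEvent::Time(t) => &t.init,
        }
    }
}
```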

fn append(&mut self, event: &ConclusionEvent) {
match event {
ConclusionEvent::Data(data_event) => {
self.event_type.append_value(0);
Collaborator

Can we add a method on ConclusionEvent that can report the type as an integer? This way the mapping of variant to integer type is explicit on the type instead of hidden in the conversion implementation.
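
A minimal sketch of such a method. The name `event_type_as_int` is a placeholder, the payload structs are empty stand-ins, and the value 1 for time events is inferred from the test output in this PR rather than stated anywhere explicitly.

```rust
/// Empty stand-ins for the real payload structs.
#[derive(Debug, Clone)]
pub struct ConclusionData;
#[derive(Debug, Clone)]
pub struct ConclusionTime;

#[derive(Debug, Clone)]
pub enum ConclusionEvent {
    Data(ConclusionData),
    Time(ConclusionTime),
}

impl ConclusionEvent {
    /// Integer tag written to the `event_type` column.
    /// Keeping the mapping here makes it visible on the type itself
    /// instead of being buried in the builder's `append`.
    pub fn event_type_as_int(&self) -> u8 {
        match self {
            ConclusionEvent::Data(_) => 0,
            ConclusionEvent::Time(_) => 1,
        }
    }
}
```

The builder can then call `self.event_type.append_value(event.event_type_as_int())` once, before matching on the variant.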

Collaborator

@nathanielc nathanielc left a comment

Looks good to me with a fix to the test case.

for cid in &time_event.previous {
self.previous.values().append_value(cid.to_bytes());
}
self.previous.append(false);
Collaborator

This should be self.previous.append(!time_event.previous.is_empty())

for cid in &data_event.previous {
self.previous.values().append_value(cid.to_bytes());
}
self.previous.append(true);
Collaborator

this should be self.previous.append(!data_event.previous.is_empty())
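
To illustrate why the flag matters, here is a std-only model of a list column. `ListColumn` is a hypothetical stand-in that mimics how a list builder collects child values and then closes a row with a validity flag; it is not the Arrow API. Closing a row with `false` marks it null even if child values were pushed, which is how a non-empty `previous` can end up hidden.

```rust
/// Toy model of a list column: child values, per-row end offsets, validity.
#[derive(Debug, Default)]
pub struct ListColumn {
    values: Vec<Vec<u8>>,
    offsets: Vec<usize>,
    validity: Vec<bool>,
}

impl ListColumn {
    /// Push one child value into the row currently being built.
    pub fn push_value(&mut self, value: Vec<u8>) {
        self.values.push(value);
    }

    /// Close the current row, marking it valid or null.
    pub fn append(&mut self, is_valid: bool) {
        self.offsets.push(self.values.len());
        self.validity.push(is_valid);
    }

    /// Read a row back; null rows hide whatever child values they contain.
    pub fn row(&self, i: usize) -> Option<&[Vec<u8>]> {
        if !self.validity[i] {
            return None;
        }
        let start = if i == 0 { 0 } else { self.offsets[i - 1] };
        Some(&self.values[start..self.offsets[i]])
    }
}

/// The suggested pattern: the row is valid exactly when `previous` is non-empty.
pub fn append_previous(col: &mut ListColumn, previous: &[Vec<u8>]) {
    for cid in previous {
        col.push_value(cid.clone());
    }
    col.append(!previous.is_empty());
}
```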

+------------+--------------------------------------------------------------------------+-------------+---------------+--------------------------------------------------------------------------+--------+----------------------------------------------------------------------------+
| 0 | 01001220ba28cb0268f4f50cca99d952b610bfa2823941bfb12452ccf1e330725d96ae0d | 2 | did:key:test1 | 010012200b9691e769ef527bd51427fc745c8bfb8e89271e054548a780ddeed7817cbe0a | 010203 | [] |
| 0 | 01001220ba28cb0268f4f50cca99d952b610bfa2823941bfb12452ccf1e330725d96ae0d | 2 | did:key:test2 | 010012200b9691e769ef527bd51427fc745c8bfb8e89271e054548a780ddeed7817cbe0a | 040506 | [01001220ba28cb0268f4f50cca99d952b610bfa2823941bfb12452ccf1e330725d96ae0d] |
| 1 | 01001220ba28cb0268f4f50cca99d952b610bfa2823941bfb12452ccf1e330725d96ae0d | 2 | did:key:test3 | 010012200b9691e769ef527bd51427fc745c8bfb8e89271e054548a780ddeed7817cbe0a | | |
Collaborator

The previous column is empty here; however, the input data does include a previous CID. This is caused by the bug I noted above.

data: vec![1, 2, 3],
}),
ConclusionEvent::Data(ConclusionData {
event_cid: Cid::from_str("baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi")
Collaborator

Can you use a unique CID for each event? This will make auditing the expected results easier.

Collaborator

You can use cpk to generate random cids easily

event_cid: Cid::from_str("baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi")
.unwrap(),
previous: vec![Cid::from_str(
"baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu",
Collaborator

Previous should be the CID of an event, not a stream. Can you make these chain so it's easier to follow?

| 1 | 01001220ba28cb0268f4f50cca99d952b610bfa2823941bfb12452ccf1e330725d96ae0d | 2 | did:key:test3 | 010012200b9691e769ef527bd51427fc745c8bfb8e89271e054548a780ddeed7817cbe0a | | |
| 0 | 01001220ba28cb0268f4f50cca99d952b610bfa2823941bfb12452ccf1e330725d96ae0d | 2 | did:key:test4 | 010012200b9691e769ef527bd51427fc745c8bfb8e89271e054548a780ddeed7817cbe0a | 070809 | [] |
+------------+--------------------------------------------------------------------------+-------------+---------------+--------------------------------------------------------------------------+--------+----------------------------------------------------------------------------+"#]].assert_eq(&formatted);
}
Collaborator

It's hard to audit this when the CIDs are in hex instead of their common string representation. In my branch I added a small function to convert the CID bytes to a string here. Can you copy that function and use it here on the output? You can see how I use it here.

Contributor Author

Not sure if it belongs in this test file. It seems like a lot of code for what we want to do. Instead, I added a method that converts CID bytes to a string for the event_cid and stream_cid columns, which does the same thing.
Open to pushback.

@samika98 samika98 force-pushed the feature/aes-287-create-conclusion-event-type branch from 2221157 to 90629d8 Compare August 27, 2024 22:24
@samika98 samika98 added this pull request to the merge queue Aug 28, 2024
Merged via the queue into main with commit c694469 Aug 28, 2024
5 checks passed
@samika98 samika98 deleted the feature/aes-287-create-conclusion-event-type branch August 28, 2024 17:07
@smrz2001 smrz2001 mentioned this pull request Sep 9, 2024