-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: api to transform events to conclusion events #508
Conversation
service/src/event/service.rs
Outdated
controller: event.get_controller().unwrap(), | ||
dimensions: vec![], // You may need to populate this if needed | ||
}, | ||
// TODO_Discuss: How can this ever have more than one value? prev return a single cid |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prev returns a single cid
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is expected, however in the future we may change the Ceramic event format to allow multiple previous values.
service/src/event/service.rs
Outdated
previous: vec![*time_event.prev()], | ||
//TODO_Discuss: How to determine an ever incrementing index. | ||
//Soln_proposed : We have to store this on disk and increment with every event seen ? | ||
//Define purpose of this counter and how it can be used |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
** how it can be useful to the consumer of this api
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The index needs to come from the db query. I believe its the deliverable
column on the data. We need to select that column and preserve it.
service/src/event/service.rs
Outdated
limit: i64, | ||
include_data: ceramic_api::IncludeEventData, | ||
) -> anyhow::Result<Vec<ConclusionEvent>> { | ||
let raw_events = ceramic_api::EventStore::events_since_highwater_mark( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do not want to rely on the API methods here. We should likely move the body of ceramic_api::EventStore::events_since_highwater_mark
to a method on the struct directly. Both this method and the EventStore implementation can call it. This way we do not overload the ceramic_api types.
service/src/event/service.rs
Outdated
&self, | ||
highwater_mark: i64, | ||
limit: i64, | ||
include_data: ceramic_api::IncludeEventData, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We always want the data for the conclusion API so we should not let this be a parameter on the function.
service/src/event/service.rs
Outdated
// We map over this to get a vec of conclusion events | ||
fn transform_raw_events_to_conclusion_events( | ||
&self, | ||
raw_event: ceramic_api::EventDataResult, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As noted, we should not use the ceramic_api types here but instead use the unvalidated::Event
directly.
service/src/event/service.rs
Outdated
match event { | ||
ceramic_event::unvalidated::Event::Time(time_event) => { | ||
let mut current_event: Event<Ipld> = event; | ||
let mut stream_id = current_event.id(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit this is stream_cid
not the stream id
service/src/event/service.rs
Outdated
let mut stream_id = current_event.id(); | ||
let mut init_event: Option<Event<Ipld>> = None; | ||
// Traverse the chain of events until we find the init event | ||
while let Some(prev_cid) = current_event.prev() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do not need to traverse the chain, the event.id()
is the CID of the init event already. I imagine this can look something like:
let stream_cid = current_event.id();
let init_event = self.get_event_by_cid(stream_cid)?;
let init = ConclusionInit::try_from(init_event)?;// TODO implement this
Constructing a ConclusionInit from an init event means finding the init payload if it exists and gathering its metadata.
We should add an init_payload()
method to unvalidated::Event
that is similar to is_init
method but that returns an Option<&init::Payload>
. Then try_from
becomes
fn try_from(value: unvalidated::Event) -> Result<ConclusionInit> {
let init_payload = value.init_payload().ok_or_else(|| anyhow!("no init payload found")?;
Ok(ConclusionInit{ ... })
}
service/src/event/service.rs
Outdated
|
||
Ok(ConclusionEvent::Time(ConclusionTime { | ||
event_cid: raw_event.id, | ||
init: ConclusionInit { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once the above is complete then this just becomes init: init
.
service/src/event/service.rs
Outdated
previous: vec![*time_event.prev()], | ||
//TODO_Discuss: How to determine an ever incrementing index. | ||
//Soln_proposed : We have to store this on disk and increment with every event seen ? | ||
//Define purpose of this counter and how it can be used |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The index needs to come from the db query. I believe its the deliverable
column on the data. We need to select that column and preserve it.
599eaab
to
c04e093
Compare
service/src/event/service.rs
Outdated
e | ||
)) | ||
})?, | ||
index: delivered as u64, // TODO: Implement proper indexing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit : remove todo
event/src/unvalidated/event.rs
Outdated
fn from(value: init::Payload<D>) -> Self { | ||
Self::Unsigned(value) | ||
// TODO : Delete this? then delete all | ||
// Delete event with new |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nathanielc All cannot be delete since rest of the two impls are being used in migration code. Makes sense to keep all 3 then?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes lets keep them
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like it, a few questions below. The refactor worked out well.
event/src/unvalidated/event.rs
Outdated
@@ -92,16 +107,17 @@ where | |||
} | |||
} | |||
|
|||
// TODO : Returns the cid no option, update the comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, this todo is complete
event/src/unvalidated/event.rs
Outdated
@@ -92,16 +107,17 @@ where | |||
} | |||
} | |||
|
|||
// TODO : Returns the cid no option, update the comment | |||
/// Returns the 'id' field of the event, which is the Cid of the stream's init event. | |||
/// If this event *is* the init event, then it doesn't know its own Cid and returns None. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update this comment to indicate we always return the cid because and init event does know its cid
event/src/unvalidated/event.rs
Outdated
fn from(value: init::Payload<D>) -> Self { | ||
Self::Unsigned(value) | ||
// TODO : Delete this? then delete all | ||
// Delete event with new |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes lets keep them
flight/src/types.rs
Outdated
@@ -22,11 +25,11 @@ impl ConclusionEvent { | |||
} | |||
} | |||
|
|||
//TODO : Add Stream Type here |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have an issue to track this, so we can remove this comment for now
service/src/event/service.rs
Outdated
let stream_cid = raw_event.id(); | ||
let init_event = self.get_event_by_cid(stream_cid).await?; | ||
let init = ConclusionInit::try_from(init_event).unwrap(); | ||
let event_cid = *raw_event.id(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is incorrect, the event_cid is not the stream_cid (unless its an init event). Does the raw_event know its own cid? That is what we need to use. Maybe we need a new method on unvalidated::Event to get the cid of itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can also pass it as a parameter into this method
store/src/sql/access/event.rs
Outdated
limit: i64, | ||
) -> Result<(i64, Vec<(Cid, unvalidated::Event<Ipld>)>)> { | ||
) -> Result<(i64, Vec<(Cid, unvalidated::Event<Ipld>, i64)>)> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you change this to use a struct instead of a tuple? Its confusing what the different i64s are for. I get it after reading the code, however a struct would make it more clear.
store/src/sql/access/event.rs
Outdated
.bind(limit) | ||
.fetch_all(pool.reader()) | ||
.await?; | ||
|
||
// default to the passed in value if there are no new events to avoid the client going back to 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why was this comment removed? It still seems valid.
store/src/sql/access/event.rs
Outdated
all_blocks.sort_by(|a, b| a.new_highwater_mark.cmp(&b.new_highwater_mark)); | ||
let blocks = all_blocks.into_iter().map(|b| b.block).collect(); | ||
|
||
let blocks = all_blocks.clone().into_iter().map(|b| b.block).collect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we have to clone the blocks?
I'd like to see a test that shows we are correctly converting from events to conclusion events |
store/src/sql/access/event.rs
Outdated
Ok((max_highwater, values)) | ||
let result = values | ||
.into_iter() | ||
.zip(all_blocks.iter()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we use them here as well, we add the delivered field to all the blocks here
service/src/event/ordering_task.rs
Outdated
.id() | ||
.expect("prev must exist for non-init events"); | ||
let stream_cid = parsed_event.id(); | ||
let prev = parsed_event.id(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be prev()
.. it looks like we introduced a bug here that was released in v0.33 (made latest this week)... fortunately it's the startup task code and has probably never done anything. We don't have long streams currently so things shouldn't be getting picked up at startup, meaning we we're probably okay without doing any clean up, but I'm going to add a ticket to investigate how it got through/improve the tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in #516
…vered in creating conclusion event
efb7972
to
97dd107
Compare
API that processes a series of raw events, transforming them into a structured vector of conclusion events. The transformation involves taking a high watermark as input, fetching events from the database based on this marker, and converting these events into the ConclusionEvent type.
Added transform_raw_events_to_conclusion_events function which converts events based on the provided high watermark
Modified event and payload structures
Updated CeramicEventService to include new transformation logic
TODO : Add time event creation logic for test