-
Notifications
You must be signed in to change notification settings - Fork 594
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(log-store): support kv-based log store #9060
Conversation
state_store | ||
.iter( | ||
(Included(range_start), Excluded(range_end)), | ||
u64::MAX, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be first_write_epoch
(although the key_range can guarantee correctness)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have a safe_epoch
in hummock version. Reading below the safe epoch may return an error even though it is correct. Since we don't pin the epoch in log reader, the safe epoch will bump up to above the first write epoch, and then read from the first write epoch may return an error.
// Use u64::MAX here because the epoch to consume may be below the safe | ||
// epoch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By safe epoch, do you mean the compaction watermark? I don't think this is an issue because all keys written to log store are unique.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same reason as above.
.await?; | ||
return Ok((epoch, LogStoreReadItem::StreamChunk(stream_chunk))); | ||
} | ||
LogStoreBufferItem::Barrier { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it mean that barrier will keep appending to the in-mem buffer even when sink is lagging behind?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
In future PR, we can try merging the data of multiple epochs if the sink is lagging too behind.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR! LGTM
also please attach the design doc in pr description |
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
In this PR, we implement a kv log store based on the serde implemented from #9451 and #10090.
First, we implement a
KvLogStoreBuffer
that acts as a channel between the log writer and reader. The writer writes data to the buffer, and the reader reads from the buffer and await on the buffer when it is empty. The buffer has a capacity. When the capacity has not been reached, the writer will write the stream chunk into the buffer. When the capacity has been reached, the stream chunk will be written to state store, and only the start and end seq id is added to the buffer. When the reader gets initialized with a latest epoch, the reader will read all data flushed before such epoch, and then read the data from the buffer.Implement risingwavelabs/rfcs#55
🤖 Generated by Copilot at 539e472
This pull request refactors the log store logic in the stream module and implements a new kv log store based on a key-value state store. It also changes the visibility of some serde types and functions to be crate-private. It moves the in-memory log store logic to a separate module
in_mem.rs
and adds new modules for the kv log store components:buffer.rs
,reader.rs
,writer.rs
, andmod.rs
. It updates the imports of theBoundedInMemLogStoreFactory
in thesink.rs
andfrom_proto/sink.rs
modules to reflect the new location.Checklist For Contributors
./risedev check
(or alias,./risedev c
)Checklist For Reviewers
Documentation
Click here for Documentation
Types of user-facing changes
Please keep the types that apply to your changes, and remove the others.
Release note