feat(streaming): support up to 16-bit vnode count in row id gen #18529

Open
wants to merge 3 commits into
base: bz/var-vnode-user-facing-local

Conversation

BugenZhao
Member

@BugenZhao BugenZhao commented Sep 13, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

This is progress towards #15900.

This PR supports vnode counts of up to 16 bits in the row-id generator.

Previously, we reserved 10 bits for the vnode part of the row id, which limited the vnode count to 1024. This PR extends the format to allocate bits dynamically between the vnode part and the sequence part of the row id, allowing an arbitrary vnode count of up to 16 bits.

This does not affect the maximum throughput we support in row-id generator, i.e., still 1 << 22 rows per millisecond.
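
The bit allocation described above can be sketched as follows. This is only an illustration, not the code in this PR: the constant `VNODE_PLUS_SEQ_BITS`, the helper names, and the exact field order are assumptions. The 22-bit budget shared by the vnode and sequence parts is inferred from the `1 << 22` figure, and a "16-bit vnode count" is read here as a count of at most `1 << 16`.

```rust
/// Bits shared by the vnode part and the sequence part of a row id.
/// Assumption: 22, inferred from the `1 << 22` rows-per-millisecond figure.
const VNODE_PLUS_SEQ_BITS: u32 = 22;

/// Smallest number of bits that can hold every vnode index in `0..vnode_count`.
fn vnode_bits(vnode_count: usize) -> u32 {
    assert!(vnode_count >= 1 && vnode_count <= (1 << 16));
    (vnode_count as u64).next_power_of_two().trailing_zeros()
}

/// Bits left for the per-vnode sequence number.
fn sequence_bits(vnode_count: usize) -> u32 {
    VNODE_PLUS_SEQ_BITS - vnode_bits(vnode_count)
}

/// Hypothetical layout: | timestamp (ms) | vnode | sequence |.
fn compose_row_id(timestamp_ms: u64, vnode: u64, sequence: u64, vnode_count: usize) -> u64 {
    let seq_bits = sequence_bits(vnode_count);
    debug_assert!(vnode < vnode_count as u64);
    debug_assert!(sequence < (1u64 << seq_bits));
    (timestamp_ms << VNODE_PLUS_SEQ_BITS) | (vnode << seq_bits) | sequence
}

/// Recover the vnode; only valid with the vnode count used when composing.
fn extract_vnode(row_id: u64, vnode_count: usize) -> u64 {
    let vnode_mask = (1u64 << vnode_bits(vnode_count)) - 1;
    (row_id >> sequence_bits(vnode_count)) & vnode_mask
}
```

Under these assumptions, every extra bit given to the vnode part is taken from the sequence part, so the product of vnode slots and sequence values per millisecond stays at `1 << 22`. Extracting a vnode with a different vnode count than the one used for composing reads the wrong bits, which is why the extraction is only reversible when the vnode count is the same on both sides.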

Note that there are some subtle cases that need attention, mainly around backward compatibility. Please refer to the documentation and comments in the code for more details.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • All checks passed in ./risedev check (or alias, ./risedev c)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

Member Author

BugenZhao commented Sep 13, 2024

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.

This stack of pull requests is managed by Graphite.

Member

@fuyufjh fuyufjh left a comment

LGTM

Comment on lines +87 to +90
/// This is okay because we rely on the reversibility only if the serial type (row id) is generated
/// and persisted in the same fragment, where the vnode count is the same. In other cases, the
/// serial type is more like a normal integer type, and the algorithm to hash or compute vnode from
/// it does not matter.
Member

IIUC, the root cause of the problem is the improper way we hack the hash() function of Serial.

Currently, we have a hack here: if there is only one column and its type is Serial, we use extract_vnode_id_from_row_id instead of the standard hash function.

// `compute_chunk` is used to calculate the `VirtualNode` for the columns in the
// chunk. When only one column is provided and its type is `Serial`, we consider the column to
// be the one that contains RowId, and use a special method to skip the calculation of Hash
// and directly extract the `VirtualNode` from `RowId`.
pub fn compute_chunk(
    data_chunk: &DataChunk,
    keys: &[usize],
    vnode_count: usize,
) -> Vec<VirtualNode> {
    if let Ok(idx) = keys.iter().exactly_one()
        && let ArrayImpl::Serial(serial_array) = &**data_chunk.column_at(*idx)
    {
        return serial_array
            .iter()
            .enumerate()
            .map(|(idx, serial)| {
                if let Some(serial) = serial {
                    extract_vnode_id_from_row_id(serial.as_row_id())
                } else {
                    // NOTE: here it will hash the entire row when the `_row_id` is missing,
                    // which could result in rows from the same chunk being allocated to different chunks.
                    // This process doesn’t guarantee the order of rows, producing indeterminate results in some cases,
                    // such as when `distinct on` is used without an `order by`.
                    let (row, _) = data_chunk.row_at(idx);
                    row.hash(Crc32FastBuilder).to_vnode(vnode_count)
                }
            })
            .collect();
    }
    data_chunk
        .get_hash_values(keys, Crc32FastBuilder)
        .into_iter()
        .map(|hash| hash.to_vnode(vnode_count))
        .collect()
}

I think the best solution is to use a special distribution, e.g. RowIdDistribution, instead of HashDistribution. This would essentially remove the hack and make everything clear.

cc. @st1page
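
To make the suggestion above concrete, here is a rough sketch of what dispatching on an explicit distribution kind could look like, instead of inspecting the column type. Everything in it is hypothetical: the `Distribution` enum, `compute_vnode`, the 22-bit layout constant, and the use of the standard library's `DefaultHasher` as a stand-in for the CRC32 hasher are assumptions for illustration, not RisingWave APIs.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Assumed bit budget shared by the vnode and sequence parts of a row id.
const VNODE_PLUS_SEQ_BITS: u32 = 22;

/// Hypothetical distribution kinds, named after the comment above.
enum Distribution {
    /// Hash the key and map the hash onto `0..vnode_count`.
    Hash,
    /// The key is a row id that already embeds its vnode.
    RowId,
}

/// Hypothetical extraction, assuming the layout | timestamp | vnode | sequence |.
fn extract_vnode(row_id: u64, vnode_count: u64) -> u64 {
    assert!(vnode_count >= 1 && vnode_count <= (1 << 16));
    let vnode_bits = vnode_count.next_power_of_two().trailing_zeros();
    let seq_bits = VNODE_PLUS_SEQ_BITS - vnode_bits;
    (row_id >> seq_bits) & ((1u64 << vnode_bits) - 1)
}

/// The distribution, not the column type, decides how the vnode is computed.
fn compute_vnode(dist: &Distribution, key: u64, vnode_count: u64) -> u64 {
    match dist {
        Distribution::Hash => {
            let mut hasher = DefaultHasher::new();
            key.hash(&mut hasher);
            hasher.finish() % vnode_count
        }
        Distribution::RowId => extract_vnode(key, vnode_count),
    }
}
```

With a shape like this, a Serial column used as an ordinary key would go through `Distribution::Hash` like any other integer, and only the fragment that generates and persists the row id would opt into `Distribution::RowId`.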

Contributor

@yezizp2012 yezizp2012 left a comment

LGTM

Signed-off-by: Bugen Zhao <i@bugenzhao.com>