perf: use v2 to write the sorted IVF partition files #2492
Conversation
Testing locally I see a bunch of failures from
We never got a chance to look at this, but Rob was claiming that just the act of concatenating batches improved performance; I was skeptical. Maybe there's something to that?
Force-pushed from 548d39f to 791e096
WIP Fix for writing centroids WIP WIP
This reverts commit 062ac38.
Force-pushed from 2f919c6 to b7da513
Yeah, I have to concatenate for v2 (v2 can write multiple arrays, but I can't make a list array from multiple arrays), so that might explain it.
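As a rough illustration of the concatenation step being discussed (not the actual PR code), arrow-rs can merge several per-partition batches into one before handing them to the writer; the schema, column, and values here are assumptions for the sketch:

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use arrow_select::concat::concat_batches;

fn example() -> Result<(), ArrowError> {
    // Two small batches standing in for the per-partition batches discussed above.
    let schema = Arc::new(Schema::new(vec![Field::new("row_id", DataType::Int32, false)]));
    let a = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
    )?;
    let b = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![3])) as ArrayRef],
    )?;

    // Merge into a single batch before writing it as one row of the new layout.
    let merged = concat_batches(&schema, &[a, b])?;
    assert_eq!(merged.num_rows(), 3);
    Ok(())
}
```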
Codecov Report. Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff            @@
##              main     #2492   +/-   ##
=========================================
  Coverage         ?    79.61%
=========================================
  Files            ?       208
  Lines            ?     59425
  Branches         ?     59425
=========================================
  Hits             ?     47314
  Misses           ?      9387
  Partials         ?      2724
Flags with carried forward coverage won't be shown.
@@ -77,15 +78,12 @@ where
        });

        let residuals = vectors_slice
-           .par_chunks(dimension)
+           .chunks_exact(dimension)
We need to get this back after #2503 is fixed, right?
I don't think so. I don't think this parallelism is helpful.
At least, not on the training path. If we do this transform on the query path then it might be useful. Even then, if we have many queries per second (many concurrent queries), I think we might be better off without it.
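For context, a minimal sketch of the sequential residual computation being discussed; `vectors_slice`, `centroid`, and `dimension` are assumed bindings, and this is not the actual Lance code:

```rust
// Sketch only (assumed bindings: `vectors_slice: &[f32]`, `centroid: &[f32]`,
// `dimension: usize`): compute residuals (vector minus centroid) sequentially
// with chunks_exact, i.e. without the rayon par_chunks parallelism removed here.
let residuals: Vec<f32> = vectors_slice
    .chunks_exact(dimension)
    .flat_map(|vector| vector.iter().zip(centroid).map(|(v, c)| v - c))
    .collect();
```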
let mut broken_stream = break_stream(stream, break_limit)
    .map_ok(|batch| vec![batch])
    .boxed();
while let Some(batched_chunk) = broken_stream.next().await {
    let batch_chunk = batched_chunk?;
    writer.write_batches(batch_chunk.iter()).await?;
}
It seems a little odd that we have to wrap the batch into a Vec here. Why not just:
let mut broken_stream = break_stream(stream, break_limit);
while let Some(batch) = broken_stream.next().await {
    writer.write_batches([batch?].iter()).await?;
}
let row_addr = RowAddress::new_from_id(*row_id);
partition_map[row_addr.fragment_id() as usize]
    [row_addr.row_id() as usize]
Are you re-using the row id concept but in index files? I do worry it could be confusing.
Index files need to refer to something, either a row id or a row address. The problem statement here is that we have a GPU process (possibly distributed across multiple systems in the future) that calculates the partition id for each row id. We then need to pass this mapping forward to a CPU process that relies on it to create the next part of the index.
We could use a HashMap<u64, u32> (we did originally), but this gets very expensive at high scale (in the future, if we distribute this task as well, maybe that isn't the end of the world).
So it is very nice if there is some reliable mechanism to create a "row identifier" -> X mapping without using a hashmap. With row addresses we can do this easily using two levels of Vec, and it should be stable as long as the read version used to create the mapping is the same as the read version used to apply the mapping (it is).
If we wanted to use row ids here, we could maybe assume that the number of "gaps" will be rather small and just use a straight Vec (one advantage is that it no longer needs to be the same read version, though we might have missing entries otherwise). But I don't know if we can always guarantee these small gaps and, if there are large gaps, we end up allocating more RAM than strictly needed. What do you think?
Worst case, we can just assume that stage 2 is always run on the same data, in the same order, as stage 1. Then we can just use a list of partition ids. I think this might not work today because we process things in parallel and don't order them (but I don't think the cost of doing that ordering would be significant compared to everything else we do).
Okay, this makes sense. I think I was worried that you were re-using the RowAddress idea for something that isn't a row address. It looked at first like it was the address of the vector within the index partitions. If these are actually RowAddresses as we know them, I agree this makes sense.
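As a rough, self-contained sketch of the Vec-based mapping described above (not the actual Lance implementation; the `Addr` stand-in type, bit layout, and u32::MAX sentinel are assumptions for illustration):

```rust
/// Sketch: map row addresses to partition ids with nested Vecs instead of a
/// HashMap<u64, u32>. `fragment_sizes[i]` is the row count of fragment `i` at
/// the read version used to build the mapping; `assignments` pairs each row
/// address (as a u64) with the partition id computed by the GPU stage.
/// `Addr` is an illustrative stand-in for a row address (upper 32 bits =
/// fragment id, lower 32 bits = row offset within the fragment).
struct Addr(u64);

impl Addr {
    fn fragment_id(&self) -> usize {
        (self.0 >> 32) as usize
    }
    fn row_offset(&self) -> usize {
        (self.0 & 0xFFFF_FFFF) as usize
    }
}

fn build_partition_map(fragment_sizes: &[usize], assignments: &[(u64, u32)]) -> Vec<Vec<u32>> {
    // One dense Vec per fragment; u32::MAX marks "row not assigned".
    let mut map: Vec<Vec<u32>> = fragment_sizes.iter().map(|n| vec![u32::MAX; *n]).collect();
    for (row_addr, partition) in assignments {
        let addr = Addr(*row_addr);
        map[addr.fragment_id()][addr.row_offset()] = *partition;
    }
    map
}
```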
let minibatch_size = std::env::var("LANCE_SHUFFLE_BATCH_SIZE")
    .unwrap_or("64".to_string())
    .parse::<usize>()
    .unwrap_or(64);
If someone does specify this but there is a parsing error, I think we should either log or error, rather than silently fall back to the default.
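A hedged sketch of that suggestion (the use of the `log` crate here is an assumption, not what the PR does):

```rust
// Warn (or alternatively return an error) when LANCE_SHUFFLE_BATCH_SIZE is set
// but cannot be parsed, instead of silently falling back to the default.
let minibatch_size = match std::env::var("LANCE_SHUFFLE_BATCH_SIZE") {
    Ok(val) => val.parse::<usize>().unwrap_or_else(|err| {
        log::warn!(
            "ignoring invalid LANCE_SHUFFLE_BATCH_SIZE {:?} ({}); defaulting to 64",
            val,
            err
        );
        64
    }),
    // Variable not set at all: using the default is the expected path.
    Err(_) => 64,
};
```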
Closing as this will be tackled piece by piece in other PRs.
The old v1 approach created a row group for every partition (including empty partitions). This approach converts the input into a List<Struct<...>> array where each row is a partition of data; empty partitions are not included in the array. This yields significant performance benefits. I'm not 100% sure whether the benefit comes from changing to v2 or simply from changing the format in which we write the data.
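As a rough illustration of the layout described above (field names and inner types are placeholders, not the actual Lance schema), the arrow-rs types for a "one row per partition" List<Struct<...>> column look like this:

```rust
use std::sync::Arc;
use arrow_schema::{DataType, Field};

fn main() {
    // Each top-level row is one non-empty partition; its value is a list of
    // per-vector structs. The inner field names/types are placeholders.
    let per_row = DataType::Struct(
        vec![
            Field::new("row_id", DataType::UInt64, false),
            Field::new("pq_code", DataType::UInt8, false),
        ]
        .into(),
    );
    let partition_column = Field::new(
        "partition",
        DataType::List(Arc::new(Field::new("item", per_row, true))),
        false,
    );
    println!("{:?}", partition_column);
}
```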