PERF-#4743: avoid partition.length() in the parquet dispatcher (#4960)

Conversation
Codecov Report
@@ Coverage Diff @@
## master #4960 +/- ##
==========================================
- Coverage 84.91% 84.56% -0.35%
==========================================
Files 266 256 -10
Lines 19763 19345 -418
==========================================
- Hits 16781 16359 -422
- Misses 2982 2986 +4
@@ -557,15 +557,10 @@ def build_query_compiler(cls, dataset, columns, index_columns, **kwargs):
        )
        index, sync_index = cls.build_index(dataset, partition_ids, index_columns)
        remote_parts = cls.build_partition(partition_ids, column_widths)
        if len(partition_ids) > 0:
            row_lengths = [part.length() for part in remote_parts.T[0]]
@pyrito could you tell me why this needs to be done here, given that it is calculated automatically when necessary?
It can be calculated later, but there might be slight overhead in doing so. The proposal from @YarShev was to have build_index return the lengths, since we already have to materialize the index. I think we should do something like that instead.
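That proposal can be sketched roughly like this (a hypothetical, heavily simplified signature; the real build_index also deals with the dataset, partition ids, and a sync flag): since the per-partition index objects are materialized anyway, the row lengths fall out as a by-product.

```python
import pandas as pd

def build_index(index_objs):
    """Simplified sketch: given already-materialized per-partition index
    objects, return the complete index plus the per-partition row lengths.

    The lengths are a free by-product of having the index objects in hand,
    so no extra remote call is needed to compute them later.
    """
    lengths = [len(obj) for obj in index_objs]
    complete_index = index_objs[0].append(index_objs[1:])
    return complete_index, lengths

idx, lengths = build_index([pd.Index(["a", "b"]), pd.Index(["c"])])
assert lengths == [2, 1]
assert list(idx) == ["a", "b", "c"]
```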
@pyrito we can change the metadata getters to avoid this overhead in this and many other cases. What do you think?
diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py
index 4bd3b538..36801541 100644
--- a/modin/core/dataframe/pandas/dataframe/dataframe.py
+++ b/modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -247,12 +247,15 @@ class PandasDataframe(ClassLogger):
"""
if self._row_lengths_cache is None:
if len(self._partitions) > 0:
- (
- index,
- self._row_lengths_cache,
- ) = self._compute_axis_labels_and_lengths(0)
- if self._index_cache is None:
- self._index_cache = index
+ row_parts = self._partitions.T[0]
+ if self._index_cache is not None:
+ # do not do extra work to get an index that is already known
+ self._row_lengths_cache = [part.length() for part in row_parts]
+ else:
+ (
+ self._index_cache,
+ self._row_lengths_cache,
+ ) = self._compute_axis_labels_and_lengths(0)
else:
self._row_lengths_cache = []
return self._row_lengths_cache
@@ -269,12 +272,15 @@ class PandasDataframe(ClassLogger):
"""
if self._column_widths_cache is None:
if len(self._partitions) > 0:
- (
- columns,
- self._column_widths_cache,
- ) = self._compute_axis_labels_and_lengths(1)
- if self._columns_cache is None:
- self._columns_cache = columns
+ col_parts = self._partitions[0]
+ if self._columns_cache is not None:
+ # do not do extra work to get columns that is already known
+ self._column_widths_cache = [part.width() for part in col_parts]
+ else:
+ (
+ self._columns_cache,
+ self._column_widths_cache,
+ ) = self._compute_axis_labels_and_lengths(1)
else:
self._column_widths_cache = []
return self._column_widths_cache
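A minimal, self-contained sketch of the pattern in the diff above (class and attribute names simplified, and the expensive path is a stand-in): when the index is already cached, only the per-partition lengths are fetched; otherwise labels and lengths are computed together.

```python
class Part:
    """Toy partition that knows its own length."""

    def __init__(self, n):
        self._n = n

    def length(self):
        return self._n


class Frame:
    """Sketch of the cached row-lengths getter from the diff above."""

    def __init__(self, partitions, index=None):
        self._partitions = partitions  # list of rows, each a list of parts
        self._index_cache = index
        self._row_lengths_cache = None
        self.computed_labels = False  # illustration: did we do extra work?

    def _compute_axis_labels_and_lengths(self):
        # stand-in for the expensive path that materializes the labels too
        self.computed_labels = True
        lengths = [row[0].length() for row in self._partitions]
        labels = list(range(sum(lengths)))
        return labels, lengths

    @property
    def row_lengths(self):
        if self._row_lengths_cache is None:
            if len(self._partitions) > 0:
                if self._index_cache is not None:
                    # index already known: fetch only the lengths
                    self._row_lengths_cache = [
                        row[0].length() for row in self._partitions
                    ]
                else:
                    (
                        self._index_cache,
                        self._row_lengths_cache,
                    ) = self._compute_axis_labels_and_lengths()
            else:
                self._row_lengths_cache = []
        return self._row_lengths_cache


frame = Frame([[Part(3)], [Part(2)]], index=[0, 1, 2, 3, 4])
assert frame.row_lengths == [3, 2]
assert frame.computed_labels is False  # known index: no extra label work
```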
@anmyachev I'm not sure I fully understand. Is the proposal to do all of the calculations lazily? In other words, not to calculate the row or column lengths here?
No, my suggestion is not to return indices from remote functions (on the basis of which the lengths are calculated), but to return the length directly (just a number). This greatly reduces the time spent serializing/deserializing the index in some cases (for example, a MultiIndex). I don't have specific numbers, but it seems to me that the performance gain here should be obvious.
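As a rough illustration of the serialization argument (not Modin code; pickle stands in for whatever serializer the execution engine actually uses), compare the payload of shipping a chunk's index back from a remote task versus shipping just its length:

```python
import pickle
import pandas as pd

def read_chunk_returning_index(df):
    # current scheme in this illustration: the remote task returns the index
    return df.index

def read_chunk_returning_length(df):
    # proposed scheme: the remote task returns just a number
    return len(df)

# a MultiIndex is much heavier to serialize than a plain integer
mi = pd.MultiIndex.from_product([range(1000), ["a", "b"]])
df = pd.DataFrame({"x": range(len(mi))}, index=mi)

index_payload = len(pickle.dumps(read_chunk_returning_index(df)))
length_payload = len(pickle.dumps(read_chunk_returning_length(df)))
# the index payload dwarfs the length payload for a large MultiIndex
assert length_payload < index_payload
```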
@anmyachev Oh I see. To be honest, I'm not completely sure. My suggestion would be to try it yourself and see if you are getting good perf/correctness from not returning the index and building it lazily. I'm not sure if that would run into any issues.
@anmyachev I'm confused about how this PR is helping. Here's my understanding. Right now we have two main cases for the parquet index:

1. The index consists of one or more RangeIndex only: we block on getting the index from metadata rather than reading the whole file
(modin/modin/core/io/column_stores/parquet_dispatcher.py, lines 521 to 524 in 3ee4fa0):

    if range_index or (len(partition_ids) == 0 and len(column_names_to_read) != 0):
        complete_index = dataset.to_pandas_dataframe(
            columns=column_names_to_read
        ).index

I expect that to be cheap anyway.

2. The index has at least one level that's not a RangeIndex: we block on materializing the entire index from the partitions
(modin/modin/core/io/column_stores/parquet_dispatcher.py, lines 528 to 531 in 3ee4fa0):

    else:
        index_ids = [part_id[0][1] for part_id in partition_ids if len(part_id) > 0]
        index_objs = cls.materialize(index_ids)
        complete_index = index_objs[0].append(index_objs[1:])

Since even after this PR we will continue blocking on getting the whole index in case (2), I don't see how this PR is helping.
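The two cases can be condensed into a small sketch (a hypothetical function; the arguments are stand-ins for what the dispatcher actually passes around):

```python
import pandas as pd

def build_complete_index(range_index, dataset_index, index_objs):
    """Sketch of the two parquet-index cases described above."""
    # case (1): all index columns are RangeIndex -> take the index straight
    # from the dataset metadata, no per-partition materialization needed
    if range_index:
        return dataset_index
    # case (2): at least one non-range index level -> block on the
    # per-partition index objects and concatenate them
    return index_objs[0].append(index_objs[1:])

# illustrating both branches with toy inputs
assert list(build_complete_index(True, pd.RangeIndex(3), [])) == [0, 1, 2]
assert list(
    build_complete_index(False, None, [pd.Index([0, 1]), pd.Index([2])])
) == [0, 1, 2]
```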
@anmyachev could you rebase the PR so we can re-run CI and make sure there are no issues here?
…atcher

Signed-off-by: Myachev <anatoly.myachev@intel.com>

force-pushed from 72445d0 to 122f4d9

done
I'm curious about the performance implications here. Is there a way you could check quickly, @anmyachev?
This is part of the change required for asynchronous execution. The performance difference should not be visible yet. Without knowing that this is needed for asynchronous execution, these changes can be considered refactoring, since there is no immediate impact on performance. I can change the first commit category (to REFACTOR) if that's better.
We should label things.
if len(partition_ids) > 0:
    row_lengths = [part.length() for part in remote_parts.T[0]]
else:
    row_lengths = None
I don't see how there could be any performance hit, as we initialize the partitions with their respective sizes here, so .length() should just return immediately:
modin/modin/core/io/column_stores/parquet_dispatcher.py
Lines 466 to 478 in 7871c7b
return np.array(
    [
        [
            cls.frame_partition_cls(
                part_id[0],
                length=part_id[2],
                width=col_width,
            )
            for part_id, col_width in zip(part_ids, column_widths)
        ]
        for part_ids in partition_ids
    ]
)
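A toy version of that partition class (hypothetical and heavily simplified) shows why .length() is free when the constructor is given a length up front:

```python
class FramePartition:
    """Sketch: a partition constructed with a precomputed length never has
    to touch its data to answer length()."""

    def __init__(self, data, length=None, width=None):
        self._data = data
        self._length_cache = length
        self._width_cache = width
        self.deferred_calls = 0  # illustration only: count of lazy computes

    def length(self):
        if self._length_cache is None:
            # only reached when the constructor was not given a length
            self.deferred_calls += 1
            self._length_cache = len(self._data)
        return self._length_cache


part = FramePartition([1, 2, 3], length=3, width=1)
assert part.length() == 3
assert part.deferred_calls == 0  # the cache answered, no deferred work
```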
I was thinking the same thing too. There was some discussion about this before, but we tabled it for another time. length should just be returning the object reference here, right? I don't think anything will get materialized.
We have a special place to materialize length.
modin/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py
Line 316 in 621bc10
self._length_cache = RayWrapper.materialize(self._length_cache)
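A sketch of that caveat, with concurrent.futures.Future standing in for a Ray ObjectRef (an analogy, not the Ray API): the cached length may itself be a deferred object, so length() still blocks to materialize it on first access.

```python
from concurrent.futures import Future

class LazyPartition:
    """Sketch: the length cache may hold a future rather than a plain int."""

    def __init__(self, length_ref):
        self._length_cache = length_ref

    def length(self):
        if isinstance(self._length_cache, Future):
            # analogous to RayWrapper.materialize(self._length_cache):
            # block once, then keep the plain value in the cache
            self._length_cache = self._length_cache.result()
        return self._length_cache


ref = Future()
ref.set_result(42)
part = LazyPartition(ref)
assert part.length() == 42  # first call materializes
assert part.length() == 42  # subsequent calls hit the plain cached value
```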
Apparently yes, I was wrong about that. Converting to draft.
No longer relevant.
Signed-off-by: Myachev <anatoly.myachev@intel.com>

What do these changes do?

- passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
- passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
- signed commit with git commit -s
- docs/development/architecture.rst is up-to-date