ShuffleReaderExec now supports multiple locations per partition #541

Merged
merged 3 commits into from
Jun 12, 2021

@andygrove andygrove commented Jun 11, 2021

Which issue does this PR close?

Closes #540.

Rationale for this change

This is a step towards supporting true shuffle.

What changes are included in this PR?

  • Moves WrappedStream into ballista-core crate and adds constructor
  • Refactors ShuffleReaderExec to accept Vec<Vec<PartitionLocation>> instead of Vec<PartitionLocation>
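
To make the shape of the second change concrete, here is a minimal sketch of the old and new input types. It is an illustration only: the `PartitionLocation` struct below is a simplified, hypothetical stand-in (the real Ballista type has different fields), and the `lift` and `locations_for` helpers are invented names that do not appear in this PR.

```rust
// Hypothetical, simplified stand-in for Ballista's PartitionLocation;
// the real type carries different metadata.
#[derive(Debug, Clone, PartialEq)]
pub struct PartitionLocation {
    pub executor_host: String,
    pub path: String,
}

/// Old shape: exactly one location per output partition.
pub type SingleLocation = Vec<PartitionLocation>;

/// New shape: the outer index is the output partition, and the inner
/// Vec lists every location that must be read to assemble it.
pub type MultiLocation = Vec<Vec<PartitionLocation>>;

/// Lift the old shape into the new one by wrapping each location in
/// a one-element Vec (illustrative helper, not part of the PR).
pub fn lift(single: SingleLocation) -> MultiLocation {
    single.into_iter().map(|loc| vec![loc]).collect()
}

/// Number of locations feeding a given output partition, or 0 if the
/// partition index is out of range.
pub fn locations_for(partitions: &[Vec<PartitionLocation>], partition: usize) -> usize {
    partitions.get(partition).map_or(0, Vec::len)
}
```

Under these assumptions, a plan built for the old one-location-per-partition shape maps onto the new shape without losing information, while a true shuffle can list several locations for the same output partition.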

Are there any user-facing changes?

self.stream.size_hint()
}
}

Member Author

This is moved to ballista-core utils

Contributor

this is a cool abstraction -- i have had need of something similar elsewhere -- perhaps it would be good to move to datafusion itself eventually

@andygrove andygrove changed the title WIP: ShuffleReaderExec now supports multiple locations per partition ShuffleReaderExec now supports multiple locations per partition Jun 11, 2021
@andygrove andygrove requested review from jorgecarleitao, alamb and Dandandan and removed request for jorgecarleitao June 11, 2021 17:02
@andygrove
Member Author

@edrevo fyi

Box::pin(futures::stream::iter(result).flatten()),
Arc::new(self.schema.as_ref().clone()),
);
Ok(Box::pin(result))
}
Member Author

This is the main change

@andygrove andygrove marked this pull request as ready for review June 11, 2021 17:17
Contributor

@edrevo edrevo left a comment

LGTM

Comment on lines 91 to 92
let x = self.partition[partition].clone();
let result = future::join_all(x.into_iter().map(fetch_partition))
Contributor

nit: if you change fetch_partition to take a reference, you can avoid the .clone():

Suggested change
- let x = self.partition[partition].clone();
- let result = future::join_all(x.into_iter().map(fetch_partition))
+ let partition_locations = &self.partition[partition];
+ let result = future::join_all(partition_locations.iter().map(fetch_partition))
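
The borrowing pattern behind this suggestion can be sketched without any async machinery. Everything below is an assumption made for illustration: `PartitionLocation` is a simplified stand-in, `Reader` is a hypothetical struct, and `fetch_partition` is a synchronous placeholder for the real async function. Whether the same change compiles in the actual async code depends on the lifetimes of the futures involved.

```rust
// Hypothetical stand-in type; only the borrowing pattern matters here.
#[derive(Debug, Clone)]
pub struct PartitionLocation {
    pub path: String,
}

// Synchronous placeholder for the real async fetch_partition. Taking
// a reference lets the caller keep ownership of the location.
pub fn fetch_partition(location: &PartitionLocation) -> String {
    format!("fetched {}", location.path)
}

// Hypothetical reader holding one inner Vec of locations per
// output partition.
pub struct Reader {
    pub partition: Vec<Vec<PartitionLocation>>,
}

impl Reader {
    pub fn execute(&self, partition: usize) -> Vec<String> {
        // Borrow the inner Vec instead of cloning it; iter() yields
        // &PartitionLocation, which matches fetch_partition's argument.
        let partition_locations = &self.partition[partition];
        partition_locations.iter().map(fetch_partition).collect()
    }
}
```

In this sketch the locations stay owned by the reader after `execute` returns, so repeated calls for the same partition do not copy the location list.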

Contributor

@alamb alamb left a comment

Looks good to me (though I am not a ballista expert)

.collect::<Result<Vec<_>>>()?;

let result = WrappedStream::new(
Box::pin(futures::stream::iter(result).flatten()),
Contributor

this is a clever way of flattening the streams, though I wonder if it will serialize them all (aka not start reading from the second until the first is entirely consumed)?

Contributor

I wonder if it will serialize them all (aka not start reading from the second until the first is entirely consumed)?

Yes, that's exactly what it does.

Contributor

👍
I guess I figured I would point it out (that the different partitions wouldn't be producing in parallel)

Member Author

Once the basic shuffle mechanism is implemented, there will be a lot of optimization work to follow
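
The serialization discussed in this thread can be illustrated with plain iterators, which behave analogously to flattening a stream of streams: each inner source is drained completely before the next one is touched. The sketch below is an analogy only, using `Vec<u32>` in place of record batch streams; `drain_sequentially` and `drain_round_robin` are hypothetical names, and the round-robin variant is not something this PR implements.

```rust
/// Sequential: drain each source fully before touching the next,
/// mirroring what flatten does to a stream of streams.
pub fn drain_sequentially(sources: Vec<Vec<u32>>) -> Vec<u32> {
    sources.into_iter().flatten().collect()
}

/// Round-robin: take one item from each source in turn until every
/// source is exhausted. A hypothetical alternative for comparison.
pub fn drain_round_robin(sources: Vec<Vec<u32>>) -> Vec<u32> {
    let mut iters: Vec<_> = sources.into_iter().map(|v| v.into_iter()).collect();
    let mut out = Vec::new();
    loop {
        // Track whether any source produced an item in this pass.
        let mut progressed = false;
        for it in iters.iter_mut() {
            if let Some(item) = it.next() {
                out.push(item);
                progressed = true;
            }
        }
        if !progressed {
            break;
        }
    }
    out
}
```

With two sources `[1, 2]` and `[10, 20]`, the sequential version yields `1, 2, 10, 20`, while the round-robin version yields `1, 10, 2, 20`. For real streams, the futures crate offers combinators such as `stream::select_all` that poll several streams rather than draining them in order, which may be one direction for the follow-up optimization work mentioned above.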

@codecov-commenter

Codecov Report

Merging #541 (1551d32) into master (63e3045) will increase coverage by 0.00%.
The diff coverage is 2.00%.

@@           Coverage Diff           @@
##           master     #541   +/-   ##
=======================================
  Coverage   76.08%   76.09%           
=======================================
  Files         156      156           
  Lines       27035    27048   +13     
=======================================
+ Hits        20570    20581   +11     
- Misses       6465     6467    +2     
Impacted Files Coverage Δ
ballista/rust/client/src/context.rs 0.00% <0.00%> (ø)
...ta/rust/core/src/execution_plans/shuffle_reader.rs 0.00% <0.00%> (ø)
...ista/rust/core/src/serde/physical_plan/to_proto.rs 49.38% <0.00%> (-0.93%) ⬇️
ballista/rust/core/src/utils.rs 25.53% <0.00%> (-2.06%) ⬇️
ballista/rust/scheduler/src/planner.rs 66.91% <ø> (ø)
ballista/rust/scheduler/src/state/mod.rs 70.49% <0.00%> (ø)
...ta/rust/core/src/serde/physical_plan/from_proto.rs 39.16% <100.00%> (+1.45%) ⬆️
datafusion/src/physical_plan/planner.rs 77.77% <0.00%> (-2.42%) ⬇️
... and 2 more


@alamb alamb merged commit 8f4078d into apache:master Jun 12, 2021
@houqp houqp added api change Changes the API exposed to users of the crate enhancement New feature or request labels Jul 30, 2021
@andygrove andygrove deleted the shuffle-reader-multi-loc branch February 6, 2022 17:42
Successfully merging this pull request may close these issues.

Ballista ShuffleReaderExec should be able to read from multiple locations per partition
5 participants