
FilePartition and PartitionedFile for scanning flexibility #932

Merged 9 commits into apache:master on Aug 30, 2021

Conversation

@yjshen (Member) commented Aug 23, 2021

Which issue does this PR close?

Closes #946.

Rationale for this change

  1. To enable finer-grained readers that can parallelize the reading of even a single file, and to balance workload between scanning threads when input file sizes vary greatly. As I quote @andygrove here:

One of the current issues IMO with DataFusion is that we use "file" as the default unit of partitioning. We would be able to scale better if we had finer-grained readers such as reading Parquet row groups instead. This way we can have multiple threads reading from the same file concurrently and avoid the need to repartition first to increase concurrency.

  2. Refactor the logic in ParquetExec and the parquet datasource. It is strange to call ParquetExec::try_from_path to get planning-related metadata.

What changes are included in this PR?

  1. PartitionedFile -> a single file (for the moment) or, later, part of a file (a subset of its row groups or rows). We may even extend this to carry partition values and a partition schema to support partitioned tables:
    /path/to/table/root/p_date=20210813/p_hour=1200/xxxxx.parquet

  2. FilePartition -> the basic unit of parallel processing; each task is responsible for processing one FilePartition, which is composed of several PartitionedFiles.

  3. Update the ballista protocol as well as the serdes to use the new abstraction.

  4. Separate the planning-related code from ParquetExec.
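The two abstractions above can be sketched roughly as follows. This is a simplified illustration, not the exact DataFusion definitions: the field names and the `repartition` helper (which packs files into partitions by size to balance scanning work) are hypothetical.

```rust
// Hypothetical sketch of the two abstractions (not the actual PR code).
#[derive(Debug, Clone)]
struct PartitionedFile {
    file_path: String,
    size: u64, // used to balance work across partitions
}

#[derive(Debug)]
struct FilePartition {
    index: usize,
    files: Vec<PartitionedFile>,
}

// Greedily pack files into `n` partitions by size, largest first, so
// each scanning task gets a comparable amount of work even when input
// file sizes vary greatly.
fn repartition(mut files: Vec<PartitionedFile>, n: usize) -> Vec<FilePartition> {
    files.sort_by(|a, b| b.size.cmp(&a.size));
    let mut parts: Vec<FilePartition> = (0..n)
        .map(|index| FilePartition { index, files: vec![] })
        .collect();
    for f in files {
        // add to the partition with the smallest total size so far
        let p = parts
            .iter_mut()
            .min_by_key(|p| p.files.iter().map(|f| f.size).sum::<u64>())
            .unwrap();
        p.files.push(f);
    }
    parts
}

fn main() {
    let files = vec![
        PartitionedFile { file_path: "a.parquet".into(), size: 100 },
        PartitionedFile { file_path: "b.parquet".into(), size: 60 },
        PartitionedFile { file_path: "c.parquet".into(), size: 40 },
    ];
    let parts = repartition(files, 2);
    assert_eq!(parts.len(), 2);
    // the largest file sits alone; the two smaller ones share a partition
    assert_eq!(parts[0].files.len(), 1);
    assert_eq!(parts[1].files.len(), 2);
}
```

Later, a PartitionedFile could point at a row-group range within a file instead of a whole file, without changing the FilePartition side of the abstraction.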

Are there any user-facing changes?

No.

@github-actions bot added the ballista and datafusion (Changes in the datafusion crate) labels Aug 23, 2021
@yjshen yjshen changed the title FilePartition and partitionedFile for scanning flexibility FilePartition and PartitionedFile for scanning flexibility Aug 23, 2021
@github-actions bot added the sql (SQL Planner) label Aug 25, 2021
@yjshen yjshen marked this pull request as ready for review August 25, 2021 14:47
@yjshen (Member, Author) commented Aug 25, 2021

cc @houqp @alamb @andygrove for review

@houqp (Member) left a comment

Left a couple of minor comments; the rest looks good to me!

ballista/rust/core/src/serde/logical_plan/from_proto.rs (resolved)
ballista/rust/core/src/serde/logical_plan/to_proto.rs (resolved)
.collect(),
schema: Some(parquet_desc.schema().as_ref().into()),
partitions: vec![FilePartitionMetadata {
filename: vec![path],
Member:

I remember we discussed this in the original PR. After taking a second look at the code, I am still not fully following the change here. The old behavior set FilePartitionMetadata.filename to a vector of file paths returned from a directory listing, while the new behavior here always sets filename to a vector with a single entry whose value is the root path of the table.

Shouldn't we use parquet_desc.descriptor.descriptor to build the filename vector here instead?

@yjshen (Member, Author) commented Aug 29, 2021

I changed it to a vector of all the files.

However, after searching the project for a while, I find this method may not actually be used; the intention of this RPC is hard to understand as well. Perhaps it's deprecated and we should remove it later?

 rpc GetFileMetadata (GetFileMetadataParams) returns (GetFileMetadataResult) {}

Member:

I had the same question when I was going through the code base yesterday; I noticed it's only mentioned in ballista/docs/architecture.md. @andygrove do you know if this RPC method is still needed?

@houqp (Member) left a comment

Great refactor @yjshen!

@houqp houqp added api change Changes the API exposed to users of the crate enhancement New feature or request labels Aug 29, 2021
@alamb (Contributor) left a comment

This is looking great @yjshen -- thank you for persevering. I think this PR looks great other than the addition of filter to the LogicalPlanBuilder::scan (see comments on that).

I didn't review the ballista changes, but I assume they are mostly mechanical.

Again, thank you so much and sorry for the long review cycle

pub file_path: String,
/// Statistics of the file
pub statistics: Statistics,
// Values of partition columns to be appended to each row
Contributor:

I think that in order to take full advantage of partition values (which might span multiple columns, for example), more information about the partitioning scheme will be needed (e.g. which expression is used to generate the partitioning values). Adding partitioning support to DataFusion's planning / execution is probably worth its own discussion.

(that is to say I agree with postponing adding anything partition specific)
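For illustration, hive-style partition values like the p_date=20210813/p_hour=1200 path from the PR description could be recovered from a file path with something like the sketch below. This is hypothetical code, not part of this PR; the `partition_values` helper is invented to show what a PartitionedFile might eventually carry and append to each row.

```rust
// Hypothetical sketch: extract hive-style `key=value` partition
// segments from a file path, e.g.
// /path/to/table/root/p_date=20210813/p_hour=1200/part-0.parquet
fn partition_values(path: &str) -> Vec<(String, String)> {
    path.split('/')
        .filter_map(|seg| {
            // only segments of the form `key=value` are partition values
            let (k, v) = seg.split_once('=')?;
            Some((k.to_string(), v.to_string()))
        })
        .collect()
}

fn main() {
    let vals =
        partition_values("/path/to/table/root/p_date=20210813/p_hour=1200/part-0.parquet");
    assert_eq!(vals, vec![
        ("p_date".to_string(), "20210813".to_string()),
        ("p_hour".to_string(), "1200".to_string()),
    ]);
}
```

As the comment above notes, a full design would also need the partition schema (column types) and the expression that produced the values, which is why the PR postpones anything partition specific.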

datafusion/src/datasource/mod.rs (multiple review comments, resolved)
@@ -27,14 +27,14 @@ use crate::{
logical_plan::{Column, Expr},
physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
physical_plan::{
common, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Contributor:

I really like how the statistics and schema related code has been moved out of physical_plan and into datasource

}

/// Convert a table provider into a builder with a TableScan
pub fn scan(
table_name: impl Into<String>,
provider: Arc<dyn TableProvider>,
projection: Option<Vec<usize>>,
filters: Option<Vec<Expr>>,
Contributor:

I think this argument is likely going to be confusing to users and it should be removed.

For example, as a user of LogicalPlanBuilder I would probably assume that the following plan would return only rows where a < 5:

  // Build a plan that looks like it would return only rows with `a < 5`
  let plan = builder.scan("table", provider, None, Some(vec![col("a").lt(lit(5))]));

However, I am pretty sure it could (and often would) return rows with a >= 5. This is because filters added to a TableScan node are optional (in the sense that the provider might not filter out rows that fail the predicate, but is not required to). Indeed, even for the parquet provider, the filters are only used for row group pruning, which may or may not be able to filter rows.

I think we could solve this with:

  1. Leave the scan signature alone and rely on the predicate pushdown optimization to push filters appropriately down to the scan (my preference, as it is simpler for users).
  2. Rename this argument to something like 'optional_filters_for_performance' and document what it does more carefully. I think it would be challenging to explain, as it might or might not do anything depending on how the data is laid out.
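To make the pitfall concrete, here is a toy model of why a scan-level filter is only advisory. None of this is the DataFusion API: `RowGroup`, `scan`, and the min/max pruning rule are invented for illustration, mimicking how Parquet row-group statistics allow pruning whole groups without guaranteeing per-row filtering.

```rust
// A "row group" with min/max statistics, as in a Parquet footer.
struct RowGroup {
    min: i64,
    max: i64,
    rows: Vec<i64>,
}

// Scan with an optional pushed-down predicate `a < limit`: prune whole
// row groups whose statistics prove no row can match, but pass through
// every row of the groups that survive (no per-row filtering).
fn scan(groups: &[RowGroup], pushed_limit: Option<i64>) -> Vec<i64> {
    groups
        .iter()
        .filter(|g| pushed_limit.map_or(true, |l| g.min < l))
        .flat_map(|g| g.rows.iter().copied())
        .collect()
}

fn main() {
    let groups = vec![
        RowGroup { min: 1, max: 10, rows: vec![1, 7, 10] }, // mixed: kept whole
        RowGroup { min: 20, max: 30, rows: vec![20, 30] },  // provably all >= 5: pruned
    ];
    // Pushdown alone still returns 7 and 10, which do NOT satisfy a < 5.
    let scanned = scan(&groups, Some(5));
    assert_eq!(scanned, vec![1, 7, 10]);
    // A plan-level Filter on top is what guarantees the semantics.
    let filtered: Vec<i64> = scanned.into_iter().filter(|a| *a < 5).collect();
    assert_eq!(filtered, vec![1]);
}
```

This is why option 1 is attractive: with a Filter node in the plan, the optimizer can still push the predicate into the scan as a performance hint, while correctness never depends on the scan honoring it.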

Member Author:

Removed, and I kept the filters not deserialized for ballista, as before.

@yjshen (Member, Author) commented Aug 29, 2021

@houqp @alamb I've resolved the comments, PTAL, thanks.

@houqp (Member) left a comment

LGTM!

@alamb (Contributor) left a comment

Thanks @yjshen !

@alamb alamb merged commit 8a085fc into apache:master Aug 30, 2021
@yjshen yjshen deleted the pf_only branch August 30, 2021 13:27
@yjshen (Member, Author) commented Aug 30, 2021

Thanks @houqp @alamb for your great help!

@houqp (Member) commented Aug 30, 2021

Thank you @yjshen for being patient and driving through this big change step by step :)

Labels
api change (Changes the API exposed to users of the crate), datafusion (Changes in the datafusion crate), enhancement (New feature or request), sql (SQL Planner)
Successfully merging this pull request may close these issues.

PartitionedFile abstraction for flexible table scan
3 participants