Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Setting Minimum Parallelism with RowCount Based Demuxer #7841

Merged
merged 6 commits into from
Oct 21, 2023

Conversation

devinjdangelo
Copy link
Contributor

Which issue does this PR close?

Addresses performance regression of #7791

Rationale for this change

#7791 introduced a row count targeting execution time partitioning strategy for DataSinks. The initial implementation only writes a single file at a time, which guarantees that only 1 file will ever be written with <soft_max_rows_per_output_file rows and all others will have >= soft_max_rows_per_output_file. This PR introduces a new setting minimum_parallel_output_files which will write N files in parallel, each targeting soft_max_rows_per_output_file. This allows the user to configure a balance between parallelism and achieving the desired file size.

The behavior of this PR is identical to #7791 if minimum_parallel_output_files is set to 1.

What changes are included in this PR?

  • Adds minimum_parallel_output_files config setting
  • Creates new file writers on-demand as batches arrive, so if there is only 1 batch only 1 file will be written regardless of the minimum_parallel_output_files setting.
  • Updates tests to account for this new setting

Are these changes tested?

Yes by existing tests

Are there any user-facing changes?

Default behavior is now to output at least 4 files in parallel even if soft_max_rows_per_output_file is not reached.

@github-actions github-actions bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Oct 16, 2023
@devinjdangelo devinjdangelo marked this pull request as ready for review October 18, 2023 12:09
@devinjdangelo
Copy link
Contributor Author

@alamb @metesynnada This PR and #7801 are rebased and ready for review when you have a chance. This one is smaller and addresses the performance regression, so probably best to prioritize this one.

@alamb
Copy link
Contributor

alamb commented Oct 18, 2023

Thank you @devinjdangelo -- I have been accumulating quite a review backlog while working on some other writing projects lol -- I hope to make a dent in this backlog tomorrow

Screenshot 2023-10-18 at 5 21 53 PM

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @devinjdangelo -- this is (another) really nice PR.

I reran the test from #7791 (review)

And I do confirm this PR goes much faster. The fact the setting is configurable also means users can trade off buffering and fewer/more compacted files, which is very nice

It is also really nice that this PR still doesn't make empty files if there are no batches to send.

The only thing I think this PR needs prior to merge is some sort of test (perhaps you could set minimum_parallel_output_files to 1 and demonstrate that a single file is created, and then set it minimum_parallel_output_files to 3 and demonstrate that more than 1 is created or something

datafusion/common/src/config.rs Outdated Show resolved Hide resolved
datafusion/core/src/datasource/file_format/write.rs Outdated Show resolved Hide resolved
/// RecordBatches will be distributed in round robin fashion to each
/// parallel writer. Each writer is closed and a new file opened once
/// soft_max_rows_per_output_file is reached.
pub minimum_parallel_output_files: usize, default = 4
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about defaulting to the number of cores (maybe if this was 0)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The returns to additional cores seems to decline very fast beyond 4 tasks in my testing. I believe this is because ~4 parallel serialization tasks no longer bottlenecks the end-to-end execution plan. Going beyond 4 tasks mostly gives higher memory usage and smaller output files for little benefit.

My testing is mostly on a 32core system. I have not tested on enough different configurations to know if core_count/8 is a reasonable default or if a static 4 tasks is a decent default.

It will also depend a lot on the actual execution plan. If you are writing a pre-cached in memory dataset, then you definitely want 1 task/output file per core.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan to work on a statement level option soon, so you could easily do:

copy my_in_memory_table to my_dir (format parquet, output_files 32);

to boost the parallelism for specific plans that benefit from it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me

@alamb
Copy link
Contributor

alamb commented Oct 20, 2023

This PR has a small conflict, but I am pretty sure once that is fixed it will be ready to go

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
@devinjdangelo
Copy link
Contributor Author

This PR has a small conflict, but I am pretty sure once that is fixed it will be ready to go

I'll sort this out today, and see if I can improve the tests as you suggested.

@alamb
Copy link
Contributor

alamb commented Oct 21, 2023

LGTM -- thanks again @devinjdangelo

@alamb alamb merged commit 9fde5c4 into apache:main Oct 21, 2023
23 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants