Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add kwarg chunksize for default data partitioning for write #400

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

svilupp
Copy link
Contributor

@svilupp svilupp commented Mar 12, 2023

This PR proposes to introduce automated partitioning of the provided tables when writing. It follows my findings from benchmarking against PyArrow

Nowadays, most machines are multithreaded and Arrow.write() provides multithreaded writing for partitioned data. However, a user must explicitly partition their data.
Unfortunately, most users do not realize that both their write and subsequent read operations will not be multithreaded without such partitioning (there is an issue to improve the docs).

This PR defaults to partitioning data if it's larger than 64K rows (should be beneficial on most systems) to enable better Arrow.jl performance on both read and write.

Implementation:

  • the new kwarg is called chunksize (maps to PyArrow and should be broadly understood)
  • uses default chunksize of 64000 rows, as per PyArrow.write_feather
  • allows users to opt-out by providing chunksize=nothing
  • partitioning is done via Iterators.partition(Tables.rows(tbl),chunksize) for all Tables.jl-compatible sources (checks Tables.istable) changed to Iterators.partition(tbl,chunksize) to avoid missingness getting lost (eg, for DataFrames)

Some resources:

@svilupp
Copy link
Contributor Author

svilupp commented Mar 13, 2023

I've changed the condition for automatic partitioning to be Tables.rowaccess()=true as well, to prevent accepting some columntables without row iterators.

In addition, I've changed Iterators.partition(Tables.rows(tbl),chunksize) to Iterators.partition(tbl,chunksize) to avoid missingness type getting lost (eg, for DataFrames)

@svilupp
Copy link
Contributor Author

svilupp commented Mar 13, 2023

In addition, I've changed Iterators.partition(Tables.rows(tbl),chunksize) to Iterators.partition(tbl,chunksize) to avoid missingness type getting lost (eg, for DataFrames)

Okay, I was wrong. I misunderstood what rowaccess requirements are -- Iterators.partition() still needs to be defined separately.

I've moved back to Tables.rows to ensure we get rows out.

I'm not sure what the best solution is here.
By far the simplest option would be to pass the schema down - because we have access to it before Tables.columns is called in the arrow construction (that's how we lose the schema, because we "materialize" the chunk as is)

EDIT:

  • The second simplest option would be to add compat entry for DataFrames>1.5.0 and use the Iterators.partition directly (with some safety check that it indeed chunked rows, not columns... if some unknown type defines it over columns)

@svilupp
Copy link
Contributor Author

svilupp commented Mar 13, 2023

Added compat for DataFrames via Extras

@baumgold
Copy link
Member

By far the simplest option would be to pass the schema down - because we have access to it before Tables.columns is called in the arrow construction (that's how we lose the schema, because we "materialize" the chunk as is)

We could allow users to optionally provide the Schema in the Base.open constructor of the Writer object. If a user makes use of this then we should validate the the actual schema of each chunk matches that of the expected schema.

"""
function write end

write(io_or_file; kw...) = x -> write(io_or_file, x; kw...)

function write(file_path, tbl; kwargs...)
function write(file_path, tbl; chunksize::Union{Nothing,Integer}=64000, kwargs...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think chunksize should move to be a new field in Writer with default kwarg value set in the Base.open constructor on L170. This would eliminate the code duplication.

if !isnothing(chunksize) && Tables.istable(tbl) && Tables.rowaccess(tbl)
@assert chunksize >= 0 "chunksize must be >= 0"
if hasmethod(Iterators.partition,(typeof(tbl),))
tbl_source = Iterators.partition(tbl, chunksize)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use Iterators.partition from Base rather than DataFrames to prevent adding one more dependency?

https://docs.julialang.org/en/v1/base/iterators/#Base.Iterators.partition

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants