
Parallel Arrow file format reading #8503

Closed
alamb opened this issue Dec 11, 2023 · 10 comments · Fixed by #8897
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Dec 11, 2023

Is your feature request related to a problem or challenge?

DataFusion can now automatically read CSV and Parquet files in parallel (see #6325 for CSV)

It would be great to do the same for "Arrow" files

Describe the solution you'd like

Basically, implement what is described in #6325 for Arrow -- read a single large Arrow file in parallel

Describe alternatives you've considered

Some research may be required -- I am not sure if finding record boundaries is feasible

Additional context

I found this while writing tests for #8451

@alamb alamb added the enhancement New feature or request label Dec 11, 2023
@alamb
Contributor Author

alamb commented Dec 11, 2023

See also #8504

@my-vegetable-has-exploded
Contributor

I'd like to give it a try.

@my-vegetable-has-exploded
Contributor

I read the related PRs for Parquet and CSV.
The parallel Parquet scan is based on row groups, and the CSV scan is based on lines. Both can be split by row and then emit RecordBatches using a suitable method.
I don't think Arrow can be handled like that, since an Arrow file is purely column-based.
But I am wondering whether we can split the scan process into several parts and rebuild the whole batch, since there may be more than one array in the file.

Merry Christmas!

@alamb
Contributor Author

alamb commented Dec 24, 2023

But I am wondering whether we can split the scan process into several parts and rebuild the whole batch, since there may be more than one array in the file.

This sounds like a good idea to me in theory -- I am not sure how easy or hard it would be to do with the existing Arrow IPC reader

In general, the strategy for parallelizing Parquet and CSV is to split the file up into byte ranges, and then have each partition read the row groups (or CSV lines) whose first byte falls within its assigned range

Perhaps we could do the same for Arrow files, using the first byte of each RecordBatch 🤔

This code explains it a bit more: https://github.com/apache/arrow-datafusion/blob/6b433a839948c406a41128186e81572ec1fff689/datafusion/core/src/datasource/physical_plan/file_groups.rs#L35-L79
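For illustration, the range-based strategy described above could be sketched roughly like this (the function and names below are illustrative, not part of DataFusion's API):

```rust
/// Assign each Arrow block (record batch) to a partition: the file is split
/// into equal byte ranges, and a block belongs to the partition whose range
/// contains the block's first byte. Illustrative sketch only.
fn assign_blocks(block_offsets: &[u64], file_len: u64, partitions: u64) -> Vec<Vec<u64>> {
    // Each partition covers an equal byte range; the last absorbs the remainder.
    let range_size = (file_len / partitions).max(1);
    let mut assigned: Vec<Vec<u64>> = vec![Vec::new(); partitions as usize];
    for &offset in block_offsets {
        let p = ((offset / range_size) as usize).min(partitions as usize - 1);
        assigned[p].push(offset);
    }
    assigned
}
```

This mirrors the linked `file_groups.rs` logic: every block is read by exactly one partition, and no partition needs to know about the others.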

@my-vegetable-has-exploded
Contributor

Perhaps we could do the same for Arrow files, using the first byte of each RecordBatch 🤔

There may be several RecordBatches (blocks in arrow-rs) in an Arrow file (I didn't notice that before). We can handle them like row groups in Parquet.
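A per-partition read loop along those lines might look like the following sketch, where `read_partition` is a hypothetical helper: each partition seeks to the byte offset of each block assigned to it and reads that block independently, just as Parquet partitions read their assigned row groups (real code would decode the bytes into a RecordBatch rather than return them raw):

```rust
use std::io::{Read, Seek, SeekFrom};

/// Hypothetical per-partition loop: `blocks` is a list of (offset, length)
/// pairs for the blocks this partition owns. Each block is read by seeking
/// to its first byte; a real implementation would decode each buffer into
/// a RecordBatch instead of returning raw bytes.
fn read_partition<R: Read + Seek>(reader: &mut R, blocks: &[(u64, usize)]) -> Vec<Vec<u8>> {
    blocks
        .iter()
        .map(|&(offset, len)| {
            reader.seek(SeekFrom::Start(offset)).expect("seek failed");
            let mut buf = vec![0u8; len];
            reader.read_exact(&mut buf).expect("short read");
            buf
        })
        .collect()
}
```

Because each block is self-describing given the file's schema, partitions never need to coordinate beyond the initial block assignment.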

I will check whether dictionaries can be handled correctly, since there may be delta dictionaries.

Thanks.

@my-vegetable-has-exploded
Contributor

my-vegetable-has-exploded commented Dec 28, 2023

I will check whether dictionaries can be handled correctly, since there may be delta dictionaries.

It seems that delta dictionary batches are not supported yet.

And I think a pub function exposing the block offsets is needed upstream. Something like:

impl<R: Read + Seek> FileReader<R> {
    pub fn blocks(&self) -> &[Block] {
        &self.blocks
    }
    // OR
    pub fn offsets(&self) -> Vec<i64> {
        self.blocks.iter().map(Block::offset).collect()
    }
}

@tustvold
Contributor

tustvold commented Dec 28, 2023

apache/arrow-rs#5249 adds a lower-level reader that should enable this and other use cases

Delta DICTIONARY.

Delta and replacement dictionaries are only supported by IPC streams, not files

@my-vegetable-has-exploded
Contributor

Delta DICTIONARY.

Delta and replacement dictionaries are only supported by IPC streams, not files

Got it! Thanks

@my-vegetable-has-exploded
Contributor

I will complete this after the next release of arrow-rs.

@alamb
Copy link
Contributor Author

alamb commented Dec 31, 2023

The next release is tracked by apache/arrow-rs#5234
