
[WIP][Proposal] PARQUET-2430: Add parquet joiner #1273

Open
wants to merge 24 commits into master
Conversation

@MaxNevermind (Contributor) commented Feb 13, 2024

This PR is a proposal and Work In Progress.

Overview

The ParquetJoiner feature is similar to the ParquetRewriter class. ParquetRewriter allows stitching files with the same schema into a single file, while ParquetJoiner should enable stitching files with different schemas into a single file. That is possible when: 1) the number of rows in the main and extra files is the same, and 2) the ordering of rows in the main and extra files is the same. The main benefit of ParquetJoiner is performance: when you join/stitch terabytes or petabytes of data, this seemingly simple low-level API can be up to 10x more resource-efficient.

Implementation details

ParquetJoiner lets you specify a main input Parquet file and extra input Parquet files. ParquetJoiner copies the main input as binary data and rewrites the extra input files with row groups adjusted to the main input. If the main input is much larger than the extra inputs, a lot of resources are saved by handling the main input as binary. A rough sketch of the intended usage follows.
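As a minimal illustration, the snippet below joins one main (left) file with one extra (right) file. It is a hedged sketch written against the ParquetRewriter / RewriteOptions API as it appears in this PR (addInputPathsR is the new builder method proposed here); the exact signatures may still change, and the file paths are placeholders.

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class ParquetJoinExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path mainInput = new Path("/data/main/part-00000.parquet");   // large left input, copied mostly as binary
    Path extraInput = new Path("/data/extra/part-00000.parquet"); // right input, rewritten to match the left row groups
    Path output = new Path("/data/joined/part-00000.parquet");

    // addInputPathsR(...) is the right-side builder method proposed in this PR.
    RewriteOptions options = new RewriteOptions.Builder(conf, mainInput, output)
        .addInputPathsR(Arrays.asList(extraInput))
        .build();

    ParquetRewriter rewriter = new ParquetRewriter(options);
    rewriter.processBlocks(); // stitches the right-side columns onto the left-side rows
    rewriter.close();
  }
}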

Use-case examples

A very large Parquet-based dataset (dozens or hundreds of fields, terabytes of data daily, petabytes of historical partitions). The task is to modify a column or add a new column for all the historical data. That is trivial using Spark, but given the sheer scale of the dataset it would take a lot of resources.

Side notes

Note that this class of problems could in theory be solved by storing the main input and extra inputs in HMS/Iceberg bucketed tables and using a view that joins those tables on the fly into the final version, but in practice there is often a requirement to merge the Parquet files and have a single Parquet source in the file system.

Use-case implementation details using Apache Spark

You can use Apache Spark to perform the join with ParquetJoiner: read the large main input and prepare the right side of the join so that each file on the left has a corresponding file on the right, with the same number of records in each corresponding file pair and the same ordering of records within each pair. Then run ParquetJoiner in parallel for each file pair to perform the join; a sketch of driving the per-pair joins from Java follows below. An example of code that utilizes this new feature: https://gist.github.com/MaxNevermind/0feaaf380520ca34c2637027ef349a7d.
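Assuming the left/right file pairs have already been prepared as described (for example by the Spark job in the gist above), the per-pair join itself can be driven from plain Java. This is a hypothetical sketch under the same API assumptions as the example in the proposal above; the directory layout, file names, and thread count are placeholders.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class ParquetJoinFilePairs {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Each left file must have a right file with the same row count and the same record ordering.
    List<String> fileNames = Arrays.asList("part-00000.parquet", "part-00001.parquet");

    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (String name : fileNames) {
      pool.submit(() -> {
        try {
          RewriteOptions options = new RewriteOptions.Builder(
                  conf,
                  new Path("/data/left/" + name),
                  new Path("/data/joined/" + name))
              .addInputPathsR(Arrays.asList(new Path("/data/right/" + name)))
              .build();
          ParquetRewriter rewriter = new ParquetRewriter(options);
          rewriter.processBlocks(); // one independent join per file pair
          rewriter.close();
        } catch (Exception e) {
          throw new RuntimeException("Failed to join " + name, e);
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.HOURS);
  }
}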

 

Make sure you have checked all steps below.

Jira

Tests

  • My PR adds the following unit tests:
    • testStitchThreeInputsDifferentRowGroupSize

Commits

  • My commits all reference Jira issues in their subject lines. In addition, my commits follow the guidelines
    from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Style

  • My contribution adheres to the code style guidelines and Spotless passes.
    • To apply the necessary changes, run mvn spotless:apply -Pvector-plugins

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.
    • All the public functions and classes in the PR contain Javadoc that explains what they do

@wgtmac (Member) commented Feb 19, 2024

cc @gszadovszky @ggershinsky @ConeyLiu in case you've missed the ML discussion: https://lists.apache.org/thread/5q7qhkl07of4ypc4t5zx3hyjgv12c7q2

@gszadovszky (Contributor)

I don't have the time to properly review this proposal, but I like the idea.
I agree we should move this joiner approach as close to the rewriter logic as possible, maybe introducing new interfaces/implementations and using them where the logic is really different.

@ConeyLiu (Contributor)

The idea looks good to me too; actually, we have similar requirements to update historical data. Feel free to ping me when this is ready for review.

@MaxNevermind (Contributor, Author) commented Mar 12, 2024

@wgtmac
Could you do a preliminary review of the code? I've moved the logic from a new class into ParquetRewriter, as requested in the email discussion. So far I've added a single test for the new feature; I wanted to add more tests and polish once the code looks good. All tests pass for both the old and new code in ParquetRewriterTest at the moment. I will add more documentation to RewriteOptions if everything else looks good. Just FYI, I've added an example of this new feature's usage in a Gist here: https://gist.github.com/MaxNevermind/0feaaf380520ca34c2637027ef349a7d.

@wgtmac (Member) commented Mar 13, 2024

@MaxNevermind Sure, I will take a look this week.

LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(), options.getParquetInputFiles(), out);

// Init reader of the first input file
initNextReader();
extraMetaData.put(
Member

I'm skeptical of the importance of metadata from the right side files. However I don't have a better suggestion here.

Contributor Author

Is there an action point here, or can we resolve this?

Member

I'm a little concerned about the possible explosion of the metadata. Should we add an option to ignore those on the right side? Or at least add a comment to raise the potential issue.

Contributor Author

I thought about it, and maybe we should make it a general option for both the left and the right? In theory this might happen even without the right part, as I understand it. Let me know; if not, I will add an option for just the right part.

Stream.concat(inputFiles.stream(), inputFilesR.stream().flatMap(Collection::stream))
.forEach(x -> extraMetaData.putAll(x.getFileMetaData().getKeyValueMetaData()));

// TODO check that schema on the left and on the right is not identical
Member

nit: the schema check is now complex enough that it is worth a separate method.

@MaxNevermind (Contributor, Author)

@wgtmac
Let me know if you want me to implement your VerticalInputFile / HorizontalInputFile idea and what you see as left to do in this PR.
Let me know what else I should do; I think I should add Javadoc, and I can add more tests. I guess you are aware, but just FYI, ParquetRewriterTest already takes 3/4 of the total parquet-hadoop testing time; on my machine those are 16 min and 22 min respectively.

@wgtmac (Member) commented Mar 20, 2024

Thanks for the update! I will take a look later this week.

Let me know if you want me to implement your VerticalInputFile / HorizontalInputFile idea and what you see as left to do in this PR.

No. The rows on both sides are different, so we don't have to bother with the concepts.

I guess you are aware, but just FYI, ParquetRewriterTest already takes 3/4 of the total parquet-hadoop testing time; on my machine those are 16 min and 22 min respectively.

Yes, we probably should disable some parameter combinations.

@wgtmac (Member) left a comment

I do see that you've put a lot of effort into consolidating the implementation with the original rewriter features. Thank you for doing that! I've left some comments, and I think we have made good progress.

}

public void writeRows(int rowGroupIdx, long rowsToWrite) throws IOException {
if (rowGroupIdxIn != rowGroupIdx) {
Member

Should we add a check to make sure rowGroupIdx will never decrease?

Contributor Author

We can, but right now that is not supposed to happen, and I believe it would break the right-side tests immediately, so the problem would become visible that way. I can add something like this if you want me to:

      if (rowGroupIdxL > rowGroupIdx) {
        throw new IOException("A row group index decrease is detected in RightColumnWriter! Current index: "
            + rowGroupIdxL + ", new index: " + rowGroupIdx);
      }

Let me know.

Member

Yes, that sounds good.

Contributor Author

Should I add the check proposed in the message above, or can we resolve this as is?

schema.getColumns().stream().collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
} else { // TODO: describe in documentation that only top level column can be overwritten
this.descriptorsMap = schemaL.getColumns().stream()
.filter(x -> x.getPath().length == 0 || !fieldNamesR.containsKey(x.getPath()[0]))
Member

If we restrict both the left and right sides so that they do not have duplicate fields, would that make things simpler here and afterwards?

Contributor Author

I wanted to preserve the original capabilities of ParquetRewriter, which allowed duplicated columns as I understand it, but for the right side that would create a problem, so I added that check. Let me know if you want support for duplicated columns in general.

Member

IIRC, the original rewriter does not support duplicated columns. However, supporting them while joining files is a good idea. As in my previous comment, it would be better to support an explicit resolve strategy for them if that is not too complicated.

Contributor

So the order of the left part's columns could change if an overwrite occurred.

.build();
CodecFactory codecFactory = new CodecFactory(new Configuration(), props.getPageSizeThreshold());
CompressionCodecFactory.BytesInputCompressor compressor =
codecFactory.getCompressor(chunk.getCodec());
Member

This uses the same codec as the input file. Should we respect the trans-codec rewriter options?

@MaxNevermind (Contributor, Author) commented Mar 24, 2024

Originally I planned to skip support for pruning/masking/codec changing/encryption on the right side; you can find the checks for that in the RewriteOptions builder. My reasoning was that the Parquet joiner is for a niche use case, primarily stitching large datasets to save resources. Let me know if you want me to implement those currently unsupported extra features. The same answer applies to the other questions below related to missing features on the right.

Member

That makes sense. I didn't mean we have to implement everything.

@@ -137,16 +175,34 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
getPaths(schema, paths, null);
for (String col : pruneColumns) {
if (!paths.contains(col)) {
LOG.warn("Input column name {} doesn't show up in the schema of file {}", col, reader.getFile());
LOG.warn("Input column name {} doesn't show up in the schema", col);
}
}

Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
schema = pruneColumnsInSchema(schema, prunePaths);
Member

Right-side columns get pruned here, but they are still written in the RightColumnWriter. Do you plan to enable pruning columns on the right side? Either way, we need to state this in the docstring of RewriteOptions.

int dlvl = reader.getCurrentDefinitionLevel();
do {
if (dlvl < dMax) {
cWriter.writeNull(rlvl, dlvl);
Member

If a column to mask (only nullifying column is supported) is on the right side, we also need to handle it here.

props.getAllocator(),
props.getColumnIndexTruncateLength(),
props.getPageWriteChecksumEnabled(),
writer.getEncryptor(),
Member

IIUC, this line makes sure that right-side columns can be encrypted.

@MaxNevermind requested a review from wgtmac March 31, 2024 18:28
@MaxNevermind (Contributor, Author)

@wgtmac
Can you check out the comments I left please?

@wgtmac (Member) left a comment

Sorry for the delay. Please see my new comments.


});
List<Type> fields = Stream.concat(
fieldNamesL.values().stream()
.map(x -> fieldNamesR.getOrDefault(
Member

This implicit behavior may lead to unexpected usage. Would it be better to add a resolve strategy for duplicate column names? By default we can pick it up from the right side.


cWriters.values().forEach(ColumnWriter::close);
for (ColumnDescriptor descriptor : descriptorsMap.values()) {
if (cPageStores.containsKey(descriptor))
cPageStores.get(descriptor).flushToFileWriter(writer);
Member

Yes, the spec does not require that the column chunk order be coupled to the schema order. However, I'm not sure if this will break some Parquet implementations in the wild.


@wgtmac (Member) commented Apr 2, 2024

@ConeyLiu Would you mind taking a look as well?

private final Map<ColumnDescriptor, ColumnWriter> cWriters = new HashMap<>();
private int rowGroupIdxL = 0; // index of the rowGroup of the current file on the left
private int rowGroupIdxR = 0; // index of the rowGroup of the current file on the right
private int writtenFromBlock = 0;
Contributor

I suggest using long type.

* @param paths input file path to read from
* @return self
*/
public Builder addInputPathsR(List<Path> paths) {
Contributor

We should document the requirements for the right-side files.

? ParquetProperties.WriterVersion.PARQUET_2_0
: ParquetProperties.WriterVersion.PARQUET_1_0;
ParquetProperties props = ParquetProperties.builder()
.withWriterVersion(writerVersion)
Contributor

The output file could end up with a mix of v1/v2 formats if some of the right files have mixed formats. Is that right?

: ParquetProperties.WriterVersion.PARQUET_1_0;
ParquetProperties props = ParquetProperties.builder()
.withWriterVersion(writerVersion)
.withBloomFilterEnabled(bloomFilterLength > 0)
Contributor

Does the bloom filter have the same mixed problem?

}

private MessageType createSchemaR2() {
return new MessageType("schema", new PrimitiveType(REPEATED, FLOAT, "FloatFraction"));
Contributor

Could you add an overwrite column with a different type?

.map(x ->
x.stream().mapToLong(ParquetFileReader::getRecordCount).sum())
.forEach(rowCountR -> {
if (rowCountL != rowCountR) {
Contributor

We only check the row count. How do we check the record order? That should be a key requirement as well.

x.getName())) // takes fields on the right if it was not present on the left
)
.collect(Collectors.toList());
schema = new MessageType(schemaL.getName(), fields);
Contributor

The schema does not match the actual column order if there is any overwrite. We need to add a UT to verify this.

Contributor Author

@wgtmac you also mentioned that same problem in this comment.
I've just spent some time trying to refactor this part. I tried to align the schema column order with the actual write order of the columns, and it is mostly doable, but the RightColumnWriter state becomes too complex; for example, a file queue needs to be present for each individual column on the right. I think the code will become too difficult to support long term if we do that.

@ConeyLiu (Contributor)

Thanks @MaxNevermind for the great work and thanks @wgtmac for pinging me. I am sorry for the late reply.

@MaxNevermind (Contributor, Author)

@wgtmac @ConeyLiu

FYI, I wanted to share an idea: we can use a binary copy on the right side and avoid a read-write of the right columns; that is possible if the number of rows and the ordering within row groups on the left and on the right are the same. I believe related ideas were discussed in this PR. I originally doubted that a user would be able to produce files in such a fashion, but this week I've found a way to do that in Spark 3.3+. I can create a utility class (similar to the one above) that takes the input files, runs transformations on them, and then writes the results in a way that preserves the original input files' names, row-group row counts, and ordering. The benefits are, of course, speed because of the binary copy, and simplicity, as we don't need RightColumnWriter. I will try to validate my idea next week and get back with the result. If I'm able to do that, I suggest simplifying this PR, or maybe splitting the effort in two: a simpler version for the plain binary copy and a second one based on the current state of this PR.
