PARQUET-2430: Add parquet joiner v2 #1335

Merged (67 commits) on Sep 19, 2024

Changes shown below are from 22 of the 67 commits.

Commits
f5144b2
add initial ParquetJoiner implementation
Jan 28, 2024
01a08dd
add initial ParquetJoiner implementation
Feb 1, 2024
28c987c
Merge remote-tracking branch 'origin/master' into add-parquet-joiner
Feb 12, 2024
7ae3505
refactor ParquetJoiner implementation
Feb 17, 2024
05eb22a
extend the main test for multiple files on the right
Feb 20, 2024
6bb950d
extend the main test for multiple files on the right
Feb 22, 2024
87b923c
Merge branch 'master' into add-parquet-joiner
Feb 22, 2024
f9536c3
converge join logic, create a draft of options and rewriter
Feb 23, 2024
d7f11d9
move ParquetJoinTest logic to ParquetRewriterTest
Feb 27, 2024
e8e7ffe
improve Parquet stitching test
Mar 1, 2024
3ee946c
remove custom ParquetRewriter constructor
Mar 6, 2024
fd409c4
remove custom ParquetRewriter constructor
Mar 6, 2024
5a98219
refactor ParquetRewriter
Mar 12, 2024
7b2fd1a
apply spotless and address PR comments
Mar 14, 2024
8da8291
move extra column writing into processBlocksFromReader
Mar 15, 2024
68e41ba
add getInputFiles back
Mar 16, 2024
98b9b23
Merge remote-tracking branch 'fork/master' into add-parquet-joiner
Mar 16, 2024
6d2c222
fix extra ParquetRewriter constructor so tests can pass
Mar 16, 2024
883e935
remove not needed TODOs
Mar 20, 2024
8ef36b5
address PR comments
Mar 24, 2024
79cc2b8
Merge remote-tracking branch 'origin/master' into add-parquet-joiner
Apr 11, 2024
0bbf72f
rename inputFilesR to inputFilesToJoin
Apr 11, 2024
ca53bff
rename inputFilesR to inputFilesToJoinColumns
Apr 11, 2024
1e7998a
add getParquetInputFiles listing to the rewrite start logging
Apr 11, 2024
2ee9b40
redesign file joiner in ParquetRewriter
Apr 28, 2024
fc32dfd
Merge remote-tracking branch 'origin/master' into add-parquet-joiner-v2
Apr 28, 2024
db52c85
redesign file joiner in ParquetRewriter
Apr 28, 2024
9057e91
redesign file joiner in ParquetRewriter
Apr 28, 2024
5b055c0
redesign file joiner in ParquetRewriter
Apr 28, 2024
b70f88f
uncomment some code
Apr 28, 2024
270126b
fix ParquetRewriter joiner test
May 4, 2024
008cb40
Merge remote-tracking branch 'refs/remotes/origin/master' into add-pa…
Jul 25, 2024
0dc1793
add initial ParquetJoiner implementation
Jul 25, 2024
a53d108
add initial ParquetJoiner implementation
Jul 31, 2024
4da0b85
typo
Aug 6, 2024
c5c7b38
typo
Aug 6, 2024
92c95db
typo
Aug 6, 2024
86f7a4c
typo
Aug 6, 2024
73a4af4
docs
Aug 6, 2024
18feef4
typo
Aug 7, 2024
b24bffa
add getExtraMetadata()
Aug 7, 2024
21a5926
extract ensureRowCount()
Aug 7, 2024
c521a95
typo
Aug 7, 2024
1ea6755
typo
Aug 7, 2024
f2e01a2
add logging into getSchema()
Aug 7, 2024
d393125
typo
Aug 7, 2024
64d3bb2
add closing of input files readers
Aug 7, 2024
f50666a
fix RewriteOptions builder for inputFilesToJoin
Aug 7, 2024
d306336
Merge remote-tracking branch 'refs/remotes/origin/master' into add-pa…
Aug 7, 2024
ae9589d
fix ParquetRewriter constructor
Aug 7, 2024
9157960
extend tests for ParquetRewriter
Aug 14, 2024
3b722e4
spotless
Aug 14, 2024
bdba14c
refactor ParquetRewriterTest
Aug 15, 2024
a89eba6
add tests into ParquetRewriterTest
Aug 16, 2024
57432ee
Merge remote-tracking branch 'origin/master' into add-parquet-joiner-v2
Aug 26, 2024
f674bcf
extend tests in ParquetRewriterTest for joiner part
Aug 26, 2024
8514f39
add testMergeFilesToJoinWithDifferentRowCount test into ParquetRewrit…
Aug 27, 2024
0aaf963
Merge remote-tracking branch 'origin/master' into add-parquet-joiner-v2
Aug 29, 2024
4340c42
add testOneInputFileManyInputFilesToJoin with and without JoinColumns…
Aug 29, 2024
e475648
Merge remote-tracking branch 'origin/master' into add-parquet-joiner-v2
Aug 31, 2024
bb42979
add encrypt validation into ParquetRewriterTest's testOneInputFileMan…
Aug 31, 2024
5b97a4c
refactor ParquetRewriter slightly to address PR comments
Sep 8, 2024
27ba73b
add javadoc to ParquetRewriter
Sep 9, 2024
07f1e74
add javadoc to ParquetRewriter
Sep 10, 2024
e96c022
fix javadoc in ParquetRewriter to comply with Maven javadoc plugin
Sep 13, 2024
d1c1d76
fix javadoc in ParquetRewriter to comply with Maven javadoc plugin
Sep 13, 2024
9de20d7
fix javadoc in ParquetRewriter to comply with Maven javadoc plugin
Sep 13, 2024
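
Before the diff, a minimal, hedged sketch of how the joiner introduced by this PR might be driven. The builder calls addInputFilesToJoin and overwriteInputWithJoinColumns are assumptions inferred from the fields and getters visible in the diff (inputFilesToJoin, overwriteInputWithJoinColumns) and may be named differently in the merged RewriteOptions API; ignoreJoinFilesMetadata(boolean), the ParquetRewriter(RewriteOptions) constructor, and processBlocks() do appear in the diff itself.

```java
// Hedged usage sketch, not an authoritative example.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

import java.util.Collections;

public class ParquetJoinerSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    RewriteOptions options = new RewriteOptions.Builder(
            conf,
            new Path("left.parquet"),            // main input: defines row-group row counts
            new Path("joined-output.parquet"))   // stitched output
        .addInputFilesToJoin(Collections.singletonList(   // hypothetical name: right-side files whose
            new Path("right.parquet")))                   // columns are stitched in per row group
        .overwriteInputWithJoinColumns(true)     // hypothetical name: right side wins on a name clash
        .ignoreJoinFilesMetadata(true)           // from the diff: drop key-value metadata of join files
        .build();

    // ParquetRewriter aligns blocks by row count (see ensureRowCount in the diff) and
    // writes each output column chunk either from the main file or from the join file.
    try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
      rewriter.processBlocks();
    }
  }
}
```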
@@ -102,7 +102,7 @@ public class ParquetRewriter implements Closeable {
private Map<ColumnPath, MaskMode> maskColumns = null;
private Set<ColumnPath> encryptColumns = null;
private boolean encryptMode = false;
private final Map<String, String> extraMetaData = new HashMap<>();
private final Map<String, String> extraMetaData;
// Writer to rewrite the input files
private final ParquetFileWriter writer;
// Number of blocks written which is used to keep track of the actual row group ordinal
@@ -125,6 +125,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), conf));
ensureSameSchema(inputFiles);
ensureSameSchema(inputFilesToJoin);
ensureRowCount();
LOG.info(
"Start rewriting {} input file(s) {} to {}",
inputFiles.size() + inputFilesToJoin.size(),
@@ -134,37 +135,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException {

this.outSchema = getSchema();
this.outSchema = pruneColumnsInSchema(outSchema, options.getPruneColumns());

List<TransParquetFileReader> allFiles;
if (options.getIgnoreJoinFilesMetadata()) {
allFiles = new ArrayList<>(inputFiles);
} else {
allFiles = Stream.concat(inputFiles.stream(), inputFilesToJoin.stream())
.collect(Collectors.toList());
}
extraMetaData.put(
ORIGINAL_CREATED_BY_KEY,
allFiles.stream()
.map(x -> x.getFooter().getFileMetaData().getCreatedBy())
.collect(Collectors.toSet())
.stream()
.reduce((a, b) -> a + "\n" + b)
.orElse(""));
allFiles.forEach(x -> extraMetaData.putAll(x.getFileMetaData().getKeyValueMetaData()));

if (!inputFilesToJoin.isEmpty()) {
List<Long> blocksRowCountsL = inputFiles.stream()
.flatMap(x -> x.getFooter().getBlocks().stream().map(BlockMetaData::getRowCount))
.collect(Collectors.toList());
List<Long> blocksRowCountsR = inputFilesToJoin.stream()
.flatMap(x -> x.getFooter().getBlocks().stream().map(BlockMetaData::getRowCount))
.collect(Collectors.toList());
if (!blocksRowCountsL.equals(blocksRowCountsR)) {
throw new IllegalArgumentException(
"The number of rows in each block must match! Left blocks row counts: " + blocksRowCountsL
+ ", right blocks row counts" + blocksRowCountsR + ".");
}
}
this.extraMetaData = getExtraMetadata(options);

if (options.getMaskColumns() != null) {
this.maskColumns = new HashMap<>();
@@ -192,6 +163,36 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
writer.start();
}

// Ctor for legacy CompressionConverter and ColumnMasker
public ParquetRewriter(
TransParquetFileReader reader,
ParquetFileWriter writer,
ParquetMetadata meta,
MessageType outSchema,
String originalCreatedBy,
CompressionCodecName codecName,
List<String> maskColumns,
MaskMode maskMode) {
this.writer = writer;
this.outSchema = outSchema;
this.newCodecName = codecName;
extraMetaData = new HashMap<>(meta.getFileMetaData().getKeyValueMetaData());
extraMetaData.put(
ORIGINAL_CREATED_BY_KEY,
originalCreatedBy != null
? originalCreatedBy
: meta.getFileMetaData().getCreatedBy());
if (maskColumns != null && maskMode != null) {
this.maskColumns = new HashMap<>();
for (String col : maskColumns) {
this.maskColumns.put(ColumnPath.fromDotString(col), maskMode);
}
}
this.inputFiles.add(reader);
this.indexCacheStrategy = IndexCache.CacheStrategy.NONE;
this.overwriteInputWithJoinColumns = false;
}

private MessageType getSchema() {
MessageType schemaMain = inputFiles.peek().getFooter().getFileMetaData().getSchema();
if (inputFilesToJoin.isEmpty()) {
@@ -206,39 +207,53 @@ private MessageType getSchema() {
.getSchema()
.getFields()
.forEach(x -> {
if (!fieldNames.containsKey(x.getName()) || overwriteInputWithJoinColumns) {
if (!fieldNames.containsKey(x.getName())) {
LOG.info("Column {} is added to the output from inputFilesToJoin side", x.getName());
fieldNames.put(x.getName(), x);
} else if (overwriteInputWithJoinColumns) {
LOG.info("Column {} in inputFiles is overwritten by inputFilesToJoin side", x.getName());
fieldNames.put(x.getName(), x);
}
});
return new MessageType(schemaMain.getName(), new ArrayList<>(fieldNames.values()));
}
}

// Ctor for legacy CompressionConverter and ColumnMasker
public ParquetRewriter(
TransParquetFileReader reader,
ParquetFileWriter writer,
ParquetMetadata meta,
MessageType outSchema,
String originalCreatedBy,
CompressionCodecName codecName,
List<String> maskColumns,
MaskMode maskMode) {
this.writer = writer;
this.outSchema = outSchema;
this.newCodecName = codecName;
originalCreatedBy = originalCreatedBy == null ? meta.getFileMetaData().getCreatedBy() : originalCreatedBy;
extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
extraMetaData.put(ORIGINAL_CREATED_BY_KEY, originalCreatedBy);
if (maskColumns != null && maskMode != null) {
this.maskColumns = new HashMap<>();
for (String col : maskColumns) {
this.maskColumns.put(ColumnPath.fromDotString(col), maskMode);
private Map<String, String> getExtraMetadata(RewriteOptions options) {
List<TransParquetFileReader> allFiles;
if (options.getIgnoreJoinFilesMetadata()) {
allFiles = new ArrayList<>(inputFiles);
} else {
allFiles = Stream.concat(inputFiles.stream(), inputFilesToJoin.stream())
.collect(Collectors.toList());
}
Map<String, String> result = new HashMap<>();
result.put(
ORIGINAL_CREATED_BY_KEY,
allFiles.stream()
.map(x -> x.getFooter().getFileMetaData().getCreatedBy())
.collect(Collectors.toSet())
.stream()
.reduce((a, b) -> a + "\n" + b)
.orElse(""));
allFiles.forEach(x -> result.putAll(x.getFileMetaData().getKeyValueMetaData()));
return result;
}

private void ensureRowCount() {
if (!inputFilesToJoin.isEmpty()) {
List<Long> blocksRowCountsL = inputFiles.stream()
.flatMap(x -> x.getFooter().getBlocks().stream().map(BlockMetaData::getRowCount))
.collect(Collectors.toList());
List<Long> blocksRowCountsR = inputFilesToJoin.stream()
.flatMap(x -> x.getFooter().getBlocks().stream().map(BlockMetaData::getRowCount))
.collect(Collectors.toList());
if (!blocksRowCountsL.equals(blocksRowCountsR)) {
throw new IllegalArgumentException(
"The number of rows in each block must match! Left blocks row counts: " + blocksRowCountsL
+ ", right blocks row counts" + blocksRowCountsR + ".");
}
}
this.inputFiles.add(reader);
this.indexCacheStrategy = IndexCache.CacheStrategy.NONE;
this.overwriteInputWithJoinColumns = false;
}

private Queue<TransParquetFileReader> getFileReaders(List<InputFile> inputFiles, ParquetConfiguration conf) {
@@ -282,9 +297,9 @@ public void close() throws IOException {
}

public void processBlocks() throws IOException {
TransParquetFileReader readerJoin = inputFilesToJoin.peek();
IndexCache indexCacheJoin = null;
int blockIdxJoin = -1;
TransParquetFileReader readerToJoin = null;
IndexCache indexCacheToJoin = null;
int blockIdxToJoin = 0;
List<ColumnDescriptor> outColumns = outSchema.getColumns();

while (!inputFiles.isEmpty()) {
@@ -303,36 +318,42 @@ public void processBlocks() throws IOException {
Map<ColumnPath, ColumnChunkMetaData> pathToChunk =
blockMetaData.getColumns().stream().collect(Collectors.toMap(x -> x.getPath(), x -> x));

if (readerJoin != null
&& (blockIdxJoin == -1
|| ++blockIdxJoin
== readerJoin.getFooter().getBlocks().size())) {
blockIdxJoin = 0;
readerJoin = inputFilesToJoin.poll();
Set<ColumnPath> columnPathsJoin = readerJoin.getFileMetaData().getSchema().getColumns().stream()
.map(x -> ColumnPath.get(x.getPath()))
.collect(Collectors.toSet());
if (indexCacheJoin != null) {
indexCacheJoin.clean();
if (!inputFilesToJoin.isEmpty()) {
if (readerToJoin == null
|| ++blockIdxToJoin
== readerToJoin.getFooter().getBlocks().size()) {
if (readerToJoin != null) readerToJoin.close();
blockIdxToJoin = 0;
readerToJoin = inputFilesToJoin.poll();
Set<ColumnPath> columnPathsToJoin =
readerToJoin.getFileMetaData().getSchema().getColumns().stream()
.map(x -> ColumnPath.get(x.getPath()))
.collect(Collectors.toSet());
if (indexCacheToJoin != null) {
indexCacheToJoin.clean();
}
indexCacheToJoin = IndexCache.create(readerToJoin, columnPathsToJoin, indexCacheStrategy, true);
indexCacheToJoin.setBlockMetadata(
readerToJoin.getFooter().getBlocks().get(blockIdxToJoin));
} else {
blockIdxToJoin++;
indexCacheToJoin.setBlockMetadata(
readerToJoin.getFooter().getBlocks().get(blockIdxToJoin));
}
indexCacheJoin = IndexCache.create(readerJoin, columnPathsJoin, indexCacheStrategy, true);
indexCacheJoin.setBlockMetadata(
readerJoin.getFooter().getBlocks().get(blockIdxJoin));
} else {
blockIdxJoin++;
}

for (int outColumnIdx = 0; outColumnIdx < outColumns.size(); outColumnIdx++) {
ColumnPath colPath =
ColumnPath.get(outColumns.get(outColumnIdx).getPath());
if (readerJoin != null) {
Optional<ColumnChunkMetaData> chunkJoin =
readerJoin.getFooter().getBlocks().get(blockIdxJoin).getColumns().stream()
if (readerToJoin != null) {
Optional<ColumnChunkMetaData> chunkToJoin =
readerToJoin.getFooter().getBlocks().get(blockIdxToJoin).getColumns().stream()
.filter(x -> x.getPath().equals(colPath))
.findFirst();
if (chunkJoin.isPresent()
if (chunkToJoin.isPresent()
&& (overwriteInputWithJoinColumns || !columnPaths.contains(colPath))) {
processBlock(readerJoin, blockIdxJoin, outColumnIdx, indexCacheJoin, chunkJoin.get());
processBlock(
readerToJoin, blockIdxToJoin, outColumnIdx, indexCacheToJoin, chunkToJoin.get());
} else {
processBlock(reader, blockIdx, outColumnIdx, indexCache, pathToChunk.get(colPath));
}
@@ -348,7 +369,9 @@

indexCache.clean();
LOG.info("Finish rewriting input file: {}", reader.getFile());
reader.close();
}
if (readerToJoin != null) readerToJoin.close();
}

private void processBlock(
@@ -466,9 +466,6 @@ public Builder ignoreJoinFilesMetadata(boolean ignoreJoinFilesMetadata) {
public RewriteOptions build() {
Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), "Input file is required");
Preconditions.checkArgument(outputFile != null, "Output file is required");
Preconditions.checkArgument(
inputFilesToJoin == null || !inputFiles.isEmpty(),
"Input files to join must be non-empty list or it can be left unset, it can't be an empty list");

if (pruneColumns != null) {
if (maskColumns != null) {
@@ -501,7 +498,7 @@ public RewriteOptions build() {
return new RewriteOptions(
conf,
inputFiles,
inputFilesToJoin,
(inputFilesToJoin != null ? inputFilesToJoin : new ArrayList<>()),
outputFile,
pruneColumns,
newCodecName,