[HUDI-7617] Fix issues for bulk insert user defined partitioner in StreamSync (#11014)

Co-authored-by: sivabalan <n.siva.b@gmail.com>
vinishjail97 and nsivabalan authored May 14, 2024
1 parent 45ad35b commit bbf0280
Showing 8 changed files with 81 additions and 39 deletions.
Changed file: BulkInsertPartitioner.java
@@ -100,4 +100,11 @@ static String[] tryPrependPartitionPathColumns(String[] columnNames, HoodieWrite
return sortCols.toArray(new String[0]);
}

static Object[] prependPartitionPath(String partitionPath, Object[] columnValues) {
Object[] prependColumnValues = new Object[columnValues.length + 1];
System.arraycopy(columnValues, 0, prependColumnValues, 1, columnValues.length);
prependColumnValues[0] = partitionPath;
return prependColumnValues;
}

}
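The new helper shifts the column values one slot to the right and puts the record's partition path in front. A minimal sketch of its effect, with made-up values (the wrapper class below is illustrative and not part of the commit):

    import org.apache.hudi.table.BulkInsertPartitioner;

    import java.util.Arrays;

    // Illustrative only: shows the shape of the sort key produced by the new helper.
    public class PrependPartitionPathExample {
      public static void main(String[] args) {
        // Values of the user-configured sort columns for one record (hypothetical).
        Object[] sortColumnValues = new Object[] {42.5f, "rider-A"};
        // The record's partition path becomes the leading element of the key.
        Object[] sortKey = BulkInsertPartitioner.prependPartitionPath("2016/03/15", sortColumnValues);
        System.out.println(Arrays.toString(sortKey)); // [2016/03/15, 42.5, rider-A]
      }
    }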
Changed file: TestBulkInsertPartitioner.java
@@ -19,20 +19,11 @@

package org.apache.hudi.table;

import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;

public class TestBulkInsertPartitioner {

private static Stream<Arguments> argsForTryPrependPartitionColumns() {
@@ -45,15 +36,4 @@ private static Stream<Arguments> argsForTryPrependPartitionColumns() {
Arguments.of(Arrays.asList("pt1", "pt2", "col1", "col2").toArray(), Arrays.asList("col1", "pt1", "col2").toArray(), false, "pt1,pt2")
);
}

@ParameterizedTest
@MethodSource("argsForTryPrependPartitionColumns")
public void testTryPrependPartitionColumns(String[] expectedSortColumns, String[] sortColumns, boolean populateMetaField, String partitionColumnName) {
Properties props = new Properties();
props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionColumnName);
props.setProperty(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaField));
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/").withProperties(props).build();
assertArrayEquals(expectedSortColumns, BulkInsertPartitioner.tryPrependPartitionPathColumns(sortColumns, writeConfig));
}

}
Changed file: JavaCustomColumnsSortPartitioner.java
@@ -22,17 +22,15 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.common.util.collection.FlatLists;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.BulkInsertPartitioner;

import org.apache.avro.Schema;

import java.util.List;
import java.util.stream.Collectors;

import static org.apache.hudi.table.BulkInsertPartitioner.tryPrependPartitionPathColumns;

/**
* A partitioner that does sorting based on specified column values for Java client.
*
@@ -46,7 +44,7 @@ public class JavaCustomColumnsSortPartitioner<T>
private final boolean consistentLogicalTimestampEnabled;

public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema, HoodieWriteConfig config) {
this.sortColumnNames = tryPrependPartitionPathColumns(columnNames, config);
this.sortColumnNames = columnNames;
this.schema = schema;
this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
}
@@ -56,10 +54,10 @@ public List<HoodieRecord<T>> repartitionRecords(
List<HoodieRecord<T>> records, int outputPartitions) {
return records.stream().sorted((o1, o2) -> {
FlatLists.ComparableList<Comparable> values1 = FlatLists.ofComparableArray(
HoodieAvroUtils.getRecordColumnValues((HoodieAvroRecord) o1, sortColumnNames, schema, consistentLogicalTimestampEnabled)
BulkInsertPartitioner.prependPartitionPath(o1.getPartitionPath(), HoodieAvroUtils.getRecordColumnValues((HoodieAvroRecord) o1, sortColumnNames, schema, consistentLogicalTimestampEnabled))
);
FlatLists.ComparableList<Comparable> values2 = FlatLists.ofComparableArray(
HoodieAvroUtils.getRecordColumnValues((HoodieAvroRecord) o2, sortColumnNames, schema, consistentLogicalTimestampEnabled)
BulkInsertPartitioner.prependPartitionPath(o2.getPartitionPath(), HoodieAvroUtils.getRecordColumnValues((HoodieAvroRecord) o2, sortColumnNames, schema, consistentLogicalTimestampEnabled))
);
return values1.compareTo(values2);
}).collect(Collectors.toList());
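The Java partitioner no longer rewrites its sort column list through tryPrependPartitionPathColumns; it now prepends each record's materialized partition path (HoodieRecord#getPartitionPath) to the extracted column values when building the comparison key. A hedged sketch of that key construction, reusing only calls visible in this diff and made-up values:

    import org.apache.hudi.common.util.collection.FlatLists;
    import org.apache.hudi.table.BulkInsertPartitioner;

    // Illustrative only: mirrors how the comparator above builds and compares keys.
    public class JavaSortKeySketch {
      public static void main(String[] args) {
        // Two records in the same partition with different values of the sort column (hypothetical).
        FlatLists.ComparableList<Comparable> key1 = FlatLists.ofComparableArray(
            BulkInsertPartitioner.prependPartitionPath("2016/03/15", new Object[] {10.0f}));
        FlatLists.ComparableList<Comparable> key2 = FlatLists.ofComparableArray(
            BulkInsertPartitioner.prependPartitionPath("2016/03/15", new Object[] {2.5f}));
        // Same partition path, so ordering falls through to the sort column value: 2.5f < 10.0f.
        System.out.println(key1.compareTo(key2) > 0); // true
      }
    }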
Changed file: RDDCustomColumnsSortPartitioner.java
@@ -29,8 +29,6 @@

import java.util.Arrays;

import static org.apache.hudi.table.BulkInsertPartitioner.tryPrependPartitionPathColumns;

/**
* A partitioner that globally sorts a {@link JavaRDD<HoodieRecord>} based on partition path column and custom columns.
*
@@ -46,12 +44,12 @@ public class RDDCustomColumnsSortPartitioner<T>

public RDDCustomColumnsSortPartitioner(HoodieWriteConfig config) {
this.serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema()));
this.sortColumnNames = tryPrependPartitionPathColumns(getSortColumnName(config), config);
this.sortColumnNames = getSortColumnName(config);
this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
}

public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema, HoodieWriteConfig config) {
this.sortColumnNames = tryPrependPartitionPathColumns(columnNames, config);
this.sortColumnNames = columnNames;
this.serializableSchema = new SerializableSchema(schema);
this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
}
@@ -63,11 +61,11 @@ public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> reco
final SerializableSchema schema = this.serializableSchema;
final boolean consistentLogicalTimestampEnabled = this.consistentLogicalTimestampEnabled;
return records.sortBy(
record -> {
Object[] columnValues = record.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled);
return FlatLists.ofComparableArray(columnValues);
},
true, outputSparkPartitions);
record -> FlatLists.ofComparableArray(
BulkInsertPartitioner.prependPartitionPath(
record.getPartitionPath(),
record.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled))
), true, outputSparkPartitions);
}

@Override
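On the Spark path the same prepended array becomes the sortBy key, so records are globally sorted by partition path first and the configured columns second. For reference, a sketch of the write properties that select this partitioner for bulk insert; the property keys are the ones exercised by the StreamSync test added later in this commit, and the sort column "weight" is only an example:

    import org.apache.hudi.common.config.TypedProperties;

    // Illustrative only: properties a writer/StreamSync job could set to use this partitioner.
    public class CustomSortPartitionerProps {
      public static TypedProperties bulkInsertSortProps() {
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.bulkinsert.shuffle.parallelism", "1");
        props.setProperty("hoodie.bulkinsert.user.defined.partitioner.class",
            "org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner");
        props.setProperty("hoodie.bulkinsert.user.defined.partitioner.sort.columns", "weight");
        return props;
      }
    }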
Changed file: Spark bulk insert partitioner tests
@@ -220,7 +220,7 @@ public void testCustomColumnSortPartitioner() {
.withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
.build();
String[] sortColumns = sortColumnString.split(",");
Comparator<HoodieRecord<? extends HoodieRecordPayload>> columnComparator = getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, sortColumns);
Comparator<HoodieRecord<? extends HoodieRecordPayload>> columnComparator = getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, true, sortColumns);

JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc);
@@ -236,11 +236,14 @@
records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator), true);
}

private Comparator<HoodieRecord<? extends HoodieRecordPayload>> getCustomColumnComparator(Schema schema, String[] sortColumns) {
private Comparator<HoodieRecord<? extends HoodieRecordPayload>> getCustomColumnComparator(Schema schema, boolean prependPartitionPath, String[] sortColumns) {
Comparator<HoodieRecord<? extends HoodieRecordPayload>> comparator = Comparator.comparing(record -> {
try {
GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
List<Object> keys = new ArrayList<>();
if (prependPartitionPath) {
keys.add(record.getPartitionPath());
}
for (String col : sortColumns) {
keys.add(genericRecord.get(col));
}
Changed file: DataSourceUtils.java
@@ -96,7 +96,7 @@ public static String getTablePath(HoodieStorage storage,
*
* @see HoodieWriteConfig#getUserDefinedBulkInsertPartitionerClass()
*/
private static Option<BulkInsertPartitioner> createUserDefinedBulkInsertPartitioner(HoodieWriteConfig config)
public static Option<BulkInsertPartitioner> createUserDefinedBulkInsertPartitioner(HoodieWriteConfig config)
throws HoodieException {
String bulkInsertPartitionerClass = config.getUserDefinedBulkInsertPartitionerClass();
try {
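Making this factory public lets StreamSync reuse the same reflective lookup as the Spark datasource write path. For illustration, a hedged sketch of a user-supplied partitioner class: the single HoodieWriteConfig constructor mirrors the one added to RDDCustomColumnsSortPartitioner above (which the reflective instantiation appears to expect), and the class name and pass-through behaviour are invented for the example:

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.BulkInsertPartitioner;

    import org.apache.spark.api.java.JavaRDD;

    // Illustrative only: a pass-through partitioner that could be plugged in via
    // hoodie.bulkinsert.user.defined.partitioner.class.
    public class PassThroughBulkInsertPartitioner<T>
        implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

      public PassThroughBulkInsertPartitioner(HoodieWriteConfig config) {
        // Config-driven setup would go here; this sketch ignores it.
      }

      @Override
      public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                         int outputSparkPartitions) {
        // Only adjust parallelism; do not impose any ordering.
        return records.repartition(outputSparkPartitions);
      }

      @Override
      public boolean arePartitionRecordsSorted() {
        return false; // records within file groups are not sorted by this partitioner
      }
    }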
Changed file: StreamSync.java
@@ -134,6 +134,7 @@

import scala.Tuple2;

import static org.apache.hudi.DataSourceUtils.createUserDefinedBulkInsertPartitioner;
import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE;
@@ -988,7 +989,7 @@ private WriteClientWriteResult writeToSink(InputBatch inputBatch, String instant
writeClientWriteResult = new WriteClientWriteResult(writeClient.upsert(records, instantTime));
break;
case BULK_INSERT:
writeClientWriteResult = new WriteClientWriteResult(writeClient.bulkInsert(records, instantTime));
writeClientWriteResult = new WriteClientWriteResult(writeClient.bulkInsert(records, instantTime, createUserDefinedBulkInsertPartitioner(writeClient.getConfig())));
break;
case INSERT_OVERWRITE:
writeResult = writeClient.insertOverwrite(records, instantTime);
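This is the core fix: the BULK_INSERT branch previously called the two-argument bulkInsert overload, so a partitioner configured through hoodie.bulkinsert.user.defined.partitioner.class was not applied on the StreamSync path. It is now resolved from the write client's config and passed through. A condensed, hedged sketch of the new call shape (class and method names below are illustrative, not the actual StreamSync code):

    import org.apache.hudi.DataSourceUtils;
    import org.apache.hudi.client.SparkRDDWriteClient;
    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.table.BulkInsertPartitioner;

    import org.apache.spark.api.java.JavaRDD;

    // Illustrative only: bulk insert with the user-defined partitioner resolved from
    // the write client's own config, as StreamSync now does.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public class BulkInsertWithUserPartitioner {
      public static JavaRDD<WriteStatus> bulkInsert(SparkRDDWriteClient writeClient,
                                                    JavaRDD<HoodieRecord> records,
                                                    String instantTime) {
        Option<BulkInsertPartitioner> userPartitioner =
            DataSourceUtils.createUserDefinedBulkInsertPartitioner(writeClient.getConfig());
        return writeClient.bulkInsert(records, instantTime, userPartitioner);
      }
    }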
Changed file: HoodieDeltaStreamer tests
@@ -30,6 +30,8 @@
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -52,6 +54,7 @@
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -64,11 +67,14 @@
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncClient;
import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
import org.apache.hudi.keygen.ComplexKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.sync.common.HoodieSyncConfig;
@@ -102,6 +108,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -2859,6 +2866,54 @@ public void testConfigurationHotUpdate(HoodieTableType tableType) throws Excepti
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}

@Test
public void testBulkInsertWithUserDefinedPartitioner() throws Exception {
String tableBasePath = basePath + "/test_table_bulk_insert";
String sortColumn = "weight";
TypedProperties bulkInsertProps =
new DFSPropertiesConfiguration(fs.getConf(), new StoragePath(basePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
bulkInsertProps.setProperty("hoodie.bulkinsert.shuffle.parallelism", "1");
bulkInsertProps.setProperty("hoodie.bulkinsert.user.defined.partitioner.class", "org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner");
bulkInsertProps.setProperty("hoodie.bulkinsert.user.defined.partitioner.sort.columns", sortColumn);
String bulkInsertPropsFileName = "bulk_insert_override.properties";
UtilitiesTestBase.Helpers.savePropsToDFS(bulkInsertProps, storage, basePath + "/" + bulkInsertPropsFileName);
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()), bulkInsertPropsFileName, false);
syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);

HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(HoodieTestUtils.getDefaultStorageConf()).build();
List<String> partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getStorageConf()), metaClient.getBasePath(), false);
StorageConfiguration hadoopConf = metaClient.getStorageConf();
HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(hadoopConf);
HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(engContext, metaClient,
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
HoodieMetadataConfig.newBuilder().enable(false).build());
List<String> baseFiles = partitions.parallelStream().flatMap(partition -> fsView.getLatestBaseFiles(partition).map(HoodieBaseFile::getPath)).collect(Collectors.toList());
// Verify each partition has one base file because parallelism is 1.
assertEquals(baseFiles.size(), partitions.size());
// Verify if each parquet file is actually sorted by sortColumn.
for (String filePath : baseFiles) {
try (HoodieAvroParquetReader parquetReader = new HoodieAvroParquetReader(HoodieTestUtils.getDefaultStorageConf(), new StoragePath(filePath))) {
ClosableIterator<HoodieRecord<IndexedRecord>> iterator = parquetReader.getRecordIterator();
List<Float> sortColumnValues = new ArrayList<>();
while (iterator.hasNext()) {
IndexedRecord indexedRecord = iterator.next().getData();
List<Schema.Field> fields = indexedRecord.getSchema().getFields();
for (int i = 0; i < fields.size(); i++) {
if (fields.get(i).name().equals(sortColumn)) {
sortColumnValues.add((Float) indexedRecord.get(i));
}
}
}
// Assert whether records read are same as the sorted records.
List<Float> actualSortColumnValues = new ArrayList<>(sortColumnValues);
Collections.sort(sortColumnValues);
assertEquals(sortColumnValues, actualSortColumnValues);
}
}
}

private Set<String> getAllFileIDsInTable(String tableBasePath, Option<String> partition) {
HoodieTableMetaClient metaClient = createMetaClient(jsc, tableBasePath);
final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
