[HUDI-7752] Abstract serializeRecords for log writing (#11210)
yihua authored May 14, 2024
1 parent 8d103ad commit a8da2fd
Showing 32 changed files with 364 additions and 219 deletions.
Original file line number Diff line number Diff line change
@@ -84,9 +84,7 @@
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.hudi.table.storage.HoodieStorageLayout;

import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.orc.CompressionKind;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -2137,9 +2135,8 @@ public double getParquetCompressionRatio() {
return getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION);
}

public CompressionCodecName getParquetCompressionCodec() {
String codecName = getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
return CompressionCodecName.fromConf(StringUtils.isNullOrEmpty(codecName) ? null : codecName);
public String getParquetCompressionCodec() {
return getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
}

public boolean parquetDictionaryEnabled() {
@@ -2183,8 +2180,8 @@ public int getHFileBlockSize() {
return getInt(HoodieStorageConfig.HFILE_BLOCK_SIZE);
}

public Compression.Algorithm getHFileCompressionAlgorithm() {
return Compression.Algorithm.valueOf(getString(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME));
public String getHFileCompressionAlgorithm() {
return getString(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME);
}

public long getOrcMaxFileSize() {
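Note: with this change, HoodieWriteConfig hands back the raw codec names as plain strings, and the conversion to format-specific enums moves to the call sites (the Parquet writer factories later in this diff route through ParquetUtils.getCompressionCodecName, which they newly import). The following is a minimal sketch of what those conversions might look like, mirroring the logic deleted above; the class and method names are illustrative, not the actual Hudi API.

import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// Hypothetical helpers mirroring the conversions removed from HoodieWriteConfig above.
final class CodecConversionSketch {

  // Same behavior as the deleted getParquetCompressionCodec() body: a null or empty
  // name is passed to Parquet as null, which fromConf maps to the default (UNCOMPRESSED).
  static CompressionCodecName toParquetCodec(String codecName) {
    return CompressionCodecName.fromConf(
        codecName == null || codecName.isEmpty() ? null : codecName);
  }

  // Same behavior as the deleted getHFileCompressionAlgorithm() body: the configured
  // name must match an HBase Compression.Algorithm constant (e.g. "GZ").
  static Compression.Algorithm toHFileAlgorithm(String algorithmName) {
    return Compression.Algorithm.valueOf(algorithmName);
  }
}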
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -50,11 +50,11 @@ public HoodieKeyLocationFetchHandle(HoodieWriteConfig config, HoodieTable<T, I,
}

private List<Pair<HoodieKey, Long>> fetchRecordKeysWithPositions(HoodieBaseFile baseFile) {
BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(baseFile.getStoragePath());
FileFormatUtils fileFormatUtils = FileFormatUtils.getInstance(baseFile.getStoragePath());
if (keyGeneratorOpt.isPresent()) {
return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath(), keyGeneratorOpt);
return fileFormatUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath(), keyGeneratorOpt);
} else {
return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath());
return fileFormatUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath());
}
}

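The bulk of the remaining hunks are a mechanical rename of BaseFileUtils to FileFormatUtils. Judging from the call sites in this diff, the utils can still be resolved either from a concrete file path or from the table's base file format; a small usage sketch follows (the storage-package import locations and the example method are assumptions, not taken from this commit).

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;

class FileFormatUtilsUsageSketch {

  // Resolve by file path: the format-specific implementation is picked from the extension.
  Schema readBaseFileSchema(StorageConfiguration<?> conf, StoragePath baseFilePath) {
    return FileFormatUtils.getInstance(baseFilePath).readAvroSchema(conf, baseFilePath);
  }

  // Resolve by the table's configured base file format, as the test helpers below do.
  FileFormatUtils forBaseFileFormat(HoodieFileFormat format) {
    return FileFormatUtils.getInstance(format);
  }
}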
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@
import java.io.IOException;

import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
import static org.apache.hudi.common.util.ParquetUtils.getCompressionCodecName;
import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;

/**
@@ -73,7 +74,7 @@ private static HoodieRowDataFileWriter newParquetInternalRowFileWriter(
return new HoodieRowDataParquetWriter(
convertToStoragePath(path), new HoodieParquetConfig<>(
writeSupport,
writeConfig.getParquetCompressionCodec(),
getCompressionCodecName(writeConfig.getParquetCompressionCodec()),
writeConfig.getParquetBlockSize(),
writeConfig.getParquetPageSize(),
writeConfig.getParquetMaxFileSize(),
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -147,7 +147,7 @@ public void testInsert() throws Exception {

HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
metaClient = HoodieTableMetaClient.reload(metaClient);
BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);

// Get some records belong to the same partition (2021/09/11)
String insertRecordStr1 = "{\"_row_key\":\"1\","
@@ -221,7 +221,7 @@ public void testInsertWithDataGenerator(boolean mergeAllowDuplicateOnInsertsEnab

HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
metaClient = HoodieTableMetaClient.reload(metaClient);
BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);

String partitionPath = "2021/09/11";
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionPath});
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.testutils.Transformations;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -131,7 +131,7 @@ public void testUpdateRecords() throws Exception {
HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
writeClient.startCommitWithTime(firstCommitTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);

String partitionPath = "2016/01/31";

@@ -480,7 +480,7 @@ public void testDeleteRecords() throws Exception {
HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
writeClient.startCommitWithTime(firstCommitTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);

String partitionPath = "2022/04/09";

Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -908,7 +908,7 @@ public long numRowsInCommit(String basePath, HoodieTimeline commitTimeline,
HashMap<String, String> paths =
getLatestFileIDsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant));
return paths.values().stream().map(StoragePath::new).flatMap(path ->
BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream())
FileFormatUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream())
.filter(record -> {
if (filterByCommitTime) {
Object commitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
@@ -937,7 +937,7 @@ public long countRowsInPaths(String basePath, HoodieStorage storage, String... p
try {
List<HoodieBaseFile> latestFiles = getLatestBaseFiles(basePath, storage, paths);
return latestFiles.stream().mapToLong(baseFile ->
BaseFileUtils.getInstance(baseFile.getStoragePath())
FileFormatUtils.getInstance(baseFile.getStoragePath())
.readAvroRecords(context.getStorageConf(), baseFile.getStoragePath()).size())
.sum();
} catch (Exception e) {
@@ -975,7 +975,7 @@ public long countRecordsOptionallySince(String basePath, HoodieTimeline commitTi
HashMap<String, String> fileIdToFullPath = getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn);
String[] paths = fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()]);
if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
return Arrays.stream(paths).map(StoragePath::new).flatMap(path -> BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream())
return Arrays.stream(paths).map(StoragePath::new).flatMap(path -> FileFormatUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream())
.filter(record -> {
if (lastCommitTimeOpt.isPresent()) {
Object commitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
@@ -1022,8 +1022,8 @@ public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, HoodieIndex.
return builder;
}

public static BaseFileUtils getFileUtilsInstance(HoodieTableMetaClient metaClient) {
return BaseFileUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
public static FileFormatUtils getFileUtilsInstance(HoodieTableMetaClient metaClient) {
return FileFormatUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
}

protected HoodieTableMetaClient createMetaClient() {
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -61,15 +61,15 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {

private final StoragePath path;
private final StorageConfiguration<?> conf;
private final BaseFileUtils parquetUtils;
private final FileFormatUtils parquetUtils;
private List<ParquetReaderIterator> readerIterators = new ArrayList<>();

public HoodieSparkParquetReader(StorageConfiguration<?> conf, StoragePath path) {
this.path = path;
this.conf = conf.newInstance();
// Avoid adding record in list element when convert parquet schema to avro schema
conf.set(ADD_LIST_ELEMENT_RECORDS, "false");
this.parquetUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
this.parquetUtils = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET);
}

@Override
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@
import java.io.IOException;

import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
import static org.apache.hudi.common.util.ParquetUtils.getCompressionCodecName;

/**
* Factory to assist in instantiating a new {@link HoodieInternalRowFileWriter}.
@@ -76,7 +77,7 @@ private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter(Stora
path,
new HoodieParquetConfig<>(
writeSupport,
writeConfig.getParquetCompressionCodec(),
getCompressionCodecName(writeConfig.getParquetCompressionCodec()),
writeConfig.getParquetBlockSize(),
writeConfig.getParquetPageSize(),
writeConfig.getParquetMaxFileSize(),
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
@@ -134,7 +134,7 @@ private void assertSchemaEvolutionOnUpdateResult(WriteStatus insertResult, Hoodi
Executable executable = () -> {
HoodieMergeHandle mergeHandle = new HoodieMergeHandle(updateTable.getConfig(), "101", updateTable,
updateRecords.iterator(), updateRecords.get(0).getPartitionPath(), insertResult.getFileId(), supplier, Option.empty());
List<GenericRecord> oldRecords = BaseFileUtils.getInstance(updateTable.getBaseFileFormat())
List<GenericRecord> oldRecords = FileFormatUtils.getInstance(updateTable.getBaseFileFormat())
.readAvroRecords(updateTable.getStorageConf(),
new StoragePath(updateTable.getConfig().getBasePath() + "/" + insertResult.getStat().getPath()),
mergeHandle.getWriterSchemaWithMetaFields());
Original file line number Diff line number Diff line change
@@ -75,9 +75,9 @@
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -1199,7 +1199,7 @@ public void testSmallInsertHandlingForUpserts() throws Exception {

dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config);
BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);

// Inserts => will write file1
String commitTime1 = "001";
@@ -1312,7 +1312,7 @@ public void testSmallInsertHandlingForInserts(boolean mergeAllowDuplicateInserts
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, mergeAllowDuplicateInserts); // hold upto 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config);
BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);

// Inserts => will write file1
String commitTime1 = "001";
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.testutils.Transformations;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -205,14 +205,14 @@ public void testUpdateRecords(HoodieIndex.IndexType indexType) throws Exception

// Read out the bloom filter and make sure filter can answer record exist or not
Path filePath = allFiles[0].getPath();
BloomFilter filter = BaseFileUtils.getInstance(table.getBaseFileFormat())
BloomFilter filter = FileFormatUtils.getInstance(table.getBaseFileFormat())
.readBloomFilterFromMetadata(storageConf, new StoragePath(filePath.toUri()));
for (HoodieRecord record : records) {
assertTrue(filter.mightContain(record.getRecordKey()));
}

// Read the base file, check the record content
List<GenericRecord> fileRecords = BaseFileUtils.getInstance(table.getBaseFileFormat())
List<GenericRecord> fileRecords = FileFormatUtils.getInstance(table.getBaseFileFormat())
.readAvroRecords(storageConf, new StoragePath(filePath.toUri()));
GenericRecord newRecord;
int index = 0;
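The index tests exercise the same abstraction for verification: the bloom filter is read from the base file's footer and the records are read back as Avro. Below is a condensed sketch of that pattern; the import locations and the wrapper method are assumptions for illustration.

import java.util.List;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;

class BaseFileVerificationSketch {

  // Check that a record key might be present according to the bloom filter,
  // then read the file's records, mirroring the test hunk above.
  boolean verify(StorageConfiguration<?> conf, StoragePath baseFilePath, String recordKey) {
    FileFormatUtils utils = FileFormatUtils.getInstance(HoodieFileFormat.PARQUET);
    BloomFilter filter = utils.readBloomFilterFromMetadata(conf, baseFilePath);
    List<GenericRecord> fileRecords = utils.readAvroRecords(conf, baseFilePath);
    return filter.mightContain(recordKey) && !fileRecords.isEmpty();
  }
}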
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
@@ -639,7 +639,7 @@ public HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, S
return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
}

public static BaseFileUtils getFileUtilsInstance(HoodieTableMetaClient metaClient) {
return BaseFileUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
public static FileFormatUtils getFileUtilsInstance(HoodieTableMetaClient metaClient) {
return FileFormatUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
}
}
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@

package org.apache.hudi.common.model;

import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.RetryHelper;
import org.apache.hudi.common.util.StringUtils;
@@ -137,7 +137,7 @@ private void writeMetafileInFormat(StoragePath filePath, HoodieFileFormat format
HOODIE_PARTITION_METAFILE_PREFIX + "_" + UUID.randomUUID() + getMetafileExtension());
try {
// write to temporary file
BaseFileUtils.getInstance(format).writeMetaFile(storage, tmpPath, props);
FileFormatUtils.getInstance(format).writeMetaFile(storage, tmpPath, props);
// move to actual path
storage.rename(tmpPath, filePath);
} finally {
@@ -185,7 +185,7 @@ private boolean readTextFormatMetaFile() {
private boolean readBaseFormatMetaFile() {
for (StoragePath metafilePath : baseFormatMetaFilePaths(partitionPath)) {
try {
BaseFileUtils reader = BaseFileUtils.getInstance(metafilePath);
FileFormatUtils reader = FileFormatUtils.getInstance(metafilePath);
// Data file format
Map<String, String> metadata = reader.readFooter(
storage.getConf(), true, metafilePath, PARTITION_DEPTH_KEY, COMMIT_TIME_KEY);
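Partition metadata follows the same pattern: the metafile is written through writeMetaFile and read back by pulling selected keys from the file footer. A rough sketch of that round trip under the signatures visible in the hunk above; the key names, the format choice, the Properties type, and the import locations are placeholders, not taken from this commit.

import java.util.Map;
import java.util.Properties;

import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;

class PartitionMetafileSketch {

  // Write the partition metadata in the table's base file format, then read
  // selected footer entries back, as HoodiePartitionMetadata does above.
  Map<String, String> roundTrip(HoodieStorage storage, StoragePath metafilePath, Properties props)
      throws Exception {
    FileFormatUtils.getInstance(HoodieFileFormat.PARQUET).writeMetaFile(storage, metafilePath, props);
    return FileFormatUtils.getInstance(metafilePath)
        .readFooter(storage.getConf(), true, metafilePath, "commitTime", "partitionDepth");
  }
}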
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -306,7 +306,7 @@ public Schema readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionC
.orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction "
+ lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath()));
StoragePath path = new StoragePath(filePath);
return BaseFileUtils.getInstance(path).readAvroSchema(metaClient.getStorageConf(), path);
return FileFormatUtils.getInstance(path).readAvroSchema(metaClient.getStorageConf(), path);
}

private Schema readSchemaFromLogFile(StoragePath path) throws IOException {
@@ -473,7 +473,7 @@ private Schema fetchSchemaFromFiles(Iterator<String> filePaths) throws IOExcepti
// this is a log file
schema = readSchemaFromLogFile(filePath);
} else {
schema = BaseFileUtils.getInstance(filePath).readAvroSchema(metaClient.getStorageConf(), filePath);
schema = FileFormatUtils.getInstance(filePath).readAvroSchema(metaClient.getStorageConf(), filePath);
}
}
return schema;