Commit 7aac16b

Merge branch 'master' into create_new_merger_configs

Jonathan Vexler committed Sep 30, 2024
2 parents 597677f + 4e98278
Showing 49 changed files with 1,437 additions and 326 deletions.
@@ -29,6 +29,7 @@
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -472,7 +473,7 @@ private static Map<String, String> getPropsForRewrite(HoodieTableMetaClient meta
     metaClient.getTableConfig().getProps().forEach((k, v) -> propsMap.put(k.toString(), v.toString()));
     propsMap.put(HoodieWriteConfig.SKIP_DEFAULT_PARTITION_VALIDATION.key(), "true");
     propsMap.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), metaClient.getTableConfig().getRecordKeyFieldProp());
-    propsMap.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), metaClient.getTableConfig().getPartitionFieldProp());
+    propsMap.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig()).orElse(""));
     propsMap.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), metaClient.getTableConfig().getKeyGeneratorClassName());
     return propsMap;
   }
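
Note on the hunk above: getPartitionFieldPropForKeyGenerator returns Hudi's Option, and the .orElse("") keeps the rewrite props well-defined for non-partitioned tables. A self-contained sketch of that fallback shape, using java.util.Optional as a stand-in for Hudi's Option (class and method names here are illustrative, not Hudi code):

import java.util.Optional;

// Illustrative sketch only: an absent partition-field prop resolves to "",
// so PARTITIONPATH_FIELD is always set, even for non-partitioned tables.
public class PartitionPropFallbackSketch {
  static Optional<String> partitionFieldPropForKeyGenerator(String storedValue) {
    return Optional.ofNullable(storedValue); // empty when the table has no partition fields
  }

  public static void main(String[] args) {
    // A custom key generator stores "field:type" pairs in the table config:
    System.out.println(partitionFieldPropForKeyGenerator("city:simple,ts:timestamp").orElse(""));
    // Non-partitioned table: falls back to the empty string instead of null.
    System.out.println("[" + partitionFieldPropForKeyGenerator(null).orElse("") + "]");
  }
}
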
@@ -1016,9 +1016,9 @@ public Option<HoodieIndexCommitMetadata> index(String indexInstantTime) {
   /**
    * Drops the index and removes the metadata partitions.
    *
-   * @param partitionTypes - list of {@link MetadataPartitionType} which needs to be indexed
+   * @param metadataPartitions - list of metadata partitions which need to be dropped
    */
-  public void dropIndex(List<MetadataPartitionType> partitionTypes) {
+  public void dropIndex(List<String> metadataPartitions) {
     HoodieTable table = createTable(config);
     String dropInstant = createNewInstantTime();
     HoodieInstant ownerInstant = new HoodieInstant(true, HoodieTimeline.INDEXING_ACTION, dropInstant);
@@ -1028,7 +1028,7 @@ public void dropIndex(List<MetadataPartitionType> partitionTypes) {
     Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(dropInstant);
     if (metadataWriterOpt.isPresent()) {
       try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) {
-        metadataWriter.dropMetadataPartitions(partitionTypes);
+        metadataWriter.dropMetadataPartitions(metadataPartitions);
       } catch (Exception e) {
         if (e instanceof HoodieException) {
           throw (HoodieException) e;
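
A call-site sketch for the new String-based signature; the client type and the partition names below are illustrative stand-ins, not values from this patch. Taking plain partition paths lets callers drop dynamically named partitions (such as functional indexes) that have no MetadataPartitionType constant:

import java.util.Arrays;
import java.util.List;

// Illustrative call site for the new dropIndex(List<String>) shape.
public class DropIndexSketch {
  interface WriteClient {
    void dropIndex(List<String> metadataPartitions);
  }

  public static void main(String[] args) {
    WriteClient writeClient =
        partitions -> partitions.forEach(p -> System.out.println("dropping metadata partition: " + p));
    // Built-in and dynamically named metadata partitions fit in one list.
    writeClient.dropIndex(Arrays.asList("column_stats", "func_index_city_trim"));
  }
}
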
@@ -19,6 +19,7 @@
 package org.apache.hudi.keygen;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieKeyException;
@@ -76,8 +77,11 @@ private static List<BaseKeyGenerator> getPartitionKeyGenerators(List<String> par
       return Collections.emptyList(); // Corresponds to no partition case
     } else {
       return partitionPathFields.stream().map(field -> {
-        Pair<String, CustomAvroKeyGenerator.PartitionKeyType> partitionAndType = getPartitionFieldAndKeyType(field);
-        CustomAvroKeyGenerator.PartitionKeyType keyType = partitionAndType.getRight();
+        Pair<String, Option<CustomAvroKeyGenerator.PartitionKeyType>> partitionAndType = getPartitionFieldAndKeyType(field);
+        if (partitionAndType.getRight().isEmpty()) {
+          throw getPartitionPathFormatException();
+        }
+        CustomAvroKeyGenerator.PartitionKeyType keyType = partitionAndType.getRight().get();
         String partitionPathField = partitionAndType.getLeft();
         switch (keyType) {
           case SIMPLE:
@@ -95,35 +99,53 @@ private static List<BaseKeyGenerator> getPartitionKeyGenerators(List<String> par
     }
   }
 
-  public static List<PartitionKeyType> getPartitionTypes(List<String> partitionPathFields) {
+  /**
+   * Returns list of partition types configured in the partition fields for custom key generator.
+   *
+   * @param tableConfig Table config where partition fields are configured
+   */
+  public static List<PartitionKeyType> getPartitionTypes(HoodieTableConfig tableConfig) {
+    List<String> partitionPathFields = HoodieTableConfig.getPartitionFieldsForKeyGenerator(tableConfig).orElse(Collections.emptyList());
     if (partitionPathFields.size() == 1 && partitionPathFields.get(0).isEmpty()) {
       return Collections.emptyList(); // Corresponds to no partition case
     } else {
-      return partitionPathFields.stream().map(field -> {
-        Pair<String, CustomAvroKeyGenerator.PartitionKeyType> partitionAndType = getPartitionFieldAndKeyType(field);
-        return partitionAndType.getRight();
-      }).collect(Collectors.toList());
+      return partitionPathFields.stream().map(field -> getPartitionFieldAndKeyType(field).getRight())
+          .filter(Option::isPresent)
+          .map(Option::get)
+          .collect(Collectors.toList());
    }
  }
 
-  public static List<String> getTimestampFields(List<String> partitionPathFields) {
-    if (partitionPathFields.size() == 1 && partitionPathFields.get(0).isEmpty()) {
-      return Collections.emptyList(); // Corresponds to no partition case
+  /**
+   * Returns the partition fields with timestamp partition type.
+   *
+   * @param tableConfig Table config where partition fields are configured
+   * @return Optional list of partition fields with timestamp partition type
+   */
+  public static Option<List<String>> getTimestampFields(HoodieTableConfig tableConfig) {
+    List<String> partitionPathFields = HoodieTableConfig.getPartitionFieldsForKeyGenerator(tableConfig).orElse(Collections.emptyList());
+    if (partitionPathFields.isEmpty() || (partitionPathFields.size() == 1 && partitionPathFields.get(0).isEmpty())) {
+      return Option.of(Collections.emptyList()); // Corresponds to no partition case
+    } else if (getPartitionFieldAndKeyType(partitionPathFields.get(0)).getRight().isEmpty()) {
+      // Partition type is not configured for the partition fields therefore timestamp partition fields
+      // can not be determined
+      return Option.empty();
     } else {
-      return partitionPathFields.stream()
+      return Option.of(partitionPathFields.stream()
           .map(CustomAvroKeyGenerator::getPartitionFieldAndKeyType)
-          .filter(fieldAndKeyType -> fieldAndKeyType.getRight().equals(PartitionKeyType.TIMESTAMP))
+          .filter(fieldAndKeyType -> fieldAndKeyType.getRight().isPresent() && fieldAndKeyType.getRight().get().equals(PartitionKeyType.TIMESTAMP))
           .map(Pair::getLeft)
-          .collect(Collectors.toList());
+          .collect(Collectors.toList()));
     }
   }
 
-  public static Pair<String, PartitionKeyType> getPartitionFieldAndKeyType(String field) {
+  public static Pair<String, Option<PartitionKeyType>> getPartitionFieldAndKeyType(String field) {
     String[] fieldWithType = field.split(BaseKeyGenerator.CUSTOM_KEY_GENERATOR_SPLIT_REGEX);
-    if (fieldWithType.length != 2) {
-      throw new HoodieKeyException("Unable to find field names for partition path in proper format");
+    if (fieldWithType.length == 2) {
+      return Pair.of(fieldWithType[0], Option.of(PartitionKeyType.valueOf(fieldWithType[1].toUpperCase())));
+    } else {
+      return Pair.of(fieldWithType[0], Option.empty());
     }
-    return Pair.of(fieldWithType[0], PartitionKeyType.valueOf(fieldWithType[1].toUpperCase()));
   }
 
   @Override
@@ -158,6 +180,11 @@ private void validateRecordKeyFields() {
     }
   }
 
+  static HoodieKeyGeneratorException getPartitionPathFormatException() {
+    return new HoodieKeyGeneratorException("Unable to find field names for partition path in proper format. "
+        + "Please specify the partition field names in format `field1:type1,field2:type2`. Example: `city:simple,ts:timestamp`");
+  }
+
   public String getDefaultPartitionPathSeparator() {
     return DEFAULT_PARTITION_PATH_SEPARATOR;
   }
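
The heart of this file's change is the relaxed parsing contract: getPartitionFieldAndKeyType no longer throws on a bare field name; it returns an empty Option and lets each caller decide (getPartitionKeyGenerators throws, getPartitionTypes filters, getTimestampFields returns Option.empty()). A self-contained sketch of the new contract, with java.util.Optional standing in for Hudi's Option and a plain ":" standing in for CUSTOM_KEY_GENERATOR_SPLIT_REGEX:

import java.util.Map;
import java.util.Optional;

// Mirrors the new getPartitionFieldAndKeyType contract:
// "city:simple" -> (city, SIMPLE); bare "city" -> (city, empty).
public class PartitionFieldParseSketch {
  enum PartitionKeyType { SIMPLE, TIMESTAMP }

  static Map.Entry<String, Optional<PartitionKeyType>> parse(String field) {
    String[] fieldWithType = field.split(":");
    if (fieldWithType.length == 2) {
      return Map.entry(fieldWithType[0], Optional.of(PartitionKeyType.valueOf(fieldWithType[1].toUpperCase())));
    }
    return Map.entry(fieldWithType[0], Optional.empty());
  }

  public static void main(String[] args) {
    System.out.println(parse("city:simple"));  // city=Optional[SIMPLE]
    System.out.println(parse("ts:timestamp")); // ts=Optional[TIMESTAMP]
    System.out.println(parse("city"));         // city=Optional.empty -> callers choose throw/skip/empty
  }
}
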
@@ -26,6 +26,7 @@
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieKeyException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -153,7 +154,12 @@ public static String getRecordKey(GenericRecord record, List<String> recordKeyFi
     StringBuilder recordKey = new StringBuilder();
     for (int i = 0; i < recordKeyFields.size(); i++) {
       String recordKeyField = recordKeyFields.get(i);
-      String recordKeyValue = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true, consistentLogicalTimestampEnabled);
+      String recordKeyValue;
+      try {
+        recordKeyValue = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, false, consistentLogicalTimestampEnabled);
+      } catch (HoodieException e) {
+        throw new HoodieKeyException("Record key field '" + recordKeyField + "' does not exist in the input record");
+      }
       if (recordKeyValue == null) {
         recordKey.append(recordKeyField).append(DEFAULT_COMPOSITE_KEY_FILED_VALUE).append(NULL_RECORDKEY_PLACEHOLDER);
       } else if (recordKeyValue.isEmpty()) {
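
Worth spelling out the behavior shift above: a record key field missing from the record now fails fast with a HoodieKeyException naming the field, while a field that exists but holds null still gets the __null__ placeholder. A simplified, self-contained sketch of that distinction (a map stands in for the Avro record and the Hudi helpers):

import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the new getRecordKey behavior:
// absent field -> exception; present-but-null field -> placeholder.
public class RecordKeySketch {
  static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";

  static String keyPart(Map<String, String> record, String field) {
    if (!record.containsKey(field)) {
      throw new IllegalArgumentException("Record key field '" + field + "' does not exist in the input record");
    }
    String value = record.get(field);
    return field + ":" + (value == null ? NULL_RECORDKEY_PLACEHOLDER : value);
  }

  public static void main(String[] args) {
    Map<String, String> record = new HashMap<>();
    record.put("id", null);
    System.out.println(keyPart(record, "id"));   // id:__null__ (field exists, value is null)
    System.out.println(keyPart(record, "uuid")); // throws: field absent from the record
  }
}
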
@@ -527,6 +527,9 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeFunctionalIndexPartiti
   }
 
   private Set<String> getFunctionalIndexPartitionsToInit() {
+    if (dataMetaClient.getIndexMetadata().isEmpty()) {
+      return Collections.emptySet();
+    }
     Set<String> functionalIndexPartitions = dataMetaClient.getIndexMetadata().get().getIndexDefinitions().keySet();
     Set<String> completedMetadataPartitions = dataMetaClient.getTableConfig().getMetadataPartitions();
     functionalIndexPartitions.removeAll(completedMetadataPartitions);
@@ -895,9 +898,8 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
     }, fileGroupFileIds.size());
   }
 
-  public void dropMetadataPartitions(List<MetadataPartitionType> metadataPartitions) throws IOException {
-    for (MetadataPartitionType partitionType : metadataPartitions) {
-      String partitionPath = partitionType.getPartitionPath();
+  public void dropMetadataPartitions(List<String> metadataPartitions) throws IOException {
+    for (String partitionPath : metadataPartitions) {
       // first update table config
       dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionPath, false);
       LOG.warn("Deleting Metadata Table partition: {}", partitionPath);
@@ -1051,6 +1053,9 @@ public void update(HoodieCommitMetadata commitMetadata, HoodieData<HoodieRecord>
    * Update functional index from {@link HoodieCommitMetadata}.
    */
   private void updateFunctionalIndexIfPresent(HoodieCommitMetadata commitMetadata, String instantTime, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap) {
+    if (!dataWriteConfig.getMetadataConfig().isFunctionalIndexEnabled()) {
+      return;
+    }
     dataMetaClient.getTableConfig().getMetadataPartitions()
         .stream()
         .filter(partition -> partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX))
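
Both additions in this file share one defensive shape: exit before dereferencing optional index metadata, and skip functional-index updates entirely when the feature is disabled. A condensed standalone sketch of that guard pattern (stand-in types, not the Hudi classes):

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Condensed sketch of the two early-exit guards added above.
public class IndexGuardSketch {
  static Set<String> partitionsToInit(Optional<Map<String, String>> indexMetadata) {
    if (indexMetadata.isEmpty()) {
      return Collections.emptySet(); // no index definitions -> nothing to initialize
    }
    return indexMetadata.get().keySet();
  }

  static void updateIfEnabled(boolean functionalIndexEnabled, Runnable update) {
    if (!functionalIndexEnabled) {
      return; // feature off -> skip the per-partition scan entirely
    }
    update.run();
  }

  public static void main(String[] args) {
    System.out.println(partitionsToInit(Optional.empty())); // []
    updateIfEnabled(false, () -> System.out.println("never runs"));
  }
}
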
@@ -53,7 +53,7 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {
    * @param metadataPartitions List of MDT partitions to drop
    * @throws IOException on failures
    */
-  void dropMetadataPartitions(List<MetadataPartitionType> metadataPartitions) throws IOException;
+  void dropMetadataPartitions(List<String> metadataPartitions) throws IOException;
 
   /**
    * Update the metadata table due to a COMMIT operation.
@@ -79,21 +79,18 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
   public static final Integer CLEAN_PLAN_VERSION_2 = CleanPlanV2MigrationHandler.VERSION;
   public static final Integer LATEST_CLEAN_PLAN_VERSION = CLEAN_PLAN_VERSION_2;
 
-  private final SyncableFileSystemView fileSystemView;
-  private final HoodieTimeline commitTimeline;
+  private transient HoodieTimeline commitTimeline;
   private final Map<HoodieFileGroupId, CompactionOperation> fgIdToPendingCompactionOperations;
   private final Map<HoodieFileGroupId, CompactionOperation> fgIdToPendingLogCompactionOperations;
   private final HoodieTable<T, I, K, O> hoodieTable;
   private final HoodieWriteConfig config;
   private transient HoodieEngineContext context;
-  private List<String> savepointedTimestamps;
+  private final List<String> savepointedTimestamps;
   private Option<HoodieInstant> earliestCommitToRetain = Option.empty();
 
   public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config) {
     this.context = context;
     this.hoodieTable = hoodieTable;
-    this.fileSystemView = hoodieTable.getHoodieView();
-    this.commitTimeline = hoodieTable.getCompletedCommitsTimeline();
     this.config = config;
     SyncableFileSystemView fileSystemView = (SyncableFileSystemView) hoodieTable.getSliceView();
     this.fgIdToPendingCompactionOperations = fileSystemView
@@ -111,6 +108,13 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieT
             : Collections.emptyList());
   }
 
+  private HoodieTimeline getCommitTimeline() {
+    if (commitTimeline == null) {
+      commitTimeline = hoodieTable.getCompletedCommitsTimeline();
+    }
+    return commitTimeline;
+  }
+
   /**
    * @return list of savepointed timestamps in active timeline as of this clean planning.
    */
@@ -305,7 +309,7 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(
       // In other words, the file versions only apply to the active file groups.
       deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
       boolean toDeletePartition = false;
-      List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
+      List<HoodieFileGroup> fileGroups = hoodieTable.getHoodieView().getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
       for (HoodieFileGroup fileGroup : fileGroups) {
         int keepVersions = config.getCleanerFileVersionsRetained();
         // do not cleanup slice required for pending compaction
@@ -378,12 +382,12 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(S
 
     // determine if we have enough commits, to start cleaning.
     boolean toDeletePartition = false;
-    if (commitTimeline.countInstants() > commitsRetained) {
+    if (getCommitTimeline().countInstants() > commitsRetained) {
       HoodieInstant earliestInstant = earliestCommitToRetain.get();
       // all replaced file groups before earliestCommitToRetain are eligible to clean
       deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetain));
       // add active files
-      List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
+      List<HoodieFileGroup> fileGroups = hoodieTable.getHoodieView().getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
       for (HoodieFileGroup fileGroup : fileGroups) {
         List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
 
@@ -483,9 +487,9 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestHours(Str
   private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
     final Stream<HoodieFileGroup> replacedGroups;
     if (earliestCommitToRetain.isPresent()) {
-      replacedGroups = fileSystemView.getReplacedFileGroupsBefore(earliestCommitToRetain.get().getTimestamp(), partitionPath);
+      replacedGroups = hoodieTable.getHoodieView().getReplacedFileGroupsBefore(earliestCommitToRetain.get().getTimestamp(), partitionPath);
     } else {
-      replacedGroups = fileSystemView.getAllReplacedFileGroups(partitionPath);
+      replacedGroups = hoodieTable.getHoodieView().getAllReplacedFileGroups(partitionPath);
     }
     return replacedGroups.flatMap(HoodieFileGroup::getAllFileSlices)
         // do not delete savepointed files (archival will make sure corresponding replacecommit file is not deleted)
@@ -570,11 +574,7 @@ public Option<HoodieInstant> getEarliestCommitToRetain() {
    * Returns the last completed commit timestamp before clean.
    */
   public String getLastCompletedCommitTimestamp() {
-    if (commitTimeline.lastInstant().isPresent()) {
-      return commitTimeline.lastInstant().get().getTimestamp();
-    } else {
-      return "";
-    }
+    return getCommitTimeline().lastInstant().map(HoodieInstant::getTimestamp).orElse("");
   }
 
   /*
@@ -624,6 +624,6 @@ private boolean isFileGroupInPendingMajorOrMinorCompaction(HoodieFileGroup fg) {
   }
 
   private boolean noSubsequentReplaceCommit(String earliestCommitToRetain, String partitionPath) {
-    return !fileSystemView.getReplacedFileGroupsAfterOrOn(earliestCommitToRetain, partitionPath).findAny().isPresent();
+    return !hoodieTable.getHoodieView().getReplacedFileGroupsAfterOrOn(earliestCommitToRetain, partitionPath).findAny().isPresent();
   }
 }
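
The CleanPlanner changes follow one theme: fields that can be rebuilt from hoodieTable (the commit timeline, the file-system view) are no longer cached eagerly in the constructor, which shrinks the serialized planner shipped to executors. A minimal illustration of the transient-field-plus-lazy-getter half of that pattern (illustrative class, not Hudi code):

import java.io.Serializable;

// Minimal illustration: a transient cache skipped during serialization
// and rebuilt on first access after deserialization.
public class LazyPlannerSketch implements Serializable {
  private transient String commitTimeline; // null after deserialization

  private String getCommitTimeline() {
    if (commitTimeline == null) {
      commitTimeline = loadTimeline(); // recompute from serializable state
    }
    return commitTimeline;
  }

  private String loadTimeline() {
    return "completed-commits-timeline";
  }

  public static void main(String[] args) {
    LazyPlannerSketch planner = new LazyPlannerSketch();
    System.out.println(planner.getCommitTimeline()); // built lazily on first use
  }
}
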
@@ -28,11 +28,11 @@
 
 import java.util.Map;
 
-public abstract class BaseHoodieFunctionalIndexClient {
+public abstract class BaseHoodieIndexClient {
 
-  private static final Logger LOG = LoggerFactory.getLogger(BaseHoodieFunctionalIndexClient.class);
+  private static final Logger LOG = LoggerFactory.getLogger(BaseHoodieIndexClient.class);
 
-  public BaseHoodieFunctionalIndexClient() {
+  public BaseHoodieIndexClient() {
   }
 
   /**
@@ -61,4 +61,13 @@ public void register(HoodieTableMetaClient metaClient, String indexName, String
    * Create a functional index.
    */
   public abstract void create(HoodieTableMetaClient metaClient, String indexName, String indexType, Map<String, Map<String, String>> columns, Map<String, String> options);
+
+  /**
+   * Drop an index. By default, ignore drop if index does not exist.
+   *
+   * @param metaClient {@link HoodieTableMetaClient} instance
+   * @param indexName index name for the index to be dropped
+   * @param ignoreIfNotExists ignore drop if index does not exist
+   */
+  public abstract void drop(HoodieTableMetaClient metaClient, String indexName, boolean ignoreIfNotExists);
 }
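
A sketch of what the new abstract drop implies for implementors: a missing index is ignored when ignoreIfNotExists is true and an error otherwise. The concrete class and its in-memory registry below are hypothetical:

import java.util.HashSet;
import java.util.Set;

// Hypothetical implementor sketch for the drop(...) contract.
public class IndexClientSketch {
  private final Set<String> indexes = new HashSet<>(Set.of("func_index_ts"));

  public void drop(String indexName, boolean ignoreIfNotExists) {
    if (!indexes.remove(indexName)) {
      if (ignoreIfNotExists) {
        return; // the documented default: ignore a drop of a missing index
      }
      throw new IllegalArgumentException("Index not found: " + indexName);
    }
  }

  public static void main(String[] args) {
    IndexClientSketch client = new IndexClientSketch();
    client.drop("func_index_ts", false); // removes the index
    client.drop("func_index_ts", true);  // second drop silently ignored
  }
}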