Skip to content

Commit

Permalink
Add new hudi session property: ignore_absent_partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
willzgw committed Mar 12, 2024
1 parent 4c8c005 commit aeb9d4a
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class HudiConfig
private int splitGeneratorParallelism = 4;
private long perTransactionMetastoreCacheMaximumSize = 2000;
private boolean queryPartitionFilterRequired;
private boolean ignoreAbsentPartitions;

public List<String> getColumnsToHide()
{
Expand Down Expand Up @@ -207,4 +208,16 @@ public boolean isQueryPartitionFilterRequired()
{
return queryPartitionFilterRequired;
}

@Config("hudi.ignore-absent-partitions")
public HudiConfig setIgnoreAbsentPartitions(boolean ignoreAbsentPartitions)
{
this.ignoreAbsentPartitions = ignoreAbsentPartitions;
return this;
}

public boolean isIgnoreAbsentPartitions()
{
return ignoreAbsentPartitions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class HudiSessionProperties
private static final String MAX_OUTSTANDING_SPLITS = "max_outstanding_splits";
private static final String SPLIT_GENERATOR_PARALLELISM = "split_generator_parallelism";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
private static final String IGNORE_ABSENT_PARTITIONS = "ignore_absent_partitions";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -119,6 +120,11 @@ public HudiSessionProperties(HudiConfig hudiConfig, ParquetReaderConfig parquetR
QUERY_PARTITION_FILTER_REQUIRED,
"Require a filter on at least one partition column",
hudiConfig.isQueryPartitionFilterRequired(),
false),
booleanProperty(
IGNORE_ABSENT_PARTITIONS,
"Ignore absent partitions",
hudiConfig.isIgnoreAbsentPartitions(),
false));
}

Expand Down Expand Up @@ -178,4 +184,9 @@ public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
{
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
}

public static boolean isIgnoreAbsentPartitions(ConnectorSession session)
{
return session.getProperty(IGNORE_ABSENT_PARTITIONS, Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static io.trino.plugin.hudi.HudiSessionProperties.getMinimumAssignedSplitWeight;
import static io.trino.plugin.hudi.HudiSessionProperties.getSplitGeneratorParallelism;
import static io.trino.plugin.hudi.HudiSessionProperties.getStandardSplitWeightSize;
import static io.trino.plugin.hudi.HudiSessionProperties.isIgnoreAbsentPartitions;
import static io.trino.plugin.hudi.HudiSessionProperties.isSizeBasedSplitWeightsEnabled;
import static io.trino.plugin.hudi.HudiUtil.buildTableMetaClient;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -82,7 +83,8 @@ public HudiSplitSource(
metastore,
table,
partitionColumnHandles,
partitions);
partitions,
!tableHandle.getPartitionColumns().isEmpty() && isIgnoreAbsentPartitions(session));

this.queue = new ThrottledAsyncQueue<>(maxSplitsPerSecond, maxOutstandingSplits, executor);
HudiBackgroundSplitLoader splitLoader = new HudiBackgroundSplitLoader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ public HudiReadOptimizedDirectoryLister(
HiveMetastore hiveMetastore,
Table hiveTable,
List<HiveColumnHandle> partitionColumnHandles,
List<String> hivePartitionNames)
List<String> hivePartitionNames,
boolean ignoreAbsentPartitions)
{
this.fileSystemView = new HudiTableFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
this.fileSystemView = new HudiTableFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), ignoreAbsentPartitions);
this.partitionColumns = hiveTable.getPartitionColumns();
this.allPartitionInfoMap = hivePartitionNames.stream()
.collect(Collectors.toMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.hudi.compaction.CompactionOperation;
import io.trino.plugin.hudi.compaction.HudiCompactionOperation;
import io.trino.plugin.hudi.compaction.HudiCompactionPlan;
Expand Down Expand Up @@ -56,6 +57,8 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_BAD_DATA;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_FILESYSTEM_ERROR;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_PARTITION_NOT_FOUND;
import static io.trino.plugin.hudi.files.FSUtils.LOG_FILE_PATTERN;
import static io.trino.plugin.hudi.files.FSUtils.getPartitionLocation;
import static java.util.function.Function.identity;
Expand All @@ -80,8 +83,9 @@ public class HudiTableFileSystemView
private Map<String, List<HudiFileGroup>> partitionToFileGroupsMap;
private Map<HudiFileGroupId, Entry<String, CompactionOperation>> fgIdToPendingCompaction;
private Map<HudiFileGroupId, HudiInstant> fgIdToReplaceInstants;
private boolean ignoreAbsentPartitions;

public HudiTableFileSystemView(HudiTableMetaClient metaClient, HudiTimeline visibleActiveTimeline)
public HudiTableFileSystemView(HudiTableMetaClient metaClient, HudiTimeline visibleActiveTimeline, boolean ignoreAbsentPartitions)
{
partitionToFileGroupsMap = new ConcurrentHashMap<>();
this.metaClient = metaClient;
Expand All @@ -90,6 +94,7 @@ public HudiTableFileSystemView(HudiTableMetaClient metaClient, HudiTimeline visi
resetPendingCompactionOperations(getAllPendingCompactionOperations(metaClient)
.values().stream()
.map(pair -> Map.entry(pair.getKey(), CompactionOperation.convertFromAvroRecordInstance(pair.getValue()))));
this.ignoreAbsentPartitions = ignoreAbsentPartitions;
}

private static Map<HudiFileGroupId, Entry<String, HudiCompactionOperation>> getAllPendingCompactionOperations(
Expand Down Expand Up @@ -292,6 +297,9 @@ private FileIterator listPartition(Location partitionLocation)
if (fileIterator.hasNext()) {
return fileIterator;
}
if (!ignoreAbsentPartitions) {
checkPartitionLocationExists(metaClient.getFileSystem(), partitionLocation);
}
return FileIterator.empty();
}

Expand All @@ -310,6 +318,18 @@ public List<HudiFileGroup> addFilesToView(FileIterator partitionFiles)
return fileGroups;
}

private static void checkPartitionLocationExists(TrinoFileSystem fileSystem, Location location)
{
try {
if (!fileSystem.directoryExists(location).orElse(true)) {
throw new TrinoException(HUDI_PARTITION_NOT_FOUND, "Partition location does not exist: " + location);
}
}
catch (IOException e) {
throw new TrinoException(HUDI_FILESYSTEM_ERROR, "Failed checking directory path:" + location, e);
}
}

private List<HudiFileGroup> buildFileGroups(
FileIterator partitionFiles,
HudiTimeline timeline,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public void testDefaults()
.setSplitLoaderParallelism(4)
.setSplitGeneratorParallelism(4)
.setPerTransactionMetastoreCacheMaximumSize(2000)
.setQueryPartitionFilterRequired(false));
.setQueryPartitionFilterRequired(false)
.setIgnoreAbsentPartitions(false));
}

@Test
Expand All @@ -58,6 +59,7 @@ public void testExplicitPropertyMappings()
.put("hudi.split-generator-parallelism", "32")
.put("hudi.per-transaction-metastore-cache-maximum-size", "1000")
.put("hudi.query-partition-filter-required", "true")
.put("hudi.ignore-absent-partitions", "true")
.buildOrThrow();

HudiConfig expected = new HudiConfig()
Expand All @@ -71,7 +73,8 @@ public void testExplicitPropertyMappings()
.setSplitLoaderParallelism(16)
.setSplitGeneratorParallelism(32)
.setPerTransactionMetastoreCacheMaximumSize(1000)
.setQueryPartitionFilterRequired(true);
.setQueryPartitionFilterRequired(true)
.setIgnoreAbsentPartitions(true);

assertFullMapping(properties, expected);
}
Expand Down

0 comments on commit aeb9d4a

Please sign in to comment.