Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix hudi connector gets stuck #19506 #20027

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/src/main/sphinx/connector/hudi.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ Additionally, following configuration properties can be set depending on the use
or `CAST(part_key AS INTEGER) % 2 = 0` are not recognized as partition filters,
and queries using such expressions fail if the property is set to `true`.
- `false`
* - `hudi.ignore-absent-partitions`
- Ignore partitions whose file system location does not exist, rather than
failing the query. Note that enabling this may silently skip data that is
expected to be part of the table.
- `false`

:::

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class HudiConfig
private int splitGeneratorParallelism = 4;
private long perTransactionMetastoreCacheMaximumSize = 2000;
private boolean queryPartitionFilterRequired;
private boolean ignoreAbsentPartitions;

public List<String> getColumnsToHide()
{
Expand Down Expand Up @@ -203,4 +204,16 @@ public boolean isQueryPartitionFilterRequired()
{
return queryPartitionFilterRequired;
}

// Binds the "hudi.ignore-absent-partitions" catalog configuration property.
// Returns this instance so setters can be chained, matching the other setters in this config class.
@Config("hudi.ignore-absent-partitions")
public HudiConfig setIgnoreAbsentPartitions(boolean ignoreAbsentPartitions)
{
    this.ignoreAbsentPartitions = ignoreAbsentPartitions;
    return this;
}

// Accessor used by HudiSessionProperties as the default for the
// "ignore_absent_partitions" session property.
public boolean isIgnoreAbsentPartitions()
{
    return this.ignoreAbsentPartitions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class HudiSessionProperties
private static final String MAX_OUTSTANDING_SPLITS = "max_outstanding_splits";
private static final String SPLIT_GENERATOR_PARALLELISM = "split_generator_parallelism";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
private static final String IGNORE_ABSENT_PARTITIONS = "ignore_absent_partitions";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -125,6 +126,11 @@ public HudiSessionProperties(HudiConfig hudiConfig, ParquetReaderConfig parquetR
QUERY_PARTITION_FILTER_REQUIRED,
"Require a filter on at least one partition column",
hudiConfig.isQueryPartitionFilterRequired(),
false),
booleanProperty(
IGNORE_ABSENT_PARTITIONS,
"Ignore absent partitions",
hudiConfig.isIgnoreAbsentPartitions(),
false));
}

Expand Down Expand Up @@ -189,4 +195,9 @@ public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
{
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
}

// Reads the "ignore_absent_partitions" session property for the given session.
public static boolean isIgnoreAbsentPartitions(ConnectorSession session)
{
    Boolean ignoreAbsentPartitions = session.getProperty(IGNORE_ABSENT_PARTITIONS, Boolean.class);
    return ignoreAbsentPartitions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.toCompletableFuture;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT;
import static io.trino.plugin.hudi.HudiSessionProperties.getMinimumAssignedSplitWeight;
import static io.trino.plugin.hudi.HudiSessionProperties.getSplitGeneratorParallelism;
import static io.trino.plugin.hudi.HudiSessionProperties.getStandardSplitWeightSize;
import static io.trino.plugin.hudi.HudiSessionProperties.isIgnoreAbsentPartitions;
import static io.trino.plugin.hudi.HudiSessionProperties.isSizeBasedSplitWeightsEnabled;
import static io.trino.plugin.hudi.HudiUtil.buildTableMetaClient;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -81,7 +83,8 @@ public HudiSplitSource(
metastore,
table,
partitionColumnHandles,
partitions);
partitions,
!tableHandle.getPartitionColumns().isEmpty() && isIgnoreAbsentPartitions(session));

this.queue = new ThrottledAsyncQueue<>(maxSplitsPerSecond, maxOutstandingSplits, executor);
HudiBackgroundSplitLoader splitLoader = new HudiBackgroundSplitLoader(
Expand All @@ -91,7 +94,12 @@ public HudiSplitSource(
queue,
new BoundedExecutor(executor, getSplitGeneratorParallelism(session)),
createSplitWeightProvider(session),
partitions);
partitions,
throwable -> {
trinoException.compareAndSet(null, new TrinoException(HUDI_CANNOT_OPEN_SPLIT,
"Failed to generate splits for " + table.getSchemaTableName(), throwable));
queue.finish();
});
this.splitLoaderFuture = splitLoaderExecutorService.schedule(splitLoader, 0, TimeUnit.MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ public HudiReadOptimizedDirectoryLister(
HiveMetastore hiveMetastore,
Table hiveTable,
List<HiveColumnHandle> partitionColumnHandles,
List<String> hivePartitionNames)
List<String> hivePartitionNames,
boolean ignoreAbsentPartitions)
{
this.fileSystemView = new HudiTableFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
this.fileSystemView = new HudiTableFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), ignoreAbsentPartitions);
this.partitionColumns = hiveTable.getPartitionColumns();
this.allPartitionInfoMap = hivePartitionNames.stream()
.collect(Collectors.toMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.hudi.split;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.trino.plugin.hive.util.AsyncQueue;
import io.trino.plugin.hudi.HudiTableHandle;
import io.trino.plugin.hudi.partition.HudiPartitionInfoLoader;
Expand All @@ -28,8 +29,10 @@
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.function.Consumer;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.addExceptionCallback;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT;
import static io.trino.plugin.hudi.HudiSessionProperties.getSplitGeneratorParallelism;
import static java.util.Objects.requireNonNull;
Expand All @@ -43,6 +46,7 @@ public class HudiBackgroundSplitLoader
private final int splitGeneratorNumThreads;
private final HudiSplitFactory hudiSplitFactory;
private final List<String> partitions;
private final Consumer<Throwable> errorListener;

public HudiBackgroundSplitLoader(
ConnectorSession session,
Expand All @@ -51,44 +55,50 @@ public HudiBackgroundSplitLoader(
AsyncQueue<ConnectorSplit> asyncQueue,
Executor splitGeneratorExecutor,
HudiSplitWeightProvider hudiSplitWeightProvider,
List<String> partitions)
List<String> partitions,
Consumer<Throwable> errorListener)
{
this.hudiDirectoryLister = requireNonNull(hudiDirectoryLister, "hudiDirectoryLister is null");
this.asyncQueue = requireNonNull(asyncQueue, "asyncQueue is null");
this.splitGeneratorExecutor = requireNonNull(splitGeneratorExecutor, "splitGeneratorExecutorService is null");
this.splitGeneratorNumThreads = getSplitGeneratorParallelism(session);
this.hudiSplitFactory = new HudiSplitFactory(tableHandle, hudiSplitWeightProvider);
this.partitions = requireNonNull(partitions, "partitions is null");
this.errorListener = requireNonNull(errorListener, "errorListener is null");
}

@Override
public void run()
{
Deque<String> partitionQueue = new ConcurrentLinkedDeque<>(partitions);
List<HudiPartitionInfoLoader> splitGeneratorList = new ArrayList<>();
List<Future> splitGeneratorFutures = new ArrayList<>();
List<ListenableFuture<Void>> splitGeneratorFutures = new ArrayList<>();

// Start a number of partition split generators to generate the splits in parallel
for (int i = 0; i < splitGeneratorNumThreads; i++) {
HudiPartitionInfoLoader generator = new HudiPartitionInfoLoader(hudiDirectoryLister, hudiSplitFactory, asyncQueue, partitionQueue);
splitGeneratorList.add(generator);
splitGeneratorFutures.add(Futures.submit(generator, splitGeneratorExecutor));
ListenableFuture<Void> future = Futures.submit(generator, splitGeneratorExecutor);
addExceptionCallback(future, errorListener);
splitGeneratorFutures.add(future);
}

for (HudiPartitionInfoLoader generator : splitGeneratorList) {
// Let the split generator stop once the partition queue is empty
generator.stopRunning();
}

// Wait for all split generators to finish
for (Future future : splitGeneratorFutures) {
try {
future.get();
}
catch (InterruptedException | ExecutionException e) {
throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Error generating Hudi split", e);
try {
// Wait for all split generators to finish
Futures.whenAllComplete(splitGeneratorFutures)
.run(asyncQueue::finish, directExecutor())
.get();
}
catch (InterruptedException | ExecutionException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Error generating Hudi split", e);
}
asyncQueue.finish();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.hudi.compaction.CompactionOperation;
import io.trino.plugin.hudi.compaction.HudiCompactionOperation;
import io.trino.plugin.hudi.compaction.HudiCompactionPlan;
Expand Down Expand Up @@ -56,6 +57,8 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_BAD_DATA;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_FILESYSTEM_ERROR;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_PARTITION_NOT_FOUND;
import static io.trino.plugin.hudi.files.FSUtils.LOG_FILE_PATTERN;
import static io.trino.plugin.hudi.files.FSUtils.getPartitionLocation;
import static java.util.function.Function.identity;
Expand All @@ -80,8 +83,9 @@ public class HudiTableFileSystemView
private Map<String, List<HudiFileGroup>> partitionToFileGroupsMap;
private Map<HudiFileGroupId, Entry<String, CompactionOperation>> fgIdToPendingCompaction;
private Map<HudiFileGroupId, HudiInstant> fgIdToReplaceInstants;
private boolean ignoreAbsentPartitions;

public HudiTableFileSystemView(HudiTableMetaClient metaClient, HudiTimeline visibleActiveTimeline)
public HudiTableFileSystemView(HudiTableMetaClient metaClient, HudiTimeline visibleActiveTimeline, boolean ignoreAbsentPartitions)
{
partitionToFileGroupsMap = new ConcurrentHashMap<>();
this.metaClient = metaClient;
Expand All @@ -90,6 +94,7 @@ public HudiTableFileSystemView(HudiTableMetaClient metaClient, HudiTimeline visi
resetPendingCompactionOperations(getAllPendingCompactionOperations(metaClient)
.values().stream()
.map(pair -> Map.entry(pair.getKey(), CompactionOperation.convertFromAvroRecordInstance(pair.getValue()))));
this.ignoreAbsentPartitions = ignoreAbsentPartitions;
}

private static Map<HudiFileGroupId, Entry<String, HudiCompactionOperation>> getAllPendingCompactionOperations(
Expand Down Expand Up @@ -292,6 +297,9 @@ private FileIterator listPartition(Location partitionLocation)
if (fileIterator.hasNext()) {
return fileIterator;
}
if (!ignoreAbsentPartitions) {
checkPartitionLocationExists(metaClient.getFileSystem(), partitionLocation);
}
return FileIterator.empty();
}

Expand All @@ -310,6 +318,18 @@ public List<HudiFileGroup> addFilesToView(FileIterator partitionFiles)
return fileGroups;
}

/**
 * Verifies that the given partition location exists as a directory on the file system.
 * <p>
 * Throws {@code HUDI_PARTITION_NOT_FOUND} when the file system positively reports the
 * directory as absent, and {@code HUDI_FILESYSTEM_ERROR} when the existence check itself
 * fails with an I/O error.
 */
private static void checkPartitionLocationExists(TrinoFileSystem fileSystem, Location location)
{
    try {
        // directoryExists() may return Optional.empty() when the file system cannot
        // determine existence (e.g. object stores without real directories); treat
        // "unknown" as existing so we only fail on a definite absence.
        if (!fileSystem.directoryExists(location).orElse(true)) {
            throw new TrinoException(HUDI_PARTITION_NOT_FOUND, "Partition location does not exist: " + location);
        }
    }
    catch (IOException e) {
        // Fix: add the missing space after the colon so the location is not fused to the message
        throw new TrinoException(HUDI_FILESYSTEM_ERROR, "Failed checking directory path: " + location, e);
    }
}

private List<HudiFileGroup> buildFileGroups(
FileIterator partitionFiles,
HudiTimeline timeline,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public void testDefaults()
.setSplitLoaderParallelism(4)
.setSplitGeneratorParallelism(4)
.setPerTransactionMetastoreCacheMaximumSize(2000)
.setQueryPartitionFilterRequired(false));
.setQueryPartitionFilterRequired(false)
.setIgnoreAbsentPartitions(false));
}

@Test
Expand All @@ -59,6 +60,7 @@ public void testExplicitPropertyMappings()
.put("hudi.split-generator-parallelism", "32")
.put("hudi.per-transaction-metastore-cache-maximum-size", "1000")
.put("hudi.query-partition-filter-required", "true")
.put("hudi.ignore-absent-partitions", "true")
.buildOrThrow();

HudiConfig expected = new HudiConfig()
Expand All @@ -72,7 +74,8 @@ public void testExplicitPropertyMappings()
.setSplitLoaderParallelism(16)
.setSplitGeneratorParallelism(32)
.setPerTransactionMetastoreCacheMaximumSize(1000)
.setQueryPartitionFilterRequired(true);
.setQueryPartitionFilterRequired(true)
.setIgnoreAbsentPartitions(true);

assertFullMapping(properties, expected);
}
Expand Down