From bf6fcdb05f57f819816f68c1bd370fdc002f0fb3 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 12 Apr 2024 18:32:28 -0500 Subject: [PATCH] Added support to read partitioned parquet files from S3 (#5206) Breaking Change: Renamed KeyValuePartitionLayout to FileKeyValuePartitionLayout. --- .../java/io/deephaven/base/FileUtils.java | 15 +- .../util/channel/CachedChannelProvider.java | 11 + .../util/channel/LocalFSChannelProvider.java | 17 ++ .../channel/SeekableChannelsProvider.java | 25 +++ .../channel/CachedChannelProviderTest.java | 14 ++ .../local/FileKeyValuePartitionLayout.java | 141 +++++++++++++ .../local/KeyValuePartitionLayout.java | 195 +++++++----------- .../URIStreamKeyValuePartitionLayout.java | 119 +++++++++++ ...a => TestFileKeyValuePartitionLayout.java} | 21 +- .../deephaven/parquet/base/ParquetUtils.java | 30 ++- .../deephaven/parquet/table/ParquetTools.java | 151 +++++++++++--- .../table/layout/LocationTableBuilderCsv.java | 8 +- .../layout/ParquetFlatPartitionedLayout.java | 75 ++++--- .../ParquetKeyValuePartitionedLayout.java | 78 ++++++- .../table/ParquetTableReadWriteTest.java | 141 ++++++++++++- extensions/s3/build.gradle | 2 +- .../extensions/s3/S3ChannelContext.java | 85 +++++--- .../extensions/s3/S3Instructions.java | 9 +- .../extensions/s3/S3SeekableByteChannel.java | 9 +- .../s3/S3SeekableChannelProvider.java | 131 ++++++++++++ .../s3/S3SeekableChannelProviderPlugin.java | 11 +- .../TrackedSeekableChannelsProvider.java | 17 ++ ...TrackedSeekableChannelsProviderPlugin.java | 6 +- py/server/deephaven/parquet.py | 8 +- 24 files changed, 1065 insertions(+), 254 deletions(-) create mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/FileKeyValuePartitionLayout.java create mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URIStreamKeyValuePartitionLayout.java rename engine/table/src/test/java/io/deephaven/engine/table/impl/locations/impl/{TestKeyValuePartitionLayout.java => TestFileKeyValuePartitionLayout.java} (94%) diff --git a/Base/src/main/java/io/deephaven/base/FileUtils.java b/Base/src/main/java/io/deephaven/base/FileUtils.java index 367c6896a05..8f13f724859 100644 --- a/Base/src/main/java/io/deephaven/base/FileUtils.java +++ b/Base/src/main/java/io/deephaven/base/FileUtils.java @@ -12,6 +12,7 @@ import java.net.URISyntaxException; import java.nio.file.Path; import java.util.ArrayList; +import java.util.regex.Pattern; public class FileUtils { private final static FileFilter DIRECTORY_FILE_FILTER = new FileFilter() { @@ -29,6 +30,8 @@ public boolean accept(File dir, String name) { }; private final static String[] EMPTY_STRING_ARRAY = new String[0]; + public static final Pattern DUPLICATE_SLASH_PATTERN = Pattern.compile("//+"); + /** * Cleans the specified path. All files and subdirectories in the path will be deleted. (ie you'll be left with an * empty directory). @@ -254,7 +257,8 @@ public boolean accept(File pathname) { } /** - * Take the file source path or URI string and convert it to a URI object. + * Take the file source path or URI string and convert it to a URI object. Any unnecessary path separators will be + * removed. 
* * @param source The file source path or URI * @param isDirectory Whether the source is a directory @@ -264,9 +268,16 @@ public static URI convertToURI(final String source, final boolean isDirectory) { if (source.isEmpty()) { throw new IllegalArgumentException("Cannot convert empty source to URI"); } - final URI uri; + URI uri; try { uri = new URI(source); + // Replace two or more consecutive slashes in the path with a single slash + final String path = uri.getPath(); + if (path.contains("//")) { + final String canonicalizedPath = DUPLICATE_SLASH_PATTERN.matcher(path).replaceAll("/"); + uri = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), canonicalizedPath, + uri.getQuery(), uri.getFragment()); + } } catch (final URISyntaxException e) { // If the URI is invalid, assume it's a file path return convertToURI(new File(source), isDirectory); diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java b/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java index 408c69d47cc..2927ce5b706 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java @@ -19,6 +19,7 @@ import java.nio.channels.SeekableByteChannel; import java.nio.file.Path; import java.util.*; +import java.util.stream.Stream; /** * {@link SeekableChannelsProvider Channel provider} that will cache a bounded number of unused channels. @@ -109,6 +110,16 @@ public SeekableByteChannel getWriteChannel(@NotNull final Path path, final boole // end no matter what. } + @Override + public Stream list(@NotNull final URI directory) throws IOException { + return wrappedProvider.list(directory); + } + + @Override + public Stream walk(@NotNull final URI directory) throws IOException { + return wrappedProvider.walk(directory); + } + @Nullable private synchronized CachedChannel tryGetPooledChannel(@NotNull final String pathKey, @NotNull final KeyedObjectHashMap channelPool) { diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/LocalFSChannelProvider.java b/Util/channel/src/main/java/io/deephaven/util/channel/LocalFSChannelProvider.java index fe24270ad73..5d902447180 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/LocalFSChannelProvider.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/LocalFSChannelProvider.java @@ -3,6 +3,7 @@ // package io.deephaven.util.channel; +import io.deephaven.base.FileUtils; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -12,8 +13,10 @@ import java.net.URI; import java.nio.channels.FileChannel; import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.stream.Stream; public class LocalFSChannelProvider implements SeekableChannelsProvider { @Override @@ -56,6 +59,20 @@ public SeekableByteChannel getWriteChannel(@NotNull final Path filePath, final b return result; } + @Override + public final Stream list(@NotNull final URI directory) throws IOException { + // Assuming that the URI is a file, not a directory. The caller should manage file vs. directory handling in + // the processor. + return Files.list(Path.of(directory)).map(path -> FileUtils.convertToURI(path, false)); + } + + @Override + public final Stream walk(@NotNull final URI directory) throws IOException { + // Assuming that the URI is a file, not a directory. 
The caller should manage file vs. directory handling in + // the processor. + return Files.walk(Path.of(directory)).map(path -> FileUtils.convertToURI(path, false)); + } + @Override public void close() {} } diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java index ba7bcf49c74..91831b60400 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java @@ -12,6 +12,7 @@ import java.nio.channels.SeekableByteChannel; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.stream.Stream; import static io.deephaven.base.FileUtils.convertToURI; @@ -85,4 +86,28 @@ default SeekableByteChannel getWriteChannel(@NotNull final String path, final bo } SeekableByteChannel getWriteChannel(@NotNull Path path, boolean append) throws IOException; + + /** + * Returns a stream of URIs, the elements of which are the entries in the directory. The listing is non-recursive. + * The URIs supplied by the stream will not have any unnecessary slashes or path separators. Also, the URIs will be + * file URIs (not ending with "/") irrespective of whether the URI corresponds to a file or a directory. The caller + * should manage file vs. directory handling in the processor. The caller is also responsible for closing the + * stream, preferably using a try-with-resources block. + * + * @param directory the URI of the directory to list + * @return The {@link Stream} of {@link URI}s + */ + Stream list(@NotNull URI directory) throws IOException; + + /** + * Returns a stream of URIs, the elements of which are all the files in the file tree rooted at the given starting + * directory. The URIs supplied by the stream will not have any unnecessary slashes or path separators. Also, the + * URIs will be file URIs (not ending with "/") irrespective of whether the URI corresponds to a file or a + * directory. The caller should manage file vs. directory handling in the processor. The caller is also responsible + * for closing the stream, preferably using a try-with-resources block. 
+ * + * @param directory the URI of the directory to walk + * @return The {@link Stream} of {@link URI}s + */ + Stream walk(@NotNull URI directory) throws IOException; } diff --git a/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java b/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java index 59c42dc39be..1fad31eb918 100644 --- a/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java +++ b/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java @@ -3,6 +3,7 @@ // package io.deephaven.util.channel; +import io.deephaven.base.FileUtils; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.Test; @@ -12,10 +13,13 @@ import java.net.URI; import java.nio.ByteBuffer; import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -215,6 +219,16 @@ public SeekableByteChannel getWriteChannel(@NotNull Path path, boolean append) { return new TestMockChannel(count.getAndIncrement(), path.toString()); } + @Override + public final Stream list(@NotNull final URI directory) { + throw new UnsupportedOperationException("list"); + } + + @Override + public final Stream walk(@NotNull final URI directory) { + throw new UnsupportedOperationException("walk"); + } + @Override public void close() {} } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/FileKeyValuePartitionLayout.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/FileKeyValuePartitionLayout.java new file mode 100644 index 00000000000..e3fd32aada3 --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/FileKeyValuePartitionLayout.java @@ -0,0 +1,141 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.engine.table.impl.locations.local; + +import gnu.trove.map.TIntObjectMap; +import gnu.trove.map.hash.TIntObjectHashMap; +import io.deephaven.base.verify.Require; +import io.deephaven.engine.table.Table; +import io.deephaven.api.util.NameValidator; +import io.deephaven.engine.table.impl.locations.TableDataException; +import io.deephaven.engine.table.impl.locations.TableLocationKey; +import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.io.File; +import java.io.IOException; +import java.nio.file.*; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.*; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.function.Supplier; + + +/** + * {@link TableLocationKeyFinder Location finder} that will take a directory file, traverse the directory hierarchy and + * infer partitions from key-value pairs in the directory names, for example: + * + *
+ * <pre>
+ * tableRootDirectory/Country=France/City=Paris/parisData.parquet
+ * </pre>
+ * + * Traversal is depth-first, and assumes that target files will only be found at a single depth. This class is + * specialized for handling of files. For handling of URIs, see {@link URIStreamKeyValuePartitionLayout}. + * + * @implNote Column names will be legalized via {@link NameValidator#legalizeColumnName(String, Set)}. + */ +public class FileKeyValuePartitionLayout + extends KeyValuePartitionLayout + implements TableLocationKeyFinder { + + private final File tableRootDirectory; + private final Predicate pathFilter; + private final Supplier locationTableBuilderFactory; + private final int maxPartitioningLevels; + + /** + * @param tableRootDirectory The directory to traverse from + * @param pathFilter Filter to determine whether a regular file should be used to create a key + * @param locationTableBuilderFactory Factory for {@link LocationTableBuilder builders} used to organize partition + * information; as builders are typically stateful, a new builder is created each time this + * {@link KeyValuePartitionLayout} is used to {@link #findKeys(Consumer) find keys} + * @param keyFactory Factory function used to generate table location keys from target files and partition values + * @param maxPartitioningLevels Maximum partitioning levels to traverse. Must be {@code >= 0}. {@code 0} means only + * look at files in {@code tableRootDirectory} and find no partitions. + */ + public FileKeyValuePartitionLayout( + @NotNull final File tableRootDirectory, + @NotNull final Predicate pathFilter, + @NotNull final Supplier locationTableBuilderFactory, + @NotNull final BiFunction>, TLK> keyFactory, + final int maxPartitioningLevels) { + super(keyFactory); + this.tableRootDirectory = tableRootDirectory; + this.pathFilter = pathFilter; + this.locationTableBuilderFactory = locationTableBuilderFactory; + this.maxPartitioningLevels = Require.geqZero(maxPartitioningLevels, "maxPartitioningLevels"); + } + + @Override + public String toString() { + return FileKeyValuePartitionLayout.class.getSimpleName() + '[' + tableRootDirectory + ']'; + } + + @Override + public void findKeys(@NotNull final Consumer locationKeyObserver) { + final Queue targetFiles = new ArrayDeque<>(); + final LocationTableBuilder locationTableBuilder = locationTableBuilderFactory.get(); + try { + Files.walkFileTree(tableRootDirectory.toPath(), EnumSet.of(FileVisitOption.FOLLOW_LINKS), + maxPartitioningLevels + 1, new SimpleFileVisitor<>() { + final Set partitionKeys = new LinkedHashSet<>(); // Preserve order of insertion + final List partitionValues = new ArrayList<>(); + final TIntObjectMap partitionColInfo = new TIntObjectHashMap<>(); + boolean registered; + int columnCount = -1; + + @Override + public FileVisitResult preVisitDirectory( + @NotNull final Path dir, + @NotNull final BasicFileAttributes attrs) { + final String dirName = dir.getFileName().toString(); + // Skip dot directories + if (!dirName.isEmpty() && dirName.charAt(0) == '.') { + return FileVisitResult.SKIP_SUBTREE; + } + if (++columnCount > 0) { + // We're descending and past the root + final int columnIndex = columnCount - 1; + processSubdirectoryInternal(dirName, dir.toString(), columnIndex, partitionKeys, + partitionValues, partitionColInfo); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFile( + @NotNull final Path file, + @NotNull final BasicFileAttributes attrs) { + if (attrs.isRegularFile() && pathFilter.test(file)) { + if (!registered) { + locationTableBuilder.registerPartitionKeys(partitionKeys); + registered = true; + } 
+ locationTableBuilder.acceptLocation(partitionValues); + targetFiles.add(file); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory( + @NotNull final Path dir, + @Nullable final IOException exc) throws IOException { + if (--columnCount >= 0) { + partitionValues.remove(columnCount); + } + return super.postVisitDirectory(dir, exc); + } + }); + } catch (IOException e) { + throw new TableDataException("Error finding locations for under " + tableRootDirectory, e); + } + + final Table locationTable = locationTableBuilder.build(); + buildLocationKeys(locationTable, targetFiles, locationKeyObserver); + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/KeyValuePartitionLayout.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/KeyValuePartitionLayout.java index cdd0ba042e0..0c98a4fa4c6 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/KeyValuePartitionLayout.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/KeyValuePartitionLayout.java @@ -3,39 +3,29 @@ // package io.deephaven.engine.table.impl.locations.local; -import io.deephaven.base.verify.Require; -import io.deephaven.engine.table.Table; +import gnu.trove.map.TIntObjectMap; import io.deephaven.api.util.NameValidator; +import io.deephaven.engine.table.Table; import io.deephaven.engine.table.impl.locations.TableDataException; import io.deephaven.engine.table.impl.locations.TableLocationKey; import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder; import io.deephaven.engine.table.ColumnSource; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import java.io.File; -import java.io.IOException; -import java.nio.file.*; -import java.nio.file.attribute.BasicFileAttributes; import java.util.*; +import java.net.URI; +import java.nio.file.Path; import java.util.function.BiFunction; import java.util.function.Consumer; -import java.util.function.Predicate; -import java.util.function.Supplier; /** - * {@link TableLocationKeyFinder Location finder} that will traverse a directory hierarchy and infer partitions from - * key-value pairs in the directory names, for example: - * - *
- * <pre>
- * tableRootDirectory/Country=France/City=Paris/parisData.parquet
- * </pre>
- * - * Traversal is depth-first, and assumes that target files will only be found at a single depth. + * Base class for {@link TableLocationKeyFinder location finders} that traverse file hierarchy to infer partitions. * - * @implNote Column names will be legalized via {@link NameValidator#legalizeColumnName(String, Set)}. + * @param The type of {@link TableLocationKey} to be generated + * @param The type of files used to generate location keys, like a {@link URI} or a {@link Path} */ -public class KeyValuePartitionLayout implements TableLocationKeyFinder { +public abstract class KeyValuePartitionLayout + implements TableLocationKeyFinder { /** * Interface for implementations to perform type coercion and specify a table of partition values for observed table @@ -53,11 +43,11 @@ public interface LocationTableBuilder { /** * Accept an ordered collection of {@link String strings} representing partition values for a particular table - * location, parallel to a previously-registered collection of partition keys. Should be called after a single + * location, parallel to a previously registered collection of partition keys. Should be called after a single * call to {@link #registerPartitionKeys(Collection) registerPartitionKeys}. - * - * @param partitionValueStrings The partition values to accept. Must have the same length as the - * previously-registered partition keys. + * + * @param partitionValueStrings The partition values to accept. Must have the same length as the previously + * registered partition keys. */ void acceptLocation(@NotNull Collection partitionValueStrings); @@ -73,114 +63,83 @@ public interface LocationTableBuilder { Table build(); } - private final File tableRootDirectory; - private final Predicate pathFilter; - private final Supplier locationTableBuilderFactory; - private final BiFunction>, TLK> keyFactory; - private final int maxPartitioningLevels; + private final BiFunction>, TLK> keyFactory; /** - * @param tableRootDirectory The directory to traverse from - * @param pathFilter Filter to determine whether a regular file should be used to create a key - * @param locationTableBuilderFactory Factory for {@link LocationTableBuilder builders} used to organize partition - * information; as builders are typically stateful, a new builder is created each time this - * KeyValuePartitionLayout is used to {@link #findKeys(Consumer) find keys} - * @param keyFactory Key factory function - * @param maxPartitioningLevels Maximum partitioning levels to traverse. Must be {@code >= 0}. {@code 0} means only - * look at files in {@code tableRootDirectory} and find no partitions. 
+ * @param keyFactory Factory function used to generate table location keys from target files and partition values */ - public KeyValuePartitionLayout( - @NotNull final File tableRootDirectory, - @NotNull final Predicate pathFilter, - @NotNull final Supplier locationTableBuilderFactory, - @NotNull final BiFunction>, TLK> keyFactory, - final int maxPartitioningLevels) { - this.tableRootDirectory = tableRootDirectory; - this.pathFilter = pathFilter; - this.locationTableBuilderFactory = locationTableBuilderFactory; + KeyValuePartitionLayout(@NotNull final BiFunction>, TLK> keyFactory) { this.keyFactory = keyFactory; - this.maxPartitioningLevels = Require.geqZero(maxPartitioningLevels, "maxPartitioningLevels"); } + @Override public String toString() { - return KeyValuePartitionLayout.class.getSimpleName() + '[' + tableRootDirectory + ']'; + return getClass().getSimpleName(); } - @Override - public void findKeys(@NotNull final Consumer locationKeyObserver) { - final Deque targetFiles = new ArrayDeque<>(); - final LocationTableBuilder locationTableBuilder = locationTableBuilderFactory.get(); - try { - Files.walkFileTree(tableRootDirectory.toPath(), EnumSet.of(FileVisitOption.FOLLOW_LINKS), - maxPartitioningLevels + 1, new SimpleFileVisitor<>() { - final Set takenNames = new HashSet<>(); - final List partitionKeys = new ArrayList<>(); - final List partitionValues = new ArrayList<>(); - boolean registered; - int columnCount = -1; - - @Override - public FileVisitResult preVisitDirectory( - @NotNull final Path dir, - @NotNull final BasicFileAttributes attrs) { - final String dirName = dir.getFileName().toString(); - // Skip dot directories - if (!dirName.isEmpty() && dirName.charAt(0) == '.') { - return FileVisitResult.SKIP_SUBTREE; - } - if (++columnCount > 0) { - // We're descending and past the root - final String[] components = dirName.split("=", 2); - if (components.length != 2) { - throw new TableDataException( - "Unexpected directory name format (not key=value) at " + dir); - } - final String columnKey = NameValidator.legalizeColumnName(components[0], takenNames); - final int columnIndex = columnCount - 1; - if (columnCount > partitionKeys.size()) { - partitionKeys.add(columnKey); - } else if (!partitionKeys.get(columnIndex).equals(columnKey)) { - throw new TableDataException(String.format( - "Column name mismatch at column index %d: expected %s found %s at %s", - columnIndex, partitionKeys.get(columnIndex), columnKey, dir)); - } - final String columnValue = components[1]; - partitionValues.add(columnValue); - } - return FileVisitResult.CONTINUE; - } + static class ColumnNameInfo { + final String columnName; // Name extracted from directory + final String legalizedColumnName; - @Override - public FileVisitResult visitFile( - @NotNull final Path file, - @NotNull final BasicFileAttributes attrs) { - if (attrs.isRegularFile() && pathFilter.test(file)) { - if (!registered) { - locationTableBuilder.registerPartitionKeys(partitionKeys); - registered = true; - } - locationTableBuilder.acceptLocation(partitionValues); - targetFiles.add(file); - } - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult postVisitDirectory( - @NotNull final Path dir, - @Nullable final IOException exc) throws IOException { - if (--columnCount >= 0) { - partitionValues.remove(columnCount); - } - return super.postVisitDirectory(dir, exc); - } - }); - } catch (IOException e) { - throw new TableDataException("Error finding locations for under " + tableRootDirectory, e); + ColumnNameInfo(@NotNull final 
String columnName, @NotNull final String legalizedColumnName) { + this.columnName = columnName; + this.legalizedColumnName = legalizedColumnName; } + } - final Table locationTable = locationTableBuilder.build(); + /** + * Process a subdirectory, extracting a column name and value and updating the partition keys and values. + * + * @param dirName The name of the directory + * @param path The path of the directory + * @param colIndex The index of the column + * @param partitionKeys The partition keys, to be updated with the + * {@link NameValidator#legalizeColumnName(String, Set) legalized} column names as new columns are + * encountered + * @param partitionValues The partition values, to be updated with the value extracted from the directory name + * @param partitionColInfo The map of column index to column name info, to be updated with the column name info as + * new columns are encountered + */ + static void processSubdirectoryInternal( + @NotNull final String dirName, + @NotNull final String path, + final int colIndex, + @NotNull final Set partitionKeys, + @NotNull final Collection partitionValues, + @NotNull final TIntObjectMap partitionColInfo) { + final String[] components = dirName.split("=", 2); + if (components.length != 2) { + throw new TableDataException("Unexpected directory name format (not key=value) at " + path); + } + final String columnName = components[0]; + final String legalizedColumnName; + if (partitionColInfo.containsKey(colIndex)) { + final ColumnNameInfo existing = partitionColInfo.get(colIndex); + if (!existing.columnName.equals(columnName)) { + throw new TableDataException(String.format( + "Column name mismatch at column index %d: expected %s found %s at %s", + colIndex, existing.columnName, columnName, path)); + } + } else { + legalizedColumnName = NameValidator.legalizeColumnName(columnName, partitionKeys); + partitionKeys.add(legalizedColumnName); + partitionColInfo.put(colIndex, new ColumnNameInfo(columnName, legalizedColumnName)); + } + final String columnValue = components[1]; + partitionValues.add(columnValue); + } + /** + * Build location keys from a location table and a collection of target files. + * + * @param locationTable The location table + * @param targetFiles The target files + * @param locationKeyObserver A consumer which will receive the location keys + */ + final void buildLocationKeys( + @NotNull final Table locationTable, + @NotNull final Queue targetFiles, + @NotNull final Consumer locationKeyObserver) { final Map> partitions = new LinkedHashMap<>(); // Note that we allow the location table to define partition priority order. 
final String[] partitionKeys = locationTable.getDefinition().getColumnNamesArray(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URIStreamKeyValuePartitionLayout.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URIStreamKeyValuePartitionLayout.java new file mode 100644 index 00000000000..d70055dcd04 --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URIStreamKeyValuePartitionLayout.java @@ -0,0 +1,119 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.engine.table.impl.locations.local; + +import gnu.trove.map.TIntObjectMap; +import gnu.trove.map.hash.TIntObjectHashMap; +import io.deephaven.base.verify.Require; +import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.impl.locations.TableDataException; +import io.deephaven.engine.table.impl.locations.TableLocationKey; +import org.apache.commons.lang3.mutable.MutableBoolean; +import org.jetbrains.annotations.NotNull; + +import java.net.URI; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Stream; + +/** + * Extracts a key-value partitioned table layout from a stream of URIs. + */ +public abstract class URIStreamKeyValuePartitionLayout + extends KeyValuePartitionLayout { + + private static final String URI_SEPARATOR = "/"; + + protected final URI tableRootDirectory; + private final Supplier locationTableBuilderFactory; + private final int maxPartitioningLevels; + + /** + * @param tableRootDirectory The directory to traverse from + * @param locationTableBuilderFactory Factory for {@link LocationTableBuilder builders} used to organize partition + * information; as builders are typically stateful, a new builder is created each time this + * {@link KeyValuePartitionLayout} is used to {@link #findKeys(Consumer) find keys} + * @param keyFactory Factory function used to generate table location keys from target files and partition values + * @param maxPartitioningLevels Maximum partitioning levels to traverse. Must be {@code >= 0}. {@code 0} means only + * look at files in {@code tableRootDirectory} and find no partitions. + */ + protected URIStreamKeyValuePartitionLayout( + @NotNull final URI tableRootDirectory, + @NotNull final Supplier locationTableBuilderFactory, + @NotNull final BiFunction>, TLK> keyFactory, + final int maxPartitioningLevels) { + super(keyFactory); + this.tableRootDirectory = tableRootDirectory; + this.locationTableBuilderFactory = locationTableBuilderFactory; + this.maxPartitioningLevels = Require.geqZero(maxPartitioningLevels, "maxPartitioningLevels"); + } + + @Override + public String toString() { + return getClass().getSimpleName() + '[' + tableRootDirectory + ']'; + } + + /** + * Find the keys in the given URI stream and notify the observer. Note that the URIs are not expected to have any + * extra slashes or other path separators. For example, "/a//b/c" in the path is not expected. 
+ */ + protected final void findKeys(@NotNull final Stream uriStream, + @NotNull final Consumer locationKeyObserver) { + final LocationTableBuilder locationTableBuilder = locationTableBuilderFactory.get(); + final Queue targetURIs = new ArrayDeque<>(); + final Set partitionKeys = new LinkedHashSet<>(); // Preserve order of insertion + final TIntObjectMap partitionColInfo = new TIntObjectHashMap<>(); + final MutableBoolean registered = new MutableBoolean(false); + uriStream.forEachOrdered(uri -> { + final Collection partitionValues = new ArrayList<>(); + final URI relativePath = tableRootDirectory.relativize(uri); + getPartitions(relativePath, partitionKeys, partitionValues, partitionColInfo, registered.booleanValue()); + if (registered.isFalse()) { + // Use the first path to find the partition keys and use the same for the rest + locationTableBuilder.registerPartitionKeys(partitionKeys); + registered.setTrue(); + } + // Use the partition values from each path to build the location table + locationTableBuilder.acceptLocation(partitionValues); + targetURIs.add(uri); + }); + final Table locationTable = locationTableBuilder.build(); + buildLocationKeys(locationTable, targetURIs, locationKeyObserver); + } + + private void getPartitions(@NotNull final URI relativePath, + @NotNull final Set partitionKeys, + @NotNull final Collection partitionValues, + @NotNull final TIntObjectMap partitionColInfo, + final boolean registered) { + final String relativePathString = relativePath.getPath(); + // The following assumes that there is exactly one URI_SEPARATOR between each subdirectory in the path + final String[] subDirs = relativePathString.split(URI_SEPARATOR); + final int numPartitioningCol = subDirs.length - 1; + if (registered) { + if (numPartitioningCol > partitionKeys.size()) { + throw new TableDataException("Too many partitioning levels at " + relativePathString + " (expected " + + partitionKeys.size() + ") based on earlier leaf nodes in the tree."); + } + } else { + if (numPartitioningCol > maxPartitioningLevels) { + throw new TableDataException("Too many partitioning levels at " + relativePathString + ", count = " + + numPartitioningCol + ", maximum expected are " + maxPartitioningLevels); + } + } + for (int partitioningColIndex = 0; partitioningColIndex < numPartitioningCol; partitioningColIndex++) { + processSubdirectoryInternal(subDirs[partitioningColIndex], relativePathString, partitioningColIndex, + partitionKeys, partitionValues, partitionColInfo); + } + } +} + diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/locations/impl/TestKeyValuePartitionLayout.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/locations/impl/TestFileKeyValuePartitionLayout.java similarity index 94% rename from engine/table/src/test/java/io/deephaven/engine/table/impl/locations/impl/TestKeyValuePartitionLayout.java rename to engine/table/src/test/java/io/deephaven/engine/table/impl/locations/impl/TestFileKeyValuePartitionLayout.java index d830c735ead..c602f024172 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/locations/impl/TestKeyValuePartitionLayout.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/locations/impl/TestFileKeyValuePartitionLayout.java @@ -11,7 +11,7 @@ import io.deephaven.engine.table.impl.locations.local.KeyValuePartitionLayout.LocationTableBuilder; import io.deephaven.engine.table.impl.locations.local.LocationTableBuilderDefinition; import io.deephaven.engine.testutil.junit4.EngineCleanup; -import 
io.deephaven.engine.table.impl.locations.local.KeyValuePartitionLayout; +import io.deephaven.engine.table.impl.locations.local.FileKeyValuePartitionLayout; import io.deephaven.parquet.table.layout.LocationTableBuilderCsv; import junit.framework.TestCase; import org.junit.After; @@ -28,10 +28,10 @@ import java.util.stream.Collectors; /** - * Unit tests for {@link TestKeyValuePartitionLayout}. + * Unit tests for {@link FileKeyValuePartitionLayout}. */ @SuppressWarnings("ResultOfMethodCallIgnored") -public class TestKeyValuePartitionLayout { +public class TestFileKeyValuePartitionLayout { @Rule final public EngineCleanup framework = new EngineCleanup(); @@ -57,7 +57,7 @@ public void testFlat() throws IOException { Files.write(file2.toPath(), "Goodbye cruel world!".getBytes()); final RecordingLocationKeyFinder recorder = new RecordingLocationKeyFinder<>(); - new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory), + new FileKeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory), (path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 0).findKeys(recorder); final List results = recorder.getRecordedKeys().stream().sorted().collect(Collectors.toList()); @@ -81,7 +81,7 @@ public void testOneLevel() throws IOException { Files.write(file2.toPath(), "Goodbye cruel world!".getBytes()); final RecordingLocationKeyFinder recorder = new RecordingLocationKeyFinder<>(); - new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory), + new FileKeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory), (path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 1).findKeys(recorder); final List results = recorder.getRecordedKeys().stream().sorted().collect(Collectors.toList()); @@ -114,7 +114,7 @@ public void testThreeLevels() throws IOException { Files.write(file3.toPath(), "Oui!".getBytes()); final RecordingLocationKeyFinder recorder = new RecordingLocationKeyFinder<>(); - new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory), + new FileKeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory), (path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 3).findKeys(recorder); final List results = recorder.getRecordedKeys().stream().sorted().collect(Collectors.toList()); @@ -164,7 +164,7 @@ public void testTypesAndNameLegalization() throws IOException { ColumnDefinition.ofBoolean("C").withPartitioning(), ColumnDefinition.ofDouble("B1").withPartitioning()))); for (final Supplier locationTableBuilderSupplier : locationTableBuilderSuppliers) { - final TableLocationKeyFinder finder = new KeyValuePartitionLayout<>( + final TableLocationKeyFinder finder = new FileKeyValuePartitionLayout<>( dataDirectory, path -> true, locationTableBuilderSupplier, (path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 3); @@ -218,7 +218,7 @@ public void testMaxDepthEmpty() throws IOException { Files.write(file3.toPath(), "Oui!".getBytes()); final RecordingLocationKeyFinder recorder = new RecordingLocationKeyFinder<>(); - new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory), + new FileKeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory), (path, partitions) -> 
new FileTableLocationKey(path.toFile(), 0, partitions), 3).findKeys(recorder); final List results = recorder.getRecordedKeys().stream().sorted().collect(Collectors.toList()); @@ -246,7 +246,7 @@ public void testMaxDepth() throws IOException { Files.write(file4.toPath(), "Non!".getBytes()); final RecordingLocationKeyFinder recorder = new RecordingLocationKeyFinder<>(); - new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory), + new FileKeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory), (path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 3).findKeys(recorder); final List results = recorder.getRecordedKeys().stream().sorted().collect(Collectors.toList()); @@ -274,7 +274,8 @@ public void testMismatch() throws IOException { Files.write(file3.toPath(), "Oui!".getBytes()); try { - new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory), + new FileKeyValuePartitionLayout<>(dataDirectory, path -> true, + () -> new LocationTableBuilderCsv(dataDirectory), (path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 3).findKeys(ftlk -> { }); TestCase.fail("Expected exception"); diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java index 604444ae8be..12f7f8e9ce1 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java @@ -3,6 +3,8 @@ // package io.deephaven.parquet.base; +import org.jetbrains.annotations.NotNull; + import java.io.File; import java.nio.charset.StandardCharsets; import java.nio.file.Path; @@ -25,14 +27,6 @@ public final class ParquetUtils { */ public static final String METADATA_KEY = "deephaven"; - /** - * Used as a filter to select relevant parquet files while reading all files in a directory. - */ - public static boolean fileNameMatches(final Path path) { - final String fileName = path.getFileName().toString(); - return fileName.endsWith(PARQUET_FILE_EXTENSION) && fileName.charAt(0) != '.'; - } - /** * @return the key value derived from the file path, used for storing each file's metadata in the combined * {@value #METADATA_FILE_NAME} and {@value #COMMON_METADATA_FILE_NAME} files. @@ -40,4 +34,24 @@ public static boolean fileNameMatches(final Path path) { public static String getPerFileMetadataKey(final String filePath) { return "deephaven_per_file_" + filePath.replace(File.separatorChar, '_'); } + + /** + * Check if the provided path points to a non-hidden parquet file, and that none of its parents (till rootDir) are + * hidden. 
+ */ + public static boolean isVisibleParquetFile(@NotNull final Path rootDir, @NotNull final Path filePath) { + final String fileName = filePath.getFileName().toString(); + if (!fileName.endsWith(PARQUET_FILE_EXTENSION) || fileName.charAt(0) == '.') { + return false; + } + Path parent = filePath.getParent(); + while (parent != null && !parent.equals(rootDir)) { + final String parentName = parent.getFileName().toString(); + if (!parentName.isEmpty() && parentName.charAt(0) == '.') { + return false; + } + parent = parent.getParent(); + } + return true; + } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java index f76af02d108..cc9fbd61745 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java @@ -122,7 +122,7 @@ private ParquetTools() {} * @see ParquetFlatPartitionedLayout */ public static Table readTable(@NotNull final String source) { - return readTableInternal(convertParquetSourceToURI(source), ParquetInstructions.EMPTY); + return readTableInternal(source, ParquetInstructions.EMPTY); } /** @@ -153,7 +153,7 @@ public static Table readTable(@NotNull final String source) { public static Table readTable( @NotNull final String source, @NotNull final ParquetInstructions readInstructions) { - return readTableInternal(convertParquetSourceToURI(source), readInstructions); + return readTableInternal(source, readInstructions); } /** @@ -211,19 +211,6 @@ public static Table readTable( return readTableInternal(sourceFile, readInstructions); } - /** - * Convert a parquet source to a URI. - * - * @param source The path or URI of parquet file or directory to examine - * @return The URI - */ - private static URI convertParquetSourceToURI(@NotNull final String source) { - if (source.endsWith(".parquet")) { - return convertToURI(source, false); - } - return convertToURI(source, true); - } - /** * Write a table to a file. Data indexes to write are determined by those present on {@code sourceTable}. * @@ -1238,19 +1225,30 @@ private static Table readTableInternal( } /** - * Same as {@link #readTableInternal(File, ParquetInstructions)} but with a URI. + * Similar to {@link #readTableInternal(File, ParquetInstructions)} but with a string source. 
* - * @param source The source URI + * @param source The source path or URI * @param instructions Instructions for reading * @return A {@link Table} */ private static Table readTableInternal( - @NotNull final URI source, + @NotNull final String source, @NotNull final ParquetInstructions instructions) { - if (!FILE_URI_SCHEME.equals(source.getScheme())) { - return readSingleFileTable(source, instructions); + final boolean isDirectory = !source.endsWith(PARQUET_FILE_EXTENSION); + final URI sourceURI = convertToURI(source, isDirectory); + if (FILE_URI_SCHEME.equals(sourceURI.getScheme())) { + return readTableInternal(new File(sourceURI), instructions); + } + if (!isDirectory) { + return readSingleFileTable(sourceURI, instructions); } - return readTableInternal(new File(source), instructions); + if (source.endsWith(METADATA_FILE_NAME) || source.endsWith(COMMON_METADATA_FILE_NAME)) { + throw new UncheckedDeephavenException("We currently do not support reading parquet metadata files " + + "from non local storage"); + } + // Both flat partitioned and key-value partitioned data can be read under key-value partitioned layout + return readPartitionedTable(new ParquetKeyValuePartitionedLayout(sourceURI, MAX_PARTITIONING_LEVELS_INFERENCE, + instructions), instructions); } private static boolean ignoreDotFiles(Path path) { @@ -1384,9 +1382,10 @@ private static Pair infer( lastKey.getFileReader().getSchema(), lastKey.getMetadata().getFileMetaData().getKeyValueMetaData(), readInstructions); + final Set partitionKeys = lastKey.getPartitionKeys(); final List> allColumns = - new ArrayList<>(lastKey.getPartitionKeys().size() + schemaInfo.getFirst().size()); - for (final String partitionKey : lastKey.getPartitionKeys()) { + new ArrayList<>(partitionKeys.size() + schemaInfo.getFirst().size()); + for (final String partitionKey : partitionKeys) { final Comparable partitionValue = lastKey.getPartitionValue(partitionKey); if (partitionValue == null) { throw new IllegalArgumentException(String.format( @@ -1400,7 +1399,11 @@ private static Pair infer( allColumns.add(ColumnDefinition.fromGenericType(partitionKey, dataType, null, ColumnDefinition.ColumnType.Partitioning)); } - allColumns.addAll(schemaInfo.getFirst()); + // Only read non-partitioning columns from the parquet files + final List> columnDefinitionsFromParquetFile = schemaInfo.getFirst(); + columnDefinitionsFromParquetFile.stream() + .filter(columnDefinition -> !partitionKeys.contains(columnDefinition.getName())) + .forEach(allColumns::add); return new Pair<>(TableDefinition.of(allColumns), schemaInfo.getSecond()); } @@ -1433,7 +1436,7 @@ public static Table readPartitionedTableWithMetadata( * Callers wishing to be more explicit and skip the inference step may prefer to call * {@link #readKeyValuePartitionedTable(File, ParquetInstructions, TableDefinition)}. 
* - * @param directory the source of {@link ParquetTableLocationKey location keys} to include + * @param directory the root directory to search for .parquet files * @param readInstructions the instructions for customizations while reading * @return the table * @see ParquetKeyValuePartitionedLayout#ParquetKeyValuePartitionedLayout(File, int, ParquetInstructions) @@ -1442,16 +1445,37 @@ public static Table readPartitionedTableWithMetadata( public static Table readKeyValuePartitionedTable( @NotNull final File directory, @NotNull final ParquetInstructions readInstructions) { - return readPartitionedTable( - new ParquetKeyValuePartitionedLayout(directory, MAX_PARTITIONING_LEVELS_INFERENCE, readInstructions), - readInstructions); + return readPartitionedTable(new ParquetKeyValuePartitionedLayout(directory, MAX_PARTITIONING_LEVELS_INFERENCE, + readInstructions), readInstructions); + } + + /** + * Creates a partitioned table via the key-value partitioned parquet files from the root {@code directory}, + * inferring the table definition from those files. + * + *

+ * Callers wishing to be more explicit and skip the inference step may prefer to call + * {@link #readKeyValuePartitionedTable(String, ParquetInstructions, TableDefinition)}. + * + * @param directory the path or URI for the root directory to search for .parquet files + * @param readInstructions the instructions for customizations while reading + * @return the table + * @see ParquetKeyValuePartitionedLayout#ParquetKeyValuePartitionedLayout(URI, int, ParquetInstructions) + * @see #readPartitionedTable(TableLocationKeyFinder, ParquetInstructions) + */ + @Deprecated + public static Table readKeyValuePartitionedTable( + @NotNull final String directory, + @NotNull final ParquetInstructions readInstructions) { + return readPartitionedTable(new ParquetKeyValuePartitionedLayout(convertToURI(directory, true), + MAX_PARTITIONING_LEVELS_INFERENCE, readInstructions), readInstructions); } /** * Creates a partitioned table via the key-value partitioned parquet files from the root {@code directory} using the * provided {@code tableDefinition}. * - * @param directory the source of {@link ParquetTableLocationKey location keys} to include + * @param directory the root directory to search for .parquet files * @param readInstructions the instructions for customizations while reading * @param tableDefinition the table definition * @return the table @@ -1470,6 +1494,29 @@ public static Table readKeyValuePartitionedTable( readInstructions, tableDefinition); } + /** + * Creates a partitioned table via the key-value partitioned parquet files from the root {@code directory} using the + * provided {@code tableDefinition}. + * + * @param directory the path or URI for the root directory to search for .parquet files + * @param readInstructions the instructions for customizations while reading + * @param tableDefinition the table definition + * @return the table + * @see ParquetKeyValuePartitionedLayout#ParquetKeyValuePartitionedLayout(URI, TableDefinition, ParquetInstructions) + * @see #readPartitionedTable(TableLocationKeyFinder, ParquetInstructions, TableDefinition) + */ + @Deprecated + public static Table readKeyValuePartitionedTable( + @NotNull final String directory, + @NotNull final ParquetInstructions readInstructions, + @NotNull final TableDefinition tableDefinition) { + if (tableDefinition.getColumnStream().noneMatch(ColumnDefinition::isPartitioning)) { + throw new IllegalArgumentException("No partitioning columns"); + } + return readPartitionedTable(new ParquetKeyValuePartitionedLayout(convertToURI(directory, true), tableDefinition, + readInstructions), readInstructions, tableDefinition); + } + /** * Creates a partitioned table via the flat parquet files from the root {@code directory}, inferring the table * definition from those files. @@ -1478,7 +1525,7 @@ public static Table readKeyValuePartitionedTable( * Callers wishing to be more explicit and skip the inference step may prefer to call * {@link #readFlatPartitionedTable(File, ParquetInstructions, TableDefinition)}. 
* - * @param directory the source of {@link ParquetTableLocationKey location keys} to include + * @param directory the directory to search for .parquet files * @param readInstructions the instructions for customizations while reading * @return the table * @see #readPartitionedTable(TableLocationKeyFinder, ParquetInstructions) @@ -1490,11 +1537,33 @@ public static Table readFlatPartitionedTable( return readPartitionedTable(new ParquetFlatPartitionedLayout(directory, readInstructions), readInstructions); } + /** + * Creates a partitioned table via the flat parquet files from the root {@code directory}, inferring the table + * definition from those files. + * + *

+ * Callers wishing to be more explicit and skip the inference step may prefer to call + * {@link #readFlatPartitionedTable(String, ParquetInstructions, TableDefinition)}. + * + * @param directory the path or URI for the directory to search for .parquet files + * @param readInstructions the instructions for customizations while reading + * @return the table + * @see #readPartitionedTable(TableLocationKeyFinder, ParquetInstructions) + * @see ParquetFlatPartitionedLayout#ParquetFlatPartitionedLayout(URI, ParquetInstructions) + */ + @Deprecated + public static Table readFlatPartitionedTable( + @NotNull final String directory, + @NotNull final ParquetInstructions readInstructions) { + return readPartitionedTable(new ParquetFlatPartitionedLayout(convertToURI(directory, true), readInstructions), + readInstructions); + } + /** * Creates a partitioned table via the flat parquet files from the root {@code directory} using the provided * {@code tableDefinition}. * - * @param directory the source of {@link ParquetTableLocationKey location keys} to include + * @param directory the directory to search for .parquet files * @param readInstructions the instructions for customizations while reading * @param tableDefinition the table definition * @return the table @@ -1509,6 +1578,26 @@ public static Table readFlatPartitionedTable( tableDefinition); } + /** + * Creates a partitioned table via the flat parquet files from the root {@code directory} using the provided + * {@code tableDefinition}. + * + * @param directory the path or URI for the directory to search for .parquet files + * @param readInstructions the instructions for customizations while reading + * @param tableDefinition the table definition + * @return the table + * @see #readPartitionedTable(TableLocationKeyFinder, ParquetInstructions, TableDefinition) + * @see ParquetFlatPartitionedLayout#ParquetFlatPartitionedLayout(URI, ParquetInstructions) + */ + @Deprecated + public static Table readFlatPartitionedTable( + @NotNull final String directory, + @NotNull final ParquetInstructions readInstructions, + @NotNull final TableDefinition tableDefinition) { + return readPartitionedTable(new ParquetFlatPartitionedLayout(convertToURI(directory, true), readInstructions), + readInstructions, tableDefinition); + } + /** * Creates a single table via the parquet {@code file} using the table definition derived from that {@code file}. 
* diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/LocationTableBuilderCsv.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/LocationTableBuilderCsv.java index 051a3db9033..81d29a5a51f 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/LocationTableBuilderCsv.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/LocationTableBuilderCsv.java @@ -3,6 +3,7 @@ // package io.deephaven.parquet.table.layout; +import io.deephaven.base.FileUtils; import io.deephaven.csv.CsvTools; import io.deephaven.csv.util.CsvReaderException; import io.deephaven.engine.table.Table; @@ -14,6 +15,7 @@ import java.io.ByteArrayInputStream; import java.io.File; +import java.net.URI; import java.util.Collection; import java.util.List; import java.util.Objects; @@ -26,13 +28,17 @@ public final class LocationTableBuilderCsv implements KeyValuePartitionLayout.Lo private static final String LS = System.lineSeparator(); - private final File tableRootDirectory; + private final URI tableRootDirectory; private List partitionKeys; private StringBuilder csvBuilder; private int locationCount; public LocationTableBuilderCsv(@NotNull final File tableRootDirectory) { + this(FileUtils.convertToURI(tableRootDirectory, true)); + } + + LocationTableBuilderCsv(@NotNull final URI tableRootDirectory) { this.tableRootDirectory = tableRootDirectory; } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java index 2ac7970e9ea..31914804dc1 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java @@ -3,33 +3,40 @@ // package io.deephaven.parquet.table.layout; +import io.deephaven.base.FileUtils; import io.deephaven.engine.table.impl.locations.TableDataException; import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder; +import io.deephaven.parquet.base.ParquetUtils; import io.deephaven.parquet.table.ParquetInstructions; import io.deephaven.parquet.table.location.ParquetTableLocationKey; -import io.deephaven.parquet.base.ParquetUtils; +import io.deephaven.util.channel.SeekableChannelsProvider; +import io.deephaven.util.channel.SeekableChannelsProviderLoader; import org.jetbrains.annotations.NotNull; import java.io.File; import java.io.IOException; -import java.nio.file.DirectoryStream; -import java.nio.file.Files; -import java.nio.file.Path; +import java.net.URI; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME; /** * Parquet {@link TableLocationKeyFinder location finder} that will discover multiple files in a single directory. 
*/ public final class ParquetFlatPartitionedLayout implements TableLocationKeyFinder { - private static ParquetTableLocationKey locationKey(Path path, @NotNull final ParquetInstructions readInstructions) { - return new ParquetTableLocationKey(path.toFile(), 0, null, readInstructions); + private static ParquetTableLocationKey locationKey(@NotNull final URI uri, + @NotNull final ParquetInstructions readInstructions) { + return new ParquetTableLocationKey(uri, 0, null, readInstructions); } - private final File tableRootDirectory; - private final Map cache; + private final URI tableRootDirectory; + private final Map cache; private final ParquetInstructions readInstructions; /** @@ -38,8 +45,17 @@ private static ParquetTableLocationKey locationKey(Path path, @NotNull final Par */ public ParquetFlatPartitionedLayout(@NotNull final File tableRootDirectory, @NotNull final ParquetInstructions readInstructions) { - this.tableRootDirectory = tableRootDirectory; - this.cache = new HashMap<>(); + this(FileUtils.convertToURI(tableRootDirectory, true), readInstructions); + } + + /** + * @param tableRootDirectoryURI The directory URI to search for .parquet files. + * @param readInstructions the instructions for customizations while reading + */ + public ParquetFlatPartitionedLayout(@NotNull final URI tableRootDirectoryURI, + @NotNull final ParquetInstructions readInstructions) { + this.tableRootDirectory = tableRootDirectoryURI; + this.cache = Collections.synchronizedMap(new HashMap<>()); this.readInstructions = readInstructions; } @@ -48,20 +64,33 @@ public String toString() { } @Override - public synchronized void findKeys(@NotNull final Consumer locationKeyObserver) { - try (final DirectoryStream parquetFileStream = - Files.newDirectoryStream(tableRootDirectory.toPath(), ParquetUtils::fileNameMatches)) { - for (final Path parquetFilePath : parquetFileStream) { - ParquetTableLocationKey locationKey = cache.get(parquetFilePath); - if (locationKey == null) { - locationKey = locationKey(parquetFilePath, readInstructions); - if (!locationKey.verifyFileReader()) { - continue; + public void findKeys(@NotNull final Consumer locationKeyObserver) { + final Predicate uriFilter; + if (FILE_URI_SCHEME.equals(tableRootDirectory.getScheme())) { + uriFilter = uri -> { + final String filename = new File(uri).getName(); + return filename.endsWith(ParquetUtils.PARQUET_FILE_EXTENSION) && filename.charAt(0) != '.'; + }; + } else { + uriFilter = uri -> uri.getPath().endsWith(ParquetUtils.PARQUET_FILE_EXTENSION); + } + try (final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader( + tableRootDirectory, readInstructions.getSpecialInstructions()); + final Stream stream = provider.list(tableRootDirectory)) { + stream.filter(uriFilter).forEach(uri -> { + cache.compute(uri, (key, existingLocationKey) -> { + if (existingLocationKey != null) { + locationKeyObserver.accept(existingLocationKey); + return existingLocationKey; + } + final ParquetTableLocationKey newLocationKey = locationKey(uri, readInstructions); + if (!newLocationKey.verifyFileReader()) { + return null; } - cache.put(parquetFilePath, locationKey); - } - locationKeyObserver.accept(locationKey); - } + locationKeyObserver.accept(newLocationKey); + return newLocationKey; + }); + }); } catch (final IOException e) { throw new TableDataException("Error finding parquet locations under " + tableRootDirectory, e); } diff --git 
a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java index edebbbcc195..4836a176066 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java @@ -3,45 +3,105 @@ // package io.deephaven.parquet.table.layout; +import io.deephaven.api.util.NameValidator; import io.deephaven.csv.CsvTools; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.TableDefinition; -import io.deephaven.engine.table.impl.locations.local.KeyValuePartitionLayout; +import io.deephaven.engine.table.impl.locations.TableDataException; +import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder; import io.deephaven.engine.table.impl.locations.local.LocationTableBuilderDefinition; +import io.deephaven.engine.table.impl.locations.local.URIStreamKeyValuePartitionLayout; +import io.deephaven.engine.table.impl.locations.local.KeyValuePartitionLayout; +import io.deephaven.parquet.base.ParquetUtils; import io.deephaven.parquet.table.ParquetInstructions; import io.deephaven.parquet.table.location.ParquetTableLocationKey; -import io.deephaven.parquet.base.ParquetUtils; +import io.deephaven.util.channel.SeekableChannelsProvider; +import io.deephaven.util.channel.SeekableChannelsProviderLoader; import org.jetbrains.annotations.NotNull; import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Path; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import static io.deephaven.base.FileUtils.convertToURI; +import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME; +import static io.deephaven.parquet.base.ParquetUtils.isVisibleParquetFile; /** * {@link KeyValuePartitionLayout} for Parquet data. * - * @implNote Type inference uses {@link CsvTools#readCsv(java.io.InputStream)} as a conversion tool, and hence follows - * the same rules. + * @implNote + *

+ *          <ul>
+ *          <li>Unless a {@link TableDefinition} is provided, type inference for partitioning column uses + * {@link CsvTools#readCsv(java.io.InputStream) CsvTools.readCsv} as a conversion tool, and hence follows the + * same rules.</li>
+ *          <li>Column names will be legalized via {@link NameValidator#legalizeColumnName(String, Set) + * NameValidator.legalizeColumnName}.</li>
+ *          </ul>
*/ -public class ParquetKeyValuePartitionedLayout extends KeyValuePartitionLayout { +public class ParquetKeyValuePartitionedLayout + extends URIStreamKeyValuePartitionLayout + implements TableLocationKeyFinder { + + private final ParquetInstructions readInstructions; public ParquetKeyValuePartitionedLayout( @NotNull final File tableRootDirectory, @NotNull final TableDefinition tableDefinition, @NotNull final ParquetInstructions readInstructions) { + this(convertToURI(tableRootDirectory, true), tableDefinition, readInstructions); + } + + public ParquetKeyValuePartitionedLayout( + @NotNull final URI tableRootDirectory, + @NotNull final TableDefinition tableDefinition, + @NotNull final ParquetInstructions readInstructions) { super(tableRootDirectory, - ParquetUtils::fileNameMatches, () -> new LocationTableBuilderDefinition(tableDefinition), - (path, partitions) -> new ParquetTableLocationKey(path.toFile(), 0, partitions, readInstructions), + (uri, partitions) -> new ParquetTableLocationKey(uri, 0, partitions, readInstructions), Math.toIntExact(tableDefinition.getColumnStream().filter(ColumnDefinition::isPartitioning).count())); + this.readInstructions = readInstructions; } public ParquetKeyValuePartitionedLayout( @NotNull final File tableRootDirectory, final int maxPartitioningLevels, @NotNull final ParquetInstructions readInstructions) { + this(convertToURI(tableRootDirectory, true), maxPartitioningLevels, readInstructions); + } + + public ParquetKeyValuePartitionedLayout( + @NotNull final URI tableRootDirectory, + final int maxPartitioningLevels, + @NotNull final ParquetInstructions readInstructions) { super(tableRootDirectory, - ParquetUtils::fileNameMatches, () -> new LocationTableBuilderCsv(tableRootDirectory), - (path, partitions) -> new ParquetTableLocationKey(path.toFile(), 0, partitions, readInstructions), + (uri, partitions) -> new ParquetTableLocationKey(uri, 0, partitions, readInstructions), maxPartitioningLevels); + this.readInstructions = readInstructions; + } + + @Override + public final void findKeys(@NotNull final Consumer locationKeyObserver) { + final Predicate uriFilter; + if (FILE_URI_SCHEME.equals(tableRootDirectory.getScheme())) { + final Path rootDir = Path.of(tableRootDirectory); + uriFilter = uri -> isVisibleParquetFile(rootDir, Path.of(uri)); + } else { + uriFilter = uri -> uri.getPath().endsWith(ParquetUtils.PARQUET_FILE_EXTENSION); + } + try (final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader( + tableRootDirectory, readInstructions.getSpecialInstructions()); + final Stream uriStream = provider.walk(tableRootDirectory)) { + final Stream filteredStream = uriStream.filter(uriFilter); + findKeys(filteredStream, locationKeyObserver); + } catch (final IOException e) { + throw new TableDataException("Error finding parquet locations under " + tableRootDirectory, e); + } } } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 3f246620544..8483818dc87 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -125,6 +125,7 @@ public final class ParquetTableReadWriteTest { private static final ParquetInstructions REFRESHING = ParquetInstructions.builder().setIsRefreshing(true).build(); // 
TODO(deephaven-core#5064): Add support for local S3 testing + // The following tests are disabled by default, as they are verifying against a remote system private static final boolean ENABLE_S3_TESTING = Configuration.getInstance().getBooleanWithDefault("ParquetTest.enableS3Testing", false); @@ -1295,7 +1296,7 @@ public void testArrayColumns() { } @Test - public void readSampleParquetFilesFromS3Test1() { + public void readSampleParquetFilesFromDeephavenS3Bucket() { Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING); final S3Instructions s3Instructions = S3Instructions.builder() .regionName("us-east-1") @@ -1333,7 +1334,7 @@ public void readSampleParquetFilesFromS3Test1() { } @Test - public void readSampleParquetFilesFromS3Test2() { + public void readSampleParquetFilesFromPublicS3() { Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING); final S3Instructions s3Instructions = S3Instructions.builder() .regionName("us-east-2") @@ -1343,6 +1344,7 @@ public void readSampleParquetFilesFromS3Test2() { .maxCacheSize(32) .connectionTimeout(Duration.ofSeconds(1)) .readTimeout(Duration.ofSeconds(60)) + .credentials(Credentials.anonymous()) .build(); final ParquetInstructions readInstructions = new ParquetInstructions.Builder() .setSpecialInstructions(s3Instructions) @@ -1365,11 +1367,107 @@ public void readSampleParquetFilesFromS3Test2() { ParquetTools.readSingleFileTable( "s3://aws-public-blockchain/v1.0/btc/transactions/date=2009-01-03/part-00000-bdd84ab2-82e9-4a79-8212-7accd76815e8-c000.snappy.parquet", - readInstructions, tableDefinition).select(); + readInstructions, tableDefinition).head(10).select(); ParquetTools.readSingleFileTable( "s3://aws-public-blockchain/v1.0/btc/transactions/date=2023-11-13/part-00000-da3a3c27-700d-496d-9c41-81281388eca8-c000.snappy.parquet", - readInstructions, tableDefinition).select(); + readInstructions, tableDefinition).head(10).select(); + } + + @Test + public void readFlatPartitionedParquetFromS3() { + Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING); + final S3Instructions s3Instructions = S3Instructions.builder() + .regionName("us-east-1") + .readAheadCount(1) + .fragmentSize(5 * 1024 * 1024) + .maxConcurrentRequests(50) + .maxCacheSize(32) + .readTimeout(Duration.ofSeconds(60)) + .credentials(Credentials.defaultCredentials()) + .build(); + final ParquetInstructions readInstructions = new ParquetInstructions.Builder() + .setSpecialInstructions(s3Instructions) + .build(); + final Table table = ParquetTools.readFlatPartitionedTable("s3://dh-s3-parquet-test1/flatPartitionedParquet/", + readInstructions); + final Table expected = emptyTable(30).update("A = (int)i % 10"); + assertTableEquals(expected, table); + } + + @Test + public void readFlatPartitionedDataAsKeyValuePartitionedParquetFromS3() { + Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING); + final S3Instructions s3Instructions = S3Instructions.builder() + .regionName("us-east-1") + .readAheadCount(1) + .fragmentSize(5 * 1024 * 1024) + .maxConcurrentRequests(50) + .maxCacheSize(32) + .readTimeout(Duration.ofSeconds(60)) + .credentials(Credentials.defaultCredentials()) + .build(); + final ParquetInstructions readInstructions = new ParquetInstructions.Builder() + .setSpecialInstructions(s3Instructions) + .build(); + final Table table = + ParquetTools.readKeyValuePartitionedTable("s3://dh-s3-parquet-test1/flatPartitionedParquet3/", + readInstructions); + final Table expected = 
emptyTable(30).update("A = (int)i % 10"); + assertTableEquals(expected, table); + } + + @Test + public void readKeyValuePartitionedParquetFromS3() { + Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING); + final S3Instructions s3Instructions = S3Instructions.builder() + .regionName("us-east-1") + .readAheadCount(1) + .fragmentSize(5 * 1024 * 1024) + .maxConcurrentRequests(50) + .maxCacheSize(32) + .readTimeout(Duration.ofSeconds(60)) + .credentials(Credentials.defaultCredentials()) + .build(); + final ParquetInstructions readInstructions = new ParquetInstructions.Builder() + .setSpecialInstructions(s3Instructions) + .build(); + final Table table = + ParquetTools.readKeyValuePartitionedTable("s3://dh-s3-parquet-test1/KeyValuePartitionedData/", + readInstructions); + final List> partitioningColumns = table.getDefinition().getPartitioningColumns(); + assertEquals(3, partitioningColumns.size()); + assertEquals("PC1", partitioningColumns.get(0).getName()); + assertEquals("PC2", partitioningColumns.get(1).getName()); + assertEquals("PC3", partitioningColumns.get(2).getName()); + assertEquals(100, table.size()); + assertEquals(3, table.selectDistinct("PC1").size()); + assertEquals(2, table.selectDistinct("PC2").size()); + assertEquals(2, table.selectDistinct("PC3").size()); + assertEquals(100, table.selectDistinct("I").size()); + assertEquals(1, table.selectDistinct("J").size()); + } + + @Test + public void readKeyValuePartitionedParquetFromPublicS3() { + Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING); + final S3Instructions s3Instructions = S3Instructions.builder() + .regionName("us-east-1") + .readAheadCount(1) + .fragmentSize(5 * 1024 * 1024) + .maxConcurrentRequests(50) + .maxCacheSize(32) + .readTimeout(Duration.ofSeconds(60)) + .credentials(Credentials.anonymous()) + .build(); + final ParquetInstructions readInstructions = new ParquetInstructions.Builder() + .setSpecialInstructions(s3Instructions) + .build(); + final TableDefinition ookla_table_definition = TableDefinition.of( + ColumnDefinition.ofInt("quarter").withPartitioning(), + ColumnDefinition.ofString("quadkey")); + ParquetTools.readKeyValuePartitionedTable("s3://ookla-open-data/parquet/performance/type=mobile/year=2023", + readInstructions, ookla_table_definition).head(10).select(); } @Test @@ -1705,6 +1803,16 @@ public void basicWriteAndReadFromFileURITests() { } catch (final RuntimeException e) { assertTrue(e instanceof UnsupportedOperationException); } + + // Read from absolute path with additional "/" in the path + final String additionalSlashPath = rootFile.getAbsolutePath() + "/////" + filename; + final Table fromDisk5 = ParquetTools.readTable(additionalSlashPath); + assertTableEquals(tableToSave, fromDisk5); + + // Read from URI with additional "/" in the path + final String additionalSlashURI = "file:////" + additionalSlashPath; + final Table fromDisk6 = ParquetTools.readTable(additionalSlashURI); + assertTableEquals(tableToSave, fromDisk6); } /** @@ -1986,6 +2094,31 @@ public void partitionedParquetWithDotFilesTest() throws IOException { assertTableEquals(fromDisk, partitionedTable); } + @Test + public void partitionedParquetWithDuplicateDataTest() throws IOException { + // Create an empty parent directory + final File parentDir = new File(rootFile, "tempDir"); + parentDir.mkdir(); + assertTrue(parentDir.exists() && parentDir.isDirectory() && parentDir.list().length == 0); + + // Writing the partitioning column "X" in the file itself + final Table 
firstTable = TableTools.emptyTable(5).update("X='A'", "Y=(int)i"); + final File firstPartition = new File(parentDir, "X=A"); + final File firstDataFile = new File(firstPartition, "data.parquet"); + + final File secondPartition = new File(parentDir, "X=B"); + final File secondDataFile = new File(secondPartition, "data.parquet"); + final Table secondTable = TableTools.emptyTable(5).update("X='B'", "Y=(int)i"); + + writeTable(firstTable, firstDataFile); + writeTable(secondTable, secondDataFile); + + final Table partitionedTable = readKeyValuePartitionedTable(parentDir, EMPTY).select(); + + final Table combinedTable = merge(firstTable, secondTable); + assertTableEquals(partitionedTable, combinedTable); + } + /** * These are tests for writing multiple parquet tables with indexes. */ diff --git a/extensions/s3/build.gradle b/extensions/s3/build.gradle index 0c162fc1f70..d96e2cf4713 100644 --- a/extensions/s3/build.gradle +++ b/extensions/s3/build.gradle @@ -11,11 +11,11 @@ dependencies { implementation project(':Base') implementation project(':Util') implementation project(':Configuration') + implementation project(':log-factory') implementation platform('software.amazon.awssdk:bom:2.23.19') implementation 'software.amazon.awssdk:s3' implementation 'software.amazon.awssdk:aws-crt-client' - //implementation 'software.amazon.awssdk:netty-nio-client' compileOnly depAnnotations diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java index ed41b56d35f..a216d480eaa 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java @@ -4,11 +4,12 @@ package io.deephaven.extensions.s3; import io.deephaven.base.reference.PooledObjectReference; +import io.deephaven.internal.log.LoggerFactory; +import io.deephaven.io.logger.Logger; import io.deephaven.util.channel.SeekableChannelContext; +import org.jetbrains.annotations.NotNull; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -23,7 +24,6 @@ import java.time.Duration; import java.time.Instant; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -32,20 +32,26 @@ import java.util.function.BiConsumer; /** - * Context object used to store read-ahead buffers for efficiently reading from S3. + * Context object used to store read-ahead buffers for efficiently reading from S3. A single context object can only be + * associated with a single URI at a time. */ final class S3ChannelContext implements SeekableChannelContext { private static final Logger log = LoggerFactory.getLogger(S3ChannelContext.class); - private static final long UNINITIALIZED_SIZE = -1; + static final long UNINITIALIZED_SIZE = -1; + private static final long UNINITIALIZED_NUM_FRAGMENTS = -1; private final S3AsyncClient client; - final S3Instructions instructions; + private final S3Instructions instructions; private final BufferPool bufferPool; + /** + * The URI associated with this context. A single context object can only be associated with a single URI at a time. 
+ * But it can be re-associated with a different URI after {@link #reset() resetting}. + */ private S3Uri uri; /** - * Used to cache recently fetched fragments for faster lookup + * Used to cache recently fetched fragments from the {@link #uri} for faster lookup. */ private final Request[] requests; @@ -64,19 +70,19 @@ final class S3ChannelContext implements SeekableChannelContext { this.instructions = Objects.requireNonNull(instructions); this.bufferPool = Objects.requireNonNull(bufferPool); requests = new Request[instructions.maxCacheSize()]; + uri = null; size = UNINITIALIZED_SIZE; + numFragments = UNINITIALIZED_NUM_FRAGMENTS; if (log.isDebugEnabled()) { - log.debug("creating context: {}", ctxStr()); + log.debug().append("Creating context: ").append(ctxStr()).endl(); } } - void verifyOrSetUri(S3Uri uri) { - if (this.uri == null) { - this.uri = Objects.requireNonNull(uri); - } else if (!this.uri.equals(uri)) { - throw new IllegalStateException( - String.format("Inconsistent URIs. expected=%s, actual=%s, ctx=%s", this.uri, uri, ctxStr())); + void setURI(@NotNull final S3Uri uri) { + if (!uri.equals(this.uri)) { + reset(); } + this.uri = uri; } void verifyOrSetSize(long size) { @@ -88,12 +94,12 @@ void verifyOrSetSize(long size) { } } - public long size() throws IOException { + long size() throws IOException { ensureSize(); return size; } - public int fill(final long position, ByteBuffer dest) throws IOException { + int fill(final long position, ByteBuffer dest) throws IOException { final int destRemaining = dest.remaining(); if (destRemaining == 0) { return 0; @@ -129,12 +135,23 @@ public int fill(final long position, ByteBuffer dest) throws IOException { return filled; } + private void reset() { + // Cancel all outstanding requests + close(); + // Reset the internal state + uri = null; + size = UNINITIALIZED_SIZE; + numFragments = UNINITIALIZED_NUM_FRAGMENTS; + } + + /** + * Close the context, cancelling all outstanding requests and releasing all resources associated with it. + */ @Override public void close() { if (log.isDebugEnabled()) { - log.debug("closing context: {}", ctxStr()); + log.debug().append("Closing context: ").append(ctxStr()).endl(); } - // Cancel all outstanding requests for (int i = 0; i < requests.length; i++) { if (requests[i] != null) { requests[i].release(); @@ -209,17 +226,17 @@ private Request(long fragmentIndex) { void init() { if (log.isDebugEnabled()) { - log.debug("send: {}", requestStr()); + log.debug().append("Sending: ").append(requestStr()).endl(); } consumerFuture = client.getObject(getObjectRequest(), this); consumerFuture.whenComplete(this); } - public boolean isDone() { + boolean isDone() { return consumerFuture.isDone(); } - public int fill(long localPosition, ByteBuffer dest) throws IOException { + int fill(long localPosition, ByteBuffer dest) throws IOException { final int resultOffset = (int) (localPosition - from); final int resultLength = Math.min((int) (to - localPosition + 1), dest.remaining()); if (!bufferReference.acquireIfAvailable()) { @@ -230,7 +247,7 @@ public int fill(long localPosition, ByteBuffer dest) throws IOException { try { fullFragment = getFullFragment(); } catch (final InterruptedException | ExecutionException | TimeoutException | CancellationException e) { - throw handleS3Exception(e, String.format("fetching fragment %s", requestStr())); + throw handleS3Exception(e, String.format("fetching fragment %s", requestStr()), instructions); } // fullFragment has limit == capacity. 
This lets us have safety around math and ability to simply // clear to reset. @@ -249,12 +266,17 @@ public int fill(long localPosition, ByteBuffer dest) throws IOException { return resultLength; } - public void release() { + private void release() { final boolean didCancel = consumerFuture.cancel(true); bufferReference.clear(); if (log.isDebugEnabled()) { final String cancelType = didCancel ? "fast" : (fillCount == 0 ? "unused" : "normal"); - log.debug("cancel {}: {} fillCount={}, fillBytes={}", cancelType, requestStr(), fillCount, fillBytes); + log.debug() + .append("cancel ").append(cancelType) + .append(": ") + .append(requestStr()) + .append(" fillCount=").append(fillCount) + .append(" fillBytes=").append(fillBytes).endl(); } } @@ -265,9 +287,11 @@ public void accept(ByteBuffer byteBuffer, Throwable throwable) { if (log.isDebugEnabled()) { final Instant completedAt = Instant.now(); if (byteBuffer != null) { - log.debug("send complete: {} {}", requestStr(), Duration.between(createdAt, completedAt)); + log.debug().append("Send complete: ").append(requestStr()).append(' ') + .append(Duration.between(createdAt, completedAt).toString()).endl(); } else { - log.debug("send error: {} {}", requestStr(), Duration.between(createdAt, completedAt)); + log.debug().append("Send error: ").append(requestStr()).append(' ') + .append(Duration.between(createdAt, completedAt).toString()).endl(); } } } @@ -334,7 +358,7 @@ private String requestStr() { // -------------------------------------------------------------------------------------------------- - final class Sub implements Subscriber { + private final class Sub implements Subscriber { private final CompletableFuture localProducer; // Access to this view must be guarded by bufferReference.acquire private ByteBuffer bufferView; @@ -422,7 +446,8 @@ public void onComplete() { } } - private IOException handleS3Exception(final Exception e, final String operationDescription) { + static IOException handleS3Exception(final Exception e, final String operationDescription, + final S3Instructions instructions) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); return new IOException(String.format("Thread interrupted while %s", operationDescription), e); @@ -446,7 +471,7 @@ private void ensureSize() throws IOException { return; } if (log.isDebugEnabled()) { - log.debug("head: {}", ctxStr()); + log.debug().append("Head: ").append(ctxStr()).endl(); } // Fetch the size of the file on the first read using a blocking HEAD request, and store it in the context // for future use @@ -459,7 +484,7 @@ private void ensureSize() throws IOException { .build()) .get(instructions.readTimeout().toNanos(), TimeUnit.NANOSECONDS); } catch (final InterruptedException | ExecutionException | TimeoutException | CancellationException e) { - throw handleS3Exception(e, String.format("fetching HEAD for file %s, %s", uri, ctxStr())); + throw handleS3Exception(e, String.format("fetching HEAD for file %s, %s", uri, ctxStr()), instructions); } setSize(headObjectResponse.contentLength()); } diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java index 9906cac288b..36b1d7cd0b7 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java @@ -3,6 +3,8 @@ // package io.deephaven.extensions.s3; +import io.deephaven.base.log.LogOutput; +import 
io.deephaven.base.log.LogOutputAppendable; import io.deephaven.configuration.Configuration; import org.immutables.value.Value; import org.immutables.value.Value.Check; @@ -26,7 +28,7 @@ strictBuilder = true, weakInterning = true, jdkOnly = true) -public abstract class S3Instructions { +public abstract class S3Instructions implements LogOutputAppendable { private final static int DEFAULT_MAX_CONCURRENT_REQUESTS = 50; private final static int DEFAULT_READ_AHEAD_COUNT = 1; @@ -117,6 +119,11 @@ public Credentials credentials() { return Credentials.defaultCredentials(); } + @Override + public LogOutput append(final LogOutput logOutput) { + return logOutput.append(toString()); + } + /** * The endpoint to connect to. Callers connecting to AWS do not typically need to set this; it is most useful when * connecting to non-AWS, S3-compatible APIs. diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java index 279117d8dec..3579961b3a3 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java @@ -17,6 +17,8 @@ import java.nio.channels.SeekableByteChannel; import java.util.Objects; +import static io.deephaven.extensions.s3.S3ChannelContext.UNINITIALIZED_SIZE; + /** * {@link SeekableByteChannel} class used to fetch objects from S3 buckets using an async client with the ability to @@ -24,8 +26,8 @@ */ final class S3SeekableByteChannel implements SeekableByteChannel, CachedChannelProvider.ContextHolder { - private static final long UNINITIALIZED_SIZE = -1; private static final long CLOSED_SENTINEL = -1; + private static final int INIT_POSITION = 0; private final S3Uri uri; @@ -38,9 +40,10 @@ final class S3SeekableByteChannel implements SeekableByteChannel, CachedChannelP private long position; private long size; - S3SeekableByteChannel(S3Uri uri) { + S3SeekableByteChannel(final S3Uri uri) { this.uri = Objects.requireNonNull(uri); this.size = UNINITIALIZED_SIZE; + this.position = INIT_POSITION; } /** @@ -56,7 +59,7 @@ public void setContext(@Nullable final SeekableChannelContext channelContext) { } this.context = (S3ChannelContext) channelContext; if (this.context != null) { - this.context.verifyOrSetUri(uri); + this.context.setURI(uri); if (size != UNINITIALIZED_SIZE) { context.verifyOrSetSize(size); } diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java index 18fe36761fe..bd62aefc062 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java @@ -3,6 +3,10 @@ // package io.deephaven.extensions.s3; +import io.deephaven.UncheckedDeephavenException; +import io.deephaven.base.verify.Assert; +import io.deephaven.internal.log.LoggerFactory; +import io.deephaven.io.logger.Logger; import io.deephaven.util.channel.Channels; import io.deephaven.util.channel.SeekableChannelContext; import io.deephaven.util.channel.SeekableChannelsProvider; @@ -14,11 +18,29 @@ import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.S3Uri; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import 
software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.net.URISyntaxException; import java.nio.channels.SeekableByteChannel; import java.nio.file.Path; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static io.deephaven.base.FileUtils.DUPLICATE_SLASH_PATTERN; +import static io.deephaven.extensions.s3.S3ChannelContext.handleS3Exception; +import static io.deephaven.extensions.s3.S3SeekableChannelProviderPlugin.S3_URI_SCHEME; /** * {@link SeekableChannelsProvider} implementation that is used to fetch objects from an S3-compatible API. @@ -31,6 +53,10 @@ final class S3SeekableChannelProvider implements SeekableChannelsProvider { */ private static final BufferPool BUFFER_POOL = new BufferPool(S3Instructions.MAX_FRAGMENT_SIZE); + private static final int MAX_KEYS_PER_BATCH = 1000; + + private static final Logger log = LoggerFactory.getLogger(S3SeekableChannelProvider.class); + private final S3AsyncClient s3AsyncClient; private final S3Instructions s3Instructions; @@ -60,6 +86,9 @@ private static S3AsyncClient buildClient(@NotNull S3Instructions s3Instructions) .region(Region.of(s3Instructions.regionName())) .credentialsProvider(s3Instructions.awsV2CredentialsProvider()); s3Instructions.endpointOverride().ifPresent(builder::endpointOverride); + if (log.isDebugEnabled()) { + log.debug().append("Building client with instructions: ").append(s3Instructions).endl(); + } return builder.build(); } @@ -97,6 +126,108 @@ public SeekableByteChannel getWriteChannel(@NotNull final Path path, final boole throw new UnsupportedOperationException("Writing to S3 is currently unsupported"); } + @Override + public Stream list(@NotNull final URI directory) { + if (log.isDebugEnabled()) { + log.debug().append("Fetching child URIs for directory: ").append(directory.toString()).endl(); + } + return createStream(directory, false); + } + + @Override + public Stream walk(@NotNull final URI directory) { + if (log.isDebugEnabled()) { + log.debug().append("Performing recursive traversal from directory: ").append(directory.toString()).endl(); + } + return createStream(directory, true); + } + + private Stream createStream(@NotNull final URI directory, final boolean isRecursive) { + // The following iterator fetches URIs from S3 in batches and creates a stream + final Iterator iterator = new Iterator<>() { + private final String bucketName; + private final String directoryKey; + + private Iterator currentBatchIt; + private String continuationToken; + + { + final S3Uri s3DirectoryURI = s3AsyncClient.utilities().parseUri(directory); + bucketName = s3DirectoryURI.bucket().orElseThrow(); + directoryKey = s3DirectoryURI.key().orElseThrow(); + } + + @Override + public boolean hasNext() { + if (currentBatchIt != null) { + if (currentBatchIt.hasNext()) { + return true; + } + // End of current batch + if (continuationToken == null) { + // End of the directory + return false; + } + } + try { + fetchNextBatch(); + } catch (final IOException e) { + throw new UncheckedDeephavenException("Failed to fetch next batch of URIs from S3", e); + } + Assert.neqNull(currentBatchIt, "currentBatch"); + 
return currentBatchIt.hasNext(); + } + + @Override + public URI next() { + if (!hasNext()) { + throw new NoSuchElementException("No more URIs available in the directory"); + } + return currentBatchIt.next(); + } + + private void fetchNextBatch() throws IOException { + final ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder() + .bucket(bucketName) + .prefix(directoryKey) + .maxKeys(MAX_KEYS_PER_BATCH); + if (!isRecursive) { + // Add a delimiter to the request if we don't want to fetch all files recursively + requestBuilder.delimiter("/"); + } + final long readTimeoutNanos = s3Instructions.readTimeout().toNanos(); + final ListObjectsV2Request request = requestBuilder.continuationToken(continuationToken).build(); + final ListObjectsV2Response response; + try { + response = s3AsyncClient.listObjectsV2(request).get(readTimeoutNanos, TimeUnit.NANOSECONDS); + } catch (final InterruptedException | ExecutionException | TimeoutException | CancellationException e) { + throw handleS3Exception(e, String.format("fetching list of files in directory %s", directory), + s3Instructions); + } + currentBatchIt = response.contents().stream() + .filter(s3Object -> !s3Object.key().equals(directoryKey)) + .map(s3Object -> { + String path = "/" + s3Object.key(); + if (path.contains("//")) { + path = DUPLICATE_SLASH_PATTERN.matcher(path).replaceAll("/"); + } + try { + return new URI(S3_URI_SCHEME, directory.getUserInfo(), directory.getHost(), + directory.getPort(), path, null, null); + } catch (final URISyntaxException e) { + throw new UncheckedDeephavenException("Failed to create URI for S3 object with key: " + + s3Object.key() + " and bucket " + bucketName + " inside directory " + + directory, e); + } + }).iterator(); + // The following token is null when the last batch is fetched. 
+ continuationToken = response.nextContinuationToken(); + } + }; + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, + Spliterator.ORDERED | Spliterator.DISTINCT | Spliterator.NONNULL), false); + } + @Override public void close() { s3AsyncClient.close(); diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProviderPlugin.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProviderPlugin.java index 28eb3c1cc5e..5728d79fe74 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProviderPlugin.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProviderPlugin.java @@ -17,7 +17,7 @@ @AutoService(SeekableChannelsProviderPlugin.class) public final class S3SeekableChannelProviderPlugin implements SeekableChannelsProviderPlugin { - private static final String S3_URI_SCHEME = "s3"; + static final String S3_URI_SCHEME = "s3"; @Override public boolean isCompatible(@NotNull final URI uri, @Nullable final Object config) { @@ -27,12 +27,11 @@ public boolean isCompatible(@NotNull final URI uri, @Nullable final Object confi @Override public SeekableChannelsProvider createProvider(@NotNull final URI uri, @Nullable final Object config) { if (!isCompatible(uri, config)) { - if (!(config instanceof S3Instructions)) { - throw new IllegalArgumentException("Must provide S3Instructions to read files from S3"); - } throw new IllegalArgumentException("Arguments not compatible, provided uri " + uri); } - final S3Instructions s3Instructions = (S3Instructions) config; - return new S3SeekableChannelProvider(s3Instructions); + if (!(config instanceof S3Instructions)) { + throw new IllegalArgumentException("Must provide S3Instructions to read files from S3"); + } + return new S3SeekableChannelProvider((S3Instructions) config); } } diff --git a/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java index 77a68e8cbc9..dc032f4ba05 100644 --- a/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java +++ b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java @@ -3,6 +3,7 @@ // package io.deephaven.extensions.trackedfile; +import io.deephaven.base.FileUtils; import io.deephaven.base.verify.Assert; import io.deephaven.engine.util.file.FileHandle; import io.deephaven.engine.util.file.FileHandleFactory; @@ -20,8 +21,10 @@ import java.io.InputStream; import java.net.URI; import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; import java.nio.file.Path; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.stream.Stream; import static io.deephaven.extensions.trackedfile.TrackedSeekableChannelsProviderPlugin.FILE_URI_SCHEME; @@ -71,6 +74,20 @@ public SeekableByteChannel getWriteChannel(@NotNull final Path filePath, final b : new TruncateOnceFileCreator(fileHandleFactory), filePath.toFile()); } + @Override + public Stream list(@NotNull final URI directory) throws IOException { + // Assuming that the URI is a file, not a directory. The caller should manage file vs. directory handling in + // the processor. 
+ return Files.list(Path.of(directory)).map(path -> FileUtils.convertToURI(path, false)); + } + + @Override + public Stream walk(@NotNull final URI directory) throws IOException { + // Assuming that the URI is a file, not a directory. The caller should manage file vs. directory handling in + // the processor. + return Files.walk(Path.of(directory)).map(path -> FileUtils.convertToURI(path, false)); + } + private static final class TruncateOnceFileCreator implements FileHandleFactory.FileToHandleFunction { private static final AtomicIntegerFieldUpdater FIRST_TIME_UPDATER = diff --git a/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProviderPlugin.java b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProviderPlugin.java index 74a751c9947..f8098f4c717 100644 --- a/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProviderPlugin.java +++ b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProviderPlugin.java @@ -28,11 +28,11 @@ public boolean isCompatible(@NotNull final URI uri, @Nullable final Object objec @Override public SeekableChannelsProvider createProvider(@NotNull final URI uri, @Nullable final Object object) { if (!isCompatible(uri, object)) { - if (object != null) { - throw new IllegalArgumentException("Arguments not compatible, provided non null object"); - } throw new IllegalArgumentException("Arguments not compatible, provided uri " + uri); } + if (object != null) { + throw new IllegalArgumentException("Arguments not compatible, provided non null object"); + } return new TrackedSeekableChannelsProvider(TrackedFileHandleFactory.getInstance()); } } diff --git a/py/server/deephaven/parquet.py b/py/server/deephaven/parquet.py index ea6b36504a4..dcf89f10d1d 100644 --- a/py/server/deephaven/parquet.py +++ b/py/server/deephaven/parquet.py @@ -191,9 +191,9 @@ def read( if file_layout == ParquetFileLayout.SINGLE_FILE: j_table = _JParquetTools.readSingleFileTable(path, read_instructions, j_table_definition) elif file_layout == ParquetFileLayout.FLAT_PARTITIONED: - j_table = _JParquetTools.readFlatPartitionedTable(_JFile(path), read_instructions, j_table_definition) + j_table = _JParquetTools.readFlatPartitionedTable(path, read_instructions, j_table_definition) elif file_layout == ParquetFileLayout.KV_PARTITIONED: - j_table = _JParquetTools.readKeyValuePartitionedTable(_JFile(path), read_instructions, j_table_definition) + j_table = _JParquetTools.readKeyValuePartitionedTable(path, read_instructions, j_table_definition) elif file_layout == ParquetFileLayout.METADATA_PARTITIONED: raise DHError(f"file_layout={ParquetFileLayout.METADATA_PARTITIONED} with table_definition not currently supported") else: @@ -204,9 +204,9 @@ def read( elif file_layout == ParquetFileLayout.SINGLE_FILE: j_table = _JParquetTools.readSingleFileTable(path, read_instructions) elif file_layout == ParquetFileLayout.FLAT_PARTITIONED: - j_table = _JParquetTools.readFlatPartitionedTable(_JFile(path), read_instructions) + j_table = _JParquetTools.readFlatPartitionedTable(path, read_instructions) elif file_layout == ParquetFileLayout.KV_PARTITIONED: - j_table = _JParquetTools.readKeyValuePartitionedTable(_JFile(path), read_instructions) + j_table = _JParquetTools.readKeyValuePartitionedTable(path, read_instructions) elif file_layout == ParquetFileLayout.METADATA_PARTITIONED: j_table = 
_JParquetTools.readPartitionedTableWithMetadata(_JFile(path), read_instructions) else:
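
For context on how the new S3 read paths are meant to be exercised end to end, below is a minimal, self-contained Java sketch that mirrors the test additions in this patch. The bucket name, key prefix, and region are hypothetical placeholders, and the imports assume the classes live in the packages shown in the diff; the builder calls (S3Instructions.builder(), ParquetInstructions.Builder, ParquetTools.readKeyValuePartitionedTable) are the ones introduced or exercised above.

import io.deephaven.engine.table.Table;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;

import java.time.Duration;

public final class ReadPartitionedParquetFromS3 {
    public static void main(final String[] args) {
        // S3 connection settings; the region, timeout, and credentials shown here are illustrative.
        final S3Instructions s3Instructions = S3Instructions.builder()
                .regionName("us-east-1")
                .readTimeout(Duration.ofSeconds(60))
                .credentials(Credentials.defaultCredentials())
                .build();
        // Attach the S3 settings to the parquet read instructions as special instructions.
        final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
                .setSpecialInstructions(s3Instructions)
                .build();
        // Discover key-value partitions under the directory URI and read them as one table.
        // "s3://my-bucket/partitionedData/" is a hypothetical location, not part of this patch.
        final Table table = ParquetTools.readKeyValuePartitionedTable(
                "s3://my-bucket/partitionedData/", readInstructions);
        System.out.println("Read " + table.size() + " rows");
    }
}

Reading a flat (non-hierarchical) directory of .parquet files is analogous via ParquetTools.readFlatPartitionedTable(String, ParquetInstructions), as shown in the readFlatPartitionedParquetFromS3 test above.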