From 19995ecec6a72cd19e76c09b2eba95fbbeaebab5 Mon Sep 17 00:00:00 2001 From: Matt Wong Date: Thu, 3 Oct 2024 19:49:39 +0000 Subject: [PATCH] Update length of avro, rc, and line input files if memory input files are created --- .../plugin/hive/avro/AvroPageSourceFactory.java | 2 +- .../plugin/hive/line/LinePageSourceFactory.java | 13 ++++++++----- .../plugin/hive/rcfile/RcFilePageSourceFactory.java | 2 +- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSourceFactory.java index 3d7379ae4ad57..20209c24b4ae0 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSourceFactory.java @@ -122,13 +122,13 @@ public Optional createPageSource( } try { - length = min(inputFile.length() - start, length); if (estimatedFileSize < BUFFER_SIZE.toBytes()) { try (TrinoInputStream input = inputFile.newStream()) { byte[] data = input.readAllBytes(); inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data)); } } + length = min(inputFile.length() - start, length); } catch (TrinoException e) { throw e; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java index 4f46899c96d5a..2ecd6770a0d8a 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java @@ -53,6 +53,7 @@ import static io.trino.plugin.hive.util.HiveUtil.getFooterCount; import static io.trino.plugin.hive.util.HiveUtil.getHeaderCount; import static io.trino.plugin.hive.util.HiveUtil.splitError; +import static java.lang.Math.min; import static java.util.Objects.requireNonNull; public abstract class LinePageSourceFactory @@ -126,11 +127,6 @@ public Optional createPageSource( schema.serdeProperties()); } - // Skip empty inputs - if (length == 0) { - return Optional.of(noProjectionAdaptation(new EmptyPageSource())); - } - TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session); TrinoInputFile inputFile = trinoFileSystem.newInputFile(path); try { @@ -141,6 +137,13 @@ public Optional createPageSource( inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data)); } } + length = min(inputFile.length() - start, length); + + // Skip empty inputs + if (length <= 0) { + return Optional.of(noProjectionAdaptation(new EmptyPageSource())); + } + LineReader lineReader = lineReaderFactory.createLineReader(inputFile, start, length, headerCount, footerCount); // Split may be empty after discovering the real file size and skipping headers if (lineReader.isClosed()) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java index d9121f6adddcb..8acd21a7c4257 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java @@ -124,7 +124,6 @@ else if (serializationLibraryName.equals(COLUMNAR_SERDE_CLASS)) { TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session); TrinoInputFile inputFile = trinoFileSystem.newInputFile(path); try { - length = min(inputFile.length() - start, length); if (!inputFile.exists()) { throw new TrinoException(HIVE_CANNOT_OPEN_SPLIT, "File does not exist"); } @@ -134,6 +133,7 @@ else if (serializationLibraryName.equals(COLUMNAR_SERDE_CLASS)) { inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data)); } } + length = min(inputFile.length() - start, length); } catch (TrinoException e) { throw e;