From 69adc35d75241c20fa4228e754159665a5ed3042 Mon Sep 17 00:00:00 2001 From: Matt Wong Date: Thu, 3 Oct 2024 19:49:39 +0000 Subject: [PATCH] Update length if memory input file is created When a memory input file is created for avro, rc, and line readers, we need to update the length that will be passed in to the reader since the length of the memory input file can possibly be less than the original input file length. --- .../plugin/hive/avro/AvroPageSourceFactory.java | 2 +- .../plugin/hive/line/LinePageSourceFactory.java | 13 ++++++++----- .../plugin/hive/rcfile/RcFilePageSourceFactory.java | 2 +- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSourceFactory.java index 3d7379ae4ad57..20209c24b4ae0 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSourceFactory.java @@ -122,13 +122,13 @@ public Optional createPageSource( } try { - length = min(inputFile.length() - start, length); if (estimatedFileSize < BUFFER_SIZE.toBytes()) { try (TrinoInputStream input = inputFile.newStream()) { byte[] data = input.readAllBytes(); inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data)); } } + length = min(inputFile.length() - start, length); } catch (TrinoException e) { throw e; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java index 4f46899c96d5a..2ecd6770a0d8a 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java @@ -53,6 +53,7 @@ import static io.trino.plugin.hive.util.HiveUtil.getFooterCount; import static io.trino.plugin.hive.util.HiveUtil.getHeaderCount; import static io.trino.plugin.hive.util.HiveUtil.splitError; +import static java.lang.Math.min; import static java.util.Objects.requireNonNull; public abstract class LinePageSourceFactory @@ -126,11 +127,6 @@ public Optional createPageSource( schema.serdeProperties()); } - // Skip empty inputs - if (length == 0) { - return Optional.of(noProjectionAdaptation(new EmptyPageSource())); - } - TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session); TrinoInputFile inputFile = trinoFileSystem.newInputFile(path); try { @@ -141,6 +137,13 @@ public Optional createPageSource( inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data)); } } + length = min(inputFile.length() - start, length); + + // Skip empty inputs + if (length <= 0) { + return Optional.of(noProjectionAdaptation(new EmptyPageSource())); + } + LineReader lineReader = lineReaderFactory.createLineReader(inputFile, start, length, headerCount, footerCount); // Split may be empty after discovering the real file size and skipping headers if (lineReader.isClosed()) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java index d9121f6adddcb..8acd21a7c4257 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java @@ -124,7 +124,6 @@ else if (serializationLibraryName.equals(COLUMNAR_SERDE_CLASS)) { TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session); TrinoInputFile inputFile = trinoFileSystem.newInputFile(path); try { - length = min(inputFile.length() - start, length); if (!inputFile.exists()) { throw new TrinoException(HIVE_CANNOT_OPEN_SPLIT, "File does not exist"); } @@ -134,6 +133,7 @@ else if (serializationLibraryName.equals(COLUMNAR_SERDE_CLASS)) { inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data)); } } + length = min(inputFile.length() - start, length); } catch (TrinoException e) { throw e;