Skip to content

Commit

Permalink
Update length if memory input file is created
Browse files Browse the repository at this point in the history
When a memory input file is created for avro, rc, and line readers, we
need to update the length that will be passed in to the reader since the
length of the memory input file can possibly be less than the original
input file length.
  • Loading branch information
mwong77 committed Oct 25, 2024
1 parent f8d6289 commit 69adc35
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,13 @@ public Optional<ReaderPageSource> createPageSource(
}

try {
length = min(inputFile.length() - start, length);
if (estimatedFileSize < BUFFER_SIZE.toBytes()) {
try (TrinoInputStream input = inputFile.newStream()) {
byte[] data = input.readAllBytes();
inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data));
}
}
length = min(inputFile.length() - start, length);
}
catch (TrinoException e) {
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static io.trino.plugin.hive.util.HiveUtil.getFooterCount;
import static io.trino.plugin.hive.util.HiveUtil.getHeaderCount;
import static io.trino.plugin.hive.util.HiveUtil.splitError;
import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;

public abstract class LinePageSourceFactory
Expand Down Expand Up @@ -126,11 +127,6 @@ public Optional<ReaderPageSource> createPageSource(
schema.serdeProperties());
}

// Skip empty inputs
if (length == 0) {
return Optional.of(noProjectionAdaptation(new EmptyPageSource()));
}

TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session);
TrinoInputFile inputFile = trinoFileSystem.newInputFile(path);
try {
Expand All @@ -141,6 +137,13 @@ public Optional<ReaderPageSource> createPageSource(
inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data));
}
}
length = min(inputFile.length() - start, length);

// Skip empty inputs
if (length <= 0) {
return Optional.of(noProjectionAdaptation(new EmptyPageSource()));
}

LineReader lineReader = lineReaderFactory.createLineReader(inputFile, start, length, headerCount, footerCount);
// Split may be empty after discovering the real file size and skipping headers
if (lineReader.isClosed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ else if (serializationLibraryName.equals(COLUMNAR_SERDE_CLASS)) {
TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session);
TrinoInputFile inputFile = trinoFileSystem.newInputFile(path);
try {
length = min(inputFile.length() - start, length);
if (!inputFile.exists()) {
throw new TrinoException(HIVE_CANNOT_OPEN_SPLIT, "File does not exist");
}
Expand All @@ -134,6 +133,7 @@ else if (serializationLibraryName.equals(COLUMNAR_SERDE_CLASS)) {
inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data));
}
}
length = min(inputFile.length() - start, length);
}
catch (TrinoException e) {
throw e;
Expand Down

0 comments on commit 69adc35

Please sign in to comment.