Skip to content

Commit

Permalink
Update length of avro, rc, and line input files if memory input files are created
Browse files Browse the repository at this point in the history
  • Loading branch information
mwong77 committed Oct 25, 2024
1 parent f8d6289 commit 19995ec
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,13 @@ public Optional<ReaderPageSource> createPageSource(
}

try {
length = min(inputFile.length() - start, length);
if (estimatedFileSize < BUFFER_SIZE.toBytes()) {
try (TrinoInputStream input = inputFile.newStream()) {
byte[] data = input.readAllBytes();
inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data));
}
}
length = min(inputFile.length() - start, length);
}
catch (TrinoException e) {
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static io.trino.plugin.hive.util.HiveUtil.getFooterCount;
import static io.trino.plugin.hive.util.HiveUtil.getHeaderCount;
import static io.trino.plugin.hive.util.HiveUtil.splitError;
import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;

public abstract class LinePageSourceFactory
Expand Down Expand Up @@ -126,11 +127,6 @@ public Optional<ReaderPageSource> createPageSource(
schema.serdeProperties());
}

// Skip empty inputs
if (length == 0) {
return Optional.of(noProjectionAdaptation(new EmptyPageSource()));
}

TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session);
TrinoInputFile inputFile = trinoFileSystem.newInputFile(path);
try {
Expand All @@ -141,6 +137,13 @@ public Optional<ReaderPageSource> createPageSource(
inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data));
}
}
length = min(inputFile.length() - start, length);

// Skip empty inputs
if (length <= 0) {
return Optional.of(noProjectionAdaptation(new EmptyPageSource()));
}

LineReader lineReader = lineReaderFactory.createLineReader(inputFile, start, length, headerCount, footerCount);
// Split may be empty after discovering the real file size and skipping headers
if (lineReader.isClosed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ else if (serializationLibraryName.equals(COLUMNAR_SERDE_CLASS)) {
TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session);
TrinoInputFile inputFile = trinoFileSystem.newInputFile(path);
try {
length = min(inputFile.length() - start, length);
if (!inputFile.exists()) {
throw new TrinoException(HIVE_CANNOT_OPEN_SPLIT, "File does not exist");
}
Expand All @@ -134,6 +133,7 @@ else if (serializationLibraryName.equals(COLUMNAR_SERDE_CLASS)) {
inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data));
}
}
length = min(inputFile.length() - start, length);
}
catch (TrinoException e) {
throw e;
Expand Down

0 comments on commit 19995ec

Please sign in to comment.