Set length of avro and rc input file after memory input file is created #23667

Merged
merged 1 commit into from
Oct 25, 2024
@@ -122,13 +122,13 @@ public Optional<ReaderPageSource> createPageSource(
}

try {
- length = min(inputFile.length() - start, length);
if (estimatedFileSize < BUFFER_SIZE.toBytes()) {
try (TrinoInputStream input = inputFile.newStream()) {
byte[] data = input.readAllBytes();
inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data));
}
}
+ length = min(inputFile.length() - start, length);
Member

Can we check whether a similar change is needed in LinePageSourceFactory and RcFilePageSourceFactory as well?

Member

Is it possible to add a test for this?
For example, we test the file system with AWS encryption in io.trino.filesystem.s3.TestS3FileSystemAwsS3WithEncryption.

Contributor Author

@mwong77 Oct 16, 2024

After talking with @pettyjamesm, I will need to make a similar change for RcFilePageSourceFactory and LinePageSourceFactory as well.

Contributor Author

@raunaqmorarka After talking with @pettyjamesm, I don't think we can add tests for this right now, since no existing test class performs client-side encryption/decryption of input files. I think the most appropriate place for these tests would be the TestHiveFileFormat test class. We can probably add a test once client-side encryption is supported in the native S3 file system. On another note, I am not sure whether other file systems support client-side encryption.
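
For readers following the thread, the ordering issue this change addresses can be sketched in isolation. The example below is a minimal illustration, not Trino code: `InputFile`, `RemoteFile`, `InMemoryFile`, and the helper methods are hypothetical stand-ins. It assumes that the length reported by the store can differ from the number of bytes actually read (the discussion of client-side encryption suggests this, but the thread does not spell it out). Under that assumption, clamping the split length before the file is buffered into memory uses the wrong size, while clamping afterwards uses the real one and can yield a non-positive length, which is why an empty-split check of the form `length <= 0` is useful.

```java
public class SplitLengthSketch
{
    // Hypothetical stand-in for an input file abstraction; not Trino's TrinoInputFile.
    public interface InputFile
    {
        long length();

        byte[] readAllBytes();
    }

    // Assumption: the store reports a length that differs from the bytes a read returns.
    public static final class RemoteFile
            implements InputFile
    {
        private final byte[] content;
        private final long reportedLength;

        public RemoteFile(byte[] content, long reportedLength)
        {
            this.content = content.clone();
            this.reportedLength = reportedLength;
        }

        @Override
        public long length()
        {
            return reportedLength;
        }

        @Override
        public byte[] readAllBytes()
        {
            return content.clone();
        }
    }

    // In-memory copy whose length is always the real number of buffered bytes.
    public static final class InMemoryFile
            implements InputFile
    {
        private final byte[] data;

        public InMemoryFile(byte[] data)
        {
            this.data = data.clone();
        }

        @Override
        public long length()
        {
            return data.length;
        }

        @Override
        public byte[] readAllBytes()
        {
            return data.clone();
        }
    }

    // Old ordering: clamp against the length reported by the remote file.
    public static long clampBeforeBuffering(InputFile file, long start, long length)
    {
        return Math.min(file.length() - start, length);
    }

    // New ordering: buffer first, then clamp against the in-memory length.
    public static long clampAfterBuffering(InputFile file, long start, long length)
    {
        InputFile buffered = new InMemoryFile(file.readAllBytes());
        return Math.min(buffered.length() - start, length);
    }

    // A split is empty when the clamped length is zero or negative.
    public static boolean isEmptySplit(long clampedLength)
    {
        return clampedLength <= 0;
    }

    public static void main(String[] args)
    {
        // 100 real bytes, but the store reports 116 (illustrative numbers only).
        InputFile remote = new RemoteFile(new byte[100], 116);
        System.out.println(clampBeforeBuffering(remote, 0, 128));
        System.out.println(clampAfterBuffering(remote, 0, 128));
        System.out.println(isEmptySplit(clampAfterBuffering(remote, 104, 12)));
    }
}
```

With these illustrative numbers, a split starting at offset 104 lies inside the reported size but past the real data, so only the clamp-after-buffering ordering recognizes it as empty.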

}
catch (TrinoException e) {
throw e;
@@ -53,6 +53,7 @@
import static io.trino.plugin.hive.util.HiveUtil.getFooterCount;
import static io.trino.plugin.hive.util.HiveUtil.getHeaderCount;
import static io.trino.plugin.hive.util.HiveUtil.splitError;
+ import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;

public abstract class LinePageSourceFactory
@@ -126,11 +127,6 @@ public Optional<ReaderPageSource> createPageSource(
schema.serdeProperties());
}

- // Skip empty inputs
- if (length == 0) {
- return Optional.of(noProjectionAdaptation(new EmptyPageSource()));
- }
-
TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session);
TrinoInputFile inputFile = trinoFileSystem.newInputFile(path);
try {
@@ -141,6 +137,13 @@ public Optional<ReaderPageSource> createPageSource(
inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data));
}
}
+ length = min(inputFile.length() - start, length);
+
+ // Skip empty inputs
+ if (length <= 0) {
+ return Optional.of(noProjectionAdaptation(new EmptyPageSource()));
+ }
+
LineReader lineReader = lineReaderFactory.createLineReader(inputFile, start, length, headerCount, footerCount);
// Split may be empty after discovering the real file size and skipping headers
if (lineReader.isClosed()) {
@@ -124,7 +124,6 @@ else if (serializationLibraryName.equals(COLUMNAR_SERDE_CLASS)) {
TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session);
TrinoInputFile inputFile = trinoFileSystem.newInputFile(path);
try {
- length = min(inputFile.length() - start, length);
if (!inputFile.exists()) {
throw new TrinoException(HIVE_CANNOT_OPEN_SPLIT, "File does not exist");
}
@@ -134,6 +133,7 @@ else if (serializationLibraryName.equals(COLUMNAR_SERDE_CLASS)) {
inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data));
}
}
+ length = min(inputFile.length() - start, length);
}
catch (TrinoException e) {
throw e;