[Bug]: IndexOutOfBoundsException in TextIO.Read with non-default delimiter #32249

Closed
1 of 17 tasks
baeminbo opened this issue Aug 20, 2024 · 0 comments · Fixed by #32398
baeminbo commented Aug 20, 2024

What happened?

A pipeline that reads a text file with a non-default delimiter [1] fails with an IndexOutOfBoundsException in TextBasedReader.readCustomLine [2].

The delimiter is "ABCDE" (5 bytes).

The input file is sample.csv. It is 16400 bytes long and has 'A' at index 8190, 'B' at index 8191, and 'C' at index 8192 (indexes are 0-based). Since this is only a partial match of the delimiter, the pipeline should not split the file content, and the whole content should be a single element.
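For anyone who wants to rebuild a similar input, here is a minimal sketch that writes a file with the layout described above. The class name `SampleFileGenerator` and the filler byte `'x'` are assumptions for illustration; the original sample.csv's other bytes are not given in this report, so this file may not trigger the failure in exactly the same way.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

public class SampleFileGenerator {
  // File size taken from the report.
  static final int FILE_SIZE = 16400;

  /** Builds 16400 bytes with 'A', 'B', 'C' at 0-based indexes 8190, 8191, 8192. */
  static byte[] buildContent() {
    byte[] content = new byte[FILE_SIZE];
    // Assumption: the filler byte is arbitrary; the report does not say what
    // the rest of the file contains.
    Arrays.fill(content, (byte) 'x');
    content[8190] = 'A';
    content[8191] = 'B';
    content[8192] = 'C';
    return content;
  }

  public static void main(String[] args) throws IOException {
    Path path = Paths.get("sample.csv");
    Files.write(path, buildContent());
    System.out.println("Wrote " + Files.size(path) + " bytes to " + path);
  }
}
```

The partial delimiter straddles the 8192-byte boundary on purpose: "AB" ends one 8192-byte read and "C" starts the next.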

My theory about the root cause is as follows.

TextBasedReader.readCustomLine writes the buffer (8192 bytes) into a ByteArrayOutputStream, but the requested range is [0, 8194) when the exception is thrown. This is because appendLength is 8194, where readLength is 8192 (the length of the buffer), delPosn is 0, and prevDelPosn is 2.

For the first buffer read of [0, 8192), delPosn is 2 because the buffer ends with "AB". For the second buffer read of [8192, 16384), delPosn is reset to 0 (no delimiter character matched) while prevDelPosn is still 2 (the value of delPosn from the previous buffer read). I suspect the bug is that prevDelPosn is not reset to 0 when the delimiter match fails.
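The arithmetic can be checked in isolation with a small sketch. It is not the Beam source: the class `AppendLengthSketch` and its methods are hypothetical, and the formula `readLength - (delPosn - prevDelPosn)` is an assumption chosen because it reproduces the reported values (8192, 0, 2 giving 8194). Only `ByteArrayOutputStream.write(byte[], int, int)` is a real API here.

```java
import java.io.ByteArrayOutputStream;

public class AppendLengthSketch {
  /** Assumed formula, chosen to be consistent with the values reported above. */
  static int appendLength(int readLength, int delPosn, int prevDelPosn) {
    return readLength - (delPosn - prevDelPosn);
  }

  /** Returns true if writing appendLength bytes from a bufferSize-byte array is rejected. */
  static boolean writeFails(int bufferSize, int appendLength) {
    byte[] buffer = new byte[bufferSize];
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
      out.write(buffer, 0, appendLength);
      return false;
    } catch (IndexOutOfBoundsException e) {
      // Same exception type as in the stack trace [2].
      return true;
    }
  }

  public static void main(String[] args) {
    // Values from the report: stale prevDelPosn = 2 after delPosn went back to 0.
    int stale = appendLength(8192, 0, 2);
    // Hypothetical: what the length would be if prevDelPosn were also 0.
    int reset = appendLength(8192, 0, 0);
    System.out.println("stale: " + stale + ", write fails: " + writeFails(8192, stale));
    System.out.println("reset: " + reset + ", write fails: " + writeFails(8192, reset));
  }
}
```

With the stale prevDelPosn the requested length exceeds the buffer by exactly two bytes, which matches the "Range [0, 0 + 8194) out of bounds for length 8192" message. The second call only illustrates the theory; it is not a claim about how the fix was implemented.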

[1]

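import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Minimal reproduction: read sample.csv using a custom 5-byte delimiter.
public class TextReadJob {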
  private static final String INPUT_PATH = "sample.csv";
  private static final byte[] DELIMITER = "ABCDE".getBytes(StandardCharsets.UTF_8);

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();

    Pipeline pipeline = Pipeline.create(options);

    pipeline.apply(TextIO.read().from(INPUT_PATH).withDelimiter(DELIMITER));

    pipeline.run();
  }
}

[2]

Aug 19, 2024 9:24:32 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern sample.csv matched 1 files with total size 16400
Aug 19, 2024 9:24:32 PM org.apache.beam.sdk.io.FileBasedSource split
INFO: Splitting filepattern sample.csv into bundles of size 1025 took 1 ms and produced 1 files and 16 bundles
Exception in thread "main" java.lang.IndexOutOfBoundsException: Range [0, 0 + 8194) out of bounds for length 8192
	at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
	at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckFromIndexSize(Preconditions.java:82)
	at java.base/jdk.internal.util.Preconditions.checkFromIndexSize(Preconditions.java:343)
	at java.base/java.util.Objects.checkFromIndexSize(Objects.java:426)
	at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:155)
	at org.apache.beam.sdk.io.TextSource$TextBasedReader.readCustomLine(TextSource.java:466)
	at org.apache.beam.sdk.io.TextSource$TextBasedReader.readNextRecord(TextSource.java:268)
	at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:507)
	at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:502)
	at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:252)
	at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.processElement(BoundedReadEvaluatorFactory.java:150)
	at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
	at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner