From 30a48f05cf2ee0eea0a304fea01eb40f323f9f3c Mon Sep 17 00:00:00 2001 From: Lukasz Cwik Date: Thu, 15 Sep 2022 14:00:50 -0700 Subject: [PATCH] Improve the performance of TextSource by reducing how many byte[]s are copied (fixes #23193) (#23196) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Improve the performance of TextSource by reducing how many byte[]s are copied (fixes #23193) This makes TextSource take about 2.3x less CPU resources during decoding. Before this change: ``` TextSourceBenchmark.benchmarkTextSource thrpt 5 0.248 ± 0.029 ops/s ``` After this change: ``` TextSourceBenchmark.benchmarkHadoopLineReader thrpt 5 0.465 ± 0.064 ops/s TextSourceBenchmark.benchmarkTextSource thrpt 5 0.575 ± 0.059 ops/s ``` * Write file in pieces instead of pre-allocating entire buffer * Address PR comments --- CHANGES.md | 1 + sdks/java/core/jmh/build.gradle | 2 + .../beam/sdk/jmh/io/TextSourceBenchmark.java | 117 ++++++ .../apache/beam/sdk/jmh/io/package-info.java | 20 + .../org/apache/beam/sdk/io/TextSource.java | 359 ++++++++++++------ .../apache/beam/sdk/io/TextIOReadTest.java | 69 ++-- 6 files changed, 435 insertions(+), 133 deletions(-) create mode 100644 sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java create mode 100644 sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/package-info.java diff --git a/CHANGES.md b/CHANGES.md index f23254f2bf77e..e9bbd5f991240 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -59,6 +59,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Decreased TextSource CPU utilization by 2.3x (Java) ([#23193](https://github.com/apache/beam/issues/23193)). ## New Features / Improvements diff --git a/sdks/java/core/jmh/build.gradle b/sdks/java/core/jmh/build.gradle index 7079816d27c13..2186e5367fb01 100644 --- a/sdks/java/core/jmh/build.gradle +++ b/sdks/java/core/jmh/build.gradle @@ -28,9 +28,11 @@ ext.summary = "This contains JMH benchmarks for the SDK Core for Beam Java" dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(path: ":sdks:java:core", configuration: "shadowTest") implementation library.java.joda_time implementation library.java.vendored_grpc_1_48_1 implementation library.java.vendored_guava_26_0_jre + implementation library.java.hadoop_common runtimeOnly library.java.slf4j_jdk14 testImplementation library.java.junit testImplementation library.java.hamcrest diff --git a/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java new file mode 100644 index 0000000000000..171f9c01dd83a --- /dev/null +++ b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.jmh.io; + +import java.io.BufferedWriter; +import java.io.FileInputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.sdk.io.FileBasedSource; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.io.TextIOReadTest; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.LineReader; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; + +public class TextSourceBenchmark { + private static final int NUM_LINES = 10_000_000; + private static char[] data = new char[120]; + + static { + Arrays.fill(data, 'a'); + } + + @State(Scope.Benchmark) + public static class Data { + public Path path; + public String pathString; + public int length; + + /** Generates a random file with {@code NUM_LINES} between 60 and 120 characters each. */ + @Setup + public void createFile() throws Exception { + path = Files.createTempFile("benchmark", null).toAbsolutePath(); + pathString = path.toString(); + BufferedWriter writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8); + for (int i = 0; i < NUM_LINES; ++i) { + String valueToAppend = + String.valueOf(data, 0, ThreadLocalRandom.current().nextInt(60, 120)); + length += valueToAppend.length(); + writer.write(valueToAppend); + writer.write('\n'); + } + writer.close(); + } + + @TearDown + public void deleteFile() throws Exception { + Files.deleteIfExists(path); + } + } + + @Benchmark + public void benchmarkTextSource(Data data) throws Exception { + Source.Reader reader = + ((FileBasedSource) TextIOReadTest.getTextSource(data.pathString, null)) + .createReader(PipelineOptionsFactory.create()); + int length = 0; + int linesRead = 0; + if (reader.start()) { + linesRead += 1; + length += reader.getCurrent().length(); + } + while (reader.advance()) { + linesRead += 1; + length += reader.getCurrent().length(); + } + if (linesRead != NUM_LINES) { + throw new IllegalStateException(); + } + if (length != data.length) { + throw new IllegalStateException(); + } + reader.close(); + } + + @Benchmark + public void benchmarkHadoopLineReader(Data data) throws Exception { + LineReader reader = new LineReader(new FileInputStream(data.pathString)); + int length = 0; + int linesRead = 0; + do { + Text text = new Text(); + reader.readLine(text); + // It is important to convert toString() here so that we force the decoding to UTF8 otherwise + // Text keeps the encoded byte[] version in memory. + length += text.toString().length(); + linesRead += 1; + } while (length < data.length); + if (linesRead != NUM_LINES) { + throw new IllegalStateException(); + } + reader.close(); + } +} diff --git a/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/package-info.java b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/package-info.java new file mode 100644 index 0000000000000..cf215387a8703 --- /dev/null +++ b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Benchmarks for IO. */ +package org.apache.beam.sdk.jmh.io; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index ad959d8d5b402..e0aadf79fce98 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -19,10 +19,12 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; +import java.nio.charset.StandardCharsets; import java.util.NoSuchElementException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -92,27 +94,33 @@ static class TextBasedReader extends FileBasedReader { private static final int READ_BUFFER_SIZE = 8192; private static final ByteString UTF8_BOM = ByteString.copyFrom(new byte[] {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF}); - private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); - private ByteString buffer; - private int startOfDelimiterInBuffer; - private int endOfDelimiterInBuffer; + private static final byte CR = '\r'; + private static final byte LF = '\n'; + + private final byte @Nullable [] delimiter; + private final ByteArrayOutputStream str; + private final byte[] buffer; + private final ByteBuffer byteBuffer; + + private ReadableByteChannel inChannel; private long startOfRecord; private volatile long startOfNextRecord; private volatile boolean eof; - private volatile boolean elementIsPresent; - private @Nullable String currentValue; - private @Nullable ReadableByteChannel inChannel; - private byte @Nullable [] delimiter; + private volatile @Nullable String currentValue; + private int bufferLength = 0; // the number of bytes of real data in the buffer + private int bufferPosn = 0; // the current position in the buffer private TextBasedReader(TextSource source, byte[] delimiter) { super(source); - buffer = ByteString.EMPTY; + this.buffer = new byte[READ_BUFFER_SIZE]; + this.str = new ByteArrayOutputStream(); + this.byteBuffer = ByteBuffer.wrap(buffer); this.delimiter = delimiter; } @Override protected long getCurrentOffset() throws NoSuchElementException { - if (!elementIsPresent) { + if (currentValue == null) { throw new NoSuchElementException(); } return startOfRecord; @@ -128,7 +136,7 @@ public long getSplitPointsRemaining() { @Override public String getCurrent() throws NoSuchElementException { - if (!elementIsPresent) { + if (currentValue == null) { throw new NoSuchElementException(); } return currentValue; @@ -152,131 +160,264 @@ protected void startReading(ReadableByteChannel channel) throws IOException { // all the bytes of the delimiter in the call to findDelimiterBounds() below requiredPosition = startOffset - delimiter.length; } - ((SeekableByteChannel) channel).position(requiredPosition); - findDelimiterBounds(); - buffer = buffer.substring(endOfDelimiterInBuffer); - startOfNextRecord = requiredPosition + endOfDelimiterInBuffer; - endOfDelimiterInBuffer = 0; - startOfDelimiterInBuffer = 0; + + // Handle the case where the requiredPosition is at the beginning of the file so we can + // skip over UTF8_BOM if present. + if (requiredPosition < UTF8_BOM.size()) { + ((SeekableByteChannel) channel).position(0); + if (fileStartsWithBom()) { + startOfNextRecord = bufferPosn = UTF8_BOM.size(); + } else { + startOfNextRecord = bufferPosn = (int) requiredPosition; + } + } else { + ((SeekableByteChannel) channel).position(requiredPosition); + startOfNextRecord = requiredPosition; + } + + // Read and discard the next record ensuring that startOfNextRecord and bufferPosn point + // to the beginning of the next record. + readNextRecord(); + currentValue = null; + } else { + // Check to see if we start with the UTF_BOM bytes skipping them if present. + if (fileStartsWithBom()) { + startOfNextRecord = bufferPosn = UTF8_BOM.size(); + } + } + } + + private boolean fileStartsWithBom() throws IOException { + for (; ; ) { + int bytesRead = inChannel.read(byteBuffer); + if (bytesRead == -1) { + return false; + } else { + bufferLength += bytesRead; + } + if (bufferLength >= UTF8_BOM.size()) { + int i; + for (i = 0; i < UTF8_BOM.size() && buffer[i] == UTF8_BOM.byteAt(i); ++i) {} + if (i == UTF8_BOM.size()) { + return true; + } + return false; + } + } + } + + @Override + protected boolean readNextRecord() throws IOException { + startOfRecord = startOfNextRecord; + + // If we have reached EOF file last time around then we will mark that we don't have an + // element and return false. + if (eof) { + currentValue = null; + return false; + } + + if (delimiter == null) { + return readDefaultLine(); + } else { + return readCustomLine(); } } /** - * Locates the start position and end position of the next delimiter. Will consume the channel - * till either EOF or the delimiter bounds are found. + * Loosely based upon Hadoop + * LineReader.java + * + *

We're reading data from inChannel, but the head of the stream may be already buffered in + * buffer, so we have several cases: * - *

This fills the buffer and updates the positions as follows: + *

    + *
  1. No newline characters are in the buffer, so we need to copy everything and read another + * buffer from the stream. + *
  2. An unambiguously terminated line is in buffer, so we just create currentValue + *
  3. Ambiguously terminated line is in buffer, i.e. buffer ends in CR. In this case we copy + * everything up to CR to str, but we also need to see what follows CR: if it's LF, then + * we need consume LF as well, so next call to readLine will read from after that. + *
* - *
{@code
-     * ------------------------------------------------------
-     * | element bytes | delimiter bytes | unconsumed bytes |
-     * ------------------------------------------------------
-     * 0            start of          end of              buffer
-     *              delimiter         delimiter           size
-     *              in buffer         in buffer
-     * }
+ *

We use a flag prevCharCR to signal if previous character was CR and, if it happens to be + * at the end of the buffer, delay consuming it until we have a chance to look at the char that + * follows. */ - private void findDelimiterBounds() throws IOException { - int bytePositionInBuffer = 0; - while (true) { - if (!tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 1)) { - startOfDelimiterInBuffer = endOfDelimiterInBuffer = bytePositionInBuffer; - break; - } + private boolean readDefaultLine() throws IOException { + assert !eof; - byte currentByte = buffer.byteAt(bytePositionInBuffer); + int newlineLength = 0; // length of terminating newline + boolean prevCharCR = false; // true of prev char was CR + long bytesConsumed = 0; + for (; ; ) { + int startPosn = bufferPosn; // starting from where we left off the last time - if (delimiter == null) { - // default delimiter - if (currentByte == '\n') { - startOfDelimiterInBuffer = bytePositionInBuffer; - endOfDelimiterInBuffer = startOfDelimiterInBuffer + 1; - break; - } else if (currentByte == '\r') { - startOfDelimiterInBuffer = bytePositionInBuffer; - endOfDelimiterInBuffer = startOfDelimiterInBuffer + 1; - - if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) { - currentByte = buffer.byteAt(bytePositionInBuffer + 1); - if (currentByte == '\n') { - endOfDelimiterInBuffer += 1; - } + // Read the next chunk from the file + if (bufferPosn == bufferLength) { + startPosn = bufferPosn = 0; + if (prevCharCR) { + ++bytesConsumed; // account for CR from previous read + } + byteBuffer.clear(); + bufferLength = inChannel.read(byteBuffer); + + // If we are at EOF then try to create the last value from the buffer. + if (bufferLength < 0) { + eof = true; + + // Don't return an empty record if the file ends with a delimiter + if (str.size() == 0) { + return false; } + + currentValue = str.toString(StandardCharsets.UTF_8.name()); break; } - } else { - // user defined delimiter - int i = 0; - // initialize delimiter not found - startOfDelimiterInBuffer = endOfDelimiterInBuffer = bytePositionInBuffer; - while ((i <= delimiter.length - 1) && (currentByte == delimiter[i])) { - // read next byte - i++; - if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + i + 1)) { - currentByte = buffer.byteAt(bytePositionInBuffer + i); - } else { - // corner case: delimiter truncated at the end of the file - startOfDelimiterInBuffer = endOfDelimiterInBuffer = bytePositionInBuffer; - break; - } + } + + // Search for the newline + for (; bufferPosn < bufferLength; ++bufferPosn) { + if (buffer[bufferPosn] == LF) { + newlineLength = (prevCharCR) ? 2 : 1; + ++bufferPosn; // at next invocation proceed from following byte + break; } - if (i == delimiter.length) { - // all bytes of delimiter found - endOfDelimiterInBuffer = bytePositionInBuffer + i; + if (prevCharCR) { // CR + notLF, we are at notLF + newlineLength = 1; break; } + prevCharCR = (buffer[bufferPosn] == CR); } - // Move to the next byte in buffer. - bytePositionInBuffer += 1; - } - } - @Override - protected boolean readNextRecord() throws IOException { - startOfRecord = startOfNextRecord; - findDelimiterBounds(); + int readLength = bufferPosn - startPosn; + if (prevCharCR && newlineLength == 0) { + --readLength; // CR at the end of the buffer + } + bytesConsumed += readLength; - // If we have reached EOF file and consumed all of the buffer then we know - // that there are no more records. - if (eof && buffer.isEmpty()) { - elementIsPresent = false; - return false; - } + if (newlineLength == 0) { + // Append the prefix of the value to str until we find a newline + str.write(buffer, startPosn, readLength); + } else { + int appendLength = readLength - newlineLength; - decodeCurrentElement(); - startOfNextRecord = startOfRecord + endOfDelimiterInBuffer; + // Optimize for the common case where the string is wholly contained within the buffer + if (str.size() == 0) { + currentValue = new String(buffer, startPosn, appendLength, StandardCharsets.UTF_8); + } else { + str.write(buffer, startPosn, appendLength); + currentValue = str.toString(StandardCharsets.UTF_8.name()); + } + break; + } + } + startOfNextRecord = startOfRecord + bytesConsumed; + str.reset(); return true; } /** - * Decodes the current element updating the buffer to only contain the unconsumed bytes. + * Loosely based upon Hadoop + * LineReader.java * - *

This invalidates the currently stored {@code startOfDelimiterInBuffer} and {@code - * endOfDelimiterInBuffer}. + *

Note that this implementation fixes an issue where a partial match against the delimiter + * would have been lost if the delimiter crossed at the buffer boundaries during reading. */ - private void decodeCurrentElement() throws IOException { - ByteString dataToDecode = buffer.substring(0, startOfDelimiterInBuffer); - // If present, the UTF8 Byte Order Mark (BOM) will be removed. - if (startOfRecord == 0 && dataToDecode.startsWith(UTF8_BOM)) { - dataToDecode = dataToDecode.substring(UTF8_BOM.size()); - } - currentValue = dataToDecode.toStringUtf8(); - elementIsPresent = true; - buffer = buffer.substring(endOfDelimiterInBuffer); - } + private boolean readCustomLine() throws IOException { + assert !eof; + + long bytesConsumed = 0; + int delPosn = 0; + for (; ; ) { + int startPosn = bufferPosn; // starting from where we left off the last time + + // Read the next chunk from the file + if (bufferPosn >= bufferLength) { + startPosn = bufferPosn = 0; + byteBuffer.clear(); + bufferLength = inChannel.read(byteBuffer); + + // If we are at EOF then try to create the last value from the buffer. + if (bufferLength < 0) { + eof = true; + + // Write any partial delimiter now that we are at EOF + if (delPosn != 0) { + str.write(delimiter, 0, delPosn); + } + + // Don't return an empty record if the file ends with a delimiter + if (str.size() == 0) { + return false; + } + + currentValue = str.toString(StandardCharsets.UTF_8.name()); + break; // EOF + } + } + + DELIMITER_MATCH: + { + if (delPosn > 0) { + // slow-path: Handle the case where we only matched part of the delimiter, possibly + // adding that to str fixing up any partially consumed delimiter if we don't match the + // whole delimiter + int prevDelPosn = delPosn; + for (; bufferPosn < bufferLength; ++bufferPosn) { + if (buffer[bufferPosn] == delimiter[delPosn]) { + delPosn++; + if (delPosn == delimiter.length) { + bufferPosn++; + break DELIMITER_MATCH; // Skip matching the delimiter using the fast path + } + } else { + // Add to str any previous partial delimiter since we didn't match the whole + // delimiter + str.write(delimiter, 0, prevDelPosn); + delPosn = 0; + break; // Leave this loop and use the fast-path delimiter matching + } + } + } + + // fast-path: Look for the delimiter within the buffer + for (; bufferPosn < bufferLength; ++bufferPosn) { + if (buffer[bufferPosn] == delimiter[delPosn]) { + delPosn++; + if (delPosn == delimiter.length) { + bufferPosn++; + break; + } + } else { + delPosn = 0; + } + } + } - /** Returns false if we were unable to ensure the minimum capacity by consuming the channel. */ - private boolean tryToEnsureNumberOfBytesInBuffer(int minCapacity) throws IOException { - // While we aren't at EOF or haven't fulfilled the minimum buffer capacity, - // attempt to read more bytes. - while (buffer.size() <= minCapacity && !eof) { - eof = inChannel.read(readBuffer) == -1; - readBuffer.flip(); - buffer = buffer.concat(ByteString.copyFrom(readBuffer)); - readBuffer.clear(); + int readLength = bufferPosn - startPosn; + bytesConsumed += readLength; + int appendLength = readLength - delPosn; + if (delPosn < delimiter.length) { + // Append the prefix of the value to str skipping the partial delimiter + str.write(buffer, startPosn, appendLength); + } else { + if (str.size() == 0) { + // Optimize for the common case where the string is wholly contained within the buffer + currentValue = new String(buffer, startPosn, appendLength, StandardCharsets.UTF_8); + } else { + str.write(buffer, startPosn, appendLength); + currentValue = str.toString(StandardCharsets.UTF_8.name()); + } + break; + } } - // Return true if we were able to honor the minimum buffer capacity request - return buffer.size() >= minCapacity; + + startOfNextRecord = startOfRecord + bytesConsumed; + str.reset(); + return true; } } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index 428c8623b6257..2e447255b7d84 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -56,6 +56,7 @@ import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; +import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; @@ -84,7 +85,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; import org.apache.commons.lang3.SystemUtils; @@ -235,13 +235,15 @@ private static File createZipFile( } private static TextSource prepareSource( - TemporaryFolder temporaryFolder, byte[] data, byte[] delimiter) throws IOException { + TemporaryFolder temporaryFolder, byte[] data, @Nullable byte[] delimiter) throws IOException { Path path = temporaryFolder.newFile().toPath(); Files.write(path, data); + return getTextSource(path.toString(), delimiter); + } + + public static TextSource getTextSource(String path, @Nullable byte[] delimiter) { return new TextSource( - ValueProvider.StaticValueProvider.of(path.toString()), - EmptyMatchTreatment.DISALLOW, - delimiter); + ValueProvider.StaticValueProvider.of(path), EmptyMatchTreatment.DISALLOW, delimiter); } private static String getFileSuffix(Compression compression) { @@ -383,6 +385,44 @@ private void runTestReadWithData(byte[] data, List expectedResults) thro } } + /** Tests for reading files with various delimiters. */ + @RunWith(Parameterized.class) + public static class ReadWithCustomDelimiterTest { + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Parameterized.Parameters(name = "{index}: {0}") + public static Iterable data() { + return ImmutableList.builder() + .add(new Object[] {"first|*second|*|*third"}) + .add(new Object[] {"first|*second|*|*third|"}) + .add(new Object[] {"first|*second|*|*third*"}) + .add(new Object[] {"first|*second|*|*third|*"}) + .add(new Object[] {"|first|*second|*|*third"}) + .add(new Object[] {"|first|*second|*|*third|"}) + .add(new Object[] {"|first|*second|*|*third*"}) + .add(new Object[] {"|first|*second|*|*third|*"}) + .add(new Object[] {"*first|*second|*|*third"}) + .add(new Object[] {"*first|*second|*|*third|"}) + .add(new Object[] {"*first|*second|*|*third*"}) + .add(new Object[] {"*first|*second|*|*third|*"}) + .add(new Object[] {"|*first|*second|*|*third"}) + .add(new Object[] {"|*first|*second|*|*third|"}) + .add(new Object[] {"|*first|*second|*|*third*"}) + .add(new Object[] {"|*first|*second|*|*third|*"}) + .build(); + } + + @Parameterized.Parameter(0) + public String testCase; + + @Test + public void testReadLinesWithCustomDelimiter() throws Exception { + SourceTestUtils.assertSplitAtFractionExhaustive( + TextIOReadTest.prepareSource(tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}), + PipelineOptionsFactory.create()); + } + } + /** Tests for some basic operations in {@link TextIO.Read}. */ @RunWith(JUnit4.class) public static class BasicIOTest { @@ -447,24 +487,6 @@ public void testReadStringsWithCustomDelimiter() throws Exception { p.run(); } - @Test - public void testSplittingSourceWithCustomDelimiter() throws Exception { - List testCases = Lists.newArrayList(); - String infix = "first|*second|*|*third"; - String[] affixes = new String[] {"", "|", "*", "|*"}; - for (String prefix : affixes) { - for (String suffix : affixes) { - testCases.add(prefix + infix + suffix); - } - } - for (String testCase : testCases) { - SourceTestUtils.assertSplitAtFractionExhaustive( - TextIOReadTest.prepareSource( - tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}), - PipelineOptionsFactory.create()); - } - } - @Test @Category(NeedsRunner.class) public void testReadStrings() throws Exception { @@ -693,7 +715,6 @@ public void testProgressAfterSplitting() throws IOException { // Split. 0.1 is in line1, so should now be able to detect last record. remainder = readerOrig.splitAtFraction(0.1); - System.err.println(readerOrig.getCurrentSource()); assertNotNull(remainder); // First record, after splitting.