Skip to content

Commit

Permalink
Expand test coverage (#31957)
Browse files Browse the repository at this point in the history
Co-authored-by: Lahari Guduru <108150650+lahariguduru@users.noreply.github.com>
  • Loading branch information
francisohara24 and lahariguduru authored Jul 23, 2024
1 parent 35cfad9 commit eadb81f
Show file tree
Hide file tree
Showing 5 changed files with 433 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@
* <li>{@code boolean} <a
* href="https://javadoc.io/static/org.apache.commons/commons-csv/1.8/org/apache/commons/csv/CSVFormat.html#withIgnoreHeaderCase--">ignoreHeaderCase</a>
* - must be false.
* <li>{@code boolean} <a
* href="https://javadoc.io/static/org.apache.commons/commons-csv/1.8/org/apache/commons/csv/CSVFormat.html#withSkipHeaderRecord--">skipHeaderRecord</a>
* - must be false. The header is already accounted for during parsing.
* </ul>
*
* <h4>Ignored CSVFormat parameters</h4>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ static void validateCsvFormat(CSVFormat format) {
"Illegal %s: column name is required",
CSVFormat.class);
}
checkArgument(
!format.getSkipHeaderRecord(),
"Illegal %s: cannot skip header record because the header is already accounted for",
CSVFormat.class);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -47,14 +50,21 @@ final class CsvIOStringToCsvRecord
*/
@Override
public PCollection<List<String>> expand(PCollection<String> input) {
return input.apply(ParDo.of(new ProcessLineToRecordFn()));
return input
.apply(ParDo.of(new ProcessLineToRecordFn()))
.setCoder(ListCoder.of(NullableCoder.of(StringUtf8Coder.of())));
}

/** Processes each line in order to convert it to a {@link CSVRecord}. */
private class ProcessLineToRecordFn extends DoFn<String, List<String>> {
private final String headerLine = headerLine(csvFormat);

@ProcessElement
public void process(@Element String line, OutputReceiver<List<String>> receiver)
throws IOException {
if (headerLine.equals(line)) {
return;
}
for (CSVRecord record : CSVParser.parse(line, csvFormat).getRecords()) {
receiver.output(csvRecordtoList(record));
}
Expand All @@ -69,4 +79,9 @@ private static List<String> csvRecordtoList(CSVRecord record) {
}
return cells;
}

/** Returns a formatted line of the CSVFormat header. */
static String headerLine(CSVFormat csvFormat) {
return String.join(String.valueOf(csvFormat.getDelimiter()), csvFormat.getHeader());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,18 @@ public void givenCSVFormatThatAllowsDuplicateHeaderNames_throwsException() {
gotMessage);
}

@Test
public void givenCSVFormatThatSkipsHeaderRecord_throwsException() {
CSVFormat format = csvFormatWithHeader().withSkipHeaderRecord(true);
String gotMessage =
assertThrows(
IllegalArgumentException.class, () -> CsvIOParseHelpers.validateCsvFormat(format))
.getMessage();
assertEquals(
"Illegal class org.apache.commons.csv.CSVFormat: cannot skip header record because the header is already accounted for",
gotMessage);
}

/** End of tests for {@link CsvIOParseHelpers#validateCsvFormat(CSVFormat)}. */
//////////////////////////////////////////////////////////////////////////////////////////////

Expand Down
Loading

0 comments on commit eadb81f

Please sign in to comment.