Skip to content

Commit

Permalink
Replaced self-implemented CSV file reader with Apache CSVParser for C…
Browse files Browse the repository at this point in the history
…LI (#474)
  • Loading branch information
lziq committed Nov 11, 2021
1 parent c6e3fc6 commit 36e20d1
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 92 deletions.
12 changes: 8 additions & 4 deletions cli/src/org/partiql/cli/functions/ReadFile.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ internal class ReadFile(valueFactory: ExprValueFactory) : BaseFunction(valueFact
ConversionMode.values().find { it.name.toLowerCase() == name } ?:
throw IllegalArgumentException( "Unknown conversion: $name")

private fun delimitedReadHandler(delimiter: String): (InputStream, IonStruct) -> ExprValue = { input, options ->
private fun delimitedReadHandler(delimiter: Char): (InputStream, IonStruct) -> ExprValue = { input, options ->
val encoding = options["encoding"]?.stringValue() ?: "UTF-8"
val conversion = options["conversion"]?.stringValue() ?: "none"
val hasHeader = options["header"]?.booleanValue() ?: false
Expand All @@ -38,10 +38,14 @@ internal class ReadFile(valueFactory: ExprValueFactory) : BaseFunction(valueFact
DelimitedValues.exprValue(valueFactory, reader, delimiter, hasHeader, conversionModeFor(conversion))
}

/**
 * Builds the read handler for Ion input: every Ion value iterated from the stream becomes one
 * element of the returned bag. The options struct is not consulted.
 */
private fun ionReadHandler(): (InputStream, IonStruct) -> ExprValue = { input, _ ->
    val ionValues = valueFactory.ion.iterate(input).asSequence()
    valueFactory.newBag(ionValues.map { ionValue -> valueFactory.newFromIonValue(ionValue) })
}

private val readHandlers = mapOf(
"ion" to { input, _ -> valueFactory.newBag(valueFactory.ion.iterate(input).asSequence().map { valueFactory.newFromIonValue(it) }) },
"tsv" to delimitedReadHandler("\t"),
"csv" to delimitedReadHandler(","))
"ion" to ionReadHandler(),
"tsv" to delimitedReadHandler('\t'),
"csv" to delimitedReadHandler(','))

override fun call(env: Environment, args: List<ExprValue>): ExprValue {
val options = optionsStruct(1, args)
Expand Down
6 changes: 3 additions & 3 deletions cli/src/org/partiql/cli/functions/WriteFile.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ internal class WriteFile(valueFactory: ExprValueFactory) : BaseFunction(valueFac
}
}

private fun delimitedWriteHandler(delimiter: String): (ExprValue, OutputStream, IonStruct) -> Unit = { results, out, options ->
private fun delimitedWriteHandler(delimiter: Char): (ExprValue, OutputStream, IonStruct) -> Unit = { results, out, options ->
val encoding = options["encoding"]?.stringValue() ?: "UTF-8"
val writeHeader = options["header"]?.booleanValue() ?: false
val nl = options["nl"]?.stringValue() ?: "\n"
Expand All @@ -43,8 +43,8 @@ internal class WriteFile(valueFactory: ExprValueFactory) : BaseFunction(valueFac
}

private val writeHandlers = mapOf(
"tsv" to delimitedWriteHandler("\t"),
"csv" to delimitedWriteHandler(","),
"tsv" to delimitedWriteHandler('\t'),
"csv" to delimitedWriteHandler(','),
"ion" to PRETTY_ION_WRITER)

override fun call(env: Environment, args: List<ExprValue>): ExprValue {
Expand Down
68 changes: 64 additions & 4 deletions cli/test/org/partiql/cli/functions/ReadFileTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ReadFileTest {
val actual = function.call(env, args).ionValue
val expected = "[1, 2]"

assertEquals(actual, ion.singleValue(expected))
assertEquals(ion.singleValue(expected), actual)
}

@Test
Expand All @@ -65,7 +65,7 @@ class ReadFileTest {
val actual = function.call(env, args).ionValue
val expected = "[1, 2]"

assertEquals(actual, ion.singleValue(expected))
assertEquals(ion.singleValue(expected), actual)
}

@Test
Expand All @@ -77,7 +77,55 @@ class ReadFileTest {
val actual = function.call(env, args).ionValue
val expected = "[{_1:\"1\",_2:\"2\"}]"

assertEquals(actual, ion.singleValue(expected))
assertEquals(ion.singleValue(expected), actual)
}

// Reads a CSV file when the `type` option is given as an Ion symbol (`{type:csv}`) instead of a string.
@Test
fun readCsvWithIonSymbolAsInput() {
    val fileName = "data_with_ion_symbol_as_input.csv"
    writeFile(fileName, "1,2")

    val quotedPath = "\"${dirPath(fileName)}\""
    val callArgs = listOf(quotedPath, "{type:csv}").map { it.exprValue() }

    val actualIon = function.call(env, callArgs).ionValue
    val expectedIon = ion.singleValue("[{_1:\"1\",_2:\"2\"}]")

    assertEquals(expectedIon, actualIon)
}

// A field wrapped in double quotes keeps its embedded comma as data rather than as a delimiter.
@Test
fun readCsvWithDoubleQuotesEscape() {
    val fileName = "data_with_double_quotes_escape.csv"
    writeFile(fileName, "\"1,2\",2")

    val quotedPath = "\"${dirPath(fileName)}\""
    val callArgs = listOf(quotedPath, "{type:\"csv\"}").map { it.exprValue() }

    val actualIon = function.call(env, callArgs).ionValue
    val expectedIon = ion.singleValue("[{_1:\"1,2\",_2:\"2\"}]")

    assertEquals(expectedIon, actualIon)
}

// Blank lines in the input are skipped, and a row with fewer fields yields a struct with fewer columns.
@Test
fun readCsvWithEmptyLines() {
    // Use a fixture name of its own: the previous name was copy-pasted from
    // readCsvWithDoubleQuotesEscape, so both tests wrote to the same file.
    val fileName = "data_with_empty_lines.csv"
    writeFile(fileName, "1,2\n\n3\n\n")

    val args = listOf("\"${dirPath(fileName)}\"", "{type:\"csv\"}").map { it.exprValue() }

    val actual = function.call(env, args).ionValue
    val expected = "[{_1:\"1\",_2:\"2\"},{_1:\"3\"}]"

    assertEquals(ion.singleValue(expected), actual)
}

// With `header:true`, the first CSV line supplies the field names of each resulting struct.
@Test
fun readCsvWithHeaderLine() {
    val fileName = "data_with_header_line.csv"
    writeFile(fileName, "col1,col2\n1,2")

    val quotedPath = "\"${dirPath(fileName)}\""
    val callArgs = listOf(quotedPath, "{type:\"csv\", header:true}").map { it.exprValue() }

    val actualIon = function.call(env, callArgs).ionValue
    val expectedIon = ion.singleValue("[{col1:\"1\",col2:\"2\"}]")

    assertEquals(expectedIon, actualIon)
}

@Test
Expand All @@ -89,6 +137,18 @@ class ReadFileTest {
val actual = function.call(env, args).ionValue
val expected = "[{_1:\"1\",_2:\"2\"}]"

assertEquals(actual, ion.singleValue(expected))
assertEquals(ion.singleValue(expected), actual)
}

// With `header:true`, the first tab-separated line supplies the field names of each resulting struct.
@Test
fun readTsvWithHeaderLine() {
    val fileName = "data_with_header_line.tsv"
    writeFile(fileName, "col1\tcol2\n1\t2")

    val quotedPath = "\"${dirPath(fileName)}\""
    val callArgs = listOf(quotedPath, "{type:\"tsv\", header:true}").map { it.exprValue() }

    val actualIon = function.call(env, callArgs).ionValue
    val expectedIon = ion.singleValue("[{col1:\"1\",col2:\"2\"}]")

    assertEquals(expectedIon, actualIon)
}
}
2 changes: 2 additions & 0 deletions lang/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ dependencies {
api 'com.amazon.ion:ion-element:0.2.0'
api 'org.partiql:partiql-ir-generator-runtime:0.4.0'

implementation 'org.apache.commons:commons-csv:1.8'

// test-time dependencies
testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5'
testImplementation 'pl.pragmatists:JUnitParams:[1.0.0,1.1.0)'
Expand Down
100 changes: 46 additions & 54 deletions lang/src/org/partiql/lang/eval/io/DelimitedValues.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package org.partiql.lang.eval.io

import com.amazon.ion.*
import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVParser
import org.apache.commons.csv.CSVPrinter
import org.partiql.lang.eval.*
import org.partiql.lang.util.*
import java.io.BufferedReader
Expand All @@ -24,9 +27,8 @@ import java.io.Writer
/**
* Provides adapters for delimited input (e.g. TSV/CSV) as lazy sequences of values.
*
* Note that this implementation does not (yet) handle various escaping of TSV/CSV files
* as specified in [RFC-4180](https://tools.ietf.org/html/rfc4180) or what Microsoft Excel
* specifies in its particular dialect.
* This implementation uses the Apache Commons CSV `CSVParser` and follows the [RFC-4180](https://tools.ietf.org/html/rfc4180) format.
* The only difference is that rows are allowed to have different numbers of fields.
*/
object DelimitedValues {
/** How to convert each element. */
Expand Down Expand Up @@ -55,7 +57,7 @@ object DelimitedValues {

/**
* Lazily loads a stream of values from a [Reader] into a sequence backed [ExprValue].
* The [ExprValue] is single pass only. This does **not** close the [Reader].
* This does **not** close the [Reader].
*
* @param ion The system to use.
* @param input The input source.
Expand All @@ -66,37 +68,28 @@ object DelimitedValues {
@JvmStatic
fun exprValue(valueFactory: ExprValueFactory,
input: Reader,
delimiter: String,
delimiter: Char,
hasHeader: Boolean,
conversionMode: ConversionMode): ExprValue {
val reader = BufferedReader(input)
val columns: List<String> = when {
hasHeader -> {
val line = reader.readLine()
?: throw IllegalArgumentException("Got EOF for header row")

line.split(delimiter)
}
else -> emptyList()
val csvFormat = when (hasHeader){
true -> CSVFormat.DEFAULT.withDelimiter(delimiter).withFirstRecordAsHeader()
false -> CSVFormat.DEFAULT.withDelimiter(delimiter)
}
val csvParser = CSVParser(reader, csvFormat)
val columns: List<String> = csvParser.headerNames // `columns` is an empty list when `hasHeader` is false

val seq = generateSequence {
val line = reader.readLine()
when (line) {
null -> null
else -> {
valueFactory.newStruct(
line.splitToSequence(delimiter).mapIndexed {i , raw ->
val name = when {
i < columns.size -> columns[i]
else -> syntheticColumnName(i)
}
conversionMode.convert(valueFactory, raw).namedValue(valueFactory.newString(name))
},
StructOrdering.ORDERED
)
}
}
val seq = csvParser.asSequence().map { csvRecord ->
valueFactory.newStruct(
csvRecord.mapIndexed { i, value ->
val name = when {
i < columns.size -> columns[i]
else -> syntheticColumnName(i)
}
conversionMode.convert(valueFactory, value).namedValue(valueFactory.newString(name))
},
StructOrdering.ORDERED
)
}

return valueFactory.newBag(seq)
Expand Down Expand Up @@ -131,34 +124,33 @@ object DelimitedValues {
fun writeTo(ion: IonSystem,
output: Writer,
value: ExprValue,
delimiter: String,
delimiter: Char,
newline: String,
writeHeader: Boolean): Unit {
val nullValue = ion.newNull()
var names: List<String>? = null
for (row in value) {
val colNames = row.orderedNames
?: throw IllegalArgumentException("Delimited data must be ordered tuple: $row")
if (names == null) {
// first row defines column names
names = colNames

if (writeHeader) {
names.joinTo(output, delimiter)
output.write(newline)
writeHeader: Boolean) {
CSVPrinter(output, CSVFormat.DEFAULT.withDelimiter(delimiter).withRecordSeparator(newline)).use { csvPrinter ->
var names: List<String>? = null
for (row in value) {
val colNames = row.orderedNames
?: throw IllegalArgumentException("Delimited data must be ordered tuple: $row")
if (names == null) {
// first row defines column names
names = colNames
if (writeHeader) {
csvPrinter.printRecord(names)
}
} else if (names != colNames) { // We need to check if the column names in other rows are all the same as the first one's.
throw IllegalArgumentException(
"Inconsistent row names: $colNames != $names"
)
}
} else if (names != colNames) {
// mismatch on the tuples
throw IllegalArgumentException(
"Inconsistent row names: $colNames != $names"

csvPrinter.printRecord(
names.map {
val col = row.bindings[BindingName(it, BindingCase.SENSITIVE)]?.ionValue ?: ion.newNull()
col.csvStringValue()
}
)
}

names.map {
val col = row.bindings[BindingName(it, BindingCase.SENSITIVE)]?.ionValue ?: nullValue
col.csvStringValue()
}.joinTo(output, delimiter)
output.write(newline)
}
}
}
35 changes: 8 additions & 27 deletions lang/test/org/partiql/lang/eval/io/DelimitedValuesTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,19 @@ class DelimitedValuesTest : TestBase() {

assertSame(ExprValueType.BAG, value.type)
assertEquals(expectedValues, value.ionValue)
try {
value.iterator()
fail("Expected single pass sequence")
} catch (e: IllegalStateException) {}
}

private fun read(text: String,
delimiter: String,
delimiter: Char,
hasHeader: Boolean,
conversionMode: ConversionMode): ExprValue =
DelimitedValues.exprValue(valueFactory, StringReader(text), delimiter, hasHeader, conversionMode)

private fun voidRead(text: String,
delimiter: String,
hasHeader: Boolean,
conversionMode: ConversionMode): Unit {
read(text, delimiter, hasHeader, conversionMode)
}

private fun assertWrite(expectedText: String,
valueText: String,
names: List<String>,
writeHeader: Boolean,
delimiter: String = ",",
delimiter: Char = ',',
newline: String = "\n") {
val actualText = StringWriter().use {

Expand All @@ -79,7 +68,7 @@ class DelimitedValuesTest : TestBase() {

private fun voidWrite(exprValue: ExprValue,
writeHeader: Boolean,
delimiter: String = ",",
delimiter: Char = ',',
newline: String = "\n") {
DelimitedValues.writeTo(ion, StringWriter(), exprValue, delimiter, newline, writeHeader)
}
Expand All @@ -89,7 +78,7 @@ class DelimitedValuesTest : TestBase() {
"""[]""",
read(
"",
delimiter = ",",
delimiter = ',',
hasHeader = false,
conversionMode = NONE
)
Expand All @@ -100,26 +89,18 @@ class DelimitedValuesTest : TestBase() {
"""[]""",
read(
"",
delimiter = ",\t",
delimiter = ',',
hasHeader = false,
conversionMode = AUTO
)
)

@Test(expected = IllegalArgumentException::class)
fun emptyExprValueTabAutoHeader() = voidRead(
"",
delimiter = ",\t",
hasHeader = true,
conversionMode = AUTO
)

@Test
fun singleExprValueCommaNoAutoNoHeader() = assertValues(
"""[{_1: "1", _2: "2", _3: "3"}]""",
read(
"""1,2,3""",
delimiter = ",",
delimiter = ',',
hasHeader = false,
conversionMode = NONE
)
Expand All @@ -138,7 +119,7 @@ class DelimitedValuesTest : TestBase() {
|1.0,2e0,2007-10-10T12:00:00Z
|hello,{,}
""".trimMargin(),
delimiter = ",",
delimiter = ',',
hasHeader = false,
conversionMode = AUTO
)
Expand All @@ -158,7 +139,7 @@ class DelimitedValuesTest : TestBase() {
|1.0,2e0,2007-10-10T12:00:00Z
|hello,{,}
""".trimMargin(),
delimiter = ",",
delimiter = ',',
hasHeader = true,
conversionMode = AUTO
)
Expand Down

0 comments on commit 36e20d1

Please sign in to comment.