Skip to content

Commit

Permalink
Add support for reading parquet file thanks to arrow-dataset #576
Browse files Browse the repository at this point in the history
  • Loading branch information
fb64 committed Apr 24, 2024
1 parent 7de6022 commit 8b8f706
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 7 deletions.
1 change: 1 addition & 0 deletions dataframe-arrow/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
implementation(libs.arrow.vector)
implementation(libs.arrow.format)
implementation(libs.arrow.memory)
implementation(libs.arrow.dataset)
implementation(libs.commonsCompress)
implementation(libs.kotlin.reflect)
implementation(libs.kotlin.datetimeJvm)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.jetbrains.kotlinx.dataframe.io

import org.apache.arrow.dataset.file.FileFormat
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.ipc.ArrowReader
import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
Expand Down Expand Up @@ -186,3 +187,11 @@ public fun DataFrame.Companion.readArrow(
public fun ArrowReader.toDataFrame(
nullability: NullabilityOptions = NullabilityOptions.Infer
): AnyFrame = DataFrame.Companion.readArrowImpl(this, nullability)

/**
* Read [Parquet](https://parquet.apache.org/) data from existing [url] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html)
*/
public fun DataFrame.Companion.readParquet(
url: URL,
nullability: NullabilityOptions = NullabilityOptions.Infer
): AnyFrame = readArrowDataset(url.toString(), FileFormat.PARQUET, nullability)
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package org.jetbrains.kotlinx.dataframe.io

import org.apache.arrow.dataset.file.FileFormat
import org.apache.arrow.dataset.file.FileSystemDatasetFactory
import org.apache.arrow.dataset.jni.DirectReservationListener
import org.apache.arrow.dataset.jni.NativeMemoryPool
import org.apache.arrow.dataset.scanner.ScanOptions
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.BigIntVector
import org.apache.arrow.vector.BitVector
Expand Down Expand Up @@ -330,3 +335,27 @@ internal fun DataFrame.Companion.readArrowImpl(
return flattened.concatKeepingSchema()
}
}

internal fun DataFrame.Companion.readArrowDataset(
fileUri: String,
fileFormat: FileFormat,
nullability: NullabilityOptions = NullabilityOptions.Infer,
): AnyFrame {
val scanOptions = ScanOptions(32768)
RootAllocator().use { allocator ->
FileSystemDatasetFactory(
allocator,
NativeMemoryPool.createListenable(DirectReservationListener.instance()),
fileFormat,
fileUri
).use { datasetFactory ->
datasetFactory.finish().use { dataset ->
dataset.newScan(scanOptions).use { scanner ->
scanner.scanBatches().use { reader ->
return readArrow(reader, nullability)
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import org.jetbrains.kotlinx.dataframe.api.columnOf
import org.jetbrains.kotlinx.dataframe.api.convertToBoolean
import org.jetbrains.kotlinx.dataframe.api.copy
import org.jetbrains.kotlinx.dataframe.api.dataFrameOf
import org.jetbrains.kotlinx.dataframe.api.describe
import org.jetbrains.kotlinx.dataframe.api.map
import org.jetbrains.kotlinx.dataframe.api.pathOf
import org.jetbrains.kotlinx.dataframe.api.remove
Expand Down Expand Up @@ -613,4 +612,17 @@ internal class ArrowKtTest {
DataFrame.readArrow(dbArrowReader) shouldBe expected
}
}

@Test
fun testReadParquet(){
val path = testResource("test.arrow.parquet").path
val dataFrame = DataFrame.readParquet(URL("file:$path"))
dataFrame.rowsCount() shouldBe 300
assertEstimations(
exampleFrame = dataFrame,
expectedNullable = false,
hasNulls = false,
fromParquet = true
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ import kotlin.reflect.typeOf
* Assert that we have got the same data that was originally saved on example creation.
* Example generation project is currently located at https://github.com/Kopilov/arrow_example
*/
internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean, hasNulls: Boolean) {
internal fun assertEstimations(
exampleFrame: AnyFrame,
expectedNullable: Boolean,
hasNulls: Boolean,
fromParquet: Boolean = false
) {
/**
* In [exampleFrame] we get two concatenated batches. To assert the estimations, we should transform frame row number to batch row number
*/
Expand Down Expand Up @@ -129,10 +134,19 @@ internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean
assertValueOrNull(iBatch(i), element, LocalDate.ofEpochDay(iBatch(i).toLong() * 30))
}

val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
datetimeCol.forEachIndexed { i, element ->
assertValueOrNull(iBatch(i), element, LocalDateTime.ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC))
if (fromParquet){
//parquet format have only one type of date: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date without time
val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDate?>
datetimeCol.type() shouldBe typeOf<LocalDate>().withNullability(expectedNullable)
datetimeCol.forEachIndexed { i, element ->
assertValueOrNull(iBatch(i), element, LocalDate.ofEpochDay(iBatch(i).toLong() * 30))
}
}else {
val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
datetimeCol.forEachIndexed { i, element ->
assertValueOrNull(iBatch(i), element, LocalDateTime.ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC))
}
}

val timeSecCol = exampleFrame["time32_seconds"] as DataColumn<LocalTime?>
Expand Down
Binary file not shown.
3 changes: 2 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ junit-platform = "1.10.2"
kotestAsserions = "5.5.4"

jsoup = "1.17.2"
arrow = "15.0.0"
arrow = "16.0.0"
docProcessor = "0.3.5"
simpleGit = "2.0.3"
dependencyVersions = "0.51.0"
Expand Down Expand Up @@ -98,6 +98,7 @@ jsoup = { group = "org.jsoup", name = "jsoup", version.ref = "jsoup" }
arrow-format = { group = "org.apache.arrow", name = "arrow-format", version.ref = "arrow" }
arrow-vector = { group = "org.apache.arrow", name = "arrow-vector", version.ref = "arrow" }
arrow-memory = { group = "org.apache.arrow", name = "arrow-memory-unsafe", version.ref = "arrow" }
arrow-dataset = { group = "org.apache.arrow", name = "arrow-dataset", version.ref = "arrow" }
arrow-c-data = { group = "org.apache.arrow", name = "arrow-c-data", version.ref = "arrow" }


Expand Down

0 comments on commit 8b8f706

Please sign in to comment.