Skip to content

Commit

Permalink
Parquet benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
paualarco committed Oct 3, 2020
1 parent 160db59 commit e9074f7
Show file tree
Hide file tree
Showing 14 changed files with 431 additions and 8 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,7 @@ monix-s3/it/test-data/
/website/node_modules/
/website/static/api/
/website/variables.js
/website/yarn.lock
/website/yarn.lock

#benchmarks
benchmarks/results/*
61 changes: 61 additions & 0 deletions benchmarks/parquet-reader.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# JMH version: 1.21
# VM version: JDK 1.8.0_221, Java HotSpot(TM) 64-Bit Server VM, 25.221-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/bin/java
# VM options: <none>
# Warmup: 1 iterations, 10 s each
# Measurement: 3 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 3 threads, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: monix.connect.benchmarks.parquet.ParquetReaderBenchmark.fromTask

# Run progress: 0.00% complete, ETA 00:01:20
# Fork: 1 of 1
# Warmup Iteration 1: 5206.126 ops/s
Iteration 1: 7471.975 ops/s
Iteration 2: 7391.813 ops/s
Iteration 3: 7379.877 ops/s


Result "monix.connect.benchmarks.parquet.ParquetReaderBenchmark.fromTask":
7414.555 ±(99.9%) 913.722 ops/s [Average]
(min, avg, max) = (7379.877, 7414.555, 7471.975), stdev = 50.084
CI (99.9%): [6500.833, 8328.277] (assumes normal distribution)


# JMH version: 1.21
# VM version: JDK 1.8.0_221, Java HotSpot(TM) 64-Bit Server VM, 25.221-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/bin/java
# VM options: <none>
# Warmup: 1 iterations, 10 s each
# Measurement: 3 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 3 threads, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: monix.connect.benchmarks.parquet.ParquetReaderBenchmark.unsafe

# Run progress: 50.00% complete, ETA 00:00:42
# Fork: 1 of 1
# Warmup Iteration 1: 4819.810 ops/s
Iteration 1: 7115.925 ops/s
Iteration 2: 7301.895 ops/s
Iteration 3: 7407.521 ops/s


Result "monix.connect.benchmarks.parquet.ParquetReaderBenchmark.unsafe":
7275.114 ±(99.9%) 2693.346 ops/s [Average]
(min, avg, max) = (7115.925, 7275.114, 7407.521), stdev = 147.631
CI (99.9%): [4581.767, 9968.460] (assumes normal distribution)


# Run complete. Total time: 00:01:24

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark Mode Cnt Score Error Units
ParquetReaderBenchmark.fromTask thrpt 3 7414.555 ± 913.722 ops/s
ParquetReaderBenchmark.unsafe thrpt 3 7275.114 ± 2693.346 ops/s
90 changes: 90 additions & 0 deletions benchmarks/parquet-writer.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# JMH version: 1.21
# VM version: JDK 1.8.0_221, Java HotSpot(TM) 64-Bit Server VM, 25.221-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/bin/java
# VM options: <none>
# Warmup: 1 iterations, 10 s each
# Measurement: 4 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 5 threads, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: monix.connect.benchmarks.parquet.ParquetWriterBenchmark.fromCoeval

# Run progress: 0.00% complete, ETA 00:02:30
# Fork: 1 of 1
# Warmup Iteration 1: 241.379 ops/s
Iteration 1: 236.589 ops/s
Iteration 2: 299.726 ops/s
Iteration 3: 331.900 ops/s
Iteration 4: 315.734 ops/s


Result "monix.connect.benchmarks.parquet.ParquetWriterBenchmark.fromCoeval":
295.987 ±(99.9%) 269.596 ops/s [Average]
(min, avg, max) = (236.589, 295.987, 331.900), stdev = 41.720
CI (99.9%): [26.391, 565.583] (assumes normal distribution)


# JMH version: 1.21
# VM version: JDK 1.8.0_221, Java HotSpot(TM) 64-Bit Server VM, 25.221-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/bin/java
# VM options: <none>
# Warmup: 1 iterations, 10 s each
# Measurement: 4 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 5 threads, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: monix.connect.benchmarks.parquet.ParquetWriterBenchmark.fromTask

# Run progress: 33.33% complete, ETA 00:01:44
# Fork: 1 of 1
# Warmup Iteration 1: 293.745 ops/s
Iteration 1: 310.274 ops/s
Iteration 2: 234.159 ops/s
Iteration 3: 212.028 ops/s
Iteration 4: 187.592 ops/s


Result "monix.connect.benchmarks.parquet.ParquetWriterBenchmark.fromTask":
236.013 ±(99.9%) 342.709 ops/s [Average]
(min, avg, max) = (187.592, 236.013, 310.274), stdev = 53.035
CI (99.9%): [≈ 0, 578.722] (assumes normal distribution)


# JMH version: 1.21
# VM version: JDK 1.8.0_221, Java HotSpot(TM) 64-Bit Server VM, 25.221-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/bin/java
# VM options: <none>
# Warmup: 1 iterations, 10 s each
# Measurement: 4 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 5 threads, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: monix.connect.benchmarks.parquet.ParquetWriterBenchmark.unsafe

# Run progress: 66.67% complete, ETA 00:00:51
# Fork: 1 of 1
# Warmup Iteration 1: 316.135 ops/s
Iteration 1: 370.049 ops/s
Iteration 2: 417.218 ops/s
Iteration 3: 412.408 ops/s
Iteration 4: 413.839 ops/s


Result "monix.connect.benchmarks.parquet.ParquetWriterBenchmark.unsafe":
403.379 ±(99.9%) 144.174 ops/s [Average]
(min, avg, max) = (370.049, 403.379, 417.218), stdev = 22.311
CI (99.9%): [259.205, 547.553] (assumes normal distribution)


# Run complete. Total time: 00:02:36

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark Mode Cnt Score Error Units
ParquetWriterBenchmark.fromCoeval thrpt 4 295.987 ± 269.596 ops/s
ParquetWriterBenchmark.fromTask thrpt 4 236.013 ± 342.709 ops/s
ParquetWriterBenchmark.unsafe thrpt 4 403.379 ± 144.174 ops/s
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2020-2020 by The Monix Connect Project Developers.
* See the project homepage at: https://connect.monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.connect.benchmarks.parquet

import java.util.UUID

import monix.eval.Coeval

trait ParquetBenchFixture {

import org.apache.hadoop.conf.Configuration
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroParquetWriter, AvroReadSupport}
import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter}
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.scalacheck.Gen

case class Person(id: Int, name: String)
val schema: Schema = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"Person\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}")

val folder: String = "./results/parquet"
val genNonEmptyString = Gen.nonEmptyListOf(Gen.alphaChar).map(_.mkString)
val genFilePath = Coeval(folder + "/" + UUID.randomUUID() + ".parquet")

val conf = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)

val genPerson: Gen[Person] = for {
id <- Gen.choose(1, 10000)
name <- Gen.alphaLowerStr
} yield { Person(id, name) }

val genPersons: Int => Gen[List[Person]] = n => Gen.listOfN(n, genPerson)

def parquetWriter(file: String, conf: Configuration, schema: Schema): Coeval[ParquetWriter[GenericRecord]] =
Coeval(AvroParquetWriter.builder[GenericRecord](new Path(file)).withConf(conf).withSchema(schema).build())

def avroParquetReader[T <: GenericRecord](file: String, conf: Configuration): Coeval[ParquetReader[T]] =
Coeval(AvroParquetReader.builder[T](HadoopInputFile.fromPath(new Path(file), conf)).withConf(conf).build())

def personToRecord(doc: Person): GenericRecord =
new GenericRecordBuilder(schema)
.set("id", doc.id)
.set("name", doc.name)
.build()

def recordToPerson(record: GenericRecord): Person =
Person(
record.get("id").asInstanceOf[Int], // unsafe only used for documentation purposes
record.get("name").toString
)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2020-2020 by The Monix Connect Project Developers.
* See the project homepage at: https://connect.monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.connect.benchmarks.parquet

import monix.connect.parquet.{ParquetSink, ParquetSource}
import monix.eval.{Coeval, Task}
import monix.execution.Scheduler
import monix.reactive.Observable
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.hadoop.ParquetWriter
import org.openjdk.jmh.annotations.{BenchmarkMode, Fork, Measurement, Mode, Scope, State, Threads, Warmup, _}

import scala.concurrent.Await
import scala.concurrent.duration.Duration

@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@Measurement(iterations = 3)
@Warmup(iterations = 1)
@Fork(1)
@Threads(3)
class ParquetReaderBenchmark extends ParquetBenchFixture {

var size: Int = 10
val s = Scheduler.io("parquet-writer-benchmark")

val file: String = genFilePath.value()
val records: List[GenericRecord] = genPersons(size).sample.get.map(personToRecord)
val writer: ParquetWriter[GenericRecord] = parquetWriter(file, conf, schema).value()
val fw = Observable
.fromIterable(records)
.consumeWith(ParquetSink.fromWriterUnsafe(writer))
.runToFuture(s)
Await.result(fw, Duration.Inf)

@Benchmark
def unsafe(): Unit = {
val f = ParquetSource.fromReaderUnsafe(avroParquetReader(file, conf).value()).lastL.runToFuture(s)
Await.result(f, Duration.Inf)
}

@Benchmark
def fromTask(): Unit = {
val f = ParquetSource.fromReader(Task(avroParquetReader(file, conf).value())).lastL.runToFuture(s)
Await.result(f, Duration.Inf)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2020-2020 by The Monix Connect Project Developers.
* See the project homepage at: https://connect.monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.connect.benchmarks.parquet

import monix.connect.parquet.{Parquet, ParquetSink}
import monix.eval.{Coeval, Task}
import monix.execution.Scheduler
import monix.reactive.Observable
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.hadoop.ParquetWriter
import org.openjdk.jmh.annotations.{
BenchmarkMode,
Fork,
Measurement,
Mode,
OutputTimeUnit,
Scope,
State,
Threads,
Warmup,
_
}

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@Measurement(iterations = 4)
@Warmup(iterations = 1)
@Fork(1)
@Threads(5)
class ParquetWriterBenchmark extends ParquetBenchFixture {

var size: Int = 10
val s = Scheduler.io("parquet-writer-benchmark")

@Benchmark
def unsafe(): Unit = {
val file: String = genFilePath.value()
val records: List[GenericRecord] = genPersons(size).sample.get.map(personToRecord)
val writer: ParquetWriter[GenericRecord] = parquetWriter(file, conf, schema).value()
val f = Observable
.fromIterable(records)
.consumeWith(ParquetSink.fromWriterUnsafe(writer))
.runToFuture(s)
Await.result(f, Duration.Inf)
}

@Benchmark
def fromCoeval(): Unit = {
val file: String = genFilePath.value()
val records: List[GenericRecord] = genPersons(size).sample.get.map(personToRecord)
val writer: ParquetWriter[GenericRecord] = parquetWriter(file, conf, schema).value()
val f = Observable
.fromIterable(records)
.consumeWith(ParquetSink.fromWriter(Coeval(writer)))
.runToFuture(s)
Await.result(f, Duration.Inf)
}

@Benchmark
def fromTask(): Unit = {
val file: String = genFilePath.value()
val records: List[GenericRecord] = genPersons(size).sample.get.map(personToRecord)
val writer: ParquetWriter[GenericRecord] = parquetWriter(file, conf, schema).value()
val f = Observable
.fromIterable(records)
.consumeWith(ParquetSink.fromWriter(Task(writer)))
.runToFuture(s)
Await.result(f, Duration.Inf)
}

}
Loading

0 comments on commit e9074f7

Please sign in to comment.