Safe parquet writer coeval
paualarco committed Oct 3, 2020
1 parent fd74685 commit 160db59
Showing 9 changed files with 297 additions and 17 deletions.
17 changes: 15 additions & 2 deletions parquet/src/main/scala/monix/connect/parquet/ParquetSink.scala
@@ -1,6 +1,6 @@
package monix.connect.parquet

import monix.eval.Task
import monix.eval.{Coeval, Task}
import monix.execution.annotations.UnsafeBecauseImpure
import monix.reactive.Consumer
import org.apache.parquet.hadoop.ParquetWriter
@@ -32,6 +32,19 @@ object ParquetSink {
* that represents the number of elements written.
*/
def fromWriter[T](writer: Task[ParquetWriter[T]]): Consumer[T, Long] =
new ParquetSubscriberT[T](writer)
new ParquetSubscriberEval[T](writer)

/**
* Writes records to a Parquet file.
*
* @param writer The Apache Hadoop generic implementation of a parquet writer.
* See the following known implementations of [[ParquetWriter]] for Avro and Protobuf respectively:
* [[org.apache.parquet.avro.AvroParquetWriter]], [[org.apache.parquet.proto.ProtoParquetWriter]].
* @tparam T Represents the type of the elements that will be written into the parquet file.
* @return A [[Consumer]] that expects records of type [[T]] to be passed and materializes to [[Long]]
* that represents the number of elements written.
*/
def fromWriter[T](writer: Coeval[ParquetWriter[T]]): Consumer[T, Long] =
new ParquetSubscriberCoeval[T](writer)

}
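
For reference (not part of the diff), a minimal usage sketch of the new Coeval-based overload; the schema, records and output path below are hypothetical, and the writer is built with the standard AvroParquetWriter builder:

import monix.connect.parquet.ParquetSink
import monix.eval.{Coeval, Task}
import monix.reactive.Observable
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

// hypothetical schema and data, just to make the sketch self-contained
val schema: Schema = SchemaBuilder.record("person").fields().requiredInt("id").requiredString("name").endRecord()
val records: List[GenericRecord] = List(new GenericRecordBuilder(schema).set("id", 1).set("name", "Alice").build())

// the writer is only described here; it gets built once the Consumer starts consuming the stream
val coevalWriter: Coeval[ParquetWriter[GenericRecord]] =
  Coeval {
    AvroParquetWriter
      .builder[GenericRecord](new Path("/tmp/people.parquet"))
      .withConf(new Configuration())
      .withSchema(schema)
      .build()
  }

val written: Task[Long] =
  Observable.fromIterable(records).consumeWith(ParquetSink.fromWriter(coevalWriter))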
@@ -19,7 +19,7 @@ package monix.connect.parquet

import monix.execution.{Ack, Callback, Scheduler}
import monix.execution.cancelables.AssignableCancelable
import monix.execution.internal.Platform
import monix.execution.internal.{InternalApi, Platform}
import monix.reactive.Consumer
import monix.reactive.observers.Subscriber
import org.apache.parquet.hadoop.ParquetWriter
@@ -32,6 +32,7 @@ import scala.util.control.NonFatal
* @param parquetWriter The apache hadoop generic implementation of a parquet writer.
* @tparam T Represents the type of the elements that will be written into the parquet file.
*/
@InternalApi
private[parquet] class ParquetSubscriber[T](parquetWriter: ParquetWriter[T]) extends Consumer[T, Long] {

def createSubscriber(callback: Callback[Throwable, Long], s: Scheduler): (Subscriber[T], AssignableCancelable) = {
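
By contrast, the impure subscriber above appears to be the one behind ParquetSink.fromWriterUnsafe (exercised in ParquetSinkSpec further down); a minimal sketch, where writer and records are hypothetical values supplied by the caller:

import monix.connect.parquet.ParquetSink
import monix.eval.Task
import monix.reactive.Observable
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.hadoop.ParquetWriter

// the writer is created eagerly by the caller, so closing it and handling creation failures is the caller's concern
def writeAll(writer: ParquetWriter[GenericRecord], records: List[GenericRecord]): Task[Long] =
  Observable.fromIterable(records).consumeWith(ParquetSink.fromWriterUnsafe(writer))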
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2020-2020 by The Monix Connect Project Developers.
* See the project homepage at: https://connect.monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.connect.parquet

import monix.eval.Coeval
import monix.execution.cancelables.AssignableCancelable
import monix.execution.internal.{InternalApi, Platform}
import monix.execution.{Ack, Callback, Scheduler}
import monix.reactive.Consumer
import monix.reactive.observers.Subscriber
import org.apache.parquet.hadoop.ParquetWriter

import scala.concurrent.Future
import scala.util.control.NonFatal

/**
* A sink that writes each element of type [[T]] into a single parquet file.
*
* @param parquetWriter The apache hadoop generic implementation of a parquet writer.
* @tparam T Represents the type of the elements that will be written into the parquet file.
*/
@InternalApi
private[parquet] class ParquetSubscriberCoeval[T](parquetWriter: Coeval[ParquetWriter[T]]) extends Consumer[T, Long] {

def createSubscriber(callback: Callback[Throwable, Long], s: Scheduler): (Subscriber[T], AssignableCancelable) = {
val out = new Subscriber[T] {
override implicit val scheduler: Scheduler = s

//the number of elements that have been written, returned as the materialized value
private[this] var nElements: Long = 0
private[this] val memoizedWriter = parquetWriter.memoize

// Protects from the situation where last onNext throws, we call onError and then
// upstream calls onError or onComplete again
private[this] var isDone = false

override def onNext(record: T): Future[Ack] = {
memoizedWriter.map { parquetWriter =>
try {
parquetWriter.write(record)
nElements = nElements + 1
Ack.Continue
} catch {
case ex if NonFatal(ex) =>
onError(ex)
Ack.Stop
}
}.value()
}

override def onComplete(): Unit =
if (!isDone) {
isDone = true
memoizedWriter.map { writer =>
try {
writer.close()
callback.onSuccess(nElements)
} catch {
case NonFatal(ex) =>
callback.onError(ex)
}
}.value()
}

override def onError(ex: Throwable): Unit =
if (!isDone) {
isDone = true
memoizedWriter.map { writer =>
try {
writer.close()
callback.onError(ex)
} catch {
case NonFatal(ex2) =>
callback.onError(Platform.composeErrors(ex, ex2))
}
}.value()
}
}

(out, AssignableCancelable.multi())
}

}
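
A side note on why memoizing the Coeval matters here (the field is named memoizedWriter): without memoize, every forcing would build a fresh writer, so onComplete would close a different writer than the one written to in onNext. A toy sketch with a counter standing in for the writer:

import java.util.concurrent.atomic.AtomicInteger
import monix.eval.Coeval

val builds = new AtomicInteger(0)
val writerCoeval: Coeval[Int] = Coeval(builds.incrementAndGet()).memoize

writerCoeval.value() // first forcing evaluates the Coeval: builds == 1
writerCoeval.value() // later forcings reuse the memoized result: builds is still 1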
@@ -19,7 +19,7 @@ package monix.connect.parquet

import monix.eval.Task
import monix.execution.cancelables.AssignableCancelable
import monix.execution.internal.Platform
import monix.execution.internal.{InternalApi, Platform}
import monix.execution.{Ack, Callback, Scheduler}
import monix.reactive.Consumer
import monix.reactive.observers.Subscriber
@@ -34,15 +34,16 @@ import scala.util.control.NonFatal
* @param parquetWriter The apache hadoop generic implementation of a parquet writer.
* @tparam T Represents the type of the elements that will be written into the parquet file.
*/
private[parquet] class ParquetSubscriberT[T](parquetWriter: Task[ParquetWriter[T]]) extends Consumer[T, Long] {
@InternalApi
private[parquet] class ParquetSubscriberEval[T](parquetWriter: Task[ParquetWriter[T]]) extends Consumer[T, Long] {

def createSubscriber(callback: Callback[Throwable, Long], s: Scheduler): (Subscriber[T], AssignableCancelable) = {
val out = new Subscriber[T] {
override implicit val scheduler: Scheduler = s

//the number of elements that have been written, returned as the materialized value
private[this] var nElements: Long = 0
private[this] var memoizedWriter = parquetWriter.memoize
private[this] val memoizedWriter = parquetWriter.memoize

// Protects from the situation where last onNext throws, we call onError and then
// upstream calls onError or onComplete again
@@ -48,7 +48,7 @@ trait AvroParquetFixture extends ParquetFixture {
def avroParquetReader[T <: GenericRecord](file: String, conf: Configuration): ParquetReader[T] =
AvroParquetReader.builder[T](HadoopInputFile.fromPath(new Path(file), conf)).withConf(conf).build()

def avroDocToRecord(doc: Person): GenericRecord =
def personToRecord(doc: Person): GenericRecord =
new GenericRecordBuilder(schema)
.set("id", doc.id)
.set("name", doc.name)
@@ -41,7 +41,7 @@ class AvroParquetSpec extends AnyWordSpecLike with Matchers with AvroParquetFixt
//given
val n: Int = 2
val file: String = genFilePath()
val records: List[GenericRecord] = genAvroUsers(n).sample.get.map(avroDocToRecord)
val records: List[GenericRecord] = genAvroUsers(n).sample.get.map(personToRecord)
val w: ParquetWriter[GenericRecord] = parquetWriter(file, conf, schema)

//when
@@ -59,7 +59,7 @@
"read from parquet file" in {
//given
val n: Int = 1
val records: List[GenericRecord] = genAvroUsers(n).sample.get.map(avroDocToRecord)
val records: List[GenericRecord] = genAvroUsers(n).sample.get.map(personToRecord)
val file = genFilePath()
Observable
.fromIterable(records)
39 changes: 33 additions & 6 deletions parquet/src/test/scala/monix/connect/parquet/ParquetSinkSpec.scala
@@ -17,36 +17,39 @@

package monix.connect.parquet

import java.io.{File, FileNotFoundException}
import java.io.File

import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.execution.exceptions.DummyException
import monix.execution.schedulers.TestScheduler
import monix.reactive.Observable
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter}
import org.apache.parquet.hadoop.ParquetWriter
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import org.mockito.IdiomaticMockito
import org.mockito.MockitoSugar.when

import scala.concurrent.duration._
import scala.util.Failure

class ParquetSinkSpec extends AnyWordSpecLike with Matchers with AvroParquetFixture with BeforeAndAfterAll {
class ParquetSinkSpec extends AnyWordSpecLike with IdiomaticMockito with Matchers with AvroParquetFixture with BeforeAndAfterAll {

override def afterAll(): Unit = {
import scala.reflect.io.Directory
val directory = new Directory(new File(folder))
directory.deleteRecursively()
}


s"${ParquetSink}" should {

"unsafe write avro records in parquet" in {
//given
val n: Int = 2
val filePath: String = genFilePath()
val records: List[GenericRecord] = genAvroUsers(n).sample.get.map(avroDocToRecord)
val records: List[GenericRecord] = genAvroUsers(n).sample.get.map(personToRecord)
val w: ParquetWriter[GenericRecord] = parquetWriter(filePath, conf, schema)

//when
@@ -90,7 +93,7 @@ class ParquetSinkSpec extends AnyWordSpecLike with Matchers with AvroParquetFixt
val n = 1
val testScheduler = TestScheduler()
val filePath: String = genFilePath()
val records: List[GenericRecord] = genAvroUsers(n).sample.get.map(avroDocToRecord)
val records: List[GenericRecord] = genAvroUsers(n).sample.get.map(personToRecord)

//when
val f =
@@ -108,6 +111,30 @@ class ParquetSinkSpec extends AnyWordSpecLike with Matchers with AvroParquetFixt
file.exists() shouldBe false
}

"signals error when the underlying parquet writer throws an error" in {
//given
val testScheduler = TestScheduler()
val filePath: String = genFilePath()
val record: GenericRecord = personToRecord(genPerson.sample.get)
val ex = DummyException("Boom!")
val parquetWriter = mock[ParquetWriter[GenericRecord]]
when(parquetWriter.write(record)).thenThrow(ex)

//when
val f =
Observable.now(record)
.consumeWith(ParquetSink.fromWriterUnsafe(parquetWriter))
.runToFuture(testScheduler)

//then
testScheduler.tick(1.second)
f.value shouldBe Some(Failure(ex))

//and
val file = new File(filePath)
file.exists() shouldBe false
}

}

}
@@ -45,7 +45,7 @@ class ParquetSourceSpec extends AnyWordSpecLike with Matchers with AvroParquetFi
"unsafe read from parquet reader" in {
//given
val n: Int = 10
val records: List[GenericRecord] = genAvroUsers(n).sample.get.map(avroDocToRecord)
val records: List[GenericRecord] = genAvroUsers(n).sample.get.map(personToRecord)
val file = genFilePath()
Observable
.fromIterable(records)
@@ -64,7 +64,7 @@
"safely read from parquet reader" in {
//given
val n: Int = 10
val records: List[GenericRecord] = genAvroUsers(n).sample.get.map(avroDocToRecord)
val records: List[GenericRecord] = genAvroUsers(n).sample.get.map(personToRecord)
val file = genFilePath()
Observable
.fromIterable(records)
