Commit ebc6531

Added parquet failure test cases
Added failure case test
paualarco committed Sep 28, 2020
1 parent b975613 commit ebc6531
Showing 3 changed files with 157 additions and 62 deletions.
21 changes: 8 additions & 13 deletions parquet/src/main/scala/monix/connect/parquet/Parquet.scala
@@ -34,9 +34,8 @@ object Parquet {
    * @return All the elements of type [[T]] the specified parquet file as [[Observable]]
    */
   @UnsafeBecauseImpure
-  def fromReaderUnsafe[T](reader: ParquetReader[T]): Observable[T] = {
-    new ParquetPublisher[T](Task(reader))
-  }
+  def fromReaderUnsafe[T](reader: ParquetReader[T]): Observable[T] =
+    new ParquetPublisher[T](reader)
 
   /**
    * Writes records to a Parquet file.
@@ -49,9 +48,8 @@
    * that represents the number of elements written.
    */
   @UnsafeBecauseImpure
-  def fromWriterUnsafe[T](writer: ParquetWriter[T]): Consumer[T, Long] = {
+  def fromWriterUnsafe[T](writer: ParquetWriter[T]): Consumer[T, Long] =
     new ParquetSubscriber[T](writer)
-  }
 
   /**
    * Reads the records from a Parquet file.
@@ -62,18 +60,15 @@
    * @tparam T A hinder kinded type that represents element type of the parquet file to be read.
    * @return All the elements of type [[T]] the specified parquet file as [[Observable]]
    */
-  def reader[T](reader: Task[ParquetReader[T]]): Observable[T] = {
-    new ParquetPublisher[T](reader)
-  }
+  def fromReader[T](reader: Task[ParquetReader[T]]): Observable[T] =
+    Observable.resource(reader)(reader => Task(reader.close())).flatMap(fromReaderUnsafe)
 
   @deprecated("Renamed to fromReaderUnsafe", "0.5.0")
-  def reader[T](reader: ParquetReader[T]): Observable[T] = {
-    new ParquetPublisher[T](Task(reader))
-  }
+  def reader[T](reader: ParquetReader[T]): Observable[T] =
+    new ParquetPublisher[T](reader)
 
   @deprecated("Renamed to toWriterUnsafe", "0.5.0")
-  def writer[T](writer: ParquetWriter[T]): Consumer[T, Long] = {
+  def writer[T](writer: ParquetWriter[T]): Consumer[T, Long] =
     new ParquetSubscriber[T](writer)
-  }
 
 }
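The substantive change in this file is that the safe read variant no longer constructs a ParquetPublisher directly: fromReader suspends the reader acquisition in Task and wraps it in Observable.resource, so the underlying ParquetReader is closed whether the stream completes, fails or is cancelled, and a reader that fails on creation surfaces as onError rather than as an exception thrown at subscription time. A rough usage sketch of the two variants follows; it leans on the genFilePath, avroParquetReader and conf helpers that the spec below gets from AvroParquetFixture, so treat it as illustration rather than as part of this commit.

    import monix.eval.Task
    import org.apache.avro.generic.GenericRecord

    val file: String = genFilePath()

    // Unsafe variant: the reader is created eagerly by the caller, who remains
    // responsible for closing it if the stream fails or is cancelled early.
    val unsafe: Task[List[GenericRecord]] =
      Parquet.fromReaderUnsafe(avroParquetReader(file, conf)).toListL

    // Safe variant: acquisition is suspended in Task and Observable.resource
    // closes the reader on completion, error or cancellation.
    val safe: Task[List[GenericRecord]] =
      Parquet.fromReader(Task(avroParquetReader(file, conf))).toListL
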
82 changes: 40 additions & 42 deletions parquet/src/main/scala/monix/connect/parquet/ParquetPublisher.scala
@@ -17,8 +17,6 @@
 
 package monix.connect.parquet
 
-import cats.effect.{Bracket, Resource}
-import monix.eval.Task
 import monix.execution.Ack.{Continue, Stop}
 import monix.execution.cancelables.BooleanCancelable
 import monix.execution.{Ack, Cancelable, ExecutionModel, Scheduler}
@@ -33,56 +31,56 @@ import scala.util.{Failure, Success}
 
 /**
  * The implementation of a reactive parquet publisher.
- * @param parquetReader The apache hadoop generic implementation of a parquet reader.
+ * @param reader The apache hadoop generic implementation of a parquet reader.
  */
-private[parquet] final class ParquetPublisher[T](parquetReader: Task[ParquetReader[T]]) extends Observable[T] {
+private[parquet] final class ParquetPublisher[T](reader: ParquetReader[T]) extends Observable[T] {
 
   def unsafeSubscribeFn(subscriber: Subscriber[T]): Cancelable = {
     val s = subscriber.scheduler
     val cancelable = BooleanCancelable()
-    Resource
-      .make(parquetReader)(reader => Task(reader.close()))
-      .use(r => fastLoop(subscriber, cancelable, s.executionModel, 0, r)(s))
-      .runToFuture(s)
+    fastLoop(subscriber, cancelable, s.executionModel, 0)(s)
     cancelable
   }
 
-  def fastLoop(o: Subscriber[T], c: BooleanCancelable, em: ExecutionModel, syncIndex: Int, reader: ParquetReader[T])(
+  @tailrec
+  def fastLoop(o: Subscriber[T], c: BooleanCancelable, em: ExecutionModel, syncIndex: Int)(
     implicit
-    s: Scheduler): Task[Unit] = {
-    {
-      for {
-        ack <- {
-          Task.fromFuture {
-            val r = reader.read()
-            if (r != null) o.onNext(r)
-            else {
-              o.onComplete()
-              Future(Stop)
-            }
-          }
-        }
-        nextIndex <- {
-          Task {
-            if (ack == Continue) em.nextFrameIndex(syncIndex)
-            else if (ack == Stop) -1
-            else 0
-          }
-        }
-        _ <- {
-          if (nextIndex > 0) fastLoop(o, c, em, nextIndex, reader)
-          else if (nextIndex == 0 && !c.isCanceled) {
-            ack match {
-              case Ack.Continue => fastLoop(o, c, em, 0, reader)
-              case Ack.Stop => Task.unit
-            }
-          } else Task.unit
-        }
-      } yield ()
-    }.failed.map { ex =>
-      o.onError(ex)
-      s.reportFailure(ex)
-    }
-  }
+    s: Scheduler): Unit = {
+
+    val ack =
+      try {
+        val r = reader.read()
+        if (r != null) o.onNext(r)
+        else {
+          o.onComplete()
+          Stop
+        }
+      } catch {
+        case ex if NonFatal(ex) =>
+          Future.failed(ex)
+      }
+
+    val nextIndex =
+      if (ack == Continue) em.nextFrameIndex(syncIndex)
+      else if (ack == Stop) -1
+      else 0
+
+    if (nextIndex > 0)
+      fastLoop(o, c, em, nextIndex)
+    else if (nextIndex == 0 && !c.isCanceled)
+      reschedule(ack, o, c, em)
+  }
+
+  def reschedule(ack: Future[Ack], o: Subscriber[T], c: BooleanCancelable, em: ExecutionModel)(
+    implicit
+    s: Scheduler): Unit =
+    ack.onComplete {
+      case Success(success) =>
+        if (success == Continue) fastLoop(o, c, em, 0)
+      case Failure(ex) =>
+        o.onError(ex)
+        s.reportFailure(ex)
+      case _ =>
+        () // this was a Stop, do nothing
+    }
 }
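With resource handling moved into Parquet.fromReader, the publisher itself is reduced to a back-pressured read loop: each reader.read() result is pushed with onNext, a synchronous Continue keeps the tail-recursive loop running within the budget given by the scheduler's ExecutionModel, and an asynchronous or failed Ack is handed to reschedule, which either resumes the loop or forwards the error to onError and the scheduler. That last path is what the new failure tests rely on when read() throws. Below is a minimal subscriber illustrating the Ack protocol; this is a hypothetical sketch, not code from the commit.

    import monix.execution.{Ack, Scheduler}
    import monix.execution.Ack.Continue
    import monix.reactive.observers.Subscriber

    import scala.concurrent.Future

    // Counts the records it receives; returning Continue synchronously keeps
    // ParquetPublisher.fastLoop inside its tight tail-recursive loop, while an
    // asynchronous Future[Ack] would make it go through reschedule instead.
    def countingSubscriber[T](implicit sch: Scheduler): Subscriber[T] =
      new Subscriber[T] {
        implicit val scheduler: Scheduler = sch
        private var count = 0L
        def onNext(elem: T): Future[Ack] = { count += 1; Continue }
        def onError(ex: Throwable): Unit = ex.printStackTrace()
        def onComplete(): Unit = println(s"read $count records")
      }
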
116 changes: 109 additions & 7 deletions parquet/src/test/scala/monix/connect/parquet/AvroParquetSpec.scala
@@ -17,26 +17,31 @@
 
 package monix.connect.parquet
 
-import java.io.File
+import java.io.{File, FileNotFoundException}
+
 import monix.eval.Task
 import monix.reactive.Observable
 import org.apache.avro.generic.GenericRecord
-import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter}
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpecLike
 import monix.execution.Scheduler.Implicits.global
+import monix.execution.schedulers.TestScheduler
 import org.scalatest.BeforeAndAfterAll
 
+import scala.concurrent.duration._
+import scala.util.Failure
+
 class AvroParquetSpec extends AnyWordSpecLike with Matchers with AvroParquetFixture with BeforeAndAfterAll {
 
   s"${Parquet}" should {
 
     "unsafe write avro records in parquet" in {
       //given
       val n: Int = 2
-      val file: String = genFilePath()
+      val filePath: String = genFilePath()
       val records: List[GenericRecord] = genAvroUsers(n).sample.get.map(avroDocToRecord)
-      val w: ParquetWriter[GenericRecord] = parquetWriter(file, conf, schema)
+      val w: ParquetWriter[GenericRecord] = parquetWriter(filePath, conf, schema)
 
       //when
       Observable
@@ -45,14 +50,61 @@ class AvroParquetSpec extends AnyWordSpecLike with Matchers with AvroParquetFixt
         .runSyncUnsafe()
 
       //then
-      val parquetContent: List[GenericRecord] = fromParquet[GenericRecord](file, conf, avroParquetReader(file, conf))
+      val parquetContent: List[GenericRecord] =
+        fromParquet[GenericRecord](filePath, conf, avroParquetReader(filePath, conf))
       parquetContent.length shouldEqual n
       parquetContent should contain theSameElementsAs records
     }
 
-    "unsafe read from parquet" in {
+    "materialises to 0 when an empty observable is passed" in {
+      //given
+      val n: Int = 1
+      val filePath: String = genFilePath()
+      val writer: ParquetWriter[GenericRecord] = parquetWriter(filePath, conf, schema)
+
+      //when
+      val mat =
+        Observable
+          .empty[GenericRecord]
+          .consumeWith(Parquet.fromWriterUnsafe(writer))
+          .runSyncUnsafe()
+
+      //then
+      mat shouldBe 0
+
+      //and
+      val file = new File(filePath)
+      val parquetContent: List[GenericRecord] =
+        fromParquet[GenericRecord](filePath, conf, avroParquetReader(filePath, conf))
+      file.exists() shouldBe true
+      parquetContent.length shouldEqual 0
+    }
+
+    "signals error when a malformed parquet writer was passed" in {
+      //given
+      val n = 1
+      val testScheduler = TestScheduler()
+      val filePath: String = genFilePath()
+      val records: List[GenericRecord] = genAvroUsers(n).sample.get.map(avroDocToRecord)
+
+      //when
+      val f =
+        Observable
+          .fromIterable(records)
+          .consumeWith(Parquet.fromWriterUnsafe(null))
+          .runToFuture(testScheduler)
+
+      //then
+      testScheduler.tick(1.second)
+      f.value.get shouldBe a[Failure[NullPointerException]]
+
+      //and
+      val file = new File(filePath)
+      file.exists() shouldBe false
+    }
+
+    "unsafe read from parquet reader" in {
       //given
       val n: Int = 10
       val records: List[GenericRecord] = genAvroUsers(n).sample.get.map(avroDocToRecord)
       val file = genFilePath()
       Observable
@@ -67,6 +119,56 @@ class AvroParquetSpec extends AnyWordSpecLike with Matchers with AvroParquetFixt
       result.length shouldEqual n
       result should contain theSameElementsAs records
     }
+
+    "safely read from parquet reader" in {
+      //given
+      val n: Int = 10
+      val records: List[GenericRecord] = genAvroUsers(n).sample.get.map(avroDocToRecord)
+      val file = genFilePath()
+      Observable
+        .fromIterable(records)
+        .consumeWith(Parquet.fromWriterUnsafe(parquetWriter(file, conf, schema)))
+        .runSyncUnsafe()
+
+      //when
+      val safeReader = Task(avroParquetReader(file, conf))
+      val result: List[GenericRecord] = Parquet.fromReader(safeReader).toListL.runSyncUnsafe()
+
+      //then
+      result.length shouldEqual n
+      result should contain theSameElementsAs records
+    }
+
+    "signals failure when attempting to read from non existing file" in {
+      //given
+      val testScheduler = TestScheduler()
+      val file = genFilePath()
+      val safeReader = Task(avroParquetReader(file, conf))
+
+      //when
+      val cancelable = Parquet.fromReader(safeReader).toListL.runToFuture(testScheduler)
+
+      //then
+      testScheduler.tick(1.second)
+      cancelable.value.get shouldBe a[Failure[FileNotFoundException]]
+    }
+
+    "signals failure when reading from a malformed reader" in {
+      //given
+      val testScheduler = TestScheduler()
+      val file = genFilePath()
+
+      //when
+      val malformedReader: Task[ParquetReader[GenericRecord]] = Task(null)
+      val cancelable1 = Parquet.fromReader(malformedReader).toListL.runToFuture(testScheduler)
+      val cancelable2 = Parquet.fromReaderUnsafe(null).toListL.runToFuture(testScheduler)
+
+      //then
+      testScheduler.tick(1.second)
+      cancelable1.value.get shouldBe a[Failure[NullPointerException]]
+      cancelable2.value.get shouldBe a[Failure[NullPointerException]]
+    }
+
   }
 
   override def afterAll(): Unit = {
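All three failure tests above follow the same recipe: run the effect on a TestScheduler, nothing executes until tick is called, then assert that the resulting future holds a Failure. A standalone sketch of that pattern, independent of Parquet and purely illustrative (the explicit executeAsync stands in for the async boundary the real streams already have):

    import monix.eval.Task
    import monix.execution.schedulers.TestScheduler

    import scala.concurrent.duration._
    import scala.util.Failure

    val testScheduler = TestScheduler()

    // The async boundary guarantees nothing runs until the scheduler is ticked.
    val failing = Task(throw new NullPointerException("boom"))
      .executeAsync
      .runToFuture(testScheduler)

    assert(failing.value.isEmpty) // not executed yet: TestScheduler only advances on tick
    testScheduler.tick(1.second)
    assert(failing.value.get.isInstanceOf[Failure[_]])
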
