Split Parquet into ParquetSource and ParquetSink
paualarco committed Sep 29, 2020
1 parent ebc6531 commit 730c3cc
Showing 9 changed files with 421 additions and 166 deletions.
42 changes: 1 addition & 41 deletions parquet/src/main/scala/monix/connect/parquet/Parquet.scala
@@ -17,52 +17,12 @@

package monix.connect.parquet

import monix.eval.Task
import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter}
import monix.execution.annotations.UnsafeBecauseImpure
import monix.reactive.{Consumer, Observable}

@deprecated("Splitted into `ParquetSource` and `ParquetSink`.", "0.5.0")
object Parquet {

/**
* Reads the records from a Parquet file.
*
* @param reader The Apache Hadoop generic implementation of a parquet reader.
* See the following known implementations of [[ParquetReader]] for Avro and Protobuf respectively:
* [[org.apache.parquet.avro.AvroParquetReader]], [[org.apache.parquet.proto.ProtoParquetReader]].
* @tparam T The type of the elements of the parquet file to be read.
* @return All the elements of type [[T]] in the specified parquet file, as an [[Observable]].
*/
@UnsafeBecauseImpure
def fromReaderUnsafe[T](reader: ParquetReader[T]): Observable[T] =
new ParquetPublisher[T](reader)

/**
* Writes records to a Parquet file.
*
* @param writer The Apache Hadoop generic implementation of a parquet writer.
* See the following known implementations of [[ParquetWriter]] for Avro and Protobuf respectively:
* [[org.apache.parquet.avro.AvroParquetWriter]], [[org.apache.parquet.proto.ProtoParquetWriter]].
* @tparam T The type of the elements of the parquet file to be written.
* @return A [[Consumer]] that expects records of type [[T]] and materializes to a [[Long]]
* representing the number of elements written.
*/
@UnsafeBecauseImpure
def fromWriterUnsafe[T](writer: ParquetWriter[T]): Consumer[T, Long] =
new ParquetSubscriber[T](writer)

/**
* Reads the records from a Parquet file.
*
* @param reader The Apache Hadoop generic implementation of a parquet reader.
* See the following known implementations of [[ParquetReader]] for Avro and Protobuf respectively:
* [[org.apache.parquet.avro.AvroParquetReader]], [[org.apache.parquet.proto.ProtoParquetReader]].
* @tparam T The type of the elements of the parquet file to be read.
* @return All the elements of type [[T]] in the specified parquet file, as an [[Observable]].
*/
def fromReader[T](reader: Task[ParquetReader[T]]): Observable[T] =
Observable.resource(reader)(reader => Task(reader.close())).flatMap(fromReaderUnsafe)

@deprecated("Renamed to fromReaderUnsafe", "0.5.0")
def reader[T](reader: ParquetReader[T]): Observable[T] =
new ParquetPublisher[T](reader)
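For code still calling the deprecated `Parquet` object, a minimal migration sketch (illustrative, not part of this commit; the helper names `source` and `sink` are placeholders):

import monix.connect.parquet.{ParquetSink, ParquetSource}
import monix.reactive.{Consumer, Observable}
import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter}

// Before (deprecated in 0.5.0): Parquet.fromReaderUnsafe(reader) / Parquet.fromWriterUnsafe(writer)
// After: the same impure variants now live on the new objects.
def source[T](reader: ParquetReader[T]): Observable[T] = ParquetSource.fromReaderUnsafe(reader)
def sink[T](writer: ParquetWriter[T]): Consumer[T, Long] = ParquetSink.fromWriterUnsafe(writer)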
37 changes: 37 additions & 0 deletions parquet/src/main/scala/monix/connect/parquet/ParquetSink.scala
@@ -0,0 +1,37 @@
package monix.connect.parquet

import monix.eval.Task
import monix.execution.annotations.UnsafeBecauseImpure
import monix.reactive.Consumer
import org.apache.parquet.hadoop.ParquetWriter

object ParquetSink {

/**
* Writes records to a Parquet file.
*
* @param writer The Apache Hadoop generic implementation of a parquet writer.
* See the following known implementations of [[ParquetWriter]] for Avro and Protobuf respectively:
* [[org.apache.parquet.avro.AvroParquetWriter]], [[org.apache.parquet.proto.ProtoParquetWriter]].
* @tparam T The type of the elements of the parquet file to be written.
* @return A [[Consumer]] that expects records of type [[T]] and materializes to a [[Long]]
* representing the number of elements written.
*/
@UnsafeBecauseImpure
def fromWriterUnsafe[T](writer: ParquetWriter[T]): Consumer[T, Long] =
new ParquetSubscriber[T](writer)

/**
* Writes records to a Parquet file.
*
* @param writer The Apache Hadoop generic implementation of a parquet writer.
* See the following known implementations of [[ParquetWriter]] for Avro and Protobuf respectively:
* [[org.apache.parquet.avro.AvroParquetWriter]], [[org.apache.parquet.proto.ProtoParquetWriter]].
* @tparam T The type of the elements of the parquet file to be written.
* @return A [[Consumer]] that expects records of type [[T]] and materializes to a [[Long]]
* representing the number of elements written.
*/
def fromWriter[T](writer: Task[ParquetWriter[T]]): Consumer[T, Long] =
new ParquetSubscriberT[T](writer)

}
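A hedged usage sketch for the new sink (not part of this commit), assuming Avro `GenericRecord`s; the schema, the records and the target path below are illustrative only:

import monix.connect.parquet.ParquetSink
import monix.eval.Task
import monix.reactive.Observable
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

val schema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"Person","fields":[{"name":"id","type":"int"}]}""")

val records: List[GenericRecord] =
  (1 to 3).map(i => new GenericRecordBuilder(schema).set("id", i).build()).toList

// The writer is only built when the Task runs, so the sink controls its lifecycle.
val writer: Task[ParquetWriter[GenericRecord]] = Task(
  AvroParquetWriter.builder[GenericRecord](new Path("/tmp/persons.parquet"))
    .withConf(new Configuration())
    .withSchema(schema)
    .build())

// Materializes to the number of records written (3 here).
val written: Task[Long] =
  Observable.fromIterable(records).consumeWith(ParquetSink.fromWriter(writer))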
34 changes: 34 additions & 0 deletions parquet/src/main/scala/monix/connect/parquet/ParquetSource.scala
@@ -0,0 +1,34 @@
package monix.connect.parquet

import monix.eval.Task
import monix.execution.annotations.UnsafeBecauseImpure
import monix.reactive.Observable
import org.apache.parquet.hadoop.ParquetReader

object ParquetSource {

/**
* Reads the records from a Parquet file.
*
* @param reader The Apache Hadoop generic implementation of a parquet reader.
* See the following known implementations of [[ParquetReader]] for Avro and Protobuf respectively:
* [[org.apache.parquet.avro.AvroParquetReader]], [[org.apache.parquet.proto.ProtoParquetReader]].
* @tparam T The type of the elements of the parquet file to be read.
* @return All the elements of type [[T]] in the specified parquet file, as an [[Observable]].
*/
@UnsafeBecauseImpure
def fromReaderUnsafe[T](reader: ParquetReader[T]): Observable[T] =
new ParquetPublisher[T](reader)

/**
* Reads the records from a Parquet file.
*
* @param reader The Apache Hadoop generic implementation of a parquet reader.
* See the following known implementations of [[ParquetReader]] for Avro and Protobuf respectively:
* [[org.apache.parquet.avro.AvroParquetReader]], [[org.apache.parquet.proto.ProtoParquetReader]].
* @tparam T The type of the elements of the parquet file to be read.
* @return All the elements of type [[T]] in the specified parquet file, as an [[Observable]].
*/
def fromReader[T](reader: Task[ParquetReader[T]]): Observable[T] =
Observable.resource(reader)(reader => Task(reader.close())).flatMap(fromReaderUnsafe)
}
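And a matching sketch for the new source (again illustrative), reading back the file assumed to be written in the sink example above:

import monix.connect.parquet.ParquetSource
import monix.eval.Task
import monix.reactive.Observable
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.ParquetReader

// The reader is wrapped in a Task so that Observable.resource can close it when the stream ends.
val reader: Task[ParquetReader[GenericRecord]] = Task(
  AvroParquetReader.builder[GenericRecord](new Path("/tmp/persons.parquet"))
    .withConf(new Configuration())
    .build())

val persons: Observable[GenericRecord] = ParquetSource.fromReader(reader)
val ids: Task[List[Int]] = persons.map(_.get("id").asInstanceOf[Int]).toListL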
@@ -28,10 +28,11 @@ import scala.util.control.NonFatal

/**
* A parquet writer implemented as a [[Subscriber.Sync]].
*
* @param parquetWriter The Apache Hadoop generic implementation of a parquet writer.
* @tparam T Represents the type of the elements that will be written into the parquet file.
*/
class ParquetSubscriber[T](parquetWriter: ParquetWriter[T]) extends Consumer[T, Long] {
private[parquet] class ParquetSubscriber[T](parquetWriter: ParquetWriter[T]) extends Consumer[T, Long] {

def createSubscriber(callback: Callback[Throwable, Long], s: Scheduler): (Subscriber[T], AssignableCancelable) = {
val out = new Subscriber[T] {
@@ -48,7 +49,7 @@ class ParquetSubscriber[T](parquetWriter: ParquetWriter[T]) extends Consumer[T,
try {
parquetWriter.write(record)
nElements = nElements + 1
monix.execution.Ack.Continue
Ack.Continue
} catch {
case ex if NonFatal(ex) =>
onError(ex)
@@ -84,4 +85,4 @@ class ParquetSubscriber[T](parquetWriter: ParquetWriter[T]) extends Consumer[T,
(out, AssignableCancelable.multi())
}

}
}
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2020-2020 by The Monix Connect Project Developers.
* See the project homepage at: https://connect.monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.connect.parquet

import monix.eval.Task
import monix.execution.cancelables.AssignableCancelable
import monix.execution.internal.Platform
import monix.execution.{Ack, Callback, Scheduler}
import monix.reactive.Consumer
import monix.reactive.observers.Subscriber
import org.apache.parquet.hadoop.ParquetWriter

import scala.concurrent.Future
import scala.util.control.NonFatal

/**
* A sink that writes each element of type [[T]] it receives into a single parquet file.
*
* @param parquetWriter The Apache Hadoop generic implementation of a parquet writer.
* @tparam T Represents the type of the elements that will be written into the parquet file.
*/
private[parquet] class ParquetSubscriberT[T](parquetWriter: Task[ParquetWriter[T]]) extends Consumer[T, Long] {

def createSubscriber(callback: Callback[Throwable, Long], s: Scheduler): (Subscriber[T], AssignableCancelable) = {
val out = new Subscriber[T] {
override implicit val scheduler: Scheduler = s

// the number of records written so far, returned as the materialized value
private[this] var nElements: Long = 0
private[this] val memoizedWriter = parquetWriter.memoize

// Protects from the situation where last onNext throws, we call onError and then
// upstream calls onError or onComplete again
private[this] var isDone = false

override def onNext(record: T): Future[Ack] = {
memoizedWriter.map { parquetWriter =>
try {
parquetWriter.write(record)
nElements = nElements + 1
Ack.Continue
} catch {
case ex if NonFatal(ex) =>
onError(ex)
Ack.Stop
}
}.runToFuture(scheduler)
}

override def onComplete(): Unit =
if (!isDone) {
isDone = true
memoizedWriter.map { writer =>
try {
writer.close()
callback.onSuccess(nElements)
} catch {
case NonFatal(ex) =>
callback.onError(ex)
}
}.runToFuture(scheduler)
}

override def onError(ex: Throwable): Unit =
if (!isDone) {
isDone = true
memoizedWriter.map { writer =>
try {
writer.close()
callback.onError(ex)
} catch {
case NonFatal(ex2) =>
callback.onError(Platform.composeErrors(ex, ex2))
}
}.runToFuture(scheduler)
}
}

(out, AssignableCancelable.multi())
}

}
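The subscriber relies on `Task.memoize` so that the writer is built once and then shared by `onNext`, `onComplete` and `onError`. A toy sketch of that behaviour (not from this commit; the UUID merely stands in for the writer):

import monix.eval.Task

// The building side effect runs a single time, however often the memoized Task is evaluated.
val buildOnce: Task[java.util.UUID] =
  Task { println("building writer"); java.util.UUID.randomUUID() }.memoize

val sameValue: Task[Boolean] =
  for {
    first  <- buildOnce
    second <- buildOnce
  } yield first == second // true: both evaluations reuse the one lazily built value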