Renamed parquet writer signature and updated docs. #353

Merged
merged 8 commits into from
Oct 7, 2020

paualarco
Member

@paualarco paualarco commented Sep 27, 2020

Addresses #352

@@ -33,20 +47,19 @@ object Parquet {
    * @return A [[Consumer]] that expects records of type [[T]] to be passed and materializes to [[Long]]
    *         that represents the number of elements written.
    */
-  def writer[T](writer: ParquetWriter[T]): Consumer[T, Long] = {
+  @UnsafeBecauseImpure
+  def toWriterUnsafe[T](writer: ParquetWriter[T]): Consumer[T, Long] = {
Member Author

@Avasil any thoughts?

Contributor

I think the name fromWriterUnsafe would make sense here, because we create a Consumer from a ParquetWriter.

I would add "safe" versions (fromReader / fromWriter, since reader/writer are deprecated?) that take either a Task or at least a by-name parameter and close resources.
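
To illustrate the safe/unsafe split suggested here, the following is a minimal sketch using only the Scala standard library. `FakeWriter`, `writeAllUnsafe` and `writeAll` are hypothetical stand-ins chosen for this example; they are not the Parquet or monix-connect API, and the real methods return a `Consumer` rather than writing eagerly.

```scala
// Hypothetical stand-in for ParquetWriter[T]; not the real Parquet API.
final class FakeWriter[T] extends AutoCloseable {
  var written: Vector[T] = Vector.empty
  var closed: Boolean = false

  def write(record: T): Unit = {
    if (closed) throw new IllegalStateException("writer already closed")
    written = written :+ record
  }

  override def close(): Unit = { closed = true }
}

object WriterSketch {
  // "Unsafe" flavour: the caller passes an already-created writer and
  // remains responsible for closing it.
  def writeAllUnsafe[T](writer: FakeWriter[T], records: Iterable[T]): Long = {
    var count = 0L
    records.foreach { r =>
      writer.write(r)
      count += 1
    }
    count
  }

  // "Safe" flavour: the by-name parameter delays creation until the write
  // starts, and the writer is always closed, on success or on failure.
  def writeAll[T](writer: => FakeWriter[T], records: Iterable[T]): Long = {
    val w = writer // the by-name argument is evaluated exactly once, here
    try writeAllUnsafe(w, records)
    finally w.close()
  }
}
```

With this shape, `WriterSketch.writeAll(new FakeWriter[Int], List(1, 2, 3))` creates the writer only when the call runs and closes it afterwards, whereas the unsafe variant leaves the writer open for the caller to manage.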

Member Author

I followed your suggestions. On the other hand, I guess the ParquetSubscriber will need to be reimplemented so that it takes a Task[ParquetWriter[T]] as an argument instead of a raw ParquetWriter[T], am I right?

Contributor

I'm not sure what you mean, it seems like it is the same situation as the reader

Member Author

It’s a bit different, because in this case the subscriber will contain the logic to close resources on onComplete or onError, so we cannot enclose it in any resource data type.
Therefore my idea is to change the current subscriber to expect a Task with the parquet writer.

Contributor

Ah, I see...
Do we also want to do it in case of "shared" (strict parameter) ParquetWriter?

Member Author

@paualarco paualarco Sep 28, 2020

Yeah, why not? We are already providing a method that expects the strict parameter, i.e. def fromWriterUnsafe[T](parquetWriter: ParquetWriter[T]), and we want to make it compatible with def fromWriter[T](parquetWriter: Task[ParquetWriter[T]]).
I think we would have to implement a normal Subscriber instead of a Synchronous one, although in that case both will close resources on complete.
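
The idea of a subscriber that owns the clean-up (closing the writer on completion or on error) can be sketched with plain Scala as follows. `SimpleObserver`, `RecordingWriter` and `ClosingObserver` are hypothetical names for this illustration; the contract is only loosely modelled on a reactive subscriber and is not the Monix `Subscriber` API.

```scala
import scala.util.control.NonFatal

// Hypothetical minimal observer contract; NOT the Monix Subscriber API.
trait SimpleObserver[T] {
  def onNext(elem: T): Boolean // true = continue, false = stop
  def onError(ex: Throwable): Unit
  def onComplete(): Unit
}

// Hypothetical stand-in for ParquetWriter[T] that can be told to fail.
final class RecordingWriter[T](failOn: T => Boolean = (_: T) => false) extends AutoCloseable {
  var written: Vector[T] = Vector.empty
  var closed: Boolean = false

  def write(record: T): Unit = {
    if (failOn(record)) throw new IllegalArgumentException(s"cannot write $record")
    written = written :+ record
  }

  override def close(): Unit = { closed = true }
}

// The observer itself closes the writer, exactly once, whichever terminal
// event arrives first, and then reports the outcome through `onFinish`.
final class ClosingObserver[T](
  writer: RecordingWriter[T],
  onFinish: Either[Throwable, Long] => Unit
) extends SimpleObserver[T] {
  private var count = 0L
  private var done = false

  def onNext(elem: T): Boolean =
    if (done) false
    else
      try { writer.write(elem); count += 1; true }
      catch { case NonFatal(ex) => onError(ex); false }

  def onError(ex: Throwable): Unit = finish(Left(ex))

  def onComplete(): Unit = finish(Right(count))

  private def finish(result: Either[Throwable, Long]): Unit =
    if (!done) {
      done = true
      try writer.close()
      finally onFinish(result)
    }
}
```

Because the closing logic lives inside the observer, the writer cannot simply be wrapped in an external resource type, which is the difference from the reader case discussed in this thread.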

@paualarco paualarco changed the title from "Renamed parquet writer signature and updated docs. (#352)" to "Renamed parquet writer signature and updated docs. #352" Sep 27, 2020
@paualarco paualarco changed the title from "Renamed parquet writer signature and updated docs. #352" to "Renamed parquet writer signature and updated docs." Sep 27, 2020
@paualarco paualarco force-pushed the renamed-parqued-writer branch 5 times, most recently from 730c3cc to fd74685 Compare September 29, 2020 21:26
@paualarco
Member Author

@Avasil I was not convinced about having both def fromWriterUnsafe[T](writer: ParquetWriter[T]): Consumer[T, Long] and def fromReaderUnsafe[T](reader: ParquetReader[T]): Observable[T] in the same Parquet api; it was a bit confusing to me, since the return types are completely different.
So I have left the 'original' api as deprecated and split the logic into ParquetSink and ParquetSource.

Hopefully you can also review ParquetSubscriberT, which is very similar to ParquetSubscriber except that it expects a Task[ParquetWriter[T]]; it will be called from Parquet.fromReader.

Let me know your thoughts whenever you find some time :-)

@Avasil
Contributor

Avasil commented Sep 30, 2020

I will try to review tomorrow, I am a little behind on some stuff

onError(ex)
Ack.Stop
}
}.runToFuture(scheduler)
Contributor

Running a task to a Future for each event seems fishy :/

I wish Consumer wouldn't require createSubscriber implementation because Observable[A] => Task[B] would be very simple to implement.

I am not sure what to do about it, I'd actually consider taking () => ParquetWriter[T] instead of a Task but idk, maybe runToFuture is not so bad in practice but I don't know without benchmarks
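
As a rough illustration of the `() => ParquetWriter[T]` alternative floated here, the sketch below takes a factory function and evaluates it at most once, so that handling each event is a plain synchronous call with no Future involved. `CountingWriter` and `ThunkSink` are hypothetical names using only the standard library; they are not the PR's implementation, which ended up using `Coeval`.

```scala
// Hypothetical stand-in for ParquetWriter[T]; not the real Parquet API.
final class CountingWriter[T] extends AutoCloseable {
  var written: Long = 0L
  var closed: Boolean = false

  def write(record: T): Unit = { written += 1 }

  override def close(): Unit = { closed = true }
}

// Hypothetical sink that receives a factory instead of a ready-made writer.
final class ThunkSink[T](mkWriter: () => CountingWriter[T]) {
  private var created = false

  // The thunk runs at most once, on first use; later events reuse the
  // same instance.
  private lazy val writer: CountingWriter[T] = {
    created = true
    mkWriter()
  }

  def onNext(record: T): Unit = writer.write(record)

  // Closes the writer only if it was actually created, and returns the
  // number of records written.
  def onComplete(): Long =
    if (created) {
      writer.close()
      writer.written
    } else 0L
}
```

Whether a sink with zero events should still create (and close) the writer is a design decision this sketch does not settle; here the factory is simply never invoked in that case.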

Member Author

Thanks for the review. I will take your suggestions and change it to take a Coeval instead;
furthermore, I will try creating a benchmarks sub-module to compare them :)

Member Author

@paualarco paualarco Oct 3, 2020

The benchmark results indicate that the unsafe method performs better than the safe ones, however it is not like this (although the error is too big, which would make it less reliable).

Benchmark                           Mode   Cnt     Score      Error   Units
Writer
ParquetWriterBenchmark.fromCoeval   thrpt    4   295.987 ±  269.596   ops/s
ParquetWriterBenchmark.fromTask     thrpt    4   236.013 ±  342.709   ops/s
ParquetWriterBenchmark.unsafe       thrpt    4   403.379 ±  144.174   ops/s
Reader
ParquetReaderBenchmark.fromTask     thrpt    3  7414.555 ±  913.722   ops/s
ParquetReaderBenchmark.unsafe       thrpt    3  7275.114 ± 2693.346   ops/s

Contributor

What do you mean by however it is not like this?

Member Author

Sorry, I meant on the reader benchmark

@Benchmark
def unsafe(): Unit = {
val file: String = genFilePath.value()
val records: List[GenericRecord] = genPersons(size).sample.get.map(personToRecord)
Contributor

All benchmarks should run against exactly the same data to give us any insight.

I would recommend a setup like the one here: https://github.com/monix/monix/blob/series/3.x/benchmarks/shared/src/main/scala/monix/benchmarks/ChunkedEvalFilterMapSumBenchmark.scala#L83

Basically, put the records in a var outside the methods, fill it in setup, and then use it in the benchmarks.
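
The setup described in this comment could look roughly like the following JMH sketch. It assumes JMH (for example through the sbt-jmh plugin) is on the classpath; `SharedDataBenchmark`, its `Int` records and the two summing methods are hypothetical placeholders for this illustration, not the PR's Parquet benchmarks.

```scala
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._

@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
class SharedDataBenchmark {

  // Number of records; JMH injects the value before setup runs.
  @Param(Array("10"))
  var size: Int = _

  // Shared fixture: filled once in setup and reused by every benchmark
  // method, so all variants are measured against exactly the same data.
  var records: Vector[Int] = _

  @Setup(Level.Trial)
  def setup(): Unit = {
    val rnd = new scala.util.Random(42L) // fixed seed for repeatable data
    records = Vector.fill(size)(rnd.nextInt())
  }

  @Benchmark
  def sumWithFold(): Long = records.foldLeft(0L)(_ + _)

  @Benchmark
  def sumWithLoop(): Long = {
    var acc = 0L
    var i = 0
    while (i < records.length) {
      acc += records(i)
      i += 1
    }
    acc
  }
}
```

Generating the data inside each benchmark method, as in the snippet under review, both measures the generation cost and gives every variant different input, which is what the shared fixture avoids.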

[info] ParquetReaderBenchmark.unsafe      thrpt  5  2708.631 ± 508.417  ops/s
[info] ParquetWriterBenchmark.fromCoeval  thrpt  5    98.009 ±  17.250  ops/s
[info] ParquetWriterBenchmark.fromTask    thrpt  5    97.417 ±  15.160  ops/s
[info] ParquetWriterBenchmark.unsafe      thrpt  5   100.652 ±   8.674  ops/s
Member Author

Looking at the benchmark numbers, both solutions (safe and unsafe) seem to perform approximately the same. The benchmarks were run writing / reading files of 10 records.
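
One way to back up the "approximately the same" reading is to treat each score ± error as an interval and check whether the writer intervals overlap. The sketch below does that with the numbers from the second writer run quoted in this thread; `Result`, `IntervalCheck` and the overlap criterion are assumptions made for this illustration, not part of the PR.

```scala
// Score and error are copied from the second writer benchmark run (ops/s).
final case class Result(name: String, score: Double, error: Double) {
  def low: Double = score - error
  def high: Double = score + error

  // Two closed intervals overlap when each starts before the other ends.
  def overlaps(other: Result): Boolean =
    low <= other.high && other.low <= high
}

object IntervalCheck {
  val writerResults: List[Result] = List(
    Result("fromCoeval", 98.009, 17.250),
    Result("fromTask", 97.417, 15.160),
    Result("unsafe", 100.652, 8.674)
  )

  // True when every pair of intervals overlaps, i.e. no variant is clearly
  // separated from the others at this sample size.
  def allOverlap(results: List[Result]): Boolean =
    results.combinations(2).forall {
      case List(a, b) => a.overlaps(b)
      case _          => true
    }
}
```

For these three results the intervals are roughly 80.8 to 115.3, 82.3 to 112.6 and 92.0 to 109.3 ops/s, so `IntervalCheck.allOverlap(IntervalCheck.writerResults)` evaluates to `true`.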

Member Author

@Avasil I finally removed ParquetSink.fromWriter(parquetWriter: Task[ParquetWriter]), since providing a Coeval should be enough. Any additional comments/concerns?

Contributor

Great work 👍

Member Author

Great, thanks for helping! Will do a last review tonight before merge :)

@paualarco paualarco force-pushed the renamed-parqued-writer branch 2 times, most recently from f853834 to 1d8482b Compare October 5, 2020 07:52
Safe parquet writer 

Scalafmtall
Added parquet failure test cases 

Added failure case test
Splitted `Parquet` into `ParquetSource` and `ParquetSink`


Safe parquet writer coeval


Parquet benchmark


Parquet benchmark correct setup



Skip publish benchmarks module
@paualarco paualarco added this to the 0.5.0 milestone Oct 5, 2020
@paualarco paualarco merged commit bc22e58 into monix:master Oct 7, 2020
@paualarco paualarco mentioned this pull request Nov 27, 2020
2 participants