Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use Kafka Producer Consumer interfaces #245

Merged
merged 3 commits into from
Jan 1, 2021
Merged

use Kafka Producer Consumer interfaces #245

merged 3 commits into from
Jan 1, 2021

Conversation

arun0009
Copy link
Contributor

Use Kafka Producer, Consumer interfaces as types where appropriate instead of using implementation types. Resolves #244

Copy link
Collaborator

@Avasil Avasil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, I have just one comment!

I will release a snapshot if you'd like to try it out before including it in the official release

}

/** Builds a [[KafkaProducer]] instance with provided Apache Producer. */
def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, producerRef: ApacheProducer[K, V])(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think producerRef should be taken lazily, or as a Task / Coeval.
The implementation can close ApacheProducer and that should be only done by the producer (creator) of the resource.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this what you mean?

  /** Builds a [[KafkaProducer]] instance with provided Apache Producer. */
  def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, producer: Coeval[ApacheProducer[K, V]])(
    implicit K: Serializer[K],
    V: Serializer[V]): KafkaProducer[K, V] = {
    lazy val producerRef = producer.value()
    new Implementation[K, V](config, sc, producerRef)
  }

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, although Implementation would have to use Coeval as well, otherwise it will initialize right away

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, pushed a commit that uses Coeval producer.

@Avasil Avasil merged commit bf109be into monix:master Jan 1, 2021
@arun0009
Copy link
Contributor Author

arun0009 commented Jan 5, 2021

@Avasil - Just to let you know, we were able to use the snapshot jar which includes this PR and we were able to wrap Tracing as expected.

@Avasil
Copy link
Collaborator

Avasil commented Jan 5, 2021

@arun0009 That's awesome, thanks for the update :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

KafkaConsumerObservable and its subclasses use KafkaConsumer (implementation) instead of Consumer (interface)
2 participants