Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka cluster example #1984

Merged

Conversation

seglo
Copy link
Contributor

@seglo seglo commented Oct 17, 2019

This PR introduces the KafkaContainerCluster to run a cluster of KafkaContainers. I discussed this with @bsideup on slack and thought it would be useful for others too.

Cluster-related changes

  • Added convenience class KafkaContainerCluster to facilitate creating a multi-broker cluster.
  • Implement Startable and use deepStart to launch cluster containers.
  • Set appropriate replication factor for internal Kafka topics based on number of available brokers.
  • Set broker id and broker id network aliases.
  • Use Kafka AdminClient to create topics in tests so we can define topics with partitions and replication factors greater than 1.

Other changes

@seglo seglo force-pushed the seglo/multi-broker-kafka branch 2 times, most recently from cfe9f72 to 060cf70 Compare October 20, 2019 21:05
Copy link
Contributor Author

@seglo seglo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, thanks a lot for the review @bsideup. I reverted most of the changes to KafkaContainer.

Is KafkaContainerCluster something you're willing to consider adding to the API? It should make it easier for users to setup multi-broker clusters without knowing much about Kafka.

@seglo
Copy link
Contributor Author

seglo commented Nov 11, 2019

Rebased on master. @bsideup LMK if you have any concerns.

@SneakyThrows
private String clusterBrokers(KafkaContainer c) {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
dockerClient
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered using zookeeper.execInContainer(cmd)?

Also, what would be the use of this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used in start() to assert that Kafka cluster has formed, by checking ZooKeeper (its coordinator) to see if the right number of brokers have joined together.

Good idea about using the ZooKeeper container. It looks like the zookeeper-shell script is present on the path there as well. I've pushed an update.

@seglo seglo requested a review from bsideup November 11, 2019 10:41
Stream<Startable> startables = this.brokers.stream().map(b -> (Startable) b);
Startables.deepStart(startables).get(60, SECONDS);

Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> Stream.of(this.zookeeper)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it can be simplified to () -> clusterBrokersIDs().length == brokersNum (where clusterBrokersIDs returns an array (or list) of broker IDs (or, if execInContainer is possible, can be inlined since it becomes a one liner)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The container command returns a string (internally it's stored as a string, I guess, i.e. 1,2,3). I could inline a shell command to split or count the string instead if that's what you prefer.

Copy link
Member

@bsideup bsideup left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized that there is no error handling here, which means that if the start fails then the containers will be dangling until someone calls #stop().

That made me think of the rules use cases (some people still got used to running things with JUnit's rules) whether we should have it as part of the module at all, or perhaps we should make it an example instead (We have the examples/ folder for things that feel more like a very special use case rather than a thing that most of the users will be doing)

@seglo I remember that you were using it in Alpakka.
Given some cleanups we were able to do, do you still feel that this amount of code is preferable to have as a part of the Kafka module (and being maintained), or maybe you're okay with having it as an example instead?

P.S. the KafkaContainer's rf fix is still very much needed 👍

@seglo
Copy link
Contributor Author

seglo commented Nov 11, 2019

@bsideup By error handling do you mean if the containers fail to start?

I don't mind scrapping this PR (creating a separate one for the tx RF stuff), and making this code an example. As you know I have a copy in Alpakka that works well for us, though I would probably port it to Scala instead. LMK.

@bsideup
Copy link
Member

bsideup commented Nov 11, 2019

@seglo

By error handling do you mean if the containers fail to start?

When some containers fail to start, or when the Unreliables part times out

I don't mind scrapping this PR (creating a separate one for the tx RF stuff), and making this code an example.

No need to, you can just adjust this PR! (Well, unless you want to make 2 separate contributions, totally deserved 👍)
P.S. in the example, I guess you can just define a few containers and add the Unreliables cluster check in @BeforeTest or something.

As you know I have a copy in Alpakka that works well for us, though I would probably port it to Scala instead

Let's do it like that: we will keep it as an example for now (I feel sorry for not suggesting it on a first place, btw! ☺️), and you will be able to evaluate the approach for a little longer.
I will monitor the activity and if we see that the use case is generic enough - we will move it into the module. How does it sound?

P.S. thanks a lot for the fast replies and your dedication! Really appreciate that 👍 💯

@seglo seglo force-pushed the seglo/multi-broker-kafka branch 2 times, most recently from 7a7364e to 6d6b838 Compare November 12, 2019 09:54
@seglo
Copy link
Contributor Author

seglo commented Nov 12, 2019

Thanks @bsideup . I'm having a gradle dep issue perhaps you can help me resolve. I added a build.gradle for ./examples/kafka-cluster, but when I attempt to run the example's tests I get dependency issues on testcontainer types, but I'm not sure why. I'm executing ../gradlew :kafka-cluster:test from within the examples directory:

 seglo@slice  ~/source/testcontainers-java/examples   seglo/multi-broker-kafka  ../gradlew :kafka-cluster:test

> Task :kafka-cluster:compileJava FAILED
/home/seglo/source/testcontainers-java/examples/kafka-cluster/src/main/java/com/example/kafkacluster/KafkaContainerCluster.java:27: error: cannot find symbol
    private final Network network;
                  ^
  symbol:   class Network
  location: class KafkaContainerCluster
/home/seglo/source/testcontainers-java/examples/kafka-cluster/src/main/java/com/example/kafkacluster/KafkaContainerCluster.java:28: error: cannot find symbol
    private final GenericContainer zookeeper;
                  ^
  symbol:   class GenericContainer
  location: class KafkaContainerCluster
/home/seglo/source/testcontainers-java/examples/kafka-cluster/src/main/java/com/example/kafkacluster/KafkaContainerCluster.java:29: error: cannot find symbol
    private final Collection<KafkaContainer> brokers;
                             ^
  symbol:   class KafkaContainer
  location: class KafkaContainerCluster
/home/seglo/source/testcontainers-java/examples/kafka-cluster/src/main/java/com/example/kafkacluster/KafkaContainerCluster.java:69: error: cannot find symbol
    public Network getNetwork() {
           ^
  symbol:   class Network
  location: class KafkaContainerCluster
/home/seglo/source/testcontainers-java/examples/kafka-cluster/src/main/java/com/example/kafkacluster/KafkaContainerCluster.java:73: error: cannot find symbol
    public GenericContainer getZooKeeper() {
           ^
  symbol:   class GenericContainer
  location: class KafkaContainerCluster
/home/seglo/source/testcontainers-java/examples/kafka-cluster/src/main/java/com/example/kafkacluster/KafkaContainerCluster.java:77: error: cannot find symbol
    public Collection<KafkaContainer> getBrokers() {
                      ^
  symbol:   class KafkaContainer
  location: class KafkaContainerCluster
/home/seglo/source/testcontainers-java/examples/kafka-cluster/src/main/java/com/example/kafkacluster/KafkaContainerCluster.java:87: error: cannot find symbol
    private Stream<GenericContainer> allContainers() {
                   ^
  symbol:   class GenericContainer
  location: class KafkaContainerCluster
/home/seglo/source/testcontainers-java/examples/kafka-cluster/src/main/java/com/example/kafkacluster/KafkaContainerCluster.java:105: error: cannot find symbol
    private String clusterBrokers(GenericContainer c) {
                                  ^
  symbol:   class GenericContainer
  location: class KafkaContainerCluster
8 errors

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':kafka-cluster:compileJava'.
> Compilation failed; see the compiler error output for details.

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.

* Get more help at https://help.gradle.org

BUILD FAILED in 2s
1 actionable task: 1 executed

@bsideup
Copy link
Member

bsideup commented Nov 22, 2019

@seglo I see that you defined it as implementation, not testImplementation, probably that's the reason?

dependabot bot and others added 10 commits October 13, 2020 08:48
…#3335)

Bumps s3 from 2.14.21 to 2.15.7.

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
)

Bumps [cucumber-java](https://github.com/cucumber/cucumber-jvm) from 6.6.0 to 6.8.1.
- [Release notes](https://github.com/cucumber/cucumber-jvm/releases)
- [Changelog](https://github.com/cucumber/cucumber-jvm/blob/main/CHANGELOG.md)
- [Commits](cucumber/cucumber-jvm@v6.6.0...v6.8.1)

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Richard North <rich.north@gmail.com>
…Spanner emulators (testcontainers#2690)

Co-authored-by: Richard North <rich.north@gmail.com>
…rs#3343)

* Use a lighter weight image for MultiplePortsExposedTest

* Update helloworld container version
…m daemon port (testcontainers#2769) (testcontainers#3237)

Co-authored-by: Vitalii Chura <c-vitalii.chura@hulu.com>
@stale
Copy link

stale bot commented Jan 24, 2021

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. If you believe this is a mistake, please reply to this comment to keep it open. If there isn't one already, a PR to fix or at least reproduce the problem in a test case will always help us get back on track to tackle this.

@stale stale bot added the stale label Jan 24, 2021
@stale stale bot removed the stale label Jan 24, 2021
@bsideup bsideup added this to the next milestone Feb 6, 2021
@bsideup bsideup changed the base branch from master to seglo/multi-broker-kafka February 6, 2021 14:26
@bsideup bsideup merged commit 78aa475 into testcontainers:seglo/multi-broker-kafka Feb 6, 2021
@bsideup
Copy link
Member

bsideup commented Feb 6, 2021

@seglo you won't believe! 😂 I just merged your PR into our own branch (for some polishing) and will merge it into master in a moment! Thanks A LOT for submitting it and staying with us :D And I am so sorry that it took so long 😅

@bsideup bsideup mentioned this pull request Feb 6, 2021
bsideup added a commit that referenced this pull request Feb 6, 2021
Co-authored-by: Sean Glover <sean@seanglover.com>
@seglo
Copy link
Contributor Author

seglo commented Feb 6, 2021

Nice, thanks. Longest PR ever :)

I'll admit that I've been iterating this more over in Alpakka Kafka. I can take a look at the end result and see if there is anything worth carrying over.

@bsideup
Copy link
Member

bsideup commented Feb 7, 2021

@seglo the longest indeed 😅 And with such a nice id, btw ;)

Looking forward to the updates, and I promise it won't take that much this time 😂

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

KafkaContainer transactions: Timeout expired while initializing transactional state in 60000ms.
7 participants