Add Kafka module #546

Merged: 8 commits merged on Jan 27, 2018

Conversation

bsideup (Member) commented Jan 16, 2018

No description provided.

bsideup added this to the 1.6.0 milestone Jan 16, 2018
bsideup requested review from rnorth and kiview January 16, 2018 19:38
@@ -52,7 +52,7 @@ static Network newNetwork() {
         private final AtomicBoolean initialized = new AtomicBoolean();

         @Override
-        public String getId() {
+        public synchronized String getId() {
Member Author

Nasty bug discovered during my experiments with the Kafka module: under concurrent access, the second invocation was returning null.

Contributor

Have you tried making id volatile? That should work and is cheaper under contention than a synchronized method.

Member Author

This is not a performance-critical part (it is called just a few times), so it doesn't make sense to over-optimize it :)

Contributor

Train your good habits! :)

Member Author

keep it simple 😎

Member

Isn't volatile as simple? Doesn't really matter for me in this case 😁
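
For readers following this thread, here is a minimal sketch of the kind of race being discussed. It assumes (the diff does not show the method body) that getId() creates the id lazily behind the AtomicBoolean flag; LazyId and create() are hypothetical names, not the Testcontainers code. Under that assumed shape, a second caller could see the flag already set and return the field before the first caller has assigned it, and synchronized closes that window. A volatile field would address visibility of the write, but under this assumed shape it would not by itself make the second caller wait.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a lazily initialized resource (not the real Network class).
final class LazyId {
    private final AtomicBoolean initialized = new AtomicBoolean();
    private String id;

    // synchronized: a second caller blocks until the first one has assigned id,
    // so it cannot observe the flag as set while id is still null.
    public synchronized String getId() {
        if (initialized.compareAndSet(false, true)) {
            id = create();
        }
        return id;
    }

    // Placeholder for the real creation step (assumption for illustration).
    private String create() {
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        LazyId lazy = new LazyId();
        // Both calls return the same non-null value.
        System.out.println(lazy.getId().equals(lazy.getId()));
    }
}
```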


withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181");
withClasspathResourceMapping("tc-zookeeper.properties", "/zookeeper.properties", BindMode.READ_ONLY);
withCommand("sh", "-c", "zookeeper-server-start /zookeeper.properties & /etc/confluent/docker/run");
Member Author

I've decided to implement an all-in-one container simply to speed it up. Starting from Kafka 0.11, Zookeeper no longer has to be exposed to clients and becomes an implementation detail of Kafka, so IMO it's fine.

I would personally prefer to have those as separate containers, or at least to have this as a configurable option. That would allow testing scenarios with network partitions between Zookeeper and Kafka nodes.

Member Author

done ;)

withEnv("KAFKA_BROKER_ID", "1");
withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092,BROKER://127.0.0.1:9093");
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
Member Author

This line makes the whole thing work :D Otherwise, Kafka would try to communicate with itself via the host's port (yes, even in a 1-node scenario (sic!))

Member

I'm not directly familiar with Kafka or the 7 environment variables that are being set in this method. Is it possible that someone might want to use alternative values for some of these? If so, they could happily subclass and re-set the env vars they want, so that's OK.

However, would it be worth having a comment in the code (similar to your Github comment!) to say which settings are just critical to getting Kafka to work in the Dockerised environment and shouldn't be changed?
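
As an illustration of what such a comment could capture, the listener-related values from the diff can be laid out as an annotated map. This is only a sketch: KafkaListenerEnv and listenerEnv() are hypothetical names, and the comments reflect a reading of the Kafka broker settings (listeners, listener.security.protocol.map, inter.broker.listener.name) that these variables appear to correspond to, not an authoritative description of the module.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; the values are copied from the diff above.
final class KafkaListenerEnv {
    static Map<String, String> listenerEnv() {
        Map<String, String> env = new LinkedHashMap<>();
        // Two listeners: PLAINTEXT on all interfaces for clients,
        // BROKER bound to loopback inside the container.
        env.put("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092,BROKER://127.0.0.1:9093");
        // Both listener names use the unencrypted PLAINTEXT security protocol.
        env.put("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
        // Broker-to-broker traffic goes through the loopback BROKER listener
        // rather than the client-facing one (the setting called out above as critical).
        env.put("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
        return env;
    }

    public static void main(String[] args) {
        listenerEnv().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```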

@@ -30,6 +30,10 @@ public String getAmbassadorContainerImage() {
         return (String) properties.getOrDefault("ambassador.container.image", "richnorth/ambassador:latest");
     }

+    public String getSocatContainerImage() {
+        return (String) properties.getOrDefault("socat.container.image", "alpine/socat:latest");
Member Author

I had to use :latest :(
https://hub.docker.com/r/alpine/socat/tags/

Even though there is a tag, it seems to be old and didn't work for me. But I assume it's fine to use latest here, because socat itself is supposed to be backward compatible and shouldn't break over time.

Member

Or would it be more responsible to maintain our own image? 😕

Member Author

once we get our name on Docker Hub - sure, for now I would stick with Alpine's

gAmUssA commented Jan 16, 2018

Yay111
We need dis!

@@ -56,6 +59,10 @@
private boolean pull = true;
private boolean tailChildContainers;

private final AtomicInteger nextAmbassadorPort = new AtomicInteger(2000);
private final Map<String, Map<Integer, Integer>> ambassadorPortMappings = new ConcurrentHashMap<>();
private final SocatContainer ambassadorContainer = new SocatContainer();
Member

Shouldn't these changes happen in a different PR? It's quite a big one.

Member Author

There is little to no activity around DockerComposeContainer, so I would keep it as part of this PR unless you have objections :)

Member

I'll try this change on Windows tonight 😉

Member

All good on Docker for Windows 💪

protected void configure() {
withCommand("-c",
targets.entrySet().stream()
.map(entry -> "socat TCP-LISTEN:" + entry.getKey() + ",fork,reuseaddr TCP:" + entry.getValue())
Member

Are we sure that strange hostname values won't break the shell if we concat the String like this?

Member Author

AFAIK any valid hostname should be a valid shell argument.
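
One way to make that assumption explicit would be to validate each target before concatenating it into the shell command. The sketch below is not the code in this PR: SocatCommand, build(), and the HOST_PORT pattern are hypothetical, the pattern is a deliberately conservative approximation (letters, digits, dots, underscores, and hyphens, followed by a port), and joining the commands with " & " is an assumption because the original collector is not shown in the snippet.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper illustrating validation before shell concatenation.
final class SocatCommand {
    // Conservative "host:port" check; no shell metacharacters can pass it.
    private static final Pattern HOST_PORT = Pattern.compile("[A-Za-z0-9._-]+:[0-9]{1,5}");

    static String build(Map<Integer, String> targets) {
        return targets.entrySet().stream()
            .map(entry -> {
                if (!HOST_PORT.matcher(entry.getValue()).matches()) {
                    throw new IllegalArgumentException("Unsafe socat target: " + entry.getValue());
                }
                return "socat TCP-LISTEN:" + entry.getKey() + ",fork,reuseaddr TCP:" + entry.getValue();
            })
            // Assumption: run the listeners side by side in one shell.
            .collect(Collectors.joining(" & "));
    }

    public static void main(String[] args) {
        Map<Integer, String> targets = new LinkedHashMap<>();
        targets.put(2000, "kafka:9092");
        // prints: socat TCP-LISTEN:2000,fork,reuseaddr TCP:kafka:9092
        System.out.println(build(targets));
    }
}
```

A target such as "kafka; rm -rf /:1" would be rejected with an IllegalArgumentException instead of reaching the shell.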

this("4.0.0");
}

public KafkaContainer(String confluencePlatformVersion) {
Member

AFAIK it's confluent, not confluence.

WDYT about allowing non-Confluent images? We are, e.g., using https://hub.docker.com/r/ches/kafka/ for our acceptance tests. This image is missing KAFKA_INTER_BROKER_LISTENER_NAME, but it should be trivial to update the image accordingly; the other ENV params should be the same.

Member Author

> AFAIK it's confluent, not confluence.

👍 😄

> WDYT about allowing non-confluent images

That would add too much customization. If someone wants to use a custom image, I would rather recommend building it on top of GenericContainer.

withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
}

public KafkaContainer withEmbeddedZookeeper() {
Member

This works because confluent images contain zookeeper?

Member Author

yes

iNikem (Contributor) commented Jan 21, 2018

Yo, @bsideup, any estimate of when this will be released? :)

bsideup (Member Author) commented Jan 21, 2018

Hey @iNikem, I'm still waiting for @rnorth's review. We're also running the tests at my company with KafkaContainer via JitPack; so far so good :)


public SocatContainer() {
super(TestcontainersConfiguration.getInstance().getSocatContainerImage());
withCreateContainerCmdModifier(it -> it.withEntrypoint("/bin/sh"));
Member

Just a thought - please could we set the container's name to something that people will be able to identify? (i.e. stop people wondering "what's this random socat container?"). A name like "testcontainers-socat-" + base58chars would probably work.

I realise that the image richnorth/ambassador or the random container names it was spawned with were never great in the first place, but we have a chance to do better 😄

rnorth (Member) left a comment

Only a few minor comments from me. I'm not a Kafka user so I put more weight on the fact that you and @kiview are using this already to know that it's good 😄

Please could you also add a suitable CHANGELOG entry and probably a docs update? Otherwise looks great to me!

bsideup (Member Author) commented Jan 22, 2018

@rnorth done 👍

this("4.0.0");
}

public KafkaContainer(String confluentPlatformVersion) {
Member

Maybe I missed something, but don't we need a custom wait strategy? Or is port based wait strategy already enough?

Member Author

yes, Kafka is ready to be used when the port is open

Member

I had at least some instances where I got the following error when creating a topic:
ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: replication factor: 1 larger than available brokers: 0 (kafka.admin.TopicCommand$).

This did not happen when I used
.waitingFor(new LogMessageWaitStrategy().withRegEx(/(?s).*started \(kafka\.server\.KafkaServer\).*/))

Member Author

"Started Kafka" != "Initialized Kafka", your clients should be able to sync the cluster state before doing operations (and most of them do).

Waiting for cluster initialization instead of just startup will only make the tests slower, and it should be handled by your consumers / producers anyway.

Member

I see, in this case we were using the bundled helper scripts, like kafka-topics.sh (but this is mostly a rudiment from having to use kafka with GenericContainer).
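
For anyone who still wants the log-based wait, the regular expression suggested earlier in this thread can be checked in isolation with plain java.util.regex. In this sketch the class name KafkaStartedPattern and the sample log lines are made up for illustration (real broker output may differ in detail); it only demonstrates that the pattern matches a multi-line log once a line containing "started (kafka.server.KafkaServer)" appears.

```java
import java.util.regex.Pattern;

// Hypothetical helper; the expression is the one quoted in the thread,
// rewritten as a Java string literal.
final class KafkaStartedPattern {
    // (?s) lets '.' match line breaks, so the whole log can be matched at once.
    static final Pattern STARTED =
        Pattern.compile("(?s).*started \\(kafka\\.server\\.KafkaServer\\).*");

    static boolean isStarted(String logs) {
        return STARTED.matcher(logs).matches();
    }

    public static void main(String[] args) {
        // Sample lines are illustrative assumptions, not captured broker output.
        String booting = "INFO starting (kafka.server.KafkaServer)\n";
        String ready = booting + "INFO [KafkaServer id=1] started (kafka.server.KafkaServer)\n";
        System.out.println(isStarted(booting)); // false
        System.out.println(isStarted(ready));   // true
    }
}
```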

iNikem (Contributor) commented Jan 24, 2018

Gentle push for @bsideup and @rnorth :)

bsideup merged commit 8d5f7a9 into master Jan 27, 2018
bsideup deleted the kafka branch January 27, 2018 10:48
bsideup added a commit that referenced this pull request Jan 27, 2018
* Add Kafka module

* Replace AmbassadorContainer with SocatContainer in DockerComposeContainer

* make it possible to use KafkaContainer with external Zookeeper

* fix typo

* fix Kafka tests

* Add to CHANGELOG.md, name SocatContainer, add listeners explanation comment to KafkaContainer

* listen on alias

* rename myNetworkAlias -> networkAlias

5 participants