Skip to content

Commit

Permalink
chore(containers): improve wait strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
Miguel Pires committed Feb 7, 2020
1 parent 9600d76 commit e662306
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 35 deletions.
80 changes: 80 additions & 0 deletions src/main/java/io/zeebe/containers/BrokerWaitStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright © 2019 camunda services GmbH (info@camunda.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.zeebe.containers;

import com.github.dockerjava.api.command.InspectContainerResponse;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;

public class BrokerWaitStrategy implements WaitStrategy {

private final WaitStrategy readyCheck;
private final WaitStrategy commandApiCheck;

public BrokerWaitStrategy() {
readyCheck =
new HttpWaitStrategy()
.forPath("/ready")
.forPort(ZeebePort.MONITORING_API.getPort())
.forStatusCode(204)
.withReadTimeout(Duration.ofSeconds(10));
commandApiCheck = new HostPortWaitStrategy();
}

@Override
public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) {
readyCheck.waitUntilReady(waitStrategyTarget);
commandApiCheck.waitUntilReady(new BrokerWaitStrategyTarget(waitStrategyTarget));
}

@Override
public WaitStrategy withStartupTimeout(Duration startupTimeout) {
readyCheck.withStartupTimeout(startupTimeout);
commandApiCheck.withStartupTimeout(startupTimeout);
return this;
}

static class BrokerWaitStrategyTarget implements WaitStrategyTarget {

private final WaitStrategyTarget waitStrategyTarget;

public BrokerWaitStrategyTarget(WaitStrategyTarget waitStrategyTarget) {
this.waitStrategyTarget = waitStrategyTarget;
}

@Override
public List<Integer> getExposedPorts() {
return waitStrategyTarget.getExposedPorts();
}

@Override
public InspectContainerResponse getContainerInfo() {
return waitStrategyTarget.getContainerInfo();
}

@Override
public Set<Integer> getLivenessCheckPortNumbers() {
return Collections.singleton(
waitStrategyTarget.getMappedPort(ZeebePort.COMMAND_API.getPort()));
}
}
}
29 changes: 5 additions & 24 deletions src/main/java/io/zeebe/containers/SupportedVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,22 @@
*/
package io.zeebe.containers;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public enum SupportedVersion {
ZEEBE_0_20_1("0.20.1", ".*Broker is ready.*"),
ZEEBE_0_21_1("0.21.1", ".*Broker is ready.*"),
ZEEBE_0_22_1("0.22.1", ".* succeeded. Started.*");
ZEEBE_0_20_1("0.20.1"),
ZEEBE_0_21_1("0.21.1"),
ZEEBE_0_22_1("0.22.1"),
SNAPSHOT("SNAPSHOT");

private static final Map<String, SupportedVersion> LOOKUP_MAP = new HashMap<>();
private final String version;
private final String logStringRegex;

static {
Arrays.stream(SupportedVersion.values())
.forEach(version -> LOOKUP_MAP.put(version.version(), version));
}

SupportedVersion(final String version, final String logStringRegex) {
SupportedVersion(final String version) {
this.version = version;
this.logStringRegex = logStringRegex;
}

public String version() {
return version;
}

public String logStringRegex() {
return logStringRegex;
}

public static SupportedVersion fromVersion(String version) {
return LOOKUP_MAP.get(version);
}

@Override
public String toString() {
return version;
Expand Down
7 changes: 3 additions & 4 deletions src/main/java/io/zeebe/containers/ZeebeBrokerContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import java.util.Set;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.utility.Base58;

@SuppressWarnings({"WeakerAccess", "UnusedReturnValue"})
public class ZeebeBrokerContainer extends GenericContainer<ZeebeBrokerContainer>
implements ZeebeContainer<ZeebeBrokerContainer>,
ZeebeGatewayContainer<ZeebeBrokerContainer>,
ZeebeNetworkable {

protected String host;
protected int portOffset;
protected boolean embedGateway;
Expand Down Expand Up @@ -95,9 +95,8 @@ public int hashCode() {

public void applyDefaultConfiguration() {
final String defaultHost = "zeebe-broker-" + Base58.randomString(6);
setWaitStrategy(
new LogMessageWaitStrategy()
.withRegEx(SupportedVersion.fromVersion(version).logStringRegex()));
setWaitStrategy(new BrokerWaitStrategy());

withHost(defaultHost)
.withPartitionCount(1)
.withReplicationFactor(1)
Expand Down
30 changes: 23 additions & 7 deletions src/test/java/io/zeebe/containers/ZeebeBrokerContainerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,51 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.rnorth.ducttape.timeouts.Timeouts;
import org.testcontainers.containers.Network;

@Timeout(value = 15, unit = TimeUnit.MINUTES)
class ZeebeBrokerContainerTest {

private ZeebeBrokerContainer container;
private Network network;

@AfterEach
void tearDown() {
if (container != null) {
container.stop();
container = null;
}

if (network != null) {
network.close();
network = null;
}
}

@BeforeEach
void setUp() {
network = Network.newNetwork();
}

@ParameterizedTest
@EnumSource(SupportedVersion.class)
void shouldStartWithEmbeddedGateway(final SupportedVersion version) {
// given
final int partitionsCount = 3;
container = new ZeebeBrokerContainer(version.version());
container
.withHost("zeebe-0")
.withNodeId(0)
.withEmbeddedGateway(true)
.withPartitionCount(partitionsCount)
.withReplicationFactor(1);
container =
new ZeebeBrokerContainer(version.version())
.withHost("zeebe-0")
.withNodeId(0)
.withEmbeddedGateway(true)
.withPartitionCount(partitionsCount)
.withReplicationFactor(1)
.withNetwork(network);

Timeouts.doWithTimeout(30, TimeUnit.SECONDS, container::start);

// when
Expand Down

0 comments on commit e662306

Please sign in to comment.