Skip to content

Commit

Permalink
fix(core): retry exposing host port up to 5 times
Browse files Browse the repository at this point in the history
  • Loading branch information
npepinpe committed Sep 29, 2022
1 parent 33557a5 commit 387a1ba
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 5 deletions.
9 changes: 4 additions & 5 deletions core/src/main/java/io/zeebe/containers/ZeebeBrokerNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package io.zeebe.containers;

import io.zeebe.containers.util.HostPortForwarder;
import org.apiguardian.api.API;
import org.apiguardian.api.API.Status;
import org.testcontainers.Testcontainers;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.MountableFile;

Expand Down Expand Up @@ -88,15 +88,14 @@ default T withZeebeData(final ZeebeData data) {
}

/**
* Injects an instance of the debug exporter into the container, which will push records out to
* http://host.testcontainers.internal:{@code port}/records.
* Injects an instance of the debug exporter into the container.
*
* @param port the host port of the {@link io.zeebe.containers.exporter.DebugReceiver}
* @return this container for chaining
*/
@API(status = Status.EXPERIMENTAL)
default T withDebugExporter(final int port) {
Testcontainers.exposeHostPorts(port);
final int containerPort = HostPortForwarder.forwardHostPort(port, 5);

//noinspection resource
withCopyToContainer(
Expand All @@ -106,7 +105,7 @@ default T withDebugExporter(final int port) {
"ZEEBE_BROKER_EXPORTERS_DEBUG_CLASSNAME", "io.zeebe.containers.exporter.DebugExporter")
.withEnv(
"ZEEBE_BROKER_EXPORTERS_DEBUG_ARGS_URL",
"http://host.testcontainers.internal:" + port + "/records");
"http://host.testcontainers.internal:" + containerPort + "/records");

return self();
}
Expand Down
74 changes: 74 additions & 0 deletions core/src/main/java/io/zeebe/containers/util/HostPortForwarder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright © 2022 camunda services GmbH (info@camunda.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.zeebe.containers.util;

import java.util.HashMap;
import java.util.Map;
import org.agrona.collections.MutableInteger;
import org.apiguardian.api.API;
import org.apiguardian.api.API.Status;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.Testcontainers;

@API(status = Status.INTERNAL)
public final class HostPortForwarder {
private final PortForwarder portForwarder;

public HostPortForwarder() {
this(Testcontainers::exposeHostPorts);
}

public HostPortForwarder(final PortForwarder portForwarder) {
this.portForwarder = portForwarder;
}

public static int forwardHostPort(final int port, final int retryCount) {
return new HostPortForwarder().forward(port, retryCount);
}

/**
* Exposes a given host port to your containers, accessible via host.testcontainers.internal:PORT.
* See <a
* href="https://www.testcontainers.org/features/networking/#exposing-host-ports-to-the-container">the
* docs</a> for more.
*
* <p>This method is mostly here as a QoL improvement to retry on I/O errors.
*
* @param port the port on the host to expose
* @param retryCount the number of times to retry on I/O errors
* @return the container port to use
*/
public int forward(final int port, final int retryCount) {
final MutableInteger attempts = new MutableInteger();
return Unreliables.retryUntilSuccess(
retryCount,
() -> {
// since the port-forwarding requests are cached, use the attempt count to increment the
// container port value, such that the request is always fresh on every retry
final int containerPort = port + attempts.getAndIncrement();
final Map<Integer, Integer> portMapping = new HashMap<>();
portMapping.put(port, containerPort);

portForwarder.forwardPort(portMapping);
return containerPort;
});
}

@FunctionalInterface
public interface PortForwarder {
void forwardPort(final Map<Integer, Integer> portMapping);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright © 2022 camunda services GmbH (info@camunda.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.zeebe.containers.util;

import static org.assertj.core.api.Assertions.assertThat;

import io.zeebe.containers.util.HostPortForwarder.PortForwarder;
import java.util.HashSet;
import java.util.Set;
import org.agrona.collections.MutableInteger;
import org.junit.jupiter.api.Test;

final class HostPortForwarderTest {
@Test
void shouldRetryUpToRetryCount() {
// given
final int retryCount = 5;
final MutableInteger attempts = new MutableInteger();
final PortForwarder forwarder =
mapping -> {
if (attempts.incrementAndGet() >= retryCount) {
return;
}

throw new RuntimeException("failure");
};
final HostPortForwarder portForwarder = new HostPortForwarder(forwarder);

// when
portForwarder.forward(1024, retryCount);

// then
assertThat(attempts.value).isEqualTo(retryCount);
}

@Test
void shouldChangeContainerPortOnRetry() {
// given
final int retryCount = 5;
final MutableInteger attempts = new MutableInteger();
final Set<Integer> containerPorts = new HashSet<>();
final PortForwarder forwarder =
mapping -> {
containerPorts.add(mapping.get(1024));

if (attempts.incrementAndGet() >= retryCount) {
return;
}

throw new RuntimeException("failure");
};
final HostPortForwarder portForwarder = new HostPortForwarder(forwarder);

// when
portForwarder.forward(1024, retryCount);

// then
assertThat(containerPorts).hasSize(retryCount);
}
}

0 comments on commit 387a1ba

Please sign in to comment.