From 387a1ba8b22236b37b1179e00af408bba51ca7fc Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Thu, 29 Sep 2022 11:53:08 +0200 Subject: [PATCH] fix(core): retry exposing host port up to 5 times --- .../io/zeebe/containers/ZeebeBrokerNode.java | 9 +-- .../containers/util/HostPortForwarder.java | 74 +++++++++++++++++++ .../util/HostPortForwarderTest.java | 73 ++++++++++++++++++ 3 files changed, 151 insertions(+), 5 deletions(-) create mode 100644 core/src/main/java/io/zeebe/containers/util/HostPortForwarder.java create mode 100644 core/src/test/java/io/zeebe/containers/util/HostPortForwarderTest.java diff --git a/core/src/main/java/io/zeebe/containers/ZeebeBrokerNode.java b/core/src/main/java/io/zeebe/containers/ZeebeBrokerNode.java index 7b07651e..101fe4c8 100644 --- a/core/src/main/java/io/zeebe/containers/ZeebeBrokerNode.java +++ b/core/src/main/java/io/zeebe/containers/ZeebeBrokerNode.java @@ -15,9 +15,9 @@ */ package io.zeebe.containers; +import io.zeebe.containers.util.HostPortForwarder; import org.apiguardian.api.API; import org.apiguardian.api.API.Status; -import org.testcontainers.Testcontainers; import org.testcontainers.containers.GenericContainer; import org.testcontainers.utility.MountableFile; @@ -88,15 +88,14 @@ default T withZeebeData(final ZeebeData data) { } /** - * Injects an instance of the debug exporter into the container, which will push records out to - * http://host.testcontainers.internal:{@code port}/records. + * Injects an instance of the debug exporter into the container. * * @param port the host port of the {@link io.zeebe.containers.exporter.DebugReceiver} * @return this container for chaining */ @API(status = Status.EXPERIMENTAL) default T withDebugExporter(final int port) { - Testcontainers.exposeHostPorts(port); + final int containerPort = HostPortForwarder.forwardHostPort(port, 5); //noinspection resource withCopyToContainer( @@ -106,7 +105,7 @@ default T withDebugExporter(final int port) { "ZEEBE_BROKER_EXPORTERS_DEBUG_CLASSNAME", "io.zeebe.containers.exporter.DebugExporter") .withEnv( "ZEEBE_BROKER_EXPORTERS_DEBUG_ARGS_URL", - "http://host.testcontainers.internal:" + port + "/records"); + "http://host.testcontainers.internal:" + containerPort + "/records"); return self(); } diff --git a/core/src/main/java/io/zeebe/containers/util/HostPortForwarder.java b/core/src/main/java/io/zeebe/containers/util/HostPortForwarder.java new file mode 100644 index 00000000..2723ee6f --- /dev/null +++ b/core/src/main/java/io/zeebe/containers/util/HostPortForwarder.java @@ -0,0 +1,74 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.util; + +import java.util.HashMap; +import java.util.Map; +import org.agrona.collections.MutableInteger; +import org.apiguardian.api.API; +import org.apiguardian.api.API.Status; +import org.rnorth.ducttape.unreliables.Unreliables; +import org.testcontainers.Testcontainers; + +@API(status = Status.INTERNAL) +public final class HostPortForwarder { + private final PortForwarder portForwarder; + + public HostPortForwarder() { + this(Testcontainers::exposeHostPorts); + } + + public HostPortForwarder(final PortForwarder portForwarder) { + this.portForwarder = portForwarder; + } + + public static int forwardHostPort(final int port, final int retryCount) { + return new HostPortForwarder().forward(port, retryCount); + } + + /** + * Exposes a given host port to your containers, accessible via host.testcontainers.internal:PORT. + * See the + * docs for more. + * + *

This method is mostly here as a QoL improvement to retry on I/O errors. + * + * @param port the port on the host to expose + * @param retryCount the number of times to retry on I/O errors + * @return the container port to use + */ + public int forward(final int port, final int retryCount) { + final MutableInteger attempts = new MutableInteger(); + return Unreliables.retryUntilSuccess( + retryCount, + () -> { + // since the port-forwarding requests are cached, use the attempt count to increment the + // container port value, such that the request is always fresh on every retry + final int containerPort = port + attempts.getAndIncrement(); + final Map portMapping = new HashMap<>(); + portMapping.put(port, containerPort); + + portForwarder.forwardPort(portMapping); + return containerPort; + }); + } + + @FunctionalInterface + public interface PortForwarder { + void forwardPort(final Map portMapping); + } +} diff --git a/core/src/test/java/io/zeebe/containers/util/HostPortForwarderTest.java b/core/src/test/java/io/zeebe/containers/util/HostPortForwarderTest.java new file mode 100644 index 00000000..6dbd8173 --- /dev/null +++ b/core/src/test/java/io/zeebe/containers/util/HostPortForwarderTest.java @@ -0,0 +1,73 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.util; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.zeebe.containers.util.HostPortForwarder.PortForwarder; +import java.util.HashSet; +import java.util.Set; +import org.agrona.collections.MutableInteger; +import org.junit.jupiter.api.Test; + +final class HostPortForwarderTest { + @Test + void shouldRetryUpToRetryCount() { + // given + final int retryCount = 5; + final MutableInteger attempts = new MutableInteger(); + final PortForwarder forwarder = + mapping -> { + if (attempts.incrementAndGet() >= retryCount) { + return; + } + + throw new RuntimeException("failure"); + }; + final HostPortForwarder portForwarder = new HostPortForwarder(forwarder); + + // when + portForwarder.forward(1024, retryCount); + + // then + assertThat(attempts.value).isEqualTo(retryCount); + } + + @Test + void shouldChangeContainerPortOnRetry() { + // given + final int retryCount = 5; + final MutableInteger attempts = new MutableInteger(); + final Set containerPorts = new HashSet<>(); + final PortForwarder forwarder = + mapping -> { + containerPorts.add(mapping.get(1024)); + + if (attempts.incrementAndGet() >= retryCount) { + return; + } + + throw new RuntimeException("failure"); + }; + final HostPortForwarder portForwarder = new HostPortForwarder(forwarder); + + // when + portForwarder.forward(1024, retryCount); + + // then + assertThat(containerPorts).hasSize(retryCount); + } +}