diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaKRaftClusterIT.java b/src/test/java/io/strimzi/test/container/StrimziKafkaKRaftClusterIT.java index db5bec8..a4a85a4 100644 --- a/src/test/java/io/strimzi/test/container/StrimziKafkaKRaftClusterIT.java +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaKRaftClusterIT.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.hamcrest.CoreMatchers; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,56 +54,40 @@ public class StrimziKafkaKRaftClusterIT extends AbstractIT { @Test void testKafkaClusterStartup() throws InterruptedException, ExecutionException { - try { - setUpKafkaKRaftCluster(); + setUpKafkaKRaftCluster(); - verifyReadinessOfKRaftCluster(); - } finally { - systemUnderTest.stop(); - } + verifyReadinessOfKRaftCluster(); } @Test void testKafkaClusterStartupWithSharedNetwork() throws InterruptedException, ExecutionException { - try { - systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() - .withNumberOfBrokers(NUMBER_OF_REPLICAS) - .withSharedNetwork() - .withKraft() - .build(); - systemUnderTest.start(); - - verifyReadinessOfKRaftCluster(); - } finally { - systemUnderTest.stop(); - } + systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(NUMBER_OF_REPLICAS) + .withSharedNetwork() + .withKraft() + .build(); + systemUnderTest.start(); + + verifyReadinessOfKRaftCluster(); } @Test void testKafkaClusterFunctionality() throws ExecutionException, InterruptedException, TimeoutException { setUpKafkaKRaftCluster(); - try { - verifyFunctionalityOfKafkaCluster(); - } finally { - systemUnderTest.stop(); - } + verifyFunctionalityOfKafkaCluster(); } @Test void testKafkaClusterWithSharedNetworkFunctionality() throws ExecutionException, InterruptedException, TimeoutException { - try { - systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() - .withNumberOfBrokers(NUMBER_OF_REPLICAS) - .withSharedNetwork() - .withKraft() - .build(); - systemUnderTest.start(); - - verifyFunctionalityOfKafkaCluster(); - } finally { - systemUnderTest.stop(); - } + systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(NUMBER_OF_REPLICAS) + .withSharedNetwork() + .withKraft() + .build(); + systemUnderTest.start(); + + verifyFunctionalityOfKafkaCluster(); } @Test @@ -111,28 +96,22 @@ void testStartClusterWithProxyContainer() { DockerImageName.parse("ghcr.io/shopify/toxiproxy:2.11.0") .asCompatibleSubstituteFor("shopify/toxiproxy")); - try { - systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() - .withNumberOfBrokers(NUMBER_OF_REPLICAS) - .withProxyContainer(proxyContainer) - .withKraft() - .build(); - - systemUnderTest.start(); - List bootstrapUrls = new ArrayList<>(); - for (KafkaContainer kafkaContainer : systemUnderTest.getBrokers()) { - Proxy proxy = ((StrimziKafkaContainer) kafkaContainer).getProxy(); - assertThat(proxy, notNullValue()); - bootstrapUrls.add(kafkaContainer.getBootstrapServers()); - } - - assertThat(systemUnderTest.getBootstrapServers(), - is(String.join(",", bootstrapUrls))); - } finally { - if (systemUnderTest != null) { - systemUnderTest.stop(); - } + systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(NUMBER_OF_REPLICAS) + .withProxyContainer(proxyContainer) + .withKraft() + .build(); + + systemUnderTest.start(); + List bootstrapUrls = new ArrayList<>(); + for (KafkaContainer kafkaContainer : systemUnderTest.getBrokers()) { + Proxy proxy = ((StrimziKafkaContainer) kafkaContainer).getProxy(); + assertThat(proxy, notNullValue()); + bootstrapUrls.add(kafkaContainer.getBootstrapServers()); } + + assertThat(systemUnderTest.getBootstrapServers(), + is(String.join(",", bootstrapUrls))); } private void setUpKafkaKRaftCluster() { @@ -215,4 +194,11 @@ private void verifyFunctionalityOfKafkaCluster() throws ExecutionException, Inte }); } } + + @AfterEach + void afterEach() { + if (this.systemUnderTest != null) { + this.systemUnderTest.stop(); + } + } } diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaKraftContainerIT.java b/src/test/java/io/strimzi/test/container/StrimziKafkaKraftContainerIT.java index 5dced94..c3c625c 100644 --- a/src/test/java/io/strimzi/test/container/StrimziKafkaKraftContainerIT.java +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaKraftContainerIT.java @@ -49,100 +49,83 @@ public class StrimziKafkaKraftContainerIT extends AbstractIT { void testStartContainerWithEmptyConfiguration(final String imageName, final String kafkaVersion) throws ExecutionException, InterruptedException, TimeoutException { supportsKraftMode(imageName); - try { - systemUnderTest = new StrimziKafkaContainer(imageName) - .withBrokerId(1) - .withKraft() - .waitForRunning(); - - systemUnderTest.start(); - assertThat(systemUnderTest.getClusterId(), notNullValue()); - - String logsFromKafka = systemUnderTest.getLogs(); - if (isLessThanKafka350(kafkaVersion)) { - assertThat(logsFromKafka, containsString("RaftManager nodeId=1")); - } else { - assertThat(logsFromKafka, containsString("ControllerServer id=1")); - assertThat(logsFromKafka, containsString("SocketServer listenerType=CONTROLLER, nodeId=1")); - } - - verify(); - - assertThat(systemUnderTest.getBootstrapServers(), is("PLAINTEXT://" + - systemUnderTest.getHost() + ":" + systemUnderTest.getMappedPort(9092))); - } finally { - systemUnderTest.stop(); + systemUnderTest = new StrimziKafkaContainer(imageName) + .withBrokerId(1) + .withKraft() + .waitForRunning(); + + systemUnderTest.start(); + assertThat(systemUnderTest.getClusterId(), notNullValue()); + + String logsFromKafka = systemUnderTest.getLogs(); + if (isLessThanKafka350(kafkaVersion)) { + assertThat(logsFromKafka, containsString("RaftManager nodeId=1")); + } else { + assertThat(logsFromKafka, containsString("ControllerServer id=1")); + assertThat(logsFromKafka, containsString("SocketServer listenerType=CONTROLLER, nodeId=1")); } + + verify(); + + assertThat(systemUnderTest.getBootstrapServers(), is("PLAINTEXT://" + + systemUnderTest.getHost() + ":" + systemUnderTest.getMappedPort(9092))); } @ParameterizedTest(name = "testStartContainerWithSomeConfiguration-{0}") @MethodSource("retrieveKafkaVersionsFile") void testStartContainerWithSomeConfiguration(final String imageName, final String kafkaVersion) throws ExecutionException, InterruptedException, TimeoutException { supportsKraftMode(imageName); - try { - Map kafkaConfiguration = new HashMap<>(); - - kafkaConfiguration.put("log.cleaner.enable", "false"); - kafkaConfiguration.put("log.cleaner.backoff.ms", "1000"); - kafkaConfiguration.put("ssl.enabled.protocols", "TLSv1"); - kafkaConfiguration.put("log.index.interval.bytes", "2048"); - - systemUnderTest = new StrimziKafkaContainer(imageName) - .withBrokerId(1) - .withKraft() - .withKafkaConfigurationMap(kafkaConfiguration) - .waitForRunning(); - - systemUnderTest.start(); - - String logsFromKafka = systemUnderTest.getLogs(); - - if (isLessThanKafka350(kafkaVersion)) { - assertThat(logsFromKafka, containsString("RaftManager nodeId=1")); - } else { - assertThat(logsFromKafka, containsString("ControllerServer id=1")); - assertThat(logsFromKafka, containsString("SocketServer listenerType=CONTROLLER, nodeId=1")); - } - assertThat(logsFromKafka, containsString("log.cleaner.enable = false")); - assertThat(logsFromKafka, containsString("log.cleaner.backoff.ms = 1000")); - assertThat(logsFromKafka, containsString("ssl.enabled.protocols = [TLSv1]")); - assertThat(logsFromKafka, containsString("log.index.interval.bytes = 2048")); - - verify(); - } finally { - systemUnderTest.stop(); + + Map kafkaConfiguration = new HashMap<>(); + + kafkaConfiguration.put("log.cleaner.enable", "false"); + kafkaConfiguration.put("log.cleaner.backoff.ms", "1000"); + kafkaConfiguration.put("ssl.enabled.protocols", "TLSv1"); + kafkaConfiguration.put("log.index.interval.bytes", "2048"); + + systemUnderTest = new StrimziKafkaContainer(imageName) + .withBrokerId(1) + .withKraft() + .withKafkaConfigurationMap(kafkaConfiguration) + .waitForRunning(); + + systemUnderTest.start(); + + String logsFromKafka = systemUnderTest.getLogs(); + + if (isLessThanKafka350(kafkaVersion)) { + assertThat(logsFromKafka, containsString("RaftManager nodeId=1")); + } else { + assertThat(logsFromKafka, containsString("ControllerServer id=1")); + assertThat(logsFromKafka, containsString("SocketServer listenerType=CONTROLLER, nodeId=1")); } + assertThat(logsFromKafka, containsString("log.cleaner.enable = false")); + assertThat(logsFromKafka, containsString("log.cleaner.backoff.ms = 1000")); + assertThat(logsFromKafka, containsString("ssl.enabled.protocols = [TLSv1]")); + assertThat(logsFromKafka, containsString("log.index.interval.bytes = 2048")); + + verify(); } @Test void testUnsupportedKRaftUsingKafkaVersion() { - try { - systemUnderTest = new StrimziKafkaContainer() - .withKafkaVersion("2.8.2") - .withBrokerId(1) - .withKraft() - .waitForRunning(); - - assertThrows(UnsupportedKraftKafkaVersionException.class, () -> systemUnderTest.start()); - } finally { - systemUnderTest.stop(); - } + systemUnderTest = new StrimziKafkaContainer() + .withKafkaVersion("2.8.2") + .withBrokerId(1) + .withKraft() + .waitForRunning(); + assertThrows(UnsupportedKraftKafkaVersionException.class, () -> systemUnderTest.start()); } @Test void testUnsupportedKRaftUsingImageName() { - try { - systemUnderTest = new StrimziKafkaContainer("quay.io/strimzi-test-container/test-container:latest-kafka-2.8.2") - .withBrokerId(1) - .withKraft() - .waitForRunning(); - - assertThrows(UnsupportedKraftKafkaVersionException.class, () -> systemUnderTest.start()); - } finally { - systemUnderTest.stop(); + systemUnderTest = new StrimziKafkaContainer("quay.io/strimzi-test-container/test-container:latest-kafka-2.8.2") + .withBrokerId(1) + .withKraft() + .waitForRunning(); - } + assertThrows(UnsupportedKraftKafkaVersionException.class, () -> systemUnderTest.start()); } @Test @@ -163,8 +146,6 @@ void testWithKafkaLog() { assertThat(systemUnderTest.getLogs(), CoreMatchers.containsString("INFO")); assertThat(systemUnderTest.getLogs(), CoreMatchers.containsString("DEBUG")); assertThat(systemUnderTest.getLogs(), CoreMatchers.containsString("TRACE")); - - systemUnderTest.stop(); } private void verify() throws InterruptedException, ExecutionException, TimeoutException {