diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java b/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java new file mode 100644 index 0000000..475b39e --- /dev/null +++ b/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java @@ -0,0 +1,286 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.test.container; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import com.groupcdg.pitest.annotations.DoNotMutate; +import org.apache.logging.log4j.Level; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.wait.strategy.WaitStrategy; +import org.testcontainers.images.builder.Transferable; + +import java.io.IOException; +import java.io.StringWriter; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +public class StrimziConnectContainer extends GenericContainer { + + // class attributes + private static final Logger LOGGER = LoggerFactory.getLogger(StrimziConnectContainer.class); + + /** + * The file containing the startup script. + */ + public static final String STARTER_SCRIPT = "/testcontainers_start.sh"; + + /** + * The file containing the Connect configuration + */ + public static final String CONFIG_FILE = "/opt/kafka/config/connect.properties"; + + /** + * Default Kafka port + */ + public static final int CONNECT_PORT = 8083; + + /** + * The network alias. + */ + protected static final String NETWORK_ALIAS = "connect"; + + /** + * Lazy image name provider + */ + private final CompletableFuture imageNameProvider; + + // instance attributes + private String bootstrapServers; + private Map connectConfigurationMap; + private final String kafkaVersion = KafkaVersionService.getInstance().latestRelease().getVersion(); + private boolean includeFileConnectors = true; + + /** + * Image name is specified lazily automatically in {@link #doStart()} method + */ + public StrimziConnectContainer() { + this(new CompletableFuture<>()); + } + + /** + * Image name is specified by {@code dockerImageName} + * + * @param dockerImageName specific docker image name provided by constructor parameter + */ + public StrimziConnectContainer(String dockerImageName) { + this(CompletableFuture.completedFuture(dockerImageName)); + } + + /** + * Image name is lazily set in {@link #doStart()} method + */ + private StrimziConnectContainer(CompletableFuture imageName) { + super(imageName); + this.imageNameProvider = imageName; + super.setNetwork(Network.SHARED); + // exposing kafka port from the container + super.setExposedPorts(Collections.singletonList(CONNECT_PORT)); + super.addEnv("LOG_DIR", "/tmp"); + } + + @Override + @SuppressWarnings({"NPathComplexity", "CyclomaticComplexity"}) + @DoNotMutate + protected void doStart() { + if (bootstrapServers == null) { + throw new IllegalStateException("Bootstrap servers must be configured using withBootstrapServers()"); + } + if (!imageNameProvider.isDone()) { + imageNameProvider.complete(KafkaVersionService.strimziTestContainerImageName(kafkaVersion)); + } + + super.withNetworkAliases(NETWORK_ALIAS); + + super.setCommand("sh", "-c", runStarterScript()); + super.doStart(); + } + + @Override + @DoNotMutate + public void stop() { + super.stop(); + } + + /** + * Allows overriding the startup script command. + * The default is:
{@code "while [ ! -x " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT}
+ * + * @return the command + */ + protected String runStarterScript() { + return "while [ ! -x " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT; + } + + /** + * Fluent method, which sets a waiting strategy to wait until the broker is ready. + *

+ * This method waits for a log message in the broker log. + * You can customize the strategy using {@link #waitingFor(WaitStrategy)}. + * + * @return StrimziConnectContainer instance + */ + @DoNotMutate + public StrimziConnectContainer waitForRunning() { + super.waitingFor(Wait.forLogMessage(".*Finished starting connectors and tasks.*", 1)); + return this; + } + + /** + * The Connect REST API endpoint + * @return the endpoint + */ + public String restEndpoint() { + return "http://" + getHost() + ":" + getMappedPort(CONNECT_PORT); + } + + @Override + @DoNotMutate + protected void containerIsStarting(final InspectContainerResponse containerInfo, final boolean reused) { + super.containerIsStarting(containerInfo, reused); + + LOGGER.info("Mapped port: {}", getMappedPort(CONNECT_PORT)); + + final Properties defaultServerProperties = buildDefaultConnectProperties(bootstrapServers); + final String serverPropertiesWithOverride = overrideProperties(defaultServerProperties, connectConfigurationMap); + + copyFileToContainer( + Transferable.of(serverPropertiesWithOverride.getBytes(StandardCharsets.UTF_8)), + CONFIG_FILE); + + String command = "#!/bin/bash \n"; + command += "bin/connect-distributed.sh " + CONFIG_FILE + " \n"; + + LOGGER.info("Copying command to 'STARTER_SCRIPT' script."); + + copyFileToContainer( + Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700), + STARTER_SCRIPT + ); + } + + /** + * Builds the default Kafka Connect properties. + * + * @param bootstrapServers the bootstrap servers + * @return the default Connect properties + */ + protected Properties buildDefaultConnectProperties(final String bootstrapServers) { + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers", bootstrapServers); + properties.setProperty("group.id", "connect-cluster"); + properties.setProperty("key.converter", "org.apache.kafka.connect.json.JsonConverter"); + properties.setProperty("value.converter", "org.apache.kafka.connect.json.JsonConverter"); + properties.setProperty("key.converter.schemas.enable", "true"); + properties.setProperty("value.converter.schemas.enable", "true"); + properties.setProperty("offset.storage.topic", "connect-offsets"); + properties.setProperty("offset.storage.replication.factor", "1"); + properties.setProperty("config.storage.topic", "connect-configs"); + properties.setProperty("config.storage.replication.factor", "1"); + properties.setProperty("status.storage.topic", "connect-status"); + properties.setProperty("status.storage.replication.factor", "1"); + if (includeFileConnectors) { + properties.setProperty("plugin.path", "/opt/kafka/libs/connect-file-" + kafkaVersion + ".jar"); + } + return properties; + } + + /** + * Overrides the default Kafka Connect properties with the provided overrides. + * If the overrides map is null or empty, it simply returns the default properties as a string. + * + * @param defaultProperties The default Kafka Connect properties. + * @param overrides The properties to override. Can be null. + * @return A string representation of the combined Connect properties. + */ + protected String overrideProperties(Properties defaultProperties, Map overrides) { + // Check if overrides are not null and not empty before applying them + if (overrides != null && !overrides.isEmpty()) { + overrides.forEach(defaultProperties::setProperty); + } + + // Write properties to string + StringWriter writer = new StringWriter(); + try { + defaultProperties.store(writer, null); + } catch (IOException e) { + throw new UncheckedIOException("Failed to store Kafka server properties", e); + } + + return writer.toString(); + } + + /** + * Fluent method, which sets {@code connectConfigurationMap}. + * + * @param connectConfigurationMap kafka configuration + * @return StrimziConnectContainer instance + */ + public StrimziConnectContainer withConnectConfigurationMap(final Map connectConfigurationMap) { + this.connectConfigurationMap = connectConfigurationMap; + return this; + } + + /** + * Fluent method to configure the bootstrap servers + * + * @param bootstrapServers the bootstrap servers + * @return StrimziConnectContainer instance + */ + public StrimziConnectContainer withBootstrapServers(final String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + return self(); + } + + /** + * Configures the Connect container to use the specified logging level for Connect logs. + *

+ * This method generates a custom connect-log4j.properties file with the desired logging level + * and copies it into the Connect container. By setting the logging level, you can control the verbosity + * of Kafka's log output, which is useful for debugging or monitoring purposes. + *

+ * + * Example Usage: + *
{@code
+     * StrimziConnectContainer connectContainer = new StrimziConnectContainer()
+     *     .withConnectLog(Level.DEBUG)
+     *     .start();
+     * }
+ * + * @param level the desired {@link Level} of logging (e.g., DEBUG, INFO, WARN, ERROR) + * @return the current instance of {@code StrimziConnectContainer} for method chaining + */ + public StrimziConnectContainer withConnectLog(Level level) { + String log4jConfig = "log4j.rootLogger=" + level.name() + ", stdout\n" + + "log4j.appender.stdout=org.apache.log4j.ConsoleAppender\n" + + "log4j.appender.stdout.layout=org.apache.log4j.PatternLayout\n" + + "log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n\n"; + + // Copy the custom log4j.properties into the container + this.withCopyToContainer( + Transferable.of(log4jConfig.getBytes(StandardCharsets.UTF_8)), + "/opt/kafka/config/connect-log4j.properties" + ); + + return self(); + } + + /** + * Whether to include the FileStream connectors + * @param includeFileConnectors Use false to not include the FileStream connectors + * @return the current instance of {@code StrimziConnectContainer} for method chaining + */ + public StrimziConnectContainer withIncludeFileConnectors(boolean includeFileConnectors) { + this.includeFileConnectors = includeFileConnectors; + return self(); + } +} diff --git a/src/test/java/io/strimzi/test/container/StrimziConnectContainerIT.java b/src/test/java/io/strimzi/test/container/StrimziConnectContainerIT.java new file mode 100644 index 0000000..44d9d24 --- /dev/null +++ b/src/test/java/io/strimzi/test/container/StrimziConnectContainerIT.java @@ -0,0 +1,162 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.test.container; + +import org.apache.logging.log4j.Level; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class StrimziConnectContainerIT extends AbstractIT { + + private static final Set MIRRORMAKER_CONNECTORS = Set.of( + "MirrorSourceConnector", + "MirrorCheckpointConnector", + "MirrorHeartbeatConnector"); + + private static final Set FILE_CONNECTORS = Set.of( + "FileStreamSinkConnector", + "FileStreamSourceConnector"); + + private StrimziConnectContainer systemUnderTest; + private StrimziKafkaContainer kafka; + + @AfterEach + void tearDown() { + if (kafka != null) { + kafka.stop(); + } + if (systemUnderTest != null) { + systemUnderTest.stop(); + } + } + + @Test + void testStartContainerWithoutBootstrapServers() { + systemUnderTest = new StrimziConnectContainer(); + assertThrows(IllegalStateException.class, () -> systemUnderTest.start()); + } + + @Test + void testStartContainer() throws Exception { + kafka = new StrimziKafkaContainer() + .withNetworkAliases("kafka") + .withBrokerId(1) + .withKraft() + .waitForRunning(); + kafka.start(); + + systemUnderTest = new StrimziConnectContainer() + .withNetwork(kafka.getNetwork()) + .withBootstrapServers("kafka:9091") + .waitForRunning(); + systemUnderTest.start(); + + String info = query(systemUnderTest, "/"); + assertThat(info, containsString(kafka.getClusterId())); + + String connectors = query(systemUnderTest, "/connector-plugins"); + for (String connector : MIRRORMAKER_CONNECTORS) { + assertThat(connectors, containsString(connector)); + } + for (String connector : FILE_CONNECTORS) { + assertThat(connectors, containsString(connector)); + } + } + + @Test + void testStartContainerWithoutFileConnectors() throws Exception { + kafka = new StrimziKafkaContainer() + .withNetworkAliases("kafka") + .withBrokerId(1) + .withKraft() + .waitForRunning(); + kafka.start(); + + systemUnderTest = new StrimziConnectContainer() + .withNetwork(kafka.getNetwork()) + .withBootstrapServers("kafka:9091") + .withIncludeFileConnectors(false) + .waitForRunning(); + systemUnderTest.start(); + + String info = query(systemUnderTest, "/"); + assertThat(info, containsString(kafka.getClusterId())); + + String connectors = query(systemUnderTest, "/connector-plugins"); + for (String connector : MIRRORMAKER_CONNECTORS) { + assertThat(connectors, containsString(connector)); + } + for (String connector : FILE_CONNECTORS) { + assertThat(connectors, not(containsString(connector))); + } + } + + @Test + void testStartContainerWithDebugLogs() { + kafka = new StrimziKafkaContainer() + .withNetworkAliases("kafka") + .withBrokerId(1) + .withKraft() + .waitForRunning(); + kafka.start(); + + systemUnderTest = new StrimziConnectContainer() + .withNetwork(kafka.getNetwork()) + .withBootstrapServers("kafka:9091") + .withConnectLog(Level.DEBUG) + .waitForRunning(); + systemUnderTest.start(); + + assertThat(systemUnderTest.getLogs(), containsString("] DEBUG ")); + } + + @Test + void testStartContainerWithConfigurationMap() throws Exception { + kafka = new StrimziKafkaContainer() + .withNetworkAliases("kafka") + .withBrokerId(1) + .withKraft() + .waitForRunning(); + kafka.start(); + + systemUnderTest = new StrimziConnectContainer() + .withNetwork(kafka.getNetwork()) + .withBootstrapServers("kafka:9091") + .withConnectConfigurationMap(Map.of("plugin.path", "/tmp")) + .waitForRunning(); + systemUnderTest.start(); + + String connectors = query(systemUnderTest, "/connector-plugins"); + for (String connector : FILE_CONNECTORS) { + assertThat(connectors, not(containsString(connector))); + } + } + + private String query(StrimziConnectContainer container, String path) throws Exception { + HttpClient httpClient = HttpClient.newHttpClient(); + URI uri = new URI(container.restEndpoint() + path); + HttpRequest request = HttpRequest.newBuilder() + .GET() + .uri(uri) + .build(); + HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + assertThat(response.statusCode(), is(HttpURLConnection.HTTP_OK)); + return response.body(); + } +}