Skip to content

Commit

Permalink
Avoid deprecate KafkaContainer
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Oct 9, 2024
1 parent ae96ba1 commit f0d65a4
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

Expand All @@ -50,7 +50,6 @@
import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.time.temporal.ChronoUnit.MILLIS;
import static org.testcontainers.containers.KafkaContainer.KAFKA_PORT;
import static org.testcontainers.utility.MountableFile.forClasspathResource;

public final class TestingKafka
Expand All @@ -65,7 +64,7 @@ public final class TestingKafka
private static final DockerImageName SCHEMA_REGISTRY_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-schema-registry");

private final Network network;
private final KafkaContainer kafka;
private final ConfluentKafkaContainer kafka;
private final GenericContainer<?> schemaRegistry;
private final boolean withSchemaRegistry;
private final Closer closer = Closer.create();
Expand Down Expand Up @@ -96,7 +95,7 @@ private TestingKafka(String confluentPlatformVersion, boolean withSchemaRegistry
// Modify the template directly instead.
MountableFile kafkaLogTemplate = forClasspathResource("log4j-kafka.properties.template");
MountableFile schemaRegistryLogTemplate = forClasspathResource("log4j-schema-registry.properties.template");
kafka = new KafkaContainer(KAFKA_IMAGE_NAME.withTag(confluentPlatformVersion))
kafka = new ConfluentKafkaContainer(KAFKA_IMAGE_NAME.withTag(confluentPlatformVersion))
.withStartupAttempts(3)
.withNetwork(network)
.withNetworkAliases("kafka")
Expand Down Expand Up @@ -240,7 +239,7 @@ private <K, V> Future<RecordMetadata> send(KafkaProducer<K, V> producer, Produce

public String getConnectString()
{
return kafka.getHost() + ":" + kafka.getMappedPort(KAFKA_PORT);
return kafka.getBootstrapServers();
}

private <K, V> KafkaProducer<K, V> createProducer(Map<String, String> extraProperties)
Expand Down

0 comments on commit f0d65a4

Please sign in to comment.