diff --git a/build.sbt b/build.sbt index ec7173543..e818c6a54 100644 --- a/build.sbt +++ b/build.sbt @@ -7,68 +7,17 @@ val akkaVersion = "2.5.19" val kafkaVersion = "2.1.0" val kafkaVersionForDocs = "21" val scalatestVersion = "3.0.5" -val junit4Version = "4.12" val slf4jVersion = "1.7.25" -val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion - -val coreDependencies = Seq( - "com.typesafe.akka" %% "akka-stream" % akkaVersion, - kafkaClients, -) - -val testkitDependencies = Seq( - "com.typesafe.akka" %% "akka-testkit" % akkaVersion, - "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion, - "net.manub" %% "scalatest-embedded-kafka" % "2.0.0" exclude ("log4j", "log4j"), - "org.apache.commons" % "commons-compress" % "1.18", // embedded Kafka pulls in Avro which pulls in commons-compress 1.8.1 - "org.scalatest" %% "scalatest" % scalatestVersion % Provided, - "junit" % "junit" % junit4Version % Provided, - "org.apache.kafka" %% "kafka" % kafkaVersion exclude ("org.slf4j", "slf4j-log4j12") -) - val confluentAvroSerializerVersion = "5.0.1" -val testDependencies = Seq( - "io.confluent" % "kafka-avro-serializer" % confluentAvroSerializerVersion % Test, - // See https://github.com/sbt/sbt/issues/3618#issuecomment-448951808 - "javax.ws.rs" % "javax.ws.rs-api" % "2.1" artifacts Artifact("javax.ws.rs-api", "jar", "jar"), - "net.manub" %% "scalatest-embedded-schema-registry" % "2.0.0" % Test exclude ("log4j", "log4j") exclude ("org.slf4j", "slf4j-log4j12"), - "org.apache.commons" % "commons-compress" % "1.18", // embedded Kafka pulls in Avro, which pulls in commons-compress 1.8.1, see testing.md - "org.scalatest" %% "scalatest" % scalatestVersion % Test, - "io.spray" %% "spray-json" % "1.3.5" % Test, - "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.7" % Test, // ApacheV2 - "com.novocode" % "junit-interface" % "0.11" % Test, - "junit" % "junit" % junit4Version % Test, - "com.typesafe.akka" %% "akka-slf4j" % akkaVersion % Test, - "ch.qos.logback" % "logback-classic" % "1.2.3" % Test, - "org.slf4j" % "log4j-over-slf4j" % slf4jVersion % Test, - // Schema registry uses Glassfish which uses java.util.logging - "org.slf4j" % "jul-to-slf4j" % slf4jVersion % Test, - "org.mockito" % "mockito-core" % "2.23.4" % Test -) - -val integrationTestDependencies = Seq( - "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % IntegrationTest, - "org.scalatest" %% "scalatest" % scalatestVersion % IntegrationTest, - "com.spotify" % "docker-client" % "8.11.7" % IntegrationTest, - "com.typesafe.akka" %% "akka-slf4j" % akkaVersion % IntegrationTest, - "ch.qos.logback" % "logback-classic" % "1.2.3" % IntegrationTest, - "org.slf4j" % "log4j-over-slf4j" % "1.7.25" % IntegrationTest -) - -val benchmarkDependencies = Seq( - "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0", - "io.dropwizard.metrics" % "metrics-core" % "3.2.6", - "ch.qos.logback" % "logback-classic" % "1.2.3", - "org.slf4j" % "log4j-over-slf4j" % "1.7.25", - "com.typesafe.akka" %% "akka-slf4j" % akkaVersion % "it", - "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % "it", - "org.scalatest" %% "scalatest" % scalatestVersion % "it" -) - val kafkaScale = settingKey[Int]("Number of kafka docker containers") -resolvers in ThisBuild ++= Seq(Resolver.bintrayRepo("manub", "maven")) +resolvers in ThisBuild ++= Seq( + // for Embedded Kafka + Resolver.bintrayRepo("manub", "maven"), + // for Jupiter interface (JUnit 5) + Resolver.jcenterRepo +) val commonSettings = Seq( organization := "com.typesafe.akka", @@ -114,11 
+63,13 @@ val commonSettings = Seq( "akka.pattern" // for some reason Scaladoc creates this ), // show full stack traces and test case durations - testOptions += Tests.Argument("-oDF"), + testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oDF"), + // https://github.com/maichler/sbt-jupiter-interface#framework-options // -a Show stack traces and exception class name for AssertionErrors. // -v Log "test run started" / "test started" / "test run finished" events on log level "info" instead of "debug". // -q Suppress stdout for successful tests. - testOptions += Tests.Argument(TestFrameworks.JUnit, "-a", "-v", "-q"), + // -s Try to decode Scala names in stack traces and test names. + testOptions += Tests.Argument(jupiterTestFramework, "-a", "-v", "-q", "-s"), scalafmtOnCompile := true, headerLicense := Some( HeaderLicense.Custom( @@ -176,7 +127,10 @@ lazy val core = project .settings( name := "akka-stream-kafka", AutomaticModuleName.settings("akka.stream.alpakka.kafka"), - libraryDependencies ++= coreDependencies, + libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-stream" % akkaVersion, + "org.apache.kafka" % "kafka-clients" % kafkaVersion, + ), mimaPreviousArtifacts := Set( organization.value %% name.value % previousStableVersion.value .getOrElse(throw new Error("Unable to determine previous version")) @@ -191,7 +145,15 @@ lazy val testkit = project .settings( name := "akka-stream-kafka-testkit", AutomaticModuleName.settings("akka.stream.alpakka.kafka.testkit"), - libraryDependencies ++= testkitDependencies, + libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion, + "net.manub" %% "scalatest-embedded-kafka" % "2.0.0" exclude ("log4j", "log4j"), + "org.apache.commons" % "commons-compress" % "1.18", // embedded Kafka pulls in Avro which pulls in commons-compress 1.8.1 + "org.scalatest" %% "scalatest" % scalatestVersion % Provided, + "junit" % "junit" % "4.12" % Provided, + "org.junit.jupiter" % "junit-jupiter-api" % JupiterKeys.junitJupiterVersion.value % Provided, + "org.apache.kafka" %% "kafka" % kafkaVersion exclude ("org.slf4j", "slf4j-log4j12") + ), mimaPreviousArtifacts := Set( organization.value %% name.value % previousStableVersion.value .getOrElse(throw new Error("Unable to determine previous version")) @@ -208,7 +170,35 @@ lazy val tests = project .settings(automateHeaderSettings(IntegrationTest)) .settings( name := "akka-stream-kafka-tests", - libraryDependencies ++= testDependencies ++ integrationTestDependencies, + libraryDependencies ++= Seq( + "io.confluent" % "kafka-avro-serializer" % confluentAvroSerializerVersion % Test, + // See https://github.com/sbt/sbt/issues/3618#issuecomment-448951808 + "javax.ws.rs" % "javax.ws.rs-api" % "2.1" artifacts Artifact("javax.ws.rs-api", "jar", "jar"), + "net.manub" %% "scalatest-embedded-schema-registry" % "2.0.0" % Test exclude ("log4j", "log4j") exclude ("org.slf4j", "slf4j-log4j12"), + "org.apache.commons" % "commons-compress" % "1.18", // embedded Kafka pulls in Avro, which pulls in commons-compress 1.8.1, see testing.md + "org.scalatest" %% "scalatest" % scalatestVersion % Test, + "io.spray" %% "spray-json" % "1.3.5" % Test, + "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.7" % Test, // ApacheV2 + "org.junit.vintage" % "junit-vintage-engine" % JupiterKeys.junitVintageVersion.value % Test, + // See http://hamcrest.org/JavaHamcrest/distributables#upgrading-from-hamcrest-1x + "org.hamcrest" % "hamcrest-library" % "2.1" % Test, + "org.hamcrest" % "hamcrest" % "2.1" % Test, + 
"net.aichler" % "jupiter-interface" % JupiterKeys.jupiterVersion.value % Test, + "com.typesafe.akka" %% "akka-slf4j" % akkaVersion % Test, + "ch.qos.logback" % "logback-classic" % "1.2.3" % Test, + "org.slf4j" % "log4j-over-slf4j" % slf4jVersion % Test, + // Schema registry uses Glassfish which uses java.util.logging + "org.slf4j" % "jul-to-slf4j" % slf4jVersion % Test, + "org.mockito" % "mockito-core" % "2.23.4" % Test + ) ++ + Seq( // integration test dependencies + "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % IntegrationTest, + "org.scalatest" %% "scalatest" % scalatestVersion % IntegrationTest, + "com.spotify" % "docker-client" % "8.11.7" % IntegrationTest, + "com.typesafe.akka" %% "akka-slf4j" % akkaVersion % IntegrationTest, + "ch.qos.logback" % "logback-classic" % "1.2.3" % IntegrationTest, + "org.slf4j" % "log4j-over-slf4j" % "1.7.25" % IntegrationTest + ), resolvers += "Confluent Maven Repo" at "https://packages.confluent.io/maven/", publish / skip := true, whitesourceIgnore := true, @@ -275,7 +265,15 @@ lazy val benchmarks = project skip in publish := true, whitesourceIgnore := true, IntegrationTest / parallelExecution := false, - libraryDependencies ++= benchmarkDependencies, + libraryDependencies ++= Seq( + "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0", + "io.dropwizard.metrics" % "metrics-core" % "3.2.6", + "ch.qos.logback" % "logback-classic" % "1.2.3", + "org.slf4j" % "log4j-over-slf4j" % "1.7.25", + "com.typesafe.akka" %% "akka-slf4j" % akkaVersion % "it", + "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % "it", + "org.scalatest" %% "scalatest" % scalatestVersion % "it" + ), kafkaScale := 1, buildInfoPackage := "akka.kafka.benchmarks", buildInfoKeys := Seq[BuildInfoKey](kafkaScale), diff --git a/docs/src/main/paradox/testing.md b/docs/src/main/paradox/testing.md index 6668c3848..268fbe0ed 100644 --- a/docs/src/main/paradox/testing.md +++ b/docs/src/main/paradox/testing.md @@ -39,7 +39,7 @@ The testkit contains helper classes used by the tests in the Alpakka Kafka conne ### Testing from Java code -Test classes may extend `akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test` to automatically start and stop an embedded Kafka broker. +Test classes may extend `akka.kafka.testkit.javadsl.EmbeddedKafkaTest` (JUnit 5) or `akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test` (JUnit 4) to automatically start and stop an embedded Kafka broker. Furthermore it provides @@ -48,11 +48,14 @@ Furthermore it provides * unique topic creation (`createTopic(int number, int partitions, int replication)`), and * `CompletionStage` value extraction helper (` T resultOf(CompletionStage stage, java.time.Duration timeout)`). -The example below shows a skeleton test class for use with JUnit 4. +The example below shows skeleton test classes for JUnit 4 and JUnit 5. 
-Java +Java JUnit 4 : @@snip [snip](/tests/src/test/java/docs/javadsl/AssignmentTest.java) { #testkit } +Java JUnit 5 +: @@snip [snip](/tests/src/test/java/docs/javadsl/ProducerExampleTest.java) { #testkit } + ### Testing from Scala code diff --git a/project/plugins.sbt b/project/plugins.sbt index 2e231f6f4..5abf9ff12 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -20,3 +20,6 @@ addSbtPlugin("com.github.ehsanyou" % "sbt-docker-compose" % "67284e73-envvars-2m // depend directly on the patched version see https://github.com/akka/alpakka/issues/1388 addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2+10-148ba0ff") resolvers += Resolver.bintrayIvyRepo("2m", "sbt-plugins") + +addSbtPlugin("net.aichler" % "sbt-jupiter-interface" % "0.8.0") +resolvers += Resolver.jcenterRepo diff --git a/testkit/src/main/java/akka/kafka/testkit/javadsl/BaseKafkaTest.java b/testkit/src/main/java/akka/kafka/testkit/javadsl/BaseKafkaTest.java new file mode 100644 index 000000000..3261a293d --- /dev/null +++ b/testkit/src/main/java/akka/kafka/testkit/javadsl/BaseKafkaTest.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2014 - 2016 Softwaremill + * Copyright (C) 2016 - 2019 Lightbend Inc. + */ + +package akka.kafka.testkit.javadsl; + +import akka.Done; +import akka.actor.ActorSystem; +import akka.kafka.Subscriptions; +import akka.kafka.javadsl.Consumer; +import akka.kafka.javadsl.Producer; +import akka.kafka.testkit.internal.KafkaTestKitClass; +import akka.stream.Materializer; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +public abstract class BaseKafkaTest extends KafkaTestKitClass { + + public static final int partition0 = 0; + + public final Logger log = LoggerFactory.getLogger(getClass()); + + protected final Materializer materializer; + + protected BaseKafkaTest(ActorSystem system, Materializer materializer, String bootstrapServers) { + super(system, bootstrapServers); + this.materializer = materializer; + } + + @Override + public Logger log() { + return log; + } + + /** Overwrite to set different default timeout for [[#resultOf]]. 
*/ + protected Duration resultOfTimeout() { + return Duration.ofSeconds(5); + } + + protected CompletionStage<Done> produceString(String topic, int messageCount, int partition) { + return Source.fromIterator(() -> IntStream.range(0, messageCount).iterator()) + .map(Object::toString) + .map(n -> new ProducerRecord<>(topic, partition, DefaultKey(), n)) + .runWith(Producer.plainSink(producerDefaults()), materializer); + } + + protected Consumer.DrainingControl<List<ConsumerRecord<String, String>>> consumeString( + String topic, long take) { + return Consumer.plainSource( + consumerDefaults().withGroupId(createGroupId(1)), Subscriptions.topics(topic)) + .take(take) + .toMat(Sink.seq(), Keep.both()) + .mapMaterializedValue(Consumer::createDrainingControl) + .run(materializer); + } + + protected <T> T resultOf(CompletionStage<T> stage) throws Exception { + return resultOf(stage, resultOfTimeout()); + } + + protected <T> T resultOf(CompletionStage<T> stage, Duration timeout) throws Exception { + return stage.toCompletableFuture().get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } +} diff --git a/testkit/src/main/java/akka/kafka/testkit/javadsl/EmbeddedKafkaJunit4Test.java b/testkit/src/main/java/akka/kafka/testkit/javadsl/EmbeddedKafkaJunit4Test.java new file mode 100644 index 000000000..26293261a --- /dev/null +++ b/testkit/src/main/java/akka/kafka/testkit/javadsl/EmbeddedKafkaJunit4Test.java @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2014 - 2016 Softwaremill + * Copyright (C) 2016 - 2019 Lightbend Inc. + */ + +package akka.kafka.testkit.javadsl; + +import akka.actor.ActorSystem; +import akka.stream.Materializer; +import net.manub.embeddedkafka.EmbeddedKafka$; +import net.manub.embeddedkafka.EmbeddedKafkaConfig; +import net.manub.embeddedkafka.EmbeddedKafkaConfig$; +import org.junit.After; +import org.junit.Before; +import scala.collection.immutable.HashMap$; + +/** + * JUnit 4 base-class with some convenience for creating an embedded Kafka broker before + * running the tests. The embedded Kafka broker and the testkit's admin client are set up by a + * `@Before` annotated method and torn down again by an `@After` annotated method, so extending + * classes need no further lifecycle annotations.
+ */ +public abstract class EmbeddedKafkaJunit4Test extends KafkaTest { + + private static EmbeddedKafkaConfig embeddedKafkaConfig( + int kafkaPort, int zookeeperPort, int replicationFactor) { + return EmbeddedKafkaConfig$.MODULE$.apply( + kafkaPort, + zookeeperPort, + createReplicationFactorBrokerProps(replicationFactor), + HashMap$.MODULE$.empty(), + HashMap$.MODULE$.empty()); + } + + protected final int kafkaPort; + protected final int replicationFactor; + + public EmbeddedKafkaJunit4Test( + ActorSystem system, Materializer materializer, int kafkaPort, int replicationFactor) { + super(system, materializer, "localhost:" + kafkaPort); + this.kafkaPort = kafkaPort; + this.replicationFactor = replicationFactor; + } + + protected EmbeddedKafkaJunit4Test(ActorSystem system, Materializer materializer, int kafkaPort) { + this(system, materializer, kafkaPort, 1); + } + + protected static void startEmbeddedKafka(int kafkaPort, int replicationFactor) { + EmbeddedKafka$.MODULE$.start(embeddedKafkaConfig(kafkaPort, kafkaPort + 1, replicationFactor)); + } + + protected static void stopEmbeddedKafka() { + EmbeddedKafka$.MODULE$.stop(); + } + + @Before + public void setupEmbeddedKafka() { + EmbeddedKafkaJunit4Test.startEmbeddedKafka(kafkaPort, replicationFactor); + setUpAdminClient(); + } + + @After + public void cleanUpEmbeddedKafka() { + cleanUpAdminClient(); + EmbeddedKafkaJunit4Test.stopEmbeddedKafka(); + } +} diff --git a/testkit/src/main/java/akka/kafka/testkit/javadsl/EmbeddedKafkaTest.java b/testkit/src/main/java/akka/kafka/testkit/javadsl/EmbeddedKafkaTest.java new file mode 100644 index 000000000..06e57a080 --- /dev/null +++ b/testkit/src/main/java/akka/kafka/testkit/javadsl/EmbeddedKafkaTest.java @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2014 - 2016 Softwaremill + * Copyright (C) 2016 - 2019 Lightbend Inc. + */ + +package akka.kafka.testkit.javadsl; + +import akka.actor.ActorSystem; +import akka.stream.Materializer; +import net.manub.embeddedkafka.EmbeddedKafka$; +import net.manub.embeddedkafka.EmbeddedKafkaConfig; +import net.manub.embeddedkafka.EmbeddedKafkaConfig$; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import scala.collection.immutable.HashMap$; + +/** + * JUnit 5 aka Jupiter base-class with some convenience for creating an embedded Kafka broker before + * running the tests. Extending classes must be annotated with `@TestInstance(Lifecycle.PER_CLASS)` + * to create a single instance of the test class with `@BeforeAll` and `@AfterAll` annotated methods + * called by the test framework. 
+ */ +public abstract class EmbeddedKafkaTest extends KafkaTest { + + private static EmbeddedKafkaConfig embeddedKafkaConfig( + int kafkaPort, int zookeeperPort, int replicationFactor) { + return EmbeddedKafkaConfig$.MODULE$.apply( + kafkaPort, + zookeeperPort, + createReplicationFactorBrokerProps(replicationFactor), + HashMap$.MODULE$.empty(), + HashMap$.MODULE$.empty()); + } + + protected final int kafkaPort; + protected final int replicationFactor; + + protected EmbeddedKafkaTest( + ActorSystem system, Materializer materializer, int kafkaPort, int replicationFactor) { + super(system, materializer, "localhost:" + kafkaPort); + this.kafkaPort = kafkaPort; + this.replicationFactor = replicationFactor; + } + + protected EmbeddedKafkaTest(ActorSystem system, Materializer materializer, int kafkaPort) { + this(system, materializer, kafkaPort, 1); + } + + protected void startEmbeddedKafka(int kafkaPort, int replicationFactor) { + EmbeddedKafka$.MODULE$.start(embeddedKafkaConfig(kafkaPort, kafkaPort + 1, replicationFactor)); + } + + protected void stopEmbeddedKafka() { + EmbeddedKafka$.MODULE$.stop(); + } + + @BeforeAll + void setupEmbeddedKafka() { + startEmbeddedKafka(kafkaPort, replicationFactor); + } + + @AfterAll + void stopEmbeddedKafkaNow() { + stopEmbeddedKafka(); + } +} diff --git a/testkit/src/main/java/akka/kafka/testkit/javadsl/KafkaJunit4Test.java b/testkit/src/main/java/akka/kafka/testkit/javadsl/KafkaJunit4Test.java new file mode 100644 index 000000000..bce401ac9 --- /dev/null +++ b/testkit/src/main/java/akka/kafka/testkit/javadsl/KafkaJunit4Test.java @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2014 - 2016 Softwaremill + * Copyright (C) 2016 - 2019 Lightbend Inc. + */ + +package akka.kafka.testkit.javadsl; + +import akka.actor.ActorSystem; +import akka.stream.Materializer; +import akka.stream.testkit.javadsl.StreamTestKit; +import org.junit.After; +import org.junit.Before; + +/** + * JUnit 4 base-class with some convenience for accessing a Kafka broker. The admin client is + * set up and cleaned up by `@Before` and `@After` annotated methods, and an `@After` annotated + * check asserts that all stream stages have stopped after each test, so extending classes need + * no further lifecycle annotations. + */ +public abstract class KafkaJunit4Test extends BaseKafkaTest { + + protected KafkaJunit4Test( + ActorSystem system, Materializer materializer, String bootstrapServers) { + super(system, materializer, bootstrapServers); + } + + @Before + public void setUpAdmin() { + setUpAdminClient(); + } + + @After + public void cleanUpAdmin() { + cleanUpAdminClient(); + } + + @After + public void checkForStageLeaks() { + StreamTestKit.assertAllStagesStopped(materializer); + } +} diff --git a/testkit/src/main/java/akka/kafka/testkit/javadsl/KafkaTest.java b/testkit/src/main/java/akka/kafka/testkit/javadsl/KafkaTest.java new file mode 100644 index 000000000..c5b741452 --- /dev/null +++ b/testkit/src/main/java/akka/kafka/testkit/javadsl/KafkaTest.java @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2014 - 2016 Softwaremill + * Copyright (C) 2016 - 2019 Lightbend Inc. + */ + +package akka.kafka.testkit.javadsl; + +import akka.actor.ActorSystem; +import akka.stream.Materializer; +import akka.stream.testkit.javadsl.StreamTestKit; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; + +/** + * JUnit 5 aka Jupiter base-class with some convenience for accessing a Kafka broker.
Extending + * classes must be annotated with `@TestInstance(Lifecycle.PER_CLASS)` to create a single instance + * of the test class with `@BeforeAll` and `@AfterAll` annotated methods called by the test + * framework. + */ +public abstract class KafkaTest extends BaseKafkaTest { + + protected KafkaTest(ActorSystem system, Materializer materializer, String bootstrapServers) { + super(system, materializer, bootstrapServers); + } + + @BeforeAll + public void setupAdmin() { + setUpAdminClient(); + } + + @AfterAll + public void cleanUpAdmin() { + cleanUpAdminClient(); + } + + @AfterEach + public void checkForStageLeaks() { + StreamTestKit.assertAllStagesStopped(materializer); + } +} diff --git a/testkit/src/main/scala/akka/kafka/testkit/internal/KafkaTestKit.scala b/testkit/src/main/scala/akka/kafka/testkit/internal/KafkaTestKit.scala index c114661ed..7bfb10494 100644 --- a/testkit/src/main/scala/akka/kafka/testkit/internal/KafkaTestKit.scala +++ b/testkit/src/main/scala/akka/kafka/testkit/internal/KafkaTestKit.scala @@ -17,32 +17,30 @@ import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} import org.slf4j.Logger -import scala.concurrent.duration._ - trait KafkaTestKit { def log: Logger val DefaultKey = "key" - private val producerDefaultsInstance: ProducerSettings[String, String] = + private lazy val producerDefaultsInstance: ProducerSettings[String, String] = ProducerSettings(system, new StringSerializer, new StringSerializer) .withBootstrapServers(bootstrapServers) def producerDefaults: ProducerSettings[String, String] = producerDefaultsInstance - private val consumerDefaultsInstance: ConsumerSettings[String, String] = + private lazy val consumerDefaultsInstance: ConsumerSettings[String, String] = ConsumerSettings(system, new StringDeserializer, new StringDeserializer) .withBootstrapServers(bootstrapServers) .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") def consumerDefaults: ConsumerSettings[String, String] = consumerDefaultsInstance - private val committerDefaultsInstance = CommitterSettings(system) + private lazy val committerDefaultsInstance = CommitterSettings(system) def committerDefaults: CommitterSettings = committerDefaultsInstance - def nextNumber(): Int = KafkaTestKit.topicCounter.incrementAndGet() + def nextNumber(): Int = KafkaTestKitClass.topicCounter.incrementAndGet() def createTopicName(number: Int) = s"topic-$number-${nextNumber}" @@ -53,7 +51,7 @@ trait KafkaTestKit { def system: ActorSystem def bootstrapServers: String - private val adminDefaults = { + private lazy val adminDefaults = { val config = new Properties() config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) config @@ -73,15 +71,18 @@ trait KafkaTestKit { * be sure to call `cleanUpAdminClient` after the tests are done. */ def setUpAdminClient(): Unit = - adminClientVar = AdminClient.create(adminDefaults) + if (adminClientVar == null) { + adminClientVar = AdminClient.create(adminDefaults) + } /** * Close internal admin client instances. */ - def cleanUpAdminClient(): Unit = { - adminClient.close(60, TimeUnit.SECONDS) - adminClientVar = null - } + def cleanUpAdminClient(): Unit = + if (adminClientVar != null) { + adminClientVar.close(60, TimeUnit.SECONDS) + adminClientVar = null + } /** * Create a topic with given partition number and replication factor. 
@@ -109,6 +110,12 @@ trait KafkaTestKit { } } -object KafkaTestKit { +abstract class KafkaTestKitClass(override val system: ActorSystem, override val bootstrapServers: String) + extends KafkaTestKit + +object KafkaTestKitClass { val topicCounter = new AtomicInteger() + def createReplicationFactorBrokerProps(replicationFactor: Int): Map[String, String] = Map( + "offsets.topic.replication.factor" -> s"$replicationFactor" + ) } diff --git a/testkit/src/main/scala/akka/kafka/testkit/javadsl/EmbeddedKafkaJunit4Test.scala b/testkit/src/main/scala/akka/kafka/testkit/javadsl/EmbeddedKafkaJunit4Test.scala deleted file mode 100644 index c70de0461..000000000 --- a/testkit/src/main/scala/akka/kafka/testkit/javadsl/EmbeddedKafkaJunit4Test.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (C) 2014 - 2016 Softwaremill - * Copyright (C) 2016 - 2019 Lightbend Inc. - */ - -package akka.kafka.testkit.javadsl - -import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} -import org.junit.{After, Before} - -abstract class EmbeddedKafkaJunit4Test extends KafkaJunit4Test { - import EmbeddedKafkaJunit4Test._ - - def kafkaPort: Int - def replicationFactor = 1 - - @Before def setupEmbeddedKafka() = startEmbeddedKafka(kafkaPort, replicationFactor) - - @After def cleanUpEmbeddedKafka() = - stopEmbeddedKafka() -} - -object EmbeddedKafkaJunit4Test { - private def embeddedKafkaConfig(kafkaPort: Int, zooKeeperPort: Int, replicationFactor: Int) = - EmbeddedKafkaConfig(kafkaPort, - zooKeeperPort, - Map( - "offsets.topic.replication.factor" -> s"$replicationFactor" - )) - - def startEmbeddedKafka(kafkaPort: Int, replicationFactor: Int): Unit = - EmbeddedKafka.start()(embeddedKafkaConfig(kafkaPort, kafkaPort + 1, replicationFactor)) - - def stopEmbeddedKafka(): Unit = - EmbeddedKafka.stop() -} diff --git a/testkit/src/main/scala/akka/kafka/testkit/javadsl/KafkaJunit4Test.scala b/testkit/src/main/scala/akka/kafka/testkit/javadsl/KafkaJunit4Test.scala deleted file mode 100644 index 36b494744..000000000 --- a/testkit/src/main/scala/akka/kafka/testkit/javadsl/KafkaJunit4Test.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (C) 2014 - 2016 Softwaremill - * Copyright (C) 2016 - 2019 Lightbend Inc. - */ - -package akka.kafka.testkit.javadsl - -import java.util.concurrent.{CompletionStage, TimeUnit} - -import akka.Done -import akka.japi.Pair -import akka.kafka.Subscriptions -import akka.kafka.javadsl.Consumer -import akka.kafka.scaladsl.Producer -import akka.kafka.testkit.internal.KafkaTestKit -import akka.stream.Materializer -import akka.stream.javadsl.{Keep, Sink} -import akka.stream.scaladsl.Source -import akka.stream.testkit.javadsl.StreamTestKit -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.clients.producer.ProducerRecord -import org.junit.{After, Before} -import org.slf4j.{Logger, LoggerFactory} - -import scala.compat.java8.FutureConverters._ - -abstract class KafkaJunit4Test extends KafkaTestKit { - - val log: Logger = LoggerFactory.getLogger(getClass) - - final val partition0 = 0 - - def materializer: Materializer - - /** - * Sets up the Admin client. Override if you want custom initialization of the admin client. - */ - @Before def setUpAdmin() = setUpAdminClient() - - /** - * Cleans up the Admin client. Override if you want custom cleaning up of the admin client. 
- */ - @After def cleanUpAdmin() = cleanUpAdminClient() - - @After def checkForStageLeaks() = StreamTestKit.assertAllStagesStopped(materializer) - - /** - * Overwrite to set different default timeout for [[KafkaJunit4Test.resultOf]]. - */ - def resultOfTimeout: java.time.Duration = java.time.Duration.ofSeconds(5) - - def produceString(topic: String, messageCount: Int, partition: Int): CompletionStage[Done] = - Source(1 to messageCount) - .map(_.toString) - // NOTE: If no partition is specified but a key is present a partition will be chosen - // using a hash of the key. If neither key nor partition is present a partition - // will be assigned in a round-robin fashion. - .map(n => new ProducerRecord(topic, partition, DefaultKey, n)) - .runWith(Producer.plainSink(producerDefaults))(materializer) - .toJava - - private final type Records = java.util.List[ConsumerRecord[String, String]] - - protected def consumeString(topic: String, take: Long): Consumer.DrainingControl[Records] = - Consumer - .plainSource(consumerDefaults.withGroupId(createGroupId(1)), Subscriptions.topics(topic)) - .take(take) - .toMat(Sink.seq, Keep.both[Consumer.Control, CompletionStage[Records]]) - .mapMaterializedValue( - new akka.japi.function.Function[Pair[Consumer.Control, CompletionStage[Records]], Consumer.DrainingControl[ - Records - ]] { - override def apply( - p: Pair[Consumer.Control, CompletionStage[Records]] - ): Consumer.DrainingControl[Records] = Consumer.createDrainingControl(p) - } - ) - .run(materializer) - - @throws[Exception] - protected def resultOf[T](stage: CompletionStage[T]): T = resultOf(stage, resultOfTimeout) - - @throws[Exception] - protected def resultOf[T](stage: CompletionStage[T], timeout: java.time.Duration): T = - stage.toCompletableFuture.get(timeout.toMillis, TimeUnit.MILLISECONDS) - -} diff --git a/tests/src/test/java/akka/kafka/javadsl/EmbeddedKafkaWithSchemaRegistryTest.java b/tests/src/test/java/akka/kafka/javadsl/EmbeddedKafkaWithSchemaRegistryTest.java new file mode 100644 index 000000000..113a076d9 --- /dev/null +++ b/tests/src/test/java/akka/kafka/javadsl/EmbeddedKafkaWithSchemaRegistryTest.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2014 - 2016 Softwaremill + * Copyright (C) 2016 - 2019 Lightbend Inc. 
+ */ + +package akka.kafka.javadsl; + +import akka.actor.ActorSystem; +import akka.kafka.testkit.javadsl.KafkaJunit4Test; +import akka.stream.Materializer; +import net.manub.embeddedkafka.schemaregistry.EmbeddedKWithSR; +import net.manub.embeddedkafka.schemaregistry.EmbeddedKafkaConfigWithSchemaRegistry; +import net.manub.embeddedkafka.schemaregistry.EmbeddedKafkaConfigWithSchemaRegistry$; +import net.manub.embeddedkafka.schemaregistry.EmbeddedKafkaWithSchemaRegistry$; +import org.junit.After; +import org.junit.Before; +import scala.collection.immutable.HashMap$; + +public abstract class EmbeddedKafkaWithSchemaRegistryTest extends KafkaJunit4Test { + + /** + * Workaround for https://github.com/manub/scalatest-embedded-kafka/issues/166. Keeping track of + * all embedded servers, so we can shut them down later. + */ + private static EmbeddedKWithSR embeddedServer; + + protected final int kafkaPort; + protected final int replicationFactor; + protected final int schemaRegistryPort; + protected final String schemaRegistryUrl; + + public EmbeddedKafkaWithSchemaRegistryTest( + ActorSystem system, + Materializer materializer, + int kafkaPort, + int replicationFactor, + int schemaRegistryPort) { + super(system, materializer, "localhost:" + kafkaPort); + this.kafkaPort = kafkaPort; + this.replicationFactor = replicationFactor; + this.schemaRegistryPort = schemaRegistryPort; + this.schemaRegistryUrl = "http://localhost:" + schemaRegistryPort; + } + + private static EmbeddedKafkaConfigWithSchemaRegistry embeddedKafkaConfig( + int kafkaPort, int zookeeperPort, int schemaRegistryPort, int replicationFactor) { + return EmbeddedKafkaConfigWithSchemaRegistry$.MODULE$.apply( + kafkaPort, + zookeeperPort, + schemaRegistryPort, + createReplicationFactorBrokerProps(replicationFactor) + .updated("zookeeper.connection.timeout.ms", "20000"), + HashMap$.MODULE$.empty(), + HashMap$.MODULE$.empty()); + } + + protected static void startEmbeddedKafka( + int kafkaPort, int replicationFactor, int schemaRegistryPort) { + embeddedServer = + EmbeddedKafkaWithSchemaRegistry$.MODULE$.start( + embeddedKafkaConfig(kafkaPort, kafkaPort + 1, schemaRegistryPort, replicationFactor)); + } + + @Before + public void setUpEmbeddedKafka() { + EmbeddedKafkaWithSchemaRegistryTest.startEmbeddedKafka( + kafkaPort, replicationFactor, schemaRegistryPort); + setUpAdminClient(); + } + + @After + public void cleanUpEmbeddedKafka() { + cleanUpAdminClient(); + EmbeddedKafkaWithSchemaRegistryTest.embeddedServer.stop(true); + } +} diff --git a/tests/src/test/java/docs/javadsl/AssignmentTest.java b/tests/src/test/java/docs/javadsl/AssignmentTest.java index f19cbebf5..344c61003 100644 --- a/tests/src/test/java/docs/javadsl/AssignmentTest.java +++ b/tests/src/test/java/docs/javadsl/AssignmentTest.java @@ -10,6 +10,7 @@ import akka.kafka.AutoSubscription; import akka.kafka.KafkaPorts; import akka.kafka.ManualSubscription; +import akka.kafka.ProducerMessage; import akka.kafka.Subscriptions; import akka.kafka.javadsl.Consumer; // #testkit @@ -21,18 +22,19 @@ import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; // #testkit -// #testkit import akka.testkit.javadsl.TestKit; +// #testkit import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +// #testkit import org.junit.AfterClass; import org.junit.Test; +// #testkit import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletionStage; -import
java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -45,31 +47,14 @@ public class AssignmentTest extends EmbeddedKafkaJunit4Test { private static final ActorSystem sys = ActorSystem.create("AssignmentTest"); private static final Materializer mat = ActorMaterializer.create(sys); - @Override - public ActorSystem system() { - return sys; - } - - @Override - public Materializer materializer() { - return mat; - } - - @Override - public String bootstrapServers() { - return "localhost:" + kafkaPort(); - } - - @Override - public int kafkaPort() { - return KafkaPorts.AssignmentTest(); + public AssignmentTest() { + super(sys, mat, KafkaPorts.AssignmentTest()); } // #testkit @Test - public void mustConsumeFromTheSpecifiedSingleTopic() - throws ExecutionException, InterruptedException { + public void mustConsumeFromTheSpecifiedSingleTopic() throws Exception { final String topic = createTopic(0, 1, 1); final String group = createGroupId(0); final Integer totalMessages = 100; @@ -78,7 +63,7 @@ public void mustConsumeFromTheSpecifiedSingleTopic() .map(msg -> new ProducerRecord<>(topic, 0, DefaultKey(), msg.toString())) .runWith(Producer.plainSink(producerDefaults()), mat); - producerCompletion.toCompletableFuture().get(); + resultOf(producerCompletion); // #single-topic final AutoSubscription subscription = Subscriptions.topics(topic); @@ -87,59 +72,50 @@ public void mustConsumeFromTheSpecifiedSingleTopic() // #single-topic final Integer receivedMessages = - consumer - .takeWhile(m -> Integer.valueOf(m.value()) < totalMessages, true) - .runWith(Sink.seq(), mat) - .toCompletableFuture() - .get() + resultOf( + consumer + .takeWhile(m -> Integer.valueOf(m.value()) < totalMessages, true) + .runWith(Sink.seq(), mat)) .size(); assertEquals(totalMessages, receivedMessages); } @Test - public void mustConsumeFromTheSpecifiedTopicPattern() - throws ExecutionException, InterruptedException { - final List<String> topics = Arrays.asList(createTopic(1, 1, 1), createTopic(1, 1, 1)); + public void mustConsumeFromTheSpecifiedTopicPattern() throws Exception { + final List<String> topics = Arrays.asList(createTopic(9001, 1, 1), createTopic(9002, 1, 1)); final String group = createGroupId(0); final Integer totalMessages = 100; final CompletionStage<Done> producerCompletion = Source.range(1, totalMessages) - .mapConcat( + .map( msg -> - topics - .stream() - .map(t -> new ProducerRecord<>(t, 0, DefaultKey(), msg.toString())) - .collect(Collectors.toList())) - .concat( - Source.single( - new ProducerRecord<>( - topics.get(0), 0, DefaultKey(), String.valueOf(totalMessages + 1)))) - .runWith(Producer.plainSink(producerDefaults()), mat); + ProducerMessage.multi( + topics + .stream() + .map(t -> new ProducerRecord<>(t, 0, DefaultKey(), msg.toString())) + .collect(Collectors.toList()))) + .via(Producer.flexiFlow(producerDefaults())) + .runWith(Sink.ignore(), mat); - producerCompletion.toCompletableFuture().get(); + resultOf(producerCompletion); // #topic-pattern - final String pattern = "topic-1-[0-9]+"; + final String pattern = "topic-900[1|2]-[0-9]+"; final AutoSubscription subscription = Subscriptions.topicPattern(pattern); final Source<ConsumerRecord<String, String>, Consumer.Control> consumer = Consumer.plainSource(consumerDefaults().withGroupId(group), subscription); // #topic-pattern + int expectedTotal = totalMessages * topics.size(); final Integer receivedMessages = - consumer - .takeWhile(m -> Integer.valueOf(m.value()) <= totalMessages) - .runWith(Sink.seq(), mat) - .toCompletableFuture() - .get() - .size(); + resultOf(consumer.take(expectedTotal).runWith(Sink.seq(), mat)).size(); - assertEquals(totalMessages * topics.size(), (int) receivedMessages); + assertEquals(expectedTotal, (int) receivedMessages); } @Test - public void mustConsumeFromTheSpecifiedPartition() - throws ExecutionException, InterruptedException { + public void mustConsumeFromTheSpecifiedPartition() throws Exception { final String topic = createTopic(2, 2, 1); final Integer totalMessages = 100; final CompletionStage<Done> producerCompletion = @@ -151,7 +127,7 @@ public void mustConsumeFromTheSpecifiedPartition() }) .runWith(Producer.plainSink(producerDefaults()), mat); - producerCompletion.toCompletableFuture().get(); + resultOf(producerCompletion); // #assingment-single-partition final Integer partition = 0; @@ -166,13 +142,12 @@ public void mustConsumeFromTheSpecifiedPartition() .take(totalMessages / 2) .map(msg -> Integer.valueOf(msg.value())) .runWith(Sink.seq(), mat); - final List<Integer> messages = consumerCompletion.toCompletableFuture().get(); + final List<Integer> messages = resultOf(consumerCompletion); messages.forEach(m -> assertEquals(0, m % 2)); } @Test - public void mustConsumeFromTheSpecifiedPartitionAndOffset() - throws ExecutionException, InterruptedException { + public void mustConsumeFromTheSpecifiedPartitionAndOffset() throws Exception { final String topic = createTopic(3, 1, 1); final Integer totalMessages = 100; final CompletionStage<Done> producerCompletion = @@ -180,7 +155,7 @@ public void mustConsumeFromTheSpecifiedPartitionAndOffset() .map(msg -> new ProducerRecord<>(topic, 0, DefaultKey(), msg.toString())) .runWith(Producer.plainSink(producerDefaults()), mat); - producerCompletion.toCompletableFuture().get(); + resultOf(producerCompletion); // #assingment-single-partition-offset final Integer partition = 0; @@ -193,13 +168,12 @@ public void mustConsumeFromTheSpecifiedPartitionAndOffset() final CompletionStage<List<Long>> consumerCompletion = consumer.take(totalMessages / 2).map(ConsumerRecord::offset).runWith(Sink.seq(), mat); - final List<Long> messages = consumerCompletion.toCompletableFuture().get(); + final List<Long> messages = resultOf(consumerCompletion); IntStream.range(0, (int) offset).forEach(idx -> assertEquals(idx, messages.get(idx) - offset)); } @Test - public void mustConsumeFromTheSpecifiedPartitionAndTimestamp() - throws ExecutionException, InterruptedException { + public void mustConsumeFromTheSpecifiedPartitionAndTimestamp() throws Exception { final String topic = createTopic(4, 1, 1); final Integer totalMessages = 100; final CompletionStage<Done> producerCompletion = @@ -210,7 +184,7 @@ public void mustConsumeFromTheSpecifiedPartitionAndTimestamp() topic, 0, System.currentTimeMillis(), DefaultKey(), msg.toString())) .runWith(Producer.plainSink(producerDefaults()), mat); - producerCompletion.toCompletableFuture().get(); + resultOf(producerCompletion); // #assingment-single-partition-timestamp final Integer partition = 0; @@ -229,13 +203,7 @@ public void mustConsumeFromTheSpecifiedPartitionAndTimestamp() .map(ConsumerRecord::timestamp) .runWith(Sink.seq(), mat); final long oldMessages = - consumerCompletion - .toCompletableFuture() - .get() - .stream() - .map(t -> t - now) - .filter(t -> t > 5000) - .count(); + resultOf(consumerCompletion).stream().map(t -> t - now).filter(t -> t > 5000).count(); assertEquals(0, oldMessages); } diff --git a/tests/src/test/java/docs/javadsl/AtLeastOnceTest.java b/tests/src/test/java/docs/javadsl/AtLeastOnceTest.java index f3ea379ae..3bfa8a881 100644 --- a/tests/src/test/java/docs/javadsl/AtLeastOnceTest.java
+++ b/tests/src/test/java/docs/javadsl/AtLeastOnceTest.java @@ -43,24 +43,8 @@ public class AtLeastOnceTest extends EmbeddedKafkaJunit4Test { private static final Materializer materializer = ActorMaterializer.create(system); private static final Executor ec = Executors.newSingleThreadExecutor(); - @Override - public ActorSystem system() { - return system; - } - - @Override - public Materializer materializer() { - return materializer; - } - - @Override - public String bootstrapServers() { - return "localhost:" + kafkaPort(); - } - - @Override - public int kafkaPort() { - return KafkaPorts.AtLeastOnceToManyTest(); + public AtLeastOnceTest() { + super(system, materializer, KafkaPorts.AtLeastOnceToManyTest()); } @AfterClass @@ -101,7 +85,7 @@ public void consumeOneProduceMany() throws Exception { .toMat(Sink.seq(), Keep.both()) .run(materializer); - produceString(topic1, 10, partition0()).toCompletableFuture().get(1, TimeUnit.SECONDS); + produceString(topic1, 10, partition0).toCompletableFuture().get(1, TimeUnit.SECONDS); sleepSeconds(10, "to make produce happen"); assertThat( control.drainAndShutdown(ec).toCompletableFuture().get(5, TimeUnit.SECONDS), @@ -164,7 +148,7 @@ public void consumerOneProduceConditional() throws Exception { .toMat(Sink.seq(), Keep.both()) .run(materializer); - produceString(topic1, 10, partition0()).toCompletableFuture().get(1, TimeUnit.SECONDS); + produceString(topic1, 10, partition0).toCompletableFuture().get(1, TimeUnit.SECONDS); sleepSeconds(10, "to make produce happen"); assertThat( control.drainAndShutdown(ec).toCompletableFuture().get(5, TimeUnit.SECONDS), diff --git a/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java b/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java index f5c04d06a..3ba8a457c 100644 --- a/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java +++ b/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java @@ -13,10 +13,10 @@ import akka.actor.Props; import akka.japi.Pair; import akka.kafka.*; +import akka.kafka.javadsl.Committer; import akka.kafka.javadsl.Consumer; import akka.kafka.javadsl.Producer; -import akka.kafka.javadsl.Committer; -import akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test; +import akka.kafka.testkit.javadsl.EmbeddedKafkaTest; import akka.stream.ActorMaterializer; import akka.stream.Materializer; import akka.stream.javadsl.*; @@ -30,8 +30,9 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; -import org.junit.AfterClass; -import org.junit.Test; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; import java.time.Duration; import java.util.List; @@ -45,42 +46,27 @@ import static org.junit.Assert.assertEquals; -public class ConsumerExampleTest extends EmbeddedKafkaJunit4Test { +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class ConsumerExampleTest extends EmbeddedKafkaTest { private static final ActorSystem system = ActorSystem.create("ConsumerExampleTest"); private static final Materializer materializer = ActorMaterializer.create(system); - private static final Executor ec = Executors.newSingleThreadExecutor(); - - @Override - public ActorSystem system() { - return system; - } - - @Override - public Materializer materializer() { - return materializer; - } - - @Override - public String bootstrapServers() { - return "localhost:" + kafkaPort(); - } + private static final Executor executor = 
Executors.newSingleThreadExecutor(); - @Override - public int kafkaPort() { - return KafkaPorts.ConsumerExamplesTest(); + ConsumerExampleTest() { + super(system, materializer, KafkaPorts.ConsumerExamplesTest()); } - @AfterClass - public static void afterClass() { + @AfterAll + void afterClass() { TestKit.shutdownActorSystem(system); } - protected void assertDone(CompletionStage<Done> stage) throws Exception { + void assertDone(CompletionStage<Done> stage) throws Exception { assertEquals(Done.done(), resultOf(stage)); } - protected <T> Flow<T, T, NotUsed> business() { + private <T> Flow<T, T, NotUsed> business() { return Flow.create(); } @@ -101,7 +87,7 @@ protected <T> Flow<T, T, NotUsed> business() { // #settings-autocommit @Test - public void plainSourceWithExternalOffsetStorage() throws Exception { + void plainSourceWithExternalOffsetStorage() throws Exception { ConsumerSettings<String, String> consumerSettings = consumerDefaults().withGroupId(createGroupId(1)); String topic = createTopic(1, 1, 1); @@ -115,12 +101,12 @@ public void plainSourceWithExternalOffsetStorage() throws Exception { Consumer.plainSource( consumerSettings, Subscriptions.assignmentWithOffset( - new TopicPartition(topic, /* partition: */ 0), fromOffset)) + new TopicPartition(topic, partition0), fromOffset)) .mapAsync(1, db::businessLogicAndStoreOffset) .to(Sink.ignore()) .run(materializer)); // #plainSource - assertDone(produceString(topic, 10, partition0())); + assertDone(produceString(topic, 10, partition0)); while (db.offsetStore.get() < 9L) { sleepMillis(100, "until offsets have increased"); } @@ -158,7 +144,7 @@ public CompletionStage<Done> storeProcessedOffset(long offset) { // ... } // #plainSource @Test - public void atMostOnce() throws Exception { + void atMostOnce() throws Exception { ConsumerSettings<String, String> consumerSettings = consumerDefaults().withGroupId(createGroupId(1)); String topic = createTopic(1, 1, 1); @@ -170,7 +156,7 @@ public void atMostOnce() throws Exception { .run(materializer); // #atMostOnce - assertDone(produceString(topic, 10, partition0())); + assertDone(produceString(topic, 10, partition0)); assertDone(control.shutdown()); } @@ -181,7 +167,7 @@ CompletionStage<Done> business(String key, String value) { // ....
} } @Test - public void atLeastOnce() throws Exception { + void atLeastOnce() throws Exception { ConsumerSettings<String, String> consumerSettings = consumerDefaults().withGroupId(createGroupId(1)); CommitterSettings committerSettings = committerDefaults(); @@ -199,12 +185,12 @@ public void atLeastOnce() throws Exception { .run(materializer); // #atLeastOnce - assertDone(produceString(topic, 10, partition0())); - assertDone(control.drainAndShutdown(ec)); + assertDone(produceString(topic, 10, partition0)); + assertDone(control.drainAndShutdown(executor)); } @Test - public void atLeastOnceWithCommitterSink() throws Exception { + void atLeastOnceWithCommitterSink() throws Exception { ConsumerSettings<String, String> consumerSettings = consumerDefaults().withGroupId(createGroupId(1)); String topic = createTopic(1, 1, 1); @@ -223,12 +209,12 @@ public void atLeastOnceWithCommitterSink() throws Exception { .mapMaterializedValue(Consumer::createDrainingControl) .run(materializer); // #committerSink - assertDone(produceString(topic, 10, partition0())); - assertDone(control.drainAndShutdown(ec)); + assertDone(produceString(topic, 10, partition0)); + assertDone(control.drainAndShutdown(executor)); } @Test - public void commitWithMetadata() throws Exception { + void commitWithMetadata() throws Exception { ConsumerSettings<String, String> consumerSettings = consumerDefaults().withGroupId(createGroupId(1)); CommitterSettings committerSettings = committerDefaults(); @@ -248,12 +234,12 @@ public void commitWithMetadata() throws Exception { .mapMaterializedValue(Consumer::createDrainingControl) .run(materializer); // #commitWithMetadata - assertDone(produceString(topic, 10, partition0())); - assertDone(control.drainAndShutdown(ec)); + assertDone(produceString(topic, 10, partition0)); + assertDone(control.drainAndShutdown(executor)); } @Test - public void consumerToProducer() throws Exception { + void consumerToProducer() throws Exception { ConsumerSettings<String, String> consumerSettings = consumerDefaults().withGroupId(createGroupId(1)); ProducerSettings<String, String> producerSettings = producerDefaults(); @@ -273,17 +259,17 @@ public void consumerToProducer() throws Exception { .mapMaterializedValue(Consumer::createDrainingControl) .run(materializer); // #consumerToProducerSink - assertDone(produceString(topic1, 10, partition0())); - assertDone(produceString(topic2, 10, partition0())); + assertDone(produceString(topic1, 10, partition0)); + assertDone(produceString(topic2, 10, partition0)); Consumer.DrainingControl<List<ConsumerRecord<String, String>>> consumer = consumeString(targetTopic, 20); assertDone(consumer.isShutdown()); - assertEquals(20, resultOf(consumer.drainAndShutdown(ec)).size()); - assertDone(control.drainAndShutdown(ec)); + assertEquals(20, resultOf(consumer.drainAndShutdown(executor)).size()); + assertDone(control.drainAndShutdown(executor)); } @Test - public void consumerToProducerFlow() throws Exception { + void consumerToProducerFlow() throws Exception { ConsumerSettings<String, String> consumerSettings = consumerDefaults().withGroupId(createGroupId(1)); ProducerSettings<String, String> producerSettings = producerDefaults(); @@ -305,16 +291,16 @@ public void consumerToProducerFlow() throws Exception { .mapMaterializedValue(Consumer::createDrainingControl) .run(materializer); // #consumerToProducerFlow - assertDone(produceString(topic, 10, partition0())); + assertDone(produceString(topic, 10, partition0)); Consumer.DrainingControl<List<ConsumerRecord<String, String>>> consumer = consumeString(targetTopic, 10); assertDone(consumer.isShutdown()); - assertEquals(10, resultOf(consumer.drainAndShutdown(ec)).size()); - assertDone(control.drainAndShutdown(ec)); +
assertEquals(10, resultOf(consumer.drainAndShutdown(executor)).size()); + assertDone(control.drainAndShutdown(executor)); } @Test - public void committableParitionedSource() throws Exception { + void committableParitionedSource() throws Exception { ConsumerSettings<String, String> consumerSettings = consumerDefaults().withGroupId(createGroupId(1)).withStopTimeout(Duration.ofMillis(10)); CommitterSettings committerSettings = committerDefaults(); @@ -330,12 +316,12 @@ public void committableParitionedSource() throws Exception { .mapMaterializedValue(Consumer::createDrainingControl) .run(materializer); // #committablePartitionedSource - assertDone(produceString(topic, 10, partition0())); - assertDone(control.drainAndShutdown(ec)); + assertDone(produceString(topic, 10, partition0)); + assertDone(control.drainAndShutdown(executor)); } @Test - public void streamPerPartition() throws Exception { + void streamPerPartition() throws Exception { ConsumerSettings<String, String> consumerSettings = consumerDefaults().withGroupId(createGroupId(1)).withStopTimeout(Duration.ofMillis(10)); CommitterSettings committerSettings = committerDefaults(); @@ -358,12 +344,12 @@ public void streamPerPartition() throws Exception { .mapMaterializedValue(Consumer::createDrainingControl) .run(materializer); // #committablePartitionedSource-stream-per-partition - assertDone(produceString(topic, 10, partition0())); - assertDone(control.drainAndShutdown(ec)); + assertDone(produceString(topic, 10, partition0)); + assertDone(control.drainAndShutdown(executor)); } @Test - public void consumerActor() throws Exception { + void consumerActor() throws Exception { ConsumerSettings<String, String> consumerSettings = consumerDefaults().withGroupId(createGroupId(1)); ActorRef self = system.deadLetters(); @@ -404,7 +390,7 @@ public void consumerActor() throws Exception { } @Test - public void restartSource() throws Exception { + void restartSource() throws Exception { ConsumerSettings<String, String> consumerSettings = consumerDefaults().withGroupId(createGroupId(1)); String topic = createTopic(1, 2, 1); @@ -426,7 +412,7 @@ public void restartSource() throws Exception { .runWith(Sink.ignore(), materializer); // #restartSource - assertDone(produceString(topic, 10, partition0())); + assertDone(produceString(topic, 10, partition0)); // #restartSource control.get().shutdown(); // #restartSource @@ -455,7 +441,7 @@ public Receive createReceive() { // #withRebalanceListenerActor @Test - public void withRebalanceListener() throws Exception { + void withRebalanceListener() throws Exception { ConsumerSettings<String, String> consumerSettings = consumerDefaults().withGroupId(createGroupId(1)); String topic = createTopic(1, 1, 1); @@ -478,13 +464,13 @@ public void withRebalanceListener() throws Exception { .mapMaterializedValue(Consumer::createDrainingControl) .run(materializer); // #withRebalanceListenerActor - assertDone(produceString(topic, messageCount, partition0())); + assertDone(produceString(topic, messageCount, partition0)); assertDone(control.isShutdown()); - assertEquals(messageCount, resultOf(control.drainAndShutdown(ec)).size()); + assertEquals(messageCount, resultOf(control.drainAndShutdown(executor)).size()); } @Test - public void consumerMetrics() throws Exception { + void consumerMetrics() throws Exception { ConsumerSettings<String, String> consumerSettings = consumerDefaults().withGroupId(createGroupId(1)); String topic = createTopic(1, 1, 1); @@ -506,11 +492,11 @@ public void consumerMetrics() throws Exception { CompletionStage<Map<MetricName, Metric>> metrics = control.getMetrics(); metrics.thenAccept(map -> System.out.println("Metrics: " +
map)); // #consumerMetrics - assertDone(control.drainAndShutdown(ec)); + assertDone(control.drainAndShutdown(executor)); } @Test - public void shutdownPlainSource() { + void shutdownPlainSource() { ConsumerSettings<String, String> consumerSettings = consumerDefaults().withGroupId(createGroupId(1)); String topic = createTopic(1, 1, 1); @@ -535,12 +521,12 @@ record -> .run(materializer)); // Shutdown the consumer when desired - control.thenAccept(c -> c.drainAndShutdown(ec)); + control.thenAccept(c -> c.drainAndShutdown(executor)); // #shutdownPlainSource } @Test - public void shutdownCommittable() throws Exception { + void shutdownCommittable() throws Exception { ConsumerSettings<String, String> consumerSettings = consumerDefaults().withGroupId(createGroupId(1)); String topic = createTopic(1, 1, 1); @@ -566,7 +552,7 @@ public void shutdownCommittable() throws Exception { .run(materializer); // #shutdownCommittableSource - assertDone(produceString(topic, messageCount, partition0())); + assertDone(produceString(topic, messageCount, partition0)); assertDone(control.isShutdown()); - assertEquals(Done.done(), resultOf(control.drainAndShutdown(ec), Duration.ofSeconds(20))); + assertEquals(Done.done(), resultOf(control.drainAndShutdown(executor), Duration.ofSeconds(20))); // #shutdownCommittableSource diff --git a/tests/src/test/java/docs/javadsl/FetchMetadataTest.java b/tests/src/test/java/docs/javadsl/FetchMetadataTest.java index 1c0662c89..75f18cf45 100644 --- a/tests/src/test/java/docs/javadsl/FetchMetadataTest.java +++ b/tests/src/test/java/docs/javadsl/FetchMetadataTest.java @@ -36,24 +36,8 @@ public class FetchMetadataTest extends EmbeddedKafkaJunit4Test { private static final ActorSystem sys = ActorSystem.create("FetchMetadataTest"); private static final Materializer mat = ActorMaterializer.create(sys); - @Override - public ActorSystem system() { - return sys; - } - - @Override - public Materializer materializer() { - return mat; - } - - @Override - public String bootstrapServers() { - return "localhost:" + kafkaPort(); - } - - @Override - public int kafkaPort() { - return KafkaPorts.FetchMetadataTest(); + public FetchMetadataTest() { + super(sys, mat, KafkaPorts.FetchMetadataTest()); } @AfterClass diff --git a/tests/src/test/java/docs/javadsl/ProducerExampleTest.java b/tests/src/test/java/docs/javadsl/ProducerExampleTest.java index 5074bbd01..3aaa0a960 100644 --- a/tests/src/test/java/docs/javadsl/ProducerExampleTest.java +++ b/tests/src/test/java/docs/javadsl/ProducerExampleTest.java @@ -10,19 +10,25 @@ import akka.kafka.*; import akka.kafka.javadsl.Consumer; import akka.kafka.javadsl.Producer; -import akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test; +// #testkit +import akka.kafka.testkit.javadsl.EmbeddedKafkaTest; +// #testkit import akka.stream.ActorMaterializer; import akka.stream.Materializer; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; +// #testkit import akka.testkit.javadsl.TestKit; +// #testkit import com.typesafe.config.Config; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.AfterClass; -import org.junit.Test; +// #testkit +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.TestInstance.Lifecycle; +// #testkit import java.util.Arrays; import java.util.List; @@ -32,38 +38,31 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -public class ProducerExampleTest extends EmbeddedKafkaJunit4Test { +// #testkit +
+@TestInstance(Lifecycle.PER_CLASS) +class ProducerExampleTest extends EmbeddedKafkaTest { private static final ActorSystem system = ActorSystem.create("ProducerExampleTest"); private static final Materializer materializer = ActorMaterializer.create(system); - private final ExecutorService ec = Executors.newSingleThreadExecutor(); - private final ProducerSettings<String, String> producerSettings = producerDefaults(); + // #testkit - @Override - public ActorSystem system() { - return system; - } - - @Override - public Materializer materializer() { - return materializer; - } + private final Executor executor = Executors.newSingleThreadExecutor(); + private final ProducerSettings<String, String> producerSettings = producerDefaults(); - @Override - public String bootstrapServers() { - return "localhost:" + kafkaPort(); - } + // #testkit - @Override - public int kafkaPort() { - return KafkaPorts.JavaProducerExamples(); + ProducerExampleTest() { + super(system, materializer, KafkaPorts.JavaProducerExamples()); } - @AfterClass - public static void afterClass() { + @AfterAll + void shutdownActorSystem() { TestKit.shutdownActorSystem(system); } + // #testkit + @Test void createProducer() { // #producer // #settings @@ -75,10 +74,11 @@ void createProducer() { final org.apache.kafka.clients.producer.Producer<String, String> kafkaProducer = producerSettings.createKafkaProducer(); // #producer + kafkaProducer.close(); } @Test - public void plainSink() throws Exception { + void plainSink() throws Exception { String topic = createTopic(1, 1, 1); // #plainSink CompletionStage<Done> done = @@ -92,12 +92,13 @@ public void plainSink() throws Exception { consumeString(topic, 100); assertEquals(Done.done(), resultOf(done)); assertEquals(Done.done(), resultOf(control.isShutdown())); - CompletionStage<List<ConsumerRecord<String, String>>> result = control.drainAndShutdown(ec); + CompletionStage<List<ConsumerRecord<String, String>>> result = + control.drainAndShutdown(executor); assertEquals(100, resultOf(result).size()); } @Test - public void plainSinkWithSharedProducer() throws Exception { + void plainSinkWithSharedProducer() throws Exception { String topic = createTopic(1, 1, 1); final org.apache.kafka.clients.producer.Producer<String, String> kafkaProducer = producerSettings.createKafkaProducer(); @@ -113,14 +114,15 @@ public void plainSinkWithSharedProducer() throws Exception { consumeString(topic, 100); assertEquals(Done.done(), resultOf(done)); assertEquals(Done.done(), resultOf(control.isShutdown())); - CompletionStage<List<ConsumerRecord<String, String>>> result = control.drainAndShutdown(ec); + CompletionStage<List<ConsumerRecord<String, String>>> result = + control.drainAndShutdown(executor); assertEquals(100, resultOf(result).size()); kafkaProducer.close(); } @Test - public void observeMetrics() throws Exception { + void observeMetrics() throws Exception { final org.apache.kafka.clients.producer.Producer<String, String> kafkaProducer = producerSettings.createKafkaProducer(); // #producerMetrics @@ -166,7 +168,7 @@ ProducerMessage.Envelope createPassThroughM } @Test - public void producerFlowExample() throws Exception { + void producerFlowExample() throws Exception { String topic = createTopic(1, 1, 1); // #flow CompletionStage<Done> done = @@ -223,7 +225,10 @@ public void producerFlowExample() throws Exception { consumeString(topic, 100L); assertEquals(Done.done(), resultOf(done)); assertEquals(Done.done(), resultOf(control.isShutdown())); - CompletionStage<List<ConsumerRecord<String, String>>> result = control.drainAndShutdown(ec); + CompletionStage<List<ConsumerRecord<String, String>>> result = + control.drainAndShutdown(executor); assertEquals(100, resultOf(result).size()); } + // #testkit } +// #testkit diff --git a/tests/src/test/java/docs/javadsl/SerializationTest.java
diff --git a/tests/src/test/java/docs/javadsl/SerializationTest.java b/tests/src/test/java/docs/javadsl/SerializationTest.java
index 3443a334d..cd85bd965 100644
--- a/tests/src/test/java/docs/javadsl/SerializationTest.java
+++ b/tests/src/test/java/docs/javadsl/SerializationTest.java
@@ -18,7 +18,6 @@
 import akka.stream.javadsl.Keep;
 import akka.stream.javadsl.Sink;
 import akka.stream.javadsl.Source;
-import akka.stream.testkit.javadsl.StreamTestKit;
 import akka.testkit.javadsl.TestKit;
 // #jackson-imports
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -41,10 +40,7 @@
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 // #imports
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.*;
 import org.slf4j.bridge.SLF4JBridgeHandler;

 import java.util.Arrays;
@@ -62,28 +58,10 @@ public class SerializationTest extends EmbeddedKafkaWithSchemaRegistryTest {
   private static final Materializer mat = ActorMaterializer.create(sys);
   private static final Executor ec = Executors.newSingleThreadExecutor();

-  @Override
-  public ActorSystem system() {
-    return sys;
+  public SerializationTest() {
+    super(sys, mat, KafkaPorts.SerializationTest(), 1, KafkaPorts.SerializationTest() + 2);
   }

-  @Override
-  public Materializer materializer() {
-    return mat;
-  }
-
-  @Override
-  public String bootstrapServers() {
-    return "localhost:" + kafkaPort();
-  }
-
-  @Override
-  public int kafkaPort() {
-    return KafkaPorts.SerializationTest();
-  }
-
-  private final String schemaRegistryUrl = "http://localhost:" + schemaRegistryPort(kafkaPort());
-
   @BeforeClass
   public static void beforeClass() {
     // Schema registry uses Glassfish which uses java.util.logging
diff --git a/tests/src/test/java/docs/javadsl/TestkitSamplesTest.java b/tests/src/test/java/docs/javadsl/TestkitSamplesTest.java
index 6a133bb63..057604823 100644
--- a/tests/src/test/java/docs/javadsl/TestkitSamplesTest.java
+++ b/tests/src/test/java/docs/javadsl/TestkitSamplesTest.java
@@ -37,7 +37,6 @@
 import akka.kafka.testkit.javadsl.ConsumerControlFactory;
 // #factories

-import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletionStage;
diff --git a/tests/src/test/java/docs/javadsl/TransactionsExampleTest.java b/tests/src/test/java/docs/javadsl/TransactionsExampleTest.java
index 424f73523..b48eab419 100644
--- a/tests/src/test/java/docs/javadsl/TransactionsExampleTest.java
+++ b/tests/src/test/java/docs/javadsl/TransactionsExampleTest.java
@@ -36,24 +36,8 @@ public class TransactionsExampleTest extends EmbeddedKafkaJunit4Test {
   private final ExecutorService ec = Executors.newSingleThreadExecutor();
   private final ProducerSettings<String, String> producerSettings = producerDefaults();

-  @Override
-  public ActorSystem system() {
-    return system;
-  }
-
-  @Override
-  public Materializer materializer() {
-    return materializer;
-  }
-
-  @Override
-  public String bootstrapServers() {
-    return "localhost:" + kafkaPort();
-  }
-
-  @Override
-  public int kafkaPort() {
-    return KafkaPorts.JavaTransactionsExamples();
+  public TransactionsExampleTest() {
+    super(system, materializer, KafkaPorts.JavaTransactionsExamples());
   }

   @AfterClass
@@ -94,7 +78,7 @@ public void sourceSink() throws Exception {
     // #transactionalSink
     Consumer.DrainingControl<List<ConsumerRecord<String, String>>> consumer =
         consumeString(targetTopic, 10);
-    produceString(sourceTopic, 10, partition0());
+    produceString(sourceTopic, 10, partition0);
     assertDone(consumer.isShutdown());
     // #transactionalSink
     control.drainAndShutdown(ec);
@@ -146,7 +130,7 @@ public void usingRestartSource() throws Exception {
     int messages = 10;
     Consumer.DrainingControl<List<ConsumerRecord<String, String>>> consumer =
         consumeString(targetTopic, messages);
-    assertDone(produceString(sourceTopic, messages, partition0()));
+    assertDone(produceString(sourceTopic, messages, partition0));
     assertDone(consumer.isShutdown());
     assertDone(innerControl.get().shutdown());
     assertEquals(messages, resultOf(consumer.drainAndShutdown(ec)).size());
diff --git a/tests/src/test/scala/akka/kafka/javadsl/EmbeddedKafkaWithSchemaRegistryTest.scala b/tests/src/test/scala/akka/kafka/javadsl/EmbeddedKafkaWithSchemaRegistryTest.scala
deleted file mode 100644
index 807336e20..000000000
--- a/tests/src/test/scala/akka/kafka/javadsl/EmbeddedKafkaWithSchemaRegistryTest.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (C) 2014 - 2016 Softwaremill
- * Copyright (C) 2016 - 2019 Lightbend Inc.
- */
-
-package akka.kafka.javadsl
-import akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test
-import net.manub.embeddedkafka.schemaregistry.{
-  EmbeddedKWithSR,
-  EmbeddedKafkaConfigWithSchemaRegistryImpl,
-  EmbeddedKafkaWithSchemaRegistry
-}
-import org.junit.{After, Before}
-
-abstract class EmbeddedKafkaWithSchemaRegistryTest extends EmbeddedKafkaJunit4Test {
-  import EmbeddedKafkaWithSchemaRegistryTest._
-
-  @Before override def setupEmbeddedKafka() =
-    startEmbeddedKafka(kafkaPort, replicationFactor)
-
-  @After override def cleanUpEmbeddedKafka(): Unit =
-    stopEmbeddedKafka()
-}
-
-object EmbeddedKafkaWithSchemaRegistryTest {
-
-  /**
-   * Workaround for https://github.com/manub/scalatest-embedded-kafka/issues/166
-   * Keeping track of all embedded servers, so we can shut them down later
-   */
-  private var embeddedServer: EmbeddedKWithSR = _
-
-  def schemaRegistryPort(kafkaPort: Int) =
-    kafkaPort + 2
-
-  private def embeddedKafkaConfig(kafkaPort: Int, zooKeeperPort: Int, schemaRegistryPort: Int, replicationFactor: Int) =
-    EmbeddedKafkaConfigWithSchemaRegistryImpl(
-      kafkaPort,
-      zooKeeperPort,
-      schemaRegistryPort,
-      Map(
-        "offsets.topic.replication.factor" -> s"$replicationFactor",
-        "zookeeper.connection.timeout.ms" -> "20000"
-      ),
-      Map.empty,
-      Map.empty
-    )
-
-  def startEmbeddedKafka(kafkaPort: Int, replicationFactor: Int): Unit =
-    embeddedServer = EmbeddedKafkaWithSchemaRegistry.start()(
-      embeddedKafkaConfig(kafkaPort, kafkaPort + 1, schemaRegistryPort(kafkaPort), replicationFactor)
-    )
-
-  def stopEmbeddedKafka(): Unit =
-    embeddedServer.stop(clearLogs = true)
-}
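Note on the deleted file: the schema-registry bootstrap that previously lived in the tests project (tracking the embedded server, the kafkaPort + 2 schema-registry port convention, and the replication-factor configuration) is now expected from the base class itself, as SerializationTest's new five-argument super call shows. A sketch of what a schema-registry test's wiring looks like after this change, assuming the EmbeddedKafkaWithSchemaRegistryTest base class is now provided by the testkit with the constructor order (system, materializer, kafkaPort, replicationFactor, schemaRegistryPort); the class name MyAvroTest is illustrative:

    import akka.actor.ActorSystem;
    import akka.kafka.KafkaPorts;
    import akka.stream.ActorMaterializer;
    import akka.stream.Materializer;

    public class MyAvroTest extends EmbeddedKafkaWithSchemaRegistryTest {

      private static final ActorSystem sys = ActorSystem.create("MyAvroTest");
      private static final Materializer mat = ActorMaterializer.create(sys);

      public MyAvroTest() {
        // replication factor 1, schema registry on kafkaPort + 2,
        // mirroring the convention of the deleted Scala helper
        super(sys, mat, KafkaPorts.SerializationTest(), 1, KafkaPorts.SerializationTest() + 2);
      }
    }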