-
Notifications
You must be signed in to change notification settings - Fork 387
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Benefits * JUnit 5 supports test class lifecycle control * useful for simpler admin client use in tests * useful for simpler embedded Kafka support * JUnit 5 makes use of Java 8 features (mainly lambdas) This PR * adds [sbt-jupiter-interface](https://github.com/maichler/sbt-jupiter-interface) plugin to run JUnit 5-annotated and JUnit 4-annotated tests * introduces `KafkaTest` and `EmbeddedKafkaTest` which rely on JUnit 5 per-class lifecyle * changes `ProducerExampleTest` and `ConsumerExampleTest` to use JUnit 5 instead * re-implements the JUnit base-classes for use of *EmbeddedKafka* in Java and makes use of constructor parameters instead of overrides (which look weird in Java)
- Loading branch information
Showing
21 changed files
with
598 additions
and
510 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
76 changes: 76 additions & 0 deletions
76
testkit/src/main/java/akka/kafka/testkit/javadsl/BaseKafkaTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/* | ||
* Copyright (C) 2014 - 2016 Softwaremill <http://softwaremill.com> | ||
* Copyright (C) 2016 - 2019 Lightbend Inc. <http://www.lightbend.com> | ||
*/ | ||
|
||
package akka.kafka.testkit.javadsl; | ||
|
||
import akka.Done; | ||
import akka.actor.ActorSystem; | ||
import akka.kafka.Subscriptions; | ||
import akka.kafka.javadsl.Consumer; | ||
import akka.kafka.javadsl.Producer; | ||
import akka.kafka.testkit.internal.KafkaTestKitClass; | ||
import akka.stream.Materializer; | ||
import akka.stream.javadsl.Keep; | ||
import akka.stream.javadsl.Sink; | ||
import akka.stream.javadsl.Source; | ||
import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.time.Duration; | ||
import java.util.List; | ||
import java.util.concurrent.CompletionStage; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.stream.IntStream; | ||
|
||
public abstract class BaseKafkaTest extends KafkaTestKitClass { | ||
|
||
public static final int partition0 = 0; | ||
|
||
public final Logger log = LoggerFactory.getLogger(getClass()); | ||
|
||
protected final Materializer materializer; | ||
|
||
protected BaseKafkaTest(ActorSystem system, Materializer materializer, String bootstrapServers) { | ||
super(system, bootstrapServers); | ||
this.materializer = materializer; | ||
} | ||
|
||
@Override | ||
public Logger log() { | ||
return log; | ||
} | ||
|
||
/** Overwrite to set different default timeout for [[#resultOf]]. */ | ||
protected Duration resultOfTimeout() { | ||
return Duration.ofSeconds(5); | ||
} | ||
|
||
protected CompletionStage<Done> produceString(String topic, int messageCount, int partition) { | ||
return Source.fromIterator(() -> IntStream.range(0, messageCount).iterator()) | ||
.map(Object::toString) | ||
.map(n -> new ProducerRecord<String, String>(topic, partition, DefaultKey(), n)) | ||
.runWith(Producer.plainSink(producerDefaults()), materializer); | ||
} | ||
|
||
protected Consumer.DrainingControl<List<ConsumerRecord<String, String>>> consumeString( | ||
String topic, long take) { | ||
return Consumer.plainSource( | ||
consumerDefaults().withGroupId(createGroupId(1)), Subscriptions.topics(topic)) | ||
.take(take) | ||
.toMat(Sink.seq(), Keep.both()) | ||
.mapMaterializedValue(Consumer::createDrainingControl) | ||
.run(materializer); | ||
} | ||
|
||
protected <T> T resultOf(CompletionStage<T> stage) throws Exception { | ||
return resultOf(stage, resultOfTimeout()); | ||
} | ||
|
||
protected <T> T resultOf(CompletionStage<T> stage, Duration timeout) throws Exception { | ||
return stage.toCompletableFuture().get(timeout.toMillis(), TimeUnit.MILLISECONDS); | ||
} | ||
} |
68 changes: 68 additions & 0 deletions
68
testkit/src/main/java/akka/kafka/testkit/javadsl/EmbeddedKafkaJunit4Test.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/* | ||
* Copyright (C) 2014 - 2016 Softwaremill <http://softwaremill.com> | ||
* Copyright (C) 2016 - 2019 Lightbend Inc. <http://www.lightbend.com> | ||
*/ | ||
|
||
package akka.kafka.testkit.javadsl; | ||
|
||
import akka.actor.ActorSystem; | ||
import akka.stream.Materializer; | ||
import net.manub.embeddedkafka.EmbeddedKafka$; | ||
import net.manub.embeddedkafka.EmbeddedKafkaConfig; | ||
import net.manub.embeddedkafka.EmbeddedKafkaConfig$; | ||
import org.junit.After; | ||
import org.junit.Before; | ||
import scala.collection.immutable.HashMap$; | ||
|
||
/** | ||
* JUnit 5 aka Jupiter base-class with some convenience for creating an embedded Kafka broker before | ||
* running the tests. Extending classes must be annotated with `@TestInstance(Lifecycle.PER_CLASS)` | ||
* to create a single instance of the test class with `@BeforeAll` and `@AfterAll` annotated methods | ||
* called by the test framework. | ||
*/ | ||
public abstract class EmbeddedKafkaJunit4Test extends KafkaTest { | ||
|
||
private static EmbeddedKafkaConfig embeddedKafkaConfig( | ||
int kafkaPort, int zookeeperPort, int replicationFactor) { | ||
return EmbeddedKafkaConfig$.MODULE$.apply( | ||
kafkaPort, | ||
zookeeperPort, | ||
createReplicationFactorBrokerProps(replicationFactor), | ||
HashMap$.MODULE$.empty(), | ||
HashMap$.MODULE$.empty()); | ||
} | ||
|
||
protected final int kafkaPort; | ||
protected final int replicationFactor; | ||
|
||
public EmbeddedKafkaJunit4Test( | ||
ActorSystem system, Materializer materializer, int kafkaPort, int replicationFactor) { | ||
super(system, materializer, "localhost:" + kafkaPort); | ||
this.kafkaPort = kafkaPort; | ||
this.replicationFactor = replicationFactor; | ||
} | ||
|
||
protected EmbeddedKafkaJunit4Test(ActorSystem system, Materializer materializer, int kafkaPort) { | ||
this(system, materializer, kafkaPort, 1); | ||
} | ||
|
||
protected static void startEmbeddedKafka(int kafkaPort, int replicationFactor) { | ||
EmbeddedKafka$.MODULE$.start(embeddedKafkaConfig(kafkaPort, kafkaPort + 1, replicationFactor)); | ||
} | ||
|
||
protected static void stopEmbeddedKafka() { | ||
EmbeddedKafka$.MODULE$.stop(); | ||
} | ||
|
||
@Before | ||
public void setupEmbeddedKafka() { | ||
EmbeddedKafkaJunit4Test.startEmbeddedKafka(kafkaPort, replicationFactor); | ||
setUpAdminClient(); | ||
} | ||
|
||
@After | ||
public void cleanUpEmbeddedKafka() { | ||
cleanUpAdminClient(); | ||
EmbeddedKafkaJunit4Test.stopEmbeddedKafka(); | ||
} | ||
} |
Oops, something went wrong.