Commit aeb8caa
Merge pull request #925 from fd4s/forall-testcontainer
bplommer authored Mar 29, 2022
2 parents 854dc30 + ff7c934
Showing 4 changed files with 54 additions and 55 deletions.
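In summary: the suites move from ForEachTestContainer, which starts a fresh Kafka container for every test, to ForAllTestContainer, which starts one container per suite. The test changes below all follow from sharing a broker: topic-count assertions become relative to a baseline, duplicate consumption is tolerated, and transactional ids are derived from the topic name.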
11 changes: 6 additions & 5 deletions modules/core/src/test/scala/fs2/kafka/BaseKafkaSpec.scala
@@ -28,11 +28,10 @@ package fs2.kafka

import cats.effect.Sync
import fs2.kafka.internal.converters.collection._
-import java.util.UUID

+import java.util.UUID
import scala.util.Failure
-import com.dimafeng.testcontainers.{KafkaContainer, ForEachTestContainer}

+import com.dimafeng.testcontainers.{ForAllTestContainer, KafkaContainer}
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.{KafkaConsumer => KConsumer}
import org.apache.kafka.clients.producer.{
@@ -44,19 +43,21 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer

import scala.concurrent.duration._
import org.apache.kafka.clients.admin.NewTopic

import scala.util.Try
import org.apache.kafka.clients.admin.AdminClient

import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
import org.apache.kafka.common.{KafkaException, TopicPartition}

import scala.collection.mutable.ListBuffer
import java.util.concurrent.TimeoutException
import org.apache.kafka.common.serialization.StringSerializer

import java.util.concurrent.TimeUnit
import org.apache.kafka.common.serialization.StringDeserializer
import org.scalatest.Args

-abstract class BaseKafkaSpec extends BaseAsyncSpec with ForEachTestContainer {
+abstract class BaseKafkaSpec extends BaseAsyncSpec with ForAllTestContainer {

final val adminClientCloseTimeout: FiniteDuration = 2.seconds
final val transactionTimeoutInterval: FiniteDuration = 1.second
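For context, testcontainers-scala starts and stops the `container` around every single test under `ForEachTestContainer`, but only once per suite under `ForAllTestContainer`. A minimal sketch of the suite-shared lifecycle (the suite name and the use of `AnyFunSuite` are illustrative, not taken from this repository):

```scala
import com.dimafeng.testcontainers.{ForAllTestContainer, KafkaContainer}
import org.scalatest.funsuite.AnyFunSuite

// With ForAllTestContainer the container starts before the first test and
// stops after the last one, so topics and transactional state created by
// one test are still present in the next.
class SharedKafkaSuite extends AnyFunSuite with ForAllTestContainer {
  override val container: KafkaContainer = KafkaContainer()

  test("the shared broker is reachable") {
    assert(container.bootstrapServers.nonEmpty)
  }
}
```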
29 changes: 10 additions & 19 deletions modules/core/src/test/scala/fs2/kafka/KafkaAdminClientSpec.scala
@@ -180,27 +180,18 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec {
.resource[IO](adminClientSettings)
.use { adminClient =>
for {
-          topicNames <- adminClient.listTopics.names.map(
-            _.filterNot(_.startsWith("__confluent"))
-          )
-          _ <- IO(assert(topicNames.size == 1))
-          topicListings <- adminClient.listTopics.listings.map(
-            _.filterNot(_.name.startsWith("__confluent"))
-          )
-          _ <- IO(assert(topicListings.size == 1))
-          topicNamesToListings <- adminClient.listTopics.namesToListings.map(_.filter {
-            case (key, _) => !key.startsWith("__confluent")
-          })
-          _ <- IO(assert(topicNamesToListings.size == 1))
+          topicNames <- adminClient.listTopics.names
+          topicCount = topicNames.size
+          topicListings <- adminClient.listTopics.listings
+          _ <- IO(assert(topicListings.size == topicCount))
+          topicNamesToListings <- adminClient.listTopics.namesToListings
+          _ <- IO(assert(topicNamesToListings.size == topicCount))
topicNamesInternal <- adminClient.listTopics.includeInternal.names
-            .map(_.filterNot(_.startsWith("__confluent")))
-          _ <- IO(assert(topicNamesInternal.size == 2))
+          _ <- IO(assert(topicNamesInternal.size == topicCount + 1))
topicListingsInternal <- adminClient.listTopics.includeInternal.listings
-            .map(_.filterNot(_.name.startsWith("__confluent")))
-          _ <- IO(assert(topicListingsInternal.size == 2))
+          _ <- IO(assert(topicListingsInternal.size == topicCount + 1))
topicNamesToListingsInternal <- adminClient.listTopics.includeInternal.namesToListings
-            .map(_.filter { case (key, _) => !key.startsWith("__confluent") })
-          _ <- IO(assert(topicNamesToListingsInternal.size == 2))
+          _ <- IO(assert(topicNamesToListingsInternal.size == topicCount + 1))
_ <- IO {
adminClient.listTopics.toString should startWith("ListTopics$")
}
@@ -209,7 +200,7 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec {
startWith("ListTopicsIncludeInternal$")
}
describedTopics <- adminClient.describeTopics(topicNames.toList)
-          _ <- IO(assert(describedTopics.size == 1))
+          _ <- IO(assert(describedTopics.size == topicCount))
newTopic = new NewTopic("new-test-topic", 1, 1.toShort)
preCreateNames <- adminClient.listTopics.names
_ <- IO(assert(!preCreateNames.contains(newTopic.name)))
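With a suite-shared broker, topics created by earlier tests are still present, so the rewritten assertions count relative to a `topicCount` baseline instead of expecting exactly one topic. A sketch of the same idea in isolation, assuming only the fs2-kafka admin API shown above (the helper name is hypothetical):

```scala
import cats.effect.IO
import fs2.kafka.KafkaAdminClient

// Counting relative to a baseline keeps the assertion valid no matter how
// many topics previous suites have already created on the shared broker.
def internalTopicSurplus(adminClient: KafkaAdminClient[IO]): IO[Int] =
  for {
    visible  <- adminClient.listTopics.names                 // user-visible topics
    internal <- adminClient.listTopics.includeInternal.names // adds e.g. __consumer_offsets
  } yield internal.size - visible.size
```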
5 changes: 3 additions & 2 deletions modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
@@ -66,7 +66,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
}
}

it("should consume all records with subscribing for several consumers") {
it("should consume all records at least once with subscribing for several consumers") {
withTopic { topic =>
createCustomTopic(topic, partitions = 3)
val produced = (0 until 5).map(n => s"key-$n" -> s"value->$n")
@@ -91,7 +91,8 @@
.toVector
.unsafeRunSync()

-        res should contain theSameElementsAs produced
+        // duplication is currently possible.
+        res.distinct should contain theSameElementsAs produced

}
}
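The consumer test now encodes at-least-once semantics: when several consumers join the same group, a rebalance can redeliver records that were already seen, so the result is deduplicated with `.distinct` before comparing. The same check, sketched as a standalone predicate (a hypothetical helper, assuming `produced` itself contains no duplicates):

```scala
// At-least-once delivery: every produced record must be consumed at least
// once, duplicates are tolerated, and nothing unexpected may appear.
def consumedAtLeastOnce[K, V](produced: Seq[(K, V)], consumed: Seq[(K, V)]): Boolean =
  consumed.toSet == produced.toSet
```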
64 changes: 35 additions & 29 deletions modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
@@ -68,7 +68,7 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
(for {
producer <- TransactionalKafkaProducer.stream(
TransactionalProducerSettings(
"id",
s"id-$topic",
producerSettings[IO]
.withRetries(Int.MaxValue)
)
@@ -153,7 +153,7 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
(for {
producer <- TransactionalKafkaProducer.stream(
TransactionalProducerSettings(
"id",
s"id-$topic",
producerSettings[IO]
.withRetries(Int.MaxValue)
)
@@ -217,7 +217,7 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
(for {
producer <- TransactionalKafkaProducer.stream(
TransactionalProducerSettings(
"id",
s"id-$topic",
producerSettings[IO]
.withRetries(Int.MaxValue)
)
@@ -301,7 +301,7 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
(for {
producer <- TransactionalKafkaProducer.stream(
TransactionalProducerSettings(
"id",
s"id-$topic",
producerSettings[IO]
.withRetries(Int.MaxValue)
)
@@ -346,6 +346,36 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
}
}

it("should get metrics") {
withTopic { topic =>
createCustomTopic(topic, partitions = 3)

val info =
TransactionalKafkaProducer[IO]
.stream(
TransactionalProducerSettings(
transactionalId = s"id-$topic",
producerSettings = producerSettings[IO].withRetries(Int.MaxValue)
)
)
.evalMap(_.metrics)

val res =
info
.take(1)
.compile
.lastOrError
.unsafeRunSync()

assert(res.nonEmpty)
}
}
}

// TODO: after switching from ForEachTestContainer to ForAllTestContainer, this fails
// if run with a shared container with the following error:
// org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one. was not an instance of org.apache.kafka.common.errors.InvalidProducerEpochException, but an instance of org.apache.kafka.common.errors.ProducerFencedException
class TransactionalKafkaProducerTimeoutSpec extends BaseKafkaSpec with EitherValues {
it("should use user-specified transaction timeouts") {
withTopic { topic =>
createCustomTopic(topic, partitions = 3)
@@ -370,7 +400,7 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
(for {
producer <- TransactionalKafkaProducer.stream(
TransactionalProducerSettings(
"id",
s"id-$topic",
producerSettings[IO]
.withRetries(Int.MaxValue)
).withTransactionTimeout(transactionTimeoutInterval - 250.millis)
@@ -405,28 +435,4 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
}
}

it("should get metrics") {
withTopic { topic =>
createCustomTopic(topic, partitions = 3)

val info =
TransactionalKafkaProducer[IO]
.stream(
TransactionalProducerSettings(
transactionalId = "id",
producerSettings = producerSettings[IO].withRetries(Int.MaxValue)
)
)
.evalMap(_.metrics)

val res =
info
.take(1)
.compile
.lastOrError
.unsafeRunSync()

assert(res.nonEmpty)
}
}
}
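The `s"id-$topic"` changes above and the TODO on `TransactionalKafkaProducerTimeoutSpec` are both about fencing: Kafka's transaction coordinator tracks producers by `transactional.id`, and initializing a new producer with an id that is already in use bumps the epoch and fails the older producer with `ProducerFencedException`. On a per-test container a fixed `"id"` was safe; on a suite-shared broker each test needs its own id. A sketch of deriving one (the helper is illustrative, built only from the fs2-kafka settings API used above):

```scala
import cats.effect.IO
import fs2.kafka.{ProducerSettings, TransactionalProducerSettings}

// Deriving transactional.id from the topic gives every test its own id on
// the shared broker, so no producer can fence another test's producer.
def transactionalSettingsFor(
  topic: String,
  producerSettings: ProducerSettings[IO, String, String]
): TransactionalProducerSettings[IO, String, String] =
  TransactionalProducerSettings(
    transactionalId = s"id-$topic",
    producerSettings = producerSettings.withRetries(Int.MaxValue)
  )
```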
