Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - porting tests to Weaver #926

Draft
wants to merge 33 commits into
base: series/2.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
dceb2e2
wip - port tests to Weaver
bplommer Mar 29, 2022
b68ee7c
Use weaver in TransactionalKafkaProducerSpec
bplommer Mar 29, 2022
6017f42
Start porting consumer tests to Weaver
bplommer Mar 29, 2022
2bc68ad
Add Weaver test framesword to sbt
bplommer Mar 29, 2022
b3d5489
Port more consumer tests to Weaver
bplommer Mar 29, 2022
677640f
Port seek tests to Weaver
bplommer Mar 29, 2022
40bf3cb
Port rebalance tests to Weaver
bplommer Mar 29, 2022
29b87c3
Port more tests to weaver
bplommer Mar 31, 2022
4275bb5
Port more tests to Weaver
bplommer Mar 31, 2022
4075089
Port more tests to Weaver
bplommer Mar 31, 2022
ce817c2
Port more tests to Weaver
bplommer Mar 31, 2022
3654600
Fix tests
bplommer Mar 31, 2022
456560e
Fix tests
bplommer Mar 31, 2022
299f66e
Fix tests
bplommer Mar 31, 2022
e17a3ca
Port more tests to Weaver
bplommer Mar 31, 2022
6ef1edb
Fix test
bplommer Mar 31, 2022
c8d228a
Use topic name for group id in test
bplommer Mar 31, 2022
e99803e
Unused import
bplommer Mar 31, 2022
0a104b8
fix tests?
bplommer Mar 31, 2022
4939790
comment out flaky assertion
bplommer Mar 31, 2022
8df6438
trying to hunt down test flakiness
bplommer Mar 31, 2022
8cfbb5c
Merge remote-tracking branch 'origin/series/2.x' into weaver
bplommer Apr 1, 2022
729bd62
Deduplication
bplommer Apr 1, 2022
323f067
scalafmt
bplommer Apr 1, 2022
ef80e8e
Fix nullpointer exception
bplommer Apr 1, 2022
703984a
Run tests in parallel
bplommer Apr 1, 2022
da8a0c9
Shared container between Weaver suites
bplommer Apr 1, 2022
f595e00
revert Test / parallelExecution change
bplommer Apr 1, 2022
03bcc69
Container sharing fix
bplommer Apr 1, 2022
d42aa6d
Fix timeout test
bplommer Apr 1, 2022
9f67760
Fix timeout test
bplommer Apr 1, 2022
5b0f524
Use assign rather than subscribe in offsets test
bplommer Apr 1, 2022
5a98c3c
fix compile
bplommer Apr 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ lazy val core = project
libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % fs2Version,
"org.typelevel" %% "cats-effect" % catsEffectVersion,
"org.apache.kafka" % "kafka-clients" % kafkaVersion
"org.apache.kafka" % "kafka-clients" % kafkaVersion,
"com.disneystreaming" %% "weaver-cats" % "0.7.11" % Test
)
),
publishSettings,
Expand Down Expand Up @@ -364,6 +365,7 @@ lazy val scalaSettings = Seq(
lazy val testSettings = Seq(
Test / logBuffered := false,
Test / parallelExecution := false,
testFrameworks += new TestFramework("weaver.framework.CatsEffect"),
Test / testOptions += Tests.Argument("-oDF")
)

Expand Down
2 changes: 2 additions & 0 deletions modules/core/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
</encoder>
</appender>

<logger name="org.apache.kafka.common.utils.AppInfoParser" level = "ERROR"/>
<logger name="org.apache.kafka.common.metrics.Metrics" level = "ERROR"/>
<root level="INFO" additivity="false">
<appender-ref ref="STDOUT"/>
</root>
Expand Down
44 changes: 15 additions & 29 deletions modules/core/src/test/scala/fs2/kafka/BaseKafkaSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ This file contains code derived from the Embedded Kafka library
*/
package fs2.kafka

import cats.effect.Sync
import cats.effect.{IO, Sync}
import fs2.kafka.internal.converters.collection._

import java.util.UUID
import scala.util.Failure
import com.dimafeng.testcontainers.{ForAllTestContainer, KafkaContainer}
import com.dimafeng.testcontainers.{ForAllTestContainer}
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.{KafkaConsumer => KConsumer}
import org.apache.kafka.clients.producer.{
Expand All @@ -56,41 +56,22 @@ import org.apache.kafka.common.serialization.StringSerializer
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.serialization.StringDeserializer
import org.scalatest.Args
import weaver.Expectations

abstract class BaseKafkaSpec extends BaseAsyncSpec with ForAllTestContainer {
abstract class BaseKafkaSpec extends BaseAsyncSpec with ForAllTestContainer with BaseKafkaSpecBase {

override def runTest(testName: String, args: Args) = super.runTest(testName, args)
}

trait BaseKafkaSpecBase {
lazy val container = BaseWeaverSpecShared.container

final val adminClientCloseTimeout: FiniteDuration = 2.seconds
final val transactionTimeoutInterval: FiniteDuration = 1.second

final val consumerPollingTimeout: FiniteDuration = 1.second
protected val producerPublishTimeout: FiniteDuration = 10.seconds

override def runTest(testName: String, args: Args) = super.runTest(testName, args)

private val imageVersion = "7.0.1"

private lazy val imageName = Option(System.getProperty("os.arch")) match {
case Some("aarch64") =>
"niciqy/cp-kafka-arm64" // no official docker image for ARM is available yet
case _ => "confluentinc/cp-kafka"
}

override val container: KafkaContainer = new KafkaContainer()
.configure { container =>
container
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.withEnv(
"KAFKA_TRANSACTION_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS",
transactionTimeoutInterval.toMillis.toString
)
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
.withEnv("KAFKA_AUTHORIZER_CLASS_NAME", "kafka.security.authorizer.AclAuthorizer")
.withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "true")
.setDockerImageName(s"$imageName:$imageVersion")

()
}

implicit final val stringSerializer: KafkaSerializer[String] = new StringSerializer

implicit final val stringDeserializer: KafkaDeserializer[String] = new StringDeserializer
Expand Down Expand Up @@ -152,6 +133,11 @@ abstract class BaseKafkaSpec extends BaseAsyncSpec with ForAllTestContainer {
final def withTopic[A](f: String => A): A =
f(nextTopicName())

final def withTopic(partitions: Int)(f: String => IO[Expectations]): IO[Expectations] =
IO(nextTopicName()).flatMap { topic =>
IO.blocking(createCustomTopic(topic, partitions = partitions)) >> f(topic)
}

final def withKafkaConsumer(
nativeSettings: Map[String, AnyRef]
): WithKafkaConsumer =
Expand Down
74 changes: 74 additions & 0 deletions modules/core/src/test/scala/fs2/kafka/BaseWeaverSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
This file contains code derived from the Embedded Kafka library
(https://github.com/embeddedkafka/embedded-kafka), the license for which is reproduced below.

The MIT License (MIT)

Copyright (c) 2016 Emanuele Blanco

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
package fs2.kafka

import cats.effect.{IO, Resource}
import com.dimafeng.testcontainers.KafkaContainer
import weaver.{GlobalResource, GlobalWrite, IOSuite}

import scala.concurrent.duration.{DurationInt, FiniteDuration}

object BaseWeaverSpecShared extends GlobalResource {
protected val imageVersion = "7.0.1"

protected val imageName = Option(System.getProperty("os.arch")) match {
case Some("aarch64") =>
"niciqy/cp-kafka-arm64" // no official docker image for ARM is available yet
case _ => "confluentinc/cp-kafka"
}
final val transactionTimeoutInterval: FiniteDuration = 1.second

lazy val container: KafkaContainer = makeContainer()

def makeContainer() =
new KafkaContainer()
.configure { container =>
container
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.withEnv(
"KAFKA_TRANSACTION_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS",
transactionTimeoutInterval.toMillis.toString
)
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
.withEnv("KAFKA_AUTHORIZER_CLASS_NAME", "kafka.security.authorizer.AclAuthorizer")
.withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "true")
.setDockerImageName(s"$imageName:$imageVersion")

()
}

override def sharedResources(global: GlobalWrite): Resource[IO, Unit] =
ContainerResource(IO(container)).evalMap(global.put(_))
}

abstract class BaseWeaverSpec extends IOSuite with BaseKafkaSpecBase {

override type Res = KafkaContainer

override def sharedResource: Resource[IO, KafkaContainer] = ContainerResource(IO(container))

}
12 changes: 12 additions & 0 deletions modules/core/src/test/scala/fs2/kafka/ContainerResource.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package fs2.kafka

import cats.effect.{Resource, Sync}
import cats.syntax.all._
import com.dimafeng.testcontainers.Container

object ContainerResource {
def apply[F[_], C <: Container](container: F[C])(implicit F: Sync[F]): Resource[F, C] =
Resource.make(container.flatTap { container =>
F.blocking(container.start())
})(c => F.blocking(c.stop()))
}
Loading