Skip to content

Commit

Permalink
Merge pull request #730 from keirlawson/admin-delete-cgs
Browse files Browse the repository at this point in the history
Add deleteConsumerGroups method
  • Loading branch information
bplommer authored Jan 2, 2022
2 parents 5381bda + 366e867 commit 48315ff
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 1 deletion.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ lazy val mimaSettings = Seq(
// format: off
Seq(
ProblemFilters.exclude[Problem]("fs2.kafka.internal.*"),
ProblemFilters.exclude[IncompatibleSignatureProblem]("*")
ProblemFilters.exclude[IncompatibleSignatureProblem]("*"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaAdminClient.deleteConsumerGroups")
)
// format: on
}
Expand Down
18 changes: 18 additions & 0 deletions modules/core/src/main/scala/fs2/kafka/KafkaAdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@ sealed abstract class KafkaAdminClient[F[_]] {
*/
def listConsumerGroups: ListConsumerGroups[F]

/**
* Delete consumer groups from the cluster.
*/
def deleteConsumerGroups[G[_]](groupIds: G[String])(
implicit G: Foldable[G]
): F[Unit]

/**
* Lists topics. Returns topic names using:
*
Expand Down Expand Up @@ -478,6 +485,12 @@ object KafkaAdminClient {
): F[Unit] =
withAdminClient(_.deleteConsumerGroupOffsets(groupId, partitions.asJava).all().void)

private[this] def deleteConsumerGroupsWith[F[_], G[_]](
withAdminClient: WithAdminClient[F],
groupIds: G[String]
)(implicit G: Foldable[G]): F[Unit] =
withAdminClient(_.deleteConsumerGroups(groupIds.asJava).all().void)

/**
* Creates a new [[KafkaAdminClient]] in the `Resource` context,
* using the specified [[AdminClientSettings]]. If working in a
Expand Down Expand Up @@ -580,6 +593,11 @@ object KafkaAdminClient {
): F[Unit] =
deleteConsumerGroupOffsetsWith(client, groupId, partitions)

override def deleteConsumerGroups[G[_]](
groupIds: G[String]
)(implicit G: Foldable[G]): F[Unit] =
deleteConsumerGroupsWith(client, groupIds)

override def toString: String =
"KafkaAdminClient$" + System.identityHashCode(this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec {
val expected = consumerGroupOffsetsMap - partition0
assert(res == expected)
}
_ <- adminClient
.deleteConsumerGroups(consumerGroupIds)
_ <- adminClient.listConsumerGroups.groupIds.map { res =>
val expected = List.empty
assert(res == expected)
}
} yield ()
}
.unsafeRunSync()
Expand Down

0 comments on commit 48315ff

Please sign in to comment.