Skip to content

Commit

Permalink
spring-kafka: improve kafka publisher listener selection strategy (#602)
Browse files Browse the repository at this point in the history
  • Loading branch information
osoykan authored Oct 14, 2024
1 parent a4efa1e commit d805d6a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ projectUrl=https://github.com/Trendyol/stove
licenceUrl=https://github.com/Trendyol/stove/blob/master/LICENCE
licence=Apache-2.0 license
snapshot=1.0.0-SNAPSHOT
version=0.14.0
version=0.14.1


Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,27 @@ class KafkaSystem(
it.setProducerListener(getInterceptor())
it.setCloseTimeout(1.seconds.toJavaDuration())
}
.firstOrNone {
it.producerFactory.configurationProperties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] == exposedConfiguration.bootstrapServers
}
.firstOrNone { safeContains(it, exposedConfiguration) }
.getOrElse {
logger.warn("No KafkaTemplate found for the configured bootstrap servers, using a fallback KafkaTemplate")
createFallbackTemplate(exposedConfiguration)
}
}

@Suppress("UNCHECKED_CAST")
private fun safeContains(
it: KafkaTemplate<Any, Any>,
exposedConfiguration: KafkaExposedConfiguration
): Boolean = it.producerFactory.configurationProperties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG]
.toOption()
.map {
when (it) {
is String -> it
is List<*> -> (it as List<String>).joinToString(",")
else -> ""
}
}.isSome { it.contains(exposedConfiguration.bootstrapServers) }

private fun createFallbackTemplate(exposedConfiguration: KafkaExposedConfiguration): KafkaTemplate<Any, Any> {
val producerFactory = DefaultKafkaProducerFactory<Any, Any>(
mapOf(
Expand Down

0 comments on commit d805d6a

Please sign in to comment.