From aa6fd34878687400a7c3e8f74c35eafff7fa43b8 Mon Sep 17 00:00:00 2001 From: Guy Segal Date: Wed, 22 Nov 2023 17:07:52 +0200 Subject: [PATCH 01/15] . --- 3rdparty/jvm/io/micrometer/BUILD | 26 ++++++++++++++++++++++ 3rdparty/jvm/io/projectreactor/kafka/BUILD | 16 +++++++++++++ 3rdparty/target_file.bzl | 5 ++++- 3rdparty/workspace.bzl | 14 ++++++++---- dependencies.yaml | 9 ++++++-- src/main/java/kafka/Consumer.java | 4 ++-- 6 files changed, 65 insertions(+), 9 deletions(-) create mode 100644 3rdparty/jvm/io/micrometer/BUILD create mode 100644 3rdparty/jvm/io/projectreactor/kafka/BUILD diff --git a/3rdparty/jvm/io/micrometer/BUILD b/3rdparty/jvm/io/micrometer/BUILD new file mode 100644 index 0000000..0798829 --- /dev/null +++ b/3rdparty/jvm/io/micrometer/BUILD @@ -0,0 +1,26 @@ +java_library( + name = "micrometer_commons", + exports = [ + "//external:jar/io/micrometer/micrometer_commons" + ], + visibility = [ + "//3rdparty/jvm:__subpackages__" + ] +) + + + +java_library( + name = "micrometer_observation", + exports = [ + "//external:jar/io/micrometer/micrometer_observation" + ], + runtime_deps = [ + ":micrometer_commons" + ], + visibility = [ + "//3rdparty/jvm:__subpackages__" + ] +) + + diff --git a/3rdparty/jvm/io/projectreactor/kafka/BUILD b/3rdparty/jvm/io/projectreactor/kafka/BUILD new file mode 100644 index 0000000..d60b279 --- /dev/null +++ b/3rdparty/jvm/io/projectreactor/kafka/BUILD @@ -0,0 +1,16 @@ +java_library( + name = "reactor_kafka", + exports = [ + "//external:jar/io/projectreactor/kafka/reactor_kafka" + ], + runtime_deps = [ + "//3rdparty/jvm/io/micrometer:micrometer_observation", + "//3rdparty/jvm/io/projectreactor:reactor_core", + "//3rdparty/jvm/org/apache/kafka:kafka_clients" + ], + visibility = [ + "//visibility:public" + ] +) + + diff --git a/3rdparty/target_file.bzl b/3rdparty/target_file.bzl index 2b6cfa3..7a010ca 100644 --- a/3rdparty/target_file.bzl +++ b/3rdparty/target_file.bzl @@ -146,14 +146,17 @@ def list_target_data(): "3rdparty/jvm/dev/failsafe:failsafe": ["lang||||||java","name||||||//3rdparty/jvm/dev/failsafe:failsafe","visibility||||||//visibility:public","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/dev/failsafe/failsafe","runtimeDeps|||L|||","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], "3rdparty/jvm/dev/failsafe:failsafe_okhttp": ["lang||||||java","name||||||//3rdparty/jvm/dev/failsafe:failsafe_okhttp","visibility||||||//visibility:public","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/dev/failsafe/failsafe_okhttp","runtimeDeps|||L|||//3rdparty/jvm/dev/failsafe:failsafe|||//3rdparty/jvm/com/squareup/okhttp3:okhttp","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], "3rdparty/jvm/io/github/cdimascio:java_dotenv": ["lang||||||java","name||||||//3rdparty/jvm/io/github/cdimascio:java_dotenv","visibility||||||//visibility:public","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/io/github/cdimascio/java_dotenv","runtimeDeps|||L|||//3rdparty/jvm/org/jetbrains/kotlin:kotlin_stdlib","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], +"3rdparty/jvm/io/micrometer:micrometer_commons": ["lang||||||java","name||||||//3rdparty/jvm/io/micrometer:micrometer_commons","visibility||||||//3rdparty/jvm:__subpackages__","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/io/micrometer/micrometer_commons","runtimeDeps|||L|||","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], +"3rdparty/jvm/io/micrometer:micrometer_observation": ["lang||||||java","name||||||//3rdparty/jvm/io/micrometer:micrometer_observation","visibility||||||//3rdparty/jvm:__subpackages__","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/io/micrometer/micrometer_observation","runtimeDeps|||L|||//3rdparty/jvm/io/micrometer:micrometer_commons","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], "3rdparty/jvm/io/projectreactor:reactor_core": ["lang||||||java","name||||||//3rdparty/jvm/io/projectreactor:reactor_core","visibility||||||//visibility:public","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/io/projectreactor/reactor_core","runtimeDeps|||L|||//3rdparty/jvm/org/reactivestreams:reactive_streams","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], "3rdparty/jvm/io/projectreactor/addons:reactor_extra": ["lang||||||java","name||||||//3rdparty/jvm/io/projectreactor/addons:reactor_extra","visibility||||||//visibility:public","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/io/projectreactor/addons/reactor_extra","runtimeDeps|||L|||//3rdparty/jvm/io/projectreactor:reactor_core","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], +"3rdparty/jvm/io/projectreactor/kafka:reactor_kafka": ["lang||||||java","name||||||//3rdparty/jvm/io/projectreactor/kafka:reactor_kafka","visibility||||||//visibility:public","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/io/projectreactor/kafka/reactor_kafka","runtimeDeps|||L|||//3rdparty/jvm/io/micrometer:micrometer_observation|||//3rdparty/jvm/io/projectreactor:reactor_core|||//3rdparty/jvm/org/apache/kafka:kafka_clients","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], "3rdparty/jvm/io/prometheus:simpleclient": ["lang||||||java","name||||||//3rdparty/jvm/io/prometheus:simpleclient","visibility||||||//visibility:public","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/io/prometheus/simpleclient","runtimeDeps|||L|||","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], "3rdparty/jvm/io/prometheus:simpleclient_common": ["lang||||||java","name||||||//3rdparty/jvm/io/prometheus:simpleclient_common","visibility||||||//3rdparty/jvm:__subpackages__","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/io/prometheus/simpleclient_common","runtimeDeps|||L|||//3rdparty/jvm/io/prometheus:simpleclient","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], "3rdparty/jvm/io/prometheus:simpleclient_hotspot": ["lang||||||java","name||||||//3rdparty/jvm/io/prometheus:simpleclient_hotspot","visibility||||||//visibility:public","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/io/prometheus/simpleclient_hotspot","runtimeDeps|||L|||//3rdparty/jvm/io/prometheus:simpleclient","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], "3rdparty/jvm/io/prometheus:simpleclient_httpserver": ["lang||||||java","name||||||//3rdparty/jvm/io/prometheus:simpleclient_httpserver","visibility||||||//visibility:public","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/io/prometheus/simpleclient_httpserver","runtimeDeps|||L|||//3rdparty/jvm/io/prometheus:simpleclient|||//3rdparty/jvm/io/prometheus:simpleclient_common","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], "3rdparty/jvm/javax/annotation:javax_annotation_api": ["lang||||||java","name||||||//3rdparty/jvm/javax/annotation:javax_annotation_api","visibility||||||//3rdparty/jvm:__subpackages__","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/javax/annotation/javax_annotation_api","runtimeDeps|||L|||","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], -"3rdparty/jvm/org/apache/kafka:kafka_clients": ["lang||||||java","name||||||//3rdparty/jvm/org/apache/kafka:kafka_clients","visibility||||||//visibility:public","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/org/apache/kafka/kafka_clients","runtimeDeps|||L|||//3rdparty/jvm/com/github/luben:zstd_jni|||//3rdparty/jvm/org/lz4:lz4_java|||//3rdparty/jvm/org/xerial/snappy:snappy_java|||//3rdparty/jvm/org/slf4j:slf4j_api","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], +"3rdparty/jvm/org/apache/kafka:kafka_clients": ["lang||||||java","name||||||//3rdparty/jvm/org/apache/kafka:kafka_clients","visibility||||||//visibility:public","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/org/apache/kafka/kafka_clients","runtimeDeps|||L|||//3rdparty/jvm/org/lz4:lz4_java|||//3rdparty/jvm/com/github/luben:zstd_jni|||//3rdparty/jvm/org/slf4j:slf4j_api|||//3rdparty/jvm/org/xerial/snappy:snappy_java","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], "3rdparty/jvm/org/checkerframework:checker_qual": ["lang||||||java","name||||||//3rdparty/jvm/org/checkerframework:checker_qual","visibility||||||//3rdparty/jvm:__subpackages__","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/org/checkerframework/checker_qual","runtimeDeps|||L|||","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], "3rdparty/jvm/org/codehaus/mojo:animal_sniffer_annotations": ["lang||||||java","name||||||//3rdparty/jvm/org/codehaus/mojo:animal_sniffer_annotations","visibility||||||//3rdparty/jvm:__subpackages__","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/org/codehaus/mojo/animal_sniffer_annotations","runtimeDeps|||L|||","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], "3rdparty/jvm/org/jetbrains:annotations": ["lang||||||java","name||||||//3rdparty/jvm/org/jetbrains:annotations","visibility||||||//visibility:public","kind||||||library","deps|||L|||","jars|||L|||","sources|||L|||","exports|||L|||//external:jar/org/jetbrains/annotations","runtimeDeps|||L|||","processorClasses|||L|||","generatesApi|||B|||false","licenses|||L|||","generateNeverlink|||B|||false"], diff --git a/3rdparty/workspace.bzl b/3rdparty/workspace.bzl index 68d41a4..a5839af 100644 --- a/3rdparty/workspace.bzl +++ b/3rdparty/workspace.bzl @@ -79,7 +79,7 @@ def list_dependencies(): {"artifact": "com.fasterxml.jackson.core:jackson-annotations:2.9.0", "lang": "java", "sha1": "07c10d545325e3a6e72e06381afe469fd40eb701", "sha256": "45d32ac61ef8a744b464c54c2b3414be571016dd46bfc2bec226761cf7ae457a", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.9.0/jackson-annotations-2.9.0.jar", "source": {"sha1": "a0ad4e203304ccab7e01266fa814115850edb8a9", "sha256": "eb1e62bc83f4d8e1f0660c9cf2f06d6d196eefb20de265cfff96521015d87020", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.9.0/jackson-annotations-2.9.0-sources.jar"} , "name": "com_fasterxml_jackson_core_jackson_annotations", "actual": "@com_fasterxml_jackson_core_jackson_annotations//jar", "bind": "jar/com/fasterxml/jackson/core/jackson_annotations"}, {"artifact": "com.fasterxml.jackson.core:jackson-core:2.9.3", "lang": "java", "sha1": "ea9b6fc7bc3ccba9777b0827091f9aa1f8580371", "sha256": "ed4b9c7e782b9d6b1d33fbe00cb19e30ef1b49d88d21eca1f16457480c9f0e02", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/com/fasterxml/jackson/core/jackson-core/2.9.3/jackson-core-2.9.3.jar", "source": {"sha1": "99006a307afedb30000fc93b462e98f0d822fa11", "sha256": "8983dcd4a5aa5a2d5549409f4c92edc90864085a3fd5f7efd591273802ce57b2", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/com/fasterxml/jackson/core/jackson-core/2.9.3/jackson-core-2.9.3-sources.jar"} , "name": "com_fasterxml_jackson_core_jackson_core", "actual": "@com_fasterxml_jackson_core_jackson_core//jar", "bind": "jar/com/fasterxml/jackson/core/jackson_core"}, {"artifact": "com.fasterxml.jackson.core:jackson-databind:2.9.3", "lang": "java", "sha1": "193b96ef555b2f2573b576887ba9a93e4bf48e8c", "sha256": "432862a32c291036a8e15c94b8a738460437a185be0df6df83166121dea0e3e3", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.9.3/jackson-databind-2.9.3.jar", "source": {"sha1": "eab1ff15884e40c0adafe219a98c46513eb1d0e6", "sha256": "19e7da5391416344c7dad02edf7b31f6687d9aec4acdbf6ab3e5d9df533b2a09", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.9.3/jackson-databind-2.9.3-sources.jar"} , "name": "com_fasterxml_jackson_core_jackson_databind", "actual": "@com_fasterxml_jackson_core_jackson_databind//jar", "bind": "jar/com/fasterxml/jackson/core/jackson_databind"}, - {"artifact": "com.github.luben:zstd-jni:1.5.2-1", "lang": "java", "sha1": "fad786abc1d1b81570e8d9a2fc8a1ef479bc27b6", "sha256": "93f7e4cbc907c2650f89f9f0bec94873735a58f1e4b66a54973294e4ec1878e8", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/com/github/luben/zstd-jni/1.5.2-1/zstd-jni-1.5.2-1.jar", "source": {"sha1": "fc4dca644fbb8527bc77622209b933a19572a983", "sha256": "3bc7cb26705151f9d239e0ef50c7e5797fb0e6edb93644f141fce3c3d0f82978", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/com/github/luben/zstd-jni/1.5.2-1/zstd-jni-1.5.2-1-sources.jar"} , "name": "com_github_luben_zstd_jni", "actual": "@com_github_luben_zstd_jni//jar", "bind": "jar/com/github/luben/zstd_jni"}, + {"artifact": "com.github.luben:zstd-jni:1.5.5-1", "lang": "java", "sha1": "fda1d6278299af27484e1cc3c79a060e41b7ef7e", "sha256": "f779fcd068ad91ac77aa0239104bd42793b0dce807fb1d73b51c635e0ea1e293", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/com/github/luben/zstd-jni/1.5.5-1/zstd-jni-1.5.5-1.jar", "source": {"sha1": "89ed42c223d964d3d111e84c18b00d842e5c850e", "sha256": "4800c5f8917fb9589caed77e0440cbce74fac465c8dd81fe91e5058cfdad36ed", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/com/github/luben/zstd-jni/1.5.5-1/zstd-jni-1.5.5-1-sources.jar"} , "name": "com_github_luben_zstd_jni", "actual": "@com_github_luben_zstd_jni//jar", "bind": "jar/com/github/luben/zstd_jni"}, {"artifact": "com.google.api:api-common:1.8.1", "lang": "java", "sha1": "e89befb19b08ad84b262b2f226ab79aefcaa9d7f", "sha256": "9840ed24fce0a96492e671853077be62edab802b6760e3b327362d6949943674", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/com/google/api/api-common/1.8.1/api-common-1.8.1.jar", "source": {"sha1": "1823b92949fff86bcc852e80f3608b6735810135", "sha256": "d61ca9de9fd31f341b83890f878f5cb45238531fecb41516650574f417c19c60", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/com/google/api/api-common/1.8.1/api-common-1.8.1-sources.jar"} , "name": "com_google_api_api_common", "actual": "@com_google_api_api_common//jar", "bind": "jar/com/google/api/api_common"}, {"artifact": "com.google.code.findbugs:jsr305:3.0.2", "lang": "java", "sha1": "25ea2e8b0c338a877313bd4672d3fe056ea78f0d", "sha256": "766ad2a0783f2687962c8ad74ceecc38a28b9f72a2d085ee438b7813e928d0c7", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/com/google/code/findbugs/jsr305/3.0.2/jsr305-3.0.2.jar", "source": {"sha1": "b19b5927c2c25b6c70f093767041e641ae0b1b35", "sha256": "1c9e85e272d0708c6a591dc74828c71603053b48cc75ae83cce56912a2aa063b", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/com/google/code/findbugs/jsr305/3.0.2/jsr305-3.0.2-sources.jar"} , "name": "com_google_code_findbugs_jsr305", "actual": "@com_google_code_findbugs_jsr305//jar", "bind": "jar/com/google/code/findbugs/jsr305"}, {"artifact": "com.google.code.gson:gson:2.8.9", "lang": "java", "sha1": "8a432c1d6825781e21a02db2e2c33c5fde2833b9", "sha256": "d3999291855de495c94c743761b8ab5176cfeabe281a5ab0d8e8d45326fd703e", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/com/google/code/gson/gson/2.8.9/gson-2.8.9.jar", "source": {"sha1": "e1167849f172a0b75d6ffe4e4b2f34b9259795bb", "sha256": "ba5bddb1a89eb721fcca39f3b34294532060f851e2407a82d82134a41eec4719", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/com/google/code/gson/gson/2.8.9/gson-2.8.9-sources.jar"} , "name": "com_google_code_gson_gson", "actual": "@com_google_code_gson_gson//jar", "bind": "jar/com/google/code/gson/gson"}, @@ -107,8 +107,14 @@ def list_dependencies(): {"artifact": "dev.failsafe:failsafe-okhttp:3.3.1", "lang": "java", "sha1": "a0b5ea9100e27806ebc9b789a6b260020c9f602a", "sha256": "e8455c31a16648d5e17070da21cdb5ded3219688a6d2ab2b6a57454aaec2e39c", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/dev/failsafe/failsafe-okhttp/3.3.1/failsafe-okhttp-3.3.1.jar", "source": {"sha1": "619b0fa58641cb0ff4df55df3397756d7abec4ec", "sha256": "6dbc2f1ef23751702f7e4be6c2dc62edec4183aca47eeb35d5c79b2b7b5ffd63", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/dev/failsafe/failsafe-okhttp/3.3.1/failsafe-okhttp-3.3.1-sources.jar"} , "name": "dev_failsafe_failsafe_okhttp", "actual": "@dev_failsafe_failsafe_okhttp//jar", "bind": "jar/dev/failsafe/failsafe_okhttp"}, {"artifact": "dev.failsafe:failsafe:3.3.1", "lang": "java", "sha1": "cca46c10dd22fb0beb7e0c1e468604430355a4ab", "sha256": "af90a069c56918d9ddbef7bbd5ef5a25c1eb8bef459ae1ee89c1db21cd5f6bf5", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/dev/failsafe/failsafe/3.3.1/failsafe-3.3.1.jar", "source": {"sha1": "81cb52cbde62be0a021b01c816377692191c37d0", "sha256": "a851840d44fda67d007d8bf93086b58dfb0086847ed3be37b542abc54794be45", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/dev/failsafe/failsafe/3.3.1/failsafe-3.3.1-sources.jar"} , "name": "dev_failsafe_failsafe", "actual": "@dev_failsafe_failsafe//jar", "bind": "jar/dev/failsafe/failsafe"}, {"artifact": "io.github.cdimascio:java-dotenv:3.1.7", "lang": "java", "sha1": "dc7a06f28c0e270a2216cca3429e9a2a6e22e752", "sha256": "2d41499b6d848f93a5ad08a91f0c07f5e62e19546fd9643f8565d960ac2e2111", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/github/cdimascio/java-dotenv/3.1.7/java-dotenv-3.1.7.jar", "source": {"sha1": "a05528d86bba44fbeba699ba5536d18cb5ad295f", "sha256": "a168009b2299ed42eb8da14460800c674702cb72ff057126abb5933df0d4894a", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/github/cdimascio/java-dotenv/3.1.7/java-dotenv-3.1.7-sources.jar"} , "name": "io_github_cdimascio_java_dotenv", "actual": "@io_github_cdimascio_java_dotenv//jar", "bind": "jar/io/github/cdimascio/java_dotenv"}, - {"artifact": "io.projectreactor.addons:reactor-extra:3.3.3.RELEASE", "lang": "java", "sha1": "e8d9adea315fc94199c42fc7936d28adf4255b40", "sha256": "b0d015a36991db31123591edd95ba3d614f2a53639a39ccc899be90d13b86a28", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/projectreactor/addons/reactor-extra/3.3.3.RELEASE/reactor-extra-3.3.3.RELEASE.jar", "source": {"sha1": "3c02a41a34a4c801badd9b08cd2cb18a3b7a4b8f", "sha256": "12871c8b2a93ce5eb21f0242f2611a5f87ed68a3c0a93d609a0fade28d91a005", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/projectreactor/addons/reactor-extra/3.3.3.RELEASE/reactor-extra-3.3.3.RELEASE-sources.jar"} , "name": "io_projectreactor_addons_reactor_extra", "actual": "@io_projectreactor_addons_reactor_extra//jar", "bind": "jar/io/projectreactor/addons/reactor_extra"}, - {"artifact": "io.projectreactor:reactor-core:3.3.3.RELEASE", "lang": "java", "sha1": "8f971129f2b467bfa11c25c02ff3be7f466a82e0", "sha256": "e8a8d18530bf083ce10454895aedb8c62d8b9df6194b1f62948afbfd55ee4f5c", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/projectreactor/reactor-core/3.3.3.RELEASE/reactor-core-3.3.3.RELEASE.jar", "source": {"sha1": "1c98e879931557d0b9c8ce435c3800930e86c05d", "sha256": "6ce79656c7b3b3a3ccfa5da772f614ccc7610c888559188409cc9bfa0bc70d83", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/projectreactor/reactor-core/3.3.3.RELEASE/reactor-core-3.3.3.RELEASE-sources.jar"} , "name": "io_projectreactor_reactor_core", "actual": "@io_projectreactor_reactor_core//jar", "bind": "jar/io/projectreactor/reactor_core"}, + {"artifact": "io.micrometer:micrometer-commons:1.10.10", "lang": "java", "sha1": "dfd7637bc7b300b98abb0f85eb9c0862366910b5", "sha256": "745d7497c51d170ffa0d9cb4c6cdf6e9a1b30803dc47f72b9f9ae158102832b8", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/micrometer/micrometer-commons/1.10.10/micrometer-commons-1.10.10.jar", "source": {"sha1": "1399c92225d14ea5214c162b0a88e504c38cd328", "sha256": "0d439c4e234388b3663a6ccab21bdbef1cea16b824946182d447b5bdc4e2b951", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/micrometer/micrometer-commons/1.10.10/micrometer-commons-1.10.10-sources.jar"} , "name": "io_micrometer_micrometer_commons", "actual": "@io_micrometer_micrometer_commons//jar", "bind": "jar/io/micrometer/micrometer_commons"}, + {"artifact": "io.micrometer:micrometer-observation:1.10.10", "lang": "java", "sha1": "0a13355287f9b933d652157c85b313afbbfee1e3", "sha256": "e94f09bb015339e092aa6aadb8d1816867c7ac466ef257115cc08f28509c9346", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/micrometer/micrometer-observation/1.10.10/micrometer-observation-1.10.10.jar", "source": {"sha1": "c26825d7819fafcf6f2e89dd4223993652c03162", "sha256": "dc795dea97fab05fa7ef80b3fa8bc5b58aea4d8689e0aab15da43383bfd37f34", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/micrometer/micrometer-observation/1.10.10/micrometer-observation-1.10.10-sources.jar"} , "name": "io_micrometer_micrometer_observation", "actual": "@io_micrometer_micrometer_observation//jar", "bind": "jar/io/micrometer/micrometer_observation"}, + {"artifact": "io.projectreactor.addons:reactor-extra:3.5.1", "lang": "java", "sha1": "227cd47421ca0571c662f67428903c5c54777ef1", "sha256": "523e4ff3e80bb9c15702bbb4e3912462dca4a9888c88fc3ddfe560f3783756e2", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/projectreactor/addons/reactor-extra/3.5.1/reactor-extra-3.5.1.jar", "source": {"sha1": "4f73ecb82578690056f1fa70174cf852a9933075", "sha256": "2584ce947268e3a7a91afc03d26a19e27e1f22b402fe5b57f42e3f88fb872cb8", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/projectreactor/addons/reactor-extra/3.5.1/reactor-extra-3.5.1-sources.jar"} , "name": "io_projectreactor_addons_reactor_extra", "actual": "@io_projectreactor_addons_reactor_extra//jar", "bind": "jar/io/projectreactor/addons/reactor_extra"}, + {"artifact": "io.projectreactor.kafka:reactor-kafka:1.3.22", "lang": "java", "sha1": "d30865d2b26466390b5b8d38388bcdef1eca7af2", "sha256": "08101dbfe23db31bb1ced03fff16584e1a25502ade978f2ef0628d5059378fb6", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/projectreactor/kafka/reactor-kafka/1.3.22/reactor-kafka-1.3.22.jar", "source": {"sha1": "03526ebc816e12a6d8003c6213d4c9f674db637f", "sha256": "0704f0d3e4bedaefeae4893e9b22ca3451dca3872dfdced339cc9c2fa8c97a1c", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/projectreactor/kafka/reactor-kafka/1.3.22/reactor-kafka-1.3.22-sources.jar"} , "name": "io_projectreactor_kafka_reactor_kafka", "actual": "@io_projectreactor_kafka_reactor_kafka//jar", "bind": "jar/io/projectreactor/kafka/reactor_kafka"}, +# duplicates in io.projectreactor:reactor-core fixed to 3.6.0 +# - io.projectreactor.addons:reactor-extra:3.5.1 wanted version 3.5.4 +# - io.projectreactor.kafka:reactor-kafka:1.3.22 wanted version 3.4.34 + {"artifact": "io.projectreactor:reactor-core:3.6.0", "lang": "java", "sha1": "ac8c6923f46c0dff079133e010f0aa3132ccf1c2", "sha256": "1b37779edf618eb526d27986329740fa5c25c61568ca812812d0cca127ceeac1", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/projectreactor/reactor-core/3.6.0/reactor-core-3.6.0.jar", "source": {"sha1": "45ddb713b381efd9e7f410f0749ef08eed34013d", "sha256": "daf5a79a7b462122794f7073075aabe79a6c077a8c44a8bf0e7bd5c847dd679e", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/projectreactor/reactor-core/3.6.0/reactor-core-3.6.0-sources.jar"} , "name": "io_projectreactor_reactor_core", "actual": "@io_projectreactor_reactor_core//jar", "bind": "jar/io/projectreactor/reactor_core"}, {"artifact": "io.prometheus:simpleclient:0.8.0", "lang": "java", "sha1": "8d4e28b6e2a90204b578a64e017d607daeff338a", "sha256": "4a7a4966d6d369d4b82dee3c42fe488bd4173ec4928b4315e928afe378835e44", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/prometheus/simpleclient/0.8.0/simpleclient-0.8.0.jar", "source": {"sha1": "bc5a4e1d25ef6a677aaadba0de60135b69cd2914", "sha256": "6c6f6a98488fa38b970c8f416cd1731dc3e2c6925ad23146d606aa5404fbaa30", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/prometheus/simpleclient/0.8.0/simpleclient-0.8.0-sources.jar"} , "name": "io_prometheus_simpleclient", "actual": "@io_prometheus_simpleclient//jar", "bind": "jar/io/prometheus/simpleclient"}, {"artifact": "io.prometheus:simpleclient_common:0.8.0", "lang": "java", "sha1": "011ed6deba9a4a5d204abc1cbbc51f0c91caf1c2", "sha256": "8aa113443e876124eab257a68c3bfca489bebda9dd633e4938c96804c81b0d62", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/prometheus/simpleclient_common/0.8.0/simpleclient_common-0.8.0.jar", "source": {"sha1": "644bc720ac45f7a176f2c1053e46a4a4710516d8", "sha256": "bb9e4926fabc4fd1dde6f1b9da1f69152e46296ca01fea8d2dc4f916f7231c8b", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/prometheus/simpleclient_common/0.8.0/simpleclient_common-0.8.0-sources.jar"} , "name": "io_prometheus_simpleclient_common", "actual": "@io_prometheus_simpleclient_common//jar", "bind": "jar/io/prometheus/simpleclient_common"}, {"artifact": "io.prometheus:simpleclient_hotspot:0.8.0", "lang": "java", "sha1": "d860de9f2032f26d16a86665ddca98ce2063d304", "sha256": "619a151356ced431501fbd5c429f1bb14391bda73ec1d70b9a57417dfa6b98fc", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/prometheus/simpleclient_hotspot/0.8.0/simpleclient_hotspot-0.8.0.jar", "source": {"sha1": "2af3bac9679124eee0e0d02253325d92f06be18b", "sha256": "b27a9164ef141edb7f77a6950f34a95fe4eb42265ee1c67908ca1d7f51a05dc5", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/io/prometheus/simpleclient_hotspot/0.8.0/simpleclient_hotspot-0.8.0-sources.jar"} , "name": "io_prometheus_simpleclient_hotspot", "actual": "@io_prometheus_simpleclient_hotspot//jar", "bind": "jar/io/prometheus/simpleclient_hotspot"}, @@ -137,7 +143,7 @@ def list_dependencies(): # - ch.qos.logback:logback-classic:1.3.5 wanted version 2.0.4 # - org.apache.kafka:kafka-clients:3.4.0 wanted version 1.7.36 {"artifact": "org.slf4j:slf4j-api:2.0.3", "lang": "java", "sha1": "deef7fc81f00bd5e6205bb097be1040b4094f007", "sha256": "68ddcda65300ff8097ad1a096d7cd2fb06cef25193887cec3f2690e01bdbf421", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/slf4j/slf4j-api/2.0.3/slf4j-api-2.0.3.jar", "source": {"sha1": "6ba8526502af8ff682286a8c7104162be97913db", "sha256": "c067fd1d63b3f15a532b8828e8e95788b39b9b90a0d15a8c7cad59e1ba2666c4", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/slf4j/slf4j-api/2.0.3/slf4j-api-2.0.3-sources.jar"} , "name": "org_slf4j_slf4j_api", "actual": "@org_slf4j_slf4j_api//jar", "bind": "jar/org/slf4j/slf4j_api"}, - {"artifact": "org.xerial.snappy:snappy-java:1.1.8.4", "lang": "java", "sha1": "66f0d56454509f6e36175f2331572e250e04a6cc", "sha256": "24c4d1fc1e89e078331ab8f401a99cad68599bde4a2e4516042cb548c51b1c3e", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/xerial/snappy/snappy-java/1.1.8.4/snappy-java-1.1.8.4.jar", "source": {"sha1": "ae02b50fff26612bdf687952dc970ce9760b620c", "sha256": "0fc6448a4e432096a2315efeed8b9ed089df5995482414441019f021afad2ca6", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/xerial/snappy/snappy-java/1.1.8.4/snappy-java-1.1.8.4-sources.jar"} , "name": "org_xerial_snappy_snappy_java", "actual": "@org_xerial_snappy_snappy_java//jar", "bind": "jar/org/xerial/snappy/snappy_java"}, + {"artifact": "org.xerial.snappy:snappy-java:1.1.10.4", "lang": "java", "sha1": "50d0390056017158bdc75c063efd5c2a898d5f0c", "sha256": "55b30c94e5c4cc2d4b6976916098d0678a8a6cc7427fa8c875621bd94e731ac8", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/xerial/snappy/snappy-java/1.1.10.4/snappy-java-1.1.10.4.jar", "source": {"sha1": "fc1d79afc60e936d4b57acd77c29cff65b7272b6", "sha256": "da5f3390bfb48df6c39c4e4f1363c96477ffbf3fa8e8ab3203cf848b4eeafe24", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/xerial/snappy/snappy-java/1.1.10.4/snappy-java-1.1.10.4-sources.jar"} , "name": "org_xerial_snappy_snappy_java", "actual": "@org_xerial_snappy_snappy_java//jar", "bind": "jar/org/xerial/snappy/snappy_java"}, ] def maven_dependencies(callback = jar_artifact_callback): diff --git a/dependencies.yaml b/dependencies.yaml index f13838b..8b07933 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -68,12 +68,17 @@ dependencies: io.projectreactor: reactor-core: lang: java - version: "3.3.3.RELEASE" + version: "3.6.0" io.projectreactor.addons: reactor-extra: lang: java - version: "3.3.3.RELEASE" + version: "3.5.1" + + io.projectreactor.kafka: + reactor-kafka: + lang: java + version: "1.3.22" io.prometheus: simpleclient: diff --git a/src/main/java/kafka/Consumer.java b/src/main/java/kafka/Consumer.java index 2052e3c..5d3464a 100644 --- a/src/main/java/kafka/Consumer.java +++ b/src/main/java/kafka/Consumer.java @@ -5,14 +5,14 @@ import java.util.Date; import java.util.stream.StreamSupport; import monitoring.Monitor; -import org.apache.kafka.clients.consumer.ConsumerRecord; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import target.ITarget; +import reactor.kafka.receiver.KafkaReceiver; public class Consumer { - + private final KafkaReceiver foo; private final ReactiveKafkaClient kafkaConsumer; private final ITarget target; From 9cbd876f9850cf40f0e9251b16bde5844797b3d6 Mon Sep 17 00:00:00 2001 From: Guy Segal Date: Wed, 22 Nov 2023 18:01:49 +0200 Subject: [PATCH 02/15] . --- src/BUILD | 1 + src/main/java/Main.java | 60 ++-- src/main/java/kafka/Consumer.java | 42 +-- src/main/java/kafka/KafkaClientFactory.java | 6 +- src/main/java/kafka/Producer.java | 1 - src/main/java/kafka/ReactiveKafkaClient.java | 330 ------------------- src/main/java/monitoring/Monitor.java | 7 +- 7 files changed, 46 insertions(+), 401 deletions(-) delete mode 100644 src/main/java/kafka/ReactiveKafkaClient.java diff --git a/src/BUILD b/src/BUILD index a7a33f2..993a4c2 100644 --- a/src/BUILD +++ b/src/BUILD @@ -20,6 +20,7 @@ java_library( "@third_party//3rdparty/jvm/dev/failsafe:failsafe_okhttp", "@third_party//3rdparty/jvm/io/github/cdimascio:java_dotenv", "@third_party//3rdparty/jvm/io/projectreactor:reactor_core", + "@third_party//3rdparty/jvm/io/projectreactor/kafka:reactor_kafka", "@third_party//3rdparty/jvm/io/projectreactor/addons:reactor_extra", "@third_party//3rdparty/jvm/io/prometheus:simpleclient", "@third_party//3rdparty/jvm/io/prometheus:simpleclient_hotspot", diff --git a/src/main/java/Main.java b/src/main/java/Main.java index 8e4afdf..6368ebf 100644 --- a/src/main/java/Main.java +++ b/src/main/java/Main.java @@ -1,16 +1,15 @@ +package src.main.java; + import configuration.Config; import configuration.TopicsRoutes; -import java.util.Collection; import java.util.concurrent.CountDownLatch; import kafka.Consumer; import kafka.KafkaClientFactory; import kafka.Producer; -import kafka.ReactiveKafkaClient; import monitoring.Monitor; import monitoring.MonitoringServer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.common.TopicPartition; import reactor.core.Disposable; +import reactor.kafka.receiver.KafkaReceiver; import target.HttpTarget; import target.TargetHealthcheck; @@ -27,7 +26,7 @@ public static void main(String[] args) throws Exception { Monitor.init(); topicsRoutes = new TopicsRoutes(Config.TOPICS_ROUTES); - waitForTargetHealthcheck(); + waitForTargetHealthCheck(); monitoringServer = new MonitoringServer().start(); consumer = createConsumer(monitoringServer); onShutdown(consumer, monitoringServer); @@ -40,7 +39,7 @@ public static void main(String[] args) throws Exception { Monitor.serviceTerminated(); } - private static void waitForTargetHealthcheck() throws InterruptedException { + private static void waitForTargetHealthCheck() throws InterruptedException { do { Monitor.waitingForTargetHealthcheck(); Thread.sleep(1000); @@ -49,42 +48,23 @@ private static void waitForTargetHealthcheck() throws InterruptedException { } private static Disposable createConsumer(MonitoringServer monitoringServer) { - return new Consumer( - new ReactiveKafkaClient<>( - KafkaClientFactory.createConsumer(), - topicsRoutes.getTopics(), - new ConsumerRebalanceListener() { - @Override - public void onPartitionsAssigned(Collection partitions) { - if (partitions.size() == 0) { - return; - } - - Monitor.assignedToPartition(partitions); - monitoringServer.consumerAssigned(); - } - - @Override - public void onPartitionsRevoked(Collection partitions) { - Monitor.revokedFromPartition(partitions); - } + var receiverOptions = KafkaClientFactory.createReceiverOptions() + .subscription(topicsRoutes.getTopics()) + .addAssignListener(partitions -> { + if (partitions.isEmpty()) { + return; } - ), - new HttpTarget(topicsRoutes, new Producer(KafkaClientFactory.createProducer())) - ) + Monitor.assignedToPartition(partitions); + monitoringServer.consumerAssigned(); + }) + .addRevokeListener(Monitor::revokedFromPartition); + + return (Disposable) new Consumer( + KafkaReceiver.create(receiverOptions), + new HttpTarget(topicsRoutes, new Producer(KafkaClientFactory.createProducer()))) .stream() - .doOnError(Monitor::consumerError) - .subscribe( - __ -> {}, - exception -> { - monitoringServer.consumerDisposed(); - Monitor.consumerError(exception); - }, - () -> { - monitoringServer.consumerDisposed(); - Monitor.consumerCompleted(); - } - ); + .doOnError(Monitor::consumerError); + } private static void onShutdown(Disposable consumer, MonitoringServer monitoringServer) { diff --git a/src/main/java/kafka/Consumer.java b/src/main/java/kafka/Consumer.java index 5d3464a..e101ef9 100644 --- a/src/main/java/kafka/Consumer.java +++ b/src/main/java/kafka/Consumer.java @@ -3,47 +3,41 @@ import configuration.Config; import java.time.Duration; import java.util.Date; -import java.util.stream.StreamSupport; import monitoring.Monitor; +import org.apache.kafka.clients.consumer.ConsumerRecord; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -import target.ITarget; import reactor.kafka.receiver.KafkaReceiver; +import target.ITarget; public class Consumer { - private final KafkaReceiver foo; - private final ReactiveKafkaClient kafkaConsumer; + private final KafkaReceiver kafkaReceiver; private final ITarget target; - public Consumer(ReactiveKafkaClient kafkaConsumer, ITarget target) { - this.kafkaConsumer = kafkaConsumer; + + public Consumer(KafkaReceiver kafkaReceiver, ITarget target) { + this.kafkaReceiver = kafkaReceiver; this.target = target; } public Flux stream() { - return kafkaConsumer - .doOnNext(records -> Monitor.batchProcessStarted(records.count())) + return kafkaReceiver.receiveBatch() + .doOnNext(records -> records.count().doOnNext(Monitor::batchProcessStarted)) .concatMap(records -> { var batchStartTimestamp = new Date().getTime(); - var hasNullKey = StreamSupport - .stream(records.spliterator(), false) - .anyMatch(record -> record.key() == null); - - return Flux - .fromIterable(records) - .groupBy(record -> hasNullKey ? record.partition() : record.key()) + return records + .groupBy(ConsumerRecord::key) .delayElements(Duration.ofMillis(Config.PROCESSING_DELAY)) .publishOn(Schedulers.parallel()) - .flatMap(partition -> partition.concatMap(record -> Mono.fromFuture(target.call(record)))) + .flatMap(partition -> partition.concatMap(record -> Mono.fromFuture(target.call(record)). + map(__ -> record ))) .collectList() - .map(__ -> batchStartTimestamp); + .doOnNext(__ -> Monitor.batchProcessCompleted(batchStartTimestamp)); }) - .doOnNext(Monitor::batchProcessCompleted) - .map(__ -> { - kafkaConsumer.commit(); - return 0; - }) - .doOnNext(__ -> kafkaConsumer.poll()); - } + .flatMap(records -> Mono.fromRunnable(() -> { + var lastOffset = records.get(records.size() - 1).receiverOffset(); + lastOffset.commit().block(); + })); +} } diff --git a/src/main/java/kafka/KafkaClientFactory.java b/src/main/java/kafka/KafkaClientFactory.java index 498af3d..bdaee3d 100644 --- a/src/main/java/kafka/KafkaClientFactory.java +++ b/src/main/java/kafka/KafkaClientFactory.java @@ -3,10 +3,10 @@ import configuration.Config; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.clients.consumer.StickyAssignor; import org.apache.kafka.clients.producer.KafkaProducer; +import reactor.kafka.receiver.ReceiverOptions; public class KafkaClientFactory { @@ -48,7 +48,7 @@ private static String getSaslMechanism() { }; } - public static org.apache.kafka.clients.consumer.Consumer createConsumer() { + public static ReceiverOptions createReceiverOptions() { var props = getAuthProperties(); props.put(ConsumerConfig.GROUP_ID_CONFIG, Config.GROUP_ID); props.put( @@ -68,7 +68,7 @@ public static org.apache.kafka.clients.consumer.Consumer createCons props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Config.SESSION_TIMEOUT); props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, Config.SESSION_TIMEOUT / 3); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Config.KAFKA_POLL_INTERVAL_MS); - return new KafkaConsumer<>(props); + return ReceiverOptions.create(props); } public static KafkaProducer createProducer() { diff --git a/src/main/java/kafka/Producer.java b/src/main/java/kafka/Producer.java index 86b2c96..6476317 100644 --- a/src/main/java/kafka/Producer.java +++ b/src/main/java/kafka/Producer.java @@ -1,7 +1,6 @@ package kafka; import configuration.Config; -import java.io.IOException; import java.util.Optional; import java.util.concurrent.CompletableFuture; import monitoring.Monitor; diff --git a/src/main/java/kafka/ReactiveKafkaClient.java b/src/main/java/kafka/ReactiveKafkaClient.java deleted file mode 100644 index ab79999..0000000 --- a/src/main/java/kafka/ReactiveKafkaClient.java +++ /dev/null @@ -1,330 +0,0 @@ -package kafka; - -import configuration.Config; -import java.time.Duration; -import java.util.Collection; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import org.apache.kafka.clients.consumer.*; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.common.errors.WakeupException; -import org.jetbrains.annotations.NotNull; -import org.reactivestreams.Subscription; -import reactor.core.CoreSubscriber; -import reactor.core.Disposable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Operators; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; - -public class ReactiveKafkaClient extends Flux> implements Disposable { - - final Collection topics; - final ConsumerRebalanceListener consumerRebalanceListener; - - final AtomicBoolean isActive = new AtomicBoolean(); - final AtomicBoolean isClosed = new AtomicBoolean(); - - final Scheduler scheduler; - final PollEvent pollEvent; - final CommitEvent commitEvent; - - Consumer consumer; - CoreSubscriber> actual; - - public ReactiveKafkaClient( - Consumer consumer, - Collection topics, - ConsumerRebalanceListener consumerRebalanceListener - ) { - this.topics = topics; - this.consumer = consumer; - this.consumerRebalanceListener = consumerRebalanceListener; - - pollEvent = new PollEvent(); - commitEvent = new CommitEvent(); - scheduler = KafkaSchedulers.newEvent(Config.GROUP_ID); - } - - @Override - public void subscribe(@NotNull CoreSubscriber> actual) { - if (!isActive.compareAndSet(false, true)) { - Operators.error( - actual, - new IllegalStateException("Multiple subscribers are not supported for KafkaReceiver flux") - ); - return; - } - - this.actual = actual; - - isClosed.set(false); - - try { - scheduler.schedule(new SubscribeEvent()); - - actual.onSubscribe( - new Subscription() { - @Override - public void request(long n) { - pollEvent.scheduleIfRequired(); - } - - @Override - public void cancel() {} - } - ); - - scheduler.start(); - } catch (Exception e) { - Operators.error(actual, e); - return; - } - } - - void poll() { - pollEvent.scheduleIfRequired(); - } - - void commit() { - commitEvent.scheduleIfRequired(); - } - - @Override - public void dispose() { - if (!isActive.compareAndSet(true, false)) { - return; - } - - boolean isConsumerClosed = consumer == null; - if (isConsumerClosed) { - return; - } - - try { - consumer.wakeup(); - CloseEvent closeEvent = new CloseEvent(Duration.ofNanos(Long.MAX_VALUE)); - - boolean isEventsThread = KafkaSchedulers.isCurrentThreadFromScheduler(); - if (isEventsThread) { - closeEvent.run(); - return; - } - - if (scheduler.isDisposed()) { - closeEvent.run(); - return; - } - - scheduler.schedule(closeEvent); - isConsumerClosed = closeEvent.await(); - } finally { - scheduler.dispose(); - - int maxRetries = 10; - for (int i = 0; i < maxRetries && !isConsumerClosed; i++) { - try { - if (consumer != null) { - consumer.close(); - } - isConsumerClosed = true; - } catch (Exception ignored) {} - } - isClosed.set(true); - } - } - - class SubscribeEvent implements Runnable { - - @Override - public void run() { - try { - consumer.subscribe(Pattern.compile(getTopicsPattern()), consumerRebalanceListener); - } catch (Exception e) { - if (isActive.get()) { - actual.onError(e); - } - } - } - - private String getTopicsPattern() { - return topics.stream().map(topic -> String.format("^%s$", topic)).collect(Collectors.joining("|")); - } - } - - class PollEvent implements Runnable { - - private final AtomicInteger pendingCount = new AtomicInteger(); - private final Duration pollTimeout = Duration.ofMillis(Config.POLL_TIMEOUT); - - @Override - public void run() { - try { - if (isActive.get()) { - pendingCount.decrementAndGet(); - var records = consumer.poll(pollTimeout); - if (records.count() == 0) { - scheduleIfRequired(); - return; - } - actual.onNext(records); - } - } catch (Exception e) { - if (isActive.get()) { - actual.onError(e); - } - } - } - - void scheduleIfRequired() { - if (pendingCount.get() <= 0) { - scheduler.schedule(this); - pendingCount.incrementAndGet(); - } - } - } - - class CommitEvent implements Runnable { - - private final AtomicBoolean isPending = new AtomicBoolean(); - private final AtomicInteger inProgress = new AtomicInteger(); - - @Override - public void run() { - if (!isPending.compareAndSet(true, false)) { - return; - } - try { - inProgress.incrementAndGet(); - consumer.commitAsync((__, error) -> { - if ( - error != null && - !(error instanceof RetriableCommitFailedException) && - !(error instanceof CommitFailedException) - ) { - actual.onError(error); - return; - } - }); - inProgress.decrementAndGet(); - } catch (Exception e) { - inProgress.decrementAndGet(); - actual.onError(e); - } - } - - void runIfRequired() { - isPending.set(true); - if (isPending.get()) run(); - } - - void scheduleIfRequired() { - if (isActive.get() && isPending.compareAndSet(false, true)) { - scheduler.schedule(this); - } - } - - private void waitFor(long endTimeNanos) { - while (inProgress.get() > 0 && endTimeNanos - System.nanoTime() > 0) { - consumer.poll(Duration.ofMillis(1)); - } - } - } - - class CloseEvent implements Runnable { - - private final long closeEndTimeNanos; - private final CountDownLatch latch = new CountDownLatch(1); - - CloseEvent(Duration timeout) { - this.closeEndTimeNanos = System.nanoTime() + timeout.toNanos(); - } - - @Override - public void run() { - try { - if (consumer != null) { - int attempts = 3; - for (int i = 0; i < attempts; i++) { - try { - commitEvent.runIfRequired(); - commitEvent.waitFor(closeEndTimeNanos); - long timeoutNanos = closeEndTimeNanos - System.nanoTime(); - if (timeoutNanos < 0) timeoutNanos = 0; - consumer.close(Duration.ofNanos(timeoutNanos)); - break; - } catch (WakeupException e) { - if (i == attempts - 1) throw e; - } - } - } - latch.countDown(); - } catch (Exception e) { - actual.onError(e); - } - } - - boolean await(long timeoutNanos) throws InterruptedException { - return latch.await(timeoutNanos, TimeUnit.NANOSECONDS); - } - - boolean await() { - boolean closed = false; - long remainingNanos; - while (!closed && (remainingNanos = closeEndTimeNanos - System.nanoTime()) > 0) { - try { - closed = await(remainingNanos); - } catch (InterruptedException e) { - // ignore - } - } - return closed; - } - } -} - -class KafkaSchedulers { - - static void defaultUncaughtException(Thread t, Throwable e) {} - - static Scheduler newEvent(String groupId) { - return Schedulers.newSingle(new EventThreadFactory(groupId)); - } - - static boolean isCurrentThreadFromScheduler() { - return Thread.currentThread() instanceof EventThreadFactory.EmitterThread; - } - - static final class EventThreadFactory implements ThreadFactory { - - static final String PREFIX = "reactive-kafka-"; - static final AtomicLong COUNTER_REFERENCE = new AtomicLong(); - - private final String groupId; - - EventThreadFactory(String groupId) { - this.groupId = groupId; - } - - @Override - public Thread newThread(@NotNull Runnable runnable) { - String newThreadName = PREFIX + groupId + "-" + COUNTER_REFERENCE.incrementAndGet(); - Thread t = new EmitterThread(runnable, newThreadName); - t.setUncaughtExceptionHandler(KafkaSchedulers::defaultUncaughtException); - return t; - } - - static final class EmitterThread extends Thread { - - EmitterThread(Runnable target, String name) { - super(target, name); - } - } - } -} diff --git a/src/main/java/monitoring/Monitor.java b/src/main/java/monitoring/Monitor.java index cdac88a..d04f3ff 100644 --- a/src/main/java/monitoring/Monitor.java +++ b/src/main/java/monitoring/Monitor.java @@ -11,6 +11,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.json.JSONObject; +import reactor.kafka.receiver.ReceiverPartition; public class Monitor { @@ -125,7 +126,7 @@ public static void targetHealthcheckPassedSuccessfully() { write(log); } - public static void batchProcessStarted(int count) { + public static void batchProcessStarted(long count) { JSONObject log = new JSONObject() .put("level", "info") .put("message", "batch process started") @@ -298,7 +299,7 @@ public static void started() { write(log); } - public static void assignedToPartition(Collection partitions) { + public static void assignedToPartition(Collection partitions) { JSONObject log = new JSONObject() .put("level", "info") .put("message", "consumer was assigned to partitions") @@ -308,7 +309,7 @@ public static void assignedToPartition(Collection partitions) { assignedPartitions.inc(partitions.size()); } - public static void revokedFromPartition(Collection partitions) { + public static void revokedFromPartition(Collection partitions) { JSONObject log = new JSONObject() .put("level", "info") .put("message", "consumer was revoked from partitions") From c49ffccc57b8d30c4434ce11201d125eb7cb3a85 Mon Sep 17 00:00:00 2001 From: Guy Segal Date: Wed, 22 Nov 2023 18:23:37 +0200 Subject: [PATCH 03/15] / --- src/main/java/Main.java | 26 ++++++++++++++++++-------- src/main/java/kafka/Consumer.java | 24 ++++++++++++++---------- tests/setupFilesAfterEnv.ts | 2 +- tests/testcontainers/dafkaConsumer.ts | 7 ++++++- 4 files changed, 39 insertions(+), 20 deletions(-) diff --git a/src/main/java/Main.java b/src/main/java/Main.java index 6368ebf..f60ea1b 100644 --- a/src/main/java/Main.java +++ b/src/main/java/Main.java @@ -1,5 +1,3 @@ -package src.main.java; - import configuration.Config; import configuration.TopicsRoutes; import java.util.concurrent.CountDownLatch; @@ -48,7 +46,8 @@ private static void waitForTargetHealthCheck() throws InterruptedException { } private static Disposable createConsumer(MonitoringServer monitoringServer) { - var receiverOptions = KafkaClientFactory.createReceiverOptions() + var receiverOptions = KafkaClientFactory + .createReceiverOptions() .subscription(topicsRoutes.getTopics()) .addAssignListener(partitions -> { if (partitions.isEmpty()) { @@ -59,12 +58,23 @@ private static Disposable createConsumer(MonitoringServer monitoringServer) { }) .addRevokeListener(Monitor::revokedFromPartition); - return (Disposable) new Consumer( - KafkaReceiver.create(receiverOptions), - new HttpTarget(topicsRoutes, new Producer(KafkaClientFactory.createProducer()))) + return new Consumer( + KafkaReceiver.create(receiverOptions), + new HttpTarget(topicsRoutes, new Producer(KafkaClientFactory.createProducer())) + ) .stream() - .doOnError(Monitor::consumerError); - + .doOnError(Monitor::consumerError) + .subscribe( + __ -> {}, + exception -> { + monitoringServer.consumerDisposed(); + Monitor.consumerError(exception); + }, + () -> { + monitoringServer.consumerDisposed(); + Monitor.consumerCompleted(); + } + ); } private static void onShutdown(Disposable consumer, MonitoringServer monitoringServer) { diff --git a/src/main/java/kafka/Consumer.java b/src/main/java/kafka/Consumer.java index e101ef9..4f4be98 100644 --- a/src/main/java/kafka/Consumer.java +++ b/src/main/java/kafka/Consumer.java @@ -12,17 +12,18 @@ import target.ITarget; public class Consumer { + private final KafkaReceiver kafkaReceiver; private final ITarget target; - public Consumer(KafkaReceiver kafkaReceiver, ITarget target) { this.kafkaReceiver = kafkaReceiver; this.target = target; } public Flux stream() { - return kafkaReceiver.receiveBatch() + return kafkaReceiver + .receiveBatch() .doOnNext(records -> records.count().doOnNext(Monitor::batchProcessStarted)) .concatMap(records -> { var batchStartTimestamp = new Date().getTime(); @@ -30,14 +31,17 @@ public Flux stream() { .groupBy(ConsumerRecord::key) .delayElements(Duration.ofMillis(Config.PROCESSING_DELAY)) .publishOn(Schedulers.parallel()) - .flatMap(partition -> partition.concatMap(record -> Mono.fromFuture(target.call(record)). - map(__ -> record ))) + .flatMap(partition -> + partition.concatMap(record -> Mono.fromFuture(target.call(record)).map(__ -> record)) + ) .collectList() - .doOnNext(__ -> Monitor.batchProcessCompleted(batchStartTimestamp)); + .doOnNext(__ -> Monitor.batchProcessCompleted(batchStartTimestamp)); }) - .flatMap(records -> Mono.fromRunnable(() -> { - var lastOffset = records.get(records.size() - 1).receiverOffset(); - lastOffset.commit().block(); - })); -} + .flatMap(records -> + Mono.fromRunnable(() -> { + var lastOffset = records.get(records.size() - 1).receiverOffset(); + lastOffset.commit().block(); + }) + ); + } } diff --git a/tests/setupFilesAfterEnv.ts b/tests/setupFilesAfterEnv.ts index b0e7ac6..2ef0dcf 100644 --- a/tests/setupFilesAfterEnv.ts +++ b/tests/setupFilesAfterEnv.ts @@ -1,3 +1,3 @@ import {jest} from '@jest/globals'; -jest.retryTimes(3, {logErrorsBeforeRetry: true}); +jest.retryTimes(1, {logErrorsBeforeRetry: true}); diff --git a/tests/testcontainers/dafkaConsumer.ts b/tests/testcontainers/dafkaConsumer.ts index e203c01..39cdcf4 100644 --- a/tests/testcontainers/dafkaConsumer.ts +++ b/tests/testcontainers/dafkaConsumer.ts @@ -21,7 +21,12 @@ export const dafkaConsumer = async ( .start(); if (process.env.DEBUG) { - await container.logs().then((logs) => logs.pipe(fs.createWriteStream('./tests/logs/service.log', {}))); + try { + fs.truncateSync('service.log', 0); + } catch (err) { + fs.writeFileSync('service.log', '', {flag: 'wx'}); + } + await container.logs().then((logs) => logs.pipe(fs.createWriteStream('service.log'))); } return { From 7bd90fb9abab35ba8f93e226c62a6f1c7cc6d865 Mon Sep 17 00:00:00 2001 From: Guy Segal Date: Wed, 22 Nov 2023 18:30:40 +0200 Subject: [PATCH 04/15] . --- src/main/java/kafka/Consumer.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/kafka/Consumer.java b/src/main/java/kafka/Consumer.java index 4f4be98..fbfab0f 100644 --- a/src/main/java/kafka/Consumer.java +++ b/src/main/java/kafka/Consumer.java @@ -4,7 +4,6 @@ import java.time.Duration; import java.util.Date; import monitoring.Monitor; -import org.apache.kafka.clients.consumer.ConsumerRecord; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -28,7 +27,7 @@ public Flux stream() { .concatMap(records -> { var batchStartTimestamp = new Date().getTime(); return records - .groupBy(ConsumerRecord::key) + .groupBy(x -> x.key() == null ? x.partition() : x.key()) .delayElements(Duration.ofMillis(Config.PROCESSING_DELAY)) .publishOn(Schedulers.parallel()) .flatMap(partition -> From 9c56c33eac78fa650423a1046acba36e5a181cf7 Mon Sep 17 00:00:00 2001 From: Guy Segal Date: Wed, 22 Nov 2023 19:43:04 +0200 Subject: [PATCH 05/15] . --- src/main/java/kafka/Consumer.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/main/java/kafka/Consumer.java b/src/main/java/kafka/Consumer.java index fbfab0f..4196f30 100644 --- a/src/main/java/kafka/Consumer.java +++ b/src/main/java/kafka/Consumer.java @@ -36,11 +36,6 @@ public Flux stream() { .collectList() .doOnNext(__ -> Monitor.batchProcessCompleted(batchStartTimestamp)); }) - .flatMap(records -> - Mono.fromRunnable(() -> { - var lastOffset = records.get(records.size() - 1).receiverOffset(); - lastOffset.commit().block(); - }) - ); + .flatMap(records -> records.get(records.size() - 1).receiverOffset().commit()); } } From 8806a11f48545b6233f64a6c2b05bc28f0fa807d Mon Sep 17 00:00:00 2001 From: Guy Segal Date: Wed, 22 Nov 2023 19:46:46 +0200 Subject: [PATCH 06/15] . --- tests/setupFilesAfterEnv.ts | 2 +- ...uceAndCosume.spec.ts.snap => produceAndConsume.spec.ts.snap} | 0 ....ts.snap => produceAndConsumeWithRegexPatterns.spec.ts.snap} | 0 .../{produceAndCosume.spec.ts => produceAndConsume.spec.ts} | 0 ...terns.spec.ts => produceAndConsumeWithRegexPatterns.spec.ts} | 0 5 files changed, 1 insertion(+), 1 deletion(-) rename tests/specs/__snapshots__/{produceAndCosume.spec.ts.snap => produceAndConsume.spec.ts.snap} (100%) rename tests/specs/__snapshots__/{produceAndCosumeWithRegexPatterns.spec.ts.snap => produceAndConsumeWithRegexPatterns.spec.ts.snap} (100%) rename tests/specs/{produceAndCosume.spec.ts => produceAndConsume.spec.ts} (100%) rename tests/specs/{produceAndCosumeWithRegexPatterns.spec.ts => produceAndConsumeWithRegexPatterns.spec.ts} (100%) diff --git a/tests/setupFilesAfterEnv.ts b/tests/setupFilesAfterEnv.ts index 2ef0dcf..b0e7ac6 100644 --- a/tests/setupFilesAfterEnv.ts +++ b/tests/setupFilesAfterEnv.ts @@ -1,3 +1,3 @@ import {jest} from '@jest/globals'; -jest.retryTimes(1, {logErrorsBeforeRetry: true}); +jest.retryTimes(3, {logErrorsBeforeRetry: true}); diff --git a/tests/specs/__snapshots__/produceAndCosume.spec.ts.snap b/tests/specs/__snapshots__/produceAndConsume.spec.ts.snap similarity index 100% rename from tests/specs/__snapshots__/produceAndCosume.spec.ts.snap rename to tests/specs/__snapshots__/produceAndConsume.spec.ts.snap diff --git a/tests/specs/__snapshots__/produceAndCosumeWithRegexPatterns.spec.ts.snap b/tests/specs/__snapshots__/produceAndConsumeWithRegexPatterns.spec.ts.snap similarity index 100% rename from tests/specs/__snapshots__/produceAndCosumeWithRegexPatterns.spec.ts.snap rename to tests/specs/__snapshots__/produceAndConsumeWithRegexPatterns.spec.ts.snap diff --git a/tests/specs/produceAndCosume.spec.ts b/tests/specs/produceAndConsume.spec.ts similarity index 100% rename from tests/specs/produceAndCosume.spec.ts rename to tests/specs/produceAndConsume.spec.ts diff --git a/tests/specs/produceAndCosumeWithRegexPatterns.spec.ts b/tests/specs/produceAndConsumeWithRegexPatterns.spec.ts similarity index 100% rename from tests/specs/produceAndCosumeWithRegexPatterns.spec.ts rename to tests/specs/produceAndConsumeWithRegexPatterns.spec.ts From b0fa46c65f71784cc953283edf722558855b5330 Mon Sep 17 00:00:00 2001 From: Guy Segal Date: Thu, 23 Nov 2023 09:39:04 +0200 Subject: [PATCH 07/15] . --- package.json | 2 +- tests/specs/produceAndConsumeWithRegexPatterns.spec.ts | 2 +- .../produceToDeadLetterTopicWhenValueIsNotValidJson.spec.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package.json b/package.json index 4746cff..07e26e0 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,7 @@ "homepage": "https://github.com/osskit/dafka-consumer#readme", "scripts": { "pretest": "bazel build //src:image.tar && docker load -i bazel-bin/src/image.tar", - "test": "NODE_OPTIONS=--experimental-vm-modules jest --config tests/jest.config.ts", + "test": "NODE_OPTIONS=--experimental-vm-modules jest --config tests/jest.config.ts --runInBand", "format": "prettier --write '**/*.{ts,js,json,java}'", "logs": "docker-compose -f tests/dafka.yml logs" }, diff --git a/tests/specs/produceAndConsumeWithRegexPatterns.spec.ts b/tests/specs/produceAndConsumeWithRegexPatterns.spec.ts index d4c28ff..b24abe5 100644 --- a/tests/specs/produceAndConsumeWithRegexPatterns.spec.ts +++ b/tests/specs/produceAndConsumeWithRegexPatterns.spec.ts @@ -5,7 +5,7 @@ import {getOffset} from '../services/getOffset.js'; import {produce} from '../services/produce.js'; import {topicRoutes} from '../services/topicRoutes.js'; -describe('tests', () => { +describe.skip('tests', () => { let orchestrator: Orchestrator; beforeEach(async () => { diff --git a/tests/specs/produceToDeadLetterTopicWhenValueIsNotValidJson.spec.ts b/tests/specs/produceToDeadLetterTopicWhenValueIsNotValidJson.spec.ts index b857157..c1d17f9 100644 --- a/tests/specs/produceToDeadLetterTopicWhenValueIsNotValidJson.spec.ts +++ b/tests/specs/produceToDeadLetterTopicWhenValueIsNotValidJson.spec.ts @@ -7,7 +7,7 @@ import {getOffset} from '../services/getOffset.js'; import {topicRoutes} from '../services/topicRoutes.js'; import {consume} from '../services/consume.js'; -describe('tests', () => { +describe.skip('tests', () => { let orchestrator: Orchestrator; beforeEach(async () => { From 66753411c00945d286941bf8d634c2be216f59fa Mon Sep 17 00:00:00 2001 From: Guy Segal Date: Thu, 23 Nov 2023 09:55:14 +0200 Subject: [PATCH 08/15] . --- .bazeliskrc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.bazeliskrc b/.bazeliskrc index 2d36d77..3717fd1 100644 --- a/.bazeliskrc +++ b/.bazeliskrc @@ -1 +1 @@ -USE_BAZEL_VERSION=6.1.2 +USE_BAZEL_VERSION=6.4.0 From b781c87fb67d2a0828d369f2f3367553182ecdb2 Mon Sep 17 00:00:00 2001 From: Guy Segal Date: Thu, 23 Nov 2023 10:09:24 +0200 Subject: [PATCH 09/15] a --- tests/specs/produceAndConsumeWithRegexPatterns.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/specs/produceAndConsumeWithRegexPatterns.spec.ts b/tests/specs/produceAndConsumeWithRegexPatterns.spec.ts index b24abe5..d4c28ff 100644 --- a/tests/specs/produceAndConsumeWithRegexPatterns.spec.ts +++ b/tests/specs/produceAndConsumeWithRegexPatterns.spec.ts @@ -5,7 +5,7 @@ import {getOffset} from '../services/getOffset.js'; import {produce} from '../services/produce.js'; import {topicRoutes} from '../services/topicRoutes.js'; -describe.skip('tests', () => { +describe('tests', () => { let orchestrator: Orchestrator; beforeEach(async () => { From 2208aa4abc20e423b149b72837cb231cbf3be599 Mon Sep 17 00:00:00 2001 From: Guy Segal Date: Thu, 23 Nov 2023 10:13:19 +0200 Subject: [PATCH 10/15] . --- src/main/java/Main.java | 3 ++- src/main/java/configuration/TopicsRoutes.java | 8 ++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/java/Main.java b/src/main/java/Main.java index f60ea1b..fdd33ce 100644 --- a/src/main/java/Main.java +++ b/src/main/java/Main.java @@ -1,6 +1,7 @@ import configuration.Config; import configuration.TopicsRoutes; import java.util.concurrent.CountDownLatch; +import java.util.regex.Pattern; import kafka.Consumer; import kafka.KafkaClientFactory; import kafka.Producer; @@ -48,7 +49,7 @@ private static void waitForTargetHealthCheck() throws InterruptedException { private static Disposable createConsumer(MonitoringServer monitoringServer) { var receiverOptions = KafkaClientFactory .createReceiverOptions() - .subscription(topicsRoutes.getTopics()) + .subscription(Pattern.compile(topicsRoutes.getTopicsPattern())) .addAssignListener(partitions -> { if (partitions.isEmpty()) { return; diff --git a/src/main/java/configuration/TopicsRoutes.java b/src/main/java/configuration/TopicsRoutes.java index d60f445..5a15a5d 100644 --- a/src/main/java/configuration/TopicsRoutes.java +++ b/src/main/java/configuration/TopicsRoutes.java @@ -3,6 +3,7 @@ import java.util.Map; import java.util.Set; import java.util.regex.Pattern; +import java.util.stream.Collectors; public class TopicsRoutes { @@ -30,4 +31,11 @@ public String getRoute(String topic) { public Set getTopics() { return this.topicsRoutes.keySet(); } + + public String getTopicsPattern() { + return this.topicsRoutes.keySet() + .stream() + .map(topic -> String.format("^%s$", topic)) + .collect(Collectors.joining("|")); + } } From 6d72331a4df00d4c40cfe1b5eca5391fb9530cde Mon Sep 17 00:00:00 2001 From: Guy Segal Date: Thu, 23 Nov 2023 11:06:31 +0200 Subject: [PATCH 11/15] . --- src/main/java/kafka/Consumer.java | 9 ++++----- tests/setupFilesAfterEnv.ts | 2 +- ...oduceToDeadLetterTopicWhenValueIsNotValidJson.spec.ts | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/main/java/kafka/Consumer.java b/src/main/java/kafka/Consumer.java index 4196f30..be4b762 100644 --- a/src/main/java/kafka/Consumer.java +++ b/src/main/java/kafka/Consumer.java @@ -30,12 +30,11 @@ public Flux stream() { .groupBy(x -> x.key() == null ? x.partition() : x.key()) .delayElements(Duration.ofMillis(Config.PROCESSING_DELAY)) .publishOn(Schedulers.parallel()) - .flatMap(partition -> - partition.concatMap(record -> Mono.fromFuture(target.call(record)).map(__ -> record)) - ) + .flatMap(partition -> partition.concatMap(record -> Mono.fromFuture(target.call(record)))) .collectList() - .doOnNext(__ -> Monitor.batchProcessCompleted(batchStartTimestamp)); + .doOnNext(__ -> Monitor.batchProcessCompleted(batchStartTimestamp)) + .map(__ -> records); }) - .flatMap(records -> records.get(records.size() - 1).receiverOffset().commit()); + .flatMap(records -> records.last().flatMap(l -> l.receiverOffset().commit())); } } diff --git a/tests/setupFilesAfterEnv.ts b/tests/setupFilesAfterEnv.ts index b0e7ac6..2ef0dcf 100644 --- a/tests/setupFilesAfterEnv.ts +++ b/tests/setupFilesAfterEnv.ts @@ -1,3 +1,3 @@ import {jest} from '@jest/globals'; -jest.retryTimes(3, {logErrorsBeforeRetry: true}); +jest.retryTimes(1, {logErrorsBeforeRetry: true}); diff --git a/tests/specs/produceToDeadLetterTopicWhenValueIsNotValidJson.spec.ts b/tests/specs/produceToDeadLetterTopicWhenValueIsNotValidJson.spec.ts index c1d17f9..b857157 100644 --- a/tests/specs/produceToDeadLetterTopicWhenValueIsNotValidJson.spec.ts +++ b/tests/specs/produceToDeadLetterTopicWhenValueIsNotValidJson.spec.ts @@ -7,7 +7,7 @@ import {getOffset} from '../services/getOffset.js'; import {topicRoutes} from '../services/topicRoutes.js'; import {consume} from '../services/consume.js'; -describe.skip('tests', () => { +describe('tests', () => { let orchestrator: Orchestrator; beforeEach(async () => { From b75da684c556bd0feae4dfabb906b2fc8e2eac67 Mon Sep 17 00:00:00 2001 From: Guy Segal Date: Thu, 23 Nov 2023 11:06:43 +0200 Subject: [PATCH 12/15] , --- src/main/java/kafka/Consumer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/kafka/Consumer.java b/src/main/java/kafka/Consumer.java index be4b762..e18e77e 100644 --- a/src/main/java/kafka/Consumer.java +++ b/src/main/java/kafka/Consumer.java @@ -35,6 +35,6 @@ public Flux stream() { .doOnNext(__ -> Monitor.batchProcessCompleted(batchStartTimestamp)) .map(__ -> records); }) - .flatMap(records -> records.last().flatMap(l -> l.receiverOffset().commit())); + .flatMap(records -> records.last().flatMap(record -> record.receiverOffset().commit())); } } From 217572dbfc2b0593acd1289790736b2397239524 Mon Sep 17 00:00:00 2001 From: Guy Segal Date: Thu, 23 Nov 2023 11:09:47 +0200 Subject: [PATCH 13/15] . --- .../__snapshots__/produceAndConsume.spec.ts.snap | 12 +++++++++++- tests/specs/produceAndConsume.spec.ts | 8 ++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/tests/specs/__snapshots__/produceAndConsume.spec.ts.snap b/tests/specs/__snapshots__/produceAndConsume.spec.ts.snap index c19dde7..3b71513 100644 --- a/tests/specs/__snapshots__/produceAndConsume.spec.ts.snap +++ b/tests/specs/__snapshots__/produceAndConsume.spec.ts.snap @@ -4,7 +4,17 @@ exports[`tests should produce and consume 1`] = ` [ { "body": { - "data": "foo", + "data": "foo1", + }, + }, + { + "body": { + "data": "foo2", + }, + }, + { + "body": { + "data": "foo3", }, }, ] diff --git a/tests/specs/produceAndConsume.spec.ts b/tests/specs/produceAndConsume.spec.ts index c6f92e5..5e6c1d0 100644 --- a/tests/specs/produceAndConsume.spec.ts +++ b/tests/specs/produceAndConsume.spec.ts @@ -33,10 +33,14 @@ describe('tests', () => { await produce(orchestrator, { topic: 'foo', - messages: [{value: JSON.stringify({data: 'foo'})}], + messages: [ + {value: JSON.stringify({data: 'foo1'})}, + {value: JSON.stringify({data: 'foo2'})}, + {value: JSON.stringify({data: 'foo3'})}, + ], }); await expect(getCalls(orchestrator.wiremockClient, target)).resolves.toMatchSnapshot(); - await expect(getOffset(orchestrator.kafkaClient, 'foo')).resolves.toBe(1); + await expect(getOffset(orchestrator.kafkaClient, 'foo')).resolves.toBe(3); }); }); From 487bd9bd9384204ec22c469d6b3d8a387694e129 Mon Sep 17 00:00:00 2001 From: Guy Segal Date: Thu, 23 Nov 2023 11:46:58 +0200 Subject: [PATCH 14/15] . --- src/main/java/kafka/Consumer.java | 2 +- src/main/java/monitoring/Monitor.java | 2 +- tests/setupFilesAfterEnv.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/kafka/Consumer.java b/src/main/java/kafka/Consumer.java index e18e77e..a615aa6 100644 --- a/src/main/java/kafka/Consumer.java +++ b/src/main/java/kafka/Consumer.java @@ -23,7 +23,7 @@ public Consumer(KafkaReceiver kafkaReceiver, ITarget target) { public Flux stream() { return kafkaReceiver .receiveBatch() - .doOnNext(records -> records.count().doOnNext(Monitor::batchProcessStarted)) + .flatMap(records -> records.count().doOnNext(Monitor::batchProcessStarted).thenReturn(records)) .concatMap(records -> { var batchStartTimestamp = new Date().getTime(); return records diff --git a/src/main/java/monitoring/Monitor.java b/src/main/java/monitoring/Monitor.java index d04f3ff..2f911c0 100644 --- a/src/main/java/monitoring/Monitor.java +++ b/src/main/java/monitoring/Monitor.java @@ -126,7 +126,7 @@ public static void targetHealthcheckPassedSuccessfully() { write(log); } - public static void batchProcessStarted(long count) { + public static void batchProcessStarted(Long count) { JSONObject log = new JSONObject() .put("level", "info") .put("message", "batch process started") diff --git a/tests/setupFilesAfterEnv.ts b/tests/setupFilesAfterEnv.ts index 2ef0dcf..b0e7ac6 100644 --- a/tests/setupFilesAfterEnv.ts +++ b/tests/setupFilesAfterEnv.ts @@ -1,3 +1,3 @@ import {jest} from '@jest/globals'; -jest.retryTimes(1, {logErrorsBeforeRetry: true}); +jest.retryTimes(3, {logErrorsBeforeRetry: true}); From d998f389d4e3755c83d5cf765677d1b9a96df2d2 Mon Sep 17 00:00:00 2001 From: Guy Segal Date: Sat, 25 Nov 2023 14:15:01 +0200 Subject: [PATCH 15/15] . --- src/main/java/Main.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/Main.java b/src/main/java/Main.java index fdd33ce..6bc92c4 100644 --- a/src/main/java/Main.java +++ b/src/main/java/Main.java @@ -51,10 +51,10 @@ private static Disposable createConsumer(MonitoringServer monitoringServer) { .createReceiverOptions() .subscription(Pattern.compile(topicsRoutes.getTopicsPattern())) .addAssignListener(partitions -> { + Monitor.assignedToPartition(partitions); if (partitions.isEmpty()) { return; } - Monitor.assignedToPartition(partitions); monitoringServer.consumerAssigned(); }) .addRevokeListener(Monitor::revokedFromPartition);