From 82f5f8e10d4359f730ed1b90f5c2e6361fad4933 Mon Sep 17 00:00:00 2001 From: "markus.eisl" Date: Mon, 15 Apr 2024 10:31:02 +0200 Subject: [PATCH 1/4] Add kafkaQueueProblems featureflag Overloads Kafka queue while simultaneously introducing a consumer side delay leading to a lag spike The result of that featureflag can be observed with numerous metrics in grafana (e.g. kafka_consumer_lag_avg) --- CHANGELOG.md | 2 ++ docker-compose.yml | 2 ++ src/checkoutservice/main.go | 35 ++++++++++++++----- src/flagd/demo.flagd.json | 9 +++++ src/frauddetectionservice/build.gradle.kts | 8 +++++ .../main/kotlin/frauddetectionservice/main.kt | 31 ++++++++++++++++ 6 files changed, 79 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0aa24b8468..f261059b69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ the release. ([#1519](https://github.com/open-telemetry/opentelemetry-demo/pull/1519)) * [flagd] export flagd traces to otel collector ([#1522](https://github.com/open-telemetry/opentelemetry-demo/pull/1522)) +* [kafka] add kafkaQueueProblems feature flag + ([#1528](https://github.com/open-telemetry/opentelemetry-demo/pull/1528)) ## 1.9.0 diff --git a/docker-compose.yml b/docker-compose.yml index 220ce155d4..3fe9f9e043 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -250,6 +250,8 @@ services: memory: 200M restart: unless-stopped environment: + - FLAGD_HOST + - FLAGD_PORT - KAFKA_SERVICE_ADDR - OTEL_EXPORTER_OTLP_ENDPOINT=http://${OTEL_COLLECTOR_HOST}:${OTEL_COLLECTOR_PORT_HTTP} - OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE diff --git a/src/checkoutservice/main.go b/src/checkoutservice/main.go index e67d3d8496..7be3789bc7 100644 --- a/src/checkoutservice/main.go +++ b/src/checkoutservice/main.go @@ -316,6 +316,7 @@ func (cs *checkoutService) PlaceOrder(ctx context.Context, req *pb.PlaceOrderReq // send to kafka only if kafka broker address is set if cs.kafkaBrokerSvcAddr != "" { + log.Infof("sending to postProcessor") cs.sendToPostProcessor(ctx, orderResult) } @@ -439,7 +440,7 @@ func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money, func (cs *checkoutService) chargeCard(ctx context.Context, amount *pb.Money, paymentInfo *pb.CreditCardInfo) (string, error) { paymentService := cs.paymentSvcClient - if cs.checkPaymentFailure(ctx) { + if cs.isFeatureFlagEnabled(ctx, "paymentServiceUnreachable") { badAddress := "badAddress:50051" c := mustCreateClient(context.Background(), badAddress) paymentService = pb.NewPaymentServiceClient(c) @@ -505,6 +506,18 @@ func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.O cs.KafkaProducerClient.Input() <- &msg successMsg := <-cs.KafkaProducerClient.Successes() log.Infof("Successful to write message. offset: %v", successMsg.Offset) + + if cs.isFeatureFlagEnabled(ctx, "kafakQueueProblems") { + log.Infof("Warning: FeatureFlag 'kafakQueueProblems' is activated, overloading queue now.") + messageCount := 100 + for i := 0; i < messageCount; i++ { + go func(i int) { + cs.KafkaProducerClient.Input() <- &msg + _ = <-cs.KafkaProducerClient.Successes() + }(i) + } + log.Infof("Done with #%d messages for overload simulation.", messageCount) + } } func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.Span { @@ -533,11 +546,17 @@ func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace. return span } -func (cs *checkoutService) checkPaymentFailure(ctx context.Context) bool { - openfeature.AddHooks(otelhooks.NewTracesHook()) - client := openfeature.NewClient("checkout") - failureEnabled, _ := client.BooleanValue( - ctx, "paymentServiceUnreachable", false, openfeature.EvaluationContext{}, - ) - return failureEnabled +func (cs *checkoutService) isFeatureFlagEnabled(ctx context.Context, featureFlagName string) bool { + openfeature.AddHooks(otelhooks.NewTracesHook()) + client := openfeature.NewClient("checkout") + + // Default value is set to false, but you could also make this a parameter. + featureEnabled, _ := client.BooleanValue( + ctx, + featureFlagName, + false, + openfeature.EvaluationContext{}, + ) + + return featureEnabled } diff --git a/src/flagd/demo.flagd.json b/src/flagd/demo.flagd.json index d4c38a390c..1b3c9cbe23 100644 --- a/src/flagd/demo.flagd.json +++ b/src/flagd/demo.flagd.json @@ -61,6 +61,15 @@ ] } }, + "kafakQueueProblems": { + "description": "Overloads Kafka queue while simultaneously introducing a consumer side delay leading to a lag spike", + "state": "ENABLED", + "variants": { + "on": true, + "off": false + }, + "defaultVariant": "off" + }, "cartServiceFailure": { "description": "Fail cart service", "state": "ENABLED", diff --git a/src/frauddetectionservice/build.gradle.kts b/src/frauddetectionservice/build.gradle.kts index 10cde2b7fb..1819c7b934 100644 --- a/src/frauddetectionservice/build.gradle.kts +++ b/src/frauddetectionservice/build.gradle.kts @@ -42,6 +42,8 @@ dependencies { implementation("org.apache.logging.log4j:log4j-core:2.21.1") implementation("org.slf4j:slf4j-api:2.0.9") implementation("com.google.protobuf:protobuf-kotlin:${protobufVersion}") + implementation("dev.openfeature:sdk:1.7.4") + implementation("dev.openfeature.contrib.providers:flagd:0.7.0") if (JavaVersion.current().isJava9Compatible) { // Workaround for @javax.annotation.Generated @@ -50,6 +52,12 @@ dependencies { } } +tasks { + shadowJar { + mergeServiceFiles() + } +} + tasks.test { useJUnitPlatform() } diff --git a/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt b/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt index 009a849764..bb7061f7bb 100644 --- a/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt +++ b/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt @@ -15,6 +15,12 @@ import oteldemo.Demo.* import java.time.Duration.ofMillis import java.util.* import kotlin.system.exitProcess +import dev.openfeature.contrib.providers.flagd.FlagdOptions +import dev.openfeature.contrib.providers.flagd.FlagdProvider +import dev.openfeature.sdk.Client +import dev.openfeature.sdk.EvaluationContext +import dev.openfeature.sdk.MutableContext +import dev.openfeature.sdk.OpenFeatureAPI const val topic = "orders" const val groupID = "frauddetectionservice" @@ -22,6 +28,12 @@ const val groupID = "frauddetectionservice" private val logger: Logger = LogManager.getLogger(groupID) fun main() { + val options = FlagdOptions.builder() + .withGlobalTelemetry(true) + .build() + val flagdProvider = FlagdProvider(options) + OpenFeatureAPI.getInstance().setProvider(flagdProvider) + val props = Properties() props[KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name props[VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java.name @@ -46,8 +58,27 @@ fun main() { val newCount = accumulator + 1 val orders = OrderResult.parseFrom(record.value()) logger.info("Consumed record with orderId: ${orders.orderId}, and updated total count to: $newCount") + if (getFeatureFlagEnabled("kafakQueueProblems")) { + logger.info("FeatureFlag 'kafakQueueProblems' is enabled, sleeping 1 second") + Thread.sleep(1000) + } newCount } } } } + +/** +* Retrieves the status of a feature flag from the Feature Flag service. +* +* @param ff The name of the feature flag to retrieve. +* @return `true` if the feature flag is enabled, `false` otherwise or in case of errors. +*/ +fun getFeatureFlagEnabled(ff: String): Boolean { + val client = OpenFeatureAPI.getInstance().client + // TODO: Plumb the actual session ID from the frontend via baggage? + val uuid = UUID.randomUUID() + client.evaluationContext = MutableContext().add("session", uuid.toString()) + val booleanValue = client.getBooleanValue(ff, false) + return booleanValue +} From cf5c1dc8ced305b2a9215ace71cc113a9b79bf44 Mon Sep 17 00:00:00 2001 From: "markus.eisl" Date: Mon, 15 Apr 2024 15:12:17 +0200 Subject: [PATCH 2/4] changed feature flag to int value for more configurability also adjusted the resource limit for the frauddetection service since it kept dying --- docker-compose.yml | 2 +- src/checkoutservice/main.go | 25 +++++++++++++++---- src/flagd/demo.flagd.json | 6 ++--- .../main/kotlin/frauddetectionservice/main.kt | 10 ++++---- 4 files changed, 29 insertions(+), 14 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 3fe9f9e043..30309794bf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -247,7 +247,7 @@ services: deploy: resources: limits: - memory: 200M + memory: 300M restart: unless-stopped environment: - FLAGD_HOST diff --git a/src/checkoutservice/main.go b/src/checkoutservice/main.go index 7be3789bc7..c6633e1395 100644 --- a/src/checkoutservice/main.go +++ b/src/checkoutservice/main.go @@ -507,16 +507,16 @@ func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.O successMsg := <-cs.KafkaProducerClient.Successes() log.Infof("Successful to write message. offset: %v", successMsg.Offset) - if cs.isFeatureFlagEnabled(ctx, "kafakQueueProblems") { - log.Infof("Warning: FeatureFlag 'kafakQueueProblems' is activated, overloading queue now.") - messageCount := 100 - for i := 0; i < messageCount; i++ { + ffValue := cs.getIntFeatureFlag(ctx, "kafkaQueueProblems") + if ffValue > 0 { + log.Infof("Warning: FeatureFlag 'kafkaQueueProblems' is activated, overloading queue now.") + for i := 0; i < ffValue; i++ { go func(i int) { cs.KafkaProducerClient.Input() <- &msg _ = <-cs.KafkaProducerClient.Successes() }(i) } - log.Infof("Done with #%d messages for overload simulation.", messageCount) + log.Infof("Done with #%d messages for overload simulation.", ffValue) } } @@ -560,3 +560,18 @@ func (cs *checkoutService) isFeatureFlagEnabled(ctx context.Context, featureFlag return featureEnabled } + +func (cs *checkoutService) getIntFeatureFlag(ctx context.Context, featureFlagName string) int { + openfeature.AddHooks(otelhooks.NewTracesHook()) + client := openfeature.NewClient("checkout") + + // Default value is set to 0, but you could also make this a parameter. + featureFlagValue, _ := client.IntValue( + ctx, + featureFlagName, + 0, + openfeature.EvaluationContext{}, + ) + + return int(featureFlagValue) +} diff --git a/src/flagd/demo.flagd.json b/src/flagd/demo.flagd.json index 1b3c9cbe23..ed68712fff 100644 --- a/src/flagd/demo.flagd.json +++ b/src/flagd/demo.flagd.json @@ -61,12 +61,12 @@ ] } }, - "kafakQueueProblems": { + "kafkaQueueProblems": { "description": "Overloads Kafka queue while simultaneously introducing a consumer side delay leading to a lag spike", "state": "ENABLED", "variants": { - "on": true, - "off": false + "on": 100, + "off": 0 }, "defaultVariant": "off" }, diff --git a/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt b/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt index bb7061f7bb..7af0da043b 100644 --- a/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt +++ b/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt @@ -58,8 +58,8 @@ fun main() { val newCount = accumulator + 1 val orders = OrderResult.parseFrom(record.value()) logger.info("Consumed record with orderId: ${orders.orderId}, and updated total count to: $newCount") - if (getFeatureFlagEnabled("kafakQueueProblems")) { - logger.info("FeatureFlag 'kafakQueueProblems' is enabled, sleeping 1 second") + if (getFeatureFlagValue("kafkaQueueProblems") > 0) { + logger.info("FeatureFlag 'kafkaQueueProblems' is enabled, sleeping 1 second") Thread.sleep(1000) } newCount @@ -74,11 +74,11 @@ fun main() { * @param ff The name of the feature flag to retrieve. * @return `true` if the feature flag is enabled, `false` otherwise or in case of errors. */ -fun getFeatureFlagEnabled(ff: String): Boolean { +fun getFeatureFlagValue(ff: String): Int { val client = OpenFeatureAPI.getInstance().client // TODO: Plumb the actual session ID from the frontend via baggage? val uuid = UUID.randomUUID() client.evaluationContext = MutableContext().add("session", uuid.toString()) - val booleanValue = client.getBooleanValue(ff, false) - return booleanValue + val intValue = client.getIntegerValue(ff, 0) + return intValue } From db7bffba3f5afacdf3a11ba659d2c581ebe49a74 Mon Sep 17 00:00:00 2001 From: "markus.eisl" Date: Wed, 17 Apr 2024 11:19:34 +0200 Subject: [PATCH 3/4] addressed PR comments --- src/checkoutservice/main.go | 5 ++--- .../src/main/kotlin/frauddetectionservice/main.kt | 8 ++++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/checkoutservice/main.go b/src/checkoutservice/main.go index c6633e1395..b2acb998c6 100644 --- a/src/checkoutservice/main.go +++ b/src/checkoutservice/main.go @@ -160,6 +160,7 @@ func main() { } openfeature.SetProvider(flagd.NewProvider()) + openfeature.AddHooks(otelhooks.NewTracesHook()) tracer = tp.Tracer("checkoutservice") @@ -547,9 +548,8 @@ func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace. } func (cs *checkoutService) isFeatureFlagEnabled(ctx context.Context, featureFlagName string) bool { - openfeature.AddHooks(otelhooks.NewTracesHook()) client := openfeature.NewClient("checkout") - + // Default value is set to false, but you could also make this a parameter. featureEnabled, _ := client.BooleanValue( ctx, @@ -562,7 +562,6 @@ func (cs *checkoutService) isFeatureFlagEnabled(ctx context.Context, featureFlag } func (cs *checkoutService) getIntFeatureFlag(ctx context.Context, featureFlagName string) int { - openfeature.AddHooks(otelhooks.NewTracesHook()) client := openfeature.NewClient("checkout") // Default value is set to 0, but you could also make this a parameter. diff --git a/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt b/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt index 7af0da043b..d770218962 100644 --- a/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt +++ b/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt @@ -19,7 +19,8 @@ import dev.openfeature.contrib.providers.flagd.FlagdOptions import dev.openfeature.contrib.providers.flagd.FlagdProvider import dev.openfeature.sdk.Client import dev.openfeature.sdk.EvaluationContext -import dev.openfeature.sdk.MutableContext +import dev.openfeature.sdk.ImmutableContext +import dev.openfeature.sdk.Value import dev.openfeature.sdk.OpenFeatureAPI const val topic = "orders" @@ -78,7 +79,10 @@ fun getFeatureFlagValue(ff: String): Int { val client = OpenFeatureAPI.getInstance().client // TODO: Plumb the actual session ID from the frontend via baggage? val uuid = UUID.randomUUID() - client.evaluationContext = MutableContext().add("session", uuid.toString()) + + val clientAttrs = mutableMapOf() + clientAttrs["session"] = Value(uuid.toString()) + client.evaluationContext = ImmutableContext(clientAttrs) val intValue = client.getIntegerValue(ff, 0) return intValue } From 960ed2683db8af3ac30fb6b1ea70f120bc969e63 Mon Sep 17 00:00:00 2001 From: "markus.eisl" Date: Mon, 22 Apr 2024 15:55:13 +0200 Subject: [PATCH 4/4] addressed PR comment --- .../src/main/kotlin/frauddetectionservice/main.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt b/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt index d770218962..8f8223f518 100644 --- a/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt +++ b/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt @@ -57,12 +57,12 @@ fun main() { .poll(ofMillis(100)) .fold(totalCount) { accumulator, record -> val newCount = accumulator + 1 - val orders = OrderResult.parseFrom(record.value()) - logger.info("Consumed record with orderId: ${orders.orderId}, and updated total count to: $newCount") if (getFeatureFlagValue("kafkaQueueProblems") > 0) { logger.info("FeatureFlag 'kafkaQueueProblems' is enabled, sleeping 1 second") Thread.sleep(1000) } + val orders = OrderResult.parseFrom(record.value()) + logger.info("Consumed record with orderId: ${orders.orderId}, and updated total count to: $newCount") newCount } }