From e0500b2578d2e503480055e7fd72ff7f313ad5c9 Mon Sep 17 00:00:00 2001 From: Markus Eisl <114515319+EislM0203@users.noreply.github.com> Date: Tue, 30 Apr 2024 02:03:45 +0200 Subject: [PATCH] [kafka][checkoutservice][frauddetectionservice] add kafkaQueueProblems featureflag (#1528) * Add kafkaQueueProblems featureflag Overloads Kafka queue while simultaneously introducing a consumer side delay leading to a lag spike The result of that featureflag can be observed with numerous metrics in grafana (e.g. kafka_consumer_lag_avg) * changed feature flag to int value for more configurability also adjusted the resource limit for the frauddetection service since it kept dying * addressed PR comments * addressed PR comment --------- Co-authored-by: Austin Parker --- CHANGELOG.md | 2 + docker-compose.yml | 4 +- src/checkoutservice/main.go | 49 ++++++++++++++++--- src/flagd/demo.flagd.json | 9 ++++ src/frauddetectionservice/build.gradle.kts | 8 +++ .../main/kotlin/frauddetectionservice/main.kt | 35 +++++++++++++ 6 files changed, 98 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e3fabbbe25..e14db39b4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ the release. 
([#1522](https://github.com/open-telemetry/opentelemetry-demo/pull/1522)) * [frontend] Pass down image optimization requests to imageprovider ([#1522](https://github.com/open-telemetry/opentelemetry-demo/pull/1522)) +* [kafka] add kafkaQueueProblems feature flag + ([#1528](https://github.com/open-telemetry/opentelemetry-demo/pull/1528)) * [otelcollector] Add `redisreceiver` ([#1537](https://github.com/open-telemetry/opentelemetry-demo/pull/1537)) diff --git a/docker-compose.yml b/docker-compose.yml index 220ce155d4..30309794bf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -247,9 +247,11 @@ services: deploy: resources: limits: - memory: 200M + memory: 300M restart: unless-stopped environment: + - FLAGD_HOST + - FLAGD_PORT - KAFKA_SERVICE_ADDR - OTEL_EXPORTER_OTLP_ENDPOINT=http://${OTEL_COLLECTOR_HOST}:${OTEL_COLLECTOR_PORT_HTTP} - OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE diff --git a/src/checkoutservice/main.go b/src/checkoutservice/main.go index e67d3d8496..b2acb998c6 100644 --- a/src/checkoutservice/main.go +++ b/src/checkoutservice/main.go @@ -160,6 +160,7 @@ func main() { } openfeature.SetProvider(flagd.NewProvider()) + openfeature.AddHooks(otelhooks.NewTracesHook()) tracer = tp.Tracer("checkoutservice") @@ -316,6 +317,7 @@ func (cs *checkoutService) PlaceOrder(ctx context.Context, req *pb.PlaceOrderReq // send to kafka only if kafka broker address is set if cs.kafkaBrokerSvcAddr != "" { + log.Infof("sending to postProcessor") cs.sendToPostProcessor(ctx, orderResult) } @@ -439,7 +441,7 @@ func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money, func (cs *checkoutService) chargeCard(ctx context.Context, amount *pb.Money, paymentInfo *pb.CreditCardInfo) (string, error) { paymentService := cs.paymentSvcClient - if cs.checkPaymentFailure(ctx) { + if cs.isFeatureFlagEnabled(ctx, "paymentServiceUnreachable") { badAddress := "badAddress:50051" c := mustCreateClient(context.Background(), badAddress) paymentService = 
pb.NewPaymentServiceClient(c) @@ -505,6 +507,18 @@ func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.O cs.KafkaProducerClient.Input() <- &msg successMsg := <-cs.KafkaProducerClient.Successes() log.Infof("Successful to write message. offset: %v", successMsg.Offset) + + ffValue := cs.getIntFeatureFlag(ctx, "kafkaQueueProblems") + if ffValue > 0 { + log.Infof("Warning: FeatureFlag 'kafkaQueueProblems' is activated, overloading queue now.") + for i := 0; i < ffValue; i++ { + go func(i int) { + cs.KafkaProducerClient.Input() <- &msg + _ = <-cs.KafkaProducerClient.Successes() + }(i) + } + log.Infof("Done with #%d messages for overload simulation.", ffValue) + } } func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.Span { @@ -533,11 +547,30 @@ func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace. return span } -func (cs *checkoutService) checkPaymentFailure(ctx context.Context) bool { - openfeature.AddHooks(otelhooks.NewTracesHook()) - client := openfeature.NewClient("checkout") - failureEnabled, _ := client.BooleanValue( - ctx, "paymentServiceUnreachable", false, openfeature.EvaluationContext{}, - ) - return failureEnabled +func (cs *checkoutService) isFeatureFlagEnabled(ctx context.Context, featureFlagName string) bool { + client := openfeature.NewClient("checkout") + + // Default value is set to false, but you could also make this a parameter. + featureEnabled, _ := client.BooleanValue( + ctx, + featureFlagName, + false, + openfeature.EvaluationContext{}, + ) + + return featureEnabled +} + +func (cs *checkoutService) getIntFeatureFlag(ctx context.Context, featureFlagName string) int { + client := openfeature.NewClient("checkout") + + // Default value is set to 0, but you could also make this a parameter. 
+ featureFlagValue, _ := client.IntValue( + ctx, + featureFlagName, + 0, + openfeature.EvaluationContext{}, + ) + + return int(featureFlagValue) } diff --git a/src/flagd/demo.flagd.json b/src/flagd/demo.flagd.json index d4c38a390c..ed68712fff 100644 --- a/src/flagd/demo.flagd.json +++ b/src/flagd/demo.flagd.json @@ -61,6 +61,15 @@ ] } }, + "kafkaQueueProblems": { + "description": "Overloads Kafka queue while simultaneously introducing a consumer side delay leading to a lag spike", + "state": "ENABLED", + "variants": { + "on": 100, + "off": 0 + }, + "defaultVariant": "off" + }, "cartServiceFailure": { "description": "Fail cart service", "state": "ENABLED", diff --git a/src/frauddetectionservice/build.gradle.kts b/src/frauddetectionservice/build.gradle.kts index 10cde2b7fb..1819c7b934 100644 --- a/src/frauddetectionservice/build.gradle.kts +++ b/src/frauddetectionservice/build.gradle.kts @@ -42,6 +42,8 @@ dependencies { implementation("org.apache.logging.log4j:log4j-core:2.21.1") implementation("org.slf4j:slf4j-api:2.0.9") implementation("com.google.protobuf:protobuf-kotlin:${protobufVersion}") + implementation("dev.openfeature:sdk:1.7.4") + implementation("dev.openfeature.contrib.providers:flagd:0.7.0") if (JavaVersion.current().isJava9Compatible) { // Workaround for @javax.annotation.Generated @@ -50,6 +52,12 @@ dependencies { } } +tasks { + shadowJar { + mergeServiceFiles() + } +} + tasks.test { useJUnitPlatform() } diff --git a/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt b/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt index 009a849764..8f8223f518 100644 --- a/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt +++ b/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt @@ -15,6 +15,13 @@ import oteldemo.Demo.* import java.time.Duration.ofMillis import java.util.* import kotlin.system.exitProcess +import dev.openfeature.contrib.providers.flagd.FlagdOptions +import 
dev.openfeature.contrib.providers.flagd.FlagdProvider +import dev.openfeature.sdk.Client +import dev.openfeature.sdk.EvaluationContext +import dev.openfeature.sdk.ImmutableContext +import dev.openfeature.sdk.Value +import dev.openfeature.sdk.OpenFeatureAPI const val topic = "orders" const val groupID = "frauddetectionservice" @@ -22,6 +29,12 @@ const val groupID = "frauddetectionservice" private val logger: Logger = LogManager.getLogger(groupID) fun main() { + val options = FlagdOptions.builder() + .withGlobalTelemetry(true) + .build() + val flagdProvider = FlagdProvider(options) + OpenFeatureAPI.getInstance().setProvider(flagdProvider) + val props = Properties() props[KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name props[VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java.name @@ -44,6 +57,10 @@ fun main() { .poll(ofMillis(100)) .fold(totalCount) { accumulator, record -> val newCount = accumulator + 1 + if (getFeatureFlagValue("kafkaQueueProblems") > 0) { + logger.info("FeatureFlag 'kafkaQueueProblems' is enabled, sleeping 1 second") + Thread.sleep(1000) + } val orders = OrderResult.parseFrom(record.value()) logger.info("Consumed record with orderId: ${orders.orderId}, and updated total count to: $newCount") newCount @@ -51,3 +68,21 @@ fun main() { } } } + +/** +* Retrieves the integer value of a feature flag through the OpenFeature client. +* +* @param ff The name of the feature flag to retrieve. +* @return The integer value resolved for the flag; `0` is supplied to the client as the default value. +*/ +fun getFeatureFlagValue(ff: String): Int { + val client = OpenFeatureAPI.getInstance().client + // TODO: Plumb the actual session ID from the frontend via baggage? + val uuid = UUID.randomUUID() + + val clientAttrs = mutableMapOf<String, Value>() + clientAttrs["session"] = Value(uuid.toString()) + client.evaluationContext = ImmutableContext(clientAttrs) + val intValue = client.getIntegerValue(ff, 0) + return intValue +}