Skip to content

Commit

Permalink
[kafka][checkoutservice][frauddetectionservice] add kafkaQueueProblem…
Browse files Browse the repository at this point in the history
…s featureflag (#1528)

* Add kafkaQueueProblems featureflag

Overloads Kafka queue while simultaneously introducing a consumer side delay leading to a lag spike

The result of that featureflag can be observed with numerous metrics in grafana (e.g. kafka_consumer_lag_avg)

* changed feature flag to int value for more configurability

also adjusted the resource limit for the frauddetection service since it kept dying

* addressed PR comments

* addressed PR comment

---------

Co-authored-by: Austin Parker <austin@ap2.io>
  • Loading branch information
EislM0203 and austinlparker authored Apr 30, 2024
1 parent 05982b2 commit e0500b2
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 9 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ the release.
([#1522](https://github.com/open-telemetry/opentelemetry-demo/pull/1522))
* [frontend] Pass down image optimization requests to imageprovider
([#1522](https://github.com/open-telemetry/opentelemetry-demo/pull/1522))
* [kafka] add kafkaQueueProblems feature flag
([#1528](https://github.com/open-telemetry/opentelemetry-demo/pull/1528))
* [otelcollector] Add `redisreceiver`
([#1537](https://github.com/open-telemetry/opentelemetry-demo/pull/1537))

Expand Down
4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,11 @@ services:
deploy:
resources:
limits:
memory: 200M
memory: 300M
restart: unless-stopped
environment:
- FLAGD_HOST
- FLAGD_PORT
- KAFKA_SERVICE_ADDR
- OTEL_EXPORTER_OTLP_ENDPOINT=http://${OTEL_COLLECTOR_HOST}:${OTEL_COLLECTOR_PORT_HTTP}
- OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE
Expand Down
49 changes: 41 additions & 8 deletions src/checkoutservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func main() {
}

openfeature.SetProvider(flagd.NewProvider())
openfeature.AddHooks(otelhooks.NewTracesHook())

tracer = tp.Tracer("checkoutservice")

Expand Down Expand Up @@ -316,6 +317,7 @@ func (cs *checkoutService) PlaceOrder(ctx context.Context, req *pb.PlaceOrderReq

// send to kafka only if kafka broker address is set
if cs.kafkaBrokerSvcAddr != "" {
log.Infof("sending to postProcessor")
cs.sendToPostProcessor(ctx, orderResult)
}

Expand Down Expand Up @@ -439,7 +441,7 @@ func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money,

func (cs *checkoutService) chargeCard(ctx context.Context, amount *pb.Money, paymentInfo *pb.CreditCardInfo) (string, error) {
paymentService := cs.paymentSvcClient
if cs.checkPaymentFailure(ctx) {
if cs.isFeatureFlagEnabled(ctx, "paymentServiceUnreachable") {
badAddress := "badAddress:50051"
c := mustCreateClient(context.Background(), badAddress)
paymentService = pb.NewPaymentServiceClient(c)
Expand Down Expand Up @@ -505,6 +507,18 @@ func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.O
cs.KafkaProducerClient.Input() <- &msg
successMsg := <-cs.KafkaProducerClient.Successes()
log.Infof("Successful to write message. offset: %v", successMsg.Offset)

ffValue := cs.getIntFeatureFlag(ctx, "kafkaQueueProblems")
if ffValue > 0 {
log.Infof("Warning: FeatureFlag 'kafkaQueueProblems' is activated, overloading queue now.")
for i := 0; i < ffValue; i++ {
go func(i int) {
cs.KafkaProducerClient.Input() <- &msg
_ = <-cs.KafkaProducerClient.Successes()
}(i)
}
log.Infof("Done with #%d messages for overload simulation.", ffValue)
}
}

func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.Span {
Expand Down Expand Up @@ -533,11 +547,30 @@ func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.
return span
}

func (cs *checkoutService) checkPaymentFailure(ctx context.Context) bool {
openfeature.AddHooks(otelhooks.NewTracesHook())
client := openfeature.NewClient("checkout")
failureEnabled, _ := client.BooleanValue(
ctx, "paymentServiceUnreachable", false, openfeature.EvaluationContext{},
)
return failureEnabled
func (cs *checkoutService) isFeatureFlagEnabled(ctx context.Context, featureFlagName string) bool {
client := openfeature.NewClient("checkout")

// Default value is set to false, but you could also make this a parameter.
featureEnabled, _ := client.BooleanValue(
ctx,
featureFlagName,
false,
openfeature.EvaluationContext{},
)

return featureEnabled
}

func (cs *checkoutService) getIntFeatureFlag(ctx context.Context, featureFlagName string) int {
client := openfeature.NewClient("checkout")

// Default value is set to 0, but you could also make this a parameter.
featureFlagValue, _ := client.IntValue(
ctx,
featureFlagName,
0,
openfeature.EvaluationContext{},
)

return int(featureFlagValue)
}
9 changes: 9 additions & 0 deletions src/flagd/demo.flagd.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@
]
}
},
"kafkaQueueProblems": {
"description": "Overloads Kafka queue while simultaneously introducing a consumer side delay leading to a lag spike",
"state": "ENABLED",
"variants": {
"on": 100,
"off": 0
},
"defaultVariant": "off"
},
"cartServiceFailure": {
"description": "Fail cart service",
"state": "ENABLED",
Expand Down
8 changes: 8 additions & 0 deletions src/frauddetectionservice/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ dependencies {
implementation("org.apache.logging.log4j:log4j-core:2.21.1")
implementation("org.slf4j:slf4j-api:2.0.9")
implementation("com.google.protobuf:protobuf-kotlin:${protobufVersion}")
implementation("dev.openfeature:sdk:1.7.4")
implementation("dev.openfeature.contrib.providers:flagd:0.7.0")

if (JavaVersion.current().isJava9Compatible) {
// Workaround for @javax.annotation.Generated
Expand All @@ -50,6 +52,12 @@ dependencies {
}
}

tasks {
shadowJar {
mergeServiceFiles()
}
}

tasks.test {
useJUnitPlatform()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,26 @@ import oteldemo.Demo.*
import java.time.Duration.ofMillis
import java.util.*
import kotlin.system.exitProcess
import dev.openfeature.contrib.providers.flagd.FlagdOptions
import dev.openfeature.contrib.providers.flagd.FlagdProvider
import dev.openfeature.sdk.Client
import dev.openfeature.sdk.EvaluationContext
import dev.openfeature.sdk.ImmutableContext
import dev.openfeature.sdk.Value
import dev.openfeature.sdk.OpenFeatureAPI

const val topic = "orders"
const val groupID = "frauddetectionservice"

private val logger: Logger = LogManager.getLogger(groupID)

fun main() {
val options = FlagdOptions.builder()
.withGlobalTelemetry(true)
.build()
val flagdProvider = FlagdProvider(options)
OpenFeatureAPI.getInstance().setProvider(flagdProvider)

val props = Properties()
props[KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
props[VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java.name
Expand All @@ -44,10 +57,32 @@ fun main() {
.poll(ofMillis(100))
.fold(totalCount) { accumulator, record ->
val newCount = accumulator + 1
if (getFeatureFlagValue("kafkaQueueProblems") > 0) {
logger.info("FeatureFlag 'kafkaQueueProblems' is enabled, sleeping 1 second")
Thread.sleep(1000)
}
val orders = OrderResult.parseFrom(record.value())
logger.info("Consumed record with orderId: ${orders.orderId}, and updated total count to: $newCount")
newCount
}
}
}
}

/**
* Retrieves the status of a feature flag from the Feature Flag service.
*
* @param ff The name of the feature flag to retrieve.
* @return `true` if the feature flag is enabled, `false` otherwise or in case of errors.
*/
fun getFeatureFlagValue(ff: String): Int {
val client = OpenFeatureAPI.getInstance().client
// TODO: Plumb the actual session ID from the frontend via baggage?
val uuid = UUID.randomUUID()

val clientAttrs = mutableMapOf<String, Value>()
clientAttrs["session"] = Value(uuid.toString())
client.evaluationContext = ImmutableContext(clientAttrs)
val intValue = client.getIntegerValue(ff, 0)
return intValue
}

0 comments on commit e0500b2

Please sign in to comment.