Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kafka][checkoutservice][frauddetectionservice] add kafkaQueueProblems featureflag #1528

Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ the release.
([#1522](https://github.com/open-telemetry/opentelemetry-demo/pull/1522))
* [frontend] Pass down image optimization requests to imageprovider
([#1522](https://github.com/open-telemetry/opentelemetry-demo/pull/1522))
* [kafka] add kafkaQueueProblems feature flag
([#1528](https://github.com/open-telemetry/opentelemetry-demo/pull/1528))
* [otelcollector] Add `redisreceiver`
([#1537](https://github.com/open-telemetry/opentelemetry-demo/pull/1537))

Expand Down
4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,11 @@ services:
deploy:
resources:
limits:
memory: 200M
memory: 300M
restart: unless-stopped
environment:
- FLAGD_HOST
- FLAGD_PORT
- KAFKA_SERVICE_ADDR
- OTEL_EXPORTER_OTLP_ENDPOINT=http://${OTEL_COLLECTOR_HOST}:${OTEL_COLLECTOR_PORT_HTTP}
- OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE
Expand Down
49 changes: 41 additions & 8 deletions src/checkoutservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func main() {
}

openfeature.SetProvider(flagd.NewProvider())
openfeature.AddHooks(otelhooks.NewTracesHook())

tracer = tp.Tracer("checkoutservice")

Expand Down Expand Up @@ -316,6 +317,7 @@ func (cs *checkoutService) PlaceOrder(ctx context.Context, req *pb.PlaceOrderReq

// send to kafka only if kafka broker address is set
if cs.kafkaBrokerSvcAddr != "" {
log.Infof("sending to postProcessor")
cs.sendToPostProcessor(ctx, orderResult)
}

Expand Down Expand Up @@ -439,7 +441,7 @@ func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money,

func (cs *checkoutService) chargeCard(ctx context.Context, amount *pb.Money, paymentInfo *pb.CreditCardInfo) (string, error) {
paymentService := cs.paymentSvcClient
if cs.checkPaymentFailure(ctx) {
if cs.isFeatureFlagEnabled(ctx, "paymentServiceUnreachable") {
badAddress := "badAddress:50051"
c := mustCreateClient(context.Background(), badAddress)
paymentService = pb.NewPaymentServiceClient(c)
Expand Down Expand Up @@ -505,6 +507,18 @@ func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.O
cs.KafkaProducerClient.Input() <- &msg
successMsg := <-cs.KafkaProducerClient.Successes()
log.Infof("Successful to write message. offset: %v", successMsg.Offset)

ffValue := cs.getIntFeatureFlag(ctx, "kafkaQueueProblems")
if ffValue > 0 {
log.Infof("Warning: FeatureFlag 'kafkaQueueProblems' is activated, overloading queue now.")
for i := 0; i < ffValue; i++ {
go func(i int) {
cs.KafkaProducerClient.Input() <- &msg
_ = <-cs.KafkaProducerClient.Successes()
}(i)
}
log.Infof("Done with #%d messages for overload simulation.", ffValue)
}
}

func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.Span {
Expand Down Expand Up @@ -533,11 +547,30 @@ func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.
return span
}

func (cs *checkoutService) checkPaymentFailure(ctx context.Context) bool {
openfeature.AddHooks(otelhooks.NewTracesHook())
client := openfeature.NewClient("checkout")
failureEnabled, _ := client.BooleanValue(
ctx, "paymentServiceUnreachable", false, openfeature.EvaluationContext{},
)
return failureEnabled
func (cs *checkoutService) isFeatureFlagEnabled(ctx context.Context, featureFlagName string) bool {
client := openfeature.NewClient("checkout")

// Default value is set to false, but you could also make this a parameter.
featureEnabled, _ := client.BooleanValue(
ctx,
featureFlagName,
false,
openfeature.EvaluationContext{},
)

return featureEnabled
}

func (cs *checkoutService) getIntFeatureFlag(ctx context.Context, featureFlagName string) int {
client := openfeature.NewClient("checkout")

// Default value is set to 0, but you could also make this a parameter.
featureFlagValue, _ := client.IntValue(
ctx,
featureFlagName,
0,
openfeature.EvaluationContext{},
)

return int(featureFlagValue)
}
9 changes: 9 additions & 0 deletions src/flagd/demo.flagd.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@
]
}
},
"kafkaQueueProblems": {
"description": "Overloads Kafka queue while simultaneously introducing a consumer side delay leading to a lag spike",
"state": "ENABLED",
"variants": {
"on": 100,
"off": 0
},
"defaultVariant": "off"
},
"cartServiceFailure": {
"description": "Fail cart service",
"state": "ENABLED",
Expand Down
8 changes: 8 additions & 0 deletions src/frauddetectionservice/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ dependencies {
implementation("org.apache.logging.log4j:log4j-core:2.21.1")
implementation("org.slf4j:slf4j-api:2.0.9")
implementation("com.google.protobuf:protobuf-kotlin:${protobufVersion}")
implementation("dev.openfeature:sdk:1.7.4")
implementation("dev.openfeature.contrib.providers:flagd:0.7.0")

if (JavaVersion.current().isJava9Compatible) {
// Workaround for @javax.annotation.Generated
Expand All @@ -50,6 +52,12 @@ dependencies {
}
}

tasks {
shadowJar {
mergeServiceFiles()
}
}

tasks.test {
useJUnitPlatform()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,26 @@ import oteldemo.Demo.*
import java.time.Duration.ofMillis
import java.util.*
import kotlin.system.exitProcess
import dev.openfeature.contrib.providers.flagd.FlagdOptions
import dev.openfeature.contrib.providers.flagd.FlagdProvider
import dev.openfeature.sdk.Client
import dev.openfeature.sdk.EvaluationContext
import dev.openfeature.sdk.ImmutableContext
import dev.openfeature.sdk.Value
import dev.openfeature.sdk.OpenFeatureAPI

const val topic = "orders"
const val groupID = "frauddetectionservice"

private val logger: Logger = LogManager.getLogger(groupID)

fun main() {
val options = FlagdOptions.builder()
.withGlobalTelemetry(true)
.build()
val flagdProvider = FlagdProvider(options)
OpenFeatureAPI.getInstance().setProvider(flagdProvider)

val props = Properties()
props[KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
props[VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java.name
Expand All @@ -44,10 +57,32 @@ fun main() {
.poll(ofMillis(100))
.fold(totalCount) { accumulator, record ->
val newCount = accumulator + 1
if (getFeatureFlagValue("kafkaQueueProblems") > 0) {
EislM0203 marked this conversation as resolved.
Show resolved Hide resolved
logger.info("FeatureFlag 'kafkaQueueProblems' is enabled, sleeping 1 second")
Thread.sleep(1000)
}
val orders = OrderResult.parseFrom(record.value())
logger.info("Consumed record with orderId: ${orders.orderId}, and updated total count to: $newCount")
newCount
}
}
}
}

/**
* Retrieves the status of a feature flag from the Feature Flag service.
*
* @param ff The name of the feature flag to retrieve.
* @return `true` if the feature flag is enabled, `false` otherwise or in case of errors.
*/
fun getFeatureFlagValue(ff: String): Int {
val client = OpenFeatureAPI.getInstance().client
// TODO: Plumb the actual session ID from the frontend via baggage?
EislM0203 marked this conversation as resolved.
Show resolved Hide resolved
val uuid = UUID.randomUUID()

val clientAttrs = mutableMapOf<String, Value>()
clientAttrs["session"] = Value(uuid.toString())
client.evaluationContext = ImmutableContext(clientAttrs)
val intValue = client.getIntegerValue(ff, 0)
return intValue
}
Loading