Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: local evaluation cohorts support #28

Merged
merged 15 commits into from
Aug 8, 2024
2 changes: 2 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ java {
dependencies {
implementation(kotlin("stdlib"))
testImplementation(kotlin("test"))
testImplementation("org.mockito:mockito-core:${Versions.mockito}")
testImplementation("com.squareup.okhttp3:mockwebserver:${Versions.mockwebserver}")
testImplementation("io.mockk:mockk:${Versions.mockk}")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:${Versions.serializationRuntime}")
implementation("com.squareup.okhttp3:okhttp:${Versions.okhttp}")
Expand Down
2 changes: 2 additions & 0 deletions buildSrc/src/main/kotlin/Versions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ object Versions {
const val evaluationCore = "2.0.0-beta.2"
const val amplitudeAnalytics = "1.12.0"
const val mockk = "1.13.9"
const val mockito = "4.8.1"
const val mockwebserver = "4.12.0"
}
6 changes: 6 additions & 0 deletions src/main/kotlin/ExperimentException.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.amplitude.experiment

class ExperimentException(
override val message: String? = null,
override val cause: Throwable? = null
) : Exception()
23 changes: 23 additions & 0 deletions src/main/kotlin/ExperimentUser.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ data class ExperimentUser internal constructor(
@JvmField val cohortIds: Set<String>? = null,
@JvmField val groups: Map<String, Set<String>>? = null,
@JvmField val groupProperties: Map<String, Map<String, Map<String, Any?>>>? = null,
@JvmField val groupCohortIds: Map<String, Map<String, Set<String>>>? = null,
) {

/**
Expand All @@ -65,6 +66,9 @@ data class ExperimentUser internal constructor(
.library(this.library)
.userProperties(this.userProperties)
.cohortIds(this.cohortIds)
.groups(this.groups)
.groupProperties(this.groupProperties)
.groupCohortIds(this.groupCohortIds)
}

companion object {
Expand Down Expand Up @@ -94,6 +98,7 @@ data class ExperimentUser internal constructor(
private var cohortIds: Set<String>? = null
private var groups: MutableMap<String, Set<String>>? = null
private var groupProperties: MutableMap<String, MutableMap<String, MutableMap<String, Any?>>>? = null
private var groupCohortIds: MutableMap<String, MutableMap<String, Set<String>>>? = null

fun userId(userId: String?) = apply { this.userId = userId }
fun deviceId(deviceId: String?) = apply { this.deviceId = deviceId }
Expand Down Expand Up @@ -123,26 +128,43 @@ data class ExperimentUser internal constructor(
fun cohortIds(cohortIds: Set<String>?) = apply {
this.cohortIds = cohortIds
}

fun groups(groups: Map<String, Set<String>>?) = apply {
this.groups = groups?.toMutableMap()
}

fun group(groupType: String, groupName: String) = apply {
this.groups = (this.groups ?: mutableMapOf()).apply { put(groupType, setOf(groupName)) }
}

fun groupProperties(groupProperties: Map<String, Map<String, Map<String, Any?>>>?) = apply {
this.groupProperties = groupProperties?.mapValues { groupTypes ->
groupTypes.value.toMutableMap().mapValues { groupNames ->
groupNames.value.toMutableMap()
}.toMutableMap()
}?.toMutableMap()
}

fun groupProperty(groupType: String, groupName: String, key: String, value: Any?) = apply {
this.groupProperties = (this.groupProperties ?: mutableMapOf()).apply {
getOrPut(groupType) { mutableMapOf(groupName to mutableMapOf()) }
.getOrPut(groupName) { mutableMapOf(key to value) }[key] = value
}
}

internal fun groupCohortIds(groupCohortIds: Map<String, Map<String, Set<String>>>?) = apply {
this.groupCohortIds = groupCohortIds?.mapValues { groupTypes ->
groupTypes.value.toMutableMap()
}?.toMutableMap()
}

fun groupCohortIds(groupType: String, groupName: String, cohortIds: Set<String>) = apply {
this.groupCohortIds = (this.groupCohortIds ?: mutableMapOf()).apply {
val groupNames = getOrPut(groupType) { mutableMapOf() }
groupNames[groupName] = cohortIds
}
}

fun build(): ExperimentUser {
return ExperimentUser(
userId = userId,
Expand All @@ -164,6 +186,7 @@ data class ExperimentUser internal constructor(
cohortIds = cohortIds,
groups = groups,
groupProperties = groupProperties,
groupCohortIds = groupCohortIds,
)
}
}
Expand Down
18 changes: 18 additions & 0 deletions src/main/kotlin/ExperimentalApi.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.amplitude.experiment

@RequiresOptIn(level = RequiresOptIn.Level.WARNING)
@Retention(AnnotationRetention.BINARY)
@Target(
AnnotationTarget.CLASS,
AnnotationTarget.ANNOTATION_CLASS,
AnnotationTarget.PROPERTY,
AnnotationTarget.FIELD,
AnnotationTarget.LOCAL_VARIABLE,
AnnotationTarget.VALUE_PARAMETER,
AnnotationTarget.CONSTRUCTOR,
AnnotationTarget.FUNCTION,
AnnotationTarget.PROPERTY_GETTER,
AnnotationTarget.PROPERTY_SETTER,
AnnotationTarget.TYPEALIAS
)
annotation class ExperimentalApi
171 changes: 153 additions & 18 deletions src/main/kotlin/LocalEvaluationClient.kt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
@file:OptIn(ExperimentalApi::class)

package com.amplitude.experiment

import com.amplitude.Amplitude
Expand All @@ -6,39 +8,72 @@ import com.amplitude.experiment.assignment.AmplitudeAssignmentService
import com.amplitude.experiment.assignment.Assignment
import com.amplitude.experiment.assignment.AssignmentService
import com.amplitude.experiment.assignment.InMemoryAssignmentFilter
import com.amplitude.experiment.cohort.CohortApi
import com.amplitude.experiment.cohort.DynamicCohortApi
import com.amplitude.experiment.cohort.InMemoryCohortStorage
import com.amplitude.experiment.cohort.ProxyCohortMembershipApi
import com.amplitude.experiment.cohort.ProxyCohortStorage
import com.amplitude.experiment.deployment.DeploymentRunner
import com.amplitude.experiment.evaluation.EvaluationEngine
import com.amplitude.experiment.evaluation.EvaluationEngineImpl
import com.amplitude.experiment.evaluation.EvaluationFlag
import com.amplitude.experiment.evaluation.topologicalSort
import com.amplitude.experiment.flag.FlagConfigApiImpl
import com.amplitude.experiment.flag.FlagConfigService
import com.amplitude.experiment.flag.FlagConfigServiceConfig
import com.amplitude.experiment.flag.FlagConfigServiceImpl
import com.amplitude.experiment.flag.DynamicFlagConfigApi
import com.amplitude.experiment.flag.InMemoryFlagConfigStorage
import com.amplitude.experiment.util.LocalEvaluationMetricsWrapper
import com.amplitude.experiment.util.Logger
import com.amplitude.experiment.util.Once
import com.amplitude.experiment.util.USER_GROUP_TYPE
import com.amplitude.experiment.util.filterDefaultVariants
import com.amplitude.experiment.util.getAllCohortIds
import com.amplitude.experiment.util.getGroupedCohortIds
import com.amplitude.experiment.util.toEvaluationContext
import com.amplitude.experiment.util.toVariants
import com.amplitude.experiment.util.wrapMetrics
import okhttp3.HttpUrl
import okhttp3.HttpUrl.Companion.toHttpUrl
import okhttp3.OkHttpClient

class LocalEvaluationClient internal constructor(
private val apiKey: String,
apiKey: String,
private val config: LocalEvaluationConfig = LocalEvaluationConfig(),
private val httpClient: OkHttpClient = OkHttpClient(),
cohortApi: CohortApi? = getCohortDownloadApi(config, httpClient)
) {
private val startLock = Once()
private val httpClient = OkHttpClient()
private val assignmentService: AssignmentService? = createAssignmentService(apiKey)
private val serverUrl: HttpUrl = config.serverUrl.toHttpUrl()
private val serverUrl: HttpUrl = getServerUrl(config)
private val evaluation: EvaluationEngine = EvaluationEngineImpl()
private val flagConfigService: FlagConfigService = FlagConfigServiceImpl(
FlagConfigServiceConfig(config.flagConfigPollerIntervalMillis),
FlagConfigApiImpl(apiKey, serverUrl, httpClient),
private val metrics: LocalEvaluationMetrics = LocalEvaluationMetricsWrapper(config.metrics)
private val flagConfigApi = DynamicFlagConfigApi(apiKey, serverUrl, getProxyUrl(config), httpClient)
private val flagConfigStorage = InMemoryFlagConfigStorage()
private val cohortStorage = if (config.cohortSyncConfig == null) {
null
} else if (config.evaluationProxyConfig == null) {
InMemoryCohortStorage()
} else {
ProxyCohortStorage(
proxyConfig = config.evaluationProxyConfig,
membershipApi = ProxyCohortMembershipApi(apiKey, config.evaluationProxyConfig.proxyUrl.toHttpUrl(), httpClient),
metrics = metrics,
)
}

private val deploymentRunner = DeploymentRunner(
config = config,
flagConfigApi = flagConfigApi,
flagConfigStorage = flagConfigStorage,
cohortApi = cohortApi,
cohortStorage = cohortStorage,
metrics = metrics,
)

fun start() {
startLock.once {
flagConfigService.start()
try {
deploymentRunner.start()
} catch (t: Throwable) {
throw ExperimentException(
message = "Failed to start local evaluation client.",
cause = t
)
}
}

Expand All @@ -51,7 +86,7 @@ class LocalEvaluationClient internal constructor(
setEventUploadPeriodMillis(config.assignmentConfiguration.eventUploadPeriodMillis)
useBatchMode(config.assignmentConfiguration.useBatchMode)
setOptions(Options().setMinIdLength(1))
setServerUrl(config.assignmentConfiguration.serverUrl)
setServerUrl(getEventServerUrl(config, config.assignmentConfiguration))
},
InMemoryAssignmentFilter(config.assignmentConfiguration.cacheCapacity)
)
Expand All @@ -67,15 +102,115 @@ class LocalEvaluationClient internal constructor(

@JvmOverloads
fun evaluateV2(user: ExperimentUser, flagKeys: Set<String> = setOf()): Map<String, Variant> {
val flagConfigs = flagConfigService.getFlagConfigs().toMap()
val flagConfigs = flagConfigStorage.getFlagConfigs()
val sortedFlagConfigs = topologicalSort(flagConfigs, flagKeys)
if (sortedFlagConfigs.isEmpty()) {
return mapOf()
}
val evaluationResults = evaluation.evaluate(user.toEvaluationContext(), sortedFlagConfigs)
Logger.d("evaluate - user=$user, result=$evaluationResults")
val enrichedUser = enrichUser(user, sortedFlagConfigs)
val evaluationResults = wrapMetrics(
metric = metrics::onEvaluation,
failure = metrics::onEvaluationFailure,
) {
evaluation.evaluate(enrichedUser.toEvaluationContext(), sortedFlagConfigs)
}
val variants = evaluationResults.toVariants()
assignmentService?.track(Assignment(user, variants))
return variants
}

private fun enrichUser(user: ExperimentUser, flagConfigs: List<EvaluationFlag>): ExperimentUser {
val groupedCohortIds = flagConfigs.getGroupedCohortIds()
if (cohortStorage == null) {
if (groupedCohortIds.isNotEmpty()) {
val flagKeys = flagConfigs.mapNotNull { flag ->
val cohortIds = flag.getAllCohortIds()
if (cohortIds.isEmpty()) {
null
} else {
flag.key
}
}
Logger.e("Local evaluation flags $flagKeys target cohorts but cohort targeting is not configured.")
bgiori marked this conversation as resolved.
Show resolved Hide resolved
}
return user
}
return user.copyToBuilder().apply {
val userCohortsIds = groupedCohortIds[USER_GROUP_TYPE]
if (!userCohortsIds.isNullOrEmpty() && user.userId != null) {
cohortIds(cohortStorage.getCohortsForUser(user.userId, userCohortsIds))
}
if (user.groups != null) {
for (group in user.groups) {
val groupType = group.key
val groupName = group.value.firstOrNull() ?: continue
val cohortIds = groupedCohortIds[groupType]
if (cohortIds.isNullOrEmpty()) {
continue
}
groupCohortIds(
groupType,
groupName,
cohortStorage.getCohortsForGroup(groupType, groupName, cohortIds)
)
}
}
}.build()
}
}

private fun getCohortDownloadApi(config: LocalEvaluationConfig, httpClient: OkHttpClient): CohortApi? {
return if (config.cohortSyncConfig != null) {
DynamicCohortApi(
apiKey = config.cohortSyncConfig.apiKey,
secretKey = config.cohortSyncConfig.secretKey,
maxCohortSize = config.cohortSyncConfig.maxCohortSize,
serverUrl = getCohortServerUrl(config),
proxyUrl = getProxyUrl(config),
httpClient = httpClient,
)
} else {
null
}
}

private fun getServerUrl(config: LocalEvaluationConfig): HttpUrl {
return if (config.serverUrl == LocalEvaluationConfig.Defaults.SERVER_URL) {
when (config.serverZone) {
bgiori marked this conversation as resolved.
Show resolved Hide resolved
ServerZone.US -> US_SERVER_URL.toHttpUrl()
ServerZone.EU -> EU_SERVER_URL.toHttpUrl()
}
} else {
config.serverUrl.toHttpUrl()
}
}

private fun getProxyUrl(config: LocalEvaluationConfig): HttpUrl? {
return config.evaluationProxyConfig?.proxyUrl?.toHttpUrl()
}

private fun getCohortServerUrl(config: LocalEvaluationConfig): HttpUrl {
return if (config.cohortSyncConfig?.cohortServerUrl == LocalEvaluationConfig.Defaults.COHORT_SERVER_URL) {
when (config.serverZone) {
ServerZone.US -> US_COHORT_SERVER_URL.toHttpUrl()
ServerZone.EU -> EU_COHORT_SERVER_URL.toHttpUrl()
}
} else {
config.cohortSyncConfig?.cohortServerUrl?.toHttpUrl()
?: LocalEvaluationConfig.Defaults.COHORT_SERVER_URL.toHttpUrl()
}
}

private fun getEventServerUrl(
config: LocalEvaluationConfig,
assignmentConfiguration: AssignmentConfiguration
): String {
return if (assignmentConfiguration.serverUrl == LocalEvaluationConfig.Defaults.EVENT_SERVER_URL) {
when (config.serverZone) {
ServerZone.US -> US_EVENT_SERVER_URL
ServerZone.EU -> EU_EVENT_SERVER_URL
}
} else {
assignmentConfiguration.serverUrl
}
}
Loading
Loading