Skip to content

Commit

Permalink
added flag push
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Aug 23, 2024
1 parent 6c2fc3a commit 49636d8
Show file tree
Hide file tree
Showing 10 changed files with 551 additions and 86 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
testImplementation("io.mockk:mockk:${Versions.mockk}")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:${Versions.serializationRuntime}")
implementation("com.squareup.okhttp3:okhttp:${Versions.okhttp}")
implementation("com.squareup.okhttp3:okhttp-sse:${Versions.okhttpSse}")
implementation("com.amplitude:evaluation-core:${Versions.evaluationCore}")
implementation("com.amplitude:java-sdk:${Versions.amplitudeAnalytics}")
implementation("org.json:json:${Versions.json}")
Expand Down
1 change: 1 addition & 0 deletions buildSrc/src/main/kotlin/Versions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ object Versions {
const val serializationRuntime = "1.4.1"
const val json = "20231013"
const val okhttp = "4.12.0"
const val okhttpSse = "4.12.0" // Update this alongside okhttp. Note this library isn't stable and may contain breaking changes.
const val evaluationCore = "2.0.0-beta.2"
const val amplitudeAnalytics = "1.12.0"
const val mockk = "1.13.9"
Expand Down
15 changes: 14 additions & 1 deletion src/main/kotlin/LocalEvaluationClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import com.amplitude.experiment.evaluation.EvaluationEngineImpl
import com.amplitude.experiment.evaluation.EvaluationFlag
import com.amplitude.experiment.evaluation.topologicalSort
import com.amplitude.experiment.flag.DynamicFlagConfigApi
import com.amplitude.experiment.flag.FlagConfigPoller
import com.amplitude.experiment.flag.FlagConfigStreamApi
import com.amplitude.experiment.flag.InMemoryFlagConfigStorage
import com.amplitude.experiment.util.LocalEvaluationMetricsWrapper
import com.amplitude.experiment.util.Logger
Expand All @@ -43,7 +45,10 @@ class LocalEvaluationClient internal constructor(
private val serverUrl: HttpUrl = getServerUrl(config)
private val evaluation: EvaluationEngine = EvaluationEngineImpl()
private val metrics: LocalEvaluationMetrics = LocalEvaluationMetricsWrapper(config.metrics)
private val flagConfigApi = DynamicFlagConfigApi(apiKey, serverUrl, getProxyUrl(config), httpClient)
private val flagConfigApi = DynamicFlagConfigApi(apiKey, serverUrl, null, httpClient)
private val proxyUrl: HttpUrl? = getProxyUrl(config)
private val flagConfigProxyApi = if (proxyUrl == null) null else DynamicFlagConfigApi(apiKey, proxyUrl, null, httpClient)
private val flagConfigStreamApi = FlagConfigStreamApi(apiKey, "https://stream.lab.amplitude.com", httpClient)
private val flagConfigStorage = InMemoryFlagConfigStorage()
private val cohortStorage = if (config.cohortSyncConfig == null) {
null
Expand All @@ -60,6 +65,8 @@ class LocalEvaluationClient internal constructor(
private val deploymentRunner = DeploymentRunner(
config = config,
flagConfigApi = flagConfigApi,
flagConfigProxyApi = flagConfigProxyApi,
flagConfigStreamApi = flagConfigStreamApi,
flagConfigStorage = flagConfigStorage,
cohortApi = cohortApi,
cohortStorage = cohortStorage,
Expand Down Expand Up @@ -214,3 +221,9 @@ private fun getEventServerUrl(
assignmentConfiguration.serverUrl
}
}

fun main() {
val client = LocalEvaluationClient("server-qz35UwzJ5akieoAdIgzM4m9MIiOLXLoz")
client.start()
println(client.evaluateV2(ExperimentUser("1")))
}
2 changes: 2 additions & 0 deletions src/main/kotlin/LocalEvaluationConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ interface LocalEvaluationMetrics {
fun onFlagConfigFetch()
fun onFlagConfigFetchFailure(exception: Exception)
fun onFlagConfigFetchOriginFallback(exception: Exception)
fun onFlagConfigStream()
fun onFlagConfigStreamFailure(exception: Exception?)
fun onCohortDownload()
fun onCohortDownloadFailure(exception: Exception)
fun onCohortDownloadOriginFallback(exception: Exception)
Expand Down
100 changes: 24 additions & 76 deletions src/main/kotlin/deployment/DeploymentRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,19 @@

package com.amplitude.experiment.deployment

import com.amplitude.experiment.ExperimentalApi
import com.amplitude.experiment.LocalEvaluationConfig
import com.amplitude.experiment.LocalEvaluationMetrics
import com.amplitude.experiment.*
import com.amplitude.experiment.cohort.CohortApi
import com.amplitude.experiment.cohort.CohortLoader
import com.amplitude.experiment.cohort.CohortStorage
import com.amplitude.experiment.flag.*
import com.amplitude.experiment.flag.FlagConfigApi
import com.amplitude.experiment.flag.FlagConfigPoller
import com.amplitude.experiment.flag.FlagConfigStorage
import com.amplitude.experiment.util.LocalEvaluationMetricsWrapper
import com.amplitude.experiment.util.Logger
import com.amplitude.experiment.util.Once
import com.amplitude.experiment.util.daemonFactory
import com.amplitude.experiment.util.getAllCohortIds
import com.amplitude.experiment.util.wrapMetrics
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

Expand All @@ -26,6 +23,8 @@ private const val MIN_COHORT_POLLING_INTERVAL = 60000L
internal class DeploymentRunner(
private val config: LocalEvaluationConfig,
private val flagConfigApi: FlagConfigApi,
private val flagConfigProxyApi: FlagConfigApi? = null,
private val flagConfigStreamApi: FlagConfigStreamApi? = null,
private val flagConfigStorage: FlagConfigStorage,
cohortApi: CohortApi?,
private val cohortStorage: CohortStorage?,
Expand All @@ -39,21 +38,26 @@ internal class DeploymentRunner(
null
}
private val cohortPollingInterval: Long = getCohortPollingInterval()
// Fallback in this order: proxy, stream, poll.
private val amplitudeFlagConfigPoller = FlagConfigPoller(flagConfigApi, flagConfigStorage, cohortLoader, cohortStorage, config, metrics)
private val amplitudeFlagConfigUpdater =
if (flagConfigStreamApi != null)
FlagConfigFallbackRetryWrapper(
FlagConfigStreamer(flagConfigStreamApi, flagConfigStorage, cohortLoader, cohortStorage, config, metrics),
amplitudeFlagConfigPoller,
)
else amplitudeFlagConfigPoller
private val flagConfigUpdater =
if (flagConfigProxyApi != null)
FlagConfigFallbackRetryWrapper(
FlagConfigPoller(flagConfigProxyApi, flagConfigStorage, cohortLoader, cohortStorage, config, metrics),
amplitudeFlagConfigPoller
)
else
amplitudeFlagConfigUpdater

fun start() = lock.once {
refresh()
poller.scheduleWithFixedDelay(
{
try {
refresh()
} catch (t: Throwable) {
Logger.e("Refresh flag configs failed.", t)
}
},
config.flagConfigPollerIntervalMillis,
config.flagConfigPollerIntervalMillis,
TimeUnit.MILLISECONDS
)
flagConfigUpdater.start()
if (cohortLoader != null) {
poller.scheduleWithFixedDelay(
{
Expand All @@ -74,63 +78,7 @@ internal class DeploymentRunner(

fun stop() {
poller.shutdown()
}

fun refresh() {
Logger.d("Refreshing flag configs.")
// Get updated flags from the network.
val flagConfigs = wrapMetrics(
metric = metrics::onFlagConfigFetch,
failure = metrics::onFlagConfigFetchFailure,
) {
flagConfigApi.getFlagConfigs()
}

// Remove flags that no longer exist.
val flagKeys = flagConfigs.map { it.key }.toSet()
flagConfigStorage.removeIf { !flagKeys.contains(it.key) }

// Get all flags from storage
val storageFlags = flagConfigStorage.getFlagConfigs()

// Load cohorts for each flag if applicable and put the flag in storage.
val futures = ConcurrentHashMap<String, CompletableFuture<*>>()
for (flagConfig in flagConfigs) {
if (cohortLoader == null) {
flagConfigStorage.putFlagConfig(flagConfig)
continue
}
val cohortIds = flagConfig.getAllCohortIds()
val storageCohortIds = storageFlags[flagConfig.key]?.getAllCohortIds() ?: emptySet()
val cohortsToLoad = cohortIds - storageCohortIds
if (cohortsToLoad.isEmpty()) {
flagConfigStorage.putFlagConfig(flagConfig)
continue
}
for (cohortId in cohortsToLoad) {
futures.putIfAbsent(
cohortId,
cohortLoader.loadCohort(cohortId).handle { _, exception ->
if (exception != null) {
Logger.e("Failed to load cohort $cohortId", exception)
}
flagConfigStorage.putFlagConfig(flagConfig)
}
)
}
}
futures.values.forEach { it.join() }

// Delete unused cohorts
if (cohortStorage != null) {
val flagCohortIds = flagConfigStorage.getFlagConfigs().values.toList().getAllCohortIds()
val storageCohortIds = cohortStorage.getCohorts().keys
val deletedCohortIds = storageCohortIds - flagCohortIds
for (deletedCohortId in deletedCohortIds) {
cohortStorage.deleteCohort(deletedCohortId)
}
}
Logger.d("Refreshed ${flagConfigs.size} flag configs.")
flagConfigUpdater.shutdown()
}

private fun getCohortPollingInterval(): Long {
Expand Down
134 changes: 134 additions & 0 deletions src/main/kotlin/flag/FlagConfigStreamApi.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.amplitude.experiment.flag

import com.amplitude.experiment.evaluation.EvaluationFlag
import com.amplitude.experiment.util.*
import com.amplitude.experiment.util.SdkStream
import kotlinx.serialization.decodeFromString
import okhttp3.OkHttpClient
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicBoolean

internal open class FlagConfigStreamApiError(message: String?, cause: Throwable?): Exception(message, cause) {
constructor(message: String?) : this(message, null)
constructor(cause: Throwable?) : this(cause?.toString(), cause)
}
internal class FlagConfigStreamApiConnTimeoutError: FlagConfigStreamApiError("Initial connection timed out")
internal class FlagConfigStreamApiDataCorruptError: FlagConfigStreamApiError("Stream data corrupted")
internal class FlagConfigStreamApiStreamError(cause: Throwable?): FlagConfigStreamApiError("Stream error", cause)

private const val CONNECTION_TIMEOUT_MILLIS_DEFAULT = 2000L
private const val KEEP_ALIVE_TIMEOUT_MILLIS_DEFAULT = 17000L
private const val RECONN_INTERVAL_MILLIS_DEFAULT = 15 * 60 * 1000L
internal class FlagConfigStreamApi (
deploymentKey: String,
serverUrl: String,
httpClient: OkHttpClient = OkHttpClient(),
connectionTimeoutMillis: Long = CONNECTION_TIMEOUT_MILLIS_DEFAULT,
keepaliveTimeoutMillis: Long = KEEP_ALIVE_TIMEOUT_MILLIS_DEFAULT,
reconnIntervalMillis: Long = RECONN_INTERVAL_MILLIS_DEFAULT
) {
var onInitUpdate: ((List<EvaluationFlag>) -> Unit)? = null
var onUpdate: ((List<EvaluationFlag>) -> Unit)? = null
var onError: ((Exception?) -> Unit)? = null
private val stream: SdkStream = SdkStream(
"Api-Key $deploymentKey",
"$serverUrl/sdk/stream/v1/flags",
httpClient,
connectionTimeoutMillis,
keepaliveTimeoutMillis,
reconnIntervalMillis)

fun connect() {
val isInit = AtomicBoolean(true)
val connectTimeoutFuture = CompletableFuture<Unit>()
val updateTimeoutFuture = CompletableFuture<Unit>()
stream.onUpdate = { data ->
if (isInit.getAndSet(false)) {
// Stream is establishing. First data received.
// Resolve timeout.
connectTimeoutFuture.complete(Unit)

// Make sure valid data.
try {
val flags = getFlagsFromData(data)

try {
if (onInitUpdate != null) {
onInitUpdate?.let { it(flags) }
} else {
onUpdate?.let { it(flags) }
}
updateTimeoutFuture.complete(Unit)
} catch (e: Throwable) {
updateTimeoutFuture.completeExceptionally(e)
}
} catch (_: Throwable) {
updateTimeoutFuture.completeExceptionally(FlagConfigStreamApiDataCorruptError())
}

} else {
// Stream has already established.
// Make sure valid data.
try {
val flags = getFlagsFromData(data)

try {
onUpdate?.let { it(flags) }
} catch (_: Throwable) {
// Don't care about application error.
}
} catch (_: Throwable) {
// Stream corrupted. Reconnect.
handleError(FlagConfigStreamApiDataCorruptError())
}

}
}
stream.onError = { t ->
if (isInit.getAndSet(false)) {
connectTimeoutFuture.completeExceptionally(t)
updateTimeoutFuture.completeExceptionally(t)
} else {
handleError(FlagConfigStreamApiStreamError(t))
}
}
stream.connect()

val t: Throwable
try {
connectTimeoutFuture.get(2000, TimeUnit.MILLISECONDS)
updateTimeoutFuture.get()
return
} catch (e: TimeoutException) {
// Timeouts should retry
t = FlagConfigStreamApiConnTimeoutError()
} catch (e: ExecutionException) {
val cause = e.cause
t = if (cause is StreamException) {
FlagConfigStreamApiStreamError(cause)
} else {
FlagConfigStreamApiError(e)
}
} catch (e: Throwable) {
t = FlagConfigStreamApiError(e)
}
close()
throw t
}

fun close() {
stream.cancel()
}

private fun getFlagsFromData(data: String): List<EvaluationFlag> {
return json.decodeFromString<List<EvaluationFlag>>(data)
}

private fun handleError(e: Exception?) {
close()
onError?.let { it(e) }
}
}
Loading

0 comments on commit 49636d8

Please sign in to comment.