Skip to content

Commit

Permalink
added locks to ensure concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Sep 19, 2024
1 parent c293786 commit 61dd7a8
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 142 deletions.
131 changes: 69 additions & 62 deletions src/main/kotlin/flag/FlagConfigStreamApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

internal open class FlagConfigStreamApiError(message: String?, cause: Throwable?): Exception(message, cause) {
constructor(message: String?) : this(message, null)
Expand All @@ -32,8 +34,9 @@ internal class FlagConfigStreamApi (
httpClient: OkHttpClient = OkHttpClient(),
val connectionTimeoutMillis: Long = CONNECTION_TIMEOUT_MILLIS_DEFAULT,
keepaliveTimeoutMillis: Long = KEEP_ALIVE_TIMEOUT_MILLIS_DEFAULT,
reconnIntervalMillis: Long = RECONN_INTERVAL_MILLIS_DEFAULT
reconnIntervalMillis: Long = RECONN_INTERVAL_MILLIS_DEFAULT,
) {
private val lock: ReentrantLock = ReentrantLock()
var onInitUpdate: ((List<EvaluationFlag>) -> Unit)? = null
var onUpdate: ((List<EvaluationFlag>) -> Unit)? = null
var onError: ((Exception?) -> Unit)? = null
Expand All @@ -47,84 +50,88 @@ internal class FlagConfigStreamApi (
reconnIntervalMillis)

/**
 * Opens the stream and blocks until the first event has been handled.
 *
 * The whole method runs while holding `lock`, so concurrent callers of [connect] are
 * serialized: the stream callbacks are replaced and then waited on, and interleaving
 * two such sequences would race.
 *
 * Flow:
 *  1. Installs `stream.onUpdate` / `stream.onError`. The first event of either kind
 *     (tracked by a local [AtomicBoolean]) resolves the connection wait.
 *  2. The first payload is parsed with `getFlagsFromData` and delivered to
 *     `onInitUpdate` when it is set, otherwise to `onUpdate`.
 *  3. Later payloads go to `onUpdate`; exceptions thrown by that callback are ignored.
 *     A later payload that fails to parse, or a later stream error, is passed to
 *     `handleError` (defined elsewhere in this class).
 *
 * Only the wait for the first event is bounded by [connectionTimeoutMillis]; the wait
 * for the initial callback to finish has no timeout.
 *
 * @throws FlagConfigStreamApiConnTimeoutError when no event arrives within [connectionTimeoutMillis].
 * @throws FlagConfigStreamApiStreamError when the first event is a stream error whose cause is a `StreamException`.
 * @throws FlagConfigStreamApiError for any other failure (including a corrupt first payload or a
 *   failing initial callback); the caught exception is passed to the error's constructor.
 */
internal fun connect() {
    // Guarded by lock. Updating the callbacks and waiting on them can lead to race conditions.
    lock.withLock {
        val isInit = AtomicBoolean(true)
        val connectTimeoutFuture = CompletableFuture<Unit>()
        val updateTimeoutFuture = CompletableFuture<Unit>()

        stream.onUpdate = { data ->
            if (isInit.getAndSet(false)) {
                // Stream is establishing. First data received.
                // Resolve timeout.
                connectTimeoutFuture.complete(Unit)

                // Make sure valid data.
                try {
                    val flags = getFlagsFromData(data)

                    try {
                        // Prefer the dedicated initial callback; fall back to the regular one.
                        (onInitUpdate ?: onUpdate)?.invoke(flags)
                        updateTimeoutFuture.complete(Unit)
                    } catch (e: Throwable) {
                        // The application callback failed; surface it to the waiting caller.
                        updateTimeoutFuture.completeExceptionally(e)
                    }
                } catch (_: Throwable) {
                    // Parsing failed; report the first payload as corrupt.
                    updateTimeoutFuture.completeExceptionally(FlagConfigStreamApiDataCorruptError())
                }
            } else {
                // Stream has already established.
                // Make sure valid data.
                try {
                    val flags = getFlagsFromData(data)

                    try {
                        onUpdate?.invoke(flags)
                    } catch (_: Throwable) {
                        // Don't care about application error.
                    }
                } catch (_: Throwable) {
                    // Stream corrupted. Reconnect.
                    handleError(FlagConfigStreamApiDataCorruptError())
                }
            }
        }

        stream.onError = { t ->
            if (isInit.getAndSet(false)) {
                // Failure before any data arrived: unblock both waits with the error.
                connectTimeoutFuture.completeExceptionally(t)
                updateTimeoutFuture.completeExceptionally(t)
            } else {
                handleError(FlagConfigStreamApiStreamError(t))
            }
        }

        stream.connect()

        val failure: Throwable = try {
            connectTimeoutFuture.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS)
            updateTimeoutFuture.get()
            return
        } catch (e: TimeoutException) {
            // Timeouts should retry
            FlagConfigStreamApiConnTimeoutError()
        } catch (e: ExecutionException) {
            val cause = e.cause
            if (cause is StreamException) {
                FlagConfigStreamApiStreamError(cause)
            } else {
                FlagConfigStreamApiError(e)
            }
        } catch (e: Throwable) {
            FlagConfigStreamApiError(e)
        }

        // Connection did not establish cleanly: tear the stream down before reporting.
        close()
        throw failure
    }
}

/**
 * Cancels the underlying stream by calling `stream.cancel()`.
 *
 * This method does not acquire `lock`.
 */
internal fun close() {
// Not guarded by lock. close() can halt connect().
stream.cancel()
}

Expand Down
135 changes: 82 additions & 53 deletions src/main/kotlin/flag/FlagConfigUpdater.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.math.max
import kotlin.math.min

Expand Down Expand Up @@ -89,42 +91,52 @@ internal class FlagConfigPoller(
private val cohortLoader: CohortLoader?,
private val cohortStorage: CohortStorage?,
private val config: LocalEvaluationConfig,
private val metrics: LocalEvaluationMetrics = LocalEvaluationMetricsWrapper()
private val metrics: LocalEvaluationMetrics = LocalEvaluationMetricsWrapper(),
): FlagConfigUpdaterBase(
storage, cohortLoader, cohortStorage
) {
private val lock: ReentrantLock = ReentrantLock()
private val pool = Executors.newScheduledThreadPool(1, daemonFactory)
private var scheduledFuture: ScheduledFuture<*>? = null
private var scheduledFuture: ScheduledFuture<*>? = null // @GuardedBy(lock)
/**
 * Performs one synchronous refresh, then (re)schedules periodic refreshes.
 *
 * The initial `refresh()` runs before the lock is taken, so an exception from it
 * propagates to the caller and nothing is scheduled. Cancelling a previously scheduled
 * task and installing the new one happen together under `lock`.
 *
 * @param onError invoked (if non-null) after a scheduled refresh throws; by then the
 *   failure has been logged and polling has been stopped via [stop].
 */
override fun start(onError: (() -> Unit)?) {
    refresh()
    lock.withLock {
        // Drop any task left over from an earlier start() before scheduling a new one.
        stopInternal()
        scheduledFuture = pool.scheduleWithFixedDelay(
            {
                try {
                    refresh()
                } catch (t: Throwable) {
                    Logger.e("Refresh flag configs failed.", t)
                    stop()
                    onError?.invoke()
                }
            },
            config.flagConfigPollerIntervalMillis,
            config.flagConfigPollerIntervalMillis,
            TimeUnit.MILLISECONDS
        )
    }
}

/**
 * Cancels the currently scheduled polling task, if any, and clears the reference.
 *
 * Only the scheduled task is cancelled; the executor itself is not shut down here.
 * Callers must hold `lock`.
 */
// @GuardedBy(lock)
private fun stopInternal() {
    scheduledFuture?.cancel(true)
    scheduledFuture = null
}

/**
 * Stops periodic polling by cancelling the scheduled task while holding `lock`.
 */
override fun stop() {
    lock.withLock {
        stopInternal()
    }
}

/**
 * Shuts down the polling executor by calling `pool.shutdown()` while holding `lock`.
 */
override fun shutdown() {
    lock.withLock {
        // Stop the executor.
        pool.shutdown()
    }
}

private fun refresh() {
Expand All @@ -151,21 +163,25 @@ internal class FlagConfigStreamer(
): FlagConfigUpdaterBase(
storage, cohortLoader, cohortStorage
) {
private val lock: ReentrantLock = ReentrantLock()
/**
 * Wires the stream callbacks and opens the stream connection.
 *
 * Everything runs while holding `lock`, so the callback assignment and the
 * `flagConfigStreamApi.connect()` call of one start cannot interleave with another
 * start. The connect call is wrapped in `wrapMetrics`, which is given the
 * stream metric and stream-failure metric hooks.
 *
 * @param onError invoked (if non-null) when the stream API reports an error through its
 *   `onError` callback, after the error has been logged and recorded in metrics.
 */
override fun start(onError: (() -> Unit)?) {
    lock.withLock {
        flagConfigStreamApi.onUpdate = { flags ->
            update(flags)
        }
        flagConfigStreamApi.onError = { e ->
            Logger.e("Stream flag configs streaming failed.", e)
            metrics.onFlagConfigStreamFailure(e)
            onError?.invoke()
        }
        wrapMetrics(metric = metrics::onFlagConfigStream, failure = metrics::onFlagConfigStreamFailure) {
            flagConfigStreamApi.connect()
        }
    }
}

/**
 * Stops streaming by calling `flagConfigStreamApi.close()`.
 *
 * This method does not acquire `lock`.
 */
override fun stop() {
// Not guarded by lock. close() can cancel start().
flagConfigStreamApi.close()
}

Expand All @@ -178,11 +194,12 @@ internal class FlagConfigFallbackRetryWrapper(
private val mainUpdater: FlagConfigUpdater,
private val fallbackUpdater: FlagConfigUpdater?,
private val retryDelayMillis: Long = RETRY_DELAY_MILLIS_DEFAULT,
private val maxJitterMillis: Long = MAX_JITTER_MILLIS_DEFAULT
private val maxJitterMillis: Long = MAX_JITTER_MILLIS_DEFAULT,
): FlagConfigUpdater {
private val lock: ReentrantLock = ReentrantLock()
private val reconnIntervalRange = max(0, retryDelayMillis - maxJitterMillis)..(min(retryDelayMillis, retryDelayMillis - maxJitterMillis) + maxJitterMillis)
private val executor = Executors.newScheduledThreadPool(1, daemonFactory)
private var retryTask: ScheduledFuture<*>? = null
private var retryTask: ScheduledFuture<*>? = null // @GuardedBy(lock)

/**
* Since the wrapper retries, there will never be an error case. Thus, onError will never be called.
Expand All @@ -192,37 +209,49 @@ internal class FlagConfigFallbackRetryWrapper(
throw Error("Do not use FlagConfigFallbackRetryWrapper as main updater. Fallback updater will never be used. Rewrite retry and fallback logic.")
}

try {
mainUpdater.start {
scheduleRetry() // Don't care if poller start error or not, always retry.
try {
fallbackUpdater?.start()
} catch (_: Throwable) {
lock.withLock {
retryTask?.cancel(true)

try {
mainUpdater.start {
lock.withLock {
scheduleRetry() // Don't care if poller start error or not, always retry.
try {
fallbackUpdater?.start()
} catch (_: Throwable) {
}
}
}
fallbackUpdater?.stop()
} catch (t: Throwable) {
Logger.e("Primary flag configs start failed, start fallback. Error: ", t)
if (fallbackUpdater == null) {
// No fallback, main start failed is wrapper start fail
throw t
}
fallbackUpdater.start()
scheduleRetry()
}
} catch (t: Throwable) {
Logger.e("Primary flag configs start failed, start fallback. Error: ", t)
if (fallbackUpdater == null) {
// No fallback, main start failed is wrapper start fail
throw t
}
fallbackUpdater.start()
scheduleRetry()
}
}

/**
 * Stops the main updater and the fallback updater (when one is configured), then
 * cancels any pending retry task. All three steps run while holding `lock`.
 */
override fun stop() {
    lock.withLock {
        mainUpdater.stop()
        fallbackUpdater?.stop()
        retryTask?.cancel(true)
    }
}

/**
 * Shuts down the main updater and the fallback updater (when one is configured), then
 * cancels any pending retry task. All three steps run while holding `lock`.
 */
override fun shutdown() {
    lock.withLock {
        mainUpdater.shutdown()
        fallbackUpdater?.shutdown()
        retryTask?.cancel(true)
        // NOTE(review): `executor` is not shut down in this method — confirm that is intentional.
    }
}

// @GuardedBy(lock)
private fun scheduleRetry() {
retryTask = executor.schedule({
try {
Expand Down
Loading

0 comments on commit 61dd7a8

Please sign in to comment.