Skip to content

Commit

Permalink
changed visibility modifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Sep 5, 2024
1 parent f6b7498 commit c293786
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 10 deletions.
4 changes: 2 additions & 2 deletions src/main/kotlin/flag/FlagConfigStreamApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ internal class FlagConfigStreamApi (
keepaliveTimeoutMillis,
reconnIntervalMillis)

fun connect() {
internal fun connect() {
val isInit = AtomicBoolean(true)
val connectTimeoutFuture = CompletableFuture<Unit>()
val updateTimeoutFuture = CompletableFuture<Unit>()
Expand Down Expand Up @@ -124,7 +124,7 @@ internal class FlagConfigStreamApi (
throw t
}

fun close() {
internal fun close() {
stream.cancel()
}

Expand Down
11 changes: 7 additions & 4 deletions src/main/kotlin/flag/FlagConfigUpdater.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ internal abstract class FlagConfigUpdaterBase(
private val cohortLoader: CohortLoader?,
private val cohortStorage: CohortStorage?,
): FlagConfigUpdater {
fun update(flagConfigs: List<EvaluationFlag>) {
protected fun update(flagConfigs: List<EvaluationFlag>) {
// Remove flags that no longer exist.
val flagKeys = flagConfigs.map { it.key }.toSet()
flagConfigStorage.removeIf { !flagKeys.contains(it.key) }
Expand Down Expand Up @@ -93,11 +93,14 @@ internal class FlagConfigPoller(
): FlagConfigUpdaterBase(
storage, cohortLoader, cohortStorage
) {
private val poller = Executors.newScheduledThreadPool(1, daemonFactory)
private val pool = Executors.newScheduledThreadPool(1, daemonFactory)
private var scheduledFuture: ScheduledFuture<*>? = null
override fun start(onError: (() -> Unit)?) {
refresh()
scheduledFuture = poller.scheduleWithFixedDelay(
if (scheduledFuture != null) {
stop()
}
scheduledFuture = pool.scheduleWithFixedDelay(
{
try {
refresh()
Expand All @@ -121,7 +124,7 @@ internal class FlagConfigPoller(

override fun shutdown() {
// Stop the executor.
poller.shutdown()
pool.shutdown()
}

private fun refresh() {
Expand Down
8 changes: 4 additions & 4 deletions src/main/kotlin/util/SseStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ internal class SseStream (

private var es: EventSource? = null
private var reconnectTimerTask: TimerTask? = null
var onUpdate: ((String) -> Unit)? = null
var onError: ((Throwable?) -> Unit)? = null
internal var onUpdate: ((String) -> Unit)? = null
internal var onError: ((Throwable?) -> Unit)? = null

/**
* Creates an event source and immediately returns. The connection is performed async. Errors are informed through callbacks.
*/
fun connect() {
internal fun connect() {
cancel() // Clear any existing event sources.
es = client.newEventSource(request, eventSourceListener)
reconnectTimerTask = Timer().schedule(reconnIntervalRange.random()) {// Timer for a new event source.
Expand All @@ -117,7 +117,7 @@ internal class SseStream (
}
}

fun cancel() {
internal fun cancel() {
reconnectTimerTask?.cancel()

// There can be cases where an event source is being cancelled by these calls, but take a long time and made a callback to onFailure callback.
Expand Down

0 comments on commit c293786

Please sign in to comment.