Skip to content

Commit

Permalink
fix: move session-specific logic from ContextPlugin to avoid race con…
Browse files Browse the repository at this point in the history
…ditions (#86)
  • Loading branch information
falconandy authored Oct 18, 2022
1 parent 549aff2 commit 311c230
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 57 deletions.
89 changes: 69 additions & 20 deletions android/src/main/java/com/amplitude/android/Amplitude.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.amplitude.android.plugins.AndroidContextPlugin
import com.amplitude.android.plugins.AndroidLifecyclePlugin
import com.amplitude.core.Amplitude
import com.amplitude.core.Storage
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.plugins.AmplitudeDestination
import com.amplitude.core.platform.plugins.GetAmpliExtrasPlugin
import com.amplitude.core.utilities.AnalyticsIdentityListener
Expand Down Expand Up @@ -79,10 +80,56 @@ open class Amplitude(
return this
}

override fun processEvent(event: BaseEvent): Iterable<BaseEvent>? {
val eventTimestamp = event.timestamp ?: System.currentTimeMillis()
event.timestamp = eventTimestamp
var sessionEvents: Iterable<BaseEvent>? = null

if (!(event.eventType == START_SESSION_EVENT || event.eventType == END_SESSION_EVENT)) {
if (!inForeground) {
sessionEvents = startNewSessionIfNeeded(eventTimestamp)
} else {
refreshSessionTime(eventTimestamp)
}
}

if (event.sessionId < 0) {
event.sessionId = sessionId
}

val savedLastEventId = lastEventId

sessionEvents ?. let {
it.forEach { e ->
e.eventId ?: let {
val newEventId = lastEventId + 1
e.eventId = newEventId
lastEventId = newEventId
}
}
}

event.eventId ?: let {
val newEventId = lastEventId + 1
event.eventId = newEventId
lastEventId = newEventId
}

if (lastEventId > savedLastEventId) {
amplitudeScope.launch(amplitudeDispatcher) {
storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString())
}
}

return sessionEvents
}

fun onEnterForeground(timestamp: Long) {
amplitudeScope.launch(amplitudeDispatcher) {
isBuilt.await()
startNewSessionIfNeeded(timestamp)
startNewSessionIfNeeded(timestamp) ?. let {
it.forEach { event -> process(event) }
}
inForeground = true
}
}
Expand All @@ -99,33 +146,30 @@ open class Amplitude(
}
}

fun startNewSessionIfNeeded(timestamp: Long): Boolean {
fun startNewSessionIfNeeded(timestamp: Long): Iterable<BaseEvent>? {
if (inSession()) {

if (isWithinMinTimeBetweenSessions(timestamp)) {
refreshSessionTime(timestamp)
return false
return null
}

startNewSession(timestamp)
return true
return startNewSession(timestamp)
}

// no current session - check for previous session
if (isWithinMinTimeBetweenSessions(timestamp)) {
if (previousSessionId == -1L) {
startNewSession(timestamp)
return true
return startNewSession(timestamp)
}

// extend previous session
setSessionId(previousSessionId)
refreshSessionTime(timestamp)
return false
return null
}

startNewSession(timestamp)
return true
return startNewSession(timestamp)
}

private fun setSessionId(timestamp: Long) {
Expand All @@ -137,25 +181,30 @@ open class Amplitude(
}
}

private fun startNewSession(timestamp: Long) {
private fun startNewSession(timestamp: Long): Iterable<BaseEvent> {
val sessionEvents = mutableListOf<BaseEvent>()

// end previous session
if ((configuration as Configuration).trackingSessionEvents) {
sendSessionEvent(END_SESSION_EVENT)
if ((configuration as Configuration).trackingSessionEvents && inSession()) {
val sessionEndEvent = BaseEvent()
sessionEndEvent.eventType = END_SESSION_EVENT
sessionEndEvent.timestamp = if (lastEventTime > 0) lastEventTime else null
sessionEndEvent.sessionId = sessionId
sessionEvents.add(sessionEndEvent)
}

// start new session
setSessionId(timestamp)
refreshSessionTime(timestamp)
if (configuration.trackingSessionEvents) {
sendSessionEvent(START_SESSION_EVENT)
val sessionStartEvent = BaseEvent()
sessionStartEvent.eventType = START_SESSION_EVENT
sessionStartEvent.timestamp = timestamp
sessionStartEvent.sessionId = sessionId
sessionEvents.add(sessionStartEvent)
}
}

private fun sendSessionEvent(sessionEvent: String) {
if (!inSession()) {
return
}
track(sessionEvent)
return sessionEvents
}

fun refreshSessionTime(timestamp: Long) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
package com.amplitude.android.plugins

import com.amplitude.android.Amplitude.Companion.END_SESSION_EVENT
import com.amplitude.android.Amplitude.Companion.START_SESSION_EVENT
import com.amplitude.android.BuildConfig
import com.amplitude.android.Configuration
import com.amplitude.android.TrackingOptions
import com.amplitude.common.android.AndroidContextProvider
import com.amplitude.core.Amplitude
import com.amplitude.core.Storage
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.Plugin
import kotlinx.coroutines.launch
import java.util.UUID

class AndroidContextPlugin : Plugin {
Expand Down Expand Up @@ -58,16 +54,6 @@ class AndroidContextPlugin : Plugin {
event.timestamp ?: let {
val eventTime = System.currentTimeMillis()
event.timestamp = eventTime
getAndroidAmplitude().lastEventTime = eventTime
}
event.timestamp ?. let {
if (!(event.eventType == START_SESSION_EVENT || event.eventType == END_SESSION_EVENT)) {
if (!getAndroidAmplitude().inForeground) {
getAndroidAmplitude().startNewSessionIfNeeded(it)
} else {
getAndroidAmplitude().refreshSessionTime(it)
}
}
}
event.insertId ?: let {
event.insertId = UUID.randomUUID().toString()
Expand All @@ -81,15 +67,6 @@ class AndroidContextPlugin : Plugin {
event.deviceId ?: let {
event.deviceId = amplitude.store.deviceId
}
event.sessionId = getAndroidAmplitude().sessionId
event.eventId ?: let {
val newEventId = getAndroidAmplitude().lastEventId + 1
event.eventId = newEventId
getAndroidAmplitude().lastEventId = newEventId
amplitude.amplitudeScope.launch(amplitude.amplitudeDispatcher) {
amplitude.storage.write(Storage.Constants.LAST_EVENT_ID, newEventId.toString())
}
}
val trackingOptions = configuration.trackingOptions
if (configuration.enableCoppaControl) {
trackingOptions.mergeIn(TrackingOptions.forCoppaControl())
Expand Down Expand Up @@ -163,10 +140,6 @@ class AndroidContextPlugin : Plugin {
}
}

private fun getAndroidAmplitude(): com.amplitude.android.Amplitude {
return amplitude as com.amplitude.android.Amplitude
}

companion object {
const val PLATFORM = "Android"
const val SDK_LIBRARY = "amplitude-analytics-android"
Expand Down
107 changes: 98 additions & 9 deletions android/src/test/java/com/amplitude/android/AmplitudeTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,7 @@ class AmplitudeTest {
identityStorageProvider = IMIdentityStorageProvider()
)
IdentityContainer.getInstance(configuration)
amplitude = Amplitude(
Configuration(
apiKey = "api-key",
context = context!!,
instanceName = "testInstance",
storageProvider = InMemoryStorageProvider(),
trackingSessionEvents = false
)
)
amplitude = Amplitude(createConfiguration())
}

private fun setDispatcher(testScheduler: TestCoroutineScheduler) {
Expand All @@ -78,6 +70,22 @@ class AmplitudeTest {
amplitudeDispatcherField.set(amplitude, dispatcher)
}

private fun createConfiguration(minTimeBetweenSessionsMillis: Long? = null): Configuration {
val configuration = Configuration(
apiKey = "api-key",
context = context!!,
instanceName = "testInstance",
storageProvider = InMemoryStorageProvider(),
trackingSessionEvents = minTimeBetweenSessionsMillis != null,
)

if (minTimeBetweenSessionsMillis != null) {
configuration.minTimeBetweenSessionsMillis = minTimeBetweenSessionsMillis
}

return configuration
}

@Test
fun amplitude_reset_wipesUserIdDeviceId() = runTest {
setDispatcher(testScheduler)
Expand Down Expand Up @@ -135,4 +143,85 @@ class AmplitudeTest {
}
}
}

@Test
fun amplitude_tracking_session() = runTest {
setDispatcher(testScheduler)

amplitude = Amplitude(createConfiguration(100))

val mockedPlugin = spyk(StubPlugin())
amplitude?.add(mockedPlugin)

if (amplitude?.isBuilt!!.await()) {
val event1 = BaseEvent()
event1.eventType = "test event 1"
event1.timestamp = 1000
amplitude!!.track(event1)

val event2 = BaseEvent()
event2.eventType = "test event 2"
event2.timestamp = 1050
amplitude!!.track(event2)

val event3 = BaseEvent()
event3.eventType = "test event 3"
event3.timestamp = 1200
amplitude!!.track(event3)

val event4 = BaseEvent()
event4.eventType = "test event 4"
event4.timestamp = 1350
amplitude!!.track(event4)

advanceUntilIdle()

val tracks = mutableListOf<BaseEvent>()

verify {
mockedPlugin.track(capture(tracks))
}

tracks.sortBy { event -> event.eventId }

Assertions.assertEquals(9, tracks.count())

tracks[0].let {
Assertions.assertEquals("session_start", it.eventType)
Assertions.assertEquals(1000L, it.timestamp)
}
tracks[1].let {
Assertions.assertEquals("test event 1", it.eventType)
Assertions.assertEquals(1000L, it.timestamp)
}
tracks[2].let {
Assertions.assertEquals("test event 2", it.eventType)
Assertions.assertEquals(1050L, it.timestamp)
}
tracks[3].let {
Assertions.assertEquals("session_end", it.eventType)
Assertions.assertEquals(1050L, it.timestamp)
}
tracks[4].let {
Assertions.assertEquals("session_start", it.eventType)
Assertions.assertEquals(1200L, it.timestamp)
}
tracks[5].let {
Assertions.assertEquals("test event 3", it.eventType)
Assertions.assertEquals(1200L, it.timestamp)
}
tracks[6].let {
Assertions.assertEquals("session_end", it.eventType)
Assertions.assertEquals(1200L, it.timestamp)
}
tracks[7].let {
Assertions.assertEquals("session_start", it.eventType)
Assertions.assertEquals(1350L, it.timestamp)
}
tracks[8].let {
Assertions.assertEquals("test event 4", it.eventType)
Assertions.assertEquals(1350L, it.timestamp)
}
}
}
}
17 changes: 16 additions & 1 deletion core/src/main/java/com/amplitude/core/Amplitude.kt
Original file line number Diff line number Diff line change
Expand Up @@ -315,17 +315,32 @@ open class Amplitude internal constructor(
return this
}

private fun process(event: BaseEvent) {
protected fun process(event: BaseEvent) {
if (configuration.optOut) {
logger.info("Skip event for opt out config.")
return
}

val sessionEvents = processEvent(event)
sessionEvents ?. let {
it.forEach { e ->
amplitudeScope.launch(amplitudeDispatcher) {
isBuilt.await()
timeline.process(e)
}
}
}

amplitudeScope.launch(amplitudeDispatcher) {
isBuilt.await()
timeline.process(event)
}
}

protected open fun processEvent(event: BaseEvent): Iterable<BaseEvent>? {
return null
}

/**
* Add a plugin.
*
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/com/amplitude/core/events/BaseEvent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ open class BaseEvent : EventOptions() {
extra ?: let { extra = options.extra }
callback ?: let { callback = options.callback }
partnerId ?: let { partnerId = options.partnerId }
if (sessionId < 0) {
sessionId = options.sessionId
}
}

/**
Expand Down

0 comments on commit 311c230

Please sign in to comment.