From 311c23053a9dcc2ce0b697aecf0588f189a116ca Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Wed, 19 Oct 2022 03:49:35 +0400 Subject: [PATCH] fix: move session-specific logic from ContextPlugin to avoid race conditions (#86) --- .../java/com/amplitude/android/Amplitude.kt | 89 +++++++++++---- .../android/plugins/AndroidContextPlugin.kt | 27 ----- .../com/amplitude/android/AmplitudeTest.kt | 107 ++++++++++++++++-- .../main/java/com/amplitude/core/Amplitude.kt | 17 ++- .../com/amplitude/core/events/BaseEvent.kt | 3 + 5 files changed, 186 insertions(+), 57 deletions(-) diff --git a/android/src/main/java/com/amplitude/android/Amplitude.kt b/android/src/main/java/com/amplitude/android/Amplitude.kt index a14b721c..a047737e 100644 --- a/android/src/main/java/com/amplitude/android/Amplitude.kt +++ b/android/src/main/java/com/amplitude/android/Amplitude.kt @@ -5,6 +5,7 @@ import com.amplitude.android.plugins.AndroidContextPlugin import com.amplitude.android.plugins.AndroidLifecyclePlugin import com.amplitude.core.Amplitude import com.amplitude.core.Storage +import com.amplitude.core.events.BaseEvent import com.amplitude.core.platform.plugins.AmplitudeDestination import com.amplitude.core.platform.plugins.GetAmpliExtrasPlugin import com.amplitude.core.utilities.AnalyticsIdentityListener @@ -79,10 +80,56 @@ open class Amplitude( return this } + override fun processEvent(event: BaseEvent): Iterable? { + val eventTimestamp = event.timestamp ?: System.currentTimeMillis() + event.timestamp = eventTimestamp + var sessionEvents: Iterable? = null + + if (!(event.eventType == START_SESSION_EVENT || event.eventType == END_SESSION_EVENT)) { + if (!inForeground) { + sessionEvents = startNewSessionIfNeeded(eventTimestamp) + } else { + refreshSessionTime(eventTimestamp) + } + } + + if (event.sessionId < 0) { + event.sessionId = sessionId + } + + val savedLastEventId = lastEventId + + sessionEvents ?. let { + it.forEach { e -> + e.eventId ?: let { + val newEventId = lastEventId + 1 + e.eventId = newEventId + lastEventId = newEventId + } + } + } + + event.eventId ?: let { + val newEventId = lastEventId + 1 + event.eventId = newEventId + lastEventId = newEventId + } + + if (lastEventId > savedLastEventId) { + amplitudeScope.launch(amplitudeDispatcher) { + storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString()) + } + } + + return sessionEvents + } + fun onEnterForeground(timestamp: Long) { amplitudeScope.launch(amplitudeDispatcher) { isBuilt.await() - startNewSessionIfNeeded(timestamp) + startNewSessionIfNeeded(timestamp) ?. let { + it.forEach { event -> process(event) } + } inForeground = true } } @@ -99,33 +146,30 @@ open class Amplitude( } } - fun startNewSessionIfNeeded(timestamp: Long): Boolean { + fun startNewSessionIfNeeded(timestamp: Long): Iterable? { if (inSession()) { if (isWithinMinTimeBetweenSessions(timestamp)) { refreshSessionTime(timestamp) - return false + return null } - startNewSession(timestamp) - return true + return startNewSession(timestamp) } // no current session - check for previous session if (isWithinMinTimeBetweenSessions(timestamp)) { if (previousSessionId == -1L) { - startNewSession(timestamp) - return true + return startNewSession(timestamp) } // extend previous session setSessionId(previousSessionId) refreshSessionTime(timestamp) - return false + return null } - startNewSession(timestamp) - return true + return startNewSession(timestamp) } private fun setSessionId(timestamp: Long) { @@ -137,25 +181,30 @@ open class Amplitude( } } - private fun startNewSession(timestamp: Long) { + private fun startNewSession(timestamp: Long): Iterable { + val sessionEvents = mutableListOf() + // end previous session - if ((configuration as Configuration).trackingSessionEvents) { - sendSessionEvent(END_SESSION_EVENT) + if ((configuration as Configuration).trackingSessionEvents && inSession()) { + val sessionEndEvent = BaseEvent() + sessionEndEvent.eventType = END_SESSION_EVENT + sessionEndEvent.timestamp = if (lastEventTime > 0) lastEventTime else null + sessionEndEvent.sessionId = sessionId + sessionEvents.add(sessionEndEvent) } // start new session setSessionId(timestamp) refreshSessionTime(timestamp) if (configuration.trackingSessionEvents) { - sendSessionEvent(START_SESSION_EVENT) + val sessionStartEvent = BaseEvent() + sessionStartEvent.eventType = START_SESSION_EVENT + sessionStartEvent.timestamp = timestamp + sessionStartEvent.sessionId = sessionId + sessionEvents.add(sessionStartEvent) } - } - private fun sendSessionEvent(sessionEvent: String) { - if (!inSession()) { - return - } - track(sessionEvent) + return sessionEvents } fun refreshSessionTime(timestamp: Long) { diff --git a/android/src/main/java/com/amplitude/android/plugins/AndroidContextPlugin.kt b/android/src/main/java/com/amplitude/android/plugins/AndroidContextPlugin.kt index 2216e4cf..42a7f485 100644 --- a/android/src/main/java/com/amplitude/android/plugins/AndroidContextPlugin.kt +++ b/android/src/main/java/com/amplitude/android/plugins/AndroidContextPlugin.kt @@ -1,16 +1,12 @@ package com.amplitude.android.plugins -import com.amplitude.android.Amplitude.Companion.END_SESSION_EVENT -import com.amplitude.android.Amplitude.Companion.START_SESSION_EVENT import com.amplitude.android.BuildConfig import com.amplitude.android.Configuration import com.amplitude.android.TrackingOptions import com.amplitude.common.android.AndroidContextProvider import com.amplitude.core.Amplitude -import com.amplitude.core.Storage import com.amplitude.core.events.BaseEvent import com.amplitude.core.platform.Plugin -import kotlinx.coroutines.launch import java.util.UUID class AndroidContextPlugin : Plugin { @@ -58,16 +54,6 @@ class AndroidContextPlugin : Plugin { event.timestamp ?: let { val eventTime = System.currentTimeMillis() event.timestamp = eventTime - getAndroidAmplitude().lastEventTime = eventTime - } - event.timestamp ?. let { - if (!(event.eventType == START_SESSION_EVENT || event.eventType == END_SESSION_EVENT)) { - if (!getAndroidAmplitude().inForeground) { - getAndroidAmplitude().startNewSessionIfNeeded(it) - } else { - getAndroidAmplitude().refreshSessionTime(it) - } - } } event.insertId ?: let { event.insertId = UUID.randomUUID().toString() @@ -81,15 +67,6 @@ class AndroidContextPlugin : Plugin { event.deviceId ?: let { event.deviceId = amplitude.store.deviceId } - event.sessionId = getAndroidAmplitude().sessionId - event.eventId ?: let { - val newEventId = getAndroidAmplitude().lastEventId + 1 - event.eventId = newEventId - getAndroidAmplitude().lastEventId = newEventId - amplitude.amplitudeScope.launch(amplitude.amplitudeDispatcher) { - amplitude.storage.write(Storage.Constants.LAST_EVENT_ID, newEventId.toString()) - } - } val trackingOptions = configuration.trackingOptions if (configuration.enableCoppaControl) { trackingOptions.mergeIn(TrackingOptions.forCoppaControl()) @@ -163,10 +140,6 @@ class AndroidContextPlugin : Plugin { } } - private fun getAndroidAmplitude(): com.amplitude.android.Amplitude { - return amplitude as com.amplitude.android.Amplitude - } - companion object { const val PLATFORM = "Android" const val SDK_LIBRARY = "amplitude-analytics-android" diff --git a/android/src/test/java/com/amplitude/android/AmplitudeTest.kt b/android/src/test/java/com/amplitude/android/AmplitudeTest.kt index f385ac35..4914f69c 100644 --- a/android/src/test/java/com/amplitude/android/AmplitudeTest.kt +++ b/android/src/test/java/com/amplitude/android/AmplitudeTest.kt @@ -59,15 +59,7 @@ class AmplitudeTest { identityStorageProvider = IMIdentityStorageProvider() ) IdentityContainer.getInstance(configuration) - amplitude = Amplitude( - Configuration( - apiKey = "api-key", - context = context!!, - instanceName = "testInstance", - storageProvider = InMemoryStorageProvider(), - trackingSessionEvents = false - ) - ) + amplitude = Amplitude(createConfiguration()) } private fun setDispatcher(testScheduler: TestCoroutineScheduler) { @@ -78,6 +70,22 @@ class AmplitudeTest { amplitudeDispatcherField.set(amplitude, dispatcher) } + private fun createConfiguration(minTimeBetweenSessionsMillis: Long? = null): Configuration { + val configuration = Configuration( + apiKey = "api-key", + context = context!!, + instanceName = "testInstance", + storageProvider = InMemoryStorageProvider(), + trackingSessionEvents = minTimeBetweenSessionsMillis != null, + ) + + if (minTimeBetweenSessionsMillis != null) { + configuration.minTimeBetweenSessionsMillis = minTimeBetweenSessionsMillis + } + + return configuration + } + @Test fun amplitude_reset_wipesUserIdDeviceId() = runTest { setDispatcher(testScheduler) @@ -135,4 +143,85 @@ class AmplitudeTest { } } } + + @Test + fun amplitude_tracking_session() = runTest { + setDispatcher(testScheduler) + + amplitude = Amplitude(createConfiguration(100)) + + val mockedPlugin = spyk(StubPlugin()) + amplitude?.add(mockedPlugin) + + if (amplitude?.isBuilt!!.await()) { + val event1 = BaseEvent() + event1.eventType = "test event 1" + event1.timestamp = 1000 + amplitude!!.track(event1) + + val event2 = BaseEvent() + event2.eventType = "test event 2" + event2.timestamp = 1050 + amplitude!!.track(event2) + + val event3 = BaseEvent() + event3.eventType = "test event 3" + event3.timestamp = 1200 + amplitude!!.track(event3) + + val event4 = BaseEvent() + event4.eventType = "test event 4" + event4.timestamp = 1350 + amplitude!!.track(event4) + + advanceUntilIdle() + + val tracks = mutableListOf() + + verify { + mockedPlugin.track(capture(tracks)) + } + + tracks.sortBy { event -> event.eventId } + + Assertions.assertEquals(9, tracks.count()) + + tracks[0].let { + Assertions.assertEquals("session_start", it.eventType) + Assertions.assertEquals(1000L, it.timestamp) + } + tracks[1].let { + Assertions.assertEquals("test event 1", it.eventType) + Assertions.assertEquals(1000L, it.timestamp) + } + tracks[2].let { + Assertions.assertEquals("test event 2", it.eventType) + Assertions.assertEquals(1050L, it.timestamp) + } + tracks[3].let { + Assertions.assertEquals("session_end", it.eventType) + Assertions.assertEquals(1050L, it.timestamp) + } + tracks[4].let { + Assertions.assertEquals("session_start", it.eventType) + Assertions.assertEquals(1200L, it.timestamp) + } + tracks[5].let { + Assertions.assertEquals("test event 3", it.eventType) + Assertions.assertEquals(1200L, it.timestamp) + } + tracks[6].let { + Assertions.assertEquals("session_end", it.eventType) + Assertions.assertEquals(1200L, it.timestamp) + } + tracks[7].let { + Assertions.assertEquals("session_start", it.eventType) + Assertions.assertEquals(1350L, it.timestamp) + } + tracks[8].let { + Assertions.assertEquals("test event 4", it.eventType) + Assertions.assertEquals(1350L, it.timestamp) + } + } + } } diff --git a/core/src/main/java/com/amplitude/core/Amplitude.kt b/core/src/main/java/com/amplitude/core/Amplitude.kt index 002e7d41..238036cc 100644 --- a/core/src/main/java/com/amplitude/core/Amplitude.kt +++ b/core/src/main/java/com/amplitude/core/Amplitude.kt @@ -315,17 +315,32 @@ open class Amplitude internal constructor( return this } - private fun process(event: BaseEvent) { + protected fun process(event: BaseEvent) { if (configuration.optOut) { logger.info("Skip event for opt out config.") return } + + val sessionEvents = processEvent(event) + sessionEvents ?. let { + it.forEach { e -> + amplitudeScope.launch(amplitudeDispatcher) { + isBuilt.await() + timeline.process(e) + } + } + } + amplitudeScope.launch(amplitudeDispatcher) { isBuilt.await() timeline.process(event) } } + protected open fun processEvent(event: BaseEvent): Iterable? { + return null + } + /** * Add a plugin. * diff --git a/core/src/main/java/com/amplitude/core/events/BaseEvent.kt b/core/src/main/java/com/amplitude/core/events/BaseEvent.kt index 64c088a8..dd46c4a5 100644 --- a/core/src/main/java/com/amplitude/core/events/BaseEvent.kt +++ b/core/src/main/java/com/amplitude/core/events/BaseEvent.kt @@ -49,6 +49,9 @@ open class BaseEvent : EventOptions() { extra ?: let { extra = options.extra } callback ?: let { callback = options.callback } partnerId ?: let { partnerId = options.partnerId } + if (sessionId < 0) { + sessionId = options.sessionId + } } /**