Skip to content

Commit

Permalink
upload offloaded to workmanager
Browse files Browse the repository at this point in the history
  • Loading branch information
anonvt committed Aug 7, 2024
1 parent 5bf0c78 commit 5c831fa
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 48 deletions.
3 changes: 2 additions & 1 deletion TopsortAnalytics/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ dependencies {
implementation 'androidx.core:core-ktx:1.10.1'

//Work Manager
implementation "androidx.work:work-runtime-ktx:2.8.1"
implementation "androidx.work:work-runtime-ktx:2.9.0"
implementation("androidx.datastore:datastore-preferences:1.1.1")

//JodaTime
Expand All @@ -51,4 +51,5 @@ dependencies {
androidTestImplementation 'org.assertj:assertj-core:3.26.0'
androidTestImplementation 'androidx.test.ext:junit:1.1.5'
androidTestImplementation 'androidx.test.espresso:espresso-core:3.5.1'
androidTestImplementation("androidx.work:work-testing:2.9.0")
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
package com.topsort.analytics

import android.util.Log
import androidx.test.ext.junit.runners.AndroidJUnit4
import androidx.test.platform.app.InstrumentationRegistry
import androidx.work.Configuration
import androidx.work.WorkInfo
import androidx.work.WorkManager
import androidx.work.WorkQuery
import androidx.work.impl.utils.SynchronousExecutor
import androidx.work.testing.WorkManagerTestInitHelper
import com.topsort.analytics.core.Logger
import com.topsort.analytics.model.Click
import com.topsort.analytics.model.ClickEvent
import com.topsort.analytics.model.Event
Expand All @@ -11,20 +19,39 @@ import com.topsort.analytics.model.Purchase
import com.topsort.analytics.model.PurchaseEvent
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import org.assertj.core.api.Assertions.assertThat
import org.json.JSONArray
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith

@ExperimentalCoroutinesApi
@RunWith(AndroidJUnit4::class)
class EventPipelineTest {

@Test
fun impressions_are_batched() {
lateinit var workManager: WorkManager

@Before
fun setup() {
val appContext = InstrumentationRegistry.getInstrumentation().targetContext

val config = Configuration.Builder()
.setMinimumLoggingLevel(Log.DEBUG)
.setExecutor(SynchronousExecutor())
.build()

// Initialize WorkManager for instrumentation tests.
WorkManagerTestInitHelper.initializeTestWorkManager(appContext, config)

workManager = WorkManager.getInstance(appContext)

EventPipeline.setup(appContext)
Logger.log.clear()
}

@Test
fun impressions_are_batched() {
val impressions1 = listOf(
getImpressionPromoted(),
getImpressionOrganic()
Expand All @@ -36,8 +63,14 @@ import org.junit.runner.RunWith

runBlocking {
EventPipeline.clear()
val job1 = EventPipeline.storeImpression(ImpressionEvent(impressions1))
val job2 = EventPipeline.storeImpression(ImpressionEvent(impressions2))
val job1 = EventPipeline.storeImpression(
ImpressionEvent(impressions1),
shouldFlush = false
)
val job2 = EventPipeline.storeImpression(
ImpressionEvent(impressions2),
shouldFlush = false
)

// Make sure they're persisted
job1.join()
Expand All @@ -46,15 +79,12 @@ import org.junit.runner.RunWith
val storedStr = EventPipeline.readImpressions()
val storedDeserialized = Impression.Factory.fromJsonArray(JSONArray("[$storedStr]"))

assertThat(storedDeserialized).containsExactlyInAnyOrderElementsOf(impressions1+impressions2)
assertThat(storedDeserialized).containsExactlyInAnyOrderElementsOf(impressions1 + impressions2)
}
}

@Test
fun clicks_are_batched() {
val appContext = InstrumentationRegistry.getInstrumentation().targetContext
EventPipeline.setup(appContext)

val clicks1 = listOf(
getClickPromoted(),
getClickOrganic(),
Expand All @@ -66,8 +96,14 @@ import org.junit.runner.RunWith

runBlocking {
EventPipeline.clear()
val job1 = EventPipeline.storeClick(ClickEvent(clicks1))
val job2 = EventPipeline.storeClick(ClickEvent(clicks2))
val job1 = EventPipeline.storeClick(
ClickEvent(clicks1),
shouldFlush = false
)
val job2 = EventPipeline.storeClick(
ClickEvent(clicks2),
shouldFlush = false
)

// Make sure they're persisted
job1.join()
Expand All @@ -76,15 +112,12 @@ import org.junit.runner.RunWith
val storedStr = EventPipeline.readClicks()
val storedDeserialized = Click.Factory.fromJsonArray(JSONArray("[$storedStr]"))

assertThat(storedDeserialized).containsExactlyInAnyOrderElementsOf(clicks1+clicks2)
assertThat(storedDeserialized).containsExactlyInAnyOrderElementsOf(clicks1 + clicks2)
}
}

@Test
fun purchases_are_batched() {
val appContext = InstrumentationRegistry.getInstrumentation().targetContext
EventPipeline.setup(appContext)

val purchases1 = listOf(
getRandomPurchase(),
getRandomPurchase(),
Expand All @@ -96,8 +129,14 @@ import org.junit.runner.RunWith

runBlocking {
EventPipeline.clear()
val job1 = EventPipeline.storePurchase(PurchaseEvent(purchases1))
val job2 = EventPipeline.storePurchase(PurchaseEvent(purchases2))
val job1 = EventPipeline.storePurchase(
PurchaseEvent(purchases1),
shouldFlush = false
)
val job2 = EventPipeline.storePurchase(
PurchaseEvent(purchases2),
shouldFlush = false
)

// Make sure they're persisted
job1.join()
Expand All @@ -106,15 +145,12 @@ import org.junit.runner.RunWith
val storedStr = EventPipeline.readPurchases()
val storedDeserialized = Purchase.fromJsonArray(JSONArray("[$storedStr]"))

assertThat(storedDeserialized).containsExactlyInAnyOrderElementsOf(purchases1+purchases2)
assertThat(storedDeserialized).containsExactlyInAnyOrderElementsOf(purchases1 + purchases2)
}
}

@Test
fun aggregate_joins_Events() {
val appContext = InstrumentationRegistry.getInstrumentation().targetContext
EventPipeline.setup(appContext)

val impressions = listOf(
getImpressionPromoted(),
getImpressionOrganic()
Expand All @@ -137,9 +173,18 @@ import org.junit.runner.RunWith
clicks = clicks,
purchases = purchases
)
val job1 = EventPipeline.storeImpression(ImpressionEvent(impressions))
val job2 = EventPipeline.storeClick(ClickEvent(clicks))
val job3 = EventPipeline.storePurchase(PurchaseEvent(purchases))
val job1 = EventPipeline.storeImpression(
ImpressionEvent(impressions),
shouldFlush = false
)
val job2 = EventPipeline.storeClick(
ClickEvent(clicks),
shouldFlush = false
)
val job3 = EventPipeline.storePurchase(
PurchaseEvent(purchases),
shouldFlush = false
)

// Make sure they're persisted
job1.join()
Expand All @@ -151,4 +196,39 @@ import org.junit.runner.RunWith
assertThat(aggregated).isEqualTo(storedEvent)
}
}

@Test
fun events_are_uploaded() {
val impressions1 = listOf(
getImpressionPromoted(),
getImpressionOrganic()
)
val impressions2 = listOf(
getImpressionPromoted(),
getImpressionOrganic()
)

val aggregated = Event(
impressions = impressions1 + impressions2,
)

runBlocking {
EventPipeline.clear()

EventPipeline.storeImpression(ImpressionEvent(impressions1), shouldFlush = false)
EventPipeline.storeImpression(ImpressionEvent(impressions2))
.join()

// Wait for the upload work to be done
while (
workManager.getWorkInfos(WorkQuery.fromUniqueWorkNames("UPLOAD"))
.get().first().state != WorkInfo.State.SUCCEEDED
) {
yield()
}

}

assertThat(Logger.log).contains("uploading: ${aggregated.toJsonObject()}")
}
}
Loading

0 comments on commit 5c831fa

Please sign in to comment.