Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
gthea committed Nov 20, 2024
1 parent 55003c3 commit 74a9281
Show file tree
Hide file tree
Showing 14 changed files with 94 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package io.split.android.client.service.impressions.observer;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static helper.IntegrationHelper.ResponseClosure.getSinceFromUri;
Expand All @@ -26,6 +24,8 @@
import fake.HttpClientMock;
import fake.HttpResponseMock;
import fake.HttpResponseMockDispatcher;
import fake.LifecycleManagerStub;
import fake.SynchronizerSpyImpl;
import helper.DatabaseHelper;
import helper.FileHelper;
import helper.IntegrationHelper;
Expand All @@ -38,6 +38,7 @@
import io.split.android.client.impressions.Impression;
import io.split.android.client.impressions.ImpressionListener;
import io.split.android.client.service.impressions.ImpressionsMode;
import io.split.android.client.service.synchronizer.SynchronizerSpy;
import io.split.android.client.storage.db.ImpressionEntity;
import io.split.android.client.storage.db.SplitRoomDatabase;
import io.split.android.client.utils.Json;
Expand All @@ -49,13 +50,18 @@ public class DedupeIntegrationTest {
private AtomicInteger mImpressionsListenerCount;
private HttpClientMock mHttpClient;
private SplitRoomDatabase mDatabase;
private LifecycleManagerStub mLifecycleManager;
private SynchronizerSpy mSynchronizerSpy;

@Before
public void setUp() throws IOException {
mImpressionsListenerCount = new AtomicInteger(0);
mDatabase = DatabaseHelper.getTestDatabase(mContext);
mDatabase.clearAllTables();
mHttpClient = new HttpClientMock(getDispatcher());
mLifecycleManager = new LifecycleManagerStub();
mSynchronizerSpy = new SynchronizerSpyImpl();
mLifecycleManager.register(mSynchronizerSpy);
}

@Test
Expand Down Expand Up @@ -210,21 +216,27 @@ public void expiredObserverCacheValuesExistingInDatabaseAreRemovedOnStartup() th
}

// wait for them to expire
Thread.sleep(1000);
Thread.sleep(2000);

// initialize SDK
SplitClient client = initSplitFactory(new TestableSplitConfigBuilder()
.impressionsMode(ImpressionsMode.DEBUG)
.streamingEnabled(false)
.enableDebug()
.impressionsDedupeTimeInterval(1)
.observerCacheExpirationPeriod(100), mHttpClient).client();
Thread.sleep(200);

client.getTreatment("FACUNDO_TEST");
Thread.sleep(2000);
Thread.sleep(100);
mLifecycleManager.simulateOnPause();
Thread.sleep(500);
mLifecycleManager.simulateOnResume();

while (mDatabase.impressionsObserverCacheDao().getAll(5).size() > 1) {
Thread.sleep(100);
}
Thread.sleep(100);
Thread.sleep(1000);

int count = mDatabase.impressionsObserverCacheDao().getAll(3000).size();
assertEquals(1, count);
Expand Down Expand Up @@ -311,7 +323,8 @@ private SplitFactory initSplitFactory(TestableSplitConfigBuilder builder, HttpCl
builder.build(),
mContext,
httpClient,
mDatabase);
mDatabase, mSynchronizerSpy, null,
mLifecycleManager);

SplitClient client = factory.client();
client.on(SplitEvent.SDK_READY, new TestingHelper.TestEventTask(innerLatch));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class ImpressionsObserverCacheImplIntegrationTest {
@Before
public void setUp() {
ImpressionsObserverCacheDao impressionsObserverCacheDao = DatabaseHelper.getTestDatabase(InstrumentationRegistry.getInstrumentation().getContext()).impressionsObserverCacheDao();
mPersistentStorage = new SqlitePersistentImpressionsObserverCacheStorage(impressionsObserverCacheDao, 2000, 1, Executors.newSingleThreadScheduledExecutor(), new AtomicBoolean(false));
mPersistentStorage = new SqlitePersistentImpressionsObserverCacheStorage(impressionsObserverCacheDao, 2000, Executors.newSingleThreadScheduledExecutor(), new AtomicBoolean(false));
mCache = new ListenableLruCache<>(5, mPersistentStorage);
mImpressionsObserverCacheImpl = new ImpressionsObserverCacheImpl(mPersistentStorage, mCache);
}
Expand Down Expand Up @@ -88,6 +88,7 @@ public void getPutsValueInCacheWhenRetrievedFromPersistentStorage() throws Inter
@Test
public void putPutsValueInCacheAndPersistentStorage() throws InterruptedException {
mImpressionsObserverCacheImpl.put(1L, 2L);
mImpressionsObserverCacheImpl.persist();
Thread.sleep(100);

assertEquals(2L, mPersistentStorage.get(1L).longValue());
Expand All @@ -98,6 +99,7 @@ public void putPutsValueInCacheAndPersistentStorage() throws InterruptedExceptio
public void putUpdatesValueInCacheAndPersistentStorage() throws InterruptedException {
mImpressionsObserverCacheImpl.put(1L, 2L);
mImpressionsObserverCacheImpl.put(1L, 3L);
mImpressionsObserverCacheImpl.persist();
Thread.sleep(100);

assertEquals(3L, mPersistentStorage.get(1L).longValue());
Expand All @@ -106,6 +108,7 @@ public void putUpdatesValueInCacheAndPersistentStorage() throws InterruptedExcep

private void putInStorageAndWait() throws InterruptedException {
mPersistentStorage.put(1L, 2L);
mPersistentStorage.persist();
Thread.sleep(100);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import androidx.test.platform.app.InstrumentationRegistry;

Expand Down Expand Up @@ -32,7 +34,7 @@ public class ImpressionsObserverTest {
@Before
public void setUp() {
ImpressionsObserverCacheDao dao = DatabaseHelper.getTestDatabase(InstrumentationRegistry.getInstrumentation().getContext()).impressionsObserverCacheDao();
mStorage = new SqlitePersistentImpressionsObserverCacheStorage(dao, 2000, 1, Executors.newSingleThreadScheduledExecutor(), new AtomicBoolean(false));
mStorage = new SqlitePersistentImpressionsObserverCacheStorage(dao, 2000, Executors.newSingleThreadScheduledExecutor(), new AtomicBoolean(false));
}

private List<Impression> generateImpressions(long count) {
Expand Down Expand Up @@ -89,6 +91,7 @@ public void testValuesArePersistedAcrossInstances() throws InterruptedException
// These are not in the cache, so they should return null
Long firstImp = observer.testAndSet(imp);
Long firstImp2 = observer.testAndSet(imp2);
observer.persist();
Thread.sleep(2);

// These are in the cache, so they should return a value
Expand Down Expand Up @@ -155,6 +158,15 @@ public void testAndSetWithNullImpressionReturnsNullPreviousTime() {
assertNull(observer.testAndSet(null));
}

@Test
public void persistCallsPersistOnStorage() {
ImpressionsObserverCache cache = mock(ImpressionsObserverCache.class);
ImpressionsObserver observer = new ImpressionsObserverImpl(cache);
observer.persist();

verify(cache).persist();
}

private void caller(ImpressionsObserver o, int count, ConcurrentLinkedQueue<Impression> imps) {

while (count-- > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import io.split.android.client.impressions.Impression;

public interface ImpressionsObserver {

@Nullable
Long testAndSet(Impression impression);

void persist();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ interface ImpressionsObserverCache {
Long get(long hash);

void put(long hash, long time);

void persist();
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ public void put(long hash, long time) {
}
}

@Override
public void persist() {
mLock.writeLock().lock();
try {
mPersistentStorage.persist();
} finally {
mLock.writeLock().unlock();
}
}

@Nullable
private Long getFromCache(long hash) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package io.split.android.client.service.impressions.observer;

import static io.split.android.client.utils.Utils.checkNotNull;

import androidx.annotation.Nullable;
import androidx.annotation.VisibleForTesting;

import io.split.android.client.impressions.Impression;
import io.split.android.client.service.impressions.ImpressionHasher;
Expand All @@ -10,7 +13,12 @@ public class ImpressionsObserverImpl implements ImpressionsObserver {
private final ImpressionsObserverCache mCache;

public ImpressionsObserverImpl(PersistentImpressionsObserverCacheStorage persistentStorage, int size) {
mCache = new ImpressionsObserverCacheImpl(persistentStorage, size);
this(new ImpressionsObserverCacheImpl(persistentStorage, size));
}

@VisibleForTesting
ImpressionsObserverImpl(ImpressionsObserverCache cache) {
mCache = checkNotNull(cache);
}

@Override
Expand All @@ -27,4 +35,9 @@ public Long testAndSet(Impression impression) {

return (previous == null ? null : Math.min(previous, impression.time()));
}

@Override
public void persist() {
mCache.persist();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ public class PeriodicPersistenceTask implements Runnable {
private final ImpressionsObserverCacheDao mImpressionsObserverCacheDao;
private final WeakReference<OnExecutedListener> mOnExecutedListener;

PeriodicPersistenceTask(Map<Long, Long> cache, ImpressionsObserverCacheDao impressionsObserverCacheDao, OnExecutedListener onExecutedListener
) {
PeriodicPersistenceTask(Map<Long, Long> cache, ImpressionsObserverCacheDao impressionsObserverCacheDao, OnExecutedListener onExecutedListener) {
mCache = cache;
mImpressionsObserverCacheDao = impressionsObserverCacheDao;
mOnExecutedListener = new WeakReference<>(onExecutedListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import io.split.android.client.storage.db.impressions.observer.ImpressionsObserverCacheDao;
Expand All @@ -21,11 +20,8 @@

public class SqlitePersistentImpressionsObserverCacheStorage implements PersistentImpressionsObserverCacheStorage {

private static final long DEFAULT_PERSISTENCE_DELAY = 1000;

private final ImpressionsObserverCacheDao mImpressionsObserverCacheDao;
private final long mExpirationPeriod;
private final long mPersistenceDelay;
private final ScheduledExecutorService mExecutorsService;
private final Map<Long, Long> mCache = new ConcurrentHashMap<>();
private final AtomicBoolean mDelayedSyncRunning;
Expand All @@ -35,7 +31,6 @@ public SqlitePersistentImpressionsObserverCacheStorage(@NonNull ImpressionsObser
long expirationPeriod) {
this(impressionsObserverCacheDao,
expirationPeriod,
DEFAULT_PERSISTENCE_DELAY,
new ScheduledThreadPoolExecutor(1,
new ThreadPoolExecutor.CallerRunsPolicy()),
new AtomicBoolean(false));
Expand All @@ -44,12 +39,10 @@ public SqlitePersistentImpressionsObserverCacheStorage(@NonNull ImpressionsObser
@VisibleForTesting
SqlitePersistentImpressionsObserverCacheStorage(@NonNull ImpressionsObserverCacheDao impressionsObserverCacheDao,
long expirationPeriod,
long persistenceDelay,
ScheduledExecutorService executorService,
AtomicBoolean delayedSyncRunning) {
mImpressionsObserverCacheDao = checkNotNull(impressionsObserverCacheDao);
mExpirationPeriod = expirationPeriod;
mPersistenceDelay = persistenceDelay;
mExecutorsService = executorService;
mDelayedSyncRunning = delayedSyncRunning;
mCallback = new PeriodicPersistenceTask.OnExecutedListener() {
Expand All @@ -65,13 +58,6 @@ public void onExecuted() {
@WorkerThread
public void put(long hash, long time) {
mCache.put(hash, time);

// If the task is not running, schedule it
if (mDelayedSyncRunning.compareAndSet(false, true)) {
mExecutorsService.schedule(new PeriodicPersistenceTask(mCache, mImpressionsObserverCacheDao, mCallback),
mPersistenceDelay,
TimeUnit.MILLISECONDS);
}
}

@Override
Expand All @@ -98,4 +84,11 @@ public void onRemoval(Long key) {
mCache.remove(key);
mImpressionsObserverCacheDao.delete(key);
}

@Override
public void persist() {
if (mDelayedSyncRunning.compareAndSet(false, true)) {
mExecutorsService.submit(new PeriodicPersistenceTask(mCache, mImpressionsObserverCacheDao, mCallback));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public void startPeriodicRecording() {
@Override
public void stopPeriodicRecording() {
mDebugTracker.stopPeriodicRecording();
mImpressionsObserver.persist();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public void startPeriodicRecording() {
@Override
public void stopPeriodicRecording() {
mOptimizedTracker.stopPeriodicRecording();
mImpressionsObserver.persist();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public void setUp() {
});
mStorage = new SqlitePersistentImpressionsObserverCacheStorage(mImpressionsObserverCacheDao,
EXPIRATION_PERIOD,
1,
mExecutorService,
mDelayedSyncRunning);
}
Expand Down Expand Up @@ -98,7 +97,7 @@ public void putCallsPeriodicSync() {
}

@Test
public void multiplePutCallsScheduleOneTaskWhenPeriodicSyncIsTrue() {
public void multiplePersistCallsSubmitsOneTaskWhenPeriodicSyncIsTrue() {
when(mExecutorService.schedule(
(Runnable) argThat(argument -> argument instanceof PeriodicPersistenceTask),
anyLong(),
Expand All @@ -110,10 +109,10 @@ public void multiplePutCallsScheduleOneTaskWhenPeriodicSyncIsTrue() {

mStorage.put(1, 2);
mStorage.put(3, 4);
mStorage.persist();
mStorage.persist();

verify(mExecutorService, times(1)).schedule(
(Runnable) argThat(argument -> argument instanceof PeriodicPersistenceTask),
eq(1L),
eq(TimeUnit.MILLISECONDS));
verify(mExecutorService, times(1)).submit(
(Runnable) argThat(argument -> argument instanceof PeriodicPersistenceTask));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ class DebugStrategyTest {
verify(tracker).stopPeriodicRecording()
}

@Test
fun `stopPeriodicRecording calls persist on observer`() {
strategy.stopPeriodicRecording()

verify(impressionsObserver).persist()
}

@Test
fun `enableTracking calls enableTracking on tracker`() {
strategy.enableTracking(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@ class OptimizedStrategyTest {
verify(tracker).stopPeriodicRecording()
}

@Test
fun `stopPeriodicRecording calls persist on ImpressionsObserver`() {
strategy.stopPeriodicRecording()

verify(impressionsObserver).persist()
}

@Test
fun `enableTracking calls enableTracking on tracker`() {
strategy.enableTracking(true)
Expand Down

0 comments on commit 74a9281

Please sign in to comment.