Skip to content

Commit

Permalink
NONE fetch algorithm support (#686)
Browse files Browse the repository at this point in the history
  • Loading branch information
gthea authored Aug 7, 2024
1 parent 394d2cd commit c7a80d7
Show file tree
Hide file tree
Showing 13 changed files with 143 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/instrumented.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
branches:
- 'development'
- 'master'
- '*_baseline'
# - '*_baseline' temporary disabled

jobs:
test:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ public class MyLargeSegmentsResponse {
@SerializedName("myLargeSegments")
private List<String> myLargeSegments;

@SerializedName("changeNumber")
private Long changeNumber;
@SerializedName("till")
private Long till;

@NonNull
public List<String> getMyLargeSegments() {
Expand All @@ -23,7 +23,7 @@ public List<String> getMyLargeSegments() {
}

@Nullable
public Long getChangeNumber() {
return changeNumber;
public Long getTill() {
return till;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

public enum HashingAlgorithm {
@SerializedName("0")
MURMUR3_32,
NONE,
@SerializedName("1")
MURMUR3_32,
@SerializedName("2")
MURMUR3_64
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ public MyLargeSegmentsNotificationProcessorImpl(@NonNull NotificationParser noti
}

@Override
public void process(MyLargeSegmentChangeNotification notification) {
long syncDelay = mSyncDelayCalculator.calculateSyncDelay(mUserKey, notification.getUpdateIntervalMs(), notification.getAlgorithmSeed());
public void process(@NonNull MyLargeSegmentChangeNotification notification) {
long syncDelay = mSyncDelayCalculator.calculateSyncDelay(mUserKey,
notification.getUpdateIntervalMs(),
notification.getAlgorithmSeed(),
notification.getUpdateStrategy(),
notification.getHashingAlgorithm());

mProcessorHelper.processUpdate(notification.getUpdateStrategy(),
notification.getData(),
notification.getCompression(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.split.android.client.service.sseclient.notifications.mysegments;

import io.split.android.client.service.sseclient.notifications.HashingAlgorithm;
import io.split.android.client.service.sseclient.notifications.MySegmentUpdateStrategy;

interface SyncDelayCalculator {
long calculateSyncDelay(String mUserKey, Long updateIntervalMs, Integer algorithmSeed);
long calculateSyncDelay(String key, Long updateIntervalMs, Integer algorithmSeed, MySegmentUpdateStrategy updateStrategy, HashingAlgorithm hashingAlgorithm);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@

import java.util.concurrent.TimeUnit;

import io.split.android.client.service.sseclient.notifications.HashingAlgorithm;
import io.split.android.client.service.sseclient.notifications.MySegmentUpdateStrategy;
import io.split.android.client.utils.MurmurHash3;

class SyncDelayCalculatorImpl implements SyncDelayCalculator {

public static final long DEFAULT_SYNC_INTERVAL_MS = TimeUnit.SECONDS.toMillis(60);

@Override
public long calculateSyncDelay(String key, Long updateIntervalMs, Integer algorithmSeed) {
public long calculateSyncDelay(String key, Long updateIntervalMs, Integer algorithmSeed, MySegmentUpdateStrategy updateStrategy, HashingAlgorithm hashingAlgorithm) {
boolean fetchNotification = updateStrategy == MySegmentUpdateStrategy.UNBOUNDED_FETCH_REQUEST ||
updateStrategy == MySegmentUpdateStrategy.BOUNDED_FETCH_REQUEST;
if (!fetchNotification || hashingAlgorithm == HashingAlgorithm.NONE) {
return 0L;
}

if (updateIntervalMs == null || updateIntervalMs <= 0) {
updateIntervalMs = DEFAULT_SYNC_INTERVAL_MS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class RetryBackoffCounterTimer implements SplitTaskExecutionListener {
private Long mInitialDelayInSeconds;
private SplitTask mTask;
private SplitTaskExecutionListener mListener;
private String mTaskId;
private volatile String mTaskId;

/**
* Creates an instance which retries tasks indefinitely, using the strategy defined by backoffCounter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
import static io.split.android.client.ServiceEndpoints.EndpointValidator.streamingEndpointIsOverridden;
import static io.split.android.client.ServiceEndpoints.EndpointValidator.telemetryEndpointIsOverridden;

import android.os.Build;

import androidx.annotation.NonNull;

import io.split.android.client.SplitClientConfig;
import io.split.android.client.service.impressions.ImpressionsMode;
import io.split.android.client.telemetry.model.Config;
import io.split.android.client.telemetry.model.RefreshRates;
import io.split.android.client.telemetry.model.UrlOverrides;
import io.split.android.client.utils.logger.Logger;

public class TelemetryConfigProviderImpl implements TelemetryConfigProvider {

Expand All @@ -36,6 +39,7 @@ public TelemetryConfigProviderImpl(@NonNull TelemetryStorageConsumer telemetryCo
@Override
public Config getConfigTelemetry() {
Config config = new Config();
addDefaultTags(mSplitClientConfig);
config.setStreamingEnabled(mSplitClientConfig.streamingEnabled());
config.setRefreshRates(buildRefreshRates(mSplitClientConfig));
config.setTags(mTelemetryConsumer.popTags());
Expand Down Expand Up @@ -87,4 +91,20 @@ private UrlOverrides buildUrlOverrides(SplitClientConfig splitClientConfig) {

return urlOverrides;
}

private void addDefaultTags(SplitClientConfig mSplitClientConfig) {
try {
TelemetryRuntimeProducer producer = (TelemetryRuntimeProducer) mTelemetryConsumer;
int sdkInt = Build.VERSION.SDK_INT;
if (sdkInt > 0) {
producer.addTag("av:" + sdkInt);
}

if (mSplitClientConfig.synchronizeInBackground()) {
producer.addTag("bgr:" + mSplitClientConfig.backgroundSyncPeriod());
}
} catch (ClassCastException ex) {
Logger.d("Telemetry storage is not a producer");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@

import static io.split.android.client.utils.Utils.checkNotNull;

import android.os.Build;

import androidx.annotation.NonNull;

import io.split.android.client.storage.mysegments.MySegmentsStorageContainer;
import io.split.android.client.storage.splits.SplitsStorage;
import io.split.android.client.telemetry.model.EventsDataRecordsEnum;
import io.split.android.client.telemetry.model.ImpressionsDataType;
import io.split.android.client.telemetry.model.Stats;
import io.split.android.client.utils.logger.Logger;

public class TelemetryStatsProviderImpl implements TelemetryStatsProvider {

Expand Down Expand Up @@ -49,6 +52,7 @@ public void clearStats() {

private Stats buildStats() {
Stats stats = new Stats();
addDefaultTags();

stats.setStreamingEvents(mTelemetryStorageConsumer.popStreamingEvents());
stats.setSplitCount(mSplitsStorage.getAll().size());
Expand All @@ -74,4 +78,16 @@ private Stats buildStats() {

return stats;
}

private void addDefaultTags() {
try {
TelemetryRuntimeProducer producer = (TelemetryRuntimeProducer) mTelemetryStorageConsumer;
int sdkInt = Build.VERSION.SDK_INT;
if (sdkInt > 0) {
producer.addTag("av:" + sdkInt);
}
} catch (ClassCastException ex) {
Logger.d("Telemetry storage is not a producer");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.split.android.client.service.executor.SplitTask;
import io.split.android.client.service.executor.SplitTaskBatchItem;
Expand Down Expand Up @@ -130,7 +131,7 @@ public void simpleScheduled() throws InterruptedException {
latch.await(10, TimeUnit.SECONDS);

Assert.assertTrue(task.taskHasBeenCalled);
Assert.assertEquals(4, task.callCount);
Assert.assertEquals(4, task.callCount.get());
}

@Test
Expand All @@ -152,7 +153,7 @@ public void multipleScheduled() throws InterruptedException {
for (int i = 0; i < taskCount; i++) {
Assert.assertTrue(taskList.get(i).taskHasBeenCalled);
}
Assert.assertTrue(4 < taskList.get(0).callCount);
Assert.assertTrue(4 < taskList.get(0).callCount.get());
}

@Test
Expand Down Expand Up @@ -192,22 +193,22 @@ public void pauseScheduled() throws InterruptedException {
mTaskExecutor.schedule(
task1, 6L, 20, mock(SplitTaskExecutionListener.class));
latch.await(5, TimeUnit.SECONDS);
int countBeforePause = task.callCount;
int count1BeforePause = task1.callCount;
int countBeforePause = task.callCount.get();
int count1BeforePause = task1.callCount.get();

mTaskExecutor.pause();

// Using sleep to make a pause in this thread
// then resumes task executor
// and wait for task 1 latch
sleep(3);
int countAfterPause = task.callCount;
int count1AfterPause = task1.callCount;
int countAfterPause = task.callCount.get();
int count1AfterPause = task1.callCount.get();

mTaskExecutor.resume();
latch1.await(5, TimeUnit.SECONDS);

int count1AfterResume = task1.callCount;
int count1AfterResume = task1.callCount.get();

Assert.assertEquals(3, countBeforePause);
Assert.assertEquals(0, count1BeforePause);
Expand Down Expand Up @@ -250,7 +251,7 @@ public void exceptionInScheduled() throws InterruptedException {
latch.await(10, TimeUnit.SECONDS);

Assert.assertTrue(task.taskHasBeenCalled);
Assert.assertEquals(4, task.callCount);
Assert.assertEquals(4, task.callCount.get());
}

@Test
Expand Down Expand Up @@ -278,7 +279,7 @@ public void stopStartedTask() throws InterruptedException {

assertTrue(task.taskHasBeenCalled);
assertTrue(testListener.taskExecutedCalled);
assertEquals(2, task.callCount);
assertEquals(2, task.callCount.get());
}

@Test
Expand All @@ -293,7 +294,7 @@ public void scheduleDelayedTask() throws InterruptedException {
listenerLatch.await(2L, TimeUnit.SECONDS);
assertTrue(task.taskHasBeenCalled);
assertTrue(testListener.taskExecutedCalled);
assertEquals(1, task.callCount);
assertEquals(1, task.callCount.get());
}

@Test
Expand All @@ -314,7 +315,7 @@ public void schedulingAfterShutdown() throws InterruptedException {

assertTrue(task.taskHasBeenCalled);
assertTrue(testListener.taskExecutedCalled);
assertEquals(2, task.callCount);
assertEquals(2, task.callCount.get());
assertFalse(newTask.taskHasBeenCalled);
}

Expand All @@ -338,13 +339,13 @@ static class TestTask implements SplitTask {
}

public boolean shouldThrowException = false;
public int callCount = 0;
public AtomicInteger callCount = new AtomicInteger(0);
public boolean taskHasBeenCalled = false;

@NonNull
@Override
public SplitTaskExecutionInfo execute() {
callCount++;
callCount.incrementAndGet();
taskHasBeenCalled = true;
latch.countDown();
if (shouldThrowException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class NotificationParserTest {

private final static String ERROR = "{\"id\":\"null\",\"name\":\"error\",\"comment\":\"[no comments]\",\"data\":\"{\\\"message\\\":\\\"Invalid token; capability must be a string\\\",\\\"code\\\":40144,\\\"statusCode\\\":400,\\\"href\\\":\\\"https://help.ably.io/error/40144\\\"}\"}";

private static final String MY_LARGE_SEGMENTS_UPDATE = "{\"id\": \"diSrQttrC9:0:0\",\"clientId\": \"pri:MjcyNDE2NDUxMA==\",\"timestamp\": 1702507131100,\"encoding\": \"json\",\"channel\": \"NzM2MDI5Mzc0_MTc1MTYwODQxMQ==_mylargesegments\",\"data\": \"{\\\"type\\\":\\\"MY_LARGE_SEGMENT_UPDATE\\\",\\\"changeNumber\\\":1702507130121,\\\"largeSegments\\\":[\\\"android_test\\\"],\\\"c\\\":2,\\\"u\\\":2,\\\"d\\\":\\\"eJwEwLsRwzAMA9BdWKsg+IFBraJTkRXS5rK7388+tg+KdC8+jq4eBBQLFcUnO8FAAC36gndOSEyFqJFP32Vf2+f+3wAAAP//hUQQ9A==\\\",\\\"i\\\":100,\\\"h\\\":0,\\\"s\\\":325}\"}";
private static final String MY_LARGE_SEGMENTS_UPDATE = "{\"id\": \"diSrQttrC9:0:0\",\"clientId\": \"pri:MjcyNDE2NDUxMA==\",\"timestamp\": 1702507131100,\"encoding\": \"json\",\"channel\": \"NzM2MDI5Mzc0_MTc1MTYwODQxMQ==_mylargesegments\",\"data\": \"{\\\"type\\\":\\\"MY_LARGE_SEGMENT_UPDATE\\\",\\\"changeNumber\\\":1702507130121,\\\"largeSegments\\\":[\\\"android_test\\\"],\\\"c\\\":2,\\\"u\\\":2,\\\"d\\\":\\\"eJwEwLsRwzAMA9BdWKsg+IFBraJTkRXS5rK7388+tg+KdC8+jq4eBBQLFcUnO8FAAC36gndOSEyFqJFP32Vf2+f+3wAAAP//hUQQ9A==\\\",\\\"i\\\":100,\\\"h\\\":1,\\\"s\\\":325}\"}";

@Before
public void setup() {
Expand Down Expand Up @@ -170,7 +170,7 @@ public void parseMyLargeSegmentsIncomingNotification() {
IncomingNotification incoming = mParser.parseIncoming(MY_LARGE_SEGMENTS_UPDATE);

assertEquals(MY_LARGE_SEGMENT_UPDATE, incoming.getType());
assertEquals("{\"type\":\"MY_LARGE_SEGMENT_UPDATE\",\"changeNumber\":1702507130121,\"largeSegments\":[\"android_test\"],\"c\":2,\"u\":2,\"d\":\"eJwEwLsRwzAMA9BdWKsg+IFBraJTkRXS5rK7388+tg+KdC8+jq4eBBQLFcUnO8FAAC36gndOSEyFqJFP32Vf2+f+3wAAAP//hUQQ9A==\",\"i\":100,\"h\":0,\"s\":325}", incoming.getJsonData());
assertEquals("{\"type\":\"MY_LARGE_SEGMENT_UPDATE\",\"changeNumber\":1702507130121,\"largeSegments\":[\"android_test\"],\"c\":2,\"u\":2,\"d\":\"eJwEwLsRwzAMA9BdWKsg+IFBraJTkRXS5rK7388+tg+KdC8+jq4eBBQLFcUnO8FAAC36gndOSEyFqJFP32Vf2+f+3wAAAP//hUQQ9A==\",\"i\":100,\"h\":1,\"s\":325}", incoming.getJsonData());
assertEquals("NzM2MDI5Mzc0_MTc1MTYwODQxMQ==_mylargesegments", incoming.getChannel());
assertEquals(1702507131100L, incoming.getTimestamp());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.split.android.client.service.sseclient.notifications.mysegments;

import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -13,6 +12,7 @@
import java.util.concurrent.BlockingQueue;

import io.split.android.client.common.CompressionType;
import io.split.android.client.service.sseclient.notifications.HashingAlgorithm;
import io.split.android.client.service.sseclient.notifications.MyLargeSegmentChangeNotification;
import io.split.android.client.service.sseclient.notifications.MySegmentUpdateStrategy;

Expand Down Expand Up @@ -40,11 +40,12 @@ public void processDelegatesToHelper() {
MyLargeSegmentChangeNotification notification = mock(MyLargeSegmentChangeNotification.class);
when(notification.getCompression()).thenReturn(CompressionType.GZIP);
when(notification.getData()).thenReturn("dummy");
when(notification.getHashingAlgorithm()).thenReturn(HashingAlgorithm.MURMUR3_32);
when(notification.getLargeSegments()).thenReturn(new HashSet<>(Arrays.asList("segment1", "segment2")));
when(notification.getUpdateIntervalMs()).thenReturn(1000L);
when(notification.getAlgorithmSeed()).thenReturn(1234);
when(notification.getUpdateStrategy()).thenReturn(MySegmentUpdateStrategy.BOUNDED_FETCH_REQUEST);
when(mSyncDelayCalculator.calculateSyncDelay("key", 1000L, 1234)).thenReturn(25L);
when(mSyncDelayCalculator.calculateSyncDelay("key", 1000L, 1234, MySegmentUpdateStrategy.BOUNDED_FETCH_REQUEST, HashingAlgorithm.MURMUR3_32)).thenReturn(25L);

mNotificationProcessor.process(notification);

Expand All @@ -61,10 +62,12 @@ public void delayIsCalculatedWithCalculator() {
MyLargeSegmentChangeNotification notification = mock(MyLargeSegmentChangeNotification.class);
when(notification.getUpdateIntervalMs()).thenReturn(1000L);
when(notification.getAlgorithmSeed()).thenReturn(1234);
when(mSyncDelayCalculator.calculateSyncDelay("key", 1000L, 1234)).thenReturn(25L);
when(notification.getUpdateStrategy()).thenReturn(MySegmentUpdateStrategy.UNBOUNDED_FETCH_REQUEST);
when(notification.getHashingAlgorithm()).thenReturn(HashingAlgorithm.MURMUR3_32);
when(mSyncDelayCalculator.calculateSyncDelay("key", 1000L, 1234, MySegmentUpdateStrategy.UNBOUNDED_FETCH_REQUEST, HashingAlgorithm.MURMUR3_32)).thenReturn(25L);

mNotificationProcessor.process(notification);

verify(mSyncDelayCalculator).calculateSyncDelay("key", 1000L, 1234);
verify(mSyncDelayCalculator).calculateSyncDelay("key", 1000L, 1234, MySegmentUpdateStrategy.UNBOUNDED_FETCH_REQUEST, HashingAlgorithm.MURMUR3_32);
}
}
Loading

0 comments on commit c7a80d7

Please sign in to comment.