Skip to content

Commit

Permalink
Memberships sync improvements (#697)
Browse files Browse the repository at this point in the history
  • Loading branch information
gthea authored Sep 3, 2024
2 parents 602527c + bba392a commit 5563a5a
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,20 @@ public SplitTaskExecutionInfo execute() {
long startTime = System.currentTimeMillis();
long latency = 0;
try {
// if target change number is outdated, we don't need to fetch
if (targetChangeNumberIsOutdated()) {
Logger.v("Target CN is outdated. Skipping membership fetch");
return SplitTaskExecutionInfo.success(mTaskType);
}

fetch(mOnDemandFetchBackoffMaxRetries);

long now = System.currentTimeMillis();
latency = now - startTime;

mTelemetryRuntimeProducer.recordSuccessfulSync(mTelemetryOperationType, now);
} catch (HttpFetcherException e) {
logError("Network error while retrieving my segments: " + e.getLocalizedMessage());
logError("Network error while retrieving memberships: " + e.getLocalizedMessage());
mTelemetryRuntimeProducer.recordSyncError(mTelemetryOperationType, e.getHttpStatus());

if (HttpStatus.isNotRetryable(HttpStatus.fromCode(e.getHttpStatus()))) {
Expand All @@ -136,7 +142,7 @@ public SplitTaskExecutionInfo execute() {

return SplitTaskExecutionInfo.error(mTaskType);
} catch (Exception e) {
logError("Unknown error while retrieving my segments: " + e.getLocalizedMessage());
logError("Unknown error while retrieving memberships: " + e.getLocalizedMessage());
return SplitTaskExecutionInfo.error(mTaskType);
} finally {
mTelemetryRuntimeProducer.recordSyncLatency(mTelemetryOperationType, latency);
Expand All @@ -145,6 +151,32 @@ public SplitTaskExecutionInfo execute() {
return SplitTaskExecutionInfo.success(mTaskType);
}

private boolean targetChangeNumberIsOutdated() {
long segmentsTarget = Utils.getOrDefault(mTargetSegmentsChangeNumber, -1L);
long largeSegmentsTarget = Utils.getOrDefault(mTargetLargeSegmentsChangeNumber, -1L);

long msStorageChangeNumber = mMySegmentsStorage.getTill();
long lsStorageChangeNumber = mMyLargeSegmentsStorage.getTill();

// In case both targets are present, both CN in storage should be newer for the targets to be considered outdated
if (mTargetSegmentsChangeNumber != null && mTargetLargeSegmentsChangeNumber != null) {
return segmentsTarget <= msStorageChangeNumber && largeSegmentsTarget <= lsStorageChangeNumber;
}

// If only LS target is set, there's no need to check MS storage CN
if (mTargetLargeSegmentsChangeNumber != null) {
return largeSegmentsTarget <= lsStorageChangeNumber;
}

// If only MS target is set, there's no need to check LS storage CN
if (mTargetSegmentsChangeNumber != null) {
return segmentsTarget <= msStorageChangeNumber;
}

// If no targets are set, consider it not outdated
return false;
}

private void fetch(int initialRetries) throws HttpFetcherException, InterruptedException {
int remainingRetries = initialRetries;
mBackoffCounter.resetCounter();
Expand Down Expand Up @@ -218,7 +250,7 @@ private static UpdateSegmentsResult updateSegments(SegmentsChange segmentsChange
}

private void logError(String message) {
Logger.e("Error while executing my segments sync task: " + message);
Logger.e("Error while executing memberships sync task: " + message);
}

private @Nullable Map<String, String> getHeaders() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,17 @@ public String getData() {
return data;
}

@Nullable
public Long getUpdateIntervalMs() {
return updateIntervalMs;
}

@Nullable
public HashingAlgorithm getHashingAlgorithm() {
return hashingAlgorithm;
}

@Nullable
public Integer getAlgorithmSeed() {
return algorithmSeed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void handleIncomingMessage(Map<String, String> values) {
}
break;
default:
Logger.w("SSE Handler: Unknown notification");
Logger.w("SSE Handler: Unknown notification: " + incomingNotification.getType());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,62 @@ public void segmentsTargetIsUsedForCdnBypassWhenLargeSegmentsChangeNumberIsSmall
verify(mMySegmentsFetcher).execute(Collections.singletonMap("till", 5L), null);
}

@Test
public void noFetchWhenSegmentsChangeNumberInStorageIsNewerThanTarget() throws HttpFetcherException {
long targetSegmentsChangeNumber = 5L;
when(mySegmentsStorage.getTill()).thenReturn(6L);
when(myLargeSegmentsStorage.getTill()).thenReturn(5L);

mTask = new MySegmentsSyncTask(mMySegmentsFetcher, mySegmentsStorage, myLargeSegmentsStorage, false, mEventsManager, mMySegmentsChangeChecker, mTelemetryRuntimeProducer, MySegmentsSyncTaskConfig.get(), targetSegmentsChangeNumber, null, mock(BackoffCounter.class), 1);
mTask.execute();

verify(mMySegmentsFetcher, never()).execute(any(), any());
}

@Test
public void noFetchWhenLargeSegmentsChangeNumberIsNewerThanTarget() throws HttpFetcherException {
long targetLargeSegmentsChangeNumber = 4L;
when(mySegmentsStorage.getTill()).thenReturn(3L);
when(myLargeSegmentsStorage.getTill()).thenReturn(5L);

mTask = new MySegmentsSyncTask(mMySegmentsFetcher, mySegmentsStorage, myLargeSegmentsStorage, false, mEventsManager, mMySegmentsChangeChecker, mTelemetryRuntimeProducer, MySegmentsSyncTaskConfig.get(), null, targetLargeSegmentsChangeNumber, mock(BackoffCounter.class), 1);
mTask.execute();

verify(mMySegmentsFetcher, never()).execute(any(), any());
}

@Test
public void fetchWhenSegmentsChangeNumberInStorageIsNewerThanTargetAndLargeSegmentsChangeNumberIsOlder() throws HttpFetcherException {
long targetSegmentsChangeNumber = 5L;
long targetLargeSegmentsChangeNumber = 4L;
when(mySegmentsStorage.getTill()).thenReturn(6L);
when(myLargeSegmentsStorage.getTill()).thenReturn(3L);
when(mMySegmentsFetcher.execute(noParams, null))
.thenReturn(createChange(6L, 4L));

mTask = new MySegmentsSyncTask(mMySegmentsFetcher, mySegmentsStorage, myLargeSegmentsStorage, false, mEventsManager, mMySegmentsChangeChecker, mTelemetryRuntimeProducer, MySegmentsSyncTaskConfig.get(), targetSegmentsChangeNumber, targetLargeSegmentsChangeNumber, mock(BackoffCounter.class), 1);
mTask.execute();

verify(mMySegmentsFetcher).execute(any(), any());
verify(mMySegmentsFetcher).execute(noParams, null);
}

@Test
public void fetchIsPerformedWhenLargeSegmentsChangeNumberInStorageIsNewerThanTargetAndSegmentsChangeNumberIsOlder() throws HttpFetcherException {
long targetSegmentsChangeNumber = 5L;
long targetLargeSegmentsChangeNumber = 4L;
when(mySegmentsStorage.getTill()).thenReturn(3L);
when(myLargeSegmentsStorage.getTill()).thenReturn(6L);
when(mMySegmentsFetcher.execute(noParams, null))
.thenReturn(createChange(5L, 6L));

mTask = new MySegmentsSyncTask(mMySegmentsFetcher, mySegmentsStorage, myLargeSegmentsStorage, false, mEventsManager, mMySegmentsChangeChecker, mTelemetryRuntimeProducer, MySegmentsSyncTaskConfig.get(), targetSegmentsChangeNumber, targetLargeSegmentsChangeNumber, mock(BackoffCounter.class), 1);
mTask.execute();

verify(mMySegmentsFetcher).execute(any(), any());
verify(mMySegmentsFetcher).execute(noParams, null);
}

@NonNull
private static AllSegmentsChange createChange(Long msChangeNumber, Long lsChangeNumber) {
return AllSegmentsChange.create(
Expand Down

0 comments on commit 5563a5a

Please sign in to comment.