Skip to content

Commit

Permalink
add cohortRequestDelayMillis, use sleep util, skip retry if maxCohortSize error
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Jul 30, 2024
1 parent 1010ec4 commit 13913f1
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 39 deletions.
1 change: 1 addition & 0 deletions packages/node/src/local/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export class LocalEvaluationClient {
httpClient,
this.config.cohortConfig?.cohortServerUrl,
this.config.cohortConfig?.maxCohortSize,
this.config.cohortConfig?.cohortRequestDelayMillis,
this.config.debug,
);
this.cohortUpdater = new CohortPoller(
Expand Down
10 changes: 8 additions & 2 deletions packages/node/src/local/cohort/cohort-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ export interface CohortApi {
getCohort(options?: GetCohortOptions): Promise<Cohort>;
}

// Thrown when the cohort service responds 413: the cohort is larger than the
// configured maxCohortSize. Callers treat this as permanent and skip retries.
export class CohortMaxSizeExceededError extends Error {}

// Thrown for any other non-success cohort download response (unexpected HTTP
// status); unlike the max-size error, callers may retry this.
export class CohortDownloadError extends Error {}

export class SdkCohortApi implements CohortApi {
private readonly cohortApiKey;
private readonly serverUrl;
Expand Down Expand Up @@ -72,9 +76,11 @@ export class SdkCohortApi implements CohortApi {
} else if (response.status == 204) {
return undefined;
} else if (response.status == 413) {
throw Error(`Cohort error response: size > ${options.maxCohortSize}`);
throw new CohortMaxSizeExceededError(
`Cohort error response: size > ${options.maxCohortSize}`,
);
} else {
throw Error(
throw new CohortDownloadError(
`Cohort error response: status ${response.status}, body ${response.body}`,
);
}
Expand Down
47 changes: 23 additions & 24 deletions packages/node/src/local/cohort/fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,23 @@ import { WrapperClient } from '../../transport/http';
import { Cohort } from '../../types/cohort';
import { CohortConfigDefaults } from '../../types/config';
import { HttpClient } from '../../types/transport';
import { BackoffPolicy, doWithBackoffFailLoudly } from '../../util/backoff';
import { BackoffPolicy } from '../../util/backoff';
import { ConsoleLogger, Logger } from '../../util/logger';
import { Mutex, Executor } from '../../util/threading';
import { sleep } from '../../util/time';

import { SdkCohortApi } from './cohort-api';
import { CohortMaxSizeExceededError, SdkCohortApi } from './cohort-api';

export const COHORT_CONFIG_TIMEOUT = 20000;

const BACKOFF_POLICY: BackoffPolicy = {
attempts: 3,
min: 1000,
max: 1000,
scalar: 1,
};
const ATTEMPTS = 3;

export class CohortFetcher {
private readonly logger: Logger;

readonly cohortApi: SdkCohortApi;
readonly maxCohortSize: number;
readonly cohortRequestDelayMillis: number;

private readonly inProgressCohorts: Record<
string,
Expand All @@ -37,6 +34,7 @@ export class CohortFetcher {
httpClient: HttpClient,
serverUrl = CohortConfigDefaults.cohortServerUrl,
maxCohortSize = CohortConfigDefaults.maxCohortSize,
cohortRequestDelayMillis = 100,
debug = false,
) {
this.cohortApi = new SdkCohortApi(
Expand All @@ -45,6 +43,7 @@ export class CohortFetcher {
new WrapperClient(httpClient),
);
this.maxCohortSize = maxCohortSize;
this.cohortRequestDelayMillis = cohortRequestDelayMillis;
this.logger = new ConsoleLogger(debug);
}

Expand All @@ -63,32 +62,32 @@ export class CohortFetcher {
if (!this.inProgressCohorts[key]) {
this.inProgressCohorts[key] = this.executor.run(async () => {
this.logger.debug('Start downloading', cohortId);
const cohort = await doWithBackoffFailLoudly<Cohort>(
async () =>
this.cohortApi.getCohort({
for (let i = 0; i < ATTEMPTS; i++) {
try {
const cohort = await this.cohortApi.getCohort({
libraryName: 'experiment-node-server',
libraryVersion: PACKAGE_VERSION,
cohortId: cohortId,
maxCohortSize: this.maxCohortSize,
lastModified: lastModified,
timeoutMillis: COHORT_CONFIG_TIMEOUT,
}),
BACKOFF_POLICY,
)
.then(async (cohort) => {
});
// Do unlock before return.
const unlock = await this.mutex.lock();
delete this.inProgressCohorts[key];
unlock();
this.logger.debug('Stop downloading', cohortId);
return cohort;
})
.catch(async (err) => {
const unlock = await this.mutex.lock();
delete this.inProgressCohorts[key];
unlock();
throw err;
});
this.logger.debug('Stop downloading', cohortId);
return cohort;
} catch (e) {
if (i === ATTEMPTS - 1 || e instanceof CohortMaxSizeExceededError) {
const unlock = await this.mutex.lock();
delete this.inProgressCohorts[key];
unlock();
throw e;
}
await sleep(this.cohortRequestDelayMillis);
}
}
});
}

Expand Down
2 changes: 1 addition & 1 deletion packages/node/src/local/streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ export class FlagConfigStreamer
this.stream.onInitUpdate = async (flagConfigs) => {
this.logger.debug('[Experiment] streamer - receives updates');
await super._update(flagConfigs, true, onChange);
this.logger.debug('[Experiment] streamer - start flags stream success');
};
this.stream.onUpdate = async (flagConfigs) => {
this.logger.debug('[Experiment] streamer - receives updates');
Expand All @@ -94,7 +95,6 @@ export class FlagConfigStreamer
libraryVersion: PACKAGE_VERSION,
});
this.poller.stop();
this.logger.debug('[Experiment] streamer - start flags stream success');
} catch (e) {
const err = e as StreamErrorEvent;
this.logger.debug(
Expand Down
3 changes: 3 additions & 0 deletions packages/node/src/types/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ export type CohortConfig = {
* size will be skipped.
*/
maxCohortSize?: number;

cohortRequestDelayMillis?: number;
};

/**
Expand Down Expand Up @@ -249,6 +251,7 @@ export const CohortConfigDefaults: Omit<CohortConfig, 'apiKey' | 'secretKey'> =
{
cohortServerUrl: 'https://cohort-v2.lab.amplitude.com',
maxCohortSize: 10_000_000,
cohortRequestDelayMillis: 100,
};

export const EU_SERVER_URLS = {
Expand Down
40 changes: 36 additions & 4 deletions packages/node/test/local/cohort/cohortFetcher.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { SdkCohortApi } from 'src/local/cohort/cohort-api';
import {
CohortMaxSizeExceededError,
SdkCohortApi,
} from 'src/local/cohort/cohort-api';
import { COHORT_CONFIG_TIMEOUT, CohortFetcher } from 'src/local/cohort/fetcher';
import { CohortConfigDefaults } from 'src/types/config';
import { sleep } from 'src/util/time';

import { version as PACKAGE_VERSION } from '../../../gen/version';

Expand Down Expand Up @@ -34,7 +38,7 @@ const COHORTS = {
},
};

afterEach(() => {
beforeEach(() => {
jest.clearAllMocks();
});

Expand Down Expand Up @@ -109,10 +113,38 @@ test('cohort fetch failed', async () => {
throw Error();
});

const cohortFetcher = new CohortFetcher('', '', null, 'someurl', 10);
const cohortFetcher = new CohortFetcher('', '', null, 'someurl', 10, 100);
const cohortPromise = cohortFetcher.fetch('c1', 10);
await sleep(10);
expect(cohortApiGetCohortSpy).toHaveBeenCalledTimes(1);
await sleep(100);
expect(cohortApiGetCohortSpy).toHaveBeenCalledTimes(2);

await expect(cohortPromise).rejects.toThrowError();

expect(cohortApiGetCohortSpy).toHaveBeenCalledTimes(3);

expect(cohortApiGetCohortSpy).toHaveBeenCalledWith({
cohortId: 'c1',
lastModified: 10,
libraryName: 'experiment-node-server',
libraryVersion: PACKAGE_VERSION,
maxCohortSize: 10,
timeoutMillis: COHORT_CONFIG_TIMEOUT,
});
});

test('cohort fetch maxSize exceeded, no retry', async () => {
const cohortApiGetCohortSpy = jest.spyOn(SdkCohortApi.prototype, 'getCohort');
cohortApiGetCohortSpy.mockImplementation(async () => {
throw new CohortMaxSizeExceededError();
});

await expect(cohortFetcher.fetch('c1', 10)).rejects.toThrowError();
const cohortFetcher = new CohortFetcher('', '', null, 'someurl', 10, 100);
const cohortPromise = cohortFetcher.fetch('c1', 10);

await expect(cohortPromise).rejects.toThrowError();
expect(cohortApiGetCohortSpy).toHaveBeenCalledTimes(1);
expect(cohortApiGetCohortSpy).toHaveBeenCalledWith({
cohortId: 'c1',
lastModified: 10,
Expand Down
3 changes: 1 addition & 2 deletions packages/node/test/local/cohort/cohortPoller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { CohortFetcher } from 'src/local/cohort/fetcher';
import { CohortPoller } from 'src/local/cohort/poller';
import { InMemoryCohortStorage } from 'src/local/cohort/storage';
import { CohortStorage } from 'src/types/cohort';
import { sleep } from 'src/util/time';

const OLD_COHORTS = {
c1: {
Expand Down Expand Up @@ -64,8 +65,6 @@ const NEW_COHORTS = {
},
};

const sleep = async (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const POLL_MILLIS = 500;
let storage: CohortStorage;
let fetcher: CohortFetcher;
Expand Down
5 changes: 3 additions & 2 deletions packages/node/test/local/flagConfigPoller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
import { SdkCohortApi } from 'src/local/cohort/cohort-api';
import { CohortFetcher } from 'src/local/cohort/fetcher';
import { InMemoryCohortStorage } from 'src/local/cohort/storage';
import { sleep } from 'src/util/time';

import { FLAGS, NEW_FLAGS } from './util/flags';
import { MockHttpClient } from './util/mockHttpClient';
Expand Down Expand Up @@ -76,7 +77,7 @@ test('flagConfig poller success', async () => {
expect(cohortStorage.getCohort('hahaorgname1').lastModified).toBe(1);

// On update, flag, existing cohort doesn't update.
await new Promise((f) => setTimeout(f, 2000));
await sleep(2000);
expect(flagPolled).toBe(2);
expect(await poller.cache.getAll()).toStrictEqual({
...NEW_FLAGS,
Expand Down Expand Up @@ -182,7 +183,7 @@ test('flagConfig poller initial success, polling error and use old flags', async

// Second poll flags with new cohort should fail when new cohort download failed.
// The different flag should not be updated.
await new Promise((f) => setTimeout(f, 2000));
await sleep(2000);
expect(flagPolled).toBeGreaterThanOrEqual(2);
expect(await poller.cache.getAll()).toStrictEqual(FLAGS);

Expand Down
7 changes: 7 additions & 0 deletions packages/node/test/local/flagConfigStreamer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const getTestObjs = ({
streamFlagRetryDelayMillis = 15000,
apiKey = 'client-xxxx',
serverUrl = 'http://localhostxxxx:00000000',
cohortFetcherDelayMillis = 100,
fetcherData = [
'[{"key": "fetcher-a", "variants": {}, "segments": []}]',
'[{"key": "fetcher-b", "variants": {}, "segments": []}]',
Expand All @@ -44,6 +45,8 @@ const getTestObjs = ({
'apikey',
'secretkey',
new MockHttpClient(async () => ({ status: 200, body: '' })),
serverUrl,
cohortFetcherDelayMillis,
),
};
let dataI = 0;
Expand Down Expand Up @@ -674,18 +677,22 @@ test('FlagConfigUpdater.connect, flag success, cohort fail, retry fail, initiali
streamFlagTryAttempts: 2,
streamFlagTryDelayMillis: 1000,
streamFlagRetryDelayMillis: 100000,
debug: true,
});
// Return cohort with their own cohortId.
updater.start();
await mockClient.client.doOpen({ type: 'open' });
await mockClient.client.doMsg({
data: getFlagWithCohort('cohort1'),
});
await new Promise((resolve) => setTimeout(resolve, 250)); // Wait for cohort download done retries and fails.
await new Promise((resolve) => setTimeout(resolve, 1050)); // Wait for retry stream.
// Second try
await mockClient.client.doOpen({ type: 'open' });
await mockClient.client.doMsg({
data: getFlagWithCohort('cohort1'),
});
await new Promise((resolve) => setTimeout(resolve, 250)); // Wait for cohort download done retries and fails.

expect(fetchObj.fetchCalls).toBeGreaterThanOrEqual(1);
expect(mockClient.numCreated).toBe(2);
Expand Down
5 changes: 1 addition & 4 deletions packages/node/test/util/threading.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import { Executor, Mutex, Semaphore } from 'src/util/threading';

function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
import { sleep } from 'src/util/time';

function mutexSleepFunc(lock, ms, acc) {
return async () => {
Expand Down

0 comments on commit 13913f1

Please sign in to comment.