diff --git a/packages/node/src/local/client.ts b/packages/node/src/local/client.ts index 8dc393c..7cd4db6 100644 --- a/packages/node/src/local/client.ts +++ b/packages/node/src/local/client.ts @@ -96,6 +96,7 @@ export class LocalEvaluationClient { httpClient, this.config.cohortConfig?.cohortServerUrl, this.config.cohortConfig?.maxCohortSize, + this.config.cohortConfig?.cohortRequestDelayMillis, this.config.debug, ); this.cohortUpdater = new CohortPoller( diff --git a/packages/node/src/local/cohort/cohort-api.ts b/packages/node/src/local/cohort/cohort-api.ts index 3986499..a033d6c 100644 --- a/packages/node/src/local/cohort/cohort-api.ts +++ b/packages/node/src/local/cohort/cohort-api.ts @@ -24,6 +24,10 @@ export interface CohortApi { getCohort(options?: GetCohortOptions): Promise; } +export class CohortMaxSizeExceededError extends Error {} + +export class CohortDownloadError extends Error {} + export class SdkCohortApi implements CohortApi { private readonly cohortApiKey; private readonly serverUrl; @@ -72,9 +76,11 @@ export class SdkCohortApi implements CohortApi { } else if (response.status == 204) { return undefined; } else if (response.status == 413) { - throw Error(`Cohort error response: size > ${options.maxCohortSize}`); + throw new CohortMaxSizeExceededError( + `Cohort error response: size > ${options.maxCohortSize}`, + ); } else { - throw Error( + throw new CohortDownloadError( `Cohort error response: status ${response.status}, body ${response.body}`, ); } diff --git a/packages/node/src/local/cohort/fetcher.ts b/packages/node/src/local/cohort/fetcher.ts index 3673995..bff6538 100644 --- a/packages/node/src/local/cohort/fetcher.ts +++ b/packages/node/src/local/cohort/fetcher.ts @@ -3,26 +3,23 @@ import { WrapperClient } from '../../transport/http'; import { Cohort } from '../../types/cohort'; import { CohortConfigDefaults } from '../../types/config'; import { HttpClient } from '../../types/transport'; -import { BackoffPolicy, doWithBackoffFailLoudly } from '../../util/backoff'; +import { BackoffPolicy } from '../../util/backoff'; import { ConsoleLogger, Logger } from '../../util/logger'; import { Mutex, Executor } from '../../util/threading'; +import { sleep } from '../../util/time'; -import { SdkCohortApi } from './cohort-api'; +import { CohortMaxSizeExceededError, SdkCohortApi } from './cohort-api'; export const COHORT_CONFIG_TIMEOUT = 20000; -const BACKOFF_POLICY: BackoffPolicy = { - attempts: 3, - min: 1000, - max: 1000, - scalar: 1, -}; +const ATTEMPTS = 3; export class CohortFetcher { private readonly logger: Logger; readonly cohortApi: SdkCohortApi; readonly maxCohortSize: number; + readonly cohortRequestDelayMillis: number; private readonly inProgressCohorts: Record< string, @@ -37,6 +34,7 @@ export class CohortFetcher { httpClient: HttpClient, serverUrl = CohortConfigDefaults.cohortServerUrl, maxCohortSize = CohortConfigDefaults.maxCohortSize, + cohortRequestDelayMillis = 100, debug = false, ) { this.cohortApi = new SdkCohortApi( @@ -45,6 +43,7 @@ export class CohortFetcher { new WrapperClient(httpClient), ); this.maxCohortSize = maxCohortSize; + this.cohortRequestDelayMillis = cohortRequestDelayMillis; this.logger = new ConsoleLogger(debug); } @@ -63,32 +62,32 @@ export class CohortFetcher { if (!this.inProgressCohorts[key]) { this.inProgressCohorts[key] = this.executor.run(async () => { this.logger.debug('Start downloading', cohortId); - const cohort = await doWithBackoffFailLoudly( - async () => - this.cohortApi.getCohort({ + for (let i = 0; i < ATTEMPTS; i++) { + try { + const cohort = await this.cohortApi.getCohort({ libraryName: 'experiment-node-server', libraryVersion: PACKAGE_VERSION, cohortId: cohortId, maxCohortSize: this.maxCohortSize, lastModified: lastModified, timeoutMillis: COHORT_CONFIG_TIMEOUT, - }), - BACKOFF_POLICY, - ) - .then(async (cohort) => { + }); + // Do unlock before return. const unlock = await this.mutex.lock(); delete this.inProgressCohorts[key]; unlock(); + this.logger.debug('Stop downloading', cohortId); return cohort; - }) - .catch(async (err) => { - const unlock = await this.mutex.lock(); - delete this.inProgressCohorts[key]; - unlock(); - throw err; - }); - this.logger.debug('Stop downloading', cohortId); - return cohort; + } catch (e) { + if (i === ATTEMPTS - 1 || e instanceof CohortMaxSizeExceededError) { + const unlock = await this.mutex.lock(); + delete this.inProgressCohorts[key]; + unlock(); + throw e; + } + await sleep(this.cohortRequestDelayMillis); + } + } }); } diff --git a/packages/node/src/local/streamer.ts b/packages/node/src/local/streamer.ts index d9eaecf..fc6aa69 100644 --- a/packages/node/src/local/streamer.ts +++ b/packages/node/src/local/streamer.ts @@ -76,6 +76,7 @@ export class FlagConfigStreamer this.stream.onInitUpdate = async (flagConfigs) => { this.logger.debug('[Experiment] streamer - receives updates'); await super._update(flagConfigs, true, onChange); + this.logger.debug('[Experiment] streamer - start flags stream success'); }; this.stream.onUpdate = async (flagConfigs) => { this.logger.debug('[Experiment] streamer - receives updates'); @@ -94,7 +95,6 @@ export class FlagConfigStreamer libraryVersion: PACKAGE_VERSION, }); this.poller.stop(); - this.logger.debug('[Experiment] streamer - start flags stream success'); } catch (e) { const err = e as StreamErrorEvent; this.logger.debug( diff --git a/packages/node/src/types/config.ts b/packages/node/src/types/config.ts index f83c218..00f67d0 100644 --- a/packages/node/src/types/config.ts +++ b/packages/node/src/types/config.ts @@ -215,6 +215,8 @@ export type CohortConfig = { * size will be skipped. */ maxCohortSize?: number; + + cohortRequestDelayMillis?: number; }; /** @@ -249,6 +251,7 @@ export const CohortConfigDefaults: Omit = { cohortServerUrl: 'https://cohort-v2.lab.amplitude.com', maxCohortSize: 10_000_000, + cohortRequestDelayMillis: 100, }; export const EU_SERVER_URLS = { diff --git a/packages/node/test/local/cohort/cohortFetcher.test.ts b/packages/node/test/local/cohort/cohortFetcher.test.ts index 695fbe1..040134d 100644 --- a/packages/node/test/local/cohort/cohortFetcher.test.ts +++ b/packages/node/test/local/cohort/cohortFetcher.test.ts @@ -1,6 +1,10 @@ -import { SdkCohortApi } from 'src/local/cohort/cohort-api'; +import { + CohortMaxSizeExceededError, + SdkCohortApi, +} from 'src/local/cohort/cohort-api'; import { COHORT_CONFIG_TIMEOUT, CohortFetcher } from 'src/local/cohort/fetcher'; import { CohortConfigDefaults } from 'src/types/config'; +import { sleep } from 'src/util/time'; import { version as PACKAGE_VERSION } from '../../../gen/version'; @@ -34,7 +38,7 @@ const COHORTS = { }, }; -afterEach(() => { +beforeEach(() => { jest.clearAllMocks(); }); @@ -109,10 +113,38 @@ test('cohort fetch failed', async () => { throw Error(); }); - const cohortFetcher = new CohortFetcher('', '', null, 'someurl', 10); + const cohortFetcher = new CohortFetcher('', '', null, 'someurl', 10, 100); + const cohortPromise = cohortFetcher.fetch('c1', 10); + await sleep(10); + expect(cohortApiGetCohortSpy).toHaveBeenCalledTimes(1); + await sleep(100); + expect(cohortApiGetCohortSpy).toHaveBeenCalledTimes(2); + + await expect(cohortPromise).rejects.toThrowError(); + + expect(cohortApiGetCohortSpy).toHaveBeenCalledTimes(3); + + expect(cohortApiGetCohortSpy).toHaveBeenCalledWith({ + cohortId: 'c1', + lastModified: 10, + libraryName: 'experiment-node-server', + libraryVersion: PACKAGE_VERSION, + maxCohortSize: 10, + timeoutMillis: COHORT_CONFIG_TIMEOUT, + }); +}); + +test('cohort fetch maxSize exceeded, no retry', async () => { + const cohortApiGetCohortSpy = jest.spyOn(SdkCohortApi.prototype, 'getCohort'); + cohortApiGetCohortSpy.mockImplementation(async () => { + throw new CohortMaxSizeExceededError(); + }); - await expect(cohortFetcher.fetch('c1', 10)).rejects.toThrowError(); + const cohortFetcher = new CohortFetcher('', '', null, 'someurl', 10, 100); + const cohortPromise = cohortFetcher.fetch('c1', 10); + await expect(cohortPromise).rejects.toThrowError(); + expect(cohortApiGetCohortSpy).toHaveBeenCalledTimes(1); expect(cohortApiGetCohortSpy).toHaveBeenCalledWith({ cohortId: 'c1', lastModified: 10, diff --git a/packages/node/test/local/cohort/cohortPoller.test.ts b/packages/node/test/local/cohort/cohortPoller.test.ts index 5f4fcfb..ddb50cb 100644 --- a/packages/node/test/local/cohort/cohortPoller.test.ts +++ b/packages/node/test/local/cohort/cohortPoller.test.ts @@ -3,6 +3,7 @@ import { CohortFetcher } from 'src/local/cohort/fetcher'; import { CohortPoller } from 'src/local/cohort/poller'; import { InMemoryCohortStorage } from 'src/local/cohort/storage'; import { CohortStorage } from 'src/types/cohort'; +import { sleep } from 'src/util/time'; const OLD_COHORTS = { c1: { @@ -64,8 +65,6 @@ const NEW_COHORTS = { }, }; -const sleep = async (ms) => new Promise((resolve) => setTimeout(resolve, ms)); - const POLL_MILLIS = 500; let storage: CohortStorage; let fetcher: CohortFetcher; diff --git a/packages/node/test/local/flagConfigPoller.test.ts b/packages/node/test/local/flagConfigPoller.test.ts index 298ae5c..053a647 100644 --- a/packages/node/test/local/flagConfigPoller.test.ts +++ b/packages/node/test/local/flagConfigPoller.test.ts @@ -6,6 +6,7 @@ import { import { SdkCohortApi } from 'src/local/cohort/cohort-api'; import { CohortFetcher } from 'src/local/cohort/fetcher'; import { InMemoryCohortStorage } from 'src/local/cohort/storage'; +import { sleep } from 'src/util/time'; import { FLAGS, NEW_FLAGS } from './util/flags'; import { MockHttpClient } from './util/mockHttpClient'; @@ -76,7 +77,7 @@ test('flagConfig poller success', async () => { expect(cohortStorage.getCohort('hahaorgname1').lastModified).toBe(1); // On update, flag, existing cohort doesn't update. - await new Promise((f) => setTimeout(f, 2000)); + await sleep(2000); expect(flagPolled).toBe(2); expect(await poller.cache.getAll()).toStrictEqual({ ...NEW_FLAGS, @@ -182,7 +183,7 @@ test('flagConfig poller initial success, polling error and use old flags', async // Second poll flags with new cohort should fail when new cohort download failed. // The different flag should not be updated. - await new Promise((f) => setTimeout(f, 2000)); + await sleep(2000); expect(flagPolled).toBeGreaterThanOrEqual(2); expect(await poller.cache.getAll()).toStrictEqual(FLAGS); diff --git a/packages/node/test/local/flagConfigStreamer.test.ts b/packages/node/test/local/flagConfigStreamer.test.ts index 8e967df..28c93eb 100644 --- a/packages/node/test/local/flagConfigStreamer.test.ts +++ b/packages/node/test/local/flagConfigStreamer.test.ts @@ -30,6 +30,7 @@ const getTestObjs = ({ streamFlagRetryDelayMillis = 15000, apiKey = 'client-xxxx', serverUrl = 'http://localhostxxxx:00000000', + cohortFetcherDelayMillis = 100, fetcherData = [ '[{"key": "fetcher-a", "variants": {}, "segments": []}]', '[{"key": "fetcher-b", "variants": {}, "segments": []}]', @@ -44,6 +45,8 @@ const getTestObjs = ({ 'apikey', 'secretkey', new MockHttpClient(async () => ({ status: 200, body: '' })), + serverUrl, + cohortFetcherDelayMillis, ), }; let dataI = 0; @@ -674,6 +677,7 @@ test('FlagConfigUpdater.connect, flag success, cohort fail, retry fail, initiali streamFlagTryAttempts: 2, streamFlagTryDelayMillis: 1000, streamFlagRetryDelayMillis: 100000, + debug: true, }); // Return cohort with their own cohortId. updater.start(); @@ -681,11 +685,14 @@ test('FlagConfigUpdater.connect, flag success, cohort fail, retry fail, initiali await mockClient.client.doMsg({ data: getFlagWithCohort('cohort1'), }); + await new Promise((resolve) => setTimeout(resolve, 250)); // Wait for cohort download done retries and fails. + await new Promise((resolve) => setTimeout(resolve, 1050)); // Wait for retry stream. // Second try await mockClient.client.doOpen({ type: 'open' }); await mockClient.client.doMsg({ data: getFlagWithCohort('cohort1'), }); + await new Promise((resolve) => setTimeout(resolve, 250)); // Wait for cohort download done retries and fails. expect(fetchObj.fetchCalls).toBeGreaterThanOrEqual(1); expect(mockClient.numCreated).toBe(2); diff --git a/packages/node/test/util/threading.test.ts b/packages/node/test/util/threading.test.ts index c95fa34..84f5911 100644 --- a/packages/node/test/util/threading.test.ts +++ b/packages/node/test/util/threading.test.ts @@ -1,8 +1,5 @@ import { Executor, Mutex, Semaphore } from 'src/util/threading'; - -function sleep(ms) { - return new Promise((resolve) => setTimeout(resolve, ms)); -} +import { sleep } from 'src/util/time'; function mutexSleepFunc(lock, ms, acc) { return async () => {