diff --git a/polling.js b/polling.js index 27f766c..5630ef2 100644 --- a/polling.js +++ b/polling.js @@ -23,11 +23,12 @@ function PollingProcessor(config, requestor) { const sleepFor = Math.max(config.pollInterval * 1000 - elapsed, 0); config.logger.debug('Elapsed: %d ms, sleeping for %d ms', elapsed, sleepFor); if (err) { - const message = err.status || err.message; - cb(new errors.LDPollingError(messages.httpErrorMessage(message, 'polling request', 'will retry'))); - if (!errors.isHttpErrorRecoverable(err.status)) { - config.logger.error('Received 401 error, no further polling requests will be made since SDK key is invalid'); + if (err.status && !errors.isHttpErrorRecoverable(err.status)) { + const message = messages.httpErrorMessage(err.status, 'polling request'); + config.logger.error(message); + cb(new errors.LDPollingError(message)); } else { + config.logger.warn(messages.httpErrorMessage(err.status || err.message, 'polling request', 'will retry')); // Recursively call poll after the appropriate delay setTimeout(() => { poll(cb); }, sleepFor); } @@ -37,7 +38,7 @@ function PollingProcessor(config, requestor) { initData[dataKind.features.namespace] = allData.flags; initData[dataKind.segments.namespace] = allData.segments; featureStore.init(initData, () => { - cb(); // We can call the callback directly here because there's always already at least one level of deferral due to I/O + cb(); // Recursively call poll after the appropriate delay setTimeout(() => { poll(cb); }, sleepFor); }); diff --git a/streaming.js b/streaming.js index 03fbad9..b184bf2 100644 --- a/streaming.js +++ b/streaming.js @@ -1,4 +1,5 @@ const errors = require('./errors'); +const messages = require('./messages'); const EventSource = require('./eventsource'); const dataKind = require('./versioned_data_kind'); @@ -15,8 +16,6 @@ function StreamProcessor(sdkKey, config, requestor, eventSourceFactory) { processor.start = fn => { const cb = fn || function(){}; - // Note that we can call this callback directly here because there's always already at least one level of deferral due to I/O - es = new eventSourceFactory(config.streamUri + '/all', { agent: config.proxyAgent, @@ -25,7 +24,15 @@ function StreamProcessor(sdkKey, config, requestor, eventSourceFactory) { }); es.onerror = err => { - cb(new errors.LDStreamingError(err.message, err.code)); + if (err.status && !errors.isHttpErrorRecoverable(err.status)) { + const message = messages.httpErrorMessage(err.status, 'streaming request'); + config.logger.error(message); + es && es.close(); + cb(new errors.LDStreamingError(err.message, err.status)); + } else { + const message = messages.httpErrorMessage(err.status, 'streaming request', 'will retry'); + config.logger.warn(message); + } }; function reportJsonError(type, data) { diff --git a/test/LDClient-tls-test.js b/test/LDClient-tls-test.js index ec03248..5b24dc6 100644 --- a/test/LDClient-tls-test.js +++ b/test/LDClient-tls-test.js @@ -1,6 +1,7 @@ import * as selfsigned from 'selfsigned'; import * as LDClient from '../index'; +import { sleepAsync } from './async_utils'; import * as httpServer from './http_server'; import * as stubs from './stubs'; @@ -57,7 +58,9 @@ describe('LDClient TLS configuration', () => { logger: stubs.stubLogger(), }; const client = LDClient.init(sdkKey, config); - await expect(client.waitForInitialization()).rejects.toThrow(/self signed/); + await sleepAsync(300); // the client won't signal an unrecoverable error, but it should log a message + expect(config.logger.warn.mock.calls.length).toEqual(2); + expect(config.logger.warn.mock.calls[1][0]).toMatch(/self signed/); }); it('can use custom TLS options for streaming as well as polling', async () => { diff --git a/test/polling-test.js b/test/polling-test.js index c918130..56f4c7d 100644 --- a/test/polling-test.js +++ b/test/polling-test.js @@ -2,6 +2,7 @@ const InMemoryFeatureStore = require('../feature_store'); const PollingProcessor = require('../polling'); const dataKind = require('../versioned_data_kind'); const { asyncify, asyncifyNode, sleepAsync } = require('./async_utils'); +const stubs = require('./stubs'); describe('PollingProcessor', () => { const longInterval = 100000; @@ -14,20 +15,13 @@ describe('PollingProcessor', () => { beforeEach(() => { store = InMemoryFeatureStore(); - config = { featureStore: store, pollInterval: longInterval, logger: fakeLogger() }; + config = { featureStore: store, pollInterval: longInterval, logger: stubs.stubLogger() }; }); afterEach(() => { processor && processor.stop(); }); - function fakeLogger() { - return { - debug: jest.fn(), - error: jest.fn() - }; - } - it('makes no request before start', () => { const requestor = { requestAllData: jest.fn() @@ -57,16 +51,6 @@ describe('PollingProcessor', () => { await asyncifyNode(cb => processor.start(cb)); // didn't throw -> success }); - it('calls callback with error on failure', async () => { - const err = new Error('sorry'); - const requestor = { - requestAllData: cb => cb(err) - }; - processor = PollingProcessor(config, requestor); - - await expect(asyncifyNode(cb => processor.start(cb))).rejects.toThrow(/sorry.*will retry/); - }); - it('initializes feature store', async () => { const requestor = { requestAllData: cb => cb(null, jsonData) @@ -94,22 +78,26 @@ describe('PollingProcessor', () => { expect(requestor.requestAllData.mock.calls.length).toBeGreaterThanOrEqual(4); }); - function testRecoverableHttpError(status) { - it('continues polling after error ' + status, async () => { - const err = new Error('sorry'); - err.status = status; - const requestor = { - requestAllData: jest.fn(cb => cb(err)) - }; - config.pollInterval = 0.1; - processor = PollingProcessor(config, requestor); + async function testRecoverableError(err) { + const requestor = { + requestAllData: jest.fn(cb => cb(err)) + }; + config.pollInterval = 0.1; + processor = PollingProcessor(config, requestor); - processor.start(() => {}); - await sleepAsync(300); + let errReceived; + processor.start(e => { errReceived = e; }); + await sleepAsync(300); - expect(requestor.requestAllData.mock.calls.length).toBeGreaterThanOrEqual(2); - expect(config.logger.error).not.toHaveBeenCalled(); - }); + expect(requestor.requestAllData.mock.calls.length).toBeGreaterThanOrEqual(2); + expect(config.logger.error).not.toHaveBeenCalled(); + expect(errReceived).toBeUndefined(); + } + + function testRecoverableHttpError(status) { + const err = new Error('sorry'); + err.status = status; + it('continues polling after error ' + status, async () => await testRecoverableError(err)); } testRecoverableHttpError(400); @@ -118,6 +106,8 @@ describe('PollingProcessor', () => { testRecoverableHttpError(500); testRecoverableHttpError(503); + it('continues polling after I/O error', async () => await testRecoverableError(new Error('sorry'))); + function testUnrecoverableHttpError(status) { it('stops polling after error ' + status, async () => { const err = new Error('sorry'); @@ -128,11 +118,13 @@ describe('PollingProcessor', () => { config.pollInterval = 0.1; processor = PollingProcessor(config, requestor); - processor.start(() => {}); + let errReceived; + processor.start(e => { errReceived = e; }); await sleepAsync(300); expect(requestor.requestAllData.mock.calls.length).toEqual(1); expect(config.logger.error).toHaveBeenCalledTimes(1); + expect(errReceived).not.toBeUndefined(); }); } diff --git a/test/streaming-test.js b/test/streaming-test.js index f07adfc..ae02022 100644 --- a/test/streaming-test.js +++ b/test/streaming-test.js @@ -2,29 +2,27 @@ const InMemoryFeatureStore = require('../feature_store'); const StreamProcessor = require('../streaming'); const dataKind = require('../versioned_data_kind'); const { asyncify, sleepAsync } = require('./async_utils'); +const stubs = require('./stubs'); -describe('StreamProcessor', function() { - var sdkKey = 'SDK_KEY'; +describe('StreamProcessor', () => { + const sdkKey = 'SDK_KEY'; function fakeEventSource() { var es = { handlers: {} }; es.constructor = function(url, options) { es.url = url; es.options = options; - this.addEventListener = function(type, handler) { + this.addEventListener = (type, handler) => { es.handlers[type] = handler; }; + this.close = () => { + es.closed = true; + }; + es.instance = this; }; return es; } - function fakeLogger() { - return { - debug: jest.fn(), - error: jest.fn() - }; - } - function expectJsonError(err, config) { expect(err).not.toBe(undefined); expect(err.message).toEqual('Malformed JSON data in event stream'); @@ -62,7 +60,7 @@ describe('StreamProcessor', function() { it('causes flags and segments to be stored', async () => { var featureStore = InMemoryFeatureStore(); - var config = { featureStore: featureStore, logger: fakeLogger() }; + var config = { featureStore: featureStore, logger: stubs.stubLogger() }; var es = fakeEventSource(); var sp = StreamProcessor(sdkKey, config, null, es.constructor); sp.start(); @@ -80,7 +78,7 @@ describe('StreamProcessor', function() { it('calls initialization callback', async () => { var featureStore = InMemoryFeatureStore(); - var config = { featureStore: featureStore, logger: fakeLogger() }; + var config = { featureStore: featureStore, logger: stubs.stubLogger() }; var es = fakeEventSource(); var sp = StreamProcessor(sdkKey, config, null, es.constructor); @@ -92,7 +90,7 @@ describe('StreamProcessor', function() { it('passes error to callback if data is invalid', async () => { var featureStore = InMemoryFeatureStore(); - var config = { featureStore: featureStore, logger: fakeLogger() }; + var config = { featureStore: featureStore, logger: stubs.stubLogger() }; var es = fakeEventSource(); var sp = StreamProcessor(sdkKey, config, null, es.constructor); @@ -106,7 +104,7 @@ describe('StreamProcessor', function() { describe('patch message', function() { it('updates flag', async () => { var featureStore = InMemoryFeatureStore(); - var config = { featureStore: featureStore, logger: fakeLogger() }; + var config = { featureStore: featureStore, logger: stubs.stubLogger() }; var es = fakeEventSource(); var sp = StreamProcessor(sdkKey, config, null, es.constructor); @@ -124,7 +122,7 @@ describe('StreamProcessor', function() { it('updates segment', async () => { var featureStore = InMemoryFeatureStore(); - var config = { featureStore: featureStore, logger: fakeLogger() }; + var config = { featureStore: featureStore, logger: stubs.stubLogger() }; var es = fakeEventSource(); var sp = StreamProcessor(sdkKey, config, null, es.constructor); @@ -142,7 +140,7 @@ describe('StreamProcessor', function() { it('passes error to callback if data is invalid', async () => { var featureStore = InMemoryFeatureStore(); - var config = { featureStore: featureStore, logger: fakeLogger() }; + var config = { featureStore: featureStore, logger: stubs.stubLogger() }; var es = fakeEventSource(); var sp = StreamProcessor(sdkKey, config, null, es.constructor); @@ -156,7 +154,7 @@ describe('StreamProcessor', function() { describe('delete message', function() { it('deletes flag', async () => { var featureStore = InMemoryFeatureStore(); - var config = { featureStore: featureStore, logger: fakeLogger() }; + var config = { featureStore: featureStore, logger: stubs.stubLogger() }; var es = fakeEventSource(); var sp = StreamProcessor(sdkKey, config, null, es.constructor); @@ -176,7 +174,7 @@ describe('StreamProcessor', function() { it('deletes segment', async () => { var featureStore = InMemoryFeatureStore(); - var config = { featureStore: featureStore, logger: fakeLogger() }; + var config = { featureStore: featureStore, logger: stubs.stubLogger() }; var es = fakeEventSource(); var sp = StreamProcessor(sdkKey, config, null, es.constructor); @@ -196,7 +194,7 @@ describe('StreamProcessor', function() { it('passes error to callback if data is invalid', async () => { var featureStore = InMemoryFeatureStore(); - var config = { featureStore: featureStore, logger: fakeLogger() }; + var config = { featureStore: featureStore, logger: stubs.stubLogger() }; var es = fakeEventSource(); var sp = StreamProcessor(sdkKey, config, null, es.constructor); @@ -224,7 +222,7 @@ describe('StreamProcessor', function() { it('requests and stores flags and segments', async () => { var featureStore = InMemoryFeatureStore(); - var config = { featureStore: featureStore, logger: fakeLogger() }; + var config = { featureStore: featureStore, logger: stubs.stubLogger() }; var es = fakeEventSource(); var sp = StreamProcessor(sdkKey, config, fakeRequestor, es.constructor); @@ -254,7 +252,7 @@ describe('StreamProcessor', function() { }; var featureStore = InMemoryFeatureStore(); - var config = { featureStore: featureStore, logger: fakeLogger() }; + var config = { featureStore: featureStore, logger: stubs.stubLogger() }; var es = fakeEventSource(); var sp = StreamProcessor(sdkKey, config, fakeRequestor, es.constructor); @@ -278,7 +276,7 @@ describe('StreamProcessor', function() { }; var featureStore = InMemoryFeatureStore(); - var config = { featureStore: featureStore, logger: fakeLogger() }; + var config = { featureStore: featureStore, logger: stubs.stubLogger() }; var es = fakeEventSource(); var sp = StreamProcessor(sdkKey, config, fakeRequestor, es.constructor); @@ -291,4 +289,59 @@ describe('StreamProcessor', function() { expect(s.version).toEqual(1); }); }); + + async function testRecoverableError(err) { + const featureStore = InMemoryFeatureStore(); + const config = { featureStore: featureStore, logger: stubs.stubLogger() }; + const es = fakeEventSource(); + const sp = StreamProcessor(sdkKey, config, null, es.constructor); + + let errReceived; + sp.start(e => { errReceived = e; }); + + es.instance.onerror(err); + await sleepAsync(300); + + expect(config.logger.error).not.toHaveBeenCalled(); + expect(errReceived).toBeUndefined(); + expect(es.closed).not.toEqual(true); + } + + function testRecoverableHttpError(status) { + const err = new Error('sorry'); + err.status = status; + it('continues retrying after error ' + status, async () => await testRecoverableError(err)); + } + + testRecoverableHttpError(400); + testRecoverableHttpError(408); + testRecoverableHttpError(429); + testRecoverableHttpError(500); + testRecoverableHttpError(503); + + it('continues retrying after I/O error', async () => await testRecoverableError(new Error('sorry'))); + + function testUnrecoverableHttpError(status) { + it('stops retrying after error ' + status, async () => { + const err = new Error('sorry'); + err.status = status; + const featureStore = InMemoryFeatureStore(); + const config = { featureStore: featureStore, logger: stubs.stubLogger() }; + const es = fakeEventSource(); + const sp = StreamProcessor(sdkKey, config, null, es.constructor); + + let errReceived; + sp.start(e => { errReceived = e; }); + + es.instance.onerror(err); + await sleepAsync(300); + + expect(config.logger.error).toHaveBeenCalledTimes(1); + expect(errReceived).not.toBeUndefined(); + expect(es.closed).toEqual(true); + }); + } + + testUnrecoverableHttpError(401); + testUnrecoverableHttpError(403); });