Skip to content
This repository has been archived by the owner on May 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #148 from launchdarkly/eb/ch46962/init-error
Browse files Browse the repository at this point in the history
don't signal init failure & don't fire an error unless it's unrecoverable
  • Loading branch information
eli-darkly authored Oct 23, 2019
2 parents 102b595 + 1bb0cfc commit bd06c68
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 64 deletions.
11 changes: 6 additions & 5 deletions polling.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ function PollingProcessor(config, requestor) {
const sleepFor = Math.max(config.pollInterval * 1000 - elapsed, 0);
config.logger.debug('Elapsed: %d ms, sleeping for %d ms', elapsed, sleepFor);
if (err) {
const message = err.status || err.message;
cb(new errors.LDPollingError(messages.httpErrorMessage(message, 'polling request', 'will retry')));
if (!errors.isHttpErrorRecoverable(err.status)) {
config.logger.error('Received 401 error, no further polling requests will be made since SDK key is invalid');
if (err.status && !errors.isHttpErrorRecoverable(err.status)) {
const message = messages.httpErrorMessage(err.status, 'polling request');
config.logger.error(message);
cb(new errors.LDPollingError(message));
} else {
config.logger.warn(messages.httpErrorMessage(err.status || err.message, 'polling request', 'will retry'));
// Recursively call poll after the appropriate delay
setTimeout(() => { poll(cb); }, sleepFor);
}
Expand All @@ -37,7 +38,7 @@ function PollingProcessor(config, requestor) {
initData[dataKind.features.namespace] = allData.flags;
initData[dataKind.segments.namespace] = allData.segments;
featureStore.init(initData, () => {
cb(); // We can call the callback directly here because there's always already at least one level of deferral due to I/O
cb();
// Recursively call poll after the appropriate delay
setTimeout(() => { poll(cb); }, sleepFor);
});
Expand Down
13 changes: 10 additions & 3 deletions streaming.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const errors = require('./errors');
const messages = require('./messages');
const EventSource = require('./eventsource');
const dataKind = require('./versioned_data_kind');

Expand All @@ -15,8 +16,6 @@ function StreamProcessor(sdkKey, config, requestor, eventSourceFactory) {

processor.start = fn => {
const cb = fn || function(){};
// Note that we can call this callback directly here because there's always already at least one level of deferral due to I/O

es = new eventSourceFactory(config.streamUri + '/all',
{
agent: config.proxyAgent,
Expand All @@ -25,7 +24,15 @@ function StreamProcessor(sdkKey, config, requestor, eventSourceFactory) {
});

es.onerror = err => {
cb(new errors.LDStreamingError(err.message, err.code));
if (err.status && !errors.isHttpErrorRecoverable(err.status)) {
const message = messages.httpErrorMessage(err.status, 'streaming request');
config.logger.error(message);
es && es.close();
cb(new errors.LDStreamingError(err.message, err.status));
} else {
const message = messages.httpErrorMessage(err.status, 'streaming request', 'will retry');
config.logger.warn(message);
}
};

function reportJsonError(type, data) {
Expand Down
5 changes: 4 additions & 1 deletion test/LDClient-tls-test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as selfsigned from 'selfsigned';

import * as LDClient from '../index';
import { sleepAsync } from './async_utils';
import * as httpServer from './http_server';
import * as stubs from './stubs';

Expand Down Expand Up @@ -57,7 +58,9 @@ describe('LDClient TLS configuration', () => {
logger: stubs.stubLogger(),
};
const client = LDClient.init(sdkKey, config);
await expect(client.waitForInitialization()).rejects.toThrow(/self signed/);
await sleepAsync(300); // the client won't signal an unrecoverable error, but it should log a message
expect(config.logger.warn.mock.calls.length).toEqual(2);
expect(config.logger.warn.mock.calls[1][0]).toMatch(/self signed/);
});

it('can use custom TLS options for streaming as well as polling', async () => {
Expand Down
58 changes: 25 additions & 33 deletions test/polling-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const InMemoryFeatureStore = require('../feature_store');
const PollingProcessor = require('../polling');
const dataKind = require('../versioned_data_kind');
const { asyncify, asyncifyNode, sleepAsync } = require('./async_utils');
const stubs = require('./stubs');

describe('PollingProcessor', () => {
const longInterval = 100000;
Expand All @@ -14,20 +15,13 @@ describe('PollingProcessor', () => {

beforeEach(() => {
store = InMemoryFeatureStore();
config = { featureStore: store, pollInterval: longInterval, logger: fakeLogger() };
config = { featureStore: store, pollInterval: longInterval, logger: stubs.stubLogger() };
});

afterEach(() => {
processor && processor.stop();
});

function fakeLogger() {
return {
debug: jest.fn(),
error: jest.fn()
};
}

it('makes no request before start', () => {
const requestor = {
requestAllData: jest.fn()
Expand Down Expand Up @@ -57,16 +51,6 @@ describe('PollingProcessor', () => {
await asyncifyNode(cb => processor.start(cb)); // didn't throw -> success
});

it('calls callback with error on failure', async () => {
const err = new Error('sorry');
const requestor = {
requestAllData: cb => cb(err)
};
processor = PollingProcessor(config, requestor);

await expect(asyncifyNode(cb => processor.start(cb))).rejects.toThrow(/sorry.*will retry/);
});

it('initializes feature store', async () => {
const requestor = {
requestAllData: cb => cb(null, jsonData)
Expand Down Expand Up @@ -94,22 +78,26 @@ describe('PollingProcessor', () => {
expect(requestor.requestAllData.mock.calls.length).toBeGreaterThanOrEqual(4);
});

function testRecoverableHttpError(status) {
it('continues polling after error ' + status, async () => {
const err = new Error('sorry');
err.status = status;
const requestor = {
requestAllData: jest.fn(cb => cb(err))
};
config.pollInterval = 0.1;
processor = PollingProcessor(config, requestor);
async function testRecoverableError(err) {
const requestor = {
requestAllData: jest.fn(cb => cb(err))
};
config.pollInterval = 0.1;
processor = PollingProcessor(config, requestor);

processor.start(() => {});
await sleepAsync(300);
let errReceived;
processor.start(e => { errReceived = e; });
await sleepAsync(300);

expect(requestor.requestAllData.mock.calls.length).toBeGreaterThanOrEqual(2);
expect(config.logger.error).not.toHaveBeenCalled();
});
expect(requestor.requestAllData.mock.calls.length).toBeGreaterThanOrEqual(2);
expect(config.logger.error).not.toHaveBeenCalled();
expect(errReceived).toBeUndefined();
}

function testRecoverableHttpError(status) {
const err = new Error('sorry');
err.status = status;
it('continues polling after error ' + status, async () => await testRecoverableError(err));
}

testRecoverableHttpError(400);
Expand All @@ -118,6 +106,8 @@ describe('PollingProcessor', () => {
testRecoverableHttpError(500);
testRecoverableHttpError(503);

it('continues polling after I/O error', async () => await testRecoverableError(new Error('sorry')));

function testUnrecoverableHttpError(status) {
it('stops polling after error ' + status, async () => {
const err = new Error('sorry');
Expand All @@ -128,11 +118,13 @@ describe('PollingProcessor', () => {
config.pollInterval = 0.1;
processor = PollingProcessor(config, requestor);

processor.start(() => {});
let errReceived;
processor.start(e => { errReceived = e; });
await sleepAsync(300);

expect(requestor.requestAllData.mock.calls.length).toEqual(1);
expect(config.logger.error).toHaveBeenCalledTimes(1);
expect(errReceived).not.toBeUndefined();
});
}

Expand Down
97 changes: 75 additions & 22 deletions test/streaming-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,27 @@ const InMemoryFeatureStore = require('../feature_store');
const StreamProcessor = require('../streaming');
const dataKind = require('../versioned_data_kind');
const { asyncify, sleepAsync } = require('./async_utils');
const stubs = require('./stubs');

describe('StreamProcessor', function() {
var sdkKey = 'SDK_KEY';
describe('StreamProcessor', () => {
const sdkKey = 'SDK_KEY';

function fakeEventSource() {
var es = { handlers: {} };
es.constructor = function(url, options) {
es.url = url;
es.options = options;
this.addEventListener = function(type, handler) {
this.addEventListener = (type, handler) => {
es.handlers[type] = handler;
};
this.close = () => {
es.closed = true;
};
es.instance = this;
};
return es;
}

function fakeLogger() {
return {
debug: jest.fn(),
error: jest.fn()
};
}

function expectJsonError(err, config) {
expect(err).not.toBe(undefined);
expect(err.message).toEqual('Malformed JSON data in event stream');
Expand Down Expand Up @@ -62,7 +60,7 @@ describe('StreamProcessor', function() {

it('causes flags and segments to be stored', async () => {
var featureStore = InMemoryFeatureStore();
var config = { featureStore: featureStore, logger: fakeLogger() };
var config = { featureStore: featureStore, logger: stubs.stubLogger() };
var es = fakeEventSource();
var sp = StreamProcessor(sdkKey, config, null, es.constructor);
sp.start();
Expand All @@ -80,7 +78,7 @@ describe('StreamProcessor', function() {

it('calls initialization callback', async () => {
var featureStore = InMemoryFeatureStore();
var config = { featureStore: featureStore, logger: fakeLogger() };
var config = { featureStore: featureStore, logger: stubs.stubLogger() };
var es = fakeEventSource();
var sp = StreamProcessor(sdkKey, config, null, es.constructor);

Expand All @@ -92,7 +90,7 @@ describe('StreamProcessor', function() {

it('passes error to callback if data is invalid', async () => {
var featureStore = InMemoryFeatureStore();
var config = { featureStore: featureStore, logger: fakeLogger() };
var config = { featureStore: featureStore, logger: stubs.stubLogger() };
var es = fakeEventSource();
var sp = StreamProcessor(sdkKey, config, null, es.constructor);

Expand All @@ -106,7 +104,7 @@ describe('StreamProcessor', function() {
describe('patch message', function() {
it('updates flag', async () => {
var featureStore = InMemoryFeatureStore();
var config = { featureStore: featureStore, logger: fakeLogger() };
var config = { featureStore: featureStore, logger: stubs.stubLogger() };
var es = fakeEventSource();
var sp = StreamProcessor(sdkKey, config, null, es.constructor);

Expand All @@ -124,7 +122,7 @@ describe('StreamProcessor', function() {

it('updates segment', async () => {
var featureStore = InMemoryFeatureStore();
var config = { featureStore: featureStore, logger: fakeLogger() };
var config = { featureStore: featureStore, logger: stubs.stubLogger() };
var es = fakeEventSource();
var sp = StreamProcessor(sdkKey, config, null, es.constructor);

Expand All @@ -142,7 +140,7 @@ describe('StreamProcessor', function() {

it('passes error to callback if data is invalid', async () => {
var featureStore = InMemoryFeatureStore();
var config = { featureStore: featureStore, logger: fakeLogger() };
var config = { featureStore: featureStore, logger: stubs.stubLogger() };
var es = fakeEventSource();
var sp = StreamProcessor(sdkKey, config, null, es.constructor);

Expand All @@ -156,7 +154,7 @@ describe('StreamProcessor', function() {
describe('delete message', function() {
it('deletes flag', async () => {
var featureStore = InMemoryFeatureStore();
var config = { featureStore: featureStore, logger: fakeLogger() };
var config = { featureStore: featureStore, logger: stubs.stubLogger() };
var es = fakeEventSource();
var sp = StreamProcessor(sdkKey, config, null, es.constructor);

Expand All @@ -176,7 +174,7 @@ describe('StreamProcessor', function() {

it('deletes segment', async () => {
var featureStore = InMemoryFeatureStore();
var config = { featureStore: featureStore, logger: fakeLogger() };
var config = { featureStore: featureStore, logger: stubs.stubLogger() };
var es = fakeEventSource();
var sp = StreamProcessor(sdkKey, config, null, es.constructor);

Expand All @@ -196,7 +194,7 @@ describe('StreamProcessor', function() {

it('passes error to callback if data is invalid', async () => {
var featureStore = InMemoryFeatureStore();
var config = { featureStore: featureStore, logger: fakeLogger() };
var config = { featureStore: featureStore, logger: stubs.stubLogger() };
var es = fakeEventSource();
var sp = StreamProcessor(sdkKey, config, null, es.constructor);

Expand Down Expand Up @@ -224,7 +222,7 @@ describe('StreamProcessor', function() {

it('requests and stores flags and segments', async () => {
var featureStore = InMemoryFeatureStore();
var config = { featureStore: featureStore, logger: fakeLogger() };
var config = { featureStore: featureStore, logger: stubs.stubLogger() };
var es = fakeEventSource();
var sp = StreamProcessor(sdkKey, config, fakeRequestor, es.constructor);

Expand Down Expand Up @@ -254,7 +252,7 @@ describe('StreamProcessor', function() {
};

var featureStore = InMemoryFeatureStore();
var config = { featureStore: featureStore, logger: fakeLogger() };
var config = { featureStore: featureStore, logger: stubs.stubLogger() };
var es = fakeEventSource();
var sp = StreamProcessor(sdkKey, config, fakeRequestor, es.constructor);

Expand All @@ -278,7 +276,7 @@ describe('StreamProcessor', function() {
};

var featureStore = InMemoryFeatureStore();
var config = { featureStore: featureStore, logger: fakeLogger() };
var config = { featureStore: featureStore, logger: stubs.stubLogger() };
var es = fakeEventSource();
var sp = StreamProcessor(sdkKey, config, fakeRequestor, es.constructor);

Expand All @@ -291,4 +289,59 @@ describe('StreamProcessor', function() {
expect(s.version).toEqual(1);
});
});

async function testRecoverableError(err) {
const featureStore = InMemoryFeatureStore();
const config = { featureStore: featureStore, logger: stubs.stubLogger() };
const es = fakeEventSource();
const sp = StreamProcessor(sdkKey, config, null, es.constructor);

let errReceived;
sp.start(e => { errReceived = e; });

es.instance.onerror(err);
await sleepAsync(300);

expect(config.logger.error).not.toHaveBeenCalled();
expect(errReceived).toBeUndefined();
expect(es.closed).not.toEqual(true);
}

function testRecoverableHttpError(status) {
const err = new Error('sorry');
err.status = status;
it('continues retrying after error ' + status, async () => await testRecoverableError(err));
}

testRecoverableHttpError(400);
testRecoverableHttpError(408);
testRecoverableHttpError(429);
testRecoverableHttpError(500);
testRecoverableHttpError(503);

it('continues retrying after I/O error', async () => await testRecoverableError(new Error('sorry')));

function testUnrecoverableHttpError(status) {
it('stops retrying after error ' + status, async () => {
const err = new Error('sorry');
err.status = status;
const featureStore = InMemoryFeatureStore();
const config = { featureStore: featureStore, logger: stubs.stubLogger() };
const es = fakeEventSource();
const sp = StreamProcessor(sdkKey, config, null, es.constructor);

let errReceived;
sp.start(e => { errReceived = e; });

es.instance.onerror(err);
await sleepAsync(300);

expect(config.logger.error).toHaveBeenCalledTimes(1);
expect(errReceived).not.toBeUndefined();
expect(es.closed).toEqual(true);
});
}

testUnrecoverableHttpError(401);
testUnrecoverableHttpError(403);
});

0 comments on commit bd06c68

Please sign in to comment.