Skip to content

Commit

Permalink
Merge pull request #81 from lifeomic/ignore-unparseable
Browse files Browse the repository at this point in the history
feat: ignoreUnparseable config options
  • Loading branch information
swain authored Oct 18, 2024
2 parents f6f6009 + 2ae84aa commit c2b105e
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 5 deletions.
90 changes: 90 additions & 0 deletions src/kinesis.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ const logger: jest.Mocked<LoggerInterface> = {
info: jest.fn(),
error: jest.fn(),
child: jest.fn(),
warn: jest.fn(),
} as any;

beforeEach(() => {
logger.info.mockReset();
logger.error.mockReset();
logger.warn.mockReset();
logger.child.mockReset();
logger.child.mockImplementation(() => logger);
});
Expand Down Expand Up @@ -593,4 +595,92 @@ describe('KinesisEventHandler', () => {
);
});
});

test('throws when encountering an unparseable message', async () => {
const lambda = new KinesisEventHandler({
logger,
parseEvent: testSerializer.parseEvent,
createRunContext: () => ({}),
}).lambda();

await expect(
lambda(
{
Records: [
{
kinesis: {
partitionKey: uuid(),
data: Buffer.from('not-a-json-string', 'utf-8').toString(
'base64',
),
},
},
] as any,
},
{} as any,
),
).rejects.toThrow('Unexpected token o in JSON at position 1');

expect(logger.error).toHaveBeenCalledWith(
expect.objectContaining({
err: expect.objectContaining({
message: 'Unexpected token o in JSON at position 1',
}),
}),
'Failed to parse event',
);
});

test('respects ignoreUnparseableEvents', async () => {
const processor = jest.fn();
const lambda = new KinesisEventHandler({
logger,
parseEvent: testSerializer.parseEvent,
createRunContext: () => ({}),
ignoreUnparseableEvents: true,
})
.onEvent((ctx, event) => processor(event))
.lambda();

await lambda(
{
Records: [
{
kinesis: {
partitionKey: uuid(),
data: Buffer.from('not-a-json-string', 'utf-8').toString(
'base64',
),
},
},
{
kinesis: {
partitionKey: uuid(),
data: Buffer.from(
JSON.stringify({ message: 'test-message' }),
'utf-8',
).toString('base64'),
},
},
] as any,
},
{} as any,
);

expect(logger.error).toHaveBeenCalledWith(
expect.objectContaining({
err: expect.objectContaining({
message: 'Unexpected token o in JSON at position 1',
}),
}),
'Failed to parse event',
);

expect(logger.warn).toHaveBeenCalledWith(
'ignoreUnparseableEvents is set to true. Ignoring message.',
);

expect(processor).toHaveBeenCalledTimes(1);
expect(processor).toHaveBeenCalledWith({ message: 'test-message' });
});
});
25 changes: 22 additions & 3 deletions src/kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ export type KinesisEventHandlerConfig<Event, Context> =
* A function for parsing the Kinesis event data into your custom type.
*/
parseEvent: (body: string) => Event;

/**
* Whether to ignore events that fail to parse. If set to true,
* throwing in the custom parsing function will cause the event
* to be ignored, and never processed.
*/
ignoreUnparseableEvents?: boolean;
};

export type KinesisEventAction<Event, Context> = (
Expand Down Expand Up @@ -93,9 +100,21 @@ export class KinesisEventHandler<Event, Context> {
eventId: record.eventID,
});

const parsedEvent = this.config.parseEvent(
Buffer.from(record.kinesis.data, 'base64').toString('utf8'),
);
let parsedEvent: Event;
try {
parsedEvent = this.config.parseEvent(
Buffer.from(record.kinesis.data, 'base64').toString('utf8'),
);
} catch (err) {
eventLogger.error({ err }, 'Failed to parse event');
if (this.config.ignoreUnparseableEvents) {
eventLogger.warn(
'ignoreUnparseableEvents is set to true. Ignoring message.',
);
return;
}
throw err;
}

for (const action of this.actions) {
await action({ ...context, logger: eventLogger }, parsedEvent);
Expand Down
64 changes: 64 additions & 0 deletions src/sqs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const logger: jest.Mocked<LoggerInterface> = {
info: jest.fn(),
error: jest.fn(),
child: jest.fn(),
warn: jest.fn(),
} as any;

let publicKey: string;
Expand All @@ -21,6 +22,7 @@ beforeAll(async () => {
beforeEach(() => {
logger.info.mockReset();
logger.error.mockReset();
logger.warn.mockReset();
logger.child.mockReset();
logger.child.mockImplementation(() => logger);
});
Expand Down Expand Up @@ -792,4 +794,66 @@ describe('SQSMessageHandler', () => {
);
});
});

test('throws when encountering an unparseable message', async () => {
const lambda = new SQSMessageHandler({
logger,
parseMessage: testSerializer.parseMessage,
createRunContext: () => ({}),
}).lambda();

await expect(
lambda(
{ Records: [{ attributes: {}, body: 'not-a-json-string' }] } as any,
{} as any,
),
).rejects.toThrow('Unexpected token o in JSON at position 1');

expect(logger.error).toHaveBeenCalledWith(
expect.objectContaining({
err: expect.objectContaining({
message: 'Unexpected token o in JSON at position 1',
}),
}),
'Failed to parse message',
);
});

test('respects ignoreUnparseableMessages', async () => {
const processor = jest.fn().mockResolvedValue(void 0);
const lambda = new SQSMessageHandler({
logger,
parseMessage: testSerializer.parseMessage,
createRunContext: () => ({}),
ignoreUnparseableMessages: true,
})
.onMessage((ctx, message) => processor(message))
.lambda();

await lambda(
{
Records: [
{ attributes: {}, body: 'not-a-json-string' },
{ attributes: {}, body: JSON.stringify({ message: 'test-message' }) },
],
} as any,
{} as any,
);

expect(logger.error).toHaveBeenCalledWith(
expect.objectContaining({
err: expect.objectContaining({
message: 'Unexpected token o in JSON at position 1',
}),
}),
'Failed to parse message',
);

expect(logger.warn).toHaveBeenCalledWith(
'ignoreUnparseableMessages is set to true. Ignoring message.',
);

expect(processor).toHaveBeenCalledTimes(1);
expect(processor).toHaveBeenCalledWith({ message: 'test-message' });
});
});
22 changes: 20 additions & 2 deletions src/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ export type SQSMessageHandlerConfig<Message, Context> =
*/
publicKeyDescription: string;
};

/**
* Whether to ignore messages that fail to parse. If set to true,
* throwing in the custom parsing function will cause the message
* to be ignored, and never processed.
*/
ignoreUnparseableMessages?: boolean;
};

export type SQSMessageAction<Message, Context> = (
Expand Down Expand Up @@ -183,8 +190,19 @@ export class SQSMessageHandler<Message, Context> {
messageId: record.messageId,
});

const parsedMessage = this.config.parseMessage(record.body);

let parsedMessage: Message;
try {
parsedMessage = this.config.parseMessage(record.body);
} catch (err) {
messageLogger.error({ err }, 'Failed to parse message');
if (this.config.ignoreUnparseableMessages) {
messageLogger.warn(
'ignoreUnparseableMessages is set to true. Ignoring message.',
);
return;
}
throw err;
}
for (const action of this.messageActions) {
await action({ ...context, logger: messageLogger }, parsedMessage);
}
Expand Down

0 comments on commit c2b105e

Please sign in to comment.