Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow message body redaction #67

Merged
merged 4 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/__fixtures__/private-key.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAqJACmpNdviOCUFgig7+BwYqVaNX9BelsCt9PymUv4NmSBwzb
QgxTIPT8jzEapcFtHWqz+ahH3OKizLglyCeqIC6bFSn8CWsm6u/g6d9kFk4Uz9Dw
1zuG31n/LYA1ebPOHZud9UiwTFJFvKPQPv8fKV73E/18swT+gLIBpgij83olb75N
+DpLpzAsKOjNJKWPOLzQPMYnd4XAEWMZpzt2Nb5VNcKU26sg7IEoj3LioiokWOX+
+pPn9u/3KRkEYBH7qFXHozY2vhG5qqOu0+ImuXtZ6r0aI8CpXkKGSqg2KqNrhN2v
NL0N3onetPl/k2RCPZ/g08OHVFF9OOcytRuX3wIDAQABAoIBAHaEy0/kTgVi8j2L
urjn7lQnHOaZj06Y0V7TpUap3wA5+nL6ly/Zepmxp+MGo7XoStBkNidUKzMkJ1PK
JsaVHQmDu4cl/hChRrvp7jqC19zXCcsVHkI3mJ1yqflULEVmJ4ap5GaStWL0dhQt
Gj8xIrf0DcYAda1p1YinoIEdkimelI6trdGlQDF+BsKrrnal+K6cRKdMcAQWvgt0
UKCw5wkcpMTxaN2pKsQtHu3GTOjP8uRoL7C6V0tCYo7VZO9+ZmMN+mpaO6LA+Rfj
wXr4itY9PtooLTrwAdP6WFIPwWsgNYqc5eVY9DvSeFH1saK3nguS2nTIr+DtY1H+
D61PYAECgYEA0nEjteKzpM7gqLp8pcEicWY+d7j5GjvzUwfl6UJ2i5XbfbyC/khU
sz905rpc5OnVlxqodoew73v5gH3uvIcwhOXdcaIrwH1pMiFaV/TAeQjttmoVL0bJ
UOlpyWs8Tf04907l5fqtxz6izUk90nWK+wxrM7HdarxeRXsQz4aLIW0CgYEAzQ3h
Xnzi/0B/jnMeua/X1NGqgWJIp868A5pGog4BUWnSOzK5pRHQsVXN1wQq/5d1duhm
qnGWIEV8m+gChsDcuFoAWZorrFBSVI2/mTCWRLf5nELtkLZnXQ53x9N3LS8BvTek
rA8LOP+Cgv8IdM2em3DvIbZTa0uy1q2m6VdH2vsCgYBsrXsosmPdx+zjljNLEpur
/oZiI8eZUb6OcbS9KtK3sXOB0rm/gjEjxLClezcADPZ+K4k2dUrd0qN+RQrml9Zp
u6AJ0BtSNDIAbpMOe1pu5zqECvLX0HGk9HXqTBP/nrctmLRHeZcHH4TKCXoA1y0o
CzjNoJxdQ9xXe3+p/KybXQKBgC+tWIdltknvLzlp3u0By8c58NEgjxAla2XTCzVG
2FubpTwKcUvGNqXk83VZDL5c8vzw0F41Btj+DxkY+u1mDmv20ToENL9d9aafRrtR
pr7Xn/wLO714C9SBNqyJqJ4i3d6m/2zaGpvoHOpkbgzqekReH9vQztiVw0FTIwoC
NzzdAoGASTbIMn4+Syy1qF6uxB8PBlM3u9kutNgtQImEXgmjVO/46pzSPIyfijXu
z49EtzCqXuLM8eXGolX4CRyxe3zxLoq3BaS9aFDnKVfHsFoMsXpvLBxW9fTCmEhf
7oOqnKi1b6Y75/aM2yt9Eg8dTu8+619dPcX1EF17HVQQAWQe6HA=
-----END RSA PRIVATE KEY-----
9 changes: 9 additions & 0 deletions src/__fixtures__/public-key.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqJACmpNdviOCUFgig7+B
wYqVaNX9BelsCt9PymUv4NmSBwzbQgxTIPT8jzEapcFtHWqz+ahH3OKizLglyCeq
IC6bFSn8CWsm6u/g6d9kFk4Uz9Dw1zuG31n/LYA1ebPOHZud9UiwTFJFvKPQPv8f
KV73E/18swT+gLIBpgij83olb75N+DpLpzAsKOjNJKWPOLzQPMYnd4XAEWMZpzt2
Nb5VNcKU26sg7IEoj3LioiokWOX++pPn9u/3KRkEYBH7qFXHozY2vhG5qqOu0+Im
uXtZ6r0aI8CpXkKGSqg2KqNrhN2vNL0N3onetPl/k2RCPZ/g08OHVFF9OOcytRuX
3wIDAQAB
-----END PUBLIC KEY-----
178 changes: 178 additions & 0 deletions src/sqs.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
import { v4 as uuid } from 'uuid';
import { LoggerInterface } from '@lifeomic/logging';
import { SQSMessageAction, SQSMessageHandler } from './sqs';
import { promises as fs } from 'fs';
import { privateDecrypt } from 'crypto';

const logger: jest.Mocked<LoggerInterface> = {
info: jest.fn(),
error: jest.fn(),
child: jest.fn(),
} as any;

let publicKey: string;
beforeAll(async () => {
publicKey = await fs.readFile(
__dirname + '/__fixtures__/public-key.pem',
'utf8',
);
});

beforeEach(() => {
logger.info.mockReset();
logger.error.mockReset();
logger.child.mockReset();
logger.child.mockImplementation(() => logger);
});
Expand Down Expand Up @@ -79,6 +90,173 @@ describe('SQSMessageHandler', () => {
);
});

test('allows body redaction', async () => {
expect.assertions(2);

const lambda = new SQSMessageHandler({
logger,
redactionConfig: {
redactMessageBody: () => 'REDACTED',
publicEncryptionKey: publicKey,
publicKeyDescription: 'test-public-key',
},
parseMessage: testSerializer.parseMessage,
createRunContext: (ctx) => {
expect(typeof ctx.correlationId === 'string').toBe(true);
return {};
},
}).lambda();

await lambda(
{
Records: [
{ attributes: {}, body: JSON.stringify({ data: 'test-event-1' }) },
],
} as any,
{} as any,
);

// Assert that the message body was redacted.
expect(logger.info).toHaveBeenCalledWith(
{
event: { Records: [{ attributes: {}, body: 'REDACTED' }] },
},
'Processing SQS topic message',
);
});

test('if redaction fails, a redacted body is logged with an encrypted body', async () => {
expect.assertions(5);

const error = new Error('Failed to redact message');
const lambda = new SQSMessageHandler({
logger,
redactionConfig: {
redactMessageBody: () => {
throw error;
},
publicEncryptionKey: publicKey,
publicKeyDescription: 'test-public-key',
},
parseMessage: testSerializer.parseMessage,
createRunContext: (ctx) => {
expect(typeof ctx.correlationId === 'string').toBe(true);
return {};
},
}).lambda();

const body = JSON.stringify({ data: 'test-event-1' });
const event = {
Records: [{ attributes: {}, body }],
} as any;
const response = await lambda(event, {} as any);

// Expect no failure
expect(response).toBeUndefined();

// Assert that the message body was shown redacted. Along with the
// redacted body, an encrypted body is also logged with the redaction
// error to help debugging.
expect(logger.error).toHaveBeenCalledWith(
{
error,
encryptedBody: expect.any(String),
publicKeyDescription: 'test-public-key',
},
'Failed to redact message body',
);

// Verify that the encrypted body can be decrypted.
const privateKey = await fs.readFile(
__dirname + '/__fixtures__/private-key.pem',
'utf8',
);
const encryptedBody = logger.error.mock.calls[0][0].encryptedBody;
const decrypted = privateDecrypt(
privateKey,
Buffer.from(encryptedBody, 'base64'),
).toString('utf8');
expect(decrypted).toEqual(body);

// Verify the the body was redacted.
expect(logger.info).toHaveBeenCalledWith(
{
event: {
...event,
Records: event.Records.map((record: any) => ({
...record,
body: '[REDACTION FAILED]',
})),
},
},
'Processing SQS topic message',
);
});

test('if redaction fails, and encryption fails, a redacted body is logged with the encryption failure', async () => {
expect.assertions(5);

const error = new Error('Failed to redact message');
const lambda = new SQSMessageHandler({
logger,
redactionConfig: {
redactMessageBody: () => {
throw error;
},
publicEncryptionKey: 'not-a-valid-key',
publicKeyDescription: 'test-public-key',
},
parseMessage: testSerializer.parseMessage,
createRunContext: (ctx) => {
expect(typeof ctx.correlationId === 'string').toBe(true);
return {};
},
}).lambda();

const body = JSON.stringify({ data: 'test-event-1' });
const event = {
Records: [{ attributes: {}, body }],
} as any;
const response = await lambda(event, {} as any);

// Expect no failure
expect(response).toBeUndefined();

// Assert that the message body was shown redacted. Along with the
// redacted body, an encrypted body is also logged with the redaction
// error to help debugging.
expect(logger.error).toHaveBeenCalledWith(
{
error,
encryptedBody: '[ENCRYPTION FAILED]', // Signals that encryption failed
publicKeyDescription: 'test-public-key',
},
'Failed to redact message body',
);

// When encryption fails, the failure is logged.
expect(logger.error).toHaveBeenCalledWith(
{
error: expect.anything(),
},
'Failed to encrypt message body',
);

// Verify the the body was redacted.
expect(logger.info).toHaveBeenCalledWith(
{
event: {
...event,
Records: event.Records.map((record: any) => ({
...record,
body: '[REDACTION FAILED]',
})),
},
},
'Processing SQS topic message',
);
});

describe('error handling', () => {
const records = [
{
Expand Down
79 changes: 78 additions & 1 deletion src/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
processWithOrdering,
withHealthCheckHandling,
} from './utils';
import { publicEncrypt } from 'crypto';

export type SQSMessageHandlerConfig<Message, Context> =
BaseHandlerConfig<Context> & {
Expand All @@ -23,6 +24,26 @@ export type SQSMessageHandlerConfig<Message, Context> =
* https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
*/
usePartialBatchResponses?: boolean;

redactionConfig?: {
/**
* This will be called to redact the message body before logging it. By
* default, the full message body is logged.
*/
redactMessageBody: (body: string) => string;

/**
* The public encryption key used for writing messages that contain
* sensitive information but failed to be redacted.
*/
publicEncryptionKey: string;

/**
* Logged with the encypted message to help identify the key used. For
* example, this could explain who has access to the key or how to get it.
*/
publicKeyDescription: string;
};
};

export type SQSMessageAction<Message, Context> = (
Expand Down Expand Up @@ -58,6 +79,47 @@ export type SQSPartialBatchResponse = {
}[];
};

const safeRedactor =
(
logger: LoggerInterface,
redactionConfig: NonNullable<
SQSMessageHandlerConfig<any, any>['redactionConfig']
>,
) =>
(body: string) => {
try {
return redactionConfig.redactMessageBody(body);
} catch (error) {
let encryptedBody;

// If redaction fails, then encrypt the message body and log it.
// Encryption allows for developers to decrypt the message if needed
// but does not log sensitive inforation the the log stream.
try {
encryptedBody = publicEncrypt(
redactionConfig.publicEncryptionKey,
Buffer.from(body),
).toString('base64');
} catch (error) {
// If encryption fails, then log the encryption error and replace
// the body with dummy text.
logger.error({ error }, 'Failed to encrypt message body');
encryptedBody = '[ENCRYPTION FAILED]';
}

// Log the redaction error
logger.error(
{
error,
encryptedBody,
publicKeyDescription: redactionConfig.publicKeyDescription,
},
'Failed to redact message body',
);
return '[REDACTION FAILED]';
}
};

/**
* An abstraction for an SQS message handler.
*/
Expand Down Expand Up @@ -96,7 +158,22 @@ export class SQSMessageHandler<Message, Context> {
Object.assign(context, await this.config.createRunContext(context));

// 2. Process all the records.
context.logger.info({ event }, 'Processing SQS topic message');
const redactor = this.config.redactionConfig
? safeRedactor(context.logger, this.config.redactionConfig)
: undefined;
const redactedEvent = redactor
? {
...event,
Records: event.Records.map((record) => ({
...record,
body: redactor(record.body),
})),
}
: event;
context.logger.info(
{ event: redactedEvent },
'Processing SQS topic message',
);

const processingResult = await processWithOrdering(
{
Expand Down
Loading