Skip to content

Commit

Permalink
feat(sns): add support for subscription filter policy (#2778)
Browse files Browse the repository at this point in the history
Add a new `filterPolicy` prop to all SNS subscriptions (`aws-sns-subscriptions`).

The filter policy can be defined using the new `SubscriptionFilterPolicy` class.

Moved the creation of the `Subscription` back to `aws-sns` to avoid code repetitions.
  • Loading branch information
jogold authored and rix0rrr committed Jun 21, 2019
1 parent 370b905 commit ae789ed
Show file tree
Hide file tree
Showing 30 changed files with 688 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@
"EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionServiceRole94543EDA"
]
},
"EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionTopicSubscriptionDA5F8A10": {
"EcsClusterDefaultAutoScalingGroupLifecycleHookDrainHookTopicFunctionA8966A35": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"Protocol": "lambda",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@
"EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionServiceRole94543EDA"
]
},
"EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionTopicSubscriptionDA5F8A10": {
"EcsClusterDefaultAutoScalingGroupLifecycleHookDrainHookTopicFunctionA8966A35": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"Protocol": "lambda",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@
"EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionServiceRole94543EDA"
]
},
"EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionTopicSubscriptionDA5F8A10": {
"EcsClusterDefaultAutoScalingGroupLifecycleHookDrainHookTopicFunctionA8966A35": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"Protocol": "lambda",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@
"EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionServiceRole94543EDA"
]
},
"EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionTopicSubscriptionDA5F8A10": {
"EcsClusterDefaultAutoScalingGroupLifecycleHookDrainHookTopicFunctionA8966A35": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"Protocol": "lambda",
Expand Down Expand Up @@ -946,4 +946,4 @@
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@
"EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionServiceRole94543EDA"
]
},
"EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionTopicSubscriptionDA5F8A10": {
"EcsClusterDefaultAutoScalingGroupLifecycleHookDrainHookTopicFunctionA8966A35": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"Protocol": "lambda",
Expand Down Expand Up @@ -910,4 +910,4 @@
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@
"MyQueueE6CA6235": {
"Type": "AWS::SQS::Queue"
},
"MyQueueMyTopicSubscriptionEB66AD1B": {
"MyTopicMyQueueFA241964": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"Protocol": "sqs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@
"EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionServiceRole94543EDA"
]
},
"EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionTopicSubscriptionDA5F8A10": {
"EcsClusterDefaultAutoScalingGroupLifecycleHookDrainHookTopicFunctionA8966A35": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"Protocol": "lambda",
Expand Down Expand Up @@ -1197,4 +1197,4 @@
"Description": "Artifact hash for asset \"aws-ecs-integ-ecs/AdoptEcrRepositorydbc60defc59544bcaa5c28c95d68f62c/Code\""
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
"MyQueueE6CA6235": {
"Type": "AWS::SQS::Queue"
},
"MyQueueMyTopicSubscriptionEB66AD1B": {
"MyTopicMyQueueFA241964": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"Protocol": "sqs",
Expand Down Expand Up @@ -108,4 +108,4 @@
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
"FServiceRole3AC82EE1"
]
},
"FTSubscription775EAF05": {
"TF2453034D": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"Protocol": "lambda",
Expand Down
15 changes: 8 additions & 7 deletions packages/@aws-cdk/aws-sns-subscriptions/lib/email.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import sns = require('@aws-cdk/aws-sns');
import { Construct } from '@aws-cdk/cdk';
import { SubscriptionProps } from './subscription';

/**
* Options for email subscriptions.
*/
export interface EmailSubscriptionProps {
export interface EmailSubscriptionProps extends SubscriptionProps {
/**
* Indicates if the full notification JSON should be sent to the email
* address or just the message text.
Expand All @@ -23,11 +23,12 @@ export class EmailSubscription implements sns.ITopicSubscription {
constructor(private readonly emailAddress: string, private readonly props: EmailSubscriptionProps = {}) {
}

public bind(scope: Construct, topic: sns.ITopic): void {
new sns.Subscription(scope, this.emailAddress, {
topic,
public bind(_topic: sns.ITopic): sns.TopicSubscriptionConfig {
return {
subscriberId: this.emailAddress,
endpoint: this.emailAddress,
protocol: this.props.json ? sns.SubscriptionProtocol.EMAIL_JSON : sns.SubscriptionProtocol.EMAIL
});
protocol: this.props.json ? sns.SubscriptionProtocol.EMAIL_JSON : sns.SubscriptionProtocol.EMAIL,
filterPolicy: this.props.filterPolicy,
};
}
}
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-sns-subscriptions/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './subscription';
export * from './email';
export * from './lambda';
export * from './sqs';
Expand Down
31 changes: 16 additions & 15 deletions packages/@aws-cdk/aws-sns-subscriptions/lib/lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,38 @@ import iam = require('@aws-cdk/aws-iam');
import lambda = require('@aws-cdk/aws-lambda');
import sns = require('@aws-cdk/aws-sns');
import { Construct } from '@aws-cdk/cdk';
import { SubscriptionProps } from './subscription';

/**
* Properties for a Lambda subscription
*/
export interface LambdaSubscriptionProps extends SubscriptionProps {

}
/**
* Use a Lambda function as a subscription target
*/
export class LambdaSubscription implements sns.ITopicSubscription {
constructor(private readonly fn: lambda.IFunction) {
constructor(private readonly fn: lambda.IFunction, private readonly props: LambdaSubscriptionProps = {}) {
}

public bind(_scope: Construct, topic: sns.ITopic): void {
public bind(topic: sns.ITopic): sns.TopicSubscriptionConfig {
// Create subscription under *consuming* construct to make sure it ends up
// in the correct stack in cases of cross-stack subscriptions.
if (!Construct.isConstruct(this.fn)) {
throw new Error(`The supplied lambda Function object must be an instance of Construct`);
}

// we use the target name as the subscription's. there's no meaning to
// subscribing the same queue twice on the same topic.
const subscriptionName = topic.node.id + 'Subscription';
if (this.fn.node.tryFindChild(subscriptionName)) {
throw new Error(`A subscription between the topic ${topic.node.id} and the lambda ${this.fn.node.id} already exists`);
}

new sns.Subscription(this.fn, subscriptionName, {
topic,
endpoint: this.fn.functionArn,
protocol: sns.SubscriptionProtocol.LAMBDA,
});

this.fn.addPermission(topic.node.id, {
sourceArn: topic.topicArn,
principal: new iam.ServicePrincipal('sns.amazonaws.com'),
});

return {
subscriberId: this.fn.node.id,
endpoint: this.fn.functionArn,
protocol: sns.SubscriptionProtocol.LAMBDA,
filterPolicy: this.props.filterPolicy,
};
}
}
27 changes: 11 additions & 16 deletions packages/@aws-cdk/aws-sns-subscriptions/lib/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ import iam = require('@aws-cdk/aws-iam');
import sns = require('@aws-cdk/aws-sns');
import sqs = require('@aws-cdk/aws-sqs');
import { Construct } from '@aws-cdk/cdk';
import { SubscriptionProps } from './subscription';

/**
* Properties for an SQS subscription
*/
export interface SqsSubscriptionProps {
export interface SqsSubscriptionProps extends SubscriptionProps {
/**
* The message to the queue is the same as it was sent to the topic
*
Expand All @@ -24,27 +25,13 @@ export class SqsSubscription implements sns.ITopicSubscription {
constructor(private readonly queue: sqs.IQueue, private readonly props: SqsSubscriptionProps = {}) {
}

public bind(_scope: Construct, topic: sns.ITopic): void {
public bind(topic: sns.ITopic): sns.TopicSubscriptionConfig {
// Create subscription under *consuming* construct to make sure it ends up
// in the correct stack in cases of cross-stack subscriptions.
if (!Construct.isConstruct(this.queue)) {
throw new Error(`The supplied Queue object must be an instance of Construct`);
}

// we use the queue name as the subscription's. there's no meaning to
// subscribing the same queue twice on the same topic.
const subscriptionName = topic.node.id + 'Subscription';
if (this.queue.node.tryFindChild(subscriptionName)) {
throw new Error(`A subscription between the topic ${topic.node.id} and the queue ${this.queue.node.id} already exists`);
}

new sns.Subscription(this.queue, subscriptionName, {
topic,
endpoint: this.queue.queueArn,
protocol: sns.SubscriptionProtocol.SQS,
rawMessageDelivery: this.props.rawMessageDelivery,
});

// add a statement to the queue resource policy which allows this topic
// to send messages to the queue.
this.queue.addToResourcePolicy(new iam.PolicyStatement({
Expand All @@ -55,5 +42,13 @@ export class SqsSubscription implements sns.ITopicSubscription {
ArnEquals: { 'aws:SourceArn': topic.topicArn }
}
}));

return {
subscriberId: this.queue.node.id,
endpoint: this.queue.queueArn,
protocol: sns.SubscriptionProtocol.SQS,
rawMessageDelivery: this.props.rawMessageDelivery,
filterPolicy: this.props.filterPolicy,
};
}
}
10 changes: 10 additions & 0 deletions packages/@aws-cdk/aws-sns-subscriptions/lib/subscription.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import sns = require('@aws-cdk/aws-sns');

export interface SubscriptionProps {
/**
* The filter policy.
*
* @default - all messages are delivered
*/
readonly filterPolicy?: { [attribute: string]: sns.SubscriptionFilter };
}
13 changes: 7 additions & 6 deletions packages/@aws-cdk/aws-sns-subscriptions/lib/url.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import sns = require('@aws-cdk/aws-sns');
import { Construct } from '@aws-cdk/cdk';
import { SubscriptionProps } from './subscription';

/**
* Options for URL subscriptions.
*/
export interface UrlSubscriptionProps {
export interface UrlSubscriptionProps extends SubscriptionProps {
/**
* The message to the queue is the same as it was sent to the topic
*
Expand All @@ -29,12 +29,13 @@ export class UrlSubscription implements sns.ITopicSubscription {
}
}

public bind(scope: Construct, topic: sns.ITopic): void {
new sns.Subscription(scope, this.url, {
topic,
public bind(_topic: sns.ITopic): sns.TopicSubscriptionConfig {
return {
subscriberId: this.url,
endpoint: this.url,
protocol: this.url.startsWith('https:') ? sns.SubscriptionProtocol.HTTPS : sns.SubscriptionProtocol.HTTP,
rawMessageDelivery: this.props.rawMessageDelivery,
});
filterPolicy: this.props.filterPolicy,
};
}
}
Loading

0 comments on commit ae789ed

Please sign in to comment.