Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kinesis): support stream consumers #32087

Open
wants to merge 18 commits into
base: main
Choose a base branch
from

Conversation

humanzz
Copy link
Contributor

@humanzz humanzz commented Nov 11, 2024

Issue # (if applicable)

Closes #32050

Reason for this change

Support Enhanced fan-out consumers via AWS::Kinesis::StreamConsumer and facilitate cross-account stream consumption via Lambda

Description of changes

  • introduce StreamConsumer construct to model AWS::Kinesis::StreamConsumer
    • introduce addToResourcePolicy to enable creating/configuring a resource policy for the consumer
    • introduce grant and grantRead for granting permissions
    • leverage iam.Grant.addToPrincipalOrResource in grant to be able to use grant methods cross environments to update the grantee's iam policy and the consumer's resource policy as needed
  • update ResourcePolicy to support both Stream and StreamConsumer
  • update Stream's grant to leverage iam.Grant.addToPrincipalOrResource for cross-environment support
  • introduce KinesisConsumerEventSource to lambda-event-sources for use with the newly introduced StreamConsumer

Useful links

Description of how you validated changes

unit and integration tests

Checklist


By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license

- introduce `StreamConsumer` construct to model `AWS::Kinesis::StreamConsumer`
  - introduce `addToResourcePolicy` to enable creating/configuring a resource policy for the consumer
  - introduce `grant` and `grantRead` for granting permissions
  - leverage `iam.Grant.addToPrincipalOrResource` in `grant` to be able to use `grant` methods cross environments to update the grantee's iam policy and the consumer's resource policy as needed
- update `ResourcePolicy` to support both `Stream` and `StreamConsumer`
- update `Stream`'s `grant` to leverage `iam.Grant.addToPrincipalOrResource` for cross-environment support

closes aws#32050
@github-actions github-actions bot added admired-contributor [Pilot] contributed between 13-24 PRs to the CDK effort/small Small work item – less than a day of effort feature-request A feature should be added or improved. p2 labels Nov 11, 2024
@aws-cdk-automation aws-cdk-automation requested a review from a team November 11, 2024 10:43
Copy link

codecov bot commented Nov 11, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 78.46%. Comparing base (24adca3) to head (f1f7ef6).
Report is 1 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main   #32087   +/-   ##
=======================================
  Coverage   78.46%   78.46%           
=======================================
  Files         106      106           
  Lines        7208     7208           
  Branches     1323     1323           
=======================================
  Hits         5656     5656           
  Misses       1364     1364           
  Partials      188      188           
Flag Coverage Δ
suite.unit 78.46% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

Components Coverage Δ
packages/aws-cdk 78.46% <ø> (ø)

@aws-cdk-automation aws-cdk-automation added the pr/needs-community-review This PR needs a review from a Trusted Community Member or Core Team Member. label Nov 11, 2024
@humanzz
Copy link
Contributor Author

humanzz commented Nov 22, 2024

One other thing I noticed, is that in lambda event sources, the KinesisEventSource assumes an IStream, and I think that will need to be changed to allow for StreamConsumer

constructor(readonly stream: kinesis.IStream, props: KinesisEventSourceProps) {

A potential implementation

  • Extract the logic into a private base class e.g. KinesisEventSourceBase which takes as input a private KinesisSource
  • Update existing KinesisEventSource to extend KinesisEventSourceBase
  • Introduce a new KinesisConsumerEventSource also extending KinesisEventSourceBase
import * as constructs from 'constructs';
import { StreamEventSource, StreamEventSourceProps } from './stream';
import * as iam from '../../aws-iam';
import * as kinesis from '../../aws-kinesis';
import * as lambda from '../../aws-lambda';
import * as cdk from '../../core';

export interface KinesisEventSourceProps extends StreamEventSourceProps {
  /**
   * The time from which to start reading, in Unix time seconds.
   *
   * @default - no timestamp
   */
  readonly startingPositionTimestamp?: number;
}

interface KinesisSource {
  readonly node: constructs.Node;
  readonly sourceArn: string;
  readonly eventSourceName: string;
  grantRead(grantee: iam.IGrantable): iam.Grant;
}

abstract class KinesisEventSourceBase extends StreamEventSource {
  private _eventSourceMappingId?: string = undefined;
  private _eventSourceMappingArn?: string = undefined;
  private startingPositionTimestamp?: number;

  constructor(readonly source: KinesisSource, props: KinesisEventSourceProps) {
    super(props);
    this.startingPositionTimestamp = props.startingPositionTimestamp;

    this.props.batchSize !== undefined && cdk.withResolved(this.props.batchSize, batchSize => {
      if (batchSize < 1 || batchSize > 10000) {
        throw new Error(`Maximum batch size must be between 1 and 10000 inclusive (given ${this.props.batchSize})`);
      }
    });
  }

  public bind(target: lambda.IFunction) {
    const eventSourceMapping = target.addEventSourceMapping(`${this.source.eventSourceName}:${cdk.Names.nodeUniqueId(this.source.node)}`,
      this.enrichMappingOptions({
        eventSourceArn: this.source.sourceArn,
        startingPositionTimestamp: this.startingPositionTimestamp,
        metricsConfig: this.props.metricsConfig,
      }),
    );
    this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId;
    this._eventSourceMappingArn = eventSourceMapping.eventSourceMappingArn;

    this.source.grantRead(target);
  }

  /**
   * The identifier for this EventSourceMapping
   */
  public get eventSourceMappingId(): string {
    if (!this._eventSourceMappingId) {
      throw new Error(`${this.source.eventSourceName} is not yet bound to an event source mapping`);
    }
    return this._eventSourceMappingId;
  }

  /**
   * The ARN for this EventSourceMapping
   */
  public get eventSourceMappingArn(): string {
    if (!this._eventSourceMappingArn) {
      throw new Error(`${this.source.eventSourceName} is not yet bound to an event source mapping`);
    }
    return this._eventSourceMappingArn;
  }
}

/**
 * Use an Amazon Kinesis stream as an event source for AWS Lambda.
 */
export class KinesisEventSource extends KinesisEventSourceBase {
  constructor(stream: kinesis.IStream, props: KinesisEventSourceProps) {
    super({ ...stream, eventSourceName: 'KinesisEventSource', sourceArn: stream.streamArn }, props);
  }
}

/**
 * Use an Amazon Kinesis stream consumer as an event source for AWS Lambda.
 */
export class KinesisConsumerEventSource extends KinesisEventSourceBase {
  constructor(streamConsumer: kinesis.IStreamConsumer, props: KinesisEventSourceProps) {
    super({ ...streamConsumer, eventSourceName: 'KinesisConsumerEventSource', sourceArn: streamConsumer.streamConsumerArn }, props);
  }
}

// and `SubscribeToShard` APIs.
// The Lambda::EventSourceMapping resource validates against the `DescribeStream` permission. So we add it explicitly.
// FIXME This permission can be removed when the event source mapping resource drops it from validation.
this.stream.grant(target, 'kinesis:DescribeStream');
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This permission is now already part of grantRead

@aws-cdk-automation
Copy link
Collaborator

AWS CodeBuild CI Report

  • CodeBuild project: AutoBuildv2Project1C6BFA3F-wQm2hXv2jqQv
  • Commit ID: f1f7ef6
  • Result: SUCCEEDED
  • Build Logs (available for 30 days)

Powered by github-codebuild-logs, available on the AWS Serverless Application Repository

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
admired-contributor [Pilot] contributed between 13-24 PRs to the CDK effort/small Small work item – less than a day of effort feature-request A feature should be added or improved. p2 pr/needs-community-review This PR needs a review from a Trusted Community Member or Core Team Member.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

kinesis: model AWS::Kinesis::StreamConsumer (L2 construct)
2 participants