Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdk/kinesis: Data stream sharing #28814

Open
2 tasks
Rinbo opened this issue Jan 22, 2024 · 3 comments
Open
2 tasks

cdk/kinesis: Data stream sharing #28814

Rinbo opened this issue Jan 22, 2024 · 3 comments
Labels
@aws-cdk/aws-kinesis Related to Amazon Kinesis blocked Work is blocked on this issue for this codebase. Other labels or comments may indicate why. effort/medium Medium work item – several days of effort feature-request A feature should be added or improved. needs-cfn This issue is waiting on changes to CloudFormation before it can be addressed. p3

Comments

@Rinbo
Copy link

Rinbo commented Jan 22, 2024

Describe the feature

Implement ability to add a resource-policy for a Kinesis Data Stream. Currently, not even the L1 contruct can accomodate this so we have to add it manually in the console which is not good.

Use Case

Eg. cross account sharing

Proposed Solution

Add a function to the Stream construct (L2?), to allow it to grant read/write access to an AccountPrincipal (similar to how it is done for SQS). This should create a resource policy on the stream. Currently this can only be done through the console from what I can tell

Other Information

No response

Acknowledgements

  • I may be able to implement this feature request
  • This feature might incur a breaking change

CDK version used

2.122.0

Environment details (OS name and version, etc.)

Ubuntu 20.04.5 LTS

@Rinbo Rinbo added feature-request A feature should be added or improved. needs-triage This issue or PR still needs to be triaged. labels Jan 22, 2024
@github-actions github-actions bot added the @aws-cdk/aws-kinesis Related to Amazon Kinesis label Jan 22, 2024
@tim-finnigan tim-finnigan self-assigned this Jan 24, 2024
@tim-finnigan tim-finnigan added investigating This issue is being investigated and/or work is in progress to resolve the issue. and removed needs-triage This issue or PR still needs to be triaged. labels Jan 24, 2024
@tim-finnigan
Copy link

Thanks for the feature request. As you mentioned there is no L1 support, so that would need to be added: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/AWS_Kinesis.html, like this. Labeling this as blocked pending Cfn support.

@tim-finnigan tim-finnigan added p2 needs-cfn This issue is waiting on changes to CloudFormation before it can be addressed. blocked Work is blocked on this issue for this codebase. Other labels or comments may indicate why. effort/medium Medium work item – several days of effort and removed investigating This issue is being investigated and/or work is in progress to resolve the issue. labels Jan 24, 2024
@tim-finnigan tim-finnigan removed their assignment Jan 24, 2024
@thorben-akent
Copy link

We also have a client that has a need to support cross account Kinesis -> Lambda triggers within one of their CDK projects. As mentioned already in this issue CloudFormation does not yet support resource-based policies on AWS::Kinesis::Stream resources, but there is always Lambda-backed custom resources and CDK already has the AwsApi class that we can use to invoke kinesis:PutResourcePolicy.

I'm currently investigating using this as a workaround for our client.

@thorben-akent
Copy link

We ended up implementing a lambda-backed custom resource for this. Note: you may also have to use a Lambda layer to provide a more recent version of the boto3 module depending on the AWS partition you are deploying to.

Lambda code:

import boto3
import json
import logging
import os
import uuid

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("lambda")
logger.setLevel(logging.INFO)

logger.info("Lambda cold start")

debug_enabled = os.getenv("DEBUG","FALSE").upper() in ("TRUE","T","1")

if debug_enabled:
    logger.setLevel(logging.DEBUG)
    logger.debug("Debug enabled")

logger.debug(f"Boto3 version: {boto3.__version__}")

access_type_action_map = {
    "READ": [
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamSummary",
        "kinesis:ListShards",
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
    ],
    "WRITE": [
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamSummary",
        "kinesis:ListShards",
        "kinesis:PutRecord",
        "kinesis:PutRecords",
    ],
    "READWRITE": [
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamSummary",
        "kinesis:ListShards",
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:PutRecord",
        "kinesis:PutRecords",
    ],
}


def get_policy(arn):
    
    client = boto3.client("kinesis")
    
    params = {
        "ResourceARN": arn
    }
    try:
        response = client.get_resource_policy(**params)
    except:
        logger.exception("Encountered an exception during kinesis.get_resource_policy()", extra={"params":params})
        raise
    else:
        policy = json.loads(response["Policy"])

    if not "Version" in policy:
         policy["Version"] = "2012-10-17"
    if not "Id" in policy:
         policy["Id"] = uuid.uuid4()
    if not "Statement" in policy:
         policy["Statement"] = []

    return policy


def put_policy(arn, policy):
    
    client = boto3.client("kinesis")
    
    params = {
        "ResourceARN": arn,
        "Policy": json.dumps(policy, default=str),
    }
    try:
        response = client.put_resource_policy(**params)
    except:
        logger.exception("Encountered an exception during kinesis.put_resource_policy()", extra={"params":params})
        raise


def delete_policy(arn):
    
    client = boto3.client("kinesis")
    
    params = {
        "ResourceARN": arn,
    }
    try:
        response = client.delete_resource_policy(**params)
    except:
        logger.exception("Encountered an exception during kinesis.delete_resource_policy()", extra={"params":params})
        raise


def create_handler(event):

    required_properties = [
        "StreamArn",
        "AccountId",
        "AccessType",
    ]

    for property in required_properties:
        if not property in event["ResourceProperties"]:
            message = f'Missing required key {property} in ResourceProperties'
            logger.error(message)
            raise RuntimeError(message)
        
    sid = event["RequestId"].replace("-","")
    new_statement = {
        "Sid": sid,
        "Effect": "Allow",
        "Principal": {
            "AWS": event["ResourceProperties"]["AccountId"],
        },
        "Action": access_type_action_map[event["ResourceProperties"]["AccessType"]],
        "Resource": event["ResourceProperties"]["StreamArn"],
    }
        
    policy = get_policy(event["ResourceProperties"]["StreamArn"])
    logger.info(f'Current resource policy on {event["ResourceProperties"]["StreamArn"]}: {json.dumps(policy, default=str)}')

    logger.info(f'Adding/updating statement to policy: {json.dumps(new_statement, default=str)}')
    statements = []
    for statement in policy["Statement"]:
        if "Sid" in statement and statement["Sid"] == sid:
            continue
        else:
            statements.append(statement)
    statements.append(new_statement)

    policy["Statement"] = statements
    logger.info(f'Applying new resource policy on {event["ResourceProperties"]["StreamArn"]}: {json.dumps(policy, default=str)}')
    put_policy(event["ResourceProperties"]["StreamArn"], policy)


def update_handler(event):

    event["RequestId"] == event["PhysicalResourceId"]
    create_handler(event)


def delete_handler(event):

    required_properties = [
        "StreamArn",
    ]

    for property in required_properties:
        if not property in event["ResourceProperties"]:
            message = f'Missing required key {property} in ResourceProperties'
            logger.error(message)
            raise RuntimeError(message)
        
    sid = event["PhysicalResourceId"].replace("-","")
        
    policy = get_policy(event["ResourceProperties"]["StreamArn"])
    logger.info(f'Current resource policy on {event["ResourceProperties"]["StreamArn"]}: {json.dumps(policy, default=str)}')

    statements = []
    for statement in policy["Statement"]:
        if "Sid" in statement and statement["Sid"] == sid:
            continue
        else:
            statements.append(statement)

    policy["Statement"] = statements
    logger.info(f'Applying resource policy on {event["ResourceProperties"]["StreamArn"]}: {json.dumps(policy, default=str)}')
    if len(policy["Statement"]) == 0:
        delete_policy(event["ResourceProperties"]["StreamArn"])
    else:
        put_policy(event["ResourceProperties"]["StreamArn"], policy)


def on_event(event, context):

    logger.debug(f"Received event: {json.dumps(event, default=str)}")

    match event["RequestType"]:
        case "Create":
            return create_handler(event)
        case "Update":
            return update_handler(event)
        case "Delete":
            return delete_handler(event)
        case _:
            message = f'Value of {event["RequestType"]} is unsupported for RequestType'
            logger.error(message)
            raise RuntimeError(message)

CDK code:

stream = kinesis.Stream(self, "Stream",
    retention_period = cdk.Duration.hours(24),
    stream_mode = kinesis.StreamMode.ON_DEMAND,
)

kinesis_resource_policy_provider_function = lambda_.Function(self, "KinesisResourcePolicyProviderFunction",
    code = lambda_.Code.from_asset(
        path = str((definitions.CUSTOM_RESOURCE_PROVIDERS_DIR / "kinesis_resource_policy_provider").resolve()),
        deploy_time = True,
    ),
    environment = {
        "DEBUG": "TRUE",
    },
    handler = "kinesis_resource_policy_provider_handler.on_event",
    runtime = lambda_.Runtime.PYTHON_3_11,
    timeout = cdk.Duration.minutes(5),
)
kinesis_resource_policy_provider_function.add_to_role_policy(iam.PolicyStatement(
    sid = "ModifyKinesisResourcePolicy",
    actions = [
        "kinesis:DeleteResourcePolicy",
        "kinesis:GetResourcePolicy",
        "kinesis:PutResourcePolicy",
    ],
    resources = [ "*" ],
))
kinesis_resource_policy_provider = custom_resources.Provider(self, "KinesisResourcePolicyProvider",
    on_event_handler = kinesis_resource_policy_provider_function,
)

kinesis_resource_policy = cdk.CustomResource(self, "KinesisResourcePolicy",
    properties = {
        "StreamArn": stream.stream_arn,
        "AccountId": "11111111111",
        "AccessType": "READ",
    },
    resource_type = "Custom::KinesisResourcePolicy",
    service_token = kinesis_resource_policy_provider.service_token,
)

@pahud pahud added p3 and removed p2 labels Jun 11, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
@aws-cdk/aws-kinesis Related to Amazon Kinesis blocked Work is blocked on this issue for this codebase. Other labels or comments may indicate why. effort/medium Medium work item – several days of effort feature-request A feature should be added or improved. needs-cfn This issue is waiting on changes to CloudFormation before it can be addressed. p3
Projects
None yet
Development

No branches or pull requests

4 participants