Add a lambda to replicate dynamo CDC stream to ClickHouse #5419

Closed
11 changes: 11 additions & 0 deletions aws/lambda/dynamo-clickhouse-replicator/Makefile
@@ -0,0 +1,11 @@
prepare: clean
	mkdir -p ./packages
	pip3 install --target ./packages -r requirements.txt
	cd packages && zip -r ../dynamo-clickhouse-replicator-deployment.zip .
	zip -g dynamo-clickhouse-replicator-deployment.zip lambda_function.py

deploy: prepare
	aws lambda update-function-code --function-name dynamo-clickhouse-replicator --zip-file fileb://dynamo-clickhouse-replicator-deployment.zip

clean:
	rm -rf dynamo-clickhouse-replicator-deployment.zip packages
15 changes: 15 additions & 0 deletions aws/lambda/dynamo-clickhouse-replicator/README.md
@@ -0,0 +1,15 @@
This lambda is used to replicate change data capture (CDC) records
from our DynamoDB tables to their corresponding ClickHouse tables. This
is done by listening to the stream of `INSERT`, `MODIFY`, and `REMOVE`
events coming from the DynamoDB tables, extracting the documents, and
upserting them into ClickHouse.

Because the JSON structure of a DynamoDB event annotates every value with a simple
datatype tag ([link](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_AttributeValue.html)),
the lambda performs a transformation to convert each record back into a regular
JSON document.
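
For illustration, here is a minimal sketch of that transformation, assuming the
`unmarshal` helper from `lambda_function.py` and some made-up field names:

```python
from lambda_function import unmarshal

# A DynamoDB stream image annotates every value with its datatype
new_image = {
    "M": {
        "dynamoKey": {"S": "pytorch/pytorch|12345"},
        "attempt": {"N": "2"},
        "succeeded": {"BOOL": True},
        "labels": {"L": [{"S": "ciflow"}, {"S": "trunk"}]},
    }
}

# unmarshal strips the type annotations and returns a plain JSON document
assert unmarshal(new_image) == {
    "dynamoKey": "pytorch/pytorch|12345",
    "attempt": 2,
    "succeeded": True,
    "labels": ["ciflow", "trunk"],
}
```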

### Deployment

A new version of the lambda can be deployed using `make deploy`, and this also
happens automatically as part of the CI.
165 changes: 165 additions & 0 deletions aws/lambda/dynamo-clickhouse-replicator/lambda_function.py
@@ -0,0 +1,165 @@
import json
import os
import re
from collections import defaultdict
from enum import Enum
from typing import Any, Dict, Optional, Union
from warnings import warn

import clickhouse_connect


DYNAMODB_TABLE_REGEX = re.compile(
    "arn:aws:dynamodb:.*?:.*?:table/(?P<table>[0-9a-zA-Z_-]+)/.+"
)
CLICKHOUSE_ENDPOINT = os.getenv("CLICKHOUSE_ENDPOINT", "")
CLICKHOUSE_USERNAME = os.getenv("CLICKHOUSE_USERNAME", "default")
CLICKHOUSE_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD", "")


class EventType(Enum):
    INSERT = "INSERT"
    REMOVE = "REMOVE"
    MODIFY = "MODIFY"


def lambda_handler(event: Any, context: Any) -> None:
    # https://clickhouse.com/docs/en/integrations/python
    clickhouse_client = clickhouse_connect.get_client(
        host=CLICKHOUSE_ENDPOINT,
        user=CLICKHOUSE_USERNAME,
        password=CLICKHOUSE_PASSWORD,
        secure=True,
    )

    counts = defaultdict(int)
    for record in event["Records"]:
        event_name = record.get("eventName", "")
        try:
            if (
                event_name == EventType.INSERT.value
                or event_name == EventType.MODIFY.value
            ):
                upsert_document(clickhouse_client, record)
            elif event_name == EventType.REMOVE.value:
                remove_document(clickhouse_client, record)
            else:
                warn(f"Unrecognized event type {event_name} in {json.dumps(record)}")

            counts[event_name] += 1
        except Exception as error:
            warn(f"Failed to process {json.dumps(record)}: {error}")

    print(f"Finish processing {json.dumps(counts)}")


def extract_dynamodb_table(record: Any) -> Optional[str]:
    """
    Extract the DynamoDB table name from the source ARN. This will be used later as
    the ClickHouse table name
    """
    table = record.get("tableName", "")
    # In the case of a Kinesis stream, the table name has already been provided
    if table:
        return table

    s = record.get("eventSourceARN", "")
    m = DYNAMODB_TABLE_REGEX.match(s)
    if not m:
        warn(f"Invalid value {s}, expecting a DynamoDB table")
        return

    return m.group("table").lower()


def extract_dynamodb_key(record: Any) -> Optional[str]:
    keys = unmarshal({"M": record.get("dynamodb", {}).get("Keys", {})})
    if not keys:
        return
    return "|".join(keys.values())


def to_number(s: str) -> Union[int, float]:
    try:
        return int(s)
    except ValueError:
        return float(s)


def unmarshal(doc: Dict[Any, Any]) -> Any:
    """
    Convert the DynamoDB stream record into a regular JSON document. This is done recursively.
    At the top level, it will be a dictionary of type M (Map). Here is the list of DynamoDB
    attributes to handle:

    https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_AttributeValue.html
    """
    for k, v in list(doc.items()):
        if k == "NULL":
            return

        if k == "S" or k == "BOOL":
            return v

        if k == "N":
            return to_number(v)

        if k == "M":
            return {sk: unmarshal(sv) for sk, sv in v.items()}

        if k == "BS" or k == "L":
            return [unmarshal(item) for item in v]

        if k == "SS":
            return v.copy()

        if k == "NS":
            return [to_number(item) for item in v]


def upsert_document(client: Any, record: Any) -> None:
    """
    Insert a new document or modify an existing one. Note that ClickHouse doesn't really
    update the document in place, but rather adds a new record for each update
    """
    table = extract_dynamodb_table(record)
    if not table:
        return

    body = unmarshal({"M": record.get("dynamodb", {}).get("NewImage", {})})
    if not body:
        return

    id = extract_dynamodb_key(record)
    if not id:
        return

    # TODO (huydhn) Inserting individual records is not efficient according
    # to the ClickHouse doc, but we can try to improve this later. See more at
    # https://clickhouse.com/docs/en/optimize/bulk-inserts
    print(f"UPSERTING {id} INTO {table}")
    # Check out https://clickhouse.com/videos/how-to-upsert-rows-into-clickhouse
    # to understand how upserts work in ClickHouse and how to get the latest
    # records. A generic way is to use the FINAL keyword, but the doc mentions
    # that it's slower: https://clickhouse.com/docs/en/sql-reference/statements/select/from
    client.query(f"INSERT INTO `{table}` FORMAT JSONEachRow {json.dumps(body)}")
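    # Hedged sketch (not part of this change): assuming the target table uses a
    # ReplacingMergeTree-style engine keyed on dynamoKey, the latest version of the
    # row written above could be read back with FINAL, e.g.
    #   client.query(
    #       f"SELECT * FROM `{table}` FINAL WHERE dynamoKey = %(id)s",
    #       parameters={"id": id},
    #   )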


def remove_document(client: Any, record: Any) -> None:
    """
    Remove a document. This is here for completeness as we almost never remove records
    """
    table = extract_dynamodb_table(record)
    if not table:
        return

    id = extract_dynamodb_key(record)
    if not id:
        return

    print(f"DELETING {id} FROM {table}")

    parameters = {"id": id}
    client.query(
        f"DELETE FROM `{table}` WHERE dynamoKey = %(id)s", parameters=parameters
    )
3 changes: 3 additions & 0 deletions aws/lambda/dynamo-clickhouse-replicator/requirements.txt
@@ -0,0 +1,3 @@
boto3==1.28.24
clickhouse-connect==0.7.16
pytest==7.4.0
136 changes: 136 additions & 0 deletions tools/rockset_migration/dynamodb2s3.py
@@ -0,0 +1,136 @@
#!/usr/bin/env python3

import gzip
import io
import json
import uuid
from argparse import ArgumentParser
from typing import Any, Dict, List, Union

import boto3

S3_RESOURCE = boto3.resource("s3")
BATCH_SIZE = 1000


def parse_args() -> Any:
    parser = ArgumentParser("Copy a DynamoDB table to S3 so it can be imported into ClickHouse")
    parser.add_argument(
        "--s3-bucket",
        type=str,
        required=True,
        help="the name of the S3 bucket",
    )
    parser.add_argument(
        "--s3-path",
        type=str,
        required=True,
        help="the name of the destination S3 path on the bucket",
    )
    parser.add_argument(
        "--dynamodb-table",
        type=str,
        required=True,
        help="the name of the source dynamoDB table",
    )
    return parser.parse_args()


def scan_dynamodb_table(dynamo_client: Any, table: str):
    """
    Generates all the items in a DynamoDB table
    """
    paginator = dynamo_client.get_paginator("scan")

    for page in paginator.paginate(TableName=table):
        yield from page["Items"]


def to_number(s: str) -> Union[int, float]:
    try:
        return int(s)
    except ValueError:
        return float(s)


def unmarshal(doc: Dict[Any, Any]) -> Any:
    """
    Convert the DynamoDB stream record into a regular JSON document. This is done recursively.
    At the top level, it will be a dictionary of type M (Map). Here is the list of DynamoDB
    attributes to handle:

    https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_AttributeValue.html
    """
    for k, v in list(doc.items()):
        if k == "NULL":
            return

        if k == "S" or k == "BOOL":
            return v

        if k == "N":
            return to_number(v)

        if k == "M":
            return {sk: unmarshal(sv) for sk, sv in v.items()}

        if k == "BS" or k == "L":
            return [unmarshal(item) for item in v]

        if k == "SS":
            return v.copy()

        if k == "NS":
            return [to_number(item) for item in v]


def upload_to_s3(
    s3_bucket: str,
    s3_path: str,
    records: List[Dict[str, Any]],
) -> None:
    print(f"Writing {len(records)} documents to S3")
    body = io.StringIO()
    for r in records:
        json.dump(r, body)
        body.write("\n")

    filename = f"{uuid.uuid4()}.json"
    S3_RESOURCE.Object(
        f"{s3_bucket}",
        f"{s3_path}/{filename}",
    ).put(
        Body=body.getvalue().encode(),
        ContentType="application/json",
    )


def copy(dynamodb_table: str, s3_bucket: str, s3_path: str):
    """
    Copy everything from a DynamoDB table to S3, from where it can be imported into ClickHouse
    """
    count = 0
    records = []

    dynamo_client = boto3.client("dynamodb")
    for item in scan_dynamodb_table(dynamo_client, table=dynamodb_table):
        count += 1
        records.append(unmarshal({"M": item}))

        if count == BATCH_SIZE:
            upload_to_s3(s3_bucket, s3_path, records)

            count = 0
            records = []

    if records:
        upload_to_s3(s3_bucket, s3_path, records)
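
# Illustrative sketch (assumed, not part of this script): once the dump lands in S3,
# the newline-delimited JSON can be pulled into ClickHouse with the s3() table
# function, where the bucket, path, and table names below are placeholders:
#   INSERT INTO `<clickhouse-table>`
#   SELECT * FROM s3('https://<s3-bucket>.s3.amazonaws.com/<s3-path>/*.json', 'JSONEachRow')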


def main() -> None:
    args = parse_args()
    copy(args.dynamodb_table, args.s3_bucket, args.s3_path)


if __name__ == "__main__":
    main()