Skip to content

Commit

Permalink
feat: Inject schema registry credentials
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewinci committed Feb 26, 2022
1 parent 8de48bf commit be86edd
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 103 deletions.
53 changes: 36 additions & 17 deletions examples/basic_auth/main.tf
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
variable "endpoints" {
variable "kafka_endpoints" {
type = string
default = "example.confluent.cloud:9092"
}

variable "basic_auth" {
variable "schema_registry_endpoint" {
type = string
default = "https://schemaregistry.confluent.cloud:9093"
}


variable "schema_registry_basic_auth" {
type = map(string)
default = {
username = "key"
password = "password"
}
}

variable "kafka_basic_auth" {
type = map(string)
default = {
username = "key"
Expand All @@ -15,26 +29,31 @@ resource "aws_secretsmanager_secret" "kafka_basic_auth" {
name = "kafka_basic_auth"
}

resource "aws_secretsmanager_secret_version" "kafka_user_certificate" {
resource "aws_secretsmanager_secret_version" "kafka_basic_auth" {
secret_id = aws_secretsmanager_secret.kafka_basic_auth.id
secret_string = jsonencode(var.basic_auth)
secret_string = jsonencode(var.kafka_basic_auth)
}

resource "aws_secretsmanager_secret" "schema_registry_basic_auth" {
name = "schema_registry_basic_auth"
}

resource "aws_secretsmanager_secret_version" "schema_registry_basic_auth" {
secret_id = aws_secretsmanager_secret.schema_registry_basic_auth.id
secret_string = jsonencode(var.schema_registry_basic_auth)
}

module "lambda_to_sqs" {
#source = "https://github.com/andrewinci/lambda-kafka2sqs/releases/download/v1.0.1/module.zip"
source = "../../module"
function_name = "consumer"
kafka_endpoints = var.endpoints
kafka_authentication_type = "BASIC"
kafka_credentials_arn = aws_secretsmanager_secret.kafka_basic_auth.arn
source = "../../module"
function_name = "consumer"
kafka_endpoints = var.kafka_endpoints
kafka_authentication_type = "BASIC"
kafka_credentials_arn = aws_secretsmanager_secret.kafka_basic_auth.arn
schema_registry_endpoint = var.schema_registry_endpoint
schema_registry_credentials_arn = aws_secretsmanager_secret.schema_registry_basic_auth.arn
kafka_topics = [
{
topic_name = "test"
is_avro = true
},
{
topic_name = "test-2"
is_avro = false
}
{ topic_name = "test", is_avro = true },
{ topic_name = "test-2", is_avro = false }
]
}
14 changes: 11 additions & 3 deletions module/lambda.tf
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
locals {
env_topic_config = merge(
{ TOPIC_CONFIGURATION = jsonencode(var.kafka_topics) },
length(var.schema_registry_endpoint) > 0 ? { SCHEMA_REGISTRY_URL = var.schema_registry_endpoint } : {},
length(var.schema_registry_credentials_arn) > 0 ? { SCHEMA_REGISTRY_SECRET_ARN = var.schema_registry_credentials_arn } : {}
)
}

resource "aws_lambda_function" "lambda" {
function_name = var.function_name
role = aws_iam_role.role.arn
handler = local.function_handler
filename = local.function_zip
source_code_hash = filesha256(local.function_zip)
runtime = "python3.9"

environment {
variables = {
TOPIC_CONFIGURATION = jsonencode(var.kafka_topics)
}
variables = local.env_topic_config
}

depends_on = [
aws_cloudwatch_log_group.consumer_lambda_logging
]
Expand Down
12 changes: 12 additions & 0 deletions module/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,15 @@ variable "kafka_ca_secret_arn" {
description = "The arn of the secret containing the ca certificate in PEM format"
default = ""
}

variable "schema_registry_endpoint" {
type = string
description = "Schema registry endpoint including the protocol (i.e. https://...)."
default = ""
}

variable "schema_registry_credentials_arn" {
type = string
description = "Secret containing the username and password to connect to schema registry"
default = ""
}
76 changes: 0 additions & 76 deletions src/handler.py

This file was deleted.

81 changes: 76 additions & 5 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
import os
import asyncio
import boto3
import base64
from collections import namedtuple
from schema_registry.serializers import AsyncAvroMessageSerializer
from schema_registry.client import AsyncSchemaRegistryClient
from .handler import Handler

TOPIC_CONFIGURATION = os.environ.get("TOPIC_CONFIGURATION")
SCHEMA_REGISTRY_URL = os.environ.get("SCHEMA_REGISTRY_URL")
SCHEMA_REGISTRY_SECRET_ARN = os.environ.get("SCHEMA_REGISTRY_SECRET_ARN")

SchemaRegistryConfig = namedtuple("SchemaRegistryConfig", "endpoint username password")
RawRecord = namedtuple("RawRecord", "topic raw_key raw_value timestamp original")
Record = namedtuple("Record", "topic key value timestamp")


def lambda_handler(event, _):
Expand All @@ -20,7 +23,7 @@ def lambda_handler(event, _):
avro_serializer = (
None
if not schema_registry_config
else retrieve_schema_registry_configs(schema_registry_config)
else build_avro_serializer(schema_registry_config)
)
handler = Handler(topic_configs, avro_serializer)
return asyncio.run(handler.handle(event))
Expand All @@ -33,13 +36,12 @@ def retrieve_schema_registry_configs() -> typing.Optional[SchemaRegistryConfig]:
return SchemaRegistryConfig(SCHEMA_REGISTRY_URL, None, None)
else:
client = boto3.client("secretsmanager")
res = client.get_secret_value(SecretId=SCHEMA_REGISTRY_SECRET_ARN)
credentials = json.loads(res)
response = client.get_secret_value(SecretId=SCHEMA_REGISTRY_SECRET_ARN)
credentials = json.loads(response['SecretString'])
return SchemaRegistryConfig(
SCHEMA_REGISTRY_URL, credentials["username"], credentials["password"]
)


def build_avro_serializer(
schema_registry_config: SchemaRegistryConfig,
) -> AsyncAvroMessageSerializer:
Expand All @@ -65,3 +67,72 @@ def parse_topic_configuration(config) -> dict:
raise Exception("Duplicate topic name in configuration")
res[c["topic_name"]] = c["is_avro"]
return res



class Handler:
def __init__(
self,
topic_configuration: dict,
avro_serializer: AsyncAvroMessageSerializer = None,
) -> None:
if topic_configuration is None:
raise Exception("Unable to init the Handler: Missing topic configuration")
self.topic_configuration = topic_configuration
if avro_serializer:
self.avro_serializer = avro_serializer
else:
print(
"warning: Missing schema registry configuration. Unable to deserialize avro messages"
)

async def handle(self, event):
"""
Main lambda handler
"""
parsed = []
failed = []
for r in self.extract_records(event):
try:
parsed.append(await self.deserialize_record(r))
except Exception as e:
print(f"Unable to deserialize the record: {e}")
failed.append(r.original)
# todo: send to sqs and dlq
print("Parsed", parsed)
print("Failed", failed)
return json.dumps([r._asdict() for r in parsed])

def extract_records(self, event) -> typing.List[RawRecord]:
"""
Extract the raw records from the event triggered by the lambda
runtime
"""
return [
RawRecord(r["topic"], r["key"], r["value"], r["timestamp"], r)
for v in event["records"].values()
for r in v
]

async def deserialize_record(self, record: RawRecord) -> Record:
"""
Deserialize the key and the value of the record coming
from the kafka topic
"""
is_avro = self.topic_configuration.get(record.topic)
if is_avro is None:
raise Exception(f"Missing configuration for topic {record.topic}")
# assumption: the key is always string
key = base64.standard_b64decode(record.raw_key).decode("UTF-8")
bin_value: bytes = base64.standard_b64decode(record.raw_value)
if not is_avro:
return Record(
record.topic, key, bin_value.decode("UTF-8"), record.timestamp
)
elif self.avro_serializer:
value = await self.avro_serializer.decode_message(bin_value)
return Record(record.topic, key, json.dumps(value), record.timestamp)
else:
raise Exception(
"Unable to deserialize avro. Missing schema registry configuration"
)
3 changes: 1 addition & 2 deletions test/maint_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from src.main import parse_topic_configuration
from src.handler import Handler, RawRecord
from src.main import Handler, RawRecord, parse_topic_configuration
import pytest


Expand Down

0 comments on commit be86edd

Please sign in to comment.