Skip to content

Commit

Permalink
Reduce schema registry calls by using cached calls
Browse files Browse the repository at this point in the history
Add support to use cached calls instead of making latest calls for the KEY schema of the topic
Batcher does compute the id of the key
Passes it to the loader via the loader kafka job
Loader now retrives the job by ID which is cached and no longer there is a need to query kafka everytime

This will reduce the schema registry calls
  • Loading branch information
alok87 committed Apr 8, 2021
1 parent 8a42ddc commit bacd6e5
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 79 deletions.
36 changes: 3 additions & 33 deletions redshiftsink/pkg/kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,20 @@ import (
"fmt"
"github.com/Shopify/sarama"
"github.com/linkedin/goavro/v2"
"github.com/practo/klog/v2"
"github.com/practo/tipoca-stream/redshiftsink/pkg/schemaregistry"
"strings"
"time"
)

type AvroProducer struct {
producer sarama.SyncProducer
registry schemaregistry.SchemaRegistry
}

func NewAvroProducer(
brokers []string,
kafkaVersion string,
schemaRegistryURL string,
configTLS TLSConfig,
) (*AvroProducer, error) {
) (
*AvroProducer, error,
) {
version, err := sarama.ParseKafkaVersion(kafkaVersion)
if err != nil {
return nil, fmt.Errorf("Error parsing Kafka version: %v\n", err)
Expand Down Expand Up @@ -52,36 +49,9 @@ func NewAvroProducer(

return &AvroProducer{
producer: producer,
registry: schemaregistry.NewRegistry(schemaRegistryURL),
}, nil
}

// CreateSchema creates schema if it does not exist
func (c *AvroProducer) CreateSchema(
topic string, scheme string) (int, bool, error) {

created := false

schemeStr := strings.ReplaceAll(scheme, "\n", "")
schemeStr = strings.ReplaceAll(schemeStr, " ", "")

schema, err := schemaregistry.GetLatestSchemaWithRetry(
c.registry, topic, false, 2,
)
if schema == nil || schema.Schema() != schemeStr {
klog.V(2).Infof("%s: Creating schema for the topic", topic)
schema, err = c.registry.CreateSchema(
topic, scheme, schemaregistry.Avro, false,
)
if err != nil {
return 0, false, err
}
created = true
}

return schema.ID(), created, nil
}

func (c *AvroProducer) Add(
topic string,
schema string,
Expand Down
39 changes: 34 additions & 5 deletions redshiftsink/pkg/redshiftbatcher/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/practo/tipoca-stream/redshiftsink/pkg/redshift"
loader "github.com/practo/tipoca-stream/redshiftsink/pkg/redshiftloader"
"github.com/practo/tipoca-stream/redshiftsink/pkg/s3sink"
"github.com/practo/tipoca-stream/redshiftsink/pkg/schemaregistry"
"github.com/practo/tipoca-stream/redshiftsink/pkg/serializer"
"github.com/practo/tipoca-stream/redshiftsink/pkg/transformer"
"github.com/practo/tipoca-stream/redshiftsink/pkg/transformer/debezium"
Expand Down Expand Up @@ -53,8 +54,12 @@ type batchProcessor struct {

maxConcurrency int

// loaderSchemaID informations for the loader topic
// loaderSchemaID stores the schema ID for the loader topic-value
loaderSchemaID int

// schemaIDKey stores the schema ID for the batcher topic-key
// loader would use these to fetch primaryKeys for the table
schemaIDKey int
}

func newBatchProcessor(
Expand Down Expand Up @@ -84,7 +89,6 @@ func newBatchProcessor(
signaler, err := kafka.NewAvroProducer(
strings.Split(kafkaConfig.Brokers, ","),
kafkaConfig.Version,
viper.GetString("schemaRegistryURL"),
kafkaConfig.TLSConfig,
)
if err != nil {
Expand All @@ -101,13 +105,35 @@ func newBatchProcessor(
)
}

loaderSchemaID, _, err := signaler.CreateSchema(
registry := schemaregistry.NewRegistry(viper.GetString("schemaRegistryURL"))
// creates the loader schema for value if not present
loaderSchemaID, _, err := schemaregistry.CreateSchema(
registry,
kafkaLoaderTopicPrefix+topic,
loader.JobAvroSchema,
false, // key is false means its for the value
)
if err != nil {
return nil, fmt.Errorf(
"Error creating schema for topic: %s, err: %v",
kafkaLoaderTopicPrefix+topic, err)
}
schemaKey, err := schemaregistry.GetLatestSchemaWithRetry(
registry,
topic,
true, // key is true means its for the key
2,
)
if err != nil {
return nil, fmt.Errorf(
"Error creating schema for topic: %s, err: %v", topic, err)
"Error fetching schema for topic-key for topic: %s, err: %v",
topic, err)
}
if schemaKey == nil {
return nil, fmt.Errorf(
"Error since schema came as nil for topic-key for topic: %s",
topic,
)
}

klog.V(2).Infof("%s: autoCommit: %v", topic, saramaConfig.AutoCommit)
Expand All @@ -128,6 +154,7 @@ func newBatchProcessor(
signaler: signaler,
maxConcurrency: maxConcurrency,
loaderSchemaID: loaderSchemaID,
schemaIDKey: schemaKey.ID(),
}, nil
}

Expand Down Expand Up @@ -228,7 +255,8 @@ func (b *batchProcessor) signalLoad(resp *response) error {
resp.endOffset,
",",
b.s3sink.GetKeyURI(resp.s3Key),
resp.batchSchemaID, // schema of upstream topic
resp.batchSchemaID, // schema of upstream topic's value
b.schemaIDKey, // schema of upstream topic's key
resp.maskSchema,
resp.skipMerge,
resp.bytesProcessed,
Expand Down Expand Up @@ -288,6 +316,7 @@ func (b *batchProcessor) processMessage(
r, err := b.schemaTransformer.TransformValue(
b.topic,
resp.batchSchemaID,
b.schemaIDKey,
resp.maskSchema,
)
if err != nil {
Expand Down
18 changes: 15 additions & 3 deletions redshiftsink/pkg/redshiftloader/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,29 @@ var JobAvroSchema string = `{
{"name": "csvDialect", "type": "string"},
{"name": "s3Path", "type": "string"},
{"name": "schemaId", "type": "int"},
{"name": "schemaIdKey", "type": "int"},
{"name": "maskSchema", "type": "string"},
{"name": "skipMerge", "type": "string", "default": ""},
{"name": "batchBytes", "type": "long", "default": 0}
]
}`

type Job struct {
UpstreamTopic string `json:"upstreamTopic"`
UpstreamTopic string `json:"upstreamTopic"` // batcher topic
StartOffset int64 `json:"startOffset"`
EndOffset int64 `json:"endOffset"`
CsvDialect string `json:"csvDialect"`
S3Path string `json:"s3Path"`
SchemaId int `json:"schemaId"` // schema id of debezium event
SchemaId int `json:"schemaId"` // schema id of debezium event for the value for upstream topic (batcher topic)
SchemaIdKey int `json:"schemaIdKey"` // schema id of debezium event for the key for upstream topic (batcher topic)
MaskSchema map[string]serializer.MaskInfo `json:"maskSchema"`
SkipMerge bool `json:"skipMerge"` // to load using merge strategy or directy COPY
BatchBytes int64 `json:"batchBytes"` // batch bytes store sum of all message bytes in this batch
}

func NewJob(
upstreamTopic string, startOffset int64, endOffset int64,
csvDialect string, s3Path string, schemaId int,
csvDialect string, s3Path string, schemaId int, schemaIdKey int,
maskSchema map[string]serializer.MaskInfo, skipMerge bool,
batchBytes int64) Job {

Expand All @@ -47,6 +49,7 @@ func NewJob(
CsvDialect: csvDialect,
S3Path: s3Path,
SchemaId: schemaId,
SchemaIdKey: schemaIdKey,
MaskSchema: maskSchema,
SkipMerge: skipMerge,
BatchBytes: batchBytes,
Expand Down Expand Up @@ -84,6 +87,14 @@ func StringMapToJob(data map[string]interface{}) Job {
} else if value, ok := v.(int); ok {
job.SchemaId = value
}
case "schemaIdKey":
if value, ok := v.(int32); ok {
job.SchemaIdKey = int(value)
} else if value, ok := v.(int); ok {
job.SchemaIdKey = value
} else {
job.SchemaIdKey = -1 // backward compatibility
}
case "skipMerge":
if value, ok := v.(string); ok {
if value == "true" {
Expand Down Expand Up @@ -198,6 +209,7 @@ func (c Job) ToStringMap() map[string]interface{} {
"csvDialect": c.CsvDialect,
"s3Path": c.S3Path,
"schemaId": c.SchemaId,
"schemaIdKey": c.SchemaIdKey,
"skipMerge": skipMerge,
"maskSchema": ToSchemaString(c.MaskSchema),
"batchBytes": c.BatchBytes,
Expand Down
1 change: 1 addition & 0 deletions redshiftsink/pkg/redshiftloader/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestToStringMap(t *testing.T) {
",",
"s3path",
1,
2,
maskSchema,
false,
10,
Expand Down
34 changes: 23 additions & 11 deletions redshiftsink/pkg/redshiftloader/load_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ func newLoadProcessor(
partition int32,
saramaConfig kafka.SaramaConfig,
redshifter *redshift.Redshift,
) serializer.MessageBatchSyncProcessor {
) (serializer.MessageBatchSyncProcessor, error) {
sink, err := s3sink.NewS3Sink(
viper.GetString("s3sink.accessKeyId"),
viper.GetString("s3sink.secretAccessKey"),
viper.GetString("s3sink.region"),
viper.GetString("s3sink.bucket"),
)
if err != nil {
klog.Fatalf("Error creating s3 client: %v\n", err)
return nil, fmt.Errorf("Error creating s3 client: %v\n", err)
}

klog.V(3).Infof("%s: auto-commit: %v", topic, saramaConfig.AutoCommit)
Expand All @@ -119,7 +119,7 @@ func newLoadProcessor(
targetTable: nil,
tableSuffix: viper.GetString("redshift.tableSuffix"),
redshiftStats: viper.GetBool("redshift.stats"),
}
}, nil
}

func (b *loadProcessor) ctxCancelled(ctx context.Context) error {
Expand Down Expand Up @@ -425,8 +425,11 @@ func (b *loadProcessor) merge(ctx context.Context) error {
// batch messages.
// this also intializes b.stagingTable
func (b *loadProcessor) createStagingTable(
ctx context.Context, schemaId int, inputTable redshift.Table) error {

ctx context.Context,
schemaId int,
schemaIdKey int,
inputTable redshift.Table,
) error {
b.stagingTable = redshift.NewTable(inputTable)
b.stagingTable.Name = b.stagingTable.Name + "_staged"

Expand All @@ -449,10 +452,17 @@ func (b *loadProcessor) createStagingTable(
return fmt.Errorf("Error dropping staging table: %v\n", err)
}

primaryKeys, err := b.schemaTransformer.TransformKey(
b.upstreamTopic)
if err != nil {
return fmt.Errorf("Error getting primarykey for: %s, err: %v\n", b.topic, err)
var primaryKeys []string
if schemaIdKey == -1 {
primaryKeys, err = b.schemaTransformer.PrimaryKeys(schemaIdKey)
if err != nil {
return fmt.Errorf("Error getting primarykey for: %s, err: %v\n", b.topic, err)
}
} else { // Deprecated as below is expensive and does not use cache
primaryKeys, err = b.schemaTransformer.TransformKey(b.upstreamTopic)
if err != nil {
return fmt.Errorf("Error getting primarykey for: %s, err: %v\n", b.topic, err)
}
}
b.primaryKeys = primaryKeys

Expand Down Expand Up @@ -622,8 +632,8 @@ func (b *loadProcessor) processBatch(
}

var inputTable redshift.Table
var schemaId int
var err error
var schemaId, schemaIdKey int
b.stagingTable = nil
b.targetTable = nil
b.upstreamTopic = ""
Expand All @@ -637,6 +647,7 @@ func (b *loadProcessor) processBatch(
default:
job := StringMapToJob(message.Value.(map[string]interface{}))
schemaId = job.SchemaId
schemaIdKey = job.SchemaIdKey
b.batchEndOffset = message.Offset
bytesProcessed += job.BatchBytes

Expand All @@ -651,6 +662,7 @@ func (b *loadProcessor) processBatch(
resp, err := b.schemaTransformer.TransformValue(
b.upstreamTopic,
schemaId,
schemaIdKey,
job.MaskSchema,
)
if err != nil {
Expand Down Expand Up @@ -699,7 +711,7 @@ func (b *loadProcessor) processBatch(

// load
klog.V(2).Infof("%s, load staging\n", b.topic)
err = b.createStagingTable(ctx, schemaId, inputTable)
err = b.createStagingTable(ctx, schemaId, schemaIdKey, inputTable)
if err != nil {
return bytesProcessed, err
}
Expand Down
7 changes: 6 additions & 1 deletion redshiftsink/pkg/redshiftloader/loader_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,18 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession,

var lastSchemaId *int
var err error
processor := newLoadProcessor(
processor, err := newLoadProcessor(
h.consumerGroupID,
claim.Topic(),
claim.Partition(),
h.saramaConfig,
h.redshifter,
)
if err != nil {
return fmt.Errorf(
"Error making the load processor for topic: %s, err: %v",
claim.Topic(), err)
}
maxBufSize := h.maxSize
if h.maxBytesPerBatch != nil {
maxBufSize = serializer.DefaultMessageBufferSize
Expand Down
Loading

0 comments on commit bacd6e5

Please sign in to comment.