struct-converters

Convert schemaless, primitive Kafka messages into ConnectData

Why?

Most sink connectors for Kafka Connect require a schema to insert the data. When dealing with topics that contain plain JSON records it is not possible to insert them without an inline schema, as explained here

Sometimes it is just not possible to change the JSON records to include the schema, and we are fine with keeping the JSON "as-is" in the sink system. This still allows a schema-on-read strategy, supported by relational databases like MySQL and Postgres through their JSON data types.

See Further Options
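
To illustrate the idea, a converter along these lines wraps the raw message string into a single-field struct with an attached schema, which is what schema-requiring sink connectors expect. The following is only a rough sketch and not the code shipped in this repository; the class name and the "value" field name are assumptions made for the example.

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.storage.Converter;

// Rough sketch only - the converter in this repository may differ.
public class StringAsStructConverterSketch implements Converter {

    // Single string field; the field name "value" is an assumption for this sketch.
    private static final Schema SCHEMA = SchemaBuilder.struct()
            .field("value", Schema.STRING_SCHEMA)
            .build();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure in this sketch
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        // Wrap the raw (JSON) string into a struct with an attached schema.
        if (value == null) {
            return SchemaAndValue.NULL;
        }
        Struct struct = new Struct(SCHEMA)
                .put("value", new String(value, StandardCharsets.UTF_8));
        return new SchemaAndValue(SCHEMA, struct);
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        // Reverse direction: unwrap the struct back into raw bytes.
        if (value == null) {
            return null;
        }
        return ((Struct) value).getString("value").getBytes(StandardCharsets.UTF_8);
    }
}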

Installation

Copy a release JAR from this repository's releases into your Kafka Connect plugin path
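
For example, assuming the worker's plugin path is configured as below (the directory is only an example, use whatever your installation defines), the JAR just needs to be copied into that directory and the worker restarted:

# connect worker properties (connect-standalone.properties or connect-distributed.properties)
plugin.path=/usr/share/kafka/plugins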

Configuration

Given this example JSON record

record-key: 456
record-value: {
  "id" : "11eb50e4-e3b5-f40f-b709-36bc5ee27958",
  "shopId" : 2001,
  "origin" : "space",
  "type" : "like",
  "createDate" : "2021-01-12T13:34:16.653Z",
  "payload" : {
    "profileId" : "11eb50e4-e3b5-f40f-b709-36bc5ee27958"
  }
}

and the following table created in the sink db

create table `super_event` (
  `ID` VARCHAR(255) NOT NULL,
  `VALUE` JSON,
  PRIMARY KEY (ID)
) ENGINE = InnoDB;

The following connector configuration

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
topics=super_event
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.github.baunz.kafka.connect.storage.StringAsStructConverter
connection.url=jdbc:mysql://awesome-db/
insert.mode=UPSERT
pk.mode=record_key
pk.fields=id

creates a connector that fills the target table and allows the data to be queried like this (MySQL example)

select
  value->>'$.id',
  value->>'$.payload.profileId'
from
  super_event;
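
With the configuration above, the record key ends up in the ID column and the raw JSON value in the VALUE column. The same schema-on-read approach also works in Postgres; assuming the value column is defined as json or jsonb there, an equivalent query could look like this (a sketch, not taken from this repository):

select
  value ->> 'id',
  value #>> '{payload,profileId}'
from
  super_event;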

Further Options