Kafka Topic and Schema creator
Add ktsllex to your deps list:
{:ktsllex, "~> 0.0.2"},
Run mix do deps.get, deps.compile
To have it run schema migrations at application boot time, add ktsllex to your app boot sequence, after logger and before any schema-reading apps:
extra_applications: [
  :logger,
  ...
  :ktsllex,
  ...
  :event_serializer
]
And update config.exs:
config :ktsllex,
  # Should it run the migration when called? Default: false
  run_migrations: false,
  schema_registry_host: {:system, "AVLIZER_CONFLUENT_SCHEMAREGISTRY_URL", "http://localhost:8081"},
  # Reads the yaml schema file from:
  base_path: {:system, "KAFKA_SCHEMA_BASE_PATH", "./schemas"},
  schema_name: {:system, "KAFKA_SCHEMA_NAME", "schema_name"},
  app_name: :app,
  lenses_host: {:system, "LENSES_HOST", "http://localhost:3030"},
  lenses_user: {:system, "LENSES_USER", "admin"},
  lenses_pass: {:system, "LENSES_PASS", "admin"},
  lenses_topic: {:system, "LENSES_TOPIC", "topic_name"}
base_path needs to be relative to a path that is released with your app, e.g. priv.
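The {:system, "ENV_VAR", default} tuples in the config above follow a common Elixir convention: read the environment variable at runtime and fall back to the default when it is unset. How ktsllex resolves them internally is not shown here, so the following is only a minimal sketch of that convention. The module ConfigSketch and its resolve/1 function are hypothetical names, not part of ktsllex.

```elixir
defmodule ConfigSketch do
  # Hypothetical helper, not part of ktsllex.
  # Resolves a config value that is either a plain literal or a
  # {:system, env_var, default} tuple.
  def resolve({:system, env_var, default}) do
    # System.get_env/1 returns nil when the variable is unset.
    System.get_env(env_var) || default
  end

  def resolve(value), do: value
end

# With LENSES_HOST unset, the default from the tuple is used.
System.delete_env("LENSES_HOST")
ConfigSketch.resolve({:system, "LENSES_HOST", "http://localhost:3030"})
# => "http://localhost:3030"

# Plain values such as app_name: :app pass through unchanged.
ConfigSketch.resolve(:app)
# => :app
```

In practice this means each setting can be overridden per environment by exporting the matching variable before the app boots.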
You have access to the create_schemas and create_topics mix tasks, e.g.:
$ mix create_schemas --host=localhost:8081 --schema=schema_name --base=./path/to/schemas/json
$ mix create_topics --host=localhost:3030 --user=admin --password=admin --topic=topic_name
--base
The path to the schema files is passed into mix create_schemas via --base=./path/to/schemas/json.
It expects to find two files there, one ending -key.json and one ending -value.json.
Example: If this command was used:
mix create_schemas --base=./schemas/users
Then there should be two files in ./schemas:
- ./schemas/users-key.json
- ./schemas/users-value.json
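The file lookup can be sketched as follows, assuming the base value is used as a plain path prefix with -key.json and -value.json appended (which matches the default base_path of ./schemas and the schemas-key.json file name used in this README). SchemaPathsSketch, paths/1 and missing/1 are hypothetical names for illustration, not ktsllex functions.

```elixir
defmodule SchemaPathsSketch do
  # Hypothetical helper, not part of ktsllex.
  # Assumption: the base is a path prefix and the two suffixes are appended.
  @suffixes ["-key.json", "-value.json"]

  # Builds the two file paths expected for a given base.
  def paths(base) do
    for suffix <- @suffixes, do: base <> suffix
  end

  # Returns the expected files that do not exist on disk.
  def missing(base) do
    base |> paths() |> Enum.reject(&File.exists?/1)
  end
end

SchemaPathsSketch.paths("./schemas")
# => ["./schemas-key.json", "./schemas-value.json"]
```

Calling missing/1 before running the task is a cheap way to spot a wrong base value.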
--schema
The -key and -value schemas get updated based on the schema parameter.
Example: Given this command, which sets the schema to myschema:
mix create_schemas --schema=myschema
And if this is the schemas-key.json file:
{
"type": "record",
"name": "Key",
"namespace": "anything",
"connect.name": "anything.Key"
}
Then it would be updated to:
{
"type": "record",
"name": "Key",
"namespace": "myschema",
"connect.name": "myschema.Key"
}
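The rewrite in the example above can be sketched on an already-decoded schema map. This is an illustration of the transformation only, assuming that namespace is replaced by the schema name and connect.name is rebuilt as the schema name, a dot, and the record name. NamespaceSketch and rename/2 are hypothetical and do not come from ktsllex.

```elixir
defmodule NamespaceSketch do
  # Hypothetical helper, not part of ktsllex.
  # Sets "namespace" to schema_name and rebuilds "connect.name"
  # as "<schema_name>.<record name>". Other fields are left untouched.
  def rename(%{"name" => name} = schema, schema_name) do
    schema
    |> Map.put("namespace", schema_name)
    |> Map.put("connect.name", schema_name <> "." <> name)
  end
end

key_schema = %{
  "type" => "record",
  "name" => "Key",
  "namespace" => "anything",
  "connect.name" => "anything.Key"
}

updated = NamespaceSketch.rename(key_schema, "myschema")
updated["namespace"]
# => "myschema"
updated["connect.name"]
# => "myschema.Key"
```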
To get or set compatibility config:
# Get global
iex> Ktsllex.Config.get("http://localhost:8081")
%{"compatibilityLevel" => "BACKWARD"}
# Set global
iex> Ktsllex.Config.set("http://localhost:8081", "BACKWARD")
%{"compatibilityLevel" => "BACKWARD"}
# Get for a given topic name
iex> Ktsllex.Config.get("http://localhost:8081", "topic-name")
%{"compatibilityLevel" => "BACKWARD"}
# Set for a given topic name
iex> Ktsllex.Config.set("http://localhost:8081", "BACKWARD", "topic-name")
%{"compatibilityLevel" => "BACKWARD"}
Getting the config for a topic that does not have a compatibility level set returns this:
%{"error_code" => 40401, "message" => "Subject not found."}
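Since both the success and the "not found" case come back as plain maps, calling code may want to tell them apart by pattern matching. The sketch below works only on the two map shapes shown above. CompatibilitySketch and level/1 are hypothetical names, and the tagged tuples it returns are a design choice for this example, not something ktsllex provides.

```elixir
defmodule CompatibilitySketch do
  # Hypothetical helper, not part of ktsllex.
  # Converts the maps shown above into tagged tuples.
  def level(%{"compatibilityLevel" => level}), do: {:ok, level}
  def level(%{"error_code" => 40401}), do: {:error, :subject_not_found}
  def level(other), do: {:error, {:unexpected, other}}
end

CompatibilitySketch.level(%{"compatibilityLevel" => "BACKWARD"})
# => {:ok, "BACKWARD"}

CompatibilitySketch.level(%{"error_code" => 40401, "message" => "Subject not found."})
# => {:error, :subject_not_found}
```

A caller could pass the result of Ktsllex.Config.get/2 straight into level/1 and fall back to the global setting on {:error, :subject_not_found}.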
mix deps.get
mix test
Copyright (c) 2018 Quiqup LTD, MIT License. See LICENSE.txt for further details.