
Kafka Microservices Demo for Solace Event Portal

This project was adapted from the Confluent "Kafka Streams Examples." The purpose of this derivative is to provide a functioning microservices demo that can be scanned, audited, and modified to demonstrate the capabilities of Solace Event Portal.

The demo is based on the Confluent kafka-streams-examples, specifically the microservices subset. The Confluent tutorial on the subject can be found here: Tutorial: Introduction to Streaming Application Development

There are several important differences between this demo and the Confluent version:

  • Input from the Kafka Connect framework, JDBC source connector, is excluded
  • Output to the Kafka Connect Framework, Elasticsearch sink connector, is excluded
  • Three new producer and three new consumer microservice applications have been added

The following diagram illustrates the microservices application.

Microservices Demo Overview

Requirements

  • Access to a Kafka cluster with sufficient privileges to create and read/write topics
  • Currently, the demo will only work if a Confluent-compatible schema registry is present
  • Java 11 - To build or modify demo code
  • Docker Desktop - To build or execute locally; or another container runtime to run the packaged image

Execution Options

  1. Build and execute yourself.
  2. Execute a container from the published image.

Option 1 Only: Build Yourself

You can build your own copy of the Java code and Docker container. Clone the repository and proceed with the build steps from the project root directory.

A. Build Java Code

mvn clean

mvn -DskipTests=true package

(Optional) Run Sample Producer and Consumer

Running the services requires a configuration file; see Sample Config Files below for examples. Topic "orders" must exist on the associated Kafka cluster for these apps to run correctly.

export CONFIG_FILE=/path/to/config/file/my-config.properties

java -cp target/kafka-streams-examples-7.1.1-standalone.jar \
  io.confluent.examples.streams.microservices.util.ProduceOrders --config-file $CONFIG_FILE

java -cp target/kafka-streams-examples-7.1.1-standalone.jar \
  io.confluent.examples.streams.microservices.util.ConsumeOrders --config-file $CONFIG_FILE
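For reference, both utilities simply load the properties file named by CONFIG_FILE into their Kafka client configuration. Below is a minimal sketch of that pattern, using plain String serializers for brevity (the actual demo utilities work with Avro records and the schema registry); the class name is hypothetical:

import java.io.FileInputStream;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical sketch: load the CONFIG_FILE properties and produce one record to "orders".
public class ConfigFileProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(System.getenv("CONFIG_FILE"))) {
            props.load(in); // bootstrap.servers, security.*, schema.registry.url, ...
        }
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("orders", "order-1", "CREATED")).get();
        }
    }
}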

B. Build Docker Container

docker build -t [repo-name]:[tag-name] --file Dockerfile .

Executing the Demo

The demo can be executed from a detached container or an interactive container. The latter gives you more control and a better opportunity to observe and debug should problems arise.

Container

As of this writing, the current packaged container is publicly available at: ghcr.io/solacelabs/kafka-microservices-demo:1.0. You can pull this image or build yourself as described above.

In both cases, the following configurations are required:

  • volume : mount the host folder containing your Kafka/schema registry configuration properties file into the container
  • env CONFIG_FILE : the container path where the config file is found
  • env DEMO_HOME : the home directory of the demo, currently just DEMO_HOME=/opt/kafka-demo

Executing the Demo from a Detached Container

Example command:

docker run --name my-kafka-demo --rm --detach \
  --volume /home/ec2-user/kafka-demo/configs:/opt/kafka-demo/configs \
  --env CONFIG_FILE=/opt/kafka-demo/configs/my-kafka-config.properties \
  --env DEMO_HOME=/opt/kafka-demo \
  ghcr.io/solacelabs/kafka-microservices-demo:1.0

When executed this way, the container process will attempt to configure the Kafka cluster by creating the necessary topics, then start the demo services automatically.

Starting the Demo from an Interactive Container

A. Start the Container in interactive mode

Run the following command to execute the demo container interactively:

docker run -i -t --name kafka-demo \
  --rm \
  --volume /home/ec2-user/kafka-demo/configs:/opt/kafka-demo/configs \
  --env CONFIG_FILE=/opt/kafka-demo/configs/my-kafka-config.properties \
  --env DEMO_HOME=/opt/kafka-demo \
  ghcr.io/solacelabs/kafka-microservices-demo:1.0 /bin/bash

Commands below are executed in the kafka-demo container from the demo home directory: /opt/kafka-demo

B. Prime Environment

Set generic environment variables using the following command:

source scripts/env.config

C. Create Topics (If Topics Do Not Exist on Cluster)

If this is a new Kafka demo environment, you will need to create the topics associated with the demo. The topics to create are defined in scripts/topics.txt. The script will not throw errors if the topics already exist.

scripts/create-topics.sh scripts/topics.txt
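Equivalent idempotent behavior can be sketched in Java with the AdminClient: create the topics and treat TopicExistsException as success. The topic name, partition count, and replication factor below are illustrative assumptions, not the demo's actual settings:

import java.io.FileInputStream;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

// Illustrative sketch: create topics, ignoring "topic already exists" errors.
public class CreateTopicsSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(System.getenv("CONFIG_FILE"))) {
            props.load(in);
        }
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic orders = new NewTopic("orders", 1, (short) 3); // assumed settings
            admin.createTopics(List.of(orders)).values().forEach((topic, future) -> {
                try {
                    future.get();
                } catch (InterruptedException | ExecutionException e) {
                    if (!(e.getCause() instanceof TopicExistsException)) {
                        throw new RuntimeException(e); // only "already exists" is tolerated
                    }
                }
            });
        }
    }
}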

D. Start the app services

scripts/exec-demo.sh

# Verify that the services are running:
ps -ef | grep kafka

E. Terminating the Demo

If you want to kill the demo but leave the container running:

scripts/kill-svc.sh .microservices.pids

Otherwise, you can just exit the shell, which will terminate the container and any running processes.

Sample Config Files

Confluent Cloud with SASL Plain over SSL

This sample file pattern is used for Confluent Cloud with API Keys.

bootstrap.servers=[kafka-cluster-host.us-east-2.aws.confluent.cloud:9092]
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="[Confluent API Key]" password="[Confluent API Key Secret]";
sasl.mechanism=PLAIN

# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for higher availability in Apache Kafka clients prior to 3.0
session.timeout.ms=45000

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=[https://sr-server-id.us-east-2.aws.confluent.cloud]
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=[Confluent SR API Key]:[Confluent SR API Secret]
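The schema.registry.url property is used not only by the plain producer/consumer clients but also when configuring Confluent's Avro serdes. A hedged sketch of that wiring, assuming the demo's generated Order Avro class; the exact demo code may differ:

import java.util.Collections;
import java.util.Properties;
import io.confluent.examples.streams.avro.microservices.Order;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

// Sketch: pass schema.registry.url from the loaded config into a specific Avro serde.
public class OrderSerdeSketch {
    static SpecificAvroSerde<Order> orderSerde(Properties props) {
        SpecificAvroSerde<Order> serde = new SpecificAvroSerde<>();
        serde.configure(
            Collections.singletonMap("schema.registry.url", props.getProperty("schema.registry.url")),
            false /* value serde, not key */);
        return serde;
    }
}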

Kafka Streams Examples

This project contains code examples that demonstrate how to implement real-time applications and event-driven microservices using the Streams API of Apache Kafka, aka Kafka Streams.

For more information, take a look at the latest Confluent documentation on the Kafka Streams API, notably the Developer Guide.


This repository has several branches to help you find the correct code examples for the version of Apache Kafka and/or Confluent Platform that you are using. See Version Compatibility Matrix below for details.

There are three kinds of examples:

  • Examples under src/main/: These examples are short and concise. Also, you can interactively test-drive these examples, e.g. against a local Kafka cluster. If you want to actually run these examples, then you must first install and run Apache Kafka and friends, which we describe in section Packaging and running the examples. Each example also states its exact requirements and instructions at the very top.
  • Examples under src/test/: These examples test the applications under src/main/. Unit tests with TopologyTestDriver exercise the stream logic without external system dependencies. The integration tests use embedded Kafka clusters, feed input data to them (using the standard Kafka producer client), process the data using Kafka Streams, and finally read and verify the output results (using the standard Kafka consumer client). These examples are also a good starting point to learn how to implement your own end-to-end integration tests.
  • Ready-to-run Docker Examples: These examples are already built and containerized.

Additional examples may be found under src/main/.

| Application Name | Concepts used | Java 8+ | Java 7+ | Scala |
|---|---|---|---|---|
| WordCount | DSL, aggregation, stateful | Java 8+ example | | Scala example |
| MapFunction | DSL, stateless transformations, map() | Java 8+ example | | Scala example |
| SessionWindows | Sessionization of user events, user behavior analysis | | Java 7+ example | |
| GlobalKTable | join() between KStream and GlobalKTable | Java 8+ example | | |
| GlobalStore | "join" between KStream and GlobalStore | Java 8+ example | | |
| PageViewRegion | join() between KStream and KTable | Java 8+ example | Java 7+ example | |
| PageViewRegionGenericAvro | Working with data in Generic Avro format | Java 8+ example | Java 7+ example | |
| WikipediaFeedSpecificAvro | Working with data in Specific Avro format | Java 8+ example | Java 7+ example | |
| SecureKafkaStreams | Secure, encryption, client authentication | | Java 7+ example | |
| Sum | DSL, stateful transformations, reduce() | Java 8+ example | | |
| WordCountInteractiveQueries | Interactive Queries, REST, RPC | Java 8+ example | | |
| KafkaMusic | Interactive Queries, State Stores, REST API | Java 8+ example | | |
| ApplicationReset | Application Reset Tool kafka-streams-application-reset | Java 8+ example | | |
| Microservice | Microservice ecosystem, state stores, dynamic routing, joins, filtering, branching, stateful operations | Java 8+ example | | |

The stream processing of Kafka Streams can be unit tested with the TopologyTestDriver from the org.apache.kafka:kafka-streams-test-utils artifact. The test driver allows you to write sample input into your processing topology and validate its output.

See the documentation at Testing Streams Code.
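As a minimal sketch of the test-driver workflow (the topology here is a toy uppercase mapper, not one of the repository's examples):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class UppercaseTopologyTest {
    public static void main(String[] args) {
        // Toy topology: uppercase every value from "input" and write it to "output".
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(v -> v.toUpperCase())
               .to("output", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted

        // The driver pipes records through the topology synchronously -- no broker needed.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in =
                driver.createInputTopic("input", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out =
                driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
            in.pipeInput("k", "hello kafka streams");
            System.out.println(out.readValue()); // HELLO KAFKA STREAMS
        }
    }
}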

We also provide several integration tests, which demonstrate end-to-end data pipelines. Here, we spawn embedded Kafka clusters and the Confluent Schema Registry, feed input data to them (using the standard Kafka producer client), process the data using Kafka Streams, and finally read and verify the output results (using the standard Kafka consumer client).

Additional examples may be found under src/test/.

Tip: Run mvn test to launch the tests.

| Integration Test Name | Concepts used | Java 8+ | Java 7+ | Scala |
|---|---|---|---|---|
| WordCount | DSL, aggregation, stateful | Java 8+ Example | | Scala Example |
| WordCountInteractiveQueries | Interactive Queries, REST, RPC | | Java 7+ Example | |
| Aggregate | DSL, groupBy(), aggregate() | Java 8+ Example | | Scala Example |
| CustomStreamTableJoin | DSL, Processor API, Transformers | Java 8+ Example | | |
| EventDeduplication | DSL, Processor API, Transformers | Java 8+ Example | | |
| GlobalKTable | DSL, global state | | Java 7+ Example | |
| GlobalStore | DSL, global state, Transformers | | Java 7+ Example | |
| HandlingCorruptedInputRecords | DSL, flatMap() | Java 8+ Example | | |
| KafkaMusic (Interactive Queries) | Interactive Queries, State Stores, REST API | | Java 7+ Example | |
| MapFunction | DSL, stateless transformations, map() | Java 8+ Example | | |
| MixAndMatch DSL + Processor API | Integrating DSL and Processor API | Java 8+ Example | | |
| PassThrough | DSL, stream(), to() | | Java 7+ Example | |
| PoisonPill | DSL, flatMap() | Java 8+ Example | | |
| ProbabilisticCounting*** | DSL, Processor API, custom state stores | | | Scala Example |
| Reduce (Concatenate) | DSL, groupByKey(), reduce() | Java 8+ Example | | Scala Example |
| SessionWindows | DSL, windowed aggregation, sessionization | | Java 7+ Example | |
| StatesStoresDSL | DSL, Processor API, Transformers | Java 8+ Example | | |
| StreamToStreamJoin | DSL, join() between KStream and KStream | | Java 7+ Example | |
| StreamToTableJoin | DSL, join() between KStream and KTable | | Java 7+ Example | Scala Example |
| Sum | DSL, aggregation, stateful, reduce() | Java 8+ Example | | |
| TableToTableJoin | DSL, join() between KTable and KTable | | Java 7+ Example | |
| UserCountsPerRegion | DSL, aggregation, stateful, count() | Java 8+ Example | | |
| ValidateStateWithInteractiveQueries | Interactive Queries for validating state | Java 8+ Example | | |
| GenericAvro | Working with data in Generic Avro format | | Java 7+ Example | Scala Example |
| SpecificAvro | Working with data in Specific Avro format | | Java 7+ Example | Scala Example |

***demonstrates how to probabilistically count items in an input stream by implementing a custom state store (CMSStore) that is backed by a Count-Min Sketch data structure (with the CMS implementation of Twitter Algebird)

There is also a ready-to-run Docker example: the Kafka Music application. It demonstrates how to build a simple music charts application that continuously computes, in real time, the latest charts such as the Top 5 songs per music genre. It exposes its latest processing results -- the latest charts -- through Kafka's Interactive Queries feature and a REST API. The application's input data is in Avro format, hence the use of Confluent Schema Registry, and comes from two sources: a stream of play events (think: "song X was played") and a stream of song metadata ("song X was written by artist Y").
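The lookup side of Interactive Queries can be sketched as follows; the store name song-play-counts is a hypothetical placeholder rather than the application's actual store, and a real service would expose this lookup through a REST endpoint:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Sketch: query a local state store of a running KafkaStreams instance.
public class InteractiveQuerySketch {
    static Long playCountFor(KafkaStreams streams, String songId) {
        ReadOnlyKeyValueStore<String, Long> store = streams.store(
            StoreQueryParameters.fromNameAndType("song-play-counts", // hypothetical store name
                                                 QueryableStoreTypes.keyValueStore()));
        return store.get(songId); // null if the key is not held by this instance
    }
}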

You can find detailed documentation at https://docs.confluent.io/current/streams/kafka-streams-examples/docs/index.html.

For additional examples that showcase Kafka Streams applications within an event streaming platform, please refer to the examples GitHub repository.

The code in this repository requires Apache Kafka 0.10+ because from this point onwards Kafka includes its Kafka Streams library. See Version Compatibility Matrix for further details, as different branches of this repository may have different Kafka requirements.

For the master branch: To build a development version, you typically need the latest trunk version of Apache Kafka (cf. kafka.version in pom.xml for details). The following instructions will build and locally install the latest trunk Kafka version:

$ git clone git@github.com:apache/kafka.git
$ cd kafka
$ git checkout trunk

# Now build and install Kafka locally
$ ./gradlew clean && ./gradlewAll install

The code in this repository requires Confluent Schema Registry. See Version Compatibility Matrix for further details, as different branches of this repository have different Confluent Platform requirements.

For the master branch: To build a development version, you typically need the latest master version of Confluent Platform's Schema Registry (cf. confluent.version in pom.xml, which is set by the upstream Confluent Common project). The following instructions will build and locally install the latest master Schema Registry version, which includes building its dependencies such as Confluent Common and Confluent Rest Utils. Please read the Schema Registry README for details.

$ git clone https://github.com/confluentinc/common.git
$ cd common
$ git checkout master

# Build and install common locally
$ mvn -DskipTests=true clean install

$ git clone https://github.com/confluentinc/rest-utils.git
$ cd rest-utils
$ git checkout master

# Build and install rest-utils locally
$ mvn -DskipTests=true clean install

$ git clone https://github.com/confluentinc/schema-registry.git
$ cd schema-registry
$ git checkout master

# Now build and install schema-registry locally
$ mvn -DskipTests=true clean install

Also, each example states its exact requirements at the very top.

If you are using an IDE and import the project you might end up with a "missing import / class not found" error. Some Avro classes are generated from schema files and IDEs sometimes do not generate these classes automatically. To resolve this error, manually run:

$ mvn -DskipTests=true compile

If you are using Eclipse, you can also right-click on pom.xml file and choose Run As > Maven generate-sources.

Some code examples require Java 8+, primarily because of the usage of lambda expressions.

IntelliJ IDEA users:

  • Open File > Project structure
  • Select "Project" on the left.
    • Set "Project SDK" to Java 1.8.
    • Set "Project language level" to "8 - Lambdas, type annotations, etc."

Scala is required only for the Scala examples in this repository. If you are a Java developer you can safely ignore this section.

If you want to experiment with the Scala examples in this repository, you need a version of Scala that supports Java 8 and SAM / Java lambda (e.g. Scala 2.11 with -Xexperimental compiler flag, or 2.12).

If you are compiling with Java 9+, you'll need to have Scala version 2.12+ to be compatible with the Java version.

The instructions in this section are only needed if you want to interactively test-drive the application examples under src/main/.

Tip: If you only want to run the integration tests, you do not need to package or install anything -- just run mvn test; these tests launch embedded Kafka clusters.

The first step is to install and run a Kafka cluster, which must consist of at least one Kafka broker as well as at least one ZooKeeper instance. Some examples may also require a running instance of Confluent Schema Registry. The Confluent Platform Quickstart guide provides the full details.

In a nutshell:

# Ensure you have downloaded and installed Confluent Platform as per the Quickstart instructions above.

# Start ZooKeeper
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

# In a separate terminal, start Kafka broker
$ ./bin/kafka-server-start ./etc/kafka/server.properties

# In a separate terminal, start Confluent Schema Registry
$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

# Again, please refer to the Confluent Platform Quickstart for details such as
# how to download Confluent Platform, how to stop the above three services, etc.

The next step is to create a standalone jar ("fat jar") of the application examples:

# Create a standalone jar ("fat jar")
$ mvn clean package

# >>> Creates target/kafka-streams-examples-7.1.1-standalone.jar

Tip: If needed, you can disable the test suite during packaging, for example to speed up the packaging or to lower JVM memory usage:

$ mvn -DskipTests=true clean package

You can now run the application examples as follows:

# Run an example application from the standalone jar. Here: `WordCountLambdaExample`
$ java -cp target/kafka-streams-examples-7.1.1-standalone.jar \
  io.confluent.examples.streams.WordCountLambdaExample

The application will try to read from the specified input topic (in the above example it is streams-plaintext-input), execute the processing logic, and then try to write back to the specified output topic (in the above example it is streams-wordcount-output). In order to observe the expected output stream, you will need to start a console producer to send messages into the input topic and start a console consumer to continuously read from the output topic. More details on how to run the examples can be found in the Javadocs of each example's code.
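For orientation, the heart of such a word-count topology looks roughly like the sketch below; consult the WordCountLambdaExample source for the repository's exact code:

import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountSketch {
    // Rough sketch of the WordCount topology: split lines into words, group, count.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Long> wordCounts = builder
            .stream("streams-plaintext-input", Consumed.with(Serdes.String(), Serdes.String()))
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word) // re-key by word so the count is per word
            .count();
        wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}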

If you want to turn on log4j while running your example application, you can edit the log4j.properties file and then execute as follows:

# Run an example application from the standalone jar. Here: `WordCountLambdaExample`
$ java -cp target/kafka-streams-examples-7.1.1-standalone.jar \
  -Dlog4j.configuration=file:src/main/resources/log4j.properties \
  io.confluent.examples.streams.WordCountLambdaExample

Keep in mind that the machine on which you run the command above must have access to the Kafka/ZooKeeper clusters you configured in the code examples. By default, the code examples assume the Kafka cluster is accessible via localhost:9092 (aka Kafka's bootstrap.servers parameter) and the ZooKeeper ensemble via localhost:2181. You can override the default bootstrap.servers parameter through a command line argument.
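For example, the bundled examples follow the convention of taking the bootstrap servers as the first command-line argument (check the Javadoc of the specific example to confirm; my-kafka-broker:9092 below is a placeholder):

$ java -cp target/kafka-streams-examples-7.1.1-standalone.jar \
  io.confluent.examples.streams.WordCountLambdaExample my-kafka-broker:9092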

This project uses the standard maven lifecycle and commands such as:

$ mvn compile # This also generates Java classes from the Avro schemas
$ mvn test    # Runs unit and integration tests
$ mvn package # Packages the application examples into a standalone jar

Version Compatibility Matrix

| Branch (this repo) | Confluent Platform | Apache Kafka |
|---|---|---|
| 5.4.x* | 5.4.0-SNAPSHOT | 2.4.0-SNAPSHOT |
| 5.3.0-post | 5.3.0 | 2.3.0 |
| 5.2.2-post | 5.2.2 | 2.2.1 |
| 5.2.1-post | 5.2.1 | 2.2.1 |
| 5.1.0-post | 5.1.0 | 2.1.0 |
| 5.0.0-post | 5.0.0 | 2.0.0 |
| 4.1.0-post | 4.1.0 | 1.1.0 |
| 4.0.0-post | 4.0.0 | 1.0.0 |
| 3.3.0-post | 3.3.0 | 0.11.0 |

*You must manually build the 2.4 version of Apache Kafka and the 5.4.x version of Confluent Platform. See instructions above.

The master branch of this repository represents active development, and may require additional steps on your side to make it compile. Check this README as well as pom.xml for any such information.
