This project was heavily inspired by two existing efforts: Data In Motion's FLaNK Stack and Data Artisans' blog on stateful streaming applications. The goal of this project is to provide insight into connecting an Apache Flink application to Apache Kafka.
Data Artisans' NYC Taxi Ride Data Set
This project includes the Apache Flink application code and the NiFi flow required to get the data into and out of Apache Kafka. It doesn't include installation steps for NiFi, Kafka, or Flink, but links to installation documentation are provided below. The following are required:
- A local Apache NiFi server
- Apache Kafka with two empty topics called "rawInput" and "enriched"
- IntelliJ IDE with the Scala plug-in installed
- A cloned copy of this Git repository
With Apache NiFi, the records from the source CSV file will be converted into individual JSON records. These records will be written to an Apache Kafka topic called "rawInput".
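The conversion NiFi performs here can be pictured with a small plain-Java sketch. It is not what the NiFi flow runs internally (the flow uses NiFi record processors such as SplitRecord with a CSVReader); the class name, the sample row, and the assumption that the CSV columns appear in the same order as the Flink `TaxiRides` schema used later in this README are all hypothetical. It does not handle quoted CSV fields that contain commas, and it does not check that numeric columns really are numbers.

```java
import java.util.Set;

// Hypothetical helper: converts one taxi-ride CSV line into a flat JSON object.
class CsvToJson {

    // ASSUMPTION: the CSV columns appear in the same order as the Flink schema.
    static final String[] FIELDS = {
            "medallion", "licenseId", "pickUpTime", "dropOffTime", "trip_time_in_secs",
            "trip_distance", "pickUpLon", "pickUpLat", "dropOffLon", "dropOffLat",
            "payment_type", "fare_amount", "surcharge", "mta_tax", "tip_amount",
            "tolls_amount", "total"
    };

    // Columns declared as STRING in the Flink schema; all others are numeric.
    static final Set<String> STRING_FIELDS =
            Set.of("medallion", "licenseId", "pickUpTime", "dropOffTime", "payment_type");

    static String toJson(String csvLine) {
        // limit -1 keeps trailing empty columns instead of dropping them
        String[] values = csvLine.split(",", -1);
        if (values.length != FIELDS.length) {
            throw new IllegalArgumentException(
                    "expected " + FIELDS.length + " columns but got " + values.length);
        }
        StringBuilder json = new StringBuilder("{");
        for (int i = 0; i < FIELDS.length; i++) {
            if (i > 0) {
                json.append(',');
            }
            String value = values[i].trim();
            json.append('"').append(FIELDS[i]).append("\":");
            if (STRING_FIELDS.contains(FIELDS[i])) {
                // escape backslashes and quotes so the string stays valid JSON
                json.append('"')
                    .append(value.replace("\\", "\\\\").replace("\"", "\\\""))
                    .append('"');
            } else {
                // numbers are written unquoted; an empty column becomes null
                json.append(value.isEmpty() ? "null" : value);
            }
        }
        return json.append('}').toString();
    }

    public static void main(String[] args) {
        // hypothetical sample row, not taken from the repository's data file
        String line = "M1,L1,2013-01-01 00:00:00,2013-01-01 00:10:00,600,2.5,"
                + "-73.99,40.75,-73.98,40.76,CSH,10.0,0.5,0.5,0.0,0.0,11.0";
        System.out.println(toJson(line));
    }
}
```

Each JSON object produced this way corresponds to one Kafka message in the "rawInput" topic, which is the shape the Flink table below expects.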
- In the NiFi UI, import the NiFi flow template (the XML file in this Git repo). For help, please review the following Cloudera documentation link.
- Upload the NiFi flow template using the icons in the NiFi UI.
- To add the NiFi flow template to the canvas, click the "Add Template" icon in the NiFi UI.
- Select the NiFi flow template to add.
- Once the NiFi template is loaded, the left side of the NiFi flow will look like this.
- Right-click the GetFileCSV processor, choose Configure, open the Properties tab, and set the Input Directory property to the directory that contains the source CSV file. Note that the CSV file is located in the data directory of this Git repo.
- Right-click the SplitRecord processor, choose Configure, open the Properties tab, and click the CSVReader controller service.
- All of these controller services must be enabled before the NiFi flow will work.
- Right-click the PublishKafkaRecord processor, choose Configure, open the Properties tab, and verify the Kafka broker address and topic name.
- Verify that the JSON records are being written to the rawInput Kafka topic. This can be done with the right side of the NiFi flow. Once this has been verified, stop the Kafka consumer processor.
- Validate the JSON record in the FlowFile.
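When validating a FlowFile by hand, one useful check is that every field the Flink schema expects is present as a JSON key. With the Flink 1.10-era `Json` format descriptor used below, a missing field is, by default, read as null rather than rejected (setting `failOnMissingField(true)` on the descriptor changes that), so a typo in a key name can silently produce empty columns. The plain-Java sketch below, with a hypothetical class name, only checks key presence with a regular expression; it does not validate value types or parse nested JSON.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper: lists the TaxiRides schema fields missing from a JSON record.
class JsonRecordCheck {

    // Field names copied from the Flink "TaxiRides" schema in this README.
    static final String[] EXPECTED = {
            "medallion", "licenseId", "pickUpTime", "dropOffTime", "trip_time_in_secs",
            "trip_distance", "pickUpLon", "pickUpLat", "dropOffLon", "dropOffLat",
            "payment_type", "fare_amount", "surcharge", "mta_tax", "tip_amount",
            "tolls_amount", "total"
    };

    static List<String> missingFields(String json) {
        List<String> missing = new ArrayList<>();
        for (String field : EXPECTED) {
            // a JSON key is the quoted name followed by optional whitespace and a colon
            Pattern key = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:");
            if (!key.matcher(json).find()) {
                missing.add(field);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // hypothetical, deliberately incomplete record pasted from a FlowFile
        String record = "{\"medallion\":\"M1\",\"licenseId\":\"L1\",\"total\":11.0}";
        System.out.println("Missing fields: " + missingFields(record));
    }
}
```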
A running Flink cluster isn't required for application development. This application was built inside the IntelliJ IDE: when the application is run from the IDE, StreamExecutionEnvironment.getExecutionEnvironment() starts a local, embedded Flink environment and shuts it down when the application finishes. Using IntelliJ isn't required, but it does make development easier.
Once JSON records are being written to the Kafka topic, Flink can connect to the topic and create a Flink table on top of it, which can then be queried with SQL. This GitHub repository contains a Flink application that demonstrates this capability; it uses the following imports:
```java
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
```
- In Flink, the following Java code defines the stream execution environment and the stream table environment.
```java
// Class member static variables
static StreamExecutionEnvironment fsEnv;
static StreamTableEnvironment fsTableEnv;
static EnvironmentSettings fsSettings;

// e.g. inside main(): create the execution environment (Blink planner, streaming mode)
fsSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .withBuiltInCatalogName("default_catalog")
        .withBuiltInDatabaseName("default_database")
        .build();
fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();

// configure event time and emit watermarks every 1000 ms
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// force Flink to use the Avro serializer for POJO types
fsEnv.getConfig().enableForceAvro();
fsEnv.getConfig().setAutoWatermarkInterval(1000L);

// create the streaming table environment
fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
```
- In Flink, the following Java code registers a Flink table backed by the rawInput Kafka topic. Note that the format is set to JSON and the table schema is declared explicitly.
```java
// register a table backed by the "rawInput" Kafka topic
fsTableEnv.connect(
        new Kafka()
                .version("universal")
                .topic("rawInput")
                .startFromLatest()
                .property("zookeeper.connect", "localhost:2181")
                .property("bootstrap.servers", "localhost:9092")
                .property("group.id", "test")
        )
        // declare JSON as the format of the Kafka records
        .withFormat(
                new Json()
        )
        // declare the schema of the table
        .withSchema(
                new Schema()
                        .field("medallion", DataTypes.STRING())
                        .field("licenseId", DataTypes.STRING())
                        .field("pickUpTime", DataTypes.STRING())
                        .field("dropOffTime", DataTypes.STRING())
                        .field("trip_time_in_secs", DataTypes.BIGINT())
                        .field("trip_distance", DataTypes.FLOAT())
                        .field("pickUpLon", DataTypes.FLOAT())
                        .field("pickUpLat", DataTypes.FLOAT())
                        .field("dropOffLon", DataTypes.FLOAT())
                        .field("dropOffLat", DataTypes.FLOAT())
                        .field("payment_type", DataTypes.STRING())
                        .field("fare_amount", DataTypes.FLOAT())
                        .field("surcharge", DataTypes.FLOAT())
                        .field("mta_tax", DataTypes.FLOAT())
                        .field("tip_amount", DataTypes.FLOAT())
                        .field("tolls_amount", DataTypes.FLOAT())
                        .field("total", DataTypes.FLOAT())
        )
        .inAppendMode()
        // register the table under the given name
        .createTemporaryTable("TaxiRides");
```
- The Flink application will display the following if everything is working as expected.
- In Flink, the following Java code queries the newly registered Flink table and prints the results to the console.
```java
// select every column of every row in the TaxiRides table
Table result = fsTableEnv.sqlQuery(
        "SELECT * " +
        "FROM TaxiRides"
);

// convert the result table into an append-only DataStream and print it
fsTableEnv.toAppendStream(result, Row.class).print();
// note: the DataStream part of the job only starts once fsEnv.execute(...) is called
```
- In Flink, the following Java code registers a second table, backed by the "enriched" Kafka topic, that serves as the sink. Note that the format is set to JSON and the table schema is declared explicitly.
```java
// register a table backed by the "enriched" Kafka topic (used as a sink)
fsTableEnv.connect(
        new Kafka()
                .version("universal")
                .topic("enriched")
                .startFromLatest()
                .property("zookeeper.connect", "localhost:2181")
                .property("bootstrap.servers", "localhost:9092")
                .property("group.id", "test")
        )
        // declare JSON as the format of the Kafka records
        .withFormat(
                new Json()
        )
        // declare the schema of the table
        .withSchema(
                new Schema()
                        .field("medallion", DataTypes.STRING())
                        .field("TimeStamp", DataTypes.TIMESTAMP(3))
        )
        .inAppendMode()
        // register the table under the given name
        .createTemporaryTable("KafkaSink");
```
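As an alternative to the `connect(...)` descriptors above, Flink 1.10 (the release line that `useBlinkPlanner()` and this descriptor API suggest; an assumption, since the README does not pin a version) also accepts a `CREATE TABLE` DDL statement passed to `fsTableEnv.sqlUpdate(...)`. The sketch below only builds that DDL string in plain Java; the class and method names are hypothetical, and the option keys follow the Flink 1.10 Kafka connector documentation (Flink 1.11+ uses different keys with `executeSql`). Use such a statement instead of, not in addition to, the matching `connect(...)` call, since both register a table under the same name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: builds a Flink 1.10-style Kafka/JSON CREATE TABLE statement.
class KafkaTableDdl {

    static String createTable(String table, String topic, Map<String, String> columns) {
        // Backticks allow column names such as TimeStamp that clash with SQL keywords.
        StringJoiner cols = new StringJoiner(",\n  ", "(\n  ", "\n)");
        columns.forEach((name, type) -> cols.add("`" + name + "` " + type));

        // Option keys mirror the descriptor calls: version, topic, startFromLatest(),
        // the Kafka properties, the JSON format and inAppendMode().
        return "CREATE TABLE " + table + " " + cols + " WITH (\n"
                + "  'connector.type' = 'kafka',\n"
                + "  'connector.version' = 'universal',\n"
                + "  'connector.topic' = '" + topic + "',\n"
                + "  'connector.startup-mode' = 'latest-offset',\n"
                + "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n"
                + "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n"
                + "  'connector.properties.group.id' = 'test',\n"
                + "  'format.type' = 'json',\n"
                + "  'update-mode' = 'append'\n"
                + ")";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the column order, which matters for INSERT INTO.
        Map<String, String> sinkColumns = new LinkedHashMap<>();
        sinkColumns.put("medallion", "STRING");
        sinkColumns.put("TimeStamp", "TIMESTAMP(3)");
        System.out.println(createTable("KafkaSink", "enriched", sinkColumns));
    }
}
```

The same helper could describe the TaxiRides source by passing the "rawInput" topic and its seventeen columns in schema order.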
- In Flink, the following code writes the query results to the Kafka topic registered in the previous step.
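```java
// select each ride's medallion and stamp it with the current timestamp
Table enriched = fsTableEnv.sqlQuery(
        "SELECT " +
        "  medallion, CURRENT_TIMESTAMP " +
        "FROM TaxiRides"
);

// columns are matched to the KafkaSink schema (medallion, TimeStamp) by position
enriched.insertInto("KafkaSink");
// the INSERT only runs once the job is executed, e.g. with fsTableEnv.execute(...)
```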
- The following output is expected in the application. Please note that the last value in this image was removed from the code example.
- In the NiFi UI, find the following section of the flow.
- Verify that the Kafka settings are correct.
- Activate the Kafka consumer processor and validate the results.
Apache Flink
- Apache Flink Basic Transformation Example
- The Flink Table And SQL API
- Flink State Checkpointing
- Apache Flink Table API & SQL
Apache Kafka + Apache Flink
- From Streams to Tables and Back Again: An Update on Flink's Table & SQL API
- Building Stateful Streaming Applications with Apache Flink
- How to build stateful streaming applications with Apache Flink
- Flink and Kafka Pipelines
- Consuming Kafka Messages From Apache Flink
- Kafka + Flink: A Practical, How-To Guide
- Basic Apache Flink Tutorial: DataStream API Programming
- Introducing Flink Streaming
- DataStream API - Writing to and reading from Kafka
Apache Flink and Apache Kafka Code Examples
- Using Flink to Consume and Produce from a Kafka Topic
- Intro to Flink and Kafka
- Flink Table API
- Flink + Kafka + JSON Example
- Read From Kafka Code Example
- Kafka Topic Name Dynamically In Flink
- Java Code Examples for org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartition
- Java Code Examples for org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
- Java Code Examples for org.apache.flink.table.api.TableEnvironment
- Java Code Examples for org.apache.flink.table.api.java.StreamTableEnvironment.registerTableSource
- Java Code Examples for org.apache.flink.table.api.TableSchema
Apache Kafka + Apache Druid
Additional Apache Project Install Links
Additional Apache Projects On Docker