Connectors for Pravega to be used with Flink jobs.
Install the Pravega client libraries to your local Maven repository.
$ git clone https://github.com/pravega/pravega.git
$ ./gradlew install
Use the built-in gradle wrapper to build the connector.
$ git clone https://github.com/pravega/flink-connectors.git
$ ./gradlew clean build
...
BUILD SUCCESSFUL
Use the following command to publish the shaded connector jar file. The resulting jar file is named pravega-connectors-flink_2.11-<version>.jar
$ ./gradlew install
Note: This connector is built against Flink version 1.3-SNAPSHOT.
Instantiate a FlinkPravegaReader instance and add it as a source to the Flink streaming job. The source does a tail read of the supplied set of streams.
// Define your event deserializer
AbstractDeserializationSchema<EventType> deserializer = ...
// Set startTime to 0 to read from the beginning of the streams
FlinkPravegaReader<EventType> pravegaSource = new FlinkPravegaReader<>(
    "tcp://localhost:9090",
    scopeName,
    listOfStreams,
    startTime,
    deserializer);
// Checkpointing is enabled on the execution environment, not on the stream.
flinkEnv.enableCheckpointing(1000);
DataStreamSource<EventType> dataStream = flinkEnv
    .addSource(pravegaSource)
    .setParallelism(2);
...
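The deserializer turns the raw bytes of each event into an `EventType` instance. As a rough illustration, the sketch below decodes a simple text format. The `SensorReading` type, the `SensorReadingCodec` class and the "sensorId,value" wire format are all hypothetical and chosen only for this example; in a real job the decode logic would presumably go into the `deserialize(byte[])` method of an `AbstractDeserializationSchema` subclass.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical event type, used only for this sketch.
final class SensorReading {
    final String sensorId;
    final double value;

    SensorReading(String sensorId, double value) {
        this.sensorId = sensorId;
        this.value = value;
    }
}

// Hypothetical stand-alone codec. It has no Flink dependency so that it can
// be compiled and tested on its own.
final class SensorReadingCodec {
    private SensorReadingCodec() {}

    // Encodes an event as UTF-8 text of the form "sensorId,value".
    static byte[] encode(SensorReading reading) {
        String text = reading.sensorId + "," + reading.value;
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes the bytes of one event back into a SensorReading.
    static SensorReading decode(byte[] message) {
        String text = new String(message, StandardCharsets.UTF_8);
        // Split on the last comma so that sensor ids may contain commas.
        int comma = text.lastIndexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("Malformed event: " + text);
        }
        return new SensorReading(
                text.substring(0, comma),
                Double.parseDouble(text.substring(comma + 1)));
    }
}
```

Keeping the codec separate from the Flink schema class makes it easy to unit test the wire format without starting a job.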
There are currently two implementations of the sink. These will be merged into FlinkPravegaWriter soon.
- FlinkExactlyOncePravegaWriter
  - Should only be used with checkpointing enabled; it uses transactions to provide exactly-once guarantees.
- FlinkPravegaWriter
  - Should be used by Flink jobs that have checkpointing disabled.
Usage is the same for both writers. Example:
DataStreamSource<EventType> dataStream = ...
...
// Define the event serializer.
SerializationSchema<EventType> serializer = ...
// Define the event router, which selects the routing key for each event written to Pravega.
PravegaEventRouter router = ...
FlinkPravegaWriter<EventType> pravegaSink = new FlinkPravegaWriter<>(
    "tcp://localhost:9090",
    scopeName,
    streamName,
    serializer,
    router);
dataStream.addSink(pravegaSink);
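The event router decides which routing key each event is written with. Pravega preserves ordering among events that share a routing key, so a common choice is to derive the key from a field that identifies the event's origin. The sketch below illustrates the idea with a hypothetical `RoutingKeySelector` interface that stands in for the connector's `PravegaEventRouter`. The real interface may differ in its exact signature, and the "sensorId,value" event format is assumed only for this example.

```java
// Hypothetical stand-in for the connector's event router interface; the real
// PravegaEventRouter may differ in its exact signature.
interface RoutingKeySelector<T> {
    String getRoutingKey(T event);
}

// Routes text events of the assumed form "sensorId,value" by their sensor id,
// so that all events from one sensor share a routing key.
final class SensorIdRouter implements RoutingKeySelector<String> {
    @Override
    public String getRoutingKey(String event) {
        int comma = event.indexOf(',');
        // Fall back to the whole event when no comma is present.
        return comma < 0 ? event : event.substring(0, comma);
    }
}
```

With a router like this, two readings from the same sensor map to the same key, while readings from different sensors may be spread across the stream.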