Adds ORC support to Flink Streaming File Sink.
```groovy
repositories {
    maven { url 'https://jitpack.io' }
}

dependencies {
    compileOnly 'org.apache.hadoop:hadoop-common:2.8.3'
    compile 'com.github.mattczyz:flink-orc:release-0.3'
    // For reflection based writers
    compile('org.apache.hive:hive-exec:2.3.4:core')
}
```
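If the project builds with sbt rather than Gradle, roughly equivalent dependency declarations might look like this (a sketch only; the coordinates and the JitPack resolver are taken from the Gradle snippet above):

```scala
// build.sbt (sketch)
resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies ++= Seq(
  "org.apache.hadoop"   % "hadoop-common" % "2.8.3" % Provided,
  "com.github.mattczyz" % "flink-orc"     % "release-0.3",
  // Only needed for the reflection based writers
  "org.apache.hive"     % "hive-exec"     % "2.3.4" classifier "core"
)
```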
To configure the sink with an Encoder, an implementation of OrcRowEncoder[T] is required with the logic to transform a user record T into ColumnVectors and populate the VectorizedRowBatch.
Helper methods:
- nextIndex(batch) - returns the next row index as Int
- incrementBatchSize(batch) - completes the row and increments the internal VectorizedRowBatch counter
```scala
class Encoder extends OrcRowEncoder[(Int, String, String)]() with Serializable {
  override def encodeAndAdd(
      datum: (Int, String, String),
      batch: VectorizedRowBatch
  ): Unit = {
    // Reserve the next row in the batch
    val row = nextIndex(batch)
    // Populate one ColumnVector per field of the output schema
    batch.cols(0).asInstanceOf[LongColumnVector].vector(row) = datum._1
    batch.cols(1).asInstanceOf[BytesColumnVector].setVal(row, datum._2.getBytes)
    batch.cols(2).asInstanceOf[BytesColumnVector].setVal(row, datum._3.getBytes)
    // Mark the row as complete
    incrementBatchSize(batch)
  }
}
```
Visit the ORC documentation for more information on VectorizedRowBatch.
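As a quick illustration of how the batch columns line up with a schema (a sketch assuming the Hive storage-api vector classes used above):

```scala
import org.apache.orc.TypeDescription
import org.apache.hadoop.hive.ql.exec.vector.{BytesColumnVector, LongColumnVector}

val schema = TypeDescription.fromString("struct<x:int,y:string,z:string>")
val batch  = schema.createRowBatch()

// Column vectors are positional: cols(0) backs x (int), cols(1) and cols(2) back y and z (string)
val xCol = batch.cols(0).asInstanceOf[LongColumnVector]
val yCol = batch.cols(1).asInstanceOf[BytesColumnVector]
```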
The sink is built with a writerFactory returned from OrcWriters.withCustomEncoder[(Int, String, String)](encoder, schema, props), passing the encoder, the output schema and additional ORC configuration.

- [(Int, String, String)] - input data type
- encoder - implementation of OrcRowEncoder[T]
- schema - ORC TypeDescription
- props - non-default ORC configuration as Properties
```scala
val props = new Properties()
props.setProperty("orc.compress", "SNAPPY")
props.setProperty("orc.bloom.filter.columns", "x")

val schemaString = """struct<x:int,y:string,z:string>"""
val schema = TypeDescription.fromString(schemaString)

stream
  .addSink(StreamingFileSink
    .forBulkFormat(
      new Path(out),
      OrcWriters.withCustomEncoder[(Int, String, String)](new Encoder, schema, props)
    )
    .withBucketAssigner(new BucketAssigner)
    .build())
```
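BucketAssigner in these snippets stands for whichever assigner the job needs. As one possibility (a sketch, not part of this library), Flink's built-in DateTimeBucketAssigner buckets files by time:

```scala
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner

// Writes each record into a bucket named after the current system time, e.g. .../2019-08-01--15/
val assigner = new DateTimeBucketAssigner[(Int, String, String)]("yyyy-MM-dd--HH")
```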
The sink can be configured to use reflection to inspect types and encode records. It requires a Java POJO or Scala case class type to be specified when instantiating the sink. Internally it uses Hive's ObjectInspector.
The sink is built with a writerFactory returned from OrcWriters.forReflectRecord(classOf[TestData], props), specifying the incoming data type and additional ORC configuration.

- classOf[TestData] - input data type as Class<T> of a Java POJO or Scala case class
- props - non-default ORC configuration as Properties
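For illustration, TestData could be a plain case class whose fields become the ORC columns (a hypothetical example; the actual type is defined by the user):

```scala
// Hypothetical input type; reflection derives the ORC schema from its fields
case class TestData(x: Int, y: String, z: String)
```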
```scala
val props = new Properties()

stream
  .addSink(StreamingFileSink
    .forBulkFormat(
      new Path(out),
      OrcWriters.forReflectRecord(classOf[TestData], props)
    )
    .withBucketAssigner(new BucketAssigner)
    .build())
```
The sink can encode Avro GenericRecord directly to ORC. It requires the Avro schema to be provided when instantiating the sink.
The sink is built with a writerFactory returned from OrcWriters.forGenericRecord[GenericRecord](avroSchemaString, props), passing the schema of the incoming data and additional ORC configuration.

- avroSchemaString - Avro schema as a JSON String
- props - non-default ORC configuration as Properties
```scala
val schema = """{
    | "name": "record",
    | "type": "record",
    | "fields": [{
    |   "name": "x",
    |   "type": "int",
    |   "doc": ""
    | }, {
    |   "name": "y",
    |   "type": "string",
    |   "doc": ""
    | }, {
    |   "name": "z",
    |   "type": "string",
    |   "doc": ""
    | }]
    |}""".stripMargin

val props = new Properties()

stream
  .addSink(StreamingFileSink
    .forBulkFormat(
      new Path(out),
      OrcWriters.forGenericRecord[GenericRecord](schema, props)
    )
    .withBucketAssigner(new BucketAssigner)
    .build())
```
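For context, the records flowing into such a stream would be Avro GenericRecords built against the same schema; a minimal sketch (illustrative values only):

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}

val avroSchema = new Schema.Parser().parse(schema)

// One record matching the x/y/z fields declared in the schema above
val record: GenericRecord = new GenericData.Record(avroSchema)
record.put("x", 1)
record.put("y", "foo")
record.put("z", "bar")
```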
- 0.4
  - Avro GenericRecord Writer
  - Removed deprecated EncoderOrcWriters class
- 0.3 - Reflection based Writer
- 0.2 - VectorizedRowBatch based Writer