本文面向所有 seatunnel 插件开发人员,尽可能清晰得阐述开发一个 seatunnel 插件所经历的过程,希望能消除开发者的困惑。
- 原生提供插件可能仅能满足80%的需求,还有20%需要你自己来开发
- 有了插件机制,开发仅需关注特定插件的处理逻辑,数据处理的共性问题,由框架来统一处理。
- 通用插件大大提升了代码的复用率。
seatunnel 插件分为3部分,Input
、filter
和 output
,这里贴一个简版类图
Input
插件主要负责从外部读取数据并且封装为 Spark DataSet[Row]
数据集。插件有三种类型,分别是:
- 读取静态离线数据的
BaseStaticInput
- 实时数据流处理
BaseStreamingInput
,并且我们在此接口基础上开发了方便 Java 使用的BaseJavaStreamingInput
- 状态的实时流处理
BaseStructuredStreamingInput
Filter
插件主要负责处理类型为 Spark DataSet[Row]
的输入,并且返回同样为 Spark DataSet[Row]
类型的数据集。
Output
插件则负责将 Spark DataSet[Row]
数据集输出到外部数据源或者将结果打印到终端。
创建项目或者直接拉取本项目代码
git clone https://github.com/InterestingLab/seatunnel-example.git
当你看到这里的,相比这一步不用过多阐述
sbt
libraryDependencies += "io.github.interestinglab.waterdrop" %% "waterdrop-apis" % "1.5.0"
maven
<dependency>
<groupId>io.github.interestinglab.waterdrop</groupId>
<artifactId>waterdrop-apis_2.11</artifactId>
<version>1.5.0</version>
</dependency>
新建一个类,并继承 waterdrop-apis 提供的父类并实现相关接口完成 Input
插件开发。
BaseStreamingInput
用于实现一个流式处理 Input
插件,它支持泛型,用户可以根据实际数据情况指定类型。
需要注意,seatunnel 中的流式计算插件,类名必须以 Stream 结尾,如 hdfsStream
。
BaseStreamingInput
接口定义如下:
abstract class BaseStreamingInput[T] extends Plugin {
/**
* Things to do after filter and before output
* */
def beforeOutput: Unit = {}
/**
* Things to do after output, such as update offset
* */
def afterOutput: Unit = {}
/**
* This must be implemented to convert RDD[T] to Dataset[Row] for later processing
* */
def rdd2dataset(spark: SparkSession, rdd: RDD[T]): Dataset[Row]
/**
* start should be invoked in when data is ready.
* */
def start(spark: SparkSession, ssc: StreamingContext, handler: Dataset[Row] => Unit): Unit = {
getDStream(ssc).foreachRDD(rdd => {
val dataset = rdd2dataset(spark, rdd)
handler(dataset)
})
}
/**
* Create spark dstream from data source, you can specify type parameter.
* */
def getDStream(ssc: StreamingContext): DStream[T]
BaseStreamingInput
接口功能如下:
beforeOutput
: 定义调用 Output 之前的逻辑。afterOutput
: 定义调用 Output 之后的逻辑,常用于维护 Offset 实现 at-least-once 语义rdd2dataset
: 定义 RDD[T] 转换为 DataSet[Row] 的处理逻辑start
: 内部操作逻辑,可以无需关心getDStream
: 定义获取外部数据源数据的逻辑,需要生成 DStream[T]
总的来说,我们需要定义外部数据源转换为 DataSet[Row] 的逻辑。
BaseStaticInput
用于实现一个离线/静态数据源读取插件,接口定义如下
abstract class BaseStaticInput extends Plugin {
/**
* Get Dataset from this Static Input.
* */
def getDataset(spark: SparkSession): Dataset[Row]
}
接口功能如下:
getDataset
: 将外部数据源转换为 DataSet[Row]
BaseStructuredStreamingInput
接口定义与 BaseStaticFilter
定义类似,此处不再做过多陈述。
新建一个类,并继承 waterdrop-apis 提供的父类并实现相关接口完成 Filter
插件开发。
Spark 较好实现了批流一体,因此我们的 Filter
插件仅有一套API,且大部分 Filter
插件都能同时支持批处理和流处理。
BaseFilter
接口定义如下:
abstract class BaseFilter extends Plugin {
def process(spark: SparkSession, df: Dataset[Row]): Dataset[Row]
/**
* Allow to register user defined UDFs
* @return empty list if there is no UDFs to be registered
* */
def getUdfList(): List[(String, UserDefinedFunction)] = List.empty
/**
* Allow to register user defined UDAFs
* @return empty list if there is no UDAFs to be registered
* */
def getUdafList(): List[(String, UserDefinedAggregateFunction)] = List.empty
}
BaseFilter
接口功能如下:
process
: 定义 Filter
插件的具体操作逻辑,方法输入和输出都是 DataSet[Row]
getUdfList
: 定义需要被注册的 UDF 列表
getUdafList
: 定义需要被注册的 UDAF 列表
大部分场景我们仅需要实现 process
方法定义数据处理逻辑即可。
新建一个类,并继承 waterdrop-apis 提供的父类并实现相关接口完成 Output
插件开发。
BaseOutput
插件支持批处理和 Spark Streaming,不支持 Spark Structured Streaming
BaseOutput
接口定义如下:
abstract class BaseOutput extends Plugin {
def process(df: Dataset[Row])
}
BaseOutput
接口功能如下:
process
: 定义 Dataset[Row] 数据输出到外部数据源的方法,需要注意,这里需要触发一个 action
操作
BaseStructuredStreamingOutputIntra
则是为了提供 Spark Structured Streaming 对外输出的插件。
BaseStructuredStreamingOutputIntra
接口定义
trait BaseStructuredStreamingOutputIntra extends Plugin {
def process(df: Dataset[Row]): DataStreamWriter[Row]
}
BaseStructuredStreamingOutputIntra
接口功能:
process
: 与 BaseOutput
不同的是,这里返回的是 DataStreamWriter[Row]。
BaseStructuredStreamingOutputIntra
主要为了提供 Spark 现在的 Output 方法支持,而 BaseStructuredStreamingOutput
则是为了自定义输出数据源。
BaseStructuredStreamingOutput
接口定义如下:
trait BaseStructuredStreamingOutput extends ForeachWriter[Row] with BaseStructuredStreamingOutputIntra {
/**
* Things to do before process.
* */
def open(partitionId: Long, epochId: Long): Boolean
/**
* Things to do with each Row.
* */
def process(row: Row): Unit
/**
* Things to do after process.
* */
def close(errorOrNull: Throwable): Unit
/**
* seatunnel Structured Streaming process.
* */
def process(df: Dataset[Row]): DataStreamWriter[Row]
}
BaseStructuredStreamingOutput
接口功能如下:
open
: 定义处理之前的逻辑
process
: 定义每条数据的处理逻辑
close
: 定义处理之后的逻辑
process
: seatunnel 内部的处理逻辑,需要返回 DataStreamWriter[Row]
- 编译打包
- 将打包后的 Jar 包放到 seatunnel 指定目录下,以便 seatunnel 在启动的时候可以加载到。
cd seatunnel
mkdir -p plugins/my_plugins/lib
cd plugins/my_plugins/lib
seatunnel需要将第三方Jar包放到,必须新建lib文件夹
plugins/your_plugin_name/lib/your_jar_name
其他文件放到
plugins/your_plugin_name/files/your_file_name
- 在配置文件中使用插件
第三方插件在使用时,必须使用插件的完整包路径,例如
org.interestinglab.seatunnel.output.JavaOutput
output {
org.interestinglab.seatunnel.output.JavaStdout {
limit = 2
}
}
至此,我们就完成了一个插件的开发,并且在 seatunnel 中使用这个插件。