diff --git a/docs/en/transform/replace.md b/docs/en/transform/replace.md
index 8286007cd51..ecfce6dd578 100644
--- a/docs/en/transform/replace.md
+++ b/docs/en/transform/replace.md
@@ -1,4 +1,4 @@
-# Json
+# Replace

 ## Description

@@ -33,13 +33,13 @@ The name of the field to be replaced.

 The string to match.

-### is_regex [string]
+### replacement [string]

-Whether or not to interpret the pattern as a regex (true) or string literal (false).
+The replacement pattern (if is_regex is true) or the string literal (if is_regex is false).

-### replacement [boolean]
+### is_regex [boolean]

-The replacement pattern (is_regex is true) or string literal (is_regex is false).
+Whether or not to interpret the pattern as a regex (true) or string literal (false).

 ### replace_first [boolean]

diff --git a/docs/en/transform/uuid.md b/docs/en/transform/uuid.md
new file mode 100644
index 00000000000..4633a843be6
--- /dev/null
+++ b/docs/en/transform/uuid.md
@@ -0,0 +1,62 @@
+# UUID
+
+## Description
+
+Generates a universally unique identifier (UUID) and stores it in a specified field.
+
+:::tip
+
+This transform is **ONLY** supported by Spark.
+
+:::
+
+## Options
+
+| name   | type    | required | default value |
+| ------ | ------- | -------- | ------------- |
+| fields | string  | yes      | -             |
+| prefix | string  | no       | -             |
+| secure | boolean | no       | false         |
+
+### fields [string]
+
+The name of the field in which to store the generated UUID.
+
+### prefix [string]
+
+The prefix string constant to prepend to each generated UUID.
+
+### secure [boolean]
+
+Whether to generate UUIDs with a cryptographically secure algorithm. The secure algorithm can be comparatively slow.
+The non-secure algorithm uses a secure random seed but is otherwise deterministic, and is much faster.
+
+### common options [string]
+
+Transform plugin common parameters, please refer to [Transform Plugin](common-options.mdx) for details.
+
+## Examples
+
+```bash
+UUID {
+  fields = "u"
+  prefix = "uuid-"
+  secure = true
+}
+```
+
+Use `UUID` as a UDF in SQL:
+
+```bash
+UUID {
+  fields = "u"
+  prefix = "uuid-"
+  secure = true
+}
+
+# Use the UUID function (confirm that the fake table exists)
+sql {
+  sql = "select * from (select raw_message, UUID() as info_row from fake) t1"
+}
+```
diff --git a/seatunnel-core/seatunnel-core-spark/pom.xml b/seatunnel-core/seatunnel-core-spark/pom.xml
index 53ae179d285..86410e077e2 100644
--- a/seatunnel-core/seatunnel-core-spark/pom.xml
+++ b/seatunnel-core/seatunnel-core-spark/pom.xml
@@ -74,6 +74,12 @@
             <artifactId>seatunnel-transform-spark-replace</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transform-spark-uuid</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml b/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
index 0626dfc0228..e95a4ebca6a 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
+++ b/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
@@ -34,6 +34,7 @@
         <module>seatunnel-transform-spark-json</module>
         <module>seatunnel-transform-spark-split</module>
         <module>seatunnel-transform-spark-replace</module>
+        <module>seatunnel-transform-spark-uuid</module>
         <module>seatunnel-transform-spark-sql</module>
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
index 519be92b0bc..63e50ceebde 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
@@ -14,11 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform

 import org.apache.seatunnel.common.config.{Common, ConfigRuntimeException}
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
+import org.apache.seatunnel.spark.transform.JsonConfig._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
@@ -40,19 +42,19 @@ class Json extends BaseSparkTransform {
   var useCustomSchema: Boolean = false

   override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
-    val srcField = config.getString("source_field")
+    val srcField = config.getString(SOURCE_FILED)
     val spark = env.getSparkSession

     import spark.implicits._

-    config.getString("target_field") match {
+    config.getString(TARGET_FILED) match {
       case Constants.ROW_ROOT => {
         val jsonRDD = df.select(srcField).as[String].rdd
         val newDF = srcField match {
           // for backward-compatibility for spark < 2.2.0, we created rdd, not Dataset[String]
-          case "raw_message" => {
+          case DEFAULT_SOURCE_FILED => {
             val tmpDF =
               if (this.useCustomSchema) {
                 spark.read.schema(this.customSchema).json(jsonRDD)
@@ -94,14 +96,14 @@ class Json extends BaseSparkTransform {
   override def prepare(env: SparkEnvironment): Unit = {
     val defaultConfig = ConfigFactory.parseMap(
       Map(
-        "source_field" -> "raw_message",
-        "target_field" -> Constants.ROW_ROOT,
-        "schema_dir" -> Paths.get(Common.pluginFilesDir("json").toString, "schemas").toString,
-        "schema_file" -> ""))
+        SOURCE_FILED -> DEFAULT_SOURCE_FILED,
+        TARGET_FILED -> Constants.ROW_ROOT,
+        SCHEMA_DIR -> Paths.get(Common.pluginFilesDir("json").toString, "schemas").toString,
+        SCHEMA_FILE -> DEFAULT_SCHEMA_FILE))
     config = config.withFallback(defaultConfig)
-    val schemaFile = config.getString("schema_file")
+    val schemaFile = config.getString(SCHEMA_FILE)
     if (schemaFile.trim != "") {
-      parseCustomJsonSchema(env.getSparkSession, config.getString("schema_dir"), schemaFile)
+      parseCustomJsonSchema(env.getSparkSession, config.getString(SCHEMA_DIR), schemaFile)
     }
   }
@@ -136,5 +138,5 @@ class Json extends BaseSparkTransform {
     }
   }

-  override def getPluginName: String = "json"
+  override def getPluginName: String = PLUGIN_NAME
 }
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/JsonConfig.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/JsonConfig.scala
new file mode 100644
index 00000000000..560fce0c259
--- /dev/null
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/JsonConfig.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.transform
+
+object JsonConfig {
+  val PLUGIN_NAME = "json"
+
+  val FIELDS = "fields"
+  val SOURCE_FILED = "source_field"
+  val DEFAULT_SOURCE_FILED = "raw_message"
+  val TARGET_FILED = "target_field"
+  val SCHEMA_DIR = "schema_dir"
+  val SCHEMA_FILE = "schema_file"
+  val DEFAULT_SCHEMA_FILE = ""
+}
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
index 609dec15d08..27e6cf9c43f 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform

 import scala.collection.JavaConversions._
@@ -26,42 +27,43 @@
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
+import org.apache.seatunnel.spark.transform.ReplaceConfig._
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions.{col, udf}

 class Replace extends BaseSparkTransform {
   override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
-    val srcField = config.getString("source_field")
-    val key = config.getString("fields")
+    val srcField = config.getString(SOURCE_FILED)
+    val key = config.getString(FIELDS)
     val func: UserDefinedFunction = udf((s: String) => {
       replace(
         s,
-        config.getString("pattern"),
-        config.getString("replacement"),
-        config.getBoolean("is_regex"),
-        config.getBoolean("replace_first"))
+        config.getString(PATTERN),
+        config.getString(REPLACEMENT),
+        config.getBoolean(REPLACE_REGEX),
+        config.getBoolean(REPLACE_FIRST))
     })
     var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField)))
     filterDf = filterDf.withColumn(key, col(Constants.ROW_TMP))
     val ds = filterDf.drop(Constants.ROW_TMP)
     if (func != null) {
-      env.getSparkSession.udf.register("Replace", func)
+      env.getSparkSession.udf.register(UDF_NAME, func)
     }
     ds
   }

   override def checkConfig(): CheckResult = {
-    checkAllExists(config, "fields", "pattern", "replacement")
+    checkAllExists(config, FIELDS, PATTERN, REPLACEMENT)
   }

   override def prepare(env: SparkEnvironment): Unit = {
     val defaultConfig = ConfigFactory.parseMap(
       Map(
-        "source_field" -> "raw_message",
-        "is_regex" -> false,
-        "replace_first" -> false))
+        SOURCE_FILED -> DEFAULT_SOURCE_FILED,
+        REPLACE_REGEX -> DEFAULT_REPLACE_REGEX,
+        REPLACE_FIRST -> DEFAULT_REPLACE_FIRST))
     config = config.withFallback(defaultConfig)
   }
@@ -83,4 +85,6 @@
   }

   implicit def toReg(pattern: String): Regex = pattern.r
+
+  override def getPluginName: String = PLUGIN_NAME
 }
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/ReplaceConfig.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/ReplaceConfig.scala
new file mode 100644
index 00000000000..51e15153dd9
--- /dev/null
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/ReplaceConfig.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.transform
+
+object ReplaceConfig {
+  val PLUGIN_NAME = "replace"
+  val UDF_NAME = "Replace"
+
+  val FIELDS = "fields"
+  val SOURCE_FILED = "source_field"
+  val DEFAULT_SOURCE_FILED = "raw_message"
+  val PATTERN = "pattern"
+  val REPLACEMENT = "replacement"
+  val REPLACE_REGEX = "is_regex"
+  val DEFAULT_REPLACE_REGEX = false
+  val REPLACE_FIRST = "replace_first"
+  val DEFAULT_REPLACE_FIRST = false
+}
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
index dddac46af75..fdb4287dee5 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform

 import junit.framework.TestCase.assertEquals
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
index 498776c03e1..764c5bab603 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform

 import scala.collection.JavaConversions._
@@ -23,6 +24,7 @@
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
+import org.apache.seatunnel.spark.transform.SplitConfig._
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions.{col, udf}
@@ -30,15 +32,15 @@ import org.apache.spark.sql.functions.{col, udf}
 class Split extends BaseSparkTransform {

   override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
-    val srcField = config.getString("source_field")
-    val keys = config.getStringList("fields")
+    val srcField = config.getString(SOURCE_FILED)
+    val keys = config.getStringList(FIELDS)
     // https://stackoverflow.com/a/33345698/1145750
     var func: UserDefinedFunction = null
-    val ds = config.getString("target_field") match {
+    val ds = config.getString(TARGET_FILED) match {
       case Constants.ROW_ROOT =>
         func = udf((s: String) => {
-          split(s, config.getString("separator"), keys.size())
+          split(s, config.getString(SPLIT_SEPARATOR), keys.size())
         })
         var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField)))
         for (i <- 0 until keys.size()) {
@@ -47,28 +49,28 @@ class Split extends BaseSparkTransform {
         filterDf.drop(Constants.ROW_TMP)
       case targetField: String =>
         func = udf((s: String) => {
-          val values = split(s, config.getString("separator"), keys.size)
+          val values = split(s, config.getString(SPLIT_SEPARATOR), keys.size)
           val kvs = (keys zip values).toMap
           kvs
         })
         df.withColumn(targetField, func(col(srcField)))
     }
     if (func != null) {
-      env.getSparkSession.udf.register("Split", func)
+      env.getSparkSession.udf.register(UDF_NAME, func)
     }
     ds
   }

   override def checkConfig(): CheckResult = {
-    checkAllExists(config, "fields")
+    checkAllExists(config, FIELDS)
   }

   override def prepare(env: SparkEnvironment): Unit = {
     val defaultConfig = ConfigFactory.parseMap(
       Map(
-        "separator" -> " ",
-        "source_field" -> "raw_message",
-        "target_field" -> Constants.ROW_ROOT))
+        SPLIT_SEPARATOR -> DEFAULT_SPLIT_SEPARATOR,
+        SOURCE_FILED -> DEFAULT_SOURCE_FILED,
+        TARGET_FILED -> Constants.ROW_ROOT))
     config = config.withFallback(defaultConfig)
   }
@@ -86,5 +88,5 @@
     filled.toSeq
   }

-  override def getPluginName: String = "split"
+  override def getPluginName: String = PLUGIN_NAME
 }
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/SplitConfig.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/SplitConfig.scala
new file mode 100644
index 00000000000..b23036ab28d
--- /dev/null
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/SplitConfig.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.transform
+
+object SplitConfig {
+  val PLUGIN_NAME = "split"
+  val UDF_NAME = "Split"
+
+  val FIELDS = "fields"
+  val SOURCE_FILED = "source_field"
+  val DEFAULT_SOURCE_FILED = "raw_message"
+  val TARGET_FILED = "target_field"
+  val SPLIT_SEPARATOR = "separator"
+  val DEFAULT_SPLIT_SEPARATOR = " "
+}
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
index 2881b1a2992..b15a4dd5fce 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform

 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/pom.xml b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/pom.xml
new file mode 100644
index 00000000000..f9185ef4a59
--- /dev/null
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-transforms-spark</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-transform-spark-uuid</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api-spark</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform
new file mode 100644
index 00000000000..281a724294d
--- /dev/null
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.spark.transform.UUID
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUID.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUID.scala
new file mode 100644
index 00000000000..829df588c18
--- /dev/null
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUID.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.transform
+
+import java.security.SecureRandom
+
+import scala.collection.JavaConversions._
+
+import com.google.common.annotations.VisibleForTesting
+import org.apache.commons.math3.random.{RandomGenerator, Well19937c}
+import org.apache.seatunnel.common.Constants
+import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
+import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
+import org.apache.seatunnel.spark.transform.UUIDConfig._
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.expressions.UserDefinedFunction
+import org.apache.spark.sql.functions.{col, udf}
+
+class UUID extends BaseSparkTransform {
+  private var prng: RandomGenerator = _
+
+  override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
+    val key = config.getString(FIELDS)
+
+    val func: UserDefinedFunction = udf(() => {
+      generate(config.getString(UUID_PREFIX))
+    })
+    var filterDf = df.withColumn(Constants.ROW_TMP, func())
+    filterDf = filterDf.withColumn(key, col(Constants.ROW_TMP))
+    val ds = filterDf.drop(Constants.ROW_TMP)
+    if (func != null) {
+      env.getSparkSession.udf.register(UDF_NAME, func)
+    }
+    ds
+  }
+
+  override def checkConfig(): CheckResult = {
+    checkAllExists(config, FIELDS)
+  }
+
+  override def prepare(env: SparkEnvironment): Unit = {
+    val defaultConfig = ConfigFactory.parseMap(Map(UUID_PREFIX -> DEFAULT_UUID_PREFIX, UUID_SECURE -> DEFAULT_UUID_SECURE))
+    config = config.withFallback(defaultConfig)
+
+    /**
+     * The secure algorithm can be comparatively slow.
+     * The new nonSecure algorithm never blocks and is much faster.
+     * The nonSecure algorithm uses a secure random seed but is otherwise deterministic,
+     * though it is one of the strongest uniform pseudo-random number generators known so far.
+     * Thanks to whoschek@cloudera.com.
+     */
+    if (config.getBoolean(UUID_SECURE)) {
+      val rand = new SecureRandom
+      val seed = for (_ <- 0 until 728) yield rand.nextInt
+      prng = new Well19937c(seed.toArray)
+    }
+  }
+
+  @VisibleForTesting
+  def generate(prefix: String): String = {
+    val uuid = if (prng == null) java.util.UUID.randomUUID else new java.util.UUID(prng.nextLong, prng.nextLong)
+    prefix + uuid
+  }
+
+  // Only used for test
+  @VisibleForTesting
+  def setPrng(prng: RandomGenerator): Unit = {
+    this.prng = prng
+  }
+
+  override def getPluginName: String = PLUGIN_NAME
+}
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUIDConfig.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUIDConfig.scala
new file mode 100644
index 00000000000..4cf1aeae044
--- /dev/null
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUIDConfig.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.transform
+
+object UUIDConfig {
+  val PLUGIN_NAME = "UUID"
+  val UDF_NAME = PLUGIN_NAME
+
+  val FIELDS = "fields"
+  val DEFAULT_SOURCE_FILED = "raw_message"
+  val UUID_PREFIX = "prefix"
+  val DEFAULT_UUID_PREFIX = ""
+  val UUID_SECURE = "secure"
+  val DEFAULT_UUID_SECURE = false
+}
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUUID.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUUID.scala
new file mode 100644
index 00000000000..7d375118754
--- /dev/null
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUUID.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.transform
+
+import java.security.SecureRandom
+
+import junit.framework.TestCase.assertEquals
+import org.apache.commons.math3.random.Well19937c
+import org.junit.Test
+
+class TestUUID {
+  @Test
+  def testUuid() {
+    val uuid = new UUID
+    assertEquals(36, uuid.generate("").length)
+    assertEquals(37, uuid.generate("u").length)
+  }
+
+  @Test
+  def testSecureUuid() {
+    val rand = new SecureRandom
+    val seed = for (_ <- 0 until 728) yield rand.nextInt
+    val prng = new Well19937c(seed.toArray)
+
+    val uuid = new UUID
+    uuid.setPrng(prng)
+    assertEquals(36, uuid.generate("").length)
+    assertEquals(37, uuid.generate("u").length)
+  }
+}
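For reviewers who want to sanity-check the secure-mode construction outside of Spark, here is a minimal standalone sketch (not part of this patch; the object name `SecureUuidSketch` is invented for illustration). It mirrors what `UUID.scala` does: seed a `Well19937c` generator from `SecureRandom`, then build each UUID from two `nextLong` calls. It also demonstrates the property the code comment describes: the seed comes from a secure source, but everything drawn after it is deterministic, so two generators given the same seed emit identical UUID sequences.

```scala
import java.security.SecureRandom

import org.apache.commons.math3.random.{RandomGenerator, Well19937c}

object SecureUuidSketch {
  // Draw a 728-int seed from SecureRandom, mirroring UUID.prepare() above.
  def newSeed(): Array[Int] = {
    val rand = new SecureRandom
    Array.fill(728)(rand.nextInt())
  }

  // Build a UUID from two longs, as UUID.generate() does in secure mode.
  def nextUuid(prng: RandomGenerator): java.util.UUID =
    new java.util.UUID(prng.nextLong(), prng.nextLong())

  def main(args: Array[String]): Unit = {
    val seed = newSeed()
    // Same seed => same sequence: the seed is secure, the generator deterministic.
    val a = new Well19937c(seed)
    val b = new Well19937c(seed)
    assert(nextUuid(a) == nextUuid(b))
    // A fresh seed yields an unrelated sequence.
    println(nextUuid(new Well19937c(newSeed())))
  }
}
```

One design consequence worth noting: UUIDs built through the two-long `java.util.UUID` constructor carry raw 128-bit values, whereas `java.util.UUID.randomUUID` (the non-secure path in `generate`) additionally sets the RFC 4122 version-4 and variant bits. Both render as 36-character strings, which is all the tests above assert.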