From 19dbcdec83f62b7c1d4c0b57c7251d589827cbe6 Mon Sep 17 00:00:00 2001 From: xiaowu Date: Thu, 28 Apr 2022 15:49:57 +0800 Subject: [PATCH] [Feature][seatunnel-transforms] Add Uuid transform for spark and fix some error at replace doc a a --- docs/en/transform/replace.md | 10 +-- docs/en/transform/uuid.md | 62 ++++++++++++++ seatunnel-core/seatunnel-core-spark/pom.xml | 6 ++ .../seatunnel-transforms-spark/pom.xml | 1 + .../seatunnel-transform-spark-uuid/pom.xml | 55 ++++++++++++ .../seatunnel/spark/transform/Uuid.scala | 84 +++++++++++++++++++ .../seatunnel/spark/transform/TestUuid.scala | 44 ++++++++++ 7 files changed, 257 insertions(+), 5 deletions(-) create mode 100644 docs/en/transform/uuid.md create mode 100644 seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/pom.xml create mode 100644 seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/Uuid.scala create mode 100644 seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUuid.scala diff --git a/docs/en/transform/replace.md b/docs/en/transform/replace.md index 8286007cd516..ecfce6dd578c 100644 --- a/docs/en/transform/replace.md +++ b/docs/en/transform/replace.md @@ -1,4 +1,4 @@ -# Json +# Replace ## Description @@ -33,13 +33,13 @@ The name of the field to replaced. The string to match. -### is_regex [string] +### replacement [string] -Whether or not to interpret the pattern as a regex (true) or string literal (false). +The replacement pattern (is_regex is true) or string literal (is_regex is false). -### replacement [boolean] +### is_regex [boolean] -The replacement pattern (is_regex is true) or string literal (is_regex is false). +Whether or not to interpret the pattern as a regex (true) or string literal (false). ### replace_first [boolean] diff --git a/docs/en/transform/uuid.md b/docs/en/transform/uuid.md new file mode 100644 index 000000000000..1de780c4be9c --- /dev/null +++ b/docs/en/transform/uuid.md @@ -0,0 +1,62 @@ +# Uuid + +## Description + +Generate a universally unique identifier on a specified field. + +:::tip + +This transform **ONLY** supported by Spark. + +::: + +## Options + +| name | type | required | default value | +| -------------- | ------ | -------- | ------------- | +| fields | string | yes | - | +| prefix | string | no | - | +| secure | boolean| no | false | + +### field [string] + +The name of the field to generate. + +### prefix [string] + +The prefix string constant to prepend to each generated UUID. + +### secure [boolean] + +the cryptographically secure algorithm can be comparatively slow +The nonSecure algorithm uses a secure random seed but is otherwise deterministic + +### common options [string] + +Transform plugin common parameters, please refer to [Transform Plugin](common-options.mdx) for details + +## Examples + +```bash + uuid { + fields = "u" + prefix = "uuid-" + secure = true + } +} +``` + +Use `Uuid` as udf in sql. + +```bash + Uuid { + fields = "u" + prefix = "uuid-" + secure = true + } + + # Use the uuid function (confirm that the fake table exists) + sql { + sql = "select * from (select raw_message, uuid() as info_row from fake) t1" + } +``` diff --git a/seatunnel-core/seatunnel-core-spark/pom.xml b/seatunnel-core/seatunnel-core-spark/pom.xml index 53ae179d2852..86410e077e2e 100644 --- a/seatunnel-core/seatunnel-core-spark/pom.xml +++ b/seatunnel-core/seatunnel-core-spark/pom.xml @@ -74,6 +74,12 @@ seatunnel-transform-spark-replace ${project.version} + + + org.apache.seatunnel + seatunnel-transform-spark-uuid + ${project.version} + diff --git a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml b/seatunnel-transforms/seatunnel-transforms-spark/pom.xml index 0626dfc0228d..e95a4ebca6a0 100644 --- a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml +++ b/seatunnel-transforms/seatunnel-transforms-spark/pom.xml @@ -34,6 +34,7 @@ seatunnel-transform-spark-json seatunnel-transform-spark-split seatunnel-transform-spark-replace + seatunnel-transform-spark-uuid seatunnel-transform-spark-sql diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/pom.xml b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/pom.xml new file mode 100644 index 000000000000..f9185ef4a599 --- /dev/null +++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/pom.xml @@ -0,0 +1,55 @@ + + + + + org.apache.seatunnel + seatunnel-transforms-spark + ${revision} + + 4.0.0 + + seatunnel-transform-spark-uuid + + + + org.apache.seatunnel + seatunnel-api-spark + ${project.version} + provided + + + + org.apache.spark + spark-core_${scala.binary.version} + + + + org.apache.spark + spark-sql_${scala.binary.version} + + + + junit + junit + + + diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/Uuid.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/Uuid.scala new file mode 100644 index 000000000000..ee745edf6146 --- /dev/null +++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/Uuid.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.spark.transform + +import java.security.SecureRandom +import java.util.UUID + +import scala.collection.JavaConversions._ + +import com.google.common.annotations.VisibleForTesting +import org.apache.commons.math3.random.{RandomGenerator, Well19937c} +import org.apache.seatunnel.common.Constants +import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists +import org.apache.seatunnel.common.config.CheckResult +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory +import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment} +import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql.expressions.UserDefinedFunction +import org.apache.spark.sql.functions.{col, udf} + +class Uuid extends BaseSparkTransform { + private var prng: RandomGenerator = _ + + override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = { + val key = config.getString("fields") + + val func: UserDefinedFunction = udf(() => { + generate(config.getString("prefix")) + }) + var filterDf = df.withColumn(Constants.ROW_TMP, func()) + filterDf = filterDf.withColumn(key, col(Constants.ROW_TMP)) + val ds = filterDf.drop(Constants.ROW_TMP) + if (func != null) { + env.getSparkSession.udf.register("Uuid", func) + } + ds + } + + override def checkConfig(): CheckResult = { + checkAllExists(config, "fields") + } + + override def prepare(env: SparkEnvironment): Unit = { + val defaultConfig = ConfigFactory.parseMap(Map("prefix" -> "", "secure" -> false)) + config = config.withFallback(defaultConfig) + /** + * The secure algorithm can be comparatively slow. + * The new nonSecure algorithm never blocks and is much faster. + * The nonSecure algorithm uses a secure random seed but is otherwise deterministic, + * though it is one of the strongest uniform pseudo-random number generators known so far. + * thanks for whoschek@cloudera.com + */ + if (config.getBoolean("secure")) { + val rand = new SecureRandom // secure & slow + val seed = for (_ <- 0 until 728) yield rand.nextInt + prng = new Well19937c(seed.toArray) // non-secure & fast + } + } + + @VisibleForTesting + def generate(prefix: String): String = { + val uuid = if (prng == null) UUID.randomUUID else new UUID(prng.nextLong, prng.nextLong) + prefix + uuid + } + + @VisibleForTesting + def setPrng(prng: RandomGenerator) { + this.prng = prng + } +} diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUuid.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUuid.scala new file mode 100644 index 000000000000..3c414fd3fe3e --- /dev/null +++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUuid.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.spark.transform + +import java.security.SecureRandom + +import junit.framework.TestCase.assertEquals +import org.apache.commons.math3.random.Well19937c +import org.junit.Test + +class TestUuid { + @Test + def testUuid() { + val uuid = new Uuid + assertEquals(36, uuid.generate("").length) + assertEquals(37, uuid.generate("u").length) + } + + @Test + def testSecureUuid() { + val rand = new SecureRandom + val seed = for (_ <- 0 until 728) yield rand.nextInt + val prng = new Well19937c(seed.toArray) + + val uuid = new Uuid + uuid.setPrng(prng) + assertEquals(36, uuid.generate("").length) + assertEquals(37, uuid.generate("u").length) + } +}