From 96b044b47c965db4d4fa73e7ce8850e44032e6e6 Mon Sep 17 00:00:00 2001 From: xiaowu Date: Thu, 28 Apr 2022 23:10:15 +0800 Subject: [PATCH] Extract magic string to constant --- docs/en/transform/uuid.md | 10 ++-- .../constants/TransformConfigConstants.java | 49 +++++++++++++++++++ .../seatunnel/spark/transform/Json.scala | 20 ++++---- .../seatunnel/spark/transform/Replace.scala | 34 +++++++------ .../seatunnel/spark/transform/Split.scala | 20 ++++---- .../seatunnel/spark/transform/Sql.scala | 1 + ....apache.seatunnel.spark.BaseSparkTransform | 18 +++++++ .../transform/{Uuid.scala => UUID.scala} | 22 +++++---- .../{TestUuid.scala => TestUUID.scala} | 17 ++++--- 9 files changed, 136 insertions(+), 55 deletions(-) create mode 100644 seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/TransformConfigConstants.java create mode 100644 seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform rename seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/{Uuid.scala => UUID.scala} (82%) rename seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/{TestUuid.scala => TestUUID.scala} (81%) diff --git a/docs/en/transform/uuid.md b/docs/en/transform/uuid.md index 1de780c4be9c..4633a843be6b 100644 --- a/docs/en/transform/uuid.md +++ b/docs/en/transform/uuid.md @@ -1,4 +1,4 @@ -# Uuid +# UUID ## Description @@ -38,7 +38,7 @@ Transform plugin common parameters, please refer to [Transform Plugin](common-op ## Examples ```bash - uuid { + UUID { fields = "u" prefix = "uuid-" secure = true @@ -46,10 +46,10 @@ Transform plugin common parameters, please refer to [Transform Plugin](common-op } ``` -Use `Uuid` as udf in sql. +Use `UUID` as udf in sql. ```bash - Uuid { + UUID { fields = "u" prefix = "uuid-" secure = true @@ -57,6 +57,6 @@ Use `Uuid` as udf in sql. # Use the uuid function (confirm that the fake table exists) sql { - sql = "select * from (select raw_message, uuid() as info_row from fake) t1" + sql = "select * from (select raw_message, UUID() as info_row from fake) t1" } ``` diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/TransformConfigConstants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/TransformConfigConstants.java new file mode 100644 index 000000000000..0373dac1fdc3 --- /dev/null +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/TransformConfigConstants.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.common.constants; + +public final class TransformConfigConstants { + /* common transform constant */ + public static final String FIELDS = "fields"; + public static final String SOURCE_FILED = "source_field"; + public static final String DEFAULT_SOURCE_FILED = "raw_message"; + public static final String TARGET_FILED = "target_field"; + + /* json transform constant */ + public static final String SCHEMA_DIR = "schema_dir"; + public static final String SCHEMA_FILE = "schema_file"; + public static final String DEFAULT_SCHEMA_FILE = ""; + + /* replace transform constant */ + public static final String PATTERN = "pattern"; + public static final String REPLACEMENT = "replacement"; + public static final String REPLACE_REGEX = "is_regex"; + public static final boolean DEFAULT_REPLACE_REGEX = false; + public static final String REPLACE_FIRST = "replace_first"; + public static final boolean DEFAULT_REPLACE_FIRST = false; + + /* split transform constant */ + public static final String SPLIT_SEPARATOR = "separator"; + public static final String DEFAULT_SPLIT_SEPARATOR = " "; + + /* UUID transform constant */ + public static final String UUID_PREFIX = "prefix"; + public static final String DEFAULT_UUID_PREFIX = ""; + public static final String UUID_SECURE = "secure"; + public static final boolean DEFAULT_UUID_SECURE = false; +} diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala index 519be92b0bc5..b0ac2e08db45 100644 --- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala +++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala @@ -14,9 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.seatunnel.spark.transform import org.apache.seatunnel.common.config.{Common, ConfigRuntimeException} +import org.apache.seatunnel.common.constants.TransformConfigConstants._ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment} import org.apache.spark.sql.functions._ @@ -40,19 +42,19 @@ class Json extends BaseSparkTransform { var useCustomSchema: Boolean = false override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = { - val srcField = config.getString("source_field") + val srcField = config.getString(SOURCE_FILED) val spark = env.getSparkSession import spark.implicits._ - config.getString("target_field") match { + config.getString(TARGET_FILED) match { case Constants.ROW_ROOT => { val jsonRDD = df.select(srcField).as[String].rdd val newDF = srcField match { // for backward-compatibility for spark < 2.2.0, we created rdd, not Dataset[String] - case "raw_message" => { + case DEFAULT_SOURCE_FILED => { val tmpDF = if (this.useCustomSchema) { spark.read.schema(this.customSchema).json(jsonRDD) @@ -94,14 +96,14 @@ class Json extends BaseSparkTransform { override def prepare(env: SparkEnvironment): Unit = { val defaultConfig = ConfigFactory.parseMap( Map( - "source_field" -> "raw_message", - "target_field" -> Constants.ROW_ROOT, - "schema_dir" -> Paths.get(Common.pluginFilesDir("json").toString, "schemas").toString, - "schema_file" -> "")) + SOURCE_FILED -> DEFAULT_SOURCE_FILED, + TARGET_FILED -> Constants.ROW_ROOT, + SCHEMA_DIR -> Paths.get(Common.pluginFilesDir("json").toString, "schemas").toString, + SCHEMA_FILE -> DEFAULT_SCHEMA_FILE)) config = config.withFallback(defaultConfig) - val schemaFile = config.getString("schema_file") + val schemaFile = config.getString(SCHEMA_FILE) if (schemaFile.trim != "") { - parseCustomJsonSchema(env.getSparkSession, config.getString("schema_dir"), schemaFile) + parseCustomJsonSchema(env.getSparkSession, config.getString(SCHEMA_DIR), schemaFile) } } diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala index 609dec15d085..b1d7ab0a29a3 100644 --- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala +++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.seatunnel.spark.transform import scala.collection.JavaConversions._ @@ -24,6 +25,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.seatunnel.common.Constants import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists import org.apache.seatunnel.common.config.CheckResult +import org.apache.seatunnel.common.constants.TransformConfigConstants._ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment} import org.apache.spark.sql.{Dataset, Row} @@ -32,16 +34,16 @@ import org.apache.spark.sql.functions.{col, udf} class Replace extends BaseSparkTransform { override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = { - val srcField = config.getString("source_field") - val key = config.getString("fields") + val srcField = config.getString(SOURCE_FILED) + val key = config.getString(FIELDS) val func: UserDefinedFunction = udf((s: String) => { replace( s, - config.getString("pattern"), - config.getString("replacement"), - config.getBoolean("is_regex"), - config.getBoolean("replace_first")) + config.getString(PATTERN), + config.getString(REPLACEMENT), + config.getBoolean(REPLACE_REGEX), + config.getBoolean(REPLACE_FIRST)) }) var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField))) filterDf = filterDf.withColumn(key, col(Constants.ROW_TMP)) @@ -53,25 +55,25 @@ class Replace extends BaseSparkTransform { } override def checkConfig(): CheckResult = { - checkAllExists(config, "fields", "pattern", "replacement") + checkAllExists(config, FIELDS, PATTERN, REPLACEMENT) } override def prepare(env: SparkEnvironment): Unit = { val defaultConfig = ConfigFactory.parseMap( Map( - "source_field" -> "raw_message", - "is_regex" -> false, - "replace_first" -> false)) + SOURCE_FILED -> DEFAULT_SOURCE_FILED, + REPLACE_REGEX -> DEFAULT_REPLACE_REGEX, + REPLACE_FIRST -> DEFAULT_REPLACE_FIRST)) config = config.withFallback(defaultConfig) } @VisibleForTesting def replace( - str: String, - pattern: String, - replacement: String, - isRegex: Boolean, - replaceFirst: Boolean): String = { + str: String, + pattern: String, + replacement: String, + isRegex: Boolean, + replaceFirst: Boolean): String = { if (isRegex) { if (replaceFirst) pattern.replaceFirstIn(str, replacement) @@ -83,4 +85,6 @@ class Replace extends BaseSparkTransform { } implicit def toReg(pattern: String): Regex = pattern.r + + override def getPluginName: String = "replace" } diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala index 498776c03e12..d396b9d2d476 100644 --- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala +++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.seatunnel.spark.transform import scala.collection.JavaConversions._ @@ -21,6 +22,7 @@ import scala.collection.JavaConversions._ import org.apache.seatunnel.common.Constants import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists import org.apache.seatunnel.common.config.CheckResult +import org.apache.seatunnel.common.constants.TransformConfigConstants._ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment} import org.apache.spark.sql.{Dataset, Row} @@ -30,15 +32,15 @@ import org.apache.spark.sql.functions.{col, udf} class Split extends BaseSparkTransform { override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = { - val srcField = config.getString("source_field") - val keys = config.getStringList("fields") + val srcField = config.getString(SOURCE_FILED) + val keys = config.getStringList(FIELDS) // https://stackoverflow.com/a/33345698/1145750 var func: UserDefinedFunction = null - val ds = config.getString("target_field") match { + val ds = config.getString(TARGET_FILED) match { case Constants.ROW_ROOT => func = udf((s: String) => { - split(s, config.getString("separator"), keys.size()) + split(s, config.getString(SPLIT_SEPARATOR), keys.size()) }) var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField))) for (i <- 0 until keys.size()) { @@ -47,7 +49,7 @@ class Split extends BaseSparkTransform { filterDf.drop(Constants.ROW_TMP) case targetField: String => func = udf((s: String) => { - val values = split(s, config.getString("separator"), keys.size) + val values = split(s, config.getString(SPLIT_SEPARATOR), keys.size) val kvs = (keys zip values).toMap kvs }) @@ -60,15 +62,15 @@ class Split extends BaseSparkTransform { } override def checkConfig(): CheckResult = { - checkAllExists(config, "fields") + checkAllExists(config, FIELDS) } override def prepare(env: SparkEnvironment): Unit = { val defaultConfig = ConfigFactory.parseMap( Map( - "separator" -> " ", - "source_field" -> "raw_message", - "target_field" -> Constants.ROW_ROOT)) + SPLIT_SEPARATOR -> DEFAULT_SPLIT_SEPARATOR, + SOURCE_FILED -> DEFAULT_SOURCE_FILED, + TARGET_FILED -> Constants.ROW_ROOT)) config = config.withFallback(defaultConfig) } diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala index 2881b1a29923..b15a4dd5fcea 100644 --- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala +++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.seatunnel.spark.transform import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform new file mode 100644 index 000000000000..281a724294d7 --- /dev/null +++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.seatunnel.spark.transform.UUID diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/Uuid.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUID.scala similarity index 82% rename from seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/Uuid.scala rename to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUID.scala index 423c6ed2b399..6f188b618eab 100644 --- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/Uuid.scala +++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUID.scala @@ -14,10 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.seatunnel.spark.transform import java.security.SecureRandom -import java.util.UUID import scala.collection.JavaConversions._ @@ -26,20 +26,21 @@ import org.apache.commons.math3.random.{RandomGenerator, Well19937c} import org.apache.seatunnel.common.Constants import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists import org.apache.seatunnel.common.config.CheckResult +import org.apache.seatunnel.common.constants.TransformConfigConstants._ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment} import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.functions.{col, udf} -class Uuid extends BaseSparkTransform { +class UUID extends BaseSparkTransform { private var prng: RandomGenerator = _ override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = { - val key = config.getString("fields") + val key = config.getString(FIELDS) val func: UserDefinedFunction = udf(() => { - generate(config.getString("prefix")) + generate(config.getString(UUID_PREFIX)) }) var filterDf = df.withColumn(Constants.ROW_TMP, func()) filterDf = filterDf.withColumn(key, col(Constants.ROW_TMP)) @@ -51,11 +52,11 @@ class Uuid extends BaseSparkTransform { } override def checkConfig(): CheckResult = { - checkAllExists(config, "fields") + checkAllExists(config, FIELDS) } override def prepare(env: SparkEnvironment): Unit = { - val defaultConfig = ConfigFactory.parseMap(Map("prefix" -> "", "secure" -> false)) + val defaultConfig = ConfigFactory.parseMap(Map(UUID_PREFIX -> DEFAULT_UUID_PREFIX, UUID_SECURE -> DEFAULT_UUID_SECURE)) config = config.withFallback(defaultConfig) /** @@ -65,7 +66,7 @@ class Uuid extends BaseSparkTransform { * though it is one of the strongest uniform pseudo-random number generators known so far. * thanks for whoschek@cloudera.com */ - if (config.getBoolean("secure")) { + if (config.getBoolean(UUID_SECURE)) { val rand = new SecureRandom val seed = for (_ <- 0 until 728) yield rand.nextInt prng = new Well19937c(seed.toArray) @@ -74,12 +75,15 @@ class Uuid extends BaseSparkTransform { @VisibleForTesting def generate(prefix: String): String = { - val uuid = if (prng == null) UUID.randomUUID else new UUID(prng.nextLong, prng.nextLong) - prefix + uuid + val UUID = if (prng == null) java.util.UUID.randomUUID else new java.util.UUID(prng.nextLong, prng.nextLong) + prefix + UUID } + // Only used for test @VisibleForTesting def setPrng(prng: RandomGenerator): Unit = { this.prng = prng } + + override def getPluginName: String = "UUID" } diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUuid.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUUID.scala similarity index 81% rename from seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUuid.scala rename to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUUID.scala index 3c414fd3fe3e..7d3751187540 100644 --- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUuid.scala +++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUUID.scala @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.seatunnel.spark.transform import java.security.SecureRandom @@ -22,12 +23,12 @@ import junit.framework.TestCase.assertEquals import org.apache.commons.math3.random.Well19937c import org.junit.Test -class TestUuid { +class TestUUID { @Test def testUuid() { - val uuid = new Uuid - assertEquals(36, uuid.generate("").length) - assertEquals(37, uuid.generate("u").length) + val UUID = new UUID + assertEquals(36, UUID.generate("").length) + assertEquals(37, UUID.generate("u").length) } @Test @@ -36,9 +37,9 @@ class TestUuid { val seed = for (_ <- 0 until 728) yield rand.nextInt val prng = new Well19937c(seed.toArray) - val uuid = new Uuid - uuid.setPrng(prng) - assertEquals(36, uuid.generate("").length) - assertEquals(37, uuid.generate("u").length) + val UUID = new UUID + UUID.setPrng(prng) + assertEquals(36, UUID.generate("").length) + assertEquals(37, UUID.generate("u").length) } }