Extract magic string to constant
xiaowu committed Apr 28, 2022
1 parent d0aa789 commit 96b044b
Showing 9 changed files with 136 additions and 55 deletions.
10 changes: 5 additions & 5 deletions docs/en/transform/uuid.md
@@ -1,4 +1,4 @@
-# Uuid
+# UUID

## Description

@@ -38,25 +38,25 @@ Transform plugin common parameters, please refer to [Transform Plugin](common-op
## Examples

```bash
-uuid {
+UUID {
fields = "u"
prefix = "uuid-"
secure = true
}
}
```

-Use `Uuid` as udf in sql.
+Use `UUID` as udf in sql.

```bash
-Uuid {
+UUID {
fields = "u"
prefix = "uuid-"
secure = true
}

# Use the uuid function (confirm that the fake table exists)
sql {
sql = "select * from (select raw_message, uuid() as info_row from fake) t1"
sql = "select * from (select raw_message, UUID() as info_row from fake) t1"
}
```
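For context on the SQL example: `UUID()` is only callable there because the transform registers it as a Spark UDF. A hypothetical sketch of an equivalent registration, assuming a `SparkSession` named `spark` (this illustrates the mechanism, not the SeaTunnel source):

```scala
// Illustration only: register a zero-argument UDF so SQL can call UUID().
spark.udf.register("UUID", () => "uuid-" + java.util.UUID.randomUUID.toString)
spark.sql("select raw_message, UUID() as info_row from fake").show()
```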
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.common.constants;

public final class TransformConfigConstants {
/* common transform constant */
public static final String FIELDS = "fields";
public static final String SOURCE_FILED = "source_field";
public static final String DEFAULT_SOURCE_FILED = "raw_message";
public static final String TARGET_FILED = "target_field";

/* json transform constant */
public static final String SCHEMA_DIR = "schema_dir";
public static final String SCHEMA_FILE = "schema_file";
public static final String DEFAULT_SCHEMA_FILE = "";

/* replace transform constant */
public static final String PATTERN = "pattern";
public static final String REPLACEMENT = "replacement";
public static final String REPLACE_REGEX = "is_regex";
public static final boolean DEFAULT_REPLACE_REGEX = false;
public static final String REPLACE_FIRST = "replace_first";
public static final boolean DEFAULT_REPLACE_FIRST = false;

/* split transform constant */
public static final String SPLIT_SEPARATOR = "separator";
public static final String DEFAULT_SPLIT_SEPARATOR = " ";

/* UUID transform constant */
public static final String UUID_PREFIX = "prefix";
public static final String DEFAULT_UUID_PREFIX = "";
public static final String UUID_SECURE = "secure";
public static final boolean DEFAULT_UUID_SECURE = false;
}
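A minimal sketch of how a transform consumes these constants together with typesafe-config defaults, mirroring the `prepare()` pattern in the Scala files below (`withDefaults` and its argument are illustrative names, not part of this commit):

```scala
import scala.collection.JavaConversions._

import org.apache.seatunnel.common.constants.TransformConfigConstants._
import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}

// Defaults are keyed by the shared constants; withFallback keeps any
// user-supplied values and fills in the rest from the defaults.
def withDefaults(userConfig: Config): Config = {
  val defaults = ConfigFactory.parseMap(
    Map(
      SOURCE_FILED -> DEFAULT_SOURCE_FILED, // "raw_message"
      UUID_PREFIX -> DEFAULT_UUID_PREFIX))  // ""
  userConfig.withFallback(defaults)
}
```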
@@ -14,9 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.spark.transform

import org.apache.seatunnel.common.config.{Common, ConfigRuntimeException}
+import org.apache.seatunnel.common.constants.TransformConfigConstants._
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
import org.apache.spark.sql.functions._
@@ -40,19 +42,19 @@ class Json extends BaseSparkTransform {
var useCustomSchema: Boolean = false

override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
val srcField = config.getString("source_field")
val srcField = config.getString(SOURCE_FILED)
val spark = env.getSparkSession

import spark.implicits._

config.getString("target_field") match {
config.getString(TARGET_FILED) match {
case Constants.ROW_ROOT => {

val jsonRDD = df.select(srcField).as[String].rdd

val newDF = srcField match {
// for backward-compatibility for spark < 2.2.0, we created rdd, not Dataset[String]
case "raw_message" => {
case DEFAULT_SOURCE_FILED => {
val tmpDF =
if (this.useCustomSchema) {
spark.read.schema(this.customSchema).json(jsonRDD)
@@ -94,14 +96,14 @@
override def prepare(env: SparkEnvironment): Unit = {
val defaultConfig = ConfigFactory.parseMap(
Map(
"source_field" -> "raw_message",
"target_field" -> Constants.ROW_ROOT,
"schema_dir" -> Paths.get(Common.pluginFilesDir("json").toString, "schemas").toString,
"schema_file" -> ""))
SOURCE_FILED -> DEFAULT_SOURCE_FILED,
TARGET_FILED -> Constants.ROW_ROOT,
SCHEMA_DIR -> Paths.get(Common.pluginFilesDir("json").toString, "schemas").toString,
SCHEMA_FILE -> DEFAULT_SCHEMA_FILE))
config = config.withFallback(defaultConfig)
-val schemaFile = config.getString("schema_file")
+val schemaFile = config.getString(SCHEMA_FILE)
if (schemaFile.trim != "") {
-parseCustomJsonSchema(env.getSparkSession, config.getString("schema_dir"), schemaFile)
+parseCustomJsonSchema(env.getSparkSession, config.getString(SCHEMA_DIR), schemaFile)
}
}

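A condensed sketch of the two parsing paths in `Json.process` above, assuming `spark`, `jsonRDD`, `customSchema`, and `useCustomSchema` are in scope as in the class:

```scala
// A non-empty schema_file yields a pre-parsed custom schema, which
// short-circuits Spark's schema inference over the JSON strings.
val parsed =
  if (useCustomSchema) spark.read.schema(customSchema).json(jsonRDD)
  else spark.read.json(jsonRDD)
```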
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.spark.transform

import scala.collection.JavaConversions._
@@ -24,6 +25,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.common.constants.TransformConfigConstants._
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
import org.apache.spark.sql.{Dataset, Row}
@@ -32,16 +34,16 @@ import org.apache.spark.sql.functions.{col, udf}

class Replace extends BaseSparkTransform {
override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
val srcField = config.getString("source_field")
val key = config.getString("fields")
val srcField = config.getString(SOURCE_FILED)
val key = config.getString(FIELDS)

val func: UserDefinedFunction = udf((s: String) => {
replace(
s,
config.getString("pattern"),
config.getString("replacement"),
config.getBoolean("is_regex"),
config.getBoolean("replace_first"))
config.getString(PATTERN),
config.getString(REPLACEMENT),
config.getBoolean(REPLACE_REGEX),
config.getBoolean(REPLACE_FIRST))
})
var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField)))
filterDf = filterDf.withColumn(key, col(Constants.ROW_TMP))
@@ -53,25 +55,25 @@
}

override def checkConfig(): CheckResult = {
checkAllExists(config, "fields", "pattern", "replacement")
checkAllExists(config, FIELDS, PATTERN, REPLACEMENT)
}

override def prepare(env: SparkEnvironment): Unit = {
val defaultConfig = ConfigFactory.parseMap(
Map(
"source_field" -> "raw_message",
"is_regex" -> false,
"replace_first" -> false))
SOURCE_FILED -> DEFAULT_SOURCE_FILED,
REPLACE_REGEX -> DEFAULT_REPLACE_REGEX,
REPLACE_FIRST -> DEFAULT_REPLACE_FIRST))
config = config.withFallback(defaultConfig)
}

@VisibleForTesting
def replace(
str: String,
pattern: String,
replacement: String,
isRegex: Boolean,
replaceFirst: Boolean): String = {

if (isRegex) {
if (replaceFirst) pattern.replaceFirstIn(str, replacement)
@@ -83,4 +85,6 @@
}

implicit def toReg(pattern: String): Regex = pattern.r

+override def getPluginName: String = "replace"
}
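A standalone, REPL-runnable check of the `replace` semantics. The regex branch matches the code above; the non-regex branch is an assumption (literal matching), since that part of the method is collapsed in this diff:

```scala
import scala.util.matching.Regex

def replace(str: String, pattern: String, replacement: String,
            isRegex: Boolean, replaceFirst: Boolean): String =
  if (isRegex) {
    val r = pattern.r
    if (replaceFirst) r.replaceFirstIn(str, replacement)
    else r.replaceAllIn(str, replacement)
  } else {
    // Assumed literal behavior: quote the pattern so "." means a dot.
    if (replaceFirst) str.replaceFirst(Regex.quote(pattern), Regex.quoteReplacement(replacement))
    else str.replace(pattern, replacement)
  }

replace("a.b.c", ".", "-", isRegex = false, replaceFirst = false) // "a-b-c"
replace("a.b.c", ".", "-", isRegex = true, replaceFirst = true)   // "-.b.c"
```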
@@ -14,13 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.spark.transform

import scala.collection.JavaConversions._

import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.common.constants.TransformConfigConstants._
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
import org.apache.spark.sql.{Dataset, Row}
@@ -30,15 +32,15 @@ import org.apache.spark.sql.functions.{col, udf}
class Split extends BaseSparkTransform {

override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
val srcField = config.getString("source_field")
val keys = config.getStringList("fields")
val srcField = config.getString(SOURCE_FILED)
val keys = config.getStringList(FIELDS)

// https://stackoverflow.com/a/33345698/1145750
var func: UserDefinedFunction = null
val ds = config.getString("target_field") match {
val ds = config.getString(TARGET_FILED) match {
case Constants.ROW_ROOT =>
func = udf((s: String) => {
split(s, config.getString("separator"), keys.size())
split(s, config.getString(SPLIT_SEPARATOR), keys.size())
})
var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField)))
for (i <- 0 until keys.size()) {
@@ -47,7 +49,7 @@
filterDf.drop(Constants.ROW_TMP)
case targetField: String =>
func = udf((s: String) => {
-val values = split(s, config.getString("separator"), keys.size)
+val values = split(s, config.getString(SPLIT_SEPARATOR), keys.size)
val kvs = (keys zip values).toMap
kvs
})
@@ -60,15 +62,15 @@
}

override def checkConfig(): CheckResult = {
checkAllExists(config, "fields")
checkAllExists(config, FIELDS)
}

override def prepare(env: SparkEnvironment): Unit = {
val defaultConfig = ConfigFactory.parseMap(
Map(
"separator" -> " ",
"source_field" -> "raw_message",
"target_field" -> Constants.ROW_ROOT))
SPLIT_SEPARATOR -> DEFAULT_SPLIT_SEPARATOR,
SOURCE_FILED -> DEFAULT_SOURCE_FILED,
TARGET_FILED -> Constants.ROW_ROOT))
config = config.withFallback(defaultConfig)
}

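The `split` helper itself is collapsed in this diff; a minimal sketch under the assumption that it splits on the separator and pads with empty strings so one value exists per configured field:

```scala
// Assumed behavior: at most `limit` parts (the separator is a regex,
// as with java.lang.String#split), padded to exactly `limit` values.
def split(str: String, separator: String, limit: Int): Seq[String] = {
  val parts = str.split(separator, limit).toSeq
  parts ++ Seq.fill(limit - parts.size)("")
}

split("hello world", " ", 3) // Seq("hello", "world", "")
```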
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.spark.transform

import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.seatunnel.spark.transform.UUID
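This service entry is what lets the renamed class be discovered at runtime. A sketch of the `ServiceLoader` lookup such a registration typically serves, assuming plugins are loaded against `BaseSparkTransform` (the lookup code is illustrative, not part of this commit):

```scala
import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.seatunnel.spark.BaseSparkTransform

// Each line of the services file names one implementation to instantiate.
val transforms = ServiceLoader.load(classOf[BaseSparkTransform]).asScala
transforms.foreach(t => println(t.getPluginName)) // e.g. "UUID", "replace"
```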
@@ -14,10 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.spark.transform

import java.security.SecureRandom
-import java.util.UUID

import scala.collection.JavaConversions._

@@ -26,20 +26,21 @@ import org.apache.commons.math3.random.{RandomGenerator, Well19937c}
import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.common.constants.TransformConfigConstants._
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

-class Uuid extends BaseSparkTransform {
+class UUID extends BaseSparkTransform {
private var prng: RandomGenerator = _

override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
val key = config.getString("fields")
val key = config.getString(FIELDS)

val func: UserDefinedFunction = udf(() => {
generate(config.getString("prefix"))
generate(config.getString(UUID_PREFIX))
})
var filterDf = df.withColumn(Constants.ROW_TMP, func())
filterDf = filterDf.withColumn(key, col(Constants.ROW_TMP))
@@ -51,11 +52,11 @@
}

override def checkConfig(): CheckResult = {
checkAllExists(config, "fields")
checkAllExists(config, FIELDS)
}

override def prepare(env: SparkEnvironment): Unit = {
val defaultConfig = ConfigFactory.parseMap(Map("prefix" -> "", "secure" -> false))
val defaultConfig = ConfigFactory.parseMap(Map(UUID_PREFIX -> DEFAULT_UUID_PREFIX, UUID_SECURE -> DEFAULT_UUID_SECURE))
config = config.withFallback(defaultConfig)

/**
@@ -65,7 +66,7 @@
* though it is one of the strongest uniform pseudo-random number generators known so far.
* thanks for whoschek@cloudera.com
*/
if (config.getBoolean("secure")) {
if (config.getBoolean(UUID_SECURE)) {
val rand = new SecureRandom
val seed = for (_ <- 0 until 728) yield rand.nextInt
prng = new Well19937c(seed.toArray)
@@ -74,12 +75,15 @@

@VisibleForTesting
def generate(prefix: String): String = {
-val uuid = if (prng == null) UUID.randomUUID else new UUID(prng.nextLong, prng.nextLong)
-prefix + uuid
+val UUID = if (prng == null) java.util.UUID.randomUUID else new java.util.UUID(prng.nextLong, prng.nextLong)
+prefix + UUID
}

// Only used for test
@VisibleForTesting
def setPrng(prng: RandomGenerator): Unit = {
this.prng = prng
}

+override def getPluginName: String = "UUID"
}
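With `setPrng` exposed for tests, output can be made reproducible. A minimal sketch, where `UUID` is the transform class above (not `java.util.UUID`) and the seed value is arbitrary:

```scala
import org.apache.commons.math3.random.Well19937c

val transform = new UUID()             // the SeaTunnel transform class
transform.setPrng(new Well19937c(42))  // fixed seed => deterministic ids
val id = transform.generate("uuid-")   // same value on every run with seed 42
```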