From 53c7d16080cfd1b00a7a738d89fb9fd33e0b8640 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Sun, 24 Dec 2023 14:36:07 -0800
Subject: [PATCH] [SPARK-46478][SQL] Revert SPARK-43049 to use Oracle
 varchar(255) for string

### What changes were proposed in this pull request?

Revert SPARK-43049 so that Spark maps StringType to Oracle VARCHAR(255) instead of CLOB, for performance reasons.

### Why are the changes needed?

For performance: in Oracle, VARCHAR2 columns are significantly cheaper to read and write than CLOB columns.

### Does this PR introduce _any_ user-facing change?

Yes. When storing strings in an Oracle table whose string columns were defined by Spark DDL, users will now get an error if a string value exceeds 255 characters:

```java
org.apache.spark.SparkRuntimeException: [EXCEED_LIMIT_LENGTH] Exceeds char/varchar type length limitation: 255. SQLSTATE: 54006
[info]   at org.apache.spark.sql.errors.QueryExecutionErrors$.exceedMaxLimit(QueryExecutionErrors.scala:2512)
```

### How was this patch tested?

Revised unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44452

Closes #44442 from yaooqinn/SPARK-46478.

Authored-by: Kent Yao
Signed-off-by: Dongjoon Hyun
---
 .../sql/jdbc/OracleIntegrationSuite.scala     |  3 +--
 .../sql/jdbc/v2/OracleIntegrationSuite.scala  | 23 ++++++++++++++-----
 .../apache/spark/sql/jdbc/v2/V2JDBCTest.scala |  2 +-
 .../sql/catalyst/util/CharVarcharUtils.scala  |  3 ++-
 .../apache/spark/sql/jdbc/OracleDialect.scala |  2 +-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala |  4 ++--
 6 files changed, 24 insertions(+), 13 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index b7f2da4d83c01..4e13d21864fef 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -175,8 +175,7 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark
 
   }
 
-  // SPARK-43049: Use CLOB instead of VARCHAR(255) for StringType for Oracle jdbc-am""
-  test("SPARK-12941: String datatypes to be mapped to CLOB in Oracle") {
+  test("SPARK-12941: String datatypes to be mapped to VARCHAR(255) in Oracle") {
     // create a sample dataframe with string type
     val df1 = sparkContext.parallelize(Seq(("foo"))).toDF("x")
     // write the dataframe to the oracle table tbl
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
index 5584a56e51d98..0c844219aeb5f 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
@@ -22,8 +22,9 @@ import java.util.Locale
 
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkRuntimeException}
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils.CHAR_VARCHAR_TYPE_STRING_METADATA_KEY
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.jdbc.DatabaseOnDocker
 import org.apache.spark.sql.types._
@@ -88,6 +89,11 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTes
s"jdbc:oracle:thin:system/$oracle_password@//$ip:$port/freepdb1" } + override val defaultMetadata: Metadata = new MetadataBuilder() + .putLong("scale", 0) + .putString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY, "varchar(255)") + .build() + override def sparkConf: SparkConf = super.sparkConf .set("spark.sql.catalog.oracle", classOf[JDBCTableCatalog].getName) .set("spark.sql.catalog.oracle.url", db.getJdbcUrl(dockerIp, externalPort)) @@ -106,11 +112,11 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTes override def testUpdateColumnType(tbl: String): Unit = { sql(s"CREATE TABLE $tbl (ID INTEGER)") var t = spark.table(tbl) - var expectedSchema = new StructType().add("ID", DecimalType(10, 0), true, defaultMetadata) + var expectedSchema = new StructType().add("ID", DecimalType(10, 0), true, super.defaultMetadata) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE LONG") t = spark.table(tbl) - expectedSchema = new StructType().add("ID", DecimalType(19, 0), true, defaultMetadata) + expectedSchema = new StructType().add("ID", DecimalType(19, 0), true, super.defaultMetadata) assert(t.schema === expectedSchema) // Update column type from LONG to INTEGER val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER" @@ -131,12 +137,17 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTes override def caseConvert(tableName: String): String = tableName.toUpperCase(Locale.ROOT) - test("SPARK-43049: Use CLOB instead of VARCHAR(255) for StringType for Oracle JDBC") { + test("SPARK-46478: Revert SPARK-43049 to use varchar(255) for string") { val tableName = catalogName + ".t1" withTable(tableName) { sql(s"CREATE TABLE $tableName(c1 string)") - sql(s"INSERT INTO $tableName SELECT rpad('hi', 256, 'spark')") - assert(sql(s"SELECT char_length(c1) from $tableName").head().get(0) === 256) + checkError( + exception = intercept[SparkRuntimeException] { + sql(s"INSERT INTO $tableName SELECT rpad('hi', 256, 'spark')") + }, + errorClass = "EXCEED_LIMIT_LENGTH", + parameters = Map("limit" -> "255") + ) } } } diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index 76277dbc96b61..b93106b0ce789 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -49,7 +49,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu def notSupportsTableComment: Boolean = false - val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build() + def defaultMetadata: Metadata = new MetadataBuilder().putLong("scale", 0).build() def testUpdateColumnNullability(tbl: String): Unit = { sql(s"CREATE TABLE $catalogName.alt_table (ID STRING NOT NULL)") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala index 192d812cc7aaf..0e7bffcc3f956 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala @@ -29,7 +29,8 @@ import org.apache.spark.util.ArrayImplicits._ object CharVarcharUtils extends Logging with SparkCharVarcharUtils { - private val 
+  // visible for testing
+  private[sql] val CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING"
 
   /**
    * Replaces CharType/VarcharType with StringType recursively in the given struct type. If a
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index bcc8bc44d2f1a..b6c98eedc16da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -118,7 +118,7 @@ private case object OracleDialect extends JdbcDialect {
     case DoubleType => Some(JdbcType("NUMBER(19, 4)", java.sql.Types.DOUBLE))
     case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.SMALLINT))
     case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.SMALLINT))
-    case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB))
+    case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
     case _ => None
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 262bd488d1bbf..ffecccd50cf18 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1277,7 +1277,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
   test("SPARK 12941: The data type mapping for StringType to Oracle") {
     val oracleDialect = JdbcDialects.get("jdbc:oracle://127.0.0.1/db")
     assert(oracleDialect.getJDBCType(StringType).
-      map(_.databaseTypeDefinition).get == "CLOB")
+      map(_.databaseTypeDefinition).get == "VARCHAR2(255)")
   }
 
   test("SPARK-16625: General data types to be mapped to Oracle") {
@@ -1295,7 +1295,7 @@
     assert(getJdbcType(oracleDialect, DoubleType) == "NUMBER(19, 4)")
     assert(getJdbcType(oracleDialect, ByteType) == "NUMBER(3)")
     assert(getJdbcType(oracleDialect, ShortType) == "NUMBER(5)")
-    assert(getJdbcType(oracleDialect, StringType) == "CLOB")
+    assert(getJdbcType(oracleDialect, StringType) == "VARCHAR2(255)")
     assert(getJdbcType(oracleDialect, BinaryType) == "BLOB")
     assert(getJdbcType(oracleDialect, DateType) == "DATE")
     assert(getJdbcType(oracleDialect, TimestampType) == "TIMESTAMP")
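
To make the user-facing change concrete, here is a minimal sketch mirroring the revised v2 test above; it is not part of the patch. It assumes a `JDBCTableCatalog` has been registered under the name `oracle` (as in the suite's `sparkConf`), and the table name `t1` and column `c1` are hypothetical.

```scala
// Sketch, assuming an Oracle JDBC catalog registered as "oracle";
// table t1 and column c1 are hypothetical names.
spark.sql("CREATE TABLE oracle.t1 (c1 STRING)") // c1 is created as VARCHAR2(255) in Oracle

// Strings up to 255 characters are written as before.
spark.sql("INSERT INTO oracle.t1 SELECT rpad('hi', 255, 'spark')")

// Strings longer than 255 characters now fail instead of landing in a CLOB column:
spark.sql("INSERT INTO oracle.t1 SELECT rpad('hi', 256, 'spark')")
// org.apache.spark.SparkRuntimeException: [EXCEED_LIMIT_LENGTH]
// Exceeds char/varchar type length limitation: 255. SQLSTATE: 54006
```

The length check fires on the Spark side: because `defaultMetadata` now records `varchar(255)` under `CHAR_VARCHAR_TYPE_STRING_METADATA_KEY`, the usual char/varchar length enforcement applies before the row ever reaches Oracle.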