
Commit

Merge branch 'master' into SPARK-50370
panbingkun committed Dec 18, 2024
2 parents 4d0885e + 5ef99bd commit ac8819e
Showing 81 changed files with 7,587 additions and 142 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build_python_connect.yml
@@ -82,7 +82,7 @@ jobs:
sed -i 's/rootLogger.level = info/rootLogger.level = warn/g' conf/log4j2.properties
# Start a Spark Connect server for local
PYTHONPATH="python/lib/pyspark.zip:python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH" ./sbin/start-connect-server.sh \
PYTHONPATH="python/lib/pyspark.zip:python/lib/py4j-0.10.9.8-src.zip:$PYTHONPATH" ./sbin/start-connect-server.sh \
--driver-java-options "-Dlog4j.configurationFile=file:$GITHUB_WORKSPACE/conf/log4j2.properties" \
--jars "`find connector/protobuf/target -name spark-protobuf-*SNAPSHOT.jar`,`find connector/avro/target -name spark-avro*SNAPSHOT.jar`"
@@ -101,7 +101,7 @@ jobs:
mv pyspark.back python/pyspark
# Start a Spark Connect server for local-cluster
PYTHONPATH="python/lib/pyspark.zip:python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH" ./sbin/start-connect-server.sh \
PYTHONPATH="python/lib/pyspark.zip:python/lib/py4j-0.10.9.8-src.zip:$PYTHONPATH" ./sbin/start-connect-server.sh \
--master "local-cluster[2, 4, 1024]" \
--driver-java-options "-Dlog4j.configurationFile=file:$GITHUB_WORKSPACE/conf/log4j2.properties" \
--jars "`find connector/protobuf/target -name spark-protobuf-*SNAPSHOT.jar`,`find connector/avro/target -name spark-avro*SNAPSHOT.jar`"
2 changes: 1 addition & 1 deletion .github/workflows/build_python_connect35.yml
@@ -85,7 +85,7 @@ jobs:
sed -i 's/rootLogger.level = info/rootLogger.level = warn/g' conf/log4j2.properties
# Start a Spark Connect server for local
PYTHONPATH="python/lib/pyspark.zip:python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH" ./sbin/start-connect-server.sh \
PYTHONPATH="python/lib/pyspark.zip:python/lib/py4j-0.10.9.8-src.zip:$PYTHONPATH" ./sbin/start-connect-server.sh \
--driver-java-options "-Dlog4j.configurationFile=file:$GITHUB_WORKSPACE/conf/log4j2.properties" \
--jars "`find connector/protobuf/target -name spark-protobuf-*SNAPSHOT.jar`,`find connector/avro/target -name spark-avro*SNAPSHOT.jar`"
2 changes: 1 addition & 1 deletion bin/pyspark
@@ -77,7 +77,7 @@ fi

# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.8-src.zip:$PYTHONPATH"

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)

set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.9.7-src.zip;%PYTHONPATH%
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.9.8-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
45 changes: 45 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -70,6 +70,12 @@
],
"sqlState" : "42000"
},
"AMBIGUOUS_RESOLVER_EXTENSION" : {
"message" : [
"The single-pass analyzer cannot process this query or command because the extension choice for <operator> is ambiguous: <extensions>."
],
"sqlState" : "XX000"
},
"ARITHMETIC_OVERFLOW" : {
"message" : [
"<message>.<alternative> If necessary set <config> to \"false\" to bypass this error."
@@ -1659,6 +1665,39 @@
],
"sqlState" : "22000"
},
"HYBRID_ANALYZER_EXCEPTION" : {
"message" : [
"An failure occurred when attempting to resolve a query or command with both the legacy fixed-point analyzer as well as the single-pass resolver."
],
"subClass" : {
"FIXED_POINT_FAILED_SINGLE_PASS_SUCCEEDED" : {
"message" : [
"Fixed-point resolution failed, but single-pass resolution succeeded.",
"Single-pass analyzer output:",
"<singlePassOutput>"
]
},
"LOGICAL_PLAN_COMPARISON_MISMATCH" : {
"message" : [
"Outputs of fixed-point and single-pass analyzers do not match.",
"Fixed-point analyzer output:",
"<fixedPointOutput>",
"Single-pass analyzer output:",
"<singlePassOutput>"
]
},
"OUTPUT_SCHEMA_COMPARISON_MISMATCH" : {
"message" : [
"Output schemas of fixed-point and single-pass analyzers do not match.",
"Fixed-point analyzer output schema:",
"<fixedPointOutputSchema>",
"Single-pass analyzer output schema:",
"<singlePassOutputSchema>"
]
}
},
"sqlState" : "XX000"
},
"IDENTIFIER_TOO_MANY_NAME_PARTS" : {
"message" : [
"<identifier> is not a valid identifier as it has more than 2 name parts."
@@ -5659,6 +5698,12 @@
},
"sqlState" : "0A000"
},
"UNSUPPORTED_SINGLE_PASS_ANALYZER_FEATURE" : {
"message" : [
"The single-pass analyzer cannot process this query or command because it does not yet support <feature>."
],
"sqlState" : "0A000"
},
"UNSUPPORTED_STREAMING_OPERATOR_WITHOUT_WATERMARK" : {
"message" : [
"<outputMode> output mode not supported for <statefulOperator> on streaming DataFrames/DataSets without watermark."
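The new HYBRID_ANALYZER_EXCEPTION conditions are surfaced when the dual-run comparison described later in this commit detects a divergence between the two analyzers. As a rough, hedged sketch only (it assumes SparkException's (errorClass, messageParameters, cause) constructor; the actual helper used by HybridAnalyzer is not shown in this diff), an OUTPUT_SCHEMA_COMPARISON_MISMATCH could be raised like this:

  // Hedged sketch: fixedPointResult and singlePassResult are placeholder analyzed plans,
  // and the constructor shape is an assumption rather than the code added by this commit.
  throw new SparkException(
    errorClass = "HYBRID_ANALYZER_EXCEPTION.OUTPUT_SCHEMA_COMPARISON_MISMATCH",
    messageParameters = Map(
      "fixedPointOutputSchema" -> fixedPointResult.schema.catalogString,
      "singlePassOutputSchema" -> singlePassResult.schema.catalogString),
    cause = null)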
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.jdbc

import org.apache.spark.internal.Logging

abstract class MariaDBDatabaseOnDocker extends DatabaseOnDocker with Logging {
override val imageName: String =
sys.env.getOrElse("MARIADB_DOCKER_IMAGE_NAME", "mariadb:10.11.10")
override val env: Map[String, String] = Map(
"MYSQL_ROOT_PASSWORD" -> "rootpass"
)
override val usesIpc = false
override val jdbcPort = 3306

override def getEntryPoint: Option[String] =
Some("/docker-entrypoint/mariadb-docker-entrypoint.sh")
}
@@ -37,20 +37,11 @@ class MariaDBKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
override protected val userName = s"mariadb/$dockerIp"
override protected val keytabFileName = "mariadb.keytab"

override val db = new DatabaseOnDocker {
override val imageName = sys.env.getOrElse("MARIADB_DOCKER_IMAGE_NAME", "mariadb:10.6.19")
override val env = Map(
"MYSQL_ROOT_PASSWORD" -> "rootpass"
)
override val usesIpc = false
override val jdbcPort = 3306
override val db = new MariaDBDatabaseOnDocker() {

override def getJdbcUrl(ip: String, port: Int): String =
s"jdbc:mysql://$ip:$port/mysql?user=$principal"

override def getEntryPoint: Option[String] =
Some("/docker-entrypoint/mariadb-docker-entrypoint.sh")

override def beforeContainerStart(
hostConfigBuilder: HostConfig,
containerConfigBuilder: ContainerConfig): Unit = {
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -426,7 +426,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.10.9.7</version>
<version>0.10.9.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -34,7 +34,7 @@ import org.apache.spark.util.ArrayImplicits.SparkArrayOps
import org.apache.spark.util.Utils

private[spark] object PythonUtils extends Logging {
val PY4J_ZIP_NAME = "py4j-0.10.9.7-src.zip"
val PY4J_ZIP_NAME = "py4j-0.10.9.8-src.zip"

/** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */
def sparkPythonPath: String = {
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -247,7 +247,7 @@ parquet-format-structures/1.15.0//parquet-format-structures-1.15.0.jar
parquet-hadoop/1.15.0//parquet-hadoop-1.15.0.jar
parquet-jackson/1.15.0//parquet-jackson-1.15.0.jar
pickle/1.5//pickle-1.5.jar
py4j/0.10.9.7//py4j-0.10.9.7.jar
py4j/0.10.9.8//py4j-0.10.9.8.jar
remotetea-oncrpc/1.1.2//remotetea-oncrpc-1.1.2.jar
rocksdbjni/9.7.3//rocksdbjni-9.7.3.jar
scala-collection-compat_2.13/2.7.0//scala-collection-compat_2.13-2.7.0.jar
2 changes: 1 addition & 1 deletion python/docs/Makefile
@@ -21,7 +21,7 @@ SPHINXBUILD ?= sphinx-build
SOURCEDIR ?= source
BUILDDIR ?= build

export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.10.9.7-src.zip)
export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.10.9.8-src.zip)

# Put it first so that "make" without argument is like "make help".
help:
2 changes: 1 addition & 1 deletion python/docs/make2.bat
@@ -25,7 +25,7 @@ if "%SPHINXBUILD%" == "" (
set SOURCEDIR=source
set BUILDDIR=build

set PYTHONPATH=..;..\lib\py4j-0.10.9.7-src.zip
set PYTHONPATH=..;..\lib\py4j-0.10.9.8-src.zip

if "%1" == "" goto help

2 changes: 1 addition & 1 deletion python/docs/source/getting_started/install.rst
@@ -177,7 +177,7 @@ PySpark requires the following dependencies.
========================== ========================= =============================
Package Supported version Note
========================== ========================= =============================
`py4j` >=0.10.9.7 Required to interact with JVM
`py4j` >=0.10.9.8 Required to interact with JVM
========================== ========================= =============================

Additional libraries that enhance functionality but are not included in the installation packages:
Binary file removed python/lib/py4j-0.10.9.7-src.zip
Binary file added python/lib/py4j-0.10.9.8-src.zip
2 changes: 1 addition & 1 deletion python/packaging/classic/setup.py
@@ -343,7 +343,7 @@ def run(self):
license="http://www.apache.org/licenses/LICENSE-2.0",
# Don't forget to update python/docs/source/getting_started/install.rst
# if you're updating the versions or dependencies.
install_requires=["py4j==0.10.9.7"],
install_requires=["py4j==0.10.9.8"],
extras_require={
"ml": ["numpy>=%s" % _minimum_numpy_version],
"mllib": ["numpy>=%s" % _minimum_numpy_version],
2 changes: 1 addition & 1 deletion sbin/spark-config.sh
@@ -28,6 +28,6 @@ export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
# Add the PySpark classes to the PYTHONPATH:
if [ -z "${PYSPARK_PYTHONPATH_SET}" ]; then
export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.7-src.zip:${PYTHONPATH}"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.8-src.zip:${PYTHONPATH}"
export PYSPARK_PYTHONPATH_SET=1
fi
@@ -28,6 +28,13 @@ import scala.util.{Failure, Random, Success, Try}
import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis.resolver.{
AnalyzerBridgeState,
HybridAnalyzer,
Resolver => OperatorResolver,
ResolverExtension,
ResolverGuard
}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.OuterScopes
import org.apache.spark.sql.catalyst.expressions._
@@ -146,15 +153,43 @@ case class AnalysisContext(
// lookup a temporary function. And export to the view metadata.
referredTempFunctionNames: mutable.Set[String] = mutable.Set.empty,
referredTempVariableNames: Seq[Seq[String]] = Seq.empty,
outerPlan: Option[LogicalPlan] = None)
outerPlan: Option[LogicalPlan] = None,

/**
* This is a bridge state between this fixed-point [[Analyzer]] and a single-pass [[Resolver]].
    * It is managed by the [[HybridAnalyzer]] via the [[setSinglePassResolverBridgeState]]
    * method; the goal is to preserve it correctly between the fixed-point and single-pass runs.
    * [[AnalysisContext.reset]] simply propagates it to prevent it from being reset in
    * [[Analyzer.execute]]. It is normally [[None]], unless
* [[ANALYZER_DUAL_RUN_LEGACY_AND_SINGLE_PASS_RESOLVER]] is set to [[true]].
*
* See [[AnalyzerBridgeState]] and [[HybridAnalyzer]] for more info.
*/
private var singlePassResolverBridgeState: Option[AnalyzerBridgeState] = None) {

def setSinglePassResolverBridgeState(bridgeState: Option[AnalyzerBridgeState]): Unit =
singlePassResolverBridgeState = bridgeState

def getSinglePassResolverBridgeState: Option[AnalyzerBridgeState] =
singlePassResolverBridgeState
}

object AnalysisContext {
private val value = new ThreadLocal[AnalysisContext]() {
override def initialValue: AnalysisContext = AnalysisContext()
}

def get: AnalysisContext = value.get()
def reset(): Unit = value.remove()

def reset(): Unit = {
    // We need to preserve the single-pass resolver bridge state here, since it is managed by
    // the [[HybridAnalyzer]] (which sets it, or resets it to `None`) and must not be wiped out
    // by [[execute]]. It acts as a bridge between the single-pass and fixed-point analyzers in
    // the absence of any other explicit state.
val prevSinglePassResolverBridgeState = value.get.getSinglePassResolverBridgeState
value.remove()
value.get.setSinglePassResolverBridgeState(prevSinglePassResolverBridgeState)
}

private def set(context: AnalysisContext): Unit = value.set(context)
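
For orientation, here is a minimal, hedged sketch of how [[HybridAnalyzer]] could install and clear this bridge state around a dual run (the real HybridAnalyzer implementation is not part of this hunk); it illustrates why reset() must propagate the state rather than drop it:

  // Hedged sketch: the AnalyzerBridgeState constructor shape and the surrounding control flow
  // are assumptions; setSinglePassResolverBridgeState and executeAndTrack come from this
  // commit and the existing Analyzer API.
  AnalysisContext.get.setSinglePassResolverBridgeState(Some(new AnalyzerBridgeState))
  try {
    // The fixed-point run records relation metadata into the bridge state as a side effect ...
    val fixedPointPlan = legacyAnalyzer.executeAndTrack(plan, tracker)
    // ... and the single-pass run that follows can read it back before the results are compared.
  } finally {
    AnalysisContext.get.setSinglePassResolverBridgeState(None)
  }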

@@ -219,9 +254,15 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
if (plan.analyzed) return plan
AnalysisHelper.markInAnalyzer {
val analyzed = executeAndTrack(plan, tracker)
checkAnalysis(analyzed)
analyzed
new HybridAnalyzer(
this,
new ResolverGuard(catalogManager),
new OperatorResolver(
catalogManager,
singlePassResolverExtensions,
singlePassMetadataResolverExtensions
)
).apply(plan, tracker)
}
}
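
With this change, every executeAndCheck call routes through [[HybridAnalyzer]]; whether the single-pass resolver actually runs alongside the fixed-point analyzer is governed by the dual-run flag referenced in the Scaladoc above. A hedged usage sketch, using the SQLConf constant rather than a literal key string (the key itself is not shown in this diff):

  // Hedged sketch: enable dual-run analysis for the current session and trigger it.
  spark.conf.set(SQLConf.ANALYZER_DUAL_RUN_LEGACY_AND_SINGLE_PASS_RESOLVER.key, "true")
  spark.sql("SELECT id FROM range(10)").collect()  // analyzed by both resolvers and compared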

@@ -245,6 +286,20 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
errorOnExceed = true,
maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key)

/**
* Extensions for the single-pass analyzer.
*
* See [[ResolverExtension]] for more info.
*/
val singlePassResolverExtensions: Seq[ResolverExtension] = Nil

/**
* Extensions used for early resolution of the single-pass analyzer.
*
* See [[ResolverExtension]] for more info.
*/
val singlePassMetadataResolverExtensions: Seq[ResolverExtension] = Nil
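
As a hedged illustration of the intended extension point (the subclassing pattern is an assumption; only Analyzer, CatalogManager and ResolverExtension appear in this commit), a derived analyzer could register extensions like this:

  // Hedged sketch: MyRelationResolverExtension is a hypothetical ResolverExtension
  // implementation, not an API introduced by this commit.
  class MySessionAnalyzer(catalogManager: CatalogManager) extends Analyzer(catalogManager) {
    override val singlePassResolverExtensions: Seq[ResolverExtension] =
      Seq(new MyRelationResolverExtension)
  }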

/**
* Override to provide additional rules for the "Resolution" batch.
*/
@@ -1018,7 +1073,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
case i @ InsertIntoStatement(table, _, _, _, _, _, _) =>
val relation = table match {
case u: UnresolvedRelation if !u.isStreaming =>
relationResolution.resolveRelation(u).getOrElse(u)
resolveRelation(u).getOrElse(u)
case other => other
}

@@ -1035,7 +1090,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
case write: V2WriteCommand =>
write.table match {
case u: UnresolvedRelation if !u.isStreaming =>
relationResolution.resolveRelation(u).map(unwrapRelationPlan).map {
resolveRelation(u).map(unwrapRelationPlan).map {
case v: View => throw QueryCompilationErrors.writeIntoViewNotAllowedError(
v.desc.identifier, write)
case r: DataSourceV2Relation => write.withNewTable(r)
@@ -1050,12 +1105,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
}

case u: UnresolvedRelation =>
relationResolution.resolveRelation(u).map(resolveViews).getOrElse(u)
resolveRelation(u).map(resolveViews).getOrElse(u)

case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, version)
if timestamp.forall(ts => ts.resolved && !SubqueryExpression.hasSubquery(ts)) =>
val timeTravelSpec = TimeTravelSpec.create(timestamp, version, conf.sessionLocalTimeZone)
relationResolution.resolveRelation(u, timeTravelSpec).getOrElse(r)
resolveRelation(u, timeTravelSpec).getOrElse(r)

case u @ UnresolvedTable(identifier, cmd, suggestAlternative) =>
lookupTableOrView(identifier).map {
@@ -1119,6 +1174,25 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
}
}
}

def resolveRelation(
unresolvedRelation: UnresolvedRelation,
timeTravelSpec: Option[TimeTravelSpec] = None): Option[LogicalPlan] = {
relationResolution
.resolveRelation(
unresolvedRelation,
timeTravelSpec
)
.map { relation =>
        // We put the synchronously resolved relation into the [[AnalyzerBridgeState]] so that
        // the single-pass [[Resolver]] can reuse it later and avoid resolving the relation
        // metadata twice.
AnalysisContext.get.getSinglePassResolverBridgeState.map { bridgeState =>
bridgeState.relationsWithResolvedMetadata.put(unresolvedRelation, relation)
}
relation
}
}
}
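
On the single-pass side, the map populated above can be consulted before going back to the catalog. A hedged sketch of that lookup (only AnalyzerBridgeState and relationsWithResolvedMetadata appear in this commit; the map's exact type and the fallback helper are assumptions):

  // Hedged sketch: Option(...) guards a possible null from a Java-style map lookup;
  // resolveMetadataFromCatalog is a hypothetical fallback, not an API from this commit.
  def resolveRelationMetadata(
      u: UnresolvedRelation,
      bridgeState: AnalyzerBridgeState): LogicalPlan =
    Option(bridgeState.relationsWithResolvedMetadata.get(u))
      .getOrElse(resolveMetadataFromCatalog(u))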

/** Handle INSERT INTO for DSv2 */
