From 5ac42e2fe122c972cd3a2ecc87b11b1a70758f72 Mon Sep 17 00:00:00 2001 From: changgyoopark-db Date: Mon, 23 Dec 2024 09:03:47 +0900 Subject: [PATCH] [SPARK-50534][SPARK-50535][TEST][CONNECT] Fix sporadic test failures ### What changes were proposed in this pull request? Fix sporadic Spark Connect test failures. 1. SPARK-50534: VerifyEvents.this.listener.executeHolder was not declared "volatile", causing the thread to repeatedly read a potentially outdated value. The data structure is only used by the test suite. 2. SPARK-50535: org.apache.spark.sql.connect.service.SparkConnectSessionManager.invalidateAllSessions is susceptible to system time synchronization (e.g., NTP), leaving stale sessions. invalidateAllSessions is only used by test suites. ### Why are the changes needed? Fix sporadic test failures. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Repeatedly ran testOnly org.apache.spark.sql.connect.planner.SparkConnectServiceSuite and org.apache.spark.sql.connect.service.SparkConnectServiceE2ESuite. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #49253 from changgyoopark-db/SPARK-50534. 
Authored-by: changgyoopark-db Signed-off-by: Hyukjin Kwon --- .../sql/connect/service/SparkConnectSessionManager.scala | 6 ++++-- .../sql/connect/planner/SparkConnectServiceSuite.scala | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala index a306856efa33c..b0b74a36e187b 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala @@ -289,8 +289,10 @@ class SparkConnectSessionManager extends Logging { * Used for testing */ private[connect] def invalidateAllSessions(): Unit = { - periodicMaintenance(defaultInactiveTimeoutMs = 0L, ignoreCustomTimeout = true) - assert(sessionStore.isEmpty) + sessionStore.forEach((key, sessionHolder) => { + removeSessionHolder(key) + shutdownSessionHolder(sessionHolder) + }) closedSessionsCache.invalidateAll() } diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala index d6d137e6d91aa..5e88725691656 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala @@ -919,7 +919,8 @@ class SparkConnectServiceSuite } class MockSparkListener() extends SparkListener { val semaphoreStarted = new Semaphore(0) - var executeHolder = Option.empty[ExecuteHolder] + // Accessed by multiple threads in parallel. 
+ @volatile var executeHolder = Option.empty[ExecuteHolder] override def onOtherEvent(event: SparkListenerEvent): Unit = { event match { case e: SparkListenerConnectOperationStarted =>