Skip to content

Commit

Permalink
[SPARK-50534][SPARK-50535][TEST][CONNECT] Fix sporadic test failures
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Fix sporadic Spark Connect test failures.
1. SPARK-50534: VerifyEvents.this.listener.executeHolder was not declared "volatile", causing the thread to repeatedly read potentially outdated value. The data structure is only used by the test suite.
2. SPARK-50535: org.apache.spark.sql.connect.service.SparkConnectSessionManager.invalidateAllSessions is susceptible to system time synchronization (e.g., NTP), leaving stale sessions. invalidateAllSessions is only used by test suites.

### Why are the changes needed?

Fix sporadic test failures.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Repeatedly ran testOnly org.apache.spark.sql.connect.planner.SparkConnectServiceSuite and org.apache.spark.sql.connect.service.SparkConnectServiceE2ESuite.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#49253 from changgyoopark-db/SPARK-50534.

Authored-by: changgyoopark-db <changgyoo.park@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
changgyoopark-db authored and HyukjinKwon committed Dec 23, 2024
1 parent 827d2a0 commit 5ac42e2
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,10 @@ class SparkConnectSessionManager extends Logging {
* Used for testing
*/
private[connect] def invalidateAllSessions(): Unit = {
periodicMaintenance(defaultInactiveTimeoutMs = 0L, ignoreCustomTimeout = true)
assert(sessionStore.isEmpty)
sessionStore.forEach((key, sessionHolder) => {
removeSessionHolder(key)
shutdownSessionHolder(sessionHolder)
})
closedSessionsCache.invalidateAll()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,8 @@ class SparkConnectServiceSuite
}
class MockSparkListener() extends SparkListener {
val semaphoreStarted = new Semaphore(0)
var executeHolder = Option.empty[ExecuteHolder]
// Accessed by multiple threads in parallel.
@volatile var executeHolder = Option.empty[ExecuteHolder]
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case e: SparkListenerConnectOperationStarted =>
Expand Down

0 comments on commit 5ac42e2

Please sign in to comment.