Skip to content

Commit

Permalink
Merge branch 'master' into support_date_convert
Browse files Browse the repository at this point in the history
  • Loading branch information
shiyuhang0 authored Sep 27, 2024
2 parents 004fd0c + df2a5a4 commit 85e6165
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.pingcap.tispark.safepoint

import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.pingcap.tikv.ClientSession
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import org.tikv.common.exception.TiInternalException
import org.tikv.common.meta.TiTimestamp
Expand All @@ -29,7 +30,8 @@ case class ServiceSafePoint(
serviceId: String,
ttl: Long,
GCMaxWaitTime: Long,
clientSession: ClientSession) {
clientSession: ClientSession,
sparkSession: SparkSession) {

private val PD_UPDATE_SAFE_POINT_BACKOFF: Int = 20 * 1000
private final val logger = LoggerFactory.getLogger(getClass.getName)
Expand All @@ -38,6 +40,14 @@ case class ServiceSafePoint(
new ThreadFactoryBuilder().setNameFormat("serviceSafePoint-thread-%d").setDaemon(true).build)
service.scheduleAtFixedRate(
() => {
val now = clientSession.getTiKVSession.getTimestamp
if (now.getPhysical - TiTimestamp.extractPhysical(minStartTs) >= GCMaxWaitTime * 1000) {
val msg =
s"Can not pause GC more than spark.tispark.gc_max_wait_time=$GCMaxWaitTime s. start_ts: ${minStartTs}, now: ${now.getVersion}. You can adjust spark.tispark.gc_max_wait_time to increase the gc max wait time."
logger.error(msg)
sparkSession.stop()
throw new TiInternalException(msg)
}
if (minStartTs != Long.MaxValue) {
val safePoint = clientSession.getTiKVSession.getPDClient.updateServiceGCSafePoint(
serviceId,
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/sql/TiContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class TiContext(val sparkSession: SparkSession) extends Serializable with Loggin
"tispark_" + UUID.randomUUID,
TiConfigConst.DEFAULT_GC_SAFE_POINT_TTL,
GCMaxWaitTime,
clientSession)
clientSession,
sparkSession)

sparkSession.sparkContext.addSparkListener(new SparkListener() {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ spark.tispark.jdbc.tls_enable=true
spark.tispark.jdbc.server_cert_store = /config/cert/jks/server-cert-store
spark.tispark.jdbc.server_cert_password = 12345678
spark.tispark.jdbc.client_cert_store = /config/cert/jks/client-keystore
spark.tispark.jdbc.client_cert_password = 123456
spark.tispark.jdbc.client_cert_password = 123456

0 comments on commit 85e6165

Please sign in to comment.