Skip to content

Commit

Permalink
release 1.1.2
Browse files Browse the repository at this point in the history
  • Loading branch information
peacewong committed May 13, 2022
1 parent cf67275 commit 68fff61
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,29 +75,32 @@ class JobTimeoutManager extends Logging {
private def timeoutDetective(): Unit = {
if (timeoutCheck) {
def checkAndSwitch(job: EntranceJob): Unit = {
info(s"Checking whether the job timed out: ${job.getId()}")
val currentTime = System.currentTimeMillis() / 1000
val queuingTime = currentTime - job.getScheduledTime / 1000
val runningTime = currentTime - job.getStartTime / 1000
info(s"Checking whether the job id ${job.getJobRequest.getId()} timed out. ")
val currentTimeSeconds = System.currentTimeMillis() / 1000
// job.isWaiting == job in queue
val jobScheduleStartTimeSeconds = if (job.isWaiting) job.createTime / 1000 else currentTimeSeconds
val queuingTimeSeconds = currentTimeSeconds - jobScheduleStartTimeSeconds
val jobRunningStartTimeSeconds = if (job.getStartTime > 0) job.getStartTime / 1000 else currentTimeSeconds
val runningTimeSeconds = currentTimeSeconds - jobRunningStartTimeSeconds
if (!job.isCompleted) {
job.jobRequest.getLabels foreach {
case queueTimeOutLabel: JobQueuingTimeoutLabel =>
if (queueTimeOutLabel.getQueuingTimeout > 0 && queuingTime >= queueTimeOutLabel.getQueuingTimeout) {
warn(s"Job queuing timeout, cancel it now: ${job.getId()}")
job.cancel()
if (job.isWaiting && queueTimeOutLabel.getQueuingTimeout > 0 && queuingTimeSeconds >= queueTimeOutLabel.getQueuingTimeout) {
logger.warn(s"Job ${job.getJobRequest.getId()} queued time : ${queuingTimeSeconds} seconds, which was over queueTimeOut : ${queueTimeOutLabel.getQueuingTimeout} seconds, cancel it now! ")
job.onFailure(s"Job queued ${queuingTimeSeconds} seconds over max queue time : ${queueTimeOutLabel.getQueuingTimeout} seconds.", null)
}
case jobRunningTimeoutLabel: JobRunningTimeoutLabel =>
if (jobRunningTimeoutLabel.getRunningTimeout > 0 && runningTime >= jobRunningTimeoutLabel.getRunningTimeout) {
warn(s"Job running timeout, cancel it now: ${job.getId()}")
job.cancel()
if (job.isRunning && jobRunningTimeoutLabel.getRunningTimeout > 0 && runningTimeSeconds >= jobRunningTimeoutLabel.getRunningTimeout) {
logger.warn(s"Job ${job.getJobRequest.getId()} run timeout ${runningTimeSeconds} seconds, which was over runTimeOut : ${jobRunningTimeoutLabel.getRunningTimeout} seconds, cancel it now! ")
job.onFailure(s"Job run ${runningTimeSeconds} seconds over max run time : ${jobRunningTimeoutLabel.getRunningTimeout} seconds.", null)
}
case _ =>
}
}
}

timeoutJobByName.foreach(item => {
info(s"Running timeout detection!")
logger.info(s"Running timeout detection!")
synchronized {
jobCompleteDelete(item._1)
if (jobExist(item._1)) checkAndSwitch(item._2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@
</insert>

<insert id="replaceIntoLabelKeyValue">
INSERT INTO linkis_cg_manager_label_value_relation (label_value_key,label_value_content,label_id,update_time,create_time) VALUES(#{labelKey}, #{labelStringValue},#{labelId},now(),now())
ON DUPLICATE KEY UPDATE label_value_content=#{labelStringValue},update_time=now()
REPLACE INTO linkis_cg_manager_label_value_relation (label_value_key,label_value_content,label_id,update_time,create_time) VALUES(#{labelKey}, #{labelStringValue},#{labelId},now(),now())
</insert>

<select id="getLabel" resultMap="persistenceLabelResultMap">
Expand Down

0 comments on commit 68fff61

Please sign in to comment.