Skip to content

Commit

Permalink
JS-2160 FIX Agent shutdown no longer fail-over
Browse files Browse the repository at this point in the history
  • Loading branch information
Zschimmer committed Dec 18, 2024
1 parent 143564a commit b55cef0
Showing 1 changed file with 26 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import com.softwaremill.tagging.{@@, Tagger}
import io.circe.syntax.EncoderOps
import java.time.ZoneId
import js7.agent.configuration.AgentConfiguration
import scala.language.unsafeNulls
import js7.agent.data.AgentState
import js7.agent.data.commands.AgentCommand
import js7.agent.data.commands.AgentCommand.{AttachItem, AttachOrder, AttachSignedItem, DetachItem, DetachOrder, MarkOrder, OrderCommand, ReleaseEvents, Response}
Expand All @@ -34,7 +33,7 @@ import js7.base.utils.CatsUtils.syntax.logWhenItTakesLonger
import js7.base.utils.Collections.implicits.InsertableMutableMap
import js7.base.utils.ScalaUtils.syntax.*
import js7.base.utils.{Allocated, DuplicateKeyException, SetOnce}
import js7.cluster.ClusterNode
import js7.cluster.{ClusterNode, WorkingClusterNode}
import js7.common.pekkoutils.Pekkos.{encodeAsActorName, uniqueActorName}
import js7.common.pekkoutils.SupervisorStrategies
import js7.common.system.PlatformInfos.currentPlatformInfo
Expand Down Expand Up @@ -75,6 +74,7 @@ import scala.collection.mutable
import scala.concurrent.duration.*
import scala.concurrent.duration.Deadline.now
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.language.unsafeNulls
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -194,10 +194,26 @@ extends MainJournalingActor[AgentState, Event], Stash:
.map(_ => self ! Internal.JobDriverStopped)
.unsafeRunAndForget()
if jobRegister.isEmpty && !terminatingJournal then
persist(AgentShutDown) { (_, _) =>
terminatingJournal = true
journalAllocated.release.unsafeRunAndForget()
}
IO.whenA(!shutDown.isFailOrSwitchover):
clusterNode.workingClusterNode.match
case Left(problem) =>
IO(logger.warn(s"clusterNode.workingClusterNode => $problem"))
case Right(workingClusterNode) =>
workingClusterNode.shutDownThisNode
.flatMap:
case Right(Completed) => IO.unit
case Left(problem) =>
IO(logger.warn(s"workingClusterNode.shutDownThisNode => $problem"))
.map: _ =>
self ! Internal.AgentShutdown
.unsafeRunAndForget()
end if

def finallyShutdown(): Unit =
persist(AgentShutDown): (_, _) =>
terminatingJournal = true
journalAllocated.release.unsafeRunAndForget()

import shutdown.shuttingDown

private val subagentKeeper =
Expand Down Expand Up @@ -944,6 +960,9 @@ extends MainJournalingActor[AgentState, Event], Stash:
case Internal.StillTerminating =>
shutdown.onStillTerminating()

case Internal.AgentShutdown =>
shutdown.finallyShutdown()

case _ =>
super.unhandled(message)

Expand Down Expand Up @@ -985,6 +1004,7 @@ object AgentOrderKeeper:
case object JobDriverStopped
case object StillTerminating extends DeadLetterSuppression
case object ContinueSwitchover
case object AgentShutdown

private final class JobEntry(
val jobKey: JobKey,
Expand Down

0 comments on commit b55cef0

Please sign in to comment.