Skip to content

Commit

Permalink
FIX Stopping both cluster nodes simultaneously does not block (3)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zschimmer committed Nov 15, 2024
1 parent 88c48ec commit 94f0b95
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import js7.base.log.{CorrelId, Logger}
import js7.base.monixlike.MonixLikeExtensions.*
import js7.base.monixutils.StreamPauseDetector.*
import js7.base.problem.Checked.*
import js7.base.problem.Problems.ShuttingDownProblem
import js7.base.problem.{Checked, Problem, ProblemException}
import js7.base.system.startup.Halt
import js7.base.time.ScalaTime.*
Expand All @@ -29,7 +28,7 @@ import js7.base.web.{HttpClient, Uri}
import js7.cluster.ActiveClusterNode.*
import js7.cluster.watch.api.ClusterWatchConfirmation
import js7.common.http.RecouplingStreamReader
import js7.data.Problems.{AckFromActiveClusterNodeProblem, ClusterCommandInapplicableProblem, ClusterNodeIsNotActiveProblem, ClusterSettingNotUpdatable, MissingPassiveClusterNodeHeartbeatProblem, PassiveClusterNodeUrlChangeableOnlyWhenNotCoupledProblem}
import js7.data.Problems.{AckFromActiveClusterNodeProblem, ClusterCommandInapplicableProblem, ClusterModuleShuttingDownProblem, ClusterNodeIsNotActiveProblem, ClusterSettingNotUpdatable, MissingPassiveClusterNodeHeartbeatProblem, PassiveClusterNodeUrlChangeableOnlyWhenNotCoupledProblem}
import js7.data.cluster.ClusterCommand.{ClusterConfirmCoupling, ClusterStartBackupNode}
import js7.data.cluster.ClusterEvent.{ClusterActiveNodeRestarted, ClusterActiveNodeShutDown, ClusterCoupled, ClusterCouplingPrepared, ClusterPassiveLost, ClusterSettingUpdated, ClusterSwitchedOver, ClusterWatchRegistered}
import js7.data.cluster.ClusterState.{ActiveShutDown, Coupled, Empty, HasNodes, IsDecoupled, NodesAppointed, PassiveLost, PreparedToBeCoupled}
Expand Down Expand Up @@ -365,7 +364,7 @@ final class ActiveClusterNode[S <: ClusterableState[S]] private[cluster](
case _ =>
Right(None)
.recoverT:
case ShuttingDownProblem => ()
case ClusterModuleShuttingDownProblem => ()
.flatMapT: _ =>
stopAcknowledgingRequested = true
stopAcknowledging.complete(())
Expand Down Expand Up @@ -502,7 +501,7 @@ final class ActiveClusterNode[S <: ClusterableState[S]] private[cluster](
case Success(Left(_: MissingPassiveClusterNodeHeartbeatProblem)) =>
logger.warn("❗ Continue as single active cluster node, without passive node")

case Success(Left(problem @ ShuttingDownProblem)) =>
case Success(Left(problem @ ClusterModuleShuttingDownProblem)) =>
logger.debug(s"fetchAndHandleAcknowledgedEventIds($passiveUri) failed with $problem")

case Success(Left(problem)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import js7.base.log.LogLevel.Debug
import js7.base.log.Logger.syntax.*
import js7.base.log.{BlockingSymbol, CorrelId, Logger}
import js7.base.monixlike.MonixLikeExtensions.onErrorRestartLoop
import js7.base.problem.Problems.ShuttingDownProblem
import js7.base.problem.{Checked, Problem}
import js7.base.service.Service
import js7.base.time.ScalaTime.*
Expand All @@ -21,6 +20,7 @@ import js7.base.utils.Tests.isTest
import js7.base.utils.{AsyncLock, Atomic}
import js7.cluster.ClusterWatchCounterpart.*
import js7.cluster.watch.api.ClusterWatchConfirmation
import js7.data.Problems.ClusterModuleShuttingDownProblem
import js7.data.cluster.ClusterEvent.{ClusterCouplingPrepared, ClusterNodesAppointed, ClusterPassiveLost}
import js7.data.cluster.ClusterState.{Coupled, FailedOver, HasNodes, PassiveLost}
import js7.data.cluster.ClusterWatchProblems.{ClusterNodeLossNotConfirmedProblem, ClusterWatchIdDoesNotMatchProblem, ClusterWatchRequestDoesNotMatchProblem, NoClusterWatchProblem, OtherClusterWatchStillAliveProblem}
Expand Down Expand Up @@ -145,7 +145,7 @@ extends Service.StoppableByRequest:
.onErrorRestartLoop(()):
case (_: RequestTimeoutException, _, retry) =>
shuttingDown.tryGet.flatMap:
case Some(()) => IO.left(ShuttingDownProblem)
case Some(()) => IO.left(ClusterModuleShuttingDownProblem)
case None =>
SyncDeadline.usingNow:
sym.onWarn()
Expand All @@ -157,7 +157,7 @@ extends Service.StoppableByRequest:

case (t, _, _) => IO.raiseError(t)
.flatTap:
case Left(problem @ ShuttingDownProblem) =>
case Left(problem @ ClusterModuleShuttingDownProblem) =>
IO(logger.log(sym.relievedLogLevel min Debug,
s"⚠️ ${request.toShortString} => $problem · after ${since.elapsed.pretty}"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import js7.base.generic.Completed
import js7.base.log.Logger.syntax.*
import js7.base.log.{CorrelId, Logger}
import js7.base.monixlike.MonixLikeExtensions.tapError
import js7.base.problem.Problems.ShuttingDownProblem
import js7.base.problem.{Checked, Problem}
import js7.base.system.startup.Halt.haltJava
import js7.base.utils.Assertions.assertThat
Expand All @@ -21,6 +20,7 @@ import js7.base.utils.ScalaUtils.syntax.*
import js7.base.utils.{AsyncLock, Atomic, SetOnce}
import js7.cluster.ClusterWatchSynchronizer.*
import js7.cluster.watch.api.ClusterWatchConfirmation
import js7.data.Problems.ClusterModuleShuttingDownProblem
import js7.data.cluster.ClusterEvent.{ClusterPassiveLost, ClusterWatchRegistered}
import js7.data.cluster.ClusterState.HasNodes
import js7.data.cluster.ClusterWatchProblems.ClusterPassiveLostWhileFailedOverProblem
Expand Down Expand Up @@ -121,7 +121,7 @@ private final class ClusterWatchSynchronizer(
case clusterState: HasNodes
if clusterState.activeId == ownId
&& ioResult != Left(ClusterPassiveLostWhileFailedOverProblem)
&& ioResult != Left(ShuttingDownProblem) =>
&& ioResult != Left(ClusterModuleShuttingDownProblem) =>
continueHeartbeating(
clusterState,
registerClusterWatchId.orThrow,
Expand Down Expand Up @@ -293,7 +293,7 @@ private final class ClusterWatchSynchronizer(
logger.trace("◼️ doACheckedHeartbeat canceled due to `stopping`")
Completed

case Right(Left(problem @ ShuttingDownProblem)) =>
case Right(Left(problem @ ClusterModuleShuttingDownProblem)) =>
IO:
logger.debug(s"⚠️ doACheckedHeartbeat => $problem")
Completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ ShuttingDown = "JS7 server is shutting down"
NotBecomingSoleBecauseClusterIsNotEmpty = "Not becoming Sole because ClusterState is not Empty"
SnapshotObjectNotApplicable = "Unexpected snapshot object '$object'"
EventNotApplicable = "Event '$event' is not applicable for state '$state'"
ClusterModuleShuttingDown = "Cluster module is shutting down"
ClusterNodeIsNotReady = "Cluster node is not ready yet"
ClusterNodeIsNotActive = "This cluster node is not (yet) active"
BackupClusterNodeNotAppointed = "This backup cluster node has not yet been appointed"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import js7.base.log.{CorrelId, Logger}
import js7.base.monixlike.MonixLikeExtensions.{dematerialize, materialize, scheduleAtFixedRates, scheduleOnce}
import js7.base.monixlike.{SerialSyncCancelable, SyncCancelable}
import js7.base.problem.Checked.*
import js7.base.problem.Problems.ShuttingDownProblem
import js7.base.problem.{Checked, Problem}
import js7.base.thread.CatsBlocking.syntax.*
import js7.base.time.JavaTimeConverters.AsScalaDuration
Expand All @@ -46,7 +45,7 @@ import js7.controller.configuration.ControllerConfiguration
import js7.controller.problems.{ControllerIsNotReadyProblem, ControllerIsShuttingDownProblem, ControllerIsSwitchingOverProblem}
import js7.core.command.CommandMeta
import js7.core.problems.ReverseReleaseEventsProblem
import js7.data.Problems.{CannotDeleteChildOrderProblem, CannotDeleteWatchingOrderProblem, UnknownOrderProblem}
import js7.data.Problems.{CannotDeleteChildOrderProblem, CannotDeleteWatchingOrderProblem, ClusterModuleShuttingDownProblem, UnknownOrderProblem}
import js7.data.agent.AgentRefStateEvent.{AgentEventsObserved, AgentMirroredEvent, AgentReady, AgentReset, AgentShutDown}
import js7.data.agent.{AgentPath, AgentRef, AgentRefState, AgentRunId}
import js7.data.board.BoardEvent.{NoticeDeleted, NoticePosted}
Expand Down Expand Up @@ -682,8 +681,8 @@ extends Stash, MainJournalingActor[ControllerState, Event]:
case Internal.ClusterModuleTerminatedUnexpectedly(tried) =>
// Stacktrace has been debug-logged by Cluster
tried match
case Success(Left(problem @ ShuttingDownProblem)) =>
logger.error(s"Cluster module terminated with $problem")
case Success(Left(problem @ ClusterModuleShuttingDownProblem)) if shuttingDown =>
logger.debug(s"Cluster module terminated with $problem")
case Success(checked: Checked[Completed]) =>
val msg: Any = checked.fold(identity, identity)
logger.error(s"Cluster module terminated unexpectedly: $msg")
Expand Down
3 changes: 3 additions & 0 deletions js7-data/shared/src/main/scala/js7/data/Problems.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ object Problems:
def arguments: Map[String, String] = Map(
"orderId" -> orderId.string)

/** Problem signalling that the cluster module is shutting down.
  *
  * Specific to the cluster module, as opposed to the general `ShuttingDownProblem`
  * of the whole JS7 server.
  */
case object ClusterModuleShuttingDownProblem extends Problem.ArgumentlessCoded:
override val httpStatusCode = 503 // Service Unavailable

/** Problem signalling that the cluster node is not ready yet. */
case object ClusterNodeIsNotReadyProblem extends Problem.ArgumentlessCoded:
override val httpStatusCode = 503 // Service Unavailable

Expand Down

0 comments on commit 94f0b95

Please sign in to comment.