Skip to content

Commit

Permalink
JOC-1794 FIX: Missing AgentCouplingFailed when Controller starts with…
Browse files Browse the repository at this point in the history
… missing Agent cluster
  • Loading branch information
Zschimmer committed Oct 14, 2024
1 parent d6f3818 commit a7f48ec
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,22 @@ import fs2.Stream
import izumi.reflect.Tag
import js7.base.catsutils.CatsEffectExtensions.*
import js7.base.generic.Completed
import js7.base.log.{LogLevel, Logger}
import js7.base.log.Logger.syntax.*
import js7.base.problem.Checked
import js7.base.session.SessionApi
import js7.base.log.{LogLevel, Logger}
import js7.base.problem.{Checked, Problem}
import js7.base.utils.CatsUtils.Nel
import js7.base.utils.ScalaUtils.syntax.*
import js7.base.utils.{DelayConf, Delayer}
import js7.base.web.HttpClient
import js7.data.Problems.NoActiveClusterNodeProblem
import js7.data.cluster.{ClusterNodeApi, ClusterNodeState}
import scala.math.Ordered.orderingToOrdered

final class ActiveClusterNodeSelector[Api <: HttpClusterNodeApi] private(
apisResource: ResourceIO[Nel[Api]],
delayConf: DelayConf,
clusterName: String = "",
onCouplingError: Api => Throwable => IO[Unit] =
(_: Api) => (t: Throwable) => SessionApi.onErrorTryAgain(toString, t).void)
onCouplingError: Problem => IO[Unit])
(using Tag[Api]):

private val logger = Logger.withPrefix[this.type](clusterName)
Expand All @@ -33,15 +32,17 @@ final class ActiveClusterNodeSelector[Api <: HttpClusterNodeApi] private(
apisResource.flatMap: apis =>
Resource.eval:
selectActiveNodeApiOnly(
apis,
api => throwable => onCouplingError(api)(throwable))
apis)

private def selectActiveNodeApiOnly(apis: Nel[Api]): IO[Api] =
logger.traceIOWithResult:
apis match
case Nel(api, Nil) => // Only one api
api.loginUntilReachable(
onError = t => onCouplingError(api)(t).as(true),
onError = t =>
onCouplingError:
Problem.reverseThrowable(t).withPrefix(api.toString)
.as(true),
onlyIfNotLoggedIn = true)
.map((_: Completed) => api)

Expand Down Expand Up @@ -70,8 +71,19 @@ final class ActiveClusterNodeSelector[Api <: HttpClusterNodeApi] private(
api -> nodeState
logNonActive(delayer, list, maybeActive, n = apis.length)
maybeActive match
case None => delayer.sleep.as(Left(()))
case Some(x) => IO.right(x)
case None =>
val problem =
// clusterStates may only differ if nodes are not coupled
val clusterStates = list.collect:
case ApiWithNodeState(_, Right(clusterNodeState)) =>
s"${clusterNodeState.nodeId} -> ${clusterNodeState.clusterState.toShortString}"
NoActiveClusterNodeProblem(clusterStates)
logger.log(delayer.logLevel, s"${delayer.symbol} $problem")
onCouplingError(problem) *>
delayer.sleep.as(Left(()))

case Some(x) =>
IO.right(x)
.handleErrorWith: throwable =>
logger.log(delayer.logLevel, s"${delayer.symbol} ${throwable.toStringWithCauses}")
if throwable.getStackTrace.nonEmpty then logger.debug(s"💥 $throwable", throwable)
Expand Down Expand Up @@ -101,15 +113,6 @@ final class ActiveClusterNodeSelector[Api <: HttpClusterNodeApi] private(
logger.warn(s"Cluster node ${api.baseUri} is not accessible: $problem")
case _ =>

if maybeActive.isEmpty then
// Different clusterStates only iff nodes are not coupled
val clusterStates = list.collect:
case ApiWithNodeState(api, Right(clusterNodeState)) =>
s"${api.baseUri} => ${clusterNodeState.clusterState.getClass.simpleScalaName}"

logger.log(delayer.logLevel, s"${delayer.symbol
} None of the $n cluster nodes seems to be active${
clusterStates.nonEmpty ?? s": ${clusterStates.mkString(" · ")}"}")


private case class ApiWithFiber(
Expand All @@ -128,8 +131,7 @@ object ActiveClusterNodeSelector:
def selectActiveNodeApi[Api <: HttpClusterNodeApi](
apisResource: ResourceIO[Nel[Api]],
delayConf: DelayConf,
onCouplingError: Api => Throwable => IO[Unit] =
(_: Api) => (t: Throwable) => SessionApi.onErrorTryAgain(toString, t).void,
onCouplingError: Problem => IO[Unit] = (_: Problem) => IO.unit,
clusterName: String = "")
(using Tag[Api])
: ResourceIO[Api] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,6 @@ WebServiceStillNotAvailable = "Web service is still not available"
NoDirector = "Agent Director is still not ready (or Director hasn't been installed)"
GoOrderInapplicable = "GoOrder($orderId) is not applicable at this position or in this state"
HttpIdleTimeout = "Nothing has been received from $serverName since $duration"
NoActiveClusterNode = "No cluster node seems to be active"
# Do not change
TestCode = "TestMessage argument=$argument"
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import js7.base.monixlike.MonixLikeExtensions.raceFold
import js7.base.monixutils.AsyncVariable
import js7.base.problem.Checked.*
import js7.base.problem.Problems.InvalidSessionTokenProblem
import js7.base.problem.{Checked, Problem, ProblemException}
import js7.base.problem.{Checked, Problem}
import js7.base.service.Service
import js7.base.session.SessionApi
import js7.base.time.ScalaTime.*
Expand Down Expand Up @@ -470,12 +470,7 @@ extends Service.StoppableByRequest:
ActiveClusterNodeSelector.selectActiveNodeApi(
clientsResource,
conf.recouplingStreamReader.delayConf,
onCouplingError = _ => throwable =>
onCouplingFailed:
throwable match
case ProblemException(problem) => problem
case t => Problem.fromThrowable(t)
.void,
onCouplingError = onCouplingFailed(_).void,
clusterName = agentPath.toString)

private def clientsResource: ResourceIO[Nel[AgentClient]] =
Expand Down
7 changes: 6 additions & 1 deletion js7-data/shared/src/main/scala/js7/data/Problems.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import js7.data.node.NodeId
import js7.data.order.OrderId
import js7.data.value.expression.Expression
import js7.data.value.expression.Expression.FunctionCall
import scala.concurrent.duration.FiniteDuration
import scala.collection.immutable.Map.Map1
import scala.concurrent.duration.FiniteDuration

object Problems:
case object PassiveClusterNodeShutdownNotAllowedProblem extends Problem.ArgumentlessCoded
Expand Down Expand Up @@ -139,3 +139,8 @@ object Problems:
def arguments: Map[String, String] = Map1("orderId", orderId.toString)

case object ClusterNodeHasBeenSwitchedOverProblem extends Problem.ArgumentlessCoded

final case class NoActiveClusterNodeProblem(clusterStates: Seq[String]) extends Problem.Coded:
def arguments: Map[String, String] =
Map1("clusterStates", clusterStates.mkString(", "))
.filter(_._2.nonEmpty)
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ object JournaledProxy:
ActiveClusterNodeSelector.selectActiveNodeApi[RequiredApi_[S]](
apisResource,
proxyConf.recouplingStreamReaderConf.delayConf,
_ => onCouplingError)
problem => IO(onProxyEvent(ProxyCouplingError(problem))))
.flatMap: api =>
Stream.eval:
maybeState.fold(api.checkedSnapshot(eventId = fromEventId))(IO.right).timed
Expand Down Expand Up @@ -133,9 +133,6 @@ object JournaledProxy:
.orThrow/*TODO Restart*/
.withEventId(stampedEvent.eventId)))

def onCouplingError(throwable: Throwable) = IO:
onProxyEvent(ProxyCouplingError(Problem.fromThrowable(throwable)))

Stream.eval:
DelayConf(1.s, 1.s, 1.s, 1.s, 1.s, 2.s, 3.s, 5.s).start[IO]
.flatMap:
Expand Down

0 comments on commit a7f48ec

Please sign in to comment.