Skip to content

Commit

Permalink
refactor SubagentEventListener
Browse files Browse the repository at this point in the history
  • Loading branch information
Zschimmer committed May 28, 2024
1 parent 090409c commit 09f7419
Showing 1 changed file with 48 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import cats.syntax.traverse.*
import fs2.Stream
import js7.base.catsutils.CatsEffectExtensions.{joinStd, left}
import js7.base.catsutils.UnsafeMemoizable.unsafeMemoize
import js7.base.fs2utils.StreamExtensions.interruptWhenF
import js7.base.generic.Completed
import js7.base.log.Logger
import js7.base.log.Logger.syntax.*
import js7.base.monixlike.MonixLikeExtensions.{completedL, takeUntilEval}
import js7.base.monixutils.StreamPauseDetector.*
import js7.base.monixutils.Switch
import js7.base.problem.Checked.*
Expand Down Expand Up @@ -64,46 +64,53 @@ private trait SubagentEventListener:
protected final val coupled = Switch(false)

protected final def stopEventListener: IO[Unit] =
lock.lock(
logger.debugIO(IO.defer(
IO.whenA(isListening.getAndSet(false))(IO.defer {
lock.lock:
logger.debugIO:
IO.defer(IO.whenA(isListening.getAndSet(false)):
stopObserving
.flatMap(_.tryPut(())).void
.*>(IO.defer(observing.joinStd))
}))))
.flatMap(_.tryPut(()))
.*>(IO.defer:
observing.joinStd))
.logWhenItTakesLonger

protected final def startEventListener: IO[Unit] =
lock.lock(IO.defer {
if isListening.getAndSet(true) then
val msg = "Duplicate startEventListener"
logger.error(msg)
IO.raiseError(new RuntimeException(s"$toString: $msg"))
else
stopObserving.flatMap(_.tryTake)
.*>(observeEvents)
.start
.flatMap(fiber => IO {
observing = fiber
})
})
lock.lock:
IO.defer:
if isListening.getAndSet(true) then
val msg = "Duplicate startEventListener"
logger.error(msg)
IO.raiseError(new RuntimeException(s"$toString: $msg"))
else
stopObserving.flatMap(_.tryTake)
.*>(observeEvents)
.start
.flatMap(fiber => IO:
observing = fiber)

private def observeEvents: IO[Unit] =
logger.debugIO(IO.defer {
IO.defer:
val recouplingStreamReader = newEventListener()
val bufferDelay = conf.eventBufferDelay max conf.commitDelay
logger.debugIO(recouplingStreamReader
logger.debugStream(recouplingStreamReader
.stream(
api,
after = journal.unsafeCurrentState().idToSubagentItemState(subagentId).eventId)
.takeUntilEval(stopObserving.flatMap(_.read))
.onFinalizeCase: exitCase =>
IO(logger.trace(s"### onFinalize A $exitCase"))
.interruptWhenF:
// FIXME interruptWhen does not work if Stream is busy?
// Maybe due to FS2/Pekko coupling
// Tests have to delay before stopping the Director
stopObserving.flatMap(_.read)
*> IO(logger.trace("### observeEvents stops"))
.onFinalizeCase: exitCase =>
IO(logger.trace(s"### onFinalize B $exitCase"))
.pipe: stream =>
if !bufferDelay.isPositive then
stream.chunks
else
stream.groupWithin(
conf.eventBufferSize,
conf.eventBufferDelay max conf.commitDelay)
.evalMap(_
stream.groupWithin(conf.eventBufferSize, bufferDelay)
.evalTap(_
.traverse(handleEvent)
.flatMap: updatedStampedChunk0 =>
val (updatedStampedSeqSeq, followUps) = updatedStampedChunk0.toArraySeq.unzip
Expand All @@ -120,15 +127,15 @@ private trait SubagentEventListener:
// to terminate StartOrderProcess command idempotency detection and
// allow a new StartOrderProcess command for a next process.
.*>(updatedStampedSeq
.collect { case Stamped(_, _, KeyedEvent(o: OrderId, _: OrderProcessed)) => o }
.collect:
case Stamped(_, _, KeyedEvent(o: OrderId, _: OrderProcessed)) => o
.traverse(detachProcessedOrder))
.*>(lastEventId.traverse(releaseEvents))
.*>(followUps.combineAll))
.onFinalize(recouplingStreamReader
.terminateAndLogout
.logWhenItTakesLonger)
.completedL)
})
.logWhenItTakesLonger("recouplingStreamReader.terminateAndLogout"))
).compile.drain

/** Returns optionally the event and a follow-up task. */
private def handleEvent(stamped: Stamped[AnyKeyedEvent])
Expand Down Expand Up @@ -170,23 +177,21 @@ private trait SubagentEventListener:
override protected def idleTimeout = None // SubagentEventListener itself detects heartbeat loss

override protected def couple(eventId: EventId) =
logger.debugIO(
logger.debugIO:
dedicateOrCouple
.flatMapT { case (_, eventId) =>
.flatMapT: (_, eventId) =>
coupled.switchOn
.as(Right(eventId))
})
.<*(IO {
lastProblem = None
})
.<*(IO:
lastProblem = None)

protected def getStream(api: HttpSubagentApi, after: EventId) =
logger.debugIO("getStream", s"after=$after")(
journal.state.map(_.idToSubagentItemState.checked(subagentId).map(_.subagentRunId))
.flatMapT {
logger.debugIO("getStream", s"after=$after"):
journal.state
.map(_.idToSubagentItemState.checked(subagentId).map(_.subagentRunId))
.flatMapT:
case None => IO.left(Problem.pure("Subagent not yet dedicated"))
case Some(subagentRunId) => getStream(api, after, subagentRunId)
})

private def getStream(api: HttpSubagentApi, after: EventId, subagentRunId: SubagentRunId) =
api.login(onlyIfNotLoggedIn = true) *>
Expand Down Expand Up @@ -231,19 +236,18 @@ private trait SubagentEventListener:

override protected def onCouplingFailed(api: HttpSubagentApi, problem: Problem) =
stopObserving.flatMap(_.tryRead).map(_.isDefined)
.flatMap(stopped =>
.flatMap: stopped =>
if stopped then
IO.pure(false)
else
onSubagentDecoupled(Some(problem)) *>
IO {
IO:
if lastProblem contains problem then
logger.debug(s"⚠️ Coupling failed again: $problem")
else
lastProblem = Some(problem)
logger.warn(s"Coupling failed: $problem")
true
})

override protected val onDecoupled =
logger.traceIO(
Expand Down

0 comments on commit 09f7419

Please sign in to comment.