Skip to content

Commit

Permalink
JS-2189 FIX Cycle executes twice after Controller restart
Browse files Browse the repository at this point in the history
  • Loading branch information
Zschimmer committed Dec 16, 2024
1 parent 7ecdb08 commit dbc1943
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import js7.base.time.JavaTime.JavaTimeZone
import js7.base.utils.ScalaUtils.syntax.*
import js7.data.calendar.{Calendar, CalendarExecutor}
import js7.data.event.KeyedEvent
import js7.data.execution.workflow.instructions.ScheduleCalculator.Do
import js7.data.order.Order.{BetweenCycles, Ready}
import js7.data.order.OrderEvent.{OrderActorEvent, OrderCycleFinished, OrderCycleStarted, OrderCyclingPrepared, OrderMoved}
import js7.data.order.OrderObstacle.WaitingForOtherTime
Expand All @@ -20,13 +21,13 @@ extends EventInstructionExecutor:
type Instr = Cycle
val instructionClass = classOf[Cycle]

def toEvents(cycle: Cycle, order: Order[Order.State], state: StateView) =
def toEvents(instr: Cycle, order: Order[Order.State], state: StateView) =
val now = clock.now()
start(order)
.orElse(order.ifState[Ready].map: order =>
for
workflow <- state.keyToItem(Workflow).checked(order.workflowId)
pair <- toCalendarAndScheduleCalculator(workflow, cycle, state)
pair <- toCalendarAndScheduleCalculator(workflow, instr, state)
(calendar, calculator) = pair
calendarExecutor <- CalendarExecutor.checked(calendar, workflow.timeZone)
timeInterval <- calendarExecutor.orderIdToTimeInterval(order.id)
Expand All @@ -39,35 +40,38 @@ extends EventInstructionExecutor:
periodIndex = -1,
index = 0),
now)
nextCycleStateToEvent(cycleState, order))
cycleState match
case Some(cycleState) =>
(order.id <-: OrderCyclingPrepared(cycleState)) :: Nil
case None =>
endCycling(order))
.orElse(order.ifState[BetweenCycles].map: order =>
order.state.cycleState match
case None =>
Right:
(order.id <-: OrderMoved(order.position.increment)) :: Nil
Right(endCycling(order))

case Some(cycleState) =>
toScheduleCalculator(order, cycle, state)
.flatMap(_.maybeRecalcCycleState(now, cycleState))
toScheduleCalculator(order, instr, state)
.flatMap:
_.onNextCycleIsDue(cycleState, now)
.map:
case None =>
// cycleState is still valid
(cycleState.next <= now).thenList(
order.id <-: OrderCycleStarted())
case Do.KeepWaiting => Nil

case Some(maybeRecalculatedCycleState) =>
nextCycleStateToEvent(maybeRecalculatedCycleState, order))
case Do.StartCycle(skipped) =>
(order.id <-: OrderCycleStarted(skipped)) :: Nil

case Do.ChangeCycleState(cycleState) =>
(order.id <-: OrderCyclingPrepared(cycleState)) :: Nil

case Do.EndCycling =>
endCycling(order))
.getOrElse:
Right(Nil)

private def nextCycleStateToEvent(cycleState: Option[CycleState], order: Order[Order.State])
: List[KeyedEvent[OrderActorEvent]] =
val event = cycleState match
case Some(cycleState) => OrderCyclingPrepared(cycleState)
case None => OrderMoved(order.position.increment)
(order.id <-: event) :: Nil
private def endCycling(order: Order[Ready | BetweenCycles]): List[KeyedEvent[OrderMoved]] =
(order.id <-: OrderMoved(order.position.increment)) :: Nil

override def onReturnFromSubworkflow(instr: Instr, order: Order[Order.State], state: StateView)
override def onReturnFromSubworkflow(instr: Cycle, order: Order[Order.State], state: StateView)
: Checked[List[KeyedEvent[OrderActorEvent]]] =
val checkedKeyedEvent = for
calculator <- toScheduleCalculator(order, instr, state)
Expand Down Expand Up @@ -105,7 +109,7 @@ extends EventInstructionExecutor:
order: Order[Order.State],
calculator: OrderObstacleCalculator) =
order.state match
case Order.BetweenCycles(Some(cycleState: CycleState)) if clock.now() < cycleState.next =>
case BetweenCycles(Some(cycleState: CycleState)) if clock.now() < cycleState.next =>
Right(Set(WaitingForOtherTime(cycleState.next)))

case _ => super.toObstacles(order, calculator)
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import java.time.{LocalDateTime, ZoneId}
import js7.base.problem.Checked
import js7.base.time.AdmissionTimeSchemeForJavaTime.*
import js7.base.time.JavaTimestamp.specific.*
import js7.base.time.ScalaTime.*
import js7.base.time.{JavaTimestamp, Timestamp}
import js7.base.utils.ScalaUtils.syntax.*
import js7.data.execution.workflow.instructions.ScheduleCalculator.*
import js7.data.order.CycleState
import js7.data.workflow.instructions.Schedule
import js7.data.workflow.instructions.Schedule.{Continuous, Periodic, Ticking}
Expand All @@ -31,18 +33,31 @@ extends ScheduleSimulator:
index = if periodChanges then 1 else cycleState.index + 1,
next = next)

/**
* If it is to late for next in cycleState, then calculate a new CycleState.
* @return Right(None) iff `cycleState` is still valid
* Right(Some(None)) iff `Cycle` has been finished
*/
def maybeRecalcCycleState(now: Timestamp, cycleState: CycleState)
: Checked[Option[Option[CycleState]]] =
/** Call this just before the next scheduled cycle should start.
* @return What to do
*/
def onNextCycleIsDue(cycleState: CycleState, now: Timestamp): Checked[Do] =
for scheme <- schedule.schemes.checked(cycleState.schemeIndex) yield
!scheme.admissionTimeScheme.isPermitted(now.max(cycleState.next), zone, dateOffset) ?
nextCycleState(cycleState, now)
val skipped = scheme.repeat match
case Ticking(tickDuration) =>
// When at start of a cycle ticks have been missed, we must adjust cycleState.next.
val skippedTicks = (now - cycleState.next).toMillis / tickDuration.toMillis
(skippedTicks max 0) * tickDuration
case _ =>
ZeroDuration
val next = cycleState.next + skipped

if now < next then
Do.KeepWaiting
else if scheme.admissionTimeScheme.isPermitted(now, zone, dateOffset) then
Do.StartCycle(skipped.isPositive ? skipped)
else
nextCycleState(cycleState, now) match
case None => Do.EndCycling
case Some(cs) => Do.ChangeCycleState(cs)

/** Returns schemeIndex and Timestamp. */
/** @return next (schemeIndex, periodIndex, next: Timestamp, tickingSkipped: FiniteDuration).
*/
private def nextCycle(now: Timestamp, cycleState: CycleState): Option[(Int, Int, Timestamp)] =
schedule.schemes.view.zipWithIndex
.flatMap: (scheme, schemeIndex) =>
Expand Down Expand Up @@ -125,3 +140,10 @@ object ScheduleCalculator:
onlyOnePeriod: Boolean = false)
: Checked[ScheduleCalculator] =
Right(new ScheduleCalculator(schedule, zone, dateOffset, onlyOnePeriod))


enum Do:
case KeepWaiting
case StartCycle(skipped: Option[FiniteDuration] = None)
case ChangeCycleState(cycleState: CycleState)
case EndCycling
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import js7.base.time.JavaTimestamp.local
import js7.base.time.ScalaTime.*
import js7.base.time.TimestampForTests.ts
import js7.base.time.{AdmissionTimeScheme, DailyPeriod, TimeInterval, Timestamp}
import js7.data.execution.workflow.instructions.ScheduleCalculator.Do
import js7.data.order.CycleState
import js7.data.workflow.instructions.Schedule
import js7.data.workflow.instructions.Schedule.{Periodic, Scheme, Ticking}
Expand Down Expand Up @@ -121,6 +122,10 @@ final class ScheduleCalculatorTest extends OurTestSuite, ScheduleTester:
"now < first" in:
val cs = calculator.nextCycleState(initialCycleState, ts"2021-10-01T11:00:00Z").get
assert(cs == cycleState0.copy(next = ts"2021-10-01T12:00:00Z", index = 1))
assert(calculator.onNextCycleIsDue(cs, now = ts"2021-10-01T11:59:59Z") ==
Right(Do.KeepWaiting))
assert(calculator.onNextCycleIsDue(cs, now = ts"2021-10-01T12:00:00Z") ==
Right(Do.StartCycle()))

"now == first" in:
val cs = calculator.nextCycleState(initialCycleState, ts"2021-10-01T12:00:00Z").get
Expand Down Expand Up @@ -183,6 +188,36 @@ final class ScheduleCalculatorTest extends OurTestSuite, ScheduleTester:
val cs = calculator.nextCycleState(last, ts"2021-10-01T13:00:00Z").get
assert(cs == cycleState0.copy(next = ts"2021-10-01T15:07:00Z", periodIndex = 0, index = 1))
}

"onNextCycleIsDue" - {
val cs = cycleState0.copy(next = ts"2021-10-01T12:15:00Z", index = 1)

"Too early OrderCycleStarted" in :
assert(calculator.onNextCycleIsDue(cs, ts"2021-10-01T12:10:00Z") ==
Right(Do.KeepWaiting))

"Just in time OrderCycleStarted" in :
assert(calculator.onNextCycleIsDue(cs, ts"2021-10-01T12:15:00Z") ==
Right(Do.StartCycle()))

"Late OrderCycleStarted" in :
assert(calculator.onNextCycleIsDue(cs, ts"2021-10-01T12:29:59Z") ==
Right(Do.StartCycle(None)))

"Late OrderCycleStarted, missing one tick" in :
assert(calculator.onNextCycleIsDue(cs, ts"2021-10-01T12:30:00Z") ==
Right(Do.StartCycle(skipped = Some(15.minute))))

"Late OrderCycleStarted, missing five ticks" in :
assert(calculator.onNextCycleIsDue(cs, ts"2021-10-01T12:45:00Z") ==
Right(Do.StartCycle(skipped = Some(2 * 15.minutes))))

"End of Ticking period" in :
assert(calculator.onNextCycleIsDue(cs, ts"2021-10-01T13:00:00Z") ==
Right(Do.ChangeCycleState(cs.copy(
periodIndex = 0,
next = ts"2021-10-01T15:07:00Z"))))
}
}
}

Expand Down
7 changes: 4 additions & 3 deletions js7-data/shared/src/main/scala/js7/data/order/Order.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import js7.base.circeutils.CirceUtils.{deriveCodecWithDefaults, deriveConfigured
import js7.base.circeutils.typed.{Subtype, TypedJsonCodec}
import js7.base.problem.Checked.{CheckedOption, Ops}
import js7.base.problem.{Checked, Problem}
import js7.base.time.ScalaTime.ZeroDuration
import js7.base.time.Timestamp
import js7.base.utils.Assertions.assertThat
import js7.base.utils.ScalaUtils.*
Expand Down Expand Up @@ -554,13 +555,13 @@ extends
copy(
state = BetweenCycles(Some(cycleState))))

case OrderCycleStarted() =>
case OrderCycleStarted(maybeSkipped) =>
state match
case BetweenCycles(Some(cycleState)) =>
val branchId = BranchId.cycle(
cycleState.copy(
next = cycleState.next))
check((isDetachedOrAttached) & !isSuspendedOrStopped,
next = cycleState.next + maybeSkipped.getOrElse(ZeroDuration)))
check(isDetachedOrAttached & !isSuspendedOrStopped,
withPosition(position / branchId % 0)
.copy(
state = Ready))
Expand Down
14 changes: 11 additions & 3 deletions js7-data/shared/src/main/scala/js7/data/order/OrderEvent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package js7.data.order

import cats.syntax.flatMap.*
import cats.syntax.traverse.*
import io.circe
import io.circe.generic.semiauto.{deriveCodec, deriveDecoder, deriveEncoder}
import io.circe.syntax.EncoderOps
import io.circe.{Codec, Decoder, Encoder, JsonObject}
import js7.base.circeutils.CirceUtils
import js7.base.circeutils.CirceUtils.{RichCirceObjectCodec, deriveCodecWithDefaults, deriveConfiguredCodec, deriveRenamingCodec, deriveRenamingDecoder}
import js7.base.circeutils.ScalaJsonCodecs.{FiniteDurationJsonDecoder, FiniteDurationJsonEncoder}
import js7.base.circeutils.typed.{Subtype, TypedJsonCodec}
import js7.base.io.process.{Stderr, Stdout, StdoutOrStderr}
import js7.base.problem.{Checked, Problem}
import js7.base.time.ScalaTime.*
import js7.base.time.Timestamp
import js7.base.utils.Big
import js7.base.utils.Collections.implicits.RichIterable
Expand All @@ -35,6 +36,7 @@ import js7.data.workflow.position.{BranchPath, Position, PositionOrLabel, Workfl
import js7.data.workflow.{WorkflowId, WorkflowPath}
import org.jetbrains.annotations.TestOnly
import scala.annotation.nowarn
import scala.concurrent.duration.FiniteDuration
import scala.language.implicitConversions

/**
Expand Down Expand Up @@ -756,8 +758,14 @@ object OrderEvent extends Event.CompanionForKey[OrderId, OrderEvent]:
extends OrderCycleEvent


final case class OrderCycleStarted()
extends OrderCycleEvent
/**
* @param skipped Only when Ticking ticks have been skipped.
* Must be a multiple of the tick interval,
* will be added to `next`.
*/
final case class OrderCycleStarted(skipped: Option[FiniteDuration] = None)
extends OrderCycleEvent:
override def toString = s"OrderCycleStarted${skipped.fold("")(o => s"(skipped=${o.pretty})")}"


final case class OrderCycleFinished(cycleState: Option[CycleState])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,12 @@ final class OrderEventTest extends OurTestSuite:
"TYPE": "OrderCycleStarted"
}""")

testJson[OrderEvent](OrderCycleStarted(Some(10.s)), json"""
{
"TYPE": "OrderCycleStarted",
"skipped": 10
}""")

"OrderCycleFinished" in:
testJson[OrderEvent](OrderCycleFinished(Some(CycleState.empty)), json"""
{
Expand Down
4 changes: 2 additions & 2 deletions js7-tests/src/test/scala/js7/tests/CycleTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,10 @@ with ControllerAgentForScalaTest with ScheduleTester:
t("05:00:00") -> OrderCycleStarted(),
t("05:00:00") -> OrderCycleFinished(Some(cycleState(7, next = t("06:00:00")))),
// delay 2h
t("07:00:00") -> OrderCycleStarted(),
t("07:00:00") -> OrderCycleStarted(skipped = Some(1.h)),
t("07:00:00") -> OrderCycleFinished(Some(cycleState(8, next = t("08:00:00")))),
// delay 3h
t("10:00:00") -> OrderCycleStarted(),
t("10:00:00") -> OrderCycleStarted(skipped = Some(2.h)),
t("10:00:00") -> OrderCycleFinished(Some(cycleState(9, next = t("11:00:00"))))))

"Break" - {
Expand Down

0 comments on commit dbc1943

Please sign in to comment.