From 25f9decdfa12a3aba601d52fcc093813eb169fdc Mon Sep 17 00:00:00 2001 From: Joacim Zschimmer Date: Wed, 18 Dec 2024 09:02:30 +0100 Subject: [PATCH] JS-2149 Sleep instruction --- .../agent/scheduler/order/OrderActor.scala | 9 +- .../scala/js7/base/utils/BigDecimalTest.scala | 8 + .../InstructionExecutorService.scala | 1 + .../workflow/instructions/SleepExecutor.scala | 62 +++++ .../instructions/SleepExecutorTest.scala | 13 ++ .../src/main/scala/js7/data/order/Order.scala | 28 ++- .../scala/js7/data/order/OrderEvent.scala | 5 + .../src/main/scala/js7/data/value/Value.scala | 12 +- .../workflow/instructions/Instructions.scala | 1 + .../data/workflow/instructions/Sleep.scala | 20 ++ .../scala/js7/data/order/OrderEventTest.scala | 8 + .../test/scala/js7/data/order/OrderTest.scala | 13 +- .../src/test/scala/js7/tests/SleepTest.scala | 216 ++++++++++++++++++ 13 files changed, 383 insertions(+), 13 deletions(-) create mode 100644 js7-base/shared/src/test/scala/js7/base/utils/BigDecimalTest.scala create mode 100644 js7-data/jvm/src/main/scala/js7/data/execution/workflow/instructions/SleepExecutor.scala create mode 100644 js7-data/jvm/src/test/scala/js7/data/execution/workflow/instructions/SleepExecutorTest.scala create mode 100644 js7-data/shared/src/main/scala/js7/data/workflow/instructions/Sleep.scala create mode 100644 js7-tests/src/test/scala/js7/tests/SleepTest.scala diff --git a/js7-agent/src/main/scala/js7/agent/scheduler/order/OrderActor.scala b/js7-agent/src/main/scala/js7/agent/scheduler/order/OrderActor.scala index 3800b8c981..d367f13fe8 100644 --- a/js7-agent/src/main/scala/js7/agent/scheduler/order/OrderActor.scala +++ b/js7-agent/src/main/scala/js7/agent/scheduler/order/OrderActor.scala @@ -24,8 +24,8 @@ import js7.subagent.director.SubagentKeeper import org.apache.pekko.actor.{ActorRef, DeadLetterSuppression, Props, Status} import org.apache.pekko.pattern.pipe import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success} import scala.language.unsafeNulls +import scala.util.{Failure, Success} /** * @author Joacim Zschimmer @@ -105,7 +105,7 @@ extends KeyedJournalingActor[AgentState, OrderEvent]: private def processingKilled: Receive = receiveEvent orElse receiveCommand orElse receiveTerminate - private def delayedAfterError: Receive = + private def delayed: Receive = startable orElse receiveCommand orElse receiveTerminate private def startable: Receive = @@ -216,8 +216,9 @@ extends KeyedJournalingActor[AgentState, OrderEvent]: case _: Order.Processing => become("processing")(wrap(processing)) case _: Order.Processed => become("processed")(wrap(processed)) case _: Order.ProcessingKilled => become("processingKilled")(wrap(processingKilled)) - case _: Order.DelayingRetry => become("delayingRetry")(wrap(delayedAfterError)) - case _: Order.DelayedAfterError => become("delayedAfterError")(wrap(delayedAfterError)) + case _: Order.DelayingRetry => become("delayingRetry")(wrap(delayed)) + case _: Order.DelayedAfterError => become("delayed")(wrap(delayed)) + case _: Order.Sleeping => become("sleeping")(wrap(delayed)) case _: Order.Forked => become("forked")(wrap(standard)) case _: Order.BetweenCycles => become("forked")(wrap(standard)) case _: Order.Failed => become("failed")(wrap(standard)) diff --git a/js7-base/shared/src/test/scala/js7/base/utils/BigDecimalTest.scala b/js7-base/shared/src/test/scala/js7/base/utils/BigDecimalTest.scala new file mode 100644 index 0000000000..3ba81a6e3b --- /dev/null +++ b/js7-base/shared/src/test/scala/js7/base/utils/BigDecimalTest.scala @@ -0,0 +1,8 @@ +package js7.base.utils + +import js7.base.test.OurTestSuite + +final class BigDecimalTest extends OurTestSuite: + + "toLong does not convert numerically (but bitwise)" in: + assert((BigDecimal(Long.MaxValue) + 1).toLong == Long.MinValue) diff --git a/js7-data/jvm/src/main/scala/js7/data/execution/workflow/instructions/InstructionExecutorService.scala b/js7-data/jvm/src/main/scala/js7/data/execution/workflow/instructions/InstructionExecutorService.scala index 7f3b0fbb2f..93fb45f07f 100644 --- a/js7-data/jvm/src/main/scala/js7/data/execution/workflow/instructions/InstructionExecutorService.scala +++ b/js7-data/jvm/src/main/scala/js7/data/execution/workflow/instructions/InstructionExecutorService.scala @@ -40,6 +40,7 @@ final class InstructionExecutorService(val clock: WallClock): StopExecutor(this), BreakOrderExecutor(this), CycleExecutor(this), + SleepExecutor(this), BreakExecutor(this) ).toKeyedMap(_.instructionClass: Class[? <: Instruction])) diff --git a/js7-data/jvm/src/main/scala/js7/data/execution/workflow/instructions/SleepExecutor.scala b/js7-data/jvm/src/main/scala/js7/data/execution/workflow/instructions/SleepExecutor.scala new file mode 100644 index 0000000000..9367d5b147 --- /dev/null +++ b/js7-data/jvm/src/main/scala/js7/data/execution/workflow/instructions/SleepExecutor.scala @@ -0,0 +1,62 @@ +package js7.data.execution.workflow.instructions + +import js7.base.problem.Checked +import js7.base.time.ScalaTime.* +import js7.base.utils.ScalaUtils.syntax.RichBoolean +import js7.data.event.KeyedEvent +import js7.data.execution.workflow.instructions.SleepExecutor.* +import js7.data.order.OrderEvent.{OrderMoved, OrderSleeping, OrderStarted} +import js7.data.order.OrderObstacle.WaitingForOtherTime +import js7.data.order.{Order, OrderObstacleCalculator} +import js7.data.state.StateView +import js7.data.value.NumberValue +import js7.data.workflow.instructions.Sleep +import scala.concurrent.duration.* + +private[instructions] final class SleepExecutor(protected val service: InstructionExecutorService) +extends EventInstructionExecutor: + + type Instr = Sleep + val instructionClass = classOf[Sleep] + + def toEvents(instr: Sleep, order: Order[Order.State], state: StateView) + : Checked[List[KeyedEvent[OrderStarted | OrderSleeping | OrderMoved]]] = + start(order).getOrElse: + order.ifState[Order.Ready].map: order => + for + scope <- state.toImpureOrderExecutingScope(order, clock.now()) + value <- instr.duration.eval(scope).map(_.missingTo(NumberValue.Zero)) + number <- value.toNumberValue + duration = bigDecimalSecondsToDuration(number.number) + yield + if duration.isPositive then + (order.id <-: OrderSleeping(clock.now() + duration)) :: Nil + else + (order.id <-: OrderMoved(order.position.increment)) :: Nil + .orElse: + order.ifState[Order.Sleeping].map: order => + Right: + order.state.until <= clock.now() thenList: + order.id <-: OrderMoved(order.position.increment) + .getOrElse: + Right(Nil) + + override def toObstacles(order: Order[Order.State], calculator: OrderObstacleCalculator) = + order.state match + case Order.Sleeping(until) => + Right(Set(WaitingForOtherTime(until))) + + case _ => + super.toObstacles(order, calculator) + + +object SleepExecutor: + + private[instructions] def bigDecimalSecondsToDuration(number: BigDecimal): FiniteDuration = + val nanos = number * 1_000_000_000 + if nanos > Long.MaxValue then + 1.h * 24 * 365 * 100 + else if nanos < 0 then + ZeroDuration + else + nanos.toLong.ns diff --git a/js7-data/jvm/src/test/scala/js7/data/execution/workflow/instructions/SleepExecutorTest.scala b/js7-data/jvm/src/test/scala/js7/data/execution/workflow/instructions/SleepExecutorTest.scala new file mode 100644 index 0000000000..9a17bd1d76 --- /dev/null +++ b/js7-data/jvm/src/test/scala/js7/data/execution/workflow/instructions/SleepExecutorTest.scala @@ -0,0 +1,13 @@ +package js7.data.execution.workflow.instructions + +import js7.base.test.OurTestSuite +import js7.base.time.ScalaTime.* +import js7.data.execution.workflow.instructions.SleepExecutor.bigDecimalSecondsToDuration +import scala.concurrent.duration.FiniteDuration + +final class SleepExecutorTest extends OurTestSuite: + + "bigDecimalSecondsToDuration" in: + assert(bigDecimalSecondsToDuration(BigDecimal(Long.MaxValue) + 1) == 1.h * 24 * 365 * 100) + assert(bigDecimalSecondsToDuration(-1) == 0.s) + assert(bigDecimalSecondsToDuration(1.234567890) == 1234567890.ns) diff --git a/js7-data/shared/src/main/scala/js7/data/order/Order.scala b/js7-data/shared/src/main/scala/js7/data/order/Order.scala index 83e6c2b786..70aca54af0 100644 --- a/js7-data/shared/src/main/scala/js7/data/order/Order.scala +++ b/js7-data/shared/src/main/scala/js7/data/order/Order.scala @@ -22,7 +22,7 @@ import js7.data.command.{CancellationMode, SuspensionMode} import js7.data.event.EventDrivenState.EventNotApplicableProblem import js7.data.job.JobKey import js7.data.order.Order.* -import js7.data.order.OrderEvent.* +import js7.data.order.OrderEvent.{OrderMoved, *} import js7.data.orderwatch.{ExternalOrderKey, ExternalOrderName, OrderWatchPath} import js7.data.plan.PlanId import js7.data.subagent.{SubagentBundleId, SubagentId} @@ -221,7 +221,7 @@ extends case OrderAwoke => check( - (isState[DelayingRetry] || isState[DelayedAfterError]) + (isState[Sleeping] || isState[DelayingRetry] || isState[DelayedAfterError]) && !isSuspendedOrStopped && isDetachedOrAttached, copy(state = Ready)) @@ -239,8 +239,9 @@ extends historicOutcomes = historicOutcomes :+ HistoricOutcome(position, outcome))) case OrderMoved(to, _) => - check((isState[IsFreshOrReady] || isState[Processed] || isState[BetweenCycles]) - && isDetachedOrAttached, + check( + (isState[IsFreshOrReady] || isState[Processed] || isState[BetweenCycles] || isState[Sleeping]) + && isDetachedOrAttached, withPosition(to).copy( isResumed = false, state = if isState[Fresh] then state else Ready)) @@ -576,6 +577,11 @@ extends .copy( state = BetweenCycles(cycleState)) + case OrderSleeping(until) => + check(isState[Ready] && isDetachedOrAttached, + copy( + state = Sleeping(until))) + case OrderTransferred(workflowPosition) => if isDetached then Right(copy(workflowPosition = workflowPosition)) @@ -769,6 +775,7 @@ extends isState[BetweenCycles] || isState[DelayingRetry] || isState[DelayedAfterError] + || isState[Sleeping] || (isState[Fresh] && maybeDelayedUntil.isDefined) private def isMarkable = @@ -1077,6 +1084,7 @@ object Order: Subtype(Cancelled), Subtype(Deleted), Subtype(deriveCodec[Prompting]), + Subtype(deriveCodec[Sleeping]), Subtype(deriveCodec[Broken])) sealed trait IsDetachable extends State: @@ -1262,6 +1270,18 @@ object Order: Right: OrderGoes :: OrderCycleStarted() :: Nil + + final case class Sleeping(until: Timestamp) + extends IsStarted, IsDetachable, IsGoCommandable, IsResettable, IsTransferable: + type Self = Sleeping + + override private[Order] def maybeDelayedUntil = Some(until) + + def go(order: Order[Sleeping]): Right[Problem, List[OrderGoes | OrderAwoke | OrderMoved]] = + Right: + List(OrderGoes, OrderAwoke, OrderMoved(order.position.increment)) + + type Failed = Failed.type case object Failed extends IsStarted, IsFailed, IsTransferable diff --git a/js7-data/shared/src/main/scala/js7/data/order/OrderEvent.scala b/js7-data/shared/src/main/scala/js7/data/order/OrderEvent.scala index 65f16d1a6f..29e5c6d53b 100644 --- a/js7-data/shared/src/main/scala/js7/data/order/OrderEvent.scala +++ b/js7-data/shared/src/main/scala/js7/data/order/OrderEvent.scala @@ -772,6 +772,10 @@ object OrderEvent extends Event.CompanionForKey[OrderId, OrderEvent]: extends OrderCycleEvent + final case class OrderSleeping(until: Timestamp) + extends OrderActorEvent + + final case class OrderTransferred(workflowPosition: WorkflowPosition) extends OrderActorEvent @@ -849,5 +853,6 @@ object OrderEvent extends Event.CompanionForKey[OrderId, OrderEvent]: Subtype(deriveCodec[OrderCyclingPrepared]), Subtype(deriveCodecWithDefaults[OrderCycleStarted]), Subtype(deriveCodec[OrderCycleFinished]), + Subtype(deriveCodec[OrderSleeping]), Subtype(deriveCodec[OrderTransferred]), Subtype(deriveCodec[OrderPlanAttached])) diff --git a/js7-data/shared/src/main/scala/js7/data/value/Value.scala b/js7-data/shared/src/main/scala/js7/data/value/Value.scala index 7e18a729e1..57f5bf3399 100644 --- a/js7-data/shared/src/main/scala/js7/data/value/Value.scala +++ b/js7-data/shared/src/main/scala/js7/data/value/Value.scala @@ -47,19 +47,25 @@ sealed trait Value: final def maybe: Option[Value] = (this != MissingValue) ? this - def as[V <: Value](using V: Value.Companion[V]): Checked[V] = + final def as[V <: Value](using V: Value.Companion[V]): Checked[V] = if valueType is V then Right(this.asInstanceOf[V]) else Left(UnexpectedValueTypeProblem(V, this)) - def asMissingOr[V <: Value](using V: Value.Companion[V]): Checked[V | MissingValue] = + final def missingTo(value: Value): Value = + if this == MissingValue then + value + else + this + + final def asMissingOr[V <: Value](using V: Value.Companion[V]): Checked[V | MissingValue] = this match case MissingValue => Right(MissingValue) case _ => as[V] /** Similar to as[V], returns MissingValue as None. */ - def asMaybe[V <: Value](using V: Value.Companion[V]): Checked[Option[V]] = + final def asMaybe[V <: Value](using V: Value.Companion[V]): Checked[Option[V]] = this match case MissingValue => Right(None) case _ => as[V].map(Some(_)) diff --git a/js7-data/shared/src/main/scala/js7/data/workflow/instructions/Instructions.scala b/js7-data/shared/src/main/scala/js7/data/workflow/instructions/Instructions.scala index 99cc8232f0..0ba1691db0 100644 --- a/js7-data/shared/src/main/scala/js7/data/workflow/instructions/Instructions.scala +++ b/js7-data/shared/src/main/scala/js7/data/workflow/instructions/Instructions.scala @@ -31,6 +31,7 @@ object Instructions: Subtype[AddOrder], Subtype[Options], Subtype[Stop], + Subtype[Sleep], Subtype[BreakOrder], Subtype[EmptyInstruction], Subtype[Gap]) diff --git a/js7-data/shared/src/main/scala/js7/data/workflow/instructions/Sleep.scala b/js7-data/shared/src/main/scala/js7/data/workflow/instructions/Sleep.scala new file mode 100644 index 0000000000..c8ed203ffa --- /dev/null +++ b/js7-data/shared/src/main/scala/js7/data/workflow/instructions/Sleep.scala @@ -0,0 +1,20 @@ +package js7.data.workflow.instructions + +import io.circe.Codec +import io.circe.generic.semiauto.deriveCodec +import js7.data.source.SourcePos +import js7.data.value.expression.Expression +import js7.data.workflow.Instruction + +final case class Sleep(duration: Expression, sourcePos: Option[SourcePos] = None) +extends Instruction.NoInstructionBlock: + + def withoutSourcePos: Sleep = + copy(sourcePos = None) + + override def toString: String = + s"sleep $duration$sourcePosToString" + + +object Sleep: + given Codec.AsObject[Sleep] = deriveCodec[Sleep] diff --git a/js7-data/shared/src/test/scala/js7/data/order/OrderEventTest.scala b/js7-data/shared/src/test/scala/js7/data/order/OrderEventTest.scala index 92a30894c0..74fa5ff53f 100644 --- a/js7-data/shared/src/test/scala/js7/data/order/OrderEventTest.scala +++ b/js7-data/shared/src/test/scala/js7/data/order/OrderEventTest.scala @@ -1012,6 +1012,14 @@ final class OrderEventTest extends OurTestSuite: "TYPE": "OrderPromptAnswered" }""") + "OrderSleeping" in: + testJson[OrderEvent](OrderSleeping(ts"2024-12-18T12:00:00Z"), + json""" + { + "TYPE": "OrderSleeping", + "until": 1734523200000 + }""") + "OrderTransferred" in: testJson[OrderEvent](OrderTransferred(WorkflowPath("WORKFLOW") ~ "v2" /: Position(7)), json""" diff --git a/js7-data/shared/src/test/scala/js7/data/order/OrderTest.scala b/js7-data/shared/src/test/scala/js7/data/order/OrderTest.scala index 087eb8c4c4..1c289b9374 100644 --- a/js7-data/shared/src/test/scala/js7/data/order/OrderTest.scala +++ b/js7-data/shared/src/test/scala/js7/data/order/OrderTest.scala @@ -16,8 +16,8 @@ import js7.data.board.{BoardPath, Notice, NoticeId, NoticeV2_3} import js7.data.command.{CancellationMode, SuspensionMode} import js7.data.job.{InternalExecutable, JobKey} import js7.data.lock.LockPath -import js7.data.order.Order.{Attached, AttachedState, Attaching, BetweenCycles, Broken, Cancelled, DelayedAfterError, DelayingRetry, Deleted, Detaching, ExpectingNotice, ExpectingNotices, ExternalOrderLink, Failed, FailedInFork, FailedWhileFresh, Finished, Forked, Fresh, InapplicableOrderEventProblem, IsFreshOrReady, Processed, Processing, ProcessingKilled, Prompting, Ready, State, Stopped, StoppedWhileFresh, WaitingForLock} -import js7.data.order.OrderEvent.{LegacyOrderLockEvent, LockDemand, OrderAdded, OrderAttachable, OrderAttached, OrderAttachedToAgent, OrderAwoke, OrderBroken, OrderCancellationMarked, OrderCancellationMarkedOnAgent, OrderCancelled, OrderCatched, OrderCaught, OrderCoreEvent, OrderCycleFinished, OrderCycleStarted, OrderCyclingPrepared, OrderDeleted, OrderDeletionMarked, OrderDetachable, OrderDetached, OrderExternalVanished, OrderFailed, OrderFailedInFork, OrderFinished, OrderForked, OrderGoMarked, OrderGoes, OrderJoined, OrderLocksAcquired, OrderLocksQueued, OrderLocksReleased, OrderMoved, OrderNoticeAnnounced, OrderNoticeExpected, OrderNoticePosted, OrderNoticePostedV2_3, OrderNoticesConsumed, OrderNoticesConsumptionStarted, OrderNoticesExpected, OrderNoticesRead, OrderOrderAdded, OrderOutcomeAdded, OrderPlanAttached, OrderProcessed, OrderProcessingKilled, OrderProcessingStarted, OrderPromptAnswered, OrderPrompted, OrderResumed, OrderResumptionMarked, OrderRetrying, OrderStarted, OrderStateReset, OrderStickySubagentEntered, OrderStickySubagentLeaved, OrderStopped, OrderSuspended, OrderSuspensionMarked, OrderSuspensionMarkedOnAgent, OrderTransferred} +import js7.data.order.Order.{Attached, AttachedState, Attaching, BetweenCycles, Broken, Cancelled, DelayedAfterError, DelayingRetry, Deleted, Detaching, ExpectingNotice, ExpectingNotices, ExternalOrderLink, Failed, FailedInFork, FailedWhileFresh, Finished, Forked, Fresh, InapplicableOrderEventProblem, IsFreshOrReady, Processed, Processing, ProcessingKilled, Prompting, Ready, Sleeping, State, Stopped, StoppedWhileFresh, WaitingForLock} +import js7.data.order.OrderEvent.{LegacyOrderLockEvent, LockDemand, OrderAdded, OrderAttachable, OrderAttached, OrderAttachedToAgent, OrderAwoke, OrderBroken, OrderCancellationMarked, OrderCancellationMarkedOnAgent, OrderCancelled, OrderCatched, OrderCaught, OrderCoreEvent, OrderCycleFinished, OrderCycleStarted, OrderCyclingPrepared, OrderDeleted, OrderDeletionMarked, OrderDetachable, OrderDetached, OrderExternalVanished, OrderFailed, OrderFailedInFork, OrderFinished, OrderForked, OrderGoMarked, OrderGoes, OrderJoined, OrderLocksAcquired, OrderLocksQueued, OrderLocksReleased, OrderMoved, OrderNoticeAnnounced, OrderNoticeExpected, OrderNoticePosted, OrderNoticePostedV2_3, OrderNoticesConsumed, OrderNoticesConsumptionStarted, OrderNoticesExpected, OrderNoticesRead, OrderOrderAdded, OrderOutcomeAdded, OrderPlanAttached, OrderProcessed, OrderProcessingKilled, OrderProcessingStarted, OrderPromptAnswered, OrderPrompted, OrderResumed, OrderResumptionMarked, OrderRetrying, OrderSleeping, OrderStarted, OrderStateReset, OrderStickySubagentEntered, OrderStickySubagentLeaved, OrderStopped, OrderSuspended, OrderSuspensionMarked, OrderSuspensionMarkedOnAgent, OrderTransferred} import js7.data.orderwatch.{ExternalOrderName, OrderWatchPath} import js7.data.plan.PlanTemplateId import js7.data.subagent.{SubagentBundleId, SubagentId} @@ -376,6 +376,13 @@ final class OrderTest extends OurTestSuite: } }""") + "Sleeping" in: + testJson[State](Sleeping(ts"2024-12-18T12:00:00Z"), + json"""{ + "TYPE": "Sleeping", + "until": 1734523200000 + }""") + "Cancelled" in: testJson[State](Cancelled, json"""{ @@ -501,6 +508,7 @@ final class OrderTest extends OurTestSuite: OrderStickySubagentLeaved, OrderStickySubagentEntered(agentPath), + OrderSleeping(ts"2024-12-18T00:00:00Z"), OrderTransferred(workflowId /: Position(0)), OrderBroken(), @@ -593,6 +601,7 @@ final class OrderTest extends OurTestSuite: case (_: OrderOrderAdded , _ , _ , IsDetached ) => _.isInstanceOf[Ready] case (_: OrderStickySubagentEntered, IsSuspended(false), _ , IsDetached | IsAttached) => _.isInstanceOf[Ready] case (_: OrderOutcomeAdded , _ , _ , _ ) => _.isInstanceOf[Ready] + case (_: OrderSleeping , _ , _ , IsDetached | IsAttached) => _.isInstanceOf[Sleeping] case (_: OrderTransferred , _ , _ , IsDetached ) => _.isInstanceOf[Ready] case (_: OrderBroken , _ , _ , _ ) => _.isInstanceOf[Broken]) diff --git a/js7-tests/src/test/scala/js7/tests/SleepTest.scala b/js7-tests/src/test/scala/js7/tests/SleepTest.scala new file mode 100644 index 0000000000..53af073550 --- /dev/null +++ b/js7-tests/src/test/scala/js7/tests/SleepTest.scala @@ -0,0 +1,216 @@ +package js7.tests + +import cats.effect.unsafe.IORuntime +import js7.agent.RunningAgent +import js7.base.configutils.Configs.HoconStringInterpolator +import js7.base.test.OurTestSuite +import js7.base.thread.CatsBlocking.syntax.await +import js7.base.time.ScalaTime.* +import js7.base.time.TimestampForTests.ts +import js7.base.time.{TestAlarmClock, Timestamp} +import js7.base.utils.ScalaUtils.syntax.RichEither +import js7.controller.RunningController +import js7.data.agent.AgentPath +import js7.data.command.SuspensionMode +import js7.data.controller.ControllerCommand.{GoOrder, ResumeOrder, SuspendOrders} +import js7.data.order.OrderEvent.{OrderAdded, OrderAttachable, OrderAttached, OrderAwoke, OrderDeleted, OrderDetachable, OrderDetached, OrderFinished, OrderGoMarked, OrderGoes, OrderMoved, OrderProcessed, OrderProcessingStarted, OrderResumed, OrderSleeping, OrderStarted, OrderStateReset, OrderSuspended, OrderSuspensionMarked} +import js7.data.order.{FreshOrder, Order, OrderEvent, OrderId, OrderOutcome} +import js7.data.value.expression.ExpressionParser.expr +import js7.data.workflow.instructions.Sleep +import js7.data.workflow.position.Position +import js7.data.workflow.{Workflow, WorkflowPath} +import js7.tests.SleepTest.* +import js7.tests.jobs.EmptyJob +import js7.tests.testenv.ControllerAgentForScalaTest +import js7.tests.testenv.DirectoryProvider.toLocalSubagentId + +final class SleepTest extends OurTestSuite, ControllerAgentForScalaTest: + + override protected val controllerConfig = config""" + js7.auth.users.TEST-USER.permissions = [ UpdateItem ] + js7.journal.remove-obsolete-files = false + js7.controller.agent-driver.command-batch-delay = 0ms + js7.controller.agent-driver.event-buffer-delay = 0ms""" + + override protected def agentConfig = config""" + js7.job.execution.signed-script-injection-allowed = on""" + + private lazy val clock = TestAlarmClock(ts"2024-12-18T12:00:00Z") + + override protected def controllerTestWiring = RunningController.TestWiring( + alarmClock = Some(clock)) + + override protected def agentTestWiring = RunningAgent.TestWiring( + alarmClock = Some(clock)) + + protected def agentPaths = Seq(agentPath) + protected def items = Nil + + "Sleep at start, at Controller" in: + val eventId = eventWatch.resetLastWatchedEventId() + val workflow = Workflow.of: + Sleep(expr("3")) + withItem(workflow): workflow => + val orderId = OrderId("ORDER-JOBLESS") + controller.api.addOrder: + FreshOrder(orderId, workflow.path, deleteWhenTerminated = true) + .await(99.s).orThrow + + assert(controllerState.idToOrder(orderId).isState[Order.Sleeping]) + clock.tick(2.s) + assert(controllerState.idToOrder(orderId).isState[Order.Sleeping]) + clock.tick(1.s) + eventWatch.awaitNextKey[OrderFinished](orderId) + + assert(eventWatch.eventsByKey[OrderEvent](orderId) == Seq( + OrderAdded(workflow.id, deleteWhenTerminated = true), + OrderStarted, + OrderSleeping(until = ts"2024-12-18T12:00:03Z"), + OrderMoved(Position(1)), + OrderFinished(), + OrderDeleted)) + + "Sleep zero seconds" in: + clock.resetTo(ts"2024-12-18T12:00:00Z") + val eventId = eventWatch.resetLastWatchedEventId() + val workflow = Workflow.of( + Sleep(expr("0")), + Sleep(expr("-1")), + Sleep(expr("missing"))) + + withItem(workflow): workflow => + val orderId = OrderId("ORDER-ZERO") + controller.api.addOrder: + FreshOrder(orderId, workflow.path, deleteWhenTerminated = true) + .await(99.s).orThrow + + eventWatch.awaitNextKey[OrderFinished](orderId) + + assert(eventWatch.eventsByKey[OrderEvent](orderId) == Seq( + OrderAdded(workflow.id, deleteWhenTerminated = true), + OrderStarted, + OrderMoved(Position(1)), + OrderMoved(Position(2)), + OrderMoved(Position(3)), + OrderFinished(), + OrderDeleted)) + + "Sleep after start, at Agent" in: + clock.resetTo(ts"2024-12-18T13:00:00Z") + val eventId = eventWatch.resetLastWatchedEventId() + val workflow = Workflow.of( + EmptyJob.execute(agentPath), + Sleep(expr("3"))) + withItem(workflow): workflow => + val orderId = OrderId("ORDER-WITH-JOB") + controller.api.addOrder: + FreshOrder(orderId, workflow.path, deleteWhenTerminated = true) + .await(99.s).orThrow + + clock.tick() + eventWatch.awaitNextKey[OrderSleeping](orderId) + clock.tick(2.s) + assert(controllerState.idToOrder(orderId).isState[Order.Sleeping]) + clock.tick(1.s) + eventWatch.awaitNextKey[OrderFinished](orderId) + + assert(eventWatch.eventsByKey[OrderEvent](orderId) == Seq( + OrderAdded(workflow.id, deleteWhenTerminated = true), + OrderAttachable(agentPath), + OrderAttached(agentPath), + OrderStarted, + OrderProcessingStarted(subagentId), + OrderProcessed(OrderOutcome.succeeded), + OrderMoved(Position(1)), + OrderSleeping(until = ts"2024-12-18T13:00:03Z"), + OrderMoved(Position(2)), + OrderDetachable, + OrderDetached, + OrderFinished(), + OrderDeleted)) + + "GoOrder" in: + clock.resetTo(ts"2024-12-18T14:00:00Z") + val eventId = eventWatch.resetLastWatchedEventId() + val workflow = Workflow.of( + EmptyJob.execute(agentPath), + Sleep(expr("3"))) + withItem(workflow): workflow => + val orderId = OrderId("ORDER-GO") + controller.api.addOrder: + FreshOrder(orderId, workflow.path, deleteWhenTerminated = true) + .await(99.s).orThrow + + clock.tick() + eventWatch.awaitNextKey[OrderSleeping](orderId) + clock.tick(1.s) + execCmd: + GoOrder(orderId, Position(1)) + eventWatch.awaitNextKey[OrderFinished](orderId) + + assert(eventWatch.eventsByKey[OrderEvent](orderId) == Seq( + OrderAdded(workflow.id, deleteWhenTerminated = true), + OrderAttachable(agentPath), + OrderAttached(agentPath), + OrderStarted, + OrderProcessingStarted(subagentId), + OrderProcessed(OrderOutcome.succeeded), + OrderMoved(Position(1)), + OrderSleeping(until = ts"2024-12-18T14:00:03Z"), + OrderGoMarked(Position(1)), + OrderGoes, + OrderAwoke, + OrderMoved(Position(2)), + OrderDetachable, + OrderDetached, + OrderFinished(), + OrderDeleted)) + + "SuspendOrders with reset" in: + clock.resetTo(ts"2024-12-18T15:00:00Z") + val eventId = eventWatch.resetLastWatchedEventId() + val workflow = Workflow.of( + EmptyJob.execute(agentPath), + Sleep(expr("3"))) + withItem(workflow): workflow => + val orderId = OrderId("ORDER-SUSPEND") + controller.api.addOrder: + FreshOrder(orderId, workflow.path, deleteWhenTerminated = true) + .await(99.s).orThrow + + clock.tick() + eventWatch.awaitNextKey[OrderSleeping](orderId) + clock.tick(1.s) + execCmd: + SuspendOrders(orderId :: Nil, SuspensionMode(resetState = true)) + eventWatch.awaitNextKey[OrderSuspended](orderId) + + execCmd: + ResumeOrder(orderId) + clock.tick(3.s) + eventWatch.awaitNextKey[OrderFinished](orderId) + + assert(eventWatch.eventsByKey[OrderEvent](orderId) == Seq( + OrderAdded(workflow.id, deleteWhenTerminated = true), + OrderAttachable(agentPath), + OrderAttached(agentPath), + OrderStarted, + OrderProcessingStarted(subagentId), + OrderProcessed(OrderOutcome.succeeded), + OrderMoved(Position(1)), + OrderSleeping(until = ts"2024-12-18T15:00:03Z"), + OrderSuspensionMarked(SuspensionMode(resetState = true)), + OrderDetachable, + OrderDetached, + OrderStateReset, + OrderSuspended, + OrderResumed(), + OrderSleeping(until = ts"2024-12-18T15:00:04Z"), + OrderMoved(Position(2)), + OrderFinished(), + OrderDeleted)) + + +object SleepTest: + private val agentPath = AgentPath("AGENT") + private val subagentId = toLocalSubagentId(agentPath)