Skip to content

Commit

Permalink
JS-2149 Sleep instruction
Browse files Browse the repository at this point in the history
  • Loading branch information
Zschimmer committed Dec 18, 2024
1 parent b55cef0 commit 25f9dec
Show file tree
Hide file tree
Showing 13 changed files with 383 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import js7.subagent.director.SubagentKeeper
import org.apache.pekko.actor.{ActorRef, DeadLetterSuppression, Props, Status}
import org.apache.pekko.pattern.pipe
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.language.unsafeNulls
import scala.util.{Failure, Success}

/**
* @author Joacim Zschimmer
Expand Down Expand Up @@ -105,7 +105,7 @@ extends KeyedJournalingActor[AgentState, OrderEvent]:
private def processingKilled: Receive =
receiveEvent orElse receiveCommand orElse receiveTerminate

private def delayedAfterError: Receive =
private def delayed: Receive =
startable orElse receiveCommand orElse receiveTerminate

private def startable: Receive =
Expand Down Expand Up @@ -216,8 +216,9 @@ extends KeyedJournalingActor[AgentState, OrderEvent]:
case _: Order.Processing => become("processing")(wrap(processing))
case _: Order.Processed => become("processed")(wrap(processed))
case _: Order.ProcessingKilled => become("processingKilled")(wrap(processingKilled))
case _: Order.DelayingRetry => become("delayingRetry")(wrap(delayedAfterError))
case _: Order.DelayedAfterError => become("delayedAfterError")(wrap(delayedAfterError))
case _: Order.DelayingRetry => become("delayingRetry")(wrap(delayed))
case _: Order.DelayedAfterError => become("delayed")(wrap(delayed))
case _: Order.Sleeping => become("sleeping")(wrap(delayed))
case _: Order.Forked => become("forked")(wrap(standard))
case _: Order.BetweenCycles => become("forked")(wrap(standard))
case _: Order.Failed => become("failed")(wrap(standard))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package js7.base.utils

import js7.base.test.OurTestSuite

final class BigDecimalTest extends OurTestSuite:

"toLong does not convert numerically (but bitwise)" in:
assert((BigDecimal(Long.MaxValue) + 1).toLong == Long.MinValue)
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ final class InstructionExecutorService(val clock: WallClock):
StopExecutor(this),
BreakOrderExecutor(this),
CycleExecutor(this),
SleepExecutor(this),
BreakExecutor(this)
).toKeyedMap(_.instructionClass: Class[? <: Instruction]))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package js7.data.execution.workflow.instructions

import js7.base.problem.Checked
import js7.base.time.ScalaTime.*
import js7.base.utils.ScalaUtils.syntax.RichBoolean
import js7.data.event.KeyedEvent
import js7.data.execution.workflow.instructions.SleepExecutor.*
import js7.data.order.OrderEvent.{OrderMoved, OrderSleeping, OrderStarted}
import js7.data.order.OrderObstacle.WaitingForOtherTime
import js7.data.order.{Order, OrderObstacleCalculator}
import js7.data.state.StateView
import js7.data.value.NumberValue
import js7.data.workflow.instructions.Sleep
import scala.concurrent.duration.*

private[instructions] final class SleepExecutor(protected val service: InstructionExecutorService)
extends EventInstructionExecutor:

type Instr = Sleep
val instructionClass = classOf[Sleep]

def toEvents(instr: Sleep, order: Order[Order.State], state: StateView)
: Checked[List[KeyedEvent[OrderStarted | OrderSleeping | OrderMoved]]] =
start(order).getOrElse:
order.ifState[Order.Ready].map: order =>
for
scope <- state.toImpureOrderExecutingScope(order, clock.now())
value <- instr.duration.eval(scope).map(_.missingTo(NumberValue.Zero))
number <- value.toNumberValue
duration = bigDecimalSecondsToDuration(number.number)
yield
if duration.isPositive then
(order.id <-: OrderSleeping(clock.now() + duration)) :: Nil
else
(order.id <-: OrderMoved(order.position.increment)) :: Nil
.orElse:
order.ifState[Order.Sleeping].map: order =>
Right:
order.state.until <= clock.now() thenList:
order.id <-: OrderMoved(order.position.increment)
.getOrElse:
Right(Nil)

override def toObstacles(order: Order[Order.State], calculator: OrderObstacleCalculator) =
order.state match
case Order.Sleeping(until) =>
Right(Set(WaitingForOtherTime(until)))

case _ =>
super.toObstacles(order, calculator)


object SleepExecutor:

private[instructions] def bigDecimalSecondsToDuration(number: BigDecimal): FiniteDuration =
val nanos = number * 1_000_000_000
if nanos > Long.MaxValue then
1.h * 24 * 365 * 100
else if nanos < 0 then
ZeroDuration
else
nanos.toLong.ns
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package js7.data.execution.workflow.instructions

import js7.base.test.OurTestSuite
import js7.base.time.ScalaTime.*
import js7.data.execution.workflow.instructions.SleepExecutor.bigDecimalSecondsToDuration
import scala.concurrent.duration.FiniteDuration

final class SleepExecutorTest extends OurTestSuite:

"bigDecimalSecondsToDuration" in:
assert(bigDecimalSecondsToDuration(BigDecimal(Long.MaxValue) + 1) == 1.h * 24 * 365 * 100)
assert(bigDecimalSecondsToDuration(-1) == 0.s)
assert(bigDecimalSecondsToDuration(1.234567890) == 1234567890.ns)
28 changes: 24 additions & 4 deletions js7-data/shared/src/main/scala/js7/data/order/Order.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import js7.data.command.{CancellationMode, SuspensionMode}
import js7.data.event.EventDrivenState.EventNotApplicableProblem
import js7.data.job.JobKey
import js7.data.order.Order.*
import js7.data.order.OrderEvent.*
import js7.data.order.OrderEvent.{OrderMoved, *}
import js7.data.orderwatch.{ExternalOrderKey, ExternalOrderName, OrderWatchPath}
import js7.data.plan.PlanId
import js7.data.subagent.{SubagentBundleId, SubagentId}
Expand Down Expand Up @@ -221,7 +221,7 @@ extends

case OrderAwoke =>
check(
(isState[DelayingRetry] || isState[DelayedAfterError])
(isState[Sleeping] || isState[DelayingRetry] || isState[DelayedAfterError])
&& !isSuspendedOrStopped
&& isDetachedOrAttached,
copy(state = Ready))
Expand All @@ -239,8 +239,9 @@ extends
historicOutcomes = historicOutcomes :+ HistoricOutcome(position, outcome)))

case OrderMoved(to, _) =>
check((isState[IsFreshOrReady] || isState[Processed] || isState[BetweenCycles])
&& isDetachedOrAttached,
check(
(isState[IsFreshOrReady] || isState[Processed] || isState[BetweenCycles] || isState[Sleeping])
&& isDetachedOrAttached,
withPosition(to).copy(
isResumed = false,
state = if isState[Fresh] then state else Ready))
Expand Down Expand Up @@ -576,6 +577,11 @@ extends
.copy(
state = BetweenCycles(cycleState))

case OrderSleeping(until) =>
check(isState[Ready] && isDetachedOrAttached,
copy(
state = Sleeping(until)))

case OrderTransferred(workflowPosition) =>
if isDetached then
Right(copy(workflowPosition = workflowPosition))
Expand Down Expand Up @@ -769,6 +775,7 @@ extends
isState[BetweenCycles]
|| isState[DelayingRetry]
|| isState[DelayedAfterError]
|| isState[Sleeping]
|| (isState[Fresh] && maybeDelayedUntil.isDefined)

private def isMarkable =
Expand Down Expand Up @@ -1077,6 +1084,7 @@ object Order:
Subtype(Cancelled),
Subtype(Deleted),
Subtype(deriveCodec[Prompting]),
Subtype(deriveCodec[Sleeping]),
Subtype(deriveCodec[Broken]))

sealed trait IsDetachable extends State:
Expand Down Expand Up @@ -1262,6 +1270,18 @@ object Order:
Right:
OrderGoes :: OrderCycleStarted() :: Nil


final case class Sleeping(until: Timestamp)
extends IsStarted, IsDetachable, IsGoCommandable, IsResettable, IsTransferable:
type Self = Sleeping

override private[Order] def maybeDelayedUntil = Some(until)

def go(order: Order[Sleeping]): Right[Problem, List[OrderGoes | OrderAwoke | OrderMoved]] =
Right:
List(OrderGoes, OrderAwoke, OrderMoved(order.position.increment))


type Failed = Failed.type
case object Failed extends IsStarted, IsFailed, IsTransferable

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,10 @@ object OrderEvent extends Event.CompanionForKey[OrderId, OrderEvent]:
extends OrderCycleEvent


final case class OrderSleeping(until: Timestamp)
extends OrderActorEvent


final case class OrderTransferred(workflowPosition: WorkflowPosition)
extends OrderActorEvent

Expand Down Expand Up @@ -849,5 +853,6 @@ object OrderEvent extends Event.CompanionForKey[OrderId, OrderEvent]:
Subtype(deriveCodec[OrderCyclingPrepared]),
Subtype(deriveCodecWithDefaults[OrderCycleStarted]),
Subtype(deriveCodec[OrderCycleFinished]),
Subtype(deriveCodec[OrderSleeping]),
Subtype(deriveCodec[OrderTransferred]),
Subtype(deriveCodec[OrderPlanAttached]))
12 changes: 9 additions & 3 deletions js7-data/shared/src/main/scala/js7/data/value/Value.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,25 @@ sealed trait Value:
final def maybe: Option[Value] =
(this != MissingValue) ? this

def as[V <: Value](using V: Value.Companion[V]): Checked[V] =
final def as[V <: Value](using V: Value.Companion[V]): Checked[V] =
if valueType is V then
Right(this.asInstanceOf[V])
else
Left(UnexpectedValueTypeProblem(V, this))

def asMissingOr[V <: Value](using V: Value.Companion[V]): Checked[V | MissingValue] =
final def missingTo(value: Value): Value =
if this == MissingValue then
value
else
this

final def asMissingOr[V <: Value](using V: Value.Companion[V]): Checked[V | MissingValue] =
this match
case MissingValue => Right(MissingValue)
case _ => as[V]

/** Similar to as[V], returns MissingValue as None. */
def asMaybe[V <: Value](using V: Value.Companion[V]): Checked[Option[V]] =
final def asMaybe[V <: Value](using V: Value.Companion[V]): Checked[Option[V]] =
this match
case MissingValue => Right(None)
case _ => as[V].map(Some(_))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ object Instructions:
Subtype[AddOrder],
Subtype[Options],
Subtype[Stop],
Subtype[Sleep],
Subtype[BreakOrder],
Subtype[EmptyInstruction],
Subtype[Gap])
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package js7.data.workflow.instructions

import io.circe.Codec
import io.circe.generic.semiauto.deriveCodec
import js7.data.source.SourcePos
import js7.data.value.expression.Expression
import js7.data.workflow.Instruction

final case class Sleep(duration: Expression, sourcePos: Option[SourcePos] = None)
extends Instruction.NoInstructionBlock:

def withoutSourcePos: Sleep =
copy(sourcePos = None)

override def toString: String =
s"sleep $duration$sourcePosToString"


object Sleep:
given Codec.AsObject[Sleep] = deriveCodec[Sleep]
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,14 @@ final class OrderEventTest extends OurTestSuite:
"TYPE": "OrderPromptAnswered"
}""")

"OrderSleeping" in:
testJson[OrderEvent](OrderSleeping(ts"2024-12-18T12:00:00Z"),
json"""
{
"TYPE": "OrderSleeping",
"until": 1734523200000
}""")

"OrderTransferred" in:
testJson[OrderEvent](OrderTransferred(WorkflowPath("WORKFLOW") ~ "v2" /: Position(7)),
json"""
Expand Down
13 changes: 11 additions & 2 deletions js7-data/shared/src/test/scala/js7/data/order/OrderTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import js7.data.board.{BoardPath, Notice, NoticeId, NoticeV2_3}
import js7.data.command.{CancellationMode, SuspensionMode}
import js7.data.job.{InternalExecutable, JobKey}
import js7.data.lock.LockPath
import js7.data.order.Order.{Attached, AttachedState, Attaching, BetweenCycles, Broken, Cancelled, DelayedAfterError, DelayingRetry, Deleted, Detaching, ExpectingNotice, ExpectingNotices, ExternalOrderLink, Failed, FailedInFork, FailedWhileFresh, Finished, Forked, Fresh, InapplicableOrderEventProblem, IsFreshOrReady, Processed, Processing, ProcessingKilled, Prompting, Ready, State, Stopped, StoppedWhileFresh, WaitingForLock}
import js7.data.order.OrderEvent.{LegacyOrderLockEvent, LockDemand, OrderAdded, OrderAttachable, OrderAttached, OrderAttachedToAgent, OrderAwoke, OrderBroken, OrderCancellationMarked, OrderCancellationMarkedOnAgent, OrderCancelled, OrderCatched, OrderCaught, OrderCoreEvent, OrderCycleFinished, OrderCycleStarted, OrderCyclingPrepared, OrderDeleted, OrderDeletionMarked, OrderDetachable, OrderDetached, OrderExternalVanished, OrderFailed, OrderFailedInFork, OrderFinished, OrderForked, OrderGoMarked, OrderGoes, OrderJoined, OrderLocksAcquired, OrderLocksQueued, OrderLocksReleased, OrderMoved, OrderNoticeAnnounced, OrderNoticeExpected, OrderNoticePosted, OrderNoticePostedV2_3, OrderNoticesConsumed, OrderNoticesConsumptionStarted, OrderNoticesExpected, OrderNoticesRead, OrderOrderAdded, OrderOutcomeAdded, OrderPlanAttached, OrderProcessed, OrderProcessingKilled, OrderProcessingStarted, OrderPromptAnswered, OrderPrompted, OrderResumed, OrderResumptionMarked, OrderRetrying, OrderStarted, OrderStateReset, OrderStickySubagentEntered, OrderStickySubagentLeaved, OrderStopped, OrderSuspended, OrderSuspensionMarked, OrderSuspensionMarkedOnAgent, OrderTransferred}
import js7.data.order.Order.{Attached, AttachedState, Attaching, BetweenCycles, Broken, Cancelled, DelayedAfterError, DelayingRetry, Deleted, Detaching, ExpectingNotice, ExpectingNotices, ExternalOrderLink, Failed, FailedInFork, FailedWhileFresh, Finished, Forked, Fresh, InapplicableOrderEventProblem, IsFreshOrReady, Processed, Processing, ProcessingKilled, Prompting, Ready, Sleeping, State, Stopped, StoppedWhileFresh, WaitingForLock}
import js7.data.order.OrderEvent.{LegacyOrderLockEvent, LockDemand, OrderAdded, OrderAttachable, OrderAttached, OrderAttachedToAgent, OrderAwoke, OrderBroken, OrderCancellationMarked, OrderCancellationMarkedOnAgent, OrderCancelled, OrderCatched, OrderCaught, OrderCoreEvent, OrderCycleFinished, OrderCycleStarted, OrderCyclingPrepared, OrderDeleted, OrderDeletionMarked, OrderDetachable, OrderDetached, OrderExternalVanished, OrderFailed, OrderFailedInFork, OrderFinished, OrderForked, OrderGoMarked, OrderGoes, OrderJoined, OrderLocksAcquired, OrderLocksQueued, OrderLocksReleased, OrderMoved, OrderNoticeAnnounced, OrderNoticeExpected, OrderNoticePosted, OrderNoticePostedV2_3, OrderNoticesConsumed, OrderNoticesConsumptionStarted, OrderNoticesExpected, OrderNoticesRead, OrderOrderAdded, OrderOutcomeAdded, OrderPlanAttached, OrderProcessed, OrderProcessingKilled, OrderProcessingStarted, OrderPromptAnswered, OrderPrompted, OrderResumed, OrderResumptionMarked, OrderRetrying, OrderSleeping, OrderStarted, OrderStateReset, OrderStickySubagentEntered, OrderStickySubagentLeaved, OrderStopped, OrderSuspended, OrderSuspensionMarked, OrderSuspensionMarkedOnAgent, OrderTransferred}
import js7.data.orderwatch.{ExternalOrderName, OrderWatchPath}
import js7.data.plan.PlanTemplateId
import js7.data.subagent.{SubagentBundleId, SubagentId}
Expand Down Expand Up @@ -376,6 +376,13 @@ final class OrderTest extends OurTestSuite:
}
}""")

"Sleeping" in:
testJson[State](Sleeping(ts"2024-12-18T12:00:00Z"),
json"""{
"TYPE": "Sleeping",
"until": 1734523200000
}""")

"Cancelled" in:
testJson[State](Cancelled,
json"""{
Expand Down Expand Up @@ -501,6 +508,7 @@ final class OrderTest extends OurTestSuite:
OrderStickySubagentLeaved,
OrderStickySubagentEntered(agentPath),

OrderSleeping(ts"2024-12-18T00:00:00Z"),
OrderTransferred(workflowId /: Position(0)),

OrderBroken(),
Expand Down Expand Up @@ -593,6 +601,7 @@ final class OrderTest extends OurTestSuite:
case (_: OrderOrderAdded , _ , _ , IsDetached ) => _.isInstanceOf[Ready]
case (_: OrderStickySubagentEntered, IsSuspended(false), _ , IsDetached | IsAttached) => _.isInstanceOf[Ready]
case (_: OrderOutcomeAdded , _ , _ , _ ) => _.isInstanceOf[Ready]
case (_: OrderSleeping , _ , _ , IsDetached | IsAttached) => _.isInstanceOf[Sleeping]
case (_: OrderTransferred , _ , _ , IsDetached ) => _.isInstanceOf[Ready]
case (_: OrderBroken , _ , _ , _ ) => _.isInstanceOf[Broken])

Expand Down
Loading

0 comments on commit 25f9dec

Please sign in to comment.