Skip to content

Commit

Permalink
JS-1944 ControlWorkflow suspend=true
Browse files Browse the repository at this point in the history
  • Loading branch information
Zschimmer committed Jun 8, 2022
1 parent cb2b0e1 commit d37a186
Show file tree
Hide file tree
Showing 32 changed files with 844 additions and 77 deletions.
35 changes: 28 additions & 7 deletions js7-agent-data/src/main/scala/js7/agent/data/AgentState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import io.circe.generic.semiauto.deriveCodec
import js7.agent.data.AgentState.{AgentMetaState, allowedItemStates}
import js7.agent.data.event.AgentEvent
import js7.agent.data.event.AgentEvent.AgentDedicated
import js7.agent.data.orderwatch.{FileWatchStateHandler, FileWatchState}
import js7.agent.data.orderwatch.{FileWatchState, FileWatchStateHandler}
import js7.base.circeutils.typed.{Subtype, TypedJsonCodec}
import js7.base.crypt.Signed
import js7.base.problem.{Checked, Problem}
Expand All @@ -26,7 +26,7 @@ import js7.data.orderwatch.{FileWatch, OrderWatchEvent, OrderWatchPath}
import js7.data.state.EventDrivenStateView
import js7.data.subagent.SubagentItemStateEvent.SubagentShutdown
import js7.data.subagent.{SubagentDirectorState, SubagentId, SubagentItem, SubagentItemState, SubagentItemStateEvent, SubagentSelection, SubagentSelectionId, SubagentSelectionState}
import js7.data.workflow.{Workflow, WorkflowId, WorkflowPath}
import js7.data.workflow.{Workflow, WorkflowControlEvent, WorkflowControlState, WorkflowControlStateHandler, WorkflowId, WorkflowPath}
import monix.reactive.Observable
import scala.collection.MapView

Expand All @@ -40,12 +40,14 @@ final case class AgentState(
pathToItemState_ : Map[UnsignedSimpleItemPath, UnsignedSimpleItemState],
idToOrder: Map[OrderId, Order[Order.State]],
idToWorkflow: Map[WorkflowId, Workflow/*reduced for this Agent!!!*/],
pathToWorkflowControlState_ : Map[WorkflowPath, WorkflowControlState],
pathToJobResource: Map[JobResourcePath, JobResource],
keyToSignedItem : Map[SignableItemKey, Signed[SignableItem]])
extends SignedItemContainer
with EventDrivenStateView[AgentState, Event]
with SubagentDirectorState[AgentState]
with FileWatchStateHandler[AgentState]
with WorkflowControlStateHandler[AgentState]
with SnapshotableState[AgentState]
{
def isAgent = true
Expand All @@ -72,6 +74,7 @@ with SnapshotableState[AgentState]
idToWorkflow.size +
idToOrder.size +
pathToItemState_.size +
pathToWorkflowControlState_.size +
fw.estimatedExtraSnapshotSize +
pathToJobResource.size
//keyToSignedItem.size + // == idToWorkflow.size + pathToJobResource.size
Expand All @@ -84,6 +87,7 @@ with SnapshotableState[AgentState]
Observable.fromIterable(pathTo(FileWatchState).values).flatMap(_.toSnapshotObservable),
Observable.fromIterable(keyToSignedItem.values.view.map(SignedItemAdded(_))),
Observable.fromIterable(idToWorkflow.view.filterKeys(isWithoutSignature).values),
Observable.fromIterable(pathToWorkflowControlState_.values),
Observable.fromIterable(pathToJobResource.view.filterKeys(isWithoutSignature).values),
Observable.fromIterable(pathTo(CalendarState).values).flatMap(_.toSnapshotObservable),
Observable.fromIterable(idToOrder.values)
Expand Down Expand Up @@ -167,10 +171,17 @@ with SnapshotableState[AgentState]
case ItemDetached(itemKey, meta.agentPath) =>
itemKey match {
case WorkflowId.as(workflowId) =>
for (_ <- idToWorkflow.checked(workflowId)) yield
for (_ <- idToWorkflow.checked(workflowId)) yield {
val updatedIdToWorkflow = idToWorkflow - workflowId
copy(
keyToSignedItem = keyToSignedItem - workflowId,
idToWorkflow = idToWorkflow - workflowId)
idToWorkflow = updatedIdToWorkflow,
pathToWorkflowControlState_ =
if (idToWorkflow.keys.exists/*Slow???*/(_.path == workflowId.path))
pathToWorkflowControlState_ - workflowId.path
else
pathToWorkflowControlState_)
}

case path: OrderWatchPath =>
fw.detach(path)
Expand Down Expand Up @@ -228,6 +239,9 @@ with SnapshotableState[AgentState]
controllerId = controllerId,
subagentId = subagentId)))

case KeyedEvent(workflowPath: WorkflowPath, event: WorkflowControlEvent) =>
applyWorkflowControlEvent(workflowPath, event)

case _ => applyStandardEvent(keyedEvent)
}

Expand All @@ -242,6 +256,11 @@ with SnapshotableState[AgentState]
remove: Iterable[OrderWatchPath]
) = update(addItemStates = fileWatchStates, removeItemStates = remove)

def pathToWorkflowControlState = pathToWorkflowControlState_.view

protected def updateWorkflowControlState(s: WorkflowControlState) =
copy(pathToWorkflowControlState_ = pathToWorkflowControlState_.updated(s.workflowPath, s))

protected def update(
orders: Iterable[Order[Order.State]],
removeOrders: Iterable[OrderId],
Expand Down Expand Up @@ -305,7 +324,7 @@ with ItemContainer.Companion[AgentState]

val empty = AgentState(EventId.BeforeFirst, SnapshotableState.Standards.empty,
AgentMetaState.empty,
Map.empty, Map.empty, Map.empty, Map.empty, Map.empty)
Map.empty, Map.empty, Map.empty, Map.empty, Map.empty, Map.empty)

private val allowedItemStates: Set[InventoryItemState.AnyCompanion] =
Set(AgentRefState, SubagentItemState, FileWatchState)
Expand Down Expand Up @@ -344,7 +363,8 @@ with ItemContainer.Companion[AgentState]
Subtype(SignedItemAdded.jsonCodec(this)), // For Repo and SignedItemAdded
Subtype(signableSimpleItemJsonCodec),
Subtype(unsignedSimpleItemJsonCodec),
Subtype[BasicItemEvent])
Subtype[BasicItemEvent],
Subtype[WorkflowControlState])

implicit val keyedEventJsonCodec: KeyedEventTypedJsonCodec[Event] = {
KeyedEventTypedJsonCodec[Event](
Expand All @@ -353,6 +373,7 @@ with ItemContainer.Companion[AgentState]
KeyedSubtype[OrderEvent],
KeyedSubtype[AgentEvent],
KeyedSubtype[InventoryItemEvent],
KeyedSubtype[OrderWatchEvent])
KeyedSubtype[OrderWatchEvent],
KeyedSubtype[WorkflowControlEvent])
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package js7.agent.data

import js7.agent.data.AgentState.AgentMetaState
import js7.agent.data.orderwatch.{FileWatchStateHandler, FileWatchState}
import js7.agent.data.orderwatch.{FileWatchState, FileWatchStateHandler}
import js7.base.crypt.Signed
import js7.base.problem.Checked._
import js7.base.utils.Collections.implicits._
Expand All @@ -13,7 +13,7 @@ import js7.data.item.{SignableItem, SignableItemKey, SignedItemEvent, UnsignedSi
import js7.data.job.{JobResource, JobResourcePath}
import js7.data.order.{Order, OrderId}
import js7.data.subagent.{SubagentItemState, SubagentSelection, SubagentSelectionState}
import js7.data.workflow.{Workflow, WorkflowId}
import js7.data.workflow.{Workflow, WorkflowControlState, WorkflowId, WorkflowPath}
import scala.collection.mutable

final class AgentStateBuilder
Expand All @@ -31,6 +31,7 @@ extends SnapshotableStateBuilder[AgentState]
private val fileWatchStateBuilder = new FileWatchStateHandler.Builder
private val pathToJobResource = mutable.Map.empty[JobResourcePath, JobResource]
private val keyToSignedItem = mutable.Map.empty[SignableItemKey, Signed[SignableItem]]
private val pathToWorkflowControlState = mutable.Map.empty[WorkflowPath, WorkflowControlState]
private var _state = AgentState.empty

protected def onInitializeState(state: AgentState) =
Expand Down Expand Up @@ -68,6 +69,9 @@ extends SnapshotableStateBuilder[AgentState]

case o: AgentMetaState =>
agentMetaState = o

case o: WorkflowControlState =>
pathToWorkflowControlState(o.workflowPath) = o
}

private def onSignedItemAdded(added: SignedItemEvent.SignedItemAdded): Unit = {
Expand All @@ -90,6 +94,7 @@ extends SnapshotableStateBuilder[AgentState]
(pathToItemState.view ++ fileWatchStateBuilder.result).toMap,
idToOrder.toMap,
idToWorkflow.toMap,
pathToWorkflowControlState.toMap,
pathToJobResource.toMap,
keyToSignedItem.toMap)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import js7.data.agent.{AgentPath, AgentRunId}
import js7.data.command.CommonCommand
import js7.data.controller.ControllerId
import js7.data.event.{EventId, ItemContainer}
import js7.data.item.{InventoryItemKey, SignableItem, UnsignedSimpleItem}
import js7.data.item.{InventoryItemKey, ItemRevision, SignableItem, UnsignedSimpleItem}
import js7.data.order.{Order, OrderId, OrderMark}
import js7.data.subagent.SubagentId
import js7.data.workflow.WorkflowPath

/**
* @author Joacim Zschimmer
Expand Down Expand Up @@ -204,6 +205,14 @@ object AgentCommand extends CommonCommand.Companion
type Response = Response.Accepted
}

final case class ControlWorkflow(
workflowPath: WorkflowPath,
suspend: Boolean,
revision: ItemRevision)
extends AgentCommand {
type Response = Response.Accepted
}

final case class ResetSubagent(subagentId: SubagentId, force: Boolean)
extends AgentCommand {
type Response = Response.Accepted
Expand All @@ -226,6 +235,7 @@ object AgentCommand extends CommonCommand.Companion
Subtype(deriveCodec[DetachItem]),
Subtype(deriveCodec[AttachOrder]),
Subtype(deriveCodec[DetachOrder]),
Subtype(deriveCodec[ControlWorkflow]),
Subtype(TakeSnapshot),
Subtype(deriveCodec[ResetSubagent]))
}
Expand Down
22 changes: 19 additions & 3 deletions js7-agent-data/src/test/scala/js7/agent/data/AgentStateTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ import js7.data.orderwatch.{ExternalOrderName, FileWatch, OrderWatchPath}
import js7.data.subagent.{SubagentId, SubagentItem, SubagentSelection, SubagentSelectionId}
import js7.data.value.expression.Expression
import js7.data.value.expression.ExpressionParser.expr
import js7.data.workflow.WorkflowControlEvent.WorkflowControlUpdated
import js7.data.workflow.position._
import js7.data.workflow.{Workflow, WorkflowPath}
import js7.data.workflow.{Workflow, WorkflowControl, WorkflowControlState, WorkflowPath}
import js7.tester.CirceJsonTester.removeJNull
import monix.execution.Scheduler.Implicits.traced
import monix.reactive.Observable
Expand Down Expand Up @@ -63,6 +64,7 @@ final class AgentStateTest extends AsyncFreeSpec
private val itemSigner = new ItemSigner(SillySigner.Default, AgentState.signableItemJsonCodec)
private val signedWorkflow = itemSigner.sign(workflow)
private val signedJobResource = itemSigner.sign(JobResource(JobResourcePath("JOBRESOURCE")))
private val workflowControl = WorkflowControl(workflow.path, true, ItemRevision(1))

private val calendar = Calendar(
CalendarPath("CALENDAR"),
Expand Down Expand Up @@ -96,6 +98,8 @@ final class AgentStateTest extends AsyncFreeSpec
Map.empty,
Map.empty,
Map.empty,
Map(
workflow.path -> WorkflowControlState(workflowControl)),
Map.empty,
Map.empty
).applyEvents(Seq(
Expand Down Expand Up @@ -151,8 +155,8 @@ final class AgentStateTest extends AsyncFreeSpec
}
}

"estimatedExtraSnapshotSize" in {
assert(agentState.estimatedSnapshotSize == 13)
"estimatedSnapshotSize" in {
assert(agentState.estimatedSnapshotSize == 14)
for (n <- agentState.toSnapshotObservable.countL.runToFuture)
yield assert(n == agentState.estimatedSnapshotSize)
}
Expand Down Expand Up @@ -250,6 +254,15 @@ final class AgentStateTest extends AsyncFreeSpec
"versionId": "1.0",
"instructions": []
}""",
json"""{
"TYPE": "WorkflowControlState",
"workflowControl": {
"path": "WORKFLOW",
"suspended": true,
"revision": 1
},
"attachedToAgents": []
}""",
json"""{
"TYPE": "JobResource",
"path": "UNSIGNED-v2.2-JOB-RESOURCE",
Expand Down Expand Up @@ -333,6 +346,7 @@ final class AgentStateTest extends AsyncFreeSpec
agentState = agentState.applyEvent(NoKey <-: ItemAttachedToMe(workflow)).orThrow
agentState = agentState.applyEvent(NoKey <-: ItemAttachedToMe(unsignedJobResource)).orThrow
agentState = agentState.applyEvent(NoKey <-: SignedItemAttachedToMe(signedWorkflow)).orThrow
agentState = agentState.applyEvent(workflow.path <-: WorkflowControlUpdated(true, ItemRevision(1))).orThrow
agentState = agentState.applyEvent(NoKey <-: SignedItemAttachedToMe(signedJobResource)).orThrow
agentState = agentState.applyEvent(orderId <-:
OrderAttachedToAgent(
Expand All @@ -354,6 +368,8 @@ final class AgentStateTest extends AsyncFreeSpec
attachedState = Some(Order.Attached(agentPath)), parent = Some(orderId))),
Map(
workflow.id -> workflow),
Map(
workflow.path -> WorkflowControlState(workflowControl)),
Map(
unsignedJobResource.path -> unsignedJobResource,
signedJobResource.value.path -> signedJobResource.value),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,18 @@ final class AgentCommandTest extends AnyFreeSpec
}
}

"ControlWorkflow" in {
check(
AgentCommand.ControlWorkflow(
WorkflowPath("WORKFLOW"), suspend = true, ItemRevision(1)),
json"""{
"TYPE": "ControlWorkflow",
"workflowPath": "WORKFLOW",
"suspend": true,
"revision": 1
}""")
}

"ResetSubagent" in {
check(
AgentCommand.ResetSubagent(SubagentId("SUBAGENT"), force = false),
Expand Down
5 changes: 3 additions & 2 deletions js7-agent/src/main/scala/js7/agent/command/CommandActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import cats.instances.future._
import cats.syntax.traverse._
import js7.agent.command.CommandActor._
import js7.agent.data.commands.AgentCommand
import js7.agent.data.commands.AgentCommand.{AttachItem, AttachSignedItem, Batch, CoupleController, DedicateAgentDirector, DetachItem, EmergencyStop, NoOperation, OrderCommand, Reset, ResetSubagent, Response, ShutDown, TakeSnapshot}
import js7.agent.data.commands.AgentCommand.{AttachItem, AttachSignedItem, Batch, ControlWorkflow, CoupleController, DedicateAgentDirector, DetachItem, EmergencyStop, NoOperation, OrderCommand, Reset, ResetSubagent, Response, ShutDown, TakeSnapshot}
import js7.agent.scheduler.AgentHandle
import js7.base.circeutils.JavaJsonCodecs.instant.StringInstantJsonCodec
import js7.base.log.{CorrelId, CorrelIdWrapped, Logger}
Expand Down Expand Up @@ -109,7 +109,8 @@ extends Actor {

case command @ (_: OrderCommand | _: DedicateAgentDirector | _: CoupleController | _: Reset |
_: TakeSnapshot.type | _: ShutDown |
_: AttachItem | _: AttachSignedItem | _: DetachItem | _: ResetSubagent) =>
_: AttachItem | _: AttachSignedItem | _: DetachItem | _: ResetSubagent |
_: ControlWorkflow) =>
// FIXME Delay CoupleController until all AttachOrder (extends OrderCommand) (and DetachOrder?) have been finished, to return a properly updated state
agentHandle.executeCommand(command, meta.user.id, response)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ private[agent] final class AgentActor private(
_: AgentCommand.AttachItem |
_: AgentCommand.AttachSignedItem |
_: AgentCommand.DetachItem |
_: AgentCommand.ResetSubagent) =>
_: AgentCommand.ResetSubagent |
_: AgentCommand.ControlWorkflow) =>
// TODO Check AgentRunId ?
started.toOption match {
case None =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import js7.data.orderwatch.{FileWatch, OrderWatchPath}
import js7.data.state.OrderEventHandler.FollowUp
import js7.data.state.{OrderEventHandler, StateView}
import js7.data.subagent.{SubagentId, SubagentItem, SubagentSelection, SubagentSelectionId}
import js7.data.workflow.WorkflowControlEvent.WorkflowControlUpdated
import js7.data.workflow.instructions.Execute
import js7.data.workflow.instructions.executable.WorkflowJob
import js7.data.workflow.{Workflow, WorkflowId, WorkflowPath}
Expand Down Expand Up @@ -424,6 +425,23 @@ with Stash
.rightAs(AgentCommand.Response.Accepted)
.runToFuture

case AgentCommand.ControlWorkflow(workflowPath, suspend, revision) =>
if (!persistence.currentState.idToWorkflow.keys.exists(_.path == workflowPath))
Future.successful(Left(Problem(s"Unknown $workflowPath")))
else
persistKeyedEvent(workflowPath <-: WorkflowControlUpdated(suspend, revision)) {
(stampedEvent, journaledState) =>
if (!suspend) {
// Event it Workflow was already suspended.
// This allows the used to force continuation of Orders (just in case)
for (order <- persistence.currentState.orders
if order.workflowPath == workflowPath) {
proceedWithOrder(order)
}
}
Right(AgentCommand.Response.Accepted)
}

case AgentCommand.TakeSnapshot =>
(journalActor ? JournalActor.Input.TakeSnapshot)
.mapTo[JournalActor.Output.SnapshotTaken.type]
Expand Down Expand Up @@ -807,6 +825,8 @@ with Stash
def workflowPathToId(workflowPath: WorkflowPath) =
persistence.currentState.workflowPathToId(workflowPath)

def pathToWorkflowControlState = persistence.currentState.pathToWorkflowControlState

def pathToItemState = persistence.currentState.pathToItemState

def keyToItem = persistence.currentState.keyToItem
Expand Down
Loading

0 comments on commit d37a186

Please sign in to comment.