From d37a18675725bfb52c45410ba22363e07d770efe Mon Sep 17 00:00:00 2001 From: Joacim Zschimmer Date: Wed, 8 Jun 2022 17:13:40 +0200 Subject: [PATCH] JS-1944 ControlWorkflow suspend=true --- .../scala/js7/agent/data/AgentState.scala | 35 ++- .../js7/agent/data/AgentStateBuilder.scala | 9 +- .../agent/data/commands/AgentCommand.scala | 12 +- .../scala/js7/agent/data/AgentStateTest.scala | 22 +- .../data/commands/AgentCommandTest.scala | 12 + .../js7/agent/command/CommandActor.scala | 5 +- .../js7/agent/scheduler/AgentActor.scala | 3 +- .../scheduler/order/AgentOrderKeeper.scala | 20 ++ .../controller/ControllerOrderKeeper.scala | 96 +++++++- .../js7/controller/agent/AgentDriver.scala | 12 +- .../js7/controller/agent/CommandQueue.scala | 11 + .../controller/JControllerCommand.scala | 7 +- .../controller/JControllerStateTest.scala | 19 +- .../execution/workflow/OrderEventSource.scala | 4 +- .../data/controller/ControllerCommand.scala | 7 + .../js7/data/controller/ControllerState.scala | 57 ++++- .../controller/ControllerStateBuilder.scala | 31 ++- .../data/delegate/DelegateCouplingState.scala | 6 +- .../js7/data/item/ItemAttachedState.scala | 53 +++-- .../main/scala/js7/data/state/StateView.scala | 11 +- .../scala/js7/data/workflow/Workflow.scala | 5 +- .../js7/data/workflow/WorkflowControl.scala | 14 ++ .../data/workflow/WorkflowControlEvent.scala | 34 +++ .../data/workflow/WorkflowControlState.scala | 36 +++ .../WorkflowControlStateHandler.scala | 20 ++ .../controller/ControllerCommandTest.scala | 10 + .../data/controller/ControllerStateTest.scala | 65 +++++- .../delegate/DelegateCouplingStateTest.scala | 22 ++ .../scala/js7/data/state/TestStateView.scala | 7 +- .../workflow/WorkflowControlEventTest.scala | 32 +++ .../workflow/WorkflowControlStateTest.scala | 24 ++ .../scala/js7/tests/SuspendWorkflowTest.scala | 220 ++++++++++++++++++ 32 files changed, 844 insertions(+), 77 deletions(-) create mode 100644 js7-data/shared/src/main/scala/js7/data/workflow/WorkflowControl.scala create mode 100644 js7-data/shared/src/main/scala/js7/data/workflow/WorkflowControlEvent.scala create mode 100644 js7-data/shared/src/main/scala/js7/data/workflow/WorkflowControlState.scala create mode 100644 js7-data/shared/src/main/scala/js7/data/workflow/WorkflowControlStateHandler.scala create mode 100644 js7-data/shared/src/test/scala/js7/data/workflow/WorkflowControlEventTest.scala create mode 100644 js7-data/shared/src/test/scala/js7/data/workflow/WorkflowControlStateTest.scala create mode 100644 js7-tests/src/test/scala/js7/tests/SuspendWorkflowTest.scala diff --git a/js7-agent-data/src/main/scala/js7/agent/data/AgentState.scala b/js7-agent-data/src/main/scala/js7/agent/data/AgentState.scala index fc6840ad21..b2884115ff 100644 --- a/js7-agent-data/src/main/scala/js7/agent/data/AgentState.scala +++ b/js7-agent-data/src/main/scala/js7/agent/data/AgentState.scala @@ -4,7 +4,7 @@ import io.circe.generic.semiauto.deriveCodec import js7.agent.data.AgentState.{AgentMetaState, allowedItemStates} import js7.agent.data.event.AgentEvent import js7.agent.data.event.AgentEvent.AgentDedicated -import js7.agent.data.orderwatch.{FileWatchStateHandler, FileWatchState} +import js7.agent.data.orderwatch.{FileWatchState, FileWatchStateHandler} import js7.base.circeutils.typed.{Subtype, TypedJsonCodec} import js7.base.crypt.Signed import js7.base.problem.{Checked, Problem} @@ -26,7 +26,7 @@ import js7.data.orderwatch.{FileWatch, OrderWatchEvent, OrderWatchPath} import js7.data.state.EventDrivenStateView import js7.data.subagent.SubagentItemStateEvent.SubagentShutdown import js7.data.subagent.{SubagentDirectorState, SubagentId, SubagentItem, SubagentItemState, SubagentItemStateEvent, SubagentSelection, SubagentSelectionId, SubagentSelectionState} -import js7.data.workflow.{Workflow, WorkflowId, WorkflowPath} +import js7.data.workflow.{Workflow, WorkflowControlEvent, WorkflowControlState, WorkflowControlStateHandler, WorkflowId, WorkflowPath} import monix.reactive.Observable import scala.collection.MapView @@ -40,12 +40,14 @@ final case class AgentState( pathToItemState_ : Map[UnsignedSimpleItemPath, UnsignedSimpleItemState], idToOrder: Map[OrderId, Order[Order.State]], idToWorkflow: Map[WorkflowId, Workflow/*reduced for this Agent!!!*/], + pathToWorkflowControlState_ : Map[WorkflowPath, WorkflowControlState], pathToJobResource: Map[JobResourcePath, JobResource], keyToSignedItem : Map[SignableItemKey, Signed[SignableItem]]) extends SignedItemContainer with EventDrivenStateView[AgentState, Event] with SubagentDirectorState[AgentState] with FileWatchStateHandler[AgentState] +with WorkflowControlStateHandler[AgentState] with SnapshotableState[AgentState] { def isAgent = true @@ -72,6 +74,7 @@ with SnapshotableState[AgentState] idToWorkflow.size + idToOrder.size + pathToItemState_.size + + pathToWorkflowControlState_.size + fw.estimatedExtraSnapshotSize + pathToJobResource.size //keyToSignedItem.size + // == idToWorkflow.size + pathToJobResource.size @@ -84,6 +87,7 @@ with SnapshotableState[AgentState] Observable.fromIterable(pathTo(FileWatchState).values).flatMap(_.toSnapshotObservable), Observable.fromIterable(keyToSignedItem.values.view.map(SignedItemAdded(_))), Observable.fromIterable(idToWorkflow.view.filterKeys(isWithoutSignature).values), + Observable.fromIterable(pathToWorkflowControlState_.values), Observable.fromIterable(pathToJobResource.view.filterKeys(isWithoutSignature).values), Observable.fromIterable(pathTo(CalendarState).values).flatMap(_.toSnapshotObservable), Observable.fromIterable(idToOrder.values) @@ -167,10 +171,17 @@ with SnapshotableState[AgentState] case ItemDetached(itemKey, meta.agentPath) => itemKey match { case WorkflowId.as(workflowId) => - for (_ <- idToWorkflow.checked(workflowId)) yield + for (_ <- idToWorkflow.checked(workflowId)) yield { + val updatedIdToWorkflow = idToWorkflow - workflowId copy( keyToSignedItem = keyToSignedItem - workflowId, - idToWorkflow = idToWorkflow - workflowId) + idToWorkflow = updatedIdToWorkflow, + pathToWorkflowControlState_ = + if (idToWorkflow.keys.exists/*Slow???*/(_.path == workflowId.path)) + pathToWorkflowControlState_ - workflowId.path + else + pathToWorkflowControlState_) + } case path: OrderWatchPath => fw.detach(path) @@ -228,6 +239,9 @@ with SnapshotableState[AgentState] controllerId = controllerId, subagentId = subagentId))) + case KeyedEvent(workflowPath: WorkflowPath, event: WorkflowControlEvent) => + applyWorkflowControlEvent(workflowPath, event) + case _ => applyStandardEvent(keyedEvent) } @@ -242,6 +256,11 @@ with SnapshotableState[AgentState] remove: Iterable[OrderWatchPath] ) = update(addItemStates = fileWatchStates, removeItemStates = remove) + def pathToWorkflowControlState = pathToWorkflowControlState_.view + + protected def updateWorkflowControlState(s: WorkflowControlState) = + copy(pathToWorkflowControlState_ = pathToWorkflowControlState_.updated(s.workflowPath, s)) + protected def update( orders: Iterable[Order[Order.State]], removeOrders: Iterable[OrderId], @@ -305,7 +324,7 @@ with ItemContainer.Companion[AgentState] val empty = AgentState(EventId.BeforeFirst, SnapshotableState.Standards.empty, AgentMetaState.empty, - Map.empty, Map.empty, Map.empty, Map.empty, Map.empty) + Map.empty, Map.empty, Map.empty, Map.empty, Map.empty, Map.empty) private val allowedItemStates: Set[InventoryItemState.AnyCompanion] = Set(AgentRefState, SubagentItemState, FileWatchState) @@ -344,7 +363,8 @@ with ItemContainer.Companion[AgentState] Subtype(SignedItemAdded.jsonCodec(this)), // For Repo and SignedItemAdded Subtype(signableSimpleItemJsonCodec), Subtype(unsignedSimpleItemJsonCodec), - Subtype[BasicItemEvent]) + Subtype[BasicItemEvent], + Subtype[WorkflowControlState]) implicit val keyedEventJsonCodec: KeyedEventTypedJsonCodec[Event] = { KeyedEventTypedJsonCodec[Event]( @@ -353,6 +373,7 @@ with ItemContainer.Companion[AgentState] KeyedSubtype[OrderEvent], KeyedSubtype[AgentEvent], KeyedSubtype[InventoryItemEvent], - KeyedSubtype[OrderWatchEvent]) + KeyedSubtype[OrderWatchEvent], + KeyedSubtype[WorkflowControlEvent]) } } diff --git a/js7-agent-data/src/main/scala/js7/agent/data/AgentStateBuilder.scala b/js7-agent-data/src/main/scala/js7/agent/data/AgentStateBuilder.scala index 83eeb5a0cf..5633090eaf 100644 --- a/js7-agent-data/src/main/scala/js7/agent/data/AgentStateBuilder.scala +++ b/js7-agent-data/src/main/scala/js7/agent/data/AgentStateBuilder.scala @@ -1,7 +1,7 @@ package js7.agent.data import js7.agent.data.AgentState.AgentMetaState -import js7.agent.data.orderwatch.{FileWatchStateHandler, FileWatchState} +import js7.agent.data.orderwatch.{FileWatchState, FileWatchStateHandler} import js7.base.crypt.Signed import js7.base.problem.Checked._ import js7.base.utils.Collections.implicits._ @@ -13,7 +13,7 @@ import js7.data.item.{SignableItem, SignableItemKey, SignedItemEvent, UnsignedSi import js7.data.job.{JobResource, JobResourcePath} import js7.data.order.{Order, OrderId} import js7.data.subagent.{SubagentItemState, SubagentSelection, SubagentSelectionState} -import js7.data.workflow.{Workflow, WorkflowId} +import js7.data.workflow.{Workflow, WorkflowControlState, WorkflowId, WorkflowPath} import scala.collection.mutable final class AgentStateBuilder @@ -31,6 +31,7 @@ extends SnapshotableStateBuilder[AgentState] private val fileWatchStateBuilder = new FileWatchStateHandler.Builder private val pathToJobResource = mutable.Map.empty[JobResourcePath, JobResource] private val keyToSignedItem = mutable.Map.empty[SignableItemKey, Signed[SignableItem]] + private val pathToWorkflowControlState = mutable.Map.empty[WorkflowPath, WorkflowControlState] private var _state = AgentState.empty protected def onInitializeState(state: AgentState) = @@ -68,6 +69,9 @@ extends SnapshotableStateBuilder[AgentState] case o: AgentMetaState => agentMetaState = o + + case o: WorkflowControlState => + pathToWorkflowControlState(o.workflowPath) = o } private def onSignedItemAdded(added: SignedItemEvent.SignedItemAdded): Unit = { @@ -90,6 +94,7 @@ extends SnapshotableStateBuilder[AgentState] (pathToItemState.view ++ fileWatchStateBuilder.result).toMap, idToOrder.toMap, idToWorkflow.toMap, + pathToWorkflowControlState.toMap, pathToJobResource.toMap, keyToSignedItem.toMap) } diff --git a/js7-agent-data/src/main/scala/js7/agent/data/commands/AgentCommand.scala b/js7-agent-data/src/main/scala/js7/agent/data/commands/AgentCommand.scala index 6c7e731f75..8b7c9c1060 100644 --- a/js7-agent-data/src/main/scala/js7/agent/data/commands/AgentCommand.scala +++ b/js7-agent-data/src/main/scala/js7/agent/data/commands/AgentCommand.scala @@ -23,9 +23,10 @@ import js7.data.agent.{AgentPath, AgentRunId} import js7.data.command.CommonCommand import js7.data.controller.ControllerId import js7.data.event.{EventId, ItemContainer} -import js7.data.item.{InventoryItemKey, SignableItem, UnsignedSimpleItem} +import js7.data.item.{InventoryItemKey, ItemRevision, SignableItem, UnsignedSimpleItem} import js7.data.order.{Order, OrderId, OrderMark} import js7.data.subagent.SubagentId +import js7.data.workflow.WorkflowPath /** * @author Joacim Zschimmer @@ -204,6 +205,14 @@ object AgentCommand extends CommonCommand.Companion type Response = Response.Accepted } + final case class ControlWorkflow( + workflowPath: WorkflowPath, + suspend: Boolean, + revision: ItemRevision) + extends AgentCommand { + type Response = Response.Accepted + } + final case class ResetSubagent(subagentId: SubagentId, force: Boolean) extends AgentCommand { type Response = Response.Accepted @@ -226,6 +235,7 @@ object AgentCommand extends CommonCommand.Companion Subtype(deriveCodec[DetachItem]), Subtype(deriveCodec[AttachOrder]), Subtype(deriveCodec[DetachOrder]), + Subtype(deriveCodec[ControlWorkflow]), Subtype(TakeSnapshot), Subtype(deriveCodec[ResetSubagent])) } diff --git a/js7-agent-data/src/test/scala/js7/agent/data/AgentStateTest.scala b/js7-agent-data/src/test/scala/js7/agent/data/AgentStateTest.scala index 0e947a53ed..ba92c8126d 100644 --- a/js7-agent-data/src/test/scala/js7/agent/data/AgentStateTest.scala +++ b/js7-agent-data/src/test/scala/js7/agent/data/AgentStateTest.scala @@ -33,8 +33,9 @@ import js7.data.orderwatch.{ExternalOrderName, FileWatch, OrderWatchPath} import js7.data.subagent.{SubagentId, SubagentItem, SubagentSelection, SubagentSelectionId} import js7.data.value.expression.Expression import js7.data.value.expression.ExpressionParser.expr +import js7.data.workflow.WorkflowControlEvent.WorkflowControlUpdated import js7.data.workflow.position._ -import js7.data.workflow.{Workflow, WorkflowPath} +import js7.data.workflow.{Workflow, WorkflowControl, WorkflowControlState, WorkflowPath} import js7.tester.CirceJsonTester.removeJNull import monix.execution.Scheduler.Implicits.traced import monix.reactive.Observable @@ -63,6 +64,7 @@ final class AgentStateTest extends AsyncFreeSpec private val itemSigner = new ItemSigner(SillySigner.Default, AgentState.signableItemJsonCodec) private val signedWorkflow = itemSigner.sign(workflow) private val signedJobResource = itemSigner.sign(JobResource(JobResourcePath("JOBRESOURCE"))) + private val workflowControl = WorkflowControl(workflow.path, true, ItemRevision(1)) private val calendar = Calendar( CalendarPath("CALENDAR"), @@ -96,6 +98,8 @@ final class AgentStateTest extends AsyncFreeSpec Map.empty, Map.empty, Map.empty, + Map( + workflow.path -> WorkflowControlState(workflowControl)), Map.empty, Map.empty ).applyEvents(Seq( @@ -151,8 +155,8 @@ final class AgentStateTest extends AsyncFreeSpec } } - "estimatedExtraSnapshotSize" in { - assert(agentState.estimatedSnapshotSize == 13) + "estimatedSnapshotSize" in { + assert(agentState.estimatedSnapshotSize == 14) for (n <- agentState.toSnapshotObservable.countL.runToFuture) yield assert(n == agentState.estimatedSnapshotSize) } @@ -250,6 +254,15 @@ final class AgentStateTest extends AsyncFreeSpec "versionId": "1.0", "instructions": [] }""", + json"""{ + "TYPE": "WorkflowControlState", + "workflowControl": { + "path": "WORKFLOW", + "suspended": true, + "revision": 1 + }, + "attachedToAgents": [] + }""", json"""{ "TYPE": "JobResource", "path": "UNSIGNED-v2.2-JOB-RESOURCE", @@ -333,6 +346,7 @@ final class AgentStateTest extends AsyncFreeSpec agentState = agentState.applyEvent(NoKey <-: ItemAttachedToMe(workflow)).orThrow agentState = agentState.applyEvent(NoKey <-: ItemAttachedToMe(unsignedJobResource)).orThrow agentState = agentState.applyEvent(NoKey <-: SignedItemAttachedToMe(signedWorkflow)).orThrow + agentState = agentState.applyEvent(workflow.path <-: WorkflowControlUpdated(true, ItemRevision(1))).orThrow agentState = agentState.applyEvent(NoKey <-: SignedItemAttachedToMe(signedJobResource)).orThrow agentState = agentState.applyEvent(orderId <-: OrderAttachedToAgent( @@ -354,6 +368,8 @@ final class AgentStateTest extends AsyncFreeSpec attachedState = Some(Order.Attached(agentPath)), parent = Some(orderId))), Map( workflow.id -> workflow), + Map( + workflow.path -> WorkflowControlState(workflowControl)), Map( unsignedJobResource.path -> unsignedJobResource, signedJobResource.value.path -> signedJobResource.value), diff --git a/js7-agent-data/src/test/scala/js7/agent/data/commands/AgentCommandTest.scala b/js7-agent-data/src/test/scala/js7/agent/data/commands/AgentCommandTest.scala index 9ad67d8d12..74ddcad0b1 100644 --- a/js7-agent-data/src/test/scala/js7/agent/data/commands/AgentCommandTest.scala +++ b/js7-agent-data/src/test/scala/js7/agent/data/commands/AgentCommandTest.scala @@ -301,6 +301,18 @@ final class AgentCommandTest extends AnyFreeSpec } } + "ControlWorkflow" in { + check( + AgentCommand.ControlWorkflow( + WorkflowPath("WORKFLOW"), suspend = true, ItemRevision(1)), + json"""{ + "TYPE": "ControlWorkflow", + "workflowPath": "WORKFLOW", + "suspend": true, + "revision": 1 + }""") + } + "ResetSubagent" in { check( AgentCommand.ResetSubagent(SubagentId("SUBAGENT"), force = false), diff --git a/js7-agent/src/main/scala/js7/agent/command/CommandActor.scala b/js7-agent/src/main/scala/js7/agent/command/CommandActor.scala index e031ec1ced..5f075784e6 100644 --- a/js7-agent/src/main/scala/js7/agent/command/CommandActor.scala +++ b/js7-agent/src/main/scala/js7/agent/command/CommandActor.scala @@ -7,7 +7,7 @@ import cats.instances.future._ import cats.syntax.traverse._ import js7.agent.command.CommandActor._ import js7.agent.data.commands.AgentCommand -import js7.agent.data.commands.AgentCommand.{AttachItem, AttachSignedItem, Batch, CoupleController, DedicateAgentDirector, DetachItem, EmergencyStop, NoOperation, OrderCommand, Reset, ResetSubagent, Response, ShutDown, TakeSnapshot} +import js7.agent.data.commands.AgentCommand.{AttachItem, AttachSignedItem, Batch, ControlWorkflow, CoupleController, DedicateAgentDirector, DetachItem, EmergencyStop, NoOperation, OrderCommand, Reset, ResetSubagent, Response, ShutDown, TakeSnapshot} import js7.agent.scheduler.AgentHandle import js7.base.circeutils.JavaJsonCodecs.instant.StringInstantJsonCodec import js7.base.log.{CorrelId, CorrelIdWrapped, Logger} @@ -109,7 +109,8 @@ extends Actor { case command @ (_: OrderCommand | _: DedicateAgentDirector | _: CoupleController | _: Reset | _: TakeSnapshot.type | _: ShutDown | - _: AttachItem | _: AttachSignedItem | _: DetachItem | _: ResetSubagent) => + _: AttachItem | _: AttachSignedItem | _: DetachItem | _: ResetSubagent | + _: ControlWorkflow) => // FIXME Delay CoupleController until all AttachOrder (extends OrderCommand) (and DetachOrder?) have been finished, to return a properly updated state agentHandle.executeCommand(command, meta.user.id, response) diff --git a/js7-agent/src/main/scala/js7/agent/scheduler/AgentActor.scala b/js7-agent/src/main/scala/js7/agent/scheduler/AgentActor.scala index a8f706fa47..8d322105a4 100644 --- a/js7-agent/src/main/scala/js7/agent/scheduler/AgentActor.scala +++ b/js7-agent/src/main/scala/js7/agent/scheduler/AgentActor.scala @@ -186,7 +186,8 @@ private[agent] final class AgentActor private( _: AgentCommand.AttachItem | _: AgentCommand.AttachSignedItem | _: AgentCommand.DetachItem | - _: AgentCommand.ResetSubagent) => + _: AgentCommand.ResetSubagent | + _: AgentCommand.ControlWorkflow) => // TODO Check AgentRunId ? started.toOption match { case None => diff --git a/js7-agent/src/main/scala/js7/agent/scheduler/order/AgentOrderKeeper.scala b/js7-agent/src/main/scala/js7/agent/scheduler/order/AgentOrderKeeper.scala index ccd727b1e0..9d2c729d31 100644 --- a/js7-agent/src/main/scala/js7/agent/scheduler/order/AgentOrderKeeper.scala +++ b/js7-agent/src/main/scala/js7/agent/scheduler/order/AgentOrderKeeper.scala @@ -45,6 +45,7 @@ import js7.data.orderwatch.{FileWatch, OrderWatchPath} import js7.data.state.OrderEventHandler.FollowUp import js7.data.state.{OrderEventHandler, StateView} import js7.data.subagent.{SubagentId, SubagentItem, SubagentSelection, SubagentSelectionId} +import js7.data.workflow.WorkflowControlEvent.WorkflowControlUpdated import js7.data.workflow.instructions.Execute import js7.data.workflow.instructions.executable.WorkflowJob import js7.data.workflow.{Workflow, WorkflowId, WorkflowPath} @@ -424,6 +425,23 @@ with Stash .rightAs(AgentCommand.Response.Accepted) .runToFuture + case AgentCommand.ControlWorkflow(workflowPath, suspend, revision) => + if (!persistence.currentState.idToWorkflow.keys.exists(_.path == workflowPath)) + Future.successful(Left(Problem(s"Unknown $workflowPath"))) + else + persistKeyedEvent(workflowPath <-: WorkflowControlUpdated(suspend, revision)) { + (stampedEvent, journaledState) => + if (!suspend) { + // Event it Workflow was already suspended. + // This allows the used to force continuation of Orders (just in case) + for (order <- persistence.currentState.orders + if order.workflowPath == workflowPath) { + proceedWithOrder(order) + } + } + Right(AgentCommand.Response.Accepted) + } + case AgentCommand.TakeSnapshot => (journalActor ? JournalActor.Input.TakeSnapshot) .mapTo[JournalActor.Output.SnapshotTaken.type] @@ -807,6 +825,8 @@ with Stash def workflowPathToId(workflowPath: WorkflowPath) = persistence.currentState.workflowPathToId(workflowPath) + def pathToWorkflowControlState = persistence.currentState.pathToWorkflowControlState + def pathToItemState = persistence.currentState.pathToItemState def keyToItem = persistence.currentState.keyToItem diff --git a/js7-controller/src/main/scala/js7/controller/ControllerOrderKeeper.scala b/js7-controller/src/main/scala/js7/controller/ControllerOrderKeeper.scala index 2204eeb65b..e48c668cdf 100644 --- a/js7-controller/src/main/scala/js7/controller/ControllerOrderKeeper.scala +++ b/js7-controller/src/main/scala/js7/controller/ControllerOrderKeeper.scala @@ -47,6 +47,7 @@ import js7.data.agent.{AgentPath, AgentRef, AgentRefState, AgentRunId} import js7.data.board.BoardEvent.{NoticeDeleted, NoticePosted} import js7.data.board.{BoardPath, BoardState, Notice, NoticeId} import js7.data.calendar.{Calendar, CalendarExecutor} +import js7.data.controller.ControllerCommand.ControlWorkflow import js7.data.controller.ControllerEvent.{ControllerShutDown, ControllerTestEvent} import js7.data.controller.ControllerStateExecutor.convertImplicitly import js7.data.controller.{ControllerCommand, ControllerEvent, ControllerState, VerifiedUpdateItems, VerifiedUpdateItemsExecutor} @@ -71,8 +72,9 @@ import js7.data.state.OrderEventHandler.FollowUp import js7.data.subagent.SubagentItemStateEvent.{SubagentEventsObserved, SubagentResetStartedByController} import js7.data.subagent.{SubagentId, SubagentItem, SubagentItemState, SubagentItemStateEvent} import js7.data.value.expression.scopes.NowScope +import js7.data.workflow.WorkflowControlEvent.{WorkflowControlAttached, WorkflowControlUpdated} import js7.data.workflow.position.WorkflowPosition -import js7.data.workflow.{Instruction, Workflow} +import js7.data.workflow.{Instruction, Workflow, WorkflowControl, WorkflowControlState, WorkflowPath} import js7.journal.recover.Recovered import js7.journal.state.FileStatePersistence import js7.journal.{CommitOptions, JournalActor, MainJournalingActor} @@ -401,6 +403,15 @@ with MainJournalingActor[ControllerState, Event] proceedWithOrders(_controllerState.idToOrder.keys) orderQueue.enqueue(_controllerState.idToOrder.keys) + _controllerState.workflowControlPathToIgnorantAgent + .foreach { case (workflowPath, agentPaths) => + for (agentPath <- agentPaths) { + attachWorkflowControlToAgent( + _controllerState.pathToWorkflowControl(workflowPath), + agentPath) + } + } + // Start fetching events from Agents after AttachOrder has been sent to AgentDrivers. // This is to handle race-condition: An Agent may have already completed an order. // So send AttachOrder before DetachOrder. @@ -533,6 +544,14 @@ with MainJournalingActor[ControllerState, Event] case _ => Timestamped(keyedEvent) :: Nil } + case KeyedEvent(workflowPath: WorkflowPath, event: WorkflowControlUpdated) => + // If event.revision != WorkflowControl.revision, + // then the event is informational only. + Timestamped( + workflowPath <-: + WorkflowControlAttached(agentPath, event.suspended, event.revision) + ) :: Nil + case _ => logger.error(s"Unknown event received from ${agentEntry.agentPath}: $keyedEvent") Nil @@ -735,6 +754,9 @@ with MainJournalingActor[ControllerState, Event] case ControllerCommand.ResumeOrder(orderId, position, historicOps) => executeOrderMarkCommands(Vector(orderId))(orderEventSource.resume(_, position, historicOps)) + case cmd: ControllerCommand.ControlWorkflow => + controlWorkflow(cmd) + case ControllerCommand.ResumeOrders(orderIds) => executeOrderMarkCommands(orderIds.toVector)(orderEventSource.resume(_, None, Nil)) @@ -956,6 +978,52 @@ with MainJournalingActor[ControllerState, Event] .map(_.map(_ => ControllerCommand.Response.Accepted)) } + private def controlWorkflow(cmd: ControlWorkflow): Future[Checked[ControllerCommand.Response]] = + _controllerState.repo.pathToItems(Workflow).checked(cmd.workflowPath) match { + case Left(problem) => Future.successful(Left(problem)) + case Right(_) => + val workflowControl = _controllerState.pathToWorkflowControl + .getOrElse(cmd.workflowPath, WorkflowControl(cmd.workflowPath)) + // Continue even if WorkflowControl is not changed. + // This allows the caller to force the redistribution of the WorkflowControl. + val event = WorkflowControlUpdated(suspended = cmd.suspend, workflowControl.revision.next) + persistKeyedEvent(cmd.workflowPath <-: event) { (stamped, updated) => + handleEvents(stamped :: Nil, updated) + val controlState = updated.pathToWorkflowControlState_(cmd.workflowPath) + attachWorkflowControlToAgents(controlState) + if (!controlState.workflowControl.suspended) { + orderQueue.enqueue( + updated.orders.filter(_.workflowPath == controlState.workflowPath).map(_.id)) + } + Right(ControllerCommand.Response.Accepted) + } + } + + private def attachWorkflowControlToAgents(controlState: WorkflowControlState): Unit = + // For each Workflow version of controlState.workflowPath + _controllerState.repo.pathToItems(Workflow).checked(controlState.workflowPath) + .foreach { workflows => + // Send WorkflowControl to each Agent that is referenced by an attached Workflow. + for ( + workflow <- workflows.view; + agentPath <- workflow.referencedAgentPaths + if _controllerState.agentAttachments.itemToDelegateToAttachedState.get(workflow.id) + .exists(_.get(agentPath).exists(_.isAttachableOrAttached)) + if !controlState.attachedToAgents.contains(agentPath) + ) { + attachWorkflowControlToAgent(controlState.workflowControl, agentPath) + } + } + + private def attachWorkflowControlToAgent(control: WorkflowControl, agentPath: AgentPath): Unit = { + // WorkflowControlAttaching event, damit Control nicht mit jedem Auftrag geschickt wird, bis + // WorkflowControlAttached. + agentRegister(agentPath).actor ! AgentDriver.Input.ControlWorkflow( + control.path, + control.suspended, + control.revision) + } + private def logEvent(event: Event): Unit = event match { case e: InventoryItemEvent => logger.trace(s"${e.key} ${e.getClass.scalaName}") @@ -1318,26 +1386,38 @@ with MainJournalingActor[ControllerState, Event] if (order.isAttaching && !agentEntry.isResetting) { val orderEntry = orderRegister(order.id) if (!orderEntry.triedToAttached) { - signedWorkflow.value.referencedAttachableToAgentSignablePaths + val workflow = signedWorkflow.value + import agentEntry.{actor, agentPath} + + // Maybe attach Workflow + workflow.referencedAttachableToAgentSignablePaths .flatMap(_controllerState.pathToSignedSimpleItem.get) .appended(signedWorkflow) - .filter(signedItem => isDetachedOrAttachable(signedItem.value, agentEntry.agentPath)) + .filter(signedItem => isDetachedOrAttachable(signedItem.value, agentPath)) .foreach { signedItem => - agentEntry.actor ! AgentDriver.Input.AttachSignedItem(signedItem) + actor ! AgentDriver.Input.AttachSignedItem(signedItem) } - signedWorkflow.value.referencedAttachableToAgentUnsignedPaths + // Maybe attach WorkflowControl + _controllerState.pathToWorkflowControlState.get(workflow.path) + .filterNot(_.attachedToAgents contains agentPath) + .map(_.workflowControl) + .foreach( + attachWorkflowControlToAgent(_, agentPath)) + + // Maybe attach more required Items + workflow.referencedAttachableToAgentUnsignedPaths .flatMap(_controllerState.pathToUnsignedSimpleItem.get) - .filter(isDetachedOrAttachable(_, agentEntry.agentPath)) + .filter(isDetachedOrAttachable(_, agentPath)) .foreach { item => - agentEntry.actor ! AgentDriver.Input.AttachUnsignedItem(item) + actor ! AgentDriver.Input.AttachUnsignedItem(item) } orderEntry.triedToAttached = true // TODO AttachOrder mit parent orders! // Agent markiert die als bloß gebraucht für Kindaufträge // Mit Referenzzähler: der letzte Kindauftrag löscht seine Elternaufträge - agentEntry.actor ! AgentDriver.Input.AttachOrder(order, agentEntry.agentPath) + actor ! AgentDriver.Input.AttachOrder(order, agentPath) } } } diff --git a/js7-controller/src/main/scala/js7/controller/agent/AgentDriver.scala b/js7-controller/src/main/scala/js7/controller/agent/AgentDriver.scala index 60fa01850a..fa26982ace 100644 --- a/js7-controller/src/main/scala/js7/controller/agent/AgentDriver.scala +++ b/js7-controller/src/main/scala/js7/controller/agent/AgentDriver.scala @@ -37,11 +37,12 @@ import js7.data.controller.ControllerState import js7.data.delegate.DelegateCouplingState.{Coupled, Resetting} import js7.data.event.{AnyKeyedEvent, Event, EventId, EventRequest, KeyedEvent, Stamped} import js7.data.item.ItemAttachedState.{Attachable, Attached} -import js7.data.item.{InventoryItemEvent, InventoryItemKey, SignableItem, UnsignedSimpleItem} +import js7.data.item.{InventoryItemEvent, InventoryItemKey, ItemRevision, SignableItem, UnsignedSimpleItem} import js7.data.order.OrderEvent.{OrderAttachedToAgent, OrderDetached} import js7.data.order.{Order, OrderEvent, OrderId, OrderMark} import js7.data.orderwatch.OrderWatchEvent import js7.data.subagent.{SubagentId, SubagentItemStateEvent} +import js7.data.workflow.{WorkflowControlEvent, WorkflowPath} import js7.journal.state.StatePersistence import monix.eval.Task import monix.execution.atomic.AtomicInt @@ -589,7 +590,8 @@ private[controller] object AgentDriver classOf[AgentEvent.AgentShutDown], classOf[SubagentItemStateEvent], classOf[InventoryItemEvent], - classOf[OrderWatchEvent]) + classOf[OrderWatchEvent], + classOf[WorkflowControlEvent]) private val DecoupledProblem = Problem.pure("Agent has been decoupled") def props(agentRef: AgentRef, agentRunId: Option[AgentRunId], eventId: EventId, @@ -641,6 +643,12 @@ private[controller] object AgentDriver final case class Reset(force: Boolean) extends DeadLetterSuppression final case class ResetSubagent(subagentId: SubagentId, force: Boolean) extends Queueable + + final case class ControlWorkflow( + workflowPath: WorkflowPath, + suspend: Boolean, + revision: ItemRevision) + extends Queueable } object Output { diff --git a/js7-controller/src/main/scala/js7/controller/agent/CommandQueue.scala b/js7-controller/src/main/scala/js7/controller/agent/CommandQueue.scala index caadad633b..6736db93e7 100644 --- a/js7-controller/src/main/scala/js7/controller/agent/CommandQueue.scala +++ b/js7-controller/src/main/scala/js7/controller/agent/CommandQueue.scala @@ -40,6 +40,14 @@ private[agent] abstract class CommandQueue(logger: ScalaLogger, batchSize: Int)( def enqueue(input: Queueable): Unit = input match { case o: Input.DetachOrder => detachQueue += o + case controlWorkflow: Input.ControlWorkflow => + val duplicates = queue.collect { + case o: Input.ControlWorkflow if o.workflowPath == controlWorkflow.workflowPath => o + } + queue --= duplicates + queueSet --= duplicates + queue += controlWorkflow + queueSet += controlWorkflow case o => queue += o queueSet += o @@ -179,6 +187,9 @@ private[agent] abstract class CommandQueue(logger: ScalaLogger, batchSize: Int)( case Input.ResetSubagent(subagentId, force) => AgentCommand.ResetSubagent(subagentId, force = force) + + case Input.ControlWorkflow(workflowPath, suspend, revision) => + AgentCommand.ControlWorkflow(workflowPath, suspend, revision) } final def onOrdersDetached(orderIds: View[OrderId]): Unit = { diff --git a/js7-data-for-java/src/main/scala/js7/data_for_java/controller/JControllerCommand.scala b/js7-data-for-java/src/main/scala/js7/data_for_java/controller/JControllerCommand.scala index 4413d1c5c9..170cd8273b 100644 --- a/js7-data-for-java/src/main/scala/js7/data_for_java/controller/JControllerCommand.scala +++ b/js7-data-for-java/src/main/scala/js7/data_for_java/controller/JControllerCommand.scala @@ -9,7 +9,8 @@ import js7.base.problem.Problem import js7.base.time.JavaTimestamp import js7.data.board.{BoardPath, NoticeId} import js7.data.controller.ControllerCommand -import js7.data.controller.ControllerCommand.{AddOrder, PostNotice} +import js7.data.controller.ControllerCommand.{AddOrder, ControlWorkflow, PostNotice} +import js7.data.workflow.WorkflowPath import js7.data_for_java.common.JJsonable import js7.data_for_java.order.JFreshOrder import scala.jdk.OptionConverters.RichOptional @@ -40,6 +41,10 @@ object JControllerCommand extends JJsonable.Companion[JControllerCommand] endOfLife.toScala .map(JavaTimestamp.ofInstant))) + @Nonnull + def controlWorkflow(workflowPath: WorkflowPath, suspend: Boolean): JControllerCommand = + JControllerCommand( + ControlWorkflow(workflowPath, suspend = suspend)) @Nonnull override def fromJson(@Nonnull jsonString: String): VEither[Problem, JControllerCommand] = diff --git a/js7-data-for-java/src/test/scala/js7/data_for_java/controller/JControllerStateTest.scala b/js7-data-for-java/src/test/scala/js7/data_for_java/controller/JControllerStateTest.scala index e0979cd976..5356ff4aa8 100644 --- a/js7-data-for-java/src/test/scala/js7/data_for_java/controller/JControllerStateTest.scala +++ b/js7-data-for-java/src/test/scala/js7/data_for_java/controller/JControllerStateTest.scala @@ -14,7 +14,7 @@ import js7.data.controller.{ControllerId, ControllerMetaState, ControllerState} import js7.data.delegate.DelegateCouplingState import js7.data.event.{EventId, JournalState, SnapshotableState} import js7.data.item.VersionedEvent.{VersionAdded, VersionedItemAdded} -import js7.data.item.{ClientAttachments, ItemSigner, Repo, VersionId} +import js7.data.item.{ItemSigner, Repo, VersionId} import js7.data.node.NodeId import js7.data.order.{Order, OrderId} import js7.data.subagent.SubagentId @@ -74,9 +74,9 @@ private object JControllerStateTest private val itemSigner = new ItemSigner(SillySigner.Default, versionedItemJsonCodec) - private val controllerState = ControllerState( - EventId(1001), - SnapshotableState.Standards( + private val controllerState = ControllerState.empty.copy( + eventId = EventId(1001), + standards = SnapshotableState.Standards( JournalState(Map(UserId("A") -> EventId(1000))), ClusterState.Coupled( ClusterSetting( @@ -86,24 +86,21 @@ private object JControllerStateTest activeId = NodeId("A"), Seq(ClusterSetting.Watch(Uri("https://CLUSTER-WATCH"))), ClusterTiming(10.s, 20.s)))), - ControllerMetaState( + controllerMetaState = ControllerMetaState( ControllerId("CONTROLLER-ID"), Timestamp("2019-05-24T12:00:00Z"), Timezone("Europe/Berlin")), - Map(AgentPath("AGENT") -> + pathToItemState_ = Map(AgentPath("AGENT") -> AgentRefState( AgentRef(AgentPath("AGENT"), Seq(SubagentId("SUBAGENT"))), None, None, DelegateCouplingState.Reset.fresh, EventId(7), None)), - Repo.empty + repo = Repo.empty .applyEvents(List( VersionAdded(v1), VersionedItemAdded(itemSigner.sign(aWorkflow)), VersionedItemAdded(itemSigner.sign(bWorkflow))) ).orThrow, - Map.empty, - ClientAttachments.empty, - Set.empty, - Vector( + idToOrder = Vector( Order( OrderId("A-ORDER"), (WorkflowPath("A-WORKFLOW") ~ v1) /: Position(0), diff --git a/js7-data/jvm/src/main/scala/js7/data/execution/workflow/OrderEventSource.scala b/js7-data/jvm/src/main/scala/js7/data/execution/workflow/OrderEventSource.scala index 30a9768e40..e2d021c714 100644 --- a/js7-data/jvm/src/main/scala/js7/data/execution/workflow/OrderEventSource.scala +++ b/js7-data/jvm/src/main/scala/js7/data/execution/workflow/OrderEventSource.scala @@ -34,7 +34,9 @@ final class OrderEventSource(state: StateView) def nextEvents(orderId: OrderId): Seq[KeyedEvent[OrderActorEvent]] = { val order = idToOrder(orderId) - if (order.isState[Order.Broken]) + if (state.isWorkflowSuspended(order.workflowPath)) + Nil + else if (order.isState[Order.Broken]) Nil // Avoid issuing a second OrderBroken (would be a loop) else checkedNextEvents(order) |> (invalidToEvent(order, _)) diff --git a/js7-data/shared/src/main/scala/js7/data/controller/ControllerCommand.scala b/js7-data/shared/src/main/scala/js7/data/controller/ControllerCommand.scala index dc79f640eb..6fa6e152e0 100644 --- a/js7-data/shared/src/main/scala/js7/data/controller/ControllerCommand.scala +++ b/js7-data/shared/src/main/scala/js7/data/controller/ControllerCommand.scala @@ -25,6 +25,7 @@ import js7.data.node.NodeId import js7.data.order.OrderEvent.OrderResumed import js7.data.order.{FreshOrder, OrderId} import js7.data.subagent.SubagentId +import js7.data.workflow.WorkflowPath import js7.data.workflow.position.Position import scala.collection.immutable import scala.concurrent.duration.FiniteDuration @@ -211,6 +212,11 @@ object ControllerCommand extends CommonCommand.Companion type Response = Response.Accepted } + final case class ControlWorkflow(workflowPath: WorkflowPath, suspend: Boolean) + extends ControllerCommand with Big { + type Response = Response.Accepted + } + case object TakeSnapshot extends ControllerCommand { type Response = Response.Accepted } @@ -281,6 +287,7 @@ object ControllerCommand extends CommonCommand.Companion Subtype(deriveConfiguredCodec[ResumeOrder]), Subtype(deriveConfiguredCodec[ResumeOrders]), Subtype(deriveConfiguredCodec[SuspendOrders]), + Subtype(deriveConfiguredCodec[ControlWorkflow]), Subtype(deriveConfiguredCodec[ClusterAppointNodes]), Subtype(ClusterSwitchOver), Subtype(deriveConfiguredCodec[InternalClusterCommand]), diff --git a/js7-data/shared/src/main/scala/js7/data/controller/ControllerState.scala b/js7-data/shared/src/main/scala/js7/data/controller/ControllerState.scala index fa558ef4ee..c557c8fac6 100644 --- a/js7-data/shared/src/main/scala/js7/data/controller/ControllerState.scala +++ b/js7-data/shared/src/main/scala/js7/data/controller/ControllerState.scala @@ -32,12 +32,12 @@ import js7.data.job.{JobResource, JobResourcePath} import js7.data.lock.{Lock, LockPath, LockState} import js7.data.order.OrderEvent.OrderAddedX import js7.data.order.{Order, OrderEvent, OrderId} -import js7.data.orderwatch.{FileWatch, OrderWatch, OrderWatchEvent, OrderWatchStateHandler, OrderWatchPath, OrderWatchState} +import js7.data.orderwatch.{FileWatch, OrderWatch, OrderWatchEvent, OrderWatchPath, OrderWatchState, OrderWatchStateHandler} import js7.data.state.EventDrivenStateView import js7.data.subagent.SubagentItemStateEvent.SubagentShutdown import js7.data.subagent.{SubagentId, SubagentItem, SubagentItemState, SubagentItemStateEvent, SubagentSelection, SubagentSelectionId, SubagentSelectionState} import js7.data.value.Value -import js7.data.workflow.{Workflow, WorkflowId, WorkflowPath} +import js7.data.workflow.{Workflow, WorkflowControlEvent, WorkflowControlState, WorkflowControlStateHandler, WorkflowId, WorkflowPath} import monix.reactive.Observable import scala.collection.{MapView, View} import scala.util.chaining.scalaUtilChainingOps @@ -53,12 +53,14 @@ final case class ControllerState( repo: Repo, pathToSignedSimpleItem: Map[SignableSimpleItemPath, Signed[SignableSimpleItem]], agentAttachments: ClientAttachments[AgentPath], + pathToWorkflowControlState_ : Map[WorkflowPath, WorkflowControlState], /** Used for OrderWatch to allow to attach it from Agent. */ deletionMarkedItems: Set[InventoryItemKey], idToOrder: Map[OrderId, Order[Order.State]]) extends SignedItemContainer with EventDrivenStateView[ControllerState, Event] with OrderWatchStateHandler[ControllerState] +with WorkflowControlStateHandler[ControllerState] with SnapshotableState[ControllerState] { def isAgent = false @@ -77,6 +79,7 @@ with SnapshotableState[ControllerState] pathTo(BoardState).values.view.map(_.noticeCount).sum + pathToSignedSimpleItem.size + agentAttachments.estimatedSnapshotSize + + pathToWorkflowControlState_.size + deletionMarkedItems.size + idToOrder.size @@ -94,6 +97,7 @@ with SnapshotableState[ControllerState] Observable.fromIterable(pathTo(OrderWatchState).values).flatMap(_.toSnapshotObservable), Observable.fromIterable(pathToSignedSimpleItem.values).map(SignedItemAdded(_)), Observable.fromIterable(repo.toEvents), + Observable.fromIterable(pathToWorkflowControlState_.values), agentAttachments.toSnapshotObservable, Observable.fromIterable(deletionMarkedItems.map(ItemDeletionMarked(_))), Observable.fromIterable(idToOrder.values) @@ -265,10 +269,15 @@ with SnapshotableState[ControllerState] agentAttachments = agentAttachments.applyItemDeleted(event)) itemKey match { - case id: VersionedItemId_ => - for (repo <- repo.deleteItem(id)) yield + case WorkflowId.as(workflowId) => + for (repo <- repo.deleteItem(workflowId)) yield updated.copy( - repo = repo) + repo = repo, + pathToWorkflowControlState_ = + if (!repo.pathToItems(Workflow).contains(workflowId.path)) + pathToWorkflowControlState_ - workflowId.path + else + pathToWorkflowControlState_) case lockPath: LockPath => Right(updated.copy( @@ -352,6 +361,9 @@ with SnapshotableState[ControllerState] pathToItemState_ = pathToItemState_.updated(subagentId, o)) } + case KeyedEvent(workflowPath: WorkflowPath, event: WorkflowControlEvent) => + applyWorkflowControlEvent(workflowPath, event) + case KeyedEvent(_, _: ControllerShutDown) => Right(this) @@ -361,6 +373,16 @@ with SnapshotableState[ControllerState] case _ => applyStandardEvent(keyedEvent) } + def pathToWorkflowControlState = pathToWorkflowControlState_.view + + protected def updateWorkflowControlState(state: WorkflowControlState): ControllerState = + copy( + pathToWorkflowControlState_ = pathToWorkflowControlState_.updated(state.workflowPath, state)) + + /** The Agents for each WorkflowControl which have not attached the current revision. */ + def workflowControlPathToIgnorantAgent: Map[WorkflowPath, Set[AgentPath]] = + ControllerState.workflowControlPathToIgnorantAgent(orders, pathToWorkflowControlState_) + protected def pathToOrderWatchState = pathTo(OrderWatchState) protected def updateOrderWatchStates( @@ -619,6 +641,7 @@ with ItemContainer.Companion[ControllerState] Repo.empty, Map.empty, ClientAttachments.empty, + Map.empty, Set.empty, Map.empty) @@ -645,7 +668,8 @@ with ItemContainer.Companion[ControllerState] Subtype[VersionedEvent], // These events describe complete objects Subtype[InventoryItemEvent], // For Repo and SignedItemAdded Subtype[OrderWatchState.Snapshot], - Subtype[Order[Order.State]]) + Subtype[Order[Order.State]], + Subtype[WorkflowControlState]) implicit lazy val keyedEventJsonCodec: KeyedEventTypedJsonCodec[Event] = KeyedEventTypedJsonCodec/*.named("ControllerState.keyedEventJsonCodec"*/( @@ -658,9 +682,28 @@ with ItemContainer.Companion[ControllerState] KeyedSubtype[SubagentItemStateEvent], KeyedSubtype[OrderWatchEvent], KeyedSubtype[OrderEvent], - KeyedSubtype[BoardEvent]) + KeyedSubtype[BoardEvent], + KeyedSubtype[WorkflowControlEvent]) object implicits { implicit val snapshotObjectJsonCodec = ControllerState.snapshotObjectJsonCodec } + + /** The Agents for each WorkflowControl which have not attached the current revision. */ + def workflowControlPathToIgnorantAgent( + orders: Iterable[Order[Order.State]], + pathToWorkflowControlState: Map[WorkflowPath, WorkflowControlState]) + : Map[WorkflowPath, Set[AgentPath]] = + orders.iterator + .map(o => o -> o.attachedState) + .collect { + case (o, Some(Order.Attached(agentPath))) => o.workflowPath -> agentPath + case (o, Some(Order.Attaching(agentPath))) => o.workflowPath -> agentPath + } + .filter { case (workflowPath, agentPath) => + pathToWorkflowControlState.get(workflowPath) + .exists(o => !o.attachedToAgents.contains(agentPath)) + } + .toSet[(WorkflowPath, AgentPath)] + .groupMap(_._1)(_._2) } diff --git a/js7-data/shared/src/main/scala/js7/data/controller/ControllerStateBuilder.scala b/js7-data/shared/src/main/scala/js7/data/controller/ControllerStateBuilder.scala index a72f36a8a1..f3c85803fb 100644 --- a/js7-data/shared/src/main/scala/js7/data/controller/ControllerStateBuilder.scala +++ b/js7-data/shared/src/main/scala/js7/data/controller/ControllerStateBuilder.scala @@ -17,23 +17,24 @@ import js7.data.event.{Event, EventDrivenState, JournalEvent, JournalState, Keye import js7.data.item.BasicItemEvent.{ItemAttachedStateEvent, ItemDeleted, ItemDeletionMarked} import js7.data.item.SignedItemEvent.{SignedItemAdded, SignedItemChanged} import js7.data.item.UnsignedSimpleItemEvent.{UnsignedSimpleItemAdded, UnsignedSimpleItemChanged} -import js7.data.item.{BasicItemEvent, ClientAttachments, InventoryItemEvent, InventoryItemKey, Repo, SignableSimpleItem, SignableSimpleItemPath, SignedItemEvent, UnsignedSimpleItemEvent, UnsignedSimpleItemPath, UnsignedSimpleItemState, VersionedEvent, VersionedItemId_} +import js7.data.item.{BasicItemEvent, ClientAttachments, InventoryItemEvent, InventoryItemKey, Repo, SignableSimpleItem, SignableSimpleItemPath, SignedItemEvent, UnsignedSimpleItemEvent, UnsignedSimpleItemPath, UnsignedSimpleItemState, VersionedEvent} import js7.data.job.{JobResource, JobResourcePath} import js7.data.lock.{Lock, LockPath, LockState} import js7.data.order.OrderEvent.{OrderAddedX, OrderNoticesExpected} import js7.data.order.{Order, OrderEvent, OrderId} -import js7.data.orderwatch.{OrderWatch, OrderWatchEvent, OrderWatchStateHandler, OrderWatchPath, OrderWatchState} +import js7.data.orderwatch.{OrderWatch, OrderWatchEvent, OrderWatchPath, OrderWatchState, OrderWatchStateHandler} import js7.data.state.EventDrivenStateView import js7.data.state.WorkflowAndOrderRecovering.followUpRecoveredWorkflowsAndOrders import js7.data.subagent.SubagentItemStateEvent.SubagentShutdown import js7.data.subagent.{SubagentId, SubagentItem, SubagentItemState, SubagentItemStateEvent, SubagentSelection, SubagentSelectionId, SubagentSelectionState} -import js7.data.workflow.{Workflow, WorkflowId, WorkflowPath} +import js7.data.workflow.{Workflow, WorkflowControlEvent, WorkflowControlState, WorkflowControlStateHandler, WorkflowId, WorkflowPath} import scala.collection.mutable final class ControllerStateBuilder extends SnapshotableStateBuilder[ControllerState] with EventDrivenStateView[ControllerStateBuilder, Event] with OrderWatchStateHandler[ControllerStateBuilder] +with WorkflowControlStateHandler[ControllerStateBuilder] { protected val S = ControllerState val companion = ControllerStateBuilder @@ -44,11 +45,13 @@ with OrderWatchStateHandler[ControllerStateBuilder] private val _idToOrder = mutable.Map.empty[OrderId, Order[Order.State]] private val _pathToItemState = mutable.Map.empty[UnsignedSimpleItemPath, UnsignedSimpleItemState] private var agentAttachments = ClientAttachments.empty[AgentPath] + private val _pathToWorkflowControlState = mutable.Map.empty[WorkflowPath, WorkflowControlState] private val deletionMarkedItems = mutable.Set[InventoryItemKey]() private val pathToSignedSimpleItem = mutable.Map.empty[SignableSimpleItemPath, Signed[SignableSimpleItem]] val isAgent = false - val idToOrder = _idToOrder + def idToOrder = _idToOrder + def pathToWorkflowControlState = _pathToWorkflowControlState.view lazy val idToWorkflow: PartialFunction[WorkflowId, Workflow] = new PartialFunction[WorkflowId, Workflow] { @@ -86,6 +89,7 @@ with OrderWatchStateHandler[ControllerStateBuilder] _pathToItemState ++= state.pathToItemState_ pathToSignedSimpleItem ++= state.pathToSignedSimpleItem agentAttachments = state.agentAttachments + _pathToWorkflowControlState ++= state.pathToWorkflowControlState_ deletionMarkedItems ++= state.deletionMarkedItems } @@ -150,6 +154,9 @@ with OrderWatchStateHandler[ControllerStateBuilder] case UnsignedSimpleItemAdded(orderWatch: OrderWatch) => ow.addOrderWatch(orderWatch).orThrow + case workflowControlState: WorkflowControlState => + updateWorkflowControlState(workflowControlState) + case snapshot: OrderWatchState.ExternalOrderSnapshot => ow.applySnapshot(snapshot).orThrow @@ -307,8 +314,11 @@ with OrderWatchStateHandler[ControllerStateBuilder] agentAttachments = agentAttachments.applyItemDeleted(event) event.key match { - case id: VersionedItemId_ => - repo = repo.deleteItem(id).orThrow + case WorkflowId.as(workflowId) => + repo = repo.deleteItem(workflowId).orThrow + if (!repo.pathToItems(Workflow).contains(workflowId.path)) { + _pathToWorkflowControlState -= workflowId.path + } case path: OrderWatchPath => ow.removeOrderWatch(path) @@ -367,6 +377,9 @@ with OrderWatchStateHandler[ControllerStateBuilder] _pathToItemState(boardState.path) = boardState.removeNotice(noticeId).orThrow } + case KeyedEvent(workflowPath: WorkflowPath, event: WorkflowControlEvent) => + applyWorkflowControlEvent(workflowPath, event).orThrow + case KeyedEvent(_, _: ControllerShutDown) => case KeyedEvent(_, ControllerTestEvent) => @@ -403,6 +416,11 @@ with OrderWatchStateHandler[ControllerStateBuilder] protected def pathToOrderWatchState = pathTo(OrderWatchState) + protected def updateWorkflowControlState(state: WorkflowControlState) = { + _pathToWorkflowControlState(state.workflowPath) = state + this + } + protected def updateOrderWatchStates( orderWatchStates: Iterable[OrderWatchState], remove: Iterable[OrderWatchPath] @@ -430,6 +448,7 @@ with OrderWatchStateHandler[ControllerStateBuilder] repo, pathToSignedSimpleItem.toMap, agentAttachments, + _pathToWorkflowControlState.toMap, deletionMarkedItems.toSet, _idToOrder.toMap) diff --git a/js7-data/shared/src/main/scala/js7/data/delegate/DelegateCouplingState.scala b/js7-data/shared/src/main/scala/js7/data/delegate/DelegateCouplingState.scala index 13d1c9756d..4348d90eca 100644 --- a/js7-data/shared/src/main/scala/js7/data/delegate/DelegateCouplingState.scala +++ b/js7-data/shared/src/main/scala/js7/data/delegate/DelegateCouplingState.scala @@ -4,7 +4,9 @@ import io.circe.generic.extras.Configuration.default.withDefaults import io.circe.generic.extras.semiauto.deriveConfiguredCodec import io.circe.generic.semiauto.deriveCodec import io.circe.{Decoder, Encoder, Json} +import js7.base.annotation.javaApi import js7.base.circeutils.typed.{Subtype, TypedJsonCodec} +import js7.base.utils.ScalaUtils.syntax.RichJavaClass sealed trait DelegateCouplingState @@ -17,7 +19,9 @@ object DelegateCouplingState val restart = Reset(Restart) val byCommand = Reset(ResetCommand) - sealed trait Reason + sealed trait Reason { + @javaApi val string = getClass.simpleScalaName + } /** Initially state. */ case object Fresh extends Reason diff --git a/js7-data/shared/src/main/scala/js7/data/item/ItemAttachedState.scala b/js7-data/shared/src/main/scala/js7/data/item/ItemAttachedState.scala index adbf29134f..01c981fbab 100644 --- a/js7-data/shared/src/main/scala/js7/data/item/ItemAttachedState.scala +++ b/js7-data/shared/src/main/scala/js7/data/item/ItemAttachedState.scala @@ -5,28 +5,26 @@ import js7.base.circeutils.typed.{Subtype, TypedJsonCodec} import js7.base.utils.ScalaUtils.syntax.RichJavaClass sealed trait ItemAttachedState +{ + def isAttachableOrAttached: Boolean +} object ItemAttachedState { - sealed trait NotDetached extends ItemAttachedState - { - def toShortString: String = - getClass.simpleScalaName - } - object NotDetached { - implicit val jsonCodec = TypedJsonCodec[NotDetached]( - Subtype(Attachable), - Subtype(deriveCodec[Attached]), - Subtype(Detachable)) - } case object Attachable - extends NotDetached + extends NotDetached { + def isAttachableOrAttached = true + } final case class Attached(itemRevision: Option[ItemRevision]) - extends NotDetached + extends NotDetached { + def isAttachableOrAttached = true + } object Attached { private val none = new Attached(None) + implicit val jsonCodec = deriveCodec[Attached] + def apply(itemRevision: Option[ItemRevision]): Attached = if (itemRevision.isEmpty) none @@ -35,7 +33,32 @@ object ItemAttachedState } case object Detachable - extends NotDetached + extends NotDetached { + def isAttachableOrAttached = false + } - case object Detached extends ItemAttachedState + case object Detached + extends ItemAttachedState { + def isAttachableOrAttached = false + } + + sealed trait NotDetached + extends ItemAttachedState + { + def toShortString: String = + getClass.simpleScalaName + } + object NotDetached { + implicit val jsonCodec = TypedJsonCodec[NotDetached]( + Subtype(Attachable), + Subtype[Attached], + Subtype(Detachable)) + } + + sealed trait AttachableOrAttached extends NotDetached + object AttachableOrAttached { + implicit val jsonCodec = TypedJsonCodec[NotDetached]( + Subtype(Attachable), + Subtype[Attached]) + } } diff --git a/js7-data/shared/src/main/scala/js7/data/state/StateView.scala b/js7-data/shared/src/main/scala/js7/data/state/StateView.scala index e16a1f83ce..c01b338846 100644 --- a/js7-data/shared/src/main/scala/js7/data/state/StateView.scala +++ b/js7-data/shared/src/main/scala/js7/data/state/StateView.scala @@ -18,7 +18,7 @@ import js7.data.value.expression.scopes.{JobResourceScope, NowScope, OrderScopes import js7.data.workflow.instructions.executable.WorkflowJob import js7.data.workflow.instructions.{BoardInstruction, End} import js7.data.workflow.position.WorkflowPosition -import js7.data.workflow.{Instruction, Workflow, WorkflowId, WorkflowPath} +import js7.data.workflow.{Instruction, Workflow, WorkflowControl, WorkflowControlState, WorkflowId, WorkflowPath} import scala.collection.MapView import scala.reflect.ClassTag @@ -48,6 +48,11 @@ trait StateView extends ItemContainer def workflowPathToId(workflowPath: WorkflowPath): Checked[WorkflowId] + def pathToWorkflowControlState: MapView[WorkflowPath, WorkflowControlState] + + final def pathToWorkflowControl: MapView[WorkflowPath, WorkflowControl] = + pathToWorkflowControlState.view.mapValues(_.workflowControl) + def pathToItemState: MapView[UnsignedSimpleItemPath, UnsignedSimpleItemState] final def pathTo[A <: UnsignedSimpleItemState](A: UnsignedSimpleItemState.Companion[A]) @@ -135,6 +140,10 @@ trait StateView extends ItemContainer (order.state.eq(FailedInFork) || endReached) } + final def isWorkflowSuspended(workflowPath: WorkflowPath): Boolean = + pathToWorkflowControlState.get(workflowPath) + .exists(_.workflowControl.suspended) + /** A pure (stable, repeatable) Scope. */ final def toPureScope(order: Order[Order.State]): Checked[Scope] = for (orderScopes <- toOrderScopes(order)) yield diff --git a/js7-data/shared/src/main/scala/js7/data/workflow/Workflow.scala b/js7-data/shared/src/main/scala/js7/data/workflow/Workflow.scala index d9f7813232..a85ff3ee7c 100644 --- a/js7-data/shared/src/main/scala/js7/data/workflow/Workflow.scala +++ b/js7-data/shared/src/main/scala/js7/data/workflow/Workflow.scala @@ -171,10 +171,13 @@ extends VersionedItem referencedLockAndBoardPaths, referencedJobResourcePaths, calendarPath, - workflowJobs.view.map(_.agentPath).toSet, + referencedAgentPaths, workflowJobs.view.flatMap(_.subagentSelectionId).toSet) } + def referencedAgentPaths: Set[AgentPath] = + workflowJobs.map(_.agentPath).toSet + private[workflow] def workflowJobs: View[WorkflowJob] = keyToJob.values.view diff --git a/js7-data/shared/src/main/scala/js7/data/workflow/WorkflowControl.scala b/js7-data/shared/src/main/scala/js7/data/workflow/WorkflowControl.scala new file mode 100644 index 0000000000..446ac88731 --- /dev/null +++ b/js7-data/shared/src/main/scala/js7/data/workflow/WorkflowControl.scala @@ -0,0 +1,14 @@ +package js7.data.workflow + +import io.circe.generic.semiauto.deriveCodec +import js7.data.item.ItemRevision + +final case class WorkflowControl( + path: WorkflowPath, + suspended: Boolean = false, + revision: ItemRevision = ItemRevision(0)) + +object WorkflowControl +{ + implicit val jsonCodec = deriveCodec[WorkflowControl] +} diff --git a/js7-data/shared/src/main/scala/js7/data/workflow/WorkflowControlEvent.scala b/js7-data/shared/src/main/scala/js7/data/workflow/WorkflowControlEvent.scala new file mode 100644 index 0000000000..0eec644206 --- /dev/null +++ b/js7-data/shared/src/main/scala/js7/data/workflow/WorkflowControlEvent.scala @@ -0,0 +1,34 @@ +package js7.data.workflow + +import io.circe.generic.semiauto.deriveCodec +import js7.base.circeutils.typed.{Subtype, TypedJsonCodec} +import js7.data.agent.AgentPath +import js7.data.event.Event +import js7.data.item.ItemRevision + +sealed trait WorkflowControlEvent extends Event.ForScala3[WorkflowControlEvent] +{ + val companion = WorkflowControlEvent +} + +object WorkflowControlEvent extends Event.Companion[WorkflowControlEvent] +{ + type Key = WorkflowPath + + final case class WorkflowControlUpdated(suspended: Boolean, revision: ItemRevision) + extends WorkflowControlEvent + + /** + * @param revision if != WorkflowControl.revision, then this event is informational only. + * @param suspended informational only + */ + final case class WorkflowControlAttached( + agentPath: AgentPath, + suspended: Boolean, + revision: ItemRevision) + extends WorkflowControlEvent + + implicit val jsonCodec = TypedJsonCodec[WorkflowControlEvent]( + Subtype(deriveCodec[WorkflowControlUpdated]), + Subtype(deriveCodec[WorkflowControlAttached])) +} diff --git a/js7-data/shared/src/main/scala/js7/data/workflow/WorkflowControlState.scala b/js7-data/shared/src/main/scala/js7/data/workflow/WorkflowControlState.scala new file mode 100644 index 0000000000..c279c20d20 --- /dev/null +++ b/js7-data/shared/src/main/scala/js7/data/workflow/WorkflowControlState.scala @@ -0,0 +1,36 @@ +package js7.data.workflow + +import io.circe.generic.semiauto.deriveCodec +import js7.base.problem.Checked +import js7.data.agent.AgentPath +import js7.data.workflow.WorkflowControlEvent.{WorkflowControlAttached, WorkflowControlUpdated} + +final case class WorkflowControlState( + workflowControl: WorkflowControl, + attachedToAgents: Set[AgentPath] = Set.empty) +{ + def workflowPath: WorkflowPath = + workflowControl.path + + def applyEvent(event: WorkflowControlEvent): Checked[WorkflowControlState] = + event match { + case WorkflowControlUpdated(suspended, revision) => + Right(copy( + workflowControl.copy( + suspended = suspended, + revision = revision), + attachedToAgents = Set.empty)) + + case WorkflowControlAttached(agentPath, suspended, revision) => + Right( + if (revision != workflowControl.revision) + this // Ignore late event + else + copy( + attachedToAgents = attachedToAgents + agentPath)) + } +} + +object WorkflowControlState { + implicit val jsonCodec = deriveCodec[WorkflowControlState] +} diff --git a/js7-data/shared/src/main/scala/js7/data/workflow/WorkflowControlStateHandler.scala b/js7-data/shared/src/main/scala/js7/data/workflow/WorkflowControlStateHandler.scala new file mode 100644 index 0000000000..31010f8b85 --- /dev/null +++ b/js7-data/shared/src/main/scala/js7/data/workflow/WorkflowControlStateHandler.scala @@ -0,0 +1,20 @@ +package js7.data.workflow + +import js7.base.problem.Checked +import js7.base.utils.ScalaUtils.syntax._ + +trait WorkflowControlStateHandler[Self] +{ + this: Self => + + protected def pathToWorkflowControlState: PartialFunction[WorkflowPath, WorkflowControlState] + + protected def updateWorkflowControlState(state: WorkflowControlState): Self + + protected final def applyWorkflowControlEvent(path: WorkflowPath, event: WorkflowControlEvent) + : Checked[Self] = + pathToWorkflowControlState + .getOrElse(path, WorkflowControlState(WorkflowControl(path))) + .applyEvent(event) + .map(updateWorkflowControlState) +} diff --git a/js7-data/shared/src/test/scala/js7/data/controller/ControllerCommandTest.scala b/js7-data/shared/src/test/scala/js7/data/controller/ControllerCommandTest.scala index ad3050483e..3f470d0861 100644 --- a/js7-data/shared/src/test/scala/js7/data/controller/ControllerCommandTest.scala +++ b/js7-data/shared/src/test/scala/js7/data/controller/ControllerCommandTest.scala @@ -365,6 +365,16 @@ final class ControllerCommandTest extends AnyFreeSpec }""") } + "ControlWorkflow" in { + testJson[ControllerCommand]( + ControlWorkflow(WorkflowPath("WORKFLOW"), suspend = true), + json"""{ + "TYPE": "ControlWorkflow", + "workflowPath": "WORKFLOW", + "suspend": true + }""") + } + "ClusterAppointNodes" in { testJson[ControllerCommand]( ClusterAppointNodes( diff --git a/js7-data/shared/src/test/scala/js7/data/controller/ControllerStateTest.scala b/js7-data/shared/src/test/scala/js7/data/controller/ControllerStateTest.scala index 8183c5c5d1..232773a3d6 100644 --- a/js7-data/shared/src/test/scala/js7/data/controller/ControllerStateTest.scala +++ b/js7-data/shared/src/test/scala/js7/data/controller/ControllerStateTest.scala @@ -36,7 +36,7 @@ import js7.data.value.expression.ExpressionParser.expr import js7.data.workflow.instructions.executable.WorkflowJob import js7.data.workflow.instructions.{Execute, ExpectNotices, LockInstruction} import js7.data.workflow.position.Position -import js7.data.workflow.{Workflow, WorkflowPath} +import js7.data.workflow.{Workflow, WorkflowControl, WorkflowControlState, WorkflowPath} import js7.tester.CirceJsonTester.testJson import js7.tester.DiffxAssertions.assertEqual import monix.execution.Scheduler.Implicits.traced @@ -48,7 +48,7 @@ import org.scalatest.freespec.AsyncFreeSpec final class ControllerStateTest extends AsyncFreeSpec { "estimatedSnapshotSize" in { - assert(controllerState.estimatedSnapshotSize == 20) + assert(controllerState.estimatedSnapshotSize == 21) for (n <- controllerState.toSnapshotObservable.countL.runToFuture) yield assert(controllerState.estimatedSnapshotSize == n) } @@ -97,10 +97,11 @@ final class ControllerStateTest extends AsyncFreeSpec HasOrder(OrderId("ORDER"), Some(Vanished))), SignedItemAdded(signedJobResource), VersionAdded(versionId), - VersionedItemAdded(signedWorkflow), + VersionedItemAdded(signedWorkflow)) ++ + controllerState.pathToWorkflowControlState_.values ++ + Seq( ItemAttachable(jobResource.path, agentRef.path), - ItemDeletionMarked(fileWatch.path) - ) ++ + ItemDeletionMarked(fileWatch.path)) ++ controllerState.idToOrder.values) } @@ -299,6 +300,14 @@ final class ControllerStateTest extends AsyncFreeSpec }, "string": "{\"TYPE\":\"Workflow\",\"path\":\"WORKFLOW\",\"versionId\":\"1.0\",\"instructions\":[{\"TYPE\":\"Lock\",\"lockPath\":\"LOCK\",\"lockedWorkflow\":{\"instructions\":[{\"TYPE\":\"Execute.Anonymous\",\"job\":{\"agentPath\":\"AGENT\",\"subagentSelectionId\":\"SELECTION\",\"executable\":{\"TYPE\":\"ShellScriptExecutable\",\"script\":\"\"},\"jobResourcePaths\":[\"JOB-RESOURCE\"],\"parallelism\":1}}]}},{\"TYPE\":\"ExpectNotices\",\"boardPaths\":\"'BOARD'\"}]}" } + }, { + "TYPE": "WorkflowControlState", + "workflowControl": { + "path": "WORKFLOW", + "suspended": true, + "revision": 0 + }, + "attachedToAgents": [] }, { "TYPE": "ItemAttachable", "delegateId": "Agent:AGENT", @@ -433,6 +442,49 @@ final class ControllerStateTest extends AsyncFreeSpec itemRevision = Some(ItemRevision(2)))) } } + + "workflowControlPathToIgnorantAgent" in { + assert(ControllerState.workflowControlPathToIgnorantAgent(Nil, Map.empty) == Map.empty) + + val aWorkflowPath = WorkflowPath("A") + val bWorkflowPath = WorkflowPath("B") + val cWorkflowPath = WorkflowPath("C") + val aAgentPath = AgentPath("A") + val bAgentPath = AgentPath("B") + val cAgentPath = AgentPath("C") + + val orders = Seq( + Order(OrderId("X"), aWorkflowPath ~ "1", Order.Ready), + Order(OrderId("A"), aWorkflowPath ~ "1", Order.Ready, + attachedState = Some(Order.Attaching(aAgentPath))), + Order(OrderId("B"), bWorkflowPath ~ "1", Order.Ready, + attachedState = Some(Order.Attached(bAgentPath))), + Order(OrderId("C"), cWorkflowPath ~ "1", Order.Ready, + attachedState = Some(Order.Detaching(cAgentPath)))) + + assert( + ControllerState.workflowControlPathToIgnorantAgent( + orders, + Seq( + WorkflowControlState(WorkflowControl(aWorkflowPath)), + WorkflowControlState(WorkflowControl(bWorkflowPath)), + ).toKeyedMap(_.workflowPath) + ) == + Map( + aWorkflowPath -> Set(aAgentPath), + bWorkflowPath -> Set(bAgentPath))) + + assert( + ControllerState.workflowControlPathToIgnorantAgent( + orders, + Seq( + WorkflowControlState(WorkflowControl(aWorkflowPath), Set(aAgentPath, bAgentPath)), + WorkflowControlState(WorkflowControl(bWorkflowPath), Set(aAgentPath)), + ).toKeyedMap(_.workflowPath) + ) == + Map( + bWorkflowPath -> Set(bAgentPath))) + } } object ControllerStateTest @@ -533,6 +585,9 @@ object ControllerStateTest jobResource.path -> signedJobResource), ClientAttachments(Map( jobResource.path -> Map(agentRef.path -> Attachable))), + pathToWorkflowControlState_ = Map( + workflow.path -> WorkflowControlState(WorkflowControl(workflow.path, suspended = true)), + ), deletionMarkedItems = Set(fileWatch.path), Seq( Order(orderId, workflow.id /: Position(0), Order.Fresh, diff --git a/js7-data/shared/src/test/scala/js7/data/delegate/DelegateCouplingStateTest.scala b/js7-data/shared/src/test/scala/js7/data/delegate/DelegateCouplingStateTest.scala index 6cc4bdac04..cc72e6c386 100644 --- a/js7-data/shared/src/test/scala/js7/data/delegate/DelegateCouplingStateTest.scala +++ b/js7-data/shared/src/test/scala/js7/data/delegate/DelegateCouplingStateTest.scala @@ -68,4 +68,26 @@ final class DelegateCouplingStateTest extends AnyFreeSpec "TYPE": "ShutDown" }""") } + + "For Java" - { + "Reset.Fresh.string" in { + val s = DelegateCouplingState.Reset(DelegateCouplingState.Reset.Fresh) + assert(s.reason.string == "Fresh") + } + + "Reset.Shutdown.string" in { + val s = DelegateCouplingState.Reset(DelegateCouplingState.Reset.Shutdown) + assert(s.reason.string == "Shutdown") + } + + "Reset.Restart.string" in { + val s = DelegateCouplingState.Reset(DelegateCouplingState.Reset.Restart) + assert(s.reason.string == "Restart") + } + + "Reset.ResetCommand.string" in { + val s = DelegateCouplingState.Reset(DelegateCouplingState.Reset.ResetCommand) + assert(s.reason.string == "ResetCommand") + } + } } diff --git a/js7-data/shared/src/test/scala/js7/data/state/TestStateView.scala b/js7-data/shared/src/test/scala/js7/data/state/TestStateView.scala index 087062bc72..82b3132e89 100644 --- a/js7-data/shared/src/test/scala/js7/data/state/TestStateView.scala +++ b/js7-data/shared/src/test/scala/js7/data/state/TestStateView.scala @@ -10,7 +10,7 @@ import js7.data.event.{Event, EventDrivenState, KeyedEvent} import js7.data.item.{InventoryItem, InventoryItemKey, UnsignedSimpleItemPath, UnsignedSimpleItemState} import js7.data.lock.LockPath import js7.data.order.{Order, OrderEvent, OrderId} -import js7.data.workflow.{Workflow, WorkflowId, WorkflowPath} +import js7.data.workflow.{Workflow, WorkflowControlState, WorkflowId, WorkflowPath} import scala.collection.MapView case class TestStateView( @@ -18,7 +18,8 @@ case class TestStateView( controllerId: ControllerId = ControllerId("CONTROLLER"), idToOrder: Map[OrderId, Order[Order.State]] = new NotImplementedMap, idToWorkflow: PartialFunction[WorkflowId, Workflow] = new NotImplementedMap, - pathToItemState_ : Map[UnsignedSimpleItemPath, UnsignedSimpleItemState] = new NotImplementedMap) + pathToItemState_ : Map[UnsignedSimpleItemPath, UnsignedSimpleItemState] = new NotImplementedMap, + pathToWorkflowControlState_ : Map[WorkflowPath, WorkflowControlState] = Map.empty) extends EventDrivenStateView[TestStateView, Event] { val companion = TestStateView @@ -36,6 +37,8 @@ extends EventDrivenStateView[TestStateView, Event] def workflowPathToId(workflowPath: WorkflowPath) = Left(Problem.pure("workflowPathToId is not implemented")) + def pathToWorkflowControlState = pathToWorkflowControlState_.view + def pathToItemState = pathToItemState_.view lazy val keyToItem: MapView[InventoryItemKey, InventoryItem] = diff --git a/js7-data/shared/src/test/scala/js7/data/workflow/WorkflowControlEventTest.scala b/js7-data/shared/src/test/scala/js7/data/workflow/WorkflowControlEventTest.scala new file mode 100644 index 0000000000..a43bb4f426 --- /dev/null +++ b/js7-data/shared/src/test/scala/js7/data/workflow/WorkflowControlEventTest.scala @@ -0,0 +1,32 @@ +package js7.data.workflow + +import js7.base.circeutils.CirceUtils.JsonStringInterpolator +import js7.data.agent.AgentPath +import js7.data.item.ItemRevision +import js7.data.workflow.WorkflowControlEvent.{WorkflowControlAttached, WorkflowControlUpdated} +import js7.tester.CirceJsonTester +import org.scalatest.freespec.AnyFreeSpec + +final class WorkflowControlEventTest extends AnyFreeSpec +{ + "WorkflowControlUpdated" in { + CirceJsonTester.testJson[WorkflowControlEvent]( + WorkflowControlUpdated(suspended = true, ItemRevision(123)), + json"""{ + "TYPE": "WorkflowControlUpdated", + "suspended": true, + "revision": 123 + }""") + } + + "WorkflowControlAttached" in { + CirceJsonTester.testJson[WorkflowControlEvent]( + WorkflowControlAttached(AgentPath("AGENT"), true, ItemRevision(123)), + json"""{ + "TYPE": "WorkflowControlAttached", + "agentPath": "AGENT", + "suspended": true, + "revision": 123 + }""") + } +} diff --git a/js7-data/shared/src/test/scala/js7/data/workflow/WorkflowControlStateTest.scala b/js7-data/shared/src/test/scala/js7/data/workflow/WorkflowControlStateTest.scala new file mode 100644 index 0000000000..59f59429d8 --- /dev/null +++ b/js7-data/shared/src/test/scala/js7/data/workflow/WorkflowControlStateTest.scala @@ -0,0 +1,24 @@ +package js7.data.workflow + +import js7.base.circeutils.CirceUtils.JsonStringInterpolator +import js7.data.agent.AgentPath +import js7.tester.CirceJsonTester.testJson +import org.scalatest.freespec.AnyFreeSpec + +final class WorkflowControlStateTest extends AnyFreeSpec +{ + "JSON" in { + testJson( + WorkflowControlState( + WorkflowControl(WorkflowPath("WORKFLOW")), + Set(AgentPath("AGENT"))), + json"""{ + "workflowControl": { + "path": "WORKFLOW", + "suspended": false, + "revision": 0 + }, + "attachedToAgents": [ "AGENT" ] + }""") + } +} diff --git a/js7-tests/src/test/scala/js7/tests/SuspendWorkflowTest.scala b/js7-tests/src/test/scala/js7/tests/SuspendWorkflowTest.scala new file mode 100644 index 0000000000..ecd0cc931b --- /dev/null +++ b/js7-tests/src/test/scala/js7/tests/SuspendWorkflowTest.scala @@ -0,0 +1,220 @@ +package js7.tests + +import cats.syntax.parallel._ +import java.util.concurrent.TimeoutException +import js7.agent.RunningAgent +import js7.agent.data.event.AgentEvent.AgentReady +import js7.base.configutils.Configs.HoconStringInterpolator +import js7.base.thread.Futures.implicits.SuccessFuture +import js7.base.thread.MonixBlocking.syntax.RichTask +import js7.base.time.ScalaTime._ +import js7.controller.RunningController +import js7.data.agent.AgentPath +import js7.data.controller.ControllerCommand +import js7.data.event.EventId +import js7.data.item.BasicItemEvent.ItemDetached +import js7.data.item.ItemOperation.{AddVersion, RemoveVersioned} +import js7.data.item.{ItemRevision, VersionId} +import js7.data.order.OrderEvent.{OrderAttached, OrderFinished, OrderProcessingStarted, OrderPromptAnswered, OrderPrompted, OrderStarted, OrderStdoutWritten} +import js7.data.order.{FreshOrder, OrderId} +import js7.data.value.expression.ExpressionParser.expr +import js7.data.workflow.WorkflowControlEvent.{WorkflowControlAttached, WorkflowControlUpdated} +import js7.data.workflow.instructions.Prompt +import js7.data.workflow.{Workflow, WorkflowControl, WorkflowControlState, WorkflowPath} +import js7.proxy.ControllerApi +import js7.tests.SuspendWorkflowTest._ +import js7.tests.jobs.SemaphoreJob +import js7.tests.testenv.DirectoryProviderForScalaTest +import monix.execution.Scheduler.Implicits.traced +import monix.reactive.Observable +import org.scalatest.freespec.AnyFreeSpec + +final class SuspendWorkflowTest extends AnyFreeSpec with DirectoryProviderForScalaTest +{ + override protected val controllerConfig = config""" + js7.auth.users.TEST-USER.permissions = [ UpdateItem ] + js7.controller.agent-driver.command-batch-delay = 0ms + js7.controller.agent-driver.event-buffer-delay = 0ms + """ + override protected val agentConfig = config""" + js7.job.execution.signed-script-injection-allowed = on + """ + + protected val agentPaths = Seq(aAgentPath, bAgentPath) + protected val items = Seq(aWorkflow, bWorkflow) + + private var controller: RunningController = null + private var aAgent: RunningAgent = null + private var bAgent: RunningAgent = null + + private def eventWatch = controller.eventWatch + + override def afterAll() = { + Seq(aAgent, bAgent) + .flatMap(Option(_)).parTraverse(_.terminate()).await(99.s) + controller.terminate(suppressSnapshot = true).await(99.s) + super.afterAll() + } + + "UpdateWorkflowController suspend=true" in { + aAgent = directoryProvider.startAgent(aAgentPath).await(99.s) + controller = directoryProvider.startController().await(99.s) + implicit val controllerApi = directoryProvider.newControllerApi(controller) + + var eventId = suspendWorkflow(aWorkflow.path, true, ItemRevision(1)) + + val aOrderId = OrderId("A") + controllerApi.addOrder(FreshOrder(aOrderId, aWorkflow.path, deleteWhenTerminated = true)) + .await(99.s) + intercept[TimeoutException]( + eventWatch.await[OrderStarted](_.key == aOrderId, after = eventId, timeout = 500.ms)) + + eventId = suspendWorkflow(aWorkflow.path, false, ItemRevision(2)) + eventId = eventWatch.await[OrderPrompted](_.key == aOrderId, after = eventId).head.eventId + + eventId = suspendWorkflow(aWorkflow.path, true, ItemRevision(3)) + + controllerApi.executeCommand(ControllerCommand.AnswerOrderPrompt(aOrderId)).await(99.s) + // OrderPromptAnswered happens despite suspended Workflow + eventId = eventWatch.await[OrderPromptAnswered](_.key == aOrderId, after = eventId) + .head.eventId + + intercept[TimeoutException] { + eventWatch.await[OrderAttached](_.key == aOrderId, after = eventId, timeout = 500.ms) + } + + eventId = suspendWorkflow(aWorkflow.path, false, ItemRevision(4)) + + eventId = eventWatch + .await[OrderStdoutWritten](_ == aOrderId <-: OrderStdoutWritten("ASemaphoreJob\n"), after = eventId) + .head.eventId + + eventId = suspendWorkflow(aWorkflow.path, true, ItemRevision(5)) + assert(eventWatch.await[WorkflowControlAttached](after = eventId).map(_.value) == Seq( + aWorkflow.path <-: WorkflowControlAttached(aAgentPath, suspended = true, ItemRevision(5)))) + + ASemaphoreJob.continue() + intercept[TimeoutException] { + eventWatch.await[OrderProcessingStarted](_.key == aOrderId, after = eventId, timeout = 500.ms) + } + + eventId = suspendWorkflow(aWorkflow.path, false, ItemRevision(6)) + eventWatch.await[OrderFinished](_.key == aOrderId, after = eventId) + + controller.controllerState.await(99.s).pathToWorkflowControlState_(aWorkflow.path) == + WorkflowControlState( + WorkflowControl( + aWorkflow.path, + suspended = false, + revision = ItemRevision(6)), + attachedToAgents = Set(aAgentPath)) + + suspendWorkflow(aWorkflow.path, true, ItemRevision(7)) + controllerApi.stop.await(99.s) + } + + "After Controller recovery, the WorkflowControl is attached to the remaining Agents" in { + val bOrderId = OrderId("B") + bAgent = directoryProvider.startAgent(bAgentPath).await(99.s) + var agentEventId = bAgent.eventWatch.await[AgentReady]().last.eventId + + locally { + val eventWatch = controller.eventWatch + val eventId = eventWatch.lastAddedEventId + implicit val controllerApi = directoryProvider.newControllerApi(controller) + controllerApi.addOrder(FreshOrder(bOrderId, bWorkflow.path, deleteWhenTerminated = true)) + .await(99.s) + eventWatch + .await[OrderStdoutWritten](_ == bOrderId <-: OrderStdoutWritten("B1SemaphoreJob\n"), after = eventId) + + val terminated = bAgent.terminate().runToFuture + B1SemaphoreJob.continue() + terminated.await(99.s) + + suspendWorkflow(bWorkflow.path, false, ItemRevision(1)) + } + + controller.terminate().await(99.s) + + bAgent = directoryProvider.startAgent(bAgentPath).await(99.s) + bAgent.eventWatch.await[AgentReady](after = agentEventId) + agentEventId = bAgent.eventWatch + .await[OrderStdoutWritten](_ == bOrderId <-: OrderStdoutWritten("B2SemaphoreJob\n"), after = agentEventId) + .head.eventId + + controller = directoryProvider.startController().await(99.s) + implicit val controllerApi = directoryProvider.newControllerApi(controller) + val eventWatch = controller.eventWatch + val eventId = eventWatch.lastAddedEventId + + assert( + eventWatch.await[WorkflowControlAttached](after = eventId).map(_.value) == Seq( + bWorkflow.path <-: WorkflowControlAttached(bAgentPath, suspended = false, ItemRevision(1)))) + + suspendWorkflow(bWorkflow.path, false, ItemRevision(2)) + B2SemaphoreJob.continue() + eventWatch.await[OrderFinished](_.key == bOrderId) + + controller.controllerState.await(99.s).pathToWorkflowControlState_(bWorkflow.path) == + WorkflowControlState( + WorkflowControl(bWorkflow.path, suspended = true, ItemRevision(1)), + attachedToAgents = Set(bAgentPath)) + } + + "WorkflowControl disappears with the last Workflow version" in { + val controllerApi = directoryProvider.newControllerApi(controller) + val eventId = eventWatch.lastAddedEventId + + controllerApi + .updateItems(Observable( + AddVersion(VersionId("DELETE")), + RemoveVersioned(aWorkflow.path), + RemoveVersioned(bWorkflow.path))) + .await(99.s) + assert(eventWatch.await[ItemDetached](_.event.key == aWorkflow.id, after = eventId) + .head.value.event + == ItemDetached(aWorkflow.id, aAgentPath)) + assert(eventWatch.await[ItemDetached](_.event.key == bWorkflow.id, after = eventId) + .head.value.event + == ItemDetached(bWorkflow.id, bAgentPath)) + + assert(aAgent.currentAgentState().pathToWorkflowControlState.isEmpty) + assert(controller.controllerState.await(99.s).pathToWorkflowControlState.isEmpty) + } + + private def suspendWorkflow(workflowPath: WorkflowPath, suspend: Boolean, revision: ItemRevision) + (implicit controllerApi: ControllerApi) + : EventId = { + val eventId = eventWatch.lastAddedEventId + controllerApi + .executeCommand(ControllerCommand.ControlWorkflow(workflowPath, suspend = suspend)) + .await(99.s) + val keyedEvents = eventWatch.await[WorkflowControlUpdated](after = eventId) + assert(keyedEvents.map(_.value) == Seq( + workflowPath <-: WorkflowControlUpdated(suspend, revision))) + keyedEvents.last.eventId + } +} + +object SuspendWorkflowTest +{ + private val aAgentPath = AgentPath("A-AGENT") + private val bAgentPath = AgentPath("B-AGENT") + + private val aWorkflow = Workflow(WorkflowPath("A-WORKFLOW") ~ VersionId("INITIAL"), Seq( + Prompt(expr("'PROMPT'")), + ASemaphoreJob.execute(aAgentPath))) + + private val bWorkflow = Workflow(WorkflowPath("B-WORKFLOW") ~ VersionId("INITIAL"), Seq( + B1SemaphoreJob.execute(bAgentPath), + B2SemaphoreJob.execute(bAgentPath))) + + final class ASemaphoreJob extends SemaphoreJob(ASemaphoreJob) + object ASemaphoreJob extends SemaphoreJob.Companion[ASemaphoreJob] + + final class B1SemaphoreJob extends SemaphoreJob(B1SemaphoreJob) + object B1SemaphoreJob extends SemaphoreJob.Companion[B1SemaphoreJob] + + final class B2SemaphoreJob extends SemaphoreJob(B2SemaphoreJob) + object B2SemaphoreJob extends SemaphoreJob.Companion[B2SemaphoreJob] +}