Skip to content

Commit

Permalink
JS-2157 FIX: OrderLocksReleased in stopOnFailure does not wake waitin…
Browse files Browse the repository at this point in the history
…g orders
  • Loading branch information
Zschimmer committed Aug 21, 2024
1 parent 319c936 commit 661a05d
Show file tree
Hide file tree
Showing 3 changed files with 232 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import js7.data.item.{InventoryItem, InventoryItemEvent, InventoryItemKey, Simpl
import js7.data.job.JobResource
import js7.data.lock.LockState
import js7.data.order.Order.State
import js7.data.order.OrderEvent.{OrderAdded, OrderAwoke, OrderBroken, OrderCoreEvent, OrderDeleted, OrderDetached, OrderForked, OrderLockEvent, OrderMoved, OrderOrderAdded, OrderProcessed, OrderTransferred}
import js7.data.order.OrderEvent.{OrderAdded, OrderAwoke, OrderBroken, OrderCoreEvent, OrderDeleted, OrderDetached, OrderForked, OrderLocksReleased, OrderMoved, OrderOrderAdded, OrderProcessed, OrderTransferred}
import js7.data.order.{FreshOrder, Order, OrderEvent, OrderId, OrderOutcome}
import js7.data.orderwatch.ExternalOrderKey
import js7.data.subagent.SubagentItemState
Expand Down Expand Up @@ -204,6 +204,7 @@ final case class ControllerStateExecutor private(

case KeyedEvent(orderId: OrderId, event: OrderEvent) =>
touchedOrderIds += orderId
touchedOrderIds ++= keyedEventToPendingOrderIds(orderId <-: event)
event match
case OrderDeleted | _: OrderTransferred =>
detachWorkflowCandidates += previous.idToOrder(orderId).workflowId
Expand Down Expand Up @@ -552,7 +553,7 @@ final case class ControllerStateExecutor private(
case OrderLocksReleased(lockPaths) =>
lockPaths.view
.flatMap(controllerState.keyTo(LockState).get)
.flatMap(_.firstQueuedOrderId)
.flatMap(_.queue)

case OrderForked(children) =>
children.view.map(_.orderId)
Expand Down
3 changes: 0 additions & 3 deletions js7-data/shared/src/main/scala/js7/data/lock/LockState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ extends UnsignedSimpleItemState, Big/*acquired and queue get big, many orders*/:
def release(orderId: OrderId): Checked[LockState] =
toLockState(acquired.release(orderId))

def firstQueuedOrderId: Option[OrderId] =
queue.headOption

private def checkLimit(count: Option[Int]): Either[LockRefusal, Unit] =
count match
case None =>
Expand Down
254 changes: 229 additions & 25 deletions js7-tests/src/test/scala/js7/tests/LockTest.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package js7.tests

import fs2.Stream
import java.nio.file.Files.delete
import js7.base.configutils.Configs.*
import js7.base.io.file.FileUtils.{touchFile, withTemporaryFile}
Expand All @@ -26,16 +27,14 @@ import js7.data.value.StringValue
import js7.data.value.ValuePrinter.quoteString
import js7.data.value.expression.Expression.{NumericConstant, StringConstant}
import js7.data.value.expression.ExpressionParser.expr
import js7.data.workflow.instructions.{Fail, Finish, Fork, LockInstruction, Prompt, Retry, TryInstruction}
import js7.data.workflow.instructions.{Fail, Finish, Fork, LockInstruction, Options, Prompt, Retry, TryInstruction}
import js7.data.workflow.position.BranchPath.syntax.*
import js7.data.workflow.position.{BranchId, Position}
import js7.data.workflow.{Workflow, WorkflowId, WorkflowParser, WorkflowPath}
import js7.tests.LockTest.*
import js7.tests.jobs.{EmptyJob, FailingJob, SemaphoreJob, SleepJob}
import js7.tests.testenv.DirectoryProvider.{toLocalSubagentId, waitingForFileScript}
import js7.tests.testenv.{BlockingItemUpdater, ControllerAgentForScalaTest}
import cats.effect.unsafe.IORuntime
import fs2.Stream
import scala.collection.immutable.Queue
import scala.util.Random

Expand Down Expand Up @@ -151,49 +150,60 @@ final class LockTest extends OurTestSuite, ControllerAgentForScalaTest, Blocking

"After releasing a lock of 2, two orders with count=1 each start simultaneously" in:
val workflow1 = updateItem(Workflow(
WorkflowPath("FINISH-IN-FORK"),
Seq(
WorkflowPath("RELEASE-2"),
Seq:
LockInstruction.single(
limit2LockPath,
count = Some(1),
lockedWorkflow = Workflow.of(
SleepJob.execute(agentPath, processLimit = 99, arguments = Map(
"sleep" -> expr("0.050"))))))))
lockedWorkflow = Workflow.of:
Prompt(expr("'PROMPT'")))))

val workflow2 = updateItem(Workflow(
WorkflowPath("WORKFLOW-2"),
Seq(
Seq:
LockInstruction.single(
limit2LockPath,
count = Some(2),
lockedWorkflow = Workflow.of(
SleepJob.execute(agentPath, processLimit = 99, arguments = Map(
"sleep" -> expr("0.100"))))))))
lockedWorkflow = Workflow.of:
Prompt(expr("'PROMPT'")))))

val order2Id = OrderId("🟥-TWO")
val aOrderId = OrderId("🟥-A")
val bOrderId = OrderId("🟥-B")
val cOrderId = OrderId("🟥-C")
controller.api.addOrder(FreshOrder(order2Id, workflow2.path, deleteWhenTerminated = true))
.await(99.s).orThrow
controller.eventWatch.await[OrderLocksAcquired](_.key == order2Id)
controller.api.addOrder(FreshOrder(aOrderId, workflow1.path, deleteWhenTerminated = true))
.await(99.s).orThrow
controller.api.addOrder(FreshOrder(bOrderId, workflow1.path, deleteWhenTerminated = true))
.await(99.s).orThrow
controller.eventWatch.awaitNext[OrderLocksAcquired](_.key == order2Id)

for orderId <- Seq(aOrderId, bOrderId, cOrderId) do
controller.api.addOrder(FreshOrder(orderId, workflow1.path, deleteWhenTerminated = true))
.await(99.s).orThrow
controller.eventWatch.awaitNext[OrderLocksQueued](_.key == orderId)

controller.api.executeCommand(AnswerOrderPrompt(order2Id)).await(99.s).orThrow
controller.eventWatch.awaitNext[OrderLocksReleased](_.key == order2Id)

controller.eventWatch.await[OrderLocksAcquired](_.key == aOrderId)
controller.eventWatch.await[OrderLocksAcquired](_.key == bOrderId)
controller.eventWatch.await[OrderPrompted](_.key == aOrderId)
controller.eventWatch.await[OrderPrompted](_.key == bOrderId)
controller.api.executeCommand(AnswerOrderPrompt(aOrderId)).await(99.s).orThrow
controller.api.executeCommand(AnswerOrderPrompt(bOrderId)).await(99.s).orThrow

controller.eventWatch.awaitNext[OrderLocksAcquired](_.key == cOrderId)
controller.api.executeCommand(AnswerOrderPrompt(cOrderId)).await(99.s).orThrow

controller.eventWatch.await[OrderTerminated](_.key == aOrderId)
controller.eventWatch.await[OrderTerminated](_.key == bOrderId)
val stampedEvents = controller.eventWatch.allStamped[OrderLockEvent]
val aAquired = stampedEvents.find(stamped => stamped.value.key == aOrderId && stamped.value.event.isInstanceOf[OrderLocksAcquired]).get
val bAquired = stampedEvents.find(stamped => stamped.value.key == bOrderId && stamped.value.event.isInstanceOf[OrderLocksAcquired]).get
val aReleased = stampedEvents.find(stamped => stamped.value.key == aOrderId && stamped.value.event.isInstanceOf[OrderLocksReleased]).get
val bReleased = stampedEvents.find(stamped => stamped.value.key == aOrderId && stamped.value.event.isInstanceOf[OrderLocksReleased]).get
assert(aAquired.eventId < bReleased.eventId, "- a acquired lock after b released")
assert(bAquired.eventId < aReleased.eventId, "- b acquired lock after a released")
controller.eventWatch.await[OrderTerminated](_.key == cOrderId)

assert(controllerState.keyTo(LockState)(limit2LockPath) ==
LockState(
Lock(limit2LockPath, limit = 2, itemRevision = Some(ItemRevision(0))),
Available,
Queue.empty))

deleteItems(workflow1.path, workflow2.path)

"Multiple orders with count=1 and count=2 finish" in:
val workflow1 = updateItem(Workflow.of(workflow1Path,
Expand Down Expand Up @@ -233,7 +243,7 @@ final class LockTest extends OurTestSuite, ControllerAgentForScalaTest, Blocking

"Failed order" in:
val workflow = updateItem(Workflow(
WorkflowPath("FINISH-IN-FORK"),
WorkflowPath("FAILED-ORDER"),
Seq(
EmptyJob.execute(agentPath), // Complicate with a Lock at non-first position
LockInstruction.single(
Expand Down Expand Up @@ -292,6 +302,8 @@ final class LockTest extends OurTestSuite, ControllerAgentForScalaTest, Blocking
Lock(lockPath, itemRevision = Some(ItemRevision(0))),
Available, Queue.empty))

deleteItems(workflow.path)

"Failed order in try/catch" in:
val workflow = defineWorkflow(workflowNotation = """
define workflow {
Expand Down Expand Up @@ -980,6 +992,198 @@ final class LockTest extends OurTestSuite, ControllerAgentForScalaTest, Blocking
eventWatch.await[OrderDeleted](_.key == orderId)
}

"OrderLockReleased wakes waiting orders (JS-2157)" - {
"After OrderLocksReleased continue waiting Order" in:
val workflow = Workflow(
WorkflowPath("LOCK-CONTINUE-QUEUED"),
Seq:
LockInstruction.single(lockPath,
count = None,
lockedWorkflow = Workflow.of(
Prompt(expr("'PROMPT'")))))

withTemporaryItem(workflow, awaitDeletion = true): workflow =>
val eventId = eventWatch.lastAddedEventId

val aOrderId = OrderId("LOCK-CONTINUE-QUEUED-A")
controller.api
.addOrder:
FreshOrder(aOrderId, workflow.path, deleteWhenTerminated = true)
.await(99.s).orThrow

eventWatch.awaitNext[OrderLocksAcquired](_.key == aOrderId)

val bOrderId = OrderId("LOCK-CONTINUE-QUEUED-B")
controller.api
.addOrder:
FreshOrder(bOrderId, workflow.path, deleteWhenTerminated = true)
.await(99.s).orThrow

eventWatch.awaitNext[OrderLocksQueued](_.key == bOrderId)

controller.api.executeCommand(AnswerOrderPrompt(aOrderId)).await(99.s).orThrow
eventWatch.awaitNext[OrderLocksReleased](_.key == aOrderId)
eventWatch.await[OrderTerminated](_.key == aOrderId, after = eventId)

locally:
val keyedEvents = eventWatch.keyedEvents[OrderEvent](_.key == aOrderId, after = eventId)
assert(keyedEvents.map(_.event) == Seq(
OrderAdded(workflow.id, deleteWhenTerminated = true),
OrderStarted,
OrderLocksAcquired(List(LockDemand(lockPath))),
OrderPrompted(StringValue("PROMPT")),
OrderPromptAnswered(),
OrderMoved(Position(0) / "lock" % 1),
OrderLocksReleased(List(lockPath)),
OrderFinished(),
OrderDeleted))

eventWatch.awaitNext[OrderLocksAcquired](_.key == bOrderId)
controller.api.executeCommand(AnswerOrderPrompt(bOrderId)).await(99.s).orThrow
eventWatch.awaitNext[OrderTerminated](_.key == bOrderId)

locally:
val keyedEvents = eventWatch.keyedEvents[OrderEvent](_.key == bOrderId, after = eventId)
assert(keyedEvents.map(_.event) == Seq(
OrderAdded(workflow.id, deleteWhenTerminated = true),
OrderStarted,
OrderLocksQueued(List(LockDemand(lockPath))),
OrderLocksAcquired(List(LockDemand(lockPath))),
OrderPrompted(StringValue("PROMPT")),
OrderPromptAnswered(),
OrderMoved(Position(0) / "lock" % 1),
OrderLocksReleased(List(lockPath)),
OrderFinished(),
OrderDeleted))

"After OrderLocksReleased due to cancellation in Prompt continue waiting Order" in :
val workflow = Workflow(
WorkflowPath("LOCK-CONTINUE-QUEUED"),
Seq:
LockInstruction.single(lockPath,
count = None,
lockedWorkflow = Workflow.of(
Prompt(expr("'PROMPT'")),
Fail())))

withTemporaryItem(workflow, awaitDeletion = true): workflow =>
val eventId = eventWatch.lastAddedEventId

val aOrderId = OrderId("LOCK-CONTINUE-QUEUED-A")
controller.api
.addOrder:
FreshOrder(aOrderId, workflow.path, deleteWhenTerminated = true)
.await(99.s).orThrow

eventWatch.awaitNext[OrderLocksAcquired](_.key == aOrderId)

val bOrderId = OrderId("LOCK-CONTINUE-QUEUED-B")
controller.api
.addOrder:
FreshOrder(bOrderId, workflow.path, deleteWhenTerminated = true)
.await(99.s).orThrow

eventWatch.awaitNext[OrderLocksQueued](_.key == bOrderId)

controller.api.executeCommand(CancelOrders(Seq(aOrderId))).await(99.s).orThrow
eventWatch.awaitNext[OrderLocksReleased](_.key == aOrderId)

locally:
val keyedEvents = eventWatch.keyedEvents[OrderEvent](_.key == aOrderId, after = eventId)
assert(keyedEvents.map(_.event) == Seq(
OrderAdded(workflow.id, deleteWhenTerminated = true),
OrderStarted,
OrderLocksAcquired(List(LockDemand(lockPath))),
OrderPrompted(StringValue("PROMPT")),
OrderOperationCancelled,
OrderLocksReleased(List(lockPath)),
OrderCancelled,
OrderDeleted))

eventWatch.awaitNext[OrderLocksAcquired](_.key == bOrderId)
controller.api.executeCommand(CancelOrders(Seq(bOrderId))).await(99.s).orThrow

eventWatch.await[OrderTerminated](_.key == aOrderId, after = eventId)
eventWatch.await[OrderTerminated](_.key == bOrderId, after = eventId)

locally:
val keyedEvents = eventWatch.keyedEvents[OrderEvent](_.key == bOrderId, after = eventId)
assert(keyedEvents.map(_.event) == Seq(
OrderAdded(workflow.id, deleteWhenTerminated = true),
OrderStarted,
OrderLocksQueued(List(LockDemand(lockPath))),
OrderLocksAcquired(List(LockDemand(lockPath))),
OrderPrompted(StringValue("PROMPT")),
OrderOperationCancelled,
OrderLocksReleased(List(lockPath)),
OrderCancelled,
OrderDeleted))

"After OrderLocksReleased in stopOnFailure-block continue waiting Order" in:
val workflow = Workflow(
WorkflowPath("STOP-ON-FAILURE-LOCK"),
Seq:
Options(stopOnFailure = true):
LockInstruction.single(lockPath,
count = None,
lockedWorkflow = Workflow.of(
Fail())))

withTemporaryItem(workflow, awaitDeletion = true): workflow =>
val eventId = eventWatch.lastAddedEventId

val aOrderId = OrderId("STOP-ON-FAILURE-LOCK-A")
controller.api
.addOrder:
FreshOrder(aOrderId, workflow.path, deleteWhenTerminated = true)
.await(99.s).orThrow

eventWatch.awaitNext[OrderLocksAcquired](_.key == aOrderId)

val bOrderId = OrderId("STOP-ON-FAILURE-LOCK-B")
controller.api
.addOrder:
FreshOrder(bOrderId, workflow.path, deleteWhenTerminated = true)
.await(99.s).orThrow

eventWatch.awaitNext[OrderLocksQueued](_.key == bOrderId)

controller.api.executeCommand(CancelOrders(Seq(aOrderId))).await(99.s).orThrow
eventWatch.awaitNext[OrderTerminated](_.key == aOrderId)

locally:
val keyedEvents = eventWatch.keyedEvents[OrderEvent](_.key == aOrderId, after = eventId)
assert(keyedEvents.map(_.event) == Seq(
OrderAdded(workflow.id, deleteWhenTerminated = true),
OrderMoved(Position(0) / "options" % 0),
OrderStarted,
OrderLocksAcquired(List(LockDemand(lockPath))),
OrderOutcomeAdded(OrderOutcome.failed),
OrderStopped,
OrderLocksReleased(List(lockPath)),
OrderCancelled,
OrderDeleted))

eventWatch.awaitNext[OrderLocksAcquired](_.key == bOrderId)
eventWatch.awaitNext[OrderStopped](_.key == bOrderId)
controller.api.executeCommand(CancelOrders(Seq(bOrderId))).await(99.s).orThrow
eventWatch.awaitNext[OrderTerminated](_.key == bOrderId)

locally:
val keyedEvents = eventWatch.keyedEvents[OrderEvent](_.key == bOrderId, after = eventId)
assert(keyedEvents.map(_.event) == Seq(
OrderAdded(workflow.id, deleteWhenTerminated = true),
OrderMoved(Position(0) / "options" % 0),
OrderStarted,
OrderLocksQueued(List(LockDemand(lockPath))),
OrderLocksAcquired(List(LockDemand(lockPath))),
OrderOutcomeAdded(OrderOutcome.failed),
OrderStopped,
OrderLocksReleased(List(lockPath)),
OrderCancelled,
OrderDeleted))
}

"Lock is not deletable while in use by a Workflow" in:
val workflow = defineWorkflow(workflowNotation = """
define workflow {
Expand Down

0 comments on commit 661a05d

Please sign in to comment.