Skip to content

Commit

Permalink
Merge branch 'refs/heads/release/2.6'
Browse files Browse the repository at this point in the history
# Conflicts:
#	js7-tests/src/test/scala/js7/tests/order/SuspendResumeOrdersTest.scala
  • Loading branch information
Zschimmer committed Apr 5, 2024
2 parents fba796e + afb1122 commit 58095ca
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 3 deletions.
10 changes: 10 additions & 0 deletions js7-data/shared/src/main/scala/js7/data/order/Order.scala
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ final case class Order[+S <: Order.State](
copy(
state = if isState[Fresh] then FailedWhileFresh else Failed,
workflowPosition = workflowPosition.copy(position = movedTo),
mark = mark match {
case Some(_: OrderMark.Suspending) => None
case o => o
},
historicOutcomes = outcome_.fold(historicOutcomes)(o => historicOutcomes :+ HistoricOutcome(position, o))))

case OrderFailedInFork(movedTo, outcome) =>
Expand All @@ -151,6 +155,12 @@ final case class Order[+S <: Order.State](
copy(
state = FailedInFork,
workflowPosition = workflowPosition.copy(position = movedTo),
//Same as for OrderFailed? How to suspend a failed child order???
// See SuspendResumeOrdersTest
// mark = mark match {
// case Some(_: OrderMark.Suspending) => None
// case o => o
// },
isResumed = false,
historicOutcomes =
outcome.fold(historicOutcomes)(o => historicOutcomes :+ HistoricOutcome(position, o))))
Expand Down
200 changes: 197 additions & 3 deletions js7-tests/src/test/scala/js7/tests/order/SuspendResumeOrdersTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import js7.data.Problems.UnknownOrderProblem
import js7.data.agent.AgentPath
import js7.data.agent.AgentRefStateEvent.AgentReady
import js7.data.command.{CancellationMode, SuspensionMode}
import js7.data.controller.ControllerCommand.{AnswerOrderPrompt, Batch, CancelOrders, Response, ResumeOrder, ResumeOrders, SuspendOrders}
import js7.data.controller.ControllerCommand.{AddOrder, AnswerOrderPrompt, Batch, CancelOrders, Response, ResumeOrder, ResumeOrders, SuspendOrders}
import js7.data.item.VersionId
import js7.data.job.RelativePathExecutable
import js7.data.order.OrderEvent.OrderResumed.{AppendHistoricOutcome, DeleteHistoricOutcome, InsertHistoricOutcome, ReplaceHistoricOutcome}
import js7.data.order.OrderEvent.{OrderAdded, OrderAttachable, OrderAttached, OrderCancellationMarkedOnAgent, OrderCancelled, OrderCaught, OrderDeleted, OrderDetachable, OrderDetached, OrderFailed, OrderFinished, OrderForked, OrderJoined, OrderMoved, OrderOutcomeAdded, OrderProcessed, OrderProcessingKilled, OrderProcessingStarted, OrderPromptAnswered, OrderPrompted, OrderResumed, OrderResumptionMarked, OrderRetrying, OrderStarted, OrderStdWritten, OrderSuspended, OrderSuspensionMarked, OrderSuspensionMarkedOnAgent, OrderTerminated}
import js7.data.order.OrderEvent.{OrderAdded, OrderAttachable, OrderAttached, OrderCancellationMarkedOnAgent, OrderCancelled, OrderCaught, OrderDeleted, OrderDetachable, OrderDetached, OrderFailed, OrderFailedInFork, OrderFinished, OrderForked, OrderJoined, OrderMoved, OrderOutcomeAdded, OrderProcessed, OrderProcessingKilled, OrderProcessingStarted, OrderPromptAnswered, OrderPrompted, OrderResumed, OrderResumptionMarked, OrderRetrying, OrderStarted, OrderStdWritten, OrderStdoutWritten, OrderSuspended, OrderSuspensionMarked, OrderSuspensionMarkedOnAgent, OrderTerminated}
import js7.data.order.{FreshOrder, HistoricOutcome, Order, OrderEvent, OrderId, Outcome}
import js7.data.problems.{CannotResumeOrderProblem, CannotSuspendOrderProblem, UnreachableOrderPositionProblem}
import js7.data.value.expression.ExpressionParser.expr
Expand All @@ -30,7 +30,10 @@ import js7.data.workflow.instructions.{EmptyInstruction, Execute, Fail, Fork, Pr
import js7.data.workflow.position.BranchId.{Try_, catch_, try_}
import js7.data.workflow.position.BranchPath.syntax.*
import js7.data.workflow.position.Position
import js7.data.workflow.position.{BranchId, Position}
import js7.data.workflow.{Workflow, WorkflowPath}
import js7.launcher.OrderProcess
import js7.launcher.internal.InternalJob
import js7.tests.jobs.EmptyJob
import js7.tests.order.SuspendResumeOrdersTest.*
import js7.tests.testenv.DirectoryProvider.{toLocalSubagentId, waitingForFileScript}
Expand Down Expand Up @@ -65,7 +68,7 @@ final class SuspendResumeOrdersTest
}

override def afterAll() =
try
try
deleteIfExists(triggerFile)
finally
super.afterAll()
Expand Down Expand Up @@ -657,6 +660,185 @@ final class SuspendResumeOrdersTest

// Test moved to RetryTest:
// "FIX JS-2089 Cancel an Order waiting in Retry instruction at an Agent"

"Suspend a processing Order that will fail" in {
// The failed order will not be suspended, because it failed.
val workflow = Workflow.of(WorkflowPath("FAIL"),
FailingSemaJob.execute(agentPath),
EmptyInstruction())
withTemporaryItem(workflow) { workflow =>
val orderId = OrderId("SUSPEND-FAILING-JOB")
var eventId = eventWatch.lastAddedEventId
controller.api
.executeCommand(AddOrder(
FreshOrder(orderId, workflow.path, deleteWhenTerminated = true)))
.await(99.s).orThrow
eventWatch.await[OrderStdoutWritten](_.key == orderId, after = eventId)

controller.api.executeCommand(SuspendOrders(Seq(orderId))).await(99.s).orThrow
eventWatch.await[OrderSuspensionMarkedOnAgent](_.key == orderId, after = eventId)

FailingSemaJob.semaphore.flatMap(_.release).await(99.s)
eventWatch.await[OrderFailed](_.key == orderId, after = eventId)

// OrderFailed event has reset OrderMark.Suspending
assert(controllerState.idToOrder(orderId).mark == None)

eventId = eventWatch.lastAddedEventId
controller.api
.executeCommand(
ResumeOrder(orderId, Some(Position(1)), asSucceeded = true))
.await(99.s).orThrow
eventWatch.await[OrderTerminated](_.key == orderId, after = eventId)

assert(eventWatch.eventsByKey[OrderEvent](orderId) == Seq(
OrderAdded(workflow.id, deleteWhenTerminated = true),
OrderAttachable(agentPath),
OrderAttached(agentPath),
OrderStarted,
OrderProcessingStarted(subagentId),
OrderStdoutWritten("FailingJob\n"),
OrderSuspensionMarked(),
OrderSuspensionMarkedOnAgent,
OrderProcessed(Outcome.failed),
OrderDetachable,
OrderDetached,
OrderFailed(Position(0)),
OrderResumed(Some(Position(1)), asSucceeded = true),
OrderMoved(Position(2)),
OrderFinished(),
OrderDeleted))
}
}

"Suspend a forked processing Order that will fail" in {
val workflow = Workflow.of(WorkflowPath("FAIL-IN-FORK"),
Fork(
Vector(
"BRANCH" -> Workflow.of(
FailingSemaJob.execute(agentPath),
EmptyInstruction())),
joinIfFailed = false))

withTemporaryItem(workflow) { workflow =>
val orderId = OrderId("SUSPEND-FORKED-FAILING-JOB")
val childOrderId = OrderId("SUSPEND-FORKED-FAILING-JOB|BRANCH")
var eventId = eventWatch.lastAddedEventId
controller.api
.executeCommand(AddOrder(
FreshOrder(orderId, workflow.path, deleteWhenTerminated = true)))
.await(99.s).orThrow
eventWatch.await[OrderStdoutWritten](_.key == childOrderId, after = eventId)

controller.api.executeCommand(SuspendOrders(Seq(childOrderId))).await(99.s).orThrow
eventWatch.await[OrderSuspensionMarkedOnAgent](_.key == childOrderId, after = eventId)

FailingSemaJob.semaphore.flatMap(_.release).await(99.s)
eventWatch.await[OrderFailed](_.key == childOrderId, after = eventId)

// OrderFailed event has reset OrderMark.Suspending
assert(controllerState.idToOrder(childOrderId).mark == None)

eventId = eventWatch.lastAddedEventId
controller.api
.executeCommand(
ResumeOrder(childOrderId, Some(Position(0) / BranchId.fork("BRANCH") % 1),
asSucceeded = true))
.await(99.s).orThrow
eventWatch.await[OrderTerminated](_.key == orderId, after = eventId)

assert(eventWatch.eventsByKey[OrderEvent](orderId) == Seq(
OrderAdded(workflow.id, deleteWhenTerminated = true),
OrderStarted,
OrderForked(Vector(
"BRANCH" -> childOrderId)),
OrderJoined(Outcome.succeeded),
OrderMoved(Position(1)),
OrderFinished(),
OrderDeleted))

assert(eventWatch.eventsByKey[OrderEvent](childOrderId) == Seq(
OrderAttachable(agentPath),
OrderAttached(agentPath),
OrderProcessingStarted(subagentId),
OrderStdoutWritten("FailingJob\n"),
OrderSuspensionMarked(),
OrderSuspensionMarkedOnAgent,
OrderProcessed(Outcome.failed),
OrderDetachable,
OrderDetached,
OrderFailed(Position(0) / BranchId.fork("BRANCH") % 0),
OrderResumed(Some(Position(0) / BranchId.fork("BRANCH") % 1), asSucceeded = true),
OrderMoved(Position(0) / BranchId.fork("BRANCH") % 2)))
}
}

"Suspend a forked processing Order that will fail, joinIfFailed = true" in {
val workflow = Workflow.of(WorkflowPath("FAIL-IN-FORK-JOIN-IF-FAILED"),
Fork(
Vector(
"BRANCH" -> Workflow.of(
FailingSemaJob.execute(agentPath),
EmptyInstruction())),
joinIfFailed = true))

withTemporaryItem(workflow) { workflow =>
val orderId = OrderId("SUSPEND-FORKED-FAILING-JOB-JOIN-IF-FAILED")
val childOrderId = OrderId("SUSPEND-FORKED-FAILING-JOB-JOIN-IF-FAILED|BRANCH")
var eventId = eventWatch.lastAddedEventId
controller.api
.executeCommand(AddOrder(
FreshOrder(orderId, workflow.path, deleteWhenTerminated = true)))
.await(99.s).orThrow
eventWatch.await[OrderStdoutWritten](_.key == childOrderId, after = eventId)

controller.api.executeCommand(SuspendOrders(Seq(childOrderId))).await(99.s).orThrow
eventWatch.await[OrderSuspensionMarkedOnAgent](_.key == childOrderId, after = eventId)

FailingSemaJob.semaphore.flatMap(_.release).await(99.s)
eventWatch.await[OrderFailedInFork](_.key == childOrderId, after = eventId)

if (true) {
eventWatch.await[OrderTerminated](_.key == orderId, after = eventId)
pending // TODO Allow to suspend a failing child order before join?
} else {
// OrderFailed event has reset OrderMark.Suspending
assert(controllerState.idToOrder(childOrderId).mark == None)

eventId = eventWatch.lastAddedEventId
controller.api
.executeCommand(
ResumeOrder(childOrderId, Some(Position(0) / BranchId.fork("BRANCH") % 1),
asSucceeded = true))
.await(99.s).orThrow
eventWatch.await[OrderTerminated](_.key == orderId, after = eventId)

assert(eventWatch.eventsByKey[OrderEvent](orderId) == Seq(
OrderAdded(workflow.id, deleteWhenTerminated = true),
OrderStarted,
OrderForked(Vector(
"BRANCH" -> childOrderId)),
OrderJoined(Outcome.succeeded),
OrderMoved(Position(1)),
OrderFinished(),
OrderDeleted))

assert(eventWatch.eventsByKey[OrderEvent](childOrderId) == Seq(
OrderAttachable(agentPath),
OrderAttached(agentPath),
OrderProcessingStarted(subagentId),
OrderStdoutWritten("FailingJob\n"),
OrderSuspensionMarked(),
OrderSuspensionMarkedOnAgent,
OrderProcessed(Outcome.failed),
OrderDetachable,
OrderDetached,
OrderFailedInFork(Position(0) / BranchId.fork("BRANCH") % 0),
OrderResumed(Some(Position(0) / BranchId.fork("BRANCH") % 1), asSucceeded = true),
OrderMoved(Position(0) / BranchId.fork("BRANCH") % 2)))
}
}
}
}


Expand Down Expand Up @@ -700,4 +882,16 @@ object SuspendResumeOrdersTest
WorkflowPath("FAILING") ~ versionId,
EmptyJob.execute(agentPath),
Fail())

final class FailingSemaJob extends InternalJob {
def toOrderProcess(step: Step) =
OrderProcess(
step.outTaskObserver.send("FailingJob\n")
.*>(FailingSemaJob.semaphore)
.flatMap(_.acquire)
.as(Outcome.failed))
}
private object FailingSemaJob extends InternalJob.Companion[FailingSemaJob] {
val semaphore = Semaphore[Task](0).memoize
}
}

0 comments on commit 58095ca

Please sign in to comment.