Skip to content

Commit

Permalink
JS-2152 refactor SuspendResumeOrdersTest
Browse files Browse the repository at this point in the history
  • Loading branch information
Zschimmer committed Sep 25, 2024
1 parent c6a5b6d commit c90b6e0
Showing 1 changed file with 127 additions and 118 deletions.
245 changes: 127 additions & 118 deletions js7-tests/src/test/scala/js7/tests/order/SuspendResumeOrdersTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import java.nio.file.Files.{createTempFile, deleteIfExists}
import js7.base.catsutils.UnsafeMemoizable.unsafeMemoize
import js7.base.configutils.Configs.HoconStringInterpolator
import js7.base.io.file.FileUtils.touchFile
import js7.base.io.process.ProcessSignal.{SIGKILL, SIGTERM}
import js7.base.log.{CorrelId, CorrelIdWrapped, Logger}
import js7.base.problem.Checked.Ops
import js7.base.problem.Problem
Expand All @@ -20,13 +19,13 @@ import js7.data.Problems.UnknownOrderProblem
import js7.data.agent.AgentPath
import js7.data.agent.AgentRefStateEvent.AgentReady
import js7.data.command.{CancellationMode, SuspensionMode}
import js7.data.controller.ControllerCommand.{AddOrder, AnswerOrderPrompt, Batch, CancelOrders, Response, ResumeOrder, ResumeOrders, SuspendOrders}
import js7.data.controller.ControllerCommand.{AddOrder, AnswerOrderPrompt, Batch, CancelOrders, GoOrder, Response, ResumeOrder, ResumeOrders, SuspendOrders}
import js7.data.event.KeyedEvent
import js7.data.item.VersionId
import js7.data.job.{RelativePathExecutable, ShellScriptExecutable}
import js7.data.order.Order.DelayedAfterError
import js7.data.order.OrderEvent.OrderResumed.{AppendHistoricOutcome, DeleteHistoricOutcome, InsertHistoricOutcome, ReplaceHistoricOutcome}
import js7.data.order.OrderEvent.{OrderAdded, OrderAttachable, OrderAttached, OrderCancellationMarkedOnAgent, OrderCancelled, OrderCaught, OrderDeleted, OrderDetachable, OrderDetached, OrderFailed, OrderFailedInFork, OrderFinished, OrderForked, OrderJoined, OrderMoved, OrderOutcomeAdded, OrderProcessed, OrderProcessingKilled, OrderProcessingStarted, OrderPromptAnswered, OrderPrompted, OrderResumed, OrderResumptionMarked, OrderRetrying, OrderStarted, OrderStdWritten, OrderStdoutWritten, OrderSuspended, OrderSuspensionMarked, OrderSuspensionMarkedOnAgent, OrderTerminated}
import js7.data.order.OrderEvent.{OrderAdded, OrderAttachable, OrderAttached, OrderCancellationMarkedOnAgent, OrderCancelled, OrderCaught, OrderDeleted, OrderDetachable, OrderDetached, OrderFailed, OrderFailedInFork, OrderFinished, OrderForked, OrderGoMarked, OrderGoes, OrderJoined, OrderMoved, OrderOutcomeAdded, OrderProcessed, OrderProcessingKilled, OrderProcessingStarted, OrderPromptAnswered, OrderPrompted, OrderResumed, OrderResumptionMarked, OrderRetrying, OrderStarted, OrderStdWritten, OrderStdoutWritten, OrderSuspended, OrderSuspensionMarked, OrderSuspensionMarkedOnAgent, OrderTerminated}
import js7.data.order.{FreshOrder, HistoricOutcome, Order, OrderEvent, OrderId, OrderMark, OrderOutcome}
import js7.data.problems.{CannotResumeOrderProblem, CannotSuspendOrderProblem, UnreachableOrderPositionProblem}
import js7.data.value.expression.ExpressionParser.expr
Expand All @@ -39,7 +38,7 @@ import js7.data.workflow.position.{BranchId, Position}
import js7.data.workflow.{Workflow, WorkflowPath}
import js7.launcher.OrderProcess
import js7.launcher.internal.InternalJob
import js7.tests.jobs.{EmptyJob, FailingJob}
import js7.tests.jobs.{EmptyJob, FailingJob, SemaphoreJob}
import js7.tests.order.SuspendResumeOrdersTest.*
import js7.tests.testenv.DirectoryProvider.{toLocalSubagentId, waitingForFileScript}
import js7.tests.testenv.{BlockingItemUpdater, ControllerAgentForScalaTest}
Expand Down Expand Up @@ -75,130 +74,137 @@ final class SuspendResumeOrdersTest
finally
super.afterAll()

"Suspend and resume a fresh order" in:
deleteIfExists(triggerFile)
val order = FreshOrder(OrderId("🔺"), singleJobWorkflow.path, scheduledFor = Some(Timestamp.now + 2.s/*1s too short in rare cases*/))
addOrder(order).await(99.s).orThrow
eventWatch.await[OrderAttached](_.key == order.id)

executeCommand(SuspendOrders(Set(order.id))).await(99.s).orThrow
eventWatch.await[OrderSuspended](_.key == order.id)

assert(eventWatch.eventsByKey[OrderEvent](order.id) == Seq(
OrderAdded(singleJobWorkflow.id, order.arguments, order.scheduledFor),
OrderAttachable(agentPath),
OrderAttached(agentPath),
OrderSuspensionMarked(),
OrderDetachable,
OrderDetached,
OrderSuspended))
val lastEventId = eventWatch.lastAddedEventId

touchFile(triggerFile)
executeCommand(ResumeOrders(Set(order.id))).await(99.s).orThrow

// TODO Modify order start time here, when possible. Otherwise we wait until the scheduled start time
"Suspend and resume an Order while in State" - {
"Fresh" in:
val eventId = eventWatch.lastAddedEventId
val workflow = Workflow.of(WorkflowPath("FRESH"), EmptyJob.execute(agentPath))
withItem(workflow): workflow =>
val orderId = OrderId("FRESH")
val scheduledFor = Some(Timestamp.now + 100.s)
controller.api.addOrder:
FreshOrder(orderId, workflow.path, scheduledFor = scheduledFor, deleteWhenTerminated = true)
.await(99.s).orThrow
eventWatch.await[OrderAttached](_.key == orderId)

// ResumeOrders command expected a suspended or suspending order
assert(executeCommand(ResumeOrders(Set(order.id))).await(99.s) == Left(CannotResumeOrderProblem))
execCmd(SuspendOrders(Set(orderId)))
eventWatch.await[OrderSuspended](_.key == orderId)
execCmd(ResumeOrders(Set(orderId)))

eventWatch.await[OrderFinished](_.key == order.id)
assert(eventWatch.eventsByKey[OrderEvent](order.id, after = lastEventId) == Seq(
OrderResumed(),
OrderAttachable(agentPath),
OrderAttached(agentPath),
OrderStarted,
OrderProcessingStarted(subagentId),
OrderProcessed(OrderOutcome.succeededRC0),
OrderMoved(Position(1)),
OrderDetachable,
OrderDetached,
OrderFinished()))
// ResumeOrders command expects a suspended or suspending order
assert:
controller.api.executeCommand(ResumeOrders(Set(orderId))).await(99.s) == Left:
CannotResumeOrderProblem

"An order reaching end of workflow is suspendible" in:
val order = FreshOrder(OrderId("🔻"), singleJobWorkflow.path)
addOrder(order).await(99.s).orThrow
eventWatch.await[OrderProcessingStarted](_.key == order.id)

executeCommand(SuspendOrders(Set(order.id))).await(99.s).orThrow
eventWatch.await[OrderSuspensionMarkedOnAgent](_.key == order.id)
touchFile(triggerFile)
eventWatch.await[OrderSuspended](_.key == order.id)
assert(eventWatch.eventsByKey[OrderEvent](order.id).filterNot(_.isInstanceOf[OrderStdWritten]) == Seq(
OrderAdded(singleJobWorkflow.id, order.arguments, order.scheduledFor),
OrderAttachable(agentPath),
OrderAttached(agentPath),
OrderStarted,
OrderProcessingStarted(subagentId),
OrderSuspensionMarked(),
OrderSuspensionMarkedOnAgent,
OrderProcessed(OrderOutcome.succeededRC0),
OrderMoved(Position(1)),
OrderDetachable,
OrderDetached,
OrderSuspended))
execCmd(GoOrder(orderId, Position(0)))
eventWatch.await[OrderFinished](_.key == orderId)
assert(eventWatch.eventsByKey[OrderEvent](orderId, after = eventId) == Seq(
OrderAdded(workflow.id, scheduledFor = scheduledFor, deleteWhenTerminated = true),
OrderAttachable(agentPath),
OrderAttached(agentPath),
OrderSuspensionMarked(),
OrderDetachable,
OrderDetached,
OrderSuspended,

val lastEventId = eventWatch.lastAddedEventId
executeCommand(ResumeOrders(Set(order.id))).await(99.s).orThrow
eventWatch.await[OrderFinished](_.key == order.id)
OrderResumed(),
OrderAttachable(agentPath),
OrderAttached(agentPath),

assert(eventWatch.eventsByKey[OrderEvent](order.id, after = lastEventId) == Seq(
OrderResumed(),
OrderFinished()))
OrderGoMarked(Position(0)),
OrderGoes,
OrderStarted,
OrderProcessingStarted(subagentId),
OrderProcessed(OrderOutcome.succeeded),
OrderMoved(Position(1)),
OrderDetachable,
OrderDetached,
OrderFinished(),
OrderDeleted))

"Suspend with kill" in:
deleteIfExists(triggerFile)
val order = FreshOrder(OrderId("♣️"), singleJobWorkflow.path)
addOrder(order).await(99.s).orThrow
eventWatch.await[OrderProcessingStarted](_.key == order.id)
sleep(200.ms) // Wait until process has been started
"Processing, last instruction of workflow" in :
eventWatch.resetLastWatchedEventId()
val workflow = Workflow.of(WorkflowPath("PROCESSING"),
OurSemaphoreJob.execute(agentPath))
withItem(workflow): workflow =>
val order = FreshOrder(OrderId("PROCESSING"), workflow.path)
controller.api.addOrder(order).await(99.s).orThrow
eventWatch.awaitNext[OrderStdoutWritten](_.key == order.id)

execCmd(SuspendOrders(Set(order.id)))
eventWatch.awaitNext[OrderSuspensionMarkedOnAgent](_.key == order.id)
OurSemaphoreJob.continue()
eventWatch.awaitNext[OrderProcessed](_.key == order.id)
eventWatch.awaitNext[OrderSuspended](_.key == order.id)

executeCommand(ResumeOrders(Set(order.id))).await(99.s).orThrow
eventWatch.awaitNext[OrderFinished](_.key == order.id)

assert(eventWatch.eventsByKey[OrderEvent](order.id) == Seq(
OrderAdded(workflow.id, order.arguments, order.scheduledFor),
OrderAttachable(agentPath),
OrderAttached(agentPath),
OrderStarted,
OrderProcessingStarted(subagentId),
OrderStdoutWritten(OurSemaphoreJob.stdoutLine),
OrderSuspensionMarked(),
OrderSuspensionMarkedOnAgent,
OrderProcessed(OrderOutcome.succeeded),
OrderMoved(Position(1)),
OrderDetachable,
OrderDetached,
OrderSuspended,

executeCommand(SuspendOrders(Set(order.id), SuspensionMode(Some(CancellationMode.Kill()))))
.await(99.s).orThrow
eventWatch.await[OrderSuspended](_.key == order.id)
OrderResumed(),
OrderFinished()))

"Processing, kill the process" in:
eventWatch.resetLastWatchedEventId()
val workflow = Workflow.of(WorkflowPath("PROCESSING-KILL"),
OurSemaphoreJob.execute(agentPath))
withItem(workflow): workflow =>
val orderId = OrderId("PROCESSING-KILL")
addOrder(FreshOrder(orderId, workflow.path, deleteWhenTerminated = true))
.await(99.s).orThrow
eventWatch.awaitNext[OrderProcessingStarted](_.key == orderId)
eventWatch.awaitNext[OrderStdoutWritten](_.key == orderId)

val events = eventWatch.eventsByKey[OrderEvent](order.id)
.filterNot(_.isInstanceOf[OrderStdWritten])
.map:
case OrderProcessed(OrderOutcome.Killed(failed: OrderOutcome.Failed)) if failed.namedValues == NamedValues.rc(SIGKILL) =>
// Sometimes, SIGTERM does not work and SIGKILL be sent. Something wrong with the bash script ????
logger.error("SIGTERM did not work")
OrderProcessed(OrderOutcome.Killed(failed.copy(namedValues = NamedValues.rc(SIGTERM)))) // Repair, to let test succceed
case o => o

assert(events == Seq(
OrderAdded(singleJobWorkflow.id, order.arguments, order.scheduledFor),
OrderAttachable(agentPath),
OrderAttached(agentPath),
OrderStarted,
OrderProcessingStarted(subagentId),
OrderSuspensionMarked(SuspensionMode(Some(CancellationMode.Kill()))),
OrderSuspensionMarkedOnAgent,
OrderProcessed(OrderOutcome.Killed(
if isWindows then
OrderOutcome.Failed.rc(1)
else
OrderOutcome.Failed(NamedValues.rc(SIGTERM)))),
OrderProcessingKilled,
OrderDetachable,
OrderDetached,
OrderSuspended))
execCmd(SuspendOrders(Set(orderId), SuspensionMode.killImmediately))
eventWatch.awaitNext[OrderSuspended](_.key == orderId)

val lastEventId = eventWatch.lastAddedEventId
touchFile(triggerFile)
executeCommand(ResumeOrders(Set(order.id), asSucceeded = true)).await(99.s).orThrow
eventWatch.await[OrderTerminated](_.key == order.id)
OurSemaphoreJob.continue()
execCmd(ResumeOrders(Set(orderId), asSucceeded = true/*job starts again*/))
eventWatch.awaitNext[OrderTerminated](_.key == orderId)

assert(eventWatch.eventsByKey[OrderEvent](order.id, after = lastEventId) == Seq(
OrderResumed(asSucceeded = true),
OrderAttachable(agentPath),
OrderAttached(agentPath),
OrderProcessingStarted(subagentId),
OrderProcessed(OrderOutcome.succeededRC0),
OrderMoved(Position(1)),
OrderDetachable,
OrderDetached,
OrderFinished()))
assert(eventWatch.eventsByKey[OrderEvent](orderId) == Seq(
OrderAdded(workflow.id, deleteWhenTerminated = true),
OrderAttachable(agentPath),
OrderAttached(agentPath),
OrderStarted,
OrderProcessingStarted(subagentId),
OrderStdoutWritten(OurSemaphoreJob.stdoutLine),
OrderSuspensionMarked(SuspensionMode.killImmediately),
OrderSuspensionMarkedOnAgent,
OrderProcessed(OrderOutcome.Killed(
if isWindows then
OrderOutcome.Failed.rc(1)
else
OrderOutcome.Failed(Some("Canceled")))),
OrderProcessingKilled,
OrderDetachable,
OrderDetached,
OrderSuspended,
OrderResumed(asSucceeded = true),
OrderAttachable(agentPath),
OrderAttached(agentPath),
OrderProcessingStarted(subagentId),
OrderStdoutWritten(OurSemaphoreJob.stdoutLine),
OrderProcessed(OrderOutcome.succeeded),
OrderMoved(Position(1)),
OrderDetachable,
OrderDetached,
OrderFinished(),
OrderDeleted))
}

"Suspend with kill, trapped with exit 0" in:
if !isUnix then
Expand Down Expand Up @@ -970,6 +976,9 @@ object SuspendResumeOrdersTest:
EmptyJob.execute(agentPath),
Fail())

private class OurSemaphoreJob extends SemaphoreJob(OurSemaphoreJob)
private object OurSemaphoreJob extends SemaphoreJob.Companion[OurSemaphoreJob]

final class FailingSemaJob extends InternalJob:
def toOrderProcess(step: Step) =
OrderProcess.cancelable:
Expand Down

0 comments on commit c90b6e0

Please sign in to comment.