Skip to content

Commit

Permalink
JS-2105 FIX: Don't crash when detaching a running order due to cancel…
Browse files Browse the repository at this point in the history
…lation or suspension
  • Loading branch information
Zschimmer committed Nov 3, 2023
1 parent 93b004f commit f7b1b78
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import js7.data.execution.workflow.instructions.{ExecuteAdmissionTimeSwitch, Ins
import js7.data.item.BasicItemEvent.{ItemAttachedToMe, ItemDetached, ItemDetachingFromMe, SignedItemAttachedToMe}
import js7.data.item.{InventoryItem, SignableItem, UnsignedItem}
import js7.data.job.{JobKey, JobResource}
import js7.data.order.Order.InapplicableOrderEventProblem
import js7.data.order.OrderEvent.{OrderAttachedToAgent, OrderCoreEvent, OrderDetached, OrderProcessed}
import js7.data.order.{Order, OrderEvent, OrderId}
import js7.data.orderwatch.{FileWatch, OrderWatchPath}
Expand Down Expand Up @@ -586,10 +587,11 @@ final class AgentOrderKeeper(
val promise = Promise[Unit]()
orderEntry.detachResponses ::= promise
(orderEntry.actor ? OrderActor.Command.HandleEvents(OrderDetached :: Nil, CorrelId.current))
.mapTo[Completed]
.mapTo[Checked[Completed]]
.onComplete {
case Failure(t) => promise.tryFailure(t)
case Success(Completed) =>
case Success(Left(problem)) => promise.tryFailure(problem.throwable)
case Success(Right(Completed)) =>
// Ignore this and instead await OrderActor termination and removal from orderRegister.
// Otherwise in case of a quick Controller restart, CoupleController would respond with this OrderId
// and the Controller will try again to DetachOrder, while the original DetachOrder is still in progress.
Expand All @@ -614,13 +616,37 @@ final class AgentOrderKeeper(
case Left(problem) => Future.failed(problem.throwable)
case Right(None) => Future.successful(Right(AgentCommand.Response.Accepted))
case Right(Some(events)) =>
val sender = this.sender()
// Several MarkOrder in sequence are not properly handled
// one after the other because execution is asynchronous.
// A second command may see the same not yet updated order.
// TODO Queue for each order? And no more OrderActor?
(orderEntry.actor ? OrderActor.Command.HandleEvents(events, CorrelId.current))
.mapTo[Completed]
.map(_ => Right(AgentCommand.Response.Accepted))
Task
.deferFuture(
(orderEntry.actor ? OrderActor.Command.HandleEvents(events, CorrelId.current))
.mapTo[Checked[Completed]])
.flatMap {
case Left(problem)
if problem.exists(_.isInstanceOf[InapplicableOrderEventProblem]) =>
Task.sleep(100.ms) // brake
.*>(Task.defer {
logger.warn(s"Repeating $cmd due to race condition: $problem")
val promise = Promise[Checked[Response]]()
self.!(Input.ExternalCommand(cmd, CorrelId.current, promise))(sender)
Task
.fromFuture(promise.future)
.map(_.map(_.asInstanceOf[Response.Accepted]))
})

case Left(problem) =>
// Should not happen. Controller does not handle the problem.
logger.warn(s"$cmd => $problem")
Task.left(problem)

case Right(Completed) =>
Task.right(AgentCommand.Response.Accepted)
}
.runToFuture
}
}

Expand Down Expand Up @@ -725,9 +751,10 @@ final class AgentOrderKeeper(
.parTraverse { case (orderId_ : OrderId, events) =>
Task
.fromFuture(
orderRegister(orderId_).actor ?
(orderRegister(orderId_).actor ?
OrderActor.Command.HandleEvents(events, CorrelId.current))
.attempt
.mapTo[Checked[Completed]])
.materializeIntoChecked
.map(orderId_ -> events -> _)
}
.runToFuture
Expand All @@ -736,9 +763,9 @@ final class AgentOrderKeeper(
// Then, two OrderMoved are emitted, because the second event is based on the same Order state.
// TODO Blocking! SLOW because inhibits parallelization
try Await.result(future, 99.s)
.collect { case ((orderId_, events), Left(throwable)) =>
.collect { case ((orderId_, events), Left(problem)) =>
logger.error(
s"$orderId_ <-: ${events.map(_.toShortString)} => ${throwable.toStringWithCauses}")
s"$orderId_ <-: ${events.map(_.toShortString)} => $problem")
}
catch { case NonFatal(t) => logger.error(
s"${keyedEvents.map(_.toShortString)} => ${t.toStringWithCauses}")
Expand Down
41 changes: 20 additions & 21 deletions js7-agent/src/main/scala/js7/agent/scheduler/order/OrderActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import js7.base.io.process.ProcessSignal
import js7.base.io.process.ProcessSignal.{SIGKILL, SIGTERM}
import js7.base.log.{CorrelId, Logger}
import js7.base.monixutils.MonixBase.syntax.RichCheckedTask
import js7.base.problem.Checked
import js7.base.problem.Checked.Ops
import js7.base.utils.Assertions.assertThat
import js7.base.utils.ScalaUtils.syntax.*
Expand Down Expand Up @@ -81,9 +82,9 @@ extends KeyedJournalingActor[AgentState, OrderEvent]
sender() ! Status.Failure(problem.throwable)

case Right(orderAttachedToAgent) =>
becomeAsStateOf(attachedOrder, force = true)
persist(orderAttachedToAgent) {
(event, updatedState) =>
becomeAsStateOf(attachedOrder, force = true)
update(event :: Nil)
Completed
} pipeTo sender()
Expand All @@ -110,10 +111,11 @@ extends KeyedJournalingActor[AgentState, OrderEvent]
receiveEvent orElse {
case Input.StartProcessing =>
orderCorrelId.bind[Unit] {
if (order.isProcessable) {
if (!order.isProcessable)
logger.warn("Input.StartProcessing but !order.isProcessable")
else {
// Separate CorrelId for each order process
CorrelId.bindNew {
become("processing")(processing)
subagentKeeper
.processOrder(
order.checkedState[Order.IsFreshOrReady].orThrow,
Expand Down Expand Up @@ -141,12 +143,7 @@ extends KeyedJournalingActor[AgentState, OrderEvent]
}

private def processing: Receive =
receiveCommand orElse receiveEvent orElse {
case Internal.UpdateEvents(events, correlId) =>
correlId.bind {
update(events)
}

receiveEvent orElse receiveCommand orElse {
case Input.Terminate(signal) =>
terminating = true
if (subagentKeeper.orderIsLocal(orderId)) {
Expand All @@ -167,24 +164,27 @@ extends KeyedJournalingActor[AgentState, OrderEvent]
receiveEvent orElse receiveCommand orElse receiveTerminate

private def receiveEvent: Receive = {
case Command.HandleEvents(events, correlId) =>
case Internal.UpdateEvents(events, correlId) =>
correlId.bind {
update(events)
}

case Command.HandleEvents(events, correlId) =>
correlId.bind[Unit] {
handleEvents(events) pipeTo sender()
}
}

private def handleEvents(events: Seq[OrderCoreEvent]): Future[Completed] =
private def handleEvents(events: Seq[OrderCoreEvent]): Future[Checked[Completed]] =
order.applyEvents(events) match {
case Left(problem) =>
logger.error(s"${events.headOption.getOrElse("?")}...: $problem")
Future.successful(Completed)
Future.successful(Left(problem))

case Right(updated) =>
becomeAsStateOf(updated)
if (events.size == 1 && events.head.isInstanceOf[OrderCancellationMarked] && updated == order) // Duplicate, already cancelling with same CancellationMode?
Future.successful(Completed)
Future.successful(Right(Completed))
else
persistTransaction(events) { (event, updatedState) =>
persistTransactionReturnChecked(events) { (event, updatedState) =>
update(events)
if (terminating) {
context.stop(self)
Expand Down Expand Up @@ -223,7 +223,7 @@ extends KeyedJournalingActor[AgentState, OrderEvent]
if (anOrder.isDetaching)
become("detaching")(detaching)
else
if (force || anOrder.state.getClass != order.state.getClass) {
if (true || force || anOrder.state.getClass != order.state.getClass) {
anOrder.state match {
case _: Order.Fresh => become("fresh")(wrap(fresh))
case _: Order.Ready => become("ready")(wrap(ready))
Expand Down Expand Up @@ -254,7 +254,7 @@ extends KeyedJournalingActor[AgentState, OrderEvent]
}

private def detaching: Receive =
receiveCommand orElse receiveEvent orElse receiveTerminate
receiveEvent orElse receiveCommand orElse receiveTerminate

private def receiveTerminate: Receive = {
case _: Input.Terminate =>
Expand All @@ -274,6 +274,7 @@ extends KeyedJournalingActor[AgentState, OrderEvent]
private def update(events: Seq[OrderEvent]) = {
val previousOrderOrNull = order
events foreach updateOrder
becomeAsStateOf(order)
context.parent ! Output.OrderChanged(orderId, CorrelId.current, previousOrderOrNull, events)
events.last match {
case OrderDetached =>
Expand All @@ -287,8 +288,6 @@ extends KeyedJournalingActor[AgentState, OrderEvent]
case _: OrderProcessed =>
if (terminating) {
context.stop(self)
} else {
become("processed")(processed)
}
case _ =>
}
Expand All @@ -313,7 +312,7 @@ extends KeyedJournalingActor[AgentState, OrderEvent]

override def unhandled(msg: Any) =
msg match {
case msg @ (_: Command | _: Input) =>
case msg @ (_: Command | _: Input | _: Internal.UpdateEvents) =>
logger.error(s"Unhandled message $msg in Actor state '$actorStateName', Order state is ${order.state}")

case _ =>
Expand Down
76 changes: 69 additions & 7 deletions js7-tests/src/test/scala/js7/tests/RetryTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,46 @@ package js7.tests
import izumi.reflect.Tag
import js7.base.configutils.Configs.*
import js7.base.io.process.Processes.ShellFileExtension as sh
import js7.base.log.Logger
import js7.base.log.Logger.syntax.*
import js7.base.problem.Checked.Ops
import js7.base.system.OperatingSystem.isWindows
import js7.base.test.OurTestSuite
import js7.base.thread.MonixBlocking.syntax.*
import js7.base.time.ScalaTime.*
import js7.base.utils.Tests.isIntelliJIdea
import js7.data.agent.AgentPath
import js7.data.command.CancellationMode
import js7.data.controller.ControllerCommand.CancelOrders
import js7.data.event.{EventId, EventRequest, EventSeq}
import js7.data.job.RelativePathExecutable
import js7.data.order.OrderEvent.{OrderAdded, OrderAttachable, OrderAttached, OrderAwoke, OrderCaught, OrderDetachable, OrderDetached, OrderFailed, OrderFinished, OrderMoved, OrderOutcomeAdded, OrderProcessed, OrderProcessingStarted, OrderRetrying, OrderStarted}
import js7.data.order.OrderEvent.{OrderAdded, OrderAttachable, OrderAttached, OrderAwoke, OrderCancelled, OrderCaught, OrderDetachable, OrderDetached, OrderFailed, OrderFinished, OrderMoved, OrderOutcomeAdded, OrderProcessed, OrderProcessingStarted, OrderRetrying, OrderStarted}
import js7.data.order.{FreshOrder, OrderEvent, OrderId, Outcome}
import js7.data.value.NamedValues
import js7.data.workflow.instructions.{Fail, Retry, TryInstruction}
import js7.data.workflow.position.BranchId.{Else, Then, catch_, try_}
import js7.data.workflow.position.Position
import js7.data.workflow.{Workflow, WorkflowParser, WorkflowPath}
import js7.tests.RetryTest.*
import js7.tests.jobs.EmptyJob
import js7.tests.jobs.{EmptyJob, FailingJob}
import js7.tests.testenv.DirectoryProvider.toLocalSubagentId
import js7.tests.testenv.{BlockingItemUpdater, ControllerAgentForScalaTest}
import monix.execution.Scheduler.Implicits.traced
import scala.concurrent.duration.*
import scala.reflect.ClassTag
import scala.util.Random

final class RetryTest extends OurTestSuite with ControllerAgentForScalaTest with BlockingItemUpdater
{
override protected val controllerConfig = config"""
js7.auth.users.TEST-USER.permissions = [ UpdateItem ]
js7.journal.simulate-sync = 10ms # Avoid excessive syncs in case of test failure
"""
js7.controller.agent-driver.command-batch-delay = 0ms
js7.controller.agent-driver.event-buffer-delay = 0ms"""

override protected def agentConfig = config"""
js7.journal.simulate-sync = 10ms # Avoid excessive syncs in case of test failure
js7.job.execution.signed-script-injection-allowed = on"""
js7.job.execution.signed-script-injection-allowed = on
js7.controller.agent-driver.command-batch-delay = 0ms
js7.controller.agent-driver.event-buffer-delay = 0ms"""

protected val agentPaths = agentPath :: Nil
protected val items = Nil
Expand Down Expand Up @@ -321,11 +326,13 @@ final class RetryTest extends OurTestSuite with ControllerAgentForScalaTest with
val orderId = OrderId("🟪")
var eventId = eventWatch.lastAddedEventId
controller.addOrderBlocking(FreshOrder(orderId, workflow.id.path))
for (_ <- 1 to 10)
for (_ <- 1 to 3)
eventId = eventWatch.await[OrderRetrying](_.key == orderId, after = eventId).last.eventId
controller
.executeCommandForTest(CancelOrders(Seq(orderId), CancellationMode.FreshOrStarted()))
.orThrow
eventWatch.await[OrderCancelled](_.key == orderId)

assert(eventWatch
.keyedEvents[OrderEvent](_.key == orderId, after = EventId.BeforeFirst)
.take(25)
Expand Down Expand Up @@ -362,10 +369,64 @@ final class RetryTest extends OurTestSuite with ControllerAgentForScalaTest with
OrderProcessed(Outcome.succeeded),
OrderMoved(Position(0) / catch_(2) % 1),
OrderRetrying(Position(0) / try_(3) % 0)))

controller
.executeCommandForTest(CancelOrders(Seq(orderId), CancellationMode.FreshOrStarted()))
.orThrow
eventWatch.await[OrderCancelled](_.key == orderId)
}
}
}

/** Runs `body` `n` times, one run after the other, handing it the 1-based run number.
  *
  * Each run is wrapped in `logger.debugCall` and in `withClue`, both labelled with
  * the run number.
  */
private def repeatTest(n: Int)(body: Int => Any): Unit =
  (1 to n).foreach { round =>
    logger.debugCall(s"#$round")(
      withClue(s"#$round: ")(
        body(round)))
  }

"JS-2105 Cancel while retrying (Engine has to synchronize OrderDetachable with ongoing events)" in
  repeatTest(if (isIntelliJIdea) 100 else 10) { testIndex =>
    // Scenario for JS-2105, repeated 100 times under IntelliJ IDEA, otherwise 10 times.
    // Original author's expectation: no more InapplicableEventProblem!

    // try { FailingJob } catch { retry }
    val workflow = Workflow(WorkflowPath("CANCEL-WHILE-RETRYING"), Seq(
      TryInstruction(
        Workflow.of(
          FailingJob.execute(agentPath)),
        Workflow.of(
          Retry()))))

    withTemporaryItem(workflow) { workflow =>
      // A distinct OrderId per repetition
      val orderId = OrderId(s"🟨$testIndex")
      var eventId = eventWatch.lastAddedEventId
      controller.addOrderBlocking(FreshOrder(orderId, workflow.id.path, deleteWhenTerminated = true))

      // Wait for the first OrderRetrying, then cancel after a random pause of 0 to 9 ms
      eventId = eventWatch.await[OrderRetrying](_.key == orderId, after = eventId).last.eventId
      sleep(Random.nextInt(10).ms)
      controller
        .executeCommandForTest(CancelOrders(Seq(orderId), CancellationMode.FreshOrStarted()))
        .orThrow
      eventWatch.await[OrderCancelled](_.key == orderId)

      // Only the first nine events of the order are compared;
      // OrderRetrying.delayedUntil is reset to None before the comparison.
      assert(eventWatch
        .keyedEvents[OrderEvent](_.key == orderId, after = EventId.BeforeFirst)
        .take(9)
        .map(_.event)
        .map {
          case e: OrderRetrying => e.copy(delayedUntil = None)
          case e => e
        } == Seq(
          OrderAdded(workflow.id, deleteWhenTerminated = true),
          OrderMoved(Position(0) / try_(0) % 0),
          OrderAttachable(agentPath),
          OrderAttached(agentPath),
          OrderStarted,
          OrderProcessingStarted(Some(toLocalSubagentId(agentPath))),
          OrderProcessed(FailingJob.outcome),
          OrderCaught(Position(0) / "catch+0" % 0),
          OrderRetrying(Position(0) / try_(1) % 0)))
    }
  }

private def awaitAndCheckEventSeq[E <: OrderEvent: ClassTag: Tag](after: EventId, orderId: OrderId, expected: Vector[OrderEvent]): Unit =
{
eventWatch.await[E](_.key == orderId, after = after)
Expand All @@ -382,6 +443,7 @@ final class RetryTest extends OurTestSuite with ControllerAgentForScalaTest with

// Companion object holding the values shared by the RetryTest cases.
object RetryTest
{
// Logger typed to this companion object
private val logger = Logger[this.type]
// Path of the single Agent used by the tests
private val agentPath = AgentPath("AGENT")
// SubagentId derived from agentPath via toLocalSubagentId
private val subagentId = toLocalSubagentId(agentPath)
}

0 comments on commit f7b1b78

Please sign in to comment.