Skip to content

Commit

Permalink
JS-2103 FIX Bug: frozen Orders and "Unhandled message StartProcessing"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zschimmer committed Oct 31, 2023
1 parent 266b14d commit e62c86d
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package js7.agent.scheduler.order

import akka.actor.{ActorRef, DeadLetterSuppression, Stash, Terminated}
import akka.pattern.ask
import cats.syntax.parallel.*
import com.softwaremill.tagging.{@@, Tagger}
import io.circe.syntax.EncoderOps
import java.time.ZoneId
Expand Down Expand Up @@ -38,7 +39,7 @@ import js7.data.agent.Problems.{AgentDuplicateOrder, AgentIsShuttingDown}
import js7.data.calendar.Calendar
import js7.data.event.JournalEvent.JournalEventsReleased
import js7.data.event.KeyedEvent.NoKey
import js7.data.event.{<-:, Event, EventId, JournalState, KeyedEvent, Stamped}
import js7.data.event.{<-:, Event, EventId, JournalState, Stamped}
import js7.data.execution.workflow.OrderEventSource
import js7.data.execution.workflow.instructions.{ExecuteAdmissionTimeSwitch, InstructionExecutorService}
import js7.data.item.BasicItemEvent.{ItemAttachedToMe, ItemDetached, ItemDetachingFromMe, SignedItemAttachedToMe}
Expand Down Expand Up @@ -718,16 +719,29 @@ final class AgentOrderKeeper(
}
if (!delayed) {
val keyedEvents = orderEventSource.nextEvents(order.id)
keyedEvents foreach { case KeyedEvent(orderId_, event) =>
val future = orderRegister(orderId_).actor ?
OrderActor.Command.HandleEvents(event :: Nil, CorrelId.current)
try Await.result(future, 99.s) // TODO Blocking! SLOW because inhibits parallelization
catch { case NonFatal(t) => logger.error(
s"$orderId_ <-: ${event.toShortString} => ${t.toStringWithCauses}")
val future = keyedEvents
.groupMap(_.key)(_.event)
.toSeq
.parTraverse { case (orderId_ : OrderId, events) =>
Task
.fromFuture(
orderRegister(orderId_).actor ?
OrderActor.Command.HandleEvents(events, CorrelId.current))
.attempt
.map(orderId_ -> events -> _)
}
// TODO Not awaiting the response may lead to duplicate events
// for example when OrderSuspensionMarked is emitted after OrderProcessed and before OrderMoved.
// Then, two OrderMoved are emitted, because the second event is based on the same Order state.
.runToFuture
// TODO Not awaiting the response may lead to duplicate events
// for example when OrderSuspensionMarked is emitted after OrderProcessed and before OrderMoved.
// Then, two OrderMoved are emitted, because the second event is based on the same Order state.
// TODO Blocking! SLOW because inhibits parallelization
try Await.result(future, 99.s)
.collect { case ((orderId_, events), Left(throwable)) =>
logger.error(
s"$orderId_ <-: ${events.map(_.toShortString)} => ${throwable.toStringWithCauses}")
}
catch { case NonFatal(t) => logger.error(
s"${keyedEvents.map(_.toShortString)} => ${t.toStringWithCauses}")
}
if (keyedEvents.isEmpty && order.isProcessable) {
onOrderIsProcessable(order)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@ import js7.data.item.{ItemRevision, VersionId}
import js7.data.order.OrderEvent.{OrderAdded, OrderAttachable, OrderAttached, OrderDeleted, OrderDetachable, OrderDetached, OrderFinished, OrderMoved, OrderProcessed, OrderProcessingStarted, OrderStarted}
import js7.data.order.{FreshOrder, OrderId, Outcome}
import js7.data.value.expression.ExpressionParser.expr
import js7.data.workflow.instructions.If
import js7.data.workflow.instructions.{Execute, If}
import js7.data.workflow.instructions.executable.WorkflowJob
import js7.data.workflow.position.{Label, Position}
import js7.data.workflow.{Workflow, WorkflowPath, WorkflowPathControl, WorkflowPathControlPath}
import js7.tests.ControlWorkflowPathSkipJobTest.*
import js7.tests.jobs.EmptyJob
import js7.tests.testenv.ControllerAgentForScalaTest
import js7.tests.testenv.{BlockingItemUpdater, ControllerAgentForScalaTest}
import js7.tests.testenv.DirectoryProvider.toLocalSubagentId
import monix.execution.Scheduler.Implicits.traced
import monix.reactive.Observable

final class ControlWorkflowPathSkipJobTest
extends OurTestSuite with ControllerAgentForScalaTest
extends OurTestSuite with ControllerAgentForScalaTest with BlockingItemUpdater
{
override protected val controllerConfig = config"""
js7.auth.users.TEST-USER.permissions = [ UpdateItem ]
Expand Down Expand Up @@ -85,6 +86,51 @@ extends OurTestSuite with ControllerAgentForScalaTest
OrderDeleted))
}

"JS-2103 Bug: frozen Orders and \"Unhandled message StartProcessing\"" in {
val workflow = Workflow(WorkflowPath("JS-2103-WORKFLOW"), Seq(
EmptyJob.execute(agentPath),
If(expr("true"), Workflow.of(
label @: Execute(WorkflowJob.Name("JOB")))),
EmptyJob.execute(agentPath),
// JS-2103 bug blocks here
Execute(WorkflowJob.Name("JOB"))),
nameToJob = Map(
WorkflowJob.Name("JOB") -> EmptyJob.workflowJob(agentPath)))

withTemporaryItem(workflow) { workflow =>
skipJob(workflow.path, true, ItemRevision(1))
val orderId = OrderId("B")
val events = controller
.runOrder(FreshOrder(orderId, workflow.path, deleteWhenTerminated = true))
.map(_.value)
assert(events == Seq(
OrderAdded(workflow.id, deleteWhenTerminated = true),
OrderAttachable(agentPath),
OrderAttached(agentPath),

OrderStarted,
OrderProcessingStarted(subagentId),
OrderProcessed(Outcome.succeeded),
OrderMoved(Position(1) / "then" % 0),
OrderMoved(Position(1) / "then" % 1,
reason = Some(OrderMoved.SkippedDueToWorkflowPathControl)),
OrderMoved(Position(2)),

OrderProcessingStarted(subagentId),
OrderProcessed(Outcome.succeeded),
OrderMoved(Position(3)),

OrderProcessingStarted(subagentId),
OrderProcessed(Outcome.succeeded),
OrderMoved(Position(4)),

OrderDetachable,
OrderDetached,
OrderFinished(),
OrderDeleted))
}
}

"WorkflowPathControl disappears with the last Workflow version" in {
val controllerApi = directoryProvider.newControllerApi(controller)
val eventId = eventWatch.lastAddedEventId
Expand Down

0 comments on commit e62c86d

Please sign in to comment.