Skip to content

Commit

Permalink
SubagentAlreadyDedicated error message contains reasons
Browse files Browse the repository at this point in the history
  • Loading branch information
Zschimmer committed Jun 22, 2024
1 parent 337bde7 commit 88ecd19
Show file tree
Hide file tree
Showing 14 changed files with 100 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ extends SignedItemContainer,
override def maybeAgentPath: Option[AgentPath] =
Some(meta.agentPath)

def agentRunId: AgentRunId =
meta.agentRunId

def controllerId: ControllerId =
meta.controllerId

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ EvaluationFailed = "Evaluation failed: $name := $expression: $problem"
RecursiveEvaluation = "Recursive evaluation"
SubagentDriverStopped = "Subagent:$subagentId processes are lost"
SubagentShutDownBeforeProcessStart = "Subagent shut down before process could be started"
SubagentAlreadyDedicated = "Subagent has already been dedicated (maybe it should be restarted?) — $reasons"
ProcessLostDueToUnknownReason = "Process lost"
ProcessLostDueToRestart = "Process lost due to Subagent restart"
ProcessLostDueToReset = "Process lost due to Subagent reset"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package js7.data.agent
import js7.base.generic.GenericString
import js7.base.problem.Checked
import js7.base.utils.Base64UUID
import js7.base.utils.ScalaUtils.syntax.RichEither
import js7.data.event.JournalId

/** The ID of an Agent run.
Expand All @@ -18,6 +19,7 @@ final case class AgentRunId(journalId: JournalId) extends GenericString:

object AgentRunId extends GenericString.NonEmpty[AgentRunId]:
val empty: AgentRunId = AgentRunId(JournalId(Base64UUID.zero))
val pastAgentVersion: AgentRunId = checked("--PAST-AGENT-VERSION--").orThrow

protected def unchecked(string: String): Nothing =
throw new NotImplementedError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ object Problems:
def arguments: Map[String, String] = Map(
"subagentId" -> subagentId.string)

type SubagentAlreadyDedicatedProblem = SubagentAlreadyDedicatedProblem.type
case object SubagentAlreadyDedicatedProblem extends Problem.ArgumentlessCoded
final case class SubagentAlreadyDedicatedProblem(reasons: String)
extends Problem.Coded:
def arguments = Map("reasons" -> reasons)

type SubagentNotDedicatedProblem = SubagentNotDedicatedProblem.type
case object SubagentNotDedicatedProblem extends Problem.ArgumentlessCoded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import js7.base.problem.Checked
import js7.base.utils.Big
import js7.base.utils.ScalaUtils.syntax.*
import js7.base.version.Version
import js7.data.agent.AgentPath
import js7.data.agent.{AgentPath, AgentRunId}
import js7.data.command.CommonCommand
import js7.data.controller.ControllerId
import js7.data.event.EventId
Expand Down Expand Up @@ -49,6 +49,7 @@ object SubagentCommand extends CommonCommand.Companion:
final case class DedicateSubagent(
subagentId: SubagentId,
agentPath: AgentPath,
agentRunId: AgentRunId,
controllerId: ControllerId)
extends SubagentCommand:
type Response = DedicateSubagent.Response
Expand All @@ -59,6 +60,17 @@ object SubagentCommand extends CommonCommand.Companion:
version/*COMPATIBLE with v2.3*/: Option[Version])
extends SubagentCommand.Response

given Encoder.AsObject[DedicateSubagent] = deriveCodec[DedicateSubagent]
given Decoder[DedicateSubagent] =
c =>
for
subagentId <- c.get[SubagentId]("subagentId")
agentPath <- c.get[AgentPath]("agentPath")
agentRunId <- c.getOrElse[AgentRunId]("agentRunId")(AgentRunId.pastAgentVersion)
controllerId <- c.get[ControllerId]("controllerId")
yield
DedicateSubagent(subagentId, agentPath, agentRunId, controllerId)

final case class CoupleDirector(
subagentId: SubagentId,
subagentRunId: SubagentRunId,
Expand Down Expand Up @@ -143,7 +155,7 @@ object SubagentCommand extends CommonCommand.Companion:

implicit val jsonCodec: TypedJsonCodec[SubagentCommand] = TypedJsonCodec[SubagentCommand](
Subtype(deriveCodec[Batch]),
Subtype(deriveCodec[DedicateSubagent]),
Subtype[DedicateSubagent],
Subtype(deriveCodec[CoupleDirector]),
//Subtype(deriveCodec[AttachItem]),
Subtype[AttachSignedItem],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package js7.data.subagent
import js7.base.crypt.Signed
import js7.base.problem.Checked
import js7.base.utils.ScalaUtils.syntax.*
import js7.data.agent.AgentRunId
import js7.data.event.JournaledState
import js7.data.item.SignableItem
import js7.data.job.{JobKey, JobResource, JobResourcePath}
Expand All @@ -17,6 +18,7 @@ trait SubagentDirectorState[S <: SubagentDirectorState[S]]
extends JournaledState[S]:
this: S =>

def agentRunId: AgentRunId
def idToOrder: Map[OrderId, Order[Order.State]]
def idToWorkflow: Map[WorkflowId, Workflow]
def workflowJob(workflowPosition: WorkflowPosition): Checked[WorkflowJob]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import js7.base.io.process.ProcessSignal.SIGTERM
import js7.base.test.OurTestSuite
import js7.base.time.ScalaTime.*
import js7.base.utils.Base64UUID
import js7.data.agent.AgentPath
import js7.data.agent.{AgentPath, AgentRunId}
import js7.data.controller.{ControllerId, ControllerState}
import js7.data.event.JournalId
import js7.data.item.ItemSigner
import js7.data.order.{Order, OrderId}
import js7.data.other.HeartbeatTiming
Expand All @@ -25,6 +26,21 @@ final class SubagentCommandTest extends OurTestSuite:
DedicateSubagent(
SubagentId("SUBAGENT"),
AgentPath("AGENT"),
AgentRunId(JournalId(Base64UUID.zero)),
ControllerId("CONTROLLER")),
json"""{
"TYPE": "DedicateSubagent",
"subagentId": "SUBAGENT",
"agentPath": "AGENT",
"agentRunId": "AAAAAAAAAAAAAAAAAAAAAA",
"controllerId": "CONTROLLER"
}""")

testJsonDecoder[SubagentCommand](
DedicateSubagent(
SubagentId("SUBAGENT"),
AgentPath("AGENT"),
AgentRunId.pastAgentVersion,
ControllerId("CONTROLLER")),
json"""{
"TYPE": "DedicateSubagent",
Expand Down
16 changes: 10 additions & 6 deletions js7-subagent/src/main/scala/js7/subagent/DedicatedSubagent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import js7.base.utils.CatsUtils.syntax.logWhenItTakesLonger
import js7.base.utils.ScalaUtils.syntax.*
import js7.base.utils.{AsyncLock, Atomic}
import js7.core.command.CommandMeta
import js7.data.agent.AgentPath
import js7.data.agent.{AgentPath, AgentRunId}
import js7.data.controller.ControllerId
import js7.data.event.KeyedEvent.NoKey
import js7.data.job.{JobConf, JobKey}
Expand Down Expand Up @@ -46,6 +46,7 @@ final class DedicatedSubagent private(
val commandExecutor: SubagentCommandExecutor,
val journal: Journal[SubagentState],
val agentPath: AgentPath,
val agentRunId: AgentRunId,
val controllerId: ControllerId,
jobLauncherConf: JobLauncherConf,
subagentConf: SubagentConf)
Expand All @@ -59,10 +60,12 @@ extends Service.StoppableByRequest:
private val stoppingLock = AsyncLock()
private val orderToProcessing = AsyncMap.stoppable[OrderId, Processing]()
//private val director = AsyncVariable(none[Allocated[IO, DirectorRegisterable]])
private var _isUsed = false
@volatile private var _dontWaitForDirector = false
private val shuttingDown = Atomic(false)

def isLocal = true
def isUsed: Boolean =
_isUsed || isShuttingDown

def isShuttingDown: Boolean =
shuttingDown.get()
Expand Down Expand Up @@ -93,9 +96,8 @@ extends Service.StoppableByRequest:
.both(
orderIdToJobDriver.stop,
signal.fold(IO.unit)(killAndStopAllJobs))
.*>(IO {
fileValueState.close()
})
.*>(IO:
fileValueState.close())
.*>(orderToProcessing.initiateStopWithProblem(SubagentIsShuttingDownProblem))
.*>(
if dontWaitForDirector then IO:
Expand Down Expand Up @@ -194,6 +196,7 @@ extends Service.StoppableByRequest:
executeDefaultArguments: Map[String, Expression])
: IO[Checked[FiberIO[OrderProcessed]]] =
IO.defer:
_isUsed = true
orderToProcessing
.updateChecked(order.id, {
case Some(processing) =>
Expand Down Expand Up @@ -375,14 +378,15 @@ object DedicatedSubagent:
commandExecutor: SubagentCommandExecutor,
journal: Journal[SubagentState],
agentPath: AgentPath,
agentRunId: AgentRunId,
controllerId: ControllerId,
jobLauncherConf: JobLauncherConf,
subagentConf: SubagentConf)
(using ioRuntime: IORuntime)
: ResourceIO[DedicatedSubagent] =
Service.resource(IO(
new DedicatedSubagent(
subagentId, subagentRunId, commandExecutor, journal, agentPath, controllerId,
subagentId, subagentRunId, commandExecutor, journal, agentPath, agentRunId, controllerId,
jobLauncherConf, subagentConf)))

private final class Processing(
Expand Down
63 changes: 35 additions & 28 deletions js7-subagent/src/main/scala/js7/subagent/Subagent.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package js7.subagent

import cats.effect.Deferred
import cats.effect.unsafe.{IORuntime, Scheduler}
import cats.effect.{FiberIO, IO, Resource, ResourceIO}
import cats.effect.{Deferred, FiberIO, IO, Resource, ResourceIO}
import cats.syntax.traverse.*
import java.nio.file.Path
import js7.base.Js7Version
import js7.base.auth.{SessionToken, SimpleUser}
import js7.base.catsutils.CatsEffectExtensions.*
import js7.base.configutils.Configs.RichConfig
import js7.base.crypt.generic.DirectoryWatchingSignatureVerifier
import js7.base.eventbus.StandardEventBus
Expand Down Expand Up @@ -43,6 +41,7 @@ import js7.subagent.Subagent.*
import js7.subagent.configuration.SubagentConf
import js7.subagent.web.SubagentWebServer
import org.apache.pekko.actor.ActorSystem
import scala.collection.mutable

final class Subagent private(
val webServer: PekkoWebServer,
Expand Down Expand Up @@ -119,14 +118,6 @@ extends MainService, Service.StoppableByRequest:
// release = unregisterDirector)
yield ())

//def directorRegisteringResource(registerable: DirectorRegisterable): ResourceIO[Unit] =
// for {
// _ <- webServer.registeringRouteResource(registerable.toRoute)
// //_ <- Resource.make(
// // acquire = registerDirector(registerable))(
// // release = unregisterDirector)
// } yield ()

//private def registerDirector(registerable: DirectorRegisterable): IO[registerable.type] =
// directorRegisterable
// .update {
Expand All @@ -150,23 +141,39 @@ extends MainService, Service.StoppableByRequest:
def executeDedicateSubagent(cmd: DedicateSubagent): IO[Checked[DedicateSubagent.Response]] =
DedicatedSubagent
.resource(cmd.subagentId, subagentRunId, commandExecutor, journal,
cmd.agentPath, cmd.controllerId, jobLauncherConf, conf)
cmd.agentPath, cmd.agentRunId, cmd.controllerId, jobLauncherConf, conf)
.toAllocated
.flatMap(allocatedDedicatedSubagent => IO.defer {
val isFirst = dedicatedAllocated.trySet(allocatedDedicatedSubagent)
if !isFirst then
// TODO Idempotent: Frisch gewidmeter Subagent ist okay. Kein Kommando darf eingekommen sein.
//if (cmd.subagentId == dedicatedAllocated.orThrow.subagentId)
// IO.pure(Right(DedicateSubagent.Response(subagentRunId, EventId.BeforeFirst)))
//else
logger.warn(s"$cmd => $SubagentAlreadyDedicatedProblem: $dedicatedAllocated")
IO.left(SubagentAlreadyDedicatedProblem)
else
// TODO Check agentPath, controllerId (handle in SubagentState?)
logger.info(s"Subagent dedicated to be ${cmd.subagentId} in ${cmd.agentPath}, is ready")
IO.right(
DedicateSubagent.Response(subagentRunId, EventId.BeforeFirst, Some(Js7Version)))
})
.flatMap: allocatedDedicatedSubagent =>
IO:
dedicatedAllocated.trySet(allocatedDedicatedSubagent)
.flatMap: isFirst =>
val ok = IO:
logger.info(s"Subagent dedicated to be ${cmd.subagentId} in ${cmd.agentPath}, is ready")
Right(DedicateSubagent.Response(subagentRunId, EventId.BeforeFirst, Some(Js7Version)))
if isFirst then
ok
else
// Maybe it's a duplicate, idempotent command?
val errors = mutable.Buffer.empty[String]
val existing = dedicatedAllocated.orThrow.allocatedThing
if existing.subagentId != cmd.subagentId then
errors += s"Renaming dedication as ${cmd.subagentId} rejected"
if existing.agentPath != cmd.agentPath then
errors += s"Subagent is dedicated to an other ${existing.agentPath}"
if existing.controllerId != cmd.controllerId then
errors += s"Subagent is dedicated to ${existing.agentPath} of alien ${existing.controllerId}"
else if existing.agentPath == cmd.agentPath && existing.agentRunId != cmd.agentRunId then
errors += s"Subagent is dedicated to a past or alien ${existing.agentPath}"
if errors.isEmpty && existing.isUsed then
errors += s"Subagent is already in use"
if errors.nonEmpty then
val problem = SubagentAlreadyDedicatedProblem(reasons = errors.mkString(", "))
logger.warn(s"$cmd => $problem")
IO.unlessA(isFirst):
allocatedDedicatedSubagent.release
.as(Left(problem))
else
ok

def startOrderProcess(
order: Order[Order.Processing],
Expand Down Expand Up @@ -279,7 +286,7 @@ object Subagent:
ioExecutor: IOExecutor,
testEventBus: StandardEventBus[Any],
actorSystem: ActorSystem):
implicit def iox: IOExecutor = ioExecutor
given iox: IOExecutor = ioExecutor

type ItemSignatureKeysUpdated = ItemSignatureKeysUpdated.type
case object ItemSignatureKeysUpdated
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ extends SubagentDriver, Service.StoppableByRequest:
if wasRemote then
IO.right(())
else
val agentRunId = journal.unsafeCurrentState().agentRunId
subagent
.executeDedicateSubagent(
DedicateSubagent(subagentId, subagentItem.agentPath, controllerId))
DedicateSubagent(subagentId, subagentItem.agentPath, agentRunId, controllerId))
.flatMapT(response =>
persistDedicated(response.subagentRunId))))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,9 @@ final class SubagentKeeper[S <: SubagentDirectorState[S]: Tag](
.get(subagentId)
.fold(IO.unit)(subagentDriver =>
subagentDriver.tryShutdown
.*>(stateVar.update(state => IO(
state.removeSubagent(subagentId))))
.*>(subagentDriver.terminate))))
.*>(stateVar.update(state => IO:
state.removeSubagent(subagentId)))
.*>(subagentDriver.terminate)))
.*>(journal
.persistKeyedEvent(ItemDetached(subagentId, agentPath))
.orThrow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,16 @@ final class ResetSubagentWhileRunning2Test extends OurTestSuite, SubagentTester:
val eventId = eventWatch.await[SubagentReset](_.key == bareSubagentId).head.eventId
val failed1 = eventWatch.await[SubagentCouplingFailed](_.key == bareSubagentId,
after = eventId).head
assert(failed1.value.event == SubagentCouplingFailed(SubagentAlreadyDedicatedProblem))
assert(failed1.value.event ==
SubagentCouplingFailed(SubagentAlreadyDedicatedProblem("Subagent is already in use")))
//subagent.shutdown(Some(SIGKILL))
}
// SubagentShutdown is suppressed like any other event from Subagent after Reset
//eventWatch.await[SubagentShutdown](_.key == bareSubagentId)

assert(eventWatch.allKeyedEvents[SubagentItemStateEvent]
.collect {
case KeyedEvent(`bareSubagentId`, event @ SubagentCouplingFailed(SubagentAlreadyDedicatedProblem)) =>
case KeyedEvent(`bareSubagentId`, event @ SubagentCouplingFailed(SubagentAlreadyDedicatedProblem("Subagent is already in use"))) =>
Some(event)

case ke @ KeyedEvent(`bareSubagentId`, event @ SubagentCouplingFailed(problem)) =>
Expand All @@ -97,7 +98,7 @@ final class ResetSubagentWhileRunning2Test extends OurTestSuite, SubagentTester:
SubagentResetStarted(false),
SubagentCouplingFailed(Problem("decoupled")),
SubagentReset,
SubagentCouplingFailed(SubagentAlreadyDedicatedProblem)/*time dependent?*/))
SubagentCouplingFailed(SubagentAlreadyDedicatedProblem("Subagent is already in use"))/*time dependent?*/))

assert(eventWatch.allKeyedEvents[OrderEvent] == Seq(
aOrderId <-: OrderAdded(workflow.id),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ final class ResetSubagentWhileRunningTest extends OurTestSuite, SubagentTester:

assert(eventWatch.allKeyedEvents[SubagentItemStateEvent]
.collect {
case KeyedEvent(`bareSubagentId`, event @ SubagentCouplingFailed(SubagentAlreadyDedicatedProblem)) =>
case KeyedEvent(`bareSubagentId`, event @ SubagentCouplingFailed(SubagentAlreadyDedicatedProblem("Subagent is already in use"))) =>
Some(event)

case KeyedEvent(`bareSubagentId`, SubagentDedicated(runId, _)) =>
Expand All @@ -92,7 +92,7 @@ final class ResetSubagentWhileRunningTest extends OurTestSuite, SubagentTester:
SubagentResetStarted(false),
SubagentCouplingFailed(Problem("decoupled")),
SubagentReset,
SubagentCouplingFailed(SubagentAlreadyDedicatedProblem)/*time dependent?*/))
SubagentCouplingFailed(SubagentAlreadyDedicatedProblem("Subagent is already in use"))/*time dependent?*/))

assert(eventWatch.allKeyedEvents[OrderEvent] == Seq(
aOrderId <-: OrderAdded(workflow.id),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ final class StealAndResetSubagentTest extends OurTestSuite, SubagentTester:
controller.api.addOrder(FreshOrder(thieveOrderId, thieveWorkflow.path)).await(99.s).orThrow
eventWatch.await[OrderAttached](_.key == thieveOrderId)

// STEAL
// STEAL THE SUBAGENT
controller.api.executeCommand(ResetSubagent(stolenSubagentItem.id, force = true))
.await(99.s).orThrow
eventWatch.await[SubagentResetStarted](_.key == stolenSubagentItem.id)

// The stolen orders are killed
// The stolen orders are canceled (and its processes have been killed)
val processed = eventWatch.await[OrderProcessed](_.key == aOrderId).head
assert(processed.value.event ==
OrderProcessed(OrderOutcome.Killed(OrderOutcome.Failed(Some("Canceled")))))
Expand Down

0 comments on commit 88ecd19

Please sign in to comment.