Skip to content

Commit

Permalink
Merge branch 'release/2.6' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
Zschimmer committed Nov 8, 2023
2 parents 0f6dc6b + 00bff39 commit cbeb15d
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ extends MainJournalingActor[AgentState, Event], Stash:
//val processLimitIncreased = previous
// .collect { case o: AgentRef => o.processLimit }
// .flatten
// .exists(previous => agentRef.processLimit.forall(previous < _))
// .forall(previous => agentRef.processLimit.forall(previous < _))
//if processLimitIncreased then
self ! Internal.TryStartProcessing
Task.right(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ extends Stash, MainJournalingActor[ControllerState, Event]:
.runToFuture
.pipeTo(self)

for path <- _controllerState.keyToItem(AgentRef).keys do
proceedWithItem(path)
for path <- _controllerState.keyTo(WorkflowPathControl).keys do
proceedWithItem(path)
for itemKey <- _controllerState.keyTo(WorkflowControl).keys do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ extends Service.StoppableByRequest:
//adoptedEventId <- Resource.eval(Task(state.adoptedEventId))
afterEventId <- Resource.eval(agentRefState.map(_.eventId))
directorDriver <- DirectorDriver.resource(
agentPath, afterEventId,
agentDriver, agentPath, afterEventId,
client,
dedicateAgentIfNeeded, onCouplingFailed, onCoupled, onDecoupled,
onEventsFetched,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import js7.controller.agent.AgentDriver.DecoupledProblem
import js7.controller.agent.DirectorDriver.*
import js7.data.agent.AgentRefStateEvent.{AgentCoupled, AgentReset}
import js7.data.agent.Problems.AgentNotDedicatedProblem
import js7.data.agent.{AgentPath, AgentRefState, AgentRunId}
import js7.data.agent.{AgentPath, AgentRef, AgentRefState, AgentRunId}
import js7.data.cluster.ClusterEvent
import js7.data.controller.{ControllerRunId, ControllerState}
import js7.data.delegate.DelegateCouplingState.{Coupled, Resetting}
import js7.data.event.KeyedEvent.NoKey
import js7.data.event.{AnyKeyedEvent, Event, EventId, EventRequest, Stamped}
import js7.data.item.BasicItemEvent.ItemAttachable
import js7.data.item.InventoryItemEvent
import js7.data.order.{OrderEvent, OrderId}
import js7.data.orderwatch.OrderWatchEvent
Expand All @@ -36,6 +38,7 @@ import monix.reactive.Observable
import scala.util.chaining.scalaUtilChainingOps

private[agent] final class DirectorDriver private(
agentDriver: AgentDriver,
agentPath: AgentPath,
initialEventId: EventId,
client: AgentClient,
Expand Down Expand Up @@ -105,8 +108,21 @@ extends Service.StoppableByRequest:
.lock(agentPath)(
journal.persist(controllerState =>
for a <- controllerState.keyTo(AgentRefState).checked(agentPath) yield
(a.couplingState != Coupled || a.problem.nonEmpty)
.thenList(agentPath <-: AgentCoupled)))
Seq(
(a.couplingState != Coupled || a.problem.nonEmpty) ?
(agentPath <-: AgentCoupled),
// The coupled Agent may not yet have an AgentRef (containing the
// processLimit). So we need to check this:
!controllerState.itemToAgentToAttachedState.contains(agentPath) ?
(NoKey <-: ItemAttachable(agentPath, agentPath))
).flatten))
.flatTapT { case (stamped, state) =>
Task
.when(stamped.exists(_.value.event.isInstanceOf[ItemAttachable]))(
state.keyToItem(AgentRef).get(agentPath).fold(Task.unit)(agentRef =>
agentDriver.send(AgentDriver.Queueable.AttachUnsignedItem(agentRef))))
.as(Checked.unit)
}
.rightAs(agentEventId)
}
}
Expand Down Expand Up @@ -242,6 +258,7 @@ private[agent] object DirectorDriver:
classOf[ClusterEvent])

def resource(
agentDriver: AgentDriver,
agentPath: AgentPath,
initialEventId: EventId,
client: AgentClient,
Expand All @@ -255,7 +272,7 @@ private[agent] object DirectorDriver:
: Resource[Task, DirectorDriver] =
Service.resource(Task(
new DirectorDriver(
agentPath, initialEventId, client,
agentDriver, agentPath, initialEventId, client,
dedicateAgentIfNeeded,
onCouplingFailed, onCoupled, onDecoupled, adoptEvents,
journal, conf)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ extends UnsignedSimpleItemState:

def agentPath = agentRef.path

def agentPathToAttachedState = Map.empty

def applyEvent(event: AgentRefStateEvent): Checked[AgentRefState] =
event match
case AgentDedicated(agentRunId_, eventId_) =>
Expand Down
2 changes: 0 additions & 2 deletions js7-data/shared/src/main/scala/js7/data/lock/LockState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ extends UnsignedSimpleItemState, Big/*acquired and queue get big, many orders*/:
def updateItem(lock: Lock): Checked[LockState] =
Right(copy(lock = lock))

def agentPathToAttachedState = Map.empty

def enqueue(orderId: OrderId, count: Option[Int]): Checked[LockState] =
if !count.forall(_ <= limit) then
Left(Problem(s"Cannot fulfill lock count=${count getOrElse ""} with $lockPath limit=$limit"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import js7.base.utils.Collections.RichMap
import js7.base.utils.IntelliJUtils.intelliJuseImport
import js7.base.utils.ScalaUtils.syntax.RichPartialFunction
import js7.base.utils.Tests.isStrict
import js7.data.agent.AgentPath
import js7.data.event.KeyedEvent
import js7.data.item.UnsignedSimpleItemEvent.UnsignedSimpleItemAdded
import js7.data.item.{ItemAttachedState, UnsignedSimpleItemState}
import js7.data.item.UnsignedSimpleItemState
import js7.data.order.OrderEvent.{OrderAdded, OrderCoreEvent, OrderDeletionMarked}
import js7.data.order.{FreshOrder, OrderId}
import js7.data.orderwatch.OrderWatchEvent.{ExternalOrderArised, ExternalOrderVanished}
Expand Down Expand Up @@ -226,7 +225,6 @@ object OrderWatchState extends UnsignedSimpleItemState.Companion[OrderWatchState

def apply(
orderWatch: OrderWatch,
agentPathToAttachedState: Map[AgentPath, ItemAttachedState.NotDetached],
sourceToOrderId: Map[ExternalOrderName, ArisedOrHasOrder])
: OrderWatchState =
OrderWatchState(orderWatch, sourceToOrderId, Set.empty, Set.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import js7.data.delegate.DelegateCouplingState
import js7.data.event.SnapshotMeta.SnapshotEventId
import js7.data.event.{EventId, JournalState, SnapshotableState, Stamped}
import js7.data.item.BasicItemEvent.{ItemAttachable, ItemDeletionMarked}
import js7.data.item.ItemAttachedState.{Attachable, Attached}
import js7.data.item.ItemAttachedState.Attachable
import js7.data.item.SignedItemEvent.SignedItemAdded
import js7.data.item.UnsignedSimpleItemEvent.{UnsignedSimpleItemAdded, UnsignedSimpleItemChanged}
import js7.data.item.VersionedEvent.{VersionAdded, VersionedItemAdded, VersionedItemChanged}
Expand Down Expand Up @@ -536,7 +536,6 @@ object ControllerStateTest
subagentSelection.id -> SubagentSelectionState(subagentSelection),
fileWatch.path -> OrderWatchState(
fileWatch,
Map(agentRef.path -> Attached(Some(ItemRevision(7)))),
Map(
ExternalOrderName("ORDER-NAME") -> HasOrder(OrderId("ORDER"), Some(Vanished)))),
WorkflowPathControlPath(workflow.path) -> WorkflowPathControl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ final class OrderWatchStateHandlerTest extends OurTestSuite
assert(state.pathToOrderWatchStateMap == Map(
aOrderWatch.path -> OrderWatchState(
aOrderWatch,
Map.empty,
Map(
ExternalOrderName("A") -> arised("A"),
ExternalOrderName("B") -> arised("B"))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package js7.data.orderwatch
import io.circe.syntax.EncoderOps
import js7.base.circeutils.CirceUtils.JsonStringInterpolator
import js7.base.circeutils.typed.{Subtype, TypedJsonCodec}
import js7.base.test.OurAsyncTestSuite
import js7.base.time.ScalaTime.*
import js7.base.utils.SimplePattern
import js7.data.agent.AgentPath
import js7.data.controller.ControllerState
import js7.data.item.ItemAttachedState.Attached
import js7.data.item.{ItemRevision, UnsignedSimpleItemEvent}
import js7.data.order.OrderId
import js7.data.orderwatch.OrderWatchState.{Arised, ArisedOrHasOrder, ExternalOrderSnapshot, HasOrder, Vanished}
Expand All @@ -17,7 +17,6 @@ import js7.data.value.{NamedValues, StringValue}
import js7.data.workflow.WorkflowPath
import js7.tester.CirceJsonTester.{testJson, testJsonDecoder}
import monix.execution.Scheduler.Implicits.traced
import js7.base.test.OurAsyncTestSuite

final class OrderWatchStateTest extends OurAsyncTestSuite
{
Expand All @@ -31,7 +30,6 @@ final class OrderWatchStateTest extends OurAsyncTestSuite
Some(NamedValue("1")),
delay = 2.s,
Some(ItemRevision(7))),
Map(AgentPath("AGENT") -> Attached(Some(ItemRevision(7)))),
Map( // Not in snapshot, because its duplicate to Order.externalOrderKey
ExternalOrderName("A-NAME") -> Arised(OrderId("A-ORDER"), NamedValues("K" -> StringValue("V"))),
ExternalOrderName("B-NAME") -> HasOrder(OrderId("B-ORDER"), Some(Vanished))))
Expand Down

0 comments on commit cbeb15d

Please sign in to comment.