Skip to content

Commit

Permalink
Merge branch 'release/2.6' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
Zschimmer committed Nov 8, 2023
2 parents 0f6dc6b + 00bff39 commit 53360db
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ extends MainJournalingActor[AgentState, Event], Stash:
//val processLimitIncreased = previous
// .collect { case o: AgentRef => o.processLimit }
// .flatten
// .exists(previous => agentRef.processLimit.forall(previous < _))
// .forall(previous => agentRef.processLimit.forall(previous < _))
//if processLimitIncreased then
self ! Internal.TryStartProcessing
Task.right(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ extends Stash, MainJournalingActor[ControllerState, Event]:
.runToFuture
.pipeTo(self)

for path <- _controllerState.keyToItem(AgentRef).keys do
proceedWithItem(path)
for path <- _controllerState.keyTo(WorkflowPathControl).keys do
proceedWithItem(path)
for itemKey <- _controllerState.keyTo(WorkflowControl).keys do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import js7.controller.agent.AgentDriver.DecoupledProblem
import js7.controller.agent.DirectorDriver.*
import js7.data.agent.AgentRefStateEvent.{AgentCoupled, AgentReset}
import js7.data.agent.Problems.AgentNotDedicatedProblem
import js7.data.agent.{AgentPath, AgentRefState, AgentRunId}
import js7.data.agent.{AgentPath, AgentRef, AgentRefState, AgentRunId}
import js7.data.cluster.ClusterEvent
import js7.data.controller.{ControllerRunId, ControllerState}
import js7.data.delegate.DelegateCouplingState.{Coupled, Resetting}
import js7.data.event.KeyedEvent.NoKey
import js7.data.event.{AnyKeyedEvent, Event, EventId, EventRequest, Stamped}
import js7.data.item.BasicItemEvent.ItemAttachable
import js7.data.item.InventoryItemEvent
import js7.data.order.{OrderEvent, OrderId}
import js7.data.orderwatch.OrderWatchEvent
Expand All @@ -36,6 +38,7 @@ import monix.reactive.Observable
import scala.util.chaining.scalaUtilChainingOps

private[agent] final class DirectorDriver private(
agentDriver: AgentDriver,
agentPath: AgentPath,
initialEventId: EventId,
client: AgentClient,
Expand Down Expand Up @@ -105,8 +108,21 @@ extends Service.StoppableByRequest:
.lock(agentPath)(
journal.persist(controllerState =>
for a <- controllerState.keyTo(AgentRefState).checked(agentPath) yield
(a.couplingState != Coupled || a.problem.nonEmpty)
.thenList(agentPath <-: AgentCoupled)))
Seq(
(a.couplingState != Coupled || a.problem.nonEmpty) ?
(agentPath <-: AgentCoupled),
// The coupled Agent may not yet have an AgentRef (containing the
// processLimit). So we need to check this:
!controllerState.itemToAgentToAttachedState.contains(agentPath) ?
(NoKey <-: ItemAttachable(agentPath, agentPath))
).flatten))
.flatTapT { case (stamped, state) =>
Task
.when(stamped.exists(_.value.event.isInstanceOf[ItemAttachable]))(
state.keyToItem(AgentRef).get(agentPath).fold(Task.unit)(agentRef =>
agentDriver.send(AgentDriver.Queueable.AttachUnsignedItem(agentRef))))
.as(Checked.unit)
}
.rightAs(agentEventId)
}
}
Expand Down Expand Up @@ -242,6 +258,7 @@ private[agent] object DirectorDriver:
classOf[ClusterEvent])

def resource(
agentDriver: AgentDriver,
agentPath: AgentPath,
initialEventId: EventId,
client: AgentClient,
Expand All @@ -255,7 +272,7 @@ private[agent] object DirectorDriver:
: Resource[Task, DirectorDriver] =
Service.resource(Task(
new DirectorDriver(
agentPath, initialEventId, client,
agentDriver, agentPath, initialEventId, client,
dedicateAgentIfNeeded,
onCouplingFailed, onCoupled, onDecoupled, adoptEvents,
journal, conf)))
Expand Down

0 comments on commit 53360db

Please sign in to comment.