Skip to content

Commit

Permalink
Make on propagate error
Browse files Browse the repository at this point in the history
  • Loading branch information
InversionSpaces committed Jul 10, 2023
1 parent b661194 commit b7ea3be
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 53 deletions.
17 changes: 12 additions & 5 deletions model/inline/src/main/scala/aqua/model/inline/TagInliner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import cats.syntax.option.*
import cats.instances.list.*
import cats.data.{Chain, State, StateT}
import cats.syntax.show.*
import cats.syntax.bifunctor.*
import scribe.{log, Logging}
import aqua.model.inline.Inline.parDesugarPrefixOpt

Expand Down Expand Up @@ -186,11 +187,17 @@ object TagInliner extends Logging {
flat(vm, tree, true)
}
(pid, pif) = peerIdDe
viaD = Chain.fromSeq(viaDeFlattened.map(_._1))
viaF = viaDeFlattened.flatMap(_._2)

} yield TagInlined.Single(
model = OnModel(pid, viaD),
(viaD, viaF) = viaDeFlattened.unzip
.bimap(Chain.fromSeq, _.flatten)
toModel = (children: Chain[OpModel.Tree]) =>
XorModel.wrap(
OnModel(pid, viaD).wrap(
children
),
FailModel(ValueModel.lastError).leaf
)
} yield TagInlined.Mapping(
toModel = toModel,
prefix = parDesugarPrefix(viaF.prependedAll(pif))
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ object Transform extends Logging {

// Transform the body of the function: wrap it with initCallable, provide function arguments via service calls
val transform: RawTag.Tree => RawTag.Tree =
argsProvider.transform andThen initCallable.transform
argsProvider.transform

// Callback on the init peer id, either done via relay or not
val callback = initCallable.service(conf.callbackSrvId)
Expand All @@ -117,8 +117,9 @@ object Transform extends Logging {
// Pre transform and inline the function
model <- funcToModelTree(func, preTransformer)
// Post transform the function
errorsModel = errorsCatcher.transform(model)
tracingModel <- tracing(errorsModel)
initModel = initCallable.onInitPeer.wrap(model)
// errorsModel = errorsCatcher.transform(model)
tracingModel <- tracing(initModel)
// Resolve topology
resolved <- Topology.resolve(tracingModel)
// Clear the tree
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,43 +187,51 @@ object Topology extends Logging {
.filterNot(lastPeerId => before.headOption.exists(_.peerId == lastPeerId))
)

private def decideBefore(cursor: OpModelTreeCursor): Before =
(cursor.parentOp, cursor.op) match {
case (_, _: FailModel) => Fail
case (Some(XorModel), _) => XorBranch
case (Some(_: SeqGroupModel), _) => SeqGroupBranch
case (None, _) => Root
case _ => Default
}

private def decideBegins(cursor: OpModelTreeCursor): Begins =
(cursor.parentOp, cursor.op) match {
case (_, _: FailModel) => Fail
case (Some(_: SeqGroupModel), _: NextModel) => SeqNext
case (_, _: ForModel) => For
// No begin optimization for detach
case (_, ParModel) => ParGroup
case _ => Default
}

private def decideEnds(cursor: OpModelTreeCursor): Ends =
cursor.op match {
case _: SeqGroupModel => SeqGroup
case XorModel => XorGroup
case _: ParGroupModel => ParGroup
case _ if cursor.parentOp.isEmpty => Root
case _ => Default
}

private def decideAfter(cursor: OpModelTreeCursor): After =
(cursor.parentOp, cursor.op) match {
case (_, _: FailModel) => Fail
case (Some(_: ParGroupModel), _) => ParGroupBranch
case (Some(XorModel), _) => XorBranch
case (Some(_: SeqGroupModel), _) => SeqGroupBranch
case (None, _) => Root
case _ => Default
}

def make(cursor: OpModelTreeCursor): Topology =
Topology(
cursor,
// Before
cursor.parentOp match {
case Some(XorModel) => XorBranch
case Some(_: SeqGroupModel) => SeqGroupBranch
case None => Root
case _ => Default
},
// Begin
(cursor.parentOp, cursor.op) match {
case (Some(_: SeqGroupModel), _: NextModel) =>
SeqNext
case (_, _: ForModel) =>
For
case (_, ParModel) => // No begin optimization for detach
ParGroup
case _ =>
Default
},
// End
cursor.op match {
case _: SeqGroupModel => SeqGroup
case XorModel => XorGroup
case _: ParGroupModel => ParGroup
case _ if cursor.parentOp.isEmpty => Root
case _ => Default
},
// After
cursor.parentOp match {
case Some(_: ParGroupModel) => ParGroupBranch
case Some(XorModel) => XorBranch
case Some(_: SeqGroupModel) => SeqGroupBranch
case None => Root
case _ => Default
}
cursor = cursor,
before = decideBefore(cursor),
begins = decideBegins(cursor),
ends = decideEnds(cursor),
after = decideAfter(cursor)
)

def resolve(op: OpModel.Tree, debug: Boolean = false): Eval[Res] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,24 @@ import aqua.model.{OnModel, ValueModel}
import cats.Eval
import cats.data.Chain
import cats.syntax.apply.*
import cats.syntax.functor.*
import cats.syntax.monad.*
import cats.instances.tuple.*

trait Begins {

def beginsOn(current: Topology): Eval[List[OnModel]] = current.pathOn

def pathBefore(current: Topology): Eval[Chain[ValueModel]] =
(current.beforeOn, current.beginsOn).mapN { case (bef, beg) =>
(PathFinder.findPath(bef, beg), bef, beg)
}.flatMap { case (pb, bef, beg) =>
// Handle the case when we need to go through the relay, but miss the hop as it's the first
// peer where we go, but there's no service calls there
current.firstExecutesOn.map {
case Some(where) if where != beg =>
pb ++ Topology.findRelayPathEnforcement(bef, beg)
case _ => pb
(current.beforeOn, current.beginsOn).tupled
.fproduct(PathFinder.findPath.tupled)
.flatMap { case ((bef, beg), path) =>
// Handle the case when we need to go through the relay, but miss the hop as it's the first
// peer where we go, but there's no service calls there
current.firstExecutesOn.map {
case Some(where) if where != beg =>
path ++ Topology.findRelayPathEnforcement(bef, beg)
case _ => path
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package aqua.model.transform.topology.strategy

import aqua.model.transform.topology.Topology
import aqua.model.ValueModel

import aqua.model.{OnModel, XorModel}

import cats.data.Chain
import cats.Eval
import cats.syntax.apply.*
import cats.syntax.functor.*
import cats.syntax.traverse.*
import cats.syntax.option.*
import cats.syntax.applicative.*

object Fail extends Before with Begins with After {

override def forceExit(current: Topology): Eval[Boolean] =
Eval.now(false) // override just to be explicit

override def beforeOn(current: Topology): Eval[List[OnModel]] =
current.parent
.map(_.cursor.op)
.collect { case XorModel =>
current.prevSibling.traverse(_.beginsOn)
}
.flatSequence
.flatMap(
_.fold(super.beforeOn(current))(_.pure)
)

override def pathBefore(current: Topology): Eval[Chain[ValueModel]] =
for {
path <- super.pathBefore(current)
begins <- current.beginsOn
hop = begins.headOption
.map(_.peerId)
.filterNot(
path.lastOption.contains
)
} yield path ++ Chain.fromOption(hop)
}

0 comments on commit b7ea3be

Please sign in to comment.