From 0a39ad5ea3b3c216dde5e175bad503802f71fdc1 Mon Sep 17 00:00:00 2001 From: InversionSpaces Date: Mon, 19 Jun 2023 15:06:26 +0000 Subject: [PATCH] Remove JoinModel, fix tests --- .../inline/raw/ApplyGateRawInliner.scala | 158 ++++++++++-------- .../res/src/main/scala/aqua/res/MakeRes.scala | 25 --- model/src/main/scala/aqua/model/OpModel.scala | 8 - .../aqua/model/transform/ModelBuilder.scala | 32 ++++ .../transform/topology/TopologySpec.scala | 60 +++---- 5 files changed, 156 insertions(+), 127 deletions(-) diff --git a/model/inline/src/main/scala/aqua/model/inline/raw/ApplyGateRawInliner.scala b/model/inline/src/main/scala/aqua/model/inline/raw/ApplyGateRawInliner.scala index 2413ac52e..29b534bf8 100644 --- a/model/inline/src/main/scala/aqua/model/inline/raw/ApplyGateRawInliner.scala +++ b/model/inline/src/main/scala/aqua/model/inline/raw/ApplyGateRawInliner.scala @@ -7,12 +7,92 @@ import aqua.raw.value.{ApplyGateRaw, LiteralRaw, VarRaw} import cats.data.State import cats.data.Chain import aqua.model.inline.RawValueInliner.unfold -import aqua.types.{CanonStreamType, ScalarType, StreamType, ArrayType} +import aqua.types.{ArrayType, CanonStreamType, ScalarType, StreamType} import cats.syntax.monoid.* import scribe.Logging object ApplyGateRawInliner extends RawInliner[ApplyGateRaw] with Logging { + /** + * To wait for the element of a stream by the given index, the following model is generated: + * (seq + * (seq + * (seq + * (call %init_peer_id% ("math" "add") [0 1] stream_incr) + * (fold $stream s + * (seq + * (seq + * (ap s $stream_test) + * (canon %init_peer_id% $stream_test #stream_iter_canon) + * ) + * (xor + * (match #stream_iter_canon.length stream_incr + * (null) + * ) + * (next s) + * ) + * ) + * (never) + * ) + * ) + * (canon %init_peer_id% $stream_test #stream_result_canon) + * ) + * (ap #stream_result_canon stream_gate) + * ) + */ + def joinStreamOnIndexModel( + streamName: String, + streamType: StreamType, + idxModel: ValueModel, + idxIncrName: String, + testName: String, + iterName: String, + canonName: String, + iterCanonName: String, + resultName: String + ): OpModel.Tree = { + val varSTest = VarModel(testName, streamType) + val iter = VarModel(iterName, streamType.element) + + val iterCanon = VarModel(iterCanonName, CanonStreamType(streamType.element)) + + val resultCanon = + VarModel(canonName, CanonStreamType(streamType.element)) + + val incrVar = VarModel(idxIncrName, ScalarType.u32) + + RestrictionModel(varSTest.name, true).wrap( + increment(idxModel, incrVar), + ForModel(iter.name, VarModel(streamName, streamType), Some(ForModel.NeverMode)).wrap( + PushToStreamModel( + iter, + CallModel.Export(varSTest.name, varSTest.`type`) + ).leaf, + CanonicalizeModel( + varSTest, + CallModel.Export(iterCanon.name, iterCanon.`type`) + ).leaf, + XorModel.wrap( + MatchMismatchModel( + iterCanon + .copy(properties = Chain.one(FunctorModel("length", ScalarType.`u32`))), + incrVar, + true + ).leaf, + NextModel(iter.name).leaf + ) + ), + CanonicalizeModel( + varSTest, + CallModel.Export(resultCanon.name, CanonStreamType(streamType.element)) + ).leaf, + FlattenModel( + resultCanon, + resultName + ).leaf + ) + } + override def apply[S: Mangler: Exports: Arrows]( afr: ApplyGateRaw, propertyAllowed: Boolean @@ -27,73 +107,19 @@ object ApplyGateRawInliner extends RawInliner[ApplyGateRaw] with Logging { idxFolded <- unfold(afr.idx) (idxModel, idxInline) = idxFolded } yield { - val varSTest = VarModel(uniqueTestName, afr.streamType) - val iter = VarModel(uniqueIter, afr.streamType.element) - - val iterCanon = VarModel(uniqueIterCanon, CanonStreamType(afr.streamType.element)) - - val resultCanon = - VarModel(uniqueCanonName, CanonStreamType(afr.streamType.element)) - - val incrVar = VarModel(uniqueIdxIncr, ScalarType.u32) - - // To wait for the element of a stream by the given index, the following model is generated: - // (seq - // (seq - // (seq - // (call %init_peer_id% ("math" "add") [0 1] stream_incr) - // (fold $stream s - // (seq - // (seq - // (ap s $stream_test) - // (canon %init_peer_id% $stream_test #stream_iter_canon) - // ) - // (xor - // (match #stream_iter_canon.length stream_incr - // (null) - // ) - // (next s) - // ) - // ) - // (never) - // ) - // ) - // (canon %init_peer_id% $stream_test #stream_result_canon) - // ) - // (ap #stream_result_canon stream_gate) - // ) - val gate = RestrictionModel(varSTest.name, true).wrap( - increment(idxModel, incrVar), - ForModel(iter.name, VarModel(afr.name, afr.streamType), Some(ForModel.NeverMode)).wrap( - PushToStreamModel( - iter, - CallModel.Export(varSTest.name, varSTest.`type`) - ).leaf, - CanonicalizeModel( - varSTest, - CallModel.Export(iterCanon.name, iterCanon.`type`) - ).leaf, - XorModel.wrap( - MatchMismatchModel( - iterCanon - .copy(properties = Chain.one(FunctorModel("length", ScalarType.`u32`))), - incrVar, - true - ).leaf, - NextModel(iter.name).leaf - ) - ), - CanonicalizeModel( - varSTest, - CallModel.Export(resultCanon.name, CanonStreamType(afr.streamType.element)) - ).leaf, - FlattenModel( - resultCanon, - uniqueResultName - ).leaf + val gate = joinStreamOnIndexModel( + streamName = afr.name, + streamType = afr.streamType, + idxModel = idxModel, + idxIncrName = uniqueIdxIncr, + testName = uniqueTestName, + iterName = uniqueIter, + canonName = uniqueCanonName, + iterCanonName = uniqueIterCanon, + resultName = uniqueResultName ) - val tree = SeqModel.wrap((idxInline.predo.toList :+ gate):_*) + val tree = SeqModel.wrap((idxInline.predo.toList :+ gate): _*) val treeInline = Inline(idxInline.flattenValues, predo = Chain.one(tree)) diff --git a/model/res/src/main/scala/aqua/res/MakeRes.scala b/model/res/src/main/scala/aqua/res/MakeRes.scala index 64ea017ec..a6b35a2fe 100644 --- a/model/res/src/main/scala/aqua/res/MakeRes.scala +++ b/model/res/src/main/scala/aqua/res/MakeRes.scala @@ -28,29 +28,6 @@ object MakeRes { ) } - def join(onPeer: ValueModel, operands: NonEmptyList[ValueModel]): ResolvedOp.Tree = { - val streamName = "join-stream" - val aps = operands.collect { - // Optimization: do not join literals - case vm: VarModel => - ApRes( - operand = vm, - exportTo = CallModel.Export( - name = streamName, - // We put vars of possibly different types inside - `type` = StreamType(TopType) - ) - ).leaf - }.toList - - if (aps.isEmpty) NullRes.leaf // TODO: Return NoAir here? - else - RestrictionRes( - streamName, - isStream = true - ).wrap(aps) - } - def resolve( currentPeerId: Option[ValueModel], i: Int @@ -88,8 +65,6 @@ object MakeRes { ApRes(operand, CallModel.Export(assignTo, ArrayType(el))).leaf case FlattenModel(operand, assignTo) => ApRes(operand, CallModel.Export(assignTo, operand.`type`)).leaf - case JoinModel(operands) => - join(orInit(currentPeerId), operands) case CallServiceModel(serviceId, funcName, CallModel(args, exportTo)) => CallServiceRes( serviceId, diff --git a/model/src/main/scala/aqua/model/OpModel.scala b/model/src/main/scala/aqua/model/OpModel.scala index b3093b4ab..7106746be 100644 --- a/model/src/main/scala/aqua/model/OpModel.scala +++ b/model/src/main/scala/aqua/model/OpModel.scala @@ -192,14 +192,6 @@ case class CanonicalizeModel(operand: ValueModel, exportTo: CallModel.Export) override def usesVarNames: Set[String] = operand.usesVarNames } -case class JoinModel(operands: NonEmptyList[ValueModel]) extends ForceExecModel { - - override def toString: String = s"join ${operands.toList.mkString(", ")}" - - override lazy val usesVarNames: Set[String] = - operands.toList.flatMap(_.usesVarNames).toSet -} - case class CaptureTopologyModel(name: String) extends NoExecModel case class ApplyTopologyModel(name: String) extends SeqGroupModel diff --git a/model/transform/src/test/scala/aqua/model/transform/ModelBuilder.scala b/model/transform/src/test/scala/aqua/model/transform/ModelBuilder.scala index 6b7ea6e87..d844ed65f 100644 --- a/model/transform/src/test/scala/aqua/model/transform/ModelBuilder.scala +++ b/model/transform/src/test/scala/aqua/model/transform/ModelBuilder.scala @@ -21,6 +21,11 @@ import aqua.res.{CallRes, CallServiceRes, MakeRes} import aqua.types.{ArrayType, LiteralType, ScalarType} import scala.language.implicitConversions +import aqua.types.StreamType +import aqua.model.IntoIndexModel +import cats.data.Chain +import cats.data.Chain.==: +import aqua.model.inline.raw.ApplyGateRawInliner object ModelBuilder { implicit def rawToValue(raw: ValueRaw): ValueModel = ValueModel.fromRaw(raw) @@ -127,4 +132,31 @@ object ModelBuilder { def through(peer: ValueModel) = MakeRes.hop(peer) + + /** + * @param streamEl [[ValueModel]] of `stream[idx]` + * @return [[OpModel.Tree]] of join of `stream[idx]` + */ + def join(streamEl: ValueModel): OpModel.Tree = + streamEl match { + case VarModel( + streamName, + streamType: StreamType, + IntoIndexModel(idx, idxType) ==: Chain.`nil` + ) => + ApplyGateRawInliner.joinStreamOnIndexModel( + streamName = streamName, + streamType = streamType, + idxModel = + if (idx.forall(Character.isDigit)) LiteralModel(idx, idxType) + else VarModel(idx, idxType), + idxIncrName = streamName + "_incr", + testName = streamName + "_test", + iterName = streamName + "_fold_var", + canonName = streamName + "_result_canon", + iterCanonName = streamName + "_iter_canon", + resultName = streamName + "_gate" + ) + case _ => ??? + } } diff --git a/model/transform/src/test/scala/aqua/model/transform/topology/TopologySpec.scala b/model/transform/src/test/scala/aqua/model/transform/topology/TopologySpec.scala index fbf77ddf9..7577823b3 100644 --- a/model/transform/src/test/scala/aqua/model/transform/topology/TopologySpec.scala +++ b/model/transform/src/test/scala/aqua/model/transform/topology/TopologySpec.scala @@ -16,11 +16,25 @@ import cats.syntax.option.* import aqua.types.ArrayType import aqua.raw.ConstantRaw.initPeerId import aqua.model.ForModel.NullMode +import aqua.raw.value.ValueRaw class TopologySpec extends AnyFlatSpec with Matchers { import ModelBuilder._ + def joinModelRes(streamEl: ValueRaw | ValueModel): (OpModel.Tree, ResolvedOp.Tree) = { + val model = join(streamEl match { + case vm: ValueModel => vm + case vr: ValueRaw => ValueModel.fromRaw(vr) + }) + + // Join should not contain any topology hops + // So assume that translation works correctly for it + val res = Topology.resolve(model) + + (model, res.value) + } + "topology resolver" should "do nothing on init peer" in { val init = OnModel( @@ -435,6 +449,8 @@ class TopologySpec extends AnyFlatSpec with Matchers { val stream = ValueModel.fromRaw(streamRaw) val streamEl = ValueModel.fromRaw(streamRawEl) + val (joinModel, joinRes) = joinModelRes(streamEl) + val init = SeqModel.wrap( DeclareStreamModel(stream).leaf, @@ -451,7 +467,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { ) ) ), - JoinModel(NonEmptyList.one(streamEl)).leaf, + joinModel, callModel(3, Nil, streamRaw :: Nil) ) ) @@ -481,14 +497,10 @@ class TopologySpec extends AnyFlatSpec with Matchers { ) ) ), - CallServiceRes( - LiteralModel(s"\"op\"", LiteralType.string), - s"noop", - CallRes(streamEl :: Nil, None), - initPeer - ).leaf, + joinRes, callRes(3, initPeer, None, stream :: Nil) ) + proc.equalsOrShowDiff(expected) should be(true) } @@ -502,6 +514,8 @@ class TopologySpec extends AnyFlatSpec with Matchers { val stream = ValueModel.fromRaw(streamRaw) val streamEl = ValueModel.fromRaw(streamRawEl) + val (joinModel, joinRes) = joinModelRes(streamEl) + val init = SeqModel.wrap( DeclareStreamModel(stream).leaf, @@ -520,7 +534,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { ) ) ), - JoinModel(NonEmptyList.one(streamEl)).leaf, + joinModel, callModel(3, Nil, streamRaw :: Nil) ) ) @@ -552,12 +566,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { ) ) ), - CallServiceRes( - LiteralModel(s"\"op\"", LiteralType.string), - s"noop", - CallRes(streamEl :: Nil, None), - initPeer - ).leaf, + joinRes, callRes(3, initPeer, None, stream :: Nil) ) @@ -784,6 +793,8 @@ class TopologySpec extends AnyFlatSpec with Matchers { val usedWithIdx = used.withProperty(IntoIndexRaw(LiteralRaw("1", ScalarType.u32), ScalarType.string)) + val (joinModel, joinRes) = joinModelRes(usedWithIdx) + val init = OnModel(initPeer, Chain.one(relay)).wrap( foldPar( @@ -793,7 +804,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { callModel(1, CallModel.Export(used.name, used.`type`) :: Nil) ) ), - JoinModel(NonEmptyList.one(usedWithIdx)).leaf, + joinModel, callModel(3, Nil, used :: Nil) ) @@ -818,12 +829,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { ) ) ), - CallServiceRes( - LiteralModel(s"\"op\"", LiteralType.string), - s"noop", - CallRes(usedWithIdx :: Nil, None), - initPeer - ).leaf, + joinRes, callRes(3, initPeer, None, ValueModel.fromRaw(used) :: Nil) ) @@ -835,6 +841,9 @@ class TopologySpec extends AnyFlatSpec with Matchers { val used = VarRaw("used", StreamType(ScalarType.string)) val usedWithIdx = used.withProperty(IntoIndexRaw(LiteralRaw("1", ScalarType.u32), ScalarType.string)) + + val (joinModel, joinRes) = joinModelRes(usedWithIdx) + val init = OnModel(initPeer, Chain.one(relay)).wrap( foldPar( @@ -846,7 +855,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { ) ) ), - JoinModel(NonEmptyList.one(usedWithIdx)).leaf, + joinModel, callModel(3, Nil, used :: Nil) ) @@ -873,12 +882,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { ) ) ), - CallServiceRes( - LiteralModel(s"\"op\"", LiteralType.string), - s"noop", - CallRes(usedWithIdx :: Nil, None), - initPeer - ).leaf, + joinRes, callRes(3, initPeer, None, ValueModel.fromRaw(used) :: Nil) )