Skip to content

Commit

Permalink
Remove JoinModel, fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
InversionSpaces committed Jun 19, 2023
1 parent 440609c commit 5a16ed3
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,92 @@ import aqua.raw.value.{ApplyGateRaw, LiteralRaw, VarRaw}
import cats.data.State
import cats.data.Chain
import aqua.model.inline.RawValueInliner.unfold
import aqua.types.{CanonStreamType, ScalarType, StreamType, ArrayType}
import aqua.types.{ArrayType, CanonStreamType, ScalarType, StreamType}
import cats.syntax.monoid.*
import scribe.Logging

object ApplyGateRawInliner extends RawInliner[ApplyGateRaw] with Logging {

/**
* To wait for the element of a stream by the given index, the following model is generated:
* (seq
* (seq
* (seq
* (call %init_peer_id% ("math" "add") [0 1] stream_incr)
* (fold $stream s
* (seq
* (seq
* (ap s $stream_test)
* (canon %init_peer_id% $stream_test #stream_iter_canon)
* )
* (xor
* (match #stream_iter_canon.length stream_incr
* (null)
* )
* (next s)
* )
* )
* (never)
* )
* )
* (canon %init_peer_id% $stream_test #stream_result_canon)
* )
* (ap #stream_result_canon stream_gate)
* )
*/
def joinStreamOnIndexModel(
streamName: String,
streamType: StreamType,
idxModel: ValueModel,
idxIncrName: String,
testName: String,
iterName: String,
canonName: String,
iterCanonName: String,
resultName: String
): OpModel.Tree = {
val varSTest = VarModel(testName, streamType)
val iter = VarModel(iterName, streamType.element)

val iterCanon = VarModel(iterCanonName, CanonStreamType(streamType.element))

val resultCanon =
VarModel(canonName, CanonStreamType(streamType.element))

val incrVar = VarModel(idxIncrName, ScalarType.u32)

RestrictionModel(varSTest.name, true).wrap(
increment(idxModel, incrVar),
ForModel(iter.name, VarModel(streamName, streamType), Some(ForModel.NeverMode)).wrap(
PushToStreamModel(
iter,
CallModel.Export(varSTest.name, varSTest.`type`)
).leaf,
CanonicalizeModel(
varSTest,
CallModel.Export(iterCanon.name, iterCanon.`type`)
).leaf,
XorModel.wrap(
MatchMismatchModel(
iterCanon
.copy(properties = Chain.one(FunctorModel("length", ScalarType.`u32`))),
incrVar,
true
).leaf,
NextModel(iter.name).leaf
)
),
CanonicalizeModel(
varSTest,
CallModel.Export(resultCanon.name, CanonStreamType(streamType.element))
).leaf,
FlattenModel(
resultCanon,
resultName
).leaf
)
}

override def apply[S: Mangler: Exports: Arrows](
afr: ApplyGateRaw,
propertyAllowed: Boolean
Expand All @@ -27,73 +107,19 @@ object ApplyGateRawInliner extends RawInliner[ApplyGateRaw] with Logging {
idxFolded <- unfold(afr.idx)
(idxModel, idxInline) = idxFolded
} yield {
val varSTest = VarModel(uniqueTestName, afr.streamType)
val iter = VarModel(uniqueIter, afr.streamType.element)

val iterCanon = VarModel(uniqueIterCanon, CanonStreamType(afr.streamType.element))

val resultCanon =
VarModel(uniqueCanonName, CanonStreamType(afr.streamType.element))

val incrVar = VarModel(uniqueIdxIncr, ScalarType.u32)

// To wait for the element of a stream by the given index, the following model is generated:
// (seq
// (seq
// (seq
// (call %init_peer_id% ("math" "add") [0 1] stream_incr)
// (fold $stream s
// (seq
// (seq
// (ap s $stream_test)
// (canon %init_peer_id% $stream_test #stream_iter_canon)
// )
// (xor
// (match #stream_iter_canon.length stream_incr
// (null)
// )
// (next s)
// )
// )
// (never)
// )
// )
// (canon %init_peer_id% $stream_test #stream_result_canon)
// )
// (ap #stream_result_canon stream_gate)
// )
val gate = RestrictionModel(varSTest.name, true).wrap(
increment(idxModel, incrVar),
ForModel(iter.name, VarModel(afr.name, afr.streamType), Some(ForModel.NeverMode)).wrap(
PushToStreamModel(
iter,
CallModel.Export(varSTest.name, varSTest.`type`)
).leaf,
CanonicalizeModel(
varSTest,
CallModel.Export(iterCanon.name, iterCanon.`type`)
).leaf,
XorModel.wrap(
MatchMismatchModel(
iterCanon
.copy(properties = Chain.one(FunctorModel("length", ScalarType.`u32`))),
incrVar,
true
).leaf,
NextModel(iter.name).leaf
)
),
CanonicalizeModel(
varSTest,
CallModel.Export(resultCanon.name, CanonStreamType(afr.streamType.element))
).leaf,
FlattenModel(
resultCanon,
uniqueResultName
).leaf
val gate = joinStreamOnIndexModel(
streamName = afr.name,
streamType = afr.streamType,
idxModel = idxModel,
idxIncrName = uniqueIdxIncr,
testName = uniqueTestName,
iterName = uniqueIter,
canonName = uniqueCanonName,
iterCanonName = uniqueIterCanon,
resultName = uniqueResultName
)

val tree = SeqModel.wrap((idxInline.predo.toList :+ gate):_*)
val tree = SeqModel.wrap((idxInline.predo.toList :+ gate): _*)

val treeInline =
Inline(idxInline.flattenValues, predo = Chain.one(tree))
Expand Down
25 changes: 0 additions & 25 deletions model/res/src/main/scala/aqua/res/MakeRes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,6 @@ object MakeRes {
)
}

def join(onPeer: ValueModel, operands: NonEmptyList[ValueModel]): ResolvedOp.Tree = {
val streamName = "join-stream"
val aps = operands.collect {
// Optimization: do not join literals
case vm: VarModel =>
ApRes(
operand = vm,
exportTo = CallModel.Export(
name = streamName,
// We put vars of possibly different types inside
`type` = StreamType(TopType)
)
).leaf
}.toList

if (aps.isEmpty) NullRes.leaf // TODO: Return NoAir here?
else
RestrictionRes(
streamName,
isStream = true
).wrap(aps)
}

def resolve(
currentPeerId: Option[ValueModel],
i: Int
Expand Down Expand Up @@ -88,8 +65,6 @@ object MakeRes {
ApRes(operand, CallModel.Export(assignTo, ArrayType(el))).leaf
case FlattenModel(operand, assignTo) =>
ApRes(operand, CallModel.Export(assignTo, operand.`type`)).leaf
case JoinModel(operands) =>
join(orInit(currentPeerId), operands)
case CallServiceModel(serviceId, funcName, CallModel(args, exportTo)) =>
CallServiceRes(
serviceId,
Expand Down
8 changes: 0 additions & 8 deletions model/src/main/scala/aqua/model/OpModel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,6 @@ case class CanonicalizeModel(operand: ValueModel, exportTo: CallModel.Export)
override def usesVarNames: Set[String] = operand.usesVarNames
}

case class JoinModel(operands: NonEmptyList[ValueModel]) extends ForceExecModel {

override def toString: String = s"join ${operands.toList.mkString(", ")}"

override lazy val usesVarNames: Set[String] =
operands.toList.flatMap(_.usesVarNames).toSet
}

case class CaptureTopologyModel(name: String) extends NoExecModel
case class ApplyTopologyModel(name: String) extends SeqGroupModel

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import aqua.res.{CallRes, CallServiceRes, MakeRes}
import aqua.types.{ArrayType, LiteralType, ScalarType}

import scala.language.implicitConversions
import aqua.types.StreamType
import aqua.model.IntoIndexModel
import cats.data.Chain
import cats.data.Chain.==:
import aqua.model.inline.raw.ApplyGateRawInliner

object ModelBuilder {
implicit def rawToValue(raw: ValueRaw): ValueModel = ValueModel.fromRaw(raw)
Expand Down Expand Up @@ -127,4 +132,31 @@ object ModelBuilder {

def through(peer: ValueModel) =
MakeRes.hop(peer)

/**
* @param streamEl [[ValueModel]] of `stream[idx]`
* @return [[OpModel.Tree]] of join of `stream[idx]`
*/
def join(streamEl: ValueModel): OpModel.Tree =
streamEl match {
case VarModel(
streamName,
streamType: StreamType,
IntoIndexModel(idx, idxType) ==: Chain.`nil`
) =>
ApplyGateRawInliner.joinStreamOnIndexModel(
streamName = streamName,
streamType = streamType,
idxModel =
if (idx.forall(Character.isDigit)) LiteralModel(idx, idxType)
else VarModel(idx, idxType),
idxIncrName = streamName + "_incr",
testName = streamName + "_test",
iterName = streamName + "_fold_var",
canonName = streamName + "_result_canon",
iterCanonName = streamName + "_iter_canon",
resultName = streamName + "_gate"
)
case _ => ???
}
}
Loading

0 comments on commit 5a16ed3

Please sign in to comment.