From facc74a9fe6a8d7d7622163e5652a569c4dfdf41 Mon Sep 17 00:00:00 2001 From: "Taro L. Saito" Date: Thu, 3 Aug 2023 16:35:45 -0700 Subject: [PATCH] rx (feature): Support join/zip upto 5 elements Fixes #3079 --- .../src/main/scala/wvlet/airframe/rx/Rx.scala | 30 +++++++--- .../scala/wvlet/airframe/rx/RxRunner.scala | 10 +++- .../test/scala/wvlet/airframe/rx/RxTest.scala | 57 +++++++++++++++++++ 3 files changed, 86 insertions(+), 11 deletions(-) diff --git a/airframe-rx/src/main/scala/wvlet/airframe/rx/Rx.scala b/airframe-rx/src/main/scala/wvlet/airframe/rx/Rx.scala index 63672a6441..524ceebefb 100644 --- a/airframe-rx/src/main/scala/wvlet/airframe/rx/Rx.scala +++ b/airframe-rx/src/main/scala/wvlet/airframe/rx/Rx.scala @@ -114,9 +114,10 @@ trait Rx[+A] extends RxOps[A] { * Combine two Rx streams to form a sequence of pairs. This will emit a new pair when both of the streams are * updated. */ - def zip[B](other: Rx[B]): Rx[(A, B)] = Rx.zip(this, other) - def zip[B, C](b: Rx[B], c: Rx[C]): Rx[(A, B, C)] = Rx.zip(this, b, c) - def zip[B, C, D](b: Rx[B], c: Rx[C], d: Rx[D]): Rx[(A, B, C, D)] = Rx.zip(this, b, c, d) + def zip[B](other: Rx[B]): Rx[(A, B)] = Rx.zip(this, other) + def zip[B, C](b: Rx[B], c: Rx[C]): Rx[(A, B, C)] = Rx.zip(this, b, c) + def zip[B, C, D](b: Rx[B], c: Rx[C], d: Rx[D]): Rx[(A, B, C, D)] = Rx.zip(this, b, c, d) + def zip[B, C, D, E](b: Rx[B], c: Rx[C], d: Rx[D], e: Rx[E]): Rx[(A, B, C, D, E)] = Rx.zip(this, b, c, d, e) /** * Emit a new output if one of Rx[A] or Rx[B] is changed. @@ -126,9 +127,10 @@ trait Rx[+A] extends RxOps[A] { * Using joins will be more intuitive than nesting multiple Rx operators like Rx[A].map { x => ... Rx[B].map { ...} * }. */ - def join[B](other: Rx[B]): Rx[(A, B)] = Rx.join(this, other) - def join[B, C](b: Rx[B], c: Rx[C]): Rx[(A, B, C)] = Rx.join(this, b, c) - def join[B, C, D](b: Rx[B], c: Rx[C], d: Rx[D]): Rx[(A, B, C, D)] = Rx.join(this, b, c, d) + def join[B](other: Rx[B]): Rx[(A, B)] = Rx.join(this, other) + def join[B, C](b: Rx[B], c: Rx[C]): Rx[(A, B, C)] = Rx.join(this, b, c) + def join[B, C, D](b: Rx[B], c: Rx[C], d: Rx[D]): Rx[(A, B, C, D)] = Rx.join(this, b, c, d) + def join[B, C, D, E](b: Rx[B], c: Rx[C], d: Rx[D], e: Rx[E]): Rx[(A, B, C, D, E)] = Rx.join(this, b, c, d, e) /** * Combine Rx stream and Future operators. @@ -278,10 +280,13 @@ object Rx extends LogSupport { def join[A, B](a: Rx[A], b: Rx[B]): Rx[(A, B)] = JoinOp(a, b) def join[A, B, C](a: Rx[A], b: Rx[B], c: Rx[C]): Rx[(A, B, C)] = Join3Op(a, b, c) def join[A, B, C, D](a: Rx[A], b: Rx[B], c: Rx[C], d: Rx[D]): Rx[(A, B, C, D)] = Join4Op(a, b, c, d) + def join[A, B, C, D, E](a: Rx[A], b: Rx[B], c: Rx[C], d: Rx[D], e: Rx[E]): Rx[(A, B, C, D, E)] = + Join5Op(a, b, c, d, e) - def zip[A, B](a: Rx[A], b: Rx[B]): Rx[(A, B)] = ZipOp(a, b) - def zip[A, B, C](a: Rx[A], b: Rx[B], c: Rx[C]): Rx[(A, B, C)] = Zip3Op(a, b, c) - def zip[A, B, C, D](a: Rx[A], b: Rx[B], c: Rx[C], d: Rx[D]): Rx[(A, B, C, D)] = Zip4Op(a, b, c, d) + def zip[A, B](a: Rx[A], b: Rx[B]): Rx[(A, B)] = ZipOp(a, b) + def zip[A, B, C](a: Rx[A], b: Rx[B], c: Rx[C]): Rx[(A, B, C)] = Zip3Op(a, b, c) + def zip[A, B, C, D](a: Rx[A], b: Rx[B], c: Rx[C], d: Rx[D]): Rx[(A, B, C, D)] = Zip4Op(a, b, c, d) + def zip[A, B, C, D, E](a: Rx[A], b: Rx[B], c: Rx[C], d: Rx[D], e: Rx[E]): Rx[(A, B, C, D, E)] = Zip5Op(a, b, c, d, e) def concat[A, A1 >: A](a: Rx[A], b: Rx[A1]): Rx[A1] = ConcatOp(a, b) @@ -373,6 +378,10 @@ object Rx extends LogSupport { case class Zip4Op[A, B, C, D](a: Rx[A], b: Rx[B], c: Rx[C], d: Rx[D]) extends Rx[(A, B, C, D)] { override def parents: Seq[Rx[_]] = Seq(a, b, c, d) } + case class Zip5Op[A, B, C, D, E](a: Rx[A], b: Rx[B], c: Rx[C], d: Rx[D], e: Rx[E]) extends Rx[(A, B, C, D, E)] { + override def parents: Seq[Rx[_]] = Seq(a, b, c, d, e) + } + case class JoinOp[A, B](a: Rx[A], b: Rx[B]) extends Rx[(A, B)] { override def parents: Seq[Rx[_]] = Seq(a, b) } @@ -382,6 +391,9 @@ object Rx extends LogSupport { case class Join4Op[A, B, C, D](a: Rx[A], b: Rx[B], c: Rx[C], d: Rx[D]) extends Rx[(A, B, C, D)] { override def parents: Seq[Rx[_]] = Seq(a, b, c, d) } + case class Join5Op[A, B, C, D, E](a: Rx[A], b: Rx[B], c: Rx[C], d: Rx[D], e: Rx[E]) extends Rx[(A, B, C, D, E)] { + override def parents: Seq[Rx[_]] = Seq(a, b, c, d, e) + } case class ConcatOp[A](first: Rx[A], next: Rx[A]) extends Rx[A] { override def parents: Seq[Rx[_]] = Seq(first, next) diff --git a/airframe-rx/src/main/scala/wvlet/airframe/rx/RxRunner.scala b/airframe-rx/src/main/scala/wvlet/airframe/rx/RxRunner.scala index f46c85c500..2703391c73 100644 --- a/airframe-rx/src/main/scala/wvlet/airframe/rx/RxRunner.scala +++ b/airframe-rx/src/main/scala/wvlet/airframe/rx/RxRunner.scala @@ -381,12 +381,16 @@ class RxRunner( zip(z)(effect) case z @ Zip4Op(r1, r2, r3, r4) => zip(z)(effect) + case z @ Zip5Op(r1, r2, r3, r4, r5) => + zip(z)(effect) case j @ JoinOp(r1, r2) => join(j)(effect) case j @ Join3Op(r1, r2, r3) => join(j)(effect) case j @ Join4Op(r1, r2, r3, r4) => join(j)(effect) + case j @ Join5Op(r1, r2, r3, r4, r5) => + join(j)(effect) case RxOptionOp(in) => run(in) { case e @ OnNext(v) => @@ -541,8 +545,10 @@ class RxRunner( effect(OnNext((values(0), values(1), values(2)).asInstanceOf[A])) case 4 => effect(OnNext((values(0), values(1), values(2), values(3)).asInstanceOf[A])) - case _ => - ??? + case 5 => + effect(OnNext((values(0), values(1), values(2), values(3), values(4)).asInstanceOf[A])) + case other => + throw new NotImplementedError(s"combining 5+ more Rx operators is not yet supported: ${other}") } } toContinue diff --git a/airframe-rx/src/test/scala/wvlet/airframe/rx/RxTest.scala b/airframe-rx/src/test/scala/wvlet/airframe/rx/RxTest.scala index 44c16571c9..7193f18b80 100644 --- a/airframe-rx/src/test/scala/wvlet/airframe/rx/RxTest.scala +++ b/airframe-rx/src/test/scala/wvlet/airframe/rx/RxTest.scala @@ -260,6 +260,30 @@ object RxTest extends AirSpec { ) } + test("zip5") { + val a = Rx.variable(1) + val b = Rx.variable("a") + val c = Rx.variable(true) + val d = Rx.variable(false) + val e = Rx.variable(10) + + val x = a.zip(b, c, d, e) + val ev = Seq.newBuilder[RxEvent] + val ca = RxRunner.runContinuously(x)(ev += _) + + a := 2 + b := "b" + c := false + d := true + e := 20 + + val events = ev.result() + events shouldBe Seq( + OnNext(1, "a", true, false, 10), + OnNext(2, "b", false, true, 20) + ) + } + test("join") { val x = Rx.variable(1) val y = Rx.variable("a") @@ -341,6 +365,39 @@ object RxTest extends AirSpec { ) } + test("join5") { + val x = Rx.variable(1) + val y = Rx.variable("a") + val z = Rx.variable(true) + val w = Rx.variable(10) + val v = Rx.variable(false) + val rx = x.join(y, z, w, v) + + val b = Seq.newBuilder[RxEvent] + RxRunner.run(rx)(b += _) + + y := "b" + y := "c" + w := 20 + z := false + x := 2 + y := "d" + v := true + + val events = b.result() + debug(events) + events shouldBe Seq( + OnNext(1, "a", true, 10, false), + OnNext(1, "b", true, 10, false), + OnNext(1, "c", true, 10, false), + OnNext(1, "c", true, 20, false), + OnNext(1, "c", false, 20, false), + OnNext(2, "c", false, 20, false), + OnNext(2, "d", false, 20, false), + OnNext(2, "d", false, 20, true) + ) + } + implicit val ec: scala.concurrent.ExecutionContext = compat.defaultExecutionContext test("from Future[X]") {