From 6adcab809a2f76bd6104496152b43b0f43b83cbf Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Thu, 19 Sep 2024 16:53:23 +0200 Subject: [PATCH] Consistent ordering definition --- .../values/PairSCollectionFunctions.scala | 59 +++++++++------ .../com/spotify/scio/values/SCollection.scala | 63 ++++++++-------- .../scio/values/SCollectionWithFanout.scala | 30 ++++---- .../values/SCollectionWithHotKeyFanout.scala | 32 ++++----- .../values/PairSCollectionFunctionsTest.scala | 71 +++++++++++++------ .../spotify/scio/values/SCollectionTest.scala | 67 ++++++++--------- .../values/SCollectionWithFanoutTest.scala | 42 +++++------ .../SCollectionWithHotKeyFanoutTest.scala | 40 +++++------ 8 files changed, 222 insertions(+), 182 deletions(-) diff --git a/scio-core/src/main/scala/com/spotify/scio/values/PairSCollectionFunctions.scala b/scio-core/src/main/scala/com/spotify/scio/values/PairSCollectionFunctions.scala index 92add90305..c5c32edf0a 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/PairSCollectionFunctions.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/PairSCollectionFunctions.scala @@ -35,6 +35,8 @@ import org.apache.beam.sdk.values.{KV, PCollection} import org.joda.time.{Duration, Instant} import org.slf4j.LoggerFactory +import java.lang.{Double => JDouble} + import scala.collection.compat._ private object PairSCollectionFunctions { @@ -1010,16 +1012,6 @@ class PairSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { def mapValues[U: Coder](f: V => U): SCollection[(K, U)] = self.map(kv => (kv._1, f(kv._2))) - /** - * Return the max of values for each key as defined by the implicit `Ordering[T]`. - * @return - * a new SCollection of (key, maximum value) pairs - * @group per_key - */ - // Scala lambda is simpler and more powerful than transforms.Max - def maxByKey(implicit ord: Ordering[V]): SCollection[(K, V)] = - this.reduceByKey(ord.max) - /** * Return the min of values for each key as defined by the implicit `Ordering[T]`. * @return @@ -1030,6 +1022,16 @@ class PairSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { def minByKey(implicit ord: Ordering[V]): SCollection[(K, V)] = this.reduceByKey(ord.min) + /** + * Return the max of values for each key as defined by the implicit `Ordering[T]`. + * @return + * a new SCollection of (key, maximum value) pairs + * @group per_key + */ + // Scala lambda is simpler and more powerful than transforms.Max + def maxByKey(implicit ord: Ordering[V]): SCollection[(K, V)] = + this.reduceByKey(ord.max) + /** * Return latest of values for each key according to its event time, or null if there are no * elements. @@ -1040,6 +1042,30 @@ class PairSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { def latestByKey: SCollection[(K, V)] = self.applyPerKey(Latest.perKey[K, V]())(kvToTuple) + /** + * Reduce by key with [[com.twitter.algebird.Semigroup Semigroup]]. This could be more powerful + * and better optimized than [[reduceByKey]] in some cases. + * @group per_key + */ + def sumByKey(implicit sg: Semigroup[V]): SCollection[(K, V)] = { + PairSCollectionFunctions.logger.warn( + "combineByKey/sumByKey does not support default value and may fail in some streaming " + + "scenarios. Consider aggregateByKey/foldByKey instead." + ) + this.applyPerKey(Combine.perKey(Functions.reduceFn(context, sg)))(kvToTuple) + } + + /** + * Return the mean of values for each key as defined by the implicit `Numeric[T]`. + * @return + * a new SCollection of (key, mean value) pairs + * @group per_key + */ + def meanByKey(implicit ev: Numeric[V]): SCollection[(K, Double)] = + self.transform { in => + in.mapValues[JDouble](ev.toDouble).applyPerKey(Mean.perKey[K, JDouble]())(kdToTuple) + } + /** * Merge the values for each key using an associative reduce function. This will also perform the * merging locally on each mapper before sending results to a reducer, similarly to a "combiner" @@ -1095,19 +1121,6 @@ class PairSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { } } - /** - * Reduce by key with [[com.twitter.algebird.Semigroup Semigroup]]. This could be more powerful - * and better optimized than [[reduceByKey]] in some cases. - * @group per_key - */ - def sumByKey(implicit sg: Semigroup[V]): SCollection[(K, V)] = { - PairSCollectionFunctions.logger.warn( - "combineByKey/sumByKey does not support default value and may fail in some streaming " + - "scenarios. Consider aggregateByKey/foldByKey instead." - ) - this.applyPerKey(Combine.perKey(Functions.reduceFn(context, sg)))(kvToTuple) - } - /** * Swap the keys with the values. * @group transform diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala index 2decd759ab..4955f7f1f6 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala @@ -773,6 +773,16 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { */ def map[U: Coder](f: T => U): SCollection[U] = this.parDo(Functions.mapFn(f)) + /** + * Return the min of this SCollection as defined by the implicit `Ordering[T]`. + * @return + * a new SCollection with the minimum element + * @group transform + */ + // Scala lambda is simpler and more powerful than transforms.Min + def min(implicit ord: Ordering[T]): SCollection[T] = + this.reduce(ord.min) + /** * Return the max of this SCollection as defined by the implicit `Ordering[T]`. * @return @@ -784,37 +794,39 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { this.reduce(ord.max) /** - * Return the mean of this SCollection as defined by the implicit `Numeric[T]`. + * Return the latest of this SCollection according to its event time. * @return - * a new SCollection with the mean of elements + * a new SCollection with the latest element * @group transform */ - def mean(implicit ev: Numeric[T]): SCollection[Double] = this.transform { in => - val e = ev // defeat closure - in.map(e.toDouble) - .asInstanceOf[SCollection[JDouble]] - .pApply(Mean.globally().withoutDefaults()) - .asInstanceOf[SCollection[Double]] - } + def latest: SCollection[T] = + this.withTimestamp.max(Ordering.by(_._2)).keys /** - * Return the min of this SCollection as defined by the implicit `Ordering[T]`. - * @return - * a new SCollection with the minimum element + * Reduce with [[com.twitter.algebird.Semigroup Semigroup]]. This could be more powerful and + * better optimized than [[reduce]] in some cases. * @group transform */ - // Scala lambda is simpler and more powerful than transforms.Min - def min(implicit ord: Ordering[T]): SCollection[T] = - this.reduce(ord.min) + def sum(implicit sg: Semigroup[T]): SCollection[T] = { + SCollection.logger.warn( + "combine/sum does not support default value and may fail in some streaming scenarios. " + + "Consider aggregate/fold instead." + ) + this.pApply(Combine.globally(Functions.reduceFn(context, sg)).withoutDefaults()) + } /** - * Return the latest of this SCollection according to its event time. + * Return the mean of this SCollection as defined by the implicit `Numeric[T]`. * @return - * a new SCollection with the latest element + * a new SCollection with the mean of elements * @group transform */ - def latest: SCollection[T] = - this.withTimestamp.max(Ordering.by(_._2)).keys + def mean(implicit ev: Numeric[T]): SCollection[Double] = this.transform { in => + val e = ev // defeat closure + in.map[JDouble](e.toDouble) + .pApply(Mean.globally().withoutDefaults()) + .asInstanceOf[SCollection[Double]] + } /** * Compute the SCollection's data distribution using approximate `N`-tiles. @@ -945,19 +957,6 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { _.map((_, ())).subtractByKey(that).keys } - /** - * Reduce with [[com.twitter.algebird.Semigroup Semigroup]]. This could be more powerful and - * better optimized than [[reduce]] in some cases. - * @group transform - */ - def sum(implicit sg: Semigroup[T]): SCollection[T] = { - SCollection.logger.warn( - "combine/sum does not support default value and may fail in some streaming scenarios. " + - "Consider aggregate/fold instead." - ) - this.pApply(Combine.globally(Functions.reduceFn(context, sg)).withoutDefaults()) - } - /** * Return a sampled subset of any `num` elements of the SCollection. * @group transform diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithFanout.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithFanout.scala index 5b9d0b2f9c..0e3b9b4007 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithFanout.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithFanout.scala @@ -104,6 +104,21 @@ class SCollectionWithFanout[T] private[values] (coll: SCollection[T], fanout: In Combine.globally(Functions.reduceFn(context, op)).withoutDefaults().withFanout(fanout) ) + /** [[SCollection.min]] with fan out. */ + def min(implicit ord: Ordering[T]): SCollection[T] = + this.reduce(ord.min) + + /** [[SCollection.max]] with fan out. */ + def max(implicit ord: Ordering[T]): SCollection[T] = + this.reduce(ord.max) + + /** [[SCollection.latest]] with fan out. */ + def latest: SCollection[T] = { + coll.transform { in => + new SCollectionWithFanout(in.withTimestamp, this.fanout).max(Ordering.by(_._2)).keys + } + } + /** [[SCollection.sum]] with fan out. */ def sum(implicit sg: Semigroup[T]): SCollection[T] = { SCollection.logger.warn( @@ -115,14 +130,6 @@ class SCollectionWithFanout[T] private[values] (coll: SCollection[T], fanout: In ) } - /** [[SCollection.min]] with fan out. */ - def min(implicit ord: Ordering[T]): SCollection[T] = - this.reduce(ord.min) - - /** [[SCollection.max]] with fan out. */ - def max(implicit ord: Ordering[T]): SCollection[T] = - this.reduce(ord.max) - /** [[SCollection.mean]] with fan out. */ def mean(implicit ev: Numeric[T]): SCollection[Double] = { val e = ev // defeat closure @@ -133,13 +140,6 @@ class SCollectionWithFanout[T] private[values] (coll: SCollection[T], fanout: In } } - /** [[SCollection.latest]] with fan out. */ - def latest: SCollection[T] = { - coll.transform { in => - new SCollectionWithFanout(in.withTimestamp, this.fanout).max(Ordering.by(_._2)).keys - } - } - /** [[SCollection.top]] with fan out. */ def top(num: Int)(implicit ord: Ordering[T]): SCollection[Iterable[T]] = { coll.transform { in => diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithHotKeyFanout.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithHotKeyFanout.scala index c177557523..116d0f5ca8 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithHotKeyFanout.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollectionWithHotKeyFanout.scala @@ -135,15 +135,6 @@ class SCollectionWithHotKeyFanout[K, V] private[values] ( def reduceByKey(op: (V, V) => V): SCollection[(K, V)] = self.applyPerKey(withFanout(Combine.perKey(Functions.reduceFn(context, op))))(kvToTuple) - /** [[PairSCollectionFunctions.sumByKey]] with hot key fanout. */ - def sumByKey(implicit sg: Semigroup[V]): SCollection[(K, V)] = { - SCollection.logger.warn( - "combineByKey/sumByKey does not support default value and may fail in some streaming " + - "scenarios. Consider aggregateByKey/foldByKey instead." - ) - self.applyPerKey(withFanout(Combine.perKey(Functions.reduceFn(context, sg))))(kvToTuple) - } - /** [[SCollection.min]] with hot key fan out. */ def minByKey(implicit ord: Ordering[V]): SCollection[(K, V)] = self.reduceByKey(ord.min) @@ -152,6 +143,15 @@ class SCollectionWithHotKeyFanout[K, V] private[values] ( def maxByKey(implicit ord: Ordering[V]): SCollection[(K, V)] = self.reduceByKey(ord.max) + /** [[SCollection.latest]] with hot key fan out. */ + def latestByKey: SCollection[(K, V)] = { + self.self.transform { in => + new SCollectionWithHotKeyFanout(in.withTimestampedValues, this.hotKeyFanout) + .maxByKey(Ordering.by(_._2)) + .mapValues(_._1) + } + } + /** [[SCollection.mean]] with hot key fan out. */ def meanByKey(implicit ev: Numeric[V]): SCollection[(K, Double)] = { val e = ev // defeat closure @@ -160,13 +160,13 @@ class SCollectionWithHotKeyFanout[K, V] private[values] ( } } - /** [[SCollection.latest]] with hot key fan out. */ - def latestByKey: SCollection[(K, V)] = { - self.self.transform { in => - new SCollectionWithHotKeyFanout(in.withTimestampedValues, this.hotKeyFanout) - .maxByKey(Ordering.by(_._2)) - .mapValues(_._1) - } + /** [[PairSCollectionFunctions.sumByKey]] with hot key fanout. */ + def sumByKey(implicit sg: Semigroup[V]): SCollection[(K, V)] = { + SCollection.logger.warn( + "combineByKey/sumByKey does not support default value and may fail in some streaming " + + "scenarios. Consider aggregateByKey/foldByKey instead." + ) + self.applyPerKey(withFanout(Combine.perKey(Functions.reduceFn(context, sg))))(kvToTuple) } /** [[PairSCollectionFunctions.topByKey]] with hot key fanout. */ diff --git a/scio-core/src/test/scala/com/spotify/scio/values/PairSCollectionFunctionsTest.scala b/scio-core/src/test/scala/com/spotify/scio/values/PairSCollectionFunctionsTest.scala index 7a15632217..6764c16655 100644 --- a/scio-core/src/test/scala/com/spotify/scio/values/PairSCollectionFunctionsTest.scala +++ b/scio-core/src/test/scala/com/spotify/scio/values/PairSCollectionFunctionsTest.scala @@ -713,30 +713,66 @@ class PairSCollectionFunctionsTest extends PipelineSpec { } } - it should "support maxByKey()" in { + it should "support minByKey()" in { runWithContext { sc => - val p = - sc.parallelize(Seq(("a", 1), ("a", 10), ("b", 2), ("b", 20))).maxByKey - p should containInAnyOrder(Seq(("a", 10), ("b", 20))) + def minByKey(elems: (String, Int)*): SCollection[(String, Int)] = + sc.parallelize(elems).minByKey + + minByKey() should beEmpty + minByKey(("a", 1), ("a", 10), ("b", 2), ("b", 20)) should containInAnyOrder( + Seq(("a", 1), ("b", 2)) + ) } } - it should "support minByKey()" in { + it should "support maxByKey()" in { runWithContext { sc => - val p = - sc.parallelize(Seq(("a", 1), ("a", 10), ("b", 2), ("b", 20))).minByKey - p should containInAnyOrder(Seq(("a", 1), ("b", 2))) + def maxByKey(elems: (String, Int)*): SCollection[(String, Int)] = + sc.parallelize(elems).maxByKey + + maxByKey() should beEmpty + maxByKey(("a", 1), ("a", 10), ("b", 2), ("b", 20)) should containInAnyOrder( + Seq(("a", 10), ("b", 20)) + ) } } it should "support latestByKey()" in { runWithContext { sc => - val p = sc - .parallelize(Seq(("a", 1L), ("a", 10L), ("b", 2L), ("b", 20L))) - .timestampBy { case (_, v) => Instant.ofEpochMilli(v) } - .latestByKey + def latestByKey(elems: (String, Int)*): SCollection[(String, Int)] = + sc + .parallelize(elems) + .timestampBy { case (_, v) => Instant.ofEpochMilli(v.toLong) } + .latestByKey + + latestByKey() should beEmpty + latestByKey(("a", 1), ("a", 10), ("b", 2), ("b", 20)) should containInAnyOrder( + Seq(("a", 10), ("b", 20)) + ) + } + } + + it should "support sumByKey" in { + runWithContext { sc => + def sumByKey(elems: (String, Int)*): SCollection[(String, Int)] = + sc.parallelize(elems).sumByKey - p should containInAnyOrder(Seq(("a", 10L), ("b", 20L))) + sumByKey() should beEmpty + sumByKey( + Seq(("a", 1), ("b", 2), ("b", 2)) ++ (1 to 100).map(("c", _)): _* + ) should containInAnyOrder(Seq(("a", 1), ("b", 4), ("c", 5050))) + } + } + + it should "support meanByKey" in { + runWithContext { sc => + def meanByKey(elems: (String, Int)*): SCollection[(String, Double)] = + sc.parallelize(elems).meanByKey + + meanByKey() should beEmpty + meanByKey( + Seq(("a", 1), ("b", 2), ("b", 3)) ++ (0 to 100).map(("c", _)): _* + ) should containInAnyOrder(Seq(("a", 1.0), ("b", 2.5), ("c", 50.0))) } } @@ -820,15 +856,6 @@ class PairSCollectionFunctionsTest extends PipelineSpec { } } - it should "support sumByKey()" in { - runWithContext { sc => - val p = sc - .parallelize(List(("a", 1), ("b", 2), ("b", 2)) ++ (1 to 100).map(("c", _))) - .sumByKey - p should containInAnyOrder(Seq(("a", 1), ("b", 4), ("c", 5050))) - } - } - it should "support swap()" in { runWithContext { sc => val p = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))).swap diff --git a/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala b/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala index cd1cf8ffcf..3f1c082869 100644 --- a/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala +++ b/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala @@ -494,6 +494,18 @@ class SCollectionTest extends PipelineSpec { } } + it should "support min" in { + runWithContext { sc => + def min[T: Coder: Numeric](elems: T*): SCollection[T] = + sc.parallelize(elems).min + min[Int]() should beEmpty + min(1, 2, 3) should containSingleValue(1) + min(1L, 2L, 3L) should containSingleValue(1L) + min(1f, 2f, 3f) should containSingleValue(1f) + min(1.0, 2.0, 3.0) should containSingleValue(1.0) + } + } + it should "support max" in { runWithContext { sc => def max[T: Coder: Numeric](elems: T*): SCollection[T] = @@ -506,36 +518,37 @@ class SCollectionTest extends PipelineSpec { } } - it should "support mean" in { + it should "support latest" in { runWithContext { sc => - def mean[T: Coder: Numeric](elems: T*): SCollection[Double] = - sc.parallelize(elems).mean - mean[Int]() should beEmpty - mean(1, 2, 3) should containSingleValue(2.0) - mean(1L, 2L, 3L) should containSingleValue(2.0) - mean(1f, 2f, 3f) should containSingleValue(2.0) - mean(1.0, 2.0, 3.0) should containSingleValue(2.0) + def latest(elems: Long*): SCollection[Long] = + sc.parallelize(elems).timestampBy(Instant.ofEpochMilli).latest + latest() should beEmpty + latest(1L, 2L, 3L) should containSingleValue(3L) } } - it should "support min" in { + it should "support sum" in { runWithContext { sc => - def min[T: Coder: Numeric](elems: T*): SCollection[T] = - sc.parallelize(elems).min - min[Int]() should beEmpty - min(1, 2, 3) should containSingleValue(1) - min(1L, 2L, 3L) should containSingleValue(1L) - min(1f, 2f, 3f) should containSingleValue(1f) - min(1.0, 2.0, 3.0) should containSingleValue(1.0) + def sum[T: Coder: Semigroup](elems: T*): SCollection[T] = + sc.parallelize(elems).sum + sum[Int]() should beEmpty + sum(1, 2, 3) should containSingleValue(6) + sum(1L, 2L, 3L) should containSingleValue(6L) + sum(1f, 2f, 3f) should containSingleValue(6f) + sum(1.0, 2.0, 3.0) should containSingleValue(6.0) + sum(1 to 100: _*) should containSingleValue(5050) } } - it should "support latest" in { + it should "support mean" in { runWithContext { sc => - def latest(elems: Long*): SCollection[Long] = - sc.parallelize(elems).timestampBy(Instant.ofEpochMilli).latest - latest() should beEmpty - latest(1L, 2L, 3L) should containSingleValue(3L) + def mean[T: Coder: Numeric](elems: T*): SCollection[Double] = + sc.parallelize(elems).mean + mean[Int]() should beEmpty + mean(1, 2, 3) should containSingleValue(2.0) + mean(1L, 2L, 3L) should containSingleValue(2.0) + mean(1f, 2f, 3f) should containSingleValue(2.0) + mean(1.0, 2.0, 3.0) should containSingleValue(2.0) } } @@ -595,18 +608,6 @@ class SCollectionTest extends PipelineSpec { } } - it should "support sum" in { - runWithContext { sc => - def sum[T: Coder: Semigroup](elems: T*): SCollection[T] = - sc.parallelize(elems).sum - sum(1, 2, 3) should containSingleValue(6) - sum(1L, 2L, 3L) should containSingleValue(6L) - sum(1f, 2f, 3f) should containSingleValue(6f) - sum(1.0, 2.0, 3.0) should containSingleValue(6.0) - sum(1 to 100: _*) should containSingleValue(5050) - } - } - it should "support take()" in { runWithContext { sc => val p = sc.parallelize(Seq(1, 2, 3, 4, 5)) diff --git a/scio-core/src/test/scala/com/spotify/scio/values/SCollectionWithFanoutTest.scala b/scio-core/src/test/scala/com/spotify/scio/values/SCollectionWithFanoutTest.scala index 862ca33cfb..64d703cb7f 100644 --- a/scio-core/src/test/scala/com/spotify/scio/values/SCollectionWithFanoutTest.scala +++ b/scio-core/src/test/scala/com/spotify/scio/values/SCollectionWithFanoutTest.scala @@ -61,18 +61,6 @@ class SCollectionWithFanoutTest extends NamedTransformSpec { } } - it should "support sum" in { - runWithContext { sc => - def sum[T: Coder: Semigroup](elems: T*): SCollection[T] = - sc.parallelize(elems).withFanout(10).sum - sum(1, 2, 3) should containSingleValue(6) - sum(1L, 2L, 3L) should containSingleValue(6L) - sum(1f, 2f, 3f) should containSingleValue(6f) - sum(1.0, 2.0, 3.0) should containSingleValue(6.0) - sum(1 to 100: _*) should containSingleValue(5050) - } - } - it should "support min" in { runWithContext { sc => def min[T: Coder: Ordering](elems: T*): SCollection[T] = @@ -97,6 +85,27 @@ class SCollectionWithFanoutTest extends NamedTransformSpec { } } + it should "support latest" in { + runWithContext { sc => + def latest(elems: Long*): SCollection[Long] = + sc.parallelize(elems).timestampBy(Instant.ofEpochMilli).withFanout(10).latest + latest(1L, 2L, 3L) should containSingleValue(3L) + latest(1L to 100L: _*) should containSingleValue(100L) + } + } + + it should "support sum" in { + runWithContext { sc => + def sum[T: Coder: Semigroup](elems: T*): SCollection[T] = + sc.parallelize(elems).withFanout(10).sum + sum(1, 2, 3) should containSingleValue(6) + sum(1L, 2L, 3L) should containSingleValue(6L) + sum(1f, 2f, 3f) should containSingleValue(6f) + sum(1.0, 2.0, 3.0) should containSingleValue(6.0) + sum(1 to 100: _*) should containSingleValue(5050) + } + } + it should "support mean" in { runWithContext { sc => def mean[T: Coder: Numeric](elems: T*): SCollection[Double] = @@ -109,15 +118,6 @@ class SCollectionWithFanoutTest extends NamedTransformSpec { } } - it should "support latest" in { - runWithContext { sc => - def latest(elems: Long*): SCollection[Long] = - sc.parallelize(elems).timestampBy(Instant.ofEpochMilli).withFanout(10).latest - latest(1L, 2L, 3L) should containSingleValue(3L) - latest(1L to 100L: _*) should containSingleValue(100L) - } - } - it should "support top()" in { runWithContext { sc => def top3[T: Ordering: Coder](elems: T*): SCollection[Iterable[T]] = diff --git a/scio-core/src/test/scala/com/spotify/scio/values/SCollectionWithHotKeyFanoutTest.scala b/scio-core/src/test/scala/com/spotify/scio/values/SCollectionWithHotKeyFanoutTest.scala index f253939dd6..6c5eabb242 100644 --- a/scio-core/src/test/scala/com/spotify/scio/values/SCollectionWithHotKeyFanoutTest.scala +++ b/scio-core/src/test/scala/com/spotify/scio/values/SCollectionWithHotKeyFanoutTest.scala @@ -84,16 +84,6 @@ class SCollectionWithHotKeyFanoutTest extends NamedTransformSpec { } } - it should "support sumByKey" in { - runWithContext { sc => - val p = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2)) ++ (1 to 100).map(("c", _))) - val r1 = p.withHotKeyFanout(10).sumByKey - val r2 = p.withHotKeyFanout(_.hashCode).sumByKey - r1 should containInAnyOrder(Seq(("a", 1), ("b", 4), ("c", 5050))) - r2 should containInAnyOrder(Seq(("a", 1), ("b", 4), ("c", 5050))) - } - } - it should "support minByKey" in { runWithContext { sc => val p = sc.parallelize(List(("a", 1), ("b", 2), ("b", 3)) ++ (1 to 100).map(("c", _))) @@ -114,16 +104,6 @@ class SCollectionWithHotKeyFanoutTest extends NamedTransformSpec { } } - it should "support meanByKey" in { - runWithContext { sc => - val p = sc.parallelize(List(("a", 1), ("b", 2), ("b", 3)) ++ (0 to 100).map(("c", _))) - val r1 = p.withHotKeyFanout(10).meanByKey - val r2 = p.withHotKeyFanout(_.hashCode).meanByKey - r1 should containInAnyOrder(Seq(("a", 1.0), ("b", 2.5), ("c", 50.0))) - r2 should containInAnyOrder(Seq(("a", 1.0), ("b", 2.5), ("c", 50.0))) - } - } - it should "support latestByKey" in { runWithContext { sc => val p = sc @@ -136,6 +116,26 @@ class SCollectionWithHotKeyFanoutTest extends NamedTransformSpec { } } + it should "support sumByKey" in { + runWithContext { sc => + val p = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2)) ++ (1 to 100).map(("c", _))) + val r1 = p.withHotKeyFanout(10).sumByKey + val r2 = p.withHotKeyFanout(_.hashCode).sumByKey + r1 should containInAnyOrder(Seq(("a", 1), ("b", 4), ("c", 5050))) + r2 should containInAnyOrder(Seq(("a", 1), ("b", 4), ("c", 5050))) + } + } + + it should "support meanByKey" in { + runWithContext { sc => + val p = sc.parallelize(List(("a", 1), ("b", 2), ("b", 3)) ++ (0 to 100).map(("c", _))) + val r1 = p.withHotKeyFanout(10).meanByKey + val r2 = p.withHotKeyFanout(_.hashCode).meanByKey + r1 should containInAnyOrder(Seq(("a", 1.0), ("b", 2.5), ("c", 50.0))) + r2 should containInAnyOrder(Seq(("a", 1.0), ("b", 2.5), ("c", 50.0))) + } + } + it should "support topByKey()" in { runWithContext { sc => val p = sc.parallelize(