Skip to content

Commit

Permalink
Allow Frameless to explode Map[A,B]
Browse files Browse the repository at this point in the history
  • Loading branch information
Ayoub Benali authored and pomadchin committed Nov 11, 2021
1 parent f1a7229 commit fa5afa8
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
21 changes: 21 additions & 0 deletions dataset/src/main/scala/frameless/TypedDataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,27 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit val
TypedDataset.create[Out](trans)
}

def explodeMap[A, B, TRep <: HList, OutMod <: HList, OutModValues <: HList, Out]
(column: Witness.Lt[Symbol])
(implicit
i0: TypedColumn.Exists[T, column.T, Map[A, B]],
i1: TypedEncoder[A],
i2: TypedEncoder[B],
i3: LabelledGeneric.Aux[T, TRep],
i4: Modifier.Aux[TRep, column.T, Map[A,B], Tuple2[A,B], OutMod],
i5: Values.Aux[OutMod, OutModValues],
i6: Tupler.Aux[OutModValues, Out],
i7: TypedEncoder[Out]
): TypedDataset[Out] = {
val df = dataset.toDF()
import org.apache.spark.sql.functions.{explode => sparkExplode}

val trans =
df.withColumn(column.value.name,
sparkExplode(df(column.value.name))).as[Out](TypedExpressionEncoder[Out])
TypedDataset.create[Out](trans)
}

/**
* Flattens a column of type Option[A]. Compiles only if the selected column is of type Option[A].
*
Expand Down
15 changes: 15 additions & 0 deletions dataset/src/test/scala/frameless/ExplodeTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,19 @@ class ExplodeTests extends TypedDatasetSuite {
check(forAll(prop[Int] _))
check(forAll(prop[String] _))
}

test("explode on Maps") {
def prop[A: TypedEncoder: ClassTag](xs: List[X1[Map[A, A]]]): Prop = {
val tds = TypedDataset.create(xs)

val framelessResults = tds.explodeMap('a).collect().run().toVector
val scalaResults = xs.flatMap(_.a.toList).map(t => Tuple2(t._1, t._2)).toVector

framelessResults ?= scalaResults
}

check(forAll(prop[Long] _))
check(forAll(prop[Int] _))
check(forAll(prop[String] _))
}
}

0 comments on commit fa5afa8

Please sign in to comment.