diff --git a/README.md b/README.md
index 3d19217..e8dc52c 100644
--- a/README.md
+++ b/README.md
@@ -61,7 +61,7 @@ val cp = CP(tensor, rank)
 	.withMaxIterations(50) // the maximum number of iterations (default 25)
 	.withMinFms(0.95) // The Factor Match Score threshold used to stop the iterations (default 0.99)
 	.withNorm(Norms.L1) // The norm to use on the factor matrices (default L2)
-	.withInitializer(Initializers.hosvd) // The method used to initialize the factor matrices (default gaussian)
+	.withInitializer(ALS.Initializers.hosvd) // The method used to initialize the factor matrices (default gaussian)
 	.withComputeCorcondia(true) // To decide if CORCONDIA must be computed on the result (default false)
 ```
 CORCONDIA is the [core consistency diagnostic](https://analyticalsciencejournals.onlinelibrary.wiley.com/doi/pdf/10.1002/cem.801), and can be used to evaluate the quality of the decomposition.
@@ -93,7 +93,7 @@ Some optional parameters are available:
 val tucker = HOOI(tensor, ranks)
 	.withMaxIterations(50) // the maximum number of iterations (default 25)
 	.withMinFrobenius(10E-6) // The Frobenius threshold used to stop the iterations (default 10E-5)
-	.withInitializer(Initializers.hosvd) // The method used to initialize the factor matrices (default hosvd)
+	.withInitializer(HOOI.Initializers.hosvd) // The method used to initialize the factor matrices (default hosvd)
 ```
 
 Once the decomposition is set, it is run with:
diff --git a/distributed/src/main/scala/mulot/distributed/tensordecomposition/cp/ALS.scala b/distributed/src/main/scala/mulot/distributed/tensordecomposition/cp/ALS.scala
index 35f9569..2b94854 100644
--- a/distributed/src/main/scala/mulot/distributed/tensordecomposition/cp/ALS.scala
+++ b/distributed/src/main/scala/mulot/distributed/tensordecomposition/cp/ALS.scala
@@ -12,28 +12,28 @@ object ALS {
 	def apply(tensor: Tensor, rank: Int)(implicit spark: SparkSession): ALS = {
 		new ALS(tensor, rank)(spark)
 	}
-}
-
-class ALS private(val tensor: Tensor, val rank: Int)(implicit spark: SparkSession)
-	extends cp.ALS[Tensor, ExtendedBlockMatrix, Map[String, DataFrame]]
-	with Logging {
 
 	object Initializers {
-		def gaussian(tensor: Tensor, rank: Int): Array[ExtendedBlockMatrix] = {
+		def gaussian(tensor: Tensor, rank: Int)(implicit spark: SparkSession): Array[ExtendedBlockMatrix] = {
 			(for (i <- 1 until tensor.order) yield {
 				ExtendedBlockMatrix.gaussian(tensor.dimensionsSize(i), rank)
 			}).toArray
 		}
 
-		def hosvd(tensor: Tensor, rank: Int): Array[ExtendedBlockMatrix] = {
+		def hosvd(tensor: Tensor, rank: Int)(implicit spark: SparkSession): Array[ExtendedBlockMatrix] = {
 			(for (i <- 1 until tensor.order) yield {
 				ExtendedBlockMatrix.hosvd(tensor, i, rank)
 			}).toArray
 		}
 	}
+}
+
+class ALS private(val tensor: Tensor, val rank: Int)(implicit spark: SparkSession)
+	extends cp.ALS[Tensor, ExtendedBlockMatrix, Map[String, DataFrame]]
+	with Logging {
 
 	protected var highRank: Option[Boolean] = None
-	override var initializer: (Tensor, Int) => Array[ExtendedBlockMatrix] = Initializers.gaussian
+	override var initializer: (Tensor, Int) => Array[ExtendedBlockMatrix] = ALS.Initializers.gaussian
 
 	override protected def copy(): ALS = {
 		val newObject = new ALS(tensor, rank)
diff --git a/distributed/src/main/scala/mulot/distributed/tensordecomposition/tucker/HOOI.scala b/distributed/src/main/scala/mulot/distributed/tensordecomposition/tucker/HOOI.scala
index 9773570..fd5ac6a 100644
--- a/distributed/src/main/scala/mulot/distributed/tensordecomposition/tucker/HOOI.scala
+++ b/distributed/src/main/scala/mulot/distributed/tensordecomposition/tucker/HOOI.scala
@@ -8,29 +8,38 @@ import scribe.Logging
 
 object HOOI {
 	def apply(tensor: Tensor, ranks: Array[Int])(implicit spark: SparkSession): HOOI = {
-		new HOOI(tensor, ranks)(spark)
+		val columnsName = (for (i <- 0 until tensor.order) yield s"row_$i") :+ tensor.valueColumnName
+		val newTensor = new Tensor(
+			tensor.data.select(columnsName(0), columnsName.tail: _*).cache(),
+			tensor.order,
+			tensor.dimensionsSize,
+			tensor.dimensionsName,
+			tensor.dimensionsIndex,
+			tensor.valueColumnName
+		)
+		new HOOI(newTensor, ranks)(spark)
 	}
-}
-
-class HOOI private[tucker](val tensor: Tensor, val ranks: Array[Int])(implicit spark: SparkSession)
-	extends tucker.HOOI[Tensor, ExtendedIndexedRowMatrix, Map[String, DataFrame]]
-	with Logging {
 
 	object Initializers {
-		def gaussian(tensor: Tensor, ranks: Array[Int]): Array[ExtendedIndexedRowMatrix] = {
+		def gaussian(tensor: Tensor, ranks: Array[Int])(implicit spark: SparkSession): Array[ExtendedIndexedRowMatrix] = {
 			(for (i <- 0 until tensor.order) yield {
 				ExtendedIndexedRowMatrix.gaussian(tensor.dimensionsSize(i), ranks(i))
 			}).toArray
 		}
 
-		def hosvd(tensor: Tensor, ranks: Array[Int]): Array[ExtendedIndexedRowMatrix] = {
+		def hosvd(tensor: Tensor, ranks: Array[Int])(implicit spark: SparkSession): Array[ExtendedIndexedRowMatrix] = {
 			(for (i <- 0 until tensor.order) yield {
 				ExtendedIndexedRowMatrix.fromIndexedRowMatrix(tensor.matricization(i, true)).VofSVD(ranks(i))
 			}).toArray
 		}
 	}
+}
+
+class HOOI private[tucker](val tensor: Tensor, val ranks: Array[Int])(implicit spark: SparkSession)
+	extends tucker.HOOI[Tensor, ExtendedIndexedRowMatrix, Map[String, DataFrame]]
+	with Logging {
 
-	override var initializer: (Tensor, Array[Int]) => Array[ExtendedIndexedRowMatrix] = Initializers.hosvd
+	override var initializer: (Tensor, Array[Int]) => Array[ExtendedIndexedRowMatrix] = HOOI.Initializers.hosvd
 
 	override protected def copy(): HOOI = {
 		val newObject = new HOOI(tensor, ranks)
@@ -80,6 +89,7 @@ class HOOI private[tucker](val tensor: Tensor, val ranks: Array[Int])(implicit s
 			// Compute the new factor matrices
 			for (dimensionIndice <- dimensionsOrder.indices) {
 				val dimension = dimensionsOrder(dimensionIndice)
+
 				// Prepare the core tensor for the iteration
 				var coreTensor = new Tensor(
 					previousCoreTensor.data.cache(),
@@ -141,6 +151,19 @@ class HOOI private[tucker](val tensor: Tensor, val ranks: Array[Int])(implicit s
 			}
 		}
 
+		var finalData = finalCoreTensor.data
+		for (dimension <- 0 until finalCoreTensor.order) {
+			finalData = finalData.withColumnRenamed(s"row_$dimension", finalCoreTensor.dimensionsName(dimension))
+		}
+		finalCoreTensor = new Tensor(
+			finalData,
+			tensor.order,
+			tensor.dimensionsSize,
+			tensor.dimensionsName,
+			tensor.dimensionsIndex,
+			tensor.valueColumnName
+		)
+
 		HOOIResult(factorMatrices, finalCoreTensor)
 	}
 }
diff --git a/distributed/src/test/scala/mulot/distributed/tensordecomposition/tucker/HOOITest.scala b/distributed/src/test/scala/mulot/distributed/tensordecomposition/tucker/HOOITest.scala
index b984c3e..93b110f 100644
--- a/distributed/src/test/scala/mulot/distributed/tensordecomposition/tucker/HOOITest.scala
+++ b/distributed/src/test/scala/mulot/distributed/tensordecomposition/tucker/HOOITest.scala
@@ -22,7 +22,7 @@ class HOOITest extends FunSuite {
 		val ranks = Array(3, 3, 3)
 		val valueColumnName = "val"
 		val tensor = Tensor.fromIndexedDataFrame(
-			data.toDF("d0", "d1", "d2", valueColumnName),
+			data.toDF("d0", "d1", "d2", valueColumnName).select(valueColumnName, "d0", "d1", "d2"),
 			sizes,
 			valueColumnName = valueColumnName)
 		println("Tensor created")