Add CP decomposition
Annabelle Gillet committed Apr 27, 2021
1 parent 95b1dca commit 97b61a3
Showing 22 changed files with 112,979 additions and 1,514 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -14,3 +14,4 @@ build
.settings
.worksheet

.idea
88 changes: 68 additions & 20 deletions README.md
@@ -28,34 +28,52 @@ Third, TDM library connects to multiple data sources using a polystore architecture


# How to use TDM library
* Put the jar in the `lib` directory, at the root of your Scala project, and import the Spark dependencies. For sbt:
```sbt
val sparkVersion = "3.0.1"

// Spark
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-mllib" % sparkVersion
```
* Import the TDM functionalities with
```scala
import tdm._
import tdm.core._
```
* Create a Spark session
```scala
import org.apache.spark.sql.SparkSession

implicit val spark = SparkSession.builder().master("local[*]").getOrCreate()
```
* Create the tensor dimensions
```scala
object User extends TensorDimension[String]
object Hashtag extends TensorDimension[String]
object Time extends TensorDimension[Long]
```
* Create an empty tensor
```scala
val tensor = TensorBuilder[Int]
.addDimension(User)
.addDimension(Hashtag)
.addDimension(Time)
.build()
```
or from a Spark DataFrame
```scala
val df: DataFrame = ...

val tensor = TensorBuilder[Int](df)
.addDimension(User, "user")
.addDimension(Hashtag, "hashtag")
.addDimension(Time, "time")
.build("value")
```
or from a data source
```scala
val query = """
SELECT user_screen_name AS user, ht.hashtag AS hashtag, published_hour AS time, COUNT(*) AS value
FROM tweet t
@@ -70,30 +88,48 @@ val tensor = TensorBuilder[Int](connection)
```

* Manually add values to tensor
```scala
tensor.addValue(User.value("u1"), Hashtag.value("ht1"), Time.value(1))(11)
```

# Accessing elements of a tensor
Elements of a tensor can be accessed by first collecting the tensor
```scala
val collectedTensor = tensor.collect()
```
then, the value of the tensor or of a given dimension can be retrieved
```scala
collectedTensor(0) // Get the value of the tensor
collectedTensor(User, 0) // Get the value of the dimension User
```
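For instance, a minimal sketch that walks over every non-zero element of the collected tensor, using the `indices` range and the accessors above:
```scala
// Print each non-zero element with its User coordinate
for (i <- collectedTensor.indices) {
  val user = collectedTensor(User, i)  // coordinate on the User dimension
  val value = collectedTensor(i)       // tensor value at index i
  println(s"$user -> $value")
}
```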

The values of the tensor can be ordered in ascending or descending order
```scala
val ascCollectedTensor = collectedTensor.orderByValues()
val descCollectedTensor = collectedTensor.orderByValuesDesc()
```

# Available operators
## Data manipulation
All operators produce a new tensor and do not modify the tensors on which they are applied.

* Projection: remove the specified dimension, and keep only the tensor's elements that match the given dimension value
```scala
tensor.projection(User)("u1")
```

* Restriction: keep the tensor's elements whose dimension values match the given condition(s)
```scala
tensor.restriction(User.condition(v => v == "u1"), Hashtag.condition(v => v == "ht1"))
```

* Selection: keep the tensor's elements whose value matches the given condition
```scala
tensor.selection(v => v > 10)
```

* Union: keep all the elements of the two tensors, and apply the given function to the common elements
```scala
val tensor2 = TensorBuilder[Int]()
.addDimension(User)
.addDimension(Hashtag)
@@ -103,7 +139,7 @@ tensor.union(tensor2)((v1, v2) => v1 + v2)
```

* Intersection: keep only the elements common to the two tensors, and apply the given function to each pair of values
```scala
val tensor2 = TensorBuilder[Int]()
.addDimension(User)
.addDimension(Hashtag)
@@ -112,17 +148,17 @@ val tensor2 = TensorBuilder[Int]()
tensor.intersection(tensor2)((v1, v2) => v1 + v2)
```

* Natural join: join two tensors on their common dimension(s), keep the elements whose values on the common dimension(s) are present in both tensors, and apply the given function to each pair of values
```scala
object Email extends TensorDimension[String]
val tensor3 = TensorBuilder[Int]()
.addDimension(User)
.addDimension(Email)
.build()
tensor.naturalJoin(tensor3)((v1, v2) => v1 + v2)
```
* Difference: remove the elements from the first tensor that are also present in the second tensor
```scala
val tensor2 = TensorBuilder[Int]()
.addDimension(User)
.addDimension(Hashtag)
Expand All @@ -131,12 +167,24 @@ val tensor2 = TensorBuilder[Int]()
tensor.difference(tensor2)
```
* Rename a dimension: replace the phantom type of a dimension with another
```scala
object UserName extends TensorDimension[String]
tensor.withDimensionRenamed(User, UserName)
```

## Decompositions
### Canonical polyadic (or CANDECOMP/PARAFAC)
Perform the canonical polyadic decomposition for a given rank. Some optional parameters are also available (see the sketch after the example below):
* nbIterations: the maximum number of iterations
* norm: the norm to apply to the columns of the factor matrices; the L1 and L2 norms are available
* minFms: the Factor Match Score threshold at which to stop the algorithm. To remain efficient even at large scale, the decomposition checks convergence by measuring the difference between the factor matrices of two successive iterations; minFms sets this threshold
* highRank: improves the computation of the pseudo-inverse when set to true. By default, it is true when rank >= 100
* computeCorcondia: set to true to compute the core consistency diagnostic ([CORCONDIA](https://analyticalsciencejournals.onlinelibrary.wiley.com/doi/pdf/10.1002/cem.801))
```scala
tensor.canonicalPolyadicDecomposition(3)
```
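A minimal sketch using the optional parameters and the resulting `KruskalTensor`, assuming the options listed above are exposed as named parameters of `canonicalPolyadicDecomposition` (the `extract` method, the `Rank` dimension and the `corcondia` field come with the result, as shown in `KruskalTensor.scala` below):
```scala
// Assumption: the optional parameters are named parameters of the method
val kruskal = tensor.canonicalPolyadicDecomposition(3,
  nbIterations = 25,
  minFms = 0.99,
  computeCorcondia = true)

// Extract the factor matrix of a dimension as a 2-order tensor
// with dimensions User and Rank
val userFactors = kruskal.extract(User)

// The core consistency diagnostic, if it was computed
kruskal.corcondia.foreach(c => println(s"CORCONDIA: $c"))
```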

# Roadmap
* Use TDM Library as a backend for machine learning libraries such as TensorFlow or PyTorch.
* Develop and optimize tensor operators and algebraic expressions over tensors.
* Develop other decompositions.
* Provide analytics methods based on tensor operations (community detection, centrality, etc.).

19 changes: 13 additions & 6 deletions build.sbt
@@ -1,19 +1,21 @@
import Dependencies._

ThisBuild / scalaVersion := "2.12.8"
ThisBuild / version := "0.2.0"
ThisBuild / version := "0.3.0"
ThisBuild / organization := "tdm"
ThisBuild / organizationName := "tdm"

val sparkVersion = "2.4.4"
val sparkVersion = "3.0.1"

lazy val root = (project in file("."))
.settings(
name := "TDM",
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.8" % Test
)

test in assembly := {}
assemblyExcludedJars in assembly := {
val cp = (fullClasspath in assembly).value
cp filter {_.data.getName != "mulot_2.12-0.3.jar"}
}

// Shapeless
resolvers ++= Seq(
@@ -29,9 +31,14 @@ libraryDependencies += "com.chuusai" %% "shapeless" % "2.3.3"
libraryDependencies += "ru.yandex.qatools.embed" % "postgresql-embedded" % "2.10" % Test
libraryDependencies += "org.postgresql" % "postgresql" % "42.2.5" % Test

// Breeze
libraryDependencies += "org.scalanlp" %% "breeze" % "1.1"
libraryDependencies += "org.scalanlp" %% "breeze-natives" % "1.1"

// Spark
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided"

// Coveralls

Binary file added lib/mulot_2.12-0.3.jar
77 changes: 77 additions & 0 deletions src/main/scala/tdm/core/CollectedTensor.scala
@@ -0,0 +1,77 @@
package tdm.core

import org.apache.spark.sql.Row
import shapeless.{HList, HMap}
import tdm._

class CollectedTensor[T, DL <: HList] private[core]
(val typeList: List[TensorDimension[_]], val dimensions: HMap[DimensionMap], val data: Array[Row])
(implicit tensorTypeAuthorized: Numeric[T]) {
private var cachedRow: Option[Row] = None
private var cachedIndex: Option[Int] = None

/**
* Number of non-zero elements of this tensor.
*/
val size = data.length

/**
* Produce the range of all indices of the non-zero elements of this tensor.
*
* @return a [[Range]] value from 0 to one less than the number of non-zero elements of this tensor.
*/
def indices: Range = data.indices

/**
* Get the value of the tensor at the index i.
*
* @param i the index of the element
* @return the value of the tensor at index i
*/
def apply(i: Int): T = {
val row = getRow(i)
row.getAs[T](row.fieldIndex(Tensor.TENSOR_VALUES_COLUMN_NAME))
}

/**
* Get the value of the given dimension at the index i.
*
* @param dimension the dimension from which to get the value
* @param i the index of the element
* @return the value of the given dimension at index i
*/
def apply[CT, D <: TensorDimension[_]](dimension: D, i: Int)
(implicit eq: dimension.DimensionType =:= CT,
contains: ContainsConstraint[DL, D]): CT = {
val row = getRow(i)
row.getAs[CT](row.fieldIndex(dimension.name))
}

/**
* Sort the values by ascending order.
*
* @return the sorted [[CollectedTensor]]
*/
def orderByValues(): CollectedTensor[T, DL] = {
new CollectedTensor[T, DL](typeList, dimensions,
data.sortWith((r1, r2) => tensorTypeAuthorized.lteq(r1.getAs[T](r1.fieldIndex(Tensor.TENSOR_VALUES_COLUMN_NAME)), r2.getAs[T](r2.fieldIndex(Tensor.TENSOR_VALUES_COLUMN_NAME)))))
}

/**
* Sort the values by descending order.
*
* @return the sorted [[CollectedTensor]]
*/
def orderByValuesDesc(): CollectedTensor[T, DL] = {
new CollectedTensor[T, DL](typeList, dimensions,
data.sortWith((r1, r2) => tensorTypeAuthorized.gteq(r1.getAs[T](r1.fieldIndex(Tensor.TENSOR_VALUES_COLUMN_NAME)), r2.getAs[T](r2.fieldIndex(Tensor.TENSOR_VALUES_COLUMN_NAME)))))
}

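/**
* Get the row at index i, caching the last accessed row so that
* successive accesses to the value and the dimensions of the same
* element do not index into the data array repeatedly.
*/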
private def getRow(i: Int): Row = {
if (cachedIndex.isEmpty || cachedIndex.get != i) {
cachedRow = Some(data(i))
cachedIndex = Some(i)
}
cachedRow.get
}
}
4 changes: 2 additions & 2 deletions src/main/scala/tdm/core/Dimension.scala
@@ -4,10 +4,10 @@ import tdm._

/**
* Class to extend in order to create a tensor dimension.
*
* For example:
* object Dimension1 extends TensorDimension[String]
*
* Only types Double, Float, Long, Int, Short, Byte, Boolean, Char or String are authorized.
*/
abstract class TensorDimension[T](implicit typeAuthorized: AuthorizedType[T]) {
37 changes: 37 additions & 0 deletions src/main/scala/tdm/core/KruskalTensor.scala
@@ -0,0 +1,37 @@
package tdm.core

import org.apache.spark.sql.{DataFrame, SparkSession}
import shapeless.{::, HList, HMap, HNil}
import tdm.DimensionMap

class KruskalTensor[DL <: HList] private[core] (val typeList: List[TensorDimension[_]],
val lambdas: Map[Int, Double],
val factorMatrices: Map[String, DataFrame],
val corcondia: Option[Double])
(implicit spark: SparkSession) {

/**
* Extract a specific dimension result from this [[KruskalTensor]] as a 2-order tensor,
* with dimensions [[D]] and [[Rank]].
*
* @param dimension the dimension to extract
* @return a 2-order tensor with schema [[D]] :: [[Rank]]
*/
def extract[D <: TensorDimension[_]](dimension: D): Tensor[Double, D :: Rank.type :: HNil] = {
// Initializing the dimensions of the new tensor
var newDimensions = HMap.empty[DimensionMap]
newDimensions = newDimensions + (dimension -> dimension.produceDimension())
newDimensions = newDimensions + (Rank -> Rank.produceDimension())

// Initializing the new tensor
val tensor = new Tensor[Double, D :: Rank.type :: HNil](typeList, newDimensions)
tensor.values = factorMatrices(dimension.name)
tensor.empty = false

tensor
}

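/**
* Reconstruct an approximation of the original tensor from the lambdas
* and the factor matrices (not yet implemented).
*/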
def reconstruct(): Tensor[Double, DL] = ???
}

object Rank extends TensorDimension[Int]