Use Spark internal.Logging instead of LazyLogging (#56)
* Uses Spark internal.Logging instead of LazyLogging
* Updates versions: plugins, dependency spark, exasol docker db.
* Refactors Scaladoc comments.
morazow authored Jul 31, 2019
1 parent 39c7b64 commit f4d7171
Showing 14 changed files with 208 additions and 180 deletions.
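For readers unfamiliar with the change, the pattern adopted in this commit looks roughly like the sketch below: classes mix in Spark's org.apache.spark.internal.Logging trait and call its logInfo/logError helpers directly, instead of extending scala-logging's LazyLogging and going through a logger field. The class and method names in the sketch are illustrative only and are not part of the diff.

    import org.apache.spark.internal.Logging

    // Illustrative sketch: mix in Spark's own Logging trait instead of
    // com.typesafe.scalalogging.LazyLogging and use its protected helpers.
    class QueryRunner extends Logging {
      def run(enrichedQuery: String): Unit =
        logInfo(s"Running with enriched query: $enrichedQuery")
    }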
6 changes: 2 additions & 4 deletions project/Dependencies.scala
@@ -6,9 +6,8 @@ import sbt._
object Dependencies {

// Versions
private val SparkVersion = "2.4.0"
private val ExasolJdbcVersion = "6.0.13"
private val TypesafeLoggingVersion = "3.9.0"
private val SparkVersion = "2.4.3"
private val ExasolJdbcVersion = "6.1.3"

private val ScalaTestVersion = "3.0.5"
private val MockitoVersion = "2.23.4"
@@ -28,7 +27,6 @@ object Dependencies {
private val CoreDependencies: Seq[ModuleID] = Seq(
"org.apache.spark" %% "spark-core" % sparkCurrentVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkCurrentVersion % "provided",
"com.typesafe.scala-logging" %% "scala-logging" % TypesafeLoggingVersion,
"com.exasol" % "exasol-jdbc" % ExasolJdbcVersion
)

4 changes: 2 additions & 2 deletions project/plugins.sbt
@@ -16,7 +16,7 @@ addSbtPlugin("org.danielnixon" % "sbt-extrawarts" % "1.0.3")

// Adds a `assembly` task to create a fat JAR with all of its dependencies
// https://github.com/sbt/sbt-assembly
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

// Adds a `BuildInfo` tasks
// https://github.com/sbt/sbt-buildinfo
@@ -32,7 +32,7 @@ addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.0")

// Adds a `dependencyUpdates` task to check Maven repositories for dependency updates
// http://github.com/rtimush/sbt-updates
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.4.1")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.4.2")

// Adds a `scalafmt` task for automatic source code formatting
// https://github.com/lucidsoftware/neo-sbt-scalafmt
ExasolDockerContainer.java
@@ -9,7 +9,7 @@
public class ExasolDockerContainer<SELF extends ExasolDockerContainer<SELF>> extends
JdbcDatabaseContainer<SELF> {
public static final String EXASOL_IMAGE = "exasol/docker-db";
public static final String EXASOL_VERSION = "6.0.13-d1";
public static final String EXASOL_VERSION = "6.1.3-d1";
public static final String EXASOL_HOST = "192.168.0.2";
public static final Integer EXASOL_PORT = 8888;
// wait for 5 minutes to startup
52 changes: 32 additions & 20 deletions src/main/scala/com/exasol/spark/DefaultSource.scala
@@ -12,9 +12,10 @@ import com.exasol.spark.util.Types
import com.exasol.spark.writer.ExasolWriter

/**
* A data source for creating integration between Exasol and Spark
* The default entry source for creating integration between Exasol and Spark.
*
* It also serves as a factory class to create [[ExasolRelation]] instances for Spark application.
* Additionally, it serves as a factory class to create [[ExasolRelation]]
* instances for Spark application.
*/
class DefaultSource
extends RelationProvider
@@ -25,13 +26,15 @@ class DefaultSource
override def shortName(): String = "exasol"

/**
* Creates an [[ExasolRelation]] using provided Spark [[org.apache.spark.sql.SQLContext]] and
* parameters
* Creates an [[ExasolRelation]] using provided Spark
* [[org.apache.spark.sql.SQLContext]] and parameters.
*
* The schema is inferred by running the Exasol query with `LIMIT 1` clause.
* Since the '''schema''' is not provided, it is inferred by running an Exasol
* query with `LIMIT 1` clause.
*
* @param sqlContext A Spark [[org.apache.spark.sql.SQLContext]] context
* @param parameters The parameters provided as options, `query` parameter is required for read
* @param parameters The parameters provided as options, `query` parameter is
* required for read
* @return An [[ExasolRelation]] relation
*/
override def createRelation(
@@ -44,12 +47,14 @@
}

/**
* Creates an [[ExasolRelation]] using the provided Spark [[org.apache.spark.sql.SQLContext]],
* parameters and schema
* Creates an [[ExasolRelation]] using the provided Spark
* [[org.apache.spark.sql.SQLContext]], parameters and schema.
*
* @param sqlContext A Spark [[org.apache.spark.sql.SQLContext]] context
* @param parameters The parameters provided as options, `query` parameter is required for read
* @param schema A user provided schema used to select columns for the relation
* @param parameters The parameters provided as options, `query` parameter is
* required for read
* @param schema A user provided schema used to select columns for the
* relation
* @return An [[ExasolRelation]] relation
*/
override def createRelation(
@@ -63,12 +68,15 @@
}

/**
* Creates an [[ExasolRelation]] after saving a Spark dataframe into Exasol table
* Creates an [[ExasolRelation]] after saving a
* [[org.apache.spark.sql.DataFrame]] into Exasol table.
*
* @param sqlContext A Spark [[org.apache.spark.sql.SQLContext]] context
* @param mode One of Spark save modes, [[org.apache.spark.sql.SaveMode]]
* @param parameters The parameters provided as options, `table` parameter is required for write
* @param data A Spark [[org.apache.spark.sql.DataFrame]] to save as a Exasol table
* @param parameters The parameters provided as options, `table` parameter is
* required for write
* @param data A Spark [[org.apache.spark.sql.DataFrame]] to save as an Exasol
* table
* @return An [[ExasolRelation]] relation
*/
override def createRelation(
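Taken together, the three createRelation overloads documented above back the usual Spark data source calls. A hedged usage sketch follows; the schema and table names are placeholders, and the Exasol connection options (host, port, credentials) required in practice are omitted. Only the "exasol" short name, the `query` option for reads, and the `table` option for writes come from the Scaladoc above.

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().appName("exasol-example").getOrCreate()

    // Read: the `query` option is required.
    val df = spark.read
      .format("exasol")
      .option("query", "SELECT * FROM MY_SCHEMA.MY_TABLE")
      .load()

    // Write: the `table` option is required; the mode is any SaveMode value.
    df.write
      .format("exasol")
      .mode(SaveMode.Append)
      .option("table", "MY_SCHEMA.MY_COPY")
      .save()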
@@ -119,7 +127,7 @@ class DefaultSource
createRelation(sqlContext, newParams, data.schema)
}

def saveDFTable(
private[this] def saveDFTable(
sqlContext: SQLContext,
df: DataFrame,
tableName: String,
@@ -132,7 +140,11 @@
newDF.rdd.foreachPartition(iter => writer.insertPartition(iter))
}

def createDFTable(df: DataFrame, tableName: String, manager: ExasolConnectionManager): Unit = {
private[this] def createDFTable(
df: DataFrame,
tableName: String,
manager: ExasolConnectionManager
): Unit = {
if (!manager.config.create_table) {
throw new UnsupportedOperationException(
s"""
@@ -152,7 +164,7 @@
}

/**
* Rearrange dataframe partitions into Exasol nodes number
* Rearranges dataframe partitions to match the Exasol data node count.
*
* If `nodesCnt` < `df.rdd.getNumPartitions` then perform
*
Expand All @@ -171,7 +183,6 @@ class DefaultSource
* so that there is a partition for each data node.
*
* If the number of partitions and nodes are same, then do nothing.
*
*/
def repartitionPerNode(df: DataFrame, nodesCnt: Int): DataFrame = {
val rddPartitionCnt = df.rdd.getNumPartitions
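The body of repartitionPerNode is collapsed in the view above. Based on the Scaladoc, a plausible sketch of the complete helper, assuming coalesce and repartition are the underlying operations, is:

    import org.apache.spark.sql.DataFrame

    def repartitionPerNode(df: DataFrame, nodesCnt: Int): DataFrame = {
      val rddPartitionCnt = df.rdd.getNumPartitions
      if (nodesCnt < rddPartitionCnt) {
        df.coalesce(nodesCnt) // shrink without a full shuffle
      } else if (nodesCnt > rddPartitionCnt) {
        df.repartition(nodesCnt) // shuffle so each data node gets a partition
      } else {
        df // partition count already matches the node count
      }
    }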
@@ -193,7 +204,7 @@
)
}

// Creates an ExasolConnectionManager with merged configuration values
// Creates an ExasolConnectionManager with merged configuration values.
private[this] def createManager(
parameters: Map[String, String],
sqlContext: SQLContext
Expand All @@ -202,8 +213,9 @@ class DefaultSource
ExasolConnectionManager(config)
}

// Merges user provided parameters with `spark.exasol.*` runtime configurations. If both of them
// define a key=value pair, then the one provided at runtime is used.
// Merges user provided parameters with `spark.exasol.*` runtime
// configurations. If both of them define a key=value pair, then the one
// provided at runtime is used.
private[spark] def mergeConfigurations(
parameters: Map[String, String],
sparkConf: Map[String, String]
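Only the signature of mergeConfigurations is visible above, but the comment fully pins down its semantics: runtime `spark.exasol.*` settings win over user provided options on key collisions. A minimal sketch under that description could look as follows; the exact prefix handling is an assumption.

    private[spark] def mergeConfigurations(
      parameters: Map[String, String],
      sparkConf: Map[String, String]
    ): Map[String, String] =
      // `++` lets the right-hand map win on duplicate keys, so runtime
      // `spark.exasol.*` settings override the user provided options.
      parameters ++ sparkConf.collect {
        case (key, value) if key.startsWith("spark.exasol.") =>
          key.stripPrefix("spark.exasol.") -> value
      }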
40 changes: 25 additions & 15 deletions src/main/scala/com/exasol/spark/ExasolRelation.scala
@@ -1,5 +1,6 @@
package com.exasol.spark

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
@@ -15,8 +16,15 @@ import com.exasol.spark.util.ExasolConnectionManager
import com.exasol.spark.util.Filters
import com.exasol.spark.util.Types

import com.typesafe.scalalogging.LazyLogging

/**
* The Exasol specific implementation of Spark
* [[org.apache.spark.sql.sources.BaseRelation]].
*
* @param context A Spark [[org.apache.spark.sql.SQLContext]]
* @param queryString A user provided Exasol SQL query string
* @param configSchema An optional user provided '''schema'''
* @param manager An Exasol connection manager
*/
class ExasolRelation(
context: SQLContext,
queryString: String,
@@ -26,7 +34,7 @@ class ExasolRelation(
with PrunedFilteredScan
with PrunedScan
with TableScan
with LazyLogging {
with Logging {

override def sqlContext: SQLContext = context

@@ -44,7 +52,7 @@
}

override def schema: StructType = configSchema.fold(inferSchema) { userSchema =>
logger.info(s"Using provided schema $userSchema")
logInfo(s"Using provided schema $userSchema")
userSchema
}

@@ -68,20 +76,21 @@

override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
val dataTypes = schema.map(field => field.name -> field.dataType).toMap
// remove if a filter is defined (handled)
filters.filterNot(Filters.filterExpr(_, dataTypes).isDefined)
}

/**
* When a count action is run from Spark dataframe we do not have to read the actual data and
* perform all serializations through the network. Instead we can create a RDD with empty Row-s
* with expected number of rows from actual query.
* When a count action is run from Spark dataframe we do not have to read the
* actual data and perform all serializations through the network. Instead we
* can create a RDD with empty Row-s with expected number of rows from actual
* query.
*
* This is also called count pushdown.
*
* @param filters A list of [[org.apache.spark.sql.sources.Filter]]-s that can be pushed as
* where clause
* @return An RDD of empty Row-s which has as many elements as count(*) from enriched query
* @param filters A list of [[org.apache.spark.sql.sources.Filter]]-s that can
* be pushed as where clause
* @return An RDD of empty Row-s which has as many elements as count(*) from
* enriched query
*/
private[this] def makeEmptyRDD(filters: Array[Filter]): RDD[Row] = {
val cntQuery = enrichQuery(Array.empty[String], filters)
@@ -92,10 +101,11 @@
/**
* Improves the original query with column pushdown and predicate pushdown.
*
* It will use provided column names to create a sub select query and similarly add where clause
* if filters are provided.
* It will use provided column names to create a sub select query and
* similarly add where clause if filters are provided.
*
* Additionally, if no column names are provided it creates a 'COUNT(*)' query.
* Additionally, if no column names are provided it creates a `COUNT(*)`
* query.
*
* @param columns A list of column names
* @param filters A list of Spark [[org.apache.spark.sql.sources.Filter]]-s
Expand All @@ -106,7 +116,7 @@ class ExasolRelation(
val filterStr = Filters.createWhereClause(schema, filters)
val whereClause = if (filterStr.trim.isEmpty) "" else s"WHERE $filterStr"
val enrichedQuery = s"SELECT $columnStr FROM ($queryString) A $whereClause"
logger.info(s"Running with enriched query: $enrichedQuery")
logInfo(s"Running with enriched query: $enrichedQuery")
enrichedQuery
}

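The makeEmptyRDD body is cut short above. The core of the count pushdown it describes can be sketched as below, assuming some helper first runs the enriched COUNT(*) query against Exasol and returns the row count; the function name here is hypothetical.

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row

    // `cnt` would come from executing the enriched COUNT(*) query on Exasol.
    // Counting this RDD then yields the right value without ever transferring
    // the actual table rows through the network.
    def emptyRowRDD(sc: SparkContext, cnt: Long): RDD[Row] =
      sc.range(0L, cnt).map(_ => Row.empty)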
10 changes: 1 addition & 9 deletions src/main/scala/com/exasol/spark/ExasolWriter.scala
@@ -14,8 +14,6 @@ import com.exasol.spark.util.Converter
import com.exasol.spark.util.ExasolConnectionManager
import com.exasol.spark.util.Types

import com.typesafe.scalalogging.LazyLogging

/**
*
*/
@@ -24,8 +22,7 @@ class ExasolWriter(
tableName: String,
rddSchema: StructType,
manager: ExasolConnectionManager
) extends Serializable
with LazyLogging {
) extends Serializable {

// scalastyle:off null
@transient private var mainConnection: EXAConnection = null
@@ -41,12 +38,10 @@
mainConnection = manager.writerMainConnection()

if (mainConnection == null) {
logger.error("Could not create main connection!")
throw new RuntimeException("Could not create main connection to Exasol!")
}

val cnt = manager.initParallel(mainConnection)
logger.info(s"Initiated $cnt parallel sub connections")

// Close Exasol main connection when SparkContext finishes. This is a lifetime of a Spark
// application.
@@ -107,15 +102,12 @@
val _ = stmt.executeBatch()
totalCnt += rowCnt
}
logger.info(s"Inserted in total $totalCnt rows into table $tableName")

()
} catch {
case ex: SQLException =>
logger.error(s"Error during statement batch execution ${ex.printStackTrace()}")
throw ex
} finally {
logger.info("Closing sub connection statement and connection")
stmt.close()
subConn.commit()
subConn.close()
(Diff for the remaining changed files is not shown.)