#190: Added Spark filter pushdown for S3 variant #191

Merged · 9 commits · Aug 1, 2023
35 changes: 19 additions & 16 deletions dependencies.md


1 change: 1 addition & 0 deletions doc/changes/changelog.md


37 changes: 37 additions & 0 deletions doc/changes/changes_2.1.0.md
@@ -0,0 +1,37 @@
# Spark Connector 2.1.0, released 2023-08-02

Code name: Added filter pushdown and column selection for `S3` variant

## Summary

This release adds predicate pushdown and column selection for the `S3` variant. It also adopts the latest API changes from the `spark-connector-common-java` library.

## Features

* #190: Added predicate pushdown and column selection for `S3` variant

## Dependency Updates

### Spark Exasol Connector With JDBC

#### Compile Dependency Updates

* Updated `com.exasol:spark-connector-common-java:1.1.1` to `2.0.0`

#### Plugin Dependency Updates

* Updated `com.diffplug.spotless:spotless-maven-plugin:2.37.0` to `2.38.0`

### Spark Exasol Connector With S3

#### Compile Dependency Updates

* Updated `com.exasol:spark-connector-common-java:1.1.1` to `2.0.0`
* Updated `software.amazon.awssdk:s3:2.20.103` to `2.20.115`

#### Test Dependency Updates

* Updated `com.amazonaws:aws-java-sdk-s3:1.12.506` to `1.12.518`
* Added `com.exasol:java-util-logging-testing:2.0.3`
* Updated `org.junit.jupiter:junit-jupiter-api:5.9.3` to `5.10.0`
* Updated `org.junit.jupiter:junit-jupiter:5.9.3` to `5.10.0`
6 changes: 5 additions & 1 deletion exasol-jdbc/pom.xml
@@ -23,6 +23,10 @@
<groupId>com.exasol</groupId>
<artifactId>exasol-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.exasol</groupId>
<artifactId>spark-connector-common-java</artifactId>
</dependency>
<dependency>
<groupId>com.exasol</groupId>
<artifactId>sql-statement-builder-java8</artifactId>
@@ -362,7 +366,7 @@
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>2.37.0</version>
<version>2.38.0</version>
<configuration>
<scala>
<scalafmt>

This file was deleted.

47 changes: 31 additions & 16 deletions exasol-jdbc/src/main/scala/com/exasol/spark/ExasolRelation.scala
@@ -1,16 +1,20 @@
package com.exasol.spark

import java.util.Optional

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType

import com.exasol.spark.common.FilterConverter
import com.exasol.spark.common.StatementGeneratorFactory
import com.exasol.spark.rdd.ExasolRDD
import com.exasol.spark.util.ExasolConnectionManager
import com.exasol.spark.util.Filters
import com.exasol.spark.util.Types
import com.exasol.sql.expression.BooleanExpression

/**
* The Exasol specific implementation of Spark
@@ -58,20 +62,21 @@ class ExasolRelation(
override def buildScan(requiredColumns: Array[String]): RDD[Row] =
buildScan(requiredColumns, Array.empty)

override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] =
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
val predicate = new FilterConverter().convert(filters)
if (requiredColumns.isEmpty) {
makeEmptyRDD(filters)
makeEmptyRDD(predicate)
} else {
new ExasolRDD(
sqlContext.sparkContext,
getEnrichedQuery(requiredColumns, filters),
Types.selectColumns(requiredColumns, schema),
manager
)
val query = getEnrichedQuery(requiredColumns, predicate)
logInfo("Creating Spark RDD from Exasol query '" + query + "'.")
new ExasolRDD(sqlContext.sparkContext, query, Types.selectColumns(requiredColumns, schema), manager)
}
}

override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
filters.filterNot(Filters.filterToBooleanExpression(_).isDefined)
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
val filterConverter = new FilterConverter()
filters.filter(!filterConverter.isFilterSupported(_))
}

/**
* When a count action is run from a Spark dataframe we do not have to read the
@@ -86,13 +91,23 @@
* @return An RDD of empty Row-s which has as many elements as count(*) from
* enriched query
*/
private[this] def makeEmptyRDD(filters: Array[Filter]): RDD[Row] = {
val cntQuery = getEnrichedQuery(Array.empty[String], filters)
val cnt = manager.withCountQuery(cntQuery)
private[this] def makeEmptyRDD(predicate: Optional[BooleanExpression]): RDD[Row] = {
val stmtGenerator = StatementGeneratorFactory.countStarFrom(s"($queryString)")
if (predicate.isPresent()) {
stmtGenerator.where(predicate.get())
}
val countStarQuery = stmtGenerator.render()
logInfo("Running count star query '" + countStarQuery + "'.")
val cnt = manager.withCountQuery(countStarQuery)
sqlContext.sparkContext.parallelize(1L to cnt, 4).map(_ => Row.empty)
}

private[this] def getEnrichedQuery(columns: Array[String], filters: Array[Filter]): String =
ExasolQueryEnricher(queryString).enrichQuery(columns, filters)
private[this] def getEnrichedQuery(columns: Array[String], predicate: Optional[BooleanExpression]): String = {
val stmtGenerator = StatementGeneratorFactory.selectFrom(s"($queryString)").columns(columns: _*)
if (predicate.isPresent()) {
stmtGenerator.where(predicate.get())
}
stmtGenerator.render()
}

}