diff --git a/dependencies.md b/dependencies.md
index b00f3f9e..35fbb0c4 100644
--- a/dependencies.md
+++ b/dependencies.md
@@ -31,9 +31,9 @@
 | Dependency | License |
 | --------------------------------------------- | -------------------------------- |
 | [EXASolution JDBC Driver][23] | [EXAClient License][24] |
-| [Exasol SQL Statement Builder][25] | [MIT License][26] |
-| [error-reporting-java8][27] | [MIT License][28] |
-| [spark-connector-common-java][29] | [MIT License][30] |
+| [spark-connector-common-java][25] | [MIT License][26] |
+| [Exasol SQL Statement Builder][27] | [MIT License][28] |
+| [error-reporting-java8][29] | [MIT License][30] |
 | [Spark Project Core][31] | [Apache 2.0 License][12] |
 | [Spark Project SQL][31] | [Apache 2.0 License][12] |
 | [Guava: Google Core Libraries for Java][32] | [Apache License, Version 2.0][8] |
@@ -101,7 +101,7 @@
 | Dependency | License |
 | --------------------------------------------- | --------------------------------- |
 | [Scala Library][79] | [Apache-2.0][34] |
-| [spark-connector-common-java][29] | [MIT License][30] |
+| [spark-connector-common-java][25] | [MIT License][26] |
 | [Spark Project Core][31] | [Apache 2.0 License][12] |
 | [Spark Project SQL][31] | [Apache 2.0 License][12] |
 | Apache Hadoop Client Aggregator | [Apache License, Version 2.0][3] |
@@ -117,11 +117,12 @@
 | [JUnit Jupiter (Aggregator)][84] | [Eclipse Public License v2.0][85] |
 | [JUnit Jupiter API][84] | [Eclipse Public License v2.0][85] |
 | [Test Database Builder for Java][58] | [MIT License][59] |
+| [Test utilities for `java.util.logging`][86] | [MIT][87] |
 | [Matcher for SQL Result Sets][60] | [MIT License][61] |
 | [Test containers for Exasol on Docker][62] | [MIT License][63] |
-| [Testcontainers :: JUnit Jupiter Extension][86] | [MIT][87] |
+| [Testcontainers :: JUnit Jupiter Extension][88] | [MIT][89] |
 | [mockito-junit-jupiter][53] | [The MIT License][54] |
-| [Testcontainers :: Localstack][86] | [MIT][87] |
+| [Testcontainers :: Localstack][88] | [MIT][89] |
 | [AWS Java SDK for Amazon S3][80] | [Apache License, Version 2.0][81] |
 
 ### Plugin Dependencies
@@ -149,7 +150,7 @@
 | [OpenFastTrace Maven Plugin][18] | [GNU General Public License v3.0][19] |
 | [Maven Clean Plugin][20] | [The Apache Software License, Version 2.0][8] |
 | [Maven Resources Plugin][78] | [The Apache Software License, Version 2.0][8] |
-| [Maven JAR Plugin][88] | [The Apache Software License, Version 2.0][8] |
+| [Maven JAR Plugin][90] | [The Apache Software License, Version 2.0][8] |
 | [Maven Install Plugin][21] | [The Apache Software License, Version 2.0][8] |
 | [Maven Site Plugin 3][22] | [The Apache Software License, Version 2.0][8] |
@@ -178,12 +179,12 @@
 [22]: http://maven.apache.org/plugins/maven-site-plugin/
 [23]: http://www.exasol.com
 [24]: https://repo1.maven.org/maven2/com/exasol/exasol-jdbc/7.1.20/exasol-jdbc-7.1.20-license.txt
-[25]: https://github.com/exasol/sql-statement-builder/
-[26]: https://github.com/exasol/sql-statement-builder/blob/main/LICENSE
-[27]: https://github.com/exasol/error-reporting-java/
-[28]: https://github.com/exasol/error-reporting-java/blob/main/LICENSE
-[29]: https://github.com/exasol/spark-connector-common-java/
-[30]: https://github.com/exasol/spark-connector-common-java/blob/main/LICENSE
+[25]: https://github.com/exasol/spark-connector-common-java/
+[26]: https://github.com/exasol/spark-connector-common-java/blob/main/LICENSE
+[27]: https://github.com/exasol/sql-statement-builder/
+[28]: https://github.com/exasol/sql-statement-builder/blob/main/LICENSE
+[29]: https://github.com/exasol/error-reporting-java/
+[30]: https://github.com/exasol/error-reporting-java/blob/main/LICENSE
 [31]: https://spark.apache.org/
 [32]: https://github.com/google/guava
 [33]: https://netty.io/index.html
@@ -239,6 +240,8 @@
 [83]: http://repository.jboss.org/licenses/apache-2.0.txt
 [84]: https://junit.org/junit5/
 [85]: https://www.eclipse.org/legal/epl-v20.html
-[86]: https://testcontainers.org
-[87]: http://opensource.org/licenses/MIT
-[88]: http://maven.apache.org/plugins/maven-jar-plugin/
+[86]: https://github.com/exasol/java-util-logging-testing/
+[87]: https://opensource.org/licenses/MIT
+[88]: https://testcontainers.org
+[89]: http://opensource.org/licenses/MIT
+[90]: http://maven.apache.org/plugins/maven-jar-plugin/
diff --git a/doc/changes/changelog.md b/doc/changes/changelog.md
index 41c1595b..ba7f5b81 100644
--- a/doc/changes/changelog.md
+++ b/doc/changes/changelog.md
@@ -1,5 +1,6 @@
 # Changes
 
+* [2.1.0](changes_2.1.0.md)
* [2.0.0](changes_2.0.0.md)
 * [1.4.0](changes_1.4.0.md)
 * [1.3.0](changes_1.3.0.md)
diff --git a/doc/changes/changes_2.1.0.md b/doc/changes/changes_2.1.0.md
new file mode 100644
index 00000000..783e920f
--- /dev/null
+++ b/doc/changes/changes_2.1.0.md
@@ -0,0 +1,37 @@
+# Spark Connector 2.1.0, released 2023-08-02
+
+Code name: Added filter pushdown and column selection for `S3` variant
+
+## Summary
+
+This release adds predicate pushdown and column selection for the `S3` variant. It also adopts the latest API changes from the `spark-connector-common-java` library.
+
+## Features
+
+* #190: Added predicate pushdown and column selection for `S3` variant
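+
+Both features are transparent to Spark users. A minimal sketch of a read that benefits from them (`MY_SCHEMA.MY_TABLE` and the connection options are placeholders, not part of this change):
+
+```scala
+// Only column "c1" is exported from Exasol, and the predicate is evaluated
+// in the database instead of in Spark.
+val df = spark.read
+  .format("exasol-s3")
+  .option("table", "MY_SCHEMA.MY_TABLE") // plus the usual connection options
+  .load()
+  .filter("c2 > 100")
+  .select("c1")
+```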
+
+## Dependency Updates
+
+### Spark Exasol Connector With JDBC
+
+#### Compile Dependency Updates
+
+* Updated `com.exasol:spark-connector-common-java:1.1.1` to `2.0.0`
+
+#### Plugin Dependency Updates
+
+* Updated `com.diffplug.spotless:spotless-maven-plugin:2.37.0` to `2.38.0`
+
+### Spark Exasol Connector With S3
+
+#### Compile Dependency Updates
+
+* Updated `com.exasol:spark-connector-common-java:1.1.1` to `2.0.0`
+* Updated `software.amazon.awssdk:s3:2.20.103` to `2.20.115`
+
+#### Test Dependency Updates
+
+* Updated `com.amazonaws:aws-java-sdk-s3:1.12.506` to `1.12.518`
+* Added `com.exasol:java-util-logging-testing:2.0.3`
+* Updated `org.junit.jupiter:junit-jupiter-api:5.9.3` to `5.10.0`
+* Updated `org.junit.jupiter:junit-jupiter:5.9.3` to `5.10.0`
diff --git a/exasol-jdbc/pom.xml b/exasol-jdbc/pom.xml
index 6c795fc9..4a8edc88 100644
--- a/exasol-jdbc/pom.xml
+++ b/exasol-jdbc/pom.xml
@@ -23,6 +23,10 @@
       <groupId>com.exasol</groupId>
       <artifactId>exasol-jdbc</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.exasol</groupId>
+      <artifactId>spark-connector-common-java</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.exasol</groupId>
       <artifactId>sql-statement-builder-java8</artifactId>
@@ -362,7 +366,7 @@
         <plugin>
           <groupId>com.diffplug.spotless</groupId>
           <artifactId>spotless-maven-plugin</artifactId>
-          <version>2.37.0</version>
+          <version>2.38.0</version>
diff --git a/exasol-jdbc/src/main/scala/com/exasol/spark/ExasolQueryEnricher.scala b/exasol-jdbc/src/main/scala/com/exasol/spark/ExasolQueryEnricher.scala
deleted file mode 100644
index 204355b4..00000000
--- a/exasol-jdbc/src/main/scala/com/exasol/spark/ExasolQueryEnricher.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-package com.exasol.spark
-
-import org.apache.spark.sql.sources.Filter
-
-import com.exasol.spark.util.Filters
-import com.exasol.sql.StatementFactory
-import com.exasol.sql.dql.select.Select
-import com.exasol.sql.dql.select.rendering.SelectRenderer
-import com.exasol.sql.expression.BooleanTerm.and
-import com.exasol.sql.expression.ExpressionTerm.stringLiteral
-import com.exasol.sql.expression.function.exasol.ExasolAggregateFunction.COUNT
-import com.exasol.sql.rendering.StringRendererConfig
-
-/**
- * Improves the original user query with column pruning and predicate
- * pushdown.
- *
- * @param userQuery A user provided initial query
- */
-final case class ExasolQueryEnricher(userQuery: String) {
-
-  private[this] val PLACEHOLDER_TABLE_NAME = ""
-
-  /**
-   * Enriches user query with column pruning and where clause using the
-   * provided column names and filters.
-   *
-   * Additionally, if no column names are provided it creates a {@code
-   * COUNT('*')} query.
-   *
-   * @param columns A list of column names
-   * @param filters A list of Spark [[org.apache.spark.sql.sources.Filter]]-s
-   * @return An enriched query with column selection and predicate pushdown
-   */
-  def enrichQuery(columns: Array[String], filters: Array[Filter]): String = {
-    val select = StatementFactory.getInstance().select()
-    val _ = select.from().table(PLACEHOLDER_TABLE_NAME)
-    addColumns(columns, select)
-    addFilters(filters, select)
-    renderSelectQuery(PLACEHOLDER_TABLE_NAME, select)
-  }
-
-  private[this] def addColumns(columns: Array[String], select: Select): Unit = {
-    if (columns.isEmpty) {
-      val _ = select.function(COUNT, stringLiteral("*"))
-    } else {
-      columns.foreach(column => select.field(column))
-    }
-    ()
-  }
-
-  private[this] def addFilters(filters: Array[Filter], select: Select): Unit = {
-    val booleanExpressions = Filters.booleanExpressionFromFilters(filters.toSeq)
-    if (!booleanExpressions.isEmpty) {
-      val _ = select.where(and(booleanExpressions: _*))
-    }
-    ()
-  }
-
-  private[this] def renderSelectQuery(subSelectPlaceholder: String, select: Select): String = {
-    val rendererConfig = StringRendererConfig.builder().quoteIdentifiers(true).build()
-    val renderer = new SelectRenderer(rendererConfig)
-    select.accept(renderer)
-    renderer.render().replace(s""""$subSelectPlaceholder"""", s"($userQuery) A")
-  }
-
-}
diff --git a/exasol-jdbc/src/main/scala/com/exasol/spark/ExasolRelation.scala b/exasol-jdbc/src/main/scala/com/exasol/spark/ExasolRelation.scala
index dae9fa2c..aa9549f0 100644
--- a/exasol-jdbc/src/main/scala/com/exasol/spark/ExasolRelation.scala
+++ b/exasol-jdbc/src/main/scala/com/exasol/spark/ExasolRelation.scala
@@ -1,5 +1,7 @@
 package com.exasol.spark
 
+import java.util.Optional
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
@@ -7,10 +9,12 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 
+import com.exasol.spark.common.FilterConverter
+import com.exasol.spark.common.StatementGeneratorFactory
 import com.exasol.spark.rdd.ExasolRDD
 import com.exasol.spark.util.ExasolConnectionManager
-import com.exasol.spark.util.Filters
 import com.exasol.spark.util.Types
+import com.exasol.sql.expression.BooleanExpression
 
 /**
  * The Exasol specific implementation of Spark
@@ -58,20 +62,21 @@
   override def buildScan(requiredColumns: Array[String]): RDD[Row] =
     buildScan(requiredColumns, Array.empty)
 
-  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] =
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    val predicate = new FilterConverter().convert(filters)
     if (requiredColumns.isEmpty) {
-      makeEmptyRDD(filters)
+      makeEmptyRDD(predicate)
     } else {
-      new ExasolRDD(
-        sqlContext.sparkContext,
-        getEnrichedQuery(requiredColumns, filters),
-        Types.selectColumns(requiredColumns, schema),
-        manager
-      )
+      val query = getEnrichedQuery(requiredColumns, predicate)
+      logInfo("Creating Spark RDD from Exasol query '" + query + "'.")
+      new ExasolRDD(sqlContext.sparkContext, query, Types.selectColumns(requiredColumns, schema), manager)
     }
+  }
 
-  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
-    filters.filterNot(Filters.filterToBooleanExpression(_).isDefined)
+  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
+    val filterConverter = new FilterConverter()
+    filters.filter(!filterConverter.isFilterSupported(_))
+  }
 
   /**
    * When a count action is run from Spark dataframe we do not have to read the
@@ -86,13 +91,23 @@
   *
   * @return An RDD of empty Row-s which has as many elements as count(*) from
   *         enriched query
   */
-  private[this] def makeEmptyRDD(filters: Array[Filter]): RDD[Row] = {
-    val cntQuery = getEnrichedQuery(Array.empty[String], filters)
-    val cnt = manager.withCountQuery(cntQuery)
+  private[this] def makeEmptyRDD(predicate: Optional[BooleanExpression]): RDD[Row] = {
+    val stmtGenerator = StatementGeneratorFactory.countStarFrom(s"($queryString)")
+    if (predicate.isPresent()) {
+      stmtGenerator.where(predicate.get())
+    }
+    val countStarQuery = stmtGenerator.render()
+    logInfo("Running count star query '" + countStarQuery + "'.")
+    val cnt = manager.withCountQuery(countStarQuery)
     sqlContext.sparkContext.parallelize(1L to cnt, 4).map(_ => Row.empty)
   }
 
-  private[this] def getEnrichedQuery(columns: Array[String], filters: Array[Filter]): String =
-    ExasolQueryEnricher(queryString).enrichQuery(columns, filters)
+  private[this] def getEnrichedQuery(columns: Array[String], predicate: Optional[BooleanExpression]): String = {
+    val stmtGenerator = StatementGeneratorFactory.selectFrom(s"($queryString)").columns(columns: _*)
+    if (predicate.isPresent()) {
+      stmtGenerator.where(predicate.get())
+    }
+    stmtGenerator.render()
+  }
 }
diff --git a/exasol-jdbc/src/main/scala/com/exasol/spark/util/Filters.scala b/exasol-jdbc/src/main/scala/com/exasol/spark/util/Filters.scala
deleted file mode 100644
index c750dfc2..00000000
--- a/exasol-jdbc/src/main/scala/com/exasol/spark/util/Filters.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-package com.exasol.spark.util
-
-import org.apache.spark.sql.sources._
-
-import com.exasol.sql.expression.BooleanExpression
-import com.exasol.sql.expression.BooleanTerm
-import com.exasol.sql.expression.ExpressionTerm._
-import com.exasol.sql.expression.ValueExpression
-import com.exasol.sql.expression.literal.BigDecimalLiteral
-
-/**
- * A helper class with functions to create Exasol where clauses from Spark
- * [[org.apache.spark.sql.sources.Filter]]-s.
- */
-object Filters {
-
-  /**
-   * Converts a sequence of filters into an Exasol boolean expressions.
-   *
-   * @param filters a sequence of Spark source filters
-   * @return a sequence of Exasol boolean expressions
-   */
-  def booleanExpressionFromFilters(filters: Seq[Filter]): Seq[BooleanExpression] =
-    filters.map(filterToBooleanExpression(_)).map(_.toList).flatten
-
-  /**
-   * Given a Spark source [[org.apache.spark.sql.sources.Filter]],
-   * creates an Exasol boolean expression.
-   *
-   * @param filter a Spark source filter
-   * @return an Exasol boolean expression, [[scala.None]] is returned if
-   *         expression cannot be created from the filter
-   */
-  // Suppression is accepted since we have terminal conditions in the
-  // recursion.
-  def filterToBooleanExpression(filter: Filter): Option[BooleanExpression] =
-    Option(filter match {
-      case EqualTo(attribute, value) =>
-        BooleanTerm.eq(column(attribute), getLiteral(value))
-      case Not(EqualTo(attribute, value)) =>
-        BooleanTerm.compare(column(attribute), "<>", getLiteral(value))
-      case GreaterThan(attribute, value) =>
-        BooleanTerm.gt(column(attribute), getLiteral(value))
-      case GreaterThanOrEqual(attribute, value) =>
-        BooleanTerm.ge(column(attribute), getLiteral(value))
-      case LessThan(attribute, value) =>
-        BooleanTerm.lt(column(attribute), getLiteral(value))
-      case LessThanOrEqual(attribute, value) =>
-        BooleanTerm.le(column(attribute), getLiteral(value))
-      case IsNull(attribute) => BooleanTerm.isNull(column(attribute))
-      case IsNotNull(attribute) => BooleanTerm.isNotNull(column(attribute))
-      case StringEndsWith(attribute, value) =>
-        BooleanTerm.like(column(attribute), stringLiteral(s"%$value"))
-      case StringContains(attribute, value) =>
-        BooleanTerm.like(column(attribute), stringLiteral(s"%$value%"))
-      case StringStartsWith(attribute, value) =>
-        BooleanTerm.like(column(attribute), stringLiteral(s"$value%"))
-      case In(attribute, values) =>
-        BooleanTerm.in(column(attribute), values.map(getLiteral(_)): _*)
-      case Not(In(attribute, values)) =>
-        BooleanTerm.notIn(column(attribute), values.map(getLiteral(_)): _*)
-      case Not(notFilter) =>
-        filterToBooleanExpression(notFilter).map(BooleanTerm.not(_)).getOrElse(null)
-      case And(leftFilter, rightFilter) => andFilterToExpression(leftFilter, rightFilter)
-      case Or(leftFilter, rightFilter) => orFilterToExpression(leftFilter, rightFilter)
-      case _ => null
-    })
-
-  private[this] def andFilterToExpression(
-    leftFilter: Filter,
-    rightFilter: Filter
-  ): BooleanExpression = {
-    val leftExpr = filterToBooleanExpression(leftFilter)
-    val rightExpr = filterToBooleanExpression(rightFilter)
-    if (leftExpr.isDefined && rightExpr.isDefined) {
-      BooleanTerm.and(leftExpr.getOrElse(null), rightExpr.getOrElse(null))
-    } else {
-      null
-    }
-  }
-
-  private[this] def orFilterToExpression(
-    leftFilter: Filter,
-    rightFilter: Filter
-  ): BooleanExpression = {
-    val leftExpr = filterToBooleanExpression(leftFilter)
-    val rightExpr = filterToBooleanExpression(rightFilter)
-    if (leftExpr.isDefined && rightExpr.isDefined) {
-      BooleanTerm.or(leftExpr.getOrElse(null), rightExpr.getOrElse(null))
-    } else {
-      null
-    }
-  }
-
-  private[this] def getLiteral(value: Any): ValueExpression =
-    value match {
-      case booleanValue: Boolean => booleanLiteral(booleanValue)
-      case stringValue: String => stringLiteral(stringValue)
-      case byteValue: Byte => integerLiteral(byteValue.toInt)
-      case shortValue: Short => integerLiteral(shortValue.toInt)
-      case integerValue: Int => integerLiteral(integerValue)
-      case longValue: Long => longLiteral(longValue)
-      case floatValue: Float => floatLiteral(floatValue)
-      case doubleValue: Double => doubleLiteral(doubleValue)
-      case decimalValue: BigDecimal => BigDecimalLiteral.of(decimalValue.underlying())
-      case decimalValue: java.math.BigDecimal => BigDecimalLiteral.of(decimalValue)
-      case _ => stringLiteral(s"$value")
-    }
-
-}
diff --git a/exasol-jdbc/src/test/scala/com/exasol/spark/ExasolQueryEnricherSuite.scala b/exasol-jdbc/src/test/scala/com/exasol/spark/ExasolQueryEnricherSuite.scala
deleted file mode 100644
index 0ca2d19f..00000000
--- a/exasol-jdbc/src/test/scala/com/exasol/spark/ExasolQueryEnricherSuite.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-package com.exasol.spark
-
-import org.apache.spark.sql.sources._
-
-import org.scalatest.funsuite.AnyFunSuite
-import org.scalatest.matchers.should.Matchers
-
-class ExasolQueryEnricherSuite extends AnyFunSuite with Matchers {
-
-  private[this] val userQuery = "SELECT 1"
-  private[this] val queryEnricher = ExasolQueryEnricher(userQuery)
-
-  test("enriches empty filters") {
-    val query = queryEnricher.enrichQuery(Array("a", "b"), Array.empty[Filter])
-    assert(query === s"""SELECT "a", "b" FROM ($userQuery) A""")
-  }
-
-  test("enriches empty columns") {
-    val query = queryEnricher.enrichQuery(Array.empty[String], Array.empty[Filter])
-    assert(query === s"""SELECT COUNT('*') FROM ($userQuery) A""")
-  }
-
-  test("enriches empty columns with a filter") {
-    val query = queryEnricher.enrichQuery(Array.empty[String], Array(Not(EqualTo("c", 3))))
-    assert(query === s"""SELECT COUNT('*') FROM ($userQuery) A WHERE ("c" <> 3)""")
-  }
-
-  test("enriches a single filter") {
-    val query = queryEnricher.enrichQuery(Array("a", "b"), Array(EqualTo("a", 1)))
-    assert(query === s"""SELECT "a", "b" FROM ($userQuery) A WHERE ("a" = 1)""")
-  }
-
-  test("enriches many filters") {
-    val query = queryEnricher.enrichQuery(Array("a"), Array(EqualTo("a", 1), LessThan("b", 2)))
-    assert(query === s"""SELECT "a" FROM ($userQuery) A WHERE ("a" = 1) AND ("b" < 2)""")
-  }
-
-}
diff --git a/exasol-jdbc/src/test/scala/com/exasol/spark/ExasolRelationSuite.scala b/exasol-jdbc/src/test/scala/com/exasol/spark/ExasolRelationSuite.scala
index a2798ca5..da010e66 100644
--- a/exasol-jdbc/src/test/scala/com/exasol/spark/ExasolRelationSuite.scala
+++ b/exasol-jdbc/src/test/scala/com/exasol/spark/ExasolRelationSuite.scala
@@ -30,7 +30,7 @@ class ExasolRelationSuite extends AnyFunSuite with Matchers with MockitoSugar {
 
   test("buildScan returns empty RDD with empty columns (count pushdown)") {
     val userQuery = "SELECT FROM DUAL"
-    val countQuery = s"""SELECT COUNT('*') FROM ($userQuery) A"""
+    val countQuery = s"""SELECT COUNT('*') FROM ($userQuery)"""
     val expectedCount = 5L
     val manager = mock[ExasolConnectionManager]
     when(manager.withCountQuery(countQuery)).thenReturn(expectedCount)
diff --git a/exasol-jdbc/src/test/scala/com/exasol/spark/util/FiltersSuite.scala b/exasol-jdbc/src/test/scala/com/exasol/spark/util/FiltersSuite.scala
deleted file mode 100644
index f25632be..00000000
--- a/exasol-jdbc/src/test/scala/com/exasol/spark/util/FiltersSuite.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-package com.exasol.spark.util
-
-import org.apache.spark.sql.sources._
-
-import com.exasol.sql.expression.BooleanTerm
-import com.exasol.sql.expression.rendering.ValueExpressionRenderer
-import com.exasol.sql.rendering.StringRendererConfig
-
-import org.scalatest.funsuite.AnyFunSuite
-import org.scalatest.matchers.should.Matchers
-
-class FiltersSuite extends AnyFunSuite with Matchers {
-
-  private[this] def getWhereClause(filters: Seq[Filter]): String = {
-    val booleanExpressions = Filters.booleanExpressionFromFilters(filters)
-    val andExpression = BooleanTerm.and(booleanExpressions: _*)
-    val rendererConfig = StringRendererConfig.builder().quoteIdentifiers(true).build()
-    val renderer = new ValueExpressionRenderer(rendererConfig)
-    andExpression.accept(renderer)
-    renderer.render()
-  }
-
-  test("renders empty filters") {
-    assert(getWhereClause(Seq.empty[Filter]) === "")
-  }
-
-  test("renders equal-to") {
-    assert(getWhereClause(Seq(EqualTo("field", "a"))) === """("field" = 'a')""")
-  }
-
-  test("renders equal-to with different data types") {
-    val filters = Seq(
-      EqualTo("bool_col", false),
-      EqualTo("str_col", "XYZ"),
-      EqualTo("str_col", "\u00d6"),
-      EqualTo("str_col", "He said 'good morning'"),
-      EqualTo("int_col", 42),
-      EqualTo("float_col", 13.0f),
-      EqualTo("double_col", 100.0),
-      EqualTo("date_col", "2018-01-01"),
-      EqualTo("datetime_col", "2018-01-01 00:00:59.123")
-    )
-    val expected =
-      """
-        | ("bool_col" = FALSE)
-        |AND ("str_col" = 'XYZ')
-        |AND ("str_col" = 'Ö')
-        |AND ("str_col" = 'He said 'good morning'')
-        |AND ("int_col" = 42)
-        |AND ("float_col" = 13.0)
-        |AND ("double_col" = 100.0)
-        |AND ("date_col" = '2018-01-01')
-        |AND ("datetime_col" = '2018-01-01 00:00:59.123')
-      """.stripMargin.replaceAll("\\s+", " ").trim()
-    assert(getWhereClause(filters) === expected)
-  }
-
-  test("renders not-equal-to") {
-    assert(getWhereClause(Seq(Not(EqualTo("field", 1.0)))) === """("field" <> 1.0)""")
-  }
-
-  test("renders greater-than") {
-    assert(getWhereClause(Seq(GreaterThan("field", 1))) === """("field" > 1)""")
-  }
-
-  test("renders greater-than-or-equal") {
-    assert(getWhereClause(Seq(GreaterThanOrEqual("field", 3L))) === """("field" >= 3)""")
-  }
-
-  test("renders less-than") {
-    assert(getWhereClause(Seq(LessThan("field", 2.1f))) === """("field" < 2.1)""")
-  }
-
-  test("renders less-than-or-equal") {
-    assert(getWhereClause(Seq(LessThanOrEqual("field", "e"))) === """("field" <= 'e')""")
-  }
-
-  test("renders is-null") {
-    assert(getWhereClause(Seq(IsNull("field"))) === """("field" IS NULL)""")
-  }
-
-  test("renders is-not-null") {
-    assert(getWhereClause(Seq(IsNotNull("field"))) === """("field" IS NOT NULL)""")
-  }
-
-  test("renders string-ends-with") {
-    assert(getWhereClause(Seq(StringEndsWith("field", "xyz"))) === """("field" LIKE '%xyz')""")
-  }
-
-  test("renders string-contains") {
-    assert(getWhereClause(Seq(StringContains("field", "in"))) === """("field" LIKE '%in%')""")
-  }
-
-  test("renders string-starts-with") {
-    assert(getWhereClause(Seq(StringStartsWith("field", "abc"))) === """("field" LIKE 'abc%')""")
-  }
-
-  test("renders in") {
-    val javaDecimals: Array[Any] = Array(1.1, 2.3).map(java.math.BigDecimal.valueOf(_))
-    assert(getWhereClause(Seq(In("field", javaDecimals))) === """("field" IN (1.1, 2.3))""")
-  }
-
-  test("renders not-in") {
-    val decimals: Array[Any] = Array(BigDecimal(1.1), BigDecimal(2.2))
-    assert(getWhereClause(Seq(Not(In("field", decimals)))) === """("field" NOT IN (1.1, 2.2))""")
-  }
-
-  test("renders not") {
-    assert(
-      getWhereClause(Seq(Not(StringEndsWith("field", ".")))) === """NOT(("field" LIKE '%.'))"""
-    )
-  }
-
-  test("renders and") {
-    val filter = And(LessThan("field1", 1), EqualTo("field2", "one"))
-    val expected = """(("field1" < 1) AND ("field2" = 'one'))"""
-    assert(getWhereClause(Seq(filter)) === expected)
-  }
-
-  test("renders or") {
-    val filter = Or(GreaterThanOrEqual("field1", 13.0), Not(EqualTo("field2", "one")))
-    val expected = """(("field1" >= 13.0) OR ("field2" <> 'one'))"""
-    assert(getWhereClause(Seq(filter)) === expected)
-  }
-
-  test("renders nested list of filters") {
-    val filters = Seq(
-      Or(EqualTo("str_col", "abc"), EqualTo("int_col", 123.toShort)),
-      Or(Not(LessThan("int_col", 1.toByte)), GreaterThan("str_col", "a")),
-      Or(EqualTo("str_col", "xyz"), And(EqualTo("float_col", 3.14), Not(EqualTo("int_col", 3))))
-    )
-    val expected =
-      """
-        | (("str_col" = 'abc') OR ("int_col" = 123))
-        |AND (NOT(("int_col" < 1)) OR ("str_col" > 'a'))
-        |AND (("str_col" = 'xyz') OR (("float_col" = 3.14) AND ("int_col" <> 3)))
-      """.stripMargin.replaceAll("\\s+", " ").trim()
-    assert(getWhereClause(filters) === expected)
-  }
-
-  test("returns empty when one of and expressions is null") {
-    val expr = Filters.filterToBooleanExpression(And(EqualTo("a", 1), EqualNullSafe("b", "abc")))
-    assert(expr === None)
-  }
-
-  test("returns empty when one of or expressions is null") {
-    val expr = Filters.filterToBooleanExpression(Or(EqualNullSafe("b", "x"), LessThan("c", 1)))
-    assert(expr === None)
-  }
-
-  test("returns empty when filter is null") {
-    assert(Filters.filterToBooleanExpression(EqualNullSafe("b", "x")) === None)
-  }
-
-}
diff --git a/exasol-s3/pom.xml b/exasol-s3/pom.xml
index dd20beb6..b9d33714 100644
--- a/exasol-s3/pom.xml
+++ b/exasol-s3/pom.xml
@@ -41,7 +41,7 @@
     <dependency>
      <groupId>software.amazon.awssdk</groupId>
      <artifactId>s3</artifactId>
-      <version>2.20.103</version>
+      <version>2.20.115</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
@@ -69,6 +69,11 @@
      <artifactId>test-db-builder-java</artifactId>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>com.exasol</groupId>
+      <artifactId>java-util-logging-testing</artifactId>
+      <scope>test</scope>
+    </dependency>
    <dependency>
      <groupId>com.exasol</groupId>
      <artifactId>hamcrest-resultset-matcher</artifactId>
@@ -101,7 +106,7 @@
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-s3</artifactId>
-      <version>1.12.506</version>
+      <version>1.12.518</version>
      <scope>test</scope>
    </dependency>
@@ -124,6 +129,7 @@
              <goal>shade</goal>
            </goals>
            <configuration>
+              <skip>${shade.skip}</skip>
              <finalName>${project.artifactId}-${project.version}-assembly</finalName>
              <shadedArtifactAttached>false</shadedArtifactAttached>
              <createDependencyReducedPom>false</createDependencyReducedPom>
diff --git a/exasol-s3/src/main/java/com/exasol/spark/s3/ExasolS3ScanBuilder.java b/exasol-s3/src/main/java/com/exasol/spark/s3/ExasolS3ScanBuilder.java
index 9bf46bbb..835d8f3c 100644
--- a/exasol-s3/src/main/java/com/exasol/spark/s3/ExasolS3ScanBuilder.java
+++ b/exasol-s3/src/main/java/com/exasol/spark/s3/ExasolS3ScanBuilder.java
@@ -14,12 +14,13 @@
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat;
 import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable;
 import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 import com.exasol.errorreporting.ExaError;
-import com.exasol.spark.common.ExasolOptions;
-import com.exasol.spark.common.ExasolValidationException;
+import com.exasol.spark.common.*;
+import com.exasol.sql.expression.BooleanExpression;
 
 import scala.collection.JavaConverters;
 
@@ -59,7 +60,9 @@ public Filter[] pushFilters(final Filter[] filters) {
     }
 
     private List<Filter> getUnsupportedFilters(final Filter[] filters) {
-        return Collections.emptyList();
+        final FilterConverter filterConverter = new FilterConverter();
+        return Arrays.asList(filters).stream().filter(f -> !filterConverter.isFilterSupported(f))
+                .collect(Collectors.toList());
     }
 
     @Override
@@ -120,7 +123,13 @@ private CaseInsensitiveStringMap getUpdatedMapWithCSVOptions(final CaseInsensiti
      * @return Enriched SQL query for the intermediate storage.
     */
    protected String getScanQuery() {
-        return "SELECT * FROM " + getTableOrQuery() + " ";
+        final Optional<BooleanExpression> predicate = new FilterConverter().convert(this.pushedFilters);
+        final SelectStatementGenerator stmtGenerator = StatementGeneratorFactory.selectFrom(getTableOrQuery())
+                .columns(getColumnNames());
+        if (predicate.isPresent()) {
+            stmtGenerator.where(predicate.get());
+        }
+        return stmtGenerator.render();
     }
 
     private String getTableOrQuery() {
@@ -131,8 +140,14 @@
         }
     }
 
+    private String[] getColumnNames() {
+        return Stream.of(this.schema.fields()).map(StructField::name).toArray(String[]::new);
+    }
+
     private void prepareIntermediateData(final String bucketKey) {
-        final String exportQuery = new S3ExportQueryGenerator(this.options, bucketKey).generateQuery(getScanQuery());
+        final String selectQuery = getScanQuery();
+        LOGGER.info(() -> "Preparing data for query '" + selectQuery + "'.");
+        final String exportQuery = new S3ExportQueryGenerator(this.options, bucketKey).generateQuery(selectQuery);
         new S3DataExporter(this.options, bucketKey).exportData(exportQuery);
     }
diff --git a/exasol-s3/src/test/java/com/exasol/spark/s3/S3ColumnSelectionIT.java b/exasol-s3/src/test/java/com/exasol/spark/s3/S3ColumnSelectionIT.java
new file mode 100644
index 00000000..ec7dbae4
--- /dev/null
+++ b/exasol-s3/src/test/java/com/exasol/spark/s3/S3ColumnSelectionIT.java
@@ -0,0 +1,77 @@
+package com.exasol.spark.s3;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.jupiter.api.Assertions.assertAll;
+
+import java.util.logging.Logger;
+
+import org.apache.spark.sql.*;
+import org.junit.jupiter.api.*;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import com.exasol.dbbuilder.dialects.Table;
+import com.exasol.logging.CapturingLogHandler;
+
+@Tag("integration")
+@Testcontainers
+class S3ColumnSelectionIT extends S3IntegrationTestSetup {
+    private static Table table;
+    private final CapturingLogHandler capturingLogHandler = new CapturingLogHandler();
+
+    @BeforeAll
+    static void setupAll() {
+        table = exasolSchema.createTableBuilder("table_for_column_selection") //
+                .column("c1", "VARCHAR(10)") //
+                .column("c2", "DECIMAL(18,0)") //
+                .build() //
+                .insert("one", 314) //
+                .insert("two", 272) //
+                .insert("three", 1337);
+    }
+
+    @BeforeEach
+    void beforeEach() {
+        Logger.getLogger("com.exasol").addHandler(this.capturingLogHandler);
+        this.capturingLogHandler.reset();
+    }
+
+    @AfterEach
+    void afterEach() {
+        Logger.getLogger("com.exasol").removeHandler(this.capturingLogHandler);
+    }
+
+    @Test
+    void testSelectStar() {
+        final Dataset<Row> df = spark.read() //
+                .format("exasol-s3") //
+                .option("table", table.getFullyQualifiedName()) //
+                .options(getSparkOptions()) //
+                .load();
+
+        assertAll(
+                () -> assertThat(df.collectAsList(),
+                        contains(RowFactory.create("one", 314), RowFactory.create("two", 272),
+                                RowFactory.create("three", 1337))),
+                () -> assertThat(this.capturingLogHandler.getCapturedData(),
+                        containsString("SELECT * FROM \"DEFAULT_SCHEMA\".\"table_for_column_selection\"")));
+    }
+
+    @Test
+    void testSelectColumn() {
+        final Dataset<String> df = spark.read() //
+                .format("exasol-s3") //
+                .option("table", table.getFullyQualifiedName()) //
+                .options(getSparkOptions()) //
+                .load() //
+                .select("c1") //
+                .as(Encoders.STRING());
+
+        assertAll(() -> assertThat(df.collectAsList(), contains("one", "two", "three")),
+                () -> assertThat(df.queryExecution().toString(), containsString("Project [c1")),
+                () -> assertThat(this.capturingLogHandler.getCapturedData(),
+                        containsString("SELECT \"c1\" FROM \"DEFAULT_SCHEMA\".\"table_for_column_selection\"")));
+    }
+
+}
diff --git a/exasol-s3/src/test/java/com/exasol/spark/s3/S3PredicatePushdownIT.java b/exasol-s3/src/test/java/com/exasol/spark/s3/S3PredicatePushdownIT.java
new file mode 100644
index 00000000..f371759c
--- /dev/null
+++ b/exasol-s3/src/test/java/com/exasol/spark/s3/S3PredicatePushdownIT.java
@@ -0,0 +1,196 @@
+package com.exasol.spark.s3;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertAll;
+
+import java.util.List;
+import java.util.logging.Logger;
+import java.util.stream.Stream;
+
+import org.apache.spark.sql.*;
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import com.exasol.dbbuilder.dialects.Table;
+import com.exasol.logging.CapturingLogHandler;
+
+@Tag("integration")
+@Testcontainers
+class S3PredicatePushdownIT extends S3IntegrationTestSetup {
+    private static Table table;
+    private final CapturingLogHandler capturingLogHandler = new CapturingLogHandler();
+
+    @BeforeAll
+    static void setupAll() {
+        table = exasolSchema.createTableBuilder("table_for_predicate_pushdown") //
+                .column("column_string", "VARCHAR(10)") //
+                .column("column_integer", "DECIMAL(9,0)") //
+                .column("column_double", "DOUBLE") //
+                .column("column_boolean", "BOOLEAN") //
+                .build() //
+                .insert("one", 314, 3.14, false) //
+                .insert("two", 272, 2.72, false) //
+                .insert("three", 1337, 13.37, true);
+    }
+
+    @BeforeEach
+    void beforeEach() {
+        Logger.getLogger("com.exasol").addHandler(this.capturingLogHandler);
+        this.capturingLogHandler.reset();
+    }
+
+    @AfterEach
+    void afterEach() {
+        Logger.getLogger("com.exasol").removeHandler(this.capturingLogHandler);
+    }
+
+    private Dataset<Row> getSparkDataframe() {
+        return spark.read() //
+                .format("exasol-s3") //
+                .option("table", table.getFullyQualifiedName()) //
+                .options(getSparkOptions()) //
+                .load();
+    }
+
+    @Test
+    void testEqualTo() {
+        final Dataset<Row> df = getSparkDataframe() //
+                .select("column_integer", "column_boolean") //
+                .filter("column_string = 'one'");
+        final List<Row> rows = df.collectAsList();
+        assertAll(() -> assertThat(rows.size(), equalTo(1)), //
+                () -> assertThat(rows, contains(RowFactory.create(314, false))),
+                () -> assertThat(df.queryExecution().toString(), containsString("Filter ('column_string = one)")), //
+                () -> assertThat(this.capturingLogHandler.getCapturedData(),
+                        containsString("WHERE (\"column_string\" IS NOT NULL) AND (\"column_string\" = 'one')")));
+    }
+
+    @Test
+    void testLessThan() {
+        final Dataset<String> df = getSparkDataframe() //
+                .select("column_string") //
+                .filter("column_double < 3.00") //
+                .as(Encoders.STRING());
+        assertThat(df.collectAsList(), contains("two"));
+    }
+
+    @Test
+    void testLessThanOrEqual() {
+        final Dataset<String> df = getSparkDataframe() //
+                .select("column_string") //
+                .filter("column_integer <= 314") //
+                .as(Encoders.STRING());
+        assertThat(df.collectAsList(), contains("one", "two"));
+    }
+
+    @Test
+    void testGreaterThan() {
+        final Dataset<Double> df = getSparkDataframe() //
+                .select("column_double") //
+                .filter("column_integer > 300") //
+                .as(Encoders.DOUBLE());
+        assertThat(df.collectAsList(), contains(3.14, 13.37));
+    }
+
+    @Test
+    void testPredicateGreaterThanOrEqual() {
+        final Dataset<Double> df = getSparkDataframe() //
+                .select("column_double") //
+                .filter("column_integer >= 100") //
+                .as(Encoders.DOUBLE());
+        assertThat(df.collectAsList(), contains(3.14, 2.72, 13.37));
+    }
+
+    @Test
+    void testStartsWith() {
+        final Dataset<String> df = getSparkDataframe() //
+                .select("column_string") //
+                .filter("column_string LIKE 'o%'") //
+                .as(Encoders.STRING());
+        final List<String> rows = df.collectAsList();
+        assertAll(() -> assertThat(rows.size(), equalTo(1)), //
+                () -> assertThat(rows, contains("one")), //
+                () -> assertThat(df.queryExecution().toString(), containsString("LIKE o%")), //
+                () -> assertThat(this.capturingLogHandler.getCapturedData(), containsString(
+                        "WHERE (\"column_string\" IS NOT NULL) AND (\"column_string\" LIKE 'o%' ESCAPE '\\')")));
+    }
+
+    @Test
+    void testStringContains() {
+        final Dataset<String> df = getSparkDataframe() //
+                .select("column_string") //
+                .filter("column_string LIKE '%n%'") //
+                .as(Encoders.STRING());
+        assertThat(df.collectAsList(), contains("one"));
+    }
+
+    @Test
+    void testStringEndsWith() {
+        final Dataset<Integer> df = getSparkDataframe() //
+                .select("column_integer") //
+                .filter("column_string LIKE '%ee'") //
+                .as(Encoders.INT());
+        assertThat(df.collectAsList(), contains(1337));
+    }
+
+    private static final Table escapedStringsTable = exasolSchema //
+            .createTableBuilder("table_for_predicate_pushdown_escaped_strings") //
+            .column("column_integer", "DECIMAL(9,0)") //
+            .column("column_string", "VARCHAR(30)") //
+            .build() //
+            .insert("1", "unders\\corewildcard") //
+            .insert("2", "%underscore_wild%card%") //
+            .insert("3", "underscoreXwildcard") //
+            .insert("4", "contains'singlequote") //
+            .insert("5", "escaped\\_underscore");
+
+    private static final Stream<Arguments> stringFilters() {
+        return Stream.of(//
+                Arguments.of(functions.col("column_string").startsWith("%under"), 2), //
+                Arguments.of(functions.col("column_string").contains("e_wild%"), 2), //
+                Arguments.of(functions.col("column_string").endsWith("card%"), 2), //
+                Arguments.of(functions.col("column_string").contains("s\\cor"), 1), //
+                Arguments.of(functions.col("column_string").contains("ains'sing"), 4), //
+                Arguments.of(functions.col("column_string").contains("d\\_"), 5) //
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("stringFilters")
+    void testPredicateStringLiteralsEscaped(final Column column, final int id) {
+        final Dataset<Integer> df = spark.read() //
+                .format("exasol-s3") //
+                .option("table", escapedStringsTable.getFullyQualifiedName()) //
+                .options(getSparkOptions()) //
+                .load() //
+                .filter(column) //
+                .select("column_integer") //
+                .as(Encoders.INT());
+        assertThat(df.collectAsList(), contains(id));
+    }
+
+    @Test
+    void testNonPushedFiltersAreRunPostScan() {
+        final Dataset<Row> df = getSparkDataframe() //
+                .select("column_string", "column_integer", "column_boolean") //
+                .filter(functions.col("column_string").eqNullSafe("one")) // not pushed, should be filtered after scan
+                .filter(functions.col("column_double").gt(0.0));
+        assertThat(df.collectAsList(), contains(RowFactory.create("one", 314, false)));
+    }
+
+    @Test
+    void testMultipleFilters() {
+        final Dataset<Row> df = getSparkDataframe() //
+                .select("column_string", "column_boolean") //
+                .filter(functions.col("column_boolean").equalTo(false)) //
+                .filter(functions.col("column_double").gt(0.00));
+        assertThat(df.collectAsList(), contains(RowFactory.create("one", false), RowFactory.create("two", false)));
+    }
+
+}
diff --git a/parent-pom/pom.xml b/parent-pom/pom.xml
index 437959c0..fa430c4d 100644
--- a/parent-pom/pom.xml
+++ b/parent-pom/pom.xml
@@ -15,11 +15,12 @@
     pk_generated_parent.pom
-  2.0.0
+  2.1.0
   8
   2.20.0
-  5.9.3
+  5.10.0
   5.4.0
+  3.1.3
@@ -46,7 +47,7 @@
      <dependency>
        <groupId>com.exasol</groupId>
        <artifactId>spark-connector-common-java</artifactId>
-        <version>1.1.1</version>
+        <version>2.0.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
@@ -206,7 +207,7 @@
      <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
-        <version>4.1.94.Final</version>
+        <version>4.1.96.Final</version>
        <scope>provided</scope>
      </dependency>
@@ -351,6 +352,12 @@
        <version>3.4.2</version>
        <scope>test</scope>
      </dependency>
+      <dependency>
+        <groupId>com.exasol</groupId>
+        <artifactId>java-util-logging-testing</artifactId>
+        <version>2.0.3</version>
+        <scope>test</scope>
+      </dependency>
      <dependency>
        <groupId>com.exasol</groupId>
        <artifactId>hamcrest-resultset-matcher</artifactId>
@@ -485,7 +492,6 @@
        2.13
        3.3.4
        2.14.2
-        3.1.2
        exasol-jdbc
@@ -501,7 +507,6 @@
        2.12
        3.3.4
        2.14.2
-        3.1.2
        exasol-jdbc
@@ -521,7 +526,6 @@
             Scala module 2.13.4 requires Jackson Databind version >= 2.13.0 and < 2.14.0 -->
        2.13.4.2
-        3.1.2
        exasol-jdbc
@@ -536,7 +540,6 @@
        2.12
        3.3.2
        2.13.4.2
-        3.1.2
        exasol-jdbc