From bc8630d1a41ebce89eb43f72773eb6577350117a Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 28 Jun 2023 15:14:28 -0700 Subject: [PATCH] Add create and refresh index Signed-off-by: Chen Dai --- .../main/antlr4/FlintSparkSqlExtensions.g4 | 21 ++++++++- .../src/main/antlr4/SparkSqlBase.g4 | 16 +++++-- .../spark/sql/FlintSparkSqlAstBuilder.scala | 34 ++++++++++++++- .../flint/spark/FlintSparkSqlSuite.scala | 43 ++++++++++++++++--- 4 files changed, 103 insertions(+), 11 deletions(-) diff --git a/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index 75f71f3ead..f696608112 100644 --- a/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -19,10 +19,21 @@ statement ; skippingIndexStatement - : describeSkippingIndexStatement + : createSkippingIndexStatement + | refreshSkippingIndexStatement + | describeSkippingIndexStatement | dropSkippingIndexStatement ; +createSkippingIndexStatement + : CREATE SKIPPING INDEX ON tableName=multipartIdentifier + LEFT_PAREN indexColTypeList RIGHT_PAREN + ; + +refreshSkippingIndexStatement + : REFRESH SKIPPING INDEX ON tableName=multipartIdentifier + ; + describeSkippingIndexStatement : (DESC | DESCRIBE) SKIPPING INDEX ON tableName=multipartIdentifier ; @@ -30,3 +41,11 @@ describeSkippingIndexStatement dropSkippingIndexStatement : DROP SKIPPING INDEX ON tableName=multipartIdentifier ; + +indexColTypeList + : indexColType (COMMA indexColType)* + ; + +indexColType + : identifier skipType=(PARTITION | VALUE_SET | MIN_MAX) + ; \ No newline at end of file diff --git a/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 index 6836b7cacd..79ae7125bd 100644 --- a/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 +++ b/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 @@ -106,20 +106,30 @@ nonReserved // Flint lexical tokens -SKIPPING : 'SKIPPING'; +MIN_MAX: 'MIN_MAX'; +SKIPPING: 'SKIPPING'; +VALUE_SET: 'VALUE_SET'; // Spark lexical tokens SEMICOLON: ';'; +LEFT_PAREN: '('; +RIGHT_PAREN: ')'; +COMMA: ','; +DOT: '.'; + +CREATE: 'CREATE'; DESC: 'DESC'; DESCRIBE: 'DESCRIBE'; -DOT: '.'; DROP: 'DROP'; INDEX: 'INDEX'; -MINUS: '-'; ON: 'ON'; +PARTITION: 'PARTITION'; +REFRESH: 'REFRESH'; + +MINUS: '-'; IDENTIFIER : (LETTER | DIGIT | '_')+ diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala index 4560b60b65..4785261c84 100644 --- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala +++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala @@ -5,9 +5,12 @@ package org.opensearch.flint.spark.sql +import java.util.Locale + +import org.opensearch.flint.spark.FlintSpark.RefreshMode import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName -import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{DescribeSkippingIndexStatementContext, DropSkippingIndexStatementContext} +import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateSkippingIndexStatementContext, DescribeSkippingIndexStatementContext, DropSkippingIndexStatementContext, RefreshSkippingIndexStatementContext} import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.AttributeReference @@ -19,6 +22,35 @@ import org.apache.spark.sql.types.StringType */ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command] { + override def visitCreateSkippingIndexStatement( + ctx: CreateSkippingIndexStatementContext): Command = + FlintSparkSqlCommand() { flint => + val indexBuilder = flint + .skippingIndex() + .onTable(ctx.tableName.getText) + + ctx.indexColTypeList().indexColType().forEach { colTypeCtx => + val colName = colTypeCtx.identifier().getText + val skipType = colTypeCtx.skipType.getText.toLowerCase(Locale.ROOT) + + skipType match { + case "partition" => indexBuilder.addPartitions(colName) + case "value_set" => indexBuilder.addValueSet(colName) + case "min_max" => indexBuilder.addMinMax(colName) + } + } + indexBuilder.create() + Seq.empty + } + + override def visitRefreshSkippingIndexStatement( + ctx: RefreshSkippingIndexStatementContext): Command = + FlintSparkSqlCommand() { flint => + val indexName = getSkippingIndexName(ctx.tableName.getText) + flint.refreshIndex(indexName, RefreshMode.FULL) + Seq.empty + } + override def visitDescribeSkippingIndexStatement( ctx: DescribeSkippingIndexStatementContext): Command = { val outputSchema = Seq( diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala index f45be95325..9a358ef280 100644 --- a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala +++ b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala @@ -9,20 +9,18 @@ import scala.Option.empty import org.opensearch.flint.OpenSearchSuite import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName +import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import org.apache.spark.FlintSuite import org.apache.spark.sql.{QueryTest, Row} -import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT} +import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE +import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY} class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite { /** Flint Spark high level API for assertion */ - private lazy val flint: FlintSpark = { - setFlintSparkConf(HOST_ENDPOINT, openSearchHost) - setFlintSparkConf(HOST_PORT, openSearchPort) - new FlintSpark(spark) - } + private lazy val flint: FlintSpark = new FlintSpark(spark) /** Test table and index name */ private val testTable = "flint_sql_test" @@ -30,6 +28,13 @@ class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite override def beforeAll(): Unit = { super.beforeAll() + + // Configure for FlintSpark explicit created above and the one behind Flint SQL + setFlintSparkConf(HOST_ENDPOINT, openSearchHost) + setFlintSparkConf(HOST_PORT, openSearchPort) + setFlintSparkConf(REFRESH_POLICY, true) + + // Create test table sql(s""" | CREATE TABLE $testTable | ( @@ -46,6 +51,12 @@ class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite | month INT | ) |""".stripMargin) + + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=4) + | VALUES ('Hello', 30) + | """.stripMargin) } protected override def beforeEach(): Unit = { @@ -64,6 +75,26 @@ class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite flint.deleteIndex(testIndex) } + test("create skipping index with manual refresh") { + flint.deleteIndex(testIndex) + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( + | year PARTITION, + | name VALUE_SET, + | age MIN_MAX + | ) + | """.stripMargin) + + val indexData = spark.read.format(FLINT_DATASOURCE).options(openSearchOptions).load(testIndex) + + flint.describeIndex(testIndex) shouldBe defined + indexData.count() shouldBe 0 + + sql(s"REFRESH SKIPPING INDEX ON $testTable") + indexData.count() shouldBe 1 + } + test("describe skipping index") { val result = sql(s"DESC SKIPPING INDEX ON $testTable")