diff --git a/flint/docs/index.md b/flint/docs/index.md
index 0a6453d999..27c68bcc74 100644
--- a/flint/docs/index.md
+++ b/flint/docs/index.md
@@ -17,11 +17,11 @@ A Flint index is ...
Please see the following example, in which the Index Building Logic and Query Rewrite Logic columns show the basic idea behind each skipping index implementation.
-| Skipping Index | Create Index Statement (TBD) | Index Building Logic | Query Rewrite Logic |
-|----------------|-------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Skipping Index | Create Index Statement | Index Building Logic | Query Rewrite Logic |
+|----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Partition | CREATE SKIPPING INDEX ON alb_logs FOR COLUMNS ( year PARTITION, month PARTITION, day PARTITION, hour PARTITION ) | INSERT INTO flint_alb_logs_skipping_index SELECT FIRST(year) AS year, FIRST(month) AS month, FIRST(day) AS day, FIRST(hour) AS hour, input_file_name() AS file_path FROM alb_logs GROUP BY input_file_name() | SELECT * FROM alb_logs WHERE year = 2023 AND month = 4 => SELECT * FROM alb_logs (input_files = SELECT file_path FROM flint_alb_logs_skipping_index WHERE year = 2023 AND month = 4 ) WHERE year = 2023 AND month = 4 |
-| ValueSet | CREATE SKIPPING INDEX ON alb_logs FOR COLUMNS ( elb_status_code VALUE_LIST ) | INSERT INTO flint_alb_logs_skipping_index SELECT COLLECT_SET(elb_status_code) AS elb_status_code, input_file_name() AS file_path FROM alb_logs GROUP BY input_file_name() | SELECT * FROM alb_logs WHERE elb_status_code = 404 => SELECT * FROM alb_logs (input_files = SELECT file_path FROM flint_alb_logs_skipping_index WHERE ARRAY_CONTAINS(elb_status_code, 404) ) WHERE elb_status_code = 404 |
-| Min-Max | CREATE SKIPPING INDEX ON alb_logs FOR COLUMNS ( request_processing_time MIN_MAX ) | INSERT INTO flint_alb_logs_skipping_index SELECT MIN(request_processing_time) AS request_processing_time_min, MAX(request_processing_time) AS request_processing_time_max, input_file_name() AS file_path FROM alb_logs GROUP BY input_file_name() | SELECT * FROM alb_logs WHERE request_processing_time = 100 => SELECT * FROM alb_logs (input_files = SELECT file_path FROM flint_alb_logs_skipping_index WHERE request_processing_time_min <= 100 AND 100 <= request_processing_time_max ) WHERE request_processing_time = 100
+| ValueSet | CREATE SKIPPING INDEX ON alb_logs FOR COLUMNS ( elb_status_code VALUE_SET ) | INSERT INTO flint_alb_logs_skipping_index SELECT COLLECT_SET(elb_status_code) AS elb_status_code, input_file_name() AS file_path FROM alb_logs GROUP BY input_file_name() | SELECT * FROM alb_logs WHERE elb_status_code = 404 => SELECT * FROM alb_logs (input_files = SELECT file_path FROM flint_alb_logs_skipping_index WHERE ARRAY_CONTAINS(elb_status_code, 404) ) WHERE elb_status_code = 404 |
+| MinMax | CREATE SKIPPING INDEX ON alb_logs FOR COLUMNS ( request_processing_time MIN_MAX ) | INSERT INTO flint_alb_logs_skipping_index SELECT MIN(request_processing_time) AS request_processing_time_min, MAX(request_processing_time) AS request_processing_time_max, input_file_name() AS file_path FROM alb_logs GROUP BY input_file_name() | SELECT * FROM alb_logs WHERE request_processing_time = 100 => SELECT * FROM alb_logs (input_files = SELECT file_path FROM flint_alb_logs_skipping_index WHERE request_processing_time_min <= 100 AND 100 <= request_processing_time_max ) WHERE request_processing_time = 100
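+
+As an illustration only (not part of the Flint implementation; `FileStat` and the sample values are hypothetical), the sketch below shows the file-pruning idea behind the MinMax rewrite above: only files whose min/max range can contain the predicate value need to be scanned.
+
+```scala
+// Hypothetical per-file statistics, as produced by the index building query above
+case class FileStat(filePath: String, min: Int, max: Int)
+
+val stats = Seq(
+  FileStat("s3://alb_logs/part-0000", min = 10, max = 50),
+  FileStat("s3://alb_logs/part-0001", min = 80, max = 200))
+
+// Equivalent of: SELECT file_path FROM skipping_index WHERE min <= 100 AND 100 <= max
+val inputFiles = stats.filter(s => s.min <= 100 && 100 <= s.max).map(_.filePath)
+// => Seq("s3://alb_logs/part-0001")
+```
+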
### Flint Index Specification
@@ -122,8 +122,11 @@ DDL statement:
```sql
CREATE SKIPPING INDEX
ON <object>
-FOR COLUMNS ( column <index_type> [, ...] )
+( column <index_type> [, ...] )
WHERE <filter_predicate>
+WITH (auto_refresh = (true|false))
+
+REFRESH SKIPPING INDEX ON <object>
DESCRIBE SKIPPING INDEX ON <object>
@@ -135,23 +138,20 @@ DROP SKIPPING INDEX ON
Skipping index type:
```sql
-<index_type> ::= { <bloom_filter>, <min_max>, <value_set> }
-
-<bloom_filter> ::= BLOOM_FILTER( bitCount, numOfHashFunctions ) #TBD
-<min_max> ::= MIN_MAX
-<value_set> ::= VALUE_SET
+<index_type> ::= { PARTITION, VALUE_SET, MIN_MAX }
```
Example:
```sql
CREATE SKIPPING INDEX ON alb_logs
-FOR COLUMNS (
- client_ip BLOOM_FILTER,
+(
elb_status_code VALUE_SET
)
WHERE time > '2023-04-01 00:00:00'
+REFRESH SKIPPING INDEX ON alb_logs
+
DESCRIBE SKIPPING INDEX ON alb_logs
DROP SKIPPING INDEX ON alb_logs
@@ -258,7 +258,6 @@ flint.skippingIndex()
.addPartitions("year", "month", "day")
.addValueSet("elb_status_code")
.addMinMax("request_processing_time")
- .addBloomFilter("client_ip")
.create()
flint.refresh("flint_alb_logs_skipping_index", FULL)
@@ -302,3 +301,7 @@ val df = new SQLContext(sc).read
## Benchmarks
TODO
+
+## Limitations
+
+Manually refreshing a table that already has a skipping index being auto-refreshed is prevented. However, this relies on the incremental refresh job actively running in the same Spark cluster, so that it can be detected when the check is performed.
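+
+For example, using the `FlintSpark` API from this change (the index name below is illustrative), a manual full refresh of an index that is already being incrementally refreshed fails:
+
+```scala
+// Start incremental (streaming) refresh of the skipping index
+flint.refreshIndex("flint_alb_logs_skipping_index", RefreshMode.INCREMENTAL)
+
+// Rejected while the streaming job is still active
+flint.refreshIndex("flint_alb_logs_skipping_index", RefreshMode.FULL) // throws IllegalStateException
+```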
diff --git a/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
index 75f71f3ead..0ee976cb75 100644
--- a/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
+++ b/flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -19,10 +19,22 @@ statement
;
skippingIndexStatement
- : describeSkippingIndexStatement
+ : createSkippingIndexStatement
+ | refreshSkippingIndexStatement
+ | describeSkippingIndexStatement
| dropSkippingIndexStatement
;
+createSkippingIndexStatement
+ : CREATE SKIPPING INDEX ON tableName=multipartIdentifier
+ LEFT_PAREN indexColTypeList RIGHT_PAREN
+ (WITH LEFT_PAREN propertyList RIGHT_PAREN)?
+ ;
+
+refreshSkippingIndexStatement
+ : REFRESH SKIPPING INDEX ON tableName=multipartIdentifier
+ ;
+
describeSkippingIndexStatement
: (DESC | DESCRIBE) SKIPPING INDEX ON tableName=multipartIdentifier
;
@@ -30,3 +42,11 @@ describeSkippingIndexStatement
dropSkippingIndexStatement
: DROP SKIPPING INDEX ON tableName=multipartIdentifier
;
+
+indexColTypeList
+ : indexColType (COMMA indexColType)*
+ ;
+
+indexColType
+ : identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
+ ;
\ No newline at end of file
diff --git a/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
index 6836b7cacd..a777cc59fe 100644
--- a/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
+++ b/flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -85,6 +85,31 @@ grammar SparkSqlBase;
}
+propertyList
+ : property (COMMA property)*
+ ;
+
+property
+ : key=propertyKey (EQ? value=propertyValue)?
+ ;
+
+propertyKey
+ : identifier (DOT identifier)*
+ | STRING
+ ;
+
+propertyValue
+ : INTEGER_VALUE
+ | DECIMAL_VALUE
+ | booleanValue
+ | STRING
+ ;
+
+booleanValue
+ : TRUE | FALSE
+ ;
+
+
multipartIdentifier
: parts+=identifier (DOT parts+=identifier)*
;
@@ -106,20 +131,46 @@ nonReserved
// Flint lexical tokens
-SKIPPING : 'SKIPPING';
+MIN_MAX: 'MIN_MAX';
+SKIPPING: 'SKIPPING';
+VALUE_SET: 'VALUE_SET';
// Spark lexical tokens
SEMICOLON: ';';
+LEFT_PAREN: '(';
+RIGHT_PAREN: ')';
+COMMA: ',';
+DOT: '.';
+
+
+CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
-DOT: '.';
DROP: 'DROP';
+FALSE: 'FALSE';
INDEX: 'INDEX';
-MINUS: '-';
ON: 'ON';
+PARTITION: 'PARTITION';
+REFRESH: 'REFRESH';
+STRING: 'STRING';
+TRUE: 'TRUE';
+WITH: 'WITH';
+
+
+EQ : '=' | '==';
+MINUS: '-';
+
+
+INTEGER_VALUE
+ : DIGIT+
+ ;
+
+DECIMAL_VALUE
+ : DECIMAL_DIGITS {isValidDecimal()}?
+ ;
IDENTIFIER
: (LETTER | DIGIT | '_')+
@@ -129,6 +180,11 @@ BACKQUOTED_IDENTIFIER
: '`' ( ~'`' | '``' )* '`'
;
+fragment DECIMAL_DIGITS
+ : DIGIT+ '.' DIGIT*
+ | '.' DIGIT+
+ ;
+
fragment DIGIT
: [0-9]
;
diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 635cbae50a..b9305f9b17 100644
--- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -18,7 +18,7 @@ import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.opensearch.flint.spark.skipping.{FlintSparkSkippingIndex, FlintSparkSkippingStrategy}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.{SkippingKind, SkippingKindSerializer}
-import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MinMax, Partition, ValuesSet}
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy
@@ -101,6 +101,9 @@ class FlintSpark(val spark: SparkSession) {
}
mode match {
+ case FULL if isIncrementalRefreshing(indexName) =>
+ throw new IllegalStateException(
+          s"Index $indexName is being incrementally refreshed and cannot be manually refreshed")
case FULL =>
writeFlintIndex(
spark.read
@@ -113,6 +116,7 @@ class FlintSpark(val spark: SparkSession) {
val job = spark.readStream
.table(tableName)
.writeStream
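+          // Name the streaming query after the index so the active incremental refresh job can be identified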
+ .queryName(indexName)
.outputMode(Append())
.foreachBatch { (batchDF: DataFrame, _: Long) =>
writeFlintIndex(batchDF)
@@ -156,6 +160,9 @@ class FlintSpark(val spark: SparkSession) {
}
}
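+  // An index is considered incrementally refreshing if an active streaming query bears its name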
+ private def isIncrementalRefreshing(indexName: String): Boolean =
+ spark.streams.active.exists(_.name == indexName)
+
// TODO: Remove all parsing logic below once Flint spec finalized and FlintMetadata strong typed
private def getSourceTableName(index: FlintSparkIndex): String = {
val json = parse(index.metadata().getContent)
@@ -182,11 +189,11 @@ class FlintSpark(val spark: SparkSession) {
val columnType = (colInfo \ "columnType").extract[String]
skippingKind match {
- case Partition =>
+ case PARTITION =>
PartitionSkippingStrategy(columnName = columnName, columnType = columnType)
- case ValuesSet =>
+ case VALUE_SET =>
ValueSetSkippingStrategy(columnName = columnName, columnType = columnType)
- case MinMax =>
+ case MIN_MAX =>
MinMaxSkippingStrategy(columnName = columnName, columnType = columnType)
case other =>
throw new IllegalStateException(s"Unknown skipping strategy: $other")
diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala
index 678a01bed5..61721481de 100644
--- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala
@@ -65,7 +65,7 @@ object FlintSparkSkippingStrategy {
type SkippingKind = Value
-    // Use Value[s]Set because ValueSet already exists in Enumeration
-    val Partition, ValuesSet, MinMax = Value
+    // Use uppercase values matching the SQL index type keywords (PARTITION, VALUE_SET, MIN_MAX)
+    val PARTITION, VALUE_SET, MIN_MAX = Value
}
/** json4s doesn't serialize Enum by default */
diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/minmax/MinMaxSkippingStrategy.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/minmax/MinMaxSkippingStrategy.scala
index d141104cbd..779148c205 100644
--- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/minmax/MinMaxSkippingStrategy.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/minmax/MinMaxSkippingStrategy.scala
@@ -6,7 +6,7 @@
package org.opensearch.flint.spark.skipping.minmax
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
-import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MinMax, SkippingKind}
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, SkippingKind}
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate}
@@ -17,7 +17,7 @@ import org.apache.spark.sql.functions.col
* Skipping strategy based on min-max boundary of column values.
*/
case class MinMaxSkippingStrategy(
- override val kind: SkippingKind = MinMax,
+ override val kind: SkippingKind = MIN_MAX,
override val columnName: String,
override val columnType: String)
extends FlintSparkSkippingStrategy {
diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/partition/PartitionSkippingStrategy.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/partition/PartitionSkippingStrategy.scala
index c3ed33bb34..8be602b212 100644
--- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/partition/PartitionSkippingStrategy.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/partition/PartitionSkippingStrategy.scala
@@ -6,7 +6,7 @@
package org.opensearch.flint.spark.skipping.partition
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
-import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{Partition, SkippingKind}
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{PARTITION, SkippingKind}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, First}
@@ -16,7 +16,7 @@ import org.apache.spark.sql.functions.col
* Skipping strategy for partitioned columns of source table.
*/
case class PartitionSkippingStrategy(
- override val kind: SkippingKind = Partition,
+ override val kind: SkippingKind = PARTITION,
override val columnName: String,
override val columnType: String)
extends FlintSparkSkippingStrategy {
diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala
index 94c6c4989d..389ddc7dc0 100644
--- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala
@@ -6,7 +6,7 @@
package org.opensearch.flint.spark.skipping.valueset
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
-import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, ValuesSet}
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, VALUE_SET}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, CollectSet}
@@ -16,7 +16,7 @@ import org.apache.spark.sql.functions.col
* Skipping strategy based on unique column value set.
*/
case class ValueSetSkippingStrategy(
- override val kind: SkippingKind = ValuesSet,
+ override val kind: SkippingKind = VALUE_SET,
override val columnName: String,
override val columnType: String)
extends FlintSparkSkippingStrategy {
diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
index 4560b60b65..64d57dd5bf 100644
--- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
@@ -5,9 +5,12 @@
package org.opensearch.flint.spark.sql
+import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
-import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{DescribeSkippingIndexStatementContext, DropSkippingIndexStatementContext}
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
+import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -19,6 +22,41 @@ import org.apache.spark.sql.types.StringType
*/
class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command] {
+ override def visitCreateSkippingIndexStatement(
+ ctx: CreateSkippingIndexStatementContext): Command =
+ FlintSparkSqlCommand() { flint =>
+ // Create skipping index
+ val indexBuilder = flint
+ .skippingIndex()
+ .onTable(ctx.tableName.getText)
+
+ ctx.indexColTypeList().indexColType().forEach { colTypeCtx =>
+ val colName = colTypeCtx.identifier().getText
+ val skipType = SkippingKind.withName(colTypeCtx.skipType.getText)
+ skipType match {
+ case PARTITION => indexBuilder.addPartitions(colName)
+ case VALUE_SET => indexBuilder.addValueSet(colName)
+ case MIN_MAX => indexBuilder.addMinMax(colName)
+ }
+ }
+ indexBuilder.create()
+
+ // Trigger auto refresh if enabled
+ if (isAutoRefreshEnabled(ctx.propertyList())) {
+ val indexName = getSkippingIndexName(ctx.tableName.getText)
+ flint.refreshIndex(indexName, RefreshMode.INCREMENTAL)
+ }
+ Seq.empty
+ }
+
+ override def visitRefreshSkippingIndexStatement(
+ ctx: RefreshSkippingIndexStatementContext): Command =
+ FlintSparkSqlCommand() { flint =>
+ val indexName = getSkippingIndexName(ctx.tableName.getText)
+ flint.refreshIndex(indexName, RefreshMode.FULL)
+ Seq.empty
+ }
+
override def visitDescribeSkippingIndexStatement(
ctx: DescribeSkippingIndexStatementContext): Command = {
val outputSchema = Seq(
@@ -46,6 +84,21 @@ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command
Seq.empty
}
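+  // Auto refresh is enabled only when the optional WITH (...) clause sets auto_refresh = true; a missing property list defaults to false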
+ private def isAutoRefreshEnabled(ctx: PropertyListContext): Boolean = {
+ if (ctx == null) {
+ false
+ } else {
+ ctx
+ .property()
+ .forEach(p => {
+ if (p.key.getText == "auto_refresh") {
+ return p.value.getText.toBoolean
+ }
+ })
+ false
+ }
+ }
+
override def aggregateResult(aggregate: Command, nextResult: Command): Command =
if (nextResult != null) nextResult else aggregate
}
diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index 068100e814..2da0f0d4de 100644
--- a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -108,22 +108,22 @@ class FlintSparkSkippingIndexITSuite
| "kind": "skipping",
| "indexedColumns": [
| {
- | "kind": "Partition",
+ | "kind": "PARTITION",
| "columnName": "year",
| "columnType": "int"
| },
| {
- | "kind": "Partition",
+ | "kind": "PARTITION",
| "columnName": "month",
| "columnType": "int"
| },
| {
- | "kind": "ValuesSet",
+ | "kind": "VALUE_SET",
| "columnName": "address",
| "columnType": "string"
| },
| {
- | "kind": "MinMax",
+ | "kind": "MIN_MAX",
| "columnName": "age",
| "columnType": "int"
| }],
@@ -201,6 +201,24 @@ class FlintSparkSkippingIndexITSuite
indexData should have size 2
}
+  test("should fail to manually refresh an incrementally refreshing index") {
+ flint
+ .skippingIndex()
+ .onTable(testTable)
+ .addPartitions("year", "month")
+ .create()
+
+ val jobId = flint.refreshIndex(testIndex, INCREMENTAL)
+ val job = spark.streams.get(jobId.get)
+ failAfter(streamingTimeout) {
+ job.processAllAvailable()
+ }
+
+ assertThrows[IllegalStateException] {
+ flint.refreshIndex(testIndex, FULL)
+ }
+ }
+
test("can have only 1 skipping index on a table") {
flint
.skippingIndex()
diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlITSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlITSuite.scala
new file mode 100644
index 0000000000..4f5ca98fa8
--- /dev/null
+++ b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlITSuite.scala
@@ -0,0 +1,156 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+import scala.Option.empty
+
+import org.opensearch.flint.OpenSearchSuite
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
+import org.scalatest.matchers.must.Matchers.defined
+import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
+
+import org.apache.spark.FlintSuite
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
+import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
+import org.apache.spark.sql.streaming.StreamTest
+
+class FlintSparkSqlITSuite
+ extends QueryTest
+ with FlintSuite
+ with OpenSearchSuite
+ with StreamTest {
+
+ /** Flint Spark high level API for assertion */
+ private lazy val flint: FlintSpark = new FlintSpark(spark)
+
+ /** Test table and index name */
+ private val testTable = "flint_sql_test"
+ private val testIndex = getSkippingIndexName(testTable)
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+    // Configure for the FlintSpark created explicitly above and the one behind Flint SQL
+ setFlintSparkConf(HOST_ENDPOINT, openSearchHost)
+ setFlintSparkConf(HOST_PORT, openSearchPort)
+ setFlintSparkConf(REFRESH_POLICY, true)
+
+ // Create test table
+ sql(s"""
+ | CREATE TABLE $testTable
+ | (
+ | name STRING,
+ | age INT
+ | )
+ | USING CSV
+ | OPTIONS (
+ | header 'false',
+ | delimiter '\t'
+ | )
+ | PARTITIONED BY (
+ | year INT,
+ | month INT
+ | )
+ |""".stripMargin)
+
+ sql(s"""
+ | INSERT INTO $testTable
+ | PARTITION (year=2023, month=4)
+ | VALUES ('Hello', 30)
+ | """.stripMargin)
+ }
+
+ protected override def beforeEach(): Unit = {
+ super.beforeEach()
+ flint
+ .skippingIndex()
+ .onTable(testTable)
+ .addPartitions("year")
+ .addValueSet("name")
+ .addMinMax("age")
+ .create()
+ }
+
+ protected override def afterEach(): Unit = {
+ super.afterEach()
+ flint.deleteIndex(testIndex)
+
+ // Stop all streaming jobs if any
+ spark.streams.active.foreach { job =>
+ job.stop()
+ job.awaitTermination()
+ }
+ }
+
+ test("create skipping index with auto refresh") {
+ flint.deleteIndex(testIndex)
+ sql(s"""
+ | CREATE SKIPPING INDEX ON $testTable
+ | (
+ | year PARTITION,
+ | name VALUE_SET,
+ | age MIN_MAX
+ | )
+ | WITH (auto_refresh = true)
+ | """.stripMargin)
+
+    // Wait for the streaming job to complete the current micro batch
+ val job = spark.streams.active.find(_.name == testIndex)
+ job shouldBe defined
+ failAfter(streamingTimeout) {
+ job.get.processAllAvailable()
+ }
+
+ val indexData = spark.read.format(FLINT_DATASOURCE).load(testIndex)
+ flint.describeIndex(testIndex) shouldBe defined
+ indexData.count() shouldBe 1
+ }
+
+ test("create skipping index with manual refresh") {
+ flint.deleteIndex(testIndex)
+ sql(s"""
+ | CREATE SKIPPING INDEX ON $testTable
+ | (
+ | year PARTITION,
+ | name VALUE_SET,
+ | age MIN_MAX
+ | )
+ | """.stripMargin)
+
+ val indexData = spark.read.format(FLINT_DATASOURCE).load(testIndex)
+
+ flint.describeIndex(testIndex) shouldBe defined
+ indexData.count() shouldBe 0
+
+ sql(s"REFRESH SKIPPING INDEX ON $testTable")
+ indexData.count() shouldBe 1
+ }
+
+ test("describe skipping index") {
+ val result = sql(s"DESC SKIPPING INDEX ON $testTable")
+
+ checkAnswer(
+ result,
+ Seq(
+ Row("year", "int", "PARTITION"),
+ Row("name", "string", "VALUE_SET"),
+ Row("age", "int", "MIN_MAX")))
+ }
+
+ test("should return empty if no skipping index to describe") {
+ flint.deleteIndex(testIndex)
+
+ val result = sql(s"DESC SKIPPING INDEX ON $testTable")
+ checkAnswer(result, Seq.empty)
+ }
+
+ test("drop skipping index") {
+ sql(s"DROP SKIPPING INDEX ON $testTable")
+
+ flint.describeIndex(testIndex) shouldBe empty
+ }
+}
diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala
deleted file mode 100644
index f45be95325..0000000000
--- a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSqlSuite.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.flint.spark
-
-import scala.Option.empty
-
-import org.opensearch.flint.OpenSearchSuite
-import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
-import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
-
-import org.apache.spark.FlintSuite
-import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT}
-
-class FlintSparkSqlSuite extends QueryTest with FlintSuite with OpenSearchSuite {
-
- /** Flint Spark high level API for assertion */
- private lazy val flint: FlintSpark = {
- setFlintSparkConf(HOST_ENDPOINT, openSearchHost)
- setFlintSparkConf(HOST_PORT, openSearchPort)
- new FlintSpark(spark)
- }
-
- /** Test table and index name */
- private val testTable = "flint_sql_test"
- private val testIndex = getSkippingIndexName(testTable)
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- sql(s"""
- | CREATE TABLE $testTable
- | (
- | name STRING,
- | age INT
- | )
- | USING CSV
- | OPTIONS (
- | header 'false',
- | delimiter '\t'
- | )
- | PARTITIONED BY (
- | year INT,
- | month INT
- | )
- |""".stripMargin)
- }
-
- protected override def beforeEach(): Unit = {
- super.beforeEach()
- flint
- .skippingIndex()
- .onTable(testTable)
- .addPartitions("year")
- .addValueSet("name")
- .addMinMax("age")
- .create()
- }
-
- protected override def afterEach(): Unit = {
- super.afterEach()
- flint.deleteIndex(testIndex)
- }
-
- test("describe skipping index") {
- val result = sql(s"DESC SKIPPING INDEX ON $testTable")
-
- checkAnswer(
- result,
- Seq(
- Row("year", "int", "Partition"),
- Row("name", "string", "ValuesSet"),
- Row("age", "int", "MinMax")))
- }
-
- test("should return empty if no skipping index to describe") {
- flint.deleteIndex(testIndex)
-
- val result = sql(s"DESC SKIPPING INDEX ON $testTable")
- checkAnswer(result, Seq.empty)
- }
-
- test("drop skipping index") {
- sql(s"DROP SKIPPING INDEX ON $testTable")
-
- flint.describeIndex(testIndex) shouldBe empty
- }
-}