Add Flint create and refresh index SQL support (#1800)
* Add create and refresh index

Signed-off-by: Chen Dai <daichen@amazon.com>

* Rename skipping kind to match its name in SQL parser

Signed-off-by: Chen Dai <daichen@amazon.com>

* Support auto refresh property in WITH clause

Signed-off-by: Chen Dai <daichen@amazon.com>

* Update doc

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add check for manual refresh on incremental refresh index

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add limitation to doc

Signed-off-by: Chen Dai <daichen@amazon.com>

* Fix IT failure

Signed-off-by: Chen Dai <daichen@amazon.com>

---------

Signed-off-by: Chen Dai <daichen@amazon.com>
dai-chen authored Jul 12, 2023
1 parent b9eb0ea commit 22b9c0a
Showing 12 changed files with 346 additions and 123 deletions.
29 changes: 16 additions & 13 deletions flint/docs/index.md
@@ -17,11 +17,11 @@ A Flint index is ...

Please see the following example, in which the Index Building Logic and Query Rewrite Logic columns show the basic idea behind each skipping index implementation.

| Skipping Index | Create Index Statement (TBD) | Index Building Logic | Query Rewrite Logic |
|----------------|-------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Skipping Index | Create Index Statement | Index Building Logic | Query Rewrite Logic |
|----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Partition | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;year PARTITION,<br>&nbsp;&nbsp;month PARTITION,<br>&nbsp;&nbsp;day PARTITION,<br>&nbsp;&nbsp;hour PARTITION<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;FIRST(year) AS year,<br>&nbsp;&nbsp;FIRST(month) AS month,<br>&nbsp;&nbsp;FIRST(day) AS day,<br>&nbsp;&nbsp;FIRST(hour) AS hour,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE year = 2023 AND month = 4<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE year = 2023 AND month = 4<br>)<br>WHERE year = 2023 AND month = 4 |
| ValueSet | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;elb_status_code VALUE_LIST<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;COLLECT_SET(elb_status_code) AS elb_status_code,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE elb_status_code = 404<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE ARRAY_CONTAINS(elb_status_code, 404)<br>)<br>WHERE elb_status_code = 404 |
| Min-Max | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;request_processing_time MIN_MAX<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;MIN(request_processing_time) AS request_processing_time_min,<br>&nbsp;&nbsp;MAX(request_processing_time) AS request_processing_time_max,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE request_processing_time = 100<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br> SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE request_processing_time_min <= 100<br>&nbsp;&nbsp;&nbsp;&nbsp;AND 100 <= request_processing_time_max<br>)<br>WHERE request_processing_time = 100
| ValueSet | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;elb_status_code VALUE_SET<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;COLLECT_SET(elb_status_code) AS elb_status_code,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE elb_status_code = 404<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE ARRAY_CONTAINS(elb_status_code, 404)<br>)<br>WHERE elb_status_code = 404 |
| MinMax | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;request_processing_time MIN_MAX<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;MIN(request_processing_time) AS request_processing_time_min,<br>&nbsp;&nbsp;MAX(request_processing_time) AS request_processing_time_max,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE request_processing_time = 100<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br> SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE request_processing_time_min <= 100<br>&nbsp;&nbsp;&nbsp;&nbsp;AND 100 <= request_processing_time_max<br>)<br>WHERE request_processing_time = 100

### Flint Index Specification

@@ -122,8 +122,11 @@ DDL statement:
```sql
CREATE SKIPPING INDEX
ON <object>
FOR COLUMNS ( column <index_type> [, ...] )
( column <index_type> [, ...] )
WHERE <filter_predicate>
WITH (auto_refresh = (true|false))

REFRESH SKIPPING INDEX ON <object>

DESCRIBE SKIPPING INDEX ON <object>

@@ -135,23 +138,20 @@ DROP SKIPPING INDEX ON <object>
Skipping index type:

```sql
<index_type> ::= { <bloom_filter>, <min_max>, <value_set> }

<bloom_filter> ::= BLOOM_FILTER( bitCount, numOfHashFunctions ) #TBD
<min_max> ::= MIN_MAX
<value_set> ::= VALUE_SET
<index_type> ::= { PARTITION, VALUE_SET, MIN_MAX }
```

Example:

```sql
CREATE SKIPPING INDEX ON alb_logs
FOR COLUMNS (
client_ip BLOOM_FILTER,
(
elb_status_code VALUE_SET
)
WHERE time > '2023-04-01 00:00:00'

REFRESH SKIPPING INDEX ON alb_logs

DESCRIBE SKIPPING INDEX ON alb_logs

DROP SKIPPING INDEX ON alb_logs
@@ -258,7 +258,6 @@ flint.skippingIndex()
.addPartitions("year", "month", "day")
.addValueSet("elb_status_code")
.addMinMax("request_processing_time")
.addBloomFilter("client_ip")
.create()

flint.refresh("flint_alb_logs_skipping_index", FULL)
@@ -302,3 +301,7 @@ val df = new SQLContext(sc).read
## Benchmarks

TODO

## Limitations

Manually refreshing a table that already has a skipping index being auto-refreshed is prevented. However, this check relies on the incremental refresh job actively running in the same Spark cluster, where it can be identified at the time the check is performed.
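
As a sketch of the behavior described above, using the example table from earlier in this document (names are illustrative):

```sql
-- A skipping index kept up to date by an incremental (auto) refresh job
CREATE SKIPPING INDEX ON alb_logs
( elb_status_code VALUE_SET )
WITH (auto_refresh = true)

-- While that streaming job is active in the same Spark cluster,
-- a manual refresh of the same index is rejected
REFRESH SKIPPING INDEX ON alb_logs
```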
@@ -19,14 +19,34 @@ statement
;

skippingIndexStatement
: describeSkippingIndexStatement
: createSkippingIndexStatement
| refreshSkippingIndexStatement
| describeSkippingIndexStatement
| dropSkippingIndexStatement
;

createSkippingIndexStatement
: CREATE SKIPPING INDEX ON tableName=multipartIdentifier
LEFT_PAREN indexColTypeList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshSkippingIndexStatement
: REFRESH SKIPPING INDEX ON tableName=multipartIdentifier
;

describeSkippingIndexStatement
: (DESC | DESCRIBE) SKIPPING INDEX ON tableName=multipartIdentifier
;

dropSkippingIndexStatement
: DROP SKIPPING INDEX ON tableName=multipartIdentifier
;

indexColTypeList
: indexColType (COMMA indexColType)*
;

indexColType
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
;
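
For reference, statements along these lines (table and column names are illustrative) should match the new parser rules above:

```sql
-- createSkippingIndexStatement: column list in parentheses, optional WITH property list
CREATE SKIPPING INDEX ON alb_logs
( year PARTITION, elb_status_code VALUE_SET, request_processing_time MIN_MAX )
WITH (auto_refresh = true)

-- refreshSkippingIndexStatement
REFRESH SKIPPING INDEX ON alb_logs

-- describeSkippingIndexStatement accepts either DESC or DESCRIBE
DESC SKIPPING INDEX ON alb_logs

-- dropSkippingIndexStatement
DROP SKIPPING INDEX ON alb_logs
```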
62 changes: 59 additions & 3 deletions flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -85,6 +85,31 @@ grammar SparkSqlBase;
}


propertyList
: property (COMMA property)*
;

property
: key=propertyKey (EQ? value=propertyValue)?
;

propertyKey
: identifier (DOT identifier)*
| STRING
;

propertyValue
: INTEGER_VALUE
| DECIMAL_VALUE
| booleanValue
| STRING
;

booleanValue
: TRUE | FALSE
;


multipartIdentifier
: parts+=identifier (DOT parts+=identifier)*
;
@@ -106,20 +131,46 @@ nonReserved

// Flint lexical tokens

SKIPPING : 'SKIPPING';
MIN_MAX: 'MIN_MAX';
SKIPPING: 'SKIPPING';
VALUE_SET: 'VALUE_SET';


// Spark lexical tokens

SEMICOLON: ';';

LEFT_PAREN: '(';
RIGHT_PAREN: ')';
COMMA: ',';
DOT: '.';


CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DOT: '.';
DROP: 'DROP';
FALSE: 'FALSE';
INDEX: 'INDEX';
MINUS: '-';
ON: 'ON';
PARTITION: 'PARTITION';
REFRESH: 'REFRESH';
STRING: 'STRING';
TRUE: 'TRUE';
WITH: 'WITH';


EQ : '=' | '==';
MINUS: '-';


INTEGER_VALUE
: DIGIT+
;

DECIMAL_VALUE
: DECIMAL_DIGITS {isValidDecimal()}?
;

IDENTIFIER
: (LETTER | DIGIT | '_')+
@@ -129,6 +180,11 @@ BACKQUOTED_IDENTIFIER
: '`' ( ~'`' | '``' )* '`'
;

fragment DECIMAL_DIGITS
: DIGIT+ '.' DIGIT*
| '.' DIGIT+
;

fragment DIGIT
: [0-9]
;
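
As an illustration of the new propertyList rules, a WITH clause can carry forms like the following (only auto_refresh is used by this change; the other keys are hypothetical placeholders):

```sql
-- booleanValue property, as emitted by the DDL in this change
WITH (auto_refresh = true)

-- EQ is optional; keys may be dotted identifiers and values may be integer or decimal literals
WITH (some.hypothetical.option 42, another_hypothetical_option = 1.5)
```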
@@ -18,7 +18,7 @@ import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.opensearch.flint.spark.skipping.{FlintSparkSkippingIndex, FlintSparkSkippingStrategy}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.{SkippingKind, SkippingKindSerializer}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MinMax, Partition, ValuesSet}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy
@@ -101,6 +101,9 @@ class FlintSpark(val spark: SparkSession) {
}

mode match {
case FULL if isIncrementalRefreshing(indexName) =>
throw new IllegalStateException(
s"Index $indexName is incremental refreshing and cannot be manual refreshed")
case FULL =>
writeFlintIndex(
spark.read
@@ -113,6 +116,7 @@ class FlintSpark(val spark: SparkSession) {
val job = spark.readStream
.table(tableName)
.writeStream
.queryName(indexName)
.outputMode(Append())
.foreachBatch { (batchDF: DataFrame, _: Long) =>
writeFlintIndex(batchDF)
@@ -156,6 +160,9 @@ class FlintSpark(val spark: SparkSession) {
}
}

private def isIncrementalRefreshing(indexName: String): Boolean =
spark.streams.active.exists(_.name == indexName)

// TODO: Remove all parsing logic below once Flint spec finalized and FlintMetadata strong typed
private def getSourceTableName(index: FlintSparkIndex): String = {
val json = parse(index.metadata().getContent)
@@ -182,11 +189,11 @@
val columnType = (colInfo \ "columnType").extract[String]

skippingKind match {
case Partition =>
case PARTITION =>
PartitionSkippingStrategy(columnName = columnName, columnType = columnType)
case ValuesSet =>
case VALUE_SET =>
ValueSetSkippingStrategy(columnName = columnName, columnType = columnType)
case MinMax =>
case MIN_MAX =>
MinMaxSkippingStrategy(columnName = columnName, columnType = columnType)
case other =>
throw new IllegalStateException(s"Unknown skipping strategy: $other")
@@ -65,7 +65,7 @@ object FlintSparkSkippingStrategy {
type SkippingKind = Value

// Use Value[s]Set because ValueSet already exists in Enumeration
val Partition, ValuesSet, MinMax = Value
val PARTITION, VALUE_SET, MIN_MAX = Value
}

/** json4s doesn't serialize Enum by default */
@@ -6,7 +6,7 @@
package org.opensearch.flint.spark.skipping.minmax

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MinMax, SkippingKind}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, SkippingKind}

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate}
@@ -17,7 +17,7 @@ import org.apache.spark.sql.functions.col
* Skipping strategy based on min-max boundary of column values.
*/
case class MinMaxSkippingStrategy(
override val kind: SkippingKind = MinMax,
override val kind: SkippingKind = MIN_MAX,
override val columnName: String,
override val columnType: String)
extends FlintSparkSkippingStrategy {
@@ -6,7 +6,7 @@
package org.opensearch.flint.spark.skipping.partition

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{Partition, SkippingKind}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{PARTITION, SkippingKind}

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, First}
@@ -16,7 +16,7 @@ import org.apache.spark.sql.functions.col
* Skipping strategy for partitioned columns of source table.
*/
case class PartitionSkippingStrategy(
override val kind: SkippingKind = Partition,
override val kind: SkippingKind = PARTITION,
override val columnName: String,
override val columnType: String)
extends FlintSparkSkippingStrategy {
@@ -6,7 +6,7 @@
package org.opensearch.flint.spark.skipping.valueset

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, ValuesSet}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, VALUE_SET}

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, CollectSet}
@@ -16,7 +16,7 @@ import org.apache.spark.sql.functions.col
* Skipping strategy based on unique column value set.
*/
case class ValueSetSkippingStrategy(
override val kind: SkippingKind = ValuesSet,
override val kind: SkippingKind = VALUE_SET,
override val columnName: String,
override val columnType: String)
extends FlintSparkSkippingStrategy {