Add Flint create and refresh index SQL support #1800

Merged
25 changes: 12 additions & 13 deletions flint/docs/index.md
@@ -17,11 +17,11 @@ A Flint index is ...

The following table shows the basic idea behind each skipping index implementation: the Index Building Logic and Query Rewrite Logic columns illustrate how each index is built and how queries on the source table are rewritten to use it (a short usage sketch follows the table).

| Skipping Index | Create Index Statement (TBD) | Index Building Logic | Query Rewrite Logic |
|----------------|-------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Skipping Index | Create Index Statement | Index Building Logic | Query Rewrite Logic |
|----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Partition | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;year PARTITION,<br>&nbsp;&nbsp;month PARTITION,<br>&nbsp;&nbsp;day PARTITION,<br>&nbsp;&nbsp;hour PARTITION<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;FIRST(year) AS year,<br>&nbsp;&nbsp;FIRST(month) AS month,<br>&nbsp;&nbsp;FIRST(day) AS day,<br>&nbsp;&nbsp;FIRST(hour) AS hour,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE year = 2023 AND month = 4<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE year = 2023 AND month = 4<br>)<br>WHERE year = 2023 AND month = 4 |
| ValueSet | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;elb_status_code VALUE_LIST<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;COLLECT_SET(elb_status_code) AS elb_status_code,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE elb_status_code = 404<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE ARRAY_CONTAINS(elb_status_code, 404)<br>)<br>WHERE elb_status_code = 404 |
| Min-Max | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;request_processing_time MIN_MAX<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;MIN(request_processing_time) AS request_processing_time_min,<br>&nbsp;&nbsp;MAX(request_processing_time) AS request_processing_time_max,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE request_processing_time = 100<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br> SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE request_processing_time_min <= 100<br>&nbsp;&nbsp;&nbsp;&nbsp;AND 100 <= request_processing_time_max<br>)<br>WHERE request_processing_time = 100
| ValueSet | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;elb_status_code VALUE_SET<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;COLLECT_SET(elb_status_code) AS elb_status_code,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE elb_status_code = 404<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE ARRAY_CONTAINS(elb_status_code, 404)<br>)<br>WHERE elb_status_code = 404 |
| MinMax | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;request_processing_time MIN_MAX<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;MIN(request_processing_time) AS request_processing_time_min,<br>&nbsp;&nbsp;MAX(request_processing_time) AS request_processing_time_max,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE request_processing_time = 100<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br> SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE request_processing_time_min <= 100<br>&nbsp;&nbsp;&nbsp;&nbsp;AND 100 <= request_processing_time_max<br>)<br>WHERE request_processing_time = 100
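
As a concrete sketch of this flow, the high-level Flint Spark API described later in this document can build and populate such an index (the `flint` instance and the column names below are illustrative, following the table above):

```scala
// Sketch: define a skipping index on alb_logs covering the three index types above.
flint
  .skippingIndex()
  .onTable("alb_logs")
  .addPartitions("year", "month", "day", "hour")
  .addValueSet("elb_status_code")
  .addMinMax("request_processing_time")
  .create()

// Populate the index; queries filtering on the indexed columns can then be
// rewritten to scan only the files recorded in flint_alb_logs_skipping_index.
flint.refresh("flint_alb_logs_skipping_index", FULL)
```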

### Flint Index Specification

@@ -122,8 +122,11 @@ DDL statement:
```sql
CREATE SKIPPING INDEX
ON <object>
FOR COLUMNS ( column <index_type> [, ...] )
( column <index_type> [, ...] )
WHERE <filter_predicate>
WITH (auto_refresh = (true|false))

REFRESH SKIPPING INDEX ON <object>

DESCRIBE SKIPPING INDEX ON <object>

@@ -135,23 +138,20 @@ DROP SKIPPING INDEX ON <object>
Skipping index type:

```sql
<index_type> ::= { <bloom_filter>, <min_max>, <value_set> }

<bloom_filter> ::= BLOOM_FILTER( bitCount, numOfHashFunctions ) #TBD
<min_max> ::= MIN_MAX
<value_set> ::= VALUE_SET
<index_type> ::= { PARTITION, VALUE_SET, MIN_MAX }
```

Example:

```sql
CREATE SKIPPING INDEX ON alb_logs
FOR COLUMNS (
client_ip BLOOM_FILTER,
(
elb_status_code VALUE_SET
)
WHERE time > '2023-04-01 00:00:00'

REFRESH SKIPPING INDEX ON alb_logs

DESCRIBE SKIPPING INDEX ON alb_logs

DROP SKIPPING INDEX ON alb_logs
@@ -257,7 +257,6 @@ flint.skippingIndex()
.addPartitions("year", "month", "day")
.addValueSet("elb_status_code")
.addMinMax("request_processing_time")
.addBloomFilter("client_ip")
.create()

flint.refresh("flint_alb_logs_skipping_index", FULL)
@@ -19,14 +19,34 @@ statement
;

skippingIndexStatement
: describeSkippingIndexStatement
: createSkippingIndexStatement
| refreshSkippingIndexStatement
| describeSkippingIndexStatement
| dropSkippingIndexStatement
;

createSkippingIndexStatement
: CREATE SKIPPING INDEX ON tableName=multipartIdentifier
LEFT_PAREN indexColTypeList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshSkippingIndexStatement
: REFRESH SKIPPING INDEX ON tableName=multipartIdentifier
;

describeSkippingIndexStatement
: (DESC | DESCRIBE) SKIPPING INDEX ON tableName=multipartIdentifier
;

dropSkippingIndexStatement
: DROP SKIPPING INDEX ON tableName=multipartIdentifier
;

indexColTypeList
: indexColType (COMMA indexColType)*
;

indexColType
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
;
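
Together with the lexer additions in `SparkSqlBase.g4` below, these rules accept statements like the following. This is a usage sketch only; it assumes a `SparkSession` (`spark`) with the Flint SQL extensions enabled:

```scala
// Sketch: exercise the new grammar through spark.sql (extension setup not shown).
spark.sql("""
  CREATE SKIPPING INDEX ON alb_logs
  ( elb_status_code VALUE_SET, request_processing_time MIN_MAX )
  WITH (auto_refresh = true)
""")

spark.sql("REFRESH SKIPPING INDEX ON alb_logs")
spark.sql("DESCRIBE SKIPPING INDEX ON alb_logs")
spark.sql("DROP SKIPPING INDEX ON alb_logs")
```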
62 changes: 59 additions & 3 deletions flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -85,6 +85,31 @@ grammar SparkSqlBase;
}


propertyList
: property (COMMA property)*
;

property
: key=propertyKey (EQ? value=propertyValue)?
;

propertyKey
: identifier (DOT identifier)*
| STRING
;

propertyValue
: INTEGER_VALUE
| DECIMAL_VALUE
| booleanValue
| STRING
;

booleanValue
: TRUE | FALSE
;


multipartIdentifier
: parts+=identifier (DOT parts+=identifier)*
;
@@ -106,20 +131,46 @@ nonReserved

// Flint lexical tokens

SKIPPING : 'SKIPPING';
MIN_MAX: 'MIN_MAX';
SKIPPING: 'SKIPPING';
VALUE_SET: 'VALUE_SET';


// Spark lexical tokens

SEMICOLON: ';';

LEFT_PAREN: '(';
RIGHT_PAREN: ')';
COMMA: ',';
DOT: '.';


CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DOT: '.';
DROP: 'DROP';
FALSE: 'FALSE';
INDEX: 'INDEX';
MINUS: '-';
ON: 'ON';
PARTITION: 'PARTITION';
REFRESH: 'REFRESH';
STRING: 'STRING';
TRUE: 'TRUE';
WITH: 'WITH';


EQ : '=' | '==';
MINUS: '-';


INTEGER_VALUE
: DIGIT+
;

DECIMAL_VALUE
: DECIMAL_DIGITS {isValidDecimal()}?
;

IDENTIFIER
: (LETTER | DIGIT | '_')+
@@ -129,6 +180,11 @@ BACKQUOTED_IDENTIFIER
: '`' ( ~'`' | '``' )* '`'
;

fragment DECIMAL_DIGITS
: DIGIT+ '.' DIGIT*
| '.' DIGIT+
;

fragment DIGIT
: [0-9]
;
@@ -15,7 +15,7 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, Ref
import org.opensearch.flint.spark.skipping.{FlintSparkSkippingIndex, FlintSparkSkippingStrategy}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.{SkippingKind, SkippingKindSerializer}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MinMax, Partition, ValuesSet}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy
Expand Down Expand Up @@ -89,6 +89,9 @@ class FlintSpark(val spark: SparkSession) {
}

mode match {
case FULL if isIncrementalRefreshing(indexName) =>
throw new IllegalStateException(
s"Index $indexName is incremental refreshing and cannot be manual refreshed")
case FULL =>
writeFlintIndex(
spark.read
@@ -101,6 +104,7 @@ class FlintSpark(val spark: SparkSession) {
val job = spark.readStream
.table(tableName)
.writeStream
.queryName(indexName)
.outputMode(Append())
.foreachBatch { (batchDF: DataFrame, _: Long) =>
writeFlintIndex(batchDF)
@@ -144,6 +148,9 @@ class FlintSpark(val spark: SparkSession) {
}
}

private def isIncrementalRefreshing(indexName: String): Boolean =
spark.streams.active.exists(_.name == indexName)
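// Usage sketch (illustrative index name): once auto_refresh has started the
// streaming job registered under the index name, a later manual FULL refresh
// fails fast instead of racing the incremental job:
//   flint.refreshIndex("flint_alb_logs_skipping_index", RefreshMode.INCREMENTAL)
//   flint.refreshIndex("flint_alb_logs_skipping_index", RefreshMode.FULL)
//     // => IllegalStateException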

// TODO: Remove all parsing logic below once Flint spec finalized and FlintMetadata strong typed
private def getSourceTableName(index: FlintSparkIndex): String = {
val json = parse(index.metadata().getContent)
@@ -170,11 +177,11 @@ class FlintSpark(val spark: SparkSession) {
val columnType = (colInfo \ "columnType").extract[String]

skippingKind match {
case Partition =>
case PARTITION =>
PartitionSkippingStrategy(columnName = columnName, columnType = columnType)
case ValuesSet =>
case VALUE_SET =>
ValueSetSkippingStrategy(columnName = columnName, columnType = columnType)
case MinMax =>
case MIN_MAX =>
MinMaxSkippingStrategy(columnName = columnName, columnType = columnType)
case other =>
throw new IllegalStateException(s"Unknown skipping strategy: $other")
@@ -65,7 +65,7 @@ object FlintSparkSkippingStrategy {
type SkippingKind = Value

// Use Value[s]Set because ValueSet already exists in Enumeration
val Partition, ValuesSet, MinMax = Value
val PARTITION, VALUE_SET, MIN_MAX = Value
}
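// With the uppercase names, the SQL layer can map the parsed token text straight
// onto this enum, e.g. SkippingKind.withName("VALUE_SET") == VALUE_SET (see the
// AST builder changes below), so the earlier Value[s]Set naming workaround is no
// longer needed.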

/** json4s doesn't serialize Enum by default */
@@ -6,7 +6,7 @@
package org.opensearch.flint.spark.skipping.minmax

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MinMax, SkippingKind}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, SkippingKind}

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate}
@@ -17,7 +17,7 @@ import org.apache.spark.sql.functions.col
* Skipping strategy based on min-max boundary of column values.
*/
case class MinMaxSkippingStrategy(
override val kind: SkippingKind = MinMax,
override val kind: SkippingKind = MIN_MAX,
override val columnName: String,
override val columnType: String)
extends FlintSparkSkippingStrategy {
@@ -6,7 +6,7 @@
package org.opensearch.flint.spark.skipping.partition

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{Partition, SkippingKind}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{PARTITION, SkippingKind}

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, First}
@@ -16,7 +16,7 @@ import org.apache.spark.sql.functions.col
* Skipping strategy for partitioned columns of source table.
*/
case class PartitionSkippingStrategy(
override val kind: SkippingKind = Partition,
override val kind: SkippingKind = PARTITION,
override val columnName: String,
override val columnType: String)
extends FlintSparkSkippingStrategy {
@@ -6,7 +6,7 @@
package org.opensearch.flint.spark.skipping.valueset

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, ValuesSet}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, VALUE_SET}

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, Predicate}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, CollectSet}
@@ -16,7 +16,7 @@ import org.apache.spark.sql.functions.col
* Skipping strategy based on unique column value set.
*/
case class ValueSetSkippingStrategy(
override val kind: SkippingKind = ValuesSet,
override val kind: SkippingKind = VALUE_SET,
override val columnName: String,
override val columnType: String)
extends FlintSparkSkippingStrategy {
@@ -5,9 +5,12 @@

package org.opensearch.flint.spark.sql

import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{DescribeSkippingIndexStatementContext, DropSkippingIndexStatementContext}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -19,6 +22,41 @@ import org.apache.spark.sql.types.StringType
*/
class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command] {

override def visitCreateSkippingIndexStatement(
ctx: CreateSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
// Create skipping index
val indexBuilder = flint
.skippingIndex()
.onTable(ctx.tableName.getText)

ctx.indexColTypeList().indexColType().forEach { colTypeCtx =>
val colName = colTypeCtx.identifier().getText
val skipType = SkippingKind.withName(colTypeCtx.skipType.getText)
skipType match {
case PARTITION => indexBuilder.addPartitions(colName)
case VALUE_SET => indexBuilder.addValueSet(colName)
case MIN_MAX => indexBuilder.addMinMax(colName)
}
}
indexBuilder.create()

// Trigger auto refresh if enabled
if (isAutoRefreshEnabled(ctx.propertyList())) {
val indexName = getSkippingIndexName(ctx.tableName.getText)
flint.refreshIndex(indexName, RefreshMode.INCREMENTAL)
}
Seq.empty
}

override def visitRefreshSkippingIndexStatement(
ctx: RefreshSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
val indexName = getSkippingIndexName(ctx.tableName.getText)
flint.refreshIndex(indexName, RefreshMode.FULL)
Seq.empty
}

override def visitDescribeSkippingIndexStatement(
ctx: DescribeSkippingIndexStatementContext): Command = {
val outputSchema = Seq(
@@ -46,6 +84,21 @@ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command
Seq.empty
}

private def isAutoRefreshEnabled(ctx: PropertyListContext): Boolean = {
if (ctx == null) {
false
} else {
ctx
.property()
.forEach(p => {
if (p.key.getText == "auto_refresh") {
return p.value.getText.toBoolean
}
})
false
}
}
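// Note: the `return` inside forEach above is a non-local return. An equivalent
// lookup, sketched assuming the generated property() returns a java.util.List
// and JavaConverters are in scope, would be:
//   Option(ctx).exists(_.property().asScala
//     .find(_.key.getText == "auto_refresh")
//     .exists(_.value.getText.toBoolean))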

override def aggregateResult(aggregate: Command, nextResult: Command): Command =
if (nextResult != null) nextResult else aggregate
}