Skip to content

Commit

Permalink
FlintIndexMetadataReader refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <reddyvam@amazon.com>
  • Loading branch information
vmmusings committed Mar 14, 2024
1 parent a84c3ef commit 0af4153
Show file tree
Hide file tree
Showing 41 changed files with 1,586 additions and 474 deletions.
20 changes: 20 additions & 0 deletions spark/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ skippingIndexStatement
: createSkippingIndexStatement
| refreshSkippingIndexStatement
| describeSkippingIndexStatement
| alterSkippingIndexStatement
| dropSkippingIndexStatement
| vacuumSkippingIndexStatement
;
Expand All @@ -46,6 +47,12 @@ describeSkippingIndexStatement
: (DESC | DESCRIBE) SKIPPING INDEX ON tableName
;

alterSkippingIndexStatement
: ALTER SKIPPING INDEX
ON tableName
WITH LEFT_PAREN propertyList RIGHT_PAREN
;

dropSkippingIndexStatement
: DROP SKIPPING INDEX ON tableName
;
Expand All @@ -59,6 +66,7 @@ coveringIndexStatement
| refreshCoveringIndexStatement
| showCoveringIndexStatement
| describeCoveringIndexStatement
| alterCoveringIndexStatement
| dropCoveringIndexStatement
| vacuumCoveringIndexStatement
;
Expand All @@ -83,6 +91,12 @@ describeCoveringIndexStatement
: (DESC | DESCRIBE) INDEX indexName ON tableName
;

alterCoveringIndexStatement
: ALTER INDEX indexName
ON tableName
WITH LEFT_PAREN propertyList RIGHT_PAREN
;

dropCoveringIndexStatement
: DROP INDEX indexName ON tableName
;
Expand All @@ -96,6 +110,7 @@ materializedViewStatement
| refreshMaterializedViewStatement
| showMaterializedViewStatement
| describeMaterializedViewStatement
| alterMaterializedViewStatement
| dropMaterializedViewStatement
| vacuumMaterializedViewStatement
;
Expand All @@ -118,6 +133,11 @@ describeMaterializedViewStatement
: (DESC | DESCRIBE) MATERIALIZED VIEW mvName=multipartIdentifier
;

alterMaterializedViewStatement
: ALTER MATERIALIZED VIEW mvName=multipartIdentifier
WITH LEFT_PAREN propertyList RIGHT_PAREN
;

dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;
Expand Down
1 change: 1 addition & 0 deletions spark/src/main/antlr/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ DOT: '.';


AS: 'AS';
ALTER: 'ALTER';
CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
Expand Down
14 changes: 12 additions & 2 deletions spark/src/main/antlr/SqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ COMMA: ',';
DOT: '.';
LEFT_BRACKET: '[';
RIGHT_BRACKET: ']';
BANG: '!';

// NOTE: If you add a new token in the list below, you should update the list of keywords
// and reserved tag in `docs/sql-ref-ansi-compliance.md#sql-keywords`, and
Expand Down Expand Up @@ -273,7 +274,7 @@ NANOSECOND: 'NANOSECOND';
NANOSECONDS: 'NANOSECONDS';
NATURAL: 'NATURAL';
NO: 'NO';
NOT: 'NOT' | '!';
NOT: 'NOT';
NULL: 'NULL';
NULLS: 'NULLS';
NUMERIC: 'NUMERIC';
Expand Down Expand Up @@ -510,8 +511,13 @@ BIGDECIMAL_LITERAL
| DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}?
;

// Generalize the identifier to give a sensible INVALID_IDENTIFIER error message:
// * Unicode letters rather than a-z and A-Z only
// * URI paths for table references using paths
// We then narrow down to ANSI rules in exitUnquotedIdentifier() in the parser.
IDENTIFIER
: (LETTER | DIGIT | '_')+
: (UNICODE_LETTER | DIGIT | '_')+
| UNICODE_LETTER+ '://' (UNICODE_LETTER | DIGIT | '_' | '/' | '-' | '.' | '?' | '=' | '&' | '#' | '%')+
;

BACKQUOTED_IDENTIFIER
Expand All @@ -535,6 +541,10 @@ fragment LETTER
: [A-Z]
;

fragment UNICODE_LETTER
: [\p{L}]
;

SIMPLE_COMMENT
: '--' ('\\\n' | ~[\r\n])* '\r'? '\n'? -> channel(HIDDEN)
;
Expand Down
3 changes: 2 additions & 1 deletion spark/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ describeFuncName
| comparisonOperator
| arithmeticOperator
| predicateOperator
| BANG
;

describeColName
Expand Down Expand Up @@ -946,7 +947,7 @@ expressionSeq
;

booleanExpression
: NOT booleanExpression #logicalNot
: (NOT | BANG) booleanExpression #logicalNot
| EXISTS LEFT_PAREN query RIGHT_PAREN #exists
| valueExpression predicate? #predicated
| left=booleanExpression operator=AND right=booleanExpression #logicalBinary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult;

import com.amazonaws.services.emrserverless.model.JobRunState;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -27,52 +28,62 @@
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.model.FlintIndexDetailsRequest;
import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDelete;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/** Handle Index DML query. includes * DROP * ALT? */
@RequiredArgsConstructor
public class IndexDMLHandler extends AsyncQueryHandler {
private static final Logger LOG = LogManager.getLogger();

public static final String DROP_INDEX_JOB_ID = "dropIndexJobId";
public static final String DMLQUERY_JOB_ID = "DMLQueryJobId";

private final EMRServerlessClient emrServerlessClient;

private final JobExecutionResponseReader jobExecutionResponseReader;

private final FlintIndexMetadataReader flintIndexMetadataReader;
private final FlintIndexMetadataService flintIndexMetadataService;

private final Client client;

private final StateStore stateStore;

public static boolean isIndexDMLQuery(String jobId) {
return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId);
return DMLQUERY_JOB_ID.equalsIgnoreCase(jobId);
}

@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
IndexQueryDetails indexDetails = context.getIndexQueryDetails();
FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails);
Map<String, FlintIndexMetadata> indexMetadataMap =
flintIndexMetadataService.getFlintIndexMetadata(
new FlintIndexDetailsRequest.Builder()
.indexPattern(indexDetails.openSearchIndexName())
.build());
if (!indexMetadataMap.containsKey(indexDetails.openSearchIndexName())) {
throw new IllegalStateException(
String.format(
"Couldn't fetch flint index: %s details", indexDetails.openSearchIndexName()));
}
FlintIndexMetadata indexMetadata = indexMetadataMap.get(indexDetails.openSearchIndexName());
FlintIndexOp flintIndexOp;
switch (indexDetails.getIndexQueryActionType()) {
case DROP:
flintIndexOp = new FlintIndexOpDrop(stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient);
case ALTER:
flintIndexOp = new
}
// if index is created without auto refresh. there is no job to cancel.
String status = JobRunState.FAILED.toString();
String error = "";
long startTime = 0L;
try {
FlintIndexOp jobCancelOp =
new FlintIndexOpCancel(
stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient);
jobCancelOp.apply(indexMetadata);

FlintIndexOp indexDeleteOp =
new FlintIndexOpDelete(stateStore, dispatchQueryRequest.getDatasource());
indexDeleteOp.apply(indexMetadata);
flintIndexOp.apply(indexMetadata);
status = JobRunState.SUCCESS.toString();
} catch (Exception e) {
error = e.getMessage();
Expand All @@ -91,7 +102,7 @@ public DispatchQueryResponse submit(
String resultIndex = dataSourceMetadata.getResultIndex();
createIndexDMLResult(stateStore, resultIndex).apply(indexDMLResult);

return new DispatchQueryResponse(asyncQueryId, DROP_INDEX_JOB_ID, resultIndex, null);
return new DispatchQueryResponse(asyncQueryId, DMLQUERY_JOB_ID, resultIndex, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.spark.dispatcher;

import java.util.Map;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClient;
Expand All @@ -14,7 +15,8 @@
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.model.FlintIndexDetailsRequest;
import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
Expand All @@ -23,27 +25,36 @@
/** Handle Refresh Query. */
public class RefreshQueryHandler extends BatchQueryHandler {

private final FlintIndexMetadataReader flintIndexMetadataReader;
private final FlintIndexMetadataService flintIndexMetadataService;
private final StateStore stateStore;
private final EMRServerlessClient emrServerlessClient;

public RefreshQueryHandler(
EMRServerlessClient emrServerlessClient,
JobExecutionResponseReader jobExecutionResponseReader,
FlintIndexMetadataReader flintIndexMetadataReader,
FlintIndexMetadataService flintIndexMetadataService,
StateStore stateStore,
LeaseManager leaseManager) {
super(emrServerlessClient, jobExecutionResponseReader, leaseManager);
this.flintIndexMetadataReader = flintIndexMetadataReader;
this.flintIndexMetadataService = flintIndexMetadataService;
this.stateStore = stateStore;
this.emrServerlessClient = emrServerlessClient;
}

@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String datasourceName = asyncQueryJobMetadata.getDatasourceName();
FlintIndexMetadata indexMetadata =
flintIndexMetadataReader.getFlintIndexMetadata(asyncQueryJobMetadata.getIndexName());
Map<String, FlintIndexMetadata> indexMetadataMap =
flintIndexMetadataService.getFlintIndexMetadata(
new FlintIndexDetailsRequest.Builder()
.indexPattern(asyncQueryJobMetadata.getIndexName())
.build());
if (!indexMetadataMap.containsKey(asyncQueryJobMetadata.getIndexName())) {
throw new IllegalStateException(
String.format(
"Couldn't fetch flint index: %s details", asyncQueryJobMetadata.getIndexName()));
}
FlintIndexMetadata indexMetadata = indexMetadataMap.get(asyncQueryJobMetadata.getIndexName());
FlintIndexOp jobCancelOp =
new FlintIndexOpCancel(stateStore, datasourceName, emrServerlessClient);
jobCancelOp.apply(indexMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.session.SessionManager;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.rest.model.LangType;
Expand All @@ -48,7 +48,7 @@ public class SparkQueryDispatcher {

private JobExecutionResponseReader jobExecutionResponseReader;

private FlintIndexMetadataReader flintIndexMetadataReader;
private FlintIndexMetadataService flintIndexMetadataService;

private Client client;

Expand Down Expand Up @@ -94,7 +94,7 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest)
new RefreshQueryHandler(
emrServerlessClient,
jobExecutionResponseReader,
flintIndexMetadataReader,
flintIndexMetadataService,
stateStore,
leaseManager);
}
Expand Down Expand Up @@ -128,7 +128,7 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
new RefreshQueryHandler(
emrServerlessClient,
jobExecutionResponseReader,
flintIndexMetadataReader,
flintIndexMetadataService,
stateStore,
leaseManager);
} else if (asyncQueryJobMetadata.getJobType() == JobType.STREAMING) {
Expand All @@ -145,7 +145,7 @@ private IndexDMLHandler createIndexDMLHandler(EMRServerlessClient emrServerlessC
return new IndexDMLHandler(
emrServerlessClient,
jobExecutionResponseReader,
flintIndexMetadataReader,
flintIndexMetadataService,
client,
stateStore);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ public enum IndexQueryActionType {
REFRESH,
DESCRIBE,
SHOW,
DROP
DROP,
ALTER
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,22 @@

package org.opensearch.sql.spark.flint;

import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class FlintIndexMetadata {
public static final String PROPERTIES_KEY = "properties";
public static final String ENV_KEY = "env";
public static final String OPTIONS_KEY = "options";

public static final String SERVERLESS_EMR_JOB_ID = "SERVERLESS_EMR_JOB_ID";
public static final String AUTO_REFRESH = "auto_refresh";
public static final String AUTO_REFRESH_DEFAULT = "false";

public static final String APP_ID = "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID";
public static final String FLINT_INDEX_STATE_DOC_ID = "latestId";

private final String jobId;
private final boolean autoRefresh;
private final String appId;
private final String latestId;
private final FlintIndexStateModel indexStateModel;

public static FlintIndexMetadata fromMetatdata(Map<String, Object> metaMap) {
Map<String, Object> propertiesMap = (Map<String, Object>) metaMap.get(PROPERTIES_KEY);
Map<String, Object> envMap = (Map<String, Object>) propertiesMap.get(ENV_KEY);
Map<String, Object> options = (Map<String, Object>) metaMap.get(OPTIONS_KEY);
String jobId = (String) envMap.get(SERVERLESS_EMR_JOB_ID);

boolean autoRefresh =
!((String) options.getOrDefault(AUTO_REFRESH, AUTO_REFRESH_DEFAULT))
.toLowerCase(Locale.ROOT)
.equalsIgnoreCase(AUTO_REFRESH_DEFAULT);
String appId = (String) envMap.getOrDefault(APP_ID, null);
String latestId = (String) metaMap.getOrDefault(FLINT_INDEX_STATE_DOC_ID, null);
return new FlintIndexMetadata(jobId, autoRefresh, appId, latestId);
public Optional<FlintIndexStateModel> getIndexState() {
return Optional.ofNullable(indexStateModel);
}

public Optional<String> getLatestId() {
Expand Down
Loading

0 comments on commit 0af4153

Please sign in to comment.