FlintIndexMetadataReader refactoring
Signed-off-by: Vamsi Manohar <reddyvam@amazon.com>
vmmusings committed Mar 14, 2024
1 parent a84c3ef commit e4f8d61
Showing 27 changed files with 1,121 additions and 262 deletions.
20 changes: 20 additions & 0 deletions spark/src/main/antlr/FlintSparkSqlExtensions.g4
@@ -26,6 +26,7 @@ skippingIndexStatement
: createSkippingIndexStatement
| refreshSkippingIndexStatement
| describeSkippingIndexStatement
| alterSkippingIndexStatement
| dropSkippingIndexStatement
| vacuumSkippingIndexStatement
;
@@ -46,6 +47,12 @@ describeSkippingIndexStatement
: (DESC | DESCRIBE) SKIPPING INDEX ON tableName
;

alterSkippingIndexStatement
: ALTER SKIPPING INDEX
ON tableName
WITH LEFT_PAREN propertyList RIGHT_PAREN
;

dropSkippingIndexStatement
: DROP SKIPPING INDEX ON tableName
;
@@ -59,6 +66,7 @@ coveringIndexStatement
| refreshCoveringIndexStatement
| showCoveringIndexStatement
| describeCoveringIndexStatement
| alterCoveringIndexStatement
| dropCoveringIndexStatement
| vacuumCoveringIndexStatement
;
@@ -83,6 +91,12 @@ describeCoveringIndexStatement
: (DESC | DESCRIBE) INDEX indexName ON tableName
;

alterCoveringIndexStatement
: ALTER INDEX indexName
ON tableName
WITH LEFT_PAREN propertyList RIGHT_PAREN
;

dropCoveringIndexStatement
: DROP INDEX indexName ON tableName
;
@@ -96,6 +110,7 @@ materializedViewStatement
| refreshMaterializedViewStatement
| showMaterializedViewStatement
| describeMaterializedViewStatement
| alterMaterializedViewStatement
| dropMaterializedViewStatement
| vacuumMaterializedViewStatement
;
@@ -118,6 +133,11 @@ describeMaterializedViewStatement
: (DESC | DESCRIBE) MATERIALIZED VIEW mvName=multipartIdentifier
;

alterMaterializedViewStatement
: ALTER MATERIALIZED VIEW mvName=multipartIdentifier
WITH LEFT_PAREN propertyList RIGHT_PAREN
;

dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;
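Taken together, the three new alter*Statement rules give every Flint index type the same ALTER ... WITH (propertyList) surface. Below is a minimal sketch of statements the extended grammar is designed to accept; all catalog, table, index, and view names are hypothetical, and auto_refresh is just one option a propertyList can carry:

public final class AlterFlintIndexExamples {
  // Each string follows one of the three new grammar rules above (sketch only).
  public static final String ALTER_SKIPPING_INDEX =
      "ALTER SKIPPING INDEX ON myglue.default.http_logs WITH (auto_refresh = false)";
  public static final String ALTER_COVERING_INDEX =
      "ALTER INDEX request_uri_idx ON myglue.default.http_logs WITH (auto_refresh = false)";
  public static final String ALTER_MATERIALIZED_VIEW =
      "ALTER MATERIALIZED VIEW myglue.default.http_count_view WITH (auto_refresh = false)";
}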
1 change: 1 addition & 0 deletions spark/src/main/antlr/SparkSqlBase.g4
@@ -155,6 +155,7 @@ DOT: '.';


AS: 'AS';
ALTER: 'ALTER';
CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
14 changes: 12 additions & 2 deletions spark/src/main/antlr/SqlBaseLexer.g4
@@ -79,6 +79,7 @@ COMMA: ',';
DOT: '.';
LEFT_BRACKET: '[';
RIGHT_BRACKET: ']';
BANG: '!';

// NOTE: If you add a new token in the list below, you should update the list of keywords
// and reserved tag in `docs/sql-ref-ansi-compliance.md#sql-keywords`, and
@@ -273,7 +274,7 @@ NANOSECOND: 'NANOSECOND';
NANOSECONDS: 'NANOSECONDS';
NATURAL: 'NATURAL';
NO: 'NO';
NOT: 'NOT' | '!';
NOT: 'NOT';
NULL: 'NULL';
NULLS: 'NULLS';
NUMERIC: 'NUMERIC';
@@ -510,8 +511,13 @@ BIGDECIMAL_LITERAL
| DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}?
;

// Generalize the identifier to give a sensible INVALID_IDENTIFIER error message:
// * Unicode letters rather than a-z and A-Z only
// * URI-style paths for path-based table references
// We then narrow down to ANSI rules in exitUnquotedIdentifier() in the parser.
IDENTIFIER
: (LETTER | DIGIT | '_')+
: (UNICODE_LETTER | DIGIT | '_')+
| UNICODE_LETTER+ '://' (UNICODE_LETTER | DIGIT | '_' | '/' | '-' | '.' | '?' | '=' | '&' | '#' | '%')+
;

BACKQUOTED_IDENTIFIER
@@ -535,6 +541,10 @@ fragment LETTER
: [A-Z]
;

fragment UNICODE_LETTER
: [\p{L}]
;

SIMPLE_COMMENT
: '--' ('\\\n' | ~[\r\n])* '\r'? '\n'? -> channel(HIDDEN)
;
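For intuition about the broadened IDENTIFIER rule above, the new URI-path alternative corresponds roughly to the Java regex in this sketch (an illustration, not the generated ANTLR lexer; the table reference in main is hypothetical):

import java.util.regex.Pattern;

public class UriIdentifierSketch {
  // Rough regex analogue of the new alternative
  //   UNICODE_LETTER+ '://' (UNICODE_LETTER | DIGIT | '_' | '/' | '-' | '.' | '?' | '=' | '&' | '#' | '%')+
  // where \p{L} mirrors the UNICODE_LETTER fragment.
  static final Pattern URI_IDENTIFIER =
      Pattern.compile("\\p{L}+://[\\p{L}0-9_/\\-.?=&#%]+");

  public static void main(String[] args) {
    // A hypothetical path-style table reference the rule is meant to admit:
    System.out.println(URI_IDENTIFIER.matcher("glue://mydb/http_logs").matches()); // true
  }
}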
3 changes: 2 additions & 1 deletion spark/src/main/antlr/SqlBaseParser.g4
@@ -388,6 +388,7 @@ describeFuncName
| comparisonOperator
| arithmeticOperator
| predicateOperator
| BANG
;

describeColName
@@ -946,7 +947,7 @@ expressionSeq
;

booleanExpression
: NOT booleanExpression #logicalNot
: (NOT | BANG) booleanExpression #logicalNot
| EXISTS LEFT_PAREN query RIGHT_PAREN #exists
| valueExpression predicate? #predicated
| left=booleanExpression operator=AND right=booleanExpression #logicalBinary
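Net effect of the SqlBase changes: '!' is now its own BANG token instead of an alternate spelling of NOT, describeFuncName accepts it as a function name, and the #logicalNot alternative matches either token, so an expression such as !(a AND b) should parse exactly as before.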
spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java
@@ -10,6 +10,7 @@
import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult;

import com.amazonaws.services.emrserverless.model.JobRunState;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -28,6 +29,7 @@
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
import org.opensearch.sql.spark.flint.model.FlintIndexDetailsRequest;
import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDelete;
@@ -59,7 +61,17 @@ public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
IndexQueryDetails indexDetails = context.getIndexQueryDetails();
FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails);
Map<String, FlintIndexMetadata> indexMetadataMap =
flintIndexMetadataReader.getFlintIndexMetadata(
new FlintIndexDetailsRequest.Builder()
.indexPattern(indexDetails.openSearchIndexName())
.build());
if (!indexMetadataMap.containsKey(indexDetails.openSearchIndexName())) {
throw new IllegalStateException(
String.format(
"Couldn't fetch flint index: %s details", indexDetails.openSearchIndexName()));
}
FlintIndexMetadata indexMetadata = indexMetadataMap.get(indexDetails.openSearchIndexName());
// If the index was created without auto refresh, there is no job to cancel.
String status = JobRunState.FAILED.toString();
String error = "";
spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java
@@ -5,6 +5,7 @@

package org.opensearch.sql.spark.dispatcher;

import java.util.Map;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClient;
@@ -15,6 +16,7 @@
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
import org.opensearch.sql.spark.flint.model.FlintIndexDetailsRequest;
import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
@@ -42,8 +44,17 @@ public RefreshQueryHandler(
@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String datasourceName = asyncQueryJobMetadata.getDatasourceName();
FlintIndexMetadata indexMetadata =
flintIndexMetadataReader.getFlintIndexMetadata(asyncQueryJobMetadata.getIndexName());
Map<String, FlintIndexMetadata> indexMetadataMap =
flintIndexMetadataReader.getFlintIndexMetadata(
new FlintIndexDetailsRequest.Builder()
.indexPattern(asyncQueryJobMetadata.getIndexName())
.build());
if (!indexMetadataMap.containsKey(asyncQueryJobMetadata.getIndexName())) {
throw new IllegalStateException(
String.format(
"Couldn't fetch flint index: %s details", asyncQueryJobMetadata.getIndexName()));
}
FlintIndexMetadata indexMetadata = indexMetadataMap.get(asyncQueryJobMetadata.getIndexName());
FlintIndexOp jobCancelOp =
new FlintIndexOpCancel(stateStore, datasourceName, emrServerlessClient);
jobCancelOp.apply(indexMetadata);
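Both handlers now repeat the same fetch-then-validate sequence against the map-returning reader API. A shared helper along these lines would capture that contract; this is a hypothetical sketch, not part of the commit, and assumes the imports the handlers already declare:

  // Hypothetical helper: resolve exactly one Flint index's metadata by name, or fail fast.
  static FlintIndexMetadata getFlintIndexMetadataOrThrow(
      FlintIndexMetadataReader reader, String indexName) {
    Map<String, FlintIndexMetadata> indexMetadataMap =
        reader.getFlintIndexMetadata(
            new FlintIndexDetailsRequest.Builder().indexPattern(indexName).build());
    FlintIndexMetadata indexMetadata = indexMetadataMap.get(indexName);
    if (indexMetadata == null) {
      throw new IllegalStateException(
          String.format("Couldn't fetch details for Flint index: %s", indexName));
    }
    return indexMetadata;
  }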
spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java
@@ -5,42 +5,22 @@

package org.opensearch.sql.spark.flint;

import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class FlintIndexMetadata {
public static final String PROPERTIES_KEY = "properties";
public static final String ENV_KEY = "env";
public static final String OPTIONS_KEY = "options";

public static final String SERVERLESS_EMR_JOB_ID = "SERVERLESS_EMR_JOB_ID";
public static final String AUTO_REFRESH = "auto_refresh";
public static final String AUTO_REFRESH_DEFAULT = "false";

public static final String APP_ID = "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID";
public static final String FLINT_INDEX_STATE_DOC_ID = "latestId";

private final String jobId;
private final boolean autoRefresh;
private final String appId;
private final String latestId;
private final FlintIndexStateModel indexStateModel;

public static FlintIndexMetadata fromMetatdata(Map<String, Object> metaMap) {
Map<String, Object> propertiesMap = (Map<String, Object>) metaMap.get(PROPERTIES_KEY);
Map<String, Object> envMap = (Map<String, Object>) propertiesMap.get(ENV_KEY);
Map<String, Object> options = (Map<String, Object>) metaMap.get(OPTIONS_KEY);
String jobId = (String) envMap.get(SERVERLESS_EMR_JOB_ID);

boolean autoRefresh =
!((String) options.getOrDefault(AUTO_REFRESH, AUTO_REFRESH_DEFAULT))
.toLowerCase(Locale.ROOT)
.equalsIgnoreCase(AUTO_REFRESH_DEFAULT);
String appId = (String) envMap.getOrDefault(APP_ID, null);
String latestId = (String) metaMap.getOrDefault(FLINT_INDEX_STATE_DOC_ID, null);
return new FlintIndexMetadata(jobId, autoRefresh, appId, latestId);
public Optional<FlintIndexStateModel> getIndexState() {
return Optional.ofNullable(indexStateModel);
}

public Optional<String> getLatestId() {
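With the metadata-parsing logic removed from the class, instances are now built through the Lombok-generated builder. A minimal usage sketch, with illustrative values only:

  FlintIndexMetadata metadata =
      FlintIndexMetadata.builder()
          .jobId("example-emr-job-id")
          .appId("example-emr-application-id")
          .latestId("example-latest-doc-id")
          .build();
  // Index state is optional and surfaced through the new accessor:
  metadata.getIndexState().ifPresent(System.out::println);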
spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java
@@ -1,23 +1,18 @@
package org.opensearch.sql.spark.flint;

import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
import java.util.Map;
import org.opensearch.sql.spark.flint.model.FlintIndexDetailsRequest;

/** Interface for FlintIndexMetadataReader */
public interface FlintIndexMetadataReader {

/**
* Given Index details, get the streaming job Id.
* Retrieves a map of {@link FlintIndexMetadata} instances matching the specified index pattern.
*
* @param indexQueryDetails indexDetails.
* @return FlintIndexMetadata.
* @param flintIndexDetailsRequest {@link FlintIndexDetailsRequest} carrying the index pattern
* @return A map of {@link FlintIndexMetadata} instances keyed by index name, each providing
*     metadata access for a matched index. Returns an empty map if no indices match the pattern.
*/
FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexQueryDetails);

/**
* Given Index name, get the streaming job Id.
*
* @param indexName indexName.
* @return FlintIndexMetadata.
*/
FlintIndexMetadata getFlintIndexMetadata(String indexName);
Map<String, FlintIndexMetadata> getFlintIndexMetadata(
FlintIndexDetailsRequest flintIndexDetailsRequest);
}
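The new org.opensearch.sql.spark.flint.model.FlintIndexDetailsRequest class sits in one of the changed files not expanded on this page. From its call sites in the handlers above, its minimal shape would be roughly the sketch below; the actual class in the commit may carry more fields:

package org.opensearch.sql.spark.flint.model;

// Sketch inferred from usage: new FlintIndexDetailsRequest.Builder().indexPattern(p).build()
public class FlintIndexDetailsRequest {
  private final String indexPattern;

  private FlintIndexDetailsRequest(Builder builder) {
    this.indexPattern = builder.indexPattern;
  }

  public String getIndexPattern() {
    return indexPattern;
  }

  public static class Builder {
    private String indexPattern;

    public Builder indexPattern(String indexPattern) {
      this.indexPattern = indexPattern;
      return this;
    }

    public FlintIndexDetailsRequest build() {
      return new FlintIndexDetailsRequest(this);
    }
  }
}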