From ae7b5291785266a1fc599e4e5af31911e49daad9 Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Tue, 19 Mar 2024 21:44:49 -0700 Subject: [PATCH] Handle EMR Exceptions in FlintCancelJob Operation Signed-off-by: Vamsi Manohar --- docs/user/admin/settings.rst | 12 +- .../sql/datasource/DataSourceAPIsIT.java | 18 +- .../sql/spark/client/EMRServerlessClient.java | 3 +- .../spark/client/EmrServerlessClientImpl.java | 17 +- .../spark/dispatcher/BatchQueryHandler.java | 2 +- .../execution/session/InteractiveSession.java | 3 +- .../spark/flint/operation/FlintIndexOp.java | 17 +- .../AsyncQueryExecutorServiceSpec.java | 3 +- .../asyncquery/IndexQuerySpecAlterTest.java | 1073 +++++++++++++++++ .../spark/asyncquery/IndexQuerySpecTest.java | 894 +------------- .../asyncquery/IndexQuerySpecVacuumTest.java | 8 +- .../asyncquery/model/MockFlintIndex.java | 3 +- .../client/EmrServerlessClientImplTest.java | 28 +- .../dispatcher/SparkQueryDispatcherTest.java | 4 +- .../session/InteractiveSessionTest.java | 3 +- 15 files changed, 1179 insertions(+), 909 deletions(-) create mode 100644 spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecAlterTest.java diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index 165ab97c09..6531e84aa1 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -420,7 +420,7 @@ SQL query:: plugins.query.executionengine.spark.session_inactivity_timeout_millis -=============================== +===================================================================== Description ----------- @@ -456,7 +456,7 @@ SQL query:: plugins.query.executionengine.spark.auto_index_management.enabled -=============================== +================================================================= Description ----------- @@ -492,7 +492,7 @@ SQL query:: plugins.query.executionengine.spark.session.index.ttl -=============================== +===================================================== Description ----------- @@ -529,7 +529,7 @@ SQL query:: plugins.query.executionengine.spark.result.index.ttl -=============================== +==================================================== Description ----------- @@ -565,7 +565,7 @@ SQL query:: } plugins.query.executionengine.async_query.enabled -=============================== +================================================= Description ----------- @@ -596,7 +596,7 @@ Request:: } plugins.query.executionengine.spark.streamingjobs.housekeeper.interval -=============================== +====================================================================== Description ----------- diff --git a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java index 6756baa61b..70bece480a 100644 --- a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java @@ -11,7 +11,10 @@ import static org.opensearch.sql.datasources.utils.XContentParserUtils.DESCRIPTION_FIELD; import static org.opensearch.sql.datasources.utils.XContentParserUtils.NAME_FIELD; import static org.opensearch.sql.datasources.utils.XContentParserUtils.STATUS_FIELD; +import static org.opensearch.sql.legacy.TestUtils.createIndexByRestClient; import static org.opensearch.sql.legacy.TestUtils.getResponseBody; +import static org.opensearch.sql.legacy.TestUtils.isIndexExist; +import static org.opensearch.sql.legacy.TestUtils.loadDataByRestClient; import com.google.common.collect.ImmutableMap; import com.google.gson.Gson; @@ -37,11 +40,6 @@ public class DataSourceAPIsIT extends PPLIntegTestCase { - @Override - protected void init() throws Exception { - loadIndex(Index.DATASOURCES); - } - @After public void cleanUp() throws IOException { wipeAllClusterSettings(); @@ -397,6 +395,16 @@ public void patchDataSourceAPITest() { @SneakyThrows @Test public void testOldDataSourceModelLoadingThroughGetDataSourcesAPI() { + Index index = Index.DATASOURCES; + String indexName = index.getName(); + String mapping = index.getMapping(); + String dataSet = index.getDataSet(); + if (!isIndexExist(client(), indexName)) { + createIndexByRestClient(client(), indexName, mapping); + } + loadDataByRestClient(client(), indexName, dataSet); + // waiting for loaded indices. + Thread.sleep(1000); // get datasource to validate the creation. Request getRequest = getFetchDataSourceRequest(null); Response getResponse = client().performRequest(getRequest); diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EMRServerlessClient.java b/spark/src/main/java/org/opensearch/sql/spark/client/EMRServerlessClient.java index 7e64b632ea..98c115fde9 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/EMRServerlessClient.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/EMRServerlessClient.java @@ -41,5 +41,6 @@ public interface EMRServerlessClient { * @param jobId jobId. * @return {@link CancelJobRunResult} */ - CancelJobRunResult cancelJobRun(String applicationId, String jobId); + CancelJobRunResult cancelJobRun( + String applicationId, String jobId, boolean allowExceptionPropagation); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java index 3a47eb21a7..c452e15ebc 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java @@ -32,7 +32,7 @@ public class EmrServerlessClientImpl implements EMRServerlessClient { private static final int MAX_JOB_NAME_LENGTH = 255; - private static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error."; + public static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error."; public EmrServerlessClientImpl(AWSEMRServerless emrServerless) { this.emrServerless = emrServerless; @@ -98,7 +98,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { } @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + public CancelJobRunResult cancelJobRun( + String applicationId, String jobId, boolean allowExceptionPropagation) { CancelJobRunRequest cancelJobRunRequest = new CancelJobRunRequest().withJobRunId(jobId).withApplicationId(applicationId); CancelJobRunResult cancelJobRunResult = @@ -108,10 +109,14 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { try { return emrServerless.cancelJobRun(cancelJobRunRequest); } catch (Throwable t) { - logger.error("Error while making cancel job request to emr:", t); - MetricUtils.incrementNumericalMetric( - MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT); - throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE); + if (allowExceptionPropagation) { + throw t; + } else { + logger.error("Error while making cancel job request to emr:", t); + MetricUtils.incrementNumericalMetric( + MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT); + throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE); + } } }); logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId())); diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index 0153291eb8..e9356e5bed 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -58,7 +58,7 @@ protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJob @Override public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { emrServerlessClient.cancelJobRun( - asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId()); + asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId(), false); return asyncQueryJobMetadata.getQueryId().getId(); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java index 254c5a34b4..2363615a7d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java @@ -80,7 +80,8 @@ public void close() { if (model.isEmpty()) { throw new IllegalStateException("session does not exist. " + sessionModel.getSessionId()); } else { - serverlessClient.cancelJobRun(sessionModel.getApplicationId(), sessionModel.getJobId()); + serverlessClient.cancelJobRun( + sessionModel.getApplicationId(), sessionModel.getJobId(), false); } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java index 0e99c18eef..8d5e301631 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java @@ -5,10 +5,12 @@ package org.opensearch.sql.spark.flint.operation; +import static org.opensearch.sql.spark.client.EmrServerlessClientImpl.GENERIC_INTERNAL_SERVER_ERROR_MESSAGE; import static org.opensearch.sql.spark.execution.statestore.StateStore.deleteFlintIndexState; import static org.opensearch.sql.spark.execution.statestore.StateStore.getFlintIndexState; import static org.opensearch.sql.spark.execution.statestore.StateStore.updateFlintIndexState; +import com.amazonaws.services.emrserverless.model.ValidationException; import java.util.Locale; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -145,11 +147,18 @@ public void cancelStreamingJob( String jobId = flintIndexStateModel.getJobId(); try { emrServerlessClient.cancelJobRun( - flintIndexStateModel.getApplicationId(), flintIndexStateModel.getJobId()); - } catch (IllegalArgumentException e) { - // handle job does not exist case. + flintIndexStateModel.getApplicationId(), flintIndexStateModel.getJobId(), true); + } catch (ValidationException e) { + // Exception when the job is not in cancellable state and already in terminal state. + if (e.getMessage().contains("Job run is not in a cancellable state")) { + LOG.error(e); + return; + } else { + throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE); + } + } catch (Exception e) { LOG.error(e); - return; + throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE); } // pull job state until timeout or cancelled. diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index c064067e26..d1ca50343f 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -243,7 +243,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { } @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + public CancelJobRunResult cancelJobRun( + String applicationId, String jobId, boolean allowExceptionPropagation) { cancelJobRunCalled++; return new CancelJobRunResult().withJobRunId(jobId); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecAlterTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecAlterTest.java new file mode 100644 index 0000000000..ddefebcf77 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecAlterTest.java @@ -0,0 +1,1073 @@ +package org.opensearch.sql.spark.asyncquery; + +import com.amazonaws.services.emrserverless.model.CancelJobRunResult; +import com.amazonaws.services.emrserverless.model.GetJobRunResult; +import com.amazonaws.services.emrserverless.model.JobRun; +import com.amazonaws.services.emrserverless.model.ValidationException; +import com.google.common.collect.ImmutableList; +import java.util.HashMap; +import java.util.Map; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; +import org.opensearch.sql.spark.asyncquery.model.MockFlintIndex; +import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.client.StartJobRequest; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexType; +import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; +import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; +import org.opensearch.sql.spark.rest.model.LangType; + +public class IndexQuerySpecAlterTest extends AsyncQueryExecutorServiceSpec { + + @Test + public void testAlterIndexQueryConvertingToManualRefresh() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=false) "); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + mockDS.updateIndexOptions(existingOptions, false); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(1); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryConvertingToManualRefreshWithNoIncrementalRefresh() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false)"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false)"); + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false)"); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + existingOptions.put("checkpoint_location", "s3://checkpoint/location"); + mockDS.updateIndexOptions(existingOptions, true); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(1); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryWithRedundantOperation() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=false) "); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public String startJobRun(StartJobRequest startJobRequest) { + return "jobId"; + } + + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + + @Override + public CancelJobRunResult cancelJobRun( + String applicationId, String jobId, boolean allowExceptionPropagation) { + super.cancelJobRun(applicationId, jobId, allowExceptionPropagation); + throw new ValidationException("Job run is not in a cancellable state"); + } + }; + EMRServerlessClientFactory emrServerlessCientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessCientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "false"); + mockDS.updateIndexOptions(existingOptions, false); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(1); + emrsClient.getJobRunResultCalled(0); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryConvertingToAutoRefresh() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=true," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=true," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=true," + + " incremental_refresh=false) "); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient localEMRSClient = new LocalEMRSClient(); + EMRServerlessClientFactory clientFactory = () -> localEMRSClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(clientFactory); + + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "false"); + mockDS.updateIndexOptions(existingOptions, false); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + assertEquals( + "RUNNING", + asyncQueryExecutorService + .getAsyncQueryResults(response.getQueryId()) + .getStatus()); + + flintIndexJob.assertState(FlintIndexState.ACTIVE); + localEMRSClient.startJobRunCalled(1); + localEMRSClient.getJobRunResultCalled(1); + localEMRSClient.cancelJobRunCalled(0); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryWithOutAnyAutoRefresh() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (" + + " incremental_refresh=false)"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (" + + " incremental_refresh=false)"); + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (" + " incremental_refresh=false) "); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient localEMRSClient = new LocalEMRSClient(); + EMRServerlessClientFactory clientFactory = () -> localEMRSClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(clientFactory); + + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "false"); + mockDS.updateIndexOptions(existingOptions, false); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + assertEquals( + "RUNNING", + asyncQueryExecutorService + .getAsyncQueryResults(response.getQueryId()) + .getStatus()); + + flintIndexJob.assertState(FlintIndexState.ACTIVE); + localEMRSClient.startJobRunCalled(1); + localEMRSClient.getJobRunResultCalled(1); + localEMRSClient.cancelJobRunCalled(0); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryOfFullRefreshWithInvalidOptions() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false, checkpoint_location=\"s3://ckp/skp\")"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false, checkpoint_location=\"s3://ckp/skp\")"); + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=false, checkpoint_location=\"s3://ckp/skp\") "); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + mockDS.updateIndexOptions(existingOptions, false); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); + assertEquals( + "Altering to full refresh only allows: [auto_refresh, incremental_refresh]" + + " options", + asyncQueryExecutionResponse.getError()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(0); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryOfIncrementalRefreshWithInvalidOptions() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\") "); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + mockDS.updateIndexOptions(existingOptions, false); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); + assertEquals( + "Altering to incremental refresh only allows: [auto_refresh, incremental_refresh," + + " watermark_delay, checkpoint_location] options", + asyncQueryExecutionResponse.getError()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(0); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryOfIncrementalRefreshWithInsufficientOptions() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true)"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true)"); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + existingOptions.put("incremental_refresh", "false"); + mockDS.updateIndexOptions(existingOptions, true); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); + assertEquals( + "Conversion to incremental refresh index cannot proceed due to missing" + + " attributes: checkpoint_location.", + asyncQueryExecutionResponse.getError()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(0); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryOfIncrementalRefreshWithInsufficientOptionsForMV() { + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=true) "); + ImmutableList.of(ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + existingOptions.put("incremental_refresh", "false"); + mockDS.updateIndexOptions(existingOptions, true); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); + assertEquals( + "Conversion to incremental refresh index cannot proceed due to missing" + + " attributes: checkpoint_location, watermark_delay.", + asyncQueryExecutionResponse.getError()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(0); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryOfIncrementalRefreshWithEmptyExistingOptionsForMV() { + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=true) "); + ImmutableList.of(ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + existingOptions.put("incremental_refresh", "false"); + existingOptions.put("watermark_delay", ""); + existingOptions.put("checkpoint_location", ""); + mockDS.updateIndexOptions(existingOptions, true); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); + assertEquals( + "Conversion to incremental refresh index cannot proceed due to missing" + + " attributes: checkpoint_location, watermark_delay.", + asyncQueryExecutionResponse.getError()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(0); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryOfIncrementalRefresh() { + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=true) "); + ImmutableList.of(ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + existingOptions.put("incremental_refresh", "false"); + existingOptions.put("watermark_delay", "watermark_delay"); + existingOptions.put("checkpoint_location", "s3://checkpoint/location"); + mockDS.updateIndexOptions(existingOptions, true); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.refreshing(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); + emrsClient.startJobRunCalled(0); + emrsClient.getJobRunResultCalled(1); + emrsClient.cancelJobRunCalled(1); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + Assertions.assertEquals("true", options.get("incremental_refresh")); + }); + } + + @Test + public void testAlterIndexQueryWithIncrementalRefreshAlreadyExisting() { + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false) "); + ImmutableList.of(ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + existingOptions.put("incremental_refresh", "true"); + existingOptions.put("watermark_delay", "watermark_delay"); + existingOptions.put("checkpoint_location", "s3://checkpoint/location"); + mockDS.updateIndexOptions(existingOptions, true); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.refreshing(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); + emrsClient.startJobRunCalled(0); + emrsClient.getJobRunResultCalled(1); + emrsClient.cancelJobRunCalled(1); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + Assertions.assertEquals("true", options.get("incremental_refresh")); + }); + } + + @Test + public void testAlterIndexQueryWithInvalidInitialState() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + ImmutableList.of(ALTER_SKIPPING) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + mockDS.updateIndexOptions(existingOptions, false); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.updating(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); + assertEquals( + "Transaction failed as flint index is not in a valid state.", + asyncQueryExecutionResponse.getError()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(0); + flintIndexJob.assertState(FlintIndexState.UPDATING); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryWithValidationExceptionWithSuccess() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + ImmutableList.of(ALTER_SKIPPING) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + + @Override + public CancelJobRunResult cancelJobRun( + String applicationId, String jobId, boolean allowExceptionPropagation) { + super.cancelJobRun(applicationId, jobId, allowExceptionPropagation); + throw new ValidationException("Job run is not in a cancellable state"); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + mockDS.updateIndexOptions(existingOptions, false); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(1); + emrsClient.getJobRunResultCalled(0); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryWithResourceNotFoundExceptionWithSuccess() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + ImmutableList.of(ALTER_SKIPPING) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + + @Override + public CancelJobRunResult cancelJobRun( + String applicationId, String jobId, boolean allowExceptionPropagation) { + super.cancelJobRun(applicationId, jobId, allowExceptionPropagation); + throw new ValidationException("Random validation exception"); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + mockDS.updateIndexOptions(existingOptions, false); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); + assertEquals("Internal Server Error.", asyncQueryExecutionResponse.getError()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(1); + emrsClient.getJobRunResultCalled(0); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryWithUnknownError() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + ImmutableList.of(ALTER_SKIPPING) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + + @Override + public CancelJobRunResult cancelJobRun( + String applicationId, String jobId, boolean allowExceptionPropagation) { + super.cancelJobRun(applicationId, jobId, allowExceptionPropagation); + throw new IllegalArgumentException("Unknown Error"); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + mockDS.updateIndexOptions(existingOptions, false); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); + assertEquals("Internal Server Error.", asyncQueryExecutionResponse.getError()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(1); + emrsClient.getJobRunResultCalled(0); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java index ff262c24c0..864a87586f 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java @@ -8,9 +8,8 @@ import com.amazonaws.services.emrserverless.model.CancelJobRunResult; import com.amazonaws.services.emrserverless.model.GetJobRunResult; import com.amazonaws.services.emrserverless.model.JobRun; +import com.amazonaws.services.emrserverless.model.ValidationException; import com.google.common.collect.ImmutableList; -import java.util.HashMap; -import java.util.Map; import org.junit.Assert; import org.junit.Test; import org.junit.jupiter.api.Assertions; @@ -18,7 +17,6 @@ import org.opensearch.sql.spark.asyncquery.model.MockFlintIndex; import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; -import org.opensearch.sql.spark.client.StartJobRequest; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexType; import org.opensearch.sql.spark.leasemanager.ConcurrencyLimitExceededException; @@ -171,8 +169,9 @@ public void legacyDropIndexNoJobRunning() { LocalEMRSClient emrsClient = new LocalEMRSClient() { @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { - throw new IllegalArgumentException("Job run is not in a cancellable state"); + public CancelJobRunResult cancelJobRun( + String applicationId, String jobId, boolean allowExceptionPropagation) { + throw new ValidationException("Job run is not in a cancellable state"); } }; EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; @@ -246,8 +245,9 @@ public void legacyDropIndexSpecialCharacter() { LocalEMRSClient emrsClient = new LocalEMRSClient() { @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { - throw new IllegalArgumentException("Job run is not in a cancellable state"); + public CancelJobRunResult cancelJobRun( + String applicationId, String jobId, boolean allowExceptionPropagation) { + throw new ValidationException("Job run is not in a cancellable state"); } }; EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; @@ -339,8 +339,9 @@ public void dropIndexNoJobRunning() { LocalEMRSClient emrsClient = new LocalEMRSClient() { @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { - throw new IllegalArgumentException("Job run is not in a cancellable state"); + public CancelJobRunResult cancelJobRun( + String applicationId, String jobId, boolean allowExceptionPropagation) { + throw new ValidationException("Job run is not in a cancellable state"); } }; EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; @@ -613,7 +614,8 @@ public void dropIndexWithIndexInDeletedState() { LocalEMRSClient emrsClient = new LocalEMRSClient() { @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + public CancelJobRunResult cancelJobRun( + String applicationId, String jobId, boolean allowExceptionPropagation) { Assert.fail("should not call cancelJobRun"); return null; } @@ -664,7 +666,8 @@ public void dropIndexSpecialCharacter() { LocalEMRSClient emrsClient = new LocalEMRSClient() { @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + public CancelJobRunResult cancelJobRun( + String applicationId, String jobId, boolean allowExceptionPropagation) { throw new IllegalArgumentException("Job run is not in a cancellable state"); } }; @@ -687,10 +690,10 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { // 2.fetch result. AsyncQueryExecutionResponse asyncQueryResults = asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("SUCCESS", asyncQueryResults.getStatus()); - assertNull(asyncQueryResults.getError()); + assertEquals("FAILED", asyncQueryResults.getStatus()); + assertEquals("Internal Server Error.", asyncQueryResults.getError()); - flintIndexJob.assertState(FlintIndexState.DELETED); + flintIndexJob.assertState(FlintIndexState.REFRESHING); } /** @@ -706,7 +709,8 @@ public void edgeCaseNoIndexStateDoc() { LocalEMRSClient emrsClient = new LocalEMRSClient() { @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + public CancelJobRunResult cancelJobRun( + String applicationId, String jobId, boolean allowExceptionPropagation) { Assert.fail("should not call cancelJobRun"); return null; } @@ -846,7 +850,8 @@ public void cancelAutoRefreshCreateFlintIndexShouldThrowException() { LocalEMRSClient emrsClient = new LocalEMRSClient() { @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + public CancelJobRunResult cancelJobRun( + String applicationId, String jobId, boolean allowExceptionPropagation) { Assert.fail("should not call cancelJobRun"); return null; } @@ -1003,861 +1008,4 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { IllegalStateException.class, () -> asyncQueryExecutorService.cancelQuery(response.getQueryId())); } - - @Test - public void testAlterIndexQueryConvertingToManualRefresh() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=false)"); - MockFlintIndex ALTER_COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=false)"); - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=false) "); - ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - mockDS.updateIndexOptions(existingOptions, false); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(1); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryConvertingToManualRefreshWithNoIncrementalRefresh() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false)"); - MockFlintIndex ALTER_COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false)"); - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false)"); - ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - existingOptions.put("checkpoint_location", "s3://checkpoint/location"); - mockDS.updateIndexOptions(existingOptions, true); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(1); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryWithRedundantOperation() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=false)"); - MockFlintIndex ALTER_COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=false)"); - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=false) "); - ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public String startJobRun(StartJobRequest startJobRequest) { - return "jobId"; - } - - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - - @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { - super.cancelJobRun(applicationId, jobId); - throw new IllegalArgumentException("JobId doesn't exist"); - } - }; - EMRServerlessClientFactory emrServerlessCientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessCientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "false"); - mockDS.updateIndexOptions(existingOptions, false); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(1); - emrsClient.getJobRunResultCalled(0); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryConvertingToAutoRefresh() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=true," - + " incremental_refresh=false)"); - MockFlintIndex ALTER_COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=true," - + " incremental_refresh=false)"); - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=true," - + " incremental_refresh=false) "); - ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient localEMRSClient = new LocalEMRSClient(); - EMRServerlessClientFactory clientFactory = () -> localEMRSClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(clientFactory); - - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "false"); - mockDS.updateIndexOptions(existingOptions, false); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - assertEquals( - "RUNNING", - asyncQueryExecutorService - .getAsyncQueryResults(response.getQueryId()) - .getStatus()); - - flintIndexJob.assertState(FlintIndexState.ACTIVE); - localEMRSClient.startJobRunCalled(1); - localEMRSClient.getJobRunResultCalled(1); - localEMRSClient.cancelJobRunCalled(0); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryWithOutAnyAutoRefresh() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (" - + " incremental_refresh=false)"); - MockFlintIndex ALTER_COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (" - + " incremental_refresh=false)"); - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (" + " incremental_refresh=false) "); - ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient localEMRSClient = new LocalEMRSClient(); - EMRServerlessClientFactory clientFactory = () -> localEMRSClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(clientFactory); - - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "false"); - mockDS.updateIndexOptions(existingOptions, false); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - assertEquals( - "RUNNING", - asyncQueryExecutorService - .getAsyncQueryResults(response.getQueryId()) - .getStatus()); - - flintIndexJob.assertState(FlintIndexState.ACTIVE); - localEMRSClient.startJobRunCalled(1); - localEMRSClient.getJobRunResultCalled(1); - localEMRSClient.cancelJobRunCalled(0); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryOfFullRefreshWithInvalidOptions() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=false, checkpoint_location=\"s3://ckp/skp\")"); - MockFlintIndex ALTER_COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=false, checkpoint_location=\"s3://ckp/skp\")"); - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=false, checkpoint_location=\"s3://ckp/skp\") "); - ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - mockDS.updateIndexOptions(existingOptions, false); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); - assertEquals( - "Altering to full refresh only allows: [auto_refresh, incremental_refresh]" - + " options", - asyncQueryExecutionResponse.getError()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(0); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("true", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryOfIncrementalRefreshWithInvalidOptions() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex ALTER_COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\") "); - ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - mockDS.updateIndexOptions(existingOptions, false); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); - assertEquals( - "Altering to incremental refresh only allows: [auto_refresh, incremental_refresh," - + " watermark_delay, checkpoint_location] options", - asyncQueryExecutionResponse.getError()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(0); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("true", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryOfIncrementalRefreshWithInsufficientOptions() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true)"); - MockFlintIndex ALTER_COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true)"); - ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - existingOptions.put("incremental_refresh", "false"); - mockDS.updateIndexOptions(existingOptions, true); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); - assertEquals( - "Conversion to incremental refresh index cannot proceed due to missing" - + " attributes: checkpoint_location.", - asyncQueryExecutionResponse.getError()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(0); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("true", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryOfIncrementalRefreshWithInsufficientOptionsForMV() { - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=true) "); - ImmutableList.of(ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - existingOptions.put("incremental_refresh", "false"); - mockDS.updateIndexOptions(existingOptions, true); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); - assertEquals( - "Conversion to incremental refresh index cannot proceed due to missing" - + " attributes: checkpoint_location, watermark_delay.", - asyncQueryExecutionResponse.getError()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(0); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("true", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryOfIncrementalRefreshWithEmptyExistingOptionsForMV() { - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=true) "); - ImmutableList.of(ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - existingOptions.put("incremental_refresh", "false"); - existingOptions.put("watermark_delay", ""); - existingOptions.put("checkpoint_location", ""); - mockDS.updateIndexOptions(existingOptions, true); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); - assertEquals( - "Conversion to incremental refresh index cannot proceed due to missing" - + " attributes: checkpoint_location, watermark_delay.", - asyncQueryExecutionResponse.getError()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(0); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("true", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryOfIncrementalRefresh() { - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=true) "); - ImmutableList.of(ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - existingOptions.put("incremental_refresh", "false"); - existingOptions.put("watermark_delay", "watermark_delay"); - existingOptions.put("checkpoint_location", "s3://checkpoint/location"); - mockDS.updateIndexOptions(existingOptions, true); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.refreshing(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); - emrsClient.startJobRunCalled(0); - emrsClient.getJobRunResultCalled(1); - emrsClient.cancelJobRunCalled(1); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - Assertions.assertEquals("true", options.get("incremental_refresh")); - }); - } - - @Test - public void testAlterIndexQueryWithIncrementalRefreshAlreadyExisting() { - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false) "); - ImmutableList.of(ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - existingOptions.put("incremental_refresh", "true"); - existingOptions.put("watermark_delay", "watermark_delay"); - existingOptions.put("checkpoint_location", "s3://checkpoint/location"); - mockDS.updateIndexOptions(existingOptions, true); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.refreshing(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); - emrsClient.startJobRunCalled(0); - emrsClient.getJobRunResultCalled(1); - emrsClient.cancelJobRunCalled(1); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - Assertions.assertEquals("true", options.get("incremental_refresh")); - }); - } - - @Test - public void testAlterIndexQueryWithInvalidInitialState() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=false)"); - ImmutableList.of(ALTER_SKIPPING) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - mockDS.updateIndexOptions(existingOptions, false); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.updating(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); - assertEquals( - "Transaction failed as flint index is not in a valid state.", - asyncQueryExecutionResponse.getError()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(0); - flintIndexJob.assertState(FlintIndexState.UPDATING); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("true", options.get("auto_refresh")); - }); - } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java index 8cee412f02..ee201b5151 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java @@ -19,6 +19,7 @@ import com.amazonaws.services.emrserverless.model.CancelJobRunResult; import com.amazonaws.services.emrserverless.model.GetJobRunResult; import com.amazonaws.services.emrserverless.model.JobRun; +import com.amazonaws.services.emrserverless.model.ValidationException; import com.google.common.collect.Lists; import java.util.Base64; import java.util.List; @@ -75,7 +76,7 @@ public void shouldVacuumIndexInRefreshingState() { // Cancel EMR-S job, but not job running Pair.of( () -> { - throw new IllegalArgumentException("Job run is not in a cancellable state"); + throw new ValidationException("Job run is not in a cancellable state"); }, DEFAULT_OP))); @@ -177,9 +178,10 @@ private AsyncQueryExecutionResponse runVacuumTest( LocalEMRSClient emrsClient = new LocalEMRSClient() { @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + public CancelJobRunResult cancelJobRun( + String applicationId, String jobId, boolean allowExceptionPropagation) { if (cancelJobRun == DEFAULT_OP) { - return super.cancelJobRun(applicationId, jobId); + return super.cancelJobRun(applicationId, jobId, allowExceptionPropagation); } return cancelJobRun.call(); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintIndex.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintIndex.java index 554de586b4..e25250fd09 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintIndex.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintIndex.java @@ -5,7 +5,6 @@ package org.opensearch.sql.spark.asyncquery.model; -import java.util.HashMap; import java.util.Map; import lombok.Getter; import lombok.SneakyThrows; @@ -55,7 +54,7 @@ public Map getIndexMappings() { .getSourceAsMap(); } - public void updateIndexOptions(HashMap newOptions, Boolean replaceCompletely) { + public void updateIndexOptions(Map newOptions, Boolean replaceCompletely) { GetMappingsResponse mappingsResponse = client.admin().indices().prepareGetMappings().setIndices(indexName).get(); Map flintMetadataMap = diff --git a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java index a5123e0174..4c2a850bb2 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java @@ -160,7 +160,7 @@ void testCancelJobRun() { .thenReturn(new CancelJobRunResult().withJobRunId(EMR_JOB_ID)); EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); CancelJobRunResult cancelJobRunResult = - emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); + emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID, false); Assertions.assertEquals(EMR_JOB_ID, cancelJobRunResult.getJobRunId()); } @@ -169,7 +169,8 @@ void testCancelJobRunWithErrorMetric() { doThrow(new RuntimeException()).when(emrServerless).cancelJobRun(any()); EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); Assertions.assertThrows( - RuntimeException.class, () -> emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, "123")); + RuntimeException.class, + () -> emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, "123", false)); } @Test @@ -179,10 +180,31 @@ void testCancelJobRunWithValidationException() { RuntimeException runtimeException = Assertions.assertThrows( RuntimeException.class, - () -> emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID)); + () -> emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID, false)); Assertions.assertEquals("Internal Server Error.", runtimeException.getMessage()); } + @Test + void testCancelJobRunWithNativeEMRExceptionWithValidationException() { + doThrow(new ValidationException("Error")).when(emrServerless).cancelJobRun(any()); + EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + ValidationException validationException = + Assertions.assertThrows( + ValidationException.class, + () -> emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID, true)); + Assertions.assertTrue(validationException.getMessage().contains("Error")); + } + + @Test + void testCancelJobRunWithNativeEMRException() { + when(emrServerless.cancelJobRun(any())) + .thenReturn(new CancelJobRunResult().withJobRunId(EMR_JOB_ID)); + EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + CancelJobRunResult cancelJobRunResult = + emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID, true); + Assertions.assertEquals(EMR_JOB_ID, cancelJobRunResult.getJobRunId()); + } + @Test void testStartJobRunWithLongJobName() { StartJobRunResult response = new StartJobRunResult(); diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 9f58f7708d..429bd93872 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -742,7 +742,7 @@ void testDispatchWithUnSupportedDataSourceType() { @Test void testCancelJob() { - when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID)) + when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID, false)) .thenReturn( new CancelJobRunResult() .withJobRunId(EMR_JOB_ID) @@ -802,7 +802,7 @@ void testCancelQueryWithInvalidStatementId() { @Test void testCancelQueryWithNoSessionId() { - when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID)) + when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID, false)) .thenReturn( new CancelJobRunResult() .withJobRunId(EMR_JOB_ID) diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java index 6112261336..8fca190cd6 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java @@ -227,7 +227,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { } @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + public CancelJobRunResult cancelJobRun( + String applicationId, String jobId, boolean allowExceptionPropagation) { cancelJobRunCalled++; return null; }