Skip to content

Commit

Permalink
Fix IndexNotFound Exception after datasource creation
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <reddyvam@amazon.com>
  • Loading branch information
vmmusings committed Nov 3, 2023
1 parent 2f2ecd2 commit 2c71216
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,13 @@ public interface DataSourceMetadataStorage {
* @param datasourceName name of the {@link DataSource}.
*/
void deleteDataSourceMetadata(String datasourceName);

/**
* Creates query result index for the given datasource. We have introduced this method to handle
* the issue: https://github.com/opensearch-project/sql/issues/2268 Once a long term solution is
* finalized, we can make a decision to remove or have it.
*
* @param dataSourceMetadata {@link DataSourceMetadata}.
*/
void createDataSourceResultIndex(DataSourceMetadata dataSourceMetadata);
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void createDataSource(DataSourceMetadata metadata) {
if (!metadata.getName().equals(DEFAULT_DATASOURCE_NAME)) {
this.dataSourceLoaderCache.getOrLoadDataSource(metadata);
this.dataSourceMetadataStorage.createDataSourceMetadata(metadata);
this.dataSourceMetadataStorage.createDataSourceResultIndex(metadata);
}
}

Expand All @@ -90,6 +91,8 @@ public void updateDataSource(DataSourceMetadata dataSourceMetadata) {
if (!dataSourceMetadata.getName().equals(DEFAULT_DATASOURCE_NAME)) {
this.dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
this.dataSourceMetadataStorage.updateDataSourceMetadata(dataSourceMetadata);
// If resultIndex is changed in update call
this.dataSourceMetadataStorage.createDataSourceResultIndex(dataSourceMetadata);
} else {
throw new UnsupportedOperationException(
"Not allowed to update default datasource :" + DEFAULT_DATASOURCE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public class OpenSearchDataSourceMetadataStorage implements DataSourceMetadataSt
private static final Integer DATASOURCE_QUERY_RESULT_SIZE = 10000;
private static final String DATASOURCE_INDEX_SETTINGS_FILE_NAME =
"datasources-index-settings.yml";
private static final String DATASOURCE_QUERY_RESULT_INDEX_MAPPING_FILE_NAME =
"query_execution_result_mapping.yml";
private static final String DATASOURCE_QUERY_RESULT_INDEX_SETTING_FILE_NAME =
"query_execution_result_settings.yml";

private static final Logger LOG = LogManager.getLogger();
private final Client client;
private final ClusterService clusterService;
Expand Down Expand Up @@ -183,6 +188,44 @@ public void deleteDataSourceMetadata(String datasourceName) {
}
}

@Override
public void createDataSourceResultIndex(DataSourceMetadata dataSourceMetadata) {
if (!this.clusterService.state().routingTable().hasIndex(dataSourceMetadata.getResultIndex())) {
try {
InputStream mappingFileStream =
OpenSearchDataSourceMetadataStorage.class
.getClassLoader()
.getResourceAsStream(DATASOURCE_QUERY_RESULT_INDEX_MAPPING_FILE_NAME);
InputStream settingsFileStream =
OpenSearchDataSourceMetadataStorage.class
.getClassLoader()
.getResourceAsStream(DATASOURCE_QUERY_RESULT_INDEX_SETTING_FILE_NAME);
CreateIndexRequest createIndexRequest =
new CreateIndexRequest(dataSourceMetadata.getResultIndex());
createIndexRequest
.mapping(IOUtils.toString(mappingFileStream, StandardCharsets.UTF_8), XContentType.YAML)
.settings(
IOUtils.toString(settingsFileStream, StandardCharsets.UTF_8), XContentType.YAML);
ActionFuture<CreateIndexResponse> createIndexResponseActionFuture;
try (ThreadContext.StoredContext ignored =
client.threadPool().getThreadContext().stashContext()) {
createIndexResponseActionFuture = client.admin().indices().create(createIndexRequest);
}
CreateIndexResponse createIndexResponse = createIndexResponseActionFuture.actionGet();
if (createIndexResponse.isAcknowledged()) {
LOG.info(
"Index: {} creation Acknowledged", dataSourceMetadata.fromNameToCustomResultIndex());
} else {
throw new RuntimeException("Index creation is not acknowledged.");
}
} catch (Throwable e) {
LOG.error("Datasource Query Result Index Creation failed due to :{}", e.getMessage());
// Suppressing all exceptions as this is not an important step in datasource creation
// workflow.
}
}
}

private void createDataSourcesIndex() {
try {
InputStream mappingFileStream =
Expand Down
37 changes: 37 additions & 0 deletions datasources/src/main/resources/query_execution_result_mapping.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
---
##
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
##

# Schema file for the query_execution_request index
# Also "dynamic" is set to "false" so that other fields can be added.,
dynamic: false
properties:
result:
type: object
enabled: false
schema:
type: object
enabled: false
jobRunId:
type: keyword
applicationId:
type: keyword
dataSourceName:
type: keyword
status:
type: keyword
queryId:
type: keyword
queryText:
type: text
sessionId:
type: keyword
updateTime:
type: date
format: strict_date_time||epoch_millis
error:
type: text
queryRunTime:
type: long
11 changes: 11 additions & 0 deletions datasources/src/main/resources/query_execution_result_settings.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
##
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
##

# Settings file for the query_execution_result index
index:
number_of_shards: "1"
auto_expand_replicas: "0-2"
number_of_replicas: "0"
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,56 @@ public void testDeleteDataSourceMetadataWithUnexpectedResult() {
Mockito.verify(client.threadPool().getThreadContext(), Mockito.times(1)).stashContext();
}

@Test
public void testCreateDataSourceResultIndexWithOutCreatingIndex() {
DataSourceMetadata dataSourceMetadata = getDataSourceMetadata();
Mockito.when(
clusterService.state().routingTable().hasIndex(dataSourceMetadata.getResultIndex()))
.thenReturn(Boolean.TRUE);
this.openSearchDataSourceMetadataStorage.createDataSourceResultIndex(dataSourceMetadata);

Mockito.verify(client.admin().indices(), Mockito.times(0)).create(ArgumentMatchers.any());
Mockito.verify(client, Mockito.times(0)).index(ArgumentMatchers.any());
Mockito.verify(client.threadPool().getThreadContext(), Mockito.times(0)).stashContext();
}

@Test
public void testCreateDataSourceResultIndexWhenIndexIsNotAvailable() {
DataSourceMetadata dataSourceMetadata = getDataSourceMetadata();
Mockito.when(
clusterService.state().routingTable().hasIndex(dataSourceMetadata.getResultIndex()))
.thenReturn(Boolean.FALSE);
Mockito.when(client.admin().indices().create(ArgumentMatchers.any()))
.thenReturn(createIndexResponseActionFuture);
Mockito.when(createIndexResponseActionFuture.actionGet())
.thenReturn(new CreateIndexResponse(true, true, dataSourceMetadata.getResultIndex()));

this.openSearchDataSourceMetadataStorage.createDataSourceResultIndex(dataSourceMetadata);

Mockito.verify(client.admin().indices(), Mockito.times(1)).create(ArgumentMatchers.any());
Mockito.verify(client, Mockito.times(0)).index(ArgumentMatchers.any());
Mockito.verify(client.threadPool().getThreadContext(), Mockito.times(1)).stashContext();
}

@Test
public void testCreateDataSourceResultIndexWithException() {

DataSourceMetadata dataSourceMetadata = getDataSourceMetadata();
Mockito.when(
clusterService.state().routingTable().hasIndex(dataSourceMetadata.getResultIndex()))
.thenReturn(Boolean.FALSE);
Mockito.when(client.admin().indices().create(ArgumentMatchers.any()))
.thenReturn(createIndexResponseActionFuture);
Mockito.when(createIndexResponseActionFuture.actionGet())
.thenReturn(new CreateIndexResponse(false, false, dataSourceMetadata.getResultIndex()));

this.openSearchDataSourceMetadataStorage.createDataSourceResultIndex(dataSourceMetadata);

Mockito.verify(client.admin().indices(), Mockito.times(1)).create(ArgumentMatchers.any());
Mockito.verify(client, Mockito.times(0)).index(ArgumentMatchers.any());
Mockito.verify(client.threadPool().getThreadContext(), Mockito.times(1)).stashContext();
}

private String getBasicDataSourceMetadataString() throws JsonProcessingException {
DataSourceMetadata dataSourceMetadata = new DataSourceMetadata();
dataSourceMetadata.setName("testDS");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ public void updateDataSourceMetadata(DataSourceMetadata dataSourceMetadata) {}

@Override
public void deleteDataSourceMetadata(String datasourceName) {}

@Override
public void createDataSourceResultIndex(DataSourceMetadata dataSourceMetadata) {}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,6 @@ public void setup() {
null);
dataSourceService.createDataSource(otherDm);
stateStore = new StateStore(client, clusterService);
createIndexWithMappings(dm.getResultIndex(), loadResultIndexMappings());
createIndexWithMappings(otherDm.getResultIndex(), loadResultIndexMappings());
}

@After
Expand Down

0 comments on commit 2c71216

Please sign in to comment.