From edd9c70cc75ac78b93bdab3d48752bafe3962b1f Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Mon, 11 Sep 2023 16:14:20 -0700 Subject: [PATCH 1/6] Initial impelmentation of CreateIndex Signed-off-by: Owais Kazi --- .../workflow/CreateIndexStep.java | 45 +++++++++++++++++++ .../resources/mappings/knn-index-mapping.json | 1 + 2 files changed, 46 insertions(+) create mode 100644 src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java create mode 100644 src/main/resources/mappings/knn-index-mapping.json diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java new file mode 100644 index 000000000..3786313fb --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import com.google.common.base.Charsets; +import com.google.common.io.Resources; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.client.AdminClient; +import org.opensearch.common.xcontent.XContentType; + +import java.io.IOException; +import java.net.URL; +import java.util.concurrent.CompletableFuture; + +public class CreateIndexStep implements Workflow { + + AdminClient adminClient; + + @Override + public CompletableFuture execute() throws Exception { + + // ActionListener actionListener + CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(getIndexMappings(fileName), XContentType.JSON) + .settings(settings); + adminClient.indices().create(request, actionListener); + + } + + /** + * Get index mapping json content. + * + * @return index mapping + * @throws IOException IOException if mapping file can't be read correctly + */ + public static String getIndexMappings(String mappingFileName) throws IOException { + URL url = CreateIndexStep.class.getClassLoader().getResource(mappingFileName); + return Resources.toString(url, Charsets.UTF_8); + } +} diff --git a/src/main/resources/mappings/knn-index-mapping.json b/src/main/resources/mappings/knn-index-mapping.json new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/src/main/resources/mappings/knn-index-mapping.json @@ -0,0 +1 @@ + From b8869ef10c1de0204ce46935b3b7f945655a6186 Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Wed, 13 Sep 2023 15:50:49 -0700 Subject: [PATCH 2/6] Adds CreateIndex building block Signed-off-by: Owais Kazi --- .../flowframework/FlowFrameworkPlugin.java | 38 +++++++++++++++++- .../workflow/CreateIndexStep.java | 40 ++++++++++++++++--- .../resources/mappings/knn-index-mapping.json | 17 +++++++- 3 files changed, 87 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index f810767eb..09cae4940 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -8,11 +8,47 @@ */ package org.opensearch.flowframework; +import com.google.common.collect.ImmutableList; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.flowframework.workflow.CreateIndexStep; import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.function.Supplier; /** * An OpenSearch plugin that enables builders to innovate AI apps on OpenSearch. */ public class FlowFrameworkPlugin extends Plugin { - // Implement the relevant Plugin Interfaces here + + private Client client; + + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier repositoriesServiceSupplier + ) { + this.client = client; + CreateIndexStep createIndexStep = new CreateIndexStep(client); + return ImmutableList.of(createIndexStep); + } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java index 3786313fb..9ebd9aade 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java @@ -10,26 +10,54 @@ import com.google.common.base.Charsets; import com.google.common.io.Resources; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.create.CreateIndexRequest; -import org.opensearch.client.AdminClient; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.client.Client; import org.opensearch.common.xcontent.XContentType; import java.io.IOException; import java.net.URL; import java.util.concurrent.CompletableFuture; -public class CreateIndexStep implements Workflow { +public class CreateIndexStep implements WorkflowStep { - AdminClient adminClient; + private static final Logger logger = LogManager.getLogger(CreateIndexStep.class); + private Client client; + private final String CREATE_INDEX_STEP = "create_index_step"; + + public CreateIndexStep(Client client) { + this.client = client; + } @Override - public CompletableFuture execute() throws Exception { + public CompletableFuture execute(WorkflowData data) { + + ActionListener actionListener = new ActionListener<>() { + + @Override + public void onResponse(CreateIndexResponse createIndexResponse) { + logger.info("created index:{}"); + } - // ActionListener actionListener + @Override + public void onFailure(Exception e) { + logger.error("Index creation failed", e); + } + }; + + // Fetch indexName, fileName and settings from WorkflowData CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(getIndexMappings(fileName), XContentType.JSON) .settings(settings); - adminClient.indices().create(request, actionListener); + client.admin().indices().create(request, actionListener); + return null; + } + @Override + public String getName() { + return CREATE_INDEX_STEP; } /** diff --git a/src/main/resources/mappings/knn-index-mapping.json b/src/main/resources/mappings/knn-index-mapping.json index 8b1378917..c31946e62 100644 --- a/src/main/resources/mappings/knn-index-mapping.json +++ b/src/main/resources/mappings/knn-index-mapping.json @@ -1 +1,16 @@ - +{ + "properties": { + "desc_v": { + "type": "keyword" + }, + "name_v": { + "type": "keyword" + }, + "description": { + "type": "keyword" + }, + "name": { + "type": "keyword" + } + } +} From f759beabe72f5bff34cdadd1b30e180a88b62f30 Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Fri, 15 Sep 2023 14:17:15 -0700 Subject: [PATCH 3/6] Integrated WorkflowData and made the request async Signed-off-by: Owais Kazi --- .../flowframework/FlowFrameworkPlugin.java | 2 +- .../{ => CreateIndex}/CreateIndexStep.java | 33 +++++++++++++++---- 2 files changed, 28 insertions(+), 7 deletions(-) rename src/main/java/org/opensearch/flowframework/workflow/{ => CreateIndex}/CreateIndexStep.java (64%) diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 09cae4940..2f7a34f3f 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -16,7 +16,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; -import org.opensearch.flowframework.workflow.CreateIndexStep; +import org.opensearch.flowframework.workflow.CreateIndex.CreateIndexStep; import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java similarity index 64% rename from src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java rename to src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java index 9ebd9aade..242d80bae 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java @@ -6,7 +6,7 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.workflow; +package org.opensearch.flowframework.workflow.CreateIndex; import com.google.common.base.Charsets; import com.google.common.io.Resources; @@ -17,9 +17,13 @@ import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.client.Client; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.flowframework.workflow.WorkflowData; +import org.opensearch.flowframework.workflow.WorkflowStep; import java.io.IOException; import java.net.URL; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; public class CreateIndexStep implements WorkflowStep { @@ -33,26 +37,43 @@ public CreateIndexStep(Client client) { } @Override - public CompletableFuture execute(WorkflowData data) { - + public CompletableFuture execute(List data) { + CompletableFuture future = new CompletableFuture<>(); ActionListener actionListener = new ActionListener<>() { @Override public void onResponse(CreateIndexResponse createIndexResponse) { logger.info("created index:{}"); + future.complete(new WorkflowData() { + @Override + public Map getContent() { + return Map.of("index", createIndexResponse.index()); + } + }); } @Override public void onFailure(Exception e) { logger.error("Index creation failed", e); + future.completeExceptionally(e); } }; - // Fetch indexName, fileName and settings from WorkflowData - CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(getIndexMappings(fileName), XContentType.JSON) + String index = null; + + for (WorkflowData workflowData : data) { + // Fetch index from content i.e. request body of execute API + Map content = workflowData.getContent(); + index = (String) content.get("index"); + } + + // TODO: + // 1. Map index type -> fileName + // 2. Create settings based on the index settings received from content + CreateIndexRequest request = new CreateIndexRequest(index).mapping(getIndexMappings(fileName), XContentType.JSON) .settings(settings); client.admin().indices().create(request, actionListener); - return null; + return future; } @Override From 4b6e3e323bc0e92ee51aef07bf588b7d167c42fb Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Mon, 18 Sep 2023 16:51:50 -0700 Subject: [PATCH 4/6] Addressed PR comments Signed-off-by: Owais Kazi --- build.gradle | 3 +- .../workflow/CreateIndex/CreateIndexStep.java | 45 ++++++++++++++----- .../{knn-index-mapping.json => knn.json} | 0 3 files changed, 35 insertions(+), 13 deletions(-) rename src/main/resources/mappings/{knn-index-mapping.json => knn.json} (100%) diff --git a/build.gradle b/build.gradle index aa20423ee..c3b193b15 100644 --- a/build.gradle +++ b/build.gradle @@ -56,6 +56,7 @@ opensearchplugin { dependencyLicenses.enabled = false // This requires an additional Jar not published as part of build-tools loggerUsageCheck.enabled = false +thirdPartyAudit.enabled = false // No need to validate pom, as we do not upload to maven/sonatype validateNebulaPom.enabled = false @@ -106,7 +107,7 @@ dependencies { implementation "org.opensearch:opensearch:${opensearch_version}" implementation 'org.junit.jupiter:junit-jupiter:5.10.0' implementation "com.google.code.gson:gson:2.10.1" - compileOnly "com.google.guava:guava:32.1.2-jre" + implementation "com.google.guava:guava:32.1.2-jre" api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}" configurations.all { diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java index 242d80bae..50b258b9c 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java @@ -12,11 +12,12 @@ import com.google.common.io.Resources; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.workflow.WorkflowData; import org.opensearch.flowframework.workflow.WorkflowStep; @@ -26,12 +27,19 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; +/** + * Step to create an index + */ public class CreateIndexStep implements WorkflowStep { private static final Logger logger = LogManager.getLogger(CreateIndexStep.class); private Client client; - private final String CREATE_INDEX_STEP = "create_index_step"; + private final String NAME = "create_index_step"; + /** + * Instantiate this class + * @param client Client to create an index + */ public CreateIndexStep(Client client) { this.client = client; } @@ -43,7 +51,7 @@ public CompletableFuture execute(List data) { @Override public void onResponse(CreateIndexResponse createIndexResponse) { - logger.info("created index:{}"); + logger.info("created index:{}", createIndexResponse.index()); future.complete(new WorkflowData() { @Override public Map getContent() { @@ -54,41 +62,54 @@ public Map getContent() { @Override public void onFailure(Exception e) { - logger.error("Index creation failed", e); + logger.error("Failed to create an index", e); future.completeExceptionally(e); } }; String index = null; + String type = null; + Settings settings = null; for (WorkflowData workflowData : data) { // Fetch index from content i.e. request body of execute API Map content = workflowData.getContent(); index = (String) content.get("index"); + type = (String) content.get("type"); + settings = (Settings) content.get("settings"); + if (index != null && type != null) { + break; + } } // TODO: - // 1. Map index type -> fileName - // 2. Create settings based on the index settings received from content - CreateIndexRequest request = new CreateIndexRequest(index).mapping(getIndexMappings(fileName), XContentType.JSON) - .settings(settings); - client.admin().indices().create(request, actionListener); + // 1. Create settings based on the index settings received from content + + try { + CreateIndexRequest request = new CreateIndexRequest(index).mapping(getIndexMappings(type), XContentType.JSON) + .settings(settings); + client.admin().indices().create(request, actionListener); + } catch (Exception e) { + logger.error("Failed to find the right mapping for the index", e); + } + return future; } @Override public String getName() { - return CREATE_INDEX_STEP; + return NAME; } /** * Get index mapping json content. * + * @param mapping type of the index to fetch the specific mapping file * @return index mapping * @throws IOException IOException if mapping file can't be read correctly */ - public static String getIndexMappings(String mappingFileName) throws IOException { - URL url = CreateIndexStep.class.getClassLoader().getResource(mappingFileName); + public static String getIndexMappings(String mapping) throws IOException { + URL url = CreateIndexStep.class.getClassLoader().getResource(mapping); return Resources.toString(url, Charsets.UTF_8); } } diff --git a/src/main/resources/mappings/knn-index-mapping.json b/src/main/resources/mappings/knn.json similarity index 100% rename from src/main/resources/mappings/knn-index-mapping.json rename to src/main/resources/mappings/knn.json From e89914b95097d15089b9ddab9e2537c071ed1a48 Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Thu, 21 Sep 2023 14:43:50 -0700 Subject: [PATCH 5/6] Added unit test and type field for fetching the payload Signed-off-by: Owais Kazi --- .../workflow/CreateIndex/CreateIndexStep.java | 14 +-- .../CreateIndex/CreateIndexStepTests.java | 105 ++++++++++++++++++ src/test/resources/template/datademo.json | 6 +- 3 files changed, 116 insertions(+), 9 deletions(-) create mode 100644 src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java index 50b258b9c..3d62e08f7 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java @@ -55,7 +55,7 @@ public void onResponse(CreateIndexResponse createIndexResponse) { future.complete(new WorkflowData() { @Override public Map getContent() { - return Map.of("index", createIndexResponse.index()); + return Map.of("index-name", createIndexResponse.index()); } }); } @@ -72,12 +72,10 @@ public void onFailure(Exception e) { Settings settings = null; for (WorkflowData workflowData : data) { - // Fetch index from content i.e. request body of execute API Map content = workflowData.getContent(); - index = (String) content.get("index"); + index = (String) content.get("index-name"); type = (String) content.get("type"); - settings = (Settings) content.get("settings"); - if (index != null && type != null) { + if (index != null && type != null && settings != null) { break; } } @@ -86,8 +84,10 @@ public void onFailure(Exception e) { // 1. Create settings based on the index settings received from content try { - CreateIndexRequest request = new CreateIndexRequest(index).mapping(getIndexMappings(type), XContentType.JSON) - .settings(settings); + CreateIndexRequest request = new CreateIndexRequest(index).mapping( + getIndexMappings("mappings/" + type + ".json"), + XContentType.JSON + ); client.admin().indices().create(request, actionListener); } catch (Exception e) { logger.error("Failed to find the right mapping for the index", e); diff --git a/src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java new file mode 100644 index 000000000..e316ea7cb --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java @@ -0,0 +1,105 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow.CreateIndex; + +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.client.AdminClient; +import org.opensearch.client.Client; +import org.opensearch.client.IndicesAdminClient; +import org.opensearch.core.action.ActionListener; +import org.opensearch.flowframework.workflow.WorkflowData; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.mockito.ArgumentCaptor; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CreateIndexStepTests extends OpenSearchTestCase { + + private WorkflowData inputData = WorkflowData.EMPTY; + + private Client client; + + private AdminClient adminClient; + + private IndicesAdminClient indicesAdminClient; + + @Override + public void setUp() throws Exception { + super.setUp(); + + inputData = new WorkflowData() { + + @Override + public Map getContent() { + // See CreateIndexRequest ParseFields for source of content keys needed + return Map.of("index-name", "demo", "type", "knn"); + } + + @Override + public Map getParams() { + // See RestCreateIndexAction for source of param keys needed + return Map.of(); + } + + }; + + client = mock(Client.class); + adminClient = mock(AdminClient.class); + indicesAdminClient = mock(IndicesAdminClient.class); + + when(adminClient.indices()).thenReturn(indicesAdminClient); + when(client.admin()).thenReturn(adminClient); + + } + + public void testCreateIndexStep() throws ExecutionException, InterruptedException, IOException { + + CreateIndexStep createIndexStep = new CreateIndexStep(client); + + ArgumentCaptor actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + CompletableFuture future = createIndexStep.execute(List.of(inputData)); + assertFalse(future.isDone()); + verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture()); + actionListenerCaptor.getValue().onResponse(new CreateIndexResponse(true, true, "demo")); + + assertTrue(future.isDone()); + + Map outputData = Map.of("index-name", "demo"); + assertEquals(outputData, future.get().getContent()); + + } + + public void testCreateIndexStepFailure() throws ExecutionException, InterruptedException { + + CreateIndexStep createIndexStep = new CreateIndexStep(client); + + ArgumentCaptor actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + CompletableFuture future = createIndexStep.execute(List.of(inputData)); + assertFalse(future.isDone()); + verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture()); + + actionListenerCaptor.getValue().onFailure(new Exception()); + + assertTrue(future.isDone()); + assertThrows(Exception.class, () -> future.get().getContent()); + + } +} diff --git a/src/test/resources/template/datademo.json b/src/test/resources/template/datademo.json index a1323ed2c..79f79594a 100644 --- a/src/test/resources/template/datademo.json +++ b/src/test/resources/template/datademo.json @@ -3,11 +3,13 @@ "nodes": [ { "id": "create_index", - "index_name": "demo" + "index_name": "demo", + "type": "knn" }, { "id": "create_another_index", - "index_name": "second_demo" + "index_name": "second_demo", + "type": "knn" } ], "edges": [ From 44da4d55c5db10db555e548470d3331ef35726f0 Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Fri, 22 Sep 2023 14:07:21 -0700 Subject: [PATCH 6/6] Addressed PR Comments Signed-off-by: Owais Kazi --- .../workflow/CreateIndex/CreateIndexStep.java | 4 ++-- .../CreateIndex/CreateIndexStepTests.java | 20 +++++++------------ src/test/resources/template/datademo.json | 6 ++---- 3 files changed, 11 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java index 3d62e08f7..ebef2cae8 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java @@ -51,7 +51,7 @@ public CompletableFuture execute(List data) { @Override public void onResponse(CreateIndexResponse createIndexResponse) { - logger.info("created index:{}", createIndexResponse.index()); + logger.info("created index: {}", createIndexResponse.index()); future.complete(new WorkflowData() { @Override public Map getContent() { @@ -108,7 +108,7 @@ public String getName() { * @return index mapping * @throws IOException IOException if mapping file can't be read correctly */ - public static String getIndexMappings(String mapping) throws IOException { + private static String getIndexMappings(String mapping) throws IOException { URL url = CreateIndexStep.class.getClassLoader().getResource(mapping); return Resources.toString(url, Charsets.UTF_8); } diff --git a/src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java index e316ea7cb..638dea251 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java @@ -49,14 +49,7 @@ public void setUp() throws Exception { @Override public Map getContent() { - // See CreateIndexRequest ParseFields for source of content keys needed - return Map.of("index-name", "demo", "type", "knn"); - } - - @Override - public Map getParams() { - // See RestCreateIndexAction for source of param keys needed - return Map.of(); + return Map.ofEntries(Map.entry("index-name", "demo"), Map.entry("type", "knn")); } }; @@ -80,7 +73,7 @@ public void testCreateIndexStep() throws ExecutionException, InterruptedExceptio verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture()); actionListenerCaptor.getValue().onResponse(new CreateIndexResponse(true, true, "demo")); - assertTrue(future.isDone()); + assertTrue(future.isDone() && !future.isCompletedExceptionally()); Map outputData = Map.of("index-name", "demo"); assertEquals(outputData, future.get().getContent()); @@ -96,10 +89,11 @@ public void testCreateIndexStepFailure() throws ExecutionException, InterruptedE assertFalse(future.isDone()); verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture()); - actionListenerCaptor.getValue().onFailure(new Exception()); - - assertTrue(future.isDone()); - assertThrows(Exception.class, () -> future.get().getContent()); + actionListenerCaptor.getValue().onFailure(new Exception("Failed to create an index")); + assertTrue(future.isCompletedExceptionally()); + ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); + assertTrue(ex.getCause() instanceof Exception); + assertEquals("Failed to create an index", ex.getCause().getMessage()); } } diff --git a/src/test/resources/template/datademo.json b/src/test/resources/template/datademo.json index 79f79594a..a1323ed2c 100644 --- a/src/test/resources/template/datademo.json +++ b/src/test/resources/template/datademo.json @@ -3,13 +3,11 @@ "nodes": [ { "id": "create_index", - "index_name": "demo", - "type": "knn" + "index_name": "demo" }, { "id": "create_another_index", - "index_name": "second_demo", - "type": "knn" + "index_name": "second_demo" } ], "edges": [