Skip to content

Commit

Permalink
Adds Create Index Building block (#38)
Browse files Browse the repository at this point in the history
* Initial impelmentation of CreateIndex

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

* Adds CreateIndex building block

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

* Integrated WorkflowData and made the request async

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

* Addressed PR comments

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

* Added unit test and type field for fetching the payload

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

* Addressed PR Comments

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

---------

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>
(cherry picked from commit a97b7d0)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Sep 25, 2023
1 parent b15c866 commit a3b94b9
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 2 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ opensearchplugin {
dependencyLicenses.enabled = false
// This requires an additional Jar not published as part of build-tools
loggerUsageCheck.enabled = false
thirdPartyAudit.enabled = false

// No need to validate pom, as we do not upload to maven/sonatype
validateNebulaPom.enabled = false
Expand Down Expand Up @@ -106,7 +107,7 @@ dependencies {
implementation "org.opensearch:opensearch:${opensearch_version}"
implementation 'org.junit.jupiter:junit-jupiter:5.10.0'
implementation "com.google.code.gson:gson:2.10.1"
compileOnly "com.google.guava:guava:32.1.2-jre"
implementation "com.google.guava:guava:32.1.2-jre"
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"

configurations.all {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,47 @@
*/
package org.opensearch.flowframework;

import com.google.common.collect.ImmutableList;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.flowframework.workflow.CreateIndex.CreateIndexStep;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.function.Supplier;

/**
* An OpenSearch plugin that enables builders to innovate AI apps on OpenSearch.
*/
public class FlowFrameworkPlugin extends Plugin {
// Implement the relevant Plugin Interfaces here

private Client client;

@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.client = client;
CreateIndexStep createIndexStep = new CreateIndexStep(client);
return ImmutableList.of(createIndexStep);

Check warning on line 52 in src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java#L50-L52

Added lines #L50 - L52 were not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow.CreateIndex;

import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowStep;

import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Step to create an index
*/
public class CreateIndexStep implements WorkflowStep {

private static final Logger logger = LogManager.getLogger(CreateIndexStep.class);
private Client client;
private final String NAME = "create_index_step";

/**
* Instantiate this class
* @param client Client to create an index
*/
public CreateIndexStep(Client client) {
this.client = client;
}

@Override
public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
CompletableFuture<WorkflowData> future = new CompletableFuture<>();
ActionListener<CreateIndexResponse> actionListener = new ActionListener<>() {

@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
logger.info("created index: {}", createIndexResponse.index());
future.complete(new WorkflowData() {
@Override
public Map<String, Object> getContent() {
return Map.of("index-name", createIndexResponse.index());
}
});
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to create an index", e);
future.completeExceptionally(e);
}
};

String index = null;
String type = null;
Settings settings = null;

for (WorkflowData workflowData : data) {
Map<String, Object> content = workflowData.getContent();
index = (String) content.get("index-name");
type = (String) content.get("type");
if (index != null && type != null && settings != null) {
break;

Check warning on line 79 in src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java#L79

Added line #L79 was not covered by tests
}
}

// TODO:
// 1. Create settings based on the index settings received from content

try {
CreateIndexRequest request = new CreateIndexRequest(index).mapping(
getIndexMappings("mappings/" + type + ".json"),
XContentType.JSON
);
client.admin().indices().create(request, actionListener);
} catch (Exception e) {
logger.error("Failed to find the right mapping for the index", e);

Check warning on line 93 in src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java#L92-L93

Added lines #L92 - L93 were not covered by tests
}

return future;
}

@Override
public String getName() {
return NAME;

Check warning on line 101 in src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java#L101

Added line #L101 was not covered by tests
}

/**
* Get index mapping json content.
*
* @param mapping type of the index to fetch the specific mapping file
* @return index mapping
* @throws IOException IOException if mapping file can't be read correctly
*/
private static String getIndexMappings(String mapping) throws IOException {
URL url = CreateIndexStep.class.getClassLoader().getResource(mapping);
return Resources.toString(url, Charsets.UTF_8);
}
}
16 changes: 16 additions & 0 deletions src/main/resources/mappings/knn.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"properties": {
"desc_v": {
"type": "keyword"
},
"name_v": {
"type": "keyword"
},
"description": {
"type": "keyword"
},
"name": {
"type": "keyword"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow.CreateIndex;

import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.IndicesAdminClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.mockito.ArgumentCaptor;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class CreateIndexStepTests extends OpenSearchTestCase {

private WorkflowData inputData = WorkflowData.EMPTY;

private Client client;

private AdminClient adminClient;

private IndicesAdminClient indicesAdminClient;

@Override
public void setUp() throws Exception {
super.setUp();

inputData = new WorkflowData() {

@Override
public Map<String, Object> getContent() {
return Map.ofEntries(Map.entry("index-name", "demo"), Map.entry("type", "knn"));
}

};

client = mock(Client.class);
adminClient = mock(AdminClient.class);
indicesAdminClient = mock(IndicesAdminClient.class);

when(adminClient.indices()).thenReturn(indicesAdminClient);
when(client.admin()).thenReturn(adminClient);

}

public void testCreateIndexStep() throws ExecutionException, InterruptedException, IOException {

CreateIndexStep createIndexStep = new CreateIndexStep(client);

ArgumentCaptor<ActionListener> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class);
CompletableFuture<WorkflowData> future = createIndexStep.execute(List.of(inputData));
assertFalse(future.isDone());
verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture());
actionListenerCaptor.getValue().onResponse(new CreateIndexResponse(true, true, "demo"));

assertTrue(future.isDone() && !future.isCompletedExceptionally());

Map<String, Object> outputData = Map.of("index-name", "demo");
assertEquals(outputData, future.get().getContent());

}

public void testCreateIndexStepFailure() throws ExecutionException, InterruptedException {

CreateIndexStep createIndexStep = new CreateIndexStep(client);

ArgumentCaptor<ActionListener> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class);
CompletableFuture<WorkflowData> future = createIndexStep.execute(List.of(inputData));
assertFalse(future.isDone());
verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture());

actionListenerCaptor.getValue().onFailure(new Exception("Failed to create an index"));

assertTrue(future.isCompletedExceptionally());
ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent());
assertTrue(ex.getCause() instanceof Exception);
assertEquals("Failed to create an index", ex.getCause().getMessage());
}
}

0 comments on commit a3b94b9

Please sign in to comment.