Skip to content

Commit

Permalink
Add index for storing template models (opensearch-project#34)
Browse files Browse the repository at this point in the history
Signed-off-by: Jack Mazanec <jmazane1@nd.edu>
  • Loading branch information
jmazanec15 committed Oct 22, 2021
1 parent 104a6ac commit 26e90eb
Show file tree
Hide file tree
Showing 6 changed files with 457 additions and 1 deletion.
5 changes: 5 additions & 0 deletions src/main/java/org/opensearch/knn/common/KNNConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ public class KNNConstants {
public static final String COMPOUND_EXTENSION = "c";

public static final String JNI_LIBRARY_NAME = "OpensearchKNN";

public static final String MODEL_BLOB_PARAMETER = "model_blob";

public static final String MODEL_INDEX_MAPPING_PATH = "mappings/model-index.json";
public static final String MODEL_INDEX_NAME = ".opensearch-knn-models";
}
21 changes: 20 additions & 1 deletion src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public class KNNSettings {
public static final String KNN_PLUGIN_ENABLED = "knn.plugin.enabled";
public static final String KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE = "knn.circuit_breaker.unset.percentage";
public static final String KNN_INDEX = "index.knn";
public static final String MODEL_INDEX_NUMBER_OF_SHARDS = "knn.model_index_number_of_shards";
public static final String MODEL_INDEX_NUMBER_OF_REPLICAS = "knn.model_index_number_of_replicas";

/**
* Default setting values
Expand Down Expand Up @@ -142,6 +144,21 @@ public class KNNSettings {
IndexScope,
Setting.Property.Deprecated);

public static final Setting<Integer> MODEL_INDEX_NUMBER_OF_SHARDS_SETTING = Setting.intSetting(
MODEL_INDEX_NUMBER_OF_SHARDS,
1,
1,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<Integer> MODEL_INDEX_NUMBER_OF_REPLICAS_SETTING = Setting.intSetting(
MODEL_INDEX_NUMBER_OF_REPLICAS,
1,
0,
Setting.Property.NodeScope,
Setting.Property.Dynamic);


/**
* This setting identifies KNN index.
*/
Expand Down Expand Up @@ -291,7 +308,9 @@ public List<Setting<?>> getSettings() {
KNN_ALGO_PARAM_INDEX_THREAD_QTY_SETTING,
KNN_CIRCUIT_BREAKER_TRIGGERED_SETTING,
KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE_SETTING,
IS_KNN_INDEX_SETTING);
IS_KNN_INDEX_SETTING,
MODEL_INDEX_NUMBER_OF_SHARDS_SETTING,
MODEL_INDEX_NUMBER_OF_REPLICAS_SETTING);
return Stream.concat(settings.stream(), dynamicCacheSettings.values().stream())
.collect(Collectors.toList());
}
Expand Down
235 changes: 235 additions & 0 deletions src/main/java/org/opensearch/knn/indices/ModelIndex.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.knn.indices;

import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.delete.DeleteAction;
import org.opensearch.action.delete.DeleteRequestBuilder;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetAction;
import org.opensearch.action.get.GetRequestBuilder;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.index.util.KNNEngine;

import java.io.IOException;
import java.net.URL;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.opensearch.knn.common.KNNConstants.MODEL_INDEX_MAPPING_PATH;
import static org.opensearch.knn.common.KNNConstants.MODEL_INDEX_NAME;
import static org.opensearch.knn.index.KNNSettings.MODEL_INDEX_NUMBER_OF_REPLICAS_SETTING;
import static org.opensearch.knn.index.KNNSettings.MODEL_INDEX_NUMBER_OF_SHARDS_SETTING;

/**
* ModelIndex is a singleton class that controls operations on the model system index
*/
public final class ModelIndex {
public static Logger logger = LogManager.getLogger(ModelIndex.class);

private int numberOfShards;
private int numberOfReplicas;

private static ModelIndex INSTANCE;
private static Client client;
private static ClusterService clusterService;
private static Settings settings;

/**
* Make sure we just have one instance of model index
*
* @return ModelIndex instance
*/
public static synchronized ModelIndex getInstance() {
if (INSTANCE == null) {
INSTANCE = new ModelIndex();
}
return INSTANCE;
}

public static void initialize(Client client, ClusterService clusterService, Settings settings) {
ModelIndex.client = client;
ModelIndex.clusterService = clusterService;
ModelIndex.settings = settings;
}

private ModelIndex() {
numberOfShards = MODEL_INDEX_NUMBER_OF_SHARDS_SETTING.get(settings);
numberOfReplicas = MODEL_INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings);

clusterService.getClusterSettings().addSettingsUpdateConsumer(MODEL_INDEX_NUMBER_OF_SHARDS_SETTING,
it -> numberOfShards = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MODEL_INDEX_NUMBER_OF_REPLICAS_SETTING,
it -> numberOfReplicas = it);
}

/**
* Creates model index. It is possible that the 2 threads call this function simulateously. In this case, one
* thread will throw a ResourceAlreadyExistsException. This should be caught and handled.
*
* @param actionListener CreateIndexResponse listener
* @throws IOException thrown when get mapping fails
*/
public void create(ActionListener<CreateIndexResponse> actionListener) throws IOException {
if (isCreated()) {
return;
}

CreateIndexRequest request = new CreateIndexRequest(MODEL_INDEX_NAME)
.mapping("_doc", getMapping(), XContentType.JSON)
.settings(Settings.builder()
.put("index.hidden", true)
.put("index.number_of_shards", this.numberOfShards)
.put("index.number_of_replicas", this.numberOfReplicas)
);
client.admin().indices().create(request, actionListener);
}

/**
* Checks if the model index exists
*
* @return true if the model index exists; false otherwise
*/
public boolean isCreated() {
return clusterService.state().getRoutingTable().hasIndex(MODEL_INDEX_NAME);
}

/**
* Put a model into the system index. Non-blocking
*
* @param modelId Id of model to create
* @param modelBlob byte array of model
* @param listener handles index response
*/
public void put(String modelId, KNNEngine knnEngine, byte[] modelBlob, ActionListener<IndexResponse> listener) {
String base64Model = Base64.getEncoder().encodeToString(modelBlob);

Map<String, Object> parameters = ImmutableMap.of(
KNNConstants.KNN_ENGINE, knnEngine.getName(),
KNNConstants.MODEL_BLOB_PARAMETER, base64Model
);

IndexRequestBuilder indexRequestBuilder = client.prepareIndex(MODEL_INDEX_NAME, "_doc");
indexRequestBuilder.setId(modelId);
indexRequestBuilder.setSource(parameters);

put(indexRequestBuilder, listener);
}

/**
* Put a model into the system index. Non-blocking. When no id is passed in, OpenSearch will generate the id
* automatically. The id can be retrieved in the IndexResponse.
*
* @param modelBlob byte array of model
* @param listener handles index response
*/
public void put(KNNEngine knnEngine, byte[] modelBlob, ActionListener<IndexResponse> listener) {
String base64Model = Base64.getEncoder().encodeToString(modelBlob);

Map<String, Object> parameters = ImmutableMap.of(
KNNConstants.KNN_ENGINE, knnEngine.getName(),
KNNConstants.MODEL_BLOB_PARAMETER, base64Model
);

IndexRequestBuilder indexRequestBuilder = client.prepareIndex(MODEL_INDEX_NAME, "_doc");
indexRequestBuilder.setSource(parameters);

put(indexRequestBuilder, listener);
}

private void put(IndexRequestBuilder indexRequestBuilder, ActionListener<IndexResponse> listener) {
if (!isCreated()) {
throw new IllegalStateException("Cannot put model in index before index has been initialized");
}

// Fail if the id already exists. Models are not updateable
indexRequestBuilder.setOpType(DocWriteRequest.OpType.CREATE);
indexRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
indexRequestBuilder.execute(listener);
}

/**
* Get a model from the system index. Call blocks.
*
* @param modelId to retrieve
* @return byte array representing the model
* @throws ExecutionException thrown on search
* @throws InterruptedException thrown on search
*/
public byte[] get(String modelId) throws ExecutionException, InterruptedException {
if (!isCreated()) {
throw new IllegalStateException("Cannot get model \"" + modelId + "\". Model index does not exist.");
}

/*
GET /<model_index>/<modelId>?source_includes=<model_blob>&_local
*/
GetRequestBuilder getRequestBuilder = new GetRequestBuilder(client, GetAction.INSTANCE, MODEL_INDEX_NAME)
.setId(modelId)
.setFetchSource(KNNConstants.MODEL_BLOB_PARAMETER, null)
.setPreference("_local");
GetResponse getResponse = getRequestBuilder.execute().get();

Object blob = getResponse.getSourceAsMap().get(KNNConstants.MODEL_BLOB_PARAMETER);

if (blob == null) {
throw new IllegalArgumentException("No model available in \"" + MODEL_INDEX_NAME + "\" index with id \""
+ modelId + "\".");
}

return Base64.getDecoder().decode((String) blob);
}

private String getMapping() throws IOException {
URL url = ModelIndex.class.getClassLoader().getResource(MODEL_INDEX_MAPPING_PATH);
if (url == null) {
throw new IllegalStateException("Unable to retrieve mapping for \"" + MODEL_INDEX_NAME + "\"");
}

return Resources.toString(url, Charsets.UTF_8);
}

/**
* Delete model from index
*
* @param modelId to delete
* @param listener handles delete response
*/
public void delete(String modelId, ActionListener<DeleteResponse> listener) {
if (!isCreated()) {
throw new IllegalStateException("Cannot delete model \"" + modelId + "\". Model index does not exist.");
}

DeleteRequestBuilder deleteRequestBuilder = new DeleteRequestBuilder(client, DeleteAction.INSTANCE,
MODEL_INDEX_NAME);
deleteRequestBuilder.setId(modelId);
deleteRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
deleteRequestBuilder.execute(listener);
}
}
2 changes: 2 additions & 0 deletions src/main/java/org/opensearch/knn/plugin/KNNPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.KNNVectorFieldMapper;

import org.opensearch.knn.indices.ModelIndex;
import org.opensearch.knn.plugin.rest.RestKNNStatsHandler;
import org.opensearch.knn.plugin.rest.RestKNNWarmupHandler;
import org.opensearch.knn.plugin.script.KNNScoringScriptEngine;
Expand Down Expand Up @@ -144,6 +145,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
this.clusterService = clusterService;
KNNIndexCache.setResourceWatcherService(resourceWatcherService);
KNNSettings.state().initialize(client, clusterService);
ModelIndex.initialize(client, clusterService, environment.settings());
KNNCircuitBreaker.getInstance().initialize(threadPool, clusterService, client);
knnStats = new KNNStats(KNNStatsConfig.KNN_STATS);
return ImmutableList.of(knnStats);
Expand Down
10 changes: 10 additions & 0 deletions src/main/resources/mappings/model-index.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"properties": {
"engine": {
"type": "keyword"
},
"model_blob": {
"type": "binary"
}
}
}
Loading

0 comments on commit 26e90eb

Please sign in to comment.