diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/CustomElandModel.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/CustomElandModel.java index b710b24cbda31..b76de5eeedbfc 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/CustomElandModel.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/CustomElandModel.java @@ -7,14 +7,9 @@ package org.elasticsearch.xpack.inference.services.elasticsearch; -import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.inference.ChunkingSettings; -import org.elasticsearch.inference.Model; import org.elasticsearch.inference.TaskSettings; import org.elasticsearch.inference.TaskType; -import org.elasticsearch.xpack.core.ml.action.CreateTrainedModelAssignmentAction; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; public class CustomElandModel extends ElasticsearchInternalModel { @@ -39,31 +34,10 @@ public CustomElandModel( } @Override - public ActionListener getCreateTrainedModelAssignmentActionListener( - Model model, - ActionListener listener - ) { - - return new ActionListener<>() { - @Override - public void onResponse(CreateTrainedModelAssignmentAction.Response response) { - listener.onResponse(Boolean.TRUE); - } - - @Override - public void onFailure(Exception e) { - if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) { - listener.onFailure( - new ResourceNotFoundException( - "Could not start the inference as the custom eland model [{0}] for this platform cannot be found." - + " Custom models need to be loaded into the cluster with eland before they can be started.", - internalServiceSettings.modelId() - ) - ); - return; - } - listener.onFailure(e); - } - }; + protected String modelNotFoundErrorMessage(String modelId) { + return "Could not deploy model [" + + modelId + + "] as the model cannot be found." + + " Custom models need to be loaded into the cluster with Eland before they can be started."; } } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/ElasticDeployedModel.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/ElasticDeployedModel.java index 724c7a8f0a166..ce6c6258d0393 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/ElasticDeployedModel.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/ElasticDeployedModel.java @@ -36,6 +36,11 @@ public StartTrainedModelDeploymentAction.Request getStartTrainedModelDeploymentA throw new IllegalStateException("cannot start model that uses an existing deployment"); } + @Override + protected String modelNotFoundErrorMessage(String modelId) { + throw new IllegalStateException("cannot start model [" + modelId + "] that uses an existing deployment"); + } + @Override public ActionListener getCreateTrainedModelAssignmentActionListener( Model model, diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/ElasticsearchInternalModel.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/ElasticsearchInternalModel.java index 2405243f302bc..aa12bf0c645c3 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/ElasticsearchInternalModel.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/ElasticsearchInternalModel.java @@ -7,6 +7,9 @@ package org.elasticsearch.xpack.inference.services.elasticsearch; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.Strings; import org.elasticsearch.core.TimeValue; @@ -15,8 +18,10 @@ import org.elasticsearch.inference.ModelConfigurations; import org.elasticsearch.inference.TaskSettings; import org.elasticsearch.inference.TaskType; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.core.ml.action.CreateTrainedModelAssignmentAction; import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import static org.elasticsearch.xpack.core.ml.inference.assignment.AllocationStatus.State.STARTED; @@ -79,10 +84,38 @@ public StartTrainedModelDeploymentAction.Request getStartTrainedModelDeploymentA return startRequest; } - public abstract ActionListener getCreateTrainedModelAssignmentActionListener( + public ActionListener getCreateTrainedModelAssignmentActionListener( Model model, ActionListener listener - ); + ) { + return new ActionListener<>() { + @Override + public void onResponse(CreateTrainedModelAssignmentAction.Response response) { + listener.onResponse(Boolean.TRUE); + } + + @Override + public void onFailure(Exception e) { + var cause = ExceptionsHelper.unwrapCause(e); + if (cause instanceof ResourceNotFoundException) { + listener.onFailure(new ResourceNotFoundException(modelNotFoundErrorMessage(internalServiceSettings.modelId()))); + return; + } else if (cause instanceof ElasticsearchStatusException statusException) { + if (statusException.status() == RestStatus.CONFLICT + && statusException.getRootCause() instanceof ResourceAlreadyExistsException) { + // Deployment is already started + listener.onResponse(Boolean.TRUE); + } + return; + } + listener.onFailure(e); + } + }; + } + + protected String modelNotFoundErrorMessage(String modelId) { + return "Could not deploy model [" + modelId + "] as the model cannot be found."; + } public boolean usesExistingDeployment() { return internalServiceSettings.getDeploymentId() != null; diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/ElserInternalModel.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/ElserInternalModel.java index 8d2f59171a601..2594f18db3fb5 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/ElserInternalModel.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/ElserInternalModel.java @@ -7,13 +7,8 @@ package org.elasticsearch.xpack.inference.services.elasticsearch; -import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.inference.ChunkingSettings; -import org.elasticsearch.inference.Model; import org.elasticsearch.inference.TaskType; -import org.elasticsearch.xpack.core.ml.action.CreateTrainedModelAssignmentAction; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; public class ElserInternalModel extends ElasticsearchInternalModel { @@ -37,31 +32,4 @@ public ElserInternalServiceSettings getServiceSettings() { public ElserMlNodeTaskSettings getTaskSettings() { return (ElserMlNodeTaskSettings) super.getTaskSettings(); } - - @Override - public ActionListener getCreateTrainedModelAssignmentActionListener( - Model model, - ActionListener listener - ) { - return new ActionListener<>() { - @Override - public void onResponse(CreateTrainedModelAssignmentAction.Response response) { - listener.onResponse(Boolean.TRUE); - } - - @Override - public void onFailure(Exception e) { - if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) { - listener.onFailure( - new ResourceNotFoundException( - "Could not start the ELSER service as the ELSER model for this platform cannot be found." - + " ELSER needs to be downloaded before it can be started." - ) - ); - return; - } - listener.onFailure(e); - } - }; - } } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/MultilingualE5SmallModel.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/MultilingualE5SmallModel.java index fee00d04d940b..2dcf91140c995 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/MultilingualE5SmallModel.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/MultilingualE5SmallModel.java @@ -7,13 +7,8 @@ package org.elasticsearch.xpack.inference.services.elasticsearch; -import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.inference.ChunkingSettings; -import org.elasticsearch.inference.Model; import org.elasticsearch.inference.TaskType; -import org.elasticsearch.xpack.core.ml.action.CreateTrainedModelAssignmentAction; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; public class MultilingualE5SmallModel extends ElasticsearchInternalModel { @@ -31,34 +26,4 @@ public MultilingualE5SmallModel( public MultilingualE5SmallInternalServiceSettings getServiceSettings() { return (MultilingualE5SmallInternalServiceSettings) super.getServiceSettings(); } - - @Override - public ActionListener getCreateTrainedModelAssignmentActionListener( - Model model, - ActionListener listener - ) { - - return new ActionListener<>() { - @Override - public void onResponse(CreateTrainedModelAssignmentAction.Response response) { - listener.onResponse(Boolean.TRUE); - } - - @Override - public void onFailure(Exception e) { - if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) { - listener.onFailure( - new ResourceNotFoundException( - "Could not start the TextEmbeddingService service as the " - + "Multilingual-E5-Small model for this platform cannot be found." - + " Multilingual-E5-Small needs to be downloaded before it can be started" - ) - ); - return; - } - listener.onFailure(e); - } - }; - } - } diff --git a/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/TransportLoadTrainedModelPackage.java b/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/TransportLoadTrainedModelPackage.java index b286ca95c74c0..b22379ceb3445 100644 --- a/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/TransportLoadTrainedModelPackage.java +++ b/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/TransportLoadTrainedModelPackage.java @@ -58,7 +58,7 @@ public class TransportLoadTrainedModelPackage extends TransportMasterNodeAction< private final Client client; private final CircuitBreakerService circuitBreakerService; - final Map> downloadTrackersByModelId; + final Map> taskRemovedListenersByModelId; @Inject public TransportLoadTrainedModelPackage( @@ -83,7 +83,7 @@ public TransportLoadTrainedModelPackage( ); this.client = new OriginSettingClient(client, ML_ORIGIN); this.circuitBreakerService = circuitBreakerService; - downloadTrackersByModelId = new HashMap<>(); + taskRemovedListenersByModelId = new HashMap<>(); } @Override @@ -174,7 +174,7 @@ synchronized boolean existingDownloadInProgress( // Otherwise register a task removed listener which is called // once the tasks is complete and unregistered var tracker = new DownloadTaskRemovedListener(inProgress, listener); - downloadTrackersByModelId.computeIfAbsent(modelId, s -> new ArrayList<>()).add(tracker); + taskRemovedListenersByModelId.computeIfAbsent(modelId, s -> new ArrayList<>()).add(tracker); taskManager.registerRemovedTaskListener(tracker); return true; } @@ -191,7 +191,7 @@ synchronized boolean existingDownloadInProgress( synchronized void unregisterTask(ModelDownloadTask task) { taskManager.unregister(task); // unregister will call the on remove function - var trackers = downloadTrackersByModelId.remove(task.getModelId()); + var trackers = taskRemovedListenersByModelId.remove(task.getModelId()); if (trackers != null) { for (var tracker : trackers) { taskManager.unregisterRemovedTaskListener(tracker); diff --git a/x-pack/plugin/ml-package-loader/src/test/java/org/elasticsearch/xpack/ml/packageloader/action/TransportLoadTrainedModelPackageTests.java b/x-pack/plugin/ml-package-loader/src/test/java/org/elasticsearch/xpack/ml/packageloader/action/TransportLoadTrainedModelPackageTests.java index ae7217d34b694..0fbf1afd54e69 100644 --- a/x-pack/plugin/ml-package-loader/src/test/java/org/elasticsearch/xpack/ml/packageloader/action/TransportLoadTrainedModelPackageTests.java +++ b/x-pack/plugin/ml-package-loader/src/test/java/org/elasticsearch/xpack/ml/packageloader/action/TransportLoadTrainedModelPackageTests.java @@ -157,14 +157,14 @@ public void testWaitForExistingDownload() { assertTrue(action.existingDownloadInProgress(modelId, true, ActionListener.noop())); verify(taskManager).registerRemovedTaskListener(any()); - assertThat(action.downloadTrackersByModelId.entrySet(), hasSize(1)); - assertThat(action.downloadTrackersByModelId.get(modelId), hasSize(1)); + assertThat(action.taskRemovedListenersByModelId.entrySet(), hasSize(1)); + assertThat(action.taskRemovedListenersByModelId.get(modelId), hasSize(1)); // With wait for completion == false no new removed listener will be added assertTrue(action.existingDownloadInProgress(modelId, false, ActionListener.noop())); verify(taskManager, times(1)).registerRemovedTaskListener(any()); - assertThat(action.downloadTrackersByModelId.entrySet(), hasSize(1)); - assertThat(action.downloadTrackersByModelId.get(modelId), hasSize(1)); + assertThat(action.taskRemovedListenersByModelId.entrySet(), hasSize(1)); + assertThat(action.taskRemovedListenersByModelId.get(modelId), hasSize(1)); assertFalse(action.existingDownloadInProgress("no-task-for-this-one", randomBoolean(), ActionListener.noop())); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartTrainedModelDeploymentAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartTrainedModelDeploymentAction.java index 5fd70ce71cd24..f01372ca4f246 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartTrainedModelDeploymentAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartTrainedModelDeploymentAction.java @@ -190,11 +190,11 @@ protected void masterOperation( () -> "[" + request.getDeploymentId() + "] creating new assignment for model [" + request.getModelId() + "] failed", e ); - if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) { + if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException resourceAlreadyExistsException) { e = new ElasticsearchStatusException( "Cannot start deployment [{}] because it has already been started", RestStatus.CONFLICT, - e, + resourceAlreadyExistsException, request.getDeploymentId() ); }