From 2c9084bc5b5c1c72dc28775d53755b4b591f5ce3 Mon Sep 17 00:00:00 2001 From: Varun Jain Date: Fri, 6 Jan 2023 13:00:58 -0800 Subject: [PATCH] Communication mechanism for js (#289) * Job Details from Extension for JS Signed-off-by: Varun Jain * Communication Mechanism for JS Signed-off-by: Varun Jain * Communication mechanism for JS Signed-off-by: Varun Jain * Communication mechanism for JS Signed-off-by: Varun Jain * Communication mechanism for JS Signed-off-by: Varun Jain * Communication mechanism for JS Signed-off-by: Varun Jain * Commnunication Mechanism for JS Signed-off-by: Varun Jain * Commnunication Mechanism for JS Signed-off-by: Varun Jain * Commnunication Mechanism for JS Signed-off-by: Varun Jain * Commnunication Mechanism for JS Signed-off-by: Varun Jain * Commnunication Mechanism for JS Signed-off-by: Varun Jain * Commnunication Mechanism for JS Signed-off-by: Varun Jain * Communication Mechanism for JS Signed-off-by: Varun Jain * Communication Mechanism for JS Signed-off-by: Varun Jain * Communication Mechanism for JS Signed-off-by: Varun Jain * Communication Mechanism for JS Signed-off-by: Varun Jain * Communication Mechanism for JS Signed-off-by: Varun Jain * Communication Mechanism for JS Signed-off-by: Varun Jain * Communication Mechanism for JS Signed-off-by: Varun Jain * Communication Mechanism for JS Signed-off-by: Varun Jain * Communication Mechanism for JS Signed-off-by: Varun Jain * Communication Mechanism for JS Signed-off-by: Varun Jain * Communication Mechanism for JS Signed-off-by: Varun Jain * Communication Mechanism for JS Signed-off-by: Varun Jain * Communication Mechanism for JS Signed-off-by: Varun Jain * Communication Mechanism for JS Signed-off-by: Varun Jain Signed-off-by: Varun Jain --- build.gradle | 5 +- .../jobscheduler/JobSchedulerPlugin.java | 46 ++- .../jobscheduler/model/JobDetails.java | 185 +++++++++++ .../rest/RestGetJobIndexAction.java | 142 ++++++++ .../rest/RestGetJobTypeAction.java | 137 ++++++++ .../transport/GetJobIndexRequest.java | 131 ++++++++ .../transport/GetJobTypeRequest.java | 94 ++++++ .../jobscheduler/utils/JobDetailsService.java | 309 ++++++++++++++++++ .../opensearch_plugins_job_details.json | 20 ++ .../jobscheduler/ODFERestTestCase.java | 182 +++++++++++ .../opensearch/jobscheduler/TestHelpers.java | 89 +++++ .../GetJobDetailsMultiNodeRestIT.java | 64 ++++ .../rest/RestGetJobIndexActionTests.java | 87 +++++ .../rest/RestGetJobTypeActionTests.java | 86 +++++ .../utils/JobDetailsServiceIT.java | 140 ++++++++ 15 files changed, 1709 insertions(+), 8 deletions(-) create mode 100644 src/main/java/org/opensearch/jobscheduler/model/JobDetails.java create mode 100644 src/main/java/org/opensearch/jobscheduler/rest/RestGetJobIndexAction.java create mode 100644 src/main/java/org/opensearch/jobscheduler/rest/RestGetJobTypeAction.java create mode 100644 src/main/java/org/opensearch/jobscheduler/transport/GetJobIndexRequest.java create mode 100644 src/main/java/org/opensearch/jobscheduler/transport/GetJobTypeRequest.java create mode 100644 src/main/java/org/opensearch/jobscheduler/utils/JobDetailsService.java create mode 100644 src/main/resources/mappings/opensearch_plugins_job_details.json create mode 100644 src/test/java/org/opensearch/jobscheduler/ODFERestTestCase.java create mode 100644 src/test/java/org/opensearch/jobscheduler/TestHelpers.java create mode 100644 src/test/java/org/opensearch/jobscheduler/multinode/GetJobDetailsMultiNodeRestIT.java create mode 100644 src/test/java/org/opensearch/jobscheduler/rest/RestGetJobIndexActionTests.java create mode 100644 src/test/java/org/opensearch/jobscheduler/rest/RestGetJobTypeActionTests.java create mode 100644 src/test/java/org/opensearch/jobscheduler/utils/JobDetailsServiceIT.java diff --git a/build.gradle b/build.gradle index aa6c88f1..3bdedaec 100644 --- a/build.gradle +++ b/build.gradle @@ -138,6 +138,8 @@ repositories { dependencies { implementation project(path: ":${rootProject.name}-spi", configuration: 'shadow') + implementation group: 'com.google.guava', name: 'guava', version:'31.0.1-jre' + implementation group: 'com.google.guava', name: 'failureaccess', version:'1.0.1' javaRestTestImplementation project.sourceSets.main.runtimeClasspath } @@ -238,7 +240,8 @@ task integTestRemote(type: RestIntegTestTask) { systemProperty "security", System.getProperty("security") systemProperty "user", System.getProperty("user") systemProperty "password", System.getProperty("password") - + systemProperty 'tests.rest.cluster', 'localhost:9200' + systemProperty 'tests.clustername', 'opensearch-job-scheduler-cluster' if (System.getProperty("tests.rest.cluster") != null) { filter { includeTestsMatching "org.opensearch.jobscheduler.*RestIT" diff --git a/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java b/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java index 126d08c0..822eb0d8 100644 --- a/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java +++ b/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java @@ -8,6 +8,15 @@ */ package org.opensearch.jobscheduler; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.SettingsFilter; + +import org.opensearch.jobscheduler.rest.RestGetJobIndexAction; +import org.opensearch.jobscheduler.rest.RestGetJobTypeAction; import org.opensearch.jobscheduler.scheduler.JobScheduler; import org.opensearch.jobscheduler.spi.JobSchedulerExtension; import org.opensearch.jobscheduler.spi.ScheduledJobParser; @@ -23,35 +32,39 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.ParseField; import org.opensearch.common.io.stream.NamedWriteableRegistry; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexModule; +import org.opensearch.jobscheduler.utils.JobDetailsService; +import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.ExtensiblePlugin; import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.rest.RestController; import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.ArrayList; import java.util.function.Supplier; -public class JobSchedulerPlugin extends Plugin implements ExtensiblePlugin { +import com.google.common.collect.ImmutableList; + +public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin { public static final String OPEN_DISTRO_JOB_SCHEDULER_THREAD_POOL_NAME = "open_distro_job_scheduler"; + public static final String JS_BASE_URI = "/_plugins/_job_scheduler"; private static final Logger log = LogManager.getLogger(JobSchedulerPlugin.class); @@ -61,6 +74,8 @@ public class JobSchedulerPlugin extends Plugin implements ExtensiblePlugin { private Map indexToJobProviders; private Set indicesToListen; + private JobDetailsService jobDetailsService; + public JobSchedulerPlugin() { this.indicesToListen = new HashSet<>(); this.indexToJobProviders = new HashMap<>(); @@ -81,6 +96,7 @@ public Collection createComponents( Supplier repositoriesServiceSupplier ) { this.lockService = new LockService(client, clusterService); + this.jobDetailsService = new JobDetailsService(client, clusterService); this.scheduler = new JobScheduler(threadPool, this.lockService); this.sweeper = initSweeper( environment.settings(), @@ -185,4 +201,20 @@ private JobSweeper initSweeper( ) { return new JobSweeper(settings, client, clusterService, threadPool, registry, this.indexToJobProviders, scheduler, lockService); } + + @Override + public List getRestHandlers( + Settings settings, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ) { + RestGetJobIndexAction restGetJobIndexAction = new RestGetJobIndexAction(jobDetailsService); + RestGetJobTypeAction restGetJobTypeAction = new RestGetJobTypeAction(jobDetailsService); + return ImmutableList.of(restGetJobIndexAction, restGetJobTypeAction); + } + } diff --git a/src/main/java/org/opensearch/jobscheduler/model/JobDetails.java b/src/main/java/org/opensearch/jobscheduler/model/JobDetails.java new file mode 100644 index 00000000..aba35bc6 --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/model/JobDetails.java @@ -0,0 +1,185 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.model; + +import org.opensearch.common.Nullable; +import org.opensearch.common.xcontent.ToXContent; +import org.opensearch.common.xcontent.ToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + +/** + * This model class stores the job details of the extension. + */ +public class JobDetails implements ToXContentObject { + + /** + * jobIndex from the extension. + */ + private String jobIndex; + + /** + * jobType from the extension. + */ + private String jobType; + + /** + * jobParser action to trigger the response back to the extension. + */ + private String jobParameterAction; + + /** + * jobRunner action to trigger the response back to the extension. + */ + private String jobRunnerAction; + + public static final String JOB_INDEX = "job_index"; + public static final String JOB_TYPE = "job_type"; + public static final String JOB_PARAMETER_ACTION = "job_parser_action"; + public static final String JOB_RUNNER_ACTION = "job_runner_action"; + + public JobDetails() {} + + public JobDetails(String jobIndex, String jobType, String jobParameterAction, String jobRunnerAction) { + this.jobIndex = jobIndex; + this.jobType = jobType; + this.jobParameterAction = jobParameterAction; + this.jobRunnerAction = jobRunnerAction; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + XContentBuilder xContentBuilder = builder.startObject(); + if (jobIndex != null) { + xContentBuilder.field(JOB_INDEX, jobIndex); + } + if (jobType != null) { + xContentBuilder.field(JOB_TYPE, jobType); + } + if (jobParameterAction != null) { + xContentBuilder.field(JOB_PARAMETER_ACTION, jobParameterAction); + } + if (jobRunnerAction != null) { + xContentBuilder.field(JOB_RUNNER_ACTION, jobRunnerAction); + } + return xContentBuilder.endObject(); + } + + public static JobDetails parse(XContentParser parser) throws IOException { + String jobIndex = null; + String jobType = null; + String jobParameterAction = null; + String jobRunnerAction = null; + + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case JOB_INDEX: + jobIndex = parser.text(); + break; + case JOB_TYPE: + jobType = parser.text(); + break; + case JOB_PARAMETER_ACTION: + jobParameterAction = parser.text(); + break; + case JOB_RUNNER_ACTION: + jobRunnerAction = parser.text(); + break; + default: + parser.skipChildren(); + break; + } + } + + return new JobDetails(jobIndex, jobType, jobParameterAction, jobRunnerAction); + } + + public JobDetails(final JobDetails copyJobDetails) { + this(copyJobDetails.jobIndex, copyJobDetails.jobType, copyJobDetails.jobParameterAction, copyJobDetails.jobRunnerAction); + } + + @Nullable + public String getJobIndex() { + return jobIndex; + } + + public void setJobIndex(String jobIndex) { + this.jobIndex = jobIndex; + } + + @Nullable + public String getJobType() { + return jobType; + } + + public void setJobType(String jobType) { + this.jobType = jobType; + } + + @Nullable + public String getJobParameterAction() { + return jobParameterAction; + } + + public void setJobParameterAction(String jobParameterAction) { + this.jobParameterAction = jobParameterAction; + } + + @Nullable + public String getJobRunnerAction() { + return jobRunnerAction; + } + + public void setJobRunnerAction(String jobRunnerAction) { + this.jobRunnerAction = jobRunnerAction; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + JobDetails that = (JobDetails) o; + return Objects.equals(jobIndex, that.jobIndex) + && Objects.equals(jobType, that.jobType) + && Objects.equals(jobParameterAction, that.jobParameterAction) + && Objects.equals(jobRunnerAction, that.jobRunnerAction); + } + + @Override + public int hashCode() { + return Objects.hash(jobIndex, jobType, jobParameterAction, jobRunnerAction); + } + + @Override + public String toString() { + return "JobDetails{" + + "jobIndex='" + + jobIndex + + '\'' + + ", jobType='" + + jobType + + '\'' + + ", jobParameterAction='" + + jobParameterAction + + '\'' + + ", jobRunnerAction='" + + jobRunnerAction + + '\'' + + '}'; + } +} diff --git a/src/main/java/org/opensearch/jobscheduler/rest/RestGetJobIndexAction.java b/src/main/java/org/opensearch/jobscheduler/rest/RestGetJobIndexAction.java new file mode 100644 index 00000000..25cc778a --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/rest/RestGetJobIndexAction.java @@ -0,0 +1,142 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.rest; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.jobscheduler.JobSchedulerPlugin; +import org.opensearch.jobscheduler.model.JobDetails; + +import org.opensearch.jobscheduler.transport.GetJobIndexRequest; + +import org.opensearch.jobscheduler.utils.JobDetailsService; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestStatus; +import org.opensearch.rest.BytesRestResponse; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.rest.RestRequest.Method.PUT; + +/** + * This class consists of the REST handler to GET job index from extensions. + */ +public class RestGetJobIndexAction extends BaseRestHandler { + + public static final String GET_JOB_INDEX_ACTION = "get_job_index_action"; + + private final Logger logger = LogManager.getLogger(RestGetJobIndexAction.class); + + public JobDetailsService jobDetailsService; + + public RestGetJobIndexAction(final JobDetailsService jobDetailsService) { + this.jobDetailsService = jobDetailsService; + } + + @Override + public String getName() { + return GET_JOB_INDEX_ACTION; + } + + @Override + public List routes() { + return unmodifiableList( + asList(new Route(PUT, String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_get/_job_index"))) + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + XContentParser parser = restRequest.contentParser(); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + + GetJobIndexRequest getJobIndexRequest = GetJobIndexRequest.parse(parser); + + final JobDetails[] jobDetailsResponseHolder = new JobDetails[1]; + + String jobIndex = getJobIndexRequest.getJobIndex(); + String jobParameterAction = getJobIndexRequest.getJobParameterAction(); + String jobRunnerAction = getJobIndexRequest.getJobRunnerAction(); + String extensionId = getJobIndexRequest.getExtensionId(); + + CompletableFuture inProgressFuture = new CompletableFuture<>(); + + jobDetailsService.processJobDetailsForExtensionId( + jobIndex, + null, + jobParameterAction, + jobRunnerAction, + extensionId, + JobDetailsService.JobDetailsRequestType.JOB_INDEX, + new ActionListener<>() { + @Override + public void onResponse(JobDetails jobDetails) { + jobDetailsResponseHolder[0] = jobDetails; + inProgressFuture.complete(jobDetailsResponseHolder); + } + + @Override + public void onFailure(Exception e) { + logger.info("could not process job index", e); + inProgressFuture.completeExceptionally(e); + } + } + ); + + try { + inProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS).join(); + } catch (CompletionException e) { + if (e.getCause() instanceof TimeoutException) { + logger.info(" Request timed out with an exception ", e); + } else { + throw e; + } + } catch (Exception e) { + logger.info(" Could not process job index due to exception ", e); + } + + return channel -> { + XContentBuilder builder = channel.newBuilder(); + RestStatus restStatus = RestStatus.OK; + String restResponseString = jobDetailsResponseHolder[0] != null ? "success" : "failed"; + BytesRestResponse bytesRestResponse; + try { + builder.startObject(); + builder.field("response", restResponseString); + if (restResponseString.equals("success")) { + builder.field("jobDetails", jobDetailsResponseHolder[0]); + } else { + restStatus = RestStatus.INTERNAL_SERVER_ERROR; + } + builder.endObject(); + bytesRestResponse = new BytesRestResponse(restStatus, builder); + } finally { + builder.close(); + } + + channel.sendResponse(bytesRestResponse); + }; + } + +} diff --git a/src/main/java/org/opensearch/jobscheduler/rest/RestGetJobTypeAction.java b/src/main/java/org/opensearch/jobscheduler/rest/RestGetJobTypeAction.java new file mode 100644 index 00000000..1e676ef4 --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/rest/RestGetJobTypeAction.java @@ -0,0 +1,137 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.rest; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.jobscheduler.JobSchedulerPlugin; +import org.opensearch.jobscheduler.model.JobDetails; +import org.opensearch.jobscheduler.transport.GetJobTypeRequest; + +import org.opensearch.jobscheduler.utils.JobDetailsService; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestStatus; +import org.opensearch.rest.BytesRestResponse; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.rest.RestRequest.Method.PUT; + +/** + * This class consists of the REST handler to GET job type from extensions. + */ +public class RestGetJobTypeAction extends BaseRestHandler { + + public static final String GET_JOB_TYPE_ACTION = "get_job_type_action"; + + private final Logger logger = LogManager.getLogger(RestGetJobTypeAction.class); + + public JobDetailsService jobDetailsService; + + @Override + public String getName() { + return GET_JOB_TYPE_ACTION; + } + + public RestGetJobTypeAction(final JobDetailsService jobDetailsService) { + this.jobDetailsService = jobDetailsService; + } + + @Override + public List routes() { + return unmodifiableList( + asList(new Route(PUT, String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_get/_job_type"))) + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + XContentParser parser = restRequest.contentParser(); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + GetJobTypeRequest getJobTypeRequest = GetJobTypeRequest.parse(parser); + + final JobDetails[] jobDetailsResponseHolder = new JobDetails[1]; + + String jobType = getJobTypeRequest.getJobType(); + String extensionId = getJobTypeRequest.getExtensionId(); + + CompletableFuture inProgressFuture = new CompletableFuture<>(); + + jobDetailsService.processJobDetailsForExtensionId( + null, + jobType, + null, + null, + extensionId, + JobDetailsService.JobDetailsRequestType.JOB_TYPE, + new ActionListener<>() { + @Override + public void onResponse(JobDetails jobDetails) { + jobDetailsResponseHolder[0] = jobDetails; + inProgressFuture.complete(jobDetailsResponseHolder); + } + + @Override + public void onFailure(Exception e) { + logger.info("could not process job type", e); + inProgressFuture.complete(jobDetailsResponseHolder); + } + } + ); + + try { + inProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS).join(); + } catch (CompletionException e) { + if (e.getCause() instanceof TimeoutException) { + logger.info(" Request timed out with an exception ", e); + } else { + throw e; + } + } catch (Exception e) { + logger.info(" Could not process job type due to exception ", e); + } + + return channel -> { + XContentBuilder builder = channel.newBuilder(); + RestStatus restStatus = RestStatus.OK; + String restResponseString = jobDetailsResponseHolder[0] != null ? "success" : "failed"; + BytesRestResponse bytesRestResponse; + try { + builder.startObject(); + builder.field("response", restResponseString); + if (restResponseString.equals("success")) { + builder.field("jobDetails", jobDetailsResponseHolder[0]); + } else { + restStatus = RestStatus.INTERNAL_SERVER_ERROR; + } + builder.endObject(); + bytesRestResponse = new BytesRestResponse(restStatus, builder); + } finally { + builder.close(); + } + + channel.sendResponse(bytesRestResponse); + }; + } + +} diff --git a/src/main/java/org/opensearch/jobscheduler/transport/GetJobIndexRequest.java b/src/main/java/org/opensearch/jobscheduler/transport/GetJobIndexRequest.java new file mode 100644 index 00000000..a7d0ce54 --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/transport/GetJobIndexRequest.java @@ -0,0 +1,131 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.transport; + +import java.util.Objects; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentParserUtils; + +import java.io.IOException; + +/** + * Get Job Index Request Model class + */ +public class GetJobIndexRequest extends ActionRequest { + + private static String jobIndex; + + private static String jobParameterAction; + + private static String jobRunnerAction; + + private static String extensionId; + + public static final String JOB_INDEX = "job_index"; + + public static final String EXTENSION_ID = "extension_id"; + private static final String JOB_PARAMETER_ACTION = "job_parameter_action"; + public static final String JOB_RUNNER_ACTION = "job_runner_action"; + + public GetJobIndexRequest(StreamInput in) throws IOException { + super(in); + jobIndex = in.readString(); + jobParameterAction = in.readString(); + jobRunnerAction = in.readString(); + extensionId = in.readString(); + + } + + public GetJobIndexRequest(String jobIndex, String jobParameterAction, String jobRunnerAction, String extensionId) { + super(); + this.jobIndex = Objects.requireNonNull(jobIndex); + this.jobParameterAction = Objects.requireNonNull(jobParameterAction); + this.jobRunnerAction = Objects.requireNonNull(jobRunnerAction); + this.extensionId = Objects.requireNonNull(extensionId); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(jobIndex); + out.writeString(jobParameterAction); + out.writeString(jobRunnerAction); + out.writeString(extensionId); + } + + public String getJobIndex() { + return jobIndex; + } + + public void setJobIndex(String jobIndex) { + this.jobIndex = jobIndex; + } + + public static String getJobParameterAction() { + return jobParameterAction; + } + + public static void setJobParameterAction(String jobParameterAction) { + GetJobIndexRequest.jobParameterAction = jobParameterAction; + } + + public String getJobRunnerAction() { + return jobRunnerAction; + } + + public void setJobRunnerAction(String jobRunnerAction) { + this.jobRunnerAction = jobRunnerAction; + } + + public String getExtensionId() { + return extensionId; + } + + public void setExtensionId(String extensionId) { + this.extensionId = extensionId; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public static GetJobIndexRequest parse(XContentParser parser) throws IOException { + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + + switch (fieldName) { + case JOB_INDEX: + jobIndex = parser.text(); + break; + case JOB_PARAMETER_ACTION: + jobParameterAction = parser.text(); + break; + case JOB_RUNNER_ACTION: + jobRunnerAction = parser.text(); + break; + case EXTENSION_ID: + extensionId = parser.text(); + break; + default: + parser.skipChildren(); + break; + } + + } + return new GetJobIndexRequest(jobIndex, jobParameterAction, jobRunnerAction, extensionId); + } +} diff --git a/src/main/java/org/opensearch/jobscheduler/transport/GetJobTypeRequest.java b/src/main/java/org/opensearch/jobscheduler/transport/GetJobTypeRequest.java new file mode 100644 index 00000000..01a52298 --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/transport/GetJobTypeRequest.java @@ -0,0 +1,94 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.transport; + +import java.util.Objects; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.XContentParser; + +import java.io.IOException; + +/** + * Get Job Type Request Model class + */ +public class GetJobTypeRequest extends ActionRequest { + + private static String jobType; + + private static String extensionId; + + public static final String JOB_TYPE = "job_type"; + + public static final String EXTENSION_ID = "extension_id"; + + public GetJobTypeRequest(String jobType, String extensionId) { + super(); + this.jobType = Objects.requireNonNull(jobType); + this.extensionId = Objects.requireNonNull(extensionId); + } + + public GetJobTypeRequest(StreamInput in) throws IOException { + super(in); + jobType = in.readString(); + extensionId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(jobType); + out.writeString(extensionId); + } + + public String getJobType() { + return jobType; + } + + public void setJobType(String jobType) { + this.jobType = jobType; + } + + public String getExtensionId() { + return extensionId; + } + + public void setExtensionId(String extensionId) { + this.extensionId = extensionId; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public static GetJobTypeRequest parse(XContentParser parser) throws IOException { + + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + + switch (fieldName) { + case JOB_TYPE: + jobType = parser.text(); + break; + case EXTENSION_ID: + extensionId = parser.text(); + break; + default: + parser.skipChildren(); + break; + } + + } + return new GetJobTypeRequest(jobType, extensionId); + } +} diff --git a/src/main/java/org/opensearch/jobscheduler/utils/JobDetailsService.java b/src/main/java/org/opensearch/jobscheduler/utils/JobDetailsService.java new file mode 100644 index 00000000..29dc41b4 --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/utils/JobDetailsService.java @@ -0,0 +1,309 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.utils; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.xcontent.ToXContent; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.engine.DocumentMissingException; +import org.opensearch.index.engine.VersionConflictEngineException; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.jobscheduler.model.JobDetails; + +import java.nio.charset.StandardCharsets; + +public class JobDetailsService { + + private static final Logger logger = LogManager.getLogger(JobDetailsService.class); + private static final String JOB_DETAILS_INDEX_NAME = ".opensearch-plugins-job-details"; + private static final String PLUGINS_JOB_DETAILS_MAPPING_FILE = "/mappings/opensearch_plugins_job_details.json"; + + public static Long TIME_OUT_FOR_REQUEST = 10L; + private final Client client; + private final ClusterService clusterService; + + public JobDetailsService(final Client client, final ClusterService clusterService) { + this.client = client; + this.clusterService = clusterService; + } + + public boolean jobDetailsIndexExist() { + return clusterService.state().routingTable().hasIndex(JOB_DETAILS_INDEX_NAME); + } + + /** + * + * @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the job details index if it was created + * or else null. + */ + @VisibleForTesting + void createJobDetailsIndex(ActionListener listener) { + if (jobDetailsIndexExist()) { + listener.onResponse(true); + } else { + CreateIndexRequest request = new CreateIndexRequest(JOB_DETAILS_INDEX_NAME).mapping(jobDetailsMapping()); + client.admin() + .indices() + .create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> { + if (exception instanceof ResourceAlreadyExistsException + || exception.getCause() instanceof ResourceAlreadyExistsException) { + listener.onResponse(true); + } else { + listener.onFailure(exception); + } + })); + } + } + + /** + * Attempts to process job details with a specific extension Id. If the job details does not exist it attempts to create the job details document. + * If the job details document exists, it will try to update the job details. + * + * @param jobIndexName a non-null job index name. + * @param jobTypeName a non-null job type name. + * @param jobParameterActionName a non-null job parameter action name. + * @param jobRunnerActionName a non-null job runner action name. + * @param extensionId the unique Id for the job details. + * @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the job details if it was processed + * or else null. + */ + public void processJobDetailsForExtensionId( + final String jobIndexName, + final String jobTypeName, + final String jobParameterActionName, + final String jobRunnerActionName, + final String extensionId, + final JobDetailsRequestType requestType, + ActionListener listener + ) { + boolean isJobIndexRequest; + if (requestType.JOB_INDEX == requestType) { + isJobIndexRequest = true; + if (jobIndexName == null + || jobIndexName.isEmpty() + || jobParameterActionName == null + || jobParameterActionName.isEmpty() + || jobRunnerActionName == null + || jobRunnerActionName.isEmpty()) { + listener.onFailure( + new IllegalArgumentException("JobIndexName, JobParameterActionName, JobRunnerActionName must not be null or empty") + ); + } + } else { + isJobIndexRequest = false; + if (jobTypeName == null || jobTypeName.isEmpty()) { + listener.onFailure(new IllegalArgumentException("Job Type Name must not be null or empty")); + } + } + if (extensionId == null || extensionId.isEmpty()) { + listener.onFailure(new IllegalArgumentException("Extension Id must not be null or empty")); + } else { + createJobDetailsIndex(ActionListener.wrap(created -> { + if (created) { + try { + findJobDetailsForExtensionId(extensionId, ActionListener.wrap(existingJobDetails -> { + if (existingJobDetails != null) { + logger.debug("Updating job details for extension id: " + extensionId + existingJobDetails); + JobDetails updateJobDetails = new JobDetails(existingJobDetails); + if (isJobIndexRequest) { + updateJobDetails.setJobIndex(jobIndexName); + updateJobDetails.setJobParameterAction(jobParameterActionName); + updateJobDetails.setJobRunnerAction(jobRunnerActionName); + } else { + updateJobDetails.setJobType(jobTypeName); + } + updateJobDetailsForExtensionId(updateJobDetails, extensionId, listener); + + } else { + JobDetails tempJobDetails = new JobDetails(); + if (isJobIndexRequest) { + tempJobDetails.setJobIndex(jobIndexName); + tempJobDetails.setJobParameterAction(jobParameterActionName); + tempJobDetails.setJobRunnerAction(jobRunnerActionName); + } else { + tempJobDetails.setJobType(jobTypeName); + } + logger.debug( + "Job Details for extension Id " + + extensionId + + " does not exist. Creating new Job Details" + + tempJobDetails + ); + createJobDetailsForExtensionId(tempJobDetails, extensionId, listener); + } + }, listener::onFailure)); + } catch (VersionConflictEngineException e) { + logger.debug("could not process job index for extensionId " + extensionId, e.getMessage()); + listener.onResponse(null); + } + } else { + listener.onResponse(null); + } + }, listener::onFailure)); + } + } + + /** + * Create Job details entry for extension id + * @param tempJobDetails new job details object that need to be inserted as document in the index + * @param extensionId unique id to create the entry for job details + * @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the job details if it was created + * or else null. + */ + private void createJobDetailsForExtensionId(final JobDetails tempJobDetails, String extensionId, ActionListener listener) { + try { + final IndexRequest request = new IndexRequest(JOB_DETAILS_INDEX_NAME).id(extensionId) + .source(tempJobDetails.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .setIfSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO) + .setIfPrimaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM) + .create(true); + client.index(request, ActionListener.wrap(response -> listener.onResponse(new JobDetails(tempJobDetails)), exception -> { + if (exception instanceof VersionConflictEngineException) { + logger.debug("Job Details for extension id " + extensionId + " is already created. {}", exception.getMessage()); + } + if (exception instanceof IOException) { + logger.error("IOException occurred creating job details", exception); + } + listener.onResponse(null); + })); + } catch (IOException e) { + logger.error("IOException occurred creating job details for extension id " + extensionId, e); + listener.onResponse(null); + } + } + + /** + * Find extension corresponding to an extension id + * @param extensionId unique id to find the job details document in the index + * @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the job details if it was found + * or else null. + */ + private void findJobDetailsForExtensionId(final String extensionId, ActionListener listener) { + GetRequest getRequest = new GetRequest(JOB_DETAILS_INDEX_NAME).id(extensionId); + client.get(getRequest, ActionListener.wrap(response -> { + if (!response.isExists()) { + listener.onResponse(null); + } else { + try { + XContentParser parser = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString()); + parser.nextToken(); + listener.onResponse(JobDetails.parse(parser)); + } catch (IOException e) { + logger.error("IOException occurred finding JobDetails for extension id " + extensionId, e); + listener.onResponse(null); + } + } + }, exception -> { + logger.error("Exception occurred finding job details for extension id " + extensionId, exception); + listener.onFailure(exception); + })); + } + + /** + * Delete job details to a corresponding extension id + * @param extensionId unique id to find and delete the job details document in the index + * @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the job details if it was deleted + * or else null. + */ + public void deleteJobDetailsForExtension(final String extensionId, ActionListener listener) { + DeleteRequest deleteRequest = new DeleteRequest(JOB_DETAILS_INDEX_NAME).id(extensionId); + client.delete(deleteRequest, ActionListener.wrap(response -> { + listener.onResponse( + response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND + ); + }, exception -> { + if (exception instanceof IndexNotFoundException || exception.getCause() instanceof IndexNotFoundException) { + logger.debug("Index is not found to delete job details for extension id. {} " + extensionId, exception.getMessage()); + listener.onResponse(true); + } else { + listener.onFailure(exception); + } + })); + } + + /** + * Update Job details to a corresponding extension Id + * @param updateJobDetails update job details object entry + * @param extensionId unique id to find and update the corresponding document mapped to it + * @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the job details if it was updated + * or else null. + */ + private void updateJobDetailsForExtensionId( + final JobDetails updateJobDetails, + final String extensionId, + ActionListener listener + ) { + try { + UpdateRequest updateRequest = new UpdateRequest().index(JOB_DETAILS_INDEX_NAME) + .id(extensionId) + .doc(updateJobDetails.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .fetchSource(true); + + client.update( + updateRequest, + ActionListener.wrap(response -> listener.onResponse(new JobDetails(updateJobDetails)), exception -> { + if (exception instanceof VersionConflictEngineException) { + logger.debug("could not update job details for extensionId " + extensionId, exception.getMessage()); + } + if (exception instanceof DocumentMissingException) { + logger.debug("Document is deleted. This happens if the job details is already removed {}", exception.getMessage()); + } + if (exception instanceof IOException) { + logger.error("IOException occurred in updating job details.", exception); + } + listener.onResponse(null); + }) + ); + } catch (IOException e) { + logger.error("IOException occurred updating job details for extension id " + extensionId, e); + listener.onResponse(null); + } + } + + private String jobDetailsMapping() { + try { + InputStream in = JobDetailsService.class.getResourceAsStream(PLUGINS_JOB_DETAILS_MAPPING_FILE); + StringBuilder stringBuilder = new StringBuilder(); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); + for (String line; (line = bufferedReader.readLine()) != null;) { + stringBuilder.append(line); + } + return stringBuilder.toString(); + } catch (IOException e) { + throw new IllegalArgumentException("JobDetails Mapping cannot be read correctly."); + } + } + + public enum JobDetailsRequestType { + JOB_INDEX, + JOB_TYPE + } +} diff --git a/src/main/resources/mappings/opensearch_plugins_job_details.json b/src/main/resources/mappings/opensearch_plugins_job_details.json new file mode 100644 index 00000000..e979b29d --- /dev/null +++ b/src/main/resources/mappings/opensearch_plugins_job_details.json @@ -0,0 +1,20 @@ +{ + "dynamic": "false", + "properties": { + "job_index_name": { + "type": "keyword" + }, + "job_type_name": { + "type": "keyword" + }, + "job_parser_action": { + "type": "keyword" + }, + "job_runner_action": { + "type": "keyword" + }, + "extension_id": { + "type": "keyword" + } + } +} diff --git a/src/test/java/org/opensearch/jobscheduler/ODFERestTestCase.java b/src/test/java/org/opensearch/jobscheduler/ODFERestTestCase.java new file mode 100644 index 00000000..304cc28b --- /dev/null +++ b/src/test/java/org/opensearch/jobscheduler/ODFERestTestCase.java @@ -0,0 +1,182 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import javax.net.ssl.SSLEngine; +import org.apache.hc.client5.http.auth.AuthScope; +import org.apache.hc.client5.http.auth.UsernamePasswordCredentials; +import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; +import org.apache.hc.client5.http.ssl.NoopHostnameVerifier; +import org.apache.hc.core5.function.Factory; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.reactor.ssl.TlsDetails; +import org.apache.hc.core5.ssl.SSLContextBuilder; +import org.apache.hc.core5.util.Timeout; +import org.junit.After; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestClientBuilder; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.xcontent.DeprecationHandler; +import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.test.rest.OpenSearchRestTestCase; + +public abstract class ODFERestTestCase extends OpenSearchRestTestCase { + + private static String localhostName = "localhost"; + private static int port = 9200; + + protected boolean isHttps() { + boolean isHttps = Optional.ofNullable(System.getProperty("https")).map("true"::equalsIgnoreCase).orElse(false); + if (isHttps) { + // currently only external cluster is supported for security enabled testing + if (!Optional.ofNullable(System.getProperty("tests.rest.cluster")).isPresent()) { + throw new RuntimeException("cluster url should be provided for security enabled testing"); + } + } + + return isHttps; + } + + @Override + protected String getProtocol() { + return isHttps() ? "https" : "http"; + } + + @Override + protected Settings restAdminSettings() { + return Settings.builder().put("strictDeprecationMode", false).put("http.port", 9200).build(); + } + + @Override + protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException { + boolean strictDeprecationMode = settings.getAsBoolean("strictDeprecationMode", true); + RestClientBuilder builder = RestClient.builder(hosts); + if (isHttps()) { + configureHttpsClient(builder, settings); + builder.setStrictDeprecationMode(strictDeprecationMode); + return builder.build(); + } else { + configureClient(builder, settings); + builder.setStrictDeprecationMode(strictDeprecationMode); + return builder.build(); + } + + } + + @SuppressWarnings("unchecked") + @After + protected void wipeAllODFEIndices() throws IOException { + Response response = adminClient().performRequest(new Request("GET", "/_cat/indices?format=json&expand_wildcards=all")); + XContentType xContentType = XContentType.fromMediaType(response.getEntity().getContentType()); + try ( + XContentParser parser = xContentType.xContent() + .createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + response.getEntity().getContent() + ) + ) { + XContentParser.Token token = parser.nextToken(); + List> parserList = null; + if (token == XContentParser.Token.START_ARRAY) { + parserList = parser.listOrderedMap().stream().map(obj -> (Map) obj).collect(Collectors.toList()); + } else { + parserList = Collections.singletonList(parser.mapOrdered()); + } + + for (Map index : parserList) { + String indexName = (String) index.get("index"); + if (indexName != null && !".opendistro_security".equals(indexName)) { + adminClient().performRequest(new Request("DELETE", "/" + indexName)); + } + } + } + } + + protected static void configureHttpsClient(RestClientBuilder builder, Settings settings) throws IOException { + Map headers = ThreadContext.buildDefaultHeaders(settings); + Header[] defaultHeaders = new Header[headers.size()]; + int i = 0; + for (Map.Entry entry : headers.entrySet()) { + defaultHeaders[i++] = new BasicHeader(entry.getKey(), entry.getValue()); + } + builder.setDefaultHeaders(defaultHeaders); + builder.setHttpClientConfigCallback(httpClientBuilder -> { + String userName = Optional.ofNullable(System.getProperty("user")) + .orElseThrow(() -> new RuntimeException("user name is missing")); + String password = Optional.ofNullable(System.getProperty("password")) + .orElseThrow(() -> new RuntimeException("password is missing")); + BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + new AuthScope(new HttpHost(localhostName, port)), + new UsernamePasswordCredentials(userName, password.toCharArray()) + ); + try { + final TlsStrategy tlsStrategy = ClientTlsStrategyBuilder.create() + .setSslContext(SSLContextBuilder.create().loadTrustMaterial(null, (chains, authType) -> true).build()) + // disable the certificate since our testing cluster just uses the default security configuration + .setHostnameVerifier(NoopHostnameVerifier.INSTANCE) + // See please https://issues.apache.org/jira/browse/HTTPCLIENT-2219 + .setTlsDetailsFactory(new Factory() { + @Override + public TlsDetails create(final SSLEngine sslEngine) { + return new TlsDetails(sslEngine.getSession(), sslEngine.getApplicationProtocol()); + } + }) + .build(); + + final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder.create() + .setTlsStrategy(tlsStrategy) + .build(); + + return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setConnectionManager(connectionManager); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + final String socketTimeoutString = settings.get(CLIENT_SOCKET_TIMEOUT); + final TimeValue socketTimeout = TimeValue.parseTimeValue( + socketTimeoutString == null ? "60s" : socketTimeoutString, + CLIENT_SOCKET_TIMEOUT + ); + builder.setRequestConfigCallback( + conf -> conf.setResponseTimeout(Timeout.ofMilliseconds(Math.toIntExact(socketTimeout.getMillis()))) + ); + if (settings.hasValue(CLIENT_PATH_PREFIX)) { + builder.setPathPrefix(settings.get(CLIENT_PATH_PREFIX)); + } + } + + /** + * wipeAllIndices won't work since it cannot delete security index. Use wipeAllODFEIndices instead. + */ + @Override + protected boolean preserveIndicesUponCompletion() { + return true; + } +} diff --git a/src/test/java/org/opensearch/jobscheduler/TestHelpers.java b/src/test/java/org/opensearch/jobscheduler/TestHelpers.java new file mode 100644 index 00000000..217d7285 --- /dev/null +++ b/src/test/java/org/opensearch/jobscheduler/TestHelpers.java @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.io.entity.StringEntity; +import org.opensearch.client.Response; +import org.opensearch.client.RestClient; +import org.opensearch.client.Request; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.WarningsHandler; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.xcontent.ToXContent; +import org.opensearch.common.xcontent.ToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentFactory; + +public class TestHelpers { + + public static final String GET_JOB_INDEX_BASE_DETECTORS_URI = "/_plugins/_job_scheduler/_get/_job_index"; + public static final String GET_JOB_TYPE_BASE_DETECTORS_URI = "/_plugins/_job_scheduler/_get/_job_type"; + + public static String xContentBuilderToString(XContentBuilder builder) { + return BytesReference.bytes(builder).utf8ToString(); + } + + public static String toJsonString(ToXContentObject object) throws IOException { + XContentBuilder builder = XContentFactory.jsonBuilder(); + return TestHelpers.xContentBuilderToString(object.toXContent(builder, ToXContent.EMPTY_PARAMS)); + } + + public static HttpEntity toHttpEntity(ToXContentObject object) throws IOException { + return new StringEntity(toJsonString(object), ContentType.APPLICATION_JSON); + } + + public static Response makeRequest( + RestClient client, + String method, + String endpoint, + Map params, + HttpEntity entity, + List
headers + ) throws IOException { + return makeRequest(client, method, endpoint, params, entity, headers, false); + } + + public static HttpEntity toHttpEntity(String jsonString) throws IOException { + return new StringEntity(jsonString, ContentType.APPLICATION_JSON); + } + + public static Response makeRequest( + RestClient client, + String method, + String endpoint, + Map params, + HttpEntity entity, + List
headers, + boolean strictDeprecationMode + ) throws IOException { + Request request = new Request(method, endpoint); + + RequestOptions.Builder options = RequestOptions.DEFAULT.toBuilder(); + if (headers != null) { + headers.forEach(header -> options.addHeader(header.getName(), header.getValue())); + } + options.setWarningsHandler(strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE); + request.setOptions(options.build()); + + if (params != null) { + params.entrySet().forEach(it -> request.addParameter(it.getKey(), it.getValue())); + } + if (entity != null) { + request.setEntity(entity); + } + return client.performRequest(request); + } + +} diff --git a/src/test/java/org/opensearch/jobscheduler/multinode/GetJobDetailsMultiNodeRestIT.java b/src/test/java/org/opensearch/jobscheduler/multinode/GetJobDetailsMultiNodeRestIT.java new file mode 100644 index 00000000..66619ba1 --- /dev/null +++ b/src/test/java/org/opensearch/jobscheduler/multinode/GetJobDetailsMultiNodeRestIT.java @@ -0,0 +1,64 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.multinode; + +import com.google.common.collect.ImmutableMap; +import java.util.HashMap; +import java.util.Map; +import org.opensearch.client.Response; +import org.opensearch.jobscheduler.ODFERestTestCase; +import org.opensearch.jobscheduler.TestHelpers; +import org.opensearch.test.OpenSearchIntegTestCase; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2) +public class GetJobDetailsMultiNodeRestIT extends ODFERestTestCase { + + /** + * The below test performs a get index api on a multinode cluster. Internally, the cluster redirects the request to either of the node. + * After getting successful response, the get job type api is triggered for 100 times. From the response of get job type, job index is retrieved and is being compared with get index api response. + * Both response should be equal. + * @throws Exception + */ + public void testGetJobDetailsRestAPI() throws Exception { + + Response response = TestHelpers.makeRequest( + client(), + "PUT", + TestHelpers.GET_JOB_INDEX_BASE_DETECTORS_URI, + ImmutableMap.of(), + TestHelpers.toHttpEntity( + "{\"job_index\":\"demo_job_index\",\"job_parameter_action\":\"demo_parameter\",\"job_runner_action\":\"demo_runner\",\"extension_id\":\"sample_extension\"}" + ), + null + ); + + String expectedJobIndex = validateResponseAndGetJobIndex(entityAsMap(response)); + + for (int i = 0; i < 100; i++) { + Response response1 = TestHelpers.makeRequest( + client(), + "PUT", + TestHelpers.GET_JOB_TYPE_BASE_DETECTORS_URI, + ImmutableMap.of(), + TestHelpers.toHttpEntity("{\"job_type\":\"demo_job_type\",\"extension_id\":\"sample_extension\"}"), + null + ); + + String jobIndex = validateResponseAndGetJobIndex(entityAsMap(response1)); + assertEquals(expectedJobIndex, jobIndex); + } + } + + private String validateResponseAndGetJobIndex(Map responseMap) { + assertEquals("success", responseMap.get("response")); + HashMap jobDetails = (HashMap) responseMap.get("jobDetails"); + return jobDetails.get("job_index"); + } + +} diff --git a/src/test/java/org/opensearch/jobscheduler/rest/RestGetJobIndexActionTests.java b/src/test/java/org/opensearch/jobscheduler/rest/RestGetJobIndexActionTests.java new file mode 100644 index 00000000..4be6e71f --- /dev/null +++ b/src/test/java/org/opensearch/jobscheduler/rest/RestGetJobIndexActionTests.java @@ -0,0 +1,87 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.rest; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import org.junit.Before; +import org.mockito.Mockito; +import org.opensearch.action.ActionListener; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.jobscheduler.JobSchedulerPlugin; +import org.opensearch.jobscheduler.utils.JobDetailsService; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestChannel; +import org.opensearch.test.rest.FakeRestRequest; + +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +public class RestGetJobIndexActionTests extends OpenSearchTestCase { + + private RestGetJobIndexAction action; + + private JobDetailsService jobDetailsService; + + private String getJobIndexPath; + + @Before + public void setUp() throws Exception { + super.setUp(); + jobDetailsService = Mockito.mock(JobDetailsService.class); + action = new RestGetJobIndexAction(jobDetailsService); + getJobIndexPath = String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_get/_job_index"); + } + + public void testGetNames() { + String name = action.getName(); + assertEquals(action.GET_JOB_INDEX_ACTION, name); + } + + public void testGetRoutes() { + List routes = action.routes(); + + assertEquals(getJobIndexPath, routes.get(0).getPath()); + } + + public void testPrepareRequest() throws IOException { + String content = + "{\"job_index\":\"sample-index-name\",\"job_runner_action\":\"sample-job-runner-action\",\"job_parameter_action\":\"sample-job-parameter-action\",\"extension_id\":\"sample-extension\"}"; + Map params = new HashMap<>(); + FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) + .withPath(getJobIndexPath) + .withParams(params) + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + + final FakeRestChannel channel = new FakeRestChannel(request, true, 0); + Mockito.doNothing() + .when(jobDetailsService) + .processJobDetailsForExtensionId( + "sample-index-name", + null, + "sample-job-parameter-action", + "sample-runner-name", + "sample-extension", + JobDetailsService.JobDetailsRequestType.JOB_INDEX, + ActionListener.wrap(response -> {}, exception -> {}) + ); + + action.prepareRequest(request, Mockito.mock(NodeClient.class)); + + assertEquals(channel.responses().get(), 0); + assertEquals(channel.errors().get(), 0); + } +} diff --git a/src/test/java/org/opensearch/jobscheduler/rest/RestGetJobTypeActionTests.java b/src/test/java/org/opensearch/jobscheduler/rest/RestGetJobTypeActionTests.java new file mode 100644 index 00000000..d6bc6974 --- /dev/null +++ b/src/test/java/org/opensearch/jobscheduler/rest/RestGetJobTypeActionTests.java @@ -0,0 +1,86 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.rest; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import org.junit.Before; +import org.mockito.Mockito; +import org.opensearch.action.ActionListener; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.jobscheduler.JobSchedulerPlugin; +import org.opensearch.jobscheduler.utils.JobDetailsService; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestChannel; +import org.opensearch.test.rest.FakeRestRequest; + +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +public class RestGetJobTypeActionTests extends OpenSearchTestCase { + + private RestGetJobTypeAction action; + + private String getJobTypePath; + + private JobDetailsService jobDetailsService; + + @Before + public void setUp() throws Exception { + super.setUp(); + jobDetailsService = Mockito.mock(JobDetailsService.class); + action = new RestGetJobTypeAction(jobDetailsService); + getJobTypePath = String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_get/_job_type"); + } + + public void testGetNames() { + String name = action.getName(); + assertEquals(action.GET_JOB_TYPE_ACTION, name); + } + + public void testGetRoutes() { + List routes = action.routes(); + + assertEquals(getJobTypePath, routes.get(0).getPath()); + } + + public void testPrepareRequest() throws IOException { + + String content = "{\"job_type\":\"sample-job-type\",\"extension_id\":\"sample-extension\"}"; + Map params = new HashMap<>(); + FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) + .withPath(getJobTypePath) + .withParams(params) + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + + final FakeRestChannel channel = new FakeRestChannel(request, true, 0); + Mockito.doNothing() + .when(jobDetailsService) + .processJobDetailsForExtensionId( + null, + "sample-job-type", + null, + null, + "sample-extension", + JobDetailsService.JobDetailsRequestType.JOB_INDEX, + ActionListener.wrap(response -> {}, exception -> {}) + ); + action.prepareRequest(request, Mockito.mock(NodeClient.class)); + + assertEquals(channel.responses().get(), 0); + assertEquals(channel.errors().get(), 0); + } +} diff --git a/src/test/java/org/opensearch/jobscheduler/utils/JobDetailsServiceIT.java b/src/test/java/org/opensearch/jobscheduler/utils/JobDetailsServiceIT.java new file mode 100644 index 00000000..0c110f27 --- /dev/null +++ b/src/test/java/org/opensearch/jobscheduler/utils/JobDetailsServiceIT.java @@ -0,0 +1,140 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.utils; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.Before; +import org.mockito.Mockito; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.test.OpenSearchIntegTestCase; + +public class JobDetailsServiceIT extends OpenSearchIntegTestCase { + + private ClusterService clusterService; + + @Before + public void setup() { + this.clusterService = Mockito.mock(ClusterService.class, Mockito.RETURNS_DEEP_STUBS); + Mockito.when(this.clusterService.state().routingTable().hasIndex("opensearch-plugins-job-details")) + .thenReturn(false) + .thenReturn(true); + } + + public void testSanity() throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture inProgressFuture = new CompletableFuture<>(); + JobDetailsService jobDetailsService = new JobDetailsService(client(), this.clusterService); + + jobDetailsService.processJobDetailsForExtensionId( + "sample-job-index", + null, + "sample-job-parameter", + "sample-job-runner", + "sample-extension", + JobDetailsService.JobDetailsRequestType.JOB_INDEX, + ActionListener.wrap(jobDetails -> { + assertNotNull("Expected to successfully get job details.", jobDetails); + assertEquals("sample-job-index", jobDetails.getJobIndex()); + assertEquals("sample-job-parameter", jobDetails.getJobParameterAction()); + assertEquals("sample-job-runner", jobDetails.getJobRunnerAction()); + assertNull(jobDetails.getJobType()); + jobDetailsService.createJobDetailsIndex(ActionListener.wrap(response -> { + assertTrue(response); + inProgressFuture.complete(response); + }, exception -> { fail(exception.getMessage()); })); + + jobDetailsService.deleteJobDetailsForExtension("sample-extension", ActionListener.wrap(response -> { + assertTrue(response); + inProgressFuture.complete(response); + }, exception -> { fail(exception.getMessage()); })); + }, exception -> { fail(exception.getMessage()); }) + ); + inProgressFuture.get(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS); + } + + public void testSecondProcessofJobIndexPass() throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture inProgressFuture = new CompletableFuture<>(); + JobDetailsService jobDetailsService = new JobDetailsService(client(), this.clusterService); + + jobDetailsService.processJobDetailsForExtensionId( + "sample-job-index", + null, + "sample-job-parameter", + "sample-job-runner", + "sample-extension", + JobDetailsService.JobDetailsRequestType.JOB_INDEX, + ActionListener.wrap(jobDetails -> { + jobDetailsService.processJobDetailsForExtensionId( + "sample-job-index1", + null, + "sample-job-parameter", + "sample-job-runner", + "sample-extension", + JobDetailsService.JobDetailsRequestType.JOB_INDEX, + ActionListener.wrap(jobDetails1 -> { + assertNotNull("Expected to failed to get get job details for 2nd request.", jobDetails1); + assertNotNull("Expected to successfully get job details.", jobDetails); + assertEquals("sample-job-index", jobDetails.getJobIndex()); + assertEquals("sample-job-parameter", jobDetails.getJobParameterAction()); + assertEquals("sample-job-runner", jobDetails.getJobRunnerAction()); + assertEquals("sample-job-index1", jobDetails1.getJobIndex()); + assertNull(jobDetails.getJobType()); + jobDetailsService.createJobDetailsIndex(ActionListener.wrap(response -> { + assertTrue(response); + inProgressFuture.complete(response); + }, exception -> { fail(exception.getMessage()); })); + + jobDetailsService.deleteJobDetailsForExtension("sample-extension", ActionListener.wrap(response -> { + assertTrue(response); + inProgressFuture.complete(response); + }, exception -> { fail(exception.getMessage()); })); + }, exception -> { fail(exception.getMessage()); }) + ); + }, + exception -> { fail(exception.getMessage()); } + + ) + ); + + inProgressFuture.get(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS); + } + + public void testDeleteJobDetailsWithOutExtensionIdCreation() throws ExecutionException, InterruptedException, TimeoutException { + JobDetailsService jobDetailsService = new JobDetailsService(client(), this.clusterService); + jobDetailsService.deleteJobDetailsForExtension( + "demo-extension", + ActionListener.wrap( + deleted -> { assertTrue("Failed to delete JobDetails.", deleted); }, + exception -> { fail(exception.getMessage()); } + ) + ); + } + + public void testDeleteNonExistingJobDetails() throws ExecutionException, InterruptedException, TimeoutException { + JobDetailsService jobDetailsService = new JobDetailsService(client(), this.clusterService); + jobDetailsService.createJobDetailsIndex(ActionListener.wrap(created -> { + if (created) { + jobDetailsService.deleteJobDetailsForExtension( + "demo-extension", + ActionListener.wrap( + deleted -> { assertTrue("Failed to delete job details for extension.", deleted); }, + exception -> fail(exception.getMessage()) + ) + ); + } else { + fail("Failed to job details for extension"); + } + + }, exception -> fail(exception.getMessage()))); + } + +}