Skip to content

Commit

Permalink
Communication mechanism for js (#289)
Browse files Browse the repository at this point in the history
* Job Details from Extension for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Commnunication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Commnunication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Commnunication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Commnunication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Commnunication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Commnunication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Communication Mechanism for JS

Signed-off-by: Varun Jain <varunudr@amazon.com>

Signed-off-by: Varun Jain <varunudr@amazon.com>
  • Loading branch information
vibrantvarun authored Jan 6, 2023
1 parent b67c000 commit 2c9084b
Show file tree
Hide file tree
Showing 15 changed files with 1,709 additions and 8 deletions.
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ repositories {

dependencies {
implementation project(path: ":${rootProject.name}-spi", configuration: 'shadow')
implementation group: 'com.google.guava', name: 'guava', version:'31.0.1-jre'
implementation group: 'com.google.guava', name: 'failureaccess', version:'1.0.1'
javaRestTestImplementation project.sourceSets.main.runtimeClasspath
}

Expand Down Expand Up @@ -238,7 +240,8 @@ task integTestRemote(type: RestIntegTestTask) {
systemProperty "security", System.getProperty("security")
systemProperty "user", System.getProperty("user")
systemProperty "password", System.getProperty("password")

systemProperty 'tests.rest.cluster', 'localhost:9200'
systemProperty 'tests.clustername', 'opensearch-job-scheduler-cluster'
if (System.getProperty("tests.rest.cluster") != null) {
filter {
includeTestsMatching "org.opensearch.jobscheduler.*RestIT"
Expand Down
46 changes: 39 additions & 7 deletions src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@
*/
package org.opensearch.jobscheduler;

import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.SettingsFilter;

import org.opensearch.jobscheduler.rest.RestGetJobIndexAction;
import org.opensearch.jobscheduler.rest.RestGetJobTypeAction;
import org.opensearch.jobscheduler.scheduler.JobScheduler;
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
Expand All @@ -23,35 +32,39 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.ParseField;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexModule;
import org.opensearch.jobscheduler.utils.JobDetailsService;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.function.Supplier;

public class JobSchedulerPlugin extends Plugin implements ExtensiblePlugin {
import com.google.common.collect.ImmutableList;

public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin {

public static final String OPEN_DISTRO_JOB_SCHEDULER_THREAD_POOL_NAME = "open_distro_job_scheduler";
public static final String JS_BASE_URI = "/_plugins/_job_scheduler";

private static final Logger log = LogManager.getLogger(JobSchedulerPlugin.class);

Expand All @@ -61,6 +74,8 @@ public class JobSchedulerPlugin extends Plugin implements ExtensiblePlugin {
private Map<String, ScheduledJobProvider> indexToJobProviders;
private Set<String> indicesToListen;

private JobDetailsService jobDetailsService;

public JobSchedulerPlugin() {
this.indicesToListen = new HashSet<>();
this.indexToJobProviders = new HashMap<>();
Expand All @@ -81,6 +96,7 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.lockService = new LockService(client, clusterService);
this.jobDetailsService = new JobDetailsService(client, clusterService);
this.scheduler = new JobScheduler(threadPool, this.lockService);
this.sweeper = initSweeper(
environment.settings(),
Expand Down Expand Up @@ -185,4 +201,20 @@ private JobSweeper initSweeper(
) {
return new JobSweeper(settings, client, clusterService, threadPool, registry, this.indexToJobProviders, scheduler, lockService);
}

@Override
public List getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
RestGetJobIndexAction restGetJobIndexAction = new RestGetJobIndexAction(jobDetailsService);
RestGetJobTypeAction restGetJobTypeAction = new RestGetJobTypeAction(jobDetailsService);
return ImmutableList.of(restGetJobIndexAction, restGetJobTypeAction);
}

}
185 changes: 185 additions & 0 deletions src/main/java/org/opensearch/jobscheduler/model/JobDetails.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.model;

import org.opensearch.common.Nullable;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

/**
* This model class stores the job details of the extension.
*/
public class JobDetails implements ToXContentObject {

/**
* jobIndex from the extension.
*/
private String jobIndex;

/**
* jobType from the extension.
*/
private String jobType;

/**
* jobParser action to trigger the response back to the extension.
*/
private String jobParameterAction;

/**
* jobRunner action to trigger the response back to the extension.
*/
private String jobRunnerAction;

public static final String JOB_INDEX = "job_index";
public static final String JOB_TYPE = "job_type";
public static final String JOB_PARAMETER_ACTION = "job_parser_action";
public static final String JOB_RUNNER_ACTION = "job_runner_action";

public JobDetails() {}

public JobDetails(String jobIndex, String jobType, String jobParameterAction, String jobRunnerAction) {
this.jobIndex = jobIndex;
this.jobType = jobType;
this.jobParameterAction = jobParameterAction;
this.jobRunnerAction = jobRunnerAction;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
if (jobIndex != null) {
xContentBuilder.field(JOB_INDEX, jobIndex);
}
if (jobType != null) {
xContentBuilder.field(JOB_TYPE, jobType);
}
if (jobParameterAction != null) {
xContentBuilder.field(JOB_PARAMETER_ACTION, jobParameterAction);
}
if (jobRunnerAction != null) {
xContentBuilder.field(JOB_RUNNER_ACTION, jobRunnerAction);
}
return xContentBuilder.endObject();
}

public static JobDetails parse(XContentParser parser) throws IOException {
String jobIndex = null;
String jobType = null;
String jobParameterAction = null;
String jobRunnerAction = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);

while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case JOB_INDEX:
jobIndex = parser.text();
break;
case JOB_TYPE:
jobType = parser.text();
break;
case JOB_PARAMETER_ACTION:
jobParameterAction = parser.text();
break;
case JOB_RUNNER_ACTION:
jobRunnerAction = parser.text();
break;
default:
parser.skipChildren();
break;
}
}

return new JobDetails(jobIndex, jobType, jobParameterAction, jobRunnerAction);
}

public JobDetails(final JobDetails copyJobDetails) {
this(copyJobDetails.jobIndex, copyJobDetails.jobType, copyJobDetails.jobParameterAction, copyJobDetails.jobRunnerAction);
}

@Nullable
public String getJobIndex() {
return jobIndex;
}

public void setJobIndex(String jobIndex) {
this.jobIndex = jobIndex;
}

@Nullable
public String getJobType() {
return jobType;
}

public void setJobType(String jobType) {
this.jobType = jobType;
}

@Nullable
public String getJobParameterAction() {
return jobParameterAction;
}

public void setJobParameterAction(String jobParameterAction) {
this.jobParameterAction = jobParameterAction;
}

@Nullable
public String getJobRunnerAction() {
return jobRunnerAction;
}

public void setJobRunnerAction(String jobRunnerAction) {
this.jobRunnerAction = jobRunnerAction;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JobDetails that = (JobDetails) o;
return Objects.equals(jobIndex, that.jobIndex)
&& Objects.equals(jobType, that.jobType)
&& Objects.equals(jobParameterAction, that.jobParameterAction)
&& Objects.equals(jobRunnerAction, that.jobRunnerAction);
}

@Override
public int hashCode() {
return Objects.hash(jobIndex, jobType, jobParameterAction, jobRunnerAction);
}

@Override
public String toString() {
return "JobDetails{"
+ "jobIndex='"
+ jobIndex
+ '\''
+ ", jobType='"
+ jobType
+ '\''
+ ", jobParameterAction='"
+ jobParameterAction
+ '\''
+ ", jobRunnerAction='"
+ jobRunnerAction
+ '\''
+ '}';
}
}
Loading

0 comments on commit 2c9084b

Please sign in to comment.