From a7ceefe93fd36574eb2c661c2e7079a0ceb5e0b0 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 3 Jun 2018 21:51:08 +0200 Subject: [PATCH] Make Persistent Tasks implementations version and feature aware (#31045) With #31020 we introduced the ability for transport clients to indicate what features they support in order to make sure we don't serialize object to them they don't support. This PR adapts the serialization logic of persistent tasks to be aware of those features and not serialize tasks that aren't supported. Also, a version check is added for the future where we may add new tasks implementations and need to be able to indicate they shouldn't be serialized both to nodes and clients. As the implementation relies on the interface of `PersistentTaskParams`, these are no longer optional. That's acceptable as all current implementation have them and we plan to make `PersistentTaskParams` more central in the future. Relates to #30731 --- .../elasticsearch/cluster/ClusterState.java | 8 +-- .../elasticsearch/cluster/NamedDiffable.java | 13 +--- .../cluster/RestoreInProgress.java | 8 ++- .../cluster/SnapshotsInProgress.java | 5 ++ .../cluster/metadata/IndexGraveyard.java | 9 ++- .../cluster/metadata/MetaData.java | 4 +- .../metadata/RepositoriesMetaData.java | 6 ++ .../io/stream/VersionedNamedWriteable.java | 38 ++++++++++ .../elasticsearch/ingest/IngestMetadata.java | 6 ++ .../NodePersistentTasksExecutor.java | 2 +- .../persistent/PersistentTaskParams.java | 5 +- .../PersistentTasksClusterService.java | 5 +- .../PersistentTasksCustomMetaData.java | 20 ++++-- .../persistent/PersistentTasksExecutor.java | 6 +- .../persistent/PersistentTasksService.java | 3 +- .../persistent/StartPersistentTaskAction.java | 16 +++-- .../elasticsearch/script/ScriptMetaData.java | 5 ++ .../cluster/ClusterChangedEventTests.java | 10 +++ .../elasticsearch/cluster/ClusterStateIT.java | 14 +++- .../cluster/FeatureAwareTests.java | 12 ++-- .../cluster/SimpleClusterStateIT.java | 7 +- .../ClusterSerializationTests.java | 3 +- .../cluster/service/ClusterSerivceTests.java | 6 ++ .../discovery/zen/ZenDiscoveryIT.java | 5 ++ .../gateway/GatewayMetaStateTests.java | 10 +++ .../PersistentTasksCustomMetaDataTests.java | 70 +++++++++++++++++++ .../PersistentTasksExecutorFullRestartIT.java | 10 ++- .../StartPersistentActionRequestTests.java | 17 ++--- .../persistent/TestPersistentTasksPlugin.java | 31 +++++++- .../decider/EnableAssignmentDeciderIT.java | 9 +-- .../DedicatedClusterSnapshotRestoreIT.java | 27 ++++++- .../org/elasticsearch/test/VersionUtils.java | 18 +++-- .../license/LicensesMetaData.java | 5 ++ .../elasticsearch/xpack/core/XPackPlugin.java | 9 +++ .../xpack/core/ml/action/OpenJobAction.java | 9 ++- .../core/ml/action/StartDatafeedAction.java | 10 ++- .../xpack/core/rollup/job/RollupJob.java | 10 ++- .../xpack/core/watcher/WatcherMetaData.java | 6 ++ 38 files changed, 365 insertions(+), 92 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/common/io/stream/VersionedNamedWriteable.java diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index 6bc555eae0bd9..276e00a2ba3db 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -22,7 +22,6 @@ import com.carrotsearch.hppc.cursors.IntObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; - import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -50,6 +49,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.VersionedNamedWriteable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -122,7 +122,7 @@ default Optional getRequiredFeature() { * @param the type of the custom * @return true if the custom should be serialized and false otherwise */ - static boolean shouldSerializeCustom(final StreamOutput out, final T custom) { + static boolean shouldSerialize(final StreamOutput out, final T custom) { if (out.getVersion().before(custom.getMinimalSupportedVersion())) { return false; } @@ -748,13 +748,13 @@ public void writeTo(StreamOutput out) throws IOException { // filter out custom states not supported by the other node int numberOfCustoms = 0; for (final ObjectCursor cursor : customs.values()) { - if (FeatureAware.shouldSerializeCustom(out, cursor.value)) { + if (FeatureAware.shouldSerialize(out, cursor.value)) { numberOfCustoms++; } } out.writeVInt(numberOfCustoms); for (final ObjectCursor cursor : customs.values()) { - if (FeatureAware.shouldSerializeCustom(out, cursor.value)) { + if (FeatureAware.shouldSerialize(out, cursor.value)) { out.writeNamedWriteable(cursor.value); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/NamedDiffable.java b/server/src/main/java/org/elasticsearch/cluster/NamedDiffable.java index b548b49fe1910..729523233d73d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NamedDiffable.java +++ b/server/src/main/java/org/elasticsearch/cluster/NamedDiffable.java @@ -19,17 +19,10 @@ package org.elasticsearch.cluster; -import org.elasticsearch.Version; -import org.elasticsearch.common.io.stream.NamedWriteable; +import org.elasticsearch.common.io.stream.VersionedNamedWriteable; /** - * Diff that also support NamedWriteable interface + * Diff that also support {@link VersionedNamedWriteable} interface */ -public interface NamedDiffable extends Diffable, NamedWriteable { - /** - * The minimal version of the recipient this custom object can be sent to - */ - default Version getMinimalSupportedVersion() { - return Version.CURRENT.minimumIndexCompatibilityVersion(); - } +public interface NamedDiffable extends Diffable, VersionedNamedWriteable { } diff --git a/server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java b/server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java index 5c036f94285e0..138788251c90a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java @@ -20,14 +20,15 @@ package org.elasticsearch.cluster; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState.Custom; -import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.snapshots.Snapshot; import java.io.IOException; import java.util.ArrayList; @@ -382,6 +383,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { return readDiffFrom(Custom.class, TYPE, in); } diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 74b748e19a7ee..87563c968af17 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -395,6 +395,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { return readDiffFrom(Custom.class, TYPE, in); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexGraveyard.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexGraveyard.java index 9167b28a67b86..74789aada3a46 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexGraveyard.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexGraveyard.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.metadata; +import org.elasticsearch.Version; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.common.ParseField; @@ -34,8 +35,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.Index; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; import java.io.IOException; import java.util.ArrayList; @@ -44,7 +43,6 @@ import java.util.EnumSet; import java.util.List; import java.util.Objects; -import java.util.concurrent.TimeUnit; /** * A collection of tombstones for explicitly marking indices as deleted in the cluster state. @@ -97,6 +95,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + @Override public EnumSet context() { return MetaData.API_AND_GATEWAY; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index 8c73222519502..5657873987e18 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -786,13 +786,13 @@ public void writeTo(StreamOutput out) throws IOException { // filter out custom states not supported by the other node int numberOfCustoms = 0; for (final ObjectCursor cursor : customs.values()) { - if (FeatureAware.shouldSerializeCustom(out, cursor.value)) { + if (FeatureAware.shouldSerialize(out, cursor.value)) { numberOfCustoms++; } } out.writeVInt(numberOfCustoms); for (final ObjectCursor cursor : customs.values()) { - if (FeatureAware.shouldSerializeCustom(out, cursor.value)) { + if (FeatureAware.shouldSerialize(out, cursor.value)) { out.writeNamedWriteable(cursor.value); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java index c813ba76e82dd..7bb72be0e1e18 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractNamedDiffable; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.MetaData.Custom; @@ -103,6 +104,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + public RepositoriesMetaData(StreamInput in) throws IOException { RepositoryMetaData[] repository = new RepositoryMetaData[in.readVInt()]; for (int i = 0; i < repository.length; i++) { diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/VersionedNamedWriteable.java b/server/src/main/java/org/elasticsearch/common/io/stream/VersionedNamedWriteable.java new file mode 100644 index 0000000000000..9eea2c00d56a6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/io/stream/VersionedNamedWriteable.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.io.stream; + +import org.elasticsearch.Version; + +/** + * A {@link NamedWriteable} that has a minimum version associated with it. + */ +public interface VersionedNamedWriteable extends NamedWriteable { + + /** + * Returns the name of the writeable object + */ + String getWriteableName(); + + /** + * The minimal version of the recipient this object can be sent to + */ + Version getMinimalSupportedVersion(); +} diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestMetadata.java b/server/src/main/java/org/elasticsearch/ingest/IngestMetadata.java index ca8a5df845014..1e262adf8cf8d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestMetadata.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestMetadata.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest; +import org.elasticsearch.Version; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.NamedDiff; @@ -69,6 +70,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + public Map getPipelines() { return pipelines; } diff --git a/server/src/main/java/org/elasticsearch/persistent/NodePersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/NodePersistentTasksExecutor.java index efed0aef9b807..bf42733ff54ac 100644 --- a/server/src/main/java/org/elasticsearch/persistent/NodePersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/NodePersistentTasksExecutor.java @@ -35,7 +35,7 @@ public NodePersistentTasksExecutor(ThreadPool threadPool) { this.threadPool = threadPool; } - public void executeTask(@Nullable Params params, + public void executeTask(Params params, @Nullable Task.Status status, AllocatedPersistentTask task, PersistentTasksExecutor executor) { diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTaskParams.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskParams.java index a475a7cde174a..c91727a913f3a 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTaskParams.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskParams.java @@ -19,12 +19,13 @@ package org.elasticsearch.persistent; -import org.elasticsearch.common.io.stream.NamedWriteable; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.io.stream.VersionedNamedWriteable; import org.elasticsearch.common.xcontent.ToXContentObject; /** * Parameters used to start persistent task */ -public interface PersistentTaskParams extends NamedWriteable, ToXContentObject { +public interface PersistentTaskParams extends VersionedNamedWriteable, ToXContentObject, ClusterState.FeatureAware { } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index cf44556ee5ddc..1464279a814d5 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; @@ -65,7 +64,7 @@ public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorR * @param taskParams the task's parameters * @param listener the listener that will be called when task is started */ - public void createPersistentTask(String taskId, String taskName, @Nullable Params taskParams, + public void createPersistentTask(String taskId, String taskName, Params taskParams, ActionListener> listener) { clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() { @Override @@ -225,7 +224,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS * @return a new {@link Assignment} */ private Assignment createAssignment(final String taskName, - final @Nullable Params taskParams, + final Params taskParams, final ClusterState currentState) { PersistentTasksExecutor persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName); diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java index bdee87cc77c51..4b1ba3e11e372 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java @@ -49,8 +49,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -264,7 +264,6 @@ public static class PersistentTask

implements Wr private final String id; private final long allocationId; private final String taskName; - @Nullable private final P params; @Nullable private final Status status; @@ -314,7 +313,11 @@ public PersistentTask(StreamInput in) throws IOException { id = in.readString(); allocationId = in.readLong(); taskName = in.readString(); - params = (P) in.readOptionalNamedWriteable(PersistentTaskParams.class); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + params = (P) in.readNamedWriteable(PersistentTaskParams.class); + } else { + params = (P) in.readOptionalNamedWriteable(PersistentTaskParams.class); + } status = in.readOptionalNamedWriteable(Task.Status.class); assignment = new Assignment(in.readOptionalString(), in.readString()); allocationIdOnLastStatusUpdate = in.readOptionalLong(); @@ -325,7 +328,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(id); out.writeLong(allocationId); out.writeString(taskName); - out.writeOptionalNamedWriteable(params); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeNamedWriteable(params); + } else { + out.writeOptionalNamedWriteable(params); + } out.writeOptionalNamedWriteable(status); out.writeOptionalString(assignment.executorNode); out.writeString(assignment.explanation); @@ -500,7 +507,10 @@ public PersistentTasksCustomMetaData(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeLong(lastAllocationId); - out.writeMap(tasks, StreamOutput::writeString, (stream, value) -> value.writeTo(stream)); + Map> filteredTasks = tasks.values().stream() + .filter(t -> ClusterState.FeatureAware.shouldSerialize(out, t.getParams())) + .collect(Collectors.toMap(PersistentTask::getId, Function.identity())); + out.writeMap(filteredTasks, StreamOutput::writeString, (stream, value) -> value.writeTo(stream)); } public static NamedDiff readDiffFrom(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java index 0a1e2095934ef..de75b1ff54085 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java @@ -24,10 +24,10 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import java.util.Map; import java.util.function.Predicate; @@ -118,7 +118,7 @@ protected String getDescription(PersistentTask taskInProgress) { * NOTE: The nodeOperation has to throw an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to * indicate that the persistent task has finished. */ - protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params, @Nullable Task.Status status); + protected abstract void nodeOperation(AllocatedPersistentTask task, Params params, @Nullable Task.Status status); public String getExecutor() { return executor; diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java index 7a4a956fcfd5b..01c28dd5cd634 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java @@ -21,7 +21,6 @@ import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.support.ContextPreservingActionListener; @@ -69,7 +68,7 @@ public PersistentTasksService(Settings settings, ClusterService clusterService, */ public void sendStartRequest(final String taskId, final String taskName, - final @Nullable Params taskParams, + final Params taskParams, final ActionListener> listener) { @SuppressWarnings("unchecked") final ActionListener> wrappedListener = diff --git a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java index 4e6b11205fdfc..2a1a59d9b08ae 100644 --- a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.persistent; +import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; @@ -36,9 +37,9 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; import java.util.Objects; @@ -66,7 +67,6 @@ public static class Request extends MasterNodeRequest { private String taskId; - @Nullable private String taskName; private PersistentTaskParams params; @@ -86,7 +86,11 @@ public void readFrom(StreamInput in) throws IOException { super.readFrom(in); taskId = in.readString(); taskName = in.readString(); - params = in.readOptionalNamedWriteable(PersistentTaskParams.class); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + params = in.readNamedWriteable(PersistentTaskParams.class); + } else { + params = in.readOptionalNamedWriteable(PersistentTaskParams.class); + } } @Override @@ -94,7 +98,11 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(taskId); out.writeString(taskName); - out.writeOptionalNamedWriteable(params); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeNamedWriteable(params); + } else { + out.writeOptionalNamedWriteable(params); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/script/ScriptMetaData.java b/server/src/main/java/org/elasticsearch/script/ScriptMetaData.java index 9505875ae1ebc..59d824eb313e0 100644 --- a/server/src/main/java/org/elasticsearch/script/ScriptMetaData.java +++ b/server/src/main/java/org/elasticsearch/script/ScriptMetaData.java @@ -383,6 +383,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + @Override public EnumSet context() { return MetaData.ALL_CONTEXTS; diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java index b7ea45dd13a3d..f79fef74e917f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java @@ -308,6 +308,11 @@ public String getWriteableName() { return "2"; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + @Override public EnumSet context() { return EnumSet.of(MetaData.XContentContext.GATEWAY); @@ -324,6 +329,11 @@ public String getWriteableName() { return "1"; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + @Override public EnumSet context() { return EnumSet.of(MetaData.XContentContext.GATEWAY); diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java index 07a974a2ca771..2bf56e888148d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexGraveyard; @@ -73,7 +74,8 @@ @ESIntegTestCase.ClusterScope(scope = TEST) public class ClusterStateIT extends ESIntegTestCase { - public abstract static class Custom implements MetaData.Custom { + public abstract static + class Custom implements MetaData.Custom { private static final ParseField VALUE = new ParseField("value"); @@ -131,6 +133,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + @Override public Optional getRequiredFeature() { return Optional.of("node"); @@ -155,6 +162,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + /* * This custom should always be returned yet we randomize whether it has a required feature that the client is expected to have * versus not requiring any feature. We use a field to make the random choice exactly once. diff --git a/server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java b/server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java index 0f826e65248fe..b25d8ced1806d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java @@ -116,7 +116,7 @@ public void testVersion() { if (custom.getRequiredFeature().isPresent()) { out.setFeatures(Collections.singleton(custom.getRequiredFeature().get())); } - assertTrue(FeatureAware.shouldSerializeCustom(out, custom)); + assertTrue(FeatureAware.shouldSerialize(out, custom)); } { final BytesStreamOutput out = new BytesStreamOutput(); @@ -126,7 +126,7 @@ public void testVersion() { if (custom.getRequiredFeature().isPresent() && randomBoolean()) { out.setFeatures(Collections.singleton(custom.getRequiredFeature().get())); } - assertFalse(FeatureAware.shouldSerializeCustom(out, custom)); + assertFalse(FeatureAware.shouldSerialize(out, custom)); } } } @@ -141,7 +141,7 @@ public void testFeature() { out.setVersion(afterVersion); assertTrue(custom.getRequiredFeature().isPresent()); out.setFeatures(Collections.singleton(custom.getRequiredFeature().get())); - assertTrue(FeatureAware.shouldSerializeCustom(out, custom)); + assertTrue(FeatureAware.shouldSerialize(out, custom)); } { // the feature is present and the client is a transport client @@ -149,7 +149,7 @@ public void testFeature() { out.setVersion(afterVersion); assertTrue(custom.getRequiredFeature().isPresent()); out.setFeatures(new HashSet<>(Arrays.asList(custom.getRequiredFeature().get(), TransportClient.TRANSPORT_CLIENT_FEATURE))); - assertTrue(FeatureAware.shouldSerializeCustom(out, custom)); + assertTrue(FeatureAware.shouldSerialize(out, custom)); } } @@ -161,14 +161,14 @@ public void testMissingFeature() { // the feature is missing but we should serialize it anyway because the client is not a transport client final BytesStreamOutput out = new BytesStreamOutput(); out.setVersion(afterVersion); - assertTrue(FeatureAware.shouldSerializeCustom(out, custom)); + assertTrue(FeatureAware.shouldSerialize(out, custom)); } { // the feature is missing and we should not serialize it because the client is a transport client final BytesStreamOutput out = new BytesStreamOutput(); out.setVersion(afterVersion); out.setFeatures(Collections.singleton(TransportClient.TRANSPORT_CLIENT_FEATURE)); - assertFalse(FeatureAware.shouldSerializeCustom(out, custom)); + assertFalse(FeatureAware.shouldSerialize(out, custom)); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/SimpleClusterStateIT.java b/server/src/test/java/org/elasticsearch/cluster/SimpleClusterStateIT.java index 8b246ecc2d3de..0ba3de4381891 100644 --- a/server/src/test/java/org/elasticsearch/cluster/SimpleClusterStateIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/SimpleClusterStateIT.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.support.IndicesOptions; @@ -37,7 +38,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.IndexNotFoundException; @@ -304,6 +304,11 @@ public String getWriteableName() { return "test"; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeInt(value); diff --git a/server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java b/server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java index d64b4a66ee78a..b2878008f7055 100644 --- a/server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java @@ -45,7 +45,6 @@ import org.elasticsearch.test.VersionUtils; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -133,7 +132,7 @@ public void testSnapshotDeletionsInProgressSerialization() throws Exception { // serialize with current version BytesStreamOutput outStream = new BytesStreamOutput(); - Version version = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumIndexCompatibilityVersion(), Version.CURRENT); + Version version = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT); outStream.setVersion(version); diffs.writeTo(outStream); StreamInput inStream = outStream.bytes().streamInput(); diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java index e7cbd04ce4b50..2cebd41a52c43 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.cluster.service; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.Diff; import org.elasticsearch.common.io.stream.StreamOutput; @@ -43,6 +44,11 @@ public String getWriteableName() { return null; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + @Override public void writeTo(StreamOutput out) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java index e51177c318ca8..b7177fdf867af 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java @@ -239,6 +239,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + @Override public EnumSet context() { return EnumSet.of(MetaData.XContentContext.GATEWAY, MetaData.XContentContext.SNAPSHOT); diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java index cef3502a077c5..14f3c212c464c 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java @@ -492,6 +492,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + @Override public EnumSet context() { return EnumSet.of(MetaData.XContentContext.GATEWAY); @@ -510,6 +515,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + @Override public EnumSet context() { return EnumSet.of(MetaData.XContentContext.GATEWAY); diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java index 67962b800d2cf..267941d3cf87e 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java @@ -19,6 +19,8 @@ package org.elasticsearch.persistent; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; +import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.MetaData; @@ -26,8 +28,11 @@ import org.elasticsearch.common.ParseField; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContent; @@ -43,13 +48,22 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.test.AbstractDiffableSerializationTestCase; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_GATEWAY; import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_SNAPSHOT; import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND; +import static org.elasticsearch.test.VersionUtils.compatibleFutureVersion; +import static org.elasticsearch.test.VersionUtils.getFirstVersion; +import static org.elasticsearch.test.VersionUtils.getPreviousVersion; +import static org.elasticsearch.test.VersionUtils.randomVersionBetween; +import static org.hamcrest.Matchers.equalTo; public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializationTestCase { @@ -228,7 +242,63 @@ public void testBuilder() { assertEquals(changed, builder.isChanged()); persistentTasks = builder.build(); } + } + + public void testMinVersionSerialization() throws IOException { + PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); + + Version minVersion = getFirstVersion(); + final Version streamVersion = randomVersionBetween(random(), minVersion, getPreviousVersion(Version.CURRENT)); + tasks.addTask("test_compatible_version", TestPersistentTasksExecutor.NAME, + new TestParams(null, randomVersionBetween(random(), minVersion, streamVersion), + randomBoolean() ? Optional.empty() : Optional.of("test")), + randomAssignment()); + tasks.addTask("test_incompatible_version", TestPersistentTasksExecutor.NAME, + new TestParams(null, randomVersionBetween(random(), compatibleFutureVersion(streamVersion), Version.CURRENT), + randomBoolean() ? Optional.empty() : Optional.of("test")), + randomAssignment()); + final BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(streamVersion); + Set features = new HashSet<>(); + if (randomBoolean()) { + features.add("test"); + } + if (randomBoolean()) { + features.add(TransportClient.TRANSPORT_CLIENT_FEATURE); + } + out.setFeatures(features); + tasks.build().writeTo(out); + + final StreamInput input = out.bytes().streamInput(); + input.setVersion(streamVersion); + PersistentTasksCustomMetaData read = + new PersistentTasksCustomMetaData(new NamedWriteableAwareStreamInput(input, getNamedWriteableRegistry())); + + assertThat(read.taskMap().keySet(), equalTo(Collections.singleton("test_compatible_version"))); + } + + public void testFeatureSerialization() throws IOException { + PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); + + Version minVersion = getFirstVersion(); + tasks.addTask("test_compatible", TestPersistentTasksExecutor.NAME, + new TestParams(null, randomVersionBetween(random(), minVersion, Version.CURRENT), + randomBoolean() ? Optional.empty() : Optional.of("existing")), + randomAssignment()); + tasks.addTask("test_incompatible", TestPersistentTasksExecutor.NAME, + new TestParams(null, randomVersionBetween(random(), minVersion, Version.CURRENT), Optional.of("non_existing")), + randomAssignment()); + final BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(Version.CURRENT); + Set features = new HashSet<>(); + features.add("existing"); + features.add(TransportClient.TRANSPORT_CLIENT_FEATURE); + out.setFeatures(features); + tasks.build().writeTo(out); + PersistentTasksCustomMetaData read = new PersistentTasksCustomMetaData( + new NamedWriteableAwareStreamInput(out.bytes().streamInput(), getNamedWriteableRegistry())); + assertThat(read.taskMap().keySet(), equalTo(Collections.singleton("test_compatible"))); } private Assignment randomAssignment() { diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java index b67b7678332b7..0a7168ad9b287 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java @@ -20,12 +20,12 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; -import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import java.util.ArrayList; import java.util.Collection; @@ -35,8 +35,6 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, minNumDataNodes = 1) public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase { @@ -65,7 +63,7 @@ public void testFullClusterRestart() throws Exception { PlainActionFuture> future = new PlainActionFuture<>(); futures.add(future); taskIds[i] = UUIDs.base64UUID(); - service.sendStartRequest(taskIds[i], TestPersistentTasksExecutor.NAME, randomBoolean() ? null : new TestParams("Blah"), future); + service.sendStartRequest(taskIds[i], TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); } for (int i = 0; i < numberOfTasks; i++) { diff --git a/server/src/test/java/org/elasticsearch/persistent/StartPersistentActionRequestTests.java b/server/src/test/java/org/elasticsearch/persistent/StartPersistentActionRequestTests.java index 3b0fc2a3d0495..e4c5a26de9c0c 100644 --- a/server/src/test/java/org/elasticsearch/persistent/StartPersistentActionRequestTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/StartPersistentActionRequestTests.java @@ -22,8 +22,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; import org.elasticsearch.persistent.StartPersistentTaskAction.Request; -import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.test.AbstractStreamableTestCase; import java.util.Collections; @@ -32,17 +32,12 @@ public class StartPersistentActionRequestTests extends AbstractStreamableTestCas @Override protected Request createTestInstance() { - TestParams testParams; + TestParams testParams = new TestParams(); + if (randomBoolean()) { + testParams.setTestParam(randomAlphaOfLengthBetween(1, 20)); + } if (randomBoolean()) { - testParams = new TestParams(); - if (randomBoolean()) { - testParams.setTestParam(randomAlphaOfLengthBetween(1, 20)); - } - if (randomBoolean()) { - testParams.setExecutorNodeAttr(randomAlphaOfLengthBetween(1, 20)); - } - } else { - testParams = null; + testParams.setExecutorNodeAttr(randomAlphaOfLengthBetween(1, 20)); } return new Request(UUIDs.base64UUID(), randomAlphaOfLengthBetween(1, 20), testParams); } diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index 63ef871c0e3f4..97b3407938768 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -19,6 +19,7 @@ package org.elasticsearch.persistent; +import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; @@ -49,6 +50,8 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; @@ -57,8 +60,6 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; import java.util.ArrayList; @@ -67,6 +68,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -120,6 +122,9 @@ public static class TestParams implements PersistentTaskParams { REQUEST_PARSER.declareString(constructorArg(), new ParseField("param")); } + private final Version minVersion; + private final Optional feature; + private String executorNodeAttr = null; private String responseNode = null; @@ -127,17 +132,25 @@ public static class TestParams implements PersistentTaskParams { private String testParam = null; public TestParams() { - + this((String)null); } public TestParams(String testParam) { + this(testParam, Version.CURRENT, Optional.empty()); + } + + public TestParams(String testParam, Version minVersion, Optional feature) { this.testParam = testParam; + this.minVersion = minVersion; + this.feature = feature; } public TestParams(StreamInput in) throws IOException { executorNodeAttr = in.readOptionalString(); responseNode = in.readOptionalString(); testParam = in.readOptionalString(); + minVersion = Version.readVersion(in); + feature = Optional.ofNullable(in.readOptionalString()); } @Override @@ -166,6 +179,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(executorNodeAttr); out.writeOptionalString(responseNode); out.writeOptionalString(testParam); + Version.writeVersion(minVersion, out); + out.writeOptionalString(feature.orElse(null)); } @Override @@ -194,6 +209,16 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(executorNodeAttr, responseNode, testParam); } + + @Override + public Version getMinimalSupportedVersion() { + return minVersion; + } + + @Override + public Optional getRequiredFeature() { + return feature; + } } public static class Status implements Task.Status { diff --git a/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java index cf1cc89b3a18a..aeb4d9b3a9bfb 100644 --- a/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java @@ -71,7 +71,7 @@ public void testEnableAssignmentAfterRestart() throws Exception { final CountDownLatch latch = new CountDownLatch(numberOfTasks); for (int i = 0; i < numberOfTasks; i++) { PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class); - service.sendStartRequest("task_" + i, TestPersistentTasksExecutor.NAME, randomTaskParams(), + service.sendStartRequest("task_" + i, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)), new ActionListener>() { @Override public void onResponse(PersistentTask task) { @@ -163,11 +163,4 @@ private void resetPersistentTasksAssignment() { assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings)); } - /** Returns a random task parameter **/ - private static PersistentTaskParams randomTaskParams() { - if (randomBoolean()) { - return null; - } - return new TestParams(randomAlphaOfLength(10)); - } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 1dc853db59467..5d2abdd149223 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -21,9 +21,9 @@ import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntSet; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; -import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; @@ -1162,6 +1162,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + public static SnapshottableMetadata readFrom(StreamInput in) throws IOException { return readFrom(SnapshottableMetadata::new, in); } @@ -1193,6 +1198,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + public static NonSnapshottableMetadata readFrom(StreamInput in) throws IOException { return readFrom(NonSnapshottableMetadata::new, in); } @@ -1223,6 +1233,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + public static SnapshottableGatewayMetadata readFrom(StreamInput in) throws IOException { return readFrom(SnapshottableGatewayMetadata::new, in); } @@ -1253,6 +1268,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + public static NonSnapshottableGatewayMetadata readFrom(StreamInput in) throws IOException { return readFrom(NonSnapshottableGatewayMetadata::new, in); } @@ -1284,6 +1304,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + public static SnapshotableGatewayNoApiMetadata readFrom(StreamInput in) throws IOException { return readFrom(SnapshotableGatewayNoApiMetadata::new, in); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/VersionUtils.java b/test/framework/src/main/java/org/elasticsearch/test/VersionUtils.java index 792f3fba123da..84c480b8d510b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/VersionUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/VersionUtils.java @@ -19,22 +19,19 @@ package org.elasticsearch.test; +import org.elasticsearch.Version; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.Tuple; + import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Random; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.elasticsearch.Version; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.collect.Tuple; - /** Utilities for selecting versions in tests */ public class VersionUtils { @@ -228,6 +225,13 @@ public static Version incompatibleFutureVersion(Version version) { return opt.get(); } + /** returns the first future compatible version */ + public static Version compatibleFutureVersion(Version version) { + final Optional opt = ALL_VERSIONS.stream().filter(version::before).filter(v -> v.isCompatible(version)).findAny(); + assert opt.isPresent() : "no future compatible version for " + version; + return opt.get(); + } + /** Returns the maximum {@link Version} that is compatible with the given version. */ public static Version maxCompatibleVersion(Version version) { final List compatible = ALL_VERSIONS.stream().filter(version::isCompatible).filter(version::onOrBefore) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensesMetaData.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensesMetaData.java index d9f7068b2181e..6d001dea516ac 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensesMetaData.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensesMetaData.java @@ -108,6 +108,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + @Override public EnumSet context() { return EnumSet.of(MetaData.XContentContext.GATEWAY); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index 602f4bdbc079b..db36aabf7ac6a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -40,6 +40,7 @@ import org.elasticsearch.license.LicensesMetaData; import org.elasticsearch.license.Licensing; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.plugins.ExtensiblePlugin; import org.elasticsearch.plugins.ScriptPlugin; import org.elasticsearch.rest.RestController; @@ -331,4 +332,12 @@ default Optional getRequiredFeature() { } + public interface XPackPersistentTaskParams extends PersistentTaskParams { + + @Override + default Optional getRequiredFeature() { + return XPackClientPlugin.X_PACK_FEATURE; + } + } + } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java index d6e351f763787..429f36d00c2ce 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java @@ -23,10 +23,10 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.tasks.Task; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.persistent.PersistentTaskParams; import java.io.IOException; import java.util.Objects; @@ -127,7 +127,7 @@ public String toString() { } } - public static class JobParams implements PersistentTaskParams { + public static class JobParams implements XPackPlugin.XPackPersistentTaskParams { /** TODO Remove in 7.0.0 */ public static final ParseField IGNORE_DOWNTIME = new ParseField("ignore_downtime"); @@ -237,6 +237,11 @@ public boolean equals(Object obj) { public String toString() { return Strings.toString(this); } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } } public static class Response extends AcknowledgedResponse { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java index fc8c17f462a8a..1a1d95820979e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.ml.action; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; @@ -24,10 +25,10 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.persistent.PersistentTaskParams; import java.io.IOException; import java.util.Objects; @@ -138,7 +139,7 @@ public boolean equals(Object obj) { } } - public static class DatafeedParams implements PersistentTaskParams { + public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskParams { public static ObjectParser PARSER = new ObjectParser<>(TASK_NAME, DatafeedParams::new); @@ -231,6 +232,11 @@ public String getWriteableName() { return TASK_NAME; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(datafeedId); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJob.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJob.java index e71186b60e020..7afcdb71b11cc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJob.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJob.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.rollup.job; +import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; import org.elasticsearch.common.ParseField; @@ -13,7 +14,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.xpack.core.XPackPlugin; import java.io.IOException; import java.util.Collections; @@ -25,7 +26,7 @@ * It holds the config (RollupJobConfig) and a map of authentication headers. Only RollupJobConfig * is ever serialized to the user, so the headers should never leak */ -public class RollupJob extends AbstractDiffable implements PersistentTaskParams { +public class RollupJob extends AbstractDiffable implements XPackPlugin.XPackPersistentTaskParams { public static final String NAME = "xpack/rollup/job"; @@ -110,4 +111,9 @@ public boolean equals(Object other) { public int hashCode() { return Objects.hash(config, headers); } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_6_3_0; + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/WatcherMetaData.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/WatcherMetaData.java index 9f014dee843c5..bddeb5f5e3281 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/WatcherMetaData.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/WatcherMetaData.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.watcher; +import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractNamedDiffable; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.MetaData; @@ -38,6 +39,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + @Override public EnumSet context() { return EnumSet.of(MetaData.XContentContext.GATEWAY);