diff --git a/modules/tribe/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java b/modules/tribe/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java index c0f6deff787ac..6f4c8412c8f6a 100644 --- a/modules/tribe/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java +++ b/modules/tribe/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.tribe; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.MergableCustomMetaData; import org.elasticsearch.cluster.NamedDiff; @@ -238,6 +239,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + public static MergableCustomMetaData1 readFrom(StreamInput in) throws IOException { return readFrom(MergableCustomMetaData1::new, in); } @@ -270,6 +276,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + public static MergableCustomMetaData2 readFrom(StreamInput in) throws IOException { return readFrom(MergableCustomMetaData2::new, in); } diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index 516872e652100..a8a5405334cb8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -22,7 +22,6 @@ import com.carrotsearch.hppc.cursors.IntObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; - import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -50,6 +49,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.VersionedNamedWriteable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -122,7 +122,7 @@ default Optional getRequiredFeature() { * @param the type of the custom * @return true if the custom should be serialized and false otherwise */ - static boolean shouldSerializeCustom(final StreamOutput out, final T custom) { + static boolean shouldSerialize(final StreamOutput out, final T custom) { if (out.getVersion().before(custom.getMinimalSupportedVersion())) { return false; } @@ -745,13 +745,13 @@ public void writeTo(StreamOutput out) throws IOException { // filter out custom states not supported by the other node int numberOfCustoms = 0; for (final ObjectCursor cursor : customs.values()) { - if (FeatureAware.shouldSerializeCustom(out, cursor.value)) { + if (FeatureAware.shouldSerialize(out, cursor.value)) { numberOfCustoms++; } } out.writeVInt(numberOfCustoms); for (final ObjectCursor cursor : customs.values()) { - if (FeatureAware.shouldSerializeCustom(out, cursor.value)) { + if (FeatureAware.shouldSerialize(out, cursor.value)) { out.writeNamedWriteable(cursor.value); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/NamedDiffable.java b/server/src/main/java/org/elasticsearch/cluster/NamedDiffable.java index b548b49fe1910..729523233d73d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NamedDiffable.java +++ b/server/src/main/java/org/elasticsearch/cluster/NamedDiffable.java @@ -19,17 +19,10 @@ package org.elasticsearch.cluster; -import org.elasticsearch.Version; -import org.elasticsearch.common.io.stream.NamedWriteable; +import org.elasticsearch.common.io.stream.VersionedNamedWriteable; /** - * Diff that also support NamedWriteable interface + * Diff that also support {@link VersionedNamedWriteable} interface */ -public interface NamedDiffable extends Diffable, NamedWriteable { - /** - * The minimal version of the recipient this custom object can be sent to - */ - default Version getMinimalSupportedVersion() { - return Version.CURRENT.minimumIndexCompatibilityVersion(); - } +public interface NamedDiffable extends Diffable, VersionedNamedWriteable { } diff --git a/server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java b/server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java index 5c036f94285e0..138788251c90a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java @@ -20,14 +20,15 @@ package org.elasticsearch.cluster; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState.Custom; -import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.snapshots.Snapshot; import java.io.IOException; import java.util.ArrayList; @@ -382,6 +383,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { return readDiffFrom(Custom.class, TYPE, in); } diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 4325a3c456b54..7308d471afb9d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -395,6 +395,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { return readDiffFrom(Custom.class, TYPE, in); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexGraveyard.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexGraveyard.java index 9167b28a67b86..74789aada3a46 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexGraveyard.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexGraveyard.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.metadata; +import org.elasticsearch.Version; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.common.ParseField; @@ -34,8 +35,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.Index; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; import java.io.IOException; import java.util.ArrayList; @@ -44,7 +43,6 @@ import java.util.EnumSet; import java.util.List; import java.util.Objects; -import java.util.concurrent.TimeUnit; /** * A collection of tombstones for explicitly marking indices as deleted in the cluster state. @@ -97,6 +95,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + @Override public EnumSet context() { return MetaData.API_AND_GATEWAY; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index bb5e8e6fa48b2..9afbbf95ae14d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -793,13 +793,13 @@ public void writeTo(StreamOutput out) throws IOException { // filter out custom states not supported by the other node int numberOfCustoms = 0; for (final ObjectCursor cursor : customs.values()) { - if (FeatureAware.shouldSerializeCustom(out, cursor.value)) { + if (FeatureAware.shouldSerialize(out, cursor.value)) { numberOfCustoms++; } } out.writeVInt(numberOfCustoms); for (final ObjectCursor cursor : customs.values()) { - if (FeatureAware.shouldSerializeCustom(out, cursor.value)) { + if (FeatureAware.shouldSerialize(out, cursor.value)) { out.writeNamedWriteable(cursor.value); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java index 6aa2d83fa8d47..f2db414e19332 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractNamedDiffable; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.MetaData.Custom; @@ -102,6 +103,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + public RepositoriesMetaData(StreamInput in) throws IOException { RepositoryMetaData[] repository = new RepositoryMetaData[in.readVInt()]; for (int i = 0; i < repository.length; i++) { diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/VersionedNamedWriteable.java b/server/src/main/java/org/elasticsearch/common/io/stream/VersionedNamedWriteable.java new file mode 100644 index 0000000000000..9eea2c00d56a6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/io/stream/VersionedNamedWriteable.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.io.stream; + +import org.elasticsearch.Version; + +/** + * A {@link NamedWriteable} that has a minimum version associated with it. + */ +public interface VersionedNamedWriteable extends NamedWriteable { + + /** + * Returns the name of the writeable object + */ + String getWriteableName(); + + /** + * The minimal version of the recipient this object can be sent to + */ + Version getMinimalSupportedVersion(); +} diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestMetadata.java b/server/src/main/java/org/elasticsearch/ingest/IngestMetadata.java index ca8a5df845014..1e262adf8cf8d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestMetadata.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestMetadata.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest; +import org.elasticsearch.Version; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.NamedDiff; @@ -69,6 +70,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + public Map getPipelines() { return pipelines; } diff --git a/server/src/main/java/org/elasticsearch/persistent/NodePersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/NodePersistentTasksExecutor.java index efed0aef9b807..bf42733ff54ac 100644 --- a/server/src/main/java/org/elasticsearch/persistent/NodePersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/NodePersistentTasksExecutor.java @@ -35,7 +35,7 @@ public NodePersistentTasksExecutor(ThreadPool threadPool) { this.threadPool = threadPool; } - public void executeTask(@Nullable Params params, + public void executeTask(Params params, @Nullable Task.Status status, AllocatedPersistentTask task, PersistentTasksExecutor executor) { diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTaskParams.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskParams.java index a475a7cde174a..c91727a913f3a 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTaskParams.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskParams.java @@ -19,12 +19,13 @@ package org.elasticsearch.persistent; -import org.elasticsearch.common.io.stream.NamedWriteable; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.io.stream.VersionedNamedWriteable; import org.elasticsearch.common.xcontent.ToXContentObject; /** * Parameters used to start persistent task */ -public interface PersistentTaskParams extends NamedWriteable, ToXContentObject { +public interface PersistentTaskParams extends VersionedNamedWriteable, ToXContentObject, ClusterState.FeatureAware { } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index cf44556ee5ddc..1464279a814d5 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; @@ -65,7 +64,7 @@ public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorR * @param taskParams the task's parameters * @param listener the listener that will be called when task is started */ - public void createPersistentTask(String taskId, String taskName, @Nullable Params taskParams, + public void createPersistentTask(String taskId, String taskName, Params taskParams, ActionListener> listener) { clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() { @Override @@ -225,7 +224,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS * @return a new {@link Assignment} */ private Assignment createAssignment(final String taskName, - final @Nullable Params taskParams, + final Params taskParams, final ClusterState currentState) { PersistentTasksExecutor persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName); diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java index bdee87cc77c51..4b1ba3e11e372 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java @@ -49,8 +49,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -264,7 +264,6 @@ public static class PersistentTask

implements Wr private final String id; private final long allocationId; private final String taskName; - @Nullable private final P params; @Nullable private final Status status; @@ -314,7 +313,11 @@ public PersistentTask(StreamInput in) throws IOException { id = in.readString(); allocationId = in.readLong(); taskName = in.readString(); - params = (P) in.readOptionalNamedWriteable(PersistentTaskParams.class); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + params = (P) in.readNamedWriteable(PersistentTaskParams.class); + } else { + params = (P) in.readOptionalNamedWriteable(PersistentTaskParams.class); + } status = in.readOptionalNamedWriteable(Task.Status.class); assignment = new Assignment(in.readOptionalString(), in.readString()); allocationIdOnLastStatusUpdate = in.readOptionalLong(); @@ -325,7 +328,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(id); out.writeLong(allocationId); out.writeString(taskName); - out.writeOptionalNamedWriteable(params); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeNamedWriteable(params); + } else { + out.writeOptionalNamedWriteable(params); + } out.writeOptionalNamedWriteable(status); out.writeOptionalString(assignment.executorNode); out.writeString(assignment.explanation); @@ -500,7 +507,10 @@ public PersistentTasksCustomMetaData(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeLong(lastAllocationId); - out.writeMap(tasks, StreamOutput::writeString, (stream, value) -> value.writeTo(stream)); + Map> filteredTasks = tasks.values().stream() + .filter(t -> ClusterState.FeatureAware.shouldSerialize(out, t.getParams())) + .collect(Collectors.toMap(PersistentTask::getId, Function.identity())); + out.writeMap(filteredTasks, StreamOutput::writeString, (stream, value) -> value.writeTo(stream)); } public static NamedDiff readDiffFrom(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java index 0a1e2095934ef..de75b1ff54085 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java @@ -24,10 +24,10 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import java.util.Map; import java.util.function.Predicate; @@ -118,7 +118,7 @@ protected String getDescription(PersistentTask taskInProgress) { * NOTE: The nodeOperation has to throw an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to * indicate that the persistent task has finished. */ - protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params, @Nullable Task.Status status); + protected abstract void nodeOperation(AllocatedPersistentTask task, Params params, @Nullable Task.Status status); public String getExecutor() { return executor; diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java index 2b656dd219cf9..9889a47ee2a5f 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java @@ -37,10 +37,10 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.util.function.BiConsumer; import java.util.function.Predicate; @@ -66,7 +66,7 @@ public PersistentTasksService(Settings settings, ClusterService clusterService, * Creates the specified persistent task and attempts to assign it to a node. */ @SuppressWarnings("unchecked") - public void startPersistentTask(String taskId, String taskName, @Nullable Params params, + public void startPersistentTask(String taskId, String taskName, Params params, ActionListener> listener) { StartPersistentTaskAction.Request createPersistentActionRequest = new StartPersistentTaskAction.Request(taskId, taskName, params); diff --git a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java index 3b988939879c5..d2ebf7abbe737 100644 --- a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.persistent; +import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; @@ -36,9 +37,9 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; import java.util.Objects; @@ -73,7 +74,6 @@ public static class Request extends MasterNodeRequest { private String taskId; - @Nullable private String taskName; private PersistentTaskParams params; @@ -93,7 +93,11 @@ public void readFrom(StreamInput in) throws IOException { super.readFrom(in); taskId = in.readString(); taskName = in.readString(); - params = in.readOptionalNamedWriteable(PersistentTaskParams.class); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + params = in.readNamedWriteable(PersistentTaskParams.class); + } else { + params = in.readOptionalNamedWriteable(PersistentTaskParams.class); + } } @Override @@ -101,7 +105,11 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(taskId); out.writeString(taskName); - out.writeOptionalNamedWriteable(params); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeNamedWriteable(params); + } else { + out.writeOptionalNamedWriteable(params); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/script/ScriptMetaData.java b/server/src/main/java/org/elasticsearch/script/ScriptMetaData.java index dca17ce486607..2c3375e2f6479 100644 --- a/server/src/main/java/org/elasticsearch/script/ScriptMetaData.java +++ b/server/src/main/java/org/elasticsearch/script/ScriptMetaData.java @@ -368,6 +368,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + @Override public EnumSet context() { return MetaData.ALL_CONTEXTS; diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java index b7ea45dd13a3d..f79fef74e917f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java @@ -308,6 +308,11 @@ public String getWriteableName() { return "2"; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + @Override public EnumSet context() { return EnumSet.of(MetaData.XContentContext.GATEWAY); @@ -324,6 +329,11 @@ public String getWriteableName() { return "1"; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + @Override public EnumSet context() { return EnumSet.of(MetaData.XContentContext.GATEWAY); diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java index 07a974a2ca771..fc917d60deede 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexGraveyard; @@ -32,7 +33,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -42,7 +42,6 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.watcher.ResourceWatcherService; import java.io.IOException; @@ -73,7 +72,8 @@ @ESIntegTestCase.ClusterScope(scope = TEST) public class ClusterStateIT extends ESIntegTestCase { - public abstract static class Custom implements MetaData.Custom { + public abstract static + class Custom implements MetaData.Custom { private static final ParseField VALUE = new ParseField("value"); @@ -131,6 +131,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + @Override public Optional getRequiredFeature() { return Optional.of("node"); @@ -155,6 +160,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + /* * This custom should always be returned yet we randomize whether it has a required feature that the client is expected to have * versus not requiring any feature. We use a field to make the random choice exactly once. diff --git a/server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java b/server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java index 0f826e65248fe..b25d8ced1806d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java @@ -116,7 +116,7 @@ public void testVersion() { if (custom.getRequiredFeature().isPresent()) { out.setFeatures(Collections.singleton(custom.getRequiredFeature().get())); } - assertTrue(FeatureAware.shouldSerializeCustom(out, custom)); + assertTrue(FeatureAware.shouldSerialize(out, custom)); } { final BytesStreamOutput out = new BytesStreamOutput(); @@ -126,7 +126,7 @@ public void testVersion() { if (custom.getRequiredFeature().isPresent() && randomBoolean()) { out.setFeatures(Collections.singleton(custom.getRequiredFeature().get())); } - assertFalse(FeatureAware.shouldSerializeCustom(out, custom)); + assertFalse(FeatureAware.shouldSerialize(out, custom)); } } } @@ -141,7 +141,7 @@ public void testFeature() { out.setVersion(afterVersion); assertTrue(custom.getRequiredFeature().isPresent()); out.setFeatures(Collections.singleton(custom.getRequiredFeature().get())); - assertTrue(FeatureAware.shouldSerializeCustom(out, custom)); + assertTrue(FeatureAware.shouldSerialize(out, custom)); } { // the feature is present and the client is a transport client @@ -149,7 +149,7 @@ public void testFeature() { out.setVersion(afterVersion); assertTrue(custom.getRequiredFeature().isPresent()); out.setFeatures(new HashSet<>(Arrays.asList(custom.getRequiredFeature().get(), TransportClient.TRANSPORT_CLIENT_FEATURE))); - assertTrue(FeatureAware.shouldSerializeCustom(out, custom)); + assertTrue(FeatureAware.shouldSerialize(out, custom)); } } @@ -161,14 +161,14 @@ public void testMissingFeature() { // the feature is missing but we should serialize it anyway because the client is not a transport client final BytesStreamOutput out = new BytesStreamOutput(); out.setVersion(afterVersion); - assertTrue(FeatureAware.shouldSerializeCustom(out, custom)); + assertTrue(FeatureAware.shouldSerialize(out, custom)); } { // the feature is missing and we should not serialize it because the client is a transport client final BytesStreamOutput out = new BytesStreamOutput(); out.setVersion(afterVersion); out.setFeatures(Collections.singleton(TransportClient.TRANSPORT_CLIENT_FEATURE)); - assertFalse(FeatureAware.shouldSerializeCustom(out, custom)); + assertFalse(FeatureAware.shouldSerialize(out, custom)); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/SimpleClusterStateIT.java b/server/src/test/java/org/elasticsearch/cluster/SimpleClusterStateIT.java index 8b246ecc2d3de..0ba3de4381891 100644 --- a/server/src/test/java/org/elasticsearch/cluster/SimpleClusterStateIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/SimpleClusterStateIT.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.support.IndicesOptions; @@ -37,7 +38,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.IndexNotFoundException; @@ -304,6 +304,11 @@ public String getWriteableName() { return "test"; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeInt(value); diff --git a/server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java b/server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java index 6a2754a0d846c..52c217332f76f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java @@ -42,13 +42,12 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.test.VersionUtils; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import static org.elasticsearch.test.VersionUtils.randomVersionBetween; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -131,8 +130,9 @@ public void testSnapshotDeletionsInProgressSerialization() throws Exception { Diff diffs = clusterState.diff(ClusterState.EMPTY_STATE); - // serialize with current version BytesStreamOutput outStream = new BytesStreamOutput(); + Version version = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT); + outStream.setVersion(version); diffs.writeTo(outStream); StreamInput inStream = outStream.bytes().streamInput(); inStream = new NamedWriteableAwareStreamInput(inStream, new NamedWriteableRegistry(ClusterModule.getNamedWriteables())); @@ -141,28 +141,6 @@ public void testSnapshotDeletionsInProgressSerialization() throws Exception { assertThat(stateAfterDiffs.custom(RestoreInProgress.TYPE), includeRestore ? notNullValue() : nullValue()); assertThat(stateAfterDiffs.custom(SnapshotDeletionsInProgress.TYPE), notNullValue()); - // serialize with old version - outStream = new BytesStreamOutput(); - outStream.setVersion(Version.CURRENT.minimumIndexCompatibilityVersion()); - diffs.writeTo(outStream); - inStream = outStream.bytes().streamInput(); - inStream.setVersion(outStream.getVersion()); - inStream = new NamedWriteableAwareStreamInput(inStream, new NamedWriteableRegistry(ClusterModule.getNamedWriteables())); - serializedDiffs = ClusterState.readDiffFrom(inStream, clusterState.nodes().getLocalNode()); - stateAfterDiffs = serializedDiffs.apply(ClusterState.EMPTY_STATE); - assertThat(stateAfterDiffs.custom(RestoreInProgress.TYPE), includeRestore ? notNullValue() : nullValue()); - assertThat(stateAfterDiffs.custom(SnapshotDeletionsInProgress.TYPE), nullValue()); - - // remove the custom and try serializing again with old version - clusterState = ClusterState.builder(clusterState).removeCustom(SnapshotDeletionsInProgress.TYPE).incrementVersion().build(); - outStream = new BytesStreamOutput(); - diffs.writeTo(outStream); - inStream = outStream.bytes().streamInput(); - inStream = new NamedWriteableAwareStreamInput(inStream, new NamedWriteableRegistry(ClusterModule.getNamedWriteables())); - serializedDiffs = ClusterState.readDiffFrom(inStream, clusterState.nodes().getLocalNode()); - stateAfterDiffs = serializedDiffs.apply(stateAfterDiffs); - assertThat(stateAfterDiffs.custom(RestoreInProgress.TYPE), includeRestore ? notNullValue() : nullValue()); - assertThat(stateAfterDiffs.custom(SnapshotDeletionsInProgress.TYPE), nullValue()); } private ClusterState updateUsingSerialisedDiff(ClusterState original, Diff diff) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java index e7cbd04ce4b50..2cebd41a52c43 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.cluster.service; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.Diff; import org.elasticsearch.common.io.stream.StreamOutput; @@ -43,6 +44,11 @@ public String getWriteableName() { return null; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + @Override public void writeTo(StreamOutput out) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java index e51177c318ca8..b7177fdf867af 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java @@ -239,6 +239,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + @Override public EnumSet context() { return EnumSet.of(MetaData.XContentContext.GATEWAY, MetaData.XContentContext.SNAPSHOT); diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java index cef3502a077c5..14f3c212c464c 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java @@ -492,6 +492,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + @Override public EnumSet context() { return EnumSet.of(MetaData.XContentContext.GATEWAY); @@ -510,6 +515,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + @Override public EnumSet context() { return EnumSet.of(MetaData.XContentContext.GATEWAY); diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java index 67962b800d2cf..72e74359d3016 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java @@ -19,6 +19,8 @@ package org.elasticsearch.persistent; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; +import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.MetaData; @@ -26,8 +28,11 @@ import org.elasticsearch.common.ParseField; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContent; @@ -43,13 +48,24 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.test.AbstractDiffableSerializationTestCase; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.Set; import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_GATEWAY; import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_SNAPSHOT; import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND; +import static org.elasticsearch.test.VersionUtils.allReleasedVersions; +import static org.elasticsearch.test.VersionUtils.compatibleFutureVersion; +import static org.elasticsearch.test.VersionUtils.getFirstVersion; +import static org.elasticsearch.test.VersionUtils.getPreviousVersion; +import static org.elasticsearch.test.VersionUtils.randomVersionBetween; +import static org.hamcrest.Matchers.equalTo; public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializationTestCase { @@ -228,7 +244,65 @@ public void testBuilder() { assertEquals(changed, builder.isChanged()); persistentTasks = builder.build(); } + } + + public void testMinVersionSerialization() throws IOException { + PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); + + Version minVersion = allReleasedVersions().stream().filter(Version::isRelease).findFirst().orElseThrow(NoSuchElementException::new); + final Version streamVersion = randomVersionBetween(random(), minVersion, getPreviousVersion(Version.CURRENT)); + tasks.addTask("test_compatible_version", TestPersistentTasksExecutor.NAME, + new TestParams(null, randomVersionBetween(random(), minVersion, streamVersion), + randomBoolean() ? Optional.empty() : Optional.of("test")), + randomAssignment()); + tasks.addTask("test_incompatible_version", TestPersistentTasksExecutor.NAME, + new TestParams(null, randomVersionBetween(random(), compatibleFutureVersion(streamVersion), Version.CURRENT), + randomBoolean() ? Optional.empty() : Optional.of("test")), + randomAssignment()); + final BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(streamVersion); + Set features = new HashSet<>(); + final boolean transportClient = randomBoolean(); + if (transportClient) { + features.add(TransportClient.TRANSPORT_CLIENT_FEATURE); + } + // if a transport client, then it must have the feature otherwise we add the feature randomly + if (transportClient || randomBoolean()) { + features.add("test"); + } + out.setFeatures(features); + tasks.build().writeTo(out); + + final StreamInput input = out.bytes().streamInput(); + input.setVersion(streamVersion); + PersistentTasksCustomMetaData read = + new PersistentTasksCustomMetaData(new NamedWriteableAwareStreamInput(input, getNamedWriteableRegistry())); + + assertThat(read.taskMap().keySet(), equalTo(Collections.singleton("test_compatible_version"))); + } + + public void testFeatureSerialization() throws IOException { + PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); + + Version minVersion = getFirstVersion(); + tasks.addTask("test_compatible", TestPersistentTasksExecutor.NAME, + new TestParams(null, randomVersionBetween(random(), minVersion, Version.CURRENT), + randomBoolean() ? Optional.empty() : Optional.of("existing")), + randomAssignment()); + tasks.addTask("test_incompatible", TestPersistentTasksExecutor.NAME, + new TestParams(null, randomVersionBetween(random(), minVersion, Version.CURRENT), Optional.of("non_existing")), + randomAssignment()); + final BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(Version.CURRENT); + Set features = new HashSet<>(); + features.add("existing"); + features.add(TransportClient.TRANSPORT_CLIENT_FEATURE); + out.setFeatures(features); + tasks.build().writeTo(out); + PersistentTasksCustomMetaData read = new PersistentTasksCustomMetaData( + new NamedWriteableAwareStreamInput(out.bytes().streamInput(), getNamedWriteableRegistry())); + assertThat(read.taskMap().keySet(), equalTo(Collections.singleton("test_compatible"))); } private Assignment randomAssignment() { diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java index 0dddaaa783906..8e1019fa44a08 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java @@ -20,12 +20,12 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; -import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import java.util.ArrayList; import java.util.Collection; @@ -35,8 +35,6 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, minNumDataNodes = 1) public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase { @@ -65,8 +63,7 @@ public void testFullClusterRestart() throws Exception { PlainActionFuture> future = new PlainActionFuture<>(); futures.add(future); taskIds[i] = UUIDs.base64UUID(); - service.startPersistentTask(taskIds[i], TestPersistentTasksExecutor.NAME, randomBoolean() ? null : new TestParams("Blah"), - future); + service.startPersistentTask(taskIds[i], TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); } for (int i = 0; i < numberOfTasks; i++) { diff --git a/server/src/test/java/org/elasticsearch/persistent/StartPersistentActionRequestTests.java b/server/src/test/java/org/elasticsearch/persistent/StartPersistentActionRequestTests.java index 3b0fc2a3d0495..e4c5a26de9c0c 100644 --- a/server/src/test/java/org/elasticsearch/persistent/StartPersistentActionRequestTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/StartPersistentActionRequestTests.java @@ -22,8 +22,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; import org.elasticsearch.persistent.StartPersistentTaskAction.Request; -import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.test.AbstractStreamableTestCase; import java.util.Collections; @@ -32,17 +32,12 @@ public class StartPersistentActionRequestTests extends AbstractStreamableTestCas @Override protected Request createTestInstance() { - TestParams testParams; + TestParams testParams = new TestParams(); + if (randomBoolean()) { + testParams.setTestParam(randomAlphaOfLengthBetween(1, 20)); + } if (randomBoolean()) { - testParams = new TestParams(); - if (randomBoolean()) { - testParams.setTestParam(randomAlphaOfLengthBetween(1, 20)); - } - if (randomBoolean()) { - testParams.setExecutorNodeAttr(randomAlphaOfLengthBetween(1, 20)); - } - } else { - testParams = null; + testParams.setExecutorNodeAttr(randomAlphaOfLengthBetween(1, 20)); } return new Request(UUIDs.base64UUID(), randomAlphaOfLengthBetween(1, 20), testParams); } diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index 556d6d1983e63..9799036e0ea91 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -19,6 +19,7 @@ package org.elasticsearch.persistent; +import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; @@ -49,6 +50,8 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; @@ -57,8 +60,6 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; import java.util.ArrayList; @@ -67,6 +68,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -120,6 +122,9 @@ public static class TestParams implements PersistentTaskParams { REQUEST_PARSER.declareString(constructorArg(), new ParseField("param")); } + private final Version minVersion; + private final Optional feature; + private String executorNodeAttr = null; private String responseNode = null; @@ -127,17 +132,25 @@ public static class TestParams implements PersistentTaskParams { private String testParam = null; public TestParams() { - + this((String)null); } public TestParams(String testParam) { + this(testParam, Version.CURRENT, Optional.empty()); + } + + public TestParams(String testParam, Version minVersion, Optional feature) { this.testParam = testParam; + this.minVersion = minVersion; + this.feature = feature; } public TestParams(StreamInput in) throws IOException { executorNodeAttr = in.readOptionalString(); responseNode = in.readOptionalString(); testParam = in.readOptionalString(); + minVersion = Version.readVersion(in); + feature = Optional.ofNullable(in.readOptionalString()); } @Override @@ -166,6 +179,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(executorNodeAttr); out.writeOptionalString(responseNode); out.writeOptionalString(testParam); + Version.writeVersion(minVersion, out); + out.writeOptionalString(feature.orElse(null)); } @Override @@ -194,6 +209,16 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(executorNodeAttr, responseNode, testParam); } + + @Override + public Version getMinimalSupportedVersion() { + return minVersion; + } + + @Override + public Optional getRequiredFeature() { + return feature; + } } public static class Status implements Task.Status { diff --git a/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java index 15d12fb1ce932..9c8f7d0ac392f 100644 --- a/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java @@ -71,7 +71,7 @@ public void testEnableAssignmentAfterRestart() throws Exception { final CountDownLatch latch = new CountDownLatch(numberOfTasks); for (int i = 0; i < numberOfTasks; i++) { PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class); - service.startPersistentTask("task_" + i, TestPersistentTasksExecutor.NAME, randomTaskParams(), + service.startPersistentTask("task_" + i, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)), new ActionListener>() { @Override public void onResponse(PersistentTask task) { @@ -163,11 +163,4 @@ private void resetPersistentTasksAssignment() { assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings)); } - /** Returns a random task parameter **/ - private static PersistentTaskParams randomTaskParams() { - if (randomBoolean()) { - return null; - } - return new TestParams(randomAlphaOfLength(10)); - } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 5341b268544e7..4c42fe66bcbd3 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -21,6 +21,7 @@ import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntSet; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; @@ -993,6 +994,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + public static SnapshottableMetadata readFrom(StreamInput in) throws IOException { return readFrom(SnapshottableMetadata::new, in); } @@ -1024,6 +1030,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + public static NonSnapshottableMetadata readFrom(StreamInput in) throws IOException { return readFrom(NonSnapshottableMetadata::new, in); } @@ -1054,6 +1065,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + public static SnapshottableGatewayMetadata readFrom(StreamInput in) throws IOException { return readFrom(SnapshottableGatewayMetadata::new, in); } @@ -1084,6 +1100,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + public static NonSnapshottableGatewayMetadata readFrom(StreamInput in) throws IOException { return readFrom(NonSnapshottableGatewayMetadata::new, in); } @@ -1115,6 +1136,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + public static SnapshotableGatewayNoApiMetadata readFrom(StreamInput in) throws IOException { return readFrom(SnapshotableGatewayNoApiMetadata::new, in); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/VersionUtils.java b/test/framework/src/main/java/org/elasticsearch/test/VersionUtils.java index 792f3fba123da..84c480b8d510b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/VersionUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/VersionUtils.java @@ -19,22 +19,19 @@ package org.elasticsearch.test; +import org.elasticsearch.Version; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.Tuple; + import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Random; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.elasticsearch.Version; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.collect.Tuple; - /** Utilities for selecting versions in tests */ public class VersionUtils { @@ -228,6 +225,13 @@ public static Version incompatibleFutureVersion(Version version) { return opt.get(); } + /** returns the first future compatible version */ + public static Version compatibleFutureVersion(Version version) { + final Optional opt = ALL_VERSIONS.stream().filter(version::before).filter(v -> v.isCompatible(version)).findAny(); + assert opt.isPresent() : "no future compatible version for " + version; + return opt.get(); + } + /** Returns the maximum {@link Version} that is compatible with the given version. */ public static Version maxCompatibleVersion(Version version) { final List compatible = ALL_VERSIONS.stream().filter(version::isCompatible).filter(version::onOrBefore) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensesMetaData.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensesMetaData.java index d9f7068b2181e..6d001dea516ac 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensesMetaData.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensesMetaData.java @@ -108,6 +108,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + @Override public EnumSet context() { return EnumSet.of(MetaData.XContentContext.GATEWAY); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index 0a1bab8135157..2f9ac5ba40a53 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -41,6 +41,7 @@ import org.elasticsearch.license.LicensesMetaData; import org.elasticsearch.license.Licensing; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.plugins.ExtensiblePlugin; import org.elasticsearch.plugins.ScriptPlugin; import org.elasticsearch.rest.RestController; @@ -61,7 +62,6 @@ import org.elasticsearch.xpack.core.watcher.WatcherMetaData; import javax.security.auth.DestroyFailedException; - import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -336,4 +336,12 @@ default Optional getRequiredFeature() { } + public interface XPackPersistentTaskParams extends PersistentTaskParams { + + @Override + default Optional getRequiredFeature() { + return XPackClientPlugin.X_PACK_FEATURE; + } + } + } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java index 9527c39a607c2..eb102cdc3a68a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java @@ -22,10 +22,10 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.tasks.Task; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.persistent.PersistentTaskParams; import java.io.IOException; import java.util.Objects; @@ -131,7 +131,7 @@ public String toString() { } } - public static class JobParams implements PersistentTaskParams { + public static class JobParams implements XPackPlugin.XPackPersistentTaskParams { /** TODO Remove in 7.0.0 */ public static final ParseField IGNORE_DOWNTIME = new ParseField("ignore_downtime"); @@ -241,6 +241,11 @@ public boolean equals(Object obj) { public String toString() { return Strings.toString(this); } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } } public static class Response extends AcknowledgedResponse { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java index cd37354f42e4d..df23fb00c89f3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.ml.action; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; @@ -24,10 +25,10 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.persistent.PersistentTaskParams; import java.io.IOException; import java.util.Objects; @@ -144,7 +145,7 @@ public boolean equals(Object obj) { } } - public static class DatafeedParams implements PersistentTaskParams { + public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskParams { public static ObjectParser PARSER = new ObjectParser<>(TASK_NAME, DatafeedParams::new); @@ -237,6 +238,11 @@ public String getWriteableName() { return TASK_NAME; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(datafeedId); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJob.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJob.java index e71186b60e020..7afcdb71b11cc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJob.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJob.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.rollup.job; +import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; import org.elasticsearch.common.ParseField; @@ -13,7 +14,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.xpack.core.XPackPlugin; import java.io.IOException; import java.util.Collections; @@ -25,7 +26,7 @@ * It holds the config (RollupJobConfig) and a map of authentication headers. Only RollupJobConfig * is ever serialized to the user, so the headers should never leak */ -public class RollupJob extends AbstractDiffable implements PersistentTaskParams { +public class RollupJob extends AbstractDiffable implements XPackPlugin.XPackPersistentTaskParams { public static final String NAME = "xpack/rollup/job"; @@ -110,4 +111,9 @@ public boolean equals(Object other) { public int hashCode() { return Objects.hash(config, headers); } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_6_3_0; + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/WatcherMetaData.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/WatcherMetaData.java index 9f014dee843c5..bddeb5f5e3281 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/WatcherMetaData.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/WatcherMetaData.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.watcher; +import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractNamedDiffable; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.MetaData; @@ -38,6 +39,11 @@ public String getWriteableName() { return TYPE; } + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + @Override public EnumSet context() { return EnumSet.of(MetaData.XContentContext.GATEWAY);