Skip to content

Commit

Permalink
Make Persistent Tasks implementations version and feature aware (#31045)
Browse files Browse the repository at this point in the history
With #31020 we introduced the ability for transport clients to indicate what features they support
in order to make sure we don't serialize object to them they don't support. This PR adapts the
serialization logic of persistent tasks to be aware of those features and not serialize tasks that
aren't supported. 

Also, a version check is added for the future where we may add new tasks implementations and
need to be able to indicate they shouldn't be serialized both to nodes and clients.

As the implementation relies on the interface of `PersistentTaskParams`, these are no longer
optional. That's acceptable as all current implementation have them and we plan to make
`PersistentTaskParams` more central in the future.

Relates to #30731
  • Loading branch information
bleskes authored Jun 3, 2018
1 parent 65d3f0e commit a7ceefe
Show file tree
Hide file tree
Showing 38 changed files with 365 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand Down Expand Up @@ -50,6 +49,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -122,7 +122,7 @@ default Optional<String> getRequiredFeature() {
* @param <T> the type of the custom
* @return true if the custom should be serialized and false otherwise
*/
static <T extends NamedDiffable & FeatureAware> boolean shouldSerializeCustom(final StreamOutput out, final T custom) {
static <T extends VersionedNamedWriteable & FeatureAware> boolean shouldSerialize(final StreamOutput out, final T custom) {
if (out.getVersion().before(custom.getMinimalSupportedVersion())) {
return false;
}
Expand Down Expand Up @@ -748,13 +748,13 @@ public void writeTo(StreamOutput out) throws IOException {
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
if (FeatureAware.shouldSerialize(out, cursor.value)) {
numberOfCustoms++;
}
}
out.writeVInt(numberOfCustoms);
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
if (FeatureAware.shouldSerialize(out, cursor.value)) {
out.writeNamedWriteable(cursor.value);
}
}
Expand Down
13 changes: 3 additions & 10 deletions server/src/main/java/org/elasticsearch/cluster/NamedDiffable.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,10 @@

package org.elasticsearch.cluster;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;

/**
* Diff that also support NamedWriteable interface
* Diff that also support {@link VersionedNamedWriteable} interface
*/
public interface NamedDiffable<T> extends Diffable<T>, NamedWriteable {
/**
* The minimal version of the recipient this custom object can be sent to
*/
default Version getMinimalSupportedVersion() {
return Version.CURRENT.minimumIndexCompatibilityVersion();
}
public interface NamedDiffable<T> extends Diffable<T>, VersionedNamedWriteable {
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
package org.elasticsearch.cluster;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.Snapshot;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -382,6 +383,11 @@ public String getWriteableName() {
return TYPE;
}

@Override
public Version getMinimalSupportedVersion() {
return Version.CURRENT.minimumCompatibilityVersion();
}

public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(Custom.class, TYPE, in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,11 @@ public String getWriteableName() {
return TYPE;
}

@Override
public Version getMinimalSupportedVersion() {
return Version.CURRENT.minimumCompatibilityVersion();
}

public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(Custom.class, TYPE, in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.common.ParseField;
Expand All @@ -34,8 +35,6 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -44,7 +43,6 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* A collection of tombstones for explicitly marking indices as deleted in the cluster state.
Expand Down Expand Up @@ -97,6 +95,11 @@ public String getWriteableName() {
return TYPE;
}

@Override
public Version getMinimalSupportedVersion() {
return Version.CURRENT.minimumCompatibilityVersion();
}

@Override
public EnumSet<MetaData.XContentContext> context() {
return MetaData.API_AND_GATEWAY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -786,13 +786,13 @@ public void writeTo(StreamOutput out) throws IOException {
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
if (FeatureAware.shouldSerialize(out, cursor.value)) {
numberOfCustoms++;
}
}
out.writeVInt(numberOfCustoms);
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
if (FeatureAware.shouldSerialize(out, cursor.value)) {
out.writeNamedWriteable(cursor.value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.MetaData.Custom;
Expand Down Expand Up @@ -103,6 +104,11 @@ public String getWriteableName() {
return TYPE;
}

@Override
public Version getMinimalSupportedVersion() {
return Version.CURRENT.minimumCompatibilityVersion();
}

public RepositoriesMetaData(StreamInput in) throws IOException {
RepositoryMetaData[] repository = new RepositoryMetaData[in.readVInt()];
for (int i = 0; i < repository.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.io.stream;

import org.elasticsearch.Version;

/**
* A {@link NamedWriteable} that has a minimum version associated with it.
*/
public interface VersionedNamedWriteable extends NamedWriteable {

/**
* Returns the name of the writeable object
*/
String getWriteableName();

/**
* The minimal version of the recipient this object can be sent to
*/
Version getMinimalSupportedVersion();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.ingest;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.NamedDiff;
Expand Down Expand Up @@ -69,6 +70,11 @@ public String getWriteableName() {
return TYPE;
}

@Override
public Version getMinimalSupportedVersion() {
return Version.CURRENT.minimumCompatibilityVersion();
}

public Map<String, PipelineConfiguration> getPipelines() {
return pipelines;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public NodePersistentTasksExecutor(ThreadPool threadPool) {
this.threadPool = threadPool;
}

public <Params extends PersistentTaskParams> void executeTask(@Nullable Params params,
public <Params extends PersistentTaskParams> void executeTask(Params params,
@Nullable Task.Status status,
AllocatedPersistentTask task,
PersistentTasksExecutor<Params> executor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

package org.elasticsearch.persistent;

import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.common.xcontent.ToXContentObject;

/**
* Parameters used to start persistent task
*/
public interface PersistentTaskParams extends NamedWriteable, ToXContentObject {
public interface PersistentTaskParams extends VersionedNamedWriteable, ToXContentObject, ClusterState.FeatureAware {

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
Expand Down Expand Up @@ -65,7 +64,7 @@ public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorR
* @param taskParams the task's parameters
* @param listener the listener that will be called when task is started
*/
public <Params extends PersistentTaskParams> void createPersistentTask(String taskId, String taskName, @Nullable Params taskParams,
public <Params extends PersistentTaskParams> void createPersistentTask(String taskId, String taskName, Params taskParams,
ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
@Override
Expand Down Expand Up @@ -225,7 +224,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
* @return a new {@link Assignment}
*/
private <Params extends PersistentTaskParams> Assignment createAssignment(final String taskName,
final @Nullable Params taskParams,
final Params taskParams,
final ClusterState currentState) {
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -264,7 +264,6 @@ public static class PersistentTask<P extends PersistentTaskParams> implements Wr
private final String id;
private final long allocationId;
private final String taskName;
@Nullable
private final P params;
@Nullable
private final Status status;
Expand Down Expand Up @@ -314,7 +313,11 @@ public PersistentTask(StreamInput in) throws IOException {
id = in.readString();
allocationId = in.readLong();
taskName = in.readString();
params = (P) in.readOptionalNamedWriteable(PersistentTaskParams.class);
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
params = (P) in.readNamedWriteable(PersistentTaskParams.class);
} else {
params = (P) in.readOptionalNamedWriteable(PersistentTaskParams.class);
}
status = in.readOptionalNamedWriteable(Task.Status.class);
assignment = new Assignment(in.readOptionalString(), in.readString());
allocationIdOnLastStatusUpdate = in.readOptionalLong();
Expand All @@ -325,7 +328,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
out.writeLong(allocationId);
out.writeString(taskName);
out.writeOptionalNamedWriteable(params);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeNamedWriteable(params);
} else {
out.writeOptionalNamedWriteable(params);
}
out.writeOptionalNamedWriteable(status);
out.writeOptionalString(assignment.executorNode);
out.writeString(assignment.explanation);
Expand Down Expand Up @@ -500,7 +507,10 @@ public PersistentTasksCustomMetaData(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(lastAllocationId);
out.writeMap(tasks, StreamOutput::writeString, (stream, value) -> value.writeTo(stream));
Map<String, PersistentTask<?>> filteredTasks = tasks.values().stream()
.filter(t -> ClusterState.FeatureAware.shouldSerialize(out, t.getParams()))
.collect(Collectors.toMap(PersistentTask::getId, Function.identity()));
out.writeMap(filteredTasks, StreamOutput::writeString, (stream, value) -> value.writeTo(stream));
}

public static NamedDiff<MetaData.Custom> readDiffFrom(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.util.Map;
import java.util.function.Predicate;
Expand Down Expand Up @@ -118,7 +118,7 @@ protected String getDescription(PersistentTask<Params> taskInProgress) {
* NOTE: The nodeOperation has to throw an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to
* indicate that the persistent task has finished.
*/
protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params, @Nullable Task.Status status);
protected abstract void nodeOperation(AllocatedPersistentTask task, Params params, @Nullable Task.Status status);

public String getExecutor() {
return executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
Expand Down Expand Up @@ -69,7 +68,7 @@ public PersistentTasksService(Settings settings, ClusterService clusterService,
*/
public <Params extends PersistentTaskParams> void sendStartRequest(final String taskId,
final String taskName,
final @Nullable Params taskParams,
final Params taskParams,
final ActionListener<PersistentTask<Params>> listener) {
@SuppressWarnings("unchecked")
final ActionListener<PersistentTask<?>> wrappedListener =
Expand Down
Loading

0 comments on commit a7ceefe

Please sign in to comment.