diff --git a/server/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java b/server/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java index e4f0353d0e20d..93a259a443679 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java @@ -41,6 +41,11 @@ public abstract class AcknowledgedRequest customIndexMetaData) { return this; } + public Map removeCustom(String type) { + return this.customMetaData.remove(type); + } + public Set getInSyncAllocationIds(int shardId) { return inSyncAllocationIds.get(shardId); } diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml index 9aa072a09bcf8..ab60b2e49482a 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml @@ -40,3 +40,13 @@ ccr.pause_follow: index: bar - is_true: acknowledged + + - do: + indices.close: + index: bar + - is_true: acknowledged + + - do: + ccr.unfollow: + index: bar + - is_true: acknowledged diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index d2d769d4269b8..61db5954496e0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -41,9 +41,11 @@ import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator; import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction; +import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction; import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportAutoFollowStatsAction; import org.elasticsearch.xpack.ccr.rest.RestAutoFollowStatsAction; +import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction; import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; @@ -72,6 +74,7 @@ import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import java.util.Arrays; import java.util.Collection; @@ -164,6 +167,7 @@ public List> getPersistentTasksExecutor(ClusterServic new ActionHandler<>(PutFollowAction.INSTANCE, TransportPutFollowAction.class), new ActionHandler<>(ResumeFollowAction.INSTANCE, TransportResumeFollowAction.class), new ActionHandler<>(PauseFollowAction.INSTANCE, TransportPauseFollowAction.class), + new ActionHandler<>(UnfollowAction.INSTANCE, TransportUnfollowAction.class), // auto-follow actions new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class), new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class), @@ -186,6 +190,7 @@ public List getRestHandlers(Settings settings, RestController restC new RestPutFollowAction(settings, restController), new RestResumeFollowAction(settings, restController), new RestPauseFollowAction(settings, restController), + new RestUnfollowAction(settings, restController), // auto-follow APIs new RestDeleteAutoFollowPatternAction(settings, restController), new RestPutAutoFollowPatternAction(settings, restController), diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java new file mode 100644 index 0000000000000..1ce01f7ab0953 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.ccr.CcrSettings; +import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; + +public class TransportUnfollowAction extends TransportMasterNodeAction { + + @Inject + public TransportUnfollowAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, UnfollowAction.NAME, transportService, clusterService, threadPool, actionFilters, + UnfollowAction.Request::new, indexNameExpressionResolver); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + @Override + protected void masterOperation(UnfollowAction.Request request, + ClusterState state, + ActionListener listener) throws Exception { + clusterService.submitStateUpdateTask("unfollow_action", new ClusterStateUpdateTask() { + + @Override + public ClusterState execute(ClusterState current) throws Exception { + String followerIndex = request.getFollowerIndex(); + return unfollow(followerIndex, current); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new AcknowledgedResponse(true)); + } + }); + } + + @Override + protected ClusterBlockException checkBlock(UnfollowAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + static ClusterState unfollow(String followerIndex, ClusterState current) { + IndexMetaData followerIMD = current.metaData().index(followerIndex); + + PersistentTasksCustomMetaData persistentTasks = current.metaData().custom(PersistentTasksCustomMetaData.TYPE); + if (persistentTasks != null) { + for (PersistentTasksCustomMetaData.PersistentTask persistentTask : persistentTasks.tasks()) { + if (persistentTask.getTaskName().equals(ShardFollowTask.NAME)) { + ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams(); + if (shardFollowTask.getFollowShardId().getIndexName().equals(followerIndex)) { + throw new IllegalArgumentException("cannot convert the follower index [" + followerIndex + + "] to a non-follower, because it has not been paused"); + } + } + } + } + + if (followerIMD.getState() != IndexMetaData.State.CLOSE) { + throw new IllegalArgumentException("cannot convert the follower index [" + followerIndex + + "] to a non-follower, because it has not been closed"); + } + + IndexMetaData.Builder newIMD = IndexMetaData.builder(followerIMD); + // Remove index.xpack.ccr.following_index setting + Settings.Builder builder = Settings.builder(); + builder.put(followerIMD.getSettings()); + builder.remove(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey()); + + newIMD.settings(builder); + // Remove ccr custom metadata + newIMD.removeCustom(Ccr.CCR_CUSTOM_METADATA_KEY); + + MetaData newMetaData = MetaData.builder(current.metaData()) + .put(newIMD) + .build(); + return ClusterState.builder(current) + .metaData(newMetaData) + .build(); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestUnfollowAction.java new file mode 100644 index 0000000000000..127d06eb751d5 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestUnfollowAction.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; + +import java.io.IOException; + +import static org.elasticsearch.xpack.core.ccr.action.UnfollowAction.INSTANCE; + +public class RestUnfollowAction extends BaseRestHandler { + + public RestUnfollowAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.POST, "/{index}/_ccr/unfollow", this); + } + + @Override + public String getName() { + return "ccr_unfollow_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + UnfollowAction.Request request = new UnfollowAction.Request(restRequest.param("index")); + return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 793aca83fee9e..2f383207a5991 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.analysis.common.CommonAnalysisPlugin; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -62,6 +63,7 @@ import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import java.io.IOException; import java.util.Arrays; @@ -650,6 +652,34 @@ public void testDeleteFollowerIndex() throws Exception { ensureNoCcrTasks(); } + public void testUnfollowIndex() throws Exception { + String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON).get()); + ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); + PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); + client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + client().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> { + assertThat(client().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(1L)); + }); + + // Indexing directly into index2 would fail now, because index2 is a follow index. + // We can't test this here because an assertion trips before an actual error is thrown and then index call hangs. + + // Turn follow index into a regular index by: pausing shard follow, close index, unfollow index and then open index: + unfollowIndex("index2"); + client().admin().indices().close(new CloseIndexRequest("index2")).actionGet(); + assertAcked(client().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request("index2")).actionGet()); + client().admin().indices().open(new OpenIndexRequest("index2")).actionGet(); + ensureGreen("index2"); + + // Indexing succeeds now, because index2 is no longer a follow index: + client().prepareIndex("index2", "doc").setSource("{}", XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + assertThat(client().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(2L)); + } + private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { return () -> { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java new file mode 100644 index 0000000000000..1240b37e31287 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java @@ -0,0 +1,103 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.ccr.CcrSettings; + +import java.util.Collections; +import java.util.HashMap; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +public class TransportUnfollowActionTests extends ESTestCase { + + public void testUnfollow() { + IndexMetaData.Builder followerIndex = IndexMetaData.builder("follow_index") + .settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)) + .numberOfShards(1) + .numberOfReplicas(0) + .state(IndexMetaData.State.CLOSE) + .putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>()); + + ClusterState current = ClusterState.builder(new ClusterName("cluster_name")) + .metaData(MetaData.builder() + .put(followerIndex) + .build()) + .build(); + ClusterState result = TransportUnfollowAction.unfollow("follow_index", current); + + IndexMetaData resultIMD = result.metaData().index("follow_index"); + assertThat(resultIMD.getSettings().get(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey()), nullValue()); + assertThat(resultIMD.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY), nullValue()); + } + + public void testUnfollowIndexOpen() { + IndexMetaData.Builder followerIndex = IndexMetaData.builder("follow_index") + .settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)) + .numberOfShards(1) + .numberOfReplicas(0) + .putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>()); + + ClusterState current = ClusterState.builder(new ClusterName("cluster_name")) + .metaData(MetaData.builder() + .put(followerIndex) + .build()) + .build(); + Exception e = expectThrows(IllegalArgumentException.class, () -> TransportUnfollowAction.unfollow("follow_index", current)); + assertThat(e.getMessage(), + equalTo("cannot convert the follower index [follow_index] to a non-follower, because it has not been closed")); + } + + public void testUnfollowRunningShardFollowTasks() { + IndexMetaData.Builder followerIndex = IndexMetaData.builder("follow_index") + .settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)) + .numberOfShards(1) + .numberOfReplicas(0) + .state(IndexMetaData.State.CLOSE) + .putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>()); + + + ShardFollowTask params = new ShardFollowTask( + null, + new ShardId("follow_index", "", 0), + new ShardId("leader_index", "", 0), + 1024, + 1, + TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, + 1, + 10240, + TimeValue.timeValueMillis(10), + TimeValue.timeValueMillis(10), + "uuid", + Collections.emptyMap() + ); + PersistentTasksCustomMetaData.PersistentTask task = + new PersistentTasksCustomMetaData.PersistentTask<>("id", ShardFollowTask.NAME, params, 0, null); + + ClusterState current = ClusterState.builder(new ClusterName("cluster_name")) + .metaData(MetaData.builder() + .put(followerIndex) + .putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(0, Collections.singletonMap("id", task))) + .build()) + .build(); + Exception e = expectThrows(IllegalArgumentException.class, () -> TransportUnfollowAction.unfollow("follow_index", current)); + assertThat(e.getMessage(), + equalTo("cannot convert the follower index [follow_index] to a non-follower, because it has not been paused")); + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java new file mode 100644 index 0000000000000..cf8c9ec2e6101 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ccr.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class UnfollowAction extends Action { + + public static final UnfollowAction INSTANCE = new UnfollowAction(); + public static final String NAME = "cluster:admin/xpack/ccr/unfollow"; + + private UnfollowAction() { + super(NAME); + } + + @Override + public AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + public static class Request extends AcknowledgedRequest { + + private final String followerIndex; + + public Request(String followerIndex) { + this.followerIndex = followerIndex; + } + + public Request(StreamInput in) throws IOException { + super(in); + followerIndex = in.readString(); + } + + public String getFollowerIndex() { + return followerIndex; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException e = null; + if (followerIndex == null) { + e = addValidationError("follower index is missing", e); + } + return e; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(followerIndex); + } + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java index dd4af08978bfa..1dab97599dfc2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java @@ -19,6 +19,7 @@ import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import java.util.Objects; @@ -85,6 +86,16 @@ public ActionFuture pauseFollow(final PauseFollowAction.Re return listener; } + public void unfollow(final UnfollowAction.Request request, final ActionListener listener) { + client.execute(UnfollowAction.INSTANCE, request, listener); + } + + public ActionFuture unfollow(final UnfollowAction.Request request) { + final PlainActionFuture listener = PlainActionFuture.newFuture(); + client.execute(UnfollowAction.INSTANCE, request, listener); + return listener; + } + public void putAutoFollowPattern( final PutAutoFollowPatternAction.Request request, final ActionListener listener) { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.unfollow.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.unfollow.json new file mode 100644 index 0000000000000..41be574421fc6 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.unfollow.json @@ -0,0 +1,17 @@ +{ + "ccr.unfollow": { + "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current", + "methods": [ "POST" ], + "url": { + "path": "/{index}/_ccr/unfollow", + "paths": [ "/{index}/_ccr/unfollow" ], + "parts": { + "index": { + "type": "string", + "required": true, + "description": "The name of the follower index that should be turned into a regular index." + } + } + } + } +}