Skip to content

Commit

Permalink
[CCR] Add unfollow API (#34132)
Browse files Browse the repository at this point in the history
The unfollow API changes a follower index into a regular index, so that it will accept write requests from clients.

For the unfollow api to work the index follow needs to be stopped and the index needs to be closed.

Closes #33931
  • Loading branch information
martijnvg authored and kcm committed Oct 30, 2018
1 parent fe10403 commit d2d180d
Show file tree
Hide file tree
Showing 11 changed files with 408 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public abstract class AcknowledgedRequest<Request extends MasterNodeRequest<Requ
protected AcknowledgedRequest() {
}

protected AcknowledgedRequest(StreamInput in) throws IOException {
super(in);
this.timeout = in.readTimeValue();
}

/**
* Allows to set the timeout
* @param timeout timeout as a string (e.g. 1s)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,10 @@ public Builder putCustom(String type, Map<String, String> customIndexMetaData) {
return this;
}

public Map<String, String> removeCustom(String type) {
return this.customMetaData.remove(type);
}

public Set<String> getInSyncAllocationIds(int shardId) {
return inSyncAllocationIds.get(shardId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,13 @@
ccr.pause_follow:
index: bar
- is_true: acknowledged

- do:
indices.close:
index: bar
- is_true: acknowledged

- do:
ccr.unfollow:
index: bar
- is_true: acknowledged
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction;
import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportAutoFollowStatsAction;
import org.elasticsearch.xpack.ccr.rest.RestAutoFollowStatsAction;
import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction;
import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction;
Expand Down Expand Up @@ -72,6 +74,7 @@
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;

import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -164,6 +167,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
new ActionHandler<>(PutFollowAction.INSTANCE, TransportPutFollowAction.class),
new ActionHandler<>(ResumeFollowAction.INSTANCE, TransportResumeFollowAction.class),
new ActionHandler<>(PauseFollowAction.INSTANCE, TransportPauseFollowAction.class),
new ActionHandler<>(UnfollowAction.INSTANCE, TransportUnfollowAction.class),
// auto-follow actions
new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class),
new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class),
Expand All @@ -186,6 +190,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new RestPutFollowAction(settings, restController),
new RestResumeFollowAction(settings, restController),
new RestPauseFollowAction(settings, restController),
new RestUnfollowAction(settings, restController),
// auto-follow APIs
new RestDeleteAutoFollowPatternAction(settings, restController),
new RestPutAutoFollowPatternAction(settings, restController),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;

public class TransportUnfollowAction extends TransportMasterNodeAction<UnfollowAction.Request, AcknowledgedResponse> {

@Inject
public TransportUnfollowAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, UnfollowAction.NAME, transportService, clusterService, threadPool, actionFilters,
UnfollowAction.Request::new, indexNameExpressionResolver);
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}

@Override
protected void masterOperation(UnfollowAction.Request request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
clusterService.submitStateUpdateTask("unfollow_action", new ClusterStateUpdateTask() {

@Override
public ClusterState execute(ClusterState current) throws Exception {
String followerIndex = request.getFollowerIndex();
return unfollow(followerIndex, current);
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new AcknowledgedResponse(true));
}
});
}

@Override
protected ClusterBlockException checkBlock(UnfollowAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

static ClusterState unfollow(String followerIndex, ClusterState current) {
IndexMetaData followerIMD = current.metaData().index(followerIndex);

PersistentTasksCustomMetaData persistentTasks = current.metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (persistentTasks != null) {
for (PersistentTasksCustomMetaData.PersistentTask<?> persistentTask : persistentTasks.tasks()) {
if (persistentTask.getTaskName().equals(ShardFollowTask.NAME)) {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
if (shardFollowTask.getFollowShardId().getIndexName().equals(followerIndex)) {
throw new IllegalArgumentException("cannot convert the follower index [" + followerIndex +
"] to a non-follower, because it has not been paused");
}
}
}
}

if (followerIMD.getState() != IndexMetaData.State.CLOSE) {
throw new IllegalArgumentException("cannot convert the follower index [" + followerIndex +
"] to a non-follower, because it has not been closed");
}

IndexMetaData.Builder newIMD = IndexMetaData.builder(followerIMD);
// Remove index.xpack.ccr.following_index setting
Settings.Builder builder = Settings.builder();
builder.put(followerIMD.getSettings());
builder.remove(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey());

newIMD.settings(builder);
// Remove ccr custom metadata
newIMD.removeCustom(Ccr.CCR_CUSTOM_METADATA_KEY);

MetaData newMetaData = MetaData.builder(current.metaData())
.put(newIMD)
.build();
return ClusterState.builder(current)
.metaData(newMetaData)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.rest;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;

import java.io.IOException;

import static org.elasticsearch.xpack.core.ccr.action.UnfollowAction.INSTANCE;

public class RestUnfollowAction extends BaseRestHandler {

public RestUnfollowAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.POST, "/{index}/_ccr/unfollow", this);
}

@Override
public String getName() {
return "ccr_unfollow_action";
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
UnfollowAction.Request request = new UnfollowAction.Request(restRequest.param("index"));
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -62,6 +63,7 @@
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -650,6 +652,34 @@ public void testDeleteFollowerIndex() throws Exception {
ensureNoCcrTasks();
}

public void testUnfollowIndex() throws Exception {
String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON).get());
ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2");
PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest);
client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get();
client().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
assertThat(client().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(1L));
});

// Indexing directly into index2 would fail now, because index2 is a follow index.
// We can't test this here because an assertion trips before an actual error is thrown and then index call hangs.

// Turn follow index into a regular index by: pausing shard follow, close index, unfollow index and then open index:
unfollowIndex("index2");
client().admin().indices().close(new CloseIndexRequest("index2")).actionGet();
assertAcked(client().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request("index2")).actionGet());
client().admin().indices().open(new OpenIndexRequest("index2")).actionGet();
ensureGreen("index2");

// Indexing succeeds now, because index2 is no longer a follow index:
client().prepareIndex("index2", "doc").setSource("{}", XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
assertThat(client().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(2L));
}

private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
return () -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrSettings;

import java.util.Collections;
import java.util.HashMap;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

public class TransportUnfollowActionTests extends ESTestCase {

public void testUnfollow() {
IndexMetaData.Builder followerIndex = IndexMetaData.builder("follow_index")
.settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true))
.numberOfShards(1)
.numberOfReplicas(0)
.state(IndexMetaData.State.CLOSE)
.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>());

ClusterState current = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(MetaData.builder()
.put(followerIndex)
.build())
.build();
ClusterState result = TransportUnfollowAction.unfollow("follow_index", current);

IndexMetaData resultIMD = result.metaData().index("follow_index");
assertThat(resultIMD.getSettings().get(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey()), nullValue());
assertThat(resultIMD.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY), nullValue());
}

public void testUnfollowIndexOpen() {
IndexMetaData.Builder followerIndex = IndexMetaData.builder("follow_index")
.settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true))
.numberOfShards(1)
.numberOfReplicas(0)
.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>());

ClusterState current = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(MetaData.builder()
.put(followerIndex)
.build())
.build();
Exception e = expectThrows(IllegalArgumentException.class, () -> TransportUnfollowAction.unfollow("follow_index", current));
assertThat(e.getMessage(),
equalTo("cannot convert the follower index [follow_index] to a non-follower, because it has not been closed"));
}

public void testUnfollowRunningShardFollowTasks() {
IndexMetaData.Builder followerIndex = IndexMetaData.builder("follow_index")
.settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true))
.numberOfShards(1)
.numberOfReplicas(0)
.state(IndexMetaData.State.CLOSE)
.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>());


ShardFollowTask params = new ShardFollowTask(
null,
new ShardId("follow_index", "", 0),
new ShardId("leader_index", "", 0),
1024,
1,
TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES,
1,
10240,
TimeValue.timeValueMillis(10),
TimeValue.timeValueMillis(10),
"uuid",
Collections.emptyMap()
);
PersistentTasksCustomMetaData.PersistentTask<?> task =
new PersistentTasksCustomMetaData.PersistentTask<>("id", ShardFollowTask.NAME, params, 0, null);

ClusterState current = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(MetaData.builder()
.put(followerIndex)
.putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(0, Collections.singletonMap("id", task)))
.build())
.build();
Exception e = expectThrows(IllegalArgumentException.class, () -> TransportUnfollowAction.unfollow("follow_index", current));
assertThat(e.getMessage(),
equalTo("cannot convert the follower index [follow_index] to a non-follower, because it has not been paused"));
}

}
Loading

0 comments on commit d2d180d

Please sign in to comment.