Skip to content

Commit

Permalink
Rename CCR APIs (#34027)
Browse files Browse the repository at this point in the history
* Renamed CCR APIs

Renamed:
* `/{index}/_ccr/create_and_follow` to `/{index}/_ccr/follow`
* `/{index}/_ccr/unfollow` to `/{index}/_ccr/pause_follow`
* `/{index}/_ccr/follow` to `/{index}/_ccr/resume_follow`

Relates to #33931
  • Loading branch information
martijnvg committed Sep 28, 2018
1 parent 11e7a02 commit 49675d9
Show file tree
Hide file tree
Showing 33 changed files with 302 additions and 312 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,17 @@ protected boolean preserveClusterUponCompletion() {
return true;
}

public void testFollowIndex() {
public void testResumeFollow() {
if (runningAgainstLeaderCluster == false) {
final Request request = new Request("POST", "/follower/_ccr/follow");
final Request request = new Request("POST", "/follower/_ccr/resume_follow");
request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}");
assertNonCompliantLicense(request);
}
}

public void testCreateAndFollowIndex() {
public void testFollow() {
if (runningAgainstLeaderCluster == false) {
final Request request = new Request("POST", "/follower/_ccr/create_and_follow");
final Request request = new Request("PUT", "/follower/_ccr/follow");
request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}");
assertNonCompliantLicense(request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ public void testFollowIndex() throws Exception {
refresh(allowedIndex);
verifyDocuments(adminClient(), allowedIndex, numDocs);
} else {
createAndFollowIndex("leader_cluster:" + allowedIndex, allowedIndex);
follow("leader_cluster:" + allowedIndex, allowedIndex);
assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs));
assertThat(countCcrNodeTasks(), equalTo(5));
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
// Make sure that there are no other ccr relates operations running:
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
Expand All @@ -93,9 +93,9 @@ public void testFollowIndex() throws Exception {
assertThat(countCcrNodeTasks(), equalTo(0));
});

followIndex("leader_cluster:" + allowedIndex, allowedIndex);
resumeFollow("leader_cluster:" + allowedIndex, allowedIndex);
assertThat(countCcrNodeTasks(), equalTo(5));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
// Make sure that there are no other ccr relates operations running:
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
Expand All @@ -105,15 +105,15 @@ public void testFollowIndex() throws Exception {
});

Exception e = expectThrows(ResponseException.class,
() -> createAndFollowIndex("leader_cluster:" + unallowedIndex, unallowedIndex));
() -> follow("leader_cluster:" + unallowedIndex, unallowedIndex));
assertThat(e.getMessage(),
containsString("action [indices:admin/xpack/ccr/create_and_follow_index] is unauthorized for user [test_ccr]"));
containsString("action [indices:admin/xpack/ccr/put_follow] is unauthorized for user [test_ccr]"));
// Verify that the follow index has not been created and no node tasks are running
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));

e = expectThrows(ResponseException.class,
() -> followIndex("leader_cluster:" + unallowedIndex, unallowedIndex));
() -> resumeFollow("leader_cluster:" + unallowedIndex, unallowedIndex));
assertThat(e.getMessage(), containsString("action [indices:monitor/stats] is unauthorized for user [test_ccr]"));
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
Expand Down Expand Up @@ -157,10 +157,10 @@ public void testAutoFollowPatterns() throws Exception {
verifyAutoFollowMonitoring();
});

// Cleanup by deleting auto follow pattern and unfollowing:
// Cleanup by deleting auto follow pattern and pause following:
request = new Request("DELETE", "/_ccr/auto_follow/leader_cluster");
assertOK(client().performRequest(request));
unfollowIndex(allowedIndex);
pauseFollow(allowedIndex);
}

private int countCcrNodeTasks() throws IOException {
Expand Down Expand Up @@ -201,14 +201,14 @@ private static void refresh(String index) throws IOException {
assertOK(adminClient().performRequest(new Request("POST", "/" + index + "/_refresh")));
}

private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow");
private static void resumeFollow(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow");
private static void follow(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}
Expand Down Expand Up @@ -273,8 +273,8 @@ private static boolean indexExists(RestClient client, String index) throws IOExc
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
}

private static void unfollowIndex(String followIndex) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/unfollow")));
private static void pauseFollow(String followIndex) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow")));
}

private static void verifyCcrMonitoring(String expectedLeaderIndex, String expectedFollowerIndex) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ public void testFollowIndex() throws Exception {
} else {
logger.info("Running against follow cluster");
final String followIndexName = "test_index2";
createAndFollowIndex("leader_cluster:" + leaderIndexName, followIndexName);
followIndex("leader_cluster:" + leaderIndexName, followIndexName);
assertBusy(() -> verifyDocuments(followIndexName, numDocs));
// unfollow and then follow and then index a few docs in leader index:
unfollowIndex(followIndexName);
followIndex("leader_cluster:" + leaderIndexName, followIndexName);
pauseFollow(followIndexName);
resumeFollow("leader_cluster:" + leaderIndexName, followIndexName);
try (RestClient leaderClient = buildLeaderClient()) {
int id = numDocs;
index(leaderClient, leaderIndexName, Integer.toString(id), "field", id, "filtered_field", "true");
Expand All @@ -86,11 +86,11 @@ public void testFollowIndex() throws Exception {
public void testFollowNonExistingLeaderIndex() throws Exception {
assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster);
ResponseException e = expectThrows(ResponseException.class,
() -> followIndex("leader_cluster:non-existing-index", "non-existing-index"));
() -> resumeFollow("leader_cluster:non-existing-index", "non-existing-index"));
assertThat(e.getMessage(), containsString("no such index"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));

e = expectThrows(ResponseException.class, () -> createAndFollowIndex("leader_cluster:non-existing-index", "non-existing-index"));
e = expectThrows(ResponseException.class, () -> followIndex("leader_cluster:non-existing-index", "non-existing-index"));
assertThat(e.getMessage(), containsString("no such index"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));
}
Expand Down Expand Up @@ -146,20 +146,20 @@ private static void refresh(String index) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + index + "/_refresh")));
}

private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow");
private static void resumeFollow(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow");
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

private static void unfollowIndex(String followIndex) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/unfollow")));
private static void pauseFollow(String followIndex) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow")));
}

private static void verifyDocuments(String index, int expectedNumDocs) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
- is_true: acknowledged

- do:
ccr.create_and_follow_index:
ccr.follow:
index: bar
body:
leader_index: foo
Expand All @@ -25,18 +25,18 @@
- is_true: index_following_started

- do:
ccr.unfollow_index:
ccr.pause_follow:
index: bar
- is_true: acknowledged

- do:
ccr.follow_index:
ccr.resume_follow:
index: bar
body:
leader_index: foo
- is_true: acknowledged

- do:
ccr.unfollow_index:
ccr.pause_follow:
index: bar
- is_true: acknowledged
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
type: keyword

- do:
ccr.create_and_follow_index:
ccr.follow:
index: bar
body:
leader_index: foo
Expand Down Expand Up @@ -52,7 +52,7 @@
- gte: { bar.0.time_since_last_fetch_millis: -1 }

- do:
ccr.unfollow_index:
ccr.pause_follow:
index: bar
- is_true: acknowledged

Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,27 @@
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
import org.elasticsearch.xpack.ccr.action.TransportCreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.action.TransportPutFollowAction;
import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportFollowIndexAction;
import org.elasticsearch.xpack.ccr.action.TransportResumeFollowAction;
import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportUnfollowIndexAction;
import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
import org.elasticsearch.xpack.ccr.rest.RestCreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestPutFollowAction;
import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.rest.RestFollowIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction;
import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction;
import org.elasticsearch.xpack.core.XPackClientActionPlugin;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;

import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -167,9 +167,9 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
new ActionHandler<>(AutoFollowStatsAction.INSTANCE, TransportAutoFollowStatsAction.class),
// follow actions
new ActionHandler<>(CreateAndFollowIndexAction.INSTANCE, TransportCreateAndFollowIndexAction.class),
new ActionHandler<>(FollowIndexAction.INSTANCE, TransportFollowIndexAction.class),
new ActionHandler<>(UnfollowIndexAction.INSTANCE, TransportUnfollowIndexAction.class),
new ActionHandler<>(PutFollowAction.INSTANCE, TransportPutFollowAction.class),
new ActionHandler<>(ResumeFollowAction.INSTANCE, TransportResumeFollowAction.class),
new ActionHandler<>(PauseFollowAction.INSTANCE, TransportPauseFollowAction.class),
// auto-follow actions
new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class),
new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class),
Expand All @@ -189,9 +189,9 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new RestCcrStatsAction(settings, restController),
new RestAutoFollowStatsAction(settings, restController),
// follow APIs
new RestCreateAndFollowIndexAction(settings, restController),
new RestFollowIndexAction(settings, restController),
new RestUnfollowIndexAction(settings, restController),
new RestPutFollowAction(settings, restController),
new RestResumeFollowAction(settings, restController),
new RestPauseFollowAction(settings, restController),
// auto-follow APIs
new RestDeleteAutoFollowPatternAction(settings, restController),
new RestPutAutoFollowPatternAction(settings, restController),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -184,13 +184,13 @@ void getLeaderClusterState(final Map<String, String> headers,

@Override
void createAndFollow(Map<String, String> headers,
FollowIndexAction.Request followRequest,
ResumeFollowAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
Client followerClient = CcrLicenseChecker.wrapClient(client, headers);
CreateAndFollowIndexAction.Request request = new CreateAndFollowIndexAction.Request(followRequest);
PutFollowAction.Request request = new PutFollowAction.Request(followRequest);
followerClient.execute(
CreateAndFollowIndexAction.INSTANCE,
PutFollowAction.INSTANCE,
request,
ActionListener.wrap(r -> successHandler.run(), failureHandler)
);
Expand Down Expand Up @@ -306,7 +306,7 @@ private void followLeaderIndex(String clusterAlias,

String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName :
clusterAlias + ":" + leaderIndexName;
FollowIndexAction.Request request = new FollowIndexAction.Request();
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setLeaderIndex(leaderIndexNameWithClusterAliasPrefix);
request.setFollowerIndex(followIndexName);
request.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount());
Expand Down Expand Up @@ -409,7 +409,7 @@ abstract void getLeaderClusterState(

abstract void createAndFollow(
Map<String, String> headers,
FollowIndexAction.Request followRequest,
ResumeFollowAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public static class Request extends SingleShardRequest<Request> {
private int maxOperationCount;
private ShardId shardId;
private String expectedHistoryUUID;
private TimeValue pollTimeout = TransportFollowIndexAction.DEFAULT_POLL_TIMEOUT;
private long maxOperationSizeInBytes = TransportFollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
private TimeValue pollTimeout = TransportResumeFollowAction.DEFAULT_POLL_TIMEOUT;
private long maxOperationSizeInBytes = TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;

public Request(ShardId shardId, String expectedHistoryUUID) {
super(shardId.getIndexName());
Expand Down
Loading

0 comments on commit 49675d9

Please sign in to comment.