Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport: clean up duplicate follow config parameter code change (#37688) #38443

Merged
merged 1 commit into from
Feb 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public final class PutFollowRequest extends FollowConfig implements Validatable,

static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");

private final String remoteCluster;
private final String leaderIndex;
Expand All @@ -55,7 +54,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject();
builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster);
builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
toXContentFragment(builder, params);
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.client.ccr.PutFollowRequest.FOLLOWER_INDEX_FIELD;

public final class ResumeFollowRequest extends FollowConfig implements Validatable, ToXContentObject {

private final String followerIndex;
Expand All @@ -39,7 +37,6 @@ public ResumeFollowRequest(String followerIndex) {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
toXContentFragment(builder, params);
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@
public class PutFollowRequestTests extends AbstractXContentTestCase<PutFollowRequest> {

private static final ConstructingObjectParser<PutFollowRequest, Void> PARSER = new ConstructingObjectParser<>("test_parser",
true, (args) -> new PutFollowRequest((String) args[0], (String) args[1], (String) args[2]));
true, (args) -> new PutFollowRequest((String) args[0], (String) args[1], "followerIndex"));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.REMOTE_CLUSTER_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.LEADER_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.FOLLOWER_INDEX_FIELD);
PARSER.declareInt(PutFollowRequest::setMaxReadRequestOperationCount, PutFollowRequest.MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareField(
PutFollowRequest::setMaxReadRequestSize,
Expand Down Expand Up @@ -82,7 +81,7 @@ protected boolean supportsUnknownFields() {
@Override
protected PutFollowRequest createTestInstance() {
PutFollowRequest putFollowRequest =
new PutFollowRequest(randomAlphaOfLength(4), randomAlphaOfLength(4), randomAlphaOfLength(4));
new PutFollowRequest(randomAlphaOfLength(4), randomAlphaOfLength(4), "followerIndex");
if (randomBoolean()) {
putFollowRequest.setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
Expand All @@ -30,11 +29,10 @@

public class ResumeFollowRequestTests extends AbstractXContentTestCase<ResumeFollowRequest> {

private static final ConstructingObjectParser<ResumeFollowRequest, Void> PARSER = new ConstructingObjectParser<>("test_parser",
true, (args) -> new ResumeFollowRequest((String) args[0]));
private static final ObjectParser<ResumeFollowRequest, Void> PARSER = new ObjectParser<>("test_parser",
true, () -> new ResumeFollowRequest("followerIndex"));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.FOLLOWER_INDEX_FIELD);
PARSER.declareInt(ResumeFollowRequest::setMaxReadRequestOperationCount, FollowConfig.MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareField(
ResumeFollowRequest::setMaxReadRequestSize,
Expand Down Expand Up @@ -79,7 +77,7 @@ protected boolean supportsUnknownFields() {

@Override
protected ResumeFollowRequest createTestInstance() {
ResumeFollowRequest resumeFollowRequest = new ResumeFollowRequest(randomAlphaOfLength(4));
ResumeFollowRequest resumeFollowRequest = new ResumeFollowRequest("followerIndex");
if (randomBoolean()) {
resumeFollowRequest.setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -511,23 +510,20 @@ private void followLeaderIndex(String autoFollowPattenName,
final String leaderIndexName = indexToFollow.getName();
final String followIndexName = getFollowerIndexName(pattern, leaderIndexName);

ResumeFollowAction.Request followRequest = new ResumeFollowAction.Request();
followRequest.setFollowerIndex(followIndexName);
followRequest.setMaxReadRequestOperationCount(pattern.getMaxReadRequestOperationCount());
followRequest.setMaxReadRequestSize(pattern.getMaxReadRequestSize());
followRequest.setMaxOutstandingReadRequests(pattern.getMaxOutstandingReadRequests());
followRequest.setMaxWriteRequestOperationCount(pattern.getMaxWriteRequestOperationCount());
followRequest.setMaxWriteRequestSize(pattern.getMaxWriteRequestSize());
followRequest.setMaxOutstandingWriteRequests(pattern.getMaxOutstandingWriteRequests());
followRequest.setMaxWriteBufferCount(pattern.getMaxWriteBufferCount());
followRequest.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
followRequest.setMaxRetryDelay(pattern.getMaxRetryDelay());
followRequest.setReadPollTimeout(pattern.getPollTimeout());

PutFollowAction.Request request = new PutFollowAction.Request();
request.setRemoteCluster(remoteCluster);
request.setLeaderIndex(indexToFollow.getName());
request.setFollowRequest(followRequest);
request.setFollowerIndex(followIndexName);
request.getParameters().setMaxReadRequestOperationCount(pattern.getMaxReadRequestOperationCount());
request.getParameters().setMaxReadRequestSize(pattern.getMaxReadRequestSize());
request.getParameters().setMaxOutstandingReadRequests(pattern.getMaxOutstandingReadRequests());
request.getParameters().setMaxWriteRequestOperationCount(pattern.getMaxWriteRequestOperationCount());
request.getParameters().setMaxWriteRequestSize(pattern.getMaxWriteRequestSize());
request.getParameters().setMaxOutstandingWriteRequests(pattern.getMaxOutstandingWriteRequests());
request.getParameters().setMaxWriteBufferCount(pattern.getMaxWriteBufferCount());
request.getParameters().setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
request.getParameters().setMaxRetryDelay(pattern.getMaxRetryDelay());
request.getParameters().setReadPollTimeout(pattern.getPollTimeout());

// Execute if the create and follow api call succeeds:
Runnable successHandler = () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected Boolean newResponse(final boolean acknowledged) {

@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
String followIndex = request.getFollowRequest().getFollowerIndex();
String followIndex = request.getFollowerIndex();
IndexMetaData currentIndex = currentState.metaData().index(followIndex);
if (currentIndex != null) {
throw new ResourceAlreadyExistsException(currentIndex.getIndex());
Expand Down Expand Up @@ -112,10 +112,10 @@ public ClusterState execute(final ClusterState currentState) throws Exception {
ClusterState updatedState = builder.build();

RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
.addAsNew(updatedState.metaData().index(request.getFollowRequest().getFollowerIndex()));
.addAsNew(updatedState.metaData().index(request.getFollowerIndex()));
updatedState = allocationService.reroute(
ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(),
"follow index [" + request.getFollowRequest().getFollowerIndex() + "] created");
"follow index [" + request.getFollowerIndex() + "] created");

logger.info("[{}] creating index, cause [ccr_create_and_follow], shards [{}]/[{}]",
followIndex, followIMD.getNumberOfShards(), followIMD.getNumberOfReplicas());
Expand All @@ -126,10 +126,13 @@ public ClusterState execute(final ClusterState currentState) throws Exception {
}

private void initiateFollowing(final PutFollowAction.Request request, final ActionListener<PutFollowAction.Response> listener) {
activeShardsObserver.waitForActiveShards(new String[]{request.getFollowRequest().getFollowerIndex()},
activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()},
ActiveShardCount.DEFAULT, request.timeout(), result -> {
if (result) {
client.execute(ResumeFollowAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap(
ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request();
resumeFollowRequest.setFollowerIndex(request.getFollowerIndex());
resumeFollowRequest.setParameters(request.getParameters());
client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap(
r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())),
listener::onFailure
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status;

Expand Down Expand Up @@ -99,18 +99,17 @@ static List<FollowerInfo> getFollowInfos(List<String> concreteFollowerIndices, C
String leaderIndex = ccrCustomData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
if (result.isPresent()) {
ShardFollowTask params = result.get();
FollowParameters followParameters = new FollowParameters(
params.getMaxReadRequestOperationCount(),
params.getMaxReadRequestSize(),
params.getMaxOutstandingReadRequests(),
params.getMaxWriteRequestOperationCount(),
params.getMaxWriteRequestSize(),
params.getMaxOutstandingWriteRequests(),
params.getMaxWriteBufferCount(),
params.getMaxWriteBufferSize(),
params.getMaxRetryDelay(),
params.getReadPollTimeout()
);
FollowParameters followParameters = new FollowParameters();
followParameters.setMaxOutstandingReadRequests(params.getMaxOutstandingReadRequests());
followParameters.setMaxOutstandingWriteRequests(params.getMaxOutstandingWriteRequests());
followParameters.setMaxReadRequestOperationCount(params.getMaxReadRequestOperationCount());
followParameters.setMaxWriteRequestOperationCount(params.getMaxWriteRequestOperationCount());
followParameters.setMaxReadRequestSize(params.getMaxReadRequestSize());
followParameters.setMaxWriteRequestSize(params.getMaxWriteRequestSize());
followParameters.setMaxWriteBufferCount(params.getMaxWriteBufferCount());
followParameters.setMaxWriteBufferSize(params.getMaxWriteBufferSize());
followParameters.setMaxRetryDelay(params.getMaxRetryDelay());
followParameters.setReadPollTimeout(params.getReadPollTimeout());
followerInfos.add(new FollowerInfo(followerIndex, remoteCluster, leaderIndex, Status.ACTIVE, followParameters));
} else {
followerInfos.add(new FollowerInfo(followerIndex, remoteCluster, leaderIndex, Status.PAUSED, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,7 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
markExistingIndicesAsAutoFollowedForNewPatterns(request.getLeaderIndexPatterns(), remoteClusterState.metaData(),
previousPattern, followedIndexUUIDs);
} else {
markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), remoteClusterState.metaData(),
followedIndexUUIDs);
markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), remoteClusterState.metaData(), followedIndexUUIDs);
}

if (filteredHeaders != null) {
Expand All @@ -161,16 +160,16 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
request.getRemoteCluster(),
request.getLeaderIndexPatterns(),
request.getFollowIndexNamePattern(),
request.getMaxReadRequestOperationCount(),
request.getMaxReadRequestSize(),
request.getMaxConcurrentReadBatches(),
request.getMaxWriteRequestOperationCount(),
request.getMaxWriteRequestSize(),
request.getMaxConcurrentWriteBatches(),
request.getMaxWriteBufferCount(),
request.getMaxWriteBufferSize(),
request.getMaxRetryDelay(),
request.getReadPollTimeout());
request.getParameters().getMaxReadRequestOperationCount(),
request.getParameters().getMaxReadRequestSize(),
request.getParameters().getMaxOutstandingReadRequests(),
request.getParameters().getMaxWriteRequestOperationCount(),
request.getParameters().getMaxWriteRequestSize(),
request.getParameters().getMaxOutstandingWriteRequests(),
request.getParameters().getMaxWriteBufferCount(),
request.getParameters().getMaxWriteBufferSize(),
request.getParameters().getMaxRetryDelay(),
request.getParameters().getReadPollTimeout());
patterns.put(request.getName(), autoFollowPattern);
ClusterState.Builder newState = ClusterState.builder(localState);
newState.metaData(MetaData.builder(localState.getMetaData())
Expand Down
Loading