Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ILM: add support for rolling over data streams #57295

Merged
merged 10 commits into from
Jun 2, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -1245,12 +1245,6 @@ public void testIndexPutSettings() throws IOException {
+ "reason=final index setting [index.number_of_shards], not updateable"));
}

@SuppressWarnings("unchecked")
private Map<String, Object> getIndexSettingsAsMap(String index) throws IOException {
Map<String, Object> indexSettings = getIndexSettings(index);
return (Map<String, Object>)((Map<String, Object>) indexSettings.get(index)).get("settings");
}

public void testIndexPutSettingNonExistent() throws IOException {

String index = "index";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,10 +731,4 @@ public void testSoftDeletesDisabledWarning() throws Exception {
ensureGreen(indexName);
indexDocs(indexName, randomInt(100), randomInt(100));
}

@SuppressWarnings("unchecked")
private Map<String, Object> getIndexSettingsAsMap(String index) throws IOException {
Map<String, Object> indexSettings = getIndexSettings(index);
return (Map<String, Object>)((Map<String, Object>) indexSettings.get(index)).get("settings");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,12 @@ protected static Map<String, Object> getIndexSettings(String index) throws IOExc
}
}

@SuppressWarnings("unchecked")
protected Map<String, Object> getIndexSettingsAsMap(String index) throws IOException {
Map<String, Object> indexSettings = getIndexSettings(index);
return (Map<String, Object>)((Map<String, Object>) indexSettings.get(index)).get("settings");
}

protected static boolean indexExists(String index) throws IOException {
Response response = client().performRequest(new Request("HEAD", "/" + index));
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.index.Index;

/**
* A step which will be called periodically, waiting for some condition to become true.
* Called asynchronously, as the condition may take time to check.
*
* <p>
* If checking something based on the current cluster state which does not take time to check, use {@link ClusterStateWaitStep}.
*/
public abstract class AsyncWaitStep extends Step {
Expand All @@ -29,7 +30,7 @@ protected Client getClient() {
return client;
}

public abstract void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout);
public abstract void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout);

public interface Listener {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;

Expand All @@ -39,38 +40,47 @@ public boolean isRetryable() {
@Override
public void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState,
ClusterStateObserver observer, Listener listener) {
String indexName = indexMetadata.getIndex().getName();
boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(indexMetadata.getSettings());
if (indexingComplete) {
logger.trace(indexMetadata.getIndex() + " has lifecycle complete set, skipping " + RolloverStep.NAME);
listener.onResponse(true);
return;
}
IndexAbstraction indexAbstraction = currentClusterState.metadata().getIndicesLookup().get(indexName);
assert indexAbstraction != null : "expected the index " + indexName + " to exist in the lookup but it didn't";
final String rolloverTarget;
if (indexAbstraction.getParentDataStream() != null) {
rolloverTarget = indexAbstraction.getParentDataStream().getName();
} else {
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());

String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
if (Strings.isNullOrEmpty(rolloverAlias)) {
listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
"setting [%s] for index [%s] is empty or not defined, it must be set to the name of the alias pointing to the group " +
"of indices being rolled over", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, indexName)));
return;
}

if (Strings.isNullOrEmpty(rolloverAlias)) {
listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
"setting [%s] for index [%s] is empty or not defined", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS,
indexMetadata.getIndex().getName())));
return;
}
if (indexMetadata.getRolloverInfos().get(rolloverAlias) != null) {
logger.info("index [{}] was already rolled over for alias [{}], not attempting to roll over again",
indexName, rolloverAlias);
listener.onResponse(true);
return;
}

if (indexMetadata.getRolloverInfos().get(rolloverAlias) != null) {
logger.info("index [{}] was already rolled over for alias [{}], not attempting to roll over again",
indexMetadata.getIndex().getName(), rolloverAlias);
listener.onResponse(true);
return;
}
if (indexMetadata.getAliases().containsKey(rolloverAlias) == false) {
listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
"%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias,
indexName)));
return;
}

if (indexMetadata.getAliases().containsKey(rolloverAlias) == false) {
listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
"%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias,
indexMetadata.getIndex().getName())));
return;
rolloverTarget = rolloverAlias;
}

// Calling rollover with no conditions will always roll over the index
RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null)
RolloverRequest rolloverRequest = new RolloverRequest(rolloverTarget, null)
.masterNodeTimeout(getMasterTimeout(currentClusterState));
// We don't wait for active shards when we perform the rollover because the
// {@link org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep} step will do so
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -49,16 +50,16 @@ public int getMaxNumSegments() {
}

@Override
public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) {
getClient().admin().indices().segments(new IndicesSegmentsRequest(indexMetadata.getIndex().getName()),
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
getClient().admin().indices().segments(new IndicesSegmentsRequest(index.getName()),
ActionListener.wrap(response -> {
IndexSegments idxSegments = response.getIndices().get(indexMetadata.getIndex().getName());
IndexSegments idxSegments = response.getIndices().get(index.getName());
if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) {
final DefaultShardOperationFailedException[] failures = response.getShardFailures();
logger.info("[{}] retrieval of segment counts after force merge did not succeed, " +
"there were {} shard failures. " +
"failures: {}",
indexMetadata.getIndex().getName(),
index.getName(),
response.getFailedShards(),
failures == null ? "n/a" : Strings.collectionToDelimitedString(Arrays.stream(failures)
.map(Strings::toString)
Expand All @@ -73,7 +74,7 @@ public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, Ti
Map<ShardRouting, Integer> unmergedShardCounts = unmergedShards.stream()
.collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size()));
logger.info("[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}",
indexMetadata.getIndex().getName(), maxNumSegments, unmergedShards.size(), unmergedShardCounts);
index.getName(), maxNumSegments, unmergedShards.size(), unmergedShardCounts);
}
// Force merging is best effort, so always return true that the condition has been met.
listener.onResponse(true, new Info(unmergedShards.size()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -52,16 +53,11 @@ public ClusterState performAction(Index index, ClusterState currentState) {
// so just use the current time.
newIndexTime = fallbackTimeSupplier.getAsLong();
} else {
// find the newly created index from the rollover and fetch its index.creation_date
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
if (Strings.isNullOrEmpty(rolloverAlias)) {
throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
+ "] is not set on index [" + indexMetadata.getIndex().getName() + "]");
}
RolloverInfo rolloverInfo = indexMetadata.getRolloverInfos().get(rolloverAlias);
final String rolloverTarget = getRolloverTarget(index, currentState);
RolloverInfo rolloverInfo = indexMetadata.getRolloverInfos().get(rolloverTarget);
if (rolloverInfo == null) {
throw new IllegalStateException("no rollover info found for [" + indexMetadata.getIndex().getName() + "] with alias [" +
rolloverAlias + "], the index has not yet rolled over with that alias");
throw new IllegalStateException("no rollover info found for [" + indexMetadata.getIndex().getName() +
"] with rollover target [" + rolloverTarget + "], the index has not yet rolled over with that target");
}
newIndexTime = rolloverInfo.getTime();
}
Expand All @@ -76,6 +72,24 @@ public ClusterState performAction(Index index, ClusterState currentState) {
.put(newIndexMetadata)).build();
}

private static String getRolloverTarget(Index index, ClusterState currentState) {
IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(index.getName());
final String rolloverTarget;
if (indexAbstraction.getParentDataStream() != null) {
rolloverTarget = indexAbstraction.getParentDataStream().getName();
} else {
// find the newly created index from the rollover and fetch its index.creation_date
IndexMetadata indexMetadata = currentState.metadata().index(index);
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
if (Strings.isNullOrEmpty(rolloverAlias)) {
throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
+ "] is not set on index [" + indexMetadata.getIndex().getName() + "]");
}
rolloverTarget = rolloverAlias;
}
return rolloverTarget;
}

@Override
public int hashCode() {
return super.hashCode();
Expand Down
Loading