Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ILM: add support for rolling over data streams #57295

Merged
merged 10 commits into from
Jun 2, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -1245,12 +1245,6 @@ public void testIndexPutSettings() throws IOException {
+ "reason=final index setting [index.number_of_shards], not updateable"));
}

@SuppressWarnings("unchecked")
private Map<String, Object> getIndexSettingsAsMap(String index) throws IOException {
Map<String, Object> indexSettings = getIndexSettings(index);
return (Map<String, Object>)((Map<String, Object>) indexSettings.get(index)).get("settings");
}

public void testIndexPutSettingNonExistent() throws IOException {

String index = "index";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,10 +731,4 @@ public void testSoftDeletesDisabledWarning() throws Exception {
ensureGreen(indexName);
indexDocs(indexName, randomInt(100), randomInt(100));
}

@SuppressWarnings("unchecked")
private Map<String, Object> getIndexSettingsAsMap(String index) throws IOException {
Map<String, Object> indexSettings = getIndexSettings(index);
return (Map<String, Object>)((Map<String, Object>) indexSettings.get(index)).get("settings");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,12 @@ protected static Map<String, Object> getIndexSettings(String index) throws IOExc
}
}

@SuppressWarnings("unchecked")
protected Map<String, Object> getIndexSettingsAsMap(String index) throws IOException {
Map<String, Object> indexSettings = getIndexSettings(index);
return (Map<String, Object>)((Map<String, Object>) indexSettings.get(index)).get("settings");
}

protected static boolean indexExists(String index) throws IOException {
Response response = client().performRequest(new Request("HEAD", "/" + index));
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;

/**
* A step which will be called periodically, waiting for some condition to become true.
* Called asynchronously, as the condition may take time to check.
*
* <p>
* If checking something based on the current cluster state which does not take time to check, use {@link ClusterStateWaitStep}.
*/
public abstract class AsyncWaitStep extends Step {
Expand All @@ -29,7 +30,7 @@ protected Client getClient() {
return client;
}

public abstract void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout);
public abstract void evaluateCondition(Metadata metadata, IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does seem strange to pass both the Metadata and the IndexMetadata, should we instead pass Metadata and Index so it's easy to look up the index metadata?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, fair enough, it felt a bit odd to me too (it equally felt wasteful, in terms of CPU cycles, to re-do the lookup every time though, but given ILM is not so much about low latency I agree it makes sense to change it)


public interface Listener {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;

Expand All @@ -39,38 +40,46 @@ public boolean isRetryable() {
@Override
public void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState,
ClusterStateObserver observer, Listener listener) {
boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(indexMetadata.getSettings());
if (indexingComplete) {
logger.trace(indexMetadata.getIndex() + " has lifecycle complete set, skipping " + RolloverStep.NAME);
listener.onResponse(true);
return;
}
IndexAbstraction indexAbstraction = currentClusterState.metadata().getIndicesLookup().get(indexMetadata.getIndex().getName());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super minor, but can you add an

assert indexAbstraction != null : "expected the index " + indexName + " to exist in the lookup but it didn't";

after this line? I don't think it's going to happen, but we should check it regardless. Optionally, we could make it a real error also (throw an IllegalStateException)

final String rolloverTarget;
if (indexAbstraction.getParentDataStream() != null) {
rolloverTarget = indexAbstraction.getParentDataStream().getDataStream().getName();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be super paranoid, I think we should handle the case where getDataStream() returns null

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that can be changed to:

rolloverTarget = indexAbstraction.getParentDataStream().getName();

to eliminate the need for another null check.

} else {
boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(indexMetadata.getSettings());
if (indexingComplete) {
logger.trace(indexMetadata.getIndex() + " has lifecycle complete set, skipping " + RolloverStep.NAME);
listener.onResponse(true);
return;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we allow this indexing complete setting regardless of whether the parent data stream exists or not? (in otherwords, moving it before the if (indexAbstraction.getParentDataStream() != null) { check)


String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());

if (Strings.isNullOrEmpty(rolloverAlias)) {
listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
"setting [%s] for index [%s] is empty or not defined", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS,
indexMetadata.getIndex().getName())));
return;
}
if (Strings.isNullOrEmpty(rolloverAlias)) {
listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
"setting [%s] for index [%s] is empty or not defined", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a chance we can improve this error message (if you agree), maybe something like:

setting [index.lifecycle.rollover_alias] for index [foo-1] is not defined, it must be set to the name of the alias pointing to the group of indices being rolled over

I'm not stuck on the wording, maybe you have a better idea?

Copy link
Contributor Author

@andreidan andreidan May 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great point, I think the wording sounds good

indexMetadata.getIndex().getName())));
return;
}

if (indexMetadata.getRolloverInfos().get(rolloverAlias) != null) {
logger.info("index [{}] was already rolled over for alias [{}], not attempting to roll over again",
indexMetadata.getIndex().getName(), rolloverAlias);
listener.onResponse(true);
return;
}
if (indexMetadata.getRolloverInfos().get(rolloverAlias) != null) {
logger.info("index [{}] was already rolled over for alias [{}], not attempting to roll over again",
indexMetadata.getIndex().getName(), rolloverAlias);
listener.onResponse(true);
return;
}

if (indexMetadata.getAliases().containsKey(rolloverAlias) == false) {
listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
"%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias,
indexMetadata.getIndex().getName())));
return;
}

if (indexMetadata.getAliases().containsKey(rolloverAlias) == false) {
listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
"%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias,
indexMetadata.getIndex().getName())));
return;
rolloverTarget = rolloverAlias;
}

// Calling rollover with no conditions will always roll over the index
RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null)
RolloverRequest rolloverRequest = new RolloverRequest(rolloverTarget, null)
.masterNodeTimeout(getMasterTimeout(currentClusterState));
// We don't wait for active shards when we perform the rollover because the
// {@link org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep} step will do so
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -49,7 +50,7 @@ public int getMaxNumSegments() {
}

@Override
public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) {
public void evaluateCondition(Metadata metadata, IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) {
getClient().admin().indices().segments(new IndicesSegmentsRequest(indexMetadata.getIndex().getName()),
ActionListener.wrap(response -> {
IndexSegments idxSegments = response.getIndices().get(indexMetadata.getIndex().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -52,16 +53,11 @@ public ClusterState performAction(Index index, ClusterState currentState) {
// so just use the current time.
newIndexTime = fallbackTimeSupplier.getAsLong();
} else {
// find the newly created index from the rollover and fetch its index.creation_date
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
if (Strings.isNullOrEmpty(rolloverAlias)) {
throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
+ "] is not set on index [" + indexMetadata.getIndex().getName() + "]");
}
RolloverInfo rolloverInfo = indexMetadata.getRolloverInfos().get(rolloverAlias);
final String rolloverTarget = getRolloverTarget(index, currentState);
RolloverInfo rolloverInfo = indexMetadata.getRolloverInfos().get(rolloverTarget);
if (rolloverInfo == null) {
throw new IllegalStateException("no rollover info found for [" + indexMetadata.getIndex().getName() + "] with alias [" +
rolloverAlias + "], the index has not yet rolled over with that alias");
throw new IllegalStateException("no rollover info found for [" + indexMetadata.getIndex().getName() +
"] with rollover target [" + rolloverTarget + "], the index has not yet rolled over with that target");
}
newIndexTime = rolloverInfo.getTime();
}
Expand All @@ -76,6 +72,24 @@ public ClusterState performAction(Index index, ClusterState currentState) {
.put(newIndexMetadata)).build();
}

private String getRolloverTarget(Index index, ClusterState currentState) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be a static method?

IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(index.getName());
final String rolloverTarget;
if (indexAbstraction.getParentDataStream() != null) {
rolloverTarget = indexAbstraction.getParentDataStream().getDataStream().getName();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment here about null check for the getDataStream(), I also wonder if maybe we should make this a nice static helper like IndexAbstraction.streamNameOrNull(indexAbstraction)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The underlying dataStream is non-nullable though. The getName method that Dan recommended emphasis this so replaced it here as well. Happy to discuss if there are any situations where this could be null though (in which case I believe more null checks are needed in IndexAbstraction.DataStream)

} else {
// find the newly created index from the rollover and fetch its index.creation_date
IndexMetadata indexMetadata = currentState.metadata().index(index);
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
if (Strings.isNullOrEmpty(rolloverAlias)) {
throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
+ "] is not set on index [" + indexMetadata.getIndex().getName() + "]");
}
rolloverTarget = rolloverAlias;
}
return rolloverTarget;
}

@Override
public int hashCode() {
return super.hashCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
Expand Down Expand Up @@ -64,43 +65,48 @@ public Result isConditionMet(Index index, ClusterState clusterState) {
return new Result(true, new Info(message));
}

String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(originalIndexMeta.getSettings());
if (Strings.isNullOrEmpty(rolloverAlias)) {
throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
+ "] is not set on index [" + originalIndexMeta.getIndex().getName() + "]");
}

IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(rolloverAlias);
assert indexAbstraction.getType() == IndexAbstraction.Type.ALIAS : rolloverAlias + " must be an alias but it is not";

IndexMetadata aliasWriteIndex = indexAbstraction.getWriteIndex();
IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(index.getName());
final String rolledIndexName;
final String waitForActiveShardsSettingValue;
if (aliasWriteIndex != null) {
rolledIndexName = aliasWriteIndex.getIndex().getName();
waitForActiveShardsSettingValue = aliasWriteIndex.getSettings().get("index.write.wait_for_active_shards");
if (indexAbstraction.getParentDataStream() != null) {
DataStream dataStream = indexAbstraction.getParentDataStream().getDataStream();
rolledIndexName = DataStream.getBackingIndexName(dataStream.getName(), dataStream.getGeneration());
Copy link
Contributor

@danhermann danhermann May 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: this correctly identifies the data stream's write index under the current implementation of data streams. The IndexAbstraction::getWriteIndex method will always return the correct write index in the event that we change the logic around backing index names, generations, etc., and would slightly simplify the code above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, Dan. Pushed a fix to change this.

IndexMetadata rolledIndexMeta = clusterState.metadata().index(rolledIndexName);
if (rolledIndexMeta == null) {
return getErrorResultOnNullMetadata(index);
}
waitForActiveShardsSettingValue = rolledIndexMeta.getSettings().get("index.write.wait_for_active_shards");
} else {
List<IndexMetadata> indices = indexAbstraction.getIndices();
int maxIndexCounter = -1;
IndexMetadata rolledIndexMeta = null;
for (IndexMetadata indexMetadata : indices) {
int indexNameCounter = parseIndexNameCounter(indexMetadata.getIndex().getName());
if (maxIndexCounter < indexNameCounter) {
maxIndexCounter = indexNameCounter;
rolledIndexMeta = indexMetadata;
}
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(originalIndexMeta.getSettings());
if (Strings.isNullOrEmpty(rolloverAlias)) {
throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
+ "] is not set on index [" + originalIndexMeta.getIndex().getName() + "]");
}
if (rolledIndexMeta == null) {
String errorMessage = String.format(Locale.ROOT,
"unable to find the index that was rolled over from [%s] as part of lifecycle action [%s]", index.getName(),
getKey().getAction());

// Index must have been since deleted
logger.debug(errorMessage);
return new Result(false, new Info(errorMessage));
IndexAbstraction aliasAbstraction = clusterState.metadata().getIndicesLookup().get(rolloverAlias);
assert aliasAbstraction.getType() == IndexAbstraction.Type.ALIAS : rolloverAlias + " must be an alias but it is not";

IndexMetadata aliasWriteIndex = aliasAbstraction.getWriteIndex();
if (aliasWriteIndex != null) {
rolledIndexName = aliasWriteIndex.getIndex().getName();
waitForActiveShardsSettingValue = aliasWriteIndex.getSettings().get("index.write.wait_for_active_shards");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super minor, but can we use IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS instead of hardcoding here?

} else {
List<IndexMetadata> indices = aliasAbstraction.getIndices();
int maxIndexCounter = -1;
IndexMetadata rolledIndexMeta = null;
for (IndexMetadata indexMetadata : indices) {
int indexNameCounter = parseIndexNameCounter(indexMetadata.getIndex().getName());
if (maxIndexCounter < indexNameCounter) {
maxIndexCounter = indexNameCounter;
rolledIndexMeta = indexMetadata;
}
}
if (rolledIndexMeta == null) {
return getErrorResultOnNullMetadata(index);
}
rolledIndexName = rolledIndexMeta.getIndex().getName();
waitForActiveShardsSettingValue = rolledIndexMeta.getSettings().get("index.write.wait_for_active_shards");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here about using IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS

}
rolledIndexName = rolledIndexMeta.getIndex().getName();
waitForActiveShardsSettingValue = rolledIndexMeta.getSettings().get("index.write.wait_for_active_shards");
}

ActiveShardCount activeShardCount = ActiveShardCount.parseString(waitForActiveShardsSettingValue);
Expand All @@ -114,6 +120,16 @@ public Result isConditionMet(Index index, ClusterState clusterState) {
return new Result(enoughShardsActive, new ActiveShardsInfo(currentActiveShards, activeShardCount.toString(), enoughShardsActive));
}

private Result getErrorResultOnNullMetadata(Index originalIndex) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be static I think

String errorMessage = String.format(Locale.ROOT,
"unable to find the index that was rolled over from [%s] as part of lifecycle action [%s]", originalIndex.getName(),
getKey().getAction());

// Index must have been since deleted
logger.debug(errorMessage);
return new Result(false, new Info(errorMessage));
}

/**
* Parses the number from the rolled over index name. It also supports the date-math format (ie. index name is wrapped in &lt; and &gt;)
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -33,7 +34,7 @@ final class WaitForFollowShardTasksStep extends AsyncWaitStep {
}

@Override
public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) {
public void evaluateCondition(Metadata metadata, IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) {
Map<String, String> customIndexMetadata = indexMetadata.getCustomData(CCR_METADATA_KEY);
if (customIndexMetadata == null) {
listener.onResponse(true, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
Expand Down Expand Up @@ -43,7 +44,7 @@ public class WaitForNoFollowersStep extends AsyncWaitStep {
}

@Override
public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) {
public void evaluateCondition(Metadata metadata, IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) {
IndicesStatsRequest request = new IndicesStatsRequest();
request.clear();
String indexName = indexMetadata.getIndex().getName();
Expand Down
Loading