diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java index 958b6ad813430..826cf23edd853 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java @@ -1521,12 +1521,6 @@ public void testIndexPutSettings() throws IOException { + "reason=final index setting [index.number_of_shards], not updateable")); } - @SuppressWarnings("unchecked") - private Map getIndexSettingsAsMap(String index) throws IOException { - Map indexSettings = getIndexSettings(index); - return (Map)((Map) indexSettings.get(index)).get("settings"); - } - public void testIndexPutSettingNonExistent() throws IOException { String index = "index"; diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index c2f7b5afc6f88..c5b1fa484be80 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -799,10 +799,4 @@ public void testSoftDeletesDisabledWarning() throws Exception { ensureGreen(indexName); indexDocs(indexName, randomInt(100), randomInt(100)); } - - @SuppressWarnings("unchecked") - private Map getIndexSettingsAsMap(String index) throws IOException { - Map indexSettings = getIndexSettings(index); - return (Map)((Map) indexSettings.get(index)).get("settings"); - } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index d7aa7029a78cc..3f79d48bb3d59 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -1139,6 +1139,12 @@ protected static Map getIndexSettings(String index) throws IOExc } } + @SuppressWarnings("unchecked") + protected Map getIndexSettingsAsMap(String index) throws IOException { + Map indexSettings = getIndexSettings(index); + return (Map)((Map) indexSettings.get(index)).get("settings"); + } + protected static boolean indexExists(String index) throws IOException { Response response = client().performRequest(new Request("HEAD", "/" + index)); return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncWaitStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncWaitStep.java index 586c421bf8042..22085a95a2d0e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncWaitStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncWaitStep.java @@ -6,14 +6,15 @@ package org.elasticsearch.xpack.core.ilm; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.index.Index; /** * A step which will be called periodically, waiting for some condition to become true. * Called asynchronously, as the condition may take time to check. - * + *

* If checking something based on the current cluster state which does not take time to check, use {@link ClusterStateWaitStep}. */ public abstract class AsyncWaitStep extends Step { @@ -29,7 +30,7 @@ protected Client getClient() { return client; } - public abstract void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout); + public abstract void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout); public interface Listener { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java index 6d5ea0e6f308f..2993a7624f06a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java @@ -13,6 +13,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.Strings; @@ -39,38 +40,47 @@ public boolean isRetryable() { @Override public void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState, ClusterStateObserver observer, Listener listener) { + String indexName = indexMetadata.getIndex().getName(); boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(indexMetadata.getSettings()); if (indexingComplete) { logger.trace(indexMetadata.getIndex() + " has lifecycle complete set, skipping " + RolloverStep.NAME); listener.onResponse(true); return; } + IndexAbstraction indexAbstraction = currentClusterState.metadata().getIndicesLookup().get(indexName); + assert indexAbstraction != null : "expected the index " + indexName + " to exist in the lookup but it didn't"; + final String rolloverTarget; + if (indexAbstraction.getParentDataStream() != null) { + rolloverTarget = indexAbstraction.getParentDataStream().getName(); + } else { + String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings()); - String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings()); + if (Strings.isNullOrEmpty(rolloverAlias)) { + listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT, + "setting [%s] for index [%s] is empty or not defined, it must be set to the name of the alias pointing to the group " + + "of indices being rolled over", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, indexName))); + return; + } - if (Strings.isNullOrEmpty(rolloverAlias)) { - listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT, - "setting [%s] for index [%s] is empty or not defined", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, - indexMetadata.getIndex().getName()))); - return; - } + if (indexMetadata.getRolloverInfos().get(rolloverAlias) != null) { + logger.info("index [{}] was already rolled over for alias [{}], not attempting to roll over again", + indexName, rolloverAlias); + listener.onResponse(true); + return; + } - if (indexMetadata.getRolloverInfos().get(rolloverAlias) != null) { - logger.info("index [{}] was already rolled over for alias [{}], not attempting to roll over again", - indexMetadata.getIndex().getName(), rolloverAlias); - listener.onResponse(true); - return; - } + if (indexMetadata.getAliases().containsKey(rolloverAlias) == false) { + listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT, + "%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias, + indexName))); + return; + } - if (indexMetadata.getAliases().containsKey(rolloverAlias) == false) { - listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT, - "%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias, - indexMetadata.getIndex().getName()))); - return; + rolloverTarget = rolloverAlias; } // Calling rollover with no conditions will always roll over the index - RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null) + RolloverRequest rolloverRequest = new RolloverRequest(rolloverTarget, null) .masterNodeTimeout(getMasterTimeout(currentClusterState)); // We don't wait for active shards when we perform the rollover because the // {@link org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep} step will do so diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java index 040fa8fcf03e8..9978d4cec21ba 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java @@ -13,7 +13,7 @@ import org.elasticsearch.action.admin.indices.segments.ShardSegments; import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; @@ -21,6 +21,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; import java.io.IOException; import java.util.Arrays; @@ -49,16 +50,16 @@ public int getMaxNumSegments() { } @Override - public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) { - getClient().admin().indices().segments(new IndicesSegmentsRequest(indexMetadata.getIndex().getName()), + public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) { + getClient().admin().indices().segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(response -> { - IndexSegments idxSegments = response.getIndices().get(indexMetadata.getIndex().getName()); + IndexSegments idxSegments = response.getIndices().get(index.getName()); if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) { final DefaultShardOperationFailedException[] failures = response.getShardFailures(); logger.info("[{}] retrieval of segment counts after force merge did not succeed, " + "there were {} shard failures. " + "failures: {}", - indexMetadata.getIndex().getName(), + index.getName(), response.getFailedShards(), failures == null ? "n/a" : Strings.collectionToDelimitedString(Arrays.stream(failures) .map(Strings::toString) @@ -73,7 +74,7 @@ public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, Ti Map unmergedShardCounts = unmergedShards.stream() .collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size())); logger.info("[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}", - indexMetadata.getIndex().getName(), maxNumSegments, unmergedShards.size(), unmergedShardCounts); + index.getName(), maxNumSegments, unmergedShards.size(), unmergedShardCounts); } // Force merging is best effort, so always return true that the condition has been met. listener.onResponse(true, new Info(unmergedShards.size())); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStep.java index 6700e22c04c55..528485402690c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStep.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.rollover.RolloverInfo; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.Strings; @@ -52,16 +53,11 @@ public ClusterState performAction(Index index, ClusterState currentState) { // so just use the current time. newIndexTime = fallbackTimeSupplier.getAsLong(); } else { - // find the newly created index from the rollover and fetch its index.creation_date - String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings()); - if (Strings.isNullOrEmpty(rolloverAlias)) { - throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS - + "] is not set on index [" + indexMetadata.getIndex().getName() + "]"); - } - RolloverInfo rolloverInfo = indexMetadata.getRolloverInfos().get(rolloverAlias); + final String rolloverTarget = getRolloverTarget(index, currentState); + RolloverInfo rolloverInfo = indexMetadata.getRolloverInfos().get(rolloverTarget); if (rolloverInfo == null) { - throw new IllegalStateException("no rollover info found for [" + indexMetadata.getIndex().getName() + "] with alias [" + - rolloverAlias + "], the index has not yet rolled over with that alias"); + throw new IllegalStateException("no rollover info found for [" + indexMetadata.getIndex().getName() + + "] with rollover target [" + rolloverTarget + "], the index has not yet rolled over with that target"); } newIndexTime = rolloverInfo.getTime(); } @@ -76,6 +72,24 @@ public ClusterState performAction(Index index, ClusterState currentState) { .put(newIndexMetadata)).build(); } + private static String getRolloverTarget(Index index, ClusterState currentState) { + IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(index.getName()); + final String rolloverTarget; + if (indexAbstraction.getParentDataStream() != null) { + rolloverTarget = indexAbstraction.getParentDataStream().getName(); + } else { + // find the newly created index from the rollover and fetch its index.creation_date + IndexMetadata indexMetadata = currentState.metadata().index(index); + String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings()); + if (Strings.isNullOrEmpty(rolloverAlias)) { + throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS + + "] is not set on index [" + indexMetadata.getIndex().getName() + "]"); + } + rolloverTarget = rolloverAlias; + } + return rolloverTarget; + } + @Override public int hashCode() { return super.hashCode(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsStep.java index 53444e4849ae0..8c695a48ffee1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsStep.java @@ -10,8 +10,10 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.common.ParseField; @@ -46,7 +48,8 @@ public boolean isRetryable() { @Override public Result isConditionMet(Index index, ClusterState clusterState) { - IndexMetadata originalIndexMeta = clusterState.metadata().index(index); + Metadata metadata = clusterState.metadata(); + IndexMetadata originalIndexMeta = metadata.index(index); if (originalIndexMeta == null) { String errorMessage = String.format(Locale.ROOT, "[%s] lifecycle action for index [%s] executed but index no longer exists", @@ -64,43 +67,50 @@ public Result isConditionMet(Index index, ClusterState clusterState) { return new Result(true, new Info(message)); } - String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(originalIndexMeta.getSettings()); - if (Strings.isNullOrEmpty(rolloverAlias)) { - throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS - + "] is not set on index [" + originalIndexMeta.getIndex().getName() + "]"); - } - - IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(rolloverAlias); - assert indexAbstraction.getType() == IndexAbstraction.Type.ALIAS : rolloverAlias + " must be an alias but it is not"; - - IndexMetadata aliasWriteIndex = indexAbstraction.getWriteIndex(); + IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(index.getName()); final String rolledIndexName; final String waitForActiveShardsSettingValue; - if (aliasWriteIndex != null) { - rolledIndexName = aliasWriteIndex.getIndex().getName(); - waitForActiveShardsSettingValue = aliasWriteIndex.getSettings().get("index.write.wait_for_active_shards"); + if (indexAbstraction.getParentDataStream() != null) { + DataStream dataStream = indexAbstraction.getParentDataStream().getDataStream(); + IndexAbstraction dataStreamAbstraction = metadata.getIndicesLookup().get(dataStream.getName()); + assert dataStreamAbstraction != null : dataStream.getName() + " datastream is not present in the metadata indices lookup"; + IndexMetadata rolledIndexMeta = dataStreamAbstraction.getWriteIndex(); + if (rolledIndexMeta == null) { + return getErrorResultOnNullMetadata(getKey(), index); + } + rolledIndexName = rolledIndexMeta.getIndex().getName(); + waitForActiveShardsSettingValue = rolledIndexMeta.getSettings().get(IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey()); } else { - List indices = indexAbstraction.getIndices(); - int maxIndexCounter = -1; - IndexMetadata rolledIndexMeta = null; - for (IndexMetadata indexMetadata : indices) { - int indexNameCounter = parseIndexNameCounter(indexMetadata.getIndex().getName()); - if (maxIndexCounter < indexNameCounter) { - maxIndexCounter = indexNameCounter; - rolledIndexMeta = indexMetadata; - } + String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(originalIndexMeta.getSettings()); + if (Strings.isNullOrEmpty(rolloverAlias)) { + throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS + + "] is not set on index [" + originalIndexMeta.getIndex().getName() + "]"); } - if (rolledIndexMeta == null) { - String errorMessage = String.format(Locale.ROOT, - "unable to find the index that was rolled over from [%s] as part of lifecycle action [%s]", index.getName(), - getKey().getAction()); - // Index must have been since deleted - logger.debug(errorMessage); - return new Result(false, new Info(errorMessage)); + IndexAbstraction aliasAbstraction = metadata.getIndicesLookup().get(rolloverAlias); + assert aliasAbstraction.getType() == IndexAbstraction.Type.ALIAS : rolloverAlias + " must be an alias but it is not"; + + IndexMetadata aliasWriteIndex = aliasAbstraction.getWriteIndex(); + if (aliasWriteIndex != null) { + rolledIndexName = aliasWriteIndex.getIndex().getName(); + waitForActiveShardsSettingValue = aliasWriteIndex.getSettings().get(IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey()); + } else { + List indices = aliasAbstraction.getIndices(); + int maxIndexCounter = -1; + IndexMetadata rolledIndexMeta = null; + for (IndexMetadata indexMetadata : indices) { + int indexNameCounter = parseIndexNameCounter(indexMetadata.getIndex().getName()); + if (maxIndexCounter < indexNameCounter) { + maxIndexCounter = indexNameCounter; + rolledIndexMeta = indexMetadata; + } + } + if (rolledIndexMeta == null) { + return getErrorResultOnNullMetadata(getKey(), index); + } + rolledIndexName = rolledIndexMeta.getIndex().getName(); + waitForActiveShardsSettingValue = rolledIndexMeta.getSettings().get("index.write.wait_for_active_shards"); } - rolledIndexName = rolledIndexMeta.getIndex().getName(); - waitForActiveShardsSettingValue = rolledIndexMeta.getSettings().get("index.write.wait_for_active_shards"); } ActiveShardCount activeShardCount = ActiveShardCount.parseString(waitForActiveShardsSettingValue); @@ -114,6 +124,16 @@ public Result isConditionMet(Index index, ClusterState clusterState) { return new Result(enoughShardsActive, new ActiveShardsInfo(currentActiveShards, activeShardCount.toString(), enoughShardsActive)); } + private static Result getErrorResultOnNullMetadata(StepKey key, Index originalIndex) { + String errorMessage = String.format(Locale.ROOT, + "unable to find the index that was rolled over from [%s] as part of lifecycle action [%s]", originalIndex.getName(), + key.getAction()); + + // Index must have been since deleted + logger.debug(errorMessage); + return new Result(false, new Info(errorMessage)); + } + /** * Parses the number from the rolled over index name. It also supports the date-math format (ie. index name is wrapped in < and >) *

diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForFollowShardTasksStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForFollowShardTasksStep.java index 5cd1302de193c..aef26c2a9e109 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForFollowShardTasksStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForFollowShardTasksStep.java @@ -8,11 +8,13 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; @@ -33,7 +35,8 @@ final class WaitForFollowShardTasksStep extends AsyncWaitStep { } @Override - public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) { + public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) { + IndexMetadata indexMetadata = metadata.index(index); Map customIndexMetadata = indexMetadata.getCustomData(CCR_METADATA_KEY); if (customIndexMetadata == null) { listener.onResponse(true, null); @@ -41,7 +44,7 @@ public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, Ti } FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest(); - request.setIndices(new String[]{indexMetadata.getIndex().getName()}); + request.setIndices(new String[]{index.getName()}); getClient().execute(FollowStatsAction.INSTANCE, request, ActionListener.wrap(r -> handleResponse(r, listener), listener::onFailure)); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForNoFollowersStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForNoFollowersStep.java index f6e84d9fe7d77..5fe05f3c6c98e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForNoFollowersStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForNoFollowersStep.java @@ -13,11 +13,12 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; import java.io.IOException; import java.util.Arrays; @@ -43,17 +44,16 @@ public class WaitForNoFollowersStep extends AsyncWaitStep { } @Override - public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) { + public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) { IndicesStatsRequest request = new IndicesStatsRequest(); request.clear(); - String indexName = indexMetadata.getIndex().getName(); + String indexName = index.getName(); request.indices(indexName); getClient().admin().indices().stats(request, ActionListener.wrap((response) -> { IndexStats indexStats = response.getIndex(indexName); if (indexStats == null) { // Index was probably deleted - logger.debug("got null shard stats for index {}, proceeding on the assumption it has been deleted", - indexMetadata.getIndex()); + logger.debug("got null shard stats for index {}, proceeding on the assumption it has been deleted", indexName); listener.onResponse(true, null); return; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStep.java index 1aa3f4cce3549..1f27d66fc7d4b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStep.java @@ -11,12 +11,15 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; import java.io.IOException; import java.util.Locale; @@ -48,73 +51,83 @@ public boolean isRetryable() { } @Override - public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) { - String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings()); - - if (Strings.isNullOrEmpty(rolloverAlias)) { - listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT, - "setting [%s] for index [%s] is empty or not defined", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, - indexMetadata.getIndex().getName()))); - return; - } + public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) { + IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(index.getName()); + assert indexAbstraction != null : "invalid cluster metadata. index [" + index.getName() + "] was not found"; + final String rolloverTarget; + if (indexAbstraction.getParentDataStream() != null) { + rolloverTarget = indexAbstraction.getParentDataStream().getName(); + } else { + IndexMetadata indexMetadata = metadata.index(index); + String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings()); + + if (Strings.isNullOrEmpty(rolloverAlias)) { + listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT, + "setting [%s] for index [%s] is empty or not defined", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, + index.getName()))); + return; + } - if (indexMetadata.getRolloverInfos().get(rolloverAlias) != null) { - logger.info("index [{}] was already rolled over for alias [{}], not attempting to roll over again", - indexMetadata.getIndex().getName(), rolloverAlias); - listener.onResponse(true, new WaitForRolloverReadyStep.EmptyInfo()); - return; - } + if (indexMetadata.getRolloverInfos().get(rolloverAlias) != null) { + logger.info("index [{}] was already rolled over for alias [{}], not attempting to roll over again", + index.getName(), rolloverAlias); + listener.onResponse(true, new WaitForRolloverReadyStep.EmptyInfo()); + return; + } - // The order of the following checks is important in ways which may not be obvious. + // The order of the following checks is important in ways which may not be obvious. - // First, figure out if 1) The configured alias points to this index, and if so, - // whether this index is the write alias for this index - boolean aliasPointsToThisIndex = indexMetadata.getAliases().containsKey(rolloverAlias); + // First, figure out if 1) The configured alias points to this index, and if so, + // whether this index is the write alias for this index + boolean aliasPointsToThisIndex = indexMetadata.getAliases().containsKey(rolloverAlias); - Boolean isWriteIndex = null; - if (aliasPointsToThisIndex) { - // The writeIndex() call returns a tri-state boolean: - // true -> this index is the write index for this alias - // false -> this index is not the write index for this alias - // null -> this alias is a "classic-style" alias and does not have a write index configured, but only points to one index - // and is thus the write index by default - isWriteIndex = indexMetadata.getAliases().get(rolloverAlias).writeIndex(); - } + Boolean isWriteIndex = null; + if (aliasPointsToThisIndex) { + // The writeIndex() call returns a tri-state boolean: + // true -> this index is the write index for this alias + // false -> this index is not the write index for this alias + // null -> this alias is a "classic-style" alias and does not have a write index configured, but only points to one index + // and is thus the write index by default + isWriteIndex = indexMetadata.getAliases().get(rolloverAlias).writeIndex(); + } - boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(indexMetadata.getSettings()); - if (indexingComplete) { - logger.trace(indexMetadata.getIndex() + " has lifecycle complete set, skipping " + WaitForRolloverReadyStep.NAME); - // If this index is still the write index for this alias, skipping rollover and continuing with the policy almost certainly - // isn't what we want, as something likely still expects to be writing to this index. - // If the alias doesn't point to this index, that's okay as that will be the result if this index is using a - // "classic-style" alias and has already rolled over, and we want to continue with the policy. - if (aliasPointsToThisIndex && Boolean.TRUE.equals(isWriteIndex)) { - listener.onFailure(new IllegalStateException(String.format(Locale.ROOT, - "index [%s] has [%s] set to [true], but is still the write index for alias [%s]", - indexMetadata.getIndex().getName(), LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, rolloverAlias))); + boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(indexMetadata.getSettings()); + if (indexingComplete) { + logger.trace(index + " has lifecycle complete set, skipping " + WaitForRolloverReadyStep.NAME); + // If this index is still the write index for this alias, skipping rollover and continuing with the policy almost certainly + // isn't what we want, as something likely still expects to be writing to this index. + // If the alias doesn't point to this index, that's okay as that will be the result if this index is using a + // "classic-style" alias and has already rolled over, and we want to continue with the policy. + if (aliasPointsToThisIndex && Boolean.TRUE.equals(isWriteIndex)) { + listener.onFailure(new IllegalStateException(String.format(Locale.ROOT, + "index [%s] has [%s] set to [true], but is still the write index for alias [%s]", + index.getName(), LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, rolloverAlias))); + return; + } + + listener.onResponse(true, new WaitForRolloverReadyStep.EmptyInfo()); return; } - listener.onResponse(true, new WaitForRolloverReadyStep.EmptyInfo()); - return; - } + // If indexing_complete is *not* set, and the alias does not point to this index, we can't roll over this index, so error out. + if (aliasPointsToThisIndex == false) { + listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT, + "%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias, + index.getName()))); + return; + } - // If indexing_complete is *not* set, and the alias does not point to this index, we can't roll over this index, so error out. - if (aliasPointsToThisIndex == false) { - listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT, - "%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias, - indexMetadata.getIndex().getName()))); - return; - } + // Similarly, if isWriteIndex is false (see note above on false vs. null), we can't roll over this index, so error out. + if (Boolean.FALSE.equals(isWriteIndex)) { + listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT, + "index [%s] is not the write index for alias [%s]", index.getName(), rolloverAlias))); + return; + } - // Similarly, if isWriteIndex is false (see note above on false vs. null), we can't roll over this index, so error out. - if (Boolean.FALSE.equals(isWriteIndex)) { - listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT, - "index [%s] is not the write index for alias [%s]", indexMetadata.getIndex().getName(), rolloverAlias))); - return; + rolloverTarget = rolloverAlias; } - RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null).masterNodeTimeout(masterTimeout); + RolloverRequest rolloverRequest = new RolloverRequest(rolloverTarget, null).masterNodeTimeout(masterTimeout); rolloverRequest.dryRun(true); if (maxAge != null) { rolloverRequest.addMaxIndexAgeCondition(maxAge); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/AbstractStepMasterTimeoutTestCase.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/AbstractStepMasterTimeoutTestCase.java index 3ac019c7a65cb..6c279df4ba4d5 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/AbstractStepMasterTimeoutTestCase.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/AbstractStepMasterTimeoutTestCase.java @@ -42,17 +42,19 @@ public void shutdownThreadPool() { } public void testMasterTimeout() { + IndexMetadata indexMetadata = getIndexMetadata(); checkMasterTimeout(TimeValue.timeValueSeconds(30), - ClusterState.builder(ClusterName.DEFAULT).metadata(Metadata.builder().build()).build()); + ClusterState.builder(ClusterName.DEFAULT).metadata(Metadata.builder().put(indexMetadata, true).build()).build(), indexMetadata); checkMasterTimeout(TimeValue.timeValueSeconds(10), ClusterState.builder(ClusterName.DEFAULT) .metadata(Metadata.builder() .persistentSettings(Settings.builder().put(LIFECYCLE_STEP_MASTER_TIMEOUT, "10s").build()) + .put(indexMetadata, true) .build()) - .build()); + .build(), indexMetadata); } - private void checkMasterTimeout(TimeValue timeValue, ClusterState currentClusterState) { + private void checkMasterTimeout(TimeValue timeValue, ClusterState currentClusterState, IndexMetadata indexMetadata) { AtomicBoolean timeoutChecked = new AtomicBoolean(); client = new NoOpClient(pool) { @Override @@ -65,7 +67,7 @@ protected void } } }; - createRandomInstance().performAction(getIndexMetadata(), currentClusterState, null, new AsyncActionStep.Listener() { + createRandomInstance().performAction(indexMetadata, currentClusterState, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverStepTests.java index 1652ef41b3fe9..8e4922dc639ce 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverStepTests.java @@ -12,13 +12,18 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverInfo; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; import org.elasticsearch.action.admin.indices.rollover.RolloverResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.mockito.Mockito; import java.util.Collections; +import java.util.List; import java.util.Locale; import static org.hamcrest.Matchers.equalTo; @@ -71,11 +76,11 @@ protected IndexMetadata getIndexMetadata() { return getIndexMetadata(randomAlphaOfLength(5)); } - private static void assertRolloverIndexRequest(RolloverRequest request, String alias) { + private static void assertRolloverIndexRequest(RolloverRequest request, String rolloverTarget) { assertNotNull(request); assertEquals(1, request.indices().length); - assertEquals(alias, request.indices()[0]); - assertEquals(alias, request.getRolloverTarget()); + assertEquals(rolloverTarget, request.indices()[0]); + assertEquals(rolloverTarget, request.getRolloverTarget()); assertFalse(request.isDryRun()); assertEquals(0, request.getConditions().size()); } @@ -86,17 +91,16 @@ public void testPerformAction() { RolloverStep step = createRandomInstance(); - Mockito.doAnswer(invocation -> { - RolloverRequest request = (RolloverRequest) invocation.getArguments()[0]; - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) invocation.getArguments()[1]; - assertRolloverIndexRequest(request, alias); - listener.onResponse(new RolloverResponse(null, null, Collections.emptyMap(), request.isDryRun(), true, true, true)); - return null; - }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any()); + mockClientRolloverCall(alias); SetOnce actionCompleted = new SetOnce<>(); - step.performAction(indexMetadata, emptyClusterState(), null, new AsyncActionStep.Listener() { + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata( + Metadata.builder() + .put(indexMetadata, true) + ) + .build(); + step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { @@ -116,6 +120,55 @@ public void onFailure(Exception e) { Mockito.verify(indicesClient, Mockito.only()).rolloverIndex(Mockito.any(), Mockito.any()); } + public void testPerformActionOnDataStream() { + String dataStreamName = "test-datastream"; + IndexMetadata indexMetadata = IndexMetadata.builder(dataStreamName + "-000001") + .settings(settings(Version.CURRENT)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + + RolloverStep step = createRandomInstance(); + + mockClientRolloverCall(dataStreamName); + + SetOnce actionCompleted = new SetOnce<>(); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata( + Metadata.builder() + .put(new DataStream(dataStreamName, "timestamp", List.of(indexMetadata.getIndex()), 1L)) + .put(indexMetadata, true) + ) + .build(); + step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() { + + @Override + public void onResponse(boolean complete) { + actionCompleted.set(complete); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected method call", e); + } + }); + + assertEquals(true, actionCompleted.get()); + + Mockito.verify(client, Mockito.only()).admin(); + Mockito.verify(adminClient, Mockito.only()).indices(); + Mockito.verify(indicesClient, Mockito.only()).rolloverIndex(Mockito.any(), Mockito.any()); + } + + private void mockClientRolloverCall(String rolloverTarget) { + Mockito.doAnswer(invocation -> { + RolloverRequest request = (RolloverRequest) invocation.getArguments()[0]; + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + assertRolloverIndexRequest(request, rolloverTarget); + listener.onResponse(new RolloverResponse(null, null, Collections.emptyMap(), request.isDryRun(), true, true, true)); + return null; + }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any()); + } + public void testPerformActionWithIndexingComplete() { String alias = randomAlphaOfLength(5); IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLength(10)) @@ -128,7 +181,13 @@ public void testPerformActionWithIndexingComplete() { RolloverStep step = createRandomInstance(); SetOnce actionCompleted = new SetOnce<>(); - step.performAction(indexMetadata, null, null, new AsyncActionStep.Listener() { + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata( + Metadata.builder() + .put(indexMetadata, true) + ) + .build(); + step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { @@ -156,8 +215,13 @@ public void testPerformActionSkipsRolloverForAlreadyRolledIndex() { .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); RolloverStep step = createRandomInstance(); - - step.performAction(indexMetadata, null, null, new AsyncActionStep.Listener() { + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata( + Metadata.builder() + .put(indexMetadata, true) + ) + .build(); + step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { @@ -189,7 +253,13 @@ public void testPerformActionFailure() { }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any()); SetOnce exceptionThrown = new SetOnce<>(); - step.performAction(indexMetadata, emptyClusterState(), null, new AsyncActionStep.Listener() { + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata( + Metadata.builder() + .put(indexMetadata, true) + ) + .build(); + step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { @@ -218,7 +288,13 @@ public void testPerformActionInvalidNullOrEmptyAlias() { RolloverStep step = createRandomInstance(); SetOnce exceptionThrown = new SetOnce<>(); - step.performAction(indexMetadata, null, null, new AsyncActionStep.Listener() { + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata( + Metadata.builder() + .put(indexMetadata, true) + ) + .build(); + step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { throw new AssertionError("Unexpected method call"); @@ -231,8 +307,8 @@ public void onFailure(Exception e) { }); assertThat(exceptionThrown.get().getClass(), equalTo(IllegalArgumentException.class)); assertThat(exceptionThrown.get().getMessage(), equalTo(String.format(Locale.ROOT, - "setting [%s] for index [%s] is empty or not defined", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, - indexMetadata.getIndex().getName()))); + "setting [%s] for index [%s] is empty or not defined, it must be set to the name of the alias pointing to the group of " + + "indices being rolled over", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, indexMetadata.getIndex().getName()))); } public void testPerformActionAliasDoesNotPointToIndex() { @@ -243,7 +319,13 @@ public void testPerformActionAliasDoesNotPointToIndex() { RolloverStep step = createRandomInstance(); SetOnce exceptionThrown = new SetOnce<>(); - step.performAction(indexMetadata, null, null, new AsyncActionStep.Listener() { + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata( + Metadata.builder() + .put(indexMetadata, true) + ) + .build(); + step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { throw new AssertionError("Unexpected method call"); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SegmentCountStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SegmentCountStepTests.java index 5da2797c1c0ef..25b570adbcb0c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SegmentCountStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SegmentCountStepTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.admin.indices.segments.ShardSegments; import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.index.Index; @@ -113,7 +114,8 @@ public void testIsConditionMet() { SetOnce conditionInfo = new SetOnce<>(); SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments); - step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() { + IndexMetadata indexMetadata = makeMeta(index); + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean conditionMet, ToXContentObject info) { conditionMetResult.set(conditionMet); @@ -165,7 +167,8 @@ public void testIsConditionIsTrueEvenWhenMoreSegments() { SetOnce conditionInfo = new SetOnce<>(); SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments); - step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() { + IndexMetadata indexMetadata = makeMeta(index); + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean conditionMet, ToXContentObject info) { conditionMetResult.set(conditionMet); @@ -220,7 +223,8 @@ public void testFailedToRetrieveSomeSegments() { SetOnce conditionInfo = new SetOnce<>(); SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments); - step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() { + IndexMetadata indexMetadata = makeMeta(index); + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean conditionMet, ToXContentObject info) { conditionMetResult.set(conditionMet); @@ -256,7 +260,8 @@ public void testThrowsException() { SetOnce exceptionThrown = new SetOnce<>(); SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments); - step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() { + IndexMetadata indexMetadata = makeMeta(index); + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean conditionMet, ToXContentObject info) { throw new AssertionError("unexpected method call"); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStepTests.java index a2971d83529c0..d8db33a4dfaba 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStepTests.java @@ -11,11 +11,13 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import java.util.Collections; +import java.util.List; import java.util.function.LongSupplier; import static org.hamcrest.Matchers.equalTo; @@ -78,6 +80,35 @@ public void testPerformAction() { assertThat(actualRolloverTime, equalTo(rolloverTime)); } + public void testPerformActionOnDataStream() { + long creationDate = randomLongBetween(0, 1000000); + long rolloverTime = randomValueOtherThan(creationDate, () -> randomNonNegativeLong()); + String dataStreamName = "test-datastream"; + IndexMetadata originalIndexMeta = IndexMetadata.builder(dataStreamName + "-000001") + .putRolloverInfo(new RolloverInfo(dataStreamName, Collections.emptyList(), rolloverTime)) + .settings(settings(Version.CURRENT)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + + IndexMetadata rolledIndexMeta= IndexMetadata.builder(dataStreamName + "-000002") + .settings(settings(Version.CURRENT)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata( + Metadata.builder() + .put(new DataStream(dataStreamName, "timestamp", List.of(originalIndexMeta.getIndex(), rolledIndexMeta.getIndex()), 2L)) + .put(originalIndexMeta, true) + .put(rolledIndexMeta, true) + ).build(); + + UpdateRolloverLifecycleDateStep step = createRandomInstance(); + ClusterState newState = step.performAction(originalIndexMeta.getIndex(), clusterState); + long actualRolloverTime = LifecycleExecutionState + .fromIndexMetadata(newState.metadata().index(originalIndexMeta.getIndex())) + .getLifecycleDate(); + assertThat(actualRolloverTime, equalTo(rolloverTime)); + } + public void testPerformActionBeforeRolloverHappened() { String alias = randomAlphaOfLength(3); long creationDate = randomLongBetween(0, 1000000); @@ -92,8 +123,8 @@ public void testPerformActionBeforeRolloverHappened() { IllegalStateException exceptionThrown = expectThrows(IllegalStateException.class, () -> step.performAction(indexMetadata.getIndex(), clusterState)); assertThat(exceptionThrown.getMessage(), - equalTo("no rollover info found for [" + indexMetadata.getIndex().getName() + "] with alias [" + alias + "], the index " + - "has not yet rolled over with that alias")); + equalTo("no rollover info found for [" + indexMetadata.getIndex().getName() + "] with rollover target [" + alias + "], the " + + "index has not yet rolled over with that target")); } public void testPerformActionWithNoRolloverAliasSetting() { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsTests.java index c26a3f52276ec..0b50036e49cd3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -23,6 +24,7 @@ import org.elasticsearch.xpack.core.ilm.Step.StepKey; import java.io.IOException; +import java.util.List; import java.util.UUID; import static org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep.parseIndexNameCounter; @@ -147,6 +149,44 @@ public void testResultEvaluatedOnOnlyIndexTheAliasPointsToIfWriteIndexIsNull() { " met", createRandomInstance().isConditionMet(originalIndex.getIndex(), clusterState).isComplete(), is(true)); } + public void testResultEvaluatedOnDataStream() throws IOException { + String dataStreamName = "test-datastream"; + IndexMetadata originalIndexMeta = IndexMetadata.builder(dataStreamName + "-000001") + .settings(settings(Version.CURRENT)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + + IndexMetadata rolledIndexMeta= IndexMetadata.builder(dataStreamName + "-000002") + .settings(settings(Version.CURRENT).put("index.write.wait_for_active_shards", "3")) + .numberOfShards(1).numberOfReplicas(3).build(); + + IndexRoutingTable.Builder routingTable = new IndexRoutingTable.Builder(rolledIndexMeta.getIndex()); + routingTable.addShard(TestShardRouting.newShardRouting(rolledIndexMeta.getIndex().getName(), 0, "node", null, true, + ShardRoutingState.STARTED)); + routingTable.addShard(TestShardRouting.newShardRouting(rolledIndexMeta.getIndex().getName(), 0, "node2", null, false, + ShardRoutingState.STARTED)); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata( + Metadata.builder() + .put(new DataStream(dataStreamName, "timestamp", List.of(originalIndexMeta.getIndex(), rolledIndexMeta.getIndex()), 2L)) + .put(originalIndexMeta, true) + .put(rolledIndexMeta, true) + ) + .routingTable(RoutingTable.builder().add(routingTable.build()).build()) + .build(); + + WaitForActiveShardsStep waitForActiveShardsStep = createRandomInstance(); + + ClusterStateWaitStep.Result result = waitForActiveShardsStep.isConditionMet(originalIndexMeta.getIndex(), clusterState); + assertThat(result.isComplete(), is(false)); + + XContentBuilder expected = new WaitForActiveShardsStep.ActiveShardsInfo(2, "3", false).toXContent(JsonXContent.contentBuilder(), + ToXContent.EMPTY_PARAMS); + String actualResultAsString = Strings.toString(result.getInfomationContext()); + assertThat(actualResultAsString, is(Strings.toString(expected))); + assertThat(actualResultAsString, containsString("waiting for [3] shards to become active, but only [2] are active")); + } + public void testResultReportsMeaningfulMessage() throws IOException { String alias = randomAlphaOfLength(5); IndexMetadata originalIndex = IndexMetadata.builder("index-000000") diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForFollowShardTasksStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForFollowShardTasksStepTests.java index 2c8f7d1ffbe93..b14ca530ad19a 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForFollowShardTasksStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForFollowShardTasksStepTests.java @@ -8,6 +8,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; @@ -68,18 +69,19 @@ public void testConditionMet() { final boolean[] conditionMetHolder = new boolean[1]; final ToXContentObject[] informationContextHolder = new ToXContentObject[1]; final Exception[] exceptionHolder = new Exception[1]; - createRandomInstance().evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { - @Override - public void onResponse(boolean conditionMet, ToXContentObject informationContext) { - conditionMetHolder[0] = conditionMet; - informationContextHolder[0] = informationContext; - } - - @Override - public void onFailure(Exception e) { - exceptionHolder[0] = e; - } - }, MASTER_TIMEOUT); + createRandomInstance().evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), + new AsyncWaitStep.Listener() { + @Override + public void onResponse(boolean conditionMet, ToXContentObject informationContext) { + conditionMetHolder[0] = conditionMet; + informationContextHolder[0] = informationContext; + } + + @Override + public void onFailure(Exception e) { + exceptionHolder[0] = e; + } + }, MASTER_TIMEOUT); assertThat(conditionMetHolder[0], is(true)); assertThat(informationContextHolder[0], nullValue()); @@ -102,18 +104,19 @@ public void testConditionNotMetShardsNotInSync() { final boolean[] conditionMetHolder = new boolean[1]; final ToXContentObject[] informationContextHolder = new ToXContentObject[1]; final Exception[] exceptionHolder = new Exception[1]; - createRandomInstance().evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { - @Override - public void onResponse(boolean conditionMet, ToXContentObject informationContext) { - conditionMetHolder[0] = conditionMet; - informationContextHolder[0] = informationContext; - } - - @Override - public void onFailure(Exception e) { - exceptionHolder[0] = e; - } - }, MASTER_TIMEOUT); + createRandomInstance().evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), + new AsyncWaitStep.Listener() { + @Override + public void onResponse(boolean conditionMet, ToXContentObject informationContext) { + conditionMetHolder[0] = conditionMet; + informationContextHolder[0] = informationContext; + } + + @Override + public void onFailure(Exception e) { + exceptionHolder[0] = e; + } + }, MASTER_TIMEOUT); assertThat(conditionMetHolder[0], is(false)); assertThat(informationContextHolder[0], notNullValue()); @@ -135,18 +138,19 @@ public void testConditionNotMetNotAFollowerIndex() { final boolean[] conditionMetHolder = new boolean[1]; final ToXContentObject[] informationContextHolder = new ToXContentObject[1]; final Exception[] exceptionHolder = new Exception[1]; - createRandomInstance().evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { - @Override - public void onResponse(boolean conditionMet, ToXContentObject informationContext) { - conditionMetHolder[0] = conditionMet; - informationContextHolder[0] = informationContext; - } - - @Override - public void onFailure(Exception e) { - exceptionHolder[0] = e; - } - }, MASTER_TIMEOUT); + createRandomInstance().evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), + new AsyncWaitStep.Listener() { + @Override + public void onResponse(boolean conditionMet, ToXContentObject informationContext) { + conditionMetHolder[0] = conditionMet; + informationContextHolder[0] = informationContext; + } + + @Override + public void onFailure(Exception e) { + exceptionHolder[0] = e; + } + }, MASTER_TIMEOUT); assertThat(conditionMetHolder[0], is(true)); assertThat(informationContextHolder[0], nullValue()); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForNoFollowersStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForNoFollowersStepTests.java index 07599333a6c9f..c691378f5b4df 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForNoFollowersStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForNoFollowersStepTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.Strings; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.index.seqno.RetentionLease; @@ -78,7 +79,7 @@ public void testConditionMet() { final SetOnce conditionMetHolder = new SetOnce<>(); final SetOnce stepInfoHolder = new SetOnce<>(); - step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean conditionMet, ToXContentObject infomationContext) { conditionMetHolder.set(conditionMet); @@ -111,7 +112,7 @@ public void testConditionNotMet() { final SetOnce conditionMetHolder = new SetOnce<>(); final SetOnce stepInfoHolder = new SetOnce<>(); - step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean conditionMet, ToXContentObject infomationContext) { conditionMetHolder.set(conditionMet); @@ -148,7 +149,7 @@ public void testNoShardStats() { final SetOnce conditionMetHolder = new SetOnce<>(); final SetOnce stepInfoHolder = new SetOnce<>(); - step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean conditionMet, ToXContentObject infomationContext) { conditionMetHolder.set(conditionMet); @@ -187,7 +188,7 @@ public void testFailure() { }).when(indicesClient).stats(any(), any()); final SetOnce exceptionHolder = new SetOnce<>(); - step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean conditionMet, ToXContentObject infomationContext) { fail("onResponse should not be called in this test, called with conditionMet: " + conditionMet diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStepTests.java index 179e4f8fac6c7..afb7b2a783d8e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStepTests.java @@ -17,7 +17,9 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; import org.elasticsearch.action.admin.indices.rollover.RolloverResponse; import org.elasticsearch.cluster.metadata.AliasMetadata; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -26,6 +28,7 @@ import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; @@ -90,11 +93,11 @@ protected WaitForRolloverReadyStep copyInstance(WaitForRolloverReadyStep instanc instance.getMaxSize(), instance.getMaxAge(), instance.getMaxDocs()); } - private static void assertRolloverIndexRequest(RolloverRequest request, String alias, Set> expectedConditions) { + private static void assertRolloverIndexRequest(RolloverRequest request, String rolloverTarget, Set> expectedConditions) { assertNotNull(request); assertEquals(1, request.indices().length); - assertEquals(alias, request.indices()[0]); - assertEquals(alias, request.getRolloverTarget()); + assertEquals(rolloverTarget, request.indices()[0]); + assertEquals(rolloverTarget, request.getRolloverTarget()); assertEquals(expectedConditions.size(), request.getConditions().size()); assertTrue(request.isDryRun()); Set expectedConditionValues = expectedConditions.stream().map(Condition::value).collect(Collectors.toSet()); @@ -103,7 +106,6 @@ private static void assertRolloverIndexRequest(RolloverRequest request, String a assertEquals(expectedConditionValues, actualConditionValues); } - public void testEvaluateCondition() { String alias = randomAlphaOfLength(5); IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLength(10)) @@ -113,29 +115,44 @@ public void testEvaluateCondition() { WaitForRolloverReadyStep step = createRandomInstance(); - Mockito.doAnswer(invocation -> { - RolloverRequest request = (RolloverRequest) invocation.getArguments()[0]; - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) invocation.getArguments()[1]; - Set> expectedConditions = new HashSet<>(); - if (step.getMaxAge() != null) { - expectedConditions.add(new MaxAgeCondition(step.getMaxAge())); - } - if (step.getMaxSize() != null) { - expectedConditions.add(new MaxSizeCondition(step.getMaxSize())); + mockRolloverIndexCall(alias, step); + + SetOnce conditionsMet = new SetOnce<>(); + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { + + @Override + public void onResponse(boolean complete, ToXContentObject infomationContext) { + conditionsMet.set(complete); } - if (step.getMaxDocs() != null) { - expectedConditions.add(new MaxDocsCondition(step.getMaxDocs())); + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected method call", e); } - assertRolloverIndexRequest(request, alias, expectedConditions); - Map conditionResults = expectedConditions.stream() - .collect(Collectors.toMap(Condition::toString, condition -> true)); - listener.onResponse(new RolloverResponse(null, null, conditionResults, request.isDryRun(), false, false, false)); - return null; - }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any()); + }, MASTER_TIMEOUT); + + assertEquals(true, conditionsMet.get()); + + verify(client, Mockito.only()).admin(); + verify(adminClient, Mockito.only()).indices(); + verify(indicesClient, Mockito.only()).rolloverIndex(Mockito.any(), Mockito.any()); + } + + public void testEvaluateConditionOnDataStreamTarget() { + String dataStreamName = "test-datastream"; + IndexMetadata indexMetadata = IndexMetadata.builder(dataStreamName + "-000001") + .settings(settings(Version.CURRENT)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + + WaitForRolloverReadyStep step = createRandomInstance(); + + mockRolloverIndexCall(dataStreamName, step); SetOnce conditionsMet = new SetOnce<>(); - step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + Metadata metadata = Metadata.builder().put(indexMetadata, true) + .put(new DataStream(dataStreamName, "timestamp", List.of(indexMetadata.getIndex()), 1L)) + .build(); + step.evaluateCondition(metadata, indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean complete, ToXContentObject infomationContext) { @@ -155,6 +172,29 @@ public void onFailure(Exception e) { verify(indicesClient, Mockito.only()).rolloverIndex(Mockito.any(), Mockito.any()); } + private void mockRolloverIndexCall(String rolloverTarget, WaitForRolloverReadyStep step) { + Mockito.doAnswer(invocation -> { + RolloverRequest request = (RolloverRequest) invocation.getArguments()[0]; + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + Set> expectedConditions = new HashSet<>(); + if (step.getMaxAge() != null) { + expectedConditions.add(new MaxAgeCondition(step.getMaxAge())); + } + if (step.getMaxSize() != null) { + expectedConditions.add(new MaxSizeCondition(step.getMaxSize())); + } + if (step.getMaxDocs() != null) { + expectedConditions.add(new MaxDocsCondition(step.getMaxDocs())); + } + assertRolloverIndexRequest(request, rolloverTarget, expectedConditions); + Map conditionResults = expectedConditions.stream() + .collect(Collectors.toMap(Condition::toString, condition -> true)); + listener.onResponse(new RolloverResponse(null, null, conditionResults, request.isDryRun(), false, false, false)); + return null; + }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any()); + } + public void testEvaluateDoesntTriggerRolloverForIndexManuallyRolledOnLifecycleRolloverAlias() { String rolloverAlias = randomAlphaOfLength(5); IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLength(10)) @@ -166,7 +206,7 @@ public void testEvaluateDoesntTriggerRolloverForIndexManuallyRolledOnLifecycleRo WaitForRolloverReadyStep step = createRandomInstance(); - step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean complete, ToXContentObject informationContext) { @@ -195,7 +235,7 @@ public void testEvaluateTriggersRolloverForIndexManuallyRolledOnDifferentAlias() WaitForRolloverReadyStep step = createRandomInstance(); - step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean complete, ToXContentObject informationContext) { @@ -220,7 +260,7 @@ public void testPerformActionWriteIndexIsFalse() { .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); WaitForRolloverReadyStep step = createRandomInstance(); - step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean complete, ToXContentObject infomationContext) { @@ -249,7 +289,7 @@ public void testPerformActionWithIndexingComplete() { WaitForRolloverReadyStep step = createRandomInstance(); SetOnce conditionsMet = new SetOnce<>(); - step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean complete, ToXContentObject infomationContext) { @@ -277,7 +317,7 @@ public void testPerformActionWithIndexingCompleteStillWriteIndex() { WaitForRolloverReadyStep step = createRandomInstance(); SetOnce correctFailureCalled = new SetOnce<>(); - step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean complete, ToXContentObject infomationContext) { @@ -324,7 +364,7 @@ public void testPerformActionNotComplete() { }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any()); SetOnce actionCompleted = new SetOnce<>(); - step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean complete, ToXContentObject infomationContext) { @@ -373,7 +413,7 @@ public void testPerformActionFailure() { }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any()); SetOnce exceptionThrown = new SetOnce<>(); - step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean complete, ToXContentObject infomationContext) { @@ -402,7 +442,7 @@ public void testPerformActionInvalidNullOrEmptyAlias() { WaitForRolloverReadyStep step = createRandomInstance(); SetOnce exceptionThrown = new SetOnce<>(); - step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean complete, ToXContentObject infomationContext) { throw new AssertionError("Unexpected method call"); @@ -427,7 +467,7 @@ public void testPerformActionAliasDoesNotPointToIndex() { WaitForRolloverReadyStep step = createRandomInstance(); SetOnce exceptionThrown = new SetOnce<>(); - step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean complete, ToXContentObject infomationContext) { throw new AssertionError("Unexpected method call"); diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java new file mode 100644 index 0000000000000..96aa70e24a8bd --- /dev/null +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack; + +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.xpack.core.ilm.LifecycleAction; +import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; +import org.elasticsearch.xpack.core.ilm.Phase; +import org.elasticsearch.xpack.core.ilm.Step; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Locale; +import java.util.Map; + +import static java.util.Collections.singletonMap; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +/** + * This class provides the operational REST functions needed to control an ILM time series lifecycle. + */ +public final class TimeSeriesRestDriver { + + private static final Logger logger = LogManager.getLogger(TimeSeriesRestDriver.class); + + private TimeSeriesRestDriver() { + } + + public static Step.StepKey getStepKeyForIndex(RestClient client, String indexName) throws IOException { + Map indexResponse = explainIndex(client, indexName); + if (indexResponse == null) { + return new Step.StepKey(null, null, null); + } + + return getStepKey(indexResponse); + } + + private static Step.StepKey getStepKey(Map explainIndexResponse) { + String phase = (String) explainIndexResponse.get("phase"); + String action = (String) explainIndexResponse.get("action"); + String step = (String) explainIndexResponse.get("step"); + return new Step.StepKey(phase, action, step); + } + + public static Map explainIndex(RestClient client, String indexName) throws IOException { + return explain(client, indexName, false, false).get(indexName); + } + + public static Map> explain(RestClient client, String indexPattern, boolean onlyErrors, + boolean onlyManaged) throws IOException { + Request explainRequest = new Request("GET", indexPattern + "/_ilm/explain"); + explainRequest.addParameter("only_errors", Boolean.toString(onlyErrors)); + explainRequest.addParameter("only_managed", Boolean.toString(onlyManaged)); + Response response = client.performRequest(explainRequest); + Map responseMap; + try (InputStream is = response.getEntity().getContent()) { + responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); + } + + @SuppressWarnings("unchecked") Map> indexResponse = + ((Map>) responseMap.get("indices")); + return indexResponse; + } + + public static void indexDocument(RestClient client, String indexAbstractionName) throws IOException { + indexDocument(client, indexAbstractionName, false); + } + + public static void indexDocument(RestClient client, String indexAbstractionName, boolean refresh) throws IOException { + Request indexRequest = new Request("POST", indexAbstractionName + "/_doc" + (refresh ? "?refresh" : "")); + indexRequest.setEntity(new StringEntity("{\"a\": \"test\"}", ContentType.APPLICATION_JSON)); + Response response = client.performRequest(indexRequest); + logger.info(response.getStatusLine()); + } + + public static void createNewSingletonPolicy(RestClient client, String policyName, String phaseName, LifecycleAction action) + throws IOException { + createNewSingletonPolicy(client, policyName, phaseName, action, TimeValue.ZERO); + } + + public static void createNewSingletonPolicy(RestClient client, String policyName, String phaseName, LifecycleAction action, + TimeValue after) throws IOException { + Phase phase = new Phase(phaseName, after, singletonMap(action.getWriteableName(), action)); + LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policyName, singletonMap(phase.getName(), phase)); + XContentBuilder builder = jsonBuilder(); + lifecyclePolicy.toXContent(builder, null); + final StringEntity entity = new StringEntity( + "{ \"policy\":" + Strings.toString(builder) + "}", ContentType.APPLICATION_JSON); + Request request = new Request("PUT", "_ilm/policy/" + policyName); + request.setEntity(entity); + client.performRequest(request); + } + + public static void createComposableTemplate(RestClient client, String templateName, String indexPattern, Template template) + throws IOException { + XContentBuilder builder = jsonBuilder(); + template.toXContent(builder, ToXContent.EMPTY_PARAMS); + StringEntity templateJSON = new StringEntity( + String.format(Locale.ROOT, "{\n" + + " \"index_patterns\": \"%s\",\n" + + " \"data_stream\": { \"timestamp_field\": \"@timestamp\" },\n" + + " \"template\": %s\n" + + "}", indexPattern, Strings.toString(builder)), + ContentType.APPLICATION_JSON); + Request createIndexTemplateRequest = new Request("PUT", "_index_template/" + templateName); + createIndexTemplateRequest.setEntity(templateJSON); + client.performRequest(createIndexTemplateRequest); + } + +} diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/ChangePolicyforIndexIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/ChangePolicyforIndexIT.java index c735c4e65953c..76443b5257a31 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/ChangePolicyforIndexIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/ChangePolicyforIndexIT.java @@ -116,18 +116,8 @@ public void testChangePolicyForIndex() throws Exception { assertBusy(() -> assertStep(indexName, PhaseCompleteStep.finalStep("warm").getKey()), 30, TimeUnit.SECONDS); // Check index is allocated on integTest-1 and integTest-2 as per policy_2 - Request getSettingsRequest = new Request("GET", "/" + indexName + "/_settings"); - Response getSettingsResponse = client().performRequest(getSettingsRequest); - assertOK(getSettingsResponse); - Map getSettingsResponseMap = entityAsMap(getSettingsResponse); - @SuppressWarnings("unchecked") - Map indexSettings = (Map) ((Map) getSettingsResponseMap.get(indexName)) - .get("settings"); - @SuppressWarnings("unchecked") - Map routingSettings = (Map) ((Map) indexSettings.get("index")).get("routing"); - @SuppressWarnings("unchecked") - String includesAllocation = (String) ((Map) ((Map) routingSettings.get("allocation")) - .get("include")).get("_name"); + Map indexSettings = getIndexSettingsAsMap(indexName); + String includesAllocation = (String) indexSettings.get("index.routing.allocation.include._name"); assertEquals("integTest-1,integTest-2", includesAllocation); } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java new file mode 100644 index 0000000000000..90396241fa129 --- /dev/null +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ilm; + +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep; +import org.elasticsearch.xpack.core.ilm.RolloverAction; + +import static org.elasticsearch.xpack.TimeSeriesRestDriver.createComposableTemplate; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument; +import static org.hamcrest.Matchers.equalTo; + +public class TimeSeriesDataStreamsIT extends ESRestTestCase { + + public void testRolloverAction() throws Exception { + String policyName = "logs-policy"; + createNewSingletonPolicy(client(), policyName, "hot", new RolloverAction(null, null, 1L)); + + Settings lifecycleNameSetting = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policyName).build(); + Template template = new Template(lifecycleNameSetting, null, null); + createComposableTemplate(client(), "logs-template", "logs-foo*", template); + + String dataStream = "logs-foo"; + indexDocument(client(), dataStream, true); + + assertBusy(() -> assertTrue(indexExists("logs-foo-000002"))); + assertBusy(() -> assertTrue(Boolean.parseBoolean((String) getIndexSettingsAsMap("logs-foo-000002").get("index.hidden")))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), "logs-foo-000001"), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); + } + +} diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java index 57c5b2654f965..9f9647b384ccb 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java @@ -67,6 +67,11 @@ import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.explain; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.explainIndex; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -145,7 +150,7 @@ public void testMoveToAllocateStep() throws Exception { // move to a step Request moveToStepRequest = new Request("POST", "_ilm/move/" + originalIndex); - assertBusy(() -> assertTrue(getStepKeyForIndex(originalIndex).equals(new StepKey("new", "complete", "complete")))); + assertBusy(() -> assertTrue(getStepKeyForIndex(client(), originalIndex).equals(new StepKey("new", "complete", "complete")))); moveToStepRequest.setJsonEntity("{\n" + " \"current_step\": {\n" + " \"phase\": \"new\",\n" + @@ -180,7 +185,7 @@ public void testMoveToRolloverStep() throws Exception { Request moveToStepRequest = new Request("POST", "_ilm/move/" + originalIndex); // index document to trigger rollover index(client(), originalIndex, "_id", "foo", "bar"); - logger.info(getStepKeyForIndex(originalIndex)); + logger.info(getStepKeyForIndex(client(), originalIndex)); moveToStepRequest.setJsonEntity("{\n" + " \"current_step\": {\n" + " \"phase\": \"new\",\n" + @@ -210,15 +215,15 @@ public void testMoveToRolloverStep() throws Exception { } public void testRetryFailedDeleteAction() throws Exception { - createNewSingletonPolicy("delete", new DeleteAction()); + createNewSingletonPolicy(client(), policy, "delete", new DeleteAction()); createIndexWithSettings(index, Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_READ_ONLY, true) .put("index.lifecycle.name", policy)); - assertBusy(() -> assertThat((Integer) explainIndex(index).get(FAILED_STEP_RETRY_COUNT_FIELD), greaterThanOrEqualTo(1)), 30, - TimeUnit.SECONDS); + assertBusy(() -> assertThat((Integer) explainIndex(client(), index).get(FAILED_STEP_RETRY_COUNT_FIELD), greaterThanOrEqualTo(1)), + 30, TimeUnit.SECONDS); assertTrue(indexExists(index)); Request request = new Request("PUT", index + "/_settings"); @@ -229,7 +234,7 @@ public void testRetryFailedDeleteAction() throws Exception { } public void testRetryFreezeDeleteAction() throws Exception { - createNewSingletonPolicy("cold", new FreezeAction()); + createNewSingletonPolicy(client(), policy, "cold", new FreezeAction()); createIndexWithSettings(index, Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) @@ -237,8 +242,8 @@ public void testRetryFreezeDeleteAction() throws Exception { .put(IndexMetadata.SETTING_READ_ONLY, true) .put("index.lifecycle.name", policy)); - assertBusy(() -> assertThat((Integer) explainIndex(index).get(FAILED_STEP_RETRY_COUNT_FIELD), greaterThanOrEqualTo(1)), 30, - TimeUnit.SECONDS); + assertBusy(() -> assertThat((Integer) explainIndex(client(), index).get(FAILED_STEP_RETRY_COUNT_FIELD), greaterThanOrEqualTo(1)), + 30, TimeUnit.SECONDS); assertFalse(getOnlyIndexSettings(index).containsKey("index.frozen")); Request request = new Request("PUT", index + "/_settings"); @@ -255,7 +260,7 @@ public void testRetryFailedShrinkAction() throws Exception { String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + index; createIndexWithSettings(index, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)); - createNewSingletonPolicy("warm", new ShrinkAction(numShards + randomIntBetween(1, numShards))); + createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(numShards + randomIntBetween(1, numShards))); updatePolicy(index, policy); assertBusy(() -> { String failedStep = getFailedStepForIndex(index); @@ -263,7 +268,7 @@ public void testRetryFailedShrinkAction() throws Exception { }); // update policy to be correct - createNewSingletonPolicy("warm", new ShrinkAction(expectedFinalShards)); + createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(expectedFinalShards)); updatePolicy(index, policy); // retry step @@ -273,14 +278,14 @@ public void testRetryFailedShrinkAction() throws Exception { // assert corrected policy is picked up and index is shrunken assertBusy(() -> assertTrue(indexExists(shrunkenIndex)), 30, TimeUnit.SECONDS); assertBusy(() -> assertTrue(aliasExists(shrunkenIndex, index))); - assertBusy(() -> assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey()))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey()))); assertBusy(() -> { Map settings = getOnlyIndexSettings(shrunkenIndex); assertThat(settings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS), equalTo(String.valueOf(expectedFinalShards))); assertThat(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true")); assertThat(settings.get(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id"), nullValue()); }); - expectThrows(ResponseException.class, this::indexDocument); + expectThrows(ResponseException.class, () -> indexDocument(client(), index)); } public void testRolloverAction() throws Exception { @@ -291,7 +296,7 @@ public void testRolloverAction() throws Exception { .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)); // create policy - createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + createNewSingletonPolicy(client(), policy, "hot", new RolloverAction(null, null, 1L)); // update policy on index updatePolicy(originalIndex, policy); // index document {"foo": "bar"} to trigger rollover @@ -330,12 +335,12 @@ public void testRolloverActionWithIndexingComplete() throws Exception { client().performRequest(updateAliasRequest); // create policy - createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + createNewSingletonPolicy(client(), policy, "hot", new RolloverAction(null, null, 1L)); // update policy on index updatePolicy(originalIndex, policy); // index document {"foo": "bar"} to trigger rollover index(client(), originalIndex, "_id", "foo", "bar"); - assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), originalIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); assertBusy(() -> assertTrue(indexExists(originalIndex))); assertBusy(() -> assertFalse(indexExists(secondIndex))); assertBusy(() -> assertEquals("true", getOnlyIndexSettings(originalIndex).get(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE))); @@ -347,10 +352,10 @@ public void testAllocateOnlyAllocation() throws Exception { String allocateNodeName = "integTest-" + randomFrom(0, 1); AllocateAction allocateAction = new AllocateAction(null, null, null, singletonMap("_name", allocateNodeName)); String endPhase = randomFrom("warm", "cold"); - createNewSingletonPolicy(endPhase, allocateAction); + createNewSingletonPolicy(client(), policy, endPhase, allocateAction); updatePolicy(index, policy); assertBusy(() -> { - assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep(endPhase).getKey())); + assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep(endPhase).getKey())); }); ensureGreen(index); } @@ -363,11 +368,11 @@ public void testAllocateActionOnlyReplicas() throws Exception { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas)); AllocateAction allocateAction = new AllocateAction(finalNumReplicas, null, null, null); String endPhase = randomFrom("warm", "cold"); - createNewSingletonPolicy(endPhase, allocateAction); + createNewSingletonPolicy(client(), policy, endPhase, allocateAction); updatePolicy(index, policy); assertBusy(() -> { Map settings = getOnlyIndexSettings(index); - assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep(endPhase).getKey())); + assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep(endPhase).getKey())); assertThat(settings.get(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey()), equalTo(String.valueOf(finalNumReplicas))); }); } @@ -376,10 +381,10 @@ public void testWaitForSnapshot() throws Exception { createIndexWithSettings(index, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)); String slmPolicy = randomAlphaOfLengthBetween(4, 10); - createNewSingletonPolicy("delete", new WaitForSnapshotAction(slmPolicy)); + createNewSingletonPolicy(client(), policy, "delete", new WaitForSnapshotAction(slmPolicy)); updatePolicy(index, policy); assertBusy( () -> { - Map indexILMState = explainIndex(index); + Map indexILMState = explainIndex(client(), index); assertThat(indexILMState.get("action"), is("wait_for_snapshot")); assertThat(indexILMState.get("failed_step"), is("wait-for-snapshot")); }, slmPolicy); @@ -388,7 +393,7 @@ public void testWaitForSnapshot() throws Exception { createSlmPolicy(slmPolicy, repo); assertBusy( () -> { - Map indexILMState = explainIndex(index); + Map indexILMState = explainIndex(client(), index); //wait for step to notice that the slm policy is created and to get out of error assertThat(indexILMState.get("failed_step"), nullValue()); assertThat(indexILMState.get("action"), is("wait_for_snapshot")); @@ -398,14 +403,14 @@ public void testWaitForSnapshot() throws Exception { Request request = new Request("PUT", "/_slm/policy/" + slmPolicy + "/_execute"); assertOK(client().performRequest(request)); - assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("complete")), slmPolicy); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), index).getAction(), equalTo("complete")), slmPolicy); } public void testWaitForSnapshotSlmExecutedBefore() throws Exception { createIndexWithSettings(index, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)); String slmPolicy = randomAlphaOfLengthBetween(4, 10); - createNewSingletonPolicy("delete", new WaitForSnapshotAction(slmPolicy)); + createNewSingletonPolicy(client(), policy, "delete", new WaitForSnapshotAction(slmPolicy)); String repo = createSnapshotRepo(); createSlmPolicy(slmPolicy, repo); @@ -425,7 +430,7 @@ public void testWaitForSnapshotSlmExecutedBefore() throws Exception { updatePolicy(index, policy); assertBusy( () -> { - Map indexILMState = explainIndex(index); + Map indexILMState = explainIndex(client(), index); assertThat(indexILMState.get("failed_step"), nullValue()); assertThat(indexILMState.get("action"), is("wait_for_snapshot")); assertThat(indexILMState.get("step"), is("wait-for-snapshot")); @@ -443,13 +448,13 @@ public void testWaitForSnapshotSlmExecutedBefore() throws Exception { } }, slmPolicy); - assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("complete")), slmPolicy); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), index).getAction(), equalTo("complete")), slmPolicy); } public void testDelete() throws Exception { createIndexWithSettings(index, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)); - createNewSingletonPolicy("delete", new DeleteAction()); + createNewSingletonPolicy(client(), policy, "delete", new DeleteAction()); updatePolicy(index, policy); assertBusy(() -> assertFalse(indexExists(index))); } @@ -457,14 +462,14 @@ public void testDelete() throws Exception { public void testDeleteOnlyShouldNotMakeIndexReadonly() throws Exception { createIndexWithSettings(index, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)); - createNewSingletonPolicy("delete", new DeleteAction(), TimeValue.timeValueHours(1)); + createNewSingletonPolicy(client(), policy, "delete", new DeleteAction(), TimeValue.timeValueHours(1)); updatePolicy(index, policy); assertBusy(() -> { - assertThat(getStepKeyForIndex(index).getAction(), equalTo("complete")); + assertThat(getStepKeyForIndex(client(), index).getAction(), equalTo("complete")); Map settings = getOnlyIndexSettings(index); assertThat(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()), not("true")); }); - indexDocument(); + indexDocument(client(), index); } public void testDeleteDuringSnapshot() throws Exception { @@ -482,12 +487,12 @@ public void testDeleteDuringSnapshot() throws Exception { .endObject())); assertOK(client().performRequest(request)); // create delete policy - createNewSingletonPolicy("delete", new DeleteAction(), TimeValue.timeValueMillis(0)); + createNewSingletonPolicy(client(), policy, "delete", new DeleteAction(), TimeValue.timeValueMillis(0)); // create index without policy createIndexWithSettings(index, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)); // index document so snapshot actually does something - indexDocument(); + indexDocument(client(), index); // start snapshot String snapName = "snapshot-" + randomAlphaOfLength(6).toLowerCase(Locale.ROOT); request = new Request("PUT", "/_snapshot/repo/" + snapName); @@ -509,11 +514,11 @@ public void testDeleteDuringSnapshot() throws Exception { public void testReadOnly() throws Exception { createIndexWithSettings(index, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)); - createNewSingletonPolicy("warm", new ReadOnlyAction()); + createNewSingletonPolicy(client(), policy, "warm", new ReadOnlyAction()); updatePolicy(index, policy); assertBusy(() -> { Map settings = getOnlyIndexSettings(index); - assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("warm").getKey())); + assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("warm").getKey())); assertThat(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true")); }); } @@ -542,16 +547,16 @@ public void forceMergeActionWithCodec(String codec) throws Exception { } }; assertThat(numSegments.get(), greaterThan(1)); - createNewSingletonPolicy("warm", new ForceMergeAction(1, codec)); + createNewSingletonPolicy(client(), policy, "warm", new ForceMergeAction(1, codec)); updatePolicy(index, policy); assertBusy(() -> { - assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("warm").getKey())); + assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("warm").getKey())); Map settings = getOnlyIndexSettings(index); assertThat(numSegments.get(), equalTo(1)); assertThat(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true")); }); - expectThrows(ResponseException.class, this::indexDocument); + expectThrows(ResponseException.class, () -> indexDocument(client(), index)); } public void testForceMergeAction() throws Exception { @@ -565,18 +570,18 @@ public void testShrinkAction() throws Exception { String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + index; createIndexWithSettings(index, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)); - createNewSingletonPolicy("warm", new ShrinkAction(expectedFinalShards)); + createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(expectedFinalShards)); updatePolicy(index, policy); assertBusy(() -> assertTrue(indexExists(shrunkenIndex)), 30, TimeUnit.SECONDS); assertBusy(() -> assertTrue(aliasExists(shrunkenIndex, index))); - assertBusy(() -> assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey()))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey()))); assertBusy(() -> { Map settings = getOnlyIndexSettings(shrunkenIndex); assertThat(settings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS), equalTo(String.valueOf(expectedFinalShards))); assertThat(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true")); assertThat(settings.get(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id"), nullValue()); }); - expectThrows(ResponseException.class, this::indexDocument); + expectThrows(ResponseException.class, () -> indexDocument(client(), index)); } public void testShrinkSameShards() throws Exception { @@ -584,14 +589,14 @@ public void testShrinkSameShards() throws Exception { String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + index; createIndexWithSettings(index, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)); - createNewSingletonPolicy("warm", new ShrinkAction(numberOfShards)); + createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(numberOfShards)); updatePolicy(index, policy); assertBusy(() -> { assertTrue(indexExists(index)); assertFalse(indexExists(shrunkenIndex)); assertFalse(aliasExists(shrunkenIndex, index)); Map settings = getOnlyIndexSettings(index); - assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("warm").getKey())); + assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("warm").getKey())); assertThat(settings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS), equalTo(String.valueOf(numberOfShards))); assertNull(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey())); assertThat(settings.get(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id"), nullValue()); @@ -614,7 +619,7 @@ public void testShrinkDuringSnapshot() throws Exception { .endObject())); assertOK(client().performRequest(request)); // create delete policy - createNewSingletonPolicy("warm", new ShrinkAction(1), TimeValue.timeValueMillis(0)); + createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(1), TimeValue.timeValueMillis(0)); // create index without policy createIndexWithSettings(index, Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) @@ -622,7 +627,7 @@ public void testShrinkDuringSnapshot() throws Exception { // required so the shrink doesn't wait on SetSingleNodeAllocateStep .put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_name", "integTest-0")); // index document so snapshot actually does something - indexDocument(); + indexDocument(client(), index); // start snapshot request = new Request("PUT", "/_snapshot/repo/snapshot"); request.addParameter("wait_for_completion", "false"); @@ -635,12 +640,12 @@ public void testShrinkDuringSnapshot() throws Exception { assertTrue(indexExists(shrunkenIndex)); assertTrue(aliasExists(shrunkenIndex, index)); Map settings = getOnlyIndexSettings(shrunkenIndex); - assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey())); + assertThat(getStepKeyForIndex(client(), shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey())); assertThat(settings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS), equalTo(String.valueOf(1))); assertThat(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true")); assertThat(settings.get(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id"), nullValue()); }, 2, TimeUnit.MINUTES); - expectThrows(ResponseException.class, this::indexDocument); + expectThrows(ResponseException.class, () -> indexDocument(client(), index)); // assert that snapshot succeeded assertThat(getSnapshotState("snapshot"), equalTo("SUCCESS")); assertOK(client().performRequest(new Request("DELETE", "/_snapshot/repo/snapshot"))); @@ -674,12 +679,12 @@ public void testSetSingleNodeAllocationRetriesUntilItSucceeds() throws Exception }); // assign the policy that'll attempt to shrink the index - createNewSingletonPolicy("warm", new ShrinkAction(expectedFinalShards)); + createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(expectedFinalShards)); updatePolicy(index, policy); assertTrue("ILM did not start retrying the set-single-node-allocation step", waitUntil(() -> { try { - Map explainIndexResponse = explainIndex(index); + Map explainIndexResponse = explainIndex(client(), index); if (explainIndexResponse == null) { return false; } @@ -701,17 +706,17 @@ public void testSetSingleNodeAllocationRetriesUntilItSucceeds() throws Exception assertBusy(() -> assertTrue(indexExists(shrunkenIndex)), 30, TimeUnit.SECONDS); assertBusy(() -> assertTrue(aliasExists(shrunkenIndex, index))); - assertBusy(() -> assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey()))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey()))); } public void testFreezeAction() throws Exception { createIndexWithSettings(index, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)); - createNewSingletonPolicy("cold", new FreezeAction()); + createNewSingletonPolicy(client(), policy, "cold", new FreezeAction()); updatePolicy(index, policy); assertBusy(() -> { Map settings = getOnlyIndexSettings(index); - assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("cold").getKey())); + assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("cold").getKey())); assertThat(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true")); assertThat(settings.get(IndexSettings.INDEX_SEARCH_THROTTLED.getKey()), equalTo("true")); assertThat(settings.get("index.frozen"), equalTo("true")); @@ -733,12 +738,12 @@ public void testFreezeDuringSnapshot() throws Exception { .endObject())); assertOK(client().performRequest(request)); // create delete policy - createNewSingletonPolicy("cold", new FreezeAction(), TimeValue.timeValueMillis(0)); + createNewSingletonPolicy(client(), policy, "cold", new FreezeAction(), TimeValue.timeValueMillis(0)); // create index without policy createIndexWithSettings(index, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)); // index document so snapshot actually does something - indexDocument(); + indexDocument(client(), index); // start snapshot request = new Request("PUT", "/_snapshot/repo/snapshot"); request.addParameter("wait_for_completion", "false"); @@ -749,7 +754,7 @@ public void testFreezeDuringSnapshot() throws Exception { // assert that the index froze assertBusy(() -> { Map settings = getOnlyIndexSettings(index); - assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("cold").getKey())); + assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("cold").getKey())); assertThat(settings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true")); assertThat(settings.get(IndexSettings.INDEX_SEARCH_THROTTLED.getKey()), equalTo("true")); assertThat(settings.get("index.frozen"), equalTo("true")); @@ -766,11 +771,11 @@ public void testSetPriority() throws Exception { createIndexWithSettings(index, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.INDEX_PRIORITY_SETTING.getKey(), 100)); int priority = randomIntBetween(0, 99); - createNewSingletonPolicy("warm", new SetPriorityAction(priority)); + createNewSingletonPolicy(client(), policy, "warm", new SetPriorityAction(priority)); updatePolicy(index, policy); assertBusy(() -> { Map settings = getOnlyIndexSettings(index); - assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("warm").getKey())); + assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("warm").getKey())); assertThat(settings.get(IndexMetadata.INDEX_PRIORITY_SETTING.getKey()), equalTo(String.valueOf(priority))); }); } @@ -778,11 +783,11 @@ public void testSetPriority() throws Exception { public void testSetNullPriority() throws Exception { createIndexWithSettings(index, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.INDEX_PRIORITY_SETTING.getKey(), 100)); - createNewSingletonPolicy("warm", new SetPriorityAction((Integer) null)); + createNewSingletonPolicy(client(), policy, "warm", new SetPriorityAction((Integer) null)); updatePolicy(index, policy); assertBusy(() -> { Map settings = getOnlyIndexSettings(index); - assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("warm").getKey())); + assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("warm").getKey())); assertNull(settings.get(IndexMetadata.INDEX_PRIORITY_SETTING.getKey())); }); } @@ -806,7 +811,7 @@ public void testNonexistentPolicy() throws Exception { client().performRequest(templateRequest); policy = randomAlphaOfLengthBetween(5,20); - createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + createNewSingletonPolicy(client(), policy, "hot", new RolloverAction(null, null, 1L)); index = indexPrefix + "-000001"; final StringEntity putIndex = new StringEntity("{\n" + @@ -819,7 +824,7 @@ public void testNonexistentPolicy() throws Exception { Request putIndexRequest = new Request("PUT", index); putIndexRequest.setEntity(putIndex); client().performRequest(putIndexRequest); - indexDocument(); + indexDocument(client(), index); assertBusy(() -> { Request explainRequest = new Request("GET", index + "/_ilm/explain"); @@ -844,19 +849,19 @@ public void testInvalidPolicyNames() { ResponseException ex; policy = randomAlphaOfLengthBetween(0,10) + "," + randomAlphaOfLengthBetween(0,10); - ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy("delete", new DeleteAction())); + ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy(client(), policy, "delete", new DeleteAction())); assertThat(ex.getMessage(), containsString("invalid policy name")); policy = randomAlphaOfLengthBetween(0,10) + "%20" + randomAlphaOfLengthBetween(0,10); - ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy("delete", new DeleteAction())); + ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy(client(), policy, "delete", new DeleteAction())); assertThat(ex.getMessage(), containsString("invalid policy name")); policy = "_" + randomAlphaOfLengthBetween(1, 20); - ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy("delete", new DeleteAction())); + ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy(client(), policy, "delete", new DeleteAction())); assertThat(ex.getMessage(), containsString("invalid policy name")); policy = randomAlphaOfLengthBetween(256, 1000); - ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy("delete", new DeleteAction())); + ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy(client(), policy, "delete", new DeleteAction())); assertThat(ex.getMessage(), containsString("invalid policy name")); } @@ -866,11 +871,11 @@ public void testDeletePolicyInUse() throws IOException { String unmanagedIndex = randomAlphaOfLength(9).toLowerCase(Locale.ROOT); String managedByOtherPolicyIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - createNewSingletonPolicy("delete", new DeleteAction(), TimeValue.timeValueHours(12)); + createNewSingletonPolicy(client(), policy, "delete", new DeleteAction(), TimeValue.timeValueHours(12)); String originalPolicy = policy; String otherPolicy = randomValueOtherThan(policy, () -> randomAlphaOfLength(5)); policy = otherPolicy; - createNewSingletonPolicy("delete", new DeleteAction(), TimeValue.timeValueHours(13)); + createNewSingletonPolicy(client(), policy, "delete", new DeleteAction(), TimeValue.timeValueHours(13)); createIndexWithSettingsNoAlias(managedIndex1, Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1,10)) @@ -899,7 +904,7 @@ public void testRemoveAndReaddPolicy() throws Exception { String originalIndex = index + "-000001"; String secondIndex = index + "-000002"; // Set up a policy with rollover - createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + createNewSingletonPolicy(client(), policy, "hot", new RolloverAction(null, null, 1L)); createIndexWithSettings( originalIndex, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) @@ -927,22 +932,22 @@ public void testRemoveAndReaddPolicy() throws Exception { " }\n" + "}"); client().performRequest(addPolicyRequest); - assertBusy(() -> assertTrue((boolean) explainIndex(originalIndex).getOrDefault("managed", false))); + assertBusy(() -> assertTrue((boolean) explainIndex(client(), originalIndex).getOrDefault("managed", false))); // Wait for everything to be copacetic - assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), originalIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); } public void testMoveToInjectedStep() throws Exception { String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + index; - createNewSingletonPolicy("warm", new ShrinkAction(1), TimeValue.timeValueHours(12)); + createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(1), TimeValue.timeValueHours(12)); createIndexWithSettings(index, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(LifecycleSettings.LIFECYCLE_NAME, policy) .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)); - assertBusy(() -> assertThat(getStepKeyForIndex(index), equalTo(new StepKey("new", "complete", "complete")))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), index), equalTo(new StepKey("new", "complete", "complete")))); // Move to a step from the injected unfollow action Request moveToStepRequest = new Request("POST", "_ilm/move/" + index); @@ -965,13 +970,14 @@ public void testMoveToInjectedStep() throws Exception { assertBusy(() -> { assertTrue(indexExists(shrunkenIndex)); assertTrue(aliasExists(shrunkenIndex, index)); - assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey())); + assertThat(getStepKeyForIndex(client(), shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey())); + }); } @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/53612") public void testMoveToStepRereadsPolicy() throws Exception { - createNewSingletonPolicy("hot", new RolloverAction(null, TimeValue.timeValueHours(1), null), TimeValue.ZERO); + createNewSingletonPolicy(client(), policy, "hot", new RolloverAction(null, TimeValue.timeValueHours(1), null), TimeValue.ZERO); createIndexWithSettings("test-1", Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) @@ -980,9 +986,10 @@ public void testMoveToStepRereadsPolicy() throws Exception { .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias), true); - assertBusy(() -> assertThat(getStepKeyForIndex("test-1"), equalTo(new StepKey("hot", "rollover", "check-rollover-ready")))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), "test-1"), + equalTo(new StepKey("hot", "rollover", "check-rollover-ready")))); - createNewSingletonPolicy("hot", new RolloverAction(null, TimeValue.timeValueSeconds(1), null), TimeValue.ZERO); + createNewSingletonPolicy(client(), policy, "hot", new RolloverAction(null, TimeValue.timeValueSeconds(1), null), TimeValue.ZERO); // Move to the same step, which should re-read the policy Request moveToStepRequest = new Request("POST", "_ilm/move/test-1"); @@ -1069,17 +1076,17 @@ public void testExplainFilters() throws Exception { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)); assertBusy(() -> { - Map> explainResponse = explain(index + "*", false, false); + Map> explainResponse = explain(client(), index + "*", false, false); assertNotNull(explainResponse); assertThat(explainResponse, allOf(hasKey(goodIndex), hasKey(errorIndex), hasKey(nonexistantPolicyIndex), hasKey(unmanagedIndex))); - Map> onlyManagedResponse = explain(index + "*", false, true); + Map> onlyManagedResponse = explain(client(), index + "*", false, true); assertNotNull(onlyManagedResponse); assertThat(onlyManagedResponse, allOf(hasKey(goodIndex), hasKey(errorIndex), hasKey(nonexistantPolicyIndex))); assertThat(onlyManagedResponse, not(hasKey(unmanagedIndex))); - Map> onlyErrorsResponse = explain(index + "*", true, true); + Map> onlyErrorsResponse = explain(client(), index + "*", true, true); assertNotNull(onlyErrorsResponse); assertThat(onlyErrorsResponse, allOf(hasKey(errorIndex), hasKey(nonexistantPolicyIndex))); assertThat(onlyErrorsResponse, allOf(not(hasKey(goodIndex)), not(hasKey(unmanagedIndex)))); @@ -1096,7 +1103,7 @@ public void testExplainIndexContainsAutomaticRetriesInformation() throws Excepti ); assertBusy(() -> { - Map explainIndex = explainIndex(index); + Map explainIndex = explainIndex(client(), index); assertThat((Integer) explainIndex.get(FAILED_STEP_RETRY_COUNT_FIELD), greaterThanOrEqualTo(1)); assertThat(explainIndex.get(IS_AUTO_RETRYABLE_ERROR_FIELD), is(true)); }); @@ -1105,7 +1112,7 @@ public void testExplainIndexContainsAutomaticRetriesInformation() throws Excepti public void testILMRolloverRetriesOnReadOnlyBlock() throws Exception { String firstIndex = index + "-000001"; - createNewSingletonPolicy("hot", new RolloverAction(null, TimeValue.timeValueSeconds(1), null)); + createNewSingletonPolicy(client(), policy, "hot", new RolloverAction(null, TimeValue.timeValueSeconds(1), null)); // create the index as readonly and associate the ILM policy to it createIndexWithSettings( @@ -1119,7 +1126,8 @@ public void testILMRolloverRetriesOnReadOnlyBlock() throws Exception { ); // wait for ILM to start retrying the step - assertBusy(() -> assertThat((Integer) explainIndex(firstIndex).get(FAILED_STEP_RETRY_COUNT_FIELD), greaterThanOrEqualTo(1))); + assertBusy(() -> assertThat((Integer) explainIndex(client(), firstIndex).get(FAILED_STEP_RETRY_COUNT_FIELD), + greaterThanOrEqualTo(1))); // remove the read only block Request allowWritesOnIndexSettingUpdate = new Request("PUT", firstIndex + "/_settings"); @@ -1131,7 +1139,7 @@ public void testILMRolloverRetriesOnReadOnlyBlock() throws Exception { client().performRequest(allowWritesOnIndexSettingUpdate); // index is not readonly so the ILM should complete successfully - assertBusy(() -> assertThat(getStepKeyForIndex(firstIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), firstIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); } public void testILMRolloverOnManuallyRolledIndex() throws Exception { @@ -1140,7 +1148,7 @@ public void testILMRolloverOnManuallyRolledIndex() throws Exception { String thirdIndex = index + "-000003"; // Set up a policy with rollover - createNewSingletonPolicy("hot", new RolloverAction(null, null, 2L)); + createNewSingletonPolicy(client(), policy, "hot", new RolloverAction(null, null, 2L)); Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes"); createIndexTemplate.setJsonEntity("{" + "\"index_patterns\": [\"" + index + "-*\"], \n" + @@ -1181,10 +1189,10 @@ public void testILMRolloverOnManuallyRolledIndex() throws Exception { client().performRequest(refreshOriginalIndex); // Wait for the rollover policy to execute - assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), originalIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); // ILM should manage the second index after attempting (and skipping) rolling the original index - assertBusy(() -> assertTrue((boolean) explainIndex(secondIndex).getOrDefault("managed", true))); + assertBusy(() -> assertTrue((boolean) explainIndex(client(), secondIndex).getOrDefault("managed", true))); // index some documents to trigger an ILM rollover index(client(), alias, "1", "foo", "bar"); @@ -1194,7 +1202,7 @@ public void testILMRolloverOnManuallyRolledIndex() throws Exception { client().performRequest(refreshSecondIndex).getStatusLine(); // ILM should rollover the second index even though it skipped the first one - assertBusy(() -> assertThat(getStepKeyForIndex(secondIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), secondIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); assertBusy(() -> assertTrue(indexExists(thirdIndex))); } @@ -1202,7 +1210,7 @@ public void testRolloverStepRetriesUntilRolledOverIndexIsDeleted() throws Except String index = this.index + "-000001"; String rolledIndex = this.index + "-000002"; - createNewSingletonPolicy("hot", new RolloverAction(null, TimeValue.timeValueSeconds(1), null)); + createNewSingletonPolicy(client(), policy, "hot", new RolloverAction(null, TimeValue.timeValueSeconds(1), null)); // create the rolled index so the rollover of the first index fails createIndexWithSettings( @@ -1222,7 +1230,8 @@ public void testRolloverStepRetriesUntilRolledOverIndexIsDeleted() throws Except true ); - assertBusy(() -> assertThat((Integer) explainIndex(index).get(FAILED_STEP_RETRY_COUNT_FIELD), greaterThanOrEqualTo(1)), 30, + assertBusy(() -> assertThat((Integer) explainIndex(client(), index).get(FAILED_STEP_RETRY_COUNT_FIELD), greaterThanOrEqualTo(1)), + 30, TimeUnit.SECONDS); Request moveToStepRequest = new Request("POST", "_ilm/move/" + index); @@ -1255,7 +1264,7 @@ public void testRolloverStepRetriesUntilRolledOverIndexIsDeleted() throws Except // retried (which means ILM moves back and forth between the `attempt-rollover` step and the `error` step) assertTrue("ILM did not start retrying the attempt-rollover step", waitUntil(() -> { try { - Map explainIndexResponse = explainIndex(index); + Map explainIndexResponse = explainIndex(client(), index); String failedStep = (String) explainIndexResponse.get("failed_step"); Integer retryCount = (Integer) explainIndexResponse.get(FAILED_STEP_RETRY_COUNT_FIELD); return failedStep != null && failedStep.equals("attempt-rollover") && retryCount != null && retryCount >= 1; @@ -1268,13 +1277,13 @@ public void testRolloverStepRetriesUntilRolledOverIndexIsDeleted() throws Except // the rollover step should eventually succeed assertBusy(() -> assertThat(indexExists(rolledIndex), is(true))); - assertBusy(() -> assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); } public void testUpdateRolloverLifecycleDateStepRetriesWhenRolloverInfoIsMissing() throws Exception { String index = this.index + "-000001"; - createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + createNewSingletonPolicy(client(), policy, "hot", new RolloverAction(null, null, 1L)); createIndexWithSettings( index, @@ -1285,7 +1294,7 @@ public void testUpdateRolloverLifecycleDateStepRetriesWhenRolloverInfoIsMissing( true ); - assertBusy(() -> assertThat(getStepKeyForIndex(index).getName(), is(WaitForRolloverReadyStep.NAME))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), index).getName(), is(WaitForRolloverReadyStep.NAME))); // moving ILM to the "update-rollover-lifecycle-date" without having gone through the actual rollover step // the "update-rollover-lifecycle-date" step will fail as the index has no rollover information @@ -1306,7 +1315,7 @@ public void testUpdateRolloverLifecycleDateStepRetriesWhenRolloverInfoIsMissing( assertTrue("ILM did not start retrying the update-rollover-lifecycle-date step", waitUntil(() -> { try { - Map explainIndexResponse = explainIndex(index); + Map explainIndexResponse = explainIndex(client(), index); String failedStep = (String) explainIndexResponse.get("failed_step"); Integer retryCount = (Integer) explainIndexResponse.get(FAILED_STEP_RETRY_COUNT_FIELD); return failedStep != null && failedStep.equals(UpdateRolloverLifecycleDateStep.NAME) && retryCount != null @@ -1330,7 +1339,7 @@ public void testUpdateRolloverLifecycleDateStepRetriesWhenRolloverInfoIsMissing( "}" ); client().performRequest(rolloverRequest); - assertBusy(() -> assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); } public void testWaitForActiveShardsStep() throws Exception { @@ -1342,7 +1351,7 @@ public void testWaitForActiveShardsStep() throws Exception { true); // create policy - createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + createNewSingletonPolicy(client(), policy, "hot", new RolloverAction(null, null, 1L)); // update policy on index updatePolicy(originalIndex, policy); Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes"); @@ -1360,17 +1369,17 @@ public void testWaitForActiveShardsStep() throws Exception { index(client(), originalIndex, "_id", "foo", "bar"); assertBusy(() -> assertTrue(indexExists(secondIndex))); - assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex).getName(), equalTo(WaitForActiveShardsStep.NAME))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), originalIndex).getName(), equalTo(WaitForActiveShardsStep.NAME))); // reset the number of replicas to 0 so that the second index wait for active shard condition can be met updateIndexSettings(secondIndex, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)); - assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), originalIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); } @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/54093") public void testHistoryIsWrittenWithSuccess() throws Exception { - createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + createNewSingletonPolicy(client(), policy, "hot", new RolloverAction(null, null, 1L)); Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes"); createIndexTemplate.setJsonEntity("{" + "\"index_patterns\": [\""+ index + "-*\"], \n" + @@ -1390,7 +1399,7 @@ public void testHistoryIsWrittenWithSuccess() throws Exception { Request refreshIndex = new Request("POST", "/" + index + "-1/_refresh"); client().performRequest(refreshIndex); - assertBusy(() -> assertThat(getStepKeyForIndex(index + "-1"), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), index + "-1"), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-indexing-complete"), 30, TimeUnit.SECONDS); assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-follow-shard-tasks"), 30, TimeUnit.SECONDS); @@ -1410,7 +1419,7 @@ public void testHistoryIsWrittenWithSuccess() throws Exception { @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/50353") public void testHistoryIsWrittenWithFailure() throws Exception { createIndexWithSettings(index + "-1", Settings.builder(), false); - createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + createNewSingletonPolicy(client(), policy, "hot", new RolloverAction(null, null, 1L)); updatePolicy(index + "-1", policy); // Index a document @@ -1419,7 +1428,8 @@ public void testHistoryIsWrittenWithFailure() throws Exception { client().performRequest(refreshIndex); // Check that we've had error and auto retried - assertBusy(() -> assertThat((Integer) explainIndex(index + "-1").get("failed_step_retry_count"), greaterThanOrEqualTo(1))); + assertBusy(() -> assertThat((Integer) explainIndex(client(), index + "-1").get("failed_step_retry_count"), + greaterThanOrEqualTo(1))); assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", false, "ERROR"), 30, TimeUnit.SECONDS); } @@ -1428,7 +1438,7 @@ public void testHistoryIsWrittenWithFailure() throws Exception { public void testHistoryIsWrittenWithDeletion() throws Exception { // Index should be created and then deleted by ILM createIndexWithSettings(index, Settings.builder(), false); - createNewSingletonPolicy("delete", new DeleteAction()); + createNewSingletonPolicy(client(), policy, "delete", new DeleteAction()); updatePolicy(index, policy); assertBusy(() -> assertFalse(indexExists(index))); @@ -1444,7 +1454,7 @@ public void testRetryableInitializationStep() throws Exception { Request stopReq = new Request("POST", "/_ilm/stop"); Request startReq = new Request("POST", "/_ilm/start"); - createNewSingletonPolicy("hot", new SetPriorityAction(1)); + createNewSingletonPolicy(client(), policy, "hot", new SetPriorityAction(1)); // Stop ILM so that the initialize step doesn't run assertOK(client().performRequest(stopReq)); @@ -1465,7 +1475,7 @@ public void testRetryableInitializationStep() throws Exception { // Wait until an error has occurred. assertTrue("ILM did not start retrying the init step", waitUntil(() -> { try { - Map explainIndexResponse = explainIndex(index); + Map explainIndexResponse = explainIndex(client(), index); String failedStep = (String) explainIndexResponse.get("failed_step"); Integer retryCount = (Integer) explainIndexResponse.get(FAILED_STEP_RETRY_COUNT_FIELD); return failedStep != null && failedStep.equals(InitializePolicyContextStep.KEY.getAction()) && retryCount != null @@ -1480,7 +1490,7 @@ public void testRetryableInitializationStep() throws Exception { .put(LifecycleSettings.LIFECYCLE_PARSE_ORIGINATION_DATE, false)); assertBusy(() -> { - Map explainResp = explainIndex(index); + Map explainResp = explainIndex(client(), index); String phase = (String) explainResp.get("phase"); assertThat(phase, equalTo("hot")); }); @@ -1489,7 +1499,7 @@ public void testRetryableInitializationStep() throws Exception { public void testRefreshablePhaseJson() throws Exception { String index = "refresh-index"; - createNewSingletonPolicy("hot", new RolloverAction(null, null, 100L)); + createNewSingletonPolicy(client(), policy, "hot", new RolloverAction(null, null, 100L)); Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes"); createIndexTemplate.setJsonEntity("{" + "\"index_patterns\": [\""+ index + "-*\"], \n" + @@ -1511,20 +1521,20 @@ public void testRefreshablePhaseJson() throws Exception { index(client(), index + "-1", "1", "foo", "bar"); // Wait for the index to enter the check-rollover-ready step - assertBusy(() -> assertThat(getStepKeyForIndex(index + "-1").getName(), equalTo(WaitForRolloverReadyStep.NAME))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), index + "-1").getName(), equalTo(WaitForRolloverReadyStep.NAME))); // Update the policy to allow rollover at 1 document instead of 100 - createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + createNewSingletonPolicy(client(), policy, "hot", new RolloverAction(null, null, 1L)); // Index should now have been able to roll over, creating the new index and proceeding to the "complete" step assertBusy(() -> assertThat(indexExists(index + "-000002"), is(true))); - assertBusy(() -> assertThat(getStepKeyForIndex(index + "-1").getName(), equalTo(PhaseCompleteStep.NAME))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), index + "-1").getName(), equalTo(PhaseCompleteStep.NAME))); } public void testHaltAtEndOfPhase() throws Exception { String index = "halt-index"; - createNewSingletonPolicy("hot", new SetPriorityAction(100)); + createNewSingletonPolicy(client(), policy, "hot", new SetPriorityAction(100)); createIndexWithSettings(index, Settings.builder() @@ -1534,7 +1544,7 @@ public void testHaltAtEndOfPhase() throws Exception { randomBoolean()); // Wait for the index to finish the "hot" phase - assertBusy(() -> assertThat(getStepKeyForIndex(index), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); + assertBusy(() -> assertThat(getStepKeyForIndex(client(), index), equalTo(PhaseCompleteStep.finalStep("hot").getKey()))); // Update the policy to add a delete phase { @@ -1560,7 +1570,7 @@ public void testHaltAtEndOfPhase() throws Exception { public void testSearchableSnapshotAction() throws Exception { String snapshotRepo = createSnapshotRepo(); - createNewSingletonPolicy("cold", new SearchableSnapshotAction(snapshotRepo)); + createNewSingletonPolicy(client(), policy, "cold", new SearchableSnapshotAction(snapshotRepo)); createIndexWithSettings(index, Settings.builder() @@ -1578,7 +1588,8 @@ public void testSearchableSnapshotAction() throws Exception { } }, 30, TimeUnit.SECONDS)); - assertBusy(() -> assertThat(explainIndex(restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)), 30, TimeUnit.SECONDS); + assertBusy(() -> assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)), 30, + TimeUnit.SECONDS); } @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/54433") @@ -1613,10 +1624,10 @@ public void testDeleteActionDeletesSearchableSnapshot() throws Exception { String restoredIndexName = SearchableSnapshotAction.RESTORED_INDEX_PREFIX + this.index; assertTrue(waitUntil(() -> { try { - Map explainIndex = explainIndex(index); + Map explainIndex = explainIndex(client(), index); if(explainIndex == null) { // in case we missed the original index and it was deleted - explainIndex = explainIndex(restoredIndexName); + explainIndex = explainIndex(client(), restoredIndexName); } snapshotName[0] = (String) explainIndex.get("snapshot_name"); return snapshotName[0] != null; @@ -1669,10 +1680,10 @@ public void testDeleteActionDoesntDeleteSearchableSnapshot() throws Exception { String restoredIndexName = SearchableSnapshotAction.RESTORED_INDEX_PREFIX + this.index; assertTrue(waitUntil(() -> { try { - Map explainIndex = explainIndex(index); + Map explainIndex = explainIndex(client(), index); if(explainIndex == null) { // in case we missed the original index and it was deleted - explainIndex = explainIndex(restoredIndexName); + explainIndex = explainIndex(client(), restoredIndexName); } snapshotName[0] = (String) explainIndex.get("snapshot_name"); return snapshotName[0] != null; @@ -1796,7 +1807,7 @@ private void assertHistoryIsPresent(String policyName, String indexName, boolean } // Finally, check that the history index is in a good state - Step.StepKey stepKey = getStepKeyForIndex("ilm-history-2-000001"); + Step.StepKey stepKey = getStepKeyForIndex(client(), "ilm-history-2-000001"); assertEquals("hot", stepKey.getPhase()); assertEquals(RolloverAction.NAME, stepKey.getAction()); assertEquals(WaitForRolloverReadyStep.NAME, stepKey.getName()); @@ -1830,22 +1841,6 @@ private void createFullPolicy(TimeValue hotTime) throws IOException { assertOK(client().performRequest(request)); } - private void createNewSingletonPolicy(String phaseName, LifecycleAction action) throws IOException { - createNewSingletonPolicy(phaseName, action, TimeValue.ZERO); - } - - private void createNewSingletonPolicy(String phaseName, LifecycleAction action, TimeValue after) throws IOException { - Phase phase = new Phase(phaseName, after, singletonMap(action.getWriteableName(), action)); - LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policy, singletonMap(phase.getName(), phase)); - XContentBuilder builder = jsonBuilder(); - lifecyclePolicy.toXContent(builder, null); - final StringEntity entity = new StringEntity( - "{ \"policy\":" + Strings.toString(builder) + "}", ContentType.APPLICATION_JSON); - Request request = new Request("PUT", "_ilm/policy/" + policy); - request.setEntity(entity); - client().performRequest(request); - } - private void createIndexWithSettingsNoAlias(String index, Settings.Builder settings) throws IOException { Request request = new Request("PUT", "/" + index); request.setJsonEntity("{\n \"settings\": " + Strings.toString(settings.build()) @@ -1893,26 +1888,9 @@ private Map getOnlyIndexSettings(String index) throws IOExceptio return (Map) response.get("settings"); } - - public static StepKey getStepKeyForIndex(String indexName) throws IOException { - Map indexResponse = explainIndex(indexName); - if (indexResponse == null) { - return new StepKey(null, null, null); - } - - return getStepKey(indexResponse); - } - - private static StepKey getStepKey(Map explainIndexResponse) { - String phase = (String) explainIndexResponse.get("phase"); - String action = (String) explainIndexResponse.get("action"); - String step = (String) explainIndexResponse.get("step"); - return new StepKey(phase, action, step); - } - @Nullable private String getFailedStepForIndex(String indexName) throws IOException { - Map indexResponse = explainIndex(indexName); + Map indexResponse = explainIndex(client(), indexName); if (indexResponse == null) { return null; } @@ -1920,33 +1898,6 @@ private String getFailedStepForIndex(String indexName) throws IOException { return (String) indexResponse.get("failed_step"); } - private static Map explainIndex(String indexName) throws IOException { - return explain(indexName, false, false).get(indexName); - } - - private static Map> explain(String indexPattern, boolean onlyErrors, - boolean onlyManaged) throws IOException { - Request explainRequest = new Request("GET", indexPattern + "/_ilm/explain"); - explainRequest.addParameter("only_errors", Boolean.toString(onlyErrors)); - explainRequest.addParameter("only_managed", Boolean.toString(onlyManaged)); - Response response = client().performRequest(explainRequest); - Map responseMap; - try (InputStream is = response.getEntity().getContent()) { - responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); - } - - @SuppressWarnings("unchecked") Map> indexResponse = - ((Map>) responseMap.get("indices")); - return indexResponse; - } - - private void indexDocument() throws IOException { - Request indexRequest = new Request("POST", index + "/_doc"); - indexRequest.setEntity(new StringEntity("{\"a\": \"test\"}", ContentType.APPLICATION_JSON)); - Response response = client().performRequest(indexRequest); - logger.info(response.getStatusLine()); - } - @SuppressWarnings("unchecked") private String getSnapshotState(String snapshot) throws IOException { Response response = client().performRequest(new Request("GET", "/_snapshot/repo/" + snapshot)); @@ -2006,7 +1957,7 @@ private void assertBusy(CheckedRunnable runnable, String slmPolicy) t } catch (Exception ignored) { slm = new HashMap<>(); } - throw new AssertionError("Index:" + explainIndex(index) + "\nSLM:" + slm, e); + throw new AssertionError("Index:" + explainIndex(client(), index) + "\nSLM:" + slm, e); } }); } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleRestIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleRestIT.java index fe716fdc30c6c..3a07319b2bac9 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleRestIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleRestIT.java @@ -51,7 +51,7 @@ import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.CREATE_OPERATION; import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.DELETE_OPERATION; import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore.SLM_HISTORY_INDEX_PREFIX; -import static org.elasticsearch.xpack.ilm.TimeSeriesLifecycleActionsIT.getStepKeyForIndex; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -618,7 +618,7 @@ private void assertHistoryIsPresent(String policyName, boolean success, String r } private void assertHistoryIndexWaitingForRollover() throws IOException { - Step.StepKey stepKey = getStepKeyForIndex(SLM_HISTORY_INDEX_PREFIX + "000001"); + Step.StepKey stepKey = getStepKeyForIndex(client(), SLM_HISTORY_INDEX_PREFIX + "000001"); assertEquals("hot", stepKey.getPhase()); assertEquals(RolloverAction.NAME, stepKey.getAction()); assertEquals(WaitForRolloverReadyStep.NAME, stepKey.getName()); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java index b6b42f2ade51d..a3c393e0e9cbe 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; @@ -118,7 +119,7 @@ boolean isReadyToTransitionToThisPhase(final String policy, final IndexMetadata * Run the current step, only if it is an asynchronous wait step. These * wait criteria are checked periodically from the ILM scheduler */ - void runPeriodicStep(String policy, IndexMetadata indexMetadata) { + void runPeriodicStep(String policy, Metadata metadata, IndexMetadata indexMetadata) { String index = indexMetadata.getIndex().getName(); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetadata); final Step currentStep; @@ -170,7 +171,7 @@ void runPeriodicStep(String policy, IndexMetadata indexMetadata) { } } else if (currentStep instanceof AsyncWaitStep) { logger.debug("[{}] running periodic policy with current-step [{}]", index, currentStep.getKey()); - ((AsyncWaitStep) currentStep).evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + ((AsyncWaitStep) currentStep).evaluateCondition(metadata, indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean conditionMet, ToXContentObject stepInfo) { diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java index 270312789d368..9ffa840d1a2d4 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java @@ -314,7 +314,7 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) if (fromClusterStateChange) { lifecycleRunner.runPolicyAfterStateChange(policyName, idxMeta); } else { - lifecycleRunner.runPeriodicStep(policyName, idxMeta); + lifecycleRunner.runPeriodicStep(policyName, clusterState.metadata(), idxMeta); } // ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop safeToStop = false; @@ -326,7 +326,7 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) if (fromClusterStateChange) { lifecycleRunner.runPolicyAfterStateChange(policyName, idxMeta); } else { - lifecycleRunner.runPeriodicStep(policyName, idxMeta); + lifecycleRunner.runPeriodicStep(policyName, clusterState.metadata(), idxMeta); } } } catch (Exception e) { diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java index f52bc0200bd00..ca2c2fc175642 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java @@ -142,7 +142,7 @@ public void testRunPolicyPhaseCompletePolicyStep() { .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); runner.runPolicyAfterStateChange(policyName, indexMetadata); - runner.runPeriodicStep(policyName, indexMetadata); + runner.runPeriodicStep(policyName, Metadata.builder().put(indexMetadata, true).build(), indexMetadata); Mockito.verifyZeroInteractions(clusterService); } @@ -158,7 +158,7 @@ public void testRunPolicyPhaseCompleteWithMoreStepsPolicyStep() { .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); runner.runPolicyAfterStateChange(policyName, indexMetadata); - runner.runPeriodicStep(policyName, indexMetadata); + runner.runPeriodicStep(policyName, Metadata.builder().put(indexMetadata, true).build(), indexMetadata); Mockito.verify(clusterService, times(2)).submitStateUpdateTask(any(), any()); @@ -231,7 +231,7 @@ public void testRunPolicyErrorStepOnRetryableFailedStep() { .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)) .build(); - runner.runPeriodicStep(policyName, indexMetadata); + runner.runPeriodicStep(policyName, Metadata.builder().put(indexMetadata, true).build(), indexMetadata); Mockito.verify(clusterService, times(1)).submitStateUpdateTask(any(), any()); } @@ -414,7 +414,7 @@ public void doTestRunPolicyWithFailureToReadPolicy(boolean asyncAction, boolean if (asyncAction) { runner.maybeRunAsyncAction(before, indexMetadata, policyName, stepKey); } else if (periodicAction) { - runner.runPeriodicStep(policyName, indexMetadata); + runner.runPeriodicStep(policyName, Metadata.builder().put(indexMetadata, true).build(), indexMetadata); } else { runner.runPolicyAfterStateChange(policyName, indexMetadata); } @@ -598,7 +598,7 @@ public void testRunPeriodicStep() throws Exception { ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); step.setLatch(latch); - runner.runPeriodicStep(policyName, indexMetadata); + runner.runPeriodicStep(policyName, Metadata.builder().put(indexMetadata, true).build(), indexMetadata); awaitLatch(latch, 5, TimeUnit.SECONDS); ClusterState after = clusterService.state(); @@ -947,7 +947,7 @@ public void setLatch(CountDownLatch latch) { } @Override - public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) { + public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) { executeCount++; if (latch != null) { latch.countDown();