From 8d80766d576d1c40318b864e5a59eb771ba9cce9 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Wed, 28 Jul 2021 10:09:33 -0700 Subject: [PATCH] Enhance ISM template Signed-off-by: bowenlan-amzn --- .../indexmanagement/IndexManagementPlugin.kt | 1 + .../ISMTemplateService.kt | 107 ++++++++++++------ .../ManagedIndexCoordinator.kt | 56 +++++---- .../indexstatemanagement/model/Policy.kt | 36 ++++-- .../opensearchapi/OpenSearchExtensions.kt | 2 +- .../settings/ManagedIndexSettings.kt | 7 ++ .../indexpolicy/TransportIndexPolicyAction.kt | 48 ++++++-- .../TransportRemovePolicyAction.kt | 64 ++++++++++- .../opensearchapi/OpenSearchExtensions.kt | 6 +- .../IndexStateManagementRestTestCase.kt | 8 ++ .../indexstatemanagement/TestHelpers.kt | 2 +- .../action/RolloverActionIT.kt | 4 +- .../action/RollupActionIT.kt | 2 +- .../resthandler/ISMTemplateRestAPIIT.kt | 24 +++- .../resthandler/RestRemovePolicyActionIT.kt | 17 +++ 15 files changed, 288 insertions(+), 96 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index d704f73d4..8a883ef4b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -367,6 +367,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin ManagedIndexSettings.ROLLOVER_SKIP, ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED, ManagedIndexSettings.METADATA_SERVICE_ENABLED, + ManagedIndexSettings.AUTO_MANAGE, ManagedIndexSettings.JOB_INTERVAL, ManagedIndexSettings.SWEEP_PERIOD, ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMTemplateService.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMTemplateService.kt index 8d896ce45..fe81ac6b2 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMTemplateService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMTemplateService.kt @@ -29,8 +29,7 @@ package org.opensearch.indexmanagement.indexstatemanagement import org.apache.logging.log4j.LogManager import org.apache.lucene.util.automaton.Operations import org.opensearch.OpenSearchException -import org.opensearch.cluster.ClusterState -import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.metadata.IndexAbstraction import org.opensearch.common.Strings import org.opensearch.common.ValidationException import org.opensearch.common.regex.Regex @@ -40,26 +39,24 @@ import org.opensearch.indexmanagement.util.IndexManagementException private val log = LogManager.getLogger("ISMTemplateService") /** - * find the matching policy based on ISM template field for the given index + * find the matching policy for the given index * - * filter out hidden index - * filter out older index than template lastUpdateTime + * return early if it's hidden index + * filter out templates that were last updated after the index creation time * - * @param ismTemplates current ISM templates saved in metadata - * @param indexMetadata cluster state index metadata * @return policyID */ -@Suppress("ReturnCount") -fun Map.findMatchingPolicy(clusterState: ClusterState, indexName: String): String? { - if (this.isEmpty()) return null - - val indexMetadata = clusterState.metadata.index(indexName) - val indexAbstraction = clusterState.metadata.indicesLookup[indexName] +@Suppress("ReturnCount", "NestedBlockDepth") +fun Map>.findMatchingPolicy( + indexName: String, + indexCreationDate: Long, + isHiddenIndex: Boolean, + indexAbstraction: IndexAbstraction? +): String? { val isDataStreamIndex = indexAbstraction?.parentDataStream != null - - // Don't include hidden index unless it belongs to a data stream. - val isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings) - if (!isDataStreamIndex && isHidden) return null + if (this.isEmpty()) return null + // don't include hidden index + if (!isDataStreamIndex && isHiddenIndex) return null // If the index belongs to a data stream, then find the matching policy using the data stream name. val lookupName = when { @@ -72,14 +69,19 @@ fun Map.findMatchingPolicy(clusterState: ClusterState, inde val patternMatchPredicate = { pattern: String -> Regex.simpleMatch(pattern, lookupName) } var matchedPolicy: String? = null var highestPriority: Int = -1 - this.filter { (_, template) -> - template.lastUpdatedTime.toEpochMilli() < indexMetadata.creationDate - }.forEach { (policyID, template) -> - val matched = template.indexPatterns.stream().anyMatch(patternMatchPredicate) - if (matched && highestPriority < template.priority) { - highestPriority = template.priority - matchedPolicy = policyID - } + + this.forEach { (policyID, templateList) -> + templateList.filter { it.lastUpdatedTime.toEpochMilli() < indexCreationDate } + .forEach { + if (it.indexPatterns.stream().anyMatch(patternMatchPredicate)) { + if (highestPriority < it.priority) { + highestPriority = it.priority + matchedPolicy = policyID + } else if (highestPriority == it.priority) { + log.warn("Warning: index $lookupName matches [$matchedPolicy, $policyID]") + } + } + } } return matchedPolicy @@ -120,30 +122,61 @@ fun validateFormat(indexPatterns: List): OpenSearchException? { return null } +fun List.findSelfConflictingTemplates(): Pair, List>? { + val priorityToTemplates = mutableMapOf>() + this.forEach { + val templateList = priorityToTemplates[it.priority] + if (templateList != null) { + priorityToTemplates[it.priority] = templateList.plus(it) + } else { + priorityToTemplates[it.priority] = mutableListOf(it) + } + } + priorityToTemplates.forEach { (_, templateList) -> + // same priority + val indexPatternsList = templateList.map { it.indexPatterns } + if (indexPatternsList.size > 1) { + indexPatternsList.forEachIndexed { ind, indexPatterns -> + val comparePatterns = indexPatternsList.subList(ind + 1, indexPatternsList.size).flatten() + if (overlapping(indexPatterns, comparePatterns)) { + return indexPatterns to comparePatterns + } + } + } + } + + return null +} + +@Suppress("SpreadOperator") +fun overlapping(p1: List, p2: List): Boolean { + if (p1.isEmpty() || p2.isEmpty()) return false + val a1 = Regex.simpleMatchToAutomaton(*p1.toTypedArray()) + val a2 = Regex.simpleMatchToAutomaton(*p2.toTypedArray()) + return !Operations.isEmpty(Operations.intersection(a1, a2)) +} + /** * find policy templates whose index patterns overlap with given template * * @return map of overlapping template name to its index patterns */ -@Suppress("SpreadOperator") -fun Map.findConflictingPolicyTemplates( +fun Map>.findConflictingPolicyTemplates( candidate: String, indexPatterns: List, priority: Int ): Map> { - val automaton1 = Regex.simpleMatchToAutomaton(*indexPatterns.toTypedArray()) val overlappingTemplates = mutableMapOf>() - // focus on template with same priority - this.filter { it.value.priority == priority } - .forEach { (policyID, template) -> - val automaton2 = Regex.simpleMatchToAutomaton(*template.indexPatterns.toTypedArray()) - if (!Operations.isEmpty(Operations.intersection(automaton1, automaton2))) { - log.info("Existing ism_template for $policyID overlaps candidate $candidate") - overlappingTemplates[policyID] = template.indexPatterns + this.forEach { (policyID, templateList) -> + templateList.filter { it.priority == priority } + .map { it.indexPatterns } + .forEach { + if (overlapping(indexPatterns, it)) { + overlappingTemplates[policyID] = it + } } - } + } overlappingTemplates.remove(candidate) - return overlappingTemplates } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt index 4441d475d..31103b010 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt @@ -40,6 +40,7 @@ import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.get.MultiGetRequest import org.opensearch.action.get.MultiGetResponse +import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchPhaseExecutionException import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse @@ -49,6 +50,7 @@ import org.opensearch.cluster.ClusterChangedEvent import org.opensearch.cluster.ClusterState import org.opensearch.cluster.ClusterStateListener import org.opensearch.cluster.block.ClusterBlockException +import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.service.ClusterService import org.opensearch.common.component.LifecycleListener import org.opensearch.common.settings.Settings @@ -67,6 +69,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.Swe import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.filterNotNullValues import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getPolicyToTemplateMap import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.mgetManagedIndexMetadata +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.AUTO_MANAGE import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.COORDINATOR_BACKOFF_COUNT import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.COORDINATOR_BACKOFF_MILLIS import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.INDEX_STATE_MANAGEMENT_ENABLED @@ -292,46 +295,51 @@ class ManagedIndexCoordinator( /** * build requests to create jobs for indices matching ISM templates */ + @Suppress("NestedBlockDepth") suspend fun getMatchingIndicesUpdateReq( clusterState: ClusterState, indexNames: List ): List> { - val updateManagedIndexReqs = mutableListOf>() - if (indexNames.isEmpty()) return updateManagedIndexReqs + val updateManagedIndexReqs = mutableListOf>() + if (indexNames.isEmpty()) return updateManagedIndexReqs.toList() - val indexMetadatas = clusterState.metadata.indices val templates = getISMTemplates() - val indexToMatchedPolicy = indexNames.map { indexName -> - indexName to templates.findMatchingPolicy(clusterState, indexName) - }.toMap() - - indexToMatchedPolicy.filterNotNullValues() - .forEach { (index, policyID) -> - val indexUuid = indexMetadatas[index].indexUUID - val ismTemplate = templates[policyID] - if (indexUuid != null && ismTemplate != null) { - logger.info("Index [$index] will be managed by policy [$policyID]") - updateManagedIndexReqs.add( - managedIndexConfigIndexRequest(index, indexUuid, policyID, jobInterval) - ) - } else { - logger.warn( - "Index [$index] has index uuid [$indexUuid] and/or " + - "a matching template [$ismTemplate] that is null." - ) + // Iterate over each unmanaged hot/warm index and if it matches an ISM template add a managed index config index request + indexNames.forEach { indexName -> + if (clusterState.metadata.hasIndex(indexName)) { + val indexMetadata = clusterState.metadata.index(indexName) + val autoManage = indexMetadata.settings.getAsBoolean(AUTO_MANAGE.key, true) + if (autoManage) { + val indexUuid = indexMetadata.indexUUID + val isHiddenIndex = + IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings) || indexName.startsWith(".") + val indexAbstraction = clusterState.metadata.indicesLookup[indexName] + templates.findMatchingPolicy(indexName, indexMetadata.creationDate, isHiddenIndex, indexAbstraction) + ?.let { policyID -> + logger.info("Index [$indexName] matched ISM policy template and will be managed by $policyID") + updateManagedIndexReqs.add( + managedIndexConfigIndexRequest( + indexName, + indexUuid, + policyID, + jobInterval + ) + ) + } } } + } - return updateManagedIndexReqs + return updateManagedIndexReqs.toList() } - suspend fun getISMTemplates(): Map { + suspend fun getISMTemplates(): Map> { val searchRequest = SearchRequest() .source( SearchSourceBuilder().query( QueryBuilders.existsQuery(ISM_TEMPLATE_FIELD) - ) + ).size(MAX_HITS) ) .indices(INDEX_MANAGEMENT_INDEX) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt index f60cda4d9..ddca13e6a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt @@ -54,7 +54,7 @@ data class Policy( val errorNotification: ErrorNotification?, val defaultState: String, val states: List, - val ismTemplate: ISMTemplate? = null + val ismTemplate: List? = null ) : ToXContentObject, Writeable { init { @@ -101,7 +101,9 @@ data class Policy( errorNotification = sin.readOptionalWriteable(::ErrorNotification), defaultState = sin.readString(), states = sin.readList(::State), - ismTemplate = sin.readOptionalWriteable(::ISMTemplate) + ismTemplate = if (sin.readBoolean()) { + sin.readList(::ISMTemplate) + } else null ) @Throws(IOException::class) @@ -115,7 +117,12 @@ data class Policy( out.writeOptionalWriteable(errorNotification) out.writeString(defaultState) out.writeList(states) - out.writeOptionalWriteable(ismTemplate) + if (ismTemplate != null) { + out.writeBoolean(true) + out.writeList(ismTemplate) + } else { + out.writeBoolean(false) + } } companion object { @@ -130,7 +137,7 @@ data class Policy( const val STATES_FIELD = "states" const val ISM_TEMPLATE = "ism_template" - @Suppress("ComplexMethod") + @Suppress("ComplexMethod", "LongMethod", "NestedBlockDepth") @JvmStatic @JvmOverloads @Throws(IOException::class) @@ -146,7 +153,7 @@ data class Policy( var lastUpdatedTime: Instant? = null var schemaVersion: Long = IndexUtils.DEFAULT_SCHEMA_VERSION val states: MutableList = mutableListOf() - var ismTemplate: ISMTemplate? = null + var ismTemplates: List? = null ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != Token.END_OBJECT) { @@ -166,7 +173,22 @@ data class Policy( states.add(State.parse(xcp)) } } - ISM_TEMPLATE -> ismTemplate = if (xcp.currentToken() == Token.VALUE_NULL) null else ISMTemplate.parse(xcp) + ISM_TEMPLATE -> { + if (xcp.currentToken() != Token.VALUE_NULL) { + ismTemplates = mutableListOf() + when (xcp.currentToken()) { + Token.START_ARRAY -> { + while (xcp.nextToken() != Token.END_ARRAY) { + ismTemplates.add(ISMTemplate.parse(xcp)) + } + } + Token.START_OBJECT -> { + ismTemplates.add(ISMTemplate.parse(xcp)) + } + else -> ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp) + } + } + } else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in Policy.") } } @@ -181,7 +203,7 @@ data class Policy( errorNotification = errorNotification, defaultState = requireNotNull(defaultState) { "$DEFAULT_STATE_FIELD is null" }, states = states.toList(), - ismTemplate = ismTemplate + ismTemplate = ismTemplates ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt index 361e0de6d..ae53309e1 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt @@ -113,7 +113,7 @@ fun getUuidsForClosedIndices(state: ClusterState): MutableList { */ @Throws(Exception::class) fun getPolicyToTemplateMap(response: SearchResponse, xContentRegistry: NamedXContentRegistry = NamedXContentRegistry.EMPTY): - Map { + Map?> { return response.hits.hits.map { val id = it.id val seqNo = it.seqNo diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt index d83cb5304..c70c24828 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt @@ -76,6 +76,13 @@ class ManagedIndexSettings { Setting.Property.Dynamic ) + val AUTO_MANAGE: Setting = Setting.boolSetting( + "index.plugins.index_state_management.auto_manage", + true, + Setting.Property.IndexScope, + Setting.Property.Dynamic + ) + val JOB_INTERVAL: Setting = Setting.intSetting( "plugins.index_state_management.job_interval", LegacyOpenDistroManagedIndexSettings.JOB_INTERVAL, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt index d53dcbe3b..d2cd2c1a9 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt @@ -46,7 +46,10 @@ import org.opensearch.index.query.QueryBuilders import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator.Companion.MAX_HITS import org.opensearch.indexmanagement.indexstatemanagement.findConflictingPolicyTemplates +import org.opensearch.indexmanagement.indexstatemanagement.findSelfConflictingTemplates +import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.filterNotNullValues import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getPolicyToTemplateMap import org.opensearch.indexmanagement.indexstatemanagement.util.ISM_TEMPLATE_FIELD @@ -96,8 +99,9 @@ class TransportIndexPolicyAction @Inject constructor( // if there is template field, we will check val reqTemplate = request.policy.ismTemplate - if (reqTemplate != null) { - checkTemplate(reqTemplate.indexPatterns, reqTemplate.priority) + val reqTemplates = request.policy.ismTemplate + if (reqTemplates != null) { + validateISMTemplates(reqTemplates) } else putPolicy() } else { log.error("Unable to create or update ${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX} with newest mapping.") @@ -111,18 +115,28 @@ class TransportIndexPolicyAction @Inject constructor( } } - private fun checkTemplate(indexPatterns: List, priority: Int) { - val possibleEx = validateFormat(indexPatterns) + private fun validateISMTemplates(ismTemplateList: List) { + val possibleEx = validateFormat(ismTemplateList.map { it.indexPatterns }.flatten()) if (possibleEx != null) { actionListener.onFailure(possibleEx) return } + // check self overlapping + val selfOverlap = ismTemplateList.findSelfConflictingTemplates() + if (selfOverlap != null) { + val errorMessage = "New policy ${request.policyID} has an ISM template with index pattern ${selfOverlap.first} " + + "matching this policy's other ISM templates with index patterns ${selfOverlap.second}," + + " please use different priority" + actionListener.onFailure(IndexManagementException.wrap(IllegalArgumentException(errorMessage))) + return + } + val searchRequest = SearchRequest() .source( SearchSourceBuilder().query( QueryBuilders.existsQuery(ISM_TEMPLATE_FIELD) - ) + ).size(MAX_HITS) ) .indices(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX) @@ -131,13 +145,23 @@ class TransportIndexPolicyAction @Inject constructor( object : ActionListener { override fun onResponse(response: SearchResponse) { val policyToTemplateMap = getPolicyToTemplateMap(response, xContentRegistry).filterNotNullValues() - val conflictingPolicyTemplates = policyToTemplateMap.findConflictingPolicyTemplates(request.policyID, indexPatterns, priority) - if (conflictingPolicyTemplates.isNotEmpty()) { - val errorMessage = "New policy ${request.policyID} has an ISM template with index pattern $indexPatterns " + - "matching existing policy templates," + - " please use a different priority than $priority" - actionListener.onFailure(IndexManagementException.wrap(IllegalArgumentException(errorMessage))) - return + ismTemplateList.forEach { + val conflictingPolicyTemplates = policyToTemplateMap + .findConflictingPolicyTemplates(request.policyID, it.indexPatterns, it.priority) + if (conflictingPolicyTemplates.isNotEmpty()) { + val errorMessage = + "New policy ${request.policyID} has an ISM template with index pattern ${it.indexPatterns} " + + "matching existing policy templates," + + " please use a different priority than ${it.priority}" + actionListener.onFailure( + IndexManagementException.wrap( + IllegalArgumentException( + errorMessage + ) + ) + ) + return + } } putPolicy() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/removepolicy/TransportRemovePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/removepolicy/TransportRemovePolicyAction.kt index f570188ff..9329894af 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/removepolicy/TransportRemovePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/removepolicy/TransportRemovePolicyAction.kt @@ -31,6 +31,7 @@ import org.opensearch.ExceptionsHelper import org.opensearch.action.ActionListener import org.opensearch.action.admin.cluster.state.ClusterStateRequest import org.opensearch.action.admin.cluster.state.ClusterStateResponse +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.get.MultiGetRequest @@ -38,14 +39,17 @@ import org.opensearch.action.get.MultiGetResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.IndicesOptions +import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.node.NodeClient import org.opensearch.cluster.ClusterState import org.opensearch.cluster.block.ClusterBlockException import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings import org.opensearch.index.Index import org.opensearch.index.IndexNotFoundException import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getUuidsForClosedIndices +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings import org.opensearch.indexmanagement.indexstatemanagement.transport.action.ISMStatusResponse import org.opensearch.indexmanagement.indexstatemanagement.util.FailedIndex import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexMetadataRequest @@ -128,7 +132,13 @@ class TransportRemovePolicyAction @Inject constructor( val f = response.responses.first() if (f.isFailed && f.failure.failure is IndexNotFoundException) { indicesToRemove.forEach { (uuid, name) -> - failedIndices.add(FailedIndex(name, uuid, "This index does not have a policy to remove")) + failedIndices.add( + FailedIndex( + name, + uuid, + "This index does not have a policy to remove" + ) + ) } actionListener.onResponse(ISMStatusResponse(0, failedIndices)) return @@ -147,7 +157,7 @@ class TransportRemovePolicyAction @Inject constructor( } } - removeManagedIndices() + updateSettings(indicesToRemove) } override fun onFailure(t: Exception) { @@ -157,6 +167,48 @@ class TransportRemovePolicyAction @Inject constructor( ) } + /** + * try to update auto_manage setting to false before delete managed-index + * so that index will not be picked up by Coordinator background sweep process + * this wont happen for cold indices + * if update setting failed, remove managed-index and metadata will not happen + */ + @Suppress("SpreadOperator") + fun updateSettings(indices: Map) { + val request = UpdateSettingsRequest() + .indices(*indices.map { it.value }.toTypedArray()) + .settings(Settings.builder().put(ManagedIndexSettings.AUTO_MANAGE.key, false)) + client.admin().indices().updateSettings( + request, + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + if (response.isAcknowledged) { + removeManagedIndices() + } else { + indices.forEach { + failedIndices.add( + FailedIndex( + it.value, it.key, + "Update auto_manage setting to false is not acknowledged, remove policy failed." + ) + ) + } + actionListener.onResponse(ISMStatusResponse(0, failedIndices)) + } + } + + override fun onFailure(t: Exception) { + val ex = ExceptionsHelper.unwrapCause(t) as Exception + actionListener.onFailure( + IndexManagementException.wrap( + Exception("Failed to update auto_manage setting to false.", ex) + ) + ) + } + } + ) + } + @Suppress("SpreadOperator") // There is no way around dealing with java vararg without spread operator. fun removeManagedIndices() { if (indicesToRemove.isNotEmpty()) { @@ -169,7 +221,13 @@ class TransportRemovePolicyAction @Inject constructor( response.forEach { val docId = it.id // docId is indexUuid of the managed index if (it.isFailed) { - failedIndices.add(FailedIndex(indicesToRemove[docId] as String, docId, "Failed to remove policy")) + failedIndices.add( + FailedIndex( + indicesToRemove[docId] as String, + docId, + "Failed to remove policy" + ) + ) indicesToRemove.remove(docId) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt index 1a8859862..1fb697e5a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt @@ -92,11 +92,11 @@ fun XContentBuilder.optionalTimeField(name: String, instant: Instant?): XContent return this.timeField(name, "${name}_in_millis", instant.toEpochMilli()) } -fun XContentBuilder.optionalISMTemplateField(name: String, ismTemplate: ISMTemplate?): XContentBuilder { - if (ismTemplate == null) { +fun XContentBuilder.optionalISMTemplateField(name: String, ismTemplates: List?): XContentBuilder { + if (ismTemplates == null) { return nullField(name) } - return this.field(Policy.ISM_TEMPLATE, ismTemplate) + return this.field(Policy.ISM_TEMPLATE, ismTemplates.toTypedArray()) } /** diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt index 07487e85b..e219cce1c 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -507,6 +507,14 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() return (indexSettings[indexName]!!["settings"]!!["index.priority"] as String).toInt() } + @Suppress("UNCHECKED_CAST") + protected fun getIndexAutoManageSetting(indexName: String): Boolean? { + val indexSettings = getIndexSettings(indexName) as Map>> + val autoManageSetting = indexSettings[indexName]!!["settings"]!!["index.plugins.index_state_management.auto_manage"] + if (autoManageSetting != null) return (autoManageSetting as String).toBoolean() + return null + } + @Suppress("UNCHECKED_CAST") protected fun getUuid(indexName: String): String { val indexSettings = getIndexSettings(indexName) as Map>> diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt index 3cbef7a25..955d5bcf5 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt @@ -79,7 +79,7 @@ fun randomPolicy( lastUpdatedTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), errorNotification: ErrorNotification? = randomErrorNotification(), states: List = List(OpenSearchRestTestCase.randomIntBetween(1, 10)) { randomState() }, - ismTemplate: ISMTemplate? = null + ismTemplate: List? = null ): Policy { return Policy( id = id, schemaVersion = schemaVersion, lastUpdatedTime = lastUpdatedTime, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt index 2fd1e89ed..dcfeab3ee 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt @@ -364,7 +364,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { errorNotification = randomErrorNotification(), defaultState = states[0].name, states = states, - ismTemplate = ISMTemplate(listOf(dataStreamName), 100, Instant.now().truncatedTo(ChronoUnit.MILLIS)) + ismTemplate = listOf(ISMTemplate(listOf(dataStreamName), 100, Instant.now().truncatedTo(ChronoUnit.MILLIS))) ) createPolicy(policy, policyID) @@ -420,7 +420,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { errorNotification = randomErrorNotification(), defaultState = states[0].name, states = states, - ismTemplate = ISMTemplate(listOf(dataStreamName), 100, Instant.now().truncatedTo(ChronoUnit.MILLIS)) + ismTemplate = listOf(ISMTemplate(listOf(dataStreamName), 100, Instant.now().truncatedTo(ChronoUnit.MILLIS))) ) createPolicy(policy, policyID) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt index 7a899dfcc..559554f26 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt @@ -126,7 +126,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() { errorNotification = randomErrorNotification(), defaultState = states[0].name, states = states, - ismTemplate = ISMTemplate(listOf(dataStreamName), 100, Instant.now().truncatedTo(ChronoUnit.MILLIS)) + ismTemplate = listOf(ISMTemplate(listOf(dataStreamName), 100, Instant.now().truncatedTo(ChronoUnit.MILLIS))) ) createPolicy(policy, policyID) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt index 4adbe2a6e..4466c1965 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt @@ -55,7 +55,7 @@ class ISMTemplateRestAPIIT : IndexStateManagementRestTestCase() { fun `test add template with invalid index pattern`() { try { val ismTemp = ISMTemplate(listOf(" "), 100, randomInstant()) - createPolicy(randomPolicy(ismTemplate = ismTemp), policyID1) + createPolicy(randomPolicy(ismTemplate = listOf(ismTemp)), policyID1) fail("Expect a failure") } catch (e: ResponseException) { assertEquals("Unexpected RestStatus", RestStatus.BAD_REQUEST, e.response.restStatus()) @@ -65,14 +65,28 @@ class ISMTemplateRestAPIIT : IndexStateManagementRestTestCase() { } } + fun `test add template with self-overlapping index pattern`() { + try { + val ismTemp = ISMTemplate(listOf("ab*"), 100, randomInstant()) + val ismTemp2 = ISMTemplate(listOf("abc*"), 100, randomInstant()) + createPolicy(randomPolicy(ismTemplate = listOf(ismTemp, ismTemp2)), policyID1) + fail("Expect a failure") + } catch (e: ResponseException) { + assertEquals("Unexpected RestStatus", RestStatus.BAD_REQUEST, e.response.restStatus()) + val actualMessage = e.response.asMap()["error"] as Map + val expectedReason = "New policy $policyID1 has an ISM template with index pattern [ab*] matching this policy's other ISM templates with index patterns [abc*], please use different priority" + assertEquals(expectedReason, actualMessage["reason"]) + } + } + fun `test add template with overlapping index pattern`() { try { val ismTemp = ISMTemplate(listOf("log*"), 100, randomInstant()) val ismTemp2 = ISMTemplate(listOf("abc*"), 100, randomInstant()) val ismTemp3 = ISMTemplate(listOf("*"), 100, randomInstant()) - createPolicy(randomPolicy(ismTemplate = ismTemp), policyID1) - createPolicy(randomPolicy(ismTemplate = ismTemp2), policyID2) - createPolicy(randomPolicy(ismTemplate = ismTemp3), policyID3) + createPolicy(randomPolicy(ismTemplate = listOf(ismTemp)), policyID1) + createPolicy(randomPolicy(ismTemplate = listOf(ismTemp2)), policyID2) + createPolicy(randomPolicy(ismTemplate = listOf(ismTemp3)), policyID3) fail("Expect a failure") } catch (e: ResponseException) { assertEquals("Unexpected RestStatus", RestStatus.BAD_REQUEST, e.response.restStatus()) @@ -105,7 +119,7 @@ class ISMTemplateRestAPIIT : IndexStateManagementRestTestCase() { errorNotification = randomErrorNotification(), defaultState = states[0].name, states = states, - ismTemplate = ismTemp + ismTemplate = listOf(ismTemp) ) createPolicy(policy, policyID) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestRemovePolicyActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestRemovePolicyActionIT.kt index bc556ef39..2ceef8ed2 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestRemovePolicyActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestRemovePolicyActionIT.kt @@ -188,4 +188,21 @@ class RestRemovePolicyActionIT : IndexStateManagementRestTestCase() { assertEquals(null, getPolicyIDOfManagedIndex(indexThree)) } } + + fun `test remove policy update auto_manage setting`() { + val index = "movies" + val policy = createRandomPolicy() + createIndex(index, policy.id) + assertEquals("auto manage setting not null at index creation time", null, getIndexAutoManageSetting(index)) + + val response = client().makeRequest( + POST.toString(), + "${RestRemovePolicyAction.REMOVE_POLICY_BASE_URI}/$index" + ) + assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus()) + + waitFor { + assertEquals("auto manage setting not false after removing policy", false, getIndexAutoManageSetting(index)) + } + } }