Skip to content

Commit

Permalink
Enhance ISM template
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
  • Loading branch information
bowenlan-amzn committed Jul 28, 2021
1 parent 7aaf9f8 commit b6fd7ef
Show file tree
Hide file tree
Showing 16 changed files with 289 additions and 97 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ File repo = file("$buildDir/testclusters/repo")
def _numNodes = findProperty('numNodes') as Integer ?: 1
testClusters.integTest {
plugin(project.tasks.bundlePlugin.archiveFile)
testDistribution = "ARCHIVE"
testDistribution = "INTEG_TEST"
// Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1
if (_numNodes > 1) numberOfNodes = _numNodes
// When running integration tests it doesn't forward the --debug-jvm to the cluster anymore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ManagedIndexSettings.ROLLOVER_SKIP,
ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
ManagedIndexSettings.METADATA_SERVICE_ENABLED,
ManagedIndexSettings.AUTO_MANAGE,
ManagedIndexSettings.JOB_INTERVAL,
ManagedIndexSettings.SWEEP_PERIOD,
ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ package org.opensearch.indexmanagement.indexstatemanagement
import org.apache.logging.log4j.LogManager
import org.apache.lucene.util.automaton.Operations
import org.opensearch.OpenSearchException
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.IndexAbstraction
import org.opensearch.common.Strings
import org.opensearch.common.ValidationException
import org.opensearch.common.regex.Regex
Expand All @@ -40,26 +39,24 @@ import org.opensearch.indexmanagement.util.IndexManagementException
private val log = LogManager.getLogger("ISMTemplateService")

/**
* find the matching policy based on ISM template field for the given index
* find the matching policy for the given index
*
* filter out hidden index
* filter out older index than template lastUpdateTime
* return early if it's hidden index
* filter out templates that were last updated after the index creation time
*
* @param ismTemplates current ISM templates saved in metadata
* @param indexMetadata cluster state index metadata
* @return policyID
*/
@Suppress("ReturnCount")
fun Map<String, ISMTemplate>.findMatchingPolicy(clusterState: ClusterState, indexName: String): String? {
if (this.isEmpty()) return null

val indexMetadata = clusterState.metadata.index(indexName)
val indexAbstraction = clusterState.metadata.indicesLookup[indexName]
@Suppress("ReturnCount", "NestedBlockDepth")
fun Map<String, List<ISMTemplate>>.findMatchingPolicy(
indexName: String,
indexCreationDate: Long,
isHiddenIndex: Boolean,
indexAbstraction: IndexAbstraction?
): String? {
val isDataStreamIndex = indexAbstraction?.parentDataStream != null

// Don't include hidden index unless it belongs to a data stream.
val isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings)
if (!isDataStreamIndex && isHidden) return null
if (this.isEmpty()) return null
// don't include hidden index
if (!isDataStreamIndex && isHiddenIndex) return null

// If the index belongs to a data stream, then find the matching policy using the data stream name.
val lookupName = when {
Expand All @@ -72,14 +69,19 @@ fun Map<String, ISMTemplate>.findMatchingPolicy(clusterState: ClusterState, inde
val patternMatchPredicate = { pattern: String -> Regex.simpleMatch(pattern, lookupName) }
var matchedPolicy: String? = null
var highestPriority: Int = -1
this.filter { (_, template) ->
template.lastUpdatedTime.toEpochMilli() < indexMetadata.creationDate
}.forEach { (policyID, template) ->
val matched = template.indexPatterns.stream().anyMatch(patternMatchPredicate)
if (matched && highestPriority < template.priority) {
highestPriority = template.priority
matchedPolicy = policyID
}

this.forEach { (policyID, templateList) ->
templateList.filter { it.lastUpdatedTime.toEpochMilli() < indexCreationDate }
.forEach {
if (it.indexPatterns.stream().anyMatch(patternMatchPredicate)) {
if (highestPriority < it.priority) {
highestPriority = it.priority
matchedPolicy = policyID
} else if (highestPriority == it.priority) {
log.warn("Warning: index $lookupName matches [$matchedPolicy, $policyID]")
}
}
}
}

return matchedPolicy
Expand Down Expand Up @@ -120,30 +122,61 @@ fun validateFormat(indexPatterns: List<String>): OpenSearchException? {
return null
}

fun List<ISMTemplate>.findSelfConflictingTemplates(): Pair<List<String>, List<String>>? {
val priorityToTemplates = mutableMapOf<Int, List<ISMTemplate>>()
this.forEach {
val templateList = priorityToTemplates[it.priority]
if (templateList != null) {
priorityToTemplates[it.priority] = templateList.plus(it)
} else {
priorityToTemplates[it.priority] = mutableListOf(it)
}
}
priorityToTemplates.forEach { (_, templateList) ->
// same priority
val indexPatternsList = templateList.map { it.indexPatterns }
if (indexPatternsList.size > 1) {
indexPatternsList.forEachIndexed { ind, indexPatterns ->
val comparePatterns = indexPatternsList.subList(ind + 1, indexPatternsList.size).flatten()
if (overlapping(indexPatterns, comparePatterns)) {
return indexPatterns to comparePatterns
}
}
}
}

return null
}

@Suppress("SpreadOperator")
fun overlapping(p1: List<String>, p2: List<String>): Boolean {
if (p1.isEmpty() || p2.isEmpty()) return false
val a1 = Regex.simpleMatchToAutomaton(*p1.toTypedArray())
val a2 = Regex.simpleMatchToAutomaton(*p2.toTypedArray())
return !Operations.isEmpty(Operations.intersection(a1, a2))
}

/**
* find policy templates whose index patterns overlap with given template
*
* @return map of overlapping template name to its index patterns
*/
@Suppress("SpreadOperator")
fun Map<String, ISMTemplate>.findConflictingPolicyTemplates(
fun Map<String, List<ISMTemplate>>.findConflictingPolicyTemplates(
candidate: String,
indexPatterns: List<String>,
priority: Int
): Map<String, List<String>> {
val automaton1 = Regex.simpleMatchToAutomaton(*indexPatterns.toTypedArray())
val overlappingTemplates = mutableMapOf<String, List<String>>()

// focus on template with same priority
this.filter { it.value.priority == priority }
.forEach { (policyID, template) ->
val automaton2 = Regex.simpleMatchToAutomaton(*template.indexPatterns.toTypedArray())
if (!Operations.isEmpty(Operations.intersection(automaton1, automaton2))) {
log.info("Existing ism_template for $policyID overlaps candidate $candidate")
overlappingTemplates[policyID] = template.indexPatterns
this.forEach { (policyID, templateList) ->
templateList.filter { it.priority == priority }
.map { it.indexPatterns }
.forEach {
if (overlapping(indexPatterns, it)) {
overlappingTemplates[policyID] = it
}
}
}
}
overlappingTemplates.remove(candidate)

return overlappingTemplates
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.get.MultiGetRequest
import org.opensearch.action.get.MultiGetResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchPhaseExecutionException
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
Expand All @@ -49,6 +50,7 @@ import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.ClusterStateListener
import org.opensearch.cluster.block.ClusterBlockException
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.component.LifecycleListener
import org.opensearch.common.settings.Settings
Expand All @@ -67,6 +69,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.Swe
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.filterNotNullValues
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getPolicyToTemplateMap
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.mgetManagedIndexMetadata
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.AUTO_MANAGE
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.COORDINATOR_BACKOFF_COUNT
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.COORDINATOR_BACKOFF_MILLIS
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.INDEX_STATE_MANAGEMENT_ENABLED
Expand Down Expand Up @@ -292,46 +295,51 @@ class ManagedIndexCoordinator(
/**
* build requests to create jobs for indices matching ISM templates
*/
@Suppress("NestedBlockDepth")
suspend fun getMatchingIndicesUpdateReq(
clusterState: ClusterState,
indexNames: List<String>
): List<DocWriteRequest<*>> {
val updateManagedIndexReqs = mutableListOf<DocWriteRequest<*>>()
if (indexNames.isEmpty()) return updateManagedIndexReqs
val updateManagedIndexReqs = mutableListOf<DocWriteRequest<IndexRequest>>()
if (indexNames.isEmpty()) return updateManagedIndexReqs.toList()

val indexMetadatas = clusterState.metadata.indices
val templates = getISMTemplates()

val indexToMatchedPolicy = indexNames.map { indexName ->
indexName to templates.findMatchingPolicy(clusterState, indexName)
}.toMap()

indexToMatchedPolicy.filterNotNullValues()
.forEach { (index, policyID) ->
val indexUuid = indexMetadatas[index].indexUUID
val ismTemplate = templates[policyID]
if (indexUuid != null && ismTemplate != null) {
logger.info("Index [$index] will be managed by policy [$policyID]")
updateManagedIndexReqs.add(
managedIndexConfigIndexRequest(index, indexUuid, policyID, jobInterval)
)
} else {
logger.warn(
"Index [$index] has index uuid [$indexUuid] and/or " +
"a matching template [$ismTemplate] that is null."
)
// Iterate over each unmanaged hot/warm index and if it matches an ISM template add a managed index config index request
indexNames.forEach { indexName ->
if (clusterState.metadata.hasIndex(indexName)) {
val indexMetadata = clusterState.metadata.index(indexName)
val autoManage = indexMetadata.settings.getAsBoolean(AUTO_MANAGE.key, true)
if (autoManage) {
val indexUuid = indexMetadata.indexUUID
val isHiddenIndex =
IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings) || indexName.startsWith(".")
val indexAbstraction = clusterState.metadata.indicesLookup[indexName]
templates.findMatchingPolicy(indexName, indexMetadata.creationDate, isHiddenIndex, indexAbstraction)
?.let { policyID ->
logger.info("Index [$indexName] matched ISM policy template and will be managed by $policyID")
updateManagedIndexReqs.add(
managedIndexConfigIndexRequest(
indexName,
indexUuid,
policyID,
jobInterval
)
)
}
}
}
}

return updateManagedIndexReqs
return updateManagedIndexReqs.toList()
}

suspend fun getISMTemplates(): Map<String, ISMTemplate> {
suspend fun getISMTemplates(): Map<String, List<ISMTemplate>> {
val searchRequest = SearchRequest()
.source(
SearchSourceBuilder().query(
QueryBuilders.existsQuery(ISM_TEMPLATE_FIELD)
)
).size(MAX_HITS)
)
.indices(INDEX_MANAGEMENT_INDEX)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ data class Policy(
val errorNotification: ErrorNotification?,
val defaultState: String,
val states: List<State>,
val ismTemplate: ISMTemplate? = null
val ismTemplate: List<ISMTemplate>? = null
) : ToXContentObject, Writeable {

init {
Expand Down Expand Up @@ -101,7 +101,9 @@ data class Policy(
errorNotification = sin.readOptionalWriteable(::ErrorNotification),
defaultState = sin.readString(),
states = sin.readList(::State),
ismTemplate = sin.readOptionalWriteable(::ISMTemplate)
ismTemplate = if (sin.readBoolean()) {
sin.readList(::ISMTemplate)
} else null
)

@Throws(IOException::class)
Expand All @@ -115,7 +117,12 @@ data class Policy(
out.writeOptionalWriteable(errorNotification)
out.writeString(defaultState)
out.writeList(states)
out.writeOptionalWriteable(ismTemplate)
if (ismTemplate != null) {
out.writeBoolean(true)
out.writeList(ismTemplate)
} else {
out.writeBoolean(false)
}
}

companion object {
Expand All @@ -130,7 +137,7 @@ data class Policy(
const val STATES_FIELD = "states"
const val ISM_TEMPLATE = "ism_template"

@Suppress("ComplexMethod")
@Suppress("ComplexMethod", "LongMethod", "NestedBlockDepth")
@JvmStatic
@JvmOverloads
@Throws(IOException::class)
Expand All @@ -146,7 +153,7 @@ data class Policy(
var lastUpdatedTime: Instant? = null
var schemaVersion: Long = IndexUtils.DEFAULT_SCHEMA_VERSION
val states: MutableList<State> = mutableListOf()
var ismTemplate: ISMTemplate? = null
var ismTemplates: List<ISMTemplate>? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand All @@ -166,7 +173,22 @@ data class Policy(
states.add(State.parse(xcp))
}
}
ISM_TEMPLATE -> ismTemplate = if (xcp.currentToken() == Token.VALUE_NULL) null else ISMTemplate.parse(xcp)
ISM_TEMPLATE -> {
if (xcp.currentToken() != Token.VALUE_NULL) {
ismTemplates = mutableListOf()
when (xcp.currentToken()) {
Token.START_ARRAY -> {
while (xcp.nextToken() != Token.END_ARRAY) {
ismTemplates.add(ISMTemplate.parse(xcp))
}
}
Token.START_OBJECT -> {
ismTemplates.add(ISMTemplate.parse(xcp))
}
else -> ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
}
}
}
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in Policy.")
}
}
Expand All @@ -181,7 +203,7 @@ data class Policy(
errorNotification = errorNotification,
defaultState = requireNotNull(defaultState) { "$DEFAULT_STATE_FIELD is null" },
states = states.toList(),
ismTemplate = ismTemplate
ismTemplate = ismTemplates
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ fun getUuidsForClosedIndices(state: ClusterState): MutableList<String> {
*/
@Throws(Exception::class)
fun getPolicyToTemplateMap(response: SearchResponse, xContentRegistry: NamedXContentRegistry = NamedXContentRegistry.EMPTY):
Map<String, ISMTemplate?> {
Map<String, List<ISMTemplate>?> {
return response.hits.hits.map {
val id = it.id
val seqNo = it.seqNo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ class ManagedIndexSettings {
Setting.Property.Dynamic
)

val AUTO_MANAGE: Setting<Boolean> = Setting.boolSetting(
"index.plugins.index_state_management.auto_manage",
true,
Setting.Property.IndexScope,
Setting.Property.Dynamic
)

val JOB_INTERVAL: Setting<Int> = Setting.intSetting(
"plugins.index_state_management.job_interval",
LegacyOpenDistroManagedIndexSettings.JOB_INTERVAL,
Expand Down
Loading

0 comments on commit b6fd7ef

Please sign in to comment.