Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance ISM template #105

Merged
merged 1 commit into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ManagedIndexSettings.ROLLOVER_SKIP,
ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
ManagedIndexSettings.METADATA_SERVICE_ENABLED,
ManagedIndexSettings.AUTO_MANAGE,
ManagedIndexSettings.JOB_INTERVAL,
ManagedIndexSettings.SWEEP_PERIOD,
ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ package org.opensearch.indexmanagement.indexstatemanagement
import org.apache.logging.log4j.LogManager
import org.apache.lucene.util.automaton.Operations
import org.opensearch.OpenSearchException
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.IndexAbstraction
import org.opensearch.common.Strings
import org.opensearch.common.ValidationException
import org.opensearch.common.regex.Regex
Expand All @@ -40,26 +39,24 @@ import org.opensearch.indexmanagement.util.IndexManagementException
private val log = LogManager.getLogger("ISMTemplateService")

/**
* find the matching policy based on ISM template field for the given index
* find the matching policy for the given index
*
* filter out hidden index
* filter out older index than template lastUpdateTime
* return early if it's hidden index
* filter out templates that were last updated after the index creation time
*
* @param ismTemplates current ISM templates saved in metadata
* @param indexMetadata cluster state index metadata
* @return policyID
*/
@Suppress("ReturnCount")
fun Map<String, ISMTemplate>.findMatchingPolicy(clusterState: ClusterState, indexName: String): String? {
if (this.isEmpty()) return null

val indexMetadata = clusterState.metadata.index(indexName)
val indexAbstraction = clusterState.metadata.indicesLookup[indexName]
@Suppress("ReturnCount", "NestedBlockDepth")
fun Map<String, List<ISMTemplate>>.findMatchingPolicy(
indexName: String,
indexCreationDate: Long,
isHiddenIndex: Boolean,
indexAbstraction: IndexAbstraction?
): String? {
val isDataStreamIndex = indexAbstraction?.parentDataStream != null

// Don't include hidden index unless it belongs to a data stream.
val isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings)
if (!isDataStreamIndex && isHidden) return null
if (this.isEmpty()) return null
// don't include hidden index
if (!isDataStreamIndex && isHiddenIndex) return null

// If the index belongs to a data stream, then find the matching policy using the data stream name.
val lookupName = when {
Expand All @@ -72,14 +69,19 @@ fun Map<String, ISMTemplate>.findMatchingPolicy(clusterState: ClusterState, inde
val patternMatchPredicate = { pattern: String -> Regex.simpleMatch(pattern, lookupName) }
var matchedPolicy: String? = null
var highestPriority: Int = -1
this.filter { (_, template) ->
template.lastUpdatedTime.toEpochMilli() < indexMetadata.creationDate
}.forEach { (policyID, template) ->
val matched = template.indexPatterns.stream().anyMatch(patternMatchPredicate)
if (matched && highestPriority < template.priority) {
highestPriority = template.priority
matchedPolicy = policyID
}

this.forEach { (policyID, templateList) ->
templateList.filter { it.lastUpdatedTime.toEpochMilli() < indexCreationDate }
.forEach {
if (it.indexPatterns.stream().anyMatch(patternMatchPredicate)) {
if (highestPriority < it.priority) {
highestPriority = it.priority
matchedPolicy = policyID
} else if (highestPriority == it.priority) {
log.warn("Warning: index $lookupName matches [$matchedPolicy, $policyID]")
}
}
}
}

return matchedPolicy
Expand Down Expand Up @@ -120,30 +122,61 @@ fun validateFormat(indexPatterns: List<String>): OpenSearchException? {
return null
}

fun List<ISMTemplate>.findSelfConflictingTemplates(): Pair<List<String>, List<String>>? {
val priorityToTemplates = mutableMapOf<Int, List<ISMTemplate>>()
this.forEach {
val templateList = priorityToTemplates[it.priority]
if (templateList != null) {
priorityToTemplates[it.priority] = templateList.plus(it)
} else {
priorityToTemplates[it.priority] = mutableListOf(it)
}
}
priorityToTemplates.forEach { (_, templateList) ->
// same priority
val indexPatternsList = templateList.map { it.indexPatterns }
if (indexPatternsList.size > 1) {
indexPatternsList.forEachIndexed { ind, indexPatterns ->
val comparePatterns = indexPatternsList.subList(ind + 1, indexPatternsList.size).flatten()
if (overlapping(indexPatterns, comparePatterns)) {
return indexPatterns to comparePatterns
}
}
}
}

return null
}

@Suppress("SpreadOperator")
fun overlapping(p1: List<String>, p2: List<String>): Boolean {
if (p1.isEmpty() || p2.isEmpty()) return false
val a1 = Regex.simpleMatchToAutomaton(*p1.toTypedArray())
val a2 = Regex.simpleMatchToAutomaton(*p2.toTypedArray())
return !Operations.isEmpty(Operations.intersection(a1, a2))
}

/**
* find policy templates whose index patterns overlap with given template
*
* @return map of overlapping template name to its index patterns
*/
@Suppress("SpreadOperator")
fun Map<String, ISMTemplate>.findConflictingPolicyTemplates(
fun Map<String, List<ISMTemplate>>.findConflictingPolicyTemplates(
candidate: String,
indexPatterns: List<String>,
priority: Int
): Map<String, List<String>> {
val automaton1 = Regex.simpleMatchToAutomaton(*indexPatterns.toTypedArray())
val overlappingTemplates = mutableMapOf<String, List<String>>()

// focus on template with same priority
this.filter { it.value.priority == priority }
.forEach { (policyID, template) ->
val automaton2 = Regex.simpleMatchToAutomaton(*template.indexPatterns.toTypedArray())
if (!Operations.isEmpty(Operations.intersection(automaton1, automaton2))) {
log.info("Existing ism_template for $policyID overlaps candidate $candidate")
overlappingTemplates[policyID] = template.indexPatterns
this.forEach { (policyID, templateList) ->
templateList.filter { it.priority == priority }
.map { it.indexPatterns }
.forEach {
if (overlapping(indexPatterns, it)) {
overlappingTemplates[policyID] = it
}
}
}
}
overlappingTemplates.remove(candidate)

return overlappingTemplates
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.get.MultiGetRequest
import org.opensearch.action.get.MultiGetResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchPhaseExecutionException
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
Expand All @@ -49,6 +50,7 @@ import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.ClusterStateListener
import org.opensearch.cluster.block.ClusterBlockException
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.component.LifecycleListener
import org.opensearch.common.settings.Settings
Expand All @@ -67,6 +69,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.Swe
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.filterNotNullValues
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getPolicyToTemplateMap
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.mgetManagedIndexMetadata
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.AUTO_MANAGE
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.COORDINATOR_BACKOFF_COUNT
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.COORDINATOR_BACKOFF_MILLIS
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.INDEX_STATE_MANAGEMENT_ENABLED
Expand Down Expand Up @@ -292,46 +295,51 @@ class ManagedIndexCoordinator(
/**
* build requests to create jobs for indices matching ISM templates
*/
@Suppress("NestedBlockDepth")
suspend fun getMatchingIndicesUpdateReq(
clusterState: ClusterState,
indexNames: List<String>
): List<DocWriteRequest<*>> {
val updateManagedIndexReqs = mutableListOf<DocWriteRequest<*>>()
if (indexNames.isEmpty()) return updateManagedIndexReqs
val updateManagedIndexReqs = mutableListOf<DocWriteRequest<IndexRequest>>()
if (indexNames.isEmpty()) return updateManagedIndexReqs.toList()

val indexMetadatas = clusterState.metadata.indices
val templates = getISMTemplates()

val indexToMatchedPolicy = indexNames.map { indexName ->
indexName to templates.findMatchingPolicy(clusterState, indexName)
}.toMap()

indexToMatchedPolicy.filterNotNullValues()
.forEach { (index, policyID) ->
val indexUuid = indexMetadatas[index].indexUUID
val ismTemplate = templates[policyID]
if (indexUuid != null && ismTemplate != null) {
logger.info("Index [$index] will be managed by policy [$policyID]")
updateManagedIndexReqs.add(
managedIndexConfigIndexRequest(index, indexUuid, policyID, jobInterval)
)
} else {
logger.warn(
"Index [$index] has index uuid [$indexUuid] and/or " +
"a matching template [$ismTemplate] that is null."
)
// Iterate over each unmanaged hot/warm index and if it matches an ISM template add a managed index config index request
indexNames.forEach { indexName ->
if (clusterState.metadata.hasIndex(indexName)) {
val indexMetadata = clusterState.metadata.index(indexName)
val autoManage = indexMetadata.settings.getAsBoolean(AUTO_MANAGE.key, true)
if (autoManage) {
val indexUuid = indexMetadata.indexUUID
val isHiddenIndex =
IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings) || indexName.startsWith(".")
val indexAbstraction = clusterState.metadata.indicesLookup[indexName]
templates.findMatchingPolicy(indexName, indexMetadata.creationDate, isHiddenIndex, indexAbstraction)
?.let { policyID ->
logger.info("Index [$indexName] matched ISM policy template and will be managed by $policyID")
updateManagedIndexReqs.add(
managedIndexConfigIndexRequest(
indexName,
indexUuid,
policyID,
jobInterval
)
)
}
}
}
}

return updateManagedIndexReqs
return updateManagedIndexReqs.toList()
}

suspend fun getISMTemplates(): Map<String, ISMTemplate> {
suspend fun getISMTemplates(): Map<String, List<ISMTemplate>> {
val searchRequest = SearchRequest()
.source(
SearchSourceBuilder().query(
QueryBuilders.existsQuery(ISM_TEMPLATE_FIELD)
)
).size(MAX_HITS)
)
.indices(INDEX_MANAGEMENT_INDEX)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ data class Policy(
val errorNotification: ErrorNotification?,
val defaultState: String,
val states: List<State>,
val ismTemplate: ISMTemplate? = null
val ismTemplate: List<ISMTemplate>? = null
) : ToXContentObject, Writeable {

init {
Expand Down Expand Up @@ -101,7 +101,9 @@ data class Policy(
errorNotification = sin.readOptionalWriteable(::ErrorNotification),
defaultState = sin.readString(),
states = sin.readList(::State),
ismTemplate = sin.readOptionalWriteable(::ISMTemplate)
ismTemplate = if (sin.readBoolean()) {
sin.readList(::ISMTemplate)
} else null
)

@Throws(IOException::class)
Expand All @@ -115,7 +117,12 @@ data class Policy(
out.writeOptionalWriteable(errorNotification)
out.writeString(defaultState)
out.writeList(states)
out.writeOptionalWriteable(ismTemplate)
if (ismTemplate != null) {
out.writeBoolean(true)
out.writeList(ismTemplate)
} else {
out.writeBoolean(false)
}
}

companion object {
Expand All @@ -130,7 +137,7 @@ data class Policy(
const val STATES_FIELD = "states"
const val ISM_TEMPLATE = "ism_template"

@Suppress("ComplexMethod")
@Suppress("ComplexMethod", "LongMethod", "NestedBlockDepth")
@JvmStatic
@JvmOverloads
@Throws(IOException::class)
Expand All @@ -146,7 +153,7 @@ data class Policy(
var lastUpdatedTime: Instant? = null
var schemaVersion: Long = IndexUtils.DEFAULT_SCHEMA_VERSION
val states: MutableList<State> = mutableListOf()
var ismTemplate: ISMTemplate? = null
var ismTemplates: List<ISMTemplate>? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand All @@ -166,7 +173,22 @@ data class Policy(
states.add(State.parse(xcp))
}
}
ISM_TEMPLATE -> ismTemplate = if (xcp.currentToken() == Token.VALUE_NULL) null else ISMTemplate.parse(xcp)
ISM_TEMPLATE -> {
if (xcp.currentToken() != Token.VALUE_NULL) {
ismTemplates = mutableListOf()
when (xcp.currentToken()) {
Token.START_ARRAY -> {
while (xcp.nextToken() != Token.END_ARRAY) {
ismTemplates.add(ISMTemplate.parse(xcp))
}
}
Token.START_OBJECT -> {
ismTemplates.add(ISMTemplate.parse(xcp))
}
else -> ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
}
}
}
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in Policy.")
}
}
Expand All @@ -181,7 +203,7 @@ data class Policy(
errorNotification = errorNotification,
defaultState = requireNotNull(defaultState) { "$DEFAULT_STATE_FIELD is null" },
states = states.toList(),
ismTemplate = ismTemplate
ismTemplate = ismTemplates
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ fun getUuidsForClosedIndices(state: ClusterState): MutableList<String> {
*/
@Throws(Exception::class)
fun getPolicyToTemplateMap(response: SearchResponse, xContentRegistry: NamedXContentRegistry = NamedXContentRegistry.EMPTY):
Map<String, ISMTemplate?> {
Map<String, List<ISMTemplate>?> {
return response.hits.hits.map {
val id = it.id
val seqNo = it.seqNo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ class ManagedIndexSettings {
Setting.Property.Dynamic
)

val AUTO_MANAGE: Setting<Boolean> = Setting.boolSetting(
"index.plugins.index_state_management.auto_manage",
true,
Setting.Property.IndexScope,
Setting.Property.Dynamic
)

val JOB_INTERVAL: Setting<Int> = Setting.intSetting(
"plugins.index_state_management.job_interval",
LegacyOpenDistroManagedIndexSettings.JOB_INTERVAL,
Expand Down
Loading