Skip to content

Commit

Permalink
Storing user information as part of the job when security plugin is i…
Browse files Browse the repository at this point in the history
…nstalled (#113)

Signed-off-by: Ravi Thaluru <ravi1092@gmail.com>
  • Loading branch information
thalurur authored Aug 9, 2021
1 parent 1860f38 commit 11da83f
Show file tree
Hide file tree
Showing 24 changed files with 670 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
.registerIMIndex(indexManagementIndices)
.registerHistoryIndex(indexStateManagementHistory)
.registerSkipFlag(skipFlag)
.registerThreadPool(threadPool)

val metadataService = MetadataService(client, clusterService, skipFlag, indexManagementIndices)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,67 +26,14 @@

package org.opensearch.indexmanagement.indexstatemanagement

import org.apache.logging.log4j.LogManager
import org.apache.lucene.util.automaton.Operations
import org.opensearch.OpenSearchException
import org.opensearch.cluster.metadata.IndexAbstraction
import org.opensearch.common.Strings
import org.opensearch.common.ValidationException
import org.opensearch.common.regex.Regex
import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate
import org.opensearch.indexmanagement.util.IndexManagementException

private val log = LogManager.getLogger("ISMTemplateService")

/**
* find the matching policy for the given index
*
* return early if it's hidden index
* filter out templates that were last updated after the index creation time
*
* @return policyID
*/
@Suppress("ReturnCount", "NestedBlockDepth")
fun Map<String, List<ISMTemplate>>.findMatchingPolicy(
indexName: String,
indexCreationDate: Long,
isHiddenIndex: Boolean,
indexAbstraction: IndexAbstraction?
): String? {
val isDataStreamIndex = indexAbstraction?.parentDataStream != null
if (this.isEmpty()) return null
// don't include hidden index
if (!isDataStreamIndex && isHiddenIndex) return null

// If the index belongs to a data stream, then find the matching policy using the data stream name.
val lookupName = when {
isDataStreamIndex -> indexAbstraction?.parentDataStream?.name
else -> indexName
}

// only process indices created after template
// traverse all ism templates for matching ones
val patternMatchPredicate = { pattern: String -> Regex.simpleMatch(pattern, lookupName) }
var matchedPolicy: String? = null
var highestPriority: Int = -1

this.forEach { (policyID, templateList) ->
templateList.filter { it.lastUpdatedTime.toEpochMilli() < indexCreationDate }
.forEach {
if (it.indexPatterns.stream().anyMatch(patternMatchPredicate)) {
if (highestPriority < it.priority) {
highestPriority = it.priority
matchedPolicy = policyID
} else if (highestPriority == it.priority) {
log.warn("Warning: index $lookupName matches [$matchedPolicy, $policyID]")
}
}
}
}

return matchedPolicy
}

/**
* validate the template Name and indexPattern provided in the template
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import org.opensearch.cluster.block.ClusterBlockException
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.component.LifecycleListener
import org.opensearch.common.regex.Regex
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.index.Index
Expand All @@ -61,14 +62,13 @@ import org.opensearch.index.query.QueryBuilders
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.filterNotNullValues
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getPolicyToTemplateMap
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.mgetManagedIndexMetadata
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.parsePolicies
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.AUTO_MANAGE
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.COORDINATOR_BACKOFF_COUNT
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.COORDINATOR_BACKOFF_MILLIS
Expand Down Expand Up @@ -303,38 +303,86 @@ class ManagedIndexCoordinator(
val updateManagedIndexReqs = mutableListOf<DocWriteRequest<IndexRequest>>()
if (indexNames.isEmpty()) return updateManagedIndexReqs.toList()

val templates = getISMTemplates()
val policiesWithTemplates = getPoliciesWithISMTemplates()

// Iterate over each unmanaged hot/warm index and if it matches an ISM template add a managed index config index request
indexNames.forEach { indexName ->
if (clusterState.metadata.hasIndex(indexName)) {
val lookupName = findIndexLookupName(indexName, clusterState)
if (lookupName != null) {
val indexMetadata = clusterState.metadata.index(indexName)
val autoManage = indexMetadata.settings.getAsBoolean(AUTO_MANAGE.key, true)
if (autoManage) {
val indexUuid = indexMetadata.indexUUID
val isHiddenIndex =
IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings) || indexName.startsWith(".")
val indexAbstraction = clusterState.metadata.indicesLookup[indexName]
templates.findMatchingPolicy(indexName, indexMetadata.creationDate, isHiddenIndex, indexAbstraction)
?.let { policyID ->
logger.info("Index [$indexName] matched ISM policy template and will be managed by $policyID")
updateManagedIndexReqs.add(
managedIndexConfigIndexRequest(
indexName,
indexUuid,
policyID,
jobInterval
)
val creationDate = indexMetadata.creationDate
val indexUuid = indexMetadata.indexUUID
findMatchingPolicy(lookupName, creationDate, policiesWithTemplates)
?.let { policy ->
logger.info("Index [$indexName] matched ISM policy template and will be managed by ${policy.id}")
updateManagedIndexReqs.add(
managedIndexConfigIndexRequest(
indexName,
indexUuid,
policy.id,
jobInterval,
policy.user
)
}
}
)
}
}
}

return updateManagedIndexReqs.toList()
}

suspend fun getISMTemplates(): Map<String, List<ISMTemplate>> {
private fun findIndexLookupName(indexName: String, clusterState: ClusterState): String? {
if (clusterState.metadata.hasIndex(indexName)) {
val indexMetadata = clusterState.metadata.index(indexName)
val autoManage = indexMetadata.settings.getAsBoolean(AUTO_MANAGE.key, true)
if (autoManage) {
val isHiddenIndex =
IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings) || indexName.startsWith(".")
val indexAbstraction = clusterState.metadata.indicesLookup[indexName]
val isDataStreamIndex = indexAbstraction?.parentDataStream != null
if (!isDataStreamIndex && isHiddenIndex) {
return null
}

return when {
isDataStreamIndex -> indexAbstraction?.parentDataStream?.name
else -> indexName
}
}
}

return null
}

private fun findMatchingPolicy(indexName: String, creationDate: Long, policies: List<Policy>): Policy? {
// only process indices created after template
// traverse all ism templates for matching ones
val patternMatchPredicate = { pattern: String -> Regex.simpleMatch(pattern, indexName) }
var matchedPolicyId: String? = null
var matchedPolicy: Policy? = null
var highestPriority: Int = -1

policies.forEach { policy ->
policy.ismTemplate?.filter { template ->
template.lastUpdatedTime.toEpochMilli() < creationDate
}?.forEach { template ->
if (template.indexPatterns.stream().anyMatch(patternMatchPredicate)) {
if (highestPriority < template.priority) {
highestPriority = template.priority
matchedPolicyId = policy.id
matchedPolicy = policy
} else if (highestPriority == template.priority) {
logger.warn("Warning: index $indexName matches [$matchedPolicyId, ${policy.id}]")
}
}
}
}

return matchedPolicy
}

suspend fun getPoliciesWithISMTemplates(): List<Policy> {
val errorMessage = "Failed to get ISM policies with templates"
val searchRequest = SearchRequest()
.source(
SearchSourceBuilder().query(
Expand All @@ -345,17 +393,18 @@ class ManagedIndexCoordinator(

return try {
val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }
getPolicyToTemplateMap(response).filterNotNullValues()
parsePolicies(response)
} catch (ex: IndexNotFoundException) {
emptyMap()
emptyList()
} catch (ex: ClusterBlockException) {
emptyMap()
logger.error(errorMessage)
emptyList()
} catch (e: SearchPhaseExecutionException) {
logger.error("Failed to get ISM templates: $e")
emptyMap()
logger.error("$errorMessage: $e")
emptyList()
} catch (e: Exception) {
logger.error("Failed to get ISM templates", e)
emptyMap()
logger.error(errorMessage, e)
emptyList()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.action.update.UpdateResponse
import org.opensearch.client.Client
import org.opensearch.cluster.block.ClusterBlockException
import org.opensearch.cluster.health.ClusterHealthStatus
import org.opensearch.cluster.health.ClusterStateHealth
import org.opensearch.cluster.metadata.IndexMetadata
Expand All @@ -59,7 +57,6 @@ import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.Index
import org.opensearch.index.engine.VersionConflictEngineException
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.IndexManagementIndices
Expand All @@ -81,8 +78,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.INDEX_STATE_MANAGEMENT_ENABLED
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.JOB_INTERVAL
import org.opensearch.indexmanagement.indexstatemanagement.step.Step
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.getActionToExecute
import org.opensearch.indexmanagement.indexstatemanagement.util.getCompletedManagedIndexMetaData
import org.opensearch.indexmanagement.indexstatemanagement.util.getStartingManagedIndexMetaData
Expand All @@ -101,6 +96,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexMeta
import org.opensearch.indexmanagement.indexstatemanagement.util.shouldBackoff
import org.opensearch.indexmanagement.indexstatemanagement.util.shouldChangePolicy
import org.opensearch.indexmanagement.indexstatemanagement.util.updateDisableManagedIndexRequest
import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext
import org.opensearch.indexmanagement.opensearchapi.convertToMap
import org.opensearch.indexmanagement.opensearchapi.parseWithType
import org.opensearch.indexmanagement.opensearchapi.retry
Expand All @@ -115,6 +111,7 @@ import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.TemplateScript
import org.opensearch.threadpool.ThreadPool
import java.time.Instant
import java.time.temporal.ChronoUnit

Expand All @@ -133,6 +130,7 @@ object ManagedIndexRunner :
private lateinit var imIndices: IndexManagementIndices
private lateinit var ismHistory: IndexStateManagementHistory
private lateinit var skipExecFlag: SkipExecution
private lateinit var threadPool: ThreadPool
private var indexStateManagementEnabled: Boolean = DEFAULT_ISM_ENABLED
@Suppress("MagicNumber")
private val savePolicyRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3)
Expand Down Expand Up @@ -205,6 +203,11 @@ object ManagedIndexRunner :
return this
}

fun registerThreadPool(threadPool: ThreadPool): ManagedIndexRunner {
this.threadPool = threadPool
return this
}

override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) {
if (job !is ManagedIndexConfig) {
throw IllegalArgumentException("Invalid job type, found ${job.javaClass.simpleName} with id: ${context.jobId}")
Expand Down Expand Up @@ -352,13 +355,25 @@ object ManagedIndexRunner :
@Suppress("ComplexCondition")
if (updateResult.metadataSaved && state != null && action != null && step != null && currentActionMetaData != null) {
// Step null check is done in getStartingManagedIndexMetaData
step.preExecute(logger).execute().postExecute(logger)
withContext(
IndexManagementSecurityContext(
managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.user
)
) {
step.preExecute(logger).execute().postExecute(logger)
}
var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step)

if (executedManagedIndexMetaData.isFailed) {
try {
// if the policy has no error_notification this will do nothing otherwise it will try to send the configured error message
publishErrorNotification(policy, executedManagedIndexMetaData)
withContext(
IndexManagementSecurityContext(
managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.user
)
) {
// if the policy has no error_notification this will do nothing otherwise it will try to send the configured error message
publishErrorNotification(policy, executedManagedIndexMetaData)
}
} catch (e: Exception) {
logger.error("Failed to publish error notification", e)
val errorMessage = e.message ?: "Failed to publish error notification"
Expand Down Expand Up @@ -572,29 +587,6 @@ object ManagedIndexRunner :
}
}

// delete metadata in cluster state
private suspend fun deleteManagedIndexMetaData(managedIndexMetaData: ManagedIndexMetaData): Boolean {
var result = false
try {
val request = UpdateManagedIndexMetaDataRequest(
indicesToRemoveManagedIndexMetaDataFrom = listOf(Index(managedIndexMetaData.index, managedIndexMetaData.indexUuid))
)
updateMetaDataRetryPolicy.retry(logger) {
val response: AcknowledgedResponse = client.suspendUntil { execute(UpdateManagedIndexMetaDataAction.INSTANCE, request, it) }
if (response.isAcknowledged) {
result = true
} else {
logger.error("Failed to delete ManagedIndexMetaData for [index=${managedIndexMetaData.index}]")
}
}
} catch (e: ClusterBlockException) {
logger.error("There was ClusterBlockException trying to delete the metadata for ${managedIndexMetaData.index}. Message: ${e.message}", e)
} catch (e: Exception) {
logger.error("Failed to delete ManagedIndexMetaData for [index=${managedIndexMetaData.index}]", e)
}
return result
}

/**
* update metadata in config index, and save metadata in history after update
* this can be called 2 times in one job run, so need to save seqNo & primeTerm
Expand Down Expand Up @@ -720,8 +712,8 @@ object ManagedIndexRunner :

if (!updated.metadataSaved || policy == null) return

// this will save the new policy on the job and reset the change policy back to null
savePolicyToManagedIndexConfig(managedIndexConfig, policy)
// Change the policy and user stored on the job from changePolicy, this will also set the changePolicy to null on the job
savePolicyToManagedIndexConfig(managedIndexConfig.copy(user = changePolicy.user), policy)
}

@Suppress("TooGenericExceptionCaught")
Expand Down
Loading

0 comments on commit 11da83f

Please sign in to comment.