From 11da83fa47644bc97f25301406ae0ed0acdf7e02 Mon Sep 17 00:00:00 2001 From: Ravi <6005951+thalurur@users.noreply.github.com> Date: Mon, 9 Aug 2021 10:22:15 -0700 Subject: [PATCH] Storing user information as part of the job when security plugin is installed (#113) Signed-off-by: Ravi Thaluru --- .../indexmanagement/IndexManagementPlugin.kt | 1 + .../ISMTemplateService.kt | 53 ----- .../ManagedIndexCoordinator.kt | 111 ++++++++--- .../ManagedIndexRunner.kt | 58 +++--- .../model/ChangePolicy.kt | 25 ++- .../model/ManagedIndexConfig.kt | 24 ++- .../indexstatemanagement/model/Policy.kt | 18 +- .../opensearchapi/OpenSearchExtensions.kt | 11 +- .../action/addpolicy/AddPolicyRequest.kt | 14 +- .../addpolicy/TransportAddPolicyAction.kt | 5 +- .../changepolicy/ChangePolicyRequest.kt | 14 +- .../TransportChangePolicyAction.kt | 3 +- .../action/explain/ExplainAllResponse.kt | 3 +- .../action/explain/ExplainResponse.kt | 3 +- .../action/getpolicy/GetPoliciesResponse.kt | 4 +- .../getpolicy/TransportGetPoliciesAction.kt | 16 +- .../indexpolicy/TransportIndexPolicyAction.kt | 13 +- .../util/ManagedIndexUtils.kt | 10 +- .../util/RestHandlerUtils.kt | 13 +- .../opensearchapi/OpenSearchExtensions.kt | 52 +++++ .../indexmanagement/util/SecurityUtils.kt | 39 ++++ .../mappings/opendistro-ism-config.json | 187 +++++++++++++++++- .../IndexManagementRestTestCase.kt | 2 +- .../cached-opendistro-ism-config.json | 187 +++++++++++++++++- 24 files changed, 670 insertions(+), 196 deletions(-) create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/util/SecurityUtils.kt diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 8a883ef4b..7c4387a95 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -339,6 +339,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin .registerIMIndex(indexManagementIndices) .registerHistoryIndex(indexStateManagementHistory) .registerSkipFlag(skipFlag) + .registerThreadPool(threadPool) val metadataService = MetadataService(client, clusterService, skipFlag, indexManagementIndices) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMTemplateService.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMTemplateService.kt index fe81ac6b2..fdd71543f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMTemplateService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMTemplateService.kt @@ -26,67 +26,14 @@ package org.opensearch.indexmanagement.indexstatemanagement -import org.apache.logging.log4j.LogManager import org.apache.lucene.util.automaton.Operations import org.opensearch.OpenSearchException -import org.opensearch.cluster.metadata.IndexAbstraction import org.opensearch.common.Strings import org.opensearch.common.ValidationException import org.opensearch.common.regex.Regex import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate import org.opensearch.indexmanagement.util.IndexManagementException -private val log = LogManager.getLogger("ISMTemplateService") - -/** - * find the matching policy for the given index - * - * return early if it's hidden index - * filter out templates that were last updated after the index creation time - * - * @return policyID - */ -@Suppress("ReturnCount", "NestedBlockDepth") -fun Map>.findMatchingPolicy( - indexName: String, - indexCreationDate: Long, - isHiddenIndex: Boolean, - indexAbstraction: IndexAbstraction? -): String? { - val isDataStreamIndex = indexAbstraction?.parentDataStream != null - if (this.isEmpty()) return null - // don't include hidden index - if (!isDataStreamIndex && isHiddenIndex) return null - - // If the index belongs to a data stream, then find the matching policy using the data stream name. - val lookupName = when { - isDataStreamIndex -> indexAbstraction?.parentDataStream?.name - else -> indexName - } - - // only process indices created after template - // traverse all ism templates for matching ones - val patternMatchPredicate = { pattern: String -> Regex.simpleMatch(pattern, lookupName) } - var matchedPolicy: String? = null - var highestPriority: Int = -1 - - this.forEach { (policyID, templateList) -> - templateList.filter { it.lastUpdatedTime.toEpochMilli() < indexCreationDate } - .forEach { - if (it.indexPatterns.stream().anyMatch(patternMatchPredicate)) { - if (highestPriority < it.priority) { - highestPriority = it.priority - matchedPolicy = policyID - } else if (highestPriority == it.priority) { - log.warn("Warning: index $lookupName matches [$matchedPolicy, $policyID]") - } - } - } - } - - return matchedPolicy -} - /** * validate the template Name and indexPattern provided in the template * diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt index 31103b010..0afebaad8 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt @@ -53,6 +53,7 @@ import org.opensearch.cluster.block.ClusterBlockException import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.service.ClusterService import org.opensearch.common.component.LifecycleListener +import org.opensearch.common.regex.Regex import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.index.Index @@ -61,14 +62,13 @@ import org.opensearch.index.query.QueryBuilders import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.IndexManagementPlugin import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX -import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig -import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.filterNotNullValues -import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getPolicyToTemplateMap import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.mgetManagedIndexMetadata +import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.parsePolicies import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.AUTO_MANAGE import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.COORDINATOR_BACKOFF_COUNT import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.COORDINATOR_BACKOFF_MILLIS @@ -303,38 +303,86 @@ class ManagedIndexCoordinator( val updateManagedIndexReqs = mutableListOf>() if (indexNames.isEmpty()) return updateManagedIndexReqs.toList() - val templates = getISMTemplates() + val policiesWithTemplates = getPoliciesWithISMTemplates() // Iterate over each unmanaged hot/warm index and if it matches an ISM template add a managed index config index request indexNames.forEach { indexName -> - if (clusterState.metadata.hasIndex(indexName)) { + val lookupName = findIndexLookupName(indexName, clusterState) + if (lookupName != null) { val indexMetadata = clusterState.metadata.index(indexName) - val autoManage = indexMetadata.settings.getAsBoolean(AUTO_MANAGE.key, true) - if (autoManage) { - val indexUuid = indexMetadata.indexUUID - val isHiddenIndex = - IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings) || indexName.startsWith(".") - val indexAbstraction = clusterState.metadata.indicesLookup[indexName] - templates.findMatchingPolicy(indexName, indexMetadata.creationDate, isHiddenIndex, indexAbstraction) - ?.let { policyID -> - logger.info("Index [$indexName] matched ISM policy template and will be managed by $policyID") - updateManagedIndexReqs.add( - managedIndexConfigIndexRequest( - indexName, - indexUuid, - policyID, - jobInterval - ) + val creationDate = indexMetadata.creationDate + val indexUuid = indexMetadata.indexUUID + findMatchingPolicy(lookupName, creationDate, policiesWithTemplates) + ?.let { policy -> + logger.info("Index [$indexName] matched ISM policy template and will be managed by ${policy.id}") + updateManagedIndexReqs.add( + managedIndexConfigIndexRequest( + indexName, + indexUuid, + policy.id, + jobInterval, + policy.user ) - } - } + ) + } } } return updateManagedIndexReqs.toList() } - suspend fun getISMTemplates(): Map> { + private fun findIndexLookupName(indexName: String, clusterState: ClusterState): String? { + if (clusterState.metadata.hasIndex(indexName)) { + val indexMetadata = clusterState.metadata.index(indexName) + val autoManage = indexMetadata.settings.getAsBoolean(AUTO_MANAGE.key, true) + if (autoManage) { + val isHiddenIndex = + IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings) || indexName.startsWith(".") + val indexAbstraction = clusterState.metadata.indicesLookup[indexName] + val isDataStreamIndex = indexAbstraction?.parentDataStream != null + if (!isDataStreamIndex && isHiddenIndex) { + return null + } + + return when { + isDataStreamIndex -> indexAbstraction?.parentDataStream?.name + else -> indexName + } + } + } + + return null + } + + private fun findMatchingPolicy(indexName: String, creationDate: Long, policies: List): Policy? { + // only process indices created after template + // traverse all ism templates for matching ones + val patternMatchPredicate = { pattern: String -> Regex.simpleMatch(pattern, indexName) } + var matchedPolicyId: String? = null + var matchedPolicy: Policy? = null + var highestPriority: Int = -1 + + policies.forEach { policy -> + policy.ismTemplate?.filter { template -> + template.lastUpdatedTime.toEpochMilli() < creationDate + }?.forEach { template -> + if (template.indexPatterns.stream().anyMatch(patternMatchPredicate)) { + if (highestPriority < template.priority) { + highestPriority = template.priority + matchedPolicyId = policy.id + matchedPolicy = policy + } else if (highestPriority == template.priority) { + logger.warn("Warning: index $indexName matches [$matchedPolicyId, ${policy.id}]") + } + } + } + } + + return matchedPolicy + } + + suspend fun getPoliciesWithISMTemplates(): List { + val errorMessage = "Failed to get ISM policies with templates" val searchRequest = SearchRequest() .source( SearchSourceBuilder().query( @@ -345,17 +393,18 @@ class ManagedIndexCoordinator( return try { val response: SearchResponse = client.suspendUntil { search(searchRequest, it) } - getPolicyToTemplateMap(response).filterNotNullValues() + parsePolicies(response) } catch (ex: IndexNotFoundException) { - emptyMap() + emptyList() } catch (ex: ClusterBlockException) { - emptyMap() + logger.error(errorMessage) + emptyList() } catch (e: SearchPhaseExecutionException) { - logger.error("Failed to get ISM templates: $e") - emptyMap() + logger.error("$errorMessage: $e") + emptyList() } catch (e: Exception) { - logger.error("Failed to get ISM templates", e) - emptyMap() + logger.error(errorMessage, e) + emptyList() } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index bb37e1b14..1bc257c11 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -41,10 +41,8 @@ import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.index.IndexResponse import org.opensearch.action.support.IndicesOptions -import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.action.update.UpdateResponse import org.opensearch.client.Client -import org.opensearch.cluster.block.ClusterBlockException import org.opensearch.cluster.health.ClusterHealthStatus import org.opensearch.cluster.health.ClusterStateHealth import org.opensearch.cluster.metadata.IndexMetadata @@ -59,7 +57,6 @@ import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.common.xcontent.XContentType -import org.opensearch.index.Index import org.opensearch.index.engine.VersionConflictEngineException import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.indexmanagement.IndexManagementIndices @@ -81,8 +78,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.INDEX_STATE_MANAGEMENT_ENABLED import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.JOB_INTERVAL import org.opensearch.indexmanagement.indexstatemanagement.step.Step -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataRequest import org.opensearch.indexmanagement.indexstatemanagement.util.getActionToExecute import org.opensearch.indexmanagement.indexstatemanagement.util.getCompletedManagedIndexMetaData import org.opensearch.indexmanagement.indexstatemanagement.util.getStartingManagedIndexMetaData @@ -101,6 +96,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexMeta import org.opensearch.indexmanagement.indexstatemanagement.util.shouldBackoff import org.opensearch.indexmanagement.indexstatemanagement.util.shouldChangePolicy import org.opensearch.indexmanagement.indexstatemanagement.util.updateDisableManagedIndexRequest +import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext import org.opensearch.indexmanagement.opensearchapi.convertToMap import org.opensearch.indexmanagement.opensearchapi.parseWithType import org.opensearch.indexmanagement.opensearchapi.retry @@ -115,6 +111,7 @@ import org.opensearch.rest.RestStatus import org.opensearch.script.Script import org.opensearch.script.ScriptService import org.opensearch.script.TemplateScript +import org.opensearch.threadpool.ThreadPool import java.time.Instant import java.time.temporal.ChronoUnit @@ -133,6 +130,7 @@ object ManagedIndexRunner : private lateinit var imIndices: IndexManagementIndices private lateinit var ismHistory: IndexStateManagementHistory private lateinit var skipExecFlag: SkipExecution + private lateinit var threadPool: ThreadPool private var indexStateManagementEnabled: Boolean = DEFAULT_ISM_ENABLED @Suppress("MagicNumber") private val savePolicyRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3) @@ -205,6 +203,11 @@ object ManagedIndexRunner : return this } + fun registerThreadPool(threadPool: ThreadPool): ManagedIndexRunner { + this.threadPool = threadPool + return this + } + override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) { if (job !is ManagedIndexConfig) { throw IllegalArgumentException("Invalid job type, found ${job.javaClass.simpleName} with id: ${context.jobId}") @@ -352,13 +355,25 @@ object ManagedIndexRunner : @Suppress("ComplexCondition") if (updateResult.metadataSaved && state != null && action != null && step != null && currentActionMetaData != null) { // Step null check is done in getStartingManagedIndexMetaData - step.preExecute(logger).execute().postExecute(logger) + withContext( + IndexManagementSecurityContext( + managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.user + ) + ) { + step.preExecute(logger).execute().postExecute(logger) + } var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step) if (executedManagedIndexMetaData.isFailed) { try { - // if the policy has no error_notification this will do nothing otherwise it will try to send the configured error message - publishErrorNotification(policy, executedManagedIndexMetaData) + withContext( + IndexManagementSecurityContext( + managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.user + ) + ) { + // if the policy has no error_notification this will do nothing otherwise it will try to send the configured error message + publishErrorNotification(policy, executedManagedIndexMetaData) + } } catch (e: Exception) { logger.error("Failed to publish error notification", e) val errorMessage = e.message ?: "Failed to publish error notification" @@ -572,29 +587,6 @@ object ManagedIndexRunner : } } - // delete metadata in cluster state - private suspend fun deleteManagedIndexMetaData(managedIndexMetaData: ManagedIndexMetaData): Boolean { - var result = false - try { - val request = UpdateManagedIndexMetaDataRequest( - indicesToRemoveManagedIndexMetaDataFrom = listOf(Index(managedIndexMetaData.index, managedIndexMetaData.indexUuid)) - ) - updateMetaDataRetryPolicy.retry(logger) { - val response: AcknowledgedResponse = client.suspendUntil { execute(UpdateManagedIndexMetaDataAction.INSTANCE, request, it) } - if (response.isAcknowledged) { - result = true - } else { - logger.error("Failed to delete ManagedIndexMetaData for [index=${managedIndexMetaData.index}]") - } - } - } catch (e: ClusterBlockException) { - logger.error("There was ClusterBlockException trying to delete the metadata for ${managedIndexMetaData.index}. Message: ${e.message}", e) - } catch (e: Exception) { - logger.error("Failed to delete ManagedIndexMetaData for [index=${managedIndexMetaData.index}]", e) - } - return result - } - /** * update metadata in config index, and save metadata in history after update * this can be called 2 times in one job run, so need to save seqNo & primeTerm @@ -720,8 +712,8 @@ object ManagedIndexRunner : if (!updated.metadataSaved || policy == null) return - // this will save the new policy on the job and reset the change policy back to null - savePolicyToManagedIndexConfig(managedIndexConfig, policy) + // Change the policy and user stored on the job from changePolicy, this will also set the changePolicy to null on the job + savePolicyToManagedIndexConfig(managedIndexConfig.copy(user = changePolicy.user), policy) } @Suppress("TooGenericExceptionCaught") diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ChangePolicy.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ChangePolicy.kt index 33f897341..5175c412b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ChangePolicy.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ChangePolicy.kt @@ -35,7 +35,10 @@ import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.commons.authuser.User import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StateMetaData +import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_USER +import org.opensearch.indexmanagement.opensearchapi.optionalUserField import java.io.IOException /** @@ -52,7 +55,8 @@ data class ChangePolicy( val policyID: String, val state: String?, val include: List, - val isSafe: Boolean + val isSafe: Boolean, + val user: User? = null ) : Writeable, ToXContentObject { @Throws(IOException::class) @@ -60,7 +64,10 @@ data class ChangePolicy( policyID = sin.readString(), state = sin.readOptionalString(), include = sin.readList(::StateFilter), - isSafe = sin.readBoolean() + isSafe = sin.readBoolean(), + user = if (sin.readBoolean()) { + User(sin) + } else null ) override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { @@ -69,8 +76,8 @@ data class ChangePolicy( .field(ManagedIndexConfig.POLICY_ID_FIELD, policyID) .field(StateMetaData.STATE, state) .field(IS_SAFE_FIELD, isSafe) - .endObject() - return builder + if (params.paramAsBoolean(WITH_USER, true)) builder.optionalUserField(ManagedIndexConfig.USER_FIELD, user) + return builder.endObject() } @Throws(IOException::class) @@ -79,6 +86,8 @@ data class ChangePolicy( out.writeOptionalString(state) out.writeList(include) out.writeBoolean(isSafe) + out.writeBoolean(user != null) + user?.writeTo(out) } companion object { @@ -86,6 +95,7 @@ data class ChangePolicy( const val STATE_FIELD = "state" const val INCLUDE_FIELD = "include" const val IS_SAFE_FIELD = "is_safe" + const val USER_FIELD = "user" @JvmStatic @Throws(IOException::class) @@ -93,6 +103,7 @@ data class ChangePolicy( var policyID: String? = null var state: String? = null var isSafe: Boolean = false + var user: User? = null val include = mutableListOf() ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) @@ -110,6 +121,9 @@ data class ChangePolicy( } } IS_SAFE_FIELD -> isSafe = xcp.booleanValue() + USER_FIELD -> { + user = if (xcp.currentToken() == Token.VALUE_NULL) null else User.parse(xcp) + } else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ChangePolicy.") } } @@ -118,7 +132,8 @@ data class ChangePolicy( requireNotNull(policyID) { "ChangePolicy policy id is null" }, state, include.toList(), - isSafe + isSafe, + user ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt index 0645c0b2f..c4d444bc6 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt @@ -31,10 +31,13 @@ import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.commons.authuser.User import org.opensearch.index.seqno.SequenceNumbers -import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE +import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_USER +import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE_AND_USER import org.opensearch.indexmanagement.opensearchapi.instant import org.opensearch.indexmanagement.opensearchapi.optionalTimeField +import org.opensearch.indexmanagement.opensearchapi.optionalUserField import org.opensearch.jobscheduler.spi.ScheduledJobParameter import org.opensearch.jobscheduler.spi.schedule.Schedule import org.opensearch.jobscheduler.spi.schedule.ScheduleParser @@ -56,7 +59,8 @@ data class ManagedIndexConfig( val policySeqNo: Long?, val policyPrimaryTerm: Long?, val policy: Policy?, - val changePolicy: ChangePolicy? + val changePolicy: ChangePolicy?, + val user: User? = null ) : ScheduledJobParameter { init { @@ -93,11 +97,11 @@ data class ManagedIndexConfig( .field(POLICY_ID_FIELD, policyID) .field(POLICY_SEQ_NO_FIELD, policySeqNo) .field(POLICY_PRIMARY_TERM_FIELD, policyPrimaryTerm) - .field(POLICY_FIELD, policy, XCONTENT_WITHOUT_TYPE) + .field(POLICY_FIELD, policy, XCONTENT_WITHOUT_TYPE_AND_USER) .field(CHANGE_POLICY_FIELD, changePolicy) - .endObject() - .endObject() - return builder + if (params.paramAsBoolean(WITH_USER, true)) builder.optionalUserField(USER_FIELD, user) + builder.endObject() + return builder.endObject() } companion object { @@ -115,6 +119,7 @@ data class ManagedIndexConfig( const val POLICY_SEQ_NO_FIELD = "policy_seq_no" const val POLICY_PRIMARY_TERM_FIELD = "policy_primary_term" const val CHANGE_POLICY_FIELD = "change_policy" + const val USER_FIELD = "user" @Suppress("ComplexMethod", "LongMethod") @JvmStatic @@ -138,6 +143,7 @@ data class ManagedIndexConfig( var enabled = true var policyPrimaryTerm: Long? = SequenceNumbers.UNASSIGNED_PRIMARY_TERM var policySeqNo: Long? = SequenceNumbers.UNASSIGNED_SEQ_NO + var user: User? = null ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != Token.END_OBJECT) { @@ -165,6 +171,9 @@ data class ManagedIndexConfig( CHANGE_POLICY_FIELD -> { changePolicy = if (xcp.currentToken() == Token.VALUE_NULL) null else ChangePolicy.parse(xcp) } + USER_FIELD -> { + user = if (xcp.currentToken() == Token.VALUE_NULL) null else User.parse(xcp) + } else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ManagedIndexConfig.") } } @@ -193,7 +202,8 @@ data class ManagedIndexConfig( seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM ), - changePolicy = changePolicy + changePolicy = changePolicy, + user = user ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt index ddca13e6a..c20135c74 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt @@ -35,11 +35,14 @@ import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.commons.authuser.User import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_TYPE +import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_USER import org.opensearch.indexmanagement.opensearchapi.instant import org.opensearch.indexmanagement.opensearchapi.optionalISMTemplateField import org.opensearch.indexmanagement.opensearchapi.optionalTimeField +import org.opensearch.indexmanagement.opensearchapi.optionalUserField import org.opensearch.indexmanagement.util.IndexUtils import java.io.IOException import java.time.Instant @@ -54,7 +57,8 @@ data class Policy( val errorNotification: ErrorNotification?, val defaultState: String, val states: List, - val ismTemplate: List? = null + val ismTemplate: List? = null, + val user: User? = null ) : ToXContentObject, Writeable { init { @@ -86,6 +90,7 @@ data class Policy( .field(DEFAULT_STATE_FIELD, defaultState) .field(STATES_FIELD, states.toTypedArray()) .optionalISMTemplateField(ISM_TEMPLATE, ismTemplate) + if (params.paramAsBoolean(WITH_USER, true)) builder.optionalUserField(ManagedIndexConfig.USER_FIELD, user) if (params.paramAsBoolean(WITH_TYPE, true)) builder.endObject() return builder.endObject() } @@ -103,6 +108,9 @@ data class Policy( states = sin.readList(::State), ismTemplate = if (sin.readBoolean()) { sin.readList(::ISMTemplate) + } else null, + user = if (sin.readBoolean()) { + User(sin) } else null ) @@ -123,6 +131,8 @@ data class Policy( } else { out.writeBoolean(false) } + out.writeBoolean(user != null) + user?.writeTo(out) } companion object { @@ -136,6 +146,7 @@ data class Policy( const val DEFAULT_STATE_FIELD = "default_state" const val STATES_FIELD = "states" const val ISM_TEMPLATE = "ism_template" + const val USER_FIELD = "user" @Suppress("ComplexMethod", "LongMethod", "NestedBlockDepth") @JvmStatic @@ -154,6 +165,7 @@ data class Policy( var schemaVersion: Long = IndexUtils.DEFAULT_SCHEMA_VERSION val states: MutableList = mutableListOf() var ismTemplates: List? = null + var user: User? = null ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != Token.END_OBJECT) { @@ -189,6 +201,7 @@ data class Policy( } } } + USER_FIELD -> user = if (xcp.currentToken() == Token.VALUE_NULL) null else User.parse(xcp) else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in Policy.") } } @@ -203,7 +216,8 @@ data class Policy( errorNotification = errorNotification, defaultState = requireNotNull(defaultState) { "$DEFAULT_STATE_FIELD is null" }, states = states.toList(), - ismTemplate = ismTemplates + ismTemplate = ismTemplates, + user = user ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt index ae53309e1..f37c92d3d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt @@ -52,7 +52,6 @@ import org.opensearch.common.xcontent.XContentType import org.opensearch.index.Index import org.opensearch.index.IndexNotFoundException import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX -import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.indexstatemanagement.settings.LegacyOpenDistroManagedIndexSettings @@ -105,15 +104,13 @@ fun getUuidsForClosedIndices(state: ClusterState): MutableList { } /** - * Do a exists search query to retrieve all policy with ism_template field - * parse search response with this function + * Parse policies from SearchResponse * - * @return map of policyID to ISMTemplate in this policy + * @return list of policies * @throws [IllegalArgumentException] */ @Throws(Exception::class) -fun getPolicyToTemplateMap(response: SearchResponse, xContentRegistry: NamedXContentRegistry = NamedXContentRegistry.EMPTY): - Map?> { +fun parsePolicies(response: SearchResponse, xContentRegistry: NamedXContentRegistry = NamedXContentRegistry.EMPTY): List { return response.hits.hits.map { val id = it.id val seqNo = it.seqNo @@ -122,7 +119,7 @@ fun getPolicyToTemplateMap(response: SearchResponse, xContentRegistry: NamedXCon .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, it.sourceAsString) xcp.parseWithType(id, seqNo, primaryTerm, Policy.Companion::parse) .copy(id = id, seqNo = seqNo, primaryTerm = primaryTerm) - }.map { it.id to it.ismTemplate }.toMap() + } } @Suppress("UNCHECKED_CAST") diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequest.kt index 58da2ed21..8d2a493bb 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequest.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequest.kt @@ -33,18 +33,10 @@ import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import java.io.IOException -class AddPolicyRequest : ActionRequest { - - val indices: List +class AddPolicyRequest( + val indices: List, val policyID: String - - constructor( - indices: List, - policyID: String - ) : super() { - this.indices = indices - this.policyID = policyID - } +) : ActionRequest() { @Throws(IOException::class) constructor(sin: StreamInput) : this( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt index bf5d898a3..6f64b38a0 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt @@ -55,6 +55,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex import org.opensearch.indexmanagement.indexstatemanagement.transport.action.ISMStatusResponse import org.opensearch.indexmanagement.indexstatemanagement.util.FailedIndex import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest +import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.rest.RestStatus import org.opensearch.tasks.Task import org.opensearch.transport.TransportService @@ -213,7 +214,9 @@ class TransportAddPolicyAction @Inject constructor( val bulkReq = BulkRequest().timeout(TimeValue.timeValueMillis(bulkReqTimeout)) indicesToAdd.forEach { (uuid, name) -> - bulkReq.add(managedIndexConfigIndexRequest(name, uuid, request.policyID, jobInterval)) + bulkReq.add( + managedIndexConfigIndexRequest(name, uuid, request.policyID, jobInterval, user = buildUser(client.threadPool().threadContext)) + ) } client.bulk( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequest.kt index 2df9a4f8e..28e3851e2 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequest.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequest.kt @@ -34,18 +34,10 @@ import org.opensearch.common.io.stream.StreamOutput import org.opensearch.indexmanagement.indexstatemanagement.model.ChangePolicy import java.io.IOException -class ChangePolicyRequest : ActionRequest { - - val indices: List +class ChangePolicyRequest( + val indices: List, val changePolicy: ChangePolicy - - constructor( - indices: List, - changePolicy: ChangePolicy - ) : super() { - this.indices = indices - this.changePolicy = changePolicy - } +) : ActionRequest() { @Throws(IOException::class) constructor(sin: StreamInput) : this( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt index 06e98196c..0e26e8d8a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt @@ -68,6 +68,7 @@ import org.opensearch.indexmanagement.opensearchapi.contentParser import org.opensearch.indexmanagement.opensearchapi.parseWithType import org.opensearch.indexmanagement.util.IndexUtils import org.opensearch.indexmanagement.util.NO_ID +import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.rest.RestStatus import org.opensearch.search.fetch.subphase.FetchSourceContext import org.opensearch.tasks.Task @@ -271,7 +272,7 @@ class TransportChangePolicyAction @Inject constructor( // compare the sweptConfig policy to the get policy here and update changePolicy val currentStateName = indexUuidToCurrentState[sweptConfig.uuid] val updatedChangePolicy = changePolicy - .copy(isSafe = sweptConfig.policy?.isSafeToChange(currentStateName, policy, changePolicy) == true) + .copy(isSafe = sweptConfig.policy?.isSafeToChange(currentStateName, policy, changePolicy) == true, user = buildUser(client.threadPool().threadContext)) bulkUpdateManagedIndexRequest.add(updateManagedIndexRequest(sweptConfig.copy(changePolicy = updatedChangePolicy))) mapOfItemIdToIndex[id] = Index(sweptConfig.index, sweptConfig.uuid) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainAllResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainAllResponse.kt index 5f5e21cff..ffd184b1c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainAllResponse.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainAllResponse.kt @@ -34,6 +34,7 @@ import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.indexstatemanagement.settings.LegacyOpenDistroManagedIndexSettings import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings +import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_USER import java.io.IOException class ExplainAllResponse : ExplainResponse, ToXContentObject { @@ -74,7 +75,7 @@ class ExplainAllResponse : ExplainResponse, ToXContentObject { builder.startObject(name) builder.field(LegacyOpenDistroManagedIndexSettings.POLICY_ID.key, indexPolicyIDs[ind]) builder.field(ManagedIndexSettings.POLICY_ID.key, indexPolicyIDs[ind]) - indexMetadatas[ind]?.toXContent(builder, ToXContent.EMPTY_PARAMS) + indexMetadatas[ind]?.toXContent(builder, XCONTENT_WITHOUT_USER) builder.field("enabled", enabledState[name]) builder.endObject() } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt index fd57065cd..14eb79268 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt @@ -35,6 +35,7 @@ import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.indexstatemanagement.settings.LegacyOpenDistroManagedIndexSettings import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings +import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_USER import java.io.IOException open class ExplainResponse : ActionResponse, ToXContentObject { @@ -74,7 +75,7 @@ open class ExplainResponse : ActionResponse, ToXContentObject { builder.startObject(name) builder.field(LegacyOpenDistroManagedIndexSettings.POLICY_ID.key, indexPolicyIDs[ind]) builder.field(ManagedIndexSettings.POLICY_ID.key, indexPolicyIDs[ind]) - indexMetadatas[ind]?.toXContent(builder, ToXContent.EMPTY_PARAMS) + indexMetadatas[ind]?.toXContent(builder, XCONTENT_WITHOUT_USER) builder.endObject() } return builder.endObject() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesResponse.kt index 2852fafde..a82367de0 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesResponse.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesResponse.kt @@ -33,7 +33,7 @@ import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.ToXContentObject import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.indexmanagement.indexstatemanagement.model.Policy -import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE +import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE_AND_USER import org.opensearch.indexmanagement.util._ID import org.opensearch.indexmanagement.util._PRIMARY_TERM import org.opensearch.indexmanagement.util._SEQ_NO @@ -72,7 +72,7 @@ class GetPoliciesResponse : ActionResponse, ToXContentObject { .field(_ID, policy.id) .field(_SEQ_NO, policy.seqNo) .field(_PRIMARY_TERM, policy.primaryTerm) - .field(Policy.POLICY_TYPE, policy, XCONTENT_WITHOUT_TYPE) + .field(Policy.POLICY_TYPE, policy, XCONTENT_WITHOUT_TYPE_AND_USER) .endObject() } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPoliciesAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPoliciesAction.kt index ac3eaf3b4..4dbbc2cb6 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPoliciesAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPoliciesAction.kt @@ -35,16 +35,12 @@ import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.client.Client import org.opensearch.common.inject.Inject -import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.NamedXContentRegistry -import org.opensearch.common.xcontent.XContentFactory -import org.opensearch.common.xcontent.XContentType import org.opensearch.index.IndexNotFoundException import org.opensearch.index.query.Operator import org.opensearch.index.query.QueryBuilders import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX -import org.opensearch.indexmanagement.indexstatemanagement.model.Policy -import org.opensearch.indexmanagement.opensearchapi.parseWithType +import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.parsePolicies import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortBuilders import org.opensearch.search.sort.SortOrder @@ -99,15 +95,7 @@ class TransportGetPoliciesAction @Inject constructor( object : ActionListener { override fun onResponse(response: SearchResponse) { val totalPolicies = response.hits.totalHits?.value ?: 0 - val policies = response.hits.hits.map { - val id = it.id - val seqNo = it.seqNo - val primaryTerm = it.primaryTerm - val xcp = XContentFactory.xContent(XContentType.JSON) - .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, it.sourceAsString) - xcp.parseWithType(id, seqNo, primaryTerm, Policy.Companion::parse) - .copy(id = id, seqNo = seqNo, primaryTerm = primaryTerm) - } + val policies = parsePolicies(response) actionListener.onResponse(GetPoliciesResponse(policies, totalPolicies.toInt())) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt index d2cd2c1a9..34f587379 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt @@ -51,11 +51,12 @@ import org.opensearch.indexmanagement.indexstatemanagement.findConflictingPolicy import org.opensearch.indexmanagement.indexstatemanagement.findSelfConflictingTemplates import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.filterNotNullValues -import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getPolicyToTemplateMap +import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.parsePolicies import org.opensearch.indexmanagement.indexstatemanagement.util.ISM_TEMPLATE_FIELD import org.opensearch.indexmanagement.indexstatemanagement.validateFormat import org.opensearch.indexmanagement.util.IndexManagementException import org.opensearch.indexmanagement.util.IndexUtils +import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.rest.RestStatus import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.tasks.Task @@ -98,7 +99,6 @@ class TransportIndexPolicyAction @Inject constructor( log.info("Successfully created or updated ${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX} with newest mappings.") // if there is template field, we will check - val reqTemplate = request.policy.ismTemplate val reqTemplates = request.policy.ismTemplate if (reqTemplates != null) { validateISMTemplates(reqTemplates) @@ -144,7 +144,8 @@ class TransportIndexPolicyAction @Inject constructor( searchRequest, object : ActionListener { override fun onResponse(response: SearchResponse) { - val policyToTemplateMap = getPolicyToTemplateMap(response, xContentRegistry).filterNotNullValues() + val policies = parsePolicies(response, xContentRegistry) + val policyToTemplateMap: Map> = policies.map { it.id to it.ismTemplate }.toMap().filterNotNullValues() ismTemplateList.forEach { val conflictingPolicyTemplates = policyToTemplateMap .findConflictingPolicyTemplates(request.policyID, it.indexPatterns, it.priority) @@ -175,11 +176,13 @@ class TransportIndexPolicyAction @Inject constructor( } private fun putPolicy() { - request.policy.copy(schemaVersion = IndexUtils.indexManagementConfigSchemaVersion) + val policy = request.policy.copy( + schemaVersion = IndexUtils.indexManagementConfigSchemaVersion, user = buildUser(client.threadPool().threadContext) + ) val indexRequest = IndexRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX) .setRefreshPolicy(request.refreshPolicy) - .source(request.policy.toXContent(XContentFactory.jsonBuilder())) + .source(policy.toXContent(XContentFactory.jsonBuilder())) .id(request.policyID) .timeout(IndexRequest.DEFAULT_TIMEOUT) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index 8f8578ee2..0aa8af149 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -45,6 +45,7 @@ import org.opensearch.common.unit.ByteSizeValue import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.commons.authuser.User import org.opensearch.index.Index import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.QueryBuilders @@ -76,7 +77,7 @@ import java.net.InetAddress import java.time.Instant import java.time.temporal.ChronoUnit -fun managedIndexConfigIndexRequest(index: String, uuid: String, policyID: String, jobInterval: Int): IndexRequest { +fun managedIndexConfigIndexRequest(index: String, uuid: String, policyID: String, jobInterval: Int, user: User? = null): IndexRequest { val managedIndexConfig = ManagedIndexConfig( jobName = index, index = index, @@ -89,7 +90,8 @@ fun managedIndexConfigIndexRequest(index: String, uuid: String, policyID: String policy = null, policySeqNo = null, policyPrimaryTerm = null, - changePolicy = null + changePolicy = null, + user = user ) return IndexRequest(INDEX_MANAGEMENT_INDEX) @@ -160,11 +162,11 @@ fun deleteManagedIndexMetadataRequest(uuid: String): DeleteRequest { return DeleteRequest(INDEX_MANAGEMENT_INDEX, managedIndexMetadataID(uuid)) } -fun updateManagedIndexRequest(sweptManagedIndexConfig: SweptManagedIndexConfig): UpdateRequest { +fun updateManagedIndexRequest(sweptManagedIndexConfig: SweptManagedIndexConfig, user: User? = null): UpdateRequest { return UpdateRequest(INDEX_MANAGEMENT_INDEX, sweptManagedIndexConfig.uuid) .setIfPrimaryTerm(sweptManagedIndexConfig.primaryTerm) .setIfSeqNo(sweptManagedIndexConfig.seqNo) - .doc(getPartialChangePolicyBuilder(sweptManagedIndexConfig.changePolicy)) + .doc(getPartialChangePolicyBuilder(sweptManagedIndexConfig.changePolicy, user)) } /** diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt index 49581d430..6a818e4b4 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt @@ -34,13 +34,17 @@ import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.ToXContentFragment import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.commons.authuser.User import org.opensearch.indexmanagement.indexstatemanagement.model.ChangePolicy import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig import org.opensearch.indexmanagement.opensearchapi.optionalTimeField import java.time.Instant const val WITH_TYPE = "with_type" +const val WITH_USER = "with_user" val XCONTENT_WITHOUT_TYPE = ToXContent.MapParams(mapOf(WITH_TYPE to "false")) +val XCONTENT_WITHOUT_USER = ToXContent.MapParams(mapOf(WITH_USER to "false")) +val XCONTENT_WITHOUT_TYPE_AND_USER = ToXContent.MapParams(mapOf(WITH_TYPE to "false", WITH_USER to "false")) const val FAILURES = "failures" const val FAILED_INDICES = "failed_indices" @@ -106,13 +110,14 @@ data class FailedIndex(val name: String, val uuid: String, val reason: String) : * Gets the XContentBuilder for partially updating a [ManagedIndexConfig]'s ChangePolicy */ fun getPartialChangePolicyBuilder( - changePolicy: ChangePolicy? + changePolicy: ChangePolicy?, + user: User? = null ): XContentBuilder { - return XContentFactory.jsonBuilder() + val builder = XContentFactory.jsonBuilder() .startObject() .startObject(ManagedIndexConfig.MANAGED_INDEX_TYPE) .optionalTimeField(ManagedIndexConfig.LAST_UPDATED_TIME_FIELD, Instant.now()) .field(ManagedIndexConfig.CHANGE_POLICY_FIELD, changePolicy) - .endObject() - .endObject() + if (user != null) builder.field(ManagedIndexConfig.USER_FIELD, user) + return builder.endObject().endObject() } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt index 1fb697e5a..9111a4f9b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt @@ -28,7 +28,9 @@ package org.opensearch.indexmanagement.opensearchapi +import kotlinx.coroutines.ThreadContextElement import kotlinx.coroutines.delay +import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.Logger import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchException @@ -37,7 +39,9 @@ import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.action.support.DefaultShardOperationFailedException import org.opensearch.client.OpenSearchClient import org.opensearch.common.bytes.BytesReference +import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue +import org.opensearch.common.util.concurrent.ThreadContext import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.ToXContent @@ -48,15 +52,20 @@ import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.InjectSecurity +import org.opensearch.commons.authuser.User import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.util.NO_ID +import org.opensearch.indexmanagement.util.SecurityUtils.Companion.DEFAULT_INJECT_ROLES +import org.opensearch.indexmanagement.util.SecurityUtils.Companion.INTERNAL_REQUEST import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.rest.RestStatus import org.opensearch.transport.RemoteTransportException import java.io.IOException import java.time.Instant +import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine @@ -99,6 +108,10 @@ fun XContentBuilder.optionalISMTemplateField(name: String, ismTemplates: List XContentParser.parseWithType( ensureExpectedToken(Token.END_OBJECT, this.nextToken(), this) return parsed } + +class IndexManagementSecurityContext( + private val id: String, + settings: Settings, + private val threadContext: ThreadContext, + private val user: User? +) : ThreadContextElement { + + companion object Key : CoroutineContext.Key + + private val logger: Logger = LogManager.getLogger(javaClass) + override val key: CoroutineContext.Key<*> + get() = Key + private val injector = InjectSecurity(id, settings, threadContext) + + /** + * Before the thread executes the coroutine we want the thread context to contain user roles so they are used when executing the code inside + * the coroutine + */ + override fun updateThreadContext(context: CoroutineContext) { + logger.debug("Setting security context in thread ${Thread.currentThread().name} for job $id") + injector.injectRoles(if (user == null) DEFAULT_INJECT_ROLES else user.roles) + // TODO: implement this in InjectSecurity to be able to set specific transient properties in ThreadContext + if (threadContext.getTransient(INTERNAL_REQUEST) == null) { + threadContext.putTransient(INTERNAL_REQUEST, true) + } else { + // TODO: Should fail? + logger.error("Failed to set the context correctly for job $id") + } + } + + /** + * Clean up the thread context before the coroutine executed by thread is suspended + */ + override fun restoreThreadContext(context: CoroutineContext, oldState: Unit) { + logger.debug("Cleaning up security context in thread ${Thread.currentThread().name} for job $id") + injector.close() + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/util/SecurityUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/util/SecurityUtils.kt new file mode 100644 index 000000000..19bed21f3 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/util/SecurityUtils.kt @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.indexmanagement.util + +import org.opensearch.common.util.concurrent.ThreadContext +import org.opensearch.commons.ConfigConstants +import org.opensearch.commons.authuser.User + +class SecurityUtils { + companion object { + const val INTERNAL_REQUEST = "index_management_plugin_internal_user" + val DEFAULT_INJECT_ROLES: List = listOf("all_access", "AmazonES_all_access") + /** + * Helper method to build the user object either from the threadContext or from the requested user. + */ + fun buildUser(threadContext: ThreadContext, requestedUser: User? = null): User? { + if (threadContext.getTransient(INTERNAL_REQUEST) != null && threadContext.getTransient(INTERNAL_REQUEST)) { + // received internal request + return requestedUser + } + + val injectedUser: User? = User.parse(threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)) + return if (injectedUser == null) { + null + } else { + User(injectedUser.name, injectedUser.backendRoles, injectedUser.roles, injectedUser.customAttNames) + } + } + } +} diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index f3eda3c04..d39380450 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 10 + "schema_version": 11 }, "dynamic": "strict", "properties": { @@ -456,6 +456,43 @@ "format": "strict_date_time||epoch_millis" } } + }, + "user": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "custom_attribute_names": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } } } }, @@ -510,6 +547,43 @@ }, "is_safe": { "type": "boolean" + }, + "user": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "custom_attribute_names": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } } } }, @@ -540,6 +614,43 @@ } } } + }, + "user": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "custom_attribute_names": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } } } }, @@ -851,6 +962,43 @@ } } } + }, + "user": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "custom_attribute_names": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } } } }, @@ -1039,6 +1187,43 @@ "data_selection_query": { "type": "object", "enabled": false + }, + "user": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "custom_attribute_names": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } } } }, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index e25c5c282..d44493b26 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -46,7 +46,7 @@ import javax.management.remote.JMXServiceURL abstract class IndexManagementRestTestCase : ODFERestTestCase() { - val configSchemaVersion = 10 + val configSchemaVersion = 11 val historySchemaVersion = 3 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index f3eda3c04..d39380450 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 10 + "schema_version": 11 }, "dynamic": "strict", "properties": { @@ -456,6 +456,43 @@ "format": "strict_date_time||epoch_millis" } } + }, + "user": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "custom_attribute_names": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } } } }, @@ -510,6 +547,43 @@ }, "is_safe": { "type": "boolean" + }, + "user": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "custom_attribute_names": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } } } }, @@ -540,6 +614,43 @@ } } } + }, + "user": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "custom_attribute_names": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } } } }, @@ -851,6 +962,43 @@ } } } + }, + "user": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "custom_attribute_names": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } } } }, @@ -1039,6 +1187,43 @@ "data_selection_query": { "type": "object", "enabled": false + }, + "user": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "custom_attribute_names": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } } } },