Skip to content

Commit

Permalink
Implemented filtering on the ISM eplain API (#998)
Browse files Browse the repository at this point in the history
* Implemented filtering on the ISM eplain API

Signed-off-by: Joshua Au <joshuahyau@gmail.com>

* Fixed tests for ExplainRequest

Signed-off-by: Joshua Au <joshuahyau@gmail.com>

* Added filtering on query and metadata map

Signed-off-by: Joshua Au <joshuahyau@gmail.com>

* Filtered on indexNames in metadata

Signed-off-by: Joshua Au <joshuahyau@gmail.com>

* Fixed github workflow check errors

Signed-off-by: Joshua Au <joshuahyau@gmail.com>

* Removed debugging comments

Signed-off-by: Joshua Au <joshuahyau@gmail.com>

* Updated code styling to make more clear

Signed-off-by: Joshua Au <joshuahyau@gmail.com>

* Refactored code to match suggestions

Signed-off-by: Joshua Au <joshuahyau@gmail.com>

---------

Signed-off-by: Joshua Au <joshuahyau@gmail.com>
Co-authored-by: bowenlan-amzn <bowenlan23@gmail.com>
  • Loading branch information
Joshua152 and bowenlan-amzn authored Dec 14, 2023
1 parent 60a8513 commit 21f5f8d
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.model

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParser.Token
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.indexmanagement.indexstatemanagement.util.MANAGED_INDEX_POLICY_ID_FIELD
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import java.io.IOException

data class ExplainFilter(
val policyID: String? = null,
val state: String? = null,
val actionType: String? = null,
val failed: Boolean? = null
) : Writeable {

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
policyID = sin.readOptionalString(),
state = sin.readOptionalString(),
actionType = sin.readOptionalString(),
failed = sin.readOptionalBoolean()
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeOptionalString(policyID)
out.writeOptionalString(state)
out.writeOptionalString(actionType)
out.writeOptionalBoolean(failed)
}

fun byMetaData(metaData: ManagedIndexMetaData): Boolean {
var isValid = true

val stateMetaData = metaData.stateMetaData
if (state != null && (stateMetaData == null || stateMetaData.name != state)) {
isValid = false
}

val actionMetaData = metaData.actionMetaData
if (actionType != null && (actionMetaData == null || actionMetaData.name != actionType)) {
isValid = false
}

val retryInfoMetaData = metaData.policyRetryInfo
val actionFailedNotValid = actionMetaData == null || actionMetaData.failed != failed
val retryFailedNotValid = retryInfoMetaData == null || retryInfoMetaData.failed != failed
if (failed != null && actionFailedNotValid && retryFailedNotValid) {
isValid = false
}

return isValid
}

companion object {
const val FILTER_FIELD = "filter"
const val POLICY_ID_FIELD = "policy_id"
const val STATE_FIELD = "state"
const val ACTION_FIELD = "action_type"
const val FAILED_FIELD = "failed"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): ExplainFilter {
var policyID: String? = null
var state: String? = null
var actionType: String? = null
var failed: Boolean? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
FILTER_FIELD -> {
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val filter = xcp.currentName()
xcp.nextToken()

when (filter) {
POLICY_ID_FIELD -> policyID = xcp.text()
STATE_FIELD -> state = xcp.text()
ACTION_FIELD -> actionType = xcp.text()
FAILED_FIELD -> failed = xcp.booleanValue()
}
}
}
}
}

return ExplainFilter(policyID, state, actionType, failed)
}
}
}

fun BoolQueryBuilder.filterByPolicyID(explainFilter: ExplainFilter?): BoolQueryBuilder {
if (explainFilter?.policyID != null) {
this.filter(QueryBuilders.termsQuery(MANAGED_INDEX_POLICY_ID_FIELD, explainFilter.policyID))
}

return this
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.client.node.NodeClient
import org.opensearch.core.common.Strings
import org.opensearch.common.logging.DeprecationLogger
import org.opensearch.core.xcontent.XContentParser.Token
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.ISM_BASE_URI
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.LEGACY_ISM_BASE_URI
import org.opensearch.indexmanagement.indexstatemanagement.model.ExplainFilter
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_EXPLAIN_VALIDATE_ACTION
Expand All @@ -28,6 +31,7 @@ import org.opensearch.rest.RestHandler.ReplacedRoute
import org.opensearch.rest.RestHandler.Route
import org.opensearch.rest.RestRequest
import org.opensearch.rest.RestRequest.Method.GET
import org.opensearch.rest.RestRequest.Method.POST
import org.opensearch.rest.action.RestToXContentListener

private val log = LogManager.getLogger(RestExplainAction::class.java)
Expand All @@ -52,6 +56,14 @@ class RestExplainAction : BaseRestHandler() {
ReplacedRoute(
GET, "$EXPLAIN_BASE_URI/{index}",
GET, "$LEGACY_EXPLAIN_BASE_URI/{index}"
),
ReplacedRoute(
POST, EXPLAIN_BASE_URI,
POST, LEGACY_EXPLAIN_BASE_URI
),
ReplacedRoute(
POST, "$EXPLAIN_BASE_URI/{index}",
POST, "$LEGACY_EXPLAIN_BASE_URI/{index}"
)
)
}
Expand All @@ -69,6 +81,14 @@ class RestExplainAction : BaseRestHandler() {

val indexType = request.param(TYPE_PARAM_KEY, DEFAULT_INDEX_TYPE)

val explainFilter = if (request.method() == RestRequest.Method.POST) {
val xcp = request.contentParser()
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
ExplainFilter.parse(xcp)
} else {
null
}

val clusterManagerTimeout = parseClusterManagerTimeout(
request, DeprecationLogger.getLogger(RestExplainAction::class.java), name
)
Expand All @@ -78,6 +98,7 @@ class RestExplainAction : BaseRestHandler() {
request.paramAsBoolean("local", false),
clusterManagerTimeout,
searchParams,
explainFilter,
request.paramAsBoolean(SHOW_POLICY_QUERY_PARAM, DEFAULT_EXPLAIN_SHOW_POLICY),
request.paramAsBoolean(SHOW_VALIDATE_ACTION, DEFAULT_EXPLAIN_VALIDATE_ACTION),
indexType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.common.model.rest.SearchParams
import org.opensearch.indexmanagement.indexstatemanagement.model.ExplainFilter
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
import java.io.IOException

Expand All @@ -21,6 +22,7 @@ class ExplainRequest : ActionRequest {
val local: Boolean
val clusterManagerTimeout: TimeValue
val searchParams: SearchParams
val explainFilter: ExplainFilter?
val showPolicy: Boolean
val validateAction: Boolean
val indexType: String
Expand All @@ -31,6 +33,7 @@ class ExplainRequest : ActionRequest {
local: Boolean,
clusterManagerTimeout: TimeValue,
searchParams: SearchParams,
explainFilter: ExplainFilter?,
showPolicy: Boolean,
validateAction: Boolean,
indexType: String
Expand All @@ -39,6 +42,7 @@ class ExplainRequest : ActionRequest {
this.local = local
this.clusterManagerTimeout = clusterManagerTimeout
this.searchParams = searchParams
this.explainFilter = explainFilter
this.showPolicy = showPolicy
this.validateAction = validateAction
this.indexType = indexType
Expand All @@ -50,6 +54,7 @@ class ExplainRequest : ActionRequest {
local = sin.readBoolean(),
clusterManagerTimeout = sin.readTimeValue(),
searchParams = SearchParams(sin),
explainFilter = sin.readOptionalWriteable(::ExplainFilter),
showPolicy = sin.readBoolean(),
validateAction = sin.readBoolean(),
indexType = sin.readString()
Expand All @@ -72,6 +77,7 @@ class ExplainRequest : ActionRequest {
out.writeBoolean(local)
out.writeTimeValue(clusterManagerTimeout)
searchParams.writeTo(out)
out.writeOptionalWriteable(explainFilter)
out.writeBoolean(showPolicy)
out.writeBoolean(validateAction)
out.writeString(indexType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexCon
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.common.model.rest.SearchParams
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner.actionValidation
import org.opensearch.indexmanagement.indexstatemanagement.model.filterByPolicyID
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.managedIndex.ManagedIndexAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.managedIndex.ManagedIndexRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
Expand Down Expand Up @@ -159,9 +160,10 @@ class TransportExplainAction @Inject constructor(
.must(
QueryBuilders
.queryStringQuery(params.queryString)
.defaultField(MANAGED_INDEX_NAME_KEYWORD_FIELD)
.field(MANAGED_INDEX_NAME_KEYWORD_FIELD)
.defaultOperator(Operator.AND)
).filter(QueryBuilders.termsQuery(MANAGED_INDEX_INDEX_UUID_FIELD, indexUUIDs))
.filterByPolicyID(request.explainFilter)

val searchSourceBuilder = SearchSourceBuilder()
.from(params.from)
Expand Down Expand Up @@ -291,8 +293,32 @@ class TransportExplainAction @Inject constructor(
mgetMetadataReq,
object : ActionListener<MultiGetResponse> {
override fun onResponse(response: MultiGetResponse) {
val metadataMap: Map<ManagedIndexMetadataDocUUID, ManagedIndexMetadataMap?> =
var metadataMap: Map<ManagedIndexMetadataDocUUID, ManagedIndexMetadataMap?> =
response.responses.associate { it.id to getMetadata(it.response)?.toMap() }

if (request.explainFilter != null) {
metadataMap = metadataMap.filter { (_, value) ->
var isValid = true

if (value != null) {
val metaData = ManagedIndexMetaData.fromMap(value)

if (!request.explainFilter.byMetaData(metaData)) {
indexNames.remove(metaData.index)
indexNamesToUUIDs.remove(metaData.index)

if (managedIndices.contains(metaData.index)) {
totalManagedIndices--
}

isValid = false
}
}

isValid
}
}

buildResponse(indexNamesToUUIDs, metadataMap, clusterStateIndexMetadatas, threadContext)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const val MANAGED_INDEX_FIELD = "managed_index"
const val MANAGED_INDEX_NAME_KEYWORD_FIELD = "$MANAGED_INDEX_FIELD.name.keyword"
const val MANAGED_INDEX_INDEX_FIELD = "$MANAGED_INDEX_FIELD.index"
const val MANAGED_INDEX_INDEX_UUID_FIELD = "$MANAGED_INDEX_FIELD.index_uuid"
const val MANAGED_INDEX_POLICY_ID_FIELD = "$MANAGED_INDEX_FIELD.policy_id"

const val DEFAULT_JOB_SORT_FIELD = MANAGED_INDEX_INDEX_FIELD
const val DEFAULT_POLICY_SORT_FIELD = "policy.policy_id.keyword"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ class ExplainRequestTests : OpenSearchTestCase() {
val local = true
val clusterManagerTimeout = TimeValue.timeValueSeconds(30)
val params = SearchParams(0, 20, "sort-field", "asc", "*")
val filter = null
val showPolicy = false
val showValidationResult = false
val req = ExplainRequest(indices, local, clusterManagerTimeout, params, showPolicy, showValidationResult, DEFAULT_INDEX_TYPE)
val req = ExplainRequest(indices, local, clusterManagerTimeout, params, filter, showPolicy, showValidationResult, DEFAULT_INDEX_TYPE)

val out = BytesStreamOutput()
req.writeTo(out)
Expand All @@ -36,9 +37,10 @@ class ExplainRequestTests : OpenSearchTestCase() {
val local = true
val clusterManagerTimeout = TimeValue.timeValueSeconds(30)
val params = SearchParams(0, 20, "sort-field", "asc", "*")
val filter = null
val showPolicy = false
val showValidationResult = false
val req = ExplainRequest(indices, local, clusterManagerTimeout, params, showPolicy, showValidationResult, "non-existent-index-type")
val req = ExplainRequest(indices, local, clusterManagerTimeout, params, filter, showPolicy, showValidationResult, "non-existent-index-type")

val actualException: String? = req.validate()?.validationErrors()?.firstOrNull()
val expectedException: String = ExplainRequest.MULTIPLE_INDICES_CUSTOM_INDEX_TYPE_ERROR
Expand Down

0 comments on commit 21f5f8d

Please sign in to comment.