forked from opensearch-project/alerting
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
draft refactor of REST handling and Transport, still work in progress
Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
- Loading branch information
1 parent
a48d1f4
commit 050f42a
Showing
14 changed files
with
555 additions
and
29 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
53 changes: 29 additions & 24 deletions
53
alerting/src/main/kotlin/org/opensearch/alerting/action/GetSuggestionsRequest.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,44 +1,49 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.alerting.action | ||
|
||
import org.opensearch.action.ActionRequest | ||
import org.opensearch.action.ActionRequestValidationException | ||
import org.opensearch.alerting.model.Monitor | ||
import org.opensearch.alerting.rules.inputs.util.SuggestionInput | ||
import org.opensearch.alerting.rules.inputs.util.SuggestionInputFactory | ||
import org.opensearch.alerting.rules.inputs.util.SuggestionInputType | ||
import org.opensearch.common.io.stream.StreamInput | ||
import org.opensearch.common.io.stream.StreamOutput | ||
import org.opensearch.common.xcontent.XContentParser | ||
import java.io.IOException | ||
|
||
class GetSuggestionsRequest : ActionRequest { | ||
val monitorId: String? | ||
val monitor: Monitor? | ||
val inputType: SuggestionInputType | ||
val component: String | ||
val input: SuggestionInput<*, Any> // TODO: is * safe here? | ||
|
||
constructor( | ||
monitorId: String?, | ||
monitor: Monitor? | ||
inputType: SuggestionInputType, | ||
component: String, | ||
xcp: XContentParser | ||
) : super() { | ||
this.monitorId = monitorId | ||
this.monitor = monitor | ||
this.inputType = inputType | ||
this.component = component | ||
this.input = SuggestionInputFactory.getInput(this.inputType, xcp) | ||
} | ||
|
||
@Throws(IOException::class) | ||
constructor(sin: StreamInput) : this( | ||
sin.readOptionalString(), // monitorId | ||
if (sin.readBoolean()) { | ||
Monitor.readFrom(sin) // monitor | ||
} else null | ||
) | ||
|
||
override fun validate(): ActionRequestValidationException? { | ||
return null | ||
constructor(sin: StreamInput) : super() { | ||
this.inputType = sin.readEnum(SuggestionInputType::class.java) // inputType | ||
this.component = sin.readString() // component | ||
this.input = SuggestionInputFactory.getInput(this.inputType, sin) | ||
} | ||
|
||
@Throws(IOException::class) | ||
override fun writeTo(out: StreamOutput) { | ||
out.writeOptionalString(monitorId) | ||
if (monitor != null) { | ||
out.writeBoolean(true) | ||
monitor.writeTo(out) | ||
} else { | ||
out.writeBoolean(false) | ||
} | ||
out.writeEnum(inputType) | ||
out.writeString(component) | ||
input.writeTo(out) | ||
} | ||
|
||
override fun validate(): ActionRequestValidationException? { | ||
return null | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
100 changes: 100 additions & 0 deletions
100
alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetSuggestionsAction.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.alerting.resthandler | ||
|
||
import org.apache.logging.log4j.LogManager | ||
import org.opensearch.alerting.AlertingPlugin | ||
import org.opensearch.alerting.action.GetSuggestionsAction | ||
import org.opensearch.alerting.action.GetSuggestionsRequest | ||
import org.opensearch.alerting.rules.inputs.util.SuggestionInput | ||
import org.opensearch.alerting.rules.inputs.util.SuggestionInputType | ||
import org.opensearch.client.node.NodeClient | ||
import org.opensearch.common.xcontent.XContentParser.Token | ||
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken | ||
import org.opensearch.rest.BaseRestHandler | ||
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer | ||
import org.opensearch.rest.RestHandler.Route | ||
import org.opensearch.rest.RestRequest | ||
import org.opensearch.rest.RestRequest.Method.POST | ||
import org.opensearch.rest.action.RestToXContentListener | ||
|
||
private val log = LogManager.getLogger(RestGetSuggestionsAction::class.java) | ||
|
||
class RestGetSuggestionsAction : BaseRestHandler() { | ||
|
||
override fun getName(): String = "get_suggestions_action" | ||
|
||
override fun routes(): List<Route> { | ||
return listOf( | ||
Route(POST, AlertingPlugin.SUGGESTIONS_BASE_URI) // inline object with specific format is required | ||
) | ||
} | ||
|
||
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { | ||
log.debug("${request.method()} ${AlertingPlugin.SUGGESTIONS_BASE_URI}") | ||
|
||
return RestChannelConsumer { channel -> | ||
var inputType: SuggestionInputType? = null | ||
var component: String? = null | ||
var hasInput = false | ||
|
||
val xcp = request.contentParser() | ||
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp) | ||
while (xcp.nextToken() != Token.END_OBJECT) { | ||
val fieldName = xcp.currentName() | ||
xcp.nextToken() | ||
|
||
when (fieldName) { | ||
SuggestionInput.INPUT_TYPE_FIELD -> inputType = SuggestionInputType.enumFromStr(xcp.text()) | ||
SuggestionInput.COMPONENT_FIELD -> component = xcp.text() | ||
SuggestionInput.INPUT_FIELD -> { | ||
hasInput = true | ||
break | ||
} | ||
else -> throw IllegalArgumentException("request body must contain only input, inputType, and component fields") | ||
} | ||
} | ||
|
||
if (inputType == null || component == null || !hasInput) { | ||
throw IllegalArgumentException("request body must contain input, inputType, and component fields") | ||
} | ||
|
||
val getSuggestionsRequest = GetSuggestionsRequest(inputType, component, xcp) // xcp already pointing to beginning of input{} object | ||
|
||
client.execute(GetSuggestionsAction.INSTANCE, getSuggestionsRequest, RestToXContentListener(channel)) | ||
} | ||
} | ||
|
||
// private fun validateInputs(request: RestRequest) { | ||
// if (!request.hasParam(SuggestionInput.INPUT_FIELD) || !request.hasParam(SuggestionInput.INPUT_TYPE_FIELD) || !request.hasParam(SuggestionInput.COMPONENT_FIELD)) { | ||
// throw IllegalArgumentException( | ||
// "request body must contain input, inputType, and component fields, ${request.hasParam(SuggestionInput.INPUT_FIELD)}," + | ||
// "${request.hasParam(SuggestionInput.INPUT_TYPE_FIELD)}, ${request.hasParam(SuggestionInput.COMPONENT_FIELD)}" | ||
// ) | ||
// } | ||
// | ||
// val inputType = request.param(SuggestionInput.INPUT_TYPE_FIELD) | ||
// val allowedInputTypes = SuggestionInputType.values().map { it.value } | ||
// if (!allowedInputTypes.contains(inputType)) { | ||
// throw IllegalArgumentException("invalid input type, must be one of $allowedInputTypes") | ||
// } | ||
// } | ||
// | ||
// // prepare by making it point to the start of the "input{}" object rather | ||
// // than the start of the entire request body | ||
// private fun prepareXcp(xcp: XContentParser) { | ||
// ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp) | ||
// while (xcp.nextToken() != Token.END_OBJECT) { | ||
// val fieldName = xcp.currentName() | ||
// xcp.nextToken() | ||
// if (fieldName == SuggestionInput.INPUT_FIELD) { | ||
// break | ||
// } else { | ||
// xcp.skipChildren() | ||
// } | ||
// } | ||
// } | ||
} |
31 changes: 31 additions & 0 deletions
31
alerting/src/main/kotlin/org/opensearch/alerting/rules/ExampleRule.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.alerting.rules | ||
|
||
import org.opensearch.alerting.core.model.SearchInput | ||
import org.opensearch.alerting.model.Monitor | ||
import org.opensearch.alerting.rules.util.Rule | ||
|
||
object ExampleRule : Rule { | ||
// dummy example rule that checks for wildcard expressions in index declarations | ||
override fun evaluate(monitor: Monitor): Boolean { | ||
val input = monitor.inputs[0] | ||
|
||
if (input is SearchInput) { | ||
for (index in input.indices) { | ||
if (index.contains("*")) { | ||
return false | ||
} | ||
} | ||
} | ||
|
||
return true | ||
} | ||
|
||
override fun suggestion(): String { | ||
return "an index in your inputs uses a wildcard (*), consider explicitly declaring indices instead" | ||
} | ||
} |
111 changes: 111 additions & 0 deletions
111
alerting/src/main/kotlin/org/opensearch/alerting/rules/inputs/MonitorIDInput.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.alerting.rules.inputs | ||
|
||
import org.opensearch.OpenSearchStatusException | ||
import org.opensearch.action.ActionListener | ||
import org.opensearch.action.get.GetRequest | ||
import org.opensearch.action.get.GetResponse | ||
import org.opensearch.alerting.core.model.ScheduledJob | ||
import org.opensearch.alerting.model.Monitor | ||
import org.opensearch.alerting.rules.inputs.util.SuggestionInput | ||
import org.opensearch.client.Client | ||
import org.opensearch.common.io.stream.StreamInput | ||
import org.opensearch.common.io.stream.StreamOutput | ||
import org.opensearch.common.xcontent.LoggingDeprecationHandler | ||
import org.opensearch.common.xcontent.NamedXContentRegistry | ||
import org.opensearch.common.xcontent.XContentHelper | ||
import org.opensearch.common.xcontent.XContentParser | ||
import org.opensearch.common.xcontent.XContentParser.Token | ||
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken | ||
import org.opensearch.common.xcontent.XContentType | ||
import org.opensearch.rest.RestStatus | ||
|
||
class MonitorIDInput() : SuggestionInput<String, Monitor> { | ||
|
||
override var rawInput: String? = null | ||
var obj: Monitor? = null | ||
|
||
constructor(sin: StreamInput) : this() { | ||
rawInput = sin.readOptionalString() // TODO: readString() or readOptionalString()? | ||
} | ||
|
||
/** | ||
* User input requirements that will be checked for: | ||
* input{} must contain exactly one field named "monitorId" | ||
* | ||
* whether or not it stores a valid monitor id is deferred until | ||
* the id is used to query the Scheduled Job Index for the Monitor | ||
* | ||
* Error is thrown if any of the above is violated | ||
*/ | ||
override fun parseInput(xcp: XContentParser) { | ||
|
||
// parse input for monitor id | ||
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) // start of input {} block | ||
ensureExpectedToken(Token.FIELD_NAME, xcp.nextToken(), xcp) // name field, should be "monitorId" | ||
if (xcp.currentName() != "monitorId") { // TODO: put String vals in constants in companion object | ||
throw IllegalArgumentException("input must contain exactly one field named \"monitorId\" that stores a valid monitor id") | ||
} | ||
xcp.nextToken() // the value stored in the monitorId field, the monitor id itself | ||
val monitorId: String = xcp.text() // TODO: setting to Monitor.NO_ID is redundant? consider doing it anyway, ie initialize at top and set later | ||
|
||
this.rawInput = monitorId | ||
} | ||
|
||
override fun getObject(client: Client, xContentRegistry: NamedXContentRegistry): Monitor { | ||
// check to ensure that parseInput was called first and rawInput is not null | ||
if (rawInput == null) { | ||
throw IllegalStateException("input was not parsed to get monitorId, parseInput() must be called first") | ||
} | ||
|
||
// use monitor id to retrieve Monitor object from Scheduled Jobs Index | ||
var monitor: Monitor? = null | ||
|
||
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).id(this.rawInput) | ||
client.get( | ||
getRequest, | ||
object : ActionListener<GetResponse> { | ||
override fun onResponse(response: GetResponse) { | ||
if (!response.isExists) { | ||
throw OpenSearchStatusException("Monitor with ID $rawInput not found", RestStatus.NOT_FOUND) | ||
} | ||
|
||
if (!response.isSourceEmpty) { | ||
XContentHelper.createParser( | ||
xContentRegistry, LoggingDeprecationHandler.INSTANCE, | ||
response.sourceAsBytesRef, XContentType.JSON | ||
).use { xcp -> | ||
val receivedMonitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor | ||
monitor = receivedMonitor.copy() | ||
} | ||
} | ||
} | ||
|
||
override fun onFailure(e: Exception) { | ||
throw IllegalStateException("onFailure: $e, $rawInput, $monitor") | ||
} | ||
} | ||
) | ||
|
||
if (monitor == null) { | ||
throw IllegalStateException("if monitor not found, should have already failed and never gotten here") | ||
} | ||
|
||
return monitor as Monitor | ||
} | ||
|
||
override fun writeTo(out: StreamOutput) { | ||
out.writeOptionalString(rawInput) | ||
} | ||
|
||
companion object { | ||
@JvmStatic | ||
fun readFrom(sin: StreamInput): MonitorIDInput { | ||
return MonitorIDInput(sin) | ||
} | ||
} | ||
} |
Oops, something went wrong.