Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
  • Loading branch information
bowenlan-amzn committed Oct 3, 2023
1 parent 8f7e744 commit 9f00758
Show file tree
Hide file tree
Showing 33 changed files with 195 additions and 1,449 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object SMRunner :
)

override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) {
log.debug("Snapshot management running job: $job")
log.debug("Snapshot management running job: {}", job)

if (job !is SMPolicy) {
throw IllegalArgumentException("Received invalid job type [${job.javaClass.simpleName}] with id [${context.jobId}].")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class SMStateMachine(
val indicesManager: IndexManagementIndices,
) {

val log: Logger = LogManager.getLogger("$javaClass [${job.policyName}]")
val log: Logger = LogManager.getLogger(javaClass)

lateinit var currentState: SMState
fun currentState(currentState: SMState): SMStateMachine {
Expand All @@ -62,7 +62,7 @@ class SMStateMachine(
val prevState = currentState
for (nextState in nextStates) {
currentState = nextState
log.debug("Start executing $currentState.")
log.debug("Start executing {}.", currentState)
log.debug(
"User and roles string from thread context: ${threadPool.threadContext.getTransient<String>(
ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT
Expand Down Expand Up @@ -99,7 +99,7 @@ class SMStateMachine(
break
}
is SMResult.Stay -> {
log.debug("State [$currentState] has not finished.")
log.debug("State [{}] has not finished.", currentState)
updateMetadata(
result.metadataToSave
.setCurrentState(prevState)
Expand Down Expand Up @@ -200,7 +200,7 @@ class SMStateMachine(
suspend fun updateMetadata(md: SMMetadata) {
indicesManager.checkAndUpdateIMConfigIndex(log)
try {
log.debug("Update metadata: $md")
log.debug("Update metadata: {}", md)
if (md == metadata) {
log.debug("Metadata not change, so don't need to update")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.MediaType
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import java.io.IOException
import java.nio.file.Files
import java.util.Date
import java.time.Duration
import java.time.Instant
import java.util.*
import javax.management.MBeanServerInvocationHandler
import javax.management.ObjectName
import javax.management.remote.JMXConnectorFactory
Expand Down Expand Up @@ -65,6 +70,24 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
}
}

@Before
fun setDebugLogLevel() {
client().makeRequest(
"PUT", "_cluster/settings",
StringEntity(
"""
{
"transient": {
"logger.org.opensearch.indexmanagement":"DEBUG",
"logger.org.opensearch.jobscheduler":"DEBUG"
}
}
""".trimIndent(),
ContentType.APPLICATION_JSON
)
)
}

protected val isDebuggingTest = DisableOnDebug(null).isDebugging
protected val isDebuggingRemoteCluster = System.getProperty("cluster.debug", "false")!!.toBoolean()

Expand Down Expand Up @@ -172,6 +195,41 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
}
}

protected fun updateRollupStartTime(update: Rollup, desiredStartTimeMillis: Long? = null) {
// Before updating start time of a job always make sure there are no unassigned shards that could cause the config
// index to move to a new node and negate this forced start
if (isMultiNode) {
waitFor {
try {
client().makeRequest("GET", "_cluster/allocation/explain")
fail("Expected 400 Bad Request when there are no unassigned shards to explain")
} catch (e: ResponseException) {
assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus())
}
}
}
val intervalSchedule = (update.jobSchedule as IntervalSchedule)
val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis()
val startTimeMillis = desiredStartTimeMillis ?: (Instant.now().toEpochMilli() - millis)
val waitForActiveShards = if (isMultiNode) "all" else "1"
// TODO flaky: Add this log to confirm this update is missed by job scheduler
// This miss is because shard remove, job scheduler deschedule on the original node and reschedule on another node
// However the shard comes back, and job scheduler deschedule on the another node and reschedule on the original node
// During this period, this update got missed
// Since from the log, this happens very fast (within 0.1~0.2s), the above cluster explain may not have the granularity to catch this.
logger.info("Update rollup start time to $startTimeMillis")
val response = client().makeRequest(
"POST", "${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX}/_update/${update.id}?wait_for_active_shards=$waitForActiveShards&refresh=true",
StringEntity(
"{\"doc\":{\"rollup\":{\"schedule\":{\"interval\":{\"start_time\":" +
"\"$startTimeMillis\"}}}}}",
ContentType.APPLICATION_JSON
)
)

assertEquals("Request failed", RestStatus.OK, response.restStatus())
}

override fun preserveIndicesUponCompletion(): Boolean = true
companion object {
val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,9 @@ class IndexStateManagementSecurityBehaviorIT : SecurityRestTestCase() {
)
}

private fun assertIndexRolledUp(indexName: String, policyId: String, rollup: ISMRollup) {
val rollupId = rollup.toRollup(indexName).id
private fun assertIndexRolledUp(indexName: String, policyId: String, ismRollup: ISMRollup) {
val rollup = ismRollup.toRollup(indexName)
val rollupId = rollup.id
val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Change the start time so that the policy will be initialized.
Expand All @@ -290,21 +291,20 @@ class IndexStateManagementSecurityBehaviorIT : SecurityRestTestCase() {
)
}

Thread.sleep(60000)
updateRollupStartTime(rollup)
waitFor(timeout = Instant.ofEpochSecond(60)) {
val rollupJob = getRollup(rollupId = rollupId)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
}

// Change the start time so that the rollup action will be attempted.
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor {
assertEquals(
WaitForRollupCompletionStep.getJobCompletionMessage(rollupId, indexName),
getExplainManagedIndexMetaData(indexName).info?.get("message")
)
}
val rollupJob = getRollup(rollupId = rollupId)
waitFor {
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ abstract class SecurityRestTestCase : IndexManagementRestTestCase() {
client: RestClient
) = super.createRollup(rollup, rollupId, refresh, client)

fun updateRollupStartTimeExt(update: Rollup, desiredStartTimeMillis: Long? = null) =
super.updateRollupStartTime(update, desiredStartTimeMillis)

fun getRollupMetadataExt(
metadataId: String,
refresh: Boolean = true,
Expand Down Expand Up @@ -165,10 +162,6 @@ abstract class SecurityRestTestCase : IndexManagementRestTestCase() {
return executeRequest(request, expectedStatus, client)
}

protected fun updateRollupStartTime(update: Rollup, desiredStartTimeMillis: Long? = null) {
RollupRestTestCaseSecurityExtension.updateRollupStartTimeExt(update, desiredStartTimeMillis)
}

protected fun getRollupMetadata(
metadataId: String,
refresh: Boolean = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.opensearchapi.string
import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.core.rest.RestStatus
import org.opensearch.indexmanagement.waitFor
import org.opensearch.search.builder.SearchSourceBuilder

class IndexManagementBackwardsCompatibilityIT : IndexManagementRestTestCase() {
Expand Down Expand Up @@ -60,7 +61,9 @@ class IndexManagementBackwardsCompatibilityIT : IndexManagementRestTestCase() {
createBasicPolicy()

verifyPolicyExists(LEGACY_POLICY_BASE_URI)
verifyPolicyOnIndex(LEGACY_ISM_BASE_URI)
waitFor {
verifyPolicyOnIndex(LEGACY_ISM_BASE_URI)
}
}
ClusterType.MIXED -> {
assertTrue(pluginNames.contains("opensearch-index-management"))
Expand Down Expand Up @@ -137,7 +140,6 @@ class IndexManagementBackwardsCompatibilityIT : IndexManagementRestTestCase() {
val createdVersion = responseBody["_version"] as Int
assertNotEquals("Create policy response is missing id", NO_ID, createdId)
assertTrue("Create policy response has incorrect version", createdVersion > 0)
Thread.sleep(10000)
}

@Throws(Exception::class)
Expand All @@ -164,6 +166,7 @@ class IndexManagementBackwardsCompatibilityIT : IndexManagementRestTestCase() {

assertEquals("Explain Index failed", RestStatus.OK, getResponse.restStatus())
val responseBody = getResponse.asMap()
logger.info("Response body: $responseBody")
assertTrue("Test index does not exist", responseBody.containsKey(INDEX_NAME))
val responsePolicy = responseBody[INDEX_NAME] as Map<String, String>
val responsePolicyId = responsePolicy["policy_id"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.opensearch.core.rest.RestStatus
abstract class LRONConfigRestTestCase : IndexManagementRestTestCase() {
@Before
fun prepareForIT() {
setDebugLogLevel()
/* init cluster node ids in integ test */
initNodeIdsInRestIT(client())
}
Expand All @@ -49,22 +48,6 @@ abstract class LRONConfigRestTestCase : IndexManagementRestTestCase() {
return client().makeRequest("POST", IndexManagementPlugin.LRON_BASE_URI, emptyMap(), lronConfig.toHttpEntity())
}

private fun setDebugLogLevel() {
client().makeRequest(
"PUT", "_cluster/settings",
StringEntity(
"""
{
"transient": {
"logger.org.opensearch.indexmanagement.controlcenter.notification":"DEBUG"
}
}
""".trimIndent(),
ContentType.APPLICATION_JSON
)
)
}

protected fun LRONConfig.toHttpEntity(): HttpEntity = StringEntity(toJsonString(), ContentType.APPLICATION_JSON)

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,17 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
return myIndex["state"] as String
}

@Suppress("UNCHECKED_CAST")
protected fun getIndexNamesOfPattern(pattern: String): Set<String> {
val request = Request("GET", "/_cluster/state")
val response = client().performRequest(request)

val responseMap = response.asMap()
val metadata = responseMap["metadata"] as Map<String, Any>
val indexMetaData = metadata["indices"] as Map<String, Any>
return indexMetaData.filter { it.key.startsWith(pattern) }.keys
}

@Suppress("UNCHECKED_CAST")
protected fun getIndexBlocksWriteSetting(indexName: String): String {
val indexSettings = getIndexSettings(indexName) as Map<String, Map<String, Map<String, Any?>>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class ActionRetryIT : IndexStateManagementRestTestCase() {
* We are forcing RollOver to fail in this Integ test.
*/
fun `test failed action`() {
disableValidationService()
val testPolicy = """
{"policy":{"description":"Default policy","default_state":"Ingest","states":[
{"name":"Ingest","actions":[{"retry":{"count":2,"backoff":"constant","delay":"1s"},"rollover":{"min_doc_count":100}}],"transitions":[{"state_name":"Search"}]},
Expand Down Expand Up @@ -96,7 +95,6 @@ class ActionRetryIT : IndexStateManagementRestTestCase() {
}

fun `test exponential backoff`() {
disableValidationService()
val testPolicy = """
{"policy":{"description":"Default policy","default_state":"Ingest","states":[
{"name":"Ingest","actions":[{"retry":{"count":2,"backoff":"exponential","delay":"1m"},"rollover":{"min_doc_count":100}}],"transitions":[{"state_name":"Search"}]},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() {
// Create a Policy with one State that only preforms a force_merge Action
val forceMergeActionConfig = ForceMergeAction(maxNumSegments = 1, index = 0)
val states = listOf(State("ForceMergeState", listOf(forceMergeActionConfig), listOf()))

val policy = Policy(
id = policyID,
description = "$testIndexName description",
Expand All @@ -45,25 +44,21 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() {

// Add sample data to increase segment count, passing in a delay to ensure multiple segments get created
insertSampleData(indexName, 3, 1000)

waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) }

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Will change the startTime each execution so that it triggers in 2 seconds
// First execution: Policy is initialized
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

// Second execution: Index is set to read-only for force_merge
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) }

// Third execution: Force merge operation is kicked off
updateManagedIndexConfigStartTime(managedIndexConfig)

// verify we set maxNumSegments in action properties when kicking off force merge
waitFor {
assertEquals(
Expand All @@ -75,10 +70,9 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() {

// Fourth execution: Waits for force merge to complete, which will happen in this execution since index is small
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) }
// verify we reset actionproperties at end of forcemerge
waitFor { assertNull("maxNumSegments was not reset", getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties) }
waitFor { assertNull("maxNumSegments was not reset", getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties?.maxNumSegments) }
// index should still be readonly after force merge finishes
waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) }
}
Expand Down Expand Up @@ -106,7 +100,6 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() {

// Add sample data to increase segment count, passing in a delay to ensure multiple segments get created
insertSampleData(indexName, 3, 1000)

waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) }

// Set index to read-only
Expand All @@ -117,26 +110,22 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() {
// Will change the startTime each execution so that it triggers in 2 seconds
// First execution: Policy is initialized
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

// Second execution: Index was already read-only and should remain so for force_merge
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals(AttemptSetReadOnlyStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) }

waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) }

// Third execution: Force merge operation is kicked off
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals(AttemptCallForceMergeStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) }

// Fourth execution: Waits for force merge to complete, which will happen in this execution since index is small
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals(WaitForForceMergeStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) }
assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1))
waitFor { assertNull("maxNumSegments was not reset", getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties?.maxNumSegments) }
assertEquals("true", getIndexBlocksWriteSetting(indexName))
}
}
Loading

0 comments on commit 9f00758

Please sign in to comment.