Skip to content

Commit

Permalink
Making snapshot name to scripted input in template (opensearch-projec…
Browse files Browse the repository at this point in the history
…t#77)

Signed-off-by: Ravi Thaluru <ravi1092@gmail.com>
  • Loading branch information
thalurur authored Oct 1, 2021
1 parent cc117a0 commit 3958175
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.action.Snapshot
import org.opensearch.indexmanagement.indexstatemanagement.step.Step
import org.opensearch.indexmanagement.indexstatemanagement.step.snapshot.AttemptSnapshotStep
import org.opensearch.indexmanagement.indexstatemanagement.step.snapshot.WaitForSnapshotStep
import org.opensearch.script.ScriptService

class SnapshotAction(
clusterService: ClusterService,
scriptService: ScriptService,
client: Client,
managedIndexMetaData: ManagedIndexMetaData,
config: SnapshotActionConfig
) : Action(ActionType.SNAPSHOT, config, managedIndexMetaData) {
private val attemptSnapshotStep = AttemptSnapshotStep(clusterService, client, config, managedIndexMetaData)
private val attemptSnapshotStep = AttemptSnapshotStep(clusterService, scriptService, client, config, managedIndexMetaData)
private val waitForSnapshotStep = WaitForSnapshotStep(clusterService, client, config, managedIndexMetaData)

override fun getSteps(): List<Step> = listOf(attemptSnapshotStep, waitForSnapshotStep)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ data class SnapshotActionConfig(
client: Client,
settings: Settings,
managedIndexMetaData: ManagedIndexMetaData
): Action = SnapshotAction(clusterService, client, managedIndexMetaData, this)
): Action = SnapshotAction(clusterService, scriptService, client, managedIndexMetaData, this)

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,13 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmet
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StepMetaData
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST
import org.opensearch.indexmanagement.indexstatemanagement.step.Step
import org.opensearch.indexmanagement.opensearchapi.convertToMap
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript
import org.opensearch.snapshots.ConcurrentSnapshotExecutionException
import org.opensearch.transport.RemoteTransportException
import java.time.LocalDateTime
Expand All @@ -50,6 +55,7 @@ import java.util.Locale

class AttemptSnapshotStep(
val clusterService: ClusterService,
val scriptService: ScriptService,
val client: Client,
val config: SnapshotActionConfig,
managedIndexMetaData: ManagedIndexMetaData
Expand All @@ -74,15 +80,15 @@ class AttemptSnapshotStep(
info = mutableInfo.toMap()
return this
}
val snapshotNameSuffix = "-".plus(
LocalDateTime.now(ZoneId.of("UTC"))
.format(DateTimeFormatter.ofPattern("uuuu.MM.dd-HH:mm:ss.SSS", Locale.ROOT))
)

snapshotName = config
.snapshot
.plus("-")
.plus(
LocalDateTime
.now(ZoneId.of("UTC"))
.format(DateTimeFormatter.ofPattern("uuuu.MM.dd-HH:mm:ss.SSS", Locale.ROOT))
)
val snapshotScript = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, config.snapshot, mapOf())
// If user intentionally set the snapshot name empty then we are going to honor it
val defaultSnapshotName = if (config.snapshot.isBlank()) config.snapshot else indexName
snapshotName = compileTemplate(snapshotScript, managedIndexMetaData, defaultSnapshotName).plus(snapshotNameSuffix)

val createSnapshotRequest = CreateSnapshotRequest()
.userMetadata(mapOf("snapshot_created" to "Open Distro for Elasticsearch Index Management"))
Expand Down Expand Up @@ -148,6 +154,16 @@ class AttemptSnapshotStep(
info = mutableInfo.toMap()
}

private fun compileTemplate(template: Script, managedIndexMetaData: ManagedIndexMetaData, defaultValue: String): String {
val contextMap = managedIndexMetaData.convertToMap().filterKeys { key ->
key in validTopContextFields
}
val compiledValue = scriptService.compile(template, TemplateScript.CONTEXT)
.newInstance(template.params + mapOf("ctx" to contextMap))
.execute()
return if (compiledValue.isBlank()) defaultValue else compiledValue
}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
val currentActionMetaData = currentMetaData.actionMetaData
return currentMetaData.copy(
Expand All @@ -159,6 +175,7 @@ class AttemptSnapshotStep(
}

companion object {
val validTopContextFields = setOf("index", "indexUuid")
const val name = "attempt_snapshot"
fun getBlockedMessage(denyList: List<String>, repoName: String, index: String) =
"Snapshot repository [$repoName] is blocked in $denyList [index=$index]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,13 +697,13 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
protected fun assertSnapshotExists(
repository: String,
snapshot: String
) = require(getSnapshotsList(repository).any { element -> (element as Map<String, String>)["id"]!!.contains(snapshot) }) { "No snapshot found with id: $snapshot" }
) = require(getSnapshotsList(repository).any { element -> (element as Map<String, String>)["id"]!!.startsWith(snapshot) }) { "No snapshot found with id: $snapshot" }

@Suppress("UNCHECKED_CAST")
protected fun assertSnapshotFinishedWithSuccess(
repository: String,
snapshot: String
) = require(getSnapshotsList(repository).any { element -> (element as Map<String, String>)["id"]!!.contains(snapshot) && "SUCCESS" == element["status"] }) { "Snapshot didn't finish with success." }
) = require(getSnapshotsList(repository).any { element -> (element as Map<String, String>)["id"]!!.startsWith(snapshot) && "SUCCESS" == element["status"] }) { "Snapshot didn't finish with success." }

/**
* Compares responses returned by APIs such as those defined in [RetryFailedManagedIndexAction] and [RestAddPolicyAction]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,82 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
// Need to wait two cycles for wait for snapshot step
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertSnapshotExists(repository, snapshot) }
waitFor { assertSnapshotFinishedWithSuccess(repository, snapshot) }
waitFor { assertSnapshotExists(repository, "snapshot") }
waitFor { assertSnapshotFinishedWithSuccess(repository, "snapshot") }
}

fun `test basic with templated snapshot name`() {
val indexName = "${testIndexName}_index_basic"
val policyID = "${testIndexName}_policy_basic"
val repository = "repository"
val actionConfig = SnapshotActionConfig(repository, "{{ctx.index}}", 0)
val states = listOf(
State("Snapshot", listOf(actionConfig), listOf())
)

createRepository(repository)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
createPolicy(policy, policyID)
createIndex(indexName, policyID)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Change the start time so the job will trigger in 2 seconds.
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

// Need to wait two cycles for wait for snapshot step
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertSnapshotExists(repository, indexName) }
waitFor { assertSnapshotFinishedWithSuccess(repository, indexName) }
}

fun `test basic with invalid templated snapshot name default to indexName`() {
val indexName = "${testIndexName}_index_basic"
val policyID = "${testIndexName}_policy_basic"
val repository = "repository"
val actionConfig = SnapshotActionConfig(repository, "{{ctx.someField}}", 0)
val states = listOf(
State("Snapshot", listOf(actionConfig), listOf())
)

createRepository(repository)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
createPolicy(policy, policyID)
createIndex(indexName, policyID)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Change the start time so the job will trigger in 2 seconds.
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

// Need to wait two cycles for wait for snapshot step
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertSnapshotExists(repository, indexName) }
waitFor { assertSnapshotFinishedWithSuccess(repository, indexName) }
}

fun `test successful wait for snapshot step`() {
Expand Down Expand Up @@ -130,6 +204,55 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
waitFor { assertSnapshotFinishedWithSuccess(repository, snapshot) }
}

fun `test successful wait for snapshot step - empty snapshot name`() {
val indexName = "${testIndexName}_index_success"
val policyID = "${testIndexName}_policy_success"
val repository = "repository"
val snapshot = "-"
val actionConfig = SnapshotActionConfig(repository, "", 0)
val states = listOf(
State("Snapshot", listOf(actionConfig), listOf())
)

createRepository(repository)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
createPolicy(policy, policyID)
createIndex(indexName, policyID)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Change the start time so the job will initialize the policy
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

// Change the start time so attempt snapshot step with execute
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(AttemptSnapshotStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) }

// Change the start time so wait for snapshot step will execute
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(WaitForSnapshotStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) }

// verify we set snapshotName in action properties
waitFor {
assert(
getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties?.snapshotName?.contains(snapshot) == true
)
}

waitFor { assertSnapshotExists(repository, snapshot) }
waitFor { assertSnapshotFinishedWithSuccess(repository, snapshot) }
}

fun `test failed wait for snapshot step`() {
val indexName = "${testIndexName}_index_failed"
val policyID = "${testIndexName}_policy_failed"
Expand Down Expand Up @@ -188,8 +311,7 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
val indexName = "${testIndexName}_index_blocked"
val policyID = "${testIndexName}_policy_basic"
val repository = "hello-world"
val snapshot = "snapshot"
val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
val actionConfig = SnapshotActionConfig(repository, "snapshot", 0)
val states = listOf(
State("Snapshot", listOf(actionConfig), listOf())
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.step
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.doAnswer
import com.nhaarman.mockitokotlin2.doReturn
import com.nhaarman.mockitokotlin2.eq
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import kotlinx.coroutines.runBlocking
Expand All @@ -27,25 +28,30 @@ import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.ClusterSettings
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.indexstatemanagement.model.action.SnapshotActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionMetaData
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionProperties
import org.opensearch.indexmanagement.indexstatemanagement.randomSnapshotActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST
import org.opensearch.indexmanagement.indexstatemanagement.step.snapshot.AttemptSnapshotStep
import org.opensearch.ingest.TestTemplateService.MockTemplateScript
import org.opensearch.rest.RestStatus
import org.opensearch.script.ScriptService
import org.opensearch.script.TemplateScript
import org.opensearch.snapshots.ConcurrentSnapshotExecutionException
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.transport.RemoteTransportException

class AttemptSnapshotStepTests : OpenSearchTestCase() {

private val clusterService: ClusterService = mock()
private val config = SnapshotActionConfig("repo", "snapshot-name", 0)
private val scriptService: ScriptService = mock()
private val config = randomSnapshotActionConfig("repo", "snapshot-name")
private val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(AttemptSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null)

@Before
fun settings() {
whenever(clusterService.clusterSettings).doReturn(ClusterSettings(Settings.EMPTY, setOf(SNAPSHOT_DENY_LIST)))
whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(MockTemplateScript.Factory("snapshot-name"))
}

fun `test snapshot response when block`() {
Expand All @@ -54,23 +60,23 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() {

whenever(response.status()).doReturn(RestStatus.ACCEPTED)
runBlocking {
val step = AttemptSnapshotStep(clusterService, client, config, metadata)
val step = AttemptSnapshotStep(clusterService, scriptService, client, config, metadata)
step.execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(metadata)
assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}

whenever(response.status()).doReturn(RestStatus.OK)
runBlocking {
val step = AttemptSnapshotStep(clusterService, client, config, metadata)
val step = AttemptSnapshotStep(clusterService, scriptService, client, config, metadata)
step.execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(metadata)
assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}

whenever(response.status()).doReturn(RestStatus.INTERNAL_SERVER_ERROR)
runBlocking {
val step = AttemptSnapshotStep(clusterService, client, config, metadata)
val step = AttemptSnapshotStep(clusterService, scriptService, client, config, metadata)
step.execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(metadata)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
Expand All @@ -81,7 +87,7 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() {
val exception = IllegalArgumentException("example")
val client = getClient(getAdminClient(getClusterAdminClient(null, exception)))
runBlocking {
val step = AttemptSnapshotStep(clusterService, client, config, metadata)
val step = AttemptSnapshotStep(clusterService, scriptService, client, config, metadata)
step.execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(metadata)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
Expand All @@ -93,7 +99,7 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() {
val exception = ConcurrentSnapshotExecutionException("repo", "other-snapshot", "concurrent snapshot in progress")
val client = getClient(getAdminClient(getClusterAdminClient(null, exception)))
runBlocking {
val step = AttemptSnapshotStep(clusterService, client, config, metadata)
val step = AttemptSnapshotStep(clusterService, scriptService, client, config, metadata)
step.execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(metadata)
assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
Expand All @@ -105,7 +111,7 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() {
val exception = RemoteTransportException("rte", ConcurrentSnapshotExecutionException("repo", "other-snapshot", "concurrent snapshot in progress"))
val client = getClient(getAdminClient(getClusterAdminClient(null, exception)))
runBlocking {
val step = AttemptSnapshotStep(clusterService, client, config, metadata)
val step = AttemptSnapshotStep(clusterService, scriptService, client, config, metadata)
step.execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(metadata)
assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
Expand All @@ -117,7 +123,7 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() {
val exception = RemoteTransportException("rte", IllegalArgumentException("some error"))
val client = getClient(getAdminClient(getClusterAdminClient(null, exception)))
runBlocking {
val step = AttemptSnapshotStep(clusterService, client, config, metadata)
val step = AttemptSnapshotStep(clusterService, scriptService, client, config, metadata)
step.execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(metadata)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
Expand Down
Loading

0 comments on commit 3958175

Please sign in to comment.