Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding unfollow action in ism to invoke stop replication for ccr #1198

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ src/test/resources/job-scheduler/
src/test/resources/bwc/
bin/
spi/bin/
src/test/resources/notifications*
src/test/resources/notifications*
src/test/resources/replication/
25 changes: 25 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ buildscript {
kotlin_version = System.getProperty("kotlin.version", "1.8.21")

security_plugin_version = System.getProperty("security.version", opensearch_build)
ccr_version = System.getProperty("ccr.version", opensearch_build)
ccr_build_download = 'http://localhost:8000/opensearch-cross-cluster-replication-3.0.0.0-SNAPSHOT.zip'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see we have ccr zip published https://aws.oss.sonatype.org/content/repositories/snapshots/org/opensearch/plugin/opensearch-cross-cluster-replication/3.0.0.0-SNAPSHOT/

can you follow this part

// https://aws.oss.sonatype.org/content/repositories/snapshots/org/opensearch/plugin/
opensearchPlugin "org.opensearch.plugin:opensearch-job-scheduler:${job_scheduler_version}@zip"
opensearchPlugin "org.opensearch.plugin:opensearch-notifications-core:${notifications_version}@zip"
opensearchPlugin "org.opensearch.plugin:notifications:${notifications_version}@zip"
opensearchPlugin "org.opensearch.plugin:opensearch-security:${security_plugin_version}@zip"
to pull

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes thank you, I have followed this and will push this in the next commit.

ccr_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot +  '/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-cross-cluster-replication-' + ccr_no_snapshot + '.zip'
...
dependencies {
    opensearchPlugin "org.opensearch.plugin:opensearch-cross-cluster-replication:${ccr_version}@zip" 
}
...
def ccrFile = resolvePluginFile("opensearch-cross-cluster-replication")
testClusters.integTest {
plugin(provider(ccrFile))

However, for now, as my changes in ccr repo are still not merged, I cannot test the integration with the zip taken from repo, and am using the local zip itself.

ccr_resource_folder = "src/test/resources/replication"
}

repositories {
Expand Down Expand Up @@ -206,6 +209,7 @@ dependencies {
implementation "org.jetbrains:annotations:13.0"
implementation project(path: ":${rootProject.name}-spi", configuration: 'shadow')
implementation "org.opensearch:common-utils:${common_utils_version}"
// implementation(files("libs/common-utils-3.0.0.0-SNAPSHOT.jar"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

implementation "com.github.seancfoley:ipaddress:5.4.1"
implementation "commons-codec:commons-codec:${versions.commonscodec}"
implementation "org.apache.httpcomponents:httpclient:${versions.httpclient}"
Expand All @@ -215,6 +219,7 @@ dependencies {
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0"
testImplementation "org.mockito:mockito-core:${versions.mockito}"
testImplementation "org.mockito:mockito-inline:4.11.0"

add("ktlint", "com.pinterest.ktlint:ktlint-cli:1.1.0") {
attributes {
Expand Down Expand Up @@ -311,6 +316,7 @@ def jobSchedulerFile = resolvePluginFile("opensearch-job-scheduler")
def notificationsCoreFile = resolvePluginFile("opensearch-notifications-core")
def notificationsFile = resolvePluginFile("notifications")
def securityPluginFile = resolvePluginFile("opensearch-security")
def ccrFile = resolvePluginFile("opensearch-cross-cluster-replication")

ext.getPluginResource = { download_to_folder, download_from_src ->
def src_split = download_from_src.split("/")
Expand Down Expand Up @@ -391,6 +397,25 @@ testClusters.integTest {
if (securityEnabled) {
plugin(provider(securityPluginFile))
}
plugin(provider(ccrFile))
/*plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
if (new File("$project.rootDir/$ccr_resource_folder").exists()) {
project.delete(files("$project.rootDir/$ccr_resource_folder"))
}
project.mkdir ccr_resource_folder
ant.get(src: ccr_build_download,
dest: ccr_resource_folder,
httpusecaches: false)
return fileTree(ccr_resource_folder).getSingleFile()
}
}
}
}))*/
setting 'path.repo', repo.absolutePath
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.RollupActionPa
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.TransformActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.UnfollowActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry
Expand Down Expand Up @@ -52,6 +53,7 @@ class ISMActionsParser private constructor() {
ShrinkActionParser(),
SnapshotActionParser(),
TransformActionParser(),
UnfollowActionParser(),
)

val customActionExtensionMap = mutableMapOf<String, String>()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.indexmanagement.indexstatemanagement.step.unfollow.AttemptUnfollowStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext

/**
* ISM action to stop replication on indices replicated on a follower cluster.
*/
class UnfollowAction(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably want to call this "stop replication", which is the name of the existing API: https://opensearch.org/docs/latest/tuning-your-cluster/replication-plugin/api/#stop-replication

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, do you mean the name of the action in ism to be renamed? for ex. {"policy":{"actions":[{"stop_replication":{}}], .., "transitions":[]}}

Sure 👍 With that, all the files could be renamed as StopReplicationAction.kt, AttemptStopReplicationStep.kt, ValidateStopReplication.kt etc. Shall I go ahead with these names?

However, it would be tricky 🤔 to name the classes in ccr now - as the action-name indices:admin/plugins/replication/index/unfollow and TransportUnfollowIndexReplicationAction need to be renamed accordingly, to distinguish them from the existing stop REST API.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure 👍 With that, all the files could be renamed as StopReplicationAction.kt, AttemptStopReplicationStep.kt, ValidateStopReplication.kt etc. Shall I go ahead with these names?

yes

However, it would be tricky 🤔 to name the classes in ccr now - as the action-name indices:admin/plugins/replication/index/unfollow and TransportUnfollowIndexReplicationAction need to be renamed accordingly, to distinguish them from the existing stop REST API.

may be InternalTransportStopIndexReplicationAction 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may be InternalTransportStopIndexReplicationAction 😅

Yes, this also aligns with the action name we've decided "indices:internal/plugins/replication/index/stop". Thank you so much 🙂 I'll make these changes in subsequent pushes..

index: Int,
) : Action(name, index) {
companion object {
const val name = "unfollow"
}

private val attemptUnfollowStep = AttemptUnfollowStep()

private val steps = listOf(attemptUnfollowStep)

/** Returns the single unfollow step; [context] is not consulted. */
override fun getStepToExecute(context: StepContext): Step = attemptUnfollowStep

/** Returns the list of steps that make up this action. */
override fun getSteps(): List<Step> {
    return steps
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser

/**
 * Parser for the ISM `unfollow` action. The action carries no configuration, so its
 * XContent form must be an empty object.
 */
class UnfollowActionParser : ActionParser() {
    /** Reads the action index from [sin] and rebuilds the [UnfollowAction]. */
    override fun fromStreamInput(sin: StreamInput): Action = UnfollowAction(sin.readInt())

    /**
     * Builds an [UnfollowAction] from XContent. The parser must currently be on START_OBJECT
     * and the next token must be END_OBJECT; anything else is rejected by [ensureExpectedToken].
     */
    override fun fromXContent(xcp: XContentParser, index: Int): Action {
        val openingToken = xcp.currentToken()
        ensureExpectedToken(XContentParser.Token.START_OBJECT, openingToken, xcp)
        val closingToken = xcp.nextToken()
        ensureExpectedToken(XContentParser.Token.END_OBJECT, closingToken, xcp)
        return UnfollowAction(index)
    }

    /** Returns the action type name this parser is responsible for. */
    override fun getActionType(): String = UnfollowAction.name
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.step.unfollow

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.replication.ReplicationPluginInterface
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.snapshots.SnapshotInProgressException
import org.opensearch.transport.RemoteTransportException
class AttemptUnfollowStep : Step(name) {
private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override suspend fun execute(): Step {
val context = this.context ?: return this
val indexName = context.metadata.index
try {
val stopIndexReplicationRequestObj = StopIndexReplicationRequest(indexName)
/*val response: AcknowledgedResponse =
ReplicationPluginInterface.suspendUntil {
this.stopReplication(
context.client as NodeClient,
stopIndexReplicationRequestObj,
it,
)
}*/
Comment on lines +30 to +37
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure 👍

I had added this to get some suggestions on writing mock-uts for AttemptUnfollowStep.kt as I have tried with two different approaches using a function performStopAction() and directly invoking ReplicationPluginInterface.suspendUntil, but to no success.

Do you have any suggestions on either partially mocking AttemptUnfollowStep or mocking ReplicationPluginInterface.stopReplication in this case?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you try mocking the client behavior like this? I suppose it would be pretty similar, since both return an acknowledged response.

private fun getIndicesAdminClient(
rolloverResponse: RolloverResponse?,
aliasResponse: AcknowledgedResponse?,
rolloverException: Exception?,
aliasException: Exception?,
): IndicesAdminClient {
assertTrue(
"Must provide one and only one response or exception",
(rolloverResponse != null).xor(rolloverException != null),
)
assertTrue(
"Must provide one and only one response or exception",
(aliasResponse != null).xor(aliasException != null),
)
return mock {
doAnswer { invocationOnMock ->
val listener = invocationOnMock.getArgument<ActionListener<AcknowledgedResponse>>(1)
if (rolloverResponse != null) {
listener.onResponse(rolloverResponse)
} else {
listener.onFailure(rolloverException)
}
}.whenever(this.mock).rolloverIndex(any(), any())
doAnswer { invocationOnMock ->
val listener = invocationOnMock.getArgument<ActionListener<AcknowledgedResponse>>(1)
if (aliasResponse != null) {
listener.onResponse(aliasResponse)
} else {
listener.onFailure(aliasException)
}
}.whenever(this.mock).aliases(any(), any())
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey,
Yes I did try this. But there is a slight difference in these two classes, which is the trouble.

AttemptRolloverStep executes the rollover like this

val response: RolloverResponse = context.client.admin().indices().suspendUntil { rolloverIndex(request, it) }

In AttemptRolloverStepTests, context.client has been mocked in a way that when the rolloverIndex() is invoked by the test, it would return the value as directed by the mocked function.

But in the AttemptUnfollowStep.kt class, the execute() method invokes it like this

val response: AcknowledgedResponse =
                ReplicationPluginInterface.suspendUntil {
                    this.stopReplication(
                        context.client as NodeClient,
                        stopIndexReplicationRequestObj,
                        it,
                    )
                }

Here, I can mock the context.client and other params, but the test needs to mock ReplicationPluginInterface and ReplicationPluginInterface.suspendUntil to ultimately mock the response of stopReplication() function. If that does not happen, it would run the actual implementation and not the mocked one - which does not work in the UT.
I'm unable to mock ReplicationPluginInterface as it is static. Tried another approach using mock spy (to mock AttemptUnfollowStep.performStopAction() partially), and that hasn't worked either.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @bowenlan-amzn , Any thoughts here?

val response = performStopAction(context.client as NodeClient, stopIndexReplicationRequestObj)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read the CCR docs and see that once we stop replication, we won't be able to resume it anymore.
In ISM, the normal workflow is probably that we stop the replication only after the leader index has rolled over.

I'm wondering how we can know that, so that we avoid stopping the replication too early.

This is not a requirement, since we can probably just mention this caveat in the documentation and recommend a long waiting time before stopping replication in the follower cluster.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, replication cannot be resumed on an index after stopping (or pausing more than 12h).

Hmm, the major use-case we had identified in ISM+CCR case was - say ISM is setup in both leader and follower clusters, for deletion and other housekeeping operations in the respective clusters.
In the follower cluster, even if the user has setup a policy and intends to delete some-pattern* indices, it would not be allowed as they would be still read-only due to ongoing replication, which needs to be stopped first.

So in such cases, users could chain the actions so that stop-replication comes first (before any other write actions).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood.

I'm just a little worried about one part: the follower index cannot know whether the leader index has finished indexing, so it may unexpectedly stop replication early.

But this can wait for community feedback on whether it's needed.

if (response.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSuccessMessage(indexName))
} else {
val message = getFailedMessage(indexName)
logger.warn(message)
stepStatus = StepStatus.FAILED
info = mapOf("message" to message)
}
} catch (e: RemoteTransportException) {
val cause = ExceptionsHelper.unwrapCause(e)
if (cause is SnapshotInProgressException) {
handleSnapshotException(indexName, cause)
} else {
handleException(indexName, cause as Exception)
}
} catch (e: SnapshotInProgressException) {
handleSnapshotException(indexName, e)
} catch (e: Exception) {
handleException(indexName, e)
}
return this
}

/**
 * Issues the stop-replication [request] through [ReplicationPluginInterface] using [client]
 * and suspends until the supplied listener is completed.
 *
 * @return the response handed to the listener.
 */
internal suspend fun performStopAction(client: NodeClient, request: StopIndexReplicationRequest): AcknowledgedResponse =
    ReplicationPluginInterface.suspendUntil { listener ->
        this.stopReplication(client, request, listener)
    }
/**
 * Handles a snapshot-in-progress failure: logs a warning, marks the step as
 * CONDITION_NOT_MET and stores the snapshot message in [info].
 */
private fun handleSnapshotException(indexName: String, e: SnapshotInProgressException) {
    getSnapshotMessage(indexName).let { snapshotMessage ->
        logger.warn(snapshotMessage, e)
        stepStatus = StepStatus.CONDITION_NOT_MET
        info = mapOf("message" to snapshotMessage)
    }
}

/**
 * Handles any other failure: logs it as an error, marks the step as FAILED and stores the
 * failure message in [info], plus a "cause" entry when the exception has a message.
 */
private fun handleException(indexName: String, e: Exception) {
    val failureMessage = getFailedMessage(indexName)
    logger.error(failureMessage, e)
    stepStatus = StepStatus.FAILED
    info =
        listOfNotNull(
            "message" to failureMessage,
            // Only present when the exception actually carries a message.
            e.message?.let { "cause" to it },
        ).toMap()
}

/**
 * Returns a copy of [currentMetadata] carrying this step's name, start time and current
 * status, the latest [info], and a cleared `transitionTo`.
 */
override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
    val startedAtMillis = getStepStartTime(currentMetadata).toEpochMilli()
    val updatedStep = StepMetaData(name, startedAtMillis, stepStatus)
    return currentMetadata.copy(stepMetaData = updatedStep, transitionTo = null, info = info)
}

/** Reports this step as idempotent. */
override fun isIdempotent(): Boolean = true

companion object {
    /** Name passed to the [Step] constructor for this step. */
    const val name = "attempt_unfollow"

    /** Message stored and logged when the unfollow attempt fails for [index]. */
    fun getFailedMessage(index: String): String = "Failed to unfollow index [index=$index]"

    /** Message stored when the unfollow request for [index] is acknowledged. */
    fun getSuccessMessage(index: String): String = "Successfully unfollowed index [index=$index]"

    /** Message stored and logged when a snapshot is in progress on [index]. */
    fun getSnapshotMessage(index: String): String = "Index had snapshot in progress, retrying unfollowing [index=$index]"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class ActionValidation(
"transition" -> ValidateTransition(settings, clusterService, jvmService).execute(indexName)
"close" -> ValidateClose(settings, clusterService, jvmService).execute(indexName)
"index_priority" -> ValidateIndexPriority(settings, clusterService, jvmService).execute(indexName)
"unfollow" -> ValidateUnfollow(settings, clusterService, jvmService).execute(indexName)
// No validations for these actions at current stage.
// Reason: https://github.com/opensearch-project/index-management/issues/587
"notification" -> ValidateNothing(settings, clusterService, jvmService).execute(indexName)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.validation

import org.apache.logging.log4j.LogManager
import org.opensearch.cluster.metadata.MetadataCreateIndexService
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate
import org.opensearch.indexmanagement.util.OpenForTesting
import org.opensearch.indices.InvalidIndexNameException
import org.opensearch.monitor.jvm.JvmService
/**
 * Pre-execution validation for the ISM `unfollow` action.
 *
 * Validation fails when the index is missing from the cluster state metadata, or when its name
 * is rejected by [MetadataCreateIndexService.validateIndexOrAliasName]. On failure
 * `validationStatus` is set to FAILED and `validationMessage` explains which check failed;
 * on success only `validationMessage` is updated.
 */
@OpenForTesting
class ValidateUnfollow(
    settings: Settings,
    clusterService: ClusterService,
    jvmService: JvmService,
) : Validate(settings, clusterService, jvmService) {
    private val logger = LogManager.getLogger(javaClass)

    /**
     * Runs the validation for [indexName].
     *
     * @return this instance, with `validationStatus` / `validationMessage` updated.
     */
    @Suppress("ReturnSuppressCount", "ReturnCount")
    override fun execute(indexName: String): Validate {
        // If either check fails, fail validation so the unfollow action is not executed.
        // The failing check has already populated validationMessage.
        if (!indexExists(indexName) || !validIndex(indexName)) {
            validationStatus = ValidationStatus.FAILED
            return this
        }
        validationMessage = getValidationPassedMessage(indexName)
        return this
    }

    /** Returns true when [indexName] is present in the cluster state metadata; otherwise records why not. */
    private fun indexExists(indexName: String): Boolean {
        val isIndexExists = clusterService.state().metadata.indices.containsKey(indexName)
        if (!isIndexExists) {
            val message = getNoIndexMessage(indexName)
            logger.warn(message)
            validationMessage = message
            return false
        }
        return true
    }

    /** Returns true when [indexName] passes index/alias name validation; otherwise records why not. */
    private fun validIndex(indexName: String): Boolean {
        val exceptionGenerator: (String, String) -> RuntimeException = { invalidName, reason -> InvalidIndexNameException(invalidName, reason) }
        // If the index name is invalid for any reason, this throws an exception whose message gives the reason.
        // The validation message shown to the user stays generic, so the exception is attached to the
        // log entry to keep the actual reason available for troubleshooting.
        try {
            MetadataCreateIndexService.validateIndexOrAliasName(indexName, exceptionGenerator)
        } catch (e: Exception) {
            val message = getIndexNotValidMessage(indexName)
            logger.warn(message, e)
            validationMessage = message
            return false
        }
        return true
    }

    @Suppress("TooManyFunctions")
    companion object {
        const val name = "validate_unfollow"

        fun getNoIndexMessage(index: String) = "No such index [index=$index] for unfollow action."

        fun getIndexNotValidMessage(index: String) = "Index [index=$index] is not valid. Abort unfollow action on it."

        fun getValidationPassedMessage(index: String) = "Unfollow action validation passed for [index=$index]"
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.InjectSecurity
import org.opensearch.commons.authuser.User
import org.opensearch.commons.notifications.NotificationsPluginInterface
import org.opensearch.commons.replication.ReplicationPluginInterface
import org.opensearch.core.action.ActionListener
import org.opensearch.core.action.support.DefaultShardOperationFailedException
import org.opensearch.core.common.bytes.BytesReference
Expand Down Expand Up @@ -259,6 +260,17 @@ suspend fun <T> NotificationsPluginInterface.suspendUntil(block: NotificationsPl
)
}

/**
 * Adapts a listener-based [ReplicationPluginInterface] call to a suspending one.
 *
 * [block] is invoked with an [ActionListener]; the coroutine resumes with the value passed to
 * `onResponse`, or with the exception passed to `onFailure`.
 */
suspend fun <T> ReplicationPluginInterface.suspendUntil(block: ReplicationPluginInterface.(ActionListener<T>) -> Unit): T =
    suspendCoroutine { continuation ->
        val listener =
            object : ActionListener<T> {
                override fun onResponse(response: T) {
                    continuation.resume(response)
                }

                override fun onFailure(e: Exception) {
                    continuation.resumeWithException(e)
                }
            }
        block(listener)
    }

fun Throwable.findRemoteTransportException(): RemoteTransportException? {
if (this is RemoteTransportException) return this
return this.cause?.findRemoteTransportException()
Expand Down
3 changes: 3 additions & 0 deletions src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@
}
}
},
"unfollow": {
"type": "object"
},
"delete": {
"type": "object"
},
Expand Down
Loading
Loading