-
Notifications
You must be signed in to change notification settings - Fork 113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding unfollow action in ism to invoke stop replication for ccr #1198
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: aggarwalShivani <shivani.aggarwal@nokia.com>
Hi,
Looking forward for the review, Thanks! |
Hi Reviewers, Environment setup details 1. Setup Opensearch
(Above steps can be avoided if you already have opensearch setup)
For ex. bin/opensearch-plugin install file:///home/username/ccr/opensearch-cross-cluster-replication-3.0.0.0-SNAPSHOT.zip 2. Setup Configs
(This is using the default opensearch.yml file and just adding minimal required configs for CCR)
For steps 3-5, I've added the steps in a shell script for convenience - test-unfollow-script.txt. (Changed the extension to .txt as .sh files cannot be uploaded here) I hope that helps : ) |
Signed-off-by: aggarwalShivani <shivani.aggarwal@nokia.com>
it, | ||
) | ||
}*/ | ||
val response = performStopAction(context.client as NodeClient, stopIndexReplicationRequestObj) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I read the CCR doc and see if we stop relication, we won't be able to resume anymore.
In ISM, probably the normal workflow is after the leader index rollover, we can then stop the replication
I'm wondering how do we know that so to prevent early stopping the replication.
not requiring this since probably we can just mention this caveat in the documentation, and give a long waiting time before stop relication in the follower cluster
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, replication cannot be resumed on an index after stopping (or pausing more than 12h).
Hmm, the major use-case we had identified in ISM+CCR case was - say ISM is setup in both leader and follower clusters, for deletion and other housekeeping operations in the respective clusters.
In the follower cluster, even if the user has setup a policy and intends to delete some-pattern* indices, it would not be allowed as they would be still read-only due to ongoing replication, which needs to be stopped first.
So in such cases, users could chain the actions to be preceeded by stop-replication first (before any other write actions).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Understood.
I'm just a little worried about one part: the follower index cannot know whether leader index finishes indexing, so it may stop relication early unexpectedly
but this can be waited for community feedback whether it's needed
@@ -206,6 +209,7 @@ dependencies { | |||
implementation "org.jetbrains:annotations:13.0" | |||
implementation project(path: ":${rootProject.name}-spi", configuration: 'shadow') | |||
implementation "org.opensearch:common-utils:${common_utils_version}" | |||
// implementation(files("libs/common-utils-3.0.0.0-SNAPSHOT.jar")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
/** | ||
* ISM action to stop replication on indices replicated on a follower cluster. | ||
*/ | ||
class UnfollowAction( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably want to use stop relication, which is the name of the API we have https://opensearch.org/docs/latest/tuning-your-cluster/replication-plugin/api/#stop-replication
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, do you mean the name of the action in ism to be renamed? for ex. {"policy":{"actions":[{"stop_replication":{}}], .., "transitions":[]}}
Sure 👍 With that, all the files could be renamed as StopReplicationAction.kt, AttemptStopReplicationStep.kt, ValidateStopReplication.kt etc. Shall I go ahead with these names?
However, it would be tricky 🤔 to name the classes in ccr now - as the action-name indices:admin/plugins/replication/index/unfollow and TransportUnfollowIndexReplicationAction need to be renamed accordingly, to distinguish them from the existing stop REST API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure 👍 With that, all the files could be renamed as StopReplicationAction.kt, AttemptStopReplicationStep.kt, ValidateStopReplication.kt etc. Shall I go ahead with these names?
yes
However, it would be tricky 🤔 to name the classes in ccr now - as the action-name indices:admin/plugins/replication/index/unfollow and TransportUnfollowIndexReplicationAction need to be renamed accordingly, to distinguish them from the existing stop REST API.
may be InternalTransportStopIndexReplicationAction
😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
may be
InternalTransportStopIndexReplicationAction
😅
Yes, this also aligns with the action name we've decided "indices:internal/plugins/replication/index/stop". Thank you so much 🙂 I'll make these changes in subsequent pushes..
/*val response: AcknowledgedResponse = | ||
ReplicationPluginInterface.suspendUntil { | ||
this.stopReplication( | ||
context.client as NodeClient, | ||
stopIndexReplicationRequestObj, | ||
it, | ||
) | ||
}*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure 👍
I had added this to get some suggestions on writing mock-uts for AttemptUnfollowStep.kt as I have tried with two different approaches using a function performStopAction() and directly invoking ReplicationPluginInterface.suspendUntil, but to no success.
Do you have any suggestions on either partially mocking AttempUnfollowStep or mocking ReplicationPluginInterface.stopReplication in this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering did you try mock the client behavior like this, I suppose it would be pretty similar since both return acknowledge response
Lines 172 to 205 in 4d8ef69
private fun getIndicesAdminClient( | |
rolloverResponse: RolloverResponse?, | |
aliasResponse: AcknowledgedResponse?, | |
rolloverException: Exception?, | |
aliasException: Exception?, | |
): IndicesAdminClient { | |
assertTrue( | |
"Must provide one and only one response or exception", | |
(rolloverResponse != null).xor(rolloverException != null), | |
) | |
assertTrue( | |
"Must provide one and only one response or exception", | |
(aliasResponse != null).xor(aliasException != null), | |
) | |
return mock { | |
doAnswer { invocationOnMock -> | |
val listener = invocationOnMock.getArgument<ActionListener<AcknowledgedResponse>>(1) | |
if (rolloverResponse != null) { | |
listener.onResponse(rolloverResponse) | |
} else { | |
listener.onFailure(rolloverException) | |
} | |
}.whenever(this.mock).rolloverIndex(any(), any()) | |
doAnswer { invocationOnMock -> | |
val listener = invocationOnMock.getArgument<ActionListener<AcknowledgedResponse>>(1) | |
if (aliasResponse != null) { | |
listener.onResponse(aliasResponse) | |
} else { | |
listener.onFailure(aliasException) | |
} | |
}.whenever(this.mock).aliases(any(), any()) | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey,
Yes I did try this. But there is a slight difference in these two classes, which is the trouble.
AttemptRolloverStep executes the rollover like this
val response: RolloverResponse = context.client.admin().indices().suspendUntil { rolloverIndex(request, it) }
In AttemptRolloverStepTests, context.client has been mocked in a way that when the rolloverIndex() is invoked by the test, it would return the value as directed by the mocked function.
But in the AttemptUnfollowStep.kt class, the execute() method invokes it like this
val response: AcknowledgedResponse =
ReplicationPluginInterface.suspendUntil {
this.stopReplication(
context.client as NodeClient,
stopIndexReplicationRequestObj,
it,
)
}
Here, I can mock the context.client and other params, but the test needs to mock ReplicationPluginInterface and ReplicationPluginInterface.suspendUntil to ultimately mock the response of stopReplication() function. If that does not happen, it would run the actual implementation and not the mocked one - which does not work in the UT.
I'm unable to mock ReplicationPluginInterface as it is static. Tried another approach using mock spy (to mock AttemptUnfollowStep.performStopAction() partially), and that hasn't worked either.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @bowenlan-amzn , Any thoughts here?
@@ -66,6 +66,9 @@ buildscript { | |||
kotlin_version = System.getProperty("kotlin.version", "1.8.21") | |||
|
|||
security_plugin_version = System.getProperty("security.version", opensearch_build) | |||
ccr_version = System.getProperty("ccr.version", opensearch_build) | |||
ccr_build_download = 'http://localhost:8000/opensearch-cross-cluster-replication-3.0.0.0-SNAPSHOT.zip' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see we have ccr zip published https://aws.oss.sonatype.org/content/repositories/snapshots/org/opensearch/plugin/opensearch-cross-cluster-replication/3.0.0.0-SNAPSHOT/
can you follow this part
Lines 228 to 232 in dbd2bc2
// https://aws.oss.sonatype.org/content/repositories/snapshots/org/opensearch/plugin/ | |
opensearchPlugin "org.opensearch.plugin:opensearch-job-scheduler:${job_scheduler_version}@zip" | |
opensearchPlugin "org.opensearch.plugin:opensearch-notifications-core:${notifications_version}@zip" | |
opensearchPlugin "org.opensearch.plugin:notifications:${notifications_version}@zip" | |
opensearchPlugin "org.opensearch.plugin:opensearch-security:${security_plugin_version}@zip" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes thank you, I have followed this and will push this in the next commit.
ccr_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot + '/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-cross-cluster-replication-' + ccr_no_snapshot + '.zip'
...
dependencies {
opensearchPlugin "org.opensearch.plugin:opensearch-cross-cluster-replication:${ccr_version}@zip"
}
...
def ccrFile = resolvePluginFile("opensearch-cross-cluster-replication")
testClusters.integTest {
plugin(provider(ccrFile))
However, for now, as my changes in ccr repo are still not merged, I cannot test the integration with the zip taken from repo, and am using the local zip itself.
Issue #, if available: #726
This is to add support for unfollow feature in ism.
It depends on two PRs already raised and under-review in common-utils project as well as CCR project. Detailed information about the proposed solution is explained there.
Description of changes:
CheckList:
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.