From 431c4b83f5f992ed2d9329a2cc6f711f1730b7b8 Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Wed, 31 Jul 2024 10:29:48 +0530 Subject: [PATCH] skipping execution based on cluster service Signed-off-by: Sarthak Aggarwal --- .../indexmanagement/IndexManagementPlugin.kt | 2 +- .../indexstatemanagement/SkipExecution.kt | 78 +++++------------ .../coordinator/SkipExecutionTests.kt | 87 ++++++++++++++++--- 3 files changed, 98 insertions(+), 69 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 7660d48e1..4c8f855d6 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -411,7 +411,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin fieldCapsFilter = FieldCapsFilter(clusterService, settings, indexNameExpressionResolver) this.indexNameExpressionResolver = indexNameExpressionResolver - val skipFlag = SkipExecution(client) + val skipFlag = SkipExecution(clusterService) RollupFieldValueExpressionResolver.registerServices(scriptService, clusterService) val rollupRunner = RollupRunner diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/SkipExecution.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/SkipExecution.kt index c74f27c89..962696b2a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/SkipExecution.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/SkipExecution.kt @@ -6,19 +6,15 @@ package org.opensearch.indexmanagement.indexstatemanagement import org.apache.logging.log4j.LogManager -import org.opensearch.action.admin.cluster.node.info.NodesInfoAction -import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest -import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse -import org.opensearch.action.admin.cluster.node.info.PluginsAndModules -import org.opensearch.client.Client -import org.opensearch.core.action.ActionListener +import org.opensearch.Version +import org.opensearch.cluster.service.ClusterService import org.opensearch.indexmanagement.util.OpenForTesting // TODO this can be moved to job scheduler, so that all extended plugin // can avoid running jobs in an upgrading cluster @OpenForTesting class SkipExecution( - private val client: Client, + private val clusterService: ClusterService, ) { private val logger = LogManager.getLogger(javaClass) @@ -32,52 +28,26 @@ class SkipExecution( private set fun sweepISMPluginVersion() { - // if old version ISM plugin exists (2 versions ISM in one cluster), set skip flag to true - val request = NodesInfoRequest().clear().addMetric("plugins") - client.execute( - NodesInfoAction.INSTANCE, request, - object : ActionListener { - override fun onResponse(response: NodesInfoResponse) { - val versionSet = mutableSetOf() - val legacyVersionSet = mutableSetOf() - - response.nodes.map { it.getInfo(PluginsAndModules::class.java).pluginInfos } - .forEach { - it.forEach { nodePlugin -> - if (nodePlugin.name == "opensearch-index-management" || - nodePlugin.name == "opensearch_index_management" - ) { - versionSet.add(nodePlugin.version) - } - - if (nodePlugin.name == "opendistro-index-management" || - nodePlugin.name == "opendistro_index_management" - ) { - legacyVersionSet.add(nodePlugin.version) - } - } - } - - if ((versionSet.size + legacyVersionSet.size) > 1) { - flag = true - logger.info("There are multiple versions of Index Management plugins in the cluster: [$versionSet, $legacyVersionSet]") - } else { - flag = false - } - - if (versionSet.isNotEmpty() && legacyVersionSet.isNotEmpty()) { - hasLegacyPlugin = true - logger.info("Found legacy plugin versions [$legacyVersionSet] and opensearch plugins versions [$versionSet] in the cluster") - } else { - hasLegacyPlugin = false - } - } - - override fun onFailure(e: Exception) { - logger.error("Failed sweeping nodes for ISM plugin versions: $e") - flag = false - } - }, - ) + try { + // if old version ISM plugin exists (2 versions ISM in one cluster), set skip flag to true + val currentMinVersion = clusterService.state().nodes.minNodeVersion + val currentMaxVersion = clusterService.state().nodes.maxNodeVersion + + if (currentMinVersion != null && !currentMinVersion.equals(currentMaxVersion)) { + flag = true + logger.info("There are multiple versions of Index Management plugins in the cluster: [$currentMaxVersion, $currentMinVersion]") + } else { + flag = false + } + + if (currentMinVersion.major > Version.CURRENT.major && currentMinVersion != currentMaxVersion) { + hasLegacyPlugin = true + logger.info("Found legacy plugin versions [$currentMinVersion] and opensearch plugins versions [$currentMaxVersion] in the cluster") + } else { + hasLegacyPlugin = false + } + } catch (e: Exception) { + logger.error("Unable to fetch node versions from cluster service", e) + } } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/SkipExecutionTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/SkipExecutionTests.kt index 2421ff5f1..ecfeefa80 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/SkipExecutionTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/SkipExecutionTests.kt @@ -5,29 +5,88 @@ package org.opensearch.indexmanagement.indexstatemanagement.coordinator +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever import org.junit.Before -import org.mockito.Mockito -import org.opensearch.action.admin.cluster.node.info.NodesInfoAction -import org.opensearch.client.Client -import org.opensearch.cluster.ClusterChangedEvent -import org.opensearch.cluster.OpenSearchAllocationTestCase +import org.opensearch.Version +import org.opensearch.cluster.ClusterState +import org.opensearch.cluster.node.DiscoveryNode +import org.opensearch.cluster.node.DiscoveryNodes +import org.opensearch.cluster.service.ClusterService +import org.opensearch.core.common.transport.TransportAddress import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution +import org.opensearch.test.OpenSearchTestCase -class SkipExecutionTests : OpenSearchAllocationTestCase() { - private lateinit var client: Client +class SkipExecutionTests : OpenSearchTestCase() { + private var clusterService: ClusterService = mock() + private lateinit var clusterState: ClusterState private lateinit var skip: SkipExecution @Before - @Throws(Exception::class) fun setup() { - client = Mockito.mock(Client::class.java) - skip = SkipExecution(client) + skip = SkipExecution(clusterService) } - fun `test cluster change event`() { - val event = Mockito.mock(ClusterChangedEvent::class.java) - Mockito.`when`(event.nodesChanged()).thenReturn(true) + fun `test sweepISMPluginVersion should set flag to false and hasLegacyPlugin to false when all nodes have the same version`() { + val version = Version.CURRENT + val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), version) + val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), version) + val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).build() + clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build() + whenever(clusterService.state()).thenReturn(clusterState) + + skip.sweepISMPluginVersion() + + assertFalse(skip.flag) + assertFalse(skip.hasLegacyPlugin) + } + + fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to false when all nodes have the different versions`() { + val version1 = Version.CURRENT + val version2 = Version.V_2_0_0 + val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), version1) + val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), version2) + val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), version2) + val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build() + clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build() + whenever(clusterService.state()).thenReturn(clusterState) + + skip.sweepISMPluginVersion() + + assertTrue(skip.flag) + assertFalse(skip.hasLegacyPlugin) + } + + fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to true when there are different versions including current version`() { + val minVersion = Version.fromString("7.10.0") + val maxVersion = Version.CURRENT + val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), minVersion) + val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), maxVersion) + val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), maxVersion) + val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build() + clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build() + whenever(clusterService.state()).thenReturn(clusterState) + skip.sweepISMPluginVersion() - Mockito.verify(client).execute(Mockito.eq(NodesInfoAction.INSTANCE), Mockito.any(), Mockito.any()) + + assertTrue(skip.flag) + assertTrue(skip.hasLegacyPlugin) + } + + fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to true with different versions`() { + val minVersion = Version.fromString("7.10.0") + val maxVersion = Version.V_2_0_0 + val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), minVersion) + val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), maxVersion) + val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), maxVersion) + + val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build() + clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build() + whenever(clusterService.state()).thenReturn(clusterState) + + skip.sweepISMPluginVersion() + + assertTrue(skip.flag) + assertTrue(skip.hasLegacyPlugin) } }