Skip to content

Commit

Permalink
skipping execution based on cluster service
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
  • Loading branch information
sarthakaggarwal97 committed Jul 31, 2024
1 parent f02bb01 commit 431c4b8
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
fieldCapsFilter = FieldCapsFilter(clusterService, settings, indexNameExpressionResolver)
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client)
val skipFlag = SkipExecution(clusterService)
RollupFieldValueExpressionResolver.registerServices(scriptService, clusterService)
val rollupRunner =
RollupRunner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,15 @@
package org.opensearch.indexmanagement.indexstatemanagement

import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.cluster.node.info.NodesInfoAction
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules
import org.opensearch.client.Client
import org.opensearch.core.action.ActionListener
import org.opensearch.Version
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.util.OpenForTesting

// TODO this can be moved to job scheduler, so that all extended plugin
// can avoid running jobs in an upgrading cluster
@OpenForTesting
class SkipExecution(
private val client: Client,
private val clusterService: ClusterService,
) {
private val logger = LogManager.getLogger(javaClass)

Expand All @@ -32,52 +28,26 @@ class SkipExecution(
private set

fun sweepISMPluginVersion() {
// if old version ISM plugin exists (2 versions ISM in one cluster), set skip flag to true
val request = NodesInfoRequest().clear().addMetric("plugins")
client.execute(
NodesInfoAction.INSTANCE, request,
object : ActionListener<NodesInfoResponse> {
override fun onResponse(response: NodesInfoResponse) {
val versionSet = mutableSetOf<String>()
val legacyVersionSet = mutableSetOf<String>()

response.nodes.map { it.getInfo(PluginsAndModules::class.java).pluginInfos }
.forEach {
it.forEach { nodePlugin ->
if (nodePlugin.name == "opensearch-index-management" ||
nodePlugin.name == "opensearch_index_management"
) {
versionSet.add(nodePlugin.version)
}

if (nodePlugin.name == "opendistro-index-management" ||
nodePlugin.name == "opendistro_index_management"
) {
legacyVersionSet.add(nodePlugin.version)
}
}
}

if ((versionSet.size + legacyVersionSet.size) > 1) {
flag = true
logger.info("There are multiple versions of Index Management plugins in the cluster: [$versionSet, $legacyVersionSet]")
} else {
flag = false
}

if (versionSet.isNotEmpty() && legacyVersionSet.isNotEmpty()) {
hasLegacyPlugin = true
logger.info("Found legacy plugin versions [$legacyVersionSet] and opensearch plugins versions [$versionSet] in the cluster")
} else {
hasLegacyPlugin = false
}
}

override fun onFailure(e: Exception) {
logger.error("Failed sweeping nodes for ISM plugin versions: $e")
flag = false
}
},
)
try {
// if old version ISM plugin exists (2 versions ISM in one cluster), set skip flag to true
val currentMinVersion = clusterService.state().nodes.minNodeVersion
val currentMaxVersion = clusterService.state().nodes.maxNodeVersion

if (currentMinVersion != null && !currentMinVersion.equals(currentMaxVersion)) {
flag = true
logger.info("There are multiple versions of Index Management plugins in the cluster: [$currentMaxVersion, $currentMinVersion]")
} else {
flag = false
}

if (currentMinVersion.major > Version.CURRENT.major && currentMinVersion != currentMaxVersion) {
hasLegacyPlugin = true
logger.info("Found legacy plugin versions [$currentMinVersion] and opensearch plugins versions [$currentMaxVersion] in the cluster")
} else {
hasLegacyPlugin = false
}
} catch (e: Exception) {
logger.error("Unable to fetch node versions from cluster service", e)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,88 @@

package org.opensearch.indexmanagement.indexstatemanagement.coordinator

import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import org.junit.Before
import org.mockito.Mockito
import org.opensearch.action.admin.cluster.node.info.NodesInfoAction
import org.opensearch.client.Client
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.OpenSearchAllocationTestCase
import org.opensearch.Version
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.node.DiscoveryNode
import org.opensearch.cluster.node.DiscoveryNodes
import org.opensearch.cluster.service.ClusterService
import org.opensearch.core.common.transport.TransportAddress
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution
import org.opensearch.test.OpenSearchTestCase

class SkipExecutionTests : OpenSearchAllocationTestCase() {
private lateinit var client: Client
class SkipExecutionTests : OpenSearchTestCase() {
private var clusterService: ClusterService = mock()
private lateinit var clusterState: ClusterState
private lateinit var skip: SkipExecution

@Before
@Throws(Exception::class)
fun setup() {
client = Mockito.mock(Client::class.java)
skip = SkipExecution(client)
skip = SkipExecution(clusterService)
}

fun `test cluster change event`() {
val event = Mockito.mock(ClusterChangedEvent::class.java)
Mockito.`when`(event.nodesChanged()).thenReturn(true)
fun `test sweepISMPluginVersion should set flag to false and hasLegacyPlugin to false when all nodes have the same version`() {
val version = Version.CURRENT
val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), version)
val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), version)
val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).build()
clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build()
whenever(clusterService.state()).thenReturn(clusterState)

skip.sweepISMPluginVersion()

assertFalse(skip.flag)
assertFalse(skip.hasLegacyPlugin)
}

fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to false when all nodes have the different versions`() {
val version1 = Version.CURRENT
val version2 = Version.V_2_0_0
val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), version1)
val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), version2)
val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), version2)
val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build()
clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build()
whenever(clusterService.state()).thenReturn(clusterState)

skip.sweepISMPluginVersion()

assertTrue(skip.flag)
assertFalse(skip.hasLegacyPlugin)
}

fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to true when there are different versions including current version`() {
val minVersion = Version.fromString("7.10.0")
val maxVersion = Version.CURRENT
val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), minVersion)
val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), maxVersion)
val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), maxVersion)
val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build()
clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build()
whenever(clusterService.state()).thenReturn(clusterState)

skip.sweepISMPluginVersion()
Mockito.verify(client).execute(Mockito.eq(NodesInfoAction.INSTANCE), Mockito.any(), Mockito.any())

assertTrue(skip.flag)
assertTrue(skip.hasLegacyPlugin)
}

fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to true with different versions`() {
val minVersion = Version.fromString("7.10.0")
val maxVersion = Version.V_2_0_0
val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), minVersion)
val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), maxVersion)
val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), maxVersion)

val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build()
clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build()
whenever(clusterService.state()).thenReturn(clusterState)

skip.sweepISMPluginVersion()

assertTrue(skip.flag)
assertTrue(skip.hasLegacyPlugin)
}
}

0 comments on commit 431c4b8

Please sign in to comment.