Skip to content

Commit

Permalink
Add data tiers (hot, warm, cold, frozen) as custom node roles (#60994)
Browse files Browse the repository at this point in the history
This commit adds the `data_hot`, `data_warm`, `data_cold`, and `data_frozen` node roles to the
x-pack plugin. These roles are intended to be the base for the formalization of data tiers in
Elasticsearch.

These roles all act as data nodes (meaning shards can be allocated to them). Nodes with the existing
`data` role acts as though they have all of the roles configured (it is a hot, warm, cold, and
frozen node).

This also includes a custom `AllocationDecider` that allows the user to configure the following
settings on a cluster level:
- `cluster.routing.allocation.require._tier`
- `cluster.routing.allocation.include._tier`
- `cluster.routing.allocation.exclude._tier`

And in index settings:
- `index.routing.allocation.require._tier`
- `index.routing.allocation.include._tier`
- `index.routing.allocation.exclude._tier`

Relates to #60848
  • Loading branch information
dakrone authored Aug 12, 2020
1 parent 4eb09cb commit 58a928b
Show file tree
Hide file tree
Showing 10 changed files with 893 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.set.Sets;
Expand Down Expand Up @@ -72,8 +73,13 @@ public static boolean isMasterNode(final Settings settings) {
return hasRole(settings, DiscoveryNodeRole.MASTER_ROLE);
}

/**
* Due to the way that plugins may not be available when settings are being initialized,
* not all roles may be available from a static/initializing context such as a {@link Setting}
* default value function. In that case, be warned that this may not include all plugin roles.
*/
public static boolean isDataNode(final Settings settings) {
return hasRole(settings, DiscoveryNodeRole.DATA_ROLE);
return getRolesFromSettings(settings).stream().anyMatch(DiscoveryNodeRole::canContainData);
}

public static boolean isIngestNode(final Settings settings) {
Expand Down Expand Up @@ -328,7 +334,7 @@ public Map<String, String> getAttributes() {
* Should this node hold data (shards) or not.
*/
public boolean isDataNode() {
return roles.contains(DiscoveryNodeRole.DATA_ROLE);
return roles.stream().anyMatch(DiscoveryNodeRole::canContainData);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ private DiscoveryNodeRole(final boolean isKnownRole, final String roleName, fina

public abstract Setting<Boolean> legacySetting();

/**
* Indicates whether a node with the given role can contain data. Defaults to false and can be overridden
*/
public boolean canContainData() {
return false;
}

@Override
public final boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -114,6 +121,10 @@ public Setting<Boolean> legacySetting() {
return Setting.boolSetting("node.data", true, Property.Deprecated, Property.NodeScope);
}

@Override
public boolean canContainData() {
return true;
}
};

/**
Expand Down
8 changes: 6 additions & 2 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -332,8 +333,11 @@ protected Node(final Environment initialEnvironment,
this.environment = new Environment(settings, initialEnvironment.configFile());
Environment.assertEquivalent(initialEnvironment, this.environment);
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
logger.info("node name [{}], node ID [{}], cluster name [{}]",
NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId(), ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value());
logger.info("node name [{}], node ID [{}], cluster name [{}], roles {}",
NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId(), ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value(),
DiscoveryNode.getRolesFromSettings(settings).stream()
.map(DiscoveryNodeRole::roleName)
.collect(Collectors.toCollection(LinkedHashSet::new)));
resourcesToClose.add(nodeEnvironment);
localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,11 +724,11 @@ private static String getRoleSuffix(Settings settings) {
if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.MASTER_ROLE)) {
suffix = suffix + DiscoveryNodeRole.MASTER_ROLE.roleNameAbbreviation();
}
if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE)) {
if (DiscoveryNode.isDataNode(settings)) {
suffix = suffix + DiscoveryNodeRole.DATA_ROLE.roleNameAbbreviation();
}
if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.MASTER_ROLE) == false
&& DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) == false) {
&& DiscoveryNode.isDataNode(settings) == false) {
suffix = suffix + "c";
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.cluster.routing.allocation;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.core.DataTier;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/**
* The {@code DataTierAllocationDecider} is a custom allocation decider that behaves similar to the
* {@link org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider}, however it
* is specific to the {@code _tier} setting for both the cluster and index level.
*/
public class DataTierAllocationDecider extends AllocationDecider {

public static final String NAME = "data_tier";

public static final String CLUSTER_ROUTING_REQUIRE = "cluster.routing.allocation.require._tier";
public static final String CLUSTER_ROUTING_INCLUDE = "cluster.routing.allocation.include._tier";
public static final String CLUSTER_ROUTING_EXCLUDE = "cluster.routing.allocation.exclude._tier";
public static final String INDEX_ROUTING_REQUIRE = "index.routing.allocation.require._tier";
public static final String INDEX_ROUTING_INCLUDE = "index.routing.allocation.include._tier";
public static final String INDEX_ROUTING_EXCLUDE = "index.routing.allocation.exclude._tier";

public static final Setting<String> CLUSTER_ROUTING_REQUIRE_SETTING = Setting.simpleString(CLUSTER_ROUTING_REQUIRE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<String> CLUSTER_ROUTING_INCLUDE_SETTING = Setting.simpleString(CLUSTER_ROUTING_INCLUDE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<String> CLUSTER_ROUTING_EXCLUDE_SETTING = Setting.simpleString(CLUSTER_ROUTING_EXCLUDE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<String> INDEX_ROUTING_REQUIRE_SETTING = Setting.simpleString(INDEX_ROUTING_REQUIRE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope);
public static final Setting<String> INDEX_ROUTING_INCLUDE_SETTING = Setting.simpleString(INDEX_ROUTING_INCLUDE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope);
public static final Setting<String> INDEX_ROUTING_EXCLUDE_SETTING = Setting.simpleString(INDEX_ROUTING_EXCLUDE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope);

private static void validateTierSetting(String setting) {
if (Strings.hasText(setting)) {
Set<String> invalidTiers = Arrays.stream(setting.split(","))
.filter(tier -> DataTier.validTierName(tier) == false)
.collect(Collectors.toSet());
if (invalidTiers.size() > 0) {
throw new IllegalArgumentException("invalid tier names: " + invalidTiers);
}
}
}

private volatile String clusterRequire = null;
private volatile String clusterInclude = null;
private volatile String clusterExclude = null;

public DataTierAllocationDecider(ClusterSettings clusterSettings) {
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_REQUIRE_SETTING, s -> this.clusterRequire = s);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_INCLUDE_SETTING, s -> this.clusterInclude = s);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_SETTING, s -> this.clusterExclude = s);
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(shardRouting, node.node(), allocation);
}

@Override
public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(indexMetadata, node.node(), allocation);
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(shardRouting, node.node(), allocation);
}

@Override
public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
Decision decision = shouldClusterFilter(node, allocation);
if (decision != null) {
return decision;
}

decision = shouldIndexFilter(indexMetadata, node, allocation);
if (decision != null) {
return decision;
}

return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters");
}

private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) {
Decision decision = shouldClusterFilter(node, allocation);
if (decision != null) {
return decision;
}

decision = shouldIndexFilter(allocation.metadata().getIndexSafe(shardRouting.index()), node, allocation);
if (decision != null) {
return decision;
}

return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters");
}

private Decision shouldFilter(IndexMetadata indexMd, DiscoveryNode node, RoutingAllocation allocation) {
Decision decision = shouldClusterFilter(node, allocation);
if (decision != null) {
return decision;
}

decision = shouldIndexFilter(indexMd, node, allocation);
if (decision != null) {
return decision;
}

return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters");
}

private Decision shouldIndexFilter(IndexMetadata indexMd, DiscoveryNode node, RoutingAllocation allocation) {
Settings indexSettings = indexMd.getSettings();
String indexRequire = INDEX_ROUTING_REQUIRE_SETTING.get(indexSettings);
String indexInclude = INDEX_ROUTING_INCLUDE_SETTING.get(indexSettings);
String indexExclude = INDEX_ROUTING_EXCLUDE_SETTING.get(indexSettings);

if (Strings.hasText(indexRequire)) {
if (allocationAllowed(OpType.AND, indexRequire, node) == false) {
return allocation.decision(Decision.NO, NAME, "node does not match all index setting [%s] tier filters [%s]",
INDEX_ROUTING_REQUIRE, indexRequire);
}
}
if (Strings.hasText(indexInclude)) {
if (allocationAllowed(OpType.OR, indexInclude, node) == false) {
return allocation.decision(Decision.NO, NAME, "node does not match any index setting [%s] tier filters [%s]",
INDEX_ROUTING_INCLUDE, indexInclude);
}
}
if (Strings.hasText(indexExclude)) {
if (allocationAllowed(OpType.OR, indexExclude, node)) {
return allocation.decision(Decision.NO, NAME, "node matches any index setting [%s] tier filters [%s]",
INDEX_ROUTING_EXCLUDE, indexExclude);
}
}
return null;
}

private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation allocation) {
if (Strings.hasText(clusterRequire)) {
if (allocationAllowed(OpType.AND, clusterRequire, node) == false) {
return allocation.decision(Decision.NO, NAME, "node does not match all cluster setting [%s] tier filters [%s]",
CLUSTER_ROUTING_REQUIRE, clusterRequire);
}
}
if (Strings.hasText(clusterInclude)) {
if (allocationAllowed(OpType.OR, clusterInclude, node) == false) {
return allocation.decision(Decision.NO, NAME, "node does not match any cluster setting [%s] tier filters [%s]",
CLUSTER_ROUTING_INCLUDE, clusterInclude);
}
}
if (Strings.hasText(clusterExclude)) {
if (allocationAllowed(OpType.OR, clusterExclude, node)) {
return allocation.decision(Decision.NO, NAME, "node matches any cluster setting [%s] tier filters [%s]",
CLUSTER_ROUTING_EXCLUDE, clusterExclude);
}
}
return null;
}

private enum OpType {
AND,
OR
}

private static boolean allocationAllowed(OpType opType, String tierSetting, DiscoveryNode node) {
String[] values = Strings.tokenizeToStringArray(tierSetting, ",");
for (String value : values) {
// generic "data" roles are considered to have all tiers
if (node.getRoles().contains(DiscoveryNodeRole.DATA_ROLE) ||
node.getRoles().stream().map(DiscoveryNodeRole::roleName).collect(Collectors.toSet()).contains(value)) {
if (opType == OpType.OR) {
return true;
}
} else {
if (opType == OpType.AND) {
return false;
}
}
}
if (opType == OpType.OR) {
return false;
} else {
return true;
}
}
}
Loading

0 comments on commit 58a928b

Please sign in to comment.