Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add data tiers (hot, warm, cold, frozen) as custom node roles #60994

Merged
merged 4 commits into from
Aug 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.set.Sets;
Expand Down Expand Up @@ -72,8 +73,13 @@ public static boolean isMasterNode(final Settings settings) {
return hasRole(settings, DiscoveryNodeRole.MASTER_ROLE);
}

/**
* Due to the way that plugins may not be available when settings are being initialized,
* not all roles may be available from a static/initializing context such as a {@link Setting}
* default value function. In that case, be warned that this may not include all plugin roles.
*/
public static boolean isDataNode(final Settings settings) {
return hasRole(settings, DiscoveryNodeRole.DATA_ROLE);
return getRolesFromSettings(settings).stream().anyMatch(DiscoveryNodeRole::canContainData);
}

public static boolean isIngestNode(final Settings settings) {
Expand Down Expand Up @@ -328,7 +334,7 @@ public Map<String, String> getAttributes() {
* Should this node hold data (shards) or not.
*/
public boolean isDataNode() {
return roles.contains(DiscoveryNodeRole.DATA_ROLE);
return roles.stream().anyMatch(DiscoveryNodeRole::canContainData);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ private DiscoveryNodeRole(final boolean isKnownRole, final String roleName, fina

public abstract Setting<Boolean> legacySetting();

/**
* Indicates whether a node with the given role can contain data. Defaults to false and can be overridden
*/
public boolean canContainData() {
return false;
}

@Override
public final boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -114,6 +121,10 @@ public Setting<Boolean> legacySetting() {
return Setting.boolSetting("node.data", true, Property.Deprecated, Property.NodeScope);
}

@Override
public boolean canContainData() {
return true;
}
};

/**
Expand Down
8 changes: 6 additions & 2 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -332,8 +333,11 @@ protected Node(final Environment initialEnvironment,
this.environment = new Environment(settings, initialEnvironment.configFile());
Environment.assertEquivalent(initialEnvironment, this.environment);
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
logger.info("node name [{}], node ID [{}], cluster name [{}]",
NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId(), ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value());
logger.info("node name [{}], node ID [{}], cluster name [{}], roles {}",
NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId(), ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value(),
DiscoveryNode.getRolesFromSettings(settings).stream()
.map(DiscoveryNodeRole::roleName)
.collect(Collectors.toCollection(LinkedHashSet::new)));
resourcesToClose.add(nodeEnvironment);
localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,11 +720,11 @@ private static String getRoleSuffix(Settings settings) {
if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.MASTER_ROLE)) {
suffix = suffix + DiscoveryNodeRole.MASTER_ROLE.roleNameAbbreviation();
}
if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE)) {
if (DiscoveryNode.isDataNode(settings)) {
suffix = suffix + DiscoveryNodeRole.DATA_ROLE.roleNameAbbreviation();
}
if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.MASTER_ROLE) == false
&& DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) == false) {
&& DiscoveryNode.isDataNode(settings) == false) {
suffix = suffix + "c";
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.cluster.routing.allocation;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.core.DataTier;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/**
* The {@code DataTierAllocationDecider} is a custom allocation decider that behaves similar to the
* {@link org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider}, however it
* is specific to the {@code _tier} setting for both the cluster and index level.
*/
public class DataTierAllocationDecider extends AllocationDecider {

public static final String NAME = "data_tier";

public static final String CLUSTER_ROUTING_REQUIRE = "cluster.routing.allocation.require._tier";
public static final String CLUSTER_ROUTING_INCLUDE = "cluster.routing.allocation.include._tier";
public static final String CLUSTER_ROUTING_EXCLUDE = "cluster.routing.allocation.exclude._tier";
public static final String INDEX_ROUTING_REQUIRE = "index.routing.allocation.require._tier";
public static final String INDEX_ROUTING_INCLUDE = "index.routing.allocation.include._tier";
public static final String INDEX_ROUTING_EXCLUDE = "index.routing.allocation.exclude._tier";

public static final Setting<String> CLUSTER_ROUTING_REQUIRE_SETTING = Setting.simpleString(CLUSTER_ROUTING_REQUIRE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<String> CLUSTER_ROUTING_INCLUDE_SETTING = Setting.simpleString(CLUSTER_ROUTING_INCLUDE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<String> CLUSTER_ROUTING_EXCLUDE_SETTING = Setting.simpleString(CLUSTER_ROUTING_EXCLUDE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<String> INDEX_ROUTING_REQUIRE_SETTING = Setting.simpleString(INDEX_ROUTING_REQUIRE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope);
public static final Setting<String> INDEX_ROUTING_INCLUDE_SETTING = Setting.simpleString(INDEX_ROUTING_INCLUDE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope);
public static final Setting<String> INDEX_ROUTING_EXCLUDE_SETTING = Setting.simpleString(INDEX_ROUTING_EXCLUDE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope);

private static void validateTierSetting(String setting) {
if (Strings.hasText(setting)) {
Set<String> invalidTiers = Arrays.stream(setting.split(","))
.filter(tier -> DataTier.validTierName(tier) == false)
.collect(Collectors.toSet());
if (invalidTiers.size() > 0) {
throw new IllegalArgumentException("invalid tier names: " + invalidTiers);
}
}
}

private volatile String clusterRequire = null;
private volatile String clusterInclude = null;
private volatile String clusterExclude = null;

public DataTierAllocationDecider(ClusterSettings clusterSettings) {
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_REQUIRE_SETTING, s -> this.clusterRequire = s);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_INCLUDE_SETTING, s -> this.clusterInclude = s);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_SETTING, s -> this.clusterExclude = s);
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(shardRouting, node.node(), allocation);
}

@Override
public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(indexMetadata, node.node(), allocation);
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(shardRouting, node.node(), allocation);
}

@Override
public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
Decision decision = shouldClusterFilter(node, allocation);
if (decision != null) {
return decision;
}

decision = shouldIndexFilter(indexMetadata, node, allocation);
if (decision != null) {
return decision;
}

return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters");
}

private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) {
Decision decision = shouldClusterFilter(node, allocation);
if (decision != null) {
return decision;
}

decision = shouldIndexFilter(allocation.metadata().getIndexSafe(shardRouting.index()), node, allocation);
if (decision != null) {
return decision;
}

return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters");
}

private Decision shouldFilter(IndexMetadata indexMd, DiscoveryNode node, RoutingAllocation allocation) {
Decision decision = shouldClusterFilter(node, allocation);
if (decision != null) {
return decision;
}

decision = shouldIndexFilter(indexMd, node, allocation);
if (decision != null) {
return decision;
}

return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters");
}

private Decision shouldIndexFilter(IndexMetadata indexMd, DiscoveryNode node, RoutingAllocation allocation) {
Settings indexSettings = indexMd.getSettings();
String indexRequire = INDEX_ROUTING_REQUIRE_SETTING.get(indexSettings);
String indexInclude = INDEX_ROUTING_INCLUDE_SETTING.get(indexSettings);
String indexExclude = INDEX_ROUTING_EXCLUDE_SETTING.get(indexSettings);

if (Strings.hasText(indexRequire)) {
if (allocationAllowed(OpType.AND, indexRequire, node) == false) {
return allocation.decision(Decision.NO, NAME, "node does not match all index setting [%s] tier filters [%s]",
INDEX_ROUTING_REQUIRE, indexRequire);
}
}
if (Strings.hasText(indexInclude)) {
if (allocationAllowed(OpType.OR, indexInclude, node) == false) {
return allocation.decision(Decision.NO, NAME, "node does not match any index setting [%s] tier filters [%s]",
INDEX_ROUTING_INCLUDE, indexInclude);
}
}
if (Strings.hasText(indexExclude)) {
if (allocationAllowed(OpType.OR, indexExclude, node)) {
return allocation.decision(Decision.NO, NAME, "node matches any index setting [%s] tier filters [%s]",
INDEX_ROUTING_EXCLUDE, indexExclude);
}
}
return null;
}

private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation allocation) {
if (Strings.hasText(clusterRequire)) {
if (allocationAllowed(OpType.AND, clusterRequire, node) == false) {
return allocation.decision(Decision.NO, NAME, "node does not match all cluster setting [%s] tier filters [%s]",
CLUSTER_ROUTING_REQUIRE, clusterRequire);
}
}
if (Strings.hasText(clusterInclude)) {
if (allocationAllowed(OpType.OR, clusterInclude, node) == false) {
return allocation.decision(Decision.NO, NAME, "node does not match any cluster setting [%s] tier filters [%s]",
CLUSTER_ROUTING_INCLUDE, clusterInclude);
}
}
if (Strings.hasText(clusterExclude)) {
if (allocationAllowed(OpType.OR, clusterExclude, node)) {
return allocation.decision(Decision.NO, NAME, "node matches any cluster setting [%s] tier filters [%s]",
CLUSTER_ROUTING_EXCLUDE, clusterExclude);
}
}
return null;
}

private enum OpType {
AND,
OR
}

private static boolean allocationAllowed(OpType opType, String tierSetting, DiscoveryNode node) {
String[] values = Strings.tokenizeToStringArray(tierSetting, ",");
for (String value : values) {
// generic "data" roles are considered to have all tiers
if (node.getRoles().contains(DiscoveryNodeRole.DATA_ROLE) ||
node.getRoles().stream().map(DiscoveryNodeRole::roleName).collect(Collectors.toSet()).contains(value)) {
if (opType == OpType.OR) {
return true;
}
} else {
if (opType == OpType.AND) {
return false;
}
}
}
if (opType == OpType.OR) {
return false;
} else {
return true;
}
}
}
Loading