Skip to content

Commit

Permalink
Refactoring of Gateway*** classes (#26706)
Browse files Browse the repository at this point in the history
- Removes mutual dependency between GatewayMetaState and TransportNodesListGatewayMetaState
- Deguices MetaDataIndexUpgradeService
- Deguices GatewayMetaState
- Makes Gateway the master-level component that is only responsible for coordinating the state recovery
  • Loading branch information
ywelsch committed Sep 20, 2017
1 parent 5db7682 commit 30bd026
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
*/
package org.elasticsearch.cluster.metadata;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.analysis.Analyzer;
import org.elasticsearch.Version;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -35,7 +33,6 @@
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.plugins.Plugin;

import java.util.AbstractMap;
import java.util.Collection;
Expand All @@ -59,7 +56,6 @@ public class MetaDataIndexUpgradeService extends AbstractComponent {
private final IndexScopedSettings indexScopedSettings;
private final UnaryOperator<IndexMetaData> upgraders;

@Inject
public MetaDataIndexUpgradeService(Settings settings, NamedXContentRegistry xContentRegistry, MapperRegistry mapperRegistry,
IndexScopedSettings indexScopedSettings,
Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders) {
Expand Down
17 changes: 2 additions & 15 deletions core/src/main/java/org/elasticsearch/gateway/Gateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -39,27 +37,23 @@
import java.util.Arrays;
import java.util.Map;

public class Gateway extends AbstractComponent implements ClusterStateApplier {
public class Gateway extends AbstractComponent {

private final ClusterService clusterService;

private final GatewayMetaState metaState;

private final TransportNodesListGatewayMetaState listGatewayMetaState;

private final int minimumMasterNodes;
private final IndicesService indicesService;

public Gateway(Settings settings, ClusterService clusterService, GatewayMetaState metaState,
public Gateway(Settings settings, ClusterService clusterService,
TransportNodesListGatewayMetaState listGatewayMetaState,
IndicesService indicesService) {
super(settings);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.metaState = metaState;
this.listGatewayMetaState = listGatewayMetaState;
this.minimumMasterNodes = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
clusterService.addLowPriorityApplier(this);
}

public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException {
Expand Down Expand Up @@ -174,13 +168,6 @@ private void logInvalidSetting(String settingType, Map.Entry<String, String> e,
ex);
}

@Override
public void applyClusterState(final ClusterChangedEvent event) {
// order is important, first metaState, and then shardsState
// so dangling indices will be recorded
metaState.applyClusterState(event);
}

public interface GatewayStateRecoveredListener {
void onSuccess(ClusterState build);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.IndexFolderUpgrader;
Expand Down Expand Up @@ -69,15 +68,11 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateA

private volatile Set<Index> previouslyWrittenIndices = emptySet();

@Inject
public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService,
TransportNodesListGatewayMetaState nodesListGatewayMetaState,
MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader)
throws Exception {
MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) throws IOException {
super(settings);
this.nodeEnv = nodeEnv;
this.metaStateService = metaStateService;
nodesListGatewayMetaState.init(this);

if (DiscoveryNode.isDataNode(settings)) {
ensureNoPre019ShardState(nodeEnv);
Expand Down Expand Up @@ -210,7 +205,7 @@ protected static boolean isDataOnlyNode(ClusterState state) {
/**
* Throws an IAE if a pre 0.19 state is detected
*/
private void ensureNoPre019State() throws Exception {
private void ensureNoPre019State() throws IOException {
for (Path dataLocation : nodeEnv.nodeDataPaths()) {
final Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME);
if (!Files.exists(stateLocation)) {
Expand Down Expand Up @@ -242,7 +237,7 @@ private void ensureNoPre019State() throws Exception {
*/
static MetaData upgradeMetaData(MetaData metaData,
MetaDataIndexUpgradeService metaDataIndexUpgradeService,
MetaDataUpgrader metaDataUpgrader) throws Exception {
MetaDataUpgrader metaDataUpgrader) throws IOException {
// upgrade index meta data
boolean changed = false;
final MetaData.Builder upgradedMetaData = MetaData.builder(metaData);
Expand Down Expand Up @@ -288,7 +283,7 @@ private static <Data> boolean applyPluginUpgraders(ImmutableOpenMap<String, Data
}

// shard state BWC
private void ensureNoPre019ShardState(NodeEnvironment nodeEnv) throws Exception {
private void ensureNoPre019ShardState(NodeEnvironment nodeEnv) throws IOException {
for (Path dataLocation : nodeEnv.nodeDataPaths()) {
final Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME);
if (Files.exists(stateLocation)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ protected void configure() {
bind(DanglingIndicesState.class).asEagerSingleton();
bind(GatewayService.class).asEagerSingleton();
bind(TransportNodesListGatewayMetaState.class).asEagerSingleton();
bind(GatewayMetaState.class).asEagerSingleton();
bind(TransportNodesListGatewayStartedShards.class).asEagerSingleton();
bind(LocalAllocateDangledIndices.class).asEagerSingleton();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public GatewayService(Settings settings, AllocationService allocationService, Cl
TransportNodesListGatewayMetaState listGatewayMetaState,
IndicesService indicesService) {
super(settings);
this.gateway = new Gateway(settings, clusterService, metaState, listGatewayMetaState,
this.gateway = new Gateway(settings, clusterService, listGatewayMetaState,
indicesService);
this.allocationService = allocationService;
this.clusterService = clusterService;
Expand All @@ -121,6 +121,8 @@ public GatewayService(Settings settings, AllocationService allocationService, Cl
// TODO: change me once the minimum_master_nodes is changed too
recoverAfterMasterNodes = settings.getAsInt("discovery.zen.minimum_master_nodes", -1);
}

clusterService.addLowPriorityApplier(metaState);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void writeIndex(String reason, IndexMetaData indexMetaData) throws IOExce
/**
* Writes the global state, *without* the indices states.
*/
void writeGlobalState(String reason, MetaData metaData) throws Exception {
void writeGlobalState(String reason, MetaData metaData) throws IOException {
logger.trace("[_global] writing state, reason [{}]", reason);
try {
MetaData.FORMAT.write(metaData, nodeEnv.nodeDataPaths());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,16 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra

public static final String ACTION_NAME = "internal:gateway/local/meta_state";

private GatewayMetaState metaState;
private final GatewayMetaState metaState;

@Inject
public TransportNodesListGatewayMetaState(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
GatewayMetaState metaState) {
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeGatewayMetaState.class);
}

TransportNodesListGatewayMetaState init(GatewayMetaState metaState) {
this.metaState = metaState;
return this;
}

public ActionFuture<NodesGatewayMetaState> list(String[] nodesIds, @Nullable TimeValue timeout) {
Expand Down
9 changes: 7 additions & 2 deletions core/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoverySettings;
Expand Down Expand Up @@ -416,6 +417,10 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream()
.map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList());
final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders);
final MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, xContentRegistry,
indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), indexMetaDataUpgraders);
final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, nodeEnvironment, metaStateService,
metaDataIndexUpgradeService, metaDataUpgrader);
new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders);
final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = newTransportService(settings, transport, threadPool,
Expand Down Expand Up @@ -475,9 +480,9 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService);
b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService()));
b.bind(MetaDataIndexUpgradeService.class).toInstance(new MetaDataIndexUpgradeService(settings, xContentRegistry,
indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), indexMetaDataUpgraders));
b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService);
b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
{
RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testUpgradeStartClusterOn_0_20_6() throws Exception {
internalCluster().startNode(nodeSettings);
fail();
} catch (Exception ex) {
assertThat(ex.getMessage(), containsString(" was created before v2.0.0.beta1 and wasn't upgraded"));
assertThat(ex.getCause().getCause().getMessage(), containsString(" was created before v2.0.0.beta1 and wasn't upgraded"));
}
}
}

0 comments on commit 30bd026

Please sign in to comment.