diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java new file mode 100644 index 0000000000000..12432c740a701 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.xcontent.XContentType; + +public final class CcrRequests { + + private CcrRequests() {} + + public static ClusterStateRequest metaDataRequest(String leaderIndex) { + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(leaderIndex); + return clusterStateRequest; + } + + public static PutMappingRequest putMappingRequest(String followerIndex, MappingMetaData mappingMetaData) { + PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex); + putMappingRequest.type(mappingMetaData.type()); + putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); + return putMappingRequest; + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index bd22b85684ca4..0fed083bba9ac 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -31,7 +31,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.CommitStats; @@ -123,10 +122,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro Index leaderIndex = params.getLeaderShardId().getIndex(); Index followIndex = params.getFollowShardId().getIndex(); - ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); - clusterStateRequest.clear(); - clusterStateRequest.metaData(true); - clusterStateRequest.indices(leaderIndex.getName()); + ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); @@ -140,9 +136,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro indexMetaData.getMappings().size() + "]"; MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value; - PutMappingRequest putMappingRequest = new PutMappingRequest(followIndex.getName()); - putMappingRequest.type(mappingMetaData.type()); - putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); + PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followIndex.getName(), mappingMetaData); followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap( putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()), errorHandler)); @@ -154,10 +148,7 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum final Index leaderIndex = params.getLeaderShardId().getIndex(); final Index followIndex = params.getFollowShardId().getIndex(); - final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); - clusterStateRequest.clear(); - clusterStateRequest.metaData(true); - clusterStateRequest.indices(leaderIndex.getName()); + ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); CheckedConsumer onResponse = clusterStateResponse -> { final IndexMetaData leaderIMD = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index aeaa7fc5eaf57..e648264a4ad55 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -8,10 +8,13 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -21,6 +24,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardRecoveryException; @@ -37,6 +41,7 @@ import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; +import org.elasticsearch.xpack.ccr.action.CcrRequests; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest; import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction; @@ -111,15 +116,10 @@ public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId"; Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); - ClusterStateResponse response = remoteClient - .admin() - .cluster() - .prepareState() - .clear() - .setMetaData(true) - .setIndices("dummy_index_name") // We set a single dummy index name to avoid fetching all the index data - .get(); - return response.getState().metaData(); + // We set a single dummy index name to avoid fetching all the index data + ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest("dummy_index_name"); + ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); + return clusterState.getState().metaData(); } @Override @@ -128,18 +128,12 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind String leaderIndex = index.getName(); Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); - ClusterStateResponse response = remoteClient - .admin() - .cluster() - .prepareState() - .clear() - .setMetaData(true) - .setIndices(leaderIndex) - .get(); + ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex); + ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); // Validates whether the leader cluster has been configured properly: PlainActionFuture future = PlainActionFuture.newFuture(); - IndexMetaData leaderIndexMetaData = response.getState().metaData().index(leaderIndex); + IndexMetaData leaderIndexMetaData = clusterState.getState().metaData().index(leaderIndex); ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse); String[] leaderHistoryUUIDs = future.actionGet(); @@ -252,7 +246,8 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v Map ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); - ShardId leaderShardId = new ShardId(shardId.getIndexName(), leaderUUID, shardId.getId()); + Index leaderIndex = new Index(shardId.getIndexName(), leaderUUID); + ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId()); Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); String sessionUUID = UUIDs.randomBase64UUID(); @@ -261,6 +256,7 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v String nodeId = response.getNodeId(); // TODO: Implement file restore closeSession(remoteClient, nodeId, sessionUUID); + maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings()); } @Override @@ -268,6 +264,20 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } + private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) { + ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); + ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); + IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex); + long leaderMappingVersion = leaderIndexMetadata.getMappingVersion(); + + if (leaderMappingVersion > followerIndexSettings.getIndexMetaData().getMappingVersion()) { + Index followerIndex = followerIndexSettings.getIndex(); + MappingMetaData mappingMetaData = leaderIndexMetadata.mapping(); + PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData); + localClient.admin().indices().putMapping(putMappingRequest).actionGet(); + } + } + private void closeSession(Client remoteClient, String nodeId, String sessionUUID) { ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(nodeId, new ClearCcrRestoreSessionRequest.Request(nodeId, sessionUUID)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index f711dd4303f2a..2d3ca857ff848 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -8,6 +8,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; @@ -15,12 +16,14 @@ import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.RepositoriesService; @@ -35,6 +38,7 @@ import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; import java.io.IOException; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -42,6 +46,7 @@ import static java.util.Collections.singletonMap; import static org.elasticsearch.snapshots.RestoreService.restoreInProgress; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; // TODO: Fold this integration test into a more expansive integration test as more bootstrap from remote work // TODO: is completed. @@ -195,6 +200,60 @@ public void testThatSessionIsRegisteredWithPrimaryShard() throws IOException { assertEquals(0, restoreInfo.failedShards()); } + public void testFollowerMappingIsUpdated() throws IOException { + String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; + String leaderIndex = "index1"; + String followerIndex = "index2"; + + final int numberOfPrimaryShards = randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderGreen(leaderIndex); + + final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); + + Settings.Builder settingsBuilder = Settings.builder() + .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex) + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName, + CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions, + "^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false, + false, true, settingsBuilder.build(), new String[0], + "restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]"); + + // TODO: Eventually when the file recovery work is complete, we should test updated mappings by + // indexing to the leader while the recovery is happening. However, into order to that test mappings + // are updated prior to that work, we index documents in the clear session callback. This will + // ensure a mapping change prior to the final mapping check on the follower side. + for (CcrRestoreSourceService restoreSourceService : getLeaderCluster().getDataNodeInstances(CcrRestoreSourceService.class)) { + restoreSourceService.addCloseSessionListener(s -> { + final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1); + leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get(); + }); + } + + PlainActionFuture future = PlainActionFuture.newFuture(); + restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); + RestoreInfo restoreInfo = future.actionGet(); + + assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards()); + assertEquals(0, restoreInfo.failedShards()); + + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(followerIndex); + ClusterStateResponse clusterState = followerClient().admin().cluster().state(clusterStateRequest).actionGet(); + IndexMetaData followerIndexMetadata = clusterState.getState().metaData().index(followerIndex); + assertEquals(2, followerIndexMetadata.getMappingVersion()); + + MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings() + .get("index2").get("doc"); + assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long")); + } + private ActionListener waitForRestore(ClusterService clusterService, ActionListener listener) { return new ActionListener() {