-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update index mappings when ccr restore complete #36879
Changes from all commits
2d6a755
9cdf844
ad23578
e865fb2
7f23deb
636b042
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
package org.elasticsearch.xpack.ccr.action; | ||
|
||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; | ||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; | ||
import org.elasticsearch.cluster.metadata.MappingMetaData; | ||
import org.elasticsearch.common.xcontent.XContentType; | ||
|
||
public final class CcrRequests { | ||
|
||
private CcrRequests() {} | ||
|
||
public static ClusterStateRequest metaDataRequest(String leaderIndex) { | ||
ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); | ||
clusterStateRequest.clear(); | ||
clusterStateRequest.metaData(true); | ||
clusterStateRequest.indices(leaderIndex); | ||
return clusterStateRequest; | ||
} | ||
|
||
public static PutMappingRequest putMappingRequest(String followerIndex, MappingMetaData mappingMetaData) { | ||
PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex); | ||
putMappingRequest.type(mappingMetaData.type()); | ||
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); | ||
return putMappingRequest; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,10 +8,13 @@ | |
|
||
import org.apache.lucene.index.IndexCommit; | ||
import org.elasticsearch.Version; | ||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; | ||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; | ||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; | ||
import org.elasticsearch.action.support.PlainActionFuture; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.cluster.metadata.IndexMetaData; | ||
import org.elasticsearch.cluster.metadata.MappingMetaData; | ||
import org.elasticsearch.cluster.metadata.MetaData; | ||
import org.elasticsearch.cluster.metadata.RepositoryMetaData; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
|
@@ -21,6 +24,7 @@ | |
import org.elasticsearch.common.component.AbstractLifecycleComponent; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.index.Index; | ||
import org.elasticsearch.index.IndexSettings; | ||
import org.elasticsearch.index.engine.EngineException; | ||
import org.elasticsearch.index.shard.IndexShard; | ||
import org.elasticsearch.index.shard.IndexShardRecoveryException; | ||
|
@@ -37,6 +41,7 @@ | |
import org.elasticsearch.snapshots.SnapshotState; | ||
import org.elasticsearch.xpack.ccr.Ccr; | ||
import org.elasticsearch.xpack.ccr.CcrLicenseChecker; | ||
import org.elasticsearch.xpack.ccr.action.CcrRequests; | ||
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; | ||
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest; | ||
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction; | ||
|
@@ -111,15 +116,10 @@ public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { | |
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { | ||
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId"; | ||
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); | ||
ClusterStateResponse response = remoteClient | ||
.admin() | ||
.cluster() | ||
.prepareState() | ||
.clear() | ||
.setMetaData(true) | ||
.setIndices("dummy_index_name") // We set a single dummy index name to avoid fetching all the index data | ||
.get(); | ||
return response.getState().metaData(); | ||
// We set a single dummy index name to avoid fetching all the index data | ||
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest("dummy_index_name"); | ||
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); | ||
return clusterState.getState().metaData(); | ||
} | ||
|
||
@Override | ||
|
@@ -128,18 +128,12 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind | |
String leaderIndex = index.getName(); | ||
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); | ||
|
||
ClusterStateResponse response = remoteClient | ||
.admin() | ||
.cluster() | ||
.prepareState() | ||
.clear() | ||
.setMetaData(true) | ||
.setIndices(leaderIndex) | ||
.get(); | ||
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex); | ||
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); | ||
|
||
// Validates whether the leader cluster has been configured properly: | ||
PlainActionFuture<String[]> future = PlainActionFuture.newFuture(); | ||
IndexMetaData leaderIndexMetaData = response.getState().metaData().index(leaderIndex); | ||
IndexMetaData leaderIndexMetaData = clusterState.getState().metaData().index(leaderIndex); | ||
ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse); | ||
String[] leaderHistoryUUIDs = future.actionGet(); | ||
|
||
|
@@ -252,7 +246,8 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v | |
|
||
Map<String, String> ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); | ||
String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); | ||
ShardId leaderShardId = new ShardId(shardId.getIndexName(), leaderUUID, shardId.getId()); | ||
Index leaderIndex = new Index(shardId.getIndexName(), leaderUUID); | ||
ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId()); | ||
|
||
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); | ||
String sessionUUID = UUIDs.randomBase64UUID(); | ||
|
@@ -261,13 +256,28 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v | |
String nodeId = response.getNodeId(); | ||
// TODO: Implement file restore | ||
closeSession(remoteClient, nodeId, sessionUUID); | ||
maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings()); | ||
} | ||
|
||
@Override | ||
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) { | ||
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); | ||
} | ||
|
||
private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @martijnvg would it make sense to share this code with the one in ShardFollowTasksExecutor? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Conceptually this code is doing the same as the update mapping code in shard follow task, but the update mapping code is tightly coupled with What I think is possible, is that factory methods for both |
||
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); | ||
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); | ||
IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex); | ||
long leaderMappingVersion = leaderIndexMetadata.getMappingVersion(); | ||
|
||
if (leaderMappingVersion > followerIndexSettings.getIndexMetaData().getMappingVersion()) { | ||
Index followerIndex = followerIndexSettings.getIndex(); | ||
MappingMetaData mappingMetaData = leaderIndexMetadata.mapping(); | ||
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData); | ||
localClient.admin().indices().putMapping(putMappingRequest).actionGet(); | ||
} | ||
} | ||
|
||
private void closeSession(Client remoteClient, String nodeId, String sessionUUID) { | ||
ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(nodeId, | ||
new ClearCcrRestoreSessionRequest.Request(nodeId, sessionUUID)); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍