Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CcrRestoreSourceService to track sessions #36578

Merged
merged 18 commits into from
Dec 18, 2018

Conversation

Tim-Brooks
Copy link
Contributor

This commit is related to #36127. It adds a CcrRestoreSourceService to
track Engine.IndexCommitRef need for in-process file restores. When a
follower starts restoring a shard through the CcrRepository it opens a
session with the leader through the PutCcrRestoreSessionAction. The
leader responds to the request by telling the follower what files it
needs to fetch for a restore. This is not yet implemented.

Once, the restore is complete, the follower closes the session with the
DeleteCcrRestoreSessionAction action.

@Tim-Brooks Tim-Brooks added >non-issue v7.0.0 :Distributed Indexing/CCR Issues around the Cross Cluster State Replication features v6.6.0 labels Dec 13, 2018
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-distributed

@Tim-Brooks
Copy link
Contributor Author

The CcrRestoreSourceService implements IndexEventListener, however it is not currently registered with the IndicesClusterStateService. It looks like there is not a way to register listeners with that. They are just created:

        this.buildInIndexListener =
                Arrays.asList(
                        peerRecoverySourceService,
                        recoveryTargetService,
                        searchService,
                        syncedFlushService,
                        snapshotShardsService);

You can register listeners in an ad-hoc manner with indexes. But there might be some catches with that (like it looks like it has to be registered when the IndexService is created). So I guess I'm saying, we might need to talk about how to get the CcrRestoreSourceService into the IndicesClusterStateService.

@ywelsch
Copy link
Contributor

ywelsch commented Dec 13, 2018

The CcrRestoreSourceService implements IndexEventListener, however it is not currently registered with the IndicesClusterStateService. It looks like there is not a way to register listeners with that

The Plugin class offers a hook to register IndexEventListeners on index creation, using the following method:

@Override
public void onIndexModule(IndexModule indexModule) {
  indexModule.addIndexEventListener(yourSingletonListenerInstance);
}

The MockIndexEventListener.TestPlugin class should provide a good example.

Copy link
Contributor

@ywelsch ywelsch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added some initial thoughts.

public static class TransportDeleteCcrRestoreSessionAction
extends TransportSingleShardAction<DeleteCcrRestoreSessionRequest, DeleteCcrRestoreSessionResponse> {

private final IndicesService indicesService;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps it's nicer to have CcrRestoreSourceService have a reference to IndicesService instead of having it here in the TransportAction class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how to do this? CcrRestoreSourceService is created in createComponents. And we do not have IndicesService there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see two other options to the current one:

  1. Pass IndicesService to createComponents.
  2. Create CcrRestoreSourceService using Guice, by overriding Collection<Module> createGuiceModules().

Neither sounds really great so let's keep the current model for now.

Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
String sessionUUID = UUIDs.randomBase64UUID();
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
new PutCcrRestoreSessionRequest(sessionUUID, shardId, recoveryMetadata)).actionGet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we have timeouts on these calls (similar as we do for peer recovery within a cluster)? Perhaps something to mark as a follow-up item?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a todo. I will also add timeout tasks to the meta issue.

Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
String sessionUUID = UUIDs.randomBase64UUID();
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
new PutCcrRestoreSessionRequest(sessionUUID, shardId, recoveryMetadata)).actionGet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can derive the correct remote shard id by using indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks.

Engine.IndexCommitRef commit;
if (onGoingRestores.containsKey(sessionUUID)) {
logger.debug("session [{}] already exists", sessionUUID);
commit = onGoingRestores.get(sessionUUID);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we be so lenient here, or rather reject opening a session which is already supposed to exist? It depends on how we want to handle failures / retries

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would kind of prefer the put be idempotent. This is also why I have the session uuid is generated on the follower node. I'll explain more about my design in a top-level comment. Maybe we should add validation to prevent (unlikely) uuid conflicts (ensure that the put session request comes from the same follower node)?

logger.debug("session [{}] already exists", sessionUUID);
commit = onGoingRestores.get(sessionUUID);
} else {
commit = indexShard.acquireSafeIndexCommit();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if anything goes wrong in this method later, should we release the index commit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made some changes to release. However, I think local timeouts for the index commit being held should also be a future meta task to also help here.

@Tim-Brooks
Copy link
Contributor Author

Thanks @ywelsch I've made changes. Here are my high-level design thoughts:

  1. A put session request opens a session using a uuid generated on the follower node. It includes the follower's store metadata. This is a transport shard request so it is on the node with the shard.
  2. When it is opened the leader node initiates a local timeout for the index commit ref. (To prevent it from leaking due to disruption of this process).
  3. The leader responds with the identical files (follower to keep) and recovery files (follower to fetch).
  4. The follower starts fetching files using a TransportNodesAction type of request.
  5. If the index commit times out on the leader side or the afterIndexShardClosed is called the leader will respond to the file chunk request with something like SESSION_NOT_FOUND.
  6. If the follower's local timeout (not yet implemented) has not yet expired, it can go back to step 1.
  7. Once the follower has recovered all of the files through file chunk requests it will ClearCcrRestoreSessionAction.

In this model the CcrRestoreSourceService is kind of like a cache for the Engine.IndexCommitRef. Obviously if a new Engine.IndexCommitRef is acquired (because the "cache" timed out on the leader) there might be new files the follower needs to recover or delete. But the leader sends this information in the response to the put session request. New put session requests can be made as long as the follower wants to. The process will continue until the follower locally times out. Or some other type of error is encountered.

Copy link
Contributor

@ywelsch ywelsch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've left mostly smaller comments. Thanks for the high-level design description? Can you also outline how you want to handle (temporary) network disconnects?

return new ClearCcrRestoreSessionResponse();
}

public static class TransportDeleteCcrRestoreSessionAction extends TransportNodesAction<ClearCcrRestoreSessionRequest,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TransportNodesAction is only truly useful if you intend on sending something to multiple nodes. I think it might be simpler here to directly use HandledTransportAction?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I would like to implement a specific node request for the file chunks and delete session in a follow-up? I added a meta task.


public static class PutCcrRestoreSessionResponse extends ActionResponse {

private String nodeId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be made final? I see that you both implemented a constructor with StreamInput and the readFrom method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. Unfortunately you must implement this:

        @Override
        protected PutCcrRestoreSessionResponse newResponse() {
            return new PutCcrRestoreSessionResponse();
        }

on TransportSingleShardAction.


Map<String, String> ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
ShardId leaderShardId = new ShardId(shardId.getIndexName(), leaderUUID, shardId.getId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to get the leader index name from Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. We don't need that because the index name provided by the args to restoreShard is correct. It is only the uuid that is not.

ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response =
remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet();
if (response.hasFailures()) {
throw response.failures().get(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by not making this a BaseNodesResponse, we will not need this weird unwrapping.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I would like to implement a specific node request for the file chunks and delete session in a follow-up? I added a meta task.

@Tim-Brooks
Copy link
Contributor Author

@ywelsch - I think I would like to implement the mechanism to direct a request to a specific node on the remote cluster in a follow-up. I added a task for that on the meta issue.

@Tim-Brooks Tim-Brooks requested a review from ywelsch December 18, 2018 01:27
}

private void removeSessionForShard(String sessionUUID, IndexShard indexShard) {
logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IndexShard does not have a toString implementation AFAICS

} else {
logger.debug("opening session [{}] for shard [{}]", sessionUUID, indexShard);
if (indexShard.state() == IndexShardState.CLOSED) {
throw new IllegalIndexShardStateException(indexShard.shardId(), IndexShardState.CLOSED,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

preferably throw IndexShardClosedException

@Tim-Brooks Tim-Brooks merged commit 1fa1056 into elastic:master Dec 18, 2018
jasontedor added a commit to jasontedor/elasticsearch that referenced this pull request Dec 18, 2018
* elastic/master: (31 commits)
  enable bwc tests and switch transport serialization version to 6.6.0 for CAS features
  [DOCs] Adds ml-cpp PRs to alpha release notes (elastic#36790)
  Synchronize WriteReplicaResult callbacks (elastic#36770)
  Add CcrRestoreSourceService to track sessions (elastic#36578)
  [Painless] Add tests for boxed return types (elastic#36747)
  Internal: Remove originalSettings from Node (elastic#36569)
  [ILM][DOCS] Update ILM API authorization docs (elastic#36749)
  Core: Deprecate use of scientific notation in epoch time parsing (elastic#36691)
  [ML] Merge the Jindex master feature branch (elastic#36702)
  Tests: Mute SnapshotDisruptionIT.testDisruptionOnSnapshotInitialization
  Update versions in SearchSortValues transport serialization
  Update version in SearchHits transport serialization
  [Geo] Integrate Lucene's LatLonShape (BKD Backed GeoShapes) as default `geo_shape` indexing approach (elastic#36751)
  [Docs] Fix error in Common Grams Token Filter (elastic#36774)
  Fix rollup search statistics (elastic#36674)
  SQL: Fix wrong appliance of StackOverflow limit for IN (elastic#36724)
  [TEST] Added more logging
  Invalidate Token API enhancements - HLRC (elastic#36362)
  Deprecate types in index API (elastic#36575)
  Disable bwc tests until elastic#36555 backport is complete (elastic#36737)
  ...
Tim-Brooks added a commit to Tim-Brooks/elasticsearch that referenced this pull request Dec 20, 2018
This commit is related to elastic#36127. It adds a CcrRestoreSourceService to
track Engine.IndexCommitRef need for in-process file restores. When a
follower starts restoring a shard through the CcrRepository it opens a
session with the leader through the PutCcrRestoreSessionAction. The
leader responds to the request by telling the follower what files it
needs to fetch for a restore. This is not yet implemented.

Once, the restore is complete, the follower closes the session with the
DeleteCcrRestoreSessionAction action.
Tim-Brooks added a commit that referenced this pull request Dec 20, 2018
This commit is related to #36127. It adds a CcrRestoreSourceService to
track Engine.IndexCommitRef need for in-process file restores. When a
follower starts restoring a shard through the CcrRepository it opens a
session with the leader through the PutCcrRestoreSessionAction. The
leader responds to the request by telling the follower what files it
needs to fetch for a restore. This is not yet implemented.

Once, the restore is complete, the follower closes the session with the
DeleteCcrRestoreSessionAction action.
@Tim-Brooks Tim-Brooks deleted the add_ccr_restore_source_service branch December 18, 2019 14:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Distributed Indexing/CCR Issues around the Cross Cluster State Replication features >non-issue v6.7.0 v7.0.0-beta1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants