Skip to content

Commit

Permalink
Check for closed index and changes related to rebase
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
Sachin Kale committed Jun 30, 2022
1 parent d5ac191 commit e5809d0
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,10 @@ public Builder initializeAsRemoteStoreRestore(IndexMetadata indexMetadata, Remot
UnassignedInfo.Reason.EXISTING_INDEX_RESTORED,
"restore_source[remote_store]"
);
assert indexMetadata.getIndex().equals(index);
if (!shards.isEmpty()) {
throw new IllegalStateException("trying to initialize an index with fresh shards, but already has shards created");
}
for (int shardNumber = 0; shardNumber < indexMetadata.getNumberOfShards(); shardNumber++) {
ShardId shardId = new ShardId(index, shardNumber);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,10 +469,6 @@ public ShardFieldData fieldData() {
return this.shardFieldData;
}

// Accessor for the shard's remote-store refresh listener.
// NOTE(review): this getter appears to be deleted by this commit (hunk is -4 lines);
// callers are migrated to indexShard.remoteStore().directory() instead — confirm against
// the applied diff before relying on it.
public RemoteStoreRefreshListener getRemoteStoreRefreshListener() {
return this.remoteStoreRefreshListener;
}

// Reports whether this shard belongs to a system index, as recorded in the
// index metadata carried by the shard's IndexSettings.
public boolean isSystem() {
return indexSettings.getIndexMetadata().isSystem();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDire
this.filesUploadedToRemoteStore = new HashSet<>(Arrays.asList(remoteDirectory.listAll()));
}

// Accessor for the Lucene Directory backing the remote store upload target.
// NOTE(review): appears to be removed by this commit in favor of obtaining the
// directory via the remote Store (remoteStore.directory()) — confirm against the
// applied diff before relying on it.
public Directory getRemoteDirectory() {
return this.remoteDirectory;
}

@Override
public void beforeRefresh() throws IOException {
// Do Nothing
Expand Down
23 changes: 13 additions & 10 deletions server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -439,32 +439,33 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionLi
}

private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException {
indexShard.preRecovery();
indexShard.prepareForIndexRecovery();
if (indexShard.getRemoteStoreRefreshListener() == null) {
final Store remoteStore = indexShard.remoteStore();
if (remoteStore == null) {
throw new IndexShardRecoveryException(
indexShard.shardId(),
"Remote store is not enabled for this index",
new IllegalArgumentException()
);
}
final Directory remoteDirectory = indexShard.getRemoteStoreRefreshListener().getRemoteDirectory();
indexShard.preRecovery();
indexShard.prepareForIndexRecovery();
final Directory remoteDirectory = remoteStore.directory();
final Store store = indexShard.store();
final Directory storeDirectory = store.directory();
store.incRef();
remoteStore.incRef();
try {
// Cleaning up local directory before copying file from remote directory.
// This is done to make sure we start with clean slate.
// ToDo: Check if we can copy only missing files
for (String file : storeDirectory.listAll()) {
storeDirectory.deleteFile(file);
}
} catch (IOException e) {
throw new IndexShardRecoveryException(indexShard.shardId, "Exception while recovering from remote store", e);
}
try {
for (String file : remoteDirectory.listAll()) {
storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT);
}
// This creates empty trans-log for now
// ToDo: Add code to restore remote trans-log
// ToDo: Add code to restore from remote trans-log
bootstrap(indexShard, store);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.recoveryState().getIndex().setFileDetailsComplete();
Expand All @@ -474,8 +475,10 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
indexShard.postRecovery("post recovery from remote_store");
} catch (IOException e) {
throw new IndexShardRecoveryException(indexShard.shardId, "Exception while recovering from remote store", e);
} finally {
store.decRef();
remoteStore.decRef();
}
store.decRef();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ public ClusterState execute(ClusterState currentState) {
logger.warn("Remote store restore is not supported for non-existent index. Skipping: {}", index);
continue;
}
if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) {
throw new IllegalStateException(
"cannot restore index ["
+ index
+ "] because an open index "
+ "with same name already exists in the cluster. Close the existing index"
);
}
if (currentIndexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE, false)) {
IndexId indexId = new IndexId(index, currentIndexMetadata.getIndexUUID());

Expand All @@ -239,7 +247,7 @@ public ClusterState execute(ClusterState currentState) {
indicesToBeRestored.add(index);
totalShards += currentIndexMetadata.getNumberOfShards();
} else {
logger.warn("Remote store is not enable for index: {}", index);
logger.warn("Remote store is not enabled for index: {}", index);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2688,20 +2688,29 @@ public void testRestoreShardFromRemoteStore() throws IOException {
storeDirectory.deleteFile(file);
}

Directory remoteDirectory = ((FilterDirectory) ((FilterDirectory) target.remoteStore().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) remoteDirectory).setCheckIndexOnClose(false);

for (String file : remoteDirectory.listAll()) {
if (file.equals("extra0")) {
remoteDirectory.deleteFile(file);
}
}

target.remoteStore().incRef();
target = reinitShard(target, routing);

DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
target.markAsRecovering("remote_store", new RecoveryState(routing, localNode, null));
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
target.restoreFromRemoteStore(future);
assertTrue(future.actionGet());
target.remoteStore().decRef();

assertTrue(future.actionGet());
assertDocs(target, "1", "2");

((BaseDirectoryWrapper) target.getRemoteStoreRefreshListener().getRemoteDirectory()).setCheckIndexOnClose(false);
storeDirectory = ((FilterDirectory) ((FilterDirectory) target.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
target.getRemoteStoreRefreshListener().getRemoteDirectory().close();
closeShards(target);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ protected IndexShard newShard(
@Nullable EngineFactory engineFactory,
Runnable globalCheckpointSyncer,
RetentionLeaseSyncer retentionLeaseSyncer,
RemoteStoreRefreshListener remoteStoreRefreshListener,
Store remoteStore,
IndexingOperationListener... listeners
) throws IOException {
// add node id as name to settings for proper logging
Expand All @@ -410,7 +410,7 @@ protected IndexShard newShard(
globalCheckpointSyncer,
retentionLeaseSyncer,
EMPTY_EVENT_LISTENER,
remoteStoreRefreshListener,
remoteStore,
listeners
);
}
Expand Down Expand Up @@ -438,7 +438,7 @@ protected IndexShard newShard(
Runnable globalCheckpointSyncer,
RetentionLeaseSyncer retentionLeaseSyncer,
IndexEventListener indexEventListener,
RemoteStoreRefreshListener remoteStoreRefreshListener,
Store remoteStore,
IndexingOperationListener... listeners
) throws IOException {
return newShard(
Expand All @@ -453,7 +453,7 @@ protected IndexShard newShard(
retentionLeaseSyncer,
indexEventListener,
SegmentReplicationCheckpointPublisher.EMPTY,
remoteStoreRefreshListener,
remoteStore,
listeners
);
}
Expand Down Expand Up @@ -482,7 +482,7 @@ protected IndexShard newShard(
RetentionLeaseSyncer retentionLeaseSyncer,
IndexEventListener indexEventListener,
SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable RemoteStoreRefreshListener remoteStoreRefreshListener,
@Nullable Store remoteStore,
IndexingOperationListener... listeners
) throws IOException {
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
Expand Down Expand Up @@ -510,12 +510,12 @@ protected IndexShard newShard(
Collections.emptyList(),
clusterSettings
);
if (remoteStoreRefreshListener == null && indexSettings.isRemoteStoreEnabled()) {
if (remoteStore == null && indexSettings.isRemoteStoreEnabled()) {
ShardId shardId = shardPath.getShardId();
NodeEnvironment.NodePath remoteNodePath = new NodeEnvironment.NodePath(createTempDir());
ShardPath remoteShardPath = new ShardPath(false, remoteNodePath.resolve(shardId), remoteNodePath.resolve(shardId), shardId);
Directory remoteDirectory = newFSDirectory(remoteShardPath.resolveIndex());
remoteStoreRefreshListener = new RemoteStoreRefreshListener(store.directory(), remoteDirectory);
storeProvider = is -> createStore(is, remoteShardPath);
remoteStore = storeProvider.apply(indexSettings);
}
indexShard = new IndexShard(
routing,
Expand All @@ -539,7 +539,7 @@ protected IndexShard newShard(
retentionLeaseSyncer,
breakerService,
checkpointPublisher,
remoteStoreRefreshListener
remoteStore
);
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
success = true;
Expand Down Expand Up @@ -581,7 +581,7 @@ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Index
current.indexSettings.getIndexMetadata(),
current.engineFactory,
current.engineConfigFactory,
current.getRemoteStoreRefreshListener(),
current.remoteStore(),
listeners
);
}
Expand All @@ -600,7 +600,7 @@ protected IndexShard reinitShard(
IndexMetadata indexMetadata,
EngineFactory engineFactory,
EngineConfigFactory engineConfigFactory,
RemoteStoreRefreshListener remoteStoreRefreshListener,
Store remoteStore,
IndexingOperationListener... listeners
) throws IOException {
closeShards(current);
Expand All @@ -615,7 +615,7 @@ protected IndexShard reinitShard(
current.getGlobalCheckpointSyncer(),
current.getRetentionLeaseSyncer(),
EMPTY_EVENT_LISTENER,
remoteStoreRefreshListener,
remoteStore,
listeners
);
}
Expand Down

0 comments on commit e5809d0

Please sign in to comment.