Skip to content

Commit

Permalink
Addressing PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Kala <ankikala@amazon.com>
  • Loading branch information
ankitkala committed Sep 9, 2022
1 parent 9eb820c commit eb33e08
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Plugin ZIP publication groupId value is configurable ([#4156](https://github.com/opensearch-project/OpenSearch/pull/4156))
- Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253))
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2360,10 +2360,12 @@ public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo,
/**
* Creates a new history snapshot from the translog instead of the lucene index. Required for cross cluster replication.
* Use the recommended {@link #getHistoryOperations(String, long, long, boolean)} method for other cases.
* Depending on how translog durability is configured, this method might/might not return the snapshot. For e.g,
* If the translog has been configured with no-durability, it'll return UnsupportedOperationException.
* This method should only be invoked if Segment Replication or Remote Store is not enabled.
*/
public Translog.Snapshot getHistoryOperationsFromTranslog(long startingSeqNo, long endSeqNo) throws IOException {
if (indexSettings.isSegRepEnabled() || indexSettings.isRemoteStoreEnabled()) {
throw new AssertionError("unsupported operation for segment replication enabled indices or remote store backed indices");
}
return getEngine().translogManager().newChangesSnapshot(startingSeqNo, endSeqNo, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,66 @@ public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception
closeShards(replica);
}

public void testGetChangesSnapshotThrowsAssertForSegRep() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
randomAlphaOfLength(8),
true,
ShardRoutingState.INITIALIZING,
RecoverySource.EmptyStoreRecoverySource.INSTANCE
);
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT.toString())
.build();
final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1);
final AtomicBoolean synced = new AtomicBoolean();
final IndexShard primaryShard = newShard(
shardRouting,
indexMetadata.build(),
null,
new InternalEngineFactory(),
() -> synced.set(true),
RetentionLeaseSyncer.EMPTY,
null
);
expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1));
closeShard(primaryShard, false);
}

public void testGetChangesSnapshotThrowsAssertForRemoteStore() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
randomAlphaOfLength(8),
true,
ShardRoutingState.INITIALIZING,
RecoverySource.EmptyStoreRecoverySource.INSTANCE
);
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.build();
final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1);
final AtomicBoolean synced = new AtomicBoolean();
final IndexShard primaryShard = newShard(
shardRouting,
indexMetadata.build(),
null,
new InternalEngineFactory(),
() -> synced.set(true),
RetentionLeaseSyncer.EMPTY,
null
);
expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1));
closeShard(primaryShard, false);
}

public void testGlobalCheckpointSync() throws IOException {
// create the primary shard with a callback that sets a boolean when the global checkpoint sync is invoked
final ShardId shardId = new ShardId("index", "_na_", 0);
Expand Down

0 comments on commit eb33e08

Please sign in to comment.