Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Segment Replication - Change replicas to poll for new segments. #6334

Closed
wants to merge 15 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.DummyShardLock;
Expand Down Expand Up @@ -700,7 +699,8 @@ public static final IndexShard newIndexShard(
RetentionLeaseSyncer.EMPTY,
cbs,
(indexSettings, shardRouting) -> new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
(s) -> {},
null,
null
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
Expand All @@ -33,6 +31,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
Expand All @@ -43,22 +42,13 @@ public class SegmentReplicationRelocationIT extends SegmentReplicationBaseIT {
private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);

private void createIndex(int replicaCount) {
prepareCreate(
INDEX_NAME,
Settings.builder()
.put("index.number_of_shards", 1)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put("index.number_of_replicas", replicaCount)
.put("index.refresh_interval", -1)
).get();
prepareCreate(INDEX_NAME, Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount)).get();
}

/**
* This test verifies the happy path when the primary shard is relocated to a newly added node (target) in the cluster.
* Documents are indexed and verified both before and after the relocation.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testPrimaryRelocation() throws Exception {
final String oldPrimary = internalCluster().startNode();
createIndex(1);
Expand Down Expand Up @@ -135,7 +125,6 @@ public void testPrimaryRelocation() throws Exception {
* failure, more documents are ingested and verified on the replica, which confirms that the older primary is still refreshing the
* replicas.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testPrimaryRelocationWithSegRepFailure() throws Exception {
final String oldPrimary = internalCluster().startNode();
createIndex(1);
Expand Down Expand Up @@ -220,7 +209,6 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
* This test verifies primary recovery behavior with continuous ingestion
*
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception {
final String primary = internalCluster().startNode();
createIndex(1);
Expand Down Expand Up @@ -297,7 +285,6 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E
* operations during handoff. The test verifies all docs ingested are searchable on new primary.
*
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception {
final String primary = internalCluster().startNode();
createIndex(1);
Expand Down Expand Up @@ -396,7 +383,7 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception {
assertBusy(() -> {
client().admin().indices().prepareRefresh().execute().actionGet();
assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
}, 1, TimeUnit.MINUTES);
}, 2, TimeUnit.MINUTES);
flushAndRefresh(INDEX_NAME);
waitForSearchableDocs(totalDocCount, replica, newPrimary);
verifyStoreContent();
Expand All @@ -406,13 +393,10 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception {
* This test verifies that adding a new node, which results in peer recovery of a replica, also brings the replica's
* replication checkpoint up to the primary's by performing a round of segment replication.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testNewlyAddedReplicaIsUpdated() throws Exception {
final String primary = internalCluster().startNode();
prepareCreate(
INDEX_NAME,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
).get();
prepareCreate(INDEX_NAME, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))
.get();
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
Expand All @@ -430,10 +414,7 @@ public void testNewlyAddedReplicaIsUpdated() throws Exception {
ensureGreen(INDEX_NAME);
// Update replica count settings to 1 so that peer recovery triggers and recover replica
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 1))
);

ClusterHealthResponse clusterHealthResponse = client().admin()
Expand All @@ -454,18 +435,15 @@ public void testNewlyAddedReplicaIsUpdated() throws Exception {

/**
* This test verifies that the replica shard is not added to the cluster when a round of segment replication fails during peer recovery.
*
* TODO: Ignoring this test as its flaky and needs separate fix
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testAddNewReplicaFailure() throws Exception {
logger.info("--> starting [Primary Node] ...");
final String primaryNode = internalCluster().startNode();

logger.info("--> creating test index ...");
prepareCreate(
INDEX_NAME,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)

).get();

Expand Down Expand Up @@ -505,10 +483,7 @@ public void testAddNewReplicaFailure() throws Exception {
ensureGreen(INDEX_NAME);
// Add Replica shard to the new empty replica node
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 1))
);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replica);
waitForRecovery.await();
Expand Down
49 changes: 44 additions & 5 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
Expand Down Expand Up @@ -160,6 +159,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private volatile AsyncTranslogFSync fsyncTask;
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
private volatile AsyncRetentionLeaseSyncTask retentionLeaseSyncTask;
private volatile AsyncFetchSegmentsTask fetchSegmentTask;

// don't convert to Setting<> and register... we only set this in tests and register via a plugin
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
Expand Down Expand Up @@ -279,6 +279,7 @@ public IndexService(
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
this.fetchSegmentTask = new AsyncFetchSegmentsTask(this);
this.translogFactorySupplier = translogFactorySupplier;
updateFsyncTaskIfNecessary();
}
Expand Down Expand Up @@ -385,7 +386,8 @@ public synchronized void close(final String reason, boolean delete) throws IOExc
fsyncTask,
trimTranslogTask,
globalCheckpointTask,
retentionLeaseSyncTask
retentionLeaseSyncTask,
fetchSegmentTask
);
}
}
Expand Down Expand Up @@ -438,7 +440,8 @@ public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher
final Consumer<IndexShard> onCheckpointUpdate,
final Consumer<IndexShard> segmentSyncer
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -553,8 +556,9 @@ public synchronized IndexShard createShard(
retentionLeaseSyncer,
circuitBreakerService,
translogFactorySupplier,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore
this.indexSettings.isSegRepEnabled() ? onCheckpointUpdate : null,
remoteStore,
segmentSyncer
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down Expand Up @@ -979,6 +983,14 @@ private void maybeRefreshEngine(boolean force) {
}
}

/**
 * Walks every shard currently held by this index service and calls
 * {@link IndexShard#fetchSegments()} on each one whose routing entry is not a primary.
 */
private void fetchSegments() {
    for (final IndexShard indexShard : this.shards.values()) {
        if (indexShard.routingEntry().primary()) {
            // Primary shards are skipped; only non-primary copies are asked to fetch.
            continue;
        }
        indexShard.fetchSegments();
    }
}

private void maybeTrimTranslog() {
for (IndexShard shard : this.shards.values()) {
switch (shard.state()) {
Expand Down Expand Up @@ -1118,6 +1130,33 @@ public String toString() {
}
}

/**
 * Background task that triggers {@link IndexService#fetchSegments()} for the owning index.
 * <p>
 * The task is constructed with the index's refresh interval and executes on the
 * {@link ThreadPool.Names#REFRESH} thread pool. It asks to be rescheduled only while
 * segment replication is enabled in the index settings.
 */
final class AsyncFetchSegmentsTask extends BaseAsyncTask {

    /**
     * @param indexService the index service whose shards should fetch segments; its
     *                     refresh interval is passed to the base task as the interval
     */
    AsyncFetchSegmentsTask(IndexService indexService) {
        super(indexService, indexService.getIndexSettings().getRefreshInterval());
    }

    /** Delegates a single run to {@link IndexService#fetchSegments()}. */
    @Override
    protected void runInternal() {
        indexService.fetchSegments();
    }

    /**
     * @return {@code true} only when segment replication is enabled for the index
     */
    @Override
    protected boolean mustReschedule() {
        // NOTE(review): this override replaces whatever check BaseAsyncTask performs;
        // confirm whether that base condition should still be honoured here.
        return indexService.getIndexSettings().isSegRepEnabled();
    }

    /** @return the name of the thread pool this task runs on */
    @Override
    protected String getThreadPool() {
        return ThreadPool.Names.REFRESH;
    }

    /**
     * @return a label for this task; deliberately not "refresh", so that output
     *         mentioning this task identifies it as the segment-fetch task
     */
    @Override
    public String toString() {
        return "fetch_segments";
    }
}

final class AsyncTrimTranslogTask extends BaseAsyncTask {

AsyncTrimTranslogTask(IndexService indexService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.ReferenceManager;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;

import java.io.IOException;
import java.util.function.Consumer;

/**
* A {@link ReferenceManager.RefreshListener} that publishes a checkpoint to be consumed by replicas.
Expand All @@ -26,11 +26,11 @@ public class CheckpointRefreshListener implements ReferenceManager.RefreshListen
protected static Logger logger = LogManager.getLogger(CheckpointRefreshListener.class);

private final IndexShard shard;
private final SegmentReplicationCheckpointPublisher publisher;
private final Consumer<IndexShard> checkpointUpdateConsumer;

public CheckpointRefreshListener(IndexShard shard, SegmentReplicationCheckpointPublisher publisher) {
public CheckpointRefreshListener(IndexShard shard, Consumer<IndexShard> checkpointUpdateConsumer) {
this.shard = shard;
this.publisher = publisher;
this.checkpointUpdateConsumer = checkpointUpdateConsumer;
}

@Override
Expand All @@ -40,8 +40,8 @@ public void beforeRefresh() throws IOException {

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh && shard.state() != IndexShardState.CLOSED && shard.getReplicationTracker().isPrimaryMode()) {
publisher.publish(shard);
if (shard.state() != IndexShardState.CLOSED && shard.getReplicationTracker().isPrimaryMode()) {
checkpointUpdateConsumer.accept(shard);
}
}
}
Loading