Skip to content

Commit

Permalink
Adding CheckpointRefreshListener to trigger when Segment replication …
Browse files Browse the repository at this point in the history
…is turned on and Primary shard refreshes (#3108)

* Intial PR adding classes and tests related to checkpoint publishing

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Putting a Draft PR with all changes in classes. Testing is still not included in this commit.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Wiring up index shard to new engine, spotless apply and removing unnecessary tests and logs

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Adding Unit test for checkpointRefreshListener

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Applying spotless check

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fixing import statements *

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* removing unused constructor in index shard

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Addressing comments from last commit

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Adding package-info.java files for two new packages

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Adding test for null checkpoint publisher and addreesing PR comments

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Add docs for indexshardtests and remove shard.refresh

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 authored May 24, 2022
1 parent eb847ae commit fd5a38d
Show file tree
Hide file tree
Showing 19 changed files with 814 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.DummyShardLock;
Expand Down Expand Up @@ -673,7 +674,8 @@ public static final IndexShard newIndexShard(
Arrays.asList(listeners),
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs
cbs,
SegmentReplicationCheckpointPublisher.EMPTY
);
}

Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
Expand Down Expand Up @@ -428,7 +429,8 @@ private long getAvgShardSizeInBytes() throws IOException {
public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -530,7 +532,8 @@ public synchronized IndexShard createShard(
indexingOperationListeners,
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService
circuitBreakerService,
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.ReferenceManager;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;

import java.io.IOException;

/**
* A {@link ReferenceManager.RefreshListener} that publishes a checkpoint to be consumed by replicas.
* This class is only used with Segment Replication enabled.
*
* @opensearch.internal
*/
public class CheckpointRefreshListener implements ReferenceManager.RefreshListener {

protected static Logger logger = LogManager.getLogger(CheckpointRefreshListener.class);

private final IndexShard shard;
private final SegmentReplicationCheckpointPublisher publisher;

public CheckpointRefreshListener(IndexShard shard, SegmentReplicationCheckpointPublisher publisher) {
this.shard = shard;
this.publisher = publisher;
}

@Override
public void beforeRefresh() throws IOException {
// Do nothing
}

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh) {
publisher.publish(shard);
}
}
}
36 changes: 34 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.rest.RestStatus;
Expand Down Expand Up @@ -299,6 +302,7 @@ Runnable getGlobalCheckpointSyncer() {
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
private final RefreshPendingLocationListener refreshPendingLocationListener;
private volatile boolean useRetentionLeasesInPeerRecovery;
private final ReferenceManager.RefreshListener checkpointRefreshListener;

public IndexShard(
final ShardRouting shardRouting,
Expand All @@ -320,7 +324,8 @@ public IndexShard(
final List<IndexingOperationListener> listeners,
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService
final CircuitBreakerService circuitBreakerService,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -403,6 +408,11 @@ public boolean shouldCache(Query query) {
persistMetadata(path, indexSettings, shardRouting, null, logger);
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
if (checkpointPublisher != null) {
this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher);
} else {
this.checkpointRefreshListener = null;
}
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -1363,6 +1373,21 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
}
}

/**
* Returns the lastest Replication Checkpoint that shard received
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return new ReplicationCheckpoint(shardId, 0, 0, 0, 0);
}

/**
* Invoked when a new checkpoint is received from a primary shard. Starts the copy process.
*/
public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) {
assert shardRouting.primary() == false;
// TODO
}

/**
* gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard,
* without having to worry about the current state of the engine and concurrent flushes.
Expand Down Expand Up @@ -3106,6 +3131,13 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
}
};

final List<ReferenceManager.RefreshListener> internalRefreshListener;
if (this.checkpointRefreshListener != null) {
internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener);
} else {
internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric));
}

return this.engineConfigFactory.newEngineConfig(
shardId,
threadPool,
Expand All @@ -3122,7 +3154,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Arrays.asList(refreshListeners, refreshPendingLocationListener),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
internalRefreshListener,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.common.inject.AbstractModule;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.index.mapper.BinaryFieldMapper;
import org.opensearch.index.mapper.BooleanFieldMapper;
Expand Down Expand Up @@ -73,6 +74,7 @@
import org.opensearch.index.shard.PrimaryReplicaSyncer;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.store.IndicesStore;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.plugins.MapperPlugin;
Expand Down Expand Up @@ -278,6 +280,9 @@ protected void configure() {
bind(RetentionLeaseSyncAction.class).asEagerSingleton();
bind(RetentionLeaseBackgroundSyncAction.class).asEagerSingleton();
bind(RetentionLeaseSyncer.class).asEagerSingleton();
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.node.Node;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.plugins.PluginsService;
Expand Down Expand Up @@ -839,6 +840,7 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada
@Override
public IndexShard createShard(
final ShardRouting shardRouting,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final PeerRecoveryTargetService recoveryTargetService,
final RecoveryListener recoveryListener,
final RepositoriesService repositoriesService,
Expand All @@ -853,7 +855,7 @@ public IndexShard createShard(
IndexService indexService = indexService(shardRouting.index());
assert indexService != null;
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.search.SearchService;
Expand Down Expand Up @@ -138,6 +139,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final Consumer<ShardId> globalCheckpointSyncer;
private final RetentionLeaseSyncer retentionLeaseSyncer;

private final SegmentReplicationCheckpointPublisher checkpointPublisher;

@Inject
public IndicesClusterStateService(
final Settings settings,
Expand All @@ -153,13 +156,15 @@ public IndicesClusterStateService(
final SnapshotShardsService snapshotShardsService,
final PrimaryReplicaSyncer primaryReplicaSyncer,
final GlobalCheckpointSyncAction globalCheckpointSyncAction,
final RetentionLeaseSyncer retentionLeaseSyncer
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher
) {
this(
settings,
indicesService,
clusterService,
threadPool,
checkpointPublisher,
recoveryTargetService,
shardStateAction,
nodeMappingRefreshAction,
Expand All @@ -179,6 +184,7 @@ public IndicesClusterStateService(
final AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService,
final ClusterService clusterService,
final ThreadPool threadPool,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final PeerRecoveryTargetService recoveryTargetService,
final ShardStateAction shardStateAction,
final NodeMappingRefreshAction nodeMappingRefreshAction,
Expand All @@ -191,6 +197,7 @@ public IndicesClusterStateService(
final RetentionLeaseSyncer retentionLeaseSyncer
) {
this.settings = settings;
this.checkpointPublisher = checkpointPublisher;
this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService);
this.indicesService = indicesService;
this.clusterService = clusterService;
Expand Down Expand Up @@ -624,6 +631,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
indicesService.createShard(
shardRouting,
checkpointPublisher,
recoveryTargetService,
new RecoveryListener(shardRouting, primaryTerm, this),
repositoriesService,
Expand Down Expand Up @@ -981,6 +989,7 @@ U createIndex(IndexMetadata indexMetadata, List<IndexEventListener> builtInIndex
*/
T createShard(
ShardRouting shardRouting,
SegmentReplicationCheckpointPublisher checkpointPublisher,
PeerRecoveryTargetService recoveryTargetService,
RecoveryListener recoveryListener,
RepositoriesService repositoriesService,
Expand Down
Loading

0 comments on commit fd5a38d

Please sign in to comment.