Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…into pits2.x
  • Loading branch information
bharath-techie committed Sep 6, 2022
2 parents f808cb1 + 3d90d35 commit fb73af0
Show file tree
Hide file tree
Showing 13 changed files with 240 additions and 26 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))
- Update to Netty 4.1.80.Final ([#4359](https://github.com/opensearch-project/OpenSearch/pull/4359))
- Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240))
- Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253))

### Deprecated

Expand Down Expand Up @@ -48,6 +49,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- PR reference to checkout code for changelog verifier ([#4296](https://github.com/opensearch-project/OpenSearch/pull/4296))
- Restore using the class ClusterInfoRequest and ClusterInfoRequestBuilder from package 'org.opensearch.action.support.master.info' for subclasses ([#4324](https://github.com/opensearch-project/OpenSearch/pull/4324))
- Fixed cancellation of segment replication events ([#4225](https://github.com/opensearch-project/OpenSearch/pull/4225))
- [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363))
- [Segment Replication] Bump segment infos counter before commit during replica promotion ([#4365](https://github.com/opensearch-project/OpenSearch/pull/4365))

### Security
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ public Iterator<Setting<?>> settings() {
);

public static final String SETTING_REMOTE_STORE_ENABLED = "index.remote_store.enabled";

public static final String SETTING_REMOTE_STORE_REPOSITORY = "index.remote_store.repository";

/**
* Used to specify if the index data should be persisted in the remote store.
*/
Expand Down Expand Up @@ -320,6 +323,50 @@ public Iterator<Setting<?>> settings() {
Property.Final
);

/**
* Used to specify remote store repository to use for this index.
*/
public static final Setting<String> INDEX_REMOTE_STORE_REPOSITORY_SETTING = Setting.simpleString(
SETTING_REMOTE_STORE_REPOSITORY,
new Setting.Validator<>() {

@Override
public void validate(final String value) {}

@Override
public void validate(final String value, final Map<Setting<?>, Object> settings) {
if (value == null || value.isEmpty()) {
throw new IllegalArgumentException(
"Setting " + INDEX_REMOTE_STORE_REPOSITORY_SETTING.getKey() + " should be provided with non-empty repository ID"
);
} else {
validateRemoteStoreSettingEnabled(settings, INDEX_REMOTE_STORE_REPOSITORY_SETTING);
}
}

@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = Collections.singletonList(INDEX_REMOTE_STORE_ENABLED_SETTING);
return settings.iterator();
}
},
Property.IndexScope,
Property.Final
);

private static void validateRemoteStoreSettingEnabled(final Map<Setting<?>, Object> settings, Setting<?> setting) {
final Boolean isRemoteSegmentStoreEnabled = (Boolean) settings.get(INDEX_REMOTE_STORE_ENABLED_SETTING);
if (isRemoteSegmentStoreEnabled == false) {
throw new IllegalArgumentException(
"Settings "
+ setting.getKey()
+ " can ont be set/enabled when "
+ INDEX_REMOTE_STORE_ENABLED_SETTING.getKey()
+ " is set to true"
);
}
}

public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
Expand Down Expand Up @@ -218,11 +219,11 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
* is ready for production release, the feature flag can be removed, and the
* setting should be moved to {@link #BUILT_IN_INDEX_SETTINGS}.
*/
public static final Map<String, Setting> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of(
public static final Map<String, List<Setting>> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of(
FeatureFlags.REPLICATION_TYPE,
IndexMetadata.INDEX_REPLICATION_TYPE_SETTING,
Collections.singletonList(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING),
FeatureFlags.REMOTE_STORE,
IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING
Arrays.asList(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING, IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING)
);

public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ public SettingsModule(
registerSetting(setting);
}

for (Map.Entry<String, Setting> featureFlaggedSetting : IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS.entrySet()) {
for (Map.Entry<String, List<Setting>> featureFlaggedSetting : IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS.entrySet()) {
if (FeatureFlags.isEnabled(featureFlaggedSetting.getKey())) {
registerSetting(featureFlaggedSetting.getValue());
featureFlaggedSetting.getValue().forEach(feature -> registerSetting(feature));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ public synchronized IndexShard createShard(
Store remoteStore = null;
if (this.indexSettings.isRemoteStoreEnabled()) {
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(
clusterService.state().metadata().clusterUUID(),
this.indexSettings.getRemoteStoreRepository(),
this.indexSettings,
path
);
Expand Down
10 changes: 9 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ public final class IndexSettings {
private final int numberOfShards;
private final ReplicationType replicationType;
private final boolean isRemoteStoreEnabled;
private final String remoteStoreRepository;
// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
private volatile Settings settings;
private volatile IndexMetadata indexMetadata;
Expand Down Expand Up @@ -719,7 +720,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);
replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);

remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY);
this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
Expand Down Expand Up @@ -971,6 +972,13 @@ public boolean isRemoteStoreEnabled() {
return isRemoteStoreEnabled;
}

/**
* Returns remote store repository configured for this index.
*/
public String getRemoteStoreRepository() {
return remoteStoreRepository;
}

/**
* Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the
* index settings and the node settings where node settings are overwritten by index settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ public RemoteStoreRefreshListener(IndexShard indexShard) {
.getDelegate()).getDelegate();
this.primaryTerm = indexShard.getOperationPrimaryTerm();
localSegmentChecksumMap = new HashMap<>();
if (indexShard.shardRouting.primary()) {
try {
this.remoteDirectory.init();
} catch (IOException e) {
logger.error("Exception while initialising RemoteSegmentStoreDirectory", e);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public class SegmentReplicationTarget extends ReplicationTarget {
private final SegmentReplicationState state;
protected final MultiFileWriter multiFileWriter;

public ReplicationCheckpoint getCheckpoint() {
return this.checkpoint;
}

public SegmentReplicationTarget(
ReplicationCheckpoint checkpoint,
IndexShard indexShard,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
Expand All @@ -34,7 +35,6 @@
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.transport.TransportService;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -54,7 +54,7 @@ public class SegmentReplicationTargetService implements IndexEventListener {

private final SegmentReplicationSourceFactory sourceFactory;

private final Map<ShardId, ReplicationCheckpoint> latestReceivedCheckpoint = new HashMap<>();
private final Map<ShardId, ReplicationCheckpoint> latestReceivedCheckpoint = ConcurrentCollections.newConcurrentMap();

// Empty Implementation, only required while Segment Replication is under feature flag.
public static final SegmentReplicationTargetService NO_OP = new SegmentReplicationTargetService() {
Expand Down Expand Up @@ -151,14 +151,23 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
} else {
latestReceivedCheckpoint.put(replicaShard.shardId(), receivedCheckpoint);
}
if (onGoingReplications.isShardReplicating(replicaShard.shardId())) {
logger.trace(
() -> new ParameterizedMessage(
"Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}",
replicaShard.getLatestReplicationCheckpoint()
)
);
return;
SegmentReplicationTarget ongoingReplicationTarget = onGoingReplications.getOngoingReplicationTarget(replicaShard.shardId());
if (ongoingReplicationTarget != null) {
if (ongoingReplicationTarget.getCheckpoint().getPrimaryTerm() < receivedCheckpoint.getPrimaryTerm()) {
logger.trace(
"Cancelling ongoing replication from old primary with primary term {}",
ongoingReplicationTarget.getCheckpoint().getPrimaryTerm()
);
onGoingReplications.cancel(ongoingReplicationTarget.getId(), "Cancelling stuck target after new primary");
} else {
logger.trace(
() -> new ParameterizedMessage(
"Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}",
replicaShard.getLatestReplicationCheckpoint()
)
);
return;
}
}
final Thread thread = Thread.currentThread();
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
* This class holds a collection of all on going replication events on the current node (i.e., the node is the target node
Expand Down Expand Up @@ -236,13 +237,18 @@ public boolean cancelForShard(ShardId shardId, String reason) {
}

/**
* check if a shard is currently replicating
* Get target for shard
*
* @param shardId shardId for which to check if replicating
* @return true if shard is currently replicating
* @param shardId shardId
* @return ReplicationTarget for input shardId
*/
public boolean isShardReplicating(ShardId shardId) {
return onGoingTargetEvents.values().stream().anyMatch(t -> t.indexShard.shardId().equals(shardId));
public T getOngoingReplicationTarget(ShardId shardId) {
final List<T> replicationTargetList = onGoingTargetEvents.values()
.stream()
.filter(t -> t.indexShard.shardId().equals(shardId))
.collect(Collectors.toList());
assert replicationTargetList.size() <= 1 : "More than one on-going replication targets";
return replicationTargetList.size() > 0 ? replicationTargetList.get(0) : null;
}

/**
Expand Down
71 changes: 68 additions & 3 deletions server/src/test/java/org/opensearch/index/IndexSettingsTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchTestCase;
Expand All @@ -59,7 +58,6 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.core.StringContains.containsString;
import static org.hamcrest.object.HasToString.hasToString;
import static org.opensearch.common.settings.IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS;

public class IndexSettingsTests extends OpenSearchTestCase {

Expand Down Expand Up @@ -784,7 +782,7 @@ public void testRemoteStoreExplicitSetting() {

public void testUpdateRemoteStoreFails() {
Set<Setting<?>> remoteStoreSettingSet = new HashSet<>();
remoteStoreSettingSet.add(FEATURE_FLAGGED_INDEX_SETTINGS.get(FeatureFlags.REMOTE_STORE));
remoteStoreSettingSet.add(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING);
IndexScopedSettings settings = new IndexScopedSettings(Settings.EMPTY, remoteStoreSettingSet);
IllegalArgumentException error = expectThrows(
IllegalArgumentException.class,
Expand Down Expand Up @@ -818,4 +816,71 @@ public void testEnablingRemoteStoreFailsWhenReplicationTypeIsDefault() {
);
assertEquals("To enable index.remote_store.enabled, index.replication.type should be set to SEGMENT", iae.getMessage());
}

public void testRemoteRepositoryDefaultSetting() {
IndexMetadata metadata = newIndexMeta(
"index",
Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build()
);
IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY);
assertNull(settings.getRemoteStoreRepository());
}

public void testRemoteRepositoryExplicitSetting() {
IndexMetadata metadata = newIndexMeta(
"index",
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, "repo1")
.build()
);
IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY);
assertEquals("repo1", settings.getRemoteStoreRepository());
}

public void testUpdateRemoteRepositoryFails() {
Set<Setting<?>> remoteStoreSettingSet = new HashSet<>();
remoteStoreSettingSet.add(IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING);
IndexScopedSettings settings = new IndexScopedSettings(Settings.EMPTY, remoteStoreSettingSet);
IllegalArgumentException error = expectThrows(
IllegalArgumentException.class,
() -> settings.updateSettings(
Settings.builder().put("index.remote_store.repository", randomUnicodeOfLength(10)).build(),
Settings.builder(),
Settings.builder(),
"index"
)
);
assertEquals(error.getMessage(), "final index setting [index.remote_store.repository], not updateable");
}

public void testSetRemoteRepositoryFailsWhenRemoteStoreIsNotEnabled() {
Settings indexSettings = Settings.builder()
.put("index.replication.type", ReplicationType.SEGMENT)
.put("index.remote_store.enabled", false)
.put("index.remote_store.repository", "repo1")
.build();
IllegalArgumentException iae = expectThrows(
IllegalArgumentException.class,
() -> IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING.get(indexSettings)
);
assertEquals(
"Settings index.remote_store.repository can ont be set/enabled when index.remote_store.enabled is set to true",
iae.getMessage()
);
}

public void testSetRemoteRepositoryFailsWhenEmptyString() {
Settings indexSettings = Settings.builder()
.put("index.replication.type", ReplicationType.SEGMENT)
.put("index.remote_store.enabled", false)
.put("index.remote_store.repository", "")
.build();
IllegalArgumentException iae = expectThrows(
IllegalArgumentException.class,
() -> IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING.get(indexSettings)
);
assertEquals("Setting index.remote_store.repository should be provided with non-empty repository ID", iae.getMessage());
}
}
Loading

0 comments on commit fb73af0

Please sign in to comment.