Skip to content

Commit

Permalink
Merge branch 'main' into lib-toml
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel (dB.) Doubrovkine <dblock@amazon.com>
  • Loading branch information
dblock authored Oct 28, 2024
2 parents e07253e + 9f7d3b6 commit 51e3d5b
Show file tree
Hide file tree
Showing 28 changed files with 684 additions and 222 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- URI path filtering support in cluster stats API ([#15938](https://github.com/opensearch-project/OpenSearch/pull/15938))
- [Star Tree - Search] Add support for metric aggregations with/without term query ([#15289](https://github.com/opensearch-project/OpenSearch/pull/15289))
- Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111))
- Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
- Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471))
- Switch from `buildSrc/version.properties` to Gradle version catalog (`gradle/libs.versions.toml`) to enable dependabot to perform automated upgrades on common libs ([#16284](https://github.com/opensearch-project/OpenSearch/pull/16284))

### Dependencies
Expand All @@ -40,7 +42,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `peter-evans/create-pull-request` from 6 to 7 ([#15863](https://github.com/opensearch-project/OpenSearch/pull/15863))
- Bump `com.nimbusds:oauth2-oidc-sdk` from 11.9.1 to 11.19.1 ([#15862](https://github.com/opensearch-project/OpenSearch/pull/15862))
- Bump `com.microsoft.azure:msal4j` from 1.17.0 to 1.17.2 ([#15945](https://github.com/opensearch-project/OpenSearch/pull/15945), [#16406](https://github.com/opensearch-project/OpenSearch/pull/16406))
- Bump `ch.qos.logback:logback-core` from 1.5.6 to 1.5.10 ([#15946](https://github.com/opensearch-project/OpenSearch/pull/15946), [#16307](https://github.com/opensearch-project/OpenSearch/pull/16307))
- Bump `ch.qos.logback:logback-core` from 1.5.6 to 1.5.12 ([#15946](https://github.com/opensearch-project/OpenSearch/pull/15946), [#16307](https://github.com/opensearch-project/OpenSearch/pull/16307), [#16503](https://github.com/opensearch-project/OpenSearch/pull/16503))
- Update protobuf from 3.25.4 to 3.25.5 ([#16011](https://github.com/opensearch-project/OpenSearch/pull/16011))
- Bump `org.roaringbitmap:RoaringBitmap` from 1.2.1 to 1.3.0 ([#16040](https://github.com/opensearch-project/OpenSearch/pull/16040))
- Bump `com.nimbusds:nimbus-jose-jwt` from 9.40 to 9.41.1 ([#16038](https://github.com/opensearch-project/OpenSearch/pull/16038))
Expand Down Expand Up @@ -100,6 +102,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix typo super->sb in method toString() of RemoteStoreNodeAttribute ([#15362](https://github.com/opensearch-project/OpenSearch/pull/15362))
- [Workload Management] Fixing Create/Update QueryGroup TransportActions to execute from non-cluster manager nodes ([#16422](https://github.com/opensearch-project/OpenSearch/pull/16422))
- Fix flaky test in `testApproximateRangeWithSizeOverDefault` by adjusting totalHits assertion logic ([#16434](https://github.com/opensearch-project/OpenSearch/pull/16434#pullrequestreview-2386999409))
- Revert changes to upload remote state manifest using minimum codec version ([#16403](https://github.com/opensearch-project/OpenSearch/pull/16403))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ setup:
---
Test retrieval of number_routing_shards settings:
- skip:
version: " - 2.99.99"
reason: "introduced in 3.0.0" # TODO: change it to 2.18.0 after backport to 2.x branch
version: " - 2.18.99"
reason: "introduced in 2.19.0"
- do:
indices.get_settings:
flat_settings: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MappingMetadata;
Expand All @@ -48,29 +49,39 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.index.Index;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.discovery.Discovery;
import org.opensearch.index.IndexService;
import org.opensearch.index.mapper.DocumentMapper;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.indices.IndicesService;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.BlockClusterStateProcessing;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
import static org.opensearch.cluster.action.shard.ShardStateAction.SHARD_STARTED_ACTION_NAME;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -409,4 +420,172 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
assertThat(dynamicMappingsFut.get(10, TimeUnit.SECONDS).getResult(), equalTo(CREATED));
}

/**
 * Checks that an index still reaches green after a node is disassociated from the cluster state
 * while shards of that index are stuck in {@code INITIALIZING}.
 * <p>
 * Outline:
 * <ol>
 *   <li>start one cluster-manager-only node and three data-only nodes and install the
 *       shard-started interception (see {@code blockShardStartedResponse});</li>
 *   <li>create an index with 3 primaries and 1 replica each, wait for green, then close it;</li>
 *   <li>enable the interception and reopen the index through a hand-built cluster state update,
 *       then wait until exactly three shards are {@code INITIALIZING};</li>
 *   <li>trigger another reroute, then remove the node holding the active primary of one of the
 *       initializing shards and call {@code disassociateDeadNodes};</li>
 *   <li>disable the interception, reopen/reroute again and wait for green.</li>
 * </ol>
 *
 * @throws InterruptedException if interrupted while waiting on a cluster state condition
 */
public void testDisassociateNodesWhileShardInit() throws InterruptedException {
    final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(
        Settings.builder()
            .put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s")
            .put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
            .build()
    );
    // three data-only nodes, all with the same short connect timeout
    final Settings dataNodeSettings = Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build();
    internalCluster().startDataOnlyNode(dataNodeSettings);
    internalCluster().startDataOnlyNode(dataNodeSettings);
    internalCluster().startDataOnlyNode(dataNodeSettings);

    final ClusterService clusterService = internalCluster().clusterService(clusterManagerName);
    blockShardStartedResponse(clusterManagerName, clusterService);

    final String index = "index";

    // create index with 3 primary and 1 replica each
    prepareCreate(index).setSettings(
        Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
    ).get();
    ensureGreen(index);

    // close the index so that its shards have to be recovered again when it is reopened below
    client().admin().indices().prepareClose(index).get();

    // block so that replicas are always in init and not started
    blockReplicaStart.set(true);
    final AllocationService allocationService = internalCluster().getInstance(AllocationService.class, clusterManagerName);
    clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() {
        @Override
        public ClusterState execute(ClusterState currentState) {
            return reopenIndexAndReroute(currentState, index, allocationService);
        }

        @Override
        public void onFailure(String source, Exception e) {
            logger.error(e.getMessage(), e);
        }
    });

    ensureYellow(index);
    assertTrue(
        waitUntil(() -> clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3)
    );

    logger.info("Initializing shards");
    logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING));

    // trigger 2nd reroute after the shards are initializing
    clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() {
        @Override
        public ClusterState execute(ClusterState currentState) {
            return allocationService.reroute(currentState, "reroute");
        }

        @Override
        public void onFailure(String source, Exception e) {
            // surface the failure instead of letting the waits below time out without a reason
            logger.error(e.getMessage(), e);
        }
    });

    ensureYellow(index);
    assertTrue(
        waitUntil(() -> clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3)
    );
    clusterService.submitStateUpdateTask("test-remove-injected-node", new ClusterStateUpdateTask() {
        @Override
        public ClusterState execute(ClusterState currentState) throws Exception {
            // remove the node that holds the active primary of one of the initializing shards
            final ShardRouting next = currentState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0);
            final ShardRouting primaryShard = currentState.getRoutingNodes().activePrimary(next.shardId());

            final ClusterState withoutNode = ClusterState.builder(currentState)
                .nodes(DiscoveryNodes.builder(currentState.nodes()).remove(primaryShard.currentNodeId()))
                .build();
            logger.info("removed the node {}", primaryShard.currentNodeId());
            logger.info("shard {}", next);
            return allocationService.disassociateDeadNodes(withoutNode, true, "reroute");
        }

        @Override
        public void onFailure(String source, Exception e) {
            logger.error(e.getMessage(), e);
        }
    });
    assertTrue(waitUntil(() -> {
        // log and check the same snapshot so the logged state matches the evaluated condition
        final ClusterState state = clusterService.state();
        logger.info("current state {} ", state);
        return state.nodes().getSize() == 3;
    }));

    logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING));
    blockReplicaStart.set(false);

    clusterService.submitStateUpdateTask("test-inject-node-and-reroute", new ClusterStateUpdateTask() {
        @Override
        public ClusterState execute(ClusterState currentState) {
            return reopenIndexAndReroute(currentState, index, allocationService);
        }

        @Override
        public void onFailure(String source, Exception e) {
            logger.error(e.getMessage(), e);
        }
    });

    ensureGreen(index);
}

/**
 * Builds a cluster state in which {@code index} is marked OPEN, its index blocks are removed and
 * its routing is added "as recovery", then runs a reroute on the result.
 *
 * @param currentState      state to derive the new state from
 * @param index             name of the index to open
 * @param allocationService allocation service used for the reroute
 * @return the state returned by {@code allocationService.reroute}
 */
private static ClusterState reopenIndexAndReroute(ClusterState currentState, String index, AllocationService allocationService) {
    final IndexMetadata openedIndex = IndexMetadata.builder(currentState.metadata().index(index))
        .state(IndexMetadata.State.OPEN)
        .build();

    final ClusterState.Builder builder = ClusterState.builder(currentState);
    builder.metadata(Metadata.builder(currentState.metadata()).put(openedIndex, true));
    builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index));
    ClusterState updatedState = builder.build();

    final RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
    routingTable.addAsRecovery(updatedState.metadata().index(index));
    updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build();

    return allocationService.reroute(updatedState, "reroute");
}

// Switch read by the SHARD_STARTED request-handling behavior installed in blockShardStartedResponse:
// while true, shard-started requests whose text contains the allocation id of a non-primary shard are
// answered with an empty response instead of being passed to the real handler. Starts out false.
AtomicBoolean blockReplicaStart = new AtomicBoolean(false);

/**
 * Installs a request-handling behavior for {@code SHARD_STARTED_ACTION_NAME} on the transport
 * service of the given node.
 * <p>
 * While {@code blockReplicaStart} is {@code true}, a shard-started request whose string form
 * contains the allocation id of any non-primary shard in the current routing table is answered
 * with {@link TransportResponse.Empty#INSTANCE} and never reaches the real handler. Every other
 * request, and every request while the flag is {@code false}, is delegated unchanged.
 *
 * @param master  name of the node whose {@link TransportService} gets the behavior; the instance
 *                must be a {@link MockTransportService}
 * @param service cluster service whose current state supplies the routing table to match against
 */
private void blockShardStartedResponse(String master, ClusterService service) {
    MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master);
    primaryService.addRequestHandlingBehavior(SHARD_STARTED_ACTION_NAME, (handler, request, channel, task) -> {
        // interception disabled: behave exactly like the real handler
        if (blockReplicaStart.get() == false) {
            handler.messageReceived(request, channel, task);
            return;
        }

        // matching is done on the request's string form, so compute it once and reuse it
        final String incomingRequest = ((ShardStateAction.StartedShardEntry) request).toString();
        logger.info("ShardStateAction.StartedShardEntry {}", incomingRequest);

        final Optional<ShardRouting> matchReplica = service.state()
            .routingTable()
            .allShardsSatisfyingPredicate(r -> !r.primary())
            .getShardRoutings()
            .stream()
            .filter(r -> r.allocationId() != null)
            .filter(r -> incomingRequest.contains(r.allocationId().getId()))
            .findAny();

        if (matchReplica.isPresent()) {
            // acknowledge the sender without processing the request
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        } else {
            handler.messageReceived(request, channel, task);
        }
    });
}

/**
 * Plugins to load on every node started by this test.
 *
 * @return a collection holding only {@code MockTransportService.TestPlugin}
 */
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    final Collection<Class<? extends Plugin>> plugins = List.of(MockTransportService.TestPlugin.class);
    return plugins;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.remotemigration.MigrationBaseTestCase;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -97,23 +98,26 @@ public Settings.Builder remotePublishConfiguredNodeSetting() {
.put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, true)
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, ROUTING_TABLE_REPO_NAME)
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(routingTableRepoTypeAttributeKey, FsRepository.TYPE)
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath);
return builder;
}

public Settings.Builder remoteWithRoutingTableNodeSetting() {
// Remote Cluster with Routing table

return Settings.builder()
.put(
buildRemoteStoreNodeAttributes(
remoteStoreClusterSettings(
REPOSITORY_NAME,
segmentRepoPath,
ReloadableFsRepository.TYPE,
REPOSITORY_2_NAME,
translogRepoPath,
ReloadableFsRepository.TYPE,
REPOSITORY_NAME,
segmentRepoPath,
false
ReloadableFsRepository.TYPE
)
)
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,10 @@ public RestoreSnapshotRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_17_0)) {
sourceRemoteTranslogRepository = in.readOptionalString();
}
// TODO: change to V_2_18_0 once this is backported into that version
if (in.getVersion().onOrAfter(Version.CURRENT)) {
if (in.getVersion().onOrAfter(Version.V_2_18_0)) {
renameAliasPattern = in.readOptionalString();
}
if (in.getVersion().onOrAfter(Version.CURRENT)) {
if (in.getVersion().onOrAfter(Version.V_2_18_0)) {
renameAliasReplacement = in.readOptionalString();
}
}
Expand Down Expand Up @@ -200,11 +199,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
out.writeOptionalString(sourceRemoteTranslogRepository);
}
// TODO: change to V_2_18_0 once this is backported into that version
if (out.getVersion().onOrAfter(Version.CURRENT)) {
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
out.writeOptionalString(renameAliasPattern);
}
if (out.getVersion().onOrAfter(Version.CURRENT)) {
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
out.writeOptionalString(renameAliasReplacement);
}
}
Expand Down
Loading

0 comments on commit 51e3d5b

Please sign in to comment.