Skip to content

Commit

Permalink
Ignore waitForActiveShards when syncing leases (#39224)
Browse files Browse the repository at this point in the history
Adjust the retention lease sync actions so that they do not respect the
`index.write.wait_for_active_shards` setting on an index, allowing them to sync
retention leases even if insufficiently many shards are currently active to
accept writes.

Relates #39089
  • Loading branch information
DaveCTurner committed Feb 25, 2019
1 parent 160486e commit fc8c850
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
Expand Down Expand Up @@ -122,6 +123,7 @@ public void backgroundSync(
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
final Request request,
final IndexShard primary) throws IOException {
assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards();
Objects.requireNonNull(request);
Objects.requireNonNull(primary);
primary.persistRetentionLeases();
Expand Down Expand Up @@ -152,6 +154,7 @@ public Request() {
public Request(final ShardId shardId, final RetentionLeases retentionLeases) {
super(Objects.requireNonNull(shardId));
this.retentionLeases = Objects.requireNonNull(retentionLeases);
waitForActiveShards(ActiveShardCount.NONE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
Expand Down Expand Up @@ -124,6 +125,7 @@ public void sync(
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
final Request request,
final IndexShard primary) throws IOException {
assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards();
Objects.requireNonNull(request);
Objects.requireNonNull(primary);
primary.persistRetentionLeases();
Expand Down Expand Up @@ -161,6 +163,7 @@ public Request() {
public Request(final ShardId shardId, final RetentionLeases retentionLeases) {
super(Objects.requireNonNull(shardId));
this.retentionLeases = Objects.requireNonNull(retentionLeases);
waitForActiveShards(ActiveShardCount.NONE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ public void testCanRemoveRetentionLeasesUnderBlock() throws InterruptedException
private void runUnderBlockTest(
final String idForInitialRetentionLease,
final long initialRetainingSequenceNumber,
final BiConsumer<IndexShard, ActionListener<ReplicationResponse>> indexShard,
final BiConsumer<IndexShard, ActionListener<ReplicationResponse>> primaryConsumer,
final Consumer<IndexShard> afterSync) throws InterruptedException {
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
Expand All @@ -448,12 +448,10 @@ private void runUnderBlockTest(
.getInstance(IndicesService.class, primaryShardNodeName)
.getShardOrNull(new ShardId(resolveIndex("index"), 0));

final String id = idForInitialRetentionLease;
final long retainingSequenceNumber = initialRetainingSequenceNumber;
final String source = randomAlphaOfLength(8);
final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
primary.addRetentionLease(id, retainingSequenceNumber, source, listener);
primary.addRetentionLease(idForInitialRetentionLease, initialRetainingSequenceNumber, source, listener);
latch.await();

final String block = randomFrom("read_only", "read_only_allow_delete", "read", "write", "metadata");
Expand All @@ -469,7 +467,7 @@ private void runUnderBlockTest(
final CountDownLatch actionLatch = new CountDownLatch(1);
final AtomicBoolean success = new AtomicBoolean();

indexShard.accept(
primaryConsumer.accept(
primary,
new ActionListener<ReplicationResponse>() {

Expand Down Expand Up @@ -498,4 +496,115 @@ public void onFailure(final Exception e) {
}
}

public void testCanAddRetentionLeaseWithoutWaitingForShards() throws InterruptedException {
final String idForInitialRetentionLease = randomAlphaOfLength(8);
runWaitForShardsTest(
idForInitialRetentionLease,
randomLongBetween(0, Long.MAX_VALUE),
(primary, listener) -> {
final String nextId = randomValueOtherThan(idForInitialRetentionLease, () -> randomAlphaOfLength(8));
final long nextRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
final String nextSource = randomAlphaOfLength(8);
primary.addRetentionLease(nextId, nextRetainingSequenceNumber, nextSource, listener);
},
primary -> {});
}

public void testCanRenewRetentionLeaseWithoutWaitingForShards() throws InterruptedException {
final String idForInitialRetentionLease = randomAlphaOfLength(8);
final long initialRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
final AtomicReference<RetentionLease> retentionLease = new AtomicReference<>();
runWaitForShardsTest(
idForInitialRetentionLease,
initialRetainingSequenceNumber,
(primary, listener) -> {
final long nextRetainingSequenceNumber = randomLongBetween(initialRetainingSequenceNumber, Long.MAX_VALUE);
final String nextSource = randomAlphaOfLength(8);
retentionLease.set(primary.renewRetentionLease(idForInitialRetentionLease, nextRetainingSequenceNumber, nextSource));
listener.onResponse(new ReplicationResponse());
},
primary -> {
try {
/*
* If the background renew was able to execute, then the retention leases were persisted to disk. There is no other
* way for the current retention leases to end up written to disk so we assume that if they are written to disk, it
* implies that the background sync was able to execute despite wait for shards being set on the index.
*/
assertBusy(() -> assertThat(primary.loadRetentionLeases().leases(), contains(retentionLease.get())));
} catch (final Exception e) {
fail(e.toString());
}
});

}

public void testCanRemoveRetentionLeasesWithoutWaitingForShards() throws InterruptedException {
final String idForInitialRetentionLease = randomAlphaOfLength(8);
runWaitForShardsTest(
idForInitialRetentionLease,
randomLongBetween(0, Long.MAX_VALUE),
(primary, listener) -> primary.removeRetentionLease(idForInitialRetentionLease, listener),
primary -> {});
}

private void runWaitForShardsTest(
final String idForInitialRetentionLease,
final long initialRetainingSequenceNumber,
final BiConsumer<IndexShard, ActionListener<ReplicationResponse>> primaryConsumer,
final Consumer<IndexShard> afterSync) throws InterruptedException {
final int numDataNodes = internalCluster().numDataNodes();
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", numDataNodes == 1 ? 0 : numDataNodes - 1)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.build();
assertAcked(prepareCreate("index").setSettings(settings));
ensureYellowAndNoInitializingShards("index");
assertFalse(client().admin().cluster().prepareHealth("index").setWaitForActiveShards(numDataNodes).get().isTimedOut());

final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId();
final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName();
final IndexShard primary = internalCluster()
.getInstance(IndicesService.class, primaryShardNodeName)
.getShardOrNull(new ShardId(resolveIndex("index"), 0));

final String source = randomAlphaOfLength(8);
final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
primary.addRetentionLease(idForInitialRetentionLease, initialRetainingSequenceNumber, source, listener);
latch.await();

final String waitForActiveValue = randomBoolean() ? "all" : Integer.toString(numDataNodes);

client()
.admin()
.indices()
.prepareUpdateSettings("index")
.setSettings(Settings.builder().put("index.write.wait_for_active_shards", waitForActiveValue).build())
.get();

final CountDownLatch actionLatch = new CountDownLatch(1);
final AtomicBoolean success = new AtomicBoolean();

primaryConsumer.accept(
primary,
new ActionListener<ReplicationResponse>() {

@Override
public void onResponse(final ReplicationResponse replicationResponse) {
success.set(true);
actionLatch.countDown();
}

@Override
public void onFailure(final Exception e) {
fail(e.toString());
}

});
actionLatch.await();
assertTrue(success.get());
afterSync.accept(primary);
}

}

0 comments on commit fc8c850

Please sign in to comment.