Skip to content

Commit

Permalink
Allow retention lease operations under blocks (#39089)
Browse files Browse the repository at this point in the history
This commit allows manipulating retention leases under blocks.
  • Loading branch information
jasontedor committed Feb 19, 2019
1 parent fb0d6cf commit e266930
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ protected ClusterBlockLevel globalBlockLevel() {
}

@Override
protected ClusterBlockLevel indexBlockLevel() {
public ClusterBlockLevel indexBlockLevel() {
// resync should never be blocked because it's an internal action
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ protected ClusterBlockLevel globalBlockLevel() {
* Index level block to check before request execution. Returning null means that no blocks need to be checked.
*/
@Nullable
protected ClusterBlockLevel indexBlockLevel() {
public ClusterBlockLevel indexBlockLevel() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ protected ClusterBlockLevel globalBlockLevel() {
}

@Override
protected ClusterBlockLevel indexBlockLevel() {
public ClusterBlockLevel indexBlockLevel() {
return ClusterBlockLevel.WRITE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -127,7 +128,7 @@ protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
Objects.requireNonNull(request);
Objects.requireNonNull(primary);
primary.persistRetentionLeases();
return new WritePrimaryResult<>(request, new Response(), null, null, primary, logger);
return new WritePrimaryResult<>(request, new Response(), null, null, primary, getLogger());
}

@Override
Expand All @@ -138,7 +139,12 @@ protected WriteReplicaResult<Request> shardOperationOnReplica(
Objects.requireNonNull(replica);
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
replica.persistRetentionLeases();
return new WriteReplicaResult<>(request, null, null, replica, logger);
return new WriteReplicaResult<>(request, null, null, replica, getLogger());
}

@Override
public ClusterBlockLevel indexBlockLevel() {
return null;
}

public static final class Request extends ReplicatedWriteRequest<Request> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ protected ClusterBlockLevel globalBlockLevel() {
}

@Override
protected ClusterBlockLevel indexBlockLevel() {
public ClusterBlockLevel indexBlockLevel() {
return globalBlock == false ? ClusterBlockLevel.WRITE : null;
}
};
Expand Down Expand Up @@ -305,7 +305,7 @@ protected ClusterBlockLevel globalBlockLevel() {
}

@Override
protected ClusterBlockLevel indexBlockLevel() {
public ClusterBlockLevel indexBlockLevel() {
return globalBlock == false ? ClusterBlockLevel.WRITE : null;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ protected ClusterBlockLevel globalBlockLevel() {
}

@Override
protected ClusterBlockLevel indexBlockLevel() {
public ClusterBlockLevel indexBlockLevel() {
return globalBlock == false ? ClusterBlockLevel.WRITE : super.indexBlockLevel();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,4 +228,31 @@ protected Logger getLogger() {
assertTrue(invoked.get());
}

public void testBlocks() {
final IndicesService indicesService = mock(IndicesService.class);

final Index index = new Index("index", "uuid");
final IndexService indexService = mock(IndexService.class);
when(indicesService.indexServiceSafe(index)).thenReturn(indexService);

final int id = randomIntBetween(0, 4);
final IndexShard indexShard = mock(IndexShard.class);
when(indexService.getShard(id)).thenReturn(indexShard);

final ShardId shardId = new ShardId(index, id);
when(indexShard.shardId()).thenReturn(shardId);

final RetentionLeaseBackgroundSyncAction action = new RetentionLeaseBackgroundSyncAction(
Settings.EMPTY,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());

assertNull(action.indexBlockLevel());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -266,7 +270,7 @@ public void testBackgroundRetentionLeaseSync() throws Exception {
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", numberOfReplicas)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s")
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.build();
createIndex("index", settings);
ensureGreen("index");
Expand Down Expand Up @@ -370,4 +374,124 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
}
}

public void testCanAddRetentionLeaseUnderBlock() throws InterruptedException {
final String idForInitialRetentionLease = randomAlphaOfLength(8);
runUnderBlockTest(
idForInitialRetentionLease,
randomLongBetween(0, Long.MAX_VALUE),
(primary, listener) -> {
final String nextId = randomValueOtherThan(idForInitialRetentionLease, () -> randomAlphaOfLength(8));
final long nextRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
final String nextSource = randomAlphaOfLength(8);
primary.addRetentionLease(nextId, nextRetainingSequenceNumber, nextSource, listener);
},
primary -> {});
}

public void testCanRenewRetentionLeaseUnderBlock() throws InterruptedException {
final String idForInitialRetentionLease = randomAlphaOfLength(8);
final long initialRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
final AtomicReference<RetentionLease> retentionLease = new AtomicReference<>();
runUnderBlockTest(
idForInitialRetentionLease,
initialRetainingSequenceNumber,
(primary, listener) -> {
final long nextRetainingSequenceNumber = randomLongBetween(initialRetainingSequenceNumber, Long.MAX_VALUE);
final String nextSource = randomAlphaOfLength(8);
retentionLease.set(primary.renewRetentionLease(idForInitialRetentionLease, nextRetainingSequenceNumber, nextSource));
listener.onResponse(new ReplicationResponse());
},
primary -> {
try {
/*
* If the background renew was able to execute, then the retention leases were persisted to disk. There is no other
* way for the current retention leases to end up written to disk so we assume that if they are written to disk, it
* implies that the background sync was able to execute under a block.
*/
assertBusy(() -> assertThat(primary.loadRetentionLeases().leases(), contains(retentionLease.get())));
} catch (final Exception e) {
fail(e.toString());
}
});

}

public void testCanRemoveRetentionLeasesUnderBlock() throws InterruptedException {
final String idForInitialRetentionLease = randomAlphaOfLength(8);
runUnderBlockTest(
idForInitialRetentionLease,
randomLongBetween(0, Long.MAX_VALUE),
(primary, listener) -> primary.removeRetentionLease(idForInitialRetentionLease, listener),
indexShard -> {});
}

private void runUnderBlockTest(
final String idForInitialRetentionLease,
final long initialRetainingSequenceNumber,
final BiConsumer<IndexShard, ActionListener<ReplicationResponse>> indexShard,
final Consumer<IndexShard> afterSync) throws InterruptedException {
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.build();
assertAcked(prepareCreate("index").setSettings(settings));
ensureGreen("index");

final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId();
final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName();
final IndexShard primary = internalCluster()
.getInstance(IndicesService.class, primaryShardNodeName)
.getShardOrNull(new ShardId(resolveIndex("index"), 0));

final String id = idForInitialRetentionLease;
final long retainingSequenceNumber = initialRetainingSequenceNumber;
final String source = randomAlphaOfLength(8);
final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
primary.addRetentionLease(id, retainingSequenceNumber, source, listener);
latch.await();

final String block = randomFrom("read_only", "read_only_allow_delete", "read", "write", "metadata");

client()
.admin()
.indices()
.prepareUpdateSettings("index")
.setSettings(Settings.builder().put("index.blocks." + block, true).build())
.get();

try {
final CountDownLatch actionLatch = new CountDownLatch(1);
final AtomicBoolean success = new AtomicBoolean();

indexShard.accept(
primary,
new ActionListener<ReplicationResponse>() {

@Override
public void onResponse(final ReplicationResponse replicationResponse) {
success.set(true);
actionLatch.countDown();
}

@Override
public void onFailure(final Exception e) {
fail(e.toString());
}

});
actionLatch.await();
assertTrue(success.get());
afterSync.accept(primary);
} finally {
client()
.admin()
.indices()
.prepareUpdateSettings("index")
.setSettings(Settings.builder().putNull("index.blocks." + block).build())
.get();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -228,4 +228,31 @@ protected Logger getLogger() {
assertTrue(invoked.get());
}

public void testBlocks() {
final IndicesService indicesService = mock(IndicesService.class);

final Index index = new Index("index", "uuid");
final IndexService indexService = mock(IndexService.class);
when(indicesService.indexServiceSafe(index)).thenReturn(indexService);

final int id = randomIntBetween(0, 4);
final IndexShard indexShard = mock(IndexShard.class);
when(indexService.getShard(id)).thenReturn(indexShard);

final ShardId shardId = new ShardId(index, id);
when(indexShard.shardId()).thenReturn(shardId);

final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction(
Settings.EMPTY,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());

assertNull(action.indexBlockLevel());
}

}

0 comments on commit e266930

Please sign in to comment.