Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow retention lease operations under blocks #39089

Merged
merged 3 commits into the base branch from the pull-request branch
Feb 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ protected ClusterBlockLevel globalBlockLevel() {
}

@Override
protected ClusterBlockLevel indexBlockLevel() {
public ClusterBlockLevel indexBlockLevel() {
// resync should never be blocked because it's an internal action
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ protected ClusterBlockLevel globalBlockLevel() {
* Index level block to check before request execution. Returning null means that no blocks need to be checked.
*/
@Nullable
protected ClusterBlockLevel indexBlockLevel() {
public ClusterBlockLevel indexBlockLevel() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ protected ClusterBlockLevel globalBlockLevel() {
}

@Override
protected ClusterBlockLevel indexBlockLevel() {
public ClusterBlockLevel indexBlockLevel() {
return ClusterBlockLevel.WRITE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -127,7 +128,7 @@ protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
Objects.requireNonNull(request);
Objects.requireNonNull(primary);
primary.persistRetentionLeases();
return new WritePrimaryResult<>(request, new Response(), null, null, primary, logger);
return new WritePrimaryResult<>(request, new Response(), null, null, primary, getLogger());
}

@Override
Expand All @@ -138,7 +139,12 @@ protected WriteReplicaResult<Request> shardOperationOnReplica(
Objects.requireNonNull(replica);
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
replica.persistRetentionLeases();
return new WriteReplicaResult<>(request, null, null, replica, logger);
return new WriteReplicaResult<>(request, null, null, replica, getLogger());
}

@Override
public ClusterBlockLevel indexBlockLevel() {
// returning null means no index-level block is checked (see the base-class contract); retention
// lease syncs are internal actions that should execute even when the index is blocked
return null;
}

public static final class Request extends ReplicatedWriteRequest<Request> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ protected ClusterBlockLevel globalBlockLevel() {
}

@Override
protected ClusterBlockLevel indexBlockLevel() {
public ClusterBlockLevel indexBlockLevel() {
return globalBlock == false ? ClusterBlockLevel.WRITE : null;
}
};
Expand Down Expand Up @@ -305,7 +305,7 @@ protected ClusterBlockLevel globalBlockLevel() {
}

@Override
protected ClusterBlockLevel indexBlockLevel() {
public ClusterBlockLevel indexBlockLevel() {
return globalBlock == false ? ClusterBlockLevel.WRITE : null;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ protected ClusterBlockLevel globalBlockLevel() {
}

@Override
protected ClusterBlockLevel indexBlockLevel() {
public ClusterBlockLevel indexBlockLevel() {
return globalBlock == false ? ClusterBlockLevel.WRITE : super.indexBlockLevel();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,4 +228,31 @@ protected Logger getLogger() {
assertTrue(invoked.get());
}

public void testBlocks() {
    // Build the shard coordinates first, then wire a chain of mocks so the action
    // can resolve index -> index service -> shard.
    final Index index = new Index("index", "uuid");
    final int shardIdValue = randomIntBetween(0, 4);
    final ShardId shardId = new ShardId(index, shardIdValue);

    final IndicesService mockIndicesService = mock(IndicesService.class);
    final IndexService mockIndexService = mock(IndexService.class);
    final IndexShard mockIndexShard = mock(IndexShard.class);

    when(mockIndicesService.indexServiceSafe(index)).thenReturn(mockIndexService);
    when(mockIndexService.getShard(shardIdValue)).thenReturn(mockIndexShard);
    when(mockIndexShard.shardId()).thenReturn(shardId);

    final RetentionLeaseBackgroundSyncAction action = new RetentionLeaseBackgroundSyncAction(
            Settings.EMPTY,
            transportService,
            clusterService,
            mockIndicesService,
            threadPool,
            shardStateAction,
            new ActionFilters(Collections.emptySet()),
            new IndexNameExpressionResolver());

    // The background sync must not be stopped by index blocks, so no block level is declared.
    assertNull(action.indexBlockLevel());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -266,7 +270,7 @@ public void testBackgroundRetentionLeaseSync() throws Exception {
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", numberOfReplicas)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s")
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.build();
createIndex("index", settings);
ensureGreen("index");
Expand Down Expand Up @@ -374,4 +378,124 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
}
}

public void testCanAddRetentionLeaseUnderBlock() throws InterruptedException {
    // Seed with one lease, then add a second, distinct lease while the index block is in place.
    final String initialLeaseId = randomAlphaOfLength(8);
    final long initialRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
    runUnderBlockTest(
            initialLeaseId,
            initialRetainingSequenceNumber,
            (primary, listener) -> {
                final String newLeaseId = randomValueOtherThan(initialLeaseId, () -> randomAlphaOfLength(8));
                final long newRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
                final String newSource = randomAlphaOfLength(8);
                primary.addRetentionLease(newLeaseId, newRetainingSequenceNumber, newSource, listener);
            },
            primary -> {});
}

public void testCanRenewRetentionLeaseUnderBlock() throws InterruptedException {
    final String idForInitialRetentionLease = randomAlphaOfLength(8);
    final long initialRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
    final AtomicReference<RetentionLease> retentionLease = new AtomicReference<>();
    runUnderBlockTest(
            idForInitialRetentionLease,
            initialRetainingSequenceNumber,
            (primary, listener) -> {
                // renew the initial lease under the block with a retaining sequence number at least as large
                final long nextRetainingSequenceNumber = randomLongBetween(initialRetainingSequenceNumber, Long.MAX_VALUE);
                final String nextSource = randomAlphaOfLength(8);
                retentionLease.set(primary.renewRetentionLease(idForInitialRetentionLease, nextRetainingSequenceNumber, nextSource));
                // renewal is synchronous; signal completion so the harness can proceed
                listener.onResponse(new ReplicationResponse());
            },
            primary -> {
                try {
                    /*
                     * If the background renew was able to execute, then the retention leases were persisted to disk. There is no other
                     * way for the current retention leases to end up written to disk so we assume that if they are written to disk, it
                     * implies that the background sync was able to execute under a block.
                     */
                    assertBusy(() -> assertThat(primary.loadRetentionLeases().leases(), contains(retentionLease.get())));
                } catch (final Exception e) {
                    // preserve the original exception and stack trace rather than flattening it to a string
                    throw new AssertionError(e);
                }
            });

}

public void testCanRemoveRetentionLeasesUnderBlock() throws InterruptedException {
    // Removing the only lease while the index is blocked must still succeed.
    final String leaseId = randomAlphaOfLength(8);
    final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
    runUnderBlockTest(
            leaseId,
            retainingSequenceNumber,
            (primary, listener) -> primary.removeRetentionLease(leaseId, listener),
            primary -> {});
}

/**
 * Creates a single-shard index, seeds it with one retention lease, applies a random index
 * block, runs the given retention-lease operation under that block, and finally removes the
 * block again. Fails fast (on the test thread) if either the seeding or the operation fails.
 *
 * @param idForInitialRetentionLease     id of the lease added before the block is applied
 * @param initialRetainingSequenceNumber retaining sequence number of the initial lease
 * @param indexShard                     the operation to execute under the block; must complete the listener
 * @param afterSync                      assertions to run after the operation succeeded
 */
private void runUnderBlockTest(
        final String idForInitialRetentionLease,
        final long initialRetainingSequenceNumber,
        final BiConsumer<IndexShard, ActionListener<ReplicationResponse>> indexShard,
        final Consumer<IndexShard> afterSync) throws InterruptedException {
    final Settings settings = Settings.builder()
            .put("index.number_of_shards", 1)
            .put("index.number_of_replicas", 0)
            .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))
            .build();
    assertAcked(prepareCreate("index").setSettings(settings));
    ensureGreen("index");

    final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId();
    final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName();
    final IndexShard primary = internalCluster()
            .getInstance(IndicesService.class, primaryShardNodeName)
            .getShardOrNull(new ShardId(resolveIndex("index"), 0));

    // seed the shard with an initial retention lease before any block is applied
    final String source = randomAlphaOfLength(8);
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicReference<Exception> addFailure = new AtomicReference<>();
    final ActionListener<ReplicationResponse> listener = ActionListener.wrap(
            r -> latch.countDown(),
            e -> {
                // record the failure and release the latch; calling fail(...) on the listener
                // thread would leave the test hanging on latch.await()
                addFailure.set(e);
                latch.countDown();
            });
    primary.addRetentionLease(idForInitialRetentionLease, initialRetainingSequenceNumber, source, listener);
    latch.await();
    if (addFailure.get() != null) {
        throw new AssertionError("failed to add the initial retention lease", addFailure.get());
    }

    final String block = randomFrom("read_only", "read_only_allow_delete", "read", "write", "metadata");

    client()
            .admin()
            .indices()
            .prepareUpdateSettings("index")
            .setSettings(Settings.builder().put("index.blocks." + block, true).build())
            .get();

    try {
        final CountDownLatch actionLatch = new CountDownLatch(1);
        final AtomicBoolean success = new AtomicBoolean();
        final AtomicReference<Exception> actionFailure = new AtomicReference<>();

        indexShard.accept(
                primary,
                new ActionListener<ReplicationResponse>() {

                    @Override
                    public void onResponse(final ReplicationResponse replicationResponse) {
                        success.set(true);
                        actionLatch.countDown();
                    }

                    @Override
                    public void onFailure(final Exception e) {
                        // record the failure and release the latch so actionLatch.await() cannot hang
                        actionFailure.set(e);
                        actionLatch.countDown();
                    }

                });
        actionLatch.await();
        if (actionFailure.get() != null) {
            throw new AssertionError("retention lease operation failed under block [" + block + "]", actionFailure.get());
        }
        assertTrue(success.get());
        afterSync.accept(primary);
    } finally {
        // always clear the block so subsequent tests see an unblocked index
        client()
                .admin()
                .indices()
                .prepareUpdateSettings("index")
                .setSettings(Settings.builder().putNull("index.blocks." + block).build())
                .get();
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -228,4 +228,31 @@ protected Logger getLogger() {
assertTrue(invoked.get());
}

public void testBlocks() {
    // Build the shard coordinates first, then wire a chain of mocks so the action
    // can resolve index -> index service -> shard.
    final Index index = new Index("index", "uuid");
    final int shardIdValue = randomIntBetween(0, 4);
    final ShardId shardId = new ShardId(index, shardIdValue);

    final IndicesService mockIndicesService = mock(IndicesService.class);
    final IndexService mockIndexService = mock(IndexService.class);
    final IndexShard mockIndexShard = mock(IndexShard.class);

    when(mockIndicesService.indexServiceSafe(index)).thenReturn(mockIndexService);
    when(mockIndexService.getShard(shardIdValue)).thenReturn(mockIndexShard);
    when(mockIndexShard.shardId()).thenReturn(shardId);

    final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction(
            Settings.EMPTY,
            transportService,
            clusterService,
            mockIndicesService,
            threadPool,
            shardStateAction,
            new ActionFilters(Collections.emptySet()),
            new IndexNameExpressionResolver());

    // The sync action must not be stopped by index blocks, so no block level is declared.
    assertNull(action.indexBlockLevel());
}

}