Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce soft-deletes retention policy based on global checkpoint #30335

Merged
merged 4 commits into from
May 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,
IndexSettings.INDEX_GC_DELETES_SETTING,
IndexSettings.INDEX_SOFT_DELETES_SETTING,
IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING,
IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING,
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING,
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
Expand Down
22 changes: 22 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,14 @@ public final class IndexSettings {
*/
public static final Setting<Boolean> INDEX_SOFT_DELETES_SETTING = Setting.boolSetting("index.soft_deletes", true, Property.IndexScope);

/**
* Controls how many soft-deleted documents will be kept around before being merged away. Keeping more deleted
* documents increases the chance of operation-based recoveries and allows querying a longer history of documents.
* If soft-deletes is enabled, an engine by default will retain all operations up to the global checkpoint.
**/
public static final Setting<Long> INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING =
Setting.longSetting("index.soft_deletes.retention.operations", 0, 0, Property.IndexScope, Property.Dynamic);

/**
* The maximum number of refresh listeners allows on this shard.
*/
Expand Down Expand Up @@ -287,6 +295,7 @@ public final class IndexSettings {
private final IndexScopedSettings scopedSettings;
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
private final boolean softDeleteEnabled;
private volatile long softDeleteRetentionOperations;
private volatile boolean warmerEnabled;
private volatile int maxResultWindow;
private volatile int maxInnerResultWindow;
Expand Down Expand Up @@ -398,6 +407,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
softDeleteEnabled = version.onOrAfter(Version.V_7_0_0_alpha1) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING);
softDeleteRetentionOperations = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING);
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
maxResultWindow = scopedSettings.get(MAX_RESULT_WINDOW_SETTING);
maxInnerResultWindow = scopedSettings.get(MAX_INNER_RESULT_WINDOW_SETTING);
Expand Down Expand Up @@ -455,6 +465,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter);
scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength);
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }
Expand Down Expand Up @@ -837,4 +848,15 @@ public boolean isExplicitRefresh() {
public boolean isSoftDeleteEnabled() {
return softDeleteEnabled;
}

private void setSoftDeleteRetentionOperations(long ops) {
this.softDeleteRetentionOperations = ops;
}

/**
* Returns the number of extra operations (i.e. soft-deleted documents) to be kept for recoveries and history purpose.
*/
public long getSoftDeleteRetentionOperations() {
return this.softDeleteRetentionOperations;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
Expand All @@ -34,8 +35,10 @@
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
Expand Down Expand Up @@ -2002,8 +2005,8 @@ private IndexWriterConfig getIndexWriterConfig() {
// background merges
MergePolicy mergePolicy = config().getMergePolicy();
if (softDeleteEnabled) {
// TODO: soft-delete retention policy
iwc.setSoftDeletesField(Lucene.SOFT_DELETE_FIELD);
mergePolicy = new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, this::softDeletesRetentionQuery, mergePolicy);
}
iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy));
iwc.setSimilarity(engineConfig.getSimilarity());
Expand All @@ -2016,6 +2019,20 @@ private IndexWriterConfig getIndexWriterConfig() {
return iwc;
}

/**
* Documents including tombstones are soft-deleted and matched this query will be retained and won't cleaned up by merges.
*/
private Query softDeletesRetentionQuery() {
ensureOpen();
// TODO: We send the safe commit in peer-recovery, thus we need to retain all operations after the local checkpoint of that commit.
final long retainedExtraOps = engineConfig.getIndexSettings().getSoftDeleteRetentionOperations();
// Prefer using the global checkpoint which is persisted on disk than an in-memory value.
// If we failed to fsync checkpoint but already used a higher global checkpoint value to clean up soft-deleted ops,
// then we may not have all required operations whose seq# greater than the global checkpoint after restarted.
final long persistedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, persistedGlobalCheckpoint + 1 - retainedExtraOps, Long.MAX_VALUE);
}

/** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
static final class SearchFactory extends EngineSearcherFactory {
private final Engine.Warmer warmer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.IndexSettingsModule;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;

Expand Down Expand Up @@ -178,6 +179,7 @@
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.isIn;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -251,8 +253,9 @@ public void testVersionMapAfterAutoIDDocument() throws IOException {
}

public void testSegments() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore();
InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get))) {
List<Segment> segments = engine.segments(false);
assertThat(segments.isEmpty(), equalTo(true));
assertThat(engine.segmentsStats(false).getCount(), equalTo(0L));
Expand Down Expand Up @@ -324,6 +327,8 @@ public void testSegments() throws Exception {


engine.delete(new Engine.Delete("test", "1", newUid(doc), primaryTerm.get()));
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
engine.getTranslog().sync();
engine.refresh("test");

segments = engine.segments(false);
Expand Down Expand Up @@ -1279,9 +1284,13 @@ public void testVersioningNewIndex() throws IOException {
assertThat(indexResult.getVersion(), equalTo(1L));
}

public void testForceMerge() throws IOException {
public void testForceMergeWithoutSoftDeletes() throws IOException {
Settings settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build();
IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
try (Store store = createStore();
Engine engine = createEngine(config(defaultSettings, store, createTempDir(),
Engine engine = createEngine(config(IndexSettingsModule.newIndexSettings(indexMetaData), store, createTempDir(),
new LogByteSizeMergePolicy(), null))) { // use log MP here we test some behavior in ESMP
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
Expand Down Expand Up @@ -1322,6 +1331,66 @@ public void testForceMerge() throws IOException {
}
}

public void testForceMergeWithSoftDeletesRetention() throws Exception {
final long retainedExtraOps = randomLongBetween(0, 10);
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), retainedExtraOps);
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final MapperService mapperService = createMapperService("test");
final Set<String> liveDocs = new HashSet<>();
try (Store store = createStore();
Engine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get))) {
int numDocs = scaledRandomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null);
engine.index(indexForDoc(doc));
liveDocs.add(doc.id());
}
for (int i = 0; i < numDocs; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question - do we also want to override documents, rather than just deleting them?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question - do we also want to override documents, rather than just deleting them?

+1. I added updates.

ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null);
if (randomBoolean()) {
engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get()));
liveDocs.remove(doc.id());
}
if (randomBoolean()) {
engine.index(indexForDoc(doc));
liveDocs.add(doc.id());
}
}
long localCheckpoint = engine.getLocalCheckpointTracker().getCheckpoint();
globalCheckpoint.set(randomLongBetween(0, localCheckpoint));
engine.getTranslog().sync();
engine.forceMerge(true, 1, false, false, false);
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine, mapperService)
.stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
for (long seqno = 0; seqno <= localCheckpoint; seqno++) {
long keptIndex = globalCheckpoint.get() + 1 - retainedExtraOps;
String msg = "seq# [" + seqno + "], global checkpoint [" + globalCheckpoint + "], retained-ops [" + retainedExtraOps + "]";
if (seqno < keptIndex) {
Translog.Operation op = ops.get(seqno);
if (op != null) {
assertThat(op, instanceOf(Translog.Index.class));
assertThat(msg, ((Translog.Index) op).id(), isIn(liveDocs));
}
} else {
assertThat(msg, ops.get(seqno), notNullValue());
}
}
settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0);
indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
globalCheckpoint.set(localCheckpoint);
engine.getTranslog().sync();
engine.forceMerge(true, 1, false, false, false);
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocs.size()));
}
}

public void testForceMergeAndClose() throws IOException, InterruptedException {
int numIters = randomIntBetween(2, 10);
for (int j = 0; j < numIters; j++) {
Expand Down Expand Up @@ -2525,14 +2594,16 @@ public void testSkipTranslogReplay() throws IOException {
Engine.IndexResult indexResult = engine.index(firstIndexRequest);
assertThat(indexResult.getVersion(), equalTo(1L));
}
EngineConfig config = engine.config();
assertVisibleCount(engine, numDocs);
engine.close();
trimUnsafeCommits(engine.config());
engine = new InternalEngine(engine.config());
engine.skipTranslogRecovery();
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
assertThat(topDocs.totalHits, equalTo(0L));
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
engine.skipTranslogRecovery();
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
assertThat(topDocs.totalHits, equalTo(0L));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2350,7 +2350,16 @@ public void testDocStats() throws IOException {
deleteDoc(indexShard, "_doc", id);
indexDoc(indexShard, "_doc", id);
}

// Need to update and sync the global checkpoint as the soft-deletes retention MergePolicy depends on it.
if (indexShard.indexSettings.isSoftDeleteEnabled()) {
if (indexShard.routingEntry().primary()) {
indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
indexShard.getLocalCheckpoint());
} else {
indexShard.updateGlobalCheckpointOnReplica(indexShard.getLocalCheckpoint(), "test");
}
indexShard.sync();
}
// flush the buffered deletes
final FlushRequest flushRequest = new FlushRequest();
flushRequest.force(false);
Expand Down Expand Up @@ -2910,6 +2919,7 @@ public void testSegmentMemoryTrackedInBreaker() throws Exception {

// Deleting a doc causes its memory to be freed from the breaker
deleteDoc(primary, "_doc", "0");
primary.sync(); // need to sync global checkpoint as the soft-deletes retention MergePolicy depends on it.
primary.refresh("force refresh");

ss = primary.segmentStats(randomBoolean());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.cache.query.QueryCacheStats;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.IndicesRequestCache;
Expand All @@ -69,6 +71,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -1005,10 +1008,15 @@ private void assertCumulativeQueryCacheStats(IndicesStatsResponse response) {
}

public void testFilterCacheStats() throws Exception {
assertAcked(prepareCreate("index").setSettings(Settings.builder().put(indexSettings()).put("number_of_replicas", 0).build()).get());
indexRandom(true,
Settings settings = Settings.builder().put(indexSettings()).put("number_of_replicas", 0).build();
assertAcked(prepareCreate("index").setSettings(settings).get());
indexRandom(false, true,
client().prepareIndex("index", "type", "1").setSource("foo", "bar"),
client().prepareIndex("index", "type", "2").setSource("foo", "baz"));
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP.
}
refresh();
ensureGreen();

IndicesStatsResponse response = client().admin().indices().prepareStats("index").setQueryCache(true).get();
Expand Down Expand Up @@ -1039,6 +1047,9 @@ public void testFilterCacheStats() throws Exception {

assertEquals(DocWriteResponse.Result.DELETED, client().prepareDelete("index", "type", "1").get().getResult());
assertEquals(DocWriteResponse.Result.DELETED, client().prepareDelete("index", "type", "2").get().getResult());
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP.
}
refresh();
response = client().admin().indices().prepareStats("index").setQueryCache(true).get();
assertCumulativeQueryCacheStats(response);
Expand Down Expand Up @@ -1172,4 +1183,21 @@ public void testConcurrentIndexingAndStatsRequests() throws BrokenBarrierExcepti
assertThat(executionFailures.get(), emptyCollectionOf(Exception.class));
}


/**
* Persist the global checkpoint on all shards of the given index into disk.
* This makes sure that the persisted global checkpoint on those shards will equal to the in-memory value.
*/
private void persistGlobalCheckpoint(String index) throws Exception {
final Set<String> nodes = internalCluster().nodesInclude(index);
for (String node : nodes) {
final IndicesService indexServices = internalCluster().getInstance(IndicesService.class, node);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
indexShard.sync();
assertThat(indexShard.getLastSyncedGlobalCheckpoint(), equalTo(indexShard.getGlobalCheckpoint()));
}
}
}
}
}
Loading