Use soft deletes to maintain doc history (#29549)
Today we can use the soft-deletes feature from Lucene to maintain a
history of a document. This change replaces hard deletes with soft
deletes in the Engine.

Besides marking a document as deleted, we also index a tombstone
associated with that delete operation. Storing delete tombstones gives
us a history of sequence-based operations that can be used in recovery
or rollback.

Relates #29530
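
For context, here is a minimal, self-contained sketch of the Lucene primitive this change builds on (the Lucene 7.4+ softUpdateDocument API; the "_id" and "__soft_deletes" field names and the SoftDeleteSketch class are illustrative, not taken from this diff):

import java.io.IOException;
import java.nio.file.Files;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class SoftDeleteSketch {
    public static void main(String[] args) throws IOException {
        try (Directory dir = FSDirectory.open(Files.createTempDirectory("soft-deletes"))) {
            IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer())
                .setSoftDeletesField("__soft_deletes"); // merges then honor the marker field
            try (IndexWriter writer = new IndexWriter(dir, iwc)) {
                Document doc = new Document();
                doc.add(new StringField("_id", "1", Field.Store.YES));
                writer.addDocument(doc);

                // Instead of a hard delete, index a tombstone and soft-delete the live
                // copy in one atomic call; the tombstone itself is born soft-deleted,
                // mirroring what deleteInLucene below does with doc.add(softDeleteField).
                Document tombstone = new Document();
                tombstone.add(new StringField("_id", "1", Field.Store.YES));
                tombstone.add(new NumericDocValuesField("__soft_deletes", 1));
                writer.softUpdateDocument(new Term("_id", "1"), tombstone,
                    new NumericDocValuesField("__soft_deletes", 1));
                writer.commit();
            }
            // A reader wrapped to honor soft deletes sees no live documents, but both
            // the old document and the tombstone are still physically in the index.
            try (DirectoryReader reader = new SoftDeletesDirectoryReaderWrapper(
                    DirectoryReader.open(dir), "__soft_deletes")) {
                System.out.println("live=" + reader.numDocs() + " total=" + reader.maxDoc());
                // prints: live=0 total=2
            }
        }
    }
}

The wrapper hides soft-deleted documents from search, while recovery code can still read the tombstones (with their sequence numbers) from the underlying reader.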
dnhatn authored Apr 20, 2018
1 parent 4be1488 commit ac84879
Showing 10 changed files with 125 additions and 23 deletions.
EngineConfig.java
@@ -34,6 +34,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
@@ -80,6 +81,7 @@ public final class EngineConfig {
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final LongSupplier primaryTermSupplier;
private final TombstoneDocSupplier tombstoneDocSupplier;

/**
* Index setting to change the low level lucene codec used for writing new segments.
@@ -126,7 +128,8 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier) {
LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier) {
this.shardId = shardId;
this.allocationId = allocationId;
this.indexSettings = indexSettings;
@@ -154,6 +157,7 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
this.circuitBreakerService = circuitBreakerService;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.primaryTermSupplier = primaryTermSupplier;
this.tombstoneDocSupplier = tombstoneDocSupplier;
}

/**
@@ -363,4 +367,17 @@ public CircuitBreakerService getCircuitBreakerService() {
public LongSupplier getPrimaryTermSupplier() {
return primaryTermSupplier;
}

/**
* A supplier that provides tombstone documents to be used in soft-update methods.
* The returned document consists of only the _id, _seq_no, _primary_term and _version fields; other metadata fields are excluded.
*/
@FunctionalInterface
public interface TombstoneDocSupplier {
ParsedDocument newTombstoneDoc(String type, String id);
}

public TombstoneDocSupplier getTombstoneDocSupplier() {
return tombstoneDocSupplier;
}
}
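
Because TombstoneDocSupplier is a @FunctionalInterface, callers can satisfy the new constructor argument with a lambda or method reference; a sketch of the wiring that IndexShard performs later in this diff:

EngineConfig.TombstoneDocSupplier supplier =
    (type, id) -> docMapper(type).getDocumentMapper()
        .createTombstoneDoc(shardId.getIndexName(), type, id);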
InternalEngine.java
@@ -63,6 +63,7 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@@ -1220,7 +1221,17 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
if (plan.currentlyDeleted == false) {
// any exception that comes from this is either an ACE or a fatal exception;
// there can't be any document failures coming from this
indexWriter.deleteDocuments(delete.uid());
if (softDeleteEnabled) {
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newTombstoneDoc(delete.type(), delete.id());
assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
tombstone.updateSeqID(plan.seqNoOfDeletion, delete.primaryTerm());
tombstone.version().setLongValue(plan.versionOfDeletion);
final ParseContext.Document doc = tombstone.docs().get(0);
doc.add(softDeleteField);
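// atomically soft-deletes every previous copy of the doc under this uid and
// indexes the tombstone (itself already flagged soft-deleted above)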
indexWriter.softUpdateDocument(delete.uid(), doc, softDeleteField);
} else {
indexWriter.deleteDocuments(delete.uid());
}
numDocDeletes.inc();
}
versionMap.putUnderLock(delete.uid().bytes(),
DocumentMapper.java
@@ -24,6 +24,7 @@
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
@@ -39,14 +40,15 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static java.util.Collections.emptyMap;
import java.util.stream.Stream;

public class DocumentMapper implements ToXContentFragment {

@@ -122,6 +124,7 @@ public DocumentMapper build(MapperService mapperService) {
private final Map<String, ObjectMapper> objectMappers;

private final boolean hasNestedObjects;
private final MetadataFieldMapper[] tombstoneMetadataFieldMappers;

public DocumentMapper(MapperService mapperService, Mapping mapping) {
this.mapperService = mapperService;
@@ -130,6 +133,10 @@ public DocumentMapper(MapperService mapperService, Mapping mapping) {
final IndexSettings indexSettings = mapperService.getIndexSettings();
this.mapping = mapping;
this.documentParser = new DocumentParser(indexSettings, mapperService.documentMapperParser(), this);
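// a tombstone only needs the metadata fields that identify the delete operation:
// _seq_no, _primary_term, _version and _id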
final Collection<String> tombstoneFields =
Arrays.asList(SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, VersionFieldMapper.NAME, IdFieldMapper.NAME);
this.tombstoneMetadataFieldMappers = Stream.of(mapping.metadataMappers)
.filter(field -> tombstoneFields.contains(field.name())).toArray(MetadataFieldMapper[]::new);

// collect all the mappers for this type
List<ObjectMapper> newObjectMappers = new ArrayList<>();
@@ -241,7 +248,12 @@ public Map<String, ObjectMapper> objectMappers() {
}

public ParsedDocument parse(SourceToParse source) throws MapperParsingException {
return documentParser.parseDocument(source);
return documentParser.parseDocument(source, mapping.metadataMappers);
}

public ParsedDocument createTombstoneDoc(String index, String type, String id) throws MapperParsingException {
final SourceToParse emptySource = SourceToParse.source(index, type, id, new BytesArray("{}"), XContentType.JSON);
return documentParser.parseDocument(emptySource, tombstoneMetadataFieldMappers);
}

/**
DocumentParser.java
@@ -55,7 +55,7 @@ final class DocumentParser {
this.docMapper = docMapper;
}

ParsedDocument parseDocument(SourceToParse source) throws MapperParsingException {
ParsedDocument parseDocument(SourceToParse source, MetadataFieldMapper[] metadataFieldsMappers) throws MapperParsingException {
validateType(source);

final Mapping mapping = docMapper.mapping();
@@ -66,7 +66,7 @@ ParsedDocument parseDocument(SourceToParse source) throws MapperParsingException
LoggingDeprecationHandler.INSTANCE, source.source(), xContentType)) {
context = new ParseContext.InternalParseContext(indexSettings.getSettings(), docMapperParser, docMapper, source, parser);
validateStart(parser);
internalParseDocument(mapping, context, parser);
internalParseDocument(mapping, metadataFieldsMappers, context, parser);
validateEnd(parser);
} catch (Exception e) {
throw wrapInMapperParsingException(source, e);
@@ -81,10 +81,11 @@ ParsedDocument parseDocument(SourceToParse source) throws MapperParsingException
return parsedDocument(source, context, createDynamicUpdate(mapping, docMapper, context.getDynamicMappers()));
}

private static void internalParseDocument(Mapping mapping, ParseContext.InternalParseContext context, XContentParser parser) throws IOException {
private static void internalParseDocument(Mapping mapping, MetadataFieldMapper[] metadataFieldsMappers,
ParseContext.InternalParseContext context, XContentParser parser) throws IOException {
final boolean emptyDoc = isEmptyDoc(mapping, parser);

for (MetadataFieldMapper metadataMapper : mapping.metadataMappers) {
for (MetadataFieldMapper metadataMapper : metadataFieldsMappers) {
metadataMapper.preParse(context);
}

@@ -95,7 +96,7 @@ private static void internalParseDocument(Mapping mapping, ParseContext.Internal
parseObjectOrNested(context, mapping.root);
}

for (MetadataFieldMapper metadataMapper : mapping.metadataMappers) {
for (MetadataFieldMapper metadataMapper : metadataFieldsMappers) {
metadataMapper.postParse(context);
}
}
IndexShard.java
@@ -2158,7 +2158,8 @@ private EngineConfig newEngineConfig() {
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Collections.singletonList(refreshListeners),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm);
indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm,
this::createTombstoneDoc);
}

/**
@@ -2586,4 +2587,8 @@ public void afterRefresh(boolean didRefresh) throws IOException {
refreshMetric.inc(System.nanoTime() - currentRefreshStartTime);
}
}

private ParsedDocument createTombstoneDoc(String type, String id) {
return docMapper(type).getDocumentMapper().createTombstoneDoc(shardId.getIndexName(), type, id);
}
}
InternalEngineTests.java
@@ -119,7 +119,6 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.IndexSettingsModule;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;

@@ -2584,7 +2583,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get);
new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get, EngineTestCase::createTombstoneDoc);
try {
InternalEngine internalEngine = new InternalEngine(brokenConfig);
fail("translog belongs to a different engine");
IndexShardTests.java
@@ -22,6 +22,7 @@
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
@@ -75,8 +76,8 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
@@ -86,8 +87,12 @@
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
@@ -154,6 +159,7 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
@@ -2339,8 +2345,15 @@ public void testDocStats() throws IOException {
assertTrue(searcher.reader().numDocs() <= docStats.getCount());
}
assertThat(docStats.getCount(), equalTo(numDocs));
// Lucene will delete a segment if all docs are deleted from it; this means that we lose the deletes when deleting all docs
assertThat(docStats.getDeleted(), equalTo(numDocsToDelete == numDocs ? 0 : numDocsToDelete));
// Lucene will delete a segment if all docs are deleted from it;
// this means that we lose the deletes when deleting all docs.
// If soft-delete is enabled, each delete op will add a deletion marker.
final long deleteTombstones = indexShard.indexSettings.isSoftDeleteEnabled() ? numDocsToDelete : 0L;
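// e.g. with soft deletes enabled, numDocs == 10 and numDocsToDelete == 4 yields
// 4 soft-deleted docs plus 4 tombstones == 8 reported deletes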
if (numDocsToDelete == numDocs) {
assertThat(docStats.getDeleted(), equalTo(deleteTombstones));
} else {
assertThat(docStats.getDeleted(), equalTo(numDocsToDelete + deleteTombstones));
}
}

// merge them away
@@ -2968,6 +2981,7 @@ public void testSegmentMemoryTrackedWithRandomSearchers() throws Exception {

// Close remaining searchers
IOUtils.close(searchers);
primary.refresh("test");

SegmentsStats ss = primary.segmentStats(randomBoolean());
CircuitBreaker breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING);
@@ -3053,4 +3067,16 @@ public void onShardInactive(IndexShard indexShard) {
closeShards(primary);
}

public void testSupplyTombstoneDoc() throws Exception {
IndexShard shard = newStartedShard();
String id = randomRealisticUnicodeOfLengthBetween(1, 10);
ParsedDocument tombstone = shard.getEngine().config().getTombstoneDocSupplier().newTombstoneDoc("doc", id);
assertThat(tombstone.docs(), hasSize(1));
ParseContext.Document doc = tombstone.docs().get(0);
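// SeqNoFieldMapper.NAME is expected twice: _seq_no is indexed both as a point and as doc values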
assertThat(doc.getFields().stream().map(IndexableField::name).collect(Collectors.toList()),
containsInAnyOrder(SeqNoFieldMapper.NAME, SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME,
IdFieldMapper.NAME, VersionFieldMapper.NAME));
assertThat(doc.getField(IdFieldMapper.NAME).binaryValue(), equalTo(Uid.encodeId(id)));
closeShards(shard);
}
}
RefreshListenersTests.java
@@ -42,6 +42,7 @@
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.fieldvisitor.SingleFieldsVisitor;
import org.elasticsearch.index.mapper.IdFieldMapper;
@@ -130,7 +131,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger),
eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null,
(e, s) -> 0, new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm);
(e, s) -> 0, new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm,
EngineTestCase::createTombstoneDoc);
engine = new InternalEngine(config);
engine.recoverFromTranslog();
listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation);
SharedClusterSnapshotRestoreIT.java
@@ -76,6 +76,7 @@
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
@@ -119,6 +120,7 @@
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.IndexSettings.INDEX_REFRESH_INTERVAL_SETTING;
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.index.shard.IndexShardTests.getEngineFromShard;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -2026,7 +2028,9 @@ public void testSnapshotMoreThanOnce() throws ExecutionException, InterruptedExc
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

// only one shard
assertAcked(prepareCreate("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)));
final Settings indexSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).build();
assertAcked(prepareCreate("test").setSettings(indexSettings));
ensureGreen();
logger.info("--> indexing");

@@ -2072,7 +2076,13 @@ public void testSnapshotMoreThanOnce() throws ExecutionException, InterruptedExc
SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-2").get().getSnapshots().get(0);
List<SnapshotIndexShardStatus> shards = snapshotStatus.getShards();
for (SnapshotIndexShardStatus status : shards) {
assertThat(status.getStats().getProcessedFiles(), equalTo(2)); // we flush before the snapshot such that we have to process the segments_N files plus the .del file
// we flush before the snapshot such that we have to process the segments_N files plus the .del file
if (INDEX_SOFT_DELETES_SETTING.get(indexSettings)) {
// soft-delete generates DV files.
assertThat(status.getStats().getProcessedFiles(), greaterThan(2));
} else {
assertThat(status.getStats().getProcessedFiles(), equalTo(2));
}
}
}
}