Add tombstone document into Lucene for Noop #30226

Merged · 9 commits · May 2, 2018

Changes from all commits
server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

@@ -372,9 +372,16 @@ public LongSupplier getPrimaryTermSupplier() {
      * A supplier supplies tombstone documents which will be used in soft-update methods.
      * The returned document consists of only the _uid, _seqno, _term and _version fields; other metadata fields are excluded.
      */
-    @FunctionalInterface
     public interface TombstoneDocSupplier {
-        ParsedDocument newTombstoneDoc(String type, String id);
+        /**
+         * Creates a tombstone document for a delete operation.
+         */
+        ParsedDocument newDeleteTombstoneDoc(String type, String id);
+
+        /**
+         * Creates a tombstone document for a noop operation.
+         */
+        ParsedDocument newNoopTombstoneDoc();
     }
 
     public TombstoneDocSupplier getTombstoneDocSupplier() {
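Note that with a second abstract method the interface can no longer be a @FunctionalInterface, so callers now implement it explicitly. A minimal sketch of a stub supplier, of the kind a test harness might register — tombstoneDocumentOf(...) is a hypothetical helper, not part of this PR:

    // Sketch only: a stub TombstoneDocSupplier. tombstoneDocumentOf(...) is a
    // hypothetical helper that builds a ParsedDocument carrying only the
    // metadata fields described in the javadoc above.
    EngineConfig.TombstoneDocSupplier supplier = new EngineConfig.TombstoneDocSupplier() {
        @Override
        public ParsedDocument newDeleteTombstoneDoc(String type, String id) {
            return tombstoneDocumentOf(type, id); // delete tombstones keep type and id
        }

        @Override
        public ParsedDocument newNoopTombstoneDoc() {
            return tombstoneDocumentOf(null, null); // noop tombstones carry neither
        }
    };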
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -67,6 +67,7 @@
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.ParseContext;
 import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.merge.OnGoingMerge;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@@ -784,7 +785,9 @@ public IndexResult index(Index index) throws IOException {
                 location = translog.add(new Translog.Index(index, indexResult));
             } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                 // if we have a document failure, record it as a no-op in the translog with the generated seq_no
-                location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage()));
+                final NoOp noOp = new NoOp(indexResult.getSeqNo(), index.primaryTerm(), index.origin(),
+                    index.startTime(), indexResult.getFailure().getMessage());
+                location = innerNoOp(noOp).getTranslogLocation();
             } else {
                 location = null;
             }
@@ -1226,11 +1229,13 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
         throws IOException {
         try {
             if (softDeleteEnabled) {
-                final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newTombstoneDoc(delete.type(), delete.id());
+                final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id());
                 assert tombstone.docs().size() == 1 : "Tombstone doc should have a single doc [" + tombstone + "]";
                 tombstone.updateSeqID(plan.seqNoOfDeletion, delete.primaryTerm());
                 tombstone.version().setLongValue(plan.versionOfDeletion);
                 final ParseContext.Document doc = tombstone.docs().get(0);
+                assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null :
+                    "Delete tombstone document but _tombstone field is not set [" + doc + "]";
                 doc.add(softDeleteField);
                 if (plan.addStaleOpToLucene || plan.currentlyDeleted) {
                     indexWriter.addDocument(doc);
@@ -1334,7 +1339,25 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
         assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED;
         final long seqNo = noOp.seqNo();
         try {
-            final NoOpResult noOpResult = new NoOpResult(noOp.seqNo());
+            Exception failure = null;
+            if (softDeleteEnabled) {
+                try {
+                    final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc();
+                    tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm());
+                    assert tombstone.docs().size() == 1 : "Tombstone should have a single doc [" + tombstone + "]";
+                    final ParseContext.Document doc = tombstone.docs().get(0);
+                    assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null
+                        : "Noop tombstone document but _tombstone field is not set [" + doc + "]";
+                    doc.add(softDeleteField);
+                    indexWriter.addDocument(doc);
+                } catch (Exception ex) {
+                    if (maybeFailEngine("noop", ex)) {
+                        throw ex;
+                    }
+                    failure = ex;
+                }
+            }
+            final NoOpResult noOpResult = failure != null ? new NoOpResult(noOp.seqNo(), failure) : new NoOpResult(noOp.seqNo());
             if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
                 final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
                 noOpResult.setTranslogLocation(location);
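With this change a noop is persisted to Lucene as a soft-deleted tombstone document, so the index alone carries a complete operation history. The sketch below shows one way a reader could classify such documents while scanning a leaf; it is illustrative only — it assumes a LeafReader named leafReader and the field names from SeqNoFieldMapper, and is not the PR's actual history-reading helper:

    // Illustrative sketch (not the PR's helper): classify operations by doc values.
    NumericDocValues seqNoDV = leafReader.getNumericDocValues("_seq_no");
    NumericDocValues tombstoneDV = leafReader.getNumericDocValues("_tombstone");
    for (int docId = 0; docId < leafReader.maxDoc(); docId++) {
        if (seqNoDV == null || seqNoDV.advanceExact(docId) == false) {
            continue; // not an operation-bearing document
        }
        final long seqNo = seqNoDV.longValue();
        final boolean tombstone = tombstoneDV != null && tombstoneDV.advanceExact(docId);
        // a tombstone that still carries an _id is a delete; one without is a noop
    }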
server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java

@@ -124,7 +124,8 @@ public DocumentMapper build(MapperService mapperService) {
     private final Map<String, ObjectMapper> objectMappers;
 
     private final boolean hasNestedObjects;
-    private final MetadataFieldMapper[] tombstoneMetadataFieldMappers;
+    private final MetadataFieldMapper[] deleteTombstoneMetadataFieldMappers;
+    private final MetadataFieldMapper[] noopTombstoneMetadataFieldMappers;
 
     public DocumentMapper(MapperService mapperService, Mapping mapping) {
         this.mapperService = mapperService;
@@ -133,10 +134,6 @@ public DocumentMapper(MapperService mapperService, Mapping mapping) {
         final IndexSettings indexSettings = mapperService.getIndexSettings();
         this.mapping = mapping;
         this.documentParser = new DocumentParser(indexSettings, mapperService.documentMapperParser(), this);
-        final Collection<String> tombstoneFields =
-            Arrays.asList(SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, VersionFieldMapper.NAME, IdFieldMapper.NAME);
-        this.tombstoneMetadataFieldMappers = Stream.of(mapping.metadataMappers)
-            .filter(field -> tombstoneFields.contains(field.name())).toArray(MetadataFieldMapper[]::new);
 
         // collect all the mappers for this type
         List<ObjectMapper> newObjectMappers = new ArrayList<>();

[Review comment — Contributor]: hmm why is this no good?
[Member Author]: I restored it.

@@ -176,6 +173,15 @@ public DocumentMapper(MapperService mapperService, Mapping mapping) {
         } catch (Exception e) {
             throw new ElasticsearchGenerationException("failed to serialize source for type [" + type + "]", e);
         }
+
+        final Collection<String> deleteTombstoneMetadataFields = Arrays.asList(VersionFieldMapper.NAME, IdFieldMapper.NAME,
+            TypeFieldMapper.NAME, SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, SeqNoFieldMapper.TOMBSTONE_NAME);
+        this.deleteTombstoneMetadataFieldMappers = Stream.of(mapping.metadataMappers)
+            .filter(field -> deleteTombstoneMetadataFields.contains(field.name())).toArray(MetadataFieldMapper[]::new);
+        final Collection<String> noopTombstoneMetadataFields =
+            Arrays.asList(SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, SeqNoFieldMapper.TOMBSTONE_NAME);
+        this.noopTombstoneMetadataFieldMappers = Stream.of(mapping.metadataMappers)
+            .filter(field -> noopTombstoneMetadataFields.contains(field.name())).toArray(MetadataFieldMapper[]::new);
     }
 
     public Mapping mapping() {
@@ -251,9 +257,15 @@ public ParsedDocument parse(SourceToParse source) throws MapperParsingException
         return documentParser.parseDocument(source, mapping.metadataMappers);
     }
 
-    public ParsedDocument createTombstoneDoc(String index, String type, String id) throws MapperParsingException {
+    public ParsedDocument createDeleteTombstoneDoc(String index, String type, String id) throws MapperParsingException {
         final SourceToParse emptySource = SourceToParse.source(index, type, id, new BytesArray("{}"), XContentType.JSON);
-        return documentParser.parseDocument(emptySource, tombstoneMetadataFieldMappers);
+        return documentParser.parseDocument(emptySource, deleteTombstoneMetadataFieldMappers).toTombstone();
+    }
+
+    public ParsedDocument createNoopTombstoneDoc(String index) throws MapperParsingException {
+        final String id = ""; // _id won't be used.
+        final SourceToParse emptySource = SourceToParse.source(index, type, id, new BytesArray("{}"), XContentType.JSON);
+        return documentParser.parseDocument(emptySource, noopTombstoneMetadataFieldMappers).toTombstone();
+    }
 
     /**
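The two factories therefore produce documents with different metadata footprints: delete tombstones keep _id, _type and _version so they can be matched against the deleted document, while noop tombstones keep only the sequence-number fields. A hedged usage sketch, assuming a DocumentMapper named mapper for type "test" (names illustrative):

    // Sketch: what each tombstone is expected to carry (per the field lists above).
    ParsedDocument deleteTombstone = mapper.createDeleteTombstoneDoc("index", "test", "1");
    assert deleteTombstone.docs().size() == 1;
    assert deleteTombstone.docs().get(0).getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null;
    assert deleteTombstone.docs().get(0).getField(IdFieldMapper.NAME) != null; // deletes keep the _id

    ParsedDocument noopTombstone = mapper.createNoopTombstoneDoc("index");
    assert noopTombstone.docs().get(0).getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null;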
server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java

@@ -83,6 +83,17 @@ public void updateSeqID(long sequenceNumber, long primaryTerm) {
         this.seqID.primaryTerm.setLongValue(primaryTerm);
     }
 
+    /**
+     * Marks the document being processed as a tombstone document rather than a regular document.
+     * Tombstone documents are stored in the Lucene index to represent delete operations or noops.
+     */
+    ParsedDocument toTombstone() {
+        assert docs().size() == 1 : "Tombstone should have a single doc [" + docs() + "]";
+        this.seqID.tombstoneField.setLongValue(1);
+        rootDoc().add(this.seqID.tombstoneField);
+        return this;
+    }
+
     public String routing() {
         return this.routing;
     }
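At the Lucene level, toTombstone() boils down to adding one extra doc-values field to the root document. A stripped-down sketch of the equivalent raw document, bypassing the mapper plumbing entirely (field names as defined in SeqNoFieldMapper, values illustrative):

    // Sketch: the raw Lucene shape of a noop tombstone, roughly what the
    // mapper plumbing plus toTombstone() produces.
    Document doc = new Document();
    doc.add(new LongPoint("_seq_no", 7));                 // indexed for range queries
    doc.add(new NumericDocValuesField("_seq_no", 7));     // doc values for reads/sorting
    doc.add(new NumericDocValuesField("_primary_term", 1));
    doc.add(new NumericDocValuesField("_tombstone", 1));  // the flag set by toTombstone()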
server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java

@@ -69,26 +69,29 @@ public static class SequenceIDFields {
         public final Field seqNo;
         public final Field seqNoDocValue;
         public final Field primaryTerm;
+        public final Field tombstoneField;

[Review comment — Contributor]: isn't this a softDeleteField?
[Member Author]: No, this is not a softDeletes field. This field is used by a delete-op and a noop only.
[Contributor]: doh. I got confused. Sorry.

 
-        public SequenceIDFields(Field seqNo, Field seqNoDocValue, Field primaryTerm) {
+        public SequenceIDFields(Field seqNo, Field seqNoDocValue, Field primaryTerm, Field tombstoneField) {
             Objects.requireNonNull(seqNo, "sequence number field cannot be null");
             Objects.requireNonNull(seqNoDocValue, "sequence number dv field cannot be null");
             Objects.requireNonNull(primaryTerm, "primary term field cannot be null");
             this.seqNo = seqNo;
             this.seqNoDocValue = seqNoDocValue;
             this.primaryTerm = primaryTerm;
+            this.tombstoneField = tombstoneField;
         }
 
         public static SequenceIDFields emptySeqID() {
             return new SequenceIDFields(new LongPoint(NAME, SequenceNumbers.UNASSIGNED_SEQ_NO),
                 new NumericDocValuesField(NAME, SequenceNumbers.UNASSIGNED_SEQ_NO),
-                new NumericDocValuesField(PRIMARY_TERM_NAME, 0));
+                new NumericDocValuesField(PRIMARY_TERM_NAME, 0), new NumericDocValuesField(TOMBSTONE_NAME, 0));
         }
     }
 
     public static final String NAME = "_seq_no";
     public static final String CONTENT_TYPE = "_seq_no";
     public static final String PRIMARY_TERM_NAME = "_primary_term";
+    public static final String TOMBSTONE_NAME = "_tombstone";

[Review comment — Contributor]: should we use this as the ground truth in the engine as well? we have a constant in Lucene.java too no?
[Member Author]: We don't have the same constant in Lucene or Engine. I am not sure about your suggestion here. Can you elaborate?
[Contributor]: nevermind, I was confused.

 
     public static class SeqNoDefaults {
         public static final String NAME = SeqNoFieldMapper.NAME;
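Because _tombstone is doc-values-only and is added solely to tombstone documents, its mere presence identifies them. A hedged sketch of filtering tombstones out of a search, assuming Lucene 7's DocValuesFieldExistsQuery (this is not code from the PR):

    // Sketch: exclude tombstone documents from a query (Lucene 7+).
    Query isTombstone = new DocValuesFieldExistsQuery(SeqNoFieldMapper.TOMBSTONE_NAME);
    Query regularDocsOnly = new BooleanQuery.Builder()
        .add(new MatchAllDocsQuery(), BooleanClause.Occur.MUST)
        .add(isTombstone, BooleanClause.Occur.MUST_NOT)
        .build();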
20 changes: 16 additions & 4 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -90,12 +90,14 @@
 import org.elasticsearch.index.flush.FlushStats;
 import org.elasticsearch.index.get.GetStats;
 import org.elasticsearch.index.get.ShardGetService;
+import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.DocumentMapperForType;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.MapperParsingException;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.mapper.RootObjectMapper;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.merge.MergeStats;
@@ -2162,8 +2164,7 @@ private EngineConfig newEngineConfig() {
             IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
             Collections.singletonList(refreshListeners),
             Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
-            indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm,
-            this::createTombstoneDoc);
+            indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm, tombstoneDocSupplier());
     }
 
     /**
@@ -2592,7 +2593,18 @@ public void afterRefresh(boolean didRefresh) throws IOException {
         }
     }
 
-    private ParsedDocument createTombstoneDoc(String type, String id) {
-        return docMapper(type).getDocumentMapper().createTombstoneDoc(shardId.getIndexName(), type, id);
+    private EngineConfig.TombstoneDocSupplier tombstoneDocSupplier() {
+        final RootObjectMapper.Builder noopRootMapper = new RootObjectMapper.Builder("__noop");
+        final DocumentMapper noopDocumentMapper = new DocumentMapper.Builder(noopRootMapper, mapperService).build(mapperService);
+        return new EngineConfig.TombstoneDocSupplier() {
+            @Override
+            public ParsedDocument newDeleteTombstoneDoc(String type, String id) {
+                return docMapper(type).getDocumentMapper().createDeleteTombstoneDoc(shardId.getIndexName(), type, id);
+            }
+            @Override
+            public ParsedDocument newNoopTombstoneDoc() {
+                return noopDocumentMapper.createNoopTombstoneDoc(shardId.getIndexName());
+            }
+        };
     }
 }
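Note the design choice here: noop tombstones are parsed with a dedicated, synthetic "__noop" root mapper built once per shard, so they do not depend on any user-defined type, while delete tombstones still go through the type's own document mapper. From the engine's perspective the supplier is simply consumed through the config, roughly along these lines (a sketch, not PR code; cfg stands for the EngineConfig built by newEngineConfig() above):

    // Sketch: how the engine side obtains tombstones via the supplier.
    EngineConfig.TombstoneDocSupplier docs = cfg.getTombstoneDocSupplier();
    ParsedDocument deleteTombstone = docs.newDeleteTombstoneDoc("test", "1"); // used by deleteInLucene
    ParsedDocument noopTombstone = docs.newNoopTombstoneDoc();                // used by innerNoOp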
server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java

@@ -391,6 +391,7 @@ public void testPrimaryReplicaResyncFailed() throws Exception {
                 assertThat(shard.getLocalCheckpoint(), equalTo(numDocs + moreDocs));
             }
         }, 30, TimeUnit.SECONDS);
+        internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
     }
 
 }
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -103,6 +103,7 @@
 import org.elasticsearch.index.mapper.ContentPath;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.Mapper.BuilderContext;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.MetadataFieldMapper;
 import org.elasticsearch.index.mapper.ParseContext;
@@ -173,6 +174,7 @@
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -2603,7 +2605,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
             new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
             IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5),
             config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
-            new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get, EngineTestCase::createTombstoneDoc);
+            new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get, tombstoneDocSupplier());
         try {
             InternalEngine internalEngine = new InternalEngine(brokenConfig);
             fail("translog belongs to a different engine");
@@ -3046,7 +3048,8 @@ public void testDoubleDeliveryReplica() throws IOException {
             TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
             assertEquals(1, topDocs.totalHits);
         }
-        assertThat(getOperationSeqNoInLucene(engine), contains(20L));
+        List<Translog.Operation> ops = readAllOperationsInLucene(engine, createMapperService("test"));
+        assertThat(ops.stream().map(o -> o.seqNo()).collect(Collectors.toList()), hasItem(20L));
     }
 
     public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOException {
@@ -3606,7 +3609,9 @@ public void testNoOps() throws IOException {
                 maxSeqNo,
                 localCheckpoint);
             trimUnsafeCommits(engine.config());
-            noOpEngine = new InternalEngine(engine.config(), supplier) {
+            EngineConfig noopEngineConfig = copy(engine.config(), new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD,
+                () -> new MatchAllDocsQuery(), engine.config().getMergePolicy()));
+            noOpEngine = new InternalEngine(noopEngineConfig, supplier) {
                 @Override
                 protected long doGenerateSeqNoForOperation(Operation operation) {
                     throw new UnsupportedOperationException();
@@ -3636,6 +3641,13 @@ protected long doGenerateSeqNoForOperation(Operation operation) {
             assertThat(noOp.seqNo(), equalTo((long) (maxSeqNo + 2)));
             assertThat(noOp.primaryTerm(), equalTo(primaryTerm.get()));
             assertThat(noOp.reason(), equalTo(reason));
+            MapperService mapperService = createMapperService("test");
+            List<Translog.Operation> operationsFromLucene = readAllOperationsInLucene(noOpEngine, mapperService);
+            assertThat(operationsFromLucene, hasSize(maxSeqNo + 2 - localCheckpoint)); // gap-filling noops plus the two manual noops
+            for (int i = 0; i < operationsFromLucene.size(); i++) {
+                assertThat(operationsFromLucene.get(i), equalTo(new Translog.NoOp(localCheckpoint + 1 + i, primaryTerm.get(), "")));
+            }
+            assertConsistentHistoryBetweenTranslogAndLuceneIndex(noOpEngine, mapperService);
         } finally {
             IOUtils.close(noOpEngine);
         }
@@ -4603,7 +4615,10 @@ private void assertOperationHistoryInLucene(List<Engine.Operation> operations) throws Exception {
                     engine.forceMerge(true);
                 }
             }
-            assertThat(getOperationSeqNoInLucene(engine), containsInAnyOrder(expectedSeqNos.toArray()));
+            MapperService mapperService = createMapperService("test");
+            List<Translog.Operation> actualOps = readAllOperationsInLucene(engine, mapperService);
+            assertThat(actualOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray()));
+            assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
         }
     }
 
server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

@@ -245,7 +245,11 @@ public void testDocumentFailureReplication() throws Exception {
         try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) {
             @Override
             protected EngineFactory getEngineFactory(ShardRouting routing) {
-                return throwingDocumentFailureEngineFactory;
+                if (routing.primary()) {
+                    return throwingDocumentFailureEngineFactory; // simulate exception only on the primary
+                } else {
+                    return InternalEngine::new;
+                }
             }}) {
 
             // test only primary