From df1380b8d3267d70771a640c7bac5a04fb63a8e1 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 17 Jul 2018 21:59:48 -0400 Subject: [PATCH] Remove versionType from translog (#31945) With the introduction of sequence number, we no longer use versionType to resolve out of order collision in replication and recovery requests. This PR removes removes the versionType from translog. We can only remove it in 7.0 because it is still required in a mixed cluster between 6.x and 5.x. --- .../action/bulk/TransportShardBulkAction.java | 5 +- .../org/elasticsearch/index/VersionType.java | 15 ----- .../elasticsearch/index/engine/Engine.java | 2 + .../index/engine/InternalEngine.java | 25 +------- .../elasticsearch/index/shard/IndexShard.java | 23 ++++--- .../index/translog/Translog.java | 61 ++++++++----------- .../index/translog/TranslogWriter.java | 8 ++- .../resync/ResyncReplicationRequestTests.java | 4 +- .../index/engine/InternalEngineTests.java | 30 ++++----- .../RecoveryDuringReplicationTests.java | 1 - .../index/shard/IndexShardTests.java | 27 ++++---- .../index/translog/TranslogTests.java | 25 ++++---- .../indices/recovery/RecoveryTests.java | 10 +-- .../index/engine/EngineTestCase.java | 9 +-- .../index/engine/TranslogHandler.java | 6 +- .../index/shard/IndexShardTestCase.java | 4 +- 16 files changed, 99 insertions(+), 156 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index a78421a2328cb..15a98077eac4a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -523,13 +523,12 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse indexRequest.type(), indexRequest.id(), indexRequest.source(), indexRequest.getContentType()) .routing(indexRequest.routing()); result = replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(), - indexRequest.versionType().versionTypeForReplicationAndRecovery(), indexRequest.getAutoGeneratedTimestamp(), - indexRequest.isRetry(), sourceToParse); + indexRequest.getAutoGeneratedTimestamp(), indexRequest.isRetry(), sourceToParse); break; case DELETE: DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest; result = replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(), - deleteRequest.type(), deleteRequest.id(), deleteRequest.versionType().versionTypeForReplicationAndRecovery()); + deleteRequest.type(), deleteRequest.id()); break; default: throw new IllegalStateException("Unexpected request operation type on replica: " diff --git a/server/src/main/java/org/elasticsearch/index/VersionType.java b/server/src/main/java/org/elasticsearch/index/VersionType.java index b350252dc9c41..eb05ff809076c 100644 --- a/server/src/main/java/org/elasticsearch/index/VersionType.java +++ b/server/src/main/java/org/elasticsearch/index/VersionType.java @@ -85,13 +85,6 @@ public boolean validateVersionForReads(long version) { // not allowing Versions.NOT_FOUND as it is not a valid input value. return version > 0L || version == Versions.MATCH_ANY; } - - @Override - public VersionType versionTypeForReplicationAndRecovery() { - // replicas get the version from the primary after increment. The same version is stored in - // the transaction log. -> the should use the external semantics. - return EXTERNAL; - } }, EXTERNAL((byte) 1) { @Override @@ -333,14 +326,6 @@ public byte getValue() { */ public abstract boolean validateVersionForReads(long version); - /** - * Some version types require different semantics for primary and replicas. This version allows - * the type to override the default behavior. - */ - public VersionType versionTypeForReplicationAndRecovery() { - return this; - } - public static VersionType fromString(String versionType) { if ("internal".equals(versionType)) { return INTERNAL; diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 8a560e02fe449..53a7baa60f6ca 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1168,6 +1168,7 @@ public static class Index extends Operation { public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin, long startTime, long autoGeneratedIdTimestamp, boolean isRetry) { super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); + assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin; this.doc = doc; this.isRetry = isRetry; this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp; @@ -1245,6 +1246,7 @@ public static class Delete extends Operation { public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin, long startTime) { super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); + assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin; this.type = Objects.requireNonNull(type); this.id = Objects.requireNonNull(id); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 92c64d415ad0b..bdcfb2fc7313f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -691,7 +691,7 @@ private boolean canOptimizeAddDocument(Index index) { return true; case PEER_RECOVERY: case REPLICA: - assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL + assert index.version() == 1 && index.versionType() == null : "version: " + index.version() + " type: " + index.versionType(); return true; case LOCAL_TRANSLOG_RECOVERY: @@ -704,20 +704,6 @@ private boolean canOptimizeAddDocument(Index index) { return false; } - private boolean assertVersionType(final Engine.Operation operation) { - if (operation.origin() == Operation.Origin.REPLICA || - operation.origin() == Operation.Origin.PEER_RECOVERY || - operation.origin() == Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { - // ensure that replica operation has expected version type for replication - // ensure that versionTypeForReplicationAndRecovery is idempotent - assert operation.versionType() == operation.versionType().versionTypeForReplicationAndRecovery() - : "unexpected version type in request from [" + operation.origin().name() + "] " + - "found [" + operation.versionType().name() + "] " + - "expected [" + operation.versionType().versionTypeForReplicationAndRecovery().name() + "]"; - } - return true; - } - private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) { if (origin == Operation.Origin.PRIMARY) { assert assertOriginPrimarySequenceNumber(seqNo); @@ -757,7 +743,6 @@ public IndexResult index(Index index) throws IOException { try (ReleasableLock releasableLock = readLock.acquire()) { ensureOpen(); assert assertIncomingSequenceNumber(index.origin(), index.seqNo()); - assert assertVersionType(index); try (Releasable ignored = versionMap.acquireLock(index.uid().bytes()); Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) { lastWriteNanos = index.startTime(); @@ -860,9 +845,6 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]"; } versionMap.enforceSafeAccess(); - // drop out of order operations - assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() : - "resolving out of order delivery based on versioning but version type isn't fit for it. got [" + index.versionType() + "]"; // unlike the primary, replicas don't really care to about creation status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return false for the created flag in favor of code simplicity @@ -1096,7 +1078,6 @@ private void updateDocs(final Term uid, final List docs, public DeleteResult delete(Delete delete) throws IOException { versionMap.enforceSafeAccess(); assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field(); - assert assertVersionType(delete); assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo()); final DeleteResult deleteResult; // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments: @@ -1149,10 +1130,6 @@ public DeleteResult delete(Delete delete) throws IOException { private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException { assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); - // drop out of order operations - assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() : - "resolving out of order delivery based on versioning but version type isn't fit for it. got [" - + delete.versionType() + "]"; maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr)); assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" + "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]"; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 5bd8f9abc6e04..b07e22875e81f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -645,22 +645,22 @@ private IndexShardState changeState(IndexShardState newState, String reason) { public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse, long autoGeneratedTimestamp, boolean isRetry) throws IOException { + assert versionType.validateVersionForWrites(version); return applyIndexOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, versionType, autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse); } - public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, VersionType versionType, - long autoGeneratedTimeStamp, boolean isRetry, SourceToParse sourceToParse) + public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp, + boolean isRetry, SourceToParse sourceToParse) throws IOException { - return applyIndexOperation(seqNo, primaryTerm, version, versionType, autoGeneratedTimeStamp, isRetry, + return applyIndexOperation(seqNo, primaryTerm, version, null, autoGeneratedTimeStamp, isRetry, Engine.Operation.Origin.REPLICA, sourceToParse); } - private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, VersionType versionType, + private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, @Nullable VersionType versionType, long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin, SourceToParse sourceToParse) throws IOException { assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]"; - assert versionType.validateVersionForWrites(version); ensureWriteAllowed(origin); Engine.Index operation; try { @@ -736,19 +736,18 @@ private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) { public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType) throws IOException { + assert versionType.validateVersionForWrites(version); return applyDeleteOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, type, id, versionType, Engine.Operation.Origin.PRIMARY); } - public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id, - VersionType versionType) throws IOException { - return applyDeleteOperation(seqNo, primaryTerm, version, type, id, versionType, Engine.Operation.Origin.REPLICA); + public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException { + return applyDeleteOperation(seqNo, primaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA); } private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id, - VersionType versionType, Engine.Operation.Origin origin) throws IOException { + @Nullable VersionType versionType, Engine.Operation.Origin origin) throws IOException { assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]"; - assert versionType.validateVersionForWrites(version); ensureWriteAllowed(origin); // When there is a single type, the unique identifier is only composed of the _id, // so there is no way to differenciate foo#1 from bar#1. This is especially an issue @@ -1211,14 +1210,14 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all // autoGeneratedID docs that are coming from the primary are updated correctly. result = applyIndexOperation(index.seqNo(), index.primaryTerm(), index.version(), - index.versionType().versionTypeForReplicationAndRecovery(), index.getAutoGeneratedIdTimestamp(), true, origin, + null, index.getAutoGeneratedIdTimestamp(), true, origin, source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentHelper.xContentType(index.source())).routing(index.routing())); break; case DELETE: final Translog.Delete delete = (Translog.Delete) operation; result = applyDeleteOperation(delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(), - delete.versionType().versionTypeForReplicationAndRecovery(), origin); + null, origin); break; case NO_OP: final Translog.NoOp noOp = (Translog.NoOp) operation; diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 63055d933e43e..31404b7874a92 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1011,7 +1011,8 @@ public static class Index implements Operation { public static final int FORMAT_6_0 = 8; // since 6.0.0 public static final int FORMAT_NO_PARENT = FORMAT_6_0 + 1; // since 7.0 - public static final int SERIALIZATION_FORMAT = FORMAT_NO_PARENT; + public static final int FORMAT_NO_VERSION_TYPE = FORMAT_NO_PARENT + 1; + public static final int SERIALIZATION_FORMAT = FORMAT_NO_VERSION_TYPE; private final String id; private final long autoGeneratedIdTimestamp; @@ -1019,7 +1020,6 @@ public static class Index implements Operation { private final long seqNo; private final long primaryTerm; private final long version; - private final VersionType versionType; private final BytesReference source; private final String routing; @@ -1034,8 +1034,9 @@ private Index(final StreamInput in) throws IOException { in.readOptionalString(); // _parent } this.version = in.readLong(); - this.versionType = VersionType.fromValue(in.readByte()); - assert versionType.validateVersionForWrites(this.version) : "invalid version for writes: " + this.version; + if (format < FORMAT_NO_VERSION_TYPE) { + in.readByte(); // _version_type + } this.autoGeneratedIdTimestamp = in.readLong(); seqNo = in.readLong(); primaryTerm = in.readLong(); @@ -1049,15 +1050,14 @@ public Index(Engine.Index index, Engine.IndexResult indexResult) { this.seqNo = indexResult.getSeqNo(); this.primaryTerm = index.primaryTerm(); this.version = indexResult.getVersion(); - this.versionType = index.versionType(); this.autoGeneratedIdTimestamp = index.getAutoGeneratedIdTimestamp(); } public Index(String type, String id, long seqNo, long primaryTerm, byte[] source) { - this(type, id, seqNo, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL, source, null, -1); + this(type, id, seqNo, primaryTerm, Versions.MATCH_ANY, source, null, -1); } - public Index(String type, String id, long seqNo, long primaryTerm, long version, VersionType versionType, + public Index(String type, String id, long seqNo, long primaryTerm, long version, byte[] source, String routing, long autoGeneratedIdTimestamp) { this.type = type; this.id = id; @@ -1065,7 +1065,6 @@ public Index(String type, String id, long seqNo, long primaryTerm, long version, this.seqNo = seqNo; this.primaryTerm = primaryTerm; this.version = version; - this.versionType = versionType; this.routing = routing; this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp; } @@ -1110,24 +1109,22 @@ public long version() { return this.version; } - public VersionType versionType() { - return versionType; - } - @Override public Source getSource() { return new Source(source, routing); } private void write(final StreamOutput out) throws IOException { - out.writeVInt(SERIALIZATION_FORMAT); + final int format = out.getVersion().onOrAfter(Version.V_7_0_0_alpha1) ? SERIALIZATION_FORMAT : FORMAT_6_0; + out.writeVInt(format); out.writeString(id); out.writeString(type); out.writeBytesReference(source); out.writeOptionalString(routing); out.writeLong(version); - - out.writeByte(versionType.getValue()); + if (format < FORMAT_NO_VERSION_TYPE) { + out.writeByte(VersionType.EXTERNAL.getValue()); + } out.writeLong(autoGeneratedIdTimestamp); out.writeLong(seqNo); out.writeLong(primaryTerm); @@ -1149,7 +1146,6 @@ public boolean equals(Object o) { primaryTerm != index.primaryTerm || id.equals(index.id) == false || type.equals(index.type) == false || - versionType != index.versionType || autoGeneratedIdTimestamp != index.autoGeneratedIdTimestamp || source.equals(index.source) == false) { return false; @@ -1168,7 +1164,6 @@ public int hashCode() { result = 31 * result + Long.hashCode(seqNo); result = 31 * result + Long.hashCode(primaryTerm); result = 31 * result + Long.hashCode(version); - result = 31 * result + versionType.hashCode(); result = 31 * result + source.hashCode(); result = 31 * result + (routing != null ? routing.hashCode() : 0); result = 31 * result + Long.hashCode(autoGeneratedIdTimestamp); @@ -1194,14 +1189,15 @@ public long getAutoGeneratedIdTimestamp() { public static class Delete implements Operation { private static final int FORMAT_6_0 = 4; // 6.0 - * - public static final int SERIALIZATION_FORMAT = FORMAT_6_0; + public static final int FORMAT_NO_PARENT = FORMAT_6_0 + 1; // since 7.0 + public static final int FORMAT_NO_VERSION_TYPE = FORMAT_NO_PARENT + 1; + public static final int SERIALIZATION_FORMAT = FORMAT_NO_VERSION_TYPE; private final String type, id; private final Term uid; private final long seqNo; private final long primaryTerm; private final long version; - private final VersionType versionType; private Delete(final StreamInput in) throws IOException { final int format = in.readVInt();// SERIALIZATION_FORMAT @@ -1210,29 +1206,29 @@ private Delete(final StreamInput in) throws IOException { id = in.readString(); uid = new Term(in.readString(), in.readBytesRef()); this.version = in.readLong(); - this.versionType = VersionType.fromValue(in.readByte()); - assert versionType.validateVersionForWrites(this.version); + if (format < FORMAT_NO_VERSION_TYPE) { + in.readByte(); // versionType + } seqNo = in.readLong(); primaryTerm = in.readLong(); } public Delete(Engine.Delete delete, Engine.DeleteResult deleteResult) { - this(delete.type(), delete.id(), delete.uid(), deleteResult.getSeqNo(), delete.primaryTerm(), deleteResult.getVersion(), delete.versionType()); + this(delete.type(), delete.id(), delete.uid(), deleteResult.getSeqNo(), delete.primaryTerm(), deleteResult.getVersion()); } /** utility for testing */ public Delete(String type, String id, long seqNo, long primaryTerm, Term uid) { - this(type, id, uid, seqNo, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL); + this(type, id, uid, seqNo, primaryTerm, Versions.MATCH_ANY); } - public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType) { + public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version) { this.type = Objects.requireNonNull(type); this.id = Objects.requireNonNull(id); this.uid = uid; this.seqNo = seqNo; this.primaryTerm = primaryTerm; this.version = version; - this.versionType = versionType; } @Override @@ -1271,23 +1267,22 @@ public long version() { return this.version; } - public VersionType versionType() { - return this.versionType; - } - @Override public Source getSource() { throw new IllegalStateException("trying to read doc source from delete operation"); } private void write(final StreamOutput out) throws IOException { - out.writeVInt(SERIALIZATION_FORMAT); + final int format = out.getVersion().onOrAfter(Version.V_7_0_0_alpha1) ? SERIALIZATION_FORMAT : FORMAT_6_0; + out.writeVInt(format); out.writeString(type); out.writeString(id); out.writeString(uid.field()); out.writeBytesRef(uid.bytes()); out.writeLong(version); - out.writeByte(versionType.getValue()); + if (format < FORMAT_NO_VERSION_TYPE) { + out.writeByte(VersionType.EXTERNAL.getValue()); + } out.writeLong(seqNo); out.writeLong(primaryTerm); } @@ -1306,8 +1301,7 @@ public boolean equals(Object o) { return version == delete.version && seqNo == delete.seqNo && primaryTerm == delete.primaryTerm && - uid.equals(delete.uid) && - versionType == delete.versionType; + uid.equals(delete.uid); } @Override @@ -1316,7 +1310,6 @@ public int hashCode() { result = 31 * result + Long.hashCode(seqNo); result = 31 * result + Long.hashCode(primaryTerm); result = 31 * result + Long.hashCode(version); - result = 31 * result + versionType.hashCode(); return result; } diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index b89b21c52588a..c135facc67f5f 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -202,9 +202,11 @@ private synchronized boolean assertNoSeqNumberConflict(long seqNo, BytesReferenc if (previous.v1().equals(data) == false) { Translog.Operation newOp = Translog.readOperation(new BufferedChecksumStreamInput(data.streamInput())); Translog.Operation prvOp = Translog.readOperation(new BufferedChecksumStreamInput(previous.v1().streamInput())); - throw new AssertionError( - "seqNo [" + seqNo + "] was processed twice in generation [" + generation + "], with different data. " + - "prvOp [" + prvOp + "], newOp [" + newOp + "]", previous.v2()); + if (newOp.equals(prvOp) == false) { + throw new AssertionError( + "seqNo [" + seqNo + "] was processed twice in generation [" + generation + "], with different data. " + + "prvOp [" + prvOp + "], newOp [" + newOp + "]", previous.v2()); + } } } else { seenSequenceNumbers.put(seqNo, diff --git a/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java b/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java index 914c2b87422db..15b8e1c99d266 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java @@ -21,9 +21,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.index.Index; -import org.elasticsearch.index.VersionType; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.ESTestCase; @@ -38,7 +36,7 @@ public class ResyncReplicationRequestTests extends ESTestCase { public void testSerialization() throws IOException { final byte[] bytes = "{}".getBytes(Charset.forName("UTF-8")); final Translog.Index index = new Translog.Index("type", "id", 0, randomNonNegativeLong(), - Versions.MATCH_ANY, VersionType.INTERNAL, bytes, null, -1); + randomNonNegativeLong(), bytes, null, -1); final ShardId shardId = new ShardId(new Index("index", "uuid"), 0); final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, new Translog.Operation[]{index}); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 2e89a66805ce1..87b63dfdef832 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1183,7 +1183,7 @@ public void testVersioningNewCreate() throws IOException { assertThat(indexResult.getVersion(), equalTo(1L)); create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(), - create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + null, REPLICA, 0, -1, false); indexResult = replicaEngine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); } @@ -1197,7 +1197,7 @@ public void testReplicatedVersioningWithFlush() throws IOException { create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(), - create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + null, REPLICA, 0, -1, false); indexResult = replicaEngine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); assertTrue(indexResult.isCreated()); @@ -1216,7 +1216,7 @@ public void testReplicatedVersioningWithFlush() throws IOException { update = new Engine.Index(newUid(doc), doc, updateResult.getSeqNo(), update.primaryTerm(), updateResult.getVersion(), - update.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + null, REPLICA, 0, -1, false); updateResult = replicaEngine.index(update); assertThat(updateResult.getVersion(), equalTo(2L)); assertFalse(updateResult.isCreated()); @@ -1269,7 +1269,7 @@ public void testVersioningNewIndex() throws IOException { Engine.IndexResult indexResult = engine.index(index); assertThat(indexResult.getVersion(), equalTo(1L)); - index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), null, REPLICA, 0, -1, false); indexResult = replicaEngine.index(index); assertThat(indexResult.getVersion(), equalTo(1L)); } @@ -1418,7 +1418,7 @@ protected List generateSingleDocHistory(boolean forReplica, Ve forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, - forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, + forReplica ? null : versionType, forReplica ? REPLICA : PRIMARY, System.currentTimeMillis(), -1, false ); @@ -1427,7 +1427,7 @@ protected List generateSingleDocHistory(boolean forReplica, Ve forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, - forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, + forReplica ? null : versionType, forReplica ? REPLICA : PRIMARY, System.currentTimeMillis()); } @@ -3221,7 +3221,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep Engine.IndexResult indexResult = engine.index(index); assertThat(indexResult.getVersion(), equalTo(1L)); - index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); + index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); indexResult = replicaEngine.index(index); assertThat(indexResult.getVersion(), equalTo(1L)); @@ -3235,7 +3235,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep assertEquals(1, topDocs.totalHits); } - index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); + index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); indexResult = replicaEngine.index(index); assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.SUCCESS)); replicaEngine.refresh("test"); @@ -3255,7 +3255,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs() Engine.IndexResult result = engine.index(firstIndexRequest); assertThat(result.getVersion(), equalTo(1L)); - Engine.Index firstIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), firstIndexRequest.primaryTerm(), result.getVersion(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); + Engine.Index firstIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), firstIndexRequest.primaryTerm(), result.getVersion(), null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); Engine.IndexResult indexReplicaResult = replicaEngine.index(firstIndexRequestReplica); assertThat(indexReplicaResult.getVersion(), equalTo(1L)); @@ -3269,7 +3269,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs() assertEquals(1, topDocs.totalHits); } - Engine.Index secondIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), secondIndexRequest.primaryTerm(), result.getVersion(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); + Engine.Index secondIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), secondIndexRequest.primaryTerm(), result.getVersion(), null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); replicaEngine.index(secondIndexRequestReplica); replicaEngine.refresh("test"); try (Engine.Searcher searcher = replicaEngine.acquireSearcher("test")) { @@ -3292,7 +3292,7 @@ public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final l } public Engine.Index appendOnlyReplica(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, final long seqNo) { - return new Engine.Index(newUid(doc), doc, seqNo, 2, 1, VersionType.EXTERNAL, + return new Engine.Index(newUid(doc), doc, seqNo, 2, 1, null, Engine.Operation.Origin.REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, retry); } @@ -3694,7 +3694,7 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOExceptio sequenceNumberSupplier.getAsLong(), 1, i, - VersionType.EXTERNAL, + origin == PRIMARY ? VersionType.EXTERNAL : null, origin, System.nanoTime(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, @@ -3708,7 +3708,7 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOExceptio sequenceNumberSupplier.getAsLong(), 1, i, - VersionType.EXTERNAL, + origin == PRIMARY ? VersionType.EXTERNAL : null, origin, System.nanoTime()); operations.add(delete); @@ -3928,7 +3928,7 @@ public void markSeqNoAsCompleted(long seqNo) { final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); final Term uid = newUid(doc); final long time = System.nanoTime(); - actualEngine.index(new Engine.Index(uid, doc, seqNo, 1, 1, VersionType.EXTERNAL, REPLICA, time, time, false)); + actualEngine.index(new Engine.Index(uid, doc, seqNo, 1, 1, null, REPLICA, time, time, false)); if (rarely()) { actualEngine.rollTranslogGeneration(); } @@ -4686,7 +4686,7 @@ public void testTrimUnsafeCommits() throws Exception { for (int i = 0; i < seqNos.size(); i++) { ParsedDocument doc = testParsedDocument(Long.toString(seqNos.get(i)), null, testDocument(), new BytesArray("{}"), null); Engine.Index index = new Engine.Index(newUid(doc), doc, seqNos.get(i), 0, - 1, VersionType.EXTERNAL, REPLICA, System.nanoTime(), -1, false); + 1, null, REPLICA, System.nanoTime(), -1, false); engine.index(index); if (randomBoolean()) { engine.flush(); diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index ee97ba14fe09e..f01d5e54a2e16 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -186,7 +186,6 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception { remainingReplica.applyIndexOperationOnReplica( remainingReplica.getLocalCheckpoint() + 1, 1, - VersionType.EXTERNAL, randomNonNegativeLong(), false, SourceToParse.source("index", "type", "replica", new BytesArray("{}"), XContentType.JSON)); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index ac52378fc6b9d..2e07ec950a572 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -72,7 +72,6 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; @@ -1545,17 +1544,17 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { * - If flush and then recover from the existing store, delete #1 will be removed while index #0 is still retained and replayed. */ final IndexShard shard = newStartedShard(false); - shard.applyDeleteOperationOnReplica(1, 2, "_doc", "id", VersionType.EXTERNAL); + shard.applyDeleteOperationOnReplica(1, 2, "_doc", "id"); shard.getEngine().rollTranslogGeneration(); // isolate the delete in it's own generation - shard.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + shard.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(shard.shardId().getIndexName(), "_doc", "id", new BytesArray("{}"), XContentType.JSON)); - shard.applyIndexOperationOnReplica(3, 3, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + shard.applyIndexOperationOnReplica(3, 3, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(shard.shardId().getIndexName(), "_doc", "id-3", new BytesArray("{}"), XContentType.JSON)); // Flushing a new commit with local checkpoint=1 allows to skip the translog gen #1 in recovery. shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); - shard.applyIndexOperationOnReplica(2, 3, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + shard.applyIndexOperationOnReplica(2, 3, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(shard.shardId().getIndexName(), "_doc", "id-2", new BytesArray("{}"), XContentType.JSON)); - shard.applyIndexOperationOnReplica(5, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + shard.applyIndexOperationOnReplica(5, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(shard.shardId().getIndexName(), "_doc", "id-5", new BytesArray("{}"), XContentType.JSON)); final int translogOps; @@ -1646,8 +1645,7 @@ public void testRecoverFromStoreWithNoOps() throws IOException { updateMappings(otherShard, shard.indexSettings().getIndexMetaData()); SourceToParse sourceToParse = SourceToParse.source(shard.shardId().getIndexName(), "_doc", "1", new BytesArray("{}"), XContentType.JSON); - otherShard.applyIndexOperationOnReplica(1, 1, - VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse); + otherShard.applyIndexOperationOnReplica(1, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse); final ShardRouting primaryShardRouting = shard.routingEntry(); IndexShard newShard = reinitShard(otherShard, ShardRoutingHelper.initWithSameId(primaryShardRouting, @@ -1763,18 +1761,18 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { final IndexShard shard = newStartedShard(false); final String indexName = shard.shardId().getIndexName(); // Index #0, index #1 - shard.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + shard.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "_doc", "doc-0", new BytesArray("{}"), XContentType.JSON)); flushShard(shard); shard.updateGlobalCheckpointOnReplica(0, "test"); // stick the global checkpoint here. - shard.applyIndexOperationOnReplica(1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + shard.applyIndexOperationOnReplica(1, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "_doc", "doc-1", new BytesArray("{}"), XContentType.JSON)); flushShard(shard); assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1")); // Simulate resync (without rollback): Noop #1, index #2 acquireReplicaOperationPermitBlockingly(shard, shard.primaryTerm + 1); shard.markSeqNoAsNoop(1, "test"); - shard.applyIndexOperationOnReplica(2, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + shard.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "_doc", "doc-2", new BytesArray("{}"), XContentType.JSON)); flushShard(shard); assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1", "doc-2")); @@ -2104,11 +2102,11 @@ public void testRecoverFromTranslog() throws IOException { int numCorruptEntries = 0; for (int i = 0; i < numTotalEntries; i++) { if (randomBoolean()) { - operations.add(new Translog.Index("_doc", "1", 0, primary.getPrimaryTerm(), 1, VersionType.INTERNAL, + operations.add(new Translog.Index("_doc", "1", 0, primary.getPrimaryTerm(), 1, "{\"foo\" : \"bar\"}".getBytes(Charset.forName("UTF-8")), null, -1)); } else { // corrupt entry - operations.add(new Translog.Index("_doc", "2", 1, primary.getPrimaryTerm(), 1, VersionType.INTERNAL, + operations.add(new Translog.Index("_doc", "2", 1, primary.getPrimaryTerm(), 1, "{\"foo\" : \"bar}".getBytes(Charset.forName("UTF-8")), null, -1)); numCorruptEntries++; } @@ -2603,8 +2601,7 @@ private Result indexOnReplicaWithGaps( final String id = Integer.toString(i); SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "_doc", id, new BytesArray("{}"), XContentType.JSON); - indexShard.applyIndexOperationOnReplica(i, - 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse); + indexShard.applyIndexOperationOnReplica(i, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse); if (!gap && i == localCheckpoint + 1) { localCheckpoint++; } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index dbbb38090bc3b..b255238c8648c 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -416,9 +416,9 @@ public void testStats() throws IOException { { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(1)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(163L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(162L)); assertThat(stats.getUncommittedOperations(), equalTo(1)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(163L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(162L)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); } @@ -426,9 +426,9 @@ public void testStats() throws IOException { { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(2)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(212L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(210L)); assertThat(stats.getUncommittedOperations(), equalTo(2)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(212L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(210L)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); } @@ -436,9 +436,9 @@ public void testStats() throws IOException { { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(3)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(261L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(258L)); assertThat(stats.getUncommittedOperations(), equalTo(3)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(261L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(258L)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); } @@ -446,13 +446,13 @@ public void testStats() throws IOException { { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(303L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(300L)); assertThat(stats.getUncommittedOperations(), equalTo(4)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(303L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(300L)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); } - final long expectedSizeInBytes = 358L; + final long expectedSizeInBytes = 355L; translog.rollGeneration(); { final TranslogStats stats = stats(); @@ -725,14 +725,12 @@ public void testConcurrentWritesWithVaryingSize() throws Throwable { assertEquals(expIndexOp.type(), indexOp.type()); assertEquals(expIndexOp.source(), indexOp.source()); assertEquals(expIndexOp.version(), indexOp.version()); - assertEquals(expIndexOp.versionType(), indexOp.versionType()); break; case DELETE: Translog.Delete delOp = (Translog.Delete) op; Translog.Delete expDelOp = (Translog.Delete) expectedOp; assertEquals(expDelOp.uid(), delOp.uid()); assertEquals(expDelOp.version(), delOp.version()); - assertEquals(expDelOp.versionType(), delOp.versionType()); break; case NO_OP: final Translog.NoOp noOp = (Translog.NoOp) op; @@ -1478,7 +1476,7 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { try (Translog ignored = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) { fail("corrupted"); } catch (IllegalStateException ex) { - assertEquals("Checkpoint file translog-3.ckp already exists but has corrupted content expected: Checkpoint{offset=3080, " + + assertEquals("Checkpoint file translog-3.ckp already exists but has corrupted content expected: Checkpoint{offset=3025, " + "numOps=55, generation=3, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-1, minTranslogGeneration=1, trimmedAboveSeqNo=-2} but got: Checkpoint{offset=0, numOps=0, " + "generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-1, minTranslogGeneration=0, trimmedAboveSeqNo=-2}", ex.getMessage()); } @@ -1842,8 +1840,7 @@ public void run() { new Term("_uid", threadId + "_" + opCount), seqNoGenerator.getAndIncrement(), primaryTerm.get(), - 1 + randomInt(100000), - randomFrom(VersionType.values())); + 1 + randomInt(100000)); break; case NO_OP: op = new Translog.NoOp(seqNoGenerator.getAndIncrement(), primaryTerm.get(), randomAlphaOfLength(16)); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 537409f35d175..e7606328c7665 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -122,22 +122,22 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception { final String indexName = orgReplica.shardId().getIndexName(); // delete #1 - orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id", VersionType.EXTERNAL); + orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id"); getTranslog(orgReplica).rollGeneration(); // isolate the delete in it's own generation // index #0 - orgReplica.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + orgReplica.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON)); // index #3 - orgReplica.applyIndexOperationOnReplica(3, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + orgReplica.applyIndexOperationOnReplica(3, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "type", "id-3", new BytesArray("{}"), XContentType.JSON)); // Flushing a new commit with local checkpoint=1 allows to delete the translog gen #1. orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true)); // index #2 - orgReplica.applyIndexOperationOnReplica(2, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + orgReplica.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "type", "id-2", new BytesArray("{}"), XContentType.JSON)); orgReplica.updateGlobalCheckpointOnReplica(3L, "test"); // index #5 -> force NoOp #4. - orgReplica.applyIndexOperationOnReplica(5, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + orgReplica.applyIndexOperationOnReplica(5, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "type", "id-5", new BytesArray("{}"), XContentType.JSON)); final int translogOps; diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index a23e29b0bcd6b..f26522245493f 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -52,7 +52,6 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.Mapping; @@ -493,14 +492,12 @@ protected Engine.Index indexForDoc(ParsedDocument doc) { protected Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo, boolean isRetry) { - return new Engine.Index(newUid(doc), doc, seqNo, primaryTerm.get(), version, VersionType.EXTERNAL, - Engine.Operation.Origin.REPLICA, System.nanoTime(), - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry); + return new Engine.Index(newUid(doc), doc, seqNo, primaryTerm.get(), version, null, Engine.Operation.Origin.REPLICA, + System.nanoTime(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry); } protected Engine.Delete replicaDeleteForDoc(String id, long version, long seqNo, long startTime) { - return new Engine.Delete("test", id, newUid(id), seqNo, 1, version, VersionType.EXTERNAL, - Engine.Operation.Origin.REPLICA, startTime); + return new Engine.Delete("test", id, newUid(id), seqNo, 1, version, null, Engine.Operation.Origin.REPLICA, startTime); } /** diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java index 53fe89ac17ea5..9999a3b3748f1 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java @@ -124,14 +124,12 @@ private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine. source(indexName, index.type(), index.id(), index.source(), XContentHelper.xContentType(index.source())) .routing(index.routing()), index.seqNo(), index.primaryTerm(), - index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, - index.getAutoGeneratedIdTimestamp(), true); + index.version(), null, origin, index.getAutoGeneratedIdTimestamp(), true); return engineIndex; case DELETE: final Translog.Delete delete = (Translog.Delete) operation; final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), - delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(), - origin, System.nanoTime()); + delete.primaryTerm(), delete.version(), null, origin, System.nanoTime()); return engineDelete; case NO_OP: final Translog.NoOp noOp = (Translog.NoOp) operation; diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 5a8e91841c5a7..e4849be20e16e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -573,7 +573,7 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, shard.getLocalCheckpoint()); } else { result = shard.applyIndexOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0, - VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse); + IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse); if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { throw new TransportReplicationAction.RetryOnReplicaException(shard.shardId, "Mappings are not available on the replica yet, triggered update: " + result.getRequiredMappingUpdate()); @@ -591,7 +591,7 @@ protected Engine.DeleteResult deleteDoc(IndexShard shard, String type, String id if (shard.routingEntry().primary()) { return shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, type, id, VersionType.INTERNAL); } else { - return shard.applyDeleteOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0L, type, id, VersionType.EXTERNAL); + return shard.applyDeleteOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0L, type, id); } }