From 6826812db5102f82cfd395dd77eb96ff098d112e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 12 Jul 2018 22:14:55 -0400 Subject: [PATCH 1/3] Adjust translog after versionType is removed in 7.0 With the presence of sequence number, we no longer use versionType to resolve out of order collision and remove it in 7.0. This PR adjusts translog to adapt that change. Relates #31945 --- .../index/translog/Translog.java | 12 +++++++-- .../index/translog/TranslogWriter.java | 25 ++++++++++++++++--- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 24c6d29e9b631..abb799095119d 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1042,7 +1042,11 @@ private Index(final StreamInput in) throws IOException { in.readLong(); // timestamp in.readLong(); // ttl } - this.versionType = VersionType.fromValue(in.readByte()); + if (format <= FORMAT_SEQ_NO) { + this.versionType = VersionType.fromValue(in.readByte()); + } else { + this.versionType = VersionType.EXTERNAL; // versionType is removed in 7.0.0 + } assert versionType.validateVersionForWrites(this.version) : "invalid version for writes: " + this.version; if (format >= FORMAT_AUTO_GENERATED_IDS) { this.autoGeneratedIdTimestamp = in.readLong(); @@ -1250,7 +1254,11 @@ private Delete(final StreamInput in) throws IOException { id = uidObject.id(); } this.version = in.readLong(); - this.versionType = VersionType.fromValue(in.readByte()); + if (format <= FORMAT_SEQ_NO) { + this.versionType = VersionType.fromValue(in.readByte()); + } else { + this.versionType = VersionType.EXTERNAL; // versionType is removed in 7.0 + } assert versionType.validateVersionForWrites(this.version); if (format >= FORMAT_SEQ_NO) { seqNo = in.readLong(); diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index b89b21c52588a..8bafb0bb51c31 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -40,6 +40,7 @@ import java.nio.file.StandardOpenOption; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongSupplier; @@ -202,9 +203,27 @@ private synchronized boolean assertNoSeqNumberConflict(long seqNo, BytesReferenc if (previous.v1().equals(data) == false) { Translog.Operation newOp = Translog.readOperation(new BufferedChecksumStreamInput(data.streamInput())); Translog.Operation prvOp = Translog.readOperation(new BufferedChecksumStreamInput(previous.v1().streamInput())); - throw new AssertionError( - "seqNo [" + seqNo + "] was processed twice in generation [" + generation + "], with different data. " + - "prvOp [" + prvOp + "], newOp [" + newOp + "]", previous.v2()); + // we need to exclude versionType from this check because it's removed in 7.0 + final boolean sameOp; + if (prvOp instanceof Translog.Index && newOp instanceof Translog.Index) { + final Translog.Index o1 = (Translog.Index) prvOp; + final Translog.Index o2 = (Translog.Index) newOp; + sameOp = Objects.equals(o1.id(), o2.id()) && Objects.equals(o1.type(), o2.type()) + && Objects.equals(o1.source(), o2.source()) && Objects.equals(o1.routing(), o2.routing()) + && o1.primaryTerm() == o2.primaryTerm() && o1.seqNo() == o2.seqNo() && o1.version() == o2.version(); + } else if (prvOp instanceof Translog.Delete && newOp instanceof Translog.Delete) { + final Translog.Delete o1 = (Translog.Delete) prvOp; + final Translog.Delete o2 = (Translog.Delete) newOp; + sameOp = Objects.equals(o1.id(), o2.id()) && Objects.equals(o1.type(), o2.type()) + && o1.primaryTerm() == o2.primaryTerm() && o1.seqNo() == o2.seqNo() && o1.version() == o2.version(); + } else { + sameOp = false; + } + if (sameOp == false) { + throw new AssertionError( + "seqNo [" + seqNo + "] was processed twice in generation [" + generation + "], with different data. " + + "prvOp [" + prvOp + "], newOp [" + newOp + "]", previous.v2()); + } } } else { seenSequenceNumbers.put(seqNo, From 53d99e441965dc99d8324e8a0ac0c5cd067eeabe Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 16 Jul 2018 20:41:09 -0400 Subject: [PATCH 2/3] backport formats from 7.0 --- .../elasticsearch/index/translog/Translog.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index abb799095119d..c240fa0599958 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1017,6 +1017,9 @@ public static class Index implements Operation { public static final int FORMAT_AUTO_GENERATED_IDS = FORMAT_2_X + 1; // since 5.0.0-beta1 public static final int FORMAT_SEQ_NO = FORMAT_AUTO_GENERATED_IDS + 1; // since 6.0.0 public static final int SERIALIZATION_FORMAT = FORMAT_SEQ_NO; + // future formats for forward compatibility + private static final int FORMAT_7_0 = 13; + private static final int FORMAT_NO_VERSION_TYPE = FORMAT_7_0 + 2; private final String id; private final long autoGeneratedIdTimestamp; @@ -1042,10 +1045,11 @@ private Index(final StreamInput in) throws IOException { in.readLong(); // timestamp in.readLong(); // ttl } - if (format <= FORMAT_SEQ_NO) { + if (format < FORMAT_NO_VERSION_TYPE) { this.versionType = VersionType.fromValue(in.readByte()); } else { - this.versionType = VersionType.EXTERNAL; // versionType is removed in 7.0.0 + // versionType is removed in 7.0 + this.versionType = VersionType.EXTERNAL; } assert versionType.validateVersionForWrites(this.version) : "invalid version for writes: " + this.version; if (format >= FORMAT_AUTO_GENERATED_IDS) { @@ -1147,6 +1151,7 @@ public Source getSource() { } private void write(final StreamOutput out) throws IOException { + assert SERIALIZATION_FORMAT < FORMAT_7_0; out.writeVInt(SERIALIZATION_FORMAT); out.writeString(id); out.writeString(type); @@ -1226,6 +1231,9 @@ public static class Delete implements Operation { private static final int FORMAT_SINGLE_TYPE = FORMAT_5_0 + 1; // 5.5 - 6.0 private static final int FORMAT_SEQ_NO = FORMAT_SINGLE_TYPE + 1; // 6.0 - * public static final int SERIALIZATION_FORMAT = FORMAT_SEQ_NO; + // future formats for forward compatibility + private static final int FORMAT_7_0 = 9; + private static final int FORMAT_NO_VERSION_TYPE = FORMAT_7_0 + 1; private final String type, id; private final Term uid; @@ -1254,10 +1262,11 @@ private Delete(final StreamInput in) throws IOException { id = uidObject.id(); } this.version = in.readLong(); - if (format <= FORMAT_SEQ_NO) { + if (format < FORMAT_NO_VERSION_TYPE) { this.versionType = VersionType.fromValue(in.readByte()); } else { - this.versionType = VersionType.EXTERNAL; // versionType is removed in 7.0 + // versionType is removed in 7.0 + this.versionType = VersionType.EXTERNAL; } assert versionType.validateVersionForWrites(this.version); if (format >= FORMAT_SEQ_NO) { @@ -1334,6 +1343,7 @@ public Source getSource() { } private void write(final StreamOutput out) throws IOException { + assert SERIALIZATION_FORMAT < FORMAT_7_0; out.writeVInt(SERIALIZATION_FORMAT); out.writeString(type); out.writeString(id); From ca897b67ca4bba33dd12a30a538856b88e14b985 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 17 Jul 2018 09:48:28 -0400 Subject: [PATCH 3/3] always read version_type --- .../index/translog/Translog.java | 22 ++----------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index c240fa0599958..24c6d29e9b631 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1017,9 +1017,6 @@ public static class Index implements Operation { public static final int FORMAT_AUTO_GENERATED_IDS = FORMAT_2_X + 1; // since 5.0.0-beta1 public static final int FORMAT_SEQ_NO = FORMAT_AUTO_GENERATED_IDS + 1; // since 6.0.0 public static final int SERIALIZATION_FORMAT = FORMAT_SEQ_NO; - // future formats for forward compatibility - private static final int FORMAT_7_0 = 13; - private static final int FORMAT_NO_VERSION_TYPE = FORMAT_7_0 + 2; private final String id; private final long autoGeneratedIdTimestamp; @@ -1045,12 +1042,7 @@ private Index(final StreamInput in) throws IOException { in.readLong(); // timestamp in.readLong(); // ttl } - if (format < FORMAT_NO_VERSION_TYPE) { - this.versionType = VersionType.fromValue(in.readByte()); - } else { - // versionType is removed in 7.0 - this.versionType = VersionType.EXTERNAL; - } + this.versionType = VersionType.fromValue(in.readByte()); assert versionType.validateVersionForWrites(this.version) : "invalid version for writes: " + this.version; if (format >= FORMAT_AUTO_GENERATED_IDS) { this.autoGeneratedIdTimestamp = in.readLong(); @@ -1151,7 +1143,6 @@ public Source getSource() { } private void write(final StreamOutput out) throws IOException { - assert SERIALIZATION_FORMAT < FORMAT_7_0; out.writeVInt(SERIALIZATION_FORMAT); out.writeString(id); out.writeString(type); @@ -1231,9 +1222,6 @@ public static class Delete implements Operation { private static final int FORMAT_SINGLE_TYPE = FORMAT_5_0 + 1; // 5.5 - 6.0 private static final int FORMAT_SEQ_NO = FORMAT_SINGLE_TYPE + 1; // 6.0 - * public static final int SERIALIZATION_FORMAT = FORMAT_SEQ_NO; - // future formats for forward compatibility - private static final int FORMAT_7_0 = 9; - private static final int FORMAT_NO_VERSION_TYPE = FORMAT_7_0 + 1; private final String type, id; private final Term uid; @@ -1262,12 +1250,7 @@ private Delete(final StreamInput in) throws IOException { id = uidObject.id(); } this.version = in.readLong(); - if (format < FORMAT_NO_VERSION_TYPE) { - this.versionType = VersionType.fromValue(in.readByte()); - } else { - // versionType is removed in 7.0 - this.versionType = VersionType.EXTERNAL; - } + this.versionType = VersionType.fromValue(in.readByte()); assert versionType.validateVersionForWrites(this.version); if (format >= FORMAT_SEQ_NO) { seqNo = in.readLong(); @@ -1343,7 +1326,6 @@ public Source getSource() { } private void write(final StreamOutput out) throws IOException { - assert SERIALIZATION_FORMAT < FORMAT_7_0; out.writeVInt(SERIALIZATION_FORMAT); out.writeString(type); out.writeString(id);