-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
InternalEngineTests.testConcurrentOutOfOrderDocsOnReplica should use two documents #30121
Changes from all commits
4e54353
5d9b55a
89592f7
15d61e3
66cf68d
3747006
0300ac7
0e3421d
03f75f6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -133,6 +133,7 @@ | |
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
import java.util.LinkedHashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
@@ -1385,18 +1386,13 @@ public void testVersioningCreateExistsException() throws IOException { | |
} | ||
|
||
protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, VersionType versionType, | ||
boolean partialOldPrimary, long primaryTerm, | ||
int minOpCount, int maxOpCount) { | ||
long primaryTerm, | ||
int minOpCount, int maxOpCount, String docId) { | ||
final int numOfOps = randomIntBetween(minOpCount, maxOpCount); | ||
final List<Engine.Operation> ops = new ArrayList<>(); | ||
final Term id = newUid("1"); | ||
final int startWithSeqNo; | ||
if (partialOldPrimary) { | ||
startWithSeqNo = randomBoolean() ? numOfOps - 1 : randomIntBetween(0, numOfOps - 1); | ||
} else { | ||
startWithSeqNo = 0; | ||
} | ||
final String valuePrefix = forReplica ? "r_" : "p_"; | ||
final Term id = newUid(docId); | ||
final int startWithSeqNo = 0; | ||
final String valuePrefix = (forReplica ? "r_" : "p_" ) + docId + "_"; | ||
final boolean incrementTermWhenIntroducingSeqNo = randomBoolean(); | ||
for (int i = 0; i < numOfOps; i++) { | ||
final Engine.Operation op; | ||
|
@@ -1418,7 +1414,7 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, Ve | |
throw new UnsupportedOperationException("unknown version type: " + versionType); | ||
} | ||
if (randomBoolean()) { | ||
op = new Engine.Index(id, testParsedDocument("1", null, testDocumentWithTextField(valuePrefix + i), B_1, null), | ||
op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), B_1, null), | ||
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, | ||
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, | ||
version, | ||
|
@@ -1427,7 +1423,7 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, Ve | |
System.currentTimeMillis(), -1, false | ||
); | ||
} else { | ||
op = new Engine.Delete("test", "1", id, | ||
op = new Engine.Delete("test", docId, id, | ||
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, | ||
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, | ||
version, | ||
|
@@ -1442,7 +1438,7 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, Ve | |
|
||
public void testOutOfOrderDocsOnReplica() throws IOException { | ||
final List<Engine.Operation> ops = generateSingleDocHistory(true, | ||
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20); | ||
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE), 2, 2, 20, "1"); | ||
assertOpsOnReplica(ops, replicaEngine, true); | ||
} | ||
|
||
|
@@ -1511,28 +1507,83 @@ private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine repli | |
} | ||
} | ||
|
||
public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedException { | ||
final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300); | ||
final Engine.Operation lastOp = ops.get(ops.size() - 1); | ||
final String lastFieldValue; | ||
if (lastOp instanceof Engine.Index) { | ||
Engine.Index index = (Engine.Index) lastOp; | ||
lastFieldValue = index.docs().get(0).get("value"); | ||
public void testConcurrentOutOfOrderDocsOnReplica() throws IOException, InterruptedException { | ||
final List<Engine.Operation> opsDoc1 = | ||
generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 100, 300, "1"); | ||
final Engine.Operation lastOpDoc1 = opsDoc1.get(opsDoc1.size() - 1); | ||
final String lastFieldValueDoc1; | ||
if (lastOpDoc1 instanceof Engine.Index) { | ||
Engine.Index index = (Engine.Index) lastOpDoc1; | ||
lastFieldValueDoc1 = index.docs().get(0).get("value"); | ||
} else { | ||
// delete | ||
lastFieldValue = null; | ||
lastFieldValueDoc1 = null; | ||
} | ||
final List<Engine.Operation> opsDoc2 = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This duplication isn't to my taste - I think I'd try and pull the notion of "doc" out into a class of its own and have this kind of thing be methods there. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what kind of duplication do you mean? the extraction of the last value? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean that there's a block of lines that does something to doc1 followed by an essentially identical block of lines that does the same thing to doc2 - both here and below in the blocks containing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see the sentiment. I feel a class will be an overkill for just one test. It's now 21 lines of code and it's all in one place. I prefer to keep as is and refactor if we need it more often. |
||
generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 100, 300, "2"); | ||
final Engine.Operation lastOpDoc2 = opsDoc2.get(opsDoc2.size() - 1); | ||
final String lastFieldValueDoc2; | ||
if (lastOpDoc2 instanceof Engine.Index) { | ||
Engine.Index index = (Engine.Index) lastOpDoc2; | ||
lastFieldValueDoc2 = index.docs().get(0).get("value"); | ||
} else { | ||
// delete | ||
lastFieldValueDoc2 = null; | ||
} | ||
shuffle(ops, random()); | ||
concurrentlyApplyOps(ops, engine); | ||
// randomly interleave | ||
final AtomicLong seqNoGenerator = new AtomicLong(); | ||
Function<Engine.Operation, Engine.Operation> seqNoUpdater = operation -> { | ||
final long newSeqNo = seqNoGenerator.getAndIncrement(); | ||
if (operation instanceof Engine.Index) { | ||
Engine.Index index = (Engine.Index) operation; | ||
return new Engine.Index(index.uid(), index.parsedDoc(), newSeqNo, index.primaryTerm(), index.version(), | ||
index.versionType(), index.origin(), index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry()); | ||
} else { | ||
Engine.Delete delete = (Engine.Delete) operation; | ||
return new Engine.Delete(delete.type(), delete.id(), delete.uid(), newSeqNo, delete.primaryTerm(), | ||
delete.version(), delete.versionType(), delete.origin(), delete.startTime()); | ||
} | ||
}; | ||
final List<Engine.Operation> allOps = new ArrayList<>(); | ||
Iterator<Engine.Operation> iter1 = opsDoc1.iterator(); | ||
Iterator<Engine.Operation> iter2 = opsDoc2.iterator(); | ||
while (iter1.hasNext() && iter2.hasNext()) { | ||
final Engine.Operation next = randomBoolean() ? iter1.next() : iter2.next(); | ||
allOps.add(seqNoUpdater.apply(next)); | ||
} | ||
iter1.forEachRemaining(o -> allOps.add(seqNoUpdater.apply(o))); | ||
iter2.forEachRemaining(o -> allOps.add(seqNoUpdater.apply(o))); | ||
// insert some duplicates | ||
allOps.addAll(randomSubsetOf(allOps)); | ||
|
||
assertVisibleCount(engine, lastFieldValue == null ? 0 : 1); | ||
if (lastFieldValue != null) { | ||
shuffle(allOps, random()); | ||
concurrentlyApplyOps(allOps, engine); | ||
|
||
engine.refresh("test"); | ||
|
||
if (lastFieldValueDoc1 != null) { | ||
try (Searcher searcher = engine.acquireSearcher("test")) { | ||
final TotalHitCountCollector collector = new TotalHitCountCollector(); | ||
searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector); | ||
searcher.searcher().search(new TermQuery(new Term("value", lastFieldValueDoc1)), collector); | ||
assertThat(collector.getTotalHits(), equalTo(1)); | ||
} | ||
} | ||
if (lastFieldValueDoc2 != null) { | ||
try (Searcher searcher = engine.acquireSearcher("test")) { | ||
final TotalHitCountCollector collector = new TotalHitCountCollector(); | ||
searcher.searcher().search(new TermQuery(new Term("value", lastFieldValueDoc2)), collector); | ||
assertThat(collector.getTotalHits(), equalTo(1)); | ||
} | ||
} | ||
|
||
int totalExpectedOps = 0; | ||
if (lastFieldValueDoc1 != null) { | ||
totalExpectedOps++; | ||
} | ||
if (lastFieldValueDoc2 != null) { | ||
totalExpectedOps++; | ||
} | ||
assertVisibleCount(engine, totalExpectedOps); | ||
} | ||
|
||
private void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException { | ||
|
@@ -1572,12 +1623,12 @@ private void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine eng | |
} | ||
|
||
public void testInternalVersioningOnPrimary() throws IOException { | ||
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); | ||
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, 2, 2, 20, "1"); | ||
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine); | ||
} | ||
|
||
public void testVersionOnPrimaryWithConcurrentRefresh() throws Exception { | ||
List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 10, 100); | ||
List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, 2, 10, 100, "1"); | ||
CountDownLatch latch = new CountDownLatch(1); | ||
AtomicBoolean running = new AtomicBoolean(true); | ||
Thread refreshThread = new Thread(() -> { | ||
|
@@ -1697,7 +1748,7 @@ public void testNonInternalVersioningOnPrimary() throws IOException { | |
final Set<VersionType> nonInternalVersioning = new HashSet<>(Arrays.asList(VersionType.values())); | ||
nonInternalVersioning.remove(VersionType.INTERNAL); | ||
final VersionType versionType = randomFrom(nonInternalVersioning); | ||
final List<Engine.Operation> ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20); | ||
final List<Engine.Operation> ops = generateSingleDocHistory(false, versionType, 2, 2, 20, "1"); | ||
final Engine.Operation lastOp = ops.get(ops.size() - 1); | ||
final String lastFieldValue; | ||
if (lastOp instanceof Engine.Index) { | ||
|
@@ -1775,8 +1826,8 @@ public void testNonInternalVersioningOnPrimary() throws IOException { | |
} | ||
|
||
public void testVersioningPromotedReplica() throws IOException { | ||
final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20); | ||
List<Engine.Operation> primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); | ||
final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, 1, 2, 20, "1"); | ||
List<Engine.Operation> primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, 2, 2, 20, "1"); | ||
Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1); | ||
final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete; | ||
final long finalReplicaVersion = lastReplicaOp.version(); | ||
|
@@ -1796,7 +1847,7 @@ public void testVersioningPromotedReplica() throws IOException { | |
} | ||
|
||
public void testConcurrentExternalVersioningOnPrimary() throws IOException, InterruptedException { | ||
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300); | ||
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.EXTERNAL, 2, 100, 300, "1"); | ||
final Engine.Operation lastOp = ops.get(ops.size() - 1); | ||
final String lastFieldValue; | ||
if (lastOp instanceof Engine.Index) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to check, this is only for
master
(i.e. >= 7.0.0), right?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I plan for this to also go to 6.x, but all the callers of this method were using false for partialOldPrimary, so I removed it. I'll make sure to keep it in the backport.