Skip to content

Commit

Permalink
Mute accounting circuit breaker check after test (#42448)
Browse files Browse the repository at this point in the history
If we close an engine while a refresh is happening, then we might leak
refCount of some SegmentReaders. We need to skip the ram accounting
circuit breaker check until we have a new Lucene snapshot which includes
the fix for LUCENE-8809.

This also adds a test to the engine but left it muted so we won't forget
to reenable this check.

Closes #30290
  • Loading branch information
dnhatn committed May 24, 2019
1 parent 48dc0dc commit 02739d0
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
Expand Down Expand Up @@ -153,6 +154,7 @@
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -5635,4 +5637,44 @@ public void testMaxSeqNoInCommitUserData() throws Exception {
rollTranslog.join();
assertMaxSeqNoInCommitUserData(engine);
}

@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/LUCENE-8809")
public void testRefreshAndFailEngineConcurrently() throws Exception {
AtomicBoolean stopped = new AtomicBoolean();
Semaphore indexedDocs = new Semaphore(0);
Thread indexer = new Thread(() -> {
while (stopped.get() == false) {
String id = Integer.toString(randomIntBetween(1, 100));
try {
engine.index(indexForDoc(createParsedDoc(id, null)));
indexedDocs.release();
} catch (IOException e) {
throw new AssertionError(e);
} catch (AlreadyClosedException e) {
return;
}
}
});

Thread refresher = new Thread(() -> {
while (stopped.get() == false) {
try {
engine.refresh("test", randomFrom(Engine.SearcherScope.values()), randomBoolean());
} catch (AlreadyClosedException e) {
return;
}
}
});
indexer.start();
refresher.start();
indexedDocs.acquire(randomIntBetween(1, 100));
try {
engine.failEngine("test", new IOException("simulated error"));
} finally {
stopped.set(true);
indexer.join();
refresher.join();
}
assertThat(engine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
Expand Down Expand Up @@ -269,6 +270,8 @@ public void tearDown() throws Exception {
assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test"));
assertMaxSeqNoInCommitUserData(replicaEngine);
}
assertThat(engine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L));
assertThat(replicaEngine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L));
} finally {
IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2426,15 +2426,18 @@ public void ensureEstimatedStats() {
final CircuitBreakerService breakerService = getInstanceFromNode(CircuitBreakerService.class, nodeAndClient.node);
CircuitBreaker fdBreaker = breakerService.getBreaker(CircuitBreaker.FIELDDATA);
assertThat("Fielddata breaker not reset to 0 on node: " + name, fdBreaker.getUsed(), equalTo(0L));
try {
assertBusy(() -> {
CircuitBreaker acctBreaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING);
assertThat("Accounting breaker not reset to 0 on node: " + name + ", are there still Lucene indices around?",
acctBreaker.getUsed(), equalTo(0L));
});
} catch (Exception e) {
throw new AssertionError("Exception during check for accounting breaker reset to 0", e);
}

// Mute this assertion until we have a new Lucene snapshot with https://issues.apache.org/jira/browse/LUCENE-8809.
// try {
// assertBusy(() -> {
// CircuitBreaker acctBreaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING);
// assertThat("Accounting breaker not reset to 0 on node: " + name + ", are there still Lucene indices around?",
// acctBreaker.getUsed(), equalTo(0L));
// });
// } catch (Exception e) {
// throw new AssertionError("Exception during check for accounting breaker reset to 0", e);
// }

// Anything that uses transport or HTTP can increase the
// request breaker (because they use bigarrays), because of
// that the breaker can sometimes be incremented from ping
Expand Down

0 comments on commit 02739d0

Please sign in to comment.