Skip to content

Commit

Permalink
fix: bulk ingester might skip listener requests (#867)
Browse files Browse the repository at this point in the history
* fix: bulk ingester might skip lister requests

* minor: fix style

* always waiting for listener to be done before closing

---------

Co-authored-by: Laura Trotta <laura.trotta@elastic.co>
Co-authored-by: Laura Trotta <153528055+l-trotta@users.noreply.github.com>
  • Loading branch information
3 people committed Sep 12, 2024
1 parent 67b90b7 commit 27f87e6
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class BulkIngester<Context> implements AutoCloseable {
private final FnCondition addCondition = new FnCondition(lock, this::canAddOperation);
private final FnCondition sendRequestCondition = new FnCondition(lock, this::canSendRequest);
private final FnCondition closeCondition = new FnCondition(lock, this::closedAndFlushed);
private AtomicInteger listenerInProgressCount = new AtomicInteger();

private static class RequestExecution<Context> {
public final long id;
Expand Down Expand Up @@ -235,7 +236,7 @@ private boolean canAddOperation() {
}

private boolean closedAndFlushed() {
return isClosed && operations.isEmpty() && requestsInFlightCount == 0;
return isClosed && operations.isEmpty() && requestsInFlightCount == 0 && listenerInProgressCount.get() == 0;
}

//----- Ingester logic
Expand Down Expand Up @@ -311,25 +312,42 @@ public void flush() {
if (exec != null) {
// A request was actually sent
exec.futureResponse.handle((resp, thr) -> {

sendRequestCondition.signalIfReadyAfter(() -> {
requestsInFlightCount--;
closeCondition.signalAllIfReady();
});

if (resp != null) {
// Success
if (listener != null) {
scheduler.submit(() -> listener.afterBulk(exec.id, exec.request,
exec.contexts, resp));
listenerInProgressCount.incrementAndGet();
scheduler.submit(() -> {
try {
listener.afterBulk(exec.id, exec.request, exec.contexts, resp);
}
finally {
if(listenerInProgressCount.decrementAndGet() == 0){
closeCondition.signalIfReady();
}
}
});
}
} else {
// Failure
if (listener != null) {
scheduler.submit(() -> listener.afterBulk(exec.id, exec.request,
exec.contexts, thr));
listenerInProgressCount.incrementAndGet();
scheduler.submit(() -> {
try {
listener.afterBulk(exec.id, exec.request, exec.contexts, thr);
}
finally {
if(listenerInProgressCount.decrementAndGet() == 0){
closeCondition.signalIfReady();
}
}
});
}
}

sendRequestCondition.signalIfReadyAfter(() -> {
requestsInFlightCount--;
closeCondition.signalAllIfReady();
});
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,26 +90,44 @@ private void printStats(TestTransport transport) {
@Test
public void basicTestFlush() throws Exception {
// Prime numbers, so that we have leftovers to flush before shutting down
multiThreadTest(7, 3, 5, 101);
multiThreadTest(7, 3, 5, 101, true);
}

@Test
public void basicTestFlushWithInternalScheduler() throws Exception {
// Prime numbers, so that we have leftovers to flush before shutting down
multiThreadTest(7, 3, 5, 101, false);
}

@Test
public void basicTestNoFlush() throws Exception {
// Will have nothing to flush on close.
multiThreadTest(10, 3, 5, 100);
multiThreadTest(10, 3, 5, 100, true);
}

private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations) throws Exception {
@Test
public void basicTestNoFlushWithInternalScheduler() throws Exception {
// Will have nothing to flush on close.
multiThreadTest(10, 3, 5, 100, false);
}

private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations,
boolean externalScheduler) throws Exception {

CountingListener listener = new CountingListener();
TestTransport transport = new TestTransport();
ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
ScheduledExecutorService scheduler;
if (externalScheduler) {
scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("my-bulk-ingester-executor#" );
t.setName("my-bulk-ingester-executor#");
t.setDaemon(true);
return t;
});
});
} else {
scheduler = null;
}

BulkIngester<Void> ingester = BulkIngester.of(b -> b
.client(client)
Expand Down Expand Up @@ -139,7 +157,7 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads,

ingester.close();
transport.close();
scheduler.shutdownNow();
if (scheduler != null) scheduler.shutdownNow();

printStats(ingester);
printStats(listener);
Expand Down

0 comments on commit 27f87e6

Please sign in to comment.