Skip to content

Commit

Permalink
Try to finish remote sink once (elastic#117592)
Browse files Browse the repository at this point in the history
Currently, we have three clients fetching pages by default, each with 
its own lifecycle. This can result in scenarios where more than one
request is sent to complete the remote sink. While this does not cause
correctness issues, it is inefficient, especially for cross-cluster
requests. This change tracks the status of the remote sink and tries to
send only one finish request per remote sink.
  • Loading branch information
dnhatn authored Nov 27, 2024
1 parent e33e1a0 commit c2e4afc
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand Down Expand Up @@ -292,6 +293,7 @@ static final class TransportRemoteSink implements RemoteSink {
final Executor responseExecutor;

final AtomicLong estimatedPageSizeInBytes = new AtomicLong(0L);
final AtomicBoolean finished = new AtomicBoolean(false);

TransportRemoteSink(
TransportService transportService,
Expand All @@ -311,6 +313,32 @@ static final class TransportRemoteSink implements RemoteSink {

@Override
public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
if (allSourcesFinished) {
if (finished.compareAndSet(false, true)) {
doFetchPageAsync(true, listener);
} else {
// already finished or promised
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
}
} else {
// already finished
if (finished.get()) {
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
return;
}
doFetchPageAsync(false, ActionListener.wrap(r -> {
if (r.finished()) {
finished.set(true);
}
listener.onResponse(r);
}, e -> {
finished.set(true);
listener.onFailure(e);
}));
}
}

private void doFetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
final long reservedBytes = allSourcesFinished ? 0 : estimatedPageSizeInBytes.get();
if (reservedBytes > 0) {
// This doesn't fully protect ESQL from OOM, but reduces the likelihood.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,15 @@ public void testConcurrentWithTransportActions() {
ExchangeService exchange1 = new ExchangeService(Settings.EMPTY, threadPool, ESQL_TEST_EXECUTOR, blockFactory());
exchange1.registerTransportHandler(node1);
AbstractSimpleTransportTestCase.connectToNode(node0, node1.getLocalNode());
Set<String> finishingRequests = ConcurrentCollections.newConcurrentSet();
node1.addRequestHandlingBehavior(ExchangeService.EXCHANGE_ACTION_NAME, (handler, request, channel, task) -> {
final ExchangeRequest exchangeRequest = (ExchangeRequest) request;
if (exchangeRequest.sourcesFinished()) {
String exchangeId = exchangeRequest.exchangeId();
assertTrue("tried to finish [" + exchangeId + "] twice", finishingRequests.add(exchangeId));
}
handler.messageReceived(request, channel, task);
});

try (exchange0; exchange1; node0; node1) {
String exchangeId = "exchange";
Expand Down

0 comments on commit c2e4afc

Please sign in to comment.