[BUG] CompoundProcessor limits ingest pipeline length #6338

Open
MaximeWewer opened this issue Feb 16, 2023 · 8 comments
Labels
bug (Something isn't working), Indexing (Indexing, Bulk Indexing and anything related to indexing)

Comments

MaximeWewer commented Feb 16, 2023

Describe the bug
I used an ingest pipeline to rename a large number of fields, and found that running an ingest pipeline with many processors can raise a StackOverflowError.

I've found a similar issue on the Elasticsearch GitHub: elastic/elasticsearch#84274

Expected behavior
Could the problem be fixed along the lines of the PR below?
elastic/elasticsearch#84250

Host/Environment (please complete the following information):

  • OS: Docker
  • Version: OpenSearch 1.2.4
@saratvemulapalli (Member)

@MaximeWewer thanks for reaching out. We cannot look at code that is not compatible with ALv2.
It would help to see the stack trace of the problem so we can learn more about it.

@MaximeWewer (Author)

Hi @saratvemulapalli,

Yes, of course, I understand the license-compliance concern.
Here is the stack trace produced when the ingest pipeline crashes:

    [2023-02-20T10:22:03,191][ERROR][o.o.b.OpenSearchUncaughtExceptionHandler] [opensearch.node01] fatal error in thread [opensearch[opensearch.node01][write][T#1]], exiting
    java.lang.StackOverflowError: null
        at org.opensearch.ingest.IngestDocument.createTemplateModel(IngestDocument.java:713) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.IngestDocument.renderTemplate(IngestDocument.java:709) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.common.RenameProcessor.execute(RenameProcessor.java:82) ~[?:?]
        at org.opensearch.ingest.Processor.execute(Processor.java:65) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:147) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:152) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.Processor.execute(Processor.java:70) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:147) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:152) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.Processor.execute(Processor.java:70) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.Processor.execute(Processor.java:70) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:147) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:152) ~[opensearch-1.2.4.jar:1.2.4]
    fatal error in thread [opensearch[opensearch.node01][write][T#1]], exiting
    java.lang.StackOverflowError
        at org.opensearch.ingest.IngestDocument.createTemplateModel(IngestDocument.java:713)
        at org.opensearch.ingest.IngestDocument.renderTemplate(IngestDocument.java:709)
        at org.opensearch.ingest.common.RenameProcessor.execute(RenameProcessor.java:82)
        at org.opensearch.ingest.Processor.execute(Processor.java:65)
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161)
        at org.opensearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:147)
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161)
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179)
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:152)
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179)
        at org.opensearch.ingest.Processor.execute(Processor.java:70)
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161)
        at org.opensearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:147)
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161)
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179)
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:152)
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179)
        at org.opensearch.ingest.Processor.execute(Processor.java:70)

@kartg added the "Indexing & Search" and "Indexing" (Indexing, Bulk Indexing and anything related to indexing) labels and removed the "untriaged" label on Feb 24, 2023
dblock (Member) commented Feb 28, 2023

Looks like a bug. Looking for someone to write a unit test that reproduces this problem, and a fix, without looking at any non-ALv2 code, please.

msfroh (Collaborator) commented Mar 27, 2023

From the description in this issue, I was able to reproduce with the following unit test in CompoundProcessorTests:

    public void testManyProcessors() {
        // 10,000 no-op processors chained through callbacks overflow the
        // stack, since each processor's completion handler invokes the next.
        TestProcessor testProcessor = new TestProcessor(i -> {});
        Processor[] processors = new Processor[10_000];
        Arrays.fill(processors, testProcessor);
        CompoundProcessor compoundProcessor = new CompoundProcessor(processors);
        compoundProcessor.execute(ingestDocument, (result, e) -> {});
    }

The problem is that CompoundProcessor implements chaining via callbacks. Each invocation adds a nested call to trigger the next processor.
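As a standalone illustration (a hypothetical sketch, not the actual CompoundProcessor code), here is how callback chaining consumes one stack frame per step whenever each callback fires synchronously:

```java
import java.util.function.Consumer;

public class CallbackChain {
    // Each step invokes the next step from inside its completion callback.
    // When the callback fires synchronously, every step adds a stack frame
    // instead of returning first.
    static void runStep(int i, int total, Consumer<Integer> done) {
        if (i >= total) {
            done.accept(i);
            return;
        }
        Consumer<Integer> next = r -> runStep(i + 1, total, done);
        next.accept(i); // "completes" inline, like a synchronous processor
    }

    // Returns `total` if the chain completed, -1 on stack overflow.
    static int tryChain(int total) {
        try {
            runStep(0, total, r -> {});
            return total;
        } catch (StackOverflowError e) {
            return -1; // caught only to demonstrate the failure mode
        }
    }

    public static void main(String[] args) {
        System.out.println(tryChain(1_000));      // short chain completes
        System.out.println(tryChain(10_000_000)); // long chain overflows
    }
}
```

The chain length at which this blows up depends on the JVM's thread stack size, but the growth is linear in pipeline length either way.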

I think we might be able to reimplement it as a for-loop. I need to wrap my head around it some more, but I was already planning to refactor CompoundProcessor as one of my TODOs on #6587.

msfroh (Collaborator) commented Mar 27, 2023

I poked at the CompoundProcessor code for a few minutes. It's not a simple matter of turning it into a for-loop, since some processors may do things that we want to do asynchronously, like network calls to get some extra data to enrich documents.

I'll give it some more thought.

It would be so much easier if Java had tail-call optimization. :)

msfroh (Collaborator) commented Jun 6, 2024

We should probably address this by adding a method to Processor (and SearchRequestProcessor and SearchResponseProcessor) like:

default boolean isSynchronous() {
  return true;
}

For search pipelines, we have "handy" async subinterfaces that flip the sync-vs-async abstractness, so we can override isSynchronous there.

For ingest processors, we could similarly add a subinterface that flips things. Right now, we don't have any async ingest processors AFAIK, but e.g. the ML inference ingest processor (and probably the GeoIP ingest processor) should be async, to avoid holding the transport_worker thread.

Then, for both cases, we can process a whole chain of synchronous processors in a while loop, only using callbacks for async processors. Since the async processors should be running on a task executor, the callback should execute on a different thread (with a fresh stack) anyway.
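That scheme could be sketched roughly like this (hypothetical interface and method names for illustration, not the real Processor API): consecutive synchronous processors are drained in a plain loop, and a callback is only used to hop over an asynchronous processor, after which the loop resumes with a fresh stack:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class HybridPipeline {
    // Hypothetical minimal processor interface, for illustration only.
    interface Step {
        default boolean isSynchronous() { return true; }
        String executeSync(String doc);
        // Async steps deliver their result via a callback; this default
        // just wraps the sync path.
        default void executeAsync(String doc, Consumer<String> handler) {
            handler.accept(executeSync(doc));
        }
    }

    static void run(List<Step> steps, int from, String doc, Consumer<String> done) {
        // Drain consecutive synchronous steps iteratively: no stack growth,
        // no matter how long the pipeline is.
        int i = from;
        while (i < steps.size() && steps.get(i).isSynchronous()) {
            doc = steps.get(i).executeSync(doc);
            i++;
        }
        if (i == steps.size()) {
            done.accept(doc);
            return;
        }
        // Async hop: the continuation restarts the loop after this step,
        // normally on the async executor's thread with a fresh stack.
        final int next = i + 1;
        steps.get(i).executeAsync(doc, result -> run(steps, next, result, done));
    }

    public static void main(String[] args) {
        List<Step> steps = new ArrayList<>();
        for (int k = 0; k < 100_000; k++) {
            steps.add(d -> d); // sync no-ops: handled by the loop, not recursion
        }
        steps.add(new Step() { // one async step at the end
            @Override public boolean isSynchronous() { return false; }
            @Override public String executeSync(String d) { return d + " (enriched)"; }
        });
        run(steps, 0, "doc", System.out::println);
    }
}
```

With this shape, stack depth is bounded by the number of async hops rather than the total pipeline length.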

@dhwanilpatel (Contributor)

For CompoundProcessor we only use the execute method with a BiConsumer callback. Could we use the same path for all processor types (sync and async), and have CompoundProcessor wait for the BiConsumer's response via a CountDownLatch before executing the next processor? That way we could turn the recursive calls into an iterative loop.

A simple snippet to illustrate the idea:

void innerExecute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
    for (Processor processor : processors) {
        CountDownLatch latch = new CountDownLatch(1);
        ...
        processor.execute(ingestDocument, (result, e) -> {
            ...
            latch.countDown();
        });
        latch.await(); // block until the callback fires
    }
}

@msfroh Thoughts? Let me know if I have missed something over here.

cc: @shwetathareja / @ankitkala

msfroh (Collaborator) commented Jul 29, 2024

@msfroh Thoughts? Let me know if I have missed something over here.

The difficulty with that approach is that latch.await() blocks the current thread, which is exactly what the async processors are trying to avoid.

If you have an async ingest processor that makes a long network call (e.g. calling out to a remote ML inference service to compute embeddings), it could exhaust the available indexing threads. Normally the remote call would not hold a thread at all; it only needs to execute the callback on a threadpool once the call completes.
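The contrast can be sketched with CompletableFuture (illustrative only; the hypothetical enrich call stands in for a remote inference service): get() parks the calling thread for the whole remote call, while thenAccept registers a continuation and returns immediately, so the work later completes on the remote pool's thread:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BlockingVsCallback {
    static final ExecutorService remote = Executors.newSingleThreadExecutor();

    // Simulates a slow remote call (e.g. computing embeddings) off-thread.
    static CompletableFuture<String> enrich(String doc) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return doc + "+embedding";
        }, remote);
    }

    public static void main(String[] args) throws Exception {
        // Blocking style (like the latch): the caller is parked for the
        // full duration of the remote call.
        System.out.println(enrich("doc1").get());

        // Callback style: the caller returns immediately and is free to
        // pick up other work; the continuation runs on the remote pool.
        enrich("doc2").thenAccept(System.out::println);

        remote.shutdown(); // demo only: let pending work finish before exit
        remote.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

In the issue's scenario, the "caller" is a transport_worker or write thread, which is why the blocking variant can starve the pool.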

Projects
Status: Next (Next Quarter)