-
Notifications
You must be signed in to change notification settings - Fork 247
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
BulkIngester - Running listener code in separate thread pool #830
Conversation
26d5542
to
1ea3164
Compare
java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java
Show resolved
Hide resolved
java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java
Outdated
Show resolved
Hide resolved
java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Incompatible change in elastic/elasticsearch-java#830. (Totally meaningless release notes entry: "Fixed bug in BulkIngester") https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/8.15/release-highlights.html
Incompatible change in elastic/elasticsearch-java#830. (Totally meaningless release notes entry: "Fixed bug in BulkIngester") https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/8.15/release-highlights.html
Incompatible change in elastic/elasticsearch-java#830. (Totally meaningless release notes entry: "Fixed bug in BulkIngester") https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/8.15/release-highlights.html
Incompatible change in elastic/elasticsearch-java#830. (Totally meaningless release notes entry: "Fixed bug in BulkIngester") https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/8.15/release-highlights.html
Incompatible change in elastic/elasticsearch-java#830. (Totally meaningless release notes entry: "Fixed bug in BulkIngester") https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/8.15/release-highlights.html
@swallez @l-trotta can someone help me to understand the impact of these changes, if listener threads were to do some non-trivial work and take time? would the overall processor slow down because of the tied-up flusher thread? we've written some code which has, to date, scaled with the number of listener threads to do work, which blocks new ingesting work in a predictable way. the current code worked equivalently to the HLRC bulk processor and the transportclient bulk processor before that. I'm concerned that this makes a fundamental shift in the (desired) blocking role these threads have held against new indexing when listeners are executing their methods. |
Incompatible change in elastic/elasticsearch-java#830. (Totally meaningless release notes entry: "Fixed bug in BulkIngester") https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/8.15/release-highlights.html
Hello @marcreichman-pfi, sorry for the wait, we were still in the process of fixing bulk ingester related bugs and we wanted the situation to be stable before answering. As explained in the first comment, before these changes the Bulk Ingester used to run Listener tasks directly with the same threads that perform the The new approach has a thread pool execute Listener tasks, and this thread pool can be managed by the user (external) or be a simple one that will be provided by default (internal). Either way, it will be both used to schedule Initially we didn't consider the fact that Listener tasks could take a longer time and not be done in by the time the Bulk Ingester finishes its operations and gets I hope this clarifies the new changes, let me know if there's anything else. |
@l-trotta Thanks for your response. I do understand the rationale for this and the subsequent change. My biggest concern is that, since the days of the original BulkProcessor in the transport client, and then the versions in the HLRC and in this client, the fact that there is blocking on bulk index (and delete) calls when all requests are going out has been helpful to establish a sliding window of sorts. My concern is that with these changes, the indexing calls will all be accepted causing a memory expansion situation. I will have to experiment and see how things go. Which is the first shipping version of this code and is there documentation on the option you mentioned to provide a custom thread pool for the listener execution threads? |
@marcreichman-pfi I think if you have some blocking mechanism that slows down incoming requests according to the execution of Listener tasks it should still work the same. You can test this right now in version 8.15.0, but beware of the fact that when closing the Bulk Ingester some Listener tasks could still be enqueued and could be ignored, the fix for it should be out in the next patch. The explanation for the custom thread pool is in the method signatures of the Bulk Ingester builder, to summarize:
Executors.newScheduledThreadPool(maxRequests + 1, (r) -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("bulk-ingester-executor#" + ingesterId + "#" + t.getId());
t.setDaemon(true);
return t;
}); |
@l-trotta I will take a look and test, thank you for your explanations and responses. My instinct is that in the original model, any bulk request looking to go out would block if all threads were tied up, whether they were making their own bulk requests or waiting for the listener response. In the new model, regardless of which pool is involved, it might be that if listeners are doing non-trivial work, the listening pool (same as the flushing pool?) would grow a large queue of work, and bulk requests may continue to go out. Given that the listening/flushing is the same pool, maybe it'll all work out to the same equivalent pattern, but I'd just be hesitant that was once an outside-observable queue/blocking wait on calls to .index, .delete, etc. would now fly through those methods, and an inside, non-observable thread pool queue would build up inside the bulk ingester code. This would be concerning since it's harder to figure out what is going on. |
The current logic makes the current running thread execute whatever code is in the listener, so if that gets stuck somehow, or slows down, each ingester thread will end up stuck at some point. Fixing the issue by running the listener code in the same thread pool as the flusher thread.