Stop accepting more requests when the maximum accepted is reached #962
Conversation
This change makes sure that a worker doesn't handle more requests than it is configured to accept. The new workflow is much simpler: listeners are put in the poller, and on a read event we try to accept on them. When a connection is accepted it is put in the execution queue. When a request is done and the socket can be kept alive, we put it back in the poller; on the next read event we will try to handle the new request. If it is not taken out of the poller before the keep-alive timeout, the socket is closed. If all threads are busy we wait until one request completes; if it doesn't complete before the timeout we kill the worker. Fixes #908
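For readers following along, here is a minimal sketch (not the actual patch) of the poller plus thread-pool flow described above. The helper names, the canned response, and the hard-coded thread count are illustrative, and the keep-alive timeout and the locking the real code needs are left out:

```python
import selectors
from concurrent import futures

poller = selectors.DefaultSelector()
tpool = futures.ThreadPoolExecutor(max_workers=4)

RESPONSE = (b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n"
            b"Connection: keep-alive\r\n\r\nOK")

def handle_request(conn):
    data = conn.recv(8192)
    if not data:                 # client went away
        conn.close()
        return
    conn.sendall(RESPONSE)
    # the socket can be kept alive: put it back in the poller so the next
    # read event queues another request (the real patch guards this with a
    # lock and enforces the keep-alive timeout)
    poller.register(conn, selectors.EVENT_READ, "keepalive")

def serve(listener):
    # listeners are put in the poller; on read we try to accept on them
    poller.register(listener, selectors.EVENT_READ, "listener")
    while True:
        for key, _ in poller.select(timeout=1.0):
            if key.data == "listener":
                conn, _addr = key.fileobj.accept()
                tpool.submit(handle_request, conn)
            else:
                # kept-alive socket is readable: take it out of the poller
                # and put the next request in the execution queue
                poller.unregister(key.fileobj)
                tpool.submit(handle_request, key.fileobj)
```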
I am still undecided about what the logic should be when you have N requests kept alive, where N is the maximum number of worker connections accepted. Right now the patch stops accepting requests when this happens, and the number of connections is not decreased until a request is gone. Maybe we should introduce some logic in the way we return the keep-alive header? I am thinking of the following: when the maximum number of worker connections is reached, on the next request we set the Connection: close header and return the response. Thoughts?
This is done by limiting the number of kept-alive requests to MAX_WORKER_CONNECTIONS - N_THREADS, so at any time we can still accept N connections.
I just implemented the feature described above.
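As a rough illustration of that rule (the names are made up and this is not the patch itself), the bookkeeping could look like this: once the kept-alive count reaches the cap, the worker answers the request but asks the client to close.

```python
# Illustrative only: cap kept-alive connections so that `threads` accept
# slots always stay free (worker_connections - threads).
def can_keepalive(nr_kept_alive, worker_connections, threads):
    return nr_kept_alive < max(worker_connections - threads, 0)

def connection_header(nr_kept_alive, worker_connections, threads):
    # once the cap is hit, answer the request but ask the client to close
    if can_keepalive(nr_kept_alive, worker_connections, threads):
        return "Connection: keep-alive"
    return "Connection: close"

print(connection_header(nr_kept_alive=98, worker_connections=100, threads=4))
# -> Connection: close
```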
util.close(conn.sock)
conn.close()

def is_parent_alive(self):
You're not calling this anywhere, I think.
Fixed in b1bde15. Thanks!
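For context, a parent-alive check in a worker loop usually boils down to comparing the current parent pid with the one recorded at startup. A hedged sketch, assuming the worker stores the original master pid in `self.ppid`:

```python
import os

class Worker:
    def __init__(self, ppid):
        self.ppid = ppid          # pid of the master, recorded at fork time
        self.alive = True

    def is_parent_alive(self):
        # if the master died, the worker gets reparented and getppid() changes
        if self.ppid != os.getppid():
            self.alive = False
            return False
        return True
```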
I think this is fine. I want to think more today about whether there is a way to do this without the locking.
Except my comment about the parent alive check that I think you missed.
def init_process(self):
    self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
    self.poller = selectors.DefaultSelector()
    self._lock = RLock()
Is there a reason to not set `RLock` in `__init__`?
No need for it to be allocated before the fork, but it doesn't really matter either way.
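Put differently, with a pre-fork server the lock only ever needs to exist in the worker process. A toy illustration of the two placements (hypothetical class, not the patch):

```python
from threading import RLock

class ThreadWorker:
    def __init__(self):
        # anything allocated here is created in the master, before fork
        pass

    def init_process(self):
        # runs in the child after fork, so per-worker state created here
        # is never shared with the master or with sibling workers
        self._lock = RLock()
```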
@tilgovi did you find some time to do a final review? It would be cool if we can release a new gunicorn before the end of the year. Let me know :)
Out of curiosity, when you change something as core as the threaded worker, do you have some mechanism to run simulations and stress testing to make sure no race condition is introduced? The issue that caused this particular rewrite comes from a situation that is fairly easy to reproduce, so perhaps a simulation test suite would be helpful for regression testing.
@diwu1989 there is no stress suite yet, but I agree it could be really useful. Right now it is tested by stressing it with different tools that send multiple requests at once and by following some manual scenarios. Also, all the examples must work. In this particular case it just happened that I was too lazy about the usage expectations... If you have any idea to improve that, let me know. I would also fully support any development around it.
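In the meantime, a crude regression check for this particular class of bug can be scripted by opening more concurrent keep-alive connections than the worker accepts and verifying every request still gets answered. A sketch, assuming a local gunicorn listening on 127.0.0.1:8000 (the host, port, and counts are placeholders to adjust):

```python
# Crude concurrency probe, illustrative only.
from concurrent import futures
import http.client

HOST, PORT = "127.0.0.1", 8000
CONNECTIONS = 200          # pick something above --worker-connections
REQUESTS_PER_CONN = 5

def hammer(_):
    conn = http.client.HTTPConnection(HOST, PORT, timeout=10)
    ok = 0
    for _ in range(REQUESTS_PER_CONN):
        conn.request("GET", "/")
        resp = conn.getresponse()
        resp.read()            # drain the body so the connection can be reused
        ok += resp.status == 200
    conn.close()
    return ok

with futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as pool:
    results = list(pool.map(hammer, range(CONNECTIONS)))
print(f"{sum(results)}/{CONNECTIONS * REQUESTS_PER_CONN} requests succeeded")
```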
Anything to add or edit on that patch? If no one disagrees I will merge it within the day and prepare the next release. Let me know.
self.poller.unregister(client)
# remove the connection from keepalive
try:
    self._keep.remove(conn)
Maybe we could just tag the connection here instead of removing it from the queue and then locking. Then the next time the connection comes out of the queue, check whether it can still be used or not. Something like `conn.removed = True`.
Thoughts? cc @tilgovi
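A sketch of what that tagging could look like (hypothetical attribute and helper names, not the patch): the connection is only flagged where it would have been removed, and the consumer discards flagged entries when it pops them, so no lock is needed around the queue itself.

```python
import collections

keep = collections.deque()          # kept-alive connections, oldest first

def drop_connection(conn):
    # instead of keep.remove(conn) under a lock, just tag it
    conn.removed = True

def next_keepalived():
    # lazily discard tagged connections as they come out of the queue
    while keep:
        conn = keep.popleft()
        if not getattr(conn, "removed", False):
            return conn
    return None
```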
Oh nice, this is finally fixed, can't wait for the next release to show up on PyPI.