Stop accepting more requests when the maximum number of accepted connections is reached #962
Changes from 2 commits
@@ -18,6 +18,7 @@
import socket
import ssl
import sys
from threading import RLock
import time

from .. import http
@@ -65,13 +66,14 @@ def init(self):
            # initialize the parser
            self.parser = http.RequestParser(self.cfg, self.sock)
            return True
        return False

    def set_timeout(self):
        # set the timeout
        self.timeout = time.time() + self.cfg.keepalive

    def close(self):
        util.close(self.sock)

    def __lt__(self, other):
        return self.timeout < other.timeout

@@ -83,68 +85,94 @@ class ThreadWorker(base.Worker):
    def __init__(self, *args, **kwargs):
        super(ThreadWorker, self).__init__(*args, **kwargs)
        self.worker_connections = self.cfg.worker_connections
        self.max_keepalived = self.cfg.worker_connections - self.cfg.threads

        # initialise the pool
        self.tpool = None
        self.poller = None
        self._lock = None
        self.futures = deque()
        self._keep = deque()

    def init_process(self):
        self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
        self.poller = selectors.DefaultSelector()
        self._lock = RLock()

Review comment: Is there a reason to not set […]

Reply: No need for it to be allocated before the fork, but it doesn't really […]

        super(ThreadWorker, self).init_process()

    def _wrap_future(self, fs, conn):
        fs.conn = conn
        self.futures.append(fs)
        fs.add_done_callback(self.finish_request)

    def init_process(self):
        self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
        self.poller = selectors.DefaultSelector()
        super(ThreadWorker, self).init_process()

    def enqueue_req(self, conn):
        conn.init()
        # submit the connection to a worker
        fs = self.tpool.submit(self.handle, conn)
        self._wrap_future(fs, conn)

    def accept(self, listener):
        if not self.alive:
            return

        try:
            client, addr = listener.accept()
            # initialize the connection object

Review comment: Nitpick: I'd remove this comment.

Reply: That is mostly a mind trick for myself so I can remember the workflow :)

            conn = TConn(self.cfg, listener, client, addr)

            # wait for the read event to handle the connection
            self.poller.register(client, selectors.EVENT_READ,
                                 partial(self.handle_client, conn))

            self.nr += 1
            # enqueue the job
            self.enqueue_req(conn)
        except socket.error as e:
            if e.args[0] not in (errno.EAGAIN,
                    errno.ECONNABORTED, errno.EWOULDBLOCK):
                raise

    def handle_client(self, conn, client):
        # unregister the client from the poller
        self.poller.unregister(client)

    def reuse_connection(self, conn, client):
        with self._lock:
            # unregister the client from the poller
            self.poller.unregister(client)
            # remove the connection from keepalive
            try:
                self._keep.remove(conn)

Review comment: Maybe we could just tag the connection here instead of removing it from the queue and then locking. Then, the next time the connection comes out of the queue, check whether it can be used or not. Something like the sketch below. Thoughts? cc @tilgovi

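One hedged reading of that suggestion as a standalone sketch; the in_use flag, the module-level deque, and the SimpleNamespace stand-ins are illustration only, not existing TConn or worker attributes.

import time
from collections import deque
from types import SimpleNamespace

_keep = deque()

def reuse_connection(conn):
    # just tag the connection; no lock, no _keep.remove()
    conn.in_use = True

def murder_keepalived(now):
    while True:
        try:
            conn = _keep.popleft()
        except IndexError:
            break
        if conn.in_use:
            # a worker thread took the connection over; drop our reference
            continue
        if conn.timeout - now > 0:
            # still fresh: put it back and stop scanning
            _keep.appendleft(conn)
            break
        conn.closed = True  # expired: the real worker would close the socket here

fresh = SimpleNamespace(timeout=time.time() + 5, in_use=False, closed=False)
reused = SimpleNamespace(timeout=time.time() + 7, in_use=False, closed=False)
_keep.extend([reused, fresh])
reuse_connection(reused)
murder_keepalived(time.time())
assert reused not in _keep and fresh in _keep

The trade-off is that tagged connections linger in the queue until the reaper reaches them, instead of being removed eagerly under the lock as the diff does.
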
            except ValueError:
                # race condition
                return

        # submit the connection to a worker
        fs = self.tpool.submit(self.handle, conn)
        self._wrap_future(fs, conn)
        self.enqueue_req(conn)

    def murder_keepalived(self):
        now = time.time()
        while True:
            try:
                # remove the connection from the queue
                conn = self._keep.popleft()
            except IndexError:
                break
            with self._lock:

Review comment: No need for the lock. Popping from the deque is atomic.

                try:
                    # remove the connection from the queue
                    conn = self._keep.popleft()
                except IndexError:
                    break

            delta = conn.timeout - now
            if delta > 0:
                # add the connection back to the queue
                self._keep.appendleft(conn)
                with self._lock:

Review comment: Same here, the append is atomic so there is no need to lock.

Follow-up: Oh, I think I understand. This lock avoids the possibility that the poller triggers the keepalive request handling while we're murdering it.

Reply: Correct. Without it, under load you will mutate the queue and the murder loop will crash.

                    self._keep.appendleft(conn)
                break
            else:
                self.nr -= 1
                # remove the socket from the poller

Review comment: I think you meant to call […]

                self.poller.unregister(conn.sock)
                with self._lock:
                    try:
                        self.poller.unregister(conn.sock)
                    except socket.error as e:
                        if e.args[0] != errno.EBADF:
                            raise

                # close the socket
                util.close(conn.sock)
                conn.close()

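The locking exchange above comes down to this: each individual deque call is atomic, but the reaper's pop / inspect / push-back is a compound sequence that must not interleave with reuse_connection's remove(). A standalone sketch of that point; the dict-based connections and helper names below are illustration only, not gunicorn code.

import time
from collections import deque
from threading import RLock

_keep = deque()
_lock = RLock()

def reaper_take(now):
    # mirrors murder_keepalived: the whole pop/check/push-back is guarded
    with _lock:
        try:
            conn = _keep.popleft()
        except IndexError:
            return None
        if conn["timeout"] - now > 0:
            _keep.appendleft(conn)  # still fresh, keep it queued
            return None
        return conn  # expired: the caller closes it outside the lock

def reuse_take(conn):
    # mirrors reuse_connection: reuse only if the reaper did not claim it
    with _lock:
        try:
            _keep.remove(conn)
        except ValueError:
            return False  # lost the race, the connection was already reaped
        return True

expired = {"timeout": time.time() - 1}
_keep.append(expired)
assert reaper_take(time.time()) is expired
assert reuse_take(expired) is False
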
    def is_parent_alive(self):

Review comment: You're not calling this anywhere, I think.

Reply: Fixed in b1bde15. Thanks!

        # If our parent changed then we shut down.
        if self.ppid != os.getppid():
            self.log.info("Parent changed, shutting down: %s", self)
            return False
        return True

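A hedged sketch of how a run loop might consume is_parent_alive() once it is wired in; the referenced follow-up commit is not part of this diff, so this is an assumption about its shape, not the actual change.

import os

class LoopSketch(object):
    def __init__(self):
        self.alive = True
        self.ppid = os.getppid()

    def is_parent_alive(self):
        # If our parent changed then we shut down.
        return self.ppid == os.getppid()

    def run_once(self):
        # one iteration of the worker loop: bail out early if orphaned
        if not self.alive or not self.is_parent_alive():
            return False
        # ... notify the arbiter, poll, accept, reap keepalive sockets ...
        return True

assert LoopSketch().run_once()
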
    def run(self):
        # init listeners, add them to the event loop

@@ -155,55 +183,43 @@ def run(self):
        timeout = self.cfg.timeout or 0.5

        while self.alive:
            # If our parent changed then we shut down.
            if self.ppid != os.getppid():
                self.log.info("Parent changed, shutting down: %s", self)
                return

            # notify the arbiter we are alive
            self.notify()

            events = self.poller.select(0.2)
            for key, mask in events:
                callback = key.data
                callback(key.fileobj)
            # can we accept more connections?
            if self.nr < self.worker_connections:
                # wait for an event
                events = self.poller.select(0.02)
                for key, mask in events:
                    callback = key.data
                    callback(key.fileobj)

            # handle keepalive timeouts
            self.murder_keepalived()

            # if we have more connections than the max number of connections
            # accepted on a worker, wait until some complete or exit.
            if len(self.futures) >= self.worker_connections:
                res = futures.wait(self.futures, timeout=timeout)
                if not res:
                    self.alive = False
                    self.log.info("max requests achieved")
                    break
            # if the number of connections is < to the max we can handle at
            # the same time there is no need to wait for one
            if len(self.futures) < self.cfg.threads:
                continue

        # shutdown the pool
        self.poller.close()
        self.tpool.shutdown(False)

        # wait for the workers
        futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
            result = futures.wait(self.futures, timeout=timeout,
                                  return_when=futures.FIRST_COMPLETED)

        # if we still have futures running, try to close them
        while True:
            try:
                fs = self.futures.popleft()
            except IndexError:
            if not result.done:
                break

            sock = fs.conn.sock
            else:
                [self.futures.remove(f) for f in result.done]

            # the future is not running, cancel it
            if not fs.done() and not fs.running():
                fs.cancel()
        self.tpool.shutdown(False)
        self.poller.close()

            # make sure we close the sockets after the graceful timeout
            util.close(sock)

    def finish_request(self, fs):
        if fs.cancelled():
            fs.conn.close()
            return

        try:
            (keepalive, conn) = fs.result()
            # if the connection should be kept alive add it

@@ -214,32 +230,22 @@ def finish_request(self, fs):
                # register the connection
                conn.set_timeout()
                self._keep.append(conn)
                with self._lock:
                    self._keep.append(conn)

                # add the socket to the event loop
                self.poller.register(conn.sock, selectors.EVENT_READ,
                                     partial(self.handle_client, conn))
                    # add the socket to the event loop
                    self.poller.register(conn.sock, selectors.EVENT_READ,
                                         partial(self.reuse_connection, conn))
            else:
                util.close(conn.sock)
                self.nr -= 1
                conn.close()
        except:
            # an exception happened, make sure to close the
            # socket.
            util.close(fs.conn.sock)
        finally:
            # remove the future from our list
            try:
                self.futures.remove(fs)
            except ValueError:
                pass
            self.nr -= 1
            fs.conn.close()

    def handle(self, conn):
        if not conn.init():
            # connection kept alive
            try:
                self._keep.remove(conn)
            except ValueError:
                pass

        keepalive = False
        req = None
        try:

@@ -287,15 +293,15 @@ def handle_request(self, req, conn):
                    conn.listener.getsockname(), self.cfg)
            environ["wsgi.multithread"] = True

            self.nr += 1

            if self.alive and self.nr >= self.max_requests:
                self.log.info("Autorestarting worker after current request.")
                resp.force_close()
                self.alive = False

            if not self.cfg.keepalive:
                resp.force_close()
            elif len(self._keep) >= self.max_keepalived:
                resp.force_close()

            respiter = self.wsgi(environ, resp.start_response)
            try:

Review comment: Is the idea to always leave room for new clients? I don't know if it's important. We have a keep-alive timeout that can be used to mitigate this starvation possibility. This might just complicate the code unnecessarily.

Reply: If you don't leave room for new clients, in some cases you won't be able to accept any new connections at all because the worker is busy talking to its existing clients, which can in the end amount to a DDoS. IMO we should do it. Also, it appears we already had a way to handle that in the response by setting the must_close flag to true, so the change was actually really simple. The keepalive timeout is still there, so we make sure we won't keep a connection alive for too long.
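
A small worked illustration of the budget being discussed, using the formula from the diff (max_keepalived = worker_connections - threads); the helper function and the numbers below are examples for illustration, not gunicorn defaults or APIs.

def should_force_close(keepalive_cfg, kept, worker_connections, threads):
    # mirror the force_close decision in handle_request: close the response
    # when keepalive is disabled or the keepalive budget is already full
    max_keepalived = worker_connections - threads
    if not keepalive_cfg:
        return True
    return kept >= max_keepalived

# e.g. threads=4, worker_connections=20 -> at most 16 idle keepalive
# sockets; the remaining 4 slots always stay free for brand-new clients,
# so the worker cannot end up only servicing its existing connections.
assert should_force_close(2, kept=16, worker_connections=20, threads=4)
assert not should_force_close(2, kept=10, worker_connections=20, threads=4)
assert should_force_close(0, kept=0, worker_connections=20, threads=4)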