Skip to content

Commit

Permalink
CPU relax strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
cirospaciari committed Jun 23, 2023
1 parent 99e716e commit a63b75e
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "socketify"
version = "0.0.20"
version = "0.0.21"
authors = [
{ name="Ciro Spaciari", email="ciro.spaciari@gmail.com" },
]
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

setuptools.setup(
name="socketify",
version="0.0.20",
version="0.0.21",
platforms=["any"],
author="Ciro Spaciari",
author_email="ciro.spaciari@gmail.com",
Expand Down
16 changes: 14 additions & 2 deletions src/socketify/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
def asgi_on_abort_handler(res, user_data):
ctx = ffi.from_handle(user_data)
ctx.aborted = True
ctx.loop.is_idle = False

if ctx.abort_future is not None:
ctx.abort_future.set_result(True)
ctx.abort_future = None
Expand Down Expand Up @@ -59,6 +61,7 @@ def ws_open(ws, user_data):
)
def ws_upgrade(ssl, response, info, socket_context, user_data):
app = ffi.from_handle(user_data)
app.server.loop.is_idle = False
headers = []
next_header = info.header_list
while next_header != ffi.NULL:
Expand Down Expand Up @@ -117,6 +120,7 @@ def ws_upgrade(ssl, response, info, socket_context, user_data):
async def send(options):
if ws.aborted:
return False
ws.loop.is_idle = False
type = options["type"]
if type == "websocket.send":
data = options.get("bytes", None)
Expand Down Expand Up @@ -244,6 +248,7 @@ def unregister():
@ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)")
def asgi_on_data_handler(res, chunk, chunk_length, is_end, user_data):
data_response = ffi.from_handle(user_data)
data_response.loop.is_idle = False
data_response.is_end = bool(is_end)
more_body = not data_response.is_end
result = {
Expand Down Expand Up @@ -438,7 +443,8 @@ def uws_asgi_corked_403_handler(res, user_data):
@ffi.callback("void(int, uws_res_t*, socketify_asgi_data, void*)")
def asgi(ssl, response, info, user_data):
app = ffi.from_handle(user_data)

app.server.loop.is_idle = False

headers = []
next_header = info.header_list
while next_header != ffi.NULL:
Expand Down Expand Up @@ -481,6 +487,8 @@ def asgi(ssl, response, info, user_data):
async def receive():
if ctx.aborted:
return {"type": "http.disconnect"}

ctx.loop.is_idle = False
data_queue = ctx.data_queue
if data_queue:
if data_queue.queue.empty():
Expand All @@ -506,6 +514,8 @@ async def receive():
async def send(options):
if ctx.aborted:
return False

ctx.loop.is_idle = False
type = options["type"]
ssl = ctx.ssl
response = ctx.response
Expand Down Expand Up @@ -684,6 +694,7 @@ def listen(self, port_or_options, handler=None):

async def send(options):
nonlocal asgi_app
asgi_app.server.loop.is_idle = False
type = options["type"]
asgi_app.status_message = options.get("message", "")
if type == "lifespan.startup.complete":
Expand All @@ -701,6 +712,7 @@ async def send(options):

async def receive():
nonlocal asgi_app
asgi_app.server.loop.is_idle = False
while not asgi_app.is_stopped:
if asgi_app.is_starting:
asgi_app.is_starting = False
Expand All @@ -723,7 +735,7 @@ async def task_wrapper(task):
asgi_app.server.listen(port_or_options, handler)
finally:
return None

self.server.loop.is_idle = False
# start lifespan
self.server.loop.ensure_future(task_wrapper(self.app(scope, receive, send)))
self.server.run()
Expand Down
26 changes: 20 additions & 6 deletions src/socketify/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ def __init__(self, exception_handler=None, task_factory_max_items=0):

# get the current running loop or create a new one without warnings
self.loop = asyncio._get_running_loop()
self._idle_count = 0
self.is_idle = False
if self.loop is None:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
Expand Down Expand Up @@ -73,15 +75,27 @@ def set_timeout(self, timeout, callback, user_data):

def create_future(self):
return self.loop.create_future()


def _keep_alive(self):
if self.started:
self.uv_loop.run_nowait()
# be more agressive when needed
self.loop.call_soon(self._keep_alive)


relax = False
if not self.is_idle:
self._idle_count = 0
elif self._idle_count < 10000:
self._idle_count += 1
else:
relax = True

self.is_idle = True

if relax:
self.uv_loop.run_nowait()
self.loop.call_later(0.001, self._keep_alive)
else:
self.uv_loop.run_nowait()
# be more agressive when needed
self.loop.call_soon(self._keep_alive)

def create_task(self, *args, **kwargs):
# this is not using optimized create_task yet
return self.loop.create_task(*args, **kwargs)
Expand Down
Loading

0 comments on commit a63b75e

Please sign in to comment.