Skip to content

Commit

Permalink
chore: print lines debug
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM committed Jul 23, 2024
1 parent 93139c3 commit 0970493
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 42 deletions.
22 changes: 18 additions & 4 deletions jina/clients/base/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from jina.types.request import Request
from jina.types.request.data import DataRequest
from jina.types.request.status import StatusMessage

import timeit
if TYPE_CHECKING: # pragma: no cover
from opentelemetry import trace

Expand Down Expand Up @@ -122,8 +122,11 @@ async def __aenter__(self):
:return: start self
"""
return await self.start()

_start = timeit.default_timer()
res = await self.start()
_end = timeit.default_timer()
print(f'ASYNC ENTER {_end - _start}s')
return res
async def start(self):
"""Create ClientSession and enter context
Expand All @@ -139,7 +142,11 @@ async def start(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close(exc_type, exc_val, exc_tb)
_start = timeit.default_timer()
res = await self.close(exc_type, exc_val, exc_tb)
_end = timeit.default_timer()
print(f'ASYNC EXIT {_end - _start}s')
return res

async def close(self, *args, **kwargs):
"""Close ClientSession
Expand All @@ -160,25 +167,32 @@ async def send_message(self, request: 'Request'):
:param request: request as dict
:return: send post message
"""
_start = timeit.default_timer()
print(f'{_start} => HTTPClient send message lets start')
req_dict = request.to_dict()
req_dict['exec_endpoint'] = req_dict['header']['exec_endpoint']
if 'target_executor' in req_dict['header']:
req_dict['target_executor'] = req_dict['header']['target_executor']
for attempt in range(1, self.max_attempts + 1):
try:
_start_req = timeit.default_timer()
request_kwargs = {'url': self.url}
if not docarray_v2:
request_kwargs['json'] = req_dict
else:
from docarray.base_doc.io.json import orjson_dumps

request_kwargs['data'] = JinaJsonPayload(value=req_dict)
_end_req = timeit.default_timer()
print(f'{_end_req} => HTTPClient prepare request took {_end_req - _start_req}s')
response = await self.session.post(**request_kwargs).__aenter__()
try:
r_str = await response.json()
except aiohttp.ContentTypeError:
r_str = await response.text()
handle_response_status(response.status, r_str, self.url)
_end = timeit.default_timer()
print(f'{_end} => HTTPClient send_message total took {_end - _start}s')
return response
except (ValueError, ConnectionError, BadClient, aiohttp.ClientError) as err:
await retry.wait_or_raise_err(
Expand Down
95 changes: 64 additions & 31 deletions jina/clients/base/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from jina.serve.stream import RequestStreamer
from jina.types.request import Request
from jina.types.request.data import DataRequest
import timeit

if TYPE_CHECKING: # pragma: no cover
from jina.clients.base import CallbackFnType, InputType
Expand Down Expand Up @@ -96,20 +97,20 @@ async def _is_flow_ready(self, **kwargs) -> bool:
return False

async def _get_results(
self,
inputs: 'InputType',
on_done: 'CallbackFnType',
on_error: Optional['CallbackFnType'] = None,
on_always: Optional['CallbackFnType'] = None,
max_attempts: int = 1,
initial_backoff: float = 0.5,
max_backoff: float = 0.1,
backoff_multiplier: float = 1.5,
results_in_order: bool = False,
prefetch: Optional[int] = None,
timeout: Optional[int] = None,
return_type: Type[DocumentArray] = DocumentArray,
**kwargs,
self,
inputs: 'InputType',
on_done: 'CallbackFnType',
on_error: Optional['CallbackFnType'] = None,
on_always: Optional['CallbackFnType'] = None,
max_attempts: int = 1,
initial_backoff: float = 0.5,
max_backoff: float = 0.1,
backoff_multiplier: float = 1.5,
results_in_order: bool = False,
prefetch: Optional[int] = None,
timeout: Optional[int] = None,
return_type: Type[DocumentArray] = DocumentArray,
**kwargs,
):
"""
:param inputs: the callable
Expand All @@ -130,17 +131,24 @@ async def _get_results(
with ImportExtensions(required=True):
pass

_start_total = timeit.default_timer()
print(f'## {_start_total} => I AM in _get_results')

self.inputs = inputs
_aada = timeit.default_timer()
print(f'Setting inputs took {_aada - _start_total}s')
request_iterator = self._get_requests(**kwargs)
_aada2 = timeit.default_timer()
print(f'Getting req_it {_aada2 - _aada}s')
on = kwargs.get('on', '/post')
if len(self._endpoints) == 0:
await self._get_endpoints_from_openapi(**kwargs)

async with AsyncExitStack() as stack:
cm1 = ProgressBar(
total_length=self._inputs_length, disable=not self.show_progress
)
p_bar = stack.enter_context(cm1)
# cm1 = ProgressBar(
# total_length=self._inputs_length, disable=not self.show_progress
# )
# p_bar = stack.enter_context(cm1)
proto = 'https' if self.args.tls else 'http'
endpoint = on.strip('/')
has_default_endpoint = 'default' in self._endpoints
Expand All @@ -167,7 +175,7 @@ async def _get_results(
)

def _request_handler(
request: 'Request', **kwargs
request: 'Request', **kwargs
) -> 'Tuple[asyncio.Future, Optional[asyncio.Future]]':
"""
For HTTP Client, for each request in the iterator, we `send_message` using
Expand All @@ -176,27 +184,40 @@ def _request_handler(
:param kwargs: kwargs
:return: asyncio Task for sending message
"""
return asyncio.ensure_future(iolet.send_message(request=request)), None
res = asyncio.ensure_future(iolet.send_message(request=request)), None
return res

def _result_handler(result):
return result

streamer_args = vars(self.args)
if prefetch:
streamer_args['prefetch'] = prefetch

_start_streamer = timeit.default_timer()
print(f'## {_start_streamer} => I AM creating streamer')
streamer = RequestStreamer(
request_handler=_request_handler,
result_handler=_result_handler,
logger=self.logger,
**streamer_args,
)

_start = timeit.default_timer()
print(f'## Streamer created in {_start - _start_streamer}s')

async for response in streamer.stream(
request_iterator=request_iterator, results_in_order=results_in_order
request_iterator=request_iterator, results_in_order=results_in_order
):

r_status = response.status

r_str = await response.json()
_end = timeit.default_timer()
print(f'{_end} => GETTING RESPONSE from streamer took {_end - _start}s')
_start = timeit.default_timer()
handle_response_status(r_status, r_str, url)
_st = timeit.default_timer()

da = None
if 'data' in r_str and r_str['data'] is not None:
Expand All @@ -216,10 +237,15 @@ def _result_handler(result):
[return_type(**v) for v in r_str['data']]
)
del r_str['data']

_e = timeit.default_timer()
print(f'Create DocList took {_e - _st}s')
resp = DataRequest(r_str)
_e2 = timeit.default_timer()
print(f'Create DataRequest from r_str took {_e2 - _e}s')
if da is not None:
resp.data.docs = da
_e3 = timeit.default_timer()
print(f'Setting resp.data.docs took {_e3 - _e2}s')

callback_exec(
response=resp,
Expand All @@ -229,18 +255,25 @@ def _result_handler(result):
on_always=on_always,
continue_on_error=self.continue_on_error,
)
if self.show_progress:
p_bar.update()
_e4 = timeit.default_timer()
print(f'Calling callback took {_e4 - _e3}s')
# if self.show_progress:
# p_bar.update()
_end = timeit.default_timer()
print(f'{_end} => YIELD RESPONSE TOOK {_end - _start}s')
yield resp

_end_total = timeit.default_timer()
print(f'## {_end_total} => I AM in _get_results took {_end_total - _start_total}s')

async def _get_streaming_results(
self,
on: str,
inputs: 'Document',
parameters: Optional[Dict] = None,
return_type: Type[Document] = Document,
timeout: Optional[int] = None,
**kwargs,
self,
on: str,
inputs: 'Document',
parameters: Optional[Dict] = None,
return_type: Type[Document] = Document,
timeout: Optional[int] = None,
**kwargs,
):
proto = 'https' if self.args.tls else 'http'
endpoint = on.strip('/')
Expand Down
17 changes: 14 additions & 3 deletions jina/clients/mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from jina.excepts import InternalNetworkError
from jina.helper import deprecate_by, get_or_reuse_loop, run_async
from jina.importer import ImportExtensions

import timeit
if TYPE_CHECKING: # pragma: no cover
from pydantic import BaseModel
from jina.clients.base import CallbackFnType, InputType
Expand Down Expand Up @@ -387,8 +387,10 @@ def post(
.. warning::
``target_executor`` uses ``re.match`` for checking if the pattern is matched. ``target_executor=='foo'`` will match both deployments with the name ``foo`` and ``foo_what_ever_suffix``.
"""

print(f'##### I AM POSTING')
_post_start = timeit.default_timer()
c = self.client
_cl = timeit.default_timer()
c.show_progress = show_progress
c.continue_on_error = continue_on_error

Expand All @@ -397,6 +399,9 @@ def post(
return_results = (on_always is None) and (on_done is None)

async def _get_results(*args, **kwargs):
_start = timeit.default_timer()
print(f'{_start} ######## I AM GETTING RESULTS')

is_singleton = False
inferred_return_type = return_type
if docarray_v2:
Expand All @@ -415,13 +420,16 @@ async def _get_results(*args, **kwargs):
result.append(resp)
else:
result.extend(resp.data.docs)

_end = timeit.default_timer()
print(f'######## {_end} => I AM GETTING RESULTS took {_end - _start}s')
if return_results:
if not return_responses and is_singleton and len(result) == 1:
return result[0]
else:
return result

return self._with_retry(
res = self._with_retry(
func=_get_results,
inputs=inputs,
on_done=on_done,
Expand All @@ -441,6 +449,9 @@ async def _get_results(*args, **kwargs):
on=on,
**kwargs,
)
_post_end = timeit.default_timer()
print(f'##### I AM POSTING took {_post_end - _post_start}s')
return res

# ONLY CRUD, for other request please use `.post`
index = partialmethod(post, '/index')
Expand Down
4 changes: 3 additions & 1 deletion jina/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from rich.console import Console

from jina.constants import __windows__

import timeit
__all__ = [
'batch_iterator',
'parse_arg',
Expand Down Expand Up @@ -1316,6 +1316,8 @@ def run(self):
'something wrong when running the eventloop, result can not be retrieved'
)
else:

print(f'{timeit.default_timer()} ==> HEY HERE ASYNCIO RUN {func.__name__}')
return asyncio.run(func(*args, **kwargs))


Expand Down
19 changes: 18 additions & 1 deletion jina/serve/runtimes/worker/http_fastapi_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,11 @@ def add_post_route(

app_kwargs['response_class'] = DocArrayResponse

from timeit import default_timer

@app.api_route(**app_kwargs)
async def post(body: input_model, response: Response):
_start = default_timer()
req = DataRequest()
if body.header is not None:
req.header.request_id = body.header.request_id
Expand All @@ -111,8 +114,19 @@ async def post(body: input_model, response: Response):
req.data.docs = DocList[input_doc_list_model]([data])
if body.header is None:
req.header.request_id = req.docs[0].id
_end = default_timer()

logger.info(
f'Creating Request took {_end - _start}s'
)
_start = default_timer()
resp = await caller(req)
_end = default_timer()

logger.info(
f'Respon to Request took {_end - _start}s'
)
_start = default_timer()
status = resp.header.status

if status.code == jina_pb2.StatusProto.ERROR:
Expand All @@ -124,7 +138,10 @@ async def post(body: input_model, response: Response):
docs_response = resp.docs

ret = output_model(data=docs_response, parameters=resp.parameters)

_end = default_timer()
logger.info(
f'Extra time {_end - _start}s'
)
return ret

def add_streaming_routes(
Expand Down
Loading

0 comments on commit 0970493

Please sign in to comment.