Commit f0b5820

chore: update logs debug
JoanFM committed Oct 4, 2023
1 parent f00afaf commit f0b5820
Showing 2 changed files with 61 additions and 60 deletions.
jina/serve/runtimes/worker/batch_queue.py (5 changes: 3 additions & 2 deletions)
@@ -99,9 +99,10 @@ async def push(self, request: DataRequest) -> asyncio.Queue:
self._flush_task = asyncio.create_task(self._await_then_flush())
if not self._timer_task:
self._start_timer()

self._big_doc.extend(docs)
-self._request_idxs.extend([len(self._requests)] * len(docs))
+nex_req_idx = len(self._requests)
+num_docs = len(docs)
+self._request_idxs.extend([nex_req_idx] * num_docs)
self._request_lens.append(len(docs))
self._requests.append(request)
queue = asyncio.Queue()
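
For context, the bookkeeping in this hunk is what lets a flushed batch be split back up per request: _request_idxs records, for every doc in the merged batch, the index of the request it came from. A minimal sketch of the idea in isolation (class and attribute names here are illustrative, not the actual BatchQueue):

# Sketch: route docs of a merged batch back to their originating requests.
class TinyBatchQueue:
    def __init__(self):
        self._big_batch = []      # docs merged across all pushed requests
        self._request_idxs = []   # per doc: index of its originating request
        self._request_lens = []   # number of docs contributed per request
        self._requests = []

    def push(self, request, docs):
        next_req_idx = len(self._requests)
        self._big_batch.extend(docs)
        self._request_idxs.extend([next_req_idx] * len(docs))
        self._request_lens.append(len(docs))
        self._requests.append(request)

    def split_results(self, results):
        # fan the flat result list back out, one chunk per original request
        out, start = [], 0
        for length in self._request_lens:
            out.append(results[start:start + length])
            start += length
        return out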
jina/serve/runtimes/worker/request_handling.py (116 changes: 58 additions & 58 deletions)
@@ -51,16 +51,16 @@ class WorkerRequestHandler:
_KEY_RESULT = '__results__'

def __init__(
self,
args: 'argparse.Namespace',
logger: 'JinaLogger',
metrics_registry: Optional['CollectorRegistry'] = None,
tracer_provider: Optional['trace.TracerProvider'] = None,
meter_provider: Optional['metrics.MeterProvider'] = None,
meter=None,
tracer=None,
deployment_name: str = '',
**kwargs,
):
"""Initialize private parameters and execute private loading functions.
@@ -83,8 +83,8 @@ def __init__(
self._is_closed = False
if self.metrics_registry:
with ImportExtensions(
required=True,
help_text='You need to install the `prometheus_client` to use the monitoring functionality of jina',
):
from prometheus_client import Counter, Summary
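
The ImportExtensions guard above turns a missing optional dependency into an actionable error. A bare-bones sketch of the same pattern, plus the two metric types it imports (metric names below are illustrative, not jina's actual ones):

try:
    from prometheus_client import Counter, Summary
except ImportError as ex:
    raise ImportError(
        'You need to install `prometheus_client` to use the monitoring '
        'functionality of jina'
    ) from ex

# a Counter only ever increases; a Summary records sized observations
received_counter = Counter('received_requests', 'Number of requests received')
request_seconds = Summary('request_seconds', 'Time spent handling a request')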

@@ -227,9 +227,9 @@ async def _hot_reload(self):
watched_files.add(extra_python_file)

with ImportExtensions(
required=True,
logger=self.logger,
help_text='''hot reload requires watchfiles dependency to be installed. You can do `pip install
watchfiles''',
):
from watchfiles import awatch
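
For reference, awatch from watchfiles is an async generator that yields batches of filesystem events, which is what makes the hot-reload loop possible. A minimal sketch (the callback and watched paths are hypothetical):

import asyncio
from watchfiles import awatch

async def watch_and_reload(paths, on_change):
    # each iteration yields a set of (Change, path) tuples
    async for changes in awatch(*paths):
        for _change, changed_path in changes:
            on_change(changed_path)

# usage sketch:
# asyncio.run(watch_and_reload(['executor.py'], lambda p: print(f'reloading {p}')))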
@@ -296,14 +296,14 @@ def _init_batchqueue_dict(self):
}

def _init_monitoring(
self,
metrics_registry: Optional['CollectorRegistry'] = None,
meter: Optional['metrics.Meter'] = None,
):
if metrics_registry:
with ImportExtensions(
required=True,
help_text='You need to install the `prometheus_client` to use the monitoring functionality of jina',
):
from prometheus_client import Counter, Summary

@@ -359,10 +359,10 @@ def _init_monitoring(
self._sent_response_size_histogram = None

def _load_executor(
self,
metrics_registry: Optional['CollectorRegistry'] = None,
tracer_provider: Optional['trace.TracerProvider'] = None,
meter_provider: Optional['metrics.MeterProvider'] = None,
):
"""
Load the executor to this runtime, specified by ``uses`` CLI argument.
@@ -576,8 +576,8 @@ def _setup_req_doc_array_cls(self, requests, exec_endpoint, is_response=False):
req.document_array_cls = DocumentArray
else:
if (
not endpoint_info.is_generator
and not endpoint_info.is_singleton_doc
):
req.document_array_cls = (
endpoint_info.request_schema
@@ -594,9 +594,9 @@ def _setup_req_doc_array_cls(self, requests, exec_endpoint, is_response=False):
pass

def _setup_requests(
self,
requests: List['DataRequest'],
exec_endpoint: str,
):
"""Execute a request using the executor.
@@ -612,7 +612,7 @@ def _setup_requests(
return requests, params

async def handle_generator(
self, requests: List['DataRequest'], tracing_context: Optional['Context'] = None
) -> Generator:
"""Prepares and executes a request for generator endpoints.
@@ -647,7 +647,7 @@ async def handle_generator(
)

async def handle(
self, requests: List['DataRequest'], tracing_context: Optional['Context'] = None
) -> DataRequest:
"""Initialize private parameters and execute private loading functions.
@@ -721,7 +721,7 @@ async def handle(

@staticmethod
def replace_docs(
request: List['DataRequest'], docs: 'DocumentArray', ndarray_type: str = None
) -> None:
"""Replaces the docs in a message with new Documents.
@@ -769,7 +769,7 @@ async def close(self):

@staticmethod
def _get_docs_matrix_from_request(
requests: List['DataRequest'],
) -> Tuple[Optional[List['DocumentArray']], Optional[Dict[str, 'DocumentArray']]]:
"""
Returns a docs matrix from a list of DataRequest objects.
@@ -793,7 +793,7 @@ def _get_docs_matrix_from_request(

@staticmethod
def get_parameters_dict_from_request(
requests: List['DataRequest'],
) -> 'Dict':
"""
Returns a parameters dict from a list of DataRequest objects.
@@ -813,7 +813,7 @@ def get_parameters_dict_from_request(

@staticmethod
def get_docs_from_request(
requests: List['DataRequest'],
) -> 'DocumentArray':
"""
Gets a field from the message
@@ -893,7 +893,7 @@ def reduce_requests(requests: List['DataRequest']) -> 'DataRequest':

# serving part
async def process_single_data(
self, request: DataRequest, context, is_generator: bool = False
) -> DataRequest:
"""
Process the received requests and return the result as a new request
@@ -907,7 +907,7 @@ async def process_single_data(
return await self.process_data([request], context, is_generator=is_generator)

async def stream_doc(
self, request: SingleDocumentRequest, context: 'grpc.aio.ServicerContext'
) -> SingleDocumentRequest:
"""
Process the received requests and return the result as a new request, used for streaming behavior, one doc IN, several out
@@ -1033,7 +1033,7 @@ async def endpoint_discovery(self, empty, context) -> jina_pb2.EndpointsProto:
return endpoints_proto

def _extract_tracing_context(
self, metadata: 'grpc.aio.Metadata'
) -> Optional['Context']:
if self.tracer:
from opentelemetry.propagate import extract
@@ -1043,13 +1043,8 @@ def _extract_tracing_context(

return None
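
The extraction step above relies on OpenTelemetry propagation: extract() reads the W3C traceparent/tracestate headers out of a carrier mapping and returns a Context. The same logic in isolation (assuming, as with grpc.aio.Metadata, that the metadata iterates as key/value pairs):

from opentelemetry.propagate import extract

def tracing_context_from_metadata(metadata):
    if metadata is None:
        return None
    # builds an OpenTelemetry Context from the trace headers in the carrier
    return extract(dict(metadata))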

-def _log_data_request(self, request: DataRequest):
-    self.logger.debug(
-        f'recv DataRequest at {request.header.exec_endpoint} with id: {request.header.request_id}'
-    )

async def process_data(
self, requests: List[DataRequest], context, is_generator: bool = False
) -> DataRequest:
"""
Process the received requests and return the result as a new request
@@ -1061,11 +1056,13 @@ async def process_data(
"""
self.logger.debug('recv a process_data request')
with MetricsTimer(
self._summary, self._receiving_request_seconds, self._metric_attributes
):
try:
if self.logger.debug_enabled:
-self._log_data_request(requests[0])
+self.logger.debug(
+    f'recv DataRequest at {requests[0].header.exec_endpoint} with id: {requests[0].header.request_id}'
+)

if context is not None:
tracing_context = self._extract_tracing_context(
@@ -1089,7 +1086,10 @@ async def process_data(
self._successful_requests_counter.add(
1, attributes=self._metric_attributes
)
-self.logger.debug('return request result')
+if self.logger.debug_enabled:
+    self.logger.debug(
+        f'return DataRequest from {result.header.exec_endpoint} with id: {result.header.request_id}'
+    )
return result
except (RuntimeError, Exception) as ex:
self.logger.error(
@@ -1111,8 +1111,8 @@ async def process_data(
)

if (
self.args.exit_on_exceptions
and type(ex).__name__ in self.args.exit_on_exceptions
):
self.logger.info('Exiting because of "--exit-on-exceptions".')
raise RuntimeTerminated
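
The logging changes in process_data above share one guard pattern: check logger.debug_enabled before building the message, so the f-string interpolation of request headers is skipped entirely when debug logging is off. The stdlib equivalent, for comparison (logger name and helper are illustrative):

import logging

logger = logging.getLogger('worker')

def log_recv(request):
    # isEnabledFor avoids the formatting cost when DEBUG is inactive;
    # %s-style arguments defer interpolation to the logging framework
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(
            'recv DataRequest at %s with id: %s',
            request.header.exec_endpoint,
            request.header.request_id,
        )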
@@ -1136,7 +1136,7 @@ async def _status(self, empty, context) -> jina_pb2.JinaInfoProto:
return info_proto

async def stream(
self, request_iterator, context=None, *args, **kwargs
) -> AsyncIterator['Request']:
"""
stream requests from client iterator and stream responses back.
@@ -1154,8 +1154,8 @@ async def stream(
Call = stream

def _create_snapshot_status(
self,
snapshot_directory: str,
) -> 'jina_pb2.SnapshotStatusProto':
_id = str(uuid.uuid4())
self.logger.debug(f'Generated snapshot id: {_id}')
@@ -1168,7 +1168,7 @@ def _create_snapshot_status(
)

def _create_restore_status(
self,
) -> 'jina_pb2.SnapshotStatusProto':
_id = str(uuid.uuid4())
self.logger.debug(f'Generated restore id: {_id}')
@@ -1187,9 +1187,9 @@ async def snapshot(self, request, context) -> 'jina_pb2.SnapshotStatusProto':
"""
self.logger.debug('Calling snapshot')

if (
self._snapshot
and self._snapshot_thread
and self._snapshot_thread.is_alive()
):
raise RuntimeError(
f'A snapshot with id {self._snapshot.id.value} is currently in progress. Cannot start another.'
@@ -1207,7 +1207,7 @@ async def snapshot(self, request, context) -> 'jina_pb2.SnapshotStatusProto':
return self._snapshot

async def snapshot_status(
self, request: 'jina_pb2.SnapshotId', context
) -> 'jina_pb2.SnapshotStatusProto':
"""
method to start a snapshot process of the Executor
@@ -1268,7 +1268,7 @@ async def restore(self, request: 'jina_pb2.RestoreSnapshotCommand', context):
return self._restore

async def restore_status(
self, request, context
) -> 'jina_pb2.RestoreSnapshotStatusProto':
"""
method to start a snapshot process of the Executor
