docs: fix docstring
Signed-off-by: Joan Fontanals Martinez <joan.martinez@jina.ai>
JoanFM committed Oct 4, 2023
1 parent 0ae67a1 commit 3bf6e6e
Showing 1 changed file with 93 additions and 44 deletions.
137 changes: 93 additions & 44 deletions jina/serve/runtimes/worker/batch_queue.py
@@ -17,14 +17,14 @@ class BatchQueue:
     """A batch queue that holds the data request and the callable to batch requests to."""
 
     def __init__(
-            self,
-            func: Callable,
-            request_docarray_cls,
-            response_docarray_cls,
-            output_array_type: Optional[str] = None,
-            params: Optional[Dict] = None,
-            preferred_batch_size: int = 4,
-            timeout: int = 10_000,
+        self,
+        func: Callable,
+        request_docarray_cls,
+        response_docarray_cls,
+        output_array_type: Optional[str] = None,
+        params: Optional[Dict] = None,
+        preferred_batch_size: int = 4,
+        timeout: int = 10_000,
     ) -> None:
         self._data_lock = asyncio.Lock()
         self.func = func
@@ -63,9 +63,9 @@ def _reset(self) -> None:
 
     def _cancel_timer_if_pending(self):
         if (
-                self._timer_task
-                and not self._timer_task.done()
-                and not self._timer_task.cancelled()
+            self._timer_task
+            and not self._timer_task.done()
+            and not self._timer_task.cancelled()
         ):
             self._timer_task.cancel()
 
@@ -118,14 +118,20 @@ async def push(self, request: DataRequest) -> asyncio.Queue:
     async def _await_then_flush(self) -> None:
         """Process all requests in the queue once flush_trigger event is set."""
 
-        def _get_docs_groups_completed_request_indexes(non_assigned_docs,
-                                                       non_assigned_docs_reqs_idx,
-                                                       sum_from_previous_mini_batch_in_first_req_idx):
+        def _get_docs_groups_completed_request_indexes(
+            non_assigned_docs,
+            non_assigned_docs_reqs_idx,
+            sum_from_previous_mini_batch_in_first_req_idx,
+        ):
             """
             This method groups all the `non_assigned_docs` into groups of docs according to the `req_idx` they belong to.
             They are only distributed when we are sure that the request is full.
-            The method returns the a list of document groups and a list of request Idx to which each of these groups belong
+            :param non_assigned_docs: The documents that have already been processed but have not been assigned to a request result
+            :param non_assigned_docs_reqs_idx: The request IDX that are not yet completed (not all of its docs have been processed)
+            :param sum_from_previous_mini_batch_in_first_req_idx: The number of docs from previous iteration that belong to the first non_assigned_req_idx. This is useful
+                to make sure we know when a request is completed.
+            :return: list of document groups and a list of request Idx to which each of these groups belong
             """
             distributed_requests = []
             completed_req_idx = []
@@ -134,37 +140,58 @@ def _get_docs_groups_completed_request_indexes(non_assigned_docs,
             min_involved_req_idx = non_assigned_docs_reqs_idx[0]
             req_idx = min_involved_req_idx
             for req_idx in non_assigned_docs_reqs_idx:
-                sum_from_previous_mini_batch_in_first_req_idx -= 1  # the previous leftovers are being allocated here
+                sum_from_previous_mini_batch_in_first_req_idx -= (
+                    1  # the previous leftovers are being allocated here
+                )
                 if req_idx > min_involved_req_idx:
-                    request_bucket = non_assigned_docs[num_distributed_docs: num_distributed_docs + num_docs_in_req_idx]
+                    request_bucket = non_assigned_docs[
+                        num_distributed_docs : num_distributed_docs
+                        + num_docs_in_req_idx
+                    ]
                     num_distributed_docs += num_docs_in_req_idx
                     completed_req_idx.append(min_involved_req_idx)
                     min_involved_req_idx = req_idx
                     num_docs_in_req_idx = 0
                     distributed_requests.append(request_bucket)
                 num_docs_in_req_idx += 1
 
-            if req_idx not in completed_req_idx and num_docs_in_req_idx + sum_from_previous_mini_batch_in_first_req_idx == \
-                    self._request_lens[req_idx]:
+            if (
+                req_idx not in completed_req_idx
+                and num_docs_in_req_idx + sum_from_previous_mini_batch_in_first_req_idx
+                == self._request_lens[req_idx]
+            ):
                 completed_req_idx.append(req_idx)
-                request_bucket = non_assigned_docs[num_distributed_docs: num_distributed_docs + num_docs_in_req_idx]
+                request_bucket = non_assigned_docs[
+                    num_distributed_docs : num_distributed_docs + num_docs_in_req_idx
+                ]
                 distributed_requests.append(request_bucket)
 
             return distributed_requests, completed_req_idx
 
-        async def _assign_results(non_assigned_to_response_docs,
-                                  non_assigned_to_response_request_idxs,
-                                  sum_from_previous_mini_batch_in_first_req_idx):
+        async def _assign_results(
+            non_assigned_docs,
+            non_assigned_docs_reqs_idx,
+            sum_from_previous_mini_batch_in_first_req_idx,
+        ):
             """
             This method aims to assign to the corresponding request objects the resulting documents from the mini batches.
             They are assigned when we are sure that the Request is fully processed.
             It also communicates to the corresponding queue that the request is full so that it can be returned
-            It returns the amount of assigned documents so that some documents can come back in the next iteration
+            :param non_assigned_docs: The documents that have already been processed but have not been assigned to a request result
+            :param non_assigned_docs_reqs_idx: The request IDX that are not yet completed (not all of its docs have been processed)
+            :param sum_from_previous_mini_batch_in_first_req_idx: The number of docs from previous iteration that belong to the first non_assigned_req_idx. This is useful
+                to make sure we know when a request is completed.
+            :return: amount of assigned documents so that some documents can come back in the next iteration
             """
-            docs_grouped, completed_req_idxs = _get_docs_groups_completed_request_indexes(non_assigned_to_response_docs,
-                                                                                          non_assigned_to_response_request_idxs,
-                                                                                          sum_from_previous_mini_batch_in_first_req_idx)
+            (
+                docs_grouped,
+                completed_req_idxs,
+            ) = _get_docs_groups_completed_request_indexes(
+                non_assigned_docs,
+                non_assigned_docs_reqs_idx,
+                sum_from_previous_mini_batch_in_first_req_idx,
+            )
             num_assigned_docs = sum(len(group) for group in docs_grouped)
 
             for docs_group, request_idx in zip(docs_grouped, completed_req_idxs):
@@ -180,7 +207,9 @@ async def _assign_results(non_assigned_to_response_docs,
         def batch(iterable_1, iterable_2, n=1):
             items = len(iterable_1)
             for ndx in range(0, items, n):
-                yield iterable_1[ndx: min(ndx + n, items)], iterable_2[ndx: min(ndx + n, items)]
+                yield iterable_1[ndx : min(ndx + n, items)], iterable_2[
+                    ndx : min(ndx + n, items)
+                ]
 
         await self._flush_trigger.wait()
         # writes to shared data between tasks need to be mutually exclusive
@@ -196,7 +225,9 @@ def batch(iterable_1, iterable_2, n=1):
             non_assigned_to_response_docs = self._response_docarray_cls()
             non_assigned_to_response_request_idxs = []
             sum_from_previous_first_req_idx = 0
-            for docs_inner_batch, req_idxs in batch(self._big_doc, self._request_idxs, self._preferred_batch_size):
+            for docs_inner_batch, req_idxs in batch(
+                self._big_doc, self._request_idxs, self._preferred_batch_size
+            ):
                 involved_requests_min_indx = req_idxs[0]
                 involved_requests_max_indx = req_idxs[-1]
                 input_len_before_call: int = len(docs_inner_batch)
@@ -210,7 +241,9 @@ def batch(iterable_1, iterable_2, n=1):
                     )
                     # Output validation
                     if (docarray_v2 and isinstance(batch_res_docs, DocList)) or (
-                            not docarray_v2 and isinstance(batch_res_docs, DocumentArray)):
+                        not docarray_v2
+                        and isinstance(batch_res_docs, DocumentArray)
+                    ):
                         if not len(batch_res_docs) == input_len_before_call:
                             raise ValueError(
                                 f'Dynamic Batching requires input size to equal output size. Expected output size {input_len_before_call}, but got {len(batch_res_docs)}'
@@ -221,36 +254,52 @@ def batch(iterable_1, iterable_2, n=1):
                                 f'Dynamic Batching requires input size to equal output size. Expected output size {input_len_before_call}, but got {len(docs_inner_batch)}'
                             )
                         else:
-                            array_name = 'DocumentArray' if not docarray_v2 else 'DocList'
+                            array_name = (
+                                'DocumentArray' if not docarray_v2 else 'DocList'
+                            )
                             raise TypeError(
                                 f'The return type must be {array_name} / `None` when using dynamic batching, '
                                 f'but getting {batch_res_docs!r}'
                             )
                     except Exception as exc:
                         # All the requests containing docs in this Exception should be raising it
                         for request_full in self._requests_completed[
-                                involved_requests_min_indx:involved_requests_max_indx + 1]:
+                            involved_requests_min_indx : involved_requests_max_indx + 1
+                        ]:
                             await request_full.put(exc)
                         pass
 
                     # If there has been an exception, this will be docs_inner_batch
-                    output_executor_docs = batch_res_docs if batch_res_docs is not None else docs_inner_batch
+                    output_executor_docs = (
+                        batch_res_docs
+                        if batch_res_docs is not None
+                        else docs_inner_batch
+                    )
 
                     # We need to attribute the docs to their requests
                     non_assigned_to_response_docs.extend(output_executor_docs)
                     non_assigned_to_response_request_idxs.extend(req_idxs)
-                    num_assigned_docs = await _assign_results(non_assigned_to_response_docs,
-                                                              non_assigned_to_response_request_idxs,
-                                                              sum_from_previous_first_req_idx)
-
-                    sum_from_previous_first_req_idx = len(non_assigned_to_response_docs) - num_assigned_docs
+                    num_assigned_docs = await _assign_results(
+                        non_assigned_to_response_docs,
+                        non_assigned_to_response_request_idxs,
+                        sum_from_previous_first_req_idx,
+                    )
+
+                    sum_from_previous_first_req_idx = (
+                        len(non_assigned_to_response_docs) - num_assigned_docs
+                    )
                     non_assigned_to_response_docs = non_assigned_to_response_docs[
-                        num_assigned_docs:]
-                    non_assigned_to_response_request_idxs = non_assigned_to_response_request_idxs[
-                        num_assigned_docs:]
+                        num_assigned_docs:
+                    ]
+                    non_assigned_to_response_request_idxs = (
+                        non_assigned_to_response_request_idxs[num_assigned_docs:]
+                    )
                 if len(non_assigned_to_response_request_idxs) > 0:
-                    _ = await _assign_results(non_assigned_to_response_docs, non_assigned_to_response_request_idxs,
-                                              sum_from_previous_first_req_idx)
+                    _ = await _assign_results(
+                        non_assigned_to_response_docs,
+                        non_assigned_to_response_request_idxs,
+                        sum_from_previous_first_req_idx,
+                    )
             finally:
                 self._reset()
 
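Note: the grouping logic that the fixed docstrings describe can be illustrated in isolation. The sketch below is a simplified, hypothetical re-implementation for illustration only, not the runtime code: it uses plain Python lists instead of DocList/DocumentArray, a toy request_lens mapping in place of self._request_lens, and omits the cross-mini-batch carry (sum_from_previous_mini_batch_in_first_req_idx).

def group_completed(docs, req_idxs, request_lens):
    # Walk the flat doc list with its parallel request-index list and cut
    # one bucket per request whose docs are all present in this mini-batch.
    groups, completed = [], []
    start = 0
    while start < len(req_idxs):
        req = req_idxs[start]
        end = start
        while end < len(req_idxs) and req_idxs[end] == req:
            end += 1
        if end - start == request_lens[req]:  # request fully covered
            groups.append(docs[start:end])
            completed.append(req)
            start = end
        else:
            # trailing partial request: its docs wait for the next mini-batch
            break
    return groups, completed

# Request 0 expects 2 docs, request 1 expects 3, but only 4 docs are processed:
print(group_completed(['a', 'b', 'c', 'd'], [0, 0, 1, 1], {0: 2, 1: 3}))
# -> ([['a', 'b']], [0]); request 1 stays incomplete until the next iteration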

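For reference, the batch helper re-wrapped above is behaviourally unchanged by this commit. A standalone usage sketch with toy inputs (in the runtime the caller passes self._big_doc and self._request_idxs with n set to self._preferred_batch_size):

def batch(iterable_1, iterable_2, n=1):
    # Yield aligned mini-batches of up to n items from two parallel sequences.
    items = len(iterable_1)
    for ndx in range(0, items, n):
        yield iterable_1[ndx : min(ndx + n, items)], iterable_2[ndx : min(ndx + n, items)]

docs = ['d0', 'd1', 'd2', 'd3', 'd4']
req_idxs = [0, 0, 1, 1, 2]
for docs_inner_batch, req_idxs_batch in batch(docs, req_idxs, n=2):
    print(docs_inner_batch, req_idxs_batch)
# ['d0', 'd1'] [0, 0]
# ['d2', 'd3'] [1, 1]
# ['d4'] [2]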