Skip to content

Commit

Permalink
fix: add batching for sending feedback requests
Browse files Browse the repository at this point in the history
  • Loading branch information
karootplx committed Oct 25, 2024
1 parent 8c569bb commit 88eebb3
Showing 1 changed file with 60 additions and 45 deletions.
105 changes: 60 additions & 45 deletions neurons/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,58 +345,73 @@ async def _send_shuffled_requests(
dendrite: bt.dendrite, axons: List[bt.AxonInfo], synapse: FeedbackRequest
) -> list[FeedbackRequest]:
"""Based on the initial synapse, send shuffled ordering of responses so that miners cannot guess ordering of ground truth"""
tasks = []
for axon in axons:
# shuffle synapse Responses
shuffled_completions = random.sample(
synapse.completion_responses,
k=len(synapse.completion_responses),
)
all_responses = []
batch_size = 10
batch_timeout = 3

for i in range(0, len(axons), batch_size):
batch_axons = axons[i : i + batch_size]
tasks = []

for axon in batch_axons:
# shuffle synapse Responses
shuffled_completions = random.sample(
synapse.completion_responses,
k=len(synapse.completion_responses),
)

# Apply obfuscation to each completion's files
# TODO re-enable obfuscation
# await Validator._obfuscate_completion_files(shuffled_completions)
# Apply obfuscation to each completion's files
# TODO re-enable obfuscation
# await Validator._obfuscate_completion_files(shuffled_completions)

criteria_types = []
# ensure criteria options same order as completion_responses
for criteria in synapse.criteria_types:
if not isinstance(criteria, MultiScoreCriteria):
logger.trace(f"Skipping non multi score criteria: {criteria}")
continue
options = [completion.model for completion in shuffled_completions]
criteria = MultiScoreCriteria(
options=options,
min=criteria.min,
max=criteria.max,
criteria_types = []
# ensure criteria options same order as completion_responses
for criteria in synapse.criteria_types:
if not isinstance(criteria, MultiScoreCriteria):
logger.trace(f"Skipping non multi score criteria: {criteria}")
continue
options = [completion.model for completion in shuffled_completions]
criteria = MultiScoreCriteria(
options=options,
min=criteria.min,
max=criteria.max,
)
criteria_types.append(criteria)

shuffled_synapse = FeedbackRequest(
epoch_timestamp=synapse.epoch_timestamp,
request_id=synapse.request_id,
prompt=synapse.prompt,
completion_responses=shuffled_completions,
task_type=synapse.task_type,
criteria_types=criteria_types,
expire_at=synapse.expire_at,
)
criteria_types.append(criteria)

shuffled_synapse = FeedbackRequest(
epoch_timestamp=synapse.epoch_timestamp,
request_id=synapse.request_id,
prompt=synapse.prompt,
completion_responses=shuffled_completions,
task_type=synapse.task_type,
criteria_types=criteria_types,
expire_at=synapse.expire_at,
)

tasks.append(
dendrite.forward(
axons=[axon],
synapse=shuffled_synapse,
deserialize=False,
timeout=12,
tasks.append(
dendrite.forward(
axons=[axon],
synapse=shuffled_synapse,
deserialize=False,
timeout=12,
)
)
)

# Gather results and flatten the list
nested_responses = await asyncio.gather(*tasks)
flat_responses = [
response for sublist in nested_responses for response in sublist
]
# Gather results for this batch and flatten the list
batch_responses = await asyncio.gather(*tasks)
flat_batch_responses = [
response for sublist in batch_responses for response in sublist
]
all_responses.extend(flat_batch_responses)

logger.info(
f"Processed batch {i//batch_size + 1} of {(len(axons)-1)//batch_size + 1}"
)
# Pause for batch_timeout seconds between batches, but not after the last batch
if i + batch_size < len(axons):
await asyncio.sleep(batch_timeout)

return flat_responses
return all_responses

@staticmethod
async def _obfuscate_completion_files(
Expand Down

0 comments on commit 88eebb3

Please sign in to comment.