Skip to content

Commit

Permalink
perf: ensure miner scores updated per func call, not per task (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
jarvis8x7b authored Oct 30, 2024
1 parent 6a46208 commit 5339f09
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions neurons/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,8 @@ async def _ensure_subtensor_ws_connected(
async def update_score_and_send_feedback(self):
while True:
await asyncio.sleep(dojo.VALIDATOR_UPDATE_SCORE)
# for each hotkey, a list of scores from all tasks being scored
hotkey_to_all_scores = defaultdict(list)
try:
validator_hotkeys: List[str] = self._get_validator_hotkeys()

Expand Down Expand Up @@ -967,21 +969,32 @@ async def update_score_and_send_feedback(self):
continue

for task in task_batch:
processed_id = await self._score_task(task)
processed_id, hotkey_to_score = await self._score_task(task)
if processed_id:
processed_request_ids.append(processed_id)
for hotkey, score in hotkey_to_score.items():
hotkey_to_all_scores[hotkey].append(score)

if processed_request_ids:
await ORM.mark_tasks_processed_by_request_ids(processed_request_ids)

logger.success(
f"📝 All tasks processed, total tasks: {len(processed_request_ids)}"
)
gc.collect()

# average scores across all tasks being scored by this trigger to update_scores
# so miners moving average decay is lower and we incentivise quality > quantity
hotkey_to_score = {
hotkey: sum(scores) / len(scores)
for hotkey, scores in hotkey_to_all_scores.items()
}
await self.update_scores(hotkey_to_scores=hotkey_to_score)

except Exception:
traceback.print_exc()
pass
finally:
gc.collect()

async def update_task_completions(
self, validator_hotkeys: List[str], expire_from: datetime, expire_to: datetime
Expand Down Expand Up @@ -1198,7 +1211,7 @@ async def _update_miner_completions_batch(
logger.warning(f"Error during attempt {attempt+1}, retrying: {e}")
await asyncio.sleep(2**attempt)

async def _score_task(self, task: DendriteQueryResponse) -> str:
async def _score_task(self, task: DendriteQueryResponse) -> tuple[str, dict]:
"""Process a task and calculate the scores for the miner responses"""
if not task.miner_responses:
logger.warning("📝 No miner responses, skipping task")
Expand Down Expand Up @@ -1227,7 +1240,6 @@ async def _score_task(self, task: DendriteQueryResponse) -> str:
logger.info("📝 Did not manage to generate a dict of hotkey to score")
return task.request.request_id

await self.update_scores(hotkey_to_scores=hotkey_to_score)
await self.send_scores(
synapse=ScoringResult(
request_id=task.request.request_id,
Expand All @@ -1240,7 +1252,7 @@ async def _score_task(self, task: DendriteQueryResponse) -> str:
self._log_wandb(task, criteria_to_miner_score, hotkey_to_score)
)

return task.request.request_id
return task.request.request_id, hotkey_to_score

async def _log_wandb(
self,
Expand Down

0 comments on commit 5339f09

Please sign in to comment.