Skip to content

Commit

Permalink
perf: only consume data when there are axons to query
Browse files Browse the repository at this point in the history
  • Loading branch information
jarvis8x7b committed Jul 10, 2024
1 parent 8207daf commit 617740b
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions neurons/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
import bittensor as bt
import numpy as np
import torch
import wandb
from fastapi.encoders import jsonable_encoder
from loguru import logger
from torch.nn import functional as F

import template
import wandb
from commons.data_manager import DataManager, ValidatorStateKeys
from commons.dataset.synthetic import SyntheticAPI
from commons.human_feedback.dojo import DojoAPI
Expand All @@ -31,7 +31,6 @@
RankingCriteria,
ScoringMethod,
ScoringResult,
SyntheticQA,
TaskType,
)
from template.utils.config import get_config
Expand Down Expand Up @@ -440,13 +439,29 @@ async def send_heartbeats(self):
async def send_request(
self,
synapse: FeedbackRequest = None,
data: SyntheticQA = None,
):
start = get_epoch_time()
# typically the request may come from an external source however,
# initially will seed it with some data for miners to get started

# ensure we consider only active miners
request_id = get_new_uuid()
async with self._lock:
sel_miner_uids = MinerUidSelector(
nodes=list(self._active_miner_uids),
).get_target_uids(key=request_id, k=get_config().neuron.sample_size)
axons = [
self.metagraph.axons[uid]
for uid in sel_miner_uids
if self.metagraph.axons[uid].hotkey.casefold()
!= self.wallet.hotkey.ss58_address.casefold()
]
if not len(axons):
logger.warning("No axons to query ... skipping")
return

if synapse is None:
data = await SyntheticAPI.get_qa()
if not data:
logger.error("Failed to generate data from synthetic gen API")
return
Expand All @@ -458,6 +473,7 @@ async def send_request(
completion.model = new_uuid

synapse = FeedbackRequest(
request_id=request_id,
task_type=str(TaskType.CODE_GENERATION),
criteria_types=[
MultiScoreCriteria(
Expand All @@ -470,25 +486,10 @@ async def send_request(
responses=data.responses,
)

# ensure we consider only active miners
async with self._lock:
sel_miner_uids = MinerUidSelector(
nodes=list(self._active_miner_uids),
).get_target_uids(key=synapse.request_id, k=get_config().neuron.sample_size)
logger.info(
f"Sending synapse off to miners, request id: {synapse.request_id}, miner uids: {sel_miner_uids}"
)
axons = [
self.metagraph.axons[uid]
for uid in sel_miner_uids
if self.metagraph.axons[uid].hotkey.casefold()
!= self.wallet.hotkey.ss58_address.casefold()
]
if not len(axons):
logger.warning("No axons to query ... skipping")
return

miner_responses: List[FeedbackRequest] = []
miner_responses: List[FeedbackRequest] = await self.dendrite.forward(
axons=axons, synapse=synapse, deserialize=False, timeout=24
)
Expand Down Expand Up @@ -561,8 +562,7 @@ async def run(self):
try:
while True:
try:
synthetic_data = await SyntheticAPI.get_qa()
await self.send_request(data=synthetic_data)
await self.send_request()

# # Check if we should exit.
if self._should_exit:
Expand Down

0 comments on commit 617740b

Please sign in to comment.