
Commit

fix: minersim task scoring, validatorsim subtensor retry mechanism and dendrite forward timeout
tedbee authored and karootplx committed Dec 9, 2024
1 parent 1b4ef9e commit 956f5cf
Showing 2 changed files with 95 additions and 89 deletions.
67 changes: 35 additions & 32 deletions simulator/miner.py
@@ -1,17 +1,20 @@
import asyncio
import json
import os
import random
import redis
import traceback
import asyncio
import random
import json
from datetime import datetime, timezone

import redis
from bittensor.btlogging import logging as logger

from commons.utils import get_new_uuid
from dojo.protocol import FeedbackRequest, Result, TaskResult, TaskResultRequest
from dojo.utils.config import get_config
from bittensor.btlogging import logging as logger
from neurons.miner import Miner
from dojo.protocol import (
FeedbackRequest,
TaskResultRequest,
TaskResult,
Result
)
from commons.utils import get_new_uuid


class MinerSim(Miner):
@@ -22,7 +25,10 @@ def __init__(self):
host = os.getenv("REDIS_HOST", "localhost")
port = int(os.getenv("REDIS_PORT", 6379))
self.redis_client = redis.Redis(
host=host, port=port, db=0, decode_responses=True
host=host,
port=port,
db=0,
decode_responses=True
)
logger.info("Redis connection established")

@@ -39,14 +45,12 @@ def __init__(self):
def _configure_simulation(self):
"""Configure simulation parameters with environment variables or defaults."""
self.response_behaviors = {
"normal": float(os.getenv("SIM_NORMAL_RESP_PROB", 0.8)),
"no_response": float(os.getenv("SIM_NO_RESP_PROB", 0.1)),
"timeout": float(os.getenv("SIM_TIMEOUT_PROB", 0.1)),
'normal': float(os.getenv("SIM_NORMAL_RESP_PROB", 0.8)),
'no_response': float(os.getenv("SIM_NO_RESP_PROB", 0.1)),
'timeout': float(os.getenv("SIM_TIMEOUT_PROB", 0.1))
}

async def forward_feedback_request(
self, synapse: FeedbackRequest
) -> FeedbackRequest:
async def forward_feedback_request(self, synapse: FeedbackRequest) -> FeedbackRequest:
try:
# Validate that synapse, dendrite, dendrite.hotkey, and response are not None
if not synapse or not synapse.dendrite or not synapse.dendrite.hotkey:
@@ -68,7 +72,7 @@ async def forward_feedback_request(
self.redis_client.set(
redis_key,
new_synapse.model_dump_json(),
ex=86400, # expire after 24 hours
ex=86400 # expire after 24 hours
)
logger.info(f"Stored feedback request {synapse.request_id}")

@@ -80,23 +84,21 @@ async def forward_feedback_request(
traceback.print_exc()
return synapse

async def forward_task_result_request(
self, synapse: TaskResultRequest
) -> TaskResultRequest | None:
async def forward_task_result_request(self, synapse: TaskResultRequest) -> TaskResultRequest | None:
try:
logger.info(f"Received TaskResultRequest for task id: {synapse.task_id}")
if not synapse or not synapse.task_id:
logger.error("Invalid TaskResultRequest: missing task_id")
return None

# Simulate different response behaviors
behavior = self._get_response_behavior()
# behavior = self._get_response_behavior()

if behavior in ["no_response", "timeout"]:
logger.debug(f"Simulating {behavior} for task {synapse.task_id}")
if behavior == "timeout":
await asyncio.sleep(30)
return None
# if behavior in ['no_response', 'timeout']:
# logger.debug(f"Simulating {behavior} for task {synapse.task_id}")
# if behavior == 'timeout':
# await asyncio.sleep(30)
# return None

redis_key = f"feedback:{synapse.task_id}"
request_data = self.redis_client.get(redis_key)
@@ -114,17 +116,17 @@ async def forward_task_result_request(
for criteria_type in feedback_request.criteria_types:
result = Result(
type=criteria_type.type,
value=self._generate_scores(feedback_request.ground_truth),
value=self._generate_scores(feedback_request.ground_truth)
)

task_result = TaskResult(
id=get_new_uuid(),
status="COMPLETED",
status='COMPLETED',
created_at=current_time,
updated_at=current_time,
result_data=[result],
worker_id=get_new_uuid(),
task_id=synapse.task_id,
task_id=synapse.task_id
)
task_results.append(task_result)

@@ -145,21 +147,22 @@ def _get_response_behavior(self) -> str:
"""Determine the response behavior based on configured probabilities."""
return random.choices(
list(self.response_behaviors.keys()),
weights=list(self.response_behaviors.values()),
weights=list(self.response_behaviors.values())
)[0]

def _generate_scores(self, ground_truth: dict) -> dict:
scores = {}
max_rank = max(ground_truth.values())

for k, v in ground_truth.items():
base_weight = int(10 - (v * (10 / max_rank)))
if self.is_bad_miner:
deviation = random.randint(-5, 5)
else:
deviation = random.randint(-2, 2)
random_score = max(1, min(10, v + deviation))
random_score = max(0, min(9, base_weight + deviation))
score = int((random_score / (10 - 1)) * (100 - 1) + 1)
scores[k] = score

return scores

# def __del__(self):
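For reference, here is a minimal standalone sketch of the reworked task scoring in MinerSim._generate_scores: the diff appears to replace the old max(1, min(10, v + deviation)) jitter on the raw rank with a mapping that first inverts each ground-truth rank into a 0-10 base weight, applies the miner's (good or bad) deviation, clamps to 0-9, and only then rescales onto the 1-100 range stored in the task result. The helper name and example ranks below are illustrative assumptions, not the repository's API.

import random


def generate_scores(ground_truth: dict, is_bad_miner: bool = False) -> dict:
    """Sketch of the new rank-to-score mapping; rank 0 is assumed to be the best completion."""
    scores = {}
    max_rank = max(ground_truth.values())

    for key, rank in ground_truth.items():
        # Invert the rank onto a 0-10 base weight: the best rank lands near 10, the worst near 0.
        base_weight = int(10 - (rank * (10 / max_rank)))
        # Bad miners deviate further from the ground truth than good miners.
        deviation = random.randint(-5, 5) if is_bad_miner else random.randint(-2, 2)
        # Clamp to 0-9, then rescale onto the 1-100 range used in the task result.
        random_score = max(0, min(9, base_weight + deviation))
        scores[key] = int((random_score / (10 - 1)) * (100 - 1) + 1)

    return scores


# Example with three completions ranked 0 (best) to 2 (worst).
print(generate_scores({"model_a": 0, "model_b": 1, "model_c": 2}, is_bad_miner=False))

Separately, the no_response/timeout branches in forward_task_result_request are commented out by this commit, so every stored task now returns a COMPLETED result.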
117 changes: 60 additions & 57 deletions simulator/validator.py
@@ -1,82 +1,85 @@
import asyncio
import traceback
from typing import List

import aiohttp
import bittensor as bt
from bittensor.btlogging import logging as logger
from tenacity import RetryError

import dojo
from commons.dataset.synthetic import SyntheticAPI
from commons.orm import ORM
from commons.utils import get_epoch_time, get_new_uuid, set_expire_time, ttl_get_block
from dojo.protocol import (
DendriteQueryResponse,
FeedbackRequest,
MultiScoreCriteria,
TaskType,
)
from dojo.protocol import FeedbackRequest, TaskType, MultiScoreCriteria, DendriteQueryResponse
from neurons.validator import Validator
from bittensor.btlogging import logging as logger
from tenacity import RetryError
import bittensor as bt
import asyncio


class ValidatorSim(Validator):
def __init__(self):
super().__init__()
logger.info("Starting Validator Simulator")
self._last_block = None
self._block_check_attempts = 0
self.MAX_BLOCK_CHECK_ATTEMPTS = 3
self._connection_lock = asyncio.Lock()

super().__init__()
logger.info("Starting Validator Simulator")

async def _try_reconnect_subtensor(self):
"""Attempt to reconnect to the subtensor network"""
self._block_check_attempts += 1
if self._block_check_attempts >= self.MAX_BLOCK_CHECK_ATTEMPTS:
logger.error(f"Failed to reconnect after {self.MAX_BLOCK_CHECK_ATTEMPTS} attempts")
return False

try:
logger.info("Attempting to reconnect to subtensor...")
logger.info(
f"Attempting to reconnect to subtensor (attempt {self._block_check_attempts}/{self.MAX_BLOCK_CHECK_ATTEMPTS})...")
if hasattr(self.subtensor.substrate, 'websocket'):
self.subtensor.substrate.websocket.close()

self.subtensor = bt.subtensor(self.subtensor.config)
self._block_check_attempts = 0
await asyncio.sleep(1)
return True
except Exception as e:
logger.error(f"Failed to reconnect to subtensor: {e}")
return False
return await self._try_reconnect_subtensor()

async def _ensure_subtensor_connection(self):
async with self._connection_lock:
try:
self.subtensor.get_current_block()
self._block_check_attempts = 0
return True
except (BrokenPipeError, ConnectionError):
logger.warning("Connection lost, attempting immediate reconnection")
return await self._try_reconnect_subtensor()
except Exception as e:
logger.error(f"Unexpected error checking connection: {e}")
return False

@property
def block(self):
try:
if not asyncio.get_event_loop().run_until_complete(self._ensure_subtensor_connection()):
logger.warning("Subtensor connection failed - returning last known block")
return self._last_block if self._last_block is not None else 0

self._last_block = ttl_get_block(self.subtensor)
self._block_check_attempts = 0
return self._last_block
except BrokenPipeError:
self._block_check_attempts += 1
if self._block_check_attempts >= self.MAX_BLOCK_CHECK_ATTEMPTS:
logger.error(
"Multiple failed attempts to get block number, attempting reconnection"
)
if asyncio.get_event_loop().run_until_complete(
self._try_reconnect_subtensor()
):
return self.block

return self._last_block if self._last_block is not None else 0
except Exception as e:
logger.error(f"Error getting block number: {e}")
return self._last_block if self._last_block is not None else 0

def check_registered(self):
new_subtensor = bt.subtensor(self.subtensor.config)
if not new_subtensor.is_hotkey_registered(
netuid=self.config.netuid,
hotkey_ss58=self.wallet.hotkey.ss58_address,
):
logger.error(
f"Wallet: {self.wallet} is not registered on netuid {self.config.netuid}."
f" Please register the hotkey using `btcli s register` before trying again"
)
exit()
async def sync(self):
has_connection = await self._ensure_subtensor_connection()
if not has_connection:
logger.warning("Subtensor connection failed - continuing with partial sync")

await super().sync()

async def send_request(
self,
synapse: FeedbackRequest | None = None,
external_user: bool = False,
self,
synapse: FeedbackRequest | None = None,
external_user: bool = False,
):
start = get_epoch_time()
# typically the request may come from an external source however,
@@ -93,7 +96,7 @@ async def send_request(
self.metagraph.axons[uid]
for uid in sel_miner_uids
if self.metagraph.axons[uid].hotkey.casefold()
!= self.wallet.hotkey.ss58_address.casefold()
!= self.wallet.hotkey.ss58_address.casefold()
]
if not len(axons):
logger.warning("🤷 No axons to query ... skipping")
@@ -139,7 +142,7 @@ async def send_request(
prompt=data.prompt,
completion_responses=data.responses,
expire_at=expire_at,
ground_truth=data.ground_truth, # Added ground truth!!!!!
ground_truth=data.ground_truth # Added ground truth!!!!!
)
elif external_user:
obfuscated_model_to_model = self.obfuscate_model_names(
@@ -221,24 +224,24 @@ async def send_request(

@staticmethod
async def _send_shuffled_requests(
dendrite: bt.dendrite, axons: List[bt.AxonInfo], synapse: FeedbackRequest
dendrite: bt.dendrite, axons: List[bt.AxonInfo], synapse: FeedbackRequest
) -> list[FeedbackRequest]:
"""Send the same request to all miners without shuffling the order.
WARNING: This should only be used for testing/debugging as it could allow miners to game the system.
WARNING: This should only be used for testing/debugging as it could allow miners to game the system.
Args:
dendrite (bt.dendrite): Communication channel to send requests
axons (List[bt.AxonInfo]): List of miner endpoints
synapse (FeedbackRequest): The feedback request to send
Args:
dendrite (bt.dendrite): Communication channel to send requests
axons (List[bt.AxonInfo]): List of miner endpoints
synapse (FeedbackRequest): The feedback request to send
Returns:
list[FeedbackRequest]: List of miner responses
"""
Returns:
list[FeedbackRequest]: List of miner responses
"""
all_responses = []
batch_size = 10

for i in range(0, len(axons), batch_size):
batch_axons = axons[i : i + batch_size]
batch_axons = axons[i: i + batch_size]
tasks = []

for axon in batch_axons:
@@ -247,7 +250,7 @@ async def _send_shuffled_requests(
axons=[axon],
synapse=synapse,
deserialize=False,
timeout=12,
timeout=60,
)
)

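For reference, the retry mechanism above boils down to a capped probe-and-reconnect loop around the subtensor handle: probe get_current_block(), and on a broken pipe or connection error rebuild the subtensor from its own config, up to MAX_BLOCK_CHECK_ATTEMPTS times, falling back to the last known block when every attempt fails. The sketch below is an illustrative reduction of that flow; the function and parameter names (ensure_connection, make_subtensor) are assumptions, not the repository's API.

import asyncio
import logging

logger = logging.getLogger(__name__)

MAX_BLOCK_CHECK_ATTEMPTS = 3


async def ensure_connection(subtensor, make_subtensor, attempt: int = 0):
    """Return a live subtensor handle, reconnecting up to the attempt cap, else None."""
    try:
        subtensor.get_current_block()  # cheap liveness probe
        return subtensor
    except (BrokenPipeError, ConnectionError):
        next_attempt = attempt + 1
        if next_attempt >= MAX_BLOCK_CHECK_ATTEMPTS:
            logger.error("Failed to reconnect after %d attempts", MAX_BLOCK_CHECK_ATTEMPTS)
            return None
        logger.warning("Connection lost, reconnecting (%d/%d)", next_attempt, MAX_BLOCK_CHECK_ATTEMPTS)
        fresh = make_subtensor()  # e.g. bt.subtensor(old_subtensor.config)
        await asyncio.sleep(1)    # brief pause before reusing the new connection
        return await ensure_connection(fresh, make_subtensor, next_attempt)

The same commit also raises the dendrite forward timeout in _send_shuffled_requests from 12 to 60 seconds, presumably to give slow simulated miners more room to respond before their requests are dropped.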

