From 30a5f6e69993e71020a53a56717bc17fa549000d Mon Sep 17 00:00:00 2001 From: jarvis8x7b <157810922+jarvis8x7b@users.noreply.github.com> Date: Thu, 30 May 2024 19:07:23 +0800 Subject: [PATCH] feat: add ICC metric for consensus scoring --- commons/human_feedback/dojo.py | 104 ++++-------- commons/reward_model/models.py | 2 +- commons/scoring.py | 302 +++++++++++++++++++++++++++++---- commons/utils.py | 65 ++----- main_validator.py | 15 +- neurons/validator.py | 57 +++---- template/base/miner.py | 8 +- 7 files changed, 352 insertions(+), 201 deletions(-) diff --git a/commons/human_feedback/dojo.py b/commons/human_feedback/dojo.py index 9bb51c1b..b83b1daf 100644 --- a/commons/human_feedback/dojo.py +++ b/commons/human_feedback/dojo.py @@ -1,10 +1,9 @@ -import asyncio import datetime import json -import os -from typing import Dict, List +from typing import Dict, List, Optional from requests_toolbelt import MultipartEncoder import httpx +from commons import utils from template.protocol import ( FeedbackRequest, @@ -17,51 +16,15 @@ load_dotenv() -DOJO_API_BASE_URL = os.getenv("DOJO_API_BASE_URL") -if not DOJO_API_BASE_URL: - raise ValueError("DOJO_API_BASE_URL is not set") - - -def get_dojo_api_key(): - key = os.getenv("DOJO_API_KEY") - if key is None: - raise ValueError("DOJO_API_KEY is not set") - return key - - -def _extract_ranking_result_data(result_data: List[Dict]) -> Dict[str, str]: - # sample data: [{'type': 'ranking', 'value': {'1': 'Code 2', '2': 'Code 1'}] - ranking_results = list( - filter(lambda x: x["type"] == RankingCriteria.type, result_data) - ) - if len(ranking_results) == 1: - return ranking_results[0].get("value", {}) - return {} - - -def parse_task_results(response_json: Dict) -> List[Dict[str, str]]: - task_results = response_json.get("body", {}).get("taskResults", []) - parsed_results = [] - for t in task_results: - res = _extract_ranking_result_data(t.get("result_data", [])) - if not res: - continue - parsed_results.append(res) - return parsed_results - - -def check_task_completion_status(response_json: Dict): - status = response_json.get("body", {}).get("status") - return status and status.lower() == "completed" +DOJO_API_BASE_URL = "***REMOVED***" +DOJO_API_KEY = utils.loaddotenv("DOJO_API_KEY") class DojoAPI: - # _api_key = get_dojo_api_key() - # _http_client = httpx.AsyncClient(headers={"Authorization": f"Bearer {_api_key}"}) _http_client = httpx.AsyncClient() @classmethod - async def get_task_by_id(cls, task_id: str): + async def _get_task_by_id(cls, task_id: str): """Gets task by task id and checks completion status""" url = f"{DOJO_API_BASE_URL}/api/v1/tasks/{task_id}" async with cls._http_client as client: @@ -70,8 +33,8 @@ async def get_task_by_id(cls, task_id: str): return response.json() @classmethod - async def get_task_results_by_task_id(cls, task_id: str): - """Gets ranking task results from task id""" + async def _get_task_results_by_task_id(cls, task_id: str): + """Gets task results from task id""" url = f"{DOJO_API_BASE_URL}/api/v1/tasks/get-results/{task_id}" async with cls._http_client as client: response = await client.get(url) @@ -79,20 +42,27 @@ async def get_task_results_by_task_id(cls, task_id: str): return response.json() @classmethod - async def get_task_and_results(cls, task_id: str): - """Returns optional [{'1': 'model hash 1', '2': 'model hash 2'}], - where '1' and '2' are the explicit rank integers""" - completion_status = check_task_completion_status( - await cls.get_task_by_id(task_id) - ) - if not completion_status: - return None - 
ranking_results = parse_task_results( - await cls.get_task_results_by_task_id(task_id) - ) - if not ranking_results: - return None - return ranking_results + async def get_task_results_by_task_id(cls, task_id: str) -> Optional[List[Dict]]: + """Gets task results from task id to prepare for scoring later on""" + task_response = await cls._get_task_by_id(task_id) + task_status = task_response.get("body", {}).get("status", None) + is_completed = task_status and task_status.lower() == "completed" + if is_completed is None: + logger.error(f"Failed to read status field for task_id: {task_id}") + return + + if is_completed is False: + return + task_results_response = await cls._get_task_results_by_task_id(task_id) + task_results = task_results_response.get("body", {}).get("taskResults") + if task_results is None: + logger.error(f"Failed to read task results for task_id: {task_id}") + return + + if not task_results: + return + + return task_results @classmethod async def create_task( @@ -116,6 +86,7 @@ async def create_task( { **criteria_type.dict(), "options": [ + # TODO remove model from name f"Model {option}" for option in criteria_type.dict().get("options", []) ], @@ -127,6 +98,7 @@ async def create_task( { **criteria_type.dict(), "options": [ + # TODO remove model from name f"Model {option}" for option in criteria_type.dict().get("options", []) ], @@ -146,10 +118,6 @@ async def create_task( "maxResults": "10", } - DOJO_API_KEY = os.getenv("DOJO_API_KEY") - if not DOJO_API_KEY: - logger.error("DOJO_API_KEY is not set") - mp = MultipartEncoder(fields=body) response = await client.post( path, @@ -169,15 +137,3 @@ async def create_task( ) response.raise_for_status() return task_ids - - -if __name__ == "__main__": - - async def main(): - print( - await DojoAPI.get_task_results_by_task_id( - "bdb56d72-dd98-40c0-a42f-312d018a0a1e" - ) - ) - - asyncio.run(main()) diff --git a/commons/reward_model/models.py b/commons/reward_model/models.py index 773910fa..91243ced 100644 --- a/commons/reward_model/models.py +++ b/commons/reward_model/models.py @@ -20,9 +20,9 @@ from commons.llm.prompts import PromptBuilder, ScoreRange from commons.utils import PydanticUtils from template.protocol import ( - Response, ModelConfig, PreferenceResponse, + Response, ScoresResponse, ) diff --git a/commons/scoring.py b/commons/scoring.py index 3d3e3f58..0f3a78f1 100644 --- a/commons/scoring.py +++ b/commons/scoring.py @@ -1,23 +1,35 @@ import json +import sys from collections import defaultdict from typing import Dict, List, Optional import bittensor as bt import numpy as np -from pydantic import Field, BaseModel -import scipy -from attr import define, field +import pandas as pd +import pingouin as pg import torch -from torch.nn import functional as F +from attr import define, field +from loguru import logger + +# Configure Loguru logger +logger.remove() # Remove the default logger +logger.add(sys.stderr, level="DEBUG") # Add a new logger with the desired level +from pydantic import BaseModel, Field +from scipy.stats import spearmanr from sklearn.metrics import cohen_kappa_score -from commons.dataset.leaderboard import get_gt_ranks, get_leaderboard_scores +from torch.nn import functional as F +from commons.dataset.leaderboard import get_gt_ranks, get_leaderboard_scores from template.protocol import ( + CodeAnswer, CriteriaType, FeedbackRequest, + MultiScoreCriteria, RankingCriteria, - ScoringResult, + Response, ScoringMethod, + ScoringResult, + TaskType, ) @@ -98,7 +110,7 @@ def _spearman_scoring(responses: 
List[FeedbackRequest]): # compute spearman correlation and handle nan values spearman_corr = [ - np.nan_to_num(scipy.stats.spearmanr(miner_score, averages)[0]) + np.nan_to_num(spearmanr(miner_score, averages)[0]) for miner_score in miner_scores ] @@ -156,36 +168,143 @@ def consensus_score(criteria: CriteriaType, responses: List[FeedbackRequest]): """Given a list of responses, will only return a dict of hotkey to their normalized scores. e.g. if a miner failed to respond, its hotkey won't be a key in the dict. """ + + # depending on the criteria, this may be average ranks or average scores if not len(responses): raise ValueError("Responses cannot be empty") + # shape (num completions) + avg = None + # shape (num miners, num completions) + miner_outputs = None + icc_arr = [] + + logger.info("Average scores: ", avg) + logger.info("Miner outptus", miner_outputs) if isinstance(criteria, RankingCriteria): - avg_ranks, all_miner_ranks = Scoring._process_for_ranking(responses) - spearman_corr = [ - scipy.stats.spearmanr(miner_ranks, avg_ranks).statistic - for miner_ranks in all_miner_ranks - ] - cohen_kappa = [ - cohen_kappa_score(miner_ranks, avg_ranks) - for miner_ranks in all_miner_ranks - ] - dist_penalty = -1 * torch.sum( - torch.square(torch.tensor(avg_ranks - all_miner_ranks)), axis=1 - ).to(dtype=torch.float64) - snorm = F.normalize(torch.tensor(spearman_corr), dim=0, p=2) - cknorm = F.normalize(torch.tensor(cohen_kappa), dim=0, p=2) - dnorm = F.normalize(dist_penalty, dim=0, p=2) - bt.logging.trace(snorm) - bt.logging.trace(cknorm) - bt.logging.trace(dnorm) - combined_sm = F.softmax(snorm + cknorm + 1.5 * dnorm, dim=0) - bt.logging.trace(f"{combined_sm}") - return ConsensusScore( - weighted_score=combined_sm, - spearman_by_miner=snorm, - cohen_kappa_by_miner=cknorm, - dist_penalty_by_miner=dnorm, + logger.debug("ranking criteria") + avg, miner_outputs = Scoring._process_for_ranking(responses) + elif isinstance(criteria, MultiScoreCriteria): + logger.debug("got multi score criteria") + # calculate average score per model + model_id_to_scores = defaultdict(list) + for response in responses: + for completion in response.responses: + model_id_to_scores[completion.model].append(completion.score) + # for each model calculate the average score + # USE DICT BECAUSE WE NEED TO ENSURE CORRECT ORDERING + model_id_to_avg_score = { + model: sum(scores) / len(scores) + for model, scores in model_id_to_scores.items() + } + + # shape (num miners, num completions) + # collect all scores from each miner based on ordering in model_id_avg_score + miner_outputs = np.array( + [ + [ + completion.score + for completion in sorted( + response.responses, + key=lambda x: model_id_to_avg_score[x.model], + ) + ] + for response in responses + ] + ) + + avg: np.ndarray = np.array([v for k, v in model_id_to_avg_score.items()]) + logger.info(f"Average scores: {avg}") + logger.info(f"Miner outptus {miner_outputs}") + logger.info(f"Model id to avg {model_id_to_avg_score}") + + df = pd.DataFrame( + { + "subject": [i for i in range(len(responses[0].responses))], + } ) + # prepare dataframe for calculating ICC + for response in responses: + rater_id = response.axon.hotkey + ordered_scores = [ + x.score + for x in sorted( + response.responses, key=lambda x: model_id_to_avg_score[x.model] + ) + ] + # order scores based on order in model_id_to_avg_score + df[rater_id] = ordered_scores + rater_ids = list(df.columns) + rater_ids.remove("subject") + df["avg"] = df[rater_ids].mean(axis=1) + + # this works because we are calculating 
ICC for each rater VS the avg + for rater_id in rater_ids: + data_by_rater = df[["subject", rater_id, "avg"]] + # only use the columns for the current rater and avg + data_by_rater = data_by_rater.melt( + id_vars=["subject"], var_name=rater_id, value_name="score" + ) + icc = pg.intraclass_corr( + data=data_by_rater, + targets="subject", + raters=rater_id, + ratings="score", + ) + logger.debug("ICC raw") + logger.debug(icc) + + # take ICC(2,1) + icc2_value = icc[icc["Type"] == "ICC2"]["ICC"].iloc[0] + icc_arr.append(icc2_value) + icc_arr: np.ndarray = np.array(icc_arr) + logger.info(f"ICC: {icc_arr}") + + else: + raise NotImplementedError( + f"Consensus score for type {criteria} not implemented yet" + ) + + if avg is None or miner_outputs is None: + raise ValueError("avg and miner_outputs cannot be None") + + # spearman_corr = [ + # scipy.stats.spearmanr(miner_output, avg).statistic + # for miner_output in miner_outputs + # ] + + spearman = spearmanr( + np.expand_dims(avg, axis=0) if len(avg.shape) == 1 else avg, + miner_outputs, + axis=1, + ) + # spearman == 2D matrix, remove diagonal, grab first row for correlation between avg and each miner + spearman_corr = spearman.statistic[0, 1:] + logger.info(f"{spearman_corr=}") + + # TODO fix this for continuous data + # this doesn't work for continuous data, use ICC instead + cohen_kappa = [ + cohen_kappa_score(miner_output, avg) for miner_output in miner_outputs + ] + + dist_penalty = -1 * torch.sum( + torch.square(torch.tensor(avg - miner_outputs)), axis=1 + ).to(dtype=torch.float64) + snorm = F.normalize(torch.tensor(spearman_corr), dim=0, p=2) + cknorm = F.normalize(torch.tensor(cohen_kappa), dim=0, p=2) + dnorm = F.normalize(dist_penalty, dim=0, p=2) + bt.logging.debug(snorm) + bt.logging.debug(cknorm) + bt.logging.trace(dnorm) + combined_sm = F.softmax(snorm + cknorm + 1.5 * dnorm, dim=0) + bt.logging.trace(f"{combined_sm}") + return ConsensusScore( + weighted_score=combined_sm, + spearman_by_miner=snorm, + cohen_kappa_by_miner=cknorm, + dist_penalty_by_miner=dnorm, + ) @staticmethod def cmp_ground_truth( @@ -238,7 +357,6 @@ def calculate_score( ) -> Dict[CriteriaType, Score]: """Combines both consensus score and difference with ground truths scoring to output a final score per miner""" criteria_to_miner_scores = defaultdict(Score) - # TODO @dev support different criteria in the future for criteria in criteria_types: gt_score = Scoring.cmp_ground_truth(criteria, request, responses) consensus_score = Scoring.consensus_score(criteria, responses) @@ -265,3 +383,121 @@ def _calculate_average_rank_by_model( sorted(model_id_to_average_rank.items(), key=lambda item: item[1]) ) return sorted_dict + + +def prepare_responses(): + miner_a = FeedbackRequest( + axon=bt.TerminalInfo(hotkey="hotkeyA"), + prompt="Write a hello world program in python", + task_type=TaskType.CODE_GENERATION, + criteria_types=[ + MultiScoreCriteria(type="multi-score", options=[], min=0.0, max=100.0) + ], + responses=[ + Response( + model="anthropic/claude-3-haiku-20240307", + completion=CodeAnswer( + code="print('hello, world!')", + language="python", + files=[], + additional_notes=None, + installation_commands="", + ), + score=75, + ), + Response( + model="anthropic/claude-3-opus-20240229", + completion=CodeAnswer( + code="print('hello, world!')", + language="python", + files=[], + additional_notes=None, + installation_commands="", + ), + score=100, + ), + Response( + model="anthropic/claude-3-sonnet-20240229", + completion=CodeAnswer( + code="print('hello, world!')", + 
language="python", + files=[], + additional_notes=None, + installation_commands="", + ), + score=50, + ), + Response( + model="meta-llama/llama-3-8b-instruct", + completion=CodeAnswer( + code="print('hello, world!')", + language="python", + files=[], + additional_notes=None, + installation_commands="", + ), + score=69, + ), + ], + ) + + miner_b = FeedbackRequest( + axon=bt.TerminalInfo(hotkey="hotkeyB"), + prompt="Write a hello world program in python", + task_type=TaskType.CODE_GENERATION, + criteria_types=[ + MultiScoreCriteria(type="multi-score", options=[], min=0.0, max=100.0) + ], + responses=[ + Response( + model="anthropic/claude-3-haiku-20240307", + completion=CodeAnswer( + code="print('hello, world!')", + language="python", + files=[], + additional_notes=None, + installation_commands="", + ), + score=51, + ), + Response( + model="anthropic/claude-3-opus-20240229", + completion=CodeAnswer( + code="print('hello, world!')", + language="python", + files=[], + additional_notes=None, + installation_commands="", + ), + score=49, + ), + Response( + model="anthropic/claude-3-sonnet-20240229", + completion=CodeAnswer( + code="print('hello, world!')", + language="python", + files=[], + additional_notes=None, + installation_commands="", + ), + score=52, + ), + Response( + model="meta-llama/llama-3-8b-instruct", + completion=CodeAnswer( + code="print('hello, world!')", + language="python", + files=[], + additional_notes=None, + installation_commands="", + ), + score=53, + ), + ], + ) + return [miner_a, miner_b] + + +if __name__ == "__main__": + responses = prepare_responses() + Scoring.consensus_score(responses[0].criteria_types[0], responses) diff --git a/commons/utils.py b/commons/utils.py index 87d2b0af..6fdfb008 100644 --- a/commons/utils.py +++ b/commons/utils.py @@ -4,6 +4,9 @@ from collections import OrderedDict from collections.abc import Mapping from typing import Tuple, Type, get_origin +from functools import lru_cache, update_wrapper +from math import floor +from typing import Any, Callable import bittensor as bt import jsonref @@ -14,6 +17,8 @@ from tenacity import RetryError, Retrying, stop_after_attempt, wait_exponential_jitter import wandb +import os +from dotenv import load_dotenv def get_new_uuid(): @@ -24,6 +29,14 @@ def get_epoch_time(): return time.time() +def loaddotenv(varname: str): + """Wrapper to get env variables for sanity checking""" + value = os.getenv(varname) + if not value: + raise SystemExit(f"{varname} is not set") + return value + + def keccak256_hash(data): k = keccak.new(digest_bits=256) k.update(data.encode("utf-8")) @@ -124,16 +137,6 @@ def check_registered(subtensor, wallet, config): exit() -def should_resync_metagraph(subtensor, metagraph, wallet, config): - """ - Check if enough blocks have elapsed since the last checkpoint to sync. 
- """ - uid = metagraph.hotkeys.index(wallet.hotkey.ss58_address) - return ( - ttl_get_block(subtensor) - metagraph.last_update[uid] - ) > config.neuron.epoch_length - - def get_external_ip() -> str: response = requests.get("https://ifconfig.me/ip") response.raise_for_status() @@ -229,38 +232,6 @@ def build_minimal_json(cls, model: Type[BaseModel]): result[field_name] = field.field_info.description return result - # @classmethod - # def build_minimal_json(cls, model: Type[BaseModel]): - # result = { - # "files": "filename(Name of the file), content(Content of the file), language(Programming language of the file) as separate keys, type: List[Dict['filename', 'content', 'language']]", - # "installation_commands": "Terminal commands for the code to be able to run to install any third-party packages for the code to be able to run", - # "additional_notes": "Any additional notes or comments about the code solution", - # } - # return result - - -# The MIT License (MIT) -# Copyright © 2023 Yuma Rao -# Copyright © 2023 Opentensor Foundation - -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated -# documentation files (the “Software”), to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, -# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: - -# The above copyright notice and this permission notice shall be included in all copies or substantial portions of -# the Software. - -# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO -# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL -# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -# DEALINGS IN THE SOFTWARE. 
- -from functools import lru_cache, update_wrapper -from math import floor -from typing import Any, Callable - # LRU Cache with TTL def ttl_cache(maxsize: int = 128, typed: bool = False, ttl: int = -1): @@ -348,13 +319,3 @@ def ttl_get_block(subtensor) -> int: Note: self here is the miner or validator instance """ return subtensor.get_current_block() - - -def main(): - pydantic_utils = PydanticUtils() - minimal_json = pydantic_utils.build_minimal_json() - print(minimal_json) - - -if __name__ == "__main__": - main() diff --git a/main_validator.py b/main_validator.py index 15cd67c9..d17f9f4a 100644 --- a/main_validator.py +++ b/main_validator.py @@ -3,14 +3,13 @@ import bittensor as bt import uvicorn -import wandb from dotenv import load_dotenv from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware +import wandb from commons.api.middleware import LimitContentLengthMiddleware from commons.api.reward_route import reward_router -from commons.dataset.synthetic import SyntheticAPI from commons.objects import ObjectManager from neurons.validator import DojoTaskTracker @@ -22,10 +21,8 @@ @asynccontextmanager async def lifespan(app: FastAPI): - # BEFORE YIELD == ON STARTUP bt.logging.info("Performing startup tasks...") yield - # AFTER YIELD == ON SHUTDOWN bt.logging.info("Performing shutdown tasks...") validator._should_exit = True DojoTaskTracker()._should_exit = True @@ -55,9 +52,11 @@ async def main(): ) server = uvicorn.Server(config) running_tasks = [ - asyncio.create_task(validator.log_validator_status()), + # TODO re-enable after working on scoring + # asyncio.create_task(validator.log_validator_status()), asyncio.create_task(validator.run()), - asyncio.create_task(validator.update_score_and_send_feedback()), + # asyncio.create_task(validator.update_score_and_send_feedback()), + asyncio.create_task(DojoTaskTracker.monitor_task_completions()), ] await server.serve() @@ -68,6 +67,10 @@ async def main(): await task except asyncio.CancelledError: bt.logging.info(f"Cancelled task {task.get_name()}") + except Exception as e: + bt.logging.error(f"Task {task.get_name()} raised an exception: {e}") + pass + bt.logging.info("Exiting main function.") diff --git a/neurons/validator.py b/neurons/validator.py index cfa240c1..9995ac70 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -1,6 +1,5 @@ import asyncio import copy -from random import random import threading import time from collections import defaultdict @@ -11,26 +10,26 @@ import numpy as np import torch from fastapi.encoders import jsonable_encoder +from loguru import logger from torch.nn import functional as F -import wandb -from commons.data_manager import DataManager +import wandb +from commons.data_manager import DataManager, ValidatorStateKeys from commons.dataset.synthetic import SyntheticAPI from commons.human_feedback.aws_mturk import MTurkUtils, parse_assignment from commons.human_feedback.dojo import DojoAPI from commons.scoring import Scoring -from commons.utils import get_epoch_time, init_wandb +from commons.utils import get_epoch_time from template.base.neuron import BaseNeuron from template.protocol import ( - SCORING_METHOD_PRIORITY, AWSCredentials, DendriteQueryResponse, - MTurkResponse, FeedbackRequest, + MTurkResponse, MultiScoreCriteria, RankingCriteria, - ScoringResult, ScoringMethod, + ScoringResult, SyntheticQA, TaskType, ) @@ -40,21 +39,14 @@ extract_miner_uids, is_miner, ) -from loguru import logger - - -def _filter_valid_responses(responses: List[FeedbackRequest]) -> 
List[FeedbackRequest]: - return [response for response in responses if len(response.ranks) > 0] class DojoTaskTracker: _instance = None - # request id to miner hotkey to task id _rid_to_mhotkey_to_task_id: Dict[str, Dict[str, str]] = defaultdict( lambda: defaultdict(str) ) _lock = asyncio.Lock() - _background_tasks = set() _should_exit: bool = False def __new__(cls, *args, **kwargs): @@ -90,8 +82,7 @@ def _parse_dojo_task_results(results: List[Dict]): } @classmethod - async def update_task_map(cls, responses: List[FeedbackRequest]): - dojo_responses = DojoTaskTracker.filter_dojo_responses(responses) + async def update_task_map(cls, dojo_responses: List[FeedbackRequest]): if not dojo_responses: bt.logging.warning("No Dojo responses found") return @@ -104,10 +95,13 @@ async def update_task_map(cls, responses: List[FeedbackRequest]): ) ) bt.logging.info( - f"Validated N={len(dojo_responses)} responses into {len(valid_responses)} valid responses for request id: {responses[0].request_id}" + f"Got {len(valid_responses)} valid Dojo responses to update task tracker" ) for r in valid_responses: + if r.request_id not in cls._rid_to_mhotkey_to_task_id: + cls._rid_to_mhotkey_to_task_id[r.request_id] = {} + cls._rid_to_mhotkey_to_task_id[r.request_id][r.axon.hotkey] = ( r.dojo_task_id ) @@ -115,7 +109,7 @@ async def update_task_map(cls, responses: List[FeedbackRequest]): @classmethod async def monitor_task_completions(cls): - SLEEP_SECONDS = 10 + SLEEP_SECONDS = 30 while not cls._should_exit: bt.logging.info(f"Monitoring Dojo Task completions... {get_epoch_time()}") async with cls._lock: @@ -135,7 +129,9 @@ async def monitor_task_completions(cls): ) continue - task_results = await DojoAPI.get_task_and_results(task_id) + task_results = await DojoAPI.get_task_results_by_task_id( + task_id + ) if not task_results: bt.logging.warning( f"Task ID: {task_id} by miner: {miner_hotkey} has not been completed yet or no task results." @@ -160,6 +156,9 @@ async def monitor_task_completions(cls): request_id=request_id, responses=data.request.responses, ) + logger.info( + f"Appending Dojo task results for request id: {request_id}" + ) await DataManager.append_responses(request_id, [parsed_request]) await asyncio.sleep(SLEEP_SECONDS) @@ -182,9 +181,10 @@ def __init__(self): self.load_state() bt.logging.debug(f"Scores state: {self.scores}") - # Init sync with the network. Updates the metagraph. 
- self.sync() - init_wandb(config=self.config, my_uid=self.uid, wallet=self.wallet) + # manually always register and always sync metagraph when application starts + self.check_registered() + self.resync_metagraph() + # init_wandb(config=self.config, my_uid=self.uid, wallet=self.wallet) async def blacklist_mturk_response( self, synapse: MTurkResponse @@ -382,10 +382,8 @@ async def update_score_and_send_feedback(self): responses=d.responses, ) - # TOOD wandb logging here - # calculate by hotkey as well, perform weighting here - GT_WEIGHT = 0.65 - CONSENSUS_WEIGHT = 0.35 + GT_WEIGHT = 0.4 + CONSENSUS_WEIGHT = 0.6 # TODO for now only single criteria # log_data is score by each miner score_data = {} @@ -541,7 +539,7 @@ async def send_request( ) dojo_responses = DojoTaskTracker.filter_dojo_responses(responses) - await DojoTaskTracker().update_task_map(dojo_responses) + await DojoTaskTracker.update_task_map(dojo_responses) non_dojo_responses = list(filter(lambda r: r not in dojo_responses, responses)) response_data = DendriteQueryResponse( @@ -555,9 +553,6 @@ async def send_request( return async def run(self): - # manually always register and always sync metagraph when application starts - self.sync() - bt.logging.info( f"Running validator {self.axon} on network: {self.config.subtensor.chain_endpoint} with netuid: {self.config.netuid}" ) @@ -699,7 +694,7 @@ def update_scores(self, hotkey_to_scores): # Compute forward pass rewards, assumes uids are mutually exclusive. # scores dimensions might have been updated after resyncing... len(uids) != len(self.scores) - rewards = torch.zeros((len(self.metagraph.axons),)) + rewards = np.zeros((len(self.metagraph.axons),)) neuron_hotkeys: List[str] = [neuron.hotkey for neuron in self.metagraph.neurons] for index, (key, value) in enumerate(hotkey_to_scores.items()): # handle nan values diff --git a/template/base/miner.py b/template/base/miner.py index 0923ffc0..020a51ff 100644 --- a/template/base/miner.py +++ b/template/base/miner.py @@ -15,13 +15,13 @@ # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. -import time +import asyncio import threading import traceback import bittensor as bt -from commons.utils import serve_axon +from commons.utils import serve_axon from template.base.neuron import BaseNeuron @@ -33,7 +33,7 @@ class BaseMinerNeuron(BaseNeuron): def __init__(self): super(BaseMinerNeuron, self).__init__() - def run(self): + async def run(self): """ Initiates and manages the main loop for the miner on the Bittensor network. The main loop handles graceful shutdown on keyboard interrupts and logs unforeseen errors. @@ -87,7 +87,7 @@ def run(self): # Sync metagraph and potentially set weights. self.sync() self.step += 1 - time.sleep(12) + await asyncio.sleep(12) # If someone intentionally stops the miner, it'll safely terminate operations. except KeyboardInterrupt:
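
For reference, the ICC(2,1) computation that `consensus_score` now performs for `MultiScoreCriteria` can be reproduced in isolation. The sketch below mirrors the patch's approach of rating each miner against the column-wise average with `pingouin`; the hotkeys and score values are illustrative only, not data from the codebase.

```python
# Minimal sketch of the per-rater ICC(2,1) consensus metric (illustrative data only).
import numpy as np
import pandas as pd
import pingouin as pg

# rows = completions ("subjects"), columns = raters (miner hotkeys)
df = pd.DataFrame(
    {
        "subject": [0, 1, 2, 3],
        "hotkeyA": [75.0, 100.0, 50.0, 69.0],
        "hotkeyB": [51.0, 49.0, 52.0, 53.0],
    }
)
rater_ids = [c for c in df.columns if c != "subject"]
df["avg"] = df[rater_ids].mean(axis=1)

icc_arr = []
for rater_id in rater_ids:
    # long format: one row per (subject, rater) pair, rating each miner against the average
    data_by_rater = df[["subject", rater_id, "avg"]].melt(
        id_vars=["subject"], var_name="rater", value_name="score"
    )
    icc = pg.intraclass_corr(
        data=data_by_rater, targets="subject", raters="rater", ratings="score"
    )
    # keep ICC(2,1): two-way random effects, absolute agreement, single rater
    icc_arr.append(icc.loc[icc["Type"] == "ICC2", "ICC"].iloc[0])

print(np.array(icc_arr))
```

A miner whose scores track the group average closely gets an ICC near 1; one that diverges gets a lower (possibly negative) value, which is what makes the metric workable for continuous scores where Cohen's kappa is not.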
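
The remaining consensus terms — the Spearman correlation of each miner against the average and the squared-distance penalty, combined through a softmax — can be sketched the same way. Again the numbers are illustrative only; the patch additionally mixes in an L2-normalised Cohen's kappa term, which its own TODO flags as unsuitable for continuous scores, so it is omitted here.

```python
# Minimal sketch of how the consensus terms are combined (illustrative data only).
import numpy as np
import torch
import torch.nn.functional as F
from scipy.stats import spearmanr

# shape (num completions,): average score per model across miners
avg = np.array([63.0, 74.5, 51.0, 61.0])
# shape (num miners, num completions): each miner's scores in the same model order
miner_outputs = np.array([[75.0, 100.0, 50.0, 69.0], [51.0, 49.0, 52.0, 53.0]])

# spearmanr over rows returns a correlation matrix; row 0 holds avg-vs-miner correlations
spearman = spearmanr(np.expand_dims(avg, axis=0), miner_outputs, axis=1)
spearman_corr = spearman.statistic[0, 1:]

# squared-distance penalty: miners far from the average are pushed down
dist_penalty = -1 * torch.sum(
    torch.square(torch.tensor(avg - miner_outputs)), axis=1
).to(dtype=torch.float64)

snorm = F.normalize(torch.tensor(spearman_corr), dim=0, p=2)
dnorm = F.normalize(dist_penalty, dim=0, p=2)
weighted_score = F.softmax(snorm + 1.5 * dnorm, dim=0)
print(weighted_score)
```

Downstream, `update_score_and_send_feedback` now weights the ground-truth comparison at 0.4 and this consensus score at 0.6 (previously 0.65 / 0.35).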