diff --git a/README.md b/README.md index abc9de59..04ad5820 100644 --- a/README.md +++ b/README.md @@ -106,7 +106,7 @@ By creating an open platform for gathering human-generated datasets, Tensorplex - 4 cores - 16 GB RAM -- 256 SSD +- 2TB SSD ## Miner @@ -120,7 +120,7 @@ By creating an open platform for gathering human-generated datasets, Tensorplex - 2 cores - 8 GB RAM -- 32GB SSD +- 32GB SSD or 1TB SSD if decentralised # Getting Started diff --git a/commons/orm.py b/commons/orm.py index 2f82f545..839b3bf6 100644 --- a/commons/orm.py +++ b/commons/orm.py @@ -1,11 +1,9 @@ -import asyncio import json from datetime import datetime, timedelta, timezone from typing import AsyncGenerator, List import torch from bittensor.btlogging import logging as logger -from dotenv import find_dotenv, load_dotenv from commons.exceptions import ( InvalidCompletion, @@ -15,7 +13,7 @@ UnexpiredTasksAlreadyProcessed, ) from commons.utils import datetime_as_utc -from database.client import connect_db, disconnect_db, transaction +from database.client import transaction from database.mappers import ( map_child_feedback_request_to_model, map_completion_response_to_model, @@ -39,6 +37,8 @@ ) from dojo import TASK_DEADLINE from dojo.protocol import ( + CodeAnswer, + CompletionResponses, DendriteQueryResponse, FeedbackRequest, ) @@ -88,12 +88,15 @@ async def get_last_expire_at_cutoff( raise ValueError("Unable to determine expire at cutoff") @staticmethod - async def get_unexpired_tasks( + async def get_expired_tasks( validator_hotkeys: list[str], batch_size: int = 10, expire_at: datetime | None = None, ) -> AsyncGenerator[tuple[List[DendriteQueryResponse], bool], None]: - """Returns a batch of Feedback_Request_Model and a boolean indicating if there are more batches + """Returns a batch of Feedback_Request_Model and a boolean indicating if there are more batches. + Depending on the `expire_at` provided, it will return different results. + + YOUR LOGIC ON WHETHER TASKS ARE EXPIRED OR NON-EXPIRED SHOULD BE HANDLED BY SETTING EXPIRE_AT YOURSELF. Args: validator_hotkeys (list[str]): List of validator hotkeys. @@ -427,8 +430,16 @@ async def save_task( # Create related completions for miner responses for completion in miner_response.completion_responses: + # remove the completion field, since the miner receives an obfuscated completion_response anyways + # therefore it is useless for training + try: + completion_copy = completion.model_dump() + completion_copy["completion"] = CodeAnswer(files=[]) + except KeyError: + pass completion_input = map_completion_response_to_model( - completion, created_miner_model.id + CompletionResponses.model_validate(completion_copy), + created_miner_model.id, ) await tx.completion_response_model.create( data=completion_input @@ -470,6 +481,14 @@ async def save_task( await tx.ground_truth_model.create( data=Ground_Truth_ModelCreateInput(**gt_create_input) ) + for vali_completion in validator_request.completion_responses: + vali_completion_input = map_completion_response_to_model( + vali_completion, + feedback_request_model.id, + ) + await tx.completion_response_model.create( + data=vali_completion_input + ) feedback_request_model.child_requests = created_miner_models return feedback_request_model @@ -503,20 +522,3 @@ async def get_validator_score() -> torch.Tensor | None: return None return torch.tensor(json.loads(score_record.score)) - - -async def _test_get_unexpired_tasks(): - load_dotenv(find_dotenv(".env.validator")) - await connect_db() - batch_id = 0 - async for task_batch, has_more_batches in ORM.get_unexpired_tasks( - validator_hotkeys=["5Hdf4hSQoLGj4JyJuabTnp85ZYKezLE366SXqkWYjcUw5PfJ"] - ): - for task in task_batch: - print(f"Task expire_at: {task.request.expire_at}") - batch_id += 1 - await disconnect_db() - - -if __name__ == "__main__": - asyncio.run(_test_get_unexpired_tasks()) diff --git a/commons/scoring.py b/commons/scoring.py index 8acd5660..abec67b9 100644 --- a/commons/scoring.py +++ b/commons/scoring.py @@ -174,9 +174,7 @@ def consensus_score( # create df with the original number of completions df = pd.DataFrame( { - "subject": [ - i for i in range(len(miner_responses[0].completion_responses)) - ], + "subject": [i for i in range(len(request.completion_responses))], } ) # prepare dataframe for calculating ICC diff --git a/dojo/__init__.py b/dojo/__init__.py index f47fb30b..8ac40f3e 100644 --- a/dojo/__init__.py +++ b/dojo/__init__.py @@ -32,7 +32,7 @@ def get_latest_git_tag(): # Import all submodules. VALIDATOR_MIN_STAKE = 20000 -TASK_DEADLINE = 8 * 60 * 60 +TASK_DEADLINE = 6 * 60 * 60 # Define the time intervals for various tasks. VALIDATOR_RUN = 300 diff --git a/dojo/utils/config.py b/dojo/utils/config.py index 1c8b012b..a278777f 100644 --- a/dojo/utils/config.py +++ b/dojo/utils/config.py @@ -175,7 +175,7 @@ def add_args(parser): "--neuron.sample_size", type=int, help="The number of miners to query per dendrite call.", - default=10, + default=8, ) parser.add_argument( diff --git a/neurons/validator.py b/neurons/validator.py index b6f39fee..2d8b6b1b 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -5,6 +5,7 @@ import time import traceback from collections import defaultdict +from datetime import datetime, timezone from traceback import print_exception from typing import List @@ -25,6 +26,7 @@ from commons.orm import ORM from commons.scoring import Scoring from commons.utils import ( + datetime_as_utc, get_epoch_time, get_new_uuid, init_wandb, @@ -61,7 +63,9 @@ def __init__(self): self.dendrite = bt.dendrite(wallet=self.wallet) logger.info(f"Dendrite: {self.dendrite}") # Set up initial scoring weights for validation - self.scores = torch.zeros(self.metagraph.n.item(), dtype=torch.float32) + self.scores: torch.Tensor = torch.zeros( + self.metagraph.n.item(), dtype=torch.float32 + ) self.load_state() # manually always register and always sync metagraph when application starts @@ -103,12 +107,12 @@ async def update_score_and_send_feedback(self): batch_size = 10 processed_request_ids = [] - # TODO @dev not sure if this is necessary + # figure out an expire_at cutoff time to determine those requests ready for scoring expire_at = await ORM.get_last_expire_at_cutoff(validator_hotkeys) async for ( task_batch, has_more_batches, - ) in ORM.get_unexpired_tasks( + ) in ORM.get_expired_tasks( validator_hotkeys, batch_size=batch_size, expire_at=expire_at ): if not has_more_batches: @@ -539,6 +543,7 @@ async def run(self): self.step += 1 except Exception as e: + traceback.print_exc() logger.error(f"Error during validator run: {e}") pass await asyncio.sleep(dojo.VALIDATOR_RUN) @@ -561,6 +566,7 @@ def set_weights(self): """ # Check if self.scores contains any NaN values and log a warning if it does. + # TODO @torch fix inconsistency between numpy and torch if torch.isnan(self.scores).any(): logger.warning( "Scores contain NaN values. This may be due to a lack of responses from miners, or a bug in your reward functions." @@ -644,7 +650,7 @@ def resync_metagraph(self): # If so, we need to add new hotkeys and moving averages. if len(previous_metagraph.hotkeys) < len(self.metagraph.hotkeys): # Update the size of the moving average scores. - new_moving_average = np.zeros(self.metagraph.n) + new_moving_average = torch.zeros(self.metagraph.n) min_len = min(len(previous_metagraph.hotkeys), len(self.scores)) new_moving_average[:min_len] = self.scores[:min_len] self.scores = new_moving_average @@ -686,7 +692,7 @@ def update_scores(self, hotkey_to_scores: dict[str, float]): # Update scores with rewards produced by this step. # shape: [ metagraph.n ] alpha: float = self.config.neuron.moving_average_alpha - self.scores: torch.Tensor = alpha * rewards + (1 - alpha) * self.scores + self.scores = alpha * rewards + (1 - alpha) * self.scores logger.debug(f"Updated scores: {self.scores}") async def _save_state( @@ -805,9 +811,12 @@ async def monitor_task_completions(self): batch_id = 0 batch_size = 10 - async for task_batch, has_more_batches in ORM.get_unexpired_tasks( + # use current time as cutoff so we get only unexpired tasks + now = datetime_as_utc(datetime.now(timezone.utc)) + async for task_batch, has_more_batches in ORM.get_expired_tasks( validator_hotkeys=validator_hotkeys, batch_size=batch_size, + expire_at=now, ): if not has_more_batches: logger.success(