Skip to content

Commit

Permalink
refactor: rename method, explicitly set expire_at
Browse files Browse the repository at this point in the history
perf: remove completion responses from miner data

chore: reduce num miners to query, shorten task deadline

fix: isnan args

fix: empty completion for miners

fix: consistency between torch and numpy
  • Loading branch information
jarvis8x7b committed Oct 16, 2024
1 parent 8d8cdf8 commit 195f81d
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 36 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ By creating an open platform for gathering human-generated datasets, Tensorplex

- 4 cores
- 16 GB RAM
- 256 SSD
- 2TB SSD

## Miner

Expand All @@ -120,7 +120,7 @@ By creating an open platform for gathering human-generated datasets, Tensorplex

- 2 cores
- 8 GB RAM
- 32GB SSD
- 32GB SSD or 1TB SSD if decentralised

# Getting Started

Expand Down
48 changes: 25 additions & 23 deletions commons/orm.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import asyncio
import json
from datetime import datetime, timedelta, timezone
from typing import AsyncGenerator, List

import torch
from bittensor.btlogging import logging as logger
from dotenv import find_dotenv, load_dotenv

from commons.exceptions import (
InvalidCompletion,
Expand All @@ -15,7 +13,7 @@
UnexpiredTasksAlreadyProcessed,
)
from commons.utils import datetime_as_utc
from database.client import connect_db, disconnect_db, transaction
from database.client import transaction
from database.mappers import (
map_child_feedback_request_to_model,
map_completion_response_to_model,
Expand All @@ -39,6 +37,8 @@
)
from dojo import TASK_DEADLINE
from dojo.protocol import (
CodeAnswer,
CompletionResponses,
DendriteQueryResponse,
FeedbackRequest,
)
Expand Down Expand Up @@ -88,12 +88,15 @@ async def get_last_expire_at_cutoff(
raise ValueError("Unable to determine expire at cutoff")

@staticmethod
async def get_unexpired_tasks(
async def get_expired_tasks(
validator_hotkeys: list[str],
batch_size: int = 10,
expire_at: datetime | None = None,
) -> AsyncGenerator[tuple[List[DendriteQueryResponse], bool], None]:
"""Returns a batch of Feedback_Request_Model and a boolean indicating if there are more batches
"""Returns a batch of Feedback_Request_Model and a boolean indicating if there are more batches.
Depending on the `expire_at` provided, it will return different results.
YOUR LOGIC ON WHETHER TASKS ARE EXPIRED OR NON-EXPIRED SHOULD BE HANDLED BY SETTING EXPIRE_AT YOURSELF.
Args:
validator_hotkeys (list[str]): List of validator hotkeys.
Expand Down Expand Up @@ -427,8 +430,16 @@ async def save_task(

# Create related completions for miner responses
for completion in miner_response.completion_responses:
# remove the completion field, since the miner receives an obfuscated completion_response anyways
# therefore it is useless for training
try:
completion_copy = completion.model_dump()
completion_copy["completion"] = CodeAnswer(files=[])
except KeyError:
pass
completion_input = map_completion_response_to_model(
completion, created_miner_model.id
CompletionResponses.model_validate(completion_copy),
created_miner_model.id,
)
await tx.completion_response_model.create(
data=completion_input
Expand Down Expand Up @@ -470,6 +481,14 @@ async def save_task(
await tx.ground_truth_model.create(
data=Ground_Truth_ModelCreateInput(**gt_create_input)
)
for vali_completion in validator_request.completion_responses:
vali_completion_input = map_completion_response_to_model(
vali_completion,
feedback_request_model.id,
)
await tx.completion_response_model.create(
data=vali_completion_input
)

feedback_request_model.child_requests = created_miner_models
return feedback_request_model
Expand Down Expand Up @@ -503,20 +522,3 @@ async def get_validator_score() -> torch.Tensor | None:
return None

return torch.tensor(json.loads(score_record.score))


async def _test_get_unexpired_tasks():
load_dotenv(find_dotenv(".env.validator"))
await connect_db()
batch_id = 0
async for task_batch, has_more_batches in ORM.get_unexpired_tasks(
validator_hotkeys=["5Hdf4hSQoLGj4JyJuabTnp85ZYKezLE366SXqkWYjcUw5PfJ"]
):
for task in task_batch:
print(f"Task expire_at: {task.request.expire_at}")
batch_id += 1
await disconnect_db()


if __name__ == "__main__":
asyncio.run(_test_get_unexpired_tasks())
4 changes: 1 addition & 3 deletions commons/scoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,7 @@ def consensus_score(
# create df with the original number of completions
df = pd.DataFrame(
{
"subject": [
i for i in range(len(miner_responses[0].completion_responses))
],
"subject": [i for i in range(len(request.completion_responses))],
}
)
# prepare dataframe for calculating ICC
Expand Down
2 changes: 1 addition & 1 deletion dojo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def get_latest_git_tag():
# Import all submodules.

VALIDATOR_MIN_STAKE = 20000
TASK_DEADLINE = 8 * 60 * 60
TASK_DEADLINE = 6 * 60 * 60

# Define the time intervals for various tasks.
VALIDATOR_RUN = 300
Expand Down
2 changes: 1 addition & 1 deletion dojo/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def add_args(parser):
"--neuron.sample_size",
type=int,
help="The number of miners to query per dendrite call.",
default=10,
default=8,
)

parser.add_argument(
Expand Down
21 changes: 15 additions & 6 deletions neurons/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import traceback
from collections import defaultdict
from datetime import datetime, timezone
from traceback import print_exception
from typing import List

Expand All @@ -25,6 +26,7 @@
from commons.orm import ORM
from commons.scoring import Scoring
from commons.utils import (
datetime_as_utc,
get_epoch_time,
get_new_uuid,
init_wandb,
Expand Down Expand Up @@ -61,7 +63,9 @@ def __init__(self):
self.dendrite = bt.dendrite(wallet=self.wallet)
logger.info(f"Dendrite: {self.dendrite}")
# Set up initial scoring weights for validation
self.scores = torch.zeros(self.metagraph.n.item(), dtype=torch.float32)
self.scores: torch.Tensor = torch.zeros(
self.metagraph.n.item(), dtype=torch.float32
)
self.load_state()

# manually always register and always sync metagraph when application starts
Expand Down Expand Up @@ -103,12 +107,12 @@ async def update_score_and_send_feedback(self):
batch_size = 10
processed_request_ids = []

# TODO @dev not sure if this is necessary
# figure out an expire_at cutoff time to determine those requests ready for scoring
expire_at = await ORM.get_last_expire_at_cutoff(validator_hotkeys)
async for (
task_batch,
has_more_batches,
) in ORM.get_unexpired_tasks(
) in ORM.get_expired_tasks(
validator_hotkeys, batch_size=batch_size, expire_at=expire_at
):
if not has_more_batches:
Expand Down Expand Up @@ -539,6 +543,7 @@ async def run(self):

self.step += 1
except Exception as e:
traceback.print_exc()
logger.error(f"Error during validator run: {e}")
pass
await asyncio.sleep(dojo.VALIDATOR_RUN)
Expand All @@ -561,6 +566,7 @@ def set_weights(self):
"""

# Check if self.scores contains any NaN values and log a warning if it does.
# TODO @torch fix inconsistency between numpy and torch
if torch.isnan(self.scores).any():
logger.warning(
"Scores contain NaN values. This may be due to a lack of responses from miners, or a bug in your reward functions."
Expand Down Expand Up @@ -644,7 +650,7 @@ def resync_metagraph(self):
# If so, we need to add new hotkeys and moving averages.
if len(previous_metagraph.hotkeys) < len(self.metagraph.hotkeys):
# Update the size of the moving average scores.
new_moving_average = np.zeros(self.metagraph.n)
new_moving_average = torch.zeros(self.metagraph.n)
min_len = min(len(previous_metagraph.hotkeys), len(self.scores))
new_moving_average[:min_len] = self.scores[:min_len]
self.scores = new_moving_average
Expand Down Expand Up @@ -686,7 +692,7 @@ def update_scores(self, hotkey_to_scores: dict[str, float]):
# Update scores with rewards produced by this step.
# shape: [ metagraph.n ]
alpha: float = self.config.neuron.moving_average_alpha
self.scores: torch.Tensor = alpha * rewards + (1 - alpha) * self.scores
self.scores = alpha * rewards + (1 - alpha) * self.scores
logger.debug(f"Updated scores: {self.scores}")

async def _save_state(
Expand Down Expand Up @@ -805,9 +811,12 @@ async def monitor_task_completions(self):

batch_id = 0
batch_size = 10
async for task_batch, has_more_batches in ORM.get_unexpired_tasks(
# use current time as cutoff so we get only unexpired tasks
now = datetime_as_utc(datetime.now(timezone.utc))
async for task_batch, has_more_batches in ORM.get_expired_tasks(
validator_hotkeys=validator_hotkeys,
batch_size=batch_size,
expire_at=now,
):
if not has_more_batches:
logger.success(
Expand Down

0 comments on commit 195f81d

Please sign in to comment.