import gradio as gr
import bittensor as bt
import typing
from bittensor.extrinsics.serving import get_metadata
from dataclasses import dataclass
import requests
import wandb
import math
import os
import datetime
import time
import functools
import multiprocessing
from dotenv import load_dotenv
from huggingface_hub import HfApi
from apscheduler.schedulers.background import BackgroundScheduler
from tqdm import tqdm

load_dotenv()
FONT = """<link href="https://fonts.cdnfonts.com/css/jmh-typewriter" rel="stylesheet">"""
TITLE = """<h1 align="center" id="space-title" class="typewriter">Subnet 6 Leaderboard</h1>"""
IMAGE = """<a href="https://discord.gg/jqVphNsB4H" target="_blank"><img src="https://i.ibb.co/88wyVQ7/nousgirl.png" alt="nousgirl" style="margin: auto; width: 20%; border: 0;" /></a>"""
HEADER = """<h2 align="center" class="typewriter"><a href="https://github.com/NousResearch/finetuning-subnet" target="_blank">Subnet 6</a> is a <a href="https://bittensor.com/" target="_blank">Bittensor</a> subnet that incentivizes the creation of the best open models by evaluating submissions on a constant stream of newly generated synthetic GPT-4 data. The models with the best <a href="https://github.com/NousResearch/finetuning-subnet/blob/master/docs/validator.md" target="_blank">head-to-head loss</a> on the evaluation data receive a steady emission of TAO.</h2>"""
EVALUATION_DETAILS = """<b>Name</b> is the 🤗 Hugging Face model name (click to go to the model card). <b>Rewards / Day</b> are the expected rewards per day for each model. <b>Perplexity</b> is the loss on all of the evaluation data for the model as calculated by the validator (lower is better). <b>UID</b> is the Bittensor user id of the submitter. <b>Block</b> is the Bittensor block that the model was submitted in. More stats on <a href="https://taostats.io/subnets/netuid-6/" target="_blank">taostats</a>."""
EVALUATION_HEADER = """<h3 align="center">Shows the latest internal evaluation statistics as calculated by a validator run by Nous Research</h3>"""
VALIDATOR_WANDB_PROJECT = os.environ["VALIDATOR_WANDB_PROJECT"]
H4_TOKEN = os.environ.get("H4_TOKEN", None)
API = HfApi(token=H4_TOKEN)
REPO_ID = "NousResearch/finetuning_subnet_leaderboard"
METAGRAPH_RETRIES = 10
METAGRAPH_DELAY_SECS = 30
METADATA_TTL = 10
NETUID = 6
SUBNET_START_BLOCK = 2225782
SECONDS_PER_BLOCK = 12
SUBTENSOR = os.environ.get("SUBTENSOR", "finney")
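
# Timing assumptions: at SECONDS_PER_BLOCK = 12 the chain produces ~7200 blocks
# per day; assuming the subnet's 360-block tempo, that is ~20 tempos per day,
# which is where the "* 20" per-tempo-to-daily emission conversion below comes from.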
@dataclass
class Competition:
    id: str
    name: str

COMPETITIONS = [Competition(id="m1", name="mistral-7b"), Competition(id="g1", name="gemma-2b")]
DEFAULT_COMPETITION_ID = "m1"
def run_in_subprocess(func: functools.partial, ttl: int) -> typing.Any:
    """Runs the provided function on a subprocess with 'ttl' seconds to complete.

    Args:
        func (functools.partial): Function to be run.
        ttl (int): How long to try for in seconds.

    Returns:
        Any: The value returned by 'func'
    """
    def wrapped_func(func: functools.partial, queue: multiprocessing.Queue):
        try:
            result = func()
            queue.put(result)
        except (Exception, BaseException) as e:
            # Catch exceptions here to add them to the queue.
            queue.put(e)

    # Use "fork" (the default on all POSIX except macOS), because pickling doesn't seem
    # to work on "spawn".
    ctx = multiprocessing.get_context("fork")
    queue = ctx.Queue()
    process = ctx.Process(target=wrapped_func, args=[func, queue])
    process.start()
    process.join(timeout=ttl)
    if process.is_alive():
        process.terminate()
        process.join()
        raise TimeoutError(f"Failed to {func.func.__name__} after {ttl} seconds")

    # Raises queue.Empty if the subprocess exited without putting a result
    # (a timeout is already handled above by raising TimeoutError).
    result = queue.get(block=False)

    # If we put an exception on the queue then raise instead of returning.
    if isinstance(result, Exception):
        raise result
    if isinstance(result, BaseException):
        raise Exception(f"BaseException raised in subprocess: {str(result)}")
    return result
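
# Example (values as used below): wrap a potentially hanging chain query so it
# cannot stall the refresh loop:
#   partial = functools.partial(get_metadata, subtensor, NETUID, hotkey)
#   metadata = run_in_subprocess(partial, ttl=METADATA_TTL)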
def get_subtensor_and_metagraph() -> typing.Tuple[bt.subtensor, bt.metagraph]:
    for i in range(0, METAGRAPH_RETRIES):
        try:
            print("Connecting to subtensor...")
            subtensor: bt.subtensor = bt.subtensor(SUBTENSOR)
            print("Pulling metagraph...")
            metagraph: bt.metagraph = subtensor.metagraph(NETUID, lite=False)
            return subtensor, metagraph
        except Exception:
            if i == METAGRAPH_RETRIES - 1:
                raise
            print(f"Error connecting to subtensor or pulling metagraph, retry {i + 1} of {METAGRAPH_RETRIES} in {METAGRAPH_DELAY_SECS} seconds...")
            time.sleep(METAGRAPH_DELAY_SECS)
    raise RuntimeError("Unreachable: the retry loop either returns or re-raises.")
@dataclass
class ModelData:
    uid: int
    hotkey: str
    namespace: str
    name: str
    commit: str
    hash: str
    block: int
    incentive: float
    emission: float
    competition: str

    @classmethod
    def from_compressed_str(cls, uid: int, hotkey: str, cs: str, block: int, incentive: float, emission: float):
        """Returns an instance of this class from a compressed string representation"""
        tokens = cs.split(":")
        return ModelData(
            uid=uid,
            hotkey=hotkey,
            namespace=tokens[0],
            name=tokens[1],
            commit=tokens[2] if tokens[2] != "None" else "",
            hash=tokens[3] if tokens[3] != "None" else "",
            competition=tokens[4] if len(tokens) > 4 and tokens[4] != "None" else DEFAULT_COMPETITION_ID,
            block=block,
            incentive=incentive,
            emission=emission,
        )
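
# The compressed string is the miner's colon-separated on-chain commitment,
# e.g. (hypothetical) "NousResearch:some-model:abc12345:deadbeef:m1" ->
# namespace, name, commit, hash, competition. Four-token commitments predate
# competitions and fall back to DEFAULT_COMPETITION_ID.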
def get_tao_price() -> float:
    for i in range(0, METAGRAPH_RETRIES):
        try:
            # A request timeout keeps a slow exchange response from hanging the refresh.
            return float(requests.get("https://api.kucoin.com/api/v1/market/stats?symbol=TAO-USDT", timeout=10).json()["data"]["last"])
        except Exception:
            if i == METAGRAPH_RETRIES - 1:
                raise
            time.sleep(METAGRAPH_DELAY_SECS)
    raise RuntimeError("Unreachable: the retry loop either returns or re-raises.")
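
# The KuCoin 24h-stats endpoint is assumed to return JSON shaped like
# {"data": {"last": "<price string>", ...}}, hence the ["data"]["last"]
# access and the float() conversion above.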
def get_validator_weights(metagraph: bt.metagraph) -> typing.Dict[int, typing.Tuple[float, int, typing.Dict[int, float]]]:
    ret = {}
    for uid in metagraph.uids.tolist():
        vtrust = metagraph.validator_trust[uid].item()
        if vtrust > 0:
            ret[uid] = (vtrust, metagraph.S[uid].item(), {})
            for ouid in metagraph.uids.tolist():
                if ouid == uid:
                    continue
                weight = round(metagraph.weights[uid][ouid].item(), 4)
                if weight > 0:
                    ret[uid][-1][ouid] = weight
    return ret
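
# Returns {validator_uid: (vtrust, stake, {miner_uid: weight})}, e.g.
# (hypothetical) {28: (0.99, 110_000.0, {5: 0.7, 12: 0.3})}.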
def get_subnet_data(subtensor: bt.subtensor, metagraph: bt.metagraph) -> typing.List[ModelData]:
    result = []
    for uid in tqdm(metagraph.uids.tolist(), desc="Metadata for hotkeys"):
        hotkey = metagraph.hotkeys[uid]
        try:
            # Wrap calls to the subtensor in a subprocess with a timeout to handle potential hangs.
            partial = functools.partial(get_metadata, subtensor, metagraph.netuid, hotkey)
            metadata = run_in_subprocess(partial, METADATA_TTL)
        except KeyboardInterrupt:
            raise
        except Exception:
            metadata = None
        if not metadata:
            continue

        commitment = metadata["info"]["fields"][0]
        hex_data = commitment[list(commitment.keys())[0]][2:]  # strip the "0x" prefix
        chain_str = bytes.fromhex(hex_data).decode()
        block = metadata["block"]
        incentive = metagraph.incentive[uid].nan_to_num().item()
        emission = metagraph.emission[uid].nan_to_num().item() * 20  # convert per-tempo emission to daily TAO (~20 tempos/day)

        model_data = None
        try:
            model_data = ModelData.from_compressed_str(uid, hotkey, chain_str, block, incentive, emission)
        except Exception:
            continue
        result.append(model_data)
    return result
def floatable(x) -> bool:
    return (isinstance(x, float) and not math.isnan(x) and not math.isinf(x)) or isinstance(x, int)
def get_float_score(key: str, history, competition_id: str) -> typing.Tuple[typing.Optional[float], bool]:
    if key in history and "competition_id" in history:
        data = list(history[key])
        if len(data) > 0:
            competitions = list(history["competition_id"])
            # Guard against exhausting the lists so pop() cannot raise IndexError.
            while competitions and data:
                # Walk from newest to oldest, skipping rows logged for other competitions.
                if competitions.pop() != competition_id:
                    data.pop()
                    continue
                if floatable(data[-1]):
                    return float(data[-1]), True
                else:
                    data = [float(x) for x in data if floatable(x)]
                    if len(data) > 0:
                        return float(data[-1]), False
                break
    return None, False
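
# The returned bool flags freshness: True means the newest row logged for this
# competition held a usable value; False means we fell back to an older value.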
def get_sample(uid, history, competition_id: str) -> typing.Optional[typing.Tuple[str, str, str]]:
    prompt_key = f"sample_prompt_data.{uid}"
    response_key = f"sample_response_data.{uid}"
    truth_key = f"sample_truth_data.{uid}"
    if prompt_key in history and response_key in history and truth_key in history and "competition_id" in history:
        competitions = list(history["competition_id"])
        prompts = list(history[prompt_key])
        responses = list(history[response_key])
        truths = list(history[truth_key])
        # Guard against exhausting the lists so pop() cannot raise IndexError.
        while competitions and prompts and responses and truths:
            prompt = prompts.pop()
            response = responses.pop()
            truth = truths.pop()
            if competitions.pop() != competition_id:
                continue
            if isinstance(prompt, str) and isinstance(response, str) and isinstance(truth, str):
                return prompt, response, truth
            break
    return None
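
# Mirrors get_float_score: walks newest-to-oldest within the competition and
# returns the newest row as a (prompt, response, truth) triple, or None if
# that row is incomplete.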
def get_scores(uids: typing.List[int], competition_id: str) -> typing.Dict[int, typing.Dict[str, typing.Optional[float | str]]]:
    api = wandb.Api()
    runs = list(api.runs(VALIDATOR_WANDB_PROJECT))
    result = {}
    for run in runs:
        history = run.history()
        for uid in uids:
            if uid in result.keys():
                continue
            perplexity, perplexity_fresh = get_float_score(f"perplexity_data.{uid}", history, competition_id)
            win_rate, win_rate_fresh = get_float_score(f"win_rate_data.{uid}", history, competition_id)
            win_total, win_total_fresh = get_float_score(f"win_total_data.{uid}", history, competition_id)
            weight, weight_fresh = get_float_score(f"weight_data.{uid}", history, competition_id)
            sample = get_sample(uid, history, competition_id)
            result[uid] = {
                "perplexity": perplexity,
                "win_rate": win_rate,
                "win_total": win_total,
                "weight": weight,
                "sample": sample,
                "fresh": perplexity_fresh and win_rate_fresh and win_total_fresh,
            }
        if len(result.keys()) == len(uids):
            break
    return result
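
# Note: wandb's run.history() returns a sampled view of the run history by
# default (a samples=... argument exists for more rows); that is assumed to be
# enough to find recent per-uid values here.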
def format_score(uid, scores, key) -> typing.Optional[float]:
    if uid in scores:
        if key in scores[uid]:
            point = scores[uid][key]
            if floatable(point):
                return round(point, 4)
    return None
def next_tempo(start_block, tempo, block):
    start_num = start_block + tempo
    intervals = (block - start_num) // tempo
    nearest_num = start_num + ((intervals + 1) * tempo)
    return nearest_num
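
# Worked example: with start_block=100, tempo=10, block=125 we get
# start_num=110, intervals = (125 - 110) // 10 = 1, so the next boundary is
# 110 + 2 * 10 = 130.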
subtensor, metagraph = get_subtensor_and_metagraph()
tao_price = get_tao_price()

leaderboard_df = get_subnet_data(subtensor, metagraph)
leaderboard_df.sort(key=lambda x: x.incentive, reverse=True)

competition_scores = {
    y.id: get_scores([x.uid for x in leaderboard_df if x.competition == y.id], y.id)
    for y in COMPETITIONS
}

current_block = metagraph.block.item()
next_update = next_tempo(
    SUBNET_START_BLOCK,
    subtensor.get_subnet_hyperparameters(NETUID).tempo,
    current_block,
)
blocks_to_go = next_update - current_block
current_time = datetime.datetime.now()
next_update_time = current_time + datetime.timedelta(seconds=blocks_to_go * SECONDS_PER_BLOCK)

validator_df = get_validator_weights(metagraph)
weight_keys = set()
for uid, stats in validator_df.items():
    weight_keys.update(stats[-1].keys())
def get_next_update():
    now = datetime.datetime.now()
    delta = next_update_time - now
    return f"""<div align="center" style="font-size: larger;">Next reward update: <b>{blocks_to_go}</b> blocks (~{int(delta.total_seconds() // 60)} minutes)</div>"""
def leaderboard_data(show_stale: bool, scores: typing.Dict[int, typing.Dict[str, typing.Optional[float | str]]], competition_id: str):
    value = [
        [
            f'[{c.namespace}/{c.name} ({c.commit[0:8]}, UID={c.uid})](https://huggingface.co/{c.namespace}/{c.name}/commit/{c.commit})',
            format_score(c.uid, scores, "win_rate"),
            format_score(c.uid, scores, "perplexity"),
            format_score(c.uid, scores, "weight"),
            c.uid,
            c.block,
        ] for c in leaderboard_df if c.competition == competition_id and (scores[c.uid]["fresh"] or show_stale)
    ]
    return value
demo = gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}")
with demo:
    gr.HTML(FONT)
    gr.HTML(TITLE)
    gr.HTML(IMAGE)
    gr.HTML(HEADER)
    gr.HTML(value=get_next_update())

    with gr.Tabs():
        for competition in COMPETITIONS:
            with gr.Tab(competition.name):
                scores = competition_scores[competition.id]
                print(scores)

                # Show each top-10 model's share of incentive within this competition;
                # cap the range so fewer than 10 models cannot raise IndexError.
                top_n = range(0, min(10, len(leaderboard_df)))
                class_denominator = sum(leaderboard_df[i].incentive for i in top_n if leaderboard_df[i].incentive and leaderboard_df[i].competition == competition.id)
                class_values = {
                    f"{leaderboard_df[i].namespace}/{leaderboard_df[i].name} ({leaderboard_df[i].commit[0:8]}, UID={leaderboard_df[i].uid}) · ${round(leaderboard_df[i].emission * tao_price, 2):,} (τ{round(leaderboard_df[i].emission, 2):,})":
                        leaderboard_df[i].incentive / class_denominator
                    for i in top_n if leaderboard_df[i].incentive and leaderboard_df[i].competition == competition.id
                }
                gr.Label(
                    value=class_values,
                    num_top_classes=10,
                )
with gr.Accordion("Evaluation Stats"): | |
gr.HTML(EVALUATION_HEADER) | |
with gr.Tabs(): | |
for entry in leaderboard_df: | |
if entry.competition == competition.id: | |
sample = scores[entry.uid]["sample"] | |
if sample is not None: | |
name = f"{entry.namespace}/{entry.name} ({entry.commit[0:8]}, UID={entry.uid})" | |
with gr.Tab(name): | |
gr.Chatbot([(sample[0], sample[1])]) | |
# gr.Chatbot([(sample[0], f"*{name}*: {sample[1]}"), (None, f"*GPT-4*: {sample[2]}")]) | |
show_stale = gr.Checkbox(label="Show Stale", interactive=True) | |
leaderboard_table = gr.components.Dataframe( | |
value=leaderboard_data(show_stale.value, scores, competition.id), | |
headers=["Name", "Win Rate", "Perplexity", "Weight", "UID", "Block"], | |
datatype=["markdown", "number", "number", "number", "number", "number"], | |
elem_id="leaderboard-table", | |
interactive=False, | |
visible=True, | |
) | |
gr.HTML(EVALUATION_DETAILS) | |
show_stale.change(lambda x: leaderboard_data(x, scores, competition.id), [show_stale], leaderboard_table) | |
with gr.Accordion("Validator Stats"): | |
validator_table = gr.components.Dataframe( | |
value=[ | |
[uid, int(validator_df[uid][1]), round(validator_df[uid][0], 4)] + [validator_df[uid][-1].get(c.uid) for c in leaderboard_df if c.incentive] | |
for uid, _ in sorted( | |
zip(validator_df.keys(), [validator_df[x][1] for x in validator_df.keys()]), | |
key=lambda x: x[1], | |
reverse=True | |
) | |
], | |
headers=["UID", "Stake (τ)", "V-Trust"] + [f"{c.namespace}/{c.name} ({c.commit[0:8]}, UID={c.uid})" for c in leaderboard_df if c.incentive], | |
datatype=["number", "number", "number"] + ["number" for c in leaderboard_df if c.incentive], | |
interactive=False, | |
visible=True, | |
) | |
def restart_space():
    API.restart_space(repo_id=REPO_ID, token=H4_TOKEN)

scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=60 * 15)  # restart every 15 minutes
scheduler.start()

demo.launch()