transformer-ranker / utils.py
lukasgarbas's picture
add gradio app
73d9a01
import gradio as gr
from datasets import concatenate_datasets
from huggingface_hub import HfApi
from huggingface_hub.errors import HFValidationError
from requests.exceptions import HTTPError
from transformer_ranker import Result
from transformer_ranker.datacleaner import DatasetCleaner, TaskCategory
from transformer_ranker.embedder import Embedder
import math
# Values passed as ``variant`` in ``gr.update(...)`` when a button is
# switched to its disabled / enabled state.
DISABLED_BUTTON_VARIANT = "huggingface"
ENABLED_BUTTON_VARIANT = "primary"
# HTML snippet with the project title, a short description, badge links
# (repository, package, tutorials, license) and the affiliation line.
HEADLINE = """
<h1 align="center">TransformerRanker</h1>
<p align="center" style="max-width: 560px; margin: auto;">
A very simple library that helps you find the best-suited language model for your NLP task.
All you need to do is to select a dataset and a list of pre-trained language models (LMs) from the 🤗 HuggingFace Hub.
TransformerRanker will quickly estimate which of these LMs will perform best on the given dataset!
</p>
<p align="center" style="font-weight: bold; margin-top: 20px; display: flex; justify-content: center; gap: 10px;">
<a href="https://github.com/flairNLP/transformer-ranker">
<img src="https://img.shields.io/github/stars/flairNLP/transformer-ranker?style=social&label=Repository" alt="GitHub Badge">
</a>
<a href="https://pypi.org/project/transformer-ranker/">
<img src="https://img.shields.io/badge/Package-orange?style=flat&logo=python" alt="Package Badge">
</a>
<a href="https://github.com/flairNLP/transformer-ranker/blob/main/examples/01-walkthrough.md">
<img src="https://img.shields.io/badge/Tutorials-blue?style=flat&logo=readthedocs&logoColor=white" alt="Tutorials Badge">
</a>
<img src="https://img.shields.io/badge/license-MIT-green?style=flat" alt="License: MIT">
</p>
<p align="center">Developed at <a href="https://www.informatik.hu-berlin.de/en/forschung-en/gebiete/ml-en/">Humboldt University of Berlin</a>.</p>
"""
# Markdown notes: CPU-only hint, developer credits and where to send feedback.
FOOTER = """
**Note:** This demonstration currently runs on a CPU and is suited for smaller models only.
**Developers:** [@plonerma](https://huggingface.co/plonerma) and [@lukasgarbas](https://huggingface.co/lukasgarbas).
For feedback, suggestions, or contributions, reach out via GitHub or leave a message in the [discussions](https://huggingface.co/spaces/lukasgarbas/transformer-ranker/discussions).
"""
# Custom CSS: caps the ``.gradio-container`` width at 800px and sets the link
# colour, with a separate colour when the dark colour scheme is preferred.
CSS = """
.gradio-container{max-width: 800px !important}
a {color: #ff9d00;}
@media (prefers-color-scheme: dark) { a {color: #be185d;} }
"""
# Shared Hub client; used below to look up dataset metadata.
hf_api = HfApi()
def check_dataset_exists(dataset_name):
    """Enable the load button only if the dataset lookup succeeds.

    ``dataset_name`` is looked up with ``hf_api.dataset_info``. When that
    call raises ``HTTPError`` or ``HFValidationError`` the returned update
    disables the button and resets its label to "Load dataset"; otherwise
    the returned update makes the button interactive.
    """
    try:
        hf_api.dataset_info(dataset_name)
    except (HTTPError, HFValidationError):
        # Lookup failed: grey the button out and restore its default label.
        return gr.update(
            value="Load dataset",
            interactive=False,
            variant=DISABLED_BUTTON_VARIANT,
        )

    return gr.update(interactive=True, variant=ENABLED_BUTTON_VARIANT)
def check_dataset_is_loaded(dataset, text_column, label_column, task_category):
    """Return a button update that is enabled once every setting is chosen.

    The update is interactive only when ``dataset`` is truthy and none of
    ``text_column``, ``label_column`` and ``task_category`` still holds the
    "-" placeholder.
    """
    selections = (text_column, label_column, task_category)
    ready = bool(dataset) and all(choice != "-" for choice in selections)

    if not ready:
        return gr.update(interactive=False, variant=DISABLED_BUTTON_VARIANT)

    return gr.update(interactive=True, variant=ENABLED_BUTTON_VARIANT)
def get_dataset_info(dataset):
    """Detect dataset settings and build the updates for the settings widgets.

    All splits of ``dataset`` are concatenated. ``DatasetCleaner`` is then
    asked for the text column, the label column and (only if a label column
    was found) the task category. Whatever cannot be detected is reported via
    ``gr.Warning`` and left at the "-" placeholder.

    Args:
        dataset: Mapping whose values are the dataset splits to concatenate
            (presumably a ``datasets.DatasetDict`` -- confirm with the caller).

    Returns:
        A 5-tuple of four ``gr.update`` objects and an int, in this order:
        task category, text column, an extra column selector that always
        starts at "-", label column, and the total number of rows across all
        splits.
    """
    placeholder = "-"
    joined_dataset = concatenate_datasets(list(dataset.values()))
    datacleaner = DatasetCleaner()
    column_names = list(joined_dataset.column_names)

    def with_placeholder(value, choices):
        # A selector whose current value is the placeholder must also offer
        # the placeholder as a choice; otherwise the value is not a member of
        # its own choice list.
        if value == placeholder:
            return [placeholder, *choices]
        return list(choices)

    try:
        text_column = datacleaner._find_column(joined_dataset, "text column")
    except ValueError:
        gr.Warning("Text column can not be found. Select it in the dataset settings.")
        text_column = placeholder

    try:
        label_column = datacleaner._find_column(joined_dataset, "label column")
    except ValueError:
        gr.Warning("Label column can not be found. Select it in the dataset settings.")
        label_column = placeholder

    task_category = placeholder
    if label_column != placeholder:
        # The task category is derived from the label column, so it can only
        # be looked up once a label column is known.
        try:
            task_category = datacleaner._find_task_category(joined_dataset, label_column)
        except ValueError:
            gr.Warning(
                "Task category could not be determined. The dataset must support classification or regression tasks.",
            )

    num_samples = len(joined_dataset)
    task_choices = [str(t) for t in TaskCategory]

    return (
        gr.update(
            value=task_category,
            choices=with_placeholder(task_category, task_choices),
            interactive=True,
        ),
        gr.update(
            value=text_column,
            choices=with_placeholder(text_column, column_names),
            interactive=True,
        ),
        gr.update(
            value=placeholder,
            choices=[placeholder, *column_names],
            interactive=True,
        ),
        gr.update(
            value=label_column,
            choices=with_placeholder(label_column, column_names),
            interactive=True,
        ),
        num_samples,
    )
def compute_ratio(num_samples_to_use, num_samples):
    """Return ``num_samples_to_use / num_samples``, or 0.0 for a non-positive total."""
    return num_samples_to_use / num_samples if num_samples > 0 else 0.0
def ensure_one_lm_selected(checkbox_values, previous_values):
    """Keep the previous selection whenever the new one has no truthy entry."""
    has_selection = any(checkbox_values)
    return checkbox_values if has_selection else previous_values
# Monkey-patch Embedder so that embedding runs report to an optional tracker
_old_embed = Embedder.embed


def _new_embed(embedder, sentences, batch_size: int = 32, **kw):
    """Tell the tracker how many batches to expect, then run the original ``embed``."""
    tracker = embedder.tracker
    if tracker is not None:
        num_batches = math.ceil(len(sentences) / batch_size)
        tracker.update_num_batches(num_batches)
    return _old_embed(embedder, sentences, batch_size=batch_size, **kw)


Embedder.embed = _new_embed
_old_embed_batch = Embedder.embed_batch


def _new_embed_batch(embedder, *batch_args, **batch_kwargs):
    """Run the original ``embed_batch`` and then report the finished batch."""
    batch_output = _old_embed_batch(embedder, *batch_args, **batch_kwargs)
    tracker = embedder.tracker
    if tracker is not None:
        tracker.update_batch_complete()
    return batch_output


Embedder.embed_batch = _new_embed_batch
_old_init = Embedder.__init__


def _new_init(embedder, *init_args, tracker=None, **init_kwargs):
    """Initialise the embedder as before, then attach the optional ``tracker``."""
    _old_init(embedder, *init_args, **init_kwargs)
    # Stored after the original initialiser so the patched embed methods can
    # read ``embedder.tracker``.
    embedder.tracker = tracker


Embedder.__init__ = _new_init
class EmbeddingProgressTracker:
    """Report embedding progress over several models to a progress callable.

    Overall progress is split into one equal share per entry in
    ``model_names``; within a share, progress advances with the fraction of
    completed batches. The tracker is driven through ``update_num_batches``
    (a new model starts) and ``update_batch_complete`` (one batch finished).

    Used as a context manager it resets its counters on entry and reports
    "Done" or "Error" on exit without suppressing exceptions.

    Args:
        progress: Callable invoked as ``progress(value, desc=...)``. If it is
            ``None``, a ``gr.Progress(track_tqdm=False)`` is created on entry.
        model_names: Sequence of model names, in the order they are embedded.
    """

    def __init__(self, *, progress, model_names):
        self.model_names = model_names
        self.progress_bar = progress
        # Initialise the counters here too, so the tracker is usable even
        # when it is not entered as a context manager.
        self._reset_counters()

    def _reset_counters(self):
        """Put the tracker back into its 'nothing started yet' state."""
        self.current_model = -1
        self.batches_complete = 0
        self.batches_total = None

    @property
    def total(self):
        """Number of models to process."""
        return len(self.model_names)

    def __enter__(self):
        # Keep the progress object handed to __init__; only fall back to a
        # fresh one when none was supplied.
        if self.progress_bar is None:
            self.progress_bar = gr.Progress(track_tqdm=False)
        self._reset_counters()
        return self

    def __exit__(self, typ, value, tb):
        if typ is None:
            self.progress_bar(1.0, desc="Done")
        else:
            self.progress_bar(1.0, desc="Error")

        # Do not suppress any errors
        return False

    def update_num_batches(self, total):
        """Advance to the next model, which will be embedded in ``total`` batches."""
        self.current_model += 1
        self.batches_complete = 0
        self.batches_total = total
        self.update_bar()

    def update_batch_complete(self):
        """Record one finished batch of the current model."""
        self.batches_complete += 1
        self.update_bar()

    def update_bar(self):
        """Push the current overall progress and description to the progress callable."""
        total = self.total
        if total == 0:
            # No models registered: there is nothing meaningful to display.
            return

        # Clamp the index so an unexpected extra call cannot raise IndexError.
        i = min(max(self.current_model, 0), total - 1)
        description = f"Running {self.model_names[i]} ({i + 1} / {total})"

        progress = i / total
        # ``batches_total`` is None before the first model and 0 for an empty
        # input; in both cases there is no per-batch share to add.
        if self.batches_total:
            progress += (self.batches_complete / self.batches_total) / total

        progress = min(max(progress, 0.0), 1.0)
        self.progress_bar(progress=progress, desc=description)