import os
import json
import tempfile
import zipfile
from datetime import datetime
import gradio as gr
import numpy as np
import torch
from PIL import Image
# Program A imports
from utils import MEGABenchEvalDataLoader
from constants import * # This is assumed to define CITATION_BUTTON_TEXT, CITATION_BUTTON_LABEL, TABLE_INTRODUCTION, LEADERBOARD_INTRODUCTION, DATA_INFO, SUBMIT_INTRODUCTION, BASE_MODEL_GROUPS, etc.
# Program B imports
import spaces
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor, Qwen2_5_VLForConditionalGeneration
from qwen_vl_utils import process_vision_info
from gliner import GLiNER
# ----------------------------------------------------------------
# Combined CSS
# ----------------------------------------------------------------
current_dir = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(current_dir, "static", "css", "style.css"), "r") as f:
base_css = f.read()
with open(os.path.join(current_dir, "static", "css", "table.css"), "r") as f:
table_css = f.read()
css_program_b = """
/* Program B CSS */
.gradio-container {
max-width: 1200px !important;
margin: 0 auto;
padding: 20px;
background-color: #f8f9fa;
}
.tabs {
border-radius: 8px;
background: white;
padding: 20px;
box-shadow: 0 2px 6px rgba(0, 0, 0, 0.1);
}
.input-container, .output-container {
background: white;
border-radius: 8px;
padding: 15px;
margin: 10px 0;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.05);
}
.submit-btn {
background-color: #2d31fa !important;
border: none !important;
padding: 8px 20px !important;
border-radius: 6px !important;
color: white !important;
transition: all 0.3s ease !important;
}
.submit-btn:hover {
background-color: #1f24c7 !important;
transform: translateY(-1px);
}
#output {
height: 500px;
overflow: auto;
border: 1px solid #e0e0e0;
border-radius: 6px;
padding: 15px;
background: #ffffff;
font-family: 'Arial', sans-serif;
}
.gr-dropdown {
border-radius: 6px !important;
border: 1px solid #e0e0e0 !important;
}
.gr-image-input {
border: 2px dashed #ccc;
border-radius: 8px;
padding: 20px;
transition: all 0.3s ease;
}
.gr-image-input:hover {
border-color: #2d31fa;
}
"""
css_global = base_css + "\n" + table_css + "\n" + css_program_b
# ----------------------------------------------------------------
# Program A Global Initializations
# ----------------------------------------------------------------
default_loader = MEGABenchEvalDataLoader("./static/eval_results/Default")
si_loader = MEGABenchEvalDataLoader("./static/eval_results/SI")
# ----------------------------------------------------------------
# Program B Global Initializations
# ----------------------------------------------------------------
gliner_model = GLiNER.from_pretrained("knowledgator/modern-gliner-bi-large-v1.0")
DEFAULT_NER_LABELS = "person, organization, location, date, event"
models = {
"Qwen/Qwen2.5-VL-7B-Instruct": Qwen2_5_VLForConditionalGeneration.from_pretrained(
"Qwen/Qwen2.5-VL-7B-Instruct", trust_remote_code=True, torch_dtype="auto"
).cuda().eval()
}
processors = {
"Qwen/Qwen2.5-VL-7B-Instruct": AutoProcessor.from_pretrained(
"Qwen/Qwen2.5-VL-7B-Instruct", trust_remote_code=True
)
}
user_prompt = '<|user|>\n'
assistant_prompt = '<|assistant|>\n'
prompt_suffix = "<|end|>\n"
# A simple metadata container for OCR results and entity information.
class TextWithMetadata(list):
def __init__(self, *args, **kwargs):
super().__init__(*args)
self.original_text = kwargs.get('original_text', '')
self.entities = kwargs.get('entities', [])
# ----------------------------------------------------------------
# UI DEFINITION (placed at the top)
# ----------------------------------------------------------------
with gr.Blocks(css=css_global) as demo:
with gr.Tabs():
# -------------------------
# Tab 1: Dashboard (Program A)
# -------------------------
with gr.TabItem("Dashboard"):
with gr.Tabs(elem_classes="tab-buttons") as dashboard_tabs:
# --- MEGA-Bench Leaderboard Tab ---
with gr.TabItem("📊 MEGA-Bench"):
# Inject table CSS (will be updated when the table is refreshed)
css_style = gr.HTML(f"", visible=False)
# Define captions for default vs. single-image tables
default_caption = ("**Table 1: MEGA-Bench full results.** The number in the parentheses is the number of tasks "
"of each keyword.
The Core set contains $N_{\\text{core}} = 440$ tasks evaluated by "
"rule-based metrics, and the Open-ended set contains $N_{\\text{open}} = 65$ tasks evaluated by a "
"VLM judge (we use GPT-4o-0806).
Different from the results in our paper, we only use the Core "
"results with CoT prompting here for clarity and compatibility with the released data.
"
"$\\text{Overall} \\ = \\ \\frac{\\text{Core} \\ \\cdot \\ N_{\\text{core}} \\ + \\ \\text{Open-ended} "
"\\ \\cdot \\ N_{\\text{open}}}{N_{\\text{core}} \\ + \\ N_{\\text{open}}}$
* indicates self-reported "
"results from the model authors.")
single_image_caption = ("**Table 2: MEGA-Bench Single-image setting results.** The number in the parentheses is the number of tasks "
"in each keyword.
This subset contains 273 single-image tasks from the Core set and 42 single-image tasks "
"from the Open-ended set. For open-source models, we drop the image input in the 1-shot demonstration example so that "
"the entire query contains a single image only.
Compared to the default table, some models with only "
"single-image support are added.")
caption_component = gr.Markdown(
value=default_caption,
elem_classes="table-caption",
latex_delimiters=[{"left": "$", "right": "$", "display": False}],
)
with gr.Row():
super_group_selector = gr.Radio(
choices=list(default_loader.SUPER_GROUPS.keys()),
label="Select a dimension to display breakdown results. We use different column colors to distinguish the overall benchmark scores and breakdown results.",
value=list(default_loader.SUPER_GROUPS.keys())[0]
)
model_group_selector = gr.Radio(
choices=list(BASE_MODEL_GROUPS.keys()),
label="Select a model group",
value="All"
)
initial_headers, initial_data = default_loader.get_leaderboard_data(
list(default_loader.SUPER_GROUPS.keys())[0], "All"
)
data_component = gr.Dataframe(
value=initial_data,
headers=initial_headers,
datatype=["number", "html"] + ["number"] * (len(initial_headers) - 2),
interactive=True,
elem_classes="custom-dataframe",
max_height=2400,
column_widths=["100px", "240px"] + ["160px"] * 3 + ["210px"] * (len(initial_headers) - 5),
)
with gr.Row():
with gr.Accordion("Citation", open=False):
citation_button = gr.Textbox(
value=CITATION_BUTTON_TEXT,
label=CITATION_BUTTON_LABEL,
elem_id="citation-button",
lines=10,
)
gr.Markdown(TABLE_INTRODUCTION)
with gr.Row():
table_selector = gr.Radio(
choices=["Default", "Single Image"],
label="Select table to display. Default: all MEGA-Bench tasks; Single Image: single-image tasks only.",
value="Default"
)
refresh_button = gr.Button("Refresh")
# Wire up event handlers (functions defined below)
refresh_button.click(
fn=update_table_and_caption,
inputs=[table_selector, super_group_selector, model_group_selector],
outputs=[data_component, caption_component, css_style]
)
super_group_selector.change(
fn=update_table_and_caption,
inputs=[table_selector, super_group_selector, model_group_selector],
outputs=[data_component, caption_component, css_style]
)
model_group_selector.change(
fn=update_table_and_caption,
inputs=[table_selector, super_group_selector, model_group_selector],
outputs=[data_component, caption_component, css_style]
)
table_selector.change(
fn=update_selectors,
inputs=[table_selector],
outputs=[super_group_selector, model_group_selector]
).then(
fn=update_table_and_caption,
inputs=[table_selector, super_group_selector, model_group_selector],
outputs=[data_component, caption_component, css_style]
)
# --- Introduction Tab ---
with gr.TabItem("📚 Introduction"):
gr.Markdown(LEADERBOARD_INTRODUCTION)
# --- Data Information Tab ---
with gr.TabItem("📝 Data Information"):
gr.Markdown(DATA_INFO, elem_classes="markdown-text")
# --- Submit Tab ---
with gr.TabItem("🚀 Submit"):
with gr.Row():
gr.Markdown(SUBMIT_INTRODUCTION, elem_classes="markdown-text")
# -------------------------
# Tab 2: Image Processing (Program B)
# -------------------------
with gr.TabItem("Image Processing"):
# A default image is shown for context.
gr.Image("Caracal.jpg", interactive=False)
# It is important to create a state variable to store the OCR/NER result.
ocr_state = gr.State()
with gr.Tab(label="Image Input", elem_classes="tabs"):
with gr.Row():
with gr.Column(elem_classes="input-container"):
input_img = gr.Image(label="Input Picture", elem_classes="gr-image-input")
model_selector = gr.Dropdown(
choices=list(models.keys()),
label="Model",
value="Qwen/Qwen2.5-VL-7B-Instruct",
elem_classes="gr-dropdown"
)
with gr.Row():
ner_checkbox = gr.Checkbox(label="Run Named Entity Recognition", value=False)
ner_labels = gr.Textbox(
label="NER Labels (comma-separated)",
value=DEFAULT_NER_LABELS,
visible=False
)
submit_btn = gr.Button(value="Submit", elem_classes="submit-btn")
with gr.Column(elem_classes="output-container"):
output_text = gr.HighlightedText(label="Output Text", elem_id="output")
# Toggle visibility of the NER labels textbox.
ner_checkbox.change(
lambda x: gr.update(visible=x),
inputs=[ner_checkbox],
outputs=[ner_labels]
)
submit_btn.click(
fn=run_example,
inputs=[input_img, model_selector, ner_checkbox, ner_labels],
outputs=[output_text, ocr_state]
)
with gr.Row():
filename = gr.Textbox(label="Save filename (without extension)", placeholder="Enter filename to save")
download_btn = gr.Button("Download Image & Text", elem_classes="submit-btn")
download_output = gr.File(label="Download")
download_btn.click(
fn=create_zip,
inputs=[input_img, filename, ocr_state],
outputs=[download_output]
)
# ----------------------------------------------------------------
# FUNCTION DEFINITIONS
# ----------------------------------------------------------------
def update_table_and_caption(table_type, super_group, model_group):
"""
Updates the leaderboard DataFrame, caption and CSS based on the table type and selectors.
"""
if table_type == "Default":
headers, data = default_loader.get_leaderboard_data(super_group, model_group)
caption = ("**Table 1: MEGA-Bench full results.** The number in the parentheses is the number of tasks "
"of each keyword.
The Core set contains $N_{\\text{core}} = 440$ tasks evaluated by rule-based metrics, and the "
"Open-ended set contains $N_{\\text{open}} = 65$ tasks evaluated by a VLM judge (we use GPT-4o-0806).
"
"Different from the results in our paper, we only use the Core results with CoT prompting here for clarity and compatibility "
"with the released data.
$\\text{Overall} \\ = \\ \\frac{\\text{Core} \\ \\cdot \\ N_{\\text{core}} \\ + \\ \\text{Open-ended} "
"\\ \\cdot \\ N_{\\text{open}}}{N_{\\text{core}} \\ + \\ N_{\\text{open}}}$
* indicates self-reported results from the model authors.")
else: # Single-image table
headers, data = si_loader.get_leaderboard_data(super_group, model_group)
caption = ("**Table 2: MEGA-Bench Single-image setting results.** The number in the parentheses is the number of tasks "
"in each keyword.
This subset contains 273 single-image tasks from the Core set and 42 single-image tasks from the Open-ended set. "
"For open-source models, we drop the image input in the 1-shot demonstration example so that the entire query contains a single image only.
"
"Compared to the default table, some models with only single-image support are added.")
dataframe = gr.Dataframe(
value=data,
headers=headers,
datatype=["number", "html"] + ["number"] * (len(headers) - 2),
interactive=True,
column_widths=["100px", "240px"] + ["160px"] * 3 + ["210px"] * (len(headers) - 5),
)
style_html = f""
return dataframe, caption, style_html
def update_selectors(table_type):
"""
Updates the options in the radio selectors based on the selected table type.
"""
loader = default_loader if table_type == "Default" else si_loader
return [gr.Radio.update(choices=list(loader.SUPER_GROUPS.keys())),
gr.Radio.update(choices=list(loader.MODEL_GROUPS.keys()))]
def array_to_image_path(image_array):
"""
Converts a NumPy image array to a PIL Image, saves it to disk, and returns its path.
"""
img = Image.fromarray(np.uint8(image_array))
img.thumbnail((1024, 1024))
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"image_{timestamp}.png"
img.save(filename)
return os.path.abspath(filename)
@spaces.GPU
def run_example(image, model_id="Qwen/Qwen2.5-VL-7B-Instruct", run_ner=False, ner_labels=DEFAULT_NER_LABELS):
"""
Given an input image, uses the selected VL model to perform OCR (and optionally NER).
Returns the highlighted text and stores the raw OCR output in state.
"""
text_input = "Convert the image to text."
image_path = array_to_image_path(image)
model = models[model_id]
processor = processors[model_id]
prompt = f"{user_prompt}<|image_1|>\n{text_input}{prompt_suffix}{assistant_prompt}"
image_pil = Image.fromarray(image).convert("RGB")
messages = [
{
"role": "user",
"content": [
{"type": "image", "image": image_path},
{"type": "text", "text": text_input},
],
}
]
# Prepare text and vision inputs
text_full = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
image_inputs, video_inputs = process_vision_info(messages)
inputs = processor(
text=[text_full],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
)
inputs = inputs.to("cuda")
# Generate model output
generated_ids = model.generate(**inputs, max_new_tokens=1024)
generated_ids_trimmed = [out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)]
output_text = processor.batch_decode(
generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)
ocr_text = output_text[0]
if run_ner:
ner_results = gliner_model.predict_entities(ocr_text, ner_labels.split(","), threshold=0.3)
highlighted_text = []
last_end = 0
for entity in sorted(ner_results, key=lambda x: x["start"]):
if last_end < entity["start"]:
highlighted_text.append((ocr_text[last_end:entity["start"]], None))
highlighted_text.append((ocr_text[entity["start"]:entity["end"]], entity["label"]))
last_end = entity["end"]
if last_end < len(ocr_text):
highlighted_text.append((ocr_text[last_end:], None))
result = TextWithMetadata(highlighted_text, original_text=ocr_text, entities=ner_results)
return result, result # one for display, one for state
result = TextWithMetadata([(ocr_text, None)], original_text=ocr_text, entities=[])
return result, result
def create_zip(image, fname, ocr_result):
"""
Creates a zip file containing the saved image, the OCR text, and a JSON of the OCR output.
"""
if not fname or image is None:
return None
try:
if isinstance(image, np.ndarray):
image_pil = Image.fromarray(image)
elif isinstance(image, Image.Image):
image_pil = image
else:
return None
with tempfile.TemporaryDirectory() as temp_dir:
img_path = os.path.join(temp_dir, f"{fname}.png")
image_pil.save(img_path)
original_text = ocr_result.original_text if ocr_result else ""
txt_path = os.path.join(temp_dir, f"{fname}.txt")
with open(txt_path, 'w', encoding='utf-8') as f:
f.write(original_text)
json_data = {
"text": original_text,
"entities": ocr_result.entities if ocr_result else [],
"image_file": f"{fname}.png"
}
json_path = os.path.join(temp_dir, f"{fname}.json")
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(json_data, f, indent=2, ensure_ascii=False)
output_dir = "downloads"
os.makedirs(output_dir, exist_ok=True)
zip_path = os.path.join(output_dir, f"{fname}.zip")
with zipfile.ZipFile(zip_path, 'w') as zipf:
zipf.write(img_path, os.path.basename(img_path))
zipf.write(txt_path, os.path.basename(txt_path))
zipf.write(json_path, os.path.basename(json_path))
return zip_path
except Exception as e:
print(f"Error creating zip: {str(e)}")
return None
# ----------------------------------------------------------------
# Launch the merged Gradio app
# ----------------------------------------------------------------
if __name__ == "__main__":
demo.queue(api_open=False)
demo.launch(debug=True)