import os import json import tempfile import zipfile from datetime import datetime import gradio as gr import numpy as np import torch from PIL import Image # Program A imports from utils import MEGABenchEvalDataLoader from constants import * # This is assumed to define CITATION_BUTTON_TEXT, CITATION_BUTTON_LABEL, TABLE_INTRODUCTION, LEADERBOARD_INTRODUCTION, DATA_INFO, SUBMIT_INTRODUCTION, BASE_MODEL_GROUPS, etc. # Program B imports import spaces from transformers import Qwen2VLForConditionalGeneration, AutoProcessor, Qwen2_5_VLForConditionalGeneration from qwen_vl_utils import process_vision_info from gliner import GLiNER # ---------------------------------------------------------------- # Combined CSS # ---------------------------------------------------------------- current_dir = os.path.dirname(os.path.abspath(__file__)) with open(os.path.join(current_dir, "static", "css", "style.css"), "r") as f: base_css = f.read() with open(os.path.join(current_dir, "static", "css", "table.css"), "r") as f: table_css = f.read() css_program_b = """ /* Program B CSS */ .gradio-container { max-width: 1200px !important; margin: 0 auto; padding: 20px; background-color: #f8f9fa; } .tabs { border-radius: 8px; background: white; padding: 20px; box-shadow: 0 2px 6px rgba(0, 0, 0, 0.1); } .input-container, .output-container { background: white; border-radius: 8px; padding: 15px; margin: 10px 0; box-shadow: 0 2px 4px rgba(0, 0, 0, 0.05); } .submit-btn { background-color: #2d31fa !important; border: none !important; padding: 8px 20px !important; border-radius: 6px !important; color: white !important; transition: all 0.3s ease !important; } .submit-btn:hover { background-color: #1f24c7 !important; transform: translateY(-1px); } #output { height: 500px; overflow: auto; border: 1px solid #e0e0e0; border-radius: 6px; padding: 15px; background: #ffffff; font-family: 'Arial', sans-serif; } .gr-dropdown { border-radius: 6px !important; border: 1px solid #e0e0e0 !important; } .gr-image-input { border: 2px dashed #ccc; border-radius: 8px; padding: 20px; transition: all 0.3s ease; } .gr-image-input:hover { border-color: #2d31fa; } """ css_global = base_css + "\n" + table_css + "\n" + css_program_b # ---------------------------------------------------------------- # Program A Global Initializations # ---------------------------------------------------------------- default_loader = MEGABenchEvalDataLoader("./static/eval_results/Default") si_loader = MEGABenchEvalDataLoader("./static/eval_results/SI") # ---------------------------------------------------------------- # Program B Global Initializations # ---------------------------------------------------------------- gliner_model = GLiNER.from_pretrained("knowledgator/modern-gliner-bi-large-v1.0") DEFAULT_NER_LABELS = "person, organization, location, date, event" models = { "Qwen/Qwen2.5-VL-7B-Instruct": Qwen2_5_VLForConditionalGeneration.from_pretrained( "Qwen/Qwen2.5-VL-7B-Instruct", trust_remote_code=True, torch_dtype="auto" ).cuda().eval() } processors = { "Qwen/Qwen2.5-VL-7B-Instruct": AutoProcessor.from_pretrained( "Qwen/Qwen2.5-VL-7B-Instruct", trust_remote_code=True ) } user_prompt = '<|user|>\n' assistant_prompt = '<|assistant|>\n' prompt_suffix = "<|end|>\n" # A simple metadata container for OCR results and entity information. class TextWithMetadata(list): def __init__(self, *args, **kwargs): super().__init__(*args) self.original_text = kwargs.get('original_text', '') self.entities = kwargs.get('entities', []) # ---------------------------------------------------------------- # UI DEFINITION (placed at the top) # ---------------------------------------------------------------- with gr.Blocks(css=css_global) as demo: with gr.Tabs(): # ------------------------- # Tab 1: Dashboard (Program A) # ------------------------- with gr.TabItem("Dashboard"): with gr.Tabs(elem_classes="tab-buttons") as dashboard_tabs: # --- MEGA-Bench Leaderboard Tab --- with gr.TabItem("📊 MEGA-Bench"): # Inject table CSS (will be updated when the table is refreshed) css_style = gr.HTML(f"", visible=False) # Define captions for default vs. single-image tables default_caption = ("**Table 1: MEGA-Bench full results.** The number in the parentheses is the number of tasks " "of each keyword.
The Core set contains $N_{\\text{core}} = 440$ tasks evaluated by " "rule-based metrics, and the Open-ended set contains $N_{\\text{open}} = 65$ tasks evaluated by a " "VLM judge (we use GPT-4o-0806).
Different from the results in our paper, we only use the Core " "results with CoT prompting here for clarity and compatibility with the released data.
" "$\\text{Overall} \\ = \\ \\frac{\\text{Core} \\ \\cdot \\ N_{\\text{core}} \\ + \\ \\text{Open-ended} " "\\ \\cdot \\ N_{\\text{open}}}{N_{\\text{core}} \\ + \\ N_{\\text{open}}}$
* indicates self-reported " "results from the model authors.") single_image_caption = ("**Table 2: MEGA-Bench Single-image setting results.** The number in the parentheses is the number of tasks " "in each keyword.
This subset contains 273 single-image tasks from the Core set and 42 single-image tasks " "from the Open-ended set. For open-source models, we drop the image input in the 1-shot demonstration example so that " "the entire query contains a single image only.
Compared to the default table, some models with only " "single-image support are added.") caption_component = gr.Markdown( value=default_caption, elem_classes="table-caption", latex_delimiters=[{"left": "$", "right": "$", "display": False}], ) with gr.Row(): super_group_selector = gr.Radio( choices=list(default_loader.SUPER_GROUPS.keys()), label="Select a dimension to display breakdown results. We use different column colors to distinguish the overall benchmark scores and breakdown results.", value=list(default_loader.SUPER_GROUPS.keys())[0] ) model_group_selector = gr.Radio( choices=list(BASE_MODEL_GROUPS.keys()), label="Select a model group", value="All" ) initial_headers, initial_data = default_loader.get_leaderboard_data( list(default_loader.SUPER_GROUPS.keys())[0], "All" ) data_component = gr.Dataframe( value=initial_data, headers=initial_headers, datatype=["number", "html"] + ["number"] * (len(initial_headers) - 2), interactive=True, elem_classes="custom-dataframe", max_height=2400, column_widths=["100px", "240px"] + ["160px"] * 3 + ["210px"] * (len(initial_headers) - 5), ) with gr.Row(): with gr.Accordion("Citation", open=False): citation_button = gr.Textbox( value=CITATION_BUTTON_TEXT, label=CITATION_BUTTON_LABEL, elem_id="citation-button", lines=10, ) gr.Markdown(TABLE_INTRODUCTION) with gr.Row(): table_selector = gr.Radio( choices=["Default", "Single Image"], label="Select table to display. Default: all MEGA-Bench tasks; Single Image: single-image tasks only.", value="Default" ) refresh_button = gr.Button("Refresh") # Wire up event handlers (functions defined below) refresh_button.click( fn=update_table_and_caption, inputs=[table_selector, super_group_selector, model_group_selector], outputs=[data_component, caption_component, css_style] ) super_group_selector.change( fn=update_table_and_caption, inputs=[table_selector, super_group_selector, model_group_selector], outputs=[data_component, caption_component, css_style] ) model_group_selector.change( fn=update_table_and_caption, inputs=[table_selector, super_group_selector, model_group_selector], outputs=[data_component, caption_component, css_style] ) table_selector.change( fn=update_selectors, inputs=[table_selector], outputs=[super_group_selector, model_group_selector] ).then( fn=update_table_and_caption, inputs=[table_selector, super_group_selector, model_group_selector], outputs=[data_component, caption_component, css_style] ) # --- Introduction Tab --- with gr.TabItem("📚 Introduction"): gr.Markdown(LEADERBOARD_INTRODUCTION) # --- Data Information Tab --- with gr.TabItem("📝 Data Information"): gr.Markdown(DATA_INFO, elem_classes="markdown-text") # --- Submit Tab --- with gr.TabItem("🚀 Submit"): with gr.Row(): gr.Markdown(SUBMIT_INTRODUCTION, elem_classes="markdown-text") # ------------------------- # Tab 2: Image Processing (Program B) # ------------------------- with gr.TabItem("Image Processing"): # A default image is shown for context. gr.Image("Caracal.jpg", interactive=False) # It is important to create a state variable to store the OCR/NER result. ocr_state = gr.State() with gr.Tab(label="Image Input", elem_classes="tabs"): with gr.Row(): with gr.Column(elem_classes="input-container"): input_img = gr.Image(label="Input Picture", elem_classes="gr-image-input") model_selector = gr.Dropdown( choices=list(models.keys()), label="Model", value="Qwen/Qwen2.5-VL-7B-Instruct", elem_classes="gr-dropdown" ) with gr.Row(): ner_checkbox = gr.Checkbox(label="Run Named Entity Recognition", value=False) ner_labels = gr.Textbox( label="NER Labels (comma-separated)", value=DEFAULT_NER_LABELS, visible=False ) submit_btn = gr.Button(value="Submit", elem_classes="submit-btn") with gr.Column(elem_classes="output-container"): output_text = gr.HighlightedText(label="Output Text", elem_id="output") # Toggle visibility of the NER labels textbox. ner_checkbox.change( lambda x: gr.update(visible=x), inputs=[ner_checkbox], outputs=[ner_labels] ) submit_btn.click( fn=run_example, inputs=[input_img, model_selector, ner_checkbox, ner_labels], outputs=[output_text, ocr_state] ) with gr.Row(): filename = gr.Textbox(label="Save filename (without extension)", placeholder="Enter filename to save") download_btn = gr.Button("Download Image & Text", elem_classes="submit-btn") download_output = gr.File(label="Download") download_btn.click( fn=create_zip, inputs=[input_img, filename, ocr_state], outputs=[download_output] ) # ---------------------------------------------------------------- # FUNCTION DEFINITIONS # ---------------------------------------------------------------- def update_table_and_caption(table_type, super_group, model_group): """ Updates the leaderboard DataFrame, caption and CSS based on the table type and selectors. """ if table_type == "Default": headers, data = default_loader.get_leaderboard_data(super_group, model_group) caption = ("**Table 1: MEGA-Bench full results.** The number in the parentheses is the number of tasks " "of each keyword.
The Core set contains $N_{\\text{core}} = 440$ tasks evaluated by rule-based metrics, and the " "Open-ended set contains $N_{\\text{open}} = 65$ tasks evaluated by a VLM judge (we use GPT-4o-0806).
" "Different from the results in our paper, we only use the Core results with CoT prompting here for clarity and compatibility " "with the released data.
$\\text{Overall} \\ = \\ \\frac{\\text{Core} \\ \\cdot \\ N_{\\text{core}} \\ + \\ \\text{Open-ended} " "\\ \\cdot \\ N_{\\text{open}}}{N_{\\text{core}} \\ + \\ N_{\\text{open}}}$
* indicates self-reported results from the model authors.") else: # Single-image table headers, data = si_loader.get_leaderboard_data(super_group, model_group) caption = ("**Table 2: MEGA-Bench Single-image setting results.** The number in the parentheses is the number of tasks " "in each keyword.
This subset contains 273 single-image tasks from the Core set and 42 single-image tasks from the Open-ended set. " "For open-source models, we drop the image input in the 1-shot demonstration example so that the entire query contains a single image only.
" "Compared to the default table, some models with only single-image support are added.") dataframe = gr.Dataframe( value=data, headers=headers, datatype=["number", "html"] + ["number"] * (len(headers) - 2), interactive=True, column_widths=["100px", "240px"] + ["160px"] * 3 + ["210px"] * (len(headers) - 5), ) style_html = f"" return dataframe, caption, style_html def update_selectors(table_type): """ Updates the options in the radio selectors based on the selected table type. """ loader = default_loader if table_type == "Default" else si_loader return [gr.Radio.update(choices=list(loader.SUPER_GROUPS.keys())), gr.Radio.update(choices=list(loader.MODEL_GROUPS.keys()))] def array_to_image_path(image_array): """ Converts a NumPy image array to a PIL Image, saves it to disk, and returns its path. """ img = Image.fromarray(np.uint8(image_array)) img.thumbnail((1024, 1024)) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"image_{timestamp}.png" img.save(filename) return os.path.abspath(filename) @spaces.GPU def run_example(image, model_id="Qwen/Qwen2.5-VL-7B-Instruct", run_ner=False, ner_labels=DEFAULT_NER_LABELS): """ Given an input image, uses the selected VL model to perform OCR (and optionally NER). Returns the highlighted text and stores the raw OCR output in state. """ text_input = "Convert the image to text." image_path = array_to_image_path(image) model = models[model_id] processor = processors[model_id] prompt = f"{user_prompt}<|image_1|>\n{text_input}{prompt_suffix}{assistant_prompt}" image_pil = Image.fromarray(image).convert("RGB") messages = [ { "role": "user", "content": [ {"type": "image", "image": image_path}, {"type": "text", "text": text_input}, ], } ] # Prepare text and vision inputs text_full = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) image_inputs, video_inputs = process_vision_info(messages) inputs = processor( text=[text_full], images=image_inputs, videos=video_inputs, padding=True, return_tensors="pt", ) inputs = inputs.to("cuda") # Generate model output generated_ids = model.generate(**inputs, max_new_tokens=1024) generated_ids_trimmed = [out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)] output_text = processor.batch_decode( generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False ) ocr_text = output_text[0] if run_ner: ner_results = gliner_model.predict_entities(ocr_text, ner_labels.split(","), threshold=0.3) highlighted_text = [] last_end = 0 for entity in sorted(ner_results, key=lambda x: x["start"]): if last_end < entity["start"]: highlighted_text.append((ocr_text[last_end:entity["start"]], None)) highlighted_text.append((ocr_text[entity["start"]:entity["end"]], entity["label"])) last_end = entity["end"] if last_end < len(ocr_text): highlighted_text.append((ocr_text[last_end:], None)) result = TextWithMetadata(highlighted_text, original_text=ocr_text, entities=ner_results) return result, result # one for display, one for state result = TextWithMetadata([(ocr_text, None)], original_text=ocr_text, entities=[]) return result, result def create_zip(image, fname, ocr_result): """ Creates a zip file containing the saved image, the OCR text, and a JSON of the OCR output. """ if not fname or image is None: return None try: if isinstance(image, np.ndarray): image_pil = Image.fromarray(image) elif isinstance(image, Image.Image): image_pil = image else: return None with tempfile.TemporaryDirectory() as temp_dir: img_path = os.path.join(temp_dir, f"{fname}.png") image_pil.save(img_path) original_text = ocr_result.original_text if ocr_result else "" txt_path = os.path.join(temp_dir, f"{fname}.txt") with open(txt_path, 'w', encoding='utf-8') as f: f.write(original_text) json_data = { "text": original_text, "entities": ocr_result.entities if ocr_result else [], "image_file": f"{fname}.png" } json_path = os.path.join(temp_dir, f"{fname}.json") with open(json_path, 'w', encoding='utf-8') as f: json.dump(json_data, f, indent=2, ensure_ascii=False) output_dir = "downloads" os.makedirs(output_dir, exist_ok=True) zip_path = os.path.join(output_dir, f"{fname}.zip") with zipfile.ZipFile(zip_path, 'w') as zipf: zipf.write(img_path, os.path.basename(img_path)) zipf.write(txt_path, os.path.basename(txt_path)) zipf.write(json_path, os.path.basename(json_path)) return zip_path except Exception as e: print(f"Error creating zip: {str(e)}") return None # ---------------------------------------------------------------- # Launch the merged Gradio app # ---------------------------------------------------------------- if __name__ == "__main__": demo.queue(api_open=False) demo.launch(debug=True)