Spaces:
Running
on
Zero
Running
on
Zero
#!/usr/bin/env python | |
"""A demo of the DAB-DETR model.""" | |
import pathlib | |
import tempfile | |
import cv2 | |
import gradio as gr | |
import numpy as np | |
import PIL.Image | |
import spaces | |
import supervision as sv | |
import torch | |
import tqdm | |
from transformers import AutoProcessor, AutoModelForObjectDetection | |
# Markdown header rendered at the top of the demo page.
DESCRIPTION = """
# DAB-DETR
##### [ArXiv](https://arxiv.org/abs/2201.12329) | [Docs](https://huggingface.co/docs/transformers/main/en/model_doc/dab-detr)
"""

# Upper bound on the number of frames read from an uploaded video.
MAX_NUM_FRAMES = 300

# Run on CUDA when torch reports it as available, otherwise fall back to CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Checkpoint shared by the processor and the detection model.
checkpoint = "IDEA-Research/dab-detr-resnet-50-dc5-pat3"
image_processor = AutoProcessor.from_pretrained(checkpoint)
model = AutoModelForObjectDetection.from_pretrained(checkpoint, device_map=device)
def process_image(image: PIL.Image.Image) -> PIL.Image.Image:
    """Run object detection on ``image`` and return an annotated copy.

    The image is preprocessed with the module-level ``image_processor``, passed
    through ``model``, and the detections kept by
    ``post_process_object_detection(threshold=0.3)`` are drawn onto a copy of
    the input as white boxes with "<class name> [<score>]" captions.

    Args:
        image: Input image as a PIL image.

    Returns:
        The annotated copy of ``image`` (the input itself is not modified).
    """
    # NOTE(review): `spaces` is imported at the top of the file but is not used
    # in the code visible here -- confirm whether this function should be
    # wrapped with `spaces.GPU` for the target deployment.
    inputs = image_processor(images=image, return_tensors="pt").to(device)

    # Inference only: without disabling autograd the model outputs stay attached
    # to the graph and the `.numpy()` conversions below raise a RuntimeError.
    with torch.inference_mode():
        outputs = model(**inputs)
        results = image_processor.post_process_object_detection(
            outputs, target_sizes=torch.tensor([(image.height, image.width)]), threshold=0.3
        )

    result = results[0]  # a single image was submitted, so take its entry
    boxes_xyxy = result["boxes"].cpu().numpy()
    indexes = result["labels"].cpu().numpy()
    scores = result["scores"].cpu().numpy()

    # Cast to plain Python scalars so the `id2label` lookup and the formatting
    # do not depend on NumPy scalar behaviour.
    text_labels = [
        f"{model.config.id2label[int(index)]} [{float(score):.2f}]" for index, score in zip(indexes, scores)
    ]

    detections = sv.Detections(xyxy=boxes_xyxy, class_id=indexes, confidence=scores)

    bounding_box_annotator = sv.BoxAnnotator(color=sv.Color.WHITE, color_lookup=sv.ColorLookup.INDEX, thickness=1)
    label_annotator = sv.LabelAnnotator()

    # Draw on a copy so the caller's image is left untouched.
    annotated_frame = bounding_box_annotator.annotate(scene=image.copy(), detections=detections)
    annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=detections, labels=text_labels)
    return annotated_frame
def process_video(
    video_path: str,
    progress: gr.Progress = gr.Progress(track_tqdm=True),  # noqa: ARG001, B008
) -> str:
    """Annotate the leading frames of a video and return the output file path.

    At most ``MAX_NUM_FRAMES`` frames are read from ``video_path``; each one is
    passed through ``process_image`` and written to a new temporary ``.mp4``
    file with the same frame size and FPS as reported by the input.

    Args:
        video_path: Path of the input video file.
        progress: Gradio progress tracker; it is not used directly and is only
            declared so that Gradio can mirror the tqdm loop below.

    Returns:
        Path of the temporary ``.mp4`` file holding the annotated frames. The
        file is created with ``delete=False`` and is not removed here.

    Raises:
        gr.Error: If OpenCV cannot open ``video_path``.
    """
    # Import the submodule explicitly; a bare `import tqdm` does not load
    # `tqdm.auto` on its own.
    import tqdm.auto

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise gr.Error("Could not open the input video.")

    writer = None
    try:
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        fps = cap.get(cv2.CAP_PROP_FPS)
        num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Only reserve a unique file name here; the handle is closed before
        # OpenCV opens the same path for writing.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as out_file:
            out_path = out_file.name

        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(out_path, fourcc, fps, (width, height))

        for _ in tqdm.auto.tqdm(range(min(MAX_NUM_FRAMES, num_frames))):
            ok, frame = cap.read()
            if not ok:
                # The container may report more frames than can be decoded.
                break
            # OpenCV frames are BGR; the detector and PIL work in RGB.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            annotated_frame = process_image(PIL.Image.fromarray(rgb_frame))
            writer.write(cv2.cvtColor(np.asarray(annotated_frame), cv2.COLOR_RGB2BGR))
    finally:
        # Release the native handles even when a frame fails mid-way.
        if writer is not None:
            writer.release()
        cap.release()

    return out_path
# Two-tab UI: single-image detection and (truncated) video detection.
with gr.Blocks(css_paths="style.css") as demo:
    gr.Markdown(DESCRIPTION)

    with gr.Tabs():
        # ---- Image tab -----------------------------------------------------
        with gr.Tab("Image"):
            with gr.Row():
                with gr.Column():
                    input_image = gr.Image(label="Input Image", type="pil")
                    run_button_image = gr.Button()
                with gr.Column():
                    output_image = gr.Image(label="Output Image")

            # Clickable sample images taken from the local "images" directory.
            gr.Examples(
                fn=process_image,
                inputs=input_image,
                outputs=[output_image],
                examples=sorted(pathlib.Path("images").glob("*.jpg")),
            )

            run_button_image.click(fn=process_image, inputs=input_image, outputs=[output_image])

        # ---- Video tab -----------------------------------------------------
        with gr.Tab("Video"):
            gr.Markdown(f"The input video will be truncated to {MAX_NUM_FRAMES} frames.")

            with gr.Row():
                with gr.Column():
                    input_video = gr.Video(label="Input Video")
                    run_button_video = gr.Button()
                with gr.Column():
                    output_video = gr.Video(label="Output Video")

            # Sample clips from the local "videos" directory; results are not cached.
            gr.Examples(
                fn=process_video,
                inputs=input_video,
                outputs=output_video,
                examples=sorted(pathlib.Path("videos").glob("*.mp4")),
                cache_examples=False,
            )

            run_button_video.click(fn=process_video, inputs=input_video, outputs=output_video)


if __name__ == "__main__":
    # Cap the request queue at 20 pending jobs, then start the server.
    demo.queue(max_size=20)
    demo.launch()