Spaces:
Runtime error
Runtime error
import streamlit as st | |
import cv2 | |
import numpy as np | |
import supervision as sv | |
from ultralytics import YOLO | |
import os | |
from collections import defaultdict | |
def ensure_dir(file_path): | |
if not os.path.exists(file_path): | |
os.makedirs(file_path) | |
def is_point_in_polygon(point, polygon): | |
return cv2.pointPolygonTest(polygon, (int(point[0]), int(point[1])), False) > 0 | |
def process_video(input_video_path, output_video_path, polygons): | |
model = YOLO('yolov8x-seg.pt') | |
cap = cv2.VideoCapture(input_video_path) | |
intruder_ids = set() | |
ensure_dir(os.path.dirname(output_video_path)) | |
if cap.isOpened(): | |
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
fourcc = cv2.VideoWriter_fourcc(*'avc1') | |
out = cv2.VideoWriter(output_video_path, fourcc, 20.0, (width, height), True) | |
poly_arrays = [np.array(polygon, np.int32) for polygon in polygons] | |
while cap.isOpened(): | |
success, frame = cap.read() | |
if not success: | |
break | |
results = model.track(frame, conf=0.6, classes=None, persist=True, save=True, tracker="bytetrack.yaml") | |
boxes = results[0].boxes.xywh.cpu() | |
track_ids = results[0].boxes.id.int().cpu().tolist() | |
classes = results[0].boxes.cls.int().cpu().numpy() | |
annotated_frame = results[0].plot() if hasattr(results[0], 'plot') else frame | |
for poly_array in poly_arrays: | |
cv2.polylines(annotated_frame, [poly_array], True, (0, 255, 0), 3) | |
for box, track_id, cls in zip(boxes, track_ids, classes): | |
x, y, w, h = box | |
centroid = (x + w / 2, y + h / 2) | |
if cls != 19: | |
for poly_array in poly_arrays: | |
if is_point_in_polygon(centroid, poly_array): | |
if track_id not in intruder_ids: | |
intruder_ids.add(track_id) | |
cv2.rectangle(annotated_frame, (int(x), int(y)), (int(x + w), int(y + h)), (0, 0, 255), 2) | |
cv2.putText(annotated_frame, f"Intruder ID: {track_id}", (int(x), int(y) - 10), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) | |
break | |
intrusion_text = f"Total Intruders Detected: {len(intruder_ids)}" | |
cv2.putText(annotated_frame, intrusion_text, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2) | |
out.write(annotated_frame) | |
cap.release() | |
out.release() | |
def main(): | |
st.title("Video Processing for Intrusion Detection") | |
video_file = st.file_uploader("Upload a video", type=['mp4', 'avi']) | |
if video_file is not None: | |
output_folder = "output_videos" | |
ensure_dir(output_folder) | |
file_path = os.path.join(output_folder, "uploaded_video.mp4") | |
file_name = os.path.join(output_folder, "processed_video.mp4") | |
with open(file_path, "wb") as f: | |
f.write(video_file.getbuffer()) | |
polygons = [ | |
np.array([ | |
[110, 70],[1754, 62],[1754, 1062],[138, 1066],[110, 70] | |
]) | |
] | |
if st.button("Process Video"): | |
process_video(file_path, file_name, polygons) | |
st.video(file_name) | |
with open(file_name, "rb") as file: | |
st.download_button( | |
label="Download processed video", | |
data=file, | |
file_name="processed_video.mp4", | |
mime="video/mp4" | |
) | |
if __name__ == "__main__": | |
main() |