Spaces:
Runtime error
Runtime error
File size: 3,654 Bytes
01061be |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
import streamlit as st
import cv2
import numpy as np
import supervision as sv
from ultralytics import YOLO
import os
from collections import defaultdict
def ensure_dir(file_path):
    """Create the directory ``file_path``, including any missing parents.

    Despite the parameter name, the argument is used as a directory path.

    Args:
        file_path: Directory to create. An empty string is a no-op: that is
            what ``os.path.dirname`` yields for a bare file name, and it
            refers to the current directory, which already exists.

    Raises:
        FileExistsError: If ``file_path`` exists but is not a directory.
    """
    if not file_path:
        return
    # exist_ok=True avoids the check-then-create race of exists() + makedirs().
    os.makedirs(file_path, exist_ok=True)
def is_point_in_polygon(point, polygon):
    """Return True when ``point`` lies strictly inside ``polygon``.

    The point's coordinates are truncated to integers before the test.
    The sign of ``cv2.pointPolygonTest`` (called without distance
    measurement) decides the answer; only a positive result counts, so a
    zero result is treated as "not inside".

    Args:
        point: Indexable pair ``(x, y)``.
        polygon: Contour passed straight through to ``cv2.pointPolygonTest``.
    """
    px = int(point[0])
    py = int(point[1])
    side = cv2.pointPolygonTest(polygon, (px, py), False)
    return side > 0
def process_video(input_video_path, output_video_path, polygons):
    """Annotate a video with tracked detections and restricted-zone intrusions.

    Every frame is passed to ``YOLO('yolov8x-seg.pt').track`` with the
    ``bytetrack.yaml`` tracker. The polygons are outlined in green, and any
    tracked object (other than class id 19) whose box centre lies strictly
    inside a polygon is boxed in red and has its track id recorded. A running
    count of distinct intruder ids is drawn on each frame, and the annotated
    frames are written to ``output_video_path``.

    Args:
        input_video_path: Path of the video to read.
        output_video_path: Path of the annotated video to write. Its parent
            directory is created if missing.
        polygons: Iterable of polygons, each a sequence of ``[x, y]`` pixel
            vertices.

    Raises:
        RuntimeError: If the input video cannot be opened or the output
            writer cannot be created.
    """
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        cap.release()
        raise RuntimeError(f"Could not open input video: {input_video_path}")

    out = None
    try:
        model = YOLO('yolov8x-seg.pt')
        ensure_dir(os.path.dirname(output_video_path))

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Keep the source frame rate so playback speed is preserved; fall
        # back to 20 fps when the container reports 0, a negative or NaN.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 20.0

        fourcc = cv2.VideoWriter_fourcc(*'avc1')
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height), True)
        if not out.isOpened():
            raise RuntimeError(f"Could not open output video for writing: {output_video_path}")

        poly_arrays = [np.array(polygon, np.int32) for polygon in polygons]
        intruder_ids = set()
        # NOTE(review): 19 looks like 'cow' in the COCO label set of the stock
        # YOLOv8 weights -- confirm this is the class meant to be exempt.
        ignored_class_id = 19

        while True:
            success, frame = cap.read()
            if not success:
                break

            # NOTE(review): save=True presumably makes ultralytics persist its
            # own copy of every prediction to disk; confirm that is wanted.
            results = model.track(frame, conf=0.6, classes=None, persist=True, save=True, tracker="bytetrack.yaml")
            result = results[0]
            annotated_frame = result.plot() if hasattr(result, 'plot') else frame

            for poly_array in poly_arrays:
                cv2.polylines(annotated_frame, [poly_array], True, (0, 255, 0), 3)

            detections = result.boxes
            # boxes.id is None on frames where the tracker holds no tracks;
            # calling .int() on it would raise AttributeError.
            if detections is not None and detections.id is not None:
                boxes = detections.xywh.cpu().tolist()
                track_ids = detections.id.int().cpu().tolist()
                classes = detections.cls.int().cpu().tolist()

                for box, track_id, cls in zip(boxes, track_ids, classes):
                    if cls == ignored_class_id:
                        continue
                    # xywh is centre-based: (cx, cy) already is the centroid.
                    cx, cy, w, h = box
                    centroid = (cx, cy)
                    if not any(is_point_in_polygon(centroid, poly) for poly in poly_arrays):
                        continue

                    intruder_ids.add(track_id)
                    left, top = int(cx - w / 2), int(cy - h / 2)
                    right, bottom = int(cx + w / 2), int(cy + h / 2)
                    cv2.rectangle(annotated_frame, (left, top), (right, bottom), (0, 0, 255), 2)
                    cv2.putText(annotated_frame, f"Intruder ID: {track_id}", (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

            intrusion_text = f"Total Intruders Detected: {len(intruder_ids)}"
            cv2.putText(annotated_frame, intrusion_text, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
            out.write(annotated_frame)
    finally:
        # Release handles even if inference or encoding raised midway.
        cap.release()
        if out is not None:
            out.release()
def main():
    """Streamlit page: upload a video, process it, then preview and download.

    The upload is written to ``output_videos/uploaded_video.mp4``. When the
    "Process Video" button is pressed, ``process_video`` is run against a
    single hard-coded polygon and the result is shown and offered for
    download.
    """
    st.title("Video Processing for Intrusion Detection")
    uploaded = st.file_uploader("Upload a video", type=['mp4', 'avi'])
    if uploaded is None:
        return

    out_dir = "output_videos"
    ensure_dir(out_dir)
    source_path = os.path.join(out_dir, "uploaded_video.mp4")
    result_path = os.path.join(out_dir, "processed_video.mp4")

    # Persist the upload so it can be read back by file path.
    with open(source_path, "wb") as sink:
        sink.write(uploaded.getbuffer())

    # Single restricted zone, given as pixel vertices.
    zone = np.array([
        [110, 70], [1754, 62], [1754, 1062], [138, 1066], [110, 70]
    ])
    polygons = [zone]

    if not st.button("Process Video"):
        return

    process_video(source_path, result_path, polygons)
    st.video(result_path)
    with open(result_path, "rb") as processed:
        st.download_button(
            label="Download processed video",
            data=processed,
            file_name="processed_video.mp4",
            mime="video/mp4"
        )
if __name__ == "__main__":
    # Build the Streamlit page when this file is run as a script.
    main()