import glob
import math
import os
import random
import re
import shutil
import time
from pathlib import Path
from threading import Thread

import cv2
import numpy as np
import torch
from PIL import Image, ExifTags
from torch.utils.data import Dataset
from tqdm import tqdm
def letterbox_for_img(img, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True):
    # Resize image to a 32-pixel-multiple rectangle https://github.com/ultralytics/yolov3/issues/232
    shape = img.shape[:2]  # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)

    # Scale ratio (new / old)
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    if not scaleup:  # only scale down, do not scale up (for better test mAP)
        r = min(r, 1.0)

    # Compute padding
    ratio = r, r  # width, height ratios
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
    if auto:  # minimum rectangle
        dw, dh = np.mod(dw, 32), np.mod(dh, 32)  # wh padding
    elif scaleFill:  # stretch
        dw, dh = 0.0, 0.0
        new_unpad = (new_shape[1], new_shape[0])
        ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratios

    dw /= 2  # divide padding into 2 sides
    dh /= 2

    if shape[::-1] != new_unpad:  # resize
        img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_AREA)
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add border
    return img, ratio, (dw, dh)
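
# Minimal usage sketch for letterbox_for_img (illustrative only; 'example.jpg' is a
# placeholder path, not part of the original project). With auto=True and a
# 32-multiple target such as 640, both output sides come out as multiples of 32.
def _demo_letterbox_for_img(img_path='example.jpg'):
    img0 = cv2.imread(img_path)  # HxWx3 BGR image
    assert img0 is not None, 'Image Not Found ' + img_path
    # Scale the longer side towards 640, then pad the shorter side up to a 32-multiple
    img, ratio, (dw, dh) = letterbox_for_img(img0, new_shape=640, auto=True)
    print('resize ratio (w, h):', ratio, 'padding per side (dw, dh):', (dw, dh))
    assert img.shape[0] % 32 == 0 and img.shape[1] % 32 == 0  # detector-stride friendly
    return img
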
def clean_str(s):
    # Cleans a string by replacing special characters with underscore _
    return re.sub(pattern="[|@#!¡·$€%&()=?¿^*;:,¨´><+]", repl="_", string=s)
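
# Quick sketch of what clean_str does to a stream name (the RTSP URL is a made-up
# example): only characters matched by the pattern above, here the two ':', become '_'.
def _demo_clean_str():
    assert clean_str('rtsp://192.168.1.2:554/live') == 'rtsp_//192.168.1.2_554/live'
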
img_formats = ['.bmp', '.jpg', '.jpeg', '.png', '.tif', '.tiff', '.dng']
vid_formats = ['.mov', '.avi', '.mp4', '.mpg', '.mpeg', '.m4v', '.wmv', '.mkv']
class LoadImages:  # for inference
    def __init__(self, path, img_size=640):
        p = str(Path(path))  # os-agnostic
        p = os.path.abspath(p)  # absolute path
        if '*' in p:
            files = sorted(glob.glob(p, recursive=True))  # glob
        elif os.path.isdir(p):
            files = sorted(glob.glob(os.path.join(p, '*.*')))  # dir
        elif os.path.isfile(p):
            files = [p]  # files
        else:
            raise Exception('ERROR: %s does not exist' % p)

        images = [x for x in files if os.path.splitext(x)[-1].lower() in img_formats]
        videos = [x for x in files if os.path.splitext(x)[-1].lower() in vid_formats]
        ni, nv = len(images), len(videos)

        self.img_size = img_size
        self.files = images + videos
        self.nf = ni + nv  # number of files
        self.video_flag = [False] * ni + [True] * nv
        self.mode = 'images'
        if any(videos):
            self.new_video(videos[0])  # new video
        else:
            self.cap = None
        assert self.nf > 0, 'No images or videos found in %s. Supported formats are:\nimages: %s\nvideos: %s' % \
                            (p, img_formats, vid_formats)

    def __iter__(self):
        self.count = 0
        return self

    def __next__(self):
        if self.count == self.nf:
            raise StopIteration
        path = self.files[self.count]

        if self.video_flag[self.count]:
            # Read video
            self.mode = 'video'
            ret_val, img0 = self.cap.read()
            if not ret_val:
                self.count += 1
                self.cap.release()
                if self.count == self.nf:  # last video
                    raise StopIteration
                else:
                    path = self.files[self.count]
                    self.new_video(path)
                    ret_val, img0 = self.cap.read()
            h0, w0 = img0.shape[:2]
            self.frame += 1
            print('\n video %g/%g (%g/%g) %s: ' % (self.count + 1, self.nf, self.frame, self.nframes, path), end='')
        else:
            # Read image
            self.count += 1
            img0 = cv2.imread(path, cv2.IMREAD_COLOR | cv2.IMREAD_IGNORE_ORIENTATION)  # BGR
            # img0 = cv2.cvtColor(img0, cv2.COLOR_BGR2RGB)
            assert img0 is not None, 'Image Not Found ' + path
            print('image %g/%g %s: \n' % (self.count, self.nf, path), end='')
            h0, w0 = img0.shape[:2]

        # Padded resize
        img, ratio, pad = letterbox_for_img(img0, new_shape=self.img_size, auto=True)
        h, w = img.shape[:2]
        shapes = (h0, w0), ((h / h0, w / w0), pad)

        # Convert
        # img = img[:, :, ::-1].transpose(2, 0, 1)  # BGR to RGB, to 3x416x416
        img = np.ascontiguousarray(img)
        # cv2.imwrite(path + '.letterbox.jpg', 255 * img.transpose((1, 2, 0))[:, :, ::-1])  # save letterbox image
        return path, img, img0, self.cap, shapes

    def new_video(self, path):
        self.frame = 0
        self.cap = cv2.VideoCapture(path)
        self.nframes = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))

    def __len__(self):
        return self.nf  # number of files
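
# Minimal usage sketch for LoadImages (illustrative; 'inference/images' is a placeholder
# directory, not something this module guarantees to exist). Iterating the dataset yields
# the letterboxed frame alongside the original frame and the shape/pad info needed to map
# detections back to the original resolution.
def _demo_load_images(source='inference/images', img_size=640):
    dataset = LoadImages(source, img_size=img_size)
    for path, img, img0, cap, shapes in dataset:
        # img: letterboxed HxWx3 BGR frame, img0: original frame,
        # shapes: ((h0, w0), ((h / h0, w / w0), pad)) for undoing the letterbox
        print(path, img0.shape, '->', img.shape)
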
class LoadStreams:  # multiple IP or RTSP cameras
    def __init__(self, sources='streams.txt', img_size=640, auto=True):
        self.mode = 'stream'
        self.img_size = img_size

        if os.path.isfile(sources):
            with open(sources, 'r') as f:
                sources = [x.strip() for x in f.read().strip().splitlines() if len(x.strip())]
        else:
            sources = [sources]

        n = len(sources)
        self.imgs, self.fps, self.frames, self.threads = [None] * n, [0] * n, [0] * n, [None] * n
        self.sources = [clean_str(x) for x in sources]  # clean source names for later
        self.auto = auto
        for i, s in enumerate(sources):  # index, source
            # Start thread to read frames from video stream
            print(f'{i + 1}/{n}: {s}... ', end='')
            s = eval(s) if s.isnumeric() else s  # i.e. s = '0' local webcam
            cap = cv2.VideoCapture(s)
            assert cap.isOpened(), f'Failed to open {s}'
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            self.fps[i] = max(cap.get(cv2.CAP_PROP_FPS) % 100, 0) or 30.0  # 30 FPS fallback
            self.frames[i] = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 0) or float('inf')  # infinite stream fallback

            _, self.imgs[i] = cap.read()  # guarantee first frame
            self.threads[i] = Thread(target=self.update, args=(i, cap), daemon=True)
            print(f" success ({self.frames[i]} frames {w}x{h} at {self.fps[i]:.2f} FPS)")
            self.threads[i].start()
        print('')  # newline

        # check for common shapes
        s = np.stack([letterbox_for_img(x, self.img_size, auto=self.auto)[0].shape for x in self.imgs], 0)  # shapes
        self.rect = np.unique(s, axis=0).shape[0] == 1  # rect inference if all shapes equal
        if not self.rect:
            print('WARNING: Different stream shapes detected. For optimal performance supply similarly-shaped streams.')

    def update(self, i, cap):
        # Read stream `i` frames in daemon thread
        n, f, read = 0, self.frames[i], 1  # frame number, frame limit, inference every 'read' frame
        while cap.isOpened() and n < f:
            n += 1
            # _, self.imgs[index] = cap.read()
            cap.grab()
            if n % read == 0:
                success, im = cap.retrieve()
                self.imgs[i] = im if success else self.imgs[i] * 0
            time.sleep(1 / self.fps[i])  # wait time

    def __iter__(self):
        self.count = -1
        return self

    def __next__(self):
        self.count += 1
        if not all(x.is_alive() for x in self.threads) or cv2.waitKey(1) == ord('q'):  # q to quit
            cv2.destroyAllWindows()
            raise StopIteration

        # Letterbox
        img0 = self.imgs.copy()
        h0, w0 = img0[0].shape[:2]
        img, _, pad = letterbox_for_img(img0[0], self.img_size, auto=self.rect and self.auto)

        # Stack
        h, w = img.shape[:2]
        shapes = (h0, w0), ((h / h0, w / w0), pad)

        # Convert
        # img = img[..., ::-1].transpose((0, 3, 1, 2))  # BGR to RGB, BHWC to BCHW
        img = np.ascontiguousarray(img)

        return self.sources, img, img0[0], None, shapes

    def __len__(self):
        return len(self.sources)  # 1E12 frames = 32 streams at 30 FPS for 30 years
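
# Minimal usage sketch for LoadStreams (illustrative; source '0' assumes a local webcam
# is available, which is an assumption rather than something this module checks).
# Each iteration yields the latest letterboxed frame of the first stream; pressing 'q'
# in the preview window stops iteration via the waitKey check in __next__.
def _demo_load_streams(source='0', img_size=640):
    dataset = LoadStreams(source, img_size=img_size)
    for sources, img, img0, _, shapes in dataset:
        print(sources, img0.shape, '->', img.shape)
        cv2.imshow('stream', img)  # preview the letterboxed frame
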