Spaces:
Runtime error
Runtime error
File size: 5,337 Bytes
c6a9f21 bcee635 c6a9f21 fb24ad1 c6a9f21 1654acc c6a9f21 1654acc c6a9f21 bcee635 c6a9f21 bcee635 c6a9f21 bcee635 c6a9f21 bcee635 c6a9f21 bcee635 c6a9f21 bcee635 c6a9f21 bcee635 c6a9f21 fb24ad1 bcee635 fb24ad1 bcee635 fb24ad1 c6a9f21 bcee635 c6a9f21 bcee635 1654acc bcee635 1654acc c6a9f21 bcee635 c6a9f21 bcee635 c6a9f21 bcee635 c6a9f21 7a0e77d bcee635 fb24ad1 bcee635 fb24ad1 bcee635 fb24ad1 bcee635 fb24ad1 bcee635 fb24ad1 bcee635 1654acc bcee635 1654acc bcee635 1654acc bcee635 1654acc bcee635 1654acc bcee635 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
from fastapi.testclient import TestClient
from fastapi.routing import APIRoute
from app.routers.video import updateArtifact, createThumbnail, inferenceFrame
from app.main import app
from app.constants import deviceId
from app import db
import mmcv
import os
import pytest
import requests
import json
import cv2
import shutil
from google.cloud.firestore_v1.base_query import FieldFilter
def endpoints():
endpoints = []
for route in app.routes:
if isinstance(route, APIRoute):
endpoints.append(route.path)
return endpoints
@pytest.fixture
def client():
client = TestClient(app, "http://0.0.0.0:3000")
yield client
@pytest.fixture
def user():
url = (
"https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword?key="
+ os.environ.get("FIREBASE_API_KEY")
)
payload = json.dumps(
{
"email": "[email protected]",
"password": "testing",
"returnSecureToken": True,
}
)
headers = {"Content-Type": "application/json"}
response = requests.request("POST", url, headers=headers, data=payload)
data = response.json()
user = {"id": data["localId"], "token": data["idToken"]}
db.collection("user").document(user["id"]).set({"deviceId": deviceId})
yield user
db.collection("user").document(user["id"]).delete()
class TestVideoAPI:
@pytest.mark.skipif("/video" not in endpoints(), reason="Route not defined")
def test_video_API(self, user, client):
# Test when no token is pass to route
payload = {}
files = [("file", ("demo.mp4", open("demo.mp4", "rb"), "video/mp4"))]
headers = {}
response = client.request(
"POST", "video", headers=headers, data=payload, files=files
)
assert response.status_code == 403
# Test when a dummy (not valid) token passed
payload = {}
files = [("file", ("demo.mp4", open("demo.mp4", "rb"), "video/mp4"))]
headers = {"Authorization": "Bearer saikoljncaskljnfckjnasckjna"}
response = client.request(
"POST", "video", headers=headers, data=payload, files=files
)
assert response.status_code == 401
# Test when sent file is not a video
payload = {}
files = [
("file", ("demo.jpg", open("demo.jpg", "rb"), "application/octet-stream"))
]
headers = {"Authorization": "Bearer " + user["token"]}
while True:
response = client.request(
"POST", "video", headers=headers, data=payload, files=files
)
if response.status_code != 401:
break
assert response.status_code == 400
# Test when all requirements have been fulfilled
payload = {}
files = [("file", ("demo.mp4", open("demo.mp4", "rb"), "video/mp4"))]
headers = {"Authorization": "Bearer " + user["token"]}
response = client.request(
"POST", "video", headers=headers, data=payload, files=files
)
assert response.status_code == 200
artifactName = response.text
docs = (
db.collection("artifacts")
.where(filter=FieldFilter("name", "==", artifactName))
.stream()
)
index = 0
for doc in docs:
# For each document in docs. Verify name and status of the artifact
index += 1
data = doc.get().to_dict()
assert data["name"] == artifactName
assert data["status"] == "pending"
assert index == 1
doc.delete()
def test_update_artifact(self):
# Check and preprocess test data before testing
test_artifact = db.collection("artifacts").document("test")
if not test_artifact.get().exists:
db.collection("artifacts").document("test").set(
{"name": "test", "path": "", "status": "testing", "thumbnailURL": ""}
)
test_artifact = db.collection("artifacts").document("test")
else:
test_artifact.update({"status": "testing", "path": "", "thumbnailURL": ""})
# Testing update on each field
updateArtifact(test_artifact.id, {"status": "test_done"})
assert (
db.collection("artifacts").document("test").get().to_dict()["status"]
== "test_done"
)
# Delete data for next time test
test_artifact.delete()
def test_inference_frame(self):
if not os.path.exists("test_vid"):
os.mkdir("test_vid")
shutil.copyfile("demo.mp4", "test_vid/input.mp4")
thumbnail = inferenceFrame("test_vid")
assert os.path.exists("test_vid/out.mp4") and os.path.isfile("test_vid/out.mp4")
vidcap = cv2.VideoCapture("test_vid/input.mp4")
success, image = vidcap.read()
if success:
assert image.shape == thumbnail.shape
vidcap.release()
del vidcap
shutil.rmtree("test_vid")
def test_create_thumbnal(self):
vidcap = cv2.VideoCapture("demo.mp4")
success, image = vidcap.read()
if success:
createThumbnail(image, "")
result = mmcv.imread("thumbnail.jpg", channel_order="rgb")
assert result.shape == (160, 160, 3)
|