Spaces:
Sleeping
Sleeping
File size: 5,452 Bytes
4597eac 41eb20c 4597eac 41eb20c 4597eac 41eb20c 4597eac 41eb20c 4597eac |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
from asyncio import create_subprocess_shell, gather, sleep
from pathlib import Path
from random import choice
from subprocess import CalledProcessError, PIPE
from typing import Any, List
from uuid import uuid4
from fastapi import FastAPI, HTTPException
from fastapi.responses import PlainTextResponse
from httpx import AsyncClient, HTTPStatusError, RequestError
from pydantic import BaseModel, HttpUrl
from uvicorn import run as uvicorn_run
oxipng_bin = Path(__file__).parent / 'oxipng'
oxipng_bin.chmod(0o755)
tokens = [
# мне в общем-то все равно на эти токены
'7e0ea3da6a73d77003c1abba7f0ea13c',
'bc2e68b5918e5bb59ebca6c05d73daf9',
'fecbfbe0938bcd1df27b7a9be1702cc9',
'04e9981d4d0981964cb4c9753173244d',
'dee75b07981c7aa211628ea7c7cbc03d',
]
async def download_png(url: str, client: AsyncClient, retries: int = 5) -> Path:
for attempt in range(retries):
try:
response = await client.get(url, timeout=30.0)
response.raise_for_status()
file_path = Path(f'{uuid4()}.png')
file_path.write_bytes(response.content)
return file_path
except (HTTPStatusError, RequestError) as e:
if attempt < retries - 1:
await sleep(2 ** attempt)
else:
raise e
async def download_pngs(urls: str | list[str]) -> list[Any]:
urls = [urls] if isinstance(urls, str) else urls
async with AsyncClient() as client:
tasks = [download_png(url, client) for url in urls]
return list(await gather(*tasks))
async def optimize_png(image_path: Path, retries: int = 3) -> None:
command = f'{oxipng_bin.resolve()} --opt 2 --strip safe --out {image_path} {image_path}'
for attempt in range(retries):
try:
process = await create_subprocess_shell(command, stdout=PIPE, stderr=PIPE)
stdout, stderr = await process.communicate()
if process.returncode == 0:
return
else:
raise CalledProcessError(process.returncode, command, output=stdout, stderr=stderr)
except CalledProcessError as e:
if attempt < retries - 1:
await sleep(2 ** attempt)
else:
raise e
async def optimize_pngs(image_paths: list[str | Path] | str | Path) -> None:
image_paths = [Path(image_file) for image_file in ([image_paths] if not isinstance(image_paths, list) else image_paths)]
tasks = [optimize_png(image_path) for image_path in image_paths]
await gather(*tasks)
async def telegraph_upload_png(file_path: str | Path) -> str | None:
file_path = Path(file_path)
if not file_path.is_file() or file_path.stat().st_size > 5 * 1024 * 1024:
return None
url = 'https://telegra.ph/upload'
headers = {
'authority': url.rsplit('/')[2],
'accept': 'application/json, text/javascript, */*; q=0.01',
'origin': url.rsplit('/', 1)[0],
'referer': url.rsplit('/', 1)[0],
'x-requested-with': 'XMLHttpRequest',
}
async with AsyncClient() as client:
try:
response = await client.post(url, headers=headers, files={'file': ('blob', file_path.read_bytes(), 'image/png')})
response.raise_for_status()
result = response.json()
except:
return None
if response.is_success and 'error' not in result:
link = result[0]['src']
return url.rsplit('/', 1)[0] + link
else:
return None
async def upload_image_to_imgbb(file_path: Path) -> str | None:
url = f'https://api.imgbb.com/1/upload?key={choice(tokens)}'
try:
with file_path.open('rb') as file:
files = {'image': (file_path.name, file, 'image/png')}
data = {}
async with AsyncClient() as client:
response = await client.post(url, files=files, data=data, timeout=30)
response.raise_for_status()
json = response.json()
if json.get('success'):
return json['data']['url']
except:
return None
async def upload_image(file_path: Path | str) -> str | None:
file_path = Path(file_path)
return await telegraph_upload_png(file_path) or await upload_image_to_imgbb(file_path)
async def optimize_and_upload(images_urls: list[str] | str) -> list[str]:
images_urls = [images_urls] if isinstance(images_urls, str) else images_urls
images_paths = await download_pngs(images_urls)
await optimize_pngs(images_paths)
new_images_urls = []
for image_path in images_paths:
new_url = await upload_image(image_path)
if new_url:
new_images_urls.append(new_url)
image_path.unlink(missing_ok=True)
return new_images_urls
app = FastAPI()
class ImageURLs(BaseModel):
urls: List[HttpUrl]
@app.get('/')
async def read_root():
return PlainTextResponse('ну пролапс, ну и что', status_code=200)
@app.post('/pngopt_by_urls/')
async def optimize_images_endpoint(image_urls: ImageURLs):
try:
optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls])
return {"optimized_urls": optimized_urls}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
uvicorn_run(app, host='0.0.0.0', port=7860, timeout_keep_alive=90)
|