# Spaces: Sleeping  (page-status residue captured together with the source; not part of the program)
from asyncio import create_subprocess_shell, gather, sleep | |
from logging import ERROR, INFO, basicConfig, getLogger | |
from pathlib import Path | |
from random import choice | |
from shutil import rmtree | |
from subprocess import CalledProcessError, PIPE | |
from typing import Any, List | |
from uuid import uuid4 | |
from PIL import Image | |
from fastapi import FastAPI, HTTPException | |
from fastapi.responses import PlainTextResponse | |
from httpx import AsyncClient, HTTPStatusError, RequestError | |
from pydantic import BaseModel, HttpUrl | |
from uvicorn import run as uvicorn_run | |
# Flip to True to get INFO-level progress messages; only errors are shown otherwise.
need_logging = False
basicConfig(level=INFO if need_logging else ERROR)
logger = getLogger(__name__)

# The oxipng executable is expected to sit next to this module.
oxipng_bin = Path(__file__).parent / 'oxipng'
# Grant execute permission when no execute bit (user/group/other) is set yet.
if (oxipng_bin.stat().st_mode & 0o111) == 0:
    oxipng_bin.chmod(0o755)
# imgbb API keys; one of them is picked at random for every upload.
# NOTE(review): credentials are committed in source — consider loading them
# from the environment instead.
tokens = [
    # (original author's note) I don't really care about these tokens
    '7e0ea3da6a73d77003c1abba7f0ea13c',
    'bc2e68b5918e5bb59ebca6c05d73daf9',
    'fecbfbe0938bcd1df27b7a9be1702cc9',
    '04e9981d4d0981964cb4c9753173244d',
    'dee75b07981c7aa211628ea7c7cbc03d',
]
async def download_png(url: str, folder: str, client: AsyncClient, retries: int = 5) -> Path:
    """Download ``url`` and store the response body as a ``.png`` file.

    The file is written to ``<module dir>/<folder>/<uuid4>.png``; the folder is
    created on demand.  The body is saved as received - its content is not
    inspected.  HTTP status errors and transport errors are retried with
    exponential back-off (1 s, 2 s, 4 s, ...).

    Args:
        url: Address of the image to fetch.
        folder: Directory name, relative to this module's directory.
        client: ``AsyncClient`` used to perform the request.
        retries: Total number of attempts.  Values below 1 are treated as 1 so
            the function can never fall through and return ``None``.

    Returns:
        Path of the saved file.

    Raises:
        HTTPStatusError, RequestError: When the final attempt fails.
    """
    logger.info(f'загрузка изображения: {url}')
    attempts = max(1, retries)
    for attempt in range(attempts):
        try:
            response = await client.get(url, timeout=30.0)
            response.raise_for_status()
        except (HTTPStatusError, RequestError):
            if attempt == attempts - 1:
                # Out of attempts: re-raise with the original traceback intact.
                raise
            await sleep(2 ** attempt)
            continue
        file_path = Path(__file__).parent / folder / f'{uuid4()}.png'
        file_path.parent.mkdir(parents=True, exist_ok=True)
        file_path.write_bytes(response.content)
        return file_path
    # Every iteration either returns, re-raises or continues to a later one.
    raise RuntimeError('unreachable: download loop finished without a result')
async def download_pngs(urls: str | list[str]) -> list[Path]:
    """Download several images concurrently into one fresh directory.

    All files of a batch share a single uuid-named directory, so a caller that
    removes the parent directory of any returned path cleans up the whole
    batch (previously each file had its own directory and all but one leaked).

    Args:
        urls: One URL or a list of URLs.  Empty/blank entries and entries
            containing a newline (real or the two-character ``\\n`` escape)
            are dropped.

    Returns:
        Paths of the downloaded files, in the order of the accepted URLs.

    Raises:
        HTTPStatusError, RequestError: Propagated from ``download_png`` when a
            download ultimately fails.
    """
    urls = [urls] if isinstance(urls, str) else urls
    logger.info(f'скачивается список список из {len(urls)}: {urls}')
    # The coze bot has a bug: with chat context enabled, the workflow appends
    # the previous answer to the links, which shows up as newlines in a "URL".
    valid_urls = [url for url in urls if url and url.strip() and '\n' not in url and '\\n' not in url]
    if len(valid_urls) != len(urls):
        logger.warning(f'некорректные ссылки удалены из списка: {set(urls) - set(valid_urls)}')
    if not valid_urls:
        # Nothing to fetch - skip creating an HTTP client.
        return []
    folder = str(uuid4())
    async with AsyncClient() as client:
        tasks = [download_png(url, folder, client) for url in valid_urls]
        return list(await gather(*tasks))
async def optimize_png(image_path: Path, retries: int = 3) -> None:
    """Optimise a PNG in place by running the ``oxipng`` binary.

    The binary is invoked with an argument list (no shell), so paths that
    contain spaces or shell metacharacters are passed through verbatim.

    Args:
        image_path: File to optimise; it is also used as the output path.
        retries: Number of attempts.  Failed attempts are followed by an
            exponential back-off (1 s, 2 s, ...).

    Raises:
        CalledProcessError: When the last attempt exits with a non-zero code;
            carries the captured stdout/stderr.
    """
    from asyncio import create_subprocess_exec

    command = [
        str(oxipng_bin.resolve()),
        '--opt', '2',
        '--strip', 'safe',
        '--out', str(image_path),
        str(image_path),
    ]
    logger.info(f'оптимизация картинки {image_path}')
    for attempt in range(retries):
        process = await create_subprocess_exec(*command, stdout=PIPE, stderr=PIPE)
        stdout, stderr = await process.communicate()
        if process.returncode == 0:
            return
        logger.error(f'ошибка при оптимизации {image_path}')
        if attempt == retries - 1:
            raise CalledProcessError(process.returncode, command, output=stdout, stderr=stderr)
        await sleep(2 ** attempt)
async def convert_to_jpeg(image_path: Path) -> Path:
    """Convert an image to JPEG next to the source file and delete the source.

    Best effort: on any conversion error the problem is logged and the
    original path is returned unchanged.

    Args:
        image_path: Image to convert.

    Returns:
        Path of the ``.jpg`` file on success, otherwise ``image_path``.
    """
    logger.info(f'конвертируется {image_path}')
    output_path = image_path.with_suffix('.jpg')
    # If the source already ends in ".jpg" both paths are the same file and
    # must never be unlinked.
    distinct = output_path != image_path
    try:
        # The context manager closes the underlying file handle.
        with Image.open(image_path) as image:
            # JPEG has no alpha channel or palette; such images must be
            # converted first or the save call raises.
            frame = image if image.mode in ('RGB', 'L', 'CMYK') else image.convert('RGB')
            frame.save(output_path, 'JPEG', quality=98, optimize=True)
    except Exception:
        logger.exception(f'ошибка при конвертации {image_path}')
        if distinct:
            # Do not leave a partially written JPEG behind.
            output_path.unlink(missing_ok=True)
        return image_path
    if distinct:
        image_path.unlink(missing_ok=True)
    return output_path
async def convert_to_jpegs(image_paths: list[str | Path] | str | Path) -> list[Path]:
    """Convert one or many images to JPEG concurrently.

    Args:
        image_paths: A single path (``str`` or ``Path``) or a sequence of
            paths.  Any non-string sequence (list, tuple, ...) is accepted.

    Returns:
        The resulting paths in input order, as produced by
        ``convert_to_jpeg`` for each item.
    """
    if isinstance(image_paths, (str, Path)):
        image_paths = [image_paths]
    image_paths = [Path(image_file) for image_file in image_paths]
    # The message used to say "optimising" (copy-paste from optimize_pngs).
    logger.info(f'конвертируется список из {len(image_paths)}: {image_paths}')
    tasks = [convert_to_jpeg(image_path) for image_path in image_paths]
    return list(await gather(*tasks))
async def optimize_pngs(image_paths: list[str | Path] | str | Path) -> None:
    """Run ``optimize_png`` concurrently for one path or a list of paths.

    A value that is not a ``list`` is treated as a single path.
    """
    if not isinstance(image_paths, list):
        image_paths = [image_paths]
    targets = [Path(item) for item in image_paths]
    logger.info(f'оптимизируется список список из {len(targets)}: {targets}')
    await gather(*map(optimize_png, targets))
async def upload_image_to_imgbb(file_path: Path) -> str | None:
    """Upload a file to imgbb and return the URL reported by the service.

    A random key from ``tokens`` is used for each call.  Failures are logged
    and reported as ``None``; task cancellation is *not* swallowed.

    Args:
        file_path: Image file to upload.

    Returns:
        The ``data.url`` field of the response when the service reports
        ``success``, otherwise ``None``.
    """
    from mimetypes import guess_type

    url = f'https://api.imgbb.com/1/upload?key={choice(tokens)}'
    # Converted files are JPEGs, so derive the MIME type from the file name
    # and fall back to the previous fixed value.
    mime_type = guess_type(file_path.name)[0] or 'image/png'
    try:
        with file_path.open('rb') as file:
            files = {'image': (file_path.name, file, mime_type)}
            async with AsyncClient() as client:
                response = await client.post(url, files=files, timeout=30)
        response.raise_for_status()
        payload = response.json()
        if payload.get('success'):
            return payload['data']['url']
    except Exception:
        # ``Exception`` (not a bare except) lets CancelledError and
        # KeyboardInterrupt propagate.
        logger.exception(f'не удалось загрузить файл {file_path} на imgbb')
    return None
async def upload_image(file_path: Path | str) -> str | None:
    """Upload ``file_path`` via ``upload_image_to_imgbb`` after coercing it to ``Path``."""
    return await upload_image_to_imgbb(Path(file_path))
async def optimize_and_upload(images_urls: list[str] | str, convert: bool = False) -> list[str]:
    """Download images, optimise or convert them, upload them, and clean up.

    Args:
        images_urls: One URL or a list of URLs to process.
        convert: When ``True`` the files are converted to JPEG; otherwise the
            PNGs are optimised in place.

    Returns:
        URLs of the successfully uploaded images.  Images whose upload failed
        are skipped.

    Raises:
        Whatever ``download_pngs``, ``optimize_pngs`` or ``convert_to_jpegs``
        raise; the temporary directories are removed in that case as well.
    """
    images_urls = [images_urls] if isinstance(images_urls, str) else images_urls
    logger.info(f'принятые ссылки в обработку ({len(images_urls)}): {images_urls}')
    images_paths = await download_pngs(images_urls)
    # Collect every download directory up front: this is safe for an empty
    # result (no ``[0]`` indexing) and covers files spread over several dirs.
    work_dirs = {image_path.parent for image_path in images_paths}
    new_images_urls = []
    try:
        if convert:
            images_paths = await convert_to_jpegs(images_paths)
        else:
            await optimize_pngs(images_paths)
        for image_path in images_paths:
            new_url = await upload_image(image_path)
            if new_url:
                new_images_urls.append(new_url)
                logger.info(f'загружено изображение {image_path} в {new_url}')
            try:
                image_path.unlink()
            except Exception as e:
                logger.error(f'не удалось удалить файл {image_path}: {e}')
        logger.info(f'новые ссылки: ({len(new_images_urls)}): {new_images_urls}')
    finally:
        # Runs on failure too, so temporary directories never accumulate.
        for work_dir in work_dirs:
            try:
                rmtree(work_dir)
            except Exception as e:
                logger.error(f'не удалось удалить файл {work_dir}: {e}')
    return new_images_urls
# ASGI application object; it is the app handed to uvicorn in the __main__ guard.
app = FastAPI()
class ImageURLs(BaseModel):
    """Request body holding the image URLs to process."""

    # Every entry is declared as ``HttpUrl`` and validated by pydantic.
    urls: list[HttpUrl]
async def read_root():
    """Return a fixed plain-text body with HTTP status 200.

    NOTE(review): no route decorator is visible on this function in this view
    of the file - confirm how it is registered on ``app``.
    """
    body = 'ну пролапс, ну и что'
    return PlainTextResponse(body, status_code=200)
async def optimize_images_endpoint(image_urls: ImageURLs):
    """Process the submitted URLs and return the URLs of the uploaded results.

    NOTE(review): a second function with this exact name and body follows
    later in the file and no route decorators are visible - presumably each
    was bound to its own route; confirm against the deployed source.

    Args:
        image_urls: Validated request body.

    Returns:
        ``{"optimized_urls": [...]}``.

    Raises:
        HTTPException: Status 500 with the error text when processing fails.
    """
    try:
        optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls])
    except Exception as e:
        # Keep the traceback in the log; the client only receives the message.
        logger.exception('ошибка при обработке изображений')
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"optimized_urls": optimized_urls}
async def optimize_images_endpoint(image_urls: ImageURLs):
    """Process the submitted URLs and return the URLs of the uploaded results.

    NOTE(review): this redefines a function of the same name declared earlier
    in the file, so the module-level name refers to this one.  No route
    decorators are visible - presumably each definition was bound to its own
    route; confirm against the deployed source.

    Args:
        image_urls: Validated request body.

    Returns:
        ``{"optimized_urls": [...]}``.

    Raises:
        HTTPException: Status 500 with the error text when processing fails.
    """
    try:
        optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls])
    except Exception as e:
        # Keep the traceback in the log; the client only receives the message.
        logger.exception('ошибка при обработке изображений')
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"optimized_urls": optimized_urls}
if __name__ == "__main__":
    # Serve on all interfaces, port 7860, with a 90 s keep-alive timeout.
    uvicorn_run(
        app,
        host='0.0.0.0',
        port=7860,
        timeout_keep_alive=90,
    )