r1 / app.py
2ch's picture
Update app.py
a2beacf verified
from datetime import datetime
from json import dumps, loads
from re import sub
from typing import AsyncGenerator
from urllib.parse import urlparse, urlunparse
from cchardet import detect
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from ftfy import fix_text
from httpx import AsyncClient, Limits, Timeout
app = FastAPI(title='PROXI-API')
MODELS = [
'deepseek-ai/DeepSeek-R1',
'deepseek-ai/DeepSeek-V3',
'deepseek-ai/deepseek-llm-67b-chat',
'databricks/dbrx-instruct',
'Qwen/QwQ-32B-Preview',
'NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO'
]
MODELS_OBJ = {
'object': 'list',
'data': [
{
'id': model,
'object': 'model',
'created': int(datetime.now().timestamp()),
'owned_by': 'blackbox.ai'
}
for model in MODELS
]
}
BLOCKED_HEADERS = [
'x-forwarded-for',
'x-real-ip',
'proxy-authorization'
]
HTTPX_CLIENT_KWARGS = dict(
timeout=Timeout(
connect=15,
read=60,
write=30,
pool=30
),
limits=Limits(
max_keepalive_connections=10,
max_connections=100
),
follow_redirects=True,
http2=True,
verify=False
)
def normalize_headers(request: Request, original_url: str) -> dict:
headers = {
key: value for (key, value) in request.headers.items()
if key.lower() not in BLOCKED_HEADERS and key.lower() != 'host'
}
parsed_url = urlparse(original_url)
origin = urlunparse((parsed_url.scheme, parsed_url.netloc, '', '', '', ''))
normalized = {key.lower(): value for key, value in headers.items()}
normalized['referer'] = original_url
normalized['origin'] = origin
normalized['accept-encoding'] = 'deflate'
return normalized
def decode(chunk: bytes) -> str:
try:
return fix_text(chunk.decode('utf-8'))
except UnicodeDecodeError:
pass
detected = detect(chunk)
if detected['encoding'] and detected['confidence'] > 0.75:
try:
return fix_text(chunk.decode(detected['encoding'].lower()))
except (UnicodeDecodeError, LookupError):
pass
for encoding in [
'cp1251', 'cp1252', 'iso-8859-5', 'iso-8859-1', 'windows-1252',
'gb18030', 'big5', 'gb2312', 'shift_jis', 'euc-kr', 'cp1256',
'iso-8859-7', 'koi8-r', 'cp866', 'mac_cyrillic'
]:
try:
return fix_text(chunk.decode(encoding))
except UnicodeDecodeError:
continue
return fix_text(chunk.decode('latin-1', errors='replace'))
def format_chunk(chunk: bytes, model: str) -> str:
data = {
'id': 'chatcmpl-AQ8Lzxlg8eSCB1lgVmboiXwZiexqE',
'object': 'chat.completion.chunk',
'created': int(datetime.now().timestamp()),
'model': model,
'system_fingerprint': 'fp_67802d9a6d',
'choices': [
{
'index': 0,
'delta': {
'content': decode(chunk)
},
'finish_reason': None
}
]
}
json_data = dumps(data, ensure_ascii=False)
str_data = f'data: {sub(r'\\\\([ntr])', r'\\\1', json_data)}\n\n'
return str_data
async def generate(request: Request, url: str, headers: dict, body: bytes) -> AsyncGenerator:
body_str = body.decode('utf-8')
body_obj: dict = loads(body_str)
model = body_obj.get('model')
if 'max_tokens' not in body_obj:
body_obj['max_tokens'] = '32000'
body = dumps(body_obj, ensure_ascii=False).encode()
headers = dict(headers)
headers['content-length'] = str(len(body))
headers['content-type'] = 'application/json'
async with AsyncClient(**HTTPX_CLIENT_KWARGS) as stream_client:
async with stream_client.stream(method=request.method, url=url, headers=headers, content=body, cookies=request.cookies) as stream:
async for chunk in stream.aiter_raw():
yield format_chunk(chunk, model)
yield 'data: [DONE]\n\n'.encode()
@app.post('/api/chat/completions')
@app.post('/api/v1/chat/completions')
async def proxy(request: Request):
try:
url = 'https://api.blackbox.ai/api/chat'
headers = normalize_headers(request, url)
body = await request.body()
async with AsyncClient(**HTTPX_CLIENT_KWARGS) as headers_client:
async with headers_client.stream(method=request.method, url=url, headers=headers, content=body, cookies=request.cookies) as response:
excluded_headers = ['content-encoding', 'content-length', 'transfer-encoding', 'connection']
filtered_headers = {
name: value for (name, value) in response.headers.items()
if name.lower() not in excluded_headers
}
return StreamingResponse(generate(request, url, headers, body), status_code=response.status_code, headers=filtered_headers)
except Exception as exc:
return JSONResponse({'error': response.status_code, 'body': f'{exc}'}, status_code=500)
@app.get('/')
@app.get('/api')
@app.get('/api/v1')
async def root():
script = ('<html><body><script>'
'document.body.textContent = window.location.origin + "/api/v1";'
'</script><body></html>')
return HTMLResponse(script, status_code=200)
@app.get('/api/models')
@app.get('/api/v1/models')
async def models():
return JSONResponse(MODELS_OBJ, status_code=200, media_type='application/json')
if __name__ == '__main__':
import uvicorn
port = 7860
uvicorn.run(app, host='0.0.0.0', port=port)