Spaces:
Running
Running
import httpx | |
import asyncio | |
import json | |
from typing import List, Union | |
# 同步版本 | |
def embeddings_run_sync(input: Union[str, List[str]], | |
url: str = "https://sanbo1200-jina-embeddings-v3.hf.space/api/v1/embeddings", | |
model: str = "jinaai/jina-embeddings-v3") -> dict: | |
headers = { | |
'Content-Type': 'application/json' | |
} | |
data = { | |
"input": input, | |
"model": model | |
} | |
with httpx.Client() as client: | |
response = client.post(url, headers=headers, json=data) | |
response.raise_for_status() | |
return response.json() | |
# 异步版本 | |
async def embeddings_run_async(input: Union[str, List[str]], | |
url: str = "https://sanbo1200-jina-embeddings-v3.hf.space/api/v1/embeddings", | |
model: str = "jinaai/jina-embeddings-v3") -> dict: | |
headers = { | |
'Content-Type': 'application/json' | |
} | |
data = { | |
"input": input, | |
"model": model | |
} | |
async with httpx.AsyncClient() as client: | |
response = await client.post(url, headers=headers, json=data) | |
response.raise_for_status() | |
return response.json() | |
# 批量处理的异步函数 | |
async def batch_process_async(texts: List[str]) -> List[dict]: | |
async with httpx.AsyncClient() as client: | |
tasks = [] | |
for text in texts: | |
task = embeddings_run_async(text) | |
tasks.append(task) | |
return await asyncio.gather(*tasks) | |
# 使用示例 | |
if __name__ == "__main__": | |
# 1. 同步方式使用 | |
input_text = "Your text string goes here" | |
result = embeddings_run_sync(input_text) | |
print(f"Sync result: {result}") | |
# 2. 异步方式使用 | |
async def main(): | |
# 单个异步请求 | |
result = await embeddings_run_async(input_text) | |
print(f"Async single result: {result}") | |
# 批量异步请求 | |
texts = [ | |
"First text to process", | |
"Second text to process", | |
"Third text to process" | |
] | |
results = await batch_process_async(texts) | |
for i, result in enumerate(results): | |
print(f"Batch result {i+1}: {result}") | |
# 运行异步函数 | |
asyncio.run(main()) |