"""Synchronous and asynchronous helpers for requesting text embeddings.

Each helper POSTs a JSON body of the form ``{"input": ..., "model": ...}``
to an embeddings HTTP endpoint and returns the decoded JSON response.
"""

import asyncio
import json
from typing import List, Optional, Union

import httpx

# Default endpoint and model used when the caller does not override them.
_DEFAULT_URL = "https://sanbo1200-jina-embeddings-v3.hf.space/api/v1/embeddings"
_DEFAULT_MODEL = "jinaai/jina-embeddings-v3"


def _build_request(input: Union[str, List[str]], model: str) -> tuple:
    """Return the ``(headers, payload)`` pair shared by every request."""
    headers = {
        'Content-Type': 'application/json'
    }
    payload = {
        "input": input,
        "model": model
    }
    return headers, payload


# Synchronous version
def embeddings_run_sync(
    input: Union[str, List[str]],
    url: str = _DEFAULT_URL,
    model: str = _DEFAULT_MODEL,
) -> dict:
    """Request embeddings with a blocking HTTP call.

    Args:
        input: A single text or a list of texts, sent as the ``input`` field.
        url: Endpoint that receives the POST request.
        model: Model identifier, sent as the ``model`` field.

    Returns:
        The JSON-decoded response body. Its exact schema is defined by the
        remote service and is not validated here.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        httpx.RequestError: If the request cannot be completed.
    """
    headers, payload = _build_request(input, model)
    with httpx.Client() as client:
        response = client.post(url, headers=headers, json=payload)
        response.raise_for_status()
        return response.json()


async def _post_embeddings(
    client: httpx.AsyncClient, url: str, headers: dict, payload: dict
) -> dict:
    """POST *payload* with *client*, fail on HTTP errors, return decoded JSON."""
    response = await client.post(url, headers=headers, json=payload)
    response.raise_for_status()
    return response.json()


# Asynchronous version
async def embeddings_run_async(
    input: Union[str, List[str]],
    url: str = _DEFAULT_URL,
    model: str = _DEFAULT_MODEL,
    *,
    client: Optional[httpx.AsyncClient] = None,
) -> dict:
    """Request embeddings without blocking the event loop.

    Args:
        input: A single text or a list of texts, sent as the ``input`` field.
        url: Endpoint that receives the POST request.
        model: Model identifier, sent as the ``model`` field.
        client: Optional already-open ``httpx.AsyncClient`` to reuse. When
            omitted, a temporary client is created and closed for this call.
            A client passed in is never closed here; the caller owns it.

    Returns:
        The JSON-decoded response body (schema defined by the remote service).

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        httpx.RequestError: If the request cannot be completed.
    """
    headers, payload = _build_request(input, model)
    if client is not None:
        return await _post_embeddings(client, url, headers, payload)
    async with httpx.AsyncClient() as own_client:
        return await _post_embeddings(own_client, url, headers, payload)


# Asynchronous batch processing
async def batch_process_async(texts: List[str]) -> List[dict]:
    """Request embeddings for every text concurrently, one request per text.

    A single ``httpx.AsyncClient`` is shared by all requests so they reuse
    one connection pool instead of each opening and closing its own client.

    Args:
        texts: Texts to embed; each one is sent in its own request.

    Returns:
        The decoded responses in the same order as *texts*. An empty input
        yields an empty list.

    Raises:
        httpx.HTTPStatusError: If any request returns a 4xx/5xx status.
        httpx.RequestError: If any request cannot be completed.
    """
    async with httpx.AsyncClient() as client:
        tasks = [embeddings_run_async(text, client=client) for text in texts]
        # gather() preserves input order and propagates the first exception.
        return await asyncio.gather(*tasks)


# Usage example
if __name__ == "__main__":
    # 1. Synchronous usage
    input_text = "Your text string goes here"
    result = embeddings_run_sync(input_text)
    print(f"Sync result: {result}")

    # 2. Asynchronous usage
    async def main():
        # Single asynchronous request
        result = await embeddings_run_async(input_text)
        print(f"Async single result: {result}")

        # Batch of asynchronous requests
        texts = [
            "First text to process",
            "Second text to process",
            "Third text to process"
        ]
        results = await batch_process_async(texts)
        for i, result in enumerate(results):
            print(f"Batch result {i+1}: {result}")

    # Run the asynchronous entry point
    asyncio.run(main())