jina-embeddings-v3 / demo /httpx_demo.py
sanbo
update sth. at 2025-01-16 23:44:43
e397647
raw
history blame
2.29 kB
import httpx
import asyncio
import json
from typing import List, Union
# Synchronous version
def embeddings_run_sync(
    input: Union[str, List[str]],
    url: str = "https://sanbo1200-jina-embeddings-v3.hf.space/api/v1/embeddings",
    model: str = "jinaai/jina-embeddings-v3",
) -> dict:
    """Request embeddings with a blocking HTTP POST.

    Args:
        input: A single text or a list of texts, sent as the ``"input"``
            field of the JSON body.
        url: Endpoint that receives the POST request.
        model: Value sent as the ``"model"`` field of the JSON body.

    Returns:
        The response body decoded from JSON.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status
            (raised by ``raise_for_status``).
    """
    payload = {"input": input, "model": model}
    request_headers = {'Content-Type': 'application/json'}

    # The client only lives for this one request and is closed on exit.
    with httpx.Client() as session:
        reply = session.post(url, headers=request_headers, json=payload)
        reply.raise_for_status()
        body = reply.json()
    return body
# Asynchronous version
async def embeddings_run_async(
    input: Union[str, List[str]],
    url: str = "https://sanbo1200-jina-embeddings-v3.hf.space/api/v1/embeddings",
    model: str = "jinaai/jina-embeddings-v3",
) -> dict:
    """Request embeddings with a non-blocking HTTP POST.

    Args:
        input: A single text or a list of texts, sent as the ``"input"``
            field of the JSON body.
        url: Endpoint that receives the POST request.
        model: Value sent as the ``"model"`` field of the JSON body.

    Returns:
        The response body decoded from JSON.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status
            (raised by ``raise_for_status``).
    """
    request_headers = {'Content-Type': 'application/json'}
    payload = {"input": input, "model": model}

    # A fresh async client is opened per call and closed when the block exits.
    async with httpx.AsyncClient() as session:
        reply = await session.post(url, headers=request_headers, json=payload)
        reply.raise_for_status()
        body = reply.json()
    return body
# Asynchronous function for batch processing
async def batch_process_async(texts: List[str]) -> List[dict]:
    """Embed several texts concurrently, issuing one request per text.

    Every text is handed to ``embeddings_run_async`` and all resulting
    coroutines are awaited together with ``asyncio.gather``.

    Args:
        texts: Texts to embed. An empty list yields an empty result.

    Returns:
        One decoded response per text, in the same order as ``texts``.

    Raises:
        Exception: The first exception raised by any of the underlying
            requests is propagated to the caller.
    """
    # embeddings_run_async opens its own HTTP client for each call, so no
    # client is created here; an extra one would just be opened and closed
    # without ever sending a request.
    return await asyncio.gather(
        *(embeddings_run_async(text) for text in texts)
    )
# Usage example
if __name__ == "__main__":
    # 1. Synchronous usage: one blocking request.
    input_text = "Your text string goes here"
    result = embeddings_run_sync(input_text)
    print(f"Sync result: {result}")

    # 2. Asynchronous usage.
    async def main():
        """Run one single async request followed by a concurrent batch."""
        # Single asynchronous request, reusing the text from the sync demo.
        result = await embeddings_run_async(input_text)
        print(f"Async single result: {result}")

        # Batch of asynchronous requests, one per sample text.
        sample_texts = [
            "First text to process",
            "Second text to process",
            "Third text to process",
        ]
        batch_results = await batch_process_async(sample_texts)
        for i, result in enumerate(batch_results):
            print(f"Batch result {i+1}: {result}")

    # Drive the coroutine to completion on a new event loop.
    asyncio.run(main())