Spaces:
Running
Running
import h11 | |
import socket | |
import json | |
import ssl | |
import urllib.parse | |
def embeddings_run_h11(input_text, url="https://sanbo1200-jina-embeddings-v3.hf.space/api/v1/embeddings"): | |
parsed_url = urllib.parse.urlparse(url) | |
# 创建SSL上下文 | |
context = ssl.create_default_context() | |
# 创建普通socket | |
sock = socket.create_connection((parsed_url.hostname, 443)) | |
# 包装成SSL socket | |
sock = context.wrap_socket(sock, server_hostname=parsed_url.hostname) | |
conn = h11.Connection(our_role=h11.CLIENT) | |
data = json.dumps({ | |
"input": input_text, | |
"model": "jinaai/jina-embeddings-v3" | |
}) | |
request = h11.Request( | |
method="POST", | |
target=parsed_url.path, | |
headers=[ | |
("Host", parsed_url.hostname), | |
("Content-Type", "application/json"), | |
("Content-Length", str(len(data))), | |
("Connection", "close") # 添加这个头 | |
] | |
) | |
sock.send(conn.send(request)) | |
sock.send(conn.send(h11.Data(data=data.encode()))) | |
sock.send(conn.send(h11.EndOfMessage())) | |
response = b"" | |
while True: | |
event = conn.next_event() | |
if event is h11.NEED_DATA: | |
data = sock.recv(2048) | |
if not data: # 连接关闭 | |
break | |
conn.receive_data(data) | |
continue | |
if isinstance(event, h11.EndOfMessage): | |
break | |
if isinstance(event, h11.Data): | |
response += event.data | |
sock.close() | |
return json.loads(response) | |
if __name__ == "__main__": | |
try: | |
result = embeddings_run_h11("Your text string goes here") | |
print(f"---{result}") | |
except Exception as e: | |
print(f"Error: {e}") |