Spaces:
Running
Running
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
""" | |
@Time : 2024/3/27 9:44 | |
@Author : leiwu30 | |
@File : stream_output_via_api.py | |
@Description : Stream log information and communicate over the network via web api. | |
""" | |
import asyncio | |
import json | |
import socket | |
import threading | |
from contextvars import ContextVar | |
from flask import Flask, Response, jsonify, request, send_from_directory | |
from metagpt.const import TUTORIAL_PATH | |
from metagpt.logs import logger, set_llm_stream_logfunc | |
from metagpt.roles.tutorial_assistant import TutorialAssistant | |
from metagpt.utils.stream_pipe import StreamPipe | |
app = Flask(__name__) | |
def stream_pipe_log(content): | |
print(content, end="") | |
stream_pipe = stream_pipe_var.get(None) | |
if stream_pipe: | |
stream_pipe.set_message(content) | |
def write_tutorial(message): | |
async def main(idea, stream_pipe): | |
stream_pipe_var.set(stream_pipe) | |
role = TutorialAssistant() | |
await role.run(idea) | |
def thread_run(idea: str, stream_pipe: StreamPipe = None): | |
""" | |
Convert asynchronous function to thread function | |
""" | |
asyncio.run(main(idea, stream_pipe)) | |
stream_pipe = StreamPipe() | |
thread = threading.Thread( | |
target=thread_run, | |
args=( | |
message["content"], | |
stream_pipe, | |
), | |
) | |
thread.start() | |
while thread.is_alive(): | |
msg = stream_pipe.get_message() | |
yield stream_pipe.msg2stream(msg) | |
def completions(): | |
""" | |
data: { | |
"model": "write_tutorial", | |
"stream": true, | |
"messages": [ | |
{ | |
"role": "user", | |
"content": "Write a tutorial about MySQL" | |
} | |
] | |
} | |
""" | |
data = json.loads(request.data) | |
logger.info(json.dumps(data, indent=4, ensure_ascii=False)) | |
# Non-streaming interfaces are not supported yet | |
stream_type = True if data.get("stream") else False | |
if not stream_type: | |
return jsonify({"status": 400, "msg": "Non-streaming requests are not supported, please use `stream=True`."}) | |
# Only accept the last user information | |
# openai['model'] ~ MetaGPT['agent'] | |
last_message = data["messages"][-1] | |
model = data["model"] | |
# write_tutorial | |
if model == "write_tutorial": | |
return Response(write_tutorial(last_message), mimetype="text/plain") | |
else: | |
return jsonify({"status": 400, "msg": "No suitable agent found."}) | |
def download_file(filename): | |
return send_from_directory(TUTORIAL_PATH, filename, as_attachment=True) | |
if __name__ == "__main__": | |
""" | |
curl https://$server_address:$server_port/v1/chat/completions -X POST -d '{ | |
"model": "write_tutorial", | |
"stream": true, | |
"messages": [ | |
{ | |
"role": "user", | |
"content": "Write a tutorial about MySQL" | |
} | |
] | |
}' | |
""" | |
server_port = 7860 | |
server_address = socket.gethostbyname(socket.gethostname()) | |
set_llm_stream_logfunc(stream_pipe_log) | |
stream_pipe_var: ContextVar[StreamPipe] = ContextVar("stream_pipe") | |
app.run(port=server_port, host=server_address) | |