space02 / app.py
thefish1's picture
rollback
1d6dd60
raw
history blame
40.4 kB
# import gradio as gr
# from huggingface_hub import InferenceClient
# import json
# import random
# import re
# from load_data import load_data
# from openai import OpenAI
# from transformers import AutoTokenizer, AutoModel
# import weaviate
# import os
# import torch
# from tqdm import tqdm
# import numpy as np
# import time
# # 设置缓存目录
# os.environ['MPLCONFIGDIR'] = '/tmp/matplotlib'
# os.environ['TRANSFORMERS_CACHE'] = '/tmp/huggingface_cache'
# os.makedirs(os.environ['MPLCONFIGDIR'], exist_ok=True)
# os.makedirs(os.environ['TRANSFORMERS_CACHE'], exist_ok=True)
# # Weaviate 连接配置
# WEAVIATE_API_KEY = "Y7c8DRmcxZ4nP5IJLwkznIsK84l6EdwfXwcH"
# WEAVIATE_URL = "https://39nlafviqvard82k6y8btq.c0.asia-southeast1.gcp.weaviate.cloud"
# weaviate_auth_config = weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY)
# weaviate_client = weaviate.Client(url=WEAVIATE_URL, auth_client_secret=weaviate_auth_config)
# # 预训练模型配置
# MODEL_NAME = "bert-base-chinese"
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
# model = AutoModel.from_pretrained(MODEL_NAME)
# # OpenAI 客户端
# openai_client = None
# def initialize_openai_client(api_key):
# global openai_client
# openai_client = OpenAI(api_key=api_key)
# def extract_keywords(text):
# prompt = """
# 你是一个关键词提取机器人。提取用户输入中的关键词,特别是名词和形容词,关键词之间用空格分隔。例如:苹果 电脑 裤子 蓝色 裙。
# """
# messages = [
# {"role": "system", "content": prompt},
# {"role": "user", "content": f"从下面的文本中提取五个关键词,以空格分隔:{text}"}
# ]
# response = openai_client.chat.completions.create(
# model="gpt-3.5-turbo",
# messages=messages,
# max_tokens=100,
# temperature=0.7,
# top_p=0.9,
# )
# keywords = response.choices[0].message.content.split(' ')
# return ','.join(keywords)
# def match_keywords(query_keywords, ad_keywords_list, triggered_keywords, current_turn, window_size, threshold):
# best_match_distance = 0
# best_match_index = -1
# for i, ad_keywords in enumerate(ad_keywords_list):
# match_count = sum(
# any(
# ad_keyword in keyword and
# (keyword not in triggered_keywords or current_turn - triggered_keywords[keyword] > window_size)
# ) for keyword in query_keywords
# )
# if match_count > best_match_distance:
# best_match_distance = match_count
# best_match_index = i
# if best_match_distance >= threshold:
# for keyword in query_keywords:
# if any(ad_keyword in keyword for ad_keyword in ad_keywords_list[best_match_index]):
# triggered_keywords[keyword] = current_turn
# return best_match_distance, best_match_index
# def encode_keywords_to_avg(keywords, model, tokenizer, device):
# embeddings = []
# for keyword in tqdm(keywords):
# inputs = tokenizer(keyword, return_tensors='pt', padding=True, truncation=True, max_length=512)
# inputs.to(device)
# with torch.no_grad():
# outputs = model(**inputs)
# embeddings.append(outputs.last_hidden_state.mean(dim=1))
# avg_embedding = sum(embeddings) / len(embeddings)
# return avg_embedding
# def get_response_from_db(keywords_dict, class_name):
# avg_vec = encode_keywords_to_avg(keywords_dict.keys(), model, tokenizer, device).numpy()
# response = (
# weaviate_client.query
# .get(class_name, ['keywords', 'summary'])
# .with_near_vector({'vector': avg_vec})
# .with_limit(1)
# .with_additional(['distance'])
# .do()
# )
# if class_name.capitalize() in response['data']['Get']:
# result = response['data']['Get'][class_name.capitalize()][0]
# return result['_additional']['distance'], result['summary'], result['keywords']
# else:
# return None, None, None
# def chatbot_response(message, max_tokens, temperature, top_p, window_size, threshold, user_weight, triggered_weight, api_key, state):
# initialize_openai_client(api_key)
# history = state.get('history', [])
# triggered_keywords = state.get('triggered_keywords', {})
# current_turn = len(history) + 1
# combined_user_message = " ".join([h[0] for h in history[-window_size:]] + [message])
# combined_assistant_message = " ".join([h[1] for h in history[-window_size:]])
# user_keywords = extract_keywords(combined_user_message).split(',')
# assistant_keywords = extract_keywords(combined_assistant_message).split(',')
# keywords_dict = {keyword: user_weight for keyword in user_keywords}
# for keyword in assistant_keywords:
# keywords_dict[keyword] = keywords_dict.get(keyword, 0) + 1
# for keyword in list(keywords_dict.keys()):
# if keyword in triggered_keywords and current_turn - triggered_keywords[keyword] < window_size:
# keywords_dict[keyword] = triggered_weight
# distance, ad_summary, ad_keywords = get_response_from_db(keywords_dict, class_name="ad_DB02")
# if distance and distance < threshold:
# ad_message = f"{message} <sep>品牌<sep>{ad_summary}"
# messages = [{"role": "system", "content": "你是一个热情的聊天机器人,应微妙地嵌入广告内容。"}]
# for msg in history:
# messages.extend([{"role": "user", "content": msg[0]}, {"role": "assistant", "content": msg[1]}])
# messages.append({"role": "user", "content": ad_message})
# for keyword in keywords_dict.keys():
# if any(ad_keyword in keyword for ad_keyword in ad_keywords.split(',')):
# triggered_keywords[keyword] = current_turn
# else:
# messages = [{"role": "system", "content": "你是一个热情的聊天机器人。"}]
# for msg in history:
# messages.extend([{"role": "user", "content": msg[0]}, {"role": "assistant", "content": msg[1]}])
# messages.append({"role": "user", "content": message})
# response = openai_client.chat.completions.create(
# model="gpt-3.5-turbo",
# messages=messages,
# max_tokens=max_tokens,
# temperature=temperature,
# top_p=top_p,
# )
# history.append((message, response.choices[0].message.content))
# state['history'] = history
# state['triggered_keywords'] = triggered_keywords
# return response.choices[0].message.content, state
# # Gradio UI
# demo = gr.Interface(
# fn=chatbot_response,
# inputs=[
# gr.Textbox(label="Message"),
# gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
# gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
# gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
# gr.Slider(minimum=1, maximum=5, value=2, step=1, label="Window size"),
# gr.Slider(minimum=0.01, maximum=0.20, value=0.08, step=0.01, label="Distance threshold"),
# gr.Slider(minimum=1, maximum=5, value=2, step=1, label="Weight of keywords from users"),
# gr.Slider(minimum=0, maximum=2, value=0.5, step=0.5, label="Weight of triggered keywords"),
# gr.Textbox(label="API Key"),
# gr.State(value={'history': [], 'triggered_keywords': {}}) # Combined state
# ],
# outputs=[
# gr.Textbox(label="Response"),
# gr.State() # Return the updated state
# ]
# )
# if __name__ == "__main__":
# demo.launch(share=True)
import gradio as gr
from huggingface_hub import InferenceClient
import json
import random
import re
from load_data import load_data
from openai import OpenAI
from transformers import AutoTokenizer, AutoModel
import weaviate
import os
import torch
from tqdm import tqdm
import numpy as np
import time
# Cache directories: point Matplotlib and Hugging Face at writable /tmp paths.
os.environ['MPLCONFIGDIR'] = '/tmp/matplotlib'
os.environ['TRANSFORMERS_CACHE'] = '/tmp/huggingface_cache'
os.makedirs(os.environ['MPLCONFIGDIR'], exist_ok=True)
os.makedirs(os.environ['TRANSFORMERS_CACHE'], exist_ok=True)

# Weaviate connection settings.
# NOTE(review): the fallback credentials below are committed in source; they
# should be rotated and supplied through the environment instead. Environment
# variables of the same name take precedence when set.
WEAVIATE_API_KEY = os.environ.get("WEAVIATE_API_KEY", "Y7c8DRmcxZ4nP5IJLwkznIsK84l6EdwfXwcH")
WEAVIATE_URL = os.environ.get(
    "WEAVIATE_URL",
    "https://39nlafviqvard82k6y8btq.c0.asia-southeast1.gcp.weaviate.cloud",
)
weaviate_auth_config = weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY)
weaviate_client = weaviate.Client(url=WEAVIATE_URL, auth_client_secret=weaviate_auth_config)

# Pre-trained encoder used to embed keywords.
MODEL_NAME = "bert-base-chinese"
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# `transformers` is imported above this point, so the TRANSFORMERS_CACHE value
# set here may already be too late to be picked up; pass the directory
# explicitly so the download location is the one created above.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, cache_dir=os.environ['TRANSFORMERS_CACHE'])
# The encoding helpers move their inputs to `device`, so the model has to live
# there as well or the forward pass fails on GPU hosts.
model = AutoModel.from_pretrained(MODEL_NAME, cache_dir=os.environ['TRANSFORMERS_CACHE']).to(device)

# OpenAI client; created per request by initialize_openai_client().
openai_client = None
def initialize_openai_client(api_key):
    """Create the module-level OpenAI client from the supplied API key.

    Args:
        api_key: Key typed by the user. Surrounding whitespace (a common
            copy/paste artefact) is removed before the client is built.

    Side effects:
        Rebinds the global ``openai_client``.
    """
    global openai_client
    if isinstance(api_key, str):
        api_key = api_key.strip()
    openai_client = OpenAI(api_key=api_key)
def extract_keywords(text):
    """Ask the chat model for keywords in ``text`` and return them comma-joined.

    Args:
        text: Text to extract keywords from.

    Returns:
        The keywords joined with ``,``. An empty string is returned when
        ``text`` is blank (no API call is made) or the reply has no content.

    Raises:
        RuntimeError: If the global ``openai_client`` has not been created.
    """
    # Nothing to extract: skip the request instead of letting the model
    # invent keywords for an empty prompt.
    if not text or not text.strip():
        return ''
    if openai_client is None:
        raise RuntimeError("OpenAI client is not initialized; call initialize_openai_client() first")

    prompt = (
        "\n"
        "你是一个关键词提取机器人。提取用户输入中的关键词,特别是名词和形容词,关键词之间用空格分隔。例如:苹果 电脑 裤子 蓝色 裙。\n"
    )
    messages = [
        {"role": "system", "content": prompt},
        {"role": "user", "content": f"从下面的文本中提取五个关键词,以空格分隔:{text}"},
    ]
    response = openai_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
        max_tokens=100,
        temperature=0.7,
        top_p=0.9,
    )
    content = response.choices[0].message.content or ''
    # The result is comma-joined, so a comma inside a keyword would split it
    # downstream anyway. Treat any whitespace run and the usual ASCII /
    # full-width list separators as delimiters, and drop empty pieces.
    keywords = [piece for piece in re.split(r"[\s,\uFF0C\u3001]+", content) if piece]
    return ','.join(keywords)
# def match_keywords(query_keywords, ad_keywords_list, triggered_keywords, current_turn, window_size, threshold):
# best_match_distance = 0
# best_match_index = -1
# for i, ad_keywords in enumerate(ad_keywords_list):
# match_count = sum(
# any(
# ad_keyword in keyword and
# (keyword not in triggered_keywords or current_turn - triggered_keywords[keyword] > window_size)
# ) for keyword in query_keywords
# )
# if match_count > best_match_distance:
# best_match_distance = match_count
# best_match_index = i
# if best_match_distance >= threshold:
# for keyword in query_keywords:
# if any(ad_keyword in keyword for ad_keyword in ad_keywords_list[best_match_index]):
# triggered_keywords[keyword] = current_turn
# return best_match_distance, best_match_index
def encode_keywords_to_avg(keywords, model, tokenizer, device):
    """Return the unweighted mean of the per-keyword embeddings.

    Each keyword is tokenized on its own, passed through ``model``, and
    reduced to a single vector by averaging ``last_hidden_state`` over
    ``dim=1``. Those vectors are then averaged with equal weight.

    Args:
        keywords: Iterable of keyword strings (for example ``dict.keys()``).
        model: Encoder whose output exposes ``last_hidden_state``.
        tokenizer: Tokenizer callable matching ``model``.
        device: ``torch.device`` on which the forward pass runs.

    Returns:
        A CPU tensor holding the mean embedding (presumably shape
        ``(1, hidden_size)`` since each keyword is encoded as a batch of one).

    Raises:
        ValueError: If ``keywords`` is empty.
    """
    keywords = list(keywords)
    if not keywords:
        # Previously surfaced as an opaque ZeroDivisionError.
        raise ValueError("keywords must contain at least one entry")

    # Inputs are moved to `device` below; the model must be there too.
    model.to(device)
    embeddings = []
    for keyword in tqdm(keywords):
        inputs = tokenizer(keyword, return_tensors='pt', padding=True, truncation=True, max_length=512)
        inputs = inputs.to(device)
        with torch.no_grad():
            outputs = model(**inputs)
        # Bring the result back to the CPU so callers can convert it to NumPy.
        embeddings.append(outputs.last_hidden_state.mean(dim=1).cpu())
    return sum(embeddings) / len(embeddings)
def encode_keywords_to_list(keywords, model, tokenizer, device):
    """Embed every keyword separately and return the vectors as plain lists.

    Each keyword is tokenized on its own, passed through ``model``, and
    reduced to one vector by averaging ``last_hidden_state`` over ``dim=1``.

    Args:
        keywords: Iterable of keyword strings (for example ``dict.keys()``).
        model: Encoder whose output exposes ``last_hidden_state``.
        tokenizer: Tokenizer callable matching ``model``.
        device: ``torch.device`` on which the forward pass runs.

    Returns:
        A list with one embedding (a list of floats) per keyword, in input
        order. Empty input yields an empty list.
    """
    # Inputs are moved to `device` below; the model must be there too.
    model.to(device)
    embeddings = []
    for keyword in tqdm(keywords):
        inputs = tokenizer(keyword, return_tensors='pt', padding=True, truncation=True, max_length=512)
        inputs = inputs.to(device)
        with torch.no_grad():
            outputs = model(**inputs)
        pooled = outputs.last_hidden_state.mean(dim=1)
        embeddings.append(pooled.squeeze().cpu().tolist())
    return embeddings
def get_response_from_db(keywords_dict, class_name):
    """Fetch the single nearest ad for the averaged keyword embedding.

    Args:
        keywords_dict: Mapping whose keys are the query keywords. Only the
            keys are used here.
        class_name: Weaviate class to query.

    Returns:
        ``(distance, summary, keywords)`` of the closest object, or
        ``(None, None, None)`` when the response holds no hit for the class.
    """
    avg_vec = (
        encode_keywords_to_avg(keywords_dict.keys(), model, tokenizer, device)
        .cpu()  # .numpy() is only valid for CPU tensors
        .numpy()
    )
    response = (
        weaviate_client.query
        .get(class_name, ['keywords', 'summary'])
        .with_near_vector({'vector': avg_vec})
        .with_limit(1)
        .with_additional(['distance'])
        .do()
    )
    # Only the first character is upper-cased in the result key.
    # str.capitalize() would also lower-case the rest ("ad_DB02" -> "Ad_db02")
    # and the lookup would never succeed.
    result_key = class_name[:1].upper() + class_name[1:]
    # An error response may lack 'data'; treat that the same as "no hit".
    hits = ((response.get('data') or {}).get('Get') or {}).get(result_key) or []
    if not hits:
        return None, None, None
    best = hits[0]
    return best['_additional']['distance'], best['summary'], best['keywords']
def get_candidates_from_db(keywords_dict, class_name, limit=3):
    """Collect the nearest ads for each query keyword individually.

    One near-vector query is issued per keyword embedding, and the hits of
    all queries are concatenated (the same ad may therefore appear more than
    once).

    Args:
        keywords_dict: Mapping whose keys are the query keywords. Only the
            keys are used here.
        class_name: Weaviate class to query.
        limit: Maximum number of hits requested per keyword.

    Returns:
        A list of dicts with the keys ``'distance'``, ``'summary'`` and
        ``'keywords'``; empty when nothing was found.
    """
    embeddings = encode_keywords_to_list(keywords_dict.keys(), model, tokenizer, device)
    # Only the first character is upper-cased in the result key.
    # str.capitalize() would also lower-case the rest ("ad_DB02" -> "Ad_db02")
    # and no candidate would ever be collected.
    result_key = class_name[:1].upper() + class_name[1:]
    candidate_list = []
    for embedding in embeddings:
        response = (
            weaviate_client.query
            .get(class_name, ['keywords', 'summary'])
            .with_near_vector({'vector': embedding})
            .with_limit(limit)
            .with_additional(['distance'])
            .do()
        )
        # An error response may lack 'data'; treat that as "no hits".
        hits = ((response.get('data') or {}).get('Get') or {}).get(result_key) or []
        candidate_list.extend(
            {
                'distance': hit['_additional']['distance'],
                'summary': hit['summary'],
                'keywords': hit['keywords'],
            }
            for hit in hits
        )
    return candidate_list
# Maps a query keyword to the turn number on which it last triggered an ad
# (written by chatbot_response). Module-level, so the same dict is shared by
# every call made in this process.
triggered_keywords = {}
def keyword_match(keywords_dict, candidates):
    """Return the first candidate that shares a term with the query keywords.

    A candidate's ``'keywords'`` string is split on ``*``; for each entry the
    part after the first ``#`` is the term (an entry without ``#`` is used as
    is). A candidate matches when any of its terms is a substring of any
    query keyword.

    Args:
        keywords_dict: Mapping whose keys are the query keywords.
        candidates: Iterable of dicts with the keys ``'distance'``,
            ``'summary'`` and ``'keywords'``, examined in order.

    Returns:
        ``(distance, summary, keywords)`` of the first matching candidate, or
        ``(1000, None, None)`` when none matches.
    """
    for candidate in candidates:
        ad_terms = []
        for entry in candidate['keywords'].split('*'):
            parts = entry.split('#')
            term = (parts[1] if len(parts) > 1 else entry).strip()
            # An empty term is a substring of everything; skip it.
            if term:
                ad_terms.append(term)
        # Compare whole terms. Joining them into one string and iterating it
        # would compare single characters (and the separator) instead.
        for keyword in keywords_dict:
            if any(term in keyword for term in ad_terms):
                return candidate['distance'], candidate['summary'], candidate['keywords']
    return 1000, None, None
def chatbot_response(message, history, max_tokens, temperature, top_p, window_size, threshold, user_weight, triggered_weight, api_key):
    """Answer ``message``, weaving in an ad when a close enough one is found.

    Keywords are extracted from the last ``window_size`` turns, used to
    retrieve ad candidates from the vector database, and the first candidate
    under ``threshold`` that shares a term with the keywords is embedded in
    the prompt sent to the chat model.

    Args:
        message: The new user message.
        history: Sequence of ``(user_text, assistant_text)`` pairs.
        max_tokens: Completion length limit passed to the chat model.
        temperature: Sampling temperature passed to the chat model.
        top_p: Nucleus sampling value passed to the chat model.
        window_size: Number of most recent turns used for keyword extraction
            and for the re-trigger window.
        threshold: Maximum vector distance for an ad to be used.
        user_weight: Weight stored for keywords from user messages.
        triggered_weight: Weight stored for keywords that triggered an ad
            within the window. The retrieval calls in this function use only
            the keyword keys, not the stored weights.
        api_key: OpenAI API key used for this request.

    Returns:
        The assistant reply text.
    """

    def _ad_terms(raw_keywords):
        """Split a stored ad keyword string into its bare terms."""
        terms = []
        # Stored keywords look like "tag#term*tag#term" (the format
        # keyword_match parses); comma-separated input is accepted too.
        for entry in (raw_keywords or '').replace(',', '*').split('*'):
            parts = entry.split('#')
            term = (parts[1] if len(parts) > 1 else entry).strip()
            if term:
                terms.append(term)
        return terms

    def _clean(keyword_csv):
        """Split a comma-joined keyword string, dropping blanks."""
        return [kw.strip() for kw in keyword_csv.split(',') if kw.strip()]

    # Initialise the OpenAI client for this request.
    initialize_openai_client(api_key)

    # Current turn number and the history window.
    window_size = int(window_size)  # slider values may arrive as floats
    current_turn = len(history) + 1
    recent = history[-window_size:]
    # History entries can be empty/None; skip them as the message loop does.
    combined_user_message = " ".join([turn[0] for turn in recent if turn[0]] + [message])
    combined_assistant_message = " ".join([turn[1] for turn in recent if turn[1]])

    # Extract keywords. With no assistant text yet there is nothing to
    # extract, so the second model call is skipped.
    user_keywords = _clean(extract_keywords(combined_user_message))
    assistant_keywords = (
        _clean(extract_keywords(combined_assistant_message))
        if combined_assistant_message else []
    )

    # Build the keyword -> weight mapping.
    keywords_dict = {keyword: user_weight for keyword in user_keywords}
    for keyword in assistant_keywords:
        keywords_dict[keyword] = keywords_dict.get(keyword, 0) + 1
    for keyword in keywords_dict:
        if keyword in triggered_keywords and current_turn - triggered_keywords[keyword] < window_size:
            keywords_dict[keyword] = triggered_weight

    # Retrieve candidates per keyword, then keep those under the threshold,
    # closest first.
    candidates = get_candidates_from_db(keywords_dict, class_name="ad_DB02", limit=3)
    print(f"candidates{candidates}")
    candidates.sort(key=lambda x: x['distance'])
    candidates = [candidate for candidate in candidates if candidate['distance'] < threshold]
    print(f"candidates{candidates}")

    distance, ad_summary, ad_keywords = 1000, None, None
    if candidates:
        distance, ad_summary, ad_keywords = keyword_match(keywords_dict, candidates)

    # Replay the conversation so far; shared by both prompt variants.
    history_messages = []
    for val in history:
        if val[0]:
            history_messages.append({"role": "user", "content": val[0]})
        if val[1]:
            history_messages.append({"role": "assistant", "content": val[1]})

    # `is not None` rather than truthiness: a distance of 0 is a perfect hit.
    if distance is not None and distance < threshold:
        brands = ['腾讯', '阿里巴巴', '百度', '京东', '华为', '小米', '苹果', '微软', '谷歌', '亚马逊']
        brand = random.choice(brands)
        ad_message = f"{message} <sep>品牌{brand}<sep>{ad_summary}"
        messages = [{"role": "system", "content": "你是一个热情的聊天机器人,应微妙地嵌入广告内容。"}]
        messages.extend(history_messages)
        messages.append({"role": "user", "content": ad_message})
        # Remember which query keywords triggered this ad.
        ad_terms = _ad_terms(ad_keywords)
        for keyword in keywords_dict:
            if any(term in keyword for term in ad_terms):
                triggered_keywords[keyword] = current_turn
    else:
        messages = [{"role": "system", "content": "你是一个热情的聊天机器人。"}]
        messages.extend(history_messages)
        messages.append({"role": "user", "content": message})

    # Get the reply.
    response = openai_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
        max_tokens=int(max_tokens),
        temperature=temperature,
        top_p=top_p,
    )
    print(f"triggered_keywords: {triggered_keywords}")
    return response.choices[0].message.content
# Gradio UI: chat window plus the tuning controls, passed to chatbot_response
# in this order after (message, history).
demo = gr.ChatInterface(
    chatbot_response,
    additional_inputs=[
        gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
        # The chat completions API accepts temperatures from 0 to 2; a higher
        # slider value would only produce a request error.
        gr.Slider(minimum=0.1, maximum=2.0, value=0.7, step=0.1, label="Temperature"),
        gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
        gr.Slider(minimum=1, maximum=5, value=2, step=1, label="Window size"),
        gr.Slider(minimum=0.01, maximum=0.20, value=0.08, step=0.01, label="Distance threshold"),
        gr.Slider(minimum=1, maximum=5, value=2, step=1, label="Weight of keywords from users"),
        gr.Slider(minimum=0, maximum=2, value=0.5, step=0.5, label="Weight of triggered keywords"),
        # Mask the secret instead of showing it in clear text.
        gr.Textbox(label="API Key", type="password"),
    ],
)

if __name__ == "__main__":
    demo.launch(share=True)
    print("happyhappyhappy")
# import gradio as gr
# from huggingface_hub import InferenceClient
# import json
# import random
# import re
# from load_data import load_data
# from openai import OpenAI
# from transformers import AutoTokenizer, AutoModel
# import weaviate
# import os
# import subprocess
# import torch
# from tqdm import tqdm
# import numpy as np
# import time
# # 设置 Matplotlib 的缓存目录
# os.environ['MPLCONFIGDIR'] = '/tmp/matplotlib'
# # 设置 Hugging Face Transformers 的缓存目录
# os.environ['TRANSFORMERS_CACHE'] = '/tmp/huggingface_cache'
# # 确保这些目录存在
# os.makedirs(os.environ['MPLCONFIGDIR'], exist_ok=True)
# os.makedirs(os.environ['TRANSFORMERS_CACHE'], exist_ok=True)
# auth_config = weaviate.AuthApiKey(api_key="Y7c8DRmcxZ4nP5IJLwkznIsK84l6EdwfXwcH")
# URL = "https://39nlafviqvard82k6y8btq.c0.asia-southeast1.gcp.weaviate.cloud"
# # Connect to a WCS instance
# db_client = weaviate.Client(
# url=URL,
# auth_client_secret=auth_config
# )
# class_name="ad_DB02"
# device = torch.device(device='cuda' if torch.cuda.is_available() else 'cpu')
# tokenizer = AutoTokenizer.from_pretrained("bert-base-chinese")
# model = AutoModel.from_pretrained("bert-base-chinese")
# global_api_key = None
# client = None
# def initialize_clients(api_key):
# global client
# client = OpenAI(api_key=api_key)
# def get_keywords(message):
# system_message = """
# # 角色
# 你是一个关键词提取机器人
# # 指令
# 你的目标是从用户的输入中提取关键词,这些关键词应该尽可能是购买意图相关的。关键词中应该尽可能注意那些名词和形容词
# # 输出格式
# 你应该直接输出关键词,关键词之间用空格分隔。例如:苹果 电脑 裤子 蓝色 裙
# # 注意:如果输入文本过短可以重复输出关键词,例如对输入“你好”可以输出:你好 你好 你好 你好 你好
# """
# messages = [{"role": "system", "content": system_message}]
# messages.append({"role": "user", "content": f"从下面的文本中给我提取五个关键词,只输出这五个关键词,以空格分隔{message}"})
# response = client.chat.completions.create(
# model="gpt-3.5-turbo",
# messages=messages,
# max_tokens=100,
# temperature=0.7,
# top_p=0.9,
# )
# keywords = response.choices[0].message.content.split(' ')
# return ','.join(keywords)
# #字符串匹配模块
# def keyword_match(query_keywords_dict, ad_keywords_lists, triggered_keywords, current_turn, window_size,distance_threshold):
# distance = 0
# most_matching_list = None
# index = 0
# # query_keywords = query_keywords.split(',')
# # query_keywords = [keyword for keyword in query_keywords if keyword]
# #匹配模块
# query_keywords= list(query_keywords_dict.keys())
# for i, lst in enumerate(ad_keywords_lists):
# lst = lst.split(',')
# matches = sum(
# any(
# ad_keyword in keyword and
# (
# keyword not in triggered_keywords or
# triggered_keywords.get(keyword) is None or
# current_turn - triggered_keywords.get(keyword, 0) > window_size
# ) * query_keywords_dict.get(keyword, 1) #计数乘以权重
# for keyword in query_keywords
# )
# for ad_keyword in lst
# )
# if matches > distance:
# distance = matches
# most_matching_list = lst
# index = i
# #更新对distance 有贡献的关键词
# if distance >= distance_threshold:
# for keyword in query_keywords:
# if any(
# ad_keyword in keyword for ad_keyword in most_matching_list
# ):
# triggered_keywords[keyword] = current_turn
# return distance, index
# def encode_list_to_avg(keywords_list_list, model, tokenizer, device):
# if torch.cuda.is_available():
# print('Using GPU')
# print(device)
# else:
# print('Using CPU')
# print(device)
# avg_embeddings = []
# for keywords in tqdm(keywords_list_list):
# keywords_lst=[]
# # keywords.split(',')
# for keyword in keywords:
# inputs = tokenizer(keyword, return_tensors='pt', padding=True, truncation=True, max_length=512)
# inputs.to(device)
# with torch.no_grad():
# outputs = model(**inputs)
# embeddings = outputs.last_hidden_state.mean(dim=1)
# keywords_lst.append(embeddings)
# avg_embedding = sum(keywords_lst) / len(keywords_lst)
# avg_embeddings.append(avg_embedding)
# return avg_embeddings
# def encode_to_avg(keywords_dict, model, tokenizer, device):
# if torch.cuda.is_available():
# print('Using GPU')
# print(device)
# else:
# print('Using CPU')
# print(device)
# keyword_embeddings=[]
# for keyword, weight in keywords_dict.items():
# inputs = tokenizer(keyword, return_tensors='pt', padding=True, truncation=True, max_length=512)
# inputs.to(device)
# with torch.no_grad():
# outputs = model(**inputs)
# embedding = outputs.last_hidden_state.mean(dim=1)
# keyword_embedding=embedding * weight
# keyword_embeddings.append(keyword_embedding * weight)
# avg_embedding = sum(keyword_embeddings) / sum(keywords_dict.values())
# return avg_embedding.tolist()
# def fetch_response_from_db(query_keywords_dict,class_name):
# start_time = time.time()
# avg_vec=np.array(encode_to_avg(query_keywords_dict, model, tokenizer, device))
# end_time = time.time()
# print(f"Time taken to encode to avg: {end_time - start_time}")
# nearVector = {
# 'vector': avg_vec
# }
# start_time = time.time()
# response = (
# db_client.query
# .get(class_name, ['keywords', 'summary'])
# .with_near_vector(nearVector)
# .with_limit(1)
# .with_additional(['distance'])
# .do()
# )
# end_time = time.time()
# print(f"Time taken to search in DB: {end_time - start_time}")
# print(response)
# class_name=class_name[0].upper()+class_name[1:]
# if class_name in response['data']['Get']:
# results = response['data']['Get'][class_name]
# print(results[0]['keywords'])
# return results[0]['_additional']['distance'],results[0]['summary'], results[0]['keywords']
# else:
# print(f"Class name {class_name} not found in response")
# return None
# def wrapper(message, history, max_tokens, temperature, top_p, window_size, distance_threshold, weight_keywords_users, weight_keywords_triggered, api_key):
# initialize_clients(api_key)
# return respond(message, history, max_tokens, temperature, top_p, window_size, distance_threshold, weight_keywords_users, weight_keywords_triggered)
# #触发词及触发回合字典
# triggered_keywords = {}
# def respond(
# message,
# history: list[tuple[str, str]],
# max_tokens,
# temperature,
# top_p,
# window_size,
# distance_threshold,
# weight_keywords_users,
# weight_keywords_triggered,
# ):
# system_message_with_ad = """
# # 角色
# 你是一个热情的聊天机器人
# # 指令
# 你的目标是以一种微妙的方式将给定的广告内容嵌入对话中。插入广告时应尽量自然,避免突兀。优先正常回答用户问题,然后逐步以推荐方式引入广告内容。
# 注意要在商品的描述前面加上是来自哪个品牌的广告。
# 注意在推荐中不要脑补用户的身份,只是进行简单推荐。
# 注意要热情但是语气只要适度热情
# # 输入格式
# 用户查询后跟随广告品牌,用<sep>分隔,广告品牌后跟随广告描述,再用<sep>分隔。
# 例如:我想买一条阔腿裤 <sep> 腾讯 <sep> 宽松的阔腿裤这两年真的吸粉不少,明星时尚达人的心头爱。毕竟好穿时尚,谁都能穿出腿长2米的效果宽松的裤腿,当然是遮肉小能手啊。上身随性自然不拘束,面料亲肤舒适贴身体验感棒棒哒。系带部分增加设计看点,还让单品的设计感更强。腿部线条若隐若现的,性感撩人。颜色敲温柔的,与裤子本身所呈现的风格有点反差萌。
# 注意: 当没有<sep>时,正常回复用户,不插入广告。
# # 输出格式
# 始终使用中文,只输出聊天内容,不输出任何自我分析的信息
# """
# system_message_without_ad = """
# 你是一个热情的聊天机器人
# """
# print(f"triggered_keywords{triggered_keywords}")
# # 更新当前轮次
# current_turn = len(history) + 1
# print(f"\ncurrent_turn: {current_turn}\n")
# # 检查历史记录的长度
# if len(history) >= window_size:
# combined_message_user = " ".join([h[0] for h in history[-window_size:] if h[0]] + [message])
# combined_message_assistant=" ".join(h[1] for h in history[-window_size:] if h[1])
# else:
# combined_message_user = message
# combined_message_assistant = ""
# start_time = time.time()
# key_words_users=get_keywords(combined_message_user).split(',')
# key_words_assistant=get_keywords(combined_message_assistant).split(',')
# end_time = time.time()
# print(f"Time taken to get keywords: {end_time - start_time}")
# print(f"Initial keywords_users: {key_words_users}")
# print(f"Initial keywords_assistant: {key_words_assistant}")
# keywords_dict = {}
# added_keywords = set()
# for keywords in key_words_users:
# if keywords not in added_keywords:
# if keywords in keywords_dict:
# keywords_dict[keywords] += weight_keywords_users
# else:
# keywords_dict[keywords] = weight_keywords_users
# added_keywords.add(keywords)
# for keywords in key_words_assistant:
# if keywords not in added_keywords:
# if keywords in keywords_dict:
# keywords_dict[keywords] += 1
# else:
# keywords_dict[keywords] = 1
# added_keywords.add(keywords)
# #窗口内触发过的关键词权重下调为0.5
# for keyword in list(keywords_dict.keys()):
# if keyword in triggered_keywords:
# if current_turn - triggered_keywords[keyword] < window_size:
# keywords_dict[keyword] = weight_keywords_triggered
# query_keywords = list(keywords_dict.keys())
# print(keywords_dict)
# start_time = time.time()
# distance,top_keywords_list,top_summary = fetch_response_from_db(keywords_dict,class_name)
# end_time = time.time()
# print(f"Time taken to fetch response from db: {end_time - start_time}")
# print(f"distance: {distance}")
# if distance<distance_threshold:
# ad =top_summary
# messages = [{"role": "system", "content": system_message_with_ad}]
# for val in history:
# if val[0]:
# messages.append({"role": "user", "content": val[0]})
# if val[1]:
# messages.append({"role": "assistant", "content": val[1]})
# brands = ['腾讯', '百度', '京东', '华为', '小米', '苹果', '微软', '谷歌', '亚马逊']
# brand = random.choice(brands)
# messages.append({"role": "user", "content": f"{message} <sep>{brand}的 <sep> {ad}"})
# #更新触发词
# for keyword in query_keywords:
# if any(
# ad_keyword in keyword for ad_keyword in top_keywords_list
# ):
# triggered_keywords[keyword] = current_turn
# else:
# messages = [{"role": "system", "content": system_message_without_ad}]
# for val in history:
# if val[0]:
# messages.append({"role": "user", "content": val[0]})
# if val[1]:
# messages.append({"role": "assistant", "content": val[1]})
# messages.append({"role": "user", "content": message})
# start_time = time.time()
# response = client.chat.completions.create(
# model="gpt-3.5-turbo",
# messages=messages,
# max_tokens=max_tokens,
# temperature=temperature,
# top_p=top_p,
# )
# end_time = time.time()
# print(f"Time taken to get response from GPT: {end_time - start_time}")
# return response.choices[0].message.content
# # def chat_interface(message, history, max_tokens, temperature, top_p, window_size, distance_threshold):
# # global triggered_keywords
# # response, triggered_keywords = respond(
# # message,
# # history,
# # max_tokens,
# # temperature,
# # top_p,
# # window_size,
# # distance_threshold,
# # triggered_keywords
# # )
# # return response, history + [(message, response)]
# demo = gr.ChatInterface(
# wrapper,
# additional_inputs=[
# gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
# gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
# gr.Slider(
# minimum=0.1,
# maximum=1.0,
# value=0.95,
# step=0.05,
# label="Top-p (nucleus sampling)",
# ),
# gr.Slider(minimum=1, maximum=5, value=2, step=1, label="Window size"),
# gr.Slider(minimum=0.01, maximum=0.20, value=0.08, step=0.01, label="Distance threshold"),
# gr.Slider(minimum=1, maximum=5, value=2, step=1, label="Weight of keywords from users"),
# gr.Slider(minimum=0, maximum=2, value=0.5, step=0.5, label="Weight of triggered keywords"),
# gr.Textbox(label="api_key"),
# ],
# )
# if __name__ == "__main__":
# demo.launch(share=True)
# import gradio as gr
# from huggingface_hub import InferenceClient
# import json
# import random
# import re
# from load_data import load_data
# from openai import OpenAI
# from transformers import AutoTokenizer, AutoModel
# import weaviate
# import os
# import subprocess
# import torch
# from tqdm import tqdm
# import numpy as np
# # 设置 Matplotlib 和 Hugging Face Transformers 的缓存目录
# os.environ['MPLCONFIGDIR'] = '/tmp/matplotlib'
# os.environ['TRANSFORMERS_CACHE'] = '/tmp/huggingface_cache'
# os.makedirs(os.environ['MPLCONFIGDIR'], exist_ok=True)
# os.makedirs(os.environ['TRANSFORMERS_CACHE'], exist_ok=True)
# auth_config = weaviate.AuthApiKey(api_key="Y7c8DRmcxZ4nP5IJLwkznIsK84l6EdwfXwcH")
# URL = "https://39nlafviqvard82k6y8btq.c0.asia-southeast1.gcp.weaviate.cloud"
# # Connect to a WCS instance
# db_client = weaviate.Client(
# url=URL,
# auth_client_secret=auth_config
# )
# class_name = "ad_DB02"
# device = torch.device(device='cuda' if torch.cuda.is_available() else 'cpu')
# tokenizer = AutoTokenizer.from_pretrained("bert-base-chinese")
# model = AutoModel.from_pretrained("bert-base-chinese")
# global_api_key = None
# client = None
# def initialize_clients(api_key):
# global client
# client = OpenAI(api_key=api_key)
# def get_keywords(message):
# system_message = """
# # 角色
# 你是一个关键词提取机器人
# # 指令
# 你的目标是从用户的输入中提取关键词,这些关键词应该尽可能是购买意图相关的。关键词中应该尽可能注意那些名词和形容词
# # 输出格式
# 你应该直接输出关键词,关键词之间用空格分隔。例如:苹果 电脑 裤子 蓝色 裙
# # 注意:如果输入文本过短可以重复输出关键词,例如对输入“你好”可以输出:你好 你好 你好 你好 你好
# """
# messages = [{"role": "system", "content": system_message}]
# messages.append({"role": "user", "content": f"从下面的文本中给我提取五个关键词,只输出这五个关键词,以空格分隔{message}"})
# response = client.chat.completions.create(
# model="gpt-3.5-turbo",
# messages=messages,
# max_tokens=100,
# temperature=0.7,
# top_p=0.9,
# )
# keywords = response.choices[0].message.content.split(' ')
# return ','.join(keywords)
# def fetch_response_from_db(query_keywords_dict, class_name):
# avg_vec = np.array(encode_to_avg(query_keywords_dict, model, tokenizer, device))
# nearVector = {'vector': avg_vec}
# response = (
# db_client.query
# .get(class_name, ['keywords', 'summary'])
# .with_near_vector(nearVector)
# .with_limit(1)
# .with_additional(['distance'])
# .do()
# )
# class_name = class_name[0].upper() + class_name[1:]
# if class_name in response['data']['Get']:
# results = response['data']['Get'][class_name]
# return results[0]['_additional']['distance'], results[0]['summary'], results[0]['keywords']
# else:
# print(f"Class name {class_name} not found in response")
# return None
# def encode_to_avg(keywords_dict, model, tokenizer, device):
# if torch.cuda.is_available():
# print('Using GPU')
# print(device)
# else:
# print('Using CPU')
# print(device)
# keyword_embeddings = []
# for keyword, weight in keywords_dict.items():
# inputs = tokenizer(keyword, return_tensors='pt', padding=True, truncation=True, max_length=512)
# inputs.to(device)
# with torch.no_grad():
# outputs = model(**inputs)
# embedding = outputs.last_hidden_state.mean(dim=1)
# keyword_embedding = embedding * weight
# keyword_embeddings.append(keyword_embedding)
# avg_embedding = sum(keyword_embeddings) / sum(keywords_dict.values())
# return avg_embedding.tolist()
# def wrapper(message, history, max_tokens, temperature, top_p, window_size, distance_threshold, weight_keywords_users, weight_keywords_triggered, api_key, state):
# initialize_clients(api_key)
# return respond(message, history, max_tokens, temperature, top_p, window_size, distance_threshold, weight_keywords_users, weight_keywords_triggered, state)
# def respond(
# message,
# history,
# max_tokens,
# temperature,
# top_p,
# window_size,
# distance_threshold,
# weight_keywords_users,
# weight_keywords_triggered,
# state
# ):
# triggered_keywords = state.get('triggered_keywords', {})
# current_turn = len(history) + 1
# if len(history) >= window_size:
# combined_message_user = " ".join([h[0] for h in history[-window_size:] if h[0]] + [message])
# combined_message_assistant = " ".join(h[1] for h in history[-window_size:] if h[1])
# else:
# combined_message_user = message
# combined_message_assistant = ""
# key_words_users = get_keywords(combined_message_user).split(',')
# key_words_assistant = get_keywords(combined_message_assistant).split(',')
# keywords_dict = {}
# for keyword in key_words_users:
# if keyword in keywords_dict:
# keywords_dict[keyword] += weight_keywords_users
# else:
# keywords_dict[keyword] = weight_keywords_users
# for keyword in key_words_assistant:
# if keyword in keywords_dict:
# keywords_dict[keyword] += 1
# else:
# keywords_dict[keyword] = 1
# for keyword in list(keywords_dict.keys()):
# if keyword in triggered_keywords:
# if current_turn - triggered_keywords[keyword] < window_size:
# keywords_dict[keyword] = weight_keywords_triggered
# query_keywords = list(keywords_dict.keys())
# distance, top_keywords_list, top_summary = fetch_response_from_db(keywords_dict, class_name)
# if distance < distance_threshold:
# ad = top_summary
# messages = [{"role": "system", "content": system_message_with_ad}]
# for val in history:
# if val[0]:
# messages.append({"role": "user", "content": val[0]})
# if val[1]:
# messages.append({"role": "assistant", "content": val[1]})
# brands = ['腾讯', '百度', '京东', '华为', '小米', '苹果', '微软', '谷歌', '亚马逊']
# brand = random.choice(brands)
# messages.append({"role": "user", "content": f"{message} <sep>{brand}的 <sep> {ad}"})
# for keyword in query_keywords:
# if any(ad_keyword in keyword for ad_keyword in top_keywords_list):
# triggered_keywords[keyword] = current_turn
# else:
# messages = [{"role": "system", "content": system_message_without_ad}]
# for val in history:
# if val[0]:
# messages.append({"role": "user", "content": val[0]})
# if val[1]:
# messages.append({"role": "assistant", "content": val[1]})
# messages.append({"role": "user", "content": message})
# response = client.chat.completions.create(
# model="gpt-3.5-turbo",
# messages=messages,
# max_tokens=max_tokens,
# temperature=temperature,
# top_p=top_p,
# )
# state['triggered_keywords'] = triggered_keywords
# return response.choices[0].message.content, state
# demo = gr.ChatInterface(
# wrapper,
# additional_inputs=[
# gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
# gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
# gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
# gr.Slider(minimum=1, maximum=5, value=2, step=1, label="Window size"),
# gr.Slider(minimum=0.01, maximum=0.20, value=0.08, step=0.01, label="Distance threshold"),
# gr.Slider(minimum=1, maximum=5, value=2, step=1, label="Weight of keywords from users"),
# gr.Slider(minimum=0, maximum=2, value=0.5, step=0.5, label="Weight of triggered keywords"),
# gr.Textbox(label="api_key"),
# gr.State(value="state")
# ],
# )
# if __name__ == "__main__":
# demo.launch(share=True)