Steven10429's picture
Update app.py
85f845d verified
raw
history blame
10.8 kB
import os
import torch
import psutil
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoConfig
from peft import PeftModel, PeftConfig
from pathlib import Path
from tqdm import tqdm
from huggingface_hub import login, create_repo, HfApi
import subprocess
import math
import gradio as gr
import threading
import queue
import time
# 创建一个队列用于存储日志消息
log_queue = queue.Queue()
current_logs = []
def log(msg):
"""统一的日志处理函数"""
print(msg)
current_logs.append(msg)
return "\n".join(current_logs)
def get_model_size_in_gb(model_name):
"""估算模型大小(以GB为单位)"""
try:
# get model size from huggingface
api = HfApi()
model_info = api.model_info(model_name)
return model_info.safetensors.total / (1024 ** 3)
except Exception as e:
log(f"无法估算模型大小: {str(e)}")
return 1 # bypass memory check
def check_system_resources(model_name):
"""检查系统资源并决定使用什么设备"""
log("正在检查系统资源...")
# 获取系统内存信息
system_memory = psutil.virtual_memory()
total_memory_gb = system_memory.total / (1024 ** 3)
available_memory_gb = system_memory.available / (1024 ** 3)
log(f"系统总内存: {total_memory_gb:.1f}GB")
log(f"可用内存: {available_memory_gb:.1f}GB")
# 估算模型所需内存
model_size_gb = get_model_size_in_gb(model_name)
required_memory_gb = model_size_gb * 2.5 # 需要额外的内存用于计算
log(f"估计模型需要内存: {required_memory_gb:.1f}GB")
# 检查CUDA是否可用
if torch.cuda.is_available():
gpu_name = torch.cuda.get_device_name(0)
gpu_memory_gb = torch.cuda.get_device_properties(0).total_memory / (1024 ** 3)
log(f"发现GPU: {gpu_name}")
log(f"GPU显存: {gpu_memory_gb:.1f}GB")
if gpu_memory_gb >= required_memory_gb:
log("✅ GPU显存足够,将使用GPU进行转换")
return "cuda", gpu_memory_gb
else:
log(f"⚠️ GPU显存不足 (需要 {required_memory_gb:.1f}GB, 实际 {gpu_memory_gb:.1f}GB)")
else:
log("❌ 未检测到可用的GPU")
# 检查CPU内存是否足够
if available_memory_gb >= required_memory_gb:
log("✅ CPU内存足够,将使用CPU进行转换")
return "cpu", available_memory_gb
else:
raise MemoryError(f"❌ 系统内存不足 (需要 {required_memory_gb:.1f}GB, 可用 {available_memory_gb:.1f}GB)")
def setup_environment(model_name):
# # 检查系统资源并决定使用什么设备
# device, available_memory = check_system_resources(model_name)
device = "cpu"
return device
def create_hf_repo(repo_name, private=True):
"""创建HuggingFace仓库"""
try:
# check if repo already exists
api = HfApi()
if api.repo_exists(repo_name):
log(f"仓库已存在: {repo_name}")
return ValueError(f"仓库已存在: {repo_name}, 请使用其他名称或删除已存在的仓库")
repo_url = create_repo(repo_name, private=private)
log(f"创建仓库成功: {repo_url}")
return repo_url
except Exception as e:
log(f"创建仓库失败: {str(e)}")
raise
def download_and_merge_model(base_model_name, lora_model_name, output_dir, device):
log(f"正在加载基础模型: {base_model_name}")
try:
# 先加载原始模型
base_model = AutoModelForCausalLM.from_pretrained(
base_model_name,
torch_dtype=torch.float16,
device_map={"": device}
)
# 加载tokenizer
tokenizer = AutoTokenizer.from_pretrained(base_model_name)
log(f"正在加载LoRA模型: {lora_model_name}")
log("基础模型配置:" + str(base_model.config))
# 加载adapter配置
adapter_config = PeftConfig.from_pretrained(lora_model_name)
log("Adapter配置:" + str(adapter_config))
model = PeftModel.from_pretrained(base_model, lora_model_name)
log("正在合并LoRA权重")
model = model.merge_and_unload()
# 创建输出目录
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# 保存合并后的模型
log(f"正在保存合并后的模型到: {output_dir}")
model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)
return output_dir
except Exception as e:
log(f"错误: {str(e)}")
log(f"错误类型: {type(e)}")
import traceback
log("详细错误信息:")
log(traceback.format_exc())
raise
def quantize_and_push_model(model_path, repo_id, bits=8):
"""量化模型并推送到HuggingFace"""
try:
from optimum.bettertransformer import BetterTransformer
from transformers import AutoModelForCausalLM
log(f"正在加载模型用于{bits}位量化...")
model = AutoModelForCausalLM.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path)
# 转换为BetterTransformer格式
model = BetterTransformer.transform(model)
# 量化
if bits == 8:
from transformers import BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
load_in_8bit=True,
llm_int8_threshold=6.0
)
elif bits == 4:
from transformers import BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_quant_type="nf4"
)
else:
raise ValueError(f"不支持的量化位数: {bits}")
# 保存量化后的模型
quantized_model_path = f"{model_path}_q{bits}"
model.save_pretrained(
quantized_model_path,
quantization_config=quantization_config
)
tokenizer.save_pretrained(quantized_model_path)
# 推送到HuggingFace
log(f"正在将{bits}位量化模型推送到HuggingFace...")
api = HfApi()
api.upload_folder(
folder_path=quantized_model_path,
repo_id=repo_id,
repo_type="model"
)
log(f"{bits}位量化模型上传完成")
except Exception as e:
log(f"量化或上传过程中出错: {str(e)}")
raise
def process_model(base_model, lora_model, repo_name, hf_token, progress=gr.Progress()):
"""处理模型的主函数,用于Gradio界面"""
try:
login(hf_token) # 我不理解为什么登录一次不行,非得放到环境变量里
os.environ["HF_TOKEN"] = hf_token
api = HfApi(token=hf_token)
username = api.whoami()["name"]
if repo_name == "Auto":
repo_name = username + "/" + base_model.split("/")[-1] + "_" + lora_model.split("/")[-1]
# 清空之前的日志
current_logs.clear()
# 设置环境和检查资源
device = setup_environment(base_model)
# 创建HuggingFace仓库
repo_url = create_hf_repo(repo_name)
# 设置输出目录
output_dir = os.path.join(".", "output", repo_name)
progress(0.1, desc="开始模型转换流程...")
# 下载并合并模型
model_path = download_and_merge_model(base_model, lora_model, output_dir, device)
# 推送到HuggingFace
log(f"正在将模型推送到HuggingFace...")
api.upload_folder(
folder_path=model_path,
repo_id=repo_name,
repo_type="model"
)
progress(0.4, desc="开始8位量化...")
# 量化并上传模型
quantize_and_push_model(model_path, repo_name, bits=8)
progress(0.7, desc="开始4位量化...")
quantize_and_push_model(model_path, repo_name, bits=4)
final_message = f"全部完成!模型已上传至: https://huggingface.co/{repo_name}"
log(final_message)
progress(1.0, desc="处理完成")
# remove hf_token from env
os.environ.pop("HF_TOKEN")
log("HF_TOKEN已从环境变量中删除")
# remove model_path
os.remove(model_path)
log(f"模型路径已删除: {model_path}")
return "\n".join(current_logs)
except Exception as e:
error_message = f"处理过程中出错: {str(e)}"
log(error_message)
return "\n".join(current_logs)
def create_ui():
"""创建Gradio界面"""
with gr.Blocks(title="模型转换工具") as app:
gr.Markdown("""
# 🤗 模型转换与量化工具
这个工具可以帮助你:
1. 合并基础模型和LoRA适配器
2. 创建4位和8位量化版本
3. 自动上传到HuggingFace Hub
""")
with gr.Row():
with gr.Column():
base_model = gr.Textbox(
label="基础模型路径",
placeholder="例如: Qwen/Qwen2.5-14B-Instruct",
value="Qwen/Qwen2.5-7B-Instruct"
)
lora_model = gr.Textbox(
label="LoRA模型路径",
placeholder="输入你的LoRA模型路径"
)
repo_name = gr.Textbox(
label="HuggingFace仓库名称",
placeholder="输入要创建的仓库名称",
value="Auto"
)
hf_token = gr.Textbox(
label="HuggingFace Token",
placeholder="输入你的HuggingFace Token"
)
convert_btn = gr.Button("开始转换", variant="primary")
with gr.Column():
output = gr.TextArea(
label="处理日志",
placeholder="处理日志将在这里显示...",
interactive=False,
autoscroll=True,
lines=20
)
# 设置事件处理
convert_btn.click(
fn=process_model,
inputs=[base_model, lora_model, repo_name, hf_token],
outputs=output
)
return app
if __name__ == "__main__":
# 创建并启动Gradio界面
app = create_ui()
app.queue()
app.launch()