import os
import shutil
import queue
import torch
import psutil
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel, PeftConfig
from pathlib import Path
from huggingface_hub import login, create_repo, HfApi
import gradio as gr

# Queue for log messages (reserved for streaming logs; not consumed yet).
log_queue = queue.Queue()
current_logs = []

def log(msg):
    """Unified logging helper: print the message and accumulate it for the UI."""
    print(msg)
    current_logs.append(msg)
    return "\n".join(current_logs)

def get_model_size_in_gb(model_name):
    """Estimate the model size in GB."""
    try:
        # Query the model metadata from the HuggingFace Hub.
        api = HfApi()
        model_info = api.model_info(model_name)
        # safetensors.total is a parameter count; assume fp16 weights (~2 bytes per parameter).
        return model_info.safetensors.total * 2 / (1024 ** 3)
    except Exception as e:
        log(f"Could not estimate model size: {str(e)}")
        return 1  # bypass the memory check

def check_system_resources(model_name):
    """Check system resources and decide which device to use."""
    log("Checking system resources...")
    # System memory information
    system_memory = psutil.virtual_memory()
    total_memory_gb = system_memory.total / (1024 ** 3)
    available_memory_gb = system_memory.available / (1024 ** 3)
    log(f"Total system memory: {total_memory_gb:.1f}GB")
    log(f"Available memory: {available_memory_gb:.1f}GB")

    # Estimate the memory needed for the model.
    model_size_gb = get_model_size_in_gb(model_name)
    required_memory_gb = model_size_gb * 2.5  # extra headroom for computation
    log(f"Estimated memory required for the model: {required_memory_gb:.1f}GB")

    # Check whether CUDA is available.
    if torch.cuda.is_available():
        gpu_name = torch.cuda.get_device_name(0)
        gpu_memory_gb = torch.cuda.get_device_properties(0).total_memory / (1024 ** 3)
        log(f"Found GPU: {gpu_name}")
        log(f"GPU memory: {gpu_memory_gb:.1f}GB")

        if gpu_memory_gb >= required_memory_gb:
            log("✅ Enough GPU memory, the GPU will be used for conversion")
            return "cuda", gpu_memory_gb
        else:
            log(f"⚠️ Not enough GPU memory (need {required_memory_gb:.1f}GB, have {gpu_memory_gb:.1f}GB)")
    else:
        log("❌ No usable GPU detected")

    # Fall back to the CPU if there is enough system memory.
    if available_memory_gb >= required_memory_gb:
        log("✅ Enough CPU memory, the CPU will be used for conversion")
        return "cpu", available_memory_gb
    else:
        raise MemoryError(f"❌ Not enough system memory (need {required_memory_gb:.1f}GB, available {available_memory_gb:.1f}GB)")

def setup_environment(model_name):
    # # Check system resources and decide which device to use
    # device, available_memory = check_system_resources(model_name)
    device = "cpu"
    return device

def create_hf_repo(repo_name, private=True):
    """Create a HuggingFace repository."""
    try:
        # Check whether the repository already exists.
        api = HfApi()
        if api.repo_exists(repo_name):
            log(f"Repository already exists: {repo_name}")
            raise ValueError(f"Repository already exists: {repo_name}, please use another name or delete the existing repository")
        repo_url = create_repo(repo_name, private=private)
        log(f"Repository created: {repo_url}")
        return repo_url
    except Exception as e:
        log(f"Failed to create repository: {str(e)}")
        raise

def download_and_merge_model(base_model_name, lora_model_name, output_dir, device):
    log(f"Loading base model: {base_model_name}")
    try:
        # Load the base model first.
        base_model = AutoModelForCausalLM.from_pretrained(
            base_model_name,
            torch_dtype=torch.float16,
            device_map={"": device}
        )
        # Load the tokenizer.
        tokenizer = AutoTokenizer.from_pretrained(base_model_name)

        log(f"Loading LoRA model: {lora_model_name}")
        log("Base model config: " + str(base_model.config))

        # Load the adapter configuration.
        adapter_config = PeftConfig.from_pretrained(lora_model_name)
        log("Adapter config: " + str(adapter_config))

        model = PeftModel.from_pretrained(base_model, lora_model_name)
        log("Merging LoRA weights")
        model = model.merge_and_unload()

        # Create the output directory.
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Save the merged model.
        log(f"Saving merged model to: {output_dir}")
        model.save_pretrained(output_dir)
        tokenizer.save_pretrained(output_dir)
        return output_dir
    except Exception as e:
        log(f"Error: {str(e)}")
        log(f"Error type: {type(e)}")
        import traceback
        log("Full traceback:")
        log(traceback.format_exc())
        raise

def quantize_and_push_model(model_path, repo_id, bits=8):
    """Quantize the merged model with bitsandbytes and push it to the HuggingFace Hub."""
    try:
        from transformers import BitsAndBytesConfig
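        # Note: bitsandbytes 8-bit/4-bit quantization generally expects a CUDA-capable GPU;
        # on a CPU-only machine the quantized model load below is likely to fail, and the
        # error will be surfaced through the log output.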

        # Build the bitsandbytes quantization config first, then load the merged model with it.
        if bits == 8:
            quantization_config = BitsAndBytesConfig(
                load_in_8bit=True,
                llm_int8_threshold=6.0
            )
        elif bits == 4:
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_quant_type="nf4"
            )
        else:
            raise ValueError(f"Unsupported quantization bit width: {bits}")

        log(f"Loading model for {bits}-bit quantization...")
        model = AutoModelForCausalLM.from_pretrained(
            model_path,
            quantization_config=quantization_config,
            device_map="auto"
        )
        tokenizer = AutoTokenizer.from_pretrained(model_path)

        # Save the quantized model (serializing bitsandbytes checkpoints requires a recent
        # transformers/bitsandbytes version).
        quantized_model_path = f"{model_path}_q{bits}"
        model.save_pretrained(quantized_model_path)
        tokenizer.save_pretrained(quantized_model_path)

        # Push to the HuggingFace Hub.
        log(f"Uploading the {bits}-bit quantized model to HuggingFace...")
        api = HfApi()
        api.upload_folder(
            folder_path=quantized_model_path,
            repo_id=repo_id,
            repo_type="model"
        )
        log(f"{bits}-bit quantized model upload finished")
    except Exception as e:
        log(f"Error during quantization or upload: {str(e)}")
        raise

def process_model(base_model, lora_model, repo_name, hf_token, progress=gr.Progress()):
    """Main entry point driven by the Gradio interface."""
    try:
        login(hf_token)
        # Not sure why login() alone is not enough; the token also has to live in the environment.
        os.environ["HF_TOKEN"] = hf_token
        api = HfApi(token=hf_token)
        username = api.whoami()["name"]
        if repo_name == "Auto":
            repo_name = username + "/" + base_model.split("/")[-1] + "_" + lora_model.split("/")[-1]
        # Clear logs from the previous run.
        current_logs.clear()

        # Set up the environment and pick a device.
        device = setup_environment(base_model)
        # Create the HuggingFace repository.
        repo_url = create_hf_repo(repo_name)
        # Output directory for the merged model.
        output_dir = os.path.join(".", "output", repo_name)

        progress(0.1, desc="Starting the model conversion pipeline...")
        # Download and merge the model.
        model_path = download_and_merge_model(base_model, lora_model, output_dir, device)

        # Push the merged model to HuggingFace.
        log("Uploading the merged model to HuggingFace...")
        api.upload_folder(
            folder_path=model_path,
            repo_id=repo_name,
            repo_type="model"
        )

        progress(0.4, desc="Starting 8-bit quantization...")
        # Quantize and upload the model.
        quantize_and_push_model(model_path, repo_name, bits=8)

        progress(0.7, desc="Starting 4-bit quantization...")
        quantize_and_push_model(model_path, repo_name, bits=4)

        final_message = f"All done! The model has been uploaded to: https://huggingface.co/{repo_name}"
        log(final_message)
        progress(1.0, desc="Finished")

        # Remove hf_token from the environment.
        os.environ.pop("HF_TOKEN")
        log("HF_TOKEN has been removed from the environment variables")
        # Remove the local model directory.
        shutil.rmtree(model_path)
        log(f"Local model directory deleted: {model_path}")
        return "\n".join(current_logs)
    except Exception as e:
        error_message = f"Error during processing: {str(e)}"
        log(error_message)
        return "\n".join(current_logs)

def create_ui():
    """Create the Gradio interface."""
    with gr.Blocks(title="Model Conversion Tool") as app:
        gr.Markdown("""
        # 🤗 Model Conversion and Quantization Tool

        This tool helps you:
        1. Merge a base model with a LoRA adapter
        2. Create 4-bit and 8-bit quantized versions
        3. Upload everything to the HuggingFace Hub automatically
        """)

        with gr.Row():
            with gr.Column():
                base_model = gr.Textbox(
                    label="Base model path",
                    placeholder="e.g. Qwen/Qwen2.5-14B-Instruct",
                    value="Qwen/Qwen2.5-7B-Instruct"
                )
                lora_model = gr.Textbox(
                    label="LoRA model path",
                    placeholder="Enter the path of your LoRA model"
                )
                repo_name = gr.Textbox(
                    label="HuggingFace repository name",
                    placeholder="Enter the name of the repository to create",
                    value="Auto"
                )
                hf_token = gr.Textbox(
                    label="HuggingFace Token",
                    placeholder="Enter your HuggingFace token"
                )
                convert_btn = gr.Button("Start conversion", variant="primary")

            with gr.Column():
                output = gr.TextArea(
                    label="Processing log",
                    placeholder="Processing logs will appear here...",
                    interactive=False,
                    autoscroll=True,
                    lines=20
                )

        # Wire up the click handler.
        convert_btn.click(
            fn=process_model,
            inputs=[base_model, lora_model, repo_name, hf_token],
            outputs=output
        )

    return app

if __name__ == "__main__":
    # Create and launch the Gradio interface.
    app = create_ui()
    app.queue()
    app.launch()
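
# A minimal sketch of calling the pipeline programmatically (bypassing the Gradio UI).
# The adapter repository and token below are placeholders, not values from this project.
#
#     logs = process_model(
#         base_model="Qwen/Qwen2.5-7B-Instruct",
#         lora_model="your-username/your-lora-adapter",  # hypothetical adapter repo
#         repo_name="Auto",                              # derives "<user>/<base>_<lora>"
#         hf_token="hf_xxx",                             # your HuggingFace access token
#     )
#     print(logs)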