import os
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from peft import PeftModel, PeftConfig
from huggingface_hub import login, create_repo, HfApi
import gradio as gr
import time
import shutil
from gradio_log import Log
import logging
MEMORY = int(os.getenv("MEMORY", "16Gi")[:-2])  # e.g. "64Gi" -> 64 (strip the "Gi" suffix)
CPU_CORES = int(os.getenv("CPU_CORES", "4"))  # 4
SPACE_AUTHOR_NAME = os.getenv("SPACE_AUTHOR_NAME", "Steven10429") # str
SPACE_REPO_NAME = os.getenv("SPACE_REPO_NAME", "apply_lora_and_quantize") # str
SPACE_ID = os.getenv("SPACE_ID", "apply_lora_and_quantize") # str
# Global logger
log = logging.getLogger("space_convert")
log.setLevel(logging.INFO)
log.addHandler(logging.StreamHandler())
log.addHandler(logging.FileHandler("convert.log"))
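# convert.log is also tailed by the gradio_log Log component in the UI below.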
def timeit(func):
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
log.info(f"{func.__name__}: {end_time - start_time:.2f} s")
return result
return wrapper
@timeit
def get_model_size_in_gb(model_name):
"""通过 Hugging Face Hub 元数据估算模型大小(GB)"""
try:
api = HfApi()
model_info = api.model_info(model_name)
        # Use the safetensors total size (no assumption about file extensions)
return model_info.safetensors.total / (1024 ** 3)
except Exception as e:
log.error(f"Unable to estimate model size: {e}")
        return 1  # fallback estimate (GB)
@timeit
def check_system_resources(model_name):
"""检查系统资源,决定使用 CPU 或 GPU"""
log.info("Checking system resources...")
log.info(f"Total CPU cores: {CPU_CORES}")
log.info(f"Total system memory: {MEMORY}GB")
model_size_gb = get_model_size_in_gb(model_name)
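    # Rough heuristic: merging needs the weights plus working copies, so require ~2.5x the checkpoint size.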
required_memory_gb = model_size_gb * 2.5
log.info(f"Estimated required memory for model: {required_memory_gb:.1f}GB")
# if torch.cuda.is_available(): # failed with torch complie without GPU FLAG
# gpu_name = torch.cuda.get_device_name(0)
# gpu_memory_gb = torch.cuda.get_device_properties(0).total_memory / (1024 ** 3)
# log.info(f"Detected GPU: {gpu_name} with {gpu_memory_gb:.1f}GB memory")
# if gpu_memory_gb >= required_memory_gb:
# log.info("✅ Sufficient GPU memory available; using GPU.")
# return "cuda", gpu_memory_gb
# else:
# log.warning(f"⚠️ Insufficient GPU memory (requires {required_memory_gb:.1f}GB, found {gpu_memory_gb:.1f}GB).")
# else:
# log.error("❌ No GPU detected.")
# just use CPU, it's enough for merge and quantize
if MEMORY >= required_memory_gb:
log.info("✅ Sufficient CPU memory available; using CPU.")
return "cpu", MEMORY
else:
log.warning(f"⚠️ Insufficient CPU memory (requires {required_memory_gb:.1f}GB, found {MEMORY}GB).")
log.error("❌ No CPU detected.")
log.error("Will try low memory mode, but it may fail.")
return "cpu", MEMORY
@timeit
def setup_environment(model_name):
"""选择模型转换时使用的设备"""
try:
device, _ = check_system_resources(model_name)
except Exception as e:
log.error(f"Resource check failed: {e}. Defaulting to CPU.")
device = "cpu"
return device
@timeit
def create_hf_repo(repo_name, private=True):
"""创建 Hugging Face 仓库(如果不存在的话)"""
try:
api = HfApi()
        # If the repository already exists, append an incrementing index until the name is free
if api.repo_exists(repo_name):
retry_index = 0
repo_name_with_index = repo_name
while api.repo_exists(repo_name_with_index):
retry_index += 1
log.info(f"Repository {repo_name_with_index} exists; trying {repo_name}_{retry_index}")
repo_name_with_index = f"{repo_name}_{retry_index}"
repo_name = repo_name_with_index
repo_url = create_repo(repo_name, private=private)
log.info(f"Repository created successfully: {repo_url}")
return repo_name
except Exception as e:
log.error(f"Failed to create repository: {e}")
raise
@timeit
def download_and_merge_model(base_model_name, lora_model_name, output_dir, device):
"""
1. 先加载 adapter 的 tokenizer 获取其词表大小
2. 加载 base tokenizer 用于后续合并词表
3. 加载 base 模型,并将嵌入层调整至 adapter 词表大小
4. 使用高层 API 加载 LoRA adapter 并合并其权重
5. 求 base 与 adapter tokenizer 的词表并取并集,扩展 tokenizer
6. 调整合并模型嵌入层尺寸并保存
"""
os.makedirs("temp", exist_ok=True)
bnb_config = BitsAndBytesConfig(load_in_8bit=True)
log.info("Loading base model...")
model = AutoModelForCausalLM.from_pretrained(base_model_name, low_cpu_mem_usage=True, device_map="auto", force_download=True, trust_remote_code=True, quantization_config=bnb_config, cache_dir="temp")
log.info("Loading adapter tokenizer...")
adapter_tokenizer = AutoTokenizer.from_pretrained(lora_model_name, trust_remote_code=True, device_map="auto", force_download=True)
log.info("Resizing token embeddings...")
added_tokens_decoder = adapter_tokenizer.added_tokens_decoder
model.resize_token_embeddings(adapter_tokenizer.vocab_size + len(added_tokens_decoder))
log.info("Loading LoRA adapter...")
peft_model = PeftModel.from_pretrained(model, lora_model_name, low_cpu_mem_usage=True, device_map="auto", force_download=True, trust_remote_code=True, quantization_config=bnb_config, cache_dir="temp")
log.info("Merging and unloading model...")
model = peft_model.merge_and_unload()
log.info("Saving model...")
model.save_pretrained(output_dir)
adapter_tokenizer.save_pretrained(output_dir)
del model, peft_model
shutil.rmtree("temp") # to save space due to huggingface space limit(50GB)
return output_dir
@timeit
def clone_llamacpp_and_download_build():
"""克隆 llama.cpp 并下载最新构建"""
llamacpp_repo = "https://github.com/ggerganov/llama.cpp.git"
llamacpp_dir = os.path.join(os.getcwd(), "llama.cpp")
if not os.path.exists(llamacpp_dir):
log.info(f"Cloning llama.cpp from {llamacpp_repo}...")
os.system(f"git clone {llamacpp_repo} {llamacpp_dir}")
log.info("Building llama.cpp...")
build_dir = os.path.join(llamacpp_dir, "build")
os.makedirs(build_dir, exist_ok=True)
"""
cmake -B build
cmake --build build --config Release
"""
    # Configure and build from the llama.cpp source directory (the binaries end up in build/bin)
    os.chdir(llamacpp_dir)
os.system("cmake -B build")
os.system("cmake --build build --config Release")
log.info("llama.cpp build completed.")
    # Return to the original working directory
os.chdir(os.path.dirname(llamacpp_dir))
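# Auto-generated repo names are derived from model ids, which may contain '/', ':' or '.';
# replace these so the name works both as a local directory and as a Hub repo name.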
def remove_illegal_chars_in_path(text):
return text.replace(".", "_").replace(":", "_").replace("/", "_")
@timeit
def quantize(model_path, repo_id, quant_method=None):
"""
利用 llama-cpp-python 对模型进行量化,并上传到 Hugging Face Hub。
使用的量化预设:
- 8-bit: Q8_0
- 4-bit: Q4_K_M 或 Q4_K_L
- 2-bit: Q2_K_L
模型输入(model_path)应为全精度(例如 fp16)的 GGUF 文件,
输出文件将保存为 <model_path>_q{bits}_{quant_method}
"""
    # Use llama.cpp's conversion and quantization tools
llamacpp_dir = os.path.join(os.getcwd(), "llama.cpp")
if not os.path.exists(llamacpp_dir):
clone_llamacpp_and_download_build()
    # Make sure the quantized output directory exists
model_output_dir = f"{model_path}/quantized/"
os.makedirs(model_output_dir, exist_ok=True)
    # The intermediate Q8_0 GGUF is written to the current working directory
    gguf_q8_path = f"./{repo_id}-q8_0.gguf"
    if not os.path.exists(gguf_q8_path):
        log.info("Converting the merged model to GGUF (Q8_0)...")
        convert_script = os.path.join(llamacpp_dir, "convert_hf_to_gguf.py")
        convert_cmd = f"python {convert_script} {model_path} --outfile {gguf_q8_path} --outtype q8_0"
        log.info(f"syscall:[{convert_cmd}]")
        os.system(convert_cmd)
    else:
        log.info("Intermediate GGUF file already exists; skipping conversion.")
    if quant_method.lower() == "q8_0":
        return gguf_q8_path  # the Q8_0 file itself gets uploaded to the Hub
    # The final quantized file goes into the quantized/ subdirectory
    final_path = os.path.join(model_output_dir, f"{repo_id}-{quant_method}.gguf")
    log.info(f"Running {quant_method} quantization...")
    quantize_bin = os.path.join(llamacpp_dir, "build", "bin", "llama-quantize")
    quant_cmd = f"{quantize_bin} {gguf_q8_path} {final_path} {quant_method}"
    log.info(f"syscall:[{quant_cmd}]")
    if not os.path.exists(final_path):
        os.system(quant_cmd)
    else:
        log.info(f"{quant_method} quantized file already exists; skipping quantization.")
return None
return final_path
def create_readme(repo_name, base_model_name, lora_model_name, quant_methods):
readme_path = os.path.join("output", repo_name, "README.md")
readme_template = """---
tags:
- autotrain
- text-generation-inference
- text-generation
- peft{quantization}
library_name: transformers
base_model: {base_model_name}
widget:
- messages:
- role: user
content: What is your favorite condiment?
license: other
---
# Model
{repo_name}
## Details:
- base_model: [{base_model_name}](https://huggingface.co/{base_model_name})
- lora_model: [{lora_model_name}](https://huggingface.co/{lora_model_name})
- quant_methods: {quant_methods}
- created_at: {created_at}
- created_by: [Steven10429/apply_lora_and_quantize](https://github.com/Steven10429/apply_lora_and_quantize)
""".format(
quantization="\n- quantization" if len(quant_methods) > 0 else "",
base_model_name=base_model_name,
lora_model_name=lora_model_name,
repo_name=repo_name,
quant_methods=quant_methods,
created_at=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
)
with open(readme_path, "w") as f:
f.write(readme_template)
@timeit
def process_model(base_model_name, lora_model_name, repo_name, quant_methods, hf_token):
"""
主处理函数:
1. 登录并(必要时)创建 Hugging Face 仓库;
2. 设置设备;
3. 下载并合并 base 模型与 LoRA adapter;
4. 异步上传合并后的模型;
5. 同时启动四个量化任务(8-bit、2-bit、4-bit 两种模式);
6. 最后统一等待所有 Future 完成,再返回日志。
"""
try:
if hf_token.strip().lower() == "auto":
hf_token = os.getenv("HF_TOKEN")
elif hf_token.startswith("hf_"):
os.environ["HF_TOKEN"] = hf_token
login(hf_token)
api = HfApi(token=hf_token)
username = api.whoami()["name"]
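        # "Auto" resolves the base model from the adapter's PEFT config (base_model_name_or_path).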
if base_model_name.strip().lower() == "auto":
adapter_config = PeftConfig.from_pretrained(lora_model_name)
base_model_name = adapter_config.base_model_name_or_path
if repo_name.strip().lower() == "auto":
repo_name = f"{base_model_name.split('/')[-1]}_{lora_model_name.split('/')[-1]}"
repo_name = remove_illegal_chars_in_path(repo_name)
device = setup_environment(base_model_name)
repo_name = create_hf_repo(repo_name)
output_dir = os.path.join(".", "output", repo_name)
log.info("Starting model merge process...")
model_path = download_and_merge_model(base_model_name, lora_model_name, output_dir, device)
create_readme(repo_name, base_model_name, lora_model_name, quant_methods)
        # Upload the merged model (the quantized files are uploaded separately below)
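        # upload_large_folder is resumable and uses multiple workers, which suits multi-GB checkpoints.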
api.upload_large_folder(
folder_path=model_path,
repo_id=repo_name,
repo_type="model",
            num_workers=max(os.cpu_count() or 4, 4),
print_report_every=10,
)
log.info("Upload completed.")
if len(quant_methods) > 0:
quantize(output_dir, repo_name, "Q8_0")
# remove model for space limit
shutil.rmtree(model_path)
log.info("Removed model from local")
os.makedirs(os.path.join(output_dir, "quantized"), exist_ok=True)
if len(quant_methods) > 0:
for quant_method in quant_methods:
quantize(output_dir, repo_name, quant_method=quant_method)
os.system(f"mv ./{repo_name}-f16.gguf ./{output_dir}/quantized/")
api.upload_large_folder(
folder_path=os.path.join(output_dir, "quantized"),
repo_id=repo_name,
repo_type="model",
                num_workers=max(os.cpu_count() or 4, 4),
print_report_every=10,
)
# rm -rf model_path
shutil.rmtree(model_path)
log.info("Removed model from local")
except Exception as e:
error_message = f"Error during processing: {e}"
log.error(error_message)
raise e
@timeit
def create_ui():
"""创建 Gradio 界面,仅展示日志"""
with gr.Blocks(title="Model Merge & Quantization Tool") as app:
gr.Markdown("""
# 🤗 Model Merge and Quantization Tool
        This tool merges a base model with a LoRA adapter, creates quantized GGUF versions
        (Q2_K, Q4_K, IQ4_NL, Q5_K_M, Q6_K and Q8_0 via llama.cpp), and uploads them to the Hugging Face Hub.
""")
with gr.Row():
with gr.Column():
base_model = gr.Textbox(
label="Base Model Path",
placeholder="e.g., Qwen/Qwen2.5-14B-Instruct",
value="Auto"
)
lora_model = gr.Textbox(
label="LoRA Model Path",
placeholder="Enter the path to your LoRA model"
)
repo_name = gr.Textbox(
label="Hugging Face Repository Name",
placeholder="Enter the repository name to create",
value="Auto",
)
quant_method = gr.CheckboxGroup(
choices=["Q2_K", "Q4_K", "IQ4_NL", "Q5_K_M", "Q6_K", "Q8_0"],
value=["Q2_K", "Q4_K", "IQ4_NL", "Q5_K_M", "Q6_K", "Q8_0"],
label="Quantization Method"
)
hf_token = gr.Textbox(
label="Hugging Face Token",
placeholder="Enter your Hugging Face Token",
value="Auto",
type="password"
)
convert_btn = gr.Button("Start Conversion", variant="primary")
with gr.Column():
Log("convert.log", dark=True, xterm_font_size=12)
convert_btn.click(
fn=process_model,
inputs=[base_model, lora_model, repo_name, quant_method, hf_token],
)
return app
if __name__ == "__main__":
app = create_ui()
app.queue()
app.launch()