Steven10429's picture
init
260542b
raw
history blame
10.3 kB
import os
import torch
import psutil
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoConfig
from peft import PeftModel, PeftConfig
from pathlib import Path
from tqdm import tqdm
from huggingface_hub import login, create_repo, HfApi
import subprocess
import math
from dotenv import load_dotenv
import gradio as gr
import threading
import queue
import time
# 创建一个队列用于存储日志消息
log_queue = queue.Queue()
current_logs = []
def log(msg):
"""统一的日志处理函数"""
print(msg)
current_logs.append(msg)
return "\n".join(current_logs)
def get_model_size_in_gb(model_name):
"""估算模型大小(以GB为单位)"""
try:
config = AutoConfig.from_pretrained(model_name)
num_params = config.num_parameters if hasattr(config, 'num_parameters') else None
if num_params is None:
# 手动计算参数量
if hasattr(config, 'num_hidden_layers') and hasattr(config, 'hidden_size'):
# 简单估算,可能不够准确
num_params = config.num_hidden_layers * config.hidden_size * config.hidden_size * 4
if num_params:
# 每个参数占用2字节(float16)
size_in_gb = (num_params * 2) / (1024 ** 3)
return size_in_gb
else:
# 如果无法计算,返回一个保守的估计
return 16 # 默认假设是7B模型
except Exception as e:
log(f"无法估算模型大小: {str(e)}")
return 16 # 默认返回16GB
def check_system_resources(model_name):
"""检查系统资源并决定使用什么设备"""
log("正在检查系统资源...")
# 获取系统内存信息
system_memory = psutil.virtual_memory()
total_memory_gb = system_memory.total / (1024 ** 3)
available_memory_gb = system_memory.available / (1024 ** 3)
log(f"系统总内存: {total_memory_gb:.1f}GB")
log(f"可用内存: {available_memory_gb:.1f}GB")
# 估算模型所需内存
model_size_gb = get_model_size_in_gb(model_name)
required_memory_gb = model_size_gb * 2.5 # 需要额外的内存用于计算
log(f"估计模型需要内存: {required_memory_gb:.1f}GB")
# 检查CUDA是否可用
if torch.cuda.is_available():
gpu_name = torch.cuda.get_device_name(0)
gpu_memory_gb = torch.cuda.get_device_properties(0).total_memory / (1024 ** 3)
log(f"发现GPU: {gpu_name}")
log(f"GPU显存: {gpu_memory_gb:.1f}GB")
if gpu_memory_gb >= required_memory_gb:
log("✅ GPU显存足够,将使用GPU进行转换")
return "cuda", gpu_memory_gb
else:
log(f"⚠️ GPU显存不足 (需要 {required_memory_gb:.1f}GB, 实际 {gpu_memory_gb:.1f}GB)")
else:
log("❌ 未检测到可用的GPU")
# 检查CPU内存是否足够
if available_memory_gb >= required_memory_gb:
log("✅ CPU内存足够,将使用CPU进行转换")
return "cpu", available_memory_gb
else:
raise MemoryError(f"❌ 系统内存不足 (需要 {required_memory_gb:.1f}GB, 可用 {available_memory_gb:.1f}GB)")
def setup_environment(model_name):
"""设置环境并返回设备信息"""
load_dotenv()
hf_token = os.getenv('HF_TOKEN')
if not hf_token:
raise ValueError("请在环境变量中设置HF_TOKEN")
login(hf_token)
# 检查系统资源并决定使用什么设备
device, available_memory = check_system_resources(model_name)
return device
def create_hf_repo(repo_name, private=True):
"""创建HuggingFace仓库"""
try:
repo_url = create_repo(repo_name, private=private)
log(f"创建仓库成功: {repo_url}")
return repo_url
except Exception as e:
log(f"创建仓库失败: {str(e)}")
raise
def download_and_merge_model(base_model_name, lora_model_name, output_dir, device):
log(f"正在加载基础模型: {base_model_name}")
try:
# 先加载原始模型
base_model = AutoModelForCausalLM.from_pretrained(
base_model_name,
torch_dtype=torch.float16,
device_map={"": device}
)
# 加载tokenizer
tokenizer = AutoTokenizer.from_pretrained(base_model_name)
log(f"正在加载LoRA模型: {lora_model_name}")
log("基础模型配置:" + str(base_model.config))
# 加载adapter配置
adapter_config = PeftConfig.from_pretrained(lora_model_name)
log("Adapter配置:" + str(adapter_config))
model = PeftModel.from_pretrained(base_model, lora_model_name)
log("正在合并LoRA权重")
model = model.merge_and_unload()
# 创建输出目录
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# 保存合并后的模型
log(f"正在保存合并后的模型到: {output_dir}")
model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)
return output_dir
except Exception as e:
log(f"错误: {str(e)}")
log(f"错误类型: {type(e)}")
import traceback
log("详细错误信息:")
log(traceback.format_exc())
raise
def quantize_and_push_model(model_path, repo_id, bits=8):
"""量化模型并推送到HuggingFace"""
try:
from optimum.bettertransformer import BetterTransformer
from transformers import AutoModelForCausalLM
log(f"正在加载模型用于{bits}位量化...")
model = AutoModelForCausalLM.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path)
# 转换为BetterTransformer格式
model = BetterTransformer.transform(model)
# 量化
if bits == 8:
from transformers import BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
load_in_8bit=True,
llm_int8_threshold=6.0
)
elif bits == 4:
from transformers import BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_quant_type="nf4"
)
else:
raise ValueError(f"不支持的量化位数: {bits}")
# 保存量化后的模型
quantized_model_path = f"{model_path}_q{bits}"
model.save_pretrained(
quantized_model_path,
quantization_config=quantization_config
)
tokenizer.save_pretrained(quantized_model_path)
# 推送到HuggingFace
log(f"正在将{bits}位量化模型推送到HuggingFace...")
api = HfApi()
api.upload_folder(
folder_path=quantized_model_path,
repo_id=repo_id,
repo_type="model"
)
log(f"{bits}位量化模型上传完成")
except Exception as e:
log(f"量化或上传过程中出错: {str(e)}")
raise
def process_model(base_model, lora_model, repo_name, progress=gr.Progress()):
"""处理模型的主函数,用于Gradio界面"""
try:
# 清空之前的日志
current_logs.clear()
# 设置环境和检查资源
device = setup_environment(base_model)
# 创建HuggingFace仓库
repo_url = create_hf_repo(repo_name)
# 设置输出目录
output_dir = os.path.join(".", "output", repo_name)
progress(0.1, desc="开始模型转换流程...")
# 下载并合并模型
model_path = download_and_merge_model(base_model, lora_model, output_dir, device)
progress(0.4, desc="开始8位量化...")
# 量化并上传模型
quantize_and_push_model(model_path, repo_name, bits=8)
progress(0.7, desc="开始4位量化...")
quantize_and_push_model(model_path, repo_name, bits=4)
final_message = f"全部完成!模型已上传至: https://huggingface.co/{repo_name}"
log(final_message)
progress(1.0, desc="处理完成")
return "\n".join(current_logs)
except Exception as e:
error_message = f"处理过程中出错: {str(e)}"
log(error_message)
return "\n".join(current_logs)
def create_ui():
"""创建Gradio界面"""
with gr.Blocks(title="模型转换工具") as app:
gr.Markdown("""
# 🤗 模型转换与量化工具
这个工具可以帮助你:
1. 合并基础模型和LoRA适配器
2. 创建4位和8位量化版本
3. 自动上传到HuggingFace Hub
""")
with gr.Row():
with gr.Column():
base_model = gr.Textbox(
label="基础模型路径",
placeholder="例如: Qwen/Qwen2.5-7B-Instruct",
value="Qwen/Qwen2.5-7B-Instruct"
)
lora_model = gr.Textbox(
label="LoRA模型路径",
placeholder="输入你的LoRA模型路径"
)
repo_name = gr.Textbox(
label="HuggingFace仓库名称",
placeholder="输入要创建的仓库名称"
)
convert_btn = gr.Button("开始转换", variant="primary")
with gr.Column():
output = gr.TextArea(
label="处理日志",
placeholder="处理日志将在这里显示...",
interactive=False,
autoscroll=True,
lines=20
)
# 设置事件处理
convert_btn.click(
fn=process_model,
inputs=[base_model, lora_model, repo_name],
outputs=output
)
return app
if __name__ == "__main__":
# 创建并启动Gradio界面
app = create_ui()
app.queue()
app.launch()