"""Gradio tool for merging a LoRA adapter into its base model, creating
quantized copies, and uploading the results to the Hugging Face Hub.

Requires an HF_TOKEN environment variable (it can be provided via a .env file).
"""
import os
import torch
import psutil
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoConfig
from peft import PeftModel, PeftConfig
from pathlib import Path
from huggingface_hub import login, create_repo, HfApi
from dotenv import load_dotenv
import gradio as gr
import queue

# Queue for log messages (currently unused; logs are collected in current_logs)
log_queue = queue.Queue()
current_logs = []


def log(msg):
    """Print a message and append it to the accumulated log."""
    print(msg)
    current_logs.append(msg)
    return "\n".join(current_logs)


def get_model_size_in_gb(model_name):
    """Estimate the model size in GB."""
    try:
        config = AutoConfig.from_pretrained(model_name)
        num_params = config.num_parameters if hasattr(config, 'num_parameters') else None
        if num_params is None:
            # Estimate the parameter count from the config
            if hasattr(config, 'num_hidden_layers') and hasattr(config, 'hidden_size'):
                # Rough estimate; may be inaccurate
                num_params = config.num_hidden_layers * config.hidden_size * config.hidden_size * 4
        if num_params:
            # Each parameter takes 2 bytes in float16
            size_in_gb = (num_params * 2) / (1024 ** 3)
            return size_in_gb
        else:
            # Fall back to a conservative estimate (roughly a 7B model)
            return 16
    except Exception as e:
        log(f"Could not estimate model size: {str(e)}")
        return 16  # Default to 16GB


def check_system_resources(model_name):
    """Check system resources and decide which device to use."""
    log("Checking system resources...")

    # System memory
    system_memory = psutil.virtual_memory()
    total_memory_gb = system_memory.total / (1024 ** 3)
    available_memory_gb = system_memory.available / (1024 ** 3)
    log(f"Total system memory: {total_memory_gb:.1f}GB")
    log(f"Available memory: {available_memory_gb:.1f}GB")

    # Estimate the memory required by the model
    model_size_gb = get_model_size_in_gb(model_name)
    required_memory_gb = model_size_gb * 2.5  # extra headroom for computation
    log(f"Estimated memory required: {required_memory_gb:.1f}GB")

    # Check whether CUDA is available
    if torch.cuda.is_available():
        gpu_name = torch.cuda.get_device_name(0)
        gpu_memory_gb = torch.cuda.get_device_properties(0).total_memory / (1024 ** 3)
        log(f"Found GPU: {gpu_name}")
        log(f"GPU memory: {gpu_memory_gb:.1f}GB")
        if gpu_memory_gb >= required_memory_gb:
            log("✅ GPU memory is sufficient; conversion will run on the GPU")
            return "cuda", gpu_memory_gb
        else:
            log(f"⚠️ Insufficient GPU memory (needs {required_memory_gb:.1f}GB, has {gpu_memory_gb:.1f}GB)")
    else:
        log("❌ No usable GPU detected")

    # Check whether CPU memory is sufficient
    if available_memory_gb >= required_memory_gb:
        log("✅ CPU memory is sufficient; conversion will run on the CPU")
        return "cpu", available_memory_gb
    else:
        raise MemoryError(f"❌ Insufficient system memory (needs {required_memory_gb:.1f}GB, available {available_memory_gb:.1f}GB)")


def setup_environment(model_name):
    """Set up the environment and return the device to use."""
    load_dotenv()
    hf_token = os.getenv('HF_TOKEN')
    if not hf_token:
        raise ValueError("Please set HF_TOKEN in the environment variables")
    login(hf_token)

    # Check system resources and decide which device to use
    device, available_memory = check_system_resources(model_name)
    return device


def create_hf_repo(repo_name, private=True):
    """Create a Hugging Face repository."""
    try:
        repo_url = create_repo(repo_name, private=private)
        log(f"Repository created: {repo_url}")
        return repo_url
    except Exception as e:
        log(f"Failed to create repository: {str(e)}")
        raise


def download_and_merge_model(base_model_name, lora_model_name, output_dir, device):
    """Load the base model, apply the LoRA adapter, merge the weights, and save the result."""
    log(f"Loading base model: {base_model_name}")
    try:
        # Load the base model first
        base_model = AutoModelForCausalLM.from_pretrained(
            base_model_name,
            torch_dtype=torch.float16,
            device_map={"": device}
        )
        # Load the tokenizer
        tokenizer = AutoTokenizer.from_pretrained(base_model_name)

        log(f"Loading LoRA model: {lora_model_name}")
        log("Base model config: " + str(base_model.config))

        # Load the adapter config
        adapter_config = PeftConfig.from_pretrained(lora_model_name)
        log("Adapter config: " + str(adapter_config))

        model = PeftModel.from_pretrained(base_model, lora_model_name)
        log("Merging LoRA weights")
        model = model.merge_and_unload()

        # Create the output directory
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Save the merged model
        log(f"Saving merged model to: {output_dir}")
        model.save_pretrained(output_dir)
        tokenizer.save_pretrained(output_dir)
        return output_dir
    except Exception as e:
        log(f"Error: {str(e)}")
        log(f"Error type: {type(e)}")
        import traceback
        log("Traceback:")
        log(traceback.format_exc())
        raise


def quantize_and_push_model(model_path, repo_id, bits=8):
    """Quantize the merged model with bitsandbytes and push it to the Hugging Face Hub.

    Note: bitsandbytes quantization requires a CUDA-capable GPU.
    """
    try:
        from transformers import BitsAndBytesConfig

        # Build the quantization config first; the model must be loaded with it,
        # since passing a quantization config to save_pretrained has no effect.
        if bits == 8:
            quantization_config = BitsAndBytesConfig(
                load_in_8bit=True,
                llm_int8_threshold=6.0
            )
        elif bits == 4:
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_quant_type="nf4"
            )
        else:
            raise ValueError(f"Unsupported quantization bit width: {bits}")

        log(f"Loading model for {bits}-bit quantization...")
        model = AutoModelForCausalLM.from_pretrained(
            model_path,
            quantization_config=quantization_config,
            device_map="auto"
        )
        tokenizer = AutoTokenizer.from_pretrained(model_path)

        # Save the quantized model locally
        quantized_model_path = f"{model_path}_q{bits}"
        model.save_pretrained(quantized_model_path)
        tokenizer.save_pretrained(quantized_model_path)

        # Push to the Hugging Face Hub
        log(f"Uploading the {bits}-bit quantized model to HuggingFace...")
        api = HfApi()
        api.upload_folder(
            folder_path=quantized_model_path,
            repo_id=repo_id,
            repo_type="model"
        )
        log(f"{bits}-bit quantized model uploaded")
    except Exception as e:
        log(f"Error during quantization or upload: {str(e)}")
        raise


def process_model(base_model, lora_model, repo_name, progress=gr.Progress()):
    """Main pipeline driven by the Gradio UI."""
    try:
        # Clear logs from previous runs
        current_logs.clear()

        # Set up the environment and check resources
        device = setup_environment(base_model)

        # Create the Hugging Face repository
        repo_url = create_hf_repo(repo_name)

        # Output directory for the merged model
        output_dir = os.path.join(".", "output", repo_name)

        progress(0.1, desc="Starting model conversion...")
        # Download and merge the model
        model_path = download_and_merge_model(base_model, lora_model, output_dir, device)

        progress(0.4, desc="Starting 8-bit quantization...")
        # Quantize and upload the model
        quantize_and_push_model(model_path, repo_name, bits=8)

        progress(0.7, desc="Starting 4-bit quantization...")
        quantize_and_push_model(model_path, repo_name, bits=4)

        final_message = f"All done! Model uploaded to: https://huggingface.co/{repo_name}"
        log(final_message)
        progress(1.0, desc="Finished")
        return "\n".join(current_logs)
    except Exception as e:
        error_message = f"Error during processing: {str(e)}"
        log(error_message)
        return "\n".join(current_logs)


def create_ui():
    """Build the Gradio UI."""
    with gr.Blocks(title="Model Conversion Tool") as app:
        gr.Markdown("""
        # 🤗 Model Conversion & Quantization Tool
        This tool helps you:
        1. Merge a base model with a LoRA adapter
        2. Create 4-bit and 8-bit quantized versions
        3. Upload everything to the HuggingFace Hub automatically
        """)

        with gr.Row():
            with gr.Column():
                base_model = gr.Textbox(
                    label="Base model path",
                    placeholder="e.g. Qwen/Qwen2.5-7B-Instruct",
                    value="Qwen/Qwen2.5-7B-Instruct"
                )
                lora_model = gr.Textbox(
                    label="LoRA model path",
                    placeholder="Enter your LoRA model path"
                )
                repo_name = gr.Textbox(
                    label="HuggingFace repository name",
                    placeholder="Enter the name of the repository to create"
                )
                convert_btn = gr.Button("Start Conversion", variant="primary")

            with gr.Column():
                output = gr.TextArea(
                    label="Logs",
                    placeholder="Processing logs will appear here...",
                    interactive=False,
                    autoscroll=True,
                    lines=20
                )

        # Wire up the click handler
        convert_btn.click(
            fn=process_model,
            inputs=[base_model, lora_model, repo_name],
            outputs=output
        )
    return app


if __name__ == "__main__":
    # Create and launch the Gradio UI
    app = create_ui()
    app.queue()
    app.launch()