#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Qwen3-Omni 智能GPU/CPU Offloading系統
功能: 使用Transformers accelerate的自動offloading,避免手動設備分配問題
策略: 讓accelerate庫自動處理設備間的數據傳輸
"""
import torch
import gc
import time
import warnings
import traceback
import psutil
from transformers import (
Qwen3OmniMoeForConditionalGeneration,
Qwen3OmniMoeProcessor,
)
from accelerate import init_empty_weights, load_checkpoint_and_dispatch
warnings.filterwarnings("ignore")
class SmartOffloadingRunner:
"""智能Offloading推理運行器"""
def __init__(self, model_path: str = "/var/www/qwen_model_quantized"):
self.model_path = model_path
self.model = None
self.processor = None
self.device = None
self.gpu_available = torch.cuda.is_available()
if self.gpu_available:
self.gpu_props = torch.cuda.get_device_properties(0)
self.total_gpu_memory = self.gpu_props.total_memory / 1024**3
            # Cap GPU memory usage, leaving headroom for activations and the KV cache
            self.max_gpu_memory = min(self.total_gpu_memory * 0.85, 24.0)  # at most 24 GB
else:
self.max_gpu_memory = 0
def get_optimal_device_map(self):
"""獲取最佳設備映射"""
if not self.gpu_available:
print("🖥️ GPU不可用,使用CPU模式")
return "cpu"
print(f"🔍 GPU: {self.gpu_props.name} ({self.total_gpu_memory:.1f}GB)")
print(f"📊 允許GPU使用: {self.max_gpu_memory:.1f}GB")
# 使用accelerate的自動offloading
device_map = "auto"
return device_map
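    # How device_map="auto" behaves here (sketch of accelerate's documented strategy):
    # accelerate fills GPU 0 up to the max_memory cap passed at load time, places modules
    # that do not fit on the CPU, and spills any remainder to the offload folder on disk.
    # A fully manual map would look roughly like the hypothetical example below
    # (module names are illustrative, not taken from this checkpoint):
    #   device_map = {"model.embed_tokens": 0, "model.layers.0": 0, "model.layers.40": "cpu"}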
def load_model_with_smart_offloading(self):
"""使用智能offloading載入模型"""
print("🚀 Qwen3-Omni 智能GPU/CPU Offloading系統")
print("=" * 60)
# 記憶體狀態
cpu_memory = psutil.virtual_memory().available / 1024**3
print(f"💾 可用記憶體: CPU {cpu_memory:.1f}GB", end="")
if self.gpu_available:
print(f", GPU {self.total_gpu_memory:.1f}GB")
else:
print()
print("\n📦 載入processor...")
self.processor = Qwen3OmniMoeProcessor.from_pretrained(
self.model_path,
trust_remote_code=True
)
        # Make sure the tokenizer has a pad token
        if self.processor.tokenizer.pad_token is None:
            self.processor.tokenizer.pad_token = self.processor.tokenizer.eos_token
        print("🧠 Loading model with smart offloading...")
start_time = time.time()
        # Pick the device map
        device_map = self.get_optimal_device_map()
        # Load the model
        try:
            if device_map == "cpu":
                # CPU-only mode
                self.device = "cpu"
                torch.set_num_threads(min(8, psutil.cpu_count()))
self.model = Qwen3OmniMoeForConditionalGeneration.from_pretrained(
self.model_path,
torch_dtype=torch.float32,
device_map="cpu",
trust_remote_code=True,
low_cpu_mem_usage=True,
)
                # Handle any weights still on the meta device
                has_meta = any(p.device.type == 'meta' for p in self.model.parameters())
                if has_meta:
                    print("⚠️ Materializing meta-device weights...")
                    self.model = self.model.to_empty(device="cpu")
                    print("✅ Meta-device weights initialized on CPU")
            else:
                # GPU+CPU offloading mode
                self.device = "cuda:0"
                # Per-device memory limits (keys: GPU index or "cpu", values: size strings);
                # cast to int since some accelerate versions do not parse fractional sizes like "20.4GB"
                max_memory = {0: f"{int(self.max_gpu_memory)}GB", "cpu": "60GB"}
self.model = Qwen3OmniMoeForConditionalGeneration.from_pretrained(
self.model_path,
torch_dtype=torch.float16,
device_map=device_map,
max_memory=max_memory,
trust_remote_code=True,
low_cpu_mem_usage=True,
                    offload_folder="./offload_cache",  # scratch folder for weights offloaded to disk
offload_state_dict=True,
)
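                # Per the accelerate/transformers docs: offload_folder holds weights that fit in
                # neither the GPU budget nor CPU RAM, while offload_state_dict streams the
                # CPU-bound state dict through disk during loading to reduce peak RAM usage.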
self.model.eval()
load_time = time.time() - start_time
print(f"✅ 模型載入完成! 用時: {load_time:.1f}秒")
# 顯示最終記憶體使用
print("📊 記憶體使用狀態:")
print(f" CPU: {psutil.virtual_memory().used / 1024**3:.1f}GB")
if self.gpu_available:
gpu_allocated = torch.cuda.memory_allocated() / 1024**3
print(f" GPU: {gpu_allocated:.1f}GB")
            # Summarize where accelerate placed the modules
            if hasattr(self.model, 'hf_device_map'):
                # hf_device_map values may be GPU indices (ints), "cpu", or "disk"
                gpu_layers = sum(1 for dev in self.model.hf_device_map.values() if str(dev).isdigit() or str(dev).startswith('cuda'))
                cpu_layers = sum(1 for dev in self.model.hf_device_map.values() if str(dev) == 'cpu')
                print(f"🎯 Device placement: GPU modules={gpu_layers}, CPU modules={cpu_layers}")
return True
except Exception as e:
print(f"❌ 載入失敗: {e}")
print("🔄 回退到CPU模式...")
return self.fallback_to_cpu()
def fallback_to_cpu(self):
"""回退到CPU模式"""
try:
self.device = "cpu"
torch.set_num_threads(6)
            # Skip device_map to avoid automatic placement issues
self.model = Qwen3OmniMoeForConditionalGeneration.from_pretrained(
self.model_path,
torch_dtype=torch.float32,
trust_remote_code=True,
low_cpu_mem_usage=True,
)
            # Handle any weights still on the meta device
            has_meta = any(p.device.type == 'meta' for p in self.model.parameters())
            if has_meta:
                print("⚠️ CPU mode: materializing meta-device weights...")
                self.model = self.model.to_empty(device="cpu")
                print("✅ CPU-mode load complete")
            else:
                # Make sure the whole model lives on the CPU
                self.model = self.model.to("cpu")
                print("✅ CPU-mode load complete")
self.model.eval()
return True
except Exception as e:
print(f"❌ CPU模式也失敗: {e}")
traceback.print_exc()
return False
def generate_response(self, prompt: str, max_tokens: int = 128) -> tuple:
"""生成回應"""
start_time = time.time()
# 準備輸入
inputs = self.processor.tokenizer(
prompt,
return_tensors="pt",
max_length=2048,
truncation=True
)
        # Decide which device should receive the inputs
        main_device = "cuda:0" if (self.gpu_available and hasattr(self.model, 'hf_device_map')) else "cpu"
        # Move the inputs onto the primary device
        if main_device == "cuda:0":
            inputs = {k: v.to(main_device) for k, v in inputs.items()}
        print(f"💭 Generating... (primary device: {main_device})")
        # Generate
with torch.no_grad():
outputs = self.model.generate(
input_ids=inputs['input_ids'],
attention_mask=inputs.get('attention_mask'),
max_new_tokens=max_tokens,
                do_sample=False,  # greedy decoding avoids sampling-related issues
num_beams=1,
pad_token_id=self.processor.tokenizer.eos_token_id,
eos_token_id=self.processor.tokenizer.eos_token_id,
)
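        # Throughput note: offloaded layers are copied to the GPU (or executed on CPU)
        # on demand at every forward pass, so generation is expected to be markedly
        # slower than with a fully GPU-resident model.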
        # Decode only the newly generated tokens
response = self.processor.tokenizer.decode(
outputs[0][inputs['input_ids'].shape[1]:],
skip_special_tokens=True
).strip()
        # Collect statistics
gen_time = time.time() - start_time
new_tokens = outputs.shape[1] - inputs['input_ids'].shape[1]
tokens_per_sec = new_tokens / gen_time if gen_time > 0 else 0
        # Free temporary tensors
del inputs, outputs
if self.gpu_available:
torch.cuda.empty_cache()
gc.collect()
stats = {
'generation_time': gen_time,
'new_tokens': new_tokens,
'tokens_per_second': tokens_per_sec,
'main_device': main_device
}
return response, stats
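    # Illustrative usage of this method, assuming the model loaded successfully:
    #   runner = SmartOffloadingRunner()
    #   runner.load_model_with_smart_offloading()
    #   response, stats = runner.generate_response("Hello!", max_tokens=64)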
def run_tests(self):
"""運行測試"""
        test_prompts = [
            "你好,請用一句話介紹你自己。",  # "Hi, please introduce yourself in one sentence."
            "什麼是人工智能?",  # "What is artificial intelligence?"
        ]
print("\n🧪 智能Offloading測試...")
print("-" * 50)
total_tokens = 0
total_time = 0
for i, prompt in enumerate(test_prompts, 1):
print(f"\n📝 測試 {i}/{len(test_prompts)}: {prompt}")
try:
response, stats = self.generate_response(prompt, max_tokens=80)
print(f"⚡ 速度: {stats['tokens_per_second']:.2f} tokens/秒")
print(f"📤 回應: {response}")
total_tokens += stats['new_tokens']
total_time += stats['generation_time']
except Exception as e:
print(f"❌ 測試失敗: {e}")
print("🔍 詳細錯誤:")
traceback.print_exc()
        # Performance summary
        if total_time > 0:
            avg_speed = total_tokens / total_time
            print("\n📈 Offloading performance summary:")
            print(f" Average speed: {avg_speed:.2f} tokens/s")
            print(f" Total tokens: {total_tokens}")
            print(f" Total time: {total_time:.2f}s")
        # Final memory state
        print(f" Final CPU memory: {psutil.virtual_memory().used / 1024**3:.1f}GB")
        if self.gpu_available:
            print(f" Final GPU memory: {torch.cuda.memory_allocated() / 1024**3:.1f}GB")
def cleanup(self):
"""清理資源"""
if self.model is not None:
del self.model
if self.processor is not None:
del self.processor
if self.gpu_available:
torch.cuda.empty_cache()
gc.collect()
        # Remove the on-disk offload cache
        import shutil
        import os
        if os.path.exists("./offload_cache"):
            shutil.rmtree("./offload_cache")
        print("🧹 Cleanup complete")
def main():
runner = SmartOffloadingRunner()
try:
        # Load the model
success = runner.load_model_with_smart_offloading()
if success:
            # Run the tests
            runner.run_tests()
            print("\n🎉 Smart offloading test finished!")
            print("💡 Note: accelerate handles the offloading automatically; GPU and CPU work together")
else:
print("💥 載入失敗")
except Exception as e:
print(f"❌ 執行失敗: {e}")
traceback.print_exc()
finally:
runner.cleanup()
if __name__ == "__main__":
main()