|
|
|
import os |
|
import glob |
|
import time |
|
import pandas as pd |
|
import torch |
|
from transformers import AutoModelForCausalLM, AutoTokenizer |
|
from diffusers import StableDiffusionPipeline |
|
import fitz |
|
import requests |
|
from PIL import Image |
|
import logging |
|
import asyncio |
|
import aiofiles |
|
from io import BytesIO |
|
from dataclasses import dataclass |
|
from typing import Optional |
|
import gradio as gr |
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") |
|
logger = logging.getLogger(__name__) |
|
log_records = [] |
|
|
|
class LogCaptureHandler(logging.Handler): |
|
def emit(self, record): |
|
log_records.append(record) |
|
|
|
logger.addHandler(LogCaptureHandler()) |
|
|
|
@dataclass |
|
class ModelConfig: |
|
name: str |
|
base_model: str |
|
size: str |
|
domain: Optional[str] = None |
|
model_type: str = "causal_lm" |
|
@property |
|
def model_path(self): |
|
return f"models/{self.name}" |
|
|
|
@dataclass |
|
class DiffusionConfig: |
|
name: str |
|
base_model: str |
|
size: str |
|
domain: Optional[str] = None |
|
@property |
|
def model_path(self): |
|
return f"diffusion_models/{self.name}" |
|
|
|
class ModelBuilder: |
|
def __init__(self): |
|
self.config = None |
|
self.model = None |
|
self.tokenizer = None |
|
def load_model(self, model_path: str, config: Optional[ModelConfig] = None): |
|
self.model = AutoModelForCausalLM.from_pretrained(model_path) |
|
self.tokenizer = AutoTokenizer.from_pretrained(model_path) |
|
if self.tokenizer.pad_token is None: |
|
self.tokenizer.pad_token = self.tokenizer.eos_token |
|
if config: |
|
self.config = config |
|
self.model.to("cuda" if torch.cuda.is_available() else "cpu") |
|
return self |
|
def save_model(self, path: str): |
|
os.makedirs(os.path.dirname(path), exist_ok=True) |
|
self.model.save_pretrained(path) |
|
self.tokenizer.save_pretrained(path) |
|
|
|
class DiffusionBuilder: |
|
def __init__(self): |
|
self.config = None |
|
self.pipeline = None |
|
def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None): |
|
self.pipeline = StableDiffusionPipeline.from_pretrained(model_path, torch_dtype=torch.float32).to("cpu") |
|
if config: |
|
self.config = config |
|
return self |
|
def save_model(self, path: str): |
|
os.makedirs(os.path.dirname(path), exist_ok=True) |
|
self.pipeline.save_pretrained(path) |
|
def generate(self, prompt: str): |
|
return self.pipeline(prompt, num_inference_steps=20).images[0] |
|
|
|
def generate_filename(sequence, ext="png"): |
|
timestamp = time.strftime("%d%m%Y%H%M%S") |
|
return f"{sequence}_{timestamp}.{ext}" |
|
|
|
def get_gallery_files(file_types): |
|
return sorted(list(set([f for ext in file_types for f in glob.glob(f"*.{ext}")]))) |
|
|
|
async def process_image_gen(prompt, output_file, builder): |
|
if builder and isinstance(builder, DiffusionBuilder) and builder.pipeline: |
|
pipeline = builder.pipeline |
|
else: |
|
pipeline = StableDiffusionPipeline.from_pretrained("OFA-Sys/small-stable-diffusion-v0", torch_dtype=torch.float32).to("cpu") |
|
gen_image = pipeline(prompt, num_inference_steps=20).images[0] |
|
gen_image.save(output_file) |
|
return gen_image |
|
|
|
|
|
def upload_files(files, links_title, links_url, history, selected_files): |
|
uploaded = {"images": [], "videos": [], "documents": [], "datasets": [], "links": []} |
|
if files: |
|
for file in files: |
|
ext = file.name.split('.')[-1].lower() |
|
output_path = f"uploaded_{int(time.time())}_{file.name}" |
|
with open(output_path, "wb") as f: |
|
f.write(file.read()) |
|
if ext in ["jpg", "png"]: |
|
uploaded["images"].append(output_path) |
|
elif ext == "mp4": |
|
uploaded["videos"].append(output_path) |
|
elif ext in ["md", "pdf", "docx"]: |
|
uploaded["documents"].append(output_path) |
|
elif ext in ["csv", "xlsx"]: |
|
uploaded["datasets"].append(output_path) |
|
history.append(f"Uploaded: {output_path}") |
|
selected_files[output_path] = False |
|
if links_title and links_url: |
|
links = list(zip(links_title.split('\n'), links_url.split('\n'))) |
|
for title, url in links: |
|
if title and url: |
|
link_entry = f"[{title}]({url})" |
|
uploaded["links"].append(link_entry) |
|
history.append(f"Added Link: {link_entry}") |
|
selected_files[link_entry] = False |
|
return uploaded, history, selected_files |
|
|
|
def update_galleries(history, selected_files): |
|
galleries = { |
|
"images": get_gallery_files(["jpg", "png"]), |
|
"videos": get_gallery_files(["mp4"]), |
|
"documents": get_gallery_files(["md", "pdf", "docx"]), |
|
"datasets": get_gallery_files(["csv", "xlsx"]), |
|
"links": [f for f in selected_files.keys() if f.startswith('[') and '](' in f and f.endswith(')')] |
|
} |
|
gallery_outputs = { |
|
"images": [(Image.open(f), os.path.basename(f)) for f in galleries["images"][:4]], |
|
"videos": [(f, os.path.basename(f)) for f in galleries["videos"][:4]], |
|
"documents": [(Image.frombytes("RGB", fitz.open(f)[0].get_pixmap(matrix=fitz.Matrix(0.5, 0.5)).size, fitz.open(f)[0].get_pixmap(matrix=fitz.Matrix(0.5, 0.5)).samples) if f.endswith('.pdf') else f, os.path.basename(f)) for f in galleries["documents"][:4]], |
|
"datasets": [(f, os.path.basename(f)) for f in galleries["datasets"][:4]], |
|
"links": [(f, f.split(']')[0][1:]) for f in galleries["links"][:4]] |
|
} |
|
history.append(f"Updated galleries: {sum(len(g) for g in galleries.values())} files") |
|
return gallery_outputs, history, selected_files |
|
|
|
def toggle_selection(file_list, selected_files): |
|
for file in file_list: |
|
selected_files[file] = not selected_files.get(file, False) |
|
return selected_files |
|
|
|
def image_gen(prompt, builder, history, selected_files): |
|
selected = [f for f, sel in selected_files.items() if sel and f.endswith(('.jpg', '.png'))] |
|
if not selected: |
|
return "No images selected", None, history, selected_files |
|
output_file = generate_filename("gen_output", "png") |
|
gen_image = asyncio.run(process_image_gen(prompt, output_file, builder)) |
|
history.append(f"Image Gen: {prompt} -> {output_file}") |
|
selected_files[output_file] = True |
|
return f"Image saved to {output_file}", gen_image, history, selected_files |
|
|
|
|
|
with gr.Blocks(title="AI Vision & SFT Titans π") as demo: |
|
gr.Markdown("# AI Vision & SFT Titans π") |
|
history = gr.State(value=[]) |
|
builder = gr.State(value=None) |
|
selected_files = gr.State(value={}) |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
gr.Markdown("## π File Tree") |
|
with gr.Accordion("π³ Uploads", open=True): |
|
with gr.Row(): |
|
gr.Markdown("### πΌοΈ Images (jpg/png)") |
|
img_gallery = gr.Gallery(label="Images", columns=4, height="auto") |
|
with gr.Row(): |
|
gr.Markdown("### π₯ Videos (mp4)") |
|
vid_gallery = gr.Gallery(label="Videos", columns=4, height="auto") |
|
with gr.Row(): |
|
gr.Markdown("### π Docs (md/pdf/docx)") |
|
doc_gallery = gr.Gallery(label="Documents", columns=4, height="auto") |
|
with gr.Row(): |
|
gr.Markdown("### π Data (csv/xlsx)") |
|
data_gallery = gr.Gallery(label="Datasets", columns=4, height="auto") |
|
with gr.Row(): |
|
gr.Markdown("### π Links") |
|
link_gallery = gr.Gallery(label="Links", columns=4, height="auto") |
|
gr.Markdown("## π History") |
|
history_output = gr.Textbox(label="Log", lines=5, interactive=False) |
|
|
|
with gr.Column(scale=3): |
|
with gr.Row(): |
|
gr.Markdown("## π οΈ Toolbar") |
|
upload_btn = gr.Button("π€ Upload") |
|
select_btn = gr.Button("β
Select") |
|
gen_btn = gr.Button("π¨ Generate") |
|
|
|
with gr.Tabs(): |
|
with gr.TabItem("π€ Smart Upload"): |
|
file_upload = gr.File(label="Upload Files", file_count="multiple", type="binary") |
|
links_title = gr.Textbox(label="Link Titles (one per line)", lines=3) |
|
links_url = gr.Textbox(label="Link URLs (one per line)", lines=3) |
|
upload_status = gr.Textbox(label="Status") |
|
|
|
with gr.TabItem("π Operations"): |
|
prompt = gr.Textbox(label="Image Gen Prompt", value="Generate a neon version") |
|
op_status = gr.Textbox(label="Status") |
|
op_output = gr.Image(label="Output") |
|
|
|
upload_btn.click( |
|
upload_files, |
|
inputs=[file_upload, links_title, links_url, history, selected_files], |
|
outputs=[upload_status, history, selected_files] |
|
).then( |
|
update_galleries, |
|
inputs=[history, selected_files], |
|
outputs=[img_gallery, vid_gallery, doc_gallery, data_gallery, link_gallery, history, selected_files] |
|
) |
|
|
|
select_btn.click( |
|
toggle_selection, |
|
inputs=[gr.Dropdown(choices=list(selected_files.value.keys()), multiselect=True, label="Select Files"), selected_files], |
|
outputs=[selected_files] |
|
).then( |
|
update_galleries, |
|
inputs=[history, selected_files], |
|
outputs=[img_gallery, vid_gallery, doc_gallery, data_gallery, link_gallery, history, selected_files] |
|
) |
|
|
|
gen_btn.click( |
|
image_gen, |
|
inputs=[prompt, builder, history, selected_files], |
|
outputs=[op_status, op_output, history, selected_files] |
|
).then( |
|
update_galleries, |
|
inputs=[history, selected_files], |
|
outputs=[img_gallery, vid_gallery, doc_gallery, data_gallery, link_gallery, history, selected_files] |
|
) |
|
|
|
|
|
demo.load(lambda h: "\n".join(h[-5:]), inputs=[history], outputs=[history_output]) |
|
|
|
demo.launch() |