#!/usr/bin/env python3
import os
import glob
import time
import shutil
import pandas as pd
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from diffusers import StableDiffusionPipeline
import fitz
import requests
from PIL import Image
import logging
import asyncio
import aiofiles
from io import BytesIO
from dataclasses import dataclass
from typing import Optional
import gradio as gr

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
log_records = []


class LogCaptureHandler(logging.Handler):
    """Keep log records in memory so the app (or tests) can inspect them."""

    def emit(self, record):
        log_records.append(record)


logger.addHandler(LogCaptureHandler())


@dataclass
class ModelConfig:
    name: str
    base_model: str
    size: str
    domain: Optional[str] = None
    model_type: str = "causal_lm"

    @property
    def model_path(self):
        return f"models/{self.name}"


@dataclass
class DiffusionConfig:
    name: str
    base_model: str
    size: str
    domain: Optional[str] = None

    @property
    def model_path(self):
        return f"diffusion_models/{self.name}"


class ModelBuilder:
    """Loads and saves a causal-LM checkpoint plus its tokenizer."""

    def __init__(self):
        self.config = None
        self.model = None
        self.tokenizer = None

    def load_model(self, model_path: str, config: Optional[ModelConfig] = None):
        self.model = AutoModelForCausalLM.from_pretrained(model_path)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        if config:
            self.config = config
        self.model.to("cuda" if torch.cuda.is_available() else "cpu")
        return self

    def save_model(self, path: str):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        self.model.save_pretrained(path)
        self.tokenizer.save_pretrained(path)


class DiffusionBuilder:
    """Loads, saves, and runs a Stable Diffusion pipeline on CPU."""

    def __init__(self):
        self.config = None
        self.pipeline = None

    def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None):
        self.pipeline = StableDiffusionPipeline.from_pretrained(model_path, torch_dtype=torch.float32).to("cpu")
        if config:
            self.config = config
        return self

    def save_model(self, path: str):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        self.pipeline.save_pretrained(path)

    def generate(self, prompt: str):
        return self.pipeline(prompt, num_inference_steps=20).images[0]


def generate_filename(sequence, ext="png"):
    timestamp = time.strftime("%d%m%Y%H%M%S")
    return f"{sequence}_{timestamp}.{ext}"


def get_gallery_files(file_types):
    return sorted(set(f for ext in file_types for f in glob.glob(f"*.{ext}")))  # Deduplicate files


async def process_image_gen(prompt, output_file, builder):
    # Reuse the loaded pipeline if one exists; otherwise fall back to a small CPU model.
    if builder and isinstance(builder, DiffusionBuilder) and builder.pipeline:
        pipeline = builder.pipeline
    else:
        pipeline = StableDiffusionPipeline.from_pretrained(
            "OFA-Sys/small-stable-diffusion-v0", torch_dtype=torch.float32
        ).to("cpu")
    gen_image = pipeline(prompt, num_inference_steps=20).images[0]
    gen_image.save(output_file)
    return gen_image


# Smart Uploader Functions
def upload_files(files, links_title, links_url, history, selected_files):
    """Copy uploaded files into the working directory and register pasted links."""
    uploaded = {"images": [], "videos": [], "documents": [], "datasets": [], "links": []}
    if files:
        for file_path in files:  # gr.File(type="filepath") passes a list of temp-file paths
            name = os.path.basename(file_path)
            ext = name.split('.')[-1].lower()
            output_path = f"uploaded_{int(time.time())}_{name}"
            shutil.copy(file_path, output_path)
            if ext in ["jpg", "png"]:
                uploaded["images"].append(output_path)
            elif ext == "mp4":
                uploaded["videos"].append(output_path)
            elif ext in ["md", "pdf", "docx"]:
                uploaded["documents"].append(output_path)
            elif ext in ["csv", "xlsx"]:
                uploaded["datasets"].append(output_path)
            history.append(f"Uploaded: {output_path}")
            selected_files[output_path] = False  # Default unchecked
    if links_title and links_url:
        links = list(zip(links_title.split('\n'), links_url.split('\n')))
        for title, url in links:
            if title and url:
                link_entry = f"[{title}]({url})"
                uploaded["links"].append(link_entry)
                history.append(f"Added Link: {link_entry}")
                selected_files[link_entry] = False
    # The status textbox expects a string, so summarize rather than returning the raw dict.
    status = ", ".join(f"{k}: {len(v)}" for k, v in uploaded.items() if v) or "Nothing uploaded"
    return status, history, selected_files


def update_galleries(history, selected_files):
    """Rescan the working directory and rebuild the five gallery panes."""
    galleries = {
        "images": get_gallery_files(["jpg", "png"]),
        "videos": get_gallery_files(["mp4"]),
        "documents": get_gallery_files(["md", "pdf", "docx"]),
        "datasets": get_gallery_files(["csv", "xlsx"]),
        "links": [f for f in selected_files.keys() if f.startswith('[') and '](' in f and f.endswith(')')],
    }

    def pdf_thumbnail(path):
        # Render the first page at half scale. PIL needs (width, height);
        # Pixmap.size is the buffer length in bytes, not the dimensions.
        pix = fitz.open(path)[0].get_pixmap(matrix=fitz.Matrix(0.5, 0.5))
        return Image.frombytes("RGB", (pix.width, pix.height), pix.samples)

    gallery_previews = {
        "images": [(Image.open(f), os.path.basename(f)) for f in galleries["images"][:4]],
        "videos": [(f, os.path.basename(f)) for f in galleries["videos"][:4]],  # Video preview as file path
        "documents": [(pdf_thumbnail(f) if f.endswith('.pdf') else f, os.path.basename(f))
                      for f in galleries["documents"][:4]],
        "datasets": [(f, os.path.basename(f)) for f in galleries["datasets"][:4]],  # Text preview
        "links": [(f, f.split(']')[0][1:]) for f in galleries["links"][:4]],
    }
    history.append(f"Updated galleries: {sum(len(g) for g in galleries.values())} files")
    # One value per wired output: the five galleries, then the two pieces of state.
    return (gallery_previews["images"], gallery_previews["videos"], gallery_previews["documents"],
            gallery_previews["datasets"], gallery_previews["links"], history, selected_files)


def toggle_selection(file_list, selected_files):
    for file in (file_list or []):
        selected_files[file] = not selected_files.get(file, False)
    return selected_files


def image_gen(prompt, builder, history, selected_files):
    # Generation is gated on having at least one selected image, although the
    # pipeline itself is prompt-only and does not condition on the selection.
    selected = [f for f, sel in selected_files.items() if sel and f.endswith(('.jpg', '.png'))]
    if not selected:
        return "No images selected", None, history, selected_files
    output_file = generate_filename("gen_output", "png")
    gen_image = asyncio.run(process_image_gen(prompt, output_file, builder))
    history.append(f"Image Gen: {prompt} -> {output_file}")
    selected_files[output_file] = True
    return f"Image saved to {output_file}", gen_image, history, selected_files


# Gradio UI
with gr.Blocks(title="AI Vision & SFT Titans 🚀") as demo:
    gr.Markdown("# AI Vision & SFT Titans 🚀")
    history = gr.State(value=[])
    builder = gr.State(value=None)
    selected_files = gr.State(value={})
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## 📁 File Tree")
            with gr.Accordion("🌳 Uploads", open=True):
                with gr.Row():
                    gr.Markdown("### 🖼️ Images (jpg/png)")
                    img_gallery = gr.Gallery(label="Images", columns=4, height="auto")
                with gr.Row():
                    gr.Markdown("### 🎥 Videos (mp4)")
                    vid_gallery = gr.Gallery(label="Videos", columns=4, height="auto")
                with gr.Row():
                    gr.Markdown("### 📜 Docs (md/pdf/docx)")
                    doc_gallery = gr.Gallery(label="Documents", columns=4, height="auto")
                with gr.Row():
                    gr.Markdown("### 📊 Data (csv/xlsx)")
                    data_gallery = gr.Gallery(label="Datasets", columns=4, height="auto")
                with gr.Row():
                    gr.Markdown("### 🔗 Links")
                    link_gallery = gr.Gallery(label="Links", columns=4, height="auto")
            gr.Markdown("## 📜 History")
            history_output = gr.Textbox(label="Log", lines=5, interactive=False)
        with gr.Column(scale=3):
            with gr.Row():
                gr.Markdown("## 🛠️ Toolbar")
                upload_btn = gr.Button("📤 Upload")
                select_btn = gr.Button("✅ Select")
                gen_btn = gr.Button("🎨 Generate")
                # Declared here (rather than inline in select_btn.click) so it has a place
                # in the layout; its choices are fixed at build time, as in the original.
                file_select = gr.Dropdown(choices=[], multiselect=True, label="Select Files")
            with gr.Tabs():
                with gr.TabItem("📤 Smart Upload"):
                    # "filepath" hands the handler temp-file paths; "binary" would pass
                    # raw bytes with no filename to sort on.
                    file_upload = gr.File(label="Upload Files", file_count="multiple", type="filepath")
                    links_title = gr.Textbox(label="Link Titles (one per line)", lines=3)
gr.Textbox(label="Link URLs (one per line)", lines=3) upload_status = gr.Textbox(label="Status") with gr.TabItem("🔍 Operations"): prompt = gr.Textbox(label="Image Gen Prompt", value="Generate a neon version") op_status = gr.Textbox(label="Status") op_output = gr.Image(label="Output") upload_btn.click( upload_files, inputs=[file_upload, links_title, links_url, history, selected_files], outputs=[upload_status, history, selected_files] ).then( update_galleries, inputs=[history, selected_files], outputs=[img_gallery, vid_gallery, doc_gallery, data_gallery, link_gallery, history, selected_files] ) select_btn.click( toggle_selection, inputs=[gr.Dropdown(choices=list(selected_files.value.keys()), multiselect=True, label="Select Files"), selected_files], outputs=[selected_files] ).then( update_galleries, inputs=[history, selected_files], outputs=[img_gallery, vid_gallery, doc_gallery, data_gallery, link_gallery, history, selected_files] ) gen_btn.click( image_gen, inputs=[prompt, builder, history, selected_files], outputs=[op_status, op_output, history, selected_files] ).then( update_galleries, inputs=[history, selected_files], outputs=[img_gallery, vid_gallery, doc_gallery, data_gallery, link_gallery, history, selected_files] ) # Update history output demo.load(lambda h: "\n".join(h[-5:]), inputs=[history], outputs=[history_output]) demo.launch()