# corgi/gradio_app.py
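"""Gradio demo for the CoRGI pipeline on Qwen3-VL.

Builds a Blocks UI that runs CoRGI on an uploaded image and question, then
renders the reasoning chain, grounded visual evidence, the underlying prompts,
and per-stage timings.
"""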
from __future__ import annotations
import logging
import itertools
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple
try:
import spaces # type: ignore
except ImportError: # pragma: no cover - spaces library only on HF Spaces
spaces = None # type: ignore
from PIL import Image, ImageDraw
from .cli import DEFAULT_MODEL_ID
from .pipeline import CoRGIPipeline, PipelineResult
from .qwen_client import Qwen3VLClient, QwenGenerationConfig
from .types import GroundedEvidence, PromptLog
@dataclass
class PipelineState:
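    """Session state: the selected model id and its pipeline (None until built)."""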
model_id: str
pipeline: Optional[CoRGIPipeline]
_PIPELINE_CACHE: dict[str, CoRGIPipeline] = {}
_GLOBAL_FACTORY: Callable[[Optional[str]], CoRGIPipeline] | None = None
logger = logging.getLogger("corgi.gradio_app")
MAX_UI_STEPS = 6
GALLERY_MAX_DIM = 768
EVIDENCE_COLORS: Tuple[Tuple[int, int, int], ...] = (
(244, 67, 54), # red
(255, 193, 7), # amber
(76, 175, 80), # green
(33, 150, 243), # blue
(156, 39, 176), # purple
(255, 87, 34), # deep orange
)
try:
_THUMBNAIL_RESAMPLE = Image.Resampling.LANCZOS # type: ignore[attr-defined]
except AttributeError: # pragma: no cover - Pillow < 9.1
_THUMBNAIL_RESAMPLE = Image.LANCZOS # type: ignore
def _default_factory(model_id: Optional[str]) -> CoRGIPipeline:
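    """Build a CoRGIPipeline backed by a Qwen3-VL client for ``model_id``."""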
config = QwenGenerationConfig(model_id=model_id or DEFAULT_MODEL_ID)
return CoRGIPipeline(vlm_client=Qwen3VLClient(config=config))
def _warm_default_pipeline() -> None:
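    """Eagerly build and cache the default pipeline so the first request is fast."""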
if DEFAULT_MODEL_ID in _PIPELINE_CACHE:
return
try:
logger.info("Preloading default pipeline for model_id=%s", DEFAULT_MODEL_ID)
_PIPELINE_CACHE[DEFAULT_MODEL_ID] = _default_factory(DEFAULT_MODEL_ID)
except Exception as exc: # pragma: no cover - defensive
logger.exception("Failed to preload default model %s: %s", DEFAULT_MODEL_ID, exc)
_GLOBAL_FACTORY = _default_factory # type: ignore[assignment]
_warm_default_pipeline()
def _get_pipeline(model_id: str, factory: Callable[[Optional[str]], CoRGIPipeline]) -> CoRGIPipeline:
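    """Return the cached pipeline for ``model_id``, building it via ``factory`` if absent."""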
pipeline = _PIPELINE_CACHE.get(model_id)
if pipeline is None:
logger.info("Creating new pipeline for model_id=%s", model_id)
pipeline = factory(model_id)
_PIPELINE_CACHE[model_id] = pipeline
else:
logger.debug("Reusing cached pipeline for model_id=%s", model_id)
return pipeline
# NOTE: the spaces.GPU decorator is applied conditionally in _execute_pipeline_gpu
# below, so this core function stays importable outside Hugging Face Spaces.
def _execute_pipeline(
image: Image.Image,
question: str,
max_steps: int,
max_regions: int,
model_id: str,
) -> PipelineResult:
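    """Resolve a pipeline for ``model_id`` and run it on the image/question pair."""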
factory = _GLOBAL_FACTORY or _default_factory
pipeline = _get_pipeline(model_id, factory)
logger.info(
"Executing pipeline for model_id=%s | max_steps=%s | max_regions=%s",
model_id,
max_steps,
max_regions,
)
return pipeline.run(
image=image,
question=question,
max_steps=max_steps,
max_regions=max_regions,
)
def _group_evidence_by_step(evidences: List[GroundedEvidence]) -> Dict[int, List[GroundedEvidence]]:
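    """Bucket evidence items by the reasoning-step index they support."""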
grouped: Dict[int, List[GroundedEvidence]] = {}
for ev in evidences:
grouped.setdefault(ev.step_index, []).append(ev)
return grouped
def _format_evidence_caption(evidence: GroundedEvidence) -> str:
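    """Build a multi-line gallery caption from an evidence item's step, description, confidence, and bbox."""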
bbox_str = ", ".join(f"{coord:.2f}" for coord in evidence.bbox)
parts = [f"Step {evidence.step_index}"]
if evidence.description:
parts.append(evidence.description)
if evidence.confidence is not None:
parts.append(f"Confidence: {evidence.confidence:.2f}")
parts.append(f"BBox: ({bbox_str})")
return "\n".join(parts)
def _annotate_evidence_image(
image: Image.Image,
evidence: GroundedEvidence,
color: Tuple[int, int, int],
) -> Image.Image:
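    """Return a copy of ``image`` with the evidence bbox drawn as a translucent,
    outlined rectangle. Normalized [0, 1] coordinates are scaled to pixels and
    clamped to the image bounds; large results are thumbnailed to GALLERY_MAX_DIM.
    """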
base = image.copy().convert("RGBA")
overlay = Image.new("RGBA", base.size, (0, 0, 0, 0))
draw = ImageDraw.Draw(overlay)
width, height = base.size
x1 = max(0, min(int(evidence.bbox[0] * width), width - 1))
y1 = max(0, min(int(evidence.bbox[1] * height), height - 1))
x2 = max(0, min(int(evidence.bbox[2] * width), width - 1))
y2 = max(0, min(int(evidence.bbox[3] * height), height - 1))
x1, x2 = sorted((x1, x2))
y1, y2 = sorted((y1, y2))
outline_width = max(2, int(min(width, height) * 0.005))
rgba_color = color + (255,)
fill_color = color + (64,)
draw.rectangle([x1, y1, x2, y2], fill=fill_color, outline=rgba_color, width=outline_width)
annotated = Image.alpha_composite(base, overlay).convert("RGB")
if max(annotated.size) > GALLERY_MAX_DIM:
annotated.thumbnail((GALLERY_MAX_DIM, GALLERY_MAX_DIM), _THUMBNAIL_RESAMPLE)
return annotated
def _empty_ui_payload(message: str) -> Dict[str, object]:
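    """Build a placeholder UI payload that surfaces ``message`` in every text field."""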
placeholder_prompt = f"```text\n{message}\n```"
return {
"answer_markdown": f"### Final Answer\n{message}",
"chain_markdown": message,
"chain_prompt": placeholder_prompt,
"roi_overview": None,
"roi_gallery": [],
"roi_prompt": placeholder_prompt,
"evidence_markdown": message,
"evidence_prompt": placeholder_prompt,
"answer_process_markdown": message,
"answer_prompt": placeholder_prompt,
"timing_markdown": message,
}
def _annotate_overview_image(image: Image.Image, evidences: List[GroundedEvidence]) -> Optional[Image.Image]:
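    """Draw all evidence boxes on one image, coloring and labeling each by step index."""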
if not evidences:
return None
base = image.copy().convert("RGBA")
overlay = Image.new("RGBA", base.size, (0, 0, 0, 0))
draw = ImageDraw.Draw(overlay)
width, height = base.size
step_colors: Dict[int, Tuple[int, int, int]] = {}
color_cycle = itertools.cycle(EVIDENCE_COLORS)
for ev in evidences:
color = step_colors.setdefault(ev.step_index, next(color_cycle))
x1 = max(0, min(int(ev.bbox[0] * width), width - 1))
y1 = max(0, min(int(ev.bbox[1] * height), height - 1))
x2 = max(0, min(int(ev.bbox[2] * width), width - 1))
y2 = max(0, min(int(ev.bbox[3] * height), height - 1))
x1, x2 = sorted((x1, x2))
y1, y2 = sorted((y1, y2))
outline_width = max(2, int(min(width, height) * 0.005))
rgba_color = color + (255,)
draw.rectangle([x1, y1, x2, y2], outline=rgba_color, width=outline_width)
label = f"S{ev.step_index}"
draw.text((x1 + 4, y1 + 4), label, fill=rgba_color)
annotated = Image.alpha_composite(base, overlay).convert("RGB")
if max(annotated.size) > GALLERY_MAX_DIM:
annotated.thumbnail((GALLERY_MAX_DIM, GALLERY_MAX_DIM), _THUMBNAIL_RESAMPLE)
return annotated
def _format_prompt_markdown(log: Optional[PromptLog], title: str) -> str:
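    """Render a single PromptLog as a titled prompt/response markdown block."""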
if log is None:
return f"**{title} Prompt**\n_Prompt unavailable._"
lines = [f"**{title} Prompt**", "```text", log.prompt, "```"]
if log.response:
lines.extend(["**Model Response**", "```text", log.response, "```"])
return "\n".join(lines)
def _format_grounding_prompts(logs: List[PromptLog]) -> str:
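    """Render the ROI grounding PromptLogs as per-step markdown blocks."""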
if not logs:
return "_No ROI prompts available._"
blocks: List[str] = []
for log in logs:
heading = f"#### Step {log.step_index}" if log.step_index is not None else "#### ROI Prompt"
sections = [heading, "**Prompt**", "```text", log.prompt, "```"]
if log.response:
sections.extend(["**Model Response**", "```text", log.response, "```"])
blocks.append("\n".join(sections))
return "\n\n".join(blocks)
def _prepare_ui_payload(
image: Image.Image,
result: PipelineResult,
max_slots: int = MAX_UI_STEPS,
) -> Dict[str, object]:
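    """Convert a PipelineResult into the markdown strings, annotated images, and
    gallery entries that back each tab of the UI.
    """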
answer_text = f"### Final Answer\n{result.answer or '(no answer returned)'}"
step_lines: List[str] = []
evidences_by_step = _group_evidence_by_step(result.evidence)
for step in result.steps[:max_slots]:
lines = [
f"**Step {step.index}:** {step.statement}",
f"- Needs vision: {'yes' if step.needs_vision else 'no'}",
]
if step.reason:
lines.append(f"- Reason: {step.reason}")
evs = evidences_by_step.get(step.index, [])
if evs:
lines.append(f"- Visual evidence items: {len(evs)}")
else:
lines.append("- No visual evidence returned for this step.")
step_lines.append("\n".join(lines))
if len(result.steps) > max_slots:
step_lines.append(f"_Only the first {max_slots} steps are shown._")
chain_markdown = "\n\n".join(step_lines) if step_lines else "_No reasoning steps returned._"
roi_overview = _annotate_overview_image(image, result.evidence)
aggregated_gallery: List[Tuple[Image.Image, str]] = []
for idx, evidence in enumerate(result.evidence):
color = EVIDENCE_COLORS[idx % len(EVIDENCE_COLORS)]
annotated = _annotate_evidence_image(image, evidence, color)
aggregated_gallery.append((annotated, _format_evidence_caption(evidence)))
evidence_blocks: List[str] = []
for idx, evidence in enumerate(result.evidence, start=1):
bbox = ", ".join(f"{coord:.2f}" for coord in evidence.bbox)
desc = evidence.description or "(no description)"
conf = f"Confidence: {evidence.confidence:.2f}" if evidence.confidence is not None else "Confidence: n/a"
evidence_blocks.append(
f"**Evidence {idx} — Step {evidence.step_index}**\n- {desc}\n- {conf}\n- BBox: ({bbox})"
)
evidence_markdown = "\n\n".join(evidence_blocks) if evidence_blocks else "_No visual evidence collected._"
reasoning_prompt_md = _format_prompt_markdown(result.reasoning_log, "Reasoning")
roi_prompt_md = _format_grounding_prompts(result.grounding_logs)
evidence_prompt_md = roi_prompt_md if result.grounding_logs else "_No ROI prompts available._"
answer_prompt_md = _format_prompt_markdown(result.answer_log, "Answer Synthesis")
answer_process_lines = [
f"**Question:** {result.question}",
f"**Final Answer:** {result.answer or '(no answer returned)'}",
f"**Steps considered:** {len(result.steps)}",
f"**Visual evidence items:** {len(result.evidence)}",
]
answer_process_markdown = "\n".join(answer_process_lines)
timing_lines: List[str] = []
if result.timings:
total_entry = next((t for t in result.timings if t.name == "total_pipeline"), None)
if total_entry:
timing_lines.append(f"**Total pipeline:** {total_entry.duration_ms/1000:.2f} s")
for timing in result.timings:
if timing.name == "total_pipeline":
continue
label = timing.name.replace("_", " ")
if timing.step_index is not None:
label += f" (step {timing.step_index})"
timing_lines.append(f"- {label}: {timing.duration_ms/1000:.2f} s")
timing_markdown = "\n".join(timing_lines) if timing_lines else "_No timing data available._"
return {
"answer_markdown": answer_text,
"chain_markdown": chain_markdown,
"chain_prompt": reasoning_prompt_md,
"roi_overview": roi_overview,
"roi_gallery": aggregated_gallery,
"roi_prompt": roi_prompt_md,
"evidence_markdown": evidence_markdown,
"evidence_prompt": evidence_prompt_md,
"answer_process_markdown": answer_process_markdown,
"answer_prompt": answer_prompt_md,
"timing_markdown": timing_markdown,
}
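
# Expose one executor name regardless of environment: wrap with spaces.GPU when
# the `spaces` package is available (i.e. on Hugging Face Spaces), otherwise
# fall through to a plain call.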
if spaces is not None:
@spaces.GPU(duration=120) # type: ignore[attr-defined]
def _execute_pipeline_gpu(
image: Image.Image,
question: str,
max_steps: int,
max_regions: int,
model_id: str,
) -> PipelineResult:
logger.debug("Running GPU-decorated pipeline.")
return _execute_pipeline(image, question, max_steps, max_regions, model_id)
else:
def _execute_pipeline_gpu(
image: Image.Image,
question: str,
max_steps: int,
max_regions: int,
model_id: str,
) -> PipelineResult:
return _execute_pipeline(image, question, max_steps, max_regions, model_id)
def ensure_pipeline_state(
previous: Optional[PipelineState],
model_id: Optional[str],
factory: Callable[[Optional[str]], CoRGIPipeline] | None = None,
) -> PipelineState:
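    """Reuse ``previous`` when it already targets ``model_id``; otherwise build a
    fresh PipelineState via ``factory``.
    """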
target_model = model_id or DEFAULT_MODEL_ID
factory = factory or _default_factory
if previous is not None and previous.model_id == target_model:
return previous
pipeline = factory(target_model)
return PipelineState(model_id=target_model, pipeline=pipeline)
def format_result_markdown(result: PipelineResult) -> str:
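    """Render a PipelineResult as a single markdown report (answer, steps, evidence)."""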
lines: list[str] = []
lines.append("### Answer")
lines.append(result.answer or "(no answer returned)")
lines.append("")
lines.append("### Reasoning Steps")
if result.steps:
for step in result.steps:
needs = "yes" if step.needs_vision else "no"
reason = f" — {step.reason}" if step.reason else ""
lines.append(f"- **Step {step.index}**: {step.statement} _(needs vision: {needs})_{reason}")
else:
lines.append("- No reasoning steps returned.")
lines.append("")
lines.append("### Visual Evidence")
if result.evidence:
for ev in result.evidence:
bbox = ", ".join(f"{coord:.2f}" for coord in ev.bbox)
desc = ev.description or "(no description)"
conf = f" — confidence {ev.confidence:.2f}" if ev.confidence is not None else ""
lines.append(f"- Step {ev.step_index}: bbox=({bbox}) — {desc}{conf}")
else:
lines.append("- No visual evidence collected.")
return "\n".join(lines)
def _run_pipeline(
state: Optional[PipelineState],
image: Image.Image | None,
question: str,
max_steps: int,
max_regions: int,
model_id: Optional[str],
) -> tuple[PipelineState, Dict[str, object]]:
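    """Validate the request, run the (possibly GPU-decorated) pipeline, and return
    the refreshed state alongside the UI payload. Missing inputs and pipeline
    errors are reported through placeholder payloads instead of raising.
    """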
target_model = (model_id or DEFAULT_MODEL_ID).strip() or DEFAULT_MODEL_ID
cached_pipeline = _PIPELINE_CACHE.get(target_model)
base_state = state or PipelineState(model_id=target_model, pipeline=cached_pipeline)
if image is None:
logger.info("Request skipped: no image provided.")
return base_state, _empty_ui_payload("Please provide an image before running the demo.")
if not question.strip():
logger.info("Request skipped: question empty.")
return base_state, _empty_ui_payload("Please enter a question before running the demo.")
logger.info("Received request for model_id=%s", target_model)
rgb_image = image.convert("RGB")
try:
result = _execute_pipeline_gpu(
image=rgb_image,
question=question.strip(),
max_steps=int(max_steps),
max_regions=int(max_regions),
model_id=target_model,
)
except Exception as exc: # pragma: no cover - defensive error handling
logger.exception("Pipeline execution failed: %s", exc)
return PipelineState(model_id=target_model, pipeline=_PIPELINE_CACHE.get(target_model)), _empty_ui_payload(
f"Pipeline error: {exc}"
)
new_state = PipelineState(model_id=target_model, pipeline=_PIPELINE_CACHE.get(target_model))
payload = _prepare_ui_payload(rgb_image, result, MAX_UI_STEPS)
return new_state, payload
def build_demo(
pipeline_factory: Callable[[Optional[str]], CoRGIPipeline] | None = None,
) -> "gradio.Blocks":
try:
import gradio as gr
except ImportError as exc: # pragma: no cover - exercised when gradio missing
raise RuntimeError("Gradio is required to build the demo. Install gradio>=4.0.") from exc
factory = pipeline_factory or _default_factory
global _GLOBAL_FACTORY
_GLOBAL_FACTORY = factory
logger.info("Registering pipeline factory %s", factory)
try:
logger.info("Preloading pipeline with factory for model_id=%s", DEFAULT_MODEL_ID)
_PIPELINE_CACHE[DEFAULT_MODEL_ID] = factory(DEFAULT_MODEL_ID)
except Exception as exc: # pragma: no cover - defensive
logger.exception("Unable to preload pipeline via factory: %s", exc)
with gr.Blocks(title="CoRGI Qwen3-VL Demo") as demo:
state = gr.State() # stores PipelineState
with gr.Row():
with gr.Column(scale=1, min_width=320):
image_input = gr.Image(label="Input image", type="pil")
question_input = gr.Textbox(label="Question", placeholder="What is happening in the image?", lines=2)
model_id_input = gr.Textbox(
label="Model ID",
value=DEFAULT_MODEL_ID,
placeholder="Leave blank to use default",
)
max_steps_slider = gr.Slider(
label="Max reasoning steps",
minimum=1,
maximum=6,
step=1,
value=3,
)
max_regions_slider = gr.Slider(
label="Max regions per step",
minimum=1,
maximum=6,
step=1,
value=3,
)
run_button = gr.Button("Run CoRGI")
with gr.Column(scale=1, min_width=320):
answer_markdown = gr.Markdown(value="### Final Answer\nUpload an image and ask a question to begin.")
with gr.Tabs():
with gr.Tab("Chain of Thought"):
chain_markdown = gr.Markdown("_No reasoning steps yet._")
chain_prompt = gr.Markdown("```text\nAwaiting prompt...\n```")
with gr.Tab("ROI Extraction"):
roi_overview_image = gr.Image(label="Annotated image", value=None)
roi_gallery = gr.Gallery(
label="Evidence gallery",
columns=2,
height=280,
allow_preview=True,
)
roi_prompt_markdown = gr.Markdown("```text\nAwaiting ROI prompts...\n```")
with gr.Tab("Evidence Descriptions"):
evidence_markdown = gr.Markdown("_No visual evidence collected._")
evidence_prompt_markdown = gr.Markdown("```text\nAwaiting ROI prompts...\n```")
with gr.Tab("Answer Synthesis"):
answer_process_markdown = gr.Markdown("_No answer generated yet._")
answer_prompt_markdown = gr.Markdown("```text\nAwaiting answer prompt...\n```")
with gr.Tab("Performance"):
timing_markdown = gr.Markdown("_No timing data available._")
def _on_submit(state_data, image, question, model_id, max_steps, max_regions):
pipeline_state = state_data if isinstance(state_data, PipelineState) else None
new_state, payload = _run_pipeline(
pipeline_state,
image,
question,
int(max_steps),
int(max_regions),
model_id if model_id else None,
)
return [
new_state,
payload["answer_markdown"],
payload["chain_markdown"],
payload["chain_prompt"],
payload["roi_overview"],
payload["roi_gallery"],
payload["roi_prompt"],
payload["evidence_markdown"],
payload["evidence_prompt"],
payload["answer_process_markdown"],
payload["answer_prompt"],
payload["timing_markdown"],
]
output_components = [
state,
answer_markdown,
chain_markdown,
chain_prompt,
roi_overview_image,
roi_gallery,
roi_prompt_markdown,
evidence_markdown,
evidence_prompt_markdown,
answer_process_markdown,
answer_prompt_markdown,
timing_markdown,
]
run_button.click(
fn=_on_submit,
inputs=[state, image_input, question_input, model_id_input, max_steps_slider, max_regions_slider],
outputs=output_components,
)
return demo
def launch_demo(
*,
pipeline_factory: Callable[[Optional[str]], CoRGIPipeline] | None = None,
**launch_kwargs,
) -> None:
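    """Build the demo and launch it, forwarding ``launch_kwargs`` to Blocks.launch."""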
demo = build_demo(pipeline_factory=pipeline_factory)
demo.launch(**launch_kwargs)
__all__ = [
"PipelineState",
"ensure_pipeline_state",
"format_result_markdown",
"build_demo",
"launch_demo",
"DEFAULT_MODEL_ID",
]
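
# A minimal local-launch sketch (an addition for illustration, not part of the
# deployed Space entrypoint): assumes the package's dependencies, including
# gradio, are installed, and serves the demo with Gradio's default settings.
if __name__ == "__main__":  # pragma: no cover - manual smoke test
    logging.basicConfig(level=logging.INFO)
    launch_demo()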