from __future__ import annotations from dataclasses import dataclass from typing import List, Protocol from PIL import Image from .types import ( GroundedEvidence, ReasoningStep, evidences_to_serializable, steps_to_serializable, ) class SupportsQwenClient(Protocol): """Protocol describing the methods required from a Qwen3-VL client.""" def structured_reasoning(self, image: Image.Image, question: str, max_steps: int) -> List[ReasoningStep]: ... def extract_step_evidence( self, image: Image.Image, question: str, step: ReasoningStep, max_regions: int, ) -> List[GroundedEvidence]: ... def synthesize_answer( self, image: Image.Image, question: str, steps: List[ReasoningStep], evidences: List[GroundedEvidence], ) -> str: ... @dataclass(frozen=True) class PipelineResult: """Aggregated output of the CoRGI pipeline.""" question: str steps: List[ReasoningStep] evidence: List[GroundedEvidence] answer: str def to_json(self) -> dict: return { "question": self.question, "steps": steps_to_serializable(self.steps), "evidence": evidences_to_serializable(self.evidence), "answer": self.answer, } class CoRGIPipeline: """Orchestrates the CoRGI reasoning pipeline using a Qwen3-VL client.""" def __init__(self, vlm_client: SupportsQwenClient): if vlm_client is None: raise ValueError("A Qwen3-VL client instance must be provided.") self._vlm = vlm_client def run( self, image: Image.Image, question: str, max_steps: int = 4, max_regions: int = 4, ) -> PipelineResult: steps = self._vlm.structured_reasoning(image=image, question=question, max_steps=max_steps) evidences: List[GroundedEvidence] = [] for step in steps: if not step.needs_vision: continue step_evs = self._vlm.extract_step_evidence( image=image, question=question, step=step, max_regions=max_regions, ) if not step_evs: continue evidences.extend(step_evs[:max_regions]) answer = self._vlm.synthesize_answer(image=image, question=question, steps=steps, evidences=evidences) return PipelineResult(question=question, steps=steps, evidence=evidences, answer=answer) __all__ = ["CoRGIPipeline", "PipelineResult"]