|
from transformers import ProcessorMixin, AutoProcessor |
|
from transformers.models.auto.processing_auto import AutoProcessor |
|
from transformers.processing_utils import ProcessorMixin |
|
from transformers.tokenization_utils_base import BatchEncoding |
|
import json |
|
import os |
|
|
|
class FlamingoProcessor(ProcessorMixin): |
|
""" |
|
Custom processor that combines a tokenizer and feature extractor. |
|
""" |
|
attributes = ["image_processor", "tokenizer"] |
|
image_processor_class = "AutoImageProcessor" |
|
tokenizer_class = "AutoTokenizer" |
|
|
|
def __init__(self, image_processor, tokenizer): |
|
super().__init__(image_processor, tokenizer) |
|
|
|
def __call__(self, text=None, images=None, **kwargs): |
|
""" |
|
Main processing method that handles both text and images. |
|
|
|
Args: |
|
text: Text input(s) to tokenize |
|
images: Image input(s) to process |
|
**kwargs: Additional arguments passed to tokenizer/image_processor |
|
|
|
Returns: |
|
Dictionary with processed inputs |
|
""" |
|
if text is None and images is None: |
|
raise ValueError("You need to specify either text or images") |
|
|
|
encoding = {} |
|
|
|
|
|
if text is not None: |
|
if type(text) == str: |
|
all_text = "<image> " + text |
|
else: |
|
if type(text[0]) == str: |
|
all_text = ["<image> " + _text for _text in text] |
|
else: |
|
all_text = ['<image> ' + " ".join(_text) for _text in text] |
|
text_encoding = self.tokenizer(all_text, **kwargs) |
|
encoding.update(text_encoding) |
|
|
|
|
|
if images is not None: |
|
image_encoding = self.image_processor(images, **kwargs) |
|
|
|
for key, value in image_encoding.items(): |
|
encoding[f"pixel_values" if key == "pixel_values" else f"image_{key}"] = value |
|
|
|
return BatchEncoding(encoding) |
|
|
|
def batch_decode(self, *args, **kwargs): |
|
""" |
|
Delegate batch decoding to the tokenizer. |
|
""" |
|
return self.tokenizer.batch_decode(*args, **kwargs) |
|
|
|
def decode(self, *args, **kwargs): |
|
""" |
|
Delegate decoding to the tokenizer. |
|
""" |
|
return self.tokenizer.decode(*args, **kwargs) |
|
|
|
|
|
|