"""Image processor class for KimiVL.""" import math import numpy as np from PIL import Image from typing import Optional, Union import torch from torchvision.transforms import functional as TF from transformers.image_utils import ImageInput, make_list_of_images, valid_images from transformers.image_processing_utils import BaseImageProcessor, BatchFeature from transformers.utils import TensorType OPENAI_DATASET_MEAN = (0.48145466, 0.4578275, 0.40821073) OPENAI_DATASET_STD = (0.26862954, 0.26130258, 0.27577711) class KimiVLImageProcessor(BaseImageProcessor): model_type = "kimi_vl" def __init__( self, patch_size: int = 14, pad_input: bool = False, image_mean: tuple[float, float, float] = OPENAI_DATASET_MEAN, image_std: tuple[float, float, float] = OPENAI_DATASET_STD, in_token_limit: int = 4096, merge_kernel_size: list[int, int] = [2, 2], **kwargs, ): super().__init__(**kwargs) self.in_token_limit = in_token_limit self.patch_size = patch_size self.pad_input = pad_input self.image_mean = image_mean self.image_std = image_std self.merge_kernel_size = merge_kernel_size def rescale( self, image: Image.Image, merge_kernel_size: list[int, int] = [2, 2] ) -> Image.Image: w, h = image.size patch_size = self.patch_size if (w // patch_size) * (h // patch_size) > self.in_token_limit: scale = math.sqrt(self.in_token_limit / ((w // patch_size) * (h // patch_size))) new_w, new_h = int(w * scale), int(h * scale) image = image.resize((new_w, new_h), Image.Resampling.BICUBIC) if self.pad_input: new_w, new_h = image.size pad_size_h = merge_kernel_size[0] * patch_size pad_size_w = merge_kernel_size[1] * patch_size pad_h = (pad_size_h - new_h % pad_size_h) % pad_size_h pad_w = (pad_size_w - new_w % pad_size_w) % pad_size_w image = TF.pad(image, (0, 0, pad_w, pad_h)) else: new_w, new_h = image.size new_w = new_w - new_w % patch_size new_h = new_h - new_h % patch_size image = TF.center_crop(image, (new_h, new_w)) w, h = image.size if w // patch_size >= 512 or h // patch_size >= 512: raise ValueError("Exceed pos emb") return image def to_tensor(self, image: Image.Image) -> torch.Tensor: return TF.to_tensor(image.convert("RGB")) def normalize(self, image: torch.Tensor) -> torch.Tensor: return TF.normalize(image, self.image_mean, self.image_std) def patchify(self, image: torch.Tensor) -> tuple[torch.Tensor, list[int, int]]: patch_size = self.patch_size C, H, W = image.shape patches = image.reshape(C, H // patch_size, patch_size, W // patch_size, patch_size) patches = patches.permute(1, 3, 0, 2, 4) patches = patches.contiguous().view(-1, C, patch_size, patch_size) grid_hw = (H // patch_size, W // patch_size) return patches, grid_hw def _preprocess(self, image: ImageInput) -> tuple[torch.Tensor, list[int, int]]: """ Preprocess image and patchify it. Args: image (`ImageInput`): Image to preprocess. Expects pixel values ranging from 0 to 255. If pixel values range from 0 to 1, set `do_rescale=False`. Returns: patches: torch.Tensor grid_hw: list[int, int] """ image = self.rescale(image, self.merge_kernel_size) image = self.to_tensor(image) image = self.normalize(image) patches, grid_hw = self.patchify(image) return patches, grid_hw def preprocess( self, images: ImageInput, return_tensors: Optional[Union[str, TensorType]] = None, ) -> BatchFeature: images = make_list_of_images(images) if not valid_images(images): raise ValueError( "Invalid image type. Must be of type PIL.Image.Image, numpy.ndarray, " "torch.Tensor, tf.Tensor or jax.ndarray." ) pixel_values, image_grid_hws = [], [] for image in images: patches, image_grid_hw = self._preprocess(image) pixel_values.append(patches) image_grid_hws.append(image_grid_hw) pixel_values = torch.concat(pixel_values, dim=0) image_grid_hws = np.array(image_grid_hws) data = {"pixel_values": pixel_values, "image_grid_hws": image_grid_hws} return BatchFeature(data=data, tensor_type=return_tensors)