# coding=utf-8 # Copyright 2024 The HuggingFace Inc. team. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Processor class for Aero. """ import math import os from typing import List, Optional, Union import numpy as np from transformers.feature_extraction_utils import BatchFeature from transformers.image_utils import ImageInput from transformers.video_utils import VideoInput from transformers.models.auto import AutoFeatureExtractor from transformers.processing_utils import ProcessingKwargs, ProcessorMixin, Unpack from transformers.tokenization_utils_base import PreTokenizedInput, TextInput from transformers.utils import logging logger = logging.get_logger(__name__) class AeroProcessorKwargs(ProcessingKwargs, total=False): _defaults = { "text_kwargs": { "padding": False, }, "audio_kwargs": { }, } class AeroProcessor(ProcessorMixin): attributes = ["tokenizer", "audio_processor"] valid_kwargs = [ "chat_template", "audio_token", ] tokenizer_class = "AutoTokenizer" audio_processor_class = "AutoFeatureExtractor" def __init__( self, tokenizer=None, audio_processor=None, chat_template=None, audio_token="<|AUDIO|>", **kwargs, ): self.audio_token = ( tokenizer.audio_token if hasattr(tokenizer, "audio_token") else audio_token ) if chat_template is None: chat_template = self.default_chat_template super().__init__( tokenizer, audio_processor, chat_template=chat_template, ) def __call__( self, text: Union[ TextInput, PreTokenizedInput, List[TextInput], List[PreTokenizedInput] ] = None, audios: Union[np.ndarray, List[np.ndarray]] = None, videos: VideoInput = None, images: ImageInput = None, sampling_rate: Optional[int] = None, **kwargs: Unpack[AeroProcessorKwargs], ) -> BatchFeature: """ Main method to prepare for the model one or several sequences(s) and image(s). This method forwards the `text` and `kwargs` arguments to LlamaTokenizerFast's [`~LlamaTokenizerFast.__call__`] if `text` is not `None` to encode the text. To prepare the image(s), this method forwards the `images` and `kwrags` arguments to LlavaNextImageProcessor's [`~LlavaNextImageProcessor.__call__`] if `images` is not `None`. Please refer to the doctsring of the above two methods for more information. Args: images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `List[PIL.Image.Image]`, `List[np.ndarray]`, `List[torch.Tensor]`): The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch tensor. Both channels-first and channels-last formats are supported. text (`str`, `List[str]`, `List[List[str]]`): The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings (pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set `is_split_into_words=True` (to lift the ambiguity with a batch of sequences). videos (`np.ndarray`, `torch.Tensor`, `List[np.ndarray]`, `List[torch.Tensor]`): The image or batch of videos to be prepared. Each video can be a 4D NumPy array or PyTorch Returns: [`BatchFeature`]: A [`BatchFeature`] with the following fields: - **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`. - **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when `return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not `None`). - **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`. - **pixel_values_videos** -- Pixel values of a video input to be fed to a model. Returned when `videos` is not `None`. - **image_sizes** -- Size of each image that will be used to unpad an image. Returned when `images` is not `None`. """ output_kwargs = self._merge_kwargs( AeroProcessorKwargs, tokenizer_init_kwargs=self.tokenizer.init_kwargs, **kwargs, ) # Make sure no duplicate padding if "padding" in output_kwargs["audio_kwargs"]: output_kwargs["audio_kwargs"].pop("padding") if isinstance(text, str): text = [text] elif not isinstance(text, list) and not isinstance(text[0], str): raise ValueError( "Invalid input text. Please provide a string, or a list of strings" ) audio_inputs = {} if audios is not None: audio_inputs = self.audio_processor( audios, sampling_rate=sampling_rate, return_attention_mask=True, padding="max_length", **output_kwargs["audio_kwargs"], ) audio_inputs["audio_attention_mask"] = audio_inputs.pop( "attention_mask" ) # rename attention_mask to prevent conflicts later on audio_inputs["audio_values"] = audio_inputs.pop( "input_features" ) # rename input_features to audio_features for clarification # Computes the output length of the convolutional layers and the output length of the audio encoder input_lengths = (audio_inputs["audio_attention_mask"].sum(-1) - 1) // 2 + 1 num_audio_tokens = (input_lengths - 2) // 2 + 1 text = self.expand_audio_tokens(text, num_audio_tokens, self.audio_token) text_inputs = self.tokenizer(text, **output_kwargs["text_kwargs"]) return BatchFeature(data={**text_inputs, **audio_inputs}) def expand_audio_tokens( self, text: List[TextInput], num_audio_tokens: List[int], special_token: str, ): prompt_strings = [] current_audio_idx = 0 for sample in text: while special_token in sample: num_audio_token = num_audio_tokens[current_audio_idx] sample = sample.replace( special_token, "" * num_audio_token, 1 ) current_audio_idx += 1 prompt_strings.append(sample) text = [ sample.replace("", special_token) for sample in prompt_strings ] return text # Copied from transformers.models.clip.processing_clip.CLIPProcessor.batch_decode with CLIP->Llama def batch_decode(self, *args, **kwargs): """ This method forwards all its arguments to LlamaTokenizerFast's [`~PreTrainedTokenizer.batch_decode`]. Please refer to the docstring of this method for more information. """ return self.tokenizer.batch_decode(*args, **kwargs) # Copied from transformers.models.clip.processing_clip.CLIPProcessor.decode with CLIP->Llama def decode(self, *args, **kwargs): """ This method forwards all its arguments to LlamaTokenizerFast's [`~PreTrainedTokenizer.decode`]. Please refer to the docstring of this method for more information. """ return self.tokenizer.decode(*args, **kwargs) def batch_encode(self, *args, **kwargs): """ This method forwards all its arguments to LlamaTokenizerFast's [`~PreTrainedTokenizer.batch_encode`]. Please refer to the docstring of this method for more information. """ return self.tokenizer.batch_encode(*args, **kwargs) def encode(self, *args, **kwargs): """ This method forwards all its arguments to LlamaTokenizerFast's [`~PreTrainedTokenizer.encode`]. Please refer to the docstring of this method for more information. """ return self.tokenizer.encode(*args, **kwargs) @property # Copied from transformers.models.clip.processing_clip.CLIPProcessor.model_input_names def model_input_names(self): tokenizer_input_names = self.tokenizer.model_input_names image_processor_input_names = self.image_processor.model_input_names return list(dict.fromkeys(tokenizer_input_names + image_processor_input_names)) @property def default_chat_template(self): # fmt: off return ( "{% set audio_count = namespace(value=0) %}" "{% for message in messages %}" "{% if loop.first and message['role'] != 'system' %}" "<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n" "{% endif %}" "<|im_start|>{{ message['role'] }}\n" "{% if message['content'] is string %}" "{{ message['content'] }}<|im_end|>\n" "{% else %}" "{% for content in message['content'] %}" "{% if 'audio' in content or 'audio_url' in content or content['type'] == 'audio'%}" "{% set audio_count.value = audio_count.value + 1 %}" "<|AUDIO|>\n" "{% elif 'text' in content %}" "{{ content['text'] }}" "{% endif %}" "{% endfor %}" "<|im_end|>\n" "{% endif %}" "{% endfor %}" "{% if add_generation_prompt %}" "<|im_start|>assistant\n" "{% endif %}" ) # fmt: on