# tokenization_kimia.py (Kimi-Audio-7B-Instruct)
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
"""Megatron tokenizers."""
import logging
import os
from pathlib import Path
from typing import (
    AbstractSet,
    Collection,
    Dict,
    Iterator,
    List,
    Literal,
    Optional,
    Sequence,
    Union,
    cast,
)

import tiktoken
from tiktoken.load import load_tiktoken_bpe
from tokenizers import AddedToken
from transformers.tokenization_utils import PreTrainedTokenizer

logger = logging.getLogger(__name__)
VOCAB_FILES_NAMES = {"vocab_file": "tiktoken.model"}
class TikTokenTokenizer(PreTrainedTokenizer):
"""
Tokenizing and encoding/decoding text using the Tiktoken tokenizer.
"""
special_tokens: Dict[str, int]
num_reserved_special_tokens = 293 + 128
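    # GPT-style BPE pre-tokenization pattern: English contractions, runs of
    # letters (optionally preceded by one non-letter character), single digits,
    # punctuation runs, and whitespace/newline handling.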
pat_str = "(?i:'s|'t|'re|'ve|'m|'ll|'d)|[^\\r\\n\\p{L}\\p{N}]?\\p{L}+|\\p{N}| ?[^\\s\\p{L}\\p{N}]+[\\r\\n]*|\\s*[\\r\\n]+|\\s+(?!\\S)|\\s+"
vocab_files_names = VOCAB_FILES_NAMES
def __init__(
self,
vocab_file,
bos_token: Union[str, AddedToken] = "[BOS]",
eos_token: Union[str, AddedToken] = "[EOS]",
unk_token: Union[str, AddedToken] = "[UNK]",
pad_token: Union[str, AddedToken] = "[PAD]",
additional_special_tokens: Optional[List[str]] = None,
added_tokens_decoder: Optional[dict] = None,
**kwargs,
):
"""
Initializes the Tokenizer with a Tiktoken model.
Args:
model_path (str): The path to the Tiktoken model file.
"""
assert os.path.isfile(vocab_file), vocab_file
mergeable_ranks = load_tiktoken_bpe(vocab_file)
num_base_tokens = len(mergeable_ranks)
        # The trailing comments give each token's zero-based index in this list.
        used_special_tokens = [
            "[BOS]",  # 0
            "[EOS]",  # 1
            "<|im_msg_end|>",  # 2
            "<|im_user_msg_start|>",  # 3
            "<|im_assistant_msg_start|>",  # 4
            "<|reserved_token_0|>",  # 5
            "<|reserved_token_1|>",  # 6
            "<|reserved_token_2|>",  # 7
            "<|reserved_token_3|>",  # 8
            "[EOT]",  # 9
            "<|reserved_token_4|>",  # 10
            "<|reserved_token_5|>",  # 11
            "<|reserved_token_6|>",  # 12
            "<|reserved_token_7|>",  # 13
            "<|reserved_token_8|>",  # 14
            "<|reserved_token_9|>",  # 15
            "<|reserved_token_10|>",  # 16
            "<|reserved_token_11|>",  # 17
            "<|im_media_begin|>",  # 18
            "<|reserved_token_12|>",  # 19
            "<|im_media_end|>",  # 20
            "<|reserved_token_13|>",  # 21
            "<|reserved_token_14|>",  # 22
            "<|im_kimia_text_blank|>",  # 23
            "<|im_kimia_text_eos|>",  # 24
            "<|reserved_token_15|>",  # 25
            "<|reserved_token_16|>",  # 26
            "<|im_kimia_user_msg_start|>",  # 27
            "<|im_kimia_assistant_msg_start|>",  # 28
            "<|reserved_token_17|>",  # 29
            "<|reserved_token_18|>",  # 30
            "<|reserved_token_19|>",  # 31
            "<|im_kimia_speech_ct_id|>",  # 32
            "<|im_kimia_speech_ctd_id|>",  # 33
        ]
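        # Auto-generate the remaining reserved tokens, <|reserved_token_20|>
        # through <|reserved_token_406|>, so that the named tokens above plus
        # these placeholders total num_reserved_special_tokens (293 + 128 = 421).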
autoset_special_tokens = [
f"<|reserved_token_{i}|>"
for i in range(
20, self.num_reserved_special_tokens - len(used_special_tokens) + 20
)
]
special_tokens = used_special_tokens + autoset_special_tokens
self.special_tokens = {
token: num_base_tokens + i for i, token in enumerate(special_tokens)
}
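        # Special tokens are appended after the base BPE vocabulary: they occupy
        # IDs [num_base_tokens, num_base_tokens + num_reserved_special_tokens).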
self.model = tiktoken.Encoding(
name=Path(vocab_file).name,
pat_str=self.pat_str,
mergeable_ranks=mergeable_ranks,
special_tokens=self.special_tokens,
)
logger.info(f"Reloaded tiktoken model from {vocab_file}")
self.n_words: int = self.model.n_vocab
        # BOS / EOS token IDs
        self.bos_token = "[BOS]"
        self.bos_id: int = self.special_tokens["[BOS]"]
        self.eos_token = "[EOS]"
        self.eos_id: int = self.special_tokens["[EOS]"]
        # Use the last special token as the pad token; the second-to-last is the unk token.
        self.pad_token: str = special_tokens[-1]
        self.pad_id: int = self.special_tokens[self.pad_token]
        self.unk_token: str = special_tokens[-2]
        self.unk_id: int = self.special_tokens[self.unk_token]
self.stop_tokens = {
self.special_tokens["[EOS]"],
self.special_tokens["[EOT]"],
}
logger.info(
f"#words: {self.n_words} - BOS ID: {self.bos_id} - EOS ID: {self.eos_id}"
)
def encode(
self,
s: str,
*,
bos: bool,
eos: bool,
allowed_special: Union[Literal["all"], AbstractSet[str]] = set(),
disallowed_special: Union[Literal["all"], Collection[str]] = (),
) -> List[int]:
"""
Encodes a string into a list of token IDs.
Args:
s (str): The input string to be encoded.
bos (bool): Whether to prepend the beginning-of-sequence token.
eos (bool): Whether to append the end-of-sequence token.
allowed_tokens ("all"|set[str]): allowed special tokens in string
disallowed_tokens ("all"|set[str]): special tokens that raise an error when in string
Returns:
list[int]: A list of token IDs.
By default, setting disallowed_special=() encodes a string by ignoring
special tokens. Specifically:
- Setting `disallowed_special` to () will cause all text corresponding
to special tokens to be encoded as natural text (insteading of raising
an error).
- Setting `allowed_special` to "all" will treat all text corresponding
to special tokens to be encoded as special tokens.
"""
assert type(s) is str
# The tiktoken tokenizer can handle <=400k chars without
# pyo3_runtime.PanicException.
TIKTOKEN_MAX_ENCODE_CHARS = 400_000
# https://github.com/openai/tiktoken/issues/195
# Here we iterate over subsequences and split if we exceed the limit
# of max consecutive non-whitespace or whitespace characters.
MAX_NO_WHITESPACES_CHARS = 25_000
substrs = (
substr
for i in range(0, len(s), TIKTOKEN_MAX_ENCODE_CHARS)
for substr in self._split_whitespaces_or_nonwhitespaces(
s[i : i + TIKTOKEN_MAX_ENCODE_CHARS], MAX_NO_WHITESPACES_CHARS
)
)
t: List[int] = []
for substr in substrs:
t.extend(
self.model.encode(
substr,
allowed_special=allowed_special,
disallowed_special=disallowed_special,
)
)
if bos:
t.insert(0, self.bos_id)
if eos:
t.append(self.eos_id)
return t
def decode(self, t: Sequence[int]) -> str:
"""
Decodes a list of token IDs into a string.
Args:
t (List[int]): The list of token IDs to be decoded.
Returns:
str: The decoded string.
"""
# Typecast is safe here. Tiktoken doesn't do anything list-related with the sequence.
return self.model.decode(cast(List[int], t))
@staticmethod
def _split_whitespaces_or_nonwhitespaces(
s: str, max_consecutive_slice_len: int
) -> Iterator[str]:
"""
Splits the string `s` so that each substring contains no more than `max_consecutive_slice_len`
consecutive whitespaces or consecutive non-whitespaces.
"""
current_slice_len = 0
current_slice_is_space = s[0].isspace() if len(s) > 0 else False
slice_start = 0
for i in range(len(s)):
is_now_space = s[i].isspace()
if current_slice_is_space ^ is_now_space:
current_slice_len = 1
current_slice_is_space = is_now_space
else:
current_slice_len += 1
if current_slice_len > max_consecutive_slice_len:
yield s[slice_start:i]
slice_start = i
current_slice_len = 1
yield s[slice_start:]
""" ----- Below are the abstract methods required by megatron ----- """
@property
def vocab_size(self):
return self.n_words
@property
def vocab(self):
if hasattr(self, "str_vocab"):
return self.str_vocab
self.str_vocab = {}
# convert mergeable_ranks from bytes to string
utf8_num, unicode_num = 0, 0
for byte_key, index in self.model._mergeable_ranks.items():
try:
str_key = byte_key.decode("utf-8")
utf8_num += 1
except UnicodeDecodeError:
                # Use backslashreplace so every token still maps to a distinct
                # string key (see: https://docs.python.org/3/howto/unicode.html).
                # This vocab is only used for offline processing, so this is fine.
str_key = byte_key.decode("utf-8", "backslashreplace") + "_unicode_"
unicode_num += 1
self.str_vocab[str_key] = index
logger.info(f"num utf8: {utf8_num}, num unicode: {unicode_num}")
# add all special tokens to the dictionary
self.str_vocab.update(self.model._special_tokens)
assert len(self.str_vocab) == self.vocab_size
return self.str_vocab
@property
def inv_vocab(self):
return {v: k for k, v in self.vocab.items()}
    def tokenize(self, text, eos=True):
        # BOS: always prepend the BOS token.
        # EOS: should be True when tokenizing a full sequence;
        #      set it to False only when running inference.
        return self.encode(text, bos=True, eos=eos)
def detokenize(self, tokens):
# convert tensor to list if needed...
if not isinstance(tokens, list):
tokens = tokens.tolist()
return self.decode(tokens)
@property
def eod(self):
return self.eos_id
    @property
    def bod(self):
        return self.bos_id
    @property
    def msk_start_id(self):
        # NOTE: `msk_start` is never assigned in this file; it must be set
        # externally before this property is accessed.
        return self.msk_start
    @property
    def msk_end_id(self):
        # NOTE: `msk_end` is never assigned in this file; it must be set
        # externally before this property is accessed.
        return self.msk_end
def _get_index_2_bytes(self):
if hasattr(self, "index_2_bytes"):
return self.index_2_bytes
# use array rather than dict for faster access
self.index_2_bytes = [0] * self.model.n_vocab
for byte_key, index in self.model._mergeable_ranks.items():
self.index_2_bytes[index] = len(byte_key)
        for _, index in self.model._special_tokens.items():
            # Special tokens have no underlying byte sequence; count each one
            # as a single byte for this accounting.
            self.index_2_bytes[index] = 1
return self.index_2_bytes
    def get_array_bytes(self, array):
        """Returns the total number of bytes covered by a sequence of token IDs."""
        index_2_bytes = self._get_index_2_bytes()
        return sum(index_2_bytes[i] for i in array)
@property
def eos_token_id(self):
return self.eos_id
@property
def pad_token_id(self):
return self.pad_id
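

if __name__ == "__main__":
    # Minimal usage sketch (the vocab path is an assumption: point it at the
    # `tiktoken.model` file shipped with the checkpoint, per VOCAB_FILES_NAMES).
    tokenizer = TikTokenTokenizer(vocab_file="tiktoken.model")
    # Round-trip a string with BOS/EOS markers attached.
    ids = tokenizer.encode("Hello, Kimi!", bos=True, eos=True)
    print(ids)
    print(tokenizer.decode(ids))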