File size: 13,632 Bytes
55a294e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 |
# Modification Copyright© 2025 Advanced Micro Devices, Inc. All rights reserved.
import re
import base64
import dataclasses
from PIL import Image
from io import BytesIO
from enum import auto, Enum
from typing import List, Any, Dict, Union, Tuple
from transformers import AutoTokenizer
class SeparatorStyle(Enum):
"""Different separator style."""
SINGLE = auto()
MPT = auto()
INSTELLA = auto()
@dataclasses.dataclass
class Conversation:
r"""A class that keeps all conversation history."""
system: str
roles: List[str]
messages: List[List[str]]
offset: int
sep_style: SeparatorStyle = SeparatorStyle.SINGLE
sep: str = "###"
sep2: str = None
version: str = "Unknown"
tokenizer_id: str = ""
tokenizer: Any = None
# Stop criteria (the default one is EOS token)
stop_str: Union[str, List[str]] = None
# Stops generation if meeting any token in this list
stop_token_ids: List[int] = None
skip_next: bool = False
def get_prompt(self):
"""
Generates a formatted prompt string based on the messages and separator style.
The function processes the messages stored in the instance, applies specific formatting rules
based on the separator style, and returns the resulting prompt string.
Returns:
`str`: The formatted prompt string.
Raises:
`ValueError`: If an invalid separator style is specified.
"""
messages = self.messages
if len(messages) > 0 and type(messages[0][1]) is tuple:
messages = self.messages.copy()
init_role, init_msg = messages[0].copy()
init_msg = init_msg[0]
if "mmtag" in self.version:
init_msg = init_msg.replace("<image>", "").strip()
messages[0] = (init_role, init_msg)
messages.insert(0, (self.roles[0], "<Image><image></Image>"))
messages.insert(1, (self.roles[1], "Received."))
elif not init_msg.startswith("<image>"):
init_msg = init_msg.replace("<image>", "").strip()
messages[0] = (init_role, "<image>\n" + init_msg)
else:
messages[0] = (init_role, init_msg)
if self.sep_style == SeparatorStyle.SINGLE:
ret = self.system + self.sep
for role, message in messages:
if message:
if type(message) is tuple:
message, _, _ = message
ret += role + ": " + message + self.sep
else:
ret += role + ":"
elif self.sep_style == SeparatorStyle.MPT:
ret = self.system + self.sep
for role, message in messages:
if message:
if type(message) is tuple:
message, _, _ = message
ret += role + message + self.sep
else:
ret += role
elif self.sep_style == SeparatorStyle.INSTELLA:
seps = [self.sep, self.sep2]
ret = "|||IP_ADDRESS|||"
for i, (role, message) in enumerate(messages):
if message:
if type(message) is tuple:
message, _, _ = message
if i % 2 == 1:
message = message.strip()
ret += role + message + seps[i % 2]
else:
ret += role
else:
raise ValueError(f"Invalid style: {self.sep_style}")
return ret
def append_message(self, role, message):
self.messages.append([role, message])
def process_image(self, image: Union[str, Image.Image], image_process_mode: str, return_pil: bool=False, image_format: str="PNG")->Union[str, Image.Image]:
r"""
Processes an image according to the specified mode and returns either a PIL image or a base64 encoded string.
Args:
- image (Union[str, Image.Image]): The image to be processed. Can be a file path or a PIL Image object.
- image_process_mode (str): The mode of image processing. Options are "Pad", "Default", "Crop", or "Resize".
- return_pil (bool, optional): If True, returns a PIL Image object. If False, returns a base64 encoded string. Defaults to False.
- image_format (str, optional): The format to save the image in if returning a base64 encoded string. Defaults to "PNG".
Returns:
Union[str, Image.Image]: The processed image, either as a PIL Image object or a base64 encoded string.
Raises:
ValueError: If an invalid image_process_mode is provided.
"""
if image_process_mode == "Pad":
def expand2square(pil_img, background_color=(122, 116, 104)):
width, height = pil_img.size
if width == height:
return pil_img
elif width > height:
result = Image.new(pil_img.mode, (width, width), background_color)
result.paste(pil_img, (0, (width - height) // 2))
return result
else:
result = Image.new(pil_img.mode, (height, height), background_color)
result.paste(pil_img, ((height - width) // 2, 0))
return result
image = expand2square(image)
elif image_process_mode in ["Default", "Crop"]:
pass
elif image_process_mode == "Resize":
image = image.resize((336, 336))
else:
raise ValueError(f"Invalid image_process_mode: {image_process_mode}")
if type(image) is not Image.Image:
image = Image.open(image).convert("RGB")
max_hw, min_hw = max(image.size), min(image.size)
aspect_ratio = max_hw / min_hw
max_len, min_len = 672, 448
shortest_edge = int(min(max_len / aspect_ratio, min_len, min_hw))
longest_edge = int(shortest_edge * aspect_ratio)
W, H = image.size
if H > W:
H, W = longest_edge, shortest_edge
else:
H, W = shortest_edge, longest_edge
image = image.resize((W, H))
if return_pil:
return image
else:
buffered = BytesIO()
image.save(buffered, format=image_format)
img_b64_str = base64.b64encode(buffered.getvalue()).decode()
return img_b64_str
def get_images(self, return_pil: bool=False, return_path: bool=False) -> List[Union[str, Image.Image]]:
"""
Retrieve images from the conversation messages.
Args:
return_pil (bool): If True, return images as PIL objects. Defaults to False.
return_path (bool): If True, return the image file paths instead of processing them. Defaults to False.
Returns:
list: A list of images or image paths depending on the arguments.
"""
images = []
for i, (role, msg) in enumerate(self.messages[self.offset :]):
if i % 2 == 0:
if type(msg) is tuple:
msg, image, image_process_mode = msg
if type(image) != list:
image = [image]
for img in image:
if not return_path and self.is_image_file(img):
img = self.process_image(img, image_process_mode, return_pil=return_pil)
else:
images.append(img)
return images
def is_image_file(self, filename: str)->bool:
image_extensions = [".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".webp"]
return any(filename.lower().endswith(ext) for ext in image_extensions)
def is_video_file(self, filename: str)->bool:
video_extensions = [".mp4", ".mov", ".avi", ".mkv", ".wmv", ".flv", ".mpeg", ".mpg"]
return any(filename.lower().endswith(ext) for ext in video_extensions)
def to_gradio_chatbot(self)->list:
ret = []
for i, (role, msg) in enumerate(self.messages[self.offset :]):
if i % 2 == 0:
if type(msg) is tuple:
msg, image, image_process_mode = msg
if type(image) != list:
image = [image]
if len(image) == 1:
msg = "<image>\n" + msg.replace("<image>", "").strip()
else:
msg = re.sub(r"(<image>)\n(?=<image>)", r"\1 ", msg)
img_str_list = []
for img in image:
if self.is_image_file(img):
img_b64_str = self.process_image(img, "Default", return_pil=False, image_format="JPEG")
img_str = f'<img src="data:image/jpeg;base64,{img_b64_str}" style="max-width: 256px; max-height: 256px; width: auto; height: auto; object-fit: contain;"/>'
img_str_list.append(img_str)
elif self.is_video_file(img):
ret.append(((img,), None))
msg = msg.strip()
img_place_holder = ""
for img_str in img_str_list:
img_place_holder += f"{img_str}\n\n"
if len(img_str_list) > 0:
msg = f"{img_place_holder}\n\n{msg}"
if len(msg) > 0:
ret.append([msg, None])
else:
ret.append([msg, None])
else:
ret[-1][-1] = msg
return ret
def copy(self)->"Conversation":
return Conversation(system=self.system, roles=self.roles, messages=[[x, y] for x, y in self.messages], offset=self.offset, sep_style=self.sep_style, sep=self.sep, sep2=self.sep2, version=self.version)
def dict(self)->Dict[str, Any]:
if len(self.get_images()) > 0:
return {
"system": self.system,
"roles": self.roles,
"messages": [[x, y[0] if type(y) is tuple else y] for x, y in self.messages],
"offset": self.offset,
"sep": self.sep,
"sep2": self.sep2,
}
return {
"system": self.system,
"roles": self.roles,
"messages": self.messages,
"offset": self.offset,
"sep": self.sep,
"sep2": self.sep2,
}
conv_vicuna_v0 = Conversation(
system="A chat between a curious human and an artificial intelligence assistant. " "The assistant gives helpful, detailed, and polite answers to the human's questions.",
roles=("Human", "Assistant"),
messages=[
["Human", "What are the key differences between renewable and non-renewable energy sources?"],
[
"Assistant",
"Renewable energy sources are those that can be replenished naturally in a relatively "
"short amount of time, such as solar, wind, hydro, geothermal, and biomass. "
"Non-renewable energy sources, on the other hand, are finite and will eventually be "
"depleted, such as coal, oil, and natural gas. Here are some key differences between "
"renewable and non-renewable energy sources:\n"
"1. Availability: Renewable energy sources are virtually inexhaustible, while non-renewable "
"energy sources are finite and will eventually run out.\n"
"2. Environmental impact: Renewable energy sources have a much lower environmental impact "
"than non-renewable sources, which can lead to air and water pollution, greenhouse gas emissions, "
"and other negative effects.\n"
"3. Cost: Renewable energy sources can be more expensive to initially set up, but they typically "
"have lower operational costs than non-renewable sources.\n"
"4. Reliability: Renewable energy sources are often more reliable and can be used in more remote "
"locations than non-renewable sources.\n"
"5. Flexibility: Renewable energy sources are often more flexible and can be adapted to different "
"situations and needs, while non-renewable sources are more rigid and inflexible.\n"
"6. Sustainability: Renewable energy sources are more sustainable over the long term, while "
"non-renewable sources are not, and their depletion can lead to economic and social instability.\n",
],
],
offset=2,
sep_style=SeparatorStyle.SINGLE,
sep="###",
)
conv_mpt = Conversation(
system="""<|im_start|>system
A conversation between a user and an LLM-based AI assistant. The assistant gives helpful and honest answers.""",
roles=("<|im_start|>user\n", "<|im_start|>assistant\n"),
version="mpt",
messages=[],
offset=0,
sep_style=SeparatorStyle.MPT,
sep="<|im_end|>",
)
conv_instella = Conversation(
system="",
roles=("<|user|>\n", "<|assistant|>\n"),
version="instella",
messages=(),
offset=0,
sep_style=SeparatorStyle.INSTELLA,
sep="\n",
sep2='|||IP_ADDRESS|||\n'
)
default_conversation = conv_instella
conv_templates = {
"default": conv_instella,
"mpt": conv_mpt,
"instella": conv_instella,
}
if __name__ == "__main__":
print(default_conversation.get_prompt())
|