# Spaces:
# Runtime error
# Runtime error
import os | |
import random | |
import uuid | |
import numpy as np | |
def seed_everything(seed):
    """Seed the stdlib, NumPy and (when usable) PyTorch random generators.

    Args:
        seed: Seed value passed unchanged to every generator.

    Returns:
        The seed that was passed in.
    """
    random.seed(seed)
    np.random.seed(seed)
    try:
        import torch

        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    except Exception:
        # Seeding torch is best-effort: a missing or broken torch install must
        # not prevent the other generators from being seeded. ``Exception``
        # (rather than a bare ``except``) lets KeyboardInterrupt and
        # SystemExit propagate.
        pass
    return seed
def cut_dialogue_history(history_memory, keep_last_n_words=500):
    """Drop the oldest paragraphs of a dialogue until it fits a word budget.

    Words are whitespace-separated tokens; paragraphs are separated by
    newlines. A history with fewer than ``keep_last_n_words`` words is
    returned unchanged. Otherwise whole paragraphs are removed from the front
    until fewer than ``keep_last_n_words`` words remain, and the survivors are
    returned joined by newlines with a leading newline.

    Args:
        history_memory: The full dialogue text.
        keep_last_n_words: Exclusive upper bound on the number of words kept.

    Returns:
        The original text, or the trimmed text prefixed with ``"\\n"``.
    """
    n_tokens = len(history_memory.split())
    print(f"history_memory:{history_memory}, n_tokens: {n_tokens}")
    if n_tokens < keep_last_n_words:
        return history_memory

    paragraphs = history_memory.split("\n")
    remaining = n_tokens
    dropped = 0
    # Count each paragraph with the same tokenisation used for the total so
    # the running count stays accurate, and stop once every paragraph has been
    # dropped instead of indexing into an empty list.
    while remaining >= keep_last_n_words and dropped < len(paragraphs):
        remaining -= len(paragraphs[dropped].split())
        dropped += 1
    return "\n" + "\n".join(paragraphs[dropped:])
def get_new_image_name(org_img_name, func_name="update"):
    """Derive a fresh ``.png`` file name that records where an image came from.

    The result has the form ``<id>_<func_name>_<previous>_<original>.png`` and
    lives in the same directory as ``org_img_name``. ``<id>`` is the first
    four characters of a random UUID. The part of the input file name before
    its first dot is split on underscores:

    * one field: that field is used as both ``<previous>`` and ``<original>``;
    * otherwise the name must have exactly four fields (asserted); field 0
      becomes ``<previous>`` and field 3 becomes ``<original>``.

    Args:
        org_img_name: Path of the source image.
        func_name: Label written into the second field of the new name.

    Returns:
        The new path, joined onto the directory of ``org_img_name``.
    """
    directory, filename = os.path.split(org_img_name)
    fields = filename.split(".")[0].split("_")
    short_id = str(uuid.uuid4())[0:4]

    if len(fields) == 1:
        previous = original = fields[0]
    else:
        assert len(fields) == 4
        previous, original = fields[0], fields[3]

    return os.path.join(
        directory, f"{short_id}_{func_name}_{previous}_{original}.png"
    )
def get_new_dataframe_name(org_img_name, func_name="update"):
    """Derive a fresh ``.csv`` file name that records where a file came from.

    The result has the form ``<id>_<func_name>_<previous>_<original>.csv`` in
    the directory of ``org_img_name``; ``<id>`` is the first four characters
    of a random UUID. The input file name (up to its first dot) is split on
    underscores. A single field serves as both ``<previous>`` and
    ``<original>``; any other name is asserted to have exactly four fields,
    of which the first is ``<previous>`` and the last is ``<original>``.

    Args:
        org_img_name: Path of the source file.
        func_name: Label written into the second field of the new name.

    Returns:
        The new path, joined onto the directory of ``org_img_name``.
    """
    folder, base = os.path.split(org_img_name)
    stem_parts = base.split(".")[0].split("_")
    prefix = str(uuid.uuid4())[0:4]

    is_fresh_name = len(stem_parts) == 1
    if not is_fresh_name:
        assert len(stem_parts) == 4
    recent = stem_parts[0]
    origin = stem_parts[0] if is_fresh_name else stem_parts[3]

    new_name = "_".join([prefix, func_name, recent, origin]) + ".csv"
    return os.path.join(folder, new_name)
#########=======================> utils end | |
#########=======================> ANSI BEGINNING | |
class Code:
    """A single numeric code that renders as its decimal representation."""

    def __init__(self, value: int):
        # The numeric code; subclasses in this file adjust it in place.
        self.value = value

    def __str__(self) -> str:
        """Return the code formatted with ``%d``."""
        rendered = "%d" % self.value
        return rendered
class Color(Code):
    """Colour code with chainable modifiers and named constructors.

    The named constructors return a new ``Color`` holding codes 30-37 and 39.
    ``bg()`` and ``bright()`` modify the instance they are called on and
    return it, so calls can be chained: ``Color.red().bright()`` holds 91.
    """

    def bg(self) -> "Color":
        """Add 10 to this code in place and return ``self``."""
        self.value += 10
        return self

    def bright(self) -> "Color":
        """Add 60 to this code in place and return ``self``."""
        self.value += 60
        return self

    # The constructors below take no ``self``; ``@staticmethod`` makes them
    # callable through an instance as well as through the class.
    @staticmethod
    def black() -> "Color":
        return Color(30)

    @staticmethod
    def red() -> "Color":
        return Color(31)

    @staticmethod
    def green() -> "Color":
        return Color(32)

    @staticmethod
    def yellow() -> "Color":
        return Color(33)

    @staticmethod
    def blue() -> "Color":
        return Color(34)

    @staticmethod
    def magenta() -> "Color":
        return Color(35)

    @staticmethod
    def cyan() -> "Color":
        return Color(36)

    @staticmethod
    def white() -> "Color":
        return Color(37)

    @staticmethod
    def default() -> "Color":
        return Color(39)
class Style(Code):
    """Text-style code with named constructors for codes 0-5, 7 and 8.

    Each constructor returns a new ``Style``. They take no ``self``, so they
    are declared with ``@staticmethod`` to be callable through an instance as
    well as through the class.
    """

    @staticmethod
    def reset() -> "Style":
        return Style(0)

    @staticmethod
    def bold() -> "Style":
        return Style(1)

    @staticmethod
    def dim() -> "Style":
        return Style(2)

    @staticmethod
    def italic() -> "Style":
        return Style(3)

    @staticmethod
    def underline() -> "Style":
        return Style(4)

    @staticmethod
    def blink() -> "Style":
        return Style(5)

    @staticmethod
    def reverse() -> "Style":
        return Style(7)

    @staticmethod
    def conceal() -> "Style":
        return Style(8)
class ANSI:
    """Wrap a piece of text in an escape sequence built from a list of codes.

    Typical use: ``ANSI("text").to(code_a, code_b)`` returns
    ``"\\x1b[<a>;<b>m" + "text" + "\\x1b[m"``.
    """

    ESCAPE = "\x1b["
    CLOSE = "m"

    def __init__(self, text: str):
        self.text = text
        # Codes to emit; replaced on every call to ``to``.
        self.args = []

    def join(self) -> str:
        """Return the opening sequence: ESCAPE, the codes joined by ``;``, CLOSE."""
        return ANSI.ESCAPE + ";".join([str(a) for a in self.args]) + ANSI.CLOSE

    def wrap(self, text: str) -> str:
        """Return ``text`` preceded by the opening sequence and followed by the suffix.

        The suffix is the parameterless sequence ``ESCAPE + CLOSE``
        (``"\\x1b[m"``). It is built directly rather than through a throwaway
        ``ANSI`` instance; the emitted bytes are unchanged.
        """
        return self.join() + text + ANSI.ESCAPE + ANSI.CLOSE

    def to(self, *args: object) -> str:
        """Set the codes to ``args`` and return this instance's text wrapped in them.

        Args:
            *args: Values whose ``str()`` form is placed in the sequence.
        """
        self.args = list(args)
        return self.wrap(self.text)
def dim_multiline(message: str) -> str:
    """Return ``message`` with every line after the first styled and prefixed.

    A message without a newline is returned as is. Otherwise the first line
    is kept untouched and each following line is prefixed with ``"\\n... "``;
    that continuation text is wrapped via ``ANSI(...).to(Color.black().bright())``.
    """
    first, *continuation = message.split("\n")
    if not continuation:
        return first
    tail = "".join("\n... " + line for line in continuation)
    return first + ANSI(tail).to(Color.black().bright())
#+=============================> ANSI Ending | |
#================================> upload base | |
from abc import ABC, abstractmethod, abstractstaticmethod | |
STATIC_DIR = "static" | |
class AbstractUploader(ABC):
    """Interface for uploaders.

    Both methods are abstract, so a subclass must implement them before it
    can be instantiated; the base class itself cannot be instantiated.
    """

    @abstractmethod
    def upload(self, filepath: str) -> str:
        """Upload the file at ``filepath`` and return a string for it."""

    @staticmethod
    @abstractmethod
    def from_settings() -> "AbstractUploader":
        """Build an uploader instance from configuration."""
#================================> upload end | |
#========================= upload s3 | |
import boto3 | |
class S3Uploader(AbstractUploader):
    """Uploader that puts files into an S3 bucket through a boto3 client."""

    def __init__(self, accessKey: str, secretKey: str, region: str, bucket: str):
        self.accessKey = accessKey
        self.secretKey = secretKey
        self.region = region
        self.bucket = bucket
        # Pin the client to the configured region so it agrees with the
        # region that get_url() writes into the returned URL.
        self.client = boto3.client(
            "s3",
            region_name=self.region,
            aws_access_key_id=self.accessKey,
            aws_secret_access_key=self.secretKey,
        )

    @staticmethod
    def from_settings() -> "S3Uploader":
        """Build an uploader from environment variables.

        Reads ``AWS_ACCESS_KEY_ID``, ``AWS_SECRET_ACCESS_KEY``, ``AWS_REGION``
        and ``AWS_S3_BUCKET``.

        Raises:
            KeyError: If any of those variables is not set.
        """
        return S3Uploader(
            os.environ["AWS_ACCESS_KEY_ID"],
            os.environ["AWS_SECRET_ACCESS_KEY"],
            os.environ["AWS_REGION"],
            os.environ["AWS_S3_BUCKET"],
        )

    def get_url(self, object_name: str) -> str:
        """Return ``https://{bucket}.s3.{region}.amazonaws.com/{object_name}``."""
        return f"https://{self.bucket}.s3.{self.region}.amazonaws.com/{object_name}"

    def upload(self, filepath: str) -> str:
        """Upload ``filepath`` under its base name and return the object's URL."""
        object_name = os.path.basename(filepath)
        self.client.upload_file(filepath, self.bucket, object_name)
        return self.get_url(object_name)
#========================= upload s3 | |
#========================> upload/static | |
import shutil | |
from pathlib import Path | |
class StaticUploader(AbstractUploader):
    """Uploader that copies files into a local directory and returns a server URL."""

    def __init__(self, server: str, path: Path, endpoint: str):
        self.server = server      # base URL placed in front of returned paths
        self.path = path          # directory under which files are copied
        self.endpoint = endpoint  # URL path segment placed before the relative path

    @staticmethod
    def from_settings(path: Path, endpoint: str) -> "StaticUploader":
        """Build an uploader whose server comes from the ``SERVER`` env var.

        Falls back to ``http://localhost:8000`` when ``SERVER`` is unset.
        """
        server = os.environ.get("SERVER", "http://localhost:8000")
        return StaticUploader(server, path, endpoint)

    def get_url(self, uploaded_path: str) -> str:
        """Return ``{server}/{uploaded_path}``."""
        return f"{self.server}/{uploaded_path}"

    def upload(self, filepath: str) -> str:
        """Copy ``filepath`` to ``<path>/generated/<basename>`` and return its URL.

        The URL is ``{server}/{endpoint}/generated/<basename>``.
        """
        # os.path.basename (as in S3Uploader.upload) instead of splitting on
        # "/", so the platform's own path separator is honoured.
        relative_path = Path("generated") / os.path.basename(filepath)
        file_path = self.path / relative_path
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        shutil.copy(filepath, file_path)
        # URLs always use forward slashes, regardless of the OS separator.
        endpoint_path = (Path(self.endpoint) / relative_path).as_posix()
        return self.get_url(endpoint_path)
#========================> handlers/base | |
import uuid | |
from enum import Enum | |
from typing import Dict | |
import requests | |
# from env import settings | |
class FileType(Enum):
    """Kind of file, inferred from the extension of a file name or URL."""

    IMAGE = "image"
    AUDIO = "audio"
    VIDEO = "video"
    DATAFRAME = "dataframe"
    UNKNOWN = "unknown"

    @staticmethod
    def from_filename(url: str) -> "FileType":
        """Classify ``url`` by its extension, ignoring anything after ``?``.

        Matching is case-insensitive. Unrecognised extensions map to
        ``FileType.UNKNOWN``.
        """
        filename = url.split("?")[0].lower()
        if filename.endswith((".png", ".jpg", ".jpeg")):
            return FileType.IMAGE
        if filename.endswith((".mp3", ".wav")):
            return FileType.AUDIO
        if filename.endswith((".mp4", ".avi")):
            return FileType.VIDEO
        if filename.endswith(".csv"):
            return FileType.DATAFRAME
        return FileType.UNKNOWN

    @staticmethod
    def from_url(url: str) -> "FileType":
        """Classify a URL by the extension of the part before its query string."""
        return FileType.from_filename(url.split("?")[0])

    def to_extension(self) -> str:
        """Return the extension used for this type (``".unknown"`` for UNKNOWN)."""
        # Built inside the method: a dict assigned in an Enum class body
        # would itself be turned into an enum member.
        extensions = {
            FileType.IMAGE: ".png",
            FileType.AUDIO: ".mp3",
            FileType.VIDEO: ".mp4",
            FileType.DATAFRAME: ".csv",
        }
        return extensions.get(self, ".unknown")
class BaseHandler:
    """Base class for handlers; subclasses override ``handle``."""

    def handle(self, filename: str) -> str:
        """Process ``filename``; the base implementation always raises.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError()
class FileHandler:
    """Fetch a file referenced by URL and pass it to the handler for its type."""

    # Seconds passed to requests as the timeout, so a stalled server cannot
    # block a download forever.
    DOWNLOAD_TIMEOUT_SECONDS = 60

    def __init__(self, handlers: Dict[FileType, BaseHandler], path: Path):
        self.handlers = handlers  # file type -> handler used by handle()
        self.path = path          # base directory for files served by SERVER

    def register(self, filetype: FileType, handler: BaseHandler) -> "FileHandler":
        """Set the handler for ``filetype`` and return ``self`` for chaining."""
        self.handlers[filetype] = handler
        return self

    def download(self, url: str) -> str:
        """Download ``url`` into ``file/<8-char id><extension>`` and return that path.

        The extension is chosen from the file type inferred from the URL.

        Raises:
            requests.RequestException: On connection failure, timeout, or an
                HTTP error status (so an error page is never saved as data).
        """
        filetype = FileType.from_url(url)
        response = requests.get(url, timeout=self.DOWNLOAD_TIMEOUT_SECONDS)
        response.raise_for_status()
        data = response.content
        local_filename = os.path.join(
            "file", str(uuid.uuid4())[0:8] + filetype.to_extension()
        )
        os.makedirs(os.path.dirname(local_filename), exist_ok=True)
        with open(local_filename, "wb") as f:
            size = f.write(data)
        # size is in bytes, so size // 1000 is kilobytes.
        print(f"Inputs: {url} ({size//1000}KB) => {local_filename}")
        return local_filename

    def handle(self, url: str) -> str:
        """Make the file behind ``url`` available locally and run its handler.

        URLs that start with the ``SERVER`` environment value (default
        ``http://localhost:8000``) are copied from ``self.path`` into
        ``<self.path>/<PLAYGROUND_DIR>/file/``; any other URL is downloaded.

        Returns:
            Whatever the registered handler returns for the local file name.

        Raises:
            Exception: If no handler is registered for the URL's file type.
        """
        server = os.environ.get("SERVER", "http://localhost:8000")
        if url.startswith(server):
            # Strip "<server>/" to get the path relative to self.path.
            local_filepath = url[len(server) + 1 :]
            local_filename = Path("file") / local_filepath.split("/")[-1]
            src = self.path / local_filepath
            dst = (
                self.path
                / os.environ.get("PLAYGROUND_DIR", "./playground")
                / local_filename
            )
            os.makedirs(os.path.dirname(dst), exist_ok=True)
            shutil.copy(src, dst)
        else:
            local_filename = self.download(url)

        filetype = FileType.from_url(url)
        handler = self.handlers.get(filetype)
        if handler is None:
            message = f"No handler for {filetype}"
            if filetype == FileType.IMAGE:
                message += ". Please set USE_GPU to True in env/settings.py"
            raise Exception(message)
        return handler.handle(local_filename)
########################### => base end | |
#############===========================> | |
from swarms.models.prompts.prebuild.multi_modal_prompts import DATAFRAME_PROMPT | |
import pandas as pd | |
class CsvToDataframe(BaseHandler):
    """Handler that summarises a CSV file and formats it into DATAFRAME_PROMPT."""

    def handle(self, filename: str):
        """Read ``filename`` with pandas and return the filled-in prompt.

        The description lists the row count, the column count and the column
        names; it is also printed together with the input file name.
        """
        frame = pd.read_csv(filename)
        n_rows = len(frame)
        n_cols = len(frame.columns)
        column_list = ", ".join(frame.columns)
        description = (
            f"Dataframe with {n_rows} rows and {n_cols} columns. "
            "Columns are: "
            f"{column_list}"
        )

        log_line = f"\nProcessed CsvToDataframe, Input CSV: {filename}, Output Description: {description}"
        print(log_line)

        return DATAFRAME_PROMPT.format(filename=filename, description=description)