|
import copy
|
|
import json
|
|
import re
|
|
import tiktoken
|
|
import uuid
|
|
|
|
from curl_cffi import requests
|
|
from tclogger import logger
|
|
|
|
from constants.envs import PROXIES
|
|
from constants.headers import OPENAI_GET_HEADERS, OPENAI_POST_DATA
|
|
from constants.models import TOKEN_LIMIT_MAP, TOKEN_RESERVED
|
|
|
|
from messagers.message_outputer import OpenaiStreamOutputer
|
|
from networks.proof_worker import ProofWorker
|
|
|
|
|
|
class OpenaiRequester:
|
|
def __init__(self):
|
|
self.init_requests_params()
|
|
|
|
def init_requests_params(self):
|
|
self.api_base = "https://chat.openai.com/backend-anon"
|
|
self.api_me = f"{self.api_base}/me"
|
|
self.api_models = f"{self.api_base}/models"
|
|
self.api_chat_requirements = f"{self.api_base}/sentinel/chat-requirements"
|
|
self.api_conversation = f"{self.api_base}/conversation"
|
|
self.uuid = str(uuid.uuid4())
|
|
self.requests_headers = copy.deepcopy(OPENAI_GET_HEADERS)
|
|
extra_headers = {
|
|
"Oai-Device-Id": self.uuid,
|
|
}
|
|
self.requests_headers.update(extra_headers)
|
|
|
|
def log_request(self, url, method="GET"):
|
|
logger.note(f"> {method}:", end=" ")
|
|
logger.mesg(f"{url}", end=" ")
|
|
|
|
def log_response(
|
|
self, res: requests.Response, stream=False, iter_lines=False, verbose=False
|
|
):
|
|
status_code = res.status_code
|
|
status_code_str = f"[{status_code}]"
|
|
|
|
if status_code == 200:
|
|
logger_func = logger.success
|
|
else:
|
|
logger_func = logger.warn
|
|
|
|
logger_func(status_code_str)
|
|
|
|
logger.enter_quiet(not verbose)
|
|
|
|
if stream:
|
|
if not iter_lines:
|
|
return
|
|
|
|
if not hasattr(self, "content_offset"):
|
|
self.content_offset = 0
|
|
|
|
for line in res.iter_lines():
|
|
line = line.decode("utf-8")
|
|
line = re.sub(r"^data:\s*", "", line)
|
|
if re.match(r"^\[DONE\]", line):
|
|
logger.success("\n[Finished]")
|
|
break
|
|
line = line.strip()
|
|
if line:
|
|
try:
|
|
data = json.loads(line, strict=False)
|
|
message_role = data["message"]["author"]["role"]
|
|
message_status = data["message"]["status"]
|
|
if (
|
|
message_role == "assistant"
|
|
and message_status == "in_progress"
|
|
):
|
|
content = data["message"]["content"]["parts"][0]
|
|
delta_content = content[self.content_offset :]
|
|
self.content_offset = len(content)
|
|
logger_func(delta_content, end="")
|
|
except Exception as e:
|
|
logger.warn(e)
|
|
else:
|
|
logger_func(res.json())
|
|
|
|
logger.exit_quiet(not verbose)
|
|
|
|
def get_models(self):
|
|
self.log_request(self.api_models)
|
|
res = requests.get(
|
|
self.api_models,
|
|
headers=self.requests_headers,
|
|
proxies=PROXIES,
|
|
timeout=10,
|
|
impersonate="chrome120",
|
|
)
|
|
self.log_response(res)
|
|
|
|
def auth(self):
|
|
self.log_request(self.api_chat_requirements, method="POST")
|
|
res = requests.post(
|
|
self.api_chat_requirements,
|
|
headers=self.requests_headers,
|
|
proxies=PROXIES,
|
|
timeout=10,
|
|
impersonate="chrome120",
|
|
)
|
|
data = res.json()
|
|
self.chat_requirements_token = data["token"]
|
|
self.chat_requirements_seed = data["proofofwork"]["seed"]
|
|
self.chat_requirements_difficulty = data["proofofwork"]["difficulty"]
|
|
self.log_response(res)
|
|
|
|
def transform_messages(self, messages: list[dict]):
|
|
def get_role(role):
|
|
if role in ["system", "user", "assistant"]:
|
|
return role
|
|
else:
|
|
return "system"
|
|
|
|
new_messages = [
|
|
{
|
|
"author": {"role": get_role(message["role"])},
|
|
"content": {"content_type": "text", "parts": [message["content"]]},
|
|
"metadata": {},
|
|
}
|
|
for message in messages
|
|
]
|
|
return new_messages
|
|
|
|
def chat_completions(self, messages: list[dict], iter_lines=False, verbose=False):
|
|
proof_token = ProofWorker().calc_proof_token(
|
|
self.chat_requirements_seed, self.chat_requirements_difficulty
|
|
)
|
|
extra_headers = {
|
|
"Accept": "text/event-stream",
|
|
"Openai-Sentinel-Chat-Requirements-Token": self.chat_requirements_token,
|
|
"Openai-Sentinel-Proof-Token": proof_token,
|
|
}
|
|
requests_headers = copy.deepcopy(self.requests_headers)
|
|
requests_headers.update(extra_headers)
|
|
|
|
post_data = copy.deepcopy(OPENAI_POST_DATA)
|
|
extra_data = {
|
|
"messages": self.transform_messages(messages),
|
|
"websocket_request_id": str(uuid.uuid4()),
|
|
}
|
|
post_data.update(extra_data)
|
|
|
|
self.log_request(self.api_conversation, method="POST")
|
|
s = requests.Session()
|
|
res = s.post(
|
|
self.api_conversation,
|
|
headers=requests_headers,
|
|
json=post_data,
|
|
proxies=PROXIES,
|
|
timeout=10,
|
|
impersonate="chrome120",
|
|
stream=True,
|
|
)
|
|
self.log_response(res, stream=True, iter_lines=iter_lines, verbose=verbose)
|
|
return res
|
|
|
|
|
|
class OpenaiStreamer:
|
|
def __init__(self):
|
|
self.model = "gpt-3.5-turbo"
|
|
self.message_outputer = OpenaiStreamOutputer(
|
|
owned_by="openai", model="gpt-3.5-turbo"
|
|
)
|
|
self.tokenizer = tiktoken.get_encoding("cl100k_base")
|
|
|
|
def count_tokens(self, messages: list[dict]):
|
|
token_count = sum(
|
|
len(self.tokenizer.encode(message["content"])) for message in messages
|
|
)
|
|
logger.note(f"Prompt Token Count: {token_count}")
|
|
return token_count
|
|
|
|
def check_token_limit(self, messages: list[dict]):
|
|
token_limit = TOKEN_LIMIT_MAP[self.model]
|
|
token_count = self.count_tokens(messages)
|
|
token_redundancy = int(token_limit - TOKEN_RESERVED - token_count)
|
|
if token_redundancy <= 0:
|
|
raise ValueError(
|
|
f"Prompt exceeded token limit: {token_count} > {token_limit}"
|
|
)
|
|
return True
|
|
|
|
def chat_response(self, messages: list[dict], iter_lines=False, verbose=False):
|
|
self.check_token_limit(messages)
|
|
logger.enter_quiet(not verbose)
|
|
requester = OpenaiRequester()
|
|
requester.auth()
|
|
logger.exit_quiet(not verbose)
|
|
return requester.chat_completions(
|
|
messages=messages, iter_lines=iter_lines, verbose=verbose
|
|
)
|
|
|
|
def chat_return_generator(self, stream_response: requests.Response, verbose=False):
|
|
content_offset = 0
|
|
is_finished = False
|
|
|
|
for line in stream_response.iter_lines():
|
|
line = line.decode("utf-8")
|
|
line = re.sub(r"^data:\s*", "", line)
|
|
line = line.strip()
|
|
|
|
if not line:
|
|
continue
|
|
|
|
if re.match(r"^\[DONE\]", line):
|
|
content_type = "Finished"
|
|
delta_content = ""
|
|
logger.success("\n[Finished]")
|
|
is_finished = True
|
|
else:
|
|
content_type = "Completions"
|
|
delta_content = ""
|
|
try:
|
|
data = json.loads(line, strict=False)
|
|
message_role = data["message"]["author"]["role"]
|
|
message_status = data["message"]["status"]
|
|
if message_role == "assistant" and message_status == "in_progress":
|
|
content = data["message"]["content"]["parts"][0]
|
|
if not len(content):
|
|
continue
|
|
delta_content = content[content_offset:]
|
|
content_offset = len(content)
|
|
if verbose:
|
|
logger.success(delta_content, end="")
|
|
else:
|
|
continue
|
|
except Exception as e:
|
|
logger.warn(e)
|
|
|
|
output = self.message_outputer.output(
|
|
content=delta_content, content_type=content_type
|
|
)
|
|
yield output
|
|
|
|
if not is_finished:
|
|
yield self.message_outputer.output(content="", content_type="Finished")
|
|
|
|
def chat_return_dict(self, stream_response: requests.Response):
|
|
final_output = self.message_outputer.default_data.copy()
|
|
final_output["choices"] = [
|
|
{
|
|
"index": 0,
|
|
"finish_reason": "stop",
|
|
"message": {"role": "assistant", "content": ""},
|
|
}
|
|
]
|
|
final_content = ""
|
|
for item in self.chat_return_generator(stream_response):
|
|
try:
|
|
data = json.loads(item)
|
|
delta = data["choices"][0]["delta"]
|
|
delta_content = delta.get("content", "")
|
|
if delta_content:
|
|
final_content += delta_content
|
|
except Exception as e:
|
|
logger.warn(e)
|
|
final_output["choices"][0]["message"]["content"] = final_content.strip()
|
|
return final_output
|
|
|
|
|
|
if __name__ == "__main__":
|
|
streamer = OpenaiStreamer()
|
|
messages = [
|
|
{
|
|
"role": "system",
|
|
"content": "You are an LLM developed by NiansuhAI.\nYour name is Niansuh-Copilot.",
|
|
},
|
|
{"role": "user", "content": "Hello, what is your role?"},
|
|
{"role": "assistant", "content": "I am an LLM."},
|
|
{"role": "user", "content": "What is your name?"},
|
|
]
|
|
|
|
streamer.chat_response(messages=messages, iter_lines=True, verbose=True)
|
|
|
|
|