import gradio as gr
from huggingface_hub import InferenceClient
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
import sqlite3
import json
#=============database setup=====================================
# Gradio runs event handlers in worker threads, so the SQLite
# connection must allow cross-thread use.
conn = sqlite3.connect('sfdc_la.db', check_same_thread=False)
cursor = conn.cursor()
cursor.execute('''
    CREATE TABLE IF NOT EXISTS sfdc_la (
        LAid TEXT PRIMARY KEY,
        Amount REAL,
        Tenure INTEGER,
        ROI REAL,
        Stage TEXT
    )
''')
# insert sample data; INSERT OR IGNORE keeps reruns from failing on the
# LAid primary key once sfdc_la.db already exists
loan_data = [
    ('LA00001', 10000, 12, 5.5, 'Customer Onboarding'),
    ('LA00002', 15000, 24, 6.0, 'Credit Review'),
    ('LA00003', 20000, 36, 6.5, 'Loan Disbursal'),
    ('LA23455', 25000, 48, 7.0, 'OTC Clearance'),
    ('LA00005', 30000, 60, 7.5, 'Customer Onboarding')
]
cursor.executemany('''
    INSERT OR IGNORE INTO sfdc_la (LAid, Amount, Tenure, ROI, Stage)
    VALUES (?, ?, ?, ?, ?)
''', loan_data)
conn.commit()
# the connection stays open for the lifetime of the app, so no conn.close() here
""" | |
For more information on `huggingface_hub` Inference API support, please check the docs: https://huggingface.co/docs/huggingface_hub/v0.22.2/en/guides/inference | |
""" | |
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta") | |
modelpath = "Salesforce/xLAM-1b-fc-r" | |
model = AutoModelForCausalLM.from_pretrained(modelpath, torch_dtype="auto", trust_remote_code=True) | |
tokenizer = AutoTokenizer.from_pretrained(modelpath) | |

#=============prompt template and task instructions==============
# Please use our provided instruction prompt for best performance
task_instruction = """
You are an expert in composing functions. You are given a question and a set of possible functions.
Based on the question, you will need to make one or more function/tool calls to achieve the purpose.
If none of the functions can be used, point it out and refuse to answer.
If the given question lacks the parameters required by the function, also point it out.
""".strip()
format_instruction = """
The output MUST strictly adhere to the following JSON format, and NO other text MUST be included.
The example format is as follows. Please make sure the parameter type is correct. If no function call is needed, please make tool_calls an empty list '[]'.
```
{
    "tool_calls": [
        {"name": "func_name1", "arguments": {"argument1": "value1", "argument2": "value2"}},
        ... (more tool calls as required)
    ]
}
```
""".strip()  # ==output format

#=============APIs and Functions Metadata========================
get_weather_api = {
    "name": "get_weather",
    "description": "Get the current weather for a location",
    "parameters": {
        "type": "object",
        "properties": {
            "location": {
                "type": "string",
                "description": "The city and state, e.g. San Francisco, New York"
            },
            "unit": {
                "type": "string",
                "enum": ["celsius", "fahrenheit"],
                "description": "The unit of temperature to return"
            }
        },
        "required": ["location"]
    }
}

search_api = {
    "name": "search",
    "description": "Search for information on the internet",
    "parameters": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "The search query, e.g. 'latest news on AI'"
            }
        },
        "required": ["query"]
    }
}

search_loanapplication = {
    "name": "searchLA",
    "description": "Search for Loan Application status",
    "parameters": {
        "type": "object",
        "properties": {
            "loan_application_id": {
                "type": "string",
                "description": "The unique alphanumeric identifier for a loan application, e.g. LA1234"
            },
            "phone_number": {
                "type": "string",
                "description": "The phone number associated with the loan application"
            }
        },
        "required": ["loan_application_id", "phone_number"]
    }
}

openai_format_tools = [search_api, search_loanapplication, get_weather_api]

# Helper function to convert openai format tools to our more concise xLAM format
def convert_to_xlam_tool(tools):
    """Recursively convert an OpenAI-format tool (or list of tools) to the xLAM format."""
    if isinstance(tools, dict):
        return {
            "name": tools["name"],
            "description": tools["description"],
            "parameters": {k: v for k, v in tools["parameters"].get("properties", {}).items()}
        }
    elif isinstance(tools, list):
        return [convert_to_xlam_tool(tool) for tool in tools]
    else:
        return tools
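
# For example, convert_to_xlam_tool(get_weather_api) drops the JSON-Schema
# wrapper and keeps only the property definitions:
#   {"name": "get_weather",
#    "description": "Get the current weather for a location",
#    "parameters": {"location": {...}, "unit": {...}}}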

#=============prompt builder=====================================
# Helper function to build the input prompt for our model
def build_prompt(task_instruction: str, format_instruction: str, xlam_format_tools: list, query: str):
    prompt = f"[BEGIN OF TASK INSTRUCTION]\n{task_instruction}\n[END OF TASK INSTRUCTION]\n\n"
    prompt += f"[BEGIN OF AVAILABLE TOOLS]\n{json.dumps(xlam_format_tools)}\n[END OF AVAILABLE TOOLS]\n\n"
    prompt += f"[BEGIN OF FORMAT INSTRUCTION]\n{format_instruction}\n[END OF FORMAT INSTRUCTION]\n\n"
    prompt += f"[BEGIN OF QUERY]\n{query}\n[END OF QUERY]\n\n"
    return prompt
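
# The assembled prompt is four bracketed sections in a fixed order: task
# instruction, the tools serialized as JSON, format instruction, and the
# user query; the format instruction then constrains the reply to
# tool_calls JSON only.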

def to_model(query):
    xlam_format_tools = convert_to_xlam_tool(openai_format_tools)
    content = build_prompt(task_instruction, format_instruction, xlam_format_tools, query)
    #print(f"content: {content}")
    messages = [
        {'role': 'user', 'content': content}
    ]
    inputs = tokenizer.apply_chat_template(messages, add_generation_prompt=True, return_tensors="pt").to(model.device)
    # tokenizer.eos_token_id is the id of <|EOT|> token
    outputs = model.generate(inputs, max_new_tokens=512, do_sample=False, num_return_sequences=1, eos_token_id=tokenizer.eos_token_id)
    # decode only the newly generated tokens, skipping the echoed prompt
    return tokenizer.decode(outputs[0][len(inputs[0]):], skip_special_tokens=True)
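
# e.g. to_model("What is the weather in San Francisco in celsius?") is
# expected to return just the tool_calls JSON string, which to_app below
# parses into parallel lists of function names and argument values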

def to_app(callobj):
    callobject = json.loads(callobj)
    callfunctions = []
    callarguments = []
    for tool_call in callobject['tool_calls']:
        callfunctions.append(tool_call['name'])
        callarguments.append(list(tool_call['arguments'].values()))
    #print(f"functions: {callfunctions}")
    #print(f"arguments: {callarguments}")
    return callfunctions, callarguments
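
# For instance, to_app('{"tool_calls": [{"name": "searchLA", "arguments":
# {"loan_application_id": "LA23455", "phone_number": "555-0100"}}]}')
# returns (['searchLA'], [['LA23455', '555-0100']]). Arguments are passed
# positionally, so they must stay in the order the schema declares them.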

#===========sample application===================================
def application(callfunctions, callarguments):
    ## los application functions
    def get_weather(location, unit=None):
        # unit is optional in the tool schema, so accept it if the model supplies one
        return f"weather function executed with city {location}"

    def searchLA(laid, phnumber):
        # parameterized query, so a model-generated id cannot inject SQL
        cursor.execute("SELECT * FROM sfdc_la WHERE LAid = ?", (laid,))
        result = cursor.fetchall()
        return str(result)

    # explicit dispatch table instead of globals()/locals() lookups
    losfunctions = {'get_weather': get_weather, 'searchLA': searchLA}
    for i, functionname in enumerate(callfunctions):
        function = losfunctions.get(functionname)
        if function:
            # return results as strings so the chat UI can display them
            return function(*callarguments[i])
    return "No matching application function found"
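
# e.g. application(['searchLA'], [['LA23455', '555-0100']]) runs the lookup
# and returns the matching sfdc_la row as a string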

def process_input(input_str):
    if not input_str:
        return "No input provided!"
    try:
        model_out = to_model(input_str)       # 1) model emits tool_calls JSON
        funs, args = to_app(model_out)        # 2) parse names and arguments
        output_obj = application(funs, args)  # 3) execute the matching function
        return output_obj
    except Exception as e:
        return f"Error: {str(e)}"

def respond(
    message,
    history: list[tuple[str, str]],
    max_tokens,
    temperature,
    top_p,
):
    # history and the sampling sliders are unused for now; every message is
    # routed through the xLAM function-calling pipeline in process_input.
    # The streaming zephyr path from the template is kept below for reference.
    #messages = []
    #for val in history:
    #    if val[0]:
    #        messages.append({"role": "user", "content": val[0]})
    #    if val[1]:
    #        messages.append({"role": "assistant", "content": val[1]})
    #messages.append({"role": "user", "content": message})
    response = process_input(message)
    #for message in client.chat_completion(
    #    messages,
    #    max_tokens=max_tokens,
    #    stream=True,
    #    temperature=temperature,
    #    top_p=top_p,
    #):
    #    token = message.choices[0].delta.content
    #    response += token
    #    yield response
    return response
""" | |
For information on how to customize the ChatInterface, peruse the gradio docs: https://www.gradio.app/docs/chatinterface | |
""" | |
demo = gr.ChatInterface( | |
respond, | |
additional_inputs=[ | |
gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"), | |
gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"), | |
gr.Slider( | |
minimum=0.1, | |
maximum=1.0, | |
value=0.95, | |
step=0.05, | |
label="Top-p (nucleus sampling)", | |
), | |
], | |
) | |
if __name__ == "__main__": | |
demo.launch() | |