# IM.FIN2 / app.py
# James MacQuillan
# push
# 39431bf
from googlesearch import search
import requests
import trafilatura
from concurrent.futures import ThreadPoolExecutor
import json
import ast
import gradio as gr
from huggingface_hub import InferenceClient
import tiktoken
import time
import os
# Shared Hugging Face inference client used by every model call in this file;
# the API key is read from the HF_TOKEN environment variable (None if unset).
client = InferenceClient(api_key=os.getenv('HF_TOKEN'))
# Frame sequences for the text-only progress indicators streamed to the chat
# window. Each list is generated from its repeating pattern rather than typed
# out frame by frame.

# "Working on your response" followed by one, two, then three dots.
dots_animation = [f"Working on your response{'.' * dots}" for dots in range(1, 4)]

# An arrow whose shaft shrinks from four dashes down to none.
arrow_animation = ["-" * shaft + "> Preparing your answer" for shaft in range(4, -1, -1)]

# A bar that fills from zero to four '=' marks; every frame except the full
# one keeps a single space before the closing bracket.
loader_animation = [
    "[" + "=" * filled + ("" if filled == 4 else " ") + "] Fetching data..."
    for filled in range(5)
]

# "Bot is typing" followed by one, two, then three dots.
typing_animation = [f"Bot is typing{'.' * dots}" for dots in range(1, 4)]

# A spinner cycling through | / - \ after the word "Working".
rotating_text_animation = [f"Working {glyph}" for glyph in "|/-\\"]
def tokenize_with_qwen(text, max_tokens=17000):
    """
    Bound the input text to a token budget and return it as a single string.

    Tokens are counted with tiktoken's ``cl100k_base`` encoding, which serves
    as an approximation for the Qwen model the text is later sent to, so the
    budget is approximate rather than exact.

    Parameters:
    text (list or str): The text (or list of strings) to bound. A list is
        concatenated without a separator; ``None`` entries are skipped.
    max_tokens (int): Maximum number of tokens to keep. Defaults to 17,000.
    Returns:
    str: The input text unchanged when it fits the budget, otherwise the text
        decoded from its first ``max_tokens`` tokens.
    Raises:
    ValueError: If the input is neither a string nor a list of strings.
    """
    if isinstance(text, list):
        # A failed search can contribute None; drop those instead of crashing in join().
        parts = [part for part in text if part is not None]
        if not all(isinstance(part, str) for part in parts):
            raise ValueError("Input must be a string or a list of strings.")
        text = ''.join(parts)
    elif not isinstance(text, str):
        raise ValueError("Input must be a string or a list of strings.")

    # Nothing to measure: skip loading the encoding altogether.
    if not text:
        return text

    encoding = tiktoken.get_encoding("cl100k_base")
    # Scraped pages may contain literal special-token text such as "<|endoftext|>";
    # disallowed_special=() encodes it as ordinary text instead of raising.
    token_ids = encoding.encode(text, disallowed_special=())

    budget = max(max_tokens, 0)
    if len(token_ids) <= budget:
        # Within budget: hand back the original so no character passes through
        # a lossy encode/decode round trip.
        return text

    # Decode the kept prefix in one call. Decoding token by token would turn any
    # multi-byte character that spans several tokens into replacement characters.
    return encoding.decode(token_ids[:budget])
def fetch_and_process_url(link):
    """
    Download a web page and return the main text extracted from its HTML.

    Parameters:
    link (str): URL to fetch.
    Returns:
    str or None: The result of ``trafilatura.extract`` on the page's HTML, or
        ``None`` when the request, the HTTP status check, or the extraction fails.
    """
    try:
        response = requests.get(link, headers={"User-Agent": "Mozilla/5.0"}, timeout=10)
        # Treat 4xx/5xx replies as failures so error pages are not scraped as content.
        response.raise_for_status()
        return trafilatura.extract(response.text)
    except Exception as e:
        # Best-effort worker: one bad URL must not abort the whole batch. The
        # failure is reported on stdout and signalled with None, so callers that
        # drop empty results do not mistake an error message for page text.
        print(f"Error fetching or processing {link}: {e}")
        return None
def perform_search(query, num_results=5):
    """
    Run a web search, scrape the hits in parallel and return the combined text.

    Parameters:
    query (str): Search query passed to ``search``.
    num_results (int): Number of results requested from the search. Defaults to 5.
    Returns:
    str: The non-empty page texts joined by blank lines. An empty string when
        the search fails, finds no URLs, or no page yields any text.
    """
    try:
        # search() is consumed inside the try so errors raised while iterating
        # the results are caught as well.
        urls = list(search(query, num_results=num_results))
    except Exception as e:
        print(f"An error occurred during search: {e}")
        # Always return a string so callers can join results without a None check.
        return ''

    print("URLs Found:")
    print(urls)
    if not urls:
        return ''

    # One worker per URL, capped at 30; no idle threads for a short result list.
    with ThreadPoolExecutor(max_workers=min(len(urls), 30)) as executor:
        results = list(executor.map(fetch_and_process_url, urls))

    # Skip None or empty results
    return '\n\n'.join(filter(None, results))
def _history_to_messages(history):
    """Copy the chat history into a fresh list of plain role/content dicts."""
    messages = []
    for turn in history or []:
        if isinstance(turn, dict):
            role = turn.get('role')
            content = turn.get('content')
            if isinstance(content, list):
                # NOTE(review): assumes structured content is a list of blocks
                # shaped like {'type': 'text', 'text': ...} -- confirm against
                # the installed Gradio version.
                texts = [
                    block['text']
                    for block in content
                    if isinstance(block, dict) and isinstance(block.get('text'), str)
                ]
                content = ''.join(texts)
            if role and isinstance(content, str):
                # Keep only role/content; any extra keys are dropped.
                messages.append({'role': role, 'content': content})
        elif isinstance(turn, (list, tuple)) and len(turn) == 2:
            # presumably a legacy [user, assistant] pair -- verify against caller
            for role, content in zip(('user', 'assistant'), turn):
                if isinstance(content, str):
                    messages.append({'role': role, 'content': content})
    return messages


def _parse_search_queries(raw):
    """Extract the list of search strings from the planning model's reply."""
    text = (raw or '').strip()
    # Ignore anything around the dictionary (markdown fences, stray prose).
    start, end = text.find('{'), text.rfind('}')
    if start != -1 and end > start:
        text = text[start:end + 1]

    plan = None
    # The model may answer with JSON or with a Python literal; accept both.
    # ast.literal_eval only evaluates literals, so it is safe on model output.
    for parse in (json.loads, ast.literal_eval):
        try:
            plan = parse(text)
            break
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            continue

    # Several dictionaries separated by commas parse as a tuple: use the first.
    if isinstance(plan, (list, tuple)):
        plan = next((item for item in plan if isinstance(item, dict)), None)
    if not isinstance(plan, dict):
        print(f'could not parse search model response: {raw!r}')
        return []

    queries = plan.get('searches') or []
    if isinstance(queries, str):
        queries = [queries]
    if not isinstance(queries, (list, tuple)):
        return []
    return [query for query in queries if isinstance(query, str) and query.strip()]


def chat(user_input, history):
    """
    Answer a user message with web-search-backed data, streaming progress.

    The function is a generator with three model calls: one plans the web
    searches, one summarises the scraped pages, and one writes the final
    answer. The summary call is skipped when there is nothing to summarise.

    Parameters:
    user_input (str): The latest user message.
    history (list): Previous conversation turns as supplied by the chat UI.
        It is read only; this function never modifies it.
    Yields:
    str: Progress animation frames first, then the growing final answer as it
        is streamed from the model (the last value yielded is the full answer).
    """
    format_template = """
{"user_input": "cisco systems stock price for the last 4 days", "searches": ["cisco stock price last 4 days", "cisco systems stock historical data", "current price of Cisco Systems", "cisco stock price chart"]},
{"user_input": "Apple stock price yesterday", "searches": ["Apple stock price yesterday", "historical price of Apple stock"]},
{"user_input": "Tesla quarterly revenue", "searches": ["Tesla latest quarterly revenue", "Tesla revenue report Q3 2024"]},
{"user_input": "CAPM model for Tesla", "searches": ["Tesla stock beta value", "current risk-free rate", "expected market return for CAPM model"]},
{"user_input": "Hi", "searches": []},
{"user_input": "Who are you?", "searches": []},
{"user_input": "Google earnings per share last quarter", "searches": ["Google EPS last quarter", "Google quarterly earnings report"]},
{"user_input": "Calculate WACC for Microsoft", "searches": ["Microsoft cost of equity", "Microsoft cost of debt", "Microsoft capital structure", "current risk-free rate", "Microsoft beta"]},
{"user_input": "Show Amazon stock chart for last 5 years", "searches": ["Amazon stock chart last 5 years", "Amazon historical price data"]},
{"user_input": "GDP of China in 2023", "searches": ["China GDP 2023", "latest GDP figures for China"]},
{"user_input": "Portfolio optimization model", "searches": ["efficient frontier portfolio theory", "input data for portfolio optimization model", "expected returns and covariances"]},
{"user_input": "Find current inflation rate in the US", "searches": ["current US inflation rate", "US CPI data"]},
{"user_input": "What is NPV and how do you calculate it?", "searches": ["definition of NPV", "how to calculate NPV"]},
{"user_input": "Dividend yield for Coca-Cola", "searches": ["Coca-Cola dividend yield", "latest Coca-Cola dividend data"]},
{"user_input": "Sharpe ratio formula example", "searches": ["Sharpe ratio formula", "example calculation of Sharpe ratio"]},
{"user_input": "What is the current Fed interest rate?", "searches": ["current Federal Reserve interest rate", "latest Fed interest rate decision"]},
{"user_input": "Generate DCF model for Tesla", "searches": ["Tesla free cash flow data", "Tesla growth rate projections", "current discount rate for Tesla", "steps to build a DCF model"]},
{"user_input": "Tell me a joke", "searches": []},
{"user_input": "Explain the concept of opportunity cost", "searches": ["definition of opportunity cost", "examples of opportunity cost in economics"]}
"""
    model = 'Qwen/Qwen2.5-72B-Instruct'

    # Work on a copy: the history list belongs to the UI and must not be
    # mutated or polluted with entries that are not chat messages.
    conversation = _history_to_messages(history)
    prior_messages = conversation or [{'role': 'system', 'content': 'you are IM.FIN'}]
    print(f'here is the search messages: \n\n\n\n {prior_messages} \n\n\n')
    search_messages = prior_messages + [{'role':'user','content':f'based on {user_input} and {prior_messages}, respond with a list of google searches that will give the correct data to respond, respond in this format: {format_template} with up to 3 searches but try and limit it to the minimum needed. RETURN 1 DICTIONARY IN THE SPECIFIED FORMAT BASED ON THE USER INPUT {user_input}. RETURN ABSOLUTELY NO OTHER TEXT OTHER THAN THE DICTIONARY WITH THE SEARCHES. here is the history use it {prior_messages}'}]

    # --- Step 1: ask the model which searches are needed -------------------
    for value in dots_animation:
        yield value
    response_for_searches = client.chat.completions.create(
        model=model,
        messages=search_messages
    )
    searches_resp = response_for_searches.choices[0].message.content
    yield dots_animation[1]
    print(f'search model response: {searches_resp}')
    # An unparseable reply degrades to "no searches" instead of aborting the turn.
    queries = _parse_search_queries(searches_resp)
    print(queries)
    yield arrow_animation[0]

    # --- Step 2: gather and summarise the data -----------------------------
    data = ''
    if queries:
        pages = [perform_search(query) for query in queries]
        yield arrow_animation[1]
        # Drop failed or empty searches before enforcing the token budget.
        data = tokenize_with_qwen([page for page in pages if page])
        yield arrow_animation[2]
        print(f'the data: {data}')
        yield arrow_animation[3]

    summary = ''
    if data:
        summary_messages = [
            {'role':'system','content':'you are IM.FIN'},
            {'role':'user','content':f'use {user_input} to summarize {data}, return nothing other than the summarized response. MAKE SURE TO PICK OUT THE NUMERICAL DATA BASED ON THE USER RESPONSE'},
        ]
        for value in arrow_animation:
            # Pause so each frame is visible before the blocking model call.
            time.sleep(1)
            yield value
        response_for_chat = client.chat.completions.create(
            model=model,
            messages=summary_messages,
            max_tokens=2000
        )
        summary = response_for_chat.choices[0].message.content or ''
    for value in arrow_animation:
        yield value

    # --- Step 3: stream the final answer -----------------------------------
    print(f'here is the summary: {summary}')
    final_messages = [
        {'role':'system','content':'you are IM.FIN, you are a virtual stock analyst built to automate investing tasks and simulate the intelligence of stock analysts, you can form opinions based on data and form conclusions like stock analysts, you were created by quantineuron.com. KEEP RESPONSES CONCISE, ANSWERING THE USERS INPUT '},
        # The history placeholder carries the real conversation so the model
        # can resolve follow-up questions.
        {'role':'user','content': f'based on this data {summary}, answer {user_input}, here is the history {conversation}. ONLY USE THE DATA THAT IS NEEDED AND ACT AS THOUGH THAT DATA IS YOURS AND CORRECT. KEEP RESPONSES CONCISE. IF THE DATA PROVIDED IS NOT RELEVANT TO THE USERS REQUEST, IGNORE IT AND ANSWER NORMALLY'},
    ]
    yield typing_animation[0]
    final_response = client.chat.completions.create(
        model=model,
        messages=final_messages,
        max_tokens=2000,
        stream=True
    )
    yield typing_animation[1]
    response = ""
    for chunk in final_response:
        # Skip stream chunks that carry no choices.
        if not chunk.choices:
            continue
        response += chunk.choices[0].delta.content or ''
        yield response
    print(f'\n\n here is the chat history for the final response \n\n\n {response}')
# Image file passed as the second entry of the chatbot's avatar pair
# (presumably the assistant's side; the first entry is left as None).
avatar = 'IM.4o_logo.png'

# Soft theme with a sky-blue primary hue and zinc neutrals.
theme = gr.themes.Soft(primary_hue="sky", neutral_hue="zinc")

# Panel-style message area, 600 units tall.
chatbot = gr.Chatbot(layout='panel', avatar_images=[None, avatar], height=600)

# Hook the streaming `chat` generator up to the chat UI and start serving.
demo = gr.ChatInterface(fn=chat, chatbot=chatbot, theme=theme)
demo.launch()