|
import streamlit as st |
|
import subprocess |
|
import os |
|
import re |
|
import time |
|
import base64 |
|
from patterns import count_patterns, list_patterns, pdf_request_patterns, greetings, farewell |
|
from google.cloud import storage |
|
from tempfile import NamedTemporaryFile |
|
import json |
|
|
|
|
|
BASE_GCS_PATH = "gs://fynd-assets-private/documents/daytrader/" |
|
LOCAL_DOWNLOAD_PATH = "/Users/ninadmandavkar/Desktop/daytrader/08-2024/" |
|
|
|
def fetch_pdf_from_gcs(pdf_name): |
|
gcs_search_path = f"{BASE_GCS_PATH}**/*{pdf_name}*.pdf" |
|
result = subprocess.run( |
|
["gsutil", "-m", "cp", "-r", gcs_search_path, LOCAL_DOWNLOAD_PATH], |
|
capture_output=True, |
|
text=True |
|
) |
|
return result |
|
|
|
def count_invoices_in_month(month_year): |
|
gcs_search_path = f"{BASE_GCS_PATH}PDFs/**/*{month_year}*.pdf" |
|
result = subprocess.run( |
|
["gsutil", "ls", gcs_search_path], |
|
capture_output=True, |
|
text=True |
|
) |
|
if result.returncode == 0: |
|
files = [os.path.basename(line)[:-4] for line in result.stdout.strip().split("\n") if line] |
|
return len(files), files |
|
else: |
|
return None, [] |
|
|
|
def extract_pdf_id(text): |
|
match = re.search(r'([A-Z]{2}-[A-Z]-[A-Z0-9]+-FY\d{2})', text) |
|
return match.group(0) if match else None |
|
|
|
def month_to_str(month): |
|
month_map = { |
|
"january": "01", "february": "02", "march": "03", "april": "04", |
|
"may": "05", "june": "06", "july": "07", "august": "08", |
|
"september": "09", "october": "10", "november": "11", "december": "12" |
|
} |
|
if month.isdigit() and 1 <= int(month) <= 12: |
|
return f"{int(month):02d}-2024" |
|
return month_map.get(month.lower(), None) |
|
|
|
def typing_effect(text): |
|
typed_text_placeholder = st.empty() |
|
styled_text = f""" |
|
<span style="background-image: linear-gradient(to right, #800000, #ff0000); |
|
-webkit-background-clip: text; |
|
-webkit-text-fill-color: transparent;"> |
|
{text} |
|
</span> |
|
""" |
|
for i in range(len(text) + 1): |
|
typed_text_placeholder.markdown(styled_text.replace(text, text[:i]), unsafe_allow_html=True) |
|
time.sleep(0.018) |
|
typed_text_placeholder.markdown(styled_text, unsafe_allow_html=True) |
|
|
|
def chatbot_response(user_input): |
|
lower_input = user_input.lower() |
|
|
|
if any(greet in lower_input for greet in greetings): |
|
return "Hello. Fynder here. How can I help you today?" |
|
|
|
elif any(fare in lower_input for fare in farewell): |
|
return "Goodbye! Let me know if you need help again." |
|
|
|
|
|
for pattern in pdf_request_patterns: |
|
pdf_request_match = re.search(pattern, lower_input) |
|
if pdf_request_match: |
|
pdf_id = pdf_request_match.group(1) |
|
return f"Looking for PDF for invoice '{pdf_id}'. Please wait...", pdf_id |
|
|
|
|
|
for pattern in count_patterns: |
|
month_year_match = re.search(pattern, lower_input) |
|
if month_year_match: |
|
month_year = month_year_match.group(1) |
|
if len(month_year.split()) == 2: |
|
month_str = month_to_str(month_year.split()[0]) |
|
if month_str: |
|
month_year = f"{month_str}-{month_year.split()[1]}" |
|
count, _ = count_invoices_in_month(month_year) |
|
if count is not None: |
|
return f"There are {count} invoices generated in {month_year}." |
|
else: |
|
return "I encountered an error while fetching the invoice count. Please try again later." |
|
|
|
|
|
for pattern in list_patterns: |
|
list_match = re.search(pattern, lower_input) |
|
if list_match: |
|
month_year = list_match.group(1) |
|
if len(month_year.split()) == 2: |
|
month_str = month_to_str(month_year.split()[0]) |
|
if month_str: |
|
month_year = f"{month_str}-{month_year.split()[1]}" |
|
_, files = count_invoices_in_month(month_year) |
|
if files: |
|
return [f"**{file}**" for file in files] |
|
else: |
|
return "No invoices found for that month." |
|
|
|
pdf_id = extract_pdf_id(user_input) |
|
if pdf_id: |
|
if "when" in lower_input and "created" in lower_input: |
|
return f"Checking creation date for '{pdf_id}'. Please wait...", pdf_id, True |
|
else: |
|
return f"Looking for '{pdf_id}' in GCS fynd prod 393805 bucket. Please wait...", pdf_id |
|
|
|
return "I didn't quite understand that ;(" |
|
|
|
def download_pdf(file_path): |
|
"""Reads a PDF file and returns its content in Base64 format.""" |
|
with open(file_path, "rb") as f: |
|
pdf_data = f.read() |
|
return base64.b64encode(pdf_data).decode('utf-8') |
|
|
|
def main(): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
uploaded_file = st.file_uploader("Upload your fynd_prod.json file", type="json") |
|
|
|
if uploaded_file is not None: |
|
|
|
with NamedTemporaryFile(delete=False, suffix=".json") as temp_file: |
|
temp_file.write(uploaded_file.read()) |
|
json_path = temp_file.name |
|
|
|
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/Users/ninadmandavkar/Desktop/Fynd/alerter/fynd-prod.json" |
|
|
|
with open(json_path) as f: |
|
credentials_info = json.load(f) |
|
st.write("Loaded credentials for:", credentials_info.get("client_email")) |
|
|
|
|
|
st.markdown( |
|
""" |
|
<h1 style=" |
|
background-image: linear-gradient(to right, #800000, #ff0000); |
|
-webkit-background-clip: text; |
|
-webkit-text-fill-color: transparent; |
|
font-size: 3rem; /* Adjust size as needed */ |
|
text-align: center; /* Center the title */ |
|
"> |
|
Fynder |
|
</h1> |
|
""", |
|
unsafe_allow_html=True |
|
) |
|
|
|
|
|
user_input = st.text_input("", value="", help="Type your prompt here") |
|
|
|
if user_input: |
|
response = chatbot_response(user_input) |
|
|
|
if isinstance(response, tuple): |
|
bot_response, pdf_name = response |
|
typing_effect(bot_response) |
|
|
|
result = fetch_pdf_from_gcs(pdf_name) |
|
if result.returncode == 0: |
|
downloaded_files = [f for f in os.listdir(LOCAL_DOWNLOAD_PATH) if pdf_name in f and f.endswith('.pdf')] |
|
|
|
if downloaded_files: |
|
for pdf_file in downloaded_files: |
|
file_path = os.path.join(LOCAL_DOWNLOAD_PATH, pdf_file) |
|
pdf_data = download_pdf(file_path) |
|
|
|
|
|
download_button_html = f""" |
|
<a href="data:application/pdf;base64,{pdf_data}" download="{pdf_file}" |
|
style=" |
|
display: inline-block; |
|
padding: 10px 20px; |
|
color: white; |
|
text-decoration: none; |
|
border-radius: 20px; |
|
background-image: linear-gradient(to right, #800000, #ff0000); |
|
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1); |
|
"> |
|
Download {pdf_file} |
|
</a> |
|
""" |
|
st.markdown(download_button_html, unsafe_allow_html=True) |
|
else: |
|
typing_effect("No files matching that name were found.") |
|
else: |
|
typing_effect("There was an error fetching the PDF. Please check the name and try again.") |
|
st.write(result.stderr) |
|
|
|
elif isinstance(response, list): |
|
for pdf_file in response: |
|
typing_effect(pdf_file) |
|
else: |
|
typing_effect(response) |
|
else: |
|
st.markdown("") |
|
|
|
if __name__ == "__main__": |
|
main() |
|
|