IAMTFRMZA's picture
Update app.py
29d68fa verified
import streamlit as st
import os
import time
import re
import requests
import json
from PIL import Image
from io import BytesIO
from urllib.parse import quote
from openai import OpenAI
# ------------------ Authentication ------------------
VALID_USERS = {
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
}
def login():
st.title("πŸ” Login Required")
email = st.text_input("Email")
password = st.text_input("Password", type="password")
if st.button("Login"):
if VALID_USERS.get(email) == password:
st.session_state.authenticated = True
st.rerun()
else:
st.error("❌ Incorrect email or password.")
if "authenticated" not in st.session_state:
st.session_state.authenticated = False
if not st.session_state.authenticated:
login()
st.stop()
# ------------------ App Configuration ------------------
st.set_page_config(page_title="Schlager Forrestdale DocAIAssist", layout="wide", initial_sidebar_state="collapsed")
st.title("πŸ“„ Schlager Forrestdale Document Assistant")
st.caption("Explore City of Armadale construction documents using AI + OCR 🧐")
# ------------------ Load API Key ------------------
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
st.error("❌ Missing OPENAI_API_KEY environment variable.")
st.stop()
client = OpenAI(api_key=OPENAI_API_KEY)
# ------------------ Tabs ------------------
tab1, tab2 = st.tabs(["πŸ“‘ Contract", "πŸ“ Technical"])
# ------------------ Contract Tab ------------------
with tab1:
ASSISTANT_ID = "asst_KsQRedoJUnEeStzfox1o06lO"
if "messages" not in st.session_state:
st.session_state.messages = []
if "thread_id" not in st.session_state:
st.session_state.thread_id = None
if "image_url" not in st.session_state:
st.session_state.image_url = None
if "image_updated" not in st.session_state:
st.session_state.image_updated = False
if "pending_prompt" not in st.session_state:
st.session_state.pending_prompt = None
with st.sidebar:
st.header("ℹ️ Contract Tools")
if st.button("🧹 Clear Chat"):
st.session_state.messages = []
st.session_state.thread_id = None
st.session_state.image_url = None
st.session_state.image_updated = False
st.session_state.pending_prompt = None
st.rerun()
show_image = st.toggle("πŸ“‘ Show Page Image", value=True)
keyword = st.text_input("Search by Keyword", placeholder="e.g. defects, WHS, delay")
if st.button("πŸ”Ž Search Keyword") and keyword:
st.session_state.pending_prompt = f"Find clauses or references related to: {keyword}"
section_options = [
"Select a section...",
"1. Formal Instrument of Contract",
"2. Offer and Acceptance",
"3. Key Personnel",
"4. Contract Pricing",
"5. Specifications",
"6. WHS Policies",
"7. Penalties and Delays",
"8. Dispute Resolution",
"9. Principal Obligations"
]
section = st.selectbox("πŸ“„ Jump to Section", section_options)
if section != section_options[0]:
st.session_state.pending_prompt = f"Summarize or list key points from section: {section}"
actions = [
"Select an action...",
"List all contractual obligations",
"Summarize payment terms",
"List WHS responsibilities",
"Find delay-related penalties",
"Extract dispute resolution steps"
]
action = st.selectbox("βš™οΈ Common Queries", actions)
if action != actions[0]:
st.session_state.pending_prompt = action
chat_col, image_col = st.columns([2, 1])
with chat_col:
st.markdown("### 🧠 Ask a Document-Specific Question")
user_input = st.chat_input("Example: What is the defects liability period?")
if user_input:
st.session_state.messages.append({"role": "user", "content": user_input})
elif st.session_state.pending_prompt:
st.session_state.messages.append({"role": "user", "content": st.session_state.pending_prompt})
st.session_state.pending_prompt = None
if st.session_state.messages and st.session_state.messages[-1]["role"] == "user":
try:
if st.session_state.thread_id is None:
thread = client.beta.threads.create()
st.session_state.thread_id = thread.id
client.beta.threads.messages.create(
thread_id=st.session_state.thread_id,
role="user",
content=st.session_state.messages[-1]["content"]
)
run = client.beta.threads.runs.create(
thread_id=st.session_state.thread_id,
assistant_id=ASSISTANT_ID
)
with st.spinner("πŸ€– Thinking..."):
while True:
status = client.beta.threads.runs.retrieve(thread_id=st.session_state.thread_id, run_id=run.id)
if status.status in ("completed", "failed", "cancelled"):
break
time.sleep(1)
if status.status == "completed":
messages = client.beta.threads.messages.list(thread_id=st.session_state.thread_id)
for m in reversed(messages.data):
if m.role == "assistant":
reply = m.content[0].text.value
st.session_state.messages.append({"role": "assistant", "content": reply})
match = re.search(r'Document Reference:\s*(.*?),\s*Page\s*(\d+)', reply)
if match:
doc, page = match.group(1).strip(), int(match.group(2))
folder = quote(doc)
img_url = f"https://raw.githubusercontent.com/AndrewLORTech/c2ozschlaegerforrestdale/main/{folder}/{folder}_page_{page:04d}.png"
st.session_state.image_url = img_url
st.session_state.image_updated = True
break
else:
st.error("❌ Assistant failed.")
st.rerun()
except Exception as e:
st.error(f"❌ Error: {e}")
for msg in st.session_state.messages:
with st.chat_message(msg["role"]):
st.markdown(msg["content"], unsafe_allow_html=True)
with image_col:
if show_image and st.session_state.image_url:
try:
r = requests.get(st.session_state.image_url)
r.raise_for_status()
img = Image.open(BytesIO(r.content))
st.image(img, caption="πŸ“„ OCR Page Image", use_container_width=True)
except Exception as e:
st.error(f"πŸ–ΌοΈ Image failed: {e}")
# ------------------ Technical Tab ------------------
with tab2:
ASSISTANT_ID = "asst_DjvuWBc7tCvMbAhY7n1em4BZ"
if "tech_messages" not in st.session_state:
st.session_state.tech_messages = []
if "tech_thread_id" not in st.session_state:
st.session_state.tech_thread_id = None
if "tech_results" not in st.session_state:
st.session_state.tech_results = []
st.session_state.tech_lightbox = None
tech_input = st.chat_input("Ask about plans, drawings or components")
if tech_input:
st.session_state.tech_messages.append({"role": "user", "content": tech_input})
if st.session_state.tech_messages and st.session_state.tech_messages[-1]["role"] == "user":
try:
if st.session_state.tech_thread_id is None:
thread = client.beta.threads.create()
st.session_state.tech_thread_id = thread.id
client.beta.threads.messages.create(
thread_id=st.session_state.tech_thread_id,
role="user",
content=st.session_state.tech_messages[-1]["content"]
)
run = client.beta.threads.runs.create(
thread_id=st.session_state.tech_thread_id,
assistant_id=ASSISTANT_ID
)
with st.spinner("πŸ” Searching technical drawings..."):
while True:
run_status = client.beta.threads.runs.retrieve(
thread_id=st.session_state.tech_thread_id,
run_id=run.id
)
if run_status.status in ("completed", "failed", "cancelled"):
break
time.sleep(1)
if run_status.status == "completed":
messages = client.beta.threads.messages.list(thread_id=st.session_state.tech_thread_id)
for msg in reversed(messages.data):
if msg.role == "assistant":
content = msg.content[0].text.value
st.session_state.tech_messages.append({"role": "assistant", "content": content})
try:
st.session_state.tech_results = json.loads(content.strip("`json "))
except:
st.session_state.tech_results = []
break
except Exception as e:
st.error(f"❌ Technical Assistant Error: {e}")
with st.expander("πŸ”§ Options (Filter + Pagination)", expanded=False):
disciplines = sorted(set(d.get("discipline", "") for d in st.session_state.tech_results))
selected = st.selectbox("🌍 Filter by discipline", ["All"] + disciplines)
page_size = 8
page = st.number_input("Page", min_value=1, step=1, value=1)
if st.session_state.tech_results:
st.subheader("πŸ“‚ Results")
results = [r for r in st.session_state.tech_results if selected == "All" or r.get("discipline") == selected]
paged = results[(page - 1) * page_size : page * page_size]
cols = st.columns(4)
for i, item in enumerate(paged):
with cols[i % 4]:
st.markdown(f"**πŸ“ {item['drawing_number']} ({item['discipline']})**")
st.caption(item.get("summary", ""))
image_urls = item.get("images", [])
if not image_urls:
st.warning("⚠️ No image available.")
else:
url = image_urls[0]
st.caption(f"πŸ”— Image URL: {url}")
try:
st.image(url, caption=f"{item['drawing_number']} - Page 1", use_container_width=True)
except Exception as e:
st.error(f"❌ Could not load image: {e}")
if st.button("πŸ–ΌοΈ View Drawing Details", key=f"thumb_{i}"):
st.session_state.tech_lightbox = url
if st.session_state.tech_lightbox:
st.image(st.session_state.tech_lightbox, caption="πŸ” Enlarged Drawing Preview", use_container_width=True)
if st.button("❌ Close Preview"):
st.session_state.tech_lightbox = None
st.rerun()
else:
for msg in st.session_state.tech_messages:
with st.chat_message(msg["role"]):
st.markdown(msg["content"], unsafe_allow_html=True)