import os |
from datetime import datetime |
import pymongo |
from urllib.parse import quote_plus |
from bson.objectid import ObjectId |
raw_username = os.getenv("DB_USERNAME") |
raw_password = os.getenv("DB_PASSWORD") |
if not raw_username or not raw_password: |
raise Exception("Database credentials are missing. Check your environment variables.") |
username = quote_plus(raw_username) |
password = quote_plus(raw_password) |
cluster = "cluster0.yxjok.mongodb.net" |
DB_URI = f"mongodb+srv://{username}:{password}@{cluster}/?retryWrites=true&w=majority&appName=Cluster0" |
client = pymongo.MongoClient(DB_URI) |
db = client["billing_app"] |
users_coll = db["users"] |
sections_coll = db["sections"] |
bills_coll = db["bills"] |
sections_coll.create_index( |
[("owner_email", 1), ("section_name", 1)], |
unique=True |
) |
def create_user(email, password): |
if users_coll.find_one({"email": email}): |
raise ValueError("Email already exists") |
users_coll.insert_one({"email": email, "password": password}) |
def get_user_by_email_and_password(email, password): |
return users_coll.find_one({"email": email, "password": password}) |
def create_section(owner_email, section_name, participants_list): |
""" |
Creates a new section for the given user. |
Fails if that user already has a section with the same name. |
""" |
existing = sections_coll.find_one({"owner_email": owner_email, "section_name": section_name}) |
if existing: |
raise ValueError("Section with this name already exists for your account.") |
sections_coll.insert_one({ |
"owner_email": owner_email, |
"section_name": section_name, |
"participants": participants_list |
}) |
def update_section(owner_email, section_name, participants_list): |
sections_coll.update_one( |
{"owner_email": owner_email, "section_name": section_name}, |
{"$set": {"participants": participants_list}}, |
upsert=True |
) |
def get_section(owner_email, section_name): |
return sections_coll.find_one({"owner_email": owner_email, "section_name": section_name}) |
def delete_section(owner_email, section_name): |
sections_coll.delete_one({"owner_email": owner_email, "section_name": section_name}) |
bills_coll.delete_many({"owner_email": owner_email, "section_name": section_name}) |
def get_all_sections(owner_email): |
""" |
Returns all sections for that specific user/email. |
""" |
sections = sections_coll.find({"owner_email": owner_email}) |
return [sec["section_name"] for sec in sections] |
def create_bill(owner_email, section_name, participant, item, price): |
bills_coll.insert_one({ |
"owner_email": owner_email, |
"section_name": section_name, |
"participant": participant, |
"item": item, |
"price": float(price), |
"timestamp": datetime.now().date().isoformat() |
}) |
def get_submitted_items(owner_email, section_name): |
pipeline = [ |
{"$match": {"owner_email": owner_email, "section_name": section_name}}, |
{"$group": {"_id": "$item"}} |
] |
results = list(bills_coll.aggregate(pipeline)) |
return [r["_id"] for r in results] |
def get_billing_history(owner_email, section_name): |
pipeline = [ |
{"$match": {"owner_email": owner_email, "section_name": section_name}}, |
{ |
"$group": { |
"_id": "$participant", |
"total_price": {"$sum": "$price"}, |
"last_updated": {"$max": "$timestamp"} |
} |
} |
] |
return list(bills_coll.aggregate(pipeline)) |
def remove_items(owner_email, section_name, participant, items_to_remove): |
if items_to_remove is None: |
bills_coll.delete_many({ |
"owner_email": owner_email, |
"section_name": section_name, |
"participant": participant |
}) |
else: |
for item in items_to_remove: |
bills_coll.delete_many({ |
"owner_email": owner_email, |
"section_name": section_name, |
"participant": participant, |
"item": item |
}) |
def get_most_bought_item(owner_email, section_name): |
pipeline = [ |
{"$match": {"owner_email": owner_email, "section_name": section_name}}, |
{ |
"$group": { |
"_id": "$item", |
"count": {"$sum": 1}, |
"max_price": {"$max": "$price"} |
} |
}, |
{"$sort": {"count": -1, "max_price": -1}}, |
{"$limit": 1} |
] |
result = list(bills_coll.aggregate(pipeline)) |
if result: |
return (result[0]["_id"], result[0]["count"], result[0]["max_price"]) |
return None |