get-me-a-job-ap / app.py
yashgori20's picture
Upload 5 files
714150d verified
raw
history blame
11.9 kB
from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, EmailStr
import firebase_admin
from firebase_admin import credentials, firestore, auth
import jwt
import bcrypt
import os
from datetime import datetime, timedelta
from typing import Optional
import uvicorn
import json
# Initialize FastAPI app
app = FastAPI(title="Get Me A Job API", version="1.0.0")
# CORS middleware
# NOTE(review): every origin, method and header is allowed while
# allow_credentials is True -- confirm this is intended before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], # In production, replace with specific domains
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Security
# Bearer-token scheme; used as the dependency of get_current_user.
security = HTTPBearer()
# Key used to sign and verify JWTs. Read from the JWT_SECRET environment
# variable; when that is unset the hard-coded placeholder below is used.
# NOTE(review): tokens signed with the placeholder are forgeable by anyone who
# has read this file -- make sure JWT_SECRET is set in every deployment.
JWT_SECRET = os.getenv("JWT_SECRET", "your-super-secret-key-change-in-production")
JWT_ALGORITHM = "HS256"
# Firebase setup
def init_firebase():
    """Initialize the Firebase Admin SDK on a best-effort basis.

    Credentials are taken from the FIREBASE_SERVICE_ACCOUNT environment
    variable (a JSON document) when it is set and non-empty; otherwise the
    local file "firebase-service-account.json" is used. Any failure is
    printed and swallowed so the application can still start without
    Firebase.
    """
    try:
        raw_service_account = os.getenv("FIREBASE_SERVICE_ACCOUNT")
        if not raw_service_account:
            # No env var: fall back to the service account file (local development).
            cred = credentials.Certificate("firebase-service-account.json")
        else:
            cred = credentials.Certificate(json.loads(raw_service_account))
        firebase_admin.initialize_app(cred)
        print("Firebase initialized successfully")
    except Exception as exc:
        # Deliberately non-fatal: development runs continue without Firebase.
        print(f"Firebase initialization failed: {exc}")
# Initialize Firebase
init_firebase()
# Get Firestore client
# `db` is None when no client can be created (for example when
# init_firebase() failed); endpoints check it and answer 503 in that case.
try:
    db = firestore.client()
except Exception:
    # Catch Exception rather than using a bare `except:` so that
    # KeyboardInterrupt / SystemExit during start-up are not swallowed.
    db = None
    print("Firestore client not available - running in development mode")
# Pydantic models
class UserRegister(BaseModel):
    # Request body accepted by the register endpoint.
    email: EmailStr  # validated as an e-mail address by pydantic
    password: str  # plaintext password; register() stores only its bcrypt hash
class UserLogin(BaseModel):
    # Request body accepted by the login endpoint.
    email: EmailStr  # validated as an e-mail address by pydantic
    password: str  # plaintext password, checked against the stored bcrypt hash
class UserResponse(BaseModel):
    # Public view of a user, embedded in TokenResponse.
    id: str  # Firestore document id of the user
    email: str
class TokenResponse(BaseModel):
    # Response body returned by the register and login endpoints.
    token: str  # JWT produced by create_jwt_token
    user: UserResponse
class SubscriptionResponse(BaseModel):
    # Response body of the subscription status endpoint.
    isPaid: bool  # True when the plan is anything other than 'free'
    plan: str
    # The usage fields are only filled in for free-plan users; they stay None
    # for paid plans.
    usageLeft: Optional[int] = None
    usageLimit: Optional[int] = None
class UpdateResponse(BaseModel):
    # Response body of the check-updates endpoint.
    latestVersion: str  # version string of the newest release
    downloadUrl: str  # URL from which that release can be downloaded
# Helper functions
def hash_password(password: str) -> str:
    """Return a bcrypt hash of *password*, salted with a fresh salt, as text."""
    password_bytes = password.encode('utf-8')
    hashed_bytes = bcrypt.hashpw(password_bytes, bcrypt.gensalt())
    return hashed_bytes.decode('utf-8')
def verify_password(password: str, hashed: str) -> bool:
    """Return whether *password* matches the bcrypt hash *hashed*."""
    candidate = password.encode('utf-8')
    stored_hash = hashed.encode('utf-8')
    return bcrypt.checkpw(candidate, stored_hash)
def create_jwt_token(user_id: str, email: str) -> str:
    """Create a signed JWT for a user.

    The payload carries ``user_id``, ``email`` and an ``exp`` claim set seven
    days from now. The token is signed with JWT_SECRET using JWT_ALGORITHM.

    Args:
        user_id: Identifier stored in the ``user_id`` claim.
        email: Address stored in the ``email`` claim.

    Returns:
        The encoded token.
    """
    # Imported locally because the file-level import only brings in
    # datetime and timedelta.
    from datetime import timezone

    # datetime.utcnow() is deprecated; an aware UTC datetime gives the same
    # expiry instant.
    expires_at = datetime.now(timezone.utc) + timedelta(days=7)
    payload = {
        'user_id': user_id,
        'email': email,
        'exp': expires_at,
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def verify_jwt_token(token: str) -> dict:
    """Decode *token* and return its payload.

    The token is decoded with JWT_SECRET and restricted to JWT_ALGORITHM.

    Raises:
        HTTPException: 401 "Token expired" when the signature has expired,
            401 "Invalid token" for any other invalid token.
    """
    try:
        return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.ExpiredSignatureError:
        # Must be handled before InvalidTokenError, which is caught below.
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict:
    """FastAPI dependency: return the JWT payload of the bearer token.

    Raises whatever verify_jwt_token raises for an expired or invalid token.
    """
    bearer_token = credentials.credentials
    return verify_jwt_token(bearer_token)
# API Endpoints
@app.get("/")
async def root():
    """Return a liveness message and whether a Firestore client is available."""
    if db:
        firebase_status = "connected"
    else:
        firebase_status = "disconnected"
    return {
        "message": "Get Me A Job API is running!",
        "firebase_status": firebase_status,
    }
@app.post("/getmeajob/api/auth/register", response_model=TokenResponse)
async def register(user: UserRegister):
    """Register a new user and return a JWT for them.

    Creates three Firestore documents: the user (with a bcrypt password
    hash), a default active 'free' subscription, and a usage record that
    starts with 10 uses.

    Args:
        user: E-mail and plaintext password of the account to create.

    Returns:
        TokenResponse with a fresh token and the new user's id and e-mail.

    Raises:
        HTTPException: 503 when no Firestore client is available, 400 when the
            e-mail is already registered, 500 on any other failure.
    """
    if not db:
        raise HTTPException(status_code=503, detail="Database not available")
    try:
        # Check if user already exists
        users_ref = db.collection('users')
        existing_user = users_ref.where('email', '==', user.email).limit(1).get()
        if existing_user:
            raise HTTPException(status_code=400, detail="Email already exists")

        # Hash password and create user document
        user_data = {
            'email': user.email,
            'password_hash': hash_password(user.password),
            'created_at': firestore.SERVER_TIMESTAMP,
            'updated_at': firestore.SERVER_TIMESTAMP,
        }
        # add() returns a pair whose second element is the new document reference.
        _, user_doc_ref = users_ref.add(user_data)
        user_id = user_doc_ref.id

        # Create default subscription (free plan)
        subscription_data = {
            'user_id': user_id,
            'plan': 'free',
            'is_active': True,
            'created_at': firestore.SERVER_TIMESTAMP,
            'expires_at': None,
        }
        db.collection('subscriptions').add(subscription_data)

        # Create usage record (10 free uses)
        usage_data = {
            'user_id': user_id,
            'usage_count': 10,
            'last_reset': firestore.SERVER_TIMESTAMP,
            'created_at': firestore.SERVER_TIMESTAMP,
        }
        db.collection('usage').add(usage_data)

        # Generate JWT token
        token = create_jwt_token(user_id, user.email)
        return TokenResponse(
            token=token,
            user=UserResponse(id=user_id, email=user.email),
        )
    except HTTPException:
        # Let deliberate HTTP errors (the 400 above) through unchanged instead
        # of recognising them by their message text.
        raise
    except Exception as e:
        print(f"Registration error: {e}")
        raise HTTPException(status_code=500, detail="Registration failed")
@app.post("/getmeajob/api/auth/login", response_model=TokenResponse)
async def login(user: UserLogin):
    """Authenticate a user by e-mail and password and return a JWT.

    Raises:
        HTTPException: 503 when no Firestore client is available, 401 when the
            e-mail is unknown or the password does not match, 500 on any other
            failure.
    """
    if not db:
        raise HTTPException(status_code=503, detail="Database not available")
    try:
        # Look the account up by e-mail.
        matches = db.collection('users').where('email', '==', user.email).limit(1).get()
        if not matches:
            raise HTTPException(status_code=401, detail="Invalid credentials")

        account_doc = matches[0]
        account = account_doc.to_dict()

        # Same error as for an unknown e-mail, so the two cases look alike.
        password_ok = verify_password(user.password, account['password_hash'])
        if not password_ok:
            raise HTTPException(status_code=401, detail="Invalid credentials")

        account_email = account['email']
        return TokenResponse(
            token=create_jwt_token(account_doc.id, account_email),
            user=UserResponse(id=account_doc.id, email=account_email),
        )
    except HTTPException:
        raise
    except Exception as exc:
        print(f"Login error: {exc}")
        raise HTTPException(status_code=500, detail="Login failed")
@app.get("/getmeajob/api/auth/verify")
async def verify_token(current_user: dict = Depends(get_current_user)):
    """Confirm the bearer token is valid and echo the user it identifies.

    Token validation happens in the get_current_user dependency; this handler
    only runs when that succeeded.
    """
    user_info = {
        "id": current_user["user_id"],
        "email": current_user["email"],
    }
    return {"valid": True, "user": user_info}
@app.get("/getmeajob/api/subscription/status", response_model=SubscriptionResponse)
async def get_subscription_status(current_user: dict = Depends(get_current_user)):
    """Report the caller's plan and, for free users, their remaining usage.

    A missing active subscription or usage record is created on the fly with
    the free-plan defaults (plan 'free', 10 uses).

    Raises:
        HTTPException: 503 when no Firestore client is available, 500 on any
            failure while reading or creating the records.
    """
    if not db:
        raise HTTPException(status_code=503, detail="Database not available")
    try:
        uid = current_user["user_id"]

        # Look up the user's active subscription.
        active_subscriptions = (
            db.collection('subscriptions')
            .where('user_id', '==', uid)
            .where('is_active', '==', True)
            .limit(1)
            .get()
        )
        if active_subscriptions:
            plan = active_subscriptions[0].to_dict().get('plan', 'free')
        else:
            # None found: create the default free subscription.
            db.collection('subscriptions').add({
                'user_id': uid,
                'plan': 'free',
                'is_active': True,
                'created_at': firestore.SERVER_TIMESTAMP,
                'expires_at': None,
            })
            plan = 'free'

        is_paid = plan != 'free'
        if is_paid:
            # Usage figures are only reported for free users.
            return SubscriptionResponse(
                isPaid=is_paid,
                plan=plan,
                usageLeft=None,
                usageLimit=None,
            )

        usage_records = db.collection('usage').where('user_id', '==', uid).limit(1).get()
        if usage_records:
            stored_usage = usage_records[0].to_dict()
            # Never report a negative remainder.
            usage_left = max(0, stored_usage.get('usage_count', 0))
        else:
            # No usage record yet: create one with the full free allowance.
            db.collection('usage').add({
                'user_id': uid,
                'usage_count': 10,
                'last_reset': firestore.SERVER_TIMESTAMP,
                'created_at': firestore.SERVER_TIMESTAMP,
            })
            usage_left = 10

        return SubscriptionResponse(
            isPaid=is_paid,
            plan=plan,
            usageLeft=usage_left,
            usageLimit=10,  # Free tier limit
        )
    except Exception as exc:
        print(f"Subscription status error: {exc}")
        raise HTTPException(status_code=500, detail="Failed to get subscription status")
@app.get("/getmeajob/api/check-updates", response_model=UpdateResponse)
async def check_updates():
    """Return the latest released version and where to download it."""
    latest_version = "0.1.1"  # Update this when you release new versions
    download_url = "https://yashgori20.vercel.app/getmeajob/download"
    return UpdateResponse(latestVersion=latest_version, downloadUrl=download_url)
# Usage tracking endpoint (for decrementing usage count)
@app.post("/getmeajob/api/usage/track")
async def track_usage(current_user: dict = Depends(get_current_user)):
    """Consume one use from the caller's usage record.

    Returns:
        ``{"success": True, "remaining": n}`` after a decrement, or
        ``{"success": False, "error": ...}`` when the record is missing or the
        stored count is not above zero.

    Raises:
        HTTPException: 503 when no Firestore client is available, 500 on any
            failure while reading or updating the record.
    """
    if not db:
        raise HTTPException(status_code=503, detail="Database not available")
    try:
        uid = current_user["user_id"]
        usage_records = db.collection('usage').where('user_id', '==', uid).limit(1).get()
        if not usage_records:
            return {"success": False, "error": "Usage record not found"}

        usage_record = usage_records[0]
        count_before = usage_record.to_dict().get('usage_count', 0)
        if not count_before > 0:
            return {"success": False, "error": "No usage left"}

        # NOTE(review): the count is read and then written back without a
        # transaction, so concurrent requests could overwrite each other.
        count_after = count_before - 1
        usage_record.reference.update({
            'usage_count': count_after,
            'updated_at': firestore.SERVER_TIMESTAMP,
        })
        return {"success": True, "remaining": count_after}
    except Exception as exc:
        print(f"Usage tracking error: {exc}")
        raise HTTPException(status_code=500, detail="Failed to track usage")
# Health check endpoint
@app.get("/health")
async def health_check():
    """Return a health payload with the current UTC time and Firestore availability."""
    timestamp = datetime.utcnow().isoformat()
    firebase_connected = db is not None
    return {
        "status": "healthy",
        "timestamp": timestamp,
        "firebase_connected": firebase_connected,
    }
# Run the app with uvicorn when this file is executed directly.
if __name__ == "__main__":
    # Listens on all interfaces.
    uvicorn.run(app, host="0.0.0.0", port=7860) # Port 7860 is standard for Hugging Face Spaces