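"""Yellow Pages Scraper API.

FastAPI service that scrapes business listings from Yellow Pages Canada,
stores results in MongoDB, and exposes endpoints to retrieve them.
"""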
from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse
from typing import Optional
from urllib.parse import unquote
from bs4 import BeautifulSoup
from datetime import datetime
import requests
import time
import os
import json
from bson import json_util
from pymongo import MongoClient

app = FastAPI(
    title="Yellow Pages Scraper API",
    description="API to scrape business listings from Yellow Pages Canada",
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # For development only - restrict this in production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# MongoDB Connection
# The connection string is read from the environment so credentials are not
# committed to source control; set MONGO_URI to your MongoDB connection string
# before starting the app (the localhost default is for local development).
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://localhost:27017")
DB_NAME = "yellow_pages_data"  # Name of the application database

# All Canadian provinces and territories, keyed by two-letter code
ALL_PROVINCES = {
    "AB": "Alberta",
    "BC": "British Columbia",
    "MB": "Manitoba",
    "NB": "New Brunswick",
    "NL": "Newfoundland and Labrador",
    "NS": "Nova Scotia",
    "NT": "Northwest Territories",
    "NU": "Nunavut",
    "ON": "Ontario",
    "PE": "Prince Edward Island",
    "QC": "Quebec",
    "SK": "Saskatchewan",
    "YT": "Yukon",
}

_mongo_client: Optional[MongoClient] = None


def get_database():
    """Return a handle to the application database, reusing a single client."""
    global _mongo_client
    if _mongo_client is None:
        _mongo_client = MongoClient(MONGO_URI)
    return _mongo_client[DB_NAME]
class YellowPagesScraper:
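    """Scrape Yellow Pages Canada listings for an occupation, optionally
    narrowed to a city, across all provinces and territories."""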
def __init__(self, occupation: str, city: Optional[str] = None):
self.searchOccupation = occupation
self.searchCity = city
        # All Canadian provinces and territories (shared module-level map)
        self.all_provinces = ALL_PROVINCES
self.timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
self.userAgent = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36'
}
self.total_entries = 0
self.province_stats = {}
self.scraped_data = [] # List to store all the scraped data
def construct_url(self, province, page_num):
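        """Build the search-results URL for a province code and page number."""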
if self.searchCity:
return f"https://www.yellowpages.ca/search/si/{page_num}/{self.searchOccupation}/{self.searchCity}+{province}"
else:
return f"https://www.yellowpages.ca/search/si/{page_num}/{self.searchOccupation}/{province}"
def getPageHTML(self, url):
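        """Fetch a URL and return its parsed HTML, or None on request errors."""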
try:
page = requests.get(url, headers=self.userAgent)
page.raise_for_status()
return BeautifulSoup(page.text, "html.parser")
except requests.exceptions.RequestException as e:
print(f"Error fetching URL {url}: {e}")
return None
def getListings(self, parsedHTML):
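        """Extract the listing containers from a parsed results page."""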
if parsedHTML:
            return parsedHTML.find_all("div", {"class": "listing__content__wrap--flexed"})
return []
def get_max_pages(self, parsedHTML):
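        """Read the pagination widget for the number of result pages.

        The last <li> is the "next" arrow, so the second-to-last item holds
        the final page number; defaults to 1 if the layout is missing.
        """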
if not parsedHTML:
return 1
try:
pagination = parsedHTML.find("div", {"class": "pagination"})
if pagination:
                pages = pagination.find_all("li")
if pages:
return int(pages[-2].text.strip())
except Exception:
pass
return 1
def addInfo(self, listings, province_code):
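        """Parse each listing's fields and append them to self.scraped_data."""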
province_name = self.all_provinces.get(province_code, "Unknown")
entries_in_province = 0
for business in listings:
self.total_entries += 1
entries_in_province += 1
try:
businessName = business.find("div", "listing__title--wrap").h3.a.text.replace(",", "")
except Exception:
businessName = "N/A"
try:
businessPhone = business.find("ul", "mlr__submenu").li.h4.text
except Exception:
businessPhone = "N/A"
            try:
                businessWebsite = business.find("li", "mlr__item mlr__item--website").a["href"]
                # The href is a Yellow Pages redirect; extract and URL-decode the target
                redirect_index = businessWebsite.find("redirect=")
                if redirect_index != -1:
                    businessWebsite = unquote(businessWebsite[redirect_index + len("redirect="):])
            except Exception:
                businessWebsite = "No Website"
try:
                addressParsing = business.find("span", "listing__address--full").find_all(
                    "span", {"class": "jsMapBubbleAddress"})
businessAddress = " ".join(info.text for info in addressParsing).replace(",", "")
except Exception:
businessAddress = "N/A"
# Add business data to the scraped_data list
business_data = {
"number": self.total_entries,
"name": businessName,
"address": businessAddress,
"province_code": province_code,
"province_name": province_name,
"phone": businessPhone,
"website": businessWebsite
}
self.scraped_data.append(business_data)
        # Update province stats; accumulate because addInfo runs once per page
        self.province_stats[province_code] = (
            self.province_stats.get(province_code, 0) + entries_in_province
        )
def scrape(self):
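        """Scrape every province page by page with a polite delay between
        requests; returns (scraped_data, total_entries, province_stats)."""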
for province_code in self.all_provinces.keys():
print(f"Scraping {province_code} ({self.all_provinces[province_code]})")
page_num = 1
url = self.construct_url(province_code, page_num)
first_page_html = self.getPageHTML(url)
if not first_page_html:
print(f"Failed to fetch first page for {province_code}, skipping...")
continue
max_pages = self.get_max_pages(first_page_html)
listings = self.getListings(first_page_html)
if listings:
self.addInfo(listings, province_code)
for page_num in range(2, max_pages + 1):
url = self.construct_url(province_code, page_num)
page_html = self.getPageHTML(url)
if not page_html:
print(f"Failed to fetch page {page_num} for {province_code}, skipping...")
continue
listings = self.getListings(page_html)
if listings:
self.addInfo(listings, province_code)
time.sleep(1) # Respectful delay between requests
return self.scraped_data, self.total_entries, self.province_stats
# Serve the HTML interface at the root
@app.get("/", response_class=HTMLResponse)
def get_html():
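    """Return index.html from the working directory as the landing page."""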
with open("index.html", "r", encoding="utf-8") as f:
return f.read()
# Create directories and validate the MongoDB connection at startup
@app.on_event("startup")
def startup_event():
    os.makedirs("csv_files", exist_ok=True)
    # MongoClient connects lazily, so issue a ping to actually validate
    try:
        client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
        client.admin.command("ping")
        print("MongoDB connection successful")
    except Exception as e:
        print(f"Failed to connect to MongoDB: {e}")
# API endpoint for scraping with MongoDB storage
@app.get("/scrape")
def scrape_yellow_pages(
occupation: str = Query(..., description="Occupation or business type to search"),
city: Optional[str] = Query(None, description="City name (optional)"),
user_id: str = Query(..., description="User ID for storing the data")
):
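    """Run a full scrape, store the result set in MongoDB, and return it.

    The scrape runs synchronously, so broad searches can take a long time
    to respond; a MongoDB failure is reported in the response rather than
    failing the request.
    """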
scraper = YellowPagesScraper(occupation, city)
scraped_data, total, province_stats = scraper.scrape()
# Create the response data
response_data = {
"user_id": user_id,
"message": "Scraping completed successfully",
"total_entries": total,
"entries_by_province": province_stats,
"timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"search_params": {
"occupation": occupation,
"city": city
},
"data": scraped_data
}
# Store data in MongoDB
try:
db = get_database()
collection = db["scrape_results"]
# Insert the data
result = collection.insert_one(response_data)
# Add MongoDB ID to response
response_data["db_id"] = str(result.inserted_id)
print(f"Data stored in MongoDB with ID: {result.inserted_id}")
except Exception as e:
print(f"Error storing data in MongoDB: {e}")
# Return error in response but don't fail the request
response_data["db_error"] = str(e)
# Use json_util to handle MongoDB-specific data types
json_compatible_data = json.loads(json_util.dumps(response_data))
return JSONResponse(content=json_compatible_data)
# Endpoint to retrieve scrape results by user ID
@app.get("/results/{user_id}")
def get_user_results(user_id: str):
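    """Return all stored scrape results for the given user ID."""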
try:
db = get_database()
collection = db["scrape_results"]
# Find all records for this user
results = list(collection.find({"user_id": user_id}))
if not results:
return JSONResponse(
status_code=404,
content={"message": f"No results found for user ID: {user_id}"}
)
# Use json_util to handle MongoDB-specific data types
json_compatible_results = json.loads(json_util.dumps({"results": results}))
return JSONResponse(content=json_compatible_results)
except Exception as e:
return JSONResponse(
status_code=500,
content={"message": f"Error retrieving data: {str(e)}"}
)
# Health check endpoint
@app.get("/health")
def health_check():
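    """Report service health, including MongoDB reachability."""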
# Check MongoDB connection as part of health check
try:
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=2000)
client.admin.command('ping')
db_status = "connected"
except Exception as e:
db_status = f"error: {str(e)}"
health_data = {
"status": "healthy",
"timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"mongodb": db_status
}
return JSONResponse(content=json.loads(json_util.dumps(health_data)))
# Get list of available provinces
@app.get("/provinces")
def get_provinces():
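    """Return the supported province and territory codes and names."""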
    return ALL_PROVINCES
# Add additional MongoDB endpoints as needed
@app.get("/stats")
def get_stats():
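    """Aggregate usage statistics across all stored scrape results."""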
try:
db = get_database()
collection = db["scrape_results"]
# Get total number of scrapes
total_scrapes = collection.count_documents({})
# Get unique users
unique_users = len(collection.distinct("user_id"))
# Get total entries scraped
pipeline = [
{"$group": {"_id": None, "total": {"$sum": "$total_entries"}}}
]
total_entries_result = list(collection.aggregate(pipeline))
total_entries = total_entries_result[0]["total"] if total_entries_result else 0
stats_data = {
"total_scrapes": total_scrapes,
"unique_users": unique_users,
"total_entries_scraped": total_entries
}
# Use json_util to handle MongoDB-specific data types
json_compatible_stats = json.loads(json_util.dumps(stats_data))
return JSONResponse(content=json_compatible_stats)
except Exception as e:
return JSONResponse(
status_code=500,
content={"message": f"Error retrieving stats: {str(e)}"}
)
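
# Local entry point: a minimal sketch for running the app directly with
# uvicorn. It assumes uvicorn is installed; port 7860 is the usual Hugging
# Face Spaces port and is an assumption here, so adjust for your environment.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)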