# NOTE: the page-status text "Spaces: Sleeping" was captured along with this
# file; it is not part of the program.
import os
import random
import re
import time
from urllib.parse import quote_plus, urljoin

import requests
from bs4 import BeautifulSoup
from flask import Flask, request, render_template
# Flask application object for this module; the __main__ guard at the bottom
# of the file starts it with app.run().
app = Flask(__name__)
# Browser User-Agent strings; one is picked at random for outgoing requests.
# Each entry is split across lines with implicit string concatenation, so the
# runtime value is a single unbroken header string.
USER_AGENTS = [
    # Chrome 91 / Windows 10
    (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/91.0.4472.124 Safari/537.36"
    ),
    # Chrome 92 / Linux
    (
        "Mozilla/5.0 (X11; Linux x86_64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/92.0.4515.107 Safari/537.36"
    ),
    # Firefox 90 / Windows 10
    (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) "
        "Gecko/20100101 Firefox/90.0"
    ),
    # Firefox 90 / macOS 11.5
    (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 11.5; rv:90.0) "
        "Gecko/20100101 Firefox/90.0"
    ),
    # Safari 14.1.2 / macOS 11.5.1
    (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 11_5_1) "
        "AppleWebKit/605.1.15 (KHTML, like Gecko) "
        "Version/14.1.2 Safari/605.1.15"
    ),
    # Edge 92 / Windows 10
    (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/92.0.4515.107 Safari/537.36 Edg/92.0.902.55"
    ),
]
def get_random_user_agent():
    """Pick one entry of USER_AGENTS at random and return it."""
    chosen_agent = random.choice(USER_AGENTS)
    return chosen_agent
# First run of digits in the text, allowing thousands separators (Western
# "1,187" and Indian "1,18,700" grouping) and an optional decimal part.
_PRICE_NUMBER_RE = re.compile(r"[0-9][0-9,]*(?:\.[0-9]+)?")


def extract_price(price_str):
    """Extract a numeric value from a price string like '₹1,187'.

    Only the first number in the text is parsed, so punctuation that is not
    part of the amount ("Rs. 499") and trailing numbers in a range
    ("₹500 - ₹900") no longer corrupt the result. A '.' that is not preceded
    by a digit is treated as punctuation, not as a decimal point.

    Args:
        price_str: Price text scraped from a page; may be empty or None.

    Returns:
        The price as a float, or float('inf') when no number is present so
        that unparseable entries sort after every real price.
    """
    if not price_str:
        return float('inf')

    number = _PRICE_NUMBER_RE.search(price_str)
    if number is None:
        return float('inf')

    # Thousands separators are presentation only; drop them before parsing.
    return float(number.group().replace(",", ""))
def get_with_retry(url, max_retries=3):
    """Make a GET request with retries and random delays.

    The same browser-like header set (with one randomly chosen User-Agent)
    is used for every attempt. Attempts after the first are preceded by a
    random 0.5-2.5 s pause; a 429/503/529 answer adds a further 2-5 s pause
    before the next attempt.

    Args:
        url: Absolute URL to fetch.
        max_retries: Maximum number of attempts; must be at least 1.

    Returns:
        The first response with status 200, otherwise the most recent
        response received (callers must check ``status_code``).

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        requests.RequestException: If every attempt failed before any
            response was received; the last such error is re-raised.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    headers = {
        "User-Agent": get_random_user_agent(),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Accept-Encoding": "gzip, deflate, br",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
        "Cache-Control": "max-age=0",
    }

    # Both start unset so the code after the loop can tell "no response at
    # all" apart from "got a non-200 response".
    response = None
    last_error = None

    for attempt in range(max_retries):
        # Add a random delay between requests (0.5 to 2.5 seconds)
        if attempt > 0:
            time.sleep(0.5 + random.random() * 2)

        try:
            response = requests.get(url, headers=headers, timeout=10)
        except requests.RequestException as e:
            last_error = e
            print(f"Request error on attempt {attempt+1}: {str(e)}")
            continue

        # If successful, return the response
        if response.status_code == 200:
            return response

        # If we hit a rate limit or block, wait longer before retrying.
        # Skipped after the final attempt: there is nothing left to wait for.
        is_last_attempt = attempt == max_retries - 1
        if response.status_code in (429, 503, 529) and not is_last_attempt:
            time.sleep(2 + random.random() * 3)

    if response is None:
        # Every attempt raised, so there is no response to hand back.
        raise last_error

    # Return the last response even if it wasn't successful
    return response
def get_mock_amazon_data(query):
    """Build six placeholder Amazon listings for *query*, cheapest first.

    Used when scraping yields nothing. Prices are random: one base price is
    drawn, then each listing scales it by its own random factor.
    """
    anchor_price = random.randint(500, 5000)

    def make_listing(slot):
        # One random factor per listing, drawn in slot order.
        amount = anchor_price * random.uniform(0.8, 1.2)
        side = 200 + slot * 10  # placeholder image is square
        return {
            "title": f"{query.title()} - Professional Farm Grade (Model A{slot+1})",
            "image": f"/api/placeholder/{side}/{side}",
            "price": f"₹{int(amount)}",
            "url": "#",
            "price_val": amount,
        }

    listings = [make_listing(slot) for slot in range(6)]
    listings.sort(key=lambda entry: entry["price_val"])
    return listings
def get_mock_flipkart_data(query):
    """Build six placeholder Flipkart listings for *query*, cheapest first.

    Used when scraping yields nothing. Prices are random: one base price is
    drawn, then each listing scales it by its own random factor. All six
    listings share the same title text.
    """
    anchor_price = random.randint(450, 4800)

    def make_listing(slot):
        # One random factor per listing, drawn in slot order.
        amount = anchor_price * random.uniform(0.85, 1.25)
        side = 200 + slot * 10  # placeholder image is square
        return {
            "title": f"Premium {query.title()} for Agricultural Use - Durable Steel Construction",
            "image": f"/api/placeholder/{side}/{side}",
            "price": f"₹{int(amount)}",
            "url": "#",
            "price_val": amount,
        }

    listings = [make_listing(slot) for slot in range(6)]
    listings.sort(key=lambda entry: entry["price_val"])
    return listings
def _select_first(node, selectors):
    """Return the first element under *node* matched by any selector, else None."""
    for selector in selectors:
        found = node.select_one(selector)
        if found is not None:
            return found
    return None


def _dump_page(path, html):
    """Write *html* to *path* as a debugging aid; a failed write is only logged."""
    try:
        with open(path, "w", encoding="utf-8") as handle:
            handle.write(html)
    except OSError as e:
        # The dump is diagnostic only; an unwritable working directory must
        # not discard a page that was fetched successfully.
        print(f"Could not save page dump to {path}: {str(e)}")


def _scrape_amazon(product_name):
    """Search amazon.in; return (up to six product dicts, raw page HTML)."""
    # quote_plus encodes spaces as '+' and escapes '&', '#', etc. so the
    # user's text cannot break out of the k= parameter.
    search_url = f"https://www.amazon.in/s?k={quote_plus(product_name)}"
    response = get_with_retry(search_url)
    if response.status_code != 200:
        print(f"Amazon scraping failed with status code: {response.status_code}")
        return [], ""

    html = response.text
    _dump_page("amazon.txt", html)
    soup = BeautifulSoup(html, 'html.parser')

    # Try different selectors that Amazon might be using
    cards = soup.find_all("div", attrs={"data-component-type": "s-search-result"})
    if not cards:
        cards = soup.find_all("div", class_="s-result-item")

    found = []
    for card in cards:
        title_tag = _select_first(card, ("h2 a span", "h2 span"))
        price_tag = card.select_one("span.a-price .a-offscreen")
        link_tag = card.select_one("h2 a")
        if title_tag is None or price_tag is None or link_tag is None:
            continue

        title = title_tag.get_text(strip=True)
        price = price_tag.get_text(strip=True)
        href = link_tag.get("href")
        # Incomplete cards are skipped.
        if not title or not price or not href:
            continue

        img_tag = card.select_one("img.s-image")
        found.append({
            "title": title,
            "image": img_tag.get("src", "") if img_tag is not None else "",
            "price": price,
            # urljoin leaves absolute links alone and resolves relative ones.
            "url": urljoin("https://www.amazon.in", href),
            "price_val": extract_price(price),
        })
        if len(found) >= 6:
            break
    return found, html


def _scrape_flipkart(product_name):
    """Search flipkart.com; return (up to six product dicts, raw page HTML)."""
    search_url = f"https://www.flipkart.com/search?q={quote_plus(product_name)}"
    response = get_with_retry(search_url)
    if response.status_code != 200:
        print(f"Flipkart scraping failed with status code: {response.status_code}")
        return [], ""

    html = response.text
    _dump_page("flipkart.txt", html)
    soup = BeautifulSoup(html, 'html.parser')

    # Try multiple selectors to find product listings
    cards = soup.select("div._1AtVbE div._13oc-S") or soup.select("div._1YokD2 div._1AtVbE")

    found = []
    for card in cards:
        title_tag = _select_first(card, ("div._4rR01T", "a.s1Q9rs", "div.xtbQoJ a"))
        title = title_tag.get_text(strip=True) if title_tag is not None else None

        # The matched title element may itself be the product link; when it
        # carries no href, fall back to the dedicated anchor selectors.
        href = title_tag.get("href") if title_tag is not None else None
        if not href:
            link_tag = _select_first(card, ("a._1fQZEK", "a._2rpwqI"))
            href = link_tag.get("href") if link_tag is not None else None

        # "div._30jeq3" already matches every element that the more specific
        # "div._30jeq3._1_WHN1" would, so one selector is enough.
        price_tag = card.select_one("div._30jeq3")
        price = price_tag.get_text(strip=True) if price_tag is not None else None

        if not title or not price or not href:
            continue

        img_tag = _select_first(card, ("img._396cs4", "img._2r_T1I"))
        found.append({
            "title": title,
            "image": img_tag.get("src", "") if img_tag is not None else "",
            "price": price,
            "url": urljoin("https://www.flipkart.com", href),
            "price_val": extract_price(price),
        })
        if len(found) >= 6:
            break
    return found, html


# NOTE(review): no route registration for this view is visible in the file,
# which leaves it unreachable. "/" is assumed here; GET and POST are both
# needed because the view branches on request.method == 'POST'.
@app.route("/", methods=["GET", "POST"])
def index():
    """Render the search page and, on POST, the price comparison results.

    On POST the 'product' form field is searched on Amazon and Flipkart.
    Each store contributes up to six products sorted by ascending price.
    If a store yields nothing (blocked, failed request, or no parsable
    cards), mock data from the matching get_mock_*_data function is used.

    Returns:
        The rendered 'index.html' template with the two product lists, the
        raw page HTML fetched from each store, the search_attempted flag
        and the submitted product name.
    """
    amazon_list = []
    flipkart_list = []
    amazon_page_data = ""
    flipkart_page_data = ""
    search_attempted = False
    product_name = ""

    if request.method == 'POST':
        # A missing field becomes "" instead of None, and surrounding
        # whitespace does not count as a query.
        product_name = (request.form.get('product') or "").strip()
        search_attempted = True

        if product_name:
            # --------- AMAZON SCRAPING -----------
            # Broad catch on purpose: this is the request boundary and any
            # scraping failure must fall through to the mock data.
            try:
                amazon_list, amazon_page_data = _scrape_amazon(product_name)
            except Exception as e:
                print(f"Error during Amazon scraping: {str(e)}")

            # If we couldn't get real data, use mock data
            if amazon_list:
                amazon_list = sorted(amazon_list, key=lambda x: x['price_val'])[:6]
            else:
                amazon_list = get_mock_amazon_data(product_name)

            # --------- FLIPKART SCRAPING -----------
            try:
                flipkart_list, flipkart_page_data = _scrape_flipkart(product_name)
            except Exception as e:
                print(f"Error during Flipkart scraping: {str(e)}")

            # If we couldn't get real data, use mock data
            if flipkart_list:
                flipkart_list = sorted(flipkart_list, key=lambda x: x['price_val'])[:6]
            else:
                flipkart_list = get_mock_flipkart_data(product_name)

    # Pass search_attempted flag to template to show appropriate message
    return render_template(
        'index.html',
        amazon=amazon_list,
        flipkart=flipkart_list,
        amazon_page_data=amazon_page_data,
        flipkart_page_data=flipkart_page_data,
        search_attempted=search_attempted,
        product_name=product_name,
    )
if __name__ == '__main__':
    # Flask's debug mode enables the Werkzeug interactive debugger, which can
    # execute arbitrary code for anyone who can reach the server. It is
    # therefore off by default and enabled only by setting FLASK_DEBUG to
    # 1/true/yes/on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1", "true", "yes", "on",
    }
    app.run(debug=debug_enabled)