Spaces:
Sleeping
Sleeping
import json
import os
import sys
import time
import traceback
import uuid

import google.generativeai as genai
from dotenv import load_dotenv
from flask import Flask, render_template, request, jsonify
from PIL import Image
# Load environment variables (including any defined in a local .env file).
load_dotenv()

# Read the Gemini API key from the environment only. A key must never be
# committed as a source-level fallback: anyone who can read the file can use
# (and exhaust or abuse) it.
# NOTE(review): the key that used to be hardcoded here should be treated as
# compromised and rotated.
api_key = os.getenv("GEMINI_API_KEY")
if api_key:
    # Configure the client at import time so requests also work when this
    # module is imported by a WSGI server and the __main__ startup check
    # (the only other place that calls genai.configure) never runs.
    genai.configure(api_key=api_key)
else:
    print(
        "WARNING: GEMINI_API_KEY is not set. "
        "Set the GEMINI_API_KEY environment variable; "
        "Gemini requests will fail without it."
    )
# Startup connectivity probe for the Gemini API
def test_gemini_api():
    """Send a trivial prompt to Gemini and report whether a usable reply came back.

    Configures the client with the module-level ``api_key`` and asks the
    'gemini-2.0-flash' model for a short fixed reply.

    Returns:
        True if a non-empty text response was received. False if the response
        was empty or if any exception was raised; in the latter case the
        error and its traceback are printed.
    """
    try:
        genai.configure(api_key=api_key)
        probe_model = genai.GenerativeModel('gemini-2.0-flash')
        reply = probe_model.generate_content("Hello, please respond with 'API is working'")

        # A falsy reply, a reply without a ``text`` attribute, and a reply
        # whose text is empty are all treated as "no answer".
        reply_text = getattr(reply, 'text', None) if reply else None
        if reply_text:
            print(f"API Test Response: {reply_text.strip()}")
            return True

        print("WARNING: Received empty response during API test")
        return False
    except Exception as e:
        print(f"ERROR: Failed to connect to Gemini API: {e!s}")
        print(traceback.format_exc())
        return False
# Module-level Flask application object; it is started by app.run() in the
# __main__ guard at the bottom of this file.
app = Flask(__name__)
# Configure error responses
@app.errorhandler(500)
def server_error(e):
    """Return a JSON body for internal server errors.

    Registered as the application's HTTP 500 handler; without the
    registration this function is never invoked by Flask.

    Args:
        e: The error object Flask passes to the handler.

    Returns:
        A ``(response, status)`` tuple: a JSON object with a single ``error``
        key containing the stringified error, and status code 500.
    """
    return jsonify(error="Internal server error: " + str(e)), 500
def extract_text_with_gemini(image_path):
    """Extract text from an image file using a Gemini vision model.

    The image is opened with Pillow, downscaled in memory so that neither side
    exceeds 1024 pixels, and sent to the model together with an OCR prompt.
    The whole operation is attempted up to three times, pausing two seconds
    between attempts.

    Args:
        image_path: Path of the image file to read. The file itself is not
            modified.

    Returns:
        The extracted text with surrounding whitespace stripped, or a message
        starting with "Could not extract text from the image" if every attempt
        failed. translate_text() recognises failure by that prefix, so the
        wording must stay stable.
    """
    max_retries = 3
    retry_delay = 2  # seconds to wait between attempts
    max_dimension = 1024  # longest allowed side, in pixels, of the image sent
    model_options = ['gemini-2.0-flash']
    prompt = "Extract all the text from this image. Return only the extracted text, nothing else."

    for attempt in range(max_retries):
        try:
            print(f"Attempt {attempt + 1} to extract text using Gemini...")

            # Use the first model that can be constructed; if the last
            # candidate fails too, let the error reach the retry handler.
            for model_name in model_options:
                try:
                    print(f"Trying model: {model_name}")
                    model = genai.GenerativeModel(model_name)
                    break
                except Exception as model_error:
                    print(f"Error with model {model_name}: {str(model_error)}")
                    if model_name == model_options[-1]:  # Last model option
                        raise

            # Keep the file open for the whole request: Pillow loads pixel
            # data lazily, so the image must not be closed before it is sent.
            with Image.open(image_path) as source_img:
                print(f"Image loaded from {image_path} (Size: {source_img.size}, Format: {source_img.format})")

                img = source_img
                if source_img.width > max_dimension or source_img.height > max_dimension:
                    print(f"Resizing large image from {source_img.width}x{source_img.height}")
                    ratio = min(max_dimension / source_img.width, max_dimension / source_img.height)
                    # Clamp to 1 pixel: for extreme aspect ratios int() would
                    # truncate the short side to 0 and resize() would raise.
                    new_width = max(1, int(source_img.width * ratio))
                    new_height = max(1, int(source_img.height * ratio))
                    # The resized copy is only needed in memory. Writing it
                    # back to image_path was unnecessary and failed for image
                    # modes the target file format cannot store.
                    img = source_img.resize((new_width, new_height))
                    print(f"Resized to {new_width}x{new_height}")

                print("Sending request to Gemini API for text extraction...")
                response = model.generate_content([prompt, img])

            # Validate response
            if not response or not hasattr(response, 'text') or not response.text:
                raise ValueError("Received empty response from Gemini API")

            extracted_text = response.text.strip()
            print(f"Successfully extracted text (length: {len(extracted_text)})")
            return extracted_text
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {str(e)}")
            print(traceback.format_exc())
            if attempt < max_retries - 1:
                print(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
                continue
            # Final attempt failed: report the error as text instead of raising.
            return f"Could not extract text from the image: {str(e)}"
def translate_text(text):
    """Translate English text into Hindi via Gemini, retrying on failure.

    No request is made when ``text`` is empty or whitespace-only (returns
    "No text to translate.") or when it begins with the OCR failure prefix
    "Could not extract text from the image" (returns "Cannot translate due to
    OCR failure."). Otherwise up to three attempts are made, two seconds
    apart. After the last failed attempt a "Translation failed: ..." message
    describing the error is returned instead of raising.
    """
    # Nothing worth sending to the model.
    if not text or text.strip() == "":
        return "No text to translate."

    # The input is an OCR failure report rather than real content.
    if text.startswith("Could not extract text from the image"):
        return "Cannot translate due to OCR failure."

    total_attempts = 3
    pause_seconds = 2

    for attempt_no in range(1, total_attempts + 1):
        try:
            print(f"Attempt {attempt_no} to translate text using Gemini...")

            # Take the first candidate model that can be constructed; a
            # failure on the final candidate propagates to the retry handler.
            candidates = ['gemini-2.0-flash']
            for candidate in candidates:
                try:
                    print(f"Trying model: {candidate}")
                    model = genai.GenerativeModel(candidate)
                except Exception as model_error:
                    print(f"Error with model {candidate}: {model_error!s}")
                    if candidate == candidates[-1]:
                        raise
                else:
                    break

            # Translation instructions followed by the text itself.
            prompt = (
                "\n"
                "Translate the following English text to Hindi.\n"
                "Keep proper names, titles, and organization names unchanged.\n"
                f"Text to translate: {text}\n"
            )

            print("Sending request to Gemini API for translation...")
            reply = model.generate_content(prompt)

            # A falsy reply, a missing ``text`` attribute, or empty text all
            # count as an empty response.
            reply_text = getattr(reply, 'text', None) if reply else None
            if not reply_text:
                raise ValueError("Received empty response from Gemini API")

            translated_text = reply_text.strip()
            print(f"Successfully translated text (length: {len(translated_text)})")
            return translated_text
        except Exception as e:
            print(f"Translation attempt {attempt_no} failed: {e!s}")
            print(traceback.format_exc())
            if attempt_no == total_attempts:
                # Out of attempts: report the failure as text.
                return f"Translation failed: {e!s}"
            print(f"Retrying in {pause_seconds} seconds...")
            time.sleep(pause_seconds)
@app.route('/')
def home():
    """Serve the landing page by rendering the ``index.html`` template.

    Registered at the site root; without a route the view is never reachable.
    """
    return render_template('index.html')
# NOTE(review): no route decorator is visible for this view in the reviewed
# source; the '/upload' path is an assumption. Confirm it matches the URL the
# front end posts to.
@app.route('/upload', methods=['POST'])
def upload_file():
    """Handle an image upload: extract its text and translate it to Hindi.

    Expects a multipart form field named ``file`` holding a PNG, JPG, JPEG,
    GIF or BMP image. The upload is written to a uniquely named file under
    ``temp/``, passed to extract_text_with_gemini() and then translate_text(),
    and the temporary file is removed afterwards regardless of outcome.

    Returns:
        A JSON response with ``original_text`` and ``translated_text`` on
        success; a JSON ``error`` with status 400 when the file is missing,
        unnamed or has a disallowed extension; a JSON ``error`` with status
        500 if processing raises.
    """
    print("Received upload request")

    if 'file' not in request.files:
        print("No file part in the request")
        return jsonify({'error': 'No file uploaded'}), 400

    file = request.files['file']
    # Treat a missing (None) filename like an empty one instead of crashing
    # in the extension check below.
    if not file.filename:
        print("No file selected")
        return jsonify({'error': 'No file selected'}), 400

    # Check file extension
    allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'bmp'}
    if '.' not in file.filename or file.filename.rsplit('.', 1)[1].lower() not in allowed_extensions:
        print(f"Invalid file format: {file.filename}")
        return jsonify({'error': 'Invalid file format. Please upload an image (PNG, JPG, JPEG, GIF, BMP).'}), 400

    temp_path = None
    try:
        # Create temp directory if it doesn't exist
        temp_dir = "temp"
        os.makedirs(temp_dir, exist_ok=True)
        print(f"Ensuring temp directory exists: {temp_dir}")

        # Best effort: make sure the temp directory is writable.
        try:
            if not os.access(temp_dir, os.W_OK):
                os.chmod(temp_dir, 0o755)
                print(f"Updated permissions for temp directory: {temp_dir}")
        except Exception as perm_error:
            print(f"Warning: Could not update permissions: {str(perm_error)}")

        # A random name per request: a name derived from the current second
        # made uploads arriving within the same second overwrite, and then
        # delete, each other's file.
        temp_filename = f"temp_image_{uuid.uuid4().hex}.png"
        temp_path = os.path.join(temp_dir, temp_filename)
        print(f"Saving uploaded file to {temp_path}")
        file.save(temp_path)

        # Best effort: make the saved file readable.
        try:
            os.chmod(temp_path, 0o644)
            print(f"Updated permissions for file: {temp_path}")
        except Exception as file_perm_error:
            print(f"Warning: Could not update file permissions: {str(file_perm_error)}")

        # Extract text using Gemini
        print("Starting text extraction...")
        extracted_text = extract_text_with_gemini(temp_path)
        print(f"Text extraction result: {extracted_text[:100]}...")

        # Translate text
        print("Starting text translation...")
        translated_text = translate_text(extracted_text)
        print(f"Translation result: {translated_text[:100]}...")

        return jsonify({
            'original_text': extracted_text,
            'translated_text': translated_text
        })
    except Exception as e:
        error_msg = f"Error processing image: {str(e)}"
        print(error_msg)
        print(traceback.format_exc())
        return jsonify({
            'error': error_msg
        }), 500
    finally:
        # Clean up the temporary file; a failure here must not affect the
        # response that is already being returned.
        try:
            if temp_path and os.path.exists(temp_path):
                os.remove(temp_path)
                print(f"Removed temporary file: {temp_path}")
        except Exception as e:
            print(f"Failed to remove temporary file: {str(e)}")
if __name__ == '__main__':
    # Ensure the template folder exists
    if not os.path.exists('templates'):
        os.makedirs('templates')
        print("Created 'templates' directory. Please place your HTML files here.")

    # Test API connectivity at startup. A failure is reported but does not
    # prevent the server from starting.
    api_working = test_gemini_api()
    if api_working:
        print("✅ Gemini API connection successful!")
    else:
        print("❌ WARNING: Gemini API connection failed. The application may not work correctly!")

    # For Hugging Face Spaces, we need to listen on 0.0.0.0 and port 7860.
    # Resolve the port once so the logged value is the one actually used.
    port = int(os.environ.get('PORT', 7860))
    print(f"Starting Flask app on port {port}")
    app.run(host='0.0.0.0', port=port)