sbapan41's picture
Rename Text_extraction_deploy_2.py to apppy
6ade045 verified
raw
history blame contribute delete
11.5 kB
import os
import tempfile
import json
import logging
import time
from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename
import pdfplumber
from pdf2image import convert_from_path
from PIL import Image
import cv2
import numpy as np
import io
import pandas as pd
try:
from docx import Document
except ImportError:
Document = None # Handle case where python-docx is not installed
import openpyxl
import easyocr
app = Flask(__name__)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Configuration
ALLOWED_EXTENSIONS = {'pdf', 'docx', 'txt', 'csv', 'xlsx', 'xls', 'jpg', 'jpeg', 'png'}
UPLOAD_FOLDER = tempfile.mkdtemp()
OUTPUT_FOLDER = os.path.join(os.getcwd(), 'extracted_data')
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB limit
# API Key Configuration
API_KEYS = {
"your_api_key_1": "client1",
"your_api_key_2": "client2"
}
# Initialize EasyOCR readers with GPU support
reader_en_hi = easyocr.Reader(['en', 'hi'], gpu=True)
reader_en_bn = easyocr.Reader(['en', 'bn'], gpu=True)
reader_en_ur = easyocr.Reader(['en', 'ur'], gpu=True)
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def validate_api_key():
"""Check if the provided API key is valid"""
api_key = request.headers.get('X-API-KEY')
if not api_key or api_key not in API_KEYS:
return False
return True
def preprocess_image(image):
"""Enhance image for better OCR results"""
try:
img = np.array(image)
if len(img.shape) == 2: # Grayscale
img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
elif img.shape[2] == 4: # RGBA
img = cv2.cvtColor(img, cv2.COLOR_RGBA2RGB)
# Convert to grayscale for processing
gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
# Apply adaptive thresholding
processed = cv2.adaptiveThreshold(
gray, 255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY, 11, 2
)
return Image.fromarray(processed)
except Exception as e:
logger.error(f"Image preprocessing failed: {str(e)}")
return image
def extract_text_from_image(image):
"""Extract text from image using EasyOCR"""
try:
processed_img = preprocess_image(image)
result_en_hi = reader_en_hi.readtext(np.array(processed_img))
result_en_bn = reader_en_bn.readtext(np.array(processed_img))
result_en_ur = reader_en_ur.readtext(np.array(processed_img))
text_en_hi = " ".join([text[1] for text in result_en_hi])
text_en_bn = " ".join([text[1] for text in result_en_bn])
text_en_ur = " ".join([text[1] for text in result_en_ur])
return text_en_hi + " " + text_en_bn + " " + text_en_ur
except Exception as e:
logger.error(f"OCR extraction failed: {str(e)}")
return ""
def process_pdf_page(page, page_num, pdf_path):
"""Process a single PDF page with mixed content"""
result = {
"page": page_num + 1,
"native_text": "",
"image_text": "",
"type": "mixed"
}
# First try to extract native text
try:
result["native_text"] = page.extract_text(x_tolerance=1, y_tolerance=1) or ""
except Exception as e:
logger.warning(f"Native text extraction failed: {str(e)}")
# Check if page has images or if native text extraction was insufficient
if page.images or len(result["native_text"].strip()) < 50:
try:
# Convert the entire page to image
images = convert_from_path(
pdf_path,
first_page=page_num+1,
last_page=page_num+1,
dpi=300,
size=(2480, 3508)) # A4 size at 300dpi
if images:
# Extract text from the full page image
full_page_text = extract_text_from_image(images[0])
# Only use OCR text if we got more content than native extraction
if len(full_page_text) > len(result["native_text"]):
result["image_text"] = full_page_text
result["type"] = "ocr_text" if not result["native_text"] else "mixed"
# Explicit cleanup
del images
except Exception as e:
logger.error(f"Page image processing failed: {str(e)}")
return result
def process_docx(file_path):
"""Extract text from DOCX file"""
if Document is None:
raise ImportError("python-docx package is not installed")
try:
doc = Document(file_path)
text = "\n".join([paragraph.text for paragraph in doc.paragraphs])
return {
"content": [{
"page": 1,
"text": text,
"type": "native_text"
}]
}
except Exception as e:
logger.error(f"DOCX processing failed: {str(e)}")
raise
def process_txt(file_path):
"""Extract text from TXT file"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
text = f.read()
return {
"content": [{
"page": 1,
"text": text,
"type": "native_text"
}]
}
except Exception as e:
logger.error(f"TXT processing failed: {str(e)}")
raise
def process_csv(file_path):
"""Extract data from CSV file"""
try:
df = pd.read_csv(file_path)
text = df.to_string(index=False)
return {
"content": [{
"page": 1,
"text": text,
"type": "table_data"
}]
}
except Exception as e:
logger.error(f"CSV processing failed: {str(e)}")
raise
def process_excel(file_path):
"""Extract data from Excel file (XLSX or XLS)"""
try:
text = ""
if file_path.endswith('.xlsx'):
wb = openpyxl.load_workbook(file_path)
for sheet_name in wb.sheetnames:
sheet = wb[sheet_name]
text += f"\n\nSheet: {sheet_name}\n"
for row in sheet.iter_rows(values_only=True):
text += "\t".join(str(cell) if cell is not None else "" for cell in row) + "\n"
else: # .xls
df = pd.read_excel(file_path, sheet_name=None)
for sheet_name, data in df.items():
text += f"\n\nSheet: {sheet_name}\n{data.to_string(index=False)}\n"
return {
"content": [{
"page": 1,
"text": text,
"type": "table_data"
}]
}
except Exception as e:
logger.error(f"Excel processing failed: {str(e)}")
raise
def process_image(file_path):
"""Extract text from image file (JPG, JPEG, PNG)"""
try:
image = Image.open(file_path)
text = extract_text_from_image(image)
return {
"content": [{
"page": 1,
"text": text,
"type": "ocr_text"
}]
}
except Exception as e:
logger.error(f"Image processing failed: {str(e)}")
raise
@app.route('/process', methods=['POST'])
def handle_file():
# API Key validation
if not validate_api_key():
return jsonify({"error": "Invalid or missing API key"}), 401
if 'file' not in request.files:
return jsonify({"error": "No file provided"}), 400
file = request.files['file']
if not file or file.filename == '':
return jsonify({"error": "No selected file"}), 400
if not allowed_file(file.filename):
return jsonify({"error": "Invalid file type"}), 400
temp_path = None
try:
# Save uploaded file temporarily
filename = secure_filename(file.filename)
temp_dir = tempfile.mkdtemp()
temp_path = os.path.join(temp_dir, filename)
file.save(temp_path)
start_time = time.time()
file_extension = filename.rsplit('.', 1)[1].lower()
# Process file based on extension
if file_extension == 'pdf':
results = []
with pdfplumber.open(temp_path) as pdf:
for page_num, page in enumerate(pdf.pages):
page_result = process_pdf_page(page, page_num, temp_path)
results.append(page_result)
# Combine results
combined_text = ""
for page in results:
combined_text += page.get("native_text", "") + "\n" + page.get("image_text", "") + "\n"
response = {
"metadata": {
"filename": filename,
"pages": len(results),
"processing_time": round(time.time() - start_time, 2),
"text_length": len(combined_text)
},
"content": results
}
elif file_extension == 'docx':
response = process_docx(temp_path)
response['metadata'] = {
"filename": filename,
"pages": 1,
"processing_time": round(time.time() - start_time, 2),
"text_length": len(response['content'][0]['text'])
}
elif file_extension == 'txt':
response = process_txt(temp_path)
response['metadata'] = {
"filename": filename,
"pages": 1,
"processing_time": round(time.time() - start_time, 2),
"text_length": len(response['content'][0]['text'])
}
elif file_extension == 'csv':
response = process_csv(temp_path)
response['metadata'] = {
"filename": filename,
"pages": 1,
"processing_time": round(time.time() - start_time, 2),
"text_length": len(response['content'][0]['text'])
}
elif file_extension in ('xlsx', 'xls'):
response = process_excel(temp_path)
response['metadata'] = {
"filename": filename,
"pages": 1,
"processing_time": round(time.time() - start_time, 2),
"text_length": len(response['content'][0]['text'])
}
elif file_extension in ('jpg', 'jpeg', 'png'):
response = process_image(temp_path)
response['metadata'] = {
"filename": filename,
"pages": 1,
"processing_time": round(time.time() - start_time, 2),
"text_length": len(response['content'][0]['text'])
}
else:
return jsonify({"error": "Unsupported file type"}), 400
return jsonify(response)
except Exception as e:
logger.error(f"Processing failed: {str(e)}")
return jsonify({"error": str(e)}), 500
finally:
# Clean up temporary files
try:
if temp_path and os.path.exists(temp_path):
os.remove(temp_path)
if 'temp_dir' in locals() and os.path.exists(temp_dir):
os.rmdir(temp_dir)
except Exception as e:
logger.error(f"Cleanup failed: {str(e)}")
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)