# Spaces:
# Sleeping
# Sleeping
import functools
import io
import os
import random
import tempfile

import cloudinary
import cloudinary.uploader
import cv2
import gdown
import nibabel as nib
import numpy as np
import pandas as pd
import requests
import supervision as sv
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.models as models
import torchvision.transforms as transforms
from flask import Flask, request, jsonify, render_template
from inference_sdk import InferenceHTTPClient, InferenceConfiguration
from PIL import Image
from roboflow import Roboflow
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import Dataset, DataLoader

import model
from a import main
# Flask application object; the __main__ guard at the bottom of the file
# starts its server.
app = Flask(__name__)

# Google Drive URL and local filename of the checkpoint fetched by
# download_file_from_google_drive() and download_model().
GDRIVE_MODEL_URL = "https://drive.google.com/uc?id=1fzKneepaRt_--dzamTcDBM-9d3_dLX7z"
LOCAL_MODEL_PATH = "checkpoint32.pth"

# Google Drive URL of the second checkpoint, fetched by
# download_file_from_google_drived() and stored under the name held in `da`.
d = "https://drive.google.com/uc?id=1GfrlFNoa7E4liMHyMuF73nA21yT9SNSb"
def download_file_from_google_drive():
    """Fetch GDRIVE_MODEL_URL with gdown and store it at LOCAL_MODEL_PATH.

    The download is unconditional: nothing here checks whether the file is
    already present. gdown's progress output stays enabled (quiet=False).
    """
    gdown.download(GDRIVE_MODEL_URL, LOCAL_MODEL_PATH, quiet=False)
# Local filename for the checkpoint downloaded from the URL in `d`.
da = "a.pth"


def download_file_from_google_drived():
    """Fetch the URL in `d` with gdown and store it under the name in `da`.

    The download is unconditional: nothing here checks whether the file is
    already present. gdown's progress output stays enabled (quiet=False).
    """
    gdown.download(d, da, quiet=False)
def download_model():
    """Download GDRIVE_MODEL_URL to LOCAL_MODEL_PATH unless the file exists.

    The body is streamed to a sibling ".part" file in chunks and moved into
    place only after the transfer completes, so an interrupted download never
    leaves a truncated file at LOCAL_MODEL_PATH.

    Raises:
        Exception: if the server answers with a status code other than 200.
        requests.RequestException: on connection problems or timeouts.
    """
    if os.path.exists(LOCAL_MODEL_PATH):
        return

    partial_path = LOCAL_MODEL_PATH + ".part"
    # NOTE(review): this fetches the Drive URL with plain `requests`, while the
    # start-up code uses gdown for the same URL — confirm that a direct GET
    # returns the file itself rather than an interstitial page.
    with requests.get(GDRIVE_MODEL_URL, stream=True, timeout=60) as response:
        if response.status_code != 200:
            raise Exception(
                f"Failed to download model from Google Drive: {response.status_code}"
            )
        with open(partial_path, "wb") as f:
            # Write chunk by chunk so the whole file is never held in memory.
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                f.write(chunk)
    os.replace(partial_path, LOCAL_MODEL_PATH)
# Fetch both checkpoints when the module is imported, before any handler runs.
download_file_from_google_drive()
download_file_from_google_drived()
def home():
    """Render and return the "index.html" template.

    NOTE(review): no @app.route decorator is visible above this function in
    this file — confirm how the handler is registered with `app`.
    """
    return render_template("index.html")
# Flat repair estimate per damage class reported by the detector. Classes that
# are not listed here contribute 0 to the total.
_REPAIR_COSTS = {
    "damage-front-windscreen": 800,
    "damaged-door": 650,
    "damaged-fender": 650,
    "damaged-front-bumper": 640,
    "damaged-head-light": 680,
    "damaged-hood": 690,
    "damaged-rear-bumper": 610,
    "damaged-rear-window": 605,
    "damaged-side-window": 701,
    "damaged-tail-light": 678,
    "quaterpanel-dent": 720,
}

_DAMAGE_MODEL_ID = "car-recognition-v4/4"

# Confidence thresholds tried in order until the detector reports something.
# Written out as literals so repeated float subtraction cannot drift, and
# bounded below so an image with no detections cannot loop forever.
_CONFIDENCE_THRESHOLDS = (0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1)

# Pillow image modes that can be written as JPEG without conversion.
_JPEG_MODES = ("RGB", "L", "CMYK")

_DOWNLOAD_TIMEOUT_SECONDS = 30


def _read_submitted_image():
    """Return the raw bytes of the image sent with the current request.

    A "url" form field takes precedence over a "file" upload. Returns None
    when neither is present or when the URL does not answer with status 200.
    """
    if "url" in request.form:
        # NOTE(review): the URL is caller-supplied and fetched server-side;
        # consider restricting the allowed hosts.
        response = requests.get(
            request.form["url"], timeout=_DOWNLOAD_TIMEOUT_SECONDS
        )
        if response.status_code != 200:
            print(f"Failed to download image. Status code: {response.status_code}")
            return None
        return response.content
    if "file" in request.files:
        return request.files["file"].read()
    return None


def _write_jpeg(image_bytes, path):
    """Decode image_bytes and save them as a JPEG at path; return success."""
    try:
        image = Image.open(io.BytesIO(image_bytes))
        if image.mode not in _JPEG_MODES:
            # JPEG cannot store alpha or palette data.
            image = image.convert("RGB")
        image.save(path, "JPEG", quality=100)
    except OSError:
        # Pillow reports undecodable or truncated data as OSError subclasses.
        return False
    return True


def _detect_damage(client, image_path):
    """Run the detector on image_path, relaxing the threshold until it finds something.

    Returns the list of predictions from the first threshold that yields any,
    or an empty list when even the lowest threshold yields none.
    """
    predictions = []
    for threshold in _CONFIDENCE_THRESHOLDS:
        configuration = InferenceConfiguration(confidence_threshold=threshold)
        with client.use_configuration(configuration):
            result = client.infer(image_path, model_id=_DAMAGE_MODEL_ID)
        predictions = list(result["predictions"])
        if predictions:
            break
    return predictions


def fetchImage():
    """Estimate the total repair cost for the damage detected in a submitted image.

    The image comes from the "url" form field or the "file" upload of the
    current request. It is written to a temporary JPEG, sent to the hosted
    detection model, and the per-class costs of all predictions are summed.

    Returns:
        A JSON response {"total_cost": <number>}, or a JSON error with status
        400 when no readable image was submitted.

    NOTE(review): no @app.route decorator is visible above this function in
    this file — confirm how the handler is registered with `app`.
    """
    image_bytes = _read_submitted_image()
    if image_bytes is None:
        return jsonify({"error": "no image could be read from the request"}), 400

    # A unique temporary file avoids collisions between concurrent requests.
    handle, file_name = tempfile.mkstemp(prefix="downloaded_image_", suffix=".jpg")
    os.close(handle)
    try:
        if not _write_jpeg(image_bytes, file_name):
            return jsonify({"error": "submitted data is not a readable image"}), 400
        print(f"Image downloaded and saved as {file_name}")

        # NOTE(review): the key below is committed in source. It can now be
        # overridden with the ROBOFLOW_API_KEY environment variable; the
        # literal is kept only as a fallback and should be rotated.
        client = InferenceHTTPClient(
            api_url="https://detect.roboflow.com",
            api_key=os.environ.get("ROBOFLOW_API_KEY", "LqD8Cs4OsoK8seO3CPkf"),
        )
        labels = _detect_damage(client, file_name)
    finally:
        os.remove(file_name)

    print(labels)
    total_cost = sum(_REPAIR_COSTS.get(item["class"], 0) for item in labels)
    result = {"total_cost": round(total_cost, 2)}
    print(result)
    return jsonify(result)
def generate_report():
    """Build a report from two URLs, or store an uploaded report file.

    With a "report_url" form field, both URLs and the output name
    "output.pdf" are passed to `main` (imported from module `a`), and whatever
    it returns is sent back as {"url": ...} with status 200.

    With a "file" upload instead, the upload is written to
    "uploaded_report.pdf" and the generic 404 response below is returned.

    NOTE(review): answering 404 after a successful upload looks unfinished —
    confirm the intended response for that branch.
    NOTE(review): no @app.route decorator is visible above this function in
    this file — confirm how the handler is registered with `app`.
    """
    if "report_url" in request.form:
        insurance_url = request.form.get("insurance_url")
        if insurance_url is None:
            return jsonify({"error": "insurance_url not provided"}), 400
        url = main(request.form["report_url"], insurance_url, "output.pdf")
        return jsonify({"url": url}), 200

    if "file" in request.files:
        uploaded = request.files["file"]
        with open("uploaded_report.pdf", "wb") as f:
            f.write(uploaded.read())

    return jsonify({"message": "Something happened!."}), 404
# NOTE(review): absolute path, while the start-up download writes the relative
# name "a.pth" — the two agree only when the working directory is
# /home/user/app. Confirm the deployment layout.
_STAGE_CHECKPOINT_PATH = "/home/user/app/a.pth"

_NO_MS_MESSAGE = "No ms detected"
_IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg")

# Uploads whose share of Canny edge pixels exceeds this are rejected.
_MAX_EDGE_DENSITY = 0.05


@functools.lru_cache(maxsize=1)
def _load_stage_model():
    """Build the staging network, load its checkpoint once, and return it in eval mode."""

    class ResNetRegression(nn.Module):
        """ResNet-34 whose final fully connected layer emits a single value."""

        def __init__(self):
            super().__init__()
            # No pretrained weights are requested: the strict load_state_dict
            # call below replaces every parameter and buffer anyway.
            self.model = models.resnet34()
            self.model.fc = nn.Linear(self.model.fc.in_features, 1)

        def forward(self, x):
            return self.model(x)

    network = ResNetRegression()
    # NOTE(review): weights_only=False unpickles arbitrary objects; this is
    # only safe while the checkpoint comes from a trusted source.
    checkpoint = torch.load(
        _STAGE_CHECKPOINT_PATH,
        weights_only=False,
        map_location=torch.device("cpu"),
    )
    print(checkpoint.keys())
    # Strip the "module." prefix from checkpoint keys so they match this
    # model's parameter names.
    prefix = "module."
    state_dict = {
        (key[len(prefix):] if key.startswith(prefix) else key): value
        for key, value in checkpoint.items()
    }
    network.load_state_dict(state_dict)
    # Inference mode: batch-norm layers use their stored running statistics.
    network.eval()
    return network


def _edge_density(image_path):
    """Return the fraction of Canny edge pixels, or None if OpenCV cannot read the file."""
    gray = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if gray is None:
        return None
    edges = cv2.Canny(gray, 50, 150)
    return np.count_nonzero(edges) / edges.size


def _predict_stage_score(image_path):
    """Run the staging network on the image at image_path and return its scalar output."""
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ])
    with Image.open(image_path) as raw:
        image = raw.convert("RGB")
    with torch.no_grad():
        output = _load_stage_model()(transform(image).unsqueeze(0))
    return output.item()


def _stage_label(score):
    """Map the regression score to "Mild" (<= 2.0), "Moderate" (<= 3.2) or "Severe"."""
    if score <= 2.0:
        return "Mild"
    if score <= 3.2:
        return "Moderate"
    return "Severe"


def predict():
    """Classify an uploaded scan and, when MS is reported, grade its stage.

    The "file" upload is saved in the system temp directory. Uploads with too
    many edge pixels are rejected as "Not an mri image". Otherwise
    `model.check_file` supplies the message and confidence, and the staging
    network supplies the stage unless the message is "No ms detected".

    Returns:
        A JSON response with "message", "confidence", "saved_path" and (when
        not rejected) "stage"; or a JSON error with status 400 when no usable
        file was uploaded. "saved_path" is None for uploads that are not
        .png/.jpg/.jpeg.

    NOTE(review): no @app.route decorator is visible above this function in
    this file — confirm how the handler is registered with `app`.
    """
    file = request.files.get("file")
    if not file:
        return jsonify({"error": "file not uploaded"}), 400

    # Keep only the final path component so a crafted filename cannot escape
    # the temp directory.
    safe_name = os.path.basename(file.filename.replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        return jsonify({"error": "file not uploaded"}), 400

    # Save file temporarily
    temp_path = os.path.join(tempfile.gettempdir(), safe_name)
    file.save(temp_path)

    image_save_path = None
    if safe_name.lower().endswith(_IMAGE_EXTENSIONS):
        image_save_path = os.path.join(tempfile.gettempdir(), safe_name.lower())
        with Image.open(temp_path) as uploaded:
            # Decode fully before saving: the two paths are the same file when
            # the upload name is already lower-case.
            uploaded.load()
            uploaded.save(image_save_path)

    # NOTE(review): the original helper was named is_mri_image and commented
    # "MRI images have high edge presence", yet a high density led to the
    # rejection below. The behavior is kept as found — confirm the intent.
    density = _edge_density(temp_path)
    if density is not None:
        print(density)
        if density > _MAX_EDGE_DENSITY:
            return jsonify({
                "message": "Not an mri image",
                "confidence": 0.95,
                "saved_path": image_save_path,
            })

    message, confidence = model.check_file(temp_path)
    if message == _NO_MS_MESSAGE:
        # The stage score would be discarded, so the network is not run.
        stage = _NO_MS_MESSAGE
    else:
        stage = _stage_label(_predict_stage_score(temp_path))

    return jsonify({
        "message": message,
        "confidence": confidence,
        "stage": stage,
        "saved_path": image_save_path,
    })
if __name__ == "__main__":
    # Serve on all interfaces, port 7860, when run as a script.
    app.run(host="0.0.0.0", port=7860)