Spaces:
Running
Running
import gradio as gr | |
import os | |
import numpy as np | |
import cv2 | |
import random | |
from sklearn.decomposition import PCA | |
from sklearn.metrics import classification_report, roc_auc_score | |
from tensorflow.keras.applications import EfficientNetB0 | |
from tensorflow.keras.applications.efficientnet import preprocess_input # Add this import | |
from tensorflow.keras.models import Model | |
from pyod.models.iforest import IForest | |
from pyod.models.lof import LOF | |
from pyod.models.ocsvm import OCSVM | |
import matplotlib.pyplot as plt | |
# Paths (adjust as needed) | |
dataset_path = "data" | |
basmati_path = os.path.join(dataset_path, "basmati") | |
jasmine_path = os.path.join(dataset_path, "jasmine") | |
# Load and preprocess images | |
def load_images_from_folder(folder, label, limit=None): | |
images = [] | |
filenames = os.listdir(folder) | |
if limit: | |
filenames = random.sample(filenames, limit) | |
img_data = [] | |
for filename in filenames: | |
img_path = os.path.join(folder, filename) | |
img = cv2.imread(img_path) | |
if img is not None: | |
img = cv2.resize(img, (128, 128)) | |
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
img = preprocess_input(img.astype(np.float32)) # Now this will work | |
images.append(img) | |
img_data.append((img, filename, label)) | |
return np.array(images), img_data | |
# Load data | |
all_basmati_images, all_basmati_data = load_images_from_folder(basmati_path, label=0) | |
jasmine_images, jasmine_data = load_images_from_folder(jasmine_path, label=1, limit=None) | |
# Training and test sets | |
basmati_train_count = int(0.2 * len(all_basmati_images)) | |
basmati_train_indices = random.sample(range(len(all_basmati_images)), basmati_train_count) | |
X_train = np.array([all_basmati_images[i] for i in basmati_train_indices]) | |
train_data = [all_basmati_data[i] for i in basmati_train_indices] | |
basmati_test_count = 200 | |
basmati_test_indices = random.sample(range(len(all_basmati_images)), basmati_test_count) | |
X_test_basmati = np.array([all_basmati_images[i] for i in basmati_test_indices]) | |
test_data_basmati = [all_basmati_data[i] for i in basmati_test_indices] | |
jasmine_test_count = 10 | |
jasmine_test_images, jasmine_test_data = load_images_from_folder(jasmine_path, label=1, limit=jasmine_test_count) | |
X_test = np.concatenate([X_test_basmati, jasmine_test_images], axis=0) | |
test_data = test_data_basmati + jasmine_test_data | |
y_test = np.array([0] * len(X_test_basmati) + [1] * len(jasmine_test_images)) | |
# Feature extraction | |
base_model = EfficientNetB0(weights='imagenet', include_top=False, pooling='avg', input_shape=(128, 128, 3)) | |
feature_extractor = Model(inputs=base_model.input, outputs=base_model.output) | |
def extract_features(images, batch_size=16): | |
return feature_extractor.predict(images, batch_size=batch_size, verbose=1) | |
X_train_features = extract_features(X_train) | |
X_test_features = extract_features(X_test) | |
# PCA | |
pca = PCA(n_components=50) | |
X_train_reduced = pca.fit_transform(X_train_features) | |
X_test_reduced = pca.transform(X_test_features) | |
# Main anomaly detection function | |
def run_anomaly_detection(mode, model_name, contamination, n_estimators, n_neighbors, nu): | |
# Adjust training data for semi-supervised mode | |
if mode == "Semi-supervised": | |
# Add a small portion of Jasmine to training (e.g., 5 images) | |
jasmine_train_count = 5 | |
jasmine_train_images, jasmine_train_data = load_images_from_folder(jasmine_path, label=1, limit=jasmine_train_count) | |
X_train_semi = np.concatenate([X_train, jasmine_train_images], axis=0) | |
X_train_semi_features = extract_features(X_train_semi) | |
X_train_semi_reduced = pca.transform(X_train_semi_features) | |
else: | |
X_train_semi_reduced = X_train_reduced | |
# Initialize model based on selection | |
if model_name == "IForest": | |
outlier_detector = IForest(contamination=contamination, n_estimators=int(n_estimators)) | |
elif model_name == "LOF": | |
outlier_detector = LOF(contamination=contamination, n_neighbors=int(n_neighbors)) | |
else: # OCSVM | |
outlier_detector = OCSVM(contamination=contamination, nu=nu) | |
# Fit and predict | |
outlier_detector.fit(X_train_semi_reduced) | |
predictions = outlier_detector.predict(X_test_reduced) | |
# Evaluation | |
report = classification_report(y_test, predictions) | |
try: | |
auc_score = roc_auc_score(y_test, predictions) | |
auc_text = f"AUC Score: {auc_score:.4f}" | |
except: | |
auc_text = "AUC Score could not be calculated." | |
# Outlier filenames | |
outlier_indices = np.where(predictions == 1)[0] | |
outlier_list = [] | |
for idx in outlier_indices: | |
img, filename, label = test_data[idx] | |
rice_type = "Jasmine" if label == 1 else "Basmati" | |
outlier_list.append(f"Filename: {filename}, Actual Label: {rice_type}") | |
outlier_text = "\n".join(outlier_list) if outlier_list else "No outliers detected." | |
# PCA Visualization (2D) | |
pca_vis = PCA(n_components=2) | |
X_test_2d = pca_vis.fit_transform(X_test_features) | |
plt.figure(figsize=(10, 7)) | |
plt.scatter(X_test_2d[y_test == 0, 0], X_test_2d[y_test == 0, 1], c='blue', label='Basmati', alpha=0.6, s=40) | |
plt.scatter(X_test_2d[y_test == 1, 0], X_test_2d[y_test == 1, 1], c='red', label='Jasmine', alpha=0.6, s=40) | |
plt.scatter(X_test_2d[outlier_indices, 0], X_test_2d[outlier_indices, 1], | |
facecolors='none', edgecolors='black', linewidths=1.5, label='Outliers', s=80) | |
plt.title("PCA Projection with Outliers") | |
plt.xlabel("PCA Component 1") | |
plt.ylabel("PCA Component 2") | |
plt.legend() | |
plt.grid(True) | |
plt.tight_layout() | |
return report, auc_text, outlier_text, plt | |
# Gradio Interface | |
with gr.Blocks() as interface: | |
gr.Markdown("## Anomaly Detection Playground") | |
with gr.Row(): | |
mode = gr.Dropdown(["Unsupervised", "Semi-supervised"], label="Mode") | |
model_name = gr.Dropdown(["IForest", "LOF", "OCSVM"], label="Model") | |
with gr.Row(): | |
contamination = gr.Slider(0, 0.25, value=0.05, step=0.01, label="Contamination") | |
n_estimators = gr.Slider(100, 299, value=100, step=10, label="N Estimators (IForest)") | |
n_neighbors = gr.Slider(5, 50, value=20, step=1, label="N Neighbors (LOF)") | |
nu = gr.Slider(0, 1, value=0.1, step=0.01, label="Nu (OCSVM)") | |
submit_btn = gr.Button("Run Detection") | |
with gr.Row(): | |
report_output = gr.Textbox(label="Classification Report") | |
auc_output = gr.Textbox(label="AUC Score") | |
outlier_output = gr.Textbox(label="Detected Outliers") | |
plot_output = gr.Plot(label="PCA Projection") | |
submit_btn.click( | |
fn=run_anomaly_detection, | |
inputs=[mode, model_name, contamination, n_estimators, n_neighbors, nu], | |
outputs=[report_output, auc_output, outlier_output, plot_output] | |
) | |
interface.launch() |