feature_extractor.py -

import numpy as np
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.applications.efficientnet import preprocess_input
from tensorflow.keras.preprocessing import image


class FeatureExtractor:
    def __init__(self):
        # Load a pretrained EfficientNetB0 without its classification head;
        # pooling='avg' collapses the final feature map into a single vector.
        self.model = EfficientNetB0(weights='imagenet', include_top=False, pooling='avg')

    def extract_features(self, img_path):
        # Load the image at the model's expected input size and preprocess it
        img = image.load_img(img_path, target_size=(224, 224))
        img_array = image.img_to_array(img)
        expanded_img = np.expand_dims(img_array, axis=0)
        preprocessed_img = preprocess_input(expanded_img)

        # Forward pass; flatten the pooled (1, 1280) output to a 1-D vector
        features = self.model.predict(preprocessed_img)
        return features.flatten()
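
A quick sanity check of the extractor on its own, with the class above in scope (the image path is a placeholder; with pooling='avg', EfficientNetB0 produces a 1280-dimensional vector):

extractor = FeatureExtractor()
vector = extractor.extract_features('data/images/example.jpg')
print(vector.shape)  # expected: (1280,)
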
preprocessing.py -

import os
import pickle
import time

from tqdm import tqdm

from .feature_extractor import FeatureExtractor


def precompute_embeddings(image_dir='data/images', output_path='data/embeddings.pkl'):
    # Initialize the feature extractor
    extractor = FeatureExtractor()

    embeddings = []
    image_paths = []

    # Collect the files with supported image extensions (case-insensitive)
    valid_images = [f for f in os.listdir(image_dir)
                    if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
    total_images = len(valid_images)

    print(f"\nFound {total_images} images to process")

    # Rough estimate, assuming ~1 second per image for EfficientNetB0
    estimated_time = total_images * 1
    print(f"Estimated time: {estimated_time//60} minutes and {estimated_time%60} seconds\n")

    # tqdm provides the progress bar
    start_time = time.time()
    for idx, filename in enumerate(tqdm(valid_images, desc="Processing images")):
        img_path = os.path.join(image_dir, filename)
        try:
            # Show the image currently being processed
            print(f"\rProcessing image {idx+1}/{total_images}: {filename}", end="")

            embedding = extractor.extract_features(img_path)
            embeddings.append(embedding)
            image_paths.append(img_path)

            # Estimate the remaining time from the running average per image
            elapsed_time = time.time() - start_time
            avg_time_per_image = elapsed_time / (idx + 1)
            remaining_images = total_images - (idx + 1)
            estimated_remaining_time = remaining_images * avg_time_per_image

            print(f" | Remaining time: {estimated_remaining_time//60:.0f}m {estimated_remaining_time%60:.0f}s")

        except Exception as e:
            print(f"\nError processing {filename}: {e}")

    # Save the embeddings together with their image paths
    with open(output_path, 'wb') as f:
        pickle.dump({'embeddings': embeddings, 'image_paths': image_paths}, f)

    total_time = time.time() - start_time
    print("\nProcessing complete!")
    print(f"Total time taken: {total_time//60:.0f} minutes and {total_time%60:.0f} seconds")
    print(f"Successfully processed {len(embeddings)}/{total_images} images")
    print(f"Embeddings saved to {output_path}")

    return embeddings, image_paths


if __name__ == "__main__":
    precompute_embeddings()
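
Because of the relative import, this script should be run as a module from the project root (python -m src.preprocessing, assuming the src package layout implied by app.py's imports), not invoked directly. After a run, the pickle can be inspected to confirm what was written:

import pickle

with open('data/embeddings.pkl', 'rb') as f:
    data = pickle.load(f)
print(f"{len(data['embeddings'])} embeddings for {len(data['image_paths'])} paths")
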
similarity_search.py -

import pickle

import faiss
import numpy as np


class SimilaritySearchEngine:
    def __init__(self, embeddings_path='data/embeddings.pkl'):
        # Load the precomputed embeddings and their image paths
        with open(embeddings_path, 'rb') as f:
            data = pickle.load(f)
        self.embeddings = data['embeddings']
        self.image_paths = data['image_paths']

        # Build a flat (exact) L2 index; FAISS expects float32 vectors
        dimension = len(self.embeddings[0])
        self.index = faiss.IndexFlatL2(dimension)
        self.index.add(np.array(self.embeddings, dtype='float32'))

    def search_similar_images(self, query_embedding, top_k=5):
        # Return the paths and distances of the top_k nearest neighbours
        query = np.array([query_embedding], dtype='float32')
        distances, indices = self.index.search(query, top_k)
        return [self.image_paths[idx] for idx in indices[0]], distances[0]
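
IndexFlatL2 performs exact brute-force search, which is a reasonable default at this scale; approximate FAISS indexes only pay off on much larger collections. Typical usage, reusing the extractor from above (the query path is a placeholder):

engine = SimilaritySearchEngine()
query = FeatureExtractor().extract_features('query.jpg')
paths, distances = engine.search_similar_images(query, top_k=5)
for path, dist in zip(paths, distances):
    print(f"{path}: {dist:.2f}")
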
app.py -

import streamlit as st
from PIL import Image

from src.feature_extractor import FeatureExtractor
from src.similarity_search import SimilaritySearchEngine


def main():
    st.title('Image Similarity Search')

    # Upload the query image
    uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "png", "jpeg"])

    if uploaded_file is not None:
        # Load the uploaded image
        query_img = Image.open(uploaded_file)

        # Resize and display the query image
        query_img_resized = query_img.resize((263, 385))
        st.image(query_img_resized, caption='Uploaded Image', use_container_width=False)

        # Feature extraction and similarity search
        if st.button("Search Similar Images"):
            with st.spinner("Analyzing query image..."):
                try:
                    # Initialize the feature extractor and search engine
                    extractor = FeatureExtractor()
                    search_engine = SimilaritySearchEngine()

                    # Save the uploaded image temporarily; convert to RGB so
                    # PNG uploads with an alpha channel can be saved as JPEG
                    query_img_path = 'temp_query_image.jpg'
                    query_img.convert('RGB').save(query_img_path)

                    # Extract features from the query image
                    query_embedding = extractor.extract_features(query_img_path)

                    # Perform the similarity search
                    similar_images, distances = search_engine.search_similar_images(query_embedding)

                    # Display the matches side by side with their L2 distances
                    st.subheader('Similar Images')
                    cols = st.columns(len(similar_images))
                    for i, (img_path, dist) in enumerate(zip(similar_images, distances)):
                        with cols[i]:
                            similar_img = Image.open(img_path).resize((375, 550))
                            st.image(similar_img, caption=f'Distance: {dist:.2f}', use_container_width=True)

                except Exception as e:
                    st.error(f"Error during similarity search: {e}")


if __name__ == '__main__':
    main()
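
The app is launched from the project root (so that the src package and data/embeddings.pkl resolve) with streamlit run app.py. One caveat: the extractor and search engine are rebuilt on every button click, which reloads the model each time. Streamlit's st.cache_resource can keep them alive across reruns; a minimal sketch, not wired into the code above:

@st.cache_resource
def load_models():
    # Built once per process, then reused across Streamlit reruns
    return FeatureExtractor(), SimilaritySearchEngine()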