# ---------------------------------------------------------------------------
# feature_extractor.py
# ---------------------------------------------------------------------------
import tensorflow as tf
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.efficientnet import preprocess_input
import numpy as np


class FeatureExtractor:
    """Compute an EfficientNetB0 embedding vector for an image file."""

    def __init__(self):
        """Build the embedding model.

        The backbone is loaded with ImageNet weights, without its
        classification head (``include_top=False``), and with global
        average pooling so each image yields a single vector.
        """
        base_model = EfficientNetB0(weights='imagenet', include_top=False, pooling='avg')
        self.model = tf.keras.Model(inputs=base_model.input, outputs=base_model.output)

    def extract_features(self, img_path):
        """Return the flattened embedding of the image stored at ``img_path``.

        Args:
            img_path: Path of an image file readable by Keras' ``load_img``.

        Returns:
            A 1-D NumPy array holding the model output for that image.
        """
        # Resize on load to the 224x224 input this pipeline uses everywhere.
        img = image.load_img(img_path, target_size=(224, 224))
        img_array = image.img_to_array(img)
        # The model expects a batch axis, so wrap the single image.
        expanded_img = np.expand_dims(img_array, axis=0)
        preprocessed_img = preprocess_input(expanded_img)

        # verbose=0: without it Keras prints a progress bar for every single
        # image, which floods the console during bulk preprocessing.
        features = self.model.predict(preprocessed_img, verbose=0)
        return features.flatten()


# ---------------------------------------------------------------------------
# preprocessing.py
# ---------------------------------------------------------------------------
import os
import pickle
from .feature_extractor import FeatureExtractor
import time
from tqdm import tqdm

# Extensions (lower-case) of the files treated as images.
IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')


def precompute_embeddings(image_dir='data/images', output_path='data/embeddings.pkl'):
    """Embed every image in ``image_dir`` and pickle the result.

    Files are selected by extension, case-insensitively. An image that fails
    to load or embed is reported and skipped; the remaining images are still
    processed.

    Args:
        image_dir: Directory scanned (non-recursively) for images.
        output_path: Destination of the pickle file. It stores a dict with
            the keys ``'embeddings'`` and ``'image_paths'`` (parallel lists).

    Returns:
        A tuple ``(embeddings, image_paths)`` with one entry per image that
        was processed successfully.
    """
    extractor = FeatureExtractor()
    embeddings = []
    image_paths = []

    # Sorted so that the order of the stored embeddings is reproducible;
    # os.listdir() returns entries in arbitrary order.
    valid_images = sorted(
        f for f in os.listdir(image_dir)
        if f.lower().endswith(IMAGE_EXTENSIONS)
    )
    total_images = len(valid_images)
    print(f"\nFound {total_images} images to process")

    # Rough up-front estimate, assuming ~1 second per image.
    estimated_time = total_images * 1
    print(f"Estimated time: {estimated_time//60} minutes and {estimated_time%60} seconds\n")

    start_time = time.time()
    for idx, filename in enumerate(tqdm(valid_images, desc="Processing images")):
        # No second extension check here: valid_images is already filtered.
        # The previous case-sensitive re-check silently dropped files such as
        # "PHOTO.JPG" even though they had been counted in total_images.
        img_path = os.path.join(image_dir, filename)
        try:
            print(f"\rProcessing image {idx+1}/{total_images}: {filename}", end="")
            embedding = extractor.extract_features(img_path)
            embeddings.append(embedding)
            image_paths.append(img_path)

            # Running average over the images seen so far drives the ETA.
            elapsed_time = time.time() - start_time
            avg_time_per_image = elapsed_time / (idx + 1)
            remaining_images = total_images - (idx + 1)
            estimated_remaining_time = remaining_images * avg_time_per_image
            print(f" | Remaining time: {estimated_remaining_time//60:.0f}m {estimated_remaining_time%60:.0f}s")
        except Exception as e:
            # Deliberately best-effort: one unreadable image must not abort
            # the whole batch.
            print(f"\nError processing {filename}: {e}")

    # Make sure the destination directory exists before writing.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(output_path, 'wb') as f:
        pickle.dump({'embeddings': embeddings, 'image_paths': image_paths}, f)

    total_time = time.time() - start_time
    print("\nProcessing complete!")
    print(f"Total time taken: {total_time//60:.0f} minutes and {total_time%60:.0f} seconds")
    print(f"Successfully processed {len(embeddings)}/{total_images} images")
    print(f"Embeddings saved to {output_path}")

    return embeddings, image_paths


if __name__ == "__main__":
    precompute_embeddings()


# ---------------------------------------------------------------------------
# similarity_search.py
# ---------------------------------------------------------------------------
import faiss
import numpy as np
import pickle
import os


class SimilaritySearchEngine:
    """Nearest-neighbour lookup over precomputed image embeddings."""

    def __init__(self, embeddings_path='data/embeddings.pkl'):
        """Load the embeddings pickle and build a flat L2 FAISS index.

        Args:
            embeddings_path: Pickle file containing a dict with the keys
                ``'embeddings'`` and ``'image_paths'``.

        Raises:
            ValueError: If the file contains no embeddings.
        """
        # NOTE(security): pickle.load can execute arbitrary code. Only point
        # this at embedding files produced by a trusted source.
        with open(embeddings_path, 'rb') as f:
            data = pickle.load(f)

        self.embeddings = data['embeddings']
        self.image_paths = data['image_paths']

        if len(self.embeddings) == 0:
            raise ValueError(
                f"No embeddings found in {embeddings_path}; "
                "run the preprocessing step on a non-empty image directory first."
            )

        # FAISS works on contiguous float32 matrices; make that explicit
        # instead of relying on whatever dtype was pickled.
        matrix = np.ascontiguousarray(np.asarray(self.embeddings, dtype='float32'))
        dimension = matrix.shape[1]
        self.index = faiss.IndexFlatL2(dimension)
        self.index.add(matrix)

    def search_similar_images(self, query_embedding, top_k=5):
        """Return the indexed images closest to ``query_embedding``.

        Args:
            query_embedding: 1-D embedding with the same length as the
                indexed vectors.
            top_k: Maximum number of neighbours to return.

        Returns:
            A tuple ``(paths, distances)``: the list of matching image paths
            and a NumPy array with the distance FAISS reported for each,
            in the order FAISS returned them. Fewer than ``top_k`` entries
            are returned when the index holds fewer vectors.
        """
        # Asking FAISS for more neighbours than it stores pads the result
        # with index -1, which would silently map to image_paths[-1].
        top_k = min(int(top_k), self.index.ntotal)
        if top_k <= 0:
            return [], np.empty(0, dtype='float32')

        query = np.ascontiguousarray(np.asarray([query_embedding], dtype='float32'))
        distances, indices = self.index.search(query, top_k)

        # Defensive: drop any padding entries FAISS may still have produced.
        valid = indices[0] >= 0
        paths = [self.image_paths[idx] for idx in indices[0][valid]]
        return paths, distances[0][valid]


# ---------------------------------------------------------------------------
# app.py
# ---------------------------------------------------------------------------
import os
import tempfile

import streamlit as st
from PIL import Image
from src.feature_extractor import FeatureExtractor
from src.similarity_search import SimilaritySearchEngine


@st.cache_resource
def _load_extractor():
    """Build the feature extractor once and reuse it across reruns."""
    return FeatureExtractor()


def _embed_uploaded_image(extractor, query_img):
    """Write ``query_img`` to a private temp file and return its embedding."""
    # A unique temp file (rather than a fixed name in the working directory)
    # keeps concurrent sessions from overwriting each other's query image.
    tmp = tempfile.NamedTemporaryFile(suffix='.jpg', delete=False)
    tmp.close()
    try:
        # JPEG cannot store alpha or palette modes, so PNG uploads with
        # transparency would otherwise fail on save.
        query_img.convert('RGB').save(tmp.name)
        return extractor.extract_features(tmp.name)
    finally:
        os.remove(tmp.name)


def main():
    """Streamlit page: upload an image and show its nearest indexed images."""
    st.title('Image Similarity Search')

    uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "png", "jpeg"])
    if uploaded_file is None:
        return

    query_img = Image.open(uploaded_file)

    # Resize only for display; the embedding is computed from the original.
    query_img_resized = query_img.resize((263, 385))
    st.image(query_img_resized, caption='Uploaded Image', use_container_width=False)

    if not st.button("Search Similar Images"):
        return

    with st.spinner("Analyzing query image..."):
        try:
            extractor = _load_extractor()
            # Built per search on purpose, so a regenerated embeddings file
            # is picked up without restarting the app.
            search_engine = SimilaritySearchEngine()

            query_embedding = _embed_uploaded_image(extractor, query_img)
            similar_images, distances = search_engine.search_similar_images(query_embedding)

            st.subheader('Similar Images')
            if not similar_images:
                # st.columns() rejects a count of zero.
                st.info("No similar images found.")
                return

            cols = st.columns(len(similar_images))
            for col, img_path, dist in zip(cols, similar_images, distances):
                with col:
                    # Context manager closes the file handle once resized.
                    with Image.open(img_path) as stored_img:
                        similar_img = stored_img.resize((375, 550))
                    st.image(similar_img, caption=f'Distance: {dist:.2f}', use_container_width=True)

        except Exception as e:
            # Top-level UI boundary: surface the failure to the user.
            st.error(f"Error during similarity search: {e}")


if __name__ == '__main__':
    main()