pro-search-api / app.py
vhr1007
new_version_changes3.0
b687ff9
from huggingface_hub import login
from fastapi import FastAPI, Depends, HTTPException
import logging
from pydantic import BaseModel
from transformers import AutoTokenizer, AutoModel
from services.qdrant_searcher import QdrantSearcher
from services.openai_service import generate_rag_response
from utils.auth import token_required
from dotenv import load_dotenv
import os
import torch
from utils.auth_x import x_api_key_auth
import time
# Load environment variables from .env file
load_dotenv()
# Initialize FastAPI application
app = FastAPI()
# Set the cache directory for Hugging Face
os.environ["HF_HOME"] = "/tmp/huggingface_cache"
# Ensure the cache directory exists
hf_home_dir = os.environ["HF_HOME"]
if not os.path.exists(hf_home_dir):
os.makedirs(hf_home_dir)
collection_name = os.getenv('QDRANT_COLLECTION_NAME')
logging.info(f"Collection name: {collection_name}")
# Setup logging using Python's standard logging library
logging.basicConfig(level=logging.INFO)
# Load Hugging Face token from environment variable
huggingface_token = os.getenv('HUGGINGFACE_HUB_TOKEN')
if huggingface_token:
try:
login(token=huggingface_token, add_to_git_credential=True)
logging.info("Successfully logged into Hugging Face Hub.")
except Exception as e:
logging.error(f"Failed to log into Hugging Face Hub: {e}")
raise HTTPException(status_code=500, detail="Failed to log into Hugging Face Hub.")
else:
raise ValueError("Hugging Face token is not set. Please set the HUGGINGFACE_HUB_TOKEN environment variable.")
# Initialize the Qdrant searcher
qdrant_url = os.getenv('QDRANT_URL')
access_token = os.getenv('QDRANT_ACCESS_TOKEN')
if not qdrant_url or not access_token:
raise ValueError("Qdrant URL or Access Token is not set. Please set the QDRANT_URL and QDRANT_ACCESS_TOKEN environment variables.")
# Load the model and tokenizer with trust_remote_code=True
try:
cache_folder = os.path.join(hf_home_dir, "transformers_cache")
# Load the tokenizer and model with trust_remote_code=True
tokenizer = AutoTokenizer.from_pretrained('nomic-ai/nomic-embed-text-v1.5', trust_remote_code=True)
model = AutoModel.from_pretrained('nomic-ai/nomic-embed-text-v1.5', trust_remote_code=True)
logging.info("Successfully loaded the model and tokenizer with transformers.")
# Initialize the Qdrant searcher after the model is successfully loaded
global searcher # Ensure searcher is accessible globally if needed
searcher = QdrantSearcher(qdrant_url=qdrant_url, access_token=access_token)
except Exception as e:
logging.error(f"Failed to load the model or initialize searcher: {e}")
raise HTTPException(status_code=500, detail="Failed to load the custom model or initialize searcher.")
# Function to embed text using the model
def embed_text(text):
inputs = tokenizer(text, padding=True, truncation=True, return_tensors="pt")
outputs = model(**inputs)
embeddings = outputs.last_hidden_state.mean(dim=1) # Example: mean pooling
return embeddings.detach().numpy()
# Define the request body models
class SearchDocumentsRequest(BaseModel):
query: str
limit: int = 3
file_id: str = None
class GenerateRAGRequest(BaseModel):
search_query: str
file_id: str = None
class XApiKeyRequest(BaseModel):
organization_id: str
user_id: str
search_query: str
file_id: str = None
@app.get("/")
async def root():
return {"message": "Welcome to the Search and RAG API!, go to relevant address for API request"}
# Define the search documents endpoint
@app.post("/api/search-documents")
async def search_documents(
body: SearchDocumentsRequest,
credentials: tuple = Depends(token_required)
):
customer_id, user_id = credentials
start_time = time.time()
if not customer_id or not user_id:
logging.error("Failed to extract customer_id or user_id from the JWT token.")
raise HTTPException(status_code=401, detail="Invalid token: missing customer_id or user_id")
logging.info("Received request to search documents")
try:
logging.info("Starting document search")
# Encode the query using the custom embedding function
query_embedding = embed_text(body.query)
print(body.query)
#collection_name = "embed" # Use the collection name where the embeddings are stored
logging.info("Performing search using the precomputed embeddings")
if body.file_id:
hits, error = searcher.search_documents(collection_name, query_embedding, user_id, body.limit, file_id=body.file_id)
# Perform search using the precomputed embeddings
hits, error = searcher.search_documents(collection_name, query_embedding, user_id, body.limit)
if error:
logging.error(f"Search documents error: {error}")
raise HTTPException(status_code=500, detail=error)
end_time = time.time()
time_taken = end_time - start_time
return hits, time_taken
except Exception as e:
logging.error(f"Unexpected error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Define the generate RAG response endpoint
@app.post("/api/generate-rag-response")
async def generate_rag_response_api(
body: GenerateRAGRequest,
credentials: tuple = Depends(token_required)
):
customer_id, user_id = credentials
start_time = time.time()
if not customer_id or not user_id:
logging.error("Failed to extract customer_id or user_id from the JWT token.")
raise HTTPException(status_code=401, detail="Invalid token: missing customer_id or user_id")
logging.info("Received request to generate RAG response")
try:
search_time = time.time()
logging.info("Starting document search")
# Encode the query using the custom embedding function
query_embedding = embed_text(body.search_query)
print(body.search_query)
#collection_name = "embed" # Use the collection name where the embeddings are stored
# Perform search using the precomputed embeddings
if body.file_id:
hits, error = searcher.search_documents(collection_name, query_embedding, user_id, file_id=body.file_id)
else:
hits, error = searcher.search_documents(collection_name, query_embedding, user_id)
if error:
logging.error(f"Search documents error: {error}")
raise HTTPException(status_code=500, detail=error)
logging.info("Generating RAG response")
end_search_time = time.time()
search_time_taken = end_search_time - search_time
rag_start_time = time.time()
# Generate the RAG response using the retrieved documents
response, error = generate_rag_response(hits, body.search_query)
rag_end_time = time.time()
rag_time_taken = rag_end_time - rag_start_time
end_time= time.time()
total_time = end_time - start_time
logging.info(f"Search time: {search_time_taken}, RAG time: {rag_time_taken}, Total time: {total_time}")
if error:
logging.error(f"Generate RAG response error: {error}")
raise HTTPException(status_code=500, detail=error)
return {"response": response}
except Exception as e:
logging.error(f"Unexpected error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/search-documents/v1")
async def search_documents_x_api_key(
body: XApiKeyRequest,
authorized: bool = Depends(x_api_key_auth)
):
if not authorized:
raise HTTPException(status_code=401, detail="Unauthorized")
start_time = time.time()
organization_id = body.organization_id
user_id = body.user_id
file_id = body.file_id
logging.info(f'search query {body.search_query}')
logging.info(f"organization_id: {organization_id}, user_id: {user_id}")
logging.info("Received request to search documents with x-api-key auth")
try:
logging.info("Starting document search")
# Encode the query using the custom embedding function
query_embedding = embed_text(body.search_query)
#collection_name = "embed" # Use the collection name where the embeddings are stored
# Perform search using the precomputed embeddings
hits, error = searcher.search_documents(collection_name, query_embedding, user_id, limit=3, file_id=file_id)
if error:
logging.error(f"Search documents error: {error}")
raise HTTPException(status_code=500, detail=error)
logging.info(f"Document search completed with {len(hits)} hits")
end_time = time.time()
logging.info(f"Time taken: {end_time - start_time}")
return hits
except Exception as e:
logging.error(f"Unexpected error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/generate-rag-response/v1")
async def generate_rag_response_x_api_key(
body: XApiKeyRequest,
authorized: bool = Depends(x_api_key_auth)
):
# Assuming x_api_key_auth validates the key
if not authorized:
raise HTTPException(status_code=401, detail="Unauthorized")
start_time = time.time()
organization_id = body.organization_id
user_id = body.user_id
file_id = body.file_id
logging.info(f'search query {body.search_query}')
logging.info(f"organization_id: {organization_id}, user_id: {user_id}")
logging.info("Received request to generate RAG response with x-api-key auth")
try:
logging.info("Starting document search")
# Encode the query using the custom embedding function
query_embedding = embed_text(body.search_query)
#collection_name = "embed" # Use the collection name where the embeddings are stored
# Perform search using the precomputed embeddings
hits, error = searcher.search_documents(collection_name, query_embedding, user_id, file_id=file_id)
if error:
logging.error(f"Search documents error: {error}")
raise HTTPException(status_code=500, detail=error)
logging.info("Generating RAG response")
# Generate the RAG response using the retrieved documents
response, error = generate_rag_response(hits, body.search_query)
if error:
logging.error(f"Generate RAG response error: {error}")
raise HTTPException(status_code=500, detail=error)
end_time = time.time()
logging.info(f"Time taken: {end_time - start_time}")
return {"response": response}
except Exception as e:
logging.error(f"Unexpected error: {e}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=8000)