import os
import subprocess
import sys
from langchain_openai import OpenAIEmbeddings
from dotenv import load_dotenv
def install_packages():
    # List of packages to install in separate batches so one failure
    # does not block the rest
    packages_batches = [
        ["langchain", "langchain-openai", "langchain_core", "langchain-community", "langchainhub", "openai", "langchain-qdrant"],
        ["qdrant-client", "pymupdf", "pandas"],
        ["llama-index", "--no-cache-dir"],
        ["llama-parse", "PyPDF2", "tiktoken"],
        ["langchain-text-splitters"],
        ["scikit-learn"]
    ]
    # Install each batch of packages
    for package_list in packages_batches:
        try:
            print(f"Installing: {' '.join(package_list)}")
            subprocess.check_call([sys.executable, "-m", "pip", "install"] + package_list)
            print(f"Successfully installed: {' '.join(package_list)}\n")
        except subprocess.CalledProcessError as e:
            print(f"Failed to install {package_list}: {e}\n")
# Call the function to install the packages
if __name__ == "__main__":
    install_packages()
# Load environment variables from .env file
load_dotenv()

# Get the OpenAI API key from the environment variables
api_key = os.getenv("OPENAI_API_KEY")

# Check if the API key is loaded
if not api_key:
    print("OpenAI API key not found. Please ensure it is set in the .env file.")
else:
    print("OpenAI API key loaded successfully.")
# Allow nested event loops (needed when running inside a notebook)
import nest_asyncio
nest_asyncio.apply()

# Imports for extracting text from PDF URLs
import re
import requests
from PyPDF2 import PdfReader
from io import BytesIO

# URLs for the two PDFs
pdf_urls = [
    "https://www.whitehouse.gov/wp-content/uploads/2022/10/Blueprint-for-an-AI-Bill-of-Rights.pdf",
    "https://nvlpubs.nist.gov/nistpubs/ai/NIST.AI.600-1.pdf"
]
def extract_text_from_pdf(url):
    response = requests.get(url)
    response.raise_for_status()
    pdf_file = BytesIO(response.content)
    reader = PdfReader(pdf_file)
    pdf_text = ""
    for page in reader.pages:
        # extract_text() can return None for image-only pages
        pdf_text += page.extract_text() or ""
    # Normalize whitespace, then split into sentences on terminal punctuation
    cleaned_text = pdf_text.replace("\n", " ").replace("\r", " ").strip()
    cleaned_text = " ".join(cleaned_text.split())
    sentences = re.split(r'(?<=[.!?]) +', cleaned_text)
    return sentences
# Extract text from both PDFs
sentences_list = []
for url in pdf_urls:
    sentences = extract_text_from_pdf(url)
    sentences_list.append(sentences)
    print(f"Extracted {len(sentences)} sentences from {url}")
# Semantic chunking: embed every sentence, then greedily merge adjacent
# sentences whose embeddings are similar enough
from sklearn.metrics.pairwise import cosine_similarity
import tiktoken

embedding_model = OpenAIEmbeddings()

# Flatten sentences from both PDFs into a single list for embedding
flat_sentences = [sentence for sublist in sentences_list for sentence in sublist]
embeddings = embedding_model.embed_documents(flat_sentences)
def greedy_chunk_sentences(sentences, sentence_embeddings, max_chunk_size=1000, similarity_threshold=0.75):
    """Greedily merge consecutive sentences into chunks, starting a new chunk
    when adjacent-sentence similarity drops or the token budget is exceeded."""
    chunks = []
    current_chunk = []
    current_chunk_tokens = 0
    encoder = tiktoken.get_encoding("cl100k_base")
    for i, sentence in enumerate(sentences):
        sentence_tokens = len(encoder.encode(sentence))
        if current_chunk:
            # Compare this sentence to its immediate predecessor
            similarity = cosine_similarity([sentence_embeddings[i]], [sentence_embeddings[i - 1]])[0][0]
            if similarity < similarity_threshold or current_chunk_tokens + sentence_tokens > max_chunk_size:
                chunks.append(" ".join(current_chunk))
                current_chunk = []
                current_chunk_tokens = 0
        current_chunk.append(sentence)
        current_chunk_tokens += sentence_tokens
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks
# Perform greedy chunking over the flattened sentence list, which is what
# the embeddings list is aligned with, so both PDFs end up in the index
semantic_chunks = greedy_chunk_sentences(flat_sentences, embeddings)
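# Optional, illustrative check: report the chunk count and token-length
# spread, useful when tuning max_chunk_size and similarity_threshold.
_encoder = tiktoken.get_encoding("cl100k_base")
_chunk_token_counts = [len(_encoder.encode(chunk)) for chunk in semantic_chunks]
print(f"{len(semantic_chunks)} chunks; tokens per chunk: "
      f"min={min(_chunk_token_counts)}, max={max(_chunk_token_counts)}")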
# Qdrant setup for storing chunks
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from langchain_qdrant import QdrantVectorStore
from langchain_core.documents import Document
import uuid

LOCATION = ":memory:"
COLLECTION_NAME = "Semantic_Chunking"

qdrant_client = QdrantClient(LOCATION)
# 1536 matches the dimensionality of OpenAI's default embedding model
qdrant_client.create_collection(
    collection_name=COLLECTION_NAME,
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
)

qdrant_vector_store = QdrantVectorStore(
    client=qdrant_client,
    collection_name=COLLECTION_NAME,
    embedding=embedding_model,
)

documents = [
    Document(page_content=chunk, metadata={"source": "generated"}, id=str(uuid.uuid4()))
    for chunk in semantic_chunks
]
qdrant_vector_store.add_documents(documents)
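# Optional smoke test: a raw similarity search against the fresh collection
# confirms the store is queryable before the chain is built (the query text
# here is just an example).
sample_hits = qdrant_vector_store.similarity_search("protections against algorithmic discrimination", k=2)
for hit in sample_hits:
    print(hit.page_content[:120])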
# Retrieve data from Qdrant
retriever = qdrant_vector_store.as_retriever()
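# as_retriever also accepts search_kwargs to cap the number of retrieved
# chunks, e.g. (k=4 is an assumed value, not from the original):
# retriever = qdrant_vector_store.as_retriever(search_kwargs={"k": 4})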
# Define prompt and execute RAG chain
from langchain_core.prompts import ChatPromptTemplate
from operator import itemgetter
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnablePassthrough

template = """
### You are a helpful assistant. Use the available context to answer the question. If you can't answer the question, say you don't know.

Question:
{question}

Context:
{context}
"""
prompt = ChatPromptTemplate.from_template(template)

primary_qa_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
retrieval_augmented_qa_chain = (
    # Fetch context with the retriever while passing the question through
    {"context": itemgetter("question") | retriever, "question": itemgetter("question")}
    | RunnablePassthrough.assign(context=itemgetter("context"))
    # Return both the model's response and the retrieved context
    | {"response": prompt | primary_qa_llm, "context": itemgetter("context")}
)
# Query the RAG chain
question = "What are the top AI risks and how to best manage them?"
result = retrieval_augmented_qa_chain.invoke({"question": question})
print(result["response"].content)