from google.cloud import storage

from langchain_community.vectorstores import Annoy
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import CharacterTextSplitter

from climateqa.engine.embeddings import get_embeddings_function

import os
import pdfplumber

# storage_client = storage.Client()
storage_client = storage.Client.create_anonymous_client()
bucket_name = "docs-axio-clara"

embeddings_function = get_embeddings_function()
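
# Note (sketch, not part of the original flow): the anonymous client above only
# works if "docs-axio-clara" is publicly readable. A quick sanity check could
# look like this:
#
#   for blob in storage_client.list_blobs(bucket_name, prefix='sources/', max_results=5):
#       print(blob.name)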


def get_PDF_Names_from_GCP():
    listName = []
    # List the files available in GCP storage
    blobs = storage_client.list_blobs(bucket_name, prefix='sources/')
    for blob in blobs:
        listName.append(blob.name)
    return listName
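
# Example (sketch): listing the available source PDFs without downloading them.
# Assumes the "sources/<file>.pdf" bucket layout used above.
#
#   for name in get_PDF_Names_from_GCP():
#       print(name)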


def get_PDF_from_GCP(folder_path, pdf_folder="./PDF"):
    # Download the source files from GCP storage
    blobs = storage_client.list_blobs(bucket_name, prefix='sources/')
    for blob in blobs:
        # Skip folder placeholder objects, if any
        if blob.name.endswith("/"):
            continue
        # Use the basename so files land directly in pdf_folder
        # (blob.name includes the "sources/" prefix)
        pdf_name = blob.name.split("/")[-1]
        print("\n" + blob.name + ":")
        print("  <- Downloading from GCP")
        blob.download_to_filename(pdf_folder + "/" + pdf_name)

        # Extract the text from the PDF files
        print("  >>> PDF extraction")
        for pdf_file in os.listdir(pdf_folder):
            if pdf_file.startswith("."):
                continue
            print("  > " + pdf_folder + "/" + pdf_file)
            pdf_total_pages = 0
            with pdfplumber.open(pdf_folder + "/" + pdf_file) as pdf:
                pdf_total_pages = len(pdf.pages)

            # pdfplumber leaks memory on large files;
            # reopening the file every N pages seems to work around the problem.
            N_page = 300
            page_number = 0
            while page_number < pdf_total_pages:
                print("  -- reopening the file for " + str(N_page) + " pages --")
                with pdfplumber.open(pdf_folder + "/" + pdf_file) as pdf:
                    npage = 0
                    while npage < N_page and page_number < pdf_total_pages:
                        print("  >>> " + str(page_number + 1))
                        # One plain-text file per page, named "<pdf>..:page:..<n>"
                        with open(folder_path + "/" + pdf_file + "..:page:.." + str(page_number + 1), "w") as f:
                            for char_pdf in pdf.pages[page_number].chars:
                                f.write(char_pdf["text"])
                        npage = npage + 1
                        page_number = page_number + 1

        print("  X removing: " + blob.name)
        os.remove(pdf_folder + "/" + pdf_name)
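
# Example (sketch, not part of the original flow): fetching and extracting every
# source PDF into a local text folder. The folder names below are illustrative;
# the real call site is the commented-out line in build_vectores_stores() below.
#
#   os.makedirs("./PDF", exist_ok=True)
#   os.makedirs("./sources_txt", exist_ok=True)
#   get_PDF_from_GCP("./sources_txt")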


def build_vectores_stores(folder_path, pdf_folder="./PDF", vectors_path="./vectors"):
    # Reuse the local index if it is already present
    if os.path.isfile(vectors_path + "/index.annoy"):
        return Annoy.load_local(vectors_path, embeddings_function, allow_dangerous_deserialization=True)

    try:
        os.mkdir(vectors_path)
    except FileExistsError:
        pass

    try:
        # Download the prebuilt vectors from GCP storage
        blobs = storage_client.list_blobs(bucket_name, prefix='testvectors/')
        for blob in blobs:
            # Skip folder placeholder objects, if any
            if blob.name.endswith("/"):
                continue
            print("\n" + blob.name.split("/")[-1] + ":")
            print("  <- Downloading from GCP")
            blob.download_to_filename(vectors_path + "/" + blob.name.split("/")[-1])
    except Exception:
        pass

    # TODO: a function for that, to avoid code duplication
    if os.path.isfile(vectors_path + "/index.annoy"):
        return Annoy.load_local(vectors_path, embeddings_function, allow_dangerous_deserialization=True)

    print("MISSING VECTORS")
    exit(0)
    # get_PDF_from_GCP(folder_path, pdf_folder)

    # print(" Vectorization ...")
    # docs = []
    # vector_store_from_docs = ()  # Create a new Annoy object, or reuse the one already initialised by your existing code
    # for filename in os.listdir(folder_path):
    #     if filename.startswith("."):
    #         continue
    #     file_path = os.path.join(folder_path, filename)
    #     if os.path.isfile(file_path):
    #         loader = TextLoader(file_path)
    #         documents = loader.load()
    #
    #         for doc in documents:
    #             if doc.metadata:
    #                 doc.metadata["ax_page"] = doc.metadata['source'].split("..:page:..")[-1]
    #                 doc.metadata["ax_name"] = doc.metadata['source'].split("..:page:..")[0].split("/")[-1]
    #                 doc.metadata["ax_url"] = "https://storage.googleapis.com/docs-axio-clara/sources/" + doc.metadata["ax_name"]
    #
    #         text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    #         docs += text_splitter.split_documents(documents)
    #
    # vector_store_from_docs = Annoy.from_documents(docs, embeddings_function)
    # vector_store_from_docs.save_local(vectors_path)
    # return vector_store_from_docs
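
# Example (sketch): loading the Annoy store and querying it, assuming the
# prebuilt index under "testvectors/" is reachable (otherwise the function
# prints MISSING VECTORS and exits). The folder and query string are illustrative.
#
#   vectorstore = build_vectores_stores("./sources_txt")
#   for doc in vectorstore.similarity_search("carbon footprint reporting", k=4):
#       print(doc.metadata.get("ax_name"), doc.metadata.get("ax_page"))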

# Pinecone
# More info at https://docs.pinecone.io/docs/langchain
# And https://python.langchain.com/docs/integrations/vectorstores/pinecone

# import os
# from pinecone import Pinecone
# from langchain_community.vectorstores import Pinecone as PineconeVectorstore

# LOAD ENVIRONMENT VARIABLES
# try:
#     from dotenv import load_dotenv
#     load_dotenv()
# except:
#     pass

# def get_pinecone_vectorstore(embeddings, text_key="content"):
#     # initialize pinecone
#     pinecone.init(
#         api_key=os.getenv("PINECONE_API_KEY"),  # find at app.pinecone.io
#         environment=os.getenv("PINECONE_API_ENVIRONMENT"),  # next to api key in console
#     )
#     index_name = os.getenv("PINECONE_API_INDEX")
#     vectorstore = Pinecone.from_existing_index(index_name, embeddings, text_key=text_key)
#     return vectorstore

#     pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
#     index = pc.Index(os.getenv("PINECONE_API_INDEX"))
#     vectorstore = PineconeVectorstore(
#         index, embeddings, text_key,
#     )
#     return vectorstore

# def get_pinecone_retriever(vectorstore, k=10, namespace="vectors", sources=["IPBES", "IPCC"]):
#     assert isinstance(sources, list)
#     # Check if all elements in the list are either IPCC or IPBES
#     filter = {
#         "source": {"$in": sources},
#     }
#     retriever = vectorstore.as_retriever(search_kwargs={
#         "k": k,
#         "namespace": "vectors",
#         "filter": filter,
#     })
#     return retriever
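
# Example (sketch): how the commented-out Pinecone helpers above would be wired
# together, assuming the PINECONE_API_KEY / PINECONE_API_INDEX environment
# variables are set and the index stores a "source" metadata field. The query
# string is illustrative.
#
#   vectorstore = get_pinecone_vectorstore(embeddings_function)
#   retriever = get_pinecone_retriever(vectorstore, k=5, sources=["IPCC"])
#   docs = retriever.get_relevant_documents("sea level rise projections")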