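"""Download patent PDFs from Google Patents using Selenium and BeautifulSoup."""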
from typing import List, Union, Optional

import os
import re
import shutil
import subprocess
import time

import chromedriver_autoinstaller
import pandas as pd
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

class PatentDownloader:
    url = "https://patents.google.com"

    def __init__(self, verbose: bool = False):
        """
        Parameters
        ----------
        verbose : bool
            Print additional debug information.
        """
        self.verbose = verbose
        self.chrome_path = self.install_chrome()
    def install_chrome(self) -> str:
        """
        Download and install Google Chrome if it is not already present.

        Returns
        -------
        str
            Path to the Chrome binary.
        """
        if not shutil.which("google-chrome"):
            print("Downloading and installing Google Chrome...")
            subprocess.run(
                "wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb -O chrome.deb",
                shell=True,
                check=True,
            )
            subprocess.run(
                "apt-get update && apt-get install -y ./chrome.deb",
                shell=True,
                check=True,
            )
            os.remove("chrome.deb")
        # Return the resolved binary path rather than a hard-coded location
        chrome_path = shutil.which("google-chrome")
        if not chrome_path:
            raise ValueError("Google Chrome installation failed!")
        return chrome_path
    def download(self, patent: Union[str, List[str]], output_path: str = "./",
                 waiting_time: int = 10, remove_kind_codes: Optional[List[str]] = None) -> None:
        """
        Download patent document(s) as PDF.

        Accepts a single patent number, a list of patent numbers, or a path
        to a CSV/TXT file of patent numbers.
        """
        if isinstance(patent, list) or os.path.isfile(patent):
            self.get_pdfs(patent, output_path, waiting_time, remove_kind_codes)
        else:
            self.get_pdf(patent, output_path, waiting_time, remove_kind_codes)
    def get_pdf(self, patent: str, output_path: str = "./", waiting_time: int = 10,
                remove_kind_codes: Optional[List[str]] = None) -> None:
        """
        Download a single patent PDF.
        """
        if remove_kind_codes:
            for kind_code in remove_kind_codes:
                patent = re.sub(re.escape(kind_code) + "$", "", patent)

        # Install a ChromeDriver matching the installed Chrome version
        chromedriver_autoinstaller.install()

        # Set up headless Chrome options suitable for containerized environments
        chrome_options = Options()
        chrome_options.binary_location = self.chrome_path
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")

        # Initialize the Selenium WebDriver
        service = Service()
        driver = webdriver.Chrome(service=service, options=chrome_options)
        pdf_link = None  # Ensure pdf_link is defined even if the search fails
        try:
            driver.get(self.url)

            # Wait for the search input field, then submit the patent number
            print("Waiting for the search input field...")
            search_input_xpath = "//input[@aria-label='Search patents']"
            element = WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.XPATH, search_input_xpath))
            )
            print("Search input field located.")
            element.send_keys(patent)
            element.send_keys(Keys.RETURN)

            # Give the search results time to render before parsing the page
            print("Waiting for search results to load...")
            time.sleep(waiting_time)

            # Parse the rendered HTML and extract the PDF link
            soup = BeautifulSoup(driver.page_source, "html.parser")
            pdf_link = get_pdf_link(soup, patent)
        except Exception as e:
            print(f"Error occurred: {e}")
        finally:
            driver.quit()
        # Download the PDF
        if pdf_link:
            validate_directory(output_path)
            response = requests.get(pdf_link, timeout=60)
            response.raise_for_status()
            with open(os.path.join(output_path, f"{patent}.pdf"), "wb") as file:
                file.write(response.content)
            print(f">>> Patent {patent} successfully downloaded <<<")
        else:
            print(f"Error: PDF link for patent {patent} not found!")
    def get_pdfs(self, patents: Union[List[str], str], output_path: str = "./",
                 waiting_time: int = 10, remove_kind_codes: Optional[List[str]] = None) -> None:
        """
        Download multiple patent PDFs from a list, a CSV file with a
        'patent_number' column, or a TXT file with one number per line.
        """
        if isinstance(patents, str):
            if patents.lower().endswith('csv'):
                df_patents = pd.read_csv(patents)
                patents = df_patents['patent_number'].to_list()
            elif patents.lower().endswith('txt'):
                with open(patents, 'r') as txt_file:
                    patents = txt_file.read().splitlines()
            else:
                raise NotImplementedError(f'Unsupported file type: {patents}')

        for i, patent in enumerate(patents):
            print(len(patents) - i, "patent(s) remaining.")
            self.get_pdf(patent, output_path, waiting_time, remove_kind_codes)

def get_pdf_link(soup: BeautifulSoup, patent: str) -> Optional[str]:
    """
    Extract the PDF link for a patent from parsed search-result HTML.
    """
    pdf_links = [link['href'] for link in soup.find_all('a', href=True)
                 if link['href'].lower().endswith(".pdf")]
    for link in pdf_links:
        if patent.lower() in link.lower():
            return link
    return None

def validate_directory(directory: str) -> None:
    """
    Ensure the output directory exists, creating it if necessary.
    """
    os.makedirs(directory, exist_ok=True)
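

# Example usage: a minimal sketch, assuming a Linux environment where Chrome
# can be installed. The patent number and file paths below are illustrative.
if __name__ == "__main__":
    downloader = PatentDownloader(verbose=True)

    # Single patent; "B2" is stripped from the number before searching
    downloader.download("US11396662B2", output_path="./pdfs",
                        waiting_time=10, remove_kind_codes=["B2"])

    # Batch download from a CSV file with a 'patent_number' column:
    # downloader.download("patents.csv", output_path="./pdfs")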