from crawl4ai import AsyncWebCrawler
from urllib.parse import urlparse
import aiohttp
import asyncio
import async_timeout  # async-timeout package: async_timeout.timeout() is used in crawl_with_retry
from fast_async import make_async
from bs4 import BeautifulSoup, NavigableString
import secrets
from datetime import datetime
import random
import os
import re
import uuid
from typing import List, Dict, Tuple, Optional
from io import BytesIO
import PyPDF2
from fake_useragent import FakeUserAgent
from htmlrag import clean_html, build_block_tree, EmbedHTMLPruner, BM25HTMLPruner
from transformers import AutoTokenizer, AutoConfig
import torch
import time

class Crawler:
    def __init__(self, user_dir=None, rate_limit=1, headless=True, verbose=False):
        self.session_pool = {}
        self.verbose = verbose
        self.rate_limit = rate_limit
        self.user_dir = user_dir
        self.headless = headless
        self.crawler = AsyncWebCrawler(
            context_options={"userDataDir": self.user_dir},
            headless=self.headless,
            verbose=self.verbose
        )

        self._browser_contexts = {}
        self._context_locks = {}

    async def get_browser_context(self, session_id):
        """Get or create a browser context with proper locking."""
        if session_id not in self._context_locks:
            self._context_locks[session_id] = asyncio.Lock()

        async with self._context_locks[session_id]:
            if session_id not in self._browser_contexts:
                context = await self.crawler.new_context()
                self._browser_contexts[session_id] = context
            return self._browser_contexts[session_id]

    async def cleanup_browser_context(self, session_id):
        """Safely clean up a browser context."""
        if session_id in self._context_locks:
            async with self._context_locks[session_id]:
                if session_id in self._browser_contexts:
                    try:
                        await asyncio.shield(
                            self._browser_contexts[session_id].close()
                        )
                    except Exception as e:
                        print(f"Error cleaning up browser context: {e}")
                    finally:
                        del self._browser_contexts[session_id]

    def create_session(self):
        """Create a new session with a secure ID."""
        session_id = secrets.token_urlsafe(32)
        self.session_pool[session_id] = {
            'created_at': datetime.now(),
            'last_used': datetime.now(),
            'requests_count': 0
        }
        return session_id

    def rotate_session(self, session_id):
        """Rotate a session once it has served more than 100 requests."""
        if self.session_pool[session_id]['requests_count'] > 100:
            self.cleanup_session(session_id)
            return self.create_session()
        return session_id

    def is_dynamic_page(self, html_content: str) -> Tuple[bool, Optional[str]]:
        """Analyze HTML content to determine whether a webpage is dynamically loaded."""

        def _check_structural_indicators(soup: BeautifulSoup) -> Dict[str, int]:
            """Check structural indicators of dynamic content loading."""
            scores = {
                'empty_containers': 0,
                'repeated_structures': 0,
                'api_endpoints': 0,
                'state_management': 0
            }

            # Containers that usually hold the main content
            main_containers = soup.find_all(
                ['main', 'div', 'section'],
                class_=lambda x: x and any(
                    term in str(x).lower()
                    for term in ['content', 'main', 'feed', 'list', 'container']
                )
            )

            for container in main_containers:
                # Nearly empty containers suggest client-side rendering
                if len(container.find_all()) < 3:
                    scores['empty_containers'] += 1

                # Repeated sibling structures suggest templated, dynamically filled lists
                children = container.find_all(recursive=False)
                if children:
                    first_child_class = children[0].get('class', [])
                    similar_siblings = [
                        c for c in children[1:]
                        if c.get('class', []) == first_child_class
                    ]
                    if len(similar_siblings) > 0:
                        scores['repeated_structures'] += 1

            # External scripts pointing at API endpoints
            scripts = soup.find_all('script', {'src': True})
            api_patterns = ['/api/', '/graphql', '/rest/', '/v1/', '/v2/']
            for script in scripts:
                if any(pattern in script['src'] for pattern in api_patterns):
                    scores['api_endpoints'] += 1

            # Inline state-management payloads (Redux, Next.js, Nuxt, ...)
            state_patterns = [
                r'window\.__INITIAL_STATE__',
                r'window\.__PRELOADED_STATE__',
                r'__REDUX_STATE__',
                r'__NUXT__',
                r'__NEXT_DATA__',
                r'window\.__data'
            ]

            inline_scripts = soup.find_all('script')
            for script in inline_scripts:
                if script.string:
                    for pattern in state_patterns:
                        if re.search(pattern, script.string):
                            scores['state_management'] += 1

            return scores

        def _check_modern_framework_indicators(soup: BeautifulSoup) -> Dict[str, int]:
            """Check for indicators of modern web frameworks and dynamic loading patterns."""
            scores = {
                'framework_roots': 0,
                'hydration': 0,
                'routing': 0
            }

            # Typical root-element identifiers per framework
            framework_roots = {
                'react': ['react-root', 'react-app', 'root', '__next'],
                'angular': ['ng-version', 'ng-app'],
                'vue': ['v-app', '#app', 'nuxt-app'],
                'modern': ['app-root', 'application', 'spa-root']
            }

            for framework, identifiers in framework_roots.items():
                for id_value in identifiers:
                    # Match the identifier against id, class, or any data-* attribute
                    if (soup.find(attrs={'id': re.compile(id_value, re.I)}) or
                            soup.find(attrs={'class': re.compile(id_value, re.I)}) or
                            soup.find(lambda tag: any(
                                attr.startswith('data-') and re.search(id_value, str(value), re.I)
                                for attr, value in tag.attrs.items()
                            ))):
                        scores['framework_roots'] += 1

            # Hydration / reactive-rendering hints in inline scripts
            hydration_patterns = [
                r'hydrate',
                r'createRoot',
                r'reactive',
                r'observable'
            ]

            scripts = soup.find_all('script')
            for script in scripts:
                if script.string:
                    for pattern in hydration_patterns:
                        if re.search(pattern, script.string):
                            scores['hydration'] += 1

            # Client-side routing hints
            router_patterns = [
                'router-view',
                'router-link',
                'route-link',
                'history.push',
                'navigation'
            ]

            for pattern in router_patterns:
                if soup.find(class_=re.compile(pattern, re.I)) or \
                        soup.find(id=re.compile(pattern, re.I)):
                    scores['routing'] += 1

            return scores

        def _check_dynamic_loading_patterns(soup: BeautifulSoup) -> Dict[str, int]:
            """Check for various dynamic content loading patterns."""
            scores = {
                'infinite_scroll': 0,
                'load_more_buttons': 0,
                'pagination': 0,
                'lazy_loading': 0,
                'loading_indicators': 0
            }

            # Infinite scroll / virtualized list markers
            scroll_indicators = [
                'infinite-scroll',
                'data-infinite',
                'data-virtualized',
                'virtual-scroll',
                'scroll-container',
                'scroll-viewport'
            ]

            for indicator in scroll_indicators:
                elements = soup.find_all(
                    lambda tag: any(indicator.lower() in str(v).lower()
                                    for v in tag.attrs.values())
                )
                if elements:
                    scores['infinite_scroll'] += len(elements)

            # "Load more" style buttons
            button_patterns = [
                r'load[_-]?more',
                r'show[_-]?more',
                r'view[_-]?more',
                r'see[_-]?more',
                r'more[_-]?posts',
                r'more[_-]?results'
            ]

            for pattern in button_patterns:
                elements = soup.find_all(
                    ['button', 'a', 'div', 'span'],
                    string=re.compile(pattern, re.I)
                )
                if elements:
                    scores['load_more_buttons'] += len(elements)

            # Classic pagination controls
            pagination_patterns = [
                'pagination',
                'page-numbers',
                'page-nav',
                'page-links'
            ]

            for pattern in pagination_patterns:
                elements = soup.find_all(class_=re.compile(pattern, re.I))
                if elements:
                    scores['pagination'] += len(elements)

            # Lazy-loaded media
            lazy_patterns = ['lazy', 'data-src', 'data-lazy']
            for pattern in lazy_patterns:
                elements = soup.find_all(
                    lambda tag: any(pattern.lower() in str(v).lower()
                                    for v in tag.attrs.values())
                )
                if elements:
                    scores['lazy_loading'] += len(elements)

            # Loading indicators / skeleton screens
            loading_patterns = [
                'loading',
                'spinner',
                'skeleton',
                'placeholder',
                'shimmer'
            ]

            for pattern in loading_patterns:
                elements = soup.find_all(class_=re.compile(pattern, re.I))
                if elements:
                    scores['loading_indicators'] += len(elements)

            return scores

        def _evaluate_dynamic_indicators(
            structural: Dict[str, int],
            framework: Dict[str, int],
            loading: Dict[str, int]
        ) -> Tuple[bool, Optional[str]]:
            """Evaluate dynamic indicators and return JavaScript instructions."""
            methods = []
            js_snippets = []

            # Infinite scroll: scroll to the bottom and wait for new content
            if loading['infinite_scroll'] > 0:
                methods.append("scroll")
                js_snippets.append(
                    """
                    window.scrollTo(0, document.body.scrollHeight);
                    await new Promise(resolve => setTimeout(resolve, 1000));
                    """.strip().replace('\n', '')
                )

            # "Load more" buttons: click the first matching button
            if loading['load_more_buttons'] > 0:
                methods.append("button")
                js_snippets.append(
                    """
                    const button = Array.from(document.querySelectorAll('button, a, div, span')).find(
                        el => /load[_-]?more|show[_-]?more/i.test(el.textContent)
                    );
                    if (button) {
                        button.click();
                        await new Promise(resolve => setTimeout(resolve, 1000));
                    } else {
                        console.warn("No 'Load More' button found.");
                    }
                    """.strip().replace('\n', '')
                )

            # Pagination: follow the "next page" link if present
            if loading.get('pagination', 0) > 0:
                methods.append("pagination")
                js_snippets.append(
                    """
                    const nextPage = document.querySelector('a[rel="next"], .pagination-next, .page-next');
                    if (nextPage) {
                        nextPage.click();
                        await new Promise(resolve => setTimeout(resolve, 1000));
                    } else {
                        console.warn("No pagination link found.");
                    }
                    """.strip().replace('\n', '')
                )

            # Lazy loading: scroll through the page so lazy elements come into view
            if loading.get('lazy_loading', 0) > 0:
                methods.append("lazy")
                js_snippets.append(
                    """
                    for (let y = 0; y <= document.body.scrollHeight; y += window.innerHeight) {
                        window.scrollTo(0, y);
                        await new Promise(resolve => setTimeout(resolve, 250));
                    }
                    """.strip().replace('\n', '')
                )

            # Stateful frameworks: the page hydrates from embedded state objects
            if framework['framework_roots'] > 0 or structural['state_management'] > 0:
                methods.append("stateful")
                js_snippets.append(
                    """
                    if (window.__INITIAL_STATE__ || window.__REDUX_STATE__ || window.__NUXT__ || window.__NEXT_DATA__) {
                        console.log('Detected stateful framework data loading.');
                    }
                    """.strip().replace('\n', '')
                )

            # API-driven pages: content is fetched from detected API endpoints
            if structural['api_endpoints'] > 0:
                methods.append("api")
                js_snippets.append(
                    """
                    console.log('API requests detected. Use browser devtools to inspect network activity for specific endpoints.');
                    """.strip().replace('\n', '')
                )

            if methods:
                js_code = "\n".join(js_snippets)
                return True, js_code

            return False, None

        soup = BeautifulSoup(html_content, 'html.parser')

        structural_scores = _check_structural_indicators(soup)
        framework_scores = _check_modern_framework_indicators(soup)
        loading_scores = _check_dynamic_loading_patterns(soup)

        return _evaluate_dynamic_indicators(structural_scores, framework_scores, loading_scores)

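    # Example of the `is_dynamic_page` contract (comments only; nothing here is executed):
    # it returns an (is_dynamic, js_code) tuple, so a caller typically branches on the flag
    # and feeds the generated JavaScript into a second pass, e.g.
    #
    #     is_dynamic, js_code = self.is_dynamic_page(result.html)
    #     if is_dynamic:
    #         result = await crawler.arun(url=url, js_code=js_code, bypass_cache=True)
    #
    # which mirrors how `crawl()` below re-runs dynamic pages.
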
    async def crawl(
        self,
        url,
        depth=2,
        max_pages=5,
        session_id=None,
        human_simulation=True,
        rotate_user_agent=True,
        rotate_proxy=True,
        return_html=False
    ):
        if not session_id:
            session_id = self.create_session()

        session_id = self.rotate_session(session_id)

        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
            'Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36'
        ]

        # Static proxy pool; replace with live proxies as needed
        proxies = [
            "http://50.62.183.123:80",
            "http://104.129.60.84:6516",
            "http://156.228.118.163:3128",
            "http://142.111.104.97:6107",
            "http://156.228.99.99:3128"
        ]

        try:
            async with self.crawler as crawler:
                headers = {
                    "User-Agent": random.choice(user_agents) if rotate_user_agent else user_agents[0],
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
                    "Accept-Language": "en-US,en;q=0.5",
                    "Accept-Encoding": "gzip, deflate",
                    "Connection": "keep-alive",
                    "Upgrade-Insecure-Requests": "1",
                    "Sec-Fetch-Dest": "document",
                    "Sec-Fetch-Mode": "navigate",
                    "Sec-Fetch-Site": "none",
                    "Sec-Fetch-User": "?1",
                    "Cache-Control": "max-age=0"
                }

                crawler.crawler_strategy.headers = headers

                if rotate_proxy:
                    crawler.crawler_strategy.proxy = random.choice(proxies)

                result_1 = await crawler.arun(
                    session_id=session_id,
                    url=url,
                    magic=human_simulation,
                    simulate_user=human_simulation,
                    override_navigator=human_simulation,
                    depth=depth,
                    max_pages=max_pages,
                    bypass_cache=True,
                    remove_overlay_elements=True,
                    delay_before_retrieve_html=1.0,
                    verbose=self.verbose
                )

                self.session_pool[session_id]['requests_count'] += 1
                self.session_pool[session_id]['last_used'] = datetime.now()

                # Fall back to the first result unless a dynamic re-crawl succeeds
                result = result_1

                if result_1.success and hasattr(result_1, 'html'):
                    is_dynamic, js_code = self.is_dynamic_page(result_1.html)

                    if is_dynamic:
                        # Re-run the page on the same crawler with the generated JS
                        crawler.crawler_strategy.headers = headers

                        if rotate_proxy:
                            crawler.crawler_strategy.proxy = random.choice(proxies)

                        print(f"Executing JS code: {js_code}")
                        result_2 = await crawler.arun(
                            session_id=session_id,
                            url=url,
                            magic=human_simulation,
                            simulate_user=human_simulation,
                            override_navigator=human_simulation,
                            depth=depth,
                            max_pages=max_pages,
                            js_code=js_code,
                            bypass_cache=True,
                            remove_overlay_elements=True,
                            delay_before_retrieve_html=1.0,
                            verbose=self.verbose
                        )

                        if result_2.success:
                            result = result_2

                        self.session_pool[session_id]['requests_count'] += 1
                        self.session_pool[session_id]['last_used'] = datetime.now()

                if return_html and hasattr(result, 'html'):
                    return result.html
                elif hasattr(result, 'fit_markdown'):
                    return result.fit_markdown
                elif hasattr(result, 'markdown'):
                    return self.extract_content(result.markdown)

        except Exception as e:
            print(f"Error crawling {url}: {str(e)}")

        return None

    async def crawl_with_retry(
        self,
        url,
        depth=2,
        max_pages=5,
        max_retries=3,
        backoff_factor=1,
        session_id=None,
        human_simulation=True,
        rotate_user_agent=True,
        rotate_proxy=True,
        return_html=False,
        timeout=10.0
    ):
        """Crawl with retry logic and anti-blocking measures."""

        async def attempt_crawl(attempt):
            try:
                async with async_timeout.timeout(timeout):
                    return await self.crawl(
                        url,
                        depth=depth,
                        max_pages=max_pages,
                        session_id=session_id,
                        human_simulation=human_simulation,
                        rotate_user_agent=rotate_user_agent,
                        rotate_proxy=rotate_proxy,
                        return_html=return_html
                    )
            except asyncio.TimeoutError:
                print(f"Timeout on attempt {attempt} for {url}")
                raise
            except Exception as e:
                print(f"Error on attempt {attempt} for {url}: {e}")
                raise

        if not self.is_valid_url(url) and not self.is_html_url(url):
            print(f"Invalid URL: {url}")
            return f"No web results found for query: {url}"

        for attempt in range(max_retries):
            try:
                if attempt > 0:
                    # Exponential backoff between retries
                    delay = backoff_factor * (2 ** (attempt - 1))
                    await asyncio.sleep(delay)

                return await attempt_crawl(attempt + 1)
            except Exception:
                if attempt == max_retries - 1:
                    print(f"Max retries ({max_retries}) reached for {url}")
                    return f"Failed to crawl after {max_retries} attempts: {url}"
                continue

        return f"No content found after {max_retries} attempts for: {url}"

    def extract_content(self, html_content):
        soup = BeautifulSoup(html_content, 'html.parser')
        for script in soup(["script", "style"]):
            script.decompose()
        text = soup.get_text()
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        text = '\n'.join(chunk for chunk in chunks if chunk)
        return text

    def cleanup_session(self, session_id):
        """Clean up a session."""
        print(f"Cleaning up session {session_id}")
        if session_id in self.session_pool:
            self.crawler.crawler_strategy.kill_session(session_id)
            del self.session_pool[session_id]

    def cleanup_expired_sessions(self):
        """Clean up sessions that have been idle for more than an hour."""
        try:
            current_time = datetime.now()
            expired_sessions = []

            for sid, data in self.session_pool.items():
                time_diff = (current_time - data['last_used']).total_seconds()
                if time_diff > 3600:
                    expired_sessions.append(sid)

            for session_id in expired_sessions:
                self.cleanup_session(session_id)

        except Exception as e:
            if self.verbose:
                print(f"Error during session cleanup: {str(e)}")

    @staticmethod
    def is_valid_url(url):
        try:
            result = urlparse(url)
            return all([result.scheme, result.netloc])
        except ValueError:
            return False

    @staticmethod
    def is_html_url(url):
        return url.endswith(".html") or url.endswith(".htm")

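# A minimal usage sketch for Crawler (illustrative only; `_example_crawler_usage` is not
# called anywhere in this module and the URL is a placeholder): create a session, crawl
# with retries, then clean the session up.
async def _example_crawler_usage():
    crawler = Crawler(headless=True, verbose=True)
    session_id = crawler.create_session()
    try:
        content = await crawler.crawl_with_retry(
            "https://example.com/index.html",
            max_retries=2,
            session_id=session_id,
            rotate_proxy=False,  # skip the placeholder proxy pool
            return_html=False,
            timeout=30.0,
        )
        return content
    finally:
        crawler.cleanup_session(session_id)
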
class CustomCrawler:
    def __init__(
        self,
        embed_model: str = "HIT-TMG/KaLM-embedding-multilingual-mini-instruct-v1",
        max_concurrent_requests: int = 10,
        verbose: bool = True
    ):
        print("Initializing the crawler") if verbose else None
        time.sleep(1)
        self.embed_model = embed_model
        self.max_concurrent_requests = max_concurrent_requests
        self.verbose = verbose
        self.ua = FakeUserAgent()
        self.semaphore = asyncio.Semaphore(self.max_concurrent_requests)
        self.sessions = {}

        print(f"Loading HTML pruners and tokenizer with {self.embed_model}") if self.verbose else None
        self.bm25_html_pruner = BM25HTMLPruner()
        self.embed_html_pruner = EmbedHTMLPruner(
            embed_model=self.embed_model,
            local_inference=True
        )
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.embed_model,
            use_fast=True,
            trust_remote_code=True
        )

        print(f"Getting model configuration for {self.embed_model}") if self.verbose else None
        self.config = AutoConfig.from_pretrained(self.embed_model)
        self.tokenizer.model_max_length = self.config.max_position_embeddings
        print(f"Setting max context length to {self.tokenizer.model_max_length}") if self.verbose else None

    async def create_session(self):
        session_id = str(uuid.uuid4())
        timeout = aiohttp.ClientTimeout(total=600)
        connector = aiohttp.TCPConnector(limit=self.max_concurrent_requests)
        # Store the session together with its creation time so expired sessions can be reaped
        self.sessions[session_id] = (aiohttp.ClientSession(timeout=timeout, connector=connector), time.time())
        print(f"Created session: {session_id}") if self.verbose else None
        return session_id

    async def close_session(self, session_id):
        entry = self.sessions.pop(session_id, None)
        if entry:
            session, _ = entry
            await session.close()
            print(f"Closed session: {session_id}") if self.verbose else None

    async def cleanup_expired_sessions(self, expiration_time: int = 600):
        current_time = time.time()
        expired_sessions = []
        print("Checking for expired sessions") if self.verbose else None
        for session_id, (session, creation_time) in self.sessions.items():
            if current_time - creation_time > expiration_time:
                expired_sessions.append(session_id)

        for session_id in expired_sessions:
            await self.close_session(session_id)

        print("Successfully cleaned up all expired sessions") if self.verbose else None

    @make_async
    def html_rag(
        self,
        query: str,
        html: str,
        max_context_length: int = 32000,
        buffer: int = 2000
    ) -> str:
        if not html:
            raise Exception("No HTML contents provided.")

        # Make sure the HTML is at least parseable before pruning
        try:
            BeautifulSoup(html, 'html.parser')
        except Exception as e:
            raise Exception(f"Invalid HTML content: {e}")

        prompt_for_retrieval = """Given a query, your task is to retrieve the most relevant passages that answer and/or are relevant to the query.

Query:"""

        self.embed_html_pruner.query_instruction_for_retrieval = prompt_for_retrieval

        print(f"Pruning HTML for query: {query}") if self.verbose else None
        cleaned_html = clean_html(html)
        block_tree, cleaned_html = build_block_tree(cleaned_html, max_node_words=10)

        block_rankings = self.bm25_html_pruner.calculate_block_rankings(query, cleaned_html, block_tree)

        # Leave headroom below the model's context limit for the prompt itself
        max_context_window = max_context_length - buffer
        pruned_html = self.embed_html_pruner.prune_HTML(
            cleaned_html,
            block_tree,
            block_rankings,
            self.tokenizer,
            max_context_window
        )
        print(f"Successfully pruned HTML for query: {query}") if self.verbose else None
        return pruned_html

    async def fetch_page_contents(
        self,
        urls: List[str],
        query: Optional[str] = None,
        session_id: Optional[str] = None,
        max_attempts: int = 3,
        delay: float = 1.0,
        timeout: float = 10.0,
        return_type: str = "markdown",
        rotate_headers: bool = True,
    ) -> List[str]:
        async def fetch_single_page(url, proxies, session=None, query=query):
            for attempt in range(max_attempts):
                print(f"Attempt {attempt + 1}/{max_attempts}: Fetching content from {url}") if self.verbose else None
                content = await self._fetch_page_contents(
                    url=url,
                    query=query,
                    timeout=timeout,
                    return_type=return_type,
                    rotate_headers=rotate_headers,
                    proxies=proxies,
                    session=session
                )

                if content:
                    print(f"Successfully fetched content from {url}") if self.verbose else None
                    return content
                else:
                    if max_attempts > 1:
                        print(f"Failed to fetch content from {url}. Retrying in {delay} seconds...") if self.verbose else None
                        await asyncio.sleep(delay)

            print(f"Failed to fetch content from {url} after {max_attempts} attempts.") if self.verbose else None
            return None

        proxies = self.load_proxies()

        if not urls:
            raise Exception("No URLs provided!")

        if return_type == "fit_markdown" and query is None:
            raise Exception("Query must be provided when return_type is 'fit_markdown'!")

        if session_id:
            if session_id not in self.sessions:
                raise ValueError(f"Invalid session ID: {session_id}")
            session, _ = self.sessions[session_id]
            tasks = [fetch_single_page(url, proxies, session) for url in urls]
        else:
            tasks = [fetch_single_page(url, proxies) for url in urls]

        results = await asyncio.gather(*tasks)
        # Drop failed fetches so callers only see successfully retrieved documents
        return [result for result in results if result is not None]

    async def _fetch_page_contents(
        self,
        url: str,
        query: Optional[str] = None,
        timeout: float = 5.0,
        return_type: str = "markdown",
        rotate_headers: bool = True,
        proxies: Optional[List[str]] = None,
        session: Optional[aiohttp.ClientSession] = None
    ) -> Optional[str]:
        async def get_content(response, return_type=return_type):
            print(f"Getting content from {url}") if self.verbose else None
            if return_type == "html":
                return await response.text()

            response.raise_for_status()
            content_type = response.headers.get('Content-Type', '').lower()

            if 'application/pdf' in content_type:
                content = await response.read()
                text = self.extract_text_from_pdf(content)
                return text
            elif 'text/html' in content_type:
                html_content = await response.text()
                if return_type == "fit_markdown":
                    html_content = self.html_rag(query, html_content).wait()

                soup = BeautifulSoup(html_content, "html.parser")
                for script_or_style in soup(["script", "style"]):
                    script_or_style.decompose()
                text = self.html_to_markdown(soup)
                return text.strip()
            else:
                print(f"Unsupported content type {content_type} for URL {url}") if self.verbose else None
                return None

        headers = self.get_headers() if rotate_headers else {}
        proxy = self.get_proxy(proxies) if proxies else None

        timeout_config = aiohttp.ClientTimeout(total=timeout)

        try:
            if session:
                async with session.get(url, proxy=proxy, timeout=timeout_config, headers=headers) as response:
                    return await get_content(response)
            else:
                async with aiohttp.ClientSession() as new_session:
                    async with new_session.get(url, proxy=proxy, timeout=timeout_config, headers=headers) as response:
                        return await get_content(response)

        except aiohttp.ClientError as e:
            print(f"Request exception for {url}: {e}") if self.verbose else None
            return None
        except asyncio.TimeoutError:
            print(f"Timeout error for {url}") if self.verbose else None
            return None
        except Exception as e:
            print(f"Unexpected error fetching {url}: {e}") if self.verbose else None
            return None

    def load_proxies(self) -> Optional[List[str]]:
        env_vars = dict(os.environ)

        # Collect every environment variable named PROXY_<n>
        proxy_pattern = re.compile(r"PROXY_\d+")
        proxies = [env_vars[key] for key in env_vars if proxy_pattern.match(key)]

        if proxies:
            print(f"Loaded {len(proxies)} proxies from environment variables") if self.verbose else None
            return proxies
        else:
            return None

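    # Example environment configuration consumed by `load_proxies` (illustrative values,
    # not part of the module): each proxy goes into its own PROXY_<n> variable, e.g.
    #
    #     export PROXY_1="http://user:pass@203.0.113.10:3128"
    #     export PROXY_2="http://203.0.113.11:8080"
    #
    # `get_proxy` below then picks one of the loaded proxies for each request.
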
    def get_proxy(self, proxies: List[str]) -> Optional[str]:
        if proxies:
            # Pick a proxy at random so load is spread across the pool
            return random.choice(proxies)
        return None

    def get_headers(self) -> Dict[str, str]:
        return {'User-Agent': self.ua.random}

    def extract_text_from_pdf(self, pdf_content: bytes) -> str:
        try:
            print("Extracting text from PDF") if self.verbose else None
            pdf_reader = PyPDF2.PdfReader(BytesIO(pdf_content))
            text = ''
            for page in pdf_reader.pages:
                text += page.extract_text()
            print("Successfully extracted text from PDF") if self.verbose else None
            return text
        except Exception as e:
            print(f"Error extracting text from PDF: {e}") if self.verbose else None
            return ""

    def html_to_markdown(self, soup):
        markdown_text = ""
        print("Converting HTML to Markdown") if self.verbose else None

        def process_element(element, indent=0):
            nonlocal markdown_text

            if isinstance(element, NavigableString):
                text = str(element).strip()
                if text:
                    markdown_text += text + " "
                return

            tag = element.name

            if tag == "h1":
                markdown_text += "# " + element.text.strip() + "\n\n"
            elif tag == "h2":
                markdown_text += "## " + element.text.strip() + "\n\n"
            elif tag == "h3":
                markdown_text += "### " + element.text.strip() + "\n\n"
            elif tag == "h4":
                markdown_text += "#### " + element.text.strip() + "\n\n"
            elif tag == "h5":
                markdown_text += "##### " + element.text.strip() + "\n\n"
            elif tag == "h6":
                markdown_text += "###### " + element.text.strip() + "\n\n"
            elif tag == "p":
                markdown_text += element.text.strip() + "\n\n"
            elif tag == "br":
                markdown_text += "\n"
            elif tag == "ul":
                for li in element.find_all("li", recursive=False):
                    markdown_text += "  " * indent + "- "
                    process_element(li, indent + 1)
                    markdown_text += "\n"
                markdown_text += "\n"
            elif tag == "ol":
                for i, li in enumerate(element.find_all("li", recursive=False), 1):
                    markdown_text += "  " * indent + f"{i}. "
                    process_element(li, indent + 1)
                    markdown_text += "\n"
                markdown_text += "\n"
            elif tag == "table":
                rows = element.find_all("tr")
                for row in rows:
                    cells = row.find_all(["td", "th"])
                    row_text = [cell.text.strip() for cell in cells]
                    markdown_text += "| " + " | ".join(row_text) + " |\n"
                    if row == rows[0]:
                        markdown_text += "| " + " | ".join(["---"] * len(cells)) + " |\n"
                markdown_text += "\n"
            elif tag == "blockquote":
                markdown_text += "> " + element.text.strip().replace("\n", "\n> ") + "\n\n"
            elif tag == "strong" or tag == "b":
                markdown_text += "**" + element.text.strip() + "**"
            elif tag == "em" or tag == "i":
                markdown_text += "*" + element.text.strip() + "*"
            elif tag == "code":
                markdown_text += "`" + element.text.strip() + "`"
            elif tag == "pre":
                markdown_text += "```\n" + element.text + "\n```\n\n"
            elif tag == "hr":
                markdown_text += "---\n\n"
            else:
                for child in element.children:
                    process_element(child, indent)

        process_element(soup)
        print("Successfully converted HTML to Markdown") if self.verbose else None
        return markdown_text

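# A minimal usage sketch for CustomCrawler (illustrative only; `_example_custom_crawler_usage`
# is not called anywhere in this module): fetch pages as plain markdown without a shared
# session, which skips the query-driven HTML pruning that `fit_markdown` performs.
async def _example_custom_crawler_usage():
    crawler = CustomCrawler(max_concurrent_requests=10, verbose=False)
    docs = await crawler.fetch_page_contents(
        ["https://example.com/"],
        return_type="markdown",
        max_attempts=2,
        timeout=15.0,
    )
    return docs
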
if __name__ == "__main__":
    import winloop

    URLS = [
        "https://en.wikipedia.org/wiki/Treaty_Principles_Bill#:~:text=The%20Treaty%20Principles%20Bill%2C%20or,of%20the%20Treaty%20of%20Waitangi.",
        "https://www.parliament.nz/en/pb/sc/make-a-submission/document/54SCJUST_SCF_227E6D0B-E632-42EB-CFFE-08DCFEB826C6/principles-of-the-treaty-of-waitangi-bill",
        "https://en.wikipedia.org/wiki/Waitangi_Tribunal",
        "https://aljazeera.com/news/2024/11/19/why-are-new-zealands-maori-protesting-over-colonial-era-treaty-bill",
        "https://downiewenjack.ca/treaty-of-waitangi-treaty-principles-bill/"
    ]

    query = "What is the Treaty of Waitangi Bill?"

    # Install the winloop event loop policy before creating the loop
    winloop.install()
    loop = asyncio.get_event_loop()
    custom_crawler = CustomCrawler(max_concurrent_requests=1000)
    session_id = loop.run_until_complete(custom_crawler.create_session())
    start = time.perf_counter()
    result = loop.run_until_complete(custom_crawler.fetch_page_contents(
        URLS,
        query,
        session_id=session_id,
        timeout=20,
        max_attempts=1,
        return_type="fit_markdown",
    ))
    end = time.perf_counter()
    loop.run_until_complete(custom_crawler.close_session(session_id))
    loop.run_until_complete(custom_crawler.cleanup_expired_sessions())
    print("\n\n".join([f"Document {i + 1}:\n\n{result[i]}" for i in range(len(result))]))
    print(f"\n\nTime taken: {end - start} seconds")