import subprocess
import tempfile
import os
import re
import gzip
import zlib
from io import StringIO
from functools import lru_cache
from typing import Optional, Dict, Union
from urllib.parse import unquote

import brotli
import zstandard as zstd
from browserforge.headers import Browser, HeaderGenerator
from tldextract import extract
from markitdown import MarkItDown
from markdown import Markdown
from smolagents import tool


class Response:
    """Wraps a raw response and lazily converts its content to Markdown or plain text."""

    def __init__(self, response, convert_to_markdown, convert_to_plain_text):
        self._response = response
        self._convert_to_markdown = convert_to_markdown
        self._convert_to_plain_text = convert_to_plain_text
        self._markdown = None
        self._plain_text = None

    def __getattr__(self, item):
        # Delegate unknown attributes (e.g. `content`) to the wrapped response.
        return getattr(self._response, item)

    @property
    def markdown(self) -> str:
        if self._markdown is None:
            self._markdown = self._convert_to_markdown(self._response.content)
        return self._markdown

    @property
    def plain_text(self) -> str:
        if self._plain_text is None:
            self._plain_text = self._convert_to_plain_text(self._response.content)
        return self._plain_text


def generate_headers() -> Dict[str, str]:
    """Generate realistic desktop browser headers via browserforge."""
    browsers = [
        Browser(name='chrome', min_version=120),
        Browser(name='firefox', min_version=120),
        Browser(name='edge', min_version=120),
    ]
    return HeaderGenerator(browser=browsers, device='desktop').generate()


@lru_cache(maxsize=None, typed=True)
def generate_convincing_referer(url: str) -> str:
    """Build a Google-search referer pointing at the target site's domain."""
    website_name = extract(url).domain
    return f'https://www.google.com/search?q={website_name}'


def headers_job(headers: Optional[Dict], url: str) -> Dict:
    """Merge generated browser headers into `headers` and add a convincing referer."""
    headers = headers or {}
    # Generate one consistent header set; caller-supplied values take
    # precedence, so a User-Agent is ensured without clobbering one that
    # was passed in explicitly.
    for key, value in generate_headers().items():
        headers.setdefault(key, value)
    headers['referer'] = generate_convincing_referer(url)
    return headers


def convert_to_markdown(content: bytes) -> str:
    """Convert raw response bytes to Markdown via a temporary file and MarkItDown."""
    md = MarkItDown()
    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
            tmp_file.write(content)
            tmp_file.flush()
            temp_path = tmp_file.name
        # Convert after the `with` block so MarkItDown reads a closed, complete file.
        return md.convert_local(temp_path).text_content
    finally:
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)


def convert_to_plain_text(content: bytes) -> str:
    """Convert raw response bytes to plain text by stripping the Markdown rendering."""
    md_content = convert_to_markdown(content)

    def unmark_element(element, stream=None):
        # Walk the ElementTree and emit only the text nodes.
        if stream is None:
            stream = StringIO()
        if element.text:
            stream.write(element.text)
        for sub in element:
            unmark_element(sub, stream)
        if element.tail:
            stream.write(element.tail)
        return stream.getvalue()

    # Register a "plain" output format (the standard python-markdown "unmark" recipe).
    Markdown.output_formats["plain"] = unmark_element
    __md = Markdown(output_format="plain")
    __md.stripTopLevelTags = False
    final_text = __md.convert(md_content)
    final_text = re.sub(r"\n+", " ", final_text)
    return final_text


class BasicScraper:
    """Basic scraper class for making HTTP requests using curl."""

    def __init__(
        self,
        proxy: Optional[str] = None,
        follow_redirects: bool = True,
        timeout: Optional[Union[int, float]] = None,
        retries: Optional[int] = 3
    ):
        self.proxy = proxy
        self.timeout = timeout
        self.follow_redirects = bool(follow_redirects)
        self.retries = retries

    def _curl_get(
        self,
        url: str,
        headers: Dict[str, str],
        cookies: Optional[Dict],
        timeout: Optional[Union[int, float]],
        proxy: Optional[str],
        follow_redirects: bool
    ) -> bytes:
        # Use -i to include HTTP headers in the output.
        curl_command = ["curl", "-s", "-i"]
        if follow_redirects:
            curl_command.append("-L")
        if self.retries:
            curl_command.extend(["--retry", str(self.retries)])
        # Add headers.
        for key, value in headers.items():
            curl_command.extend(["-H", f"{key}: {value}"])
        # Add cookies if provided.
        if cookies:
            cookie_str = "; ".join(f"{k}={v}" for k, v in cookies.items())
            curl_command.extend(["--cookie", cookie_str])
        # Set proxy if specified.
        if proxy:
            curl_command.extend(["--proxy", proxy])
        # Set timeout options.
        if timeout:
            curl_command.extend(["--connect-timeout", str(timeout), "--max-time", str(timeout)])
        curl_command.append(url)
        try:
            result = subprocess.run(
                curl_command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=False
            )
            if result.returncode != 0:
                raise Exception(f"Curl command failed: {result.stderr.decode('utf-8')}")
            raw_response = result.stdout
            # With -i (and -L), every response in the redirect chain contributes
            # a header block before the final body. Peel header blocks off the
            # front while the remaining data still starts with a status line,
            # keeping the last block; a plain split on b'\r\n\r\n' would truncate
            # bodies that themselves contain CRLF blank lines.
            body = raw_response
            last_header_block = b""
            while body.startswith(b"HTTP/"):
                header_block, sep, rest = body.partition(b'\r\n\r\n')
                if not sep:
                    break
                last_header_block = header_block
                body = rest
            # Look for a Content-Encoding header in the last header block; the
            # generated browser headers advertise compression support, so the
            # body may arrive Brotli-, Zstandard-, gzip-, or deflate-encoded.
            content_encoding = None
            for line in last_header_block.decode('utf-8', errors='ignore').splitlines():
                if line.lower().startswith("content-encoding:"):
                    content_encoding = line.split(":", 1)[1].strip().lower()
                    break
            if content_encoding:
                try:
                    if 'br' in content_encoding:
                        body = brotli.decompress(body)
                    elif 'zstd' in content_encoding:
                        dctx = zstd.ZstdDecompressor()
                        try:
                            body = dctx.decompress(body)
                        except zstd.ZstdError as e:
                            # Fall back to streaming decompression when the frame
                            # does not declare its content size.
                            if "could not determine content size" in str(e):
                                dctx_stream = zstd.ZstdDecompressor().decompressobj()
                                body = dctx_stream.decompress(body)
                                body += dctx_stream.flush()
                            else:
                                raise
                    elif 'gzip' in content_encoding:
                        body = gzip.decompress(body)
                    elif 'deflate' in content_encoding:
                        body = zlib.decompress(body)
                except Exception as e:
                    raise Exception(f"Error decompressing content: {e}")
            return body
        except Exception as e:
            raise Exception(f"Error during curl request: {e}")

    def get(
        self,
        url: str,
        cookies: Optional[Dict] = None,
        timeout: Optional[Union[int, float]] = None,
        **kwargs
    ) -> Response:
        url = unquote(url).replace(" ", "+")
        hdrs = headers_job(kwargs.pop('headers', {}), url)
        # A per-call timeout overrides the instance-level default.
        effective_timeout = timeout if timeout is not None else self.timeout
        content = self._curl_get(
            url,
            headers=hdrs,
            cookies=cookies,
            timeout=effective_timeout,
            proxy=self.proxy,
            follow_redirects=self.follow_redirects
        )

        # Create a dummy response object with a 'content' attribute so the
        # Response wrapper can lazily convert it.
        class DummyResponse:
            pass

        dummy = DummyResponse()
        dummy.content = content
        return Response(
            response=dummy,
            convert_to_markdown=convert_to_markdown,
            convert_to_plain_text=convert_to_plain_text
        )
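

# A minimal usage sketch, assuming `curl` is on PATH and the dependencies above
# are installed; the URL below is a placeholder, not part of the original module.
if __name__ == "__main__":
    scraper = BasicScraper(follow_redirects=True, timeout=15, retries=3)
    page = scraper.get("https://example.com")
    # The same response exposes raw bytes plus lazy Markdown/plain-text views.
    print(len(page.content), "bytes")
    print(page.markdown[:300])
    print(page.plain_text[:300])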