| """ | |
| Proxy requests to avoid SSRF | |
| """ | |
| import logging | |
| import os | |
| import time | |
| import httpx | |
| SSRF_PROXY_ALL_URL = os.getenv("SSRF_PROXY_ALL_URL", "") | |
| SSRF_PROXY_HTTP_URL = os.getenv("SSRF_PROXY_HTTP_URL", "") | |
| SSRF_PROXY_HTTPS_URL = os.getenv("SSRF_PROXY_HTTPS_URL", "") | |
| SSRF_DEFAULT_MAX_RETRIES = int(os.getenv("SSRF_DEFAULT_MAX_RETRIES", "3")) | |
| SSRF_DEFAULT_TIME_OUT = float(os.getenv("SSRF_DEFAULT_TIME_OUT", "5")) | |
| SSRF_DEFAULT_CONNECT_TIME_OUT = float(os.getenv("SSRF_DEFAULT_CONNECT_TIME_OUT", "5")) | |
| SSRF_DEFAULT_READ_TIME_OUT = float(os.getenv("SSRF_DEFAULT_READ_TIME_OUT", "5")) | |
| SSRF_DEFAULT_WRITE_TIME_OUT = float(os.getenv("SSRF_DEFAULT_WRITE_TIME_OUT", "5")) | |
| proxy_mounts = ( | |
| { | |
| "http://": httpx.HTTPTransport(proxy=SSRF_PROXY_HTTP_URL), | |
| "https://": httpx.HTTPTransport(proxy=SSRF_PROXY_HTTPS_URL), | |
| } | |
| if SSRF_PROXY_HTTP_URL and SSRF_PROXY_HTTPS_URL | |
| else None | |
| ) | |
| BACKOFF_FACTOR = 0.5 | |
| STATUS_FORCELIST = [429, 500, 502, 503, 504] | |
| def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs): | |
| if "allow_redirects" in kwargs: | |
| allow_redirects = kwargs.pop("allow_redirects") | |
| if "follow_redirects" not in kwargs: | |
| kwargs["follow_redirects"] = allow_redirects | |
| if "timeout" not in kwargs: | |
| kwargs["timeout"] = httpx.Timeout( | |
| SSRF_DEFAULT_TIME_OUT, | |
| connect=SSRF_DEFAULT_CONNECT_TIME_OUT, | |
| read=SSRF_DEFAULT_READ_TIME_OUT, | |
| write=SSRF_DEFAULT_WRITE_TIME_OUT, | |
| ) | |
| retries = 0 | |
| while retries <= max_retries: | |
| try: | |
| if SSRF_PROXY_ALL_URL: | |
| with httpx.Client(proxy=SSRF_PROXY_ALL_URL) as client: | |
| response = client.request(method=method, url=url, **kwargs) | |
| elif proxy_mounts: | |
| with httpx.Client(mounts=proxy_mounts) as client: | |
| response = client.request(method=method, url=url, **kwargs) | |
| else: | |
| with httpx.Client() as client: | |
| response = client.request(method=method, url=url, **kwargs) | |
| if response.status_code not in STATUS_FORCELIST: | |
| return response | |
| else: | |
| logging.warning(f"Received status code {response.status_code} for URL {url} which is in the force list") | |
| except httpx.RequestError as e: | |
| logging.warning(f"Request to URL {url} failed on attempt {retries + 1}: {e}") | |
| retries += 1 | |
| if retries <= max_retries: | |
| time.sleep(BACKOFF_FACTOR * (2 ** (retries - 1))) | |
| raise Exception(f"Reached maximum retries ({max_retries}) for URL {url}") | |
| def get(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs): | |
| return make_request("GET", url, max_retries=max_retries, **kwargs) | |
| def post(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs): | |
| return make_request("POST", url, max_retries=max_retries, **kwargs) | |
| def put(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs): | |
| return make_request("PUT", url, max_retries=max_retries, **kwargs) | |
| def patch(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs): | |
| return make_request("PATCH", url, max_retries=max_retries, **kwargs) | |
| def delete(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs): | |
| return make_request("DELETE", url, max_retries=max_retries, **kwargs) | |
| def head(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs): | |
| return make_request("HEAD", url, max_retries=max_retries, **kwargs) | |