"""Concurrent stress test for an HTTP endpoint.

Fires batches of POST requests at ``URL`` from a thread pool, prints a
summary for each batch, and writes one summary row per concurrency level to
``stress_test_results.csv``.
"""

import csv
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, Tuple

import requests

# Defaults used by run_stress_test() when it is called without arguments.
NUM_REQUESTS = 5
CONCURRENT_THREADS = 10
URL = "https://mo01018-Deployment-Trial.hf.space"

# Upper bound (seconds) on a single request. Without a timeout, requests
# waits indefinitely, so one stalled connection would hang the whole run.
REQUEST_TIMEOUT = 120


def send_request() -> Tuple[int, float]:
    """POST one fixed search payload to ``URL`` and time it.

    Returns:
        A ``(status_code, elapsed_seconds)`` tuple. If the request raises
        (connection error, timeout, ...), the status is reported as 500 and
        the elapsed time is the time spent until the failure.
    """
    data = {
        'search': "A MRI, magnetic resonance imaging, scan is a very useful diagnosis tool.",
        'pipeline_select': '1',
    }
    # perf_counter is monotonic, so the measured interval cannot be distorted
    # by wall-clock adjustments during the run.
    start_time = time.perf_counter()
    try:
        response = requests.post(URL, data=data, timeout=REQUEST_TIMEOUT)
    except Exception as e:
        # Worker boundary: a failed request must be counted, not abort the
        # batch. Record the real elapsed time instead of 0 so failures do not
        # drag the min/avg statistics towards zero.
        print("Request failed:", e)
        return 500, time.perf_counter() - start_time

    elapsed = time.perf_counter() - start_time
    if response.status_code != 200:
        print(f"Error {response.status_code}: {response.text[:100]}")
    return response.status_code, elapsed


def run_stress_test(
    num_requests: Optional[int] = None,
    concurrency: Optional[int] = None,
) -> List[float]:
    """Run one batch of concurrent requests and print a summary.

    Args:
        num_requests: Total requests to send; defaults to ``NUM_REQUESTS``.
        concurrency: Thread-pool size; defaults to ``CONCURRENT_THREADS``.

    Returns:
        ``[num_requests, concurrency, avg_time, max_time]``, matching the
        columns written to the CSV summary.
    """
    if num_requests is None:
        num_requests = NUM_REQUESTS
    if concurrency is None:
        concurrency = CONCURRENT_THREADS

    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        futures = [executor.submit(send_request) for _ in range(num_requests)]
        results = [future.result() for future in futures]

    successes = sum(1 for status, _ in results if status == 200)
    failures = len(results) - successes

    timings = [elapsed for _, elapsed in results]
    if timings:
        avg_time = sum(timings) / len(timings)
        max_time = max(timings)
        min_time = min(timings)
    else:
        # An empty batch has no timings; report zeros rather than raising
        # ZeroDivisionError / ValueError.
        avg_time = max_time = min_time = 0.0

    print("\n=== Stress Test Results ===")
    print(f"Total Requests: {num_requests}")
    print(f"Concurrency Level: {concurrency}")
    print(f"Successes: {successes}")
    print(f"Failures: {failures}")
    print(f"Avg Time: {avg_time:.3f}s")
    print(f"Min Time: {min_time:.3f}s")
    print(f"Max Time: {max_time:.3f}s")

    return [num_requests, concurrency, avg_time, max_time]


def main() -> None:
    """Sweep several concurrency levels and write the summaries to CSV."""
    with open('stress_test_results.csv', 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Total Requests', 'Concurrency Level', 'Avg Time', 'Max Time'])

        for users in [1, 5, 10, 20, 50, 100]:
            # Each simulated user sends five requests.
            result = run_stress_test(num_requests=users * 5, concurrency=users)
            writer.writerow(result)
            # Flush per level so completed rows survive an interrupted sweep.
            csvfile.flush()


if __name__ == "__main__":
    main()