|
import gradio as gr |
|
import requests |
|
import yaml |
|
import os |
|
import json |
|
from datetime import datetime, timedelta |
|
import threading |
|
import time |
|
import smtplib |
|
from email.message import EmailMessage |
|
import matplotlib.pyplot as plt |
|
import numpy as np |
|
from collections import defaultdict |
|
from string import Template |
|
|
|
|
|
# YAML file describing the monitored endpoints and the SMTP/alert settings.
CONFIG_FILE = "config.yaml"

# JSON file holding the rolling per-endpoint check history.
DATA_FILE = "/data/status.json"

# Length of the retained history window, in seconds (one day).
STATUS_HISTORY = int(timedelta(days=1).total_seconds())
|
|
|
class Monitor:
    """Website availability monitor.

    Loads endpoint and SMTP settings from ``CONFIG_FILE``, polls each endpoint
    over HTTP, keeps a rolling ``STATUS_HISTORY``-second history in
    ``DATA_FILE``, records outages in memory and e-mails an alert when an
    endpoint changes from UP to DOWN.
    """

    def __init__(self):
        self.config = self.load_config()
        # Serialises writers of DATA_FILE.
        self.lock = threading.Lock()
        # Endpoint name -> "UP"/"DOWN" as seen by the previous check.
        self.last_status = {}
        # Endpoint name -> list of {"start", "end", "duration"} outage records.
        self.incidents = defaultdict(list)

    def load_config(self):
        """Read ``CONFIG_FILE``, expand ``$VAR`` placeholders and apply defaults.

        Returns:
            The parsed configuration dict. ``settings`` and ``settings.smtp``
            are guaranteed to exist; SMTP credentials always come from the
            environment, and host/port/from fall back to the environment when
            the file does not set them.

        Raises:
            ValueError: if a required SMTP environment variable is missing, if
                the file references an undefined or malformed placeholder, or
                if the file is not a YAML mapping.
            OSError: if the file cannot be read.
        """
        required_vars = ['SMTP_HOST', 'SMTP_PORT', 'SMTP_USERNAME', 'SMTP_PASSWORD', 'SMTP_FROM']
        # Validate before substitution: otherwise a missing variable surfaces
        # as a bare KeyError from Template.substitute() instead of this message.
        for var in required_vars:
            if var not in os.environ:
                raise ValueError(f"Missing required environment variable: {var}")

        with open(CONFIG_FILE, "r") as f:
            config_content = f.read()

        try:
            config_content = Template(config_content).substitute(os.environ)
        except KeyError as err:
            # A placeholder other than the required SMTP ones is undefined.
            raise ValueError(
                f"Missing required environment variable: {err.args[0]}"
            ) from err

        config = yaml.safe_load(config_content)
        if config is None:
            # safe_load() returns None for an empty document.
            config = {}
        if not isinstance(config, dict):
            raise ValueError(f"{CONFIG_FILE} must contain a YAML mapping at the top level")

        settings = config.get('settings')
        if not isinstance(settings, dict):
            settings = {}
            config['settings'] = settings
        settings.setdefault('check_interval_seconds', 300)

        smtp = settings.get('smtp')
        if not isinstance(smtp, dict):
            smtp = {}
            settings['smtp'] = smtp
        smtp.setdefault('host', os.environ['SMTP_HOST'])
        smtp.setdefault('from', os.environ['SMTP_FROM'])
        if 'port' not in smtp:
            smtp['port'] = int(os.environ['SMTP_PORT'])
        smtp['username'] = os.environ['SMTP_USERNAME']
        smtp['password'] = os.environ['SMTP_PASSWORD']

        return config

    def load_data(self):
        """Return the persisted history dict, or ``{}`` if it is missing or unusable."""
        try:
            with open(DATA_FILE, "r") as f:
                data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            return {}
        # Anything but a mapping cannot be a history file written by save_data().
        return data if isinstance(data, dict) else {}

    def save_data(self, data):
        """Persist ``data`` to ``DATA_FILE``; failures are reported, not raised.

        The JSON is written to a sibling temporary file and then moved into
        place with ``os.replace`` so that a concurrent ``load_data`` never
        observes a truncated or half-written file.
        """
        tmp_path = f"{DATA_FILE}.tmp"
        with self.lock:
            try:
                directory = os.path.dirname(DATA_FILE)
                if directory:
                    os.makedirs(directory, exist_ok=True)
                with open(tmp_path, "w") as f:
                    json.dump(data, f)
                os.replace(tmp_path, DATA_FILE)
            except Exception as e:
                # Best effort: a failed save must not stop the monitoring loop.
                print(f"Error saving data: {e}")

    def check_status(self, endpoint):
        """Issue one GET request to ``endpoint["url"]`` and classify the outcome.

        Returns:
            A dict with ``status`` ("UP" when the response is OK, otherwise
            "DOWN"), ``response_time`` in seconds (``None`` on failure) and
            ``error`` (the exception text on failure, otherwise ``None``).
        """
        try:
            # Monotonic clock: immune to wall-clock adjustments mid-request.
            start = time.monotonic()
            response = requests.get(
                endpoint["url"],
                timeout=10,
                headers={'User-Agent': 'StatusMonitor/1.0'},
            )
            response_time = time.monotonic() - start
            return {
                "status": "UP" if response.ok else "DOWN",
                "response_time": response_time,
                "error": None,
            }
        except Exception as e:
            # Any failure to obtain a response counts as the endpoint being down.
            return {"status": "DOWN", "response_time": None, "error": str(e)}

    def send_alert(self, endpoint, result):
        """E-mail a DOWN notification for ``endpoint`` to the configured recipients.

        Failures are printed rather than raised so that alerting problems never
        interrupt status checking.
        """
        try:
            settings = self.config['settings']
            recipients = settings.get('alert_emails') or []
            if isinstance(recipients, str):
                # A single address given as a scalar; joining a str would
                # otherwise split it into characters.
                recipients = [recipients]
            if not recipients:
                print("Failed to send email: no alert_emails configured")
                return

            smtp = settings['smtp']
            msg = EmailMessage()
            msg.set_content(
                f"Website {endpoint['name']} is DOWN\n"
                f"URL: {endpoint['url']}\n"
                f"Time: {datetime.now().isoformat()}\n"
                f"Error: {result['error'] or 'Unknown error'}"
            )
            msg['Subject'] = f"ALERT: {endpoint['name']} is DOWN"
            msg['From'] = smtp['from']
            msg['To'] = ", ".join(recipients)

            # The timeout keeps an unresponsive mail server from blocking the
            # caller indefinitely.
            with smtplib.SMTP(smtp['host'], smtp['port'], timeout=30) as server:
                server.starttls()
                server.login(smtp['username'], smtp['password'])
                server.send_message(msg)
        except Exception as e:
            print(f"Failed to send email: {e}")

    @staticmethod
    def _is_recent(entry, cutoff):
        """Return True if ``entry`` has a parseable timestamp newer than ``cutoff``."""
        try:
            return datetime.fromisoformat(entry['timestamp']) > cutoff
        except (KeyError, TypeError, ValueError):
            # Malformed entries are dropped instead of aborting the update.
            return False

    def _track_transition(self, endpoint, result, timestamp):
        """Update incident records and alert when an endpoint's status flips.

        ``timestamp`` is the ISO-8601 string used for incident start/end.
        """
        key = endpoint["name"]
        status = result['status']
        # An endpoint that has never been checked is treated as previously UP
        # for both incident tracking and alerting, so one that is already down
        # on the first check opens an incident and sends an alert.
        previous = self.last_status.get(key, 'UP')
        self.last_status[key] = status
        if status == previous:
            return

        if status == 'DOWN':
            self.incidents[key].append({
                'start': timestamp,
                'end': None,
                'duration': None,
            })
            self.send_alert(endpoint, result)
            return

        # Recovered: close the open incident, if there is one.
        history = self.incidents[key]
        if history and not history[-1]['end']:
            incident = history[-1]
            incident['end'] = timestamp
            started = datetime.fromisoformat(incident['start'])
            ended = datetime.fromisoformat(timestamp)
            incident['duration'] = (ended - started).total_seconds()

    def update_status(self):
        """Check every configured endpoint once and persist the results.

        Returns:
            The updated history dict (endpoint name -> list of check entries,
            each a ``check_status`` result plus a ``timestamp``), with entries
            older than ``STATUS_HISTORY`` seconds removed for checked endpoints.
        """
        endpoints = self.config.get('endpoints') or []
        data = self.load_data()
        now = datetime.now()
        current_time = now.isoformat()
        cutoff = now - timedelta(seconds=STATUS_HISTORY)

        for endpoint in endpoints:
            key = endpoint["name"]
            result = self.check_status(endpoint)
            self._track_transition(endpoint, result, current_time)

            history = data.get(key)
            if not isinstance(history, list):
                history = []
            history.append({"timestamp": current_time, **result})
            data[key] = [entry for entry in history if self._is_recent(entry, cutoff)]

        self.save_data(data)
        return data

    def get_metrics(self, data):
        """Summarise each endpoint's history.

        Args:
            data: mapping of endpoint name to its list of check entries.

        Returns:
            Mapping of endpoint name to a dict with ``uptime`` (fraction of
            checks that were UP), ``response_times`` (seconds, failed checks
            omitted), ``current_status`` (status of the latest entry) and
            ``incidents`` (a copy of the recorded outage list). Endpoints with
            an empty history are omitted.
        """
        metrics = {}
        for name, history in data.items():
            if not history:
                continue

            up_checks = sum(1 for h in history if h["status"] == "UP")
            response_times = [
                h["response_time"] for h in history if h["response_time"] is not None
            ]
            metrics[name] = {
                "uptime": up_checks / len(history),
                "response_times": response_times,
                "current_status": history[-1]["status"],
                # Copy so callers iterate a stable snapshot while the
                # background thread keeps appending.
                "incidents": list(self.incidents.get(name, [])),
            }
        return metrics

    def create_plots(self, metrics):
        """Build the response-time line chart and uptime pie chart per endpoint.

        Returns:
            Dict keyed ``"<name>_response"`` and ``"<name>_uptime"`` whose
            values are Matplotlib figures. Each figure is closed in pyplot
            after creation so figures do not accumulate across calls.
        """
        plots = {}
        for name, metric in metrics.items():
            # Explicit figure/axes objects avoid depending on pyplot's
            # implicit "current figure" state.
            fig, ax = plt.subplots(figsize=(8, 3))
            ax.plot(metric['response_times'])
            ax.set_title(f"{name} Response Times")
            ax.set_ylabel("Seconds")
            ax.set_xlabel("Last 24h checks")
            fig.tight_layout()
            plots[f"{name}_response"] = fig
            plt.close(fig)

            fig, ax = plt.subplots(figsize=(3, 3))
            ax.pie(
                [metric['uptime'], 1 - metric['uptime']],
                labels=['Uptime', 'Downtime'],
                autopct='%1.1f%%',
                colors=['#4CAF50', '#F44336'],
            )
            ax.set_title(f"{name} Uptime")
            plots[f"{name}_uptime"] = fig
            plt.close(fig)

        return plots
|
|
|
# Single shared monitor used by both the background poller and the UI.
monitor = Monitor()


def background_updates():
    """Poll all endpoints forever, sleeping ``check_interval_seconds`` between rounds.

    An exception raised by one round is reported and the loop continues;
    otherwise a single unexpected error would end this daemon thread and
    monitoring would stop without any indication.
    """
    while True:
        print("Updating status...")
        try:
            monitor.update_status()
        except Exception as e:
            # Top-level boundary of the worker thread: report and keep polling.
            print(f"Status update failed: {e}")
        time.sleep(monitor.config['settings']['check_interval_seconds'])


# Daemon thread: it must not keep the process alive once the UI server exits.
threading.Thread(target=background_updates, daemon=True).start()
|
|
|
|
|
def _status_markdown(name, metric):
    """Render one endpoint's status, recent latency and uptime as Markdown."""
    status_color = "#4CAF50" if metric['current_status'] == 'UP' else "#F44336"
    recent = metric['response_times'][-5:]
    # np.mean([]) yields NaN (and a RuntimeWarning) when every recent check
    # failed, so show a placeholder instead of "nans".
    last_response = f"{np.mean(recent):.2f}s" if recent else "n/a"
    # Blank lines between fields: single newlines collapse into one paragraph
    # when rendered as Markdown.
    return (
        f"### {name}\n\n"
        f"**Status:** <span style='color: {status_color}'>"
        f"{metric['current_status']}</span>\n\n"
        f"**Last Response:** {last_response}\n\n"
        f"**24h Uptime:** {metric['uptime']*100:.1f}%"
    )


def _incidents_markdown(incidents):
    """Render up to the five most recent incidents as Markdown."""
    if not incidents:
        return "No incidents in past 24 hours"
    sections = []
    for incident in incidents[-5:]:
        duration = incident['duration']
        # Test against None: a 0.0-second incident is finished, not ongoing,
        # and "Ongoing" must not get a seconds suffix.
        duration_text = "Ongoing" if duration is None else f"{duration}s"
        sections.append(
            f"**Start:** {incident['start']}\n\n"
            f"**End:** {incident['end'] or 'Still down'}\n\n"
            f"**Duration:** {duration_text}"
        )
    return "\n\n---\n\n".join(sections)


with gr.Blocks(title="Website Status Monitor") as demo:
    gr.Markdown("# 🚦 Website Status Monitor")

    # The layout is built once from the configured endpoints so that the set
    # of output components is fixed. update_ui() then only supplies values,
    # always four per endpoint, in the order the components are appended here.
    status_components = []
    endpoint_names = [
        endpoint["name"] for endpoint in (monitor.config.get('endpoints') or [])
    ]

    for endpoint_name in endpoint_names:
        with gr.Row():
            with gr.Column(scale=1):
                status_components.append(
                    gr.Markdown(f"### {endpoint_name}\n\nWaiting for first check...")
                )
            with gr.Column(scale=2):
                status_components.append(gr.Plot(label="Response Times"))
            with gr.Column(scale=1):
                status_components.append(gr.Plot(label="Uptime"))
        with gr.Accordion("Incident History", open=False):
            status_components.append(gr.Markdown("No incidents in past 24 hours"))

    def update_ui():
        """Return fresh values for ``status_components`` from the stored history."""
        data = monitor.load_data()
        print("Loaded data:", data)
        metrics = monitor.get_metrics(data)
        print("Metrics:", metrics)
        plots = monitor.create_plots(metrics)
        print("Plots created.")

        values = []
        for name in endpoint_names:
            metric = metrics.get(name)
            if metric is None:
                # No check has completed for this endpoint yet.
                values.extend([
                    f"### {name}\n\nWaiting for first check...",
                    None,
                    None,
                    "No incidents in past 24 hours",
                ])
                continue
            values.extend([
                _status_markdown(name, metric),
                plots[f"{name}_response"],
                plots[f"{name}_uptime"],
                _incidents_markdown(metric['incidents']),
            ])
        return values

    # Refresh every component each time the page is loaded.
    demo.load(update_ui, outputs=status_components)
|
|
|
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from the PORT environment
    # variable and defaults to 7860.
    listen_port = int(os.environ.get("PORT", 7860))
    demo.launch(server_name="0.0.0.0", server_port=listen_port)