Spaces:
Sleeping
Sleeping
Create app.py
Browse files
app.py
ADDED
@@ -0,0 +1,68 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
import requests
|
2 |
+
import time
|
3 |
+
import threading
|
4 |
+
from flask import Flask
|
5 |
+
|
6 |
+
# ==== CONFIG ====
|
7 |
+
BOT_TOKEN = "7316836080:AAEv7wRHNmgCb707ppG6rpTV0mhEBC4n26A"
|
8 |
+
CHAT_ID = -1002038741539 # Group chat ID
|
9 |
+
API_URL = "https://raw.githubusercontent.com/n-ce/Uma/main/dynamic_instances.json"
|
10 |
+
CHECK_INTERVAL = 10 # seconds
|
11 |
+
|
12 |
+
# Simple health messages
|
13 |
+
STATUS_MESSAGES = {
|
14 |
+
"U": "U => Health is good, playback can run optimally",
|
15 |
+
"P": "P => Health is fair enough, playback can have risks",
|
16 |
+
"I": "I => Health is fair enough, playback can have risks",
|
17 |
+
"N": "N => Health is compromised, Playback may take time or errors may occur."
|
18 |
+
}
|
19 |
+
|
20 |
+
last_health = None
|
21 |
+
current_status_text = "No data yet"
|
22 |
+
|
23 |
+
app = Flask(__name__)
|
24 |
+
|
25 |
+
def get_health_status():
|
26 |
+
"""Fetch health status from API."""
|
27 |
+
try:
|
28 |
+
r = requests.get(API_URL, timeout=5)
|
29 |
+
r.raise_for_status()
|
30 |
+
data = r.json()
|
31 |
+
return data.get("health")
|
32 |
+
except Exception as e:
|
33 |
+
print(f"[ERROR] Failed to fetch health status: {e}")
|
34 |
+
return None
|
35 |
+
|
36 |
+
def send_telegram_message(text):
|
37 |
+
"""Send a message to Telegram group."""
|
38 |
+
try:
|
39 |
+
url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
|
40 |
+
payload = {"chat_id": CHAT_ID, "text": text}
|
41 |
+
requests.post(url, json=payload, timeout=5)
|
42 |
+
print(f"[BOT] Sent message: {text}")
|
43 |
+
except Exception as e:
|
44 |
+
print(f"[ERROR] Failed to send message: {e}")
|
45 |
+
|
46 |
+
def monitor_status():
|
47 |
+
"""Background thread to monitor Uma health status."""
|
48 |
+
global last_health, current_status_text
|
49 |
+
while True:
|
50 |
+
health = get_health_status()
|
51 |
+
if health:
|
52 |
+
msg = STATUS_MESSAGES.get(health, f"Unknown status: {health}")
|
53 |
+
current_status_text = msg
|
54 |
+
|
55 |
+
if health != last_health:
|
56 |
+
send_telegram_message(msg)
|
57 |
+
print(f"[UPDATE] Uma status changed to: {health} - {msg}")
|
58 |
+
last_health = health
|
59 |
+
|
60 |
+
time.sleep(CHECK_INTERVAL)
|
61 |
+
|
62 |
+
@app.route("/")
|
63 |
+
def home():
|
64 |
+
return current_status_text
|
65 |
+
|
66 |
+
if __name__ == "__main__":
|
67 |
+
threading.Thread(target=monitor_status, daemon=True).start()
|
68 |
+
app.run(host="0.0.0.0", port=7860)
|