# Spaces: Runtime error
import os | |
import time | |
import sys | |
import schedule | |
import subprocess | |
import datetime | |
import psutil | |
# Directory that holds the trained ``.pth`` model files and the timestamp file.
MODEL_DIR = "/app/models"

# Text file containing the ISO-8601 time of the last successful training run.
TIMESTAMP_FILE = os.path.join(MODEL_DIR, "last_train.txt")

# Scripts that are started with the current Python interpreter.
APP_PATH = "app.py"
TRAIN_SCRIPT = "train.py"

# Retraining becomes due once the last run is older than this.
TRAIN_INTERVAL = datetime.timedelta(days=2)

# Create the model directory at import time so later listing and timestamp
# writes do not fail on a missing directory.
os.makedirs(MODEL_DIR, exist_ok=True)
def get_latest_model():
    """Return the path of the latest ``.pth`` file in MODEL_DIR, or None.

    "Latest" means the file name that sorts last lexicographically; file
    modification times are not consulted.
    """
    checkpoint_names = [
        entry for entry in os.listdir(MODEL_DIR) if entry.endswith(".pth")
    ]
    if not checkpoint_names:
        return None
    checkpoint_names.sort()
    return os.path.join(MODEL_DIR, checkpoint_names[-1])
def should_train():
    """Return True if it's time to retrain the model.

    Training is due when the timestamp file is missing, when its content
    cannot be parsed as an ISO-8601 datetime, or when the recorded time is
    more than TRAIN_INTERVAL in the past.
    """
    # EAFP: open directly instead of exists()+open(), which could race with
    # the file being removed between the two calls.
    try:
        with open(TIMESTAMP_FILE, "r") as f:
            last_time = datetime.datetime.fromisoformat(f.read().strip())
    except FileNotFoundError:
        return True
    except ValueError:
        # An empty or corrupt timestamp (e.g. an interrupted write) must not
        # crash the launcher; treat it as "never trained".
        print(
            f"[{datetime.datetime.now()}] Unreadable timestamp in "
            f"{TIMESTAMP_FILE}; retraining.",
            flush=True,
        )
        return True
    return datetime.datetime.now() - last_time > TRAIN_INTERVAL
def update_timestamp():
    """Overwrite TIMESTAMP_FILE with the current local time in ISO-8601 form."""
    with open(TIMESTAMP_FILE, "w") as stamp_file:
        now_iso = datetime.datetime.now().isoformat()
        stamp_file.write(now_iso)
def is_app_running():
    """Return True if any process has APP_PATH as a command-line argument.

    The comparison is an exact match against each element of a process's
    command line, so the app is only recognised when it was started with
    the same path string that ``launch_app`` passes to the interpreter.
    """
    # With ``attrs`` given, psutil pre-fetches the values into ``proc.info``
    # and stores None for processes it cannot inspect, so reading the dict
    # cannot raise and no per-process exception handling is needed here.
    return any(
        APP_PATH in (proc.info["cmdline"] or ())
        for proc in psutil.process_iter(attrs=["cmdline"])
    )
def launch_app():
    """Start the app script in a child process unless it is already running.

    Nothing is launched when ``is_app_running()`` reports a match or when
    APP_PATH does not exist. The child is started with the interpreter that
    runs this launcher and is not waited for.
    """
    if is_app_running():
        print(f"[{datetime.datetime.now()}] app.py is already running.", flush=True)
        return

    # Popen raises FileNotFoundError only when the *interpreter* cannot be
    # executed; a missing script just makes the child exit with an error.
    # Check for the script explicitly so the problem is actually reported.
    if not os.path.isfile(APP_PATH):
        print(f"[{datetime.datetime.now()}] app.py not found!", flush=True)
        return

    print(f"[{datetime.datetime.now()}] Launching app.py...", flush=True)
    try:
        subprocess.Popen([sys.executable, APP_PATH])
    except OSError as exc:
        # The interpreter itself could not be started (missing, not
        # executable, resource limits, ...).
        print(
            f"[{datetime.datetime.now()}] Failed to launch app.py: {exc}",
            flush=True,
        )
def train_model():
    """Retrain the model when due, then make sure the app is running.

    When ``should_train()`` is true the current latest model file is removed
    and TRAIN_SCRIPT is run to completion with this interpreter; the
    timestamp is updated only if the script exits successfully. Failures are
    reported on stdout and never prevent ``launch_app()`` from being called.
    """
    if not should_train():
        print(
            f"[{datetime.datetime.now()}] Skipping training (recent model exists).",
            flush=True,
        )
    elif not os.path.isfile(TRAIN_SCRIPT):
        # Check *before* deleting anything: subprocess.run does not raise
        # FileNotFoundError for a missing script (only for a missing
        # interpreter), so without this check the old model would be removed
        # and training would then fail.
        print(f"[{datetime.datetime.now()}] train.py not found!", flush=True)
    else:
        print(f"[{datetime.datetime.now()}] Training model...", flush=True)
        latest = get_latest_model()
        # NOTE(review): the old model is removed before the new one exists,
        # so a failed training run leaves no model behind - confirm that
        # this is intended.
        if latest and os.path.exists(latest):
            os.remove(latest)
            print(f" Deleted old model: {latest}", flush=True)
        try:
            subprocess.run([sys.executable, TRAIN_SCRIPT], check=True)
        except subprocess.CalledProcessError as e:
            print(f"[{datetime.datetime.now()}] Training failed: {e}", flush=True)
        except OSError as e:
            # The interpreter itself could not be started.
            print(
                f"[{datetime.datetime.now()}] Could not start training: {e}",
                flush=True,
            )
        else:
            try:
                update_timestamp()
            except OSError as e:
                # Keep going: the model was trained, it will merely be
                # considered due again on the next check.
                print(
                    f"[{datetime.datetime.now()}] Could not record training "
                    f"time: {e}",
                    flush=True,
                )
            print(f"[{datetime.datetime.now()}] Training completed.", flush=True)
    launch_app()
if __name__ == "__main__":
    # One train-if-due + launch pass right at startup.
    train_model()

    # Periodic re-runs are currently disabled: the registration below is
    # commented out, so the loop only polls the scheduler every 3 seconds
    # and keeps this launcher process alive.
    # schedule.every(60).minutes.do(train_model)
    while True:
        schedule.run_pending()
        time.sleep(3)