Spaces:
Running
Running
import pytz | |
import globales | |
import conexion_firebase | |
from datetime import datetime | |
servidor = globales.servidor | |
def obtenUltimoTimestamp(): | |
""" | |
Obtiene el último timestamp de renovación guardado. | |
""" | |
resultado = conexion_firebase.obtenDato('nowme', servidor, 'timestamp') | |
return resultado | |
def obtenSegundosDisponibles(): | |
siEsDiaSiguienteRenueva() | |
#Finalmente obten los segundos disponibles después de las operaciones. | |
return conexion_firebase.obtenDato('nowme', servidor, 'segundos') | |
def obtenSegundosDisponiblesInference(): | |
#Finalmente obten los segundos disponibles después de las operaciones. | |
return conexion_firebase.obtenDato('nowme', servidor, 'inferencias') | |
def renuevaSegundosDisponibles(): | |
#Segundos de cuota total gratuita disponibles al momento. | |
conexion_firebase.editaDato('nowme', servidor, 'segundos', globales.quota) | |
renuevaTimestampActual() | |
def renuevaTimestampActual(): | |
timestamp_actual = imprimeTimeNow() | |
conexion_firebase.editaDato('nowme', servidor, 'timestamp', timestamp_actual) | |
def restaSegundosGPU(cuantos_segundos): | |
""" | |
Lee el número de segundos disponibles, | |
resta los segundos dados como parámetro y guarda el nuevo valor en el archivo. | |
""" | |
segundos_disponibles = obtenSegundosDisponibles() | |
# Restar los segundos | |
nuevos_segundos_disponibles = segundos_disponibles - cuantos_segundos | |
print("Procesado, segundos disponibles ahora: ", nuevos_segundos_disponibles) | |
conexion_firebase.editaDato('nowme', servidor, 'segundos', nuevos_segundos_disponibles) | |
def restaSegundosInference(cuantos_segundos): | |
""" | |
Lee el número de segundos disponibles desde seconds_available.txt, | |
resta los segundos dados como parámetro y guarda el nuevo valor en el archivo. | |
""" | |
segundos_disponibles = obtenSegundosDisponiblesInference() | |
# Restar los segundos | |
nuevos_segundos_disponibles = segundos_disponibles - cuantos_segundos | |
print("Procesado, segundos disponibles ahora: ", nuevos_segundos_disponibles) | |
conexion_firebase.editaDato('nowme', servidor, 'inferencias', nuevos_segundos_disponibles) | |
def modificaModeloActual(nuevo_modelo): | |
""" | |
Actualiza el archivo archivos/modelo_actual.txt con el modelo funcional en caso de | |
problemas con el actual. | |
""" | |
conexion_firebase.editaDato('nowme', servidor, 'modelo_actual', nuevo_modelo) | |
def imprimeTimeNow(): | |
""" | |
Devuelve la fecha y hora actual en la zona horaria de la Ciudad de México (GMT-6). | |
""" | |
# 1. Definir la zona horaria de la Ciudad de México | |
# Puedes usar 'America/Mexico_City' para que pytz maneje el horario de verano automáticamente. | |
mexico_city_tz = pytz.timezone('America/Mexico_City') | |
# 2. Obtener la hora actual en UTC | |
utc_now = datetime.now(pytz.utc) | |
# 3. Convertir la hora UTC a la zona horaria deseada | |
mexico_city_now = utc_now.astimezone(mexico_city_tz) | |
# 4. Formatear la fecha y hora | |
# El formato que deseas es "YYYY-MM-DD HH:MM:SS" | |
formatted_time = mexico_city_now.strftime("%Y-%m-%d %H:%M:%S") | |
return formatted_time | |
def siEsDiaSiguienteRenueva(): | |
#Obtiene el último registro de fecha de la base de firestore. | |
fecha_registro_dt = obtenUltimoTimestamp() | |
#Timestamp actual | |
fecha_actual_dt = imprimeTimeNow() | |
formato = "%Y-%m-%d %H:%M:%S" | |
datetime_obj_1 = datetime.strptime(fecha_registro_dt, formato) | |
datetime_obj_2 = datetime.strptime(fecha_actual_dt, formato) | |
# Extraer solo la fecha de los objetos datetime | |
fecha_registro = datetime_obj_1.date() | |
fecha_actual = datetime_obj_2.date() | |
# Verificar si las fechas son diferentes | |
resultado = fecha_actual > fecha_registro | |
diferencia = fecha_actual - fecha_registro | |
if resultado == True: | |
if diferencia.days > 1: | |
print("Renovando segundos.") | |
renuevaSegundosDisponibles() | |
renuevaTimestampActual() | |
else: #Si la diferencia es de un solo día entonces si debe checar si ya rebaso la hora de renovación del servidor. | |
hora_actual = datetime_obj_2.time() | |
if int(hora_actual.hour) > int(globales.hora_renovacion): | |
print("Renovando segundos.") | |
renuevaSegundosDisponibles() | |
renuevaTimestampActual() | |
else: | |
print("Aún no hay renovación de capa de procesamiento.") | |
pass | |
else: | |
print("Aún no hay renovación de capa de procesamiento.") | |
return resultado | |
def despliegaInfoCliente(request): | |
# 1. Obtener la IP del cliente (priorizando X-Forwarded-For) | |
client_ip = request.headers.get("x-forwarded-for") or \ | |
request.headers.get("x-real-ip") or \ | |
(request.client.host if request.client else "unknown") | |
# Si X-Forwarded-For es una lista de IPs separadas por coma | |
if client_ip and "," in client_ip: | |
client_ip = client_ip.split(",")[0].strip() # Tomamos la primera IP (la original) | |
# 2. Obtener el User-Agent | |
user_agent = request.headers.get("user-agent", "unknown") | |
# 3. Información de autenticación (si aplica) | |
auth_header = request.headers.get("authorization") | |
user_id = "unauthenticated" | |
if auth_header and auth_header.startswith("Bearer "): | |
token = auth_header.split(" ")[1] | |
# Aquí normalmente decodificarías el token JWT para obtener el user_id | |
# Ejemplo simulado: | |
if token == "mysecrettoken123": | |
user_id = "user123_authenticated" | |
else: | |
user_id = "invalid_token" | |
# O podrías lanzar un HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") | |
print(f"Petición desde: {client_ip}") | |
print(f"User-Agent: {user_agent}") | |
print(f"Usuario (Auth): {user_id}") |