import pandas as pd import joblib import os import pyodbc from datetime import datetime, timedelta import sys def insertar_alerta_en_db(cursor, tipo_alerta, id_entidad, entidad, mensaje, fecha_anomalia, cant_enviada=None, cant_devuelta=None, porc_devolucion=None): """Función centralizada para insertar en la nueva tabla Sistema_Alertas.""" insert_query = """ INSERT INTO Sistema_Alertas (TipoAlerta, IdEntidad, Entidad, Mensaje, FechaAnomalia, CantidadEnviada, CantidadDevuelta, PorcentajeDevolucion, Leida) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0) """ try: p_dev = float(porc_devolucion) if porc_devolucion is not None else None c_env = int(cant_enviada) if cant_enviada is not None else None c_dev = int(cant_devuelta) if cant_devuelta is not None else None cursor.execute(insert_query, tipo_alerta, id_entidad, entidad, mensaje, fecha_anomalia, c_env, c_dev, p_dev) print(f"INFO: Alerta '{tipo_alerta}' para '{entidad}' ID {id_entidad} registrada.") except Exception as e: print(f"ERROR: No se pudo insertar la alerta para '{entidad}' ID {id_entidad}. Error: {e}") print("--- INICIANDO SCRIPT DE DETECCIÓN COMPLETO ---") # --- 1. Configuración --- DB_SERVER = 'TECNICA3' DB_DATABASE = 'SistemaGestion' CONNECTION_STRING = f'DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={DB_SERVER};DATABASE={DB_DATABASE};Trusted_Connection=yes;TrustServerCertificate=yes;' MODEL_INDIVIDUAL_FILE = 'modelo_anomalias.joblib' MODEL_SISTEMA_FILE = 'modelo_sistema_anomalias.joblib' MODEL_DIST_FILE = 'modelo_anomalias_dist.joblib' MODEL_DANADAS_FILE = 'modelo_danadas.joblib' MODEL_MONTOS_FILE = 'modelo_montos.joblib' # --- 2. Determinar Fecha --- if len(sys.argv) > 1: target_date = datetime.strptime(sys.argv[1], '%Y-%m-%d') else: target_date = datetime.now() - timedelta(days=1) print(f"--- FECHA DE ANÁLISIS: {target_date.date()} ---") try: cnxn = pyodbc.connect(CONNECTION_STRING) cursor = cnxn.cursor() except Exception as e: print(f"CRITICAL: No se pudo conectar a la base de datos. Error: {e}") exit() # --- FASE 1: Detección de Anomalías Individuales (Canillitas) --- print("\n--- FASE 1: Detección de Anomalías Individuales (Canillitas) ---") if not os.path.exists(MODEL_INDIVIDUAL_FILE): print(f"ADVERTENCIA: Modelo individual '{MODEL_INDIVIDUAL_FILE}' no encontrado.") else: # ... (esta sección se mantiene exactamente igual que antes) ... model_individual = joblib.load(MODEL_INDIVIDUAL_FILE) query_individual = f""" SELECT esc.Id_Canilla AS id_canilla, esc.Fecha AS fecha, esc.CantSalida AS cantidad_enviada, esc.CantEntrada AS cantidad_devuelta, c.NomApe AS nombre_canilla FROM dist_EntradasSalidasCanillas esc JOIN dist_dtCanillas c ON esc.Id_Canilla = c.Id_Canilla WHERE CAST(Fecha AS DATE) = '{target_date.strftime('%Y-%m-%d')}' AND CantSalida > 0 """ df_new = pd.read_sql(query_individual, cnxn) if not df_new.empty: df_new['porcentaje_devolucion'] = (df_new['cantidad_devuelta'] / df_new['cantidad_enviada']).fillna(0) * 100 df_new['dia_semana'] = pd.to_datetime(df_new['fecha']).dt.dayofweek features = ['id_canilla', 'porcentaje_devolucion', 'dia_semana'] X_new = df_new[features] df_new['anomalia'] = model_individual.predict(X_new) anomalias_detectadas = df_new[df_new['anomalia'] == -1] if not anomalias_detectadas.empty: for index, row in anomalias_detectadas.iterrows(): mensaje = f"Devolución del {row['porcentaje_devolucion']:.2f}% para '{row['nombre_canilla']}'." insertar_alerta_en_db(cursor, tipo_alerta='DevolucionAnomala', id_entidad=row['id_canilla'], entidad='Canillita', mensaje=mensaje, fecha_anomalia=row['fecha'].date(), cant_enviada=row['cantidad_enviada'], cant_devuelta=row['cantidad_devuelta'], porc_devolucion=row['porcentaje_devolucion']) else: print("INFO: No se encontraron anomalías individuales significativas en canillitas.") else: print("INFO: No hay datos de canillitas para analizar en la fecha seleccionada.") # --- FASE 2: Detección de Anomalías de Sistema --- print("\n--- FASE 2: Detección de Anomalías de Sistema ---") if not os.path.exists(MODEL_SISTEMA_FILE): print(f"ADVERTENCIA: Modelo de sistema '{MODEL_SISTEMA_FILE}' no encontrado.") else: # ... (esta sección se mantiene exactamente igual que antes) ... model_sistema = joblib.load(MODEL_SISTEMA_FILE) query_agregada = f""" SELECT CAST(Fecha AS DATE) AS fecha_dia, DATEPART(weekday, Fecha) as dia_semana, COUNT(DISTINCT Id_Canilla) as total_canillitas_activos, SUM(CantSalida) as total_salidas, SUM(CantEntrada) as total_devoluciones FROM dist_EntradasSalidasCanillas WHERE CAST(Fecha AS DATE) = '{target_date.strftime('%Y-%m-%d')}' AND CantSalida > 0 GROUP BY CAST(Fecha AS DATE), DATEPART(weekday, Fecha) """ df_system = pd.read_sql(query_agregada, cnxn) if not df_system.empty and df_system['total_salidas'].iloc[0] > 0: df_system['ratio_devolucion'] = (df_system['total_devoluciones'] / df_system['total_salidas']).fillna(0) df_system['salidas_por_canillita'] = (df_system['total_salidas'] / df_system['total_canillitas_activos']).fillna(0) features_system = ['dia_semana', 'total_salidas', 'ratio_devolucion', 'salidas_por_canillita'] X_system = df_system[features_system] df_system['anomalia_sistema'] = model_sistema.predict(X_system) if df_system['anomalia_sistema'].iloc[0] == -1: ratio_hoy = df_system['ratio_devolucion'].iloc[0] * 100 mensaje = f"El ratio de devolución global fue del {ratio_hoy:.2f}%, un valor atípico para este día de la semana." insertar_alerta_en_db(cursor, tipo_alerta='ComportamientoSistema', id_entidad=0, entidad='Sistema', mensaje=mensaje, fecha_anomalia=target_date.date()) else: print("INFO: El comportamiento agregado del sistema fue normal.") else: mensaje = f"ALERTA GRAVE: No se registraron movimientos de salida para ningún canillita en la fecha {target_date.date()}." insertar_alerta_en_db(cursor, tipo_alerta='FaltaDeDatos', id_entidad=0, entidad='Sistema', mensaje=mensaje, fecha_anomalia=target_date.date()) # --- FASE 3: Detección de Anomalías Individuales (Distribuidores) --- print("\n--- FASE 3: Detección de Anomalías Individuales (Distribuidores) ---") if not os.path.exists(MODEL_DIST_FILE): print(f"ADVERTENCIA: Modelo de distribuidores '{MODEL_DIST_FILE}' no encontrado.") else: model_dist = joblib.load(MODEL_DIST_FILE) query_dist = f""" SELECT es.Id_Distribuidor AS id_distribuidor, d.Nombre AS nombre_distribuidor, CAST(es.Fecha AS DATE) AS fecha, SUM(CASE WHEN es.TipoMovimiento = 'Salida' THEN es.Cantidad ELSE 0 END) as cantidad_enviada, SUM(CASE WHEN es.TipoMovimiento = 'Entrada' THEN es.Cantidad ELSE 0 END) as cantidad_devuelta FROM dist_EntradasSalidas es JOIN dist_dtDistribuidores d ON es.Id_Distribuidor = d.Id_Distribuidor WHERE CAST(es.Fecha AS DATE) = '{target_date.strftime('%Y-%m-%d')}' GROUP BY es.Id_Distribuidor, d.Nombre, CAST(es.Fecha AS DATE) HAVING SUM(CASE WHEN es.TipoMovimiento = 'Salida' THEN es.Cantidad ELSE 0 END) > 0 """ df_dist_new = pd.read_sql(query_dist, cnxn) if not df_dist_new.empty: df_dist_new['porcentaje_devolucion'] = (df_dist_new['cantidad_devuelta'] / df_dist_new['cantidad_enviada']).fillna(0) * 100 df_dist_new['dia_semana'] = pd.to_datetime(df_dist_new['fecha']).dt.dayofweek features_dist = ['id_distribuidor', 'porcentaje_devolucion', 'dia_semana'] X_dist_new = df_dist_new[features_dist] df_dist_new['anomalia'] = model_dist.predict(X_dist_new) anomalias_dist_detectadas = df_dist_new[df_dist_new['anomalia'] == -1] if not anomalias_dist_detectadas.empty: for index, row in anomalias_dist_detectadas.iterrows(): mensaje = f"Devolución inusual del {row['porcentaje_devolucion']:.2f}% para el distribuidor '{row['nombre_distribuidor']}'." insertar_alerta_en_db(cursor, tipo_alerta='DevolucionAnomalaDist', id_entidad=row['id_distribuidor'], entidad='Distribuidor', mensaje=mensaje, fecha_anomalia=row['fecha'], cant_enviada=row['cantidad_enviada'], cant_devuelta=row['cantidad_devuelta'], porc_devolucion=row['porcentaje_devolucion']) else: print("INFO: No se encontraron anomalías individuales significativas en distribuidores.") else: print("INFO: No hay datos de distribuidores para analizar en la fecha seleccionada.") # --- FASE 4: Detección de Anomalías en Bobinas Dañadas --- print("\n--- FASE 4: Detección de Anomalías en Bobinas Dañadas ---") if not os.path.exists(MODEL_DANADAS_FILE): print(f"ADVERTENCIA: Modelo de bobinas dañadas '{MODEL_DANADAS_FILE}' no encontrado.") else: model_danadas = joblib.load(MODEL_DANADAS_FILE) query_danadas = f""" SELECT h.Id_Planta as id_planta, p.Nombre as nombre_planta, DATEPART(weekday, h.FechaMod) as dia_semana, COUNT(DISTINCT h.Id_Bobina) as cantidad_danadas FROM bob_StockBobinas_H h JOIN bob_dtPlantas p ON h.Id_Planta = p.Id_Planta WHERE h.Id_EstadoBobina = 3 -- Asumiendo ID 3 para 'Dañada' AND h.TipoMod = 'Estado: Dañada' AND CAST(h.FechaMod AS DATE) = '{target_date.strftime('%Y-%m-%d')}' GROUP BY h.Id_Planta, p.Nombre, DATEPART(weekday, h.FechaMod) """ df_danadas_new = pd.read_sql(query_danadas, cnxn) if not df_danadas_new.empty: for index, row in df_danadas_new.iterrows(): features_danadas = ['id_planta', 'dia_semana', 'cantidad_danadas'] X_danadas_new = row[features_danadas].to_frame().T prediction = model_danadas.predict(X_danadas_new) if prediction[0] == -1: mensaje = f"Se registraron {row['cantidad_danadas']} bobina(s) dañada(s) en la Planta '{row['nombre_planta']}', un valor inusualmente alto." insertar_alerta_en_db(cursor, tipo_alerta='ExcesoBobinasDañadas', id_entidad=row['id_planta'], entidad='Planta', mensaje=mensaje, fecha_anomalia=target_date.date()) print(f"INFO: Análisis de {len(df_danadas_new)} planta(s) con bobinas dañadas completado.") else: print("INFO: No se registraron bobinas dañadas en la fecha seleccionada.") # --- FASE 5: Detección de Anomalías en Montos Contables --- print("\n--- FASE 5: Detección de Anomalías en Montos Contables ---") if not os.path.exists(MODEL_MONTOS_FILE): print(f"ADVERTENCIA: Modelo de montos contables '{MODEL_MONTOS_FILE}' no encontrado.") else: model_montos = joblib.load(MODEL_MONTOS_FILE) # Consulta unificada para obtener todas las transacciones del día query_transacciones = f""" SELECT 'Distribuidor' AS entidad, p.Id_Distribuidor AS id_entidad, d.Nombre as nombre_entidad, p.Id_Empresa as id_empresa, p.Fecha as fecha, p.TipoMovimiento as tipo_transaccion, p.Monto as monto FROM cue_PagosDistribuidor p JOIN dist_dtDistribuidores d ON p.Id_Distribuidor = d.Id_Distribuidor WHERE CAST(p.Fecha AS DATE) = '{target_date.strftime('%Y-%m-%d')}' UNION ALL SELECT CASE WHEN cd.Destino = 'Distribuidores' THEN 'Distribuidor' ELSE 'Canillita' END AS entidad, cd.Id_Destino AS id_entidad, COALESCE(d.Nombre, c.NomApe) as nombre_entidad, cd.Id_Empresa as id_empresa, cd.Fecha as fecha, cd.Tipo as tipo_transaccion, cd.Monto as monto FROM cue_CreditosDebitos cd LEFT JOIN dist_dtDistribuidores d ON cd.Id_Destino = d.Id_Distribuidor AND cd.Destino = 'Distribuidores' LEFT JOIN dist_dtCanillas c ON cd.Id_Destino = c.Id_Canilla AND cd.Destino = 'Canillas' WHERE CAST(cd.Fecha AS DATE) = '{target_date.strftime('%Y-%m-%d')}' """ df_transacciones_new = pd.read_sql(query_transacciones, cnxn) if not df_transacciones_new.empty: # Aplicar exactamente el mismo pre-procesamiento que en el entrenamiento df_transacciones_new['tipo_transaccion_cat'] = pd.Categorical(df_transacciones_new['tipo_transaccion']).codes df_transacciones_new['dia_semana'] = pd.to_datetime(df_transacciones_new['fecha']).dt.dayofweek features = ['id_entidad', 'id_empresa', 'tipo_transaccion_cat', 'dia_semana', 'monto'] X_new = df_transacciones_new[features] df_transacciones_new['anomalia'] = model_montos.predict(X_new) anomalias_detectadas = df_transacciones_new[df_transacciones_new['anomalia'] == -1] if not anomalias_detectadas.empty: for index, row in anomalias_detectadas.iterrows(): tipo_alerta = 'MontoInusualPago' if row['tipo_transaccion'] in ['Recibido', 'Realizado'] else 'MontoInusualNota' mensaje = f"Se registró un '{row['tipo_transaccion']}' de ${row['monto']:,} para '{row['nombre_entidad']}', un valor atípico." insertar_alerta_en_db(cursor, tipo_alerta=tipo_alerta, id_entidad=row['id_entidad'], entidad=row['entidad'], mensaje=mensaje, fecha_anomalia=row['fecha'].date()) else: print("INFO: No se encontraron anomalías en los montos contables registrados.") else: print("INFO: No hay transacciones contables para analizar en la fecha seleccionada.") # --- Finalización --- cnxn.commit() cnxn.close() print("\n--- DETECCIÓN COMPLETA ---")