using Elecciones.Database; using Elecciones.Database.Entities; using Elecciones.Infrastructure.Services; using Microsoft.EntityFrameworkCore; using System.Collections.Concurrent; namespace Elecciones.Worker; public class CriticalDataWorker : BackgroundService { private readonly ILogger _logger; private readonly SharedTokenService _tokenService; private readonly IServiceProvider _serviceProvider; private readonly IElectoralApiService _apiService; // <-- DEPENDENCIA AÑADIDA // Inyectamos IElectoralApiService en el constructor public CriticalDataWorker( ILogger logger, SharedTokenService tokenService, IServiceProvider serviceProvider, IElectoralApiService apiService) // <-- PARÁMETRO AÑADIDO { _logger = logger; _tokenService = tokenService; _serviceProvider = serviceProvider; _apiService = apiService; // <-- ASIGNACIÓN AÑADIDA } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("Worker de Datos Críticos iniciado."); // Damos tiempo a la sincronización inicial del otro worker para que se complete. try { await Task.Delay(TimeSpan.FromMinutes(2), stoppingToken); } catch (TaskCanceledException) { return; } // Salir si la app se apaga durante la espera inicial int cicloContador = 0; while (!stoppingToken.IsCancellationRequested) { var cicloInicio = DateTime.UtcNow; cicloContador++; _logger.LogInformation("--- Iniciando Ciclo de Datos Críticos #{ciclo} ---", cicloContador); var authToken = await _tokenService.GetValidAuthTokenAsync(stoppingToken); if (string.IsNullOrEmpty(authToken)) { _logger.LogError("Ciclo Crítico: No se pudo obtener token. Reintentando en 30 segundos."); await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken); continue; } await SondearResultadosMunicipalesAsync(authToken, stoppingToken); await SondearResumenProvincialAsync(authToken, stoppingToken); await SondearEstadoRecuentoGeneralAsync(authToken, stoppingToken); var cicloFin = DateTime.UtcNow; var duracionCiclo = cicloFin - cicloInicio; _logger.LogInformation("--- Ciclo de Datos Críticos #{ciclo} completado en {duration:N2} segundos. ---", cicloContador, duracionCiclo.TotalSeconds); var tiempoDeEspera = TimeSpan.FromSeconds(30) - duracionCiclo; if (tiempoDeEspera < TimeSpan.Zero) tiempoDeEspera = TimeSpan.Zero; try { await Task.Delay(tiempoDeEspera, stoppingToken); } catch (TaskCanceledException) { break; } } } /// /// Sondea los resultados electorales para todos los municipios/partidos de forma optimizada. /// Utiliza paralelismo controlado para ejecutar múltiples peticiones a la API simultáneamente /// sin sobrecargar la red, y luego guarda todos los resultados en la base de datos de forma masiva. /// /// El token de autenticación válido para la sesión. /// El token de cancelación para detener la operación. private async Task SondearResultadosMunicipalesAsync(string authToken, CancellationToken stoppingToken) { try { // PASO 1: Preparar el DbContext y los datos necesarios. using var scope = _serviceProvider.CreateScope(); var dbContext = scope.ServiceProvider.GetRequiredService(); // Obtenemos de nuestra BD local la lista de todos los partidos (NivelId=30) que necesitamos consultar. var municipiosASondear = await dbContext.AmbitosGeograficos .AsNoTracking() .Where(a => a.NivelId == 30 && a.DistritoId != null && a.SeccionId != null) .Select(a => new { a.Id, a.Nombre, a.MunicipioId, a.SeccionId, a.DistritoId }) .ToListAsync(stoppingToken); if (!municipiosASondear.Any()) { _logger.LogWarning("No se encontraron Partidos (NivelId 30) en la BD para sondear resultados."); return; } // Obtenemos la categoría "CONCEJALES", ya que los resultados municipales aplican a esta. var categoriaConcejales = await dbContext.CategoriasElectorales .AsNoTracking() .FirstOrDefaultAsync(c => c.Nombre.Contains("CONCEJALES"), stoppingToken); if (categoriaConcejales == null) { _logger.LogWarning("No se encontró la categoría 'CONCEJALES'. Omitiendo sondeo de resultados municipales."); return; } // PASO 2: Ejecutar las consultas a la API con paralelismo controlado. // Definimos cuántas peticiones queremos que se ejecuten simultáneamente. // Un valor entre 8 y 16 es generalmente seguro y ofrece una gran mejora de velocidad. const int GRADO_DE_PARALELISMO = 3; // Creamos un semáforo que actuará como un "control de acceso" con 10 pases libres. var semaforo = new SemaphoreSlim(GRADO_DE_PARALELISMO); // Usamos un ConcurrentDictionary para almacenar los resultados. A diferencia de un Dictionary normal, // este permite que múltiples tareas escriban en él al mismo tiempo sin conflictos. var resultadosPorId = new ConcurrentDictionary(); _logger.LogInformation("Iniciando sondeo de resultados para {count} municipios con un paralelismo de {degree}...", municipiosASondear.Count, GRADO_DE_PARALELISMO); // Creamos una lista de tareas (Tasks), una por cada municipio a consultar. // El método .Select() no ejecuta las tareas todavía, solo las prepara. var tareas = municipiosASondear.Select(async municipio => { // Cada tarea debe "pedir permiso" al semáforo antes de ejecutarse. // Si ya hay 10 tareas en ejecución, esta línea esperará hasta que una termine. await semaforo.WaitAsync(stoppingToken); try { // Una vez que obtiene el permiso, ejecuta la petición a la API. var resultados = await _apiService.GetResultadosAsync( authToken, municipio.DistritoId!, municipio.SeccionId!, null, categoriaConcejales.Id ); // Si la API devuelve datos válidos... if (resultados != null) { // ...los guardamos en el diccionario concurrente. resultadosPorId[municipio.Id] = resultados; } } finally { // ¡CRUCIAL! Liberamos el pase del semáforo, permitiendo que la siguiente // tarea en espera pueda comenzar su ejecución. semaforo.Release(); // Añadir un pequeño retraso aleatorio para no parecer un robot await Task.Delay(TimeSpan.FromMilliseconds(new Random().Next(50, 251)), stoppingToken); } }); // Ahora sí, ejecutamos todas las tareas preparadas en paralelo y esperamos a que todas terminen. await Task.WhenAll(tareas); // PASO 3: Guardar los resultados en la base de datos. // Solo procedemos si recolectamos al menos un resultado válido. if (resultadosPorId.Any()) { // Llamamos a nuestro método de guardado masivo y optimizado, pasándole todos los resultados // recolectados para que los inserte en una única y eficiente transacción. await GuardarResultadosDeMunicipiosAsync(dbContext, resultadosPorId.ToDictionary(kv => kv.Key, kv => kv.Value), stoppingToken); } } catch (Exception ex) { // Capturamos cualquier error inesperado en el proceso para que el worker no se detenga. _logger.LogError(ex, "Ocurrió un error inesperado durante el sondeo de resultados municipales."); } } /// Realiza una operación "Upsert" (Update o Insert) de forma masiva y optimizada. /// Este método es llamado por SondearResultadosMunicipalesAsync. /// private async Task GuardarResultadosDeMunicipiosAsync( EleccionesDbContext dbContext, Dictionary todosLosResultados, CancellationToken stoppingToken) // <-- PARÁMETRO AÑADIDO { // Obtenemos los IDs de todos los ámbitos que vamos a actualizar. var ambitoIds = todosLosResultados.Keys; // --- OPTIMIZACIÓN 1: Cargar todos los datos existentes en memoria UNA SOLA VEZ --- var estadosRecuentoExistentes = await dbContext.EstadosRecuentos .Where(e => ambitoIds.Contains(e.AmbitoGeograficoId)) .ToDictionaryAsync(e => e.AmbitoGeograficoId, stoppingToken); var resultadosVotosExistentes = await dbContext.ResultadosVotos .Where(rv => ambitoIds.Contains(rv.AmbitoGeograficoId)) .GroupBy(rv => rv.AmbitoGeograficoId) .ToDictionaryAsync(g => g.Key, g => g.ToDictionary(item => item.AgrupacionPoliticaId), stoppingToken); _logger.LogInformation("Procesando en memoria los resultados de {count} municipios.", todosLosResultados.Count); // --- OPTIMIZACIÓN 2: Procesar todo en memoria --- foreach (var kvp in todosLosResultados) { var ambitoId = kvp.Key; var resultadosDto = kvp.Value; // Lógica Upsert para EstadoRecuento if (!estadosRecuentoExistentes.TryGetValue(ambitoId, out var estadoRecuento)) { estadoRecuento = new EstadoRecuento { AmbitoGeograficoId = ambitoId }; dbContext.EstadosRecuentos.Add(estadoRecuento); } // Mapeo completo de propiedades para EstadoRecuento estadoRecuento.FechaTotalizacion = DateTime.Parse(resultadosDto.FechaTotalizacion).ToUniversalTime(); estadoRecuento.MesasEsperadas = resultadosDto.EstadoRecuento.MesasEsperadas; estadoRecuento.MesasTotalizadas = resultadosDto.EstadoRecuento.MesasTotalizadas; estadoRecuento.MesasTotalizadasPorcentaje = resultadosDto.EstadoRecuento.MesasTotalizadasPorcentaje; estadoRecuento.CantidadElectores = resultadosDto.EstadoRecuento.CantidadElectores; estadoRecuento.CantidadVotantes = resultadosDto.EstadoRecuento.CantidadVotantes; estadoRecuento.ParticipacionPorcentaje = resultadosDto.EstadoRecuento.ParticipacionPorcentaje; if (resultadosDto.ValoresTotalizadosOtros != null) { estadoRecuento.VotosEnBlanco = resultadosDto.ValoresTotalizadosOtros.VotosEnBlanco; estadoRecuento.VotosEnBlancoPorcentaje = resultadosDto.ValoresTotalizadosOtros.VotosEnBlancoPorcentaje; estadoRecuento.VotosNulos = resultadosDto.ValoresTotalizadosOtros.VotosNulos; estadoRecuento.VotosNulosPorcentaje = resultadosDto.ValoresTotalizadosOtros.VotosNulosPorcentaje; estadoRecuento.VotosRecurridos = resultadosDto.ValoresTotalizadosOtros.VotosRecurridos; estadoRecuento.VotosRecurridosPorcentaje = resultadosDto.ValoresTotalizadosOtros.VotosRecurridosPorcentaje; } // Lógica Upsert para ResultadosVotos var votosDeAmbitoExistentes = resultadosVotosExistentes.GetValueOrDefault(ambitoId); foreach (var votoPositivoDto in resultadosDto.ValoresTotalizadosPositivos) { ResultadoVoto? resultadoVoto = null; if (votosDeAmbitoExistentes != null) { votosDeAmbitoExistentes.TryGetValue(votoPositivoDto.IdAgrupacion, out resultadoVoto); } if (resultadoVoto == null) { resultadoVoto = new ResultadoVoto { AmbitoGeograficoId = ambitoId, AgrupacionPoliticaId = votoPositivoDto.IdAgrupacion }; dbContext.ResultadosVotos.Add(resultadoVoto); } resultadoVoto.CantidadVotos = votoPositivoDto.Votos; resultadoVoto.PorcentajeVotos = votoPositivoDto.VotosPorcentaje; } } // --- OPTIMIZACIÓN 3: Guardar todos los cambios en UNA SOLA TRANSACCIÓN --- _logger.LogInformation("Guardando todos los cambios de resultados municipales en la base de datos..."); // Ahora 'stoppingToken' es reconocido aquí await dbContext.SaveChangesAsync(stoppingToken); _logger.LogInformation("Guardado completado."); } /// /// Obtiene y actualiza el resumen de votos a nivel provincial. /// Esta versión mejorada utiliza una transacción para garantizar la consistencia de los datos. /// private async Task SondearResumenProvincialAsync(string authToken, CancellationToken stoppingToken) { try { using var scope = _serviceProvider.CreateScope(); var dbContext = scope.ServiceProvider.GetRequiredService(); var provincia = await dbContext.AmbitosGeograficos.AsNoTracking().FirstOrDefaultAsync(a => a.NivelId == 10, stoppingToken); if (provincia == null) return; var resumen = await _apiService.GetResumenAsync(authToken, provincia.DistritoId!); // --- CAMBIO CLAVE: Lógica de actualización robusta --- // Solo procedemos si la respuesta de la API es válida Y contiene datos de votos positivos. if (resumen?.ValoresTotalizadosPositivos is { Count: > 0 } nuevosVotos) { // Usamos una transacción explícita para asegurar que la operación sea atómica: // O se completa todo (borrado e inserción), o no se hace nada. await using var transaction = await dbContext.Database.BeginTransactionAsync(stoppingToken); // 1. Borramos los datos viejos. await dbContext.Database.ExecuteSqlRawAsync("DELETE FROM ResumenesVotos", stoppingToken); // 2. Insertamos los nuevos datos. foreach (var voto in nuevosVotos) { dbContext.ResumenesVotos.Add(new ResumenVoto { AmbitoGeograficoId = provincia.Id, AgrupacionPoliticaId = voto.IdAgrupacion, Votos = voto.Votos, VotosPorcentaje = voto.VotosPorcentaje }); } // 3. Guardamos los cambios y confirmamos la transacción. await dbContext.SaveChangesAsync(stoppingToken); await transaction.CommitAsync(stoppingToken); _logger.LogInformation("Sondeo de Resumen Provincial completado. La tabla ha sido actualizada."); } else { // Si la API no devuelve datos de votos, no hacemos NADA en la base de datos. _logger.LogInformation("Sondeo de Resumen Provincial completado. No se recibieron datos nuevos, la tabla no fue modificada."); } } catch (Exception ex) { _logger.LogError(ex, "Ocurrió un error en el sondeo de Resumen Provincial."); } } /// /// Obtiene y actualiza el estado general del recuento a nivel provincial para CADA categoría electoral. /// Esta versión es robusta: consulta dinámicamente las categorías, usa la clave primaria compuesta /// de la base de datos y guarda todos los cambios en una única transacción al final. /// /// El token de autenticación válido para la sesión. /// El token de cancelación para detener la operación. private async Task SondearEstadoRecuentoGeneralAsync(string authToken, CancellationToken stoppingToken) { try { // PASO 1: Crear un "scope" para obtener una instancia fresca de DbContext. // Esto es una práctica recomendada para servicios de larga duración para evitar problemas de concurrencia. using var scope = _serviceProvider.CreateScope(); var dbContext = scope.ServiceProvider.GetRequiredService(); // PASO 2: Obtener el ámbito geográfico de la Provincia. // Necesitamos este objeto para obtener su 'DistritoId' ("02"), que es requerido por la API. var provincia = await dbContext.AmbitosGeograficos .AsNoTracking() // Optimización: Solo necesitamos leer datos, no modificarlos. .FirstOrDefaultAsync(a => a.NivelId == 10, stoppingToken); // Comprobación de seguridad: Si la sincronización inicial falló y no tenemos el registro de la provincia, // no podemos continuar. Registramos una advertencia y salimos del método. if (provincia == null) { _logger.LogWarning("No se encontró el ámbito 'Provincia' (NivelId 10) en la BD. Omitiendo sondeo de estado general."); return; } // PASO 3: Obtener todas las categorías electorales disponibles desde nuestra base de datos. // Esto hace que el método sea dinámico y no dependa de IDs fijos en el código. var categoriasParaSondear = await dbContext.CategoriasElectorales .AsNoTracking() .ToListAsync(stoppingToken); if (!categoriasParaSondear.Any()) { _logger.LogWarning("No hay categorías en la BD para sondear el estado general del recuento."); return; } _logger.LogInformation("Iniciando sondeo de Estado Recuento General para {count} categorías...", categoriasParaSondear.Count); // PASO 4: Iterar sobre cada categoría para obtener su estado de recuento individual. foreach (var categoria in categoriasParaSondear) { // Salimos limpiamente del bucle si la aplicación se está deteniendo. if (stoppingToken.IsCancellationRequested) break; // Llamamos a la API con el distrito y la CATEGORÍA ACTUAL del bucle. var estadoDto = await _apiService.GetEstadoRecuentoGeneralAsync(authToken, provincia.DistritoId!, categoria.Id); // Solo procedemos si la API devolvió datos válidos. if (estadoDto != null) { // Lógica "Upsert" (Update or Insert): // Buscamos un registro existente usando la CLAVE PRIMARIA COMPUESTA. var registroDb = await dbContext.EstadosRecuentosGenerales.FindAsync( new object[] { provincia.Id, categoria.Id }, cancellationToken: stoppingToken ); // Si no se encuentra (FindAsync devuelve null), es un registro nuevo. if (registroDb == null) { // Creamos una nueva instancia de la entidad. registroDb = new EstadoRecuentoGeneral { AmbitoGeograficoId = provincia.Id, CategoriaId = categoria.Id // Asignamos ambas partes de la clave primaria. }; // Y la añadimos al ChangeTracker de EF para que la inserte en la BD. dbContext.EstadosRecuentosGenerales.Add(registroDb); } // Mapeamos los datos del DTO de la API a nuestra entidad de base de datos. // Esto se hace tanto para registros nuevos como para los existentes que se van a actualizar. registroDb.MesasEsperadas = estadoDto.MesasEsperadas; registroDb.MesasTotalizadas = estadoDto.MesasTotalizadas; registroDb.MesasTotalizadasPorcentaje = estadoDto.MesasTotalizadasPorcentaje; registroDb.CantidadElectores = estadoDto.CantidadElectores; registroDb.CantidadVotantes = estadoDto.CantidadVotantes; registroDb.ParticipacionPorcentaje = estadoDto.ParticipacionPorcentaje; } } // PASO 5: Guardar todos los cambios en la base de datos. // Al llamar a SaveChangesAsync UNA SOLA VEZ fuera del bucle, EF Core agrupa // todas las inserciones y actualizaciones en una única transacción eficiente. await dbContext.SaveChangesAsync(stoppingToken); _logger.LogInformation("Sondeo de Estado Recuento General completado para todas las categorías."); } catch (Exception ex) { // Capturamos cualquier excepción inesperada para que no detenga el worker y la registramos. _logger.LogError(ex, "Ocurrió un error CRÍTICO en el sondeo de Estado Recuento General."); } } // Pega aquí los métodos: // - SondearResultadosMunicipalesAsync // - GuardarResultadosDeMunicipiosAsync // - SondearResumenProvincialAsync // - SondearEstadoRecuentoGeneralAsync // (Estos métodos necesitan IServiceProvider y SharedTokenService, que ya están inyectados) }