Fix Goteo solo para Telegramas

This commit is contained in:
2025-08-20 16:28:17 -03:00
parent 7e1e487e83
commit 19b37f7320
3 changed files with 277 additions and 294 deletions

View File

@@ -83,7 +83,7 @@ builder.Services.AddHttpClient("ElectoralApiClient", client =>
.AddPolicyHandler(GetRetryPolicy());
// --- LIMITADOR DE VELOCIDAD BASADO EN TOKEN BUCKET ---
builder.Services.AddSingleton<RateLimiter>(sp =>
/*builder.Services.AddSingleton<RateLimiter>(sp =>
new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions
{
// El tamaño máximo del cubo (la ráfaga máxima que permitimos).
@@ -101,7 +101,7 @@ builder.Services.AddSingleton<RateLimiter>(sp =>
// Cómo se comporta cuando la cola está llena.
QueueProcessingOrder = QueueProcessingOrder.OldestFirst
}));
*/
builder.Services.AddScoped<IElectoralApiService, ElectoralApiService>();
builder.Services.AddHostedService<Worker>();

View File

@@ -603,111 +603,85 @@ public class Worker : BackgroundService
{
try
{
// PASO 1: Obtener los datos base para las consultas.
// Usamos un DbContext inicial solo para leer los catálogos.
using var initialScope = _serviceProvider.CreateScope();
var initialDbContext = initialScope.ServiceProvider.GetRequiredService<EleccionesDbContext>();
_logger.LogInformation("--- Iniciando sondeo de Nuevos Telegramas (modo de bajo perfil) ---");
var partidos = await initialDbContext.AmbitosGeograficos
using var scope = _serviceProvider.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<EleccionesDbContext>();
var partidos = await dbContext.AmbitosGeograficos
.AsNoTracking()
.Where(a => a.NivelId == 30 && a.DistritoId != null && a.SeccionId != null)
.ToListAsync(stoppingToken);
var categorias = await initialDbContext.CategoriasElectorales
var categorias = await dbContext.CategoriasElectorales
.AsNoTracking()
.ToListAsync(stoppingToken);
if (!partidos.Any() || !categorias.Any())
if (!partidos.Any() || !categorias.Any()) return;
// --- LÓGICA DE GOTEO LENTO ---
// Procesamos una combinación (partido/categoría) a la vez.
foreach (var partido in partidos)
{
_logger.LogWarning("No se encontraron partidos o categorías en la BD para sondear telegramas.");
return;
}
// Creamos una lista de todas las consultas que necesitamos hacer (135 partidos * 3 categorías = 405 consultas).
var combinaciones = partidos.SelectMany(partido => categorias, (partido, categoria) => new { partido, categoria });
const int GRADO_DE_PARALELISMO = 3;
var semaforo = new SemaphoreSlim(GRADO_DE_PARALELISMO);
_logger.LogInformation("Iniciando sondeo de Telegramas para {count} combinaciones... con paralelismo de {degree}", combinaciones.Count(), GRADO_DE_PARALELISMO);
// Usaremos un ConcurrentBag para recolectar de forma segura los telegramas nuevos desde múltiples hilos.
var telegramasNuevosParaGuardar = new ConcurrentBag<Telegrama>();
var tareas = combinaciones.Select(async item =>
{
await semaforo.WaitAsync(stoppingToken);
try
foreach (var categoria in categorias)
{
var idsDeApi = await _apiService.GetTelegramasTotalizadosAsync(authToken, item.partido.DistritoId!, item.partido.SeccionId!, item.categoria.Id);
// Si la aplicación se apaga, salimos inmediatamente.
if (stoppingToken.IsCancellationRequested) return;
if (idsDeApi is { Count: > 0 })
// Obtenemos la lista de IDs.
var listaTelegramasApi = await _apiService.GetTelegramasTotalizadosAsync(authToken, partido.DistritoId!, partido.SeccionId!, categoria.Id);
if (listaTelegramasApi is { Count: > 0 })
{
// Usamos un DbContext propio para este bloque para asegurar que los cambios se guarden.
using var innerScope = _serviceProvider.CreateScope();
var innerDbContext = innerScope.ServiceProvider.GetRequiredService<EleccionesDbContext>();
var idsYaEnDb = await innerDbContext.Telegramas
.Where(t => idsDeApi.Contains(t.Id))
.Where(t => listaTelegramasApi.Contains(t.Id))
.Select(t => t.Id)
.ToListAsync(stoppingToken);
var nuevosTelegramasIds = idsDeApi.Except(idsYaEnDb).ToList();
var nuevosTelegramasIds = listaTelegramasApi.Except(idsYaEnDb).ToList();
if (!nuevosTelegramasIds.Any()) return;
_logger.LogInformation("Se encontraron {count} telegramas nuevos en '{partido}' para '{cat}'. Descargando en paralelo...", nuevosTelegramasIds.Count, item.partido.Nombre, item.categoria.Nombre);
// --- NUEVA OPTIMIZACIÓN: Paralelizar la descarga de los archivos ---
await Task.WhenAll(nuevosTelegramasIds.Select(async mesaId =>
if (nuevosTelegramasIds.Any())
{
var telegramaFile = await _apiService.GetTelegramaFileAsync(authToken, mesaId);
if (telegramaFile != null)
_logger.LogInformation("Se encontraron {count} telegramas nuevos en '{partido}' para '{cat}'. Descargando...", nuevosTelegramasIds.Count, partido.Nombre, categoria.Nombre);
// Descargamos los archivos de uno en uno, con una pausa entre cada uno.
foreach (var mesaId in nuevosTelegramasIds)
{
var nuevoTelegrama = new Telegrama
if (stoppingToken.IsCancellationRequested) return;
var telegramaFile = await _apiService.GetTelegramaFileAsync(authToken, mesaId);
if (telegramaFile != null)
{
Id = telegramaFile.NombreArchivo,
AmbitoGeograficoId = item.partido.Id,
ContenidoBase64 = telegramaFile.Imagen,
FechaEscaneo = DateTime.Parse(telegramaFile.FechaEscaneo).ToUniversalTime(),
FechaTotalizacion = DateTime.Parse(telegramaFile.FechaTotalizacion).ToUniversalTime()
};
telegramasNuevosParaGuardar.Add(nuevoTelegrama);
var nuevoTelegrama = new Telegrama
{
Id = telegramaFile.NombreArchivo,
AmbitoGeograficoId = partido.Id,
ContenidoBase64 = telegramaFile.Imagen,
FechaEscaneo = DateTime.Parse(telegramaFile.FechaEscaneo).ToUniversalTime(),
FechaTotalizacion = DateTime.Parse(telegramaFile.FechaTotalizacion).ToUniversalTime()
};
await innerDbContext.Telegramas.AddAsync(nuevoTelegrama, stoppingToken);
}
// PAUSA DELIBERADA: Esperamos un poco para no parecer un bot.
await Task.Delay(250, stoppingToken); // 250ms de espera = 4 peticiones/segundo máximo.
}
// Si telegramaFile es null (por 403, 200 vacío, etc.), simplemente no hacemos nada.
}));
await innerDbContext.SaveChangesAsync(stoppingToken);
}
}
}
// --- NUEVA ROBUSTEZ: Capturar errores por tarea ---
catch (Exception ex)
{
// Si una combinación entera falla (ej. getTelegramasTotalizados da 500), lo logueamos
// pero NO relanzamos la excepción, para no cancelar el Task.WhenAll principal.
_logger.LogError(ex, "Falló el sondeo de telegramas para el partido '{partido}' y categoría '{cat}'", item.partido.Nombre, item.categoria.Nombre);
}
finally
{
semaforo.Release();
// Añadir un pequeño retraso aleatorio
await Task.Delay(TimeSpan.FromMilliseconds(new Random().Next(50, 251)), stoppingToken);
}
});
// Esperamos a que todas las tareas de sondeo y descarga terminen.
await Task.WhenAll(tareas);
// --- Guardado Masivo Final ---
// Después de que todo el paralelismo ha terminado, hacemos una única operación de escritura en la BD.
if (!telegramasNuevosParaGuardar.IsEmpty)
{
_logger.LogInformation("Guardando un total de {count} telegramas nuevos en la base de datos...", telegramasNuevosParaGuardar.Count);
using var finalScope = _serviceProvider.CreateScope();
var finalDbContext = finalScope.ServiceProvider.GetRequiredService<EleccionesDbContext>();
await finalDbContext.Telegramas.AddRangeAsync(telegramasNuevosParaGuardar, stoppingToken);
await finalDbContext.SaveChangesAsync(stoppingToken);
// PAUSA DELIBERADA: Esperamos un poco entre cada consulta de lista de telegramas.
await Task.Delay(100, stoppingToken);
}
}
_logger.LogInformation(
"Sondeo de Telegramas completado. Se guardaron {count} nuevos telegramas.", telegramasNuevosParaGuardar.Count);
_logger.LogInformation("Sondeo de Telegramas completado.");
}
catch (OperationCanceledException)
{
_logger.LogInformation("Sondeo de telegramas cancelado.");
}
catch (Exception ex)
{