perf(Worker): Implementa ejecución paralela de fetchers para mejorar rendimiento

This commit is contained in:
2025-07-03 12:16:04 -03:00
parent 6479a5a040
commit 4cc9d239cf

View File

@@ -67,40 +67,31 @@ namespace Mercados.Worker
private async Task RunScheduledTasksAsync(CancellationToken stoppingToken)
{
var utcNow = DateTime.UtcNow;
// Obtenemos las expresiones Cron desde la configuración
// Tareas diarias (estas suelen ser rápidas y no se solapan, no es crítico paralelizar)
// Mantenerlas secuenciales puede ser más simple de leer.
string? agroSchedule = _configuration["Schedules:MercadoAgroganadero"];
string? bcrSchedule = _configuration["Schedules:BCR"];
string? bolsasSchedule = _configuration["Schedules:Bolsas"];
// Comprobamos cada una antes de usarla
if (!string.IsNullOrEmpty(agroSchedule))
{
await TryRunDailyTaskAsync("MercadoAgroganadero", agroSchedule, utcNow, stoppingToken);
}
else
{
_logger.LogWarning("No se encontró la configuración de horario para 'MercadoAgroganadero' en appsettings.json.");
}
else { _logger.LogWarning("..."); }
string? bcrSchedule = _configuration["Schedules:BCR"];
if (!string.IsNullOrEmpty(bcrSchedule))
{
await TryRunDailyTaskAsync("BCR", bcrSchedule, utcNow, stoppingToken);
}
else
{
_logger.LogWarning("No se encontró la configuración de horario para 'BCR' en appsettings.json.");
}
else { _logger.LogWarning("..."); }
// --- Tareas Recurrentes (Bolsas) ---
string? bolsasSchedule = _configuration["Schedules:Bolsas"];
if (!string.IsNullOrEmpty(bolsasSchedule))
{
await TryRunRecurringTaskAsync(new[] { "YahooFinance", "Finnhub" }, bolsasSchedule, utcNow, stoppingToken);
// Reemplazamos la llamada secuencial con la ejecución paralela
await TryRunRecurringTaskInParallelAsync(new[] { "YahooFinance", "Finnhub" }, bolsasSchedule, utcNow, stoppingToken);
}
else
{
_logger.LogWarning("No se encontró la configuración de horario para 'Bolsas' en appsettings.json.");
}
// --- ^ FIN DE LA CORRECCIÓN DE NULABILIDAD ^ ---
else { _logger.LogWarning("..."); }
}
/// <summary>
@@ -124,20 +115,22 @@ namespace Mercados.Worker
/// <summary>
/// Comprueba y ejecuta una tarea que puede correr múltiples veces al día.
/// </summary>
private async Task TryRunRecurringTaskAsync(string[] taskNames, string cronExpression, DateTime utcNow, CancellationToken stoppingToken)
private async Task TryRunRecurringTaskInParallelAsync(string[] taskNames, string cronExpression, DateTime utcNow, CancellationToken stoppingToken)
{
// Añadimos 'IncludeSeconds' para que la comparación sea precisa y no se ejecute dos veces en el mismo minuto.
var cron = CronExpression.Parse(cronExpression, CronFormat.IncludeSeconds);
// Comprobamos si hubo una ocurrencia en el último minuto.
var nextOccurrence = cron.GetNextOccurrence(utcNow.AddMinutes(-1));
if (nextOccurrence.HasValue && nextOccurrence.Value <= utcNow)
{
_logger.LogInformation("Ventana de ejecución recurrente detectada para: {Tasks}", string.Join(", ", taskNames));
foreach (var taskName in taskNames)
{
await RunFetcherByNameAsync(taskName, stoppingToken);
}
_logger.LogInformation("Ventana de ejecución para: {Tasks}. Iniciando en paralelo...", string.Join(", ", taskNames));
// Creamos una lista de tareas, una por cada fetcher a ejecutar
var tasks = taskNames.Select(taskName => RunFetcherByNameAsync(taskName, stoppingToken)).ToList();
// Iniciamos todas las tareas a la vez y esperamos a que todas terminen
await Task.WhenAll(tasks);
_logger.LogInformation("Todas las tareas recurrentes han finalizado.");
}
}
@@ -178,14 +171,17 @@ namespace Mercados.Worker
/*
private async Task RunAllFetchersAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Ejecutando todos los fetchers al iniciar...");
_logger.LogInformation("Ejecutando todos los fetchers al iniciar en paralelo...");
using var scope = _serviceProvider.CreateScope();
var fetchers = scope.ServiceProvider.GetRequiredService<IEnumerable<IDataFetcher>>();
foreach (var fetcher in fetchers)
{
if (stoppingToken.IsCancellationRequested) break;
await RunFetcherByNameAsync(fetcher.SourceName, stoppingToken);
}
// Creamos una lista de tareas, una por cada fetcher disponible
var tasks = fetchers.Select(fetcher => RunFetcherByNameAsync(fetcher.SourceName, stoppingToken)).ToList();
// Ejecutamos todo y esperamos
await Task.WhenAll(tasks);
_logger.LogInformation("Ejecución inicial de todos los fetchers completada.");
}
*/