From 4cc9d239cf14567f87d248792a05a1c995d96e23 Mon Sep 17 00:00:00 2001 From: dmolinari Date: Thu, 3 Jul 2025 12:16:04 -0300 Subject: [PATCH] =?UTF-8?q?perf(Worker):=20Implementa=20ejecuci=C3=B3n=20p?= =?UTF-8?q?aralela=20de=20fetchers=20para=20mejorar=20rendimiento?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Mercados.Worker/DataFetchingService.cs | 64 ++++++++++------------ 1 file changed, 30 insertions(+), 34 deletions(-) diff --git a/src/Mercados.Worker/DataFetchingService.cs b/src/Mercados.Worker/DataFetchingService.cs index 4d56408..f8ea02d 100644 --- a/src/Mercados.Worker/DataFetchingService.cs +++ b/src/Mercados.Worker/DataFetchingService.cs @@ -67,40 +67,31 @@ namespace Mercados.Worker private async Task RunScheduledTasksAsync(CancellationToken stoppingToken) { var utcNow = DateTime.UtcNow; - - // Obtenemos las expresiones Cron desde la configuración + + // Tareas diarias (estas suelen ser rápidas y no se solapan, no es crítico paralelizar) + // Mantenerlas secuenciales puede ser más simple de leer. string? agroSchedule = _configuration["Schedules:MercadoAgroganadero"]; - string? bcrSchedule = _configuration["Schedules:BCR"]; - string? bolsasSchedule = _configuration["Schedules:Bolsas"]; - - // Comprobamos cada una antes de usarla if (!string.IsNullOrEmpty(agroSchedule)) { await TryRunDailyTaskAsync("MercadoAgroganadero", agroSchedule, utcNow, stoppingToken); } - else - { - _logger.LogWarning("No se encontró la configuración de horario para 'MercadoAgroganadero' en appsettings.json."); - } + else { _logger.LogWarning("..."); } + string? bcrSchedule = _configuration["Schedules:BCR"]; if (!string.IsNullOrEmpty(bcrSchedule)) { await TryRunDailyTaskAsync("BCR", bcrSchedule, utcNow, stoppingToken); } - else - { - _logger.LogWarning("No se encontró la configuración de horario para 'BCR' en appsettings.json."); - } + else { _logger.LogWarning("..."); } + // --- Tareas Recurrentes (Bolsas) --- + string? bolsasSchedule = _configuration["Schedules:Bolsas"]; if (!string.IsNullOrEmpty(bolsasSchedule)) { - await TryRunRecurringTaskAsync(new[] { "YahooFinance", "Finnhub" }, bolsasSchedule, utcNow, stoppingToken); + // Reemplazamos la llamada secuencial con la ejecución paralela + await TryRunRecurringTaskInParallelAsync(new[] { "YahooFinance", "Finnhub" }, bolsasSchedule, utcNow, stoppingToken); } - else - { - _logger.LogWarning("No se encontró la configuración de horario para 'Bolsas' en appsettings.json."); - } - // --- ^ FIN DE LA CORRECCIÓN DE NULABILIDAD ^ --- + else { _logger.LogWarning("..."); } } /// @@ -124,20 +115,22 @@ namespace Mercados.Worker /// /// Comprueba y ejecuta una tarea que puede correr múltiples veces al día. /// - private async Task TryRunRecurringTaskAsync(string[] taskNames, string cronExpression, DateTime utcNow, CancellationToken stoppingToken) + private async Task TryRunRecurringTaskInParallelAsync(string[] taskNames, string cronExpression, DateTime utcNow, CancellationToken stoppingToken) { - // Añadimos 'IncludeSeconds' para que la comparación sea precisa y no se ejecute dos veces en el mismo minuto. var cron = CronExpression.Parse(cronExpression, CronFormat.IncludeSeconds); - // Comprobamos si hubo una ocurrencia en el último minuto. var nextOccurrence = cron.GetNextOccurrence(utcNow.AddMinutes(-1)); if (nextOccurrence.HasValue && nextOccurrence.Value <= utcNow) { - _logger.LogInformation("Ventana de ejecución recurrente detectada para: {Tasks}", string.Join(", ", taskNames)); - foreach (var taskName in taskNames) - { - await RunFetcherByNameAsync(taskName, stoppingToken); - } + _logger.LogInformation("Ventana de ejecución para: {Tasks}. Iniciando en paralelo...", string.Join(", ", taskNames)); + + // Creamos una lista de tareas, una por cada fetcher a ejecutar + var tasks = taskNames.Select(taskName => RunFetcherByNameAsync(taskName, stoppingToken)).ToList(); + + // Iniciamos todas las tareas a la vez y esperamos a que todas terminen + await Task.WhenAll(tasks); + + _logger.LogInformation("Todas las tareas recurrentes han finalizado."); } } @@ -178,14 +171,17 @@ namespace Mercados.Worker /* private async Task RunAllFetchersAsync(CancellationToken stoppingToken) { - _logger.LogInformation("Ejecutando todos los fetchers al iniciar..."); + _logger.LogInformation("Ejecutando todos los fetchers al iniciar en paralelo..."); using var scope = _serviceProvider.CreateScope(); var fetchers = scope.ServiceProvider.GetRequiredService>(); - foreach (var fetcher in fetchers) - { - if (stoppingToken.IsCancellationRequested) break; - await RunFetcherByNameAsync(fetcher.SourceName, stoppingToken); - } + + // Creamos una lista de tareas, una por cada fetcher disponible + var tasks = fetchers.Select(fetcher => RunFetcherByNameAsync(fetcher.SourceName, stoppingToken)).ToList(); + + // Ejecutamos todo y esperamos + await Task.WhenAll(tasks); + + _logger.LogInformation("Ejecución inicial de todos los fetchers completada."); } */