feat(Worker): Implementa servicio de notificación para alertas de fallos críticos - Se remueve .env y se utilizan appsettings.Development.json y User Secrets
This commit is contained in:
@@ -1,5 +1,7 @@
|
||||
using Mercados.Infrastructure.DataFetchers;
|
||||
using Cronos;
|
||||
using Mercados.Infrastructure.DataFetchers;
|
||||
using Mercados.Infrastructure.Services;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
|
||||
namespace Mercados.Worker
|
||||
{
|
||||
@@ -12,31 +14,47 @@ namespace Mercados.Worker
|
||||
private readonly ILogger<DataFetchingService> _logger;
|
||||
private readonly IServiceProvider _serviceProvider;
|
||||
private readonly TimeZoneInfo _argentinaTimeZone;
|
||||
private readonly IConfiguration _configuration;
|
||||
|
||||
// Diccionario para rastrear la última vez que se ejecutó una tarea diaria
|
||||
// y evitar que se ejecute múltiples veces si el servicio se reinicia.
|
||||
private readonly Dictionary<string, DateTime> _lastDailyRun = new();
|
||||
// Almacenamos las expresiones Cron parseadas para no tener que hacerlo en cada ciclo.
|
||||
private readonly CronExpression _agroSchedule;
|
||||
private readonly CronExpression _bcrSchedule;
|
||||
private readonly CronExpression _bolsasSchedule;
|
||||
|
||||
public DataFetchingService(ILogger<DataFetchingService> logger, IServiceProvider serviceProvider,IConfiguration configuration)
|
||||
// Almacenamos la próxima ejecución calculada para cada tarea.
|
||||
private DateTime? _nextAgroRun;
|
||||
private DateTime? _nextBcrRun;
|
||||
private DateTime? _nextBolsasRun;
|
||||
|
||||
// Diccionario para rastrear la hora de la última alerta ENVIADA por cada tarea.
|
||||
private readonly Dictionary<string, DateTime> _lastAlertSent = new();
|
||||
// Definimos el período de "silencio" para las alertas (ej. 4 horas).
|
||||
private readonly TimeSpan _alertSilencePeriod = TimeSpan.FromHours(4);
|
||||
|
||||
public DataFetchingService(
|
||||
ILogger<DataFetchingService> logger,
|
||||
IServiceProvider serviceProvider,
|
||||
IConfiguration configuration)
|
||||
{
|
||||
_logger = logger;
|
||||
_serviceProvider = serviceProvider;
|
||||
_configuration = configuration;
|
||||
|
||||
|
||||
// Se define explícitamente la zona horaria de Argentina.
|
||||
// Esto asegura que los cálculos de tiempo sean correctos, sin importar
|
||||
// la configuración de zona horaria del servidor donde se ejecute el worker.
|
||||
try
|
||||
{
|
||||
// El ID estándar para Linux y macOS
|
||||
_argentinaTimeZone = TimeZoneInfo.FindSystemTimeZoneById("America/Argentina/Buenos_Aires");
|
||||
}
|
||||
catch (TimeZoneNotFoundException)
|
||||
{
|
||||
// El ID equivalente para Windows
|
||||
_argentinaTimeZone = TimeZoneInfo.FindSystemTimeZoneById("Argentina Standard Time");
|
||||
}
|
||||
|
||||
// Parseamos las expresiones Cron UNA SOLA VEZ, en el constructor.
|
||||
// Si una expresión es inválida o nula, el servicio fallará al iniciar,
|
||||
// lo cual es un comportamiento deseable para alertar de una mala configuración.
|
||||
// El '!' le dice al compilador que confiamos que estos valores no serán nulos.
|
||||
_agroSchedule = CronExpression.Parse(configuration["Schedules:MercadoAgroganadero"]!);
|
||||
_bcrSchedule = CronExpression.Parse(configuration["Schedules:BCR"]!);
|
||||
_bolsasSchedule = CronExpression.Parse(configuration["Schedules:Bolsas"]!);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -46,106 +64,58 @@ namespace Mercados.Worker
|
||||
{
|
||||
_logger.LogInformation("🚀 Servicio de Fetching iniciado a las: {time}", DateTimeOffset.Now);
|
||||
|
||||
// Se recomienda una ejecución inicial para poblar la base de datos inmediatamente
|
||||
// al iniciar el servicio, en lugar de esperar al primer horario programado.
|
||||
//await RunAllFetchersAsync(stoppingToken);
|
||||
// Ejecutamos una vez al inicio para tener datos frescos inmediatamente.
|
||||
await RunAllFetchersAsync(stoppingToken);
|
||||
|
||||
// PeriodicTimer es una forma moderna y eficiente de crear un bucle de "tic-tac"
|
||||
// sin bloquear un hilo con Task.Delay.
|
||||
using var timer = new PeriodicTimer(TimeSpan.FromMinutes(1));
|
||||
// Calculamos las primeras ejecuciones programadas al arrancar.
|
||||
var utcNow = DateTime.UtcNow;
|
||||
_nextAgroRun = _agroSchedule.GetNextOccurrence(utcNow, _argentinaTimeZone);
|
||||
_nextBcrRun = _bcrSchedule.GetNextOccurrence(utcNow, _argentinaTimeZone);
|
||||
_nextBolsasRun = _bolsasSchedule.GetNextOccurrence(utcNow, _argentinaTimeZone);
|
||||
|
||||
// Usamos un PeriodicTimer que "despierta" cada 30 segundos para revisar si hay tareas pendientes.
|
||||
// Un intervalo más corto aumenta la precisión del disparo de las tareas.
|
||||
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(30));
|
||||
|
||||
// El bucle se ejecuta cada minuto mientras el servicio no reciba una señal de detención.
|
||||
while (!stoppingToken.IsCancellationRequested && await timer.WaitForNextTickAsync(stoppingToken))
|
||||
{
|
||||
await RunScheduledTasksAsync(stoppingToken);
|
||||
}
|
||||
}
|
||||
utcNow = DateTime.UtcNow;
|
||||
|
||||
/// <summary>
|
||||
/// Revisa la hora actual y ejecuta las tareas que coincidan con su horario programado.
|
||||
/// </summary>
|
||||
private async Task RunScheduledTasksAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
var utcNow = DateTime.UtcNow;
|
||||
|
||||
// Tareas diarias (estas suelen ser rápidas y no se solapan, no es crítico paralelizar)
|
||||
// Mantenerlas secuenciales puede ser más simple de leer.
|
||||
string? agroSchedule = _configuration["Schedules:MercadoAgroganadero"];
|
||||
if (!string.IsNullOrEmpty(agroSchedule))
|
||||
{
|
||||
await TryRunDailyTaskAsync("MercadoAgroganadero", agroSchedule, utcNow, stoppingToken);
|
||||
}
|
||||
else { _logger.LogWarning("..."); }
|
||||
|
||||
string? bcrSchedule = _configuration["Schedules:BCR"];
|
||||
if (!string.IsNullOrEmpty(bcrSchedule))
|
||||
{
|
||||
await TryRunDailyTaskAsync("BCR", bcrSchedule, utcNow, stoppingToken);
|
||||
}
|
||||
else { _logger.LogWarning("..."); }
|
||||
|
||||
// --- Tareas Recurrentes (Bolsas) ---
|
||||
string? bolsasSchedule = _configuration["Schedules:Bolsas"];
|
||||
if (!string.IsNullOrEmpty(bolsasSchedule))
|
||||
{
|
||||
// Reemplazamos la llamada secuencial con la ejecución paralela
|
||||
await TryRunRecurringTaskInParallelAsync(new[] { "YahooFinance", "Finnhub" }, bolsasSchedule, utcNow, stoppingToken);
|
||||
}
|
||||
else { _logger.LogWarning("..."); }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Comprueba y ejecuta una tarea que debe correr solo una vez al día.
|
||||
/// </summary>
|
||||
private async Task TryRunDailyTaskAsync(string taskName, string cronExpression, DateTime utcNow, CancellationToken stoppingToken)
|
||||
{
|
||||
var cron = CronExpression.Parse(cronExpression);
|
||||
var nextOccurrence = cron.GetNextOccurrence(utcNow.AddMinutes(-1));
|
||||
|
||||
if (nextOccurrence.HasValue && nextOccurrence.Value <= utcNow)
|
||||
{
|
||||
if (HasNotRunToday(taskName))
|
||||
// Comprobamos si ha llegado el momento de la próxima ejecución para cada tarea.
|
||||
if (_nextAgroRun.HasValue && utcNow >= _nextAgroRun.Value)
|
||||
{
|
||||
await RunFetcherByNameAsync(taskName, stoppingToken);
|
||||
_lastDailyRun[taskName] = TimeZoneInfo.ConvertTimeFromUtc(utcNow, _argentinaTimeZone).Date;
|
||||
await RunFetcherByNameAsync("MercadoAgroganadero", stoppingToken);
|
||||
// Inmediatamente después de ejecutar, calculamos la SIGUIENTE ocurrencia.
|
||||
_nextAgroRun = _agroSchedule.GetNextOccurrence(utcNow, _argentinaTimeZone);
|
||||
}
|
||||
|
||||
if (_nextBcrRun.HasValue && utcNow >= _nextBcrRun.Value)
|
||||
{
|
||||
await RunFetcherByNameAsync("BCR", stoppingToken);
|
||||
_nextBcrRun = _bcrSchedule.GetNextOccurrence(utcNow, _argentinaTimeZone);
|
||||
}
|
||||
|
||||
if (_nextBolsasRun.HasValue && utcNow >= _nextBolsasRun.Value)
|
||||
{
|
||||
_logger.LogInformation("Ventana de ejecución para Bolsas. Iniciando en paralelo...");
|
||||
await Task.WhenAll(
|
||||
RunFetcherByNameAsync("YahooFinance", stoppingToken),
|
||||
RunFetcherByNameAsync("Finnhub", stoppingToken)
|
||||
);
|
||||
_nextBolsasRun = _bolsasSchedule.GetNextOccurrence(utcNow, _argentinaTimeZone);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Comprueba y ejecuta una tarea que puede correr múltiples veces al día.
|
||||
/// </summary>
|
||||
private async Task TryRunRecurringTaskInParallelAsync(string[] taskNames, string cronExpression, DateTime utcNow, CancellationToken stoppingToken)
|
||||
{
|
||||
var cron = CronExpression.Parse(cronExpression, CronFormat.IncludeSeconds);
|
||||
var nextOccurrence = cron.GetNextOccurrence(utcNow.AddMinutes(-1));
|
||||
|
||||
if (nextOccurrence.HasValue && nextOccurrence.Value <= utcNow)
|
||||
{
|
||||
_logger.LogInformation("Ventana de ejecución para: {Tasks}. Iniciando en paralelo...", string.Join(", ", taskNames));
|
||||
|
||||
// Creamos una lista de tareas, una por cada fetcher a ejecutar
|
||||
var tasks = taskNames.Select(taskName => RunFetcherByNameAsync(taskName, stoppingToken)).ToList();
|
||||
|
||||
// Iniciamos todas las tareas a la vez y esperamos a que todas terminen
|
||||
await Task.WhenAll(tasks);
|
||||
|
||||
_logger.LogInformation("Todas las tareas recurrentes han finalizado.");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Ejecuta un fetcher específico por su nombre. Utiliza un scope de DI para gestionar
|
||||
/// correctamente el ciclo de vida de los servicios (como las conexiones a la BD).
|
||||
/// Ejecuta un fetcher específico por su nombre, gestionando el scope de DI y las notificaciones.
|
||||
/// </summary>
|
||||
private async Task RunFetcherByNameAsync(string sourceName, CancellationToken stoppingToken)
|
||||
{
|
||||
if (stoppingToken.IsCancellationRequested) return;
|
||||
|
||||
_logger.LogInformation("Intentando ejecutar fetcher: {sourceName}", sourceName);
|
||||
|
||||
// Crea un "scope" de servicios. Todos los servicios "scoped" (como los repositorios)
|
||||
// se crearán de nuevo para esta ejecución y se desecharán al final, evitando problemas.
|
||||
|
||||
using var scope = _serviceProvider.CreateScope();
|
||||
var fetchers = scope.ServiceProvider.GetRequiredService<IEnumerable<IDataFetcher>>();
|
||||
var fetcher = fetchers.FirstOrDefault(f => f.SourceName.Equals(sourceName, StringComparison.OrdinalIgnoreCase));
|
||||
@@ -155,7 +125,19 @@ namespace Mercados.Worker
|
||||
var (success, message) = await fetcher.FetchDataAsync();
|
||||
if (!success)
|
||||
{
|
||||
_logger.LogError("Falló la ejecución del fetcher {sourceName}: {message}", sourceName, message);
|
||||
var errorMessage = $"Falló la ejecución del fetcher {sourceName}: {message}";
|
||||
_logger.LogError(errorMessage);
|
||||
|
||||
if (ShouldSendAlert(sourceName))
|
||||
{
|
||||
var notifier = scope.ServiceProvider.GetRequiredService<INotificationService>();
|
||||
await notifier.SendFailureAlertAsync($"Fallo Crítico en el Fetcher: {sourceName}", errorMessage, DateTime.UtcNow);
|
||||
_lastAlertSent[sourceName] = DateTime.UtcNow;
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogWarning("Fallo repetido para {sourceName}. Alerta silenciada temporalmente.", sourceName);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
@@ -165,31 +147,35 @@ namespace Mercados.Worker
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Ejecuta todos los fetchers al iniciar el servicio. Esto es útil para poblar
|
||||
/// la base de datos inmediatamente al arrancar el worker.
|
||||
/// Ejecuta todos los fetchers en paralelo al iniciar el servicio.
|
||||
/// </summary>
|
||||
/*
|
||||
private async Task RunAllFetchersAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_logger.LogInformation("Ejecutando todos los fetchers al iniciar en paralelo...");
|
||||
using var scope = _serviceProvider.CreateScope();
|
||||
var fetchers = scope.ServiceProvider.GetRequiredService<IEnumerable<IDataFetcher>>();
|
||||
|
||||
// Creamos una lista de tareas, una por cada fetcher disponible
|
||||
var tasks = fetchers.Select(fetcher => RunFetcherByNameAsync(fetcher.SourceName, stoppingToken)).ToList();
|
||||
|
||||
// Ejecutamos todo y esperamos
|
||||
|
||||
var tasks = fetchers.Select(fetcher => RunFetcherByNameAsync(fetcher.SourceName, stoppingToken));
|
||||
|
||||
await Task.WhenAll(tasks);
|
||||
|
||||
_logger.LogInformation("Ejecución inicial de todos los fetchers completada.");
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
#region Funciones de Ayuda para la Planificación
|
||||
|
||||
private bool HasNotRunToday(string taskName)
|
||||
/// <summary>
|
||||
/// Determina si se debe enviar una alerta o si está en período de silencio.
|
||||
/// </summary>
|
||||
private bool ShouldSendAlert(string taskName)
|
||||
{
|
||||
return !_lastDailyRun.ContainsKey(taskName) || _lastDailyRun[taskName].Date < TimeZoneInfo.ConvertTimeFromUtc(DateTime.UtcNow, _argentinaTimeZone).Date;
|
||||
if (!_lastAlertSent.ContainsKey(taskName))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
var lastAlertTime = _lastAlertSent[taskName];
|
||||
return DateTime.UtcNow.Subtract(lastAlertTime) > _alertSilencePeriod;
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
@@ -6,90 +6,62 @@ using Mercados.Infrastructure.Persistence.Repositories;
|
||||
using Mercados.Worker;
|
||||
using Polly;
|
||||
using Polly.Extensions.Http;
|
||||
using Mercados.Infrastructure.Services;
|
||||
using DotNetEnv;
|
||||
using DotNetEnv.Configuration;
|
||||
|
||||
// Carga las variables de entorno desde el archivo .env en la raíz de la solución.
|
||||
DotNetEnv.Env.Load();
|
||||
var envFilePath = Path.GetFullPath(Path.Combine(AppContext.BaseDirectory, "../../../../../.env"));
|
||||
|
||||
// Cargamos el archivo .env desde la ruta explícita.
|
||||
// Si no lo encuentra, Load retornará false.
|
||||
if (!Env.Load(envFilePath).Any())
|
||||
{
|
||||
Console.WriteLine($"ADVERTENCIA: No se pudo encontrar el archivo .env en la ruta: {envFilePath}");
|
||||
}
|
||||
|
||||
Encoding.RegisterProvider(CodePagesEncodingProvider.Instance);
|
||||
// --- Configuración del Host ---
|
||||
// Esto prepara el host del servicio, permitiendo la inyección de dependencias,
|
||||
// la configuración desde appsettings.json y el logging.
|
||||
|
||||
IHost host = Host.CreateDefaultBuilder(args)
|
||||
.ConfigureServices((hostContext, services) =>
|
||||
{
|
||||
// Obtenemos la configuración desde el host builder para usarla aquí.
|
||||
// La línea 'config.AddDotNetEnv(optional: true);' ha sido eliminada.
|
||||
|
||||
IConfiguration configuration = hostContext.Configuration;
|
||||
|
||||
// --- 1. Registro de Servicios de Infraestructura ---
|
||||
|
||||
// Registramos la fábrica de conexiones a la BD. Es un Singleton porque
|
||||
// solo necesita ser creada una vez para leer la cadena de conexión.
|
||||
services.AddSingleton<IDbConnectionFactory, SqlConnectionFactory>();
|
||||
|
||||
// Registramos los repositorios. Se crean "por petición" (Scoped).
|
||||
// En un worker, "Scoped" significa que se creará una instancia por cada
|
||||
// ejecución del servicio, lo cual es seguro y eficiente.
|
||||
services.AddScoped<ICotizacionGanadoRepository, CotizacionGanadoRepository>();
|
||||
services.AddScoped<ICotizacionGranoRepository, CotizacionGranoRepository>();
|
||||
services.AddScoped<ICotizacionBolsaRepository, CotizacionBolsaRepository>();
|
||||
services.AddScoped<IFuenteDatoRepository, FuenteDatoRepository>();
|
||||
//services.AddScoped<INotificationService, ConsoleNotificationService>();
|
||||
services.AddScoped<INotificationService, EmailNotificationService>();
|
||||
|
||||
// --- 2. Registro de los Data Fetchers ---
|
||||
|
||||
// Registramos CADA uno de nuestros fetchers. El contenedor de DI sabrá
|
||||
// que todos implementan la interfaz IDataFetcher.
|
||||
// Descomentados para la versión final y funcional.
|
||||
services.AddScoped<IDataFetcher, MercadoAgroFetcher>();
|
||||
services.AddScoped<IDataFetcher, BcrDataFetcher>();
|
||||
services.AddScoped<IDataFetcher, FinnhubDataFetcher>();
|
||||
services.AddScoped<IDataFetcher, YahooFinanceDataFetcher>();
|
||||
|
||||
// El cliente HTTP es fundamental para hacer llamadas a APIs externas.
|
||||
// Le damos un nombre al cliente de Finnhub para cumplir con los requisitos de su constructor.
|
||||
//services.AddHttpClient("Finnhub");
|
||||
|
||||
// Configuramos CADA cliente HTTP que nuestros fetchers usan.
|
||||
// IHttpClientFactory nos permite nombrar y configurar clientes de forma independiente.
|
||||
|
||||
// Cliente para el scraper del MercadoAgro, con una política de reintentos
|
||||
services.AddHttpClient("MercadoAgroFetcher")
|
||||
.AddPolicyHandler(GetRetryPolicy());
|
||||
// --- 3. Configuración de Clientes HTTP con Polly ---
|
||||
services.AddHttpClient("MercadoAgroFetcher").AddPolicyHandler(GetRetryPolicy());
|
||||
services.AddHttpClient("BcrDataFetcher").AddPolicyHandler(GetRetryPolicy());
|
||||
services.AddHttpClient("FinnhubDataFetcher").AddPolicyHandler(GetRetryPolicy());
|
||||
|
||||
// Cliente para la API de BCR, con la misma política de reintentos
|
||||
services.AddHttpClient("BcrDataFetcher")
|
||||
.AddPolicyHandler(GetRetryPolicy());
|
||||
|
||||
// Cliente para Finnhub, con la misma política de reintentos
|
||||
services.AddHttpClient("FinnhubDataFetcher")
|
||||
.AddPolicyHandler(GetRetryPolicy());
|
||||
|
||||
// Cliente para YahooFinance (aunque es menos probable que falle, es buena práctica incluirlo)
|
||||
// La librería YahooFinanceApi usa su propio HttpClient, así que esta configuración
|
||||
// no le afectará directamente. La resiliencia para YahooFinance la manejaremos de otra forma si es necesario.
|
||||
// Por ahora, lo dejamos así y nos enfocamos en los que usan IHttpClientFactory.
|
||||
|
||||
|
||||
// --- 3. Registro del Worker Principal ---
|
||||
|
||||
// Finalmente, registramos nuestro servicio de fondo (el worker en sí).
|
||||
// --- 4. Registro del Worker Principal ---
|
||||
services.AddHostedService<DataFetchingService>();
|
||||
})
|
||||
.Build();
|
||||
|
||||
// Esta función define nuestra política de reintentos.
|
||||
static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
|
||||
{
|
||||
// Polly.Extensions.Http nos da este método conveniente.
|
||||
return HttpPolicyExtensions
|
||||
// Maneja errores de red transitorios O códigos de estado de servidor que indican un problema temporal.
|
||||
.HandleTransientHttpError()
|
||||
// También maneja el error 408 Request Timeout
|
||||
.OrResult(msg => msg.StatusCode == System.Net.HttpStatusCode.RequestTimeout)
|
||||
// Política de reintento con espera exponencial: 3 reintentos, esperando 2^intento segundos.
|
||||
.WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
|
||||
.HandleTransientHttpError()
|
||||
.OrResult(msg => msg.StatusCode == System.Net.HttpStatusCode.RequestTimeout)
|
||||
.WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
|
||||
onRetry: (outcome, timespan, retryAttempt, context) =>
|
||||
{
|
||||
// Registramos un log cada vez que se realiza un reintento.
|
||||
// Esta es una forma de hacerlo sin tener acceso directo al ILogger aquí.
|
||||
Console.WriteLine($"[Polly] Reintentando petición... Intento {retryAttempt}. Esperando {timespan.TotalSeconds}s. Causa: {outcome.Exception?.Message ?? outcome.Result.ReasonPhrase}");
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user