feat(AgroFetcher): Implement incremental updates for cattle market data

- Modified the schedule to fetch data multiple times a day (11, 15, 18, 21h).
- Refactored MercadoAgroFetcher to compare new data against the last saved records for the current day.
- Added a repository method to replace the daily batch atomically, ensuring data integrity.
- Database writes are now skipped if no changes are detected, improving efficiency."
This commit is contained in:
2025-07-17 13:38:47 -03:00
parent 23f0d02fe3
commit e9540fa155
4 changed files with 69 additions and 33 deletions

View File

@@ -30,35 +30,45 @@ namespace Mercados.Infrastructure.DataFetchers
public async Task<(bool Success, string Message)> FetchDataAsync()
{
_logger.LogInformation("Iniciando fetch para {SourceName}.", SourceName);
try
{
var htmlContent = await GetHtmlContentAsync();
if (string.IsNullOrEmpty(htmlContent))
if (string.IsNullOrEmpty(htmlContent)) return (false, "No se pudo obtener el contenido HTML.");
var cotizacionesNuevas = ParseHtmlToEntities(htmlContent);
if (!cotizacionesNuevas.Any())
{
// Esto sigue siendo un fallo, no se pudo obtener la página
return (false, "No se pudo obtener el contenido HTML.");
return (true, "Conexión exitosa, pero no se encontraron nuevos datos de ganado.");
}
var cotizaciones = ParseHtmlToEntities(htmlContent);
var hoy = DateOnly.FromDateTime(DateTime.UtcNow);
// 1. Obtenemos los últimos datos guardados para el día de HOY.
var cotizacionesViejas = await _cotizacionRepository.ObtenerTandaPorFechaAsync(hoy);
if (!cotizaciones.Any())
// 2. Comparamos si hay alguna diferencia real.
// Creamos un HashSet de los registros viejos para una comparación ultra rápida.
var viejasSet = new HashSet<string>(cotizacionesViejas.Select(c => $"{c.Categoria}|{c.Especificaciones}|{c.Maximo}|{c.Minimo}|{c.Cabezas}"));
var nuevasList = cotizacionesNuevas.Select(c => $"{c.Categoria}|{c.Especificaciones}|{c.Maximo}|{c.Minimo}|{c.Cabezas}").ToList();
bool hayCambios = cotizacionesViejas.Count() != nuevasList.Count || nuevasList.Any(n => !viejasSet.Contains(n));
if (!hayCambios)
{
// La conexión fue exitosa, pero no se encontraron datos válidos.
// Esto NO es un error crítico, es un estado informativo.
_logger.LogInformation("La conexión con {SourceName} fue exitosa, pero no se encontraron datos de cotizaciones para procesar.", SourceName);
return (true, "Conexión exitosa, pero no se encontraron nuevos datos.");
_logger.LogInformation("No se encontraron cambios en los datos de {SourceName}. No se requiere actualización.", SourceName);
return (true, "Datos sin cambios.");
}
await _cotizacionRepository.GuardarMuchosAsync(cotizaciones);
// 3. Si hay cambios, reemplazamos la tanda completa del día.
_logger.LogInformation("Se detectaron cambios en los datos de {SourceName}. Actualizando base de datos...", SourceName);
await _cotizacionRepository.ReemplazarTandaDelDiaAsync(hoy, cotizacionesNuevas);
await UpdateSourceInfoAsync();
_logger.LogInformation("Fetch para {SourceName} completado exitosamente. Se guardaron {Count} registros.", SourceName, cotizaciones.Count);
return (true, $"Proceso completado. Se guardaron {cotizaciones.Count} registros.");
_logger.LogInformation("Fetch para {SourceName} completado exitosamente. Se actualizaron {Count} registros.", SourceName, cotizacionesNuevas.Count);
return (true, $"Proceso completado. Se actualizaron {cotizacionesNuevas.Count} registros.");
}
catch (Exception ex)
{
// Un catch aquí sí es un error real (ej. 404, timeout, etc.)
_logger.LogError(ex, "Ocurrió un error durante el fetch para {SourceName}.", SourceName);
return (false, $"Error: {ex.Message}");
}

View File

@@ -13,6 +13,43 @@ namespace Mercados.Infrastructure.Persistence.Repositories
_connectionFactory = connectionFactory;
}
public async Task<IEnumerable<CotizacionGanado>> ObtenerTandaPorFechaAsync(DateOnly fecha)
{
using IDbConnection connection = _connectionFactory.CreateConnection();
const string sql = @"
SELECT * FROM CotizacionesGanado
WHERE CONVERT(date, FechaRegistro) = @Fecha;";
return await connection.QueryAsync<CotizacionGanado>(sql, new { Fecha = fecha.ToString("yyyy-MM-dd") });
}
// Nuevo método para hacer un "reemplazo" inteligente
public async Task ReemplazarTandaDelDiaAsync(DateOnly fecha, IEnumerable<CotizacionGanado> nuevasCotizaciones)
{
using IDbConnection connection = _connectionFactory.CreateConnection();
connection.Open();
using var transaction = connection.BeginTransaction();
try
{
// Borramos solo los registros del día que vamos a actualizar
const string deleteSql = "DELETE FROM CotizacionesGanado WHERE CONVERT(date, FechaRegistro) = @Fecha;";
await connection.ExecuteAsync(deleteSql, new { Fecha = fecha.ToString("yyyy-MM-dd") }, transaction);
// Insertamos la nueva tanda completa
const string insertSql = @"
INSERT INTO CotizacionesGanado (Categoria, Especificaciones, Maximo, Minimo, Promedio, Mediano, Cabezas, KilosTotales, KilosPorCabeza, ImporteTotal, FechaRegistro)
VALUES (@Categoria, @Especificaciones, @Maximo, @Minimo, @Promedio, @Mediano, @Cabezas, @KilosTotales, @KilosPorCabeza, @ImporteTotal, @FechaRegistro);";
await connection.ExecuteAsync(insertSql, nuevasCotizaciones, transaction);
transaction.Commit();
}
catch
{
transaction.Rollback();
throw;
}
}
public async Task GuardarMuchosAsync(IEnumerable<CotizacionGanado> cotizaciones)
{
using IDbConnection connection = _connectionFactory.CreateConnection();
@@ -30,19 +67,6 @@ namespace Mercados.Infrastructure.Persistence.Repositories
await connection.ExecuteAsync(sql, cotizaciones);
}
public async Task<IEnumerable<CotizacionGanado>> ObtenerUltimaTandaAsync()
{
using IDbConnection connection = _connectionFactory.CreateConnection();
// Primero, obtenemos la fecha de registro más reciente.
// Luego, seleccionamos todos los registros que tengan esa fecha.
const string sql = @"
SELECT *
FROM CotizacionesGanado
WHERE FechaRegistro = (SELECT MAX(FechaRegistro) FROM CotizacionesGanado);";
return await connection.QueryAsync<CotizacionGanado>(sql);
}
public async Task<IEnumerable<CotizacionGanado>> ObtenerHistorialAsync(string categoria, string especificaciones, int dias)
{
@@ -60,10 +84,11 @@ namespace Mercados.Infrastructure.Persistence.Repositories
ORDER BY
FechaRegistro ASC;";
return await connection.QueryAsync<CotizacionGanado>(sql, new {
Categoria = categoria,
Especificaciones = especificaciones,
Dias = dias
return await connection.QueryAsync<CotizacionGanado>(sql, new
{
Categoria = categoria,
Especificaciones = especificaciones,
Dias = dias
});
}
}

View File

@@ -4,8 +4,9 @@ namespace Mercados.Infrastructure.Persistence.Repositories
{
public interface ICotizacionGanadoRepository : IBaseRepository
{
Task<IEnumerable<CotizacionGanado>> ObtenerTandaPorFechaAsync(DateOnly fecha);
Task GuardarMuchosAsync(IEnumerable<CotizacionGanado> cotizaciones);
Task<IEnumerable<CotizacionGanado>> ObtenerUltimaTandaAsync();
Task ReemplazarTandaDelDiaAsync(DateOnly fecha, IEnumerable<CotizacionGanado> nuevasCotizaciones);
Task<IEnumerable<CotizacionGanado>> ObtenerHistorialAsync(string categoria, string especificaciones, int dias);
}
}

View File

@@ -10,7 +10,7 @@
"DefaultConnection": ""
},
"Schedules": {
"MercadoAgroganadero": "0 11 * * 1-5",
"MercadoAgroganadero": "0 11,15,18,21 * * 1-5",
"BCR": "30 11 * * 1-5",
"Bolsas": "10 11-17 * * 1-5",
"Holidays": "0 2 * * 1"