update aggregation jobs
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
@@ -40,19 +40,43 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
|
||||
return err
|
||||
}
|
||||
|
||||
granularity := strings.ToLower(strings.TrimSpace(c.Settings.Values.Settings.MonthlyAggregationGranularity))
|
||||
if granularity == "" {
|
||||
granularity = "hourly"
|
||||
}
|
||||
if granularity != "hourly" && granularity != "daily" {
|
||||
c.Logger.Warn("unknown monthly aggregation granularity; defaulting to hourly", "granularity", granularity)
|
||||
granularity = "hourly"
|
||||
}
|
||||
|
||||
monthStart := time.Date(targetMonth.Year(), targetMonth.Month(), 1, 0, 0, 0, 0, targetMonth.Location())
|
||||
monthEnd := monthStart.AddDate(0, 1, 0)
|
||||
dailySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "daily", "inventory_daily_summary_", "20060102", monthStart, monthEnd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dailySnapshots = filterRecordsInRange(dailySnapshots, monthStart, monthEnd)
|
||||
|
||||
dbConn := c.Database.DB()
|
||||
db.SetPostgresWorkMem(ctx, dbConn, c.Settings.Values.Settings.PostgresWorkMemMB)
|
||||
dailySnapshots = filterSnapshotsWithRows(ctx, dbConn, dailySnapshots)
|
||||
if len(dailySnapshots) == 0 {
|
||||
return fmt.Errorf("no hourly snapshot tables found for %s", targetMonth.Format("2006-01"))
|
||||
|
||||
var snapshots []report.SnapshotRecord
|
||||
var unionColumns []string
|
||||
if granularity == "daily" {
|
||||
dailySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "daily", "inventory_daily_summary_", "20060102", monthStart, monthEnd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dailySnapshots = filterRecordsInRange(dailySnapshots, monthStart, monthEnd)
|
||||
dailySnapshots = filterSnapshotsWithRows(ctx, dbConn, dailySnapshots)
|
||||
snapshots = dailySnapshots
|
||||
unionColumns = monthlyUnionColumns
|
||||
} else {
|
||||
hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", monthStart, monthEnd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
hourlySnapshots = filterRecordsInRange(hourlySnapshots, monthStart, monthEnd)
|
||||
hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
|
||||
snapshots = hourlySnapshots
|
||||
unionColumns = summaryUnionColumns
|
||||
}
|
||||
if len(snapshots) == 0 {
|
||||
return fmt.Errorf("no %s snapshot tables found for %s", granularity, targetMonth.Format("2006-01"))
|
||||
}
|
||||
|
||||
monthlyTable, err := monthlySummaryTableName(targetMonth)
|
||||
@@ -75,22 +99,24 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
|
||||
}
|
||||
|
||||
// Optional Go-based aggregation path.
|
||||
if os.Getenv("MONTHLY_AGG_GO") == "1" {
|
||||
if os.Getenv("MONTHLY_AGG_GO") == "1" && granularity == "daily" {
|
||||
c.Logger.Debug("Using go implementation of monthly aggregation")
|
||||
if err := c.aggregateMonthlySummaryGo(ctx, monthStart, monthEnd, monthlyTable, dailySnapshots); err != nil {
|
||||
if err := c.aggregateMonthlySummaryGo(ctx, monthStart, monthEnd, monthlyTable, snapshots); err != nil {
|
||||
c.Logger.Warn("go-based monthly aggregation failed, falling back to SQL path", "error", err)
|
||||
} else {
|
||||
metrics.RecordMonthlyAggregation(time.Since(jobStart), nil)
|
||||
c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", monthlyTable)
|
||||
return nil
|
||||
}
|
||||
} else if os.Getenv("MONTHLY_AGG_GO") == "1" && granularity != "daily" {
|
||||
c.Logger.Warn("MONTHLY_AGG_GO is set but only daily granularity supports Go aggregation; using SQL path", "granularity", granularity)
|
||||
}
|
||||
|
||||
dailyTables := make([]string, 0, len(dailySnapshots))
|
||||
for _, snapshot := range dailySnapshots {
|
||||
dailyTables = append(dailyTables, snapshot.TableName)
|
||||
tables := make([]string, 0, len(snapshots))
|
||||
for _, snapshot := range snapshots {
|
||||
tables = append(tables, snapshot.TableName)
|
||||
}
|
||||
unionQuery, err := buildUnionQuery(dailyTables, monthlyUnionColumns, templateExclusionFilter())
|
||||
unionQuery, err := buildUnionQuery(tables, unionColumns, templateExclusionFilter())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -108,7 +134,12 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
|
||||
)
|
||||
}
|
||||
|
||||
insertQuery, err := db.BuildMonthlySummaryInsert(monthlyTable, unionQuery)
|
||||
var insertQuery string
|
||||
if granularity == "daily" {
|
||||
insertQuery, err = db.BuildMonthlySummaryInsert(monthlyTable, unionQuery)
|
||||
} else {
|
||||
insertQuery, err = db.BuildDailySummaryInsert(monthlyTable, unionQuery)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user