refactor aggregate jobs

2026-01-15 09:04:52 +11:00
parent 8b2c8ae85d
commit 457d9395f0
5 changed files with 292 additions and 279 deletions


@@ -0,0 +1,142 @@
package tasks

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"vctp/db"
	"vctp/internal/report"
)

// RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table.
func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) (err error) {
	jobTimeout := durationFromSeconds(c.Settings.Values.Settings.DailyJobTimeoutSeconds, 15*time.Minute)
	return c.runAggregateJob(ctx, "daily_aggregate", jobTimeout, func(jobCtx context.Context) error {
		startedAt := time.Now()
		defer func() {
			logger.Info("Daily summary job finished", "duration", time.Since(startedAt))
		}()
		// Shift back one minute so a run at midnight lands in the day that just ended.
		targetTime := time.Now().Add(-time.Minute)
		return c.aggregateDailySummary(jobCtx, targetTime, false)
	})
}

// AggregateDailySummary aggregates the daily summary for the given date; when
// force is true an existing summary is rebuilt.
func (c *CronTask) AggregateDailySummary(ctx context.Context, date time.Time, force bool) error {
	return c.aggregateDailySummary(ctx, date, force)
}

func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Time, force bool) error {
	// Work over the calendar day that contains targetTime.
	dayStart := time.Date(targetTime.Year(), targetTime.Month(), targetTime.Day(), 0, 0, 0, 0, targetTime.Location())
	dayEnd := dayStart.AddDate(0, 0, 1)

	summaryTable, err := dailySummaryTableName(targetTime)
	if err != nil {
		return err
	}

	dbConn := c.Database.DB()
	if err := db.EnsureSummaryTable(ctx, dbConn, summaryTable); err != nil {
		return err
	}
	if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
		return err
	}
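
	// Skip the run if the summary already exists, unless force is set, in which
	// case the existing rows are cleared and rebuilt from the hourly snapshots.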
	if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil {
		return err
	} else if rowsExist && !force {
		c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable)
		return nil
	} else if rowsExist && force {
		if err := clearTable(ctx, dbConn, summaryTable); err != nil {
			return err
		}
	}
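
	// Collect the hourly snapshot tables inside the day window and build a
	// UNION query across them, applying the template exclusion filter.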
	hourlySnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", dayStart, dayEnd)
	if err != nil {
		return err
	}
	hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
	if len(hourlySnapshots) == 0 {
		return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
	}
	hourlyTables := make([]string, 0, len(hourlySnapshots))
	for _, snapshot := range hourlySnapshots {
		hourlyTables = append(hourlyTables, snapshot.TableName)
	}
	unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
	if err != nil {
		return err
	}
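
	// Log the day's totals; a failure here is not fatal to the aggregation.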
	currentTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery)
	if err != nil {
		c.Logger.Warn("unable to calculate daily totals", "error", err, "date", dayStart.Format("2006-01-02"))
	} else {
		c.Logger.Info("Daily snapshot totals",
			"date", dayStart.Format("2006-01-02"),
			"vm_count", currentTotals.VmCount,
			"vcpu_total", currentTotals.VcpuTotal,
			"ram_total_gb", currentTotals.RamTotal,
			"disk_total_gb", currentTotals.DiskTotal,
		)
	}
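
	// Best-effort comparison against the previous day's hourly snapshots; any
	// failure in this block only logs and never aborts the aggregation.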
	prevStart := dayStart.AddDate(0, 0, -1)
	prevEnd := dayStart
	prevSnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", prevStart, prevEnd)
	if err == nil && len(prevSnapshots) > 0 {
		prevSnapshots = filterSnapshotsWithRows(ctx, dbConn, prevSnapshots)
		prevTables := make([]string, 0, len(prevSnapshots))
		for _, snapshot := range prevSnapshots {
			prevTables = append(prevTables, snapshot.TableName)
		}
		prevUnion, err := buildUnionQuery(prevTables, summaryUnionColumns, templateExclusionFilter())
		if err == nil {
			prevTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, prevUnion)
			if err != nil {
				c.Logger.Warn("unable to calculate previous day totals", "error", err, "date", prevStart.Format("2006-01-02"))
			} else {
				c.Logger.Info("Daily snapshot comparison",
					"current_date", dayStart.Format("2006-01-02"),
					"previous_date", prevStart.Format("2006-01-02"),
					"vm_delta", currentTotals.VmCount-prevTotals.VmCount,
					"vcpu_delta", currentTotals.VcpuTotal-prevTotals.VcpuTotal,
					"ram_delta_gb", currentTotals.RamTotal-prevTotals.RamTotal,
					"disk_delta_gb", currentTotals.DiskTotal-prevTotals.DiskTotal,
				)
			}
		} else {
			c.Logger.Warn("unable to build previous day union", "error", err)
		}
	}
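
	// Materialize the union into the daily summary table.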
	insertQuery, err := db.BuildDailySummaryInsert(summaryTable, unionQuery)
	if err != nil {
		return err
	}
	if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
		c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayStart.Format("2006-01-02"))
		return err
	}
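
	// Register the snapshot and generate the daily report; failures here are
	// logged but do not fail the job.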
	rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
	if err != nil {
		c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
		c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
	}
	if err := c.generateReport(ctx, summaryTable); err != nil {
		c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
	}

	c.Logger.Debug("Finished daily inventory aggregation", "summary_table", summaryTable)
	return nil
}

// dailySummaryTableName returns the per-day summary table name, validated via
// db.SafeTableName, e.g. inventory_daily_summary_20260114.
func dailySummaryTableName(t time.Time) (string, error) {
	return db.SafeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102")))
}