package tasks

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"vctp/db"
	"vctp/internal/metrics"
	"vctp/internal/report"
)

// RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table.
//
// The work runs through c.runAggregateJob under the job name "daily_aggregate",
// with a timeout derived from the DailyJobTimeoutSeconds setting (15 minutes is
// passed to durationFromSeconds as the second argument, presumably the fallback).
// Existing summaries are never overwritten by this entry point (force is false).
// The supplied logger only receives the "job finished" duration message; the
// aggregation itself logs through c.Logger.
func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) error {
	jobTimeout := durationFromSeconds(c.Settings.Values.Settings.DailyJobTimeoutSeconds, 15*time.Minute)
	return c.runAggregateJob(ctx, "daily_aggregate", jobTimeout, func(jobCtx context.Context) error {
		startedAt := time.Now()
		defer func() {
			logger.Info("Daily summary job finished", "duration", time.Since(startedAt))
		}()

		// Aggregate the day that contains "one minute ago".
		// NOTE(review): presumably so a run scheduled at (or just after) midnight
		// summarizes the day that just ended — confirm against the cron schedule.
		targetTime := time.Now().Add(-time.Minute)
		return c.aggregateDailySummary(jobCtx, targetTime, false)
	})
}

// AggregateDailySummary builds the daily summary for the calendar day containing
// date (in date's own location). When force is false and the summary table
// already has rows, nothing is done and nil is returned; when force is true the
// existing rows are replaced.
func (c *CronTask) AggregateDailySummary(ctx context.Context, date time.Time, force bool) error {
	return c.aggregateDailySummary(ctx, date, force)
}

// aggregateDailySummary aggregates the hourly snapshot tables of the day that
// contains targetTime into that day's summary table.
//
// Steps, in order:
//  1. Ensure the summary table and the snapshot registry exist.
//  2. Return nil early if the summary already has rows and force is false.
//  3. Collect the day's non-empty hourly snapshot tables; fail if there are none.
//  4. Log the day's totals and, when available, the delta to the previous day
//     (both are informational; failures here only produce warnings).
//  5. Replace the summary contents with the aggregated rows.
//  6. Best-effort post-processing: creation-time backfill, creation/deletion
//     refinement and snapshot registration only log warnings on failure.
//  7. Generate the daily report; a failure here is returned to the caller.
//
// NOTE(review): metrics.RecordDailyAggregation is only called on the
// report-generation failure path and on success; earlier error returns are not
// recorded. Confirm whether that is intentional.
func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Time, force bool) error {
	jobStart := time.Now()
	dayStart := time.Date(targetTime.Year(), targetTime.Month(), targetTime.Day(), 0, 0, 0, 0, targetTime.Location())
	dayEnd := dayStart.AddDate(0, 0, 1)
	dayLabel := dayStart.Format("2006-01-02")

	summaryTable, err := dailySummaryTableName(targetTime)
	if err != nil {
		return err
	}

	dbConn := c.Database.DB()
	if err := db.EnsureSummaryTable(ctx, dbConn, summaryTable); err != nil {
		return err
	}
	if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
		return err
	}

	rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable)
	if err != nil {
		return err
	}
	if rowsExist && !force {
		c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable)
		return nil
	}
	// When rowsExist && force, the table is cleared further down, right before
	// the insert. Clearing it here would destroy the existing summary even if
	// the run then fails for lack of source data.

	hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
	if err != nil {
		return err
	}
	hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd)
	hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
	if len(hourlySnapshots) == 0 {
		return fmt.Errorf("no hourly snapshot tables found for %s", dayLabel)
	}

	hourlyTables := make([]string, 0, len(hourlySnapshots))
	for _, snapshot := range hourlySnapshots {
		hourlyTables = append(hourlyTables, snapshot.TableName)
		// Ensure indexes exist on historical hourly tables for faster aggregation.
		// A failure only costs performance, so it is logged and not returned.
		if err := db.EnsureSnapshotIndexes(ctx, dbConn, snapshot.TableName); err != nil {
			c.Logger.Warn("failed to ensure indexes on hourly table", "table", snapshot.TableName, "error", err)
		}
	}

	unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
	if err != nil {
		return err
	}

	// Totals are informational only: a failure is logged and aggregation continues.
	currentTotals, totalsErr := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery)
	if totalsErr != nil {
		c.Logger.Warn("unable to calculate daily totals", "error", totalsErr, "date", dayLabel)
	} else {
		c.Logger.Info("Daily snapshot totals",
			"date", dayLabel,
			"vm_count", currentTotals.VmCount,
			"vcpu_total", currentTotals.VcpuTotal,
			"ram_total_gb", currentTotals.RamTotal,
			"disk_total_gb", currentTotals.DiskTotal,
		)
	}

	// logPreviousDayDelta logs how today's totals differ from the previous day's.
	// It is purely informational and never fails the job.
	logPreviousDayDelta := func() {
		prevStart := dayStart.AddDate(0, 0, -1)
		prevEnd := dayStart
		prevLabel := prevStart.Format("2006-01-02")

		prevSnapshots, listErr := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", prevStart, prevEnd)
		if listErr != nil {
			c.Logger.Warn("unable to list previous day snapshots", "error", listErr, "date", prevLabel)
			return
		}
		if len(prevSnapshots) == 0 {
			return
		}
		prevSnapshots = filterRecordsInRange(prevSnapshots, prevStart, prevEnd)
		prevSnapshots = filterSnapshotsWithRows(ctx, dbConn, prevSnapshots)
		if len(prevSnapshots) == 0 {
			// Every candidate was filtered out: there is nothing to compare
			// against, so do not attempt a union over zero tables.
			return
		}

		prevTables := make([]string, 0, len(prevSnapshots))
		for _, snapshot := range prevSnapshots {
			prevTables = append(prevTables, snapshot.TableName)
		}
		prevUnion, unionErr := buildUnionQuery(prevTables, summaryUnionColumns, templateExclusionFilter())
		if unionErr != nil {
			c.Logger.Warn("unable to build previous day union", "error", unionErr)
			return
		}
		prevTotals, prevErr := db.SnapshotTotalsForUnion(ctx, dbConn, prevUnion)
		if prevErr != nil {
			c.Logger.Warn("unable to calculate previous day totals", "error", prevErr, "date", prevLabel)
			return
		}
		c.Logger.Info("Daily snapshot comparison",
			"current_date", dayLabel,
			"previous_date", prevLabel,
			"vm_delta", currentTotals.VmCount-prevTotals.VmCount,
			"vcpu_delta", currentTotals.VcpuTotal-prevTotals.VcpuTotal,
			"ram_delta_gb", currentTotals.RamTotal-prevTotals.RamTotal,
			"disk_delta_gb", currentTotals.DiskTotal-prevTotals.DiskTotal,
		)
	}
	// Only compare when today's totals were actually computed; otherwise the
	// deltas would be derived from an invalid currentTotals value.
	if totalsErr == nil {
		logPreviousDayDelta()
	}

	insertQuery, err := db.BuildDailySummaryInsert(summaryTable, unionQuery)
	if err != nil {
		return err
	}

	if rowsExist {
		// Reaching this point with existing rows implies force == true. Clear as
		// late as possible so a failure above leaves the old summary intact.
		if err := clearTable(ctx, dbConn, summaryTable); err != nil {
			return err
		}
	}

	if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
		c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayLabel)
		return err
	}

	// Backfill missing creation times to the start of the day for rows where vCenter had no creation info.
	if _, err := dbConn.ExecContext(ctx,
		`UPDATE `+summaryTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`,
		dayStart.Unix(),
	); err != nil {
		c.Logger.Warn("failed to normalize creation times for daily summary", "error", err, "table", summaryTable)
	}

	if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
		c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
	}

	// A failed count is only logged; the snapshot is still registered with
	// whatever rowCount value TableRowCount returned alongside the error.
	rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
	if err != nil {
		c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
		c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
	}

	if err := c.generateReport(ctx, summaryTable); err != nil {
		c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
		metrics.RecordDailyAggregation(time.Since(jobStart), err)
		return err
	}

	c.Logger.Debug("Finished daily inventory aggregation", "summary_table", summaryTable)
	metrics.RecordDailyAggregation(time.Since(jobStart), nil)
	return nil
}

// dailySummaryTableName returns the summary table name for t's calendar date,
// "inventory_daily_summary_YYYYMMDD", passed through db.SafeTableName.
func dailySummaryTableName(t time.Time) (string, error) {
	return db.SafeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102")))
}