package tasks import ( "context" "fmt" "log/slog" "time" "vctp/db" "vctp/internal/metrics" "vctp/internal/report" ) // RunVcenterMonthlyAggregate summarizes the previous month's daily snapshots. func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.Logger) (err error) { jobTimeout := durationFromSeconds(c.Settings.Values.Settings.MonthlyJobTimeoutSeconds, 20*time.Minute) return c.runAggregateJob(ctx, "monthly_aggregate", jobTimeout, func(jobCtx context.Context) error { startedAt := time.Now() defer func() { logger.Info("Monthly summary job finished", "duration", time.Since(startedAt)) }() now := time.Now() firstOfThisMonth := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location()) targetMonth := firstOfThisMonth.AddDate(0, -1, 0) return c.aggregateMonthlySummary(jobCtx, targetMonth, false) }) } func (c *CronTask) AggregateMonthlySummary(ctx context.Context, month time.Time, force bool) error { return c.aggregateMonthlySummary(ctx, month, force) } func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time.Time, force bool) error { jobStart := time.Now() if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil { return err } monthStart := time.Date(targetMonth.Year(), targetMonth.Month(), 1, 0, 0, 0, 0, targetMonth.Location()) monthEnd := monthStart.AddDate(0, 1, 0) dailySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "daily", "inventory_daily_summary_", "20060102", monthStart, monthEnd) if err != nil { return err } dailySnapshots = filterRecordsInRange(dailySnapshots, monthStart, monthEnd) dbConn := c.Database.DB() db.SetPostgresWorkMem(ctx, dbConn, c.Settings.Values.Settings.PostgresWorkMemMB) dailySnapshots = filterSnapshotsWithRows(ctx, dbConn, dailySnapshots) if len(dailySnapshots) == 0 { return fmt.Errorf("no hourly snapshot tables found for %s", targetMonth.Format("2006-01")) } monthlyTable, err := monthlySummaryTableName(targetMonth) if err != nil { return err } if err := db.EnsureSummaryTable(ctx, dbConn, monthlyTable); err != nil { return err } if rowsExist, err := db.TableHasRows(ctx, dbConn, monthlyTable); err != nil { return err } else if rowsExist && !force { c.Logger.Debug("Monthly summary already exists, skipping aggregation", "summary_table", monthlyTable) return nil } else if rowsExist && force { if err := clearTable(ctx, dbConn, monthlyTable); err != nil { return err } } dailyTables := make([]string, 0, len(dailySnapshots)) for _, snapshot := range dailySnapshots { dailyTables = append(dailyTables, snapshot.TableName) } unionQuery, err := buildUnionQuery(dailyTables, summaryUnionColumns, templateExclusionFilter()) if err != nil { return err } monthlyTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery) if err != nil { c.Logger.Warn("unable to calculate monthly totals", "error", err, "month", targetMonth.Format("2006-01")) } else { c.Logger.Info("Monthly snapshot totals", "month", targetMonth.Format("2006-01"), "vm_count", monthlyTotals.VmCount, "vcpu_total", monthlyTotals.VcpuTotal, "ram_total_gb", monthlyTotals.RamTotal, "disk_total_gb", monthlyTotals.DiskTotal, ) } insertQuery, err := db.BuildMonthlySummaryInsert(monthlyTable, unionQuery) if err != nil { return err } if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil { c.Logger.Error("failed to aggregate monthly inventory", "error", err, "month", targetMonth.Format("2006-01")) return err } // Backfill missing creation times to the start of the month for rows lacking creation info. if _, err := dbConn.ExecContext(ctx, `UPDATE `+monthlyTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`, monthStart.Unix(), ); err != nil { c.Logger.Warn("failed to normalize creation times for monthly summary", "error", err, "table", monthlyTable) } rowCount, err := db.TableRowCount(ctx, dbConn, monthlyTable) if err != nil { c.Logger.Warn("unable to count monthly summary rows", "error", err, "table", monthlyTable) } if err := report.RegisterSnapshot(ctx, c.Database, "monthly", monthlyTable, targetMonth, rowCount); err != nil { c.Logger.Warn("failed to register monthly snapshot", "error", err, "table", monthlyTable) } db.AnalyzeTableIfPostgres(ctx, dbConn, monthlyTable) if err := c.generateReport(ctx, monthlyTable); err != nil { c.Logger.Warn("failed to generate monthly report", "error", err, "table", monthlyTable) metrics.RecordMonthlyAggregation(time.Since(jobStart), err) return err } c.Logger.Debug("Finished monthly inventory aggregation", "summary_table", monthlyTable) metrics.RecordMonthlyAggregation(time.Since(jobStart), nil) return nil } func monthlySummaryTableName(t time.Time) (string, error) { return db.SafeTableName(fmt.Sprintf("inventory_monthly_summary_%s", t.Format("200601"))) }