From 374d4921e1091c6f8c9815a11f2f7de49626e7fb Mon Sep 17 00:00:00 2001
From: Nathan Coad
Date: Thu, 22 Jan 2026 12:04:41 +1100
Subject: [PATCH] update aggregation jobs

---
 README.md                            |  2 +-
 internal/settings/settings.go        |  1 +
 internal/tasks/dailyAggregate.go     | 69 ++++++++++++++++++++++++++++
 internal/tasks/inventorySnapshots.go | 15 ++++++
 internal/tasks/monthlyAggregate.go   | 63 ++++++++++++++++++-------
 server/handler/snapshotAggregate.go  | 26 ++++++++++++++++++++++++-
 src/vctp.yml                         |  1 +
 7 files changed, 158 insertions(+), 19 deletions(-)

diff --git a/README.md b/README.md
index eb80478..81dee66 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@ vCTP is a vSphere Chargeback Tracking Platform, designed for a specific customer
 
 ## Snapshots and Reports
 - Hourly snapshots capture inventory per vCenter (concurrency via `hourly_snapshot_concurrency`).
-- Daily summaries aggregate the hourly snapshots for the day; monthly summaries aggregate daily summaries for the month.
+- Daily summaries aggregate the hourly snapshots for the day; monthly summaries aggregate either the month's hourly snapshots (the default) or its daily summaries, controlled by `monthly_aggregation_granularity`.
 - Snapshots are registered in `snapshot_registry` so regeneration via `/api/snapshots/aggregate` can locate the correct tables (fallback scanning is also supported).
 - Reports (XLSX with totals/charts) are generated automatically after hourly, daily, and monthly jobs and written to a reports directory.
 - Prometheus metrics are exposed at `/metrics`:
diff --git a/internal/settings/settings.go b/internal/settings/settings.go
index d3a1286..8d52736 100644
--- a/internal/settings/settings.go
+++ b/internal/settings/settings.go
@@ -47,6 +47,7 @@ type SettingsYML struct {
 	HourlySnapshotMaxRetries      int      `yaml:"hourly_snapshot_max_retries"`
 	DailyJobTimeoutSeconds        int      `yaml:"daily_job_timeout_seconds"`
 	MonthlyJobTimeoutSeconds      int      `yaml:"monthly_job_timeout_seconds"`
+	MonthlyAggregationGranularity string   `yaml:"monthly_aggregation_granularity"`
 	CleanupJobTimeoutSeconds      int      `yaml:"cleanup_job_timeout_seconds"`
 	TenantsToFilter               []string `yaml:"tenants_to_filter"`
 	NodeChargeClusters            []string `yaml:"node_charge_clusters"`
diff --git a/internal/tasks/dailyAggregate.go b/internal/tasks/dailyAggregate.go
index 109afa5..6ba3b39 100644
--- a/internal/tasks/dailyAggregate.go
+++ b/internal/tasks/dailyAggregate.go
@@ -12,6 +12,7 @@ import (
 	"sync"
 	"time"
 	"vctp/db"
+	"vctp/db/queries"
 	"vctp/internal/metrics"
 	"vctp/internal/report"
 )
@@ -264,6 +265,9 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd
 		sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] })
 	}
 
+	inventoryDeletions := c.applyInventoryDeletions(ctx, aggMap)
+	c.Logger.Info("Daily aggregation deletion times", "source_inventory", inventoryDeletions)
+
 	// Get the first hourly snapshot on/after dayEnd to help confirm deletions that happen on the last snapshot of the day.
 	var nextSnapshotTable string
 	nextSnapshotRows, nextErr := c.Database.DB().QueryxContext(ctx, `
@@ -308,7 +312,11 @@ LIMIT 1
 		maxSnap = snapTimes[len(snapTimes)-1]
 	}
 
+	inferredDeletions := 0
 	for _, v := range aggMap {
+		if v.deletion != 0 {
+			continue
+		}
 		// Infer deletion only after seeing at least two consecutive absent snapshots after lastSeen.
 		if maxSnap > 0 && len(v.seen) > 0 && v.lastSeen < maxSnap {
 			c.Logger.Debug("inferring deletion window", "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "last_seen", v.lastSeen, "snapshots", len(snapTimes))
@@ -330,6 +338,7 @@ LIMIT 1
 			}
 			if consecutiveMisses >= 2 {
 				v.deletion = firstMiss
+				inferredDeletions++
 				break
 			}
 		}
@@ -341,6 +350,7 @@ LIMIT 1
 				_, presentByName := nextPresence["name:"+v.key.Name]
 				if !presentByID && !presentByUUID && !presentByName {
 					v.deletion = firstMiss
+					inferredDeletions++
 					c.Logger.Debug("cross-day deletion inferred from next snapshot", "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "deletion", firstMiss, "next_table", nextSnapshotTable)
 				}
 			}
@@ -349,6 +359,7 @@ LIMIT 1
 			}
 		}
 	}
+	c.Logger.Info("Daily aggregation deletion times", "source_inferred", inferredDeletions)
 
 	// Insert aggregated rows.
 	if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples); err != nil {
@@ -392,6 +403,61 @@ LIMIT 1
 	return nil
 }
 
+func (c *CronTask) applyInventoryDeletions(ctx context.Context, agg map[dailyAggKey]*dailyAggVal) int {
+	dbConn := c.Database.DB()
+	vcenters := make(map[string]struct{}, 8)
+	for k := range agg {
+		if k.Vcenter != "" {
+			vcenters[k.Vcenter] = struct{}{}
+		}
+	}
+	totalApplied := 0
+	for vcenter := range vcenters {
+		inventoryRows, err := queries.New(dbConn).GetInventoryByVcenter(ctx, vcenter)
+		if err != nil {
+			c.Logger.Warn("failed to load inventory for daily deletion times", "vcenter", vcenter, "error", err)
+			continue
+		}
+		byID := make(map[string]int64, len(inventoryRows))
+		byUUID := make(map[string]int64, len(inventoryRows))
+		byName := make(map[string]int64, len(inventoryRows))
+		for _, inv := range inventoryRows {
+			if !inv.DeletionTime.Valid || inv.DeletionTime.Int64 <= 0 {
+				continue
+			}
+			if inv.VmId.Valid && strings.TrimSpace(inv.VmId.String) != "" {
+				byID[strings.TrimSpace(inv.VmId.String)] = inv.DeletionTime.Int64
+			}
+			if inv.VmUuid.Valid && strings.TrimSpace(inv.VmUuid.String) != "" {
+				byUUID[strings.TrimSpace(inv.VmUuid.String)] = inv.DeletionTime.Int64
+			}
+			if strings.TrimSpace(inv.Name) != "" {
+				byName[strings.ToLower(strings.TrimSpace(inv.Name))] = inv.DeletionTime.Int64
+			}
+		}
+		for k, v := range agg {
+			if v.deletion != 0 || k.Vcenter != vcenter {
+				continue
+			}
+			if ts, ok := byID[k.VmId]; ok {
+				v.deletion = ts
+				totalApplied++
+				continue
+			}
+			if ts, ok := byUUID[k.VmUuid]; ok {
+				v.deletion = ts
+				totalApplied++
+				continue
+			}
+			if ts, ok := byName[strings.ToLower(k.Name)]; ok {
+				v.deletion = ts
+				totalApplied++
+			}
+		}
+	}
+	return totalApplied
+}
+
 func (c *CronTask) scanHourlyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) {
 	agg := make(map[dailyAggKey]*dailyAggVal, 1024)
 	mu := sync.Mutex{}
@@ -555,6 +621,9 @@ FROM %s
 			goldHits:  hitGold,
 			seen:      map[int64]struct{}{int64OrZero(snapshotTime): {}},
 		}
+		if deletion.Valid && deletion.Int64 > 0 {
+			row.deletion = deletion.Int64
+		}
 		out[key] = row
 	}
 	return out, nil
diff --git a/internal/tasks/inventorySnapshots.go b/internal/tasks/inventorySnapshots.go
index 5ce6cd1..9c64ece 100644
--- a/internal/tasks/inventorySnapshots.go
+++ b/internal/tasks/inventorySnapshots.go
@@ -317,6 +317,13 @@ func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger)
 	hourlyCutoff := now.AddDate(0, 0, -hourlyMaxDays)
 	dailyCutoff := now.AddDate(0, -dailyMaxMonths, 0)
 
+	logger.Info("Starting snapshot cleanup",
+		"now", now,
+		"hourly_cutoff", truncateDate(hourlyCutoff),
"daily_cutoff", truncateDate(dailyCutoff), + "hourly_max_age_days", hourlyMaxDays, + "daily_max_age_months", dailyMaxMonths, + ) dbConn := c.Database.DB() @@ -324,8 +331,10 @@ func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) if err != nil { return err } + logger.Info("Snapshot cleanup hourly scan", "tables_found", len(hourlyTables)) removedHourly := 0 + scannedHourly := 0 for _, table := range hourlyTables { if strings.HasPrefix(table, "inventory_daily_summary_") { continue @@ -334,6 +343,7 @@ func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) if !ok { continue } + scannedHourly++ if tableDate.Before(truncateDate(hourlyCutoff)) { if err := dropSnapshotTable(ctx, dbConn, table); err != nil { c.Logger.Error("failed to drop hourly snapshot table", "error", err, "table", table) @@ -350,12 +360,15 @@ func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) if err != nil { return err } + logger.Info("Snapshot cleanup daily scan", "tables_found", len(dailyTables)) removedDaily := 0 + scannedDaily := 0 for _, table := range dailyTables { tableDate, ok := parseSnapshotDate(table, "inventory_daily_summary_", "20060102") if !ok { continue } + scannedDaily++ if tableDate.Before(truncateDate(dailyCutoff)) { if err := dropSnapshotTable(ctx, dbConn, table); err != nil { c.Logger.Error("failed to drop daily snapshot table", "error", err, "table", table) @@ -369,6 +382,8 @@ func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) } c.Logger.Info("Finished snapshot cleanup", + "hourly_tables_scanned", scannedHourly, + "daily_tables_scanned", scannedDaily, "removed_hourly_tables", removedHourly, "removed_daily_tables", removedDaily, "hourly_max_age_days", hourlyMaxDays, diff --git a/internal/tasks/monthlyAggregate.go b/internal/tasks/monthlyAggregate.go index 535ae89..7c9aa39 100644 --- a/internal/tasks/monthlyAggregate.go +++ b/internal/tasks/monthlyAggregate.go @@ -40,19 +40,43 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time return err } + granularity := strings.ToLower(strings.TrimSpace(c.Settings.Values.Settings.MonthlyAggregationGranularity)) + if granularity == "" { + granularity = "hourly" + } + if granularity != "hourly" && granularity != "daily" { + c.Logger.Warn("unknown monthly aggregation granularity; defaulting to hourly", "granularity", granularity) + granularity = "hourly" + } + monthStart := time.Date(targetMonth.Year(), targetMonth.Month(), 1, 0, 0, 0, 0, targetMonth.Location()) monthEnd := monthStart.AddDate(0, 1, 0) - dailySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "daily", "inventory_daily_summary_", "20060102", monthStart, monthEnd) - if err != nil { - return err - } - dailySnapshots = filterRecordsInRange(dailySnapshots, monthStart, monthEnd) - dbConn := c.Database.DB() db.SetPostgresWorkMem(ctx, dbConn, c.Settings.Values.Settings.PostgresWorkMemMB) - dailySnapshots = filterSnapshotsWithRows(ctx, dbConn, dailySnapshots) - if len(dailySnapshots) == 0 { - return fmt.Errorf("no hourly snapshot tables found for %s", targetMonth.Format("2006-01")) + + var snapshots []report.SnapshotRecord + var unionColumns []string + if granularity == "daily" { + dailySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "daily", "inventory_daily_summary_", "20060102", monthStart, monthEnd) + if err != nil { + return err + } + dailySnapshots = filterRecordsInRange(dailySnapshots, monthStart, monthEnd) + dailySnapshots = 
+		snapshots = dailySnapshots
+		unionColumns = monthlyUnionColumns
+	} else {
+		hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", monthStart, monthEnd)
+		if err != nil {
+			return err
+		}
+		hourlySnapshots = filterRecordsInRange(hourlySnapshots, monthStart, monthEnd)
+		hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
+		snapshots = hourlySnapshots
+		unionColumns = summaryUnionColumns
+	}
+	if len(snapshots) == 0 {
+		return fmt.Errorf("no %s snapshot tables found for %s", granularity, targetMonth.Format("2006-01"))
 	}
 
 	monthlyTable, err := monthlySummaryTableName(targetMonth)
@@ -75,22 +99,24 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
 	}
 
 	// Optional Go-based aggregation path.
-	if os.Getenv("MONTHLY_AGG_GO") == "1" {
+	if os.Getenv("MONTHLY_AGG_GO") == "1" && granularity == "daily" {
 		c.Logger.Debug("Using go implementation of monthly aggregation")
-		if err := c.aggregateMonthlySummaryGo(ctx, monthStart, monthEnd, monthlyTable, dailySnapshots); err != nil {
+		if err := c.aggregateMonthlySummaryGo(ctx, monthStart, monthEnd, monthlyTable, snapshots); err != nil {
 			c.Logger.Warn("go-based monthly aggregation failed, falling back to SQL path", "error", err)
 		} else {
 			metrics.RecordMonthlyAggregation(time.Since(jobStart), nil)
 			c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", monthlyTable)
 			return nil
 		}
+	} else if os.Getenv("MONTHLY_AGG_GO") == "1" && granularity != "daily" {
+		c.Logger.Warn("MONTHLY_AGG_GO is set but only daily granularity supports Go aggregation; using SQL path", "granularity", granularity)
 	}
 
-	dailyTables := make([]string, 0, len(dailySnapshots))
-	for _, snapshot := range dailySnapshots {
-		dailyTables = append(dailyTables, snapshot.TableName)
+	tables := make([]string, 0, len(snapshots))
+	for _, snapshot := range snapshots {
+		tables = append(tables, snapshot.TableName)
 	}
-	unionQuery, err := buildUnionQuery(dailyTables, monthlyUnionColumns, templateExclusionFilter())
+	unionQuery, err := buildUnionQuery(tables, unionColumns, templateExclusionFilter())
 	if err != nil {
 		return err
 	}
@@ -108,7 +134,12 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
 		)
 	}
 
-	insertQuery, err := db.BuildMonthlySummaryInsert(monthlyTable, unionQuery)
+	var insertQuery string
+	if granularity == "daily" {
+		insertQuery, err = db.BuildMonthlySummaryInsert(monthlyTable, unionQuery)
+	} else {
+		insertQuery, err = db.BuildDailySummaryInsert(monthlyTable, unionQuery)
+	}
 	if err != nil {
 		return err
 	}
diff --git a/server/handler/snapshotAggregate.go b/server/handler/snapshotAggregate.go
index 682d0ab..d2a83f7 100644
--- a/server/handler/snapshotAggregate.go
+++ b/server/handler/snapshotAggregate.go
@@ -6,6 +6,7 @@ import (
 	"net/http"
 	"strings"
 	"time"
+	"vctp/internal/settings"
 	"vctp/internal/tasks"
 )
 
@@ -16,6 +17,7 @@
 // @Produce json
 // @Param type query string true "Aggregation type: daily or monthly"
 // @Param date query string true "Daily date (YYYY-MM-DD) or monthly date (YYYY-MM)"
+// @Param granularity query string false "Monthly aggregation granularity: hourly or daily"
 // @Success 200 {object} map[string]string "Aggregation complete"
 // @Failure 400 {object} map[string]string "Invalid request"
 // @Failure 500 {object} map[string]string "Server error"
@@ -23,6 +25,7 @@
 func (h *Handler) SnapshotAggregateForce(w http.ResponseWriter, r *http.Request) {
 	snapshotType := strings.ToLower(strings.TrimSpace(r.URL.Query().Get("type")))
 	dateValue := strings.TrimSpace(r.URL.Query().Get("date"))
+	granularity := strings.ToLower(strings.TrimSpace(r.URL.Query().Get("granularity")))
 	startedAt := time.Now()
 	loc := time.Now().Location()
 
@@ -35,11 +38,29 @@ func (h *Handler) SnapshotAggregateForce(w http.ResponseWriter, r *http.Request)
 		return
 	}
 
+	if granularity != "" && snapshotType != "monthly" {
+		h.Logger.Warn("Snapshot aggregation granularity supplied for non-monthly request",
+			"type", snapshotType,
+			"granularity", granularity,
+		)
+		writeJSONError(w, http.StatusBadRequest, "granularity is only supported for monthly aggregation")
+		return
+	}
+	if granularity != "" && granularity != "hourly" && granularity != "daily" {
+		h.Logger.Warn("Snapshot aggregation invalid granularity", "granularity", granularity)
+		writeJSONError(w, http.StatusBadRequest, "granularity must be hourly or daily")
+		return
+	}
+
 	ctx := context.Background()
+	settingsCopy := *h.Settings.Values
+	if granularity != "" {
+		settingsCopy.Settings.MonthlyAggregationGranularity = granularity
+	}
 	ct := &tasks.CronTask{
 		Logger:   h.Logger,
 		Database: h.Database,
-		Settings: h.Settings,
+		Settings: &settings.Settings{Logger: h.Logger, SettingsPath: h.Settings.SettingsPath, Values: &settingsCopy},
 	}
 
 	switch snapshotType {
@@ -63,7 +84,7 @@ func (h *Handler) SnapshotAggregateForce(w http.ResponseWriter, r *http.Request)
 			writeJSONError(w, http.StatusBadRequest, "date must be YYYY-MM")
 			return
 		}
-		h.Logger.Info("Starting monthly snapshot aggregation", "date", parsed.Format("2006-01"), "force", true)
+		h.Logger.Info("Starting monthly snapshot aggregation", "date", parsed.Format("2006-01"), "force", true, "granularity", granularity)
 		if err := ct.AggregateMonthlySummary(ctx, parsed, true); err != nil {
 			h.Logger.Error("Monthly snapshot aggregation failed", "date", parsed.Format("2006-01"), "error", err)
 			writeJSONError(w, http.StatusInternalServerError, err.Error())
@@ -78,6 +99,7 @@ func (h *Handler) SnapshotAggregateForce(w http.ResponseWriter, r *http.Request)
 	h.Logger.Info("Snapshot aggregation completed",
 		"type", snapshotType,
 		"date", dateValue,
+		"granularity", granularity,
 		"duration", time.Since(startedAt),
 	)
 	w.Header().Set("Content-Type", "application/json")
diff --git a/src/vctp.yml b/src/vctp.yml
index eaffdd6..202a135 100644
--- a/src/vctp.yml
+++ b/src/vctp.yml
@@ -26,6 +26,7 @@ settings:
   hourly_snapshot_timeout_seconds: 600
   daily_job_timeout_seconds: 900
   monthly_job_timeout_seconds: 1200
+  monthly_aggregation_granularity: "hourly"
   cleanup_job_timeout_seconds: 600
   tenants_to_filter:
   node_charge_clusters:
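For reviewers, a minimal client sketch of the new per-request `granularity` override on `/api/snapshots/aggregate`. The endpoint path comes from the README and the `type`, `date`, and `granularity` query parameters come from the handler in this patch; the HTTP method, host, and port below are assumptions, not part of the change.

```go
// Illustrative sketch only: force a monthly aggregation at daily granularity.
// The base URL and HTTP method are assumptions; the query parameters match
// the SnapshotAggregateForce handler added/extended in this patch.
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	q := url.Values{}
	q.Set("type", "monthly")      // "daily" or "monthly"
	q.Set("date", "2026-01")      // YYYY-MM is required for monthly aggregation
	q.Set("granularity", "daily") // per-request override of monthly_aggregation_granularity

	// Placeholder address; substitute the real vCTP server.
	endpoint := "http://localhost:8080/api/snapshots/aggregate?" + q.Encode()

	resp, err := http.Post(endpoint, "application/json", nil)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body)) // the handler responds with JSON
}
```

When `granularity` is omitted, both the scheduled job and this endpoint fall back to `monthly_aggregation_granularity` from vctp.yml, which this patch ships as "hourly"; the same value is also the in-code default for an empty or unrecognized setting.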