From 13adc159a2f3b3ed908c975b584817168cf214a3 Mon Sep 17 00:00:00 2001 From: Nathan Coad Date: Thu, 22 Jan 2026 20:50:29 +1100 Subject: [PATCH] more accurate deletion times in aggregations --- db/helpers.go | 47 ++++++++++++ internal/tasks/dailyAggregate.go | 112 +++++++++++++++++++++++++++-- internal/tasks/monthlyAggregate.go | 11 +++ 3 files changed, 165 insertions(+), 5 deletions(-) diff --git a/db/helpers.go b/db/helpers.go index 3ca582b..c0166e4 100644 --- a/db/helpers.go +++ b/db/helpers.go @@ -542,6 +542,53 @@ func MarkVmDeleted(ctx context.Context, dbConn *sqlx.DB, vcenter, vmID, vmUUID s return MarkVmDeletedWithDetails(ctx, dbConn, vcenter, vmID, vmUUID, "", "", deletedAt) }
+
+// ApplyLifecycleDeletionToSummary copies deletion timestamps from
+// vm_lifecycle_cache into the "DeletionTime" column of summaryTable.
+//
+// Only cache rows whose "DeletedAt" lies in the half-open window [start, end)
+// are considered. A summary row is paired with a cache row of the same
+// "Vcenter" when their "VmId", "VmUuid" or "Name" agree. Blank identifiers
+// never match, and names are compared trimmed and case-insensitively, which is
+// how applyLifecycleDeletions matches names in the Go aggregation path. The
+// earliest matching "DeletedAt" is written, and an existing "DeletionTime" is
+// only replaced by an earlier value.
+//
+// It returns the number of rows the UPDATE reports as affected.
+func ApplyLifecycleDeletionToSummary(ctx context.Context, dbConn *sqlx.DB, summaryTable string, start, end int64) (int64, error) {
+	if err := ValidateTableName(summaryTable); err != nil {
+		return 0, err
+	}
+	if err := EnsureVmLifecycleCache(ctx, dbConn); err != nil {
+		return 0, fmt.Errorf("ensuring vm_lifecycle_cache: %w", err)
+	}
+
+	// Both sub-selects need the same row-matching predicate, so it is built once.
+	// summaryTable is interpolated only after ValidateTableName accepted it.
+	// The `<> ''` guards matter: without them two blank identifiers compare
+	// equal and unrelated VMs of one vCenter would share a deletion time.
+	match := fmt.Sprintf(`l."Vcenter" = %[1]s."Vcenter"
+    AND l."DeletedAt" IS NOT NULL AND l."DeletedAt" > 0
+    AND l."DeletedAt" >= ? AND l."DeletedAt" < ?
+    AND (
+      (TRIM(l."VmId") <> '' AND l."VmId" = %[1]s."VmId")
+      OR (TRIM(l."VmUuid") <> '' AND l."VmUuid" = %[1]s."VmUuid")
+      OR (TRIM(l."Name") <> '' AND LOWER(TRIM(l."Name")) = LOWER(TRIM(%[1]s."Name")))
+    )`, summaryTable)
+
+	query := fmt.Sprintf(`
+UPDATE %[1]s
+SET "DeletionTime" = (
+  SELECT MIN(l."DeletedAt")
+  FROM vm_lifecycle_cache l
+  WHERE %[2]s
+)
+WHERE EXISTS (
+  SELECT 1 FROM vm_lifecycle_cache l
+  WHERE %[2]s
+    AND (%[1]s."DeletionTime" IS NULL OR %[1]s."DeletionTime" = 0 OR l."DeletedAt" < %[1]s."DeletionTime")
+);
+`, summaryTable, match)
+
+	// The predicate appears twice, so the window bounds are bound twice, in order.
+	res, err := execLog(ctx, dbConn, dbConn.Rebind(query), start, end, start, end)
+	if err != nil {
+		return 0, fmt.Errorf("applying lifecycle deletions to %s: %w", summaryTable, err)
+	}
+	updated, err := res.RowsAffected()
+	if err != nil {
+		return 0, fmt.Errorf("counting rows updated in %s: %w", summaryTable, err)
+	}
+	return updated, nil
+}
+
 // UpsertVmDailyRollup writes/updates a daily rollup row. func UpsertVmDailyRollup(ctx context.Context, dbConn *sqlx.DB, day int64, v VmDailyRollupRow) error { if err := EnsureVmDailyRollup(ctx, dbConn); err != nil { diff --git a/internal/tasks/dailyAggregate.go b/internal/tasks/dailyAggregate.go index e51abaa..1bb513c 100644 --- a/internal/tasks/dailyAggregate.go +++ b/internal/tasks/dailyAggregate.go @@ -150,6 +150,11 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayStart.Format("2006-01-02")) return err } + if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, summaryTable, dayStart.Unix(), dayEnd.Unix()); err != nil { + c.Logger.Warn("failed to apply lifecycle deletions to daily summary", "error", err, "table", summaryTable) + } else { + c.Logger.Info("Daily aggregation deletion times", "source_lifecycle_cache", applied) + } // Backfill missing creation times to the start of the day for rows where vCenter had no creation info.
if _, err := dbConn.ExecContext(ctx, `UPDATE `+summaryTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`, @@ -265,7 +270,10 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] }) } - inventoryDeletions := c.applyInventoryDeletions(ctx, aggMap) + lifecycleDeletions := c.applyLifecycleDeletions(ctx, aggMap, dayStart, dayEnd) + c.Logger.Info("Daily aggregation deletion times", "source_lifecycle_cache", lifecycleDeletions) + + inventoryDeletions := c.applyInventoryDeletions(ctx, aggMap, dayStart, dayEnd) c.Logger.Info("Daily aggregation deletion times", "source_inventory", inventoryDeletions) // Get the first hourly snapshot on/after dayEnd to help confirm deletions that happen on the last snapshot of the day. @@ -405,7 +413,98 @@ LIMIT 1 return nil } -func (c *CronTask) applyInventoryDeletions(ctx context.Context, agg map[dailyAggKey]*dailyAggVal) int {
+// applyLifecycleDeletions stamps deletion times from vm_lifecycle_cache onto
+// the in-memory daily aggregates in agg.
+//
+// For each vCenter present in agg it loads the cache rows whose "DeletedAt"
+// falls in [start, end) and matches every row by VM id, then UUID, then
+// lower-cased, trimmed name. dailyAggKey carries VmId, VmUuid and Name
+// together, so more than one key can share an identifier; all aggregates
+// sharing the matched identifier are updated. An existing deletion time is
+// only replaced by an earlier one.
+//
+// Query and scan failures are logged and skipped rather than returned. The
+// result is the number of cache rows that matched at least one aggregate; it
+// is 0 when the vm_lifecycle_cache table does not exist.
+func (c *CronTask) applyLifecycleDeletions(ctx context.Context, agg map[dailyAggKey]*dailyAggVal, start, end time.Time) int {
+	dbConn := c.Database.DB()
+	if !db.TableExists(ctx, dbConn, "vm_lifecycle_cache") {
+		return 0
+	}
+
+	// Each identifier maps to every aggregate carrying it. Keeping a single
+	// pointer per identifier would let map iteration order decide which
+	// aggregate receives the deletion time.
+	type aggIndex struct {
+		byID   map[string][]*dailyAggVal
+		byUUID map[string][]*dailyAggVal
+		byName map[string][]*dailyAggVal
+	}
+	indexes := make(map[string]*aggIndex, 8)
+	for k, v := range agg {
+		if k.Vcenter == "" {
+			continue
+		}
+		idx := indexes[k.Vcenter]
+		if idx == nil {
+			idx = &aggIndex{
+				byID:   make(map[string][]*dailyAggVal),
+				byUUID: make(map[string][]*dailyAggVal),
+				byName: make(map[string][]*dailyAggVal),
+			}
+			indexes[k.Vcenter] = idx
+		}
+		if k.VmId != "" {
+			idx.byID[k.VmId] = append(idx.byID[k.VmId], v)
+		}
+		if k.VmUuid != "" {
+			idx.byUUID[k.VmUuid] = append(idx.byUUID[k.VmUuid], v)
+		}
+		if name := strings.ToLower(strings.TrimSpace(k.Name)); name != "" {
+			idx.byName[name] = append(idx.byName[name], v)
+		}
+	}
+
+	// The statement and the window bounds are the same for every vCenter.
+	query := dbConn.Rebind(`
+SELECT "VmId","VmUuid","Name","DeletedAt"
+FROM vm_lifecycle_cache
+WHERE "Vcenter" = ? AND "DeletedAt" IS NOT NULL AND "DeletedAt" > 0 AND "DeletedAt" >= ? AND "DeletedAt" < ?
+`)
+	startUnix, endUnix := start.Unix(), end.Unix()
+
+	// A closure (instead of inline loop code) lets the deferred rows.Close()
+	// run at the end of each vCenter rather than when the whole function returns.
+	applyForVcenter := func(vcenter string, idx *aggIndex) int {
+		rows, err := dbConn.QueryxContext(ctx, query, vcenter, startUnix, endUnix)
+		if err != nil {
+			c.Logger.Warn("failed to load lifecycle deletions", "vcenter", vcenter, "error", err)
+			return 0
+		}
+		defer rows.Close()
+
+		scanned, applied, missed := 0, 0, 0
+		for rows.Next() {
+			scanned++
+			var vmID, vmUUID, name sql.NullString
+			var deletedAt sql.NullInt64
+			if err := rows.Scan(&vmID, &vmUUID, &name, &deletedAt); err != nil {
+				c.Logger.Warn("failed to scan lifecycle deletion", "vcenter", vcenter, "error", err)
+				continue
+			}
+			if !deletedAt.Valid || deletedAt.Int64 <= 0 {
+				continue
+			}
+
+			// Fall through id -> uuid -> name; the first identifier with a hit wins.
+			var targets []*dailyAggVal
+			if vmID.Valid {
+				targets = idx.byID[strings.TrimSpace(vmID.String)]
+			}
+			if len(targets) == 0 && vmUUID.Valid {
+				targets = idx.byUUID[strings.TrimSpace(vmUUID.String)]
+			}
+			if len(targets) == 0 && name.Valid {
+				targets = idx.byName[strings.ToLower(strings.TrimSpace(name.String))]
+			}
+			if len(targets) == 0 {
+				missed++
+				continue
+			}
+			for _, target := range targets {
+				if target.deletion == 0 || deletedAt.Int64 < target.deletion {
+					target.deletion = deletedAt.Int64
+				}
+			}
+			applied++
+		}
+		if err := rows.Err(); err != nil {
+			c.Logger.Warn("failed to read lifecycle deletions", "vcenter", vcenter, "error", err)
+		}
+		c.Logger.Debug("lifecycle cache deletions applied", "vcenter", vcenter, "window_start", start, "window_end", end, "scanned", scanned, "applied", applied, "missed", missed)
+		return applied
+	}
+
+	totalApplied := 0
+	for vcenter, idx := range indexes {
+		totalApplied += applyForVcenter(vcenter, idx)
+	}
+	return totalApplied
+}
+
+func (c *CronTask) applyInventoryDeletions(ctx context.Context, agg map[dailyAggKey]*dailyAggVal, start, end time.Time) int { dbConn := c.Database.DB() vcenters := make(map[string]struct{}, 8) for k := range agg { @@ -427,6 +526,9 @@ func (c *CronTask) applyInventoryDeletions(ctx context.Context, agg map[dailyAgg if !inv.DeletionTime.Valid ||
inv.DeletionTime.Int64 <= 0 { continue } + if inv.DeletionTime.Int64 < start.Unix() || inv.DeletionTime.Int64 >= end.Unix() { + continue + } if inv.VmId.Valid && strings.TrimSpace(inv.VmId.String) != "" { byID[strings.TrimSpace(inv.VmId.String)] = inv.DeletionTime.Int64 } @@ -442,21 +544,21 @@ func (c *CronTask) applyInventoryDeletions(ctx context.Context, agg map[dailyAgg continue } if ts, ok := byID[k.VmId]; ok { - if v.deletion != ts { + if v.deletion == 0 || ts < v.deletion { v.deletion = ts } totalApplied++ continue } if ts, ok := byUUID[k.VmUuid]; ok { - if v.deletion != ts { + if v.deletion == 0 || ts < v.deletion { v.deletion = ts } totalApplied++ continue } if ts, ok := byName[strings.ToLower(k.Name)]; ok { - if v.deletion != ts { + if v.deletion == 0 || ts < v.deletion { v.deletion = ts } totalApplied++ diff --git a/internal/tasks/monthlyAggregate.go b/internal/tasks/monthlyAggregate.go index 7c9aa39..5d110b0 100644 --- a/internal/tasks/monthlyAggregate.go +++ b/internal/tasks/monthlyAggregate.go @@ -148,6 +148,11 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time c.Logger.Error("failed to aggregate monthly inventory", "error", err, "month", targetMonth.Format("2006-01")) return err } + if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, monthlyTable, monthStart.Unix(), monthEnd.Unix()); err != nil { + c.Logger.Warn("failed to apply lifecycle deletions to monthly summary", "error", err, "table", monthlyTable) + } else { + c.Logger.Info("Monthly aggregation deletion times", "source_lifecycle_cache", applied) + } // Backfill missing creation times to the start of the month for rows lacking creation info. 
if _, err := dbConn.ExecContext(ctx, `UPDATE `+monthlyTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`, @@ -220,6 +225,12 @@ func (c *CronTask) aggregateMonthlySummaryGo(ctx context.Context, monthStart, mo return err } + if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, summaryTable, monthStart.Unix(), monthEnd.Unix()); err != nil { + c.Logger.Warn("failed to apply lifecycle deletions to monthly summary (Go)", "error", err, "table", summaryTable) + } else { + c.Logger.Info("Monthly aggregation deletion times", "source_lifecycle_cache", applied) + } + if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil { c.Logger.Warn("failed to refine creation/deletion times (monthly Go)", "error", err, "table", summaryTable) }