more accurate deletion times in aggregations
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2026-01-22 20:50:29 +11:00
parent c8f04efd51
commit 13adc159a2
3 changed files with 165 additions and 5 deletions

View File

@@ -542,6 +542,53 @@ func MarkVmDeleted(ctx context.Context, dbConn *sqlx.DB, vcenter, vmID, vmUUID s
return MarkVmDeletedWithDetails(ctx, dbConn, vcenter, vmID, vmUUID, "", "", deletedAt) return MarkVmDeletedWithDetails(ctx, dbConn, vcenter, vmID, vmUUID, "", "", deletedAt)
} }
// ApplyLifecycleDeletionToSummary updates DeletionTime values in a summary table from vm_lifecycle_cache.
func ApplyLifecycleDeletionToSummary(ctx context.Context, dbConn *sqlx.DB, summaryTable string, start, end int64) (int64, error) {
if err := ValidateTableName(summaryTable); err != nil {
return 0, err
}
if err := EnsureVmLifecycleCache(ctx, dbConn); err != nil {
return 0, err
}
query := fmt.Sprintf(`
UPDATE %[1]s
SET "DeletionTime" = (
SELECT MIN(l."DeletedAt")
FROM vm_lifecycle_cache l
WHERE l."Vcenter" = %[1]s."Vcenter"
AND l."DeletedAt" IS NOT NULL AND l."DeletedAt" > 0
AND l."DeletedAt" >= ? AND l."DeletedAt" < ?
AND (
(l."VmId" IS NOT NULL AND %[1]s."VmId" IS NOT NULL AND l."VmId" = %[1]s."VmId")
OR (l."VmUuid" IS NOT NULL AND %[1]s."VmUuid" IS NOT NULL AND l."VmUuid" = %[1]s."VmUuid")
OR (l."Name" IS NOT NULL AND %[1]s."Name" IS NOT NULL AND l."Name" = %[1]s."Name")
)
)
WHERE EXISTS (
SELECT 1 FROM vm_lifecycle_cache l
WHERE l."Vcenter" = %[1]s."Vcenter"
AND l."DeletedAt" IS NOT NULL AND l."DeletedAt" > 0
AND l."DeletedAt" >= ? AND l."DeletedAt" < ?
AND (
(l."VmId" IS NOT NULL AND %[1]s."VmId" IS NOT NULL AND l."VmId" = %[1]s."VmId")
OR (l."VmUuid" IS NOT NULL AND %[1]s."VmUuid" IS NOT NULL AND l."VmUuid" = %[1]s."VmUuid")
OR (l."Name" IS NOT NULL AND %[1]s."Name" IS NOT NULL AND l."Name" = %[1]s."Name")
)
AND (%[1]s."DeletionTime" IS NULL OR %[1]s."DeletionTime" = 0 OR l."DeletedAt" < %[1]s."DeletionTime")
);
`, summaryTable)
bind := dbConn.Rebind(query)
res, err := execLog(ctx, dbConn, bind, start, end, start, end)
if err != nil {
return 0, err
}
rows, err := res.RowsAffected()
if err != nil {
return 0, err
}
return rows, nil
}
// UpsertVmDailyRollup writes/updates a daily rollup row. // UpsertVmDailyRollup writes/updates a daily rollup row.
func UpsertVmDailyRollup(ctx context.Context, dbConn *sqlx.DB, day int64, v VmDailyRollupRow) error { func UpsertVmDailyRollup(ctx context.Context, dbConn *sqlx.DB, day int64, v VmDailyRollupRow) error {
if err := EnsureVmDailyRollup(ctx, dbConn); err != nil { if err := EnsureVmDailyRollup(ctx, dbConn); err != nil {

View File

@@ -150,6 +150,11 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayStart.Format("2006-01-02")) c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayStart.Format("2006-01-02"))
return err return err
} }
if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, summaryTable, dayStart.Unix(), dayEnd.Unix()); err != nil {
c.Logger.Warn("failed to apply lifecycle deletions to daily summary", "error", err, "table", summaryTable)
} else {
c.Logger.Info("Daily aggregation deletion times", "source_lifecycle_cache", applied)
}
// Backfill missing creation times to the start of the day for rows where vCenter had no creation info. // Backfill missing creation times to the start of the day for rows where vCenter had no creation info.
if _, err := dbConn.ExecContext(ctx, if _, err := dbConn.ExecContext(ctx,
`UPDATE `+summaryTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`, `UPDATE `+summaryTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`,
@@ -265,7 +270,10 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd
sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] }) sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] })
} }
inventoryDeletions := c.applyInventoryDeletions(ctx, aggMap) lifecycleDeletions := c.applyLifecycleDeletions(ctx, aggMap, dayStart, dayEnd)
c.Logger.Info("Daily aggregation deletion times", "source_lifecycle_cache", lifecycleDeletions)
inventoryDeletions := c.applyInventoryDeletions(ctx, aggMap, dayStart, dayEnd)
c.Logger.Info("Daily aggregation deletion times", "source_inventory", inventoryDeletions) c.Logger.Info("Daily aggregation deletion times", "source_inventory", inventoryDeletions)
// Get the first hourly snapshot on/after dayEnd to help confirm deletions that happen on the last snapshot of the day. // Get the first hourly snapshot on/after dayEnd to help confirm deletions that happen on the last snapshot of the day.
@@ -405,7 +413,98 @@ LIMIT 1
return nil return nil
} }
func (c *CronTask) applyInventoryDeletions(ctx context.Context, agg map[dailyAggKey]*dailyAggVal) int { func (c *CronTask) applyLifecycleDeletions(ctx context.Context, agg map[dailyAggKey]*dailyAggVal, start, end time.Time) int {
dbConn := c.Database.DB()
if !db.TableExists(ctx, dbConn, "vm_lifecycle_cache") {
return 0
}
type aggIndex struct {
byID map[string]*dailyAggVal
byUUID map[string]*dailyAggVal
byName map[string]*dailyAggVal
}
indexes := make(map[string]*aggIndex, 8)
for k, v := range agg {
if k.Vcenter == "" {
continue
}
idx := indexes[k.Vcenter]
if idx == nil {
idx = &aggIndex{
byID: make(map[string]*dailyAggVal),
byUUID: make(map[string]*dailyAggVal),
byName: make(map[string]*dailyAggVal),
}
indexes[k.Vcenter] = idx
}
if k.VmId != "" {
idx.byID[k.VmId] = v
}
if k.VmUuid != "" {
idx.byUUID[k.VmUuid] = v
}
if name := strings.ToLower(strings.TrimSpace(k.Name)); name != "" {
idx.byName[name] = v
}
}
totalApplied := 0
for vcenter, idx := range indexes {
query := `
SELECT "VmId","VmUuid","Name","DeletedAt"
FROM vm_lifecycle_cache
WHERE "Vcenter" = ? AND "DeletedAt" IS NOT NULL AND "DeletedAt" > 0 AND "DeletedAt" >= ? AND "DeletedAt" < ?
`
bind := dbConn.Rebind(query)
rows, err := dbConn.QueryxContext(ctx, bind, vcenter, start.Unix(), end.Unix())
if err != nil {
c.Logger.Warn("failed to load lifecycle deletions", "vcenter", vcenter, "error", err)
continue
}
scanned := 0
applied := 0
missed := 0
for rows.Next() {
scanned++
var vmId, vmUuid, name sql.NullString
var deletedAt sql.NullInt64
if err := rows.Scan(&vmId, &vmUuid, &name, &deletedAt); err != nil {
c.Logger.Warn("failed to scan lifecycle deletion", "vcenter", vcenter, "error", err)
continue
}
if !deletedAt.Valid || deletedAt.Int64 <= 0 {
continue
}
var target *dailyAggVal
if vmId.Valid {
target = idx.byID[strings.TrimSpace(vmId.String)]
}
if target == nil && vmUuid.Valid {
target = idx.byUUID[strings.TrimSpace(vmUuid.String)]
}
if target == nil && name.Valid {
target = idx.byName[strings.ToLower(strings.TrimSpace(name.String))]
}
if target == nil {
missed++
continue
}
if target.deletion == 0 || deletedAt.Int64 < target.deletion {
target.deletion = deletedAt.Int64
}
applied++
}
rows.Close()
if err := rows.Err(); err != nil {
c.Logger.Warn("failed to read lifecycle deletions", "vcenter", vcenter, "error", err)
}
c.Logger.Debug("lifecycle cache deletions applied", "vcenter", vcenter, "window_start", start, "window_end", end, "scanned", scanned, "applied", applied, "missed", missed)
totalApplied += applied
}
return totalApplied
}
func (c *CronTask) applyInventoryDeletions(ctx context.Context, agg map[dailyAggKey]*dailyAggVal, start, end time.Time) int {
dbConn := c.Database.DB() dbConn := c.Database.DB()
vcenters := make(map[string]struct{}, 8) vcenters := make(map[string]struct{}, 8)
for k := range agg { for k := range agg {
@@ -427,6 +526,9 @@ func (c *CronTask) applyInventoryDeletions(ctx context.Context, agg map[dailyAgg
if !inv.DeletionTime.Valid || inv.DeletionTime.Int64 <= 0 { if !inv.DeletionTime.Valid || inv.DeletionTime.Int64 <= 0 {
continue continue
} }
if inv.DeletionTime.Int64 < start.Unix() || inv.DeletionTime.Int64 >= end.Unix() {
continue
}
if inv.VmId.Valid && strings.TrimSpace(inv.VmId.String) != "" { if inv.VmId.Valid && strings.TrimSpace(inv.VmId.String) != "" {
byID[strings.TrimSpace(inv.VmId.String)] = inv.DeletionTime.Int64 byID[strings.TrimSpace(inv.VmId.String)] = inv.DeletionTime.Int64
} }
@@ -442,21 +544,21 @@ func (c *CronTask) applyInventoryDeletions(ctx context.Context, agg map[dailyAgg
continue continue
} }
if ts, ok := byID[k.VmId]; ok { if ts, ok := byID[k.VmId]; ok {
if v.deletion != ts { if v.deletion == 0 || ts < v.deletion {
v.deletion = ts v.deletion = ts
} }
totalApplied++ totalApplied++
continue continue
} }
if ts, ok := byUUID[k.VmUuid]; ok { if ts, ok := byUUID[k.VmUuid]; ok {
if v.deletion != ts { if v.deletion == 0 || ts < v.deletion {
v.deletion = ts v.deletion = ts
} }
totalApplied++ totalApplied++
continue continue
} }
if ts, ok := byName[strings.ToLower(k.Name)]; ok { if ts, ok := byName[strings.ToLower(k.Name)]; ok {
if v.deletion != ts { if v.deletion == 0 || ts < v.deletion {
v.deletion = ts v.deletion = ts
} }
totalApplied++ totalApplied++

View File

@@ -148,6 +148,11 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
c.Logger.Error("failed to aggregate monthly inventory", "error", err, "month", targetMonth.Format("2006-01")) c.Logger.Error("failed to aggregate monthly inventory", "error", err, "month", targetMonth.Format("2006-01"))
return err return err
} }
if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, monthlyTable, monthStart.Unix(), monthEnd.Unix()); err != nil {
c.Logger.Warn("failed to apply lifecycle deletions to monthly summary", "error", err, "table", monthlyTable)
} else {
c.Logger.Info("Monthly aggregation deletion times", "source_lifecycle_cache", applied)
}
// Backfill missing creation times to the start of the month for rows lacking creation info. // Backfill missing creation times to the start of the month for rows lacking creation info.
if _, err := dbConn.ExecContext(ctx, if _, err := dbConn.ExecContext(ctx,
`UPDATE `+monthlyTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`, `UPDATE `+monthlyTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`,
@@ -220,6 +225,12 @@ func (c *CronTask) aggregateMonthlySummaryGo(ctx context.Context, monthStart, mo
return err return err
} }
if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, summaryTable, monthStart.Unix(), monthEnd.Unix()); err != nil {
c.Logger.Warn("failed to apply lifecycle deletions to monthly summary (Go)", "error", err, "table", summaryTable)
} else {
c.Logger.Info("Monthly aggregation deletion times", "source_lifecycle_cache", applied)
}
if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil { if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
c.Logger.Warn("failed to refine creation/deletion times (monthly Go)", "error", err, "table", summaryTable) c.Logger.Warn("failed to refine creation/deletion times (monthly Go)", "error", err, "table", summaryTable)
} }