From 35b4a50cf6a92cef6a7b75335cef41ce3309e3ae Mon Sep 17 00:00:00 2001 From: Nathan Coad Date: Tue, 27 Jan 2026 09:09:24 +1100 Subject: [PATCH] try to fix pro-rata yet again --- db/helpers.go | 30 ++++++++++++++++++++++ internal/report/snapshots.go | 40 ++++++++++++++++++++++-------- internal/tasks/dailyAggregate.go | 6 +++++ internal/tasks/monthlyAggregate.go | 9 +++++++ 4 files changed, 75 insertions(+), 10 deletions(-) diff --git a/db/helpers.go b/db/helpers.go index b1d4a22..a1f5295 100644 --- a/db/helpers.go +++ b/db/helpers.go @@ -1532,6 +1532,36 @@ GROUP BY return insert, nil } +// UpdateSummaryPresenceByWindow recomputes AvgIsPresent using CreationTime/DeletionTime overlap with the window. +func UpdateSummaryPresenceByWindow(ctx context.Context, dbConn *sqlx.DB, summaryTable string, windowStart, windowEnd int64) error { + if err := ValidateTableName(summaryTable); err != nil { + return err + } + if windowEnd <= windowStart { + return fmt.Errorf("invalid presence window: %d to %d", windowStart, windowEnd) + } + duration := float64(windowEnd - windowStart) + startExpr := `CASE WHEN "CreationTime" IS NOT NULL AND "CreationTime" > 0 AND "CreationTime" > ? THEN "CreationTime" ELSE ? END` + endExpr := `CASE WHEN "DeletionTime" IS NOT NULL AND "DeletionTime" > 0 AND "DeletionTime" < ? THEN "DeletionTime" ELSE ? END` + query := fmt.Sprintf(` +UPDATE %s +SET "AvgIsPresent" = CASE + WHEN %s > %s THEN (CAST((%s - %s) AS REAL) / ?) + ELSE 0 +END +`, summaryTable, endExpr, startExpr, endExpr, startExpr) + query = dbConn.Rebind(query) + args := []interface{}{ + windowEnd, windowEnd, + windowStart, windowStart, + windowEnd, windowEnd, + windowStart, windowStart, + duration, + } + _, err := execLog(ctx, dbConn, query, args...) + return err +} + // RefineCreationDeletionFromUnion walks all snapshot rows in a period and tightens CreationTime/DeletionTime // by using the first and last observed samples and the first sample after disappearance. func RefineCreationDeletionFromUnion(ctx context.Context, dbConn *sqlx.DB, summaryTable, unionQuery string) error { diff --git a/internal/report/snapshots.go b/internal/report/snapshots.go index 88ce141..6537430 100644 --- a/internal/report/snapshots.go +++ b/internal/report/snapshots.go @@ -1152,6 +1152,9 @@ func buildHourlyTotals(ctx context.Context, logger *slog.Logger, dbConn *sqlx.DB startExpr := `CASE WHEN "CreationTime" IS NOT NULL AND "CreationTime" > 0 AND "CreationTime" > ? THEN "CreationTime" ELSE ? END` endExpr := `CASE WHEN "DeletionTime" IS NOT NULL AND "DeletionTime" > 0 AND "DeletionTime" < ? THEN "DeletionTime" ELSE ? END` overlapExpr := fmt.Sprintf(`CASE WHEN %s > %s THEN (CAST((%s - %s) AS REAL) / ?) ELSE 0 END`, endExpr, startExpr, endExpr, startExpr) + aggStartExpr := `CASE WHEN creation_time IS NOT NULL AND creation_time > 0 AND creation_time > ? THEN creation_time ELSE ? END` + aggEndExpr := `CASE WHEN deletion_time IS NOT NULL AND deletion_time > 0 AND deletion_time < ? THEN deletion_time ELSE ? END` + aggOverlapExpr := fmt.Sprintf(`CASE WHEN %s > %s THEN (CAST((%s - %s) AS REAL) / ?) ELSE 0 END`, aggEndExpr, aggStartExpr, aggEndExpr, aggStartExpr) idExpr := `COALESCE(NULLIF("VmId", ''), NULLIF("VmUuid", ''), NULLIF("Name", ''), 'unknown')` vmKeyExpr := fmt.Sprintf(`(%s || '|' || COALESCE("Vcenter", ''))`, idExpr) query := fmt.Sprintf(` @@ -1164,6 +1167,8 @@ WITH base AS ( "VcpuCount", "RamGB", LOWER(COALESCE("ResourcePool", '')) AS pool, + NULLIF("CreationTime", 0) AS creation_time, + NULLIF("DeletionTime", 0) AS deletion_time, %s AS presence FROM %s WHERE %s @@ -1174,10 +1179,20 @@ agg AS ( MAX("VcpuCount") AS "VcpuCount", MAX("RamGB") AS "RamGB", MAX(pool) AS pool, - MAX(presence) AS presence + MIN(creation_time) AS creation_time, + MIN(deletion_time) AS deletion_time FROM base GROUP BY vm_key ), +agg_presence AS ( + SELECT + vm_key, + "VcpuCount", + "RamGB", + pool, + %s AS presence + FROM agg +), diag AS ( SELECT COUNT(*) AS row_count, @@ -1192,14 +1207,14 @@ diag AS ( FROM base ) SELECT - (SELECT COUNT(*) FROM agg) AS vm_count, - (SELECT COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) FROM agg) AS vcpu_total, - (SELECT COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) FROM agg) AS ram_total, - (SELECT COALESCE(SUM(presence), 0) FROM agg) AS presence_ratio, - (SELECT COALESCE(SUM(CASE WHEN pool = 'tin' THEN presence ELSE 0 END), 0) FROM agg) AS tin_total, - (SELECT COALESCE(SUM(CASE WHEN pool = 'bronze' THEN presence ELSE 0 END), 0) FROM agg) AS bronze_total, - (SELECT COALESCE(SUM(CASE WHEN pool = 'silver' THEN presence ELSE 0 END), 0) FROM agg) AS silver_total, - (SELECT COALESCE(SUM(CASE WHEN pool = 'gold' THEN presence ELSE 0 END), 0) FROM agg) AS gold_total, + (SELECT COUNT(*) FROM agg_presence) AS vm_count, + (SELECT COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) FROM agg_presence) AS vcpu_total, + (SELECT COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) FROM agg_presence) AS ram_total, + (SELECT COALESCE(SUM(presence), 0) FROM agg_presence) AS presence_ratio, + (SELECT COALESCE(SUM(CASE WHEN pool = 'tin' THEN presence ELSE 0 END), 0) FROM agg_presence) AS tin_total, + (SELECT COALESCE(SUM(CASE WHEN pool = 'bronze' THEN presence ELSE 0 END), 0) FROM agg_presence) AS bronze_total, + (SELECT COALESCE(SUM(CASE WHEN pool = 'silver' THEN presence ELSE 0 END), 0) FROM agg_presence) AS silver_total, + (SELECT COALESCE(SUM(CASE WHEN pool = 'gold' THEN presence ELSE 0 END), 0) FROM agg_presence) AS gold_total, diag.row_count, diag.distinct_keys, diag.unknown_keys, @@ -1210,7 +1225,7 @@ SELECT diag.presence_under_zero, diag.base_presence_sum FROM diag -`, vmKeyExpr, overlapExpr, selected.TableName, templateExclusionFilter()) +`, vmKeyExpr, overlapExpr, selected.TableName, templateExclusionFilter(), aggOverlapExpr) query = dbConn.Rebind(query) var row struct { VmCount int64 `db:"vm_count"` @@ -1237,6 +1252,11 @@ FROM diag hourEndUnix, hourEndUnix, hourStartUnix, hourStartUnix, durationSeconds, + hourEndUnix, hourEndUnix, + hourStartUnix, hourStartUnix, + hourEndUnix, hourEndUnix, + hourStartUnix, hourStartUnix, + durationSeconds, } if err := dbConn.GetContext(ctx, &row, query, args...); err != nil { return nil, err diff --git a/internal/tasks/dailyAggregate.go b/internal/tasks/dailyAggregate.go index cc8795b..4711c08 100644 --- a/internal/tasks/dailyAggregate.go +++ b/internal/tasks/dailyAggregate.go @@ -158,6 +158,9 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil { c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable) } + if err := db.UpdateSummaryPresenceByWindow(ctx, dbConn, summaryTable, dayStart.Unix(), dayEnd.Unix()); err != nil { + c.Logger.Warn("failed to update daily AvgIsPresent from lifecycle window", "error", err, "table", summaryTable) + } analyzeStart := time.Now() c.Logger.Debug("Analyzing daily summary table", "table", summaryTable) db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable) @@ -402,6 +405,9 @@ LIMIT 1 } else { c.Logger.Debug("refined creation/deletion times", "table", summaryTable) } + if err := db.UpdateSummaryPresenceByWindow(ctx, dbConn, summaryTable, dayStart.Unix(), dayEnd.Unix()); err != nil { + c.Logger.Warn("failed to update daily AvgIsPresent from lifecycle window (Go path)", "error", err, "table", summaryTable) + } analyzeStart := time.Now() c.Logger.Debug("Analyzing daily summary table", "table", summaryTable) diff --git a/internal/tasks/monthlyAggregate.go b/internal/tasks/monthlyAggregate.go index bf262c2..302bee7 100644 --- a/internal/tasks/monthlyAggregate.go +++ b/internal/tasks/monthlyAggregate.go @@ -171,6 +171,9 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time } else { c.Logger.Info("Monthly aggregation deletion times", "source_lifecycle_cache", applied) } + if err := db.UpdateSummaryPresenceByWindow(ctx, dbConn, monthlyTable, monthStart.Unix(), monthEnd.Unix()); err != nil { + c.Logger.Warn("failed to update monthly AvgIsPresent from lifecycle window", "error", err, "table", monthlyTable) + } rowCount, err := db.TableRowCount(ctx, dbConn, monthlyTable) if err != nil { c.Logger.Warn("unable to count monthly summary rows", "error", err, "table", monthlyTable) @@ -290,6 +293,9 @@ func (c *CronTask) aggregateMonthlySummaryGoHourly(ctx context.Context, monthSta if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples, totalSamplesByVcenter); err != nil { return err } + if err := db.UpdateSummaryPresenceByWindow(ctx, dbConn, summaryTable, monthStart.Unix(), monthEnd.Unix()); err != nil { + c.Logger.Warn("failed to update monthly AvgIsPresent from lifecycle window (Go hourly)", "error", err, "table", summaryTable) + } db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable) rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable) @@ -363,6 +369,9 @@ func (c *CronTask) aggregateMonthlySummaryGo(ctx context.Context, monthStart, mo if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil { c.Logger.Warn("failed to refine creation/deletion times (monthly Go)", "error", err, "table", summaryTable) } + if err := db.UpdateSummaryPresenceByWindow(ctx, dbConn, summaryTable, monthStart.Unix(), monthEnd.Unix()); err != nil { + c.Logger.Warn("failed to update monthly AvgIsPresent from lifecycle window (Go)", "error", err, "table", summaryTable) + } db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable) rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)