diff --git a/db/helpers.go b/db/helpers.go index 55c9408..5994ad8 100644 --- a/db/helpers.go +++ b/db/helpers.go @@ -1293,7 +1293,9 @@ func BuildDailySummaryInsert(tableName string, unionQuery string) (string, error WITH snapshots AS ( %s ), totals AS ( - SELECT COUNT(DISTINCT "SnapshotTime") AS total_samples, MAX("SnapshotTime") AS max_snapshot FROM snapshots + SELECT "Vcenter", COUNT(DISTINCT "SnapshotTime") AS total_samples, MAX("SnapshotTime") AS max_snapshot + FROM snapshots + GROUP BY "Vcenter" ), agg AS ( SELECT s."InventoryId", s."Name", s."Vcenter", s."VmId", s."EventKey", s."CloudId", @@ -1449,13 +1451,13 @@ SELECT THEN 100.0 * agg.gold_hits / agg.samples_present ELSE NULL END AS "Gold" FROM agg -CROSS JOIN totals +JOIN totals ON totals."Vcenter" = agg."Vcenter" GROUP BY agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId", agg."Datacenter", agg."Cluster", agg."Folder", agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid", agg.any_creation, agg.any_deletion, agg.first_present, agg.last_present, - totals.total_samples; + totals.total_samples, totals.max_snapshot; `, unionQuery, tableName) return insert, nil } diff --git a/internal/tasks/dailyAggregate.go b/internal/tasks/dailyAggregate.go index 859ce52..e9c4e50 100644 --- a/internal/tasks/dailyAggregate.go +++ b/internal/tasks/dailyAggregate.go @@ -361,14 +361,16 @@ LIMIT 1 } c.Logger.Info("Daily aggregation deletion times", "source_inferred", inferredDeletions) + totalSamplesByVcenter := sampleCountsByVcenter(aggMap) + // Insert aggregated rows. - if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples); err != nil { + if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples, totalSamplesByVcenter); err != nil { return err } c.Logger.Debug("inserted daily aggregates", "table", summaryTable, "rows", len(aggMap), "total_samples", totalSamples) // Persist rollup cache for monthly aggregation. - if err := c.persistDailyRollup(ctx, dayStart.Unix(), aggMap, totalSamples); err != nil { + if err := c.persistDailyRollup(ctx, dayStart.Unix(), aggMap, totalSamples, totalSamplesByVcenter); err != nil { c.Logger.Warn("failed to persist daily rollup cache", "error", err, "date", dayStart.Format("2006-01-02")) } @@ -736,7 +738,7 @@ WHERE "SnapshotTime" >= ? AND "SnapshotTime" < ?` return agg, snapTimes, rows.Err() } -func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error { +func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg map[dailyAggKey]*dailyAggVal, totalSamples int, totalSamplesByVcenter map[string]int) error { dbConn := c.Database.DB() tx, err := dbConn.Beginx() if err != nil { @@ -759,7 +761,11 @@ INSERT INTO %s ( if v.samples == 0 { continue } - total := float64(totalSamples) + vcTotal := totalSamplesByVcenter[v.key.Vcenter] + if vcTotal <= 0 { + vcTotal = totalSamples + } + total := float64(vcTotal) avgVcpu := 0.0 avgRam := 0.0 avgDisk := 0.0 @@ -858,12 +864,16 @@ func btoi(b bool) int64 { } // persistDailyRollup stores per-day aggregates into vm_daily_rollup to speed monthly aggregation. -func (c *CronTask) persistDailyRollup(ctx context.Context, dayUnix int64, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error { +func (c *CronTask) persistDailyRollup(ctx context.Context, dayUnix int64, agg map[dailyAggKey]*dailyAggVal, totalSamples int, totalSamplesByVcenter map[string]int) error { dbConn := c.Database.DB() for _, v := range agg { if strings.EqualFold(strings.TrimSpace(v.isTemplate), "true") || v.isTemplate == "1" { continue } + vcTotal := totalSamplesByVcenter[v.key.Vcenter] + if vcTotal <= 0 { + vcTotal = totalSamples + } row := db.VmDailyRollupRow{ Vcenter: v.key.Vcenter, VmId: v.key.VmId, @@ -872,7 +882,7 @@ func (c *CronTask) persistDailyRollup(ctx context.Context, dayUnix int64, agg ma CreationTime: v.creation, DeletionTime: v.deletion, SamplesPresent: v.samples, - TotalSamples: int64(totalSamples), + TotalSamples: int64(vcTotal), SumVcpu: float64(v.sumVcpu), SumRam: float64(v.sumRam), SumDisk: v.sumDisk, @@ -897,3 +907,25 @@ func (c *CronTask) persistDailyRollup(ctx context.Context, dayUnix int64, agg ma } return nil } + +func sampleCountsByVcenter(agg map[dailyAggKey]*dailyAggVal) map[string]int { + vcenterTimes := make(map[string]map[int64]struct{}, 8) + for _, v := range agg { + if v.key.Vcenter == "" { + continue + } + set := vcenterTimes[v.key.Vcenter] + if set == nil { + set = make(map[int64]struct{}, len(v.seen)) + vcenterTimes[v.key.Vcenter] = set + } + for t := range v.seen { + set[t] = struct{}{} + } + } + counts := make(map[string]int, len(vcenterTimes)) + for vc, set := range vcenterTimes { + counts[vc] = len(set) + } + return counts +}