From 7b7afbf1d5fb1dc7264d2eca1f95bf5d2fbb9b52 Mon Sep 17 00:00:00 2001 From: Nathan Coad Date: Fri, 16 Jan 2026 16:28:19 +1100 Subject: [PATCH] start work on dev branch [CI SKIP] --- internal/tasks/dailyAggregate.go | 392 +++++++++++++++++++++++++++++++ 1 file changed, 392 insertions(+) diff --git a/internal/tasks/dailyAggregate.go b/internal/tasks/dailyAggregate.go index fb1f55f..b267053 100644 --- a/internal/tasks/dailyAggregate.go +++ b/internal/tasks/dailyAggregate.go @@ -2,8 +2,13 @@ package tasks import ( "context" + "database/sql" "fmt" "log/slog" + "os" + "runtime" + "strings" + "sync" "time" "vctp/db" "vctp/internal/metrics" @@ -56,6 +61,17 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti } } + // If enabled, use the Go fan-out/reduce path to parallelize aggregation. + if os.Getenv("DAILY_AGG_GO") == "1" { + if err := c.aggregateDailySummaryGo(ctx, dayStart, dayEnd, summaryTable, force); err != nil { + c.Logger.Warn("go-based daily aggregation failed, falling back to SQL path", "error", err) + } else { + metrics.RecordDailyAggregation(time.Since(jobStart), nil) + c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable) + return nil + } + } + hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd) if err != nil { return err @@ -164,3 +180,379 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti func dailySummaryTableName(t time.Time) (string, error) { return db.SafeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102"))) } + +// aggregateDailySummaryGo performs daily aggregation by reading hourly tables in parallel, +// reducing in Go, and writing the summary table. It mirrors the outputs of the SQL path +// as closely as possible while improving CPU utilization on multi-core hosts. +func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd time.Time, summaryTable string, force bool) error { + jobStart := time.Now() + dbConn := c.Database.DB() + + hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd) + if err != nil { + return err + } + hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd) + hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots) + if len(hourlySnapshots) == 0 { + return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02")) + } + + hourlyTables := make([]string, 0, len(hourlySnapshots)) + for _, snapshot := range hourlySnapshots { + hourlyTables = append(hourlyTables, snapshot.TableName) + if err := db.EnsureSnapshotIndexes(ctx, dbConn, snapshot.TableName); err != nil { + c.Logger.Warn("failed to ensure indexes on hourly table", "table", snapshot.TableName, "error", err) + } + } + unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter()) + if err != nil { + return err + } + + // Clear existing summary if forcing. + if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil { + return err + } else if rowsExist && !force { + c.Logger.Debug("Daily summary already exists, skipping aggregation (Go path)", "summary_table", summaryTable) + return nil + } else if rowsExist && force { + if err := clearTable(ctx, dbConn, summaryTable); err != nil { + return err + } + } + + totalSamples := len(hourlyTables) + aggMap, err := c.scanHourlyTablesParallel(ctx, hourlySnapshots, totalSamples) + if err != nil { + return err + } + if len(aggMap) == 0 { + return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02")) + } + + // Insert aggregated rows. + if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples); err != nil { + return err + } + + // Refine lifecycle with existing SQL helper to pick up first-after deletions. + if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil { + c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable) + } + + db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable) + rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable) + if err != nil { + c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable) + } + if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil { + c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable) + } + if err := c.generateReport(ctx, summaryTable); err != nil { + c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable) + return err + } + + c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable, "duration", time.Since(jobStart)) + return nil +} + +type dailyAggKey struct { + Vcenter string + VmId string + VmUuid string + Name string +} + +type dailyAggVal struct { + key dailyAggKey + resourcePool string + datacenter string + cluster string + folder string + isTemplate string + poweredOn string + srmPlaceholder string + creation int64 + firstSeen int64 + lastSeen int64 + sumVcpu int64 + sumRam int64 + sumDisk float64 + samples int64 + tinHits int64 + bronzeHits int64 + silverHits int64 + goldHits int64 +} + +func (c *CronTask) scanHourlyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord, totalSamples int) (map[dailyAggKey]*dailyAggVal, error) { + agg := make(map[dailyAggKey]*dailyAggVal, 1024) + mu := sync.Mutex{} + workers := runtime.NumCPU() + if workers < 2 { + workers = 2 + } + if workers > len(snapshots) { + workers = len(snapshots) + } + + jobs := make(chan report.SnapshotRecord, len(snapshots)) + wg := sync.WaitGroup{} + for i := 0; i < workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for snap := range jobs { + rows, err := c.scanHourlyTable(ctx, snap) + if err != nil { + c.Logger.Warn("failed to scan hourly table", "table", snap.TableName, "error", err) + continue + } + mu.Lock() + for k, v := range rows { + if existing, ok := agg[k]; ok { + mergeDailyAgg(existing, v) + } else { + agg[k] = v + } + } + mu.Unlock() + } + }() + } + for _, snap := range snapshots { + jobs <- snap + } + close(jobs) + wg.Wait() + return agg, nil +} + +func mergeDailyAgg(dst, src *dailyAggVal) { + if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) { + dst.creation = src.creation + } + if dst.firstSeen == 0 || (src.firstSeen > 0 && src.firstSeen < dst.firstSeen) { + dst.firstSeen = src.firstSeen + } + if src.lastSeen > dst.lastSeen { + dst.lastSeen = src.lastSeen + dst.resourcePool = src.resourcePool + dst.datacenter = src.datacenter + dst.cluster = src.cluster + dst.folder = src.folder + dst.isTemplate = src.isTemplate + dst.poweredOn = src.poweredOn + dst.srmPlaceholder = src.srmPlaceholder + } + dst.sumVcpu += src.sumVcpu + dst.sumRam += src.sumRam + dst.sumDisk += src.sumDisk + dst.samples += src.samples + dst.tinHits += src.tinHits + dst.bronzeHits += src.bronzeHits + dst.silverHits += src.silverHits + dst.goldHits += src.goldHits +} + +func (c *CronTask) scanHourlyTable(ctx context.Context, snap report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) { + dbConn := c.Database.DB() + query := fmt.Sprintf(` +SELECT + "Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder", + COALESCE("ProvisionedDisk",0) AS disk, + COALESCE("VcpuCount",0) AS vcpu, + COALESCE("RamGB",0) AS ram, + COALESCE("CreationTime",0) AS creation, + COALESCE("DeletionTime",0) AS deletion, + COALESCE("IsTemplate",'FALSE') AS is_template, + COALESCE("PoweredOn",'FALSE') AS powered_on, + COALESCE("SrmPlaceholder",'FALSE') AS srm_placeholder, + COALESCE("SnapshotTime",0) AS snapshot_time +FROM %s +`, snap.TableName) + rows, err := dbConn.QueryxContext(ctx, query) + if err != nil { + return nil, err + } + defer rows.Close() + + out := make(map[dailyAggKey]*dailyAggVal, 256) + for rows.Next() { + var ( + name, vcenter, vmId, vmUuid, resourcePool string + dc, cluster, folder sql.NullString + disk sql.NullFloat64 + vcpu, ram sql.NullInt64 + creation, deletion, snapshotTime sql.NullInt64 + isTemplate, poweredOn, srmPlaceholder sql.NullString + ) + if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil { + continue + } + if vcenter == "" { + continue + } + // Skip templates. + if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" { + continue + } + key := dailyAggKey{ + Vcenter: vcenter, + VmId: strings.TrimSpace(vmId), + VmUuid: strings.TrimSpace(vmUuid), + Name: strings.TrimSpace(name), + } + if key.VmId == "" && key.VmUuid == "" && key.Name == "" { + continue + } + if key.VmId == "" { + key.VmId = key.VmUuid + } + pool := strings.ToLower(strings.TrimSpace(resourcePool)) + hitTin := btoi(pool == "tin") + hitBronze := btoi(pool == "bronze") + hitSilver := btoi(pool == "silver") + hitGold := btoi(pool == "gold") + row := &dailyAggVal{ + key: key, + resourcePool: resourcePool, + datacenter: dc.String, + cluster: cluster.String, + folder: folder.String, + isTemplate: isTemplate.String, + poweredOn: poweredOn.String, + srmPlaceholder: srmPlaceholder.String, + creation: int64OrZero(creation), + firstSeen: int64OrZero(snapshotTime), + lastSeen: int64OrZero(snapshotTime), + sumVcpu: vcpu.Int64, + sumRam: ram.Int64, + sumDisk: disk.Float64, + samples: 1, + tinHits: hitTin, + bronzeHits: hitBronze, + silverHits: hitSilver, + goldHits: hitGold, + } + out[key] = row + } + return out, nil +} + +func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error { + dbConn := c.Database.DB() + tx, err := dbConn.Beginx() + if err != nil { + return err + } + defer tx.Rollback() + + driver := strings.ToLower(dbConn.DriverName()) + placeholders := makePlaceholders(driver, 29) + insert := fmt.Sprintf(` +INSERT INTO %s ( + "Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder", + "ProvisionedDisk","VcpuCount","RamGB","IsTemplate","PoweredOn","SrmPlaceholder", + "CreationTime","DeletionTime","SamplesPresent","AvgVcpuCount","AvgRamGB","AvgProvisionedDisk", + "AvgIsPresent","PoolTinPct","PoolBronzePct","PoolSilverPct","PoolGoldPct","Tin","Bronze","Silver","Gold" +) VALUES (%s) +`, table, placeholders) + + for _, v := range agg { + if v.samples == 0 { + continue + } + avgVcpu := float64(v.sumVcpu) / float64(v.samples) + avgRam := float64(v.sumRam) / float64(v.samples) + avgDisk := v.sumDisk / float64(v.samples) + total := float64(totalSamples) + avgPresent := 0.0 + tinPct := 0.0 + bronzePct := 0.0 + silverPct := 0.0 + goldPct := 0.0 + if total > 0 { + avgPresent = float64(v.samples) / total + tinPct = float64(v.tinHits) * 100 / total + bronzePct = float64(v.bronzeHits) * 100 / total + silverPct = float64(v.silverHits) * 100 / total + goldPct = float64(v.goldHits) * 100 / total + } + args := []interface{}{ + v.key.Name, + v.key.Vcenter, + nullIfEmpty(v.key.VmId), + nullIfEmpty(v.key.VmUuid), + v.resourcePool, + nullIfEmpty(v.datacenter), + nullIfEmpty(v.cluster), + nullIfEmpty(v.folder), + v.sumDisk, + v.sumVcpu, + v.sumRam, + v.isTemplate, + v.poweredOn, + v.srmPlaceholder, + v.creation, + int64(0), // deletion time refined later + v.samples, + avgVcpu, + avgRam, + avgDisk, + avgPresent, + tinPct, bronzePct, silverPct, goldPct, + float64(v.tinHits), float64(v.bronzeHits), float64(v.silverHits), float64(v.goldHits), + } + if driver != "sqlite" { + // Postgres expects primitive types, nulls are handled by pq via nil. + for i, a := range args { + if s, ok := a.(string); ok && s == "" { + args[i] = nil + } + } + } + if _, err := tx.ExecContext(ctx, insert, args...); err != nil { + return err + } + } + return tx.Commit() +} + +func int64OrZero(v sql.NullInt64) int64 { + if v.Valid { + return v.Int64 + } + return 0 +} + +func nullIfEmpty(s string) interface{} { + if strings.TrimSpace(s) == "" { + return nil + } + return s +} + +func makePlaceholders(driver string, n int) string { + if driver == "sqlite" { + parts := make([]string, n) + for i := 0; i < n; i++ { + parts[i] = "?" + } + return strings.Join(parts, ",") + } + parts := make([]string, n) + for i := 0; i < n; i++ { + parts[i] = fmt.Sprintf("$%d", i+1) + } + return strings.Join(parts, ",") +} + +func btoi(b bool) int64 { + if b { + return 1 + } + return 0 +}