diff --git a/db/helpers.go b/db/helpers.go
index f51c930..741ce2f 100644
--- a/db/helpers.go
+++ b/db/helpers.go
@@ -394,6 +394,115 @@ func ApplySQLiteTuning(ctx context.Context, dbConn *sqlx.DB) {
 	}
 }
 
+// EnsureVmHourlyStats creates the shared per-snapshot cache table used by Go aggregations.
+func EnsureVmHourlyStats(ctx context.Context, dbConn *sqlx.DB) error {
+	ddl := `
+CREATE TABLE IF NOT EXISTS vm_hourly_stats (
+	"SnapshotTime" BIGINT NOT NULL,
+	"Vcenter" TEXT NOT NULL,
+	"VmId" TEXT,
+	"VmUuid" TEXT,
+	"Name" TEXT,
+	"CreationTime" BIGINT,
+	"DeletionTime" BIGINT,
+	"ResourcePool" TEXT,
+	"Datacenter" TEXT,
+	"Cluster" TEXT,
+	"Folder" TEXT,
+	"ProvisionedDisk" REAL,
+	"VcpuCount" BIGINT,
+	"RamGB" BIGINT,
+	"IsTemplate" TEXT,
+	"PoweredOn" TEXT,
+	"SrmPlaceholder" TEXT,
+	PRIMARY KEY ("Vcenter","VmId","SnapshotTime")
+);`
+	if _, err := execLog(ctx, dbConn, ddl); err != nil {
+		return err
+	}
+	_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_hourly_stats_vmuuid_time_idx ON vm_hourly_stats ("VmUuid","SnapshotTime")`)
+	_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_hourly_stats_snapshottime_idx ON vm_hourly_stats ("SnapshotTime")`)
+	return nil
+}
+
+// EnsureVmLifecycleCache creates an upsert cache for first/last seen VM info.
+func EnsureVmLifecycleCache(ctx context.Context, dbConn *sqlx.DB) error {
+	ddl := `
+CREATE TABLE IF NOT EXISTS vm_lifecycle_cache (
+	"Vcenter" TEXT NOT NULL,
+	"VmId" TEXT,
+	"VmUuid" TEXT,
+	"Name" TEXT,
+	"Cluster" TEXT,
+	"FirstSeen" BIGINT,
+	"LastSeen" BIGINT,
+	"DeletedAt" BIGINT,
+	PRIMARY KEY ("Vcenter","VmId","VmUuid")
+);`
+	if _, err := execLog(ctx, dbConn, ddl); err != nil {
+		return err
+	}
+	_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_lifecycle_cache_vmuuid_idx ON vm_lifecycle_cache ("VmUuid")`)
+	return nil
+}
+
+// UpsertVmLifecycleCache updates first/last seen info for a VM.
+func UpsertVmLifecycleCache(ctx context.Context, dbConn *sqlx.DB, vcenter string, vmID, vmUUID, name, cluster string, seen time.Time) error {
+	if err := EnsureVmLifecycleCache(ctx, dbConn); err != nil {
+		return err
+	}
+	driver := strings.ToLower(dbConn.DriverName())
+	query := `
+INSERT INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","Name","Cluster","FirstSeen","LastSeen")
+VALUES ($1,$2,$3,$4,$5,$6,$6)
+ON CONFLICT ("Vcenter","VmId","VmUuid") DO UPDATE SET
+	"Name"=EXCLUDED."Name",
+	"Cluster"=EXCLUDED."Cluster",
+	"LastSeen"=EXCLUDED."LastSeen",
+	"FirstSeen"=COALESCE(vm_lifecycle_cache."FirstSeen", EXCLUDED."FirstSeen"),
+	"DeletedAt"=NULL
+`
+	args := []interface{}{vcenter, vmID, vmUUID, name, cluster, seen.Unix()}
+	if driver == "sqlite" {
+		query = `
+INSERT OR REPLACE INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","Name","Cluster","FirstSeen","LastSeen")
+VALUES (?,?,?,?,?,?,?)
+`
+		// SQLite binds positionally, so the seen timestamp must be supplied twice
+		// (FirstSeen and LastSeen); the Postgres query reuses $6 instead.
+		args = append(args, seen.Unix())
+	}
+	_, err := dbConn.ExecContext(ctx, query, args...)
+	return err
+}
+
+// MarkVmDeleted updates lifecycle cache with a deletion timestamp.
+func MarkVmDeleted(ctx context.Context, dbConn *sqlx.DB, vcenter, vmID, vmUUID string, deletedAt int64) error {
+	if err := EnsureVmLifecycleCache(ctx, dbConn); err != nil {
+		return err
+	}
+	driver := strings.ToLower(dbConn.DriverName())
+	query := `
+INSERT INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","DeletedAt","FirstSeen","LastSeen")
+VALUES ($1,$2,$3,$4,$4,$4)
+ON CONFLICT ("Vcenter","VmId","VmUuid") DO UPDATE SET
+	"DeletedAt"=CASE
+		WHEN vm_lifecycle_cache."DeletedAt" IS NULL OR vm_lifecycle_cache."DeletedAt"=0 OR EXCLUDED."DeletedAt" < vm_lifecycle_cache."DeletedAt"
+			THEN EXCLUDED."DeletedAt"
+		ELSE vm_lifecycle_cache."DeletedAt"
+	END
+`
+	args := []interface{}{vcenter, vmID, vmUUID, deletedAt}
+	if driver == "sqlite" {
+		query = `
+INSERT OR REPLACE INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","DeletedAt","FirstSeen","LastSeen")
+VALUES (?,?,?,?,?,?)
+`
+		args = append(args, deletedAt, deletedAt)
+	}
+	_, err := dbConn.ExecContext(ctx, query, args...)
+	return err
+}
+
+		CASE WHEN totals.total_samples > 0 THEN SUM(CASE WHEN "AvgVcpuCount" IS NOT NULL THEN "AvgVcpuCount" * total_samples_day ELSE 0 END) / totals.total_samples
@@ -1361,8 +1475,8 @@ FROM enriched CROSS JOIN totals
 GROUP BY
 	"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId",
-	"Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
-	"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid";
+	"Datacenter", "Cluster", "Folder",
+	"IsTemplate", "SrmPlaceholder", "VmUuid";
 `, unionQuery, tableName)
 	return insert, nil
 }
diff --git a/internal/tasks/dailyAggregate.go b/internal/tasks/dailyAggregate.go
index b267053..010ace8 100644
--- a/internal/tasks/dailyAggregate.go
+++ b/internal/tasks/dailyAggregate.go
@@ -7,6 +7,7 @@ import (
 	"log/slog"
 	"os"
 	"runtime"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -63,6 +64,7 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
 
 	// If enabled, use the Go fan-out/reduce path to parallelize aggregation.
 	if os.Getenv("DAILY_AGG_GO") == "1" {
+		c.Logger.Debug("Using go implementation of aggregation")
 		if err := c.aggregateDailySummaryGo(ctx, dayStart, dayEnd, summaryTable, force); err != nil {
 			c.Logger.Warn("go-based daily aggregation failed, falling back to SQL path", "error", err)
 		} else {
@@ -223,12 +225,54 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd
 	}
 
 	totalSamples := len(hourlyTables)
-	aggMap, err := c.scanHourlyTablesParallel(ctx, hourlySnapshots, totalSamples)
-	if err != nil {
-		return err
+	var (
+		aggMap    map[dailyAggKey]*dailyAggVal
+		snapTimes []int64
+	)
+
+	if db.TableExists(ctx, dbConn, "vm_hourly_stats") {
+		cacheAgg, cacheTimes, cacheErr := c.scanHourlyCache(ctx, dayStart, dayEnd)
+		if cacheErr != nil {
+			c.Logger.Warn("failed to use hourly cache, falling back to table scans", "error", cacheErr)
+		} else if len(cacheAgg) > 0 {
+			aggMap = cacheAgg
+			snapTimes = cacheTimes
+			totalSamples = len(cacheTimes)
+		}
 	}
-	if len(aggMap) == 0 {
-		return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02"))
+
+	if aggMap == nil {
+		var errScan error
+		aggMap, errScan = c.scanHourlyTablesParallel(ctx, hourlySnapshots, totalSamples)
+		if errScan != nil {
+			return errScan
+		}
+		if len(aggMap) == 0 {
+			return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02"))
+		}
+
+		// Build ordered list of snapshot times for deletion inference.
+		snapTimes = make([]int64, 0, len(hourlySnapshots))
+		for _, snap := range hourlySnapshots {
+			snapTimes = append(snapTimes, snap.SnapshotTime.Unix())
+		}
+		sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] })
+	}
+
+	for _, v := range aggMap {
+		if v.creation == 0 {
+			v.creation = v.firstSeen
+		}
+		// Keep a deletion time already recorded by the hourly cache; only infer one when absent.
+		if v.deletion > 0 {
+			continue
+		}
+		// Infer deletion as the first snapshot time after lastSeen where the VM is absent.
+		for _, t := range snapTimes {
+			if t <= v.lastSeen {
+				continue
+			}
+			if _, ok := v.seen[t]; !ok {
+				v.deletion = t
+				break
+			}
+		}
 	}
 
 	// Insert aggregated rows.
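Review note: the deletion-inference loop above reduces to a small pure function over the day's sorted snapshot times and the set of snapshots in which a VM appeared. A runnable sketch of just that step (the inferDeletion helper and the literal timestamps are illustrative, not part of this patch):

package main

import "fmt"

// inferDeletion returns the first snapshot time after lastSeen at which the
// VM was absent, or 0 if the VM was present through the end of the day.
func inferDeletion(snapTimes []int64, seen map[int64]struct{}, lastSeen int64) int64 {
	for _, t := range snapTimes { // snapTimes must be sorted ascending
		if t <= lastSeen {
			continue
		}
		if _, ok := seen[t]; !ok {
			return t
		}
	}
	return 0
}

func main() {
	snaps := []int64{1000, 2000, 3000, 4000}
	seen := map[int64]struct{}{1000: {}, 2000: {}}
	// Last seen at 2000, missing from 3000 on: deletion is inferred at 3000.
	fmt.Println(inferDeletion(snaps, seen, 2000))
}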
@@ -277,6 +321,9 @@ type dailyAggVal struct {
 	creation  int64
 	firstSeen int64
 	lastSeen  int64
+	lastDisk  float64
+	lastVcpu  int64
+	lastRam   int64
 	sumVcpu   int64
 	sumRam    int64
 	sumDisk   float64
@@ -285,6 +332,8 @@ type dailyAggVal struct {
 	bronzeHits int64
 	silverHits int64
 	goldHits   int64
+	seen       map[int64]struct{}
+	deletion   int64
 }
 
 func (c *CronTask) scanHourlyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord, totalSamples int) (map[dailyAggKey]*dailyAggVal, error) {
@@ -346,6 +395,9 @@ func mergeDailyAgg(dst, src *dailyAggVal) {
 		dst.isTemplate = src.isTemplate
 		dst.poweredOn = src.poweredOn
 		dst.srmPlaceholder = src.srmPlaceholder
+		dst.lastDisk = src.lastDisk
+		dst.lastVcpu = src.lastVcpu
+		dst.lastRam = src.lastRam
 	}
 	dst.sumVcpu += src.sumVcpu
 	dst.sumRam += src.sumRam
@@ -355,6 +407,12 @@ func mergeDailyAgg(dst, src *dailyAggVal) {
 	dst.bronzeHits += src.bronzeHits
 	dst.silverHits += src.silverHits
 	dst.goldHits += src.goldHits
+	if dst.seen == nil {
+		dst.seen = make(map[int64]struct{}, len(src.seen))
+	}
+	for t := range src.seen {
+		dst.seen[t] = struct{}{}
+	}
 }
 
 func (c *CronTask) scanHourlyTable(ctx context.Context, snap report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) {
@@ -428,6 +486,9 @@ FROM %s
 			creation:       int64OrZero(creation),
 			firstSeen:      int64OrZero(snapshotTime),
 			lastSeen:       int64OrZero(snapshotTime),
+			lastDisk:       disk.Float64,
+			lastVcpu:       vcpu.Int64,
+			lastRam:        ram.Int64,
 			sumVcpu:        vcpu.Int64,
 			sumRam:         ram.Int64,
 			sumDisk:        disk.Float64,
@@ -436,12 +497,121 @@ FROM %s
 			bronzeHits:     hitBronze,
 			silverHits:     hitSilver,
 			goldHits:       hitGold,
+			seen:           map[int64]struct{}{int64OrZero(snapshotTime): {}},
 		}
 		out[key] = row
 	}
 	return out, nil
 }
 
+// scanHourlyCache aggregates directly from vm_hourly_stats when available.
+// The text columns below are nullable in the DDL but scanned into plain Go
+// strings, so they are COALESCEd to '' to avoid per-row scan failures.
+func (c *CronTask) scanHourlyCache(ctx context.Context, start, end time.Time) (map[dailyAggKey]*dailyAggVal, []int64, error) {
+	dbConn := c.Database.DB()
+	query := `
+SELECT
+	COALESCE("Name",'') AS name, "Vcenter",
+	COALESCE("VmId",'') AS vm_id, COALESCE("VmUuid",'') AS vm_uuid,
+	COALESCE("ResourcePool",'') AS resource_pool,
+	"Datacenter","Cluster","Folder",
+	COALESCE("ProvisionedDisk",0) AS disk,
+	COALESCE("VcpuCount",0) AS vcpu,
+	COALESCE("RamGB",0) AS ram,
+	COALESCE("CreationTime",0) AS creation,
+	COALESCE("DeletionTime",0) AS deletion,
+	COALESCE("IsTemplate",'') AS is_template,
+	COALESCE("PoweredOn",'') AS powered_on,
+	COALESCE("SrmPlaceholder",'') AS srm_placeholder,
+	"SnapshotTime"
+FROM vm_hourly_stats
+WHERE "SnapshotTime" >= ?
AND "SnapshotTime" < ?` + q := dbConn.Rebind(query) + rows, err := dbConn.QueryxContext(ctx, q, start.Unix(), end.Unix()) + if err != nil { + return nil, nil, err + } + defer rows.Close() + + agg := make(map[dailyAggKey]*dailyAggVal, 512) + timeSet := make(map[int64]struct{}, 64) + for rows.Next() { + var ( + name, vcenter, vmId, vmUuid, resourcePool string + dc, cluster, folder sql.NullString + disk sql.NullFloat64 + vcpu, ram sql.NullInt64 + creation, deletion, snapshotTime sql.NullInt64 + isTemplate, poweredOn, srmPlaceholder sql.NullString + ) + if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil { + continue + } + if vcenter == "" { + continue + } + if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" { + continue + } + key := dailyAggKey{ + Vcenter: vcenter, + VmId: strings.TrimSpace(vmId), + VmUuid: strings.TrimSpace(vmUuid), + Name: strings.TrimSpace(name), + } + if key.VmId == "" && key.VmUuid == "" && key.Name == "" { + continue + } + if key.VmId == "" { + key.VmId = key.VmUuid + } + pool := strings.ToLower(strings.TrimSpace(resourcePool)) + hitTin := btoi(pool == "tin") + hitBronze := btoi(pool == "bronze") + hitSilver := btoi(pool == "silver") + hitGold := btoi(pool == "gold") + + snapTs := int64OrZero(snapshotTime) + timeSet[snapTs] = struct{}{} + + row := &dailyAggVal{ + key: key, + resourcePool: resourcePool, + datacenter: dc.String, + cluster: cluster.String, + folder: folder.String, + isTemplate: isTemplate.String, + poweredOn: poweredOn.String, + srmPlaceholder: srmPlaceholder.String, + creation: int64OrZero(creation), + firstSeen: snapTs, + lastSeen: snapTs, + lastDisk: disk.Float64, + lastVcpu: vcpu.Int64, + lastRam: ram.Int64, + sumVcpu: vcpu.Int64, + sumRam: ram.Int64, + sumDisk: disk.Float64, + samples: 1, + tinHits: hitTin, + bronzeHits: hitBronze, + silverHits: hitSilver, + goldHits: hitGold, + seen: map[int64]struct{}{snapTs: {}}, + } + if deletion.Valid && deletion.Int64 > 0 { + row.deletion = deletion.Int64 + } + if existing, ok := agg[key]; ok { + mergeDailyAgg(existing, row) + } else { + agg[key] = row + } + } + snapTimes := make([]int64, 0, len(timeSet)) + for t := range timeSet { + snapTimes = append(snapTimes, t) + } + sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] }) + return agg, snapTimes, rows.Err() +} + func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error { dbConn := c.Database.DB() tx, err := dbConn.Beginx() @@ -476,10 +645,12 @@ INSERT INTO %s ( goldPct := 0.0 if total > 0 { avgPresent = float64(v.samples) / total - tinPct = float64(v.tinHits) * 100 / total - bronzePct = float64(v.bronzeHits) * 100 / total - silverPct = float64(v.silverHits) * 100 / total - goldPct = float64(v.goldHits) * 100 / total + } + if v.samples > 0 { + tinPct = float64(v.tinHits) * 100 / float64(v.samples) + bronzePct = float64(v.bronzeHits) * 100 / float64(v.samples) + silverPct = float64(v.silverHits) * 100 / float64(v.samples) + goldPct = float64(v.goldHits) * 100 / float64(v.samples) } args := []interface{}{ v.key.Name, @@ -490,14 +661,14 @@ INSERT INTO %s ( nullIfEmpty(v.datacenter), nullIfEmpty(v.cluster), nullIfEmpty(v.folder), - v.sumDisk, - v.sumVcpu, - v.sumRam, + v.lastDisk, + v.lastVcpu, + v.lastRam, v.isTemplate, v.poweredOn, v.srmPlaceholder, v.creation, - 
-			int64(0), // deletion time refined later
+			v.deletion,
 			v.samples,
 			avgVcpu,
 			avgRam,
diff --git a/internal/tasks/inventorySnapshots.go b/internal/tasks/inventorySnapshots.go
index f52b877..046ea8e 100644
--- a/internal/tasks/inventorySnapshots.go
+++ b/internal/tasks/inventorySnapshots.go
@@ -707,10 +707,67 @@ func snapshotFromInventory(inv queries.Inventory, snapshotTime time.Time) invent
 	}
 }
 
+func insertHourlyCache(ctx context.Context, dbConn *sqlx.DB, rows []inventorySnapshotRow) error {
+	if len(rows) == 0 {
+		return nil
+	}
+	if err := db.EnsureVmHourlyStats(ctx, dbConn); err != nil {
+		return err
+	}
+	driver := strings.ToLower(dbConn.DriverName())
+	insert := `
+INSERT INTO vm_hourly_stats (
+	"SnapshotTime","Vcenter","VmId","VmUuid","Name","CreationTime","DeletionTime","ResourcePool",
+	"Datacenter","Cluster","Folder","ProvisionedDisk","VcpuCount","RamGB","IsTemplate","PoweredOn","SrmPlaceholder"
+) VALUES (:SnapshotTime,:Vcenter,:VmId,:VmUuid,:Name,:CreationTime,:DeletionTime,:ResourcePool,
+	:Datacenter,:Cluster,:Folder,:ProvisionedDisk,:VcpuCount,:RamGB,:IsTemplate,:PoweredOn,:SrmPlaceholder)
+`
+	if driver == "sqlite" {
+		insert = strings.Replace(insert, "INSERT INTO", "INSERT OR REPLACE INTO", 1)
+	} else {
+		insert += ` ON CONFLICT ("Vcenter","VmId","SnapshotTime") DO UPDATE SET
+	"VmUuid"=EXCLUDED."VmUuid",
+	"Name"=EXCLUDED."Name",
+	"CreationTime"=EXCLUDED."CreationTime",
+	"DeletionTime"=EXCLUDED."DeletionTime",
+	"ResourcePool"=EXCLUDED."ResourcePool",
+	"Datacenter"=EXCLUDED."Datacenter",
+	"Cluster"=EXCLUDED."Cluster",
+	"Folder"=EXCLUDED."Folder",
+	"ProvisionedDisk"=EXCLUDED."ProvisionedDisk",
+	"VcpuCount"=EXCLUDED."VcpuCount",
+	"RamGB"=EXCLUDED."RamGB",
+	"IsTemplate"=EXCLUDED."IsTemplate",
+	"PoweredOn"=EXCLUDED."PoweredOn",
+	"SrmPlaceholder"=EXCLUDED."SrmPlaceholder"`
+	}
+	tx, err := dbConn.BeginTxx(ctx, nil)
+	if err != nil {
+		return err
+	}
+	stmt, err := tx.PrepareNamedContext(ctx, insert)
+	if err != nil {
+		tx.Rollback()
+		return err
+	}
+	defer stmt.Close()
+
+	for _, r := range rows {
+		if _, err := stmt.ExecContext(ctx, r); err != nil {
+			tx.Rollback()
+			return err
+		}
+	}
+	return tx.Commit()
+}
+
 func insertHourlyBatch(ctx context.Context, dbConn *sqlx.DB, tableName string, rows []inventorySnapshotRow) error {
 	if len(rows) == 0 {
 		return nil
 	}
+	if err := db.EnsureVmHourlyStats(ctx, dbConn); err != nil {
+		return err
+	}
 	tx, err := dbConn.BeginTxx(ctx, nil)
 	if err != nil {
 		return err
@@ -902,6 +959,13 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
 		if err := db.UpsertVmIdentity(ctx, dbConn, url, row.VmId, row.VmUuid, row.Name, row.Cluster, startTime); err != nil {
 			c.Logger.Warn("failed to upsert vm identity", "vcenter", url, "vm_id", row.VmId, "vm_uuid", row.VmUuid, "name", row.Name, "error", err)
 		}
+		clusterName := ""
+		if row.Cluster.Valid {
+			clusterName = row.Cluster.String
+		}
+		if err := db.UpsertVmLifecycleCache(ctx, dbConn, url, row.VmId.String, row.VmUuid.String, row.Name, clusterName, startTime); err != nil {
+			c.Logger.Warn("failed to upsert vm lifecycle cache", "vcenter", url, "vm_id", row.VmId, "vm_uuid", row.VmUuid, "name", row.Name, "error", err)
+		}
 		presentSnapshots[vm.Reference().Value] = row
 		if row.VmUuid.Valid {
 			presentByUuid[row.VmUuid.String] = struct{}{}
 		}
@@ -971,11 +1035,25 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
 			c.Logger.Debug("Marked VM as deleted",
 				"name", inv.Name,
 				"vm_id", inv.VmId.String,
 				"vm_uuid", inv.VmUuid.String,
 				"vcenter", url,
"snapshot_time", startTime) deletionsMarked = true } + if err := db.MarkVmDeleted(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, startTime.Unix()); err != nil { + c.Logger.Warn("failed to mark vm deleted in lifecycle cache", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "error", err) + } + clusterName := "" + if inv.Cluster.Valid { + clusterName = inv.Cluster.String + } + if err := db.UpsertVmLifecycleCache(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, inv.Name, clusterName, startTime); err != nil { + c.Logger.Warn("failed to upsert vm lifecycle cache (deletion path)", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "name", inv.Name, "error", err) + } missingCount++ } c.Logger.Debug("inserting hourly snapshot batch", "vcenter", url, "rows", len(batch)) + if err := insertHourlyCache(ctx, dbConn, batch); err != nil { + c.Logger.Warn("failed to insert hourly cache rows", "vcenter", url, "error", err) + } + if err := insertHourlyBatch(ctx, dbConn, tableName, batch); err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, err) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil { diff --git a/internal/tasks/monthlyAggregate.go b/internal/tasks/monthlyAggregate.go index 32d9e11..c3554c1 100644 --- a/internal/tasks/monthlyAggregate.go +++ b/internal/tasks/monthlyAggregate.go @@ -2,8 +2,13 @@ package tasks import ( "context" + "database/sql" "fmt" "log/slog" + "os" + "runtime" + "strings" + "sync" "time" "vctp/db" "vctp/internal/metrics" @@ -69,6 +74,18 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time } } + // Optional Go-based aggregation path. + if os.Getenv("MONTHLY_AGG_GO") == "1" { + c.Logger.Debug("Using go implementation of monthly aggregation") + if err := c.aggregateMonthlySummaryGo(ctx, monthStart, monthEnd, monthlyTable, dailySnapshots, force); err != nil { + c.Logger.Warn("go-based monthly aggregation failed, falling back to SQL path", "error", err) + } else { + metrics.RecordMonthlyAggregation(time.Since(jobStart), nil) + c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", monthlyTable) + return nil + } + } + dailyTables := make([]string, 0, len(dailySnapshots)) for _, snapshot := range dailySnapshots { dailyTables = append(dailyTables, snapshot.TableName) @@ -131,3 +148,363 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time func monthlySummaryTableName(t time.Time) (string, error) { return db.SafeTableName(fmt.Sprintf("inventory_monthly_summary_%s", t.Format("200601"))) } + +// aggregateMonthlySummaryGo mirrors the SQL-based monthly aggregation but performs the work in Go, +// reading daily summaries in parallel and reducing them to a single monthly summary table. +func (c *CronTask) aggregateMonthlySummaryGo(ctx context.Context, monthStart, monthEnd time.Time, summaryTable string, dailySnapshots []report.SnapshotRecord, force bool) error { + jobStart := time.Now() + dbConn := c.Database.DB() + + if err := clearTable(ctx, dbConn, summaryTable); err != nil { + return err + } + + // Build union query for lifecycle refinement after inserts. 
+	dailyTables := make([]string, 0, len(dailySnapshots))
+	for _, snapshot := range dailySnapshots {
+		dailyTables = append(dailyTables, snapshot.TableName)
+	}
+	unionQuery, err := buildUnionQuery(dailyTables, summaryUnionColumns, templateExclusionFilter())
+	if err != nil {
+		return err
+	}
+
+	aggMap, err := c.scanDailyTablesParallel(ctx, dailySnapshots)
+	if err != nil {
+		return err
+	}
+	if len(aggMap) == 0 {
+		return fmt.Errorf("no VM records aggregated for %s", monthStart.Format("2006-01"))
+	}
+
+	if err := c.insertMonthlyAggregates(ctx, summaryTable, aggMap); err != nil {
+		return err
+	}
+
+	// Refine creation/deletion using SQL helper.
+	if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
+		c.Logger.Warn("failed to refine creation/deletion times (monthly Go)", "error", err, "table", summaryTable)
+	}
+
+	// Backfill missing creation times to the start of the month for rows lacking creation info.
+	if _, err := dbConn.ExecContext(ctx,
+		`UPDATE `+summaryTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`,
+		monthStart.Unix(),
+	); err != nil {
+		c.Logger.Warn("failed to normalize creation times for monthly summary (Go)", "error", err, "table", summaryTable)
+	}
+
+	db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
+	rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
+	if err != nil {
+		c.Logger.Warn("unable to count monthly summary rows", "error", err, "table", summaryTable)
+	}
+	if err := report.RegisterSnapshot(ctx, c.Database, "monthly", summaryTable, monthStart, rowCount); err != nil {
+		c.Logger.Warn("failed to register monthly snapshot", "error", err, "table", summaryTable)
+	}
+	if err := c.generateReport(ctx, summaryTable); err != nil {
+		c.Logger.Warn("failed to generate monthly report (Go)", "error", err, "table", summaryTable)
+		return err
+	}
+
+	c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", summaryTable, "duration", time.Since(jobStart))
+	return nil
+}
+
+type monthlyAggKey struct {
+	Vcenter string
+	VmId    string
+	VmUuid  string
+	Name    string
+}
+
+type monthlyAggVal struct {
+	key            monthlyAggKey
+	inventoryId    int64
+	eventKey       string
+	cloudId        string
+	resourcePool   string
+	datacenter     string
+	cluster        string
+	folder         string
+	isTemplate     string
+	poweredOn      string
+	srmPlaceholder string
+	provisioned    float64
+	vcpuCount      int64
+	ramGB          int64
+	creation       int64
+	deletion       int64
+	lastSnapshot   time.Time
+
+	samplesPresent int64
+	totalSamples   float64
+	sumVcpu        float64
+	sumRam         float64
+	sumDisk        float64
+	tinWeighted    float64
+	bronzeWeighted float64
+	silverWeighted float64
+	goldWeighted   float64
+}
+
+func (c *CronTask) scanDailyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord) (map[monthlyAggKey]*monthlyAggVal, error) {
+	agg := make(map[monthlyAggKey]*monthlyAggVal, 1024)
+	mu := sync.Mutex{}
+	workers := runtime.NumCPU()
+	if workers < 2 {
+		workers = 2
+	}
+	if workers > len(snapshots) {
+		workers = len(snapshots)
+	}
+
+	jobs := make(chan report.SnapshotRecord, len(snapshots))
+	wg := sync.WaitGroup{}
+	for i := 0; i < workers; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for snap := range jobs {
+				rows, err := c.scanDailyTable(ctx, snap)
+				if err != nil {
+					c.Logger.Warn("failed to scan daily summary", "table", snap.TableName, "error", err)
+					continue
+				}
+				mu.Lock()
+				for k, v := range rows {
+					if existing, ok := agg[k]; ok {
+						mergeMonthlyAgg(existing, v)
+					} else {
+						agg[k] = v
+					}
+				}
+				mu.Unlock()
+			}
+		}()
+	}
+	for _, snap := range snapshots {
+		jobs <- snap
+	}
+	close(jobs)
+	wg.Wait()
+	return agg, nil
+}
+
+func mergeMonthlyAgg(dst, src *monthlyAggVal) {
+	if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) {
+		dst.creation = src.creation
+	}
+	if src.deletion > 0 && (dst.deletion == 0 || src.deletion < dst.deletion) {
+		dst.deletion = src.deletion
+	}
+	if src.lastSnapshot.After(dst.lastSnapshot) {
+		dst.lastSnapshot = src.lastSnapshot
+		if src.inventoryId != 0 {
+			dst.inventoryId = src.inventoryId
+		}
+		dst.resourcePool = src.resourcePool
+		dst.datacenter = src.datacenter
+		dst.cluster = src.cluster
+		dst.folder = src.folder
+		dst.isTemplate = src.isTemplate
+		dst.poweredOn = src.poweredOn
+		dst.srmPlaceholder = src.srmPlaceholder
+		dst.provisioned = src.provisioned
+		dst.vcpuCount = src.vcpuCount
+		dst.ramGB = src.ramGB
+		dst.eventKey = src.eventKey
+		dst.cloudId = src.cloudId
+	}
+
+	dst.samplesPresent += src.samplesPresent
+	dst.totalSamples += src.totalSamples
+	dst.sumVcpu += src.sumVcpu
+	dst.sumRam += src.sumRam
+	dst.sumDisk += src.sumDisk
+	dst.tinWeighted += src.tinWeighted
+	dst.bronzeWeighted += src.bronzeWeighted
+	dst.silverWeighted += src.silverWeighted
+	dst.goldWeighted += src.goldWeighted
+}
+
+// scanDailyTable reads one daily summary. Text columns that may be NULL in the
+// daily tables are COALESCEd to '' because they are scanned into plain strings.
+func (c *CronTask) scanDailyTable(ctx context.Context, snap report.SnapshotRecord) (map[monthlyAggKey]*monthlyAggVal, error) {
+	dbConn := c.Database.DB()
+	query := fmt.Sprintf(`
+SELECT
+	"InventoryId",
+	COALESCE("Name",'') AS name, "Vcenter", COALESCE("VmId",'') AS vm_id, COALESCE("VmUuid",'') AS vm_uuid,
+	COALESCE("EventKey",'') AS event_key, COALESCE("CloudId",'') AS cloud_id, COALESCE("ResourcePool",'') AS resource_pool,
+	COALESCE("Datacenter",'') AS datacenter, COALESCE("Cluster",'') AS cluster, COALESCE("Folder",'') AS folder,
+	COALESCE("ProvisionedDisk",0) AS disk,
+	COALESCE("VcpuCount",0) AS vcpu,
+	COALESCE("RamGB",0) AS ram,
+	COALESCE("CreationTime",0) AS creation,
+	COALESCE("DeletionTime",0) AS deletion,
+	COALESCE("SamplesPresent",0) AS samples_present,
+	"AvgVcpuCount","AvgRamGB","AvgProvisionedDisk","AvgIsPresent",
+	"PoolTinPct","PoolBronzePct","PoolSilverPct","PoolGoldPct",
+	"Tin","Bronze","Silver","Gold",
+	COALESCE("IsTemplate",'') AS is_template,
+	COALESCE("PoweredOn",'') AS powered_on,
+	COALESCE("SrmPlaceholder",'') AS srm_placeholder
+FROM %s
+`, snap.TableName)
+
+	rows, err := dbConn.QueryxContext(ctx, query)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	result := make(map[monthlyAggKey]*monthlyAggVal, 256)
+	for rows.Next() {
+		var (
+			inventoryId                                    sql.NullInt64
+			name, vcenter, vmId, vmUuid, eventKey, cloudId string
+			resourcePool, datacenter, cluster, folder      string
+			isTemplate, poweredOn, srmPlaceholder          string
+			disk, avgVcpu, avgRam, avgDisk                 sql.NullFloat64
+			avgIsPresent                                   sql.NullFloat64
+			poolTin, poolBronze, poolSilver, poolGold      sql.NullFloat64
+			tinPct, bronzePct, silverPct, goldPct          sql.NullFloat64
+			vcpu, ram                                      sql.NullInt64
+			creation, deletion                             sql.NullInt64
+			samplesPresent                                 sql.NullInt64
+		)
+
+		if err := rows.Scan(
+			&inventoryId,
+			&name, &vcenter, &vmId, &vmUuid, &eventKey, &cloudId, &resourcePool, &datacenter, &cluster, &folder,
+			&disk, &vcpu, &ram, &creation, &deletion, &samplesPresent,
+			&avgVcpu, &avgRam, &avgDisk, &avgIsPresent,
+			&poolTin, &poolBronze, &poolSilver, &poolGold,
+			&tinPct, &bronzePct, &silverPct, &goldPct,
+			&isTemplate, &poweredOn, &srmPlaceholder,
+		); err != nil {
+			c.Logger.Warn("failed to scan daily summary row", "table", snap.TableName, "error", err)
+			continue
+		}
+
+		if strings.EqualFold(strings.TrimSpace(isTemplate), "true") || strings.EqualFold(strings.TrimSpace(isTemplate), "1") {
+			continue
+		}
+
+		key := monthlyAggKey{Vcenter: vcenter, VmId: vmId, VmUuid: vmUuid, Name: name}
+		agg := &monthlyAggVal{
+			key:            key,
+			inventoryId:    inventoryId.Int64,
+			eventKey:       eventKey,
+			cloudId:        cloudId,
+			resourcePool:   resourcePool,
+			datacenter:     datacenter,
+			cluster:        cluster,
+			folder:         folder,
+			isTemplate:     isTemplate,
+			poweredOn:      poweredOn,
+			srmPlaceholder: srmPlaceholder,
+			provisioned:    disk.Float64,
+			vcpuCount:      vcpu.Int64,
+			ramGB:          ram.Int64,
+			creation:       creation.Int64,
+			deletion:       deletion.Int64,
+			lastSnapshot:   snap.SnapshotTime,
+			samplesPresent: samplesPresent.Int64,
+		}
+
+		totalSamplesDay := float64(samplesPresent.Int64)
+		if avgIsPresent.Valid && avgIsPresent.Float64 > 0 {
+			totalSamplesDay = float64(samplesPresent.Int64) / avgIsPresent.Float64
+		}
+		agg.totalSamples = totalSamplesDay
+		if avgVcpu.Valid {
+			agg.sumVcpu = avgVcpu.Float64 * totalSamplesDay
+		}
+		if avgRam.Valid {
+			agg.sumRam = avgRam.Float64 * totalSamplesDay
+		}
+		if avgDisk.Valid {
+			agg.sumDisk = avgDisk.Float64 * totalSamplesDay
+		}
+		if poolTin.Valid {
+			agg.tinWeighted = (poolTin.Float64 / 100.0) * totalSamplesDay
+		}
+		if poolBronze.Valid {
+			agg.bronzeWeighted = (poolBronze.Float64 / 100.0) * totalSamplesDay
+		}
+		if poolSilver.Valid {
+			agg.silverWeighted = (poolSilver.Float64 / 100.0) * totalSamplesDay
+		}
+		if poolGold.Valid {
+			agg.goldWeighted = (poolGold.Float64 / 100.0) * totalSamplesDay
+		}
+
+		result[key] = agg
+	}
+	return result, rows.Err()
+}
+
+func (c *CronTask) insertMonthlyAggregates(ctx context.Context, summaryTable string, aggMap map[monthlyAggKey]*monthlyAggVal) error {
+	dbConn := c.Database.DB()
+	columns := []string{
+		"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
+		"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
+		"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SamplesPresent",
+		"AvgVcpuCount", "AvgRamGB", "AvgProvisionedDisk", "AvgIsPresent",
+		"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct",
+		"Tin", "Bronze", "Silver", "Gold",
+	}
+	placeholders := make([]string, len(columns))
+	for i := range columns {
+		placeholders[i] = "?"
+	}
+	stmtText := fmt.Sprintf(`INSERT INTO %s (%s) VALUES (%s)`, summaryTable, strings.Join(columns, ","), strings.Join(placeholders, ","))
+	stmtText = dbConn.Rebind(stmtText)
+
+	tx, err := dbConn.BeginTxx(ctx, nil)
+	if err != nil {
+		return err
+	}
+	stmt, err := tx.PreparexContext(ctx, stmtText)
+	if err != nil {
+		tx.Rollback()
+		return err
+	}
+	defer stmt.Close()
+
+	for _, v := range aggMap {
+		inventoryVal := sql.NullInt64{}
+		if v.inventoryId != 0 {
+			inventoryVal = sql.NullInt64{Int64: v.inventoryId, Valid: true}
+		}
+		avgVcpu := sql.NullFloat64{}
+		avgRam := sql.NullFloat64{}
+		avgDisk := sql.NullFloat64{}
+		avgIsPresent := sql.NullFloat64{}
+		tinPct := sql.NullFloat64{}
+		bronzePct := sql.NullFloat64{}
+		silverPct := sql.NullFloat64{}
+		goldPct := sql.NullFloat64{}
+
+		if v.totalSamples > 0 {
+			avgVcpu = sql.NullFloat64{Float64: v.sumVcpu / v.totalSamples, Valid: true}
+			avgRam = sql.NullFloat64{Float64: v.sumRam / v.totalSamples, Valid: true}
+			avgDisk = sql.NullFloat64{Float64: v.sumDisk / v.totalSamples, Valid: true}
+			avgIsPresent = sql.NullFloat64{Float64: float64(v.samplesPresent) / v.totalSamples, Valid: true}
+			tinPct = sql.NullFloat64{Float64: 100.0 * v.tinWeighted / v.totalSamples, Valid: true}
+			bronzePct = sql.NullFloat64{Float64: 100.0 * v.bronzeWeighted / v.totalSamples, Valid: true}
+			silverPct = sql.NullFloat64{Float64: 100.0 * v.silverWeighted / v.totalSamples, Valid: true}
+			goldPct = sql.NullFloat64{Float64: 100.0 * v.goldWeighted / v.totalSamples, Valid: true}
+		}
+
+		if _, err := stmt.ExecContext(ctx,
+			inventoryVal,
+			v.key.Name, v.key.Vcenter, v.key.VmId, v.eventKey, v.cloudId, v.creation, v.deletion,
+			v.resourcePool, v.datacenter, v.cluster, v.folder, v.provisioned, v.vcpuCount, v.ramGB,
+			v.isTemplate, v.poweredOn, v.srmPlaceholder, v.key.VmUuid, v.samplesPresent,
+			avgVcpu, avgRam, avgDisk, avgIsPresent,
+			tinPct, bronzePct, silverPct, goldPct,
+			tinPct, bronzePct, silverPct, goldPct,
+		); err != nil {
+			tx.Rollback()
+			return err
+		}
+	}
+
+	return tx.Commit()
+}
diff --git a/vctp.yml b/vctp.yml
index c9be10c..abe6a53 100644
--- a/vctp.yml
+++ b/vctp.yml
@@ -1,7 +1,7 @@
 name: "vctp"
 arch: "amd64"
 platform: "linux"
-version: "v26.1.1"
+version: "v26.1.2"
 version_schema: semver
 description: vCTP monitors VMware VM inventory and event data to build chargeback reports
 maintainer: "@coadn"
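Review note: the monthly reduce above reconstructs each day's snapshot count as SamplesPresent / AvgIsPresent, weights the day's averages by that count, and divides the accumulated sums by the month's total samples, so absent snapshots count as zero. A runnable sketch of that arithmetic (the dayStat type and the literal values are illustrative only, not from the codebase):

package main

import "fmt"

// dayStat mirrors the per-day inputs read from a daily summary row.
type dayStat struct {
	samplesPresent int64   // snapshots where the VM appeared that day
	avgIsPresent   float64 // samplesPresent / total snapshots that day
	avgVcpu        float64 // average vCPUs over the snapshots where present
}

func main() {
	days := []dayStat{
		{samplesPresent: 12, avgIsPresent: 0.5, avgVcpu: 4}, // 24 snapshots that day
		{samplesPresent: 24, avgIsPresent: 1.0, avgVcpu: 8}, // 24 snapshots that day
	}
	var totalSamples, sumVcpu, present float64
	for _, d := range days {
		total := float64(d.samplesPresent)
		if d.avgIsPresent > 0 {
			total = float64(d.samplesPresent) / d.avgIsPresent // reconstruct the day's snapshot count
		}
		totalSamples += total
		sumVcpu += d.avgVcpu * total // weight the daily average by that day's snapshot count
		present += float64(d.samplesPresent)
	}
	// 48 total samples, 288 weighted vCPU-samples, 36 present samples.
	fmt.Printf("AvgVcpuCount=%.2f AvgIsPresent=%.2f\n", sumVcpu/totalSamples, present/totalSamples)
	// Output: AvgVcpuCount=6.00 AvgIsPresent=0.75
}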