progress on Go-based aggregation
All checks were successful
continuous-integration/drone/push Build is passing

2026-01-16 17:37:55 +11:00
parent 6af49471b2
commit 1cd1046433
5 changed files with 758 additions and 18 deletions

View File

@@ -394,6 +394,115 @@ func ApplySQLiteTuning(ctx context.Context, dbConn *sqlx.DB) {
}
}
// EnsureVmHourlyStats creates the shared per-snapshot cache table used by Go aggregations.
func EnsureVmHourlyStats(ctx context.Context, dbConn *sqlx.DB) error {
ddl := `
CREATE TABLE IF NOT EXISTS vm_hourly_stats (
"SnapshotTime" BIGINT NOT NULL,
"Vcenter" TEXT NOT NULL,
"VmId" TEXT,
"VmUuid" TEXT,
"Name" TEXT,
"CreationTime" BIGINT,
"DeletionTime" BIGINT,
"ResourcePool" TEXT,
"Datacenter" TEXT,
"Cluster" TEXT,
"Folder" TEXT,
"ProvisionedDisk" REAL,
"VcpuCount" BIGINT,
"RamGB" BIGINT,
"IsTemplate" TEXT,
"PoweredOn" TEXT,
"SrmPlaceholder" TEXT,
PRIMARY KEY ("Vcenter","VmId","SnapshotTime")
);`
if _, err := execLog(ctx, dbConn, ddl); err != nil {
return err
}
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_hourly_stats_vmuuid_time_idx ON vm_hourly_stats ("VmUuid","SnapshotTime")`)
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_hourly_stats_snapshottime_idx ON vm_hourly_stats ("SnapshotTime")`)
return nil
}
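EnsureVmHourlyStats is idempotent (CREATE TABLE IF NOT EXISTS plus best-effort index creation), so callers can run it ahead of any read or write path. A minimal sketch of that wiring, assuming a sqlite-backed *sqlx.DB; the driver import and DSN are illustrative, not part of this commit:

package main

import (
	"context"
	"log"

	"github.com/jmoiron/sqlx"
	_ "modernc.org/sqlite" // illustrative driver; registers as "sqlite"

	"vctp/db"
)

func main() {
	ctx := context.Background()
	// Placeholder DSN; vctp's real connection setup lives elsewhere.
	dbConn, err := sqlx.Open("sqlite", "file:vctp.db")
	if err != nil {
		log.Fatal(err)
	}
	defer dbConn.Close()
	// Safe to call repeatedly: CREATE TABLE IF NOT EXISTS + best-effort indexes.
	if err := db.EnsureVmHourlyStats(ctx, dbConn); err != nil {
		log.Fatal(err)
	}
}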
// EnsureVmLifecycleCache creates an upsert cache for first/last seen VM info.
func EnsureVmLifecycleCache(ctx context.Context, dbConn *sqlx.DB) error {
ddl := `
CREATE TABLE IF NOT EXISTS vm_lifecycle_cache (
"Vcenter" TEXT NOT NULL,
"VmId" TEXT,
"VmUuid" TEXT,
"Name" TEXT,
"Cluster" TEXT,
"FirstSeen" BIGINT,
"LastSeen" BIGINT,
"DeletedAt" BIGINT,
PRIMARY KEY ("Vcenter","VmId","VmUuid")
);`
if _, err := execLog(ctx, dbConn, ddl); err != nil {
return err
}
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_lifecycle_cache_vmuuid_idx ON vm_lifecycle_cache ("VmUuid")`)
return nil
}
// UpsertVmLifecycleCache updates first/last seen info for a VM.
func UpsertVmLifecycleCache(ctx context.Context, dbConn *sqlx.DB, vcenter string, vmID, vmUUID, name, cluster string, seen time.Time) error {
if err := EnsureVmLifecycleCache(ctx, dbConn); err != nil {
return err
}
driver := strings.ToLower(dbConn.DriverName())
query := `
INSERT INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","Name","Cluster","FirstSeen","LastSeen")
VALUES ($1,$2,$3,$4,$5,$6,$6)
ON CONFLICT ("Vcenter","VmId","VmUuid") DO UPDATE SET
"Name"=EXCLUDED."Name",
"Cluster"=EXCLUDED."Cluster",
"LastSeen"=EXCLUDED."LastSeen",
"FirstSeen"=COALESCE(vm_lifecycle_cache."FirstSeen", EXCLUDED."FirstSeen"),
"DeletedAt"=NULL
`
args := []interface{}{vcenter, vmID, vmUUID, name, cluster, seen.Unix()}
if driver == "sqlite" {
query = `
INSERT OR REPLACE INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","Name","Cluster","FirstSeen","LastSeen")
VALUES (?,?,?,?,?,?,?)
`
// The sqlite statement binds FirstSeen and LastSeen separately, so the
// seen timestamp must be supplied twice to match the seven placeholders.
args = append(args, seen.Unix())
}
_, err := dbConn.ExecContext(ctx, query, args...)
return err
}
// MarkVmDeleted updates lifecycle cache with a deletion timestamp.
func MarkVmDeleted(ctx context.Context, dbConn *sqlx.DB, vcenter, vmID, vmUUID string, deletedAt int64) error {
if err := EnsureVmLifecycleCache(ctx, dbConn); err != nil {
return err
}
driver := strings.ToLower(dbConn.DriverName())
query := `
INSERT INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","DeletedAt","FirstSeen","LastSeen")
VALUES ($1,$2,$3,$4,$4,$4)
ON CONFLICT ("Vcenter","VmId","VmUuid") DO UPDATE SET
"DeletedAt"=CASE
WHEN vm_lifecycle_cache."DeletedAt" IS NULL OR vm_lifecycle_cache."DeletedAt"=0 OR EXCLUDED."DeletedAt"<vm_lifecycle_cache."DeletedAt"
THEN EXCLUDED."DeletedAt"
ELSE vm_lifecycle_cache."DeletedAt"
END,
"LastSeen"=COALESCE(vm_lifecycle_cache."LastSeen", EXCLUDED."LastSeen"),
"FirstSeen"=COALESCE(vm_lifecycle_cache."FirstSeen", EXCLUDED."FirstSeen")
`
args := []interface{}{vcenter, vmID, vmUUID, deletedAt}
if driver == "sqlite" {
query = `
INSERT OR REPLACE INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","DeletedAt","FirstSeen","LastSeen")
VALUES (?,?,?,?,?,?)
`
args = []interface{}{vcenter, vmID, vmUUID, deletedAt, deletedAt, deletedAt}
}
_, err := dbConn.ExecContext(ctx, query, args...)
return err
}
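Taken together, the two helpers keep one monotonic lifecycle row per VM on the Postgres path: COALESCE preserves the earliest FirstSeen, LastSeen advances, MarkVmDeleted keeps the earliest DeletedAt, and a later upsert clears it again. (The sqlite branch uses INSERT OR REPLACE, which rewrites the whole row instead.) A sketch of the intended call pattern; the vCenter URL, IDs, DSN, and times are illustrative, only the db.* signatures come from this commit:

package main

import (
	"context"
	"log"
	"time"

	"github.com/jmoiron/sqlx"
	_ "modernc.org/sqlite" // illustrative driver

	"vctp/db"
)

func main() {
	ctx := context.Background()
	dbConn, err := sqlx.Open("sqlite", "file:vctp.db") // placeholder DSN
	if err != nil {
		log.Fatal(err)
	}
	defer dbConn.Close()
	now := time.Now()
	// First sighting seeds FirstSeen and LastSeen with the same timestamp.
	if err := db.UpsertVmLifecycleCache(ctx, dbConn, "vc01.example.com", "vm-42", "4221e-uuid", "web01", "prod-cluster", now); err != nil {
		log.Fatal(err)
	}
	// The VM vanishes an hour later: records the earliest DeletedAt.
	if err := db.MarkVmDeleted(ctx, dbConn, "vc01.example.com", "vm-42", "4221e-uuid", now.Add(time.Hour).Unix()); err != nil {
		log.Fatal(err)
	}
	// Reappearance clears DeletedAt and advances LastSeen; on Postgres,
	// COALESCE keeps the original FirstSeen.
	if err := db.UpsertVmLifecycleCache(ctx, dbConn, "vc01.example.com", "vm-42", "4221e-uuid", "web01", "prod-cluster", now.Add(2*time.Hour)); err != nil {
		log.Fatal(err)
	}
}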
// EnsureVmIdentityTables creates the identity and rename audit tables.
func EnsureVmIdentityTables(ctx context.Context, dbConn *sqlx.DB) error {
driver := strings.ToLower(dbConn.DriverName())
@@ -1318,8 +1427,13 @@ SELECT
COALESCE(NULLIF("CreationTime", 0), MIN(NULLIF("CreationTime", 0)), 0) AS "CreationTime",
NULLIF(MAX(NULLIF("DeletionTime", 0)), 0) AS "DeletionTime",
MAX("ResourcePool") AS "ResourcePool",
-"Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
-"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
"Datacenter", "Cluster", "Folder",
MAX("ProvisionedDisk") AS "ProvisionedDisk",
MAX("VcpuCount") AS "VcpuCount",
MAX("RamGB") AS "RamGB",
"IsTemplate",
MAX("PoweredOn") AS "PoweredOn",
"SrmPlaceholder", "VmUuid",
SUM("SamplesPresent") AS "SamplesPresent",
CASE WHEN totals.total_samples > 0
THEN SUM(CASE WHEN "AvgVcpuCount" IS NOT NULL THEN "AvgVcpuCount" * total_samples_day ELSE 0 END) / totals.total_samples
@@ -1361,8 +1475,8 @@ FROM enriched
CROSS JOIN totals
GROUP BY
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId",
-"Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
-"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid";
"Datacenter", "Cluster", "Folder",
"IsTemplate", "SrmPlaceholder", "VmUuid";
`, unionQuery, tableName)
return insert, nil
}
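The CASE expression in this query computes a sample-weighted mean: each day's average is multiplied by that day's sample count (total_samples_day), summed, and divided by the month's total samples, with the WHEN guard avoiding division by zero. The same arithmetic in Go, as a worked example with illustrative values:

package main

import "fmt"

// weightedMean mirrors the summary SQL: per-day averages are weighted by
// that day's sample count, so short days don't skew the monthly figure.
func weightedMean(dayAvgs, daySamples []float64) float64 {
	var weighted, total float64
	for i, avg := range dayAvgs {
		weighted += avg * daySamples[i]
		total += daySamples[i]
	}
	if total == 0 {
		return 0 // matches the CASE WHEN totals.total_samples > 0 guard
	}
	return weighted / total
}

func main() {
	// 4 vCPUs across a full 24-sample day, 8 vCPUs for a 6-sample partial
	// day -> (4*24 + 8*6) / 30 = 4.8, not the naive (4+8)/2 = 6.
	fmt.Println(weightedMean([]float64{4, 8}, []float64{24, 6}))
}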

View File

@@ -7,6 +7,7 @@ import (
"log/slog"
"os"
"runtime"
"sort"
"strings"
"sync"
"time"
@@ -63,6 +64,7 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
// If enabled, use the Go fan-out/reduce path to parallelize aggregation.
if os.Getenv("DAILY_AGG_GO") == "1" {
c.Logger.Debug("Using go implementation of aggregation")
if err := c.aggregateDailySummaryGo(ctx, dayStart, dayEnd, summaryTable, force); err != nil {
c.Logger.Warn("go-based daily aggregation failed, falling back to SQL path", "error", err)
} else {
@@ -223,12 +225,54 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd
}
totalSamples := len(hourlyTables)
-aggMap, err := c.scanHourlyTablesParallel(ctx, hourlySnapshots, totalSamples)
-if err != nil {
-return err
-}
var (
aggMap map[dailyAggKey]*dailyAggVal
snapTimes []int64
)
if db.TableExists(ctx, dbConn, "vm_hourly_stats") {
cacheAgg, cacheTimes, cacheErr := c.scanHourlyCache(ctx, dayStart, dayEnd)
if cacheErr != nil {
c.Logger.Warn("failed to use hourly cache, falling back to table scans", "error", cacheErr)
} else if len(cacheAgg) > 0 {
aggMap = cacheAgg
snapTimes = cacheTimes
totalSamples = len(cacheTimes)
}
}
-if len(aggMap) == 0 {
-return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02"))
-}
if aggMap == nil {
var errScan error
aggMap, errScan = c.scanHourlyTablesParallel(ctx, hourlySnapshots, totalSamples)
if errScan != nil {
return errScan
}
if len(aggMap) == 0 {
return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02"))
}
// Build ordered list of snapshot times for deletion inference.
snapTimes = make([]int64, 0, len(hourlySnapshots))
for _, snap := range hourlySnapshots {
snapTimes = append(snapTimes, snap.SnapshotTime.Unix())
}
sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] })
}
for _, v := range aggMap {
if v.creation == 0 {
v.creation = v.firstSeen
}
// Infer deletion as the first snapshot time after lastSeen where the VM is absent.
for _, t := range snapTimes {
if t <= v.lastSeen {
continue
}
if _, ok := v.seen[t]; !ok {
v.deletion = t
break
}
}
}
// Insert aggregated rows.
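For clarity, the deletion-inference loop in isolation: scan the day's snapshot times in ascending order and take the first one after lastSeen at which the VM's seen set has no entry. A self-contained sketch with illustrative timestamps:

package main

import (
	"fmt"
	"sort"
)

// inferDeletion returns the first snapshot time after lastSeen at which the
// VM was absent, or 0 if it stayed present through the end of the day.
// The seen set is keyed by snapshot unix time, as in the loop above.
func inferDeletion(snapTimes []int64, seen map[int64]struct{}, lastSeen int64) int64 {
	sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] })
	for _, t := range snapTimes {
		if t <= lastSeen {
			continue
		}
		if _, ok := seen[t]; !ok {
			return t
		}
	}
	return 0
}

func main() {
	times := []int64{1000, 2000, 3000, 4000}
	seen := map[int64]struct{}{1000: {}, 2000: {}}
	fmt.Println(inferDeletion(times, seen, 2000)) // 3000: first absent snapshot
}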
@@ -277,6 +321,9 @@ type dailyAggVal struct {
creation int64
firstSeen int64
lastSeen int64
lastDisk float64
lastVcpu int64
lastRam int64
sumVcpu int64
sumRam int64
sumDisk float64
@@ -285,6 +332,8 @@ type dailyAggVal struct {
bronzeHits int64
silverHits int64
goldHits int64
seen map[int64]struct{}
deletion int64
}
func (c *CronTask) scanHourlyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord, totalSamples int) (map[dailyAggKey]*dailyAggVal, error) {
@@ -346,6 +395,9 @@ func mergeDailyAgg(dst, src *dailyAggVal) {
dst.isTemplate = src.isTemplate
dst.poweredOn = src.poweredOn
dst.srmPlaceholder = src.srmPlaceholder
dst.lastDisk = src.lastDisk
dst.lastVcpu = src.lastVcpu
dst.lastRam = src.lastRam
}
dst.sumVcpu += src.sumVcpu
dst.sumRam += src.sumRam
@@ -355,6 +407,12 @@ func mergeDailyAgg(dst, src *dailyAggVal) {
dst.bronzeHits += src.bronzeHits
dst.silverHits += src.silverHits
dst.goldHits += src.goldHits
if dst.seen == nil {
dst.seen = make(map[int64]struct{}, len(src.seen))
}
for t := range src.seen {
dst.seen[t] = struct{}{}
}
}
func (c *CronTask) scanHourlyTable(ctx context.Context, snap report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) {
@@ -428,6 +486,9 @@ FROM %s
creation: int64OrZero(creation),
firstSeen: int64OrZero(snapshotTime),
lastSeen: int64OrZero(snapshotTime),
lastDisk: disk.Float64,
lastVcpu: vcpu.Int64,
lastRam: ram.Int64,
sumVcpu: vcpu.Int64,
sumRam: ram.Int64,
sumDisk: disk.Float64,
@@ -436,12 +497,120 @@ FROM %s
bronzeHits: hitBronze,
silverHits: hitSilver,
goldHits: hitGold,
seen: map[int64]struct{}{int64OrZero(snapshotTime): {}},
}
out[key] = row
}
return out, nil
}
// scanHourlyCache aggregates directly from vm_hourly_stats when available.
func (c *CronTask) scanHourlyCache(ctx context.Context, start, end time.Time) (map[dailyAggKey]*dailyAggVal, []int64, error) {
dbConn := c.Database.DB()
query := `
SELECT
"Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder",
COALESCE("ProvisionedDisk",0) AS disk,
COALESCE("VcpuCount",0) AS vcpu,
COALESCE("RamGB",0) AS ram,
COALESCE("CreationTime",0) AS creation,
COALESCE("DeletionTime",0) AS deletion,
COALESCE("IsTemplate",'') AS is_template,
COALESCE("PoweredOn",'') AS powered_on,
COALESCE("SrmPlaceholder",'') AS srm_placeholder,
"SnapshotTime"
FROM vm_hourly_stats
WHERE "SnapshotTime" >= ? AND "SnapshotTime" < ?`
q := dbConn.Rebind(query)
rows, err := dbConn.QueryxContext(ctx, q, start.Unix(), end.Unix())
if err != nil {
return nil, nil, err
}
defer rows.Close()
agg := make(map[dailyAggKey]*dailyAggVal, 512)
timeSet := make(map[int64]struct{}, 64)
for rows.Next() {
var (
name, vcenter, vmId, vmUuid, resourcePool string
dc, cluster, folder sql.NullString
disk sql.NullFloat64
vcpu, ram sql.NullInt64
creation, deletion, snapshotTime sql.NullInt64
isTemplate, poweredOn, srmPlaceholder sql.NullString
)
if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil {
continue
}
if vcenter == "" {
continue
}
if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" {
continue
}
key := dailyAggKey{
Vcenter: vcenter,
VmId: strings.TrimSpace(vmId),
VmUuid: strings.TrimSpace(vmUuid),
Name: strings.TrimSpace(name),
}
if key.VmId == "" && key.VmUuid == "" && key.Name == "" {
continue
}
if key.VmId == "" {
key.VmId = key.VmUuid
}
pool := strings.ToLower(strings.TrimSpace(resourcePool))
hitTin := btoi(pool == "tin")
hitBronze := btoi(pool == "bronze")
hitSilver := btoi(pool == "silver")
hitGold := btoi(pool == "gold")
snapTs := int64OrZero(snapshotTime)
timeSet[snapTs] = struct{}{}
row := &dailyAggVal{
key: key,
resourcePool: resourcePool,
datacenter: dc.String,
cluster: cluster.String,
folder: folder.String,
isTemplate: isTemplate.String,
poweredOn: poweredOn.String,
srmPlaceholder: srmPlaceholder.String,
creation: int64OrZero(creation),
firstSeen: snapTs,
lastSeen: snapTs,
lastDisk: disk.Float64,
lastVcpu: vcpu.Int64,
lastRam: ram.Int64,
sumVcpu: vcpu.Int64,
sumRam: ram.Int64,
sumDisk: disk.Float64,
samples: 1,
tinHits: hitTin,
bronzeHits: hitBronze,
silverHits: hitSilver,
goldHits: hitGold,
seen: map[int64]struct{}{snapTs: {}},
}
if deletion.Valid && deletion.Int64 > 0 {
row.deletion = deletion.Int64
}
if existing, ok := agg[key]; ok {
mergeDailyAgg(existing, row)
} else {
agg[key] = row
}
}
snapTimes := make([]int64, 0, len(timeSet))
for t := range timeSet {
snapTimes = append(snapTimes, t)
}
sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] })
return agg, snapTimes, rows.Err()
}
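scanHourlyCache writes its query with ? placeholders and lets sqlx's Rebind translate them to the connected driver's style, which is what lets one query serve both sqlite and postgres. A standalone illustration of that translation:

package main

import (
	"fmt"

	"github.com/jmoiron/sqlx"
)

func main() {
	q := `SELECT 1 FROM vm_hourly_stats WHERE "SnapshotTime" >= ? AND "SnapshotTime" < ?`
	fmt.Println(sqlx.Rebind(sqlx.QUESTION, q)) // sqlite: left as ?
	fmt.Println(sqlx.Rebind(sqlx.DOLLAR, q))   // postgres: rewritten to $1, $2
}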
func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error {
dbConn := c.Database.DB()
tx, err := dbConn.Beginx()
@@ -476,10 +645,12 @@ INSERT INTO %s (
goldPct := 0.0
if total > 0 {
avgPresent = float64(v.samples) / total
-tinPct = float64(v.tinHits) * 100 / total
-bronzePct = float64(v.bronzeHits) * 100 / total
-silverPct = float64(v.silverHits) * 100 / total
-goldPct = float64(v.goldHits) * 100 / total
}
if v.samples > 0 {
tinPct = float64(v.tinHits) * 100 / float64(v.samples)
bronzePct = float64(v.bronzeHits) * 100 / float64(v.samples)
silverPct = float64(v.silverHits) * 100 / float64(v.samples)
goldPct = float64(v.goldHits) * 100 / float64(v.samples)
}
args := []interface{}{
v.key.Name,
@@ -490,14 +661,14 @@ INSERT INTO %s (
nullIfEmpty(v.datacenter),
nullIfEmpty(v.cluster),
nullIfEmpty(v.folder),
-v.sumDisk,
-v.sumVcpu,
-v.sumRam,
v.lastDisk,
v.lastVcpu,
v.lastRam,
v.isTemplate,
v.poweredOn,
v.srmPlaceholder,
v.creation,
-int64(0), // deletion time refined later
v.deletion,
v.samples,
avgVcpu,
avgRam,
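The percentage change in the hunk above switches the pool-share denominator from the day's total sample count to the samples where the VM actually existed; presence itself is still reported separately through avgPresent. A worked example of why, with illustrative numbers:

package main

import "fmt"

func main() {
	// A VM present in 6 of 24 hourly samples, in the gold pool whenever present.
	total, samples, goldHits := 24.0, 6.0, 6.0
	oldGoldPct := goldHits * 100 / total   // 25: diluted by hours the VM didn't exist
	newGoldPct := goldHits * 100 / samples // 100: share of the VM's own lifetime
	avgPresent := samples / total          // 0.25: presence tracked separately
	fmt.Println(oldGoldPct, newGoldPct, avgPresent)
}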

View File

@@ -707,10 +707,67 @@ func snapshotFromInventory(inv queries.Inventory, snapshotTime time.Time) invent
}
}
func insertHourlyCache(ctx context.Context, dbConn *sqlx.DB, rows []inventorySnapshotRow) error {
if len(rows) == 0 {
return nil
}
if err := db.EnsureVmHourlyStats(ctx, dbConn); err != nil {
return err
}
driver := strings.ToLower(dbConn.DriverName())
insert := `
INSERT INTO vm_hourly_stats (
"SnapshotTime","Vcenter","VmId","VmUuid","Name","CreationTime","DeletionTime","ResourcePool",
"Datacenter","Cluster","Folder","ProvisionedDisk","VcpuCount","RamGB","IsTemplate","PoweredOn","SrmPlaceholder"
) VALUES (:SnapshotTime,:Vcenter,:VmId,:VmUuid,:Name,:CreationTime,:DeletionTime,:ResourcePool,
:Datacenter,:Cluster,:Folder,:ProvisionedDisk,:VcpuCount,:RamGB,:IsTemplate,:PoweredOn,:SrmPlaceholder)
`
if driver == "sqlite" {
insert = strings.Replace(insert, "INSERT INTO", "INSERT OR REPLACE INTO", 1)
} else {
insert += ` ON CONFLICT ("Vcenter","VmId","SnapshotTime") DO UPDATE SET
"VmUuid"=EXCLUDED."VmUuid",
"Name"=EXCLUDED."Name",
"CreationTime"=EXCLUDED."CreationTime",
"DeletionTime"=EXCLUDED."DeletionTime",
"ResourcePool"=EXCLUDED."ResourcePool",
"Datacenter"=EXCLUDED."Datacenter",
"Cluster"=EXCLUDED."Cluster",
"Folder"=EXCLUDED."Folder",
"ProvisionedDisk"=EXCLUDED."ProvisionedDisk",
"VcpuCount"=EXCLUDED."VcpuCount",
"RamGB"=EXCLUDED."RamGB",
"IsTemplate"=EXCLUDED."IsTemplate",
"PoweredOn"=EXCLUDED."PoweredOn",
"SrmPlaceholder"=EXCLUDED."SrmPlaceholder"`
}
tx, err := dbConn.BeginTxx(ctx, nil)
if err != nil {
return err
}
stmt, err := tx.PrepareNamedContext(ctx, insert)
if err != nil {
tx.Rollback()
return err
}
defer stmt.Close()
for _, r := range rows {
if _, err := stmt.ExecContext(ctx, r); err != nil {
tx.Rollback()
return err
}
}
return tx.Commit()
}
func insertHourlyBatch(ctx context.Context, dbConn *sqlx.DB, tableName string, rows []inventorySnapshotRow) error {
if len(rows) == 0 {
return nil
}
if err := db.EnsureVmHourlyStats(ctx, dbConn); err != nil {
return err
}
tx, err := dbConn.BeginTxx(ctx, nil)
if err != nil {
return err
@@ -902,6 +959,13 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
if err := db.UpsertVmIdentity(ctx, dbConn, url, row.VmId, row.VmUuid, row.Name, row.Cluster, startTime); err != nil {
c.Logger.Warn("failed to upsert vm identity", "vcenter", url, "vm_id", row.VmId, "vm_uuid", row.VmUuid, "name", row.Name, "error", err)
}
clusterName := ""
if row.Cluster.Valid {
clusterName = row.Cluster.String
}
if err := db.UpsertVmLifecycleCache(ctx, dbConn, url, row.VmId.String, row.VmUuid.String, row.Name, clusterName, startTime); err != nil {
c.Logger.Warn("failed to upsert vm lifecycle cache", "vcenter", url, "vm_id", row.VmId, "vm_uuid", row.VmUuid, "name", row.Name, "error", err)
}
presentSnapshots[vm.Reference().Value] = row
if row.VmUuid.Valid {
presentByUuid[row.VmUuid.String] = struct{}{}
@@ -971,11 +1035,25 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
c.Logger.Debug("Marked VM as deleted", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", url, "snapshot_time", startTime)
deletionsMarked = true
}
if err := db.MarkVmDeleted(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, startTime.Unix()); err != nil {
c.Logger.Warn("failed to mark vm deleted in lifecycle cache", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "error", err)
}
clusterName := ""
if inv.Cluster.Valid {
clusterName = inv.Cluster.String
}
if err := db.UpsertVmLifecycleCache(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, inv.Name, clusterName, startTime); err != nil {
c.Logger.Warn("failed to upsert vm lifecycle cache (deletion path)", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "name", inv.Name, "error", err)
}
missingCount++
}
c.Logger.Debug("inserting hourly snapshot batch", "vcenter", url, "rows", len(batch))
if err := insertHourlyCache(ctx, dbConn, batch); err != nil {
c.Logger.Warn("failed to insert hourly cache rows", "vcenter", url, "error", err)
}
if err := insertHourlyBatch(ctx, dbConn, tableName, batch); err != nil {
metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, err)
if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil {

View File

@@ -2,8 +2,13 @@ package tasks
import (
"context"
"database/sql"
"fmt"
"log/slog"
"os"
"runtime"
"strings"
"sync"
"time"
"vctp/db"
"vctp/internal/metrics"
@@ -69,6 +74,18 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
}
}
// Optional Go-based aggregation path.
if os.Getenv("MONTHLY_AGG_GO") == "1" {
c.Logger.Debug("Using go implementation of monthly aggregation")
if err := c.aggregateMonthlySummaryGo(ctx, monthStart, monthEnd, monthlyTable, dailySnapshots, force); err != nil {
c.Logger.Warn("go-based monthly aggregation failed, falling back to SQL path", "error", err)
} else {
metrics.RecordMonthlyAggregation(time.Since(jobStart), nil)
c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", monthlyTable)
return nil
}
}
dailyTables := make([]string, 0, len(dailySnapshots))
for _, snapshot := range dailySnapshots {
dailyTables = append(dailyTables, snapshot.TableName)
@@ -131,3 +148,363 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
func monthlySummaryTableName(t time.Time) (string, error) {
return db.SafeTableName(fmt.Sprintf("inventory_monthly_summary_%s", t.Format("200601")))
}
// aggregateMonthlySummaryGo mirrors the SQL-based monthly aggregation but performs the work in Go,
// reading daily summaries in parallel and reducing them to a single monthly summary table.
func (c *CronTask) aggregateMonthlySummaryGo(ctx context.Context, monthStart, monthEnd time.Time, summaryTable string, dailySnapshots []report.SnapshotRecord, force bool) error {
jobStart := time.Now()
dbConn := c.Database.DB()
if err := clearTable(ctx, dbConn, summaryTable); err != nil {
return err
}
// Build union query for lifecycle refinement after inserts.
dailyTables := make([]string, 0, len(dailySnapshots))
for _, snapshot := range dailySnapshots {
dailyTables = append(dailyTables, snapshot.TableName)
}
unionQuery, err := buildUnionQuery(dailyTables, summaryUnionColumns, templateExclusionFilter())
if err != nil {
return err
}
aggMap, err := c.scanDailyTablesParallel(ctx, dailySnapshots)
if err != nil {
return err
}
if len(aggMap) == 0 {
return fmt.Errorf("no VM records aggregated for %s", monthStart.Format("2006-01"))
}
if err := c.insertMonthlyAggregates(ctx, summaryTable, aggMap); err != nil {
return err
}
// Refine creation/deletion using SQL helper.
if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
c.Logger.Warn("failed to refine creation/deletion times (monthly Go)", "error", err, "table", summaryTable)
}
// Backfill missing creation times to the start of the month for rows lacking creation info.
if _, err := dbConn.ExecContext(ctx,
`UPDATE `+summaryTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`,
monthStart.Unix(),
); err != nil {
c.Logger.Warn("failed to normalize creation times for monthly summary (Go)", "error", err, "table", summaryTable)
}
db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
if err != nil {
c.Logger.Warn("unable to count monthly summary rows", "error", err, "table", summaryTable)
}
if err := report.RegisterSnapshot(ctx, c.Database, "monthly", summaryTable, monthStart, rowCount); err != nil {
c.Logger.Warn("failed to register monthly snapshot", "error", err, "table", summaryTable)
}
if err := c.generateReport(ctx, summaryTable); err != nil {
c.Logger.Warn("failed to generate monthly report (Go)", "error", err, "table", summaryTable)
return err
}
c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", summaryTable, "duration", time.Since(jobStart))
return nil
}
type monthlyAggKey struct {
Vcenter string
VmId string
VmUuid string
Name string
}
type monthlyAggVal struct {
key monthlyAggKey
inventoryId int64
eventKey string
cloudId string
resourcePool string
datacenter string
cluster string
folder string
isTemplate string
poweredOn string
srmPlaceholder string
provisioned float64
vcpuCount int64
ramGB int64
creation int64
deletion int64
lastSnapshot time.Time
samplesPresent int64
totalSamples float64
sumVcpu float64
sumRam float64
sumDisk float64
tinWeighted float64
bronzeWeighted float64
silverWeighted float64
goldWeighted float64
}
func (c *CronTask) scanDailyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord) (map[monthlyAggKey]*monthlyAggVal, error) {
agg := make(map[monthlyAggKey]*monthlyAggVal, 1024)
mu := sync.Mutex{}
workers := runtime.NumCPU()
if workers < 2 {
workers = 2
}
if workers > len(snapshots) {
workers = len(snapshots)
}
jobs := make(chan report.SnapshotRecord, len(snapshots))
wg := sync.WaitGroup{}
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for snap := range jobs {
rows, err := c.scanDailyTable(ctx, snap)
if err != nil {
c.Logger.Warn("failed to scan daily summary", "table", snap.TableName, "error", err)
continue
}
mu.Lock()
for k, v := range rows {
if existing, ok := agg[k]; ok {
mergeMonthlyAgg(existing, v)
} else {
agg[k] = v
}
}
mu.Unlock()
}
}()
}
for _, snap := range snapshots {
jobs <- snap
}
close(jobs)
wg.Wait()
return agg, nil
}
func mergeMonthlyAgg(dst, src *monthlyAggVal) {
if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) {
dst.creation = src.creation
}
if src.deletion > 0 && (dst.deletion == 0 || src.deletion < dst.deletion) {
dst.deletion = src.deletion
}
if src.lastSnapshot.After(dst.lastSnapshot) {
dst.lastSnapshot = src.lastSnapshot
if src.inventoryId != 0 {
dst.inventoryId = src.inventoryId
}
dst.resourcePool = src.resourcePool
dst.datacenter = src.datacenter
dst.cluster = src.cluster
dst.folder = src.folder
dst.isTemplate = src.isTemplate
dst.poweredOn = src.poweredOn
dst.srmPlaceholder = src.srmPlaceholder
dst.provisioned = src.provisioned
dst.vcpuCount = src.vcpuCount
dst.ramGB = src.ramGB
dst.eventKey = src.eventKey
dst.cloudId = src.cloudId
}
dst.samplesPresent += src.samplesPresent
dst.totalSamples += src.totalSamples
dst.sumVcpu += src.sumVcpu
dst.sumRam += src.sumRam
dst.sumDisk += src.sumDisk
dst.tinWeighted += src.tinWeighted
dst.bronzeWeighted += src.bronzeWeighted
dst.silverWeighted += src.silverWeighted
dst.goldWeighted += src.goldWeighted
}
func (c *CronTask) scanDailyTable(ctx context.Context, snap report.SnapshotRecord) (map[monthlyAggKey]*monthlyAggVal, error) {
dbConn := c.Database.DB()
query := fmt.Sprintf(`
SELECT
"InventoryId",
"Name","Vcenter","VmId","VmUuid","EventKey","CloudId","ResourcePool","Datacenter","Cluster","Folder",
COALESCE("ProvisionedDisk",0) AS disk,
COALESCE("VcpuCount",0) AS vcpu,
COALESCE("RamGB",0) AS ram,
COALESCE("CreationTime",0) AS creation,
COALESCE("DeletionTime",0) AS deletion,
COALESCE("SamplesPresent",0) AS samples_present,
"AvgVcpuCount","AvgRamGB","AvgProvisionedDisk","AvgIsPresent",
"PoolTinPct","PoolBronzePct","PoolSilverPct","PoolGoldPct",
"Tin","Bronze","Silver","Gold","IsTemplate","PoweredOn","SrmPlaceholder"
FROM %s
`, snap.TableName)
rows, err := dbConn.QueryxContext(ctx, query)
if err != nil {
return nil, err
}
defer rows.Close()
result := make(map[monthlyAggKey]*monthlyAggVal, 256)
for rows.Next() {
var (
inventoryId sql.NullInt64
name, vcenter, vmId, vmUuid, eventKey, cloudId string
resourcePool, datacenter, cluster, folder string
isTemplate, poweredOn, srmPlaceholder string
disk, avgVcpu, avgRam, avgDisk sql.NullFloat64
avgIsPresent sql.NullFloat64
poolTin, poolBronze, poolSilver, poolGold sql.NullFloat64
tinPct, bronzePct, silverPct, goldPct sql.NullFloat64
vcpu, ram sql.NullInt64
creation, deletion sql.NullInt64
samplesPresent sql.NullInt64
)
if err := rows.Scan(
&inventoryId,
&name, &vcenter, &vmId, &vmUuid, &eventKey, &cloudId, &resourcePool, &datacenter, &cluster, &folder,
&disk, &vcpu, &ram, &creation, &deletion, &samplesPresent,
&avgVcpu, &avgRam, &avgDisk, &avgIsPresent,
&poolTin, &poolBronze, &poolSilver, &poolGold,
&tinPct, &bronzePct, &silverPct, &goldPct,
&isTemplate, &poweredOn, &srmPlaceholder,
); err != nil {
c.Logger.Warn("failed to scan daily summary row", "table", snap.TableName, "error", err)
continue
}
if strings.EqualFold(strings.TrimSpace(isTemplate), "true") || strings.EqualFold(strings.TrimSpace(isTemplate), "1") {
continue
}
key := monthlyAggKey{Vcenter: vcenter, VmId: vmId, VmUuid: vmUuid, Name: name}
agg := &monthlyAggVal{
key: key,
inventoryId: inventoryId.Int64,
eventKey: eventKey,
cloudId: cloudId,
resourcePool: resourcePool,
datacenter: datacenter,
cluster: cluster,
folder: folder,
isTemplate: isTemplate,
poweredOn: poweredOn,
srmPlaceholder: srmPlaceholder,
provisioned: disk.Float64,
vcpuCount: vcpu.Int64,
ramGB: ram.Int64,
creation: creation.Int64,
deletion: deletion.Int64,
lastSnapshot: snap.SnapshotTime,
samplesPresent: samplesPresent.Int64,
}
totalSamplesDay := float64(samplesPresent.Int64)
if avgIsPresent.Valid && avgIsPresent.Float64 > 0 {
totalSamplesDay = float64(samplesPresent.Int64) / avgIsPresent.Float64
}
agg.totalSamples = totalSamplesDay
if avgVcpu.Valid {
agg.sumVcpu = avgVcpu.Float64 * totalSamplesDay
}
if avgRam.Valid {
agg.sumRam = avgRam.Float64 * totalSamplesDay
}
if avgDisk.Valid {
agg.sumDisk = avgDisk.Float64 * totalSamplesDay
}
if poolTin.Valid {
agg.tinWeighted = (poolTin.Float64 / 100.0) * totalSamplesDay
}
if poolBronze.Valid {
agg.bronzeWeighted = (poolBronze.Float64 / 100.0) * totalSamplesDay
}
if poolSilver.Valid {
agg.silverWeighted = (poolSilver.Float64 / 100.0) * totalSamplesDay
}
if poolGold.Valid {
agg.goldWeighted = (poolGold.Float64 / 100.0) * totalSamplesDay
}
result[key] = agg
}
return result, rows.Err()
}
func (c *CronTask) insertMonthlyAggregates(ctx context.Context, summaryTable string, aggMap map[monthlyAggKey]*monthlyAggVal) error {
dbConn := c.Database.DB()
columns := []string{
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SamplesPresent",
"AvgVcpuCount", "AvgRamGB", "AvgProvisionedDisk", "AvgIsPresent",
"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct",
"Tin", "Bronze", "Silver", "Gold",
}
placeholders := make([]string, len(columns))
for i := range columns {
placeholders[i] = "?"
}
stmtText := fmt.Sprintf(`INSERT INTO %s (%s) VALUES (%s)`, summaryTable, strings.Join(columns, ","), strings.Join(placeholders, ","))
stmtText = dbConn.Rebind(stmtText)
tx, err := dbConn.BeginTxx(ctx, nil)
if err != nil {
return err
}
stmt, err := tx.PreparexContext(ctx, stmtText)
if err != nil {
tx.Rollback()
return err
}
defer stmt.Close()
for _, v := range aggMap {
inventoryVal := sql.NullInt64{}
if v.inventoryId != 0 {
inventoryVal = sql.NullInt64{Int64: v.inventoryId, Valid: true}
}
avgVcpu := sql.NullFloat64{}
avgRam := sql.NullFloat64{}
avgDisk := sql.NullFloat64{}
avgIsPresent := sql.NullFloat64{}
tinPct := sql.NullFloat64{}
bronzePct := sql.NullFloat64{}
silverPct := sql.NullFloat64{}
goldPct := sql.NullFloat64{}
if v.totalSamples > 0 {
avgVcpu = sql.NullFloat64{Float64: v.sumVcpu / v.totalSamples, Valid: true}
avgRam = sql.NullFloat64{Float64: v.sumRam / v.totalSamples, Valid: true}
avgDisk = sql.NullFloat64{Float64: v.sumDisk / v.totalSamples, Valid: true}
avgIsPresent = sql.NullFloat64{Float64: float64(v.samplesPresent) / v.totalSamples, Valid: true}
tinPct = sql.NullFloat64{Float64: 100.0 * v.tinWeighted / v.totalSamples, Valid: true}
bronzePct = sql.NullFloat64{Float64: 100.0 * v.bronzeWeighted / v.totalSamples, Valid: true}
silverPct = sql.NullFloat64{Float64: 100.0 * v.silverWeighted / v.totalSamples, Valid: true}
goldPct = sql.NullFloat64{Float64: 100.0 * v.goldWeighted / v.totalSamples, Valid: true}
}
if _, err := stmt.ExecContext(ctx,
inventoryVal,
v.key.Name, v.key.Vcenter, v.key.VmId, v.eventKey, v.cloudId, v.creation, v.deletion,
v.resourcePool, v.datacenter, v.cluster, v.folder, v.provisioned, v.vcpuCount, v.ramGB,
v.isTemplate, v.poweredOn, v.srmPlaceholder, v.key.VmUuid, v.samplesPresent,
avgVcpu, avgRam, avgDisk, avgIsPresent,
tinPct, bronzePct, silverPct, goldPct,
tinPct, bronzePct, silverPct, goldPct,
); err != nil {
tx.Rollback()
return err
}
}
return tx.Commit()
}
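The monthly math relies on inverting the stored presence ratio: scanDailyTable recovers each day's total sample count as SamplesPresent / AvgIsPresent, de-normalizes that day's averages into sums, and insertMonthlyAggregates divides the accumulated sums back out. The round trip in miniature, with illustrative values:

package main

import "fmt"

func main() {
	// Day 1: present 6 of 24 samples; Day 2: present 24 of 24 samples.
	samplesPresent := []float64{6, 24}
	avgIsPresent := []float64{0.25, 1.0}
	avgVcpu := []float64{1.0, 4.0} // day-level averages as stored

	var sumVcpu, totalSamples, present float64
	for i := range samplesPresent {
		totalDay := samplesPresent[i] / avgIsPresent[i] // 24 samples each day
		sumVcpu += avgVcpu[i] * totalDay                // de-normalized contribution
		totalSamples += totalDay
		present += samplesPresent[i]
	}
	// Mirrors insertMonthlyAggregates' division by the accumulated totalSamples.
	fmt.Println(sumVcpu/totalSamples, present/totalSamples) // 2.5 avg vCPU, 0.625 presence
}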

View File

@@ -1,7 +1,7 @@
name: "vctp"
arch: "amd64"
platform: "linux"
-version: "v26.1.1"
version: "v26.1.2"
version_schema: semver
description: vCTP monitors VMware VM inventory and event data to build chargeback reports
maintainer: "@coadn"