Files
vctp2/internal/tasks/monthlyAggregate.go
Nathan Coad 7ea02be91a
All checks were successful
continuous-integration/drone/push Build is passing
refactor code and improve daily cache handling of deleted VMs
2026-01-20 16:46:07 +11:00

567 lines
20 KiB
Go

package tasks
import (
"context"
"database/sql"
"fmt"
"log/slog"
"os"
"runtime"
"strings"
"sync"
"time"
"vctp/db"
"vctp/internal/metrics"
"vctp/internal/report"
)
// RunVcenterMonthlyAggregate is the scheduled entry point that builds the monthly summary for the
// calendar month preceding the current one (month boundaries taken in time.Now()'s location).
// The work runs through runAggregateJob under the name "monthly_aggregate" with a timeout derived
// from MonthlyJobTimeoutSeconds (20 minutes is passed as the fallback to durationFromSeconds —
// presumably used when the setting is unset; confirm). An existing summary is never forced to rebuild.
func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.Logger) (err error) {
	timeout := durationFromSeconds(c.Settings.Values.Settings.MonthlyJobTimeoutSeconds, 20*time.Minute)
	job := func(jobCtx context.Context) error {
		began := time.Now()
		defer func() {
			logger.Info("Monthly summary job finished", "duration", time.Since(began))
		}()
		current := time.Now()
		// First day of the current month, stepped back one month.
		previousMonth := time.Date(current.Year(), current.Month(), 1, 0, 0, 0, 0, current.Location()).AddDate(0, -1, 0)
		return c.aggregateMonthlySummary(jobCtx, previousMonth, false)
	}
	return c.runAggregateJob(ctx, "monthly_aggregate", timeout, job)
}
// AggregateMonthlySummary builds the monthly summary table for the calendar month that contains
// month. If that table already holds rows, nothing is done unless force is true, in which case the
// existing rows are cleared and the month is re-aggregated.
func (c *CronTask) AggregateMonthlySummary(ctx context.Context, month time.Time, force bool) error {
	return c.aggregateMonthlySummary(ctx, month, force)
}
// aggregateMonthlySummary builds the inventory_monthly_summary_YYYYMM table for the month that
// contains targetMonth from that month's daily summary tables.
//
// If the monthly table already has rows, the month is skipped unless force is true, in which case
// the table is cleared and rebuilt. When MONTHLY_AGG_GO=1 the Go implementation
// (aggregateMonthlySummaryGo) is tried first; if it fails, any rows it managed to write are cleared
// and the SQL implementation runs instead. After the SQL insert, rows with no creation time are
// backfilled with the month start, and the table is counted, registered, analyzed and reported on.
// The monthly aggregation metric is recorded on success and on insert/report failure.
func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time.Time, force bool) error {
	jobStart := time.Now()
	if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
		return err
	}
	monthStart := time.Date(targetMonth.Year(), targetMonth.Month(), 1, 0, 0, 0, 0, targetMonth.Location())
	monthEnd := monthStart.AddDate(0, 1, 0)
	monthLabel := monthStart.Format("2006-01")

	dailySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "daily", "inventory_daily_summary_", "20060102", monthStart, monthEnd)
	if err != nil {
		return err
	}
	dailySnapshots = filterRecordsInRange(dailySnapshots, monthStart, monthEnd)
	dbConn := c.Database.DB()
	db.SetPostgresWorkMem(ctx, dbConn, c.Settings.Values.Settings.PostgresWorkMemMB)
	dailySnapshots = filterSnapshotsWithRows(ctx, dbConn, dailySnapshots)
	if len(dailySnapshots) == 0 {
		return fmt.Errorf("no daily snapshot tables found for %s", monthLabel)
	}

	monthlyTable, err := monthlySummaryTableName(targetMonth)
	if err != nil {
		return err
	}
	if err := db.EnsureSummaryTable(ctx, dbConn, monthlyTable); err != nil {
		return err
	}
	rowsExist, err := db.TableHasRows(ctx, dbConn, monthlyTable)
	if err != nil {
		return err
	}
	if rowsExist {
		if !force {
			c.Logger.Debug("Monthly summary already exists, skipping aggregation", "summary_table", monthlyTable)
			return nil
		}
		if err := clearTable(ctx, dbConn, monthlyTable); err != nil {
			return err
		}
	}

	// Optional Go-based aggregation path.
	if os.Getenv("MONTHLY_AGG_GO") == "1" {
		c.Logger.Debug("Using go implementation of monthly aggregation")
		goErr := c.aggregateMonthlySummaryGo(ctx, monthStart, monthEnd, monthlyTable, dailySnapshots)
		if goErr == nil {
			metrics.RecordMonthlyAggregation(time.Since(jobStart), nil)
			c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", monthlyTable)
			return nil
		}
		c.Logger.Warn("go-based monthly aggregation failed, falling back to SQL path", "error", goErr)
		// The Go path can fail after its rows were inserted (e.g. report generation failed);
		// clear them so the SQL insert below does not duplicate every VM.
		if err := clearTable(ctx, dbConn, monthlyTable); err != nil {
			return err
		}
	}

	dailyTables := make([]string, 0, len(dailySnapshots))
	for _, snapshot := range dailySnapshots {
		dailyTables = append(dailyTables, snapshot.TableName)
	}
	unionQuery, err := buildUnionQuery(dailyTables, monthlyUnionColumns, templateExclusionFilter())
	if err != nil {
		return err
	}

	// Totals are informational only; failure to compute them does not abort the aggregation.
	monthlyTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery)
	if err != nil {
		c.Logger.Warn("unable to calculate monthly totals", "error", err, "month", monthLabel)
	} else {
		c.Logger.Info("Monthly snapshot totals",
			"month", monthLabel,
			"vm_count", monthlyTotals.VmCount,
			"vcpu_total", monthlyTotals.VcpuTotal,
			"ram_total_gb", monthlyTotals.RamTotal,
			"disk_total_gb", monthlyTotals.DiskTotal,
		)
	}

	insertQuery, err := db.BuildMonthlySummaryInsert(monthlyTable, unionQuery)
	if err != nil {
		return err
	}
	if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
		c.Logger.Error("failed to aggregate monthly inventory", "error", err, "month", monthLabel)
		metrics.RecordMonthlyAggregation(time.Since(jobStart), err)
		return err
	}

	// Backfill missing creation times to the start of the month for rows lacking creation info.
	if _, err := dbConn.ExecContext(ctx,
		`UPDATE `+monthlyTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`,
		monthStart.Unix(),
	); err != nil {
		c.Logger.Warn("failed to normalize creation times for monthly summary", "error", err, "table", monthlyTable)
	}

	rowCount, err := db.TableRowCount(ctx, dbConn, monthlyTable)
	if err != nil {
		c.Logger.Warn("unable to count monthly summary rows", "error", err, "table", monthlyTable)
	}
	// Register with the canonical month start, matching the Go path.
	if err := report.RegisterSnapshot(ctx, c.Database, "monthly", monthlyTable, monthStart, rowCount); err != nil {
		c.Logger.Warn("failed to register monthly snapshot", "error", err, "table", monthlyTable)
	}
	db.AnalyzeTableIfPostgres(ctx, dbConn, monthlyTable)

	if err := c.generateReport(ctx, monthlyTable); err != nil {
		c.Logger.Warn("failed to generate monthly report", "error", err, "table", monthlyTable)
		metrics.RecordMonthlyAggregation(time.Since(jobStart), err)
		return err
	}
	c.Logger.Debug("Finished monthly inventory aggregation", "summary_table", monthlyTable)
	metrics.RecordMonthlyAggregation(time.Since(jobStart), nil)
	return nil
}
// monthlySummaryTableName returns the validated name of the monthly summary table for t's month,
// i.e. "inventory_monthly_summary_" followed by t formatted as YYYYMM, passed through
// db.SafeTableName (whose error, if any, is returned unchanged).
func monthlySummaryTableName(t time.Time) (string, error) {
	name := "inventory_monthly_summary_" + t.Format("200601")
	return db.SafeTableName(name)
}
// aggregateMonthlySummaryGo is the in-process alternative to the SQL monthly aggregation.
// It clears summaryTable, reads the daily summary tables concurrently (scanDailyTablesParallel),
// and falls back to the vm_daily_rollup cache when that scan yields no rows. It then inserts one
// row per VM and applies the same post-processing as the SQL path: creation/deletion refinement
// from the union of daily tables, creation-time backfill to monthStart, ANALYZE, row count,
// snapshot registration and report generation. Refinement, backfill, count and registration
// failures are only logged; a report failure is returned.
func (c *CronTask) aggregateMonthlySummaryGo(ctx context.Context, monthStart, monthEnd time.Time, summaryTable string, dailySnapshots []report.SnapshotRecord) error {
	began := time.Now()
	conn := c.Database.DB()
	if err := clearTable(ctx, conn, summaryTable); err != nil {
		return err
	}

	// The union over all daily tables is only needed to refine lifecycle times after the insert.
	tableNames := make([]string, len(dailySnapshots))
	for i := range dailySnapshots {
		tableNames[i] = dailySnapshots[i].TableName
	}
	unionQuery, err := buildUnionQuery(tableNames, monthlyUnionColumns, templateExclusionFilter())
	if err != nil {
		return err
	}

	aggregates, err := c.scanDailyTablesParallel(ctx, dailySnapshots)
	if err != nil {
		return err
	}
	if len(aggregates) == 0 {
		rollup, rollupErr := c.scanDailyRollup(ctx, monthStart, monthEnd)
		switch {
		case rollupErr != nil:
			c.Logger.Warn("failed to read daily rollup cache; using table scan", "error", rollupErr)
		case len(rollup) > 0:
			aggregates = rollup
		}
	}
	if len(aggregates) == 0 {
		return fmt.Errorf("no VM records aggregated for %s", monthStart.Format("2006-01"))
	}

	if err := c.insertMonthlyAggregates(ctx, summaryTable, aggregates); err != nil {
		return err
	}
	if err := db.RefineCreationDeletionFromUnion(ctx, conn, summaryTable, unionQuery); err != nil {
		c.Logger.Warn("failed to refine creation/deletion times (monthly Go)", "error", err, "table", summaryTable)
	}

	// Rows still lacking a creation time are pinned to the first instant of the month.
	backfill := `UPDATE ` + summaryTable + ` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`
	if _, err := conn.ExecContext(ctx, backfill, monthStart.Unix()); err != nil {
		c.Logger.Warn("failed to normalize creation times for monthly summary (Go)", "error", err, "table", summaryTable)
	}

	db.AnalyzeTableIfPostgres(ctx, conn, summaryTable)
	rowCount, err := db.TableRowCount(ctx, conn, summaryTable)
	if err != nil {
		c.Logger.Warn("unable to count monthly summary rows", "error", err, "table", summaryTable)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "monthly", summaryTable, monthStart, rowCount); err != nil {
		c.Logger.Warn("failed to register monthly snapshot", "error", err, "table", summaryTable)
	}
	if err := c.generateReport(ctx, summaryTable); err != nil {
		c.Logger.Warn("failed to generate monthly report (Go)", "error", err, "table", summaryTable)
		return err
	}
	c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", summaryTable, "duration", time.Since(began))
	return nil
}
// scanDailyTablesParallel scans every daily summary table with a bounded pool of workers
// (runtime.NumCPU(), at least 2, at most one per table). Per-table results are merged into a
// single map under a mutex using mergeMonthlyAgg.
//
// A table that fails to scan is logged and skipped (best effort), so the result may be partial.
// Once ctx is cancelled or its deadline passes, the remaining tables are not scanned and
// ctx.Err() is returned instead of a silently truncated map.
func (c *CronTask) scanDailyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord) (map[monthlyAggKey]*monthlyAggVal, error) {
	agg := make(map[monthlyAggKey]*monthlyAggVal, 1024)
	var mu sync.Mutex
	workers := runtime.NumCPU()
	if workers < 2 {
		workers = 2
	}
	if workers > len(snapshots) {
		workers = len(snapshots)
	}
	// Buffered to hold every job so the producer loop below never blocks.
	jobs := make(chan report.SnapshotRecord, len(snapshots))
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for snap := range jobs {
				if ctx.Err() != nil {
					// Cancelled: drain the queue without issuing more queries.
					continue
				}
				rows, err := c.scanDailyTable(ctx, snap)
				if err != nil {
					c.Logger.Warn("failed to scan daily summary", "table", snap.TableName, "error", err)
					continue
				}
				mu.Lock()
				for k, v := range rows {
					if existing, ok := agg[k]; ok {
						mergeMonthlyAgg(existing, v)
					} else {
						agg[k] = v
					}
				}
				mu.Unlock()
			}
		}()
	}
	for _, snap := range snapshots {
		jobs <- snap
	}
	close(jobs)
	wg.Wait()
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	return agg, nil
}
// mergeMonthlyAgg folds src into dst in place.
//
// creation and deletion each take src's value only when it is positive and either dst has none
// (zero) or src's is earlier. Descriptive fields (placement, flags, sizing, event/cloud IDs) are
// copied from src only when src.lastSnapshot is strictly later, so dst describes the most recent
// observation; a zero src.inventoryId never overwrites dst's. Sample counts, resource sums and
// pool-weighted counters are always added.
func mergeMonthlyAgg(dst, src *monthlyAggVal) {
	earliest := func(current, candidate int64) int64 {
		if candidate <= 0 {
			return current
		}
		if current != 0 && candidate >= current {
			return current
		}
		return candidate
	}
	dst.creation = earliest(dst.creation, src.creation)
	dst.deletion = earliest(dst.deletion, src.deletion)

	if src.lastSnapshot.After(dst.lastSnapshot) {
		dst.lastSnapshot = src.lastSnapshot
		if src.inventoryId != 0 {
			dst.inventoryId = src.inventoryId
		}
		dst.resourcePool, dst.datacenter, dst.cluster, dst.folder = src.resourcePool, src.datacenter, src.cluster, src.folder
		dst.isTemplate, dst.poweredOn, dst.srmPlaceholder = src.isTemplate, src.poweredOn, src.srmPlaceholder
		dst.provisioned, dst.vcpuCount, dst.ramGB = src.provisioned, src.vcpuCount, src.ramGB
		dst.eventKey, dst.cloudId = src.eventKey, src.cloudId
	}

	// Additive accumulators.
	dst.samplesPresent += src.samplesPresent
	dst.totalSamples += src.totalSamples
	dst.sumVcpu += src.sumVcpu
	dst.sumRam += src.sumRam
	dst.sumDisk += src.sumDisk
	dst.tinWeighted += src.tinWeighted
	dst.bronzeWeighted += src.bronzeWeighted
	dst.silverWeighted += src.silverWeighted
	dst.goldWeighted += src.goldWeighted
}
// scanDailyTable reads one daily summary table and turns each non-template row into a
// monthlyAggVal keyed by Vcenter/VmId/VmUuid/Name, with lastSnapshot set to the table's
// snapshot time.
//
// Daily averages are converted back into per-sample sums so they can be added across days. The
// day's sample count is estimated as SamplesPresent / AvgIsPresent, or SamplesPresent when
// AvgIsPresent is NULL or not positive. AvgVcpuCount/AvgRamGB/AvgProvisionedDisk are multiplied by
// that estimate; the Pool*Pct percentages are divided by 100 and then multiplied by it. Rows that
// fail to scan are logged and skipped. If a table has several rows with the same key, the last one
// read wins.
//
// NOTE(review): the Tin/Bronze/Silver/Gold columns are scanned but not used here.
func (c *CronTask) scanDailyTable(ctx context.Context, snap report.SnapshotRecord) (map[monthlyAggKey]*monthlyAggVal, error) {
	query := fmt.Sprintf(`
SELECT
"InventoryId",
"Name","Vcenter","VmId","VmUuid","EventKey","CloudId","ResourcePool","Datacenter","Cluster","Folder",
COALESCE("ProvisionedDisk",0) AS disk,
COALESCE("VcpuCount",0) AS vcpu,
COALESCE("RamGB",0) AS ram,
COALESCE("CreationTime",0) AS creation,
COALESCE("DeletionTime",0) AS deletion,
COALESCE("SamplesPresent",0) AS samples_present,
"AvgVcpuCount","AvgRamGB","AvgProvisionedDisk","AvgIsPresent",
"PoolTinPct","PoolBronzePct","PoolSilverPct","PoolGoldPct",
"Tin","Bronze","Silver","Gold","IsTemplate","PoweredOn","SrmPlaceholder"
FROM %s
`, snap.TableName)
	rows, err := c.Database.DB().QueryxContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	out := make(map[monthlyAggKey]*monthlyAggVal, 256)
	for rows.Next() {
		var (
			invID                                 sql.NullInt64
			name, vcenter, vmID, vmUUID           string
			eventKey, cloudID                     sql.NullString
			pool, dc, cluster, folder             sql.NullString
			template, powered, srm                sql.NullString
			disk, avgVcpu, avgRam, avgDisk        sql.NullFloat64
			avgPresent                            sql.NullFloat64
			pctTin, pctBronze, pctSilver, pctGold sql.NullFloat64
			tin, bronze, silver, gold             sql.NullFloat64
			vcpu, ram                             sql.NullInt64
			created, deleted                      sql.NullInt64
			present                               sql.NullInt64
		)
		scanErr := rows.Scan(
			&invID,
			&name, &vcenter, &vmID, &vmUUID, &eventKey, &cloudID, &pool, &dc, &cluster, &folder,
			&disk, &vcpu, &ram, &created, &deleted, &present,
			&avgVcpu, &avgRam, &avgDisk, &avgPresent,
			&pctTin, &pctBronze, &pctSilver, &pctGold,
			&tin, &bronze, &silver, &gold,
			&template, &powered, &srm,
		)
		if scanErr != nil {
			c.Logger.Warn("failed to scan daily summary row", "table", snap.TableName, "error", scanErr)
			continue
		}
		if flag := strings.TrimSpace(template.String); flag == "1" || strings.EqualFold(flag, "true") {
			continue
		}

		daySamples := float64(present.Int64)
		if avgPresent.Valid && avgPresent.Float64 > 0 {
			daySamples = float64(present.Int64) / avgPresent.Float64
		}
		// perSample turns a daily average into a sum over the day's samples (0 when NULL).
		perSample := func(v sql.NullFloat64) float64 {
			if !v.Valid {
				return 0
			}
			return v.Float64 * daySamples
		}
		// poolShare turns a percentage into a sample-weighted count (0 when NULL).
		poolShare := func(v sql.NullFloat64) float64 {
			if !v.Valid {
				return 0
			}
			return (v.Float64 / 100.0) * daySamples
		}

		key := monthlyAggKey{Vcenter: vcenter, VmId: vmID, VmUuid: vmUUID, Name: name}
		out[key] = &monthlyAggVal{
			key:            key,
			inventoryId:    invID.Int64,
			eventKey:       eventKey.String,
			cloudId:        cloudID.String,
			resourcePool:   pool.String,
			datacenter:     dc.String,
			cluster:        cluster.String,
			folder:         folder.String,
			isTemplate:     template.String,
			poweredOn:      powered.String,
			srmPlaceholder: srm.String,
			provisioned:    disk.Float64,
			vcpuCount:      vcpu.Int64,
			ramGB:          ram.Int64,
			creation:       created.Int64,
			deletion:       deleted.Int64,
			lastSnapshot:   snap.SnapshotTime,
			samplesPresent: present.Int64,
			totalSamples:   daySamples,
			sumVcpu:        perSample(avgVcpu),
			sumRam:         perSample(avgRam),
			sumDisk:        perSample(avgDisk),
			tinWeighted:    poolShare(pctTin),
			bronzeWeighted: poolShare(pctBronze),
			silverWeighted: poolShare(pctSilver),
			goldWeighted:   poolShare(pctGold),
		}
	}
	return out, rows.Err()
}
// scanDailyRollup aggregates a month of per-VM data from the vm_daily_rollup cache table.
//
// Rows whose "Date" (compared against start.Unix() and end.Unix(), so presumably unix seconds)
// falls in [start, end) are read; template rows are skipped. Rows sharing a
// Vcenter/VmId/VmUuid/Name key are combined with mergeMonthlyAgg. SumVcpu/SumRam/SumDisk and the
// *Hits columns are used directly as the running sums and tier-weighted counts (assumed to already
// be per-sample totals — confirm against the rollup writer). If the table does not exist, an empty
// map is returned. Rows that fail to scan are logged and skipped rather than silently dropped.
func (c *CronTask) scanDailyRollup(ctx context.Context, start, end time.Time) (map[monthlyAggKey]*monthlyAggVal, error) {
	dbConn := c.Database.DB()
	if !db.TableExists(ctx, dbConn, "vm_daily_rollup") {
		return map[monthlyAggKey]*monthlyAggVal{}, nil
	}
	query := `
SELECT
"Date","Vcenter","VmId","VmUuid","Name","CreationTime","DeletionTime",
"SamplesPresent","TotalSamples","SumVcpu","SumRam","SumDisk",
"TinHits","BronzeHits","SilverHits","GoldHits",
"LastResourcePool","LastDatacenter","LastCluster","LastFolder",
"LastProvisionedDisk","LastVcpuCount","LastRamGB","IsTemplate","PoweredOn","SrmPlaceholder"
FROM vm_daily_rollup
WHERE "Date" >= ? AND "Date" < ?
`
	bind := dbConn.Rebind(query)
	rows, err := dbConn.QueryxContext(ctx, bind, start.Unix(), end.Unix())
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	agg := make(map[monthlyAggKey]*monthlyAggVal, 512)
	for rows.Next() {
		var (
			date                                      sql.NullInt64
			vcenter, vmId, vmUuid, name               string
			creation, deletion                        sql.NullInt64
			samplesPresent, totalSamples              sql.NullInt64
			sumVcpu, sumRam, sumDisk                  sql.NullFloat64
			tinHits, bronzeHits, silverHits, goldHits sql.NullInt64
			lastPool, lastDc, lastCluster, lastFolder sql.NullString
			lastDisk, lastVcpu, lastRam               sql.NullFloat64
			isTemplate, poweredOn, srmPlaceholder     sql.NullString
		)
		if err := rows.Scan(
			&date, &vcenter, &vmId, &vmUuid, &name, &creation, &deletion,
			&samplesPresent, &totalSamples, &sumVcpu, &sumRam, &sumDisk,
			&tinHits, &bronzeHits, &silverHits, &goldHits,
			&lastPool, &lastDc, &lastCluster, &lastFolder,
			&lastDisk, &lastVcpu, &lastRam, &isTemplate, &poweredOn, &srmPlaceholder,
		); err != nil {
			// Best effort, like scanDailyTable: skip the row but make the loss visible.
			c.Logger.Warn("failed to scan daily rollup row", "table", "vm_daily_rollup", "error", err)
			continue
		}
		templateVal := strings.TrimSpace(isTemplate.String)
		if strings.EqualFold(templateVal, "true") || templateVal == "1" {
			continue
		}
		key := monthlyAggKey{Vcenter: vcenter, VmId: vmId, VmUuid: vmUuid, Name: name}
		val := &monthlyAggVal{
			key:            key,
			resourcePool:   lastPool.String,
			datacenter:     lastDc.String,
			cluster:        lastCluster.String,
			folder:         lastFolder.String,
			isTemplate:     isTemplate.String,
			poweredOn:      poweredOn.String,
			srmPlaceholder: srmPlaceholder.String,
			provisioned:    lastDisk.Float64,
			vcpuCount:      int64(lastVcpu.Float64),
			ramGB:          int64(lastRam.Float64),
			creation:       creation.Int64,
			deletion:       deletion.Int64,
			lastSnapshot:   time.Unix(date.Int64, 0),
			samplesPresent: samplesPresent.Int64,
			totalSamples:   float64(totalSamples.Int64),
			sumVcpu:        sumVcpu.Float64,
			sumRam:         sumRam.Float64,
			sumDisk:        sumDisk.Float64,
			tinWeighted:    float64(tinHits.Int64),
			bronzeWeighted: float64(bronzeHits.Int64),
			silverWeighted: float64(silverHits.Int64),
			goldWeighted:   float64(goldHits.Int64),
		}
		if existing, ok := agg[key]; ok {
			mergeMonthlyAgg(existing, val)
		} else {
			agg[key] = val
		}
	}
	return agg, rows.Err()
}
// insertMonthlyAggregates writes one row per aggregated VM into summaryTable. All rows go through
// a single prepared INSERT inside one transaction.
//
// Averages are per sample: Avg* = sum / totalSamples, AvgIsPresent = samplesPresent / totalSamples,
// and the pool percentages are 100 * weighted / totalSamples. These columns are NULL when
// totalSamples is not positive, and a zero inventoryId is written as NULL. The Tin/Bronze/Silver/Gold
// columns receive the same values as Pool*Pct. On any prepare or insert error the transaction is
// rolled back and the error is returned.
func (c *CronTask) insertMonthlyAggregates(ctx context.Context, summaryTable string, aggMap map[monthlyAggKey]*monthlyAggVal) error {
	conn := c.Database.DB()
	columns := []string{
		"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
		"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
		"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SamplesPresent",
		"AvgVcpuCount", "AvgRamGB", "AvgProvisionedDisk", "AvgIsPresent",
		"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct",
		"Tin", "Bronze", "Silver", "Gold",
	}
	marks := strings.TrimSuffix(strings.Repeat("?,", len(columns)), ",")
	insertSQL := conn.Rebind(fmt.Sprintf(`INSERT INTO %s (%s) VALUES (%s)`, summaryTable, strings.Join(columns, ","), marks))

	tx, err := conn.BeginTxx(ctx, nil)
	if err != nil {
		return err
	}
	stmt, err := tx.PreparexContext(ctx, insertSQL)
	if err != nil {
		tx.Rollback()
		return err
	}
	defer stmt.Close()

	for _, v := range aggMap {
		var inventoryVal sql.NullInt64
		if v.inventoryId != 0 {
			inventoryVal = sql.NullInt64{Int64: v.inventoryId, Valid: true}
		}
		var avgVcpu, avgRam, avgDisk, avgIsPresent sql.NullFloat64
		var tinPct, bronzePct, silverPct, goldPct sql.NullFloat64
		if total := v.totalSamples; total > 0 {
			perSample := func(num float64) sql.NullFloat64 {
				return sql.NullFloat64{Float64: num / total, Valid: true}
			}
			avgVcpu = perSample(v.sumVcpu)
			avgRam = perSample(v.sumRam)
			avgDisk = perSample(v.sumDisk)
			avgIsPresent = perSample(float64(v.samplesPresent))
			tinPct = perSample(100.0 * v.tinWeighted)
			bronzePct = perSample(100.0 * v.bronzeWeighted)
			silverPct = perSample(100.0 * v.silverWeighted)
			goldPct = perSample(100.0 * v.goldWeighted)
		}
		_, execErr := stmt.ExecContext(ctx,
			inventoryVal,
			v.key.Name, v.key.Vcenter, v.key.VmId, v.eventKey, v.cloudId, v.creation, v.deletion,
			v.resourcePool, v.datacenter, v.cluster, v.folder, v.provisioned, v.vcpuCount, v.ramGB,
			v.isTemplate, v.poweredOn, v.srmPlaceholder, v.key.VmUuid, v.samplesPresent,
			avgVcpu, avgRam, avgDisk, avgIsPresent,
			tinPct, bronzePct, silverPct, goldPct,
			tinPct, bronzePct, silverPct, goldPct,
		)
		if execErr != nil {
			tx.Rollback()
			return execErr
		}
	}
	return tx.Commit()
}