// vctp2/internal/tasks/dailyAggregate.go

package tasks

import (
	"context"
	"database/sql"
	"fmt"
	"log/slog"
	"os"
	"runtime"
	"strings"
	"sync"
	"time"

	"vctp/db"
	"vctp/internal/metrics"
	"vctp/internal/report"
)

// RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table.
func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) error {
	jobTimeout := durationFromSeconds(c.Settings.Values.Settings.DailyJobTimeoutSeconds, 15*time.Minute)
	return c.runAggregateJob(ctx, "daily_aggregate", jobTimeout, func(jobCtx context.Context) error {
		startedAt := time.Now()
		defer func() {
			logger.Info("Daily summary job finished", "duration", time.Since(startedAt))
		}()
		// Step back one minute so a run shortly after midnight targets the day
		// that just ended rather than the new, nearly empty day.
		targetTime := time.Now().Add(-time.Minute)
		// Always force regeneration on the scheduled run to refresh data even if a manual run happened earlier.
		return c.aggregateDailySummary(jobCtx, targetTime, true)
	})
}
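
// AggregateDailySummary is the exported entry point for on-demand runs. It
// aggregates the day containing date and, when force is true, rebuilds an
// existing summary instead of skipping it.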
func (c *CronTask) AggregateDailySummary(ctx context.Context, date time.Time, force bool) error {
	return c.aggregateDailySummary(ctx, date, force)
}
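
// aggregateDailySummary builds the summary for the day containing targetTime:
// it discovers that day's hourly snapshot tables, UNIONs them into a single
// relation, inserts the aggregated rows into the per-day summary table,
// refines creation/deletion times, registers the snapshot, and generates the
// daily report.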
func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Time, force bool) error {
	jobStart := time.Now()
	dayStart := time.Date(targetTime.Year(), targetTime.Month(), targetTime.Day(), 0, 0, 0, 0, targetTime.Location())
	dayEnd := dayStart.AddDate(0, 0, 1)
	summaryTable, err := dailySummaryTableName(targetTime)
	if err != nil {
		return err
	}
	dbConn := c.Database.DB()
	db.SetPostgresWorkMem(ctx, dbConn, c.Settings.Values.Settings.PostgresWorkMemMB)
	if err := db.EnsureSummaryTable(ctx, dbConn, summaryTable); err != nil {
		return err
	}
	if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
		return err
	}
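
	// The job is idempotent: an already-populated summary is left alone unless
	// the caller forces a rebuild, in which case the table is cleared first.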
	if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil {
		return err
	} else if rowsExist && !force {
		c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable)
		return nil
	} else if rowsExist && force {
		if err := clearTable(ctx, dbConn, summaryTable); err != nil {
			return err
		}
	}
	// If enabled, use the Go fan-out/reduce path to parallelize aggregation.
	if os.Getenv("DAILY_AGG_GO") == "1" {
		if err := c.aggregateDailySummaryGo(ctx, dayStart, dayEnd, summaryTable, force); err != nil {
			c.Logger.Warn("go-based daily aggregation failed, falling back to SQL path", "error", err)
		} else {
			metrics.RecordDailyAggregation(time.Since(jobStart), nil)
			c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable)
			return nil
		}
	}
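
	// Discover the day's hourly snapshot tables (from the snapshot registry,
	// falling back to name-prefix discovery), then drop any that fall outside
	// the window or hold no rows.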
	hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
	if err != nil {
		return err
	}
	hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd)
	hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
	if len(hourlySnapshots) == 0 {
		return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
	}
	hourlyTables := make([]string, 0, len(hourlySnapshots))
	for _, snapshot := range hourlySnapshots {
		hourlyTables = append(hourlyTables, snapshot.TableName)
		// Ensure indexes exist on historical hourly tables for faster aggregation.
		if err := db.EnsureSnapshotIndexes(ctx, dbConn, snapshot.TableName); err != nil {
			c.Logger.Warn("failed to ensure indexes on hourly table", "table", snapshot.TableName, "error", err)
		}
	}
	unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
	if err != nil {
		return err
	}
	currentTotals, totalsErr := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery)
	if totalsErr != nil {
		c.Logger.Warn("unable to calculate daily totals", "error", totalsErr, "date", dayStart.Format("2006-01-02"))
	} else {
		c.Logger.Info("Daily snapshot totals",
			"date", dayStart.Format("2006-01-02"),
			"vm_count", currentTotals.VmCount,
			"vcpu_total", currentTotals.VcpuTotal,
			"ram_total_gb", currentTotals.RamTotal,
			"disk_total_gb", currentTotals.DiskTotal,
		)
	}
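
	// Log a comparison against the previous day so operators can spot large
	// swings. Skipped when the current totals could not be computed, since the
	// deltas would be meaningless.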
	if totalsErr == nil {
		prevStart := dayStart.AddDate(0, 0, -1)
		prevEnd := dayStart
		prevSnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", prevStart, prevEnd)
		if err == nil && len(prevSnapshots) > 0 {
			prevSnapshots = filterRecordsInRange(prevSnapshots, prevStart, prevEnd)
			prevSnapshots = filterSnapshotsWithRows(ctx, dbConn, prevSnapshots)
			prevTables := make([]string, 0, len(prevSnapshots))
			for _, snapshot := range prevSnapshots {
				prevTables = append(prevTables, snapshot.TableName)
			}
			prevUnion, err := buildUnionQuery(prevTables, summaryUnionColumns, templateExclusionFilter())
			if err != nil {
				c.Logger.Warn("unable to build previous day union", "error", err)
			} else if prevTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, prevUnion); err != nil {
				c.Logger.Warn("unable to calculate previous day totals", "error", err, "date", prevStart.Format("2006-01-02"))
			} else {
				c.Logger.Info("Daily snapshot comparison",
					"current_date", dayStart.Format("2006-01-02"),
					"previous_date", prevStart.Format("2006-01-02"),
					"vm_delta", currentTotals.VmCount-prevTotals.VmCount,
					"vcpu_delta", currentTotals.VcpuTotal-prevTotals.VcpuTotal,
					"ram_delta_gb", currentTotals.RamTotal-prevTotals.RamTotal,
					"disk_delta_gb", currentTotals.DiskTotal-prevTotals.DiskTotal,
				)
			}
		}
	}
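
	// Materialize the aggregated day into the summary table; the generated
	// statement inserts directly from the union of hourly tables.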
	insertQuery, err := db.BuildDailySummaryInsert(summaryTable, unionQuery)
	if err != nil {
		return err
	}
	if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
		c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayStart.Format("2006-01-02"))
		return err
	}
	// Backfill missing creation times to the start of the day for rows where vCenter had no creation info.
	if _, err := dbConn.ExecContext(ctx,
		`UPDATE `+summaryTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`,
		dayStart.Unix(),
	); err != nil {
		c.Logger.Warn("failed to normalize creation times for daily summary", "error", err, "table", summaryTable)
	}
	if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
		c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
	}
	db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
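
	// Bookkeeping: count the rows for the snapshot registry entry, then kick
	// off the downstream daily report; a failed count registers as zero rows.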
	rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
	if err != nil {
		c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
		c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
	}
	if err := c.generateReport(ctx, summaryTable); err != nil {
		c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
		metrics.RecordDailyAggregation(time.Since(jobStart), err)
		return err
	}
	c.Logger.Debug("Finished daily inventory aggregation", "summary_table", summaryTable)
	metrics.RecordDailyAggregation(time.Since(jobStart), nil)
	return nil
}
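
// dailySummaryTableName returns the validated per-day summary table name,
// e.g. inventory_daily_summary_20240131.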
func dailySummaryTableName(t time.Time) (string, error) {
	return db.SafeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102")))
}

// aggregateDailySummaryGo performs daily aggregation by reading hourly tables in parallel,
// reducing in Go, and writing the summary table. It mirrors the outputs of the SQL path
// as closely as possible while improving CPU utilization on multi-core hosts.
func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd time.Time, summaryTable string, force bool) error {
	jobStart := time.Now()
	dbConn := c.Database.DB()
	hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
	if err != nil {
		return err
	}
	hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd)
	hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
	if len(hourlySnapshots) == 0 {
		return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
	}
	hourlyTables := make([]string, 0, len(hourlySnapshots))
	for _, snapshot := range hourlySnapshots {
		hourlyTables = append(hourlyTables, snapshot.TableName)
		if err := db.EnsureSnapshotIndexes(ctx, dbConn, snapshot.TableName); err != nil {
			c.Logger.Warn("failed to ensure indexes on hourly table", "table", snapshot.TableName, "error", err)
		}
	}
	unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
	if err != nil {
		return err
	}
	// Re-check and clear any existing summary when forcing; the check mirrors
	// the caller's so this path stays safe if invoked on its own.
	if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil {
		return err
	} else if rowsExist && !force {
		c.Logger.Debug("Daily summary already exists, skipping aggregation (Go path)", "summary_table", summaryTable)
		return nil
	} else if rowsExist && force {
		if err := clearTable(ctx, dbConn, summaryTable); err != nil {
			return err
		}
	}
	totalSamples := len(hourlyTables)
	aggMap, err := c.scanHourlyTablesParallel(ctx, hourlySnapshots)
	if err != nil {
		return err
	}
	if len(aggMap) == 0 {
		return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02"))
	}
	// Insert aggregated rows.
	if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples); err != nil {
		return err
	}
	// Refine lifecycle with existing SQL helper to pick up first-after deletions.
	if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
		c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
	}
	db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
	rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
	if err != nil {
		c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
		c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
	}
	if err := c.generateReport(ctx, summaryTable); err != nil {
		c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
		return err
	}
	c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable, "duration", time.Since(jobStart))
	return nil
}
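
// dailyAggKey identifies a single VM across the day's hourly snapshots.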
type dailyAggKey struct {
	Vcenter string
	VmId    string
	VmUuid  string
	Name    string
}
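
// dailyAggVal accumulates one VM's samples for the day: the sums feed the
// averages at insert time, the *Hits counters back the pool-percentage
// columns, and the descriptive fields always reflect the most recent sample.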
type dailyAggVal struct {
	key            dailyAggKey
	resourcePool   string
	datacenter     string
	cluster        string
	folder         string
	isTemplate     string
	poweredOn      string
	srmPlaceholder string
	creation       int64
	firstSeen      int64
	lastSeen       int64
	sumVcpu        int64
	sumRam         int64
	sumDisk        float64
	samples        int64
	tinHits        int64
	bronzeHits     int64
	silverHits     int64
	goldHits       int64
}
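
// scanHourlyTablesParallel fans the day's hourly tables out to a worker pool
// sized to the host's CPUs, reduces each table into per-VM partial aggregates,
// and merges the partials into one map under a mutex.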
func (c *CronTask) scanHourlyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) {
	agg := make(map[dailyAggKey]*dailyAggVal, 1024)
	mu := sync.Mutex{}
	workers := runtime.NumCPU()
	if workers < 2 {
		workers = 2
	}
	if workers > len(snapshots) {
		workers = len(snapshots)
	}
	jobs := make(chan report.SnapshotRecord, len(snapshots))
	wg := sync.WaitGroup{}
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for snap := range jobs {
				partial, err := c.scanHourlyTable(ctx, snap)
				if err != nil {
					c.Logger.Warn("failed to scan hourly table", "table", snap.TableName, "error", err)
					continue
				}
				mu.Lock()
				for k, v := range partial {
					if existing, ok := agg[k]; ok {
						mergeDailyAgg(existing, v)
					} else {
						agg[k] = v
					}
				}
				mu.Unlock()
			}
		}()
	}
	for _, snap := range snapshots {
		jobs <- snap
	}
	close(jobs)
	wg.Wait()
	return agg, nil
}
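
// mergeDailyAgg folds src into dst: the earliest creation and first-seen times
// win, the newest sample's descriptive fields win, and counters are summed.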
func mergeDailyAgg(dst, src *dailyAggVal) {
	if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) {
		dst.creation = src.creation
	}
	if dst.firstSeen == 0 || (src.firstSeen > 0 && src.firstSeen < dst.firstSeen) {
		dst.firstSeen = src.firstSeen
	}
	if src.lastSeen > dst.lastSeen {
		dst.lastSeen = src.lastSeen
		dst.resourcePool = src.resourcePool
		dst.datacenter = src.datacenter
		dst.cluster = src.cluster
		dst.folder = src.folder
		dst.isTemplate = src.isTemplate
		dst.poweredOn = src.poweredOn
		dst.srmPlaceholder = src.srmPlaceholder
	}
	dst.sumVcpu += src.sumVcpu
	dst.sumRam += src.sumRam
	dst.sumDisk += src.sumDisk
	dst.samples += src.samples
	dst.tinHits += src.tinHits
	dst.bronzeHits += src.bronzeHits
	dst.silverHits += src.silverHits
	dst.goldHits += src.goldHits
}
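
// scanHourlyTable reads one hourly snapshot table and reduces it to per-VM
// partial aggregates; every row contributes exactly one sample.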
func (c *CronTask) scanHourlyTable(ctx context.Context, snap report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) {
	dbConn := c.Database.DB()
	query := fmt.Sprintf(`
		SELECT
			"Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder",
			COALESCE("ProvisionedDisk",0) AS disk,
			COALESCE("VcpuCount",0) AS vcpu,
			COALESCE("RamGB",0) AS ram,
			COALESCE("CreationTime",0) AS creation,
			COALESCE("DeletionTime",0) AS deletion,
			COALESCE("IsTemplate",'FALSE') AS is_template,
			COALESCE("PoweredOn",'FALSE') AS powered_on,
			COALESCE("SrmPlaceholder",'FALSE') AS srm_placeholder,
			COALESCE("SnapshotTime",0) AS snapshot_time
		FROM %s
	`, snap.TableName)
	rows, err := dbConn.QueryxContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	out := make(map[dailyAggKey]*dailyAggVal, 256)
	for rows.Next() {
		var (
			name, vcenter, vmId, vmUuid, resourcePool string
			dc, cluster, folder                       sql.NullString
			disk                                      sql.NullFloat64
			vcpu, ram                                 sql.NullInt64
			creation, deletion, snapshotTime          sql.NullInt64
			isTemplate, poweredOn, srmPlaceholder     sql.NullString
		)
		// deletion is scanned only to keep the column order aligned; deletion
		// times are refined later from the union query.
		if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil {
			c.Logger.Warn("failed to scan hourly snapshot row", "table", snap.TableName, "error", err)
			continue
		}
		if vcenter == "" {
			continue
		}
		// Skip templates.
		if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" {
			continue
		}
		key := dailyAggKey{
			Vcenter: vcenter,
			VmId:    strings.TrimSpace(vmId),
			VmUuid:  strings.TrimSpace(vmUuid),
			Name:    strings.TrimSpace(name),
		}
		if key.VmId == "" && key.VmUuid == "" && key.Name == "" {
			continue
		}
		if key.VmId == "" {
			key.VmId = key.VmUuid
		}
		pool := strings.ToLower(strings.TrimSpace(resourcePool))
		hitTin := btoi(pool == "tin")
		hitBronze := btoi(pool == "bronze")
		hitSilver := btoi(pool == "silver")
		hitGold := btoi(pool == "gold")
		row := &dailyAggVal{
			key:            key,
			resourcePool:   resourcePool,
			datacenter:     dc.String,
			cluster:        cluster.String,
			folder:         folder.String,
			isTemplate:     isTemplate.String,
			poweredOn:      poweredOn.String,
			srmPlaceholder: srmPlaceholder.String,
			creation:       int64OrZero(creation),
			firstSeen:      int64OrZero(snapshotTime),
			lastSeen:       int64OrZero(snapshotTime),
			sumVcpu:        vcpu.Int64,
			sumRam:         ram.Int64,
			sumDisk:        disk.Float64,
			samples:        1,
			tinHits:        hitTin,
			bronzeHits:     hitBronze,
			silverHits:     hitSilver,
			goldHits:       hitGold,
		}
		out[key] = row
	}
	return out, rows.Err()
}
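
// insertDailyAggregates writes the per-VM aggregates into the summary table in
// a single transaction. totalSamples is the number of hourly tables scanned
// and is the denominator for presence and pool-tier percentages.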
func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error {
	dbConn := c.Database.DB()
	tx, err := dbConn.Beginx()
	if err != nil {
		return err
	}
	defer tx.Rollback()
	driver := strings.ToLower(dbConn.DriverName())
	placeholders := makePlaceholders(driver, 29)
	insert := fmt.Sprintf(`
		INSERT INTO %s (
			"Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder",
			"ProvisionedDisk","VcpuCount","RamGB","IsTemplate","PoweredOn","SrmPlaceholder",
			"CreationTime","DeletionTime","SamplesPresent","AvgVcpuCount","AvgRamGB","AvgProvisionedDisk",
			"AvgIsPresent","PoolTinPct","PoolBronzePct","PoolSilverPct","PoolGoldPct","Tin","Bronze","Silver","Gold"
		) VALUES (%s)
	`, table, placeholders)
	for _, v := range agg {
		if v.samples == 0 {
			continue
		}
		avgVcpu := float64(v.sumVcpu) / float64(v.samples)
		avgRam := float64(v.sumRam) / float64(v.samples)
		avgDisk := v.sumDisk / float64(v.samples)
		total := float64(totalSamples)
		avgPresent := 0.0
		tinPct := 0.0
		bronzePct := 0.0
		silverPct := 0.0
		goldPct := 0.0
		if total > 0 {
			avgPresent = float64(v.samples) / total
			tinPct = float64(v.tinHits) * 100 / total
			bronzePct = float64(v.bronzeHits) * 100 / total
			silverPct = float64(v.silverHits) * 100 / total
			goldPct = float64(v.goldHits) * 100 / total
		}
		args := []interface{}{
			v.key.Name,
			v.key.Vcenter,
			nullIfEmpty(v.key.VmId),
			nullIfEmpty(v.key.VmUuid),
			v.resourcePool,
			nullIfEmpty(v.datacenter),
			nullIfEmpty(v.cluster),
			nullIfEmpty(v.folder),
			v.sumDisk,
			v.sumVcpu,
			v.sumRam,
			v.isTemplate,
			v.poweredOn,
			v.srmPlaceholder,
			v.creation,
			int64(0), // deletion time refined later
			v.samples,
			avgVcpu,
			avgRam,
			avgDisk,
			avgPresent,
			tinPct, bronzePct, silverPct, goldPct,
			float64(v.tinHits), float64(v.bronzeHits), float64(v.silverHits), float64(v.goldHits),
		}
		if driver != "sqlite" {
			// On non-SQLite drivers, normalize any remaining empty strings to nil
			// so optional text columns are stored as NULL rather than ''.
			for i, a := range args {
				if s, ok := a.(string); ok && s == "" {
					args[i] = nil
				}
			}
		}
		if _, err := tx.ExecContext(ctx, insert, args...); err != nil {
			return err
		}
	}
	return tx.Commit()
}
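
// int64OrZero unwraps a NullInt64, treating NULL as zero.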
func int64OrZero(v sql.NullInt64) int64 {
	if v.Valid {
		return v.Int64
	}
	return 0
}
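
// nullIfEmpty converts blank strings to nil so they land as SQL NULL.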
func nullIfEmpty(s string) interface{} {
	if strings.TrimSpace(s) == "" {
		return nil
	}
	return s
}
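
// makePlaceholders returns a comma-separated list of n bind placeholders:
// "?" for SQLite and "$1".."$n" for Postgres-style drivers.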
func makePlaceholders(driver string, n int) string {
	parts := make([]string, n)
	for i := 0; i < n; i++ {
		if driver == "sqlite" {
			parts[i] = "?"
		} else {
			parts[i] = fmt.Sprintf("$%d", i+1)
		}
	}
	return strings.Join(parts, ",")
}
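
// btoi converts a bool to 1 or 0 so matches can be summed as counters.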
func btoi(b bool) int64 {
	if b {
		return 1
	}
	return 0
}