package tasks

import (
	"context"
	"database/sql"
	"fmt"
	"log/slog"
	"os"
	"runtime"
	"strings"
	"sync"
	"time"

	"vctp/db"
	"vctp/internal/metrics"
	"vctp/internal/report"
)

// RunVcenterMonthlyAggregate summarizes the previous month's daily snapshots.
// It runs under the shared aggregate-job wrapper with a configurable timeout
// (default 20 minutes) and always targets the first day of the previous month.
func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.Logger) (err error) {
	jobTimeout := durationFromSeconds(c.Settings.Values.Settings.MonthlyJobTimeoutSeconds, 20*time.Minute)
	return c.runAggregateJob(ctx, "monthly_aggregate", jobTimeout, func(jobCtx context.Context) error {
		startedAt := time.Now()
		defer func() {
			logger.Info("Monthly summary job finished", "duration", time.Since(startedAt))
		}()
		now := time.Now()
		firstOfThisMonth := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location())
		// Previous month: step back one month from the first of the current month.
		targetMonth := firstOfThisMonth.AddDate(0, -1, 0)
		return c.aggregateMonthlySummary(jobCtx, targetMonth, false)
	})
}

// AggregateMonthlySummary is the exported entry point for aggregating a
// specific month. When force is true an existing summary is rebuilt.
func (c *CronTask) AggregateMonthlySummary(ctx context.Context, month time.Time, force bool) error {
	return c.aggregateMonthlySummary(ctx, month, force)
}

// aggregateMonthlySummary builds the inventory_monthly_summary_YYYYMM table
// for the month containing targetMonth by aggregating that month's daily
// summary tables. It skips the work if the summary already has rows unless
// force is set. An optional Go-based aggregation path can be enabled via
// MONTHLY_AGG_GO=1; on its failure the SQL-based path is used as fallback.
func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time.Time, force bool) error {
	jobStart := time.Now()
	if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
		return err
	}
	monthStart := time.Date(targetMonth.Year(), targetMonth.Month(), 1, 0, 0, 0, 0, targetMonth.Location())
	monthEnd := monthStart.AddDate(0, 1, 0)
	dailySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "daily", "inventory_daily_summary_", "20060102", monthStart, monthEnd)
	if err != nil {
		return err
	}
	dailySnapshots = filterRecordsInRange(dailySnapshots, monthStart, monthEnd)
	dbConn := c.Database.DB()
	db.SetPostgresWorkMem(ctx, dbConn, c.Settings.Values.Settings.PostgresWorkMemMB)
	dailySnapshots = filterSnapshotsWithRows(ctx, dbConn, dailySnapshots)
	if len(dailySnapshots) == 0 {
		// Fixed: these are daily snapshots, not hourly — the old message said "hourly".
		return fmt.Errorf("no daily snapshot tables found for %s", targetMonth.Format("2006-01"))
	}
	monthlyTable, err := monthlySummaryTableName(targetMonth)
	if err != nil {
		return err
	}
	if err := db.EnsureSummaryTable(ctx, dbConn, monthlyTable); err != nil {
		return err
	}
	if rowsExist, err := db.TableHasRows(ctx, dbConn, monthlyTable); err != nil {
		return err
	} else if rowsExist && !force {
		c.Logger.Debug("Monthly summary already exists, skipping aggregation", "summary_table", monthlyTable)
		return nil
	} else if rowsExist && force {
		if err := clearTable(ctx, dbConn, monthlyTable); err != nil {
			return err
		}
	}
	// Optional Go-based aggregation path.
	if os.Getenv("MONTHLY_AGG_GO") == "1" {
		c.Logger.Debug("Using go implementation of monthly aggregation")
		if err := c.aggregateMonthlySummaryGo(ctx, monthStart, monthEnd, monthlyTable, dailySnapshots); err != nil {
			c.Logger.Warn("go-based monthly aggregation failed, falling back to SQL path", "error", err)
		} else {
			metrics.RecordMonthlyAggregation(time.Since(jobStart), nil)
			c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", monthlyTable)
			return nil
		}
	}
	dailyTables := make([]string, 0, len(dailySnapshots))
	for _, snapshot := range dailySnapshots {
		dailyTables = append(dailyTables, snapshot.TableName)
	}
	unionQuery, err := buildUnionQuery(dailyTables, monthlyUnionColumns, templateExclusionFilter())
	if err != nil {
		return err
	}
	// Totals are informational only; failure to compute them is non-fatal.
	monthlyTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery)
	if err != nil {
		c.Logger.Warn("unable to calculate monthly totals", "error", err, "month", targetMonth.Format("2006-01"))
	} else {
		c.Logger.Info("Monthly snapshot totals",
			"month", targetMonth.Format("2006-01"),
			"vm_count", monthlyTotals.VmCount,
			"vcpu_total", monthlyTotals.VcpuTotal,
			"ram_total_gb", monthlyTotals.RamTotal,
			"disk_total_gb", monthlyTotals.DiskTotal,
		)
	}
	insertQuery, err := db.BuildMonthlySummaryInsert(monthlyTable, unionQuery)
	if err != nil {
		return err
	}
	if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
		c.Logger.Error("failed to aggregate monthly inventory", "error", err, "month", targetMonth.Format("2006-01"))
		return err
	}
	// Backfill missing creation times to the start of the month for rows lacking creation info.
	if _, err := dbConn.ExecContext(ctx,
		`UPDATE `+monthlyTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`,
		monthStart.Unix(),
	); err != nil {
		c.Logger.Warn("failed to normalize creation times for monthly summary", "error", err, "table", monthlyTable)
	}
	// Row count is best-effort; a zero count is still registered below.
	rowCount, err := db.TableRowCount(ctx, dbConn, monthlyTable)
	if err != nil {
		c.Logger.Warn("unable to count monthly summary rows", "error", err, "table", monthlyTable)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "monthly", monthlyTable, targetMonth, rowCount); err != nil {
		c.Logger.Warn("failed to register monthly snapshot", "error", err, "table", monthlyTable)
	}
	db.AnalyzeTableIfPostgres(ctx, dbConn, monthlyTable)
	if err := c.generateReport(ctx, monthlyTable); err != nil {
		c.Logger.Warn("failed to generate monthly report", "error", err, "table", monthlyTable)
		metrics.RecordMonthlyAggregation(time.Since(jobStart), err)
		return err
	}
	c.Logger.Debug("Finished monthly inventory aggregation", "summary_table", monthlyTable)
	metrics.RecordMonthlyAggregation(time.Since(jobStart), nil)
	return nil
}

// monthlySummaryTableName returns the validated summary table name for the
// month containing t (inventory_monthly_summary_YYYYMM).
func monthlySummaryTableName(t time.Time) (string, error) {
	return db.SafeTableName(fmt.Sprintf("inventory_monthly_summary_%s", t.Format("200601")))
}

// aggregateMonthlySummaryGo mirrors the SQL-based monthly aggregation but performs the work in Go,
// reading daily summaries in parallel and reducing them to a single monthly summary table.
func (c *CronTask) aggregateMonthlySummaryGo(ctx context.Context, monthStart, monthEnd time.Time, summaryTable string, dailySnapshots []report.SnapshotRecord) error {
	jobStart := time.Now()
	dbConn := c.Database.DB()
	if err := clearTable(ctx, dbConn, summaryTable); err != nil {
		return err
	}
	// Build union query for lifecycle refinement after inserts.
	dailyTables := make([]string, 0, len(dailySnapshots))
	for _, snapshot := range dailySnapshots {
		dailyTables = append(dailyTables, snapshot.TableName)
	}
	unionQuery, err := buildUnionQuery(dailyTables, monthlyUnionColumns, templateExclusionFilter())
	if err != nil {
		return err
	}
	aggMap, err := c.scanDailyTablesParallel(ctx, dailySnapshots)
	if err != nil {
		return err
	}
	// Fall back to the vm_daily_rollup cache when the table scan yields nothing.
	if len(aggMap) == 0 {
		cacheAgg, cacheErr := c.scanDailyRollup(ctx, monthStart, monthEnd)
		if cacheErr == nil && len(cacheAgg) > 0 {
			aggMap = cacheAgg
		} else if cacheErr != nil {
			c.Logger.Warn("failed to read daily rollup cache; using table scan", "error", cacheErr)
		}
	}
	if len(aggMap) == 0 {
		return fmt.Errorf("no VM records aggregated for %s", monthStart.Format("2006-01"))
	}
	if err := c.insertMonthlyAggregates(ctx, summaryTable, aggMap); err != nil {
		return err
	}
	// Refine creation/deletion using SQL helper (requires SnapshotTime in union).
	if strings.Contains(unionQuery, `"SnapshotTime"`) {
		if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
			c.Logger.Warn("failed to refine creation/deletion times (monthly Go)", "error", err, "table", summaryTable)
		}
	} else {
		c.Logger.Debug("Skipping lifecycle refinement for monthly aggregation (no SnapshotTime in union)")
	}
	// Backfill missing creation times to the start of the month for rows lacking creation info.
	if _, err := dbConn.ExecContext(ctx,
		`UPDATE `+summaryTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`,
		monthStart.Unix(),
	); err != nil {
		c.Logger.Warn("failed to normalize creation times for monthly summary (Go)", "error", err, "table", summaryTable)
	}
	db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
	rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
	if err != nil {
		c.Logger.Warn("unable to count monthly summary rows", "error", err, "table", summaryTable)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "monthly", summaryTable, monthStart, rowCount); err != nil {
		c.Logger.Warn("failed to register monthly snapshot", "error", err, "table", summaryTable)
	}
	if err := c.generateReport(ctx, summaryTable); err != nil {
		c.Logger.Warn("failed to generate monthly report (Go)", "error", err, "table", summaryTable)
		return err
	}
	c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", summaryTable, "duration", time.Since(jobStart))
	return nil
}

// monthlyAggKey uniquely identifies a VM within the monthly aggregation.
type monthlyAggKey struct {
	Vcenter string
	VmId    string
	VmUuid  string
	Name    string
}

// monthlyAggVal accumulates one VM's monthly statistics. The "last observed"
// descriptive fields (resource pool, cluster, sizing, etc.) track the most
// recent snapshot seen; the sum/weighted fields accumulate sample-weighted
// averages that are normalized at insert time.
type monthlyAggVal struct {
	key            monthlyAggKey
	inventoryId    int64
	eventKey       string
	cloudId        string
	resourcePool   string
	datacenter     string
	cluster        string
	folder         string
	isTemplate     string
	poweredOn      string
	srmPlaceholder string
	provisioned    float64
	vcpuCount      int64
	ramGB          int64
	creation       int64
	deletion       int64
	lastSnapshot   time.Time
	samplesPresent int64
	totalSamples   float64
	sumVcpu        float64
	sumRam         float64
	sumDisk        float64
	tinWeighted    float64
	bronzeWeighted float64
	silverWeighted float64
	goldWeighted   float64
}

// scanDailyTablesParallel scans each daily summary table on a bounded worker
// pool (2..NumCPU, capped by snapshot count) and merges per-table results
// into one map under a mutex. Per-table scan failures are logged and skipped.
func (c *CronTask) scanDailyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord) (map[monthlyAggKey]*monthlyAggVal, error) {
	agg := make(map[monthlyAggKey]*monthlyAggVal, 1024)
	mu := sync.Mutex{}
	workers := runtime.NumCPU()
	if workers < 2 {
		workers = 2
	}
	if workers > len(snapshots) {
		workers = len(snapshots)
	}
	jobs := make(chan report.SnapshotRecord, len(snapshots))
	wg := sync.WaitGroup{}
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for snap := range jobs {
				rows, err := c.scanDailyTable(ctx, snap)
				if err != nil {
					c.Logger.Warn("failed to scan daily summary", "table", snap.TableName, "error", err)
					continue
				}
				mu.Lock()
				for k, v := range rows {
					if existing, ok := agg[k]; ok {
						mergeMonthlyAgg(existing, v)
					} else {
						agg[k] = v
					}
				}
				mu.Unlock()
			}
		}()
	}
	for _, snap := range snapshots {
		jobs <- snap
	}
	close(jobs)
	wg.Wait()
	return agg, nil
}

// mergeMonthlyAgg folds src into dst: earliest non-zero creation/deletion win,
// descriptive fields come from the most recent snapshot, and all sample
// counters/weighted sums accumulate additively.
func mergeMonthlyAgg(dst, src *monthlyAggVal) {
	if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) {
		dst.creation = src.creation
	}
	if src.deletion > 0 && (dst.deletion == 0 || src.deletion < dst.deletion) {
		dst.deletion = src.deletion
	}
	if src.lastSnapshot.After(dst.lastSnapshot) {
		dst.lastSnapshot = src.lastSnapshot
		if src.inventoryId != 0 {
			dst.inventoryId = src.inventoryId
		}
		dst.resourcePool = src.resourcePool
		dst.datacenter = src.datacenter
		dst.cluster = src.cluster
		dst.folder = src.folder
		dst.isTemplate = src.isTemplate
		dst.poweredOn = src.poweredOn
		dst.srmPlaceholder = src.srmPlaceholder
		dst.provisioned = src.provisioned
		dst.vcpuCount = src.vcpuCount
		dst.ramGB = src.ramGB
		dst.eventKey = src.eventKey
		dst.cloudId = src.cloudId
	}
	dst.samplesPresent += src.samplesPresent
	dst.totalSamples += src.totalSamples
	dst.sumVcpu += src.sumVcpu
	dst.sumRam += src.sumRam
	dst.sumDisk += src.sumDisk
	dst.tinWeighted += src.tinWeighted
	dst.bronzeWeighted += src.bronzeWeighted
	dst.silverWeighted += src.silverWeighted
	dst.goldWeighted += src.goldWeighted
}

// scanDailyTable reads one daily summary table and converts each VM row into
// a monthlyAggVal. Daily averages are re-weighted back into sums
// (avg * totalSamplesDay) so cross-day merging can recompute true monthly
// averages. Template VMs are excluded. Row scan failures are logged and skipped.
func (c *CronTask) scanDailyTable(ctx context.Context, snap report.SnapshotRecord) (map[monthlyAggKey]*monthlyAggVal, error) {
	dbConn := c.Database.DB()
	// Table name originates from the validated snapshot registry, so direct
	// interpolation here does not take untrusted input.
	query := fmt.Sprintf(`
SELECT "InventoryId", "Name","Vcenter","VmId","VmUuid","EventKey","CloudId","ResourcePool","Datacenter","Cluster","Folder",
	COALESCE("ProvisionedDisk",0) AS disk,
	COALESCE("VcpuCount",0) AS vcpu,
	COALESCE("RamGB",0) AS ram,
	COALESCE("CreationTime",0) AS creation,
	COALESCE("DeletionTime",0) AS deletion,
	COALESCE("SamplesPresent",0) AS samples_present,
	"AvgVcpuCount","AvgRamGB","AvgProvisionedDisk","AvgIsPresent",
	"PoolTinPct","PoolBronzePct","PoolSilverPct","PoolGoldPct",
	"Tin","Bronze","Silver","Gold","IsTemplate","PoweredOn","SrmPlaceholder"
FROM %s
`, snap.TableName)
	rows, err := dbConn.QueryxContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	result := make(map[monthlyAggKey]*monthlyAggVal, 256)
	for rows.Next() {
		var (
			inventoryId                               sql.NullInt64
			name, vcenter, vmId, vmUuid               string
			eventKey, cloudId                         sql.NullString
			resourcePool, datacenter, cluster, folder sql.NullString
			isTemplate, poweredOn, srmPlaceholder     sql.NullString
			disk, avgVcpu, avgRam, avgDisk            sql.NullFloat64
			avgIsPresent                              sql.NullFloat64
			poolTin, poolBronze, poolSilver, poolGold sql.NullFloat64
			tinPct, bronzePct, silverPct, goldPct     sql.NullFloat64
			vcpu, ram                                 sql.NullInt64
			creation, deletion                        sql.NullInt64
			samplesPresent                            sql.NullInt64
		)
		if err := rows.Scan(
			&inventoryId, &name, &vcenter, &vmId, &vmUuid, &eventKey, &cloudId,
			&resourcePool, &datacenter, &cluster, &folder,
			&disk, &vcpu, &ram, &creation, &deletion, &samplesPresent,
			&avgVcpu, &avgRam, &avgDisk, &avgIsPresent,
			&poolTin, &poolBronze, &poolSilver, &poolGold,
			&tinPct, &bronzePct, &silverPct, &goldPct,
			&isTemplate, &poweredOn, &srmPlaceholder,
		); err != nil {
			c.Logger.Warn("failed to scan daily summary row", "table", snap.TableName, "error", err)
			continue
		}
		// Skip templates entirely; they are excluded from monthly reporting.
		templateVal := strings.TrimSpace(isTemplate.String)
		if strings.EqualFold(templateVal, "true") || templateVal == "1" {
			continue
		}
		key := monthlyAggKey{Vcenter: vcenter, VmId: vmId, VmUuid: vmUuid, Name: name}
		agg := &monthlyAggVal{
			key:            key,
			inventoryId:    inventoryId.Int64,
			eventKey:       eventKey.String,
			cloudId:        cloudId.String,
			resourcePool:   resourcePool.String,
			datacenter:     datacenter.String,
			cluster:        cluster.String,
			folder:         folder.String,
			isTemplate:     isTemplate.String,
			poweredOn:      poweredOn.String,
			srmPlaceholder: srmPlaceholder.String,
			provisioned:    disk.Float64,
			vcpuCount:      vcpu.Int64,
			ramGB:          ram.Int64,
			creation:       creation.Int64,
			deletion:       deletion.Int64,
			lastSnapshot:   snap.SnapshotTime,
			samplesPresent: samplesPresent.Int64,
		}
		// Recover the day's total sample count from present/avg-present so
		// daily averages can be converted back into weighted sums.
		totalSamplesDay := float64(samplesPresent.Int64)
		if avgIsPresent.Valid && avgIsPresent.Float64 > 0 {
			totalSamplesDay = float64(samplesPresent.Int64) / avgIsPresent.Float64
		}
		agg.totalSamples = totalSamplesDay
		if avgVcpu.Valid {
			agg.sumVcpu = avgVcpu.Float64 * totalSamplesDay
		}
		if avgRam.Valid {
			agg.sumRam = avgRam.Float64 * totalSamplesDay
		}
		if avgDisk.Valid {
			agg.sumDisk = avgDisk.Float64 * totalSamplesDay
		}
		if poolTin.Valid {
			agg.tinWeighted = (poolTin.Float64 / 100.0) * totalSamplesDay
		}
		if poolBronze.Valid {
			agg.bronzeWeighted = (poolBronze.Float64 / 100.0) * totalSamplesDay
		}
		if poolSilver.Valid {
			agg.silverWeighted = (poolSilver.Float64 / 100.0) * totalSamplesDay
		}
		if poolGold.Valid {
			agg.goldWeighted = (poolGold.Float64 / 100.0) * totalSamplesDay
		}
		result[key] = agg
	}
	return result, rows.Err()
}

// scanDailyRollup aggregates monthly data from vm_daily_rollup cache.
// Returns an empty map (not an error) when the cache table does not exist.
// Template rows are skipped; rows that fail to scan are silently ignored.
func (c *CronTask) scanDailyRollup(ctx context.Context, start, end time.Time) (map[monthlyAggKey]*monthlyAggVal, error) {
	dbConn := c.Database.DB()
	if !db.TableExists(ctx, dbConn, "vm_daily_rollup") {
		return map[monthlyAggKey]*monthlyAggVal{}, nil
	}
	query := `
SELECT "Date","Vcenter","VmId","VmUuid","Name","CreationTime","DeletionTime",
	"SamplesPresent","TotalSamples","SumVcpu","SumRam","SumDisk",
	"TinHits","BronzeHits","SilverHits","GoldHits",
	"LastResourcePool","LastDatacenter","LastCluster","LastFolder",
	"LastProvisionedDisk","LastVcpuCount","LastRamGB","IsTemplate","PoweredOn","SrmPlaceholder"
FROM vm_daily_rollup
WHERE "Date" >= ? AND "Date" < ?
`
	bind := dbConn.Rebind(query)
	rows, err := dbConn.QueryxContext(ctx, bind, start.Unix(), end.Unix())
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	agg := make(map[monthlyAggKey]*monthlyAggVal, 512)
	for rows.Next() {
		var (
			date                                      sql.NullInt64
			vcenter, vmId, vmUuid, name               string
			creation, deletion                        sql.NullInt64
			samplesPresent, totalSamples              sql.NullInt64
			sumVcpu, sumRam, sumDisk                  sql.NullFloat64
			tinHits, bronzeHits, silverHits, goldHits sql.NullInt64
			lastPool, lastDc, lastCluster, lastFolder sql.NullString
			lastDisk, lastVcpu, lastRam               sql.NullFloat64
			isTemplate, poweredOn, srmPlaceholder     sql.NullString
		)
		if err := rows.Scan(
			&date, &vcenter, &vmId, &vmUuid, &name, &creation, &deletion,
			&samplesPresent, &totalSamples, &sumVcpu, &sumRam, &sumDisk,
			&tinHits, &bronzeHits, &silverHits, &goldHits,
			&lastPool, &lastDc, &lastCluster, &lastFolder,
			&lastDisk, &lastVcpu, &lastRam, &isTemplate, &poweredOn, &srmPlaceholder,
		); err != nil {
			continue
		}
		templateVal := strings.TrimSpace(isTemplate.String)
		if strings.EqualFold(templateVal, "true") || templateVal == "1" {
			continue
		}
		key := monthlyAggKey{Vcenter: vcenter, VmId: vmId, VmUuid: vmUuid, Name: name}
		val := &monthlyAggVal{
			key:            key,
			resourcePool:   lastPool.String,
			datacenter:     lastDc.String,
			cluster:        lastCluster.String,
			folder:         lastFolder.String,
			isTemplate:     isTemplate.String,
			poweredOn:      poweredOn.String,
			srmPlaceholder: srmPlaceholder.String,
			provisioned:    lastDisk.Float64,
			vcpuCount:      int64(lastVcpu.Float64),
			ramGB:          int64(lastRam.Float64),
			creation:       creation.Int64,
			deletion:       deletion.Int64,
			lastSnapshot:   time.Unix(date.Int64, 0),
			samplesPresent: samplesPresent.Int64,
			totalSamples:   float64(totalSamples.Int64),
			sumVcpu:        sumVcpu.Float64,
			sumRam:         sumRam.Float64,
			sumDisk:        sumDisk.Float64,
			tinWeighted:    float64(tinHits.Int64),
			bronzeWeighted: float64(bronzeHits.Int64),
			silverWeighted: float64(silverHits.Int64),
			goldWeighted:   float64(goldHits.Int64),
		}
		if existing, ok := agg[key]; ok {
			mergeMonthlyAgg(existing, val)
		} else {
			agg[key] = val
		}
	}
	return agg, rows.Err()
}

// insertMonthlyAggregates writes the reduced aggregates into summaryTable in
// a single transaction using one prepared insert. Weighted sums are
// normalized by totalSamples into averages and pool percentages; the
// Tin/Bronze/Silver/Gold columns receive the same values as PoolXxxPct.
func (c *CronTask) insertMonthlyAggregates(ctx context.Context, summaryTable string, aggMap map[monthlyAggKey]*monthlyAggVal) error {
	dbConn := c.Database.DB()
	columns := []string{
		"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId",
		"CreationTime", "DeletionTime", "ResourcePool", "Datacenter", "Cluster", "Folder",
		"ProvisionedDisk", "VcpuCount", "RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder",
		"VmUuid", "SamplesPresent", "AvgVcpuCount", "AvgRamGB", "AvgProvisionedDisk", "AvgIsPresent",
		"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct",
		"Tin", "Bronze", "Silver", "Gold",
	}
	placeholders := make([]string, len(columns))
	for i := range columns {
		placeholders[i] = "?"
	}
	stmtText := fmt.Sprintf(`INSERT INTO %s (%s) VALUES (%s)`, summaryTable, strings.Join(columns, ","), strings.Join(placeholders, ","))
	stmtText = dbConn.Rebind(stmtText)
	tx, err := dbConn.BeginTxx(ctx, nil)
	if err != nil {
		return err
	}
	stmt, err := tx.PreparexContext(ctx, stmtText)
	if err != nil {
		tx.Rollback()
		return err
	}
	defer stmt.Close()
	for _, v := range aggMap {
		inventoryVal := sql.NullInt64{}
		if v.inventoryId != 0 {
			inventoryVal = sql.NullInt64{Int64: v.inventoryId, Valid: true}
		}
		avgVcpu := sql.NullFloat64{}
		avgRam := sql.NullFloat64{}
		avgDisk := sql.NullFloat64{}
		avgIsPresent := sql.NullFloat64{}
		tinPct := sql.NullFloat64{}
		bronzePct := sql.NullFloat64{}
		silverPct := sql.NullFloat64{}
		goldPct := sql.NullFloat64{}
		if v.totalSamples > 0 {
			avgVcpu = sql.NullFloat64{Float64: v.sumVcpu / v.totalSamples, Valid: true}
			avgRam = sql.NullFloat64{Float64: v.sumRam / v.totalSamples, Valid: true}
			avgDisk = sql.NullFloat64{Float64: v.sumDisk / v.totalSamples, Valid: true}
			avgIsPresent = sql.NullFloat64{Float64: float64(v.samplesPresent) / v.totalSamples, Valid: true}
			tinPct = sql.NullFloat64{Float64: 100.0 * v.tinWeighted / v.totalSamples, Valid: true}
			bronzePct = sql.NullFloat64{Float64: 100.0 * v.bronzeWeighted / v.totalSamples, Valid: true}
			silverPct = sql.NullFloat64{Float64: 100.0 * v.silverWeighted / v.totalSamples, Valid: true}
			goldPct = sql.NullFloat64{Float64: 100.0 * v.goldWeighted / v.totalSamples, Valid: true}
		}
		if _, err := stmt.ExecContext(ctx,
			inventoryVal, v.key.Name, v.key.Vcenter, v.key.VmId, v.eventKey, v.cloudId,
			v.creation, v.deletion, v.resourcePool, v.datacenter, v.cluster, v.folder,
			v.provisioned, v.vcpuCount, v.ramGB, v.isTemplate, v.poweredOn, v.srmPlaceholder,
			v.key.VmUuid, v.samplesPresent, avgVcpu, avgRam, avgDisk, avgIsPresent,
			tinPct, bronzePct, silverPct, goldPct,
			tinPct, bronzePct, silverPct, goldPct,
		); err != nil {
			tx.Rollback()
			return err
		}
	}
	return tx.Commit()
}