package tasks import ( "context" "database/sql" "fmt" "log/slog" "os" "runtime" "slices" "strings" "sync" "time" "vctp/db" "vctp/internal/metrics" "vctp/internal/report" "vctp/internal/settings" ) // RunVcenterMonthlyAggregate summarizes the previous month's daily snapshots. func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.Logger) (err error) { jobTimeout := durationFromSeconds(c.Settings.Values.Settings.MonthlyJobTimeoutSeconds, 20*time.Minute) return c.runAggregateJob(ctx, "monthly_aggregate", jobTimeout, func(jobCtx context.Context) error { if err := c.Settings.ReadYMLSettings(); err != nil { return err } jobCtx = settings.MarkReloadedInContext(jobCtx, c.Settings) startedAt := time.Now() defer func() { logger.Info("Monthly summary job finished", "duration", time.Since(startedAt)) }() now := time.Now() firstOfThisMonth := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location()) targetMonth := firstOfThisMonth.AddDate(0, -1, 0) return c.aggregateMonthlySummaryWithMode(jobCtx, targetMonth, false, true) }) } func (c *CronTask) AggregateMonthlySummary(ctx context.Context, month time.Time, force bool) error { return c.aggregateMonthlySummaryWithMode(ctx, month, force, false) } func (c *CronTask) aggregateMonthlySummaryWithMode(ctx context.Context, targetMonth time.Time, force bool, scheduled bool) error { jobStart := time.Now() if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil { return err } granularity := strings.ToLower(strings.TrimSpace(c.Settings.Values.Settings.MonthlyAggregationGranularity)) if granularity == "" { granularity = "daily" } if scheduled { granularity = "daily" } if granularity != "hourly" && granularity != "daily" { c.Logger.Warn("unknown monthly aggregation granularity; defaulting to daily", "granularity", granularity) granularity = "daily" } monthStart := time.Date(targetMonth.Year(), targetMonth.Month(), 1, 0, 0, 0, 0, targetMonth.Location()) monthEnd := monthStart.AddDate(0, 1, 0) dbConn := c.Database.DB() db.SetPostgresWorkMem(ctx, dbConn, c.Settings.Values.Settings.PostgresWorkMemMB) driver := strings.ToLower(dbConn.DriverName()) // Canonical Go aggregation is the default for both scheduled and manual runs. // Legacy SQL/union aggregation stays available as a manual fallback/backfill path. forceGoAgg := os.Getenv("MONTHLY_AGG_GO") == "1" forceSQLAgg := !scheduled && os.Getenv("MONTHLY_AGG_SQL") == "1" useGoAgg := scheduled || forceGoAgg || !forceSQLAgg if forceSQLAgg && !forceGoAgg { c.Logger.Info("MONTHLY_AGG_SQL=1 enabled; using SQL fallback path for manual monthly aggregation") } if !useGoAgg && granularity == "hourly" && driver == "sqlite" { c.Logger.Warn("SQL monthly aggregation is slow on sqlite; overriding to Go path", "granularity", granularity) useGoAgg = true } var snapshots []report.SnapshotRecord var unionColumns []string if !scheduled { if granularity == "daily" { dailySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "daily", "inventory_daily_summary_", "20060102", monthStart, monthEnd) if err != nil { return err } dailySnapshots = filterRecordsInRange(dailySnapshots, monthStart, monthEnd) dailySnapshots = filterSnapshotsWithRows(ctx, dbConn, dailySnapshots) snapshots = dailySnapshots unionColumns = monthlyUnionColumns } else { hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", monthStart, monthEnd) if err != nil { return err } hourlySnapshots = filterRecordsInRange(hourlySnapshots, monthStart, monthEnd) hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots) snapshots = hourlySnapshots unionColumns = summaryUnionColumns } } if !scheduled && len(snapshots) == 0 { return fmt.Errorf("no %s snapshot tables found for %s", granularity, targetMonth.Format("2006-01")) } monthlyTable, err := monthlySummaryTableName(targetMonth) if err != nil { return err } if err := db.EnsureSummaryTable(ctx, dbConn, monthlyTable); err != nil { return err } if rowsExist, err := db.TableHasRows(ctx, dbConn, monthlyTable); err != nil { return err } else if rowsExist && !force { c.Logger.Debug("Monthly summary already exists, skipping aggregation", "summary_table", monthlyTable) return nil } else if rowsExist && force { if err := clearTable(ctx, dbConn, monthlyTable); err != nil { return err } } if scheduled && c.scheduledAggregationEngine() == "sql" { c.Logger.Info("scheduled_aggregation_engine=sql enabled; using canonical SQL monthly aggregation path") if err := c.aggregateMonthlySummarySQLCanonical(ctx, monthStart, monthEnd, monthlyTable); err != nil { c.Logger.Warn("scheduled canonical SQL monthly aggregation failed; falling back to go path", "error", err) } else { metrics.RecordMonthlyAggregation(time.Since(jobStart), nil) c.Logger.Debug("Finished monthly inventory aggregation (SQL canonical path)", "summary_table", monthlyTable) return nil } } // Optional Go-based aggregation path. if useGoAgg { switch granularity { case "daily": c.Logger.Debug("Using go implementation of monthly aggregation (daily)") if err := c.aggregateMonthlySummaryGo(ctx, monthStart, monthEnd, monthlyTable, snapshots, scheduled); err != nil { if scheduled { return err } c.Logger.Warn("go-based monthly aggregation failed, falling back to SQL path", "error", err) } else { metrics.RecordMonthlyAggregation(time.Since(jobStart), nil) c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", monthlyTable) return nil } case "hourly": if scheduled { return fmt.Errorf("scheduled monthly aggregation does not support hourly source mode") } c.Logger.Debug("Using go implementation of monthly aggregation (hourly)") if err := c.aggregateMonthlySummaryGoHourly(ctx, monthStart, monthEnd, monthlyTable, snapshots); err != nil { c.Logger.Warn("go-based monthly aggregation failed, falling back to SQL path", "error", err) } else { metrics.RecordMonthlyAggregation(time.Since(jobStart), nil) c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", monthlyTable) return nil } default: c.Logger.Warn("MONTHLY_AGG_GO is set but granularity is unsupported; using SQL path", "granularity", granularity) } } if scheduled { return fmt.Errorf("scheduled monthly aggregation requires go daily-rollup path") } tables := make([]string, 0, len(snapshots)) for _, snapshot := range snapshots { tables = append(tables, snapshot.TableName) } unionQuery, err := buildUnionQuery(tables, unionColumns, templateExclusionFilter()) if err != nil { return err } monthlyTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery) if err != nil { c.Logger.Warn("unable to calculate monthly totals", "error", err, "month", targetMonth.Format("2006-01")) } else { c.Logger.Info("Monthly snapshot totals", "month", targetMonth.Format("2006-01"), "vm_count", monthlyTotals.VmCount, "vcpu_total", monthlyTotals.VcpuTotal, "ram_total_gb", monthlyTotals.RamTotal, "disk_total_gb", monthlyTotals.DiskTotal, ) } var insertQuery string if granularity == "daily" { insertQuery, err = db.BuildMonthlySummaryInsert(monthlyTable, unionQuery) } else { insertQuery, err = db.BuildDailySummaryInsert(monthlyTable, unionQuery) } if err != nil { return err } if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil { c.Logger.Error("failed to aggregate monthly inventory", "error", err, "month", targetMonth.Format("2006-01")) return err } if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, monthlyTable, monthStart.Unix(), monthEnd.Unix()); err != nil { c.Logger.Warn("failed to apply lifecycle deletions to monthly summary", "error", err, "table", monthlyTable) } else { c.Logger.Info("Monthly aggregation deletion times", "source_lifecycle_cache", applied) } if err := db.UpdateSummaryPresenceByWindow(ctx, dbConn, monthlyTable, monthStart.Unix(), monthEnd.Unix()); err != nil { c.Logger.Warn("failed to update monthly AvgIsPresent from lifecycle window", "error", err, "table", monthlyTable) } rowCount, err := db.TableRowCount(ctx, dbConn, monthlyTable) if err != nil { c.Logger.Warn("unable to count monthly summary rows", "error", err, "table", monthlyTable) } if err := report.RegisterSnapshot(ctx, c.Database, "monthly", monthlyTable, targetMonth, rowCount); err != nil { c.Logger.Warn("failed to register monthly snapshot", "error", err, "table", monthlyTable) } if refreshed, err := db.ReplaceVcenterAggregateTotalsFromSummary(ctx, dbConn, monthlyTable, "monthly", monthStart.Unix()); err != nil { c.Logger.Warn("failed to refresh vcenter monthly aggregate totals cache", "error", err, "table", monthlyTable) } else { c.Logger.Debug("refreshed vcenter monthly aggregate totals cache", "table", monthlyTable, "rows", refreshed) } db.AnalyzeTableIfPostgres(ctx, dbConn, monthlyTable) if err := c.generateReportWithPolicy(ctx, monthlyTable); err != nil { c.Logger.Warn("failed to generate monthly report", "error", err, "table", monthlyTable) metrics.RecordMonthlyAggregation(time.Since(jobStart), err) return err } c.Logger.Debug("Finished monthly inventory aggregation", "summary_table", monthlyTable) metrics.RecordMonthlyAggregation(time.Since(jobStart), nil) return nil } func monthlySummaryTableName(t time.Time) (string, error) { return db.SafeTableName(fmt.Sprintf("inventory_monthly_summary_%s", t.Format("200601"))) } func (c *CronTask) aggregateMonthlySummarySQLCanonical(ctx context.Context, monthStart, monthEnd time.Time, summaryTable string) error { jobStart := time.Now() dbConn := c.Database.DB() if !db.TableExists(ctx, dbConn, "vm_daily_rollup") { return fmt.Errorf("vm_daily_rollup table not found for canonical SQL monthly aggregation") } unionQuery := buildCanonicalDailyRollupSummaryUnion(monthStart, monthEnd) insertQuery, err := db.BuildMonthlySummaryInsert(summaryTable, unionQuery) if err != nil { return err } if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil { return err } if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, summaryTable, monthStart.Unix(), monthEnd.Unix()); err != nil { c.Logger.Warn("failed to apply lifecycle deletions to monthly summary (SQL canonical)", "error", err, "table", summaryTable) } else { c.Logger.Info("Monthly aggregation deletion times", "source_lifecycle_cache", applied) } if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, buildDailyRollupLifecycleUnion(monthStart, monthEnd)); err != nil { c.Logger.Warn("failed to refine creation/deletion times (monthly SQL canonical)", "error", err, "table", summaryTable) } if err := db.UpdateSummaryPresenceByWindow(ctx, dbConn, summaryTable, monthStart.Unix(), monthEnd.Unix()); err != nil { c.Logger.Warn("failed to update monthly AvgIsPresent from lifecycle window (SQL canonical)", "error", err, "table", summaryTable) } db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable) rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable) if err != nil { c.Logger.Warn("unable to count monthly summary rows (SQL canonical)", "error", err, "table", summaryTable) } if rowCount == 0 { return fmt.Errorf("no VM records aggregated for %s", monthStart.Format("2006-01")) } if err := report.RegisterSnapshot(ctx, c.Database, "monthly", summaryTable, monthStart, rowCount); err != nil { c.Logger.Warn("failed to register monthly snapshot (SQL canonical)", "error", err, "table", summaryTable) } if refreshed, err := db.ReplaceVcenterAggregateTotalsFromSummary(ctx, dbConn, summaryTable, "monthly", monthStart.Unix()); err != nil { c.Logger.Warn("failed to refresh vcenter monthly aggregate totals cache (SQL canonical)", "error", err, "table", summaryTable) } else { c.Logger.Debug("refreshed vcenter monthly aggregate totals cache", "table", summaryTable, "rows", refreshed) } if err := c.generateReportWithPolicy(ctx, summaryTable); err != nil { c.Logger.Warn("failed to generate monthly report (SQL canonical)", "error", err, "table", summaryTable) return err } c.Logger.Debug("Finished monthly inventory aggregation (SQL canonical path)", "summary_table", summaryTable, "duration", time.Since(jobStart)) return nil } // aggregateMonthlySummaryGoHourly aggregates hourly snapshots directly into the monthly summary table. func (c *CronTask) aggregateMonthlySummaryGoHourly(ctx context.Context, monthStart, monthEnd time.Time, summaryTable string, hourlySnapshots []report.SnapshotRecord) error { jobStart := time.Now() dbConn := c.Database.DB() if err := clearTable(ctx, dbConn, summaryTable); err != nil { return err } if len(hourlySnapshots) == 0 { return fmt.Errorf("no hourly snapshot tables found for %s", monthStart.Format("2006-01")) } totalSamples := len(hourlySnapshots) var ( aggMap map[dailyAggKey]*dailyAggVal snapTimes []int64 ) if db.TableExists(ctx, dbConn, "vm_hourly_stats") { cacheAgg, cacheTimes, cacheErr := c.scanHourlyCache(ctx, monthStart, monthEnd) if cacheErr != nil { c.Logger.Warn("failed to use hourly cache, falling back to table scans", "error", cacheErr) } else if len(cacheAgg) > 0 { c.Logger.Debug("using hourly cache for monthly aggregation", "month", monthStart.Format("2006-01"), "snapshots", len(cacheTimes), "vm_count", len(cacheAgg)) aggMap = cacheAgg snapTimes = cacheTimes totalSamples = len(cacheTimes) } } if aggMap == nil { var errScan error aggMap, errScan = c.scanHourlyTablesParallel(ctx, hourlySnapshots) if errScan != nil { return errScan } c.Logger.Debug("scanned hourly tables for monthly aggregation", "month", monthStart.Format("2006-01"), "tables", len(hourlySnapshots), "vm_count", len(aggMap)) if len(aggMap) == 0 { return fmt.Errorf("no VM records aggregated for %s", monthStart.Format("2006-01")) } snapTimes = make([]int64, 0, len(hourlySnapshots)) for _, snap := range hourlySnapshots { snapTimes = append(snapTimes, snap.SnapshotTime.Unix()) } slices.Sort(snapTimes) } lifecycleDeletions := c.applyLifecycleDeletions(ctx, aggMap, monthStart, monthEnd) c.Logger.Info("Monthly aggregation deletion times", "source_lifecycle_cache", lifecycleDeletions) inventoryDeletions := c.applyInventoryDeletions(ctx, aggMap, monthStart, monthEnd) c.Logger.Info("Monthly aggregation deletion times", "source_inventory", inventoryDeletions) if len(snapTimes) > 0 { maxSnap := snapTimes[len(snapTimes)-1] inferredDeletions := 0 for _, v := range aggMap { if v.deletion != 0 { continue } consecutiveMisses := 0 firstMiss := int64(0) for _, t := range snapTimes { if t <= v.lastSeen { continue } if _, ok := v.seen[t]; ok { consecutiveMisses = 0 firstMiss = 0 continue } consecutiveMisses++ if firstMiss == 0 { firstMiss = t } if consecutiveMisses >= 2 { v.deletion = firstMiss inferredDeletions++ break } } if v.deletion == 0 && v.lastSeen < maxSnap && firstMiss > 0 { c.Logger.Debug("pending deletion inference (insufficient consecutive misses)", "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "last_seen", v.lastSeen, "first_missing_snapshot", firstMiss) } } c.Logger.Info("Monthly aggregation deletion times", "source_inferred", inferredDeletions) } totalSamplesByVcenter := sampleCountsByVcenter(aggMap) if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples, totalSamplesByVcenter); err != nil { return err } if err := db.UpdateSummaryPresenceByWindow(ctx, dbConn, summaryTable, monthStart.Unix(), monthEnd.Unix()); err != nil { c.Logger.Warn("failed to update monthly AvgIsPresent from lifecycle window (Go hourly)", "error", err, "table", summaryTable) } db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable) rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable) if err != nil { c.Logger.Warn("unable to count monthly summary rows (Go hourly)", "error", err, "table", summaryTable) } if err := report.RegisterSnapshot(ctx, c.Database, "monthly", summaryTable, monthStart, rowCount); err != nil { c.Logger.Warn("failed to register monthly snapshot (Go hourly)", "error", err, "table", summaryTable) } if refreshed, err := db.ReplaceVcenterAggregateTotalsFromSummary(ctx, dbConn, summaryTable, "monthly", monthStart.Unix()); err != nil { c.Logger.Warn("failed to refresh vcenter monthly aggregate totals cache (Go hourly)", "error", err, "table", summaryTable) } else { c.Logger.Debug("refreshed vcenter monthly aggregate totals cache", "table", summaryTable, "rows", refreshed) } if err := c.generateReportWithPolicy(ctx, summaryTable); err != nil { c.Logger.Warn("failed to generate monthly report (Go hourly)", "error", err, "table", summaryTable) return err } c.Logger.Debug("Finished monthly inventory aggregation (Go hourly)", "summary_table", summaryTable, "duration", time.Since(jobStart), "tables_scanned", len(hourlySnapshots), "rows_written", rowCount, "total_samples", totalSamples, ) return nil } // aggregateMonthlySummaryGo mirrors the SQL-based monthly aggregation but performs the work in Go, // reading daily summaries in parallel and reducing them to a single monthly summary table. func (c *CronTask) aggregateMonthlySummaryGo(ctx context.Context, monthStart, monthEnd time.Time, summaryTable string, dailySnapshots []report.SnapshotRecord, canonicalOnly bool) error { jobStart := time.Now() dbConn := c.Database.DB() if err := clearTable(ctx, dbConn, summaryTable); err != nil { return err } unionQuery := "" var ( aggMap map[monthlyAggKey]*monthlyAggVal err error ) if canonicalOnly { aggMap, err = c.scanDailyRollup(ctx, monthStart, monthEnd) if err != nil { return err } unionQuery = buildDailyRollupLifecycleUnion(monthStart, monthEnd) } else { // Build union query for lifecycle refinement after inserts. dailyTables := make([]string, 0, len(dailySnapshots)) for _, snapshot := range dailySnapshots { dailyTables = append(dailyTables, snapshot.TableName) } unionQuery, err = buildUnionQuery(dailyTables, monthlyUnionColumns, templateExclusionFilter()) if err != nil { return err } aggMap, err = c.scanDailyTablesParallel(ctx, dailySnapshots) if err != nil { return err } if len(aggMap) == 0 { cacheAgg, cacheErr := c.scanDailyRollup(ctx, monthStart, monthEnd) if cacheErr == nil && len(cacheAgg) > 0 { aggMap = cacheAgg } else if cacheErr != nil { c.Logger.Warn("failed to read daily rollup cache; using table scan", "error", cacheErr) } } } if len(aggMap) == 0 { return fmt.Errorf("no VM records aggregated for %s", monthStart.Format("2006-01")) } if err := c.insertMonthlyAggregates(ctx, summaryTable, aggMap); err != nil { return err } if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, summaryTable, monthStart.Unix(), monthEnd.Unix()); err != nil { c.Logger.Warn("failed to apply lifecycle deletions to monthly summary (Go)", "error", err, "table", summaryTable) } else { c.Logger.Info("Monthly aggregation deletion times", "source_lifecycle_cache", applied) } if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil { c.Logger.Warn("failed to refine creation/deletion times (monthly Go)", "error", err, "table", summaryTable) } if err := db.UpdateSummaryPresenceByWindow(ctx, dbConn, summaryTable, monthStart.Unix(), monthEnd.Unix()); err != nil { c.Logger.Warn("failed to update monthly AvgIsPresent from lifecycle window (Go)", "error", err, "table", summaryTable) } db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable) rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable) if err != nil { c.Logger.Warn("unable to count monthly summary rows", "error", err, "table", summaryTable) } if err := report.RegisterSnapshot(ctx, c.Database, "monthly", summaryTable, monthStart, rowCount); err != nil { c.Logger.Warn("failed to register monthly snapshot", "error", err, "table", summaryTable) } if refreshed, err := db.ReplaceVcenterAggregateTotalsFromSummary(ctx, dbConn, summaryTable, "monthly", monthStart.Unix()); err != nil { c.Logger.Warn("failed to refresh vcenter monthly aggregate totals cache", "error", err, "table", summaryTable) } else { c.Logger.Debug("refreshed vcenter monthly aggregate totals cache", "table", summaryTable, "rows", refreshed) } if err := c.generateReportWithPolicy(ctx, summaryTable); err != nil { c.Logger.Warn("failed to generate monthly report (Go)", "error", err, "table", summaryTable) return err } c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", summaryTable, "duration", time.Since(jobStart)) return nil } func (c *CronTask) scanDailyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord) (map[monthlyAggKey]*monthlyAggVal, error) { agg := make(map[monthlyAggKey]*monthlyAggVal, 1024) mu := sync.Mutex{} workers := min(max(runtime.NumCPU(), 2), len(snapshots)) jobs := make(chan report.SnapshotRecord, len(snapshots)) wg := sync.WaitGroup{} for i := 0; i < workers; i++ { wg.Go(func() { for snap := range jobs { rows, err := c.scanDailyTable(ctx, snap) if err != nil { c.Logger.Warn("failed to scan daily summary", "table", snap.TableName, "error", err) continue } mu.Lock() for k, v := range rows { if existing, ok := agg[k]; ok { mergeMonthlyAgg(existing, v) } else { agg[k] = v } } mu.Unlock() } }) } for _, snap := range snapshots { jobs <- snap } close(jobs) wg.Wait() return agg, nil } func mergeMonthlyAgg(dst, src *monthlyAggVal) { if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) { dst.creation = src.creation } // If creation is unknown in all daily summaries, leave it zero for reports (VM trace handles approximation separately). if src.deletion > 0 && (dst.deletion == 0 || src.deletion < dst.deletion) { dst.deletion = src.deletion } if src.lastSnapshot.After(dst.lastSnapshot) { dst.lastSnapshot = src.lastSnapshot if src.inventoryId != 0 { dst.inventoryId = src.inventoryId } dst.resourcePool = src.resourcePool dst.datacenter = src.datacenter dst.cluster = src.cluster dst.folder = src.folder dst.isTemplate = src.isTemplate dst.poweredOn = src.poweredOn dst.srmPlaceholder = src.srmPlaceholder dst.provisioned = src.provisioned dst.vcpuCount = src.vcpuCount dst.ramGB = src.ramGB dst.eventKey = src.eventKey dst.cloudId = src.cloudId } dst.samplesPresent += src.samplesPresent dst.totalSamples += src.totalSamples dst.sumVcpu += src.sumVcpu dst.sumRam += src.sumRam dst.sumDisk += src.sumDisk dst.tinWeighted += src.tinWeighted dst.bronzeWeighted += src.bronzeWeighted dst.silverWeighted += src.silverWeighted dst.goldWeighted += src.goldWeighted } func (c *CronTask) scanDailyTable(ctx context.Context, snap report.SnapshotRecord) (map[monthlyAggKey]*monthlyAggVal, error) { dbConn := c.Database.DB() query := fmt.Sprintf(` SELECT "InventoryId", "Name","Vcenter","VmId","VmUuid","EventKey","CloudId","ResourcePool","Datacenter","Cluster","Folder", COALESCE("ProvisionedDisk",0) AS disk, COALESCE("VcpuCount",0) AS vcpu, COALESCE("RamGB",0) AS ram, COALESCE("CreationTime",0) AS creation, COALESCE("DeletionTime",0) AS deletion, COALESCE("SamplesPresent",0) AS samples_present, "AvgVcpuCount","AvgRamGB","AvgProvisionedDisk","AvgIsPresent", "PoolTinPct","PoolBronzePct","PoolSilverPct","PoolGoldPct", "Tin","Bronze","Silver","Gold","IsTemplate","PoweredOn","SrmPlaceholder" FROM %s `, snap.TableName) rows, err := dbConn.QueryxContext(ctx, query) if err != nil { return nil, err } defer rows.Close() result := make(map[monthlyAggKey]*monthlyAggVal, 256) for rows.Next() { var ( inventoryId sql.NullInt64 name, vcenter, vmId, vmUuid string eventKey, cloudId sql.NullString resourcePool, datacenter, cluster, folder sql.NullString isTemplate, poweredOn, srmPlaceholder sql.NullString disk, avgVcpu, avgRam, avgDisk sql.NullFloat64 avgIsPresent sql.NullFloat64 poolTin, poolBronze, poolSilver, poolGold sql.NullFloat64 tinPct, bronzePct, silverPct, goldPct sql.NullFloat64 vcpu, ram sql.NullInt64 creation, deletion sql.NullInt64 samplesPresent sql.NullInt64 ) if err := rows.Scan( &inventoryId, &name, &vcenter, &vmId, &vmUuid, &eventKey, &cloudId, &resourcePool, &datacenter, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &samplesPresent, &avgVcpu, &avgRam, &avgDisk, &avgIsPresent, &poolTin, &poolBronze, &poolSilver, &poolGold, &tinPct, &bronzePct, &silverPct, &goldPct, &isTemplate, &poweredOn, &srmPlaceholder, ); err != nil { c.Logger.Warn("failed to scan daily summary row", "table", snap.TableName, "error", err) continue } templateVal := strings.TrimSpace(isTemplate.String) if strings.EqualFold(templateVal, "true") || templateVal == "1" { continue } key := monthlyAggKey{Vcenter: vcenter, VmId: vmId, VmUuid: vmUuid, Name: name} agg := &monthlyAggVal{ key: key, inventoryId: inventoryId.Int64, eventKey: eventKey.String, cloudId: cloudId.String, resourcePool: resourcePool.String, datacenter: datacenter.String, cluster: cluster.String, folder: folder.String, isTemplate: isTemplate.String, poweredOn: poweredOn.String, srmPlaceholder: srmPlaceholder.String, provisioned: disk.Float64, vcpuCount: vcpu.Int64, ramGB: ram.Int64, creation: creation.Int64, deletion: deletion.Int64, lastSnapshot: snap.SnapshotTime, samplesPresent: samplesPresent.Int64, } totalSamplesDay := float64(samplesPresent.Int64) if avgIsPresent.Valid && avgIsPresent.Float64 > 0 { totalSamplesDay = float64(samplesPresent.Int64) / avgIsPresent.Float64 } agg.totalSamples = totalSamplesDay if avgVcpu.Valid { agg.sumVcpu = avgVcpu.Float64 * totalSamplesDay } if avgRam.Valid { agg.sumRam = avgRam.Float64 * totalSamplesDay } if avgDisk.Valid { agg.sumDisk = avgDisk.Float64 * totalSamplesDay } if poolTin.Valid { agg.tinWeighted = (poolTin.Float64 / 100.0) * totalSamplesDay } if poolBronze.Valid { agg.bronzeWeighted = (poolBronze.Float64 / 100.0) * totalSamplesDay } if poolSilver.Valid { agg.silverWeighted = (poolSilver.Float64 / 100.0) * totalSamplesDay } if poolGold.Valid { agg.goldWeighted = (poolGold.Float64 / 100.0) * totalSamplesDay } result[key] = agg } return result, rows.Err() } // scanDailyRollup aggregates monthly data from vm_daily_rollup cache. func (c *CronTask) scanDailyRollup(ctx context.Context, start, end time.Time) (map[monthlyAggKey]*monthlyAggVal, error) { dbConn := c.Database.DB() if !db.TableExists(ctx, dbConn, "vm_daily_rollup") { return map[monthlyAggKey]*monthlyAggVal{}, nil } query := ` SELECT "Date","Vcenter","VmId","VmUuid","Name","CreationTime","DeletionTime", "SamplesPresent","TotalSamples","SumVcpu","SumRam","SumDisk", "TinHits","BronzeHits","SilverHits","GoldHits", "LastResourcePool","LastDatacenter","LastCluster","LastFolder", "LastProvisionedDisk","LastVcpuCount","LastRamGB","IsTemplate","PoweredOn","SrmPlaceholder" FROM vm_daily_rollup WHERE "Date" >= ? AND "Date" < ? ` bind := dbConn.Rebind(query) rows, err := dbConn.QueryxContext(ctx, bind, start.Unix(), end.Unix()) if err != nil { return nil, err } defer rows.Close() agg := make(map[monthlyAggKey]*monthlyAggVal, 512) for rows.Next() { var ( date sql.NullInt64 vcenter, vmId, vmUuid, name string creation, deletion sql.NullInt64 samplesPresent, totalSamples sql.NullInt64 sumVcpu, sumRam, sumDisk sql.NullFloat64 tinHits, bronzeHits, silverHits, goldHits sql.NullInt64 lastPool, lastDc, lastCluster, lastFolder sql.NullString lastDisk, lastVcpu, lastRam sql.NullFloat64 isTemplate, poweredOn, srmPlaceholder sql.NullString ) if err := rows.Scan( &date, &vcenter, &vmId, &vmUuid, &name, &creation, &deletion, &samplesPresent, &totalSamples, &sumVcpu, &sumRam, &sumDisk, &tinHits, &bronzeHits, &silverHits, &goldHits, &lastPool, &lastDc, &lastCluster, &lastFolder, &lastDisk, &lastVcpu, &lastRam, &isTemplate, &poweredOn, &srmPlaceholder, ); err != nil { continue } templateVal := strings.TrimSpace(isTemplate.String) if strings.EqualFold(templateVal, "true") || templateVal == "1" { continue } key := monthlyAggKey{Vcenter: vcenter, VmId: vmId, VmUuid: vmUuid, Name: name} val := &monthlyAggVal{ key: key, resourcePool: lastPool.String, datacenter: lastDc.String, cluster: lastCluster.String, folder: lastFolder.String, isTemplate: isTemplate.String, poweredOn: poweredOn.String, srmPlaceholder: srmPlaceholder.String, provisioned: lastDisk.Float64, vcpuCount: int64(lastVcpu.Float64), ramGB: int64(lastRam.Float64), creation: creation.Int64, deletion: deletion.Int64, lastSnapshot: time.Unix(date.Int64, 0), samplesPresent: samplesPresent.Int64, totalSamples: float64(totalSamples.Int64), sumVcpu: sumVcpu.Float64, sumRam: sumRam.Float64, sumDisk: sumDisk.Float64, tinWeighted: float64(tinHits.Int64), bronzeWeighted: float64(bronzeHits.Int64), silverWeighted: float64(silverHits.Int64), goldWeighted: float64(goldHits.Int64), } if existing, ok := agg[key]; ok { mergeMonthlyAgg(existing, val) } else { agg[key] = val } } return agg, rows.Err() } func buildDailyRollupLifecycleUnion(start, end time.Time) string { return fmt.Sprintf(` SELECT "VmId","VmUuid","Name","Vcenter","CreationTime","DeletionTime","Date" AS "SnapshotTime" FROM vm_daily_rollup WHERE "Date" >= %d AND "Date" < %d `, start.Unix(), end.Unix()) } func buildCanonicalDailyRollupSummaryUnion(start, end time.Time) string { return fmt.Sprintf(` SELECT CAST(NULL AS BIGINT) AS "InventoryId", COALESCE("Name",'') AS "Name", COALESCE("Vcenter",'') AS "Vcenter", COALESCE("VmId",'') AS "VmId", CAST(NULL AS TEXT) AS "EventKey", CAST(NULL AS TEXT) AS "CloudId", COALESCE("CreationTime",0) AS "CreationTime", COALESCE("DeletionTime",0) AS "DeletionTime", COALESCE("LastResourcePool",'') AS "ResourcePool", COALESCE("LastDatacenter",'') AS "Datacenter", COALESCE("LastCluster",'') AS "Cluster", COALESCE("LastFolder",'') AS "Folder", COALESCE("LastProvisionedDisk",0) AS "ProvisionedDisk", COALESCE("LastVcpuCount",0) AS "VcpuCount", COALESCE("LastRamGB",0) AS "RamGB", COALESCE("IsTemplate",'') AS "IsTemplate", COALESCE("PoweredOn",'') AS "PoweredOn", COALESCE("SrmPlaceholder",'') AS "SrmPlaceholder", COALESCE("VmUuid",'') AS "VmUuid", COALESCE("SamplesPresent",0) AS "SamplesPresent", CASE WHEN COALESCE("TotalSamples",0) > 0 THEN 1.0 * COALESCE("SumVcpu",0) / "TotalSamples" ELSE NULL END AS "AvgVcpuCount", CASE WHEN COALESCE("TotalSamples",0) > 0 THEN 1.0 * COALESCE("SumRam",0) / "TotalSamples" ELSE NULL END AS "AvgRamGB", CASE WHEN COALESCE("TotalSamples",0) > 0 THEN 1.0 * COALESCE("SumDisk",0) / "TotalSamples" ELSE NULL END AS "AvgProvisionedDisk", CASE WHEN COALESCE("TotalSamples",0) > 0 THEN 1.0 * COALESCE("SamplesPresent",0) / "TotalSamples" ELSE NULL END AS "AvgIsPresent", CASE WHEN COALESCE("SamplesPresent",0) > 0 THEN 100.0 * COALESCE("TinHits",0) / "SamplesPresent" ELSE NULL END AS "PoolTinPct", CASE WHEN COALESCE("SamplesPresent",0) > 0 THEN 100.0 * COALESCE("BronzeHits",0) / "SamplesPresent" ELSE NULL END AS "PoolBronzePct", CASE WHEN COALESCE("SamplesPresent",0) > 0 THEN 100.0 * COALESCE("SilverHits",0) / "SamplesPresent" ELSE NULL END AS "PoolSilverPct", CASE WHEN COALESCE("SamplesPresent",0) > 0 THEN 100.0 * COALESCE("GoldHits",0) / "SamplesPresent" ELSE NULL END AS "PoolGoldPct", CASE WHEN COALESCE("SamplesPresent",0) > 0 THEN 100.0 * COALESCE("TinHits",0) / "SamplesPresent" ELSE NULL END AS "Tin", CASE WHEN COALESCE("SamplesPresent",0) > 0 THEN 100.0 * COALESCE("BronzeHits",0) / "SamplesPresent" ELSE NULL END AS "Bronze", CASE WHEN COALESCE("SamplesPresent",0) > 0 THEN 100.0 * COALESCE("SilverHits",0) / "SamplesPresent" ELSE NULL END AS "Silver", CASE WHEN COALESCE("SamplesPresent",0) > 0 THEN 100.0 * COALESCE("GoldHits",0) / "SamplesPresent" ELSE NULL END AS "Gold", "Date" AS "SnapshotTime" FROM vm_daily_rollup WHERE "Date" >= %d AND "Date" < %d AND %s `, start.Unix(), end.Unix(), templateExclusionFilter()) } func (c *CronTask) insertMonthlyAggregates(ctx context.Context, summaryTable string, aggMap map[monthlyAggKey]*monthlyAggVal) error { dbConn := c.Database.DB() columns := []string{ "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", "ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount", "RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SamplesPresent", "AvgVcpuCount", "AvgRamGB", "AvgProvisionedDisk", "AvgIsPresent", "PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct", "Tin", "Bronze", "Silver", "Gold", } quotedColumns := make([]string, len(columns)) for i, col := range columns { quotedColumns[i] = fmt.Sprintf(`"%s"`, col) } placeholders := make([]string, len(columns)) for i := range columns { placeholders[i] = "?" } stmtText := fmt.Sprintf(`INSERT INTO %s (%s) VALUES (%s)`, summaryTable, strings.Join(quotedColumns, ","), strings.Join(placeholders, ",")) stmtText = dbConn.Rebind(stmtText) tx, err := dbConn.BeginTxx(ctx, nil) if err != nil { return err } stmt, err := tx.PreparexContext(ctx, stmtText) if err != nil { tx.Rollback() return err } defer stmt.Close() for _, v := range aggMap { inventoryVal := sql.NullInt64{} if v.inventoryId != 0 { inventoryVal = sql.NullInt64{Int64: v.inventoryId, Valid: true} } avgVcpu := sql.NullFloat64{} avgRam := sql.NullFloat64{} avgDisk := sql.NullFloat64{} avgIsPresent := sql.NullFloat64{} tinPct := sql.NullFloat64{} bronzePct := sql.NullFloat64{} silverPct := sql.NullFloat64{} goldPct := sql.NullFloat64{} if v.totalSamples > 0 { avgVcpu = sql.NullFloat64{Float64: v.sumVcpu / v.totalSamples, Valid: true} avgRam = sql.NullFloat64{Float64: v.sumRam / v.totalSamples, Valid: true} avgDisk = sql.NullFloat64{Float64: v.sumDisk / v.totalSamples, Valid: true} avgIsPresent = sql.NullFloat64{Float64: float64(v.samplesPresent) / v.totalSamples, Valid: true} tinPct = sql.NullFloat64{Float64: 100.0 * v.tinWeighted / v.totalSamples, Valid: true} bronzePct = sql.NullFloat64{Float64: 100.0 * v.bronzeWeighted / v.totalSamples, Valid: true} silverPct = sql.NullFloat64{Float64: 100.0 * v.silverWeighted / v.totalSamples, Valid: true} goldPct = sql.NullFloat64{Float64: 100.0 * v.goldWeighted / v.totalSamples, Valid: true} } if _, err := stmt.ExecContext(ctx, inventoryVal, v.key.Name, v.key.Vcenter, v.key.VmId, v.eventKey, v.cloudId, v.creation, v.deletion, v.resourcePool, v.datacenter, v.cluster, v.folder, v.provisioned, v.vcpuCount, v.ramGB, v.isTemplate, v.poweredOn, v.srmPlaceholder, v.key.VmUuid, v.samplesPresent, avgVcpu, avgRam, avgDisk, avgIsPresent, tinPct, bronzePct, silverPct, goldPct, tinPct, bronzePct, silverPct, goldPct, ); err != nil { tx.Rollback() return err } } return tx.Commit() }