package tasks

import (
	"context"
	"database/sql"
	"fmt"
	"log/slog"
	"os"
	"runtime"
	"sort"
	"strings"
	"sync"
	"time"

	"vctp/db"
	"vctp/db/queries"
	"vctp/internal/metrics"
	"vctp/internal/report"
)

// RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table.
//
// It is the scheduled entry point: the work is wrapped in runAggregateJob under
// the job name "daily_aggregate" with a timeout taken from
// DailyJobTimeoutSeconds (15 minutes is passed as the fallback value). The
// supplied logger is only used for the start/finish messages; the aggregation
// itself logs through c.Logger.
func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) (err error) {
	jobTimeout := durationFromSeconds(c.Settings.Values.Settings.DailyJobTimeoutSeconds, 15*time.Minute)
	return c.runAggregateJob(ctx, "daily_aggregate", jobTimeout, func(jobCtx context.Context) error {
		startedAt := time.Now()
		defer func() {
			logger.Info("Daily summary job finished", "duration", time.Since(startedAt))
		}()

		// Aggregate the previous day to avoid partial "today" data when the job runs just after midnight.
		targetTime := time.Now().AddDate(0, 0, -1)
		logger.Info("Daily summary job starting", "target_date", targetTime.Format("2006-01-02"))

		// Always force regeneration on the scheduled run to refresh data even if a manual run happened earlier.
		return c.aggregateDailySummary(jobCtx, targetTime, true)
	})
}

// AggregateDailySummary builds the daily summary for the calendar day that
// contains date. When force is false and the summary table already has rows,
// the call is a no-op; when force is true the existing rows are cleared first.
func (c *CronTask) AggregateDailySummary(ctx context.Context, date time.Time, force bool) error {
	return c.aggregateDailySummary(ctx, date, force)
}

// aggregateDailySummary is the shared implementation behind the scheduled and
// the manual daily aggregation.
//
// The aggregation window is [midnight of targetTime, next midnight) in
// targetTime's location. When the environment variable DAILY_AGG_GO is "1" the
// Go fan-out path is tried first and the SQL path below is only used as a
// fallback. Totals logging, lifecycle refinement, row counting, snapshot
// registration and the SQLite checkpoint are best-effort (failures are logged
// and ignored); a failure to build or run the insert, or to generate the
// report, is returned.
func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Time, force bool) error {
	jobStart := time.Now()
	dayStart := time.Date(targetTime.Year(), targetTime.Month(), targetTime.Day(), 0, 0, 0, 0, targetTime.Location())
	dayEnd := dayStart.AddDate(0, 0, 1)
	dateLabel := dayStart.Format("2006-01-02")
	c.Logger.Info("Daily aggregation window", "start", dayStart, "end", dayEnd)

	summaryTable, err := dailySummaryTableName(targetTime)
	if err != nil {
		return err
	}

	dbConn := c.Database.DB()
	db.SetPostgresWorkMem(ctx, dbConn, c.Settings.Values.Settings.PostgresWorkMemMB)
	if err := db.EnsureSummaryTable(ctx, dbConn, summaryTable); err != nil {
		return err
	}
	if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
		return err
	}

	rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable)
	if err != nil {
		return err
	}
	if rowsExist {
		if !force {
			c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable)
			return nil
		}
		if err := clearTable(ctx, dbConn, summaryTable); err != nil {
			return err
		}
	}

	// If enabled, use the Go fan-out/reduce path to parallelize aggregation.
	if os.Getenv("DAILY_AGG_GO") == "1" {
		c.Logger.Debug("Using go implementation of aggregation")
		goErr := c.aggregateDailySummaryGo(ctx, dayStart, dayEnd, summaryTable, force)
		if goErr == nil {
			metrics.RecordDailyAggregation(time.Since(jobStart), nil)
			c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable)
			return nil
		}
		c.Logger.Warn("go-based daily aggregation failed, falling back to SQL path", "error", goErr)

		// The Go path can fail after it has already committed rows (for
		// example when report generation fails). Remove them so the SQL
		// insert below does not duplicate every VM.
		leftover, err := db.TableHasRows(ctx, dbConn, summaryTable)
		if err != nil {
			return err
		}
		if leftover {
			if err := clearTable(ctx, dbConn, summaryTable); err != nil {
				return err
			}
		}
	}

	hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
	if err != nil {
		return err
	}
	hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd)
	hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
	c.Logger.Info("Daily aggregation hourly snapshot count", "count", len(hourlySnapshots), "date", dateLabel)
	if len(hourlySnapshots) == 0 {
		return fmt.Errorf("no hourly snapshot tables found for %s", dateLabel)
	}

	hourlyTables := make([]string, 0, len(hourlySnapshots))
	for _, snapshot := range hourlySnapshots {
		hourlyTables = append(hourlyTables, snapshot.TableName)
	}
	unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
	if err != nil {
		return err
	}

	// Totals are informational only; a failure here must not abort the job.
	currentTotals, totalsErr := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery)
	if totalsErr != nil {
		c.Logger.Warn("unable to calculate daily totals", "error", totalsErr, "date", dateLabel)
	} else {
		c.Logger.Info("Daily snapshot totals",
			"date", dateLabel,
			"vm_count", currentTotals.VmCount,
			"vcpu_total", currentTotals.VcpuTotal,
			"ram_total_gb", currentTotals.RamTotal,
			"disk_total_gb", currentTotals.DiskTotal,
		)
	}

	// Day-over-day comparison, also informational. It is skipped when the
	// current totals could not be computed, because the deltas would be
	// meaningless.
	prevStart := dayStart.AddDate(0, 0, -1)
	prevEnd := dayStart
	prevSnapshots, prevErr := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", prevStart, prevEnd)
	if totalsErr == nil && prevErr == nil && len(prevSnapshots) > 0 {
		prevSnapshots = filterRecordsInRange(prevSnapshots, prevStart, prevEnd)
		prevSnapshots = filterSnapshotsWithRows(ctx, dbConn, prevSnapshots)
		prevTables := make([]string, 0, len(prevSnapshots))
		for _, snapshot := range prevSnapshots {
			prevTables = append(prevTables, snapshot.TableName)
		}
		prevUnion, unionErr := buildUnionQuery(prevTables, summaryUnionColumns, templateExclusionFilter())
		if unionErr != nil {
			c.Logger.Warn("unable to build previous day union", "error", unionErr)
		} else if prevTotals, prevTotalsErr := db.SnapshotTotalsForUnion(ctx, dbConn, prevUnion); prevTotalsErr != nil {
			c.Logger.Warn("unable to calculate previous day totals", "error", prevTotalsErr, "date", prevStart.Format("2006-01-02"))
		} else {
			c.Logger.Info("Daily snapshot comparison",
				"current_date", dateLabel,
				"previous_date", prevStart.Format("2006-01-02"),
				"vm_delta", currentTotals.VmCount-prevTotals.VmCount,
				"vcpu_delta", currentTotals.VcpuTotal-prevTotals.VcpuTotal,
				"ram_delta_gb", currentTotals.RamTotal-prevTotals.RamTotal,
				"disk_delta_gb", currentTotals.DiskTotal-prevTotals.DiskTotal,
			)
		}
	}

	insertQuery, err := db.BuildDailySummaryInsert(summaryTable, unionQuery)
	if err != nil {
		return err
	}
	if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
		c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dateLabel)
		return err
	}

	if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, summaryTable, dayStart.Unix(), dayEnd.Unix()); err != nil {
		c.Logger.Warn("failed to apply lifecycle deletions to daily summary", "error", err, "table", summaryTable)
	} else {
		c.Logger.Info("Daily aggregation deletion times", "source_lifecycle_cache", applied)
	}
	if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
		c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
	}
	db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)

	rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
	if err != nil {
		c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
		c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
	}

	if err := c.generateReport(ctx, summaryTable); err != nil {
		c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
		metrics.RecordDailyAggregation(time.Since(jobStart), err)
		return err
	}
	if err := db.CheckpointSQLite(ctx, dbConn); err != nil {
		c.Logger.Warn("failed to checkpoint sqlite after daily aggregation", "error", err)
	}

	c.Logger.Debug("Finished daily inventory aggregation", "summary_table", summaryTable)
	metrics.RecordDailyAggregation(time.Since(jobStart), nil)
	return nil
}

// dailySummaryTableName returns the validated summary table name for the day
// containing t, in the form inventory_daily_summary_YYYYMMDD.
func dailySummaryTableName(t time.Time) (string, error) {
	return db.SafeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102")))
}

// aggregateDailySummaryGo performs daily aggregation by reading hourly tables in parallel,
// reducing in Go, and writing the summary table. It mirrors the outputs of the SQL path
// as closely as possible while improving CPU utilization on multi-core hosts.
func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd time.Time, summaryTable string, force bool) error { jobStart := time.Now() dbConn := c.Database.DB() hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd) if err != nil { return err } hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd) hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots) c.Logger.Info("Daily aggregation hourly snapshot count (go path)", "count", len(hourlySnapshots), "date", dayStart.Format("2006-01-02")) if len(hourlySnapshots) == 0 { return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02")) } else { c.Logger.Debug("Found hourly snapshot tables for daily aggregation", "date", dayStart.Format("2006-01-02"), "tables", len(hourlySnapshots)) } hourlyTables := make([]string, 0, len(hourlySnapshots)) for _, snapshot := range hourlySnapshots { hourlyTables = append(hourlyTables, snapshot.TableName) } unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter()) if err != nil { return err } // Clear existing summary if forcing. 
if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil { return err } else if rowsExist && !force { c.Logger.Debug("Daily summary already exists, skipping aggregation (Go path)", "summary_table", summaryTable) return nil } else if rowsExist && force { if err := clearTable(ctx, dbConn, summaryTable); err != nil { return err } } totalSamples := len(hourlyTables) var ( aggMap map[dailyAggKey]*dailyAggVal snapTimes []int64 ) if db.TableExists(ctx, dbConn, "vm_hourly_stats") { cacheAgg, cacheTimes, cacheErr := c.scanHourlyCache(ctx, dayStart, dayEnd) if cacheErr != nil { c.Logger.Warn("failed to use hourly cache, falling back to table scans", "error", cacheErr) } else if len(cacheAgg) > 0 { c.Logger.Debug("using hourly cache for daily aggregation", "date", dayStart.Format("2006-01-02"), "snapshots", len(cacheTimes), "vm_count", len(cacheAgg)) aggMap = cacheAgg snapTimes = cacheTimes totalSamples = len(cacheTimes) } } if aggMap == nil { var errScan error aggMap, errScan = c.scanHourlyTablesParallel(ctx, hourlySnapshots) if errScan != nil { return errScan } c.Logger.Debug("scanned hourly tables for daily aggregation", "date", dayStart.Format("2006-01-02"), "tables", len(hourlySnapshots), "vm_count", len(aggMap)) if len(aggMap) == 0 { return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02")) } // Build ordered list of snapshot times for deletion inference. 
snapTimes = make([]int64, 0, len(hourlySnapshots)) for _, snap := range hourlySnapshots { snapTimes = append(snapTimes, snap.SnapshotTime.Unix()) } sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] }) } lifecycleDeletions := c.applyLifecycleDeletions(ctx, aggMap, dayStart, dayEnd) c.Logger.Info("Daily aggregation deletion times", "source_lifecycle_cache", lifecycleDeletions) inventoryDeletions := c.applyInventoryDeletions(ctx, aggMap, dayStart, dayEnd) c.Logger.Info("Daily aggregation deletion times", "source_inventory", inventoryDeletions) // Get the first hourly snapshot on/after dayEnd to help confirm deletions that happen on the last snapshot of the day. var nextSnapshotTable string nextSnapshotRows, nextErr := c.Database.DB().QueryxContext(ctx, ` SELECT table_name FROM snapshot_registry WHERE snapshot_type = 'hourly' AND snapshot_time >= ? ORDER BY snapshot_time ASC LIMIT 1 `, dayEnd.Unix()) if nextErr == nil { if nextSnapshotRows.Next() { if scanErr := nextSnapshotRows.Scan(&nextSnapshotTable); scanErr != nil { nextSnapshotTable = "" } } nextSnapshotRows.Close() } nextPresence := make(map[string]struct{}) if nextSnapshotTable != "" && db.TableExists(ctx, dbConn, nextSnapshotTable) { rows, err := querySnapshotRows(ctx, dbConn, nextSnapshotTable, []string{"VmId", "VmUuid", "Name"}, `"Vcenter" = ?`, c.Settings.Values.Settings.VcenterAddresses[0]) if err == nil { for rows.Next() { var vmId, vmUuid, name sql.NullString if err := rows.Scan(&vmId, &vmUuid, &name); err == nil { if vmId.Valid { nextPresence["id:"+vmId.String] = struct{}{} } if vmUuid.Valid { nextPresence["uuid:"+vmUuid.String] = struct{}{} } if name.Valid { nextPresence["name:"+name.String] = struct{}{} } } } rows.Close() } } var maxSnap int64 if len(snapTimes) > 0 { maxSnap = snapTimes[len(snapTimes)-1] } inferredDeletions := 0 for _, v := range aggMap { if v.deletion != 0 { continue } // Infer deletion only after seeing at least two consecutive absent snapshots after 
lastSeen. if maxSnap > 0 && len(v.seen) > 0 && v.lastSeen < maxSnap { c.Logger.Debug("inferring deletion window", "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "last_seen", v.lastSeen, "snapshots", len(snapTimes)) } consecutiveMisses := 0 firstMiss := int64(0) for _, t := range snapTimes { if t <= v.lastSeen { continue } if _, ok := v.seen[t]; ok { consecutiveMisses = 0 firstMiss = 0 continue } consecutiveMisses++ if firstMiss == 0 { firstMiss = t } if consecutiveMisses >= 2 { v.deletion = firstMiss inferredDeletions++ break } } if v.deletion == 0 && firstMiss > 0 { // Not enough consecutive misses within the day; try to use the first snapshot of the next day to confirm. if nextSnapshotTable != "" && len(nextPresence) > 0 { _, presentByID := nextPresence["id:"+v.key.VmId] _, presentByUUID := nextPresence["uuid:"+v.key.VmUuid] _, presentByName := nextPresence["name:"+v.key.Name] if !presentByID && !presentByUUID && !presentByName { v.deletion = firstMiss inferredDeletions++ c.Logger.Debug("cross-day deletion inferred from next snapshot", "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "deletion", firstMiss, "next_table", nextSnapshotTable) } } if v.deletion == 0 { c.Logger.Debug("pending deletion inference (insufficient consecutive misses)", "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "last_seen", v.lastSeen, "first_missing_snapshot", firstMiss) } } } c.Logger.Info("Daily aggregation deletion times", "source_inferred", inferredDeletions) totalSamplesByVcenter := sampleCountsByVcenter(aggMap) // Insert aggregated rows. if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples, totalSamplesByVcenter); err != nil { return err } c.Logger.Debug("inserted daily aggregates", "table", summaryTable, "rows", len(aggMap), "total_samples", totalSamples) // Persist rollup cache for monthly aggregation. 
if err := c.persistDailyRollup(ctx, dayStart.Unix(), aggMap, totalSamples, totalSamplesByVcenter); err != nil { c.Logger.Warn("failed to persist daily rollup cache", "error", err, "date", dayStart.Format("2006-01-02")) } else { c.Logger.Debug("persisted daily rollup cache", "date", dayStart.Format("2006-01-02")) } // Refine lifecycle with existing SQL helper to pick up first-after deletions. if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil { c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable) } else { c.Logger.Debug("refined creation/deletion times", "table", summaryTable) } db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable) rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable) if err != nil { c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable) } if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil { c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable) } if err := c.generateReport(ctx, summaryTable); err != nil { c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable) return err } if err := db.CheckpointSQLite(ctx, dbConn); err != nil { c.Logger.Warn("failed to checkpoint sqlite after daily aggregation (Go path)", "error", err) } c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable, "duration", time.Since(jobStart), "tables_scanned", len(hourlyTables), "rows_written", rowCount, "total_samples", totalSamples, ) return nil } func (c *CronTask) applyLifecycleDeletions(ctx context.Context, agg map[dailyAggKey]*dailyAggVal, start, end time.Time) int { dbConn := c.Database.DB() if !db.TableExists(ctx, dbConn, "vm_lifecycle_cache") { return 0 } type aggIndex struct { byID map[string]*dailyAggVal byUUID map[string]*dailyAggVal byName map[string]*dailyAggVal } indexes := 
make(map[string]*aggIndex, 8) for k, v := range agg { if k.Vcenter == "" { continue } idx := indexes[k.Vcenter] if idx == nil { idx = &aggIndex{ byID: make(map[string]*dailyAggVal), byUUID: make(map[string]*dailyAggVal), byName: make(map[string]*dailyAggVal), } indexes[k.Vcenter] = idx } if k.VmId != "" { idx.byID[k.VmId] = v } if k.VmUuid != "" { idx.byUUID[k.VmUuid] = v } if name := strings.ToLower(strings.TrimSpace(k.Name)); name != "" { idx.byName[name] = v } } totalApplied := 0 for vcenter, idx := range indexes { query := ` SELECT "VmId","VmUuid","Name","DeletedAt" FROM vm_lifecycle_cache WHERE "Vcenter" = ? AND "DeletedAt" IS NOT NULL AND "DeletedAt" > 0 AND "DeletedAt" >= ? AND "DeletedAt" < ? ` bind := dbConn.Rebind(query) rows, err := dbConn.QueryxContext(ctx, bind, vcenter, start.Unix(), end.Unix()) if err != nil { c.Logger.Warn("failed to load lifecycle deletions", "vcenter", vcenter, "error", err) continue } scanned := 0 applied := 0 missed := 0 for rows.Next() { scanned++ var vmId, vmUuid, name sql.NullString var deletedAt sql.NullInt64 if err := rows.Scan(&vmId, &vmUuid, &name, &deletedAt); err != nil { c.Logger.Warn("failed to scan lifecycle deletion", "vcenter", vcenter, "error", err) continue } if !deletedAt.Valid || deletedAt.Int64 <= 0 { continue } var target *dailyAggVal if vmId.Valid { target = idx.byID[strings.TrimSpace(vmId.String)] } if target == nil && vmUuid.Valid { target = idx.byUUID[strings.TrimSpace(vmUuid.String)] } if target == nil && name.Valid { target = idx.byName[strings.ToLower(strings.TrimSpace(name.String))] } if target == nil { missed++ continue } target.deletion = deletedAt.Int64 applied++ } rows.Close() if err := rows.Err(); err != nil { c.Logger.Warn("failed to read lifecycle deletions", "vcenter", vcenter, "error", err) } c.Logger.Debug("lifecycle cache deletions applied", "vcenter", vcenter, "window_start", start, "window_end", end, "scanned", scanned, "applied", applied, "missed", missed) totalApplied += applied } 
return totalApplied } func (c *CronTask) applyInventoryDeletions(ctx context.Context, agg map[dailyAggKey]*dailyAggVal, start, end time.Time) int { dbConn := c.Database.DB() vcenters := make(map[string]struct{}, 8) for k := range agg { if k.Vcenter != "" { vcenters[k.Vcenter] = struct{}{} } } totalApplied := 0 for vcenter := range vcenters { inventoryRows, err := queries.New(dbConn).GetInventoryByVcenter(ctx, vcenter) if err != nil { c.Logger.Warn("failed to load inventory for daily deletion times", "vcenter", vcenter, "error", err) continue } byID := make(map[string]int64, len(inventoryRows)) byUUID := make(map[string]int64, len(inventoryRows)) byName := make(map[string]int64, len(inventoryRows)) for _, inv := range inventoryRows { if !inv.DeletionTime.Valid || inv.DeletionTime.Int64 <= 0 { continue } if inv.DeletionTime.Int64 < start.Unix() || inv.DeletionTime.Int64 >= end.Unix() { continue } if inv.VmId.Valid && strings.TrimSpace(inv.VmId.String) != "" { byID[strings.TrimSpace(inv.VmId.String)] = inv.DeletionTime.Int64 } if inv.VmUuid.Valid && strings.TrimSpace(inv.VmUuid.String) != "" { byUUID[strings.TrimSpace(inv.VmUuid.String)] = inv.DeletionTime.Int64 } if strings.TrimSpace(inv.Name) != "" { byName[strings.ToLower(strings.TrimSpace(inv.Name))] = inv.DeletionTime.Int64 } } for k, v := range agg { if k.Vcenter != vcenter { continue } if ts, ok := byID[k.VmId]; ok { if v.deletion == 0 { v.deletion = ts totalApplied++ } continue } if ts, ok := byUUID[k.VmUuid]; ok { if v.deletion == 0 { v.deletion = ts totalApplied++ } continue } if ts, ok := byName[strings.ToLower(k.Name)]; ok { if v.deletion == 0 { v.deletion = ts totalApplied++ } } } } return totalApplied } func (c *CronTask) scanHourlyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) { agg := make(map[dailyAggKey]*dailyAggVal, 1024) mu := sync.Mutex{} workers := runtime.NumCPU() if workers < 2 { workers = 2 } if workers > len(snapshots) { workers = 
len(snapshots) } jobs := make(chan report.SnapshotRecord, len(snapshots)) wg := sync.WaitGroup{} for i := 0; i < workers; i++ { wg.Add(1) go func() { defer wg.Done() for snap := range jobs { rows, err := c.scanHourlyTable(ctx, snap) if err != nil { c.Logger.Warn("failed to scan hourly table", "table", snap.TableName, "error", err) continue } mu.Lock() for k, v := range rows { if existing, ok := agg[k]; ok { mergeDailyAgg(existing, v) } else { agg[k] = v } } mu.Unlock() } }() } for _, snap := range snapshots { jobs <- snap } close(jobs) wg.Wait() return agg, nil } func mergeDailyAgg(dst, src *dailyAggVal) { if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) { dst.creation = src.creation } if dst.firstSeen == 0 || (src.firstSeen > 0 && src.firstSeen < dst.firstSeen) { dst.firstSeen = src.firstSeen } if src.lastSeen > dst.lastSeen { dst.lastSeen = src.lastSeen dst.resourcePool = src.resourcePool dst.datacenter = src.datacenter dst.cluster = src.cluster dst.folder = src.folder dst.isTemplate = src.isTemplate dst.poweredOn = src.poweredOn dst.srmPlaceholder = src.srmPlaceholder dst.lastDisk = src.lastDisk dst.lastVcpu = src.lastVcpu dst.lastRam = src.lastRam } dst.sumVcpu += src.sumVcpu dst.sumRam += src.sumRam dst.sumDisk += src.sumDisk dst.samples += src.samples dst.tinHits += src.tinHits dst.bronzeHits += src.bronzeHits dst.silverHits += src.silverHits dst.goldHits += src.goldHits if dst.seen == nil { dst.seen = make(map[int64]struct{}, len(src.seen)) } for t := range src.seen { dst.seen[t] = struct{}{} } } func (c *CronTask) scanHourlyTable(ctx context.Context, snap report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) { dbConn := c.Database.DB() query := fmt.Sprintf(` SELECT "Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder", COALESCE("ProvisionedDisk",0) AS disk, COALESCE("VcpuCount",0) AS vcpu, COALESCE("RamGB",0) AS ram, COALESCE("CreationTime",0) AS creation, COALESCE("DeletionTime",0) AS deletion, 
COALESCE("IsTemplate",'FALSE') AS is_template, COALESCE("PoweredOn",'FALSE') AS powered_on, COALESCE("SrmPlaceholder",'FALSE') AS srm_placeholder, COALESCE("SnapshotTime",0) AS snapshot_time FROM %s `, snap.TableName) rows, err := dbConn.QueryxContext(ctx, query) if err != nil { return nil, err } defer rows.Close() out := make(map[dailyAggKey]*dailyAggVal, 256) for rows.Next() { var ( name, vcenter, vmId, vmUuid, resourcePool string dc, cluster, folder sql.NullString disk sql.NullFloat64 vcpu, ram sql.NullInt64 creation, deletion, snapshotTime sql.NullInt64 isTemplate, poweredOn, srmPlaceholder sql.NullString ) if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil { continue } if vcenter == "" { continue } // Skip templates. if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" { continue } key := dailyAggKey{ Vcenter: vcenter, VmId: strings.TrimSpace(vmId), VmUuid: strings.TrimSpace(vmUuid), Name: strings.TrimSpace(name), } if key.VmId == "" && key.VmUuid == "" && key.Name == "" { continue } if key.VmId == "" { key.VmId = key.VmUuid } pool := strings.ToLower(strings.TrimSpace(resourcePool)) hitTin := btoi(pool == "tin") hitBronze := btoi(pool == "bronze") hitSilver := btoi(pool == "silver") hitGold := btoi(pool == "gold") row := &dailyAggVal{ key: key, resourcePool: resourcePool, datacenter: dc.String, cluster: cluster.String, folder: folder.String, isTemplate: isTemplate.String, poweredOn: poweredOn.String, srmPlaceholder: srmPlaceholder.String, creation: int64OrZero(creation), firstSeen: int64OrZero(snapshotTime), lastSeen: int64OrZero(snapshotTime), lastDisk: disk.Float64, lastVcpu: vcpu.Int64, lastRam: ram.Int64, sumVcpu: vcpu.Int64, sumRam: ram.Int64, sumDisk: disk.Float64, samples: 1, tinHits: hitTin, bronzeHits: hitBronze, silverHits: hitSilver, goldHits: hitGold, seen: 
map[int64]struct{}{int64OrZero(snapshotTime): {}}, } if deletion.Valid && deletion.Int64 > 0 { row.deletion = deletion.Int64 } out[key] = row } return out, nil } // scanHourlyCache aggregates directly from vm_hourly_stats when available. func (c *CronTask) scanHourlyCache(ctx context.Context, start, end time.Time) (map[dailyAggKey]*dailyAggVal, []int64, error) { dbConn := c.Database.DB() query := ` SELECT "Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder", COALESCE("ProvisionedDisk",0) AS disk, COALESCE("VcpuCount",0) AS vcpu, COALESCE("RamGB",0) AS ram, COALESCE("CreationTime",0) AS creation, COALESCE("DeletionTime",0) AS deletion, COALESCE("IsTemplate",'') AS is_template, COALESCE("PoweredOn",'') AS powered_on, COALESCE("SrmPlaceholder",'') AS srm_placeholder, "SnapshotTime" FROM vm_hourly_stats WHERE "SnapshotTime" >= ? AND "SnapshotTime" < ?` q := dbConn.Rebind(query) rows, err := dbConn.QueryxContext(ctx, q, start.Unix(), end.Unix()) if err != nil { return nil, nil, err } defer rows.Close() agg := make(map[dailyAggKey]*dailyAggVal, 512) timeSet := make(map[int64]struct{}, 64) for rows.Next() { var ( name, vcenter, vmId, vmUuid, resourcePool string dc, cluster, folder sql.NullString disk sql.NullFloat64 vcpu, ram sql.NullInt64 creation, deletion, snapshotTime sql.NullInt64 isTemplate, poweredOn, srmPlaceholder sql.NullString ) if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil { continue } if vcenter == "" { continue } if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" { continue } key := dailyAggKey{ Vcenter: vcenter, VmId: strings.TrimSpace(vmId), VmUuid: strings.TrimSpace(vmUuid), Name: strings.TrimSpace(name), } if key.VmId == "" && key.VmUuid == "" && key.Name == "" { continue } if key.VmId == "" { key.VmId = key.VmUuid } pool := 
strings.ToLower(strings.TrimSpace(resourcePool)) hitTin := btoi(pool == "tin") hitBronze := btoi(pool == "bronze") hitSilver := btoi(pool == "silver") hitGold := btoi(pool == "gold") snapTs := int64OrZero(snapshotTime) timeSet[snapTs] = struct{}{} row := &dailyAggVal{ key: key, resourcePool: resourcePool, datacenter: dc.String, cluster: cluster.String, folder: folder.String, isTemplate: isTemplate.String, poweredOn: poweredOn.String, srmPlaceholder: srmPlaceholder.String, creation: int64OrZero(creation), firstSeen: snapTs, lastSeen: snapTs, lastDisk: disk.Float64, lastVcpu: vcpu.Int64, lastRam: ram.Int64, sumVcpu: vcpu.Int64, sumRam: ram.Int64, sumDisk: disk.Float64, samples: 1, tinHits: hitTin, bronzeHits: hitBronze, silverHits: hitSilver, goldHits: hitGold, seen: map[int64]struct{}{snapTs: {}}, } if deletion.Valid && deletion.Int64 > 0 { row.deletion = deletion.Int64 } if existing, ok := agg[key]; ok { mergeDailyAgg(existing, row) } else { agg[key] = row } } snapTimes := make([]int64, 0, len(timeSet)) for t := range timeSet { snapTimes = append(snapTimes, t) } sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] }) return agg, snapTimes, rows.Err() } func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg map[dailyAggKey]*dailyAggVal, totalSamples int, totalSamplesByVcenter map[string]int) error { dbConn := c.Database.DB() tx, err := dbConn.Beginx() if err != nil { return err } defer tx.Rollback() driver := strings.ToLower(dbConn.DriverName()) placeholders := makePlaceholders(driver, 30) insert := fmt.Sprintf(` INSERT INTO %s ( "Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder", "ProvisionedDisk","VcpuCount","RamGB","IsTemplate","PoweredOn","SrmPlaceholder", "CreationTime","DeletionTime","SnapshotTime","SamplesPresent","AvgVcpuCount","AvgRamGB","AvgProvisionedDisk", "AvgIsPresent","PoolTinPct","PoolBronzePct","PoolSilverPct","PoolGoldPct","Tin","Bronze","Silver","Gold" ) VALUES (%s) `, 
table, placeholders) for _, v := range agg { if v.samples == 0 { continue } vcTotal := totalSamplesByVcenter[v.key.Vcenter] if vcTotal <= 0 { vcTotal = totalSamples } total := float64(vcTotal) avgVcpu := 0.0 avgRam := 0.0 avgDisk := 0.0 avgPresent := 0.0 tinPct := 0.0 bronzePct := 0.0 silverPct := 0.0 goldPct := 0.0 if total > 0 { avgPresent = float64(v.samples) / total avgVcpu = float64(v.sumVcpu) / total avgRam = float64(v.sumRam) / total avgDisk = v.sumDisk / total } if v.samples > 0 { tinPct = float64(v.tinHits) * 100 / float64(v.samples) bronzePct = float64(v.bronzeHits) * 100 / float64(v.samples) silverPct = float64(v.silverHits) * 100 / float64(v.samples) goldPct = float64(v.goldHits) * 100 / float64(v.samples) } args := []interface{}{ v.key.Name, v.key.Vcenter, nullIfEmpty(v.key.VmId), nullIfEmpty(v.key.VmUuid), v.resourcePool, nullIfEmpty(v.datacenter), nullIfEmpty(v.cluster), nullIfEmpty(v.folder), v.lastDisk, v.lastVcpu, v.lastRam, v.isTemplate, v.poweredOn, v.srmPlaceholder, v.creation, v.deletion, v.lastSeen, v.samples, avgVcpu, avgRam, avgDisk, avgPresent, tinPct, bronzePct, silverPct, goldPct, tinPct, bronzePct, silverPct, goldPct, } if driver != "sqlite" { // Postgres expects primitive types, nulls are handled by pq via nil. for i, a := range args { if s, ok := a.(string); ok && s == "" { args[i] = nil } } } if _, err := tx.ExecContext(ctx, insert, args...); err != nil { return err } } return tx.Commit() } func int64OrZero(v sql.NullInt64) int64 { if v.Valid { return v.Int64 } return 0 } func nullIfEmpty(s string) interface{} { if strings.TrimSpace(s) == "" { return nil } return s } func makePlaceholders(driver string, n int) string { if driver == "sqlite" { parts := make([]string, n) for i := 0; i < n; i++ { parts[i] = "?" 
} return strings.Join(parts, ",") } parts := make([]string, n) for i := 0; i < n; i++ { parts[i] = fmt.Sprintf("$%d", i+1) } return strings.Join(parts, ",") } func btoi(b bool) int64 { if b { return 1 } return 0 } // persistDailyRollup stores per-day aggregates into vm_daily_rollup to speed monthly aggregation. func (c *CronTask) persistDailyRollup(ctx context.Context, dayUnix int64, agg map[dailyAggKey]*dailyAggVal, totalSamples int, totalSamplesByVcenter map[string]int) error { dbConn := c.Database.DB() for _, v := range agg { if strings.EqualFold(strings.TrimSpace(v.isTemplate), "true") || v.isTemplate == "1" { continue } vcTotal := totalSamplesByVcenter[v.key.Vcenter] if vcTotal <= 0 { vcTotal = totalSamples } row := db.VmDailyRollupRow{ Vcenter: v.key.Vcenter, VmId: v.key.VmId, VmUuid: v.key.VmUuid, Name: v.key.Name, CreationTime: v.creation, DeletionTime: v.deletion, SamplesPresent: v.samples, TotalSamples: int64(vcTotal), SumVcpu: float64(v.sumVcpu), SumRam: float64(v.sumRam), SumDisk: v.sumDisk, TinHits: v.tinHits, BronzeHits: v.bronzeHits, SilverHits: v.silverHits, GoldHits: v.goldHits, LastResourcePool: v.resourcePool, LastDatacenter: v.datacenter, LastCluster: v.cluster, LastFolder: v.folder, LastProvisionedDisk: v.lastDisk, LastVcpuCount: v.lastVcpu, LastRamGB: v.lastRam, IsTemplate: v.isTemplate, PoweredOn: v.poweredOn, SrmPlaceholder: v.srmPlaceholder, } if err := db.UpsertVmDailyRollup(ctx, dbConn, dayUnix, row); err != nil { return err } } return nil } func sampleCountsByVcenter(agg map[dailyAggKey]*dailyAggVal) map[string]int { vcenterTimes := make(map[string]map[int64]struct{}, 8) for _, v := range agg { if v.key.Vcenter == "" { continue } set := vcenterTimes[v.key.Vcenter] if set == nil { set = make(map[int64]struct{}, len(v.seen)) vcenterTimes[v.key.Vcenter] = set } for t := range v.seen { set[t] = struct{}{} } } counts := make(map[string]int, len(vcenterTimes)) for vc, set := range vcenterTimes { counts[vc] = len(set) } return counts }