package tasks import ( "context" "database/sql" "fmt" "log/slog" "os" "runtime" "sort" "strings" "sync" "time" "vctp/db" "vctp/internal/metrics" "vctp/internal/report" ) // RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table. func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) (err error) { jobTimeout := durationFromSeconds(c.Settings.Values.Settings.DailyJobTimeoutSeconds, 15*time.Minute) return c.runAggregateJob(ctx, "daily_aggregate", jobTimeout, func(jobCtx context.Context) error { startedAt := time.Now() defer func() { logger.Info("Daily summary job finished", "duration", time.Since(startedAt)) }() targetTime := time.Now().Add(-time.Minute) // Always force regeneration on the scheduled run to refresh data even if a manual run happened earlier. return c.aggregateDailySummary(jobCtx, targetTime, true) }) } func (c *CronTask) AggregateDailySummary(ctx context.Context, date time.Time, force bool) error { return c.aggregateDailySummary(ctx, date, force) } func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Time, force bool) error { jobStart := time.Now() dayStart := time.Date(targetTime.Year(), targetTime.Month(), targetTime.Day(), 0, 0, 0, 0, targetTime.Location()) dayEnd := dayStart.AddDate(0, 0, 1) summaryTable, err := dailySummaryTableName(targetTime) if err != nil { return err } dbConn := c.Database.DB() db.SetPostgresWorkMem(ctx, dbConn, c.Settings.Values.Settings.PostgresWorkMemMB) if err := db.EnsureSummaryTable(ctx, dbConn, summaryTable); err != nil { return err } if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil { return err } if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil { return err } else if rowsExist && !force { c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable) return nil } else if rowsExist && force { if err := clearTable(ctx, dbConn, summaryTable); err != nil { return err } } // If enabled, use the Go fan-out/reduce path to parallelize aggregation. if os.Getenv("DAILY_AGG_GO") == "1" { c.Logger.Debug("Using go implementation of aggregation") if err := c.aggregateDailySummaryGo(ctx, dayStart, dayEnd, summaryTable, force); err != nil { c.Logger.Warn("go-based daily aggregation failed, falling back to SQL path", "error", err) } else { metrics.RecordDailyAggregation(time.Since(jobStart), nil) c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable) return nil } } hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd) if err != nil { return err } hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd) hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots) if len(hourlySnapshots) == 0 { return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02")) } hourlyTables := make([]string, 0, len(hourlySnapshots)) for _, snapshot := range hourlySnapshots { hourlyTables = append(hourlyTables, snapshot.TableName) // Ensure indexes exist on historical hourly tables for faster aggregation. if err := db.EnsureSnapshotIndexes(ctx, dbConn, snapshot.TableName); err != nil { c.Logger.Warn("failed to ensure indexes on hourly table", "table", snapshot.TableName, "error", err) } } unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter()) if err != nil { return err } currentTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery) if err != nil { c.Logger.Warn("unable to calculate daily totals", "error", err, "date", dayStart.Format("2006-01-02")) } else { c.Logger.Info("Daily snapshot totals", "date", dayStart.Format("2006-01-02"), "vm_count", currentTotals.VmCount, "vcpu_total", currentTotals.VcpuTotal, "ram_total_gb", currentTotals.RamTotal, "disk_total_gb", currentTotals.DiskTotal, ) } prevStart := dayStart.AddDate(0, 0, -1) prevEnd := dayStart prevSnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", prevStart, prevEnd) if err == nil && len(prevSnapshots) > 0 { prevSnapshots = filterRecordsInRange(prevSnapshots, prevStart, prevEnd) prevSnapshots = filterSnapshotsWithRows(ctx, dbConn, prevSnapshots) prevTables := make([]string, 0, len(prevSnapshots)) for _, snapshot := range prevSnapshots { prevTables = append(prevTables, snapshot.TableName) } prevUnion, err := buildUnionQuery(prevTables, summaryUnionColumns, templateExclusionFilter()) if err == nil { prevTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, prevUnion) if err != nil { c.Logger.Warn("unable to calculate previous day totals", "error", err, "date", prevStart.Format("2006-01-02")) } else { c.Logger.Info("Daily snapshot comparison", "current_date", dayStart.Format("2006-01-02"), "previous_date", prevStart.Format("2006-01-02"), "vm_delta", currentTotals.VmCount-prevTotals.VmCount, "vcpu_delta", currentTotals.VcpuTotal-prevTotals.VcpuTotal, "ram_delta_gb", currentTotals.RamTotal-prevTotals.RamTotal, "disk_delta_gb", currentTotals.DiskTotal-prevTotals.DiskTotal, ) } } else { c.Logger.Warn("unable to build previous day union", "error", err) } } insertQuery, err := db.BuildDailySummaryInsert(summaryTable, unionQuery) if err != nil { return err } if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil { c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayStart.Format("2006-01-02")) return err } // Backfill missing creation times to the start of the day for rows where vCenter had no creation info. if _, err := dbConn.ExecContext(ctx, `UPDATE `+summaryTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`, dayStart.Unix(), ); err != nil { c.Logger.Warn("failed to normalize creation times for daily summary", "error", err, "table", summaryTable) } if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil { c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable) } db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable) rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable) if err != nil { c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable) } if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil { c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable) } if err := c.generateReport(ctx, summaryTable); err != nil { c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable) metrics.RecordDailyAggregation(time.Since(jobStart), err) return err } c.Logger.Debug("Finished daily inventory aggregation", "summary_table", summaryTable) metrics.RecordDailyAggregation(time.Since(jobStart), nil) return nil } func dailySummaryTableName(t time.Time) (string, error) { return db.SafeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102"))) } // aggregateDailySummaryGo performs daily aggregation by reading hourly tables in parallel, // reducing in Go, and writing the summary table. It mirrors the outputs of the SQL path // as closely as possible while improving CPU utilization on multi-core hosts. func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd time.Time, summaryTable string, force bool) error { jobStart := time.Now() dbConn := c.Database.DB() hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd) if err != nil { return err } hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd) hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots) if len(hourlySnapshots) == 0 { return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02")) } hourlyTables := make([]string, 0, len(hourlySnapshots)) for _, snapshot := range hourlySnapshots { hourlyTables = append(hourlyTables, snapshot.TableName) if err := db.EnsureSnapshotIndexes(ctx, dbConn, snapshot.TableName); err != nil { c.Logger.Warn("failed to ensure indexes on hourly table", "table", snapshot.TableName, "error", err) } } unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter()) if err != nil { return err } // Clear existing summary if forcing. if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil { return err } else if rowsExist && !force { c.Logger.Debug("Daily summary already exists, skipping aggregation (Go path)", "summary_table", summaryTable) return nil } else if rowsExist && force { if err := clearTable(ctx, dbConn, summaryTable); err != nil { return err } } totalSamples := len(hourlyTables) var ( aggMap map[dailyAggKey]*dailyAggVal snapTimes []int64 ) if db.TableExists(ctx, dbConn, "vm_hourly_stats") { cacheAgg, cacheTimes, cacheErr := c.scanHourlyCache(ctx, dayStart, dayEnd) if cacheErr != nil { c.Logger.Warn("failed to use hourly cache, falling back to table scans", "error", cacheErr) } else if len(cacheAgg) > 0 { aggMap = cacheAgg snapTimes = cacheTimes totalSamples = len(cacheTimes) } } if aggMap == nil { var errScan error aggMap, errScan = c.scanHourlyTablesParallel(ctx, hourlySnapshots, totalSamples) if errScan != nil { return errScan } if len(aggMap) == 0 { return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02")) } // Build ordered list of snapshot times for deletion inference. snapTimes = make([]int64, 0, len(hourlySnapshots)) for _, snap := range hourlySnapshots { snapTimes = append(snapTimes, snap.SnapshotTime.Unix()) } sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] }) } for _, v := range aggMap { if v.creation == 0 { v.creation = v.firstSeen } // Infer deletion as the first snapshot time after lastSeen where the VM is absent. for _, t := range snapTimes { if t <= v.lastSeen { continue } if _, ok := v.seen[t]; !ok { v.deletion = t break } } } // Insert aggregated rows. if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples); err != nil { return err } // Persist rollup cache for monthly aggregation. if err := c.persistDailyRollup(ctx, dayStart.Unix(), aggMap, totalSamples); err != nil { c.Logger.Warn("failed to persist daily rollup cache", "error", err, "date", dayStart.Format("2006-01-02")) } // Refine lifecycle with existing SQL helper to pick up first-after deletions. if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil { c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable) } db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable) rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable) if err != nil { c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable) } if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil { c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable) } if err := c.generateReport(ctx, summaryTable); err != nil { c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable) return err } c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable, "duration", time.Since(jobStart)) return nil } type dailyAggKey struct { Vcenter string VmId string VmUuid string Name string } type dailyAggVal struct { key dailyAggKey resourcePool string datacenter string cluster string folder string isTemplate string poweredOn string srmPlaceholder string creation int64 firstSeen int64 lastSeen int64 lastDisk float64 lastVcpu int64 lastRam int64 sumVcpu int64 sumRam int64 sumDisk float64 samples int64 tinHits int64 bronzeHits int64 silverHits int64 goldHits int64 seen map[int64]struct{} deletion int64 } func (c *CronTask) scanHourlyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord, totalSamples int) (map[dailyAggKey]*dailyAggVal, error) { agg := make(map[dailyAggKey]*dailyAggVal, 1024) mu := sync.Mutex{} workers := runtime.NumCPU() if workers < 2 { workers = 2 } if workers > len(snapshots) { workers = len(snapshots) } jobs := make(chan report.SnapshotRecord, len(snapshots)) wg := sync.WaitGroup{} for i := 0; i < workers; i++ { wg.Add(1) go func() { defer wg.Done() for snap := range jobs { rows, err := c.scanHourlyTable(ctx, snap) if err != nil { c.Logger.Warn("failed to scan hourly table", "table", snap.TableName, "error", err) continue } mu.Lock() for k, v := range rows { if existing, ok := agg[k]; ok { mergeDailyAgg(existing, v) } else { agg[k] = v } } mu.Unlock() } }() } for _, snap := range snapshots { jobs <- snap } close(jobs) wg.Wait() return agg, nil } func mergeDailyAgg(dst, src *dailyAggVal) { if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) { dst.creation = src.creation } if dst.firstSeen == 0 || (src.firstSeen > 0 && src.firstSeen < dst.firstSeen) { dst.firstSeen = src.firstSeen } if src.lastSeen > dst.lastSeen { dst.lastSeen = src.lastSeen dst.resourcePool = src.resourcePool dst.datacenter = src.datacenter dst.cluster = src.cluster dst.folder = src.folder dst.isTemplate = src.isTemplate dst.poweredOn = src.poweredOn dst.srmPlaceholder = src.srmPlaceholder dst.lastDisk = src.lastDisk dst.lastVcpu = src.lastVcpu dst.lastRam = src.lastRam } dst.sumVcpu += src.sumVcpu dst.sumRam += src.sumRam dst.sumDisk += src.sumDisk dst.samples += src.samples dst.tinHits += src.tinHits dst.bronzeHits += src.bronzeHits dst.silverHits += src.silverHits dst.goldHits += src.goldHits if dst.seen == nil { dst.seen = make(map[int64]struct{}, len(src.seen)) } for t := range src.seen { dst.seen[t] = struct{}{} } } func (c *CronTask) scanHourlyTable(ctx context.Context, snap report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) { dbConn := c.Database.DB() query := fmt.Sprintf(` SELECT "Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder", COALESCE("ProvisionedDisk",0) AS disk, COALESCE("VcpuCount",0) AS vcpu, COALESCE("RamGB",0) AS ram, COALESCE("CreationTime",0) AS creation, COALESCE("DeletionTime",0) AS deletion, COALESCE("IsTemplate",'FALSE') AS is_template, COALESCE("PoweredOn",'FALSE') AS powered_on, COALESCE("SrmPlaceholder",'FALSE') AS srm_placeholder, COALESCE("SnapshotTime",0) AS snapshot_time FROM %s `, snap.TableName) rows, err := dbConn.QueryxContext(ctx, query) if err != nil { return nil, err } defer rows.Close() out := make(map[dailyAggKey]*dailyAggVal, 256) for rows.Next() { var ( name, vcenter, vmId, vmUuid, resourcePool string dc, cluster, folder sql.NullString disk sql.NullFloat64 vcpu, ram sql.NullInt64 creation, deletion, snapshotTime sql.NullInt64 isTemplate, poweredOn, srmPlaceholder sql.NullString ) if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil { continue } if vcenter == "" { continue } // Skip templates. if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" { continue } key := dailyAggKey{ Vcenter: vcenter, VmId: strings.TrimSpace(vmId), VmUuid: strings.TrimSpace(vmUuid), Name: strings.TrimSpace(name), } if key.VmId == "" && key.VmUuid == "" && key.Name == "" { continue } if key.VmId == "" { key.VmId = key.VmUuid } pool := strings.ToLower(strings.TrimSpace(resourcePool)) hitTin := btoi(pool == "tin") hitBronze := btoi(pool == "bronze") hitSilver := btoi(pool == "silver") hitGold := btoi(pool == "gold") row := &dailyAggVal{ key: key, resourcePool: resourcePool, datacenter: dc.String, cluster: cluster.String, folder: folder.String, isTemplate: isTemplate.String, poweredOn: poweredOn.String, srmPlaceholder: srmPlaceholder.String, creation: int64OrZero(creation), firstSeen: int64OrZero(snapshotTime), lastSeen: int64OrZero(snapshotTime), lastDisk: disk.Float64, lastVcpu: vcpu.Int64, lastRam: ram.Int64, sumVcpu: vcpu.Int64, sumRam: ram.Int64, sumDisk: disk.Float64, samples: 1, tinHits: hitTin, bronzeHits: hitBronze, silverHits: hitSilver, goldHits: hitGold, seen: map[int64]struct{}{int64OrZero(snapshotTime): {}}, } out[key] = row } return out, nil } // scanHourlyCache aggregates directly from vm_hourly_stats when available. func (c *CronTask) scanHourlyCache(ctx context.Context, start, end time.Time) (map[dailyAggKey]*dailyAggVal, []int64, error) { dbConn := c.Database.DB() query := ` SELECT "Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder", COALESCE("ProvisionedDisk",0) AS disk, COALESCE("VcpuCount",0) AS vcpu, COALESCE("RamGB",0) AS ram, COALESCE("CreationTime",0) AS creation, COALESCE("DeletionTime",0) AS deletion, COALESCE("IsTemplate",'') AS is_template, COALESCE("PoweredOn",'') AS powered_on, COALESCE("SrmPlaceholder",'') AS srm_placeholder, "SnapshotTime" FROM vm_hourly_stats WHERE "SnapshotTime" >= ? AND "SnapshotTime" < ?` q := dbConn.Rebind(query) rows, err := dbConn.QueryxContext(ctx, q, start.Unix(), end.Unix()) if err != nil { return nil, nil, err } defer rows.Close() agg := make(map[dailyAggKey]*dailyAggVal, 512) timeSet := make(map[int64]struct{}, 64) for rows.Next() { var ( name, vcenter, vmId, vmUuid, resourcePool string dc, cluster, folder sql.NullString disk sql.NullFloat64 vcpu, ram sql.NullInt64 creation, deletion, snapshotTime sql.NullInt64 isTemplate, poweredOn, srmPlaceholder sql.NullString ) if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil { continue } if vcenter == "" { continue } if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" { continue } key := dailyAggKey{ Vcenter: vcenter, VmId: strings.TrimSpace(vmId), VmUuid: strings.TrimSpace(vmUuid), Name: strings.TrimSpace(name), } if key.VmId == "" && key.VmUuid == "" && key.Name == "" { continue } if key.VmId == "" { key.VmId = key.VmUuid } pool := strings.ToLower(strings.TrimSpace(resourcePool)) hitTin := btoi(pool == "tin") hitBronze := btoi(pool == "bronze") hitSilver := btoi(pool == "silver") hitGold := btoi(pool == "gold") snapTs := int64OrZero(snapshotTime) timeSet[snapTs] = struct{}{} row := &dailyAggVal{ key: key, resourcePool: resourcePool, datacenter: dc.String, cluster: cluster.String, folder: folder.String, isTemplate: isTemplate.String, poweredOn: poweredOn.String, srmPlaceholder: srmPlaceholder.String, creation: int64OrZero(creation), firstSeen: snapTs, lastSeen: snapTs, lastDisk: disk.Float64, lastVcpu: vcpu.Int64, lastRam: ram.Int64, sumVcpu: vcpu.Int64, sumRam: ram.Int64, sumDisk: disk.Float64, samples: 1, tinHits: hitTin, bronzeHits: hitBronze, silverHits: hitSilver, goldHits: hitGold, seen: map[int64]struct{}{snapTs: {}}, } if deletion.Valid && deletion.Int64 > 0 { row.deletion = deletion.Int64 } if existing, ok := agg[key]; ok { mergeDailyAgg(existing, row) } else { agg[key] = row } } snapTimes := make([]int64, 0, len(timeSet)) for t := range timeSet { snapTimes = append(snapTimes, t) } sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] }) return agg, snapTimes, rows.Err() } func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error { dbConn := c.Database.DB() tx, err := dbConn.Beginx() if err != nil { return err } defer tx.Rollback() driver := strings.ToLower(dbConn.DriverName()) placeholders := makePlaceholders(driver, 29) insert := fmt.Sprintf(` INSERT INTO %s ( "Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder", "ProvisionedDisk","VcpuCount","RamGB","IsTemplate","PoweredOn","SrmPlaceholder", "CreationTime","DeletionTime","SamplesPresent","AvgVcpuCount","AvgRamGB","AvgProvisionedDisk", "AvgIsPresent","PoolTinPct","PoolBronzePct","PoolSilverPct","PoolGoldPct","Tin","Bronze","Silver","Gold" ) VALUES (%s) `, table, placeholders) for _, v := range agg { if v.samples == 0 { continue } avgVcpu := float64(v.sumVcpu) / float64(v.samples) avgRam := float64(v.sumRam) / float64(v.samples) avgDisk := v.sumDisk / float64(v.samples) total := float64(totalSamples) avgPresent := 0.0 tinPct := 0.0 bronzePct := 0.0 silverPct := 0.0 goldPct := 0.0 if total > 0 { avgPresent = float64(v.samples) / total } if v.samples > 0 { tinPct = float64(v.tinHits) * 100 / float64(v.samples) bronzePct = float64(v.bronzeHits) * 100 / float64(v.samples) silverPct = float64(v.silverHits) * 100 / float64(v.samples) goldPct = float64(v.goldHits) * 100 / float64(v.samples) } args := []interface{}{ v.key.Name, v.key.Vcenter, nullIfEmpty(v.key.VmId), nullIfEmpty(v.key.VmUuid), v.resourcePool, nullIfEmpty(v.datacenter), nullIfEmpty(v.cluster), nullIfEmpty(v.folder), v.lastDisk, v.lastVcpu, v.lastRam, v.isTemplate, v.poweredOn, v.srmPlaceholder, v.creation, v.deletion, v.samples, avgVcpu, avgRam, avgDisk, avgPresent, tinPct, bronzePct, silverPct, goldPct, float64(v.tinHits), float64(v.bronzeHits), float64(v.silverHits), float64(v.goldHits), } if driver != "sqlite" { // Postgres expects primitive types, nulls are handled by pq via nil. for i, a := range args { if s, ok := a.(string); ok && s == "" { args[i] = nil } } } if _, err := tx.ExecContext(ctx, insert, args...); err != nil { return err } } return tx.Commit() } func int64OrZero(v sql.NullInt64) int64 { if v.Valid { return v.Int64 } return 0 } func nullIfEmpty(s string) interface{} { if strings.TrimSpace(s) == "" { return nil } return s } func makePlaceholders(driver string, n int) string { if driver == "sqlite" { parts := make([]string, n) for i := 0; i < n; i++ { parts[i] = "?" } return strings.Join(parts, ",") } parts := make([]string, n) for i := 0; i < n; i++ { parts[i] = fmt.Sprintf("$%d", i+1) } return strings.Join(parts, ",") } func btoi(b bool) int64 { if b { return 1 } return 0 } // persistDailyRollup stores per-day aggregates into vm_daily_rollup to speed monthly aggregation. func (c *CronTask) persistDailyRollup(ctx context.Context, dayUnix int64, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error { dbConn := c.Database.DB() for _, v := range agg { if strings.EqualFold(strings.TrimSpace(v.isTemplate), "true") || v.isTemplate == "1" { continue } row := db.VmDailyRollupRow{ Vcenter: v.key.Vcenter, VmId: v.key.VmId, VmUuid: v.key.VmUuid, Name: v.key.Name, CreationTime: v.creation, DeletionTime: v.deletion, SamplesPresent: v.samples, TotalSamples: int64(totalSamples), SumVcpu: float64(v.sumVcpu), SumRam: float64(v.sumRam), SumDisk: v.sumDisk, TinHits: v.tinHits, BronzeHits: v.bronzeHits, SilverHits: v.silverHits, GoldHits: v.goldHits, LastResourcePool: v.resourcePool, LastDatacenter: v.datacenter, LastCluster: v.cluster, LastFolder: v.folder, LastProvisionedDisk: v.lastDisk, LastVcpuCount: v.lastVcpu, LastRamGB: v.lastRam, IsTemplate: v.isTemplate, PoweredOn: v.poweredOn, SrmPlaceholder: v.srmPlaceholder, } if err := db.UpsertVmDailyRollup(ctx, dbConn, dayUnix, row); err != nil { return err } } return nil }