// Package tasks contains the scheduled (cron-style) maintenance jobs,
// including the daily aggregation of hourly inventory snapshots into
// per-day summary tables.
package tasks

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"runtime"
	"slices"
	"strings"
	"sync"
	"time"

	"vctp/db"
	"vctp/internal/metrics"
	"vctp/internal/report"
	"vctp/internal/settings"

	"github.com/jmoiron/sqlx"
)

// RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table.
// It is the scheduled entry point: it reloads YML settings, always aggregates
// yesterday's window, and always forces regeneration.
func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) (err error) {
	jobTimeout := durationFromSeconds(c.Settings.Values.Settings.DailyJobTimeoutSeconds, 15*time.Minute)
	return c.runAggregateJob(ctx, "daily_aggregate", jobTimeout, func(jobCtx context.Context) error {
		// Pick up any on-disk settings changes before the run.
		if err := c.Settings.ReadYMLSettings(); err != nil {
			return err
		}
		jobCtx = settings.MarkReloadedInContext(jobCtx, c.Settings)
		startedAt := time.Now()
		defer func() {
			logger.Info("Daily summary job finished", "duration", time.Since(startedAt))
		}()
		// Aggregate the previous day to avoid partial "today" data when the job runs just after midnight.
		targetTime := time.Now().AddDate(0, 0, -1)
		logger.Info("Daily summary job starting", "target_date", targetTime.Format("2006-01-02"))
		// Always force regeneration on the scheduled run to refresh data even if a manual run happened earlier.
		return c.aggregateDailySummaryWithMode(jobCtx, targetTime, true, true)
	})
}

// AggregateDailySummary runs the daily aggregation for the given date on
// demand (manual, non-scheduled path). force controls whether an existing
// summary table for that date is cleared and regenerated.
func (c *CronTask) AggregateDailySummary(ctx context.Context, date time.Time, force bool) error {
	return c.aggregateDailySummaryWithMode(ctx, date, force, false)
}

// aggregateDailySummaryWithMode is the shared implementation behind the
// scheduled and manual entry points. It builds the [dayStart, dayEnd) window
// for targetTime's calendar day, ensures the summary/registry tables exist,
// then dispatches to one of three engines:
//
//  1. canonical SQL path   - scheduled runs with scheduled_aggregation_engine=sql
//  2. Go path (default)    - scheduled runs, or manual runs unless DAILY_AGG_SQL=1
//  3. legacy SQL/union path - manual fallback/backfill, or fallback when Go fails
//
// The legacy path in the second half of this function reads the hourly
// snapshot tables directly via a UNION query.
func (c *CronTask) aggregateDailySummaryWithMode(ctx context.Context, targetTime time.Time, force bool, scheduled bool) error {
	jobStart := time.Now()
	// Day window in targetTime's own location: [midnight, next midnight).
	dayStart := time.Date(targetTime.Year(), targetTime.Month(), targetTime.Day(), 0, 0, 0, 0, targetTime.Location())
	dayEnd := dayStart.AddDate(0, 0, 1)
	c.Logger.Info("Daily aggregation window", "start", dayStart, "end", dayEnd)
	summaryTable, err := dailySummaryTableName(targetTime)
	if err != nil {
		return err
	}
	dbConn := c.Database.DB()
	db.SetPostgresWorkMem(ctx, dbConn, c.Settings.Values.Settings.PostgresWorkMemMB)
	if err := db.EnsureSummaryTable(ctx, dbConn, summaryTable); err != nil {
		return err
	}
	if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
		return err
	}
	// Idempotency guard: skip when data already exists unless forced, in
	// which case the existing rows are cleared first.
	if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil {
		return err
	} else if rowsExist && !force {
		c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable)
		return nil
	} else if rowsExist && force {
		if err := clearTable(ctx, dbConn, summaryTable); err != nil {
			return err
		}
	}
	// Engine 1: canonical SQL path for scheduled runs, when configured.
	// On failure it falls through to the Go path rather than aborting.
	if scheduled && c.scheduledAggregationEngine() == "sql" {
		c.Logger.Info("scheduled_aggregation_engine=sql enabled; using canonical SQL daily aggregation path")
		if err := c.aggregateDailySummarySQLCanonical(ctx, dayStart, dayEnd, summaryTable); err != nil {
			c.Logger.Warn("scheduled canonical SQL daily aggregation failed; falling back to go path", "error", err)
		} else {
			metrics.RecordDailyAggregation(time.Since(jobStart), nil)
			c.Logger.Debug("Finished daily inventory aggregation (SQL canonical path)", "summary_table", summaryTable)
			return nil
		}
	}
	// Canonical Go aggregation is the default for both scheduled and manual runs.
	// Legacy SQL/union aggregation stays available as a manual fallback/backfill path.
	forceGoAgg := os.Getenv("DAILY_AGG_GO") == "1"
	forceSQLAgg := !scheduled && os.Getenv("DAILY_AGG_SQL") == "1"
	useGoAgg := scheduled || forceGoAgg || !forceSQLAgg
	if forceSQLAgg && !forceGoAgg {
		c.Logger.Info("DAILY_AGG_SQL=1 enabled; using SQL fallback path for manual daily aggregation")
	}
	// Engine 2: Go path. Scheduled runs fail hard here; manual runs fall
	// through to the legacy SQL path below on error.
	if useGoAgg {
		c.Logger.Debug("Using go implementation of aggregation")
		if err := c.aggregateDailySummaryGo(ctx, dayStart, dayEnd, summaryTable, force, scheduled); err != nil {
			if scheduled {
				return err
			}
			c.Logger.Warn("go-based daily aggregation failed, falling back to SQL path", "error", err)
		} else {
			metrics.RecordDailyAggregation(time.Since(jobStart), nil)
			c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable)
			return nil
		}
	}
	// Engine 3: legacy SQL/union path over the per-hour snapshot tables.
	hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
	if err != nil {
		return err
	}
	hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd)
	hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
	c.Logger.Info("Daily aggregation hourly snapshot count", "count", len(hourlySnapshots), "date", dayStart.Format("2006-01-02"))
	if len(hourlySnapshots) == 0 {
		return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
	}
	hourlyTables := make([]string, 0, len(hourlySnapshots))
	for _, snapshot := range hourlySnapshots {
		hourlyTables = append(hourlyTables, snapshot.TableName)
	}
	unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
	if err != nil {
		return err
	}
	// Day totals are informational only; failure to compute them does not
	// abort the aggregation. NOTE(review): on error currentTotals stays at
	// its zero value, so the comparison deltas logged below would be
	// computed against zeros — confirm that is acceptable.
	currentTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery)
	if err != nil {
		c.Logger.Warn("unable to calculate daily totals", "error", err, "date", dayStart.Format("2006-01-02"))
	} else {
		c.Logger.Info("Daily snapshot totals",
			"date", dayStart.Format("2006-01-02"),
			"vm_count", currentTotals.VmCount,
			"vcpu_total", currentTotals.VcpuTotal,
			"ram_total_gb", currentTotals.RamTotal,
			"disk_total_gb", currentTotals.DiskTotal,
		)
	}
	// Best-effort day-over-day comparison against the previous day's hourly
	// snapshots; all failures here are logged and ignored.
	prevStart := dayStart.AddDate(0, 0, -1)
	prevEnd := dayStart
	prevSnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", prevStart, prevEnd)
	if err == nil && len(prevSnapshots) > 0 {
		prevSnapshots = filterRecordsInRange(prevSnapshots, prevStart, prevEnd)
		prevSnapshots = filterSnapshotsWithRows(ctx, dbConn, prevSnapshots)
		prevTables := make([]string, 0, len(prevSnapshots))
		for _, snapshot := range prevSnapshots {
			prevTables = append(prevTables, snapshot.TableName)
		}
		prevUnion, err := buildUnionQuery(prevTables, summaryUnionColumns, templateExclusionFilter())
		if err == nil {
			prevTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, prevUnion)
			if err != nil {
				c.Logger.Warn("unable to calculate previous day totals", "error", err, "date", prevStart.Format("2006-01-02"))
			} else {
				c.Logger.Info("Daily snapshot comparison",
					"current_date", dayStart.Format("2006-01-02"),
					"previous_date", prevStart.Format("2006-01-02"),
					"vm_delta", currentTotals.VmCount-prevTotals.VmCount,
					"vcpu_delta", currentTotals.VcpuTotal-prevTotals.VcpuTotal,
					"ram_delta_gb", currentTotals.RamTotal-prevTotals.RamTotal,
					"disk_delta_gb", currentTotals.DiskTotal-prevTotals.DiskTotal,
				)
			}
		} else {
			c.Logger.Warn("unable to build previous day union", "error", err)
		}
	}
	// Materialize the union into the daily summary table.
	insertQuery, err := db.BuildDailySummaryInsert(summaryTable, unionQuery)
	if err != nil {
		return err
	}
	if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
		c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayStart.Format("2006-01-02"))
		return err
	}
	// Lifecycle enrichment: deletion/creation timestamps from the lifecycle
	// cache, refinement from the raw union, and presence recomputation.
	// Each step is best-effort (warn and continue).
	if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, summaryTable, dayStart.Unix(), dayEnd.Unix()); err != nil {
		c.Logger.Warn("failed to apply lifecycle deletions to daily summary", "error", err, "table", summaryTable)
	} else {
		c.Logger.Info("Daily aggregation deletion times", "source_lifecycle_cache", applied)
	}
	if applied, err := db.ApplyLifecycleCreationToSummary(ctx, dbConn, summaryTable); err != nil {
		c.Logger.Warn("failed to apply lifecycle creations to daily summary", "error", err, "table", summaryTable)
	} else {
		c.Logger.Info("Daily aggregation creation times", "source_lifecycle_cache", applied)
	}
	if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
		c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
	}
	if err := db.UpdateSummaryPresenceByWindow(ctx, dbConn, summaryTable, dayStart.Unix(), dayEnd.Unix()); err != nil {
		c.Logger.Warn("failed to update daily AvgIsPresent from lifecycle window", "error", err, "table", summaryTable)
	}
	// Post-processing: ANALYZE, row count, registry, totals cache, report,
	// checkpoint. Only report generation failure is fatal.
	analyzeStart := time.Now()
	c.Logger.Debug("Analyzing daily summary table", "table", summaryTable)
	db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
	c.Logger.Debug("Analyzed daily summary table", "table", summaryTable, "duration", time.Since(analyzeStart))
	rowCountStart := time.Now()
	c.Logger.Debug("Counting daily summary rows", "table", summaryTable)
	rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
	if err != nil {
		c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable)
	}
	c.Logger.Debug("Counted daily summary rows", "table", summaryTable, "rows", rowCount, "duration", time.Since(rowCountStart))
	logMissingCreationSummary(ctx, c.Logger, c.Database, summaryTable, rowCount)
	registerStart := time.Now()
	c.Logger.Debug("Registering daily snapshot", "table", summaryTable, "date", dayStart.Format("2006-01-02"), "rows", rowCount)
	if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
		c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
	} else {
		c.Logger.Debug("Registered daily snapshot", "table", summaryTable, "duration", time.Since(registerStart))
	}
	if refreshed, err := db.ReplaceVcenterAggregateTotalsFromSummary(ctx, dbConn, summaryTable, "daily", dayStart.Unix()); err != nil {
		c.Logger.Warn("failed to refresh vcenter daily aggregate totals cache", "error", err, "table", summaryTable)
	} else {
		c.Logger.Debug("refreshed vcenter daily aggregate totals cache", "table", summaryTable, "rows", refreshed)
	}
	reportStart := time.Now()
	c.Logger.Debug("Generating daily report", "table", summaryTable)
	if err := c.generateReportWithPolicy(ctx, summaryTable); err != nil {
		c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
		metrics.RecordDailyAggregation(time.Since(jobStart), err)
		return err
	}
	c.Logger.Debug("Generated daily report", "table", summaryTable, "duration", time.Since(reportStart))
	checkpointStart := time.Now()
	driver := strings.ToLower(dbConn.DriverName())
	c.Logger.Debug("Running database checkpoint after daily aggregation", "table", summaryTable, "driver", driver)
	action, err := db.CheckpointDatabase(ctx, dbConn)
	if err != nil {
		c.Logger.Warn("failed to run database checkpoint after daily aggregation", "driver", driver, "action", action, "error", err)
	} else {
		c.Logger.Debug("Completed database checkpoint after daily aggregation", "table", summaryTable, "driver", driver, "action", action, "duration", time.Since(checkpointStart))
	}
	c.Logger.Debug("Finished daily inventory aggregation", "summary_table", summaryTable)
	metrics.RecordDailyAggregation(time.Since(jobStart), nil)
	return nil
}

// dailySummaryTableName returns the validated summary table name for t's
// calendar day, e.g. "inventory_daily_summary_20240131".
func dailySummaryTableName(t time.Time) (string, error) {
	return db.SafeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102")))
}

// aggregateDailySummarySQLCanonical aggregates the day directly from the
// canonical vm_hourly_stats cache table using a single SQL insert, then runs
// the same lifecycle enrichment, registration, report, and checkpoint steps
// as the other paths. Fatal errors: missing vm_hourly_stats table, insert
// failure, zero rows aggregated, report generation failure; everything else
// is warn-and-continue.
func (c *CronTask) aggregateDailySummarySQLCanonical(ctx context.Context, dayStart, dayEnd time.Time, summaryTable string) error {
	jobStart := time.Now()
	dbConn := c.Database.DB()
	if !db.TableExists(ctx, dbConn, "vm_hourly_stats") {
		return fmt.Errorf("vm_hourly_stats table not found for canonical SQL daily aggregation")
	}
	unionQuery := buildCanonicalHourlySummaryUnion(dayStart, dayEnd)
	insertQuery, err := db.BuildDailySummaryInsert(summaryTable, unionQuery)
	if err != nil {
		return err
	}
	if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
		return err
	}
	if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, summaryTable, dayStart.Unix(), dayEnd.Unix()); err != nil {
		c.Logger.Warn("failed to apply lifecycle deletions to daily summary (SQL canonical)", "error", err, "table", summaryTable)
	} else {
		c.Logger.Info("Daily aggregation deletion times", "source_lifecycle_cache", applied)
	}
	if applied, err := db.ApplyLifecycleCreationToSummary(ctx, dbConn, summaryTable); err != nil {
		c.Logger.Warn("failed to apply lifecycle creations to daily summary (SQL canonical)", "error", err, "table", summaryTable)
	} else {
		c.Logger.Info("Daily aggregation creation times", "source_lifecycle_cache", applied)
	}
	// Refinement uses the hourly-cache lifecycle union, not the insert union.
	if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, buildHourlyCacheLifecycleUnion(dayStart, dayEnd)); err != nil {
		c.Logger.Warn("failed to refine creation/deletion times (SQL canonical)", "error", err, "table", summaryTable)
	}
	if err := db.UpdateSummaryPresenceByWindow(ctx, dbConn, summaryTable, dayStart.Unix(), dayEnd.Unix()); err != nil {
		c.Logger.Warn("failed to update daily AvgIsPresent from lifecycle window (SQL canonical)", "error", err, "table", summaryTable)
	}
	db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
	rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
	if err != nil {
		c.Logger.Warn("unable to count daily summary rows (SQL canonical)", "error", err, "table", summaryTable)
	}
	// An empty result is treated as failure so the caller can fall back to
	// the Go path.
	if rowCount == 0 {
		return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02"))
	}
	logMissingCreationSummary(ctx, c.Logger, c.Database, summaryTable, rowCount)
	if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
		c.Logger.Warn("failed to register daily snapshot (SQL canonical)", "error", err, "table", summaryTable)
	}
	if refreshed, err := db.ReplaceVcenterAggregateTotalsFromSummary(ctx, dbConn, summaryTable, "daily", dayStart.Unix()); err != nil {
		c.Logger.Warn("failed to refresh vcenter daily aggregate totals cache (SQL canonical)", "error", err, "table", summaryTable)
	} else {
		c.Logger.Debug("refreshed vcenter daily aggregate totals cache", "table", summaryTable, "rows", refreshed)
	}
	if err := c.generateReportWithPolicy(ctx, summaryTable); err != nil {
		c.Logger.Warn("failed to generate daily report (SQL canonical)", "error", err, "table", summaryTable)
		return err
	}
	driver := strings.ToLower(dbConn.DriverName())
	action, checkpointErr := db.CheckpointDatabase(ctx, dbConn)
	if checkpointErr != nil {
		c.Logger.Warn("failed to run database checkpoint after daily aggregation (SQL canonical)", "driver", driver, "action", action, "error", checkpointErr)
	}
	c.Logger.Debug("Finished daily inventory aggregation (SQL canonical path)", "summary_table", summaryTable, "duration", time.Since(jobStart))
	return nil
}

// buildCanonicalHourlySummaryUnion returns a SELECT over vm_hourly_stats for
// SnapshotTime in [start, end), shaped to match the summary-union column list
// (missing columns are filled with NULL/COALESCE defaults) and filtered by
// templateExclusionFilter(). The timestamps and the filter are interpolated
// with fmt.Sprintf; start/end are trusted internal values and the filter is a
// code-controlled SQL fragment, so no user input reaches this query.
func buildCanonicalHourlySummaryUnion(start, end time.Time) string {
	return fmt.Sprintf(`
SELECT NULL AS "InventoryId",
COALESCE("Name",'') AS "Name",
COALESCE("Vcenter",'') AS "Vcenter",
COALESCE("VmId",'') AS "VmId",
NULL AS "EventKey",
NULL AS "CloudId",
COALESCE("CreationTime",0) AS "CreationTime",
COALESCE("DeletionTime",0) AS "DeletionTime",
COALESCE("ResourcePool",'') AS "ResourcePool",
COALESCE("Datacenter",'') AS "Datacenter",
COALESCE("Cluster",'') AS "Cluster",
COALESCE("Folder",'') AS "Folder",
COALESCE("ProvisionedDisk",0) AS "ProvisionedDisk",
COALESCE("VcpuCount",0) AS "VcpuCount",
COALESCE("RamGB",0) AS "RamGB",
COALESCE("IsTemplate",'') AS "IsTemplate",
COALESCE("PoweredOn",'') AS "PoweredOn",
COALESCE("SrmPlaceholder",'') AS "SrmPlaceholder",
COALESCE("VmUuid",'') AS "VmUuid",
"SnapshotTime"
FROM vm_hourly_stats
WHERE "SnapshotTime" >= %d AND "SnapshotTime" < %d AND %s
`, start.Unix(), end.Unix(), templateExclusionFilter())
}
// aggregateDailySummaryGo performs daily aggregation by reading hourly tables in parallel,
// reducing in Go, and writing the summary table. It mirrors the outputs of the SQL path
// as closely as possible while improving CPU utilization on multi-core hosts.
//
// When canonicalOnly is true, only the vm_hourly_stats cache is consulted and
// its absence is fatal; otherwise the per-hour snapshot tables are the source,
// with the cache used opportunistically when it has data. After reducing, the
// function enriches creation/deletion times from the lifecycle cache and the
// inventory table, infers deletions from per-vCenter snapshot gaps, inserts
// the aggregates, persists the rollup cache, and runs the shared
// refine/presence/analyze/register/report/checkpoint tail.
func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd time.Time, summaryTable string, force bool, canonicalOnly bool) error {
	jobStart := time.Now()
	dbConn := c.Database.DB()
	hourlyTables := make([]string, 0, 64)
	unionQuery := ""
	// Clear existing summary if forcing.
	if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil {
		return err
	} else if rowsExist && !force {
		c.Logger.Debug("Daily summary already exists, skipping aggregation (Go path)", "summary_table", summaryTable)
		return nil
	} else if rowsExist && force {
		if err := clearTable(ctx, dbConn, summaryTable); err != nil {
			return err
		}
	}
	totalSamples := 0
	var (
		// aggMap holds one reduced entry per VM identity for the day.
		aggMap map[dailyAggKey]*dailyAggVal
		// snapTimes collects the ordered snapshot times.
		// NOTE(review): snapTimes is assigned in every branch but never read
		// after the per-vCenter timelines below — looks like a removal
		// candidate; confirm nothing else in this file depends on it.
		snapTimes []int64
	)
	if canonicalOnly {
		// Canonical mode: the hourly cache is the only allowed source.
		if !db.TableExists(ctx, dbConn, "vm_hourly_stats") {
			return fmt.Errorf("vm_hourly_stats table not found for canonical daily aggregation")
		}
		cacheAgg, cacheTimes, cacheErr := c.scanHourlyCache(ctx, dayStart, dayEnd)
		if cacheErr != nil {
			return cacheErr
		}
		if len(cacheAgg) == 0 {
			return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02"))
		}
		c.Logger.Debug("using canonical hourly cache for daily aggregation", "date", dayStart.Format("2006-01-02"), "snapshots", len(cacheTimes), "vm_count", len(cacheAgg))
		aggMap = cacheAgg
		snapTimes = cacheTimes
		totalSamples = len(cacheTimes)
		unionQuery = buildHourlyCacheLifecycleUnion(dayStart, dayEnd)
	} else {
		// Snapshot-table mode: enumerate the day's hourly tables first.
		hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
		if err != nil {
			return err
		}
		hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd)
		hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
		c.Logger.Info("Daily aggregation hourly snapshot count (go path)", "count", len(hourlySnapshots), "date", dayStart.Format("2006-01-02"))
		if len(hourlySnapshots) == 0 {
			return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
		}
		for _, snapshot := range hourlySnapshots {
			hourlyTables = append(hourlyTables, snapshot.TableName)
		}
		unionQuery, err = buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
		if err != nil {
			return err
		}
		totalSamples = len(hourlyTables)
		// Prefer the hourly cache when it exists and has data for the day;
		// cache failures are non-fatal and fall back to table scans.
		if db.TableExists(ctx, dbConn, "vm_hourly_stats") {
			cacheAgg, cacheTimes, cacheErr := c.scanHourlyCache(ctx, dayStart, dayEnd)
			if cacheErr != nil {
				c.Logger.Warn("failed to use hourly cache, falling back to table scans", "error", cacheErr)
			} else if len(cacheAgg) > 0 {
				c.Logger.Debug("using hourly cache for daily aggregation", "date", dayStart.Format("2006-01-02"), "snapshots", len(cacheTimes), "vm_count", len(cacheAgg))
				aggMap = cacheAgg
				snapTimes = cacheTimes
				totalSamples = len(cacheTimes)
			}
		}
		if aggMap == nil {
			// Cache unavailable or empty: scan the hourly tables in parallel.
			var errScan error
			aggMap, errScan = c.scanHourlyTablesParallel(ctx, hourlySnapshots)
			if errScan != nil {
				return errScan
			}
			c.Logger.Debug("scanned hourly tables for daily aggregation", "date", dayStart.Format("2006-01-02"), "tables", len(hourlySnapshots), "vm_count", len(aggMap))
			if len(aggMap) == 0 {
				return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02"))
			}
			// Build ordered list of snapshot times for deletion inference.
			snapTimes = make([]int64, 0, len(hourlySnapshots))
			for _, snap := range hourlySnapshots {
				snapTimes = append(snapTimes, snap.SnapshotTime.Unix())
			}
			slices.Sort(snapTimes)
		}
	}
	// Lifecycle enrichment, in priority order: lifecycle cache first, then
	// the inventory table (each only fills values still unset).
	lifecycleDeletions := c.applyLifecycleDeletions(ctx, aggMap, dayStart, dayEnd)
	c.Logger.Info("Daily aggregation deletion times", "source_lifecycle_cache", lifecycleDeletions)
	inventoryDeletions := c.applyInventoryDeletions(ctx, aggMap, dayStart, dayEnd)
	c.Logger.Info("Daily aggregation deletion times", "source_inventory", inventoryDeletions)
	lifecycleCreations := c.applyLifecycleCreations(ctx, aggMap)
	c.Logger.Info("Daily aggregation creation times", "source_lifecycle_cache", lifecycleCreations)
	inventoryCreations := c.applyInventoryCreations(ctx, aggMap)
	c.Logger.Info("Daily aggregation creation times", "source_inventory", inventoryCreations)
	// Get the first hourly snapshot on/after dayEnd to help confirm deletions that happen on the last snapshot of the day.
	var (
		nextSnapshotTable string
		nextSnapshotTime  int64
	)
	nextPresenceByVcenter := make(map[string]map[string]struct{}, 8)
	if canonicalOnly {
		presence, snapshotTime, err := loadNextHourlyCachePresence(ctx, dbConn, dayEnd)
		if err != nil {
			c.Logger.Warn("failed to load next-hourly presence from canonical cache", "error", err)
		} else {
			nextPresenceByVcenter = presence
			nextSnapshotTime = snapshotTime
		}
	} else {
		// Look up the first registered hourly snapshot table after the day.
		nextSnapshotQuery := dbConn.Rebind(`
SELECT table_name FROM snapshot_registry
WHERE snapshot_type = 'hourly' AND snapshot_time >= ?
ORDER BY snapshot_time ASC LIMIT 1
`)
		nextSnapshotRows, nextErr := c.Database.DB().QueryxContext(ctx, nextSnapshotQuery, dayEnd.Unix())
		if nextErr == nil {
			if nextSnapshotRows.Next() {
				if scanErr := nextSnapshotRows.Scan(&nextSnapshotTable); scanErr != nil {
					nextSnapshotTable = ""
				}
			}
			nextSnapshotRows.Close()
		}
	}
	// Build per-vCenter snapshot timelines from observed VM samples so deletion
	// inference is only based on times where that vCenter actually reported data.
	vcenterTimeSet := make(map[string]map[int64]struct{}, 8)
	for _, v := range aggMap {
		if v.key.Vcenter == "" {
			continue
		}
		set := vcenterTimeSet[v.key.Vcenter]
		if set == nil {
			set = make(map[int64]struct{}, len(v.seen))
			vcenterTimeSet[v.key.Vcenter] = set
		}
		for t := range v.seen {
			if t > 0 {
				set[t] = struct{}{}
			}
		}
	}
	vcenterSnapTimes := make(map[string][]int64, len(vcenterTimeSet))
	for vcenter, set := range vcenterTimeSet {
		times := make([]int64, 0, len(set))
		for t := range set {
			times = append(times, t)
		}
		slices.Sort(times)
		vcenterSnapTimes[vcenter] = times
	}
	// Load the next day's first snapshot into presence keys ("id:", "uuid:",
	// "name:") so cross-day deletions can be confirmed below. Row errors are
	// skipped silently — this data is advisory only.
	if nextSnapshotTable != "" && db.TableExists(ctx, dbConn, nextSnapshotTable) {
		rows, err := querySnapshotRows(ctx, dbConn, nextSnapshotTable, []string{"Vcenter", "VmId", "VmUuid", "Name"}, "")
		if err == nil {
			for rows.Next() {
				var vcenter string
				var vmId, vmUuid, name sql.NullString
				if err := rows.Scan(&vcenter, &vmId, &vmUuid, &name); err == nil {
					if strings.TrimSpace(vcenter) == "" {
						continue
					}
					vcPresence := nextPresenceByVcenter[vcenter]
					if vcPresence == nil {
						vcPresence = make(map[string]struct{}, 1024)
						nextPresenceByVcenter[vcenter] = vcPresence
					}
					if vmId.Valid && strings.TrimSpace(vmId.String) != "" {
						vcPresence["id:"+strings.TrimSpace(vmId.String)] = struct{}{}
					}
					if vmUuid.Valid && strings.TrimSpace(vmUuid.String) != "" {
						vcPresence["uuid:"+strings.TrimSpace(vmUuid.String)] = struct{}{}
					}
					if name.Valid && strings.TrimSpace(name.String) != "" {
						vcPresence["name:"+strings.ToLower(strings.TrimSpace(name.String))] = struct{}{}
					}
				}
			}
			rows.Close()
		}
	}
	// Gap-based deletion inference for VMs still lacking a deletion time.
	inferredDeletions := 0
	for _, v := range aggMap {
		if v.deletion != 0 {
			continue
		}
		vcSnapTimes := vcenterSnapTimes[v.key.Vcenter]
		// Deletion inference needs meaningful per-vCenter continuity.
		if len(vcSnapTimes) < 3 {
			continue
		}
		vcMaxSnap := vcSnapTimes[len(vcSnapTimes)-1]
		// Infer deletion only after seeing at least two consecutive absent snapshots after lastSeen.
		if vcMaxSnap > 0 && len(v.seen) > 0 && v.lastSeen < vcMaxSnap {
			c.Logger.Debug("inferring deletion window", "vcenter", v.key.Vcenter, "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "last_seen", v.lastSeen, "snapshots", len(vcSnapTimes))
		}
		consecutiveMisses := 0
		firstMiss := int64(0)
		for _, t := range vcSnapTimes {
			if t <= v.lastSeen {
				continue
			}
			if _, ok := v.seen[t]; ok {
				// VM reappeared: reset the miss streak.
				consecutiveMisses = 0
				firstMiss = 0
				continue
			}
			consecutiveMisses++
			if firstMiss == 0 {
				firstMiss = t
			}
			if consecutiveMisses >= 2 {
				// Two misses in a row: treat the first miss as deletion time.
				v.deletion = firstMiss
				inferredDeletions++
				break
			}
		}
		if v.deletion == 0 && firstMiss > 0 {
			// Not enough consecutive misses within the day; try to use the first snapshot of the next day to confirm.
			nextPresence := nextPresenceByVcenter[v.key.Vcenter]
			if nextSnapshotTable != "" && len(nextPresence) > 0 {
				_, presentByID := nextPresence["id:"+strings.TrimSpace(v.key.VmId)]
				_, presentByUUID := nextPresence["uuid:"+strings.TrimSpace(v.key.VmUuid)]
				_, presentByName := nextPresence["name:"+strings.ToLower(strings.TrimSpace(v.key.Name))]
				if !presentByID && !presentByUUID && !presentByName {
					v.deletion = firstMiss
					inferredDeletions++
					c.Logger.Debug("cross-day deletion inferred from next snapshot", "vcenter", v.key.Vcenter, "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "deletion", firstMiss, "next_table", nextSnapshotTable, "next_snapshot_time", nextSnapshotTime)
				}
			}
			if v.deletion == 0 {
				c.Logger.Debug("pending deletion inference (insufficient consecutive misses)", "vcenter", v.key.Vcenter, "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "last_seen", v.lastSeen, "first_missing_snapshot", firstMiss)
			}
		}
	}
	c.Logger.Info("Daily aggregation deletion times", "source_inferred", inferredDeletions)
	totalSamplesByVcenter := sampleCountsByVcenter(aggMap)
	// Insert aggregated rows.
	if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples, totalSamplesByVcenter); err != nil {
		return err
	}
	c.Logger.Debug("inserted daily aggregates", "table", summaryTable, "rows", len(aggMap), "total_samples", totalSamples)
	// Persist rollup cache for monthly aggregation.
	if err := c.persistDailyRollup(ctx, dayStart.Unix(), aggMap, totalSamples, totalSamplesByVcenter); err != nil {
		c.Logger.Warn("failed to persist daily rollup cache", "error", err, "date", dayStart.Format("2006-01-02"))
	} else {
		c.Logger.Debug("persisted daily rollup cache", "date", dayStart.Format("2006-01-02"))
	}
	// Refine lifecycle with existing SQL helper to pick up first-after deletions.
	refineStart := time.Now()
	c.Logger.Debug("Refining creation/deletion times", "table", summaryTable)
	refineCtx, cancelRefine := context.WithTimeout(ctx, 2*time.Minute)
	defer cancelRefine()
	if err := db.RefineCreationDeletionFromUnion(refineCtx, dbConn, summaryTable, unionQuery); err != nil {
		if errors.Is(err, context.DeadlineExceeded) || errors.Is(refineCtx.Err(), context.DeadlineExceeded) {
			c.Logger.Warn("timed out refining creation/deletion times; continuing", "table", summaryTable, "timeout", "2m")
		} else {
			c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
		}
	} else {
		c.Logger.Debug("Refined creation/deletion times", "table", summaryTable, "duration", time.Since(refineStart))
	}
	// Presence recomputation is likewise bounded to 2 minutes and non-fatal.
	presenceStart := time.Now()
	c.Logger.Debug("Updating daily AvgIsPresent from lifecycle window", "table", summaryTable)
	presenceCtx, cancelPresence := context.WithTimeout(ctx, 2*time.Minute)
	defer cancelPresence()
	if err := db.UpdateSummaryPresenceByWindow(presenceCtx, dbConn, summaryTable, dayStart.Unix(), dayEnd.Unix()); err != nil {
		if errors.Is(err, context.DeadlineExceeded) || errors.Is(presenceCtx.Err(), context.DeadlineExceeded) {
			c.Logger.Warn("timed out updating daily AvgIsPresent from lifecycle window; continuing", "table", summaryTable, "timeout", "2m")
		} else {
			c.Logger.Warn("failed to update daily AvgIsPresent from lifecycle window (Go path)", "error", err, "table", summaryTable)
		}
	} else {
		c.Logger.Debug("Updated daily AvgIsPresent from lifecycle window", "table", summaryTable, "duration", time.Since(presenceStart))
	}
	// Shared post-processing tail (same steps as the SQL paths).
	analyzeStart := time.Now()
	c.Logger.Debug("Analyzing daily summary table", "table", summaryTable)
	db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
	c.Logger.Debug("Analyzed daily summary table", "table", summaryTable, "duration", time.Since(analyzeStart))
	rowCountStart := time.Now()
	c.Logger.Debug("Counting daily summary rows", "table", summaryTable)
	rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
	if err != nil {
		c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable)
	}
	c.Logger.Debug("Counted daily summary rows", "table", summaryTable, "rows", rowCount, "duration", time.Since(rowCountStart))
	logMissingCreationSummary(ctx, c.Logger, c.Database, summaryTable, rowCount)
	registerStart := time.Now()
	c.Logger.Debug("Registering daily snapshot", "table", summaryTable, "date", dayStart.Format("2006-01-02"), "rows", rowCount)
	if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
		c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
	} else {
		c.Logger.Debug("Registered daily snapshot", "table", summaryTable, "duration", time.Since(registerStart))
	}
	if refreshed, err := db.ReplaceVcenterAggregateTotalsFromSummary(ctx, dbConn, summaryTable, "daily", dayStart.Unix()); err != nil {
		c.Logger.Warn("failed to refresh vcenter daily aggregate totals cache", "error", err, "table", summaryTable)
	} else {
		c.Logger.Debug("refreshed vcenter daily aggregate totals cache", "table", summaryTable, "rows", refreshed)
	}
	reportStart := time.Now()
	c.Logger.Debug("Generating daily report", "table", summaryTable)
	if err := c.generateReportWithPolicy(ctx, summaryTable); err != nil {
		c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
		return err
	}
	c.Logger.Debug("Generated daily report", "table", summaryTable, "duration", time.Since(reportStart))
	checkpointStart := time.Now()
	driver := strings.ToLower(dbConn.DriverName())
	c.Logger.Debug("Running database checkpoint after daily aggregation", "table", summaryTable, "driver", driver)
	action, err := db.CheckpointDatabase(ctx, dbConn)
	if err != nil {
		c.Logger.Warn("failed to run database checkpoint after daily aggregation (Go path)", "driver", driver, "action", action, "error", err)
	} else {
		c.Logger.Debug("Completed database checkpoint after daily aggregation", "table", summaryTable, "driver", driver, "action", action, "duration", time.Since(checkpointStart))
	}
	c.Logger.Debug("Finished daily inventory aggregation (Go path)",
		"summary_table", summaryTable,
		"duration", time.Since(jobStart),
		"tables_scanned", len(hourlyTables),
		"rows_written", rowCount,
		"total_samples", totalSamples,
	)
	return nil
}

// applyLifecycleDeletions copies DeletedAt timestamps from vm_lifecycle_cache
// onto matching aggregate entries for deletions that fall inside [start, end).
// Matching is per vCenter, by VmId, then VmUuid, then lowercased Name. An
// earlier DeletedAt overwrites a later one already on the entry. Returns the
// number of entries updated; missing cache table yields 0.
func (c *CronTask) applyLifecycleDeletions(ctx context.Context, agg map[dailyAggKey]*dailyAggVal, start, end time.Time) int {
	dbConn := c.Database.DB()
	if !db.TableExists(ctx, dbConn, "vm_lifecycle_cache") {
		return 0
	}
	// Per-vCenter lookup indexes over the aggregate map.
	type aggIndex struct {
		byID   map[string]*dailyAggVal
		byUUID map[string]*dailyAggVal
		byName map[string]*dailyAggVal
	}
	indexes := make(map[string]*aggIndex, 8)
	for k, v := range agg {
		if k.Vcenter == "" {
			continue
		}
		idx := indexes[k.Vcenter]
		if idx == nil {
			idx = &aggIndex{
				byID:   make(map[string]*dailyAggVal),
				byUUID: make(map[string]*dailyAggVal),
				byName: make(map[string]*dailyAggVal),
			}
			indexes[k.Vcenter] = idx
		}
		if k.VmId != "" {
			idx.byID[k.VmId] = v
		}
		if k.VmUuid != "" {
			idx.byUUID[k.VmUuid] = v
		}
		if name := strings.ToLower(strings.TrimSpace(k.Name)); name != "" {
			idx.byName[name] = v
		}
	}
	totalApplied := 0
	for vcenter, idx := range indexes {
		// Only deletions within the aggregation window are considered.
		query := `
SELECT "VmId","VmUuid","Name","DeletedAt" FROM vm_lifecycle_cache
WHERE "Vcenter" = ?
AND "DeletedAt" IS NOT NULL AND "DeletedAt" > 0
AND "DeletedAt" >= ? AND "DeletedAt" < ?
`
		bind := dbConn.Rebind(query)
		rows, err := dbConn.QueryxContext(ctx, bind, vcenter, start.Unix(), end.Unix())
		if err != nil {
			c.Logger.Warn("failed to load lifecycle deletions", "vcenter", vcenter, "error", err)
			continue
		}
		scanned := 0
		applied := 0
		missed := 0
		for rows.Next() {
			scanned++
			var vmId, vmUuid, name sql.NullString
			var deletedAt sql.NullInt64
			if err := rows.Scan(&vmId, &vmUuid, &name, &deletedAt); err != nil {
				c.Logger.Warn("failed to scan lifecycle deletion", "vcenter", vcenter, "error", err)
				continue
			}
			if !deletedAt.Valid || deletedAt.Int64 <= 0 {
				continue
			}
			// Match priority: VmId, then VmUuid, then name.
			var target *dailyAggVal
			if vmId.Valid {
				target = idx.byID[strings.TrimSpace(vmId.String)]
			}
			if target == nil && vmUuid.Valid {
				target = idx.byUUID[strings.TrimSpace(vmUuid.String)]
			}
			if target == nil && name.Valid {
				target = idx.byName[strings.ToLower(strings.TrimSpace(name.String))]
			}
			if target == nil {
				missed++
				continue
			}
			// Keep the earliest known deletion time.
			if target.deletion == 0 || deletedAt.Int64 < target.deletion {
				target.deletion = deletedAt.Int64
				applied++
			}
		}
		rows.Close()
		if err := rows.Err(); err != nil {
			c.Logger.Warn("failed to read lifecycle deletions", "vcenter", vcenter, "error", err)
		}
		c.Logger.Debug("lifecycle cache deletions applied", "vcenter", vcenter, "window_start", start, "window_end", end, "scanned", scanned, "applied", applied, "missed", missed)
		totalApplied += applied
	}
	return totalApplied
}
// applyLifecycleCreations copies FirstSeen timestamps from vm_lifecycle_cache
// onto aggregate entries that still have no creation time. Matching is per
// vCenter, by VmId, then VmUuid, then lowercased Name; unlike the deletion
// variant there is no time-window filter on FirstSeen. Returns the number of
// entries updated; missing cache table yields 0.
func (c *CronTask) applyLifecycleCreations(ctx context.Context, agg map[dailyAggKey]*dailyAggVal) int {
	dbConn := c.Database.DB()
	if !db.TableExists(ctx, dbConn, "vm_lifecycle_cache") {
		return 0
	}
	// Per-vCenter lookup indexes, built only from entries whose creation
	// time is still unset.
	type aggIndex struct {
		byID   map[string]*dailyAggVal
		byUUID map[string]*dailyAggVal
		byName map[string]*dailyAggVal
	}
	indexes := make(map[string]*aggIndex, 8)
	for k, v := range agg {
		if k.Vcenter == "" || v.creation > 0 {
			continue
		}
		idx := indexes[k.Vcenter]
		if idx == nil {
			idx = &aggIndex{
				byID:   make(map[string]*dailyAggVal),
				byUUID: make(map[string]*dailyAggVal),
				byName: make(map[string]*dailyAggVal),
			}
			indexes[k.Vcenter] = idx
		}
		if k.VmId != "" {
			idx.byID[k.VmId] = v
		}
		if k.VmUuid != "" {
			idx.byUUID[k.VmUuid] = v
		}
		if name := strings.ToLower(strings.TrimSpace(k.Name)); name != "" {
			idx.byName[name] = v
		}
	}
	totalApplied := 0
	for vcenter, idx := range indexes {
		query := `
SELECT "VmId","VmUuid","Name","FirstSeen" FROM vm_lifecycle_cache
WHERE "Vcenter" = ?
AND "FirstSeen" IS NOT NULL AND "FirstSeen" > 0
`
		bind := dbConn.Rebind(query)
		rows, err := dbConn.QueryxContext(ctx, bind, vcenter)
		if err != nil {
			c.Logger.Warn("failed to load lifecycle creations", "vcenter", vcenter, "error", err)
			continue
		}
		scanned := 0
		applied := 0
		missed := 0
		for rows.Next() {
			scanned++
			var vmId, vmUuid, name sql.NullString
			var firstSeen sql.NullInt64
			if err := rows.Scan(&vmId, &vmUuid, &name, &firstSeen); err != nil {
				c.Logger.Warn("failed to scan lifecycle creation", "vcenter", vcenter, "error", err)
				continue
			}
			if !firstSeen.Valid || firstSeen.Int64 <= 0 {
				continue
			}
			// Match priority: VmId, then VmUuid, then name.
			var target *dailyAggVal
			if vmId.Valid {
				target = idx.byID[strings.TrimSpace(vmId.String)]
			}
			if target == nil && vmUuid.Valid {
				target = idx.byUUID[strings.TrimSpace(vmUuid.String)]
			}
			if target == nil && name.Valid {
				target = idx.byName[strings.ToLower(strings.TrimSpace(name.String))]
			}
			if target == nil {
				missed++
				continue
			}
			// Only fill entries that are still unset (first writer wins).
			if target.creation == 0 {
				target.creation = firstSeen.Int64
				applied++
			}
		}
		rows.Close()
		if err := rows.Err(); err != nil {
			c.Logger.Warn("failed to read lifecycle creations", "vcenter", vcenter, "error", err)
		}
		c.Logger.Debug("lifecycle cache creations applied", "vcenter", vcenter, "scanned", scanned, "applied", applied, "missed", missed)
		totalApplied += applied
	}
	return totalApplied
}

// applyInventoryDeletions fills unset deletion times on aggregate entries
// from the inventory table, restricted to deletions inside [start, end).
// Matching per vCenter by VmId, then VmUuid, then lowercased Name.
// NOTE(review): the lookups use k.VmId/k.VmUuid without TrimSpace while the
// index keys are trimmed — presumably aggregate keys are already trimmed
// upstream; confirm.
func (c *CronTask) applyInventoryDeletions(ctx context.Context, agg map[dailyAggKey]*dailyAggVal, start, end time.Time) int {
	// Collect the distinct vCenters present in the aggregate map.
	vcenters := make(map[string]struct{}, 8)
	for k := range agg {
		if k.Vcenter != "" {
			vcenters[k.Vcenter] = struct{}{}
		}
	}
	totalApplied := 0
	for vcenter := range vcenters {
		inventoryRows, err := c.Database.Queries().GetInventoryByVcenter(ctx, vcenter)
		if err != nil {
			c.Logger.Warn("failed to load inventory for daily deletion times", "vcenter", vcenter, "error", err)
			continue
		}
		// Index in-window deletion times by the three identifier kinds.
		byID := make(map[string]int64, len(inventoryRows))
		byUUID := make(map[string]int64, len(inventoryRows))
		byName := make(map[string]int64, len(inventoryRows))
		for _, inv := range inventoryRows {
			if !inv.DeletionTime.Valid || inv.DeletionTime.Int64 <= 0 {
				continue
			}
			if inv.DeletionTime.Int64 < start.Unix() || inv.DeletionTime.Int64 >= end.Unix() {
				continue
			}
			if inv.VmId.Valid && strings.TrimSpace(inv.VmId.String) != "" {
				byID[strings.TrimSpace(inv.VmId.String)] = inv.DeletionTime.Int64
			}
			if inv.VmUuid.Valid && strings.TrimSpace(inv.VmUuid.String) != "" {
				byUUID[strings.TrimSpace(inv.VmUuid.String)] = inv.DeletionTime.Int64
			}
			if strings.TrimSpace(inv.Name) != "" {
				byName[strings.ToLower(strings.TrimSpace(inv.Name))] = inv.DeletionTime.Int64
			}
		}
		// Apply to entries of this vCenter that still lack a deletion time.
		for k, v := range agg {
			if k.Vcenter != vcenter {
				continue
			}
			if ts, ok := byID[k.VmId]; ok {
				if v.deletion == 0 {
					v.deletion = ts
					totalApplied++
				}
				continue
			}
			if ts, ok := byUUID[k.VmUuid]; ok {
				if v.deletion == 0 {
					v.deletion = ts
					totalApplied++
				}
				continue
			}
			if ts, ok := byName[strings.ToLower(k.Name)]; ok {
				if v.deletion == 0 {
					v.deletion = ts
					totalApplied++
				}
			}
		}
	}
	return totalApplied
}

// applyInventoryCreations fills unset creation times on aggregate entries
// from the inventory table (definition continues beyond this chunk).
func (c *CronTask) applyInventoryCreations(ctx context.Context, agg map[dailyAggKey]*dailyAggVal) int {
	vcenters := make(map[string]struct{}, 8)
	for k := range agg {
		if k.Vcenter != "" {
			vcenters[k.Vcenter] = struct{}{}
		}
	}
	totalApplied := 0
	for vcenter := range vcenters {
		inventoryRows, err := c.Database.Queries().GetInventoryByVcenter(ctx, vcenter)
		if err != nil {
			c.Logger.Warn("failed to load inventory for daily creation times", "vcenter", vcenter, "error", err)
			continue
		}
		byID := make(map[string]int64, len(inventoryRows))
		byUUID := make(map[string]int64, len(inventoryRows))
		byName :=
make(map[string]int64, len(inventoryRows)) for _, inv := range inventoryRows { if !inv.CreationTime.Valid || inv.CreationTime.Int64 <= 0 { continue } if inv.VmId.Valid && strings.TrimSpace(inv.VmId.String) != "" { byID[strings.TrimSpace(inv.VmId.String)] = inv.CreationTime.Int64 } if inv.VmUuid.Valid && strings.TrimSpace(inv.VmUuid.String) != "" { byUUID[strings.TrimSpace(inv.VmUuid.String)] = inv.CreationTime.Int64 } if strings.TrimSpace(inv.Name) != "" { byName[strings.ToLower(strings.TrimSpace(inv.Name))] = inv.CreationTime.Int64 } } for k, v := range agg { if k.Vcenter != vcenter || v.creation > 0 { continue } if ts, ok := byID[k.VmId]; ok { v.creation = ts totalApplied++ continue } if ts, ok := byUUID[k.VmUuid]; ok { v.creation = ts totalApplied++ continue } if ts, ok := byName[strings.ToLower(k.Name)]; ok { v.creation = ts totalApplied++ } } } return totalApplied } func (c *CronTask) scanHourlyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) { agg := make(map[dailyAggKey]*dailyAggVal, 1024) mu := sync.Mutex{} workers := min(max(runtime.NumCPU(), 2), len(snapshots)) jobs := make(chan report.SnapshotRecord, len(snapshots)) wg := sync.WaitGroup{} for i := 0; i < workers; i++ { wg.Go(func() { for snap := range jobs { rows, err := c.scanHourlyTable(ctx, snap) if err != nil { c.Logger.Warn("failed to scan hourly table", "table", snap.TableName, "error", err) continue } mu.Lock() for k, v := range rows { if existing, ok := agg[k]; ok { mergeDailyAgg(existing, v) } else { agg[k] = v } } mu.Unlock() } }) } for _, snap := range snapshots { jobs <- snap } close(jobs) wg.Wait() return agg, nil } func mergeDailyAgg(dst, src *dailyAggVal) { if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) { dst.creation = src.creation } if dst.firstSeen == 0 || (src.firstSeen > 0 && src.firstSeen < dst.firstSeen) { dst.firstSeen = src.firstSeen } if src.lastSeen > dst.lastSeen { dst.lastSeen = src.lastSeen 
dst.resourcePool = src.resourcePool dst.datacenter = src.datacenter dst.cluster = src.cluster dst.folder = src.folder dst.isTemplate = src.isTemplate dst.poweredOn = src.poweredOn dst.srmPlaceholder = src.srmPlaceholder dst.lastDisk = src.lastDisk dst.lastVcpu = src.lastVcpu dst.lastRam = src.lastRam } dst.sumVcpu += src.sumVcpu dst.sumRam += src.sumRam dst.sumDisk += src.sumDisk dst.samples += src.samples dst.tinHits += src.tinHits dst.bronzeHits += src.bronzeHits dst.silverHits += src.silverHits dst.goldHits += src.goldHits if dst.seen == nil { dst.seen = make(map[int64]struct{}, len(src.seen)) } for t := range src.seen { dst.seen[t] = struct{}{} } } func (c *CronTask) scanHourlyTable(ctx context.Context, snap report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) { dbConn := c.Database.DB() query := fmt.Sprintf(` SELECT "Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder", COALESCE("ProvisionedDisk",0) AS disk, COALESCE("VcpuCount",0) AS vcpu, COALESCE("RamGB",0) AS ram, COALESCE("CreationTime",0) AS creation, COALESCE("DeletionTime",0) AS deletion, COALESCE("IsTemplate",'FALSE') AS is_template, COALESCE("PoweredOn",'FALSE') AS powered_on, COALESCE("SrmPlaceholder",'FALSE') AS srm_placeholder, COALESCE("SnapshotTime",0) AS snapshot_time FROM %s `, snap.TableName) rows, err := dbConn.QueryxContext(ctx, query) if err != nil { return nil, err } defer rows.Close() out := make(map[dailyAggKey]*dailyAggVal, 256) for rows.Next() { var ( name, vcenter, vmId, vmUuid, resourcePool string dc, cluster, folder sql.NullString disk sql.NullFloat64 vcpu, ram sql.NullInt64 creation, deletion, snapshotTime sql.NullInt64 isTemplate, poweredOn, srmPlaceholder sql.NullString ) if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil { continue } if vcenter == "" { continue } // Skip templates. 
if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" { continue } key := dailyAggKey{ Vcenter: vcenter, VmId: strings.TrimSpace(vmId), VmUuid: strings.TrimSpace(vmUuid), Name: strings.TrimSpace(name), } if key.VmId == "" && key.VmUuid == "" && key.Name == "" { continue } if key.VmId == "" { key.VmId = key.VmUuid } pool := strings.ToLower(strings.TrimSpace(resourcePool)) hitTin := btoi(pool == "tin") hitBronze := btoi(pool == "bronze") hitSilver := btoi(pool == "silver") hitGold := btoi(pool == "gold") row := &dailyAggVal{ key: key, resourcePool: resourcePool, datacenter: dc.String, cluster: cluster.String, folder: folder.String, isTemplate: isTemplate.String, poweredOn: poweredOn.String, srmPlaceholder: srmPlaceholder.String, creation: int64OrZero(creation), firstSeen: int64OrZero(snapshotTime), lastSeen: int64OrZero(snapshotTime), lastDisk: disk.Float64, lastVcpu: vcpu.Int64, lastRam: ram.Int64, sumVcpu: vcpu.Int64, sumRam: ram.Int64, sumDisk: disk.Float64, samples: 1, tinHits: hitTin, bronzeHits: hitBronze, silverHits: hitSilver, goldHits: hitGold, seen: map[int64]struct{}{int64OrZero(snapshotTime): {}}, } if deletion.Valid && deletion.Int64 > 0 { row.deletion = deletion.Int64 } out[key] = row } return out, nil } // scanHourlyCache aggregates directly from vm_hourly_stats when available. func (c *CronTask) scanHourlyCache(ctx context.Context, start, end time.Time) (map[dailyAggKey]*dailyAggVal, []int64, error) { dbConn := c.Database.DB() query := ` SELECT "Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder", COALESCE("ProvisionedDisk",0) AS disk, COALESCE("VcpuCount",0) AS vcpu, COALESCE("RamGB",0) AS ram, COALESCE("CreationTime",0) AS creation, COALESCE("DeletionTime",0) AS deletion, COALESCE("IsTemplate",'') AS is_template, COALESCE("PoweredOn",'') AS powered_on, COALESCE("SrmPlaceholder",'') AS srm_placeholder, "SnapshotTime" FROM vm_hourly_stats WHERE "SnapshotTime" >= ? 
AND "SnapshotTime" < ?` q := dbConn.Rebind(query) rows, err := dbConn.QueryxContext(ctx, q, start.Unix(), end.Unix()) if err != nil { return nil, nil, err } defer rows.Close() agg := make(map[dailyAggKey]*dailyAggVal, 512) timeSet := make(map[int64]struct{}, 64) for rows.Next() { var ( name, vcenter, vmId, vmUuid, resourcePool string dc, cluster, folder sql.NullString disk sql.NullFloat64 vcpu, ram sql.NullInt64 creation, deletion, snapshotTime sql.NullInt64 isTemplate, poweredOn, srmPlaceholder sql.NullString ) if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil { continue } if vcenter == "" { continue } if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" { continue } key := dailyAggKey{ Vcenter: vcenter, VmId: strings.TrimSpace(vmId), VmUuid: strings.TrimSpace(vmUuid), Name: strings.TrimSpace(name), } if key.VmId == "" && key.VmUuid == "" && key.Name == "" { continue } if key.VmId == "" { key.VmId = key.VmUuid } pool := strings.ToLower(strings.TrimSpace(resourcePool)) hitTin := btoi(pool == "tin") hitBronze := btoi(pool == "bronze") hitSilver := btoi(pool == "silver") hitGold := btoi(pool == "gold") snapTs := int64OrZero(snapshotTime) timeSet[snapTs] = struct{}{} row := &dailyAggVal{ key: key, resourcePool: resourcePool, datacenter: dc.String, cluster: cluster.String, folder: folder.String, isTemplate: isTemplate.String, poweredOn: poweredOn.String, srmPlaceholder: srmPlaceholder.String, creation: int64OrZero(creation), firstSeen: snapTs, lastSeen: snapTs, lastDisk: disk.Float64, lastVcpu: vcpu.Int64, lastRam: ram.Int64, sumVcpu: vcpu.Int64, sumRam: ram.Int64, sumDisk: disk.Float64, samples: 1, tinHits: hitTin, bronzeHits: hitBronze, silverHits: hitSilver, goldHits: hitGold, seen: map[int64]struct{}{snapTs: {}}, } if deletion.Valid && deletion.Int64 > 0 { row.deletion = 
deletion.Int64 } if existing, ok := agg[key]; ok { mergeDailyAgg(existing, row) } else { agg[key] = row } } snapTimes := make([]int64, 0, len(timeSet)) for t := range timeSet { snapTimes = append(snapTimes, t) } slices.Sort(snapTimes) return agg, snapTimes, rows.Err() } func buildHourlyCacheLifecycleUnion(start, end time.Time) string { return fmt.Sprintf(` SELECT "VmId","VmUuid","Name","Vcenter","CreationTime","DeletionTime","SnapshotTime" FROM vm_hourly_stats WHERE "SnapshotTime" >= %d AND "SnapshotTime" < %d `, start.Unix(), end.Unix()) } func loadNextHourlyCachePresence(ctx context.Context, dbConn *sqlx.DB, dayEnd time.Time) (map[string]map[string]struct{}, int64, error) { presence := make(map[string]map[string]struct{}, 8) query := dbConn.Rebind(` WITH next_by_vcenter AS ( SELECT "Vcenter", MIN("SnapshotTime") AS snapshot_time FROM vm_hourly_stats WHERE "SnapshotTime" >= ? GROUP BY "Vcenter" ) SELECT h."Vcenter", h."VmId", h."VmUuid", h."Name", n.snapshot_time FROM next_by_vcenter n JOIN vm_hourly_stats h ON h."Vcenter" = n."Vcenter" AND h."SnapshotTime" = n.snapshot_time `) rows, err := dbConn.QueryxContext(ctx, query, dayEnd.Unix()) if err != nil { return nil, 0, err } defer rows.Close() var minSnapshotTime int64 for rows.Next() { var ( vcenter string vmID, vmUUID sql.NullString name sql.NullString snapshotTime sql.NullInt64 ) if err := rows.Scan(&vcenter, &vmID, &vmUUID, &name, &snapshotTime); err != nil { continue } if strings.TrimSpace(vcenter) == "" { continue } if snapshotTime.Valid && snapshotTime.Int64 > 0 && (minSnapshotTime == 0 || snapshotTime.Int64 < minSnapshotTime) { minSnapshotTime = snapshotTime.Int64 } vcPresence := presence[vcenter] if vcPresence == nil { vcPresence = make(map[string]struct{}, 1024) presence[vcenter] = vcPresence } if vmID.Valid && strings.TrimSpace(vmID.String) != "" { vcPresence["id:"+strings.TrimSpace(vmID.String)] = struct{}{} } if vmUUID.Valid && strings.TrimSpace(vmUUID.String) != "" { 
vcPresence["uuid:"+strings.TrimSpace(vmUUID.String)] = struct{}{} } if name.Valid && strings.TrimSpace(name.String) != "" { vcPresence["name:"+strings.ToLower(strings.TrimSpace(name.String))] = struct{}{} } } if err := rows.Err(); err != nil { return nil, 0, err } return presence, minSnapshotTime, nil } func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg map[dailyAggKey]*dailyAggVal, totalSamples int, totalSamplesByVcenter map[string]int) error { dbConn := c.Database.DB() tx, err := dbConn.Beginx() if err != nil { return err } defer tx.Rollback() driver := strings.ToLower(dbConn.DriverName()) placeholders := makePlaceholders(driver, 30) insert := fmt.Sprintf(` INSERT INTO %s ( "Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder", "ProvisionedDisk","VcpuCount","RamGB","IsTemplate","PoweredOn","SrmPlaceholder", "CreationTime","DeletionTime","SnapshotTime","SamplesPresent","AvgVcpuCount","AvgRamGB","AvgProvisionedDisk", "AvgIsPresent","PoolTinPct","PoolBronzePct","PoolSilverPct","PoolGoldPct","Tin","Bronze","Silver","Gold" ) VALUES (%s) `, table, placeholders) for _, v := range agg { if v.samples == 0 { continue } vcTotal := totalSamplesByVcenter[v.key.Vcenter] if vcTotal <= 0 { vcTotal = totalSamples } total := float64(vcTotal) avgVcpu := 0.0 avgRam := 0.0 avgDisk := 0.0 avgPresent := 0.0 tinPct := 0.0 bronzePct := 0.0 silverPct := 0.0 goldPct := 0.0 if total > 0 { avgPresent = float64(v.samples) / total avgVcpu = float64(v.sumVcpu) / total avgRam = float64(v.sumRam) / total avgDisk = v.sumDisk / total } if v.samples > 0 { tinPct = float64(v.tinHits) * 100 / float64(v.samples) bronzePct = float64(v.bronzeHits) * 100 / float64(v.samples) silverPct = float64(v.silverHits) * 100 / float64(v.samples) goldPct = float64(v.goldHits) * 100 / float64(v.samples) } args := []any{ v.key.Name, v.key.Vcenter, nullIfEmpty(v.key.VmId), nullIfEmpty(v.key.VmUuid), v.resourcePool, nullIfEmpty(v.datacenter), nullIfEmpty(v.cluster), 
nullIfEmpty(v.folder), v.lastDisk, v.lastVcpu, v.lastRam, v.isTemplate, v.poweredOn, v.srmPlaceholder, v.creation, v.deletion, v.lastSeen, v.samples, avgVcpu, avgRam, avgDisk, avgPresent, tinPct, bronzePct, silverPct, goldPct, tinPct, bronzePct, silverPct, goldPct, } if driver != "sqlite" { // Postgres expects primitive types, nulls are handled by pq via nil. for i, a := range args { if s, ok := a.(string); ok && s == "" { args[i] = nil } } } if _, err := tx.ExecContext(ctx, insert, args...); err != nil { return err } } return tx.Commit() } func int64OrZero(v sql.NullInt64) int64 { if v.Valid { return v.Int64 } return 0 } func nullIfEmpty(s string) any { if strings.TrimSpace(s) == "" { return nil } return s } func makePlaceholders(driver string, n int) string { if driver == "sqlite" { parts := make([]string, n) for i := range n { parts[i] = "?" } return strings.Join(parts, ",") } parts := make([]string, n) for i := range n { parts[i] = fmt.Sprintf("$%d", i+1) } return strings.Join(parts, ",") } func btoi(b bool) int64 { if b { return 1 } return 0 } func logMissingCreationSummary(ctx context.Context, logger *slog.Logger, database db.Database, summaryTable string, totalRows int64) { if logger == nil { logger = slog.Default() } if err := db.ValidateTableName(summaryTable); err != nil { logger.Warn("daily summary creation diagnostics skipped (invalid table)", "table", summaryTable, "error", err) return } if ctx == nil { ctx = context.Background() } diagCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() dbConn := database.DB() var missingTotal int64 countQuery := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE "CreationTime" IS NULL OR "CreationTime" = 0`, summaryTable) if err := dbConn.GetContext(diagCtx, &missingTotal, countQuery); err != nil { logger.Warn("daily summary creation diagnostics failed", "table", summaryTable, "error", err) return } if missingTotal == 0 { logger.Debug("daily summary creation diagnostics", "table", summaryTable, 
"missing_creation", 0) return } missingPct := 0.0 if totalRows > 0 { missingPct = float64(missingTotal) * 100 / float64(totalRows) } logger.Warn("daily summary rows missing CreationTime", "table", summaryTable, "missing_count", missingTotal, "total_rows", totalRows, "missing_pct", missingPct, ) byVcenterQuery := fmt.Sprintf(` SELECT "Vcenter", COUNT(*) FROM %s WHERE "CreationTime" IS NULL OR "CreationTime" = 0 GROUP BY "Vcenter" ORDER BY COUNT(*) DESC `, summaryTable) if rows, err := dbConn.QueryxContext(diagCtx, byVcenterQuery); err != nil { logger.Warn("daily summary creation diagnostics (by vcenter) failed", "table", summaryTable, "error", err) } else { for rows.Next() { var vcenter string var count int64 if err := rows.Scan(&vcenter, &count); err != nil { continue } logger.Warn("daily summary rows missing CreationTime by vcenter", "table", summaryTable, "vcenter", vcenter, "missing_count", count) } rows.Close() if err := rows.Err(); err != nil { logger.Warn("daily summary creation diagnostics (by vcenter) iteration failed", "table", summaryTable, "error", err) } } const sampleLimit = 10 sampleQuery := fmt.Sprintf(` SELECT "Vcenter","VmId","VmUuid","Name","SamplesPresent","AvgIsPresent","SnapshotTime" FROM %s WHERE "CreationTime" IS NULL OR "CreationTime" = 0 ORDER BY "SamplesPresent" DESC LIMIT %d `, summaryTable, sampleLimit) if rows, err := dbConn.QueryxContext(diagCtx, sampleQuery); err != nil { logger.Warn("daily summary creation diagnostics (sample) failed", "table", summaryTable, "error", err) } else { for rows.Next() { var ( vcenter string vmId, vmUuid sql.NullString name sql.NullString samplesPresent, snapshotTime sql.NullInt64 avgIsPresent sql.NullFloat64 ) if err := rows.Scan(&vcenter, &vmId, &vmUuid, &name, &samplesPresent, &avgIsPresent, &snapshotTime); err != nil { continue } logger.Debug("daily summary missing CreationTime sample", "table", summaryTable, "vcenter", vcenter, "vm_id", vmId.String, "vm_uuid", vmUuid.String, "name", name.String, 
"samples_present", samplesPresent.Int64, "avg_is_present", avgIsPresent.Float64, "snapshot_time", snapshotTime.Int64, ) } rows.Close() if err := rows.Err(); err != nil { logger.Warn("daily summary creation diagnostics (sample) iteration failed", "table", summaryTable, "error", err) } } } // persistDailyRollup stores per-day aggregates into vm_daily_rollup to speed monthly aggregation. func (c *CronTask) persistDailyRollup(ctx context.Context, dayUnix int64, agg map[dailyAggKey]*dailyAggVal, totalSamples int, totalSamplesByVcenter map[string]int) error { dbConn := c.Database.DB() for _, v := range agg { if strings.EqualFold(strings.TrimSpace(v.isTemplate), "true") || v.isTemplate == "1" { continue } vcTotal := totalSamplesByVcenter[v.key.Vcenter] if vcTotal <= 0 { vcTotal = totalSamples } row := db.VmDailyRollupRow{ Vcenter: v.key.Vcenter, VmId: v.key.VmId, VmUuid: v.key.VmUuid, Name: v.key.Name, CreationTime: v.creation, DeletionTime: v.deletion, SamplesPresent: v.samples, TotalSamples: int64(vcTotal), SumVcpu: float64(v.sumVcpu), SumRam: float64(v.sumRam), SumDisk: v.sumDisk, TinHits: v.tinHits, BronzeHits: v.bronzeHits, SilverHits: v.silverHits, GoldHits: v.goldHits, LastResourcePool: v.resourcePool, LastDatacenter: v.datacenter, LastCluster: v.cluster, LastFolder: v.folder, LastProvisionedDisk: v.lastDisk, LastVcpuCount: v.lastVcpu, LastRamGB: v.lastRam, IsTemplate: v.isTemplate, PoweredOn: v.poweredOn, SrmPlaceholder: v.srmPlaceholder, } if err := db.UpsertVmDailyRollup(ctx, dbConn, dayUnix, row); err != nil { return err } } return nil } func sampleCountsByVcenter(agg map[dailyAggKey]*dailyAggVal) map[string]int { vcenterTimes := make(map[string]map[int64]struct{}, 8) for _, v := range agg { if v.key.Vcenter == "" { continue } set := vcenterTimes[v.key.Vcenter] if set == nil { set = make(map[int64]struct{}, len(v.seen)) vcenterTimes[v.key.Vcenter] = set } for t := range v.seen { set[t] = struct{}{} } } counts := make(map[string]int, len(vcenterTimes)) for vc, 
set := range vcenterTimes { counts[vc] = len(set) } return counts }