package tasks

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log/slog"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
	"vctp/db"
	"vctp/db/queries"
	"vctp/internal/metrics"
	"vctp/internal/report"
	"vctp/internal/utils"
	"vctp/internal/vcenter"

	"github.com/jmoiron/sqlx"
	"github.com/vmware/govmomi/vim25/mo"
	"github.com/vmware/govmomi/vim25/types"
)

// InventorySnapshotRow is one VM's record as written into an hourly snapshot
// table. Nullable columns use the database/sql Null* wrappers.
type InventorySnapshotRow struct {
	InventoryId     sql.NullInt64
	Name            string
	Vcenter         string
	VmId            sql.NullString
	EventKey        sql.NullString
	CloudId         sql.NullString
	CreationTime    sql.NullInt64
	DeletionTime    sql.NullInt64
	ResourcePool    sql.NullString
	Datacenter      sql.NullString
	Cluster         sql.NullString
	Folder          sql.NullString
	ProvisionedDisk sql.NullFloat64
	VcpuCount       sql.NullInt64
	RamGB           sql.NullInt64
	// IsTemplate, PoweredOn and SrmPlaceholder are stored as strings; the
	// builders in this file set them to "TRUE" or "FALSE".
	IsTemplate     string
	PoweredOn      string
	SrmPlaceholder string
	VmUuid         sql.NullString
	// SnapshotTime is the snapshot's start time as Unix seconds.
	SnapshotTime int64
}

// snapshotTotals aliases db.SnapshotTotals so this package can use the short name.
type snapshotTotals = db.SnapshotTotals

// RunVcenterSnapshotHourly records hourly inventory snapshots into a daily table.
//
// The job runs under a timeout taken from HourlyJobTimeoutSeconds (default 20
// minutes) and is registered with the cron tracker under "hourly_snapshot";
// when the tracker reports a previous run as still active the job logs a
// warning and returns nil. Each configured vCenter is captured in its own
// goroutine, optionally throttled by a semaphore. If any vCenter fails, an
// error naming the number of failures is returned and the snapshot is not
// registered. Otherwise the table is registered, metrics are recorded and a
// report is generated, with failures in those last steps only logged.
//
// err is a named result so that the deferred done(err) call sees the value the
// function actually returns.
func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Logger) (err error) {
	jobCtx := ctx
	jobTimeout := durationFromSeconds(c.Settings.Values.Settings.HourlyJobTimeoutSeconds, 20*time.Minute)
	if jobTimeout > 0 {
		var cancel context.CancelFunc
		jobCtx, cancel = context.WithTimeout(ctx, jobTimeout)
		defer cancel()
	}
	snapshotFreq := durationFromSeconds(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds, time.Hour)
	tracker := NewCronTracker(c.Database)
	// Clear stale marker for this job only (short timeout to avoid blocking).
	// The context is derived from Background, not from the job context.
	staleCtx, cancelStale := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancelStale()
	if err := tracker.ClearStale(staleCtx, "hourly_snapshot", jobTimeout); err != nil {
		logger.Warn("failed to clear stale cron status", "error", err)
	}

	startedAt := time.Now()
	defer func() {
		// gocron logs the next run on its side, but log here for quick visibility.
		logger.Info("Hourly snapshot job finished", "duration", time.Since(startedAt), "next_run_estimated", time.Now().Add(snapshotFreq))
	}()

	// done, skip are new; err here assigns the named result (same scope).
	done, skip, err := tracker.Start(jobCtx, "hourly_snapshot")
	if err != nil {
		return err
	}
	if skip {
		logger.Warn("Hourly snapshot skipped because a previous run is still active")
		return nil
	}
	// Deferred after the "finished" log above, so it runs before it (LIFO) and
	// receives the final value of the named result.
	defer func() { done(err) }()

	// Rebind ctx to a cancellable child of jobCtx: everything below inherits
	// the job timeout and is cancelled when this function returns.
	ctx, cancel := context.WithCancel(jobCtx)
	defer cancel()
	startTime := time.Now()

	if err := db.CheckMigrationState(ctx, c.Database.DB()); err != nil {
		return err
	}
	if err := db.EnsureSnapshotRunTable(ctx, c.Database.DB()); err != nil {
		return err
	}
	// Best-effort cleanup of legacy IsPresent columns to simplify inserts.
	c.dropLegacyIsPresentColumns(jobCtx)

	// reload settings in case vcenter list has changed
	c.Settings.ReadYMLSettings()

	// One-time guard: skip the run when the latest registered hourly snapshot
	// is younger than half the configured interval (interval defaults to
	// 3600s). The flag is cleared whether or not the run is skipped.
	if c.FirstHourlySnapshotCheck {
		if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
			return err
		}
		// This err is local to the if-block; it is returned explicitly below.
		lastSnapshot, err := report.LatestSnapshotTime(ctx, c.Database, "hourly")
		if err != nil {
			return err
		}
		minIntervalSeconds := intWithDefault(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds, 3600) / 2
		if !lastSnapshot.IsZero() && startTime.Sub(lastSnapshot) < time.Duration(minIntervalSeconds)*time.Second {
			c.Logger.Info("Skipping hourly snapshot, last snapshot too recent",
				"last_snapshot", lastSnapshot,
				"min_interval_seconds", minIntervalSeconds,
			)
			c.FirstHourlySnapshotCheck = false
			return nil
		}
		c.FirstHourlySnapshotCheck = false
	}

	// The table name embeds startTime as Unix seconds (see hourlyInventoryTableName).
	tableName, err := hourlyInventoryTableName(startTime)
	if err != nil {
		return err
	}

	dbConn := c.Database.DB()
	db.ApplySQLiteTuning(ctx, dbConn)
	if err := ensureDailyInventoryTable(ctx, dbConn, tableName); err != nil {
		return err
	}

	var wg sync.WaitGroup
	// errCount is incremented atomically by the workers and read only after wg.Wait.
	var errCount int64
	concurrencyLimit := c.Settings.Values.Settings.HourlySnapshotConcurrency
	// The environment variable overrides the setting when it parses to a
	// non-negative value.
	if override, ok := utils.EnvInt("VCTP_HOURLY_SNAPSHOT_CONCURRENCY"); ok && override >= 0 {
		concurrencyLimit = override
	}
	// A nil sem (limit <= 0) means no throttling: all vCenters run at once.
	var sem chan struct{}
	if concurrencyLimit > 0 {
		sem = make(chan struct{}, concurrencyLimit)
	}
	c.Logger.Info("Starting hourly snapshots", "vcenter_count", len(c.Settings.Values.Settings.VcenterAddresses), "concurrency_limit", concurrencyLimit)
	for _, url := range c.Settings.Values.Settings.VcenterAddresses {
		wg.Add(1)
		go func(url string) {
			defer wg.Done()
			waitStarted := time.Now()
			// NOTE(review): vcStart is taken before the semaphore is acquired,
			// so the logged "duration" includes the queue wait that is also
			// reported separately as "queue_wait".
			vcStart := time.Now()
			if sem != nil {
				sem <- struct{}{}
				defer func() { <-sem }()
			}
			waitDuration := time.Since(waitStarted)

			// The per-vCenter timeout starts after the queue wait and is
			// additionally bounded by the job context it derives from.
			timeout := durationFromSeconds(c.Settings.Values.Settings.HourlySnapshotTimeoutSeconds, 10*time.Minute)
			runCtx, cancel := context.WithTimeout(ctx, timeout)
			defer cancel()

			c.Logger.Info("Starting hourly snapshot for vcenter", "url", url)
			if err := c.captureHourlySnapshotForVcenter(runCtx, startTime, tableName, url); err != nil {
				atomic.AddInt64(&errCount, 1)
				c.Logger.Error("hourly snapshot failed", "error", err, "url", url)
			} else {
				c.Logger.Info("Finished hourly snapshot for vcenter",
					"url", url,
					"queue_wait", waitDuration,
					"duration", time.Since(vcStart),
					"timeout", timeout,
				)
			}
		}(url)
	}
	wg.Wait()
	if errCount > 0 {
		// Assign the named result so the deferred done(err) reports the failure.
		err = fmt.Errorf("hourly snapshot failed for %d vcenter(s)", errCount)
		return err
	}

	// rowCount is new; err assigns the named result. A count failure is only
	// logged, and this err is the value handed to metrics below because the
	// errors of the following if-statements are scoped to those statements.
	rowCount, err := db.TableRowCount(ctx, dbConn, tableName)
	if err != nil {
		c.Logger.Warn("unable to count hourly snapshot rows", "error", err, "table", tableName)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "hourly", tableName, startTime, rowCount); err != nil {
		c.Logger.Warn("failed to register hourly snapshot", "error", err, "table", tableName)
	}

	metrics.RecordHourlySnapshot(startTime, rowCount, err)
	if err := c.generateReport(ctx, tableName); err != nil {
		c.Logger.Warn("failed to generate hourly report", "error", err, "table", tableName)
	}

	c.Logger.Debug("Finished hourly vcenter snapshot", "vcenter_count", len(c.Settings.Values.Settings.VcenterAddresses), "table", tableName, "row_count", rowCount)
	// Returning nil resets the named result, so done receives nil even when the
	// row count above failed.
	return nil
}

// dropLegacyIsPresentColumns attempts to remove the old IsPresent column from hourly tables.
// This keeps inserts simple and avoids keeping unused data around. func (c *CronTask) dropLegacyIsPresentColumns(ctx context.Context) { dbConn := c.Database.DB() if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil { c.Logger.Debug("skip IsPresent cleanup; registry unavailable", "error", err) return } records, err := report.ListSnapshots(ctx, c.Database, "hourly") if err != nil { c.Logger.Debug("skip IsPresent cleanup; unable to list hourly snapshots", "error", err) return } for _, r := range records { if ok, err := db.ColumnExists(ctx, dbConn, r.TableName, "IsPresent"); err == nil && ok { if _, err := dbConn.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %s DROP COLUMN "IsPresent"`, r.TableName)); err != nil { c.Logger.Debug("unable to drop legacy IsPresent column", "table", r.TableName, "error", err) } else { c.Logger.Info("dropped legacy IsPresent column", "table", r.TableName) } } } } // RunHourlySnapshotRetry retries failed vCenter hourly snapshots up to a maximum attempt count. 
func (c *CronTask) RunHourlySnapshotRetry(ctx context.Context, logger *slog.Logger) (err error) { jobStart := time.Now() defer func() { logger.Info("Hourly snapshot retry finished", "duration", time.Since(jobStart)) }() maxRetries := c.Settings.Values.Settings.HourlySnapshotMaxRetries if maxRetries <= 0 { maxRetries = 3 } dbConn := c.Database.DB() if err := db.EnsureSnapshotRunTable(ctx, dbConn); err != nil { return err } failed, err := db.ListFailedSnapshotRuns(ctx, dbConn, maxRetries) if err != nil { return err } if len(failed) == 0 { logger.Debug("No failed hourly snapshots to retry") return nil } for _, f := range failed { startTime := time.Unix(f.SnapshotTime, 0) tableName, tnErr := hourlyInventoryTableName(startTime) if tnErr != nil { logger.Warn("unable to derive table name for retry", "error", tnErr, "snapshot_time", startTime, "vcenter", f.Vcenter) continue } logger.Info("Retrying hourly snapshot", "vcenter", f.Vcenter, "snapshot_time", startTime, "attempt", f.Attempts+1) if err := c.captureHourlySnapshotForVcenter(ctx, startTime, tableName, f.Vcenter); err != nil { logger.Warn("retry failed", "vcenter", f.Vcenter, "error", err) } } return nil } // RunSnapshotCleanup drops hourly and daily snapshot tables older than retention. 
func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) (err error) { jobCtx := ctx jobTimeout := durationFromSeconds(c.Settings.Values.Settings.CleanupJobTimeoutSeconds, 10*time.Minute) if jobTimeout > 0 { var cancel context.CancelFunc jobCtx, cancel = context.WithTimeout(ctx, jobTimeout) defer cancel() } tracker := NewCronTracker(c.Database) done, skip, err := tracker.Start(jobCtx, "snapshot_cleanup") if err != nil { return err } if skip { logger.Warn("Snapshot cleanup skipped because a previous run is still active") return nil } defer func() { done(err) }() if err := db.CheckMigrationState(jobCtx, c.Database.DB()); err != nil { return err } startedAt := time.Now() defer func() { logger.Info("Snapshot cleanup job finished", "duration", time.Since(startedAt)) }() now := time.Now() hourlyMaxDays := intWithDefault(c.Settings.Values.Settings.HourlySnapshotMaxAgeDays, 60) dailyMaxMonths := intWithDefault(c.Settings.Values.Settings.DailySnapshotMaxAgeMonths, 12) hourlyCutoff := now.AddDate(0, 0, -hourlyMaxDays) dailyCutoff := now.AddDate(0, -dailyMaxMonths, 0) dbConn := c.Database.DB() hourlyTables, err := report.ListTablesByPrefix(ctx, c.Database, "inventory_hourly_") if err != nil { return err } removedHourly := 0 for _, table := range hourlyTables { if strings.HasPrefix(table, "inventory_daily_summary_") { continue } tableDate, ok := parseSnapshotDate(table, "inventory_hourly_", "epoch") if !ok { continue } if tableDate.Before(truncateDate(hourlyCutoff)) { if err := dropSnapshotTable(ctx, dbConn, table); err != nil { c.Logger.Error("failed to drop hourly snapshot table", "error", err, "table", table) } else { removedHourly++ if err := report.DeleteSnapshotRecord(ctx, c.Database, table); err != nil { c.Logger.Warn("failed to remove hourly snapshot registry entry", "error", err, "table", table) } } } } dailyTables, err := report.ListTablesByPrefix(ctx, c.Database, "inventory_daily_summary_") if err != nil { return err } removedDaily := 0 for _, 
table := range dailyTables { tableDate, ok := parseSnapshotDate(table, "inventory_daily_summary_", "20060102") if !ok { continue } if tableDate.Before(truncateDate(dailyCutoff)) { if err := dropSnapshotTable(ctx, dbConn, table); err != nil { c.Logger.Error("failed to drop daily snapshot table", "error", err, "table", table) } else { removedDaily++ if err := report.DeleteSnapshotRecord(ctx, c.Database, table); err != nil { c.Logger.Warn("failed to remove daily snapshot registry entry", "error", err, "table", table) } } } } c.Logger.Info("Finished snapshot cleanup", "removed_hourly_tables", removedHourly, "removed_daily_tables", removedDaily, "hourly_max_age_days", hourlyMaxDays, "daily_max_age_months", dailyMaxMonths, ) return nil } func hourlyInventoryTableName(t time.Time) (string, error) { return db.SafeTableName(fmt.Sprintf("inventory_hourly_%d", t.Unix())) } func ensureDailyInventoryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error { if err := db.EnsureSnapshotTable(ctx, dbConn, tableName); err != nil { return err } if err := ensureSnapshotRowID(ctx, dbConn, tableName); err != nil { return err } return db.EnsureColumns(ctx, dbConn, tableName, []db.ColumnDef{ {Name: "VcpuCount", Type: "BIGINT"}, {Name: "RamGB", Type: "BIGINT"}, }) } func buildUnionQuery(tables []string, columns []string, whereClause string) (string, error) { if len(tables) == 0 { return "", fmt.Errorf("no tables provided for union") } if len(columns) == 0 { return "", fmt.Errorf("no columns provided for union") } queries := make([]string, 0, len(tables)) columnList := strings.Join(columns, ", ") for _, table := range tables { safeName, err := db.SafeTableName(table) if err != nil { return "", err } query := fmt.Sprintf("SELECT %s FROM %s", columnList, safeName) if whereClause != "" { query = fmt.Sprintf("%s WHERE %s", query, whereClause) } queries = append(queries, query) } if len(queries) == 0 { return "", fmt.Errorf("no valid tables provided for union") } return 
strings.Join(queries, "\nUNION ALL\n"), nil } func templateExclusionFilter() string { return `COALESCE(CAST("IsTemplate" AS TEXT), '') NOT IN ('TRUE', 'true', '1')` } func parseSnapshotDate(table string, prefix string, layout string) (time.Time, bool) { if !strings.HasPrefix(table, prefix) { return time.Time{}, false } suffix := strings.TrimPrefix(table, prefix) if layout == "epoch" { epoch, err := strconv.ParseInt(suffix, 10, 64) if err != nil { return time.Time{}, false } return time.Unix(epoch, 0), true } parsed, err := time.Parse(layout, suffix) if err != nil { return time.Time{}, false } return parsed, true } func truncateDate(t time.Time) time.Time { return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) } func filterSnapshotsWithRows(ctx context.Context, dbConn *sqlx.DB, snapshots []report.SnapshotRecord) []report.SnapshotRecord { filtered := snapshots[:0] for _, snapshot := range snapshots { if rowsExist, err := db.TableHasRows(ctx, dbConn, snapshot.TableName); err == nil && rowsExist { filtered = append(filtered, snapshot) } } return filtered } func filterRecordsInRange(records []report.SnapshotRecord, start, end time.Time) []report.SnapshotRecord { filtered := records[:0] for _, r := range records { if !r.SnapshotTime.Before(start) && r.SnapshotTime.Before(end) { filtered = append(filtered, r) } } return filtered } type columnDef struct { Name string Type string } var summaryUnionColumns = []string{ `"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`, `"DeletionTime"`, `"ResourcePool"`, `"Datacenter"`, `"Cluster"`, `"Folder"`, `"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`, `"SrmPlaceholder"`, `"VmUuid"`, `"SnapshotTime"`, } // monthlyUnionColumns are the fields needed from daily summaries for monthly aggregation/refinement. 
var monthlyUnionColumns = []string{ `"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`, `"DeletionTime"`, `"ResourcePool"`, `"Datacenter"`, `"Cluster"`, `"Folder"`, `"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`, `"SrmPlaceholder"`, `"VmUuid"`, `"SamplesPresent"`, `"AvgVcpuCount"`, `"AvgRamGB"`, `"AvgProvisionedDisk"`, `"AvgIsPresent"`, `"PoolTinPct"`, `"PoolBronzePct"`, `"PoolSilverPct"`, `"PoolGoldPct"`, `"Tin"`, `"Bronze"`, `"Silver"`, `"Gold"`, `"SnapshotTime"`, } func ensureSnapshotRowID(ctx context.Context, dbConn *sqlx.DB, tableName string) error { driver := strings.ToLower(dbConn.DriverName()) switch driver { case "pgx", "postgres": hasColumn, err := db.ColumnExists(ctx, dbConn, tableName, "RowId") if err != nil { return err } if !hasColumn { if err := db.AddColumnIfMissing(ctx, dbConn, tableName, db.ColumnDef{Name: "RowId", Type: "BIGSERIAL"}); err != nil { return err } } if err := db.BackfillSerialColumn(ctx, dbConn, tableName, "RowId"); err != nil { return err } case "sqlite": return nil } return nil } func nullInt64ToInt(value sql.NullInt64) int64 { if value.Valid { return value.Int64 } return 0 } func nullFloat64ToFloat(value sql.NullFloat64) float64 { if value.Valid { return value.Float64 } return 0 } func intWithDefault(value int, fallback int) int { if value <= 0 { return fallback } return value } func durationFromSeconds(seconds int, fallback time.Duration) time.Duration { if seconds > 0 { return time.Duration(seconds) * time.Second } return fallback } func normalizeResourcePool(value string) string { trimmed := strings.TrimSpace(value) if trimmed == "" { return "" } lower := strings.ToLower(trimmed) canonical := map[string]string{ "tin": "Tin", "bronze": "Bronze", "silver": "Silver", "gold": "Gold", } if val, ok := canonical[lower]; ok { return val } return trimmed } // backfillLifecycleDeletionsToday looks for VMs in the lifecycle cache that are not in the current inventory, 
// have no DeletedAt, and determines their deletion time from today's hourly snapshots. func backfillLifecycleDeletionsToday(ctx context.Context, logger *slog.Logger, dbConn *sqlx.DB, vcenter string, snapshotTime time.Time, present map[string]InventorySnapshotRow) error { dayStart := truncateDate(snapshotTime) dayEnd := dayStart.Add(24 * time.Hour) // Lifecycle entries missing DeletedAt. queryLifecycle := ` SELECT "VmId","VmUuid","Name","Cluster" FROM vm_lifecycle_cache WHERE "Vcenter" = ? AND ("DeletedAt" IS NULL OR "DeletedAt" = 0) ` rows, err := dbConn.QueryxContext(ctx, queryLifecycle, vcenter) if err != nil { return err } defer rows.Close() type candidate struct { vmID string vmUUID string name string cluster string } var cands []candidate for rows.Next() { var vmID, vmUUID, name, cluster sql.NullString if scanErr := rows.Scan(&vmID, &vmUUID, &name, &cluster); scanErr != nil { continue } if vmID.String == "" { continue } if _, ok := present[vmID.String]; ok { continue // still present, skip } cands = append(cands, candidate{ vmID: vmID.String, vmUUID: vmUUID.String, name: name.String, cluster: cluster.String, }) } if len(cands) == 0 { return nil } // Get today's hourly tables. query := ` SELECT table_name, snapshot_time FROM snapshot_registry WHERE snapshot_type = 'hourly' AND snapshot_time >= ? AND snapshot_time < ? 
ORDER BY snapshot_time ASC ` query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) var tables []struct { Table string `db:"table_name"` Time int64 `db:"snapshot_time"` } rowsTables, err := dbConn.QueryxContext(ctx, query, dayStart.Unix(), dayEnd.Unix()) if err != nil { return err } defer rowsTables.Close() for rowsTables.Next() { var t struct { Table string `db:"table_name"` Time int64 `db:"snapshot_time"` } if err := rowsTables.StructScan(&t); err != nil { continue } tables = append(tables, t) } if len(tables) == 0 { return nil } for _, cand := range cands { var lastSeen int64 var deletion int64 logger.Debug("lifecycle backfill candidate", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "cluster", cand.cluster, "tables", len(tables)) for i, tbl := range tables { if err := db.ValidateTableName(tbl.Table); err != nil { continue } q := fmt.Sprintf(`SELECT "Name","Cluster" FROM %s WHERE "Vcenter" = ? AND "VmId" = ? LIMIT 1`, tbl.Table) q = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), q) var name, cluster sql.NullString err := dbConn.QueryRowxContext(ctx, q, vcenter, cand.vmID).Scan(&name, &cluster) if err == nil { lastSeen = tbl.Time if cand.name == "" && name.Valid { cand.name = name.String } if cand.cluster == "" && cluster.Valid { cand.cluster = cluster.String } continue } // Not found in this table; if previously seen today, mark deletion at this snapshot time. if lastSeen > 0 { deletion = tbl.Time break } // If never seen today and we're at the last table, mark deletion at current snapshot time. 
if i == len(tables)-1 { deletion = tbl.Time } } if deletion > 0 { if err := db.MarkVmDeletedWithDetails(ctx, dbConn, vcenter, cand.vmID, cand.vmUUID, cand.name, cand.cluster, deletion); err != nil { logger.Warn("lifecycle backfill mark deleted failed", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "cluster", cand.cluster, "deletion", deletion, "error", err) continue } logger.Debug("lifecycle backfill applied", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "cluster", cand.cluster, "deletion", deletion) } } return nil } func (c *CronTask) reportsDir() string { if c.Settings != nil && c.Settings.Values != nil { if dir := strings.TrimSpace(c.Settings.Values.Settings.ReportsDir); dir != "" { return dir } } return "/var/lib/vctp/reports" } func (c *CronTask) generateReport(ctx context.Context, tableName string) error { dest := c.reportsDir() _, err := report.SaveTableReport(c.Logger, c.Database, ctx, tableName, dest) return err } func snapshotFromVM(vmObject *mo.VirtualMachine, vc *vcenter.Vcenter, snapshotTime time.Time, inv *queries.Inventory, hostLookup map[string]vcenter.HostLookup, folderLookup vcenter.FolderLookup, rpLookup map[string]string) (InventorySnapshotRow, error) { if vmObject == nil { return InventorySnapshotRow{}, fmt.Errorf("missing VM object") } row := InventorySnapshotRow{ Name: vmObject.Name, Vcenter: vc.Vurl, VmId: sql.NullString{String: vmObject.Reference().Value, Valid: vmObject.Reference().Value != ""}, SnapshotTime: snapshotTime.Unix(), } if inv != nil { row.InventoryId = sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0} row.EventKey = inv.EventKey row.CloudId = inv.CloudId row.DeletionTime = inv.DeletionTime } if vmObject.Config != nil { row.VmUuid = sql.NullString{String: vmObject.Config.Uuid, Valid: vmObject.Config.Uuid != ""} if !vmObject.Config.CreateDate.IsZero() { row.CreationTime = sql.NullInt64{Int64: vmObject.Config.CreateDate.Unix(), Valid: true} } row.VcpuCount 
= sql.NullInt64{Int64: int64(vmObject.Config.Hardware.NumCPU), Valid: vmObject.Config.Hardware.NumCPU > 0} row.RamGB = sql.NullInt64{Int64: int64(vmObject.Config.Hardware.MemoryMB) / 1024, Valid: vmObject.Config.Hardware.MemoryMB > 0} totalDiskBytes := int64(0) for _, device := range vmObject.Config.Hardware.Device { if disk, ok := device.(*types.VirtualDisk); ok { totalDiskBytes += disk.CapacityInBytes } } if totalDiskBytes > 0 { row.ProvisionedDisk = sql.NullFloat64{Float64: float64(totalDiskBytes / 1024 / 1024 / 1024), Valid: true} } if vmObject.Config.ManagedBy != nil && vmObject.Config.ManagedBy.ExtensionKey == "com.vmware.vcDr" && vmObject.Config.ManagedBy.Type == "placeholderVm" { row.SrmPlaceholder = "TRUE" } else { row.SrmPlaceholder = "FALSE" } if vmObject.Config.Template { row.IsTemplate = "TRUE" } else { row.IsTemplate = "FALSE" } } if vmObject.Runtime.PowerState == "poweredOff" { row.PoweredOn = "FALSE" } else { row.PoweredOn = "TRUE" } if inv != nil { if inv.ResourcePool.Valid { row.ResourcePool = sql.NullString{String: normalizeResourcePool(inv.ResourcePool.String), Valid: true} } row.Datacenter = inv.Datacenter row.Cluster = inv.Cluster row.Folder = inv.Folder if !row.CreationTime.Valid { row.CreationTime = inv.CreationTime } if !row.ProvisionedDisk.Valid { row.ProvisionedDisk = inv.ProvisionedDisk } if !row.VcpuCount.Valid { row.VcpuCount = inv.InitialVcpus } if !row.RamGB.Valid && inv.InitialRam.Valid { row.RamGB = sql.NullInt64{Int64: inv.InitialRam.Int64 / 1024, Valid: inv.InitialRam.Int64 > 0} } if row.IsTemplate == "" { row.IsTemplate = boolStringFromInterface(inv.IsTemplate) } if row.PoweredOn == "" { row.PoweredOn = boolStringFromInterface(inv.PoweredOn) } if row.SrmPlaceholder == "" { row.SrmPlaceholder = boolStringFromInterface(inv.SrmPlaceholder) } if !row.VmUuid.Valid { row.VmUuid = inv.VmUuid } } if row.ResourcePool.String == "" && vmObject.ResourcePool != nil { if rpLookup != nil { if rpName, ok := 
rpLookup[vmObject.ResourcePool.Value]; ok { row.ResourcePool = sql.NullString{String: normalizeResourcePool(rpName), Valid: rpName != ""} } } } if row.Folder.String == "" { if folderPath, ok := vc.GetVMFolderPathFromLookup(*vmObject, folderLookup); ok { row.Folder = sql.NullString{String: folderPath, Valid: folderPath != ""} } else { // Unable to resolve folder path from lookup; leave empty. } } if vmObject.Runtime.Host != nil && hostLookup != nil { if lookup, ok := hostLookup[vmObject.Runtime.Host.Value]; ok { if row.Cluster.String == "" && lookup.Cluster != "" { row.Cluster = sql.NullString{String: lookup.Cluster, Valid: true} } if row.Datacenter.String == "" && lookup.Datacenter != "" { row.Datacenter = sql.NullString{String: lookup.Datacenter, Valid: true} } } } if row.Cluster.String == "" && vmObject.Runtime.Host != nil { if clusterName, err := vc.GetClusterFromHost(vmObject.Runtime.Host); err == nil { row.Cluster = sql.NullString{String: clusterName, Valid: clusterName != ""} } else if vc.Logger != nil { vc.Logger.Warn("failed to resolve cluster from host", "vm_id", vmObject.Reference().Value, "error", err) } } if row.Datacenter.String == "" { if dcName, err := vc.GetDatacenterForVM(*vmObject); err == nil { row.Datacenter = sql.NullString{String: dcName, Valid: dcName != ""} } } return row, nil } func snapshotFromInventory(inv queries.Inventory, snapshotTime time.Time) InventorySnapshotRow { return InventorySnapshotRow{ InventoryId: sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0}, Name: inv.Name, Vcenter: inv.Vcenter, VmId: inv.VmId, EventKey: inv.EventKey, CloudId: inv.CloudId, CreationTime: inv.CreationTime, DeletionTime: inv.DeletionTime, ResourcePool: sql.NullString{String: normalizeResourcePool(inv.ResourcePool.String), Valid: inv.ResourcePool.Valid}, Datacenter: inv.Datacenter, Cluster: inv.Cluster, Folder: inv.Folder, ProvisionedDisk: inv.ProvisionedDisk, VcpuCount: inv.InitialVcpus, RamGB: sql.NullInt64{Int64: inv.InitialRam.Int64 / 1024, Valid: 
inv.InitialRam.Valid && inv.InitialRam.Int64 > 0}, IsTemplate: boolStringFromInterface(inv.IsTemplate), PoweredOn: boolStringFromInterface(inv.PoweredOn), SrmPlaceholder: boolStringFromInterface(inv.SrmPlaceholder), VmUuid: inv.VmUuid, SnapshotTime: snapshotTime.Unix(), } } func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTime time.Time, tableName string, url string) error { started := time.Now() c.Logger.Debug("connecting to vcenter for hourly snapshot", "url", url) vc := vcenter.New(c.Logger, c.VcCreds) if err := vc.Login(url); err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil { c.Logger.Warn("failed to record snapshot run", "url", url, "error", upErr) } return fmt.Errorf("unable to connect to vcenter: %w", err) } defer func() { logCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := vc.Logout(logCtx); err != nil { c.Logger.Warn("vcenter logout failed", "url", url, "error", err) } else { c.Logger.Debug("vcenter logout succeeded", "url", url) } }() vcVms, err := vc.GetAllVMsWithProps() if err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil { c.Logger.Warn("failed to record snapshot run", "url", url, "error", upErr) } return fmt.Errorf("unable to get VMs from vcenter: %w", err) } c.Logger.Debug("retrieved VMs from vcenter", "url", url, "vm_count", len(vcVms)) if err := db.EnsureVmIdentityTables(ctx, c.Database.DB()); err != nil { c.Logger.Warn("failed to ensure vm identity tables", "error", err) } hostLookup, err := vc.BuildHostLookup() if err != nil { c.Logger.Warn("failed to build host lookup", "url", url, "error", err) hostLookup = nil } else { c.Logger.Debug("built host lookup", "url", url, "hosts", len(hostLookup)) } folderLookup, 
err := vc.BuildFolderPathLookup() if err != nil { c.Logger.Warn("failed to build folder lookup", "url", url, "error", err) folderLookup = nil } else { c.Logger.Debug("built folder lookup", "url", url, "folders", len(folderLookup)) } rpLookup, err := vc.BuildResourcePoolLookup() if err != nil { c.Logger.Warn("failed to build resource pool lookup", "url", url, "error", err) rpLookup = nil } else { c.Logger.Debug("built resource pool lookup", "url", url, "pools", len(rpLookup)) } inventoryRows, err := c.Database.Queries().GetInventoryByVcenter(ctx, url) if err != nil { return fmt.Errorf("unable to query inventory table: %w", err) } inventoryByVmID := make(map[string]queries.Inventory, len(inventoryRows)) inventoryByUuid := make(map[string]queries.Inventory, len(inventoryRows)) inventoryByName := make(map[string]queries.Inventory, len(inventoryRows)) for _, inv := range inventoryRows { if inv.VmId.Valid { inventoryByVmID[inv.VmId.String] = inv } if inv.VmUuid.Valid { inventoryByUuid[inv.VmUuid.String] = inv } if inv.Name != "" { inventoryByName[inv.Name] = inv } } dbConn := c.Database.DB() presentSnapshots := make(map[string]InventorySnapshotRow, len(vcVms)) presentByUuid := make(map[string]struct{}, len(vcVms)) presentByName := make(map[string]struct{}, len(vcVms)) totals := snapshotTotals{} deletionsMarked := false var prevVmCount sql.NullInt64 countQuery := `SELECT "VmCount" FROM vcenter_totals WHERE "Vcenter" = ? 
ORDER BY "SnapshotTime" DESC LIMIT 1` countQuery = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), countQuery) if err := dbConn.QueryRowContext(ctx, countQuery, url).Scan(&prevVmCount); err != nil && !errors.Is(err, sql.ErrNoRows) { c.Logger.Warn("failed to read previous vcenter totals", "vcenter", url, "error", err) } type deletionCandidate struct { vmID string vmUUID string name string cluster string datacenter sql.NullString } candidates := make([]deletionCandidate, 0) for _, vm := range vcVms { if strings.HasPrefix(vm.Name, "vCLS-") { continue } if vm.Config != nil && vm.Config.Template { continue } var inv *queries.Inventory if existing, ok := inventoryByVmID[vm.Reference().Value]; ok { existingCopy := existing inv = &existingCopy } row, err := snapshotFromVM(&vm, vc, startTime, inv, hostLookup, folderLookup, rpLookup) if err != nil { c.Logger.Error("unable to build snapshot for VM", "vm_id", vm.Reference().Value, "error", err) continue } if err := db.UpsertVmIdentity(ctx, dbConn, url, row.VmId, row.VmUuid, row.Name, row.Cluster, startTime); err != nil { c.Logger.Warn("failed to upsert vm identity", "vcenter", url, "vm_id", row.VmId, "vm_uuid", row.VmUuid, "name", row.Name, "error", err) } clusterName := "" if row.Cluster.Valid { clusterName = row.Cluster.String } if err := db.UpsertVmLifecycleCache(ctx, dbConn, url, row.VmId.String, row.VmUuid.String, row.Name, clusterName, startTime); err != nil { c.Logger.Warn("failed to upsert vm lifecycle cache", "vcenter", url, "vm_id", row.VmId, "vm_uuid", row.VmUuid, "name", row.Name, "error", err) } presentSnapshots[vm.Reference().Value] = row if row.VmUuid.Valid { presentByUuid[row.VmUuid.String] = struct{}{} } if row.Name != "" { presentByName[row.Name] = struct{}{} } totals.VmCount++ totals.VcpuTotal += nullInt64ToInt(row.VcpuCount) totals.RamTotal += nullInt64ToInt(row.RamGB) totals.DiskTotal += nullFloat64ToFloat(row.ProvisionedDisk) } c.Logger.Debug("hourly snapshot rows prepared", "vcenter", url, "rows", 
len(presentSnapshots)) batch := make([]InventorySnapshotRow, 0, len(presentSnapshots)+len(inventoryRows)) for _, row := range presentSnapshots { batch = append(batch, row) } c.Logger.Debug("checking inventory for missing VMs", "vcenter", url) missingCount := 0 newCount := 0 for _, inv := range inventoryRows { c.Logger.Debug("checking inventory for deletions", "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "name", inv.Name) if strings.HasPrefix(inv.Name, "vCLS-") { continue } vmID := inv.VmId.String uuid := "" if inv.VmUuid.Valid { uuid = inv.VmUuid.String } name := inv.Name found := false if vmID != "" { if _, ok := presentSnapshots[vmID]; ok { found = true } } if !found && uuid != "" { if _, ok := presentByUuid[uuid]; ok { found = true } } if !found && name != "" { if _, ok := presentByName[name]; ok { found = true } } if found { continue } row := snapshotFromInventory(inv, startTime) if !row.DeletionTime.Valid { deletionTime := startTime.Unix() row.DeletionTime = sql.NullInt64{Int64: deletionTime, Valid: true} if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ DeletionTime: row.DeletionTime, VmId: inv.VmId, DatacenterName: inv.Datacenter, }); err != nil { c.Logger.Warn("failed to mark inventory record deleted", "error", err, "vm_id", row.VmId.String) } c.Logger.Debug("Marked VM as deleted", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", url, "snapshot_time", startTime) deletionsMarked = true } clusterName := "" if inv.Cluster.Valid { clusterName = inv.Cluster.String } candidates = append(candidates, deletionCandidate{ vmID: vmID, vmUUID: uuid, name: name, cluster: clusterName, datacenter: inv.Datacenter, }) if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, inv.Name, clusterName, startTime.Unix()); err != nil { c.Logger.Warn("failed to mark vm deleted in lifecycle cache", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, 
"error", err) } if err := db.UpsertVmLifecycleCache(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, inv.Name, clusterName, startTime); err != nil { c.Logger.Warn("failed to upsert vm lifecycle cache (deletion path)", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "name", inv.Name, "error", err) } missingCount++ } // If deletions detected, refine deletion time using vCenter events in a small window. if missingCount > 0 { freq := time.Duration(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds) * time.Second if freq <= 0 { freq = time.Hour } begin := startTime.Add(-4 * freq) end := startTime events, err := vc.FindVmDeletionEvents(ctx, begin, end) if err != nil { c.Logger.Warn("failed to fetch vcenter deletion events", "vcenter", url, "error", err) } else { c.Logger.Debug("fetched vcenter deletion events", "vcenter", url, "count", len(events), "window_start_local", begin, "window_end_local", end, "window_minutes", end.Sub(begin).Minutes(), "window_start_utc", begin.UTC(), "window_end_utc", end.UTC()) for _, cand := range candidates { if t, ok := events[cand.vmID]; ok { delTs := sql.NullInt64{Int64: t.Unix(), Valid: true} if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ DeletionTime: delTs, VmId: sql.NullString{String: cand.vmID, Valid: cand.vmID != ""}, DatacenterName: cand.datacenter, }); err != nil { c.Logger.Warn("failed to update inventory deletion time from event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err) } if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, cand.vmID, cand.vmUUID, cand.name, cand.cluster, t.Unix()); err != nil { c.Logger.Warn("failed to refine lifecycle cache deletion time", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err) } c.Logger.Info("refined deletion time from vcenter event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "vcenter", url, "event_time", t) } } } } // If VM count dropped vs prior 
totals but we didn't mark missing, still look for events (best-effort logging). if missingCount == 0 && prevVmCount.Valid && prevVmCount.Int64 > int64(totals.VmCount) { freq := time.Duration(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds) * time.Second if freq <= 0 { freq = time.Hour } begin := startTime.Add(-2 * freq) end := startTime events, err := vc.FindVmDeletionEvents(ctx, begin, end) if err != nil { c.Logger.Warn("count-drop: failed to fetch vcenter deletion events", "vcenter", url, "error", err, "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount) } else { c.Logger.Info("count-drop: deletion events fetched", "vcenter", url, "events", len(events), "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount, "window_start", begin, "window_end", end) } } c.Logger.Debug("inserting hourly snapshot batch", "vcenter", url, "rows", len(batch)) if err := insertHourlyCache(ctx, dbConn, batch); err != nil { c.Logger.Warn("failed to insert hourly cache rows", "vcenter", url, "error", err) } if err := insertHourlyBatch(ctx, dbConn, tableName, batch); err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, err) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil { c.Logger.Warn("failed to record snapshot run", "url", url, "error", upErr) } return err } // Record per-vCenter totals snapshot. if err := db.InsertVcenterTotals(ctx, dbConn, url, startTime, totals.VmCount, totals.VcpuTotal, totals.RamTotal); err != nil { slog.Warn("failed to insert vcenter totals", "vcenter", url, "snapshot_time", startTime.Unix(), "error", err) } // Compare with previous snapshot for this vcenter to mark deletions at snapshot time. 
prevTableName, prevTableErr := latestHourlySnapshotBefore(ctx, dbConn, startTime) if prevTableErr != nil { c.Logger.Warn("failed to locate previous hourly snapshot for deletion comparison", "error", prevTableErr, "url", url) } if prevTableName != "" { moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTableName, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName) missingCount += moreMissing newCount = countNewFromPrevious(ctx, dbConn, prevTableName, url, presentSnapshots) if newCount > 0 { newRows := listNewFromPrevious(ctx, dbConn, prevTableName, url, presentSnapshots) names := make([]string, 0, len(newRows)) for _, r := range newRows { if r.Name != "" { names = append(names, r.Name) } else if r.VmId.Valid { names = append(names, r.VmId.String) } } c.Logger.Info("new VMs since previous snapshot", "prev_table", prevTableName, "count", newCount, "names", names) } c.Logger.Debug("compared with previous snapshot", "prev_table", prevTableName, "new_since_prev", newCount, "missing_since_prev", missingCount) } else { // No previous snapshot found (or lookup failed). newCount = len(presentSnapshots) } // If VM count dropped versus totals and we still haven't marked missing, try another comparison + wider event window. if missingCount == 0 && prevVmCount.Valid && prevVmCount.Int64 > int64(totals.VmCount) { // Fallback: compare against latest registered snapshot table. if prevTable, err := latestHourlySnapshotBefore(ctx, dbConn, startTime); err == nil && prevTable != "" { moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTable, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName) if moreMissing > 0 { missingCount += moreMissing } // Reuse this table name for later snapshot lookups when correlating deletion events. 
prevTableName = prevTable } freq := time.Duration(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds) * time.Second if freq <= 0 { freq = time.Hour } begin := startTime.Add(-4 * freq) end := startTime events, err := vc.FindVmDeletionEvents(ctx, begin, end) if err != nil { c.Logger.Warn("count-drop: failed to fetch vcenter deletion events", "vcenter", url, "error", err, "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount) } else { c.Logger.Info("count-drop: deletion events fetched", "vcenter", url, "events", len(events), "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount, "window_start_local", begin, "window_end_local", end, "window_start_utc", begin.UTC(), "window_end_utc", end.UTC(), "window_minutes", end.Sub(begin).Minutes()) for vmID, t := range events { // Skip if VM is still present. if _, ok := presentSnapshots[vmID]; ok { continue } inv, ok := inventoryByVmID[vmID] var snapRow InventorySnapshotRow if !ok { var found bool snapRow, found = findVMInHourlySnapshots(ctx, dbConn, url, vmID, prevTableName) if !found { c.Logger.Debug("count-drop: deletion event has no snapshot match", "vm_id", vmID, "vcenter", url, "event_time", t) continue } inv = queries.Inventory{ VmId: snapRow.VmId, VmUuid: snapRow.VmUuid, Name: snapRow.Name, Datacenter: snapRow.Datacenter, } c.Logger.Info("count-drop: correlated deletion via snapshot lookup", "vm_id", vmID, "vm_uuid", inv.VmUuid.String, "name", inv.Name, "vcenter", url, "event_time", t, "snapshot_table", prevTableName) } // Prefer UUID from snapshot if inventory entry lacks it. 
if !inv.VmUuid.Valid && snapRow.VmUuid.Valid { inv.VmUuid = snapRow.VmUuid } delTs := sql.NullInt64{Int64: t.Unix(), Valid: true} if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ DeletionTime: delTs, VmId: inv.VmId, DatacenterName: inv.Datacenter, }); err != nil { c.Logger.Warn("count-drop: failed to update inventory deletion time from event", "vm_id", vmID, "vcenter", url, "error", err) } else { c.Logger.Info("count-drop: correlated deletion event to inventory", "vm_id", vmID, "vm_uuid", inv.VmUuid.String, "name", inv.Name, "vcenter", url, "event_time", t, "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount) } clusterName := "" if inv.Cluster.Valid { clusterName = inv.Cluster.String } if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, vmID, inv.VmUuid.String, inv.Name, clusterName, t.Unix()); err != nil { c.Logger.Warn("count-drop: failed to refine lifecycle cache deletion time", "vm_id", vmID, "vm_uuid", inv.VmUuid, "vcenter", url, "error", err) } missingCount++ deletionsMarked = true } } } // Backfill lifecycle deletions for VMs missing from inventory and without DeletedAt. 
if err := backfillLifecycleDeletionsToday(ctx, c.Logger, dbConn, url, startTime, presentSnapshots); err != nil { c.Logger.Warn("failed to backfill lifecycle deletions for today", "vcenter", url, "error", err) } c.Logger.Info("Hourly snapshot summary", "vcenter", url, "vm_count", totals.VmCount, "vcpu_total", totals.VcpuTotal, "ram_total_gb", totals.RamTotal, "disk_total_gb", totals.DiskTotal, "missing_marked", missingCount, "created_since_prev", newCount, "deleted_since_prev", missingCount, ) metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, nil) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, true, ""); upErr != nil { c.Logger.Warn("failed to record snapshot run", "url", url, "error", upErr) } if deletionsMarked { if err := c.generateReport(ctx, tableName); err != nil { c.Logger.Warn("failed to regenerate hourly report after deletions", "error", err, "table", tableName) } else { c.Logger.Debug("Regenerated hourly report after deletions", "table", tableName) } } return nil }