package tasks

import (
	"context"
	"database/sql"
	"fmt"
	"log/slog"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
	"vctp/db"
	"vctp/db/queries"
	"vctp/internal/metrics"
	"vctp/internal/report"
	"vctp/internal/utils"
	"vctp/internal/vcenter"

	"github.com/jmoiron/sqlx"
	"github.com/vmware/govmomi/vim25/mo"
	"github.com/vmware/govmomi/vim25/types"
)

// inventorySnapshotRow is one VM row as written to an hourly snapshot table
// and to vm_hourly_stats. Nullable columns use the database/sql Null* types;
// the boolean-like columns are stored as strings ("TRUE"/"FALSE").
type inventorySnapshotRow struct {
	InventoryId     sql.NullInt64
	Name            string
	Vcenter         string
	VmId            sql.NullString
	EventKey        sql.NullString
	CloudId         sql.NullString
	CreationTime    sql.NullInt64
	DeletionTime    sql.NullInt64
	ResourcePool    sql.NullString
	Datacenter      sql.NullString
	Cluster         sql.NullString
	Folder          sql.NullString
	ProvisionedDisk sql.NullFloat64
	VcpuCount       sql.NullInt64
	RamGB           sql.NullInt64
	IsTemplate      string
	PoweredOn       string
	SrmPlaceholder  string
	VmUuid          sql.NullString
	SnapshotTime    int64
}

// snapshotTotals aliases db.SnapshotTotals for use within this package.
type snapshotTotals = db.SnapshotTotals

// RunVcenterSnapshotHourly captures an inventory snapshot of every configured
// vCenter into a per-run table named inventory_hourly_<unix start time>.
//
// The run is bounded by HourlyJobTimeoutSeconds (default 20 minutes) and is
// guarded by the cron tracker so that overlapping runs are skipped. vCenters
// are captured concurrently; HourlySnapshotConcurrency (or the
// VCTP_HOURLY_SNAPSHOT_CONCURRENCY environment override) caps how many run at
// once, with a value of 0 meaning no cap. If any vCenter fails, an error
// naming the failure count is returned and the snapshot is not registered.
// Failures to count rows, register the snapshot or generate the report are
// logged as warnings and do not fail the job.
func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Logger) (err error) {
	jobCtx := ctx
	jobTimeout := durationFromSeconds(c.Settings.Values.Settings.HourlyJobTimeoutSeconds, 20*time.Minute)
	if jobTimeout > 0 {
		var cancel context.CancelFunc
		jobCtx, cancel = context.WithTimeout(ctx, jobTimeout)
		defer cancel()
	}
	tracker := NewCronTracker(c.Database)
	// Clear stale marker for this job only (short timeout to avoid blocking).
	staleCtx, cancelStale := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancelStale()
	if err := tracker.ClearStale(staleCtx, "hourly_snapshot", jobTimeout); err != nil {
		logger.Warn("failed to clear stale cron status", "error", err)
	}

	startedAt := time.Now()
	defer func() {
		logger.Info("Hourly snapshot job finished", "duration", time.Since(startedAt))
	}()
	done, skip, err := tracker.Start(jobCtx, "hourly_snapshot")
	if err != nil {
		return err
	}
	if skip {
		logger.Warn("Hourly snapshot skipped because a previous run is still active")
		return nil
	}
	// done receives the named result, so it sees whatever this function returns.
	defer func() { done(err) }()

	ctx, cancel := context.WithCancel(jobCtx)
	defer cancel()
	startTime := time.Now()

	if err := db.CheckMigrationState(ctx, c.Database.DB()); err != nil {
		return err
	}
	if err := db.EnsureSnapshotRunTable(ctx, c.Database.DB()); err != nil {
		return err
	}
	// Best-effort cleanup of legacy IsPresent columns to simplify inserts.
	c.dropLegacyIsPresentColumns(jobCtx)

	// Reload settings in case the vCenter list has changed.
	c.Settings.ReadYMLSettings()

	if c.FirstHourlySnapshotCheck {
		if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
			return err
		}
		lastSnapshot, err := report.LatestSnapshotTime(ctx, c.Database, "hourly")
		if err != nil {
			return err
		}
		minIntervalSeconds := intWithDefault(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds, 3600)
		if !lastSnapshot.IsZero() && startTime.Sub(lastSnapshot) < time.Duration(minIntervalSeconds)*time.Second {
			c.Logger.Info("Skipping hourly snapshot, last snapshot too recent",
				"last_snapshot", lastSnapshot,
				"min_interval_seconds", minIntervalSeconds,
			)
			c.FirstHourlySnapshotCheck = false
			return nil
		}
		c.FirstHourlySnapshotCheck = false
	}

	tableName, err := hourlyInventoryTableName(startTime)
	if err != nil {
		return err
	}

	dbConn := c.Database.DB()
	db.ApplySQLiteTuning(ctx, dbConn)
	if err := ensureDailyInventoryTable(ctx, dbConn, tableName); err != nil {
		return err
	}

	var wg sync.WaitGroup
	var errCount atomic.Int64
	concurrencyLimit := c.Settings.Values.Settings.HourlySnapshotConcurrency
	if override, ok := utils.EnvInt("VCTP_HOURLY_SNAPSHOT_CONCURRENCY"); ok && override >= 0 {
		concurrencyLimit = override
	}
	// A nil semaphore means "no limit"; every vCenter starts immediately.
	var sem chan struct{}
	if concurrencyLimit > 0 {
		sem = make(chan struct{}, concurrencyLimit)
	}
	c.Logger.Info("Starting hourly snapshots", "vcenter_count", len(c.Settings.Values.Settings.VcenterAddresses), "concurrency_limit", concurrencyLimit)
	for _, url := range c.Settings.Values.Settings.VcenterAddresses {
		wg.Add(1)
		go func(url string) {
			defer wg.Done()
			waitStarted := time.Now()
			if sem != nil {
				// Give up waiting for a slot once the job context is done,
				// instead of starting vCenter work after the deadline.
				select {
				case sem <- struct{}{}:
					defer func() { <-sem }()
				case <-ctx.Done():
					errCount.Add(1)
					c.Logger.Error("hourly snapshot failed", "error", ctx.Err(), "url", url)
					return
				}
			}
			waitDuration := time.Since(waitStarted)
			// Start the work timer only after the slot is acquired so that
			// "duration" does not include the queue wait reported separately.
			vcStart := time.Now()

			timeout := durationFromSeconds(c.Settings.Values.Settings.HourlySnapshotTimeoutSeconds, 10*time.Minute)
			runCtx, cancel := context.WithTimeout(ctx, timeout)
			defer cancel()

			c.Logger.Info("Starting hourly snapshot for vcenter", "url", url)
			if err := c.captureHourlySnapshotForVcenter(runCtx, startTime, tableName, url); err != nil {
				errCount.Add(1)
				c.Logger.Error("hourly snapshot failed", "error", err, "url", url)
				return
			}
			c.Logger.Info("Finished hourly snapshot for vcenter",
				"url", url,
				"queue_wait", waitDuration,
				"duration", time.Since(vcStart),
				"timeout", timeout,
			)
		}(url)
	}
	wg.Wait()
	if failures := errCount.Load(); failures > 0 {
		err = fmt.Errorf("hourly snapshot failed for %d vcenter(s)", failures)
		return err
	}

	// err here is the named result; a row-count failure is only logged and is
	// what gets handed to the metrics recorder below.
	rowCount, err := db.TableRowCount(ctx, dbConn, tableName)
	if err != nil {
		c.Logger.Warn("unable to count hourly snapshot rows", "error", err, "table", tableName)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "hourly", tableName, startTime, rowCount); err != nil {
		c.Logger.Warn("failed to register hourly snapshot", "error", err, "table", tableName)
	}

	metrics.RecordHourlySnapshot(startTime, rowCount, err)
	if err := c.generateReport(ctx, tableName); err != nil {
		c.Logger.Warn("failed to generate hourly report", "error", err, "table", tableName)
	}

	c.Logger.Debug("Finished hourly vcenter snapshot", "vcenter_count", len(c.Settings.Values.Settings.VcenterAddresses), "table", tableName, "row_count", rowCount)
	return nil
}

// dropLegacyIsPresentColumns attempts to remove the old IsPresent column from hourly tables.
// This keeps inserts simple and avoids keeping unused data around.
//
// It is best-effort: every failure is logged at debug level and skipped.
// Table names come from the snapshot registry and are validated with
// db.SafeTableName before being interpolated into the ALTER TABLE statement.
func (c *CronTask) dropLegacyIsPresentColumns(ctx context.Context) {
	dbConn := c.Database.DB()
	if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
		c.Logger.Debug("skip IsPresent cleanup; registry unavailable", "error", err)
		return
	}
	records, err := report.ListSnapshots(ctx, c.Database, "hourly")
	if err != nil {
		c.Logger.Debug("skip IsPresent cleanup; unable to list hourly snapshots", "error", err)
		return
	}
	for _, r := range records {
		ok, err := db.ColumnExists(ctx, dbConn, r.TableName, "IsPresent")
		if err != nil || !ok {
			continue
		}
		safeName, err := db.SafeTableName(r.TableName)
		if err != nil {
			c.Logger.Debug("unable to drop legacy IsPresent column", "table", r.TableName, "error", err)
			continue
		}
		if _, err := dbConn.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %s DROP COLUMN "IsPresent"`, safeName)); err != nil {
			c.Logger.Debug("unable to drop legacy IsPresent column", "table", r.TableName, "error", err)
			continue
		}
		c.Logger.Info("dropped legacy IsPresent column", "table", r.TableName)
	}
}

// RunHourlySnapshotRetry retries failed vCenter hourly snapshots up to a maximum attempt count.
func (c *CronTask) RunHourlySnapshotRetry(ctx context.Context, logger *slog.Logger) (err error) { jobStart := time.Now() defer func() { logger.Info("Hourly snapshot retry finished", "duration", time.Since(jobStart)) }() maxRetries := c.Settings.Values.Settings.HourlySnapshotMaxRetries if maxRetries <= 0 { maxRetries = 3 } dbConn := c.Database.DB() if err := db.EnsureSnapshotRunTable(ctx, dbConn); err != nil { return err } failed, err := db.ListFailedSnapshotRuns(ctx, dbConn, maxRetries) if err != nil { return err } if len(failed) == 0 { logger.Debug("No failed hourly snapshots to retry") return nil } for _, f := range failed { startTime := time.Unix(f.SnapshotTime, 0) tableName, tnErr := hourlyInventoryTableName(startTime) if tnErr != nil { logger.Warn("unable to derive table name for retry", "error", tnErr, "snapshot_time", startTime, "vcenter", f.Vcenter) continue } logger.Info("Retrying hourly snapshot", "vcenter", f.Vcenter, "snapshot_time", startTime, "attempt", f.Attempts+1) if err := c.captureHourlySnapshotForVcenter(ctx, startTime, tableName, f.Vcenter); err != nil { logger.Warn("retry failed", "vcenter", f.Vcenter, "error", err) } } return nil } // RunSnapshotCleanup drops hourly and daily snapshot tables older than retention. 
func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) (err error) { jobCtx := ctx jobTimeout := durationFromSeconds(c.Settings.Values.Settings.CleanupJobTimeoutSeconds, 10*time.Minute) if jobTimeout > 0 { var cancel context.CancelFunc jobCtx, cancel = context.WithTimeout(ctx, jobTimeout) defer cancel() } tracker := NewCronTracker(c.Database) done, skip, err := tracker.Start(jobCtx, "snapshot_cleanup") if err != nil { return err } if skip { logger.Warn("Snapshot cleanup skipped because a previous run is still active") return nil } defer func() { done(err) }() if err := db.CheckMigrationState(jobCtx, c.Database.DB()); err != nil { return err } startedAt := time.Now() defer func() { logger.Info("Snapshot cleanup job finished", "duration", time.Since(startedAt)) }() now := time.Now() hourlyMaxDays := intWithDefault(c.Settings.Values.Settings.HourlySnapshotMaxAgeDays, 60) dailyMaxMonths := intWithDefault(c.Settings.Values.Settings.DailySnapshotMaxAgeMonths, 12) hourlyCutoff := now.AddDate(0, 0, -hourlyMaxDays) dailyCutoff := now.AddDate(0, -dailyMaxMonths, 0) dbConn := c.Database.DB() hourlyTables, err := report.ListTablesByPrefix(ctx, c.Database, "inventory_hourly_") if err != nil { return err } removedHourly := 0 for _, table := range hourlyTables { if strings.HasPrefix(table, "inventory_daily_summary_") { continue } tableDate, ok := parseSnapshotDate(table, "inventory_hourly_", "epoch") if !ok { continue } if tableDate.Before(truncateDate(hourlyCutoff)) { if err := dropSnapshotTable(ctx, dbConn, table); err != nil { c.Logger.Error("failed to drop hourly snapshot table", "error", err, "table", table) } else { removedHourly++ if err := report.DeleteSnapshotRecord(ctx, c.Database, table); err != nil { c.Logger.Warn("failed to remove hourly snapshot registry entry", "error", err, "table", table) } } } } dailyTables, err := report.ListTablesByPrefix(ctx, c.Database, "inventory_daily_summary_") if err != nil { return err } removedDaily := 0 for _, 
table := range dailyTables { tableDate, ok := parseSnapshotDate(table, "inventory_daily_summary_", "20060102") if !ok { continue } if tableDate.Before(truncateDate(dailyCutoff)) { if err := dropSnapshotTable(ctx, dbConn, table); err != nil { c.Logger.Error("failed to drop daily snapshot table", "error", err, "table", table) } else { removedDaily++ if err := report.DeleteSnapshotRecord(ctx, c.Database, table); err != nil { c.Logger.Warn("failed to remove daily snapshot registry entry", "error", err, "table", table) } } } } c.Logger.Info("Finished snapshot cleanup", "removed_hourly_tables", removedHourly, "removed_daily_tables", removedDaily, "hourly_max_age_days", hourlyMaxDays, "daily_max_age_months", dailyMaxMonths, ) return nil } func hourlyInventoryTableName(t time.Time) (string, error) { return db.SafeTableName(fmt.Sprintf("inventory_hourly_%d", t.Unix())) } func ensureDailyInventoryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error { if err := db.EnsureSnapshotTable(ctx, dbConn, tableName); err != nil { return err } if err := ensureSnapshotRowID(ctx, dbConn, tableName); err != nil { return err } return db.EnsureColumns(ctx, dbConn, tableName, []db.ColumnDef{ {Name: "VcpuCount", Type: "BIGINT"}, {Name: "RamGB", Type: "BIGINT"}, }) } func buildUnionQuery(tables []string, columns []string, whereClause string) (string, error) { if len(tables) == 0 { return "", fmt.Errorf("no tables provided for union") } if len(columns) == 0 { return "", fmt.Errorf("no columns provided for union") } queries := make([]string, 0, len(tables)) columnList := strings.Join(columns, ", ") for _, table := range tables { safeName, err := db.SafeTableName(table) if err != nil { return "", err } query := fmt.Sprintf("SELECT %s FROM %s", columnList, safeName) if whereClause != "" { query = fmt.Sprintf("%s WHERE %s", query, whereClause) } queries = append(queries, query) } if len(queries) == 0 { return "", fmt.Errorf("no valid tables provided for union") } return 
strings.Join(queries, "\nUNION ALL\n"), nil } func templateExclusionFilter() string { return `COALESCE(CAST("IsTemplate" AS TEXT), '') NOT IN ('TRUE', 'true', '1')` } func parseSnapshotDate(table string, prefix string, layout string) (time.Time, bool) { if !strings.HasPrefix(table, prefix) { return time.Time{}, false } suffix := strings.TrimPrefix(table, prefix) if layout == "epoch" { epoch, err := strconv.ParseInt(suffix, 10, 64) if err != nil { return time.Time{}, false } return time.Unix(epoch, 0), true } parsed, err := time.Parse(layout, suffix) if err != nil { return time.Time{}, false } return parsed, true } func truncateDate(t time.Time) time.Time { return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) } func dropSnapshotTable(ctx context.Context, dbConn *sqlx.DB, table string) error { if _, err := db.SafeTableName(table); err != nil { return err } _, err := dbConn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", table)) return err } func clearTable(ctx context.Context, dbConn *sqlx.DB, table string) error { if _, err := db.SafeTableName(table); err != nil { return err } _, err := dbConn.ExecContext(ctx, fmt.Sprintf("DELETE FROM %s", table)) if err != nil { return fmt.Errorf("failed to clear table %s: %w", table, err) } return nil } func filterSnapshotsWithRows(ctx context.Context, dbConn *sqlx.DB, snapshots []report.SnapshotRecord) []report.SnapshotRecord { filtered := snapshots[:0] for _, snapshot := range snapshots { if rowsExist, err := db.TableHasRows(ctx, dbConn, snapshot.TableName); err == nil && rowsExist { filtered = append(filtered, snapshot) } } return filtered } func filterRecordsInRange(records []report.SnapshotRecord, start, end time.Time) []report.SnapshotRecord { filtered := records[:0] for _, r := range records { if !r.SnapshotTime.Before(start) && r.SnapshotTime.Before(end) { filtered = append(filtered, r) } } return filtered } type columnDef struct { Name string Type string } var summaryUnionColumns = []string{ 
`"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`, `"DeletionTime"`, `"ResourcePool"`, `"Datacenter"`, `"Cluster"`, `"Folder"`, `"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`, `"SrmPlaceholder"`, `"VmUuid"`, `"SnapshotTime"`, } func ensureSnapshotRowID(ctx context.Context, dbConn *sqlx.DB, tableName string) error { driver := strings.ToLower(dbConn.DriverName()) switch driver { case "pgx", "postgres": hasColumn, err := db.ColumnExists(ctx, dbConn, tableName, "RowId") if err != nil { return err } if !hasColumn { if err := db.AddColumnIfMissing(ctx, dbConn, tableName, db.ColumnDef{Name: "RowId", Type: "BIGSERIAL"}); err != nil { return err } } if err := db.BackfillSerialColumn(ctx, dbConn, tableName, "RowId"); err != nil { return err } case "sqlite": return nil } return nil } func nullInt64ToInt(value sql.NullInt64) int64 { if value.Valid { return value.Int64 } return 0 } func nullFloat64ToFloat(value sql.NullFloat64) float64 { if value.Valid { return value.Float64 } return 0 } func intWithDefault(value int, fallback int) int { if value <= 0 { return fallback } return value } func durationFromSeconds(seconds int, fallback time.Duration) time.Duration { if seconds > 0 { return time.Duration(seconds) * time.Second } return fallback } func normalizeResourcePool(value string) string { trimmed := strings.TrimSpace(value) if trimmed == "" { return "" } lower := strings.ToLower(trimmed) canonical := map[string]string{ "tin": "Tin", "bronze": "Bronze", "silver": "Silver", "gold": "Gold", } if val, ok := canonical[lower]; ok { return val } return trimmed } func (c *CronTask) reportsDir() string { if c.Settings != nil && c.Settings.Values != nil { if dir := strings.TrimSpace(c.Settings.Values.Settings.ReportsDir); dir != "" { return dir } } return "/var/lib/vctp/reports" } func (c *CronTask) generateReport(ctx context.Context, tableName string) error { dest := c.reportsDir() _, err := 
report.SaveTableReport(c.Logger, c.Database, ctx, tableName, dest) return err } func snapshotFromVM(vmObject *mo.VirtualMachine, vc *vcenter.Vcenter, snapshotTime time.Time, inv *queries.Inventory, hostLookup map[string]vcenter.HostLookup, folderLookup vcenter.FolderLookup, rpLookup map[string]string) (inventorySnapshotRow, error) { if vmObject == nil { return inventorySnapshotRow{}, fmt.Errorf("missing VM object") } row := inventorySnapshotRow{ Name: vmObject.Name, Vcenter: vc.Vurl, VmId: sql.NullString{String: vmObject.Reference().Value, Valid: vmObject.Reference().Value != ""}, SnapshotTime: snapshotTime.Unix(), } if inv != nil { row.InventoryId = sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0} row.EventKey = inv.EventKey row.CloudId = inv.CloudId row.DeletionTime = inv.DeletionTime } if vmObject.Config != nil { row.VmUuid = sql.NullString{String: vmObject.Config.Uuid, Valid: vmObject.Config.Uuid != ""} if !vmObject.Config.CreateDate.IsZero() { row.CreationTime = sql.NullInt64{Int64: vmObject.Config.CreateDate.Unix(), Valid: true} } row.VcpuCount = sql.NullInt64{Int64: int64(vmObject.Config.Hardware.NumCPU), Valid: vmObject.Config.Hardware.NumCPU > 0} row.RamGB = sql.NullInt64{Int64: int64(vmObject.Config.Hardware.MemoryMB) / 1024, Valid: vmObject.Config.Hardware.MemoryMB > 0} totalDiskBytes := int64(0) for _, device := range vmObject.Config.Hardware.Device { if disk, ok := device.(*types.VirtualDisk); ok { totalDiskBytes += disk.CapacityInBytes } } if totalDiskBytes > 0 { row.ProvisionedDisk = sql.NullFloat64{Float64: float64(totalDiskBytes / 1024 / 1024 / 1024), Valid: true} } if vmObject.Config.ManagedBy != nil && vmObject.Config.ManagedBy.ExtensionKey == "com.vmware.vcDr" && vmObject.Config.ManagedBy.Type == "placeholderVm" { row.SrmPlaceholder = "TRUE" } else { row.SrmPlaceholder = "FALSE" } if vmObject.Config.Template { row.IsTemplate = "TRUE" } else { row.IsTemplate = "FALSE" } } if vmObject.Runtime.PowerState == "poweredOff" { row.PoweredOn = "FALSE" } 
else { row.PoweredOn = "TRUE" } if inv != nil { if inv.ResourcePool.Valid { row.ResourcePool = sql.NullString{String: normalizeResourcePool(inv.ResourcePool.String), Valid: true} } row.Datacenter = inv.Datacenter row.Cluster = inv.Cluster row.Folder = inv.Folder if !row.CreationTime.Valid { row.CreationTime = inv.CreationTime } if !row.ProvisionedDisk.Valid { row.ProvisionedDisk = inv.ProvisionedDisk } if !row.VcpuCount.Valid { row.VcpuCount = inv.InitialVcpus } if !row.RamGB.Valid && inv.InitialRam.Valid { row.RamGB = sql.NullInt64{Int64: inv.InitialRam.Int64 / 1024, Valid: inv.InitialRam.Int64 > 0} } if row.IsTemplate == "" { row.IsTemplate = boolStringFromInterface(inv.IsTemplate) } if row.PoweredOn == "" { row.PoweredOn = boolStringFromInterface(inv.PoweredOn) } if row.SrmPlaceholder == "" { row.SrmPlaceholder = boolStringFromInterface(inv.SrmPlaceholder) } if !row.VmUuid.Valid { row.VmUuid = inv.VmUuid } } if row.ResourcePool.String == "" && vmObject.ResourcePool != nil { if rpLookup != nil { if rpName, ok := rpLookup[vmObject.ResourcePool.Value]; ok { row.ResourcePool = sql.NullString{String: normalizeResourcePool(rpName), Valid: rpName != ""} } } } if row.Folder.String == "" { if folderPath, ok := vc.GetVMFolderPathFromLookup(*vmObject, folderLookup); ok { row.Folder = sql.NullString{String: folderPath, Valid: folderPath != ""} } else { // Unable to resolve folder path from lookup; leave empty. 
} } if vmObject.Runtime.Host != nil && hostLookup != nil { if lookup, ok := hostLookup[vmObject.Runtime.Host.Value]; ok { if row.Cluster.String == "" && lookup.Cluster != "" { row.Cluster = sql.NullString{String: lookup.Cluster, Valid: true} } if row.Datacenter.String == "" && lookup.Datacenter != "" { row.Datacenter = sql.NullString{String: lookup.Datacenter, Valid: true} } } } if row.Cluster.String == "" { if clusterName, err := vc.GetClusterFromHost(vmObject.Runtime.Host); err == nil { row.Cluster = sql.NullString{String: clusterName, Valid: clusterName != ""} } } if row.Datacenter.String == "" { if dcName, err := vc.GetDatacenterForVM(*vmObject); err == nil { row.Datacenter = sql.NullString{String: dcName, Valid: dcName != ""} } } return row, nil } func snapshotFromInventory(inv queries.Inventory, snapshotTime time.Time) inventorySnapshotRow { return inventorySnapshotRow{ InventoryId: sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0}, Name: inv.Name, Vcenter: inv.Vcenter, VmId: inv.VmId, EventKey: inv.EventKey, CloudId: inv.CloudId, CreationTime: inv.CreationTime, DeletionTime: inv.DeletionTime, ResourcePool: sql.NullString{String: normalizeResourcePool(inv.ResourcePool.String), Valid: inv.ResourcePool.Valid}, Datacenter: inv.Datacenter, Cluster: inv.Cluster, Folder: inv.Folder, ProvisionedDisk: inv.ProvisionedDisk, VcpuCount: inv.InitialVcpus, RamGB: sql.NullInt64{Int64: inv.InitialRam.Int64 / 1024, Valid: inv.InitialRam.Valid && inv.InitialRam.Int64 > 0}, IsTemplate: boolStringFromInterface(inv.IsTemplate), PoweredOn: boolStringFromInterface(inv.PoweredOn), SrmPlaceholder: boolStringFromInterface(inv.SrmPlaceholder), VmUuid: inv.VmUuid, SnapshotTime: snapshotTime.Unix(), } } func insertHourlyCache(ctx context.Context, dbConn *sqlx.DB, rows []inventorySnapshotRow) error { if len(rows) == 0 { return nil } if err := db.EnsureVmHourlyStats(ctx, dbConn); err != nil { return err } driver := strings.ToLower(dbConn.DriverName()) conflict := "" verb := "INSERT INTO" 
if driver == "sqlite" { verb = "INSERT OR REPLACE INTO" } else { conflict = ` ON CONFLICT ("Vcenter","VmId","SnapshotTime") DO UPDATE SET "VmUuid"=EXCLUDED."VmUuid", "Name"=EXCLUDED."Name", "CreationTime"=EXCLUDED."CreationTime", "DeletionTime"=EXCLUDED."DeletionTime", "ResourcePool"=EXCLUDED."ResourcePool", "Datacenter"=EXCLUDED."Datacenter", "Cluster"=EXCLUDED."Cluster", "Folder"=EXCLUDED."Folder", "ProvisionedDisk"=EXCLUDED."ProvisionedDisk", "VcpuCount"=EXCLUDED."VcpuCount", "RamGB"=EXCLUDED."RamGB", "IsTemplate"=EXCLUDED."IsTemplate", "PoweredOn"=EXCLUDED."PoweredOn", "SrmPlaceholder"=EXCLUDED."SrmPlaceholder"` } cols := []string{ "SnapshotTime", "Vcenter", "VmId", "VmUuid", "Name", "CreationTime", "DeletionTime", "ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount", "RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", } bind := sqlx.BindType(dbConn.DriverName()) placeholders := strings.TrimRight(strings.Repeat("?, ", len(cols)), ", ") stmtText := fmt.Sprintf(`%s vm_hourly_stats ("%s") VALUES (%s)%s`, verb, strings.Join(cols, `","`), placeholders, conflict) stmtText = sqlx.Rebind(bind, stmtText) tx, err := dbConn.BeginTxx(ctx, nil) if err != nil { return err } stmt, err := tx.PreparexContext(ctx, stmtText) if err != nil { tx.Rollback() return err } defer stmt.Close() for _, r := range rows { args := []interface{}{ r.SnapshotTime, r.Vcenter, r.VmId, r.VmUuid, r.Name, r.CreationTime, r.DeletionTime, r.ResourcePool, r.Datacenter, r.Cluster, r.Folder, r.ProvisionedDisk, r.VcpuCount, r.RamGB, r.IsTemplate, r.PoweredOn, r.SrmPlaceholder, } if _, err := stmt.ExecContext(ctx, args...); err != nil { tx.Rollback() return err } } return tx.Commit() } func insertHourlyBatch(ctx context.Context, dbConn *sqlx.DB, tableName string, rows []inventorySnapshotRow) error { if len(rows) == 0 { return nil } if err := db.EnsureVmHourlyStats(ctx, dbConn); err != nil { return err } tx, err := dbConn.BeginTxx(ctx, nil) if err != nil { return err } baseCols 
:= []string{ "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", "ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount", "RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SnapshotTime", } bind := sqlx.BindType(dbConn.DriverName()) buildStmt := func(cols []string) (*sqlx.Stmt, error) { colList := `"` + strings.Join(cols, `", "`) + `"` placeholders := strings.TrimRight(strings.Repeat("?, ", len(cols)), ", ") return tx.PreparexContext(ctx, sqlx.Rebind(bind, fmt.Sprintf(`INSERT INTO %s (%s) VALUES (%s)`, tableName, colList, placeholders))) } stmt, err := buildStmt(baseCols) if err != nil { // Fallback for legacy tables that still have IsPresent. withLegacy := append(append([]string{}, baseCols...), "IsPresent") stmt, err = buildStmt(withLegacy) if err != nil { tx.Rollback() return err } defer stmt.Close() for _, row := range rows { args := []interface{}{ row.InventoryId, row.Name, row.Vcenter, row.VmId, row.EventKey, row.CloudId, row.CreationTime, row.DeletionTime, row.ResourcePool, row.Datacenter, row.Cluster, row.Folder, row.ProvisionedDisk, row.VcpuCount, row.RamGB, row.IsTemplate, row.PoweredOn, row.SrmPlaceholder, row.VmUuid, row.SnapshotTime, "TRUE", } if _, err := stmt.ExecContext(ctx, args...); err != nil { tx.Rollback() return err } } return tx.Commit() } defer stmt.Close() for _, row := range rows { args := []interface{}{ row.InventoryId, row.Name, row.Vcenter, row.VmId, row.EventKey, row.CloudId, row.CreationTime, row.DeletionTime, row.ResourcePool, row.Datacenter, row.Cluster, row.Folder, row.ProvisionedDisk, row.VcpuCount, row.RamGB, row.IsTemplate, row.PoweredOn, row.SrmPlaceholder, row.VmUuid, row.SnapshotTime, } if _, err := stmt.ExecContext(ctx, args...); err != nil { tx.Rollback() return err } } return tx.Commit() } func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTime time.Time, tableName string, url string) error { started := time.Now() 
c.Logger.Debug("connecting to vcenter for hourly snapshot", "url", url) vc := vcenter.New(c.Logger, c.VcCreds) if err := vc.Login(url); err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil { c.Logger.Warn("failed to record snapshot run", "url", url, "error", upErr) } return fmt.Errorf("unable to connect to vcenter: %w", err) } defer func() { if err := vc.Logout(); err != nil { c.Logger.Warn("vcenter logout failed", "url", url, "error", err) } }() vcVms, err := vc.GetAllVMsWithProps() if err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil { c.Logger.Warn("failed to record snapshot run", "url", url, "error", upErr) } return fmt.Errorf("unable to get VMs from vcenter: %w", err) } c.Logger.Debug("retrieved VMs from vcenter", "url", url, "vm_count", len(vcVms)) if err := db.EnsureVmIdentityTables(ctx, c.Database.DB()); err != nil { c.Logger.Warn("failed to ensure vm identity tables", "error", err) } hostLookup, err := vc.BuildHostLookup() if err != nil { c.Logger.Warn("failed to build host lookup", "url", url, "error", err) hostLookup = nil } else { c.Logger.Debug("built host lookup", "url", url, "hosts", len(hostLookup)) } folderLookup, err := vc.BuildFolderPathLookup() if err != nil { c.Logger.Warn("failed to build folder lookup", "url", url, "error", err) folderLookup = nil } else { c.Logger.Debug("built folder lookup", "url", url, "folders", len(folderLookup)) } rpLookup, err := vc.BuildResourcePoolLookup() if err != nil { c.Logger.Warn("failed to build resource pool lookup", "url", url, "error", err) rpLookup = nil } else { c.Logger.Debug("built resource pool lookup", "url", url, "pools", len(rpLookup)) } inventoryRows, err := c.Database.Queries().GetInventoryByVcenter(ctx, url) if err != nil { return 
fmt.Errorf("unable to query inventory table: %w", err) } inventoryByVmID := make(map[string]queries.Inventory, len(inventoryRows)) inventoryByUuid := make(map[string]queries.Inventory, len(inventoryRows)) inventoryByName := make(map[string]queries.Inventory, len(inventoryRows)) for _, inv := range inventoryRows { if inv.VmId.Valid { inventoryByVmID[inv.VmId.String] = inv } if inv.VmUuid.Valid { inventoryByUuid[inv.VmUuid.String] = inv } if inv.Name != "" { inventoryByName[inv.Name] = inv } } dbConn := c.Database.DB() presentSnapshots := make(map[string]inventorySnapshotRow, len(vcVms)) presentByUuid := make(map[string]struct{}, len(vcVms)) presentByName := make(map[string]struct{}, len(vcVms)) totals := snapshotTotals{} deletionsMarked := false for _, vm := range vcVms { if strings.HasPrefix(vm.Name, "vCLS-") { continue } if vm.Config != nil && vm.Config.Template { continue } var inv *queries.Inventory if existing, ok := inventoryByVmID[vm.Reference().Value]; ok { existingCopy := existing inv = &existingCopy } row, err := snapshotFromVM(&vm, vc, startTime, inv, hostLookup, folderLookup, rpLookup) if err != nil { c.Logger.Error("unable to build snapshot for VM", "vm_id", vm.Reference().Value, "error", err) continue } if err := db.UpsertVmIdentity(ctx, dbConn, url, row.VmId, row.VmUuid, row.Name, row.Cluster, startTime); err != nil { c.Logger.Warn("failed to upsert vm identity", "vcenter", url, "vm_id", row.VmId, "vm_uuid", row.VmUuid, "name", row.Name, "error", err) } clusterName := "" if row.Cluster.Valid { clusterName = row.Cluster.String } if err := db.UpsertVmLifecycleCache(ctx, dbConn, url, row.VmId.String, row.VmUuid.String, row.Name, clusterName, startTime); err != nil { c.Logger.Warn("failed to upsert vm lifecycle cache", "vcenter", url, "vm_id", row.VmId, "vm_uuid", row.VmUuid, "name", row.Name, "error", err) } presentSnapshots[vm.Reference().Value] = row if row.VmUuid.Valid { presentByUuid[row.VmUuid.String] = struct{}{} } if row.Name != "" { 
presentByName[row.Name] = struct{}{} } totals.VmCount++ totals.VcpuTotal += nullInt64ToInt(row.VcpuCount) totals.RamTotal += nullInt64ToInt(row.RamGB) totals.DiskTotal += nullFloat64ToFloat(row.ProvisionedDisk) } c.Logger.Debug("hourly snapshot rows prepared", "vcenter", url, "rows", len(presentSnapshots)) batch := make([]inventorySnapshotRow, 0, len(presentSnapshots)+len(inventoryRows)) for _, row := range presentSnapshots { batch = append(batch, row) } c.Logger.Debug("checking inventory for missing VMs", "vcenter", url) missingCount := 0 for _, inv := range inventoryRows { c.Logger.Debug("checking inventory for deletions", "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "name", inv.Name) if strings.HasPrefix(inv.Name, "vCLS-") { continue } vmID := inv.VmId.String uuid := "" if inv.VmUuid.Valid { uuid = inv.VmUuid.String } name := inv.Name found := false if vmID != "" { if _, ok := presentSnapshots[vmID]; ok { found = true } } if !found && uuid != "" { if _, ok := presentByUuid[uuid]; ok { found = true } } if !found && name != "" { if _, ok := presentByName[name]; ok { found = true } } if found { continue } row := snapshotFromInventory(inv, startTime) if !row.DeletionTime.Valid { deletionTime := startTime.Unix() row.DeletionTime = sql.NullInt64{Int64: deletionTime, Valid: true} if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ DeletionTime: row.DeletionTime, VmId: inv.VmId, DatacenterName: inv.Datacenter, }); err != nil { c.Logger.Warn("failed to mark inventory record deleted", "error", err, "vm_id", row.VmId.String) } c.Logger.Debug("Marked VM as deleted", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", url, "snapshot_time", startTime) deletionsMarked = true } if err := db.MarkVmDeleted(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, startTime.Unix()); err != nil { c.Logger.Warn("failed to mark vm deleted in lifecycle cache", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", 
inv.VmUuid, "error", err) } clusterName := "" if inv.Cluster.Valid { clusterName = inv.Cluster.String } if err := db.UpsertVmLifecycleCache(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, inv.Name, clusterName, startTime); err != nil { c.Logger.Warn("failed to upsert vm lifecycle cache (deletion path)", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "name", inv.Name, "error", err) } missingCount++ } c.Logger.Debug("inserting hourly snapshot batch", "vcenter", url, "rows", len(batch)) if err := insertHourlyCache(ctx, dbConn, batch); err != nil { c.Logger.Warn("failed to insert hourly cache rows", "vcenter", url, "error", err) } if err := insertHourlyBatch(ctx, dbConn, tableName, batch); err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, err) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil { c.Logger.Warn("failed to record snapshot run", "url", url, "error", upErr) } return err } // Record per-vCenter totals snapshot. if err := db.InsertVcenterTotals(ctx, dbConn, url, startTime, totals.VmCount, totals.VcpuTotal, totals.RamTotal); err != nil { slog.Warn("failed to insert vcenter totals", "vcenter", url, "snapshot_time", startTime.Unix(), "error", err) } // Compare with previous snapshot for this vcenter to mark deletions at snapshot time. 
if prevTable, err := latestHourlySnapshotBefore(ctx, dbConn, startTime); err == nil && prevTable != "" { moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTable, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName) missingCount += moreMissing } else if err != nil { c.Logger.Warn("failed to locate previous hourly snapshot for deletion comparison", "error", err, "url", url) } c.Logger.Info("Hourly snapshot summary", "vcenter", url, "vm_count", totals.VmCount, "vcpu_total", totals.VcpuTotal, "ram_total_gb", totals.RamTotal, "disk_total_gb", totals.DiskTotal, "missing_marked", missingCount, ) metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, nil) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, true, ""); upErr != nil { c.Logger.Warn("failed to record snapshot run", "url", url, "error", upErr) } if deletionsMarked { if err := c.generateReport(ctx, tableName); err != nil { c.Logger.Warn("failed to regenerate hourly report after deletions", "error", err, "table", tableName) } else { c.Logger.Debug("Regenerated hourly report after deletions", "table", tableName) } } return nil } func boolStringFromInterface(value interface{}) string { switch v := value.(type) { case nil: return "" case string: return v case []byte: return string(v) case bool: if v { return "TRUE" } return "FALSE" case int: if v != 0 { return "TRUE" } return "FALSE" case int64: if v != 0 { return "TRUE" } return "FALSE" default: return fmt.Sprint(v) } } // latestHourlySnapshotBefore finds the most recent hourly snapshot table prior to the given time. 
func latestHourlySnapshotBefore(ctx context.Context, dbConn *sqlx.DB, cutoff time.Time) (string, error) { driver := strings.ToLower(dbConn.DriverName()) var rows *sqlx.Rows var err error switch driver { case "sqlite": rows, err = dbConn.QueryxContext(ctx, ` SELECT name FROM sqlite_master WHERE type = 'table' AND name LIKE 'inventory_hourly_%' `) case "pgx", "postgres": rows, err = dbConn.QueryxContext(ctx, ` SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE 'inventory_hourly_%' `) default: return "", fmt.Errorf("unsupported driver for snapshot lookup: %s", driver) } if err != nil { return "", err } defer rows.Close() var latest string var latestTime int64 for rows.Next() { var name string if scanErr := rows.Scan(&name); scanErr != nil { continue } if !strings.HasPrefix(name, "inventory_hourly_") { continue } suffix := strings.TrimPrefix(name, "inventory_hourly_") epoch, parseErr := strconv.ParseInt(suffix, 10, 64) if parseErr != nil { continue } if epoch < cutoff.Unix() && epoch > latestTime { latestTime = epoch latest = name } } return latest, nil } // markMissingFromPrevious marks VMs that were present in the previous snapshot but missing now. 
func (c *CronTask) markMissingFromPrevious(ctx context.Context, dbConn *sqlx.DB, prevTable string, vcenter string, snapshotTime time.Time, currentByID map[string]inventorySnapshotRow, currentByUuid map[string]struct{}, currentByName map[string]struct{}, invByID map[string]queries.Inventory, invByUuid map[string]queries.Inventory, invByName map[string]queries.Inventory) int { if err := db.ValidateTableName(prevTable); err != nil { return 0 } query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name","Cluster","Datacenter","DeletionTime" FROM %s WHERE "Vcenter" = ?`, prevTable) query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) type prevRow struct { VmId sql.NullString `db:"VmId"` VmUuid sql.NullString `db:"VmUuid"` Name string `db:"Name"` Cluster sql.NullString `db:"Cluster"` Datacenter sql.NullString `db:"Datacenter"` DeletionTime sql.NullInt64 `db:"DeletionTime"` } rows, err := dbConn.QueryxContext(ctx, query, vcenter) if err != nil { c.Logger.Warn("failed to read previous snapshot for deletion detection", "error", err, "table", prevTable, "vcenter", vcenter) return 0 } defer rows.Close() missing := 0 for rows.Next() { var r prevRow if err := rows.StructScan(&r); err != nil { continue } vmID := r.VmId.String uuid := r.VmUuid.String name := r.Name cluster := r.Cluster.String found := false if vmID != "" { if _, ok := currentByID[vmID]; ok { found = true } } if !found && uuid != "" { if _, ok := currentByUuid[uuid]; ok { found = true } } if !found && name != "" { if _, ok := currentByName[name]; ok { found = true } } // If the name is missing but UUID+Cluster still exists in inventory/current, treat it as present (rename, not delete). 
if !found && uuid != "" && cluster != "" { if inv, ok := invByUuid[uuid]; ok && strings.EqualFold(inv.Cluster.String, cluster) { found = true } } if found { continue } var inv queries.Inventory var ok bool if vmID != "" { inv, ok = invByID[vmID] } if !ok && uuid != "" { inv, ok = invByUuid[uuid] } if !ok && name != "" { inv, ok = invByName[name] } if !ok { continue } if inv.DeletionTime.Valid { continue } delTime := sql.NullInt64{Int64: snapshotTime.Unix(), Valid: true} if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ DeletionTime: delTime, VmId: inv.VmId, DatacenterName: inv.Datacenter, }); err != nil { c.Logger.Warn("failed to mark inventory record deleted from previous snapshot", "error", err, "vm_id", inv.VmId.String) continue } c.Logger.Debug("Detected VM missing compared to previous snapshot", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", vcenter, "snapshot_time", snapshotTime, "prev_table", prevTable) missing++ } return missing }