package tasks

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log/slog"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"vctp/db"
	"vctp/db/queries"
	"vctp/internal/metrics"
	"vctp/internal/report"
	"vctp/internal/utils"
	"vctp/internal/vcenter"

	"github.com/jmoiron/sqlx"
	"github.com/vmware/govmomi/vim25/mo"
	"github.com/vmware/govmomi/vim25/types"
)

// ctxLoggerKey is the context key under which a request-scoped *slog.Logger
// may be stored; loggerFromCtx retrieves it.
type ctxLoggerKey struct{}

// deletionCandidate captures the identifying fields of an inventory VM that
// was not found in the live vCenter listing (see prepareDeletionCandidates).
type deletionCandidate struct {
	vmID       string
	vmUUID     string
	name       string
	cluster    string
	datacenter sql.NullString
}

// vcenterResources bundles the VM list and lookup tables fetched from one
// vCenter connection (populated by initVcenterResources).
type vcenterResources struct {
	vms          []mo.VirtualMachine
	hostLookup   map[string]vcenter.HostLookup
	folderLookup map[string]string
	rpLookup     map[string]string
}

// loggerFromCtx returns the *slog.Logger stored in ctx under ctxLoggerKey,
// or fallback when ctx is nil or carries no (non-nil) logger.
func loggerFromCtx(ctx context.Context, fallback *slog.Logger) *slog.Logger {
	if ctx == nil {
		return fallback
	}
	if l, ok := ctx.Value(ctxLoggerKey{}).(*slog.Logger); ok && l != nil {
		return l
	}
	return fallback
}

// RunVcenterSnapshotHourly records hourly inventory snapshots into a daily table.
// If force is true, any in-progress marker will be cleared before starting (useful for manual recovery).
func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Logger, force bool) (err error) {
	// Bound the whole job with an optional timeout derived from settings
	// (0 or negative configured seconds falls back to 20 minutes via
	// durationFromSeconds; jobTimeout > 0 is therefore always true unless
	// durationFromSeconds semantics change — guard kept defensively).
	jobCtx := ctx
	jobTimeout := durationFromSeconds(c.Settings.Values.Settings.HourlyJobTimeoutSeconds, 20*time.Minute)
	if jobTimeout > 0 {
		var cancel context.CancelFunc
		jobCtx, cancel = context.WithTimeout(ctx, jobTimeout)
		defer cancel()
	}
	snapshotFreq := durationFromSeconds(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds, time.Hour)
	tracker := NewCronTracker(c.Database)
	// Clear stale marker for this job only (short timeout to avoid blocking).
	// Uses context.Background() deliberately so cleanup still runs if jobCtx
	// is already near its deadline.
	staleCtx, cancelStale := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancelStale()
	if err := tracker.ClearStale(staleCtx, "hourly_snapshot", jobTimeout); err != nil {
		logger.Warn("failed to clear stale cron status", "error", err)
	}
	if force {
		// Force runs wipe the in-progress marker up front; a second clear
		// attempt also happens below if Start still reports the job active.
		if err := tracker.ClearAllInProgress(staleCtx); err != nil {
			logger.Warn("failed to clear in-progress flag (force run)", "error", err)
		} else {
			logger.Info("force run cleared in-progress marker before starting")
		}
	}
	startedAt := time.Now()
	defer func() {
		// gocron logs the next run on its side, but log here for quick visibility.
		logger.Info("Hourly snapshot job finished", "duration", time.Since(startedAt), "next_run_estimated", time.Now().Add(snapshotFreq))
	}()
	done, skip, err := tracker.Start(jobCtx, "hourly_snapshot")
	if err != nil {
		return err
	}
	if skip {
		if force {
			// Retry Start once after forcibly clearing the marker.
			logger.Info("Force run requested; clearing in-progress marker and retrying")
			if err := tracker.ClearAllInProgress(jobCtx); err != nil {
				logger.Warn("failed to clear in-progress flag for force run", "error", err)
				return nil
			}
			done, skip, err = tracker.Start(jobCtx, "hourly_snapshot")
			if err != nil {
				return err
			}
			if skip {
				logger.Warn("Hourly snapshot still marked active after force clear; skipping")
				return nil
			}
		} else {
			logger.Warn("Hourly snapshot skipped because a previous run is still active", "force", force)
			return nil
		}
	}
	// done() records the job outcome; err is the named return so the deferred
	// call sees whatever value the function ultimately returns.
	defer func() { done(err) }()
	ctx, cancel := context.WithCancel(jobCtx)
	defer cancel()
	startTime := time.Now()
	if err := db.CheckMigrationState(ctx, c.Database.DB()); err != nil {
		return err
	}
	if err := db.EnsureSnapshotRunTable(ctx, c.Database.DB()); err != nil {
		return err
	}
	// Best-effort cleanup of legacy IsPresent columns to simplify inserts.
	c.dropLegacyIsPresentColumns(jobCtx)
	// reload settings in case vcenter list has changed
	c.Settings.ReadYMLSettings()
	if c.FirstHourlySnapshotCheck {
		// On the first run after process start, skip if the most recent
		// registered hourly snapshot is less than half the snapshot interval
		// old (avoids a duplicate snapshot right after a restart).
		if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
			return err
		}
		lastSnapshot, err := report.LatestSnapshotTime(ctx, c.Database, "hourly")
		if err != nil {
			return err
		}
		minIntervalSeconds := intWithDefault(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds, 3600) / 2
		if !lastSnapshot.IsZero() && startTime.Sub(lastSnapshot) < time.Duration(minIntervalSeconds)*time.Second {
			c.Logger.Info("Skipping hourly snapshot, last snapshot too recent",
				"last_snapshot", lastSnapshot,
				"min_interval_seconds", minIntervalSeconds,
			)
			c.FirstHourlySnapshotCheck = false
			return nil
		}
		c.FirstHourlySnapshotCheck = false
	}
	tableName, err := hourlyInventoryTableName(startTime)
	if err != nil {
		return err
	}
	dbConn := c.Database.DB()
	db.ApplySQLiteTuning(ctx, dbConn)
	if err := ensureDailyInventoryTable(ctx, dbConn, tableName); err != nil {
		return err
	}
	var wg sync.WaitGroup
	var errCount int64
	// Concurrency limit comes from settings but can be overridden via env var;
	// 0 (or negative) means unlimited — no semaphore is created.
	concurrencyLimit := c.Settings.Values.Settings.HourlySnapshotConcurrency
	if override, ok := utils.EnvInt("VCTP_HOURLY_SNAPSHOT_CONCURRENCY"); ok && override >= 0 {
		concurrencyLimit = override
	}
	var sem chan struct{}
	if concurrencyLimit > 0 {
		sem = make(chan struct{}, concurrencyLimit)
	}
	c.Logger.Info("Starting hourly snapshots", "vcenter_count", len(c.Settings.Values.Settings.VcenterAddresses), "concurrency_limit", concurrencyLimit)
	for _, url := range c.Settings.Values.Settings.VcenterAddresses {
		wg.Add(1)
		go func(url string) {
			defer wg.Done()
			waitStarted := time.Now()
			// NOTE(review): vcStart is taken before the semaphore acquire, so
			// the logged "duration" includes queue wait — confirm intended.
			vcStart := time.Now()
			if sem != nil {
				sem <- struct{}{}
				defer func() { <-sem }()
			}
			waitDuration := time.Since(waitStarted)
			timeout := durationFromSeconds(c.Settings.Values.Settings.HourlySnapshotTimeoutSeconds, 10*time.Minute)
			runCtx, cancel := context.WithTimeout(ctx, timeout)
			defer cancel()
			c.Logger.Info("Starting hourly snapshot for vcenter", "url", url)
			if err := c.captureHourlySnapshotForVcenter(runCtx, startTime, tableName, url); err != nil {
				atomic.AddInt64(&errCount, 1)
				c.Logger.Error("hourly snapshot failed", "error", err, "url", url)
			} else {
				c.Logger.Info("Finished hourly snapshot for vcenter",
					"url", url,
					"queue_wait", waitDuration,
					"duration", time.Since(vcStart),
					"timeout", timeout,
				)
			}
		}(url)
	}
	wg.Wait()
	// errCount is read after wg.Wait(), so all writers have finished.
	if errCount > 0 {
		err = fmt.Errorf("hourly snapshot failed for %d vcenter(s)", errCount)
		return err
	}
	// Registration, metrics, and report generation below are best-effort:
	// failures are logged but do not fail the job.
	rowCount, err := db.TableRowCount(ctx, dbConn, tableName)
	if err != nil {
		c.Logger.Warn("unable to count hourly snapshot rows", "error", err, "table", tableName)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "hourly", tableName, startTime, rowCount); err != nil {
		c.Logger.Warn("failed to register hourly snapshot", "error", err, "table", tableName)
	}
	metrics.RecordHourlySnapshot(startTime, rowCount, err)
	if err := c.generateReport(ctx, tableName); err != nil {
		c.Logger.Warn("failed to generate hourly report", "error", err, "table", tableName)
	}
	c.Logger.Debug("Finished hourly vcenter snapshot", "vcenter_count", len(c.Settings.Values.Settings.VcenterAddresses), "table", tableName, "row_count", rowCount)
	return nil
}

// dropLegacyIsPresentColumns attempts to remove the old IsPresent column from hourly tables.
// This keeps inserts simple and avoids keeping unused data around.
func (c *CronTask) dropLegacyIsPresentColumns(ctx context.Context) {
	// Best-effort: every failure path logs at Debug and returns/continues
	// rather than propagating an error.
	dbConn := c.Database.DB()
	if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
		c.Logger.Debug("skip IsPresent cleanup; registry unavailable", "error", err)
		return
	}
	records, err := report.ListSnapshots(ctx, c.Database, "hourly")
	if err != nil {
		c.Logger.Debug("skip IsPresent cleanup; unable to list hourly snapshots", "error", err)
		return
	}
	for _, r := range records {
		// Only issue the ALTER when the column is confirmed present.
		if ok, err := db.ColumnExists(ctx, dbConn, r.TableName, "IsPresent"); err == nil && ok {
			if _, err := dbConn.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %s DROP COLUMN "IsPresent"`, r.TableName)); err != nil {
				c.Logger.Debug("unable to drop legacy IsPresent column", "table", r.TableName, "error", err)
			} else {
				c.Logger.Info("dropped legacy IsPresent column", "table", r.TableName)
			}
		}
	}
}

// RunHourlySnapshotRetry retries failed vCenter hourly snapshots up to a maximum attempt count.
func (c *CronTask) RunHourlySnapshotRetry(ctx context.Context, logger *slog.Logger) (err error) {
	jobStart := time.Now()
	defer func() {
		logger.Info("Hourly snapshot retry finished", "duration", time.Since(jobStart))
	}()
	// maxRetries caps how many attempts a failed run may accumulate before it
	// is excluded from ListFailedSnapshotRuns; default 3 when unconfigured.
	maxRetries := c.Settings.Values.Settings.HourlySnapshotMaxRetries
	if maxRetries <= 0 {
		maxRetries = 3
	}
	dbConn := c.Database.DB()
	if err := db.EnsureSnapshotRunTable(ctx, dbConn); err != nil {
		return err
	}
	failed, err := db.ListFailedSnapshotRuns(ctx, dbConn, maxRetries)
	if err != nil {
		return err
	}
	if len(failed) == 0 {
		logger.Debug("No failed hourly snapshots to retry")
		return nil
	}
	for _, f := range failed {
		// Each failed run is retried into the same hourly table derived from
		// its original snapshot time; a retry failure is logged, not returned.
		startTime := time.Unix(f.SnapshotTime, 0)
		tableName, tnErr := hourlyInventoryTableName(startTime)
		if tnErr != nil {
			logger.Warn("unable to derive table name for retry", "error", tnErr, "snapshot_time", startTime, "vcenter", f.Vcenter)
			continue
		}
		logger.Info("Retrying hourly snapshot", "vcenter", f.Vcenter, "snapshot_time", startTime, "attempt", f.Attempts+1)
		if err := c.captureHourlySnapshotForVcenter(ctx, startTime, tableName, f.Vcenter); err != nil {
			logger.Warn("retry failed", "vcenter", f.Vcenter, "error", err)
		}
	}
	return nil
}

// RunSnapshotCleanup drops hourly and daily snapshot tables older than retention.
func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) (err error) {
	jobCtx := ctx
	jobTimeout := durationFromSeconds(c.Settings.Values.Settings.CleanupJobTimeoutSeconds, 10*time.Minute)
	if jobTimeout > 0 {
		var cancel context.CancelFunc
		jobCtx, cancel = context.WithTimeout(ctx, jobTimeout)
		defer cancel()
	}
	// Single-flight guard: skip if another cleanup run is still active.
	tracker := NewCronTracker(c.Database)
	done, skip, err := tracker.Start(jobCtx, "snapshot_cleanup")
	if err != nil {
		return err
	}
	if skip {
		logger.Warn("Snapshot cleanup skipped because a previous run is still active")
		return nil
	}
	// done() records the job outcome via the named return err.
	defer func() { done(err) }()
	if err := db.CheckMigrationState(jobCtx, c.Database.DB()); err != nil {
		return err
	}
	startedAt := time.Now()
	defer func() {
		logger.Info("Snapshot cleanup job finished", "duration", time.Since(startedAt))
	}()
	// Retention: hourly tables are kept hourlyMaxDays days, daily summary
	// tables dailyMaxMonths months; cutoffs are compared at day granularity.
	now := time.Now()
	hourlyMaxDays := intWithDefault(c.Settings.Values.Settings.HourlySnapshotMaxAgeDays, 60)
	dailyMaxMonths := intWithDefault(c.Settings.Values.Settings.DailySnapshotMaxAgeMonths, 12)
	hourlyCutoff := now.AddDate(0, 0, -hourlyMaxDays)
	dailyCutoff := now.AddDate(0, -dailyMaxMonths, 0)
	logger.Info("Starting snapshot cleanup",
		"now", now,
		"hourly_cutoff", truncateDate(hourlyCutoff),
		"daily_cutoff", truncateDate(dailyCutoff),
		"hourly_max_age_days", hourlyMaxDays,
		"daily_max_age_months", dailyMaxMonths,
	)
	dbConn := c.Database.DB()
	hourlyTables, err := report.ListTablesByPrefix(ctx, c.Database, "inventory_hourly_")
	if err != nil {
		return err
	}
	logger.Info("Snapshot cleanup hourly scan", "tables_found", len(hourlyTables))
	removedHourly := 0
	scannedHourly := 0
	for _, table := range hourlyTables {
		// Daily-summary tables can match the hourly prefix scan; skip them here.
		if strings.HasPrefix(table, "inventory_daily_summary_") {
			continue
		}
		tableDate, ok := parseSnapshotDate(table, "inventory_hourly_", "epoch")
		if !ok {
			continue
		}
		scannedHourly++
		if tableDate.Before(truncateDate(hourlyCutoff)) {
			if err := dropSnapshotTable(ctx, dbConn, table); err != nil {
				c.Logger.Error("failed to drop hourly snapshot table", "error", err, "table", table)
			} else {
				removedHourly++
				// Registry entry is removed only after a successful drop.
				if err := report.DeleteSnapshotRecord(ctx, c.Database, table); err != nil {
					c.Logger.Warn("failed to remove hourly snapshot registry entry", "error", err, "table", table)
				}
			}
		}
	}
	dailyTables, err := report.ListTablesByPrefix(ctx, c.Database, "inventory_daily_summary_")
	if err != nil {
		return err
	}
	logger.Info("Snapshot cleanup daily scan", "tables_found", len(dailyTables))
	removedDaily := 0
	scannedDaily := 0
	for _, table := range dailyTables {
		tableDate, ok := parseSnapshotDate(table, "inventory_daily_summary_", "20060102")
		if !ok {
			continue
		}
		scannedDaily++
		if tableDate.Before(truncateDate(dailyCutoff)) {
			if err := dropSnapshotTable(ctx, dbConn, table); err != nil {
				c.Logger.Error("failed to drop daily snapshot table", "error", err, "table", table)
			} else {
				removedDaily++
				if err := report.DeleteSnapshotRecord(ctx, c.Database, table); err != nil {
					c.Logger.Warn("failed to remove daily snapshot registry entry", "error", err, "table", table)
				}
			}
		}
	}
	c.Logger.Info("Finished snapshot cleanup",
		"hourly_tables_scanned", scannedHourly,
		"daily_tables_scanned", scannedDaily,
		"removed_hourly_tables", removedHourly,
		"removed_daily_tables", removedDaily,
		"hourly_max_age_days", hourlyMaxDays,
		"daily_max_age_months", dailyMaxMonths,
	)
	return nil
}

// hourlyInventoryTableName derives the hourly snapshot table name from the
// snapshot time's Unix epoch, validated through db.SafeTableName.
func hourlyInventoryTableName(t time.Time) (string, error) {
	return db.SafeTableName(fmt.Sprintf("inventory_hourly_%d", t.Unix()))
}

// ensureDailyInventoryTable creates the snapshot table if needed, ensures its
// RowId column (driver-dependent), and adds the VcpuCount/RamGB columns.
func ensureDailyInventoryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
	if err := db.EnsureSnapshotTable(ctx, dbConn, tableName); err != nil {
		return err
	}
	if err := ensureSnapshotRowID(ctx, dbConn, tableName); err != nil {
		return err
	}
	return db.EnsureColumns(ctx, dbConn, tableName, []db.ColumnDef{
		{Name: "VcpuCount", Type: "BIGINT"},
		{Name: "RamGB", Type: "BIGINT"},
	})
}

// buildUnionQuery builds a UNION ALL of SELECTs over the given tables, with an
// optional shared WHERE clause. Above maxCompoundTerms tables, the SELECTs are
// grouped into nested sub-unions to stay under compound-statement limits
// (SQLite's default compound SELECT term limit is 500 — hence 450 with margin;
// TODO confirm that rationale).
func buildUnionQuery(tables []string, columns []string, whereClause string) (string, error) {
	if len(tables) == 0 {
		return "", fmt.Errorf("no tables provided for union")
	}
	if len(columns) == 0 {
		return "", fmt.Errorf("no columns provided for union")
	}
	columnList := strings.Join(columns, ", ")
	const maxCompoundTerms = 450
	if len(tables) <= maxCompoundTerms {
		// Small case: one flat UNION ALL.
		queries := make([]string, 0, len(tables))
		for _, table := range tables {
			safeName, err := db.SafeTableName(table)
			if err != nil {
				return "", err
			}
			query := fmt.Sprintf("SELECT %s FROM %s", columnList, safeName)
			if whereClause != "" {
				query = fmt.Sprintf("%s WHERE %s", query, whereClause)
			}
			queries = append(queries, query)
		}
		if len(queries) == 0 {
			return "", fmt.Errorf("no valid tables provided for union")
		}
		union := strings.Join(queries, "\nUNION ALL\n")
		return fmt.Sprintf("SELECT * FROM (%s) AS union_all", union), nil
	}
	// Large case: batch the tables, wrap each batch as a subquery, then union
	// the batches.
	batches := make([]string, 0, (len(tables)/maxCompoundTerms)+1)
	batchIndex := 0
	for start := 0; start < len(tables); start += maxCompoundTerms {
		end := start + maxCompoundTerms
		if end > len(tables) {
			end = len(tables)
		}
		queries := make([]string, 0, end-start)
		for _, table := range tables[start:end] {
			safeName, err := db.SafeTableName(table)
			if err != nil {
				return "", err
			}
			query := fmt.Sprintf("SELECT %s FROM %s", columnList, safeName)
			if whereClause != "" {
				query = fmt.Sprintf("%s WHERE %s", query, whereClause)
			}
			queries = append(queries, query)
		}
		if len(queries) == 0 {
			continue
		}
		subUnion := strings.Join(queries, "\nUNION ALL\n")
		batches = append(batches, fmt.Sprintf("SELECT * FROM (%s) AS batch_%d", subUnion, batchIndex))
		batchIndex++
	}
	if len(batches) == 0 {
		return "", fmt.Errorf("no valid tables provided for union")
	}
	outerUnion := strings.Join(batches, "\nUNION ALL\n")
	return fmt.Sprintf("SELECT * FROM (%s) AS union_batches", outerUnion), nil
}

// templateExclusionFilter returns a SQL predicate that excludes rows flagged
// as templates, tolerating TEXT/boolean/integer representations of the flag.
func templateExclusionFilter() string {
	return `COALESCE(CAST("IsTemplate" AS TEXT), '') NOT IN ('TRUE', 'true', '1')`
}

// parseSnapshotDate extracts the timestamp encoded in a snapshot table name.
// The special layout "epoch" parses the suffix as Unix seconds; any other
// layout is handed to time.Parse. Returns false when the prefix or the
// timestamp does not match.
func parseSnapshotDate(table string, prefix string, layout string) (time.Time, bool) {
	if !strings.HasPrefix(table, prefix) {
		return time.Time{}, false
	}
	suffix := strings.TrimPrefix(table, prefix)
	if layout == "epoch" {
		epoch, err := strconv.ParseInt(suffix, 10, 64)
		if err != nil {
			return time.Time{}, false
		}
		return time.Unix(epoch, 0), true
	}
	parsed, err := time.Parse(layout, suffix)
	if err != nil {
		return time.Time{}, false
	}
	return parsed, true
}

// truncateDate returns t with the time-of-day zeroed, preserving t's location.
func truncateDate(t time.Time) time.Time {
	return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
}

// filterSnapshotsWithRows drops snapshot records whose tables hold no rows:
// a positive SnapshotCount is trusted, zero is skipped outright, and a
// negative/unset count triggers a live row probe. NOTE: filters in place,
// reusing the input slice's backing array.
func filterSnapshotsWithRows(ctx context.Context, dbConn *sqlx.DB, snapshots []report.SnapshotRecord) []report.SnapshotRecord {
	filtered := snapshots[:0]
	log := loggerFromCtx(ctx, nil)
	for _, snapshot := range snapshots {
		switch {
		case snapshot.SnapshotCount > 0:
			filtered = append(filtered, snapshot)
		case snapshot.SnapshotCount == 0:
			if log != nil {
				log.Debug("skipping snapshot table with zero count", "table", snapshot.TableName, "snapshot_time", snapshot.SnapshotTime)
			}
		default:
			// Unknown count (negative sentinel): probe the table directly.
			if rowsExist, err := db.TableHasRows(ctx, dbConn, snapshot.TableName); err == nil && rowsExist {
				filtered = append(filtered, snapshot)
			} else if log != nil {
				log.Debug("snapshot table probe empty or failed", "table", snapshot.TableName, "snapshot_time", snapshot.SnapshotTime, "error", err)
			}
		}
	}
	return filtered
}

// filterRecordsInRange keeps records with start <= SnapshotTime < end
// (half-open interval). Filters in place, reusing the input's backing array.
func filterRecordsInRange(records []report.SnapshotRecord, start, end time.Time) []report.SnapshotRecord {
	filtered := records[:0]
	for _, r := range records {
		if !r.SnapshotTime.Before(start) && r.SnapshotTime.Before(end) {
			filtered = append(filtered, r)
		}
	}
	return filtered
}

// summaryUnionColumns lists the quoted column names selected when UNION-ing
// hourly snapshot tables (see buildUnionQuery).
var summaryUnionColumns = []string{
	`"InventoryId"`,
	`"Name"`,
	`"Vcenter"`,
	`"VmId"`,
	`"EventKey"`,
	`"CloudId"`,
	`"CreationTime"`,
	`"DeletionTime"`,
	`"ResourcePool"`,
	`"Datacenter"`,
	`"Cluster"`,
	`"Folder"`,
	`"ProvisionedDisk"`,
	`"VcpuCount"`,
	`"RamGB"`,
	`"IsTemplate"`,
	`"PoweredOn"`,
	`"SrmPlaceholder"`,
	`"VmUuid"`,
	`"SnapshotTime"`,
}

// monthlyUnionColumns are the fields needed from daily summaries for monthly aggregation/refinement.
var monthlyUnionColumns = []string{
	`"InventoryId"`,
	`"Name"`,
	`"Vcenter"`,
	`"VmId"`,
	`"EventKey"`,
	`"CloudId"`,
	`"CreationTime"`,
	`"DeletionTime"`,
	`"ResourcePool"`,
	`"Datacenter"`,
	`"Cluster"`,
	`"Folder"`,
	`"ProvisionedDisk"`,
	`"VcpuCount"`,
	`"RamGB"`,
	`"IsTemplate"`,
	`"PoweredOn"`,
	`"SrmPlaceholder"`,
	`"VmUuid"`,
	`"SamplesPresent"`,
	`"AvgVcpuCount"`,
	`"AvgRamGB"`,
	`"AvgProvisionedDisk"`,
	`"AvgIsPresent"`,
	`"PoolTinPct"`,
	`"PoolBronzePct"`,
	`"PoolSilverPct"`,
	`"PoolGoldPct"`,
	`"Tin"`,
	`"Bronze"`,
	`"Silver"`,
	`"Gold"`,
	`"SnapshotTime"`,
}

// ensureSnapshotRowID guarantees a RowId column on Postgres (adding and
// backfilling a BIGSERIAL when missing). SQLite needs nothing (it has an
// implicit rowid). NOTE(review): drivers other than pgx/postgres/sqlite fall
// through and silently return nil — confirm that is intended.
func ensureSnapshotRowID(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
	driver := strings.ToLower(dbConn.DriverName())
	switch driver {
	case "pgx", "postgres":
		hasColumn, err := db.ColumnExists(ctx, dbConn, tableName, "RowId")
		if err != nil {
			return err
		}
		if !hasColumn {
			if err := db.AddColumnIfMissing(ctx, dbConn, tableName, db.ColumnDef{Name: "RowId", Type: "BIGSERIAL"}); err != nil {
				return err
			}
		}
		if err := db.BackfillSerialColumn(ctx, dbConn, tableName, "RowId"); err != nil {
			return err
		}
	case "sqlite":
		return nil
	}
	return nil
}

// nullInt64ToInt unwraps a sql.NullInt64, treating NULL as 0.
func nullInt64ToInt(value sql.NullInt64) int64 {
	if value.Valid {
		return value.Int64
	}
	return 0
}

// nullFloat64ToFloat unwraps a sql.NullFloat64, treating NULL as 0.
func nullFloat64ToFloat(value sql.NullFloat64) float64 {
	if value.Valid {
		return value.Float64
	}
	return 0
}

// intWithDefault returns value, or fallback when value is zero or negative.
func intWithDefault(value int, fallback int) int {
	if value <= 0 {
		return fallback
	}
	return value
}

// durationFromSeconds converts a configured second count to a Duration,
// using fallback when the setting is zero or negative.
func durationFromSeconds(seconds int, fallback time.Duration) time.Duration {
	if seconds > 0 {
		return time.Duration(seconds) * time.Second
	}
	return fallback
}

// normalizeResourcePool canonicalizes the tier pool names (Tin/Bronze/Silver/
// Gold) regardless of input casing; any other value is returned trimmed.
func normalizeResourcePool(value string) string {
	trimmed := strings.TrimSpace(value)
	if trimmed == "" {
		return ""
	}
	lower := strings.ToLower(trimmed)
	canonical := map[string]string{
		"tin":    "Tin",
		"bronze": "Bronze",
		"silver": "Silver",
		"gold":   "Gold",
	}
	if val, ok := canonical[lower]; ok {
		return val
	}
	return trimmed
}

// reportsDir returns the configured reports directory, falling back to the
// default path when settings are absent or blank.
func (c *CronTask) reportsDir() string {
	if c.Settings != nil && c.Settings.Values != nil {
		if dir := strings.TrimSpace(c.Settings.Values.Settings.ReportsDir); dir != "" {
			return dir
		}
	}
	return "/var/lib/vctp/reports"
}

// generateReport writes a report file for tableName into the reports dir,
// logging start/outcome at Debug and returning the save error (if any).
func (c *CronTask) generateReport(ctx context.Context, tableName string) error {
	dest := c.reportsDir()
	start := time.Now()
	c.Logger.Debug("Report generation start", "table", tableName, "dest", dest)
	filename, err := report.SaveTableReport(c.Logger, c.Database, ctx, tableName, dest)
	if err == nil {
		c.Logger.Debug("Report generation complete", "table", tableName, "file", filename, "duration", time.Since(start))
	} else {
		c.Logger.Debug("Report generation failed", "table", tableName, "duration", time.Since(start), "error", err)
	}
	return err
}

// snapshotFromVM builds an InventorySnapshotRow for one live VM. Values are
// taken from the vCenter object first, then backfilled from the inventory
// record (inv, may be nil), then from the host/folder/resource-pool lookups,
// and finally from direct vCenter queries as a last resort.
func snapshotFromVM(vmObject *mo.VirtualMachine, vc *vcenter.Vcenter, snapshotTime time.Time, inv *queries.Inventory, hostLookup map[string]vcenter.HostLookup, folderLookup vcenter.FolderLookup, rpLookup map[string]string) (InventorySnapshotRow, error) {
	if vmObject == nil {
		return InventorySnapshotRow{}, fmt.Errorf("missing VM object")
	}
	row := InventorySnapshotRow{
		Name:         vmObject.Name,
		Vcenter:      vc.Vurl,
		VmId:         sql.NullString{String: vmObject.Reference().Value, Valid: vmObject.Reference().Value != ""},
		SnapshotTime: snapshotTime.Unix(),
	}
	if inv != nil {
		row.InventoryId = sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0}
		row.EventKey = inv.EventKey
		row.CloudId = inv.CloudId
		row.DeletionTime = inv.DeletionTime
	}
	if vmObject.Config != nil {
		row.VmUuid = sql.NullString{String: vmObject.Config.Uuid, Valid: vmObject.Config.Uuid != ""}
		if !vmObject.Config.CreateDate.IsZero() {
			row.CreationTime = sql.NullInt64{Int64: vmObject.Config.CreateDate.Unix(), Valid: true}
		}
		row.VcpuCount = sql.NullInt64{Int64: int64(vmObject.Config.Hardware.NumCPU), Valid: vmObject.Config.Hardware.NumCPU > 0}
		// MemoryMB is reported in MiB; stored as whole GiB (integer division).
		row.RamGB = sql.NullInt64{Int64: int64(vmObject.Config.Hardware.MemoryMB) / 1024, Valid: vmObject.Config.Hardware.MemoryMB > 0}
		// Sum the capacity of all virtual disks attached to the VM.
		totalDiskBytes := int64(0)
		for _, device := range vmObject.Config.Hardware.Device {
			if disk, ok := device.(*types.VirtualDisk); ok {
				totalDiskBytes += disk.CapacityInBytes
			}
		}
		if totalDiskBytes > 0 {
			// Integer division to whole GiB before the float conversion.
			row.ProvisionedDisk = sql.NullFloat64{Float64: float64(totalDiskBytes / 1024 / 1024 / 1024), Valid: true}
		}
		// SRM placeholder VMs are managed by the vCenter DR extension.
		if vmObject.Config.ManagedBy != nil && vmObject.Config.ManagedBy.ExtensionKey == "com.vmware.vcDr" && vmObject.Config.ManagedBy.Type == "placeholderVm" {
			row.SrmPlaceholder = "TRUE"
		} else {
			row.SrmPlaceholder = "FALSE"
		}
		if vmObject.Config.Template {
			row.IsTemplate = "TRUE"
		} else {
			row.IsTemplate = "FALSE"
		}
	}
	// Any state other than poweredOff (including suspended) counts as on.
	if vmObject.Runtime.PowerState == "poweredOff" {
		row.PoweredOn = "FALSE"
	} else {
		row.PoweredOn = "TRUE"
	}
	if inv != nil {
		// Backfill from the inventory record for anything vCenter didn't provide.
		if inv.ResourcePool.Valid {
			row.ResourcePool = sql.NullString{String: normalizeResourcePool(inv.ResourcePool.String), Valid: true}
		}
		row.Datacenter = inv.Datacenter
		row.Cluster = inv.Cluster
		row.Folder = inv.Folder
		if !row.CreationTime.Valid {
			row.CreationTime = inv.CreationTime
		}
		if !row.ProvisionedDisk.Valid {
			row.ProvisionedDisk = inv.ProvisionedDisk
		}
		if !row.VcpuCount.Valid {
			row.VcpuCount = inv.InitialVcpus
		}
		if !row.RamGB.Valid && inv.InitialRam.Valid {
			// InitialRam appears to be MiB (divided to GiB) — TODO confirm.
			row.RamGB = sql.NullInt64{Int64: inv.InitialRam.Int64 / 1024, Valid: inv.InitialRam.Int64 > 0}
		}
		if row.IsTemplate == "" {
			row.IsTemplate = boolStringFromInterface(inv.IsTemplate)
		}
		if row.PoweredOn == "" {
			row.PoweredOn = boolStringFromInterface(inv.PoweredOn)
		}
		if row.SrmPlaceholder == "" {
			row.SrmPlaceholder = boolStringFromInterface(inv.SrmPlaceholder)
		}
		if !row.VmUuid.Valid {
			row.VmUuid = inv.VmUuid
		}
	}
	if row.ResourcePool.String == "" && vmObject.ResourcePool != nil {
		if rpLookup != nil {
			if rpName, ok := rpLookup[vmObject.ResourcePool.Value]; ok {
				row.ResourcePool = sql.NullString{String: normalizeResourcePool(rpName), Valid: rpName != ""}
			}
		}
	}
	if row.Folder.String == "" {
		if folderPath, ok := vc.GetVMFolderPathFromLookup(*vmObject, folderLookup); ok {
			row.Folder = sql.NullString{String: folderPath, Valid: folderPath != ""}
		} else {
			// Unable to resolve folder path from lookup; leave empty.
		}
	}
	if vmObject.Runtime.Host != nil && hostLookup != nil {
		if lookup, ok := hostLookup[vmObject.Runtime.Host.Value]; ok {
			if row.Cluster.String == "" && lookup.Cluster != "" {
				row.Cluster = sql.NullString{String: lookup.Cluster, Valid: true}
			}
			if row.Datacenter.String == "" && lookup.Datacenter != "" {
				row.Datacenter = sql.NullString{String: lookup.Datacenter, Valid: true}
			}
		}
	}
	// Last resort: live vCenter queries for cluster/datacenter.
	if row.Cluster.String == "" && vmObject.Runtime.Host != nil {
		if clusterName, err := vc.GetClusterFromHost(vmObject.Runtime.Host); err == nil {
			row.Cluster = sql.NullString{String: clusterName, Valid: clusterName != ""}
		} else if vc.Logger != nil {
			vc.Logger.Warn("failed to resolve cluster from host", "vm_id", vmObject.Reference().Value, "error", err)
		}
	}
	if row.Datacenter.String == "" {
		if dcName, err := vc.GetDatacenterForVM(*vmObject); err == nil {
			row.Datacenter = sql.NullString{String: dcName, Valid: dcName != ""}
		}
	}
	return row, nil
}

// snapshotFromInventory builds a snapshot row purely from an inventory record,
// used when the VM is absent from the live vCenter listing.
func snapshotFromInventory(inv queries.Inventory, snapshotTime time.Time) InventorySnapshotRow {
	return InventorySnapshotRow{
		InventoryId:     sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0},
		Name:            inv.Name,
		Vcenter:         inv.Vcenter,
		VmId:            inv.VmId,
		EventKey:        inv.EventKey,
		CloudId:         inv.CloudId,
		CreationTime:    inv.CreationTime,
		DeletionTime:    inv.DeletionTime,
		ResourcePool:    sql.NullString{String: normalizeResourcePool(inv.ResourcePool.String), Valid: inv.ResourcePool.Valid},
		Datacenter:      inv.Datacenter,
		Cluster:         inv.Cluster,
		Folder:          inv.Folder,
		ProvisionedDisk: inv.ProvisionedDisk,
		VcpuCount:       inv.InitialVcpus,
		RamGB:           sql.NullInt64{Int64: inv.InitialRam.Int64 / 1024, Valid: inv.InitialRam.Valid && inv.InitialRam.Int64 > 0},
		IsTemplate:      boolStringFromInterface(inv.IsTemplate),
		PoweredOn:       boolStringFromInterface(inv.PoweredOn),
		SrmPlaceholder:  boolStringFromInterface(inv.SrmPlaceholder),
		VmUuid:          inv.VmUuid,
		SnapshotTime:    snapshotTime.Unix(),
	}
}

// loadInventoryMaps loads the inventory rows for a vCenter and indexes them by
// VM id, UUID, and name. Returns: rows, byVmID, byUuid, byName.
func loadInventoryMaps(ctx context.Context, dbConn *sqlx.DB, url string) ([]queries.Inventory, map[string]queries.Inventory, map[string]queries.Inventory, map[string]queries.Inventory, error) {
	inventoryRows, err := queries.New(dbConn).GetInventoryByVcenter(ctx, url)
	if err != nil {
		return nil, nil, nil, nil, fmt.Errorf("unable to query inventory table: %w", err)
	}
	inventoryByVmID := make(map[string]queries.Inventory, len(inventoryRows))
	inventoryByUuid := make(map[string]queries.Inventory, len(inventoryRows))
	inventoryByName := make(map[string]queries.Inventory, len(inventoryRows))
	for _, inv := range inventoryRows {
		if inv.VmId.Valid {
			inventoryByVmID[inv.VmId.String] = inv
		}
		if inv.VmUuid.Valid {
			inventoryByUuid[inv.VmUuid.String] = inv
		}
		if inv.Name != "" {
			inventoryByName[inv.Name] = inv
		}
	}
	return inventoryRows, inventoryByVmID, inventoryByUuid, inventoryByName, nil
}

// prepareDeletionCandidates scans the inventory for VMs no longer present in
// the live snapshot (matched by VM id, then UUID, then name). Missing VMs are
// marked deleted in the inventory (if not already), recorded in the lifecycle
// cache, and returned as candidates. Returns the count of missing VMs, whether
// any new deletion was marked, and the candidate list.
func prepareDeletionCandidates(ctx context.Context, log *slog.Logger, dbConn *sqlx.DB, url string, inventoryRows []queries.Inventory, presentSnapshots map[string]InventorySnapshotRow, presentByUuid, presentByName map[string]struct{}, startTime time.Time) (int, bool, []deletionCandidate) {
	candidates := make([]deletionCandidate, 0)
	missingCount := 0
	deletionsMarked := false
	for _, inv := range inventoryRows {
		log.Debug("checking inventory for deletions", "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "name", inv.Name)
		// vCLS agent VMs are vSphere-managed; never treat them as deletions.
		if strings.HasPrefix(inv.Name, "vCLS-") {
			continue
		}
		vmID := inv.VmId.String
		uuid := ""
		if inv.VmUuid.Valid {
			uuid = inv.VmUuid.String
		}
		name := inv.Name
		// Presence check in order of identifier strength: id, UUID, name.
		found := false
		if vmID != "" {
			if _, ok := presentSnapshots[vmID]; ok {
				found = true
			}
		}
		if !found && uuid != "" {
			if _, ok := presentByUuid[uuid]; ok {
				found = true
			}
		}
		if !found && name != "" {
			if _, ok := presentByName[name]; ok {
				found = true
			}
		}
		if found {
			continue
		}
		row := snapshotFromInventory(inv, startTime)
		if !row.DeletionTime.Valid {
			// First time we see this VM missing: stamp the deletion time.
			deletionTime := startTime.Unix()
			row.DeletionTime = sql.NullInt64{Int64: deletionTime, Valid: true}
			if err := queries.New(dbConn).InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{
				DeletionTime:   row.DeletionTime,
				VmId:           inv.VmId,
				DatacenterName: inv.Datacenter,
			}); err != nil {
				log.Warn("failed to mark inventory record deleted", "error", err, "vm_id", row.VmId.String)
			}
			log.Debug("Marked VM as deleted",
				"name", inv.Name,
				"vm_id", inv.VmId.String,
				"vm_uuid", inv.VmUuid.String,
				"vcenter", url,
				"snapshot_time", startTime)
			deletionsMarked = true
		}
		clusterName := ""
		if inv.Cluster.Valid {
			clusterName = inv.Cluster.String
		}
		candidates = append(candidates, deletionCandidate{
			vmID:       vmID,
			vmUUID:     uuid,
			name:       name,
			cluster:    clusterName,
			datacenter: inv.Datacenter,
		})
		// Lifecycle cache updates are best-effort; failures are logged only.
		if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, inv.Name, clusterName, startTime.Unix()); err != nil {
			log.Warn("failed to mark vm deleted in lifecycle cache",
				"vcenter", url,
				"vm_id", inv.VmId,
				"vm_uuid", inv.VmUuid,
				"error", err)
		}
		if err := db.UpsertVmLifecycleCache(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, inv.Name, clusterName, startTime); err != nil {
			log.Warn("failed to upsert vm lifecycle cache (deletion path)",
				"vcenter", url,
				"vm_id", inv.VmId,
				"vm_uuid", inv.VmUuid,
				"name", inv.Name,
				"error", err)
		}
		missingCount++
	}
	return missingCount, deletionsMarked, candidates
}

// buildPresentSnapshots converts vCenter VM objects into snapshot rows and aggregates totals.
func (c *CronTask) buildPresentSnapshots(ctx context.Context, dbConn *sqlx.DB, vc *vcenter.Vcenter, vcVms []mo.VirtualMachine, startTime time.Time, url string, inventoryByVmID map[string]queries.Inventory, hostLookup map[string]vcenter.HostLookup, folderLookup map[string]string, rpLookup map[string]string) (map[string]InventorySnapshotRow, map[string]struct{}, map[string]struct{}, snapshotTotals) { log := loggerFromCtx(ctx, c.Logger) presentSnapshots := make(map[string]InventorySnapshotRow, len(vcVms)) presentByUuid := make(map[string]struct{}, len(vcVms)) presentByName := make(map[string]struct{}, len(vcVms)) totals := snapshotTotals{} for _, vm := range vcVms { if strings.HasPrefix(vm.Name, "vCLS-") { continue } if vm.Config != nil && vm.Config.Template { continue } var inv *queries.Inventory if existing, ok := inventoryByVmID[vm.Reference().Value]; ok { existingCopy := existing inv = &existingCopy } row, err := snapshotFromVM(&vm, vc, startTime, inv, hostLookup, folderLookup, rpLookup) if err != nil { log.Error("unable to build snapshot for VM", "vm_id", vm.Reference().Value, "error", err) continue } if err := db.UpsertVmIdentity(ctx, dbConn, url, row.VmId, row.VmUuid, row.Name, row.Cluster, startTime); err != nil { log.Warn("failed to upsert vm identity", "vcenter", url, "vm_id", row.VmId, "vm_uuid", row.VmUuid, "name", row.Name, "error", err) } clusterName := "" if row.Cluster.Valid { clusterName = row.Cluster.String } if err := db.UpsertVmLifecycleCache(ctx, dbConn, url, row.VmId.String, row.VmUuid.String, row.Name, clusterName, startTime); err != nil { log.Warn("failed to upsert vm lifecycle cache", "vcenter", url, "vm_id", row.VmId, "vm_uuid", row.VmUuid, "name", row.Name, "error", err) } presentSnapshots[vm.Reference().Value] = row if row.VmUuid.Valid { presentByUuid[row.VmUuid.String] = struct{}{} } if row.Name != "" { presentByName[row.Name] = struct{}{} } totals.VmCount++ totals.VcpuTotal += nullInt64ToInt(row.VcpuCount) totals.RamTotal += 
nullInt64ToInt(row.RamGB) totals.DiskTotal += nullFloat64ToFloat(row.ProvisionedDisk) } return presentSnapshots, presentByUuid, presentByName, totals } // initVcenterResources logs into vCenter, fetches VMs, builds lookups, and returns a cleanup function for logout. func (c *CronTask) initVcenterResources(ctx context.Context, log *slog.Logger, url string, startTime, started time.Time) (*vcenter.Vcenter, vcenterResources, func(), error) { res := vcenterResources{} vc := vcenter.New(c.Logger, c.VcCreds) if err := vc.Login(url); err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil { log.Warn("failed to record snapshot run", "url", url, "error", upErr) } return nil, res, nil, fmt.Errorf("unable to connect to vcenter: %w", err) } cleanup := func() { logCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := vc.Logout(logCtx); err != nil { log.Warn("vcenter logout failed", "url", url, "error", err) } else { log.Debug("vcenter logout succeeded", "url", url) } } vms, err := vc.GetAllVMsWithProps() if err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil { log.Warn("failed to record snapshot run", "url", url, "error", upErr) } cleanup() return nil, res, nil, fmt.Errorf("unable to get VMs from vcenter: %w", err) } log.Debug("retrieved VMs from vcenter", "url", url, "vm_count", len(vms)) if err := db.EnsureVmIdentityTables(ctx, c.Database.DB()); err != nil { log.Warn("failed to ensure vm identity tables", "error", err) } hostLookup, err := vc.BuildHostLookup() if err != nil { log.Warn("failed to build host lookup", "url", url, "error", err) hostLookup = nil } else { log.Debug("built host lookup", "url", url, "hosts", len(hostLookup)) } folderLookup, err := vc.BuildFolderPathLookup() 
if err != nil { log.Warn("failed to build folder lookup", "url", url, "error", err) folderLookup = nil } else { log.Debug("built folder lookup", "url", url, "folders", len(folderLookup)) } rpLookup, err := vc.BuildResourcePoolLookup() if err != nil { log.Warn("failed to build resource pool lookup", "url", url, "error", err) rpLookup = nil } else { log.Debug("built resource pool lookup", "url", url, "pools", len(rpLookup)) } res.vms = vms res.hostLookup = hostLookup res.folderLookup = folderLookup res.rpLookup = rpLookup return vc, res, cleanup, nil } func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTime time.Time, tableName string, url string) error { log := c.Logger.With("vcenter", url) ctx = context.WithValue(ctx, ctxLoggerKey{}, log) started := time.Now() log.Debug("connecting to vcenter for hourly snapshot", "url", url) vc, resources, cleanup, err := c.initVcenterResources(ctx, log, url, startTime, started) if err != nil { return err } defer cleanup() vcVms := resources.vms hostLookup := resources.hostLookup folderLookup := resources.folderLookup rpLookup := resources.rpLookup inventoryRows, inventoryByVmID, inventoryByUuid, inventoryByName, err := loadInventoryMaps(ctx, c.Database.DB(), url) if err != nil { return err } dbConn := c.Database.DB() presentSnapshots, presentByUuid, presentByName, totals := c.buildPresentSnapshots(ctx, dbConn, vc, vcVms, startTime, url, inventoryByVmID, hostLookup, folderLookup, rpLookup) deletionsMarked := false candidates := make([]deletionCandidate, 0) var prevVmCount sql.NullInt64 countQuery := `SELECT "VmCount" FROM vcenter_totals WHERE "Vcenter" = ? 
ORDER BY "SnapshotTime" DESC LIMIT 1` countQuery = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), countQuery) if err := dbConn.QueryRowContext(ctx, countQuery, url).Scan(&prevVmCount); err != nil && !errors.Is(err, sql.ErrNoRows) { c.Logger.Warn("failed to read previous vcenter totals", "vcenter", url, "error", err) } c.Logger.Debug("hourly snapshot rows prepared", "vcenter", url, "rows", len(presentSnapshots)) batch := make([]InventorySnapshotRow, 0, len(presentSnapshots)+len(inventoryRows)) for _, row := range presentSnapshots { batch = append(batch, row) } log.Debug("checking inventory for missing VMs") missingCount, deletionsMarked, candidates := prepareDeletionCandidates(ctx, log, dbConn, url, inventoryRows, presentSnapshots, presentByUuid, presentByName, startTime) newCount := 0 prevTableName := "" reportTables := make(map[string]struct{}) // If deletions detected, refine deletion time using vCenter events in a small window. if missingCount > 0 { freq := time.Duration(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds) * time.Second if freq <= 0 { freq = time.Hour } begin := startTime.Add(-4 * freq) end := startTime events, err := vc.FindVmDeletionEvents(ctx, begin, end) if err != nil { log.Warn("failed to fetch vcenter deletion events", "vcenter", url, "error", err) } else { log.Debug("fetched vcenter deletion events", "vcenter", url, "count", len(events), "window_start_local", begin, "window_end_local", end, "window_minutes", end.Sub(begin).Minutes(), "window_start_utc", begin.UTC(), "window_end_utc", end.UTC()) for _, cand := range candidates { if t, ok := events[cand.vmID]; ok { delTs := sql.NullInt64{Int64: t.Unix(), Valid: true} if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ DeletionTime: delTs, VmId: sql.NullString{String: cand.vmID, Valid: cand.vmID != ""}, DatacenterName: cand.datacenter, }); err != nil { log.Warn("failed to update inventory deletion time from event", "vm_id", cand.vmID, 
"vm_uuid", cand.vmUUID, "vcenter", url, "error", err) } if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, cand.vmID, cand.vmUUID, cand.name, cand.cluster, t.Unix()); err != nil { log.Warn("failed to refine lifecycle cache deletion time", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err) } if snapRow, snapTable, found := findVMInHourlySnapshots(ctx, dbConn, url, cand.vmID); found { vmUUID := cand.vmUUID if vmUUID == "" && snapRow.VmUuid.Valid { vmUUID = snapRow.VmUuid.String } name := cand.name if name == "" { name = snapRow.Name } if rowsAffected, err := updateDeletionTimeInSnapshot(ctx, dbConn, snapTable, url, cand.vmID, vmUUID, name, delTs.Int64); err != nil { log.Warn("failed to update hourly snapshot deletion time from event", "table", snapTable, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "error", err) } else if rowsAffected > 0 { reportTables[snapTable] = struct{}{} deletionsMarked = true log.Debug("updated hourly snapshot deletion time from event", "table", snapTable, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "event_time", t) } } log.Info("refined deletion time from vcenter event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "vcenter", url, "event_time", t) } } } } // If VM count dropped vs prior totals but we didn't mark missing, still look for events (best-effort logging). 
if missingCount == 0 && prevVmCount.Valid && prevVmCount.Int64 > int64(totals.VmCount) { freq := time.Duration(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds) * time.Second if freq <= 0 { freq = time.Hour } begin := startTime.Add(-2 * freq) end := startTime events, err := vc.FindVmDeletionEvents(ctx, begin, end) if err != nil { log.Warn("count-drop: failed to fetch vcenter deletion events", "vcenter", url, "error", err, "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount) } else { log.Info("count-drop: deletion events fetched", "vcenter", url, "events", len(events), "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount, "window_start", begin, "window_end", end) } } log.Debug("inserting hourly snapshot batch", "vcenter", url, "rows", len(batch)) if err := insertHourlyCache(ctx, dbConn, batch); err != nil { log.Warn("failed to insert hourly cache rows", "vcenter", url, "error", err) } if err := insertHourlyBatch(ctx, dbConn, tableName, batch); err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, err) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil { log.Warn("failed to record snapshot run", "url", url, "error", upErr) } return err } // Record per-vCenter totals snapshot. if err := db.InsertVcenterTotals(ctx, dbConn, url, startTime, totals.VmCount, totals.VcpuTotal, totals.RamTotal); err != nil { slog.Warn("failed to insert vcenter totals", "vcenter", url, "snapshot_time", startTime.Unix(), "error", err) } // Discover previous snapshots once per run (serial) to avoid concurrent probes across vCenters. 
var prevTableTouched bool prevTableName, newCount, missingCount, prevTableTouched = c.compareWithPreviousSnapshot(ctx, dbConn, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName, missingCount) if prevTableTouched && prevTableName != "" { reportTables[prevTableName] = struct{}{} deletionsMarked = true } // If VM count dropped versus totals and we still haven't marked missing, try another comparison + wider event window. if missingCount == 0 && prevVmCount.Valid && prevVmCount.Int64 > int64(totals.VmCount) { // Fallback: locate a previous table only if we didn't already find one. if prevTableName == "" { if prevTable, err := latestHourlySnapshotBefore(ctx, dbConn, startTime, loggerFromCtx(ctx, c.Logger)); err == nil && prevTable != "" { moreMissing, tableUpdated := c.markMissingFromPrevious(ctx, dbConn, prevTable, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName) if moreMissing > 0 { missingCount += moreMissing } if tableUpdated { reportTables[prevTable] = struct{}{} deletionsMarked = true } // Reuse this table name for later snapshot lookups when correlating deletion events. 
prevTableName = prevTable } } freq := time.Duration(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds) * time.Second if freq <= 0 { freq = time.Hour } begin := startTime.Add(-4 * freq) end := startTime events, err := vc.FindVmDeletionEvents(ctx, begin, end) if err != nil { c.Logger.Warn("count-drop: failed to fetch vcenter deletion events", "vcenter", url, "error", err, "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount) } else { c.Logger.Info("count-drop: deletion events fetched", "vcenter", url, "events", len(events), "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount, "window_start_local", begin, "window_end_local", end, "window_start_utc", begin.UTC(), "window_end_utc", end.UTC(), "window_minutes", end.Sub(begin).Minutes()) for vmID, t := range events { // Skip if VM is still present. if _, ok := presentSnapshots[vmID]; ok { continue } inv, ok := inventoryByVmID[vmID] var snapRow InventorySnapshotRow var snapTable string if !ok { var found bool snapRow, snapTable, found = findVMInHourlySnapshots(ctx, dbConn, url, vmID, prevTableName) if !found { c.Logger.Debug("count-drop: deletion event has no snapshot match", "vm_id", vmID, "vcenter", url, "event_time", t) continue } inv = queries.Inventory{ VmId: snapRow.VmId, VmUuid: snapRow.VmUuid, Name: snapRow.Name, Datacenter: snapRow.Datacenter, } c.Logger.Info("count-drop: correlated deletion via snapshot lookup", "vm_id", vmID, "vm_uuid", inv.VmUuid.String, "name", inv.Name, "vcenter", url, "event_time", t, "snapshot_table", snapTable) } // Prefer UUID from snapshot if inventory entry lacks it. 
if !inv.VmUuid.Valid && snapRow.VmUuid.Valid { inv.VmUuid = snapRow.VmUuid } delTs := sql.NullInt64{Int64: t.Unix(), Valid: true} if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ DeletionTime: delTs, VmId: inv.VmId, DatacenterName: inv.Datacenter, }); err != nil { c.Logger.Warn("count-drop: failed to update inventory deletion time from event", "vm_id", vmID, "vcenter", url, "error", err) } else { c.Logger.Info("count-drop: correlated deletion event to inventory", "vm_id", vmID, "vm_uuid", inv.VmUuid.String, "name", inv.Name, "vcenter", url, "event_time", t, "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount) } clusterName := "" if inv.Cluster.Valid { clusterName = inv.Cluster.String } if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, vmID, inv.VmUuid.String, inv.Name, clusterName, t.Unix()); err != nil { c.Logger.Warn("count-drop: failed to refine lifecycle cache deletion time", "vm_id", vmID, "vm_uuid", inv.VmUuid, "vcenter", url, "error", err) } tableToUpdate := snapTable if tableToUpdate == "" { tableToUpdate = prevTableName } if tableToUpdate != "" { if rowsAffected, err := updateDeletionTimeInSnapshot(ctx, dbConn, tableToUpdate, url, vmID, inv.VmUuid.String, inv.Name, delTs.Int64); err != nil { c.Logger.Warn("count-drop: failed to update hourly snapshot deletion time from event", "table", tableToUpdate, "vm_id", vmID, "vcenter", url, "error", err) } else if rowsAffected > 0 { reportTables[tableToUpdate] = struct{}{} deletionsMarked = true c.Logger.Debug("count-drop: updated hourly snapshot deletion time from event", "table", tableToUpdate, "vm_id", vmID, "vm_uuid", inv.VmUuid.String, "vcenter", url, "event_time", t) } } missingCount++ deletionsMarked = true } } } // Backfill lifecycle deletions for VMs missing from inventory and without DeletedAt. 
if backfillTables, err := backfillLifecycleDeletionsToday(ctx, log, dbConn, url, startTime, presentSnapshots); err != nil { log.Warn("failed to backfill lifecycle deletions for today", "vcenter", url, "error", err) } else if len(backfillTables) > 0 { for _, table := range backfillTables { reportTables[table] = struct{}{} } deletionsMarked = true } log.Info("Hourly snapshot summary", "vcenter", url, "vm_count", totals.VmCount, "vcpu_total", totals.VcpuTotal, "ram_total_gb", totals.RamTotal, "disk_total_gb", totals.DiskTotal, "missing_marked", missingCount, "created_since_prev", newCount, "deleted_since_prev", missingCount, ) metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, nil) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, true, ""); upErr != nil { log.Warn("failed to record snapshot run", "url", url, "error", upErr) } if deletionsMarked { if len(reportTables) == 0 { reportTables[tableName] = struct{}{} } for reportTable := range reportTables { if err := c.generateReport(ctx, reportTable); err != nil { log.Warn("failed to regenerate hourly report after deletions", "error", err, "table", reportTable) } else { log.Debug("Regenerated hourly report after deletions", "table", reportTable) } } } return nil } // compareWithPreviousSnapshot cross-checks current vs. previous hourly snapshots: // marks deletions, detects new VMs when no gap exists, and returns the previous table name along with new/missing counts. 
// Parameters mirror captureHourlySnapshotForVcenter's state: the present-VM
// maps, the three inventory indexes, and the missing count accumulated so far.
// Returns (previous table name or "", new-VM count, updated missing count,
// whether the previous table's rows were modified and thus needs re-reporting).
func (c *CronTask) compareWithPreviousSnapshot(
	ctx context.Context,
	dbConn *sqlx.DB,
	url string,
	startTime time.Time,
	presentSnapshots map[string]InventorySnapshotRow,
	presentByUuid map[string]struct{},
	presentByName map[string]struct{},
	inventoryByVmID map[string]queries.Inventory,
	inventoryByUuid map[string]queries.Inventory,
	inventoryByName map[string]queries.Inventory,
	missingCount int,
) (string, int, int, bool) {
	prevTableName, prevTableErr := latestHourlySnapshotBefore(ctx, dbConn, startTime, loggerFromCtx(ctx, c.Logger))
	if prevTableErr != nil {
		c.Logger.Warn("failed to locate previous hourly snapshot for deletion comparison", "error", prevTableErr, "url", url)
	}
	// Parse error intentionally ignored: prevSnapshotTime stays zero when the
	// table name is empty or unparseable — presumably SnapshotTooSoon then
	// reports false; TODO confirm against its implementation.
	prevSnapshotTime, _ := parseSnapshotTime(prevTableName)
	newCount := 0
	prevTableTouched := false
	if prevTableName != "" {
		// Mark VMs that appear in the previous snapshot but not in the current one.
		moreMissing, tableUpdated := c.markMissingFromPrevious(ctx, dbConn, prevTableName, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName)
		missingCount += moreMissing
		if tableUpdated {
			prevTableTouched = true
		}
		expectedSeconds := int64(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds) / 2
		// Skip only if snapshots are closer together than half the configured cadence
		// (back-to-back runs would otherwise report spurious "new" VMs).
		if SnapshotTooSoon(prevSnapshotTime, startTime.Unix(), expectedSeconds) {
			c.Logger.Info("skipping new-VM detection because snapshots are too close together", "prev_table", prevTableName, "prev_snapshot_unix", prevSnapshotTime, "current_snapshot_unix", startTime.Unix(), "expected_interval_seconds", expectedSeconds)
		} else {
			newCount = countNewFromPrevious(ctx, dbConn, prevTableName, url, presentSnapshots)
			if newCount > 0 {
				// Collect display names (falling back to managed-object IDs) for the log line.
				newRows := listNewFromPrevious(ctx, dbConn, prevTableName, url, presentSnapshots)
				names := make([]string, 0, len(newRows))
				for _, r := range newRows {
					if r.Name != "" {
						names = append(names, r.Name)
					} else if r.VmId.Valid {
						names = append(names, r.VmId.String)
					}
				}
				c.Logger.Info("new VMs since previous snapshot", "prev_table", prevTableName, "count", newCount, "names", names)
			}
		}
		c.Logger.Debug("compared with previous snapshot", "prev_table", prevTableName, "new_since_prev", newCount, "missing_since_prev", missingCount)
	} else {
		// No previous snapshot to compare against: every present VM counts as new.
		newCount = len(presentSnapshots)
	}
	return prevTableName, newCount, missingCount, prevTableTouched
}