package tasks import ( "context" "database/sql" "errors" "fmt" "log/slog" "strconv" "strings" "time" "vctp/db" "vctp/db/queries" "github.com/jmoiron/sqlx" ) var snapshotProbeLimiter = make(chan struct{}, 1) func acquireSnapshotProbe(ctx context.Context) (func(), error) { select { case snapshotProbeLimiter <- struct{}{}: return func() { <-snapshotProbeLimiter }, nil case <-ctx.Done(): return nil, ctx.Err() } } func boolStringFromInterface(value interface{}) string { switch v := value.(type) { case nil: return "" case string: return v case []byte: return string(v) case bool: if v { return "TRUE" } return "FALSE" case int: if v != 0 { return "TRUE" } return "FALSE" case int64: if v != 0 { return "TRUE" } return "FALSE" default: return fmt.Sprint(v) } } // latestHourlySnapshotBefore finds the most recent hourly snapshot table prior to the given time, skipping empty tables. func latestHourlySnapshotBefore(ctx context.Context, dbConn *sqlx.DB, cutoff time.Time, logger *slog.Logger) (string, error) { tables, err := listLatestHourlyWithRows(ctx, dbConn, "", cutoff.Unix(), 1, logger) if err != nil { return "", err } if len(tables) == 0 { return "", nil } return tables[0].Table, nil } // parseSnapshotTime extracts the unix suffix from an inventory_hourly table name. func parseSnapshotTime(table string) (int64, bool) { const prefix = "inventory_hourly_" if !strings.HasPrefix(table, prefix) { return 0, false } ts, err := strconv.ParseInt(strings.TrimPrefix(table, prefix), 10, 64) if err != nil { return 0, false } return ts, true } // listLatestHourlyWithRows returns recent hourly snapshot tables (ordered desc by time) that have rows, optionally filtered by vcenter. func listLatestHourlyWithRows(ctx context.Context, dbConn *sqlx.DB, vcenter string, beforeUnix int64, limit int, logger *slog.Logger) ([]snapshotTable, error) { if limit <= 0 { limit = 50 } rows, err := dbConn.QueryxContext(ctx, ` SELECT table_name, snapshot_time, snapshot_count FROM snapshot_registry WHERE snapshot_type = 'hourly' AND snapshot_time < ? ORDER BY snapshot_time DESC LIMIT ? `, beforeUnix, limit) if err != nil { return nil, err } defer rows.Close() var out []snapshotTable for rows.Next() { var name string var ts int64 var count sql.NullInt64 if scanErr := rows.Scan(&name, &ts, &count); scanErr != nil { continue } if err := db.ValidateTableName(name); err != nil { continue } if count.Valid && count.Int64 == 0 { if logger != nil { logger.Debug("skipping snapshot table with zero count", "table", name, "snapshot_time", ts, "vcenter", vcenter) } continue } probed := false var probeErr error probeTimeout := false // If count is known and >0, trust it; if NULL, accept optimistically to avoid heavy probes. hasRows := !count.Valid || count.Int64 > 0 start := time.Now() if vcenter != "" && hasRows { probed = true probeCtx, cancel := context.WithTimeout(ctx, 2*time.Second) release, err := acquireSnapshotProbe(probeCtx) if err != nil { probeErr = err hasRows = false cancel() } else { vrows, qerr := querySnapshotRows(probeCtx, dbConn, name, []string{"VmId"}, `"Vcenter" = ? LIMIT 1`, vcenter) if qerr == nil { hasRows = vrows.Next() vrows.Close() } else { probeErr = qerr hasRows = false } release() cancel() } probeTimeout = errors.Is(probeErr, context.DeadlineExceeded) || errors.Is(probeErr, context.Canceled) } elapsed := time.Since(start) if logger != nil { logger.Debug("evaluated snapshot table", "table", name, "snapshot_time", ts, "snapshot_count", count, "probed", probed, "has_rows", hasRows, "elapsed", elapsed, "vcenter", vcenter, "probe_error", probeErr, "probe_timeout", probeTimeout) } if !hasRows { continue } out = append(out, snapshotTable{Table: name, Time: ts, Count: count}) } return out, nil } // SnapshotTooSoon reports whether the gap between prev and curr is significantly shorter than expected. func SnapshotTooSoon(prevUnix, currUnix int64, expectedSeconds int64) bool { if prevUnix == 0 || currUnix == 0 || expectedSeconds <= 0 { return false } return currUnix-prevUnix < expectedSeconds } // querySnapshotRows builds a SELECT with proper rebind for the given table/columns/where. func querySnapshotRows(ctx context.Context, dbConn *sqlx.DB, table string, columns []string, where string, args ...interface{}) (*sqlx.Rows, error) { if err := db.ValidateTableName(table); err != nil { return nil, err } colExpr := "*" if len(columns) > 0 { colExpr = `"` + strings.Join(columns, `","`) + `"` } query := fmt.Sprintf(`SELECT %s FROM %s`, colExpr, table) if strings.TrimSpace(where) != "" { query = fmt.Sprintf(`%s WHERE %s`, query, where) } query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) return dbConn.QueryxContext(ctx, query, args...) } func updateDeletionTimeInSnapshot(ctx context.Context, dbConn *sqlx.DB, table, vcenter, vmID, vmUUID, name string, deletionUnix int64) (int64, error) { if err := db.ValidateTableName(table); err != nil { return 0, err } matchColumn := "" matchValue := "" switch { case vmID != "": matchColumn = "VmId" matchValue = vmID case vmUUID != "": matchColumn = "VmUuid" matchValue = vmUUID case name != "": matchColumn = "Name" matchValue = name default: return 0, nil } query := fmt.Sprintf(`UPDATE %s SET "DeletionTime" = ? WHERE "Vcenter" = ? AND "%s" = ? AND ("DeletionTime" IS NULL OR "DeletionTime" = 0 OR "DeletionTime" > ?)`, table, matchColumn) query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) result, err := dbConn.ExecContext(ctx, query, deletionUnix, vcenter, matchValue, deletionUnix) if err != nil { return 0, err } rowsAffected, err := result.RowsAffected() if err != nil { return 0, err } return rowsAffected, nil } // markMissingFromPrevious marks VMs that were present in the previous snapshot but missing now. func (c *CronTask) markMissingFromPrevious(ctx context.Context, dbConn *sqlx.DB, prevTable string, vcenter string, snapshotTime time.Time, currentByID map[string]InventorySnapshotRow, currentByUuid map[string]struct{}, currentByName map[string]struct{}, invByID map[string]queries.Inventory, invByUuid map[string]queries.Inventory, invByName map[string]queries.Inventory) (int, bool) { if err := db.ValidateTableName(prevTable); err != nil { return 0, false } type prevRow struct { VmId sql.NullString `db:"VmId"` VmUuid sql.NullString `db:"VmUuid"` Name string `db:"Name"` Cluster sql.NullString `db:"Cluster"` Datacenter sql.NullString `db:"Datacenter"` DeletionTime sql.NullInt64 `db:"DeletionTime"` } rows, err := querySnapshotRows(ctx, dbConn, prevTable, []string{"VmId", "VmUuid", "Name", "Cluster", "Datacenter", "DeletionTime"}, `"Vcenter" = ?`, vcenter) if err != nil { c.Logger.Warn("failed to read previous snapshot for deletion detection", "error", err, "table", prevTable, "vcenter", vcenter) return 0, false } defer rows.Close() missing := 0 tableUpdated := false for rows.Next() { var r prevRow if err := rows.StructScan(&r); err != nil { continue } vmID := r.VmId.String uuid := r.VmUuid.String name := r.Name cluster := r.Cluster.String found := false if vmID != "" { if _, ok := currentByID[vmID]; ok { found = true } } if !found && uuid != "" { if _, ok := currentByUuid[uuid]; ok { found = true } } if !found && name != "" { if _, ok := currentByName[name]; ok { found = true } } // If the name is missing but UUID+Cluster still exists in inventory/current, treat it as present (rename, not delete). if !found && uuid != "" && cluster != "" { if inv, ok := invByUuid[uuid]; ok && strings.EqualFold(inv.Cluster.String, cluster) { found = true } } if found { continue } var inv queries.Inventory var ok bool if vmID != "" { inv, ok = invByID[vmID] } if !ok && uuid != "" { inv, ok = invByUuid[uuid] } if !ok && name != "" { inv, ok = invByName[name] } if !ok { continue } delTime := inv.DeletionTime if !delTime.Valid { delTime = sql.NullInt64{Int64: snapshotTime.Unix(), Valid: true} if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ DeletionTime: delTime, VmId: inv.VmId, DatacenterName: inv.Datacenter, }); err != nil { c.Logger.Warn("failed to mark inventory record deleted from previous snapshot", "error", err, "vm_id", inv.VmId.String) } } // Also update lifecycle cache so deletion time is available for rollups. vmUUID := "" if inv.VmUuid.Valid { vmUUID = inv.VmUuid.String } if err := db.MarkVmDeletedWithDetails(ctx, dbConn, vcenter, inv.VmId.String, vmUUID, inv.Name, inv.Cluster.String, delTime.Int64); err != nil { c.Logger.Warn("failed to mark lifecycle cache deleted from previous snapshot", "error", err, "vm_id", inv.VmId.String, "vm_uuid", vmUUID, "vcenter", vcenter) } if rowsAffected, err := updateDeletionTimeInSnapshot(ctx, dbConn, prevTable, vcenter, inv.VmId.String, vmUUID, inv.Name, delTime.Int64); err != nil { c.Logger.Warn("failed to update hourly snapshot deletion time", "error", err, "table", prevTable, "vm_id", inv.VmId.String, "vm_uuid", vmUUID, "vcenter", vcenter) } else if rowsAffected > 0 { tableUpdated = true c.Logger.Debug("updated hourly snapshot deletion time", "table", prevTable, "vm_id", inv.VmId.String, "vm_uuid", vmUUID, "vcenter", vcenter, "deletion_time", delTime.Int64) } c.Logger.Debug("Detected VM missing compared to previous snapshot", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", vcenter, "snapshot_time", snapshotTime, "prev_table", prevTable) missing++ } return missing, tableUpdated } // countNewFromPrevious returns how many VMs are present in the current snapshot but not in the previous snapshot. func countNewFromPrevious(ctx context.Context, dbConn *sqlx.DB, prevTable string, vcenter string, current map[string]InventorySnapshotRow) int { if err := db.ValidateTableName(prevTable); err != nil { return len(current) } query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name" FROM %s WHERE "Vcenter" = ?`, prevTable) query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) rows, err := dbConn.QueryxContext(ctx, query, vcenter) if err != nil { return len(current) } defer rows.Close() prevIDs := make(map[string]struct{}) prevUUIDs := make(map[string]struct{}) prevNames := make(map[string]struct{}) for rows.Next() { var vmID, vmUUID, name string if scanErr := rows.Scan(&vmID, &vmUUID, &name); scanErr != nil { continue } if vmID != "" { prevIDs[vmID] = struct{}{} } if vmUUID != "" { prevUUIDs[vmUUID] = struct{}{} } if name != "" { prevNames[name] = struct{}{} } } newCount := 0 for _, cur := range current { id := cur.VmId.String uuid := cur.VmUuid.String name := cur.Name if id != "" { if _, ok := prevIDs[id]; ok { continue } } if uuid != "" { if _, ok := prevUUIDs[uuid]; ok { continue } } if name != "" { if _, ok := prevNames[name]; ok { continue } } newCount++ } return newCount } // listNewFromPrevious returns the rows present now but not in the previous snapshot. func listNewFromPrevious(ctx context.Context, dbConn *sqlx.DB, prevTable string, vcenter string, current map[string]InventorySnapshotRow) []InventorySnapshotRow { if err := db.ValidateTableName(prevTable); err != nil { all := make([]InventorySnapshotRow, 0, len(current)) for _, cur := range current { all = append(all, cur) } return all } query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name" FROM %s WHERE "Vcenter" = ?`, prevTable) query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) rows, err := dbConn.QueryxContext(ctx, query, vcenter) if err != nil { all := make([]InventorySnapshotRow, 0, len(current)) for _, cur := range current { all = append(all, cur) } return all } defer rows.Close() prevIDs := make(map[string]struct{}) prevUUIDs := make(map[string]struct{}) prevNames := make(map[string]struct{}) for rows.Next() { var vmID, vmUUID, name string if scanErr := rows.Scan(&vmID, &vmUUID, &name); scanErr != nil { continue } if vmID != "" { prevIDs[vmID] = struct{}{} } if vmUUID != "" { prevUUIDs[vmUUID] = struct{}{} } if name != "" { prevNames[name] = struct{}{} } } newRows := make([]InventorySnapshotRow, 0) for _, cur := range current { id := cur.VmId.String uuid := cur.VmUuid.String name := cur.Name if id != "" { if _, ok := prevIDs[id]; ok { continue } } if uuid != "" { if _, ok := prevUUIDs[uuid]; ok { continue } } if name != "" { if _, ok := prevNames[name]; ok { continue } } newRows = append(newRows, cur) } return newRows } // findVMInHourlySnapshots searches recent hourly snapshot tables for a VM by ID for the given vCenter. // extraTables are searched first (e.g., known previous snapshot tables). func findVMInHourlySnapshots(ctx context.Context, dbConn *sqlx.DB, vcenter string, vmID string, extraTables ...string) (InventorySnapshotRow, string, bool) { if vmID == "" { return InventorySnapshotRow{}, "", false } // Use a short timeout to avoid hanging if the DB is busy. ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() // First search any explicit tables provided. for _, table := range extraTables { if table == "" { continue } if err := db.ValidateTableName(table); err != nil { continue } query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name","Datacenter","Cluster" FROM %s WHERE "Vcenter" = ? AND "VmId" = ? LIMIT 1`, table) query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) var row InventorySnapshotRow if err := dbConn.QueryRowxContext(ctx, query, vcenter, vmID).Scan(&row.VmId, &row.VmUuid, &row.Name, &row.Datacenter, &row.Cluster); err == nil { return row, table, true } } // Try a handful of most recent hourly tables from the registry. rows, err := dbConn.QueryxContext(ctx, ` SELECT table_name FROM snapshot_registry WHERE snapshot_type = 'hourly' ORDER BY snapshot_time DESC LIMIT 20 `) if err != nil { return InventorySnapshotRow{}, "", false } defer rows.Close() checked := 0 for rows.Next() { var table string if scanErr := rows.Scan(&table); scanErr != nil { continue } if err := db.ValidateTableName(table); err != nil { continue } query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name","Datacenter","Cluster" FROM %s WHERE "Vcenter" = ? AND "VmId" = ? LIMIT 1`, table) query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) var row InventorySnapshotRow if err := dbConn.QueryRowxContext(ctx, query, vcenter, vmID).Scan(&row.VmId, &row.VmUuid, &row.Name, &row.Datacenter, &row.Cluster); err == nil { return row, table, true } checked++ if checked >= 10 { // limit work break } } return InventorySnapshotRow{}, "", false }