Files
vctp2/internal/tasks/inventoryHelpers.go
Nathan Coad 2483091861
All checks were successful
continuous-integration/drone/push Build is passing
improve logging and concurrent vcenter inventory
2026-01-21 10:25:04 +11:00

458 lines
13 KiB
Go

package tasks
import (
"context"
"database/sql"
"fmt"
"log/slog"
"strconv"
"strings"
"time"
"vctp/db"
"vctp/db/queries"
"github.com/jmoiron/sqlx"
)
func boolStringFromInterface(value interface{}) string {
switch v := value.(type) {
case nil:
return ""
case string:
return v
case []byte:
return string(v)
case bool:
if v {
return "TRUE"
}
return "FALSE"
case int:
if v != 0 {
return "TRUE"
}
return "FALSE"
case int64:
if v != 0 {
return "TRUE"
}
return "FALSE"
default:
return fmt.Sprint(v)
}
}
// latestHourlySnapshotBefore finds the most recent hourly snapshot table prior to the given time, skipping empty tables.
func latestHourlySnapshotBefore(ctx context.Context, dbConn *sqlx.DB, cutoff time.Time) (string, error) {
tables, err := listLatestHourlyWithRows(ctx, dbConn, "", cutoff.Unix(), 1, nil)
if err != nil {
return "", err
}
if len(tables) == 0 {
return "", nil
}
return tables[0].Table, nil
}
// parseSnapshotTime extracts the unix suffix from an inventory_hourly table name.
func parseSnapshotTime(table string) (int64, bool) {
const prefix = "inventory_hourly_"
if !strings.HasPrefix(table, prefix) {
return 0, false
}
ts, err := strconv.ParseInt(strings.TrimPrefix(table, prefix), 10, 64)
if err != nil {
return 0, false
}
return ts, true
}
// listLatestHourlyWithRows returns recent hourly snapshot tables (ordered desc by time) that have rows, optionally filtered by vcenter.
func listLatestHourlyWithRows(ctx context.Context, dbConn *sqlx.DB, vcenter string, beforeUnix int64, limit int, logger *slog.Logger) ([]snapshotTable, error) {
if limit <= 0 {
limit = 50
}
rows, err := dbConn.QueryxContext(ctx, `
SELECT table_name, snapshot_time, snapshot_count
FROM snapshot_registry
WHERE snapshot_type = 'hourly' AND snapshot_time < ?
ORDER BY snapshot_time DESC
LIMIT ?
`, beforeUnix, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var out []snapshotTable
for rows.Next() {
var name string
var ts int64
var count sql.NullInt64
if scanErr := rows.Scan(&name, &ts, &count); scanErr != nil {
continue
}
if err := db.ValidateTableName(name); err != nil {
continue
}
if count.Valid && count.Int64 == 0 {
if logger != nil {
logger.Debug("skipping snapshot table with zero count", "table", name, "snapshot_time", ts)
}
continue
}
probed := false
hasRows := true
start := time.Now()
if !count.Valid {
probed = true
if ok, err := db.TableHasRows(ctx, dbConn, name); err == nil {
hasRows = ok
} else {
hasRows = false
if logger != nil {
logger.Debug("snapshot table probe failed", "table", name, "error", err)
}
}
}
if vcenter != "" && hasRows {
probed = true
vrows, qerr := querySnapshotRows(ctx, dbConn, name, []string{"VmId"}, `"Vcenter" = ? LIMIT 1`, vcenter)
if qerr == nil {
hasRows = vrows.Next()
vrows.Close()
} else {
hasRows = false
if logger != nil {
logger.Debug("snapshot vcenter filter probe failed", "table", name, "vcenter", vcenter, "error", qerr)
}
}
}
elapsed := time.Since(start)
if logger != nil {
logger.Debug("evaluated snapshot table", "table", name, "snapshot_time", ts, "snapshot_count", count, "probed", probed, "has_rows", hasRows, "elapsed", elapsed)
}
if !hasRows {
continue
}
out = append(out, snapshotTable{Table: name, Time: ts, Count: count})
}
return out, nil
}
// SnapshotTooSoon reports whether the gap between prev and curr is significantly shorter than expected (default: <50% interval).
func SnapshotTooSoon(prevUnix, currUnix int64, expectedSeconds int64) bool {
if prevUnix == 0 || currUnix == 0 || expectedSeconds <= 0 {
return false
}
return currUnix-prevUnix < expectedSeconds/2
}
// querySnapshotRows builds a SELECT with proper rebind for the given table/columns/where.
func querySnapshotRows(ctx context.Context, dbConn *sqlx.DB, table string, columns []string, where string, args ...interface{}) (*sqlx.Rows, error) {
if err := db.ValidateTableName(table); err != nil {
return nil, err
}
colExpr := "*"
if len(columns) > 0 {
colExpr = `"` + strings.Join(columns, `","`) + `"`
}
query := fmt.Sprintf(`SELECT %s FROM %s`, colExpr, table)
if strings.TrimSpace(where) != "" {
query = fmt.Sprintf(`%s WHERE %s`, query, where)
}
query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query)
return dbConn.QueryxContext(ctx, query, args...)
}
// markMissingFromPrevious marks VMs that were present in the previous snapshot but missing now.
func (c *CronTask) markMissingFromPrevious(ctx context.Context, dbConn *sqlx.DB, prevTable string, vcenter string, snapshotTime time.Time,
currentByID map[string]InventorySnapshotRow, currentByUuid map[string]struct{}, currentByName map[string]struct{},
invByID map[string]queries.Inventory, invByUuid map[string]queries.Inventory, invByName map[string]queries.Inventory) int {
if err := db.ValidateTableName(prevTable); err != nil {
return 0
}
type prevRow struct {
VmId sql.NullString `db:"VmId"`
VmUuid sql.NullString `db:"VmUuid"`
Name string `db:"Name"`
Cluster sql.NullString `db:"Cluster"`
Datacenter sql.NullString `db:"Datacenter"`
DeletionTime sql.NullInt64 `db:"DeletionTime"`
}
rows, err := querySnapshotRows(ctx, dbConn, prevTable, []string{"VmId", "VmUuid", "Name", "Cluster", "Datacenter", "DeletionTime"}, `"Vcenter" = ?`, vcenter)
if err != nil {
c.Logger.Warn("failed to read previous snapshot for deletion detection", "error", err, "table", prevTable, "vcenter", vcenter)
return 0
}
defer rows.Close()
missing := 0
for rows.Next() {
var r prevRow
if err := rows.StructScan(&r); err != nil {
continue
}
vmID := r.VmId.String
uuid := r.VmUuid.String
name := r.Name
cluster := r.Cluster.String
found := false
if vmID != "" {
if _, ok := currentByID[vmID]; ok {
found = true
}
}
if !found && uuid != "" {
if _, ok := currentByUuid[uuid]; ok {
found = true
}
}
if !found && name != "" {
if _, ok := currentByName[name]; ok {
found = true
}
}
// If the name is missing but UUID+Cluster still exists in inventory/current, treat it as present (rename, not delete).
if !found && uuid != "" && cluster != "" {
if inv, ok := invByUuid[uuid]; ok && strings.EqualFold(inv.Cluster.String, cluster) {
found = true
}
}
if found {
continue
}
var inv queries.Inventory
var ok bool
if vmID != "" {
inv, ok = invByID[vmID]
}
if !ok && uuid != "" {
inv, ok = invByUuid[uuid]
}
if !ok && name != "" {
inv, ok = invByName[name]
}
if !ok {
continue
}
if inv.DeletionTime.Valid {
continue
}
delTime := sql.NullInt64{Int64: snapshotTime.Unix(), Valid: true}
if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{
DeletionTime: delTime,
VmId: inv.VmId,
DatacenterName: inv.Datacenter,
}); err != nil {
c.Logger.Warn("failed to mark inventory record deleted from previous snapshot", "error", err, "vm_id", inv.VmId.String)
continue
}
// Also update lifecycle cache so deletion time is available for rollups.
vmUUID := ""
if inv.VmUuid.Valid {
vmUUID = inv.VmUuid.String
}
if err := db.MarkVmDeletedWithDetails(ctx, dbConn, vcenter, inv.VmId.String, vmUUID, inv.Name, inv.Cluster.String, delTime.Int64); err != nil {
c.Logger.Warn("failed to mark lifecycle cache deleted from previous snapshot", "error", err, "vm_id", inv.VmId.String, "vm_uuid", vmUUID, "vcenter", vcenter)
}
c.Logger.Debug("Detected VM missing compared to previous snapshot", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", vcenter, "snapshot_time", snapshotTime, "prev_table", prevTable)
missing++
}
return missing
}
// countNewFromPrevious returns how many VMs are present in the current snapshot but not in the previous snapshot.
func countNewFromPrevious(ctx context.Context, dbConn *sqlx.DB, prevTable string, vcenter string, current map[string]InventorySnapshotRow) int {
if err := db.ValidateTableName(prevTable); err != nil {
return len(current)
}
query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name" FROM %s WHERE "Vcenter" = ?`, prevTable)
query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query)
rows, err := dbConn.QueryxContext(ctx, query, vcenter)
if err != nil {
return len(current)
}
defer rows.Close()
prevIDs := make(map[string]struct{})
prevUUIDs := make(map[string]struct{})
prevNames := make(map[string]struct{})
for rows.Next() {
var vmID, vmUUID, name string
if scanErr := rows.Scan(&vmID, &vmUUID, &name); scanErr != nil {
continue
}
if vmID != "" {
prevIDs[vmID] = struct{}{}
}
if vmUUID != "" {
prevUUIDs[vmUUID] = struct{}{}
}
if name != "" {
prevNames[name] = struct{}{}
}
}
newCount := 0
for _, cur := range current {
id := cur.VmId.String
uuid := cur.VmUuid.String
name := cur.Name
if id != "" {
if _, ok := prevIDs[id]; ok {
continue
}
}
if uuid != "" {
if _, ok := prevUUIDs[uuid]; ok {
continue
}
}
if name != "" {
if _, ok := prevNames[name]; ok {
continue
}
}
newCount++
}
return newCount
}
// listNewFromPrevious returns the rows present now but not in the previous snapshot.
func listNewFromPrevious(ctx context.Context, dbConn *sqlx.DB, prevTable string, vcenter string, current map[string]InventorySnapshotRow) []InventorySnapshotRow {
if err := db.ValidateTableName(prevTable); err != nil {
all := make([]InventorySnapshotRow, 0, len(current))
for _, cur := range current {
all = append(all, cur)
}
return all
}
query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name" FROM %s WHERE "Vcenter" = ?`, prevTable)
query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query)
rows, err := dbConn.QueryxContext(ctx, query, vcenter)
if err != nil {
all := make([]InventorySnapshotRow, 0, len(current))
for _, cur := range current {
all = append(all, cur)
}
return all
}
defer rows.Close()
prevIDs := make(map[string]struct{})
prevUUIDs := make(map[string]struct{})
prevNames := make(map[string]struct{})
for rows.Next() {
var vmID, vmUUID, name string
if scanErr := rows.Scan(&vmID, &vmUUID, &name); scanErr != nil {
continue
}
if vmID != "" {
prevIDs[vmID] = struct{}{}
}
if vmUUID != "" {
prevUUIDs[vmUUID] = struct{}{}
}
if name != "" {
prevNames[name] = struct{}{}
}
}
newRows := make([]InventorySnapshotRow, 0)
for _, cur := range current {
id := cur.VmId.String
uuid := cur.VmUuid.String
name := cur.Name
if id != "" {
if _, ok := prevIDs[id]; ok {
continue
}
}
if uuid != "" {
if _, ok := prevUUIDs[uuid]; ok {
continue
}
}
if name != "" {
if _, ok := prevNames[name]; ok {
continue
}
}
newRows = append(newRows, cur)
}
return newRows
}
// findVMInHourlySnapshots searches recent hourly snapshot tables for a VM by ID for the given vCenter.
// extraTables are searched first (e.g., known previous snapshot tables).
func findVMInHourlySnapshots(ctx context.Context, dbConn *sqlx.DB, vcenter string, vmID string, extraTables ...string) (InventorySnapshotRow, bool) {
if vmID == "" {
return InventorySnapshotRow{}, false
}
// Use a short timeout to avoid hanging if the DB is busy.
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// First search any explicit tables provided.
for _, table := range extraTables {
if table == "" {
continue
}
if err := db.ValidateTableName(table); err != nil {
continue
}
query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name","Datacenter","Cluster" FROM %s WHERE "Vcenter" = ? AND "VmId" = ? LIMIT 1`, table)
query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query)
var row InventorySnapshotRow
if err := dbConn.QueryRowxContext(ctx, query, vcenter, vmID).Scan(&row.VmId, &row.VmUuid, &row.Name, &row.Datacenter, &row.Cluster); err == nil {
return row, true
}
}
// Try a handful of most recent hourly tables from the registry.
rows, err := dbConn.QueryxContext(ctx, `
SELECT table_name
FROM snapshot_registry
WHERE snapshot_type = 'hourly'
ORDER BY snapshot_time DESC
LIMIT 20
`)
if err != nil {
return InventorySnapshotRow{}, false
}
defer rows.Close()
checked := 0
for rows.Next() {
var table string
if scanErr := rows.Scan(&table); scanErr != nil {
continue
}
if err := db.ValidateTableName(table); err != nil {
continue
}
query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name","Datacenter","Cluster" FROM %s WHERE "Vcenter" = ? AND "VmId" = ? LIMIT 1`, table)
query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query)
var row InventorySnapshotRow
if err := dbConn.QueryRowxContext(ctx, query, vcenter, vmID).Scan(&row.VmId, &row.VmUuid, &row.Name, &row.Datacenter, &row.Cluster); err == nil {
return row, true
}
checked++
if checked >= 10 { // limit work
break
}
}
return InventorySnapshotRow{}, false
}