From 0517ef88c3ec0e424ba1a22ce06f81a648cc5092 Mon Sep 17 00:00:00 2001 From: Nathan Coad Date: Tue, 20 Jan 2026 16:33:31 +1100 Subject: [PATCH] [CI SKIP] bugfixes for vm deletion tracking --- db/helpers.go | 48 +- db/local.go | 6 +- internal/tasks/inventoryDatabase.go | 191 ++++++++ internal/tasks/inventoryHelpers.go | 383 +++++++++++++++ internal/tasks/inventorySnapshots.go | 684 ++++++++++++--------------- internal/vcenter/vcenter.go | 90 ++++ main.go | 12 +- 7 files changed, 1016 insertions(+), 398 deletions(-) create mode 100644 internal/tasks/inventoryDatabase.go create mode 100644 internal/tasks/inventoryHelpers.go diff --git a/db/helpers.go b/db/helpers.go index b14c3a1..e67c59f 100644 --- a/db/helpers.go +++ b/db/helpers.go @@ -452,9 +452,10 @@ func UpsertVmLifecycleCache(ctx context.Context, dbConn *sqlx.DB, vcenter string return err } driver := strings.ToLower(dbConn.DriverName()) + bindType := sqlx.BindType(driver) query := ` INSERT INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","Name","Cluster","FirstSeen","LastSeen") -VALUES ($1,$2,$3,$4,$5,$6,$6) +VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT ("Vcenter","VmId","VmUuid") DO UPDATE SET "Name"=EXCLUDED."Name", "Cluster"=EXCLUDED."Cluster", @@ -462,27 +463,26 @@ ON CONFLICT ("Vcenter","VmId","VmUuid") DO UPDATE SET "FirstSeen"=COALESCE(vm_lifecycle_cache."FirstSeen", EXCLUDED."FirstSeen"), "DeletedAt"=NULL ` - args := []interface{}{vcenter, vmID, vmUUID, name, cluster, seen.Unix()} - if driver == "sqlite" { - query = ` -INSERT OR REPLACE INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","Name","Cluster","FirstSeen","LastSeen") -VALUES (?,?,?,?,?,?,?) -` - args = []interface{}{vcenter, vmID, vmUUID, name, cluster, seen.Unix(), seen.Unix()} - } + query = sqlx.Rebind(bindType, query) + args := []interface{}{vcenter, vmID, vmUUID, name, cluster, seen.Unix(), seen.Unix()} _, err := dbConn.ExecContext(ctx, query, args...) 
+ if err != nil { + slog.Warn("lifecycle upsert exec failed", "vcenter", vcenter, "vm_id", vmID, "vm_uuid", vmUUID, "driver", driver, "args_len", len(args), "args", fmt.Sprint(args), "query", strings.TrimSpace(query), "error", err) + } return err } -// MarkVmDeleted updates lifecycle cache with a deletion timestamp. -func MarkVmDeleted(ctx context.Context, dbConn *sqlx.DB, vcenter, vmID, vmUUID string, deletedAt int64) error { +// MarkVmDeleted updates lifecycle cache with a deletion timestamp, carrying optional name/cluster. +func MarkVmDeletedWithDetails(ctx context.Context, dbConn *sqlx.DB, vcenter, vmID, vmUUID, name, cluster string, deletedAt int64) error { if err := EnsureVmLifecycleCache(ctx, dbConn); err != nil { return err } driver := strings.ToLower(dbConn.DriverName()) + bindType := sqlx.BindType(driver) + query := ` -INSERT INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","DeletedAt","FirstSeen","LastSeen") -VALUES ($1,$2,$3,$4,$4,$4) +INSERT INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","Name","Cluster","DeletedAt","FirstSeen","LastSeen") +VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT ("Vcenter","VmId","VmUuid") DO UPDATE SET "DeletedAt"=CASE WHEN vm_lifecycle_cache."DeletedAt" IS NULL OR vm_lifecycle_cache."DeletedAt"=0 OR EXCLUDED."DeletedAt" latestTime { + latestTime = epoch + latest = name + } + } + return latest, nil +} + +// markMissingFromPrevious marks VMs that were present in the previous snapshot but missing now. 
+func (c *CronTask) markMissingFromPrevious(ctx context.Context, dbConn *sqlx.DB, prevTable string, vcenter string, snapshotTime time.Time, + currentByID map[string]InventorySnapshotRow, currentByUuid map[string]struct{}, currentByName map[string]struct{}, + invByID map[string]queries.Inventory, invByUuid map[string]queries.Inventory, invByName map[string]queries.Inventory) int { + + if err := db.ValidateTableName(prevTable); err != nil { + return 0 + } + + query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name","Cluster","Datacenter","DeletionTime" FROM %s WHERE "Vcenter" = ?`, prevTable) + query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) + + type prevRow struct { + VmId sql.NullString `db:"VmId"` + VmUuid sql.NullString `db:"VmUuid"` + Name string `db:"Name"` + Cluster sql.NullString `db:"Cluster"` + Datacenter sql.NullString `db:"Datacenter"` + DeletionTime sql.NullInt64 `db:"DeletionTime"` + } + + rows, err := dbConn.QueryxContext(ctx, query, vcenter) + if err != nil { + c.Logger.Warn("failed to read previous snapshot for deletion detection", "error", err, "table", prevTable, "vcenter", vcenter) + return 0 + } + defer rows.Close() + + missing := 0 + for rows.Next() { + var r prevRow + if err := rows.StructScan(&r); err != nil { + continue + } + vmID := r.VmId.String + uuid := r.VmUuid.String + name := r.Name + cluster := r.Cluster.String + + found := false + if vmID != "" { + if _, ok := currentByID[vmID]; ok { + found = true + } + } + if !found && uuid != "" { + if _, ok := currentByUuid[uuid]; ok { + found = true + } + } + if !found && name != "" { + if _, ok := currentByName[name]; ok { + found = true + } + } + // If the name is missing but UUID+Cluster still exists in inventory/current, treat it as present (rename, not delete). 
+ if !found && uuid != "" && cluster != "" { + if inv, ok := invByUuid[uuid]; ok && strings.EqualFold(inv.Cluster.String, cluster) { + found = true + } + } + if found { + continue + } + + var inv queries.Inventory + var ok bool + if vmID != "" { + inv, ok = invByID[vmID] + } + if !ok && uuid != "" { + inv, ok = invByUuid[uuid] + } + if !ok && name != "" { + inv, ok = invByName[name] + } + if !ok { + continue + } + if inv.DeletionTime.Valid { + continue + } + + delTime := sql.NullInt64{Int64: snapshotTime.Unix(), Valid: true} + if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ + DeletionTime: delTime, + VmId: inv.VmId, + DatacenterName: inv.Datacenter, + }); err != nil { + c.Logger.Warn("failed to mark inventory record deleted from previous snapshot", "error", err, "vm_id", inv.VmId.String) + continue + } + // Also update lifecycle cache so deletion time is available for rollups. + vmUUID := "" + if inv.VmUuid.Valid { + vmUUID = inv.VmUuid.String + } + if err := db.MarkVmDeletedWithDetails(ctx, dbConn, vcenter, inv.VmId.String, vmUUID, inv.Name, inv.Cluster.String, delTime.Int64); err != nil { + c.Logger.Warn("failed to mark lifecycle cache deleted from previous snapshot", "error", err, "vm_id", inv.VmId.String, "vm_uuid", vmUUID, "vcenter", vcenter) + } + c.Logger.Debug("Detected VM missing compared to previous snapshot", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", vcenter, "snapshot_time", snapshotTime, "prev_table", prevTable) + missing++ + } + + return missing +} + +// countNewFromPrevious returns how many VMs are present in the current snapshot but not in the previous snapshot. 
+func countNewFromPrevious(ctx context.Context, dbConn *sqlx.DB, prevTable string, vcenter string, current map[string]InventorySnapshotRow) int { + if err := db.ValidateTableName(prevTable); err != nil { + return len(current) + } + query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name" FROM %s WHERE "Vcenter" = ?`, prevTable) + query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) + + rows, err := dbConn.QueryxContext(ctx, query, vcenter) + if err != nil { + return len(current) + } + defer rows.Close() + + prevIDs := make(map[string]struct{}) + prevUUIDs := make(map[string]struct{}) + prevNames := make(map[string]struct{}) + for rows.Next() { + var vmID, vmUUID, name string + if scanErr := rows.Scan(&vmID, &vmUUID, &name); scanErr != nil { + continue + } + if vmID != "" { + prevIDs[vmID] = struct{}{} + } + if vmUUID != "" { + prevUUIDs[vmUUID] = struct{}{} + } + if name != "" { + prevNames[name] = struct{}{} + } + } + + newCount := 0 + for _, cur := range current { + id := cur.VmId.String + uuid := cur.VmUuid.String + name := cur.Name + if id != "" { + if _, ok := prevIDs[id]; ok { + continue + } + } + if uuid != "" { + if _, ok := prevUUIDs[uuid]; ok { + continue + } + } + if name != "" { + if _, ok := prevNames[name]; ok { + continue + } + } + newCount++ + } + return newCount +} + +// listNewFromPrevious returns the rows present now but not in the previous snapshot. 
+func listNewFromPrevious(ctx context.Context, dbConn *sqlx.DB, prevTable string, vcenter string, current map[string]InventorySnapshotRow) []InventorySnapshotRow { + if err := db.ValidateTableName(prevTable); err != nil { + all := make([]InventorySnapshotRow, 0, len(current)) + for _, cur := range current { + all = append(all, cur) + } + return all + } + query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name" FROM %s WHERE "Vcenter" = ?`, prevTable) + query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) + + rows, err := dbConn.QueryxContext(ctx, query, vcenter) + if err != nil { + all := make([]InventorySnapshotRow, 0, len(current)) + for _, cur := range current { + all = append(all, cur) + } + return all + } + defer rows.Close() + + prevIDs := make(map[string]struct{}) + prevUUIDs := make(map[string]struct{}) + prevNames := make(map[string]struct{}) + for rows.Next() { + var vmID, vmUUID, name string + if scanErr := rows.Scan(&vmID, &vmUUID, &name); scanErr != nil { + continue + } + if vmID != "" { + prevIDs[vmID] = struct{}{} + } + if vmUUID != "" { + prevUUIDs[vmUUID] = struct{}{} + } + if name != "" { + prevNames[name] = struct{}{} + } + } + + newRows := make([]InventorySnapshotRow, 0) + for _, cur := range current { + id := cur.VmId.String + uuid := cur.VmUuid.String + name := cur.Name + if id != "" { + if _, ok := prevIDs[id]; ok { + continue + } + } + if uuid != "" { + if _, ok := prevUUIDs[uuid]; ok { + continue + } + } + if name != "" { + if _, ok := prevNames[name]; ok { + continue + } + } + newRows = append(newRows, cur) + } + return newRows +} + +// findVMInHourlySnapshots searches recent hourly snapshot tables for a VM by ID for the given vCenter. +// extraTables are searched first (e.g., known previous snapshot tables). 
+func findVMInHourlySnapshots(ctx context.Context, dbConn *sqlx.DB, vcenter string, vmID string, extraTables ...string) (InventorySnapshotRow, bool) { + if vmID == "" { + return InventorySnapshotRow{}, false + } + // Use a short timeout to avoid hanging if the DB is busy. + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + // First search any explicit tables provided. + for _, table := range extraTables { + if table == "" { + continue + } + if err := db.ValidateTableName(table); err != nil { + continue + } + query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name","Datacenter","Cluster" FROM %s WHERE "Vcenter" = ? AND "VmId" = ? LIMIT 1`, table) + query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) + var row InventorySnapshotRow + if err := dbConn.QueryRowxContext(ctx, query, vcenter, vmID).Scan(&row.VmId, &row.VmUuid, &row.Name, &row.Datacenter, &row.Cluster); err == nil { + return row, true + } + } + + // Try a handful of most recent hourly tables from the registry. + rows, err := dbConn.QueryxContext(ctx, ` +SELECT table_name +FROM snapshot_registry +WHERE snapshot_type = 'hourly' +ORDER BY snapshot_time DESC +LIMIT 20 +`) + if err != nil { + return InventorySnapshotRow{}, false + } + defer rows.Close() + + checked := 0 + for rows.Next() { + var table string + if scanErr := rows.Scan(&table); scanErr != nil { + continue + } + if err := db.ValidateTableName(table); err != nil { + continue + } + query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name","Datacenter","Cluster" FROM %s WHERE "Vcenter" = ? AND "VmId" = ? 
LIMIT 1`, table) + query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) + var row InventorySnapshotRow + if err := dbConn.QueryRowxContext(ctx, query, vcenter, vmID).Scan(&row.VmId, &row.VmUuid, &row.Name, &row.Datacenter, &row.Cluster); err == nil { + return row, true + } + checked++ + if checked >= 10 { // limit work + break + } + } + return InventorySnapshotRow{}, false +} diff --git a/internal/tasks/inventorySnapshots.go b/internal/tasks/inventorySnapshots.go index e549bc9..b84d66b 100644 --- a/internal/tasks/inventorySnapshots.go +++ b/internal/tasks/inventorySnapshots.go @@ -3,6 +3,7 @@ package tasks import ( "context" "database/sql" + "errors" "fmt" "log/slog" "strconv" @@ -22,7 +23,7 @@ import ( "github.com/vmware/govmomi/vim25/types" ) -type inventorySnapshotRow struct { +type InventorySnapshotRow struct { InventoryId sql.NullInt64 Name string Vcenter string @@ -56,6 +57,7 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo jobCtx, cancel = context.WithTimeout(ctx, jobTimeout) defer cancel() } + snapshotFreq := durationFromSeconds(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds, time.Hour) tracker := NewCronTracker(c.Database) // Clear stale marker for this job only (short timeout to avoid blocking). staleCtx, cancelStale := context.WithTimeout(context.Background(), 2*time.Second) @@ -66,7 +68,8 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo startedAt := time.Now() defer func() { - logger.Info("Hourly snapshot job finished", "duration", time.Since(startedAt)) + // gocron logs the next run on its side, but log here for quick visibility. 
+ logger.Info("Hourly snapshot job finished", "duration", time.Since(startedAt), "next_run_estimated", time.Now().Add(snapshotFreq)) }() done, skip, err := tracker.Start(jobCtx, "hourly_snapshot") if err != nil { @@ -419,25 +422,6 @@ func truncateDate(t time.Time) time.Time { return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) } -func dropSnapshotTable(ctx context.Context, dbConn *sqlx.DB, table string) error { - if _, err := db.SafeTableName(table); err != nil { - return err - } - _, err := dbConn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", table)) - return err -} - -func clearTable(ctx context.Context, dbConn *sqlx.DB, table string) error { - if _, err := db.SafeTableName(table); err != nil { - return err - } - _, err := dbConn.ExecContext(ctx, fmt.Sprintf("DELETE FROM %s", table)) - if err != nil { - return fmt.Errorf("failed to clear table %s: %w", table, err) - } - return nil -} - func filterSnapshotsWithRows(ctx context.Context, dbConn *sqlx.DB, snapshots []report.SnapshotRecord) []report.SnapshotRecord { filtered := snapshots[:0] for _, snapshot := range snapshots { @@ -549,6 +533,128 @@ func normalizeResourcePool(value string) string { return trimmed }

// backfillLifecycleDeletionsToday looks for VMs in the lifecycle cache that have
// no DeletedAt and are absent from present (keyed by VM ID), and derives their
// deletion time from the hourly snapshots registered for snapshotTime's day.
//
// For each such VM the day's hourly tables are walked oldest first. The
// deletion time is the snapshot time of the first table that lacks the VM after
// a table that contained it; a VM found in none of the tables gets the time of
// the last table. A VM found in every table is left untouched. Missing name and
// cluster values are filled in from the snapshots that contain the VM.
//
// Failures to list the lifecycle entries or the day's tables are returned.
// A failed lookup or a failed lifecycle update for one VM is logged and only
// that VM is skipped.
func backfillLifecycleDeletionsToday(ctx context.Context, logger *slog.Logger, dbConn *sqlx.DB, vcenter string, snapshotTime time.Time, present map[string]InventorySnapshotRow) error {
	dayStart := truncateDate(snapshotTime)
	dayEnd := dayStart.Add(24 * time.Hour)
	bindType := sqlx.BindType(dbConn.DriverName())

	type candidate struct {
		vmID    string
		vmUUID  string
		name    string
		cluster string
	}
	type hourlyTable struct {
		Table string `db:"table_name"`
		Time  int64  `db:"snapshot_time"`
	}

	// Lifecycle entries missing DeletedAt. The query is rebound like every other
	// statement here, because drivers that use $n placeholders reject a bare "?".
	// It runs in its own function so the cursor is closed before more queries start.
	cands, err := func() ([]candidate, error) {
		queryLifecycle := sqlx.Rebind(bindType, `
SELECT "VmId","VmUuid","Name","Cluster"
FROM vm_lifecycle_cache
WHERE "Vcenter" = ? AND ("DeletedAt" IS NULL OR "DeletedAt" = 0)
`)
		rows, err := dbConn.QueryxContext(ctx, queryLifecycle, vcenter)
		if err != nil {
			return nil, err
		}
		defer rows.Close()

		var out []candidate
		for rows.Next() {
			var vmID, vmUUID, name, cluster sql.NullString
			if scanErr := rows.Scan(&vmID, &vmUUID, &name, &cluster); scanErr != nil {
				continue
			}
			if vmID.String == "" {
				continue
			}
			if _, ok := present[vmID.String]; ok {
				continue // still present, skip
			}
			out = append(out, candidate{
				vmID:    vmID.String,
				vmUUID:  vmUUID.String,
				name:    name.String,
				cluster: cluster.String,
			})
		}
		return out, rows.Err()
	}()
	if err != nil {
		return fmt.Errorf("listing lifecycle entries without deletion time: %w", err)
	}
	if len(cands) == 0 {
		return nil
	}

	// Get today's hourly tables. A truncated list would shift the computed
	// deletion times, so a read error aborts the backfill instead.
	tables, err := func() ([]hourlyTable, error) {
		query := sqlx.Rebind(bindType, `
SELECT table_name, snapshot_time
FROM snapshot_registry
WHERE snapshot_type = 'hourly' AND snapshot_time >= ? AND snapshot_time < ?
ORDER BY snapshot_time ASC
`)
		rows, err := dbConn.QueryxContext(ctx, query, dayStart.Unix(), dayEnd.Unix())
		if err != nil {
			return nil, err
		}
		defer rows.Close()

		var out []hourlyTable
		for rows.Next() {
			var t hourlyTable
			if err := rows.StructScan(&t); err != nil {
				continue
			}
			out = append(out, t)
		}
		return out, rows.Err()
	}()
	if err != nil {
		return fmt.Errorf("listing today's hourly snapshot tables: %w", err)
	}
	if len(tables) == 0 {
		return nil
	}

	for _, cand := range cands {
		var lastSeen int64
		var deletion int64
		logger.Debug("lifecycle backfill candidate", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "cluster", cand.cluster, "tables", len(tables))
		for i, tbl := range tables {
			if err := db.ValidateTableName(tbl.Table); err != nil {
				continue
			}
			q := fmt.Sprintf(`SELECT "Name","Cluster" FROM %s WHERE "Vcenter" = ? AND "VmId" = ? LIMIT 1`, tbl.Table)
			q = sqlx.Rebind(bindType, q)
			var name, cluster sql.NullString
			err := dbConn.QueryRowxContext(ctx, q, vcenter, cand.vmID).Scan(&name, &cluster)
			if err == nil {
				lastSeen = tbl.Time
				if cand.name == "" && name.Valid {
					cand.name = name.String
				}
				if cand.cluster == "" && cluster.Valid {
					cand.cluster = cluster.String
				}
				continue
			}
			if !errors.Is(err, sql.ErrNoRows) {
				// A failed lookup is not evidence of absence. deletion is still
				// zero here, so leaving the loop skips this VM without marking it.
				logger.Warn("lifecycle backfill snapshot lookup failed", "vcenter", vcenter, "vm_id", cand.vmID, "table", tbl.Table, "error", err)
				break
			}
			// Not found in this table; if previously seen today, mark deletion at this snapshot time.
			if lastSeen > 0 {
				deletion = tbl.Time
				break
			}
			// If never seen today and we're at the last table, mark deletion at that table's snapshot time.
			if i == len(tables)-1 {
				deletion = tbl.Time
			}
		}
		if deletion > 0 {
			if err := db.MarkVmDeletedWithDetails(ctx, dbConn, vcenter, cand.vmID, cand.vmUUID, cand.name, cand.cluster, deletion); err != nil {
				logger.Warn("lifecycle backfill mark deleted failed", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "cluster", cand.cluster, "deletion", deletion, "error", err)
				continue
			}
			logger.Debug("lifecycle backfill applied", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "cluster", cand.cluster, "deletion", deletion)
		}
	}
	return nil
}

func (c *CronTask) reportsDir() string { if c.Settings != nil && c.Settings.Values != nil { if dir := strings.TrimSpace(c.Settings.Values.Settings.ReportsDir); dir != "" { @@ -564,12 +670,12 @@ func (c *CronTask) generateReport(ctx context.Context, tableName string) error { return err } -func snapshotFromVM(vmObject *mo.VirtualMachine, vc *vcenter.Vcenter, snapshotTime time.Time, inv *queries.Inventory, hostLookup map[string]vcenter.HostLookup, folderLookup vcenter.FolderLookup, rpLookup map[string]string) (inventorySnapshotRow, error) { +func snapshotFromVM(vmObject *mo.VirtualMachine, vc *vcenter.Vcenter, snapshotTime time.Time, inv *queries.Inventory, hostLookup map[string]vcenter.HostLookup, folderLookup
vcenter.FolderLookup, rpLookup map[string]string) (InventorySnapshotRow, error) { if vmObject == nil { - return inventorySnapshotRow{}, fmt.Errorf("missing VM object") + return InventorySnapshotRow{}, fmt.Errorf("missing VM object") } - row := inventorySnapshotRow{ + row := InventorySnapshotRow{ Name: vmObject.Name, Vcenter: vc.Vurl, VmId: sql.NullString{String: vmObject.Reference().Value, Valid: vmObject.Reference().Value != ""}, @@ -680,9 +786,11 @@ func snapshotFromVM(vmObject *mo.VirtualMachine, vc *vcenter.Vcenter, snapshotTi } } - if row.Cluster.String == "" { + if row.Cluster.String == "" && vmObject.Runtime.Host != nil { if clusterName, err := vc.GetClusterFromHost(vmObject.Runtime.Host); err == nil { row.Cluster = sql.NullString{String: clusterName, Valid: clusterName != ""} + } else if vc.Logger != nil { + vc.Logger.Warn("failed to resolve cluster from host", "vm_id", vmObject.Reference().Value, "error", err) } } @@ -695,8 +803,8 @@ func snapshotFromVM(vmObject *mo.VirtualMachine, vc *vcenter.Vcenter, snapshotTi return row, nil } -func snapshotFromInventory(inv queries.Inventory, snapshotTime time.Time) inventorySnapshotRow { - return inventorySnapshotRow{ +func snapshotFromInventory(inv queries.Inventory, snapshotTime time.Time) InventorySnapshotRow { + return InventorySnapshotRow{ InventoryId: sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0}, Name: inv.Name, Vcenter: inv.Vcenter, @@ -720,167 +828,6 @@ func snapshotFromInventory(inv queries.Inventory, snapshotTime time.Time) invent } } -func insertHourlyCache(ctx context.Context, dbConn *sqlx.DB, rows []inventorySnapshotRow) error { - if len(rows) == 0 { - return nil - } - if err := db.EnsureVmHourlyStats(ctx, dbConn); err != nil { - return err - } - driver := strings.ToLower(dbConn.DriverName()) - conflict := "" - verb := "INSERT INTO" - if driver == "sqlite" { - verb = "INSERT OR REPLACE INTO" - } else { - conflict = ` ON CONFLICT ("Vcenter","VmId","SnapshotTime") DO UPDATE SET - 
"VmUuid"=EXCLUDED."VmUuid", - "Name"=EXCLUDED."Name", - "CreationTime"=EXCLUDED."CreationTime", - "DeletionTime"=EXCLUDED."DeletionTime", - "ResourcePool"=EXCLUDED."ResourcePool", - "Datacenter"=EXCLUDED."Datacenter", - "Cluster"=EXCLUDED."Cluster", - "Folder"=EXCLUDED."Folder", - "ProvisionedDisk"=EXCLUDED."ProvisionedDisk", - "VcpuCount"=EXCLUDED."VcpuCount", - "RamGB"=EXCLUDED."RamGB", - "IsTemplate"=EXCLUDED."IsTemplate", - "PoweredOn"=EXCLUDED."PoweredOn", - "SrmPlaceholder"=EXCLUDED."SrmPlaceholder"` - } - - cols := []string{ - "SnapshotTime", "Vcenter", "VmId", "VmUuid", "Name", "CreationTime", "DeletionTime", "ResourcePool", - "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount", "RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", - } - bind := sqlx.BindType(dbConn.DriverName()) - placeholders := strings.TrimRight(strings.Repeat("?, ", len(cols)), ", ") - stmtText := fmt.Sprintf(`%s vm_hourly_stats ("%s") VALUES (%s)%s`, verb, strings.Join(cols, `","`), placeholders, conflict) - stmtText = sqlx.Rebind(bind, stmtText) - - tx, err := dbConn.BeginTxx(ctx, nil) - if err != nil { - return err - } - stmt, err := tx.PreparexContext(ctx, stmtText) - if err != nil { - tx.Rollback() - return err - } - defer stmt.Close() - - for _, r := range rows { - args := []interface{}{ - r.SnapshotTime, r.Vcenter, r.VmId, r.VmUuid, r.Name, r.CreationTime, r.DeletionTime, r.ResourcePool, - r.Datacenter, r.Cluster, r.Folder, r.ProvisionedDisk, r.VcpuCount, r.RamGB, r.IsTemplate, r.PoweredOn, r.SrmPlaceholder, - } - if _, err := stmt.ExecContext(ctx, args...); err != nil { - tx.Rollback() - return err - } - } - return tx.Commit() -} - -func insertHourlyBatch(ctx context.Context, dbConn *sqlx.DB, tableName string, rows []inventorySnapshotRow) error { - if len(rows) == 0 { - return nil - } - if err := db.EnsureVmHourlyStats(ctx, dbConn); err != nil { - return err - } - tx, err := dbConn.BeginTxx(ctx, nil) - if err != nil { - return err - } - - baseCols := []string{ - 
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", - "ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount", - "RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SnapshotTime", - } - bind := sqlx.BindType(dbConn.DriverName()) - buildStmt := func(cols []string) (*sqlx.Stmt, error) { - colList := `"` + strings.Join(cols, `", "`) + `"` - placeholders := strings.TrimRight(strings.Repeat("?, ", len(cols)), ", ") - return tx.PreparexContext(ctx, sqlx.Rebind(bind, fmt.Sprintf(`INSERT INTO %s (%s) VALUES (%s)`, tableName, colList, placeholders))) - } - - stmt, err := buildStmt(baseCols) - if err != nil { - // Fallback for legacy tables that still have IsPresent. - withLegacy := append(append([]string{}, baseCols...), "IsPresent") - stmt, err = buildStmt(withLegacy) - if err != nil { - tx.Rollback() - return err - } - defer stmt.Close() - for _, row := range rows { - args := []interface{}{ - row.InventoryId, - row.Name, - row.Vcenter, - row.VmId, - row.EventKey, - row.CloudId, - row.CreationTime, - row.DeletionTime, - row.ResourcePool, - row.Datacenter, - row.Cluster, - row.Folder, - row.ProvisionedDisk, - row.VcpuCount, - row.RamGB, - row.IsTemplate, - row.PoweredOn, - row.SrmPlaceholder, - row.VmUuid, - row.SnapshotTime, - "TRUE", - } - if _, err := stmt.ExecContext(ctx, args...); err != nil { - tx.Rollback() - return err - } - } - return tx.Commit() - } - defer stmt.Close() - - for _, row := range rows { - args := []interface{}{ - row.InventoryId, - row.Name, - row.Vcenter, - row.VmId, - row.EventKey, - row.CloudId, - row.CreationTime, - row.DeletionTime, - row.ResourcePool, - row.Datacenter, - row.Cluster, - row.Folder, - row.ProvisionedDisk, - row.VcpuCount, - row.RamGB, - row.IsTemplate, - row.PoweredOn, - row.SrmPlaceholder, - row.VmUuid, - row.SnapshotTime, - } - if _, err := stmt.ExecContext(ctx, args...); err != nil { - tx.Rollback() - return err - } - } - return tx.Commit() -} 
- func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTime time.Time, tableName string, url string) error { started := time.Now() c.Logger.Debug("connecting to vcenter for hourly snapshot", "url", url) @@ -957,11 +904,25 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim } dbConn := c.Database.DB() - presentSnapshots := make(map[string]inventorySnapshotRow, len(vcVms)) + presentSnapshots := make(map[string]InventorySnapshotRow, len(vcVms)) presentByUuid := make(map[string]struct{}, len(vcVms)) presentByName := make(map[string]struct{}, len(vcVms)) totals := snapshotTotals{} deletionsMarked := false + var prevVmCount sql.NullInt64 + countQuery := `SELECT "VmCount" FROM vcenter_totals WHERE "Vcenter" = ? ORDER BY "SnapshotTime" DESC LIMIT 1` + countQuery = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), countQuery) + if err := dbConn.QueryRowContext(ctx, countQuery, url).Scan(&prevVmCount); err != nil && !errors.Is(err, sql.ErrNoRows) { + c.Logger.Warn("failed to read previous vcenter totals", "vcenter", url, "error", err) + } + type deletionCandidate struct { + vmID string + vmUUID string + name string + cluster string + datacenter sql.NullString + } + candidates := make([]deletionCandidate, 0) for _, vm := range vcVms { if strings.HasPrefix(vm.Name, "vCLS-") { continue @@ -1007,13 +968,14 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim } c.Logger.Debug("hourly snapshot rows prepared", "vcenter", url, "rows", len(presentSnapshots)) - batch := make([]inventorySnapshotRow, 0, len(presentSnapshots)+len(inventoryRows)) + batch := make([]InventorySnapshotRow, 0, len(presentSnapshots)+len(inventoryRows)) for _, row := range presentSnapshots { batch = append(batch, row) } c.Logger.Debug("checking inventory for missing VMs", "vcenter", url) missingCount := 0 + newCount := 0 for _, inv := range inventoryRows { c.Logger.Debug("checking inventory for deletions", "vm_id", 
inv.VmId.String, "vm_uuid", inv.VmUuid.String, "name", inv.Name) @@ -1061,19 +1023,74 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim c.Logger.Debug("Marked VM as deleted", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", url, "snapshot_time", startTime) deletionsMarked = true } - if err := db.MarkVmDeleted(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, startTime.Unix()); err != nil { - c.Logger.Warn("failed to mark vm deleted in lifecycle cache", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "error", err) - } clusterName := "" if inv.Cluster.Valid { clusterName = inv.Cluster.String } + + candidates = append(candidates, deletionCandidate{ + vmID: vmID, + vmUUID: uuid, + name: name, + cluster: clusterName, + datacenter: inv.Datacenter, + }) + if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, inv.Name, clusterName, startTime.Unix()); err != nil { + c.Logger.Warn("failed to mark vm deleted in lifecycle cache", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "error", err) + } if err := db.UpsertVmLifecycleCache(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, inv.Name, clusterName, startTime); err != nil { c.Logger.Warn("failed to upsert vm lifecycle cache (deletion path)", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "name", inv.Name, "error", err) } missingCount++ } + // If deletions detected, refine deletion time using vCenter events in a small window. 
+ if missingCount > 0 { + freq := time.Duration(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds) * time.Second + if freq <= 0 { + freq = time.Hour + } + begin := startTime.Add(-4 * freq) + end := startTime + events, err := vc.FindVmDeletionEvents(ctx, begin, end) + if err != nil { + c.Logger.Warn("failed to fetch vcenter deletion events", "vcenter", url, "error", err) + } else { + c.Logger.Debug("fetched vcenter deletion events", "vcenter", url, "count", len(events), "window_start_local", begin, "window_end_local", end, "window_minutes", end.Sub(begin).Minutes(), "window_start_utc", begin.UTC(), "window_end_utc", end.UTC()) + for _, cand := range candidates { + if t, ok := events[cand.vmID]; ok { + delTs := sql.NullInt64{Int64: t.Unix(), Valid: true} + if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ + DeletionTime: delTs, + VmId: sql.NullString{String: cand.vmID, Valid: cand.vmID != ""}, + DatacenterName: cand.datacenter, + }); err != nil { + c.Logger.Warn("failed to update inventory deletion time from event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err) + } + if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, cand.vmID, cand.vmUUID, cand.name, cand.cluster, t.Unix()); err != nil { + c.Logger.Warn("failed to refine lifecycle cache deletion time", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err) + } + c.Logger.Info("refined deletion time from vcenter event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "vcenter", url, "event_time", t) + } + } + } + } + // If VM count dropped vs prior totals but we didn't mark missing, still look for events (best-effort logging). 
+ if missingCount == 0 && prevVmCount.Valid && prevVmCount.Int64 > int64(totals.VmCount) { + freq := time.Duration(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds) * time.Second + if freq <= 0 { + freq = time.Hour + } + begin := startTime.Add(-2 * freq) + end := startTime + events, err := vc.FindVmDeletionEvents(ctx, begin, end) + if err != nil { + c.Logger.Warn("count-drop: failed to fetch vcenter deletion events", "vcenter", url, "error", err, "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount) + } else { + c.Logger.Info("count-drop: deletion events fetched", "vcenter", url, "events", len(events), "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount, "window_start", begin, "window_end", end) + } + } + c.Logger.Debug("inserting hourly snapshot batch", "vcenter", url, "rows", len(batch)) if err := insertHourlyCache(ctx, dbConn, batch); err != nil { @@ -1093,11 +1110,108 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim } // Compare with previous snapshot for this vcenter to mark deletions at snapshot time. 
- if prevTable, err := latestHourlySnapshotBefore(ctx, dbConn, startTime); err == nil && prevTable != "" { - moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTable, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName) + prevTableName, prevTableErr := latestHourlySnapshotBefore(ctx, dbConn, startTime) + + if prevTableErr != nil { + c.Logger.Warn("failed to locate previous hourly snapshot for deletion comparison", "error", prevTableErr, "url", url) + } + + if prevTableName != "" { + moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTableName, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName) missingCount += moreMissing - } else if err != nil { - c.Logger.Warn("failed to locate previous hourly snapshot for deletion comparison", "error", err, "url", url) + newCount = countNewFromPrevious(ctx, dbConn, prevTableName, url, presentSnapshots) + if newCount > 0 { + newRows := listNewFromPrevious(ctx, dbConn, prevTableName, url, presentSnapshots) + names := make([]string, 0, len(newRows)) + for _, r := range newRows { + if r.Name != "" { + names = append(names, r.Name) + } else if r.VmId.Valid { + names = append(names, r.VmId.String) + } + } + c.Logger.Info("new VMs since previous snapshot", "prev_table", prevTableName, "count", newCount, "names", names) + } + c.Logger.Debug("compared with previous snapshot", "prev_table", prevTableName, "new_since_prev", newCount, "missing_since_prev", missingCount) + } else { + // No previous snapshot found (or lookup failed). + newCount = len(presentSnapshots) + } + + // If VM count dropped versus totals and we still haven't marked missing, try another comparison + wider event window. + if missingCount == 0 && prevVmCount.Valid && prevVmCount.Int64 > int64(totals.VmCount) { + // Fallback: compare against latest registered snapshot table. 
+ if prevTable, err := latestHourlySnapshotBefore(ctx, dbConn, startTime); err == nil && prevTable != "" { + moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTable, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName) + if moreMissing > 0 { + missingCount += moreMissing + } + // Reuse this table name for later snapshot lookups when correlating deletion events. + prevTableName = prevTable + } + freq := time.Duration(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds) * time.Second + if freq <= 0 { + freq = time.Hour + } + begin := startTime.Add(-4 * freq) + end := startTime + events, err := vc.FindVmDeletionEvents(ctx, begin, end) + if err != nil { + c.Logger.Warn("count-drop: failed to fetch vcenter deletion events", "vcenter", url, "error", err, "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount) + } else { + c.Logger.Info("count-drop: deletion events fetched", "vcenter", url, "events", len(events), "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount, "window_start_local", begin, "window_end_local", end, "window_start_utc", begin.UTC(), "window_end_utc", end.UTC(), "window_minutes", end.Sub(begin).Minutes()) + for vmID, t := range events { + // Skip if VM is still present. 
+ if _, ok := presentSnapshots[vmID]; ok { + continue + } + inv, ok := inventoryByVmID[vmID] + var snapRow InventorySnapshotRow + if !ok { + var found bool + snapRow, found = findVMInHourlySnapshots(ctx, dbConn, url, vmID, prevTableName) + if !found { + c.Logger.Debug("count-drop: deletion event has no snapshot match", "vm_id", vmID, "vcenter", url, "event_time", t) + continue + } + inv = queries.Inventory{ + VmId: snapRow.VmId, + VmUuid: snapRow.VmUuid, + Name: snapRow.Name, + Datacenter: snapRow.Datacenter, + } + c.Logger.Info("count-drop: correlated deletion via snapshot lookup", "vm_id", vmID, "vm_uuid", inv.VmUuid.String, "name", inv.Name, "vcenter", url, "event_time", t, "snapshot_table", prevTableName) + } + // Prefer UUID from snapshot if inventory entry lacks it. + if !inv.VmUuid.Valid && snapRow.VmUuid.Valid { + inv.VmUuid = snapRow.VmUuid + } + delTs := sql.NullInt64{Int64: t.Unix(), Valid: true} + if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ + DeletionTime: delTs, + VmId: inv.VmId, + DatacenterName: inv.Datacenter, + }); err != nil { + c.Logger.Warn("count-drop: failed to update inventory deletion time from event", "vm_id", vmID, "vcenter", url, "error", err) + } else { + c.Logger.Info("count-drop: correlated deletion event to inventory", "vm_id", vmID, "vm_uuid", inv.VmUuid.String, "name", inv.Name, "vcenter", url, "event_time", t, "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount) + } + clusterName := "" + if inv.Cluster.Valid { + clusterName = inv.Cluster.String + } + if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, vmID, inv.VmUuid.String, inv.Name, clusterName, t.Unix()); err != nil { + c.Logger.Warn("count-drop: failed to refine lifecycle cache deletion time", "vm_id", vmID, "vm_uuid", inv.VmUuid, "vcenter", url, "error", err) + } + missingCount++ + deletionsMarked = true + } + } + } + + // Backfill lifecycle deletions for VMs missing from inventory and without DeletedAt. 
+ if err := backfillLifecycleDeletionsToday(ctx, c.Logger, dbConn, url, startTime, presentSnapshots); err != nil { + c.Logger.Warn("failed to backfill lifecycle deletions for today", "vcenter", url, "error", err) } c.Logger.Info("Hourly snapshot summary", @@ -1107,6 +1221,8 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim "ram_total_gb", totals.RamTotal, "disk_total_gb", totals.DiskTotal, "missing_marked", missingCount, + "created_since_prev", newCount, + "deleted_since_prev", missingCount, ) metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, nil) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, true, ""); upErr != nil { @@ -1121,177 +1237,3 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim } return nil } - -func boolStringFromInterface(value interface{}) string { - switch v := value.(type) { - case nil: - return "" - case string: - return v - case []byte: - return string(v) - case bool: - if v { - return "TRUE" - } - return "FALSE" - case int: - if v != 0 { - return "TRUE" - } - return "FALSE" - case int64: - if v != 0 { - return "TRUE" - } - return "FALSE" - default: - return fmt.Sprint(v) - } -} - -// latestHourlySnapshotBefore finds the most recent hourly snapshot table prior to the given time. 
-func latestHourlySnapshotBefore(ctx context.Context, dbConn *sqlx.DB, cutoff time.Time) (string, error) { - driver := strings.ToLower(dbConn.DriverName()) - var rows *sqlx.Rows - var err error - switch driver { - case "sqlite": - rows, err = dbConn.QueryxContext(ctx, ` -SELECT name FROM sqlite_master -WHERE type = 'table' AND name LIKE 'inventory_hourly_%' -`) - case "pgx", "postgres": - rows, err = dbConn.QueryxContext(ctx, ` -SELECT tablename FROM pg_catalog.pg_tables -WHERE schemaname = 'public' AND tablename LIKE 'inventory_hourly_%' -`) - default: - return "", fmt.Errorf("unsupported driver for snapshot lookup: %s", driver) - } - if err != nil { - return "", err - } - defer rows.Close() - - var latest string - var latestTime int64 - for rows.Next() { - var name string - if scanErr := rows.Scan(&name); scanErr != nil { - continue - } - if !strings.HasPrefix(name, "inventory_hourly_") { - continue - } - suffix := strings.TrimPrefix(name, "inventory_hourly_") - epoch, parseErr := strconv.ParseInt(suffix, 10, 64) - if parseErr != nil { - continue - } - if epoch < cutoff.Unix() && epoch > latestTime { - latestTime = epoch - latest = name - } - } - return latest, nil -} - -// markMissingFromPrevious marks VMs that were present in the previous snapshot but missing now. 
-func (c *CronTask) markMissingFromPrevious(ctx context.Context, dbConn *sqlx.DB, prevTable string, vcenter string, snapshotTime time.Time, - currentByID map[string]inventorySnapshotRow, currentByUuid map[string]struct{}, currentByName map[string]struct{}, - invByID map[string]queries.Inventory, invByUuid map[string]queries.Inventory, invByName map[string]queries.Inventory) int { - - if err := db.ValidateTableName(prevTable); err != nil { - return 0 - } - - query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name","Cluster","Datacenter","DeletionTime" FROM %s WHERE "Vcenter" = ?`, prevTable) - query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) - - type prevRow struct { - VmId sql.NullString `db:"VmId"` - VmUuid sql.NullString `db:"VmUuid"` - Name string `db:"Name"` - Cluster sql.NullString `db:"Cluster"` - Datacenter sql.NullString `db:"Datacenter"` - DeletionTime sql.NullInt64 `db:"DeletionTime"` - } - - rows, err := dbConn.QueryxContext(ctx, query, vcenter) - if err != nil { - c.Logger.Warn("failed to read previous snapshot for deletion detection", "error", err, "table", prevTable, "vcenter", vcenter) - return 0 - } - defer rows.Close() - - missing := 0 - for rows.Next() { - var r prevRow - if err := rows.StructScan(&r); err != nil { - continue - } - vmID := r.VmId.String - uuid := r.VmUuid.String - name := r.Name - cluster := r.Cluster.String - - found := false - if vmID != "" { - if _, ok := currentByID[vmID]; ok { - found = true - } - } - if !found && uuid != "" { - if _, ok := currentByUuid[uuid]; ok { - found = true - } - } - if !found && name != "" { - if _, ok := currentByName[name]; ok { - found = true - } - } - // If the name is missing but UUID+Cluster still exists in inventory/current, treat it as present (rename, not delete). 
- if !found && uuid != "" && cluster != "" { - if inv, ok := invByUuid[uuid]; ok && strings.EqualFold(inv.Cluster.String, cluster) { - found = true - } - } - if found { - continue - } - - var inv queries.Inventory - var ok bool - if vmID != "" { - inv, ok = invByID[vmID] - } - if !ok && uuid != "" { - inv, ok = invByUuid[uuid] - } - if !ok && name != "" { - inv, ok = invByName[name] - } - if !ok { - continue - } - if inv.DeletionTime.Valid { - continue - } - - delTime := sql.NullInt64{Int64: snapshotTime.Unix(), Valid: true} - if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ - DeletionTime: delTime, - VmId: inv.VmId, - DatacenterName: inv.Datacenter, - }); err != nil { - c.Logger.Warn("failed to mark inventory record deleted from previous snapshot", "error", err, "vm_id", inv.VmId.String) - continue - } - c.Logger.Debug("Detected VM missing compared to previous snapshot", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", vcenter, "snapshot_time", snapshotTime, "prev_table", prevTable) - missing++ - } - - return missing -} diff --git a/internal/vcenter/vcenter.go b/internal/vcenter/vcenter.go index aa02245..51e6ae6 100644 --- a/internal/vcenter/vcenter.go +++ b/internal/vcenter/vcenter.go @@ -7,8 +7,10 @@ import ( "net/url" "path" "strings" + "time" "github.com/vmware/govmomi" + "github.com/vmware/govmomi/event" "github.com/vmware/govmomi/find" "github.com/vmware/govmomi/object" "github.com/vmware/govmomi/view" @@ -195,6 +197,90 @@ func (v *Vcenter) GetAllVMsWithProps() ([]mo.VirtualMachine, error) { return vms, nil } +// FindVmDeletionEvents returns a map of MoRef (VmId) to the deletion event time within the given window. 
+func (v *Vcenter) FindVmDeletionEvents(ctx context.Context, begin, end time.Time) (map[string]time.Time, error) { + result := make(map[string]time.Time) + if v.client == nil || !v.client.Valid() { + return result, fmt.Errorf("vcenter client is not valid") + } + // vCenter events are stored in UTC; normalize the query window. + beginUTC := begin.UTC() + endUTC := end.UTC() + mgr := event.NewManager(v.client.Client) + + processEvents := func(evts []types.BaseEvent) { + for _, ev := range evts { + switch e := ev.(type) { + case *types.VmRemovedEvent: + if e.Vm != nil { + vmID := e.Vm.Vm.Value + if vmID != "" { + result[vmID] = e.CreatedTime + } + } + case *types.TaskEvent: + // Fallback for destroy task events. + if e.Info.Entity != nil { + vmID := e.Info.Entity.Value + msg := strings.ToLower(e.GetEvent().FullFormattedMessage) + if vmID != "" && (strings.Contains(msg, "destroy") || strings.Contains(msg, "deleted")) { + result[vmID] = e.CreatedTime + } + } + case *types.VmEvent: + if e.Vm != nil { + vmID := e.Vm.Vm.Value + if vmID != "" { + result[vmID] = e.CreatedTime + } + } + } + } + } + + // First attempt: specific deletion event types. + filter := types.EventFilterSpec{ + Time: &types.EventFilterSpecByTime{ + BeginTime: &beginUTC, + EndTime: &endUTC, + }, + EventTypeId: []string{ + "VmRemovedEvent", + "TaskEvent", + }, + } + collector, err := mgr.CreateCollectorForEvents(ctx, filter) + if err != nil { + return result, fmt.Errorf("failed to create event collector: %w", err) + } + defer collector.Destroy(ctx) + + events, err := collector.ReadNextEvents(ctx, 500) + if err != nil { + return result, fmt.Errorf("failed to read events: %w", err) + } + processEvents(events) + + // If nothing found, widen the filter to all event types in the window as a fallback. 
+ if len(result) == 0 { + fallbackFilter := types.EventFilterSpec{ + Time: &types.EventFilterSpecByTime{ + BeginTime: &beginUTC, + EndTime: &endUTC, + }, + } + fc, err := mgr.CreateCollectorForEvents(ctx, fallbackFilter) + if err == nil { + defer fc.Destroy(ctx) + if evs, readErr := fc.ReadNextEvents(ctx, 500); readErr == nil { + processEvents(evs) + } + } + } + + return result, nil +} + func (v *Vcenter) BuildHostLookup() (map[string]HostLookup, error) { finder := find.NewFinder(v.client.Client, true) datacenters, err := finder.DatacenterList(v.ctx, "*") @@ -415,6 +501,10 @@ func (v *Vcenter) GetHostSystemObject(hostRef types.ManagedObjectReference) (*mo // Function to find the cluster or compute resource from a host reference func (v *Vcenter) GetClusterFromHost(hostRef *types.ManagedObjectReference) (string, error) { + if hostRef == nil { + v.Logger.Warn("nil hostRef passed to GetClusterFromHost") + return "", nil + } // Get the host object host, err := v.GetHostSystemObject(*hostRef) if err != nil { diff --git a/main.go b/main.go index bd06bd9..dbed5f0 100644 --- a/main.go +++ b/main.go @@ -19,8 +19,9 @@ import ( "vctp/server/router" "crypto/sha256" - "github.com/go-co-op/gocron/v2" "log/slog" + + "github.com/go-co-op/gocron/v2" ) var ( @@ -37,6 +38,7 @@ const fallbackEncryptionKey = "5L1l3B5KvwOCzUHMAlCgsgUTRAYMfSpa" func main() { settingsPath := flag.String("settings", "/etc/dtms/vctp.yml", "Path to settings YAML") + runInventoryOnce := flag.Bool("run-inventory", false, "Run a single inventory snapshot across all configured vCenters and exit") flag.Parse() bootstrapLogger := log.New(log.LevelInfo, log.OutputText) @@ -178,6 +180,14 @@ func main() { FirstHourlySnapshotCheck: true, } + // One-shot mode: run a single inventory snapshot across all configured vCenters and exit. 
+ if *runInventoryOnce { + logger.Info("Running one-shot inventory snapshot across all vCenters") + ct.RunVcenterSnapshotHourly(ctx, logger) + logger.Info("One-shot inventory snapshot complete; exiting") + return + } + cronSnapshotFrequency = durationFromSeconds(s.Values.Settings.VcenterInventorySnapshotSeconds, 3600) logger.Debug("Setting VM inventory snapshot cronjob frequency to", "frequency", cronSnapshotFrequency)