From 0a2c529111709d35370ef056d22992f94dff801f Mon Sep 17 00:00:00 2001 From: Nathan Coad Date: Wed, 21 Jan 2026 14:40:37 +1100 Subject: [PATCH] code refactor --- internal/tasks/inventorySnapshots.go | 324 ++++++++++++++++----------- internal/tasks/monitorVcenter.go | 4 +- 2 files changed, 192 insertions(+), 136 deletions(-) diff --git a/internal/tasks/inventorySnapshots.go b/internal/tasks/inventorySnapshots.go index da568cb..5ce6cd1 100644 --- a/internal/tasks/inventorySnapshots.go +++ b/internal/tasks/inventorySnapshots.go @@ -25,6 +25,21 @@ import ( type ctxLoggerKey struct{} +type deletionCandidate struct { + vmID string + vmUUID string + name string + cluster string + datacenter sql.NullString +} + +type vcenterResources struct { + vms []mo.VirtualMachine + hostLookup map[string]vcenter.HostLookup + folderLookup map[string]string + rpLookup map[string]string +} + func loggerFromCtx(ctx context.Context, fallback *slog.Logger) *slog.Logger { if ctx == nil { return fallback @@ -712,68 +727,11 @@ func snapshotFromInventory(inv queries.Inventory, snapshotTime time.Time) Invent } } -func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTime time.Time, tableName string, url string) error { - log := c.Logger.With("vcenter", url) - ctx = context.WithValue(ctx, ctxLoggerKey{}, log) - started := time.Now() - log.Debug("connecting to vcenter for hourly snapshot", "url", url) - vc := vcenter.New(c.Logger, c.VcCreds) - if err := vc.Login(url); err != nil { - metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err) - if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil { - log.Warn("failed to record snapshot run", "url", url, "error", upErr) - } - return fmt.Errorf("unable to connect to vcenter: %w", err) - } - defer func() { - logCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := vc.Logout(logCtx); err != nil { - log.Warn("vcenter logout failed", "url", url, "error", err) - } else { - log.Debug("vcenter logout succeeded", "url", url) - } - }() - - vcVms, err := vc.GetAllVMsWithProps() +func loadInventoryMaps(ctx context.Context, dbConn *sqlx.DB, url string) ([]queries.Inventory, map[string]queries.Inventory, map[string]queries.Inventory, map[string]queries.Inventory, error) { + inventoryRows, err := queries.New(dbConn).GetInventoryByVcenter(ctx, url) if err != nil { - metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err) - if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil { - log.Warn("failed to record snapshot run", "url", url, "error", upErr) - } - return fmt.Errorf("unable to get VMs from vcenter: %w", err) + return nil, nil, nil, nil, fmt.Errorf("unable to query inventory table: %w", err) } - log.Debug("retrieved VMs from vcenter", "url", url, "vm_count", len(vcVms)) - if err := db.EnsureVmIdentityTables(ctx, c.Database.DB()); err != nil { - log.Warn("failed to ensure vm identity tables", "error", err) - } - hostLookup, err := vc.BuildHostLookup() - if err != nil { - log.Warn("failed to build host lookup", "url", url, "error", err) - hostLookup = nil - } else { - log.Debug("built host lookup", "url", url, "hosts", len(hostLookup)) - } - folderLookup, err := vc.BuildFolderPathLookup() - if err != nil { - log.Warn("failed to build folder lookup", "url", url, "error", err) - folderLookup = nil - } else { - log.Debug("built folder lookup", "url", url, "folders", len(folderLookup)) - } - rpLookup, err := vc.BuildResourcePoolLookup() - if err != nil { - log.Warn("failed to build resource pool lookup", "url", url, "error", err) - rpLookup = nil - } else { - log.Debug("built resource pool lookup", "url", url, "pools", len(rpLookup)) - } - - inventoryRows, err := c.Database.Queries().GetInventoryByVcenter(ctx, url) - if err != nil { - return fmt.Errorf("unable to query inventory table: %w", err) - } - inventoryByVmID := make(map[string]queries.Inventory, len(inventoryRows)) inventoryByUuid := make(map[string]queries.Inventory, len(inventoryRows)) inventoryByName := make(map[string]queries.Inventory, len(inventoryRows)) @@ -788,84 +746,17 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim inventoryByName[inv.Name] = inv } } + return inventoryRows, inventoryByVmID, inventoryByUuid, inventoryByName, nil +} - dbConn := c.Database.DB() - presentSnapshots := make(map[string]InventorySnapshotRow, len(vcVms)) - presentByUuid := make(map[string]struct{}, len(vcVms)) - presentByName := make(map[string]struct{}, len(vcVms)) - totals := snapshotTotals{} - deletionsMarked := false - var prevVmCount sql.NullInt64 - countQuery := `SELECT "VmCount" FROM vcenter_totals WHERE "Vcenter" = ? ORDER BY "SnapshotTime" DESC LIMIT 1` - countQuery = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), countQuery) - if err := dbConn.QueryRowContext(ctx, countQuery, url).Scan(&prevVmCount); err != nil && !errors.Is(err, sql.ErrNoRows) { - c.Logger.Warn("failed to read previous vcenter totals", "vcenter", url, "error", err) - } - type deletionCandidate struct { - vmID string - vmUUID string - name string - cluster string - datacenter sql.NullString - } +func prepareDeletionCandidates(ctx context.Context, log *slog.Logger, dbConn *sqlx.DB, url string, inventoryRows []queries.Inventory, + presentSnapshots map[string]InventorySnapshotRow, presentByUuid, presentByName map[string]struct{}, startTime time.Time) (int, bool, []deletionCandidate) { candidates := make([]deletionCandidate, 0) - for _, vm := range vcVms { - if strings.HasPrefix(vm.Name, "vCLS-") { - continue - } - - if vm.Config != nil && vm.Config.Template { - continue - } - - var inv *queries.Inventory - if existing, ok := inventoryByVmID[vm.Reference().Value]; ok { - existingCopy := existing - inv = &existingCopy - } - - row, err := snapshotFromVM(&vm, vc, startTime, inv, hostLookup, folderLookup, rpLookup) - if err != nil { - c.Logger.Error("unable to build snapshot for VM", "vm_id", vm.Reference().Value, "error", err) - continue - } - if err := db.UpsertVmIdentity(ctx, dbConn, url, row.VmId, row.VmUuid, row.Name, row.Cluster, startTime); err != nil { - c.Logger.Warn("failed to upsert vm identity", "vcenter", url, "vm_id", row.VmId, "vm_uuid", row.VmUuid, "name", row.Name, "error", err) - } - clusterName := "" - if row.Cluster.Valid { - clusterName = row.Cluster.String - } - if err := db.UpsertVmLifecycleCache(ctx, dbConn, url, row.VmId.String, row.VmUuid.String, row.Name, clusterName, startTime); err != nil { - c.Logger.Warn("failed to upsert vm lifecycle cache", "vcenter", url, "vm_id", row.VmId, "vm_uuid", row.VmUuid, "name", row.Name, "error", err) - } - presentSnapshots[vm.Reference().Value] = row - if row.VmUuid.Valid { - presentByUuid[row.VmUuid.String] = struct{}{} - } - if row.Name != "" { - presentByName[row.Name] = struct{}{} - } - - totals.VmCount++ - totals.VcpuTotal += nullInt64ToInt(row.VcpuCount) - totals.RamTotal += nullInt64ToInt(row.RamGB) - totals.DiskTotal += nullFloat64ToFloat(row.ProvisionedDisk) - } - c.Logger.Debug("hourly snapshot rows prepared", "vcenter", url, "rows", len(presentSnapshots)) - - batch := make([]InventorySnapshotRow, 0, len(presentSnapshots)+len(inventoryRows)) - for _, row := range presentSnapshots { - batch = append(batch, row) - } - log.Debug("checking inventory for missing VMs") - missingCount := 0 - newCount := 0 - prevTableName := "" + deletionsMarked := false for _, inv := range inventoryRows { - c.Logger.Debug("checking inventory for deletions", "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "name", inv.Name) + log.Debug("checking inventory for deletions", "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "name", inv.Name) if strings.HasPrefix(inv.Name, "vCLS-") { continue } @@ -900,7 +791,7 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim if !row.DeletionTime.Valid { deletionTime := startTime.Unix() row.DeletionTime = sql.NullInt64{Int64: deletionTime, Valid: true} - if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ + if err := queries.New(dbConn).InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ DeletionTime: row.DeletionTime, VmId: inv.VmId, DatacenterName: inv.Datacenter, @@ -931,6 +822,171 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim missingCount++ } + return missingCount, deletionsMarked, candidates +} + +// buildPresentSnapshots converts vCenter VM objects into snapshot rows and aggregates totals. +func (c *CronTask) buildPresentSnapshots(ctx context.Context, dbConn *sqlx.DB, vc *vcenter.Vcenter, vcVms []mo.VirtualMachine, startTime time.Time, url string, + inventoryByVmID map[string]queries.Inventory, hostLookup map[string]vcenter.HostLookup, folderLookup map[string]string, rpLookup map[string]string) (map[string]InventorySnapshotRow, map[string]struct{}, map[string]struct{}, snapshotTotals) { + log := loggerFromCtx(ctx, c.Logger) + presentSnapshots := make(map[string]InventorySnapshotRow, len(vcVms)) + presentByUuid := make(map[string]struct{}, len(vcVms)) + presentByName := make(map[string]struct{}, len(vcVms)) + totals := snapshotTotals{} + + for _, vm := range vcVms { + if strings.HasPrefix(vm.Name, "vCLS-") { + continue + } + if vm.Config != nil && vm.Config.Template { + continue + } + + var inv *queries.Inventory + if existing, ok := inventoryByVmID[vm.Reference().Value]; ok { + existingCopy := existing + inv = &existingCopy + } + + row, err := snapshotFromVM(&vm, vc, startTime, inv, hostLookup, folderLookup, rpLookup) + if err != nil { + log.Error("unable to build snapshot for VM", "vm_id", vm.Reference().Value, "error", err) + continue + } + if err := db.UpsertVmIdentity(ctx, dbConn, url, row.VmId, row.VmUuid, row.Name, row.Cluster, startTime); err != nil { + log.Warn("failed to upsert vm identity", "vcenter", url, "vm_id", row.VmId, "vm_uuid", row.VmUuid, "name", row.Name, "error", err) + } + clusterName := "" + if row.Cluster.Valid { + clusterName = row.Cluster.String + } + if err := db.UpsertVmLifecycleCache(ctx, dbConn, url, row.VmId.String, row.VmUuid.String, row.Name, clusterName, startTime); err != nil { + log.Warn("failed to upsert vm lifecycle cache", "vcenter", url, "vm_id", row.VmId, "vm_uuid", row.VmUuid, "name", row.Name, "error", err) + } + presentSnapshots[vm.Reference().Value] = row + if row.VmUuid.Valid { + presentByUuid[row.VmUuid.String] = struct{}{} + } + if row.Name != "" { + presentByName[row.Name] = struct{}{} + } + + totals.VmCount++ + totals.VcpuTotal += nullInt64ToInt(row.VcpuCount) + totals.RamTotal += nullInt64ToInt(row.RamGB) + totals.DiskTotal += nullFloat64ToFloat(row.ProvisionedDisk) + } + + return presentSnapshots, presentByUuid, presentByName, totals +} + +// initVcenterResources logs into vCenter, fetches VMs, builds lookups, and returns a cleanup function for logout. +func (c *CronTask) initVcenterResources(ctx context.Context, log *slog.Logger, url string, startTime, started time.Time) (*vcenter.Vcenter, vcenterResources, func(), error) { + res := vcenterResources{} + vc := vcenter.New(c.Logger, c.VcCreds) + if err := vc.Login(url); err != nil { + metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err) + if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil { + log.Warn("failed to record snapshot run", "url", url, "error", upErr) + } + return nil, res, nil, fmt.Errorf("unable to connect to vcenter: %w", err) + } + + cleanup := func() { + logCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := vc.Logout(logCtx); err != nil { + log.Warn("vcenter logout failed", "url", url, "error", err) + } else { + log.Debug("vcenter logout succeeded", "url", url) + } + } + + vms, err := vc.GetAllVMsWithProps() + if err != nil { + metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err) + if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil { + log.Warn("failed to record snapshot run", "url", url, "error", upErr) + } + cleanup() + return nil, res, nil, fmt.Errorf("unable to get VMs from vcenter: %w", err) + } + log.Debug("retrieved VMs from vcenter", "url", url, "vm_count", len(vms)) + if err := db.EnsureVmIdentityTables(ctx, c.Database.DB()); err != nil { + log.Warn("failed to ensure vm identity tables", "error", err) + } + + hostLookup, err := vc.BuildHostLookup() + if err != nil { + log.Warn("failed to build host lookup", "url", url, "error", err) + hostLookup = nil + } else { + log.Debug("built host lookup", "url", url, "hosts", len(hostLookup)) + } + folderLookup, err := vc.BuildFolderPathLookup() + if err != nil { + log.Warn("failed to build folder lookup", "url", url, "error", err) + folderLookup = nil + } else { + log.Debug("built folder lookup", "url", url, "folders", len(folderLookup)) + } + rpLookup, err := vc.BuildResourcePoolLookup() + if err != nil { + log.Warn("failed to build resource pool lookup", "url", url, "error", err) + rpLookup = nil + } else { + log.Debug("built resource pool lookup", "url", url, "pools", len(rpLookup)) + } + + res.vms = vms + res.hostLookup = hostLookup + res.folderLookup = folderLookup + res.rpLookup = rpLookup + return vc, res, cleanup, nil +} + +func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTime time.Time, tableName string, url string) error { + log := c.Logger.With("vcenter", url) + ctx = context.WithValue(ctx, ctxLoggerKey{}, log) + started := time.Now() + log.Debug("connecting to vcenter for hourly snapshot", "url", url) + vc, resources, cleanup, err := c.initVcenterResources(ctx, log, url, startTime, started) + if err != nil { + return err + } + defer cleanup() + vcVms := resources.vms + hostLookup := resources.hostLookup + folderLookup := resources.folderLookup + rpLookup := resources.rpLookup + + inventoryRows, inventoryByVmID, inventoryByUuid, inventoryByName, err := loadInventoryMaps(ctx, c.Database.DB(), url) + if err != nil { + return err + } + + dbConn := c.Database.DB() + presentSnapshots, presentByUuid, presentByName, totals := c.buildPresentSnapshots(ctx, dbConn, vc, vcVms, startTime, url, inventoryByVmID, hostLookup, folderLookup, rpLookup) + deletionsMarked := false + candidates := make([]deletionCandidate, 0) + var prevVmCount sql.NullInt64 + countQuery := `SELECT "VmCount" FROM vcenter_totals WHERE "Vcenter" = ? ORDER BY "SnapshotTime" DESC LIMIT 1` + countQuery = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), countQuery) + if err := dbConn.QueryRowContext(ctx, countQuery, url).Scan(&prevVmCount); err != nil && !errors.Is(err, sql.ErrNoRows) { + c.Logger.Warn("failed to read previous vcenter totals", "vcenter", url, "error", err) + } + c.Logger.Debug("hourly snapshot rows prepared", "vcenter", url, "rows", len(presentSnapshots)) + + batch := make([]InventorySnapshotRow, 0, len(presentSnapshots)+len(inventoryRows)) + for _, row := range presentSnapshots { + batch = append(batch, row) + } + log.Debug("checking inventory for missing VMs") + + missingCount, deletionsMarked, candidates := prepareDeletionCandidates(ctx, log, dbConn, url, inventoryRows, presentSnapshots, presentByUuid, presentByName, startTime) + newCount := 0 + prevTableName := "" + // If deletions detected, refine deletion time using vCenter events in a small window. if missingCount > 0 { freq := time.Duration(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds) * time.Second diff --git a/internal/tasks/monitorVcenter.go b/internal/tasks/monitorVcenter.go index 15b021c..ded9847 100644 --- a/internal/tasks/monitorVcenter.go +++ b/internal/tasks/monitorVcenter.go @@ -3,11 +3,9 @@ package tasks import ( "context" "database/sql" - "encoding/json" "errors" "fmt" "log/slog" - "runtime" "strings" "time" "vctp/db/queries" @@ -402,6 +400,7 @@ func (c *CronTask) AddVmToInventory(vmObject *mo.VirtualMachine, vc *vcenter.Vce return nil } +/* // prettyPrint comes from https://gist.github.com/sfate/9d45f6c5405dc4c9bf63bf95fe6d1a7c func prettyPrint(args ...interface{}) { var caller string @@ -429,3 +428,4 @@ func prettyPrint(args ...interface{}) { fmt.Printf("%s%s\n", prefix, string(s)) } } +*/