From 8dee30ea97622ea00a4a0b687e4b1faa46078357 Mon Sep 17 00:00:00 2001 From: Nathan Coad Date: Thu, 15 Jan 2026 14:25:51 +1100 Subject: [PATCH] improve tracking of VM deletions --- README.md | 4 +- db/helpers.go | 188 ++++++++++++++++--- internal/settings/settings.go | 2 + internal/tasks/dailyAggregate.go | 4 + internal/tasks/inventorySnapshots.go | 261 +++++++++++++++++++++++++-- main.go | 17 ++ src/vctp.yml | 2 + 7 files changed, 436 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index 2ed723a..cbdfedc 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,8 @@ Snapshots: - `settings.daily_snapshot_max_age_months`: retention for daily tables - `settings.snapshot_cleanup_cron`: cron expression for cleanup job - `settings.reports_dir`: directory to store generated XLSX reports (default: `/var/lib/vctp/reports`) +- `settings.hourly_snapshot_retry_seconds`: interval for retrying failed hourly snapshots (default: 300 seconds) +- `settings.hourly_snapshot_max_retries`: maximum retry attempts per vCenter snapshot (default: 3) Filters/chargeback: - `settings.tenants_to_filter`: list of tenant name patterns to exclude @@ -130,4 +132,4 @@ Run `swag init --exclude "pkg.mod,pkg.build,pkg.tools" -o server/router/docs` - Build step installs generators (`templ`, `sqlc`, `swag`), regenerates code/docs, runs project scripts, and produces the `vctp-linux-amd64` binary. - RPM step packages via `nfpm` using `vctp.yml`, emits RPMs into `./build/`. - Optional SFTP deploy step uploads build artifacts (e.g., `vctp*`) to a remote host. - - Cache rebuild step preserves Go caches across runs. \ No newline at end of file + - Cache rebuild step preserves Go caches across runs. diff --git a/db/helpers.go b/db/helpers.go index 05edd65..96ac630 100644 --- a/db/helpers.go +++ b/db/helpers.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "strings" + "time" "vctp/db/queries" @@ -281,6 +282,15 @@ func EnsureSnapshotTable(ctx context.Context, dbConn *sqlx.DB, tableName string) return err } + return EnsureSnapshotIndexes(ctx, dbConn, tableName) +} + +// EnsureSnapshotIndexes creates the standard indexes for a snapshot table. +func EnsureSnapshotIndexes(ctx context.Context, dbConn *sqlx.DB, tableName string) error { + if _, err := SafeTableName(tableName); err != nil { + return err + } + indexes := []string{ fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vm_vcenter_idx ON %s ("VmId","Vcenter")`, tableName, tableName), fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_snapshottime_idx ON %s ("SnapshotTime")`, tableName, tableName), @@ -387,30 +397,31 @@ func BuildDailySummaryInsert(tableName string, unionQuery string) (string, error WITH snapshots AS ( %s ), totals AS ( - SELECT COUNT(DISTINCT "SnapshotTime") AS total_samples FROM snapshots + SELECT COUNT(DISTINCT "SnapshotTime") AS total_samples, MAX("SnapshotTime") AS max_snapshot FROM snapshots ), agg AS ( SELECT - "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", - MIN(NULLIF("CreationTime", 0)) AS any_creation, - MAX(NULLIF("DeletionTime", 0)) AS any_deletion, - MIN(CASE WHEN "IsPresent" = 'TRUE' THEN "SnapshotTime" END) AS first_present, - MAX(CASE WHEN "IsPresent" = 'TRUE' THEN "SnapshotTime" END) AS last_present, - MAX(CASE WHEN "IsPresent" = 'FALSE' THEN "SnapshotTime" END) AS last_absent, - "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount", - "RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", - SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END) AS samples_present, - SUM(CASE WHEN "IsPresent" = 'TRUE' AND "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END) AS sum_vcpu, - SUM(CASE WHEN "IsPresent" = 'TRUE' AND "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END) AS sum_ram, - SUM(CASE WHEN "IsPresent" = 'TRUE' AND "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" ELSE 0 END) AS sum_disk, - SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'tin' THEN 1 ELSE 0 END) AS tin_hits, - SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'bronze' THEN 1 ELSE 0 END) AS bronze_hits, - SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'silver' THEN 1 ELSE 0 END) AS silver_hits, - SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'gold' THEN 1 ELSE 0 END) AS gold_hits - FROM snapshots + s."InventoryId", s."Name", s."Vcenter", s."VmId", s."EventKey", s."CloudId", + MIN(NULLIF(s."CreationTime", 0)) AS any_creation, + MAX(NULLIF(s."DeletionTime", 0)) AS any_deletion, + MAX(COALESCE(inv."DeletionTime", 0)) AS inv_deletion, + MIN(s."SnapshotTime") AS first_present, + MAX(s."SnapshotTime") AS last_present, + COUNT(*) AS samples_present, + s."Datacenter", s."Cluster", s."Folder", s."ProvisionedDisk", s."VcpuCount", + s."RamGB", s."IsTemplate", s."PoweredOn", s."SrmPlaceholder", s."VmUuid", + SUM(CASE WHEN s."VcpuCount" IS NOT NULL THEN s."VcpuCount" ELSE 0 END) AS sum_vcpu, + SUM(CASE WHEN s."RamGB" IS NOT NULL THEN s."RamGB" ELSE 0 END) AS sum_ram, + SUM(CASE WHEN s."ProvisionedDisk" IS NOT NULL THEN s."ProvisionedDisk" ELSE 0 END) AS sum_disk, + SUM(CASE WHEN LOWER(s."ResourcePool") = 'tin' THEN 1 ELSE 0 END) AS tin_hits, + SUM(CASE WHEN LOWER(s."ResourcePool") = 'bronze' THEN 1 ELSE 0 END) AS bronze_hits, + SUM(CASE WHEN LOWER(s."ResourcePool") = 'silver' THEN 1 ELSE 0 END) AS silver_hits, + SUM(CASE WHEN LOWER(s."ResourcePool") = 'gold' THEN 1 ELSE 0 END) AS gold_hits + FROM snapshots s + LEFT JOIN inventory inv ON inv."VmId" = s."VmId" AND inv."Vcenter" = s."Vcenter" GROUP BY - "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", - "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount", - "RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid" + s."InventoryId", s."Name", s."Vcenter", s."VmId", s."EventKey", s."CloudId", + s."Datacenter", s."Cluster", s."Folder", s."ProvisionedDisk", s."VcpuCount", + s."RamGB", s."IsTemplate", s."PoweredOn", s."SrmPlaceholder", s."VmUuid" ) INSERT INTO %s ( "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", @@ -424,8 +435,8 @@ SELECT agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId", COALESCE(agg.any_creation, agg.first_present, 0) AS "CreationTime", CASE - WHEN agg.last_present IS NULL THEN NULLIF(agg.any_deletion, 0) - WHEN agg.last_absent IS NOT NULL AND agg.last_absent > agg.last_present THEN agg.last_absent + WHEN NULLIF(agg.inv_deletion, 0) IS NOT NULL THEN NULLIF(agg.inv_deletion, 0) + WHEN totals.max_snapshot IS NOT NULL AND agg.last_present < totals.max_snapshot THEN COALESCE(NULLIF(agg.any_deletion, 0), totals.max_snapshot, agg.last_present) ELSE NULLIF(agg.any_deletion, 0) END AS "DeletionTime", ( @@ -482,7 +493,7 @@ GROUP BY agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId", agg."Datacenter", agg."Cluster", agg."Folder", agg."ProvisionedDisk", agg."VcpuCount", agg."RamGB", agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid", - agg.any_creation, agg.any_deletion, agg.first_present, agg.last_present, agg.last_absent, + agg.any_creation, agg.any_deletion, agg.first_present, agg.last_present, totals.total_samples; `, unionQuery, tableName) return insert, nil @@ -668,3 +679,132 @@ func EnsureSummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) } return nil } + +// EnsureSnapshotRunTable creates a table to track per-vCenter hourly snapshot attempts. +func EnsureSnapshotRunTable(ctx context.Context, dbConn *sqlx.DB) error { + ddl := ` +CREATE TABLE IF NOT EXISTS snapshot_runs ( + "RowId" INTEGER PRIMARY KEY AUTOINCREMENT, + "Vcenter" TEXT NOT NULL, + "SnapshotTime" BIGINT NOT NULL, + "Attempts" INTEGER NOT NULL DEFAULT 0, + "Success" TEXT NOT NULL DEFAULT 'FALSE', + "LastError" TEXT, + "LastAttempt" BIGINT NOT NULL +); +` + if strings.ToLower(dbConn.DriverName()) == "pgx" || strings.ToLower(dbConn.DriverName()) == "postgres" { + ddl = ` +CREATE TABLE IF NOT EXISTS snapshot_runs ( + "RowId" BIGSERIAL PRIMARY KEY, + "Vcenter" TEXT NOT NULL, + "SnapshotTime" BIGINT NOT NULL, + "Attempts" INTEGER NOT NULL DEFAULT 0, + "Success" TEXT NOT NULL DEFAULT 'FALSE', + "LastError" TEXT, + "LastAttempt" BIGINT NOT NULL +); +` + } + if _, err := dbConn.ExecContext(ctx, ddl); err != nil { + return err + } + indexes := []string{ + `CREATE UNIQUE INDEX IF NOT EXISTS snapshot_runs_vc_time_idx ON snapshot_runs ("Vcenter","SnapshotTime")`, + `CREATE INDEX IF NOT EXISTS snapshot_runs_success_idx ON snapshot_runs ("Success")`, + } + for _, idx := range indexes { + if _, err := dbConn.ExecContext(ctx, idx); err != nil { + return err + } + } + return nil +} + +// UpsertSnapshotRun updates or inserts snapshot run status. +func UpsertSnapshotRun(ctx context.Context, dbConn *sqlx.DB, vcenter string, snapshotTime time.Time, success bool, errMsg string) error { + if err := EnsureSnapshotRunTable(ctx, dbConn); err != nil { + return err + } + successStr := "FALSE" + if success { + successStr = "TRUE" + } + now := time.Now().Unix() + driver := strings.ToLower(dbConn.DriverName()) + switch driver { + case "sqlite": + _, err := dbConn.ExecContext(ctx, ` +INSERT INTO snapshot_runs ("Vcenter","SnapshotTime","Attempts","Success","LastError","LastAttempt") +VALUES (?, ?, 1, ?, ?, ?) +ON CONFLICT("Vcenter","SnapshotTime") DO UPDATE SET + "Attempts" = snapshot_runs."Attempts" + 1, + "Success" = excluded."Success", + "LastError" = excluded."LastError", + "LastAttempt" = excluded."LastAttempt" +`, vcenter, snapshotTime.Unix(), successStr, errMsg, now) + return err + case "pgx", "postgres": + _, err := dbConn.ExecContext(ctx, ` +INSERT INTO snapshot_runs ("Vcenter","SnapshotTime","Attempts","Success","LastError","LastAttempt") +VALUES ($1, $2, 1, $3, $4, $5) +ON CONFLICT("Vcenter","SnapshotTime") DO UPDATE SET + "Attempts" = snapshot_runs."Attempts" + 1, + "Success" = EXCLUDED."Success", + "LastError" = EXCLUDED."LastError", + "LastAttempt" = EXCLUDED."LastAttempt" +`, vcenter, snapshotTime.Unix(), successStr, errMsg, now) + return err + default: + return fmt.Errorf("unsupported driver for snapshot_runs upsert: %s", driver) + } +} + +// ListFailedSnapshotRuns returns vcenter/time pairs needing retry. +func ListFailedSnapshotRuns(ctx context.Context, dbConn *sqlx.DB, maxAttempts int) ([]struct { + Vcenter string + SnapshotTime int64 + Attempts int +}, error) { + if maxAttempts <= 0 { + maxAttempts = 3 + } + driver := strings.ToLower(dbConn.DriverName()) + query := ` +SELECT "Vcenter","SnapshotTime","Attempts" +FROM snapshot_runs +WHERE "Success" = 'FALSE' AND "Attempts" < ? +ORDER BY "LastAttempt" ASC +` + args := []interface{}{maxAttempts} + if driver == "pgx" || driver == "postgres" { + query = ` +SELECT "Vcenter","SnapshotTime","Attempts" +FROM snapshot_runs +WHERE "Success" = 'FALSE' AND "Attempts" < $1 +ORDER BY "LastAttempt" ASC +` + } + type row struct { + Vcenter string `db:"Vcenter"` + SnapshotTime int64 `db:"SnapshotTime"` + Attempts int `db:"Attempts"` + } + rows := []row{} + if err := dbConn.SelectContext(ctx, &rows, query, args...); err != nil { + return nil, err + } + results := make([]struct { + Vcenter string + SnapshotTime int64 + Attempts int + }, 0, len(rows)) + for _, r := range rows { + results = append(results, struct { + Vcenter string + SnapshotTime int64 + Attempts int + }{Vcenter: r.Vcenter, SnapshotTime: r.SnapshotTime, Attempts: r.Attempts}) + } + return results, nil +} diff --git a/internal/settings/settings.go b/internal/settings/settings.go index 92bd362..8a81717 100644 --- a/internal/settings/settings.go +++ b/internal/settings/settings.go @@ -43,6 +43,8 @@ type SettingsYML struct { ReportsDir string `yaml:"reports_dir"` HourlyJobTimeoutSeconds int `yaml:"hourly_job_timeout_seconds"` HourlySnapshotTimeoutSeconds int `yaml:"hourly_snapshot_timeout_seconds"` + HourlySnapshotRetrySeconds int `yaml:"hourly_snapshot_retry_seconds"` + HourlySnapshotMaxRetries int `yaml:"hourly_snapshot_max_retries"` DailyJobTimeoutSeconds int `yaml:"daily_job_timeout_seconds"` MonthlyJobTimeoutSeconds int `yaml:"monthly_job_timeout_seconds"` CleanupJobTimeoutSeconds int `yaml:"cleanup_job_timeout_seconds"` diff --git a/internal/tasks/dailyAggregate.go b/internal/tasks/dailyAggregate.go index 9ec99b5..0917945 100644 --- a/internal/tasks/dailyAggregate.go +++ b/internal/tasks/dailyAggregate.go @@ -67,6 +67,10 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti hourlyTables := make([]string, 0, len(hourlySnapshots)) for _, snapshot := range hourlySnapshots { hourlyTables = append(hourlyTables, snapshot.TableName) + // Ensure indexes exist on historical hourly tables for faster aggregation. + if err := db.EnsureSnapshotIndexes(ctx, dbConn, snapshot.TableName); err != nil { + c.Logger.Warn("failed to ensure indexes on hourly table", "table", snapshot.TableName, "error", err) + } } unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter()) if err != nil { diff --git a/internal/tasks/inventorySnapshots.go b/internal/tasks/inventorySnapshots.go index 0f8db08..71de41b 100644 --- a/internal/tasks/inventorySnapshots.go +++ b/internal/tasks/inventorySnapshots.go @@ -79,6 +79,9 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo if err := db.CheckMigrationState(ctx, c.Database.DB()); err != nil { return err } + if err := db.EnsureSnapshotRunTable(ctx, c.Database.DB()); err != nil { + return err + } // reload settings in case vcenter list has changed c.Settings.ReadYMLSettings() @@ -178,6 +181,47 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo return nil } +// RunHourlySnapshotRetry retries failed vCenter hourly snapshots up to a maximum attempt count. +func (c *CronTask) RunHourlySnapshotRetry(ctx context.Context, logger *slog.Logger) (err error) { + jobStart := time.Now() + defer func() { + logger.Info("Hourly snapshot retry finished", "duration", time.Since(jobStart)) + }() + + maxRetries := c.Settings.Values.Settings.HourlySnapshotMaxRetries + if maxRetries <= 0 { + maxRetries = 3 + } + + dbConn := c.Database.DB() + if err := db.EnsureSnapshotRunTable(ctx, dbConn); err != nil { + return err + } + + failed, err := db.ListFailedSnapshotRuns(ctx, dbConn, maxRetries) + if err != nil { + return err + } + if len(failed) == 0 { + logger.Debug("No failed hourly snapshots to retry") + return nil + } + + for _, f := range failed { + startTime := time.Unix(f.SnapshotTime, 0) + tableName, tnErr := hourlyInventoryTableName(startTime) + if tnErr != nil { + logger.Warn("unable to derive table name for retry", "error", tnErr, "snapshot_time", startTime, "vcenter", f.Vcenter) + continue + } + logger.Info("Retrying hourly snapshot", "vcenter", f.Vcenter, "snapshot_time", startTime, "attempt", f.Attempts+1) + if err := c.captureHourlySnapshotForVcenter(ctx, startTime, tableName, f.Vcenter); err != nil { + logger.Warn("retry failed", "vcenter", f.Vcenter, "error", err) + } + } + return nil +} + // RunSnapshotCleanup drops hourly and daily snapshot tables older than retention. func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) (err error) { jobCtx := ctx @@ -696,6 +740,7 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim vc := vcenter.New(c.Logger, c.VcCreds) if err := vc.Login(url); err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err) + _ = db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()) return fmt.Errorf("unable to connect to vcenter: %w", err) } defer func() { @@ -707,12 +752,9 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim vcVms, err := vc.GetAllVMsWithProps() if err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err) + _ = db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()) return fmt.Errorf("unable to get VMs from vcenter: %w", err) } - canDetectMissing := len(vcVms) > 0 - if !canDetectMissing { - c.Logger.Warn("no VMs returned from vcenter; skipping missing VM detection", "url", url) - } hostLookup, err := vc.BuildHostLookup() if err != nil { c.Logger.Warn("failed to build host lookup", "url", url, "error", err) @@ -741,15 +783,26 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim } inventoryByVmID := make(map[string]queries.Inventory, len(inventoryRows)) + inventoryByUuid := make(map[string]queries.Inventory, len(inventoryRows)) + inventoryByName := make(map[string]queries.Inventory, len(inventoryRows)) for _, inv := range inventoryRows { if inv.VmId.Valid { inventoryByVmID[inv.VmId.String] = inv } + if inv.VmUuid.Valid { + inventoryByUuid[inv.VmUuid.String] = inv + } + if inv.Name != "" { + inventoryByName[inv.Name] = inv + } } dbConn := c.Database.DB() presentSnapshots := make(map[string]inventorySnapshotRow, len(vcVms)) + presentByUuid := make(map[string]struct{}, len(vcVms)) + presentByName := make(map[string]struct{}, len(vcVms)) totals := snapshotTotals{} + deletionsMarked := false for _, vm := range vcVms { if strings.HasPrefix(vm.Name, "vCLS-") { continue @@ -772,6 +825,12 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim } row.IsPresent = "TRUE" presentSnapshots[vm.Reference().Value] = row + if row.VmUuid.Valid { + presentByUuid[row.VmUuid.String] = struct{}{} + } + if row.Name != "" { + presentByName[row.Name] = struct{}{} + } totals.VmCount++ totals.VcpuTotal += nullInt64ToInt(row.VcpuCount) @@ -784,30 +843,40 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim batch = append(batch, row) } - if !canDetectMissing { - c.Logger.Info("Hourly snapshot summary", - "vcenter", url, - "vm_count", totals.VmCount, - "vcpu_total", totals.VcpuTotal, - "ram_total_gb", totals.RamTotal, - "disk_total_gb", totals.DiskTotal, - ) - return nil - } + missingCount := 0 for _, inv := range inventoryRows { if strings.HasPrefix(inv.Name, "vCLS-") { continue } vmID := inv.VmId.String + uuid := "" + if inv.VmUuid.Valid { + uuid = inv.VmUuid.String + } + name := inv.Name + + found := false if vmID != "" { if _, ok := presentSnapshots[vmID]; ok { - continue + found = true } } + if !found && uuid != "" { + if _, ok := presentByUuid[uuid]; ok { + found = true + } + } + if !found && name != "" { + if _, ok := presentByName[name]; ok { + found = true + } + } + if found { + continue + } row := snapshotFromInventory(inv, startTime) - row.IsPresent = "FALSE" if !row.DeletionTime.Valid { deletionTime := startTime.Unix() row.DeletionTime = sql.NullInt64{Int64: deletionTime, Valid: true} @@ -818,23 +887,43 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim }); err != nil { c.Logger.Warn("failed to mark inventory record deleted", "error", err, "vm_id", row.VmId.String) } + c.Logger.Debug("Marked VM as deleted", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", url, "snapshot_time", startTime) + deletionsMarked = true } - batch = append(batch, row) + missingCount++ } if err := insertHourlyBatch(ctx, dbConn, tableName, batch); err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, err) + _ = db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()) return err } + // Compare with previous snapshot for this vcenter to mark deletions at snapshot time. + if prevTable, err := latestHourlySnapshotBefore(ctx, dbConn, startTime); err == nil && prevTable != "" { + moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTable, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName) + missingCount += moreMissing + } else if err != nil { + c.Logger.Warn("failed to locate previous hourly snapshot for deletion comparison", "error", err, "url", url) + } + c.Logger.Info("Hourly snapshot summary", "vcenter", url, "vm_count", totals.VmCount, "vcpu_total", totals.VcpuTotal, "ram_total_gb", totals.RamTotal, "disk_total_gb", totals.DiskTotal, + "missing_marked", missingCount, ) metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, nil) + _ = db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, true, "") + if deletionsMarked { + if err := c.generateReport(ctx, tableName); err != nil { + c.Logger.Warn("failed to regenerate hourly report after deletions", "error", err, "table", tableName) + } else { + c.Logger.Debug("Regenerated hourly report after deletions", "table", tableName) + } + } return nil } @@ -865,3 +954,141 @@ func boolStringFromInterface(value interface{}) string { return fmt.Sprint(v) } } + +// latestHourlySnapshotBefore finds the most recent hourly snapshot table prior to the given time. +func latestHourlySnapshotBefore(ctx context.Context, dbConn *sqlx.DB, cutoff time.Time) (string, error) { + driver := strings.ToLower(dbConn.DriverName()) + var rows *sqlx.Rows + var err error + switch driver { + case "sqlite": + rows, err = dbConn.QueryxContext(ctx, ` +SELECT name FROM sqlite_master +WHERE type = 'table' AND name LIKE 'inventory_hourly_%' +`) + case "pgx", "postgres": + rows, err = dbConn.QueryxContext(ctx, ` +SELECT tablename FROM pg_catalog.pg_tables +WHERE schemaname = 'public' AND tablename LIKE 'inventory_hourly_%' +`) + default: + return "", fmt.Errorf("unsupported driver for snapshot lookup: %s", driver) + } + if err != nil { + return "", err + } + defer rows.Close() + + var latest string + var latestTime int64 + for rows.Next() { + var name string + if scanErr := rows.Scan(&name); scanErr != nil { + continue + } + if !strings.HasPrefix(name, "inventory_hourly_") { + continue + } + suffix := strings.TrimPrefix(name, "inventory_hourly_") + epoch, parseErr := strconv.ParseInt(suffix, 10, 64) + if parseErr != nil { + continue + } + if epoch < cutoff.Unix() && epoch > latestTime { + latestTime = epoch + latest = name + } + } + return latest, nil +} + +// markMissingFromPrevious marks VMs that were present in the previous snapshot but missing now. +func (c *CronTask) markMissingFromPrevious(ctx context.Context, dbConn *sqlx.DB, prevTable string, vcenter string, snapshotTime time.Time, + currentByID map[string]inventorySnapshotRow, currentByUuid map[string]struct{}, currentByName map[string]struct{}, + invByID map[string]queries.Inventory, invByUuid map[string]queries.Inventory, invByName map[string]queries.Inventory) int { + + if err := db.ValidateTableName(prevTable); err != nil { + return 0 + } + + query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name","Datacenter","DeletionTime" FROM %s WHERE "Vcenter" = ?`, prevTable) + query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) + + type prevRow struct { + VmId sql.NullString `db:"VmId"` + VmUuid sql.NullString `db:"VmUuid"` + Name string `db:"Name"` + Datacenter sql.NullString `db:"Datacenter"` + DeletionTime sql.NullInt64 `db:"DeletionTime"` + } + + rows, err := dbConn.QueryxContext(ctx, query, vcenter) + if err != nil { + c.Logger.Warn("failed to read previous snapshot for deletion detection", "error", err, "table", prevTable, "vcenter", vcenter) + return 0 + } + defer rows.Close() + + missing := 0 + for rows.Next() { + var r prevRow + if err := rows.StructScan(&r); err != nil { + continue + } + vmID := r.VmId.String + uuid := r.VmUuid.String + name := r.Name + + found := false + if vmID != "" { + if _, ok := currentByID[vmID]; ok { + found = true + } + } + if !found && uuid != "" { + if _, ok := currentByUuid[uuid]; ok { + found = true + } + } + if !found && name != "" { + if _, ok := currentByName[name]; ok { + found = true + } + } + if found { + continue + } + + var inv queries.Inventory + var ok bool + if vmID != "" { + inv, ok = invByID[vmID] + } + if !ok && uuid != "" { + inv, ok = invByUuid[uuid] + } + if !ok && name != "" { + inv, ok = invByName[name] + } + if !ok { + continue + } + if inv.DeletionTime.Valid { + continue + } + + delTime := sql.NullInt64{Int64: snapshotTime.Unix(), Valid: true} + if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ + DeletionTime: delTime, + VmId: inv.VmId, + DatacenterName: inv.Datacenter, + }); err != nil { + c.Logger.Warn("failed to mark inventory record deleted from previous snapshot", "error", err, "vm_id", inv.VmId.String) + continue + } + c.Logger.Debug("Detected VM missing compared to previous snapshot", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", vcenter, "snapshot_time", snapshotTime, "prev_table", prevTable) + missing++ + } + + return missing +} diff --git a/main.go b/main.go index 1342541..69d8741 100644 --- a/main.go +++ b/main.go @@ -281,6 +281,23 @@ func main() { } logger.Debug("Created snapshot cleanup cron job", "job", job6.ID()) + // Retry failed hourly snapshots + retrySeconds := s.Values.Settings.HourlySnapshotRetrySeconds + if retrySeconds <= 0 { + retrySeconds = 300 + } + job7, err := c.NewJob( + gocron.DurationJob(time.Duration(retrySeconds)*time.Second), + gocron.NewTask(func() { + ct.RunHourlySnapshotRetry(ctx, logger) + }), gocron.WithSingletonMode(gocron.LimitModeReschedule), + ) + if err != nil { + logger.Error("failed to start hourly snapshot retry cron job", "error", err) + os.Exit(1) + } + logger.Debug("Created hourly snapshot retry cron job", "job", job7.ID(), "interval_seconds", retrySeconds) + // start cron scheduler c.Start() diff --git a/src/vctp.yml b/src/vctp.yml index dae84d4..dc698ca 100644 --- a/src/vctp.yml +++ b/src/vctp.yml @@ -20,6 +20,8 @@ settings: hourly_snapshot_max_age_days: 60 daily_snapshot_max_age_months: 12 snapshot_cleanup_cron: "30 2 * * *" + hourly_snapshot_retry_seconds: 300 + hourly_snapshot_max_retries: 3 hourly_job_timeout_seconds: 1200 hourly_snapshot_timeout_seconds: 600 daily_job_timeout_seconds: 900