improve tracking of VM deletions

commit 8dee30ea97 (parent bba308ad28)
2026-01-15 14:25:51 +11:00
7 changed files with 436 additions and 42 deletions


@@ -88,6 +88,8 @@ Snapshots:
 - `settings.daily_snapshot_max_age_months`: retention for daily tables
 - `settings.snapshot_cleanup_cron`: cron expression for cleanup job
 - `settings.reports_dir`: directory to store generated XLSX reports (default: `/var/lib/vctp/reports`)
+- `settings.hourly_snapshot_retry_seconds`: interval for retrying failed hourly snapshots (default: 300 seconds)
+- `settings.hourly_snapshot_max_retries`: maximum retry attempts per vCenter snapshot (default: 3)

 Filters/chargeback:
 - `settings.tenants_to_filter`: list of tenant name patterns to exclude
@@ -130,4 +132,4 @@ Run `swag init --exclude "pkg.mod,pkg.build,pkg.tools" -o server/router/docs`
 - Build step installs generators (`templ`, `sqlc`, `swag`), regenerates code/docs, runs project scripts, and produces the `vctp-linux-amd64` binary.
 - RPM step packages via `nfpm` using `vctp.yml`, emits RPMs into `./build/`.
 - Optional SFTP deploy step uploads build artifacts (e.g., `vctp*`) to a remote host.
 - Cache rebuild step preserves Go caches across runs.
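
For reference, a minimal sketch of how the two new settings above are clamped to their documented defaults when unset or non-positive. It assumes only the `SettingsYML` fields added later in this commit; the helper name `retryPolicy` is illustrative, not repo code:

package settings

import "time"

// retryPolicy is a hypothetical helper mirroring the defaulting done in
// main.go (300 seconds) and RunHourlySnapshotRetry (3 attempts).
func retryPolicy(s SettingsYML) (time.Duration, int) {
    seconds := s.HourlySnapshotRetrySeconds
    if seconds <= 0 {
        seconds = 300 // settings.hourly_snapshot_retry_seconds default
    }
    retries := s.HourlySnapshotMaxRetries
    if retries <= 0 {
        retries = 3 // settings.hourly_snapshot_max_retries default
    }
    return time.Duration(seconds) * time.Second, retries
}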


@@ -5,6 +5,7 @@ import (
 	"database/sql"
 	"fmt"
 	"strings"
+	"time"

 	"vctp/db/queries"

@@ -281,6 +282,15 @@ func EnsureSnapshotTable(ctx context.Context, dbConn *sqlx.DB, tableName string)
 		return err
 	}
+	return EnsureSnapshotIndexes(ctx, dbConn, tableName)
+}
+
+// EnsureSnapshotIndexes creates the standard indexes for a snapshot table.
+func EnsureSnapshotIndexes(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
+	if _, err := SafeTableName(tableName); err != nil {
+		return err
+	}
 	indexes := []string{
 		fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vm_vcenter_idx ON %s ("VmId","Vcenter")`, tableName, tableName),
 		fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_snapshottime_idx ON %s ("SnapshotTime")`, tableName, tableName),
@@ -387,30 +397,31 @@ func BuildDailySummaryInsert(tableName string, unionQuery string) (string, error
 WITH snapshots AS (
 %s
 ), totals AS (
-    SELECT COUNT(DISTINCT "SnapshotTime") AS total_samples FROM snapshots
+    SELECT COUNT(DISTINCT "SnapshotTime") AS total_samples, MAX("SnapshotTime") AS max_snapshot FROM snapshots
 ), agg AS (
     SELECT
-        "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId",
-        MIN(NULLIF("CreationTime", 0)) AS any_creation,
-        MAX(NULLIF("DeletionTime", 0)) AS any_deletion,
-        MIN(CASE WHEN "IsPresent" = 'TRUE' THEN "SnapshotTime" END) AS first_present,
-        MAX(CASE WHEN "IsPresent" = 'TRUE' THEN "SnapshotTime" END) AS last_present,
-        MAX(CASE WHEN "IsPresent" = 'FALSE' THEN "SnapshotTime" END) AS last_absent,
-        "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
-        "RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
-        SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END) AS samples_present,
-        SUM(CASE WHEN "IsPresent" = 'TRUE' AND "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END) AS sum_vcpu,
-        SUM(CASE WHEN "IsPresent" = 'TRUE' AND "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END) AS sum_ram,
-        SUM(CASE WHEN "IsPresent" = 'TRUE' AND "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" ELSE 0 END) AS sum_disk,
-        SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'tin' THEN 1 ELSE 0 END) AS tin_hits,
-        SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'bronze' THEN 1 ELSE 0 END) AS bronze_hits,
-        SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'silver' THEN 1 ELSE 0 END) AS silver_hits,
-        SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'gold' THEN 1 ELSE 0 END) AS gold_hits
-    FROM snapshots
+        s."InventoryId", s."Name", s."Vcenter", s."VmId", s."EventKey", s."CloudId",
+        MIN(NULLIF(s."CreationTime", 0)) AS any_creation,
+        MAX(NULLIF(s."DeletionTime", 0)) AS any_deletion,
+        MAX(COALESCE(inv."DeletionTime", 0)) AS inv_deletion,
+        MIN(s."SnapshotTime") AS first_present,
+        MAX(s."SnapshotTime") AS last_present,
+        COUNT(*) AS samples_present,
+        s."Datacenter", s."Cluster", s."Folder", s."ProvisionedDisk", s."VcpuCount",
+        s."RamGB", s."IsTemplate", s."PoweredOn", s."SrmPlaceholder", s."VmUuid",
+        SUM(CASE WHEN s."VcpuCount" IS NOT NULL THEN s."VcpuCount" ELSE 0 END) AS sum_vcpu,
+        SUM(CASE WHEN s."RamGB" IS NOT NULL THEN s."RamGB" ELSE 0 END) AS sum_ram,
+        SUM(CASE WHEN s."ProvisionedDisk" IS NOT NULL THEN s."ProvisionedDisk" ELSE 0 END) AS sum_disk,
+        SUM(CASE WHEN LOWER(s."ResourcePool") = 'tin' THEN 1 ELSE 0 END) AS tin_hits,
+        SUM(CASE WHEN LOWER(s."ResourcePool") = 'bronze' THEN 1 ELSE 0 END) AS bronze_hits,
+        SUM(CASE WHEN LOWER(s."ResourcePool") = 'silver' THEN 1 ELSE 0 END) AS silver_hits,
+        SUM(CASE WHEN LOWER(s."ResourcePool") = 'gold' THEN 1 ELSE 0 END) AS gold_hits
+    FROM snapshots s
+    LEFT JOIN inventory inv ON inv."VmId" = s."VmId" AND inv."Vcenter" = s."Vcenter"
     GROUP BY
-        "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId",
-        "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
-        "RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid"
+        s."InventoryId", s."Name", s."Vcenter", s."VmId", s."EventKey", s."CloudId",
+        s."Datacenter", s."Cluster", s."Folder", s."ProvisionedDisk", s."VcpuCount",
+        s."RamGB", s."IsTemplate", s."PoweredOn", s."SrmPlaceholder", s."VmUuid"
 )
 INSERT INTO %s (
     "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",

@@ -424,8 +435,8 @@ SELECT
     agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId",
     COALESCE(agg.any_creation, agg.first_present, 0) AS "CreationTime",
     CASE
-        WHEN agg.last_present IS NULL THEN NULLIF(agg.any_deletion, 0)
-        WHEN agg.last_absent IS NOT NULL AND agg.last_absent > agg.last_present THEN agg.last_absent
+        WHEN NULLIF(agg.inv_deletion, 0) IS NOT NULL THEN NULLIF(agg.inv_deletion, 0)
+        WHEN totals.max_snapshot IS NOT NULL AND agg.last_present < totals.max_snapshot THEN COALESCE(NULLIF(agg.any_deletion, 0), totals.max_snapshot, agg.last_present)
         ELSE NULLIF(agg.any_deletion, 0)
     END AS "DeletionTime",
     (

@@ -482,7 +493,7 @@ GROUP BY
     agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId",
     agg."Datacenter", agg."Cluster", agg."Folder", agg."ProvisionedDisk", agg."VcpuCount",
     agg."RamGB", agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid",
-    agg.any_creation, agg.any_deletion, agg.first_present, agg.last_present, agg.last_absent,
+    agg.any_creation, agg.any_deletion, agg.first_present, agg.last_present,
     totals.total_samples;
 `, unionQuery, tableName)
 	return insert, nil
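
The rewritten `DeletionTime` CASE picks a value with this precedence: an inventory-tracked deletion wins outright; otherwise a VM whose newest sample predates the newest sample in the whole window is treated as deleted (using the event deletion time if known, else the window's newest sample time); otherwise only an event-sourced deletion is used. A minimal Go sketch of the same precedence, with illustrative parameter names (not from the codebase):

// deletionTime mirrors the precedence of the rewritten SQL CASE.
// invDeletion: MAX(inventory."DeletionTime") for the VM (0 = unset)
// anyDeletion: MAX event-sourced DeletionTime across samples (0 = unset)
// lastPresent: newest SnapshotTime observed for this VM
// maxSnapshot: newest SnapshotTime across all VMs in the window
func deletionTime(invDeletion, anyDeletion, lastPresent, maxSnapshot int64) int64 {
    if invDeletion != 0 {
        return invDeletion // inventory-tracked deletion wins
    }
    if maxSnapshot != 0 && lastPresent < maxSnapshot {
        // VM vanished before the end of the window: prefer the event
        // deletion time, else assume it disappeared at the newest sample.
        if anyDeletion != 0 {
            return anyDeletion
        }
        return maxSnapshot
    }
    return anyDeletion // 0 means "still present"
}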
@@ -668,3 +679,132 @@ func EnsureSummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string)
 	}
 	return nil
 }
+
+// EnsureSnapshotRunTable creates a table to track per-vCenter hourly snapshot attempts.
+func EnsureSnapshotRunTable(ctx context.Context, dbConn *sqlx.DB) error {
+    ddl := `
+CREATE TABLE IF NOT EXISTS snapshot_runs (
+    "RowId" INTEGER PRIMARY KEY AUTOINCREMENT,
+    "Vcenter" TEXT NOT NULL,
+    "SnapshotTime" BIGINT NOT NULL,
+    "Attempts" INTEGER NOT NULL DEFAULT 0,
+    "Success" TEXT NOT NULL DEFAULT 'FALSE',
+    "LastError" TEXT,
+    "LastAttempt" BIGINT NOT NULL
+);
+`
+    if strings.ToLower(dbConn.DriverName()) == "pgx" || strings.ToLower(dbConn.DriverName()) == "postgres" {
+        ddl = `
+CREATE TABLE IF NOT EXISTS snapshot_runs (
+    "RowId" BIGSERIAL PRIMARY KEY,
+    "Vcenter" TEXT NOT NULL,
+    "SnapshotTime" BIGINT NOT NULL,
+    "Attempts" INTEGER NOT NULL DEFAULT 0,
+    "Success" TEXT NOT NULL DEFAULT 'FALSE',
+    "LastError" TEXT,
+    "LastAttempt" BIGINT NOT NULL
+);
+`
+    }
+    if _, err := dbConn.ExecContext(ctx, ddl); err != nil {
+        return err
+    }
+    indexes := []string{
+        `CREATE UNIQUE INDEX IF NOT EXISTS snapshot_runs_vc_time_idx ON snapshot_runs ("Vcenter","SnapshotTime")`,
+        `CREATE INDEX IF NOT EXISTS snapshot_runs_success_idx ON snapshot_runs ("Success")`,
+    }
+    for _, idx := range indexes {
+        if _, err := dbConn.ExecContext(ctx, idx); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+// UpsertSnapshotRun updates or inserts snapshot run status.
+func UpsertSnapshotRun(ctx context.Context, dbConn *sqlx.DB, vcenter string, snapshotTime time.Time, success bool, errMsg string) error {
+    if err := EnsureSnapshotRunTable(ctx, dbConn); err != nil {
+        return err
+    }
+    successStr := "FALSE"
+    if success {
+        successStr = "TRUE"
+    }
+    now := time.Now().Unix()
+    driver := strings.ToLower(dbConn.DriverName())
+    switch driver {
+    case "sqlite":
+        _, err := dbConn.ExecContext(ctx, `
+INSERT INTO snapshot_runs ("Vcenter","SnapshotTime","Attempts","Success","LastError","LastAttempt")
+VALUES (?, ?, 1, ?, ?, ?)
+ON CONFLICT("Vcenter","SnapshotTime") DO UPDATE SET
+    "Attempts" = snapshot_runs."Attempts" + 1,
+    "Success" = excluded."Success",
+    "LastError" = excluded."LastError",
+    "LastAttempt" = excluded."LastAttempt"
+`, vcenter, snapshotTime.Unix(), successStr, errMsg, now)
+        return err
+    case "pgx", "postgres":
+        _, err := dbConn.ExecContext(ctx, `
+INSERT INTO snapshot_runs ("Vcenter","SnapshotTime","Attempts","Success","LastError","LastAttempt")
+VALUES ($1, $2, 1, $3, $4, $5)
+ON CONFLICT("Vcenter","SnapshotTime") DO UPDATE SET
+    "Attempts" = snapshot_runs."Attempts" + 1,
+    "Success" = EXCLUDED."Success",
+    "LastError" = EXCLUDED."LastError",
+    "LastAttempt" = EXCLUDED."LastAttempt"
+`, vcenter, snapshotTime.Unix(), successStr, errMsg, now)
+        return err
+    default:
+        return fmt.Errorf("unsupported driver for snapshot_runs upsert: %s", driver)
+    }
+}
+
+// ListFailedSnapshotRuns returns vcenter/time pairs needing retry.
+func ListFailedSnapshotRuns(ctx context.Context, dbConn *sqlx.DB, maxAttempts int) ([]struct {
+    Vcenter      string
+    SnapshotTime int64
+    Attempts     int
+}, error) {
+    if maxAttempts <= 0 {
+        maxAttempts = 3
+    }
+    driver := strings.ToLower(dbConn.DriverName())
+    query := `
+SELECT "Vcenter","SnapshotTime","Attempts"
+FROM snapshot_runs
+WHERE "Success" = 'FALSE' AND "Attempts" < ?
+ORDER BY "LastAttempt" ASC
+`
+    args := []interface{}{maxAttempts}
+    if driver == "pgx" || driver == "postgres" {
+        query = `
+SELECT "Vcenter","SnapshotTime","Attempts"
+FROM snapshot_runs
+WHERE "Success" = 'FALSE' AND "Attempts" < $1
+ORDER BY "LastAttempt" ASC
+`
+    }
+    type row struct {
+        Vcenter      string `db:"Vcenter"`
+        SnapshotTime int64  `db:"SnapshotTime"`
+        Attempts     int    `db:"Attempts"`
+    }
+    rows := []row{}
+    if err := dbConn.SelectContext(ctx, &rows, query, args...); err != nil {
+        return nil, err
+    }
+    results := make([]struct {
+        Vcenter      string
+        SnapshotTime int64
+        Attempts     int
+    }, 0, len(rows))
+    for _, r := range rows {
+        results = append(results, struct {
+            Vcenter      string
+            SnapshotTime int64
+            Attempts     int
+        }{Vcenter: r.Vcenter, SnapshotTime: r.SnapshotTime, Attempts: r.Attempts})
+    }
+    return results, nil
+}
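
Taken together, these helpers form a small bookkeeping API. A hedged usage sketch, not repo code: it assumes the helpers live in package `vctp/db` (as the cron code's `db.EnsureSnapshotRunTable` calls suggest) and an sqlx driver registered as "sqlite" (e.g. modernc.org/sqlite); `vc01.example.com` and the file path are placeholders:

package main

import (
    "context"
    "log"
    "time"

    "github.com/jmoiron/sqlx"
    _ "modernc.org/sqlite" // assumption: a driver registering as "sqlite"

    "vctp/db"
)

func main() {
    ctx := context.Background()
    conn := sqlx.MustOpen("sqlite", "file:vctp.db")
    defer conn.Close()

    // UpsertSnapshotRun calls EnsureSnapshotRunTable itself, so one call
    // both creates the table (if needed) and records a failed attempt.
    hour := time.Now().Truncate(time.Hour)
    if err := db.UpsertSnapshotRun(ctx, conn, "vc01.example.com", hour, false, "login timeout"); err != nil {
        log.Fatal(err)
    }

    // Anything still unsuccessful with fewer than 3 attempts is retryable.
    pending, err := db.ListFailedSnapshotRuns(ctx, conn, 3)
    if err != nil {
        log.Fatal(err)
    }
    for _, p := range pending {
        log.Printf("retry %s for %s (attempt %d so far)", p.Vcenter, time.Unix(p.SnapshotTime, 0), p.Attempts)
    }
}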


@@ -43,6 +43,8 @@ type SettingsYML struct {
 	ReportsDir                   string `yaml:"reports_dir"`
 	HourlyJobTimeoutSeconds      int    `yaml:"hourly_job_timeout_seconds"`
 	HourlySnapshotTimeoutSeconds int    `yaml:"hourly_snapshot_timeout_seconds"`
+	HourlySnapshotRetrySeconds   int    `yaml:"hourly_snapshot_retry_seconds"`
+	HourlySnapshotMaxRetries     int    `yaml:"hourly_snapshot_max_retries"`
 	DailyJobTimeoutSeconds       int    `yaml:"daily_job_timeout_seconds"`
 	MonthlyJobTimeoutSeconds     int    `yaml:"monthly_job_timeout_seconds"`
 	CleanupJobTimeoutSeconds     int    `yaml:"cleanup_job_timeout_seconds"`


@@ -67,6 +67,10 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
 	hourlyTables := make([]string, 0, len(hourlySnapshots))
 	for _, snapshot := range hourlySnapshots {
 		hourlyTables = append(hourlyTables, snapshot.TableName)
+		// Ensure indexes exist on historical hourly tables for faster aggregation.
+		if err := db.EnsureSnapshotIndexes(ctx, dbConn, snapshot.TableName); err != nil {
+			c.Logger.Warn("failed to ensure indexes on hourly table", "table", snapshot.TableName, "error", err)
+		}
 	}

 	unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
 	if err != nil {


@@ -79,6 +79,9 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo
 	if err := db.CheckMigrationState(ctx, c.Database.DB()); err != nil {
 		return err
 	}
+	if err := db.EnsureSnapshotRunTable(ctx, c.Database.DB()); err != nil {
+		return err
+	}

 	// reload settings in case vcenter list has changed
 	c.Settings.ReadYMLSettings()

@@ -178,6 +181,47 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo
 	return nil
 }

+// RunHourlySnapshotRetry retries failed vCenter hourly snapshots up to a maximum attempt count.
+func (c *CronTask) RunHourlySnapshotRetry(ctx context.Context, logger *slog.Logger) (err error) {
+    jobStart := time.Now()
+    defer func() {
+        logger.Info("Hourly snapshot retry finished", "duration", time.Since(jobStart))
+    }()
+
+    maxRetries := c.Settings.Values.Settings.HourlySnapshotMaxRetries
+    if maxRetries <= 0 {
+        maxRetries = 3
+    }
+    dbConn := c.Database.DB()
+    if err := db.EnsureSnapshotRunTable(ctx, dbConn); err != nil {
+        return err
+    }
+    failed, err := db.ListFailedSnapshotRuns(ctx, dbConn, maxRetries)
+    if err != nil {
+        return err
+    }
+    if len(failed) == 0 {
+        logger.Debug("No failed hourly snapshots to retry")
+        return nil
+    }
+    for _, f := range failed {
+        startTime := time.Unix(f.SnapshotTime, 0)
+        tableName, tnErr := hourlyInventoryTableName(startTime)
+        if tnErr != nil {
+            logger.Warn("unable to derive table name for retry", "error", tnErr, "snapshot_time", startTime, "vcenter", f.Vcenter)
+            continue
+        }
+        logger.Info("Retrying hourly snapshot", "vcenter", f.Vcenter, "snapshot_time", startTime, "attempt", f.Attempts+1)
+        if err := c.captureHourlySnapshotForVcenter(ctx, startTime, tableName, f.Vcenter); err != nil {
+            logger.Warn("retry failed", "vcenter", f.Vcenter, "error", err)
+        }
+    }
+    return nil
+}
+
 // RunSnapshotCleanup drops hourly and daily snapshot tables older than retention.
 func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) (err error) {
 	jobCtx := ctx
@@ -696,6 +740,7 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
     vc := vcenter.New(c.Logger, c.VcCreds)
     if err := vc.Login(url); err != nil {
         metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err)
+        _ = db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error())
         return fmt.Errorf("unable to connect to vcenter: %w", err)
     }
     defer func() {

@@ -707,12 +752,9 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
     vcVms, err := vc.GetAllVMsWithProps()
     if err != nil {
         metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err)
+        _ = db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error())
         return fmt.Errorf("unable to get VMs from vcenter: %w", err)
     }
-    canDetectMissing := len(vcVms) > 0
-    if !canDetectMissing {
-        c.Logger.Warn("no VMs returned from vcenter; skipping missing VM detection", "url", url)
-    }

     hostLookup, err := vc.BuildHostLookup()
     if err != nil {
         c.Logger.Warn("failed to build host lookup", "url", url, "error", err)
@@ -741,15 +783,26 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
     }

     inventoryByVmID := make(map[string]queries.Inventory, len(inventoryRows))
+    inventoryByUuid := make(map[string]queries.Inventory, len(inventoryRows))
+    inventoryByName := make(map[string]queries.Inventory, len(inventoryRows))
     for _, inv := range inventoryRows {
         if inv.VmId.Valid {
             inventoryByVmID[inv.VmId.String] = inv
         }
+        if inv.VmUuid.Valid {
+            inventoryByUuid[inv.VmUuid.String] = inv
+        }
+        if inv.Name != "" {
+            inventoryByName[inv.Name] = inv
+        }
     }

     dbConn := c.Database.DB()
     presentSnapshots := make(map[string]inventorySnapshotRow, len(vcVms))
+    presentByUuid := make(map[string]struct{}, len(vcVms))
+    presentByName := make(map[string]struct{}, len(vcVms))
     totals := snapshotTotals{}
+    deletionsMarked := false
     for _, vm := range vcVms {
         if strings.HasPrefix(vm.Name, "vCLS-") {
             continue
@@ -772,6 +825,12 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
         }
         row.IsPresent = "TRUE"
         presentSnapshots[vm.Reference().Value] = row
+        if row.VmUuid.Valid {
+            presentByUuid[row.VmUuid.String] = struct{}{}
+        }
+        if row.Name != "" {
+            presentByName[row.Name] = struct{}{}
+        }

         totals.VmCount++
         totals.VcpuTotal += nullInt64ToInt(row.VcpuCount)
@@ -784,30 +843,40 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
         batch = append(batch, row)
     }

-    if !canDetectMissing {
-        c.Logger.Info("Hourly snapshot summary",
-            "vcenter", url,
-            "vm_count", totals.VmCount,
-            "vcpu_total", totals.VcpuTotal,
-            "ram_total_gb", totals.RamTotal,
-            "disk_total_gb", totals.DiskTotal,
-        )
-        return nil
-    }
+    missingCount := 0
     for _, inv := range inventoryRows {
         if strings.HasPrefix(inv.Name, "vCLS-") {
             continue
         }
         vmID := inv.VmId.String
+        uuid := ""
+        if inv.VmUuid.Valid {
+            uuid = inv.VmUuid.String
+        }
+        name := inv.Name
+        found := false
         if vmID != "" {
             if _, ok := presentSnapshots[vmID]; ok {
-                continue
+                found = true
             }
         }
+        if !found && uuid != "" {
+            if _, ok := presentByUuid[uuid]; ok {
+                found = true
+            }
+        }
+        if !found && name != "" {
+            if _, ok := presentByName[name]; ok {
+                found = true
+            }
+        }
+        if found {
+            continue
+        }
         row := snapshotFromInventory(inv, startTime)
-        row.IsPresent = "FALSE"
         if !row.DeletionTime.Valid {
             deletionTime := startTime.Unix()
             row.DeletionTime = sql.NullInt64{Int64: deletionTime, Valid: true}
@@ -818,23 +887,43 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
             }); err != nil {
                 c.Logger.Warn("failed to mark inventory record deleted", "error", err, "vm_id", row.VmId.String)
             }
+            c.Logger.Debug("Marked VM as deleted", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", url, "snapshot_time", startTime)
+            deletionsMarked = true
         }
-        batch = append(batch, row)
+        missingCount++
     }

     if err := insertHourlyBatch(ctx, dbConn, tableName, batch); err != nil {
         metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, err)
+        _ = db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error())
         return err
     }

+    // Compare with previous snapshot for this vcenter to mark deletions at snapshot time.
+    if prevTable, err := latestHourlySnapshotBefore(ctx, dbConn, startTime); err == nil && prevTable != "" {
+        moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTable, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName)
+        missingCount += moreMissing
+    } else if err != nil {
+        c.Logger.Warn("failed to locate previous hourly snapshot for deletion comparison", "error", err, "url", url)
+    }
+
     c.Logger.Info("Hourly snapshot summary",
         "vcenter", url,
         "vm_count", totals.VmCount,
         "vcpu_total", totals.VcpuTotal,
         "ram_total_gb", totals.RamTotal,
         "disk_total_gb", totals.DiskTotal,
+        "missing_marked", missingCount,
     )
     metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, nil)
+    _ = db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, true, "")
+
+    if deletionsMarked {
+        if err := c.generateReport(ctx, tableName); err != nil {
+            c.Logger.Warn("failed to regenerate hourly report after deletions", "error", err, "table", tableName)
+        } else {
+            c.Logger.Debug("Regenerated hourly report after deletions", "table", tableName)
+        }
+    }
+
     return nil
 }
@@ -865,3 +954,141 @@ func boolStringFromInterface(value interface{}) string {
         return fmt.Sprint(v)
     }
 }
+
+// latestHourlySnapshotBefore finds the most recent hourly snapshot table prior to the given time.
+func latestHourlySnapshotBefore(ctx context.Context, dbConn *sqlx.DB, cutoff time.Time) (string, error) {
+    driver := strings.ToLower(dbConn.DriverName())
+    var rows *sqlx.Rows
+    var err error
+    switch driver {
+    case "sqlite":
+        rows, err = dbConn.QueryxContext(ctx, `
+SELECT name FROM sqlite_master
+WHERE type = 'table' AND name LIKE 'inventory_hourly_%'
+`)
+    case "pgx", "postgres":
+        rows, err = dbConn.QueryxContext(ctx, `
+SELECT tablename FROM pg_catalog.pg_tables
+WHERE schemaname = 'public' AND tablename LIKE 'inventory_hourly_%'
+`)
+    default:
+        return "", fmt.Errorf("unsupported driver for snapshot lookup: %s", driver)
+    }
+    if err != nil {
+        return "", err
+    }
+    defer rows.Close()
+
+    var latest string
+    var latestTime int64
+    for rows.Next() {
+        var name string
+        if scanErr := rows.Scan(&name); scanErr != nil {
+            continue
+        }
+        if !strings.HasPrefix(name, "inventory_hourly_") {
+            continue
+        }
+        suffix := strings.TrimPrefix(name, "inventory_hourly_")
+        epoch, parseErr := strconv.ParseInt(suffix, 10, 64)
+        if parseErr != nil {
+            continue
+        }
+        if epoch < cutoff.Unix() && epoch > latestTime {
+            latestTime = epoch
+            latest = name
+        }
+    }
+    return latest, nil
+}
+
+// markMissingFromPrevious marks VMs that were present in the previous snapshot but missing now.
+func (c *CronTask) markMissingFromPrevious(ctx context.Context, dbConn *sqlx.DB, prevTable string, vcenter string, snapshotTime time.Time,
+    currentByID map[string]inventorySnapshotRow, currentByUuid map[string]struct{}, currentByName map[string]struct{},
+    invByID map[string]queries.Inventory, invByUuid map[string]queries.Inventory, invByName map[string]queries.Inventory) int {
+    if err := db.ValidateTableName(prevTable); err != nil {
+        return 0
+    }
+    query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name","Datacenter","DeletionTime" FROM %s WHERE "Vcenter" = ?`, prevTable)
+    query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query)
+    type prevRow struct {
+        VmId         sql.NullString `db:"VmId"`
+        VmUuid       sql.NullString `db:"VmUuid"`
+        Name         string         `db:"Name"`
+        Datacenter   sql.NullString `db:"Datacenter"`
+        DeletionTime sql.NullInt64  `db:"DeletionTime"`
+    }
+    rows, err := dbConn.QueryxContext(ctx, query, vcenter)
+    if err != nil {
+        c.Logger.Warn("failed to read previous snapshot for deletion detection", "error", err, "table", prevTable, "vcenter", vcenter)
+        return 0
+    }
+    defer rows.Close()
+
+    missing := 0
+    for rows.Next() {
+        var r prevRow
+        if err := rows.StructScan(&r); err != nil {
+            continue
+        }
+        vmID := r.VmId.String
+        uuid := r.VmUuid.String
+        name := r.Name
+        found := false
+        if vmID != "" {
+            if _, ok := currentByID[vmID]; ok {
+                found = true
+            }
+        }
+        if !found && uuid != "" {
+            if _, ok := currentByUuid[uuid]; ok {
+                found = true
+            }
+        }
+        if !found && name != "" {
+            if _, ok := currentByName[name]; ok {
+                found = true
+            }
+        }
+        if found {
+            continue
+        }
+        var inv queries.Inventory
+        var ok bool
+        if vmID != "" {
+            inv, ok = invByID[vmID]
+        }
+        if !ok && uuid != "" {
+            inv, ok = invByUuid[uuid]
+        }
+        if !ok && name != "" {
+            inv, ok = invByName[name]
+        }
+        if !ok {
+            continue
+        }
+        if inv.DeletionTime.Valid {
+            continue
+        }
+        delTime := sql.NullInt64{Int64: snapshotTime.Unix(), Valid: true}
+        if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{
+            DeletionTime:   delTime,
+            VmId:           inv.VmId,
+            DatacenterName: inv.Datacenter,
+        }); err != nil {
+            c.Logger.Warn("failed to mark inventory record deleted from previous snapshot", "error", err, "vm_id", inv.VmId.String)
+            continue
+        }
+        c.Logger.Debug("Detected VM missing compared to previous snapshot", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", vcenter, "snapshot_time", snapshotTime, "prev_table", prevTable)
+        missing++
+    }
+    return missing
+}
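
Both the inline check and markMissingFromPrevious match a previously known VM against the current snapshot on three keys in order of stability: managed object ID, then VM UUID, then name. A standalone sketch of that precedence (types simplified and names illustrative, not the repo's API):

// presence captures the lookup sets built from the current snapshot.
type presence struct {
    byID, byUUID, byName map[string]struct{}
}

// seen reports whether a previously-known VM is still present, trying the
// most stable identifier first: vSphere MoRef ID, then VM UUID, then name.
// Only if all three miss is the VM treated as deleted.
func (p presence) seen(id, uuid, name string) bool {
    if id != "" {
        if _, ok := p.byID[id]; ok {
            return true
        }
    }
    if uuid != "" {
        if _, ok := p.byUUID[uuid]; ok {
            return true
        }
    }
    if name != "" {
        if _, ok := p.byName[name]; ok {
            return true
        }
    }
    return false
}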

main.go

@@ -281,6 +281,23 @@ func main() {
     }
     logger.Debug("Created snapshot cleanup cron job", "job", job6.ID())

+    // Retry failed hourly snapshots
+    retrySeconds := s.Values.Settings.HourlySnapshotRetrySeconds
+    if retrySeconds <= 0 {
+        retrySeconds = 300
+    }
+    job7, err := c.NewJob(
+        gocron.DurationJob(time.Duration(retrySeconds)*time.Second),
+        gocron.NewTask(func() {
+            ct.RunHourlySnapshotRetry(ctx, logger)
+        }), gocron.WithSingletonMode(gocron.LimitModeReschedule),
+    )
+    if err != nil {
+        logger.Error("failed to start hourly snapshot retry cron job", "error", err)
+        os.Exit(1)
+    }
+    logger.Debug("Created hourly snapshot retry cron job", "job", job7.ID(), "interval_seconds", retrySeconds)
+
     // start cron scheduler
     c.Start()
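
One nuance: the task closure above discards the error returned by RunHourlySnapshotRetry, so a failed retry pass is visible only through the method's own logging. A small sketch of a variant using the same gocron calls as the diff that surfaces the error at the scheduler level:

job7, err := c.NewJob(
    gocron.DurationJob(time.Duration(retrySeconds)*time.Second),
    gocron.NewTask(func() {
        // Log the returned error instead of dropping it.
        if err := ct.RunHourlySnapshotRetry(ctx, logger); err != nil {
            logger.Warn("hourly snapshot retry job failed", "error", err)
        }
    }),
    gocron.WithSingletonMode(gocron.LimitModeReschedule),
)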


@@ -20,6 +20,8 @@ settings:
   hourly_snapshot_max_age_days: 60
   daily_snapshot_max_age_months: 12
   snapshot_cleanup_cron: "30 2 * * *"
+  hourly_snapshot_retry_seconds: 300
+  hourly_snapshot_max_retries: 3
   hourly_job_timeout_seconds: 1200
   hourly_snapshot_timeout_seconds: 600
   daily_job_timeout_seconds: 900