add repair functionality
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2026-01-17 12:51:11 +11:00
parent 22fa250a43
commit e186644db7
10 changed files with 426 additions and 18 deletions

View File

@@ -1287,7 +1287,7 @@ WITH snapshots AS (
INSERT INTO %s (
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SnapshotTime",
"SamplesPresent", "AvgVcpuCount", "AvgRamGB", "AvgProvisionedDisk", "AvgIsPresent",
"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct",
"Tin", "Bronze", "Silver", "Gold"
@@ -1339,6 +1339,7 @@ SELECT
LIMIT 1
) AS "RamGB",
agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid",
agg.last_present AS "SnapshotTime",
agg.samples_present AS "SamplesPresent",
CASE WHEN totals.total_samples > 0
THEN 1.0 * agg.sum_vcpu / totals.total_samples
@@ -1382,7 +1383,7 @@ GROUP BY
agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId",
agg."Datacenter", agg."Cluster", agg."Folder",
agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid",
agg.any_creation, agg.any_deletion, agg.first_present, agg.last_present,
agg.any_creation, agg.any_deletion, agg.first_present, agg.last_present,
totals.total_samples;
`, unionQuery, tableName)
return insert, nil
@@ -1660,6 +1661,7 @@ func EnsureSummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string)
"PoolBronzePct" REAL,
"PoolSilverPct" REAL,
"PoolGoldPct" REAL,
"SnapshotTime" BIGINT,
"Tin" REAL,
"Bronze" REAL,
"Silver" REAL,
@@ -1690,12 +1692,13 @@ func EnsureSummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string)
"SamplesPresent" BIGINT NOT NULL,
"AvgVcpuCount" REAL,
"AvgRamGB" REAL,
"AvgProvisionedDisk" REAL,
"AvgIsPresent" REAL,
"PoolTinPct" REAL,
"PoolBronzePct" REAL,
"PoolSilverPct" REAL,
"PoolGoldPct" REAL,
"AvgProvisionedDisk" REAL,
"AvgIsPresent" REAL,
"PoolTinPct" REAL,
"PoolBronzePct" REAL,
"PoolSilverPct" REAL,
"PoolGoldPct" REAL,
"SnapshotTime" BIGINT,
"Tin" REAL,
"Bronze" REAL,
"Silver" REAL,
@@ -1711,6 +1714,10 @@ func EnsureSummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string)
if hasIsPresent, err := ColumnExists(ctx, dbConn, tableName, "IsPresent"); err == nil && hasIsPresent {
_, _ = execLog(ctx, dbConn, fmt.Sprintf(`ALTER TABLE %s DROP COLUMN "IsPresent"`, tableName))
}
// Ensure SnapshotTime exists for lifecycle refinement.
if hasSnapshot, err := ColumnExists(ctx, dbConn, tableName, "SnapshotTime"); err == nil && !hasSnapshot {
_, _ = execLog(ctx, dbConn, fmt.Sprintf(`ALTER TABLE %s ADD COLUMN "SnapshotTime" BIGINT`, tableName))
}
indexes := []string{
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vm_vcenter_idx ON %s ("VmId","Vcenter")`, tableName, tableName),
@@ -1730,6 +1737,64 @@ func EnsureSummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string)
return nil
}
// BackfillSnapshotTimeFromUnion sets SnapshotTime in a summary table using the max snapshot time per VM from a union query.
func BackfillSnapshotTimeFromUnion(ctx context.Context, dbConn *sqlx.DB, summaryTable, unionQuery string) error {
if unionQuery == "" {
return fmt.Errorf("union query is empty")
}
if _, err := SafeTableName(summaryTable); err != nil {
return err
}
driver := strings.ToLower(dbConn.DriverName())
var sql string
switch driver {
case "pgx", "postgres":
sql = fmt.Sprintf(`
WITH snapshots AS (
%s
)
UPDATE %s dst
SET "SnapshotTime" = sub.max_time
FROM (
SELECT s."Vcenter", s."VmId", s."VmUuid", s."Name", MAX(s."SnapshotTime") AS max_time
FROM snapshots s
GROUP BY s."Vcenter", s."VmId", s."VmUuid", s."Name"
) sub
WHERE (dst."SnapshotTime" IS NULL OR dst."SnapshotTime" = 0)
AND dst."Vcenter" = sub."Vcenter"
AND (
(dst."VmId" IS NOT DISTINCT FROM sub."VmId")
OR (dst."VmUuid" IS NOT DISTINCT FROM sub."VmUuid")
OR (dst."Name" IS NOT DISTINCT FROM sub."Name")
);
`, unionQuery, summaryTable)
default:
sql = fmt.Sprintf(`
WITH snapshots AS (
%[1]s
), grouped AS (
SELECT s."Vcenter", s."VmId", s."VmUuid", s."Name", MAX(s."SnapshotTime") AS max_time
FROM snapshots s
GROUP BY s."Vcenter", s."VmId", s."VmUuid", s."Name"
)
UPDATE %[2]s
SET "SnapshotTime" = (
SELECT max_time FROM grouped g
WHERE %[2]s."Vcenter" = g."Vcenter"
AND (
(%[2]s."VmId" IS NOT NULL AND g."VmId" IS NOT NULL AND %[2]s."VmId" = g."VmId")
OR (%[2]s."VmUuid" IS NOT NULL AND g."VmUuid" IS NOT NULL AND %[2]s."VmUuid" = g."VmUuid")
OR (%[2]s."Name" IS NOT NULL AND g."Name" IS NOT NULL AND %[2]s."Name" = g."Name")
)
)
WHERE "SnapshotTime" IS NULL OR "SnapshotTime" = 0;
`, unionQuery, summaryTable)
}
_, err := execLog(ctx, dbConn, sql)
return err
}
// EnsureSnapshotRunTable creates a table to track per-vCenter hourly snapshot attempts.
func EnsureSnapshotRunTable(ctx context.Context, dbConn *sqlx.DB) error {
ddl := `

View File

@@ -625,12 +625,12 @@ func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg
defer tx.Rollback()
driver := strings.ToLower(dbConn.DriverName())
placeholders := makePlaceholders(driver, 29)
placeholders := makePlaceholders(driver, 30)
insert := fmt.Sprintf(`
INSERT INTO %s (
"Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder",
"ProvisionedDisk","VcpuCount","RamGB","IsTemplate","PoweredOn","SrmPlaceholder",
"CreationTime","DeletionTime","SamplesPresent","AvgVcpuCount","AvgRamGB","AvgProvisionedDisk",
"CreationTime","DeletionTime","SnapshotTime","SamplesPresent","AvgVcpuCount","AvgRamGB","AvgProvisionedDisk",
"AvgIsPresent","PoolTinPct","PoolBronzePct","PoolSilverPct","PoolGoldPct","Tin","Bronze","Silver","Gold"
) VALUES (%s)
`, table, placeholders)
@@ -674,6 +674,7 @@ INSERT INTO %s (
v.srmPlaceholder,
v.creation,
v.deletion,
v.lastSeen,
v.samples,
avgVcpu,
avgRam,

View File

@@ -478,7 +478,7 @@ var monthlyUnionColumns = []string{
`"SrmPlaceholder"`, `"VmUuid"`,
`"SamplesPresent"`, `"AvgVcpuCount"`, `"AvgRamGB"`, `"AvgProvisionedDisk"`, `"AvgIsPresent"`,
`"PoolTinPct"`, `"PoolBronzePct"`, `"PoolSilverPct"`, `"PoolGoldPct"`,
`"Tin"`, `"Bronze"`, `"Silver"`, `"Gold"`,
`"Tin"`, `"Bronze"`, `"Silver"`, `"Gold"`, `"SnapshotTime"`,
}
func ensureSnapshotRowID(ctx context.Context, dbConn *sqlx.DB, tableName string) error {

View File

@@ -189,13 +189,8 @@ func (c *CronTask) aggregateMonthlySummaryGo(ctx context.Context, monthStart, mo
return err
}
// Refine creation/deletion using SQL helper (requires SnapshotTime in union).
if strings.Contains(unionQuery, `"SnapshotTime"`) {
if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
c.Logger.Warn("failed to refine creation/deletion times (monthly Go)", "error", err, "table", summaryTable)
}
} else {
c.Logger.Debug("Skipping lifecycle refinement for monthly aggregation (no SnapshotTime in union)")
if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
c.Logger.Warn("failed to refine creation/deletion times (monthly Go)", "error", err, "table", summaryTable)
}
// Backfill missing creation times to the start of the month for rows lacking creation info.

View File

@@ -0,0 +1,219 @@
package handler
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"vctp/db"
"vctp/internal/report"
)
// SnapshotRepair scans existing daily summaries and backfills missing SnapshotTime and lifecycle fields.
// @Summary Repair daily summaries
// @Description Backfills SnapshotTime and lifecycle info for existing daily summary tables and reruns monthly lifecycle refinement using hourly data.
// @Tags snapshots
// @Produce json
// @Success 200 {object} map[string]string
// @Router /api/snapshots/repair [post]
func (h *Handler) SnapshotRepair(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
h.Logger.Info("snapshot repair started", "scope", "daily")
repaired, failed := h.repairDailySummaries(r.Context(), time.Now())
h.Logger.Info("snapshot repair finished", "daily_repaired", repaired, "daily_failed", failed)
resp := map[string]string{
"status": "ok",
"repaired": strconv.Itoa(repaired),
"failed": strconv.Itoa(failed),
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}
func (h *Handler) repairDailySummaries(ctx context.Context, now time.Time) (repaired int, failed int) {
dbConn := h.Database.DB()
dailyRecs, err := report.SnapshotRecordsWithFallback(ctx, h.Database, "daily", "inventory_daily_summary_", "20060102", time.Time{}, now)
if err != nil {
h.Logger.Warn("failed to list daily summaries", "error", err)
return 0, 1
}
for _, rec := range dailyRecs {
h.Logger.Debug("repair daily summary table", "table", rec.TableName, "snapshot_time", rec.SnapshotTime)
dayStart := rec.SnapshotTime
dayEnd := dayStart.Add(24 * time.Hour)
if err := db.EnsureSummaryTable(ctx, dbConn, rec.TableName); err != nil {
h.Logger.Warn("ensure summary table failed", "table", rec.TableName, "error", err)
failed++
continue
}
hourlyRecs, err := report.SnapshotRecordsWithFallback(ctx, h.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
if err != nil || len(hourlyRecs) == 0 {
h.Logger.Warn("no hourly snapshots for repair window", "table", rec.TableName, "error", err)
failed++
continue
}
cols := []string{
`"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`,
`"DeletionTime"`, `"ResourcePool"`, `"Datacenter"`, `"Cluster"`, `"Folder"`,
`"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`,
`"SrmPlaceholder"`, `"VmUuid"`, `"SnapshotTime"`,
}
union, err := buildUnionFromRecords(hourlyRecs, cols, `COALESCE(CAST("IsTemplate" AS TEXT), '') NOT IN ('TRUE','true','1')`)
if err != nil {
h.Logger.Warn("failed to build union for repair", "table", rec.TableName, "error", err)
failed++
continue
}
h.Logger.Debug("built hourly union for repair", "table", rec.TableName, "hourly_tables", len(hourlyRecs))
if err := db.BackfillSnapshotTimeFromUnion(ctx, dbConn, rec.TableName, union); err != nil {
h.Logger.Warn("failed to backfill snapshot time", "table", rec.TableName, "error", err)
failed++
continue
}
h.Logger.Debug("snapshot time backfill complete", "table", rec.TableName)
if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, rec.TableName, union); err != nil {
h.Logger.Warn("failed to refine lifecycle during repair", "table", rec.TableName, "error", err)
failed++
continue
}
h.Logger.Debug("lifecycle refinement complete", "table", rec.TableName)
h.Logger.Info("repair applied", "table", rec.TableName, "actions", "snapshot_time+lifecycle")
repaired++
}
return repaired, failed
}
// SnapshotRepairSuite runs a sequence of repair routines to fix older deployments in one call.
// It rebuilds the snapshot registry, syncs vcenter totals, repairs daily summaries, and refines monthly lifecycle data.
// @Summary Run full snapshot repair suite
// @Description Rebuilds snapshot registry, backfills per-vCenter totals, repairs daily summaries (SnapshotTime/lifecycle), and refines monthly lifecycle.
// @Tags snapshots
// @Produce json
// @Success 200 {object} map[string]string
// @Router /api/snapshots/repair/all [post]
func (h *Handler) SnapshotRepairSuite(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
ctx := r.Context()
dbConn := h.Database.DB()
// Step 1: rebuild snapshot registry from existing tables.
h.Logger.Info("repair suite step", "step", "snapshot_registry")
if stats, err := report.MigrateSnapshotRegistry(ctx, h.Database); err != nil {
h.Logger.Warn("snapshot registry migration failed", "error", err)
} else {
h.Logger.Info("snapshot registry migration complete", "hourly_renamed", stats.HourlyRenamed, "daily_registered", stats.DailyRegistered, "monthly_registered", stats.MonthlyRegistered, "errors", stats.Errors)
}
// Step 2: backfill vcenter_totals from registry hourly tables.
h.Logger.Info("repair suite step", "step", "vcenter_totals")
if err := db.SyncVcenterTotalsFromSnapshots(ctx, dbConn); err != nil {
h.Logger.Warn("sync vcenter totals failed", "error", err)
}
// Step 3: repair daily summaries (snapshot time + lifecycle).
h.Logger.Info("repair suite step", "step", "daily_summaries")
dailyRepaired, dailyFailed := h.repairDailySummaries(ctx, time.Now())
// Step 4: refine monthly lifecycle using daily summaries (requires SnapshotTime now present after step 3).
h.Logger.Info("repair suite step", "step", "monthly_refine")
monthlyRefined, monthlyFailed := h.refineMonthlyFromDaily(ctx, time.Now())
resp := map[string]string{
"status": "ok",
"daily_repaired": strconv.Itoa(dailyRepaired),
"daily_failed": strconv.Itoa(dailyFailed),
"monthly_refined": strconv.Itoa(monthlyRefined),
"monthly_failed": strconv.Itoa(monthlyFailed),
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}
func (h *Handler) refineMonthlyFromDaily(ctx context.Context, now time.Time) (refined int, failed int) {
dbConn := h.Database.DB()
dailyRecs, err := report.SnapshotRecordsWithFallback(ctx, h.Database, "daily", "inventory_daily_summary_", "20060102", time.Time{}, now)
if err != nil {
h.Logger.Warn("failed to list daily summaries for monthly refine", "error", err)
return 0, 1
}
// Group daily tables by month (YYYYMM).
grouped := make(map[string][]report.SnapshotRecord)
for _, rec := range dailyRecs {
key := rec.SnapshotTime.Format("200601")
grouped[key] = append(grouped[key], rec)
}
cols := []string{
`"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`,
`"DeletionTime"`, `"ResourcePool"`, `"Datacenter"`, `"Cluster"`, `"Folder"`,
`"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`,
`"SrmPlaceholder"`, `"VmUuid"`, `"SnapshotTime"`,
}
for monthKey, recs := range grouped {
summaryTable := fmt.Sprintf("inventory_monthly_summary_%s", monthKey)
h.Logger.Debug("monthly refine", "table", summaryTable, "daily_tables", len(recs))
if err := db.EnsureSummaryTable(ctx, dbConn, summaryTable); err != nil {
h.Logger.Warn("ensure monthly summary failed", "table", summaryTable, "error", err)
failed++
continue
}
union, err := buildUnionFromRecords(recs, cols, `COALESCE(CAST("IsTemplate" AS TEXT), '') NOT IN ('TRUE','true','1')`)
if err != nil {
h.Logger.Warn("failed to build union for monthly refine", "table", summaryTable, "error", err)
failed++
continue
}
if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, union); err != nil {
h.Logger.Warn("failed to refine monthly lifecycle", "table", summaryTable, "error", err)
failed++
continue
}
h.Logger.Debug("monthly refine applied", "table", summaryTable)
refined++
}
return refined, failed
}
func buildUnionFromRecords(recs []report.SnapshotRecord, columns []string, where string) (string, error) {
if len(recs) == 0 {
return "", fmt.Errorf("no tables provided for union")
}
colList := strings.Join(columns, ", ")
parts := make([]string, 0, len(recs))
for _, rec := range recs {
if err := db.ValidateTableName(rec.TableName); err != nil {
continue
}
q := fmt.Sprintf(`SELECT %s FROM %s`, colList, rec.TableName)
if where != "" {
q = q + " WHERE " + where
}
parts = append(parts, q)
}
if len(parts) == 0 {
return "", fmt.Errorf("no valid tables for union")
}
return strings.Join(parts, "\nUNION ALL\n"), nil
}

View File

@@ -760,6 +760,52 @@ const docTemplate = `{
}
}
},
"/api/snapshots/repair": {
"post": {
"description": "Backfills SnapshotTime and lifecycle info for existing daily summary tables and reruns monthly lifecycle refinement using hourly data.",
"produces": [
"application/json"
],
"tags": [
"snapshots"
],
"summary": "Repair daily summaries",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "object",
"additionalProperties": {
"type": "string"
}
}
}
}
}
},
"/api/snapshots/repair/all": {
"post": {
"description": "Rebuilds snapshot registry, backfills per-vCenter totals, repairs daily summaries (SnapshotTime/lifecycle), and refines monthly lifecycle.",
"produces": [
"application/json"
],
"tags": [
"snapshots"
],
"summary": "Run full snapshot repair suite",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "object",
"additionalProperties": {
"type": "string"
}
}
}
}
}
},
"/metrics": {
"get": {
"description": "Exposes Prometheus metrics for vctp.",

View File

@@ -749,6 +749,52 @@
}
}
},
"/api/snapshots/repair": {
"post": {
"description": "Backfills SnapshotTime and lifecycle info for existing daily summary tables and reruns monthly lifecycle refinement using hourly data.",
"produces": [
"application/json"
],
"tags": [
"snapshots"
],
"summary": "Repair daily summaries",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "object",
"additionalProperties": {
"type": "string"
}
}
}
}
}
},
"/api/snapshots/repair/all": {
"post": {
"description": "Rebuilds snapshot registry, backfills per-vCenter totals, repairs daily summaries (SnapshotTime/lifecycle), and refines monthly lifecycle.",
"produces": [
"application/json"
],
"tags": [
"snapshots"
],
"summary": "Run full snapshot repair suite",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "object",
"additionalProperties": {
"type": "string"
}
}
}
}
}
},
"/metrics": {
"get": {
"description": "Exposes Prometheus metrics for vctp.",

View File

@@ -658,6 +658,38 @@ paths:
summary: Regenerate hourly snapshot reports
tags:
- snapshots
/api/snapshots/repair:
post:
description: Backfills SnapshotTime and lifecycle info for existing daily summary
tables and reruns monthly lifecycle refinement using hourly data.
produces:
- application/json
responses:
"200":
description: OK
schema:
additionalProperties:
type: string
type: object
summary: Repair daily summaries
tags:
- snapshots
/api/snapshots/repair/all:
post:
description: Rebuilds snapshot registry, backfills per-vCenter totals, repairs
daily summaries (SnapshotTime/lifecycle), and refines monthly lifecycle.
produces:
- application/json
responses:
"200":
description: OK
schema:
additionalProperties:
type: string
type: object
summary: Run full snapshot repair suite
tags:
- snapshots
/metrics:
get:
description: Exposes Prometheus metrics for vctp.

View File

@@ -65,6 +65,8 @@ func New(logger *slog.Logger, database db.Database, buildTime string, sha1ver st
mux.HandleFunc("/api/snapshots/aggregate", h.SnapshotAggregateForce)
mux.HandleFunc("/api/snapshots/hourly/force", h.SnapshotForceHourly)
mux.HandleFunc("/api/snapshots/migrate", h.SnapshotMigrate)
mux.HandleFunc("/api/snapshots/repair", h.SnapshotRepair)
mux.HandleFunc("/api/snapshots/repair/all", h.SnapshotRepairSuite)
mux.HandleFunc("/api/snapshots/regenerate-hourly-reports", h.SnapshotRegenerateHourlyReports)
mux.HandleFunc("/vm/trace", h.VmTrace)
mux.HandleFunc("/vcenters", h.VcenterList)

View File

@@ -1 +1,3 @@
CPE_OPTS='-settings /etc/dtms/vctp.yml'
MONTHLY_AGG_GO=0
DAILY_AGG_GO=0