extend average calculations in daily/monthly rollups
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2026-01-13 21:13:46 +11:00
parent 0f0bdf19c3
commit 5cc89968d9
2 changed files with 253 additions and 12 deletions

View File

@@ -37,7 +37,7 @@ steps:
- go install github.com/a-h/templ/cmd/templ@latest - go install github.com/a-h/templ/cmd/templ@latest
- go install github.com/sqlc-dev/sqlc/cmd/sqlc@latest - go install github.com/sqlc-dev/sqlc/cmd/sqlc@latest
- go install github.com/swaggo/swag/cmd/swag@latest - go install github.com/swaggo/swag/cmd/swag@latest
- go install github.com/goreleaser/nfpm/v2/cmd/nfpm@latest # - go install github.com/goreleaser/nfpm/v2/cmd/nfpm@latest
- sqlc generate - sqlc generate
- templ generate -path ./components - templ generate -path ./components
- swag init --exclude "pkg.mod,pkg.build,pkg.tools" -o server/router/docs - swag init --exclude "pkg.mod,pkg.build,pkg.tools" -o server/router/docs

View File

@@ -86,6 +86,7 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo
} }
presentSnapshots := make(map[string]inventorySnapshotRow, len(vcVms)) presentSnapshots := make(map[string]inventorySnapshotRow, len(vcVms))
totals := snapshotTotals{}
for _, vm := range vcVms { for _, vm := range vcVms {
if strings.HasPrefix(vm.Name(), "vCLS-") { if strings.HasPrefix(vm.Name(), "vCLS-") {
continue continue
@@ -113,6 +114,11 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo
} }
row.IsPresent = "TRUE" row.IsPresent = "TRUE"
presentSnapshots[vm.Reference().Value] = row presentSnapshots[vm.Reference().Value] = row
totals.VmCount++
totals.VcpuTotal += nullInt64ToInt(row.InitialVcpus)
totals.RamTotal += nullInt64ToInt(row.InitialRam)
totals.DiskTotal += nullFloat64ToFloat(row.ProvisionedDisk)
} }
for _, row := range presentSnapshots { for _, row := range presentSnapshots {
@@ -137,6 +143,14 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo
} }
vc.Logout() vc.Logout()
c.Logger.Info("Hourly snapshot summary",
"vcenter", url,
"vm_count", totals.VmCount,
"vcpu_total", totals.VcpuTotal,
"ram_total_mb", totals.RamTotal,
"disk_total_gb", totals.DiskTotal,
)
} }
c.Logger.Debug("Finished hourly vcenter snapshot") c.Logger.Debug("Finished hourly vcenter snapshot")
@@ -160,17 +174,61 @@ func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Lo
return err return err
} }
currentTotals, err := snapshotTotalsForTable(ctx, dbConn, sourceTable)
if err != nil {
c.Logger.Warn("unable to calculate daily totals", "error", err, "table", sourceTable)
} else {
c.Logger.Info("Daily snapshot totals",
"table", sourceTable,
"vm_count", currentTotals.VmCount,
"vcpu_total", currentTotals.VcpuTotal,
"ram_total_mb", currentTotals.RamTotal,
"disk_total_gb", currentTotals.DiskTotal,
)
}
prevTable, _ := dailyInventoryTableName(targetTime.AddDate(0, 0, -1))
if prevTable != "" && tableExists(ctx, dbConn, prevTable) {
prevTotals, err := snapshotTotalsForTable(ctx, dbConn, prevTable)
if err != nil {
c.Logger.Warn("unable to calculate previous day totals", "error", err, "table", prevTable)
} else {
c.Logger.Info("Daily snapshot comparison",
"current_table", sourceTable,
"previous_table", prevTable,
"vm_delta", currentTotals.VmCount-prevTotals.VmCount,
"vcpu_delta", currentTotals.VcpuTotal-prevTotals.VcpuTotal,
"ram_delta_mb", currentTotals.RamTotal-prevTotals.RamTotal,
"disk_delta_gb", currentTotals.DiskTotal-prevTotals.DiskTotal,
)
}
}
insertQuery := fmt.Sprintf(` insertQuery := fmt.Sprintf(`
INSERT INTO %s ( INSERT INTO %s (
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus", "ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus",
"InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SamplesPresent" "InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
"SamplesPresent", "AvgVcpus", "AvgRam", "AvgDisk", "AvgIsPresent",
"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct"
) )
SELECT SELECT
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus", "ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus",
"InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END) AS "SamplesPresent" SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END) AS "SamplesPresent",
AVG(CASE WHEN "IsPresent" = 'TRUE' AND "InitialVcpus" IS NOT NULL THEN "InitialVcpus" END) AS "AvgVcpus",
AVG(CASE WHEN "IsPresent" = 'TRUE' AND "InitialRam" IS NOT NULL THEN "InitialRam" END) AS "AvgRam",
AVG(CASE WHEN "IsPresent" = 'TRUE' AND "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" END) AS "AvgDisk",
AVG(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END) AS "AvgIsPresent",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'tin' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolTinPct",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'bronze' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolBronzePct",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'silver' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolSilverPct",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'gold' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolGoldPct"
FROM %s FROM %s
GROUP BY GROUP BY
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
@@ -222,12 +280,26 @@ func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.
return fmt.Errorf("no valid daily snapshot tables found for %s", targetMonth.Format("2006-01")) return fmt.Errorf("no valid daily snapshot tables found for %s", targetMonth.Format("2006-01"))
} }
monthlyTotals, err := snapshotTotalsForUnion(ctx, dbConn, unionQuery)
if err != nil {
c.Logger.Warn("unable to calculate monthly totals", "error", err, "month", targetMonth.Format("2006-01"))
} else {
c.Logger.Info("Monthly snapshot totals",
"month", targetMonth.Format("2006-01"),
"vm_count", monthlyTotals.VmCount,
"vcpu_total", monthlyTotals.VcpuTotal,
"ram_total_mb", monthlyTotals.RamTotal,
"disk_total_gb", monthlyTotals.DiskTotal,
)
}
insertQuery := fmt.Sprintf(` insertQuery := fmt.Sprintf(`
INSERT INTO %s ( INSERT INTO %s (
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus", "ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus",
"InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
"AvgVcpus", "AvgRam", "AvgIsPresent" "AvgVcpus", "AvgRam", "AvgDisk", "AvgIsPresent",
"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct"
) )
SELECT SELECT
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
@@ -235,7 +307,16 @@ SELECT
"InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
AVG(CASE WHEN "InitialVcpus" IS NOT NULL THEN "InitialVcpus" END) AS "AvgVcpus", AVG(CASE WHEN "InitialVcpus" IS NOT NULL THEN "InitialVcpus" END) AS "AvgVcpus",
AVG(CASE WHEN "InitialRam" IS NOT NULL THEN "InitialRam" END) AS "AvgRam", AVG(CASE WHEN "InitialRam" IS NOT NULL THEN "InitialRam" END) AS "AvgRam",
AVG(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END) AS "AvgIsPresent" AVG(CASE WHEN "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" END) AS "AvgDisk",
AVG(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END) AS "AvgIsPresent",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'tin' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolTinPct",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'bronze' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolBronzePct",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'silver' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolSilverPct",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'gold' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolGoldPct"
FROM ( FROM (
%s %s
) snapshots ) snapshots
@@ -270,6 +351,7 @@ func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger)
return err return err
} }
removedHourly := 0
for _, table := range hourlyTables { for _, table := range hourlyTables {
if strings.HasPrefix(table, "inventory_daily_summary_") { if strings.HasPrefix(table, "inventory_daily_summary_") {
continue continue
@@ -281,6 +363,8 @@ func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger)
if tableDate.Before(truncateDate(hourlyCutoff)) { if tableDate.Before(truncateDate(hourlyCutoff)) {
if err := dropSnapshotTable(ctx, dbConn, table); err != nil { if err := dropSnapshotTable(ctx, dbConn, table); err != nil {
c.Logger.Error("failed to drop hourly snapshot table", "error", err, "table", table) c.Logger.Error("failed to drop hourly snapshot table", "error", err, "table", table)
} else {
removedHourly++
} }
} }
} }
@@ -289,6 +373,7 @@ func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger)
if err != nil { if err != nil {
return err return err
} }
removedDaily := 0
for _, table := range dailyTables { for _, table := range dailyTables {
tableDate, ok := parseSnapshotDate(table, "inventory_daily_summary_", "20060102") tableDate, ok := parseSnapshotDate(table, "inventory_daily_summary_", "20060102")
if !ok { if !ok {
@@ -297,11 +382,18 @@ func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger)
if tableDate.Before(truncateDate(dailyCutoff)) { if tableDate.Before(truncateDate(dailyCutoff)) {
if err := dropSnapshotTable(ctx, dbConn, table); err != nil { if err := dropSnapshotTable(ctx, dbConn, table); err != nil {
c.Logger.Error("failed to drop daily snapshot table", "error", err, "table", table) c.Logger.Error("failed to drop daily snapshot table", "error", err, "table", table)
} else {
removedDaily++
} }
} }
} }
c.Logger.Debug("Finished snapshot cleanup") c.Logger.Info("Finished snapshot cleanup",
"removed_hourly_tables", removedHourly,
"removed_daily_tables", removedDaily,
"hourly_max_age_days", hourlyMaxDays,
"daily_max_age_months", dailyMaxMonths,
)
return nil return nil
} }
@@ -379,13 +471,33 @@ func ensureDailySummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName str
"PoweredOn" TEXT, "PoweredOn" TEXT,
"SrmPlaceholder" TEXT, "SrmPlaceholder" TEXT,
"VmUuid" TEXT, "VmUuid" TEXT,
"SamplesPresent" BIGINT NOT NULL "SamplesPresent" BIGINT NOT NULL,
"AvgVcpus" REAL,
"AvgRam" REAL,
"AvgDisk" REAL,
"AvgIsPresent" REAL,
"PoolTinPct" REAL,
"PoolBronzePct" REAL,
"PoolSilverPct" REAL,
"PoolGoldPct" REAL
);`, tableName) );`, tableName)
_, err := dbConn.ExecContext(ctx, ddl) if _, err := dbConn.ExecContext(ctx, ddl); err != nil {
return err return err
} }
return ensureSnapshotColumns(ctx, dbConn, tableName, []columnDef{
{Name: "AvgVcpus", Type: "REAL"},
{Name: "AvgRam", Type: "REAL"},
{Name: "AvgDisk", Type: "REAL"},
{Name: "AvgIsPresent", Type: "REAL"},
{Name: "PoolTinPct", Type: "REAL"},
{Name: "PoolBronzePct", Type: "REAL"},
{Name: "PoolSilverPct", Type: "REAL"},
{Name: "PoolGoldPct", Type: "REAL"},
})
}
func ensureMonthlySummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error { func ensureMonthlySummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
ddl := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s ( ddl := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
"InventoryId" BIGINT, "InventoryId" BIGINT,
@@ -410,13 +522,27 @@ func ensureMonthlySummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName s
"VmUuid" TEXT, "VmUuid" TEXT,
"AvgVcpus" REAL, "AvgVcpus" REAL,
"AvgRam" REAL, "AvgRam" REAL,
"AvgIsPresent" REAL "AvgDisk" REAL,
"AvgIsPresent" REAL,
"PoolTinPct" REAL,
"PoolBronzePct" REAL,
"PoolSilverPct" REAL,
"PoolGoldPct" REAL
);`, tableName) );`, tableName)
_, err := dbConn.ExecContext(ctx, ddl) if _, err := dbConn.ExecContext(ctx, ddl); err != nil {
return err return err
} }
return ensureSnapshotColumns(ctx, dbConn, tableName, []columnDef{
{Name: "AvgDisk", Type: "REAL"},
{Name: "PoolTinPct", Type: "REAL"},
{Name: "PoolBronzePct", Type: "REAL"},
{Name: "PoolSilverPct", Type: "REAL"},
{Name: "PoolGoldPct", Type: "REAL"},
})
}
func buildUnionQuery(tables []string, columns []string) string { func buildUnionQuery(tables []string, columns []string) string {
queries := make([]string, 0, len(tables)) queries := make([]string, 0, len(tables))
columnList := strings.Join(columns, ", ") columnList := strings.Join(columns, ", ")
@@ -453,6 +579,121 @@ func dropSnapshotTable(ctx context.Context, dbConn *sqlx.DB, table string) error
return err return err
} }
type snapshotTotals struct {
VmCount int64
VcpuTotal int64
RamTotal int64
DiskTotal float64
}
type columnDef struct {
Name string
Type string
}
func ensureSnapshotColumns(ctx context.Context, dbConn *sqlx.DB, tableName string, columns []columnDef) error {
if _, err := safeTableName(tableName); err != nil {
return err
}
for _, column := range columns {
if err := addColumnIfMissing(ctx, dbConn, tableName, column); err != nil {
return err
}
}
return nil
}
func addColumnIfMissing(ctx context.Context, dbConn *sqlx.DB, tableName string, column columnDef) error {
query := fmt.Sprintf(`ALTER TABLE %s ADD COLUMN "%s" %s`, tableName, column.Name, column.Type)
if _, err := dbConn.ExecContext(ctx, query); err != nil {
errText := strings.ToLower(err.Error())
if strings.Contains(errText, "duplicate column") || strings.Contains(errText, "already exists") {
return nil
}
return err
}
return nil
}
func snapshotTotalsForTable(ctx context.Context, dbConn *sqlx.DB, table string) (snapshotTotals, error) {
if _, err := safeTableName(table); err != nil {
return snapshotTotals{}, err
}
query := fmt.Sprintf(`
SELECT
COUNT(DISTINCT "VmId") AS vm_count,
COALESCE(SUM(CASE WHEN "InitialVcpus" IS NOT NULL THEN "InitialVcpus" ELSE 0 END), 0) AS vcpu_total,
COALESCE(SUM(CASE WHEN "InitialRam" IS NOT NULL THEN "InitialRam" ELSE 0 END), 0) AS ram_total,
COALESCE(SUM(CASE WHEN "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" ELSE 0 END), 0) AS disk_total
FROM %s
WHERE "IsPresent" = 'TRUE'
`, table)
var totals snapshotTotals
if err := dbConn.GetContext(ctx, &totals, query); err != nil {
return snapshotTotals{}, err
}
return totals, nil
}
func snapshotTotalsForUnion(ctx context.Context, dbConn *sqlx.DB, unionQuery string) (snapshotTotals, error) {
query := fmt.Sprintf(`
SELECT
COUNT(DISTINCT "VmId") AS vm_count,
COALESCE(SUM(CASE WHEN "InitialVcpus" IS NOT NULL THEN "InitialVcpus" ELSE 0 END), 0) AS vcpu_total,
COALESCE(SUM(CASE WHEN "InitialRam" IS NOT NULL THEN "InitialRam" ELSE 0 END), 0) AS ram_total,
COALESCE(SUM(CASE WHEN "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" ELSE 0 END), 0) AS disk_total
FROM (
%s
) snapshots
WHERE "IsPresent" = 'TRUE'
`, unionQuery)
var totals snapshotTotals
if err := dbConn.GetContext(ctx, &totals, query); err != nil {
return snapshotTotals{}, err
}
return totals, nil
}
func tableExists(ctx context.Context, dbConn *sqlx.DB, table string) bool {
driver := strings.ToLower(dbConn.DriverName())
switch driver {
case "sqlite":
var count int
err := dbConn.GetContext(ctx, &count, `
SELECT COUNT(1)
FROM sqlite_master
WHERE type = 'table' AND name = ?
`, table)
return err == nil && count > 0
case "pgx", "postgres":
var count int
err := dbConn.GetContext(ctx, &count, `
SELECT COUNT(1)
FROM pg_catalog.pg_tables
WHERE schemaname = 'public' AND tablename = $1
`, table)
return err == nil && count > 0
default:
return false
}
}
func nullInt64ToInt(value sql.NullInt64) int64 {
if value.Valid {
return value.Int64
}
return 0
}
func nullFloat64ToFloat(value sql.NullFloat64) float64 {
if value.Valid {
return value.Float64
}
return 0
}
func getEnvInt(key string, fallback int) int { func getEnvInt(key string, fallback int) int {
raw := strings.TrimSpace(os.Getenv(key)) raw := strings.TrimSpace(os.Getenv(key))
if raw == "" { if raw == "" {