Files
vctp2/db/helpers.go
Nathan Coad ab01c0fc4d
All checks were successful
continuous-integration/drone/push Build is passing
enhance database logging
2026-01-16 14:28:26 +11:00

1494 lines
49 KiB
Go

package db
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log/slog"
	"sort"
	"strings"
	"time"
	"vctp/db/queries"

	"github.com/jmoiron/sqlx"
)
// SnapshotTotals summarizes counts and allocations for snapshot tables.
// It is the scan target of the aggregate queries built by
// SnapshotTotalsForTable and SnapshotTotalsForUnion; the db tags match the
// column aliases those queries produce.
type SnapshotTotals struct {
// VmCount is the number of distinct "VmId" values (vm_count).
VmCount int64 `db:"vm_count"`
// VcpuTotal is the sum of "VcpuCount" with NULLs counted as 0 (vcpu_total).
VcpuTotal int64 `db:"vcpu_total"`
// RamTotal is the sum of "RamGB" with NULLs counted as 0 (ram_total).
RamTotal int64 `db:"ram_total"`
// DiskTotal is the sum of "ProvisionedDisk" with NULLs counted as 0 (disk_total).
DiskTotal float64 `db:"disk_total"`
}
// ColumnDef describes a column to be added by AddColumnIfMissing / EnsureColumns.
type ColumnDef struct {
// Name is the column name; it is wrapped in double quotes in the generated ALTER TABLE.
Name string
// Type is the SQL type clause; it is interpolated verbatim after the column name.
Type string
}
// TableRowCount reports how many rows the named table holds.
//
// The table name is validated before it is interpolated into the COUNT(*)
// statement. A validation failure or a query failure yields (0, err).
func TableRowCount(ctx context.Context, dbConn *sqlx.DB, table string) (int64, error) {
	err := ValidateTableName(table)
	if err != nil {
		return 0, err
	}

	var total int64
	err = getLog(ctx, dbConn, &total, fmt.Sprintf(`SELECT COUNT(*) FROM %s`, table))
	if err != nil {
		return 0, err
	}
	return total, nil
}
// EnsureColumns makes sure every column in columns exists on tableName,
// adding the missing ones in slice order via AddColumnIfMissing.
//
// The table name is validated up front; processing stops at the first column
// that fails and that error is returned.
func EnsureColumns(ctx context.Context, dbConn *sqlx.DB, tableName string, columns []ColumnDef) error {
	_, err := SafeTableName(tableName)
	if err != nil {
		return err
	}
	for i := range columns {
		if addErr := AddColumnIfMissing(ctx, dbConn, tableName, columns[i]); addErr != nil {
			return addErr
		}
	}
	return nil
}
// execLog runs a statement through dbConn.ExecContext and emits a warning
// (with the whitespace-trimmed query text) when it fails. The result and
// error from ExecContext are handed back to the caller unchanged.
func execLog(ctx context.Context, dbConn *sqlx.DB, query string, args ...interface{}) (sql.Result, error) {
	result, execErr := dbConn.ExecContext(ctx, query, args...)
	if execErr == nil {
		return result, nil
	}
	slog.Warn("db exec failed", "query", strings.TrimSpace(query), "error", execErr)
	return result, execErr
}
// getLog runs a single-row query through dbConn.GetContext, scanning into
// dest, and logs failures with the whitespace-trimmed query text.
//
// sql.ErrNoRows is an expected outcome for callers in this file (for example
// TableHasRows and UpsertVmIdentity), so it is logged at debug level instead
// of as a warning. Every other error is logged as a warning. The error is
// always returned unchanged so callers can still test for sql.ErrNoRows.
func getLog(ctx context.Context, dbConn *sqlx.DB, dest interface{}, query string, args ...interface{}) error {
	err := dbConn.GetContext(ctx, dest, query, args...)
	switch {
	case err == nil:
		return nil
	case errors.Is(err, sql.ErrNoRows):
		// "Not found" is normal control flow, not a failure worth a warning.
		slog.Debug("db get returned no rows", "query", strings.TrimSpace(query))
	default:
		slog.Warn("db get failed", "query", strings.TrimSpace(query), "error", err)
	}
	return err
}
// selectLog runs a multi-row query through dbConn.SelectContext, filling
// dest, and emits a warning (with the whitespace-trimmed query text) when it
// fails. The error from SelectContext is returned unchanged.
func selectLog(ctx context.Context, dbConn *sqlx.DB, dest interface{}, query string, args ...interface{}) error {
	selectErr := dbConn.SelectContext(ctx, dest, query, args...)
	if selectErr == nil {
		return nil
	}
	slog.Warn("db select failed", "query", strings.TrimSpace(query), "error", selectErr)
	return selectErr
}
// AddColumnIfMissing issues an ALTER TABLE ... ADD COLUMN for the given
// column and treats "the column is already there" as success.
//
// The table name is validated first. An ALTER failure whose lower-cased
// message contains "duplicate column" or "already exists" is swallowed; any
// other failure is returned.
func AddColumnIfMissing(ctx context.Context, dbConn *sqlx.DB, tableName string, column ColumnDef) error {
	if _, err := SafeTableName(tableName); err != nil {
		return err
	}

	stmt := fmt.Sprintf(`ALTER TABLE %s ADD COLUMN "%s" %s`, tableName, column.Name, column.Type)
	_, err := execLog(ctx, dbConn, stmt)
	if err == nil {
		return nil
	}

	// The drivers report an existing column with different wording.
	msg := strings.ToLower(err.Error())
	for _, marker := range []string{"duplicate column", "already exists"} {
		if strings.Contains(msg, marker) {
			return nil
		}
	}
	return err
}
// ValidateTableName checks that name can be interpolated into SQL as a bare
// identifier. A valid name is non-empty and made up solely of the characters
// a-z, 0-9 and underscore; anything else produces an error.
func ValidateTableName(name string) error {
	if len(name) == 0 {
		return fmt.Errorf("table name is empty")
	}
	for _, ch := range name {
		switch {
		case ch >= 'a' && ch <= 'z', ch >= '0' && ch <= '9', ch == '_':
			// allowed character
		default:
			return fmt.Errorf("invalid table name: %s", name)
		}
	}
	return nil
}
// SafeTableName runs ValidateTableName on name and echoes the name back when
// it is acceptable. On a validation failure it returns the empty string and
// the validation error.
func SafeTableName(name string) (string, error) {
	err := ValidateTableName(name)
	if err == nil {
		return name, nil
	}
	return "", err
}
// TableHasRows returns true when a table contains at least one row.
//
// The table name is validated before being interpolated into the probe
// query. An empty table surfaces as sql.ErrNoRows from the single-row fetch
// and is reported as (false, nil); any other failure is returned as an error.
func TableHasRows(ctx context.Context, dbConn *sqlx.DB, table string) (bool, error) {
	if err := ValidateTableName(table); err != nil {
		return false, err
	}
	query := fmt.Sprintf(`SELECT 1 FROM %s LIMIT 1`, table)
	var exists int
	if err := getLog(ctx, dbConn, &exists, query); err != nil {
		// errors.Is keeps working even if the sentinel is ever wrapped.
		if errors.Is(err, sql.ErrNoRows) {
			return false, nil
		}
		return false, err
	}
	return true, nil
}
// TableExists reports whether the named table is present.
//
// For the "sqlite" driver the lookup goes through the generated
// SqliteTableExists query; for "pgx"/"postgres" it counts matching entries in
// pg_catalog.pg_tables within the public schema. Any lookup error, and any
// other driver, yields false.
func TableExists(ctx context.Context, dbConn *sqlx.DB, table string) bool {
	switch strings.ToLower(dbConn.DriverName()) {
	case "sqlite":
		name := sql.NullString{String: table, Valid: table != ""}
		found, err := queries.New(dbConn).SqliteTableExists(ctx, name)
		return err == nil && found > 0
	case "pgx", "postgres":
		var found int
		if err := getLog(ctx, dbConn, &found, `
SELECT COUNT(1)
FROM pg_catalog.pg_tables
WHERE schemaname = 'public' AND tablename = $1
`, table); err != nil {
			return false
		}
		return found > 0
	}
	return false
}
// ColumnExists checks if a column exists in a table. The column name is
// matched case-insensitively on every supported driver.
//
// SQLite: the table name is validated and PRAGMA table_info is walked,
// comparing names with strings.EqualFold.
//
// Postgres: information_schema.columns is queried. Columns created with
// quoted mixed-case names (such as "VmId") are stored with their case
// preserved, so both sides of the comparison are lower-cased; comparing a
// lower-cased argument against the raw column_name would never match them.
//
// Any other driver returns an error.
func ColumnExists(ctx context.Context, dbConn *sqlx.DB, tableName string, columnName string) (bool, error) {
	driver := strings.ToLower(dbConn.DriverName())
	switch driver {
	case "sqlite":
		if _, err := SafeTableName(tableName); err != nil {
			return false, err
		}
		query := fmt.Sprintf(`PRAGMA table_info("%s")`, tableName)
		rows, err := dbConn.QueryxContext(ctx, query)
		if err != nil {
			return false, err
		}
		defer rows.Close()
		for rows.Next() {
			// One variable per column of the table_info result set.
			var (
				cid        int
				name       string
				colType    string
				notNull    int
				defaultVal sql.NullString
				pk         int
			)
			if err := rows.Scan(&cid, &name, &colType, &notNull, &defaultVal, &pk); err != nil {
				return false, err
			}
			if strings.EqualFold(name, columnName) {
				return true, nil
			}
		}
		return false, rows.Err()
	case "pgx", "postgres":
		var count int
		err := getLog(ctx, dbConn, &count, `
SELECT COUNT(1)
FROM information_schema.columns
WHERE table_name = $1 AND LOWER(column_name) = $2
`, tableName, strings.ToLower(columnName))
		if err != nil {
			return false, err
		}
		return count > 0, nil
	default:
		return false, fmt.Errorf("unsupported driver for column lookup: %s", driver)
	}
}
// SnapshotTotalsForTable aggregates one snapshot table into a SnapshotTotals:
// the distinct "VmId" count plus the sums of "VcpuCount", "RamGB" and
// "ProvisionedDisk" (NULLs counted as zero).
//
// The table name is validated before interpolation. On any error the zero
// SnapshotTotals is returned together with the error.
func SnapshotTotalsForTable(ctx context.Context, dbConn *sqlx.DB, table string) (SnapshotTotals, error) {
	_, nameErr := SafeTableName(table)
	if nameErr != nil {
		return SnapshotTotals{}, nameErr
	}

	const template = `
SELECT
COUNT(DISTINCT "VmId") AS vm_count,
COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) AS vcpu_total,
COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) AS ram_total,
COALESCE(SUM(CASE WHEN "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" ELSE 0 END), 0) AS disk_total
FROM %s
`
	result := SnapshotTotals{}
	queryErr := getLog(ctx, dbConn, &result, fmt.Sprintf(template, table))
	if queryErr != nil {
		return SnapshotTotals{}, queryErr
	}
	return result, nil
}
// SnapshotTotalsForUnion computes the same totals as SnapshotTotalsForTable,
// but over the rows produced by unionQuery, which is embedded verbatim as a
// sub-select aliased "snapshots".
//
// unionQuery is not validated here; callers are responsible for its content.
// On a query error the zero SnapshotTotals is returned with the error.
func SnapshotTotalsForUnion(ctx context.Context, dbConn *sqlx.DB, unionQuery string) (SnapshotTotals, error) {
	const template = `
SELECT
COUNT(DISTINCT "VmId") AS vm_count,
COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) AS vcpu_total,
COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) AS ram_total,
COALESCE(SUM(CASE WHEN "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" ELSE 0 END), 0) AS disk_total
FROM (
%s
) snapshots
`
	result := SnapshotTotals{}
	queryErr := getLog(ctx, dbConn, &result, fmt.Sprintf(template, unionQuery))
	if queryErr != nil {
		return SnapshotTotals{}, queryErr
	}
	return result, nil
}
// EnsureSnapshotTable issues CREATE TABLE IF NOT EXISTS for a snapshot table
// with the standard column set and then creates its indexes through
// EnsureSnapshotIndexes.
//
// The table name is validated before interpolation. The schema is the same
// for every driver except the "RowId" key, which is BIGSERIAL on
// pgx/postgres and INTEGER ... AUTOINCREMENT otherwise.
func EnsureSnapshotTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
	if _, err := SafeTableName(tableName); err != nil {
		return err
	}

	// Only the auto-incrementing key definition is dialect specific.
	rowIDType := "INTEGER PRIMARY KEY AUTOINCREMENT"
	switch strings.ToLower(dbConn.DriverName()) {
	case "pgx", "postgres":
		rowIDType = "BIGSERIAL PRIMARY KEY"
	}

	ddl := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
"RowId" %s,
"InventoryId" BIGINT,
"Name" TEXT NOT NULL,
"Vcenter" TEXT NOT NULL,
"VmId" TEXT,
"EventKey" TEXT,
"CloudId" TEXT,
"CreationTime" BIGINT,
"DeletionTime" BIGINT,
"ResourcePool" TEXT,
"Datacenter" TEXT,
"Cluster" TEXT,
"Folder" TEXT,
"ProvisionedDisk" REAL,
"VcpuCount" BIGINT,
"RamGB" BIGINT,
"IsTemplate" TEXT,
"PoweredOn" TEXT,
"SrmPlaceholder" TEXT,
"VmUuid" TEXT,
"SnapshotTime" BIGINT NOT NULL
);`, tableName, rowIDType)

	if _, err := execLog(ctx, dbConn, ddl); err != nil {
		return err
	}
	return EnsureSnapshotIndexes(ctx, dbConn, tableName)
}
// EnsureSnapshotIndexes issues CREATE INDEX IF NOT EXISTS for the standard
// snapshot-table indexes, each named "<table>_<suffix>".
//
// Three indexes are created on every driver; three more are added only for
// pgx/postgres. The table name is validated first, and the first failing
// statement aborts the run and is returned.
func EnsureSnapshotIndexes(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
	if _, err := SafeTableName(tableName); err != nil {
		return err
	}

	type indexSpec struct {
		suffix  string // appended to "<table>_" to form the index name
		columns string // quoted, comma-separated column list
	}
	specs := []indexSpec{
		{suffix: "vm_vcenter_idx", columns: `"VmId","Vcenter"`},
		{suffix: "snapshottime_idx", columns: `"SnapshotTime"`},
		{suffix: "resourcepool_idx", columns: `"ResourcePool"`},
	}

	// PG-specific helpful indexes; kept gated to reduce file bloat on SQLite.
	switch strings.ToLower(dbConn.DriverName()) {
	case "pgx", "postgres":
		specs = append(specs,
			indexSpec{suffix: "vcenter_snapshottime_idx", columns: `"Vcenter","SnapshotTime"`},
			indexSpec{suffix: "name_vcenter_idx", columns: `"Name","Vcenter"`},
			indexSpec{suffix: "vmuuid_vcenter_idx", columns: `"VmUuid","Vcenter"`},
		)
	}

	for _, spec := range specs {
		stmt := fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_%s ON %s (%s)`, tableName, spec.suffix, tableName, spec.columns)
		if _, err := execLog(ctx, dbConn, stmt); err != nil {
			return err
		}
	}
	return nil
}
// BackfillSerialColumn sets missing values in a serial-like column for Postgres tables.
//
// Every row whose column is NULL gets the next value of the sequence that
// pg_get_serial_sequence reports for (tableName, columnName).
//
// The table name must pass ValidateTableName. The column name is interpolated
// both as a double-quoted identifier and as a single-quoted string literal,
// so it must be non-empty and must not contain either quote character.
//
// Errors whose lower-cased message mentions "pg_get_serial_sequence" or
// "sequence" are treated as "nothing to backfill" and swallowed; all other
// errors are returned.
func BackfillSerialColumn(ctx context.Context, dbConn *sqlx.DB, tableName, columnName string) error {
	if err := ValidateTableName(tableName); err != nil {
		return err
	}
	if columnName == "" {
		return fmt.Errorf("column name is empty")
	}
	// A quote character would terminate the identifier or the literal below.
	if strings.Contains(columnName, `"`) || strings.Contains(columnName, `'`) {
		return fmt.Errorf("invalid column name: %s", columnName)
	}
	query := fmt.Sprintf(
		`UPDATE %s SET "%s" = nextval(pg_get_serial_sequence('%s','%s')) WHERE "%s" IS NULL`,
		tableName, columnName, tableName, columnName, columnName,
	)
	_, err := execLog(ctx, dbConn, query)
	if err != nil {
		errText := strings.ToLower(err.Error())
		if strings.Contains(errText, "pg_get_serial_sequence") || strings.Contains(errText, "sequence") {
			// Best-effort: a missing or unusable sequence is not a failure.
			return nil
		}
		return err
	}
	return nil
}
// ApplySQLiteTuning runs a fixed list of PRAGMA statements (WAL journal,
// NORMAL synchronous, in-memory temp store, optimize) when the driver is
// "sqlite"; for any other driver it does nothing.
//
// The pragmas are best-effort: failures are not returned. When the context
// carries a *slog.Logger under the key "logger", each pragma and its outcome
// is written to it at debug level.
func ApplySQLiteTuning(ctx context.Context, dbConn *sqlx.DB) {
	if strings.ToLower(dbConn.DriverName()) != "sqlite" {
		return
	}

	// A failed assertion leaves logger nil, which disables the debug output.
	logger, _ := ctx.Value("logger").(*slog.Logger)

	for _, pragma := range []string{
		`PRAGMA journal_mode=WAL;`,
		`PRAGMA synchronous=NORMAL;`,
		`PRAGMA temp_store=MEMORY;`,
		`PRAGMA optimize;`,
	} {
		// Errors are deliberately not propagated, only logged.
		_, err := execLog(ctx, dbConn, pragma)
		if logger != nil {
			logger.Debug("Applied SQLite tuning pragma", "pragma", pragma, "error", err)
		}
	}
}
// EnsureVmIdentityTables issues CREATE ... IF NOT EXISTS for the vm_identity
// table, the vm_renames audit table and their indexes.
//
// The two tables are identical across drivers apart from the vm_renames
// "RowId" key (BIGSERIAL on pgx/postgres, INTEGER ... AUTOINCREMENT
// otherwise). Statements run in order: identity table, renames table, then
// the indexes; the first failure is returned.
func EnsureVmIdentityTables(ctx context.Context, dbConn *sqlx.DB) error {
	rowIDType := "INTEGER PRIMARY KEY AUTOINCREMENT"
	switch strings.ToLower(dbConn.DriverName()) {
	case "pgx", "postgres":
		rowIDType = "BIGSERIAL PRIMARY KEY"
	}

	statements := []string{
		`
CREATE TABLE IF NOT EXISTS vm_identity (
"VmId" TEXT NOT NULL,
"VmUuid" TEXT NOT NULL,
"Vcenter" TEXT NOT NULL,
"Name" TEXT NOT NULL,
"Cluster" TEXT,
"FirstSeen" BIGINT NOT NULL,
"LastSeen" BIGINT NOT NULL,
PRIMARY KEY ("VmId","VmUuid","Vcenter")
)`,
		`
CREATE TABLE IF NOT EXISTS vm_renames (
"RowId" ` + rowIDType + `,
"VmId" TEXT NOT NULL,
"VmUuid" TEXT NOT NULL,
"Vcenter" TEXT NOT NULL,
"OldName" TEXT,
"NewName" TEXT,
"OldCluster" TEXT,
"NewCluster" TEXT,
"SnapshotTime" BIGINT NOT NULL
)`,
		`CREATE INDEX IF NOT EXISTS vm_identity_vcenter_idx ON vm_identity ("Vcenter")`,
		`CREATE INDEX IF NOT EXISTS vm_identity_uuid_idx ON vm_identity ("VmUuid","Vcenter")`,
		`CREATE INDEX IF NOT EXISTS vm_identity_name_idx ON vm_identity ("Name","Vcenter")`,
		`CREATE INDEX IF NOT EXISTS vm_renames_vcenter_idx ON vm_renames ("Vcenter","SnapshotTime")`,
	}

	for _, stmt := range statements {
		if _, err := execLog(ctx, dbConn, stmt); err != nil {
			return err
		}
	}
	return nil
}
// UpsertVmIdentity updates/creates the identity record and records rename events.
//
// The record is keyed by (vcenter, trimmed vmId, trimmed vmUuid). When any of
// the three is blank the call is a no-op and returns nil. The identity tables
// are created on demand via EnsureVmIdentityTables.
//
//   - No existing record: a row is inserted with FirstSeen and LastSeen both
//     set to snapshotTime (Unix seconds).
//   - Existing record: when the name or the trimmed cluster differs
//     (case-insensitively) a vm_renames row is written, then Name, Cluster
//     and LastSeen are updated.
//
// "No existing record" is detected with errors.Is(err, sql.ErrNoRows) rather
// than by inspecting the error text, so unrelated errors that merely mention
// "no rows" are no longer mistaken for a missing record.
func UpsertVmIdentity(ctx context.Context, dbConn *sqlx.DB, vcenter string, vmId, vmUuid sql.NullString, name string, cluster sql.NullString, snapshotTime time.Time) error {
	keyVmID := strings.TrimSpace(vmId.String)
	keyUuid := strings.TrimSpace(vmUuid.String)
	if keyVmID == "" || keyUuid == "" || strings.TrimSpace(vcenter) == "" {
		return nil
	}
	if err := EnsureVmIdentityTables(ctx, dbConn); err != nil {
		return err
	}
	type identityRow struct {
		Name      string         `db:"Name"`
		Cluster   sql.NullString `db:"Cluster"`
		FirstSeen sql.NullInt64  `db:"FirstSeen"`
		LastSeen  sql.NullInt64  `db:"LastSeen"`
	}
	var existing identityRow
	err := getLog(ctx, dbConn, &existing, `
SELECT "Name","Cluster","FirstSeen","LastSeen"
FROM vm_identity
WHERE "Vcenter" = $1 AND "VmId" = $2 AND "VmUuid" = $3
`, vcenter, keyVmID, keyUuid)
	if errors.Is(err, sql.ErrNoRows) {
		// First sighting: FirstSeen and LastSeen share the same timestamp ($6).
		_, err = execLog(ctx, dbConn, `
INSERT INTO vm_identity ("VmId","VmUuid","Vcenter","Name","Cluster","FirstSeen","LastSeen")
VALUES ($1,$2,$3,$4,$5,$6,$6)
`, keyVmID, keyUuid, vcenter, name, nullString(cluster), snapshotTime.Unix())
		return err
	}
	if err != nil {
		return err
	}
	renamed := !strings.EqualFold(existing.Name, name) || !strings.EqualFold(strings.TrimSpace(existing.Cluster.String), strings.TrimSpace(cluster.String))
	if renamed {
		// The audit row is best-effort: execLog already logs a failure, and a
		// missing audit entry must not block the identity update below.
		_, _ = execLog(ctx, dbConn, `
INSERT INTO vm_renames ("VmId","VmUuid","Vcenter","OldName","NewName","OldCluster","NewCluster","SnapshotTime")
VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
`, keyVmID, keyUuid, vcenter, existing.Name, name, existing.Cluster.String, cluster.String, snapshotTime.Unix())
	}
	_, err = execLog(ctx, dbConn, `
UPDATE vm_identity
SET "Name" = $1, "Cluster" = $2, "LastSeen" = $3
WHERE "Vcenter" = $4 AND "VmId" = $5 AND "VmUuid" = $6
`, name, nullString(cluster), snapshotTime.Unix(), vcenter, keyVmID, keyUuid)
	return err
}
// nullString converts a sql.NullString into a value suitable as a query
// argument: the contained string when Valid is set, otherwise an untyped nil
// (which the driver sends as SQL NULL).
func nullString(val sql.NullString) interface{} {
	if !val.Valid {
		return nil
	}
	return val.String
}
// EnsureVcenterTotalsTable issues CREATE ... IF NOT EXISTS for the
// vcenter_totals table and its ("Vcenter","SnapshotTime" DESC) index.
//
// The schema is the same for every driver apart from the "RowId" key
// (BIGSERIAL on pgx/postgres, INTEGER ... AUTOINCREMENT otherwise). The first
// failing statement is returned.
func EnsureVcenterTotalsTable(ctx context.Context, dbConn *sqlx.DB) error {
	rowIDType := "INTEGER PRIMARY KEY AUTOINCREMENT"
	switch strings.ToLower(dbConn.DriverName()) {
	case "pgx", "postgres":
		rowIDType = "BIGSERIAL PRIMARY KEY"
	}

	statements := [...]string{
		`
CREATE TABLE IF NOT EXISTS vcenter_totals (
"RowId" ` + rowIDType + `,
"Vcenter" TEXT NOT NULL,
"SnapshotTime" BIGINT NOT NULL,
"VmCount" BIGINT NOT NULL,
"VcpuTotal" BIGINT NOT NULL,
"RamTotalGB" BIGINT NOT NULL
);`,
		`CREATE INDEX IF NOT EXISTS vcenter_totals_vc_time_idx ON vcenter_totals ("Vcenter","SnapshotTime" DESC)`,
	}

	for _, stmt := range statements {
		if _, err := execLog(ctx, dbConn, stmt); err != nil {
			return err
		}
	}
	return nil
}
// InsertVcenterTotals appends one vcenter_totals row holding the VM count,
// vCPU total and RAM total for vcenter at snapshotTime (stored as Unix
// seconds).
//
// A blank vcenter is rejected with an error. The table is created on demand
// via EnsureVcenterTotalsTable before the insert runs.
func InsertVcenterTotals(ctx context.Context, dbConn *sqlx.DB, vcenter string, snapshotTime time.Time, vmCount, vcpuTotal, ramTotal int64) error {
	if len(strings.TrimSpace(vcenter)) == 0 {
		return fmt.Errorf("vcenter is empty")
	}
	if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
		return err
	}

	const stmt = `
INSERT INTO vcenter_totals ("Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB")
VALUES ($1,$2,$3,$4,$5)
`
	_, insertErr := execLog(ctx, dbConn, stmt, vcenter, snapshotTime.Unix(), vmCount, vcpuTotal, ramTotal)
	return insertErr
}
// ListVcenters returns the distinct "Vcenter" values stored in
// vcenter_totals, in ascending order. The table is created on demand first.
//
// The result is a nil slice when the table holds no rows.
func ListVcenters(ctx context.Context, dbConn *sqlx.DB) ([]string, error) {
	if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
		return nil, err
	}

	const query = `SELECT DISTINCT "Vcenter" FROM vcenter_totals ORDER BY "Vcenter"`
	cursor, err := dbConn.QueryxContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer cursor.Close()

	var vcenters []string
	for cursor.Next() {
		var name string
		if scanErr := cursor.Scan(&name); scanErr != nil {
			return nil, scanErr
		}
		vcenters = append(vcenters, name)
	}
	// Surface any error that ended the iteration early.
	return vcenters, cursor.Err()
}
// VcenterTotalRow holds per-snapshot totals for a vcenter.
// The db tags match the column names of the vcenter_totals table.
type VcenterTotalRow struct {
// SnapshotTime is the snapshot timestamp; InsertVcenterTotals stores it as Unix seconds.
SnapshotTime int64 `db:"SnapshotTime"`
// Vcenter identifies the vCenter the totals belong to.
Vcenter string `db:"Vcenter"`
// VmCount is the number of VMs recorded for the snapshot.
VmCount int64 `db:"VmCount"`
// VcpuTotal is the total vCPU count recorded for the snapshot.
VcpuTotal int64 `db:"VcpuTotal"`
// RamTotalGB is the total RAM recorded for the snapshot.
RamTotalGB int64 `db:"RamTotalGB"`
}
// ListVcenterTotals returns the vcenter_totals rows for one vcenter, newest
// "SnapshotTime" first, capped at limit rows. A limit of zero or less falls
// back to 200. The table is created on demand first.
//
// On success the returned slice is never nil, even when no rows match.
func ListVcenterTotals(ctx context.Context, dbConn *sqlx.DB, vcenter string, limit int) ([]VcenterTotalRow, error) {
	if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
		return nil, err
	}
	if limit < 1 {
		limit = 200
	}

	const query = `
SELECT "Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB"
FROM vcenter_totals
WHERE "Vcenter" = $1
ORDER BY "SnapshotTime" DESC
LIMIT $2`

	totals := make([]VcenterTotalRow, 0, limit)
	err := selectLog(ctx, dbConn, &totals, query, vcenter, limit)
	if err != nil {
		return nil, err
	}
	return totals, nil
}
// ListVcenterTotalsByType returns totals for a vcenter for the requested
// snapshot type. The type is lower-cased and an empty value means "hourly".
//
// "hourly" is delegated to ListVcenterTotals. Any other type reads up to
// limit (default 200) newest entries of that type from snapshot_registry and
// aggregates each referenced table with aggregateSummaryTotals. Registry
// entries whose table name is invalid, or whose aggregation fails, are
// skipped without failing the call.
func ListVcenterTotalsByType(ctx context.Context, dbConn *sqlx.DB, vcenter string, snapshotType string, limit int) ([]VcenterTotalRow, error) {
	snapshotType = strings.ToLower(snapshotType)
	switch snapshotType {
	case "":
		snapshotType = "hourly"
		fallthrough
	case "hourly":
		return ListVcenterTotals(ctx, dbConn, vcenter, limit)
	}
	if limit < 1 {
		limit = 200
	}

	registryQuery := `
SELECT table_name, snapshot_time
FROM snapshot_registry
WHERE snapshot_type = $1
ORDER BY snapshot_time DESC
LIMIT $2
`
	if strings.ToLower(dbConn.DriverName()) == "sqlite" {
		// SQLite gets positional "?" placeholders instead of $N.
		registryQuery = strings.NewReplacer("$1", "?", "$2", "?").Replace(registryQuery)
	}

	var entries []struct {
		TableName    string `db:"table_name"`
		SnapshotTime int64  `db:"snapshot_time"`
	}
	if err := selectLog(ctx, dbConn, &entries, registryQuery, snapshotType, limit); err != nil {
		return nil, err
	}

	totals := make([]VcenterTotalRow, 0, len(entries))
	for _, entry := range entries {
		if ValidateTableName(entry.TableName) != nil {
			continue
		}
		agg, aggErr := aggregateSummaryTotals(ctx, dbConn, entry.TableName, vcenter)
		if aggErr != nil {
			continue
		}
		row := VcenterTotalRow{
			SnapshotTime: entry.SnapshotTime,
			Vcenter:      vcenter,
			VmCount:      agg.VmCount,
			VcpuTotal:    agg.VcpuTotal,
			RamTotalGB:   agg.RamTotalGB,
		}
		totals = append(totals, row)
	}
	return totals, nil
}
// summaryAgg is the scan target for the per-vcenter aggregate computed by
// aggregateSummaryTotals; the db tags match that query's column aliases.
type summaryAgg struct {
// VmCount is the number of matching rows in the summary table (vm_count).
VmCount int64 `db:"vm_count"`
// VcpuTotal is the summed vCPU value (vcpu_total).
VcpuTotal int64 `db:"vcpu_total"`
// RamTotalGB is the summed RAM value (ram_total).
RamTotalGB int64 `db:"ram_total"`
}
// aggregateSummaryTotals computes totals for a single summary table (daily/monthly) for a given vcenter.
//
// vm_count is the number of rows for the vcenter. vcpu_total and ram_total
// sum the averaged columns ("AvgVcpuCount", "AvgRamGB"), falling back to the
// raw "VcpuCount" / "RamGB" when the average is NULL.
//
// The averaged columns are fractional, so the sums are rounded and cast to
// BIGINT in SQL. Without that, a non-integral sum cannot be scanned into the
// int64 fields of summaryAgg and the whole table would be reported as an
// error.
//
// The table name is validated before interpolation; on any error the zero
// summaryAgg is returned with the error.
func aggregateSummaryTotals(ctx context.Context, dbConn *sqlx.DB, tableName string, vcenter string) (summaryAgg, error) {
	if _, err := SafeTableName(tableName); err != nil {
		return summaryAgg{}, err
	}
	driver := strings.ToLower(dbConn.DriverName())
	query := fmt.Sprintf(`
SELECT
COUNT(1) AS vm_count,
CAST(ROUND(COALESCE(SUM(COALESCE("AvgVcpuCount","VcpuCount")),0)) AS BIGINT) AS vcpu_total,
CAST(ROUND(COALESCE(SUM(COALESCE("AvgRamGB","RamGB")),0)) AS BIGINT) AS ram_total
FROM %s
WHERE "Vcenter" = $1
`, tableName)
	if driver == "sqlite" {
		// SQLite gets a positional "?" placeholder instead of $1.
		query = strings.ReplaceAll(query, "$1", "?")
	}
	var agg summaryAgg
	if err := getLog(ctx, dbConn, &agg, query, vcenter); err != nil {
		return summaryAgg{}, err
	}
	return agg, nil
}
// VmTraceRow holds snapshot data for a single VM across tables.
// It is the scan target of the per-table query in FetchVmTrace; the db tags
// match the column names/aliases that query selects.
type VmTraceRow struct {
// SnapshotTime is the registry snapshot_time of the table the row came from.
SnapshotTime int64 `db:"SnapshotTime"`
// Name, Vcenter, VmId and VmUuid identify the VM as recorded in the snapshot.
Name string `db:"Name"`
Vcenter string `db:"Vcenter"`
VmId string `db:"VmId"`
VmUuid string `db:"VmUuid"`
// ResourcePool, VcpuCount, RamGB and ProvisionedDisk are the values stored in the snapshot row.
ResourcePool string `db:"ResourcePool"`
VcpuCount int64 `db:"VcpuCount"`
RamGB int64 `db:"RamGB"`
ProvisionedDisk float64 `db:"ProvisionedDisk"`
// CreationTime and DeletionTime are selected through COALESCE(..., 0) by FetchVmTrace.
CreationTime sql.NullInt64 `db:"CreationTime"`
DeletionTime sql.NullInt64 `db:"DeletionTime"`
}
// FetchVmTrace returns combined hourly snapshot records for a VM (by id/uuid/name) ordered by snapshot time.
// To avoid SQLite's UNION term limits, this iterates tables one by one and merges in-memory.
//
// A row matches when its "VmId" equals vmID, its "VmUuid" equals vmUUID, or
// its "Name" equals name case-insensitively. Each row's SnapshotTime comes
// from the registry entry of the table it was read from.
//
// The snapshot schema allows NULL in "VmId", "VmUuid", "ResourcePool",
// "VcpuCount", "RamGB" and "ProvisionedDisk", while VmTraceRow uses plain
// string/number fields. Those columns are therefore read through COALESCE so
// that one NULL value does not make the whole table's query fail to scan.
//
// Tables with an invalid name or a failing query are logged and skipped.
// The result is nil when the registry lists no hourly tables.
func FetchVmTrace(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) ([]VmTraceRow, error) {
	var tables []struct {
		TableName    string `db:"table_name"`
		SnapshotTime int64  `db:"snapshot_time"`
	}
	if err := selectLog(ctx, dbConn, &tables, `
SELECT table_name, snapshot_time
FROM snapshot_registry
WHERE snapshot_type = 'hourly'
ORDER BY snapshot_time
`); err != nil {
		return nil, err
	}
	if len(tables) == 0 {
		return nil, nil
	}

	// SQLite uses positional "?" placeholders; postgres/pgx use $N.
	placeholders := [3]string{"?", "?", "?"}
	if strings.ToLower(dbConn.DriverName()) != "sqlite" {
		placeholders = [3]string{"$1", "$2", "$3"}
	}

	rows := make([]VmTraceRow, 0, len(tables))
	slog.Debug("vm trace scanning tables", "table_count", len(tables), "vm_id", vmID, "vm_uuid", vmUUID, "name", name)
	for _, t := range tables {
		if err := ValidateTableName(t.TableName); err != nil {
			slog.Warn("vm trace skipping table (invalid name)", "table", t.TableName, "error", err)
			continue
		}
		query := fmt.Sprintf(`
SELECT %d AS "SnapshotTime",
"Name","Vcenter",
COALESCE("VmId",'') AS "VmId",
COALESCE("VmUuid",'') AS "VmUuid",
COALESCE("ResourcePool",'') AS "ResourcePool",
COALESCE("VcpuCount",0) AS "VcpuCount",
COALESCE("RamGB",0) AS "RamGB",
COALESCE("ProvisionedDisk",0) AS "ProvisionedDisk",
COALESCE("CreationTime",0) AS "CreationTime",
COALESCE("DeletionTime",0) AS "DeletionTime"
FROM %s
WHERE ("VmId" = %s OR "VmUuid" = %s OR lower("Name") = lower(%s))
`, t.SnapshotTime, t.TableName, placeholders[0], placeholders[1], placeholders[2])

		var tmp []VmTraceRow
		if err := selectLog(ctx, dbConn, &tmp, query, vmID, vmUUID, name); err != nil {
			slog.Warn("vm trace query failed for table", "table", t.TableName, "error", err)
			continue
		}
		slog.Debug("vm trace table rows", "table", t.TableName, "snapshot_time", t.SnapshotTime, "rows", len(tmp))
		rows = append(rows, tmp...)
	}
	sort.Slice(rows, func(i, j int) bool {
		return rows[i].SnapshotTime < rows[j].SnapshotTime
	})
	slog.Info("vm trace combined rows", "total_rows", len(rows))
	return rows, nil
}
// SyncVcenterTotalsFromSnapshots walks every hourly table listed in
// snapshot_registry, aggregates it per "Vcenter", and inserts a
// vcenter_totals row for each (vcenter, snapshot time) pair that is not
// already present.
//
// Only failures to create vcenter_totals or to read the registry are
// returned. Tables with an invalid name or a failing aggregate query are
// skipped, and a failed insert is logged and otherwise ignored.
func SyncVcenterTotalsFromSnapshots(ctx context.Context, dbConn *sqlx.DB) error {
	if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
		return err
	}

	// Insert-if-missing statement; identical for every table and vcenter.
	insertStmt := `
INSERT INTO vcenter_totals ("Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB")
SELECT $1,$2,$3,$4,$5
WHERE NOT EXISTS (
SELECT 1 FROM vcenter_totals WHERE "Vcenter" = $1 AND "SnapshotTime" = $2
)
`
	if strings.ToLower(dbConn.DriverName()) == "sqlite" {
		// Turns $N into SQLite's numbered ?N form.
		insertStmt = strings.ReplaceAll(insertStmt, "$", "?")
	}

	type registryEntry struct {
		TableName    string `db:"table_name"`
		SnapshotTime int64  `db:"snapshot_time"`
	}
	type vcenterAgg struct {
		Vcenter   string `db:"vcenter"`
		VmCount   int64  `db:"vm_count"`
		VcpuTotal int64  `db:"vcpu_total"`
		RamTotal  int64  `db:"ram_total"`
	}

	var entries []registryEntry
	err := selectLog(ctx, dbConn, &entries, `
SELECT table_name, snapshot_time
FROM snapshot_registry
WHERE snapshot_type = 'hourly'
ORDER BY snapshot_time
`)
	if err != nil {
		return err
	}

	for _, entry := range entries {
		if ValidateTableName(entry.TableName) != nil {
			continue
		}
		// Aggregate per vcenter from the snapshot table.
		aggQuery := fmt.Sprintf(`
SELECT "Vcenter" AS vcenter,
COUNT(1) AS vm_count,
COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) AS vcpu_total,
COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) AS ram_total
FROM %s
GROUP BY "Vcenter"
`, entry.TableName)

		var perVcenter []vcenterAgg
		if selectLog(ctx, dbConn, &perVcenter, aggQuery) != nil {
			continue
		}
		for _, agg := range perVcenter {
			_, insertErr := execLog(ctx, dbConn, insertStmt, agg.Vcenter, entry.SnapshotTime, agg.VmCount, agg.VcpuTotal, agg.RamTotal)
			if insertErr != nil {
				slog.Warn("failed to backfill vcenter_totals", "table", entry.TableName, "vcenter", agg.Vcenter, "snapshot_time", entry.SnapshotTime, "error", insertErr)
			}
		}
	}
	return nil
}
// AnalyzeTableIfPostgres executes ANALYZE on tableName when the driver is
// pgx/postgres. It silently does nothing for an invalid table name or any
// other driver; an ANALYZE failure is logged as a warning and not returned.
func AnalyzeTableIfPostgres(ctx context.Context, dbConn *sqlx.DB, tableName string) {
	if _, err := SafeTableName(tableName); err != nil {
		return
	}
	switch strings.ToLower(dbConn.DriverName()) {
	case "pgx", "postgres":
		// supported, continue below
	default:
		return
	}
	_, err := execLog(ctx, dbConn, fmt.Sprintf(`ANALYZE %s`, tableName))
	if err != nil {
		slog.Warn("failed to ANALYZE table", "table", tableName, "error", err)
	}
}
// SetPostgresWorkMem issues `SET LOCAL work_mem = '<n>MB'` on pgx/postgres
// connections, intended for heavy aggregations. It does nothing when
// workMemMB is not positive or when the driver is not Postgres. A failure is
// logged as a warning and not returned.
//
// NOTE(review): SET LOCAL only applies to the current transaction, and this
// statement is executed directly on the pool handle rather than inside an
// explicit transaction — confirm that the setting actually reaches the
// queries it is meant to help.
func SetPostgresWorkMem(ctx context.Context, dbConn *sqlx.DB, workMemMB int) {
	if workMemMB < 1 {
		return
	}
	switch strings.ToLower(dbConn.DriverName()) {
	case "pgx", "postgres":
		// supported, continue below
	default:
		return
	}
	stmt := fmt.Sprintf(`SET LOCAL work_mem = '%dMB'`, workMemMB)
	if _, err := execLog(ctx, dbConn, stmt); err != nil {
		slog.Warn("failed to set work_mem", "work_mem_mb", workMemMB, "error", err)
	}
}
// CheckMigrationState verifies that the goose bookkeeping table exists and
// that its newest entry is marked as applied.
//
// It returns an error when the driver is neither sqlite nor pgx/postgres,
// when goose_db_version is missing, when either lookup query fails, or when
// the row with the highest id has is_applied = false.
func CheckMigrationState(ctx context.Context, dbConn *sqlx.DB) error {
	driver := strings.ToLower(dbConn.DriverName())

	// Pick the catalog query that detects goose_db_version for this dialect.
	var existsQuery string
	switch driver {
	case "sqlite":
		existsQuery = `
SELECT COUNT(1) > 0 FROM sqlite_master WHERE type='table' AND name='goose_db_version'
`
	case "pgx", "postgres":
		existsQuery = `
SELECT EXISTS (
SELECT 1 FROM pg_tables WHERE schemaname = 'public' AND tablename = 'goose_db_version'
)
`
	default:
		return fmt.Errorf("unsupported driver for migration check: %s", driver)
	}

	var present bool
	if err := getLog(ctx, dbConn, &present, existsQuery); err != nil {
		return err
	}
	if !present {
		return fmt.Errorf("goose_db_version table not found; database migrations may not be applied")
	}

	// The most recent version row decides whether the state is dirty.
	var unapplied bool
	if err := getLog(ctx, dbConn, &unapplied, `
SELECT NOT is_applied
FROM goose_db_version
ORDER BY id DESC
LIMIT 1
`); err != nil {
		return err
	}
	if unapplied {
		return fmt.Errorf("database migrations are in a dirty state; please resolve goose_db_version")
	}
	return nil
}
// BuildDailySummaryInsert returns the SQL to aggregate hourly snapshots into a daily summary table.
//
// tableName is the destination summary table and is validated with
// SafeTableName before interpolation. unionQuery is embedded verbatim as the
// body of the "snapshots" CTE and is not validated here.
//
// The generated statement has three CTEs followed by an INSERT ... SELECT:
//   - snapshots: the caller-supplied union of hourly rows.
//   - totals: number of distinct "SnapshotTime" values and the latest one.
//   - agg: one row per distinct VM attribute combination, with first/last
//     presence, sample count, per-column sums and per-pool hit counts,
//     left-joined to inventory for its "DeletionTime".
//
// The final SELECT derives "CreationTime"/"DeletionTime", takes the most
// recent "ResourcePool" per VM, computes the Avg* columns as sum divided by
// total samples, and the Pool*Pct / Tin..Gold columns as pool hits divided by
// samples present (times 100).
//
// The function only builds the string; it does not execute it.
//
// NOTE(review): the final SELECT references agg.inv_deletion,
// agg.samples_present, the agg.sum_* and agg.*_hits columns and
// totals.max_snapshot, none of which appear in its GROUP BY list. SQLite
// accepts that; PostgreSQL may reject it — confirm on the Postgres target.
func BuildDailySummaryInsert(tableName string, unionQuery string) (string, error) {
if _, err := SafeTableName(tableName); err != nil {
return "", err
}
// %s placeholders, in order: unionQuery (snapshots CTE), tableName (INSERT target).
insert := fmt.Sprintf(`
WITH snapshots AS (
%s
), totals AS (
SELECT COUNT(DISTINCT "SnapshotTime") AS total_samples, MAX("SnapshotTime") AS max_snapshot FROM snapshots
), agg AS (
SELECT
s."InventoryId", s."Name", s."Vcenter", s."VmId", s."EventKey", s."CloudId",
MIN(NULLIF(s."CreationTime", 0)) AS any_creation,
MAX(NULLIF(s."DeletionTime", 0)) AS any_deletion,
MAX(COALESCE(inv."DeletionTime", 0)) AS inv_deletion,
MIN(s."SnapshotTime") AS first_present,
MAX(s."SnapshotTime") AS last_present,
COUNT(*) AS samples_present,
s."Datacenter", s."Cluster", s."Folder", s."ProvisionedDisk", s."VcpuCount",
s."RamGB", s."IsTemplate", s."PoweredOn", s."SrmPlaceholder", s."VmUuid",
SUM(CASE WHEN s."VcpuCount" IS NOT NULL THEN s."VcpuCount" ELSE 0 END) AS sum_vcpu,
SUM(CASE WHEN s."RamGB" IS NOT NULL THEN s."RamGB" ELSE 0 END) AS sum_ram,
SUM(CASE WHEN s."ProvisionedDisk" IS NOT NULL THEN s."ProvisionedDisk" ELSE 0 END) AS sum_disk,
SUM(CASE WHEN LOWER(s."ResourcePool") = 'tin' THEN 1 ELSE 0 END) AS tin_hits,
SUM(CASE WHEN LOWER(s."ResourcePool") = 'bronze' THEN 1 ELSE 0 END) AS bronze_hits,
SUM(CASE WHEN LOWER(s."ResourcePool") = 'silver' THEN 1 ELSE 0 END) AS silver_hits,
SUM(CASE WHEN LOWER(s."ResourcePool") = 'gold' THEN 1 ELSE 0 END) AS gold_hits
FROM snapshots s
LEFT JOIN inventory inv ON inv."VmId" = s."VmId" AND inv."Vcenter" = s."Vcenter"
GROUP BY
s."InventoryId", s."Name", s."Vcenter", s."VmId", s."EventKey", s."CloudId",
s."Datacenter", s."Cluster", s."Folder", s."ProvisionedDisk", s."VcpuCount",
s."RamGB", s."IsTemplate", s."PoweredOn", s."SrmPlaceholder", s."VmUuid"
)
INSERT INTO %s (
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
"SamplesPresent", "AvgVcpuCount", "AvgRamGB", "AvgProvisionedDisk", "AvgIsPresent",
"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct",
"Tin", "Bronze", "Silver", "Gold"
)
SELECT
agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId",
COALESCE(agg.any_creation, agg.first_present, 0) AS "CreationTime",
CASE
WHEN NULLIF(agg.inv_deletion, 0) IS NOT NULL THEN NULLIF(agg.inv_deletion, 0)
WHEN totals.max_snapshot IS NOT NULL AND agg.last_present < totals.max_snapshot THEN COALESCE(
NULLIF(agg.any_deletion, 0),
(SELECT MIN(s2."SnapshotTime") FROM snapshots s2 WHERE s2."SnapshotTime" > agg.last_present),
totals.max_snapshot,
agg.last_present
)
ELSE NULLIF(agg.any_deletion, 0)
END AS "DeletionTime",
(
SELECT s2."ResourcePool"
FROM snapshots s2
WHERE s2."VmId" = agg."VmId"
AND s2."Vcenter" = agg."Vcenter"
ORDER BY s2."SnapshotTime" DESC
LIMIT 1
) AS "ResourcePool",
agg."Datacenter", agg."Cluster", agg."Folder", agg."ProvisionedDisk", agg."VcpuCount",
agg."RamGB", agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid",
agg.samples_present AS "SamplesPresent",
CASE WHEN totals.total_samples > 0
THEN 1.0 * agg.sum_vcpu / totals.total_samples
ELSE NULL END AS "AvgVcpuCount",
CASE WHEN totals.total_samples > 0
THEN 1.0 * agg.sum_ram / totals.total_samples
ELSE NULL END AS "AvgRamGB",
CASE WHEN totals.total_samples > 0
THEN 1.0 * agg.sum_disk / totals.total_samples
ELSE NULL END AS "AvgProvisionedDisk",
CASE WHEN totals.total_samples > 0
THEN 1.0 * agg.samples_present / totals.total_samples
ELSE NULL END AS "AvgIsPresent",
CASE WHEN agg.samples_present > 0
THEN 100.0 * agg.tin_hits / agg.samples_present
ELSE NULL END AS "PoolTinPct",
CASE WHEN agg.samples_present > 0
THEN 100.0 * agg.bronze_hits / agg.samples_present
ELSE NULL END AS "PoolBronzePct",
CASE WHEN agg.samples_present > 0
THEN 100.0 * agg.silver_hits / agg.samples_present
ELSE NULL END AS "PoolSilverPct",
CASE WHEN agg.samples_present > 0
THEN 100.0 * agg.gold_hits / agg.samples_present
ELSE NULL END AS "PoolGoldPct",
CASE WHEN agg.samples_present > 0
THEN 100.0 * agg.tin_hits / agg.samples_present
ELSE NULL END AS "Tin",
CASE WHEN agg.samples_present > 0
THEN 100.0 * agg.bronze_hits / agg.samples_present
ELSE NULL END AS "Bronze",
CASE WHEN agg.samples_present > 0
THEN 100.0 * agg.silver_hits / agg.samples_present
ELSE NULL END AS "Silver",
CASE WHEN agg.samples_present > 0
THEN 100.0 * agg.gold_hits / agg.samples_present
ELSE NULL END AS "Gold"
FROM agg
CROSS JOIN totals
GROUP BY
agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId",
agg."Datacenter", agg."Cluster", agg."Folder", agg."ProvisionedDisk", agg."VcpuCount",
agg."RamGB", agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid",
agg.any_creation, agg.any_deletion, agg.first_present, agg.last_present,
totals.total_samples;
`, unionQuery, tableName)
return insert, nil
}
// RefineCreationDeletionFromUnion tightens the CreationTime and DeletionTime columns of summaryTable
// using every snapshot row produced by unionQuery.
//
// Snapshot rows are grouped by (VmId, VmUuid, Name, Vcenter). For each group the statement derives
// the smallest non-zero CreationTime (any_creation), the first and last SnapshotTime, and the
// earliest SnapshotTime recorded for the same Vcenter and VmId after the group's last sighting.
// Summary rows are matched on Vcenter plus any one of VmId, VmUuid or Name. CreationTime becomes the
// smaller of its current non-zero value and any_creation (falling back to first_seen), and is filled
// in when it is currently NULL or 0. DeletionTime is set to the later sample when it is unset, zero,
// or greater than that sample.
//
// unionQuery and summaryTable are interpolated directly into the statement, so unionQuery must be
// trusted SQL; summaryTable is checked with SafeTableName first. An empty or whitespace-only
// unionQuery is rejected. The "pgx" and "postgres" drivers get an UPDATE ... FROM statement; every
// other driver gets the correlated-subquery form written for SQLite.
//
// NOTE(review): the look-ahead for DeletionTime is restricted to the same VmId, so it can only find
// a later sample when that VmId shows up again in a different (VmUuid, Name) group. Confirm this is
// the intended meaning of "first sample after disappearance".
func RefineCreationDeletionFromUnion(ctx context.Context, dbConn *sqlx.DB, summaryTable, unionQuery string) error {
	if strings.TrimSpace(unionQuery) == "" {
		return fmt.Errorf("union query is empty")
	}
	if _, err := SafeTableName(summaryTable); err != nil {
		return err
	}
	driver := strings.ToLower(dbConn.DriverName())
	// Named stmt rather than sql so the imported database/sql package is not shadowed.
	var stmt string
	switch driver {
	case "pgx", "postgres":
		stmt = fmt.Sprintf(`
WITH snapshots AS (
%s
), timeline AS (
SELECT
s."VmId",
s."VmUuid",
s."Name",
s."Vcenter",
MIN(NULLIF(s."CreationTime", 0)) AS any_creation,
MIN(s."SnapshotTime") AS first_seen,
MAX(s."SnapshotTime") AS last_seen
FROM snapshots s
GROUP BY s."VmId", s."VmUuid", s."Name", s."Vcenter"
)
UPDATE %s dst
SET
"CreationTime" = CASE
WHEN t.any_creation IS NOT NULL AND t.any_creation > 0 THEN LEAST(COALESCE(NULLIF(dst."CreationTime", 0), t.any_creation), t.any_creation)
WHEN t.first_seen IS NOT NULL THEN LEAST(COALESCE(NULLIF(dst."CreationTime", 0), t.first_seen), t.first_seen)
ELSE dst."CreationTime"
END,
"DeletionTime" = CASE
WHEN t_last_after IS NOT NULL
AND (dst."DeletionTime" IS NULL OR dst."DeletionTime" = 0 OR t_last_after < dst."DeletionTime")
THEN t_last_after
ELSE dst."DeletionTime"
END
FROM (
SELECT
tl.*,
(
SELECT MIN(s2."SnapshotTime")
FROM snapshots s2
WHERE s2."Vcenter" = tl."Vcenter"
AND COALESCE(s2."VmId", '') = COALESCE(tl."VmId", '')
AND s2."SnapshotTime" > tl.last_seen
) AS t_last_after
FROM timeline tl
) t
WHERE dst."Vcenter" = t."Vcenter"
AND (
(dst."VmId" IS NOT DISTINCT FROM t."VmId")
OR (dst."VmUuid" IS NOT DISTINCT FROM t."VmUuid")
OR (dst."Name" IS NOT DISTINCT FROM t."Name")
);
`, unionQuery, summaryTable)
	default:
		// SQLite variant (no FROM in UPDATE, no IS NOT DISTINCT FROM). Indexed verbs (%[1]s, %[2]s)
		// are used because the table name is repeated many times.
		//
		// The CreationTime comparisons use >= rather than >: when the stored value is NULL or 0 the
		// COALESCE yields the candidate itself, and a strict > would then never fire, leaving a
		// missing CreationTime unfilled. With >= this branch matches the LEAST(...) semantics of
		// the Postgres statement above.
		stmt = fmt.Sprintf(`
WITH snapshots AS (
%[1]s
), timeline AS (
SELECT
s."VmId",
s."VmUuid",
s."Name",
s."Vcenter",
MIN(NULLIF(s."CreationTime", 0)) AS any_creation,
MIN(s."SnapshotTime") AS first_seen,
MAX(s."SnapshotTime") AS last_seen
FROM snapshots s
GROUP BY s."VmId", s."VmUuid", s."Name", s."Vcenter"
), enriched AS (
SELECT
tl.*,
(
SELECT MIN(s2."SnapshotTime")
FROM snapshots s2
WHERE s2."Vcenter" = tl."Vcenter"
AND COALESCE(s2."VmId", '') = COALESCE(tl."VmId", '')
AND s2."SnapshotTime" > tl.last_seen
) AS first_after
FROM timeline tl
)
UPDATE %[2]s
SET
"CreationTime" = COALESCE(
(
SELECT CASE
WHEN t.any_creation IS NOT NULL AND t.any_creation > 0 AND COALESCE(NULLIF(%[2]s."CreationTime", 0), t.any_creation) >= t.any_creation THEN t.any_creation
WHEN t.any_creation IS NULL AND t.first_seen IS NOT NULL AND COALESCE(NULLIF(%[2]s."CreationTime", 0), t.first_seen) >= t.first_seen THEN t.first_seen
ELSE NULL
END
FROM enriched t
WHERE %[2]s."Vcenter" = t."Vcenter" AND (
(%[2]s."VmId" IS NOT NULL AND t."VmId" IS NOT NULL AND %[2]s."VmId" = t."VmId") OR
(%[2]s."VmId" IS NULL AND t."VmId" IS NULL) OR
(%[2]s."VmUuid" IS NOT NULL AND t."VmUuid" IS NOT NULL AND %[2]s."VmUuid" = t."VmUuid") OR
(%[2]s."VmUuid" IS NULL AND t."VmUuid" IS NULL) OR
(%[2]s."Name" IS NOT NULL AND t."Name" IS NOT NULL AND %[2]s."Name" = t."Name")
)
LIMIT 1
),
"CreationTime"
),
"DeletionTime" = COALESCE(
(
SELECT t.first_after
FROM enriched t
WHERE %[2]s."Vcenter" = t."Vcenter" AND (
(%[2]s."VmId" IS NOT NULL AND t."VmId" IS NOT NULL AND %[2]s."VmId" = t."VmId") OR
(%[2]s."VmId" IS NULL AND t."VmId" IS NULL) OR
(%[2]s."VmUuid" IS NOT NULL AND t."VmUuid" IS NOT NULL AND %[2]s."VmUuid" = t."VmUuid") OR
(%[2]s."VmUuid" IS NULL AND t."VmUuid" IS NULL) OR
(%[2]s."Name" IS NOT NULL AND t."Name" IS NOT NULL AND %[2]s."Name" = t."Name")
)
AND t.first_after IS NOT NULL
AND ("DeletionTime" IS NULL OR "DeletionTime" = 0 OR t.first_after < "DeletionTime")
LIMIT 1
),
"DeletionTime"
)
WHERE EXISTS (
SELECT 1 FROM enriched t
WHERE %[2]s."Vcenter" = t."Vcenter" AND (
(%[2]s."VmId" IS NOT NULL AND t."VmId" IS NOT NULL AND %[2]s."VmId" = t."VmId") OR
(%[2]s."VmId" IS NULL AND t."VmId" IS NULL) OR
(%[2]s."VmUuid" IS NOT NULL AND t."VmUuid" IS NOT NULL AND %[2]s."VmUuid" = t."VmUuid") OR
(%[2]s."VmUuid" IS NULL AND t."VmUuid" IS NULL) OR
(%[2]s."Name" IS NOT NULL AND t."Name" IS NOT NULL AND %[2]s."Name" = t."Name")
)
);
`, unionQuery, summaryTable)
	}
	_, err := execLog(ctx, dbConn, stmt)
	return err
}
// BuildMonthlySummaryInsert returns the INSERT ... SELECT statement that aggregates the daily
// summary rows produced by unionQuery into the monthly summary table tableName.
//
// tableName is checked with SafeTableName and an error is returned if it is rejected. unionQuery is
// interpolated verbatim as the body of the "daily" CTE, so it must be trusted SQL. The statement is
// only built here, not executed.
//
// Each daily row is weighted by its implied sample count for that day
// (SamplesPresent / AvgIsPresent, or SamplesPresent when AvgIsPresent is NULL or not positive).
// Rows are grouped by the identity and configuration columns. CreationTime is the smallest non-zero
// value in the group (0 when there is none), and DeletionTime is the largest non-zero value (NULL
// when there is none).
//
// Every selected column is either grouped or aggregated, including totals.total_samples, so the
// statement is accepted by PostgreSQL as well as SQLite and does not depend on SQLite's
// arbitrary choice of row for bare columns.
//
// NOTE(review): totals.total_samples sums total_samples_day over every row of the union. If the
// union contains rows for more than one VM, that denominator grows with the number of VMs rather
// than with the number of samples in the month. Confirm this is the intended weighting.
func BuildMonthlySummaryInsert(tableName string, unionQuery string) (string, error) {
	if _, err := SafeTableName(tableName); err != nil {
		return "", err
	}
	insert := fmt.Sprintf(`
WITH daily AS (
%s
), enriched AS (
SELECT
d.*,
CASE
WHEN d."AvgIsPresent" IS NOT NULL AND d."AvgIsPresent" > 0 THEN d."SamplesPresent" / d."AvgIsPresent"
ELSE CAST(d."SamplesPresent" AS REAL)
END AS total_samples_day
FROM daily d
), totals AS (
SELECT COALESCE(SUM(total_samples_day), 0) AS total_samples FROM enriched
)
-- monthly averages are weighted by the implied sample counts per day (SamplesPresent / AvgIsPresent)
INSERT INTO %s (
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SamplesPresent",
"AvgVcpuCount", "AvgRamGB", "AvgProvisionedDisk", "AvgIsPresent",
"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct",
"Tin", "Bronze", "Silver", "Gold"
)
SELECT
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId",
COALESCE(MIN(NULLIF("CreationTime", 0)), 0) AS "CreationTime",
NULLIF(MAX(NULLIF("DeletionTime", 0)), 0) AS "DeletionTime",
MAX("ResourcePool") AS "ResourcePool",
"Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
SUM("SamplesPresent") AS "SamplesPresent",
CASE WHEN totals.total_samples > 0
THEN SUM(CASE WHEN "AvgVcpuCount" IS NOT NULL THEN "AvgVcpuCount" * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "AvgVcpuCount",
CASE WHEN totals.total_samples > 0
THEN SUM(CASE WHEN "AvgRamGB" IS NOT NULL THEN "AvgRamGB" * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "AvgRamGB",
CASE WHEN totals.total_samples > 0
THEN SUM(CASE WHEN "AvgProvisionedDisk" IS NOT NULL THEN "AvgProvisionedDisk" * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "AvgProvisionedDisk",
CASE WHEN totals.total_samples > 0
THEN SUM("SamplesPresent") * 1.0 / totals.total_samples
ELSE NULL END AS "AvgIsPresent",
CASE WHEN totals.total_samples > 0
THEN 100.0 * SUM(CASE WHEN "PoolTinPct" IS NOT NULL THEN ("PoolTinPct" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "PoolTinPct",
CASE WHEN totals.total_samples > 0
THEN 100.0 * SUM(CASE WHEN "PoolBronzePct" IS NOT NULL THEN ("PoolBronzePct" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "PoolBronzePct",
CASE WHEN totals.total_samples > 0
THEN 100.0 * SUM(CASE WHEN "PoolSilverPct" IS NOT NULL THEN ("PoolSilverPct" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "PoolSilverPct",
CASE WHEN totals.total_samples > 0
THEN 100.0 * SUM(CASE WHEN "PoolGoldPct" IS NOT NULL THEN ("PoolGoldPct" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "PoolGoldPct",
CASE WHEN totals.total_samples > 0
THEN 100.0 * SUM(CASE WHEN "Tin" IS NOT NULL THEN ("Tin" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "Tin",
CASE WHEN totals.total_samples > 0
THEN 100.0 * SUM(CASE WHEN "Bronze" IS NOT NULL THEN ("Bronze" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "Bronze",
CASE WHEN totals.total_samples > 0
THEN 100.0 * SUM(CASE WHEN "Silver" IS NOT NULL THEN ("Silver" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "Silver",
CASE WHEN totals.total_samples > 0
THEN 100.0 * SUM(CASE WHEN "Gold" IS NOT NULL THEN ("Gold" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "Gold"
FROM enriched
CROSS JOIN totals
GROUP BY
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId",
"Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
totals.total_samples;
`, unionQuery, tableName)
	return insert, nil
}
// EnsureSummaryTable creates the daily/monthly summary table tableName with the standard schema
// if it does not already exist, then makes sure its indexes exist.
//
// tableName is checked with SafeTableName before it is interpolated into any statement. The
// "pgx" and "postgres" drivers use a BIGSERIAL RowId and get two extra indexes; every other driver
// uses the SQLite AUTOINCREMENT form. A legacy "IsPresent" column is dropped when found; failures
// of that step are deliberately ignored. The first failing CREATE statement aborts the call and
// its error is returned.
func EnsureSummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
	if _, err := SafeTableName(tableName); err != nil {
		return err
	}

	driver := strings.ToLower(dbConn.DriverName())
	isPostgres := driver == "pgx" || driver == "postgres"

	// The two dialects only differ in how the surrogate key is declared.
	rowIDColumn := `"RowId" INTEGER PRIMARY KEY AUTOINCREMENT`
	if isPostgres {
		rowIDColumn = `"RowId" BIGSERIAL PRIMARY KEY`
	}
	createStmt := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
%s,
"InventoryId" BIGINT,
"Name" TEXT NOT NULL,
"Vcenter" TEXT NOT NULL,
"VmId" TEXT,
"EventKey" TEXT,
"CloudId" TEXT,
"CreationTime" BIGINT,
"DeletionTime" BIGINT,
"ResourcePool" TEXT,
"Datacenter" TEXT,
"Cluster" TEXT,
"Folder" TEXT,
"ProvisionedDisk" REAL,
"VcpuCount" BIGINT,
"RamGB" BIGINT,
"IsTemplate" TEXT,
"PoweredOn" TEXT,
"SrmPlaceholder" TEXT,
"VmUuid" TEXT,
"SamplesPresent" BIGINT NOT NULL,
"AvgVcpuCount" REAL,
"AvgRamGB" REAL,
"AvgProvisionedDisk" REAL,
"AvgIsPresent" REAL,
"PoolTinPct" REAL,
"PoolBronzePct" REAL,
"PoolSilverPct" REAL,
"PoolGoldPct" REAL,
"Tin" REAL,
"Bronze" REAL,
"Silver" REAL,
"Gold" REAL
);`, tableName, rowIDColumn)
	if _, err := execLog(ctx, dbConn, createStmt); err != nil {
		return err
	}

	// Best-effort: drop legacy IsPresent column if it exists.
	if legacy, err := ColumnExists(ctx, dbConn, tableName, "IsPresent"); err == nil && legacy {
		_, _ = execLog(ctx, dbConn, fmt.Sprintf(`ALTER TABLE %s DROP COLUMN "IsPresent"`, tableName))
	}

	type indexSpec struct {
		suffix  string
		columns string
	}
	specs := []indexSpec{
		{suffix: "vm_vcenter_idx", columns: `"VmId","Vcenter"`},
		{suffix: "resourcepool_idx", columns: `"ResourcePool"`},
	}
	if isPostgres {
		specs = append(specs,
			indexSpec{suffix: "vcenter_idx", columns: `"Vcenter"`},
			indexSpec{suffix: "vmuuid_vcenter_idx", columns: `"VmUuid","Vcenter"`},
		)
	}
	for _, spec := range specs {
		stmt := fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_%s ON %s (%s)`, tableName, spec.suffix, tableName, spec.columns)
		if _, err := execLog(ctx, dbConn, stmt); err != nil {
			return err
		}
	}
	return nil
}
// EnsureSnapshotRunTable creates the snapshot_runs table, which tracks per-vCenter hourly snapshot
// attempts, together with its indexes when they do not exist yet.
//
// The "pgx" and "postgres" drivers use a BIGSERIAL RowId; every other driver uses the SQLite
// AUTOINCREMENT form. Statements run in order (table, unique index on Vcenter/SnapshotTime, index
// on Success) and the first failure is returned.
func EnsureSnapshotRunTable(ctx context.Context, dbConn *sqlx.DB) error {
	rowIDColumn := `"RowId" INTEGER PRIMARY KEY AUTOINCREMENT`
	switch strings.ToLower(dbConn.DriverName()) {
	case "pgx", "postgres":
		rowIDColumn = `"RowId" BIGSERIAL PRIMARY KEY`
	}

	statements := []string{
		fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS snapshot_runs (
%s,
"Vcenter" TEXT NOT NULL,
"SnapshotTime" BIGINT NOT NULL,
"Attempts" INTEGER NOT NULL DEFAULT 0,
"Success" TEXT NOT NULL DEFAULT 'FALSE',
"LastError" TEXT,
"LastAttempt" BIGINT NOT NULL
);
`, rowIDColumn),
		`CREATE UNIQUE INDEX IF NOT EXISTS snapshot_runs_vc_time_idx ON snapshot_runs ("Vcenter","SnapshotTime")`,
		`CREATE INDEX IF NOT EXISTS snapshot_runs_success_idx ON snapshot_runs ("Success")`,
	}
	for _, stmt := range statements {
		if _, err := execLog(ctx, dbConn, stmt); err != nil {
			return err
		}
	}
	return nil
}
// UpsertSnapshotRun records the outcome of one snapshot attempt for vcenter at snapshotTime.
//
// The snapshot_runs table is ensured first. A new (Vcenter, SnapshotTime) pair is inserted with
// Attempts = 1; an existing pair has Attempts incremented and Success, LastError and LastAttempt
// overwritten. Success is stored as the text "TRUE" or "FALSE", errMsg is stored as given, and
// LastAttempt is the current Unix time.
//
// Only the "sqlite", "pgx" and "postgres" driver names are accepted; any other driver yields an
// error after the table has been ensured.
func UpsertSnapshotRun(ctx context.Context, dbConn *sqlx.DB, vcenter string, snapshotTime time.Time, success bool, errMsg string) error {
	if err := EnsureSnapshotRunTable(ctx, dbConn); err != nil {
		return err
	}

	// Pick the dialect-specific statement; the two differ only in placeholder syntax
	// and the spelling of the excluded pseudo-table.
	var upsert string
	driver := strings.ToLower(dbConn.DriverName())
	switch driver {
	case "sqlite":
		upsert = `
INSERT INTO snapshot_runs ("Vcenter","SnapshotTime","Attempts","Success","LastError","LastAttempt")
VALUES (?, ?, 1, ?, ?, ?)
ON CONFLICT("Vcenter","SnapshotTime") DO UPDATE SET
"Attempts" = snapshot_runs."Attempts" + 1,
"Success" = excluded."Success",
"LastError" = excluded."LastError",
"LastAttempt" = excluded."LastAttempt"
`
	case "pgx", "postgres":
		upsert = `
INSERT INTO snapshot_runs ("Vcenter","SnapshotTime","Attempts","Success","LastError","LastAttempt")
VALUES ($1, $2, 1, $3, $4, $5)
ON CONFLICT("Vcenter","SnapshotTime") DO UPDATE SET
"Attempts" = snapshot_runs."Attempts" + 1,
"Success" = EXCLUDED."Success",
"LastError" = EXCLUDED."LastError",
"LastAttempt" = EXCLUDED."LastAttempt"
`
	default:
		return fmt.Errorf("unsupported driver for snapshot_runs upsert: %s", driver)
	}

	outcome := "FALSE"
	if success {
		outcome = "TRUE"
	}
	attemptedAt := time.Now().Unix()
	_, err := execLog(ctx, dbConn, upsert, vcenter, snapshotTime.Unix(), outcome, errMsg, attemptedAt)
	return err
}
// ListFailedSnapshotRuns returns the vcenter/time pairs in snapshot_runs that still need a retry:
// rows whose Success is 'FALSE' and whose Attempts is below maxAttempts, ordered by LastAttempt
// ascending. A maxAttempts of zero or less is treated as 3. On a query error the result is nil;
// otherwise it is a non-nil, possibly empty slice.
func ListFailedSnapshotRuns(ctx context.Context, dbConn *sqlx.DB, maxAttempts int) ([]struct {
	Vcenter      string
	SnapshotTime int64
	Attempts     int
}, error) {
	// failedRun is an alias for the anonymous struct in the signature, so []failedRun
	// is exactly the declared return type.
	type failedRun = struct {
		Vcenter      string
		SnapshotTime int64
		Attempts     int
	}
	// scanRow carries the db tags needed to map the quoted column names.
	type scanRow struct {
		Vcenter      string `db:"Vcenter"`
		SnapshotTime int64  `db:"SnapshotTime"`
		Attempts     int    `db:"Attempts"`
	}

	if maxAttempts <= 0 {
		maxAttempts = 3
	}

	placeholder := "?"
	switch strings.ToLower(dbConn.DriverName()) {
	case "pgx", "postgres":
		placeholder = "$1"
	}
	query := fmt.Sprintf(`
SELECT "Vcenter","SnapshotTime","Attempts"
FROM snapshot_runs
WHERE "Success" = 'FALSE' AND "Attempts" < %s
ORDER BY "LastAttempt" ASC
`, placeholder)

	var scanned []scanRow
	if err := selectLog(ctx, dbConn, &scanned, query, maxAttempts); err != nil {
		return nil, err
	}

	runs := make([]failedRun, len(scanned))
	for i, r := range scanned {
		// Struct tags are ignored for conversion, so the tagged row converts directly.
		runs[i] = failedRun(r)
	}
	return runs, nil
}