Files
vctp2/db/helpers.go
Nathan Coad 18be1fbe06
All checks were successful
continuous-integration/drone/push Build is passing
Add vCenter reference cache tables and update related functions
2026-02-13 14:45:13 +11:00

3456 lines
115 KiB
Go

package db
import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
"sort"
"strconv"
"strings"
"sync"
"time"
"vctp/db/queries"
"github.com/jmoiron/sqlx"
)
// SnapshotTotals summarizes counts and allocations for snapshot tables.
// The db tags map onto the aggregate aliases produced by the
// SnapshotTotalsForTable / SnapshotTotalsForUnion queries below.
type SnapshotTotals struct {
	VmCount   int64   `db:"vm_count"`   // COUNT(DISTINCT "VmId")
	VcpuTotal int64   `db:"vcpu_total"` // SUM of non-NULL "VcpuCount"
	RamTotal  int64   `db:"ram_total"`  // SUM of non-NULL "RamGB"
	DiskTotal float64 `db:"disk_total"` // SUM of non-NULL "ProvisionedDisk"
}
// ColumnDef describes a single column for EnsureColumns/AddColumnIfMissing.
type ColumnDef struct {
	Name string // column identifier; quoted when interpolated into DDL
	Type string // literal SQL type expression, e.g. "TEXT" or "BIGINT"
}
// VcenterHostCacheEntry holds cluster/datacenter placement for a cached
// vCenter reference. Presumably keyed by host elsewhere in this package —
// confirm at call sites (usage is outside this block).
type VcenterHostCacheEntry struct {
	Cluster    string
	Datacenter string
}
// ensureOnceKey identifies one logical init task on one DB connection.
// The *sqlx.DB pointer itself is part of the key, so a reconnect (new *DB)
// re-runs the task.
type ensureOnceKey struct {
	dbConn *sqlx.DB
	name   string
}

// ensureOnceState tracks completion of a single keyed init task.
// done is flipped to true only after fn returns nil (see ensureOncePerDB),
// so failed initializations are retried on the next call.
type ensureOnceState struct {
	mu   sync.Mutex
	done bool
}

// ensureOnceRegistry maps ensureOnceKey -> *ensureOnceState.
var ensureOnceRegistry sync.Map
// ensureOncePerDB runs fn once per DB connection for a given logical key.
// The function is considered complete only when fn returns nil; failures
// leave the state incomplete so a later call retries fn.
func ensureOncePerDB(dbConn *sqlx.DB, name string, fn func() error) error {
	if dbConn == nil {
		return fmt.Errorf("db connection is nil")
	}
	entry, _ := ensureOnceRegistry.LoadOrStore(ensureOnceKey{dbConn: dbConn, name: name}, &ensureOnceState{})
	st := entry.(*ensureOnceState)

	// Serialize concurrent callers for the same (db, name) pair.
	st.mu.Lock()
	defer st.mu.Unlock()
	if st.done {
		return nil
	}
	err := fn()
	if err == nil {
		st.done = true
	}
	return err
}
// TableRowCount returns COUNT(*) for a table.
// The table name is validated before being interpolated into the query.
func TableRowCount(ctx context.Context, dbConn *sqlx.DB, table string) (int64, error) {
	if err := ValidateTableName(table); err != nil {
		return 0, err
	}
	began := time.Now()
	slog.Debug("db row count start", "table", table)

	var total int64
	err := getLog(ctx, dbConn, &total, fmt.Sprintf(`SELECT COUNT(*) FROM %s`, table))
	if err != nil {
		slog.Debug("db row count failed", "table", table, "duration", time.Since(began), "error", err)
		return 0, err
	}
	slog.Debug("db row count complete", "table", table, "rows", total, "duration", time.Since(began))
	return total, nil
}
// EnsureColumns adds the provided columns to a table if they are missing.
// It stops at the first column that fails to be added.
func EnsureColumns(ctx context.Context, dbConn *sqlx.DB, tableName string, columns []ColumnDef) error {
	_, err := SafeTableName(tableName)
	if err != nil {
		return err
	}
	for _, col := range columns {
		if addErr := AddColumnIfMissing(ctx, dbConn, tableName, col); addErr != nil {
			return addErr
		}
	}
	return nil
}
// execLog wraps ExecContext with severity-aware logging: "already exists"
// and timeout/cancel errors are logged at Debug, everything else at Warn.
// Long queries are truncated in the log output.
func execLog(ctx context.Context, dbConn *sqlx.DB, query string, args ...interface{}) (sql.Result, error) {
	res, err := dbConn.ExecContext(ctx, query, args...)
	if err == nil {
		return res, nil
	}

	const maxQueryLogChars = 240
	logged := strings.TrimSpace(query)
	wasTruncated := len(logged) > maxQueryLogChars
	if wasTruncated {
		logged = logged[:maxQueryLogChars] + " ... [truncated]"
	}

	lowered := strings.ToLower(err.Error())
	switch {
	case strings.Contains(lowered, "duplicate column name") || strings.Contains(lowered, "already exists"):
		slog.Debug("db exec skipped (already exists)", "query", logged, "query_truncated", wasTruncated, "error", err)
	case errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) || strings.Contains(lowered, "statement timeout"):
		slog.Debug("db exec timed out", "query", logged, "query_truncated", wasTruncated, "error", err)
	default:
		slog.Warn("db exec failed", "query", logged, "query_truncated", wasTruncated, "error", err)
	}
	return res, err
}
// getLog wraps GetContext with severity-aware logging. sql.ErrNoRows and
// timeout/cancel errors are Debug-level (best-effort probes hit these
// routinely); any other failure is a Warn. The error is always returned
// unchanged so callers can inspect it themselves.
func getLog(ctx context.Context, dbConn *sqlx.DB, dest interface{}, query string, args ...interface{}) error {
	err := dbConn.GetContext(ctx, dest, query, args...)
	if err == nil {
		return nil
	}
	trimmed := strings.TrimSpace(query)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		slog.Debug("db get returned no rows", "query", trimmed)
	case errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled):
		// Soften logging for timeout/cancel scenarios commonly hit during best-effort probes.
		slog.Debug("db get timed out", "query", trimmed, "error", err)
	default:
		slog.Warn("db get failed", "query", trimmed, "error", err)
	}
	return err
}
// selectLog wraps SelectContext with the same severity policy as getLog:
// timeouts/cancellations log at Debug, other failures at Warn. The error
// is returned unchanged.
func selectLog(ctx context.Context, dbConn *sqlx.DB, dest interface{}, query string, args ...interface{}) error {
	err := dbConn.SelectContext(ctx, dest, query, args...)
	if err == nil {
		return nil
	}
	trimmed := strings.TrimSpace(query)
	if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
		slog.Debug("db select timed out", "query", trimmed, "error", err)
	} else {
		slog.Warn("db select failed", "query", trimmed, "error", err)
	}
	return err
}
// AddColumnIfMissing performs a best-effort ALTER TABLE to add a column, ignoring "already exists".
//
// The table name is validated via SafeTableName and the column name is
// restricted to identifier-safe characters before either is interpolated
// into the DDL (the original only validated the table name, leaving the
// quoted column identifier open to stray quotes).
func AddColumnIfMissing(ctx context.Context, dbConn *sqlx.DB, tableName string, column ColumnDef) error {
	if _, err := SafeTableName(tableName); err != nil {
		return err
	}
	if column.Name == "" {
		return fmt.Errorf("column name is empty")
	}
	// Upper case is allowed here (unlike table names) to match the MixedCaps
	// columns used by the snapshot schema, e.g. "VmId".
	for _, r := range column.Name {
		if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '_' {
			continue
		}
		return fmt.Errorf("invalid column name: %s", column.Name)
	}
	// Skip ALTER if the column already exists to avoid noisy duplicate errors.
	if exists, err := ColumnExists(ctx, dbConn, tableName, column.Name); err == nil && exists {
		return nil
	}
	// NOTE(review): column.Type is still interpolated verbatim; callers are
	// trusted to pass literal SQL types such as "TEXT" or "BIGINT".
	query := fmt.Sprintf(`ALTER TABLE %s ADD COLUMN "%s" %s`, tableName, column.Name, column.Type)
	if _, err := execLog(ctx, dbConn, query); err != nil {
		errText := strings.ToLower(err.Error())
		// Races with a concurrent ALTER are benign: treat duplicates as success.
		if strings.Contains(errText, "duplicate column") || strings.Contains(errText, "already exists") {
			return nil
		}
		return err
	}
	return nil
}
// ValidateTableName ensures table identifiers are safe for interpolation.
// Only lower-case ASCII letters, digits, and underscores are accepted;
// anything else (including upper case) is rejected.
func ValidateTableName(name string) error {
	if name == "" {
		return fmt.Errorf("table name is empty")
	}
	for _, ch := range name {
		isSafe := (ch >= 'a' && ch <= 'z') || (ch >= '0' && ch <= '9') || ch == '_'
		if !isSafe {
			return fmt.Errorf("invalid table name: %s", name)
		}
	}
	return nil
}
// SafeTableName returns the name if it passes validation.
// On failure the returned name is the empty string.
func SafeTableName(name string) (string, error) {
	err := ValidateTableName(name)
	if err != nil {
		return "", err
	}
	return name, nil
}
// TableHasRows returns true when a table contains at least one row.
// This is a best-effort probe: timeouts and cancellations are reported as
// "no rows" rather than errors so callers can proceed.
func TableHasRows(ctx context.Context, dbConn *sqlx.DB, table string) (bool, error) {
	if err := ValidateTableName(table); err != nil {
		return false, err
	}
	// Avoid hanging on locked tables; apply a short timeout.
	if ctx == nil {
		ctx = context.Background()
	}
	var cancel context.CancelFunc
	ctx, cancel = context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	query := fmt.Sprintf(`SELECT 1 FROM %s LIMIT 1`, table)
	var exists int
	if err := getLog(ctx, dbConn, &exists, query); err != nil {
		// errors.Is matches wrapped errors too, consistent with getLog's own
		// check (the original used a bare == comparison).
		if errors.Is(err, sql.ErrNoRows) {
			return false, nil
		}
		if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
			return false, nil
		}
		return false, err
	}
	return true, nil
}
// TableExists checks if a table exists in the current schema.
// SQLite uses the generated SqliteTableExists query; Postgres consults
// pg_catalog.pg_tables in the public schema. Unknown drivers and lookup
// errors both report false.
func TableExists(ctx context.Context, dbConn *sqlx.DB, table string) bool {
	switch strings.ToLower(dbConn.DriverName()) {
	case "sqlite":
		count, err := queries.New(dbConn).SqliteTableExists(ctx, sql.NullString{String: table, Valid: table != ""})
		return err == nil && count > 0
	case "pgx", "postgres":
		var found int
		err := getLog(ctx, dbConn, &found, `
SELECT COUNT(1)
FROM pg_catalog.pg_tables
WHERE schemaname = 'public' AND tablename = $1
`, table)
		return err == nil && found > 0
	default:
		return false
	}
}
// ColumnExists checks if a column exists in a table.
//
// SQLite: walks `PRAGMA table_info` output and compares names
// case-insensitively. Postgres: queries information_schema with the column
// name lower-cased, presumably to match Postgres's folding of unquoted
// identifiers — confirm if quoted mixed-case columns must be found too.
// Returns an error for any other driver.
func ColumnExists(ctx context.Context, dbConn *sqlx.DB, tableName string, columnName string) (bool, error) {
	driver := strings.ToLower(dbConn.DriverName())
	switch driver {
	case "sqlite":
		// tableName is interpolated into the PRAGMA, so validate it first.
		if _, err := SafeTableName(tableName); err != nil {
			return false, err
		}
		query := fmt.Sprintf(`PRAGMA table_info("%s")`, tableName)
		rows, err := dbConn.QueryxContext(ctx, query)
		if err != nil {
			return false, err
		}
		defer rows.Close()
		for rows.Next() {
			// Scan targets must match PRAGMA table_info's fixed column order:
			// cid, name, type, notnull, dflt_value, pk.
			var (
				cid        int
				name       string
				colType    string
				notNull    int
				defaultVal sql.NullString
				pk         int
			)
			if err := rows.Scan(&cid, &name, &colType, &notNull, &defaultVal, &pk); err != nil {
				return false, err
			}
			if strings.EqualFold(name, columnName) {
				return true, nil
			}
		}
		return false, rows.Err()
	case "pgx", "postgres":
		var count int
		err := getLog(ctx, dbConn, &count, `
SELECT COUNT(1)
FROM information_schema.columns
WHERE table_name = $1 AND column_name = $2
`, tableName, strings.ToLower(columnName))
		if err != nil {
			return false, err
		}
		return count > 0, nil
	default:
		return false, fmt.Errorf("unsupported driver for column lookup: %s", driver)
	}
}
// SnapshotTotalsForTable returns totals for a snapshot table.
// The table name is validated before interpolation; aggregate aliases
// match the db tags on SnapshotTotals.
func SnapshotTotalsForTable(ctx context.Context, dbConn *sqlx.DB, table string) (SnapshotTotals, error) {
	if _, err := SafeTableName(table); err != nil {
		return SnapshotTotals{}, err
	}
	statement := fmt.Sprintf(`
SELECT
COUNT(DISTINCT "VmId") AS vm_count,
COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) AS vcpu_total,
COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) AS ram_total,
COALESCE(SUM(CASE WHEN "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" ELSE 0 END), 0) AS disk_total
FROM %s
`, table)
	var result SnapshotTotals
	err := getLog(ctx, dbConn, &result, statement)
	if err != nil {
		return SnapshotTotals{}, err
	}
	return result, nil
}
// SnapshotTotalsForUnion returns totals for a union query of snapshots.
//
// NOTE(review): unionQuery is interpolated verbatim into the outer SELECT
// with no validation — callers must only pass internally-built SQL, never
// anything derived from user input.
func SnapshotTotalsForUnion(ctx context.Context, dbConn *sqlx.DB, unionQuery string) (SnapshotTotals, error) {
	// Wrap the caller's union as a derived table and aggregate over it; the
	// aliases match the db tags on SnapshotTotals.
	query := fmt.Sprintf(`
SELECT
COUNT(DISTINCT "VmId") AS vm_count,
COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) AS vcpu_total,
COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) AS ram_total,
COALESCE(SUM(CASE WHEN "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" ELSE 0 END), 0) AS disk_total
FROM (
%s
) snapshots
`, unionQuery)
	var totals SnapshotTotals
	if err := getLog(ctx, dbConn, &totals, query); err != nil {
		return SnapshotTotals{}, err
	}
	return totals, nil
}
// EnsureSnapshotTable creates a snapshot table with the standard schema if it does not exist.
//
// The Postgres and SQLite DDL differ only in the "RowId" primary-key column
// (BIGSERIAL vs INTEGER AUTOINCREMENT); all other columns are identical.
// After the table exists, the standard indexes are ensured as well.
func EnsureSnapshotTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
	if _, err := SafeTableName(tableName); err != nil {
		return err
	}
	driver := strings.ToLower(dbConn.DriverName())
	var ddl string
	switch driver {
	case "pgx", "postgres":
		ddl = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
"RowId" BIGSERIAL PRIMARY KEY,
"InventoryId" BIGINT,
"Name" TEXT NOT NULL,
"Vcenter" TEXT NOT NULL,
"VmId" TEXT,
"EventKey" TEXT,
"CloudId" TEXT,
"CreationTime" BIGINT,
"DeletionTime" BIGINT,
"ResourcePool" TEXT,
"Datacenter" TEXT,
"Cluster" TEXT,
"Folder" TEXT,
"ProvisionedDisk" REAL,
"VcpuCount" BIGINT,
"RamGB" BIGINT,
"IsTemplate" TEXT,
"PoweredOn" TEXT,
"SrmPlaceholder" TEXT,
"VmUuid" TEXT,
"SnapshotTime" BIGINT NOT NULL
);`, tableName)
	default:
		// SQLite (and any other driver) falls through to the AUTOINCREMENT form.
		ddl = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
"RowId" INTEGER PRIMARY KEY AUTOINCREMENT,
"InventoryId" BIGINT,
"Name" TEXT NOT NULL,
"Vcenter" TEXT NOT NULL,
"VmId" TEXT,
"EventKey" TEXT,
"CloudId" TEXT,
"CreationTime" BIGINT,
"DeletionTime" BIGINT,
"ResourcePool" TEXT,
"Datacenter" TEXT,
"Cluster" TEXT,
"Folder" TEXT,
"ProvisionedDisk" REAL,
"VcpuCount" BIGINT,
"RamGB" BIGINT,
"IsTemplate" TEXT,
"PoweredOn" TEXT,
"SrmPlaceholder" TEXT,
"VmUuid" TEXT,
"SnapshotTime" BIGINT NOT NULL
);`, tableName)
	}
	_, err := execLog(ctx, dbConn, ddl)
	if err != nil {
		return err
	}
	return EnsureSnapshotIndexes(ctx, dbConn, tableName)
}
// EnsureSnapshotIndexes creates the standard indexes for a snapshot table.
// Every driver gets the (VmId, Vcenter) index; non-SQLite drivers add
// SnapshotTime and ResourcePool indexes, and Postgres adds three more
// lookup indexes on top. The first failing statement aborts.
func EnsureSnapshotIndexes(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
	if _, err := SafeTableName(tableName); err != nil {
		return err
	}
	driver := strings.ToLower(dbConn.DriverName())

	stmts := make([]string, 0, 6)
	stmts = append(stmts, fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vm_vcenter_idx ON %s ("VmId","Vcenter")`, tableName, tableName))
	if driver != "sqlite" {
		stmts = append(stmts,
			fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_snapshottime_idx ON %s ("SnapshotTime")`, tableName, tableName),
			fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_resourcepool_idx ON %s ("ResourcePool")`, tableName, tableName),
		)
	}
	// PG-specific helpful indexes; safe no-ops on SQLite if executed, but keep them gated to reduce file bloat.
	if driver == "pgx" || driver == "postgres" {
		stmts = append(stmts,
			fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vcenter_snapshottime_idx ON %s ("Vcenter","SnapshotTime")`, tableName, tableName),
			fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_name_vcenter_idx ON %s ("Name","Vcenter")`, tableName, tableName),
			fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vmuuid_vcenter_idx ON %s ("VmUuid","Vcenter")`, tableName, tableName),
		)
	}

	for _, stmt := range stmts {
		if _, err := execLog(ctx, dbConn, stmt); err != nil {
			return err
		}
	}
	return nil
}
// listTablesByPrefix returns table names matching prefix + "%", newest-style
// names first (ORDER BY ... DESC). The two driver branches differ only in
// catalog and placeholder syntax, so the row-scanning loop is shared.
func listTablesByPrefix(ctx context.Context, dbConn *sqlx.DB, prefix string) ([]string, error) {
	driver := strings.ToLower(dbConn.DriverName())
	pattern := prefix + "%"

	var query string
	switch driver {
	case "sqlite":
		query = `
SELECT name
FROM sqlite_master
WHERE type = 'table'
AND name LIKE ?
ORDER BY name DESC
`
	case "pgx", "postgres":
		query = `
SELECT tablename
FROM pg_catalog.pg_tables
WHERE schemaname = 'public'
AND tablename LIKE $1
ORDER BY tablename DESC
`
	default:
		return nil, fmt.Errorf("unsupported driver: %s", driver)
	}

	rows, err := dbConn.QueryxContext(ctx, query, pattern)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var tables []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return nil, err
		}
		tables = append(tables, name)
	}
	return tables, rows.Err()
}
// CleanupHourlySnapshotIndexes drops low-value per-table indexes on hourly snapshot tables.
// A cutoff of 0 means all hourly tables are processed; see
// cleanupHourlySnapshotIndexes for the actual work (SQLite only).
func CleanupHourlySnapshotIndexes(ctx context.Context, dbConn *sqlx.DB) (int, error) {
	return cleanupHourlySnapshotIndexes(ctx, dbConn, 0)
}
// CleanupHourlySnapshotIndexesOlderThan drops per-table hourly indexes for snapshot tables older than cutoff.
// cutoff <= 0 means drop across all hourly tables.
func CleanupHourlySnapshotIndexesOlderThan(ctx context.Context, dbConn *sqlx.DB, cutoff time.Time) (int, error) {
	return cleanupHourlySnapshotIndexes(ctx, dbConn, cutoff.Unix())
}
// cleanupHourlySnapshotIndexes drops the three standard per-table indexes
// (vm_vcenter, snapshottime, resourcepool) from inventory_hourly_* tables
// whose embedded Unix timestamp is older than cutoffUnix (<= 0 disables the
// cutoff). Returns how many indexes were dropped. SQLite only — the index
// list comes from sqlite_master.
func cleanupHourlySnapshotIndexes(ctx context.Context, dbConn *sqlx.DB, cutoffUnix int64) (int, error) {
	driver := strings.ToLower(dbConn.DriverName())
	if driver != "sqlite" {
		return 0, fmt.Errorf("hourly snapshot index cleanup is only supported for sqlite")
	}
	tables, err := listTablesByPrefix(ctx, dbConn, "inventory_hourly_")
	if err != nil {
		return 0, err
	}
	// Load the set of existing index names up front so we only issue DROPs
	// for indexes that are actually present.
	var existing []struct {
		Name string `db:"name"`
	}
	if err := selectLog(ctx, dbConn, &existing, `
SELECT name
FROM sqlite_master
WHERE type = 'index'
AND tbl_name LIKE 'inventory_hourly_%'
`); err != nil {
		return 0, err
	}
	existingSet := make(map[string]struct{}, len(existing))
	for _, idx := range existing {
		existingSet[idx.Name] = struct{}{}
	}
	dropped := 0
	for _, tableName := range tables {
		// Skip (rather than fail on) names that don't pass validation,
		// since the index name is interpolated into the DROP statement.
		if _, err := SafeTableName(tableName); err != nil {
			continue
		}
		snapshotUnix, ok := parseHourlySnapshotUnix(tableName)
		if !ok {
			continue
		}
		if cutoffUnix > 0 && snapshotUnix >= cutoffUnix {
			continue
		}
		// Names mirror those created by EnsureSnapshotIndexes.
		indexNames := []string{
			fmt.Sprintf("%s_vm_vcenter_idx", tableName),
			fmt.Sprintf("%s_snapshottime_idx", tableName),
			fmt.Sprintf("%s_resourcepool_idx", tableName),
		}
		for _, indexName := range indexNames {
			if _, exists := existingSet[indexName]; !exists {
				continue
			}
			// On error, report how many were dropped so far.
			if _, err := execLog(ctx, dbConn, fmt.Sprintf(`DROP INDEX IF EXISTS %s`, indexName)); err != nil {
				return dropped, err
			}
			delete(existingSet, indexName)
			dropped++
		}
	}
	return dropped, nil
}
// parseHourlySnapshotUnix extracts the positive Unix timestamp embedded in
// an "inventory_hourly_<unix>" table name. The boolean reports whether the
// name had the prefix and a valid positive integer suffix.
func parseHourlySnapshotUnix(tableName string) (int64, bool) {
	const prefix = "inventory_hourly_"
	suffix, hadPrefix := strings.CutPrefix(tableName, prefix)
	if !hadPrefix {
		return 0, false
	}
	ts, err := strconv.ParseInt(suffix, 10, 64)
	if err != nil || ts <= 0 {
		return 0, false
	}
	return ts, true
}
// BackfillSerialColumn sets missing values in a serial-like column for Postgres tables.
// Rows where the column is NULL receive nextval() from the column's backing
// sequence; errors mentioning the sequence machinery are treated as a
// no-op so the call is safe on tables without a serial column.
//
// The column name is interpolated into both a quoted identifier and a SQL
// string literal, so it is now restricted to identifier-safe characters
// (the original only rejected the empty string).
func BackfillSerialColumn(ctx context.Context, dbConn *sqlx.DB, tableName, columnName string) error {
	if err := ValidateTableName(tableName); err != nil {
		return err
	}
	if columnName == "" {
		return fmt.Errorf("column name is empty")
	}
	// Upper case allowed to match the MixedCaps columns used elsewhere in
	// this package (e.g. "RowId").
	for _, r := range columnName {
		if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '_' {
			continue
		}
		return fmt.Errorf("invalid column name: %s", columnName)
	}
	query := fmt.Sprintf(
		`UPDATE %s SET "%s" = nextval(pg_get_serial_sequence('%s','%s')) WHERE "%s" IS NULL`,
		tableName, columnName, tableName, columnName, columnName,
	)
	if _, err := execLog(ctx, dbConn, query); err != nil {
		errText := strings.ToLower(err.Error())
		// No attached sequence is not an error for this best-effort backfill.
		if strings.Contains(errText, "pg_get_serial_sequence") || strings.Contains(errText, "sequence") {
			return nil
		}
		return err
	}
	return nil
}
// ApplySQLiteTuning applies lightweight WAL/synchronous tweaks for better concurrency in non-prod contexts.
// No-op for non-SQLite drivers; runs at most once per DB connection via
// ensureOncePerDB. Pragma failures are logged (by execLog) but never
// returned — the closure always succeeds.
func ApplySQLiteTuning(ctx context.Context, dbConn *sqlx.DB) {
	if strings.ToLower(dbConn.DriverName()) != "sqlite" {
		return
	}
	if ctx == nil {
		ctx = context.Background()
	}
	if err := ensureOncePerDB(dbConn, "sqlite_tuning_pragmas", func() error {
		// Best-effort pragmas; keep this list lightweight to avoid long-running startup work.
		// `PRAGMA optimize` is intentionally excluded from the hourly snapshot hot path.
		pragmas := []string{
			`PRAGMA journal_mode=WAL;`,
			`PRAGMA synchronous=NORMAL;`,
			`PRAGMA temp_store=MEMORY;`,
			`PRAGMA busy_timeout=5000;`,
		}
		for _, pragma := range pragmas {
			// Each pragma gets its own short deadline so a stuck statement
			// cannot stall startup.
			pragmaCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
			_, err := execLog(pragmaCtx, dbConn, pragma)
			cancel()
			// NOTE(review): a bare string context key is flagged by go vet;
			// callers presumably stash a *slog.Logger under "logger" —
			// confirm before changing the key type.
			if logger, ok := ctx.Value("logger").(*slog.Logger); ok && logger != nil {
				logger.Debug("Applied SQLite tuning pragma", "pragma", pragma, "error", err)
			}
		}
		return nil
	}); err != nil {
		slog.Warn("failed to apply SQLite tuning pragmas", "error", err)
	}
}
// CheckpointSQLite forces a WAL checkpoint (truncate) when using SQLite. No-op for other drivers.
// A 10-second deadline bounds the checkpoint so it cannot block shutdown
// paths indefinitely.
func CheckpointSQLite(ctx context.Context, dbConn *sqlx.DB) error {
	if strings.ToLower(dbConn.DriverName()) != "sqlite" {
		return nil
	}
	if ctx == nil {
		ctx = context.Background()
	}
	began := time.Now()
	slog.Debug("sqlite checkpoint start")

	checkpointCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	if _, err := dbConn.ExecContext(checkpointCtx, `PRAGMA wal_checkpoint(TRUNCATE);`); err != nil {
		slog.Warn("sqlite checkpoint failed", "error", err, "duration", time.Since(began))
		return err
	}
	slog.Debug("sqlite checkpoint complete", "duration", time.Since(began))
	return nil
}
// EnsureVmHourlyStats creates the shared per-snapshot cache table used by Go aggregations.
// The DDL and indexes are applied once per DB connection via ensureOncePerDB;
// index failures are counted and warned about but never abort the ensure.
func EnsureVmHourlyStats(ctx context.Context, dbConn *sqlx.DB) error {
	ddl := `
CREATE TABLE IF NOT EXISTS vm_hourly_stats (
"SnapshotTime" BIGINT NOT NULL,
"Vcenter" TEXT NOT NULL,
"VmId" TEXT,
"VmUuid" TEXT,
"Name" TEXT,
"CreationTime" BIGINT,
"DeletionTime" BIGINT,
"ResourcePool" TEXT,
"Datacenter" TEXT,
"Cluster" TEXT,
"Folder" TEXT,
"ProvisionedDisk" REAL,
"VcpuCount" BIGINT,
"RamGB" BIGINT,
"IsTemplate" TEXT,
"PoweredOn" TEXT,
"SrmPlaceholder" TEXT,
PRIMARY KEY ("Vcenter","VmId","SnapshotTime")
);`
	return ensureOncePerDB(dbConn, "vm_hourly_stats", func() error {
		if _, err := execLog(ctx, dbConn, ddl); err != nil {
			return err
		}
		var failed int
		for _, stmt := range []string{
			`CREATE INDEX IF NOT EXISTS vm_hourly_stats_vmuuid_time_idx ON vm_hourly_stats ("VmUuid","SnapshotTime")`,
			`CREATE INDEX IF NOT EXISTS vm_hourly_stats_vmid_time_idx ON vm_hourly_stats ("VmId","SnapshotTime")`,
			`CREATE INDEX IF NOT EXISTS vm_hourly_stats_name_time_idx ON vm_hourly_stats (lower("Name"),"SnapshotTime")`,
			`CREATE INDEX IF NOT EXISTS vm_hourly_stats_snapshottime_idx ON vm_hourly_stats ("SnapshotTime")`,
		} {
			if _, err := execLog(ctx, dbConn, stmt); err != nil {
				failed++
			}
		}
		if failed > 0 {
			slog.Warn("vm_hourly_stats index ensure incomplete; continuing without retries until restart", "failed_indexes", failed)
		}
		return nil
	})
}
// EnsureVmLifecycleCache creates an upsert cache for first/last seen VM info.
// Runs once per DB connection via ensureOncePerDB. Index creation is
// best-effort: failures are counted and warned about, but the table DDL
// succeeding is enough for the ensure to report success.
func EnsureVmLifecycleCache(ctx context.Context, dbConn *sqlx.DB) error {
	// Composite PK lets the same VmId appear with different UUIDs (and vice
	// versa) across the same vCenter.
	ddl := `
CREATE TABLE IF NOT EXISTS vm_lifecycle_cache (
"Vcenter" TEXT NOT NULL,
"VmId" TEXT,
"VmUuid" TEXT,
"Name" TEXT,
"Cluster" TEXT,
"FirstSeen" BIGINT,
"LastSeen" BIGINT,
"DeletedAt" BIGINT,
PRIMARY KEY ("Vcenter","VmId","VmUuid")
);`
	return ensureOncePerDB(dbConn, "vm_lifecycle_cache", func() error {
		if _, err := execLog(ctx, dbConn, ddl); err != nil {
			return err
		}
		indexQueries := []string{
			`CREATE INDEX IF NOT EXISTS vm_lifecycle_cache_vmuuid_idx ON vm_lifecycle_cache ("VmUuid")`,
			`CREATE INDEX IF NOT EXISTS vm_lifecycle_cache_vmid_idx ON vm_lifecycle_cache ("VmId")`,
			`CREATE INDEX IF NOT EXISTS vm_lifecycle_cache_name_idx ON vm_lifecycle_cache (lower("Name"))`,
		}
		failedIndexes := 0
		for _, q := range indexQueries {
			if _, err := execLog(ctx, dbConn, q); err != nil {
				failedIndexes++
			}
		}
		if failedIndexes > 0 {
			slog.Warn("vm_lifecycle_cache index ensure incomplete; continuing without retries until restart", "failed_indexes", failedIndexes)
		}
		return nil
	})
}
// UpsertVmLifecycleCache updates first/last seen info for a VM.
//
// FirstSeen is seeded with min(seen, creation) when a positive creation
// time is supplied. On conflict: Name/Cluster/LastSeen are overwritten,
// FirstSeen only ever moves earlier (never later), and DeletedAt is
// cleared — observing the VM again means it is not deleted.
func UpsertVmLifecycleCache(ctx context.Context, dbConn *sqlx.DB, vcenter string, vmID, vmUUID, name, cluster string, seen time.Time, creation sql.NullInt64) error {
	if err := EnsureVmLifecycleCache(ctx, dbConn); err != nil {
		return err
	}
	driver := strings.ToLower(dbConn.DriverName())
	bindType := sqlx.BindType(driver)
	// Prefer the earlier of the observation time and the reported creation time.
	firstSeen := seen.Unix()
	if creation.Valid && creation.Int64 > 0 && creation.Int64 < firstSeen {
		firstSeen = creation.Int64
	}
	query := `
INSERT INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","Name","Cluster","FirstSeen","LastSeen")
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT ("Vcenter","VmId","VmUuid") DO UPDATE SET
"Name"=EXCLUDED."Name",
"Cluster"=EXCLUDED."Cluster",
"LastSeen"=EXCLUDED."LastSeen",
"FirstSeen"=CASE
WHEN vm_lifecycle_cache."FirstSeen" IS NULL OR vm_lifecycle_cache."FirstSeen" = 0 THEN EXCLUDED."FirstSeen"
WHEN EXCLUDED."FirstSeen" IS NOT NULL AND EXCLUDED."FirstSeen" > 0 AND EXCLUDED."FirstSeen" < vm_lifecycle_cache."FirstSeen"
THEN EXCLUDED."FirstSeen"
ELSE vm_lifecycle_cache."FirstSeen"
END,
"DeletedAt"=NULL
`
	// Rebind converts the ? placeholders to the driver's native style.
	query = sqlx.Rebind(bindType, query)
	args := []interface{}{vcenter, vmID, vmUUID, name, cluster, firstSeen, seen.Unix()}
	_, err := dbConn.ExecContext(ctx, query, args...)
	if err != nil {
		slog.Warn("lifecycle upsert exec failed", "vcenter", vcenter, "vm_id", vmID, "vm_uuid", vmUUID, "driver", driver, "args_len", len(args), "args", fmt.Sprint(args), "query", strings.TrimSpace(query), "error", err)
	}
	return err
}
// MarkVmDeletedWithDetails updates lifecycle cache with a deletion timestamp, carrying optional name/cluster.
//
// On conflict the stored DeletedAt only ever moves earlier (the earliest
// positive deletion wins); FirstSeen/LastSeen are filled only when
// previously NULL, and Name/Cluster only when previously empty. Contrast
// with MarkVmDeletedFromEvent, where the incoming time always wins.
func MarkVmDeletedWithDetails(ctx context.Context, dbConn *sqlx.DB, vcenter, vmID, vmUUID, name, cluster string, deletedAt int64) error {
	if err := EnsureVmLifecycleCache(ctx, dbConn); err != nil {
		return err
	}
	driver := strings.ToLower(dbConn.DriverName())
	bindType := sqlx.BindType(driver)
	query := `
INSERT INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","Name","Cluster","DeletedAt","FirstSeen","LastSeen")
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT ("Vcenter","VmId","VmUuid") DO UPDATE SET
"DeletedAt"=CASE
WHEN vm_lifecycle_cache."DeletedAt" IS NULL OR vm_lifecycle_cache."DeletedAt"=0 OR EXCLUDED."DeletedAt"<vm_lifecycle_cache."DeletedAt"
THEN EXCLUDED."DeletedAt"
ELSE vm_lifecycle_cache."DeletedAt"
END,
"LastSeen"=COALESCE(vm_lifecycle_cache."LastSeen", EXCLUDED."LastSeen"),
"FirstSeen"=COALESCE(vm_lifecycle_cache."FirstSeen", EXCLUDED."FirstSeen"),
"Name"=COALESCE(NULLIF(vm_lifecycle_cache."Name", ''), EXCLUDED."Name"),
"Cluster"=COALESCE(NULLIF(vm_lifecycle_cache."Cluster", ''), EXCLUDED."Cluster")
`
	query = sqlx.Rebind(bindType, query)
	// deletedAt doubles as FirstSeen/LastSeen for brand-new rows.
	args := []interface{}{vcenter, vmID, vmUUID, name, cluster, deletedAt, deletedAt, deletedAt}
	_, err := dbConn.ExecContext(ctx, query, args...)
	if err != nil {
		slog.Warn("lifecycle delete exec failed", "vcenter", vcenter, "vm_id", vmID, "vm_uuid", vmUUID, "driver", driver, "args_len", len(args), "args", fmt.Sprint(args), "query", strings.TrimSpace(query), "error", err)
	}
	return err
}
// MarkVmDeletedFromEvent updates lifecycle cache with a deletion timestamp from vCenter events.
// Event times should override snapshot-derived timestamps, even if later.
//
// Unlike MarkVmDeletedWithDetails, any positive incoming DeletedAt replaces
// the stored value unconditionally. FirstSeen/LastSeen fill only when NULL;
// Name/Cluster fill only when previously empty.
func MarkVmDeletedFromEvent(ctx context.Context, dbConn *sqlx.DB, vcenter, vmID, vmUUID, name, cluster string, deletedAt int64) error {
	if err := EnsureVmLifecycleCache(ctx, dbConn); err != nil {
		return err
	}
	driver := strings.ToLower(dbConn.DriverName())
	bindType := sqlx.BindType(driver)
	query := `
INSERT INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","Name","Cluster","DeletedAt","FirstSeen","LastSeen")
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT ("Vcenter","VmId","VmUuid") DO UPDATE SET
"DeletedAt"=CASE
WHEN EXCLUDED."DeletedAt" IS NOT NULL AND EXCLUDED."DeletedAt" > 0 THEN EXCLUDED."DeletedAt"
ELSE vm_lifecycle_cache."DeletedAt"
END,
"LastSeen"=COALESCE(vm_lifecycle_cache."LastSeen", EXCLUDED."LastSeen"),
"FirstSeen"=COALESCE(vm_lifecycle_cache."FirstSeen", EXCLUDED."FirstSeen"),
"Name"=COALESCE(NULLIF(vm_lifecycle_cache."Name", ''), EXCLUDED."Name"),
"Cluster"=COALESCE(NULLIF(vm_lifecycle_cache."Cluster", ''), EXCLUDED."Cluster")
`
	query = sqlx.Rebind(bindType, query)
	// deletedAt doubles as FirstSeen/LastSeen for brand-new rows.
	args := []interface{}{vcenter, vmID, vmUUID, name, cluster, deletedAt, deletedAt, deletedAt}
	_, err := dbConn.ExecContext(ctx, query, args...)
	if err != nil {
		slog.Warn("lifecycle delete event exec failed", "vcenter", vcenter, "vm_id", vmID, "vm_uuid", vmUUID, "driver", driver, "args_len", len(args), "args", fmt.Sprint(args), "query", strings.TrimSpace(query), "error", err)
	}
	return err
}
// MarkVmDeleted updates lifecycle cache with a deletion timestamp (legacy signature).
// Delegates to MarkVmDeletedWithDetails with empty name/cluster.
func MarkVmDeleted(ctx context.Context, dbConn *sqlx.DB, vcenter, vmID, vmUUID string, deletedAt int64) error {
	return MarkVmDeletedWithDetails(ctx, dbConn, vcenter, vmID, vmUUID, "", "", deletedAt)
}
// ApplyLifecycleDeletionToSummary updates DeletionTime values in a summary table from vm_lifecycle_cache.
//
// Rows are matched on Vcenter plus any of VmId / VmUuid / Name, restricted
// to cache deletions in [start, end). Matching rows get the earliest such
// DeletedAt. Returns the number of rows updated. %[1]s reuses the single
// validated table-name argument throughout the statement; the ? params
// (start, end twice — once for the subquery, once for EXISTS) are rebound
// to the driver's placeholder style.
func ApplyLifecycleDeletionToSummary(ctx context.Context, dbConn *sqlx.DB, summaryTable string, start, end int64) (int64, error) {
	if err := ValidateTableName(summaryTable); err != nil {
		return 0, err
	}
	if err := EnsureVmLifecycleCache(ctx, dbConn); err != nil {
		return 0, err
	}
	query := fmt.Sprintf(`
UPDATE %[1]s
SET "DeletionTime" = (
SELECT MIN(l."DeletedAt")
FROM vm_lifecycle_cache l
WHERE l."Vcenter" = %[1]s."Vcenter"
AND l."DeletedAt" IS NOT NULL AND l."DeletedAt" > 0
AND l."DeletedAt" >= ? AND l."DeletedAt" < ?
AND (
(l."VmId" IS NOT NULL AND %[1]s."VmId" IS NOT NULL AND l."VmId" = %[1]s."VmId")
OR (l."VmUuid" IS NOT NULL AND %[1]s."VmUuid" IS NOT NULL AND l."VmUuid" = %[1]s."VmUuid")
OR (l."Name" IS NOT NULL AND %[1]s."Name" IS NOT NULL AND l."Name" = %[1]s."Name")
)
)
WHERE EXISTS (
SELECT 1 FROM vm_lifecycle_cache l
WHERE l."Vcenter" = %[1]s."Vcenter"
AND l."DeletedAt" IS NOT NULL AND l."DeletedAt" > 0
AND l."DeletedAt" >= ? AND l."DeletedAt" < ?
AND (
(l."VmId" IS NOT NULL AND %[1]s."VmId" IS NOT NULL AND l."VmId" = %[1]s."VmId")
OR (l."VmUuid" IS NOT NULL AND %[1]s."VmUuid" IS NOT NULL AND l."VmUuid" = %[1]s."VmUuid")
OR (l."Name" IS NOT NULL AND %[1]s."Name" IS NOT NULL AND l."Name" = %[1]s."Name")
)
);
`, summaryTable)
	bind := dbConn.Rebind(query)
	res, err := execLog(ctx, dbConn, bind, start, end, start, end)
	if err != nil {
		return 0, err
	}
	rows, err := res.RowsAffected()
	if err != nil {
		return 0, err
	}
	return rows, nil
}
// ApplyLifecycleCreationToSummary updates CreationTime values in a summary table from vm_lifecycle_cache.
//
// Only rows with a missing/zero CreationTime are touched; they receive the
// earliest positive FirstSeen from cache rows matching on Vcenter plus any
// of VmId / VmUuid / Name. Returns the number of rows updated. %[1]s reuses
// the single validated table-name argument throughout the statement.
func ApplyLifecycleCreationToSummary(ctx context.Context, dbConn *sqlx.DB, summaryTable string) (int64, error) {
	if err := ValidateTableName(summaryTable); err != nil {
		return 0, err
	}
	if err := EnsureVmLifecycleCache(ctx, dbConn); err != nil {
		return 0, err
	}
	query := fmt.Sprintf(`
UPDATE %[1]s
SET "CreationTime" = (
SELECT MIN(l."FirstSeen")
FROM vm_lifecycle_cache l
WHERE l."Vcenter" = %[1]s."Vcenter"
AND l."FirstSeen" IS NOT NULL AND l."FirstSeen" > 0
AND (
(l."VmId" IS NOT NULL AND %[1]s."VmId" IS NOT NULL AND l."VmId" = %[1]s."VmId")
OR (l."VmUuid" IS NOT NULL AND %[1]s."VmUuid" IS NOT NULL AND l."VmUuid" = %[1]s."VmUuid")
OR (l."Name" IS NOT NULL AND %[1]s."Name" IS NOT NULL AND l."Name" = %[1]s."Name")
)
)
WHERE ("CreationTime" IS NULL OR "CreationTime" = 0)
AND EXISTS (
SELECT 1 FROM vm_lifecycle_cache l
WHERE l."Vcenter" = %[1]s."Vcenter"
AND l."FirstSeen" IS NOT NULL AND l."FirstSeen" > 0
AND (
(l."VmId" IS NOT NULL AND %[1]s."VmId" IS NOT NULL AND l."VmId" = %[1]s."VmId")
OR (l."VmUuid" IS NOT NULL AND %[1]s."VmUuid" IS NOT NULL AND l."VmUuid" = %[1]s."VmUuid")
OR (l."Name" IS NOT NULL AND %[1]s."Name" IS NOT NULL AND l."Name" = %[1]s."Name")
)
);
`, summaryTable)
	bind := dbConn.Rebind(query)
	res, err := execLog(ctx, dbConn, bind)
	if err != nil {
		return 0, err
	}
	rows, err := res.RowsAffected()
	if err != nil {
		return 0, err
	}
	return rows, nil
}
// UpsertVmDailyRollup writes/updates a daily rollup row.
//
// Postgres uses ON CONFLICT DO UPDATE with merge logic: CreationTime keeps
// the minimum, DeletionTime keeps the earliest positive value, and all
// other columns are overwritten with the new sample.
//
// NOTE(review): the SQLite branch uses INSERT OR REPLACE, which replaces the
// whole row and therefore does NOT apply the min-CreationTime / earliest-
// DeletionTime merge — a semantic divergence from the Postgres path.
// Confirm whether callers rely on the merge before unifying.
func UpsertVmDailyRollup(ctx context.Context, dbConn *sqlx.DB, day int64, v VmDailyRollupRow) error {
	if err := EnsureVmDailyRollup(ctx, dbConn); err != nil {
		return err
	}
	driver := strings.ToLower(dbConn.DriverName())
	query := `
INSERT INTO vm_daily_rollup (
"Date","Vcenter","VmId","VmUuid","Name","CreationTime","DeletionTime","SamplesPresent","TotalSamples",
"SumVcpu","SumRam","SumDisk","TinHits","BronzeHits","SilverHits","GoldHits",
"LastResourcePool","LastDatacenter","LastCluster","LastFolder",
"LastProvisionedDisk","LastVcpuCount","LastRamGB","IsTemplate","PoweredOn","SrmPlaceholder"
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26)
ON CONFLICT ("Date","Vcenter","VmId","VmUuid") DO UPDATE SET
"CreationTime"=LEAST(COALESCE(vm_daily_rollup."CreationTime", $6), COALESCE($6, vm_daily_rollup."CreationTime")),
"DeletionTime"=CASE
WHEN vm_daily_rollup."DeletionTime" IS NULL OR vm_daily_rollup."DeletionTime"=0 THEN $7
WHEN $7 IS NOT NULL AND $7 > 0 AND $7 < vm_daily_rollup."DeletionTime" THEN $7
ELSE vm_daily_rollup."DeletionTime" END,
"SamplesPresent"=$8,
"TotalSamples"=$9,
"SumVcpu"=$10,
"SumRam"=$11,
"SumDisk"=$12,
"TinHits"=$13,
"BronzeHits"=$14,
"SilverHits"=$15,
"GoldHits"=$16,
"LastResourcePool"=$17,
"LastDatacenter"=$18,
"LastCluster"=$19,
"LastFolder"=$20,
"LastProvisionedDisk"=$21,
"LastVcpuCount"=$22,
"LastRamGB"=$23,
"IsTemplate"=$24,
"PoweredOn"=$25,
"SrmPlaceholder"=$26
`
	// Argument order must match the column list above exactly.
	args := []interface{}{
		day, v.Vcenter, v.VmId, v.VmUuid, v.Name, v.CreationTime, v.DeletionTime, v.SamplesPresent, v.TotalSamples,
		v.SumVcpu, v.SumRam, v.SumDisk, v.TinHits, v.BronzeHits, v.SilverHits, v.GoldHits,
		v.LastResourcePool, v.LastDatacenter, v.LastCluster, v.LastFolder, v.LastProvisionedDisk, v.LastVcpuCount, v.LastRamGB, v.IsTemplate, v.PoweredOn, v.SrmPlaceholder,
	}
	if driver == "sqlite" {
		// Same column order, ? placeholders; see divergence note above.
		query = `
INSERT OR REPLACE INTO vm_daily_rollup (
"Date","Vcenter","VmId","VmUuid","Name","CreationTime","DeletionTime","SamplesPresent","TotalSamples",
"SumVcpu","SumRam","SumDisk","TinHits","BronzeHits","SilverHits","GoldHits",
"LastResourcePool","LastDatacenter","LastCluster","LastFolder",
"LastProvisionedDisk","LastVcpuCount","LastRamGB","IsTemplate","PoweredOn","SrmPlaceholder"
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
`
	}
	_, err := dbConn.ExecContext(ctx, query, args...)
	return err
}
// VmDailyRollupRow represents the per-day cached aggregation.
// Field names mirror the vm_daily_rollup columns (the "Date" key itself is
// passed separately to UpsertVmDailyRollup).
type VmDailyRollupRow struct {
	// Identity
	Vcenter string
	VmId    string
	VmUuid  string
	Name    string
	// Lifecycle timestamps (Unix seconds; 0 means unknown)
	CreationTime int64
	DeletionTime int64
	// Sampling counters for the day
	SamplesPresent int64
	TotalSamples   int64
	// Summed resource allocations across the day's samples
	SumVcpu float64
	SumRam  float64
	SumDisk float64
	// Per-tier hit counters — tier semantics defined by the aggregation code,
	// not visible here
	TinHits    int64
	BronzeHits int64
	SilverHits int64
	GoldHits   int64
	// Most recent observed placement/config values
	LastResourcePool    string
	LastDatacenter      string
	LastCluster         string
	LastFolder          string
	LastProvisionedDisk float64
	LastVcpuCount       int64
	LastRamGB           int64
	IsTemplate          string
	PoweredOn           string
	SrmPlaceholder      string
}
// EnsureVmDailyRollup creates the per-day cache used by monthly aggregation.
// The CREATE TABLE IF NOT EXISTS runs on every call (no once-per-DB gate
// here, unlike the other Ensure* helpers); index creation is best-effort
// and any failures are logged inside execLog.
func EnsureVmDailyRollup(ctx context.Context, dbConn *sqlx.DB) error {
	ddl := `
CREATE TABLE IF NOT EXISTS vm_daily_rollup (
"Date" BIGINT NOT NULL,
"Vcenter" TEXT NOT NULL,
"VmId" TEXT,
"VmUuid" TEXT,
"Name" TEXT,
"CreationTime" BIGINT,
"DeletionTime" BIGINT,
"SamplesPresent" BIGINT,
"TotalSamples" BIGINT,
"SumVcpu" BIGINT,
"SumRam" BIGINT,
"SumDisk" REAL,
"TinHits" BIGINT,
"BronzeHits" BIGINT,
"SilverHits" BIGINT,
"GoldHits" BIGINT,
"LastResourcePool" TEXT,
"LastDatacenter" TEXT,
"LastCluster" TEXT,
"LastFolder" TEXT,
"LastProvisionedDisk" REAL,
"LastVcpuCount" BIGINT,
"LastRamGB" BIGINT,
"IsTemplate" TEXT,
"PoweredOn" TEXT,
"SrmPlaceholder" TEXT,
PRIMARY KEY ("Date","Vcenter","VmId","VmUuid")
);`
	if _, err := execLog(ctx, dbConn, ddl); err != nil {
		return err
	}
	indexStatements := []string{
		`CREATE INDEX IF NOT EXISTS vm_daily_rollup_date_idx ON vm_daily_rollup ("Date")`,
		`CREATE INDEX IF NOT EXISTS vm_daily_rollup_vcenter_date_idx ON vm_daily_rollup ("Vcenter","Date")`,
		`CREATE INDEX IF NOT EXISTS vm_daily_rollup_vmid_date_idx ON vm_daily_rollup ("VmId","Date")`,
		`CREATE INDEX IF NOT EXISTS vm_daily_rollup_vmuuid_date_idx ON vm_daily_rollup ("VmUuid","Date")`,
		`CREATE INDEX IF NOT EXISTS vm_daily_rollup_name_date_idx ON vm_daily_rollup (lower("Name"),"Date")`,
	}
	for _, stmt := range indexStatements {
		// Fire-and-forget, matching the original's ignored results.
		_, _ = execLog(ctx, dbConn, stmt)
	}
	return nil
}
// EnsureVmIdentityTables creates the identity and rename audit tables.
//
// vm_identity holds one row per (VmId, VmUuid, Vcenter) with the VM's current
// name/cluster and first/last observation timestamps; vm_renames is an
// append-only audit trail of name or cluster changes. The vm_identity DDL was
// previously duplicated byte-for-byte across both driver branches; only the
// vm_renames surrogate-key syntax differs per driver (BIGSERIAL on Postgres
// vs INTEGER AUTOINCREMENT on SQLite), so the switch now covers only that.
func EnsureVmIdentityTables(ctx context.Context, dbConn *sqlx.DB) error {
	const identityDDL = `
CREATE TABLE IF NOT EXISTS vm_identity (
	"VmId" TEXT NOT NULL,
	"VmUuid" TEXT NOT NULL,
	"Vcenter" TEXT NOT NULL,
	"Name" TEXT NOT NULL,
	"Cluster" TEXT,
	"FirstSeen" BIGINT NOT NULL,
	"LastSeen" BIGINT NOT NULL,
	PRIMARY KEY ("VmId","VmUuid","Vcenter")
)`
	var renameDDL string
	switch strings.ToLower(dbConn.DriverName()) {
	case "pgx", "postgres":
		renameDDL = `
CREATE TABLE IF NOT EXISTS vm_renames (
	"RowId" BIGSERIAL PRIMARY KEY,
	"VmId" TEXT NOT NULL,
	"VmUuid" TEXT NOT NULL,
	"Vcenter" TEXT NOT NULL,
	"OldName" TEXT,
	"NewName" TEXT,
	"OldCluster" TEXT,
	"NewCluster" TEXT,
	"SnapshotTime" BIGINT NOT NULL
)`
	default:
		renameDDL = `
CREATE TABLE IF NOT EXISTS vm_renames (
	"RowId" INTEGER PRIMARY KEY AUTOINCREMENT,
	"VmId" TEXT NOT NULL,
	"VmUuid" TEXT NOT NULL,
	"Vcenter" TEXT NOT NULL,
	"OldName" TEXT,
	"NewName" TEXT,
	"OldCluster" TEXT,
	"NewCluster" TEXT,
	"SnapshotTime" BIGINT NOT NULL
)`
	}
	if _, err := execLog(ctx, dbConn, identityDDL); err != nil {
		return err
	}
	if _, err := execLog(ctx, dbConn, renameDDL); err != nil {
		return err
	}
	// Unlike some best-effort callers, index failures are propagated here.
	indexes := []string{
		`CREATE INDEX IF NOT EXISTS vm_identity_vcenter_idx ON vm_identity ("Vcenter")`,
		`CREATE INDEX IF NOT EXISTS vm_identity_uuid_idx ON vm_identity ("VmUuid","Vcenter")`,
		`CREATE INDEX IF NOT EXISTS vm_identity_name_idx ON vm_identity ("Name","Vcenter")`,
		`CREATE INDEX IF NOT EXISTS vm_renames_vcenter_idx ON vm_renames ("Vcenter","SnapshotTime")`,
	}
	for _, idx := range indexes {
		if _, err := execLog(ctx, dbConn, idx); err != nil {
			return err
		}
	}
	return nil
}
// EnsureVcenterReferenceCacheTables creates lookup caches for vCenter object references.
// It runs at most once per DB connection (via ensureOncePerDB). The folder,
// resource-pool and host caches are each keyed by (Vcenter, <object ref>).
func EnsureVcenterReferenceCacheTables(ctx context.Context, dbConn *sqlx.DB) error {
	return ensureOncePerDB(dbConn, "vcenter_reference_cache_tables", func() error {
		// Tables first, then their indexes; any failure aborts the whole
		// sequence so ensureOncePerDB will retry next call.
		statements := []string{
			`
CREATE TABLE IF NOT EXISTS vcenter_folder_cache (
	"Vcenter" TEXT NOT NULL,
	"FolderRef" TEXT NOT NULL,
	"FolderPath" TEXT NOT NULL,
	"LastSeen" BIGINT NOT NULL DEFAULT 0,
	PRIMARY KEY ("Vcenter","FolderRef")
)`,
			`
CREATE TABLE IF NOT EXISTS vcenter_resource_pool_cache (
	"Vcenter" TEXT NOT NULL,
	"ResourcePoolRef" TEXT NOT NULL,
	"ResourcePoolName" TEXT NOT NULL,
	"LastSeen" BIGINT NOT NULL DEFAULT 0,
	PRIMARY KEY ("Vcenter","ResourcePoolRef")
)`,
			`
CREATE TABLE IF NOT EXISTS vcenter_host_cache (
	"Vcenter" TEXT NOT NULL,
	"HostRef" TEXT NOT NULL,
	"Cluster" TEXT,
	"Datacenter" TEXT,
	"LastSeen" BIGINT NOT NULL DEFAULT 0,
	PRIMARY KEY ("Vcenter","HostRef")
)`,
			`CREATE INDEX IF NOT EXISTS vcenter_folder_cache_vcenter_idx ON vcenter_folder_cache ("Vcenter")`,
			`CREATE INDEX IF NOT EXISTS vcenter_resource_pool_cache_vcenter_idx ON vcenter_resource_pool_cache ("Vcenter")`,
			`CREATE INDEX IF NOT EXISTS vcenter_host_cache_vcenter_idx ON vcenter_host_cache ("Vcenter")`,
		}
		for _, stmt := range statements {
			if _, err := execLog(ctx, dbConn, stmt); err != nil {
				return err
			}
		}
		return nil
	})
}
// LoadVcenterFolderCache returns the FolderRef -> FolderPath map cached for
// one vCenter. A blank vcenter short-circuits to an empty map without
// touching the database; blank refs/paths are skipped.
func LoadVcenterFolderCache(ctx context.Context, dbConn *sqlx.DB, vcenter string) (map[string]string, error) {
	result := map[string]string{}
	if vcenter = strings.TrimSpace(vcenter); vcenter == "" {
		return result, nil
	}
	if err := EnsureVcenterReferenceCacheTables(ctx, dbConn); err != nil {
		return nil, err
	}
	stmt := dbConn.Rebind(`
		SELECT "FolderRef","FolderPath"
		FROM vcenter_folder_cache
		WHERE "Vcenter" = ?
	`)
	cursor, err := dbConn.QueryxContext(ctx, stmt, vcenter)
	if err != nil {
		return nil, err
	}
	defer cursor.Close()
	for cursor.Next() {
		var folderRef, folderPath string
		if err := cursor.Scan(&folderRef, &folderPath); err != nil {
			return nil, err
		}
		folderRef = strings.TrimSpace(folderRef)
		folderPath = strings.TrimSpace(folderPath)
		if folderRef != "" && folderPath != "" {
			result[folderRef] = folderPath
		}
	}
	return result, cursor.Err()
}
// LoadVcenterResourcePoolCache returns the ResourcePoolRef -> name map cached
// for one vCenter. A blank vcenter short-circuits to an empty map without
// touching the database; blank refs/names are skipped.
func LoadVcenterResourcePoolCache(ctx context.Context, dbConn *sqlx.DB, vcenter string) (map[string]string, error) {
	result := map[string]string{}
	if vcenter = strings.TrimSpace(vcenter); vcenter == "" {
		return result, nil
	}
	if err := EnsureVcenterReferenceCacheTables(ctx, dbConn); err != nil {
		return nil, err
	}
	stmt := dbConn.Rebind(`
		SELECT "ResourcePoolRef","ResourcePoolName"
		FROM vcenter_resource_pool_cache
		WHERE "Vcenter" = ?
	`)
	cursor, err := dbConn.QueryxContext(ctx, stmt, vcenter)
	if err != nil {
		return nil, err
	}
	defer cursor.Close()
	for cursor.Next() {
		var poolRef, poolName string
		if err := cursor.Scan(&poolRef, &poolName); err != nil {
			return nil, err
		}
		poolRef = strings.TrimSpace(poolRef)
		poolName = strings.TrimSpace(poolName)
		if poolRef != "" && poolName != "" {
			result[poolRef] = poolName
		}
	}
	return result, cursor.Err()
}
// LoadVcenterHostCache returns the HostRef -> cluster/datacenter mapping
// cached for one vCenter. A blank vcenter short-circuits to an empty map;
// rows whose HostRef trims to empty are skipped.
func LoadVcenterHostCache(ctx context.Context, dbConn *sqlx.DB, vcenter string) (map[string]VcenterHostCacheEntry, error) {
	result := map[string]VcenterHostCacheEntry{}
	if vcenter = strings.TrimSpace(vcenter); vcenter == "" {
		return result, nil
	}
	if err := EnsureVcenterReferenceCacheTables(ctx, dbConn); err != nil {
		return nil, err
	}
	stmt := dbConn.Rebind(`
		SELECT "HostRef","Cluster","Datacenter"
		FROM vcenter_host_cache
		WHERE "Vcenter" = ?
	`)
	cursor, err := dbConn.QueryxContext(ctx, stmt, vcenter)
	if err != nil {
		return nil, err
	}
	defer cursor.Close()
	for cursor.Next() {
		var (
			hostRef             string
			cluster, datacenter sql.NullString
		)
		if err := cursor.Scan(&hostRef, &cluster, &datacenter); err != nil {
			return nil, err
		}
		if hostRef = strings.TrimSpace(hostRef); hostRef == "" {
			continue
		}
		// NULL columns scan to an invalid NullString whose String is "",
		// so trimming the String field handles NULL and blank alike.
		result[hostRef] = VcenterHostCacheEntry{
			Cluster:    strings.TrimSpace(cluster.String),
			Datacenter: strings.TrimSpace(datacenter.String),
		}
	}
	return result, cursor.Err()
}
// UpsertVcenterFolderCache inserts or refreshes one folder-path cache row.
// Blank inputs make it a no-op; on conflict the path is replaced and
// LastSeen only ever moves forward.
func UpsertVcenterFolderCache(ctx context.Context, dbConn *sqlx.DB, vcenter, folderRef, folderPath string, lastSeen int64) error {
	vcenter = strings.TrimSpace(vcenter)
	folderRef = strings.TrimSpace(folderRef)
	folderPath = strings.TrimSpace(folderPath)
	if vcenter == "" || folderRef == "" || folderPath == "" {
		return nil
	}
	if err := EnsureVcenterReferenceCacheTables(ctx, dbConn); err != nil {
		return err
	}
	stmt := dbConn.Rebind(`
		INSERT INTO vcenter_folder_cache ("Vcenter","FolderRef","FolderPath","LastSeen")
		VALUES (?, ?, ?, ?)
		ON CONFLICT ("Vcenter","FolderRef") DO UPDATE SET
			"FolderPath" = EXCLUDED."FolderPath",
			"LastSeen" = CASE
				WHEN EXCLUDED."LastSeen" > vcenter_folder_cache."LastSeen" THEN EXCLUDED."LastSeen"
				ELSE vcenter_folder_cache."LastSeen"
			END
	`)
	_, err := execLog(ctx, dbConn, stmt, vcenter, folderRef, folderPath, lastSeen)
	return err
}
// UpsertVcenterResourcePoolCache inserts or refreshes one resource-pool cache
// row. Blank inputs make it a no-op; on conflict the name is replaced and
// LastSeen only ever moves forward.
func UpsertVcenterResourcePoolCache(ctx context.Context, dbConn *sqlx.DB, vcenter, resourcePoolRef, resourcePoolName string, lastSeen int64) error {
	vcenter = strings.TrimSpace(vcenter)
	resourcePoolRef = strings.TrimSpace(resourcePoolRef)
	resourcePoolName = strings.TrimSpace(resourcePoolName)
	if vcenter == "" || resourcePoolRef == "" || resourcePoolName == "" {
		return nil
	}
	if err := EnsureVcenterReferenceCacheTables(ctx, dbConn); err != nil {
		return err
	}
	stmt := dbConn.Rebind(`
		INSERT INTO vcenter_resource_pool_cache ("Vcenter","ResourcePoolRef","ResourcePoolName","LastSeen")
		VALUES (?, ?, ?, ?)
		ON CONFLICT ("Vcenter","ResourcePoolRef") DO UPDATE SET
			"ResourcePoolName" = EXCLUDED."ResourcePoolName",
			"LastSeen" = CASE
				WHEN EXCLUDED."LastSeen" > vcenter_resource_pool_cache."LastSeen" THEN EXCLUDED."LastSeen"
				ELSE vcenter_resource_pool_cache."LastSeen"
			END
	`)
	_, err := execLog(ctx, dbConn, stmt, vcenter, resourcePoolRef, resourcePoolName, lastSeen)
	return err
}
// UpsertVcenterHostCache inserts or refreshes one host placement cache row.
// Rows missing vcenter/hostRef, or carrying neither a cluster nor a
// datacenter, are ignored. On conflict, blank incoming values never clobber
// stored ones and LastSeen only moves forward.
func UpsertVcenterHostCache(ctx context.Context, dbConn *sqlx.DB, vcenter, hostRef, cluster, datacenter string, lastSeen int64) error {
	vcenter = strings.TrimSpace(vcenter)
	hostRef = strings.TrimSpace(hostRef)
	cluster = strings.TrimSpace(cluster)
	datacenter = strings.TrimSpace(datacenter)
	switch {
	case vcenter == "" || hostRef == "":
		return nil
	case cluster == "" && datacenter == "":
		// Nothing useful to cache for this host.
		return nil
	}
	if err := EnsureVcenterReferenceCacheTables(ctx, dbConn); err != nil {
		return err
	}
	stmt := dbConn.Rebind(`
		INSERT INTO vcenter_host_cache ("Vcenter","HostRef","Cluster","Datacenter","LastSeen")
		VALUES (?, ?, ?, ?, ?)
		ON CONFLICT ("Vcenter","HostRef") DO UPDATE SET
			"Cluster" = COALESCE(NULLIF(EXCLUDED."Cluster", ''), vcenter_host_cache."Cluster"),
			"Datacenter" = COALESCE(NULLIF(EXCLUDED."Datacenter", ''), vcenter_host_cache."Datacenter"),
			"LastSeen" = CASE
				WHEN EXCLUDED."LastSeen" > vcenter_host_cache."LastSeen" THEN EXCLUDED."LastSeen"
				ELSE vcenter_host_cache."LastSeen"
			END
	`)
	_, err := execLog(ctx, dbConn, stmt, vcenter, hostRef, cluster, datacenter, lastSeen)
	return err
}
// UpsertVmIdentity updates/creates the identity record and records rename events.
//
// A VM is identified by (Vcenter, VmId, VmUuid); calls missing any key part
// are silently ignored. When the stored name or cluster differs
// (case-insensitively) from the incoming values, a row is appended to
// vm_renames before the identity row is updated; that audit insert is
// best-effort and must not block keeping vm_identity current.
func UpsertVmIdentity(ctx context.Context, dbConn *sqlx.DB, vcenter string, vmId, vmUuid sql.NullString, name string, cluster sql.NullString, snapshotTime time.Time) error {
	keyVmID := strings.TrimSpace(vmId.String)
	keyUuid := strings.TrimSpace(vmUuid.String)
	if keyVmID == "" || keyUuid == "" || strings.TrimSpace(vcenter) == "" {
		return nil
	}
	if err := EnsureVmIdentityTables(ctx, dbConn); err != nil {
		return err
	}
	type identityRow struct {
		Name      string         `db:"Name"`
		Cluster   sql.NullString `db:"Cluster"`
		FirstSeen sql.NullInt64  `db:"FirstSeen"`
		LastSeen  sql.NullInt64  `db:"LastSeen"`
	}
	var existing identityRow
	err := getLog(ctx, dbConn, &existing, `
	SELECT "Name","Cluster","FirstSeen","LastSeen"
	FROM vm_identity
	WHERE "Vcenter" = $1 AND "VmId" = $2 AND "VmUuid" = $3
	`, vcenter, keyVmID, keyUuid)
	if err != nil {
		// Prefer the sql.ErrNoRows sentinel; keep the substring match as a
		// fallback for wrappers that do not propagate the sentinel via %w.
		if errors.Is(err, sql.ErrNoRows) || strings.Contains(strings.ToLower(err.Error()), "no rows") {
			// First sighting: FirstSeen and LastSeen are both the snapshot time.
			_, err = execLog(ctx, dbConn, `
	INSERT INTO vm_identity ("VmId","VmUuid","Vcenter","Name","Cluster","FirstSeen","LastSeen")
	VALUES ($1,$2,$3,$4,$5,$6,$6)
	`, keyVmID, keyUuid, vcenter, name, nullString(cluster), snapshotTime.Unix())
			return err
		}
		return err
	}
	renamed := !strings.EqualFold(existing.Name, name) || !strings.EqualFold(strings.TrimSpace(existing.Cluster.String), strings.TrimSpace(cluster.String))
	if renamed {
		// Best-effort audit record; a failure here must not abort the update.
		_, _ = execLog(ctx, dbConn, `
	INSERT INTO vm_renames ("VmId","VmUuid","Vcenter","OldName","NewName","OldCluster","NewCluster","SnapshotTime")
	VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
	`, keyVmID, keyUuid, vcenter, existing.Name, name, existing.Cluster.String, cluster.String, snapshotTime.Unix())
	}
	_, err = execLog(ctx, dbConn, `
	UPDATE vm_identity
	SET "Name" = $1, "Cluster" = $2, "LastSeen" = $3
	WHERE "Vcenter" = $4 AND "VmId" = $5 AND "VmUuid" = $6
	`, name, nullString(cluster), snapshotTime.Unix(), vcenter, keyVmID, keyUuid)
	return err
}
func nullString(val sql.NullString) interface{} {
if val.Valid {
return val.String
}
return nil
}
// EnsureVcenterTotalsTable creates the vcenter_totals table if missing.
// The history table keeps one appended row per (vCenter, snapshot); only the
// surrogate-key syntax differs between Postgres and SQLite.
func EnsureVcenterTotalsTable(ctx context.Context, dbConn *sqlx.DB) error {
	ddl := `
CREATE TABLE IF NOT EXISTS vcenter_totals (
	"RowId" INTEGER PRIMARY KEY AUTOINCREMENT,
	"Vcenter" TEXT NOT NULL,
	"SnapshotTime" BIGINT NOT NULL,
	"VmCount" BIGINT NOT NULL,
	"VcpuTotal" BIGINT NOT NULL,
	"RamTotalGB" BIGINT NOT NULL
);`
	if driver := strings.ToLower(dbConn.DriverName()); driver == "pgx" || driver == "postgres" {
		ddl = `
CREATE TABLE IF NOT EXISTS vcenter_totals (
	"RowId" BIGSERIAL PRIMARY KEY,
	"Vcenter" TEXT NOT NULL,
	"SnapshotTime" BIGINT NOT NULL,
	"VmCount" BIGINT NOT NULL,
	"VcpuTotal" BIGINT NOT NULL,
	"RamTotalGB" BIGINT NOT NULL
);`
	}
	if _, err := execLog(ctx, dbConn, ddl); err != nil {
		return err
	}
	// Single covering index for the common "latest totals per vCenter" query.
	if _, err := execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vcenter_totals_vc_time_idx ON vcenter_totals ("Vcenter","SnapshotTime" DESC)`); err != nil {
		return err
	}
	return nil
}
// EnsureVcenterLatestTotalsTable creates a compact table with one latest totals row per vCenter.
//
// The DDL is portable ("Vcenter" TEXT PRIMARY KEY needs no driver-specific
// surrogate key), so the previous per-driver switch emitted the identical
// statement on both branches; it has been collapsed to a single statement.
func EnsureVcenterLatestTotalsTable(ctx context.Context, dbConn *sqlx.DB) error {
	const ddl = `
CREATE TABLE IF NOT EXISTS vcenter_latest_totals (
	"Vcenter" TEXT PRIMARY KEY,
	"SnapshotTime" BIGINT NOT NULL,
	"VmCount" BIGINT NOT NULL,
	"VcpuTotal" BIGINT NOT NULL,
	"RamTotalGB" BIGINT NOT NULL
);`
	_, err := execLog(ctx, dbConn, ddl)
	return err
}
// UpsertVcenterLatestTotals stores the latest totals per vCenter.
// The conditional ON CONFLICT update ensures an older snapshot can never
// overwrite a newer one.
func UpsertVcenterLatestTotals(ctx context.Context, dbConn *sqlx.DB, vcenter string, snapshotTime int64, vmCount, vcpuTotal, ramTotal int64) error {
	if strings.TrimSpace(vcenter) == "" {
		return fmt.Errorf("vcenter is empty")
	}
	if err := EnsureVcenterLatestTotalsTable(ctx, dbConn); err != nil {
		return err
	}
	const upsert = `
	INSERT INTO vcenter_latest_totals ("Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB")
	VALUES ($1,$2,$3,$4,$5)
	ON CONFLICT ("Vcenter") DO UPDATE SET
		"SnapshotTime" = EXCLUDED."SnapshotTime",
		"VmCount" = EXCLUDED."VmCount",
		"VcpuTotal" = EXCLUDED."VcpuTotal",
		"RamTotalGB" = EXCLUDED."RamTotalGB"
	WHERE EXCLUDED."SnapshotTime" >= vcenter_latest_totals."SnapshotTime"
	`
	_, err := execLog(ctx, dbConn, upsert, vcenter, snapshotTime, vmCount, vcpuTotal, ramTotal)
	return err
}
// EnsureVcenterAggregateTotalsTable creates a compact cache for hourly/daily (and optional monthly) totals.
// One row per (SnapshotType, Vcenter, SnapshotTime); the secondary index is
// created best-effort.
func EnsureVcenterAggregateTotalsTable(ctx context.Context, dbConn *sqlx.DB) error {
	const ddl = `
CREATE TABLE IF NOT EXISTS vcenter_aggregate_totals (
	"SnapshotType" TEXT NOT NULL,
	"Vcenter" TEXT NOT NULL,
	"SnapshotTime" BIGINT NOT NULL,
	"VmCount" BIGINT NOT NULL,
	"VcpuTotal" BIGINT NOT NULL,
	"RamTotalGB" BIGINT NOT NULL,
	PRIMARY KEY ("SnapshotType","Vcenter","SnapshotTime")
);`
	if _, err := execLog(ctx, dbConn, ddl); err != nil {
		return err
	}
	// Best-effort index; a failure here is deliberately ignored.
	_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vcenter_aggregate_totals_vc_type_time_idx ON vcenter_aggregate_totals ("Vcenter","SnapshotType","SnapshotTime" DESC)`)
	return nil
}
// UpsertVcenterAggregateTotal stores per-vCenter totals for a snapshot type/time.
// The type is normalized to lowercase; both it and the vcenter must be
// non-blank. On conflict the counters are overwritten unconditionally.
func UpsertVcenterAggregateTotal(ctx context.Context, dbConn *sqlx.DB, snapshotType, vcenter string, snapshotTime int64, vmCount, vcpuTotal, ramTotal int64) error {
	snapshotType = strings.ToLower(strings.TrimSpace(snapshotType))
	switch {
	case snapshotType == "":
		return fmt.Errorf("snapshot type is empty")
	case strings.TrimSpace(vcenter) == "":
		return fmt.Errorf("vcenter is empty")
	}
	if err := EnsureVcenterAggregateTotalsTable(ctx, dbConn); err != nil {
		return err
	}
	const upsert = `
	INSERT INTO vcenter_aggregate_totals ("SnapshotType","Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB")
	VALUES ($1,$2,$3,$4,$5,$6)
	ON CONFLICT ("SnapshotType","Vcenter","SnapshotTime") DO UPDATE SET
		"VmCount" = EXCLUDED."VmCount",
		"VcpuTotal" = EXCLUDED."VcpuTotal",
		"RamTotalGB" = EXCLUDED."RamTotalGB"
	`
	_, err := execLog(ctx, dbConn, upsert, snapshotType, vcenter, snapshotTime, vmCount, vcpuTotal, ramTotal)
	return err
}
// ReplaceVcenterAggregateTotalsFromSummary recomputes one snapshot's per-vCenter totals from a summary table.
// Existing cache rows for (snapshotType, snapshotTime) are deleted first,
// then one row per vCenter found in the summary table is upserted. Returns
// how many rows were written.
func ReplaceVcenterAggregateTotalsFromSummary(ctx context.Context, dbConn *sqlx.DB, summaryTable, snapshotType string, snapshotTime int64) (int, error) {
	if err := ValidateTableName(summaryTable); err != nil {
		return 0, err
	}
	snapshotType = strings.ToLower(strings.TrimSpace(snapshotType))
	if snapshotType == "" {
		return 0, fmt.Errorf("snapshot type is empty")
	}
	if err := EnsureVcenterAggregateTotalsTable(ctx, dbConn); err != nil {
		return 0, err
	}
	// Clear the slice of the cache we are about to rebuild.
	if _, err := execLog(ctx, dbConn, `
	DELETE FROM vcenter_aggregate_totals
	WHERE "SnapshotType" = $1 AND "SnapshotTime" = $2
	`, snapshotType, snapshotTime); err != nil {
		return 0, err
	}
	type totalsRow struct {
		Vcenter   string `db:"vcenter"`
		VmCount   int64  `db:"vm_count"`
		VcpuTotal int64  `db:"vcpu_total"`
		RamTotal  int64  `db:"ram_total"`
	}
	// Summary tables carry either Avg* columns (monthly) or plain columns
	// (daily); COALESCE picks whichever is present.
	aggQuery := fmt.Sprintf(`
	SELECT
		"Vcenter" AS vcenter,
		COUNT(1) AS vm_count,
		CAST(COALESCE(SUM(COALESCE("AvgVcpuCount","VcpuCount")),0) AS BIGINT) AS vcpu_total,
		CAST(COALESCE(SUM(COALESCE("AvgRamGB","RamGB")),0) AS BIGINT) AS ram_total
	FROM %s
	GROUP BY "Vcenter"
	`, summaryTable)
	var totals []totalsRow
	if err := selectLog(ctx, dbConn, &totals, aggQuery); err != nil {
		return 0, err
	}
	written := 0
	for _, row := range totals {
		if err := UpsertVcenterAggregateTotal(ctx, dbConn, snapshotType, row.Vcenter, snapshotTime, row.VmCount, row.VcpuTotal, row.RamTotal); err != nil {
			return written, err
		}
		written++
	}
	return written, nil
}
// SyncVcenterAggregateTotalsFromRegistry refreshes cached totals for summary snapshots listed in snapshot_registry.
// Only "daily" and "monthly" snapshot types are supported. It returns how
// many snapshots were refreshed and how many cache rows were upserted;
// per-snapshot failures are logged and rolled into one trailing error.
func SyncVcenterAggregateTotalsFromRegistry(ctx context.Context, dbConn *sqlx.DB, snapshotType string) (int, int, error) {
	snapshotType = strings.ToLower(strings.TrimSpace(snapshotType))
	if snapshotType == "" {
		return 0, 0, fmt.Errorf("snapshot type is empty")
	}
	if snapshotType != "daily" && snapshotType != "monthly" {
		return 0, 0, fmt.Errorf("unsupported snapshot type for summary cache sync: %s", snapshotType)
	}
	if err := EnsureVcenterAggregateTotalsTable(ctx, dbConn); err != nil {
		return 0, 0, err
	}
	registryQuery := dbConn.Rebind(`
	SELECT table_name, snapshot_time
	FROM snapshot_registry
	WHERE snapshot_type = ?
	ORDER BY snapshot_time
	`)
	var snapshots []struct {
		TableName    string `db:"table_name"`
		SnapshotTime int64  `db:"snapshot_time"`
	}
	if err := selectLog(ctx, dbConn, &snapshots, registryQuery, snapshotType); err != nil {
		return 0, 0, err
	}
	var refreshed, written, failed int
	for _, snap := range snapshots {
		if err := ValidateTableName(snap.TableName); err != nil {
			failed++
			slog.Warn("skipping invalid summary table in snapshot registry", "snapshot_type", snapshotType, "table", snap.TableName, "error", err)
			continue
		}
		n, err := ReplaceVcenterAggregateTotalsFromSummary(ctx, dbConn, snap.TableName, snapshotType, snap.SnapshotTime)
		if err != nil {
			failed++
			slog.Warn("failed to refresh vcenter aggregate cache from summary table", "snapshot_type", snapshotType, "table", snap.TableName, "snapshot_time", snap.SnapshotTime, "error", err)
			continue
		}
		refreshed++
		written += n
	}
	if failed > 0 {
		return refreshed, written, fmt.Errorf("vcenter aggregate cache sync finished with %d failed snapshot(s)", failed)
	}
	return refreshed, written, nil
}
// ListVcenterAggregateTotals lists cached totals by type.
// limit defaults to 200 when non-positive; rows come back newest first.
func ListVcenterAggregateTotals(ctx context.Context, dbConn *sqlx.DB, vcenter, snapshotType string, limit int) ([]VcenterTotalRow, error) {
	snapshotType = strings.ToLower(strings.TrimSpace(snapshotType))
	if err := EnsureVcenterAggregateTotalsTable(ctx, dbConn); err != nil {
		return nil, err
	}
	if limit <= 0 {
		limit = 200
	}
	const query = `
	SELECT "Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB"
	FROM vcenter_aggregate_totals
	WHERE "Vcenter" = $1 AND "SnapshotType" = $2
	ORDER BY "SnapshotTime" DESC
	LIMIT $3
	`
	out := make([]VcenterTotalRow, 0, limit)
	if err := selectLog(ctx, dbConn, &out, query, vcenter, snapshotType, limit); err != nil {
		return nil, err
	}
	return out, nil
}
// ListVcenterAggregateTotalsSince lists cached totals by type from a lower-bound timestamp.
// Rows come back newest first.
func ListVcenterAggregateTotalsSince(ctx context.Context, dbConn *sqlx.DB, vcenter, snapshotType string, since time.Time) ([]VcenterTotalRow, error) {
	snapshotType = strings.ToLower(strings.TrimSpace(snapshotType))
	if err := EnsureVcenterAggregateTotalsTable(ctx, dbConn); err != nil {
		return nil, err
	}
	const query = `
	SELECT "Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB"
	FROM vcenter_aggregate_totals
	WHERE "Vcenter" = $1 AND "SnapshotType" = $2 AND "SnapshotTime" >= $3
	ORDER BY "SnapshotTime" DESC
	`
	out := make([]VcenterTotalRow, 0, 256)
	if err := selectLog(ctx, dbConn, &out, query, vcenter, snapshotType, since.Unix()); err != nil {
		return nil, err
	}
	return out, nil
}
// SyncVcenterLatestTotalsFromHistory backfills latest totals from existing vcenter_totals history.
// For every vCenter it selects the newest history row and upserts it into the
// compact latest-totals table, returning the number of rows upserted.
func SyncVcenterLatestTotalsFromHistory(ctx context.Context, dbConn *sqlx.DB) (int, error) {
	if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
		return 0, err
	}
	if err := EnsureVcenterLatestTotalsTable(ctx, dbConn); err != nil {
		return 0, err
	}
	type latestRow struct {
		Vcenter      string `db:"Vcenter"`
		SnapshotTime int64  `db:"SnapshotTime"`
		VmCount      int64  `db:"VmCount"`
		VcpuTotal    int64  `db:"VcpuTotal"`
		RamTotalGB   int64  `db:"RamTotalGB"`
	}
	var newest []latestRow
	// Join each vCenter's MAX(SnapshotTime) back onto the history table so we
	// pick exactly the newest row per vCenter.
	if err := selectLog(ctx, dbConn, &newest, `
	SELECT t."Vcenter", t."SnapshotTime", t."VmCount", t."VcpuTotal", t."RamTotalGB"
	FROM vcenter_totals t
	JOIN (
		SELECT "Vcenter", MAX("SnapshotTime") AS max_snapshot_time
		FROM vcenter_totals
		GROUP BY "Vcenter"
	) latest
	ON latest."Vcenter" = t."Vcenter"
	AND latest.max_snapshot_time = t."SnapshotTime"
	`); err != nil {
		return 0, err
	}
	count := 0
	for _, row := range newest {
		if err := UpsertVcenterLatestTotals(ctx, dbConn, row.Vcenter, row.SnapshotTime, row.VmCount, row.VcpuTotal, row.RamTotalGB); err != nil {
			return count, err
		}
		count++
	}
	return count, nil
}
// InsertVcenterTotals records totals for a vcenter at a snapshot time.
// It appends to the history table, refreshes the latest-totals row, and then
// best-effort warms the hourly aggregate cache (a warm failure only logs).
func InsertVcenterTotals(ctx context.Context, dbConn *sqlx.DB, vcenter string, snapshotTime time.Time, vmCount, vcpuTotal, ramTotal int64) error {
	if strings.TrimSpace(vcenter) == "" {
		return fmt.Errorf("vcenter is empty")
	}
	if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
		return err
	}
	if err := EnsureVcenterLatestTotalsTable(ctx, dbConn); err != nil {
		return err
	}
	ts := snapshotTime.Unix()
	if _, err := execLog(ctx, dbConn, `
	INSERT INTO vcenter_totals ("Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB")
	VALUES ($1,$2,$3,$4,$5)
	`, vcenter, ts, vmCount, vcpuTotal, ramTotal); err != nil {
		return err
	}
	if err := UpsertVcenterLatestTotals(ctx, dbConn, vcenter, ts, vmCount, vcpuTotal, ramTotal); err != nil {
		return err
	}
	if err := UpsertVcenterAggregateTotal(ctx, dbConn, "hourly", vcenter, ts, vmCount, vcpuTotal, ramTotal); err != nil {
		slog.Warn("failed to upsert vcenter_aggregate_totals", "snapshot_type", "hourly", "vcenter", vcenter, "snapshot_time", ts, "error", err)
	}
	return nil
}
// ListVcenters returns distinct vcenter URLs tracked.
//
// Resolution order: (1) the compact vcenter_latest_totals table; (2) if that
// is empty, backfill it from vcenter_totals history and re-read; (3) a
// DISTINCT scan of vcenter_totals. Errors on the fast paths are deliberately
// swallowed so the final fallback can still answer.
func ListVcenters(ctx context.Context, dbConn *sqlx.DB) ([]string, error) {
	if err := EnsureVcenterLatestTotalsTable(ctx, dbConn); err == nil {
		rows, err := dbConn.QueryxContext(ctx, `SELECT "Vcenter" FROM vcenter_latest_totals ORDER BY "Vcenter"`)
		if err == nil {
			// NOTE(review): this defer runs at function return, so the cursor
			// stays open across the fallback queries below when the table is
			// empty — harmless for single results, but worth confirming.
			defer rows.Close()
			out := make([]string, 0, 32)
			for rows.Next() {
				var v string
				if err := rows.Scan(&v); err != nil {
					return nil, err
				}
				out = append(out, v)
			}
			if err := rows.Err(); err != nil {
				return nil, err
			}
			if len(out) > 0 {
				return out, nil
			}
			// Older installs may have vcenter_totals populated but no latest cache yet.
			if _, err := SyncVcenterLatestTotalsFromHistory(ctx, dbConn); err == nil {
				refreshed := make([]string, 0, 32)
				if err := selectLog(ctx, dbConn, &refreshed, `SELECT "Vcenter" FROM vcenter_latest_totals ORDER BY "Vcenter"`); err == nil && len(refreshed) > 0 {
					return refreshed, nil
				}
			}
		}
	}
	// Fallback for older DBs before vcenter_latest_totals gets populated.
	if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
		return nil, err
	}
	rows, err := dbConn.QueryxContext(ctx, `SELECT DISTINCT "Vcenter" FROM vcenter_totals ORDER BY "Vcenter"`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var out []string
	for rows.Next() {
		var v string
		if err := rows.Scan(&v); err != nil {
			return nil, err
		}
		out = append(out, v)
	}
	return out, rows.Err()
}
// VcenterTotalRow holds per-snapshot totals for a vcenter.
// It is populated from vcenter_totals, vcenter_latest_totals and
// vcenter_aggregate_totals, whose column names match the db tags below.
type VcenterTotalRow struct {
	SnapshotTime int64  `db:"SnapshotTime"` // epoch seconds of the snapshot (stored as BIGINT)
	Vcenter      string `db:"Vcenter"`      // vCenter URL/identifier
	VmCount      int64  `db:"VmCount"`      // number of VMs counted in the snapshot
	VcpuTotal    int64  `db:"VcpuTotal"`    // summed vCPU allocation
	RamTotalGB   int64  `db:"RamTotalGB"`   // summed RAM allocation in GB
}
// ListVcenterTotals lists totals for a vcenter sorted by snapshot_time desc, limited.
// limit defaults to 200 when non-positive.
func ListVcenterTotals(ctx context.Context, dbConn *sqlx.DB, vcenter string, limit int) ([]VcenterTotalRow, error) {
	if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
		return nil, err
	}
	if limit <= 0 {
		limit = 200
	}
	const query = `
	SELECT "Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB"
	FROM vcenter_totals
	WHERE "Vcenter" = $1
	ORDER BY "SnapshotTime" DESC
	LIMIT $2`
	out := make([]VcenterTotalRow, 0, limit)
	if err := selectLog(ctx, dbConn, &out, query, vcenter, limit); err != nil {
		return nil, err
	}
	return out, nil
}
// ListVcenterHourlyTotalsSince returns hourly totals for a vCenter from a minimum snapshot time.
// The aggregate cache is consulted first; on a miss the rows come from the
// vcenter_totals history and are written back to warm the cache (best
// effort — the first warm failure stops warming but not the listing).
func ListVcenterHourlyTotalsSince(ctx context.Context, dbConn *sqlx.DB, vcenter string, since time.Time) ([]VcenterTotalRow, error) {
	if cached, err := ListVcenterAggregateTotalsSince(ctx, dbConn, vcenter, "hourly", since); err == nil && len(cached) > 0 {
		return cached, nil
	} else if err != nil {
		slog.Warn("failed to read hourly totals cache", "vcenter", vcenter, "since", since.Unix(), "error", err)
	}
	if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
		return nil, err
	}
	const query = `
	SELECT "Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB"
	FROM vcenter_totals
	WHERE "Vcenter" = $1
	AND "SnapshotTime" >= $2
	ORDER BY "SnapshotTime" DESC`
	out := make([]VcenterTotalRow, 0, 256)
	if err := selectLog(ctx, dbConn, &out, query, vcenter, since.Unix()); err != nil {
		return nil, err
	}
	for _, row := range out {
		if err := UpsertVcenterAggregateTotal(ctx, dbConn, "hourly", row.Vcenter, row.SnapshotTime, row.VmCount, row.VcpuTotal, row.RamTotalGB); err != nil {
			slog.Warn("failed to warm hourly totals cache", "vcenter", row.Vcenter, "snapshot_time", row.SnapshotTime, "error", err)
			break
		}
	}
	return out, nil
}
// ListVcenterTotalsByType returns totals for a vcenter for the requested snapshot type (hourly, daily, monthly).
// Prefer vcenter_aggregate_totals cache and fallback to source tables when cache is empty.
//
// The snapshot type defaults to "hourly" when blank. The snapshot_registry
// query now uses dbConn.Rebind (like the rest of this file) instead of a
// hand-rolled "$N" -> "?" rewrite that only handled the sqlite driver.
func ListVcenterTotalsByType(ctx context.Context, dbConn *sqlx.DB, vcenter string, snapshotType string, limit int) ([]VcenterTotalRow, error) {
	snapshotType = strings.ToLower(strings.TrimSpace(snapshotType))
	if snapshotType == "" {
		snapshotType = "hourly"
	}
	// Fast path: serve from the aggregate cache when it has data.
	cachedRows, cacheErr := ListVcenterAggregateTotals(ctx, dbConn, vcenter, snapshotType, limit)
	if cacheErr == nil && len(cachedRows) > 0 {
		return cachedRows, nil
	}
	if cacheErr != nil {
		slog.Warn("failed to read vcenter aggregate totals cache", "snapshot_type", snapshotType, "vcenter", vcenter, "error", cacheErr)
	}
	if snapshotType == "hourly" {
		rows, err := ListVcenterTotals(ctx, dbConn, vcenter, limit)
		if err != nil {
			return nil, err
		}
		// Warm the cache best-effort; stop at the first failure.
		for _, row := range rows {
			if err := UpsertVcenterAggregateTotal(ctx, dbConn, "hourly", row.Vcenter, row.SnapshotTime, row.VmCount, row.VcpuTotal, row.RamTotalGB); err != nil {
				slog.Warn("failed to warm hourly totals cache", "vcenter", row.Vcenter, "snapshot_time", row.SnapshotTime, "error", err)
				break
			}
		}
		return rows, nil
	}
	if limit <= 0 {
		limit = 200
	}
	// Daily/monthly totals are recomputed from the per-snapshot summary
	// tables recorded in snapshot_registry.
	query := dbConn.Rebind(`
	SELECT table_name, snapshot_time
	FROM snapshot_registry
	WHERE snapshot_type = ?
	ORDER BY snapshot_time DESC
	LIMIT ?
	`)
	var regRows []struct {
		TableName    string `db:"table_name"`
		SnapshotTime int64  `db:"snapshot_time"`
	}
	if err := selectLog(ctx, dbConn, &regRows, query, snapshotType, limit); err != nil {
		return nil, err
	}
	out := make([]VcenterTotalRow, 0, len(regRows))
	for _, r := range regRows {
		// Skip suspicious or broken registry entries rather than failing
		// the whole listing.
		if err := ValidateTableName(r.TableName); err != nil {
			continue
		}
		agg, err := aggregateSummaryTotals(ctx, dbConn, r.TableName, vcenter)
		if err != nil {
			continue
		}
		out = append(out, VcenterTotalRow{
			SnapshotTime: r.SnapshotTime,
			Vcenter:      vcenter,
			VmCount:      agg.VmCount,
			VcpuTotal:    agg.VcpuTotal,
			RamTotalGB:   agg.RamTotalGB,
		})
	}
	// Warm the cache best-effort; stop at the first failure.
	for _, row := range out {
		if err := UpsertVcenterAggregateTotal(ctx, dbConn, snapshotType, row.Vcenter, row.SnapshotTime, row.VmCount, row.VcpuTotal, row.RamTotalGB); err != nil {
			slog.Warn("failed to warm vcenter aggregate totals cache", "snapshot_type", snapshotType, "vcenter", row.Vcenter, "snapshot_time", row.SnapshotTime, "error", err)
			break
		}
	}
	return out, nil
}
// summaryAgg carries the aggregate totals computed from one daily/monthly
// summary table for a single vCenter (see aggregateSummaryTotals).
type summaryAgg struct {
	VmCount    int64 `db:"vm_count"`   // rows matching the vCenter
	VcpuTotal  int64 `db:"vcpu_total"` // summed vCPU allocation
	RamTotalGB int64 `db:"ram_total"`  // summed RAM allocation in GB
}
// aggregateSummaryTotals computes totals for a single summary table (daily/monthly) for a given vcenter.
//
// The table name is validated via SafeTableName before being interpolated
// into the query. Placeholder binding now goes through dbConn.Rebind
// (consistent with the rest of this file) instead of a manual "$1" -> "?"
// rewrite that only covered the sqlite driver.
func aggregateSummaryTotals(ctx context.Context, dbConn *sqlx.DB, tableName string, vcenter string) (summaryAgg, error) {
	if _, err := SafeTableName(tableName); err != nil {
		return summaryAgg{}, err
	}
	// COALESCE handles both schema variants: Avg* columns (monthly rollups)
	// and plain columns (daily snapshots).
	query := dbConn.Rebind(fmt.Sprintf(`
	SELECT
		COUNT(1) AS vm_count,
		COALESCE(SUM(COALESCE("AvgVcpuCount","VcpuCount")),0) AS vcpu_total,
		COALESCE(SUM(COALESCE("AvgRamGB","RamGB")),0) AS ram_total
	FROM %s
	WHERE "Vcenter" = ?
	`, tableName))
	var agg summaryAgg
	if err := getLog(ctx, dbConn, &agg, query, vcenter); err != nil {
		return summaryAgg{}, err
	}
	return agg, nil
}
// VmTraceRow holds snapshot data for a single VM across tables.
// One row corresponds to one hourly observation of the VM.
type VmTraceRow struct {
	SnapshotTime    int64          `db:"SnapshotTime"` // epoch seconds of the hourly sample
	Name            string         `db:"Name"`
	Vcenter         string         `db:"Vcenter"`
	VmId            string         `db:"VmId"`
	VmUuid          string         `db:"VmUuid"`
	ResourcePool    string         `db:"ResourcePool"`
	VcpuCount       int64          `db:"VcpuCount"`
	RamGB           int64          `db:"RamGB"`
	ProvisionedDisk float64        `db:"ProvisionedDisk"`
	// Creation/deletion are nullable: NULL when the source table has no
	// lifecycle information for the sample.
	CreationTime sql.NullInt64 `db:"CreationTime"`
	DeletionTime sql.NullInt64 `db:"DeletionTime"`
}
// VmLifecycle captures observed lifecycle times from hourly snapshots.
// All timestamps are epoch seconds; presumably 0 means "not observed" —
// confirm against the lifecycle resolution code.
type VmLifecycle struct {
	CreationTime   int64 // reported or inferred creation time
	CreationApprox bool  // true when CreationTime is approximate rather than authoritative
	FirstSeen      int64 // first hourly sample containing the VM
	LastSeen       int64 // last hourly sample containing the VM
	DeletionTime   int64 // deletion time, when one has been observed
}
// VmLifecycleSourceDiagnostics captures how one lifecycle source contributed to the final lifecycle decision.
// A "source" is one of the lookup backends (see VmLifecycleDiagnostics:
// hourly cache, lifecycle cache, snapshot fallback).
type VmLifecycleSourceDiagnostics struct {
	Source               string // human-readable identifier of the source
	Used                 bool   // whether this source contributed to the final result
	Error                string // error text when the source failed; empty on success
	MatchedRows          int64  // rows the source matched for the VM
	FirstSeen            int64  // earliest observation reported by this source
	LastSeen             int64  // latest observation reported by this source
	CreationTime         int64  // creation time reported by this source
	CreationApprox       bool   // true when that creation time is approximate
	DeletionRows         int64  // rows carrying a deletion time
	DeletionMin          int64  // smallest deletion time seen
	DeletionMax          int64  // largest deletion time seen
	SelectedDeletionTime int64  // deletion time this source proposed
	StaleDeletionIgnored bool   // true when a stale deletion value was discarded
}
// VmLifecycleDiagnostics captures per-source lifecycle diagnostics for a VM lookup.
// LookupField/LookupValue record which identifier drove the lookup
// (see vmLookupDescriptor: "vm_id", "vm_uuid" or "name").
type VmLifecycleDiagnostics struct {
	LookupField      string // identifier kind used for the lookup
	LookupValue      string // identifier value used for the lookup
	HourlyCache      VmLifecycleSourceDiagnostics // vm_hourly_stats source
	LifecycleCache   VmLifecycleSourceDiagnostics // dedicated lifecycle cache source
	SnapshotFallback VmLifecycleSourceDiagnostics // per-snapshot-table fallback source
	FinalLifecycle   VmLifecycle // the lifecycle actually returned to the caller
}
// vmLookupPredicate builds a WHERE fragment (with '?' placeholders) plus its
// bind arguments for locating a VM. Identifiers are tried in priority order:
// VmId first, then VmUuid, then a case-insensitive Name match. The boolean is
// false when none of the (trimmed) identifiers is non-empty.
func vmLookupPredicate(vmID, vmUUID, name string) (string, []interface{}, bool) {
	if id := strings.TrimSpace(vmID); id != "" {
		return `"VmId" = ?`, []interface{}{id}, true
	}
	if uuid := strings.TrimSpace(vmUUID); uuid != "" {
		return `"VmUuid" = ?`, []interface{}{uuid}, true
	}
	if vmName := strings.TrimSpace(name); vmName != "" {
		return `lower("Name") = ?`, []interface{}{strings.ToLower(vmName)}, true
	}
	return "", nil, false
}
// vmLookupDescriptor reports which identifier would drive a VM lookup (same
// priority order as vmLookupPredicate) as a (field, value) pair suitable for
// logging/diagnostics. Both results are empty when no identifier was supplied.
// Unlike vmLookupPredicate, the name value is returned without lowercasing.
func vmLookupDescriptor(vmID, vmUUID, name string) (string, string) {
	candidates := []struct {
		field string
		value string
	}{
		{"vm_id", strings.TrimSpace(vmID)},
		{"vm_uuid", strings.TrimSpace(vmUUID)},
		{"name", strings.TrimSpace(name)},
	}
	for _, c := range candidates {
		if c.value != "" {
			return c.field, c.value
		}
	}
	return "", ""
}
// FetchVmTrace returns combined hourly snapshot records for a VM (by id/uuid/name) ordered by snapshot time.
// It prefers the shared vm_hourly_stats history table and falls back to per-snapshot tables.
func FetchVmTrace(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) ([]VmTraceRow, error) {
	if !TableExists(ctx, dbConn, "vm_hourly_stats") {
		return fetchVmTraceFromSnapshotTables(ctx, dbConn, vmID, vmUUID, name)
	}
	cached, err := fetchVmTraceFromHourlyCache(ctx, dbConn, vmID, vmUUID, name)
	switch {
	case err != nil:
		// Cache read failed; the per-table scan below still works.
		slog.Warn("vm trace cache query failed; falling back to hourly tables", "error", err)
	case len(cached) > 0:
		slog.Debug("vm trace loaded from hourly cache", "row_count", len(cached))
		return cached, nil
	}
	// Empty cache result also falls through to the snapshot-table scan.
	return fetchVmTraceFromSnapshotTables(ctx, dbConn, vmID, vmUUID, name)
}
// FetchVmTraceDaily returns one row per day for a VM, preferring vm_daily_rollup cache.
func FetchVmTraceDaily(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) ([]VmTraceRow, error) {
	if !TableExists(ctx, dbConn, "vm_daily_rollup") {
		return fetchVmTraceDailyFromSummaryTables(ctx, dbConn, vmID, vmUUID, name)
	}
	// Best-effort: a failed index ensure only slows the query down.
	if err := EnsureVmDailyRollup(ctx, dbConn); err != nil {
		slog.Warn("failed to ensure vm_daily_rollup indexes", "error", err)
	}
	cached, err := fetchVmTraceDailyFromRollup(ctx, dbConn, vmID, vmUUID, name)
	switch {
	case err != nil:
		slog.Warn("vm daily trace cache query failed; falling back to daily summary tables", "error", err)
	case len(cached) > 0:
		slog.Debug("vm daily trace loaded from daily rollup cache", "row_count", len(cached))
		return cached, nil
	}
	// Empty rollup result also falls through to the summary-table scan.
	return fetchVmTraceDailyFromSummaryTables(ctx, dbConn, vmID, vmUUID, name)
}
// fetchVmTraceDailyFromRollup reads a VM's daily trace from the vm_daily_rollup
// cache table, one row per "Date". When samples were recorded for a day
// (SamplesPresent > 0), VcpuCount/RamGB are the rounded per-day averages
// (SumVcpu/SumRam divided by SamplesPresent); otherwise the last-known values
// are used. ProvisionedDisk is always the last-known value. Returns (nil, nil)
// when no lookup identifier was supplied.
func fetchVmTraceDailyFromRollup(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) ([]VmTraceRow, error) {
	matchWhere, args, ok := vmLookupPredicate(vmID, vmUUID, name)
	if !ok {
		return nil, nil
	}
	query := fmt.Sprintf(`
		SELECT "Date" AS "SnapshotTime",
			COALESCE("Name",'') AS "Name",
			COALESCE("Vcenter",'') AS "Vcenter",
			COALESCE("VmId",'') AS "VmId",
			COALESCE("VmUuid",'') AS "VmUuid",
			COALESCE("LastResourcePool",'') AS "ResourcePool",
			CAST(CASE
				WHEN COALESCE("SamplesPresent",0) > 0 THEN ROUND(1.0 * COALESCE("SumVcpu",0) / "SamplesPresent")
				ELSE COALESCE("LastVcpuCount",0)
			END AS BIGINT) AS "VcpuCount",
			CAST(CASE
				WHEN COALESCE("SamplesPresent",0) > 0 THEN ROUND(1.0 * COALESCE("SumRam",0) / "SamplesPresent")
				ELSE COALESCE("LastRamGB",0)
			END AS BIGINT) AS "RamGB",
			COALESCE("LastProvisionedDisk",0) AS "ProvisionedDisk",
			COALESCE("CreationTime",0) AS "CreationTime",
			COALESCE("DeletionTime",0) AS "DeletionTime"
		FROM vm_daily_rollup
		WHERE %s
		ORDER BY "Date"
	`, matchWhere)
	// Convert '?' placeholders to the driver's native style (e.g. $1 on postgres).
	query = dbConn.Rebind(query)
	rows := make([]VmTraceRow, 0, 128)
	if err := selectLog(ctx, dbConn, &rows, query, args...); err != nil {
		return nil, err
	}
	return rows, nil
}
// fetchVmTraceDailyFromSummaryTables builds a daily trace by querying every
// registered daily summary table (snapshot_registry, snapshot_type='daily')
// in turn, one row per table in which the VM appears. Each summary table's
// registered snapshot_time is stamped onto its rows as SnapshotTime. vCPU/RAM
// prefer the time-weighted average columns and fall back to the raw values.
// Tables with invalid names or failing queries are skipped (best-effort across
// tables whose schemas may differ — presumably older summary formats; verify).
// Returns (nil, nil) when no identifier was supplied or no registry exists.
func fetchVmTraceDailyFromSummaryTables(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) ([]VmTraceRow, error) {
	matchWhere, args, ok := vmLookupPredicate(vmID, vmUUID, name)
	if !ok {
		return nil, nil
	}
	if !TableExists(ctx, dbConn, "snapshot_registry") {
		return nil, nil
	}
	var tables []struct {
		TableName    string `db:"table_name"`
		SnapshotTime int64  `db:"snapshot_time"`
	}
	if err := selectLog(ctx, dbConn, &tables, `
		SELECT table_name, snapshot_time
		FROM snapshot_registry
		WHERE snapshot_type = 'daily'
		ORDER BY snapshot_time
	`); err != nil {
		return nil, err
	}
	if len(tables) == 0 {
		return nil, nil
	}
	rows := make([]VmTraceRow, 0, len(tables))
	for _, t := range tables {
		// Guard against malformed registry entries before interpolating the name.
		if err := ValidateTableName(t.TableName); err != nil {
			continue
		}
		// t.SnapshotTime is an int64 interpolated as a literal; matchWhere uses bound args.
		query := fmt.Sprintf(`
			SELECT %d AS "SnapshotTime",
				COALESCE("Name",'') AS "Name",
				COALESCE("Vcenter",'') AS "Vcenter",
				COALESCE("VmId",'') AS "VmId",
				COALESCE("VmUuid",'') AS "VmUuid",
				COALESCE("ResourcePool",'') AS "ResourcePool",
				CAST(COALESCE("AvgVcpuCount","VcpuCount",0) AS BIGINT) AS "VcpuCount",
				CAST(COALESCE("AvgRamGB","RamGB",0) AS BIGINT) AS "RamGB",
				COALESCE("AvgProvisionedDisk","ProvisionedDisk",0) AS "ProvisionedDisk",
				COALESCE("CreationTime",0) AS "CreationTime",
				COALESCE("DeletionTime",0) AS "DeletionTime"
			FROM %s
			WHERE %s
		`, t.SnapshotTime, t.TableName, matchWhere)
		query = dbConn.Rebind(query)
		var tmp []VmTraceRow
		if err := selectLog(ctx, dbConn, &tmp, query, args...); err != nil {
			continue
		}
		rows = append(rows, tmp...)
	}
	// Registry order is per-query; re-sort the combined result chronologically.
	sort.Slice(rows, func(i, j int) bool {
		return rows[i].SnapshotTime < rows[j].SnapshotTime
	})
	return rows, nil
}
// fetchVmTraceFromHourlyCache reads the full hourly trace for a VM from the
// shared vm_hourly_stats table, ordered by SnapshotTime. NULL lifecycle
// columns are coalesced to 0 so VmTraceRow callers can compare against zero.
// Returns (nil, nil) when no lookup identifier was supplied.
func fetchVmTraceFromHourlyCache(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) ([]VmTraceRow, error) {
	matchWhere, args, ok := vmLookupPredicate(vmID, vmUUID, name)
	if !ok {
		return nil, nil
	}
	query := fmt.Sprintf(`
		SELECT "SnapshotTime","Name","Vcenter","VmId","VmUuid","ResourcePool","VcpuCount","RamGB","ProvisionedDisk",
			COALESCE("CreationTime",0) AS "CreationTime",
			COALESCE("DeletionTime",0) AS "DeletionTime"
		FROM vm_hourly_stats
		WHERE %s
		ORDER BY "SnapshotTime"
	`, matchWhere)
	// Convert '?' placeholders to the driver's native style.
	query = dbConn.Rebind(query)
	var rows []VmTraceRow
	if err := selectLog(ctx, dbConn, &rows, query, args...); err != nil {
		return nil, err
	}
	return rows, nil
}
// fetchVmTraceFromSnapshotTables builds an hourly trace by querying every
// registered hourly snapshot table (snapshot_registry, snapshot_type='hourly').
// Each table's registered snapshot_time is stamped onto its rows. Tables with
// invalid names or failing queries are logged and skipped, so the trace is
// best-effort across partially-broken history. Returns (nil, nil) when no
// lookup identifier was supplied.
func fetchVmTraceFromSnapshotTables(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) ([]VmTraceRow, error) {
	matchWhere, args, ok := vmLookupPredicate(vmID, vmUUID, name)
	if !ok {
		return nil, nil
	}
	var tables []struct {
		TableName    string `db:"table_name"`
		SnapshotTime int64  `db:"snapshot_time"`
	}
	if err := selectLog(ctx, dbConn, &tables, `
		SELECT table_name, snapshot_time
		FROM snapshot_registry
		WHERE snapshot_type = 'hourly'
		ORDER BY snapshot_time
	`); err != nil {
		return nil, err
	}
	if len(tables) == 0 {
		return nil, nil
	}
	rows := make([]VmTraceRow, 0, len(tables))
	slog.Debug("vm trace scanning tables", "table_count", len(tables), "vm_id", vmID, "vm_uuid", vmUUID, "name", name)
	for _, t := range tables {
		// Guard against malformed registry entries before interpolating the name.
		if err := ValidateTableName(t.TableName); err != nil {
			slog.Warn("vm trace skipping table (invalid name)", "table", t.TableName, "error", err)
			continue
		}
		// t.SnapshotTime is an int64 interpolated as a literal; matchWhere uses bound args.
		query := fmt.Sprintf(`
			SELECT %d AS "SnapshotTime",
				"Name","Vcenter","VmId","VmUuid","ResourcePool","VcpuCount","RamGB","ProvisionedDisk",
				COALESCE("CreationTime",0) AS "CreationTime",
				COALESCE("DeletionTime",0) AS "DeletionTime"
			FROM %s
			WHERE %s
		`, t.SnapshotTime, t.TableName, matchWhere)
		query = dbConn.Rebind(query)
		var tmp []VmTraceRow
		if err := selectLog(ctx, dbConn, &tmp, query, args...); err != nil {
			slog.Warn("vm trace query failed for table", "table", t.TableName, "error", err)
			continue
		}
		slog.Debug("vm trace table rows", "table", t.TableName, "snapshot_time", t.SnapshotTime, "rows", len(tmp))
		rows = append(rows, tmp...)
	}
	// Re-sort the combined rows chronologically across all tables.
	sort.Slice(rows, func(i, j int) bool {
		return rows[i].SnapshotTime < rows[j].SnapshotTime
	})
	slog.Info("vm trace combined rows", "total_rows", len(rows))
	return rows, nil
}
// FetchVmLifecycle walks VM history data to determine lifecycle bounds for a VM.
// It prefers vm_hourly_stats + vm_lifecycle_cache and falls back to per-snapshot table probes.
// It is a thin wrapper over FetchVmLifecycleWithDiagnostics that drops the diagnostics.
func FetchVmLifecycle(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) (VmLifecycle, error) {
	lc, _, err := FetchVmLifecycleWithDiagnostics(ctx, dbConn, vmID, vmUUID, name)
	return lc, err
}
// FetchVmLifecycleWithDiagnostics returns lifecycle details plus source-level diagnostics.
//
// Resolution order:
//  1. vm_hourly_stats aggregation; when it finds the VM, the result may be
//     widened by merging vm_lifecycle_cache data, and the fallback is skipped.
//  2. Otherwise (table missing, query error, or no rows), a per-snapshot-table
//     walk via snapshot_registry.
//
// Diagnostics always carry an entry per source; Error is set on failure and
// Used marks the sources that fed the final result.
func FetchVmLifecycleWithDiagnostics(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) (VmLifecycle, VmLifecycleDiagnostics, error) {
	lookupField, lookupValue := vmLookupDescriptor(vmID, vmUUID, name)
	diagnostics := VmLifecycleDiagnostics{
		LookupField: lookupField,
		LookupValue: lookupValue,
		HourlyCache: VmLifecycleSourceDiagnostics{
			Source: "vm_hourly_stats",
		},
		LifecycleCache: VmLifecycleSourceDiagnostics{
			Source: "vm_lifecycle_cache",
		},
		SnapshotFallback: VmLifecycleSourceDiagnostics{
			Source: "snapshot_registry/hourly_tables",
		},
	}
	if TableExists(ctx, dbConn, "vm_hourly_stats") {
		lifecycle, sourceDiag, found, err := fetchVmLifecycleFromHourlyCache(ctx, dbConn, vmID, vmUUID, name)
		diagnostics.HourlyCache = sourceDiag
		if err != nil {
			// Record the failure but continue to the snapshot-table fallback below.
			diagnostics.HourlyCache.Error = err.Error()
			slog.Warn("vm lifecycle cache query failed; falling back to hourly tables", "error", err)
		} else if found {
			diagnostics.HourlyCache.Used = true
			// Optionally widen the hourly result with the lifecycle cache;
			// failures here are non-fatal since the hourly data already matched.
			if TableExists(ctx, dbConn, "vm_lifecycle_cache") {
				cached, cachedDiag, cachedFound, cacheErr := fetchVmLifecycleFromLifecycleCache(ctx, dbConn, vmID, vmUUID, name)
				diagnostics.LifecycleCache = cachedDiag
				if cacheErr != nil {
					diagnostics.LifecycleCache.Error = cacheErr.Error()
					slog.Warn("vm lifecycle cache lookup failed", "error", cacheErr)
				} else if cachedFound {
					diagnostics.LifecycleCache.Used = true
					lifecycle = mergeVmLifecycle(lifecycle, cached)
				}
			}
			diagnostics.FinalLifecycle = lifecycle
			return lifecycle, diagnostics, nil
		}
	}
	// Fallback: probe every registered hourly snapshot table.
	lifecycle, fallbackDiag, err := fetchVmLifecycleFromSnapshotTables(ctx, dbConn, vmID, vmUUID, name)
	diagnostics.SnapshotFallback = fallbackDiag
	if err != nil {
		diagnostics.SnapshotFallback.Error = err.Error()
		return lifecycle, diagnostics, err
	}
	diagnostics.SnapshotFallback.Used = true
	diagnostics.FinalLifecycle = lifecycle
	return lifecycle, diagnostics, nil
}
// fetchVmLifecycleFromHourlyCache aggregates vm_hourly_stats for one VM into
// lifecycle bounds. FirstSeen/LastSeen are the min/max SnapshotTime;
// CreationTime prefers the minimum recorded non-zero "CreationTime" and falls
// back to FirstSeen (marked approximate). A recorded deletion is only trusted
// when it is later than LastSeen — a deletion at or before LastSeen means the
// VM was observed again afterwards, so it is logged and ignored as stale.
// The bool result is false when no identifier was supplied or no rows matched.
func fetchVmLifecycleFromHourlyCache(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) (VmLifecycle, VmLifecycleSourceDiagnostics, bool, error) {
	diag := VmLifecycleSourceDiagnostics{Source: "vm_hourly_stats"}
	matchWhere, args, ok := vmLookupPredicate(vmID, vmUUID, name)
	if !ok {
		return VmLifecycle{}, diag, false, nil
	}
	var row struct {
		Rows         int64         `db:"rows"`
		Creation     sql.NullInt64 `db:"creation_time"`
		FirstSeen    sql.NullInt64 `db:"first_seen"`
		LastSeen     sql.NullInt64 `db:"last_seen"`
		DeletionRows int64         `db:"deletion_rows"`
		DeletionMin  sql.NullInt64 `db:"deletion_min"`
		DeletionMax  sql.NullInt64 `db:"deletion_max"`
	}
	// NULLIF(...,0) makes zero sentinels behave as NULL so MIN/MAX skip them.
	query := `
		SELECT
			COUNT(1) AS rows,
			MIN(NULLIF("CreationTime",0)) AS creation_time,
			MIN("SnapshotTime") AS first_seen,
			MAX("SnapshotTime") AS last_seen,
			SUM(CASE WHEN COALESCE("DeletionTime",0) > 0 THEN 1 ELSE 0 END) AS deletion_rows,
			MIN(NULLIF("DeletionTime",0)) AS deletion_min,
			MAX(NULLIF("DeletionTime",0)) AS deletion_max
		FROM vm_hourly_stats
		WHERE ` + matchWhere + `
	`
	query = dbConn.Rebind(query)
	if err := getLog(ctx, dbConn, &row, query, args...); err != nil {
		diag.Error = err.Error()
		return VmLifecycle{}, diag, false, err
	}
	diag.MatchedRows = row.Rows
	diag.FirstSeen = row.FirstSeen.Int64
	diag.LastSeen = row.LastSeen.Int64
	diag.DeletionRows = row.DeletionRows
	if row.DeletionMin.Valid {
		diag.DeletionMin = row.DeletionMin.Int64
	}
	if row.DeletionMax.Valid {
		diag.DeletionMax = row.DeletionMax.Int64
	}
	if row.Rows == 0 {
		return VmLifecycle{}, diag, false, nil
	}
	lifecycle := VmLifecycle{
		FirstSeen: row.FirstSeen.Int64,
		LastSeen:  row.LastSeen.Int64,
	}
	// Prefer a recorded creation time; otherwise approximate with first sighting.
	if row.Creation.Valid && row.Creation.Int64 > 0 {
		lifecycle.CreationTime = row.Creation.Int64
		lifecycle.CreationApprox = false
	} else if row.FirstSeen.Valid && row.FirstSeen.Int64 > 0 {
		lifecycle.CreationTime = row.FirstSeen.Int64
		lifecycle.CreationApprox = true
	}
	diag.CreationTime = lifecycle.CreationTime
	diag.CreationApprox = lifecycle.CreationApprox
	if row.DeletionMax.Valid && row.DeletionMax.Int64 > 0 {
		if row.LastSeen.Valid && row.LastSeen.Int64 > 0 && row.DeletionMax.Int64 <= row.LastSeen.Int64 {
			// VM was seen at/after the recorded deletion: the deletion is stale.
			diag.StaleDeletionIgnored = true
			slog.Warn("ignoring stale VM deletion from hourly cache",
				"vm_id", vmID,
				"vm_uuid", vmUUID,
				"name", name,
				"deletion_rows", row.DeletionRows,
				"deletion_min", row.DeletionMin.Int64,
				"deletion_max", row.DeletionMax.Int64,
				"last_seen", row.LastSeen.Int64,
			)
		} else {
			lifecycle.DeletionTime = row.DeletionMax.Int64
		}
	}
	diag.SelectedDeletionTime = lifecycle.DeletionTime
	return lifecycle, diag, true, nil
}
// fetchVmLifecycleFromLifecycleCache aggregates vm_lifecycle_cache rows for one
// VM. Unlike the hourly cache, this table has no recorded creation column, so
// CreationTime is always approximated from FirstSeen. The same stale-deletion
// rule applies: a DeletedAt at or before LastSeen is logged and ignored.
// The bool result is false when no identifier was supplied or no rows matched.
func fetchVmLifecycleFromLifecycleCache(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) (VmLifecycle, VmLifecycleSourceDiagnostics, bool, error) {
	diag := VmLifecycleSourceDiagnostics{Source: "vm_lifecycle_cache"}
	matchWhere, args, ok := vmLookupPredicate(vmID, vmUUID, name)
	if !ok {
		return VmLifecycle{}, diag, false, nil
	}
	var row struct {
		Rows         int64         `db:"rows"`
		FirstSeen    sql.NullInt64 `db:"first_seen"`
		LastSeen     sql.NullInt64 `db:"last_seen"`
		DeletionRows int64         `db:"deletion_rows"`
		DeletionMin  sql.NullInt64 `db:"deletion_min"`
		DeletionMax  sql.NullInt64 `db:"deletion_max"`
	}
	// NULLIF(...,0) makes zero sentinels behave as NULL so MIN/MAX skip them.
	query := `
		SELECT
			COUNT(1) AS rows,
			MIN(NULLIF("FirstSeen",0)) AS first_seen,
			MAX(NULLIF("LastSeen",0)) AS last_seen,
			SUM(CASE WHEN COALESCE("DeletedAt",0) > 0 THEN 1 ELSE 0 END) AS deletion_rows,
			MIN(NULLIF("DeletedAt",0)) AS deletion_min,
			MAX(NULLIF("DeletedAt",0)) AS deletion_max
		FROM vm_lifecycle_cache
		WHERE ` + matchWhere + `
	`
	query = dbConn.Rebind(query)
	if err := getLog(ctx, dbConn, &row, query, args...); err != nil {
		diag.Error = err.Error()
		return VmLifecycle{}, diag, false, err
	}
	diag.MatchedRows = row.Rows
	diag.FirstSeen = row.FirstSeen.Int64
	diag.LastSeen = row.LastSeen.Int64
	diag.DeletionRows = row.DeletionRows
	if row.DeletionMin.Valid {
		diag.DeletionMin = row.DeletionMin.Int64
	}
	if row.DeletionMax.Valid {
		diag.DeletionMax = row.DeletionMax.Int64
	}
	if row.Rows == 0 {
		return VmLifecycle{}, diag, false, nil
	}
	lifecycle := VmLifecycle{
		FirstSeen: row.FirstSeen.Int64,
		LastSeen:  row.LastSeen.Int64,
	}
	// No recorded creation column here; approximate from the first sighting.
	if row.FirstSeen.Valid && row.FirstSeen.Int64 > 0 {
		lifecycle.CreationTime = row.FirstSeen.Int64
		lifecycle.CreationApprox = true
	}
	diag.CreationTime = lifecycle.CreationTime
	diag.CreationApprox = lifecycle.CreationApprox
	if row.DeletionMax.Valid && row.DeletionMax.Int64 > 0 {
		if row.LastSeen.Valid && row.LastSeen.Int64 > 0 && row.DeletionMax.Int64 <= row.LastSeen.Int64 {
			// VM was seen at/after the recorded deletion: the deletion is stale.
			diag.StaleDeletionIgnored = true
			slog.Warn("ignoring stale VM deletion from lifecycle cache",
				"vm_id", vmID,
				"vm_uuid", vmUUID,
				"name", name,
				"deletion_rows", row.DeletionRows,
				"deletion_min", row.DeletionMin.Int64,
				"deletion_max", row.DeletionMax.Int64,
				"last_seen", row.LastSeen.Int64,
			)
		} else {
			lifecycle.DeletionTime = row.DeletionMax.Int64
		}
	}
	diag.SelectedDeletionTime = lifecycle.DeletionTime
	return lifecycle, diag, true, nil
}
// mergeVmLifecycle combines two lifecycle views of the same VM: the earliest
// positive FirstSeen/CreationTime win, the latest LastSeen/DeletionTime win,
// and an exact creation time beats an approximate one on a tie. After merging,
// a missing creation time is approximated from FirstSeen, and a deletion that
// does not postdate the last sighting is dropped as stale.
func mergeVmLifecycle(base, overlay VmLifecycle) VmLifecycle {
	merged := base

	// Earliest positive first-sighting wins.
	if fs := overlay.FirstSeen; fs > 0 && (merged.FirstSeen == 0 || fs < merged.FirstSeen) {
		merged.FirstSeen = fs
	}
	// Latest last-sighting wins.
	if overlay.LastSeen > merged.LastSeen {
		merged.LastSeen = overlay.LastSeen
	}
	// Latest positive deletion wins.
	if dt := overlay.DeletionTime; dt > 0 && (merged.DeletionTime == 0 || dt > merged.DeletionTime) {
		merged.DeletionTime = dt
	}
	// Earliest positive creation wins; on a tie, an exact value clears the
	// approximate flag.
	switch ct := overlay.CreationTime; {
	case ct <= 0:
		// Overlay carries no creation information.
	case merged.CreationTime == 0 || ct < merged.CreationTime:
		merged.CreationTime = ct
		merged.CreationApprox = overlay.CreationApprox
	case merged.CreationTime == ct && !overlay.CreationApprox:
		merged.CreationApprox = false
	}
	// Still no creation time: approximate from the first sighting.
	if merged.CreationTime == 0 && merged.FirstSeen > 0 {
		merged.CreationTime = merged.FirstSeen
		merged.CreationApprox = true
	}
	// A deletion at or before the last sighting is stale; discard it.
	if merged.DeletionTime > 0 && merged.LastSeen > 0 && merged.DeletionTime <= merged.LastSeen {
		merged.DeletionTime = 0
	}
	return merged
}
// fetchVmLifecycleFromSnapshotTables derives lifecycle bounds by probing every
// registered hourly snapshot table in chronological order. FirstSeen/LastSeen
// come from the first/last table containing the VM. CreationTime prefers the
// smallest recorded non-zero "CreationTime" across tables, falling back to
// FirstSeen (marked approximate). A deletion is inferred once the VM is absent
// from at least two consecutive snapshots after it was last seen, using the
// first missing snapshot's time; a later reappearance clears the inferred
// deletion and logs it as stale. Tables with invalid names or failing probes
// are skipped. Returns a zero lifecycle when no identifier was supplied.
func fetchVmLifecycleFromSnapshotTables(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) (VmLifecycle, VmLifecycleSourceDiagnostics, error) {
	var lifecycle VmLifecycle
	diag := VmLifecycleSourceDiagnostics{Source: "snapshot_registry/hourly_tables"}
	matchWhere, args, ok := vmLookupPredicate(vmID, vmUUID, name)
	if !ok {
		return lifecycle, diag, nil
	}
	var tables []struct {
		TableName    string `db:"table_name"`
		SnapshotTime int64  `db:"snapshot_time"`
	}
	if err := selectLog(ctx, dbConn, &tables, `
		SELECT table_name, snapshot_time
		FROM snapshot_registry
		WHERE snapshot_type = 'hourly'
		ORDER BY snapshot_time
	`); err != nil {
		diag.Error = err.Error()
		return lifecycle, diag, err
	}
	minCreation := int64(0)               // smallest recorded non-zero CreationTime seen so far
	consecutiveMissing := 0               // snapshots in a row without the VM, after it was seen
	firstMissingAfterLastSeen := int64(0) // snapshot time of the first gap following LastSeen
	matchedRows := int64(0)               // number of tables (not individual rows) containing the VM
	for _, t := range tables {
		if err := ValidateTableName(t.TableName); err != nil {
			continue
		}
		query := fmt.Sprintf(`
			SELECT MIN(NULLIF("CreationTime",0)) AS min_creation, COUNT(1) AS cnt
			FROM %s
			WHERE %s
		`, t.TableName, matchWhere)
		query = dbConn.Rebind(query)
		var probe struct {
			MinCreation sql.NullInt64 `db:"min_creation"`
			Cnt         int64         `db:"cnt"`
		}
		// Probe failures are tolerated; the walk continues with the next table.
		if err := getLog(ctx, dbConn, &probe, query, args...); err != nil {
			continue
		}
		if probe.Cnt > 0 {
			matchedRows++
			if lifecycle.FirstSeen == 0 {
				lifecycle.FirstSeen = t.SnapshotTime
			}
			// The VM reappeared after a previously inferred deletion: undo it.
			if lifecycle.DeletionTime > 0 && t.SnapshotTime > lifecycle.DeletionTime {
				diag.StaleDeletionIgnored = true
				slog.Warn("ignoring stale VM deletion from snapshot-table fallback", "vm_id", vmID, "vm_uuid", vmUUID, "name", name, "deletion_time", lifecycle.DeletionTime, "seen_again_at", t.SnapshotTime)
				lifecycle.DeletionTime = 0
			}
			lifecycle.LastSeen = t.SnapshotTime
			consecutiveMissing = 0
			firstMissingAfterLastSeen = 0
			if probe.MinCreation.Valid {
				if minCreation == 0 || probe.MinCreation.Int64 < minCreation {
					minCreation = probe.MinCreation.Int64
				}
			}
		} else if lifecycle.LastSeen > 0 && lifecycle.DeletionTime == 0 && t.SnapshotTime > lifecycle.LastSeen {
			// Absent after last sighting: infer deletion only after two
			// consecutive gaps, dated at the first gap.
			consecutiveMissing++
			if firstMissingAfterLastSeen == 0 {
				firstMissingAfterLastSeen = t.SnapshotTime
			}
			if consecutiveMissing >= 2 {
				lifecycle.DeletionTime = firstMissingAfterLastSeen
			}
		} else {
			consecutiveMissing = 0
			firstMissingAfterLastSeen = 0
		}
	}
	// Prefer a recorded creation time; otherwise approximate with first sighting.
	if minCreation > 0 {
		lifecycle.CreationTime = minCreation
		lifecycle.CreationApprox = false
	} else if lifecycle.FirstSeen > 0 {
		lifecycle.CreationTime = lifecycle.FirstSeen
		lifecycle.CreationApprox = true
	}
	diag.MatchedRows = matchedRows
	diag.FirstSeen = lifecycle.FirstSeen
	diag.LastSeen = lifecycle.LastSeen
	diag.CreationTime = lifecycle.CreationTime
	diag.CreationApprox = lifecycle.CreationApprox
	diag.SelectedDeletionTime = lifecycle.DeletionTime
	return lifecycle, diag, nil
}
// SyncVcenterTotalsFromSnapshots backfills vcenter_totals using hourly snapshot tables in snapshot_registry.
//
// For every registered hourly table, it aggregates per-vcenter VM/vCPU/RAM
// totals and inserts a vcenter_totals row only if none exists yet for that
// (vcenter, snapshot_time) pair, then upserts the latest-totals and
// aggregate-totals tables. Per-table and per-row failures are logged and the
// backfill continues — the whole operation is best-effort.
func SyncVcenterTotalsFromSnapshots(ctx context.Context, dbConn *sqlx.DB) error {
	if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
		return err
	}
	if err := EnsureVcenterLatestTotalsTable(ctx, dbConn); err != nil {
		return err
	}
	driver := strings.ToLower(dbConn.DriverName())
	var hourlyTables []struct {
		TableName    string `db:"table_name"`
		SnapshotTime int64  `db:"snapshot_time"`
	}
	if err := selectLog(ctx, dbConn, &hourlyTables, `
		SELECT table_name, snapshot_time
		FROM snapshot_registry
		WHERE snapshot_type = 'hourly'
		ORDER BY snapshot_time
	`); err != nil {
		return err
	}
	for _, ht := range hourlyTables {
		// Skip malformed registry entries before interpolating the name.
		if err := ValidateTableName(ht.TableName); err != nil {
			continue
		}
		// Aggregate per vcenter from the snapshot table.
		query := fmt.Sprintf(`
			SELECT "Vcenter" AS vcenter,
				COUNT(1) AS vm_count,
				COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) AS vcpu_total,
				COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) AS ram_total
			FROM %s
			GROUP BY "Vcenter"
		`, ht.TableName)
		type aggRow struct {
			Vcenter   string `db:"vcenter"`
			VmCount   int64  `db:"vm_count"`
			VcpuTotal int64  `db:"vcpu_total"`
			RamTotal  int64  `db:"ram_total"`
		}
		var aggs []aggRow
		// A failed aggregation skips this table but keeps backfilling the rest.
		if err := selectLog(ctx, dbConn, &aggs, query); err != nil {
			continue
		}
		for _, a := range aggs {
			// Insert if missing.
			insert := `
				INSERT INTO vcenter_totals ("Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB")
				SELECT $1,$2,$3,$4,$5
				WHERE NOT EXISTS (
					SELECT 1 FROM vcenter_totals WHERE "Vcenter" = $1 AND "SnapshotTime" = $2
				)
			`
			if driver == "sqlite" {
				// Turns $N into sqlite's ?N numbered placeholders, so the
				// repeated $1/$2 in the NOT EXISTS clause still bind to the
				// same five arguments.
				insert = strings.ReplaceAll(insert, "$", "?")
			}
			if _, err := execLog(ctx, dbConn, insert, a.Vcenter, ht.SnapshotTime, a.VmCount, a.VcpuTotal, a.RamTotal); err != nil {
				slog.Warn("failed to backfill vcenter_totals", "table", ht.TableName, "vcenter", a.Vcenter, "snapshot_time", ht.SnapshotTime, "error", err)
			}
			if err := UpsertVcenterLatestTotals(ctx, dbConn, a.Vcenter, ht.SnapshotTime, a.VmCount, a.VcpuTotal, a.RamTotal); err != nil {
				slog.Warn("failed to upsert vcenter_latest_totals", "table", ht.TableName, "vcenter", a.Vcenter, "snapshot_time", ht.SnapshotTime, "error", err)
			}
			if err := UpsertVcenterAggregateTotal(ctx, dbConn, "hourly", a.Vcenter, ht.SnapshotTime, a.VmCount, a.VcpuTotal, a.RamTotal); err != nil {
				slog.Warn("failed to upsert vcenter_aggregate_totals", "table", ht.TableName, "snapshot_type", "hourly", "vcenter", a.Vcenter, "snapshot_time", ht.SnapshotTime, "error", err)
			}
		}
	}
	return nil
}
// AnalyzeTableIfPostgres runs ANALYZE on a table to refresh planner stats.
// It is a no-op for invalid table names and for non-postgres drivers;
// execution failures are logged rather than returned.
func AnalyzeTableIfPostgres(ctx context.Context, dbConn *sqlx.DB, tableName string) {
	if _, err := SafeTableName(tableName); err != nil {
		return
	}
	switch strings.ToLower(dbConn.DriverName()) {
	case "pgx", "postgres":
		// Supported drivers; fall through to the ANALYZE below.
	default:
		return
	}
	began := time.Now()
	slog.Debug("db analyze start", "table", tableName)
	if _, err := execLog(ctx, dbConn, fmt.Sprintf(`ANALYZE %s`, tableName)); err != nil {
		slog.Warn("failed to ANALYZE table", "table", tableName, "error", err)
		return
	}
	slog.Debug("db analyze complete", "table", tableName, "duration", time.Since(began))
}
// SetPostgresWorkMem sets work_mem for heavy aggregations; no-op for other
// drivers or non-positive values. Failures are logged, not returned.
//
// NOTE: this previously issued SET LOCAL, but execLog runs on a pooled
// auto-commit connection, i.e. outside any transaction block — PostgreSQL
// responds to SET LOCAL there with a warning and the setting has no effect.
// Plain SET applies for the remainder of the session, so it actually takes
// effect; be aware the pooled connection keeps the setting for later queries.
func SetPostgresWorkMem(ctx context.Context, dbConn *sqlx.DB, workMemMB int) {
	if workMemMB <= 0 {
		return
	}
	driver := strings.ToLower(dbConn.DriverName())
	if driver != "pgx" && driver != "postgres" {
		return
	}
	if _, err := execLog(ctx, dbConn, fmt.Sprintf(`SET work_mem = '%dMB'`, workMemMB)); err != nil {
		slog.Warn("failed to set work_mem", "work_mem_mb", workMemMB, "error", err)
	}
}
// CheckMigrationState ensures goose migrations are present and not dirty.
//
// It verifies, for the active driver, that the goose_db_version table exists,
// that it contains at least one version row, and that the most recent row is
// applied. Each failure mode yields a descriptive error instead of a raw
// driver error, so callers can surface a useful "run migrations" message.
func CheckMigrationState(ctx context.Context, dbConn *sqlx.DB) error {
	driver := strings.ToLower(dbConn.DriverName())
	var tableExists bool
	switch driver {
	case "sqlite":
		err := getLog(ctx, dbConn, &tableExists, `
			SELECT COUNT(1) > 0 FROM sqlite_master WHERE type='table' AND name='goose_db_version'
		`)
		if err != nil {
			return err
		}
	case "pgx", "postgres":
		err := getLog(ctx, dbConn, &tableExists, `
			SELECT EXISTS (
				SELECT 1 FROM pg_tables WHERE schemaname = 'public' AND tablename = 'goose_db_version'
			)
		`)
		if err != nil {
			return err
		}
	default:
		return fmt.Errorf("unsupported driver for migration check: %s", driver)
	}
	if !tableExists {
		return fmt.Errorf("goose_db_version table not found; database migrations may not be applied")
	}
	var dirty bool
	err := getLog(ctx, dbConn, &dirty, `
		SELECT NOT is_applied
		FROM goose_db_version
		ORDER BY id DESC
		LIMIT 1
	`)
	if errors.Is(err, sql.ErrNoRows) {
		// Table exists but has no version rows: goose never recorded a
		// migration. Report that clearly instead of leaking ErrNoRows.
		// (Assumes getLog propagates sql.ErrNoRows from sqlx.Get — confirm.)
		return fmt.Errorf("goose_db_version table is empty; database migrations may not be applied")
	}
	if err != nil {
		return err
	}
	if dirty {
		return fmt.Errorf("database migrations are in a dirty state; please resolve goose_db_version")
	}
	return nil
}
// BuildDailySummaryInsert returns the SQL to aggregate hourly snapshots into a daily summary table.
//
// The statement is built from three CTEs over the caller-supplied unionQuery
// (which must yield the hourly snapshot rows for the period):
//   - snapshots: the raw union of hourly rows.
//   - totals:    per-vcenter distinct snapshot count and the latest snapshot time.
//   - agg:       per-VM aggregates — min creation / max deletion, first/last
//     sighting, sample count, sums for time-weighted averages, and
//     resource-pool hit counters (tin/bronze/silver/gold).
//
// The INSERT then derives, per VM:
//   - DeletionTime: when the VM vanished before the period's last snapshot,
//     the recorded deletion, else the first snapshot after it was last seen,
//     else the period's max snapshot; otherwise only a recorded deletion.
//   - ResourcePool / ProvisionedDisk / VcpuCount / RamGB: last-observed values
//     via correlated subqueries keyed on (VmId, Vcenter).
//   - Avg* columns: sums divided by the vcenter's total distinct samples
//     (time-weighted over the whole period, so absent hours count as zero).
//   - Pool*Pct and the legacy Tin/Bronze/Silver/Gold columns: identical
//     percentages of samples spent in each resource pool.
//
// tableName is validated via SafeTableName before interpolation. unionQuery is
// interpolated verbatim — callers are responsible for its safety.
func BuildDailySummaryInsert(tableName string, unionQuery string) (string, error) {
	if _, err := SafeTableName(tableName); err != nil {
		return "", err
	}
	insert := fmt.Sprintf(`
		WITH snapshots AS (
			%s
		), totals AS (
			SELECT "Vcenter", COUNT(DISTINCT "SnapshotTime") AS total_samples, MAX("SnapshotTime") AS max_snapshot
			FROM snapshots
			GROUP BY "Vcenter"
		), agg AS (
			SELECT
				s."InventoryId", s."Name", s."Vcenter", s."VmId", s."EventKey", s."CloudId",
				MIN(NULLIF(s."CreationTime", 0)) AS any_creation,
				MAX(NULLIF(s."DeletionTime", 0)) AS any_deletion,
				MIN(s."SnapshotTime") AS first_present,
				MAX(s."SnapshotTime") AS last_present,
				COUNT(*) AS samples_present,
				s."Datacenter", s."Cluster", s."Folder",
				AVG(COALESCE(s."ProvisionedDisk",0)) AS avg_disk,
				AVG(COALESCE(s."VcpuCount",0)) AS avg_vcpu_raw,
				AVG(COALESCE(s."RamGB",0)) AS avg_ram_raw,
				s."IsTemplate", s."PoweredOn", s."SrmPlaceholder", s."VmUuid",
				SUM(CASE WHEN s."VcpuCount" IS NOT NULL THEN s."VcpuCount" ELSE 0 END) AS sum_vcpu,
				SUM(CASE WHEN s."RamGB" IS NOT NULL THEN s."RamGB" ELSE 0 END) AS sum_ram,
				SUM(CASE WHEN s."ProvisionedDisk" IS NOT NULL THEN s."ProvisionedDisk" ELSE 0 END) AS sum_disk,
				SUM(CASE WHEN LOWER(s."ResourcePool") = 'tin' THEN 1 ELSE 0 END) AS tin_hits,
				SUM(CASE WHEN LOWER(s."ResourcePool") = 'bronze' THEN 1 ELSE 0 END) AS bronze_hits,
				SUM(CASE WHEN LOWER(s."ResourcePool") = 'silver' THEN 1 ELSE 0 END) AS silver_hits,
				SUM(CASE WHEN LOWER(s."ResourcePool") = 'gold' THEN 1 ELSE 0 END) AS gold_hits
			FROM snapshots s
			GROUP BY
				s."InventoryId", s."Name", s."Vcenter", s."VmId", s."EventKey", s."CloudId",
				s."Datacenter", s."Cluster", s."Folder",
				s."IsTemplate", s."PoweredOn", s."SrmPlaceholder", s."VmUuid"
		)
		INSERT INTO %s (
			"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
			"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
			"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SnapshotTime",
			"SamplesPresent", "AvgVcpuCount", "AvgRamGB", "AvgProvisionedDisk", "AvgIsPresent",
			"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct",
			"Tin", "Bronze", "Silver", "Gold"
		)
		SELECT
			agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId",
			COALESCE(agg.any_creation, 0) AS "CreationTime",
			CASE
				WHEN totals.max_snapshot IS NOT NULL AND agg.last_present < totals.max_snapshot THEN COALESCE(
					NULLIF(agg.any_deletion, 0),
					(SELECT MIN(s2."SnapshotTime") FROM snapshots s2 WHERE s2."SnapshotTime" > agg.last_present),
					totals.max_snapshot,
					agg.last_present
				)
				ELSE NULLIF(agg.any_deletion, 0)
			END AS "DeletionTime",
			(
				SELECT s2."ResourcePool"
				FROM snapshots s2
				WHERE s2."VmId" = agg."VmId"
					AND s2."Vcenter" = agg."Vcenter"
				ORDER BY s2."SnapshotTime" DESC
				LIMIT 1
			) AS "ResourcePool",
			agg."Datacenter", agg."Cluster", agg."Folder",
			(
				SELECT s2."ProvisionedDisk"
				FROM snapshots s2
				WHERE s2."VmId" = agg."VmId"
					AND s2."Vcenter" = agg."Vcenter"
				ORDER BY s2."SnapshotTime" DESC
				LIMIT 1
			) AS "ProvisionedDisk",
			(
				SELECT s2."VcpuCount"
				FROM snapshots s2
				WHERE s2."VmId" = agg."VmId"
					AND s2."Vcenter" = agg."Vcenter"
				ORDER BY s2."SnapshotTime" DESC
				LIMIT 1
			) AS "VcpuCount",
			(
				SELECT s2."RamGB"
				FROM snapshots s2
				WHERE s2."VmId" = agg."VmId"
					AND s2."Vcenter" = agg."Vcenter"
				ORDER BY s2."SnapshotTime" DESC
				LIMIT 1
			) AS "RamGB",
			agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid",
			agg.last_present AS "SnapshotTime",
			agg.samples_present AS "SamplesPresent",
			CASE WHEN totals.total_samples > 0
				THEN 1.0 * agg.sum_vcpu / totals.total_samples
				ELSE NULL END AS "AvgVcpuCount",
			CASE WHEN totals.total_samples > 0
				THEN 1.0 * agg.sum_ram / totals.total_samples
				ELSE NULL END AS "AvgRamGB",
			CASE WHEN totals.total_samples > 0
				THEN 1.0 * agg.sum_disk / totals.total_samples
				ELSE NULL END AS "AvgProvisionedDisk",
			CASE WHEN totals.total_samples > 0
				THEN 1.0 * agg.samples_present / totals.total_samples
				ELSE NULL END AS "AvgIsPresent",
			CASE WHEN agg.samples_present > 0
				THEN 100.0 * agg.tin_hits / agg.samples_present
				ELSE NULL END AS "PoolTinPct",
			CASE WHEN agg.samples_present > 0
				THEN 100.0 * agg.bronze_hits / agg.samples_present
				ELSE NULL END AS "PoolBronzePct",
			CASE WHEN agg.samples_present > 0
				THEN 100.0 * agg.silver_hits / agg.samples_present
				ELSE NULL END AS "PoolSilverPct",
			CASE WHEN agg.samples_present > 0
				THEN 100.0 * agg.gold_hits / agg.samples_present
				ELSE NULL END AS "PoolGoldPct",
			CASE WHEN agg.samples_present > 0
				THEN 100.0 * agg.tin_hits / agg.samples_present
				ELSE NULL END AS "Tin",
			CASE WHEN agg.samples_present > 0
				THEN 100.0 * agg.bronze_hits / agg.samples_present
				ELSE NULL END AS "Bronze",
			CASE WHEN agg.samples_present > 0
				THEN 100.0 * agg.silver_hits / agg.samples_present
				ELSE NULL END AS "Silver",
			CASE WHEN agg.samples_present > 0
				THEN 100.0 * agg.gold_hits / agg.samples_present
				ELSE NULL END AS "Gold"
		FROM agg
		JOIN totals ON totals."Vcenter" = agg."Vcenter"
		GROUP BY
			agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId",
			agg."Datacenter", agg."Cluster", agg."Folder",
			agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid",
			agg.any_creation, agg.any_deletion, agg.first_present, agg.last_present,
			totals.total_samples, totals.max_snapshot;
	`, unionQuery, tableName)
	return insert, nil
}
// UpdateSummaryPresenceByWindow recomputes AvgIsPresent using CreationTime/DeletionTime overlap with the window.
//
// For rows carrying a non-zero CreationTime or DeletionTime, AvgIsPresent
// becomes the fraction of [windowStart, windowEnd] during which the VM
// existed, with the lifecycle bounds clamped to the window (0 when the
// clamped interval is empty). Rows without lifecycle data keep their value.
// Returns an error when windowEnd <= windowStart or the table name is invalid.
func UpdateSummaryPresenceByWindow(ctx context.Context, dbConn *sqlx.DB, summaryTable string, windowStart, windowEnd int64) error {
	if err := ValidateTableName(summaryTable); err != nil {
		return err
	}
	if windowEnd <= windowStart {
		return fmt.Errorf("invalid presence window: %d to %d", windowStart, windowEnd)
	}
	duration := float64(windowEnd - windowStart)
	// startExpr clamps the effective start to windowStart; endExpr clamps the
	// effective end to windowEnd. Each contains two '?' placeholders.
	startExpr := `CASE WHEN "CreationTime" IS NOT NULL AND "CreationTime" > 0 AND "CreationTime" > ? THEN "CreationTime" ELSE ? END`
	endExpr := `CASE WHEN "DeletionTime" IS NOT NULL AND "DeletionTime" > 0 AND "DeletionTime" < ? THEN "DeletionTime" ELSE ? END`
	query := fmt.Sprintf(`
		UPDATE %s
		SET "AvgIsPresent" = CASE
			WHEN ("CreationTime" IS NOT NULL AND "CreationTime" > 0) OR ("DeletionTime" IS NOT NULL AND "DeletionTime" > 0) THEN
				CASE
					WHEN %s > %s THEN (CAST((%s - %s) AS REAL) / ?)
					ELSE 0
				END
			ELSE "AvgIsPresent"
		END
	`, summaryTable, endExpr, startExpr, endExpr, startExpr)
	query = dbConn.Rebind(query)
	// args must bind the '?' placeholders in textual order: endExpr (2 ends),
	// startExpr (2 starts), endExpr again, startExpr again, then the window
	// duration divisor. Keep this slice in sync with the expressions above.
	args := []interface{}{
		windowEnd, windowEnd,
		windowStart, windowStart,
		windowEnd, windowEnd,
		windowStart, windowStart,
		duration,
	}
	_, err := execLog(ctx, dbConn, query, args...)
	return err
}
// RefineCreationDeletionFromUnion walks all snapshot rows in a period and tightens CreationTime/DeletionTime
// by using the first and last observed samples and the first sample after disappearance.
//
// unionQuery is inlined verbatim as a CTE, so it must be trusted SQL that yields
// at least the columns "VmId", "VmUuid", "Name", "Vcenter", "CreationTime" and
// "SnapshotTime". summaryTable is validated via SafeTableName before being
// interpolated into the UPDATE.
func RefineCreationDeletionFromUnion(ctx context.Context, dbConn *sqlx.DB, summaryTable, unionQuery string) error {
	if unionQuery == "" {
		return fmt.Errorf("union query is empty")
	}
	if _, err := SafeTableName(summaryTable); err != nil {
		return err
	}
	driver := strings.ToLower(dbConn.DriverName())
	// Named stmt (not "sql") so the database/sql package import is not shadowed.
	var stmt string
	switch driver {
	case "pgx", "postgres":
		stmt = fmt.Sprintf(`
WITH snapshots AS (
	%s
), timeline AS (
	SELECT
		s."VmId",
		s."VmUuid",
		s."Name",
		s."Vcenter",
		MIN(NULLIF(s."CreationTime", 0)) AS any_creation,
		MIN(s."SnapshotTime") AS first_seen,
		MAX(s."SnapshotTime") AS last_seen
	FROM snapshots s
	GROUP BY s."VmId", s."VmUuid", s."Name", s."Vcenter"
), timeline_enriched AS (
	SELECT
		tl."VmId",
		tl."VmUuid",
		tl."Name",
		tl."Vcenter",
		tl.any_creation,
		tl.first_seen,
		tl.last_seen,
		MIN(t2.first_seen) AS t_last_after
	FROM timeline tl
	LEFT JOIN timeline t2
		ON t2."Vcenter" = tl."Vcenter"
		AND COALESCE(t2."VmId", '') = COALESCE(tl."VmId", '')
		AND t2.first_seen > tl.last_seen
	GROUP BY tl."VmId", tl."VmUuid", tl."Name", tl."Vcenter", tl.any_creation, tl.first_seen, tl.last_seen
)
UPDATE %s dst
SET
	"CreationTime" = CASE
		WHEN t.any_creation IS NOT NULL AND t.any_creation > 0 THEN LEAST(COALESCE(NULLIF(dst."CreationTime", 0), t.any_creation), t.any_creation)
		ELSE dst."CreationTime"
	END,
	"DeletionTime" = CASE
		WHEN t.t_last_after IS NOT NULL
			AND (dst."DeletionTime" IS NULL OR dst."DeletionTime" = 0)
		THEN t.t_last_after
		ELSE dst."DeletionTime"
	END
FROM timeline_enriched t
WHERE dst."Vcenter" = t."Vcenter"
	AND (
		(dst."VmId" IS NOT DISTINCT FROM t."VmId")
		OR (dst."VmUuid" IS NOT DISTINCT FROM t."VmUuid")
		OR (dst."Name" IS NOT DISTINCT FROM t."Name")
	);
`, unionQuery, summaryTable)
	default:
		// SQLite variant (no FROM in UPDATE, no IS NOT DISTINCT FROM). Uses positional args to avoid placeholder count issues.
		stmt = fmt.Sprintf(`
WITH snapshots AS (
	%[1]s
), timeline AS (
	SELECT
		s."VmId",
		s."VmUuid",
		s."Name",
		s."Vcenter",
		MIN(NULLIF(s."CreationTime", 0)) AS any_creation,
		MIN(s."SnapshotTime") AS first_seen,
		MAX(s."SnapshotTime") AS last_seen
	FROM snapshots s
	GROUP BY s."VmId", s."VmUuid", s."Name", s."Vcenter"
), enriched AS (
	SELECT
		tl.*,
		(
			SELECT MIN(s2."SnapshotTime")
			FROM snapshots s2
			WHERE s2."Vcenter" = tl."Vcenter"
				AND COALESCE(s2."VmId", '') = COALESCE(tl."VmId", '')
				AND s2."SnapshotTime" > tl.last_seen
		) AS first_after
	FROM timeline tl
)
UPDATE %[2]s
SET
	"CreationTime" = COALESCE(
		(
			SELECT CASE
				WHEN t.any_creation IS NOT NULL AND t.any_creation > 0 AND COALESCE(NULLIF(%[2]s."CreationTime", 0), t.any_creation) > t.any_creation THEN t.any_creation
				ELSE NULL
			END
			FROM enriched t
			WHERE %[2]s."Vcenter" = t."Vcenter" AND (
				(%[2]s."VmId" IS NOT NULL AND t."VmId" IS NOT NULL AND %[2]s."VmId" = t."VmId") OR
				(%[2]s."VmId" IS NULL AND t."VmId" IS NULL) OR
				(%[2]s."VmUuid" IS NOT NULL AND t."VmUuid" IS NOT NULL AND %[2]s."VmUuid" = t."VmUuid") OR
				(%[2]s."VmUuid" IS NULL AND t."VmUuid" IS NULL) OR
				(%[2]s."Name" IS NOT NULL AND t."Name" IS NOT NULL AND %[2]s."Name" = t."Name")
			)
			LIMIT 1
		),
		"CreationTime"
	),
	"DeletionTime" = COALESCE(
		(
			SELECT t.first_after
			FROM enriched t
			WHERE %[2]s."Vcenter" = t."Vcenter" AND (
				(%[2]s."VmId" IS NOT NULL AND t."VmId" IS NOT NULL AND %[2]s."VmId" = t."VmId") OR
				(%[2]s."VmId" IS NULL AND t."VmId" IS NULL) OR
				(%[2]s."VmUuid" IS NOT NULL AND t."VmUuid" IS NOT NULL AND %[2]s."VmUuid" = t."VmUuid") OR
				(%[2]s."VmUuid" IS NULL AND t."VmUuid" IS NULL) OR
				(%[2]s."Name" IS NOT NULL AND t."Name" IS NOT NULL AND %[2]s."Name" = t."Name")
			)
			AND t.first_after IS NOT NULL
			AND ("DeletionTime" IS NULL OR "DeletionTime" = 0)
			LIMIT 1
		),
		"DeletionTime"
	)
WHERE EXISTS (
	SELECT 1 FROM enriched t
	WHERE %[2]s."Vcenter" = t."Vcenter" AND (
		(%[2]s."VmId" IS NOT NULL AND t."VmId" IS NOT NULL AND %[2]s."VmId" = t."VmId") OR
		(%[2]s."VmId" IS NULL AND t."VmId" IS NULL) OR
		(%[2]s."VmUuid" IS NOT NULL AND t."VmUuid" IS NOT NULL AND %[2]s."VmUuid" = t."VmUuid") OR
		(%[2]s."VmUuid" IS NULL AND t."VmUuid" IS NULL) OR
		(%[2]s."Name" IS NOT NULL AND t."Name" IS NOT NULL AND %[2]s."Name" = t."Name")
	)
);
`, unionQuery, summaryTable)
	}
	_, err := execLog(ctx, dbConn, stmt)
	return err
}
// BuildMonthlySummaryInsert returns the SQL to aggregate daily summaries into a monthly summary table.
// tableName is validated through SafeTableName; unionQuery is inlined verbatim as
// the "daily" CTE, so it must already be trusted SQL.
func BuildMonthlySummaryInsert(tableName string, unionQuery string) (string, error) {
	if _, err := SafeTableName(tableName); err != nil {
		return "", err
	}
	const template = `
WITH daily AS (
%s
), enriched AS (
SELECT
d.*,
CASE
WHEN d."AvgIsPresent" IS NOT NULL AND d."AvgIsPresent" > 0 THEN d."SamplesPresent" / d."AvgIsPresent"
ELSE CAST(d."SamplesPresent" AS REAL)
END AS total_samples_day
FROM daily d
), totals AS (
SELECT COALESCE(SUM(total_samples_day), 0) AS total_samples FROM enriched
)
-- monthly averages are weighted by the implied sample counts per day (SamplesPresent / AvgIsPresent)
INSERT INTO %s (
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SamplesPresent",
"AvgVcpuCount", "AvgRamGB", "AvgProvisionedDisk", "AvgIsPresent",
"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct",
"Tin", "Bronze", "Silver", "Gold"
)
SELECT
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId",
COALESCE(NULLIF("CreationTime", 0), MIN(NULLIF("CreationTime", 0)), 0) AS "CreationTime",
NULLIF(MAX(NULLIF("DeletionTime", 0)), 0) AS "DeletionTime",
MAX("ResourcePool") AS "ResourcePool",
"Datacenter", "Cluster", "Folder",
MAX("ProvisionedDisk") AS "ProvisionedDisk",
MAX("VcpuCount") AS "VcpuCount",
MAX("RamGB") AS "RamGB",
"IsTemplate",
MAX("PoweredOn") AS "PoweredOn",
"SrmPlaceholder", "VmUuid",
SUM("SamplesPresent") AS "SamplesPresent",
CASE WHEN totals.total_samples > 0
THEN SUM(CASE WHEN "AvgVcpuCount" IS NOT NULL THEN "AvgVcpuCount" * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "AvgVcpuCount",
CASE WHEN totals.total_samples > 0
THEN SUM(CASE WHEN "AvgRamGB" IS NOT NULL THEN "AvgRamGB" * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "AvgRamGB",
CASE WHEN totals.total_samples > 0
THEN SUM(CASE WHEN "AvgProvisionedDisk" IS NOT NULL THEN "AvgProvisionedDisk" * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "AvgProvisionedDisk",
CASE WHEN totals.total_samples > 0
THEN SUM("SamplesPresent") * 1.0 / totals.total_samples
ELSE NULL END AS "AvgIsPresent",
CASE WHEN totals.total_samples > 0
THEN 100.0 * SUM(CASE WHEN "PoolTinPct" IS NOT NULL THEN ("PoolTinPct" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "PoolTinPct",
CASE WHEN totals.total_samples > 0
THEN 100.0 * SUM(CASE WHEN "PoolBronzePct" IS NOT NULL THEN ("PoolBronzePct" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "PoolBronzePct",
CASE WHEN totals.total_samples > 0
THEN 100.0 * SUM(CASE WHEN "PoolSilverPct" IS NOT NULL THEN ("PoolSilverPct" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "PoolSilverPct",
CASE WHEN totals.total_samples > 0
THEN 100.0 * SUM(CASE WHEN "PoolGoldPct" IS NOT NULL THEN ("PoolGoldPct" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "PoolGoldPct",
CASE WHEN totals.total_samples > 0
THEN 100.0 * SUM(CASE WHEN "Tin" IS NOT NULL THEN ("Tin" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "Tin",
CASE WHEN totals.total_samples > 0
THEN 100.0 * SUM(CASE WHEN "Bronze" IS NOT NULL THEN ("Bronze" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "Bronze",
CASE WHEN totals.total_samples > 0
THEN 100.0 * SUM(CASE WHEN "Silver" IS NOT NULL THEN ("Silver" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "Silver",
CASE WHEN totals.total_samples > 0
THEN 100.0 * SUM(CASE WHEN "Gold" IS NOT NULL THEN ("Gold" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
ELSE NULL END AS "Gold"
FROM enriched
CROSS JOIN totals
GROUP BY
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId",
"Datacenter", "Cluster", "Folder",
"IsTemplate", "SrmPlaceholder", "VmUuid";
`
	return fmt.Sprintf(template, unionQuery, tableName), nil
}
// EnsureSummaryTable creates a daily/monthly summary table with the standard schema if it does not exist.
// It also performs two best-effort migrations (dropping the legacy IsPresent
// column and adding SnapshotTime) and creates the supporting indexes.
// tableName is validated through SafeTableName before interpolation.
func EnsureSummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
	if _, err := SafeTableName(tableName); err != nil {
		return err
	}
	driver := strings.ToLower(dbConn.DriverName())
	isPostgres := driver == "pgx" || driver == "postgres"
	var ddl string
	if isPostgres {
		ddl = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
	"RowId" BIGSERIAL PRIMARY KEY,
	"InventoryId" BIGINT,
	"Name" TEXT NOT NULL,
	"Vcenter" TEXT NOT NULL,
	"VmId" TEXT,
	"EventKey" TEXT,
	"CloudId" TEXT,
	"CreationTime" BIGINT,
	"DeletionTime" BIGINT,
	"ResourcePool" TEXT,
	"Datacenter" TEXT,
	"Cluster" TEXT,
	"Folder" TEXT,
	"ProvisionedDisk" REAL,
	"VcpuCount" BIGINT,
	"RamGB" BIGINT,
	"IsTemplate" TEXT,
	"PoweredOn" TEXT,
	"SrmPlaceholder" TEXT,
	"VmUuid" TEXT,
	"SamplesPresent" BIGINT NOT NULL,
	"AvgVcpuCount" REAL,
	"AvgRamGB" REAL,
	"AvgProvisionedDisk" REAL,
	"AvgIsPresent" REAL,
	"PoolTinPct" REAL,
	"PoolBronzePct" REAL,
	"PoolSilverPct" REAL,
	"PoolGoldPct" REAL,
	"SnapshotTime" BIGINT,
	"Tin" REAL,
	"Bronze" REAL,
	"Silver" REAL,
	"Gold" REAL
);`, tableName)
	} else {
		ddl = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
	"RowId" INTEGER PRIMARY KEY AUTOINCREMENT,
	"InventoryId" BIGINT,
	"Name" TEXT NOT NULL,
	"Vcenter" TEXT NOT NULL,
	"VmId" TEXT,
	"EventKey" TEXT,
	"CloudId" TEXT,
	"CreationTime" BIGINT,
	"DeletionTime" BIGINT,
	"ResourcePool" TEXT,
	"Datacenter" TEXT,
	"Cluster" TEXT,
	"Folder" TEXT,
	"ProvisionedDisk" REAL,
	"VcpuCount" BIGINT,
	"RamGB" BIGINT,
	"IsTemplate" TEXT,
	"PoweredOn" TEXT,
	"SrmPlaceholder" TEXT,
	"VmUuid" TEXT,
	"SamplesPresent" BIGINT NOT NULL,
	"AvgVcpuCount" REAL,
	"AvgRamGB" REAL,
	"AvgProvisionedDisk" REAL,
	"AvgIsPresent" REAL,
	"PoolTinPct" REAL,
	"PoolBronzePct" REAL,
	"PoolSilverPct" REAL,
	"PoolGoldPct" REAL,
	"SnapshotTime" BIGINT,
	"Tin" REAL,
	"Bronze" REAL,
	"Silver" REAL,
	"Gold" REAL
);`, tableName)
	}
	if _, err := execLog(ctx, dbConn, ddl); err != nil {
		return err
	}
	// Best-effort: drop legacy IsPresent column if it exists. Errors are ignored
	// deliberately so an old/locked schema does not block table creation.
	if hasIsPresent, err := ColumnExists(ctx, dbConn, tableName, "IsPresent"); err == nil && hasIsPresent {
		_, _ = execLog(ctx, dbConn, fmt.Sprintf(`ALTER TABLE %s DROP COLUMN "IsPresent"`, tableName))
	}
	// Ensure SnapshotTime exists for lifecycle refinement (pre-existing tables
	// created before the column was added to the schema above).
	if hasSnapshot, err := ColumnExists(ctx, dbConn, tableName, "SnapshotTime"); err == nil && !hasSnapshot {
		_, _ = execLog(ctx, dbConn, fmt.Sprintf(`ALTER TABLE %s ADD COLUMN "SnapshotTime" BIGINT`, tableName))
	}
	indexes := []string{
		fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vm_vcenter_idx ON %s ("VmId","Vcenter")`, tableName, tableName),
		fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_resourcepool_idx ON %s ("ResourcePool")`, tableName, tableName),
	}
	// Reuse the driver computed above instead of re-deriving it.
	if isPostgres {
		indexes = append(indexes,
			fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vcenter_idx ON %s ("Vcenter")`, tableName, tableName),
			fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vmuuid_vcenter_idx ON %s ("VmUuid","Vcenter")`, tableName, tableName),
		)
	}
	for _, idx := range indexes {
		if _, err := execLog(ctx, dbConn, idx); err != nil {
			return err
		}
	}
	return nil
}
// BackfillSnapshotTimeFromUnion sets SnapshotTime in a summary table using the max snapshot time per VM from a union query.
// Only rows whose SnapshotTime is NULL or 0 are touched. unionQuery is inlined
// verbatim as a CTE and must be trusted SQL; summaryTable is validated via
// SafeTableName.
func BackfillSnapshotTimeFromUnion(ctx context.Context, dbConn *sqlx.DB, summaryTable, unionQuery string) error {
	if unionQuery == "" {
		return fmt.Errorf("union query is empty")
	}
	if _, err := SafeTableName(summaryTable); err != nil {
		return err
	}
	driver := strings.ToLower(dbConn.DriverName())
	// Named stmt (not "sql") so the database/sql package import is not shadowed.
	var stmt string
	switch driver {
	case "pgx", "postgres":
		stmt = fmt.Sprintf(`
WITH snapshots AS (
	%s
)
UPDATE %s dst
SET "SnapshotTime" = sub.max_time
FROM (
	SELECT s."Vcenter", s."VmId", s."VmUuid", s."Name", MAX(s."SnapshotTime") AS max_time
	FROM snapshots s
	GROUP BY s."Vcenter", s."VmId", s."VmUuid", s."Name"
) sub
WHERE (dst."SnapshotTime" IS NULL OR dst."SnapshotTime" = 0)
	AND dst."Vcenter" = sub."Vcenter"
	AND (
		(dst."VmId" IS NOT DISTINCT FROM sub."VmId")
		OR (dst."VmUuid" IS NOT DISTINCT FROM sub."VmUuid")
		OR (dst."Name" IS NOT DISTINCT FROM sub."Name")
	);
`, unionQuery, summaryTable)
	default:
		// SQLite has no UPDATE ... FROM with IS NOT DISTINCT FROM, so use a
		// correlated subquery instead.
		stmt = fmt.Sprintf(`
WITH snapshots AS (
	%[1]s
), grouped AS (
	SELECT s."Vcenter", s."VmId", s."VmUuid", s."Name", MAX(s."SnapshotTime") AS max_time
	FROM snapshots s
	GROUP BY s."Vcenter", s."VmId", s."VmUuid", s."Name"
)
UPDATE %[2]s
SET "SnapshotTime" = (
	SELECT max_time FROM grouped g
	WHERE %[2]s."Vcenter" = g."Vcenter"
		AND (
			(%[2]s."VmId" IS NOT NULL AND g."VmId" IS NOT NULL AND %[2]s."VmId" = g."VmId")
			OR (%[2]s."VmUuid" IS NOT NULL AND g."VmUuid" IS NOT NULL AND %[2]s."VmUuid" = g."VmUuid")
			OR (%[2]s."Name" IS NOT NULL AND g."Name" IS NOT NULL AND %[2]s."Name" = g."Name")
		)
)
WHERE "SnapshotTime" IS NULL OR "SnapshotTime" = 0;
`, unionQuery, summaryTable)
	}
	_, err := execLog(ctx, dbConn, stmt)
	return err
}
// EnsureSnapshotRunTable creates a table to track per-vCenter hourly snapshot attempts.
// The DDL and index creation are executed at most once per DB connection via
// ensureOncePerDB, because UpsertSnapshotRun invokes this on every write; a
// failed attempt is retried on the next call.
func EnsureSnapshotRunTable(ctx context.Context, dbConn *sqlx.DB) error {
	return ensureOncePerDB(dbConn, "snapshot_runs", func() error {
		ddl := `
	CREATE TABLE IF NOT EXISTS snapshot_runs (
	"RowId" INTEGER PRIMARY KEY AUTOINCREMENT,
	"Vcenter" TEXT NOT NULL,
	"SnapshotTime" BIGINT NOT NULL,
	"Attempts" INTEGER NOT NULL DEFAULT 0,
	"Success" TEXT NOT NULL DEFAULT 'FALSE',
	"LastError" TEXT,
	"LastAttempt" BIGINT NOT NULL
	);
	`
		if driver := strings.ToLower(dbConn.DriverName()); driver == "pgx" || driver == "postgres" {
			ddl = `
	CREATE TABLE IF NOT EXISTS snapshot_runs (
	"RowId" BIGSERIAL PRIMARY KEY,
	"Vcenter" TEXT NOT NULL,
	"SnapshotTime" BIGINT NOT NULL,
	"Attempts" INTEGER NOT NULL DEFAULT 0,
	"Success" TEXT NOT NULL DEFAULT 'FALSE',
	"LastError" TEXT,
	"LastAttempt" BIGINT NOT NULL
	);
	`
		}
		if _, err := execLog(ctx, dbConn, ddl); err != nil {
			return err
		}
		// The unique index backs the ON CONFLICT clause in UpsertSnapshotRun.
		indexes := []string{
			`CREATE UNIQUE INDEX IF NOT EXISTS snapshot_runs_vc_time_idx ON snapshot_runs ("Vcenter","SnapshotTime")`,
			`CREATE INDEX IF NOT EXISTS snapshot_runs_success_idx ON snapshot_runs ("Success")`,
		}
		for _, idx := range indexes {
			if _, err := execLog(ctx, dbConn, idx); err != nil {
				return err
			}
		}
		return nil
	})
}
// EnsureCronStatusTable creates a table to track cron job progress/status.
// The DDL is portable across SQLite and Postgres, so no driver switch is needed.
func EnsureCronStatusTable(ctx context.Context, dbConn *sqlx.DB) error {
	const schema = `
CREATE TABLE IF NOT EXISTS cron_status (
job_name TEXT PRIMARY KEY,
started_at BIGINT NOT NULL,
ended_at BIGINT NOT NULL,
duration_ms BIGINT NOT NULL,
last_error TEXT,
in_progress BOOLEAN NOT NULL DEFAULT FALSE
);`
	if _, err := execLog(ctx, dbConn, schema); err != nil {
		return err
	}
	return nil
}
// UpsertSnapshotRun updates or inserts snapshot run status.
// On conflict for the same (Vcenter, SnapshotTime) pair the attempt counter is
// incremented and the success flag / error message / timestamp are refreshed.
func UpsertSnapshotRun(ctx context.Context, dbConn *sqlx.DB, vcenter string, snapshotTime time.Time, success bool, errMsg string) error {
	if err := EnsureSnapshotRunTable(ctx, dbConn); err != nil {
		return err
	}
	successStr := "FALSE"
	if success {
		successStr = "TRUE"
	}
	now := time.Now().Unix()
	driver := strings.ToLower(dbConn.DriverName())
	switch driver {
	case "pgx", "postgres":
		_, err := execLog(ctx, dbConn, `
INSERT INTO snapshot_runs ("Vcenter","SnapshotTime","Attempts","Success","LastError","LastAttempt")
VALUES ($1, $2, 1, $3, $4, $5)
ON CONFLICT("Vcenter","SnapshotTime") DO UPDATE SET
"Attempts" = snapshot_runs."Attempts" + 1,
"Success" = EXCLUDED."Success",
"LastError" = EXCLUDED."LastError",
"LastAttempt" = EXCLUDED."LastAttempt"
`, vcenter, snapshotTime.Unix(), successStr, errMsg, now)
		return err
	default:
		// SQLite and other ?-placeholder drivers. This mirrors the convention used
		// elsewhere in this file (Postgres is special-cased, everything else is
		// treated as SQLite-style); previously only the exact driver name
		// "sqlite" was accepted, which rejected e.g. "sqlite3".
		_, err := execLog(ctx, dbConn, `
INSERT INTO snapshot_runs ("Vcenter","SnapshotTime","Attempts","Success","LastError","LastAttempt")
VALUES (?, ?, 1, ?, ?, ?)
ON CONFLICT("Vcenter","SnapshotTime") DO UPDATE SET
"Attempts" = snapshot_runs."Attempts" + 1,
"Success" = excluded."Success",
"LastError" = excluded."LastError",
"LastAttempt" = excluded."LastAttempt"
`, vcenter, snapshotTime.Unix(), successStr, errMsg, now)
		return err
	}
}
// ListFailedSnapshotRuns returns vcenter/time pairs needing retry.
// Rows with fewer than maxAttempts failed attempts are returned oldest-first;
// a non-positive maxAttempts defaults to 3.
func ListFailedSnapshotRuns(ctx context.Context, dbConn *sqlx.DB, maxAttempts int) ([]struct {
	Vcenter      string
	SnapshotTime int64
	Attempts     int
}, error) {
	if maxAttempts <= 0 {
		maxAttempts = 3
	}
	query := `
SELECT "Vcenter","SnapshotTime","Attempts"
FROM snapshot_runs
WHERE "Success" = 'FALSE' AND "Attempts" < ?
ORDER BY "LastAttempt" ASC
`
	if d := strings.ToLower(dbConn.DriverName()); d == "pgx" || d == "postgres" {
		query = `
SELECT "Vcenter","SnapshotTime","Attempts"
FROM snapshot_runs
WHERE "Success" = 'FALSE' AND "Attempts" < $1
ORDER BY "LastAttempt" ASC
`
	}
	type failedRun struct {
		Vcenter      string `db:"Vcenter"`
		SnapshotTime int64  `db:"SnapshotTime"`
		Attempts     int    `db:"Attempts"`
	}
	records := []failedRun{}
	if err := selectLog(ctx, dbConn, &records, query, maxAttempts); err != nil {
		return nil, err
	}
	out := make([]struct {
		Vcenter      string
		SnapshotTime int64
		Attempts     int
	}, len(records))
	for i, rec := range records {
		out[i].Vcenter = rec.Vcenter
		out[i].SnapshotTime = rec.SnapshotTime
		out[i].Attempts = rec.Attempts
	}
	return out, nil
}