package db

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log/slog"
	"sort"
	"strings"
	"time"

	"vctp/db/queries"

	"github.com/jmoiron/sqlx"
)

// SnapshotTotals summarizes counts and allocations for snapshot tables.
type SnapshotTotals struct {
	VmCount   int64   `db:"vm_count"`
	VcpuTotal int64   `db:"vcpu_total"`
	RamTotal  int64   `db:"ram_total"`
	DiskTotal float64 `db:"disk_total"`
}

// ColumnDef describes a column by name and SQL type, as used by EnsureColumns
// and AddColumnIfMissing.
type ColumnDef struct {
	Name string
	Type string
}

// TableRowCount returns COUNT(*) for a table.
func TableRowCount(ctx context.Context, dbConn *sqlx.DB, table string) (int64, error) {
	if err := ValidateTableName(table); err != nil {
		return 0, err
	}
	var count int64
	query := fmt.Sprintf(`SELECT COUNT(*) FROM %s`, table)
	if err := getLog(ctx, dbConn, &count, query); err != nil {
		return 0, err
	}
	return count, nil
}

// EnsureColumns adds the provided columns to a table if they are missing.
func EnsureColumns(ctx context.Context, dbConn *sqlx.DB, tableName string, columns []ColumnDef) error {
	if _, err := SafeTableName(tableName); err != nil {
		return err
	}
	for _, column := range columns {
		if err := AddColumnIfMissing(ctx, dbConn, tableName, column); err != nil {
			return err
		}
	}
	return nil
}

// execLog runs ExecContext and logs failures, downgrading "already exists"
// errors to debug level.
func execLog(ctx context.Context, dbConn *sqlx.DB, query string, args ...interface{}) (sql.Result, error) {
	res, err := dbConn.ExecContext(ctx, query, args...)
	if err != nil {
		q := strings.TrimSpace(query)
		msg := strings.ToLower(err.Error())
		if strings.Contains(msg, "duplicate column name") || strings.Contains(msg, "already exists") {
			slog.Debug("db exec skipped (already exists)", "query", q, "error", err)
		} else {
			slog.Warn("db exec failed", "query", q, "error", err)
		}
	}
	return res, err
}

// getLog runs GetContext and logs failures; no-row and timeout/cancel errors
// are logged at debug level.
func getLog(ctx context.Context, dbConn *sqlx.DB, dest interface{}, query string, args ...interface{}) error {
	err := dbConn.GetContext(ctx, dest, query, args...)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			slog.Debug("db get returned no rows", "query", strings.TrimSpace(query))
			return err
		}
		// Soften logging for timeout/cancel scenarios commonly hit during best-effort probes.
		if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
			slog.Debug("db get timed out", "query", strings.TrimSpace(query), "error", err)
		} else {
			slog.Warn("db get failed", "query", strings.TrimSpace(query), "error", err)
		}
	}
	return err
}

// selectLog runs SelectContext and logs failures; timeout/cancel errors are
// logged at debug level.
func selectLog(ctx context.Context, dbConn *sqlx.DB, dest interface{}, query string, args ...interface{}) error {
	err := dbConn.SelectContext(ctx, dest, query, args...)
	if err != nil {
		if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
			slog.Debug("db select timed out", "query", strings.TrimSpace(query), "error", err)
		} else {
			slog.Warn("db select failed", "query", strings.TrimSpace(query), "error", err)
		}
	}
	return err
}

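// Usage sketch for the logging wrappers above (the probe query and destination
// are illustrative, not taken from this package):
//
//	var count int64
//	if err := getLog(ctx, dbConn, &count, `SELECT COUNT(*) FROM inventory`); err != nil {
//		// sql.ErrNoRows and context deadline errors were already logged at debug level.
//	}
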
// AddColumnIfMissing performs a best-effort ALTER TABLE to add a column, ignoring "already exists".
func AddColumnIfMissing(ctx context.Context, dbConn *sqlx.DB, tableName string, column ColumnDef) error {
	if _, err := SafeTableName(tableName); err != nil {
		return err
	}
	// Skip ALTER if the column already exists to avoid noisy duplicate errors.
	if exists, err := ColumnExists(ctx, dbConn, tableName, column.Name); err == nil && exists {
		return nil
	}
	query := fmt.Sprintf(`ALTER TABLE %s ADD COLUMN "%s" %s`, tableName, column.Name, column.Type)
	if _, err := execLog(ctx, dbConn, query); err != nil {
		errText := strings.ToLower(err.Error())
		if strings.Contains(errText, "duplicate column") || strings.Contains(errText, "already exists") {
			return nil
		}
		return err
	}
	return nil
}

// ValidateTableName ensures table identifiers are safe for interpolation.
func ValidateTableName(name string) error {
	if name == "" {
		return fmt.Errorf("table name is empty")
	}
	for _, r := range name {
		if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '_' {
			continue
		}
		return fmt.Errorf("invalid table name: %s", name)
	}
	return nil
}

// SafeTableName returns the name if it passes validation.
func SafeTableName(name string) (string, error) {
	if err := ValidateTableName(name); err != nil {
		return "", err
	}
	return name, nil
}

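// Illustrative examples of the validation rule above: "vm_snapshot_2024_01_01"
// passes, while "VmSnapshot" (uppercase) and "vm-snapshot" (hyphen) are rejected,
// since only lowercase ASCII letters, digits, and underscores are allowed.
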
// TableHasRows returns true when a table contains at least one row.
func TableHasRows(ctx context.Context, dbConn *sqlx.DB, table string) (bool, error) {
	if err := ValidateTableName(table); err != nil {
		return false, err
	}
	// Avoid hanging on locked tables; apply a short timeout.
	if ctx == nil {
		ctx = context.Background()
	}
	var cancel context.CancelFunc
	ctx, cancel = context.WithTimeout(ctx, 15*time.Second)
	defer cancel()
	query := fmt.Sprintf(`SELECT 1 FROM %s LIMIT 1`, table)
	var exists int
	if err := getLog(ctx, dbConn, &exists, query); err != nil {
		if err == sql.ErrNoRows {
			return false, nil
		}
		if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
			return false, nil
		}
		return false, err
	}
	return true, nil
}

// TableExists checks if a table exists in the current schema.
func TableExists(ctx context.Context, dbConn *sqlx.DB, table string) bool {
	driver := strings.ToLower(dbConn.DriverName())
	switch driver {
	case "sqlite":
		q := queries.New(dbConn)
		count, err := q.SqliteTableExists(ctx, sql.NullString{String: table, Valid: table != ""})
		return err == nil && count > 0
	case "pgx", "postgres":
		var count int
		err := getLog(ctx, dbConn, &count, `
			SELECT COUNT(1)
			FROM pg_catalog.pg_tables
			WHERE schemaname = 'public' AND tablename = $1
		`, table)
		return err == nil && count > 0
	default:
		return false
	}
}

// ColumnExists checks if a column exists in a table.
func ColumnExists(ctx context.Context, dbConn *sqlx.DB, tableName string, columnName string) (bool, error) {
	driver := strings.ToLower(dbConn.DriverName())
	switch driver {
	case "sqlite":
		if _, err := SafeTableName(tableName); err != nil {
			return false, err
		}
		query := fmt.Sprintf(`PRAGMA table_info("%s")`, tableName)
		rows, err := dbConn.QueryxContext(ctx, query)
		if err != nil {
			return false, err
		}
		defer rows.Close()
		for rows.Next() {
			var (
				cid        int
				name       string
				colType    string
				notNull    int
				defaultVal sql.NullString
				pk         int
			)
			if err := rows.Scan(&cid, &name, &colType, &notNull, &defaultVal, &pk); err != nil {
				return false, err
			}
			if strings.EqualFold(name, columnName) {
				return true, nil
			}
		}
		return false, rows.Err()
	case "pgx", "postgres":
		var count int
		err := getLog(ctx, dbConn, &count, `
			SELECT COUNT(1)
			FROM information_schema.columns
			WHERE table_name = $1 AND column_name = $2
		`, tableName, strings.ToLower(columnName))
		if err != nil {
			return false, err
		}
		return count > 0, nil
	default:
		return false, fmt.Errorf("unsupported driver for column lookup: %s", driver)
	}
}

// SnapshotTotalsForTable returns totals for a snapshot table.
func SnapshotTotalsForTable(ctx context.Context, dbConn *sqlx.DB, table string) (SnapshotTotals, error) {
	if _, err := SafeTableName(table); err != nil {
		return SnapshotTotals{}, err
	}
	query := fmt.Sprintf(`
		SELECT
			COUNT(DISTINCT "VmId") AS vm_count,
			COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) AS vcpu_total,
			COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) AS ram_total,
			COALESCE(SUM(CASE WHEN "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" ELSE 0 END), 0) AS disk_total
		FROM %s
	`, table)

	var totals SnapshotTotals
	if err := getLog(ctx, dbConn, &totals, query); err != nil {
		return SnapshotTotals{}, err
	}
	return totals, nil
}

// SnapshotTotalsForUnion returns totals for a union query of snapshots.
func SnapshotTotalsForUnion(ctx context.Context, dbConn *sqlx.DB, unionQuery string) (SnapshotTotals, error) {
	query := fmt.Sprintf(`
		SELECT
			COUNT(DISTINCT "VmId") AS vm_count,
			COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) AS vcpu_total,
			COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) AS ram_total,
			COALESCE(SUM(CASE WHEN "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" ELSE 0 END), 0) AS disk_total
		FROM (
			%s
		) snapshots
	`, unionQuery)

	var totals SnapshotTotals
	if err := getLog(ctx, dbConn, &totals, query); err != nil {
		return SnapshotTotals{}, err
	}
	return totals, nil
}

// EnsureSnapshotTable creates a snapshot table with the standard schema if it does not exist.
func EnsureSnapshotTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
	if _, err := SafeTableName(tableName); err != nil {
		return err
	}

	driver := strings.ToLower(dbConn.DriverName())
	var ddl string
	switch driver {
	case "pgx", "postgres":
		ddl = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
			"RowId" BIGSERIAL PRIMARY KEY,
			"InventoryId" BIGINT,
			"Name" TEXT NOT NULL,
			"Vcenter" TEXT NOT NULL,
			"VmId" TEXT,
			"EventKey" TEXT,
			"CloudId" TEXT,
			"CreationTime" BIGINT,
			"DeletionTime" BIGINT,
			"ResourcePool" TEXT,
			"Datacenter" TEXT,
			"Cluster" TEXT,
			"Folder" TEXT,
			"ProvisionedDisk" REAL,
			"VcpuCount" BIGINT,
			"RamGB" BIGINT,
			"IsTemplate" TEXT,
			"PoweredOn" TEXT,
			"SrmPlaceholder" TEXT,
			"VmUuid" TEXT,
			"SnapshotTime" BIGINT NOT NULL
		);`, tableName)
	default:
		ddl = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
			"RowId" INTEGER PRIMARY KEY AUTOINCREMENT,
			"InventoryId" BIGINT,
			"Name" TEXT NOT NULL,
			"Vcenter" TEXT NOT NULL,
			"VmId" TEXT,
			"EventKey" TEXT,
			"CloudId" TEXT,
			"CreationTime" BIGINT,
			"DeletionTime" BIGINT,
			"ResourcePool" TEXT,
			"Datacenter" TEXT,
			"Cluster" TEXT,
			"Folder" TEXT,
			"ProvisionedDisk" REAL,
			"VcpuCount" BIGINT,
			"RamGB" BIGINT,
			"IsTemplate" TEXT,
			"PoweredOn" TEXT,
			"SrmPlaceholder" TEXT,
			"VmUuid" TEXT,
			"SnapshotTime" BIGINT NOT NULL
		);`, tableName)
	}

	_, err := execLog(ctx, dbConn, ddl)
	if err != nil {
		return err
	}

	return EnsureSnapshotIndexes(ctx, dbConn, tableName)
}

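// Bootstrap sketch (the hourly table name below is hypothetical): callers are
// expected to derive a valid snapshot table name, then create the table and its
// indexes before inserting rows.
//
//	if err := EnsureSnapshotTable(ctx, dbConn, "snapshot_hourly_2024_06_01_13"); err != nil {
//		return err
//	}
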
// EnsureSnapshotIndexes creates the standard indexes for a snapshot table.
func EnsureSnapshotIndexes(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
	if _, err := SafeTableName(tableName); err != nil {
		return err
	}

	driver := strings.ToLower(dbConn.DriverName())
	indexes := []string{
		fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vm_vcenter_idx ON %s ("VmId","Vcenter")`, tableName, tableName),
		fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_snapshottime_idx ON %s ("SnapshotTime")`, tableName, tableName),
		fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_resourcepool_idx ON %s ("ResourcePool")`, tableName, tableName),
	}
	// PG-specific helpful indexes; safe no-ops on SQLite if executed, but keep them gated to reduce file bloat.
	if driver == "pgx" || driver == "postgres" {
		indexes = append(indexes,
			fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vcenter_snapshottime_idx ON %s ("Vcenter","SnapshotTime")`, tableName, tableName),
			fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_name_vcenter_idx ON %s ("Name","Vcenter")`, tableName, tableName),
			fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vmuuid_vcenter_idx ON %s ("VmUuid","Vcenter")`, tableName, tableName),
		)
	}
	for _, idx := range indexes {
		if _, err := execLog(ctx, dbConn, idx); err != nil {
			return err
		}
	}
	return nil
}

// BackfillSerialColumn sets missing values in a serial-like column for Postgres tables.
func BackfillSerialColumn(ctx context.Context, dbConn *sqlx.DB, tableName, columnName string) error {
	if err := ValidateTableName(tableName); err != nil {
		return err
	}
	if columnName == "" {
		return fmt.Errorf("column name is empty")
	}
	query := fmt.Sprintf(
		`UPDATE %s SET "%s" = nextval(pg_get_serial_sequence('%s','%s')) WHERE "%s" IS NULL`,
		tableName, columnName, tableName, columnName, columnName,
	)
	_, err := execLog(ctx, dbConn, query)
	if err != nil {
		errText := strings.ToLower(err.Error())
		if strings.Contains(errText, "pg_get_serial_sequence") || strings.Contains(errText, "sequence") {
			return nil
		}
		return err
	}
	return nil
}

// ApplySQLiteTuning applies lightweight WAL/synchronous tweaks for better concurrency in non-prod contexts.
func ApplySQLiteTuning(ctx context.Context, dbConn *sqlx.DB) {
	if strings.ToLower(dbConn.DriverName()) != "sqlite" {
		return
	}
	// Best-effort pragmas; ignore errors to stay safe in constrained environments.
	var err error
	pragmas := []string{
		`PRAGMA journal_mode=WAL;`,
		`PRAGMA synchronous=NORMAL;`,
		`PRAGMA temp_store=MEMORY;`,
		`PRAGMA optimize;`,
		`PRAGMA busy_timeout=5000;`,
	}
	for _, pragma := range pragmas {
		_, err = execLog(ctx, dbConn, pragma)
		if logger, ok := ctx.Value("logger").(*slog.Logger); ok && logger != nil {
			logger.Debug("Applied SQLite tuning pragma", "pragma", pragma, "error", err)
		}
	}
}

// CheckpointSQLite forces a WAL checkpoint (truncate) when using SQLite. No-op for other drivers.
func CheckpointSQLite(ctx context.Context, dbConn *sqlx.DB) error {
	if strings.ToLower(dbConn.DriverName()) != "sqlite" {
		return nil
	}
	if ctx == nil {
		ctx = context.Background()
	}
	cctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	_, err := dbConn.ExecContext(cctx, `PRAGMA wal_checkpoint(TRUNCATE);`)
	return err
}

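// Startup sketch (illustrative; the hourly checkpoint interval is an assumption,
// not taken from this file): tune SQLite once at boot and periodically truncate
// the WAL so the database file does not grow unbounded.
//
//	ApplySQLiteTuning(ctx, dbConn)
//	go func() {
//		for range time.Tick(time.Hour) {
//			_ = CheckpointSQLite(ctx, dbConn)
//		}
//	}()
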
// EnsureVmHourlyStats creates the shared per-snapshot cache table used by Go aggregations.
func EnsureVmHourlyStats(ctx context.Context, dbConn *sqlx.DB) error {
	ddl := `
	CREATE TABLE IF NOT EXISTS vm_hourly_stats (
		"SnapshotTime" BIGINT NOT NULL,
		"Vcenter" TEXT NOT NULL,
		"VmId" TEXT,
		"VmUuid" TEXT,
		"Name" TEXT,
		"CreationTime" BIGINT,
		"DeletionTime" BIGINT,
		"ResourcePool" TEXT,
		"Datacenter" TEXT,
		"Cluster" TEXT,
		"Folder" TEXT,
		"ProvisionedDisk" REAL,
		"VcpuCount" BIGINT,
		"RamGB" BIGINT,
		"IsTemplate" TEXT,
		"PoweredOn" TEXT,
		"SrmPlaceholder" TEXT,
		PRIMARY KEY ("Vcenter","VmId","SnapshotTime")
	);`
	if _, err := execLog(ctx, dbConn, ddl); err != nil {
		return err
	}
	_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_hourly_stats_vmuuid_time_idx ON vm_hourly_stats ("VmUuid","SnapshotTime")`)
	_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_hourly_stats_snapshottime_idx ON vm_hourly_stats ("SnapshotTime")`)
	return nil
}

// EnsureVmLifecycleCache creates an upsert cache for first/last seen VM info.
func EnsureVmLifecycleCache(ctx context.Context, dbConn *sqlx.DB) error {
	ddl := `
	CREATE TABLE IF NOT EXISTS vm_lifecycle_cache (
		"Vcenter" TEXT NOT NULL,
		"VmId" TEXT,
		"VmUuid" TEXT,
		"Name" TEXT,
		"Cluster" TEXT,
		"FirstSeen" BIGINT,
		"LastSeen" BIGINT,
		"DeletedAt" BIGINT,
		PRIMARY KEY ("Vcenter","VmId","VmUuid")
	);`
	if _, err := execLog(ctx, dbConn, ddl); err != nil {
		return err
	}
	_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_lifecycle_cache_vmuuid_idx ON vm_lifecycle_cache ("VmUuid")`)
	return nil
}

// UpsertVmLifecycleCache updates first/last seen info for a VM.
func UpsertVmLifecycleCache(ctx context.Context, dbConn *sqlx.DB, vcenter string, vmID, vmUUID, name, cluster string, seen time.Time) error {
	if err := EnsureVmLifecycleCache(ctx, dbConn); err != nil {
		return err
	}
	driver := strings.ToLower(dbConn.DriverName())
	bindType := sqlx.BindType(driver)
	query := `
	INSERT INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","Name","Cluster","FirstSeen","LastSeen")
	VALUES (?, ?, ?, ?, ?, ?, ?)
	ON CONFLICT ("Vcenter","VmId","VmUuid") DO UPDATE SET
		"Name"=EXCLUDED."Name",
		"Cluster"=EXCLUDED."Cluster",
		"LastSeen"=EXCLUDED."LastSeen",
		"FirstSeen"=COALESCE(vm_lifecycle_cache."FirstSeen", EXCLUDED."FirstSeen"),
		"DeletedAt"=NULL
	`
	query = sqlx.Rebind(bindType, query)
	args := []interface{}{vcenter, vmID, vmUUID, name, cluster, seen.Unix(), seen.Unix()}
	_, err := dbConn.ExecContext(ctx, query, args...)
	if err != nil {
		slog.Warn("lifecycle upsert exec failed", "vcenter", vcenter, "vm_id", vmID, "vm_uuid", vmUUID, "driver", driver, "args_len", len(args), "args", fmt.Sprint(args), "query", strings.TrimSpace(query), "error", err)
	}
	return err
}

// MarkVmDeletedWithDetails updates the lifecycle cache with a deletion timestamp, carrying optional name/cluster.
func MarkVmDeletedWithDetails(ctx context.Context, dbConn *sqlx.DB, vcenter, vmID, vmUUID, name, cluster string, deletedAt int64) error {
	if err := EnsureVmLifecycleCache(ctx, dbConn); err != nil {
		return err
	}
	driver := strings.ToLower(dbConn.DriverName())
	bindType := sqlx.BindType(driver)

	query := `
	INSERT INTO vm_lifecycle_cache ("Vcenter","VmId","VmUuid","Name","Cluster","DeletedAt","FirstSeen","LastSeen")
	VALUES (?, ?, ?, ?, ?, ?, ?, ?)
	ON CONFLICT ("Vcenter","VmId","VmUuid") DO UPDATE SET
		"DeletedAt"=CASE
			WHEN vm_lifecycle_cache."DeletedAt" IS NULL OR vm_lifecycle_cache."DeletedAt"=0 OR EXCLUDED."DeletedAt"<vm_lifecycle_cache."DeletedAt"
			THEN EXCLUDED."DeletedAt"
			ELSE vm_lifecycle_cache."DeletedAt"
		END,
		"LastSeen"=COALESCE(vm_lifecycle_cache."LastSeen", EXCLUDED."LastSeen"),
		"FirstSeen"=COALESCE(vm_lifecycle_cache."FirstSeen", EXCLUDED."FirstSeen"),
		"Name"=COALESCE(NULLIF(vm_lifecycle_cache."Name", ''), EXCLUDED."Name"),
		"Cluster"=COALESCE(NULLIF(vm_lifecycle_cache."Cluster", ''), EXCLUDED."Cluster")
	`
	query = sqlx.Rebind(bindType, query)
	args := []interface{}{vcenter, vmID, vmUUID, name, cluster, deletedAt, deletedAt, deletedAt}
	_, err := dbConn.ExecContext(ctx, query, args...)
	if err != nil {
		slog.Warn("lifecycle delete exec failed", "vcenter", vcenter, "vm_id", vmID, "vm_uuid", vmUUID, "driver", driver, "args_len", len(args), "args", fmt.Sprint(args), "query", strings.TrimSpace(query), "error", err)
	}
	return err
}

// MarkVmDeleted updates lifecycle cache with a deletion timestamp (legacy signature).
func MarkVmDeleted(ctx context.Context, dbConn *sqlx.DB, vcenter, vmID, vmUUID string, deletedAt int64) error {
	return MarkVmDeletedWithDetails(ctx, dbConn, vcenter, vmID, vmUUID, "", "", deletedAt)
}

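// Ingest sketch (illustrative; the vCenter URL, IDs and names are hypothetical):
// after each collection pass, refresh the lifecycle cache for every VM that was
// observed and mark VMs that disappeared since the previous pass.
//
//	_ = UpsertVmLifecycleCache(ctx, dbConn, "vcenter.example.com", "vm-42", "4207ab…", "app-01", "prod-cluster", time.Now())
//	_ = MarkVmDeleted(ctx, dbConn, "vcenter.example.com", "vm-17", "4209cd…", time.Now().Unix())
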
// UpsertVmDailyRollup writes/updates a daily rollup row.
func UpsertVmDailyRollup(ctx context.Context, dbConn *sqlx.DB, day int64, v VmDailyRollupRow) error {
	if err := EnsureVmDailyRollup(ctx, dbConn); err != nil {
		return err
	}
	driver := strings.ToLower(dbConn.DriverName())
	query := `
	INSERT INTO vm_daily_rollup (
		"Date","Vcenter","VmId","VmUuid","Name","CreationTime","DeletionTime","SamplesPresent","TotalSamples",
		"SumVcpu","SumRam","SumDisk","TinHits","BronzeHits","SilverHits","GoldHits",
		"LastResourcePool","LastDatacenter","LastCluster","LastFolder",
		"LastProvisionedDisk","LastVcpuCount","LastRamGB","IsTemplate","PoweredOn","SrmPlaceholder"
	) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26)
	ON CONFLICT ("Date","Vcenter","VmId","VmUuid") DO UPDATE SET
		"CreationTime"=LEAST(COALESCE(vm_daily_rollup."CreationTime", $6), COALESCE($6, vm_daily_rollup."CreationTime")),
		"DeletionTime"=CASE
			WHEN vm_daily_rollup."DeletionTime" IS NULL OR vm_daily_rollup."DeletionTime"=0 THEN $7
			WHEN $7 IS NOT NULL AND $7 > 0 AND $7 < vm_daily_rollup."DeletionTime" THEN $7
			ELSE vm_daily_rollup."DeletionTime" END,
		"SamplesPresent"=$8,
		"TotalSamples"=$9,
		"SumVcpu"=$10,
		"SumRam"=$11,
		"SumDisk"=$12,
		"TinHits"=$13,
		"BronzeHits"=$14,
		"SilverHits"=$15,
		"GoldHits"=$16,
		"LastResourcePool"=$17,
		"LastDatacenter"=$18,
		"LastCluster"=$19,
		"LastFolder"=$20,
		"LastProvisionedDisk"=$21,
		"LastVcpuCount"=$22,
		"LastRamGB"=$23,
		"IsTemplate"=$24,
		"PoweredOn"=$25,
		"SrmPlaceholder"=$26
	`
	args := []interface{}{
		day, v.Vcenter, v.VmId, v.VmUuid, v.Name, v.CreationTime, v.DeletionTime, v.SamplesPresent, v.TotalSamples,
		v.SumVcpu, v.SumRam, v.SumDisk, v.TinHits, v.BronzeHits, v.SilverHits, v.GoldHits,
		v.LastResourcePool, v.LastDatacenter, v.LastCluster, v.LastFolder, v.LastProvisionedDisk, v.LastVcpuCount, v.LastRamGB, v.IsTemplate, v.PoweredOn, v.SrmPlaceholder,
	}
	if driver == "sqlite" {
		query = `
		INSERT OR REPLACE INTO vm_daily_rollup (
			"Date","Vcenter","VmId","VmUuid","Name","CreationTime","DeletionTime","SamplesPresent","TotalSamples",
			"SumVcpu","SumRam","SumDisk","TinHits","BronzeHits","SilverHits","GoldHits",
			"LastResourcePool","LastDatacenter","LastCluster","LastFolder",
			"LastProvisionedDisk","LastVcpuCount","LastRamGB","IsTemplate","PoweredOn","SrmPlaceholder"
		) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
		`
	}
	_, err := dbConn.ExecContext(ctx, query, args...)
	return err
}

// VmDailyRollupRow represents the per-day cached aggregation.
type VmDailyRollupRow struct {
	Vcenter             string
	VmId                string
	VmUuid              string
	Name                string
	CreationTime        int64
	DeletionTime        int64
	SamplesPresent      int64
	TotalSamples        int64
	SumVcpu             float64
	SumRam              float64
	SumDisk             float64
	TinHits             int64
	BronzeHits          int64
	SilverHits          int64
	GoldHits            int64
	LastResourcePool    string
	LastDatacenter      string
	LastCluster         string
	LastFolder          string
	LastProvisionedDisk float64
	LastVcpuCount       int64
	LastRamGB           int64
	IsTemplate          string
	PoweredOn           string
	SrmPlaceholder      string
}

// EnsureVmDailyRollup creates the per-day cache used by monthly aggregation.
func EnsureVmDailyRollup(ctx context.Context, dbConn *sqlx.DB) error {
	ddl := `
	CREATE TABLE IF NOT EXISTS vm_daily_rollup (
		"Date" BIGINT NOT NULL,
		"Vcenter" TEXT NOT NULL,
		"VmId" TEXT,
		"VmUuid" TEXT,
		"Name" TEXT,
		"CreationTime" BIGINT,
		"DeletionTime" BIGINT,
		"SamplesPresent" BIGINT,
		"TotalSamples" BIGINT,
		"SumVcpu" BIGINT,
		"SumRam" BIGINT,
		"SumDisk" REAL,
		"TinHits" BIGINT,
		"BronzeHits" BIGINT,
		"SilverHits" BIGINT,
		"GoldHits" BIGINT,
		"LastResourcePool" TEXT,
		"LastDatacenter" TEXT,
		"LastCluster" TEXT,
		"LastFolder" TEXT,
		"LastProvisionedDisk" REAL,
		"LastVcpuCount" BIGINT,
		"LastRamGB" BIGINT,
		"IsTemplate" TEXT,
		"PoweredOn" TEXT,
		"SrmPlaceholder" TEXT,
		PRIMARY KEY ("Date","Vcenter","VmId","VmUuid")
	);`
	if _, err := execLog(ctx, dbConn, ddl); err != nil {
		return err
	}
	_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_daily_rollup_date_idx ON vm_daily_rollup ("Date")`)
	_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_daily_rollup_vcenter_date_idx ON vm_daily_rollup ("Vcenter","Date")`)
	return nil
}

// EnsureVmIdentityTables creates the identity and rename audit tables.
func EnsureVmIdentityTables(ctx context.Context, dbConn *sqlx.DB) error {
	driver := strings.ToLower(dbConn.DriverName())
	var identityDDL, renameDDL string
	switch driver {
	case "pgx", "postgres":
		identityDDL = `
		CREATE TABLE IF NOT EXISTS vm_identity (
			"VmId" TEXT NOT NULL,
			"VmUuid" TEXT NOT NULL,
			"Vcenter" TEXT NOT NULL,
			"Name" TEXT NOT NULL,
			"Cluster" TEXT,
			"FirstSeen" BIGINT NOT NULL,
			"LastSeen" BIGINT NOT NULL,
			PRIMARY KEY ("VmId","VmUuid","Vcenter")
		)`
		renameDDL = `
		CREATE TABLE IF NOT EXISTS vm_renames (
			"RowId" BIGSERIAL PRIMARY KEY,
			"VmId" TEXT NOT NULL,
			"VmUuid" TEXT NOT NULL,
			"Vcenter" TEXT NOT NULL,
			"OldName" TEXT,
			"NewName" TEXT,
			"OldCluster" TEXT,
			"NewCluster" TEXT,
			"SnapshotTime" BIGINT NOT NULL
		)`
	default:
		identityDDL = `
		CREATE TABLE IF NOT EXISTS vm_identity (
			"VmId" TEXT NOT NULL,
			"VmUuid" TEXT NOT NULL,
			"Vcenter" TEXT NOT NULL,
			"Name" TEXT NOT NULL,
			"Cluster" TEXT,
			"FirstSeen" BIGINT NOT NULL,
			"LastSeen" BIGINT NOT NULL,
			PRIMARY KEY ("VmId","VmUuid","Vcenter")
		)`
		renameDDL = `
		CREATE TABLE IF NOT EXISTS vm_renames (
			"RowId" INTEGER PRIMARY KEY AUTOINCREMENT,
			"VmId" TEXT NOT NULL,
			"VmUuid" TEXT NOT NULL,
			"Vcenter" TEXT NOT NULL,
			"OldName" TEXT,
			"NewName" TEXT,
			"OldCluster" TEXT,
			"NewCluster" TEXT,
			"SnapshotTime" BIGINT NOT NULL
		)`
	}
	if _, err := execLog(ctx, dbConn, identityDDL); err != nil {
		return err
	}
	if _, err := execLog(ctx, dbConn, renameDDL); err != nil {
		return err
	}
	indexes := []string{
		`CREATE INDEX IF NOT EXISTS vm_identity_vcenter_idx ON vm_identity ("Vcenter")`,
		`CREATE INDEX IF NOT EXISTS vm_identity_uuid_idx ON vm_identity ("VmUuid","Vcenter")`,
		`CREATE INDEX IF NOT EXISTS vm_identity_name_idx ON vm_identity ("Name","Vcenter")`,
		`CREATE INDEX IF NOT EXISTS vm_renames_vcenter_idx ON vm_renames ("Vcenter","SnapshotTime")`,
	}
	for _, idx := range indexes {
		if _, err := execLog(ctx, dbConn, idx); err != nil {
			return err
		}
	}
	return nil
}

// UpsertVmIdentity updates/creates the identity record and records rename events.
func UpsertVmIdentity(ctx context.Context, dbConn *sqlx.DB, vcenter string, vmId, vmUuid sql.NullString, name string, cluster sql.NullString, snapshotTime time.Time) error {
	keyVmID := strings.TrimSpace(vmId.String)
	keyUuid := strings.TrimSpace(vmUuid.String)
	if keyVmID == "" || keyUuid == "" || strings.TrimSpace(vcenter) == "" {
		return nil
	}
	if err := EnsureVmIdentityTables(ctx, dbConn); err != nil {
		return err
	}

	type identityRow struct {
		Name      string         `db:"Name"`
		Cluster   sql.NullString `db:"Cluster"`
		FirstSeen sql.NullInt64  `db:"FirstSeen"`
		LastSeen  sql.NullInt64  `db:"LastSeen"`
	}
	var existing identityRow
	err := getLog(ctx, dbConn, &existing, `
		SELECT "Name","Cluster","FirstSeen","LastSeen"
		FROM vm_identity
		WHERE "Vcenter" = $1 AND "VmId" = $2 AND "VmUuid" = $3
	`, vcenter, keyVmID, keyUuid)

	if err != nil {
		if strings.Contains(strings.ToLower(err.Error()), "no rows") {
			_, err = execLog(ctx, dbConn, `
				INSERT INTO vm_identity ("VmId","VmUuid","Vcenter","Name","Cluster","FirstSeen","LastSeen")
				VALUES ($1,$2,$3,$4,$5,$6,$6)
			`, keyVmID, keyUuid, vcenter, name, nullString(cluster), snapshotTime.Unix())
			return err
		}
		return err
	}

	renamed := !strings.EqualFold(existing.Name, name) || !strings.EqualFold(strings.TrimSpace(existing.Cluster.String), strings.TrimSpace(cluster.String))
	if renamed {
		_, _ = execLog(ctx, dbConn, `
			INSERT INTO vm_renames ("VmId","VmUuid","Vcenter","OldName","NewName","OldCluster","NewCluster","SnapshotTime")
			VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
		`, keyVmID, keyUuid, vcenter, existing.Name, name, existing.Cluster.String, cluster.String, snapshotTime.Unix())
	}
	_, err = execLog(ctx, dbConn, `
		UPDATE vm_identity
		SET "Name" = $1, "Cluster" = $2, "LastSeen" = $3
		WHERE "Vcenter" = $4 AND "VmId" = $5 AND "VmUuid" = $6
	`, name, nullString(cluster), snapshotTime.Unix(), vcenter, keyVmID, keyUuid)
	return err
}

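// Usage sketch (illustrative values): record or refresh the identity of a VM seen
// in the current snapshot; a differing name or cluster is written to vm_renames
// automatically by the function above.
//
//	err := UpsertVmIdentity(ctx, dbConn, "vcenter.example.com",
//		sql.NullString{String: "vm-1234", Valid: true},
//		sql.NullString{String: "4205c9d4-…", Valid: true},
//		"web-frontend-01",
//		sql.NullString{String: "prod-cluster", Valid: true},
//		time.Now())
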
// nullString converts a sql.NullString into a driver-friendly value: the string
// when valid, otherwise nil.
func nullString(val sql.NullString) interface{} {
	if val.Valid {
		return val.String
	}
	return nil
}

// EnsureVcenterTotalsTable creates the vcenter_totals table if missing.
func EnsureVcenterTotalsTable(ctx context.Context, dbConn *sqlx.DB) error {
	driver := strings.ToLower(dbConn.DriverName())
	var ddl string
	switch driver {
	case "pgx", "postgres":
		ddl = `
		CREATE TABLE IF NOT EXISTS vcenter_totals (
			"RowId" BIGSERIAL PRIMARY KEY,
			"Vcenter" TEXT NOT NULL,
			"SnapshotTime" BIGINT NOT NULL,
			"VmCount" BIGINT NOT NULL,
			"VcpuTotal" BIGINT NOT NULL,
			"RamTotalGB" BIGINT NOT NULL
		);`
	default:
		ddl = `
		CREATE TABLE IF NOT EXISTS vcenter_totals (
			"RowId" INTEGER PRIMARY KEY AUTOINCREMENT,
			"Vcenter" TEXT NOT NULL,
			"SnapshotTime" BIGINT NOT NULL,
			"VmCount" BIGINT NOT NULL,
			"VcpuTotal" BIGINT NOT NULL,
			"RamTotalGB" BIGINT NOT NULL
		);`
	}
	if _, err := execLog(ctx, dbConn, ddl); err != nil {
		return err
	}
	indexes := []string{
		`CREATE INDEX IF NOT EXISTS vcenter_totals_vc_time_idx ON vcenter_totals ("Vcenter","SnapshotTime" DESC)`,
	}
	for _, idx := range indexes {
		if _, err := execLog(ctx, dbConn, idx); err != nil {
			return err
		}
	}
	return nil
}

// InsertVcenterTotals records totals for a vcenter at a snapshot time.
func InsertVcenterTotals(ctx context.Context, dbConn *sqlx.DB, vcenter string, snapshotTime time.Time, vmCount, vcpuTotal, ramTotal int64) error {
	if strings.TrimSpace(vcenter) == "" {
		return fmt.Errorf("vcenter is empty")
	}
	if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
		return err
	}
	_, err := execLog(ctx, dbConn, `
		INSERT INTO vcenter_totals ("Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB")
		VALUES ($1,$2,$3,$4,$5)
	`, vcenter, snapshotTime.Unix(), vmCount, vcpuTotal, ramTotal)
	return err
}

// ListVcenters returns distinct vcenter URLs tracked.
func ListVcenters(ctx context.Context, dbConn *sqlx.DB) ([]string, error) {
	if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
		return nil, err
	}
	rows, err := dbConn.QueryxContext(ctx, `SELECT DISTINCT "Vcenter" FROM vcenter_totals ORDER BY "Vcenter"`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var out []string
	for rows.Next() {
		var v string
		if err := rows.Scan(&v); err != nil {
			return nil, err
		}
		out = append(out, v)
	}
	return out, rows.Err()
}

// VcenterTotalRow holds per-snapshot totals for a vcenter.
type VcenterTotalRow struct {
	SnapshotTime int64  `db:"SnapshotTime"`
	Vcenter      string `db:"Vcenter"`
	VmCount      int64  `db:"VmCount"`
	VcpuTotal    int64  `db:"VcpuTotal"`
	RamTotalGB   int64  `db:"RamTotalGB"`
}

// ListVcenterTotals lists totals for a vcenter sorted by snapshot_time desc, limited.
func ListVcenterTotals(ctx context.Context, dbConn *sqlx.DB, vcenter string, limit int) ([]VcenterTotalRow, error) {
	if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
		return nil, err
	}
	if limit <= 0 {
		limit = 200
	}
	rows := make([]VcenterTotalRow, 0, limit)
	query := `
		SELECT "Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB"
		FROM vcenter_totals
		WHERE "Vcenter" = $1
		ORDER BY "SnapshotTime" DESC
		LIMIT $2`
	if err := selectLog(ctx, dbConn, &rows, query, vcenter, limit); err != nil {
		return nil, err
	}
	return rows, nil
}

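// Read-path sketch (the vCenter URL and limit are illustrative): fetch the most
// recent 24 totals rows for a single vCenter, e.g. for a dashboard sparkline.
//
//	totals, err := ListVcenterTotals(ctx, dbConn, "vcenter.example.com", 24)
//	if err == nil {
//		for _, t := range totals {
//			fmt.Println(t.SnapshotTime, t.VmCount, t.VcpuTotal, t.RamTotalGB)
//		}
//	}
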
// ListVcenterTotalsByType returns totals for a vcenter for the requested snapshot type (hourly, daily, monthly).
// Hourly values come from vcenter_totals; daily/monthly are derived from the summary tables referenced in snapshot_registry.
func ListVcenterTotalsByType(ctx context.Context, dbConn *sqlx.DB, vcenter string, snapshotType string, limit int) ([]VcenterTotalRow, error) {
	snapshotType = strings.ToLower(snapshotType)
	if snapshotType == "" {
		snapshotType = "hourly"
	}
	if snapshotType == "hourly" {
		return ListVcenterTotals(ctx, dbConn, vcenter, limit)
	}

	if limit <= 0 {
		limit = 200
	}

	driver := strings.ToLower(dbConn.DriverName())
	query := `
		SELECT table_name, snapshot_time
		FROM snapshot_registry
		WHERE snapshot_type = $1
		ORDER BY snapshot_time DESC
		LIMIT $2
	`
	if driver == "sqlite" {
		query = strings.ReplaceAll(query, "$1", "?")
		query = strings.ReplaceAll(query, "$2", "?")
	}

	var regRows []struct {
		TableName    string `db:"table_name"`
		SnapshotTime int64  `db:"snapshot_time"`
	}
	if err := selectLog(ctx, dbConn, &regRows, query, snapshotType, limit); err != nil {
		return nil, err
	}

	out := make([]VcenterTotalRow, 0, len(regRows))
	for _, r := range regRows {
		if err := ValidateTableName(r.TableName); err != nil {
			continue
		}
		agg, err := aggregateSummaryTotals(ctx, dbConn, r.TableName, vcenter)
		if err != nil {
			continue
		}
		out = append(out, VcenterTotalRow{
			SnapshotTime: r.SnapshotTime,
			Vcenter:      vcenter,
			VmCount:      agg.VmCount,
			VcpuTotal:    agg.VcpuTotal,
			RamTotalGB:   agg.RamTotalGB,
		})
	}
	return out, nil
}

type summaryAgg struct {
	VmCount    int64 `db:"vm_count"`
	VcpuTotal  int64 `db:"vcpu_total"`
	RamTotalGB int64 `db:"ram_total"`
}

// aggregateSummaryTotals computes totals for a single summary table (daily/monthly) for a given vcenter.
func aggregateSummaryTotals(ctx context.Context, dbConn *sqlx.DB, tableName string, vcenter string) (summaryAgg, error) {
	if _, err := SafeTableName(tableName); err != nil {
		return summaryAgg{}, err
	}
	driver := strings.ToLower(dbConn.DriverName())
	query := fmt.Sprintf(`
		SELECT
			COUNT(1) AS vm_count,
			COALESCE(SUM(COALESCE("AvgVcpuCount","VcpuCount")),0) AS vcpu_total,
			COALESCE(SUM(COALESCE("AvgRamGB","RamGB")),0) AS ram_total
		FROM %s
		WHERE "Vcenter" = $1
	`, tableName)
	if driver == "sqlite" {
		query = strings.ReplaceAll(query, "$1", "?")
	}
	var agg summaryAgg
	if err := getLog(ctx, dbConn, &agg, query, vcenter); err != nil {
		return summaryAgg{}, err
	}
	return agg, nil
}

// VmTraceRow holds snapshot data for a single VM across tables.
type VmTraceRow struct {
	SnapshotTime    int64         `db:"SnapshotTime"`
	Name            string        `db:"Name"`
	Vcenter         string        `db:"Vcenter"`
	VmId            string        `db:"VmId"`
	VmUuid          string        `db:"VmUuid"`
	ResourcePool    string        `db:"ResourcePool"`
	VcpuCount       int64         `db:"VcpuCount"`
	RamGB           int64         `db:"RamGB"`
	ProvisionedDisk float64       `db:"ProvisionedDisk"`
	CreationTime    sql.NullInt64 `db:"CreationTime"`
	DeletionTime    sql.NullInt64 `db:"DeletionTime"`
}

// VmLifecycle captures observed lifecycle times from hourly snapshots.
type VmLifecycle struct {
	CreationTime   int64
	CreationApprox bool
	FirstSeen      int64
	LastSeen       int64
	DeletionTime   int64
}

// FetchVmTrace returns combined hourly snapshot records for a VM (by id/uuid/name) ordered by snapshot time.
// To avoid SQLite's UNION term limits, this iterates tables one by one and merges in-memory.
func FetchVmTrace(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) ([]VmTraceRow, error) {
	var tables []struct {
		TableName    string `db:"table_name"`
		SnapshotTime int64  `db:"snapshot_time"`
	}
	if err := selectLog(ctx, dbConn, &tables, `
		SELECT table_name, snapshot_time
		FROM snapshot_registry
		WHERE snapshot_type = 'hourly'
		ORDER BY snapshot_time
	`); err != nil {
		return nil, err
	}
	if len(tables) == 0 {
		return nil, nil
	}

	rows := make([]VmTraceRow, 0, len(tables))
	driver := strings.ToLower(dbConn.DriverName())

	slog.Debug("vm trace scanning tables", "table_count", len(tables), "vm_id", vmID, "vm_uuid", vmUUID, "name", name)

	for _, t := range tables {
		if err := ValidateTableName(t.TableName); err != nil {
			slog.Warn("vm trace skipping table (invalid name)", "table", t.TableName, "error", err)
			continue
		}
		query := fmt.Sprintf(`
			SELECT %d AS "SnapshotTime",
				"Name","Vcenter","VmId","VmUuid","ResourcePool","VcpuCount","RamGB","ProvisionedDisk",
				COALESCE("CreationTime",0) AS "CreationTime",
				COALESCE("DeletionTime",0) AS "DeletionTime"
			FROM %s
			WHERE ("VmId" = ? OR "VmUuid" = ? OR lower("Name") = lower(?))
		`, t.SnapshotTime, t.TableName)
		args := []interface{}{vmID, vmUUID, name}
		if driver != "sqlite" {
			// convert ? to $1 style for postgres/pgx
			query = strings.Replace(query, "?", "$1", 1)
			query = strings.Replace(query, "?", "$2", 1)
			query = strings.Replace(query, "?", "$3", 1)
		}
		var tmp []VmTraceRow
		if err := selectLog(ctx, dbConn, &tmp, query, args...); err != nil {
			slog.Warn("vm trace query failed for table", "table", t.TableName, "error", err)
			continue
		}
		slog.Debug("vm trace table rows", "table", t.TableName, "snapshot_time", t.SnapshotTime, "rows", len(tmp))
		rows = append(rows, tmp...)
	}

	sort.Slice(rows, func(i, j int) bool {
		return rows[i].SnapshotTime < rows[j].SnapshotTime
	})
	slog.Info("vm trace combined rows", "total_rows", len(rows))
	return rows, nil
}

// FetchVmLifecycle walks hourly snapshots to determine lifecycle bounds for a VM.
func FetchVmLifecycle(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) (VmLifecycle, error) {
	var lifecycle VmLifecycle
	var tables []struct {
		TableName    string `db:"table_name"`
		SnapshotTime int64  `db:"snapshot_time"`
	}
	if err := selectLog(ctx, dbConn, &tables, `
		SELECT table_name, snapshot_time
		FROM snapshot_registry
		WHERE snapshot_type = 'hourly'
		ORDER BY snapshot_time
	`); err != nil {
		return lifecycle, err
	}
	driver := strings.ToLower(dbConn.DriverName())

	minCreation := int64(0)
	consecutiveMissing := 0
	for _, t := range tables {
		if err := ValidateTableName(t.TableName); err != nil {
			continue
		}
		// Probe this table for the VM.
		query := fmt.Sprintf(`
			SELECT MIN(NULLIF("CreationTime",0)) AS min_creation, COUNT(1) AS cnt
			FROM %s
			WHERE ("VmId" = ? OR "VmUuid" = ? OR lower("Name") = lower(?))
		`, t.TableName)
		args := []interface{}{vmID, vmUUID, name}
		if driver != "sqlite" {
			query = strings.Replace(query, "?", "$1", 1)
			query = strings.Replace(query, "?", "$2", 1)
			query = strings.Replace(query, "?", "$3", 1)
		}
		var probe struct {
			MinCreation sql.NullInt64 `db:"min_creation"`
			Cnt         int64         `db:"cnt"`
		}
		if err := getLog(ctx, dbConn, &probe, query, args...); err != nil {
			continue
		}
		if probe.Cnt > 0 {
			if lifecycle.FirstSeen == 0 {
				lifecycle.FirstSeen = t.SnapshotTime
			}
			lifecycle.LastSeen = t.SnapshotTime
			consecutiveMissing = 0
			if probe.MinCreation.Valid {
				if minCreation == 0 || probe.MinCreation.Int64 < minCreation {
					minCreation = probe.MinCreation.Int64
				}
			}
		} else if lifecycle.LastSeen > 0 && lifecycle.DeletionTime == 0 && t.SnapshotTime > lifecycle.LastSeen {
			consecutiveMissing++
			if consecutiveMissing >= 2 {
				lifecycle.DeletionTime = t.SnapshotTime
				break
			}
		} else {
			// reset if we haven't seen the VM yet
			consecutiveMissing = 0
		}
	}
	if minCreation > 0 {
		lifecycle.CreationTime = minCreation
		lifecycle.CreationApprox = false
	} else if lifecycle.FirstSeen > 0 {
		lifecycle.CreationTime = lifecycle.FirstSeen
		lifecycle.CreationApprox = true
	}
	return lifecycle, nil
}

// SyncVcenterTotalsFromSnapshots backfills vcenter_totals using hourly snapshot tables in snapshot_registry.
func SyncVcenterTotalsFromSnapshots(ctx context.Context, dbConn *sqlx.DB) error {
	if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
		return err
	}
	driver := strings.ToLower(dbConn.DriverName())
	var hourlyTables []struct {
		TableName    string `db:"table_name"`
		SnapshotTime int64  `db:"snapshot_time"`
	}
	if err := selectLog(ctx, dbConn, &hourlyTables, `
		SELECT table_name, snapshot_time
		FROM snapshot_registry
		WHERE snapshot_type = 'hourly'
		ORDER BY snapshot_time
	`); err != nil {
		return err
	}
	for _, ht := range hourlyTables {
		if err := ValidateTableName(ht.TableName); err != nil {
			continue
		}
		// Aggregate per vcenter from the snapshot table.
		query := fmt.Sprintf(`
			SELECT "Vcenter" AS vcenter,
				COUNT(1) AS vm_count,
				COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) AS vcpu_total,
				COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) AS ram_total
			FROM %s
			GROUP BY "Vcenter"
		`, ht.TableName)
		type aggRow struct {
			Vcenter   string `db:"vcenter"`
			VmCount   int64  `db:"vm_count"`
			VcpuTotal int64  `db:"vcpu_total"`
			RamTotal  int64  `db:"ram_total"`
		}
		var aggs []aggRow
		if err := selectLog(ctx, dbConn, &aggs, query); err != nil {
			continue
		}
		for _, a := range aggs {
			// Insert if missing.
			insert := `
				INSERT INTO vcenter_totals ("Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB")
				SELECT $1,$2,$3,$4,$5
				WHERE NOT EXISTS (
					SELECT 1 FROM vcenter_totals WHERE "Vcenter" = $1 AND "SnapshotTime" = $2
				)
			`
			if driver == "sqlite" {
				insert = strings.ReplaceAll(insert, "$", "?")
			}
			if _, err := execLog(ctx, dbConn, insert, a.Vcenter, ht.SnapshotTime, a.VmCount, a.VcpuTotal, a.RamTotal); err != nil {
				slog.Warn("failed to backfill vcenter_totals", "table", ht.TableName, "vcenter", a.Vcenter, "snapshot_time", ht.SnapshotTime, "error", err)
			}
		}
	}
	return nil
}

// AnalyzeTableIfPostgres runs ANALYZE on a table to refresh planner stats.
func AnalyzeTableIfPostgres(ctx context.Context, dbConn *sqlx.DB, tableName string) {
	if _, err := SafeTableName(tableName); err != nil {
		return
	}
	driver := strings.ToLower(dbConn.DriverName())
	if driver != "pgx" && driver != "postgres" {
		return
	}
	if _, err := execLog(ctx, dbConn, fmt.Sprintf(`ANALYZE %s`, tableName)); err != nil {
		slog.Warn("failed to ANALYZE table", "table", tableName, "error", err)
	}
}

// SetPostgresWorkMem sets a per-session work_mem for heavy aggregations; no-op for other drivers.
func SetPostgresWorkMem(ctx context.Context, dbConn *sqlx.DB, workMemMB int) {
	if workMemMB <= 0 {
		return
	}
	driver := strings.ToLower(dbConn.DriverName())
	if driver != "pgx" && driver != "postgres" {
		return
	}
	if _, err := execLog(ctx, dbConn, fmt.Sprintf(`SET LOCAL work_mem = '%dMB'`, workMemMB)); err != nil {
		slog.Warn("failed to set work_mem", "work_mem_mb", workMemMB, "error", err)
	}
}

// CheckMigrationState ensures goose migrations are present and not dirty.
func CheckMigrationState(ctx context.Context, dbConn *sqlx.DB) error {
	driver := strings.ToLower(dbConn.DriverName())
	var tableExists bool
	switch driver {
	case "sqlite":
		err := getLog(ctx, dbConn, &tableExists, `
			SELECT COUNT(1) > 0 FROM sqlite_master WHERE type='table' AND name='goose_db_version'
		`)
		if err != nil {
			return err
		}
	case "pgx", "postgres":
		err := getLog(ctx, dbConn, &tableExists, `
			SELECT EXISTS (
				SELECT 1 FROM pg_tables WHERE schemaname = 'public' AND tablename = 'goose_db_version'
			)
		`)
		if err != nil {
			return err
		}
	default:
		return fmt.Errorf("unsupported driver for migration check: %s", driver)
	}

	if !tableExists {
		return fmt.Errorf("goose_db_version table not found; database migrations may not be applied")
	}

	var dirty bool
	err := getLog(ctx, dbConn, &dirty, `
		SELECT NOT is_applied
		FROM goose_db_version
		ORDER BY id DESC
		LIMIT 1
	`)
	if err != nil {
		return err
	}
	if dirty {
		return fmt.Errorf("database migrations are in a dirty state; please resolve goose_db_version")
	}
	return nil
}

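// Startup sketch (illustrative): fail fast if migrations have not been applied
// or the latest goose record is not applied cleanly.
//
//	if err := CheckMigrationState(ctx, dbConn); err != nil {
//		slog.Error("refusing to start", "error", err)
//		return err
//	}
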
// BuildDailySummaryInsert returns the SQL to aggregate hourly snapshots into a daily summary table.
func BuildDailySummaryInsert(tableName string, unionQuery string) (string, error) {
	if _, err := SafeTableName(tableName); err != nil {
		return "", err
	}
	insert := fmt.Sprintf(`
	WITH snapshots AS (
		%s
	), totals AS (
		SELECT COUNT(DISTINCT "SnapshotTime") AS total_samples, MAX("SnapshotTime") AS max_snapshot FROM snapshots
	), agg AS (
		SELECT
			s."InventoryId", s."Name", s."Vcenter", s."VmId", s."EventKey", s."CloudId",
			MIN(NULLIF(s."CreationTime", 0)) AS any_creation,
			MAX(NULLIF(s."DeletionTime", 0)) AS any_deletion,
			MAX(COALESCE(inv."DeletionTime", 0)) AS inv_deletion,
			MIN(s."SnapshotTime") AS first_present,
			MAX(s."SnapshotTime") AS last_present,
			COUNT(*) AS samples_present,
			s."Datacenter", s."Cluster", s."Folder",
			AVG(COALESCE(s."ProvisionedDisk",0)) AS avg_disk,
			AVG(COALESCE(s."VcpuCount",0)) AS avg_vcpu_raw,
			AVG(COALESCE(s."RamGB",0)) AS avg_ram_raw,
			s."IsTemplate", s."PoweredOn", s."SrmPlaceholder", s."VmUuid",
			SUM(CASE WHEN s."VcpuCount" IS NOT NULL THEN s."VcpuCount" ELSE 0 END) AS sum_vcpu,
			SUM(CASE WHEN s."RamGB" IS NOT NULL THEN s."RamGB" ELSE 0 END) AS sum_ram,
			SUM(CASE WHEN s."ProvisionedDisk" IS NOT NULL THEN s."ProvisionedDisk" ELSE 0 END) AS sum_disk,
			SUM(CASE WHEN LOWER(s."ResourcePool") = 'tin' THEN 1 ELSE 0 END) AS tin_hits,
			SUM(CASE WHEN LOWER(s."ResourcePool") = 'bronze' THEN 1 ELSE 0 END) AS bronze_hits,
			SUM(CASE WHEN LOWER(s."ResourcePool") = 'silver' THEN 1 ELSE 0 END) AS silver_hits,
			SUM(CASE WHEN LOWER(s."ResourcePool") = 'gold' THEN 1 ELSE 0 END) AS gold_hits
		FROM snapshots s
		LEFT JOIN inventory inv ON inv."VmId" = s."VmId" AND inv."Vcenter" = s."Vcenter"
		GROUP BY
			s."InventoryId", s."Name", s."Vcenter", s."VmId", s."EventKey", s."CloudId",
			s."Datacenter", s."Cluster", s."Folder",
			s."IsTemplate", s."PoweredOn", s."SrmPlaceholder", s."VmUuid"
	)
	INSERT INTO %s (
		"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
		"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
		"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SnapshotTime",
		"SamplesPresent", "AvgVcpuCount", "AvgRamGB", "AvgProvisionedDisk", "AvgIsPresent",
		"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct",
		"Tin", "Bronze", "Silver", "Gold"
	)
	SELECT
		agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId",
		COALESCE(agg.any_creation, agg.first_present, 0) AS "CreationTime",
		CASE
			WHEN NULLIF(agg.inv_deletion, 0) IS NOT NULL THEN NULLIF(agg.inv_deletion, 0)
			WHEN totals.max_snapshot IS NOT NULL AND agg.last_present < totals.max_snapshot THEN COALESCE(
				NULLIF(agg.any_deletion, 0),
				(SELECT MIN(s2."SnapshotTime") FROM snapshots s2 WHERE s2."SnapshotTime" > agg.last_present),
				totals.max_snapshot,
				agg.last_present
			)
			ELSE NULLIF(agg.any_deletion, 0)
		END AS "DeletionTime",
		(
			SELECT s2."ResourcePool"
			FROM snapshots s2
			WHERE s2."VmId" = agg."VmId"
				AND s2."Vcenter" = agg."Vcenter"
			ORDER BY s2."SnapshotTime" DESC
			LIMIT 1
		) AS "ResourcePool",
		agg."Datacenter", agg."Cluster", agg."Folder",
		(
			SELECT s2."ProvisionedDisk"
			FROM snapshots s2
			WHERE s2."VmId" = agg."VmId"
				AND s2."Vcenter" = agg."Vcenter"
			ORDER BY s2."SnapshotTime" DESC
			LIMIT 1
		) AS "ProvisionedDisk",
		(
			SELECT s2."VcpuCount"
			FROM snapshots s2
			WHERE s2."VmId" = agg."VmId"
				AND s2."Vcenter" = agg."Vcenter"
			ORDER BY s2."SnapshotTime" DESC
			LIMIT 1
		) AS "VcpuCount",
		(
			SELECT s2."RamGB"
			FROM snapshots s2
			WHERE s2."VmId" = agg."VmId"
				AND s2."Vcenter" = agg."Vcenter"
			ORDER BY s2."SnapshotTime" DESC
			LIMIT 1
		) AS "RamGB",
		agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid",
		agg.last_present AS "SnapshotTime",
		agg.samples_present AS "SamplesPresent",
		CASE WHEN totals.total_samples > 0
			THEN 1.0 * agg.sum_vcpu / totals.total_samples
			ELSE NULL END AS "AvgVcpuCount",
		CASE WHEN totals.total_samples > 0
			THEN 1.0 * agg.sum_ram / totals.total_samples
			ELSE NULL END AS "AvgRamGB",
		CASE WHEN agg.samples_present > 0
			THEN 1.0 * agg.sum_disk / agg.samples_present
			ELSE NULL END AS "AvgProvisionedDisk",
		CASE WHEN totals.total_samples > 0
			THEN 1.0 * agg.samples_present / totals.total_samples
			ELSE NULL END AS "AvgIsPresent",
		CASE WHEN agg.samples_present > 0
			THEN 100.0 * agg.tin_hits / agg.samples_present
			ELSE NULL END AS "PoolTinPct",
		CASE WHEN agg.samples_present > 0
			THEN 100.0 * agg.bronze_hits / agg.samples_present
			ELSE NULL END AS "PoolBronzePct",
		CASE WHEN agg.samples_present > 0
			THEN 100.0 * agg.silver_hits / agg.samples_present
			ELSE NULL END AS "PoolSilverPct",
		CASE WHEN agg.samples_present > 0
			THEN 100.0 * agg.gold_hits / agg.samples_present
			ELSE NULL END AS "PoolGoldPct",
		CASE WHEN agg.samples_present > 0
			THEN 100.0 * agg.tin_hits / agg.samples_present
			ELSE NULL END AS "Tin",
		CASE WHEN agg.samples_present > 0
			THEN 100.0 * agg.bronze_hits / agg.samples_present
			ELSE NULL END AS "Bronze",
		CASE WHEN agg.samples_present > 0
			THEN 100.0 * agg.silver_hits / agg.samples_present
			ELSE NULL END AS "Silver",
		CASE WHEN agg.samples_present > 0
			THEN 100.0 * agg.gold_hits / agg.samples_present
			ELSE NULL END AS "Gold"
	FROM agg
	CROSS JOIN totals
	GROUP BY
		agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId",
		agg."Datacenter", agg."Cluster", agg."Folder",
		agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid",
		agg.any_creation, agg.any_deletion, agg.first_present, agg.last_present,
		totals.total_samples;
	`, unionQuery, tableName)
	return insert, nil
}

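// Usage sketch (the table names are hypothetical): stitch the hourly tables for
// one day into a UNION query, build the daily INSERT, and execute it.
//
//	unionQuery := `SELECT * FROM snapshot_hourly_a UNION ALL SELECT * FROM snapshot_hourly_b`
//	insertSQL, err := BuildDailySummaryInsert("summary_daily_2024_06_01", unionQuery)
//	if err == nil {
//		_, err = execLog(ctx, dbConn, insertSQL)
//	}
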
// RefineCreationDeletionFromUnion walks all snapshot rows in a period and tightens CreationTime/DeletionTime
// by using the first and last observed samples and the first sample after disappearance.
func RefineCreationDeletionFromUnion(ctx context.Context, dbConn *sqlx.DB, summaryTable, unionQuery string) error {
	if unionQuery == "" {
		return fmt.Errorf("union query is empty")
	}
	if _, err := SafeTableName(summaryTable); err != nil {
		return err
	}

	driver := strings.ToLower(dbConn.DriverName())
	var sql string

	switch driver {
	case "pgx", "postgres":
		sql = fmt.Sprintf(`
		WITH snapshots AS (
			%s
		), timeline AS (
			SELECT
				s."VmId",
				s."VmUuid",
				s."Name",
				s."Vcenter",
				MIN(NULLIF(s."CreationTime", 0)) AS any_creation,
				MIN(s."SnapshotTime") AS first_seen,
				MAX(s."SnapshotTime") AS last_seen
			FROM snapshots s
			GROUP BY s."VmId", s."VmUuid", s."Name", s."Vcenter"
		)
		UPDATE %s dst
		SET
			"CreationTime" = CASE
				WHEN t.any_creation IS NOT NULL AND t.any_creation > 0 THEN LEAST(COALESCE(NULLIF(dst."CreationTime", 0), t.any_creation), t.any_creation)
				WHEN t.first_seen IS NOT NULL THEN LEAST(COALESCE(NULLIF(dst."CreationTime", 0), t.first_seen), t.first_seen)
				ELSE dst."CreationTime"
			END,
			"DeletionTime" = CASE
				WHEN t_last_after IS NOT NULL
					AND (dst."DeletionTime" IS NULL OR dst."DeletionTime" = 0 OR t_last_after < dst."DeletionTime")
				THEN t_last_after
				ELSE dst."DeletionTime"
			END
		FROM (
			SELECT
				tl.*,
				(
					SELECT MIN(s2."SnapshotTime")
					FROM snapshots s2
					WHERE s2."Vcenter" = tl."Vcenter"
						AND COALESCE(s2."VmId", '') = COALESCE(tl."VmId", '')
						AND s2."SnapshotTime" > tl.last_seen
				) AS t_last_after
			FROM timeline tl
		) t
		WHERE dst."Vcenter" = t."Vcenter"
			AND (
				(dst."VmId" IS NOT DISTINCT FROM t."VmId")
				OR (dst."VmUuid" IS NOT DISTINCT FROM t."VmUuid")
				OR (dst."Name" IS NOT DISTINCT FROM t."Name")
			);
		`, unionQuery, summaryTable)
	default:
		// SQLite variant (no FROM in UPDATE, no IS NOT DISTINCT FROM). Uses positional args to avoid placeholder count issues.
		sql = fmt.Sprintf(`
		WITH snapshots AS (
			%[1]s
		), timeline AS (
			SELECT
				s."VmId",
				s."VmUuid",
				s."Name",
				s."Vcenter",
				MIN(NULLIF(s."CreationTime", 0)) AS any_creation,
				MIN(s."SnapshotTime") AS first_seen,
				MAX(s."SnapshotTime") AS last_seen
			FROM snapshots s
			GROUP BY s."VmId", s."VmUuid", s."Name", s."Vcenter"
		), enriched AS (
			SELECT
				tl.*,
				(
					SELECT MIN(s2."SnapshotTime")
					FROM snapshots s2
					WHERE s2."Vcenter" = tl."Vcenter"
						AND COALESCE(s2."VmId", '') = COALESCE(tl."VmId", '')
						AND s2."SnapshotTime" > tl.last_seen
				) AS first_after
			FROM timeline tl
		)
		UPDATE %[2]s
		SET
			"CreationTime" = COALESCE(
				(
					SELECT CASE
						WHEN t.any_creation IS NOT NULL AND t.any_creation > 0 AND COALESCE(NULLIF(%[2]s."CreationTime", 0), t.any_creation) > t.any_creation THEN t.any_creation
						WHEN t.any_creation IS NULL AND t.first_seen IS NOT NULL AND COALESCE(NULLIF(%[2]s."CreationTime", 0), t.first_seen) > t.first_seen THEN t.first_seen
						ELSE NULL
					END
					FROM enriched t
					WHERE %[2]s."Vcenter" = t."Vcenter" AND (
						(%[2]s."VmId" IS NOT NULL AND t."VmId" IS NOT NULL AND %[2]s."VmId" = t."VmId") OR
						(%[2]s."VmId" IS NULL AND t."VmId" IS NULL) OR
						(%[2]s."VmUuid" IS NOT NULL AND t."VmUuid" IS NOT NULL AND %[2]s."VmUuid" = t."VmUuid") OR
						(%[2]s."VmUuid" IS NULL AND t."VmUuid" IS NULL) OR
						(%[2]s."Name" IS NOT NULL AND t."Name" IS NOT NULL AND %[2]s."Name" = t."Name")
					)
					LIMIT 1
				),
				"CreationTime"
			),
			"DeletionTime" = COALESCE(
				(
					SELECT t.first_after
					FROM enriched t
					WHERE %[2]s."Vcenter" = t."Vcenter" AND (
						(%[2]s."VmId" IS NOT NULL AND t."VmId" IS NOT NULL AND %[2]s."VmId" = t."VmId") OR
						(%[2]s."VmId" IS NULL AND t."VmId" IS NULL) OR
						(%[2]s."VmUuid" IS NOT NULL AND t."VmUuid" IS NOT NULL AND %[2]s."VmUuid" = t."VmUuid") OR
						(%[2]s."VmUuid" IS NULL AND t."VmUuid" IS NULL) OR
						(%[2]s."Name" IS NOT NULL AND t."Name" IS NOT NULL AND %[2]s."Name" = t."Name")
					)
					AND t.first_after IS NOT NULL
					AND ("DeletionTime" IS NULL OR "DeletionTime" = 0 OR t.first_after < "DeletionTime")
					LIMIT 1
				),
				"DeletionTime"
			)
		WHERE EXISTS (
			SELECT 1 FROM enriched t
			WHERE %[2]s."Vcenter" = t."Vcenter" AND (
				(%[2]s."VmId" IS NOT NULL AND t."VmId" IS NOT NULL AND %[2]s."VmId" = t."VmId") OR
				(%[2]s."VmId" IS NULL AND t."VmId" IS NULL) OR
				(%[2]s."VmUuid" IS NOT NULL AND t."VmUuid" IS NOT NULL AND %[2]s."VmUuid" = t."VmUuid") OR
				(%[2]s."VmUuid" IS NULL AND t."VmUuid" IS NULL) OR
				(%[2]s."Name" IS NOT NULL AND t."Name" IS NOT NULL AND %[2]s."Name" = t."Name")
			)
		);
		`, unionQuery, summaryTable)
	}

	_, err := execLog(ctx, dbConn, sql)
	return err
}

// BuildMonthlySummaryInsert returns the SQL to aggregate daily summaries into a monthly summary table.
func BuildMonthlySummaryInsert(tableName string, unionQuery string) (string, error) {
	if _, err := SafeTableName(tableName); err != nil {
		return "", err
	}
	insert := fmt.Sprintf(`
		WITH daily AS (
			%s
		), enriched AS (
			SELECT
				d.*,
				CASE
					WHEN d."AvgIsPresent" IS NOT NULL AND d."AvgIsPresent" > 0 THEN d."SamplesPresent" / d."AvgIsPresent"
					ELSE CAST(d."SamplesPresent" AS REAL)
				END AS total_samples_day
			FROM daily d
		), totals AS (
			SELECT COALESCE(SUM(total_samples_day), 0) AS total_samples FROM enriched
		)
		-- monthly averages are weighted by the implied sample counts per day (SamplesPresent / AvgIsPresent)
		INSERT INTO %s (
			"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
			"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
			"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SamplesPresent",
			"AvgVcpuCount", "AvgRamGB", "AvgProvisionedDisk", "AvgIsPresent",
			"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct",
			"Tin", "Bronze", "Silver", "Gold"
		)
		SELECT
			"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId",
			COALESCE(NULLIF("CreationTime", 0), MIN(NULLIF("CreationTime", 0)), 0) AS "CreationTime",
			NULLIF(MAX(NULLIF("DeletionTime", 0)), 0) AS "DeletionTime",
			MAX("ResourcePool") AS "ResourcePool",
			"Datacenter", "Cluster", "Folder",
			MAX("ProvisionedDisk") AS "ProvisionedDisk",
			MAX("VcpuCount") AS "VcpuCount",
			MAX("RamGB") AS "RamGB",
			"IsTemplate",
			MAX("PoweredOn") AS "PoweredOn",
			"SrmPlaceholder", "VmUuid",
			SUM("SamplesPresent") AS "SamplesPresent",
			CASE WHEN totals.total_samples > 0
				THEN SUM(CASE WHEN "AvgVcpuCount" IS NOT NULL THEN "AvgVcpuCount" * total_samples_day ELSE 0 END) / totals.total_samples
				ELSE NULL END AS "AvgVcpuCount",
			CASE WHEN totals.total_samples > 0
				THEN SUM(CASE WHEN "AvgRamGB" IS NOT NULL THEN "AvgRamGB" * total_samples_day ELSE 0 END) / totals.total_samples
				ELSE NULL END AS "AvgRamGB",
			CASE WHEN totals.total_samples > 0
				THEN SUM(CASE WHEN "AvgProvisionedDisk" IS NOT NULL THEN "AvgProvisionedDisk" * total_samples_day ELSE 0 END) / totals.total_samples
				ELSE NULL END AS "AvgProvisionedDisk",
			CASE WHEN totals.total_samples > 0
				THEN SUM("SamplesPresent") * 1.0 / totals.total_samples
				ELSE NULL END AS "AvgIsPresent",
			CASE WHEN totals.total_samples > 0
				THEN 100.0 * SUM(CASE WHEN "PoolTinPct" IS NOT NULL THEN ("PoolTinPct" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
				ELSE NULL END AS "PoolTinPct",
			CASE WHEN totals.total_samples > 0
				THEN 100.0 * SUM(CASE WHEN "PoolBronzePct" IS NOT NULL THEN ("PoolBronzePct" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
				ELSE NULL END AS "PoolBronzePct",
			CASE WHEN totals.total_samples > 0
				THEN 100.0 * SUM(CASE WHEN "PoolSilverPct" IS NOT NULL THEN ("PoolSilverPct" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
				ELSE NULL END AS "PoolSilverPct",
			CASE WHEN totals.total_samples > 0
				THEN 100.0 * SUM(CASE WHEN "PoolGoldPct" IS NOT NULL THEN ("PoolGoldPct" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
				ELSE NULL END AS "PoolGoldPct",
			CASE WHEN totals.total_samples > 0
				THEN 100.0 * SUM(CASE WHEN "Tin" IS NOT NULL THEN ("Tin" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
				ELSE NULL END AS "Tin",
			CASE WHEN totals.total_samples > 0
				THEN 100.0 * SUM(CASE WHEN "Bronze" IS NOT NULL THEN ("Bronze" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
				ELSE NULL END AS "Bronze",
			CASE WHEN totals.total_samples > 0
				THEN 100.0 * SUM(CASE WHEN "Silver" IS NOT NULL THEN ("Silver" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
				ELSE NULL END AS "Silver",
			CASE WHEN totals.total_samples > 0
				THEN 100.0 * SUM(CASE WHEN "Gold" IS NOT NULL THEN ("Gold" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
				ELSE NULL END AS "Gold"
		FROM enriched
		CROSS JOIN totals
		GROUP BY
			"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId",
			"Datacenter", "Cluster", "Folder",
			"IsTemplate", "SrmPlaceholder", "VmUuid";
	`, unionQuery, tableName)
	return insert, nil
}

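// exampleMonthlyRollup is a minimal, hypothetical sketch (not referenced anywhere in this
// package) of how EnsureSummaryTable and BuildMonthlySummaryInsert are expected to be
// combined; the monthly table name and the daily union query are illustrative assumptions only.
func exampleMonthlyRollup(ctx context.Context, dbConn *sqlx.DB) error {
	monthlyTable := "vm_summary_2024_03" // hypothetical monthly summary table name
	// Hypothetical union over the per-day summary tables that feed the monthly rollup.
	dailyUnion := `SELECT * FROM vm_summary_2024_03_01 UNION ALL SELECT * FROM vm_summary_2024_03_02`

	if err := EnsureSummaryTable(ctx, dbConn, monthlyTable); err != nil {
		return err
	}
	insertSQL, err := BuildMonthlySummaryInsert(monthlyTable, dailyUnion)
	if err != nil {
		return err
	}
	_, err = execLog(ctx, dbConn, insertSQL)
	return err
}
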
// EnsureSummaryTable creates a daily/monthly summary table with the standard schema if it does not exist.
func EnsureSummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
	if _, err := SafeTableName(tableName); err != nil {
		return err
	}

	driver := strings.ToLower(dbConn.DriverName())
	var ddl string
	switch driver {
	case "pgx", "postgres":
		ddl = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
			"RowId" BIGSERIAL PRIMARY KEY,
			"InventoryId" BIGINT,
			"Name" TEXT NOT NULL,
			"Vcenter" TEXT NOT NULL,
			"VmId" TEXT,
			"EventKey" TEXT,
			"CloudId" TEXT,
			"CreationTime" BIGINT,
			"DeletionTime" BIGINT,
			"ResourcePool" TEXT,
			"Datacenter" TEXT,
			"Cluster" TEXT,
			"Folder" TEXT,
			"ProvisionedDisk" REAL,
			"VcpuCount" BIGINT,
			"RamGB" BIGINT,
			"IsTemplate" TEXT,
			"PoweredOn" TEXT,
			"SrmPlaceholder" TEXT,
			"VmUuid" TEXT,
			"SamplesPresent" BIGINT NOT NULL,
			"AvgVcpuCount" REAL,
			"AvgRamGB" REAL,
			"AvgProvisionedDisk" REAL,
			"AvgIsPresent" REAL,
			"PoolTinPct" REAL,
			"PoolBronzePct" REAL,
			"PoolSilverPct" REAL,
			"PoolGoldPct" REAL,
			"SnapshotTime" BIGINT,
			"Tin" REAL,
			"Bronze" REAL,
			"Silver" REAL,
			"Gold" REAL
		);`, tableName)
	default:
		ddl = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
			"RowId" INTEGER PRIMARY KEY AUTOINCREMENT,
			"InventoryId" BIGINT,
			"Name" TEXT NOT NULL,
			"Vcenter" TEXT NOT NULL,
			"VmId" TEXT,
			"EventKey" TEXT,
			"CloudId" TEXT,
			"CreationTime" BIGINT,
			"DeletionTime" BIGINT,
			"ResourcePool" TEXT,
			"Datacenter" TEXT,
			"Cluster" TEXT,
			"Folder" TEXT,
			"ProvisionedDisk" REAL,
			"VcpuCount" BIGINT,
			"RamGB" BIGINT,
			"IsTemplate" TEXT,
			"PoweredOn" TEXT,
			"SrmPlaceholder" TEXT,
			"VmUuid" TEXT,
			"SamplesPresent" BIGINT NOT NULL,
			"AvgVcpuCount" REAL,
			"AvgRamGB" REAL,
			"AvgProvisionedDisk" REAL,
			"AvgIsPresent" REAL,
			"PoolTinPct" REAL,
			"PoolBronzePct" REAL,
			"PoolSilverPct" REAL,
			"PoolGoldPct" REAL,
			"SnapshotTime" BIGINT,
			"Tin" REAL,
			"Bronze" REAL,
			"Silver" REAL,
			"Gold" REAL
		);`, tableName)
	}

	if _, err := execLog(ctx, dbConn, ddl); err != nil {
		return err
	}

	// Best-effort: drop legacy IsPresent column if it exists.
	if hasIsPresent, err := ColumnExists(ctx, dbConn, tableName, "IsPresent"); err == nil && hasIsPresent {
		_, _ = execLog(ctx, dbConn, fmt.Sprintf(`ALTER TABLE %s DROP COLUMN "IsPresent"`, tableName))
	}
	// Ensure SnapshotTime exists for lifecycle refinement.
	if hasSnapshot, err := ColumnExists(ctx, dbConn, tableName, "SnapshotTime"); err == nil && !hasSnapshot {
		_, _ = execLog(ctx, dbConn, fmt.Sprintf(`ALTER TABLE %s ADD COLUMN "SnapshotTime" BIGINT`, tableName))
	}

	indexes := []string{
		fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vm_vcenter_idx ON %s ("VmId","Vcenter")`, tableName, tableName),
		fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_resourcepool_idx ON %s ("ResourcePool")`, tableName, tableName),
	}
	if strings.ToLower(dbConn.DriverName()) == "pgx" || strings.ToLower(dbConn.DriverName()) == "postgres" {
		indexes = append(indexes,
			fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vcenter_idx ON %s ("Vcenter")`, tableName, tableName),
			fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vmuuid_vcenter_idx ON %s ("VmUuid","Vcenter")`, tableName, tableName),
		)
	}
	for _, idx := range indexes {
		if _, err := execLog(ctx, dbConn, idx); err != nil {
			return err
		}
	}
	return nil
}

// BackfillSnapshotTimeFromUnion sets SnapshotTime in a summary table using the max snapshot time per VM from a union query.
func BackfillSnapshotTimeFromUnion(ctx context.Context, dbConn *sqlx.DB, summaryTable, unionQuery string) error {
	if unionQuery == "" {
		return fmt.Errorf("union query is empty")
	}
	if _, err := SafeTableName(summaryTable); err != nil {
		return err
	}

	driver := strings.ToLower(dbConn.DriverName())
	var sql string
	switch driver {
	case "pgx", "postgres":
		sql = fmt.Sprintf(`
			WITH snapshots AS (
				%s
			)
			UPDATE %s dst
			SET "SnapshotTime" = sub.max_time
			FROM (
				SELECT s."Vcenter", s."VmId", s."VmUuid", s."Name", MAX(s."SnapshotTime") AS max_time
				FROM snapshots s
				GROUP BY s."Vcenter", s."VmId", s."VmUuid", s."Name"
			) sub
			WHERE (dst."SnapshotTime" IS NULL OR dst."SnapshotTime" = 0)
				AND dst."Vcenter" = sub."Vcenter"
				AND (
					(dst."VmId" IS NOT DISTINCT FROM sub."VmId")
					OR (dst."VmUuid" IS NOT DISTINCT FROM sub."VmUuid")
					OR (dst."Name" IS NOT DISTINCT FROM sub."Name")
				);
		`, unionQuery, summaryTable)
	default:
		sql = fmt.Sprintf(`
			WITH snapshots AS (
				%[1]s
			), grouped AS (
				SELECT s."Vcenter", s."VmId", s."VmUuid", s."Name", MAX(s."SnapshotTime") AS max_time
				FROM snapshots s
				GROUP BY s."Vcenter", s."VmId", s."VmUuid", s."Name"
			)
			UPDATE %[2]s
			SET "SnapshotTime" = (
				SELECT max_time FROM grouped g
				WHERE %[2]s."Vcenter" = g."Vcenter"
					AND (
						(%[2]s."VmId" IS NOT NULL AND g."VmId" IS NOT NULL AND %[2]s."VmId" = g."VmId")
						OR (%[2]s."VmUuid" IS NOT NULL AND g."VmUuid" IS NOT NULL AND %[2]s."VmUuid" = g."VmUuid")
						OR (%[2]s."Name" IS NOT NULL AND g."Name" IS NOT NULL AND %[2]s."Name" = g."Name")
					)
			)
			WHERE "SnapshotTime" IS NULL OR "SnapshotTime" = 0;
		`, unionQuery, summaryTable)
	}
	_, err := execLog(ctx, dbConn, sql)
	return err
}

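// exampleBackfillSnapshotTime is a hypothetical sketch (not referenced anywhere in this
// package) of how BackfillSnapshotTimeFromUnion might be invoked once a summary table has
// been built; the summary table name and the snapshot union query are illustrative
// assumptions, and the union is expected to expose the "Vcenter", "VmId", "VmUuid",
// "Name" and "SnapshotTime" columns the backfill matches on.
func exampleBackfillSnapshotTime(ctx context.Context, dbConn *sqlx.DB) error {
	summaryTable := "vm_summary_2024_03_01" // hypothetical daily summary table
	// Hypothetical union of the hourly snapshot tables for the same day.
	snapshotUnion := `SELECT * FROM snapshots_2024_03_01_00 UNION ALL SELECT * FROM snapshots_2024_03_01_01`
	return BackfillSnapshotTimeFromUnion(ctx, dbConn, summaryTable, snapshotUnion)
}
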
// EnsureSnapshotRunTable creates a table to track per-vCenter hourly snapshot attempts.
func EnsureSnapshotRunTable(ctx context.Context, dbConn *sqlx.DB) error {
	ddl := `
	CREATE TABLE IF NOT EXISTS snapshot_runs (
		"RowId" INTEGER PRIMARY KEY AUTOINCREMENT,
		"Vcenter" TEXT NOT NULL,
		"SnapshotTime" BIGINT NOT NULL,
		"Attempts" INTEGER NOT NULL DEFAULT 0,
		"Success" TEXT NOT NULL DEFAULT 'FALSE',
		"LastError" TEXT,
		"LastAttempt" BIGINT NOT NULL
	);
	`
	if strings.ToLower(dbConn.DriverName()) == "pgx" || strings.ToLower(dbConn.DriverName()) == "postgres" {
		ddl = `
	CREATE TABLE IF NOT EXISTS snapshot_runs (
		"RowId" BIGSERIAL PRIMARY KEY,
		"Vcenter" TEXT NOT NULL,
		"SnapshotTime" BIGINT NOT NULL,
		"Attempts" INTEGER NOT NULL DEFAULT 0,
		"Success" TEXT NOT NULL DEFAULT 'FALSE',
		"LastError" TEXT,
		"LastAttempt" BIGINT NOT NULL
	);
	`
	}
	if _, err := execLog(ctx, dbConn, ddl); err != nil {
		return err
	}
	indexes := []string{
		`CREATE UNIQUE INDEX IF NOT EXISTS snapshot_runs_vc_time_idx ON snapshot_runs ("Vcenter","SnapshotTime")`,
		`CREATE INDEX IF NOT EXISTS snapshot_runs_success_idx ON snapshot_runs ("Success")`,
	}
	for _, idx := range indexes {
		if _, err := execLog(ctx, dbConn, idx); err != nil {
			return err
		}
	}
	return nil
}

// UpsertSnapshotRun updates or inserts snapshot run status.
func UpsertSnapshotRun(ctx context.Context, dbConn *sqlx.DB, vcenter string, snapshotTime time.Time, success bool, errMsg string) error {
	if err := EnsureSnapshotRunTable(ctx, dbConn); err != nil {
		return err
	}
	successStr := "FALSE"
	if success {
		successStr = "TRUE"
	}
	now := time.Now().Unix()
	driver := strings.ToLower(dbConn.DriverName())
	switch driver {
	case "sqlite":
		_, err := execLog(ctx, dbConn, `
			INSERT INTO snapshot_runs ("Vcenter","SnapshotTime","Attempts","Success","LastError","LastAttempt")
			VALUES (?, ?, 1, ?, ?, ?)
			ON CONFLICT("Vcenter","SnapshotTime") DO UPDATE SET
				"Attempts" = snapshot_runs."Attempts" + 1,
				"Success" = excluded."Success",
				"LastError" = excluded."LastError",
				"LastAttempt" = excluded."LastAttempt"
		`, vcenter, snapshotTime.Unix(), successStr, errMsg, now)
		return err
	case "pgx", "postgres":
		_, err := execLog(ctx, dbConn, `
			INSERT INTO snapshot_runs ("Vcenter","SnapshotTime","Attempts","Success","LastError","LastAttempt")
			VALUES ($1, $2, 1, $3, $4, $5)
			ON CONFLICT("Vcenter","SnapshotTime") DO UPDATE SET
				"Attempts" = snapshot_runs."Attempts" + 1,
				"Success" = EXCLUDED."Success",
				"LastError" = EXCLUDED."LastError",
				"LastAttempt" = EXCLUDED."LastAttempt"
		`, vcenter, snapshotTime.Unix(), successStr, errMsg, now)
		return err
	default:
		return fmt.Errorf("unsupported driver for snapshot_runs upsert: %s", driver)
	}
}

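// exampleRecordSnapshotRun is a hypothetical sketch (not referenced anywhere in this
// package) of recording the outcome of one hourly snapshot attempt with UpsertSnapshotRun;
// the vCenter name and the hour-truncation convention are illustrative assumptions only.
func exampleRecordSnapshotRun(ctx context.Context, dbConn *sqlx.DB, collectErr error) error {
	hour := time.Now().UTC().Truncate(time.Hour) // assume runs are keyed to the top of the hour
	if collectErr != nil {
		return UpsertSnapshotRun(ctx, dbConn, "vcenter01.example.com", hour, false, collectErr.Error())
	}
	return UpsertSnapshotRun(ctx, dbConn, "vcenter01.example.com", hour, true, "")
}
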
// ListFailedSnapshotRuns returns vcenter/time pairs needing retry.
func ListFailedSnapshotRuns(ctx context.Context, dbConn *sqlx.DB, maxAttempts int) ([]struct {
	Vcenter      string
	SnapshotTime int64
	Attempts     int
}, error) {
	if maxAttempts <= 0 {
		maxAttempts = 3
	}
	driver := strings.ToLower(dbConn.DriverName())
	query := `
		SELECT "Vcenter","SnapshotTime","Attempts"
		FROM snapshot_runs
		WHERE "Success" = 'FALSE' AND "Attempts" < ?
		ORDER BY "LastAttempt" ASC
	`
	args := []interface{}{maxAttempts}
	if driver == "pgx" || driver == "postgres" {
		query = `
			SELECT "Vcenter","SnapshotTime","Attempts"
			FROM snapshot_runs
			WHERE "Success" = 'FALSE' AND "Attempts" < $1
			ORDER BY "LastAttempt" ASC
		`
	}
	type row struct {
		Vcenter      string `db:"Vcenter"`
		SnapshotTime int64  `db:"SnapshotTime"`
		Attempts     int    `db:"Attempts"`
	}
	rows := []row{}
	if err := selectLog(ctx, dbConn, &rows, query, args...); err != nil {
		return nil, err
	}
	results := make([]struct {
		Vcenter      string
		SnapshotTime int64
		Attempts     int
	}, 0, len(rows))
	for _, r := range rows {
		results = append(results, struct {
			Vcenter      string
			SnapshotTime int64
			Attempts     int
		}{Vcenter: r.Vcenter, SnapshotTime: r.SnapshotTime, Attempts: r.Attempts})
	}
	return results, nil
}

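// exampleRetryFailedSnapshots is a hypothetical sketch (not referenced anywhere in this
// package) of how ListFailedSnapshotRuns could drive a retry loop; the retry callback and
// the attempt limit are illustrative assumptions, not part of this package's API.
func exampleRetryFailedSnapshots(ctx context.Context, dbConn *sqlx.DB, retry func(vcenter string, at time.Time) error) error {
	failed, err := ListFailedSnapshotRuns(ctx, dbConn, 3)
	if err != nil {
		return err
	}
	for _, f := range failed {
		at := time.Unix(f.SnapshotTime, 0).UTC()
		retryErr := retry(f.Vcenter, at)
		// Record the new attempt regardless of outcome so "Attempts" keeps counting up.
		msg := ""
		if retryErr != nil {
			msg = retryErr.Error()
		}
		if err := UpsertSnapshotRun(ctx, dbConn, f.Vcenter, at, retryErr == nil, msg); err != nil {
			return err
		}
	}
	return nil
}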