All checks were successful
continuous-integration/drone/push Build is passing
1000 lines
33 KiB
Go
1000 lines
33 KiB
Go
package db
|
|
|
|
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"
	"time"

	"vctp/db/queries"

	"github.com/jmoiron/sqlx"
)
|
|
|
|
// SnapshotTotals summarizes counts and allocations for snapshot tables.
// The db tags match the column aliases produced by SnapshotTotalsForTable
// and SnapshotTotalsForUnion, so the struct can be scanned directly.
type SnapshotTotals struct {
	VmCount   int64   `db:"vm_count"`   // COUNT(DISTINCT "VmId")
	VcpuTotal int64   `db:"vcpu_total"` // summed "VcpuCount" (NULLs counted as 0)
	RamTotal  int64   `db:"ram_total"`  // summed "RamGB" (NULLs counted as 0)
	DiskTotal float64 `db:"disk_total"` // summed "ProvisionedDisk" (NULLs counted as 0)
}
|
|
|
|
// ColumnDef describes a column for EnsureColumns/AddColumnIfMissing.
type ColumnDef struct {
	Name string // column identifier; quoted and interpolated into ALTER TABLE DDL
	Type string // SQL type (and optional constraint) text, appended verbatim
}
|
|
|
|
// TableRowCount returns COUNT(*) for a table.
|
|
func TableRowCount(ctx context.Context, dbConn *sqlx.DB, table string) (int64, error) {
|
|
if err := ValidateTableName(table); err != nil {
|
|
return 0, err
|
|
}
|
|
var count int64
|
|
query := fmt.Sprintf(`SELECT COUNT(*) FROM %s`, table)
|
|
if err := dbConn.GetContext(ctx, &count, query); err != nil {
|
|
return 0, err
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
// EnsureColumns adds the provided columns to a table if they are missing.
|
|
func EnsureColumns(ctx context.Context, dbConn *sqlx.DB, tableName string, columns []ColumnDef) error {
|
|
if _, err := SafeTableName(tableName); err != nil {
|
|
return err
|
|
}
|
|
for _, column := range columns {
|
|
if err := AddColumnIfMissing(ctx, dbConn, tableName, column); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// AddColumnIfMissing performs a best-effort ALTER TABLE to add a column, ignoring "already exists".
|
|
func AddColumnIfMissing(ctx context.Context, dbConn *sqlx.DB, tableName string, column ColumnDef) error {
|
|
if _, err := SafeTableName(tableName); err != nil {
|
|
return err
|
|
}
|
|
query := fmt.Sprintf(`ALTER TABLE %s ADD COLUMN "%s" %s`, tableName, column.Name, column.Type)
|
|
if _, err := dbConn.ExecContext(ctx, query); err != nil {
|
|
errText := strings.ToLower(err.Error())
|
|
if strings.Contains(errText, "duplicate column") || strings.Contains(errText, "already exists") {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ValidateTableName ensures table identifiers are safe for interpolation.
// Only lowercase ASCII letters, digits and underscores are accepted; the
// empty string is rejected.
func ValidateTableName(name string) error {
	if len(name) == 0 {
		return fmt.Errorf("table name is empty")
	}
	for _, r := range name {
		switch {
		case r >= 'a' && r <= 'z':
		case r >= '0' && r <= '9':
		case r == '_':
		default:
			return fmt.Errorf("invalid table name: %s", name)
		}
	}
	return nil
}
|
|
|
|
// SafeTableName returns the name if it passes validation.
|
|
func SafeTableName(name string) (string, error) {
|
|
if err := ValidateTableName(name); err != nil {
|
|
return "", err
|
|
}
|
|
return name, nil
|
|
}
|
|
|
|
// TableHasRows returns true when a table contains at least one row.
|
|
func TableHasRows(ctx context.Context, dbConn *sqlx.DB, table string) (bool, error) {
|
|
if err := ValidateTableName(table); err != nil {
|
|
return false, err
|
|
}
|
|
query := fmt.Sprintf(`SELECT 1 FROM %s LIMIT 1`, table)
|
|
var exists int
|
|
if err := dbConn.GetContext(ctx, &exists, query); err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return false, nil
|
|
}
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// TableExists checks if a table exists in the current schema.
//
// SQLite lookups go through the generated queries package; Postgres queries
// pg_catalog.pg_tables in the public schema. Any lookup error (and any
// unsupported driver) yields false — callers cannot distinguish "missing"
// from "lookup failed".
func TableExists(ctx context.Context, dbConn *sqlx.DB, table string) bool {
	driver := strings.ToLower(dbConn.DriverName())
	switch driver {
	case "sqlite":
		q := queries.New(dbConn)
		// Valid: false for an empty name so the generated query matches nothing.
		count, err := q.SqliteTableExists(ctx, sql.NullString{String: table, Valid: table != ""})
		return err == nil && count > 0
	case "pgx", "postgres":
		var count int
		// $1 placeholder keeps the table name parameterized (no interpolation).
		err := dbConn.GetContext(ctx, &count, `
			SELECT COUNT(1)
			FROM pg_catalog.pg_tables
			WHERE schemaname = 'public' AND tablename = $1
		`, table)
		return err == nil && count > 0
	default:
		return false
	}
}
|
|
|
|
// ColumnExists checks if a column exists in a table.
|
|
func ColumnExists(ctx context.Context, dbConn *sqlx.DB, tableName string, columnName string) (bool, error) {
|
|
driver := strings.ToLower(dbConn.DriverName())
|
|
switch driver {
|
|
case "sqlite":
|
|
if _, err := SafeTableName(tableName); err != nil {
|
|
return false, err
|
|
}
|
|
query := fmt.Sprintf(`PRAGMA table_info("%s")`, tableName)
|
|
rows, err := dbConn.QueryxContext(ctx, query)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
var (
|
|
cid int
|
|
name string
|
|
colType string
|
|
notNull int
|
|
defaultVal sql.NullString
|
|
pk int
|
|
)
|
|
if err := rows.Scan(&cid, &name, &colType, ¬Null, &defaultVal, &pk); err != nil {
|
|
return false, err
|
|
}
|
|
if strings.EqualFold(name, columnName) {
|
|
return true, nil
|
|
}
|
|
}
|
|
return false, rows.Err()
|
|
case "pgx", "postgres":
|
|
var count int
|
|
err := dbConn.GetContext(ctx, &count, `
|
|
SELECT COUNT(1)
|
|
FROM information_schema.columns
|
|
WHERE table_name = $1 AND column_name = $2
|
|
`, tableName, strings.ToLower(columnName))
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return count > 0, nil
|
|
default:
|
|
return false, fmt.Errorf("unsupported driver for column lookup: %s", driver)
|
|
}
|
|
}
|
|
|
|
// SnapshotTotalsForTable returns totals for a snapshot table: the distinct VM
// count plus summed vCPU, RAM and provisioned-disk allocations. NULL
// allocation values contribute 0 rather than nulling the whole SUM.
func SnapshotTotalsForTable(ctx context.Context, dbConn *sqlx.DB, table string) (SnapshotTotals, error) {
	// The table name is interpolated, so validate it first.
	if _, err := SafeTableName(table); err != nil {
		return SnapshotTotals{}, err
	}
	// Column aliases match the db tags on SnapshotTotals for direct scanning.
	query := fmt.Sprintf(`
	SELECT
		COUNT(DISTINCT "VmId") AS vm_count,
		COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) AS vcpu_total,
		COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) AS ram_total,
		COALESCE(SUM(CASE WHEN "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" ELSE 0 END), 0) AS disk_total
	FROM %s
	`, table)

	var totals SnapshotTotals
	if err := dbConn.GetContext(ctx, &totals, query); err != nil {
		return SnapshotTotals{}, err
	}
	return totals, nil
}
|
|
|
|
// SnapshotTotalsForUnion returns totals for a union query of snapshots,
// aggregating over the rows the union produces exactly as
// SnapshotTotalsForTable does for a single table.
//
// NOTE(review): unionQuery is interpolated verbatim with no validation, so it
// must only ever be built from trusted, code-generated SQL — never from user
// input.
func SnapshotTotalsForUnion(ctx context.Context, dbConn *sqlx.DB, unionQuery string) (SnapshotTotals, error) {
	// Aliases match the db tags on SnapshotTotals for direct scanning.
	query := fmt.Sprintf(`
	SELECT
		COUNT(DISTINCT "VmId") AS vm_count,
		COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) AS vcpu_total,
		COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) AS ram_total,
		COALESCE(SUM(CASE WHEN "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" ELSE 0 END), 0) AS disk_total
	FROM (
		%s
	) snapshots
	`, unionQuery)

	var totals SnapshotTotals
	if err := dbConn.GetContext(ctx, &totals, query); err != nil {
		return SnapshotTotals{}, err
	}
	return totals, nil
}
|
|
|
|
// EnsureSnapshotTable creates a snapshot table with the standard schema if it does not exist.
|
|
func EnsureSnapshotTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
|
|
if _, err := SafeTableName(tableName); err != nil {
|
|
return err
|
|
}
|
|
|
|
driver := strings.ToLower(dbConn.DriverName())
|
|
var ddl string
|
|
switch driver {
|
|
case "pgx", "postgres":
|
|
ddl = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
|
|
"RowId" BIGSERIAL PRIMARY KEY,
|
|
"InventoryId" BIGINT,
|
|
"Name" TEXT NOT NULL,
|
|
"Vcenter" TEXT NOT NULL,
|
|
"VmId" TEXT,
|
|
"EventKey" TEXT,
|
|
"CloudId" TEXT,
|
|
"CreationTime" BIGINT,
|
|
"DeletionTime" BIGINT,
|
|
"ResourcePool" TEXT,
|
|
"Datacenter" TEXT,
|
|
"Cluster" TEXT,
|
|
"Folder" TEXT,
|
|
"ProvisionedDisk" REAL,
|
|
"VcpuCount" BIGINT,
|
|
"RamGB" BIGINT,
|
|
"IsTemplate" TEXT,
|
|
"PoweredOn" TEXT,
|
|
"SrmPlaceholder" TEXT,
|
|
"VmUuid" TEXT,
|
|
"SnapshotTime" BIGINT NOT NULL
|
|
);`, tableName)
|
|
default:
|
|
ddl = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
|
|
"RowId" INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
"InventoryId" BIGINT,
|
|
"Name" TEXT NOT NULL,
|
|
"Vcenter" TEXT NOT NULL,
|
|
"VmId" TEXT,
|
|
"EventKey" TEXT,
|
|
"CloudId" TEXT,
|
|
"CreationTime" BIGINT,
|
|
"DeletionTime" BIGINT,
|
|
"ResourcePool" TEXT,
|
|
"Datacenter" TEXT,
|
|
"Cluster" TEXT,
|
|
"Folder" TEXT,
|
|
"ProvisionedDisk" REAL,
|
|
"VcpuCount" BIGINT,
|
|
"RamGB" BIGINT,
|
|
"IsTemplate" TEXT,
|
|
"PoweredOn" TEXT,
|
|
"SrmPlaceholder" TEXT,
|
|
"VmUuid" TEXT,
|
|
"SnapshotTime" BIGINT NOT NULL
|
|
);`, tableName)
|
|
}
|
|
|
|
_, err := dbConn.ExecContext(ctx, ddl)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return EnsureSnapshotIndexes(ctx, dbConn, tableName)
|
|
}
|
|
|
|
// EnsureSnapshotIndexes creates the standard indexes for a snapshot table.
|
|
func EnsureSnapshotIndexes(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
|
|
if _, err := SafeTableName(tableName); err != nil {
|
|
return err
|
|
}
|
|
|
|
driver := strings.ToLower(dbConn.DriverName())
|
|
indexes := []string{
|
|
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vm_vcenter_idx ON %s ("VmId","Vcenter")`, tableName, tableName),
|
|
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_snapshottime_idx ON %s ("SnapshotTime")`, tableName, tableName),
|
|
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_resourcepool_idx ON %s ("ResourcePool")`, tableName, tableName),
|
|
}
|
|
// PG-specific helpful indexes; safe no-ops on SQLite if executed, but keep them gated to reduce file bloat.
|
|
if driver == "pgx" || driver == "postgres" {
|
|
indexes = append(indexes,
|
|
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vcenter_snapshottime_idx ON %s ("Vcenter","SnapshotTime")`, tableName, tableName),
|
|
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_name_vcenter_idx ON %s ("Name","Vcenter")`, tableName, tableName),
|
|
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vmuuid_vcenter_idx ON %s ("VmUuid","Vcenter")`, tableName, tableName),
|
|
)
|
|
}
|
|
for _, idx := range indexes {
|
|
if _, err := dbConn.ExecContext(ctx, idx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// BackfillSerialColumn sets missing values in a serial-like column for Postgres tables.
|
|
func BackfillSerialColumn(ctx context.Context, dbConn *sqlx.DB, tableName, columnName string) error {
|
|
if err := ValidateTableName(tableName); err != nil {
|
|
return err
|
|
}
|
|
if columnName == "" {
|
|
return fmt.Errorf("column name is empty")
|
|
}
|
|
query := fmt.Sprintf(
|
|
`UPDATE %s SET "%s" = nextval(pg_get_serial_sequence('%s','%s')) WHERE "%s" IS NULL`,
|
|
tableName, columnName, tableName, columnName, columnName,
|
|
)
|
|
_, err := dbConn.ExecContext(ctx, query)
|
|
if err != nil {
|
|
errText := strings.ToLower(err.Error())
|
|
if strings.Contains(errText, "pg_get_serial_sequence") || strings.Contains(errText, "sequence") {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ApplySQLiteTuning applies lightweight WAL/synchronous tweaks for better concurrency in non-prod contexts.
|
|
func ApplySQLiteTuning(ctx context.Context, dbConn *sqlx.DB) {
|
|
if strings.ToLower(dbConn.DriverName()) != "sqlite" {
|
|
return
|
|
}
|
|
// Best-effort pragmas; ignore errors to stay safe in constrained environments.
|
|
pragmas := []string{
|
|
`PRAGMA journal_mode=WAL;`,
|
|
`PRAGMA synchronous=NORMAL;`,
|
|
`PRAGMA temp_store=MEMORY;`,
|
|
}
|
|
for _, pragma := range pragmas {
|
|
_, _ = dbConn.ExecContext(ctx, pragma)
|
|
}
|
|
}
|
|
|
|
// AnalyzeTableIfPostgres runs ANALYZE on a table to refresh planner stats.
|
|
func AnalyzeTableIfPostgres(ctx context.Context, dbConn *sqlx.DB, tableName string) {
|
|
if _, err := SafeTableName(tableName); err != nil {
|
|
return
|
|
}
|
|
driver := strings.ToLower(dbConn.DriverName())
|
|
if driver != "pgx" && driver != "postgres" {
|
|
return
|
|
}
|
|
_, _ = dbConn.ExecContext(ctx, fmt.Sprintf(`ANALYZE %s`, tableName))
|
|
}
|
|
|
|
// SetPostgresWorkMem sets a per-session work_mem for heavy aggregations; no-op for other drivers.
|
|
func SetPostgresWorkMem(ctx context.Context, dbConn *sqlx.DB, workMemMB int) {
|
|
if workMemMB <= 0 {
|
|
return
|
|
}
|
|
driver := strings.ToLower(dbConn.DriverName())
|
|
if driver != "pgx" && driver != "postgres" {
|
|
return
|
|
}
|
|
_, _ = dbConn.ExecContext(ctx, fmt.Sprintf(`SET LOCAL work_mem = '%dMB'`, workMemMB))
|
|
}
|
|
|
|
// CheckMigrationState ensures goose migrations are present and not dirty.
//
// It first verifies the goose_db_version bookkeeping table exists for the
// current driver, then treats the newest row having is_applied = false as a
// dirty state. Unsupported drivers return an error.
func CheckMigrationState(ctx context.Context, dbConn *sqlx.DB) error {
	driver := strings.ToLower(dbConn.DriverName())
	var tableExists bool
	switch driver {
	case "sqlite":
		// COUNT(1) > 0 scans cleanly into a bool (0/1) on SQLite.
		err := dbConn.GetContext(ctx, &tableExists, `
			SELECT COUNT(1) > 0 FROM sqlite_master WHERE type='table' AND name='goose_db_version'
		`)
		if err != nil {
			return err
		}
	case "pgx", "postgres":
		err := dbConn.GetContext(ctx, &tableExists, `
			SELECT EXISTS (
				SELECT 1 FROM pg_tables WHERE schemaname = 'public' AND tablename = 'goose_db_version'
			)
		`)
		if err != nil {
			return err
		}
	default:
		return fmt.Errorf("unsupported driver for migration check: %s", driver)
	}

	if !tableExists {
		return fmt.Errorf("goose_db_version table not found; database migrations may not be applied")
	}

	// Only the latest version row is inspected; an unapplied newest row means
	// a migration was rolled back or failed part-way.
	var dirty bool
	err := dbConn.GetContext(ctx, &dirty, `
		SELECT NOT is_applied
		FROM goose_db_version
		ORDER BY id DESC
		LIMIT 1
	`)
	if err != nil {
		return err
	}
	if dirty {
		return fmt.Errorf("database migrations are in a dirty state; please resolve goose_db_version")
	}
	return nil
}
|
|
|
|
// BuildDailySummaryInsert returns the SQL to aggregate hourly snapshots into a
// daily summary table.
//
// tableName is validated before interpolation; unionQuery is spliced in
// verbatim as the "snapshots" CTE and must be trusted, code-generated SQL.
//
// Shape of the generated statement:
//   - totals: the number of distinct snapshot times in the period and the
//     latest snapshot time, used as the averaging denominator.
//   - agg: per-VM (grouped by identity + static attributes) min/max of
//     creation/deletion/presence times, sample counts, allocation sums, and
//     per-resource-pool hit counts; joined to inventory for its DeletionTime.
//   - The INSERT derives averages weighted by totals.total_samples, presence
//     ratio, pool percentages (the legacy Tin/Bronze/Silver/Gold columns get
//     the same percentage values as the Pool*Pct columns), and a DeletionTime
//     inferred from the first snapshot time after the VM disappeared.
func BuildDailySummaryInsert(tableName string, unionQuery string) (string, error) {
	if _, err := SafeTableName(tableName); err != nil {
		return "", err
	}
	insert := fmt.Sprintf(`
	WITH snapshots AS (
		%s
	), totals AS (
		SELECT COUNT(DISTINCT "SnapshotTime") AS total_samples, MAX("SnapshotTime") AS max_snapshot FROM snapshots
	), agg AS (
		SELECT
			s."InventoryId", s."Name", s."Vcenter", s."VmId", s."EventKey", s."CloudId",
			MIN(NULLIF(s."CreationTime", 0)) AS any_creation,
			MAX(NULLIF(s."DeletionTime", 0)) AS any_deletion,
			MAX(COALESCE(inv."DeletionTime", 0)) AS inv_deletion,
			MIN(s."SnapshotTime") AS first_present,
			MAX(s."SnapshotTime") AS last_present,
			COUNT(*) AS samples_present,
			s."Datacenter", s."Cluster", s."Folder", s."ProvisionedDisk", s."VcpuCount",
			s."RamGB", s."IsTemplate", s."PoweredOn", s."SrmPlaceholder", s."VmUuid",
			SUM(CASE WHEN s."VcpuCount" IS NOT NULL THEN s."VcpuCount" ELSE 0 END) AS sum_vcpu,
			SUM(CASE WHEN s."RamGB" IS NOT NULL THEN s."RamGB" ELSE 0 END) AS sum_ram,
			SUM(CASE WHEN s."ProvisionedDisk" IS NOT NULL THEN s."ProvisionedDisk" ELSE 0 END) AS sum_disk,
			SUM(CASE WHEN LOWER(s."ResourcePool") = 'tin' THEN 1 ELSE 0 END) AS tin_hits,
			SUM(CASE WHEN LOWER(s."ResourcePool") = 'bronze' THEN 1 ELSE 0 END) AS bronze_hits,
			SUM(CASE WHEN LOWER(s."ResourcePool") = 'silver' THEN 1 ELSE 0 END) AS silver_hits,
			SUM(CASE WHEN LOWER(s."ResourcePool") = 'gold' THEN 1 ELSE 0 END) AS gold_hits
		FROM snapshots s
		LEFT JOIN inventory inv ON inv."VmId" = s."VmId" AND inv."Vcenter" = s."Vcenter"
		GROUP BY
			s."InventoryId", s."Name", s."Vcenter", s."VmId", s."EventKey", s."CloudId",
			s."Datacenter", s."Cluster", s."Folder", s."ProvisionedDisk", s."VcpuCount",
			s."RamGB", s."IsTemplate", s."PoweredOn", s."SrmPlaceholder", s."VmUuid"
	)
	INSERT INTO %s (
		"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
		"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
		"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
		"SamplesPresent", "AvgVcpuCount", "AvgRamGB", "AvgProvisionedDisk", "AvgIsPresent",
		"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct",
		"Tin", "Bronze", "Silver", "Gold"
	)
	SELECT
		agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId",
		COALESCE(agg.any_creation, agg.first_present, 0) AS "CreationTime",
		CASE
			WHEN NULLIF(agg.inv_deletion, 0) IS NOT NULL THEN NULLIF(agg.inv_deletion, 0)
			WHEN totals.max_snapshot IS NOT NULL AND agg.last_present < totals.max_snapshot THEN COALESCE(
				NULLIF(agg.any_deletion, 0),
				(SELECT MIN(s2."SnapshotTime") FROM snapshots s2 WHERE s2."SnapshotTime" > agg.last_present),
				totals.max_snapshot,
				agg.last_present
			)
			ELSE NULLIF(agg.any_deletion, 0)
		END AS "DeletionTime",
		(
			SELECT s2."ResourcePool"
			FROM snapshots s2
			WHERE s2."VmId" = agg."VmId"
				AND s2."Vcenter" = agg."Vcenter"
			ORDER BY s2."SnapshotTime" DESC
			LIMIT 1
		) AS "ResourcePool",
		agg."Datacenter", agg."Cluster", agg."Folder", agg."ProvisionedDisk", agg."VcpuCount",
		agg."RamGB", agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid",
		agg.samples_present AS "SamplesPresent",
		CASE WHEN totals.total_samples > 0
			THEN 1.0 * agg.sum_vcpu / totals.total_samples
			ELSE NULL END AS "AvgVcpuCount",
		CASE WHEN totals.total_samples > 0
			THEN 1.0 * agg.sum_ram / totals.total_samples
			ELSE NULL END AS "AvgRamGB",
		CASE WHEN totals.total_samples > 0
			THEN 1.0 * agg.sum_disk / totals.total_samples
			ELSE NULL END AS "AvgProvisionedDisk",
		CASE WHEN totals.total_samples > 0
			THEN 1.0 * agg.samples_present / totals.total_samples
			ELSE NULL END AS "AvgIsPresent",
		CASE WHEN agg.samples_present > 0
			THEN 100.0 * agg.tin_hits / agg.samples_present
			ELSE NULL END AS "PoolTinPct",
		CASE WHEN agg.samples_present > 0
			THEN 100.0 * agg.bronze_hits / agg.samples_present
			ELSE NULL END AS "PoolBronzePct",
		CASE WHEN agg.samples_present > 0
			THEN 100.0 * agg.silver_hits / agg.samples_present
			ELSE NULL END AS "PoolSilverPct",
		CASE WHEN agg.samples_present > 0
			THEN 100.0 * agg.gold_hits / agg.samples_present
			ELSE NULL END AS "PoolGoldPct",
		CASE WHEN agg.samples_present > 0
			THEN 100.0 * agg.tin_hits / agg.samples_present
			ELSE NULL END AS "Tin",
		CASE WHEN agg.samples_present > 0
			THEN 100.0 * agg.bronze_hits / agg.samples_present
			ELSE NULL END AS "Bronze",
		CASE WHEN agg.samples_present > 0
			THEN 100.0 * agg.silver_hits / agg.samples_present
			ELSE NULL END AS "Silver",
		CASE WHEN agg.samples_present > 0
			THEN 100.0 * agg.gold_hits / agg.samples_present
			ELSE NULL END AS "Gold"
	FROM agg
	CROSS JOIN totals
	GROUP BY
		agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId",
		agg."Datacenter", agg."Cluster", agg."Folder", agg."ProvisionedDisk", agg."VcpuCount",
		agg."RamGB", agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid",
		agg.any_creation, agg.any_deletion, agg.first_present, agg.last_present,
		totals.total_samples;
	`, unionQuery, tableName)
	return insert, nil
}
|
|
|
|
// RefineCreationDeletionFromUnion walks all snapshot rows in a period and tightens CreationTime/DeletionTime
|
|
// by using the first and last observed samples and the first sample after disappearance.
|
|
func RefineCreationDeletionFromUnion(ctx context.Context, dbConn *sqlx.DB, summaryTable, unionQuery string) error {
|
|
if unionQuery == "" {
|
|
return fmt.Errorf("union query is empty")
|
|
}
|
|
if _, err := SafeTableName(summaryTable); err != nil {
|
|
return err
|
|
}
|
|
|
|
driver := strings.ToLower(dbConn.DriverName())
|
|
var sql string
|
|
|
|
switch driver {
|
|
case "pgx", "postgres":
|
|
sql = fmt.Sprintf(`
|
|
WITH snapshots AS (
|
|
%s
|
|
), timeline AS (
|
|
SELECT
|
|
s."VmId",
|
|
s."VmUuid",
|
|
s."Name",
|
|
s."Vcenter",
|
|
MIN(NULLIF(s."CreationTime", 0)) AS any_creation,
|
|
MIN(s."SnapshotTime") AS first_seen,
|
|
MAX(s."SnapshotTime") AS last_seen
|
|
FROM snapshots s
|
|
GROUP BY s."VmId", s."VmUuid", s."Name", s."Vcenter"
|
|
)
|
|
UPDATE %s dst
|
|
SET
|
|
"CreationTime" = CASE
|
|
WHEN t.any_creation IS NOT NULL AND t.any_creation > 0 THEN LEAST(COALESCE(NULLIF(dst."CreationTime", 0), t.any_creation), t.any_creation)
|
|
WHEN t.first_seen IS NOT NULL THEN LEAST(COALESCE(NULLIF(dst."CreationTime", 0), t.first_seen), t.first_seen)
|
|
ELSE dst."CreationTime"
|
|
END,
|
|
"DeletionTime" = CASE
|
|
WHEN t_last_after IS NOT NULL
|
|
AND (dst."DeletionTime" IS NULL OR dst."DeletionTime" = 0 OR t_last_after < dst."DeletionTime")
|
|
THEN t_last_after
|
|
ELSE dst."DeletionTime"
|
|
END
|
|
FROM (
|
|
SELECT
|
|
tl.*,
|
|
(
|
|
SELECT MIN(s2."SnapshotTime")
|
|
FROM snapshots s2
|
|
WHERE s2."Vcenter" = tl."Vcenter"
|
|
AND COALESCE(s2."VmId", '') = COALESCE(tl."VmId", '')
|
|
AND s2."SnapshotTime" > tl.last_seen
|
|
) AS t_last_after
|
|
FROM timeline tl
|
|
) t
|
|
WHERE dst."Vcenter" = t."Vcenter"
|
|
AND (
|
|
(dst."VmId" IS NOT DISTINCT FROM t."VmId")
|
|
OR (dst."VmUuid" IS NOT DISTINCT FROM t."VmUuid")
|
|
OR (dst."Name" IS NOT DISTINCT FROM t."Name")
|
|
);
|
|
`, unionQuery, summaryTable)
|
|
default:
|
|
// SQLite variant (no FROM in UPDATE, no IS NOT DISTINCT FROM). Uses positional args to avoid placeholder count issues.
|
|
sql = fmt.Sprintf(`
|
|
WITH snapshots AS (
|
|
%[1]s
|
|
), timeline AS (
|
|
SELECT
|
|
s."VmId",
|
|
s."VmUuid",
|
|
s."Name",
|
|
s."Vcenter",
|
|
MIN(NULLIF(s."CreationTime", 0)) AS any_creation,
|
|
MIN(s."SnapshotTime") AS first_seen,
|
|
MAX(s."SnapshotTime") AS last_seen
|
|
FROM snapshots s
|
|
GROUP BY s."VmId", s."VmUuid", s."Name", s."Vcenter"
|
|
), enriched AS (
|
|
SELECT
|
|
tl.*,
|
|
(
|
|
SELECT MIN(s2."SnapshotTime")
|
|
FROM snapshots s2
|
|
WHERE s2."Vcenter" = tl."Vcenter"
|
|
AND COALESCE(s2."VmId", '') = COALESCE(tl."VmId", '')
|
|
AND s2."SnapshotTime" > tl.last_seen
|
|
) AS first_after
|
|
FROM timeline tl
|
|
)
|
|
UPDATE %[2]s
|
|
SET
|
|
"CreationTime" = COALESCE(
|
|
(
|
|
SELECT CASE
|
|
WHEN t.any_creation IS NOT NULL AND t.any_creation > 0 AND COALESCE(NULLIF(%[2]s."CreationTime", 0), t.any_creation) > t.any_creation THEN t.any_creation
|
|
WHEN t.any_creation IS NULL AND t.first_seen IS NOT NULL AND COALESCE(NULLIF(%[2]s."CreationTime", 0), t.first_seen) > t.first_seen THEN t.first_seen
|
|
ELSE NULL
|
|
END
|
|
FROM enriched t
|
|
WHERE %[2]s."Vcenter" = t."Vcenter" AND (
|
|
(%[2]s."VmId" IS NOT NULL AND t."VmId" IS NOT NULL AND %[2]s."VmId" = t."VmId") OR
|
|
(%[2]s."VmId" IS NULL AND t."VmId" IS NULL) OR
|
|
(%[2]s."VmUuid" IS NOT NULL AND t."VmUuid" IS NOT NULL AND %[2]s."VmUuid" = t."VmUuid") OR
|
|
(%[2]s."VmUuid" IS NULL AND t."VmUuid" IS NULL) OR
|
|
(%[2]s."Name" IS NOT NULL AND t."Name" IS NOT NULL AND %[2]s."Name" = t."Name")
|
|
)
|
|
LIMIT 1
|
|
),
|
|
"CreationTime"
|
|
),
|
|
"DeletionTime" = COALESCE(
|
|
(
|
|
SELECT t.first_after
|
|
FROM enriched t
|
|
WHERE %[2]s."Vcenter" = t."Vcenter" AND (
|
|
(%[2]s."VmId" IS NOT NULL AND t."VmId" IS NOT NULL AND %[2]s."VmId" = t."VmId") OR
|
|
(%[2]s."VmId" IS NULL AND t."VmId" IS NULL) OR
|
|
(%[2]s."VmUuid" IS NOT NULL AND t."VmUuid" IS NOT NULL AND %[2]s."VmUuid" = t."VmUuid") OR
|
|
(%[2]s."VmUuid" IS NULL AND t."VmUuid" IS NULL) OR
|
|
(%[2]s."Name" IS NOT NULL AND t."Name" IS NOT NULL AND %[2]s."Name" = t."Name")
|
|
)
|
|
AND t.first_after IS NOT NULL
|
|
AND ("DeletionTime" IS NULL OR "DeletionTime" = 0 OR t.first_after < "DeletionTime")
|
|
LIMIT 1
|
|
),
|
|
"DeletionTime"
|
|
)
|
|
WHERE EXISTS (
|
|
SELECT 1 FROM enriched t
|
|
WHERE %[2]s."Vcenter" = t."Vcenter" AND (
|
|
(%[2]s."VmId" IS NOT NULL AND t."VmId" IS NOT NULL AND %[2]s."VmId" = t."VmId") OR
|
|
(%[2]s."VmId" IS NULL AND t."VmId" IS NULL) OR
|
|
(%[2]s."VmUuid" IS NOT NULL AND t."VmUuid" IS NOT NULL AND %[2]s."VmUuid" = t."VmUuid") OR
|
|
(%[2]s."VmUuid" IS NULL AND t."VmUuid" IS NULL) OR
|
|
(%[2]s."Name" IS NOT NULL AND t."Name" IS NOT NULL AND %[2]s."Name" = t."Name")
|
|
)
|
|
);
|
|
`, unionQuery, summaryTable)
|
|
}
|
|
|
|
_, err := dbConn.ExecContext(ctx, sql)
|
|
return err
|
|
}
|
|
|
|
// BuildMonthlySummaryInsert returns the SQL to aggregate daily summaries into
// a monthly summary table.
//
// tableName is validated before interpolation; unionQuery is spliced in
// verbatim as the "daily" CTE and must be trusted, code-generated SQL.
// Each day's implied sample count is reconstructed as
// SamplesPresent / AvgIsPresent (falling back to SamplesPresent when the
// ratio is missing or zero), and all monthly averages/percentages are
// weighted by that per-day count over the month's total.
func BuildMonthlySummaryInsert(tableName string, unionQuery string) (string, error) {
	if _, err := SafeTableName(tableName); err != nil {
		return "", err
	}
	insert := fmt.Sprintf(`
	WITH daily AS (
		%s
	), enriched AS (
		SELECT
			d.*,
			CASE
				WHEN d."AvgIsPresent" IS NOT NULL AND d."AvgIsPresent" > 0 THEN d."SamplesPresent" / d."AvgIsPresent"
				ELSE CAST(d."SamplesPresent" AS REAL)
			END AS total_samples_day
		FROM daily d
	), totals AS (
		SELECT COALESCE(SUM(total_samples_day), 0) AS total_samples FROM enriched
	)
	-- monthly averages are weighted by the implied sample counts per day (SamplesPresent / AvgIsPresent)
	INSERT INTO %s (
		"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
		"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
		"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SamplesPresent",
		"AvgVcpuCount", "AvgRamGB", "AvgProvisionedDisk", "AvgIsPresent",
		"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct",
		"Tin", "Bronze", "Silver", "Gold"
	)
	SELECT
		"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId",
		COALESCE(NULLIF("CreationTime", 0), MIN(NULLIF("CreationTime", 0)), 0) AS "CreationTime",
		NULLIF(MAX(NULLIF("DeletionTime", 0)), 0) AS "DeletionTime",
		MAX("ResourcePool") AS "ResourcePool",
		"Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
		"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
		SUM("SamplesPresent") AS "SamplesPresent",
		CASE WHEN totals.total_samples > 0
			THEN SUM(CASE WHEN "AvgVcpuCount" IS NOT NULL THEN "AvgVcpuCount" * total_samples_day ELSE 0 END) / totals.total_samples
			ELSE NULL END AS "AvgVcpuCount",
		CASE WHEN totals.total_samples > 0
			THEN SUM(CASE WHEN "AvgRamGB" IS NOT NULL THEN "AvgRamGB" * total_samples_day ELSE 0 END) / totals.total_samples
			ELSE NULL END AS "AvgRamGB",
		CASE WHEN totals.total_samples > 0
			THEN SUM(CASE WHEN "AvgProvisionedDisk" IS NOT NULL THEN "AvgProvisionedDisk" * total_samples_day ELSE 0 END) / totals.total_samples
			ELSE NULL END AS "AvgProvisionedDisk",
		CASE WHEN totals.total_samples > 0
			THEN SUM("SamplesPresent") * 1.0 / totals.total_samples
			ELSE NULL END AS "AvgIsPresent",
		CASE WHEN totals.total_samples > 0
			THEN 100.0 * SUM(CASE WHEN "PoolTinPct" IS NOT NULL THEN ("PoolTinPct" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
			ELSE NULL END AS "PoolTinPct",
		CASE WHEN totals.total_samples > 0
			THEN 100.0 * SUM(CASE WHEN "PoolBronzePct" IS NOT NULL THEN ("PoolBronzePct" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
			ELSE NULL END AS "PoolBronzePct",
		CASE WHEN totals.total_samples > 0
			THEN 100.0 * SUM(CASE WHEN "PoolSilverPct" IS NOT NULL THEN ("PoolSilverPct" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
			ELSE NULL END AS "PoolSilverPct",
		CASE WHEN totals.total_samples > 0
			THEN 100.0 * SUM(CASE WHEN "PoolGoldPct" IS NOT NULL THEN ("PoolGoldPct" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
			ELSE NULL END AS "PoolGoldPct",
		CASE WHEN totals.total_samples > 0
			THEN 100.0 * SUM(CASE WHEN "Tin" IS NOT NULL THEN ("Tin" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
			ELSE NULL END AS "Tin",
		CASE WHEN totals.total_samples > 0
			THEN 100.0 * SUM(CASE WHEN "Bronze" IS NOT NULL THEN ("Bronze" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
			ELSE NULL END AS "Bronze",
		CASE WHEN totals.total_samples > 0
			THEN 100.0 * SUM(CASE WHEN "Silver" IS NOT NULL THEN ("Silver" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
			ELSE NULL END AS "Silver",
		CASE WHEN totals.total_samples > 0
			THEN 100.0 * SUM(CASE WHEN "Gold" IS NOT NULL THEN ("Gold" / 100.0) * total_samples_day ELSE 0 END) / totals.total_samples
			ELSE NULL END AS "Gold"
	FROM enriched
	CROSS JOIN totals
	GROUP BY
		"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId",
		"Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
		"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid";
	`, unionQuery, tableName)
	return insert, nil
}
|
|
|
|
// EnsureSummaryTable creates a daily/monthly summary table with the standard schema if it does not exist.
|
|
func EnsureSummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
|
|
if _, err := SafeTableName(tableName); err != nil {
|
|
return err
|
|
}
|
|
|
|
driver := strings.ToLower(dbConn.DriverName())
|
|
var ddl string
|
|
switch driver {
|
|
case "pgx", "postgres":
|
|
ddl = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
|
|
"RowId" BIGSERIAL PRIMARY KEY,
|
|
"InventoryId" BIGINT,
|
|
"Name" TEXT NOT NULL,
|
|
"Vcenter" TEXT NOT NULL,
|
|
"VmId" TEXT,
|
|
"EventKey" TEXT,
|
|
"CloudId" TEXT,
|
|
"CreationTime" BIGINT,
|
|
"DeletionTime" BIGINT,
|
|
"ResourcePool" TEXT,
|
|
"Datacenter" TEXT,
|
|
"Cluster" TEXT,
|
|
"Folder" TEXT,
|
|
"ProvisionedDisk" REAL,
|
|
"VcpuCount" BIGINT,
|
|
"RamGB" BIGINT,
|
|
"IsTemplate" TEXT,
|
|
"PoweredOn" TEXT,
|
|
"SrmPlaceholder" TEXT,
|
|
"VmUuid" TEXT,
|
|
"SamplesPresent" BIGINT NOT NULL,
|
|
"AvgVcpuCount" REAL,
|
|
"AvgRamGB" REAL,
|
|
"AvgProvisionedDisk" REAL,
|
|
"AvgIsPresent" REAL,
|
|
"PoolTinPct" REAL,
|
|
"PoolBronzePct" REAL,
|
|
"PoolSilverPct" REAL,
|
|
"PoolGoldPct" REAL,
|
|
"Tin" REAL,
|
|
"Bronze" REAL,
|
|
"Silver" REAL,
|
|
"Gold" REAL
|
|
);`, tableName)
|
|
default:
|
|
ddl = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
|
|
"RowId" INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
"InventoryId" BIGINT,
|
|
"Name" TEXT NOT NULL,
|
|
"Vcenter" TEXT NOT NULL,
|
|
"VmId" TEXT,
|
|
"EventKey" TEXT,
|
|
"CloudId" TEXT,
|
|
"CreationTime" BIGINT,
|
|
"DeletionTime" BIGINT,
|
|
"ResourcePool" TEXT,
|
|
"Datacenter" TEXT,
|
|
"Cluster" TEXT,
|
|
"Folder" TEXT,
|
|
"ProvisionedDisk" REAL,
|
|
"VcpuCount" BIGINT,
|
|
"RamGB" BIGINT,
|
|
"IsTemplate" TEXT,
|
|
"PoweredOn" TEXT,
|
|
"SrmPlaceholder" TEXT,
|
|
"VmUuid" TEXT,
|
|
"SamplesPresent" BIGINT NOT NULL,
|
|
"AvgVcpuCount" REAL,
|
|
"AvgRamGB" REAL,
|
|
"AvgProvisionedDisk" REAL,
|
|
"AvgIsPresent" REAL,
|
|
"PoolTinPct" REAL,
|
|
"PoolBronzePct" REAL,
|
|
"PoolSilverPct" REAL,
|
|
"PoolGoldPct" REAL,
|
|
"Tin" REAL,
|
|
"Bronze" REAL,
|
|
"Silver" REAL,
|
|
"Gold" REAL
|
|
);`, tableName)
|
|
}
|
|
|
|
if _, err := dbConn.ExecContext(ctx, ddl); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Best-effort: drop legacy IsPresent column if it exists.
|
|
if hasIsPresent, err := ColumnExists(ctx, dbConn, tableName, "IsPresent"); err == nil && hasIsPresent {
|
|
_, _ = dbConn.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %s DROP COLUMN "IsPresent"`, tableName))
|
|
}
|
|
|
|
indexes := []string{
|
|
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vm_vcenter_idx ON %s ("VmId","Vcenter")`, tableName, tableName),
|
|
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_resourcepool_idx ON %s ("ResourcePool")`, tableName, tableName),
|
|
}
|
|
if strings.ToLower(dbConn.DriverName()) == "pgx" || strings.ToLower(dbConn.DriverName()) == "postgres" {
|
|
indexes = append(indexes,
|
|
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vcenter_idx ON %s ("Vcenter")`, tableName, tableName),
|
|
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_vmuuid_vcenter_idx ON %s ("VmUuid","Vcenter")`, tableName, tableName),
|
|
)
|
|
}
|
|
for _, idx := range indexes {
|
|
if _, err := dbConn.ExecContext(ctx, idx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// EnsureSnapshotRunTable creates a table to track per-vCenter hourly snapshot attempts.
|
|
func EnsureSnapshotRunTable(ctx context.Context, dbConn *sqlx.DB) error {
|
|
ddl := `
|
|
CREATE TABLE IF NOT EXISTS snapshot_runs (
|
|
"RowId" INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
"Vcenter" TEXT NOT NULL,
|
|
"SnapshotTime" BIGINT NOT NULL,
|
|
"Attempts" INTEGER NOT NULL DEFAULT 0,
|
|
"Success" TEXT NOT NULL DEFAULT 'FALSE',
|
|
"LastError" TEXT,
|
|
"LastAttempt" BIGINT NOT NULL
|
|
);
|
|
`
|
|
if strings.ToLower(dbConn.DriverName()) == "pgx" || strings.ToLower(dbConn.DriverName()) == "postgres" {
|
|
ddl = `
|
|
CREATE TABLE IF NOT EXISTS snapshot_runs (
|
|
"RowId" BIGSERIAL PRIMARY KEY,
|
|
"Vcenter" TEXT NOT NULL,
|
|
"SnapshotTime" BIGINT NOT NULL,
|
|
"Attempts" INTEGER NOT NULL DEFAULT 0,
|
|
"Success" TEXT NOT NULL DEFAULT 'FALSE',
|
|
"LastError" TEXT,
|
|
"LastAttempt" BIGINT NOT NULL
|
|
);
|
|
`
|
|
}
|
|
if _, err := dbConn.ExecContext(ctx, ddl); err != nil {
|
|
return err
|
|
}
|
|
indexes := []string{
|
|
`CREATE UNIQUE INDEX IF NOT EXISTS snapshot_runs_vc_time_idx ON snapshot_runs ("Vcenter","SnapshotTime")`,
|
|
`CREATE INDEX IF NOT EXISTS snapshot_runs_success_idx ON snapshot_runs ("Success")`,
|
|
}
|
|
for _, idx := range indexes {
|
|
if _, err := dbConn.ExecContext(ctx, idx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpsertSnapshotRun updates or inserts snapshot run status.
|
|
func UpsertSnapshotRun(ctx context.Context, dbConn *sqlx.DB, vcenter string, snapshotTime time.Time, success bool, errMsg string) error {
|
|
if err := EnsureSnapshotRunTable(ctx, dbConn); err != nil {
|
|
return err
|
|
}
|
|
successStr := "FALSE"
|
|
if success {
|
|
successStr = "TRUE"
|
|
}
|
|
now := time.Now().Unix()
|
|
driver := strings.ToLower(dbConn.DriverName())
|
|
switch driver {
|
|
case "sqlite":
|
|
_, err := dbConn.ExecContext(ctx, `
|
|
INSERT INTO snapshot_runs ("Vcenter","SnapshotTime","Attempts","Success","LastError","LastAttempt")
|
|
VALUES (?, ?, 1, ?, ?, ?)
|
|
ON CONFLICT("Vcenter","SnapshotTime") DO UPDATE SET
|
|
"Attempts" = snapshot_runs."Attempts" + 1,
|
|
"Success" = excluded."Success",
|
|
"LastError" = excluded."LastError",
|
|
"LastAttempt" = excluded."LastAttempt"
|
|
`, vcenter, snapshotTime.Unix(), successStr, errMsg, now)
|
|
return err
|
|
case "pgx", "postgres":
|
|
_, err := dbConn.ExecContext(ctx, `
|
|
INSERT INTO snapshot_runs ("Vcenter","SnapshotTime","Attempts","Success","LastError","LastAttempt")
|
|
VALUES ($1, $2, 1, $3, $4, $5)
|
|
ON CONFLICT("Vcenter","SnapshotTime") DO UPDATE SET
|
|
"Attempts" = snapshot_runs."Attempts" + 1,
|
|
"Success" = EXCLUDED."Success",
|
|
"LastError" = EXCLUDED."LastError",
|
|
"LastAttempt" = EXCLUDED."LastAttempt"
|
|
`, vcenter, snapshotTime.Unix(), successStr, errMsg, now)
|
|
return err
|
|
default:
|
|
return fmt.Errorf("unsupported driver for snapshot_runs upsert: %s", driver)
|
|
}
|
|
}
|
|
|
|
// ListFailedSnapshotRuns returns vcenter/time pairs needing retry.
|
|
func ListFailedSnapshotRuns(ctx context.Context, dbConn *sqlx.DB, maxAttempts int) ([]struct {
|
|
Vcenter string
|
|
SnapshotTime int64
|
|
Attempts int
|
|
}, error) {
|
|
if maxAttempts <= 0 {
|
|
maxAttempts = 3
|
|
}
|
|
driver := strings.ToLower(dbConn.DriverName())
|
|
query := `
|
|
SELECT "Vcenter","SnapshotTime","Attempts"
|
|
FROM snapshot_runs
|
|
WHERE "Success" = 'FALSE' AND "Attempts" < ?
|
|
ORDER BY "LastAttempt" ASC
|
|
`
|
|
args := []interface{}{maxAttempts}
|
|
if driver == "pgx" || driver == "postgres" {
|
|
query = `
|
|
SELECT "Vcenter","SnapshotTime","Attempts"
|
|
FROM snapshot_runs
|
|
WHERE "Success" = 'FALSE' AND "Attempts" < $1
|
|
ORDER BY "LastAttempt" ASC
|
|
`
|
|
}
|
|
type row struct {
|
|
Vcenter string `db:"Vcenter"`
|
|
SnapshotTime int64 `db:"SnapshotTime"`
|
|
Attempts int `db:"Attempts"`
|
|
}
|
|
rows := []row{}
|
|
if err := dbConn.SelectContext(ctx, &rows, query, args...); err != nil {
|
|
return nil, err
|
|
}
|
|
results := make([]struct {
|
|
Vcenter string
|
|
SnapshotTime int64
|
|
Attempts int
|
|
}, 0, len(rows))
|
|
for _, r := range rows {
|
|
results = append(results, struct {
|
|
Vcenter string
|
|
SnapshotTime int64
|
|
Attempts int
|
|
}{Vcenter: r.Vcenter, SnapshotTime: r.SnapshotTime, Attempts: r.Attempts})
|
|
}
|
|
return results, nil
|
|
}
|